From 0564aed9d77e7fbbf757b6fe83db9fc3159c6b4e Mon Sep 17 00:00:00 2001
From: Pavlo Strokov
Date: Fri, 11 Feb 2022 17:13:14 +0200
Subject: [PATCH 01/12] migrations: Flatten replication job JSON

We are going to add foreign key constraints to the replication_queue
table to auto-remove any rows that become orphaned (no repository
exists). Because repository_id is a field inside the job column of type
JSONB, we can't create a foreign key on it. Thus we need to create a
dedicated column for that field. And because we already know the full,
stable set of columns we need for the replication job, we extract all
fields from the job column into plain columns of the table.

The repository_id, replica_path and source_node_storage columns are
expected to have non-NULL values for the existing rows, as we don't
handle NULLs in the Go code.

Part of: https://gitlab.com/gitlab-org/gitaly/-/issues/3102
---
 .../20220207143330_flatten_queue_job.go       |  73 ++++++++++++
 internal/praefect/datastore/queue_test.go     | 106 ++++++++++++++++++
 2 files changed, 179 insertions(+)
 create mode 100644 internal/praefect/datastore/migrations/20220207143330_flatten_queue_job.go

diff --git a/internal/praefect/datastore/migrations/20220207143330_flatten_queue_job.go b/internal/praefect/datastore/migrations/20220207143330_flatten_queue_job.go
new file mode 100644
index 00000000000..57a2e568850
--- /dev/null
+++ b/internal/praefect/datastore/migrations/20220207143330_flatten_queue_job.go
@@ -0,0 +1,73 @@
+package migrations
+
+import migrate "github.com/rubenv/sql-migrate"
+
+func init() {
+	m := &migrate.Migration{
+		Id: "20220207143330_flatten_queue_job",
+		Up: []string{
+			`CREATE TYPE REPLICATION_JOB_TYPE AS ENUM('update','create','delete','delete_replica','rename','gc','repack_full','repack_incremental','cleanup','pack_refs','write_commit_graph','midx_repack','optimize_repository', 'prune_unreachable_objects')`,
+
+			// The repository_id, replica_path and source_node_storage columns could have
+			// NULL values for existing events, but because non-pointer receivers are used
+			// to scan them in the Go code, we must prevent situations where NULL is
+			// returned from the database, as the scan would fail in that case.
+			`ALTER TABLE replication_queue
+				ADD COLUMN change REPLICATION_JOB_TYPE,
+				ADD COLUMN repository_id BIGINT NOT NULL DEFAULT 0,
+				ADD COLUMN replica_path TEXT NOT NULL DEFAULT '',
+				ADD COLUMN relative_path TEXT,
+				ADD COLUMN target_node_storage TEXT,
+				ADD COLUMN source_node_storage TEXT NOT NULL DEFAULT '',
+				ADD COLUMN virtual_storage TEXT,
+				ADD COLUMN params JSONB`,

+			`-- +migrate StatementBegin
+			CREATE OR REPLACE FUNCTION replication_queue_flatten_job() RETURNS TRIGGER AS $$
+			BEGIN
+				NEW.change := (NEW.job->>'change')::REPLICATION_JOB_TYPE;
+				NEW.repository_id := COALESCE(
+					-- For the old jobs the repository_id field may be 'null' or not set at all.
+					-- repository_id field could have 0 if event was created by reconciler before
+					-- repositories table was populated with valid repository_id.
+					CASE WHEN (NEW.job->>'repository_id')::BIGINT = 0 THEN NULL ELSE (NEW.job->>'repository_id')::BIGINT END,
+					(SELECT repositories.repository_id FROM repositories WHERE repositories.virtual_storage = NEW.job->>'virtual_storage' AND repositories.relative_path = NEW.job->>'relative_path'),
+					0
+				);
+				-- The reconciler doesn't populate replica_path field that is why we need make sure
+				-- we have at least an empty value for the column, not to break scan operations.
+ NEW.replica_path := COALESCE(NEW.job->>'replica_path', ''); + NEW.relative_path := NEW.job->>'relative_path'; + NEW.target_node_storage := NEW.job->>'target_node_storage'; + NEW.source_node_storage := COALESCE(NEW.job->>'source_node_storage', ''); + NEW.virtual_storage := NEW.job->>'virtual_storage'; + NEW.params := (NEW.job->>'params')::JSONB; + RETURN NEW; + END; + $$ LANGUAGE plpgsql; + -- +migrate StatementEnd`, + + `CREATE TRIGGER replication_queue_flatten_job BEFORE INSERT ON replication_queue + FOR EACH ROW EXECUTE PROCEDURE replication_queue_flatten_job()`, + }, + Down: []string{ + `DROP TRIGGER replication_queue_flatten_job ON replication_queue`, + + `DROP FUNCTION replication_queue_flatten_job`, + + `ALTER TABLE replication_queue + DROP COLUMN change, + DROP COLUMN repository_id, + DROP COLUMN replica_path, + DROP COLUMN relative_path, + DROP COLUMN target_node_storage, + DROP COLUMN source_node_storage, + DROP COLUMN virtual_storage, + DROP COLUMN params`, + + `DROP TYPE REPLICATION_JOB_TYPE`, + }, + } + + allMigrations = append(allMigrations, m) +} diff --git a/internal/praefect/datastore/queue_test.go b/internal/praefect/datastore/queue_test.go index 03cb27bf4cb..08884c3ba29 100644 --- a/internal/praefect/datastore/queue_test.go +++ b/internal/praefect/datastore/queue_test.go @@ -183,6 +183,112 @@ func TestPostgresReplicationEventQueue_Enqueue(t *testing.T) { db.RequireRowsInTable(t, "replication_queue_job_lock", 0) } +func TestPostgresReplicationEventQueue_Enqueue_triggerPopulatesColumns(t *testing.T) { + t.Parallel() + db := testdb.New(t) + ctx := testhelper.Context(t) + + type action func(t *testing.T, tx *testdb.TxWrapper, event ReplicationEvent) ReplicationEvent + + for _, tc := range []struct { + desc string + job ReplicationJob + beforeEnqueue action + afterEnqueue action + }{ + { + desc: "repository id embedded into the job", + job: ReplicationJob{ + Change: UpdateRepo, + RepositoryID: 1, + RelativePath: "/project/path-1", + ReplicaPath: "relative/project/path-1", + TargetNodeStorage: "gitaly-1", + SourceNodeStorage: "gitaly-0", + VirtualStorage: "praefect", + Params: nil, + }, + }, + { + desc: "repository id extracted from repositories table", + job: ReplicationJob{ + Change: RenameRepo, + RelativePath: "/project/path-1", + ReplicaPath: "relative/project/path-1", + TargetNodeStorage: "gitaly-1", + SourceNodeStorage: "gitaly-0", + VirtualStorage: "praefect", + Params: Params{"RelativePath": "new/path"}, + }, + beforeEnqueue: func(t *testing.T, tx *testdb.TxWrapper, event ReplicationEvent) ReplicationEvent { + const query = ` + INSERT INTO repositories(virtual_storage, relative_path, generation, "primary", replica_path) + VALUES ($1, $2, $3, $4, $5) + RETURNING repository_id` + var repositoryID int64 + err := tx.QueryRowContext(ctx, query, event.Job.VirtualStorage, event.Job.RelativePath, 1, event.Job.SourceNodeStorage, event.Job.ReplicaPath). + Scan(&repositoryID) + require.NoError(t, err, "create repository record to have it as a source of repository id") + return event + }, + afterEnqueue: func(t *testing.T, tx *testdb.TxWrapper, event ReplicationEvent) ReplicationEvent { + const query = `SELECT repository_id FROM repositories WHERE virtual_storage = $1 AND relative_path = $2` + err := tx.QueryRowContext(ctx, query, event.Job.VirtualStorage, event.Job.RelativePath). 
+					Scan(&event.Job.RepositoryID)
+				require.NoError(t, err, "create repository record to have it as a source of repository id")
+				return event
+			},
+		},
+		{
+			desc: "repository id doesn't exist",
+			job: ReplicationJob{
+				Change:            RenameRepo,
+				RelativePath:      "/project/path-1",
+				ReplicaPath:       "relative/project/path-1",
+				TargetNodeStorage: "gitaly-1",
+				SourceNodeStorage: "gitaly-0",
+				VirtualStorage:    "praefect",
+				Params:            Params{"RelativePath": "new/path"},
+			},
+		},
+	} {
+		t.Run(tc.desc, func(t *testing.T) {
+			tx := db.Begin(t)
+			defer tx.Rollback(t)
+
+			queue := NewPostgresReplicationEventQueue(tx)
+			event := ReplicationEvent{Job: tc.job}
+			if tc.beforeEnqueue != nil {
+				event = tc.beforeEnqueue(t, tx, event)
+			}
+			enqueued, err := queue.Enqueue(ctx, event)
+			require.NoError(t, err)
+
+			if tc.afterEnqueue != nil {
+				enqueued = tc.afterEnqueue(t, tx, enqueued)
+			}
+
+			job := extractReplicationJob(t, ctx, tx, enqueued.ID)
+			require.Equal(t, enqueued.Job, job)
+		})
+	}
+}
+
+func extractReplicationJob(t *testing.T, ctx context.Context, tx *testdb.TxWrapper, id uint64) ReplicationJob {
+	t.Helper()
+	const selectJob = `
+		SELECT
+			change, repository_id, replica_path, relative_path, target_node_storage,
+			source_node_storage, virtual_storage, params
+		FROM replication_queue WHERE id = $1`
+	var job ReplicationJob
+	require.NoError(t, tx.QueryRowContext(ctx, selectJob, id).Scan(
+		&job.Change, &job.RepositoryID, &job.ReplicaPath, &job.RelativePath, &job.TargetNodeStorage,
+		&job.SourceNodeStorage, &job.VirtualStorage, &job.Params,
+	))
+	return job
+}
+
 func TestPostgresReplicationEventQueue_DeleteReplicaInfiniteAttempts(t *testing.T) {
 	t.Parallel()
 	queue := NewPostgresReplicationEventQueue(testdb.New(t))
-- 
GitLab

From 57b3f3a83d55e655ca59a7e677f506ebf41c654b Mon Sep 17 00:00:00 2001
From: Pavlo Strokov
Date: Tue, 15 Feb 2022 22:05:43 +0200
Subject: [PATCH 02/12] glsql: Check for foreign key database errors

The helper function IsForeignKeyViolation checks whether an error
returned from the database is a foreign key constraint violation.
---
 internal/praefect/datastore/glsql/postgres.go |  7 +++
 .../praefect/datastore/glsql/postgres_test.go | 46 +++++++++++++++++++
 2 files changed, 53 insertions(+)

diff --git a/internal/praefect/datastore/glsql/postgres.go b/internal/praefect/datastore/glsql/postgres.go
index 24c4093d630..381ba256238 100644
--- a/internal/praefect/datastore/glsql/postgres.go
+++ b/internal/praefect/datastore/glsql/postgres.go
@@ -299,6 +299,13 @@ func IsUniqueViolation(err error, constraint string) bool {
 	return isPgError(err, "23505", []errorCondition{withConstraintName(constraint)})
 }
 
+// IsForeignKeyViolation returns true if an error is a foreign key violation.
+func IsForeignKeyViolation(err error, constraint string) bool {
+	// https://www.postgresql.org/docs/11/errcodes-appendix.html
+	// foreign_key_violation
+	return isPgError(err, "23503", []errorCondition{withConstraintName(constraint)})
+}
+
 func isPgError(err error, code string, conditions []errorCondition) bool {
 	var pgErr *pgconn.PgError
 	if errors.As(err, &pgErr) && pgErr.Code == code {
diff --git a/internal/praefect/datastore/glsql/postgres_test.go b/internal/praefect/datastore/glsql/postgres_test.go
index 039145a5fd3..8a1b49ffbfc 100644
--- a/internal/praefect/datastore/glsql/postgres_test.go
+++ b/internal/praefect/datastore/glsql/postgres_test.go
@@ -324,6 +324,52 @@ func TestIsUniqueViolation(t *testing.T) {
 	}
 }
 
+func TestIsForeignKeyViolation(t *testing.T) {
+	for _, tc := range []struct {
+		desc   string
+		err    error
+		constr string
+		exp    bool
+	}{
+		{
+			desc: "nil input",
+			err:  nil,
+			exp:  false,
+		},
+		{
+			desc: "wrong error type",
+			err:  assert.AnError,
+			exp:  false,
+		},
+		{
+			desc: "wrong code",
+			err:  &pgconn.PgError{Code: "stub"},
+			exp:  false,
+		},
+		{
+			desc: "foreign key violation",
+			err:  &pgconn.PgError{Code: "23503"},
+			exp:  true,
+		},
+		{
+			desc: "wrapped foreign key violation",
+			err:  fmt.Errorf("stub: %w", &pgconn.PgError{Code: "23503"}),
+			exp:  true,
+		},
+		{
+			desc:   "foreign key violation with accepted conditions",
+			err:    &pgconn.PgError{Code: "23503", ConstraintName: "cname"},
+			constr: "cname",
+			exp:    true,
+		},
+	} {
+		t.Run(tc.desc, func(t *testing.T) {
+			res := glsql.IsForeignKeyViolation(tc.err, tc.constr)
+			require.Equal(t, tc.exp, res)
+		})
+	}
+}
+
 func TestMigrateSome(t *testing.T) {
 	db := testdb.New(t)
 	dbCfg := testdb.GetConfig(t, db.Name)
-- 
GitLab

From a08910de6b5dbc8cfd2e3f559288a60cee0b2a84 Mon Sep 17 00:00:00 2001
From: Pavlo Strokov
Date: Tue, 15 Feb 2022 22:18:49 +0200
Subject: [PATCH 03/12] migrations: Migrate existing events and add constraint

The set of columns newly added in the previous commit remains empty or
contains default values. We update the replication_queue table,
populating those columns with the values extracted from the
corresponding JSON fields of the 'job' column. If some old replication
events have no repository_id field set, we try to extract it from the
repositories table. We consider all replication events with an unset
repository_id (0 value) as too old and remove them, as otherwise we
wouldn't be able to create a foreign key constraint.

The foreign key between the replication_queue and repositories tables
is used to clean up replication events once the repository is removed.

To make the replication_queue_job_lock table more self-managed we
re-create replication_queue_job_lock_job_id_fkey and
replication_queue_job_lock_lock_id_fkey with cascading removal. It
gives us confidence that the rows are removed from the table once the
corresponding rows are removed from the replication_queue or
replication_queue_lock tables.

If there were in_progress events, the rows for those will be removed
from the replication_queue and replication_queue_job_lock tables, but
the rows in replication_queue_lock will remain with the 'acquired'
column set to 'true'. Because those repositories don't exist anymore,
we remove such rows from replication_queue_lock as they won't be used
again. And to prevent unused rows from remaining in the
replication_queue_lock table, we create a trigger on the repositories
table that deletes rows from replication_queue_lock after a repository
row is removed.
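To illustrate the cleanup chain this sets up, here is a minimal SQL
sketch (the ids and the 'virtual_storage|storage|relative_path' lock id
are hypothetical sample values, not part of the migration itself):

    -- Given: repositories(repository_id = 1, virtual_storage = 'praefect', relative_path = '/project/path-1'),
    --        replication_queue(id = 10, repository_id = 1, state = 'in_progress'),
    --        replication_queue_job_lock(job_id = 10, lock_id = 'praefect|gitaly-1|/project/path-1'),
    --        replication_queue_lock(id = 'praefect|gitaly-1|/project/path-1', acquired = true).
    DELETE FROM repositories WHERE repository_id = 1;
    -- replication_queue_repository_id_fkey cascades the delete to replication_queue row 10,
    -- replication_queue_job_lock_job_id_fkey cascades it further to the replication_queue_job_lock row,
    -- and the AFTER DELETE trigger deletes the replication_queue_lock row matched by
    -- id LIKE 'praefect' || '|%|' || '/project/path-1'.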
Part of: https://gitlab.com/gitlab-org/gitaly/-/issues/3974
---
 internal/praefect/datastore/collector_test.go |  18 +-
 .../20220211161203_json_job_to_columns.go     | 140 +++++++++
 internal/praefect/datastore/queue_test.go     | 266 ++++++++++++------
 3 files changed, 323 insertions(+), 101 deletions(-)
 create mode 100644 internal/praefect/datastore/migrations/20220211161203_json_job_to_columns.go

diff --git a/internal/praefect/datastore/collector_test.go b/internal/praefect/datastore/collector_test.go
index 3091c83219b..d59f2112fdd 100644
--- a/internal/praefect/datastore/collector_test.go
+++ b/internal/praefect/datastore/collector_test.go
@@ -269,15 +269,17 @@ func TestRepositoryStoreCollector_ReplicationQueueDepth(t *testing.T) {
 	readyJobs := 5
 	for virtualStorage, nodes := range storageNames {
 		for i := 0; i < readyJobs; i++ {
+			job := ReplicationJob{
+				Change:            UpdateRepo,
+				RelativePath:      "/project/path-1",
+				TargetNodeStorage: nodes[1],
+				SourceNodeStorage: nodes[0],
+				VirtualStorage:    virtualStorage,
+				Params:            nil,
+			}
+			insertRepository(t, db, ctx, job.VirtualStorage, job.RelativePath, job.SourceNodeStorage)
 			_, err := queue.Enqueue(ctx, ReplicationEvent{
-				Job: ReplicationJob{
-					Change:            UpdateRepo,
-					RelativePath:      "/project/path-1",
-					TargetNodeStorage: nodes[1],
-					SourceNodeStorage: nodes[0],
-					VirtualStorage:    virtualStorage,
-					Params:            nil,
-				},
+				Job: job,
 			})
 			require.NoError(t, err)
 		}
diff --git a/internal/praefect/datastore/migrations/20220211161203_json_job_to_columns.go b/internal/praefect/datastore/migrations/20220211161203_json_job_to_columns.go
new file mode 100644
index 00000000000..0336ae05813
--- /dev/null
+++ b/internal/praefect/datastore/migrations/20220211161203_json_job_to_columns.go
@@ -0,0 +1,140 @@
+package migrations
+
+import migrate "github.com/rubenv/sql-migrate"
+
+func init() {
+	m := &migrate.Migration{
+		Id: "20220211161203_json_job_to_columns",
+		Up: []string{
+			// 1. The replication_queue table was extended with a set of new columns that
+			// mirror the job JSON struct. This query updates all existing rows, populating
+			// the newly added columns with data from the job column.
+			`UPDATE replication_queue SET
+				change = (queue.job->>'change')::REPLICATION_JOB_TYPE,
+				repository_id = COALESCE(
+					-- For the old jobs the repository_id may be 'null' as we didn't have a migration
+					-- that populated it with the value from the repositories table.
+					-- The repository_id may also be 0 in case the job was created by the reconciler
+					-- before the repositories table was populated.
+					CASE WHEN (queue.job->>'repository_id')::BIGINT = 0 THEN NULL ELSE (queue.job->>'repository_id')::BIGINT END,
+					(SELECT repositories.repository_id FROM repositories WHERE repositories.virtual_storage = queue.job->>'virtual_storage' AND repositories.relative_path = queue.job->>'relative_path'),
+					-- If the repository_id doesn't exist we still need to fill this column,
+					-- otherwise scanning into the struct may fail.
+					0),
+				replica_path = COALESCE(queue.job->>'replica_path', ''),
+				relative_path = queue.job->>'relative_path',
+				target_node_storage = queue.job->>'target_node_storage',
+				source_node_storage = COALESCE(queue.job->>'source_node_storage', ''),
+				virtual_storage = queue.job->>'virtual_storage',
+				params = (queue.job->>'params')::JSONB
+			FROM replication_queue AS queue
+			LEFT JOIN repositories ON
+				queue.job->>'virtual_storage' = repositories.virtual_storage AND
+				queue.job->>'relative_path' = repositories.relative_path
+			WHERE replication_queue.id = queue.id`,

+			// 2. Drop the existing foreign key as it does nothing on removal of the referenced row.
+			`ALTER TABLE replication_queue_job_lock
+				DROP CONSTRAINT replication_queue_job_lock_job_id_fkey`,

+			// 3. And re-create it with cascading deletion.
+			`ALTER TABLE replication_queue_job_lock
+				ADD CONSTRAINT replication_queue_job_lock_job_id_fkey
+				FOREIGN KEY (job_id)
+				REFERENCES replication_queue(id)
+				ON DELETE CASCADE`,

+			// 4. Drop the existing foreign key as it does nothing on removal of the referenced row.
+			`ALTER TABLE replication_queue_job_lock DROP CONSTRAINT replication_queue_job_lock_lock_id_fkey`,

+			// 5. And re-create it with cascading deletion.
+			`ALTER TABLE replication_queue_job_lock
+				ADD CONSTRAINT replication_queue_job_lock_lock_id_fkey
+				FOREIGN KEY (lock_id)
+				REFERENCES replication_queue_lock(id)
+				ON DELETE CASCADE`,

+			// 6. The replication events without a repository_id are too old and have to be
+			// removed for the foreign key constraint on the repositories table to be created.
+			`DELETE FROM replication_queue
+			WHERE repository_id = 0`,

+			// 7. In case we removed some in_progress replication events we need to remove the
+			// corresponding rows from replication_queue_lock, because the repository doesn't
+			// exist anymore and we don't need a lock row for it.
+			`DELETE FROM replication_queue_lock
+			WHERE acquired
+			AND NOT EXISTS(SELECT FROM replication_queue_job_lock WHERE lock_id = id)`,

+			// 8. Once the repository is removed (its row is deleted from the repositories
+			// table) it can't be used anymore, and there is no reason to process any remaining
+			// replication events. If a gitaly node was out of service and the repository
+			// deletion was scheduled as a replication job, that job will be removed as well,
+			// but https://gitlab.com/gitlab-org/gitaly/-/issues/3719 should deal with it.
+			// To automatically clean up the remaining replication events we create a foreign
+			// key with cascading removal.
+			`ALTER TABLE replication_queue
+				ADD CONSTRAINT replication_queue_repository_id_fkey
+				FOREIGN KEY (repository_id)
+				REFERENCES repositories(repository_id)
+				ON DELETE CASCADE`,

+			// 9. If a repository is removed there is nothing that cleans up the
+			// replication_queue_lock table. The table is used to synchronize runs of the
+			// replication jobs, and its rows are created before the rows in the
+			// replication_queue table, which is why we can't use a foreign key to remove
+			// orphaned lock rows. Instead we rely on a trigger that removes rows from
+			// replication_queue_lock once the record is removed from the repositories table.
+			`-- +migrate StatementBegin
+			CREATE FUNCTION remove_queue_lock_on_repository_removal() RETURNS TRIGGER AS $$
+			BEGIN
+				DELETE FROM replication_queue_lock
+				WHERE id LIKE (OLD.virtual_storage || '|%|' || OLD.relative_path);
+				RETURN NULL;
+			END;
+			$$ LANGUAGE plpgsql;`,

+			// 10. Activate a trigger to remove rows from replication_queue_lock once the row
+			// is deleted from the repositories table.
+			`CREATE TRIGGER remove_queue_lock_on_repository_removal AFTER DELETE ON repositories
+				FOR EACH ROW EXECUTE PROCEDURE remove_queue_lock_on_repository_removal()`,
+		},
+		Down: []string{
+			// 10.
+			`DROP TRIGGER remove_queue_lock_on_repository_removal ON repositories`,

+			// 9.
+			`DROP FUNCTION remove_queue_lock_on_repository_removal`,

+			// 8.
+			`ALTER TABLE replication_queue
+				DROP CONSTRAINT replication_queue_repository_id_fkey`,

+			// 7. We can't restore deleted rows, nothing to do here.

+			// 6. We can't restore deleted rows, nothing to do here.

+			// 5.
+ `ALTER TABLE replication_queue_job_lock DROP CONSTRAINT replication_queue_job_lock_lock_id_fkey`, + + // 4. Re-create foreign key with the default options. + `ALTER TABLE replication_queue_job_lock + ADD CONSTRAINT replication_queue_job_lock_lock_id_fkey + FOREIGN KEY (lock_id) + REFERENCES replication_queue_lock(id)`, + + // 3. + `ALTER TABLE replication_queue_job_lock + DROP CONSTRAINT replication_queue_job_lock_job_id_fkey`, + + // 2. + `ALTER TABLE replication_queue_job_lock + ADD CONSTRAINT replication_queue_job_lock_job_id_fkey + FOREIGN KEY (job_id) + REFERENCES replication_queue(id)`, + + // 1. We don't know what is the set of rows we updated that is why we can't reset + // them back, so nothing to do here. + }, + } + + allMigrations = append(allMigrations, m) +} diff --git a/internal/praefect/datastore/queue_test.go b/internal/praefect/datastore/queue_test.go index 08884c3ba29..96407f12574 100644 --- a/internal/praefect/datastore/queue_test.go +++ b/internal/praefect/datastore/queue_test.go @@ -8,6 +8,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testdb" ) @@ -111,6 +112,7 @@ func TestPostgresReplicationEventQueue_DeleteReplicaUniqueIndex(t *testing.T) { ctx := testhelper.Context(t) if tc.existingJob != nil { + insertRepository(t, db, ctx, tc.existingJob.Job.VirtualStorage, tc.existingJob.Job.RelativePath, "stub") _, err := db.ExecContext(ctx, ` INSERT INTO replication_queue (state, job) VALUES ($1, $2) @@ -118,14 +120,16 @@ func TestPostgresReplicationEventQueue_DeleteReplicaUniqueIndex(t *testing.T) { require.NoError(t, err) } + job := ReplicationJob{ + Change: DeleteReplica, + VirtualStorage: "praefect", + RelativePath: "relative-path", + TargetNodeStorage: "gitaly-1", + } + insertRepository(t, db, ctx, job.VirtualStorage, job.RelativePath, "stub") _, err := NewPostgresReplicationEventQueue(db).Enqueue(ctx, ReplicationEvent{ State: JobStateReady, - Job: ReplicationJob{ - Change: DeleteReplica, - VirtualStorage: "praefect", - RelativePath: "relative-path", - TargetNodeStorage: "gitaly-1", - }, + Job: job, }) if tc.succeeds { @@ -155,6 +159,7 @@ func TestPostgresReplicationEventQueue_Enqueue(t *testing.T) { Params: nil, }, } + insertRepository(t, db, ctx, eventType.Job.VirtualStorage, eventType.Job.RelativePath, eventType.Job.SourceNodeStorage) actualEvent, err := queue.Enqueue(ctx, eventType) // initial event require.NoError(t, err) @@ -188,90 +193,74 @@ func TestPostgresReplicationEventQueue_Enqueue_triggerPopulatesColumns(t *testin db := testdb.New(t) ctx := testhelper.Context(t) - type action func(t *testing.T, tx *testdb.TxWrapper, event ReplicationEvent) ReplicationEvent + t.Run("no repository record exists", func(t *testing.T) { + tx := db.Begin(t) + defer tx.Rollback(t) - for _, tc := range []struct { - desc string - job ReplicationJob - beforeEnqueue action - afterEnqueue action - }{ - { - desc: "repository id embedded into the job", - job: ReplicationJob{ - Change: UpdateRepo, - RepositoryID: 1, - RelativePath: "/project/path-1", - ReplicaPath: "relative/project/path-1", - TargetNodeStorage: "gitaly-1", - SourceNodeStorage: "gitaly-0", - VirtualStorage: "praefect", - Params: nil, - }, - }, - { - desc: "repository id extracted from repositories table", - job: ReplicationJob{ - Change: RenameRepo, - RelativePath: "/project/path-1", - ReplicaPath: 
"relative/project/path-1", - TargetNodeStorage: "gitaly-1", - SourceNodeStorage: "gitaly-0", - VirtualStorage: "praefect", - Params: Params{"RelativePath": "new/path"}, - }, - beforeEnqueue: func(t *testing.T, tx *testdb.TxWrapper, event ReplicationEvent) ReplicationEvent { - const query = ` - INSERT INTO repositories(virtual_storage, relative_path, generation, "primary", replica_path) - VALUES ($1, $2, $3, $4, $5) - RETURNING repository_id` - var repositoryID int64 - err := tx.QueryRowContext(ctx, query, event.Job.VirtualStorage, event.Job.RelativePath, 1, event.Job.SourceNodeStorage, event.Job.ReplicaPath). - Scan(&repositoryID) - require.NoError(t, err, "create repository record to have it as a source of repository id") - return event - }, - afterEnqueue: func(t *testing.T, tx *testdb.TxWrapper, event ReplicationEvent) ReplicationEvent { - const query = `SELECT repository_id FROM repositories WHERE virtual_storage = $1 AND relative_path = $2` - err := tx.QueryRowContext(ctx, query, event.Job.VirtualStorage, event.Job.RelativePath). - Scan(&event.Job.RepositoryID) - require.NoError(t, err, "create repository record to have it as a source of repository id") - return event - }, - }, - { - desc: "repository id doesn't exist", - job: ReplicationJob{ - Change: RenameRepo, - RelativePath: "/project/path-1", - ReplicaPath: "relative/project/path-1", - TargetNodeStorage: "gitaly-1", - SourceNodeStorage: "gitaly-0", - VirtualStorage: "praefect", - Params: Params{"RelativePath": "new/path"}, - }, - }, - } { - t.Run(tc.desc, func(t *testing.T) { - tx := db.Begin(t) - defer tx.Rollback(t) + job := ReplicationJob{ + Change: UpdateRepo, + RepositoryID: 1, + RelativePath: "/project/path-1", + ReplicaPath: "relative/project/path-1", + TargetNodeStorage: "gitaly-1", + SourceNodeStorage: "gitaly-0", + VirtualStorage: "praefect", + Params: nil, + } + queue := NewPostgresReplicationEventQueue(tx) + event := ReplicationEvent{Job: job} + _, err := queue.Enqueue(ctx, event) + ok := glsql.IsForeignKeyViolation(err, "replication_queue_repository_id_fkey") + require.Truef(t, ok, "returned error is not expected: %+v", err) + }) - queue := NewPostgresReplicationEventQueue(tx) - event := ReplicationEvent{Job: tc.job} - if tc.beforeEnqueue != nil { - event = tc.beforeEnqueue(t, tx, event) - } - enqueued, err := queue.Enqueue(ctx, event) - require.NoError(t, err) + t.Run("repository id not set on job, but found in repositories table", func(t *testing.T) { + tx := db.Begin(t) + defer tx.Rollback(t) - if tc.afterEnqueue != nil { - enqueued = tc.afterEnqueue(t, tx, enqueued) - } + job := ReplicationJob{ + Change: RenameRepo, + RelativePath: "/project/path-1", + ReplicaPath: "relative/project/path-1", + TargetNodeStorage: "gitaly-1", + SourceNodeStorage: "gitaly-0", + VirtualStorage: "praefect", + Params: Params{"RelativePath": "new/path"}, + } + repositoryID := insertRepository(t, tx, ctx, job.VirtualStorage, job.RelativePath, job.SourceNodeStorage) + queue := NewPostgresReplicationEventQueue(tx) + event := ReplicationEvent{Job: job} + enqueued, err := queue.Enqueue(ctx, event) + require.NoError(t, err) - job := extractReplicationJob(t, ctx, tx, enqueued.ID) - require.Equal(t, enqueued.Job, job) - }) - } + actual := extractReplicationJob(t, ctx, tx, enqueued.ID) + job.RepositoryID = repositoryID + require.Equal(t, job, actual) + }) + + t.Run("repository id set on job", func(t *testing.T) { + tx := db.Begin(t) + defer tx.Rollback(t) + + job := ReplicationJob{ + Change: RenameRepo, + RelativePath: "/project/path-1", + 
ReplicaPath: "relative/project/path-1", + TargetNodeStorage: "gitaly-1", + SourceNodeStorage: "gitaly-0", + VirtualStorage: "praefect", + Params: Params{"RelativePath": "new/path"}, + } + job.RepositoryID = insertRepository(t, tx, ctx, job.VirtualStorage, job.RelativePath, job.SourceNodeStorage) + + queue := NewPostgresReplicationEventQueue(tx) + event := ReplicationEvent{Job: job} + enqueued, err := queue.Enqueue(ctx, event) + require.NoError(t, err) + + actual := extractReplicationJob(t, ctx, tx, enqueued.ID) + require.Equal(t, job, actual) + }) } func extractReplicationJob(t *testing.T, ctx context.Context, tx *testdb.TxWrapper, id uint64) ReplicationJob { @@ -291,16 +280,19 @@ func extractReplicationJob(t *testing.T, ctx context.Context, tx *testdb.TxWrapp func TestPostgresReplicationEventQueue_DeleteReplicaInfiniteAttempts(t *testing.T) { t.Parallel() - queue := NewPostgresReplicationEventQueue(testdb.New(t)) + db := testdb.New(t) + queue := NewPostgresReplicationEventQueue(db) ctx := testhelper.Context(t) + job := ReplicationJob{ + Change: DeleteReplica, + RelativePath: "/project/path-1", + TargetNodeStorage: "gitaly-1", + VirtualStorage: "praefect", + } + job.RepositoryID = insertRepository(t, db, ctx, job.VirtualStorage, job.RelativePath, "stub") actualEvent, err := queue.Enqueue(ctx, ReplicationEvent{ - Job: ReplicationJob{ - Change: DeleteReplica, - RelativePath: "/project/path-1", - TargetNodeStorage: "gitaly-1", - VirtualStorage: "praefect", - }, + Job: job, }) require.NoError(t, err) @@ -310,6 +302,7 @@ func TestPostgresReplicationEventQueue_DeleteReplicaInfiniteAttempts(t *testing. Attempt: 3, LockID: "praefect|gitaly-1|/project/path-1", Job: ReplicationJob{ + RepositoryID: job.RepositoryID, Change: DeleteReplica, RelativePath: "/project/path-1", TargetNodeStorage: "gitaly-1", @@ -378,6 +371,7 @@ func TestPostgresReplicationEventQueue_EnqueueMultiple(t *testing.T) { }, } + eventType1.Job.RepositoryID = insertRepository(t, db, ctx, eventType1.Job.VirtualStorage, eventType1.Job.RelativePath, eventType1.Job.SourceNodeStorage) event1, err := queue.Enqueue(ctx, eventType1) // initial event require.NoError(t, err) @@ -392,6 +386,7 @@ func TestPostgresReplicationEventQueue_EnqueueMultiple(t *testing.T) { LockID: "praefect-0|gitaly-1|/project/path-1", Job: ReplicationJob{ Change: UpdateRepo, + RepositoryID: eventType1.Job.RepositoryID, RelativePath: "/project/path-1", TargetNodeStorage: "gitaly-1", SourceNodeStorage: "gitaly-0", @@ -414,6 +409,7 @@ func TestPostgresReplicationEventQueue_EnqueueMultiple(t *testing.T) { LockID: "praefect-0|gitaly-1|/project/path-1", Job: ReplicationJob{ Change: UpdateRepo, + RepositoryID: eventType1.Job.RepositoryID, RelativePath: "/project/path-1", TargetNodeStorage: "gitaly-1", SourceNodeStorage: "gitaly-0", @@ -446,6 +442,7 @@ func TestPostgresReplicationEventQueue_EnqueueMultiple(t *testing.T) { requireEvents(t, ctx, db, []ReplicationEvent{expEvent1, expEvent2, expEvent3}) requireLocks(t, ctx, db, []LockRow{expLock1, expLock2}) // the new lock for another target repeated event + eventType3.Job.RepositoryID = insertRepository(t, db, ctx, eventType3.Job.VirtualStorage, eventType3.Job.RelativePath, eventType3.Job.SourceNodeStorage) event4, err := queue.Enqueue(ctx, eventType3) // event for another repo require.NoError(t, err) @@ -456,6 +453,7 @@ func TestPostgresReplicationEventQueue_EnqueueMultiple(t *testing.T) { LockID: "praefect-1|gitaly-1|/project/path-2", Job: ReplicationJob{ Change: UpdateRepo, + RepositoryID: eventType3.Job.RepositoryID, 
RelativePath: "/project/path-2", TargetNodeStorage: "gitaly-1", SourceNodeStorage: "gitaly-0", @@ -487,6 +485,7 @@ func TestPostgresReplicationEventQueue_Dequeue(t *testing.T) { Params: nil, }, } + event.Job.RepositoryID = insertRepository(t, db, ctx, event.Job.VirtualStorage, event.Job.RelativePath, event.Job.SourceNodeStorage) event, err := queue.Enqueue(ctx, event) require.NoError(t, err, "failed to fill in event queue") @@ -570,6 +569,8 @@ func TestPostgresReplicationEventQueue_DequeueMultiple(t *testing.T) { // events to fill in the queue events := []ReplicationEvent{eventType1, eventType1, eventType2, eventType1, eventType3, eventType4} for i := range events { + events[i].Job.RepositoryID = insertRepository(t, db, ctx, events[i].Job.VirtualStorage, events[i].Job.RelativePath, events[i].Job.SourceNodeStorage) + var err error events[i], err = queue.Enqueue(ctx, events[i]) require.NoError(t, err, "failed to fill in event queue") @@ -655,6 +656,7 @@ func TestPostgresReplicationEventQueue_DequeueSameStorageOtherRepository(t *test } for i := 0; i < 2; i++ { + eventType1.Job.RepositoryID = insertRepository(t, db, ctx, eventType1.Job.VirtualStorage, eventType1.Job.RelativePath, eventType1.Job.SourceNodeStorage) _, err := queue.Enqueue(ctx, eventType1) require.NoError(t, err, "failed to fill in event queue") } @@ -669,6 +671,7 @@ func TestPostgresReplicationEventQueue_DequeueSameStorageOtherRepository(t *test requireJobLocks(t, ctx, db, []JobLockRow{{JobID: 1, LockID: "praefect|gitaly-1|/project/path-1"}}) for i := 0; i < 2; i++ { + eventType2.Job.RepositoryID = insertRepository(t, db, ctx, eventType2.Job.VirtualStorage, eventType2.Job.RelativePath, eventType2.Job.SourceNodeStorage) _, err := queue.Enqueue(ctx, eventType2) require.NoError(t, err, "failed to fill in event queue") } @@ -704,7 +707,9 @@ func TestPostgresReplicationEventQueue_Acknowledge(t *testing.T) { }, } + repositoryID := insertRepository(t, db, ctx, event.Job.VirtualStorage, event.Job.RelativePath, event.Job.SourceNodeStorage) event, err := queue.Enqueue(ctx, event) + event.Job.RepositoryID = repositoryID require.NoError(t, err, "failed to fill in event queue") actual, err := queue.Dequeue(ctx, event.Job.VirtualStorage, event.Job.TargetNodeStorage, 100) @@ -781,6 +786,7 @@ func TestPostgresReplicationEventQueue_AcknowledgeMultiple(t *testing.T) { events := []ReplicationEvent{eventType1, eventType1, eventType2, eventType1, eventType3, eventType2, eventType4} // events to fill in the queue for i := range events { + events[i].Job.RepositoryID = insertRepository(t, db, ctx, events[i].Job.VirtualStorage, events[i].Job.RelativePath, events[i].Job.SourceNodeStorage) var err error events[i], err = queue.Enqueue(ctx, events[i]) require.NoError(t, err, "failed to fill in event queue") @@ -988,6 +994,7 @@ func TestPostgresReplicationEventQueue_StartHealthUpdate(t *testing.T) { queue := NewPostgresReplicationEventQueue(db) events := []ReplicationEvent{eventType1, eventType2, eventType3, eventType4} for i := range events { + events[i].Job.RepositoryID = insertRepository(t, db, ctx, events[i].Job.VirtualStorage, events[i].Job.RelativePath, events[i].Job.SourceNodeStorage) var err error events[i], err = queue.Enqueue(ctx, events[i]) require.NoError(t, err, "failed to fill in event queue") @@ -1062,6 +1069,7 @@ func TestPostgresReplicationEventQueue_AcknowledgeStale(t *testing.T) { db.TruncateAll(t) source := NewPostgresReplicationEventQueue(db) + insertRepository(t, db, ctx, eventType1.Job.VirtualStorage, eventType1.Job.RelativePath, 
eventType1.Job.SourceNodeStorage) event, err := source.Enqueue(ctx, eventType1) require.NoError(t, err) @@ -1079,10 +1087,12 @@ func TestPostgresReplicationEventQueue_AcknowledgeStale(t *testing.T) { db.TruncateAll(t) source := NewPostgresReplicationEventQueue(db) + insertRepository(t, db, ctx, eventType1.Job.VirtualStorage, eventType1.Job.RelativePath, eventType1.Job.SourceNodeStorage) // move event to 'ready' state event1, err := source.Enqueue(ctx, eventType1) require.NoError(t, err) + insertRepository(t, db, ctx, eventType2.Job.VirtualStorage, eventType2.Job.RelativePath, eventType2.Job.SourceNodeStorage) // move event to 'failed' state event2, err := source.Enqueue(ctx, eventType2) require.NoError(t, err) @@ -1092,6 +1102,7 @@ func TestPostgresReplicationEventQueue_AcknowledgeStale(t *testing.T) { _, err = source.Acknowledge(ctx, JobStateFailed, []uint64{devents2[0].ID}) require.NoError(t, err) + insertRepository(t, db, ctx, eventType3.Job.VirtualStorage, eventType3.Job.RelativePath, eventType3.Job.SourceNodeStorage) // move event to 'dead' state event3, err := source.Enqueue(ctx, eventType3) require.NoError(t, err) @@ -1101,6 +1112,7 @@ func TestPostgresReplicationEventQueue_AcknowledgeStale(t *testing.T) { _, err = source.Acknowledge(ctx, JobStateDead, []uint64{devents3[0].ID}) require.NoError(t, err) + insertRepository(t, db, ctx, eventType4.Job.VirtualStorage, eventType4.Job.RelativePath, eventType4.Job.SourceNodeStorage) event4, err := source.Enqueue(ctx, eventType4) require.NoError(t, err) devents4, err := source.Dequeue(ctx, event4.Job.VirtualStorage, event4.Job.TargetNodeStorage, 1) @@ -1122,6 +1134,7 @@ func TestPostgresReplicationEventQueue_AcknowledgeStale(t *testing.T) { var events []ReplicationEvent for _, eventType := range []ReplicationEvent{eventType1, eventType2, eventType3} { + insertRepository(t, db, ctx, eventType.Job.VirtualStorage, eventType.Job.RelativePath, eventType.Job.SourceNodeStorage) event, err := source.Enqueue(ctx, eventType) require.NoError(t, err) devents, err := source.Dequeue(ctx, event.Job.VirtualStorage, event.Job.TargetNodeStorage, 1) @@ -1151,6 +1164,73 @@ func TestPostgresReplicationEventQueue_AcknowledgeStale(t *testing.T) { }) } +func TestLockRowIsRemovedOnceRepositoryIsRemoved(t *testing.T) { + t.Parallel() + db := testdb.New(t) + ctx := testhelper.Context(t) + + const ( + virtualStorage = "praefect" + relativePath = "/project/path-1" + primaryStorage = "gitaly-0" + ) + + enqueueJobs := []ReplicationJob{ + // This event will be moved to in_progress state and lock will be with acquired=true. + { + Change: UpdateRepo, + RepositoryID: 1, + RelativePath: relativePath, + ReplicaPath: "relative/project/path-1", + TargetNodeStorage: "gitaly-1", + SourceNodeStorage: primaryStorage, + VirtualStorage: virtualStorage, + Params: nil, + }, + // This event is for another storage, but for the same repository. 
+		{
+			Change:            UpdateRepo,
+			RepositoryID:      1,
+			RelativePath:      relativePath,
+			ReplicaPath:       "relative/project/path-2",
+			TargetNodeStorage: "gitaly-2",
+			SourceNodeStorage: primaryStorage,
+			VirtualStorage:    virtualStorage,
+			Params:            nil,
+		},
+	}
+	repositoryID := insertRepository(t, db, ctx, virtualStorage, relativePath, primaryStorage)
+
+	queue := NewPostgresReplicationEventQueue(db)
+	for _, job := range enqueueJobs {
+		event := ReplicationEvent{Job: job}
+		_, err := queue.Enqueue(ctx, event)
+		require.NoError(t, err)
+	}
+	_, err := queue.Dequeue(ctx, enqueueJobs[0].VirtualStorage, enqueueJobs[0].TargetNodeStorage, 1)
+	require.NoError(t, err, "pick up one job to change its status and create additional rows")
+
+	_, err = db.ExecContext(ctx, `DELETE FROM repositories where repository_id = $1`, repositoryID)
+	require.NoError(t, err)
+
+	db.RequireRowsInTable(t, "replication_queue_lock", 0)
+}
+
+func insertRepository(t *testing.T, db glsql.Querier, ctx context.Context, virtualStorage, relativePath, primary string) int64 {
+	t.Helper()
+	const query = `
+		INSERT INTO repositories(virtual_storage, relative_path, generation, "primary")
+		VALUES ($1, $2, $3, $4)
+		ON CONFLICT (virtual_storage, relative_path)
+		DO UPDATE SET relative_path = excluded.relative_path
+		RETURNING repository_id`
+	var repositoryID int64
+	err := db.QueryRowContext(ctx, query, virtualStorage, relativePath, 1, primary).
+		Scan(&repositoryID)
+	require.NoError(t, err, "create repository record")
+	return repositoryID
+}
+
 func requireEvents(t *testing.T, ctx context.Context, db testdb.DB, expected []ReplicationEvent) {
 	t.Helper()
-- 
GitLab

From 48e2eef89245a24a259e76292398d8e637f6a86b Mon Sep 17 00:00:00 2001
From: Pavlo Strokov
Date: Wed, 16 Feb 2022 16:16:06 +0200
Subject: [PATCH 04/12] migration: Re-create indexes on replication_queue

As we flattened the JSONB job column into a set of simple columns, we
deprecate usage of the job column and replace the references to it with
references to the corresponding columns of the replication_queue table
for all the indexes that exist on replication_queue. Now they reference
the new columns.
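For example, a lookup that previously had to match on JSONB expressions
can now use the plain columns and the re-created partial index directly
(an illustrative query with sample values, not taken from the code):

    -- Served by the re-created replication_queue_target_index:
    SELECT id
    FROM replication_queue
    WHERE virtual_storage = 'praefect'
      AND relative_path = '/project/path-1'
      AND target_node_storage = 'gitaly-1'
      AND change = 'update'
      AND state NOT IN ('completed', 'cancelled', 'dead');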
---
 ...0216131417_indexes_on_replication_queue.go | 53 +++++++++++++++++++
 1 file changed, 53 insertions(+)
 create mode 100644 internal/praefect/datastore/migrations/20220216131417_indexes_on_replication_queue.go

diff --git a/internal/praefect/datastore/migrations/20220216131417_indexes_on_replication_queue.go b/internal/praefect/datastore/migrations/20220216131417_indexes_on_replication_queue.go
new file mode 100644
index 00000000000..4d19628d6c3
--- /dev/null
+++ b/internal/praefect/datastore/migrations/20220216131417_indexes_on_replication_queue.go
@@ -0,0 +1,53 @@
+package migrations
+
+import migrate "github.com/rubenv/sql-migrate"
+
+func init() {
+	m := &migrate.Migration{
+		Id:                   "20220216131417_indexes_on_replication_queue",
+		DisableTransactionUp: true,
+		Up: []string{
+			`DROP INDEX replication_queue_target_index`,
+			`CREATE INDEX CONCURRENTLY replication_queue_target_index
+				ON replication_queue (virtual_storage, relative_path, target_node_storage, change)
+				WHERE state NOT IN ('completed', 'cancelled', 'dead')`,

+			`DROP INDEX delete_replica_unique_index`,
+			`CREATE UNIQUE INDEX CONCURRENTLY delete_replica_unique_index
+				ON replication_queue (virtual_storage, relative_path)
+				WHERE state NOT IN ('completed', 'cancelled', 'dead')
+				AND change = 'delete_replica'`,

+			`DROP INDEX IF EXISTS virtual_target_on_replication_queue_idx`,
+			`CREATE INDEX CONCURRENTLY virtual_target_on_replication_queue_idx
+				ON replication_queue USING BTREE (virtual_storage, target_node_storage)`,
+		},
+		DisableTransactionDown: true,
+		Down: []string{
+			`DROP INDEX replication_queue_target_index`,
+			`CREATE INDEX CONCURRENTLY replication_queue_target_index
+				ON replication_queue (
+					(job->>'virtual_storage'),
+					(job->>'relative_path'),
+					(job->>'target_node_storage'),
+					(job->>'change')
+				)
+				WHERE state NOT IN ('completed', 'cancelled', 'dead')`,

+			`DROP INDEX delete_replica_unique_index`,
+			`CREATE UNIQUE INDEX CONCURRENTLY delete_replica_unique_index
+				ON replication_queue (
+					(job->>'virtual_storage'),
+					(job->>'relative_path')
+				)
+				WHERE state NOT IN ('completed', 'cancelled', 'dead')
+				AND job->>'change' = 'delete_replica'`,

+			`DROP INDEX IF EXISTS virtual_target_on_replication_queue_idx`,
+			`CREATE INDEX CONCURRENTLY virtual_target_on_replication_queue_idx
+				ON replication_queue USING BTREE ((job->>'virtual_storage'), (job->>'target_node_storage'))`,
+		},
+	}
+
+	allMigrations = append(allMigrations, m)
+}
-- 
GitLab

From b818117017a838600b300697724c27b0f18e2d21 Mon Sep 17 00:00:00 2001
From: Pavlo Strokov
Date: Wed, 16 Feb 2022 17:54:36 +0200
Subject: [PATCH 05/12] migration: Use new columns instead of JSONB job column

As we flattened the JSONB job column into a set of simple columns, we
deprecate usage of the job column and replace the references to it with
references to the corresponding columns of the replication_queue table
in the valid_primaries view.
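The change boils down to replacing the JSONB lookups in the view's
predicate with plain column comparisons (excerpted from the migration
below):

    -- before:
    AND queue.job->>'change' = 'delete_replica'
    AND (queue.job->>'repository_id')::bigint = storage_repositories.repository_id
    AND queue.job->>'target_node_storage' = storage_repositories.storage
    -- after:
    AND queue.change = 'delete_replica'
    AND queue.repository_id = storage_repositories.repository_id
    AND queue.target_node_storage = storage_repositories.storage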
--- ...20216140151_valid_primaries_flatten_job.go | 65 +++++++++++++++++++ 1 file changed, 65 insertions(+) create mode 100644 internal/praefect/datastore/migrations/20220216140151_valid_primaries_flatten_job.go diff --git a/internal/praefect/datastore/migrations/20220216140151_valid_primaries_flatten_job.go b/internal/praefect/datastore/migrations/20220216140151_valid_primaries_flatten_job.go new file mode 100644 index 00000000000..2ef001033c3 --- /dev/null +++ b/internal/praefect/datastore/migrations/20220216140151_valid_primaries_flatten_job.go @@ -0,0 +1,65 @@ +package migrations + +import migrate "github.com/rubenv/sql-migrate" + +func init() { + m := &migrate.Migration{ + Id: "20220216140151_valid_primaries_flatten_job", + Up: []string{ + ` +CREATE OR REPLACE VIEW valid_primaries AS + SELECT repository_id, virtual_storage, relative_path, storage + FROM ( + SELECT + repository_id, + repositories.virtual_storage, + repositories.relative_path, + storage, + repository_assignments.storage IS NOT NULL + OR bool_and(repository_assignments.storage IS NULL) OVER (PARTITION BY repository_id) AS eligible + FROM repositories + JOIN (SELECT repository_id, storage, generation FROM storage_repositories) AS storage_repositories USING (repository_id, generation) + JOIN healthy_storages USING (virtual_storage, storage) + LEFT JOIN repository_assignments USING (repository_id, storage) + WHERE NOT EXISTS ( + SELECT FROM replication_queue AS queue + WHERE queue.state NOT IN ('completed', 'dead', 'cancelled') + AND queue.change = 'delete_replica' + AND queue.repository_id = storage_repositories.repository_id + AND queue.target_node_storage = storage_repositories.storage + ) + ) AS candidates + WHERE eligible + `, + }, + Down: []string{ + ` +CREATE OR REPLACE VIEW valid_primaries AS + SELECT repository_id, virtual_storage, relative_path, storage + FROM ( + SELECT + repository_id, + repositories.virtual_storage, + repositories.relative_path, + storage, + repository_assignments.storage IS NOT NULL + OR bool_and(repository_assignments.storage IS NULL) OVER (PARTITION BY repository_id) AS eligible + FROM repositories + JOIN (SELECT repository_id, storage, generation FROM storage_repositories) AS storage_repositories USING (repository_id, generation) + JOIN healthy_storages USING (virtual_storage, storage) + LEFT JOIN repository_assignments USING (repository_id, storage) + WHERE NOT EXISTS ( + SELECT FROM replication_queue AS queue + WHERE queue.state NOT IN ('completed', 'dead', 'cancelled') + AND queue.job->>'change' = 'delete_replica' + AND (queue.job->>'repository_id')::bigint = storage_repositories.repository_id + AND queue.job->>'target_node_storage' = storage_repositories.storage + ) + ) AS candidates + WHERE eligible + `, + }, + } + + allMigrations = append(allMigrations, m) +} -- GitLab From 3329d80d1083fa4dd80128b6e678b99a0b9f7c4f Mon Sep 17 00:00:00 2001 From: Pavlo Strokov Date: Sat, 19 Feb 2022 15:23:52 +0200 Subject: [PATCH 06/12] migrations: Update expected database schema definition --- _support/praefect-schema.sql | 113 ++++++++++++++++++++++++++++++++--- 1 file changed, 105 insertions(+), 8 deletions(-) diff --git a/_support/praefect-schema.sql b/_support/praefect-schema.sql index b69a2f0c670..a356d42393a 100644 --- a/_support/praefect-schema.sql +++ b/_support/praefect-schema.sql @@ -50,6 +50,28 @@ CREATE TYPE public.replication_job_state AS ENUM ( ); +-- +-- Name: replication_job_type; Type: TYPE; Schema: public; Owner: - +-- + +CREATE TYPE public.replication_job_type AS ENUM ( + 'update', + 
'create', + 'delete', + 'delete_replica', + 'rename', + 'gc', + 'repack_full', + 'repack_incremental', + 'cleanup', + 'pack_refs', + 'write_commit_graph', + 'midx_repack', + 'optimize_repository', + 'prune_unreachable_objects' +); + + -- -- Name: notify_on_change(); Type: FUNCTION; Schema: public; Owner: - -- @@ -94,6 +116,51 @@ CREATE FUNCTION public.notify_on_change() RETURNS trigger $$; +-- +-- Name: remove_queue_lock_on_repository_removal(); Type: FUNCTION; Schema: public; Owner: - +-- + +CREATE FUNCTION public.remove_queue_lock_on_repository_removal() RETURNS trigger + LANGUAGE plpgsql + AS $$ + BEGIN + DELETE FROM replication_queue_lock + WHERE id LIKE (OLD.virtual_storage || '|%|' || OLD.relative_path); + RETURN NULL; + END; + $$; + + +-- +-- Name: replication_queue_flatten_job(); Type: FUNCTION; Schema: public; Owner: - +-- + +CREATE FUNCTION public.replication_queue_flatten_job() RETURNS trigger + LANGUAGE plpgsql + AS $$ + BEGIN + NEW.change := (NEW.job->>'change')::REPLICATION_JOB_TYPE; + NEW.repository_id := COALESCE( + -- For the old jobs the repository_id field may be 'null' or not set at all. + -- repository_id field could have 0 if event was created by reconciler before + -- repositories table was populated with valid repository_id. + CASE WHEN (NEW.job->>'repository_id')::BIGINT = 0 THEN NULL ELSE (NEW.job->>'repository_id')::BIGINT END, + (SELECT repositories.repository_id FROM repositories WHERE repositories.virtual_storage = NEW.job->>'virtual_storage' AND repositories.relative_path = NEW.job->>'relative_path'), + 0 + ); + -- The reconciler doesn't populate replica_path field that is why we need make sure + -- we have at least an empty value for the column, not to break scan operations. + NEW.replica_path := COALESCE(NEW.job->>'replica_path', ''); + NEW.relative_path := NEW.job->>'relative_path'; + NEW.target_node_storage := NEW.job->>'target_node_storage'; + NEW.source_node_storage := COALESCE(NEW.job->>'source_node_storage', ''); + NEW.virtual_storage := NEW.job->>'virtual_storage'; + NEW.params := (NEW.job->>'params')::JSONB; + RETURN NEW; + END; + $$; + + SET default_tablespace = ''; SET default_with_oids = false; @@ -168,7 +235,15 @@ CREATE TABLE public.replication_queue ( attempt integer DEFAULT 3 NOT NULL, lock_id text, job jsonb, - meta jsonb + meta jsonb, + change public.replication_job_type, + repository_id bigint DEFAULT 0 NOT NULL, + replica_path text DEFAULT ''::text NOT NULL, + relative_path text, + target_node_storage text, + source_node_storage text DEFAULT ''::text NOT NULL, + virtual_storage text, + params jsonb ); @@ -361,8 +436,8 @@ CREATE VIEW public.valid_primaries AS JOIN public.healthy_storages USING (virtual_storage, storage)) LEFT JOIN public.repository_assignments USING (repository_id, storage)) WHERE (NOT (EXISTS ( SELECT - FROM public.replication_queue - WHERE ((replication_queue.state <> ALL (ARRAY['completed'::public.replication_job_state, 'dead'::public.replication_job_state, 'cancelled'::public.replication_job_state])) AND ((replication_queue.job ->> 'change'::text) = 'delete_replica'::text) AND (((replication_queue.job ->> 'repository_id'::text))::bigint = repositories.repository_id) AND ((replication_queue.job ->> 'target_node_storage'::text) = storage_repositories.storage)))))) candidates + FROM public.replication_queue queue + WHERE ((queue.state <> ALL (ARRAY['completed'::public.replication_job_state, 'dead'::public.replication_job_state, 'cancelled'::public.replication_job_state])) AND (queue.change = 
'delete_replica'::public.replication_job_type) AND (queue.repository_id = storage_repositories.repository_id) AND (queue.target_node_storage = storage_repositories.storage)))))) candidates WHERE candidates.eligible; @@ -496,14 +571,14 @@ ALTER TABLE ONLY public.virtual_storages -- Name: delete_replica_unique_index; Type: INDEX; Schema: public; Owner: - -- -CREATE UNIQUE INDEX delete_replica_unique_index ON public.replication_queue USING btree (((job ->> 'virtual_storage'::text)), ((job ->> 'relative_path'::text))) WHERE ((state <> ALL (ARRAY['completed'::public.replication_job_state, 'cancelled'::public.replication_job_state, 'dead'::public.replication_job_state])) AND ((job ->> 'change'::text) = 'delete_replica'::text)); +CREATE UNIQUE INDEX delete_replica_unique_index ON public.replication_queue USING btree (virtual_storage, relative_path) WHERE ((state <> ALL (ARRAY['completed'::public.replication_job_state, 'cancelled'::public.replication_job_state, 'dead'::public.replication_job_state])) AND (change = 'delete_replica'::public.replication_job_type)); -- -- Name: replication_queue_target_index; Type: INDEX; Schema: public; Owner: - -- -CREATE INDEX replication_queue_target_index ON public.replication_queue USING btree (((job ->> 'virtual_storage'::text)), ((job ->> 'relative_path'::text)), ((job ->> 'target_node_storage'::text)), ((job ->> 'change'::text))) WHERE (state <> ALL (ARRAY['completed'::public.replication_job_state, 'cancelled'::public.replication_job_state, 'dead'::public.replication_job_state])); +CREATE INDEX replication_queue_target_index ON public.replication_queue USING btree (virtual_storage, relative_path, target_node_storage, change) WHERE (state <> ALL (ARRAY['completed'::public.replication_job_state, 'cancelled'::public.replication_job_state, 'dead'::public.replication_job_state])); -- @@ -559,7 +634,7 @@ CREATE UNIQUE INDEX storage_repositories_new_pkey ON public.storage_repositories -- Name: virtual_target_on_replication_queue_idx; Type: INDEX; Schema: public; Owner: - -- -CREATE INDEX virtual_target_on_replication_queue_idx ON public.replication_queue USING btree (((job ->> 'virtual_storage'::text)), ((job ->> 'target_node_storage'::text))); +CREATE INDEX virtual_target_on_replication_queue_idx ON public.replication_queue USING btree (virtual_storage, target_node_storage); -- @@ -590,12 +665,26 @@ CREATE TRIGGER notify_on_insert AFTER INSERT ON public.storage_repositories REFE CREATE TRIGGER notify_on_update AFTER UPDATE ON public.storage_repositories REFERENCING OLD TABLE AS old NEW TABLE AS new FOR EACH STATEMENT EXECUTE PROCEDURE public.notify_on_change('storage_repositories_updates'); +-- +-- Name: repositories remove_queue_lock_on_repository_removal; Type: TRIGGER; Schema: public; Owner: - +-- + +CREATE TRIGGER remove_queue_lock_on_repository_removal AFTER DELETE ON public.repositories FOR EACH ROW EXECUTE PROCEDURE public.remove_queue_lock_on_repository_removal(); + + +-- +-- Name: replication_queue replication_queue_flatten_job; Type: TRIGGER; Schema: public; Owner: - +-- + +CREATE TRIGGER replication_queue_flatten_job BEFORE INSERT ON public.replication_queue FOR EACH ROW EXECUTE PROCEDURE public.replication_queue_flatten_job(); + + -- -- Name: replication_queue_job_lock replication_queue_job_lock_job_id_fkey; Type: FK CONSTRAINT; Schema: public; Owner: - -- ALTER TABLE ONLY public.replication_queue_job_lock - ADD CONSTRAINT replication_queue_job_lock_job_id_fkey FOREIGN KEY (job_id) REFERENCES public.replication_queue(id); + ADD CONSTRAINT 
replication_queue_job_lock_job_id_fkey FOREIGN KEY (job_id) REFERENCES public.replication_queue(id) ON DELETE CASCADE;
 
 
 --
 -- Name: replication_queue_job_lock replication_queue_job_lock_lock_id_fkey; Type: FK CONSTRAINT; Schema: public; Owner: -
 --
 
 ALTER TABLE ONLY public.replication_queue_job_lock
-    ADD CONSTRAINT replication_queue_job_lock_lock_id_fkey FOREIGN KEY (lock_id) REFERENCES public.replication_queue_lock(id);
+    ADD CONSTRAINT replication_queue_job_lock_lock_id_fkey FOREIGN KEY (lock_id) REFERENCES public.replication_queue_lock(id) ON DELETE CASCADE;
+
+
+--
+-- Name: replication_queue replication_queue_repository_id_fkey; Type: FK CONSTRAINT; Schema: public; Owner: -
+--
+
+ALTER TABLE ONLY public.replication_queue
+    ADD CONSTRAINT replication_queue_repository_id_fkey FOREIGN KEY (repository_id) REFERENCES public.repositories(repository_id) ON DELETE CASCADE;
 
 
 --
-- 
GitLab

From 58cd719e36ad0faca7100eced1bf952a06ba43a7 Mon Sep 17 00:00:00 2001
From: Pavlo Strokov
Date: Wed, 16 Feb 2022 14:43:10 +0200
Subject: [PATCH 07/12] remove-repository: No need to remove replication
 events

Praefect sub-command 'remove-repository' doesn't need to remove
scheduled replication events from the database anymore, as all events
associated with the repository are removed automatically after the
removal of the row from the repositories table.
---
 cmd/praefect/subcmd_remove_repository.go      |  52 ---------
 cmd/praefect/subcmd_remove_repository_test.go | 104 ------------------
 2 files changed, 156 deletions(-)

diff --git a/cmd/praefect/subcmd_remove_repository.go b/cmd/praefect/subcmd_remove_repository.go
index 49b049615c3..26fb1c8a8bc 100644
--- a/cmd/praefect/subcmd_remove_repository.go
+++ b/cmd/praefect/subcmd_remove_repository.go
@@ -12,7 +12,6 @@ import (
 	"time"
 
 	"github.com/sirupsen/logrus"
-	"gitlab.com/gitlab-org/gitaly/v14/internal/helper"
 	"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/config"
 	"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore"
 	"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql"
@@ -150,14 +149,6 @@ func (cmd *removeRepository) exec(ctx context.Context, logger logrus.FieldLogger
 		fmt.Fprintln(cmd.w, "Removed repository metadata from the database.")
 	}
 
-	fmt.Fprintln(cmd.w, "Removing replication events...")
-	ticker := helper.NewTimerTicker(time.Second)
-	defer ticker.Stop()
-	if err := cmd.removeReplicationEvents(ctx, logger, db, ticker); err != nil {
-		return fmt.Errorf("remove scheduled replication events: %w", err)
-	}
-	fmt.Fprintln(cmd.w, "Replication event removal completed.")
-
 	// We should try to remove repository from each of gitaly nodes.
 	fmt.Fprintln(cmd.w, "Removing repository directly from gitaly nodes...")
 	cmd.removeRepositoryForEachGitaly(ctx, cfg, logger)
@@ -249,49 +240,6 @@ func (cmd *removeRepository) removeRepository(ctx context.Context, repo *gitalyp
 	return true, nil
 }
 
-func (cmd *removeRepository) removeReplicationEvents(ctx context.Context, logger logrus.FieldLogger, db *sql.DB, ticker helper.Ticker) error {
-	// Wait for the completion of the repository replication jobs.
-	// As some of them could be a repository creation jobs we need to remove those newly created
-	// repositories after replication finished.
- start := time.Now() - var tick helper.Ticker - for found := true; found; { - if tick != nil { - tick.Reset() - <-tick.C() - } else { - tick = ticker - } - - if int(time.Since(start).Seconds())%5 == 0 { - logger.Debug("awaiting for the repository in_progress replication jobs to complete...") - } - row := db.QueryRowContext( - ctx, - `WITH remove_replication_jobs AS ( - DELETE FROM replication_queue - WHERE job->>'virtual_storage' = $1 - AND job->>'relative_path' = $2 - -- Do not remove ongoing replication events as we need to wait - -- for their completion. - AND state != 'in_progress' - ) - SELECT EXISTS( - SELECT - FROM replication_queue - WHERE job->>'virtual_storage' = $1 - AND job->>'relative_path' = $2 - AND state = 'in_progress')`, - cmd.virtualStorage, - cmd.relativePath, - ) - if err := row.Scan(&found); err != nil { - return fmt.Errorf("scan in progress jobs: %w", err) - } - } - return nil -} - func (cmd *removeRepository) removeNode( ctx context.Context, logger logrus.FieldLogger, diff --git a/cmd/praefect/subcmd_remove_repository_test.go b/cmd/praefect/subcmd_remove_repository_test.go index 49bc60fd743..fd479907183 100644 --- a/cmd/praefect/subcmd_remove_repository_test.go +++ b/cmd/praefect/subcmd_remove_repository_test.go @@ -16,7 +16,6 @@ import ( "github.com/stretchr/testify/require" "gitlab.com/gitlab-org/gitaly/v14/client" "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/service/setup" - "gitlab.com/gitlab-org/gitaly/v14/internal/helper" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/config" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper" @@ -266,106 +265,3 @@ func requireNoDatabaseInfo(t *testing.T, db testdb.DB, cmd *removeRepository) { ).Scan(&storageRowExists)) require.False(t, storageRowExists) } - -func TestRemoveRepository_removeReplicationEvents(t *testing.T) { - t.Parallel() - const ( - virtualStorage = "praefect" - relativePath = "relative_path/to/repo.git" - ) - ctx := testhelper.Context(t) - - db := testdb.New(t) - - queue := datastore.NewPostgresReplicationEventQueue(db) - - // Set replication event in_progress. - inProgressEvent, err := queue.Enqueue(ctx, datastore.ReplicationEvent{ - Job: datastore.ReplicationJob{ - Change: datastore.CreateRepo, - VirtualStorage: virtualStorage, - TargetNodeStorage: "gitaly-2", - RelativePath: relativePath, - }, - }) - require.NoError(t, err) - inProgress1, err := queue.Dequeue(ctx, virtualStorage, "gitaly-2", 10) - require.NoError(t, err) - require.Len(t, inProgress1, 1) - - // New event - events in the 'ready' state should be removed. - _, err = queue.Enqueue(ctx, datastore.ReplicationEvent{ - Job: datastore.ReplicationJob{ - Change: datastore.UpdateRepo, - VirtualStorage: virtualStorage, - TargetNodeStorage: "gitaly-3", - SourceNodeStorage: "gitaly-1", - RelativePath: relativePath, - }, - }) - require.NoError(t, err) - - // Failed event - should be removed as well. 
-	failedEvent, err := queue.Enqueue(ctx, datastore.ReplicationEvent{
-		Job: datastore.ReplicationJob{
-			Change:            datastore.UpdateRepo,
-			VirtualStorage:    virtualStorage,
-			TargetNodeStorage: "gitaly-4",
-			SourceNodeStorage: "gitaly-0",
-			RelativePath:      relativePath,
-		},
-	})
-	require.NoError(t, err)
-	inProgress2, err := queue.Dequeue(ctx, virtualStorage, "gitaly-4", 10)
-	require.NoError(t, err)
-	require.Len(t, inProgress2, 1)
-	// Acknowledge with failed status, so it will remain in the database for the next processing
-	// attempt or until it is deleted by the 'removeReplicationEvents' method.
-	acks2, err := queue.Acknowledge(ctx, datastore.JobStateFailed, []uint64{inProgress2[0].ID})
-	require.NoError(t, err)
-	require.Equal(t, []uint64{inProgress2[0].ID}, acks2)
-
-	ticker := helper.NewManualTicker()
-	defer ticker.Stop()
-
-	errChan := make(chan error, 1)
-	go func() {
-		cmd := &removeRepository{virtualStorage: virtualStorage, relativePath: relativePath}
-		errChan <- cmd.removeReplicationEvents(ctx, testhelper.NewDiscardingLogger(t), db.DB, ticker)
-	}()
-
-	ticker.Tick()
-	ticker.Tick()
-	ticker.Tick() // blocks until previous tick is consumed
-
-	// Now we acknowledge in_progress job, so it stops the processing loop or the command.
-	acks, err := queue.Acknowledge(ctx, datastore.JobStateCompleted, []uint64{inProgressEvent.ID})
-	if assert.NoError(t, err) {
-		assert.Equal(t, []uint64{inProgress1[0].ID}, acks)
-	}
-
-	ticker.Tick()
-
-	timeout := time.After(time.Minute)
-	for checkChan, exists := errChan, true; exists; {
-		select {
-		case err := <-checkChan:
-			require.NoError(t, err)
-			close(errChan)
-			checkChan = nil
-		case <-timeout:
-			require.FailNow(t, "timeout reached, looks like the command hasn't made any progress")
-		case <-time.After(50 * time.Millisecond):
-			// Wait until job removed
-			row := db.QueryRow(`SELECT EXISTS(SELECT FROM replication_queue WHERE id = $1)`, failedEvent.ID)
-			require.NoError(t, row.Scan(&exists))
-		}
-	}
-	// Once there are no in_progress jobs anymore the method returns.
-	require.NoError(t, <-errChan)
-
-	var notExists bool
-	row := db.QueryRow(`SELECT NOT EXISTS(SELECT FROM replication_queue)`)
-	require.NoError(t, row.Scan(&notExists))
-	require.True(t, notExists)
-}
-- 
GitLab


From 052885354a655db51f2a3740584a58b27bea5a85 Mon Sep 17 00:00:00 2001
From: Pavlo Strokov
Date: Wed, 16 Feb 2022 14:57:51 +0200
Subject: [PATCH 08/12] reconciler: Use new columns instead of JSONB job column

As we migrate the JSONB job column to a set of simple columns, this
commit changes the queries that reference the deprecated job column to
reference the corresponding simple columns of the replication_queue
table. The tests were written with some inconsistency; this commit
addresses that by providing a proper value for the Change field of the
ReplicationJob where it was missing. Also, because of the foreign key
constraint between the replication_queue and repositories tables, it is
no longer possible to create replication events for non-existing
repositories, which is why the setup code of some test cases was
changed.
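The essence of the query rewrite is easiest to see on a single predicate. A minimal before/after sketch (illustrative only, not the reconciler's actual CTE; the parameter and values are placeholders):

	// Before: each predicate had to extract text from the JSONB payload
	// and cast it on every row.
	const oldPredicate = `
		SELECT FROM replication_queue
		WHERE (job->>'repository_id')::bigint = $1
		  AND job->>'change' = 'delete_replica'
		  AND state NOT IN ('completed', 'dead')`

	// After: the flattened columns are already typed (BIGINT and the
	// REPLICATION_JOB_TYPE enum), so the casts disappear and the columns
	// can back the new foreign key and plain indexes directly.
	const newPredicate = `
		SELECT FROM replication_queue
		WHERE repository_id = $1
		  AND change = 'delete_replica'
		  AND state NOT IN ('completed', 'dead')`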
--- internal/praefect/reconciler/reconciler.go | 44 +++++++++---------- .../praefect/reconciler/reconciler_test.go | 42 ++++-------------- 2 files changed, 30 insertions(+), 56 deletions(-) diff --git a/internal/praefect/reconciler/reconciler.go b/internal/praefect/reconciler/reconciler.go index 1908ef9ee99..124737dc4f3 100644 --- a/internal/praefect/reconciler/reconciler.go +++ b/internal/praefect/reconciler/reconciler.go @@ -177,21 +177,21 @@ delete_jobs AS ( ) AND NOT EXISTS ( -- Ensure the replica is not used as target or source in any scheduled job. This is to avoid breaking -- any already scheduled jobs. - SELECT FROM replication_queue - WHERE (job->'repository_id')::bigint = repository_id + SELECT FROM replication_queue AS q + WHERE q.repository_id = repositories.repository_id AND ( - job->>'source_node_storage' = storage - OR job->>'target_node_storage' = storage + q.source_node_storage = healthy_storages.storage + OR q.target_node_storage = healthy_storages.storage ) AND state NOT IN ('completed', 'dead') ) AND NOT EXISTS ( -- Ensure there are no other scheduled 'delete_replica' type jobs for the repository. Performing rapid -- repository_assignments could cause the reconciler to schedule deletion against all replicas. To avoid this, -- we do not allow more than one 'delete_replica' job to be active at any given time. - SELECT FROM replication_queue + SELECT FROM replication_queue AS q WHERE state NOT IN ('completed', 'dead') - AND (job->>'repository_id')::bigint = repository_id - AND job->>'change' = 'delete_replica' + AND q.repository_id = repositories.repository_id + AND q.change = 'delete_replica' ) ), @@ -229,20 +229,20 @@ update_jobs AS ( JOIN repositories USING (repository_id, relative_path, generation) JOIN healthy_storages USING (virtual_storage, storage) WHERE NOT EXISTS ( - SELECT FROM replication_queue + SELECT FROM replication_queue AS q WHERE state NOT IN ('completed', 'dead') - AND (job->>'repository_id')::bigint = repository_id - AND job->>'target_node_storage' = storage - AND job->>'change' = 'delete_replica' + AND q.repository_id = repositories.repository_id + AND q.target_node_storage = healthy_storages.storage + AND q.change = 'delete_replica' ) ORDER BY virtual_storage, relative_path ) AS healthy_repositories USING (repository_id) WHERE NOT EXISTS ( - SELECT FROM replication_queue + SELECT FROM replication_queue AS q WHERE state NOT IN ('completed', 'dead') - AND (job->>'repository_id')::bigint = repository_id - AND job->>'target_node_storage' = target_node_storage - AND job->>'change' = 'update' + AND q.repository_id = unhealthy_repositories.repository_id + AND q.target_node_storage = unhealthy_repositories.target_node_storage + AND q.change = 'update' ) ORDER BY repository_id, target_node_storage, random() ), @@ -275,7 +275,7 @@ reconciliation_jobs AS ( -- only perform inserts if we managed to acquire the lock as otherwise -- we'd schedule duplicate jobs WHERE ( SELECT acquired FROM reconciliation_lock ) - RETURNING lock_id, meta, job + RETURNING lock_id, meta, repository_id, change, virtual_storage, relative_path, source_node_storage, target_node_storage ), create_locks AS ( @@ -287,12 +287,12 @@ create_locks AS ( SELECT meta->>'correlation_id', - job->>'repository_id', - job->>'change', - job->>'virtual_storage', - job->>'relative_path', - job->>'source_node_storage', - job->>'target_node_storage' + repository_id, + change, + virtual_storage, + relative_path, + source_node_storage, + target_node_storage FROM reconciliation_jobs `, advisorylock.Reconcile, 
virtualStorages, storages) if err != nil { diff --git a/internal/praefect/reconciler/reconciler_test.go b/internal/praefect/reconciler/reconciler_test.go index 6390c9478db..673483d7e08 100644 --- a/internal/praefect/reconciler/reconciler_test.go +++ b/internal/praefect/reconciler/reconciler_test.go @@ -648,6 +648,7 @@ func TestReconciler(t *testing.T) { { State: datastore.JobStateReady, Job: datastore.ReplicationJob{ + Change: datastore.GarbageCollect, VirtualStorage: "virtual-storage-1", RelativePath: "relative-path-1", TargetNodeStorage: "storage-2", @@ -670,6 +671,7 @@ func TestReconciler(t *testing.T) { { State: datastore.JobStateInProgress, Job: datastore.ReplicationJob{ + Change: datastore.GarbageCollect, VirtualStorage: "virtual-storage-1", RelativePath: "relative-path-1", TargetNodeStorage: "storage-2", @@ -692,6 +694,7 @@ func TestReconciler(t *testing.T) { { State: datastore.JobStateFailed, Job: datastore.ReplicationJob{ + Change: datastore.GarbageCollect, VirtualStorage: "virtual-storage-1", RelativePath: "relative-path-1", TargetNodeStorage: "storage-2", @@ -714,6 +717,7 @@ func TestReconciler(t *testing.T) { { State: datastore.JobStateReady, Job: datastore.ReplicationJob{ + Change: datastore.GarbageCollect, VirtualStorage: "virtual-storage-1", RelativePath: "relative-path-1", SourceNodeStorage: "storage-2", @@ -736,6 +740,7 @@ func TestReconciler(t *testing.T) { { State: datastore.JobStateInProgress, Job: datastore.ReplicationJob{ + Change: datastore.GarbageCollect, VirtualStorage: "virtual-storage-1", RelativePath: "relative-path-1", SourceNodeStorage: "storage-2", @@ -758,6 +763,7 @@ func TestReconciler(t *testing.T) { { State: datastore.JobStateFailed, Job: datastore.ReplicationJob{ + Change: datastore.GarbageCollect, VirtualStorage: "virtual-storage-1", RelativePath: "relative-path-1", SourceNodeStorage: "storage-2", @@ -777,25 +783,10 @@ func TestReconciler(t *testing.T) { }, }, existingJobs: existingJobs{ - { - State: datastore.JobStateReady, - Job: datastore.ReplicationJob{ - VirtualStorage: "wrong-virtual-storage", - RelativePath: "relative-path-1", - SourceNodeStorage: "storage-2", - }, - }, - { - State: datastore.JobStateReady, - Job: datastore.ReplicationJob{ - VirtualStorage: "virtual-storage-1", - RelativePath: "wrong-relative-path", - SourceNodeStorage: "storage-2", - }, - }, { State: datastore.JobStateDead, Job: datastore.ReplicationJob{ + Change: datastore.GarbageCollect, VirtualStorage: "virtual-storage-1", RelativePath: "relative-path-1", SourceNodeStorage: "storage-2", @@ -804,6 +795,7 @@ func TestReconciler(t *testing.T) { { State: datastore.JobStateCompleted, Job: datastore.ReplicationJob{ + Change: datastore.GarbageCollect, VirtualStorage: "virtual-storage-1", RelativePath: "relative-path-1", SourceNodeStorage: "storage-2", @@ -952,24 +944,6 @@ func TestReconciler(t *testing.T) { }, }, existingJobs: existingJobs{ - { - State: datastore.JobStateFailed, - Job: datastore.ReplicationJob{ - Change: datastore.DeleteReplica, - VirtualStorage: "wrong-virtual-storage", - RelativePath: "relative-path-1", - SourceNodeStorage: "storage-1", - }, - }, - { - State: datastore.JobStateFailed, - Job: datastore.ReplicationJob{ - Change: datastore.DeleteReplica, - VirtualStorage: "virtual-storage-1", - RelativePath: "wrong-relative-path", - SourceNodeStorage: "storage-1", - }, - }, { State: datastore.JobStateDead, Job: datastore.ReplicationJob{ -- GitLab From 3cb0c82f8f75aefc5dffef98f8b86404d9de3db7 Mon Sep 17 00:00:00 2001 From: Pavlo Strokov Date: Wed, 16 Feb 2022 15:09:20 
+0200
Subject: [PATCH 09/12] datastore: Use new columns instead of JSONB job column

As we flattened the JSONB job column into a set of simple columns, we
deprecate usage of the job column and replace it with references to the
corresponding columns of the replication_queue table in
QueueDepthCollector.
---
 internal/praefect/datastore/collector.go | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/internal/praefect/datastore/collector.go b/internal/praefect/datastore/collector.go
index 0b93bd76098..c98dcfac16a 100644
--- a/internal/praefect/datastore/collector.go
+++ b/internal/praefect/datastore/collector.go
@@ -148,9 +148,9 @@ func (q *QueueDepthCollector) Collect(ch chan<- prometheus.Metric) {
 	defer cancel()
 
 	rows, err := q.db.QueryContext(ctx, `
-SELECT job->>'virtual_storage', job->>'target_node_storage', state, COUNT(*)
+SELECT virtual_storage, target_node_storage, state, COUNT(*)
 FROM replication_queue
-GROUP BY job->>'virtual_storage', job->>'target_node_storage', state
+GROUP BY virtual_storage, target_node_storage, state
 `)
 	if err != nil {
 		q.log.WithError(err).Error("failed to query queue depth metrics")
-- 
GitLab


From a269dcdac7caf7282f2f10a21eea33a1d8258dd0 Mon Sep 17 00:00:00 2001
From: Pavlo Strokov
Date: Wed, 16 Feb 2022 18:09:18 +0200
Subject: [PATCH 10/12] datastore: Use new columns instead of JSONB job column

As we flattened the JSONB job column into a set of simple columns, we
deprecate usage of the job column and replace it with references to the
corresponding columns of the replication_queue table in
PostgresReplicationEventQueue. The data is now retrieved from the new
columns during the scan operation. The tests are fixed to include
repository_id in the replication jobs.
---
 internal/praefect/datastore/queue.go      | 88 ++++++++++------------
 internal/praefect/datastore/queue_test.go | 13 +++-
 2 files changed, 48 insertions(+), 53 deletions(-)

diff --git a/internal/praefect/datastore/queue.go b/internal/praefect/datastore/queue.go
index 986564b1894..65948426a96 100644
--- a/internal/praefect/datastore/queue.go
+++ b/internal/praefect/datastore/queue.go
@@ -101,50 +101,8 @@ type ReplicationEvent struct {
 	Meta      Params
 }
 
-// Mapping returns list of references to the struct fields that correspond to the SQL columns/column aliases.
-func (event *ReplicationEvent) Mapping(columns []string) ([]interface{}, error) {
-	var mapping []interface{}
-	for _, column := range columns {
-		switch column {
-		case "id":
-			mapping = append(mapping, &event.ID)
-		case "state":
-			mapping = append(mapping, &event.State)
-		case "created_at":
-			mapping = append(mapping, &event.CreatedAt)
-		case "updated_at":
-			mapping = append(mapping, &event.UpdatedAt)
-		case "attempt":
-			mapping = append(mapping, &event.Attempt)
-		case "lock_id":
-			mapping = append(mapping, &event.LockID)
-		case "job":
-			mapping = append(mapping, &event.Job)
-		case "meta":
-			mapping = append(mapping, &event.Meta)
-		default:
-			return nil, fmt.Errorf("unknown column specified in SELECT statement: %q", column)
-		}
-	}
-	return mapping, nil
-}
-
-// Scan fills receive fields with values fetched from database based on the set of columns/column aliases.
-func (event *ReplicationEvent) Scan(columns []string, rows *sql.Rows) error {
-	mappings, err := event.Mapping(columns)
-	if err != nil {
-		return err
-	}
-	return rows.Scan(mappings...)
-}
-
 // scanReplicationEvents reads all rows and convert them into structs filling all the fields according to fetched columns/column aliases.
func scanReplicationEvents(rows *sql.Rows) (events []ReplicationEvent, err error) { - columns, err := rows.Columns() - if err != nil { - return events, err - } - defer func() { if cErr := rows.Close(); cErr != nil && err == nil { err = cErr @@ -153,9 +111,29 @@ func scanReplicationEvents(rows *sql.Rows) (events []ReplicationEvent, err error for rows.Next() { var event ReplicationEvent - if err = event.Scan(columns, rows); err != nil { + var srcNodeStorage sql.NullString + if err = rows.Scan( + &event.ID, + &event.State, + &event.CreatedAt, + &event.UpdatedAt, + &event.LockID, + &event.Attempt, + &event.Meta, + &event.Job.Change, + &event.Job.RepositoryID, + &event.Job.ReplicaPath, + &event.Job.RelativePath, + &event.Job.TargetNodeStorage, + &srcNodeStorage, + &event.Job.VirtualStorage, + &event.Job.Params, + ); err != nil { return events, err } + if srcNodeStorage.Valid { + event.Job.SourceNodeStorage = srcNodeStorage.String + } events = append(events, event) } @@ -226,7 +204,9 @@ func (rq PostgresReplicationEventQueue) Enqueue(ctx context.Context, event Repli INSERT INTO replication_queue(lock_id, job, meta) SELECT insert_lock.id, $4, $5 FROM insert_lock - RETURNING id, state, created_at, updated_at, lock_id, attempt, job, meta` + RETURNING id, state, created_at, updated_at, lock_id, attempt, meta, change, + repository_id, replica_path, relative_path, target_node_storage, + source_node_storage, virtual_storage, params` // this will always return a single row result (because of lock uniqueness) or an error rows, err := rq.qc.QueryContext(ctx, query, event.Job.VirtualStorage, event.Job.TargetNodeStorage, event.Job.RelativePath, event.Job, event.Meta) if err != nil { @@ -275,7 +255,7 @@ func (rq PostgresReplicationEventQueue) Dequeue(ctx context.Context, virtualStor SELECT id FROM replication_queue WHERE id IN ( - SELECT DISTINCT FIRST_VALUE(queue.id) OVER (PARTITION BY lock_id, job->>'change' ORDER BY queue.created_at) + SELECT DISTINCT FIRST_VALUE(queue.id) OVER (PARTITION BY lock_id, change ORDER BY queue.created_at) FROM replication_queue AS queue JOIN lock ON queue.lock_id = lock.id WHERE queue.state IN ('ready', 'failed' ) @@ -287,12 +267,16 @@ func (rq PostgresReplicationEventQueue) Dequeue(ctx context.Context, virtualStor ) , job AS ( UPDATE replication_queue AS queue - SET attempt = CASE WHEN job->>'change' = 'delete_replica' THEN queue.attempt ELSE queue.attempt - 1 END + SET attempt = CASE WHEN queue.change = 'delete_replica' THEN queue.attempt ELSE queue.attempt - 1 END , state = 'in_progress' , updated_at = NOW() AT TIME ZONE 'UTC' FROM candidate WHERE queue.id = candidate.id - RETURNING queue.id, queue.state, queue.created_at, queue.updated_at, queue.lock_id, queue.attempt, queue.job, queue.meta + RETURNING queue.id, queue.state, queue.created_at, queue.updated_at, + queue.lock_id, queue.attempt, queue.meta, queue.change, + queue.repository_id, queue.replica_path, queue.relative_path, + queue.target_node_storage, queue.source_node_storage, queue.virtual_storage, + queue.params ) , track_job_lock AS ( INSERT INTO replication_queue_job_lock (job_id, lock_id, triggered_at) @@ -306,7 +290,9 @@ func (rq PostgresReplicationEventQueue) Dequeue(ctx context.Context, virtualStor FROM track_job_lock AS tracked WHERE lock.id = tracked.lock_id ) - SELECT id, state, created_at, updated_at, lock_id, attempt, job, meta + SELECT id, state, created_at, updated_at, lock_id, attempt, meta, change, + repository_id, replica_path, relative_path, target_node_storage, + source_node_storage, 
virtual_storage, params
 	FROM job
 	ORDER BY id`
 
 	rows, err := rq.qc.QueryContext(ctx, query, virtualStorage, nodeStorage, count)
@@ -353,7 +339,7 @@ func (rq PostgresReplicationEventQueue) Acknowledge(ctx context.Context, state J
 	query := `
 	WITH existing AS (
-		SELECT id, lock_id, updated_at, job
+		SELECT id, lock_id, updated_at, change, source_node_storage
 		FROM replication_queue
 		WHERE id = ANY($1)
 		AND state = 'in_progress'
@@ -374,9 +360,9 @@ func (rq PostgresReplicationEventQueue) Acknowledge(ctx context.Context, state J
 				-- they are for the exact same repository
 				AND queue.lock_id = existing.lock_id
 				-- and created to apply exact same replication operation (gc, update, ...)
-				AND queue.job->>'change' = existing.job->>'change'
+				AND queue.change = existing.change
 				-- from the same source storage (if applicable, as 'gc' has no source)
-				AND COALESCE(queue.job->>'source_node_storage', '') = COALESCE(existing.job->>'source_node_storage', ''))
+				AND COALESCE(queue.source_node_storage, '') = COALESCE(existing.source_node_storage, ''))
 			)
 		)
 		RETURNING queue.id, queue.lock_id
diff --git a/internal/praefect/datastore/queue_test.go b/internal/praefect/datastore/queue_test.go
index 96407f12574..45052fae55c 100644
--- a/internal/praefect/datastore/queue_test.go
+++ b/internal/praefect/datastore/queue_test.go
@@ -159,7 +159,7 @@ func TestPostgresReplicationEventQueue_Enqueue(t *testing.T) {
 			Params:            nil,
 		},
 	}
-	insertRepository(t, db, ctx, eventType.Job.VirtualStorage, eventType.Job.RelativePath, eventType.Job.SourceNodeStorage)
+	repositoryID := insertRepository(t, db, ctx, eventType.Job.VirtualStorage, eventType.Job.RelativePath, eventType.Job.SourceNodeStorage)
 
 	actualEvent, err := queue.Enqueue(ctx, eventType) // initial event
 	require.NoError(t, err)
@@ -173,6 +173,7 @@ func TestPostgresReplicationEventQueue_Enqueue(t *testing.T) {
 		Attempt:   3,
 		LockID:    "praefect|gitaly-1|/project/path-1",
 		Job: ReplicationJob{
+			RepositoryID:      repositoryID,
 			Change:            UpdateRepo,
 			RelativePath:      "/project/path-1",
 			TargetNodeStorage: "gitaly-1",
@@ -431,6 +432,7 @@ func TestPostgresReplicationEventQueue_EnqueueMultiple(t *testing.T) {
 		LockID:  "praefect-0|gitaly-2|/project/path-1",
 		Job: ReplicationJob{
 			Change:            RenameRepo,
+			RepositoryID:      eventType1.Job.RepositoryID,
 			RelativePath:      "/project/path-1",
 			TargetNodeStorage: "gitaly-2",
 			SourceNodeStorage: "",
@@ -1244,12 +1246,19 @@ func requireEvents(t *testing.T, ctx context.Context, db testdb.DB, expected []R
 		exp[i].UpdatedAt = nil
 	}
 
-	sqlStmt := `SELECT id, state, attempt, lock_id, job FROM replication_queue ORDER BY id`
+	sqlStmt := `SELECT id, state, created_at, updated_at, lock_id, attempt, meta, change,
+			repository_id, replica_path, relative_path, target_node_storage,
+			source_node_storage, virtual_storage, params
+		FROM replication_queue ORDER BY id`
 	rows, err := db.QueryContext(ctx, sqlStmt)
 	require.NoError(t, err)
 
 	actual, err := scanReplicationEvents(rows)
 	require.NoError(t, err)
+	for i := 0; i < len(actual); i++ {
+		actual[i].CreatedAt = time.Time{}
+		actual[i].UpdatedAt = nil
+	}
 	require.Equal(t, exp, actual)
 }
-- 
GitLab


From c61dedb0af342fb1f21a2fa994aad9fb9873587c Mon Sep 17 00:00:00 2001
From: Pavlo Strokov
Date: Wed, 16 Feb 2022 21:09:22 +0200
Subject: [PATCH 11/12] nodes: Respect new constraint on repositories table

Because the tests were written with some inconsistency, and because of
the foreign key constraint between the replication_queue and
repositories tables, it is no longer possible to create replication
events for non-existing repositories.
That is why the test setup code was changed.
---
 internal/praefect/nodes/per_repository_test.go | 18 ------------------
 1 file changed, 18 deletions(-)

diff --git a/internal/praefect/nodes/per_repository_test.go b/internal/praefect/nodes/per_repository_test.go
index a0229c57331..19f69a7eb09 100644
--- a/internal/praefect/nodes/per_repository_test.go
+++ b/internal/praefect/nodes/per_repository_test.go
@@ -417,24 +417,6 @@ func TestPerRepositoryElector(t *testing.T) {
 				},
 			},
 			existingJobs: []datastore.ReplicationEvent{
-				{
-					State: datastore.JobStateReady,
-					Job: datastore.ReplicationJob{
-						Change:            datastore.DeleteReplica,
-						VirtualStorage:    "wrong-virtual-storage",
-						RelativePath:      "relative-path-1",
-						TargetNodeStorage: "gitaly-1",
-					},
-				},
-				{
-					State: datastore.JobStateReady,
-					Job: datastore.ReplicationJob{
-						Change:            datastore.DeleteReplica,
-						VirtualStorage:    "virtual-storage-1",
-						RelativePath:      "wrong-relative-path",
-						TargetNodeStorage: "gitaly-1",
-					},
-				},
 				{
 					State: datastore.JobStateReady,
 					Job: datastore.ReplicationJob{
-- 
GitLab


From bdd73865268ec2d237bf4cb6694c97e91bab8761 Mon Sep 17 00:00:00 2001
From: Pavlo Strokov
Date: Fri, 18 Feb 2022 12:41:33 +0200
Subject: [PATCH 12/12] praefect: Repository should exist before scheduling events

After the introduction of the repository ID, we use it for various
operations where the relative path and virtual storage were used
before. One of those use cases is the creation of replication events:
the injected repository ID is used to identify the concrete repository
later on. Because we now have a foreign key, we are no longer allowed
to create replication events for repositories that do not yet exist in
the database. That is why we use PostgresRepositoryStore to create
repository records before we enqueue any replication events.

The change also fixes some inconsistencies in the setup logic, such as
double database creation and unneeded transaction usage.
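The ordering the new constraint imposes can be sketched as follows (a minimal sketch assembled from the datastore calls visible in the diffs below; the repository ID, storage names, and paths are placeholders):

	// The repository record must exist first: with the foreign key in
	// place, enqueueing an event for a missing repository_id fails with a
	// constraint violation instead of leaving an orphaned queue row.
	rs := datastore.NewPostgresRepositoryStore(db, conf.StorageNames())
	require.NoError(t, rs.CreateRepository(ctx, 1, "praefect", "relative/path", "relative/path",
		"gitaly-0", nil, nil, true, false))

	// Only now is it safe to enqueue replication events for repository ID 1.
	queue := datastore.NewPostgresReplicationEventQueue(db)
	_, err := queue.Enqueue(ctx, datastore.ReplicationEvent{Job: datastore.ReplicationJob{
		RepositoryID:      1,
		Change:            datastore.UpdateRepo,
		RelativePath:      "relative/path",
		ReplicaPath:       "relative/path",
		TargetNodeStorage: "gitaly-1",
		SourceNodeStorage: "gitaly-0",
		VirtualStorage:    "praefect",
	}})
	require.NoError(t, err)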
--- internal/praefect/coordinator_pg_test.go | 17 ++---- internal/praefect/coordinator_test.go | 25 ++++++-- internal/praefect/replicator_test.go | 73 ++++++++++++++---------- internal/praefect/server_test.go | 29 ++++++---- 4 files changed, 84 insertions(+), 60 deletions(-) diff --git a/internal/praefect/coordinator_pg_test.go b/internal/praefect/coordinator_pg_test.go index e35da6b5d4b..c13fe0570a5 100644 --- a/internal/praefect/coordinator_pg_test.go +++ b/internal/praefect/coordinator_pg_test.go @@ -191,18 +191,11 @@ func TestStreamDirectorMutator_Transaction(t *testing.T) { txMgr := transactions.NewManager(conf) - tx := db.Begin(t) - defer tx.Rollback(t) - // set up the generations prior to transaction - rs := datastore.NewPostgresRepositoryStore(tx, conf.StorageNames()) + rs := datastore.NewPostgresRepositoryStore(db, conf.StorageNames()) repoCreated := false for i, n := range tc.nodes { - if n.generation == datastore.GenerationUnknown { - continue - } - if !repoCreated { repoCreated = true require.NoError(t, rs.CreateRepository(ctx, 1, repo.StorageName, repo.RelativePath, repo.RelativePath, storageNodes[i].Storage, nil, nil, true, false)) @@ -211,7 +204,7 @@ func TestStreamDirectorMutator_Transaction(t *testing.T) { require.NoError(t, rs.SetGeneration(ctx, 1, storageNodes[i].Storage, repo.RelativePath, n.generation)) } - testdb.SetHealthyNodes(t, ctx, tx, map[string]map[string][]string{"praefect": conf.StorageNames()}) + testdb.SetHealthyNodes(t, ctx, db, map[string]map[string][]string{"praefect": conf.StorageNames()}) nodeSet, err := DialNodes( ctx, @@ -229,11 +222,11 @@ func TestStreamDirectorMutator_Transaction(t *testing.T) { rs, NewPerRepositoryRouter( nodeSet.Connections(), - nodes.NewPerRepositoryElector(tx), + nodes.NewPerRepositoryElector(db), StaticHealthChecker(conf.StorageNames()), NewLockedRandom(rand.New(rand.NewSource(0))), rs, - datastore.NewAssignmentStore(tx, conf.StorageNames()), + datastore.NewAssignmentStore(db, conf.StorageNames()), rs, nil, ), @@ -315,8 +308,6 @@ func TestStreamDirectorMutator_Transaction(t *testing.T) { require.Equal(t, n.expectedGeneration, gen, "node %d has wrong generation", i) } - tx.Commit(t) - replicationWaitGroup.Wait() for i, node := range tc.nodes { diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go index 4749b3acbbd..0bb224d3d3f 100644 --- a/internal/praefect/coordinator_test.go +++ b/internal/praefect/coordinator_test.go @@ -189,7 +189,7 @@ func TestStreamDirectorMutator(t *testing.T) { } testdb.SetHealthyNodes(t, ctx, tx, map[string]map[string][]string{"praefect": conf.StorageNames()}) - queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(db)) + queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(tx)) queueInterceptor.OnEnqueue(func(ctx context.Context, event datastore.ReplicationEvent, queue datastore.ReplicationEventQueue) (datastore.ReplicationEvent, error) { assert.True(t, len(queueInterceptor.GetEnqueued()) < 2, "expected only one event to be created") return queue.Enqueue(ctx, event) @@ -322,8 +322,13 @@ func TestStreamDirectorMutator_StopTransaction(t *testing.T) { txMgr := transactions.NewManager(conf) + db := testdb.New(t) + require.NoError(t, datastore.NewPostgresRepositoryStore(db, conf.StorageNames()). 
+ CreateRepository(ctx, 1, repo.GetStorageName(), repo.GetRelativePath(), repo.GetRelativePath(), "primary", []string{"secondary"}, nil, true, false), + ) + coordinator := NewCoordinator( - datastore.NewPostgresReplicationEventQueue(testdb.New(t)), + datastore.NewPostgresReplicationEventQueue(db), rs, NewNodeManagerRouter(nodeMgr, rs), txMgr, @@ -869,7 +874,7 @@ func TestStreamDirector_repo_creation(t *testing.T) { assert.Equal(t, []string{unhealthySecondaryNode.Storage}, outdatedSecondaries) assert.Equal(t, tc.primaryStored, storePrimary) assert.Equal(t, tc.assignmentsStored, storeAssignments) - return nil + return datastore.NewPostgresRepositoryStore(db, conf.StorageNames()).CreateRepository(ctx, repoID, virtualStorage, relativePath, replicaPath, primary, updatedSecondaries, outdatedSecondaries, storePrimary, storeAssignments) }, } @@ -1078,7 +1083,8 @@ func TestAbsentCorrelationID(t *testing.T) { }, } - queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(testdb.New(t))) + db := testdb.New(t) + queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(db)) queueInterceptor.OnEnqueue(func(ctx context.Context, event datastore.ReplicationEvent, queue datastore.ReplicationEventQueue) (datastore.ReplicationEvent, error) { assert.True(t, len(queueInterceptor.GetEnqueued()) < 2, "expected only one event to be created") return queue.Enqueue(ctx, event) @@ -1097,7 +1103,7 @@ func TestAbsentCorrelationID(t *testing.T) { defer nodeMgr.Stop() txMgr := transactions.NewManager(conf) - rs := datastore.MockRepositoryStore{} + rs := datastore.NewPostgresRepositoryStore(db, conf.StorageNames()) coordinator := NewCoordinator( queueInterceptor, @@ -1553,7 +1559,14 @@ func TestCoordinator_grpcErrorHandling(t *testing.T) { }) } + db := testdb.New(t) + queue := datastore.NewPostgresReplicationEventQueue(db) + require.NoError(t, datastore.NewPostgresRepositoryStore(db, praefectConfig.StorageNames()). + CreateRepository(ctx, 1, testhelper.DefaultStorageName, repoProto.GetRelativePath(), repoProto.GetRelativePath(), repoProto.GetStorageName(), nil, nil, true, false), + ) + praefectConn, _, cleanup := runPraefectServer(t, ctx, praefectConfig, buildOptions{ + withQueue: queue, // Set up a mock manager which sets up primary/secondaries and pretends that all nodes are // healthy. We need fixed roles and unhealthy nodes will not take part in transactions. withNodeMgr: &nodes.MockManager{ @@ -1569,7 +1582,7 @@ func TestCoordinator_grpcErrorHandling(t *testing.T) { }, nil }, }, - // Set up a mock repsoitory store pretending that all nodes are consistent. Only consistent + // Set up a mock repository store pretending that all nodes are consistent. Only consistent // nodes will take part in transactions. 
withRepoStore: datastore.MockRepositoryStore{ GetReplicaPathFunc: func(ctx context.Context, repositoryID int64) (string, error) { diff --git a/internal/praefect/replicator_test.go b/internal/praefect/replicator_test.go index 7cd48ff3a15..4018b51bda8 100644 --- a/internal/praefect/replicator_test.go +++ b/internal/praefect/replicator_test.go @@ -56,6 +56,8 @@ func TestReplMgr_ProcessBacklog(t *testing.T) { } func testReplMgrProcessBacklog(t *testing.T, ctx context.Context) { + db := testdb.New(t) + primaryCfg, testRepoProto, testRepoPath := testcfg.BuildWithRepo(t, testcfg.WithStorages("primary")) testRepo := localrepo.NewTestRepo(t, primaryCfg, testRepoProto) primaryCfg.SocketPath = testserver.RunGitalyServer(t, primaryCfg, nil, setup.RegisterAll, testserver.WithDisablePraefect()) @@ -155,7 +157,10 @@ func testReplMgrProcessBacklog(t *testing.T, ctx context.Context) { logger := testhelper.NewDiscardingLogger(t) loggerHook := test.NewLocal(logger) - queue := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(testdb.New(t))) + rs := datastore.NewPostgresRepositoryStore(db, conf.StorageNames()) + require.NoError(t, rs.CreateRepository(ctx, repositoryID, conf.VirtualStorages[0].Name, testRepo.GetRelativePath(), testRepo.GetRelativePath(), shard.Primary.GetStorage(), nil, nil, true, false)) + + queue := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(db)) queue.OnAcknowledge(func(ctx context.Context, state datastore.JobState, ids []uint64, queue datastore.ReplicationEventQueue) ([]uint64, error) { cancel() // when it is called we know that replication is finished return queue.Acknowledge(ctx, state, ids) @@ -165,10 +170,6 @@ func testReplMgrProcessBacklog(t *testing.T, ctx context.Context) { _, err = queue.Enqueue(ctx, events[0]) require.NoError(t, err) - db := testdb.New(t) - rs := datastore.NewPostgresRepositoryStore(db, conf.StorageNames()) - require.NoError(t, rs.CreateRepository(ctx, repositoryID, conf.VirtualStorages[0].Name, testRepo.GetRelativePath(), testRepo.GetRelativePath(), shard.Primary.GetStorage(), nil, nil, true, false)) - replMgr := NewReplMgr( loggerEntry, conf.StorageNames(), @@ -282,6 +283,8 @@ func TestReplicatorDowngradeAttempt(t *testing.T) { func TestReplicator_PropagateReplicationJob(t *testing.T) { t.Parallel() + db := testdb.New(t) + primaryStorage, secondaryStorage := "internal-gitaly-0", "internal-gitaly-1" primCfg := testcfg.Build(t, testcfg.WithStorages(primaryStorage)) @@ -315,7 +318,7 @@ func TestReplicator_PropagateReplicationJob(t *testing.T) { // unlinkat /tmp/gitaly-222007427/381349228/storages.d/internal-gitaly-1/+gitaly/state/path/to/repo: directory not empty // By using WaitGroup we are sure the test cleanup will be started after all replication // requests are completed, so no running cache IO operations happen. 
-	queue := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(testdb.New(t)))
+	queue := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(db))
 	var wg sync.WaitGroup
 	queue.OnEnqueue(func(ctx context.Context, event datastore.ReplicationEvent, queue datastore.ReplicationEventQueue) (datastore.ReplicationEvent, error) {
 		wg.Add(1)
@@ -375,6 +378,9 @@ func TestReplicator_PropagateReplicationJob(t *testing.T) {
 		StorageName:  conf.VirtualStorages[0].Name,
 		RelativePath: repositoryRelativePath,
 	}
+	// We need to create a repository record in the database before we can schedule replication events.
+	repoStore := datastore.NewPostgresRepositoryStore(db, conf.StorageNames())
+	require.NoError(t, repoStore.CreateRepository(ctx, 1, conf.VirtualStorages[0].Name, repository.GetRelativePath(), repository.GetRelativePath(), repository.GetStorageName(), nil, nil, true, false))
 
 	//nolint:staticcheck
 	_, err = repositoryClient.GarbageCollect(ctx, &gitalypb.GarbageCollectRequest{
@@ -685,6 +691,10 @@ func TestProcessBacklog_FailedJobs(t *testing.T) {
 }
 
 func testProcessBacklogFailedJobs(t *testing.T, ctx context.Context) {
+	ctx, cancel := context.WithCancel(ctx)
+	t.Cleanup(cancel)
+	db := testdb.New(t)
+
 	primaryCfg, testRepo, _ := testcfg.BuildWithRepo(t, testcfg.WithStorages("default"))
 	primaryAddr := testserver.RunGitalyServer(t, primaryCfg, nil, setup.RegisterAll, testserver.WithDisablePraefect())
 
@@ -714,26 +724,30 @@ func testProcessBacklogFailedJobs(t *testing.T, ctx context.Context) {
 			},
 		},
 	}
-	ctx, cancel := context.WithCancel(ctx)
-
-	queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(testdb.New(t)))
 
 	// this job exists to verify that replication works
 	okJob := datastore.ReplicationJob{
-		RepositoryID:      1,
-		Change:            datastore.UpdateRepo,
+		RepositoryID: 1,
+		// The optimize_repository change type is used because we can't use the update type here:
+		// additional update events would not be picked up, as they are marked as done once the first one completes.
+		Change:            datastore.OptimizeRepository,
 		RelativePath:      testRepo.RelativePath,
 		TargetNodeStorage: secondary.Storage,
-		SourceNodeStorage: primary.Storage,
 		VirtualStorage:    "praefect",
 	}
+
+	rs := datastore.NewPostgresRepositoryStore(db, conf.StorageNames())
+	require.NoError(t, rs.CreateRepository(ctx, okJob.RepositoryID, okJob.VirtualStorage, okJob.RelativePath, okJob.RelativePath, okJob.SourceNodeStorage, nil, nil, true, false))
+
+	queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(db))
 	event1, err := queueInterceptor.Enqueue(ctx, datastore.ReplicationEvent{Job: okJob})
 	require.NoError(t, err)
 	require.Equal(t, uint64(1), event1.ID)
 
-	// this job checks flow for replication event that fails
+	// this job checks the flow for a replication event that fails because the source storage does not exist
 	failJob := okJob
-	failJob.Change = "invalid-operation"
+	failJob.Change = datastore.UpdateRepo
+	failJob.SourceNodeStorage = "invalid"
 	event2, err := queueInterceptor.Enqueue(ctx, datastore.ReplicationEvent{Job: failJob})
 	require.NoError(t, err)
 	require.Equal(t, uint64(2), event2.ID)
@@ -745,10 +759,6 @@ func testProcessBacklogFailedJobs(t *testing.T, ctx context.Context) {
 	nodeMgr.Start(0, time.Hour)
 	defer nodeMgr.Stop()
 
-	db := testdb.New(t)
-	rs := datastore.NewPostgresRepositoryStore(db, conf.StorageNames())
-	require.NoError(t, rs.CreateRepository(ctx, okJob.RepositoryID, okJob.VirtualStorage, okJob.RelativePath, okJob.RelativePath, okJob.SourceNodeStorage, nil, nil, true, false))
-
 	replMgr := NewReplMgr(
 		logEntry,
 		conf.StorageNames(),
@@ -792,6 +802,8 @@ func TestProcessBacklog_Success(t *testing.T) {
 }
 
 func testProcessBacklogSuccess(t *testing.T, ctx context.Context) {
 	ctx, cancel := context.WithCancel(ctx)
+	t.Cleanup(cancel)
+	db := testdb.New(t)
 
 	primaryCfg, testRepo, _ := testcfg.BuildWithRepo(t, testcfg.WithStorages("primary"))
 	primaryCfg.SocketPath = testserver.RunGitalyServer(t, primaryCfg, nil, setup.RegisterAll, testserver.WithDisablePraefect())
@@ -825,7 +837,7 @@ func testProcessBacklogSuccess(t *testing.T, ctx context.Context) {
 		},
 	}
 
-	queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(testdb.New(t)))
+	queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(db))
 	queueInterceptor.OnAcknowledge(func(ctx context.Context, state datastore.JobState, ids []uint64, queue datastore.ReplicationEventQueue) ([]uint64, error) {
 		ackIDs, err := queue.Acknowledge(ctx, state, ids)
 		if len(ids) > 0 {
@@ -854,6 +866,9 @@ func testProcessBacklogSuccess(t *testing.T, ctx context.Context) {
 		},
 	}
 
+	rs := datastore.NewPostgresRepositoryStore(db, conf.StorageNames())
+	require.NoError(t, rs.CreateRepository(ctx, eventType1.Job.RepositoryID, eventType1.Job.VirtualStorage, eventType1.Job.VirtualStorage, eventType1.Job.RelativePath, eventType1.Job.SourceNodeStorage, nil, nil, true, false))
+
 	_, err := queueInterceptor.Enqueue(ctx, eventType1)
 	require.NoError(t, err)
 
@@ -869,6 +884,7 @@ func testProcessBacklogSuccess(t *testing.T, ctx context.Context) {
 	// Rename replication job
 	eventType2 := datastore.ReplicationEvent{
 		Job: datastore.ReplicationJob{
+			RepositoryID:      1,
 			Change:            datastore.RenameRepo,
 			RelativePath:      testRepo.GetRelativePath(),
 			TargetNodeStorage: secondary.Storage,
@@ -884,6 +900,7 @@ func testProcessBacklogSuccess(t *testing.T, ctx context.Context) {
 	// Rename replication job
 	eventType3 := datastore.ReplicationEvent{
 		Job:
datastore.ReplicationJob{ + RepositoryID: 1, Change: datastore.RenameRepo, RelativePath: renameTo1, TargetNodeStorage: secondary.Storage, @@ -904,10 +921,6 @@ func testProcessBacklogSuccess(t *testing.T, ctx context.Context) { nodeMgr.Start(0, time.Hour) defer nodeMgr.Stop() - db := testdb.New(t) - rs := datastore.NewPostgresRepositoryStore(db, conf.StorageNames()) - require.NoError(t, rs.CreateRepository(ctx, eventType1.Job.RepositoryID, eventType1.Job.VirtualStorage, eventType1.Job.VirtualStorage, eventType1.Job.RelativePath, eventType1.Job.SourceNodeStorage, nil, nil, true, false)) - replMgr := NewReplMgr( logEntry, conf.StorageNames(), @@ -934,6 +947,7 @@ func testProcessBacklogSuccess(t *testing.T, ctx context.Context) { func TestReplMgrProcessBacklog_OnlyHealthyNodes(t *testing.T) { t.Parallel() + db := testdb.New(t) conf := config.Config{ VirtualStorages: []*config.VirtualStorage{ { @@ -951,7 +965,7 @@ func TestReplMgrProcessBacklog_OnlyHealthyNodes(t *testing.T) { var mtx sync.Mutex expStorages := map[string]bool{conf.VirtualStorages[0].Nodes[0].Storage: true, conf.VirtualStorages[0].Nodes[2].Storage: true} - queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(testdb.New(t))) + queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(db)) queueInterceptor.OnDequeue(func(_ context.Context, virtualStorageName string, storageName string, _ int, _ datastore.ReplicationEventQueue) ([]datastore.ReplicationEvent, error) { select { case <-ctx.Done(): @@ -1012,6 +1026,8 @@ func (m mockReplicator) Replicate(ctx context.Context, event datastore.Replicati func TestProcessBacklog_ReplicatesToReadOnlyPrimary(t *testing.T) { t.Parallel() ctx, cancel := context.WithCancel(testhelper.Context(t)) + t.Cleanup(cancel) + db := testdb.New(t) const virtualStorage = "virtal-storage" const primaryStorage = "storage-1" @@ -1033,7 +1049,10 @@ func TestProcessBacklog_ReplicatesToReadOnlyPrimary(t *testing.T) { }, } - queue := datastore.NewPostgresReplicationEventQueue(testdb.New(t)) + rs := datastore.NewPostgresRepositoryStore(db, conf.StorageNames()) + require.NoError(t, rs.CreateRepository(ctx, repositoryID, virtualStorage, "ignored", "ignored", primaryStorage, []string{secondaryStorage}, nil, true, false)) + + queue := datastore.NewPostgresReplicationEventQueue(db) _, err := queue.Enqueue(ctx, datastore.ReplicationEvent{ Job: datastore.ReplicationJob{ RepositoryID: 1, @@ -1046,10 +1065,6 @@ func TestProcessBacklog_ReplicatesToReadOnlyPrimary(t *testing.T) { }) require.NoError(t, err) - db := testdb.New(t) - rs := datastore.NewPostgresRepositoryStore(db, conf.StorageNames()) - require.NoError(t, rs.CreateRepository(ctx, repositoryID, virtualStorage, "ignored", "ignored", primaryStorage, []string{secondaryStorage}, nil, true, false)) - replMgr := NewReplMgr( testhelper.NewDiscardingLogEntry(t), conf.StorageNames(), diff --git a/internal/praefect/server_test.go b/internal/praefect/server_test.go index c3681a34c42..4de7e0fc66d 100644 --- a/internal/praefect/server_test.go +++ b/internal/praefect/server_test.go @@ -521,7 +521,14 @@ func TestRemoveRepository(t *testing.T) { verifyReposExistence(t, codes.OK) - queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(testdb.New(t))) + virtualRepo := proto.Clone(repos[0][0]).(*gitalypb.Repository) + virtualRepo.StorageName = praefectCfg.VirtualStorages[0].Name + + db := testdb.New(t) + repositoryStore := 
datastore.NewPostgresRepositoryStore(db, praefectCfg.StorageNames()) + require.NoError(t, repositoryStore.CreateRepository(ctx, 1, virtualRepo.GetStorageName(), virtualRepo.GetRelativePath(), virtualRepo.GetRelativePath(), gitalyCfgs[0].Storages[0].Name, nil, nil, true, false)) + + queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(db)) repoStore := defaultRepoStore(praefectCfg) txMgr := defaultTxMgr(praefectCfg) nodeMgr, err := nodes.NewManager(testhelper.NewDiscardingLogEntry(t), praefectCfg, nil, @@ -541,15 +548,15 @@ func TestRemoveRepository(t *testing.T) { GetConsistentStoragesFunc: func(ctx context.Context, virtualStorage, relativePath string) (string, map[string]struct{}, error) { return relativePath, nil, nil }, + GetRepositoryIDFunc: func(ctx context.Context, virtualStorage, relativePath string) (int64, error) { + return repositoryStore.GetRepositoryID(ctx, virtualStorage, relativePath) + }, }, withNodeMgr: nodeMgr, withTxMgr: txMgr, }) defer cleanup() - virtualRepo := proto.Clone(repos[0][0]).(*gitalypb.Repository) - virtualRepo.StorageName = praefectCfg.VirtualStorages[0].Name - _, err = gitalypb.NewRepositoryServiceClient(cc).RemoveRepository(ctx, &gitalypb.RemoveRepositoryRequest{ Repository: virtualRepo, }) @@ -616,30 +623,28 @@ func TestRenameRepository(t *testing.T) { repoPaths[i] = filepath.Join(gitalyCfg.Storages[0].Path, relativePath) } - evq := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(testdb.New(t))) - - tx := testdb.New(t).Begin(t) - defer tx.Rollback(t) + db := testdb.New(t) + evq := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(db)) - rs := datastore.NewPostgresRepositoryStore(tx, nil) + rs := datastore.NewPostgresRepositoryStore(db, nil) require.NoError(t, rs.CreateRepository(ctx, 1, "praefect", repo.RelativePath, repo.RelativePath, "gitaly-1", []string{"gitaly-2", "gitaly-3"}, nil, true, false)) nodeSet, err := DialNodes(ctx, praefectCfg.VirtualStorages, nil, nil, nil, nil) require.NoError(t, err) defer nodeSet.Close() - testdb.SetHealthyNodes(t, ctx, tx, map[string]map[string][]string{"praefect": praefectCfg.StorageNames()}) + testdb.SetHealthyNodes(t, ctx, db, map[string]map[string][]string{"praefect": praefectCfg.StorageNames()}) cc, _, cleanup := runPraefectServer(t, ctx, praefectCfg, buildOptions{ withQueue: evq, withRepoStore: rs, withRouter: NewPerRepositoryRouter( nodeSet.Connections(), - nodes.NewPerRepositoryElector(tx), + nodes.NewPerRepositoryElector(db), StaticHealthChecker(praefectCfg.StorageNames()), NewLockedRandom(rand.New(rand.NewSource(0))), rs, - datastore.NewAssignmentStore(tx, praefectCfg.StorageNames()), + datastore.NewAssignmentStore(db, praefectCfg.StorageNames()), rs, nil, ), -- GitLab
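Taken together, the series moves replication-queue cleanup into the database itself. A minimal sketch of the net effect (illustrative only; db is assumed to be a *sql.DB connected to the Praefect database, and the constraint name comes from the migration earlier in the series):

	// Deleting a repository row is now sufficient: the
	// replication_queue_repository_id_fkey constraint (ON DELETE CASCADE)
	// removes every replication event that still references the repository,
	// which is what let the remove-repository sub-command drop its manual
	// queue cleanup loop.
	_, err := db.ExecContext(ctx,
		`DELETE FROM repositories WHERE virtual_storage = $1 AND relative_path = $2`,
		"praefect", "relative/path")
	require.NoError(t, err)

	// No orphaned events survive the delete.
	var orphans int
	require.NoError(t, db.QueryRowContext(ctx,
		`SELECT COUNT(*) FROM replication_queue AS q
		 WHERE NOT EXISTS (SELECT FROM repositories AS r WHERE r.repository_id = q.repository_id)`,
	).Scan(&orphans))
	require.Equal(t, 0, orphans)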