diff --git a/_support/praefect-schema.sql b/_support/praefect-schema.sql index b69a2f0c670d4bfff51df94278a6607244a6f213..a356d42393a8ebac1f1cc4d7e9218f4b45c0a22f 100644 --- a/_support/praefect-schema.sql +++ b/_support/praefect-schema.sql @@ -50,6 +50,28 @@ CREATE TYPE public.replication_job_state AS ENUM ( ); +-- +-- Name: replication_job_type; Type: TYPE; Schema: public; Owner: - +-- + +CREATE TYPE public.replication_job_type AS ENUM ( + 'update', + 'create', + 'delete', + 'delete_replica', + 'rename', + 'gc', + 'repack_full', + 'repack_incremental', + 'cleanup', + 'pack_refs', + 'write_commit_graph', + 'midx_repack', + 'optimize_repository', + 'prune_unreachable_objects' +); + + -- -- Name: notify_on_change(); Type: FUNCTION; Schema: public; Owner: - -- @@ -94,6 +116,51 @@ CREATE FUNCTION public.notify_on_change() RETURNS trigger $$; +-- +-- Name: remove_queue_lock_on_repository_removal(); Type: FUNCTION; Schema: public; Owner: - +-- + +CREATE FUNCTION public.remove_queue_lock_on_repository_removal() RETURNS trigger + LANGUAGE plpgsql + AS $$ + BEGIN + DELETE FROM replication_queue_lock + WHERE id LIKE (OLD.virtual_storage || '|%|' || OLD.relative_path); + RETURN NULL; + END; + $$; + + +-- +-- Name: replication_queue_flatten_job(); Type: FUNCTION; Schema: public; Owner: - +-- + +CREATE FUNCTION public.replication_queue_flatten_job() RETURNS trigger + LANGUAGE plpgsql + AS $$ + BEGIN + NEW.change := (NEW.job->>'change')::REPLICATION_JOB_TYPE; + NEW.repository_id := COALESCE( + -- For the old jobs the repository_id field may be 'null' or not set at all. + -- repository_id field could have 0 if event was created by reconciler before + -- repositories table was populated with valid repository_id. + CASE WHEN (NEW.job->>'repository_id')::BIGINT = 0 THEN NULL ELSE (NEW.job->>'repository_id')::BIGINT END, + (SELECT repositories.repository_id FROM repositories WHERE repositories.virtual_storage = NEW.job->>'virtual_storage' AND repositories.relative_path = NEW.job->>'relative_path'), + 0 + ); + -- The reconciler doesn't populate replica_path field that is why we need make sure + -- we have at least an empty value for the column, not to break scan operations. 
+ NEW.replica_path := COALESCE(NEW.job->>'replica_path', ''); + NEW.relative_path := NEW.job->>'relative_path'; + NEW.target_node_storage := NEW.job->>'target_node_storage'; + NEW.source_node_storage := COALESCE(NEW.job->>'source_node_storage', ''); + NEW.virtual_storage := NEW.job->>'virtual_storage'; + NEW.params := (NEW.job->>'params')::JSONB; + RETURN NEW; + END; + $$; + + SET default_tablespace = ''; SET default_with_oids = false; @@ -168,7 +235,15 @@ CREATE TABLE public.replication_queue ( attempt integer DEFAULT 3 NOT NULL, lock_id text, job jsonb, - meta jsonb + meta jsonb, + change public.replication_job_type, + repository_id bigint DEFAULT 0 NOT NULL, + replica_path text DEFAULT ''::text NOT NULL, + relative_path text, + target_node_storage text, + source_node_storage text DEFAULT ''::text NOT NULL, + virtual_storage text, + params jsonb ); @@ -361,8 +436,8 @@ CREATE VIEW public.valid_primaries AS JOIN public.healthy_storages USING (virtual_storage, storage)) LEFT JOIN public.repository_assignments USING (repository_id, storage)) WHERE (NOT (EXISTS ( SELECT - FROM public.replication_queue - WHERE ((replication_queue.state <> ALL (ARRAY['completed'::public.replication_job_state, 'dead'::public.replication_job_state, 'cancelled'::public.replication_job_state])) AND ((replication_queue.job ->> 'change'::text) = 'delete_replica'::text) AND (((replication_queue.job ->> 'repository_id'::text))::bigint = repositories.repository_id) AND ((replication_queue.job ->> 'target_node_storage'::text) = storage_repositories.storage)))))) candidates + FROM public.replication_queue queue + WHERE ((queue.state <> ALL (ARRAY['completed'::public.replication_job_state, 'dead'::public.replication_job_state, 'cancelled'::public.replication_job_state])) AND (queue.change = 'delete_replica'::public.replication_job_type) AND (queue.repository_id = storage_repositories.repository_id) AND (queue.target_node_storage = storage_repositories.storage)))))) candidates WHERE candidates.eligible; @@ -496,14 +571,14 @@ ALTER TABLE ONLY public.virtual_storages -- Name: delete_replica_unique_index; Type: INDEX; Schema: public; Owner: - -- -CREATE UNIQUE INDEX delete_replica_unique_index ON public.replication_queue USING btree (((job ->> 'virtual_storage'::text)), ((job ->> 'relative_path'::text))) WHERE ((state <> ALL (ARRAY['completed'::public.replication_job_state, 'cancelled'::public.replication_job_state, 'dead'::public.replication_job_state])) AND ((job ->> 'change'::text) = 'delete_replica'::text)); +CREATE UNIQUE INDEX delete_replica_unique_index ON public.replication_queue USING btree (virtual_storage, relative_path) WHERE ((state <> ALL (ARRAY['completed'::public.replication_job_state, 'cancelled'::public.replication_job_state, 'dead'::public.replication_job_state])) AND (change = 'delete_replica'::public.replication_job_type)); -- -- Name: replication_queue_target_index; Type: INDEX; Schema: public; Owner: - -- -CREATE INDEX replication_queue_target_index ON public.replication_queue USING btree (((job ->> 'virtual_storage'::text)), ((job ->> 'relative_path'::text)), ((job ->> 'target_node_storage'::text)), ((job ->> 'change'::text))) WHERE (state <> ALL (ARRAY['completed'::public.replication_job_state, 'cancelled'::public.replication_job_state, 'dead'::public.replication_job_state])); +CREATE INDEX replication_queue_target_index ON public.replication_queue USING btree (virtual_storage, relative_path, target_node_storage, change) WHERE (state <> ALL (ARRAY['completed'::public.replication_job_state, 
'cancelled'::public.replication_job_state, 'dead'::public.replication_job_state])); -- @@ -559,7 +634,7 @@ CREATE UNIQUE INDEX storage_repositories_new_pkey ON public.storage_repositories -- Name: virtual_target_on_replication_queue_idx; Type: INDEX; Schema: public; Owner: - -- -CREATE INDEX virtual_target_on_replication_queue_idx ON public.replication_queue USING btree (((job ->> 'virtual_storage'::text)), ((job ->> 'target_node_storage'::text))); +CREATE INDEX virtual_target_on_replication_queue_idx ON public.replication_queue USING btree (virtual_storage, target_node_storage); -- @@ -590,12 +665,26 @@ CREATE TRIGGER notify_on_insert AFTER INSERT ON public.storage_repositories REFE CREATE TRIGGER notify_on_update AFTER UPDATE ON public.storage_repositories REFERENCING OLD TABLE AS old NEW TABLE AS new FOR EACH STATEMENT EXECUTE PROCEDURE public.notify_on_change('storage_repositories_updates'); +-- +-- Name: repositories remove_queue_lock_on_repository_removal; Type: TRIGGER; Schema: public; Owner: - +-- + +CREATE TRIGGER remove_queue_lock_on_repository_removal AFTER DELETE ON public.repositories FOR EACH ROW EXECUTE PROCEDURE public.remove_queue_lock_on_repository_removal(); + + +-- +-- Name: replication_queue replication_queue_flatten_job; Type: TRIGGER; Schema: public; Owner: - +-- + +CREATE TRIGGER replication_queue_flatten_job BEFORE INSERT ON public.replication_queue FOR EACH ROW EXECUTE PROCEDURE public.replication_queue_flatten_job(); + + -- -- Name: replication_queue_job_lock replication_queue_job_lock_job_id_fkey; Type: FK CONSTRAINT; Schema: public; Owner: - -- ALTER TABLE ONLY public.replication_queue_job_lock - ADD CONSTRAINT replication_queue_job_lock_job_id_fkey FOREIGN KEY (job_id) REFERENCES public.replication_queue(id); + ADD CONSTRAINT replication_queue_job_lock_job_id_fkey FOREIGN KEY (job_id) REFERENCES public.replication_queue(id) ON DELETE CASCADE; -- @@ -603,7 +692,15 @@ ALTER TABLE ONLY public.replication_queue_job_lock -- ALTER TABLE ONLY public.replication_queue_job_lock - ADD CONSTRAINT replication_queue_job_lock_lock_id_fkey FOREIGN KEY (lock_id) REFERENCES public.replication_queue_lock(id); + ADD CONSTRAINT replication_queue_job_lock_lock_id_fkey FOREIGN KEY (lock_id) REFERENCES public.replication_queue_lock(id) ON DELETE CASCADE; + + +-- +-- Name: replication_queue replication_queue_repository_id_fkey; Type: FK CONSTRAINT; Schema: public; Owner: - +-- + +ALTER TABLE ONLY public.replication_queue + ADD CONSTRAINT replication_queue_repository_id_fkey FOREIGN KEY (repository_id) REFERENCES public.repositories(repository_id) ON DELETE CASCADE; -- diff --git a/cmd/praefect/subcmd_remove_repository.go b/cmd/praefect/subcmd_remove_repository.go index 49b049615c3bafab231fcd27dc267729fd80c21c..26fb1c8a8bc7d949abbf323f7c699ab70a38d2de 100644 --- a/cmd/praefect/subcmd_remove_repository.go +++ b/cmd/praefect/subcmd_remove_repository.go @@ -12,7 +12,6 @@ import ( "time" "github.com/sirupsen/logrus" - "gitlab.com/gitlab-org/gitaly/v14/internal/helper" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/config" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql" @@ -150,14 +149,6 @@ func (cmd *removeRepository) exec(ctx context.Context, logger logrus.FieldLogger fmt.Fprintln(cmd.w, "Removed repository metadata from the database.") } - fmt.Fprintln(cmd.w, "Removing replication events...") - ticker := helper.NewTimerTicker(time.Second) - defer ticker.Stop() - if err := 
cmd.removeReplicationEvents(ctx, logger, db, ticker); err != nil { - return fmt.Errorf("remove scheduled replication events: %w", err) - } - fmt.Fprintln(cmd.w, "Replication event removal completed.") - // We should try to remove repository from each of gitaly nodes. fmt.Fprintln(cmd.w, "Removing repository directly from gitaly nodes...") cmd.removeRepositoryForEachGitaly(ctx, cfg, logger) @@ -249,49 +240,6 @@ func (cmd *removeRepository) removeRepository(ctx context.Context, repo *gitalyp return true, nil } -func (cmd *removeRepository) removeReplicationEvents(ctx context.Context, logger logrus.FieldLogger, db *sql.DB, ticker helper.Ticker) error { - // Wait for the completion of the repository replication jobs. - // As some of them could be a repository creation jobs we need to remove those newly created - // repositories after replication finished. - start := time.Now() - var tick helper.Ticker - for found := true; found; { - if tick != nil { - tick.Reset() - <-tick.C() - } else { - tick = ticker - } - - if int(time.Since(start).Seconds())%5 == 0 { - logger.Debug("awaiting for the repository in_progress replication jobs to complete...") - } - row := db.QueryRowContext( - ctx, - `WITH remove_replication_jobs AS ( - DELETE FROM replication_queue - WHERE job->>'virtual_storage' = $1 - AND job->>'relative_path' = $2 - -- Do not remove ongoing replication events as we need to wait - -- for their completion. - AND state != 'in_progress' - ) - SELECT EXISTS( - SELECT - FROM replication_queue - WHERE job->>'virtual_storage' = $1 - AND job->>'relative_path' = $2 - AND state = 'in_progress')`, - cmd.virtualStorage, - cmd.relativePath, - ) - if err := row.Scan(&found); err != nil { - return fmt.Errorf("scan in progress jobs: %w", err) - } - } - return nil -} - func (cmd *removeRepository) removeNode( ctx context.Context, logger logrus.FieldLogger, diff --git a/cmd/praefect/subcmd_remove_repository_test.go b/cmd/praefect/subcmd_remove_repository_test.go index 49bc60fd743f76d21460340af4e2b73c73b2fc69..fd4799071834524d876619d30662c01674357477 100644 --- a/cmd/praefect/subcmd_remove_repository_test.go +++ b/cmd/praefect/subcmd_remove_repository_test.go @@ -16,7 +16,6 @@ import ( "github.com/stretchr/testify/require" "gitlab.com/gitlab-org/gitaly/v14/client" "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/service/setup" - "gitlab.com/gitlab-org/gitaly/v14/internal/helper" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/config" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper" @@ -266,106 +265,3 @@ func requireNoDatabaseInfo(t *testing.T, db testdb.DB, cmd *removeRepository) { ).Scan(&storageRowExists)) require.False(t, storageRowExists) } - -func TestRemoveRepository_removeReplicationEvents(t *testing.T) { - t.Parallel() - const ( - virtualStorage = "praefect" - relativePath = "relative_path/to/repo.git" - ) - ctx := testhelper.Context(t) - - db := testdb.New(t) - - queue := datastore.NewPostgresReplicationEventQueue(db) - - // Set replication event in_progress. - inProgressEvent, err := queue.Enqueue(ctx, datastore.ReplicationEvent{ - Job: datastore.ReplicationJob{ - Change: datastore.CreateRepo, - VirtualStorage: virtualStorage, - TargetNodeStorage: "gitaly-2", - RelativePath: relativePath, - }, - }) - require.NoError(t, err) - inProgress1, err := queue.Dequeue(ctx, virtualStorage, "gitaly-2", 10) - require.NoError(t, err) - require.Len(t, inProgress1, 1) - - // New event - events in the 'ready' state should be removed. 
- _, err = queue.Enqueue(ctx, datastore.ReplicationEvent{ - Job: datastore.ReplicationJob{ - Change: datastore.UpdateRepo, - VirtualStorage: virtualStorage, - TargetNodeStorage: "gitaly-3", - SourceNodeStorage: "gitaly-1", - RelativePath: relativePath, - }, - }) - require.NoError(t, err) - - // Failed event - should be removed as well. - failedEvent, err := queue.Enqueue(ctx, datastore.ReplicationEvent{ - Job: datastore.ReplicationJob{ - Change: datastore.UpdateRepo, - VirtualStorage: virtualStorage, - TargetNodeStorage: "gitaly-4", - SourceNodeStorage: "gitaly-0", - RelativePath: relativePath, - }, - }) - require.NoError(t, err) - inProgress2, err := queue.Dequeue(ctx, virtualStorage, "gitaly-4", 10) - require.NoError(t, err) - require.Len(t, inProgress2, 1) - // Acknowledge with failed status, so it will remain in the database for the next processing - // attempt or until it is deleted by the 'removeReplicationEvents' method. - acks2, err := queue.Acknowledge(ctx, datastore.JobStateFailed, []uint64{inProgress2[0].ID}) - require.NoError(t, err) - require.Equal(t, []uint64{inProgress2[0].ID}, acks2) - - ticker := helper.NewManualTicker() - defer ticker.Stop() - - errChan := make(chan error, 1) - go func() { - cmd := &removeRepository{virtualStorage: virtualStorage, relativePath: relativePath} - errChan <- cmd.removeReplicationEvents(ctx, testhelper.NewDiscardingLogger(t), db.DB, ticker) - }() - - ticker.Tick() - ticker.Tick() - ticker.Tick() // blocks until previous tick is consumed - - // Now we acknowledge in_progress job, so it stops the processing loop or the command. - acks, err := queue.Acknowledge(ctx, datastore.JobStateCompleted, []uint64{inProgressEvent.ID}) - if assert.NoError(t, err) { - assert.Equal(t, []uint64{inProgress1[0].ID}, acks) - } - - ticker.Tick() - - timeout := time.After(time.Minute) - for checkChan, exists := errChan, true; exists; { - select { - case err := <-checkChan: - require.NoError(t, err) - close(errChan) - checkChan = nil - case <-timeout: - require.FailNow(t, "timeout reached, looks like the command hasn't made any progress") - case <-time.After(50 * time.Millisecond): - // Wait until job removed - row := db.QueryRow(`SELECT EXISTS(SELECT FROM replication_queue WHERE id = $1)`, failedEvent.ID) - require.NoError(t, row.Scan(&exists)) - } - } - // Once there are no in_progress jobs anymore the method returns. 
- require.NoError(t, <-errChan) - - var notExists bool - row := db.QueryRow(`SELECT NOT EXISTS(SELECT FROM replication_queue)`) - require.NoError(t, row.Scan(¬Exists)) - require.True(t, notExists) -} diff --git a/internal/praefect/coordinator_pg_test.go b/internal/praefect/coordinator_pg_test.go index e35da6b5d4be7d1b8bdcad1992dae7458bc086b9..c13fe0570a51461f5c178c7515ca01baa433dd34 100644 --- a/internal/praefect/coordinator_pg_test.go +++ b/internal/praefect/coordinator_pg_test.go @@ -191,18 +191,11 @@ func TestStreamDirectorMutator_Transaction(t *testing.T) { txMgr := transactions.NewManager(conf) - tx := db.Begin(t) - defer tx.Rollback(t) - // set up the generations prior to transaction - rs := datastore.NewPostgresRepositoryStore(tx, conf.StorageNames()) + rs := datastore.NewPostgresRepositoryStore(db, conf.StorageNames()) repoCreated := false for i, n := range tc.nodes { - if n.generation == datastore.GenerationUnknown { - continue - } - if !repoCreated { repoCreated = true require.NoError(t, rs.CreateRepository(ctx, 1, repo.StorageName, repo.RelativePath, repo.RelativePath, storageNodes[i].Storage, nil, nil, true, false)) @@ -211,7 +204,7 @@ func TestStreamDirectorMutator_Transaction(t *testing.T) { require.NoError(t, rs.SetGeneration(ctx, 1, storageNodes[i].Storage, repo.RelativePath, n.generation)) } - testdb.SetHealthyNodes(t, ctx, tx, map[string]map[string][]string{"praefect": conf.StorageNames()}) + testdb.SetHealthyNodes(t, ctx, db, map[string]map[string][]string{"praefect": conf.StorageNames()}) nodeSet, err := DialNodes( ctx, @@ -229,11 +222,11 @@ func TestStreamDirectorMutator_Transaction(t *testing.T) { rs, NewPerRepositoryRouter( nodeSet.Connections(), - nodes.NewPerRepositoryElector(tx), + nodes.NewPerRepositoryElector(db), StaticHealthChecker(conf.StorageNames()), NewLockedRandom(rand.New(rand.NewSource(0))), rs, - datastore.NewAssignmentStore(tx, conf.StorageNames()), + datastore.NewAssignmentStore(db, conf.StorageNames()), rs, nil, ), @@ -315,8 +308,6 @@ func TestStreamDirectorMutator_Transaction(t *testing.T) { require.Equal(t, n.expectedGeneration, gen, "node %d has wrong generation", i) } - tx.Commit(t) - replicationWaitGroup.Wait() for i, node := range tc.nodes { diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go index 4749b3acbbdfda093c64ce54eb79c1170ce6b662..0bb224d3d3f222f570f87e263df337acbdae10ee 100644 --- a/internal/praefect/coordinator_test.go +++ b/internal/praefect/coordinator_test.go @@ -189,7 +189,7 @@ func TestStreamDirectorMutator(t *testing.T) { } testdb.SetHealthyNodes(t, ctx, tx, map[string]map[string][]string{"praefect": conf.StorageNames()}) - queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(db)) + queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(tx)) queueInterceptor.OnEnqueue(func(ctx context.Context, event datastore.ReplicationEvent, queue datastore.ReplicationEventQueue) (datastore.ReplicationEvent, error) { assert.True(t, len(queueInterceptor.GetEnqueued()) < 2, "expected only one event to be created") return queue.Enqueue(ctx, event) @@ -322,8 +322,13 @@ func TestStreamDirectorMutator_StopTransaction(t *testing.T) { txMgr := transactions.NewManager(conf) + db := testdb.New(t) + require.NoError(t, datastore.NewPostgresRepositoryStore(db, conf.StorageNames()). 
+ CreateRepository(ctx, 1, repo.GetStorageName(), repo.GetRelativePath(), repo.GetRelativePath(), "primary", []string{"secondary"}, nil, true, false), + ) + coordinator := NewCoordinator( - datastore.NewPostgresReplicationEventQueue(testdb.New(t)), + datastore.NewPostgresReplicationEventQueue(db), rs, NewNodeManagerRouter(nodeMgr, rs), txMgr, @@ -869,7 +874,7 @@ func TestStreamDirector_repo_creation(t *testing.T) { assert.Equal(t, []string{unhealthySecondaryNode.Storage}, outdatedSecondaries) assert.Equal(t, tc.primaryStored, storePrimary) assert.Equal(t, tc.assignmentsStored, storeAssignments) - return nil + return datastore.NewPostgresRepositoryStore(db, conf.StorageNames()).CreateRepository(ctx, repoID, virtualStorage, relativePath, replicaPath, primary, updatedSecondaries, outdatedSecondaries, storePrimary, storeAssignments) }, } @@ -1078,7 +1083,8 @@ func TestAbsentCorrelationID(t *testing.T) { }, } - queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(testdb.New(t))) + db := testdb.New(t) + queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(db)) queueInterceptor.OnEnqueue(func(ctx context.Context, event datastore.ReplicationEvent, queue datastore.ReplicationEventQueue) (datastore.ReplicationEvent, error) { assert.True(t, len(queueInterceptor.GetEnqueued()) < 2, "expected only one event to be created") return queue.Enqueue(ctx, event) @@ -1097,7 +1103,7 @@ func TestAbsentCorrelationID(t *testing.T) { defer nodeMgr.Stop() txMgr := transactions.NewManager(conf) - rs := datastore.MockRepositoryStore{} + rs := datastore.NewPostgresRepositoryStore(db, conf.StorageNames()) coordinator := NewCoordinator( queueInterceptor, @@ -1553,7 +1559,14 @@ func TestCoordinator_grpcErrorHandling(t *testing.T) { }) } + db := testdb.New(t) + queue := datastore.NewPostgresReplicationEventQueue(db) + require.NoError(t, datastore.NewPostgresRepositoryStore(db, praefectConfig.StorageNames()). + CreateRepository(ctx, 1, testhelper.DefaultStorageName, repoProto.GetRelativePath(), repoProto.GetRelativePath(), repoProto.GetStorageName(), nil, nil, true, false), + ) + praefectConn, _, cleanup := runPraefectServer(t, ctx, praefectConfig, buildOptions{ + withQueue: queue, // Set up a mock manager which sets up primary/secondaries and pretends that all nodes are // healthy. We need fixed roles and unhealthy nodes will not take part in transactions. withNodeMgr: &nodes.MockManager{ @@ -1569,7 +1582,7 @@ func TestCoordinator_grpcErrorHandling(t *testing.T) { }, nil }, }, - // Set up a mock repsoitory store pretending that all nodes are consistent. Only consistent + // Set up a mock repository store pretending that all nodes are consistent. Only consistent // nodes will take part in transactions. 
withRepoStore: datastore.MockRepositoryStore{ GetReplicaPathFunc: func(ctx context.Context, repositoryID int64) (string, error) { diff --git a/internal/praefect/datastore/collector.go b/internal/praefect/datastore/collector.go index 0b93bd760988e79713303781bb55962942b929ae..c98dcfac16add1b83bce5c205deebadc44c21046 100644 --- a/internal/praefect/datastore/collector.go +++ b/internal/praefect/datastore/collector.go @@ -148,9 +148,9 @@ func (q *QueueDepthCollector) Collect(ch chan<- prometheus.Metric) { defer cancel() rows, err := q.db.QueryContext(ctx, ` -SELECT job->>'virtual_storage', job->>'target_node_storage', state, COUNT(*) +SELECT virtual_storage, target_node_storage, state, COUNT(*) FROM replication_queue -GROUP BY job->>'virtual_storage', job->>'target_node_storage', state +GROUP BY virtual_storage, target_node_storage, state `) if err != nil { q.log.WithError(err).Error("failed to query queue depth metrics") diff --git a/internal/praefect/datastore/collector_test.go b/internal/praefect/datastore/collector_test.go index 3091c83219b0eef1e64f63447bfef5db90eafb48..d59f2112fdd16623bfdded413b56c9078164235e 100644 --- a/internal/praefect/datastore/collector_test.go +++ b/internal/praefect/datastore/collector_test.go @@ -269,15 +269,17 @@ func TestRepositoryStoreCollector_ReplicationQueueDepth(t *testing.T) { readyJobs := 5 for virtualStorage, nodes := range storageNames { for i := 0; i < readyJobs; i++ { + job := ReplicationJob{ + Change: UpdateRepo, + RelativePath: "/project/path-1", + TargetNodeStorage: nodes[1], + SourceNodeStorage: nodes[0], + VirtualStorage: virtualStorage, + Params: nil, + } + insertRepository(t, db, ctx, job.VirtualStorage, job.RelativePath, job.SourceNodeStorage) _, err := queue.Enqueue(ctx, ReplicationEvent{ - Job: ReplicationJob{ - Change: UpdateRepo, - RelativePath: "/project/path-1", - TargetNodeStorage: nodes[1], - SourceNodeStorage: nodes[0], - VirtualStorage: virtualStorage, - Params: nil, - }, + Job: job, }) require.NoError(t, err) } diff --git a/internal/praefect/datastore/glsql/postgres.go b/internal/praefect/datastore/glsql/postgres.go index 24c4093d6307ca1306d04a714643599f167ed85d..381ba2562383dffdfc52f2451d9609d0e50db10d 100644 --- a/internal/praefect/datastore/glsql/postgres.go +++ b/internal/praefect/datastore/glsql/postgres.go @@ -299,6 +299,13 @@ func IsUniqueViolation(err error, constraint string) bool { return isPgError(err, "23505", []errorCondition{withConstraintName(constraint)}) } +// IsForeignKeyViolation returns true if an error is a foreign key violation. 
+func IsForeignKeyViolation(err error, constraint string) bool {
+	// https://www.postgresql.org/docs/11/errcodes-appendix.html
+	// foreign_key_violation
+	return isPgError(err, "23503", []errorCondition{withConstraintName(constraint)})
+}
+
 func isPgError(err error, code string, conditions []errorCondition) bool {
 	var pgErr *pgconn.PgError
 	if errors.As(err, &pgErr) && pgErr.Code == code {
diff --git a/internal/praefect/datastore/glsql/postgres_test.go b/internal/praefect/datastore/glsql/postgres_test.go
index 039145a5fd3b86f554e2e0779c95c5a565d1416e..8a1b49ffbfce00a39c817d57fe00dbf5097433b5 100644
--- a/internal/praefect/datastore/glsql/postgres_test.go
+++ b/internal/praefect/datastore/glsql/postgres_test.go
@@ -324,6 +324,52 @@ func TestIsUniqueViolation(t *testing.T) {
 	}
 }

+func TestIsForeignKeyViolation(t *testing.T) {
+	for _, tc := range []struct {
+		desc   string
+		err    error
+		constr string
+		exp    bool
+	}{
+		{
+			desc: "nil input",
+			err:  nil,
+			exp:  false,
+		},
+		{
+			desc: "wrong error type",
+			err:  assert.AnError,
+			exp:  false,
+		},
+		{
+			desc: "wrong code",
+			err:  &pgconn.PgError{Code: "stub"},
+			exp:  false,
+		},
+		{
+			desc: "foreign key violation",
+			err:  &pgconn.PgError{Code: "23503"},
+			exp:  true,
+		},
+		{
+			desc: "wrapped foreign key violation",
+			err:  fmt.Errorf("stub: %w", &pgconn.PgError{Code: "23503"}),
+			exp:  true,
+		},
+		{
+			desc:   "foreign key violation with accepted conditions",
+			err:    &pgconn.PgError{Code: "23503", ConstraintName: "cname"},
+			constr: "cname",
+			exp:    true,
+		},
+	} {
+		t.Run(tc.desc, func(t *testing.T) {
+			res := glsql.IsForeignKeyViolation(tc.err, tc.constr)
+			require.Equal(t, tc.exp, res)
+		})
+	}
+}
+
 func TestMigrateSome(t *testing.T) {
 	db := testdb.New(t)
 	dbCfg := testdb.GetConfig(t, db.Name)
diff --git a/internal/praefect/datastore/migrations/20220207143330_flatten_queue_job.go b/internal/praefect/datastore/migrations/20220207143330_flatten_queue_job.go
new file mode 100644
index 0000000000000000000000000000000000000000..57a2e5688503c0d0ae42dab327c95296ec721ffa
--- /dev/null
+++ b/internal/praefect/datastore/migrations/20220207143330_flatten_queue_job.go
@@ -0,0 +1,73 @@
+package migrations
+
+import migrate "github.com/rubenv/sql-migrate"
+
+func init() {
+	m := &migrate.Migration{
+		Id: "20220207143330_flatten_queue_job",
+		Up: []string{
+			`CREATE TYPE REPLICATION_JOB_TYPE AS ENUM('update','create','delete','delete_replica','rename','gc','repack_full','repack_incremental','cleanup','pack_refs','write_commit_graph','midx_repack','optimize_repository', 'prune_unreachable_objects')`,
+
+			// The repository_id, replica_path and source_node_storage columns could be
+			// NULL for existing events, but because non-pointer receivers are used to
+			// scan them in the Go code, we must make sure NULL is never returned from
+			// the database, as the scan would fail otherwise.
+			`ALTER TABLE replication_queue
+				ADD COLUMN change REPLICATION_JOB_TYPE,
+				ADD COLUMN repository_id BIGINT NOT NULL DEFAULT 0,
+				ADD COLUMN replica_path TEXT NOT NULL DEFAULT '',
+				ADD COLUMN relative_path TEXT,
+				ADD COLUMN target_node_storage TEXT,
+				ADD COLUMN source_node_storage TEXT NOT NULL DEFAULT '',
+				ADD COLUMN virtual_storage TEXT,
+				ADD COLUMN params JSONB`,
+
+			`-- +migrate StatementBegin
+			CREATE OR REPLACE FUNCTION replication_queue_flatten_job() RETURNS TRIGGER AS $$
+			BEGIN
+				NEW.change := (NEW.job->>'change')::REPLICATION_JOB_TYPE;
+				NEW.repository_id := COALESCE(
+					-- For the old jobs the repository_id field may be 'null' or not set at all.
+ -- repository_id field could have 0 if event was created by reconciler before + -- repositories table was populated with valid repository_id. + CASE WHEN (NEW.job->>'repository_id')::BIGINT = 0 THEN NULL ELSE (NEW.job->>'repository_id')::BIGINT END, + (SELECT repositories.repository_id FROM repositories WHERE repositories.virtual_storage = NEW.job->>'virtual_storage' AND repositories.relative_path = NEW.job->>'relative_path'), + 0 + ); + -- The reconciler doesn't populate replica_path field that is why we need make sure + -- we have at least an empty value for the column, not to break scan operations. + NEW.replica_path := COALESCE(NEW.job->>'replica_path', ''); + NEW.relative_path := NEW.job->>'relative_path'; + NEW.target_node_storage := NEW.job->>'target_node_storage'; + NEW.source_node_storage := COALESCE(NEW.job->>'source_node_storage', ''); + NEW.virtual_storage := NEW.job->>'virtual_storage'; + NEW.params := (NEW.job->>'params')::JSONB; + RETURN NEW; + END; + $$ LANGUAGE plpgsql; + -- +migrate StatementEnd`, + + `CREATE TRIGGER replication_queue_flatten_job BEFORE INSERT ON replication_queue + FOR EACH ROW EXECUTE PROCEDURE replication_queue_flatten_job()`, + }, + Down: []string{ + `DROP TRIGGER replication_queue_flatten_job ON replication_queue`, + + `DROP FUNCTION replication_queue_flatten_job`, + + `ALTER TABLE replication_queue + DROP COLUMN change, + DROP COLUMN repository_id, + DROP COLUMN replica_path, + DROP COLUMN relative_path, + DROP COLUMN target_node_storage, + DROP COLUMN source_node_storage, + DROP COLUMN virtual_storage, + DROP COLUMN params`, + + `DROP TYPE REPLICATION_JOB_TYPE`, + }, + } + + allMigrations = append(allMigrations, m) +} diff --git a/internal/praefect/datastore/migrations/20220211161203_json_job_to_columns.go b/internal/praefect/datastore/migrations/20220211161203_json_job_to_columns.go new file mode 100644 index 0000000000000000000000000000000000000000..0336ae058130049cdb8b3594498eab67d9e576e2 --- /dev/null +++ b/internal/praefect/datastore/migrations/20220211161203_json_job_to_columns.go @@ -0,0 +1,140 @@ +package migrations + +import migrate "github.com/rubenv/sql-migrate" + +func init() { + m := &migrate.Migration{ + Id: "20220211161203_json_job_to_columns", + Up: []string{ + // 1. The replication_queue was extended with set of the new columns to cover + // job JSON struct. This query update all existing rows to fulfil newly + // added columns with data from the job column. + `UPDATE replication_queue SET + change = (queue.job->>'change')::REPLICATION_JOB_TYPE, + repository_id = COALESCE( + -- For the old jobs the repository_id may be 'null' as we didn't have migration that + -- populates those with the value from the repositories table. + -- The repository_id also may be a 0 in case the job was created before repositories table was fulfilled + -- and event was created by reconciler. + CASE WHEN (queue.job->>'repository_id')::BIGINT = 0 THEN NULL ELSE (queue.job->>'repository_id')::BIGINT END, + (SELECT repositories.repository_id FROM repositories WHERE repositories.virtual_storage = queue.job->>'virtual_storage' AND repositories.relative_path = queue.job->>'relative_path'), + -- If repository_id doesn't exist we still need to fill this column otherwise it may fail on scan into struct. 
+					0),
+				replica_path = COALESCE(queue.job->>'replica_path', ''),
+				relative_path = queue.job->>'relative_path',
+				target_node_storage = queue.job->>'target_node_storage',
+				source_node_storage = COALESCE(queue.job->>'source_node_storage', ''),
+				virtual_storage = queue.job->>'virtual_storage',
+				params = (queue.job->>'params')::JSONB
+			FROM replication_queue AS queue
+			LEFT JOIN repositories ON
+				queue.job->>'virtual_storage' = repositories.virtual_storage AND
+				queue.job->>'relative_path' = repositories.relative_path
+			WHERE replication_queue.id = queue.id`,
+
+			// 2. Drop the existing foreign key as it does nothing on removal of the referenced row.
+			`ALTER TABLE replication_queue_job_lock
+				DROP CONSTRAINT replication_queue_job_lock_job_id_fkey`,
+
+			// 3. And re-create it with cascading deletion.
+			`ALTER TABLE replication_queue_job_lock
+				ADD CONSTRAINT replication_queue_job_lock_job_id_fkey
+				FOREIGN KEY (job_id)
+				REFERENCES replication_queue(id)
+				ON DELETE CASCADE`,
+
+			// 4. Drop the existing foreign key as it does nothing on removal of the referenced row.
+			`ALTER TABLE replication_queue_job_lock DROP CONSTRAINT replication_queue_job_lock_lock_id_fkey`,
+
+			// 5. And re-create it with cascading deletion.
+			`ALTER TABLE replication_queue_job_lock
+				ADD CONSTRAINT replication_queue_job_lock_lock_id_fkey
+				FOREIGN KEY (lock_id)
+				REFERENCES replication_queue_lock(id)
+				ON DELETE CASCADE`,
+
+			// 6. The replication events without a repository_id are too old and should be
+			// removed so that the foreign key constraint on the repositories table can be created.
+			`DELETE FROM replication_queue
+			WHERE repository_id = 0`,
+
+			// 7. In case we removed some in_progress replication events we also need to remove
+			// the corresponding rows from replication_queue_lock, because the repository doesn't
+			// exist anymore and we don't need a lock row for it.
+			`DELETE FROM replication_queue_lock
+			WHERE acquired
+			AND NOT EXISTS(SELECT FROM replication_queue_job_lock WHERE lock_id = id)`,
+
+			// 8. Once the repository is removed (its row is deleted from the repositories table)
+			// it can't be used anymore and there is no reason to process any remaining
+			// replication events. If a gitaly node was out of service and the repository
+			// deletion was scheduled as a replication job, that job will be removed as well,
+			// but https://gitlab.com/gitlab-org/gitaly/-/issues/3719 should deal with it.
+			// To automatically clean up the remaining replication events we create a foreign
+			// key with cascading removal.
+			`ALTER TABLE replication_queue
+				ADD CONSTRAINT replication_queue_repository_id_fkey
+				FOREIGN KEY (repository_id)
+				REFERENCES repositories(repository_id)
+				ON DELETE CASCADE`,
+
+			// 9. If a repository is removed, nothing cleans up the replication_queue_lock
+			// table. The table is used to synchronize runs of the replication jobs, and its rows
+			// are created before the rows in the replication_queue table, which is why we can't
+			// use a foreign key to remove orphaned lock rows. Instead we rely on a trigger that
+			// removes rows from replication_queue_lock once the record is removed from the repositories table.
+			`-- +migrate StatementBegin
+			CREATE FUNCTION remove_queue_lock_on_repository_removal() RETURNS TRIGGER AS $$
+			BEGIN
+				DELETE FROM replication_queue_lock
+				WHERE id LIKE (OLD.virtual_storage || '|%|' || OLD.relative_path);
+				RETURN NULL;
+			END;
+			$$ LANGUAGE plpgsql;`,
+
+			// 10. Activate the trigger that removes rows from the replication_queue_lock once the row
+			// is deleted from the repositories table.
+ `CREATE TRIGGER remove_queue_lock_on_repository_removal AFTER DELETE ON repositories + FOR EACH ROW EXECUTE PROCEDURE remove_queue_lock_on_repository_removal()`, + }, + Down: []string{ + // 10. + `DROP TRIGGER remove_queue_lock_on_repository_removal ON repositories`, + + // 9. + `DROP FUNCTION remove_queue_lock_on_repository_removal`, + + // 8. + `ALTER TABLE replication_queue + DROP CONSTRAINT replication_queue_repository_id_fkey`, + + // 7. We can't restore deleted rows, nothing to do here. + + // 6. We can't restore deleted rows, nothing to do here. + + // 5. + `ALTER TABLE replication_queue_job_lock DROP CONSTRAINT replication_queue_job_lock_lock_id_fkey`, + + // 4. Re-create foreign key with the default options. + `ALTER TABLE replication_queue_job_lock + ADD CONSTRAINT replication_queue_job_lock_lock_id_fkey + FOREIGN KEY (lock_id) + REFERENCES replication_queue_lock(id)`, + + // 3. + `ALTER TABLE replication_queue_job_lock + DROP CONSTRAINT replication_queue_job_lock_job_id_fkey`, + + // 2. + `ALTER TABLE replication_queue_job_lock + ADD CONSTRAINT replication_queue_job_lock_job_id_fkey + FOREIGN KEY (job_id) + REFERENCES replication_queue(id)`, + + // 1. We don't know what is the set of rows we updated that is why we can't reset + // them back, so nothing to do here. + }, + } + + allMigrations = append(allMigrations, m) +} diff --git a/internal/praefect/datastore/migrations/20220216131417_indexes_on_replication_queue.go b/internal/praefect/datastore/migrations/20220216131417_indexes_on_replication_queue.go new file mode 100644 index 0000000000000000000000000000000000000000..4d19628d6c3d2d5c0cda88db9903ff84b8f57f50 --- /dev/null +++ b/internal/praefect/datastore/migrations/20220216131417_indexes_on_replication_queue.go @@ -0,0 +1,53 @@ +package migrations + +import migrate "github.com/rubenv/sql-migrate" + +func init() { + m := &migrate.Migration{ + Id: "20220216131417_indexes_on_replication_queue", + DisableTransactionUp: true, + Up: []string{ + `DROP INDEX replication_queue_target_index`, + `CREATE INDEX CONCURRENTLY replication_queue_target_index + ON replication_queue (virtual_storage, relative_path, target_node_storage, change) + WHERE state NOT IN ('completed', 'cancelled', 'dead')`, + + `DROP INDEX delete_replica_unique_index`, + `CREATE UNIQUE INDEX CONCURRENTLY delete_replica_unique_index + ON replication_queue (virtual_storage, relative_path) + WHERE state NOT IN ('completed', 'cancelled', 'dead') + AND change = 'delete_replica'`, + + `DROP INDEX IF EXISTS virtual_target_on_replication_queue_idx`, + `CREATE INDEX CONCURRENTLY virtual_target_on_replication_queue_idx + ON replication_queue USING BTREE (virtual_storage, target_node_storage)`, + }, + DisableTransactionDown: true, + Down: []string{ + `DROP INDEX replication_queue_target_index`, + `CREATE INDEX CONCURRENTLY replication_queue_target_index + ON replication_queue ( + (job->>'virtual_storage'), + (job->>'relative_path'), + (job->>'target_node_storage'), + (job->>'change') + ) + WHERE state NOT IN ('completed', 'cancelled', 'dead')`, + + `DROP INDEX delete_replica_unique_index`, + `CREATE UNIQUE INDEX CONCURRENTLY delete_replica_unique_index + ON replication_queue ( + (job->>'virtual_storage'), + (job->>'relative_path') + ) + WHERE state NOT IN ('completed', 'cancelled', 'dead') + AND job->>'change' = 'delete_replica'`, + + `DROP INDEX IF EXISTS virtual_target_on_replication_queue_idx`, + `CREATE INDEX CONCURRENTLY virtual_target_on_replication_queue_idx + ON replication_queue USING BTREE 
((job->>'virtual_storage'), (job->>'target_node_storage'))`, + }, + } + + allMigrations = append(allMigrations, m) +} diff --git a/internal/praefect/datastore/migrations/20220216140151_valid_primaries_flatten_job.go b/internal/praefect/datastore/migrations/20220216140151_valid_primaries_flatten_job.go new file mode 100644 index 0000000000000000000000000000000000000000..2ef001033c355a476f0edaf3f645bd89a12e1d8d --- /dev/null +++ b/internal/praefect/datastore/migrations/20220216140151_valid_primaries_flatten_job.go @@ -0,0 +1,65 @@ +package migrations + +import migrate "github.com/rubenv/sql-migrate" + +func init() { + m := &migrate.Migration{ + Id: "20220216140151_valid_primaries_flatten_job", + Up: []string{ + ` +CREATE OR REPLACE VIEW valid_primaries AS + SELECT repository_id, virtual_storage, relative_path, storage + FROM ( + SELECT + repository_id, + repositories.virtual_storage, + repositories.relative_path, + storage, + repository_assignments.storage IS NOT NULL + OR bool_and(repository_assignments.storage IS NULL) OVER (PARTITION BY repository_id) AS eligible + FROM repositories + JOIN (SELECT repository_id, storage, generation FROM storage_repositories) AS storage_repositories USING (repository_id, generation) + JOIN healthy_storages USING (virtual_storage, storage) + LEFT JOIN repository_assignments USING (repository_id, storage) + WHERE NOT EXISTS ( + SELECT FROM replication_queue AS queue + WHERE queue.state NOT IN ('completed', 'dead', 'cancelled') + AND queue.change = 'delete_replica' + AND queue.repository_id = storage_repositories.repository_id + AND queue.target_node_storage = storage_repositories.storage + ) + ) AS candidates + WHERE eligible + `, + }, + Down: []string{ + ` +CREATE OR REPLACE VIEW valid_primaries AS + SELECT repository_id, virtual_storage, relative_path, storage + FROM ( + SELECT + repository_id, + repositories.virtual_storage, + repositories.relative_path, + storage, + repository_assignments.storage IS NOT NULL + OR bool_and(repository_assignments.storage IS NULL) OVER (PARTITION BY repository_id) AS eligible + FROM repositories + JOIN (SELECT repository_id, storage, generation FROM storage_repositories) AS storage_repositories USING (repository_id, generation) + JOIN healthy_storages USING (virtual_storage, storage) + LEFT JOIN repository_assignments USING (repository_id, storage) + WHERE NOT EXISTS ( + SELECT FROM replication_queue AS queue + WHERE queue.state NOT IN ('completed', 'dead', 'cancelled') + AND queue.job->>'change' = 'delete_replica' + AND (queue.job->>'repository_id')::bigint = storage_repositories.repository_id + AND queue.job->>'target_node_storage' = storage_repositories.storage + ) + ) AS candidates + WHERE eligible + `, + }, + } + + allMigrations = append(allMigrations, m) +} diff --git a/internal/praefect/datastore/queue.go b/internal/praefect/datastore/queue.go index 986564b1894a5ddae3199d8fad60c213331b7335..65948426a9606efcae1a81c4ccf2a41293f2dc79 100644 --- a/internal/praefect/datastore/queue.go +++ b/internal/praefect/datastore/queue.go @@ -101,50 +101,8 @@ type ReplicationEvent struct { Meta Params } -// Mapping returns list of references to the struct fields that correspond to the SQL columns/column aliases. 
-func (event *ReplicationEvent) Mapping(columns []string) ([]interface{}, error) { - var mapping []interface{} - for _, column := range columns { - switch column { - case "id": - mapping = append(mapping, &event.ID) - case "state": - mapping = append(mapping, &event.State) - case "created_at": - mapping = append(mapping, &event.CreatedAt) - case "updated_at": - mapping = append(mapping, &event.UpdatedAt) - case "attempt": - mapping = append(mapping, &event.Attempt) - case "lock_id": - mapping = append(mapping, &event.LockID) - case "job": - mapping = append(mapping, &event.Job) - case "meta": - mapping = append(mapping, &event.Meta) - default: - return nil, fmt.Errorf("unknown column specified in SELECT statement: %q", column) - } - } - return mapping, nil -} - -// Scan fills receive fields with values fetched from database based on the set of columns/column aliases. -func (event *ReplicationEvent) Scan(columns []string, rows *sql.Rows) error { - mappings, err := event.Mapping(columns) - if err != nil { - return err - } - return rows.Scan(mappings...) -} - // scanReplicationEvents reads all rows and convert them into structs filling all the fields according to fetched columns/column aliases. func scanReplicationEvents(rows *sql.Rows) (events []ReplicationEvent, err error) { - columns, err := rows.Columns() - if err != nil { - return events, err - } - defer func() { if cErr := rows.Close(); cErr != nil && err == nil { err = cErr @@ -153,9 +111,29 @@ func scanReplicationEvents(rows *sql.Rows) (events []ReplicationEvent, err error for rows.Next() { var event ReplicationEvent - if err = event.Scan(columns, rows); err != nil { + var srcNodeStorage sql.NullString + if err = rows.Scan( + &event.ID, + &event.State, + &event.CreatedAt, + &event.UpdatedAt, + &event.LockID, + &event.Attempt, + &event.Meta, + &event.Job.Change, + &event.Job.RepositoryID, + &event.Job.ReplicaPath, + &event.Job.RelativePath, + &event.Job.TargetNodeStorage, + &srcNodeStorage, + &event.Job.VirtualStorage, + &event.Job.Params, + ); err != nil { return events, err } + if srcNodeStorage.Valid { + event.Job.SourceNodeStorage = srcNodeStorage.String + } events = append(events, event) } @@ -226,7 +204,9 @@ func (rq PostgresReplicationEventQueue) Enqueue(ctx context.Context, event Repli INSERT INTO replication_queue(lock_id, job, meta) SELECT insert_lock.id, $4, $5 FROM insert_lock - RETURNING id, state, created_at, updated_at, lock_id, attempt, job, meta` + RETURNING id, state, created_at, updated_at, lock_id, attempt, meta, change, + repository_id, replica_path, relative_path, target_node_storage, + source_node_storage, virtual_storage, params` // this will always return a single row result (because of lock uniqueness) or an error rows, err := rq.qc.QueryContext(ctx, query, event.Job.VirtualStorage, event.Job.TargetNodeStorage, event.Job.RelativePath, event.Job, event.Meta) if err != nil { @@ -275,7 +255,7 @@ func (rq PostgresReplicationEventQueue) Dequeue(ctx context.Context, virtualStor SELECT id FROM replication_queue WHERE id IN ( - SELECT DISTINCT FIRST_VALUE(queue.id) OVER (PARTITION BY lock_id, job->>'change' ORDER BY queue.created_at) + SELECT DISTINCT FIRST_VALUE(queue.id) OVER (PARTITION BY lock_id, change ORDER BY queue.created_at) FROM replication_queue AS queue JOIN lock ON queue.lock_id = lock.id WHERE queue.state IN ('ready', 'failed' ) @@ -287,12 +267,16 @@ func (rq PostgresReplicationEventQueue) Dequeue(ctx context.Context, virtualStor ) , job AS ( UPDATE replication_queue AS queue - SET attempt = CASE WHEN 
job->>'change' = 'delete_replica' THEN queue.attempt ELSE queue.attempt - 1 END + SET attempt = CASE WHEN queue.change = 'delete_replica' THEN queue.attempt ELSE queue.attempt - 1 END , state = 'in_progress' , updated_at = NOW() AT TIME ZONE 'UTC' FROM candidate WHERE queue.id = candidate.id - RETURNING queue.id, queue.state, queue.created_at, queue.updated_at, queue.lock_id, queue.attempt, queue.job, queue.meta + RETURNING queue.id, queue.state, queue.created_at, queue.updated_at, + queue.lock_id, queue.attempt, queue.meta, queue.change, + queue.repository_id, queue.replica_path, queue.relative_path, + queue.target_node_storage, queue.source_node_storage, queue.virtual_storage, + queue.params ) , track_job_lock AS ( INSERT INTO replication_queue_job_lock (job_id, lock_id, triggered_at) @@ -306,7 +290,9 @@ func (rq PostgresReplicationEventQueue) Dequeue(ctx context.Context, virtualStor FROM track_job_lock AS tracked WHERE lock.id = tracked.lock_id ) - SELECT id, state, created_at, updated_at, lock_id, attempt, job, meta + SELECT id, state, created_at, updated_at, lock_id, attempt, meta, change, + repository_id, replica_path, relative_path, target_node_storage, + source_node_storage, virtual_storage, params FROM job ORDER BY id` rows, err := rq.qc.QueryContext(ctx, query, virtualStorage, nodeStorage, count) @@ -353,7 +339,7 @@ func (rq PostgresReplicationEventQueue) Acknowledge(ctx context.Context, state J query := ` WITH existing AS ( - SELECT id, lock_id, updated_at, job + SELECT id, lock_id, updated_at, change, source_node_storage FROM replication_queue WHERE id = ANY($1) AND state = 'in_progress' @@ -374,9 +360,9 @@ func (rq PostgresReplicationEventQueue) Acknowledge(ctx context.Context, state J -- they are for the exact same repository AND queue.lock_id = existing.lock_id -- and created to apply exact same replication operation (gc, update, ...) 
- AND queue.job->>'change' = existing.job->>'change' + AND queue.change = existing.change -- from the same source storage (if applicable, as 'gc' has no source) - AND COALESCE(queue.job->>'source_node_storage', '') = COALESCE(existing.job->>'source_node_storage', '')) + AND COALESCE(queue.source_node_storage, '') = COALESCE(existing.source_node_storage, '')) ) ) RETURNING queue.id, queue.lock_id diff --git a/internal/praefect/datastore/queue_test.go b/internal/praefect/datastore/queue_test.go index 03cb27bf4cbd84093509d0af6ef96467c187788a..45052fae55cecbe8d08b1611b848e7f1ff73ec30 100644 --- a/internal/praefect/datastore/queue_test.go +++ b/internal/praefect/datastore/queue_test.go @@ -8,6 +8,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testdb" ) @@ -111,6 +112,7 @@ func TestPostgresReplicationEventQueue_DeleteReplicaUniqueIndex(t *testing.T) { ctx := testhelper.Context(t) if tc.existingJob != nil { + insertRepository(t, db, ctx, tc.existingJob.Job.VirtualStorage, tc.existingJob.Job.RelativePath, "stub") _, err := db.ExecContext(ctx, ` INSERT INTO replication_queue (state, job) VALUES ($1, $2) @@ -118,14 +120,16 @@ func TestPostgresReplicationEventQueue_DeleteReplicaUniqueIndex(t *testing.T) { require.NoError(t, err) } + job := ReplicationJob{ + Change: DeleteReplica, + VirtualStorage: "praefect", + RelativePath: "relative-path", + TargetNodeStorage: "gitaly-1", + } + insertRepository(t, db, ctx, job.VirtualStorage, job.RelativePath, "stub") _, err := NewPostgresReplicationEventQueue(db).Enqueue(ctx, ReplicationEvent{ State: JobStateReady, - Job: ReplicationJob{ - Change: DeleteReplica, - VirtualStorage: "praefect", - RelativePath: "relative-path", - TargetNodeStorage: "gitaly-1", - }, + Job: job, }) if tc.succeeds { @@ -155,6 +159,7 @@ func TestPostgresReplicationEventQueue_Enqueue(t *testing.T) { Params: nil, }, } + repositoryID := insertRepository(t, db, ctx, eventType.Job.VirtualStorage, eventType.Job.RelativePath, eventType.Job.SourceNodeStorage) actualEvent, err := queue.Enqueue(ctx, eventType) // initial event require.NoError(t, err) @@ -168,6 +173,7 @@ func TestPostgresReplicationEventQueue_Enqueue(t *testing.T) { Attempt: 3, LockID: "praefect|gitaly-1|/project/path-1", Job: ReplicationJob{ + RepositoryID: repositoryID, Change: UpdateRepo, RelativePath: "/project/path-1", TargetNodeStorage: "gitaly-1", @@ -183,18 +189,111 @@ func TestPostgresReplicationEventQueue_Enqueue(t *testing.T) { db.RequireRowsInTable(t, "replication_queue_job_lock", 0) } -func TestPostgresReplicationEventQueue_DeleteReplicaInfiniteAttempts(t *testing.T) { +func TestPostgresReplicationEventQueue_Enqueue_triggerPopulatesColumns(t *testing.T) { t.Parallel() - queue := NewPostgresReplicationEventQueue(testdb.New(t)) + db := testdb.New(t) ctx := testhelper.Context(t) - actualEvent, err := queue.Enqueue(ctx, ReplicationEvent{ - Job: ReplicationJob{ - Change: DeleteReplica, + t.Run("no repository record exists", func(t *testing.T) { + tx := db.Begin(t) + defer tx.Rollback(t) + + job := ReplicationJob{ + Change: UpdateRepo, + RepositoryID: 1, RelativePath: "/project/path-1", + ReplicaPath: "relative/project/path-1", TargetNodeStorage: "gitaly-1", + SourceNodeStorage: "gitaly-0", VirtualStorage: "praefect", - }, + Params: nil, + } + queue := NewPostgresReplicationEventQueue(tx) + event := 
ReplicationEvent{Job: job} + _, err := queue.Enqueue(ctx, event) + ok := glsql.IsForeignKeyViolation(err, "replication_queue_repository_id_fkey") + require.Truef(t, ok, "returned error is not expected: %+v", err) + }) + + t.Run("repository id not set on job, but found in repositories table", func(t *testing.T) { + tx := db.Begin(t) + defer tx.Rollback(t) + + job := ReplicationJob{ + Change: RenameRepo, + RelativePath: "/project/path-1", + ReplicaPath: "relative/project/path-1", + TargetNodeStorage: "gitaly-1", + SourceNodeStorage: "gitaly-0", + VirtualStorage: "praefect", + Params: Params{"RelativePath": "new/path"}, + } + repositoryID := insertRepository(t, tx, ctx, job.VirtualStorage, job.RelativePath, job.SourceNodeStorage) + queue := NewPostgresReplicationEventQueue(tx) + event := ReplicationEvent{Job: job} + enqueued, err := queue.Enqueue(ctx, event) + require.NoError(t, err) + + actual := extractReplicationJob(t, ctx, tx, enqueued.ID) + job.RepositoryID = repositoryID + require.Equal(t, job, actual) + }) + + t.Run("repository id set on job", func(t *testing.T) { + tx := db.Begin(t) + defer tx.Rollback(t) + + job := ReplicationJob{ + Change: RenameRepo, + RelativePath: "/project/path-1", + ReplicaPath: "relative/project/path-1", + TargetNodeStorage: "gitaly-1", + SourceNodeStorage: "gitaly-0", + VirtualStorage: "praefect", + Params: Params{"RelativePath": "new/path"}, + } + job.RepositoryID = insertRepository(t, tx, ctx, job.VirtualStorage, job.RelativePath, job.SourceNodeStorage) + + queue := NewPostgresReplicationEventQueue(tx) + event := ReplicationEvent{Job: job} + enqueued, err := queue.Enqueue(ctx, event) + require.NoError(t, err) + + actual := extractReplicationJob(t, ctx, tx, enqueued.ID) + require.Equal(t, job, actual) + }) +} + +func extractReplicationJob(t *testing.T, ctx context.Context, tx *testdb.TxWrapper, id uint64) ReplicationJob { + t.Helper() + const selectJob = ` + SELECT + change, repository_id, replica_path, relative_path, target_node_storage, + source_node_storage, virtual_storage, params + FROM replication_queue WHERE id = $1` + var job ReplicationJob + require.NoError(t, tx.QueryRowContext(ctx, selectJob, id).Scan( + &job.Change, &job.RepositoryID, &job.ReplicaPath, &job.RelativePath, &job.TargetNodeStorage, + &job.SourceNodeStorage, &job.VirtualStorage, &job.Params, + )) + return job +} + +func TestPostgresReplicationEventQueue_DeleteReplicaInfiniteAttempts(t *testing.T) { + t.Parallel() + db := testdb.New(t) + queue := NewPostgresReplicationEventQueue(db) + ctx := testhelper.Context(t) + + job := ReplicationJob{ + Change: DeleteReplica, + RelativePath: "/project/path-1", + TargetNodeStorage: "gitaly-1", + VirtualStorage: "praefect", + } + job.RepositoryID = insertRepository(t, db, ctx, job.VirtualStorage, job.RelativePath, "stub") + actualEvent, err := queue.Enqueue(ctx, ReplicationEvent{ + Job: job, }) require.NoError(t, err) @@ -204,6 +303,7 @@ func TestPostgresReplicationEventQueue_DeleteReplicaInfiniteAttempts(t *testing. 
Attempt: 3, LockID: "praefect|gitaly-1|/project/path-1", Job: ReplicationJob{ + RepositoryID: job.RepositoryID, Change: DeleteReplica, RelativePath: "/project/path-1", TargetNodeStorage: "gitaly-1", @@ -272,6 +372,7 @@ func TestPostgresReplicationEventQueue_EnqueueMultiple(t *testing.T) { }, } + eventType1.Job.RepositoryID = insertRepository(t, db, ctx, eventType1.Job.VirtualStorage, eventType1.Job.RelativePath, eventType1.Job.SourceNodeStorage) event1, err := queue.Enqueue(ctx, eventType1) // initial event require.NoError(t, err) @@ -286,6 +387,7 @@ func TestPostgresReplicationEventQueue_EnqueueMultiple(t *testing.T) { LockID: "praefect-0|gitaly-1|/project/path-1", Job: ReplicationJob{ Change: UpdateRepo, + RepositoryID: eventType1.Job.RepositoryID, RelativePath: "/project/path-1", TargetNodeStorage: "gitaly-1", SourceNodeStorage: "gitaly-0", @@ -308,6 +410,7 @@ func TestPostgresReplicationEventQueue_EnqueueMultiple(t *testing.T) { LockID: "praefect-0|gitaly-1|/project/path-1", Job: ReplicationJob{ Change: UpdateRepo, + RepositoryID: eventType1.Job.RepositoryID, RelativePath: "/project/path-1", TargetNodeStorage: "gitaly-1", SourceNodeStorage: "gitaly-0", @@ -329,6 +432,7 @@ func TestPostgresReplicationEventQueue_EnqueueMultiple(t *testing.T) { LockID: "praefect-0|gitaly-2|/project/path-1", Job: ReplicationJob{ Change: RenameRepo, + RepositoryID: eventType1.Job.RepositoryID, RelativePath: "/project/path-1", TargetNodeStorage: "gitaly-2", SourceNodeStorage: "", @@ -340,6 +444,7 @@ func TestPostgresReplicationEventQueue_EnqueueMultiple(t *testing.T) { requireEvents(t, ctx, db, []ReplicationEvent{expEvent1, expEvent2, expEvent3}) requireLocks(t, ctx, db, []LockRow{expLock1, expLock2}) // the new lock for another target repeated event + eventType3.Job.RepositoryID = insertRepository(t, db, ctx, eventType3.Job.VirtualStorage, eventType3.Job.RelativePath, eventType3.Job.SourceNodeStorage) event4, err := queue.Enqueue(ctx, eventType3) // event for another repo require.NoError(t, err) @@ -350,6 +455,7 @@ func TestPostgresReplicationEventQueue_EnqueueMultiple(t *testing.T) { LockID: "praefect-1|gitaly-1|/project/path-2", Job: ReplicationJob{ Change: UpdateRepo, + RepositoryID: eventType3.Job.RepositoryID, RelativePath: "/project/path-2", TargetNodeStorage: "gitaly-1", SourceNodeStorage: "gitaly-0", @@ -381,6 +487,7 @@ func TestPostgresReplicationEventQueue_Dequeue(t *testing.T) { Params: nil, }, } + event.Job.RepositoryID = insertRepository(t, db, ctx, event.Job.VirtualStorage, event.Job.RelativePath, event.Job.SourceNodeStorage) event, err := queue.Enqueue(ctx, event) require.NoError(t, err, "failed to fill in event queue") @@ -464,6 +571,8 @@ func TestPostgresReplicationEventQueue_DequeueMultiple(t *testing.T) { // events to fill in the queue events := []ReplicationEvent{eventType1, eventType1, eventType2, eventType1, eventType3, eventType4} for i := range events { + events[i].Job.RepositoryID = insertRepository(t, db, ctx, events[i].Job.VirtualStorage, events[i].Job.RelativePath, events[i].Job.SourceNodeStorage) + var err error events[i], err = queue.Enqueue(ctx, events[i]) require.NoError(t, err, "failed to fill in event queue") @@ -549,6 +658,7 @@ func TestPostgresReplicationEventQueue_DequeueSameStorageOtherRepository(t *test } for i := 0; i < 2; i++ { + eventType1.Job.RepositoryID = insertRepository(t, db, ctx, eventType1.Job.VirtualStorage, eventType1.Job.RelativePath, eventType1.Job.SourceNodeStorage) _, err := queue.Enqueue(ctx, eventType1) require.NoError(t, err, "failed to fill in 
event queue") } @@ -563,6 +673,7 @@ func TestPostgresReplicationEventQueue_DequeueSameStorageOtherRepository(t *test requireJobLocks(t, ctx, db, []JobLockRow{{JobID: 1, LockID: "praefect|gitaly-1|/project/path-1"}}) for i := 0; i < 2; i++ { + eventType2.Job.RepositoryID = insertRepository(t, db, ctx, eventType2.Job.VirtualStorage, eventType2.Job.RelativePath, eventType2.Job.SourceNodeStorage) _, err := queue.Enqueue(ctx, eventType2) require.NoError(t, err, "failed to fill in event queue") } @@ -598,7 +709,9 @@ func TestPostgresReplicationEventQueue_Acknowledge(t *testing.T) { }, } + repositoryID := insertRepository(t, db, ctx, event.Job.VirtualStorage, event.Job.RelativePath, event.Job.SourceNodeStorage) event, err := queue.Enqueue(ctx, event) + event.Job.RepositoryID = repositoryID require.NoError(t, err, "failed to fill in event queue") actual, err := queue.Dequeue(ctx, event.Job.VirtualStorage, event.Job.TargetNodeStorage, 100) @@ -675,6 +788,7 @@ func TestPostgresReplicationEventQueue_AcknowledgeMultiple(t *testing.T) { events := []ReplicationEvent{eventType1, eventType1, eventType2, eventType1, eventType3, eventType2, eventType4} // events to fill in the queue for i := range events { + events[i].Job.RepositoryID = insertRepository(t, db, ctx, events[i].Job.VirtualStorage, events[i].Job.RelativePath, events[i].Job.SourceNodeStorage) var err error events[i], err = queue.Enqueue(ctx, events[i]) require.NoError(t, err, "failed to fill in event queue") @@ -882,6 +996,7 @@ func TestPostgresReplicationEventQueue_StartHealthUpdate(t *testing.T) { queue := NewPostgresReplicationEventQueue(db) events := []ReplicationEvent{eventType1, eventType2, eventType3, eventType4} for i := range events { + events[i].Job.RepositoryID = insertRepository(t, db, ctx, events[i].Job.VirtualStorage, events[i].Job.RelativePath, events[i].Job.SourceNodeStorage) var err error events[i], err = queue.Enqueue(ctx, events[i]) require.NoError(t, err, "failed to fill in event queue") @@ -956,6 +1071,7 @@ func TestPostgresReplicationEventQueue_AcknowledgeStale(t *testing.T) { db.TruncateAll(t) source := NewPostgresReplicationEventQueue(db) + insertRepository(t, db, ctx, eventType1.Job.VirtualStorage, eventType1.Job.RelativePath, eventType1.Job.SourceNodeStorage) event, err := source.Enqueue(ctx, eventType1) require.NoError(t, err) @@ -973,10 +1089,12 @@ func TestPostgresReplicationEventQueue_AcknowledgeStale(t *testing.T) { db.TruncateAll(t) source := NewPostgresReplicationEventQueue(db) + insertRepository(t, db, ctx, eventType1.Job.VirtualStorage, eventType1.Job.RelativePath, eventType1.Job.SourceNodeStorage) // move event to 'ready' state event1, err := source.Enqueue(ctx, eventType1) require.NoError(t, err) + insertRepository(t, db, ctx, eventType2.Job.VirtualStorage, eventType2.Job.RelativePath, eventType2.Job.SourceNodeStorage) // move event to 'failed' state event2, err := source.Enqueue(ctx, eventType2) require.NoError(t, err) @@ -986,6 +1104,7 @@ func TestPostgresReplicationEventQueue_AcknowledgeStale(t *testing.T) { _, err = source.Acknowledge(ctx, JobStateFailed, []uint64{devents2[0].ID}) require.NoError(t, err) + insertRepository(t, db, ctx, eventType3.Job.VirtualStorage, eventType3.Job.RelativePath, eventType3.Job.SourceNodeStorage) // move event to 'dead' state event3, err := source.Enqueue(ctx, eventType3) require.NoError(t, err) @@ -995,6 +1114,7 @@ func TestPostgresReplicationEventQueue_AcknowledgeStale(t *testing.T) { _, err = source.Acknowledge(ctx, JobStateDead, []uint64{devents3[0].ID}) 
require.NoError(t, err) + insertRepository(t, db, ctx, eventType4.Job.VirtualStorage, eventType4.Job.RelativePath, eventType4.Job.SourceNodeStorage) event4, err := source.Enqueue(ctx, eventType4) require.NoError(t, err) devents4, err := source.Dequeue(ctx, event4.Job.VirtualStorage, event4.Job.TargetNodeStorage, 1) @@ -1016,6 +1136,7 @@ func TestPostgresReplicationEventQueue_AcknowledgeStale(t *testing.T) { var events []ReplicationEvent for _, eventType := range []ReplicationEvent{eventType1, eventType2, eventType3} { + insertRepository(t, db, ctx, eventType.Job.VirtualStorage, eventType.Job.RelativePath, eventType.Job.SourceNodeStorage) event, err := source.Enqueue(ctx, eventType) require.NoError(t, err) devents, err := source.Dequeue(ctx, event.Job.VirtualStorage, event.Job.TargetNodeStorage, 1) @@ -1045,6 +1166,73 @@ func TestPostgresReplicationEventQueue_AcknowledgeStale(t *testing.T) { }) } +func TestLockRowIsRemovedOnceRepositoryIsRemoved(t *testing.T) { + t.Parallel() + db := testdb.New(t) + ctx := testhelper.Context(t) + + const ( + virtualStorage = "praefect" + relativePath = "/project/path-1" + primaryStorage = "gitaly-0" + ) + + enqueueJobs := []ReplicationJob{ + // This event will be moved to the in_progress state and its lock row will have acquired=true. + { + Change: UpdateRepo, + RepositoryID: 1, + RelativePath: relativePath, + ReplicaPath: "relative/project/path-1", + TargetNodeStorage: "gitaly-1", + SourceNodeStorage: primaryStorage, + VirtualStorage: virtualStorage, + Params: nil, + }, + // This event is for another storage, but for the same repository. + { + Change: UpdateRepo, + RepositoryID: 1, + RelativePath: relativePath, + ReplicaPath: "relative/project/path-2", + TargetNodeStorage: "gitaly-2", + SourceNodeStorage: primaryStorage, + VirtualStorage: virtualStorage, + Params: nil, + }, + } + repositoryID := insertRepository(t, db, ctx, virtualStorage, relativePath, primaryStorage) + + queue := NewPostgresReplicationEventQueue(db) + for _, job := range enqueueJobs { + event := ReplicationEvent{Job: job} + _, err := queue.Enqueue(ctx, event) + require.NoError(t, err) + } + _, err := queue.Dequeue(ctx, enqueueJobs[0].VirtualStorage, enqueueJobs[0].TargetNodeStorage, 1) + require.NoError(t, err, "pick up one job to change its status and create additional rows") + + _, err = db.ExecContext(ctx, `DELETE FROM repositories WHERE repository_id = $1`, repositoryID) + require.NoError(t, err) + + db.RequireRowsInTable(t, "replication_queue_lock", 0) +} + +func insertRepository(t *testing.T, db glsql.Querier, ctx context.Context, virtualStorage, relativePath, primary string) int64 { + t.Helper() + const query = ` + INSERT INTO repositories(virtual_storage, relative_path, generation, "primary") + VALUES ($1, $2, $3, $4) + ON CONFLICT (virtual_storage, relative_path) + DO UPDATE SET relative_path = excluded.relative_path + RETURNING repository_id` + var repositoryID int64 + err := db.QueryRowContext(ctx, query, virtualStorage, relativePath, 1, primary).
+ Scan(&repositoryID) + require.NoError(t, err, "create repository record") + return repositoryID +} + func requireEvents(t *testing.T, ctx context.Context, db testdb.DB, expected []ReplicationEvent) { t.Helper() @@ -1058,12 +1246,19 @@ func requireEvents(t *testing.T, ctx context.Context, db testdb.DB, expected []R exp[i].UpdatedAt = nil } - sqlStmt := `SELECT id, state, attempt, lock_id, job FROM replication_queue ORDER BY id` + sqlStmt := `SELECT id, state, created_at, updated_at, lock_id, attempt, meta, change, + repository_id, replica_path, relative_path, target_node_storage, + source_node_storage, virtual_storage, params + FROM replication_queue ORDER BY id` rows, err := db.QueryContext(ctx, sqlStmt) require.NoError(t, err) actual, err := scanReplicationEvents(rows) require.NoError(t, err) + for i := 0; i < len(actual); i++ { + actual[i].CreatedAt = time.Time{} + actual[i].UpdatedAt = nil + } require.Equal(t, exp, actual) } diff --git a/internal/praefect/nodes/per_repository_test.go b/internal/praefect/nodes/per_repository_test.go index a0229c573314ac4ecb6e894e0475285c30df9b63..19f69a7eb091d1bc1773468283b27a1aaf164072 100644 --- a/internal/praefect/nodes/per_repository_test.go +++ b/internal/praefect/nodes/per_repository_test.go @@ -417,24 +417,6 @@ func TestPerRepositoryElector(t *testing.T) { }, }, existingJobs: []datastore.ReplicationEvent{ - { - State: datastore.JobStateReady, - Job: datastore.ReplicationJob{ - Change: datastore.DeleteReplica, - VirtualStorage: "wrong-virtual-storage", - RelativePath: "relative-path-1", - TargetNodeStorage: "gitaly-1", - }, - }, - { - State: datastore.JobStateReady, - Job: datastore.ReplicationJob{ - Change: datastore.DeleteReplica, - VirtualStorage: "virtual-storage-1", - RelativePath: "wrong-relative-path", - TargetNodeStorage: "gitaly-1", - }, - }, { State: datastore.JobStateReady, Job: datastore.ReplicationJob{ diff --git a/internal/praefect/reconciler/reconciler.go b/internal/praefect/reconciler/reconciler.go index 1908ef9ee99c94459493ae55cfa1843d1793c0af..124737dc4f390363aa9ad310d7d6653b6975a79c 100644 --- a/internal/praefect/reconciler/reconciler.go +++ b/internal/praefect/reconciler/reconciler.go @@ -177,21 +177,21 @@ delete_jobs AS ( ) AND NOT EXISTS ( -- Ensure the replica is not used as target or source in any scheduled job. This is to avoid breaking -- any already scheduled jobs. - SELECT FROM replication_queue - WHERE (job->'repository_id')::bigint = repository_id + SELECT FROM replication_queue AS q + WHERE q.repository_id = repositories.repository_id AND ( - job->>'source_node_storage' = storage - OR job->>'target_node_storage' = storage + q.source_node_storage = healthy_storages.storage + OR q.target_node_storage = healthy_storages.storage ) AND state NOT IN ('completed', 'dead') ) AND NOT EXISTS ( -- Ensure there are no other scheduled 'delete_replica' type jobs for the repository. Performing rapid -- repository_assignments could cause the reconciler to schedule deletion against all replicas. To avoid this, -- we do not allow more than one 'delete_replica' job to be active at any given time. 
- SELECT FROM replication_queue + SELECT FROM replication_queue AS q WHERE state NOT IN ('completed', 'dead') - AND (job->>'repository_id')::bigint = repository_id - AND job->>'change' = 'delete_replica' + AND q.repository_id = repositories.repository_id + AND q.change = 'delete_replica' ) ), @@ -229,20 +229,20 @@ update_jobs AS ( JOIN repositories USING (repository_id, relative_path, generation) JOIN healthy_storages USING (virtual_storage, storage) WHERE NOT EXISTS ( - SELECT FROM replication_queue + SELECT FROM replication_queue AS q WHERE state NOT IN ('completed', 'dead') - AND (job->>'repository_id')::bigint = repository_id - AND job->>'target_node_storage' = storage - AND job->>'change' = 'delete_replica' + AND q.repository_id = repositories.repository_id + AND q.target_node_storage = healthy_storages.storage + AND q.change = 'delete_replica' ) ORDER BY virtual_storage, relative_path ) AS healthy_repositories USING (repository_id) WHERE NOT EXISTS ( - SELECT FROM replication_queue + SELECT FROM replication_queue AS q WHERE state NOT IN ('completed', 'dead') - AND (job->>'repository_id')::bigint = repository_id - AND job->>'target_node_storage' = target_node_storage - AND job->>'change' = 'update' + AND q.repository_id = unhealthy_repositories.repository_id + AND q.target_node_storage = unhealthy_repositories.target_node_storage + AND q.change = 'update' ) ORDER BY repository_id, target_node_storage, random() ), @@ -275,7 +275,7 @@ reconciliation_jobs AS ( -- only perform inserts if we managed to acquire the lock as otherwise -- we'd schedule duplicate jobs WHERE ( SELECT acquired FROM reconciliation_lock ) - RETURNING lock_id, meta, job + RETURNING lock_id, meta, repository_id, change, virtual_storage, relative_path, source_node_storage, target_node_storage ), create_locks AS ( @@ -287,12 +287,12 @@ create_locks AS ( SELECT meta->>'correlation_id', - job->>'repository_id', - job->>'change', - job->>'virtual_storage', - job->>'relative_path', - job->>'source_node_storage', - job->>'target_node_storage' + repository_id, + change, + virtual_storage, + relative_path, + source_node_storage, + target_node_storage FROM reconciliation_jobs `, advisorylock.Reconcile, virtualStorages, storages) if err != nil { diff --git a/internal/praefect/reconciler/reconciler_test.go b/internal/praefect/reconciler/reconciler_test.go index 6390c9478db650d92bc5d690ae4b7b003c14f693..673483d7e0884aad93d541b7087b4b77d09ba3fa 100644 --- a/internal/praefect/reconciler/reconciler_test.go +++ b/internal/praefect/reconciler/reconciler_test.go @@ -648,6 +648,7 @@ func TestReconciler(t *testing.T) { { State: datastore.JobStateReady, Job: datastore.ReplicationJob{ + Change: datastore.GarbageCollect, VirtualStorage: "virtual-storage-1", RelativePath: "relative-path-1", TargetNodeStorage: "storage-2", @@ -670,6 +671,7 @@ func TestReconciler(t *testing.T) { { State: datastore.JobStateInProgress, Job: datastore.ReplicationJob{ + Change: datastore.GarbageCollect, VirtualStorage: "virtual-storage-1", RelativePath: "relative-path-1", TargetNodeStorage: "storage-2", @@ -692,6 +694,7 @@ func TestReconciler(t *testing.T) { { State: datastore.JobStateFailed, Job: datastore.ReplicationJob{ + Change: datastore.GarbageCollect, VirtualStorage: "virtual-storage-1", RelativePath: "relative-path-1", TargetNodeStorage: "storage-2", @@ -714,6 +717,7 @@ func TestReconciler(t *testing.T) { { State: datastore.JobStateReady, Job: datastore.ReplicationJob{ + Change: datastore.GarbageCollect, VirtualStorage: "virtual-storage-1", RelativePath: 
"relative-path-1", SourceNodeStorage: "storage-2", @@ -736,6 +740,7 @@ func TestReconciler(t *testing.T) { { State: datastore.JobStateInProgress, Job: datastore.ReplicationJob{ + Change: datastore.GarbageCollect, VirtualStorage: "virtual-storage-1", RelativePath: "relative-path-1", SourceNodeStorage: "storage-2", @@ -758,6 +763,7 @@ func TestReconciler(t *testing.T) { { State: datastore.JobStateFailed, Job: datastore.ReplicationJob{ + Change: datastore.GarbageCollect, VirtualStorage: "virtual-storage-1", RelativePath: "relative-path-1", SourceNodeStorage: "storage-2", @@ -777,25 +783,10 @@ func TestReconciler(t *testing.T) { }, }, existingJobs: existingJobs{ - { - State: datastore.JobStateReady, - Job: datastore.ReplicationJob{ - VirtualStorage: "wrong-virtual-storage", - RelativePath: "relative-path-1", - SourceNodeStorage: "storage-2", - }, - }, - { - State: datastore.JobStateReady, - Job: datastore.ReplicationJob{ - VirtualStorage: "virtual-storage-1", - RelativePath: "wrong-relative-path", - SourceNodeStorage: "storage-2", - }, - }, { State: datastore.JobStateDead, Job: datastore.ReplicationJob{ + Change: datastore.GarbageCollect, VirtualStorage: "virtual-storage-1", RelativePath: "relative-path-1", SourceNodeStorage: "storage-2", @@ -804,6 +795,7 @@ func TestReconciler(t *testing.T) { { State: datastore.JobStateCompleted, Job: datastore.ReplicationJob{ + Change: datastore.GarbageCollect, VirtualStorage: "virtual-storage-1", RelativePath: "relative-path-1", SourceNodeStorage: "storage-2", @@ -952,24 +944,6 @@ func TestReconciler(t *testing.T) { }, }, existingJobs: existingJobs{ - { - State: datastore.JobStateFailed, - Job: datastore.ReplicationJob{ - Change: datastore.DeleteReplica, - VirtualStorage: "wrong-virtual-storage", - RelativePath: "relative-path-1", - SourceNodeStorage: "storage-1", - }, - }, - { - State: datastore.JobStateFailed, - Job: datastore.ReplicationJob{ - Change: datastore.DeleteReplica, - VirtualStorage: "virtual-storage-1", - RelativePath: "wrong-relative-path", - SourceNodeStorage: "storage-1", - }, - }, { State: datastore.JobStateDead, Job: datastore.ReplicationJob{ diff --git a/internal/praefect/replicator_test.go b/internal/praefect/replicator_test.go index 7cd48ff3a1526d7199e1719ce64c1834681b71f9..4018b51bda8933522e415f61fc282838762c6097 100644 --- a/internal/praefect/replicator_test.go +++ b/internal/praefect/replicator_test.go @@ -56,6 +56,8 @@ func TestReplMgr_ProcessBacklog(t *testing.T) { } func testReplMgrProcessBacklog(t *testing.T, ctx context.Context) { + db := testdb.New(t) + primaryCfg, testRepoProto, testRepoPath := testcfg.BuildWithRepo(t, testcfg.WithStorages("primary")) testRepo := localrepo.NewTestRepo(t, primaryCfg, testRepoProto) primaryCfg.SocketPath = testserver.RunGitalyServer(t, primaryCfg, nil, setup.RegisterAll, testserver.WithDisablePraefect()) @@ -155,7 +157,10 @@ func testReplMgrProcessBacklog(t *testing.T, ctx context.Context) { logger := testhelper.NewDiscardingLogger(t) loggerHook := test.NewLocal(logger) - queue := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(testdb.New(t))) + rs := datastore.NewPostgresRepositoryStore(db, conf.StorageNames()) + require.NoError(t, rs.CreateRepository(ctx, repositoryID, conf.VirtualStorages[0].Name, testRepo.GetRelativePath(), testRepo.GetRelativePath(), shard.Primary.GetStorage(), nil, nil, true, false)) + + queue := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(db)) queue.OnAcknowledge(func(ctx 
context.Context, state datastore.JobState, ids []uint64, queue datastore.ReplicationEventQueue) ([]uint64, error) { cancel() // when it is called we know that replication is finished return queue.Acknowledge(ctx, state, ids) @@ -165,10 +170,6 @@ func testReplMgrProcessBacklog(t *testing.T, ctx context.Context) { _, err = queue.Enqueue(ctx, events[0]) require.NoError(t, err) - db := testdb.New(t) - rs := datastore.NewPostgresRepositoryStore(db, conf.StorageNames()) - require.NoError(t, rs.CreateRepository(ctx, repositoryID, conf.VirtualStorages[0].Name, testRepo.GetRelativePath(), testRepo.GetRelativePath(), shard.Primary.GetStorage(), nil, nil, true, false)) - replMgr := NewReplMgr( loggerEntry, conf.StorageNames(), @@ -282,6 +283,8 @@ func TestReplicatorDowngradeAttempt(t *testing.T) { func TestReplicator_PropagateReplicationJob(t *testing.T) { t.Parallel() + db := testdb.New(t) + primaryStorage, secondaryStorage := "internal-gitaly-0", "internal-gitaly-1" primCfg := testcfg.Build(t, testcfg.WithStorages(primaryStorage)) @@ -315,7 +318,7 @@ func TestReplicator_PropagateReplicationJob(t *testing.T) { // unlinkat /tmp/gitaly-222007427/381349228/storages.d/internal-gitaly-1/+gitaly/state/path/to/repo: directory not empty // By using WaitGroup we are sure the test cleanup will be started after all replication // requests are completed, so no running cache IO operations happen. - queue := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(testdb.New(t))) + queue := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(db)) var wg sync.WaitGroup queue.OnEnqueue(func(ctx context.Context, event datastore.ReplicationEvent, queue datastore.ReplicationEventQueue) (datastore.ReplicationEvent, error) { wg.Add(1) @@ -375,6 +378,9 @@ func TestReplicator_PropagateReplicationJob(t *testing.T) { StorageName: conf.VirtualStorages[0].Name, RelativePath: repositoryRelativePath, } + // We need to create a repository record in the database before we can schedule replication events. + repoStore := datastore.NewPostgresRepositoryStore(db, conf.StorageNames()) + require.NoError(t, repoStore.CreateRepository(ctx, 1, conf.VirtualStorages[0].Name, repository.GetRelativePath(), repository.GetRelativePath(), repository.GetStorageName(), nil, nil, true, false)) //nolint:staticcheck _, err = repositoryClient.GarbageCollect(ctx, &gitalypb.GarbageCollectRequest{ @@ -685,6 +691,10 @@ func TestProcessBacklog_FailedJobs(t *testing.T) { } func testProcessBacklogFailedJobs(t *testing.T, ctx context.Context) { + ctx, cancel := context.WithCancel(ctx) + t.Cleanup(cancel) + db := testdb.New(t) + primaryCfg, testRepo, _ := testcfg.BuildWithRepo(t, testcfg.WithStorages("default")) primaryAddr := testserver.RunGitalyServer(t, primaryCfg, nil, setup.RegisterAll, testserver.WithDisablePraefect()) @@ -714,26 +724,30 @@ func testProcessBacklogFailedJobs(t *testing.T, ctx context.Context) { }, }, } - ctx, cancel := context.WithCancel(ctx) - - queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(testdb.New(t))) // this job exists to verify that replication works okJob := datastore.ReplicationJob{ - RepositoryID: 1, - Change: datastore.UpdateRepo, + RepositoryID: 1, + // The optimize_repository change type is used because we can't use the update type: + // the duplicate update events would never be picked up, as they are all marked as done once the first one completes.
+ Change: datastore.OptimizeRepository, RelativePath: testRepo.RelativePath, TargetNodeStorage: secondary.Storage, - SourceNodeStorage: primary.Storage, VirtualStorage: "praefect", } + + rs := datastore.NewPostgresRepositoryStore(db, conf.StorageNames()) + require.NoError(t, rs.CreateRepository(ctx, okJob.RepositoryID, okJob.VirtualStorage, okJob.RelativePath, okJob.RelativePath, okJob.SourceNodeStorage, nil, nil, true, false)) + + queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(db)) event1, err := queueInterceptor.Enqueue(ctx, datastore.ReplicationEvent{Job: okJob}) require.NoError(t, err) require.Equal(t, uint64(1), event1.ID) - // this job checks flow for replication event that fails + // this job checks the flow for a replication event that fails because the source storage does not exist failJob := okJob - failJob.Change = "invalid-operation" + failJob.Change = datastore.UpdateRepo + failJob.SourceNodeStorage = "invalid" event2, err := queueInterceptor.Enqueue(ctx, datastore.ReplicationEvent{Job: failJob}) require.NoError(t, err) require.Equal(t, uint64(2), event2.ID) @@ -745,10 +759,6 @@ func testProcessBacklogFailedJobs(t *testing.T, ctx context.Context) { nodeMgr.Start(0, time.Hour) defer nodeMgr.Stop() - db := testdb.New(t) - rs := datastore.NewPostgresRepositoryStore(db, conf.StorageNames()) - require.NoError(t, rs.CreateRepository(ctx, okJob.RepositoryID, okJob.VirtualStorage, okJob.RelativePath, okJob.RelativePath, okJob.SourceNodeStorage, nil, nil, true, false)) - replMgr := NewReplMgr( logEntry, conf.StorageNames(), @@ -792,6 +802,8 @@ func TestProcessBacklog_Success(t *testing.T) { func testProcessBacklogSuccess(t *testing.T, ctx context.Context) { ctx, cancel := context.WithCancel(ctx) + t.Cleanup(cancel) + db := testdb.New(t) primaryCfg, testRepo, _ := testcfg.BuildWithRepo(t, testcfg.WithStorages("primary")) primaryCfg.SocketPath = testserver.RunGitalyServer(t, primaryCfg, nil, setup.RegisterAll, testserver.WithDisablePraefect()) @@ -825,7 +837,7 @@ func testProcessBacklogSuccess(t *testing.T, ctx context.Context) { }, } - queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(testdb.New(t))) + queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(db)) queueInterceptor.OnAcknowledge(func(ctx context.Context, state datastore.JobState, ids []uint64, queue datastore.ReplicationEventQueue) ([]uint64, error) { ackIDs, err := queue.Acknowledge(ctx, state, ids) if len(ids) > 0 { @@ -854,6 +866,9 @@ func testProcessBacklogSuccess(t *testing.T, ctx context.Context) { }, } + rs := datastore.NewPostgresRepositoryStore(db, conf.StorageNames()) + require.NoError(t, rs.CreateRepository(ctx, eventType1.Job.RepositoryID, eventType1.Job.VirtualStorage, eventType1.Job.VirtualStorage, eventType1.Job.RelativePath, eventType1.Job.SourceNodeStorage, nil, nil, true, false)) + _, err := queueInterceptor.Enqueue(ctx, eventType1) require.NoError(t, err) @@ -869,6 +884,7 @@ func testProcessBacklogSuccess(t *testing.T, ctx context.Context) { // Rename replication job eventType2 := datastore.ReplicationEvent{ Job: datastore.ReplicationJob{ + RepositoryID: 1, Change: datastore.RenameRepo, RelativePath: testRepo.GetRelativePath(), TargetNodeStorage: secondary.Storage, @@ -884,6 +900,7 @@ func testProcessBacklogSuccess(t *testing.T, ctx context.Context) { // Rename replication job eventType3 := datastore.ReplicationEvent{ Job:
datastore.ReplicationJob{ + RepositoryID: 1, Change: datastore.RenameRepo, RelativePath: renameTo1, TargetNodeStorage: secondary.Storage, @@ -904,10 +921,6 @@ func testProcessBacklogSuccess(t *testing.T, ctx context.Context) { nodeMgr.Start(0, time.Hour) defer nodeMgr.Stop() - db := testdb.New(t) - rs := datastore.NewPostgresRepositoryStore(db, conf.StorageNames()) - require.NoError(t, rs.CreateRepository(ctx, eventType1.Job.RepositoryID, eventType1.Job.VirtualStorage, eventType1.Job.VirtualStorage, eventType1.Job.RelativePath, eventType1.Job.SourceNodeStorage, nil, nil, true, false)) - replMgr := NewReplMgr( logEntry, conf.StorageNames(), @@ -934,6 +947,7 @@ func testProcessBacklogSuccess(t *testing.T, ctx context.Context) { func TestReplMgrProcessBacklog_OnlyHealthyNodes(t *testing.T) { t.Parallel() + db := testdb.New(t) conf := config.Config{ VirtualStorages: []*config.VirtualStorage{ { @@ -951,7 +965,7 @@ func TestReplMgrProcessBacklog_OnlyHealthyNodes(t *testing.T) { var mtx sync.Mutex expStorages := map[string]bool{conf.VirtualStorages[0].Nodes[0].Storage: true, conf.VirtualStorages[0].Nodes[2].Storage: true} - queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(testdb.New(t))) + queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(db)) queueInterceptor.OnDequeue(func(_ context.Context, virtualStorageName string, storageName string, _ int, _ datastore.ReplicationEventQueue) ([]datastore.ReplicationEvent, error) { select { case <-ctx.Done(): @@ -1012,6 +1026,8 @@ func (m mockReplicator) Replicate(ctx context.Context, event datastore.Replicati func TestProcessBacklog_ReplicatesToReadOnlyPrimary(t *testing.T) { t.Parallel() ctx, cancel := context.WithCancel(testhelper.Context(t)) + t.Cleanup(cancel) + db := testdb.New(t) const virtualStorage = "virtal-storage" const primaryStorage = "storage-1" @@ -1033,7 +1049,10 @@ func TestProcessBacklog_ReplicatesToReadOnlyPrimary(t *testing.T) { }, } - queue := datastore.NewPostgresReplicationEventQueue(testdb.New(t)) + rs := datastore.NewPostgresRepositoryStore(db, conf.StorageNames()) + require.NoError(t, rs.CreateRepository(ctx, repositoryID, virtualStorage, "ignored", "ignored", primaryStorage, []string{secondaryStorage}, nil, true, false)) + + queue := datastore.NewPostgresReplicationEventQueue(db) _, err := queue.Enqueue(ctx, datastore.ReplicationEvent{ Job: datastore.ReplicationJob{ RepositoryID: 1, @@ -1046,10 +1065,6 @@ func TestProcessBacklog_ReplicatesToReadOnlyPrimary(t *testing.T) { }) require.NoError(t, err) - db := testdb.New(t) - rs := datastore.NewPostgresRepositoryStore(db, conf.StorageNames()) - require.NoError(t, rs.CreateRepository(ctx, repositoryID, virtualStorage, "ignored", "ignored", primaryStorage, []string{secondaryStorage}, nil, true, false)) - replMgr := NewReplMgr( testhelper.NewDiscardingLogEntry(t), conf.StorageNames(), diff --git a/internal/praefect/server_test.go b/internal/praefect/server_test.go index c3681a34c42f6737f4b930116dd2bc50887b32b2..4de7e0fc66d62005c76149e072034d901726263a 100644 --- a/internal/praefect/server_test.go +++ b/internal/praefect/server_test.go @@ -521,7 +521,14 @@ func TestRemoveRepository(t *testing.T) { verifyReposExistence(t, codes.OK) - queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(testdb.New(t))) + virtualRepo := proto.Clone(repos[0][0]).(*gitalypb.Repository) + virtualRepo.StorageName = 
praefectCfg.VirtualStorages[0].Name + + db := testdb.New(t) + repositoryStore := datastore.NewPostgresRepositoryStore(db, praefectCfg.StorageNames()) + require.NoError(t, repositoryStore.CreateRepository(ctx, 1, virtualRepo.GetStorageName(), virtualRepo.GetRelativePath(), virtualRepo.GetRelativePath(), gitalyCfgs[0].Storages[0].Name, nil, nil, true, false)) + + queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(db)) repoStore := defaultRepoStore(praefectCfg) txMgr := defaultTxMgr(praefectCfg) nodeMgr, err := nodes.NewManager(testhelper.NewDiscardingLogEntry(t), praefectCfg, nil, @@ -541,15 +548,15 @@ func TestRemoveRepository(t *testing.T) { GetConsistentStoragesFunc: func(ctx context.Context, virtualStorage, relativePath string) (string, map[string]struct{}, error) { return relativePath, nil, nil }, + GetRepositoryIDFunc: func(ctx context.Context, virtualStorage, relativePath string) (int64, error) { + return repositoryStore.GetRepositoryID(ctx, virtualStorage, relativePath) + }, }, withNodeMgr: nodeMgr, withTxMgr: txMgr, }) defer cleanup() - virtualRepo := proto.Clone(repos[0][0]).(*gitalypb.Repository) - virtualRepo.StorageName = praefectCfg.VirtualStorages[0].Name - _, err = gitalypb.NewRepositoryServiceClient(cc).RemoveRepository(ctx, &gitalypb.RemoveRepositoryRequest{ Repository: virtualRepo, }) @@ -616,30 +623,28 @@ func TestRenameRepository(t *testing.T) { repoPaths[i] = filepath.Join(gitalyCfg.Storages[0].Path, relativePath) } - evq := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(testdb.New(t))) - - tx := testdb.New(t).Begin(t) - defer tx.Rollback(t) + db := testdb.New(t) + evq := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(db)) - rs := datastore.NewPostgresRepositoryStore(tx, nil) + rs := datastore.NewPostgresRepositoryStore(db, nil) require.NoError(t, rs.CreateRepository(ctx, 1, "praefect", repo.RelativePath, repo.RelativePath, "gitaly-1", []string{"gitaly-2", "gitaly-3"}, nil, true, false)) nodeSet, err := DialNodes(ctx, praefectCfg.VirtualStorages, nil, nil, nil, nil) require.NoError(t, err) defer nodeSet.Close() - testdb.SetHealthyNodes(t, ctx, tx, map[string]map[string][]string{"praefect": praefectCfg.StorageNames()}) + testdb.SetHealthyNodes(t, ctx, db, map[string]map[string][]string{"praefect": praefectCfg.StorageNames()}) cc, _, cleanup := runPraefectServer(t, ctx, praefectCfg, buildOptions{ withQueue: evq, withRepoStore: rs, withRouter: NewPerRepositoryRouter( nodeSet.Connections(), - nodes.NewPerRepositoryElector(tx), + nodes.NewPerRepositoryElector(db), StaticHealthChecker(praefectCfg.StorageNames()), NewLockedRandom(rand.New(rand.NewSource(0))), rs, - datastore.NewAssignmentStore(tx, praefectCfg.StorageNames()), + datastore.NewAssignmentStore(db, praefectCfg.StorageNames()), rs, nil, ),
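// ---------------------------------------------------------------------------
// Supplementary sketch, not part of the patch above: one way the flattened
// replication_queue columns could be asserted directly after an Enqueue. It
// assumes the helpers and imports of the queue test file touched above
// (testdb.New, testhelper.Context, require, insertRepository) and that the
// database trigger copies the JSONB job payload into the dedicated columns.
// The test name is hypothetical.
func TestReplicationQueueFlattenedColumnsSketch(t *testing.T) {
	t.Parallel()
	db := testdb.New(t)
	ctx := testhelper.Context(t)

	queue := NewPostgresReplicationEventQueue(db)

	job := ReplicationJob{
		Change:            UpdateRepo,
		RelativePath:      "/project/path-1",
		TargetNodeStorage: "gitaly-1",
		SourceNodeStorage: "gitaly-0",
		VirtualStorage:    "praefect",
	}
	// The repositories row must exist so the trigger can resolve repository_id.
	job.RepositoryID = insertRepository(t, db, ctx, job.VirtualStorage, job.RelativePath, job.SourceNodeStorage)

	_, err := queue.Enqueue(ctx, ReplicationEvent{Job: job})
	require.NoError(t, err)

	// Read the trigger-populated columns back; they should mirror the job payload.
	var change string
	var repositoryID int64
	require.NoError(t, db.QueryRowContext(
		ctx,
		`SELECT change, repository_id FROM replication_queue`,
	).Scan(&change, &repositoryID))
	require.Equal(t, "update", change)
	require.Equal(t, job.RepositoryID, repositoryID)
}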