diff --git a/cmd/praefect/subcmd.go b/cmd/praefect/subcmd.go index a85f0b7721fd567e4f1f0819331b0278285ccd88..83afe3d52347dece4541584f27544647bd02414f 100644 --- a/cmd/praefect/subcmd.go +++ b/cmd/praefect/subcmd.go @@ -16,6 +16,7 @@ import ( "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/config" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql" + "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/migrations" "google.golang.org/grpc" ) @@ -125,7 +126,7 @@ func (s *sqlMigrateSubcommand) Exec(flags *flag.FlagSet, conf config.Config) err } defer clean() - n, err := glsql.Migrate(db, s.ignoreUnknown) + n, err := glsql.Migrate(db, s.ignoreUnknown, migrations.All()) if err != nil { return fmt.Errorf("%s: fail: %v", subCmd, err) } diff --git a/cmd/praefect/subcmd_accept_dataloss_test.go b/cmd/praefect/subcmd_accept_dataloss_test.go index 55d235afcdf44169c7e5b75364212efc09598317..b58a0f378652019c8dc166f1b6a749d27ca0400e 100644 --- a/cmd/praefect/subcmd_accept_dataloss_test.go +++ b/cmd/praefect/subcmd_accept_dataloss_test.go @@ -39,18 +39,20 @@ func TestAcceptDatalossSubcommand(t *testing.T) { rs := datastore.NewPostgresRepositoryStore(db, conf.StorageNames()) startingGenerations := map[string]int{st1: 1, st2: 0, st3: datastore.GenerationUnknown} - repoCreated := false + repositoryID := int64(0) for storage, generation := range startingGenerations { if generation == datastore.GenerationUnknown { continue } - if !repoCreated { - repoCreated = true - require.NoError(t, rs.CreateRepository(ctx, 1, vs, repo, storage, nil, nil, false, false)) + if repositoryID == 0 { + var err error + repositoryID, err = rs.ReserveRepositoryID(ctx, vs, repo) + require.NoError(t, err) + require.NoError(t, rs.CreateRepository(ctx, repositoryID, vs, repo, storage, nil, nil, false, false)) } - require.NoError(t, rs.SetGeneration(ctx, 1, storage, generation)) + require.NoError(t, rs.SetGeneration(ctx, repositoryID, storage, generation)) } ln, clean := listenAndServe(t, []svcRegistrar{registerPraefectInfoServer(info.NewServer(conf, rs, nil, nil, nil))}) @@ -145,7 +147,7 @@ func TestAcceptDatalossSubcommand(t *testing.T) { require.NoError(t, fs.Parse(tc.args)) tc.matchError(t, cmd.Exec(fs, conf)) for storage, expected := range tc.expectedGenerations { - actual, err := rs.GetGeneration(ctx, 1, storage) + actual, err := rs.GetGeneration(ctx, repositoryID, storage) require.NoError(t, err) require.Equal(t, expected, actual, storage) } diff --git a/cmd/praefect/subcmd_dataloss_test.go b/cmd/praefect/subcmd_dataloss_test.go index 07d7ad602869e87741c7ec26c293b19938f6067a..deb16746c18ff012a104173df9ab31b517945d0a 100644 --- a/cmd/praefect/subcmd_dataloss_test.go +++ b/cmd/praefect/subcmd_dataloss_test.go @@ -56,28 +56,28 @@ func TestDatalossSubcommand(t *testing.T) { ` INSERT INTO repositories (repository_id, virtual_storage, relative_path, "primary") VALUES - (1, 'virtual-storage-1', 'repository-1', 'gitaly-1'), - (2, 'virtual-storage-1', 'repository-2', 'gitaly-3') + (100000000, 'virtual-storage-1', 'repository-1', 'gitaly-1'), + (100000001, 'virtual-storage-1', 'repository-2', 'gitaly-3') `, ` INSERT INTO repository_assignments (repository_id, virtual_storage, relative_path, storage) VALUES - (1, 'virtual-storage-1', 'repository-1', 'gitaly-1'), - (1, 'virtual-storage-1', 'repository-1', 'gitaly-2'), - (2, 'virtual-storage-1', 'repository-2', 'gitaly-1'), - (2, 'virtual-storage-1', 'repository-2', 'gitaly-3') + (100000000, 'virtual-storage-1', 'repository-1', 'gitaly-1'), + (100000000, 'virtual-storage-1', 'repository-1', 'gitaly-2'), + (100000001, 'virtual-storage-1', 'repository-2', 'gitaly-1'), + (100000001, 'virtual-storage-1', 'repository-2', 'gitaly-3') `, } { _, err := tx.ExecContext(ctx, q) require.NoError(t, err) } - require.NoError(t, gs.SetGeneration(ctx, 1, "gitaly-1", 1)) - require.NoError(t, gs.SetGeneration(ctx, 1, "gitaly-2", 0)) - require.NoError(t, gs.SetGeneration(ctx, 1, "gitaly-3", 0)) + require.NoError(t, gs.SetGeneration(ctx, 100000000, "gitaly-1", 1)) + require.NoError(t, gs.SetGeneration(ctx, 100000000, "gitaly-2", 0)) + require.NoError(t, gs.SetGeneration(ctx, 100000000, "gitaly-3", 0)) - require.NoError(t, gs.SetGeneration(ctx, 2, "gitaly-2", 1)) - require.NoError(t, gs.SetGeneration(ctx, 2, "gitaly-3", 0)) + require.NoError(t, gs.SetGeneration(ctx, 100000001, "gitaly-2", 1)) + require.NoError(t, gs.SetGeneration(ctx, 100000001, "gitaly-3", 0)) ln, clean := listenAndServe(t, []svcRegistrar{ registerPraefectInfoServer(info.NewServer(cfg, gs, nil, nil, nil)), diff --git a/cmd/praefect/subcmd_set_replication_factor_test.go b/cmd/praefect/subcmd_set_replication_factor_test.go index 197045a55bf521683c2af989a2757f9444a14ae3..8369d436d839f0f52d663aeeefe920222da71eaa 100644 --- a/cmd/praefect/subcmd_set_replication_factor_test.go +++ b/cmd/praefect/subcmd_set_replication_factor_test.go @@ -83,16 +83,20 @@ func TestSetReplicationFactorSubcommand(t *testing.T) { ctx, cancel := testhelper.Context() defer cancel() - db.TruncateAll(t) + tx := db.Begin(t) + defer tx.Rollback(t) store := tc.store if tc.store == nil { - store = datastore.NewAssignmentStore(db, map[string][]string{"virtual-storage": {"primary", "secondary"}}) + store = datastore.NewAssignmentStore(tx, map[string][]string{"virtual-storage": {"primary", "secondary"}}) } // create a repository record + rs := datastore.NewPostgresRepositoryStore(tx, nil) + id, err := rs.ReserveRepositoryID(ctx, "virtual-storage", "relative-path") + require.NoError(t, err) require.NoError(t, - datastore.NewPostgresRepositoryStore(db, nil).CreateRepository(ctx, 1, "virtual-storage", "relative-path", "primary", nil, nil, false, false), + rs.CreateRepository(ctx, id, "virtual-storage", "relative-path", "primary", nil, nil, false, false), ) ln, clean := listenAndServe(t, []svcRegistrar{registerPraefectInfoServer( @@ -104,7 +108,7 @@ func TestSetReplicationFactorSubcommand(t *testing.T) { cmd := &setReplicationFactorSubcommand{stdout: stdout} fs := cmd.FlagSet() require.NoError(t, fs.Parse(tc.args)) - err := cmd.Exec(fs, config.Config{ + err = cmd.Exec(fs, config.Config{ SocketPath: ln.Addr().String(), }) testassert.GrpcEqualErr(t, tc.error, err) diff --git a/internal/praefect/checks_test.go b/internal/praefect/checks_test.go index 9e6a6f1b5c38b8fa4128b964f0c745774f2d19e2..f8adc31f7ff35f4664b0edc27928c6f0815e81da 100644 --- a/internal/praefect/checks_test.go +++ b/internal/praefect/checks_test.go @@ -55,6 +55,9 @@ func TestPraefectMigrations_success(t *testing.T) { t.Run(tc.desc, func(t *testing.T) { var cfg config.Config db := glsql.NewDB(t) + // Migration that adds artificial data to the database needs to be dropped first + _, err := db.Exec(`DELETE FROM ` + migrations.MigrationTableName + " WHERE id = '20200921170400_artificial_repositories'") + require.NoError(t, err) cfg.DB = glsql.GetDBConfig(t, db.Name) require.NoError(t, tc.prepare(cfg)) diff --git a/internal/praefect/coordinator_pg_test.go b/internal/praefect/coordinator_pg_test.go index 20c447ee74a61c1583c843eb1456b83231f60e13..9dc5db242cb719dfbd4d6bb1ae6c797450f65ac2 100644 --- a/internal/praefect/coordinator_pg_test.go +++ b/internal/praefect/coordinator_pg_test.go @@ -40,6 +40,8 @@ func TestStreamDirectorMutator_Transaction(t *testing.T) { expectedGeneration int } + db := glsql.NewDB(t) + testcases := []struct { desc string primaryFails bool @@ -153,11 +155,9 @@ func TestStreamDirectorMutator_Transaction(t *testing.T) { }, } - db := glsql.NewDB(t) - for _, tc := range testcases { t.Run(tc.desc, func(t *testing.T) { - db.TruncateAll(t) + db.Truncate(t, "replication_queue_job_lock", "replication_queue", "replication_queue_lock") storageNodes := make([]*config.Node, 0, len(tc.nodes)) for i := range tc.nodes { @@ -199,18 +199,25 @@ func TestStreamDirectorMutator_Transaction(t *testing.T) { // set up the generations prior to transaction rs := datastore.NewPostgresRepositoryStore(tx, conf.StorageNames()) - repoCreated := false + var id int64 + var err error for i, n := range tc.nodes { if n.generation == datastore.GenerationUnknown { continue } - if !repoCreated { - repoCreated = true - require.NoError(t, rs.CreateRepository(ctx, 1, repo.StorageName, repo.RelativePath, storageNodes[i].Storage, nil, nil, true, false)) + if id == 0 { + id, err = rs.ReserveRepositoryID(ctx, repo.StorageName, repo.RelativePath) + require.NoError(t, err) + require.NoError(t, rs.CreateRepository(ctx, id, repo.StorageName, repo.RelativePath, storageNodes[i].Storage, nil, nil, true, false)) + defer func() { + repoStore := datastore.NewPostgresRepositoryStore(db, nil) + _, _, err := repoStore.DeleteRepository(ctx, repo.StorageName, repo.RelativePath) + require.NoError(t, err) + }() } - require.NoError(t, rs.SetGeneration(ctx, 1, storageNodes[i].Storage, n.generation)) + require.NoError(t, rs.SetGeneration(ctx, id, storageNodes[i].Storage, n.generation)) } testhelper.SetHealthyNodes(t, ctx, tx, map[string]map[string][]string{"praefect": conf.StorageNames()}) @@ -298,7 +305,7 @@ func TestStreamDirectorMutator_Transaction(t *testing.T) { } if tc.concurrentWrite { - require.NoError(t, rs.SetGeneration(ctx, 1, "non-participating-storage", 2)) + require.NoError(t, rs.SetGeneration(ctx, id, "non-participating-storage", 2)) } err = streamParams.RequestFinalizer() @@ -312,7 +319,7 @@ func TestStreamDirectorMutator_Transaction(t *testing.T) { // Nodes that did not successfully commit or did not participate should remain on their // existing generation. for i, n := range tc.nodes { - gen, err := rs.GetGeneration(ctx, 1, storageNodes[i].Storage) + gen, err := rs.GetGeneration(ctx, id, storageNodes[i].Storage) require.NoError(t, err) require.Equal(t, n.expectedGeneration, gen, "node %d has wrong generation", i) } diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go index b9e4230d2b84a4045b5f2ef7d8abe230495e80f8..0dbc6b839ef42befe67b1417ab187af18cdd1ab1 100644 --- a/internal/praefect/coordinator_test.go +++ b/internal/praefect/coordinator_test.go @@ -74,7 +74,8 @@ func TestStreamDirectorReadOnlyEnforcement(t *testing.T) { {desc: "read-only", readOnly: true}, } { t.Run(tc.desc, func(t *testing.T) { - db.TruncateAll(t) + tx := db.Begin(t) + defer tx.Rollback(t) const ( virtualStorage = "test-virtual-storage" @@ -108,7 +109,7 @@ func TestStreamDirectorReadOnlyEnforcement(t *testing.T) { } coordinator := NewCoordinator( - datastore.NewPostgresReplicationEventQueue(db), + datastore.NewPostgresReplicationEventQueue(tx), rs, NewNodeManagerRouter(&nodes.MockManager{GetShardFunc: func(vs string) (nodes.Shard, error) { require.Equal(t, virtualStorage, vs) @@ -193,12 +194,16 @@ func TestStreamDirectorMutator(t *testing.T) { rs := datastore.NewPostgresRepositoryStore(tx, conf.StorageNames()) + var id int64 + var err error if tc.repositoryExists { - require.NoError(t, rs.CreateRepository(ctx, 1, targetRepo.StorageName, targetRepo.RelativePath, primaryNode.Storage, []string{secondaryNode.Storage}, nil, true, true)) + id, err = rs.ReserveRepositoryID(ctx, targetRepo.StorageName, targetRepo.RelativePath) + require.NoError(t, err) + require.NoError(t, rs.CreateRepository(ctx, id, targetRepo.StorageName, targetRepo.RelativePath, primaryNode.Storage, []string{secondaryNode.Storage}, nil, true, true)) } testhelper.SetHealthyNodes(t, ctx, tx, map[string]map[string][]string{"praefect": conf.StorageNames()}) - queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(db)) + queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(tx)) queueInterceptor.OnEnqueue(func(ctx context.Context, event datastore.ReplicationEvent, queue datastore.ReplicationEventQueue) (datastore.ReplicationEvent, error) { assert.True(t, len(queueInterceptor.GetEnqueued()) < 2, "expected only one event to be created") return queue.Enqueue(ctx, event) @@ -272,7 +277,7 @@ func TestStreamDirectorMutator(t *testing.T) { CreatedAt: events[0].CreatedAt, UpdatedAt: events[0].UpdatedAt, Job: datastore.ReplicationJob{ - RepositoryID: 1, + RepositoryID: id, Change: datastore.UpdateRepo, VirtualStorage: conf.VirtualStorages[0].Name, RelativePath: targetRepo.RelativePath, @@ -821,7 +826,8 @@ func TestStreamDirector_repo_creation(t *testing.T) { }, } { t.Run(tc.desc, func(t *testing.T) { - db.TruncateAll(t) + tx := db.Begin(t) + defer tx.Rollback(t) primaryNode := &config.Node{Storage: "praefect-internal-1"} healthySecondaryNode := &config.Node{Storage: "praefect-internal-2"} unhealthySecondaryNode := &config.Node{Storage: "praefect-internal-3"} @@ -924,7 +930,7 @@ func TestStreamDirector_repo_creation(t *testing.T) { } txMgr := transactions.NewManager(conf) - queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(db)) + queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(tx)) coordinator := NewCoordinator( queueInterceptor, diff --git a/internal/praefect/datastore/assignment_test.go b/internal/praefect/datastore/assignment_test.go index 3062ab125417182a17ed1af055a027e64f035188..599d739efe50e1bafde7d315d0ed75796b5a96bb 100644 --- a/internal/praefect/datastore/assignment_test.go +++ b/internal/praefect/datastore/assignment_test.go @@ -79,9 +79,10 @@ func TestAssignmentStore_GetHostAssignments(t *testing.T) { ctx, cancel := testhelper.Context() defer cancel() - db.TruncateAll(t) + tx := db.Begin(t) + defer tx.Rollback(t) - rs := NewPostgresRepositoryStore(db, nil) + rs := NewPostgresRepositoryStore(tx, nil) for _, assignment := range tc.existingAssignments { repositoryID, err := rs.GetRepositoryID(ctx, assignment.virtualStorage, assignment.relativePath) if errors.Is(err, commonerr.NewRepositoryNotFoundError(assignment.virtualStorage, assignment.relativePath)) { @@ -91,7 +92,7 @@ func TestAssignmentStore_GetHostAssignments(t *testing.T) { require.NoError(t, rs.CreateRepository(ctx, repositoryID, assignment.virtualStorage, assignment.relativePath, assignment.storage, nil, nil, false, false)) } - _, err = db.ExecContext(ctx, ` + _, err = tx.ExecContext(ctx, ` INSERT INTO repository_assignments (repository_id, virtual_storage, relative_path, storage) VALUES ($1, $2, $3, $4) `, repositoryID, assignment.virtualStorage, assignment.relativePath, assignment.storage) @@ -104,7 +105,7 @@ func TestAssignmentStore_GetHostAssignments(t *testing.T) { } actualAssignments, err := NewAssignmentStore( - db, + tx, map[string][]string{"virtual-storage": configuredStorages}, ).GetHostAssignments(ctx, tc.virtualStorage, repositoryID) require.Equal(t, tc.error, err) @@ -132,6 +133,7 @@ func TestAssignmentStore_SetReplicationFactor(t *testing.T) { } db := glsql.NewDB(t) + const repositoryID = 1e10 for _, tc := range []struct { desc string @@ -210,26 +212,28 @@ func TestAssignmentStore_SetReplicationFactor(t *testing.T) { ctx, cancel := testhelper.Context() defer cancel() - db.TruncateAll(t) + tx := db.Begin(t) + defer tx.Rollback(t) configuredStorages := map[string][]string{"virtual-storage": {"primary", "secondary-1", "secondary-2"}} if !tc.nonExistentRepository { - _, err := db.ExecContext(ctx, ` + _, err := tx.ExecContext(ctx, ` INSERT INTO repositories (virtual_storage, relative_path, "primary", repository_id) - VALUES ('virtual-storage', 'relative-path', 'primary', 1) - `) + VALUES ('virtual-storage', 'relative-path', 'primary', $1) + `, repositoryID) require.NoError(t, err) } for _, storage := range tc.existingAssignments { - _, err := db.ExecContext(ctx, ` - INSERT INTO repository_assignments VALUES ('virtual-storage', 'relative-path', $1, 1) - `, storage) + _, err := tx.ExecContext(ctx, ` + INSERT INTO repository_assignments (virtual_storage, relative_path, storage, repository_id) + VALUES ('virtual-storage', 'relative-path', $1, $2) + `, storage, repositoryID) require.NoError(t, err) } - store := NewAssignmentStore(db, configuredStorages) + store := NewAssignmentStore(tx, configuredStorages) setStorages, err := store.SetReplicationFactor(ctx, "virtual-storage", "relative-path", tc.replicationFactor) require.Equal(t, tc.error, err) @@ -239,18 +243,18 @@ func TestAssignmentStore_SetReplicationFactor(t *testing.T) { tc.requireStorages(t, setStorages) - assignedStorages, err := store.GetHostAssignments(ctx, "virtual-storage", 1) + assignedStorages, err := store.GetHostAssignments(ctx, "virtual-storage", repositoryID) require.NoError(t, err) sort.Strings(assignedStorages) tc.requireStorages(t, assignedStorages) var storagesWithIncorrectRepositoryID pq.StringArray - require.NoError(t, db.QueryRowContext(ctx, ` + require.NoError(t, tx.QueryRowContext(ctx, ` SELECT array_agg(storage) FROM repository_assignments - WHERE COALESCE(repository_id != 1, true) - `).Scan(&storagesWithIncorrectRepositoryID)) + WHERE COALESCE(repository_id != $1, true) + `, repositoryID).Scan(&storagesWithIncorrectRepositoryID)) require.Empty(t, storagesWithIncorrectRepositoryID) }) } diff --git a/internal/praefect/datastore/glsql/postgres.go b/internal/praefect/datastore/glsql/postgres.go index 4b5efcd86a33eb0092eec758fe715f095b6f826a..28a80306ab4ac5f926111be7201e3bae8aad12b5 100644 --- a/internal/praefect/datastore/glsql/postgres.go +++ b/internal/praefect/datastore/glsql/postgres.go @@ -28,14 +28,14 @@ func OpenDB(conf config.DB) (*sql.DB, error) { } // Migrate will apply all pending SQL migrations. -func Migrate(db *sql.DB, ignoreUnknown bool) (int, error) { +func Migrate(db *sql.DB, ignoreUnknown bool, mgs []*migrate.Migration) (int, error) { migrationSet := migrate.MigrationSet{ IgnoreUnknown: ignoreUnknown, TableName: migrations.MigrationTableName, } migrationSource := &migrate.MemoryMigrationSource{ - Migrations: migrations.All(), + Migrations: mgs, } return migrationSet.Exec(db, "postgres", migrationSource, migrate.Up) diff --git a/internal/praefect/datastore/glsql/testing.go b/internal/praefect/datastore/glsql/testing.go index 915576985658fba446dcad217e3e78e74ad422c3..2152b3ee18638cb9854ea460107e31eadef20066 100644 --- a/internal/praefect/datastore/glsql/testing.go +++ b/internal/praefect/datastore/glsql/testing.go @@ -16,6 +16,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/config" + "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/migrations" ) const ( @@ -73,9 +74,6 @@ func (db DB) Truncate(t testing.TB, tables ...string) { _, err := db.DB.Exec("DELETE FROM " + table) require.NoError(t, err, "database cleanup failed: %s", tables) } - - _, err := db.DB.Exec("SELECT setval(relname::TEXT, 1, false) from pg_class where relkind = 'S'") - require.NoError(t, err, "database cleanup failed: %s", tables) } // RequireRowsInTable verifies that `tname` table has `n` amount of rows in it. @@ -87,22 +85,6 @@ func (db DB) RequireRowsInTable(t *testing.T, tname string, n int) { require.Equal(t, n, count, "unexpected amount of rows in table: %d instead of %d", count, n) } -// TruncateAll removes all data from known set of tables. -func (db DB) TruncateAll(t testing.TB) { - db.Truncate(t, - "replication_queue_job_lock", - "replication_queue", - "replication_queue_lock", - "node_status", - "shard_primaries", - "storage_repositories", - "repositories", - "virtual_storages", - "repository_assignments", - "storage_cleanups", - ) -} - // MustExec executes `q` with `args` and verifies there are no errors. func (db DB) MustExec(t testing.TB, q string, args ...interface{}) { _, err := db.DB.Exec(q, args...) @@ -215,7 +197,7 @@ func initPraefectTestDB(t testing.TB, database string) *sql.DB { require.NoErrorf(t, templateDB.Close(), "release connection to the %q database", templateDBConf.DBName) }() - if _, err := Migrate(templateDB, false); err != nil { + if _, err := Migrate(templateDB, false, append(migrations.All(), testDataMigrations()...)); err != nil { // If database has unknown migration we try to re-create template database with // current migration. It may be caused by other code changes done in another branch. if pErr := (*migrate.PlanError)(nil); errors.As(err, &pErr) { @@ -231,7 +213,7 @@ func initPraefectTestDB(t testing.TB, database string) *sql.DB { defer func() { require.NoErrorf(t, remigrateTemplateDB.Close(), "release connection to the %q database", templateDBConf.DBName) }() - _, err = Migrate(remigrateTemplateDB, false) + _, err = Migrate(remigrateTemplateDB, false, append(migrations.All(), testDataMigrations()...)) require.NoErrorf(t, err, "failed to run database migration on %q", praefectTemplateDatabase) } else { require.NoErrorf(t, err, "failed to run database migration on %q", praefectTemplateDatabase) @@ -330,3 +312,64 @@ func getDatabaseEnvironment(t testing.TB) map[string]string { return databaseEnv } + +// testDataMigrations function should be used to include additional migration scripts into the +// existing set of migrations. All migrations applied in order. The ordering is done via numeric +// prefix of the migration. To insert migration in between some existing migrations you should +// include it with the prefix that will be greater that the lowest and lesser than the upper one. +func testDataMigrations() []*migrate.Migration { + return []*migrate.Migration{ + { + // Applied after 20200921170311_repositories_primary_column + Id: "20200921170400_artificial_repositories", + Up: []string{ + // random_hex_string generates a random HEX string with requested length. + `-- +migrate StatementBegin + CREATE OR REPLACE FUNCTION random_hex_string(length INTEGER) RETURNS TEXT AS + $$ + DECLARE + chars TEXT[] := '{0,1,2,3,4,5,6,7,8,9,a,b,c,d,e,f}'; + result TEXT := ''; + i INTEGER := 0; + BEGIN + IF length < 0 THEN + RAISE EXCEPTION 'Given length cannot be less than 0'; + END IF; + FOR i IN 1..length LOOP + result := result || chars[1+RANDOM()*(ARRAY_LENGTH(chars, 1)-1)]; + END LOOP; + RETURN result; + END; + $$ LANGUAGE plpgsql`, + + // repository_relative_path generates a relative path for the repository. + // It has a standard format: @hashed/ab/cd/abcd000000000000000000000000000000000000000000000000000000000000.git + `-- +migrate StatementBegin + CREATE OR REPLACE FUNCTION repository_relative_path() RETURNS TEXT AS + $$ + DECLARE + result TEXT; + BEGIN + SELECT CONCAT_WS('/', '@hashed', SUBSTR(path, 1, 2), SUBSTR(path, 3, 2), path || '.git') INTO result + FROM (SELECT random_hex_string(64) AS path) t; + RETURN result; + END; + $$ LANGUAGE plpgsql + -- +migrate StatementEnd`, + + // Populate database tables with artificial data. + `INSERT INTO storage_repositories (virtual_storage, relative_path, storage, generation) + SELECT virtual_storages.name, repositories.relative_path, storages.name, 1 + FROM (SELECT UNNEST(ARRAY['artificial-praefect-1','artificial-praefect-2']) AS name) virtual_storages + CROSS JOIN + (SELECT UNNEST(ARRAY['artificial-gitaly-1','artificial-gitaly-2','artificial-gitaly-3']) AS name) storages + CROSS JOIN + (SELECT generate_series(1,300), repository_relative_path() AS relative_path) repositories`, + + `INSERT INTO repositories (virtual_storage, relative_path, generation) + SELECT DISTINCT virtual_storage, relative_path, 1 + FROM storage_repositories`, + }, + }, + } +} diff --git a/internal/praefect/datastore/glsql/testing_test.go b/internal/praefect/datastore/glsql/testing_test.go index 26a4f499cc983953c0e5dff207047a7ac3f83bfd..76b457e993f8ac9cf4c04660e362149188e885a5 100644 --- a/internal/praefect/datastore/glsql/testing_test.go +++ b/internal/praefect/datastore/glsql/testing_test.go @@ -27,5 +27,5 @@ func TestDB_Truncate(t *testing.T) { var id int require.NoError(t, db.QueryRow("INSERT INTO truncate_tbl VALUES (DEFAULT) RETURNING id").Scan(&id)) - require.Equal(t, 1, id, "sequence for primary key must be restarted") + require.Equal(t, 3, id, "sequence for primary key must be restarted") } diff --git a/internal/praefect/datastore/listener_postgres_test.go b/internal/praefect/datastore/listener_postgres_test.go index 53f1cc40358ed17bde312bf7becf557d79be8bf0..7dc5c99a801d15a8db6f61b4baae508af3f5c85e 100644 --- a/internal/praefect/datastore/listener_postgres_test.go +++ b/internal/praefect/datastore/listener_postgres_test.go @@ -377,7 +377,7 @@ func TestPostgresListener_Listen_repositories_delete(t *testing.T) { "repositories_updates", func(t *testing.T) { _, err := db.DB.Exec(` - INSERT INTO repositories + INSERT INTO repositories (virtual_storage, relative_path, generation) VALUES ('praefect-1', '/path/to/repo/1', 1), ('praefect-1', '/path/to/repo/2', 1), ('praefect-1', '/path/to/repo/3', 0), @@ -385,7 +385,7 @@ func TestPostgresListener_Listen_repositories_delete(t *testing.T) { require.NoError(t, err) }, func(t *testing.T) { - _, err := db.DB.Exec(`DELETE FROM repositories WHERE generation > 0`) + _, err := db.DB.Exec(`DELETE FROM repositories WHERE generation > 0 AND virtual_storage LIKE 'praefect%'`) require.NoError(t, err) }, func(t *testing.T, n glsql.Notification) { @@ -435,11 +435,14 @@ func TestPostgresListener_Listen_storage_repositories_update(t *testing.T) { db.Name, channel, func(t *testing.T) { - _, err := db.DB.Exec(`INSERT INTO storage_repositories VALUES ('praefect-1', '/path/to/repo', 'gitaly-1', 0)`) + _, err := db.DB.Exec( + `INSERT INTO storage_repositories(virtual_storage, relative_path, storage, generation) + VALUES ('praefect-1', '/path/to/repo', 'gitaly-1', 0)`, + ) require.NoError(t, err) }, func(t *testing.T) { - _, err := db.DB.Exec(`UPDATE storage_repositories SET generation = generation + 1`) + _, err := db.DB.Exec(`UPDATE storage_repositories SET generation = generation + 1 WHERE virtual_storage = 'praefect-1'`) require.NoError(t, err) }, func(t *testing.T, n glsql.Notification) { @@ -461,7 +464,7 @@ func TestPostgresListener_Listen_storage_empty_notification(t *testing.T) { channel, func(t *testing.T) {}, func(t *testing.T) { - _, err := db.DB.Exec(`UPDATE storage_repositories SET generation = 1`) + _, err := db.DB.Exec(`UPDATE storage_repositories SET generation = 1 WHERE repository_id = -1`) require.NoError(t, err) }, nil, // no notification events expected @@ -486,7 +489,7 @@ func TestPostgresListener_Listen_storage_repositories_delete(t *testing.T) { require.NoError(t, err) }, func(t *testing.T) { - _, err := db.DB.Exec(`DELETE FROM storage_repositories`) + _, err := db.DB.Exec(`DELETE FROM storage_repositories WHERE virtual_storage = 'praefect-1'`) require.NoError(t, err) }, func(t *testing.T, n glsql.Notification) { @@ -497,6 +500,7 @@ func TestPostgresListener_Listen_storage_repositories_delete(t *testing.T) { } func testListener(t *testing.T, dbName, channel string, setup func(t *testing.T), trigger func(t *testing.T), verifier func(t *testing.T, notification glsql.Notification)) { + t.Helper() setup(t) readyChan := make(chan struct{}) diff --git a/internal/praefect/datastore/migrations/20211018081858_healthy_storages_view_time.go b/internal/praefect/datastore/migrations/20211018081858_healthy_storages_view_time.go new file mode 100644 index 0000000000000000000000000000000000000000..fe9fd65b51182d2ea8f86f07ce149a53c08da48c --- /dev/null +++ b/internal/praefect/datastore/migrations/20211018081858_healthy_storages_view_time.go @@ -0,0 +1,44 @@ +package migrations + +import migrate "github.com/rubenv/sql-migrate" + +func init() { + m := &migrate.Migration{ + Id: "20211018081858_healthy_storages_view_time", + Up: []string{ + // Re-create view to update usage of the timestamps. + // We should always rely on the UTC time zone to make timestamps consistent + // throughout the schema. + ` +CREATE OR REPLACE VIEW healthy_storages AS + SELECT shard_name AS virtual_storage, node_name AS storage + FROM node_status AS ns + WHERE last_seen_active_at >= CLOCK_TIMESTAMP() AT TIME ZONE 'UTC' - INTERVAL '10 SECOND' + GROUP BY shard_name, node_name + HAVING COUNT(praefect_name) >= ( + SELECT CEIL(COUNT(DISTINCT praefect_name) / 2.0) AS quorum_count + FROM node_status + WHERE shard_name = ns.shard_name + AND last_contact_attempt_at >= CLOCK_TIMESTAMP() AT TIME ZONE 'UTC' - INTERVAL '60 SECOND' + ) + ORDER BY shard_name, node_name`, + }, + Down: []string{ + ` +CREATE OR REPLACE VIEW healthy_storages AS + SELECT shard_name AS virtual_storage, node_name AS storage + FROM node_status AS ns + WHERE last_seen_active_at >= NOW() - INTERVAL '10 SECOND' + GROUP BY shard_name, node_name + HAVING COUNT(praefect_name) >= ( + SELECT CEIL(COUNT(DISTINCT praefect_name) / 2.0) AS quorum_count + FROM node_status + WHERE shard_name = ns.shard_name + AND last_contact_attempt_at >= NOW() - INTERVAL '60 SECOND' + ) + ORDER BY shard_name, node_name`, + }, + } + + allMigrations = append(allMigrations, m) +} diff --git a/internal/praefect/datastore/queue.go b/internal/praefect/datastore/queue.go index e00affd9dbbc163d19a9e5cd51c75616e8856ae9..601fcaa50a08adb4da660f61eab9a27c8de70245 100644 --- a/internal/praefect/datastore/queue.go +++ b/internal/praefect/datastore/queue.go @@ -215,12 +215,12 @@ func (rq PostgresReplicationEventQueue) Enqueue(ctx context.Context, event Repli ON CONFLICT (id) DO UPDATE SET id = EXCLUDED.id RETURNING id ) - INSERT INTO replication_queue(lock_id, job, meta) - SELECT insert_lock.id, $4, $5 + INSERT INTO replication_queue(lock_id, job, meta, created_at) + SELECT insert_lock.id, $4, $5, $6 FROM insert_lock RETURNING id, state, created_at, updated_at, lock_id, attempt, job, meta` // this will always return a single row result (because of lock uniqueness) or an error - rows, err := rq.qc.QueryContext(ctx, query, event.Job.VirtualStorage, event.Job.TargetNodeStorage, event.Job.RelativePath, event.Job, event.Meta) + rows, err := rq.qc.QueryContext(ctx, query, event.Job.VirtualStorage, event.Job.TargetNodeStorage, event.Job.RelativePath, event.Job, event.Meta, time.Now().UTC()) if err != nil { return ReplicationEvent{}, fmt.Errorf("query: %w", err) } @@ -280,14 +280,14 @@ func (rq PostgresReplicationEventQueue) Dequeue(ctx context.Context, virtualStor UPDATE replication_queue AS queue SET attempt = CASE WHEN job->>'change' = 'delete_replica' THEN queue.attempt ELSE queue.attempt - 1 END , state = 'in_progress' - , updated_at = NOW() AT TIME ZONE 'UTC' + , updated_at = $4 FROM candidate WHERE queue.id = candidate.id RETURNING queue.id, queue.state, queue.created_at, queue.updated_at, queue.lock_id, queue.attempt, queue.job, queue.meta ) , track_job_lock AS ( INSERT INTO replication_queue_job_lock (job_id, lock_id, triggered_at) - SELECT job.id, job.lock_id, NOW() AT TIME ZONE 'UTC' + SELECT job.id, job.lock_id, $4 FROM job RETURNING lock_id ) @@ -300,7 +300,7 @@ func (rq PostgresReplicationEventQueue) Dequeue(ctx context.Context, virtualStor SELECT id, state, created_at, updated_at, lock_id, attempt, job, meta FROM job ORDER BY id` - rows, err := rq.qc.QueryContext(ctx, query, virtualStorage, nodeStorage, count) + rows, err := rq.qc.QueryContext(ctx, query, virtualStorage, nodeStorage, count, time.Now().UTC()) if err != nil { return nil, fmt.Errorf("query: %w", err) } @@ -379,7 +379,7 @@ func (rq PostgresReplicationEventQueue) Acknowledge(ctx context.Context, state J , updated AS ( UPDATE replication_queue AS queue SET state = $2::REPLICATION_JOB_STATE, - updated_at = NOW() AT TIME ZONE 'UTC' + updated_at = $3 FROM existing WHERE existing.id = queue.id RETURNING queue.id, queue.lock_id @@ -406,7 +406,7 @@ func (rq PostgresReplicationEventQueue) Acknowledge(ctx context.Context, state J ) SELECT id FROM existing` - rows, err := rq.qc.QueryContext(ctx, query, pqIDs, state) + rows, err := rq.qc.QueryContext(ctx, query, pqIDs, state, time.Now().UTC()) if err != nil { return nil, fmt.Errorf("query: %w", err) } @@ -438,7 +438,7 @@ func (rq PostgresReplicationEventQueue) StartHealthUpdate(ctx context.Context, t query := ` UPDATE replication_queue_job_lock - SET triggered_at = NOW() AT TIME ZONE 'UTC' + SET triggered_at = $3 WHERE (job_id, lock_id) IN (SELECT UNNEST($1::BIGINT[]), UNNEST($2::TEXT[]))` for { @@ -446,7 +446,7 @@ func (rq PostgresReplicationEventQueue) StartHealthUpdate(ctx context.Context, t case <-ctx.Done(): return nil case <-trigger: - res, err := rq.qc.ExecContext(ctx, query, jobIDs, lockIDs) + res, err := rq.qc.ExecContext(ctx, query, jobIDs, lockIDs, time.Now().UTC()) if err != nil { if pqError, ok := err.(*pq.Error); ok && pqError.Code.Name() == "query_canceled" { return nil @@ -479,7 +479,7 @@ func (rq PostgresReplicationEventQueue) StartHealthUpdate(ctx context.Context, t func (rq PostgresReplicationEventQueue) AcknowledgeStale(ctx context.Context, staleAfter time.Duration) error { query := ` WITH stale_job_lock AS ( - DELETE FROM replication_queue_job_lock WHERE triggered_at < NOW() AT TIME ZONE 'UTC' - INTERVAL '1 MILLISECOND' * $1 + DELETE FROM replication_queue_job_lock WHERE triggered_at < $1 RETURNING job_id, lock_id ) , update_job AS ( @@ -505,7 +505,7 @@ func (rq PostgresReplicationEventQueue) AcknowledgeStale(ctx context.Context, st GROUP BY lock_id ) AS existing ON removed.lock_id = existing.lock_id AND removed.amount = existing.amount )` - _, err := rq.qc.ExecContext(ctx, query, staleAfter.Milliseconds()) + _, err := rq.qc.ExecContext(ctx, query, time.Now().UTC().Add(-staleAfter)) if err != nil { return fmt.Errorf("exec acknowledge stale: %w", err) } diff --git a/internal/praefect/datastore/queue_bm_test.go b/internal/praefect/datastore/queue_bm_test.go index 8fea252587c08f07d1a2a80abf9fcc48ffdcee38..1f7a20d38b54885ea245a8a3ce141fd22039ffc1 100644 --- a/internal/praefect/datastore/queue_bm_test.go +++ b/internal/praefect/datastore/queue_bm_test.go @@ -9,22 +9,22 @@ import ( ) func BenchmarkPostgresReplicationEventQueue_Acknowledge(b *testing.B) { - // go test -tags=postgres -test.run=~ -test.bench=BenchmarkPostgresReplicationEventQueue_Acknowledge/small -benchtime=1000x gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore + // go test -test.run=~ -test.bench=BenchmarkPostgresReplicationEventQueue_Acknowledge/small -benchtime=1000x gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore b.Run("small", func(b *testing.B) { benchmarkPostgresReplicationEventQueueAcknowledge(b, map[JobState]int{JobStateReady: 10, JobStateInProgress: 10, JobStateFailed: 10}) }) - // go test -tags=postgres -test.run=~ -test.bench=BenchmarkPostgresReplicationEventQueue_Acknowledge/medium -benchtime=100x gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore + // go test -test.run=~ -test.bench=BenchmarkPostgresReplicationEventQueue_Acknowledge/medium -benchtime=100x gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore b.Run("medium", func(b *testing.B) { benchmarkPostgresReplicationEventQueueAcknowledge(b, map[JobState]int{JobStateReady: 1_000, JobStateInProgress: 100, JobStateFailed: 100}) }) - // go test -tags=postgres -test.run=~ -test.bench=BenchmarkPostgresReplicationEventQueue_Acknowledge/big -benchtime=10x gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore + // go test -test.run=~ -test.bench=BenchmarkPostgresReplicationEventQueue_Acknowledge/big -benchtime=10x gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore b.Run("big", func(b *testing.B) { benchmarkPostgresReplicationEventQueueAcknowledge(b, map[JobState]int{JobStateReady: 100_000, JobStateInProgress: 100, JobStateFailed: 100}) }) - // go test -tags=postgres -test.run=~ -test.bench=BenchmarkPostgresReplicationEventQueue_Acknowledge/huge -benchtime=1x gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore + // go test -test.run=~ -test.bench=BenchmarkPostgresReplicationEventQueue_Acknowledge/huge -benchtime=1x gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore b.Run("huge", func(b *testing.B) { benchmarkPostgresReplicationEventQueueAcknowledge(b, map[JobState]int{JobStateReady: 1_000_000, JobStateInProgress: 100, JobStateFailed: 100}) }) @@ -36,7 +36,7 @@ func benchmarkPostgresReplicationEventQueueAcknowledge(b *testing.B, setup map[J ctx, cancel := testhelper.Context() defer cancel() - queue := PostgresReplicationEventQueue{db.DB} + queue := PostgresReplicationEventQueue{qc: db.DB} eventTmpl := ReplicationEvent{ Job: ReplicationJob{ Change: UpdateRepo, @@ -59,7 +59,7 @@ func benchmarkPostgresReplicationEventQueueAcknowledge(b *testing.B, setup map[J b.StopTimer() b.ResetTimer() - db.TruncateAll(b) + db.Truncate(b, "replication_queue_job_lock", "replication_queue", "replication_queue_lock") total := 0 for _, count := range setup { @@ -67,7 +67,7 @@ func benchmarkPostgresReplicationEventQueueAcknowledge(b *testing.B, setup map[J total += count } - _, err := db.DB.ExecContext( + _, err := db.ExecContext( ctx, `INSERT INTO replication_queue (state, lock_id, job) SELECT 'ready', 'praefect|gitaly-1|/project/path-'|| T.I, ('{"change":"update","relative_path":"/project/path-'|| T.I || '","virtual_storage":"praefect","source_node_storage":"gitaly-0","target_node_storage":"gitaly-1"}')::JSONB @@ -76,7 +76,7 @@ func benchmarkPostgresReplicationEventQueueAcknowledge(b *testing.B, setup map[J ) require.NoError(b, err) - _, err = db.DB.ExecContext( + _, err = db.ExecContext( ctx, `INSERT INTO replication_queue_lock SELECT DISTINCT lock_id FROM replication_queue`, diff --git a/internal/praefect/datastore/queue_test.go b/internal/praefect/datastore/queue_test.go index 65b939b1f6d4929ce25fce874e32e77b6d010253..dd8aa41fac6a132b8dcdd68324fbc051dca3fd08 100644 --- a/internal/praefect/datastore/queue_test.go +++ b/internal/praefect/datastore/queue_test.go @@ -119,20 +119,21 @@ func TestPostgresReplicationEventQueue_DeleteReplicaUniqueIndex(t *testing.T) { }, } { t.Run(tc.desc, func(t *testing.T) { - db.TruncateAll(t) + tx := db.Begin(t) + defer tx.Rollback(t) ctx, cancel := testhelper.Context() defer cancel() if tc.existingJob != nil { - _, err := db.ExecContext(ctx, ` + _, err := tx.ExecContext(ctx, ` INSERT INTO replication_queue (state, job) VALUES ($1, $2) `, tc.existingJob.State, tc.existingJob.Job) require.NoError(t, err) } - _, err := NewPostgresReplicationEventQueue(db).Enqueue(ctx, ReplicationEvent{ + _, err := NewPostgresReplicationEventQueue(tx).Enqueue(ctx, ReplicationEvent{ State: JobStateReady, Job: ReplicationJob{ Change: DeleteReplica, @@ -864,8 +865,6 @@ func TestPostgresReplicationEventQueue_StartHealthUpdate(t *testing.T) { }) t.Run("stops after first error", func(t *testing.T) { - db.TruncateAll(t) - ctx, cancel := testhelper.Context() defer cancel() @@ -883,8 +882,6 @@ func TestPostgresReplicationEventQueue_StartHealthUpdate(t *testing.T) { }) t.Run("stops if nothing to update (extended coverage)", func(t *testing.T) { - db.TruncateAll(t) - ctx, cancel := testhelper.Context() defer cancel() @@ -907,8 +904,6 @@ func TestPostgresReplicationEventQueue_StartHealthUpdate(t *testing.T) { }) t.Run("triggers all passed in events", func(t *testing.T) { - db.TruncateAll(t) - var wg sync.WaitGroup ctx, cancel := testhelper.Context() defer func() { @@ -991,8 +986,10 @@ func TestPostgresReplicationEventQueue_AcknowledgeStale(t *testing.T) { db := glsql.NewDB(t) t.Run("no stale jobs yet", func(t *testing.T) { - db.TruncateAll(t) - source := NewPostgresReplicationEventQueue(db) + tx := db.Begin(t) + defer tx.Rollback(t) + + source := NewPostgresReplicationEventQueue(tx) event, err := source.Enqueue(ctx, eventType1) require.NoError(t, err) @@ -1002,12 +999,14 @@ func TestPostgresReplicationEventQueue_AcknowledgeStale(t *testing.T) { // events triggered just now (< 1 sec ago), so nothing considered stale require.NoError(t, source.AcknowledgeStale(ctx, time.Second)) - requireEvents(t, ctx, db, devents) + requireEvents(t, ctx, tx, devents) }) t.Run("jobs considered stale only at 'in_progress' state", func(t *testing.T) { - db.TruncateAll(t) - source := NewPostgresReplicationEventQueue(db) + tx := db.Begin(t) + defer tx.Rollback(t) + + source := NewPostgresReplicationEventQueue(tx) // move event to 'ready' state event1, err := source.Enqueue(ctx, eventType1) @@ -1041,12 +1040,14 @@ func TestPostgresReplicationEventQueue_AcknowledgeStale(t *testing.T) { devents2[0].State = JobStateFailed devents4[0].Attempt = 2 devents4[0].State = JobStateFailed - requireEvents(t, ctx, db, []ReplicationEvent{event1, devents2[0], devents4[0]}) + requireEvents(t, ctx, tx, []ReplicationEvent{event1, devents2[0], devents4[0]}) }) t.Run("stale jobs updated for all virtual storages and storages at once", func(t *testing.T) { - db.TruncateAll(t) - source := NewPostgresReplicationEventQueue(db) + tx := db.Begin(t) + defer tx.Rollback(t) + + source := NewPostgresReplicationEventQueue(tx) var events []ReplicationEvent for _, eventType := range []ReplicationEvent{eventType1, eventType2, eventType3} { @@ -1073,11 +1074,11 @@ func TestPostgresReplicationEventQueue_AcknowledgeStale(t *testing.T) { exp = append(exp, e) } - requireEvents(t, ctx, db, exp) + requireEvents(t, ctx, tx, exp) }) } -func requireEvents(t *testing.T, ctx context.Context, db glsql.DB, expected []ReplicationEvent) { +func requireEvents(t *testing.T, ctx context.Context, db glsql.Querier, expected []ReplicationEvent) { t.Helper() // as it is not possible to expect exact time of entity creation/update we do not fetch it from database diff --git a/internal/praefect/datastore/repository_store_test.go b/internal/praefect/datastore/repository_store_test.go index a004ba88d2b5a7991fc982a1b720d40d096cb780..9781f195ca0cb947d6b79d5e5c460b6228bb1bb3 100644 --- a/internal/praefect/datastore/repository_store_test.go +++ b/internal/praefect/datastore/repository_store_test.go @@ -52,8 +52,7 @@ LEFT JOIN ( FROM repository_assignments GROUP BY repository_id, virtual_storage, relative_path ) AS repository_assignments USING (repository_id, virtual_storage, relative_path) - - `) +WHERE virtual_storage NOT LIKE 'artificial%'`) require.NoError(t, err) defer rows.Close() @@ -85,7 +84,7 @@ LEFT JOIN ( requireStorageState := func(t testing.TB, ctx context.Context, exp storageState) { rows, err := db.QueryContext(ctx, ` SELECT repository_id, virtual_storage, relative_path, storage, generation -FROM storage_repositories +FROM storage_repositories WHERE virtual_storage NOT LIKE 'artificial%' `) require.NoError(t, err) defer rows.Close() @@ -118,12 +117,13 @@ FROM storage_repositories func TestRepositoryStore_Postgres(t *testing.T) { db := glsql.NewDB(t) testRepositoryStore(t, func(t *testing.T, storages map[string][]string) (RepositoryStore, requireStateFunc) { - db.TruncateAll(t) - gs := NewPostgresRepositoryStore(db, storages) + tx := db.Begin(t) + t.Cleanup(func() { tx.Rollback(t) }) + gs := NewPostgresRepositoryStore(tx, storages) return gs, func(t *testing.T, ctx context.Context, vss virtualStorageState, ss storageState) { t.Helper() - requireState(t, ctx, db, vss, ss) + requireState(t, ctx, tx, vss, ss) } }) } @@ -141,7 +141,7 @@ func TestRepositoryStore_incrementGenerationConcurrently(t *testing.T) { first call second call error error - state storageState + state func(id int64) storageState }{ { desc: "both successful", @@ -153,13 +153,15 @@ func TestRepositoryStore_incrementGenerationConcurrently(t *testing.T) { primary: "primary", secondaries: []string{"secondary"}, }, - state: storageState{ - "virtual-storage": { - "relative-path": { - "primary": {repositoryID: 1, generation: 2}, - "secondary": {repositoryID: 1, generation: 2}, + state: func(id int64) storageState { + return storageState{ + "virtual-storage": { + "relative-path": { + "primary": {repositoryID: id, generation: 2}, + "secondary": {repositoryID: id, generation: 2}, + }, }, - }, + } }, }, { @@ -171,13 +173,15 @@ func TestRepositoryStore_incrementGenerationConcurrently(t *testing.T) { primary: "primary", secondaries: []string{"secondary"}, }, - state: storageState{ - "virtual-storage": { - "relative-path": { - "primary": {repositoryID: 1, generation: 2}, - "secondary": {repositoryID: 1, generation: 0}, + state: func(id int64) storageState { + return storageState{ + "virtual-storage": { + "relative-path": { + "primary": {repositoryID: id, generation: 2}, + "secondary": {repositoryID: id, generation: 0}, + }, }, - }, + } }, }, { @@ -189,13 +193,15 @@ func TestRepositoryStore_incrementGenerationConcurrently(t *testing.T) { primary: "secondary", }, error: errWriteToOutdatedNodes, - state: storageState{ - "virtual-storage": { - "relative-path": { - "primary": {repositoryID: 1, generation: 1}, - "secondary": {repositoryID: 1, generation: 0}, + state: func(id int64) storageState { + return storageState{ + "virtual-storage": { + "relative-path": { + "primary": {repositoryID: id, generation: 1}, + "secondary": {repositoryID: id, generation: 0}, + }, }, - }, + } }, }, } { @@ -203,14 +209,19 @@ func TestRepositoryStore_incrementGenerationConcurrently(t *testing.T) { ctx, cancel := testhelper.Context() defer cancel() - db.TruncateAll(t) - - require.NoError(t, NewPostgresRepositoryStore(db, nil).CreateRepository(ctx, 1, "virtual-storage", "relative-path", "primary", []string{"secondary"}, nil, false, false)) + globalRepositoryStore := NewPostgresRepositoryStore(db, nil) + id, err := globalRepositoryStore.ReserveRepositoryID(ctx, "virtual-storage", "relative-path") + require.NoError(t, err) + require.NoError(t, globalRepositoryStore.CreateRepository(ctx, id, "virtual-storage", "relative-path", "primary", []string{"secondary"}, nil, false, false)) + defer func() { + _, _, err := globalRepositoryStore.DeleteRepository(ctx, "virtual-storage", "relative-path") + require.NoError(t, err) + }() firstTx := db.Begin(t) secondTx := db.Begin(t) - err := NewPostgresRepositoryStore(firstTx, nil).IncrementGeneration(ctx, 1, tc.first.primary, tc.first.secondaries) + err = NewPostgresRepositoryStore(firstTx, nil).IncrementGeneration(ctx, id, tc.first.primary, tc.first.secondaries) require.NoError(t, err) go func() { @@ -218,13 +229,13 @@ func TestRepositoryStore_incrementGenerationConcurrently(t *testing.T) { firstTx.Commit(t) }() - err = NewPostgresRepositoryStore(secondTx, nil).IncrementGeneration(ctx, 1, tc.second.primary, tc.second.secondaries) + err = NewPostgresRepositoryStore(secondTx, nil).IncrementGeneration(ctx, id, tc.second.primary, tc.second.secondaries) require.Equal(t, tc.error, err) secondTx.Commit(t) requireState(t, ctx, db, - virtualStorageState{"virtual-storage": {"relative-path": {repositoryID: 1, replicaPath: "relative-path"}}}, - tc.state, + virtualStorageState{"virtual-storage": {"relative-path": {repositoryID: id, replicaPath: "relative-path"}}}, + tc.state(id), ) }) } @@ -273,7 +284,7 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) { rs, requireState := newStore(t, nil) require.Equal(t, - rs.IncrementGeneration(ctx, 1, "primary", []string{"secondary-1"}), + rs.IncrementGeneration(ctx, 1e10, "primary", []string{"secondary-1"}), commonerr.ErrRepositoryNotFound, ) requireState(t, ctx, @@ -284,26 +295,27 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) { t.Run("write to outdated nodes", func(t *testing.T) { rs, requireState := newStore(t, nil) - - require.NoError(t, rs.CreateRepository(ctx, 1, vs, repo, "latest-node", []string{"outdated-primary", "outdated-secondary"}, nil, false, false)) - require.NoError(t, rs.SetGeneration(ctx, 1, "latest-node", 1)) + id, err := rs.ReserveRepositoryID(ctx, vs, repo) + require.NoError(t, err) + require.NoError(t, rs.CreateRepository(ctx, id, vs, repo, "latest-node", []string{"outdated-primary", "outdated-secondary"}, nil, false, false)) + require.NoError(t, rs.SetGeneration(ctx, id, "latest-node", 1)) require.Equal(t, - rs.IncrementGeneration(ctx, 1, "outdated-primary", []string{"outdated-secondary"}), + rs.IncrementGeneration(ctx, id, "outdated-primary", []string{"outdated-secondary"}), errWriteToOutdatedNodes, ) requireState(t, ctx, virtualStorageState{ "virtual-storage-1": { - "repository-1": {repositoryID: 1, replicaPath: "repository-1"}, + "repository-1": {repositoryID: id, replicaPath: "repository-1"}, }, }, storageState{ "virtual-storage-1": { "repository-1": { - "latest-node": {repositoryID: 1, generation: 1}, - "outdated-primary": {repositoryID: 1, generation: 0}, - "outdated-secondary": {repositoryID: 1, generation: 0}, + "latest-node": {repositoryID: id, generation: 1}, + "outdated-primary": {repositoryID: id, generation: 0}, + "outdated-secondary": {repositoryID: id, generation: 0}, }, }, }, @@ -313,82 +325,86 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) { t.Run("increments generation for up to date nodes", func(t *testing.T) { rs, requireState := newStore(t, nil) - for id, pair := range []struct{ virtualStorage, relativePath string }{ + var ids []int64 + for _, pair := range []struct{ virtualStorage, relativePath string }{ {vs, repo}, // create records that don't get modified to ensure the query is correctly scoped by virtual storage // and relative path {vs, "other-relative-path"}, {"other-virtual-storage", repo}, } { - require.NoError(t, rs.CreateRepository(ctx, int64(id+1), pair.virtualStorage, pair.relativePath, "primary", []string{"up-to-date-secondary", "outdated-secondary"}, nil, false, false)) + id, err := rs.ReserveRepositoryID(ctx, pair.virtualStorage, pair.relativePath) + require.NoError(t, err) + ids = append(ids, id) + require.NoError(t, rs.CreateRepository(ctx, id, pair.virtualStorage, pair.relativePath, "primary", []string{"up-to-date-secondary", "outdated-secondary"}, nil, false, false)) } - require.NoError(t, rs.IncrementGeneration(ctx, 1, "primary", []string{"up-to-date-secondary"})) + require.NoError(t, rs.IncrementGeneration(ctx, ids[0], "primary", []string{"up-to-date-secondary"})) requireState(t, ctx, virtualStorageState{ "virtual-storage-1": { - "repository-1": {repositoryID: 1, replicaPath: "repository-1"}, - "other-relative-path": {repositoryID: 2, replicaPath: "other-relative-path"}, + "repository-1": {repositoryID: ids[0], replicaPath: "repository-1"}, + "other-relative-path": {repositoryID: ids[1], replicaPath: "other-relative-path"}, }, "other-virtual-storage": { - "repository-1": {repositoryID: 3, replicaPath: "repository-1"}, + "repository-1": {repositoryID: ids[2], replicaPath: "repository-1"}, }, }, storageState{ "virtual-storage-1": { "repository-1": { - "primary": {repositoryID: 1, generation: 1}, - "up-to-date-secondary": {repositoryID: 1, generation: 1}, - "outdated-secondary": {repositoryID: 1, generation: 0}, + "primary": {repositoryID: ids[0], generation: 1}, + "up-to-date-secondary": {repositoryID: ids[0], generation: 1}, + "outdated-secondary": {repositoryID: ids[0], generation: 0}, }, "other-relative-path": { - "primary": {repositoryID: 2}, - "up-to-date-secondary": {repositoryID: 2}, - "outdated-secondary": {repositoryID: 2}, + "primary": {repositoryID: ids[1]}, + "up-to-date-secondary": {repositoryID: ids[1]}, + "outdated-secondary": {repositoryID: ids[1]}, }, }, "other-virtual-storage": { "repository-1": { - "primary": {repositoryID: 3}, - "up-to-date-secondary": {repositoryID: 3}, - "outdated-secondary": {repositoryID: 3}, + "primary": {repositoryID: ids[2]}, + "up-to-date-secondary": {repositoryID: ids[2]}, + "outdated-secondary": {repositoryID: ids[2]}, }, }, }, ) - require.NoError(t, rs.IncrementGeneration(ctx, 1, "primary", []string{ + require.NoError(t, rs.IncrementGeneration(ctx, ids[0], "primary", []string{ "up-to-date-secondary", "outdated-secondary", "non-existing-secondary", })) requireState(t, ctx, virtualStorageState{ "virtual-storage-1": { - "repository-1": {repositoryID: 1, replicaPath: "repository-1"}, - "other-relative-path": {repositoryID: 2, replicaPath: "other-relative-path"}, + "repository-1": {repositoryID: ids[0], replicaPath: "repository-1"}, + "other-relative-path": {repositoryID: ids[1], replicaPath: "other-relative-path"}, }, "other-virtual-storage": { - "repository-1": {repositoryID: 3, replicaPath: "repository-1"}, + "repository-1": {repositoryID: ids[2], replicaPath: "repository-1"}, }, }, storageState{ "virtual-storage-1": { "repository-1": { - "primary": {repositoryID: 1, generation: 2}, - "up-to-date-secondary": {repositoryID: 1, generation: 2}, - "outdated-secondary": {repositoryID: 1, generation: 0}, + "primary": {repositoryID: ids[0], generation: 2}, + "up-to-date-secondary": {repositoryID: ids[0], generation: 2}, + "outdated-secondary": {repositoryID: ids[0], generation: 0}, }, "other-relative-path": { - "primary": {repositoryID: 2}, - "up-to-date-secondary": {repositoryID: 2}, - "outdated-secondary": {repositoryID: 2}, + "primary": {repositoryID: ids[1]}, + "up-to-date-secondary": {repositoryID: ids[1]}, + "outdated-secondary": {repositoryID: ids[1]}, }, }, "other-virtual-storage": { "repository-1": { - "primary": {repositoryID: 3}, - "up-to-date-secondary": {repositoryID: 3}, - "outdated-secondary": {repositoryID: 3}, + "primary": {repositoryID: ids[2]}, + "up-to-date-secondary": {repositoryID: ids[2]}, + "outdated-secondary": {repositoryID: ids[2]}, }, }, }, @@ -399,18 +415,19 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) { t.Run("SetGeneration", func(t *testing.T) { t.Run("creates a record for the replica", func(t *testing.T) { rs, requireState := newStore(t, nil) - - require.NoError(t, rs.CreateRepository(ctx, 1, vs, repo, stor, nil, nil, false, false)) - require.NoError(t, rs.SetGeneration(ctx, 1, "storage-2", 0)) + id, err := rs.ReserveRepositoryID(ctx, vs, repo) + require.NoError(t, err) + require.NoError(t, rs.CreateRepository(ctx, id, vs, repo, stor, nil, nil, false, false)) + require.NoError(t, rs.SetGeneration(ctx, id, "storage-2", 0)) requireState(t, ctx, virtualStorageState{"virtual-storage-1": { - "repository-1": {repositoryID: 1, replicaPath: "repository-1"}, + "repository-1": {repositoryID: id, replicaPath: "repository-1"}, }}, storageState{ "virtual-storage-1": { "repository-1": { - "storage-1": {repositoryID: 1, generation: 0}, - "storage-2": {repositoryID: 1, generation: 0}, + "storage-1": {repositoryID: id, generation: 0}, + "storage-2": {repositoryID: id, generation: 0}, }, }, }, @@ -419,20 +436,21 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) { t.Run("updates existing record", func(t *testing.T) { rs, requireState := newStore(t, nil) - - require.NoError(t, rs.CreateRepository(ctx, 1, vs, repo, "storage-1", nil, nil, false, false)) - require.NoError(t, rs.SetGeneration(ctx, 1, stor, 1)) - require.NoError(t, rs.SetGeneration(ctx, 1, stor, 0)) + id, err := rs.ReserveRepositoryID(ctx, vs, repo) + require.NoError(t, err) + require.NoError(t, rs.CreateRepository(ctx, id, vs, repo, "storage-1", nil, nil, false, false)) + require.NoError(t, rs.SetGeneration(ctx, id, stor, 1)) + require.NoError(t, rs.SetGeneration(ctx, id, stor, 0)) requireState(t, ctx, virtualStorageState{ "virtual-storage-1": { - "repository-1": {repositoryID: 1, replicaPath: "repository-1"}, + "repository-1": {repositoryID: id, replicaPath: "repository-1"}, }, }, storageState{ "virtual-storage-1": { "repository-1": { - "storage-1": {repositoryID: 1, generation: 0}, + "storage-1": {repositoryID: id, generation: 0}, }, }, }, @@ -452,19 +470,20 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) { t.Run("sets an existing replica as the latest", func(t *testing.T) { rs, requireState := newStore(t, nil) - - require.NoError(t, rs.CreateRepository(ctx, 1, vs, repo, "storage-1", []string{"storage-2"}, nil, false, false)) + id, err := rs.ReserveRepositoryID(ctx, vs, repo) + require.NoError(t, err) + require.NoError(t, rs.CreateRepository(ctx, id, vs, repo, "storage-1", []string{"storage-2"}, nil, false, false)) requireState(t, ctx, virtualStorageState{ "virtual-storage-1": { - "repository-1": {repositoryID: 1, replicaPath: "repository-1"}, + "repository-1": {repositoryID: id, replicaPath: "repository-1"}, }, }, storageState{ "virtual-storage-1": { "repository-1": { - "storage-1": {repositoryID: 1, generation: 0}, - "storage-2": {repositoryID: 1, generation: 0}, + "storage-1": {repositoryID: id, generation: 0}, + "storage-2": {repositoryID: id, generation: 0}, }, }, }, @@ -474,14 +493,14 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) { requireState(t, ctx, virtualStorageState{ "virtual-storage-1": { - "repository-1": {repositoryID: 1, replicaPath: "repository-1"}, + "repository-1": {repositoryID: id, replicaPath: "repository-1"}, }, }, storageState{ "virtual-storage-1": { "repository-1": { - "storage-1": {repositoryID: 1, generation: 1}, - "storage-2": {repositoryID: 1, generation: 0}, + "storage-1": {repositoryID: id, generation: 1}, + "storage-2": {repositoryID: id, generation: 0}, }, }, }, @@ -490,18 +509,19 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) { t.Run("sets a new replica as the latest", func(t *testing.T) { rs, requireState := newStore(t, nil) - - require.NoError(t, rs.CreateRepository(ctx, 1, vs, repo, "storage-1", nil, nil, false, false)) + id, err := rs.ReserveRepositoryID(ctx, vs, repo) + require.NoError(t, err) + require.NoError(t, rs.CreateRepository(ctx, id, vs, repo, "storage-1", nil, nil, false, false)) requireState(t, ctx, virtualStorageState{ "virtual-storage-1": { - "repository-1": {repositoryID: 1, replicaPath: "repository-1"}, + "repository-1": {repositoryID: id, replicaPath: "repository-1"}, }, }, storageState{ "virtual-storage-1": { "repository-1": { - "storage-1": {repositoryID: 1, generation: 0}, + "storage-1": {repositoryID: id, generation: 0}, }, }, }, @@ -511,14 +531,14 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) { requireState(t, ctx, virtualStorageState{ "virtual-storage-1": { - "repository-1": {repositoryID: 1, replicaPath: "repository-1"}, + "repository-1": {repositoryID: id, replicaPath: "repository-1"}, }, }, storageState{ "virtual-storage-1": { "repository-1": { - "storage-1": {repositoryID: 1, generation: 0}, - "storage-2": {repositoryID: 1, generation: 1}, + "storage-1": {repositoryID: id, generation: 0}, + "storage-2": {repositoryID: id, generation: 1}, }, }, }, @@ -529,13 +549,16 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) { t.Run("GetGeneration", func(t *testing.T) { rs, _ := newStore(t, nil) - generation, err := rs.GetGeneration(ctx, 1, stor) + id, err := rs.ReserveRepositoryID(ctx, vs, repo) + require.NoError(t, err) + + generation, err := rs.GetGeneration(ctx, id, stor) require.NoError(t, err) require.Equal(t, GenerationUnknown, generation) - require.NoError(t, rs.CreateRepository(ctx, 1, vs, repo, stor, nil, nil, false, false)) + require.NoError(t, rs.CreateRepository(ctx, id, vs, repo, stor, nil, nil, false, false)) - generation, err = rs.GetGeneration(ctx, 1, stor) + generation, err = rs.GetGeneration(ctx, id, stor) require.NoError(t, err) require.Equal(t, 0, generation) }) @@ -543,48 +566,54 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) { t.Run("GetReplicatedGeneration", func(t *testing.T) { t.Run("no previous record allowed", func(t *testing.T) { rs, _ := newStore(t, nil) + id, err := rs.ReserveRepositoryID(ctx, vs, repo) + require.NoError(t, err) - gen, err := rs.GetReplicatedGeneration(ctx, 1, "source", "target") + gen, err := rs.GetReplicatedGeneration(ctx, id, "source", "target") require.NoError(t, err) require.Equal(t, GenerationUnknown, gen) - require.NoError(t, rs.CreateRepository(ctx, 1, vs, repo, "source", nil, nil, false, false)) - gen, err = rs.GetReplicatedGeneration(ctx, 1, "source", "target") + require.NoError(t, rs.CreateRepository(ctx, id, vs, repo, "source", nil, nil, false, false)) + gen, err = rs.GetReplicatedGeneration(ctx, id, "source", "target") require.NoError(t, err) require.Equal(t, 0, gen) }) t.Run("upgrade allowed", func(t *testing.T) { rs, _ := newStore(t, nil) + id, err := rs.ReserveRepositoryID(ctx, vs, repo) + require.NoError(t, err) - require.NoError(t, rs.CreateRepository(ctx, 1, vs, repo, "source", nil, nil, false, false)) - require.NoError(t, rs.IncrementGeneration(ctx, 1, "source", nil)) + require.NoError(t, rs.CreateRepository(ctx, id, vs, repo, "source", nil, nil, false, false)) + require.NoError(t, rs.IncrementGeneration(ctx, id, "source", nil)) - gen, err := rs.GetReplicatedGeneration(ctx, 1, "source", "target") + gen, err := rs.GetReplicatedGeneration(ctx, id, "source", "target") require.NoError(t, err) require.Equal(t, 1, gen) - require.NoError(t, rs.SetGeneration(ctx, 1, "target", 0)) - gen, err = rs.GetReplicatedGeneration(ctx, 1, "source", "target") + require.NoError(t, rs.SetGeneration(ctx, id, "target", 0)) + gen, err = rs.GetReplicatedGeneration(ctx, id, "source", "target") require.NoError(t, err) require.Equal(t, 1, gen) }) t.Run("downgrade prevented", func(t *testing.T) { rs, _ := newStore(t, nil) + id, err := rs.ReserveRepositoryID(ctx, vs, repo) + require.NoError(t, err) - require.NoError(t, rs.CreateRepository(ctx, 1, vs, repo, "target", nil, nil, false, false)) - require.NoError(t, rs.IncrementGeneration(ctx, 1, "target", nil)) + require.NoError(t, rs.CreateRepository(ctx, id, vs, repo, "target", nil, nil, false, false)) + require.NoError(t, rs.IncrementGeneration(ctx, id, "target", nil)) - _, err := rs.GetReplicatedGeneration(ctx, 1, "source", "target") + _, err = rs.GetReplicatedGeneration(ctx, id, "source", "target") require.Equal(t, DowngradeAttemptedError{"target", 1, GenerationUnknown}, err) - require.NoError(t, rs.SetGeneration(ctx, 1, "source", 1)) - _, err = rs.GetReplicatedGeneration(ctx, 1, "source", "target") + require.NoError(t, rs.SetGeneration(ctx, id, "source", 1)) + _, err = rs.GetReplicatedGeneration(ctx, id, "source", "target") require.Equal(t, DowngradeAttemptedError{"target", 1, 1}, err) - require.NoError(t, rs.SetGeneration(ctx, 1, "source", 0)) - _, err = rs.GetReplicatedGeneration(ctx, 1, "source", "target") + require.NoError(t, rs.SetGeneration(ctx, id, "source", 0)) + _, err = rs.GetReplicatedGeneration(ctx, id, "source", "target") require.Equal(t, DowngradeAttemptedError{"target", 1, 0}, err) }) }) @@ -653,26 +682,28 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) { } { t.Run(tc.desc, func(t *testing.T) { rs, requireState := newStore(t, nil) + id, err := rs.ReserveRepositoryID(ctx, vs, repo) + require.NoError(t, err) - require.NoError(t, rs.CreateRepository(ctx, 1, vs, repo, "primary", tc.updatedSecondaries, tc.outdatedSecondaries, tc.storePrimary, tc.storeAssignments)) + require.NoError(t, rs.CreateRepository(ctx, id, vs, repo, "primary", tc.updatedSecondaries, tc.outdatedSecondaries, tc.storePrimary, tc.storeAssignments)) expectedStorageState := storageState{ vs: { repo: { - "primary": {repositoryID: 1, generation: 0}, + "primary": {repositoryID: id, generation: 0}, }, }, } for _, updatedSecondary := range tc.updatedSecondaries { - expectedStorageState[vs][repo][updatedSecondary] = replicaRecord{repositoryID: 1, generation: 0} + expectedStorageState[vs][repo][updatedSecondary] = replicaRecord{repositoryID: id, generation: 0} } requireState(t, ctx, virtualStorageState{ vs: { repo: { - repositoryID: 1, + repositoryID: id, replicaPath: repo, primary: tc.expectedPrimary, assignments: tc.expectedAssignments, @@ -687,21 +718,25 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) { t.Run("conflict due to virtual storage and relative path", func(t *testing.T) { rs, _ := newStore(t, nil) + id, err := rs.ReserveRepositoryID(ctx, vs, repo) + require.NoError(t, err) - require.NoError(t, rs.CreateRepository(ctx, 1, vs, repo, stor, nil, nil, false, false)) + require.NoError(t, rs.CreateRepository(ctx, id, vs, repo, stor, nil, nil, false, false)) require.Equal(t, RepositoryExistsError{vs, repo, stor}, - rs.CreateRepository(ctx, 2, vs, repo, stor, nil, nil, false, false), + rs.CreateRepository(ctx, -1, vs, repo, stor, nil, nil, false, false), ) }) t.Run("conflict due to repository id", func(t *testing.T) { rs, _ := newStore(t, nil) + id, err := rs.ReserveRepositoryID(ctx, vs, repo) + require.NoError(t, err) - require.NoError(t, rs.CreateRepository(ctx, 1, "virtual-storage-1", "relative-path-1", "storage-1", nil, nil, false, false)) + require.NoError(t, rs.CreateRepository(ctx, id, "virtual-storage-1", "relative-path-1", "storage-1", nil, nil, false, false)) require.Equal(t, - fmt.Errorf("repository id 1 already in use"), - rs.CreateRepository(ctx, 1, "virtual-storage-2", "relative-path-2", "storage-2", nil, nil, false, false), + fmt.Errorf("repository id %d already in use", id), + rs.CreateRepository(ctx, id, "virtual-storage-2", "relative-path-2", "storage-2", nil, nil, false, false), ) }) }) @@ -718,34 +753,40 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) { t.Run("delete existing", func(t *testing.T) { rs, requireState := newStore(t, nil) + id1, err := rs.ReserveRepositoryID(ctx, "virtual-storage-1", "repository-1") + require.NoError(t, err) + id2, err := rs.ReserveRepositoryID(ctx, "virtual-storage-2", "repository-2") + require.NoError(t, err) + id3, err := rs.ReserveRepositoryID(ctx, "virtual-storage-3", "repository-3") + require.NoError(t, err) - require.NoError(t, rs.CreateRepository(ctx, 1, "virtual-storage-1", "repository-1", "storage-1", nil, nil, false, false)) - require.NoError(t, rs.CreateRepository(ctx, 2, "virtual-storage-2", "repository-1", "storage-1", []string{"storage-2"}, nil, false, false)) - require.NoError(t, rs.CreateRepository(ctx, 3, "virtual-storage-2", "repository-2", "storage-1", nil, nil, false, false)) + require.NoError(t, rs.CreateRepository(ctx, id1, "virtual-storage-1", "repository-1", "storage-1", nil, nil, false, false)) + require.NoError(t, rs.CreateRepository(ctx, id2, "virtual-storage-2", "repository-1", "storage-1", []string{"storage-2"}, nil, false, false)) + require.NoError(t, rs.CreateRepository(ctx, id3, "virtual-storage-2", "repository-2", "storage-1", nil, nil, false, false)) requireState(t, ctx, virtualStorageState{ "virtual-storage-1": { - "repository-1": {repositoryID: 1, replicaPath: "repository-1"}, + "repository-1": {repositoryID: id1, replicaPath: "repository-1"}, }, "virtual-storage-2": { - "repository-1": {repositoryID: 2, replicaPath: "repository-1"}, - "repository-2": {repositoryID: 3, replicaPath: "repository-2"}, + "repository-1": {repositoryID: id2, replicaPath: "repository-1"}, + "repository-2": {repositoryID: id3, replicaPath: "repository-2"}, }, }, storageState{ "virtual-storage-1": { "repository-1": { - "storage-1": {repositoryID: 1}, + "storage-1": {repositoryID: id1}, }, }, "virtual-storage-2": { "repository-1": { - "storage-1": {repositoryID: 2}, - "storage-2": {repositoryID: 2}, + "storage-1": {repositoryID: id2}, + "storage-2": {repositoryID: id2}, }, "repository-2": { - "storage-1": {repositoryID: 3}, + "storage-1": {repositoryID: id3}, }, }, }, @@ -759,21 +800,21 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) { requireState(t, ctx, virtualStorageState{ "virtual-storage-1": { - "repository-1": {repositoryID: 1, replicaPath: "repository-1"}, + "repository-1": {repositoryID: id1, replicaPath: "repository-1"}, }, "virtual-storage-2": { - "repository-2": {repositoryID: 3, replicaPath: "repository-2"}, + "repository-2": {repositoryID: id3, replicaPath: "repository-2"}, }, }, storageState{ "virtual-storage-1": { "repository-1": { - "storage-1": {repositoryID: 1}, + "storage-1": {repositoryID: id1}, }, }, "virtual-storage-2": { "repository-2": { - "storage-1": {repositoryID: 3}, + "storage-1": {repositoryID: id3}, }, }, }, @@ -789,33 +830,39 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) { }) t.Run("delete existing", func(t *testing.T) { - require.NoError(t, rs.CreateRepository(ctx, 1, "virtual-storage-1", "relative-path-1", "storage-1", []string{"storage-2"}, nil, false, false)) - require.NoError(t, rs.CreateRepository(ctx, 2, "virtual-storage-1", "relative-path-2", "storage-1", nil, nil, false, false)) - require.NoError(t, rs.CreateRepository(ctx, 3, "virtual-storage-2", "relative-path-1", "storage-1", nil, nil, false, false)) + id1, err := rs.ReserveRepositoryID(ctx, "virtual-storage-1", "relative-path-1") + require.NoError(t, err) + id2, err := rs.ReserveRepositoryID(ctx, "virtual-storage-2", "relative-path-2") + require.NoError(t, err) + id3, err := rs.ReserveRepositoryID(ctx, "virtual-storage-3", "relative-path-3") + require.NoError(t, err) + require.NoError(t, rs.CreateRepository(ctx, id1, "virtual-storage-1", "relative-path-1", "storage-1", []string{"storage-2"}, nil, false, false)) + require.NoError(t, rs.CreateRepository(ctx, id2, "virtual-storage-1", "relative-path-2", "storage-1", nil, nil, false, false)) + require.NoError(t, rs.CreateRepository(ctx, id3, "virtual-storage-2", "relative-path-1", "storage-1", nil, nil, false, false)) requireState(t, ctx, virtualStorageState{ "virtual-storage-1": { - "relative-path-1": {repositoryID: 1, replicaPath: "relative-path-1"}, - "relative-path-2": {repositoryID: 2, replicaPath: "relative-path-2"}, + "relative-path-1": {repositoryID: id1, replicaPath: "relative-path-1"}, + "relative-path-2": {repositoryID: id2, replicaPath: "relative-path-2"}, }, "virtual-storage-2": { - "relative-path-1": {repositoryID: 3, replicaPath: "relative-path-1"}, + "relative-path-1": {repositoryID: id3, replicaPath: "relative-path-1"}, }, }, storageState{ "virtual-storage-1": { "relative-path-1": { - "storage-1": {repositoryID: 1, generation: 0}, - "storage-2": {repositoryID: 1, generation: 0}, + "storage-1": {repositoryID: id1, generation: 0}, + "storage-2": {repositoryID: id1, generation: 0}, }, "relative-path-2": { - "storage-1": {repositoryID: 2, generation: 0}, + "storage-1": {repositoryID: id2, generation: 0}, }, }, "virtual-storage-2": { "relative-path-1": { - "storage-1": {repositoryID: 3, generation: 0}, + "storage-1": {repositoryID: id3, generation: 0}, }, }, }, @@ -826,25 +873,25 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) { requireState(t, ctx, virtualStorageState{ "virtual-storage-1": { - "relative-path-1": {repositoryID: 1, replicaPath: "relative-path-1"}, - "relative-path-2": {repositoryID: 2, replicaPath: "relative-path-2"}, + "relative-path-1": {repositoryID: id1, replicaPath: "relative-path-1"}, + "relative-path-2": {repositoryID: id2, replicaPath: "relative-path-2"}, }, "virtual-storage-2": { - "relative-path-1": {repositoryID: 3, replicaPath: "relative-path-1"}, + "relative-path-1": {repositoryID: id3, replicaPath: "relative-path-1"}, }, }, storageState{ "virtual-storage-1": { "relative-path-1": { - "storage-2": {repositoryID: 1, generation: 0}, + "storage-2": {repositoryID: id1, generation: 0}, }, "relative-path-2": { - "storage-1": {repositoryID: 2, generation: 0}, + "storage-1": {repositoryID: id2, generation: 0}, }, }, "virtual-storage-2": { "relative-path-1": { - "storage-1": {repositoryID: 3, generation: 0}, + "storage-1": {repositoryID: id3, generation: 0}, }, }, }, @@ -864,25 +911,28 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) { t.Run("rename existing", func(t *testing.T) { rs, requireState := newStore(t, nil) - - require.NoError(t, rs.CreateRepository(ctx, 1, vs, "renamed-all", "storage-1", nil, nil, false, false)) - require.NoError(t, rs.CreateRepository(ctx, 2, vs, "renamed-some", "storage-1", []string{"storage-2"}, nil, false, false)) + id1, err := rs.ReserveRepositoryID(ctx, vs, "renamed-all") + require.NoError(t, err) + id2, err := rs.ReserveRepositoryID(ctx, vs, "renamed-some") + require.NoError(t, err) + require.NoError(t, rs.CreateRepository(ctx, id1, vs, "renamed-all", "storage-1", nil, nil, false, false)) + require.NoError(t, rs.CreateRepository(ctx, id2, vs, "renamed-some", "storage-1", []string{"storage-2"}, nil, false, false)) requireState(t, ctx, virtualStorageState{ "virtual-storage-1": { - "renamed-all": {repositoryID: 1, replicaPath: "renamed-all"}, - "renamed-some": {repositoryID: 2, replicaPath: "renamed-some"}, + "renamed-all": {repositoryID: id1, replicaPath: "renamed-all"}, + "renamed-some": {repositoryID: id2, replicaPath: "renamed-some"}, }, }, storageState{ "virtual-storage-1": { "renamed-all": { - "storage-1": {repositoryID: 1, generation: 0}, + "storage-1": {repositoryID: id1, generation: 0}, }, "renamed-some": { - "storage-1": {repositoryID: 2, generation: 0}, - "storage-2": {repositoryID: 2, generation: 0}, + "storage-1": {repositoryID: id2, generation: 0}, + "storage-2": {repositoryID: id2, generation: 0}, }, }, }, @@ -894,20 +944,20 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) { requireState(t, ctx, virtualStorageState{ "virtual-storage-1": { - "renamed-all-new": {repositoryID: 1, replicaPath: "renamed-all-new"}, - "renamed-some-new": {repositoryID: 2, replicaPath: "renamed-some-new"}, + "renamed-all-new": {repositoryID: id1, replicaPath: "renamed-all-new"}, + "renamed-some-new": {repositoryID: id2, replicaPath: "renamed-some-new"}, }, }, storageState{ "virtual-storage-1": { "renamed-all-new": { - "storage-1": {repositoryID: 1, generation: 0}, + "storage-1": {repositoryID: id1, generation: 0}, }, "renamed-some-new": { - "storage-1": {repositoryID: 2, generation: 0}, + "storage-1": {repositoryID: id2, generation: 0}, }, "renamed-some": { - "storage-2": {repositoryID: 2, generation: 0}, + "storage-2": {repositoryID: id2, generation: 0}, }, }, }, @@ -926,27 +976,28 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) { require.Empty(t, replicaPath) require.Empty(t, secondaries) - replicaPath, secondaries, err = rs.GetConsistentStoragesByRepositoryID(ctx, 1) + replicaPath, secondaries, err = rs.GetConsistentStoragesByRepositoryID(ctx, 1e10) require.Equal(t, commonerr.ErrRepositoryNotFound, err) require.Empty(t, replicaPath) require.Empty(t, secondaries) }) - - require.NoError(t, rs.CreateRepository(ctx, 1, vs, repo, "primary", []string{"consistent-secondary"}, nil, false, false)) - require.NoError(t, rs.IncrementGeneration(ctx, 1, "primary", []string{"consistent-secondary"})) - require.NoError(t, rs.SetGeneration(ctx, 1, "inconsistent-secondary", 0)) + id, err := rs.ReserveRepositoryID(ctx, vs, repo) + require.NoError(t, err) + require.NoError(t, rs.CreateRepository(ctx, id, vs, repo, "primary", []string{"consistent-secondary"}, nil, false, false)) + require.NoError(t, rs.IncrementGeneration(ctx, id, "primary", []string{"consistent-secondary"})) + require.NoError(t, rs.SetGeneration(ctx, id, "inconsistent-secondary", 0)) requireState(t, ctx, virtualStorageState{ "virtual-storage-1": { - "repository-1": {repositoryID: 1, replicaPath: "repository-1"}, + "repository-1": {repositoryID: id, replicaPath: "repository-1"}, }, }, storageState{ "virtual-storage-1": { "repository-1": { - "primary": {repositoryID: 1, generation: 1}, - "consistent-secondary": {repositoryID: 1, generation: 1}, - "inconsistent-secondary": {repositoryID: 1, generation: 0}, + "primary": {repositoryID: id, generation: 1}, + "consistent-secondary": {repositoryID: id, generation: 1}, + "inconsistent-secondary": {repositoryID: id, generation: 0}, }, }, }, @@ -958,13 +1009,13 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) { require.Equal(t, map[string]struct{}{"primary": {}, "consistent-secondary": {}}, secondaries) require.Equal(t, repo, replicaPath) - replicaPath, secondaries, err = rs.GetConsistentStoragesByRepositoryID(ctx, 1) + replicaPath, secondaries, err = rs.GetConsistentStoragesByRepositoryID(ctx, id) require.NoError(t, err) require.Equal(t, map[string]struct{}{"primary": {}, "consistent-secondary": {}}, secondaries) require.Equal(t, repo, replicaPath) }) - require.NoError(t, rs.SetGeneration(ctx, 1, "primary", 0)) + require.NoError(t, rs.SetGeneration(ctx, id, "primary", 0)) t.Run("outdated primary", func(t *testing.T) { replicaPath, secondaries, err := rs.GetConsistentStorages(ctx, vs, repo) @@ -972,28 +1023,28 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) { require.Equal(t, map[string]struct{}{"consistent-secondary": {}}, secondaries) require.Equal(t, repo, replicaPath) - replicaPath, secondaries, err = rs.GetConsistentStoragesByRepositoryID(ctx, 1) + replicaPath, secondaries, err = rs.GetConsistentStoragesByRepositoryID(ctx, id) require.NoError(t, err) require.Equal(t, map[string]struct{}{"consistent-secondary": {}}, secondaries) require.Equal(t, repo, replicaPath) }) t.Run("storage with highest generation is not configured", func(t *testing.T) { - require.NoError(t, rs.SetGeneration(ctx, 1, "unknown", 2)) - require.NoError(t, rs.SetGeneration(ctx, 1, "primary", 1)) + require.NoError(t, rs.SetGeneration(ctx, id, "unknown", 2)) + require.NoError(t, rs.SetGeneration(ctx, id, "primary", 1)) requireState(t, ctx, virtualStorageState{ "virtual-storage-1": { - "repository-1": {repositoryID: 1, replicaPath: "repository-1"}, + "repository-1": {repositoryID: id, replicaPath: "repository-1"}, }, }, storageState{ "virtual-storage-1": { "repository-1": { - "unknown": {repositoryID: 1, generation: 2}, - "primary": {repositoryID: 1, generation: 1}, - "consistent-secondary": {repositoryID: 1, generation: 1}, - "inconsistent-secondary": {repositoryID: 1, generation: 0}, + "unknown": {repositoryID: id, generation: 2}, + "primary": {repositoryID: id, generation: 1}, + "consistent-secondary": {repositoryID: id, generation: 1}, + "inconsistent-secondary": {repositoryID: id, generation: 0}, }, }, }, @@ -1004,7 +1055,7 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) { require.Equal(t, map[string]struct{}{"unknown": {}}, secondaries) require.Equal(t, repo, replicaPath) - replicaPath, secondaries, err = rs.GetConsistentStoragesByRepositoryID(ctx, 1) + replicaPath, secondaries, err = rs.GetConsistentStoragesByRepositoryID(ctx, id) require.NoError(t, err) require.Equal(t, map[string]struct{}{"unknown": {}}, secondaries) require.Equal(t, repo, replicaPath) @@ -1020,7 +1071,7 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) { require.Empty(t, secondaries) require.Empty(t, replicaPath) - replicaPath, secondaries, err = rs.GetConsistentStoragesByRepositoryID(ctx, 1) + replicaPath, secondaries, err = rs.GetConsistentStoragesByRepositoryID(ctx, id) require.Equal(t, commonerr.ErrRepositoryNotFound, err) require.Empty(t, secondaries) require.Empty(t, replicaPath) @@ -1030,25 +1081,29 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) { t.Run("DeleteInvalidRepository", func(t *testing.T) { t.Run("only replica", func(t *testing.T) { rs, requireState := newStore(t, nil) - require.NoError(t, rs.CreateRepository(ctx, 1, vs, repo, "invalid-storage", nil, nil, false, false)) - require.NoError(t, rs.DeleteInvalidRepository(ctx, 1, "invalid-storage")) + id, err := rs.ReserveRepositoryID(ctx, vs, repo) + require.NoError(t, err) + require.NoError(t, rs.CreateRepository(ctx, id, vs, repo, "invalid-storage", nil, nil, false, false)) + require.NoError(t, rs.DeleteInvalidRepository(ctx, id, "invalid-storage")) requireState(t, ctx, virtualStorageState{}, storageState{}) }) t.Run("another replica", func(t *testing.T) { rs, requireState := newStore(t, nil) - require.NoError(t, rs.CreateRepository(ctx, 1, vs, repo, "invalid-storage", []string{"other-storage"}, nil, false, false)) - require.NoError(t, rs.DeleteInvalidRepository(ctx, 1, "invalid-storage")) + id, err := rs.ReserveRepositoryID(ctx, vs, repo) + require.NoError(t, err) + require.NoError(t, rs.CreateRepository(ctx, id, vs, repo, "invalid-storage", []string{"other-storage"}, nil, false, false)) + require.NoError(t, rs.DeleteInvalidRepository(ctx, id, "invalid-storage")) requireState(t, ctx, virtualStorageState{ "virtual-storage-1": { - "repository-1": {repositoryID: 1, replicaPath: "repository-1"}, + "repository-1": {repositoryID: id, replicaPath: "repository-1"}, }, }, storageState{ "virtual-storage-1": { "repository-1": { - "other-storage": {repositoryID: 1, generation: 0}, + "other-storage": {repositoryID: id, generation: 0}, }, }, }, @@ -1063,7 +1118,9 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) { require.NoError(t, err) require.False(t, exists) - require.NoError(t, rs.CreateRepository(ctx, 1, vs, repo, stor, nil, nil, false, false)) + id, err := rs.ReserveRepositoryID(ctx, vs, repo) + require.NoError(t, err) + require.NoError(t, rs.CreateRepository(ctx, id, vs, repo, stor, nil, nil, false, false)) exists, err = rs.RepositoryExists(ctx, vs, repo) require.NoError(t, err) require.True(t, exists) @@ -1078,23 +1135,22 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) { t.Run("ReserveRepositoryID", func(t *testing.T) { rs, _ := newStore(t, nil) - id, err := rs.ReserveRepositoryID(ctx, vs, repo) + idInitial, err := rs.ReserveRepositoryID(ctx, vs, repo) require.NoError(t, err) - require.Equal(t, int64(1), id) - id, err = rs.ReserveRepositoryID(ctx, vs, repo) + idNext, err := rs.ReserveRepositoryID(ctx, vs, repo) require.NoError(t, err) - require.Equal(t, int64(2), id) + require.NotEqual(t, idNext, idInitial) - require.NoError(t, rs.CreateRepository(ctx, id, vs, repo, stor, nil, nil, false, false)) + require.NoError(t, rs.CreateRepository(ctx, idNext, vs, repo, stor, nil, nil, false, false)) - id, err = rs.ReserveRepositoryID(ctx, vs, repo) + id, err := rs.ReserveRepositoryID(ctx, vs, repo) require.Equal(t, commonerr.ErrRepositoryAlreadyExists, err) require.Equal(t, int64(0), id) id, err = rs.ReserveRepositoryID(ctx, vs, repo+"-2") require.NoError(t, err) - require.Equal(t, int64(3), id) + require.NotEqual(t, idNext, id) }) t.Run("GetRepositoryID", func(t *testing.T) { @@ -1104,11 +1160,13 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) { require.Equal(t, commonerr.NewRepositoryNotFoundError(vs, repo), err) require.Equal(t, int64(0), id) - require.NoError(t, rs.CreateRepository(ctx, 1, vs, repo, stor, nil, nil, false, false)) + id, err = rs.ReserveRepositoryID(ctx, vs, repo) + require.NoError(t, err) + require.NoError(t, rs.CreateRepository(ctx, id, vs, repo, stor, nil, nil, false, false)) - id, err = rs.GetRepositoryID(ctx, vs, repo) + idReturned, err := rs.GetRepositoryID(ctx, vs, repo) require.Nil(t, err) - require.Equal(t, int64(1), id) + require.Equal(t, id, idReturned) }) } diff --git a/internal/praefect/datastore/storage_cleanup.go b/internal/praefect/datastore/storage_cleanup.go index adc43401bdcdc90c2fda63c030db39ae1ee39ddd..01e7f58680db69d5ca831b73031b11f1d89d2949 100644 --- a/internal/praefect/datastore/storage_cleanup.go +++ b/internal/praefect/datastore/storage_cleanup.go @@ -9,6 +9,7 @@ import ( "github.com/lib/pq" "gitlab.com/gitlab-org/gitaly/v14/internal/helper" + "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql" ) // RepositoryClusterPath identifies location of the repository in the cluster. @@ -38,13 +39,13 @@ type ClusterPath struct { } // NewStorageCleanup initialises and returns a new instance of the StorageCleanup. -func NewStorageCleanup(db *sql.DB) *StorageCleanup { +func NewStorageCleanup(db glsql.Querier) *StorageCleanup { return &StorageCleanup{db: db} } // StorageCleanup provides methods on the database for the repository cleanup operation. type StorageCleanup struct { - db *sql.DB + db glsql.Querier } // Populate adds storage to the set, so it can be acquired afterwards. @@ -68,22 +69,23 @@ func (ss *StorageCleanup) Populate(ctx context.Context, virtualStorage, storage // acquired storage. It updates last_run column of the entry on execution. func (ss *StorageCleanup) AcquireNextStorage(ctx context.Context, inactive, updatePeriod time.Duration) (*ClusterPath, func() error, error) { var entry ClusterPath + now := time.Now().UTC() if err := ss.db.QueryRowContext( ctx, `UPDATE storage_cleanups - SET triggered_at = NOW() + SET triggered_at = $3 WHERE (virtual_storage, storage) IN ( SELECT virtual_storage, storage FROM storage_cleanups WHERE - COALESCE(last_run, TO_TIMESTAMP(0)) <= (NOW() - INTERVAL '1 MILLISECOND' * $1) - AND COALESCE(triggered_at, TO_TIMESTAMP(0)) <= (NOW() - INTERVAL '1 MILLISECOND' * $2) + COALESCE(last_run, TO_TIMESTAMP(0)) <= $1 + AND COALESCE(triggered_at, TO_TIMESTAMP(0)) <= $2 ORDER BY last_run NULLS FIRST, virtual_storage, storage LIMIT 1 FOR UPDATE SKIP LOCKED ) RETURNING virtual_storage, storage`, - inactive.Milliseconds(), updatePeriod.Milliseconds(), + now.Add(-inactive), now.Add(-updatePeriod), now, ).Scan(&entry.VirtualStorage, &entry.Storage); err != nil { if !errors.Is(err, sql.ErrNoRows) { return nil, nil, fmt.Errorf("scan: %w", err) @@ -111,9 +113,9 @@ func (ss *StorageCleanup) AcquireNextStorage(ctx context.Context, inactive, upda if _, err := ss.db.ExecContext( ctx, `UPDATE storage_cleanups - SET triggered_at = NOW() + SET triggered_at = $3 WHERE virtual_storage = $1 AND storage = $2`, - entry.VirtualStorage, entry.Storage, + entry.VirtualStorage, entry.Storage, time.Now().UTC(), ); err != nil { return } @@ -131,9 +133,9 @@ func (ss *StorageCleanup) AcquireNextStorage(ctx context.Context, inactive, upda if _, err := ss.db.ExecContext( ctx, `UPDATE storage_cleanups - SET last_run = NOW(), triggered_at = NULL + SET last_run = $3, triggered_at = NULL WHERE virtual_storage = $1 AND storage = $2`, - entry.VirtualStorage, entry.Storage, + entry.VirtualStorage, entry.Storage, time.Now().UTC(), ); err != nil { return fmt.Errorf("update storage_cleanups: %w", err) } diff --git a/internal/praefect/datastore/storage_cleanup_test.go b/internal/praefect/datastore/storage_cleanup_test.go index 0e15de6392293ebd8d3713fa956f9df95faed0c9..94f042ac475d8120886431c4fdb10793ccc49e3b 100644 --- a/internal/praefect/datastore/storage_cleanup_test.go +++ b/internal/praefect/datastore/storage_cleanup_test.go @@ -1,6 +1,7 @@ package datastore import ( + "context" "database/sql" "testing" "time" @@ -18,18 +19,18 @@ func TestStorageCleanup_Populate(t *testing.T) { storageCleanup := NewStorageCleanup(db.DB) require.NoError(t, storageCleanup.Populate(ctx, "praefect", "gitaly-1")) - actual := getAllStoragesCleanup(t, db) + actual := getAllStoragesCleanup(t, ctx, db) single := []storageCleanupRow{{ClusterPath: ClusterPath{VirtualStorage: "praefect", Storage: "gitaly-1"}}} require.Equal(t, single, actual) err := storageCleanup.Populate(ctx, "praefect", "gitaly-1") require.NoError(t, err, "population of the same data should not generate an error") - actual = getAllStoragesCleanup(t, db) + actual = getAllStoragesCleanup(t, ctx, db) require.Equal(t, single, actual, "same data should not create additional rows or change existing") require.NoError(t, storageCleanup.Populate(ctx, "default", "gitaly-2")) multiple := append(single, storageCleanupRow{ClusterPath: ClusterPath{VirtualStorage: "default", Storage: "gitaly-2"}}) - actual = getAllStoragesCleanup(t, db) + actual = getAllStoragesCleanup(t, ctx, db) require.ElementsMatch(t, multiple, actual, "new data should create additional row") } @@ -38,10 +39,12 @@ func TestStorageCleanup_AcquireNextStorage(t *testing.T) { ctx, cancel := testhelper.Context() defer cancel() db := glsql.NewDB(t) - storageCleanup := NewStorageCleanup(db.DB) t.Run("ok", func(t *testing.T) { - db.TruncateAll(t) + tx := db.Begin(t) + defer tx.Rollback(t) + storageCleanup := NewStorageCleanup(tx) + require.NoError(t, storageCleanup.Populate(ctx, "vs", "g1")) clusterPath, release, err := storageCleanup.AcquireNextStorage(ctx, 0, time.Second) @@ -51,7 +54,10 @@ func TestStorageCleanup_AcquireNextStorage(t *testing.T) { }) t.Run("last_run condition", func(t *testing.T) { - db.TruncateAll(t) + tx := db.Begin(t) + defer tx.Rollback(t) + storageCleanup := NewStorageCleanup(tx) + require.NoError(t, storageCleanup.Populate(ctx, "vs", "g1")) // Acquire it to initialize last_run column. _, release, err := storageCleanup.AcquireNextStorage(ctx, 0, time.Second) @@ -65,7 +71,10 @@ func TestStorageCleanup_AcquireNextStorage(t *testing.T) { }) t.Run("sorting based on storage name as no executions done yet", func(t *testing.T) { - db.TruncateAll(t) + tx := db.Begin(t) + defer tx.Rollback(t) + storageCleanup := NewStorageCleanup(tx) + require.NoError(t, storageCleanup.Populate(ctx, "vs", "g1")) require.NoError(t, storageCleanup.Populate(ctx, "vs", "g2")) require.NoError(t, storageCleanup.Populate(ctx, "vs", "g3")) @@ -77,7 +86,10 @@ func TestStorageCleanup_AcquireNextStorage(t *testing.T) { }) t.Run("sorting based on storage name and last_run", func(t *testing.T) { - db.TruncateAll(t) + tx := db.Begin(t) + defer tx.Rollback(t) + storageCleanup := NewStorageCleanup(tx) + require.NoError(t, storageCleanup.Populate(ctx, "vs", "g1")) _, release, err := storageCleanup.AcquireNextStorage(ctx, 0, time.Second) require.NoError(t, err) @@ -91,7 +103,10 @@ func TestStorageCleanup_AcquireNextStorage(t *testing.T) { }) t.Run("sorting based on last_run", func(t *testing.T) { - db.TruncateAll(t) + tx := db.Begin(t) + defer tx.Rollback(t) + storageCleanup := NewStorageCleanup(tx) + require.NoError(t, storageCleanup.Populate(ctx, "vs", "g1")) require.NoError(t, storageCleanup.Populate(ctx, "vs", "g2")) clusterPath, release, err := storageCleanup.AcquireNextStorage(ctx, 0, time.Second) @@ -110,7 +125,10 @@ func TestStorageCleanup_AcquireNextStorage(t *testing.T) { }) t.Run("already acquired won't be acquired until released", func(t *testing.T) { - db.TruncateAll(t) + tx := db.Begin(t) + defer tx.Rollback(t) + storageCleanup := NewStorageCleanup(tx) + require.NoError(t, storageCleanup.Populate(ctx, "vs", "g1")) _, release1, err := storageCleanup.AcquireNextStorage(ctx, 0, time.Second) require.NoError(t, err) @@ -128,7 +146,10 @@ func TestStorageCleanup_AcquireNextStorage(t *testing.T) { }) t.Run("already acquired won't be acquired until released", func(t *testing.T) { - db.TruncateAll(t) + tx := db.Begin(t) + defer tx.Rollback(t) + storageCleanup := NewStorageCleanup(tx) + require.NoError(t, storageCleanup.Populate(ctx, "vs", "g1")) _, release1, err := storageCleanup.AcquireNextStorage(ctx, 0, time.Second) require.NoError(t, err) @@ -146,30 +167,31 @@ func TestStorageCleanup_AcquireNextStorage(t *testing.T) { }) t.Run("acquired for long time triggers update loop", func(t *testing.T) { - db.TruncateAll(t) + tx := db.Begin(t) + defer tx.Rollback(t) + storageCleanup := NewStorageCleanup(tx) + require.NoError(t, storageCleanup.Populate(ctx, "vs", "g1")) - start := time.Now().UTC() _, release, err := storageCleanup.AcquireNextStorage(ctx, 0, 200*time.Millisecond) require.NoError(t, err) // Make sure the triggered_at column has a non NULL value after the record is acquired. - check1 := getAllStoragesCleanup(t, db) + check1 := getAllStoragesCleanup(t, ctx, tx) require.Len(t, check1, 1) require.True(t, check1[0].TriggeredAt.Valid) - require.True(t, check1[0].TriggeredAt.Time.After(start), check1[0].TriggeredAt.Time.String(), start.String()) // Check the goroutine running in the background updates triggered_at column periodically. time.Sleep(time.Second) - check2 := getAllStoragesCleanup(t, db) + check2 := getAllStoragesCleanup(t, ctx, tx) require.Len(t, check2, 1) require.True(t, check2[0].TriggeredAt.Valid) - require.True(t, check2[0].TriggeredAt.Time.After(check1[0].TriggeredAt.Time), check2[0].TriggeredAt.Time.String(), check1[0].TriggeredAt.Time.String()) + require.Truef(t, check2[0].TriggeredAt.Time.After(check1[0].TriggeredAt.Time), "%s is not after %s", check2[0].TriggeredAt, check1[0].TriggeredAt) require.NoError(t, release()) // Make sure the triggered_at column has a NULL value after the record is released. - check3 := getAllStoragesCleanup(t, db) + check3 := getAllStoragesCleanup(t, ctx, tx) require.Len(t, check3, 1) require.False(t, check3[0].TriggeredAt.Valid) }) @@ -183,8 +205,21 @@ func TestStorageCleanup_Exists(t *testing.T) { db := glsql.NewDB(t) repoStore := NewPostgresRepositoryStore(db.DB, nil) - require.NoError(t, repoStore.CreateRepository(ctx, 0, "vs", "p/1", "g1", []string{"g2", "g3"}, nil, false, false)) - require.NoError(t, repoStore.CreateRepository(ctx, 1, "vs", "p/2", "g1", []string{"g2", "g3"}, nil, false, false)) + const ( + virtualStorage = "vs" + relativePath1 = "p/1" + relativePath2 = "p/2" + storage1 = "g1" + storage2 = "g2" + storage3 = "g3" + ) + id1, err := repoStore.ReserveRepositoryID(ctx, virtualStorage, relativePath1) + require.NoError(t, err) + id2, err := repoStore.ReserveRepositoryID(ctx, virtualStorage, relativePath2) + require.NoError(t, err) + + require.NoError(t, repoStore.CreateRepository(ctx, id1, virtualStorage, relativePath1, storage1, []string{storage2, storage3}, nil, false, false)) + require.NoError(t, repoStore.CreateRepository(ctx, id2, virtualStorage, relativePath2, storage1, []string{storage2, storage3}, nil, false, false)) storageCleanup := NewStorageCleanup(db.DB) for _, tc := range []struct { @@ -196,57 +231,57 @@ func TestStorageCleanup_Exists(t *testing.T) { }{ { desc: "multiple doesn't exist", - virtualStorage: "vs", - storage: "g1", - relativeReplicaPaths: []string{"p/1", "p/2", "path/x", "path/y"}, + virtualStorage: virtualStorage, + storage: storage1, + relativeReplicaPaths: []string{relativePath1, relativePath2, "path/x", "path/y"}, out: []RepositoryClusterPath{ - {ClusterPath: ClusterPath{VirtualStorage: "vs", Storage: "g1"}, RelativeReplicaPath: "path/x"}, - {ClusterPath: ClusterPath{VirtualStorage: "vs", Storage: "g1"}, RelativeReplicaPath: "path/y"}, + {ClusterPath: ClusterPath{VirtualStorage: virtualStorage, Storage: storage1}, RelativeReplicaPath: "path/x"}, + {ClusterPath: ClusterPath{VirtualStorage: virtualStorage, Storage: storage1}, RelativeReplicaPath: "path/y"}, }, }, { desc: "duplicates", - virtualStorage: "vs", - storage: "g1", - relativeReplicaPaths: []string{"p/1", "path/x", "path/x"}, + virtualStorage: virtualStorage, + storage: storage1, + relativeReplicaPaths: []string{relativePath1, "path/x", "path/x"}, out: []RepositoryClusterPath{ - {ClusterPath: ClusterPath{VirtualStorage: "vs", Storage: "g1"}, RelativeReplicaPath: "path/x"}, + {ClusterPath: ClusterPath{VirtualStorage: virtualStorage, Storage: storage1}, RelativeReplicaPath: "path/x"}, }, }, { desc: "all exist", - virtualStorage: "vs", - storage: "g1", - relativeReplicaPaths: []string{"p/1", "p/2"}, + virtualStorage: virtualStorage, + storage: storage1, + relativeReplicaPaths: []string{relativePath1, relativePath2}, out: nil, }, { desc: "all doesn't exist", - virtualStorage: "vs", - storage: "g1", + virtualStorage: virtualStorage, + storage: storage1, relativeReplicaPaths: []string{"path/x", "path/y", "path/z"}, out: []RepositoryClusterPath{ - {ClusterPath: ClusterPath{VirtualStorage: "vs", Storage: "g1"}, RelativeReplicaPath: "path/x"}, - {ClusterPath: ClusterPath{VirtualStorage: "vs", Storage: "g1"}, RelativeReplicaPath: "path/y"}, - {ClusterPath: ClusterPath{VirtualStorage: "vs", Storage: "g1"}, RelativeReplicaPath: "path/z"}, + {ClusterPath: ClusterPath{VirtualStorage: virtualStorage, Storage: storage1}, RelativeReplicaPath: "path/x"}, + {ClusterPath: ClusterPath{VirtualStorage: virtualStorage, Storage: storage1}, RelativeReplicaPath: "path/y"}, + {ClusterPath: ClusterPath{VirtualStorage: virtualStorage, Storage: storage1}, RelativeReplicaPath: "path/z"}, }, }, { desc: "doesn't exist because of storage", - virtualStorage: "vs", + virtualStorage: virtualStorage, storage: "stub", relativeReplicaPaths: []string{"path/x"}, out: []RepositoryClusterPath{ - {ClusterPath: ClusterPath{VirtualStorage: "vs", Storage: "stub"}, RelativeReplicaPath: "path/x"}, + {ClusterPath: ClusterPath{VirtualStorage: virtualStorage, Storage: "stub"}, RelativeReplicaPath: "path/x"}, }, }, { desc: "doesn't exist because of virtual storage", virtualStorage: "stub", - storage: "g1", + storage: storage1, relativeReplicaPaths: []string{"path/x"}, out: []RepositoryClusterPath{ - {ClusterPath: ClusterPath{VirtualStorage: "stub", Storage: "g1"}, RelativeReplicaPath: "path/x"}, + {ClusterPath: ClusterPath{VirtualStorage: "stub", Storage: storage1}, RelativeReplicaPath: "path/x"}, }, }, } { @@ -264,8 +299,8 @@ type storageCleanupRow struct { TriggeredAt sql.NullTime } -func getAllStoragesCleanup(t testing.TB, db glsql.DB) []storageCleanupRow { - rows, err := db.Query(`SELECT * FROM storage_cleanups`) +func getAllStoragesCleanup(t testing.TB, ctx context.Context, db glsql.Querier) []storageCleanupRow { + rows, err := db.QueryContext(ctx, `SELECT * FROM storage_cleanups`) require.NoError(t, err) defer func() { require.NoError(t, rows.Close()) diff --git a/internal/praefect/datastore/storage_provider_test.go b/internal/praefect/datastore/storage_provider_test.go index fc214df1e9a8056dadb8828f91300ccf6c99cf17..a3054c9c1caccc994a3a3a8bafb7b4947e68f864 100644 --- a/internal/praefect/datastore/storage_provider_test.go +++ b/internal/praefect/datastore/storage_provider_test.go @@ -22,13 +22,20 @@ func TestCachingStorageProvider_GetSyncedNodes(t *testing.T) { t.Parallel() db := glsql.NewDB(t) - rs := NewPostgresRepositoryStore(db, nil) + newPostgresRepositoryStore := func(t *testing.T) *PostgresRepositoryStore { + tx := db.Begin(t) + t.Cleanup(func() { tx.Rollback(t) }) + return NewPostgresRepositoryStore(tx, nil) + } t.Run("unknown virtual storage", func(t *testing.T) { ctx, cancel := testhelper.Context() defer cancel() - require.NoError(t, rs.CreateRepository(ctx, 1, "unknown", "/repo/path", "g1", []string{"g2", "g3"}, nil, true, false)) + rs := newPostgresRepositoryStore(t) + repositoryID, err := rs.ReserveRepositoryID(ctx, "unknown", "/repo/path") + require.NoError(t, err) + require.NoError(t, rs.CreateRepository(ctx, repositoryID, "unknown", "/repo/path", "g1", []string{"g2", "g3"}, nil, true, false)) cache, err := NewCachingConsistentStoragesGetter(ctxlogrus.Extract(ctx), rs, []string{"vs"}) require.NoError(t, err) @@ -49,12 +56,13 @@ func TestCachingStorageProvider_GetSyncedNodes(t *testing.T) { }) t.Run("miss -> populate -> hit", func(t *testing.T) { - db.TruncateAll(t) - ctx, cancel := testhelper.Context() defer cancel() - require.NoError(t, rs.CreateRepository(ctx, 1, "vs", "/repo/path", "g1", []string{"g2", "g3"}, nil, true, false)) + rs := newPostgresRepositoryStore(t) + repositoryID, err := rs.ReserveRepositoryID(ctx, "vs", "/repo/path") + require.NoError(t, err) + require.NoError(t, rs.CreateRepository(ctx, repositoryID, "vs", "/repo/path", "g1", []string{"g2", "g3"}, nil, true, false)) cache, err := NewCachingConsistentStoragesGetter(ctxlogrus.Extract(ctx), rs, []string{"vs"}) require.NoError(t, err) @@ -91,11 +99,10 @@ func TestCachingStorageProvider_GetSyncedNodes(t *testing.T) { }) t.Run("repository store returns an error", func(t *testing.T) { - db.TruncateAll(t) - ctx, cancel := testhelper.Context(testhelper.ContextWithLogger(testhelper.DiscardTestEntry(t))) defer cancel() + rs := newPostgresRepositoryStore(t) cache, err := NewCachingConsistentStoragesGetter(ctxlogrus.Extract(ctx), rs, []string{"vs"}) require.NoError(t, err) cache.Connected() @@ -113,15 +120,16 @@ func TestCachingStorageProvider_GetSyncedNodes(t *testing.T) { }) t.Run("cache is disabled after handling invalid payload", func(t *testing.T) { - db.TruncateAll(t) - logger := testhelper.DiscardTestEntry(t) logHook := test.NewLocal(logger.Logger) ctx, cancel := testhelper.Context(testhelper.ContextWithLogger(logger)) defer cancel() - require.NoError(t, rs.CreateRepository(ctx, 1, "vs", "/repo/path/1", "g1", []string{"g2", "g3"}, nil, true, false)) + rs := newPostgresRepositoryStore(t) + repositoryID, err := rs.ReserveRepositoryID(ctx, "vs", "/repo/path/1") + require.NoError(t, err) + require.NoError(t, rs.CreateRepository(ctx, repositoryID, "vs", "/repo/path/1", "g1", []string{"g2", "g3"}, nil, true, false)) cache, err := NewCachingConsistentStoragesGetter(ctxlogrus.Extract(ctx), rs, []string{"vs"}) require.NoError(t, err) @@ -176,13 +184,16 @@ func TestCachingStorageProvider_GetSyncedNodes(t *testing.T) { }) t.Run("cache invalidation evicts cached entries", func(t *testing.T) { - db.TruncateAll(t) - ctx, cancel := testhelper.Context() defer cancel() - require.NoError(t, rs.CreateRepository(ctx, 1, "vs", "/repo/path/1", "g1", []string{"g2", "g3"}, nil, true, false)) - require.NoError(t, rs.CreateRepository(ctx, 2, "vs", "/repo/path/2", "g1", []string{"g2"}, nil, true, false)) + rs := newPostgresRepositoryStore(t) + repositoryID1, err := rs.ReserveRepositoryID(ctx, "vs", "/repo/path/1") + require.NoError(t, err) + repositoryID2, err := rs.ReserveRepositoryID(ctx, "vs", "/repo/path/2") + require.NoError(t, err) + require.NoError(t, rs.CreateRepository(ctx, repositoryID1, "vs", "/repo/path/1", "g1", []string{"g2", "g3"}, nil, true, false)) + require.NoError(t, rs.CreateRepository(ctx, repositoryID2, "vs", "/repo/path/2", "g1", []string{"g2"}, nil, true, false)) cache, err := NewCachingConsistentStoragesGetter(ctxlogrus.Extract(ctx), rs, []string{"vs"}) require.NoError(t, err) @@ -229,12 +240,13 @@ func TestCachingStorageProvider_GetSyncedNodes(t *testing.T) { }) t.Run("disconnect event disables cache", func(t *testing.T) { - db.TruncateAll(t) - ctx, cancel := testhelper.Context() defer cancel() - require.NoError(t, rs.CreateRepository(ctx, 1, "vs", "/repo/path", "g1", []string{"g2", "g3"}, nil, true, false)) + rs := newPostgresRepositoryStore(t) + repositoryID, err := rs.ReserveRepositoryID(ctx, "vs", "/repo/path") + require.NoError(t, err) + require.NoError(t, rs.CreateRepository(ctx, repositoryID, "vs", "/repo/path", "g1", []string{"g2", "g3"}, nil, true, false)) cache, err := NewCachingConsistentStoragesGetter(ctxlogrus.Extract(ctx), rs, []string{"vs"}) require.NoError(t, err) @@ -266,13 +278,17 @@ func TestCachingStorageProvider_GetSyncedNodes(t *testing.T) { }) t.Run("concurrent access", func(t *testing.T) { - db.TruncateAll(t) - ctx, cancel := testhelper.Context() defer cancel() - require.NoError(t, rs.CreateRepository(ctx, 1, "vs", "/repo/path/1", "g1", nil, nil, true, false)) - require.NoError(t, rs.CreateRepository(ctx, 2, "vs", "/repo/path/2", "g1", nil, nil, true, false)) + rs := NewPostgresRepositoryStore(glsql.NewDB(t), nil) + + repositoryID1, err := rs.ReserveRepositoryID(ctx, "vs", "/repo/path/1") + require.NoError(t, err) + repositoryID2, err := rs.ReserveRepositoryID(ctx, "vs", "/repo/path/2") + require.NoError(t, err) + require.NoError(t, rs.CreateRepository(ctx, repositoryID1, "vs", "/repo/path/1", "g1", nil, nil, true, false)) + require.NoError(t, rs.CreateRepository(ctx, repositoryID2, "vs", "/repo/path/2", "g1", nil, nil, true, false)) cache, err := NewCachingConsistentStoragesGetter(ctxlogrus.Extract(ctx), rs, []string{"vs"}) require.NoError(t, err) diff --git a/internal/praefect/info_service_test.go b/internal/praefect/info_service_test.go index a492841245b540ee41e679daeb8ac14e5f03b583..e45bfc73a7c4454bef09fca05d6023eed543dba9 100644 --- a/internal/praefect/info_service_test.go +++ b/internal/praefect/info_service_test.go @@ -78,8 +78,10 @@ func TestInfoService_RepositoryReplicas(t *testing.T) { elector := nodes.NewPerRepositoryElector(tx) conns := nodeSet.Connections() rs := datastore.NewPostgresRepositoryStore(tx, conf.StorageNames()) + id, err := rs.ReserveRepositoryID(ctx, virtualStorage, testRepo.GetRelativePath()) + require.NoError(t, err) require.NoError(t, - rs.CreateRepository(ctx, 1, virtualStorage, testRepo.GetRelativePath(), "g-1", []string{"g-2", "g-3"}, nil, true, false), + rs.CreateRepository(ctx, id, virtualStorage, testRepo.GetRelativePath(), "g-1", []string{"g-2", "g-3"}, nil, true, false), ) cc, _, cleanup := runPraefectServer(t, ctx, conf, buildOptions{ diff --git a/internal/praefect/nodes/health_manager.go b/internal/praefect/nodes/health_manager.go index 502f8ce61a8a92897208117293c9c7a28d74a1d8..d05854158e7e8baa73327bd1a3e26feae52fc4e5 100644 --- a/internal/praefect/nodes/health_manager.go +++ b/internal/praefect/nodes/health_manager.go @@ -140,9 +140,10 @@ func (hm *HealthManager) updateHealthChecks(ctx context.Context) error { hm.locallyHealthy.Store(locallyHealthy) + now := time.Now().UTC() if _, err := hm.db.ExecContext(ctx, ` INSERT INTO node_status (praefect_name, shard_name, node_name, last_contact_attempt_at, last_seen_active_at) -SELECT $1, shard_name, node_name, NOW(), CASE WHEN is_healthy THEN NOW() ELSE NULL END +SELECT $1, shard_name, node_name, $5::TIMESTAMP, CASE WHEN is_healthy THEN $5::TIMESTAMP ELSE NULL END FROM ( SELECT unnest($2::text[]) AS shard_name, unnest($3::text[]) AS node_name, @@ -150,13 +151,14 @@ FROM ( ) AS results ON CONFLICT (praefect_name, shard_name, node_name) DO UPDATE SET - last_contact_attempt_at = NOW(), + last_contact_attempt_at = $5, last_seen_active_at = COALESCE(EXCLUDED.last_seen_active_at, node_status.last_seen_active_at) `, hm.praefectName, pq.StringArray(virtualStorages), pq.StringArray(physicalStorages), pq.BoolArray(healthy), + now, ); err != nil { return fmt.Errorf("update checks: %w", err) } diff --git a/internal/praefect/nodes/health_manager_test.go b/internal/praefect/nodes/health_manager_test.go index 07b35909063d097cfb7001e8160b590520e350d9..a36bb67590ad9eccafb7937f02b20b180d9adc7c 100644 --- a/internal/praefect/nodes/health_manager_test.go +++ b/internal/praefect/nodes/health_manager_test.go @@ -142,7 +142,7 @@ func TestHealthManager(t *testing.T) { }, }, { - After: failoverTimeout, + After: failoverTimeout + time.Millisecond, PraefectName: "praefect-1", LocalStatus: LocalStatus{ "virtual-storage-1": { @@ -178,7 +178,7 @@ func TestHealthManager(t *testing.T) { HealthConsensus: map[string][]string{}, }, { - After: activePraefectTimeout, + After: activePraefectTimeout + time.Millisecond, PraefectName: "praefect-3", LocalStatus: LocalStatus{ "virtual-storage-1": { @@ -456,7 +456,7 @@ func TestHealthManager(t *testing.T) { }, }, { - After: failoverTimeout, + After: failoverTimeout + time.Millisecond, PraefectName: "praefect-1", LocalStatus: LocalStatus{ "virtual-storage-1": { @@ -473,7 +473,8 @@ func TestHealthManager(t *testing.T) { }, } { t.Run(tc.desc, func(t *testing.T) { - db.TruncateAll(t) + tx := db.Begin(t) + defer tx.Rollback(t) healthStatus := map[string]grpc_health_v1.HealthCheckResponse_ServingStatus{} // healthManagers are cached in order to keep the internal state intact between different @@ -497,7 +498,7 @@ func TestHealthManager(t *testing.T) { } } - hm = NewHealthManager(testhelper.DiscardTestLogger(t), db, hc.PraefectName, clients) + hm = NewHealthManager(testhelper.DiscardTestLogger(t), tx, hc.PraefectName, clients) hm.handleError = func(err error) error { return err } healthManagers[hc.PraefectName] = hm } @@ -517,7 +518,7 @@ func TestHealthManager(t *testing.T) { // predate earlier health checks to simulate this health check being run after a certain // time period if hc.After > 0 { - predateHealthChecks(t, db, hc.After) + predateHealthChecks(t, ctx, tx, hc.After) } expectedHealthyNodes := map[string][]string{} @@ -557,10 +558,10 @@ func TestHealthManager(t *testing.T) { } } -func predateHealthChecks(t testing.TB, db glsql.DB, amount time.Duration) { +func predateHealthChecks(t testing.TB, ctx context.Context, db glsql.Querier, amount time.Duration) { t.Helper() - _, err := db.Exec(` + _, err := db.ExecContext(ctx, ` UPDATE node_status SET last_contact_attempt_at = last_contact_attempt_at - INTERVAL '1 MICROSECOND' * $1, last_seen_active_at = last_seen_active_at - INTERVAL '1 MICROSECOND' * $1 diff --git a/internal/praefect/nodes/per_repository_test.go b/internal/praefect/nodes/per_repository_test.go index 27d69609b57af494ff5eb8412110bb6c6c1cbf7f..426cf384d8a0ee831ff5c43675438bee93d1de6c 100644 --- a/internal/praefect/nodes/per_repository_test.go +++ b/internal/praefect/nodes/per_repository_test.go @@ -485,27 +485,32 @@ func TestPerRepositoryElector(t *testing.T) { }, } { t.Run(tc.desc, func(t *testing.T) { - db.TruncateAll(t) - + var repositoryID int64 rs := datastore.NewPostgresRepositoryStore(db, nil) for virtualStorage, relativePaths := range tc.state { for relativePath, storages := range relativePaths { - repositoryID, err := rs.ReserveRepositoryID(ctx, virtualStorage, relativePath) + reservedRepositoryID, err := rs.ReserveRepositoryID(ctx, virtualStorage, relativePath) require.NoError(t, err) repoCreated := false for storage, record := range storages { if !repoCreated { repoCreated = true - require.NoError(t, rs.CreateRepository(ctx, repositoryID, virtualStorage, relativePath, storage, nil, nil, false, false)) + require.NoError(t, rs.CreateRepository(ctx, reservedRepositoryID, virtualStorage, relativePath, storage, nil, nil, false, false)) + defer func(virtualStorage, relativePath string) { + _, _, err = rs.DeleteRepository(ctx, virtualStorage, relativePath) + require.NoError(t, err) + }(virtualStorage, relativePath) + repositoryID = reservedRepositoryID } - require.NoError(t, rs.SetGeneration(ctx, repositoryID, storage, record.generation)) + require.NoError(t, rs.SetGeneration(ctx, reservedRepositoryID, storage, record.generation)) if record.assigned { _, err := db.ExecContext(ctx, ` - INSERT INTO repository_assignments VALUES ($1, $2, $3, $4) - `, virtualStorage, relativePath, storage, repositoryID) + INSERT INTO repository_assignments (virtual_storage, relative_path, storage, repository_id) + VALUES ($1, $2, $3, $4) + `, virtualStorage, relativePath, storage, reservedRepositoryID) require.NoError(t, err) } } @@ -520,15 +525,18 @@ func TestPerRepositoryElector(t *testing.T) { event.Job.RepositoryID = repositoryID - _, err = db.ExecContext(ctx, - "INSERT INTO replication_queue (state, job) VALUES ($1, $2)", + var id int64 + require.NoError(t, db.QueryRowContext(ctx, + "INSERT INTO replication_queue (state, job) VALUES ($1, $2) RETURNING id", event.State, event.Job, - ) - require.NoError(t, err) + ).Scan(&id)) + t.Cleanup(func() { + _, err = db.ExecContext(ctx, `DELETE FROM replication_queue WHERE id = $1`, id) + require.NoError(t, err) + }) } previousPrimary := "" - const repositoryID int64 = 1 for _, step := range tc.steps { runElection := func(tx *glsql.TxWrapper) (string, *logrus.Entry) { diff --git a/internal/praefect/nodes/sql_elector.go b/internal/praefect/nodes/sql_elector.go index a0007c85506c61e470c5c5910a1731ad5586fb72..75df348fdd51be847e8d074697af44d566ddad19 100644 --- a/internal/praefect/nodes/sql_elector.go +++ b/internal/praefect/nodes/sql_elector.go @@ -181,13 +181,14 @@ func (s *sqlElector) checkNodes(ctx context.Context) error { defer s.updateMetrics() + timestamp := time.Now().UTC() for _, n := range s.nodes { wg.Add(1) go func(n Node) { defer wg.Done() result, _ := n.CheckHealth(ctx) - if err := s.updateNode(ctx, tx, n, result); err != nil { + if err := s.updateNode(ctx, tx, n, result, timestamp); err != nil { s.log.WithError(err).WithFields(logrus.Fields{ "storage": n.GetStorage(), "address": n.GetAddress(), @@ -198,7 +199,7 @@ func (s *sqlElector) checkNodes(ctx context.Context) error { wg.Wait() - err = s.validateAndUpdatePrimary(ctx, tx) + err = s.validateAndUpdatePrimary(ctx, tx, timestamp) if err != nil { s.log.WithError(err).Error("unable to validate primary") @@ -243,26 +244,26 @@ func (s *sqlElector) setPrimary(candidate *sqlCandidate) { } } -func (s *sqlElector) updateNode(ctx context.Context, tx *sql.Tx, node Node, result bool) error { +func (s *sqlElector) updateNode(ctx context.Context, tx *sql.Tx, node Node, result bool, timestamp time.Time) error { var q string if result { q = `INSERT INTO node_status (praefect_name, shard_name, node_name, last_contact_attempt_at, last_seen_active_at) -VALUES ($1, $2, $3, NOW(), NOW()) +VALUES ($1, $2, $3, $4, $4) ON CONFLICT (praefect_name, shard_name, node_name) DO UPDATE SET -last_contact_attempt_at = NOW(), -last_seen_active_at = NOW()` +last_contact_attempt_at = $4, +last_seen_active_at = $4` } else { // Omit the last_seen_active_at since we weren't successful at contacting this node q = `INSERT INTO node_status (praefect_name, shard_name, node_name, last_contact_attempt_at) -VALUES ($1, $2, $3, NOW()) +VALUES ($1, $2, $3, $4) ON CONFLICT (praefect_name, shard_name, node_name) DO UPDATE SET -last_contact_attempt_at = NOW()` +last_contact_attempt_at = $4` } - _, err := tx.ExecContext(ctx, q, s.praefectName, s.shardName, node.GetStorage()) + _, err := tx.ExecContext(ctx, q, s.praefectName, s.shardName, node.GetStorage(), timestamp) if err != nil { s.log.Errorf("Error updating node: %s", err) } @@ -315,11 +316,11 @@ func (s *sqlElector) updateMetrics() { func (s *sqlElector) getQuorumCount(ctx context.Context, tx *sql.Tx) (int, error) { // This is crude form of service discovery. Find how many active // Praefect nodes based on whether they attempted to update entries. - q := `SELECT COUNT (DISTINCT praefect_name) FROM node_status WHERE shard_name = $1 AND last_contact_attempt_at >= NOW() - INTERVAL '1 MICROSECOND' * $2` + q := `SELECT COUNT (DISTINCT praefect_name) FROM node_status WHERE shard_name = $1 AND last_contact_attempt_at >= $2` var totalCount int - if err := tx.QueryRowContext(ctx, q, s.shardName, activePraefectTimeout.Microseconds()).Scan(&totalCount); err != nil { + if err := tx.QueryRowContext(ctx, q, s.shardName, time.Now().UTC().Add(-activePraefectTimeout)).Scan(&totalCount); err != nil { return 0, fmt.Errorf("error retrieving quorum count: %w", err) } @@ -427,16 +428,17 @@ func (s *sqlElector) electNewPrimary(ctx context.Context, tx *sql.Tx, candidates // N1 (RW) -> N2 (RO) -> N3 (RW): `previous_writable_primary` is N1 as N2 was not write-enabled before the second // failover and thus any data missing from N3 must be on N1. q = `INSERT INTO shard_primaries (elected_by_praefect, shard_name, node_name, elected_at) - SELECT $1::VARCHAR, $2::VARCHAR, $3::VARCHAR, NOW() + SELECT $1::VARCHAR, $2::VARCHAR, $3::VARCHAR, $5 WHERE $3 != COALESCE((SELECT node_name FROM shard_primaries WHERE shard_name = $2::VARCHAR AND demoted = false), '') ON CONFLICT (shard_name) DO UPDATE SET elected_by_praefect = EXCLUDED.elected_by_praefect , node_name = EXCLUDED.node_name , elected_at = EXCLUDED.elected_at , demoted = false - WHERE shard_primaries.elected_at < now() - INTERVAL '1 MICROSECOND' * $4 + WHERE shard_primaries.elected_at < $4 ` - _, err := tx.ExecContext(ctx, q, s.praefectName, s.shardName, newPrimaryStorage, s.failoverTimeout.Microseconds()) + now := time.Now().UTC() + _, err := tx.ExecContext(ctx, q, s.praefectName, s.shardName, newPrimaryStorage, now.Add(-s.failoverTimeout), now) if err != nil { s.log.Errorf("error updating new primary: %s", err) return err @@ -445,7 +447,7 @@ func (s *sqlElector) electNewPrimary(ctx context.Context, tx *sql.Tx, candidates return nil } -func (s *sqlElector) validateAndUpdatePrimary(ctx context.Context, tx *sql.Tx) error { +func (s *sqlElector) validateAndUpdatePrimary(ctx context.Context, tx *sql.Tx, timestamp time.Time) error { quorumCount, err := s.getQuorumCount(ctx, tx) if err != nil { return err @@ -453,12 +455,12 @@ func (s *sqlElector) validateAndUpdatePrimary(ctx context.Context, tx *sql.Tx) e // Retrieves candidates, ranked by the ones that are the most active q := `SELECT node_name FROM node_status - WHERE shard_name = $1 AND last_seen_active_at >= NOW() - INTERVAL '1 MICROSECOND' * $2 + WHERE shard_name = $1 AND last_seen_active_at >= $2 GROUP BY node_name HAVING COUNT(praefect_name) >= $3 ORDER BY COUNT(node_name) DESC, node_name ASC` - rows, err := tx.QueryContext(ctx, q, s.shardName, s.failoverTimeout.Microseconds(), quorumCount) + rows, err := tx.QueryContext(ctx, q, s.shardName, timestamp.Add(-s.failoverTimeout), quorumCount) if err != nil { return fmt.Errorf("error retrieving candidates: %w", err) } diff --git a/internal/praefect/reconciler/reconciler_test.go b/internal/praefect/reconciler/reconciler_test.go index 95871902aba2ce56e026322efe1d2c84086b1a17..3d54fb316d01ead1561c139de13c7231d37c12cb 100644 --- a/internal/praefect/reconciler/reconciler_test.go +++ b/internal/praefect/reconciler/reconciler_test.go @@ -4,7 +4,6 @@ import ( "context" "database/sql" "fmt" - "sort" "testing" "github.com/lib/pq" @@ -31,9 +30,11 @@ func TestReconciler(t *testing.T) { type repositories map[string]map[string]map[string]storageRecord type existingJobs []datastore.ReplicationEvent - // Jobs is a set of jobs that the reconciliation run might produce. Only one of the job - // sets is expected from a run at a time. - type jobs [][]datastore.ReplicationJob + type jobs []datastore.ReplicationJob + type jobsCondition struct { + oneOf bool + jobs jobs + } type storages map[string][]string configuredStorages := storages{ @@ -85,12 +86,12 @@ func TestReconciler(t *testing.T) { db := glsql.NewDB(t) for _, tc := range []struct { - desc string - healthyStorages storages - repositories repositories - existingJobs existingJobs - deletedRepositories map[string][]string - reconciliationJobs jobs + desc string + healthyStorages storages + repositories repositories + existingJobs existingJobs + deletedRepositories map[string][]string + reconciliationJobCondition jobsCondition }{ { desc: "no repositories", @@ -125,8 +126,8 @@ func TestReconciler(t *testing.T) { }, }, }, - reconciliationJobs: jobs{ - { + reconciliationJobCondition: jobsCondition{ + jobs: jobs{ { Change: datastore.UpdateRepo, VirtualStorage: "virtual-storage-1", @@ -159,20 +160,22 @@ func TestReconciler(t *testing.T) { return repos }(), - reconciliationJobs: func() jobs { - var generated []datastore.ReplicationJob - for i := 0; i < 2*logBatchSize+1; i++ { - generated = append(generated, datastore.ReplicationJob{ - Change: datastore.UpdateRepo, - VirtualStorage: "virtual-storage-1", - RelativePath: fmt.Sprintf("relative-path-%d", i), - SourceNodeStorage: "storage-1", - TargetNodeStorage: "storage-2", - }) - } + reconciliationJobCondition: jobsCondition{ + jobs: func() jobs { + var generated jobs + for i := 0; i < 2*logBatchSize+1; i++ { + generated = append(generated, datastore.ReplicationJob{ + Change: datastore.UpdateRepo, + VirtualStorage: "virtual-storage-1", + RelativePath: fmt.Sprintf("relative-path-%d", i), + SourceNodeStorage: "storage-1", + TargetNodeStorage: "storage-2", + }) + } - return jobs{generated} - }(), + return generated + }(), + }, }, { desc: "no healthy source to reconcile from", @@ -202,8 +205,8 @@ func TestReconciler(t *testing.T) { }, }, }, - reconciliationJobs: jobs{ - { + reconciliationJobCondition: jobsCondition{ + jobs: jobs{ { Change: datastore.UpdateRepo, VirtualStorage: "virtual-storage-1", @@ -225,8 +228,8 @@ func TestReconciler(t *testing.T) { }, }, }, - reconciliationJobs: jobs{ - { + reconciliationJobCondition: jobsCondition{ + jobs: jobs{ { Change: datastore.UpdateRepo, VirtualStorage: "virtual-storage-1", @@ -260,8 +263,8 @@ func TestReconciler(t *testing.T) { }, }, }, - reconciliationJobs: jobs{ - { + reconciliationJobCondition: jobsCondition{ + jobs: jobs{ { Change: datastore.UpdateRepo, VirtualStorage: "virtual-storage-1", @@ -319,8 +322,8 @@ func TestReconciler(t *testing.T) { TargetNodeStorage: "storage-1", }, ), - reconciliationJobs: jobs{ - { + reconciliationJobCondition: jobsCondition{ + jobs: jobs{ { Change: datastore.UpdateRepo, VirtualStorage: "virtual-storage-1", @@ -356,8 +359,8 @@ func TestReconciler(t *testing.T) { TargetNodeStorage: "storage-2", }, ), - reconciliationJobs: jobs{ - { + reconciliationJobCondition: jobsCondition{ + jobs: jobs{ { Change: datastore.UpdateRepo, VirtualStorage: "virtual-storage-1", @@ -403,8 +406,8 @@ func TestReconciler(t *testing.T) { TargetNodeStorage: "storage-3", }, ), - reconciliationJobs: jobs{ - { + reconciliationJobCondition: jobsCondition{ + jobs: jobs{ { Change: datastore.UpdateRepo, VirtualStorage: "virtual-storage-1", @@ -441,8 +444,8 @@ func TestReconciler(t *testing.T) { }, }, }, - reconciliationJobs: jobs{ - { + reconciliationJobCondition: jobsCondition{ + jobs: jobs{ { Change: datastore.UpdateRepo, VirtualStorage: "virtual-storage-1", @@ -472,8 +475,8 @@ func TestReconciler(t *testing.T) { }, }, }, - reconciliationJobs: jobs{ - { + reconciliationJobCondition: jobsCondition{ + jobs: jobs{ { Change: datastore.UpdateRepo, VirtualStorage: "virtual-storage-1", @@ -503,8 +506,8 @@ func TestReconciler(t *testing.T) { }, }, }, - reconciliationJobs: jobs{ - { + reconciliationJobCondition: jobsCondition{ + jobs: jobs{ { Change: datastore.DeleteReplica, VirtualStorage: "virtual-storage-1", @@ -526,17 +529,15 @@ func TestReconciler(t *testing.T) { }, }, }, - reconciliationJobs: jobs{ - { + reconciliationJobCondition: jobsCondition{ + oneOf: true, + jobs: jobs{ { Change: datastore.DeleteReplica, VirtualStorage: "virtual-storage-1", RelativePath: "relative-path-1", TargetNodeStorage: "storage-2", - }, - }, - { - { + }, { Change: datastore.DeleteReplica, VirtualStorage: "virtual-storage-1", RelativePath: "relative-path-1", @@ -568,8 +569,8 @@ func TestReconciler(t *testing.T) { }, }, }, - reconciliationJobs: jobs{ - { + reconciliationJobCondition: jobsCondition{ + jobs: jobs{ { Change: datastore.DeleteReplica, VirtualStorage: "virtual-storage-1", @@ -590,8 +591,8 @@ func TestReconciler(t *testing.T) { }, }, }, - reconciliationJobs: jobs{ - { + reconciliationJobCondition: jobsCondition{ + jobs: jobs{ { Change: datastore.UpdateRepo, VirtualStorage: "virtual-storage-1", @@ -625,8 +626,8 @@ func TestReconciler(t *testing.T) { }, }, }, - reconciliationJobs: jobs{ - { + reconciliationJobCondition: jobsCondition{ + jobs: jobs{ { Change: datastore.UpdateRepo, VirtualStorage: "virtual-storage-1", @@ -822,8 +823,8 @@ func TestReconciler(t *testing.T) { }, }, }, - reconciliationJobs: jobs{ - { + reconciliationJobCondition: jobsCondition{ + jobs: jobs{ { Change: datastore.DeleteReplica, VirtualStorage: "virtual-storage-1", @@ -1010,8 +1011,8 @@ func TestReconciler(t *testing.T) { }, }, }, - reconciliationJobs: jobs{ - { + reconciliationJobCondition: jobsCondition{ + jobs: jobs{ { Change: datastore.DeleteReplica, VirtualStorage: "virtual-storage-1", @@ -1026,24 +1027,26 @@ func TestReconciler(t *testing.T) { ctx, cancel := testhelper.Context() defer cancel() - db.TruncateAll(t) + db.Truncate(t, "replication_queue") // set up the repository generation records expected by the test case rs := datastore.NewPostgresRepositoryStore(db, configuredStorages) for virtualStorage, relativePaths := range tc.repositories { for relativePath, storages := range relativePaths { var repositoryID int64 - repoCreated := false for storage, repo := range storages { if repo.generation >= 0 { - if !repoCreated { - repoCreated = true - + if repositoryID == 0 { var err error repositoryID, err = rs.ReserveRepositoryID(ctx, virtualStorage, relativePath) require.NoError(t, err) require.NoError(t, rs.CreateRepository(ctx, repositoryID, virtualStorage, relativePath, storage, nil, nil, false, false)) + + defer func(virtualStorage, relativePath string) { + _, _, err := rs.DeleteRepository(ctx, virtualStorage, relativePath) + require.NoError(t, err) + }(virtualStorage, relativePath) } require.NoError(t, rs.SetGeneration(ctx, repositoryID, storage, repo.generation)) @@ -1052,10 +1055,12 @@ func TestReconciler(t *testing.T) { for storage, repo := range storages { if repo.assigned { - _, err := db.ExecContext(ctx, ` - INSERT INTO repository_assignments (repository_id, virtual_storage, relative_path, storage) - VALUES ($1, $2, $3, $4) - `, repositoryID, virtualStorage, relativePath, storage) + _, err := db.ExecContext( + ctx, + `INSERT INTO repository_assignments (repository_id, virtual_storage, relative_path, storage) + VALUES ($1, $2, $3, $4)`, + repositoryID, virtualStorage, relativePath, storage, + ) require.NoError(t, err) } } @@ -1122,15 +1127,13 @@ func TestReconciler(t *testing.T) { } // Fill the expected reconciliation jobs with generated repository IDs. - for _, jobs := range tc.reconciliationJobs { - for i, job := range jobs { - id, err := rs.GetRepositoryID(ctx, job.VirtualStorage, job.RelativePath) - if err != nil { - require.Equal(t, commonerr.NewRepositoryNotFoundError(job.VirtualStorage, job.RelativePath), err) - } - - jobs[i].RepositoryID = id + for i, job := range tc.reconciliationJobCondition.jobs { + id, err := rs.GetRepositoryID(ctx, job.VirtualStorage, job.RelativePath) + if err != nil { + require.Equal(t, commonerr.NewRepositoryNotFoundError(job.VirtualStorage, job.RelativePath), err) } + + tc.reconciliationJobCondition.jobs[i].RepositoryID = id } // run reconcile in two concurrent transactions to ensure everything works @@ -1162,7 +1165,7 @@ func TestReconciler(t *testing.T) { require.NoError(t, err) defer rows.Close() - actualJobs := make([]datastore.ReplicationJob, 0, len(tc.reconciliationJobs)) + actualJobs := make(jobs, 0, len(tc.reconciliationJobCondition.jobs)) for rows.Next() { var job datastore.ReplicationJob var meta datastore.Params @@ -1172,22 +1175,11 @@ func TestReconciler(t *testing.T) { } require.NoError(t, rows.Err()) - - expectedJobs := tc.reconciliationJobs - if expectedJobs == nil { - // If the test case defined there are no jobs to be produced, just set an empty slice so the - // require.Contains matches the empty set. - expectedJobs = jobs{{}} + if tc.reconciliationJobCondition.oneOf { + require.Subset(t, tc.reconciliationJobCondition.jobs, actualJobs) + } else { + require.ElementsMatch(t, tc.reconciliationJobCondition.jobs, actualJobs) } - - // Sort the jobs so the require.Contains works below, the order of the produced jobs is not important. - for _, jobs := range append(expectedJobs, actualJobs) { - sort.Slice(jobs, func(i, j int) bool { - return jobs[i].VirtualStorage+jobs[i].RelativePath < jobs[j].VirtualStorage+jobs[j].RelativePath - }) - } - - require.Contains(t, expectedJobs, actualJobs) }) } } diff --git a/internal/praefect/remove_repository_test.go b/internal/praefect/remove_repository_test.go index 7ca6abf00896e474a57d618a2c28c089e202d597..4fb1f49534890c138a698787f372832bdd727130 100644 --- a/internal/praefect/remove_repository_test.go +++ b/internal/praefect/remove_repository_test.go @@ -58,7 +58,8 @@ func TestRemoveRepositoryHandler(t *testing.T) { }, } { t.Run(tc.desc, func(t *testing.T) { - db.TruncateAll(t) + tx := db.Begin(t) + defer tx.Rollback(t) const gitaly1Storage = "gitaly-1" gitaly1Cfg := testcfg.Build(t, testcfg.WithStorages(gitaly1Storage)) @@ -84,7 +85,7 @@ func TestRemoveRepositoryHandler(t *testing.T) { gittest.Exec(t, gitaly1Cfg, "init", "--bare", repoPath) } - rs := datastore.NewPostgresRepositoryStore(db, cfg.StorageNames()) + rs := datastore.NewPostgresRepositoryStore(tx, cfg.StorageNames()) ctx, cancel := testhelper.Context() defer cancel() diff --git a/internal/praefect/replicator_pg_test.go b/internal/praefect/replicator_pg_test.go index c5d23154d6be291a3352d61f4f4b5aa6db6d4887..7e1d3bc1e42842ff188a37219683216a4cc9cccd 100644 --- a/internal/praefect/replicator_pg_test.go +++ b/internal/praefect/replicator_pg_test.go @@ -51,7 +51,9 @@ func TestReplicatorInvalidSourceRepository(t *testing.T) { rs := datastore.NewPostgresRepositoryStore(glsql.NewDB(t), nil) - require.NoError(t, rs.CreateRepository(ctx, 1, "virtual-storage-1", "relative-path-1", "gitaly-1", nil, nil, true, false)) + id, err := rs.ReserveRepositoryID(ctx, "virtual-storage-1", "relative-path-1") + require.NoError(t, err) + require.NoError(t, rs.CreateRepository(ctx, id, "virtual-storage-1", "relative-path-1", "gitaly-1", nil, nil, true, false)) exists, err := rs.RepositoryExists(ctx, "virtual-storage-1", "relative-path-1") require.NoError(t, err) @@ -60,7 +62,7 @@ func TestReplicatorInvalidSourceRepository(t *testing.T) { r := &defaultReplicator{rs: rs, log: testhelper.DiscardTestLogger(t)} require.NoError(t, r.Replicate(ctx, datastore.ReplicationEvent{ Job: datastore.ReplicationJob{ - RepositoryID: 1, + RepositoryID: id, VirtualStorage: "virtual-storage-1", RelativePath: "relative-path-1", SourceNodeStorage: "gitaly-1", @@ -84,14 +86,17 @@ func TestReplicatorDestroy(t *testing.T) { {change: datastore.DeleteRepo}, } { t.Run(string(tc.change), func(t *testing.T) { - db.TruncateAll(t) + tx := db.Begin(t) + defer tx.Rollback(t) - rs := datastore.NewPostgresRepositoryStore(db, nil) + rs := datastore.NewPostgresRepositoryStore(tx, nil) ctx, cancel := testhelper.Context() defer cancel() - require.NoError(t, rs.CreateRepository(ctx, 1, "virtual-storage-1", "relative-path-1", "storage-1", []string{"storage-2"}, nil, false, false)) + id, err := rs.ReserveRepositoryID(ctx, "virtual-storage-1", "relative-path-1") + require.NoError(t, err) + require.NoError(t, rs.CreateRepository(ctx, id, "virtual-storage-1", "relative-path-1", "storage-1", []string{"storage-2"}, nil, false, false)) ln, err := net.Listen("tcp", "localhost:0") require.NoError(t, err) diff --git a/internal/praefect/repocleaner/repository_test.go b/internal/praefect/repocleaner/repository_test.go index 4cbed8cae7830c7681cae0b0ac1830370fdc769d..a78803b3af331695bd64938e5a5a7ba51fe554f9 100644 --- a/internal/praefect/repocleaner/repository_test.go +++ b/internal/praefect/repocleaner/repository_test.go @@ -89,7 +89,7 @@ func TestRunner_Run(t *testing.T) { defer cancel() repoStore := datastore.NewPostgresRepositoryStore(db.DB, nil) - for i, set := range []struct { + for _, set := range []struct { relativePath string primary string secondaries []string @@ -110,7 +110,9 @@ func TestRunner_Run(t *testing.T) { secondaries: []string{storage2, storage3}, }, } { - require.NoError(t, repoStore.CreateRepository(ctx, int64(i), conf.VirtualStorages[0].Name, set.relativePath, set.primary, set.secondaries, nil, false, false)) + id, err := repoStore.ReserveRepositoryID(ctx, conf.VirtualStorages[0].Name, set.relativePath) + require.NoError(t, err) + require.NoError(t, repoStore.CreateRepository(ctx, id, conf.VirtualStorages[0].Name, set.relativePath, set.primary, set.secondaries, nil, false, false)) } logger, loggerHook := test.NewNullLogger() diff --git a/internal/praefect/repository_exists_test.go b/internal/praefect/repository_exists_test.go index 39a25a71d9d82c3f9b70727b9f7805f13d1d1aaf..584fe496798357ce91d7078300679a195f2d946c 100644 --- a/internal/praefect/repository_exists_test.go +++ b/internal/praefect/repository_exists_test.go @@ -68,8 +68,10 @@ func TestRepositoryExistsHandler(t *testing.T) { }, } { t.Run(tc.desc, func(t *testing.T) { - db.TruncateAll(t) - rs := datastore.NewPostgresRepositoryStore(db, map[string][]string{"virtual-storage": {"storage"}}) + tx := db.Begin(t) + defer tx.Rollback(t) + + rs := datastore.NewPostgresRepositoryStore(tx, map[string][]string{"virtual-storage": {"storage"}}) ctx, cancel := testhelper.Context() defer cancel() diff --git a/internal/praefect/router_per_repository_test.go b/internal/praefect/router_per_repository_test.go index 9eab31307441e5bf4a64fff8852c623b36f83b43..c23abea2c7e221400f8494f637d18aea1b370547 100644 --- a/internal/praefect/router_per_repository_test.go +++ b/internal/praefect/router_per_repository_test.go @@ -445,6 +445,8 @@ func TestPerRepositoryRouter_RouteRepositoryCreation(t *testing.T) { return actual.Secondaries[i].Storage < actual.Secondaries[j].Storage }) sort.Strings(actual.ReplicationTargets) + // Hard code it here as there is no way we can predict the value of the sequence + actual.RepositoryID = 1 require.Contains(t, expected, actual) } } @@ -601,12 +603,15 @@ func TestPerRepositoryRouter_RouteRepositoryCreation(t *testing.T) { ctx, cancel := testhelper.Context() defer cancel() - db.TruncateAll(t) + tx := db.Begin(t) + defer tx.Rollback(t) - rs := datastore.NewPostgresRepositoryStore(db, nil) + rs := datastore.NewPostgresRepositoryStore(tx, nil) if tc.repositoryExists { + id, err := rs.ReserveRepositoryID(ctx, "virtual-storage-1", "relative-path") + require.NoError(t, err) require.NoError(t, - rs.CreateRepository(ctx, 1, "virtual-storage-1", relativePath, "primary", nil, nil, true, true), + rs.CreateRepository(ctx, id, "virtual-storage-1", relativePath, "primary", nil, nil, true, true), ) } diff --git a/internal/praefect/server_test.go b/internal/praefect/server_test.go index c635b811fc5de29f9a720d3f29b70418d1f1d895..77f371a4d01f1928990d744c57606cc73024e416 100644 --- a/internal/praefect/server_test.go +++ b/internal/praefect/server_test.go @@ -639,7 +639,9 @@ func TestRenameRepository(t *testing.T) { defer tx.Rollback(t) rs := datastore.NewPostgresRepositoryStore(tx, nil) - require.NoError(t, rs.CreateRepository(ctx, 1, "praefect", repo.RelativePath, "gitaly-1", []string{"gitaly-2", "gitaly-3"}, nil, true, false)) + repositoryID, err := rs.ReserveRepositoryID(ctx, "praefect", repo.RelativePath) + require.NoError(t, err) + require.NoError(t, rs.CreateRepository(ctx, repositoryID, "praefect", repo.RelativePath, "gitaly-1", []string{"gitaly-2", "gitaly-3"}, nil, true, false)) nodeSet, err := DialNodes(ctx, praefectCfg.VirtualStorages, nil, nil, nil, nil) require.NoError(t, err) diff --git a/internal/testhelper/db.go b/internal/testhelper/db.go index 130e39950ff9d35ae0b611c9b6968b47a91541e5..a49cfd49e658a826fdad43bf3fa64888ba723741 100644 --- a/internal/testhelper/db.go +++ b/internal/testhelper/db.go @@ -3,6 +3,7 @@ package testhelper import ( "context" "testing" + "time" "github.com/lib/pq" "github.com/stretchr/testify/require" @@ -37,15 +38,16 @@ SELECT unnest($1::text[]) AS praefect_name, unnest($2::text[]) AS shard_name, unnest($3::text[]) AS node_name, - NOW() AS last_contact_attempt_at, - NOW() AS last_seen_active_at + $4 AS last_contact_attempt_at, + $4 AS last_seen_active_at ON CONFLICT (praefect_name, shard_name, node_name) DO UPDATE SET - last_contact_attempt_at = NOW(), - last_seen_active_at = NOW() + last_contact_attempt_at = $4, + last_seen_active_at = $4 `, pq.StringArray(praefects), pq.StringArray(virtualStorages), pq.StringArray(storages), + time.Now().UTC(), ) require.NoError(t, err) }