From 4dfeec9150239893f1de8a94d972af8624542e2c Mon Sep 17 00:00:00 2001
From: Pavlo Strokov
Date: Sun, 17 Oct 2021 19:56:03 +0300
Subject: [PATCH 01/10] sql: Provide timestamp values as argument to queries

We don't have a standard way of using timestamps in SQL statements.
We also use a time interval subtraction technique that is pretty
fragile. With this change we extract time manipulation from SQL
statements into Go code and pass already prepared timestamps as
parameters to the queries. This also solves the problem that it is
impossible to get different timestamps if SQL statements are executed
in a single transaction. This will be needed in the upcoming changes.
---
 ...211018081858_healthy_storages_view_time.go | 44 +++++++++++++++++++
 internal/praefect/datastore/queue.go          | 24 +++++-----
 .../praefect/datastore/storage_cleanup.go     | 17 +++----
 internal/praefect/nodes/health_manager.go     |  6 ++-
 internal/praefect/nodes/sql_elector.go        | 36 ++++++-------
 internal/testhelper/db.go                     | 10 +++--
 6 files changed, 94 insertions(+), 43 deletions(-)
 create mode 100644 internal/praefect/datastore/migrations/20211018081858_healthy_storages_view_time.go

diff --git a/internal/praefect/datastore/migrations/20211018081858_healthy_storages_view_time.go b/internal/praefect/datastore/migrations/20211018081858_healthy_storages_view_time.go
new file mode 100644
index 00000000000..fe9fd65b511
--- /dev/null
+++ b/internal/praefect/datastore/migrations/20211018081858_healthy_storages_view_time.go
@@ -0,0 +1,44 @@
+package migrations
+
+import migrate "github.com/rubenv/sql-migrate"
+
+func init() {
+	m := &migrate.Migration{
+		Id: "20211018081858_healthy_storages_view_time",
+		Up: []string{
+			// Re-create view to update usage of the timestamps.
+			// We should always rely on the UTC time zone to make timestamps consistent
+			// throughout the schema.
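+			// Note: unlike NOW(), which is pinned to the start of the current
+			// transaction, CLOCK_TIMESTAMP() advances while the transaction runs,
+			// and AT TIME ZONE 'UTC' makes the value independent of the session
+			// time zone.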
+ ` +CREATE OR REPLACE VIEW healthy_storages AS + SELECT shard_name AS virtual_storage, node_name AS storage + FROM node_status AS ns + WHERE last_seen_active_at >= CLOCK_TIMESTAMP() AT TIME ZONE 'UTC' - INTERVAL '10 SECOND' + GROUP BY shard_name, node_name + HAVING COUNT(praefect_name) >= ( + SELECT CEIL(COUNT(DISTINCT praefect_name) / 2.0) AS quorum_count + FROM node_status + WHERE shard_name = ns.shard_name + AND last_contact_attempt_at >= CLOCK_TIMESTAMP() AT TIME ZONE 'UTC' - INTERVAL '60 SECOND' + ) + ORDER BY shard_name, node_name`, + }, + Down: []string{ + ` +CREATE OR REPLACE VIEW healthy_storages AS + SELECT shard_name AS virtual_storage, node_name AS storage + FROM node_status AS ns + WHERE last_seen_active_at >= NOW() - INTERVAL '10 SECOND' + GROUP BY shard_name, node_name + HAVING COUNT(praefect_name) >= ( + SELECT CEIL(COUNT(DISTINCT praefect_name) / 2.0) AS quorum_count + FROM node_status + WHERE shard_name = ns.shard_name + AND last_contact_attempt_at >= NOW() - INTERVAL '60 SECOND' + ) + ORDER BY shard_name, node_name`, + }, + } + + allMigrations = append(allMigrations, m) +} diff --git a/internal/praefect/datastore/queue.go b/internal/praefect/datastore/queue.go index e00affd9dbb..601fcaa50a0 100644 --- a/internal/praefect/datastore/queue.go +++ b/internal/praefect/datastore/queue.go @@ -215,12 +215,12 @@ func (rq PostgresReplicationEventQueue) Enqueue(ctx context.Context, event Repli ON CONFLICT (id) DO UPDATE SET id = EXCLUDED.id RETURNING id ) - INSERT INTO replication_queue(lock_id, job, meta) - SELECT insert_lock.id, $4, $5 + INSERT INTO replication_queue(lock_id, job, meta, created_at) + SELECT insert_lock.id, $4, $5, $6 FROM insert_lock RETURNING id, state, created_at, updated_at, lock_id, attempt, job, meta` // this will always return a single row result (because of lock uniqueness) or an error - rows, err := rq.qc.QueryContext(ctx, query, event.Job.VirtualStorage, event.Job.TargetNodeStorage, event.Job.RelativePath, event.Job, event.Meta) + rows, err := rq.qc.QueryContext(ctx, query, event.Job.VirtualStorage, event.Job.TargetNodeStorage, event.Job.RelativePath, event.Job, event.Meta, time.Now().UTC()) if err != nil { return ReplicationEvent{}, fmt.Errorf("query: %w", err) } @@ -280,14 +280,14 @@ func (rq PostgresReplicationEventQueue) Dequeue(ctx context.Context, virtualStor UPDATE replication_queue AS queue SET attempt = CASE WHEN job->>'change' = 'delete_replica' THEN queue.attempt ELSE queue.attempt - 1 END , state = 'in_progress' - , updated_at = NOW() AT TIME ZONE 'UTC' + , updated_at = $4 FROM candidate WHERE queue.id = candidate.id RETURNING queue.id, queue.state, queue.created_at, queue.updated_at, queue.lock_id, queue.attempt, queue.job, queue.meta ) , track_job_lock AS ( INSERT INTO replication_queue_job_lock (job_id, lock_id, triggered_at) - SELECT job.id, job.lock_id, NOW() AT TIME ZONE 'UTC' + SELECT job.id, job.lock_id, $4 FROM job RETURNING lock_id ) @@ -300,7 +300,7 @@ func (rq PostgresReplicationEventQueue) Dequeue(ctx context.Context, virtualStor SELECT id, state, created_at, updated_at, lock_id, attempt, job, meta FROM job ORDER BY id` - rows, err := rq.qc.QueryContext(ctx, query, virtualStorage, nodeStorage, count) + rows, err := rq.qc.QueryContext(ctx, query, virtualStorage, nodeStorage, count, time.Now().UTC()) if err != nil { return nil, fmt.Errorf("query: %w", err) } @@ -379,7 +379,7 @@ func (rq PostgresReplicationEventQueue) Acknowledge(ctx context.Context, state J , updated AS ( UPDATE replication_queue AS queue SET state = 
$2::REPLICATION_JOB_STATE, - updated_at = NOW() AT TIME ZONE 'UTC' + updated_at = $3 FROM existing WHERE existing.id = queue.id RETURNING queue.id, queue.lock_id @@ -406,7 +406,7 @@ func (rq PostgresReplicationEventQueue) Acknowledge(ctx context.Context, state J ) SELECT id FROM existing` - rows, err := rq.qc.QueryContext(ctx, query, pqIDs, state) + rows, err := rq.qc.QueryContext(ctx, query, pqIDs, state, time.Now().UTC()) if err != nil { return nil, fmt.Errorf("query: %w", err) } @@ -438,7 +438,7 @@ func (rq PostgresReplicationEventQueue) StartHealthUpdate(ctx context.Context, t query := ` UPDATE replication_queue_job_lock - SET triggered_at = NOW() AT TIME ZONE 'UTC' + SET triggered_at = $3 WHERE (job_id, lock_id) IN (SELECT UNNEST($1::BIGINT[]), UNNEST($2::TEXT[]))` for { @@ -446,7 +446,7 @@ func (rq PostgresReplicationEventQueue) StartHealthUpdate(ctx context.Context, t case <-ctx.Done(): return nil case <-trigger: - res, err := rq.qc.ExecContext(ctx, query, jobIDs, lockIDs) + res, err := rq.qc.ExecContext(ctx, query, jobIDs, lockIDs, time.Now().UTC()) if err != nil { if pqError, ok := err.(*pq.Error); ok && pqError.Code.Name() == "query_canceled" { return nil @@ -479,7 +479,7 @@ func (rq PostgresReplicationEventQueue) StartHealthUpdate(ctx context.Context, t func (rq PostgresReplicationEventQueue) AcknowledgeStale(ctx context.Context, staleAfter time.Duration) error { query := ` WITH stale_job_lock AS ( - DELETE FROM replication_queue_job_lock WHERE triggered_at < NOW() AT TIME ZONE 'UTC' - INTERVAL '1 MILLISECOND' * $1 + DELETE FROM replication_queue_job_lock WHERE triggered_at < $1 RETURNING job_id, lock_id ) , update_job AS ( @@ -505,7 +505,7 @@ func (rq PostgresReplicationEventQueue) AcknowledgeStale(ctx context.Context, st GROUP BY lock_id ) AS existing ON removed.lock_id = existing.lock_id AND removed.amount = existing.amount )` - _, err := rq.qc.ExecContext(ctx, query, staleAfter.Milliseconds()) + _, err := rq.qc.ExecContext(ctx, query, time.Now().UTC().Add(-staleAfter)) if err != nil { return fmt.Errorf("exec acknowledge stale: %w", err) } diff --git a/internal/praefect/datastore/storage_cleanup.go b/internal/praefect/datastore/storage_cleanup.go index adc43401bdc..2041828b1fc 100644 --- a/internal/praefect/datastore/storage_cleanup.go +++ b/internal/praefect/datastore/storage_cleanup.go @@ -68,22 +68,23 @@ func (ss *StorageCleanup) Populate(ctx context.Context, virtualStorage, storage // acquired storage. It updates last_run column of the entry on execution. 
func (ss *StorageCleanup) AcquireNextStorage(ctx context.Context, inactive, updatePeriod time.Duration) (*ClusterPath, func() error, error) { var entry ClusterPath + now := time.Now().UTC() if err := ss.db.QueryRowContext( ctx, `UPDATE storage_cleanups - SET triggered_at = NOW() + SET triggered_at = $3 WHERE (virtual_storage, storage) IN ( SELECT virtual_storage, storage FROM storage_cleanups WHERE - COALESCE(last_run, TO_TIMESTAMP(0)) <= (NOW() - INTERVAL '1 MILLISECOND' * $1) - AND COALESCE(triggered_at, TO_TIMESTAMP(0)) <= (NOW() - INTERVAL '1 MILLISECOND' * $2) + COALESCE(last_run, TO_TIMESTAMP(0)) <= $1 + AND COALESCE(triggered_at, TO_TIMESTAMP(0)) <= $2 ORDER BY last_run NULLS FIRST, virtual_storage, storage LIMIT 1 FOR UPDATE SKIP LOCKED ) RETURNING virtual_storage, storage`, - inactive.Milliseconds(), updatePeriod.Milliseconds(), + now.Add(-inactive), now.Add(-updatePeriod), now, ).Scan(&entry.VirtualStorage, &entry.Storage); err != nil { if !errors.Is(err, sql.ErrNoRows) { return nil, nil, fmt.Errorf("scan: %w", err) @@ -111,9 +112,9 @@ func (ss *StorageCleanup) AcquireNextStorage(ctx context.Context, inactive, upda if _, err := ss.db.ExecContext( ctx, `UPDATE storage_cleanups - SET triggered_at = NOW() + SET triggered_at = $3 WHERE virtual_storage = $1 AND storage = $2`, - entry.VirtualStorage, entry.Storage, + entry.VirtualStorage, entry.Storage, time.Now().UTC(), ); err != nil { return } @@ -131,9 +132,9 @@ func (ss *StorageCleanup) AcquireNextStorage(ctx context.Context, inactive, upda if _, err := ss.db.ExecContext( ctx, `UPDATE storage_cleanups - SET last_run = NOW(), triggered_at = NULL + SET last_run = $3, triggered_at = NULL WHERE virtual_storage = $1 AND storage = $2`, - entry.VirtualStorage, entry.Storage, + entry.VirtualStorage, entry.Storage, time.Now().UTC(), ); err != nil { return fmt.Errorf("update storage_cleanups: %w", err) } diff --git a/internal/praefect/nodes/health_manager.go b/internal/praefect/nodes/health_manager.go index 502f8ce61a8..d05854158e7 100644 --- a/internal/praefect/nodes/health_manager.go +++ b/internal/praefect/nodes/health_manager.go @@ -140,9 +140,10 @@ func (hm *HealthManager) updateHealthChecks(ctx context.Context) error { hm.locallyHealthy.Store(locallyHealthy) + now := time.Now().UTC() if _, err := hm.db.ExecContext(ctx, ` INSERT INTO node_status (praefect_name, shard_name, node_name, last_contact_attempt_at, last_seen_active_at) -SELECT $1, shard_name, node_name, NOW(), CASE WHEN is_healthy THEN NOW() ELSE NULL END +SELECT $1, shard_name, node_name, $5::TIMESTAMP, CASE WHEN is_healthy THEN $5::TIMESTAMP ELSE NULL END FROM ( SELECT unnest($2::text[]) AS shard_name, unnest($3::text[]) AS node_name, @@ -150,13 +151,14 @@ FROM ( ) AS results ON CONFLICT (praefect_name, shard_name, node_name) DO UPDATE SET - last_contact_attempt_at = NOW(), + last_contact_attempt_at = $5, last_seen_active_at = COALESCE(EXCLUDED.last_seen_active_at, node_status.last_seen_active_at) `, hm.praefectName, pq.StringArray(virtualStorages), pq.StringArray(physicalStorages), pq.BoolArray(healthy), + now, ); err != nil { return fmt.Errorf("update checks: %w", err) } diff --git a/internal/praefect/nodes/sql_elector.go b/internal/praefect/nodes/sql_elector.go index a0007c85506..75df348fdd5 100644 --- a/internal/praefect/nodes/sql_elector.go +++ b/internal/praefect/nodes/sql_elector.go @@ -181,13 +181,14 @@ func (s *sqlElector) checkNodes(ctx context.Context) error { defer s.updateMetrics() + timestamp := time.Now().UTC() for _, n := range s.nodes { wg.Add(1) go func(n 
Node) { defer wg.Done() result, _ := n.CheckHealth(ctx) - if err := s.updateNode(ctx, tx, n, result); err != nil { + if err := s.updateNode(ctx, tx, n, result, timestamp); err != nil { s.log.WithError(err).WithFields(logrus.Fields{ "storage": n.GetStorage(), "address": n.GetAddress(), @@ -198,7 +199,7 @@ func (s *sqlElector) checkNodes(ctx context.Context) error { wg.Wait() - err = s.validateAndUpdatePrimary(ctx, tx) + err = s.validateAndUpdatePrimary(ctx, tx, timestamp) if err != nil { s.log.WithError(err).Error("unable to validate primary") @@ -243,26 +244,26 @@ func (s *sqlElector) setPrimary(candidate *sqlCandidate) { } } -func (s *sqlElector) updateNode(ctx context.Context, tx *sql.Tx, node Node, result bool) error { +func (s *sqlElector) updateNode(ctx context.Context, tx *sql.Tx, node Node, result bool, timestamp time.Time) error { var q string if result { q = `INSERT INTO node_status (praefect_name, shard_name, node_name, last_contact_attempt_at, last_seen_active_at) -VALUES ($1, $2, $3, NOW(), NOW()) +VALUES ($1, $2, $3, $4, $4) ON CONFLICT (praefect_name, shard_name, node_name) DO UPDATE SET -last_contact_attempt_at = NOW(), -last_seen_active_at = NOW()` +last_contact_attempt_at = $4, +last_seen_active_at = $4` } else { // Omit the last_seen_active_at since we weren't successful at contacting this node q = `INSERT INTO node_status (praefect_name, shard_name, node_name, last_contact_attempt_at) -VALUES ($1, $2, $3, NOW()) +VALUES ($1, $2, $3, $4) ON CONFLICT (praefect_name, shard_name, node_name) DO UPDATE SET -last_contact_attempt_at = NOW()` +last_contact_attempt_at = $4` } - _, err := tx.ExecContext(ctx, q, s.praefectName, s.shardName, node.GetStorage()) + _, err := tx.ExecContext(ctx, q, s.praefectName, s.shardName, node.GetStorage(), timestamp) if err != nil { s.log.Errorf("Error updating node: %s", err) } @@ -315,11 +316,11 @@ func (s *sqlElector) updateMetrics() { func (s *sqlElector) getQuorumCount(ctx context.Context, tx *sql.Tx) (int, error) { // This is crude form of service discovery. Find how many active // Praefect nodes based on whether they attempted to update entries. - q := `SELECT COUNT (DISTINCT praefect_name) FROM node_status WHERE shard_name = $1 AND last_contact_attempt_at >= NOW() - INTERVAL '1 MICROSECOND' * $2` + q := `SELECT COUNT (DISTINCT praefect_name) FROM node_status WHERE shard_name = $1 AND last_contact_attempt_at >= $2` var totalCount int - if err := tx.QueryRowContext(ctx, q, s.shardName, activePraefectTimeout.Microseconds()).Scan(&totalCount); err != nil { + if err := tx.QueryRowContext(ctx, q, s.shardName, time.Now().UTC().Add(-activePraefectTimeout)).Scan(&totalCount); err != nil { return 0, fmt.Errorf("error retrieving quorum count: %w", err) } @@ -427,16 +428,17 @@ func (s *sqlElector) electNewPrimary(ctx context.Context, tx *sql.Tx, candidates // N1 (RW) -> N2 (RO) -> N3 (RW): `previous_writable_primary` is N1 as N2 was not write-enabled before the second // failover and thus any data missing from N3 must be on N1. 
q = `INSERT INTO shard_primaries (elected_by_praefect, shard_name, node_name, elected_at) - SELECT $1::VARCHAR, $2::VARCHAR, $3::VARCHAR, NOW() + SELECT $1::VARCHAR, $2::VARCHAR, $3::VARCHAR, $5 WHERE $3 != COALESCE((SELECT node_name FROM shard_primaries WHERE shard_name = $2::VARCHAR AND demoted = false), '') ON CONFLICT (shard_name) DO UPDATE SET elected_by_praefect = EXCLUDED.elected_by_praefect , node_name = EXCLUDED.node_name , elected_at = EXCLUDED.elected_at , demoted = false - WHERE shard_primaries.elected_at < now() - INTERVAL '1 MICROSECOND' * $4 + WHERE shard_primaries.elected_at < $4 ` - _, err := tx.ExecContext(ctx, q, s.praefectName, s.shardName, newPrimaryStorage, s.failoverTimeout.Microseconds()) + now := time.Now().UTC() + _, err := tx.ExecContext(ctx, q, s.praefectName, s.shardName, newPrimaryStorage, now.Add(-s.failoverTimeout), now) if err != nil { s.log.Errorf("error updating new primary: %s", err) return err @@ -445,7 +447,7 @@ func (s *sqlElector) electNewPrimary(ctx context.Context, tx *sql.Tx, candidates return nil } -func (s *sqlElector) validateAndUpdatePrimary(ctx context.Context, tx *sql.Tx) error { +func (s *sqlElector) validateAndUpdatePrimary(ctx context.Context, tx *sql.Tx, timestamp time.Time) error { quorumCount, err := s.getQuorumCount(ctx, tx) if err != nil { return err @@ -453,12 +455,12 @@ func (s *sqlElector) validateAndUpdatePrimary(ctx context.Context, tx *sql.Tx) e // Retrieves candidates, ranked by the ones that are the most active q := `SELECT node_name FROM node_status - WHERE shard_name = $1 AND last_seen_active_at >= NOW() - INTERVAL '1 MICROSECOND' * $2 + WHERE shard_name = $1 AND last_seen_active_at >= $2 GROUP BY node_name HAVING COUNT(praefect_name) >= $3 ORDER BY COUNT(node_name) DESC, node_name ASC` - rows, err := tx.QueryContext(ctx, q, s.shardName, s.failoverTimeout.Microseconds(), quorumCount) + rows, err := tx.QueryContext(ctx, q, s.shardName, timestamp.Add(-s.failoverTimeout), quorumCount) if err != nil { return fmt.Errorf("error retrieving candidates: %w", err) } diff --git a/internal/testhelper/db.go b/internal/testhelper/db.go index 130e39950ff..a49cfd49e65 100644 --- a/internal/testhelper/db.go +++ b/internal/testhelper/db.go @@ -3,6 +3,7 @@ package testhelper import ( "context" "testing" + "time" "github.com/lib/pq" "github.com/stretchr/testify/require" @@ -37,15 +38,16 @@ SELECT unnest($1::text[]) AS praefect_name, unnest($2::text[]) AS shard_name, unnest($3::text[]) AS node_name, - NOW() AS last_contact_attempt_at, - NOW() AS last_seen_active_at + $4 AS last_contact_attempt_at, + $4 AS last_seen_active_at ON CONFLICT (praefect_name, shard_name, node_name) DO UPDATE SET - last_contact_attempt_at = NOW(), - last_seen_active_at = NOW() + last_contact_attempt_at = $4, + last_seen_active_at = $4 `, pq.StringArray(praefects), pq.StringArray(virtualStorages), pq.StringArray(storages), + time.Now().UTC(), ) require.NoError(t, err) } -- GitLab From 942c72a2db396711f6685c85109b4d560fcdae88 Mon Sep 17 00:00:00 2001 From: Pavlo Strokov Date: Mon, 18 Oct 2021 16:04:34 +0300 Subject: [PATCH 02/10] sql: NewStorageCleanup accepts Querier Change *sql.DB to glsql.Querier to get more flexibility in testing for the StorageCleanup. 
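Both *sql.DB and *sql.Tx satisfy glsql.Querier, which is what allows
tests to run StorageCleanup against a transaction that they roll back
afterwards. As a rough sketch (the authoritative definition lives in
the glsql package), the interface looks like:

    type Querier interface {
        QueryContext(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error)
        QueryRowContext(ctx context.Context, query string, args ...interface{}) *sql.Row
        ExecContext(ctx context.Context, query string, args ...interface{}) (sql.Result, error)
    }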
---
 internal/praefect/datastore/storage_cleanup.go | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/internal/praefect/datastore/storage_cleanup.go b/internal/praefect/datastore/storage_cleanup.go
index 2041828b1fc..01e7f58680d 100644
--- a/internal/praefect/datastore/storage_cleanup.go
+++ b/internal/praefect/datastore/storage_cleanup.go
@@ -9,6 +9,7 @@ import (
 	"github.com/lib/pq"
 
 	"gitlab.com/gitlab-org/gitaly/v14/internal/helper"
+	"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql"
 )
 
 // RepositoryClusterPath identifies location of the repository in the cluster.
@@ -38,13 +39,13 @@ type ClusterPath struct {
 }
 
 // NewStorageCleanup initialises and returns a new instance of the StorageCleanup.
-func NewStorageCleanup(db *sql.DB) *StorageCleanup {
+func NewStorageCleanup(db glsql.Querier) *StorageCleanup {
 	return &StorageCleanup{db: db}
 }
 
 // StorageCleanup provides methods on the database for the repository cleanup operation.
 type StorageCleanup struct {
-	db *sql.DB
+	db glsql.Querier
 }
 
 // Populate adds storage to the set, so it can be acquired afterwards.
-- 
GitLab


From 6a62fc291c4f469df852eaa3b23b40cf5093fb96 Mon Sep 17 00:00:00 2001
From: Pavlo Strokov
Date: Mon, 18 Oct 2021 17:22:18 +0300
Subject: [PATCH 03/10] sql: Reset sequences to initial value

We started to use sequence-generated values in expected results and
it is not always possible to predict the correct value for the
expected result. To get deterministic results we can restart the
sequence number generators to their initial values before each test
run. This change adds a SequenceReset method that does exactly that
for all user-defined sequences in the database. It will be used in
upcoming changes.
---
 internal/praefect/datastore/glsql/testing.go | 22 ++++++++++++++++++++
 1 file changed, 22 insertions(+)

diff --git a/internal/praefect/datastore/glsql/testing.go b/internal/praefect/datastore/glsql/testing.go
index 91557698565..616e1f967ca 100644
--- a/internal/praefect/datastore/glsql/testing.go
+++ b/internal/praefect/datastore/glsql/testing.go
@@ -117,6 +117,28 @@ func (db DB) Close() error {
 	return nil
 }
 
+// SequenceReset restarts all sequences in the database.
+func (db DB) SequenceReset(t *testing.T) {
+	t.Helper()
+	prov := &StringProvider{}
+	rows, err := db.DB.Query(
+		`SELECT S.relname
+		FROM pg_class AS S, pg_depend AS D, pg_class AS T, pg_attribute AS C
+		WHERE S.relkind = 'S'
+		AND S.oid = D.objid
+		AND D.refobjid = T.oid
+		AND D.refobjid = C.attrelid
+		AND D.refobjsubid = C.attnum`,
+	)
+	require.NoError(t, err)
+	require.NoError(t, ScanAll(rows, prov))
+
+	for _, seqName := range prov.Values() {
+		_, err := db.DB.Exec(`ALTER SEQUENCE ` + seqName + ` RESTART`)
+		require.NoError(t, err)
+	}
+}
+
 // NewDB returns a wrapper around the database connection pool.
 // Must be used only for testing.
 // The new database with empty relations will be created for each call of this function.
-- 
GitLab


From 1a2a24e98b6769834ee36eaacab60e1b3237a684 Mon Sep 17 00:00:00 2001
From: Pavlo Strokov
Date: Tue, 19 Oct 2021 10:51:54 +0300
Subject: [PATCH 04/10] sql: Do not truncate all tables

As we are going to pre-populate the database with some data and run
tests on top of it, we should not truncate the full database before
each test run. The 'TruncateAll' method calls were replaced with
transactions where possible, and with manual cleanup after test
completion where it is not possible to rely on transactions. The
test-side pattern is sketched below.
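As a rough illustration (hypothetical test name; the shape mirrors the
real changes in this commit), a converted test now looks like:

    func TestExample(t *testing.T) {
        db := glsql.NewDB(t)

        // Run the test inside a transaction and roll it back at the
        // end: the rollback discards every change, so no truncation
        // of the shared database is needed.
        tx := db.Begin(t)
        defer tx.Rollback(t)

        rs := datastore.NewPostgresRepositoryStore(tx, nil)
        // ... exercise rs and assert on the results ...
    }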
To support the transactional SQL type, some helper functions were
refactored to accept the 'Querier' interface instead of the 'DB'
type, and a 'Context' that is required by the interface.
In some cases the 'SequenceReset' method is called because we use
pre-defined values for the repository_id column and need to reset the
sequences, as they are not transactional.
---
 .../subcmd_set_replication_factor_test.go     |  7 ++-
 internal/praefect/coordinator_pg_test.go      | 15 ++++-
 internal/praefect/coordinator_test.go         | 12 ++--
 .../praefect/datastore/assignment_test.go     | 25 ++++----
 internal/praefect/datastore/queue_bm_test.go  | 16 ++---
 internal/praefect/datastore/queue_test.go     | 39 ++++++------
 .../datastore/repository_store_test.go        | 16 +++--
 .../datastore/storage_cleanup_test.go         | 62 +++++++++++++------
 .../datastore/storage_provider_test.go        | 58 ++++++++++-------
 .../praefect/nodes/health_manager_test.go     | 11 ++--
 .../praefect/nodes/per_repository_test.go     | 18 ++++--
 .../praefect/reconciler/reconciler_test.go    | 11 +++-
 internal/praefect/remove_repository_test.go   |  5 +-
 internal/praefect/replicator_pg_test.go       |  5 +-
 internal/praefect/repository_exists_test.go   |  6 +-
 .../praefect/router_per_repository_test.go    |  6 +-
 internal/praefect/server_test.go              |  4 +-
 17 files changed, 200 insertions(+), 116 deletions(-)

diff --git a/cmd/praefect/subcmd_set_replication_factor_test.go b/cmd/praefect/subcmd_set_replication_factor_test.go
index 197045a55bf..6e33847b4bd 100644
--- a/cmd/praefect/subcmd_set_replication_factor_test.go
+++ b/cmd/praefect/subcmd_set_replication_factor_test.go
@@ -83,16 +83,17 @@ func TestSetReplicationFactorSubcommand(t *testing.T) {
 			ctx, cancel := testhelper.Context()
 			defer cancel()
 
-			db.TruncateAll(t)
+			tx := db.Begin(t)
+			defer tx.Rollback(t)
 
 			store := tc.store
 			if tc.store == nil {
-				store = datastore.NewAssignmentStore(db, map[string][]string{"virtual-storage": {"primary", "secondary"}})
+				store = datastore.NewAssignmentStore(tx, map[string][]string{"virtual-storage": {"primary", "secondary"}})
 			}
 
 			// create a repository record
 			require.NoError(t,
-				datastore.NewPostgresRepositoryStore(db, nil).CreateRepository(ctx, 1, "virtual-storage", "relative-path", "primary", nil, nil, false, false),
+				datastore.NewPostgresRepositoryStore(tx, nil).CreateRepository(ctx, 1, "virtual-storage", "relative-path", "primary", nil, nil, false, false),
 			)
 
 			ln, clean := listenAndServe(t, []svcRegistrar{registerPraefectInfoServer(
diff --git a/internal/praefect/coordinator_pg_test.go b/internal/praefect/coordinator_pg_test.go
index 20c447ee74a..53b645666f9 100644
--- a/internal/praefect/coordinator_pg_test.go
+++ b/internal/praefect/coordinator_pg_test.go
@@ -10,6 +10,7 @@ import (
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 
+	"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/commonerr"
 	"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/config"
 	"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore"
 	"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql"
@@ -40,6 +41,8 @@ func TestStreamDirectorMutator_Transaction(t *testing.T) {
 		expectedGeneration int
 	}
 
+	db := glsql.NewDB(t)
+
 	testcases := []struct {
 		desc         string
 		primaryFails bool
@@ -153,11 +156,10 @@ func TestStreamDirectorMutator_Transaction(t *testing.T) {
 		},
 	}
 
-	db := glsql.NewDB(t)
-
 	for _, tc := range testcases {
 		t.Run(tc.desc, func(t *testing.T) {
-			db.TruncateAll(t)
+			db.Truncate(t, "replication_queue_job_lock", "replication_queue", "replication_queue_lock")
+			db.SequenceReset(t)
 
 			storageNodes := make([]*config.Node, 
0, len(tc.nodes)) for i := range tc.nodes { @@ -208,6 +210,13 @@ func TestStreamDirectorMutator_Transaction(t *testing.T) { if !repoCreated { repoCreated = true require.NoError(t, rs.CreateRepository(ctx, 1, repo.StorageName, repo.RelativePath, storageNodes[i].Storage, nil, nil, true, false)) + defer func() { + repositoryStore := datastore.NewPostgresRepositoryStore(db, nil) + _, _, err := repositoryStore.DeleteRepository(ctx, repo.StorageName, repo.RelativePath) + if !errors.As(err, &commonerr.RepositoryNotFoundError{}) { + require.NoError(t, err) + } + }() } require.NoError(t, rs.SetGeneration(ctx, 1, storageNodes[i].Storage, n.generation)) diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go index b9e4230d2b8..a7a7561cd32 100644 --- a/internal/praefect/coordinator_test.go +++ b/internal/praefect/coordinator_test.go @@ -74,7 +74,8 @@ func TestStreamDirectorReadOnlyEnforcement(t *testing.T) { {desc: "read-only", readOnly: true}, } { t.Run(tc.desc, func(t *testing.T) { - db.TruncateAll(t) + tx := db.Begin(t) + defer tx.Rollback(t) const ( virtualStorage = "test-virtual-storage" @@ -108,7 +109,7 @@ func TestStreamDirectorReadOnlyEnforcement(t *testing.T) { } coordinator := NewCoordinator( - datastore.NewPostgresReplicationEventQueue(db), + datastore.NewPostgresReplicationEventQueue(tx), rs, NewNodeManagerRouter(&nodes.MockManager{GetShardFunc: func(vs string) (nodes.Shard, error) { require.Equal(t, virtualStorage, vs) @@ -198,7 +199,7 @@ func TestStreamDirectorMutator(t *testing.T) { } testhelper.SetHealthyNodes(t, ctx, tx, map[string]map[string][]string{"praefect": conf.StorageNames()}) - queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(db)) + queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(tx)) queueInterceptor.OnEnqueue(func(ctx context.Context, event datastore.ReplicationEvent, queue datastore.ReplicationEventQueue) (datastore.ReplicationEvent, error) { assert.True(t, len(queueInterceptor.GetEnqueued()) < 2, "expected only one event to be created") return queue.Enqueue(ctx, event) @@ -821,7 +822,8 @@ func TestStreamDirector_repo_creation(t *testing.T) { }, } { t.Run(tc.desc, func(t *testing.T) { - db.TruncateAll(t) + tx := db.Begin(t) + defer tx.Rollback(t) primaryNode := &config.Node{Storage: "praefect-internal-1"} healthySecondaryNode := &config.Node{Storage: "praefect-internal-2"} unhealthySecondaryNode := &config.Node{Storage: "praefect-internal-3"} @@ -924,7 +926,7 @@ func TestStreamDirector_repo_creation(t *testing.T) { } txMgr := transactions.NewManager(conf) - queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(db)) + queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(tx)) coordinator := NewCoordinator( queueInterceptor, diff --git a/internal/praefect/datastore/assignment_test.go b/internal/praefect/datastore/assignment_test.go index 3062ab12541..12409a6e3d9 100644 --- a/internal/praefect/datastore/assignment_test.go +++ b/internal/praefect/datastore/assignment_test.go @@ -79,9 +79,10 @@ func TestAssignmentStore_GetHostAssignments(t *testing.T) { ctx, cancel := testhelper.Context() defer cancel() - db.TruncateAll(t) + tx := db.Begin(t) + defer tx.Rollback(t) - rs := NewPostgresRepositoryStore(db, nil) + rs := NewPostgresRepositoryStore(tx, nil) for _, assignment := range tc.existingAssignments { repositoryID, 
err := rs.GetRepositoryID(ctx, assignment.virtualStorage, assignment.relativePath) if errors.Is(err, commonerr.NewRepositoryNotFoundError(assignment.virtualStorage, assignment.relativePath)) { @@ -104,7 +105,7 @@ func TestAssignmentStore_GetHostAssignments(t *testing.T) { } actualAssignments, err := NewAssignmentStore( - db, + tx, map[string][]string{"virtual-storage": configuredStorages}, ).GetHostAssignments(ctx, tc.virtualStorage, repositoryID) require.Equal(t, tc.error, err) @@ -210,26 +211,28 @@ func TestAssignmentStore_SetReplicationFactor(t *testing.T) { ctx, cancel := testhelper.Context() defer cancel() - db.TruncateAll(t) + tx := db.Begin(t) + defer tx.Rollback(t) configuredStorages := map[string][]string{"virtual-storage": {"primary", "secondary-1", "secondary-2"}} if !tc.nonExistentRepository { - _, err := db.ExecContext(ctx, ` + _, err := tx.ExecContext(ctx, ` INSERT INTO repositories (virtual_storage, relative_path, "primary", repository_id) - VALUES ('virtual-storage', 'relative-path', 'primary', 1) + VALUES ('virtual-storage', 'relative-path', 'primary', 1000) `) require.NoError(t, err) } for _, storage := range tc.existingAssignments { - _, err := db.ExecContext(ctx, ` - INSERT INTO repository_assignments VALUES ('virtual-storage', 'relative-path', $1, 1) + _, err := tx.ExecContext(ctx, ` + INSERT INTO repository_assignments (virtual_storage, relative_path, storage, repository_id) + VALUES ('virtual-storage', 'relative-path', $1, 1000) `, storage) require.NoError(t, err) } - store := NewAssignmentStore(db, configuredStorages) + store := NewAssignmentStore(tx, configuredStorages) setStorages, err := store.SetReplicationFactor(ctx, "virtual-storage", "relative-path", tc.replicationFactor) require.Equal(t, tc.error, err) @@ -246,10 +249,10 @@ func TestAssignmentStore_SetReplicationFactor(t *testing.T) { tc.requireStorages(t, assignedStorages) var storagesWithIncorrectRepositoryID pq.StringArray - require.NoError(t, db.QueryRowContext(ctx, ` + require.NoError(t, tx.QueryRowContext(ctx, ` SELECT array_agg(storage) FROM repository_assignments - WHERE COALESCE(repository_id != 1, true) + WHERE COALESCE(repository_id != 1000, true) `).Scan(&storagesWithIncorrectRepositoryID)) require.Empty(t, storagesWithIncorrectRepositoryID) }) diff --git a/internal/praefect/datastore/queue_bm_test.go b/internal/praefect/datastore/queue_bm_test.go index 8fea252587c..1f7a20d38b5 100644 --- a/internal/praefect/datastore/queue_bm_test.go +++ b/internal/praefect/datastore/queue_bm_test.go @@ -9,22 +9,22 @@ import ( ) func BenchmarkPostgresReplicationEventQueue_Acknowledge(b *testing.B) { - // go test -tags=postgres -test.run=~ -test.bench=BenchmarkPostgresReplicationEventQueue_Acknowledge/small -benchtime=1000x gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore + // go test -test.run=~ -test.bench=BenchmarkPostgresReplicationEventQueue_Acknowledge/small -benchtime=1000x gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore b.Run("small", func(b *testing.B) { benchmarkPostgresReplicationEventQueueAcknowledge(b, map[JobState]int{JobStateReady: 10, JobStateInProgress: 10, JobStateFailed: 10}) }) - // go test -tags=postgres -test.run=~ -test.bench=BenchmarkPostgresReplicationEventQueue_Acknowledge/medium -benchtime=100x gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore + // go test -test.run=~ -test.bench=BenchmarkPostgresReplicationEventQueue_Acknowledge/medium -benchtime=100x gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore b.Run("medium", func(b *testing.B) { 
benchmarkPostgresReplicationEventQueueAcknowledge(b, map[JobState]int{JobStateReady: 1_000, JobStateInProgress: 100, JobStateFailed: 100}) }) - // go test -tags=postgres -test.run=~ -test.bench=BenchmarkPostgresReplicationEventQueue_Acknowledge/big -benchtime=10x gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore + // go test -test.run=~ -test.bench=BenchmarkPostgresReplicationEventQueue_Acknowledge/big -benchtime=10x gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore b.Run("big", func(b *testing.B) { benchmarkPostgresReplicationEventQueueAcknowledge(b, map[JobState]int{JobStateReady: 100_000, JobStateInProgress: 100, JobStateFailed: 100}) }) - // go test -tags=postgres -test.run=~ -test.bench=BenchmarkPostgresReplicationEventQueue_Acknowledge/huge -benchtime=1x gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore + // go test -test.run=~ -test.bench=BenchmarkPostgresReplicationEventQueue_Acknowledge/huge -benchtime=1x gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore b.Run("huge", func(b *testing.B) { benchmarkPostgresReplicationEventQueueAcknowledge(b, map[JobState]int{JobStateReady: 1_000_000, JobStateInProgress: 100, JobStateFailed: 100}) }) @@ -36,7 +36,7 @@ func benchmarkPostgresReplicationEventQueueAcknowledge(b *testing.B, setup map[J ctx, cancel := testhelper.Context() defer cancel() - queue := PostgresReplicationEventQueue{db.DB} + queue := PostgresReplicationEventQueue{qc: db.DB} eventTmpl := ReplicationEvent{ Job: ReplicationJob{ Change: UpdateRepo, @@ -59,7 +59,7 @@ func benchmarkPostgresReplicationEventQueueAcknowledge(b *testing.B, setup map[J b.StopTimer() b.ResetTimer() - db.TruncateAll(b) + db.Truncate(b, "replication_queue_job_lock", "replication_queue", "replication_queue_lock") total := 0 for _, count := range setup { @@ -67,7 +67,7 @@ func benchmarkPostgresReplicationEventQueueAcknowledge(b *testing.B, setup map[J total += count } - _, err := db.DB.ExecContext( + _, err := db.ExecContext( ctx, `INSERT INTO replication_queue (state, lock_id, job) SELECT 'ready', 'praefect|gitaly-1|/project/path-'|| T.I, ('{"change":"update","relative_path":"/project/path-'|| T.I || '","virtual_storage":"praefect","source_node_storage":"gitaly-0","target_node_storage":"gitaly-1"}')::JSONB @@ -76,7 +76,7 @@ func benchmarkPostgresReplicationEventQueueAcknowledge(b *testing.B, setup map[J ) require.NoError(b, err) - _, err = db.DB.ExecContext( + _, err = db.ExecContext( ctx, `INSERT INTO replication_queue_lock SELECT DISTINCT lock_id FROM replication_queue`, diff --git a/internal/praefect/datastore/queue_test.go b/internal/praefect/datastore/queue_test.go index 65b939b1f6d..dd8aa41fac6 100644 --- a/internal/praefect/datastore/queue_test.go +++ b/internal/praefect/datastore/queue_test.go @@ -119,20 +119,21 @@ func TestPostgresReplicationEventQueue_DeleteReplicaUniqueIndex(t *testing.T) { }, } { t.Run(tc.desc, func(t *testing.T) { - db.TruncateAll(t) + tx := db.Begin(t) + defer tx.Rollback(t) ctx, cancel := testhelper.Context() defer cancel() if tc.existingJob != nil { - _, err := db.ExecContext(ctx, ` + _, err := tx.ExecContext(ctx, ` INSERT INTO replication_queue (state, job) VALUES ($1, $2) `, tc.existingJob.State, tc.existingJob.Job) require.NoError(t, err) } - _, err := NewPostgresReplicationEventQueue(db).Enqueue(ctx, ReplicationEvent{ + _, err := NewPostgresReplicationEventQueue(tx).Enqueue(ctx, ReplicationEvent{ State: JobStateReady, Job: ReplicationJob{ Change: DeleteReplica, @@ -864,8 +865,6 @@ func 
TestPostgresReplicationEventQueue_StartHealthUpdate(t *testing.T) { }) t.Run("stops after first error", func(t *testing.T) { - db.TruncateAll(t) - ctx, cancel := testhelper.Context() defer cancel() @@ -883,8 +882,6 @@ func TestPostgresReplicationEventQueue_StartHealthUpdate(t *testing.T) { }) t.Run("stops if nothing to update (extended coverage)", func(t *testing.T) { - db.TruncateAll(t) - ctx, cancel := testhelper.Context() defer cancel() @@ -907,8 +904,6 @@ func TestPostgresReplicationEventQueue_StartHealthUpdate(t *testing.T) { }) t.Run("triggers all passed in events", func(t *testing.T) { - db.TruncateAll(t) - var wg sync.WaitGroup ctx, cancel := testhelper.Context() defer func() { @@ -991,8 +986,10 @@ func TestPostgresReplicationEventQueue_AcknowledgeStale(t *testing.T) { db := glsql.NewDB(t) t.Run("no stale jobs yet", func(t *testing.T) { - db.TruncateAll(t) - source := NewPostgresReplicationEventQueue(db) + tx := db.Begin(t) + defer tx.Rollback(t) + + source := NewPostgresReplicationEventQueue(tx) event, err := source.Enqueue(ctx, eventType1) require.NoError(t, err) @@ -1002,12 +999,14 @@ func TestPostgresReplicationEventQueue_AcknowledgeStale(t *testing.T) { // events triggered just now (< 1 sec ago), so nothing considered stale require.NoError(t, source.AcknowledgeStale(ctx, time.Second)) - requireEvents(t, ctx, db, devents) + requireEvents(t, ctx, tx, devents) }) t.Run("jobs considered stale only at 'in_progress' state", func(t *testing.T) { - db.TruncateAll(t) - source := NewPostgresReplicationEventQueue(db) + tx := db.Begin(t) + defer tx.Rollback(t) + + source := NewPostgresReplicationEventQueue(tx) // move event to 'ready' state event1, err := source.Enqueue(ctx, eventType1) @@ -1041,12 +1040,14 @@ func TestPostgresReplicationEventQueue_AcknowledgeStale(t *testing.T) { devents2[0].State = JobStateFailed devents4[0].Attempt = 2 devents4[0].State = JobStateFailed - requireEvents(t, ctx, db, []ReplicationEvent{event1, devents2[0], devents4[0]}) + requireEvents(t, ctx, tx, []ReplicationEvent{event1, devents2[0], devents4[0]}) }) t.Run("stale jobs updated for all virtual storages and storages at once", func(t *testing.T) { - db.TruncateAll(t) - source := NewPostgresReplicationEventQueue(db) + tx := db.Begin(t) + defer tx.Rollback(t) + + source := NewPostgresReplicationEventQueue(tx) var events []ReplicationEvent for _, eventType := range []ReplicationEvent{eventType1, eventType2, eventType3} { @@ -1073,11 +1074,11 @@ func TestPostgresReplicationEventQueue_AcknowledgeStale(t *testing.T) { exp = append(exp, e) } - requireEvents(t, ctx, db, exp) + requireEvents(t, ctx, tx, exp) }) } -func requireEvents(t *testing.T, ctx context.Context, db glsql.DB, expected []ReplicationEvent) { +func requireEvents(t *testing.T, ctx context.Context, db glsql.Querier, expected []ReplicationEvent) { t.Helper() // as it is not possible to expect exact time of entity creation/update we do not fetch it from database diff --git a/internal/praefect/datastore/repository_store_test.go b/internal/praefect/datastore/repository_store_test.go index a004ba88d2b..1fb0c4444dd 100644 --- a/internal/praefect/datastore/repository_store_test.go +++ b/internal/praefect/datastore/repository_store_test.go @@ -118,12 +118,13 @@ FROM storage_repositories func TestRepositoryStore_Postgres(t *testing.T) { db := glsql.NewDB(t) testRepositoryStore(t, func(t *testing.T, storages map[string][]string) (RepositoryStore, requireStateFunc) { - db.TruncateAll(t) - gs := NewPostgresRepositoryStore(db, storages) + tx := db.Begin(t) + 
t.Cleanup(func() { tx.Rollback(t) }) + gs := NewPostgresRepositoryStore(tx, storages) return gs, func(t *testing.T, ctx context.Context, vss virtualStorageState, ss storageState) { t.Helper() - requireState(t, ctx, db, vss, ss) + requireState(t, ctx, tx, vss, ss) } }) } @@ -203,9 +204,12 @@ func TestRepositoryStore_incrementGenerationConcurrently(t *testing.T) { ctx, cancel := testhelper.Context() defer cancel() - db.TruncateAll(t) - - require.NoError(t, NewPostgresRepositoryStore(db, nil).CreateRepository(ctx, 1, "virtual-storage", "relative-path", "primary", []string{"secondary"}, nil, false, false)) + globalRepositoryStore := NewPostgresRepositoryStore(db, nil) + require.NoError(t, globalRepositoryStore.CreateRepository(ctx, 1, "virtual-storage", "relative-path", "primary", []string{"secondary"}, nil, false, false)) + defer func() { + _, _, err := globalRepositoryStore.DeleteRepository(ctx, "virtual-storage", "relative-path") + require.NoError(t, err) + }() firstTx := db.Begin(t) secondTx := db.Begin(t) diff --git a/internal/praefect/datastore/storage_cleanup_test.go b/internal/praefect/datastore/storage_cleanup_test.go index 0e15de63922..1beb61af19a 100644 --- a/internal/praefect/datastore/storage_cleanup_test.go +++ b/internal/praefect/datastore/storage_cleanup_test.go @@ -1,6 +1,7 @@ package datastore import ( + "context" "database/sql" "testing" "time" @@ -18,18 +19,18 @@ func TestStorageCleanup_Populate(t *testing.T) { storageCleanup := NewStorageCleanup(db.DB) require.NoError(t, storageCleanup.Populate(ctx, "praefect", "gitaly-1")) - actual := getAllStoragesCleanup(t, db) + actual := getAllStoragesCleanup(t, ctx, db) single := []storageCleanupRow{{ClusterPath: ClusterPath{VirtualStorage: "praefect", Storage: "gitaly-1"}}} require.Equal(t, single, actual) err := storageCleanup.Populate(ctx, "praefect", "gitaly-1") require.NoError(t, err, "population of the same data should not generate an error") - actual = getAllStoragesCleanup(t, db) + actual = getAllStoragesCleanup(t, ctx, db) require.Equal(t, single, actual, "same data should not create additional rows or change existing") require.NoError(t, storageCleanup.Populate(ctx, "default", "gitaly-2")) multiple := append(single, storageCleanupRow{ClusterPath: ClusterPath{VirtualStorage: "default", Storage: "gitaly-2"}}) - actual = getAllStoragesCleanup(t, db) + actual = getAllStoragesCleanup(t, ctx, db) require.ElementsMatch(t, multiple, actual, "new data should create additional row") } @@ -38,10 +39,12 @@ func TestStorageCleanup_AcquireNextStorage(t *testing.T) { ctx, cancel := testhelper.Context() defer cancel() db := glsql.NewDB(t) - storageCleanup := NewStorageCleanup(db.DB) t.Run("ok", func(t *testing.T) { - db.TruncateAll(t) + tx := db.Begin(t) + defer tx.Rollback(t) + storageCleanup := NewStorageCleanup(tx) + require.NoError(t, storageCleanup.Populate(ctx, "vs", "g1")) clusterPath, release, err := storageCleanup.AcquireNextStorage(ctx, 0, time.Second) @@ -51,7 +54,10 @@ func TestStorageCleanup_AcquireNextStorage(t *testing.T) { }) t.Run("last_run condition", func(t *testing.T) { - db.TruncateAll(t) + tx := db.Begin(t) + defer tx.Rollback(t) + storageCleanup := NewStorageCleanup(tx) + require.NoError(t, storageCleanup.Populate(ctx, "vs", "g1")) // Acquire it to initialize last_run column. 
_, release, err := storageCleanup.AcquireNextStorage(ctx, 0, time.Second) @@ -65,7 +71,10 @@ func TestStorageCleanup_AcquireNextStorage(t *testing.T) { }) t.Run("sorting based on storage name as no executions done yet", func(t *testing.T) { - db.TruncateAll(t) + tx := db.Begin(t) + defer tx.Rollback(t) + storageCleanup := NewStorageCleanup(tx) + require.NoError(t, storageCleanup.Populate(ctx, "vs", "g1")) require.NoError(t, storageCleanup.Populate(ctx, "vs", "g2")) require.NoError(t, storageCleanup.Populate(ctx, "vs", "g3")) @@ -77,7 +86,10 @@ func TestStorageCleanup_AcquireNextStorage(t *testing.T) { }) t.Run("sorting based on storage name and last_run", func(t *testing.T) { - db.TruncateAll(t) + tx := db.Begin(t) + defer tx.Rollback(t) + storageCleanup := NewStorageCleanup(tx) + require.NoError(t, storageCleanup.Populate(ctx, "vs", "g1")) _, release, err := storageCleanup.AcquireNextStorage(ctx, 0, time.Second) require.NoError(t, err) @@ -91,7 +103,10 @@ func TestStorageCleanup_AcquireNextStorage(t *testing.T) { }) t.Run("sorting based on last_run", func(t *testing.T) { - db.TruncateAll(t) + tx := db.Begin(t) + defer tx.Rollback(t) + storageCleanup := NewStorageCleanup(tx) + require.NoError(t, storageCleanup.Populate(ctx, "vs", "g1")) require.NoError(t, storageCleanup.Populate(ctx, "vs", "g2")) clusterPath, release, err := storageCleanup.AcquireNextStorage(ctx, 0, time.Second) @@ -110,7 +125,10 @@ func TestStorageCleanup_AcquireNextStorage(t *testing.T) { }) t.Run("already acquired won't be acquired until released", func(t *testing.T) { - db.TruncateAll(t) + tx := db.Begin(t) + defer tx.Rollback(t) + storageCleanup := NewStorageCleanup(tx) + require.NoError(t, storageCleanup.Populate(ctx, "vs", "g1")) _, release1, err := storageCleanup.AcquireNextStorage(ctx, 0, time.Second) require.NoError(t, err) @@ -128,7 +146,10 @@ func TestStorageCleanup_AcquireNextStorage(t *testing.T) { }) t.Run("already acquired won't be acquired until released", func(t *testing.T) { - db.TruncateAll(t) + tx := db.Begin(t) + defer tx.Rollback(t) + storageCleanup := NewStorageCleanup(tx) + require.NoError(t, storageCleanup.Populate(ctx, "vs", "g1")) _, release1, err := storageCleanup.AcquireNextStorage(ctx, 0, time.Second) require.NoError(t, err) @@ -146,30 +167,33 @@ func TestStorageCleanup_AcquireNextStorage(t *testing.T) { }) t.Run("acquired for long time triggers update loop", func(t *testing.T) { - db.TruncateAll(t) + tx := db.Begin(t) + defer tx.Rollback(t) + storageCleanup := NewStorageCleanup(tx) + require.NoError(t, storageCleanup.Populate(ctx, "vs", "g1")) start := time.Now().UTC() _, release, err := storageCleanup.AcquireNextStorage(ctx, 0, 200*time.Millisecond) require.NoError(t, err) // Make sure the triggered_at column has a non NULL value after the record is acquired. - check1 := getAllStoragesCleanup(t, db) + check1 := getAllStoragesCleanup(t, ctx, tx) require.Len(t, check1, 1) require.True(t, check1[0].TriggeredAt.Valid) - require.True(t, check1[0].TriggeredAt.Time.After(start), check1[0].TriggeredAt.Time.String(), start.String()) + require.Truef(t, check1[0].TriggeredAt.Time.After(start), "%s is not after %s", check1[0].TriggeredAt, start) // Check the goroutine running in the background updates triggered_at column periodically. 
time.Sleep(time.Second) - check2 := getAllStoragesCleanup(t, db) + check2 := getAllStoragesCleanup(t, ctx, tx) require.Len(t, check2, 1) require.True(t, check2[0].TriggeredAt.Valid) - require.True(t, check2[0].TriggeredAt.Time.After(check1[0].TriggeredAt.Time), check2[0].TriggeredAt.Time.String(), check1[0].TriggeredAt.Time.String()) + require.Truef(t, check2[0].TriggeredAt.Time.After(check1[0].TriggeredAt.Time), "%s is not after %s", check2[0].TriggeredAt, check1[0].TriggeredAt) require.NoError(t, release()) // Make sure the triggered_at column has a NULL value after the record is released. - check3 := getAllStoragesCleanup(t, db) + check3 := getAllStoragesCleanup(t, ctx, tx) require.Len(t, check3, 1) require.False(t, check3[0].TriggeredAt.Valid) }) @@ -264,8 +288,8 @@ type storageCleanupRow struct { TriggeredAt sql.NullTime } -func getAllStoragesCleanup(t testing.TB, db glsql.DB) []storageCleanupRow { - rows, err := db.Query(`SELECT * FROM storage_cleanups`) +func getAllStoragesCleanup(t testing.TB, ctx context.Context, db glsql.Querier) []storageCleanupRow { + rows, err := db.QueryContext(ctx, `SELECT * FROM storage_cleanups`) require.NoError(t, err) defer func() { require.NoError(t, rows.Close()) diff --git a/internal/praefect/datastore/storage_provider_test.go b/internal/praefect/datastore/storage_provider_test.go index fc214df1e9a..a3054c9c1ca 100644 --- a/internal/praefect/datastore/storage_provider_test.go +++ b/internal/praefect/datastore/storage_provider_test.go @@ -22,13 +22,20 @@ func TestCachingStorageProvider_GetSyncedNodes(t *testing.T) { t.Parallel() db := glsql.NewDB(t) - rs := NewPostgresRepositoryStore(db, nil) + newPostgresRepositoryStore := func(t *testing.T) *PostgresRepositoryStore { + tx := db.Begin(t) + t.Cleanup(func() { tx.Rollback(t) }) + return NewPostgresRepositoryStore(tx, nil) + } t.Run("unknown virtual storage", func(t *testing.T) { ctx, cancel := testhelper.Context() defer cancel() - require.NoError(t, rs.CreateRepository(ctx, 1, "unknown", "/repo/path", "g1", []string{"g2", "g3"}, nil, true, false)) + rs := newPostgresRepositoryStore(t) + repositoryID, err := rs.ReserveRepositoryID(ctx, "unknown", "/repo/path") + require.NoError(t, err) + require.NoError(t, rs.CreateRepository(ctx, repositoryID, "unknown", "/repo/path", "g1", []string{"g2", "g3"}, nil, true, false)) cache, err := NewCachingConsistentStoragesGetter(ctxlogrus.Extract(ctx), rs, []string{"vs"}) require.NoError(t, err) @@ -49,12 +56,13 @@ func TestCachingStorageProvider_GetSyncedNodes(t *testing.T) { }) t.Run("miss -> populate -> hit", func(t *testing.T) { - db.TruncateAll(t) - ctx, cancel := testhelper.Context() defer cancel() - require.NoError(t, rs.CreateRepository(ctx, 1, "vs", "/repo/path", "g1", []string{"g2", "g3"}, nil, true, false)) + rs := newPostgresRepositoryStore(t) + repositoryID, err := rs.ReserveRepositoryID(ctx, "vs", "/repo/path") + require.NoError(t, err) + require.NoError(t, rs.CreateRepository(ctx, repositoryID, "vs", "/repo/path", "g1", []string{"g2", "g3"}, nil, true, false)) cache, err := NewCachingConsistentStoragesGetter(ctxlogrus.Extract(ctx), rs, []string{"vs"}) require.NoError(t, err) @@ -91,11 +99,10 @@ func TestCachingStorageProvider_GetSyncedNodes(t *testing.T) { }) t.Run("repository store returns an error", func(t *testing.T) { - db.TruncateAll(t) - ctx, cancel := testhelper.Context(testhelper.ContextWithLogger(testhelper.DiscardTestEntry(t))) defer cancel() + rs := newPostgresRepositoryStore(t) cache, err := 
NewCachingConsistentStoragesGetter(ctxlogrus.Extract(ctx), rs, []string{"vs"}) require.NoError(t, err) cache.Connected() @@ -113,15 +120,16 @@ func TestCachingStorageProvider_GetSyncedNodes(t *testing.T) { }) t.Run("cache is disabled after handling invalid payload", func(t *testing.T) { - db.TruncateAll(t) - logger := testhelper.DiscardTestEntry(t) logHook := test.NewLocal(logger.Logger) ctx, cancel := testhelper.Context(testhelper.ContextWithLogger(logger)) defer cancel() - require.NoError(t, rs.CreateRepository(ctx, 1, "vs", "/repo/path/1", "g1", []string{"g2", "g3"}, nil, true, false)) + rs := newPostgresRepositoryStore(t) + repositoryID, err := rs.ReserveRepositoryID(ctx, "vs", "/repo/path/1") + require.NoError(t, err) + require.NoError(t, rs.CreateRepository(ctx, repositoryID, "vs", "/repo/path/1", "g1", []string{"g2", "g3"}, nil, true, false)) cache, err := NewCachingConsistentStoragesGetter(ctxlogrus.Extract(ctx), rs, []string{"vs"}) require.NoError(t, err) @@ -176,13 +184,16 @@ func TestCachingStorageProvider_GetSyncedNodes(t *testing.T) { }) t.Run("cache invalidation evicts cached entries", func(t *testing.T) { - db.TruncateAll(t) - ctx, cancel := testhelper.Context() defer cancel() - require.NoError(t, rs.CreateRepository(ctx, 1, "vs", "/repo/path/1", "g1", []string{"g2", "g3"}, nil, true, false)) - require.NoError(t, rs.CreateRepository(ctx, 2, "vs", "/repo/path/2", "g1", []string{"g2"}, nil, true, false)) + rs := newPostgresRepositoryStore(t) + repositoryID1, err := rs.ReserveRepositoryID(ctx, "vs", "/repo/path/1") + require.NoError(t, err) + repositoryID2, err := rs.ReserveRepositoryID(ctx, "vs", "/repo/path/2") + require.NoError(t, err) + require.NoError(t, rs.CreateRepository(ctx, repositoryID1, "vs", "/repo/path/1", "g1", []string{"g2", "g3"}, nil, true, false)) + require.NoError(t, rs.CreateRepository(ctx, repositoryID2, "vs", "/repo/path/2", "g1", []string{"g2"}, nil, true, false)) cache, err := NewCachingConsistentStoragesGetter(ctxlogrus.Extract(ctx), rs, []string{"vs"}) require.NoError(t, err) @@ -229,12 +240,13 @@ func TestCachingStorageProvider_GetSyncedNodes(t *testing.T) { }) t.Run("disconnect event disables cache", func(t *testing.T) { - db.TruncateAll(t) - ctx, cancel := testhelper.Context() defer cancel() - require.NoError(t, rs.CreateRepository(ctx, 1, "vs", "/repo/path", "g1", []string{"g2", "g3"}, nil, true, false)) + rs := newPostgresRepositoryStore(t) + repositoryID, err := rs.ReserveRepositoryID(ctx, "vs", "/repo/path") + require.NoError(t, err) + require.NoError(t, rs.CreateRepository(ctx, repositoryID, "vs", "/repo/path", "g1", []string{"g2", "g3"}, nil, true, false)) cache, err := NewCachingConsistentStoragesGetter(ctxlogrus.Extract(ctx), rs, []string{"vs"}) require.NoError(t, err) @@ -266,13 +278,17 @@ func TestCachingStorageProvider_GetSyncedNodes(t *testing.T) { }) t.Run("concurrent access", func(t *testing.T) { - db.TruncateAll(t) - ctx, cancel := testhelper.Context() defer cancel() - require.NoError(t, rs.CreateRepository(ctx, 1, "vs", "/repo/path/1", "g1", nil, nil, true, false)) - require.NoError(t, rs.CreateRepository(ctx, 2, "vs", "/repo/path/2", "g1", nil, nil, true, false)) + rs := NewPostgresRepositoryStore(glsql.NewDB(t), nil) + + repositoryID1, err := rs.ReserveRepositoryID(ctx, "vs", "/repo/path/1") + require.NoError(t, err) + repositoryID2, err := rs.ReserveRepositoryID(ctx, "vs", "/repo/path/2") + require.NoError(t, err) + require.NoError(t, rs.CreateRepository(ctx, repositoryID1, "vs", "/repo/path/1", "g1", nil, nil, true, false)) + 
require.NoError(t, rs.CreateRepository(ctx, repositoryID2, "vs", "/repo/path/2", "g1", nil, nil, true, false)) cache, err := NewCachingConsistentStoragesGetter(ctxlogrus.Extract(ctx), rs, []string{"vs"}) require.NoError(t, err) diff --git a/internal/praefect/nodes/health_manager_test.go b/internal/praefect/nodes/health_manager_test.go index 07b35909063..b5c44aafaef 100644 --- a/internal/praefect/nodes/health_manager_test.go +++ b/internal/praefect/nodes/health_manager_test.go @@ -473,7 +473,8 @@ func TestHealthManager(t *testing.T) { }, } { t.Run(tc.desc, func(t *testing.T) { - db.TruncateAll(t) + tx := db.Begin(t) + defer tx.Rollback(t) healthStatus := map[string]grpc_health_v1.HealthCheckResponse_ServingStatus{} // healthManagers are cached in order to keep the internal state intact between different @@ -497,7 +498,7 @@ func TestHealthManager(t *testing.T) { } } - hm = NewHealthManager(testhelper.DiscardTestLogger(t), db, hc.PraefectName, clients) + hm = NewHealthManager(testhelper.DiscardTestLogger(t), tx, hc.PraefectName, clients) hm.handleError = func(err error) error { return err } healthManagers[hc.PraefectName] = hm } @@ -517,7 +518,7 @@ func TestHealthManager(t *testing.T) { // predate earlier health checks to simulate this health check being run after a certain // time period if hc.After > 0 { - predateHealthChecks(t, db, hc.After) + predateHealthChecks(t, ctx, tx, hc.After) } expectedHealthyNodes := map[string][]string{} @@ -557,10 +558,10 @@ func TestHealthManager(t *testing.T) { } } -func predateHealthChecks(t testing.TB, db glsql.DB, amount time.Duration) { +func predateHealthChecks(t testing.TB, ctx context.Context, db glsql.Querier, amount time.Duration) { t.Helper() - _, err := db.Exec(` + _, err := db.ExecContext(ctx, ` UPDATE node_status SET last_contact_attempt_at = last_contact_attempt_at - INTERVAL '1 MICROSECOND' * $1, last_seen_active_at = last_seen_active_at - INTERVAL '1 MICROSECOND' * $1 diff --git a/internal/praefect/nodes/per_repository_test.go b/internal/praefect/nodes/per_repository_test.go index 27d69609b57..184d31614e5 100644 --- a/internal/praefect/nodes/per_repository_test.go +++ b/internal/praefect/nodes/per_repository_test.go @@ -485,8 +485,6 @@ func TestPerRepositoryElector(t *testing.T) { }, } { t.Run(tc.desc, func(t *testing.T) { - db.TruncateAll(t) - rs := datastore.NewPostgresRepositoryStore(db, nil) for virtualStorage, relativePaths := range tc.state { for relativePath, storages := range relativePaths { @@ -498,6 +496,10 @@ func TestPerRepositoryElector(t *testing.T) { if !repoCreated { repoCreated = true require.NoError(t, rs.CreateRepository(ctx, repositoryID, virtualStorage, relativePath, storage, nil, nil, false, false)) + defer func(virtualStorage, relativePath string) { + _, _, err = rs.DeleteRepository(ctx, virtualStorage, relativePath) + require.NoError(t, err) + }(virtualStorage, relativePath) } require.NoError(t, rs.SetGeneration(ctx, repositoryID, storage, record.generation)) @@ -520,11 +522,15 @@ func TestPerRepositoryElector(t *testing.T) { event.Job.RepositoryID = repositoryID - _, err = db.ExecContext(ctx, - "INSERT INTO replication_queue (state, job) VALUES ($1, $2)", + var id int64 + require.NoError(t, db.QueryRowContext(ctx, + "INSERT INTO replication_queue (state, job) VALUES ($1, $2) RETURNING id", event.State, event.Job, - ) - require.NoError(t, err) + ).Scan(&id)) + t.Cleanup(func() { + _, err = db.ExecContext(ctx, `DELETE FROM replication_queue WHERE id = $1`, id) + require.NoError(t, err) + }) } previousPrimary := "" diff 
--git a/internal/praefect/reconciler/reconciler_test.go b/internal/praefect/reconciler/reconciler_test.go index 95871902aba..cf769462d53 100644 --- a/internal/praefect/reconciler/reconciler_test.go +++ b/internal/praefect/reconciler/reconciler_test.go @@ -3,6 +3,7 @@ package reconciler import ( "context" "database/sql" + "errors" "fmt" "sort" "testing" @@ -1026,7 +1027,8 @@ func TestReconciler(t *testing.T) { ctx, cancel := testhelper.Context() defer cancel() - db.TruncateAll(t) + db.Truncate(t, "replication_queue") + db.SequenceReset(t) // set up the repository generation records expected by the test case rs := datastore.NewPostgresRepositoryStore(db, configuredStorages) @@ -1044,6 +1046,13 @@ func TestReconciler(t *testing.T) { require.NoError(t, err) require.NoError(t, rs.CreateRepository(ctx, repositoryID, virtualStorage, relativePath, storage, nil, nil, false, false)) + + defer func(virtualStorage, relativePath string) { + _, _, err := rs.DeleteRepository(ctx, virtualStorage, relativePath) + if !errors.As(err, &commonerr.RepositoryNotFoundError{}) { + require.NoError(t, err) + } + }(virtualStorage, relativePath) } require.NoError(t, rs.SetGeneration(ctx, repositoryID, storage, repo.generation)) diff --git a/internal/praefect/remove_repository_test.go b/internal/praefect/remove_repository_test.go index 7ca6abf0089..4fb1f495348 100644 --- a/internal/praefect/remove_repository_test.go +++ b/internal/praefect/remove_repository_test.go @@ -58,7 +58,8 @@ func TestRemoveRepositoryHandler(t *testing.T) { }, } { t.Run(tc.desc, func(t *testing.T) { - db.TruncateAll(t) + tx := db.Begin(t) + defer tx.Rollback(t) const gitaly1Storage = "gitaly-1" gitaly1Cfg := testcfg.Build(t, testcfg.WithStorages(gitaly1Storage)) @@ -84,7 +85,7 @@ func TestRemoveRepositoryHandler(t *testing.T) { gittest.Exec(t, gitaly1Cfg, "init", "--bare", repoPath) } - rs := datastore.NewPostgresRepositoryStore(db, cfg.StorageNames()) + rs := datastore.NewPostgresRepositoryStore(tx, cfg.StorageNames()) ctx, cancel := testhelper.Context() defer cancel() diff --git a/internal/praefect/replicator_pg_test.go b/internal/praefect/replicator_pg_test.go index c5d23154d6b..7f79191a62a 100644 --- a/internal/praefect/replicator_pg_test.go +++ b/internal/praefect/replicator_pg_test.go @@ -84,9 +84,10 @@ func TestReplicatorDestroy(t *testing.T) { {change: datastore.DeleteRepo}, } { t.Run(string(tc.change), func(t *testing.T) { - db.TruncateAll(t) + tx := db.Begin(t) + defer tx.Rollback(t) - rs := datastore.NewPostgresRepositoryStore(db, nil) + rs := datastore.NewPostgresRepositoryStore(tx, nil) ctx, cancel := testhelper.Context() defer cancel() diff --git a/internal/praefect/repository_exists_test.go b/internal/praefect/repository_exists_test.go index 39a25a71d9d..584fe496798 100644 --- a/internal/praefect/repository_exists_test.go +++ b/internal/praefect/repository_exists_test.go @@ -68,8 +68,10 @@ func TestRepositoryExistsHandler(t *testing.T) { }, } { t.Run(tc.desc, func(t *testing.T) { - db.TruncateAll(t) - rs := datastore.NewPostgresRepositoryStore(db, map[string][]string{"virtual-storage": {"storage"}}) + tx := db.Begin(t) + defer tx.Rollback(t) + + rs := datastore.NewPostgresRepositoryStore(tx, map[string][]string{"virtual-storage": {"storage"}}) ctx, cancel := testhelper.Context() defer cancel() diff --git a/internal/praefect/router_per_repository_test.go b/internal/praefect/router_per_repository_test.go index 9eab3130744..dd2be1ecc09 100644 --- a/internal/praefect/router_per_repository_test.go +++ 
b/internal/praefect/router_per_repository_test.go
@@ -601,9 +601,11 @@ func TestPerRepositoryRouter_RouteRepositoryCreation(t *testing.T) {
 			ctx, cancel := testhelper.Context()
 			defer cancel()
 
-			db.TruncateAll(t)
+			db.SequenceReset(t)
+			tx := db.Begin(t)
+			defer tx.Rollback(t)
 
-			rs := datastore.NewPostgresRepositoryStore(db, nil)
+			rs := datastore.NewPostgresRepositoryStore(tx, nil)
 			if tc.repositoryExists {
 				require.NoError(t,
 					rs.CreateRepository(ctx, 1, "virtual-storage-1", relativePath, "primary", nil, nil, true, true),
diff --git a/internal/praefect/server_test.go b/internal/praefect/server_test.go
index c635b811fc5..77f371a4d01 100644
--- a/internal/praefect/server_test.go
+++ b/internal/praefect/server_test.go
@@ -639,7 +639,9 @@ func TestRenameRepository(t *testing.T) {
 	defer tx.Rollback(t)
 
 	rs := datastore.NewPostgresRepositoryStore(tx, nil)
-	require.NoError(t, rs.CreateRepository(ctx, 1, "praefect", repo.RelativePath, "gitaly-1", []string{"gitaly-2", "gitaly-3"}, nil, true, false))
+	repositoryID, err := rs.ReserveRepositoryID(ctx, "praefect", repo.RelativePath)
+	require.NoError(t, err)
+	require.NoError(t, rs.CreateRepository(ctx, repositoryID, "praefect", repo.RelativePath, "gitaly-1", []string{"gitaly-2", "gitaly-3"}, nil, true, false))
 
 	nodeSet, err := DialNodes(ctx, praefectCfg.VirtualStorages, nil, nil, nil, nil)
 	require.NoError(t, err)
-- 
GitLab


From 755d8a6dbc7d9c69f8859d42db383b753a4bb4d8 Mon Sep 17 00:00:00 2001
From: Pavlo Strokov
Date: Tue, 19 Oct 2021 15:02:52 +0300
Subject: [PATCH 05/10] sql: List of migrations provided from outside

To pre-populate the database with data and run migrations on top of
it, we need the ability to inject additional migrations. This change
extends the 'Migrate' function to accept a list of migrations. All
callers now pass the 'migrations.All()' list, but the tests will
extend it in an upcoming commit.
---
 cmd/praefect/subcmd.go                        | 3 ++-
 internal/praefect/datastore/glsql/postgres.go | 4 ++--
 internal/praefect/datastore/glsql/testing.go  | 5 +++--
 3 files changed, 7 insertions(+), 5 deletions(-)

diff --git a/cmd/praefect/subcmd.go b/cmd/praefect/subcmd.go
index a85f0b7721f..83afe3d5234 100644
--- a/cmd/praefect/subcmd.go
+++ b/cmd/praefect/subcmd.go
@@ -16,6 +16,7 @@ import (
 	"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/config"
 	"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore"
 	"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql"
+	"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/migrations"
 	"google.golang.org/grpc"
 )
 
@@ -125,7 +126,7 @@ func (s *sqlMigrateSubcommand) Exec(flags *flag.FlagSet, conf config.Config) err
 	}
 	defer clean()
 
-	n, err := glsql.Migrate(db, s.ignoreUnknown)
+	n, err := glsql.Migrate(db, s.ignoreUnknown, migrations.All())
 	if err != nil {
 		return fmt.Errorf("%s: fail: %v", subCmd, err)
 	}
diff --git a/internal/praefect/datastore/glsql/postgres.go b/internal/praefect/datastore/glsql/postgres.go
index 4b5efcd86a3..28a80306ab4 100644
--- a/internal/praefect/datastore/glsql/postgres.go
+++ b/internal/praefect/datastore/glsql/postgres.go
@@ -28,14 +28,14 @@ func OpenDB(conf config.DB) (*sql.DB, error) {
 }
 
 // Migrate will apply all pending SQL migrations.
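For illustration, a minimal sketch of how a caller can inject an extra migration through the extended signature that follows; the migration ID and statement here are hypothetical and not part of this patch:

	package main

	import (
		"database/sql"

		migrate "github.com/rubenv/sql-migrate"

		"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql"
		"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/migrations"
	)

	// migrateWithExtra appends a hypothetical extra migration to the standard
	// set before applying them; ordering is derived from the migration ID.
	func migrateWithExtra(db *sql.DB) (int, error) {
		extra := &migrate.Migration{
			Id: "99990101000000_extra_test_data", // hypothetical ID
			Up: []string{`SELECT 1`},             // placeholder statement
		}
		return glsql.Migrate(db, false, append(migrations.All(), extra))
	}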
-func Migrate(db *sql.DB, ignoreUnknown bool) (int, error) {
+func Migrate(db *sql.DB, ignoreUnknown bool, mgs []*migrate.Migration) (int, error) {
 	migrationSet := migrate.MigrationSet{
 		IgnoreUnknown: ignoreUnknown,
 		TableName:     migrations.MigrationTableName,
 	}
 
 	migrationSource := &migrate.MemoryMigrationSource{
-		Migrations: migrations.All(),
+		Migrations: mgs,
 	}
 
 	return migrationSet.Exec(db, "postgres", migrationSource, migrate.Up)
diff --git a/internal/praefect/datastore/glsql/testing.go b/internal/praefect/datastore/glsql/testing.go
index 616e1f967ca..6ca8bec52bf 100644
--- a/internal/praefect/datastore/glsql/testing.go
+++ b/internal/praefect/datastore/glsql/testing.go
@@ -16,6 +16,7 @@ import (
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 	"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/config"
+	"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/migrations"
 )
 
 const (
@@ -237,7 +238,7 @@ func initPraefectTestDB(t testing.TB, database string) *sql.DB {
 		require.NoErrorf(t, templateDB.Close(), "release connection to the %q database", templateDBConf.DBName)
 	}()
 
-	if _, err := Migrate(templateDB, false); err != nil {
+	if _, err := Migrate(templateDB, false, migrations.All()); err != nil {
 		// If database has unknown migration we try to re-create template database with
 		// current migration. It may be caused by other code changes done in another branch.
 		if pErr := (*migrate.PlanError)(nil); errors.As(err, &pErr) {
@@ -253,7 +254,7 @@
 			defer func() {
 				require.NoErrorf(t, remigrateTemplateDB.Close(), "release connection to the %q database", templateDBConf.DBName)
 			}()
-			_, err = Migrate(remigrateTemplateDB, false)
+			_, err = Migrate(remigrateTemplateDB, false, migrations.All())
 			require.NoErrorf(t, err, "failed to run database migration on %q", praefectTemplateDatabase)
 		} else {
 			require.NoErrorf(t, err, "failed to run database migration on %q", praefectTemplateDatabase)
-- 
GitLab


From 109e213f5b1f5e6578ed184ef8aa56004f054621 Mon Sep 17 00:00:00 2001
From: Pavlo Strokov
Date: Tue, 19 Oct 2021 15:11:09 +0300
Subject: [PATCH 06/10] sql: Truncate doesn't restart sequences

To get a pristine database schema for each test run we used the
'Truncate' method, which cleans up tables and resets sequences in the
database to their initial values. As we are going to pre-populate the
database with data before test runs, we must not reset sequences, as
that would result in duplicate keys during record insertion. We
therefore drop the sequence reset and adjust the test accordingly.
---
 internal/praefect/datastore/glsql/testing.go      | 3 ---
 internal/praefect/datastore/glsql/testing_test.go | 2 +-
 2 files changed, 1 insertion(+), 4 deletions(-)

diff --git a/internal/praefect/datastore/glsql/testing.go b/internal/praefect/datastore/glsql/testing.go
index 6ca8bec52bf..aa180fd7100 100644
--- a/internal/praefect/datastore/glsql/testing.go
+++ b/internal/praefect/datastore/glsql/testing.go
@@ -74,9 +74,6 @@ func (db DB) Truncate(t testing.TB, tables ...string) {
 		_, err := db.DB.Exec("DELETE FROM " + table)
 		require.NoError(t, err, "database cleanup failed: %s", tables)
 	}
-
-	_, err := db.DB.Exec("SELECT setval(relname::TEXT, 1, false) from pg_class where relkind = 'S'")
-	require.NoError(t, err, "database cleanup failed: %s", tables)
 }
 
 // RequireRowsInTable verifies that `tname` table has `n` amount of rows in it.
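The behavior change is easy to demonstrate in isolation: a plain DELETE removes rows but does not rewind the sequence backing the primary key, which is exactly what the adjusted test below asserts. A sketch, assuming a table created as CREATE TABLE truncate_tbl (id BIGSERIAL PRIMARY KEY) and with error handling elided:

	var id int
	db.QueryRow("INSERT INTO truncate_tbl VALUES (DEFAULT) RETURNING id").Scan(&id) // id == 1
	db.QueryRow("INSERT INTO truncate_tbl VALUES (DEFAULT) RETURNING id").Scan(&id) // id == 2
	db.Exec("DELETE FROM truncate_tbl")                                             // rows are gone, sequence keeps its position
	db.QueryRow("INSERT INTO truncate_tbl VALUES (DEFAULT) RETURNING id").Scan(&id) // id == 3, not 1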
diff --git a/internal/praefect/datastore/glsql/testing_test.go b/internal/praefect/datastore/glsql/testing_test.go
index 26a4f499cc9..76b457e993f 100644
--- a/internal/praefect/datastore/glsql/testing_test.go
+++ b/internal/praefect/datastore/glsql/testing_test.go
@@ -27,5 +27,5 @@ func TestDB_Truncate(t *testing.T) {
 
 	var id int
 	require.NoError(t, db.QueryRow("INSERT INTO truncate_tbl VALUES (DEFAULT) RETURNING id").Scan(&id))
-	require.Equal(t, 1, id, "sequence for primary key must be restarted")
+	require.Equal(t, 3, id, "sequence for primary key must not be restarted")
 }
-- 
GitLab


From 4440c010033ab762621e3313c150987f552baf4e Mon Sep 17 00:00:00 2001
From: Pavlo Strokov
Date: Tue, 19 Oct 2021 16:31:05 +0300
Subject: [PATCH 07/10] sql: Run migrations and tests on pre-populated database

To get better tests we should run test scenarios on top of a
pre-populated database. This gives us more confidence in our queries
and highlights hidden conditions, such as trigger execution during the
run of the migration scripts.

The population of the database is done via a migration script that is
injected into the common migration set during template database
creation. For now it inserts data only into the storage_repositories
and repositories tables, as we had problems running migrations on them
because of the triggers. The set could be extended, and we could also
add additional code checks to verify migrations were applied as
expected.

Because of the pre-existing data we don't want to remove, we change
the way we do the testing. We now rely on transactions to run the
tests and return to the initial database state once a test completes.
Where a transaction can't be used (nested transactions are not
supported), we remove any created artifacts after test completion.

Hardcoded repository_id values are replaced with the value returned by
the ReserveRepositoryID method for the particular virtual storage and
relative path, as we can no longer predict the auto-generated values
for that column.
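The resulting test pattern looks roughly as follows; a sketch built from the glsql and datastore helpers used in the diffs below, with the virtual storage and relative path names made up for the example:

	ctx, cancel := testhelper.Context()
	defer cancel()

	tx := db.Begin(t)
	defer tx.Rollback(t) // rolls back to the pre-populated template state

	rs := datastore.NewPostgresRepositoryStore(tx, nil)

	// Reserve an ID instead of hardcoding one: on a pre-populated database
	// the auto-generated values can no longer be predicted.
	id, err := rs.ReserveRepositoryID(ctx, "virtual-storage", "relative-path")
	require.NoError(t, err)
	require.NoError(t, rs.CreateRepository(ctx, id, "virtual-storage", "relative-path", "storage-1", nil, nil, false, false))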
Part of: https://gitlab.com/gitlab-org/gitaly/-/issues/3820 --- cmd/praefect/subcmd_accept_dataloss_test.go | 14 +- cmd/praefect/subcmd_dataloss_test.go | 22 +- .../subcmd_set_replication_factor_test.go | 7 +- internal/praefect/checks_test.go | 3 + internal/praefect/coordinator_pg_test.go | 26 +- internal/praefect/coordinator_test.go | 8 +- .../praefect/datastore/assignment_test.go | 17 +- internal/praefect/datastore/glsql/testing.go | 65 ++- .../datastore/listener_postgres_test.go | 16 +- .../datastore/repository_store_test.go | 452 ++++++++++-------- .../datastore/storage_cleanup_test.go | 59 ++- internal/praefect/info_service_test.go | 4 +- .../praefect/nodes/health_manager_test.go | 6 +- .../praefect/nodes/per_repository_test.go | 14 +- .../praefect/reconciler/reconciler_test.go | 175 +++---- internal/praefect/replicator_pg_test.go | 10 +- .../praefect/repocleaner/repository_test.go | 6 +- .../praefect/router_per_repository_test.go | 7 +- 18 files changed, 525 insertions(+), 386 deletions(-) diff --git a/cmd/praefect/subcmd_accept_dataloss_test.go b/cmd/praefect/subcmd_accept_dataloss_test.go index 55d235afcdf..b58a0f37865 100644 --- a/cmd/praefect/subcmd_accept_dataloss_test.go +++ b/cmd/praefect/subcmd_accept_dataloss_test.go @@ -39,18 +39,20 @@ func TestAcceptDatalossSubcommand(t *testing.T) { rs := datastore.NewPostgresRepositoryStore(db, conf.StorageNames()) startingGenerations := map[string]int{st1: 1, st2: 0, st3: datastore.GenerationUnknown} - repoCreated := false + repositoryID := int64(0) for storage, generation := range startingGenerations { if generation == datastore.GenerationUnknown { continue } - if !repoCreated { - repoCreated = true - require.NoError(t, rs.CreateRepository(ctx, 1, vs, repo, storage, nil, nil, false, false)) + if repositoryID == 0 { + var err error + repositoryID, err = rs.ReserveRepositoryID(ctx, vs, repo) + require.NoError(t, err) + require.NoError(t, rs.CreateRepository(ctx, repositoryID, vs, repo, storage, nil, nil, false, false)) } - require.NoError(t, rs.SetGeneration(ctx, 1, storage, generation)) + require.NoError(t, rs.SetGeneration(ctx, repositoryID, storage, generation)) } ln, clean := listenAndServe(t, []svcRegistrar{registerPraefectInfoServer(info.NewServer(conf, rs, nil, nil, nil))}) @@ -145,7 +147,7 @@ func TestAcceptDatalossSubcommand(t *testing.T) { require.NoError(t, fs.Parse(tc.args)) tc.matchError(t, cmd.Exec(fs, conf)) for storage, expected := range tc.expectedGenerations { - actual, err := rs.GetGeneration(ctx, 1, storage) + actual, err := rs.GetGeneration(ctx, repositoryID, storage) require.NoError(t, err) require.Equal(t, expected, actual, storage) } diff --git a/cmd/praefect/subcmd_dataloss_test.go b/cmd/praefect/subcmd_dataloss_test.go index 07d7ad60286..deb16746c18 100644 --- a/cmd/praefect/subcmd_dataloss_test.go +++ b/cmd/praefect/subcmd_dataloss_test.go @@ -56,28 +56,28 @@ func TestDatalossSubcommand(t *testing.T) { ` INSERT INTO repositories (repository_id, virtual_storage, relative_path, "primary") VALUES - (1, 'virtual-storage-1', 'repository-1', 'gitaly-1'), - (2, 'virtual-storage-1', 'repository-2', 'gitaly-3') + (100000000, 'virtual-storage-1', 'repository-1', 'gitaly-1'), + (100000001, 'virtual-storage-1', 'repository-2', 'gitaly-3') `, ` INSERT INTO repository_assignments (repository_id, virtual_storage, relative_path, storage) VALUES - (1, 'virtual-storage-1', 'repository-1', 'gitaly-1'), - (1, 'virtual-storage-1', 'repository-1', 'gitaly-2'), - (2, 'virtual-storage-1', 'repository-2', 'gitaly-1'), - (2, 
'virtual-storage-1', 'repository-2', 'gitaly-3') + (100000000, 'virtual-storage-1', 'repository-1', 'gitaly-1'), + (100000000, 'virtual-storage-1', 'repository-1', 'gitaly-2'), + (100000001, 'virtual-storage-1', 'repository-2', 'gitaly-1'), + (100000001, 'virtual-storage-1', 'repository-2', 'gitaly-3') `, } { _, err := tx.ExecContext(ctx, q) require.NoError(t, err) } - require.NoError(t, gs.SetGeneration(ctx, 1, "gitaly-1", 1)) - require.NoError(t, gs.SetGeneration(ctx, 1, "gitaly-2", 0)) - require.NoError(t, gs.SetGeneration(ctx, 1, "gitaly-3", 0)) + require.NoError(t, gs.SetGeneration(ctx, 100000000, "gitaly-1", 1)) + require.NoError(t, gs.SetGeneration(ctx, 100000000, "gitaly-2", 0)) + require.NoError(t, gs.SetGeneration(ctx, 100000000, "gitaly-3", 0)) - require.NoError(t, gs.SetGeneration(ctx, 2, "gitaly-2", 1)) - require.NoError(t, gs.SetGeneration(ctx, 2, "gitaly-3", 0)) + require.NoError(t, gs.SetGeneration(ctx, 100000001, "gitaly-2", 1)) + require.NoError(t, gs.SetGeneration(ctx, 100000001, "gitaly-3", 0)) ln, clean := listenAndServe(t, []svcRegistrar{ registerPraefectInfoServer(info.NewServer(cfg, gs, nil, nil, nil)), diff --git a/cmd/praefect/subcmd_set_replication_factor_test.go b/cmd/praefect/subcmd_set_replication_factor_test.go index 6e33847b4bd..8369d436d83 100644 --- a/cmd/praefect/subcmd_set_replication_factor_test.go +++ b/cmd/praefect/subcmd_set_replication_factor_test.go @@ -92,8 +92,11 @@ func TestSetReplicationFactorSubcommand(t *testing.T) { } // create a repository record + rs := datastore.NewPostgresRepositoryStore(tx, nil) + id, err := rs.ReserveRepositoryID(ctx, "virtual-storage", "relative-path") + require.NoError(t, err) require.NoError(t, - datastore.NewPostgresRepositoryStore(tx, nil).CreateRepository(ctx, 1, "virtual-storage", "relative-path", "primary", nil, nil, false, false), + rs.CreateRepository(ctx, id, "virtual-storage", "relative-path", "primary", nil, nil, false, false), ) ln, clean := listenAndServe(t, []svcRegistrar{registerPraefectInfoServer( @@ -105,7 +108,7 @@ func TestSetReplicationFactorSubcommand(t *testing.T) { cmd := &setReplicationFactorSubcommand{stdout: stdout} fs := cmd.FlagSet() require.NoError(t, fs.Parse(tc.args)) - err := cmd.Exec(fs, config.Config{ + err = cmd.Exec(fs, config.Config{ SocketPath: ln.Addr().String(), }) testassert.GrpcEqualErr(t, tc.error, err) diff --git a/internal/praefect/checks_test.go b/internal/praefect/checks_test.go index 9e6a6f1b5c3..f8adc31f7ff 100644 --- a/internal/praefect/checks_test.go +++ b/internal/praefect/checks_test.go @@ -55,6 +55,9 @@ func TestPraefectMigrations_success(t *testing.T) { t.Run(tc.desc, func(t *testing.T) { var cfg config.Config db := glsql.NewDB(t) + // Migration that adds artificial data to the database needs to be dropped first + _, err := db.Exec(`DELETE FROM ` + migrations.MigrationTableName + " WHERE id = '20200921170400_artificial_repositories'") + require.NoError(t, err) cfg.DB = glsql.GetDBConfig(t, db.Name) require.NoError(t, tc.prepare(cfg)) diff --git a/internal/praefect/coordinator_pg_test.go b/internal/praefect/coordinator_pg_test.go index 53b645666f9..9dc5db242cb 100644 --- a/internal/praefect/coordinator_pg_test.go +++ b/internal/praefect/coordinator_pg_test.go @@ -10,7 +10,6 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/commonerr" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/config" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore" 
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql" @@ -159,7 +158,6 @@ func TestStreamDirectorMutator_Transaction(t *testing.T) { for _, tc := range testcases { t.Run(tc.desc, func(t *testing.T) { db.Truncate(t, "replication_queue_job_lock", "replication_queue", "replication_queue_lock") - db.SequenceReset(t) storageNodes := make([]*config.Node, 0, len(tc.nodes)) for i := range tc.nodes { @@ -201,25 +199,25 @@ func TestStreamDirectorMutator_Transaction(t *testing.T) { // set up the generations prior to transaction rs := datastore.NewPostgresRepositoryStore(tx, conf.StorageNames()) - repoCreated := false + var id int64 + var err error for i, n := range tc.nodes { if n.generation == datastore.GenerationUnknown { continue } - if !repoCreated { - repoCreated = true - require.NoError(t, rs.CreateRepository(ctx, 1, repo.StorageName, repo.RelativePath, storageNodes[i].Storage, nil, nil, true, false)) + if id == 0 { + id, err = rs.ReserveRepositoryID(ctx, repo.StorageName, repo.RelativePath) + require.NoError(t, err) + require.NoError(t, rs.CreateRepository(ctx, id, repo.StorageName, repo.RelativePath, storageNodes[i].Storage, nil, nil, true, false)) defer func() { - repositoryStore := datastore.NewPostgresRepositoryStore(db, nil) - _, _, err := repositoryStore.DeleteRepository(ctx, repo.StorageName, repo.RelativePath) - if !errors.As(err, &commonerr.RepositoryNotFoundError{}) { - require.NoError(t, err) - } + repoStore := datastore.NewPostgresRepositoryStore(db, nil) + _, _, err := repoStore.DeleteRepository(ctx, repo.StorageName, repo.RelativePath) + require.NoError(t, err) }() } - require.NoError(t, rs.SetGeneration(ctx, 1, storageNodes[i].Storage, n.generation)) + require.NoError(t, rs.SetGeneration(ctx, id, storageNodes[i].Storage, n.generation)) } testhelper.SetHealthyNodes(t, ctx, tx, map[string]map[string][]string{"praefect": conf.StorageNames()}) @@ -307,7 +305,7 @@ func TestStreamDirectorMutator_Transaction(t *testing.T) { } if tc.concurrentWrite { - require.NoError(t, rs.SetGeneration(ctx, 1, "non-participating-storage", 2)) + require.NoError(t, rs.SetGeneration(ctx, id, "non-participating-storage", 2)) } err = streamParams.RequestFinalizer() @@ -321,7 +319,7 @@ func TestStreamDirectorMutator_Transaction(t *testing.T) { // Nodes that did not successfully commit or did not participate should remain on their // existing generation. 
for i, n := range tc.nodes { - gen, err := rs.GetGeneration(ctx, 1, storageNodes[i].Storage) + gen, err := rs.GetGeneration(ctx, id, storageNodes[i].Storage) require.NoError(t, err) require.Equal(t, n.expectedGeneration, gen, "node %d has wrong generation", i) } diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go index a7a7561cd32..0dbc6b839ef 100644 --- a/internal/praefect/coordinator_test.go +++ b/internal/praefect/coordinator_test.go @@ -194,8 +194,12 @@ func TestStreamDirectorMutator(t *testing.T) { rs := datastore.NewPostgresRepositoryStore(tx, conf.StorageNames()) + var id int64 + var err error if tc.repositoryExists { - require.NoError(t, rs.CreateRepository(ctx, 1, targetRepo.StorageName, targetRepo.RelativePath, primaryNode.Storage, []string{secondaryNode.Storage}, nil, true, true)) + id, err = rs.ReserveRepositoryID(ctx, targetRepo.StorageName, targetRepo.RelativePath) + require.NoError(t, err) + require.NoError(t, rs.CreateRepository(ctx, id, targetRepo.StorageName, targetRepo.RelativePath, primaryNode.Storage, []string{secondaryNode.Storage}, nil, true, true)) } testhelper.SetHealthyNodes(t, ctx, tx, map[string]map[string][]string{"praefect": conf.StorageNames()}) @@ -273,7 +277,7 @@ func TestStreamDirectorMutator(t *testing.T) { CreatedAt: events[0].CreatedAt, UpdatedAt: events[0].UpdatedAt, Job: datastore.ReplicationJob{ - RepositoryID: 1, + RepositoryID: id, Change: datastore.UpdateRepo, VirtualStorage: conf.VirtualStorages[0].Name, RelativePath: targetRepo.RelativePath, diff --git a/internal/praefect/datastore/assignment_test.go b/internal/praefect/datastore/assignment_test.go index 12409a6e3d9..599d739efe5 100644 --- a/internal/praefect/datastore/assignment_test.go +++ b/internal/praefect/datastore/assignment_test.go @@ -92,7 +92,7 @@ func TestAssignmentStore_GetHostAssignments(t *testing.T) { require.NoError(t, rs.CreateRepository(ctx, repositoryID, assignment.virtualStorage, assignment.relativePath, assignment.storage, nil, nil, false, false)) } - _, err = db.ExecContext(ctx, ` + _, err = tx.ExecContext(ctx, ` INSERT INTO repository_assignments (repository_id, virtual_storage, relative_path, storage) VALUES ($1, $2, $3, $4) `, repositoryID, assignment.virtualStorage, assignment.relativePath, assignment.storage) @@ -133,6 +133,7 @@ func TestAssignmentStore_SetReplicationFactor(t *testing.T) { } db := glsql.NewDB(t) + const repositoryID = 1e10 for _, tc := range []struct { desc string @@ -219,16 +220,16 @@ func TestAssignmentStore_SetReplicationFactor(t *testing.T) { if !tc.nonExistentRepository { _, err := tx.ExecContext(ctx, ` INSERT INTO repositories (virtual_storage, relative_path, "primary", repository_id) - VALUES ('virtual-storage', 'relative-path', 'primary', 1000) - `) + VALUES ('virtual-storage', 'relative-path', 'primary', $1) + `, repositoryID) require.NoError(t, err) } for _, storage := range tc.existingAssignments { _, err := tx.ExecContext(ctx, ` INSERT INTO repository_assignments (virtual_storage, relative_path, storage, repository_id) - VALUES ('virtual-storage', 'relative-path', $1, 1000) - `, storage) + VALUES ('virtual-storage', 'relative-path', $1, $2) + `, storage, repositoryID) require.NoError(t, err) } @@ -242,7 +243,7 @@ func TestAssignmentStore_SetReplicationFactor(t *testing.T) { tc.requireStorages(t, setStorages) - assignedStorages, err := store.GetHostAssignments(ctx, "virtual-storage", 1) + assignedStorages, err := store.GetHostAssignments(ctx, "virtual-storage", repositoryID) require.NoError(t, err) 
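Where a test cannot run inside a transaction, the same isolation is achieved with deferred cleanup instead, as in this sketch modeled on the changes above (the virtualStorage, relativePath, and storage variables stand in for whatever the test uses):

	id, err := rs.ReserveRepositoryID(ctx, virtualStorage, relativePath)
	require.NoError(t, err)
	require.NoError(t, rs.CreateRepository(ctx, id, virtualStorage, relativePath, storage, nil, nil, false, false))
	defer func() {
		// Delete the repository again so the pre-populated data stays intact.
		_, _, err := rs.DeleteRepository(ctx, virtualStorage, relativePath)
		require.NoError(t, err)
	}()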
 			sort.Strings(assignedStorages)
@@ -252,8 +253,8 @@ func TestAssignmentStore_SetReplicationFactor(t *testing.T) {
 			require.NoError(t, tx.QueryRowContext(ctx, `
 				SELECT array_agg(storage)
 				FROM repository_assignments
-				WHERE COALESCE(repository_id != 1000, true)
-			`).Scan(&storagesWithIncorrectRepositoryID))
+				WHERE COALESCE(repository_id != $1, true)
+			`, repositoryID).Scan(&storagesWithIncorrectRepositoryID))
 			require.Empty(t, storagesWithIncorrectRepositoryID)
 		})
 	}
diff --git a/internal/praefect/datastore/glsql/testing.go b/internal/praefect/datastore/glsql/testing.go
index aa180fd7100..e1d37ec91b7 100644
--- a/internal/praefect/datastore/glsql/testing.go
+++ b/internal/praefect/datastore/glsql/testing.go
@@ -235,7 +235,7 @@ func initPraefectTestDB(t testing.TB, database string) *sql.DB {
 		require.NoErrorf(t, templateDB.Close(), "release connection to the %q database", templateDBConf.DBName)
 	}()
 
-	if _, err := Migrate(templateDB, false, migrations.All()); err != nil {
+	if _, err := Migrate(templateDB, false, append(migrations.All(), testDataMigrations()...)); err != nil {
 		// If database has unknown migration we try to re-create template database with
 		// current migration. It may be caused by other code changes done in another branch.
 		if pErr := (*migrate.PlanError)(nil); errors.As(err, &pErr) {
@@ -251,7 +251,7 @@
 			defer func() {
 				require.NoErrorf(t, remigrateTemplateDB.Close(), "release connection to the %q database", templateDBConf.DBName)
 			}()
-			_, err = Migrate(remigrateTemplateDB, false, migrations.All())
+			_, err = Migrate(remigrateTemplateDB, false, append(migrations.All(), testDataMigrations()...))
 			require.NoErrorf(t, err, "failed to run database migration on %q", praefectTemplateDatabase)
 		} else {
 			require.NoErrorf(t, err, "failed to run database migration on %q", praefectTemplateDatabase)
@@ -350,3 +350,64 @@ func getDatabaseEnvironment(t testing.TB) map[string]string {
 
 	return databaseEnv
 }
+
+// testDataMigrations returns additional migration scripts to include into the existing set of
+// migrations. All migrations are applied in order; the ordering is determined by the numeric
+// prefix of the migration ID. To insert a migration between two existing ones, give it a prefix
+// that is greater than the lower one and less than the upper one.
+func testDataMigrations() []*migrate.Migration {
+	return []*migrate.Migration{
+		{
+			// Applied after 20200921170311_repositories_primary_column
+			Id: "20200921170400_artificial_repositories",
+			Up: []string{
+				// random_hex_string generates a random HEX string of the requested length.
+				`-- +migrate StatementBegin
+				CREATE OR REPLACE FUNCTION random_hex_string(length INTEGER) RETURNS TEXT AS
+				$$
+				DECLARE
+					chars TEXT[] := '{0,1,2,3,4,5,6,7,8,9,a,b,c,d,e,f}';
+					result TEXT := '';
+					i INTEGER := 0;
+				BEGIN
+					IF length < 0 THEN
+						RAISE EXCEPTION 'Given length cannot be less than 0';
+					END IF;
+					FOR i IN 1..length LOOP
+						result := result || chars[1+RANDOM()*(ARRAY_LENGTH(chars, 1)-1)];
+					END LOOP;
+					RETURN result;
+				END;
+				$$ LANGUAGE plpgsql
+				-- +migrate StatementEnd`,
+
+				// repository_relative_path generates a relative path for the repository.
+ // It has a standard format: @hashed/ab/cd/abcd000000000000000000000000000000000000000000000000000000000000.git + `-- +migrate StatementBegin + CREATE OR REPLACE FUNCTION repository_relative_path() RETURNS TEXT AS + $$ + DECLARE + result TEXT; + BEGIN + SELECT CONCAT_WS('/', '@hashed', SUBSTR(path, 1, 2), SUBSTR(path, 3, 2), path || '.git') INTO result + FROM (SELECT random_hex_string(64) AS path) t; + RETURN result; + END; + $$ LANGUAGE plpgsql + -- +migrate StatementEnd`, + + // Populate database tables with artificial data. + `INSERT INTO storage_repositories (virtual_storage, relative_path, storage, generation) + SELECT virtual_storages.name, repositories.relative_path, storages.name, 1 + FROM (SELECT UNNEST(ARRAY['artificial-praefect-1','artificial-praefect-2']) AS name) virtual_storages + CROSS JOIN + (SELECT UNNEST(ARRAY['artificial-gitaly-1','artificial-gitaly-2','artificial-gitaly-3']) AS name) storages + CROSS JOIN + (SELECT generate_series(1,300), repository_relative_path() AS relative_path) repositories`, + + `INSERT INTO repositories (virtual_storage, relative_path, generation) + SELECT DISTINCT virtual_storage, relative_path, 1 + FROM storage_repositories`, + }, + }, + } +} diff --git a/internal/praefect/datastore/listener_postgres_test.go b/internal/praefect/datastore/listener_postgres_test.go index 53f1cc40358..7dc5c99a801 100644 --- a/internal/praefect/datastore/listener_postgres_test.go +++ b/internal/praefect/datastore/listener_postgres_test.go @@ -377,7 +377,7 @@ func TestPostgresListener_Listen_repositories_delete(t *testing.T) { "repositories_updates", func(t *testing.T) { _, err := db.DB.Exec(` - INSERT INTO repositories + INSERT INTO repositories (virtual_storage, relative_path, generation) VALUES ('praefect-1', '/path/to/repo/1', 1), ('praefect-1', '/path/to/repo/2', 1), ('praefect-1', '/path/to/repo/3', 0), @@ -385,7 +385,7 @@ func TestPostgresListener_Listen_repositories_delete(t *testing.T) { require.NoError(t, err) }, func(t *testing.T) { - _, err := db.DB.Exec(`DELETE FROM repositories WHERE generation > 0`) + _, err := db.DB.Exec(`DELETE FROM repositories WHERE generation > 0 AND virtual_storage LIKE 'praefect%'`) require.NoError(t, err) }, func(t *testing.T, n glsql.Notification) { @@ -435,11 +435,14 @@ func TestPostgresListener_Listen_storage_repositories_update(t *testing.T) { db.Name, channel, func(t *testing.T) { - _, err := db.DB.Exec(`INSERT INTO storage_repositories VALUES ('praefect-1', '/path/to/repo', 'gitaly-1', 0)`) + _, err := db.DB.Exec( + `INSERT INTO storage_repositories(virtual_storage, relative_path, storage, generation) + VALUES ('praefect-1', '/path/to/repo', 'gitaly-1', 0)`, + ) require.NoError(t, err) }, func(t *testing.T) { - _, err := db.DB.Exec(`UPDATE storage_repositories SET generation = generation + 1`) + _, err := db.DB.Exec(`UPDATE storage_repositories SET generation = generation + 1 WHERE virtual_storage = 'praefect-1'`) require.NoError(t, err) }, func(t *testing.T, n glsql.Notification) { @@ -461,7 +464,7 @@ func TestPostgresListener_Listen_storage_empty_notification(t *testing.T) { channel, func(t *testing.T) {}, func(t *testing.T) { - _, err := db.DB.Exec(`UPDATE storage_repositories SET generation = 1`) + _, err := db.DB.Exec(`UPDATE storage_repositories SET generation = 1 WHERE repository_id = -1`) require.NoError(t, err) }, nil, // no notification events expected @@ -486,7 +489,7 @@ func TestPostgresListener_Listen_storage_repositories_delete(t *testing.T) { require.NoError(t, err) }, func(t *testing.T) { - _, err := 
db.DB.Exec(`DELETE FROM storage_repositories`) + _, err := db.DB.Exec(`DELETE FROM storage_repositories WHERE virtual_storage = 'praefect-1'`) require.NoError(t, err) }, func(t *testing.T, n glsql.Notification) { @@ -497,6 +500,7 @@ func TestPostgresListener_Listen_storage_repositories_delete(t *testing.T) { } func testListener(t *testing.T, dbName, channel string, setup func(t *testing.T), trigger func(t *testing.T), verifier func(t *testing.T, notification glsql.Notification)) { + t.Helper() setup(t) readyChan := make(chan struct{}) diff --git a/internal/praefect/datastore/repository_store_test.go b/internal/praefect/datastore/repository_store_test.go index 1fb0c4444dd..9781f195ca0 100644 --- a/internal/praefect/datastore/repository_store_test.go +++ b/internal/praefect/datastore/repository_store_test.go @@ -52,8 +52,7 @@ LEFT JOIN ( FROM repository_assignments GROUP BY repository_id, virtual_storage, relative_path ) AS repository_assignments USING (repository_id, virtual_storage, relative_path) - - `) +WHERE virtual_storage NOT LIKE 'artificial%'`) require.NoError(t, err) defer rows.Close() @@ -85,7 +84,7 @@ LEFT JOIN ( requireStorageState := func(t testing.TB, ctx context.Context, exp storageState) { rows, err := db.QueryContext(ctx, ` SELECT repository_id, virtual_storage, relative_path, storage, generation -FROM storage_repositories +FROM storage_repositories WHERE virtual_storage NOT LIKE 'artificial%' `) require.NoError(t, err) defer rows.Close() @@ -142,7 +141,7 @@ func TestRepositoryStore_incrementGenerationConcurrently(t *testing.T) { first call second call error error - state storageState + state func(id int64) storageState }{ { desc: "both successful", @@ -154,13 +153,15 @@ func TestRepositoryStore_incrementGenerationConcurrently(t *testing.T) { primary: "primary", secondaries: []string{"secondary"}, }, - state: storageState{ - "virtual-storage": { - "relative-path": { - "primary": {repositoryID: 1, generation: 2}, - "secondary": {repositoryID: 1, generation: 2}, + state: func(id int64) storageState { + return storageState{ + "virtual-storage": { + "relative-path": { + "primary": {repositoryID: id, generation: 2}, + "secondary": {repositoryID: id, generation: 2}, + }, }, - }, + } }, }, { @@ -172,13 +173,15 @@ func TestRepositoryStore_incrementGenerationConcurrently(t *testing.T) { primary: "primary", secondaries: []string{"secondary"}, }, - state: storageState{ - "virtual-storage": { - "relative-path": { - "primary": {repositoryID: 1, generation: 2}, - "secondary": {repositoryID: 1, generation: 0}, + state: func(id int64) storageState { + return storageState{ + "virtual-storage": { + "relative-path": { + "primary": {repositoryID: id, generation: 2}, + "secondary": {repositoryID: id, generation: 0}, + }, }, - }, + } }, }, { @@ -190,13 +193,15 @@ func TestRepositoryStore_incrementGenerationConcurrently(t *testing.T) { primary: "secondary", }, error: errWriteToOutdatedNodes, - state: storageState{ - "virtual-storage": { - "relative-path": { - "primary": {repositoryID: 1, generation: 1}, - "secondary": {repositoryID: 1, generation: 0}, + state: func(id int64) storageState { + return storageState{ + "virtual-storage": { + "relative-path": { + "primary": {repositoryID: id, generation: 1}, + "secondary": {repositoryID: id, generation: 0}, + }, }, - }, + } }, }, } { @@ -205,7 +210,9 @@ func TestRepositoryStore_incrementGenerationConcurrently(t *testing.T) { defer cancel() globalRepositoryStore := NewPostgresRepositoryStore(db, nil) - require.NoError(t, 
globalRepositoryStore.CreateRepository(ctx, 1, "virtual-storage", "relative-path", "primary", []string{"secondary"}, nil, false, false)) + id, err := globalRepositoryStore.ReserveRepositoryID(ctx, "virtual-storage", "relative-path") + require.NoError(t, err) + require.NoError(t, globalRepositoryStore.CreateRepository(ctx, id, "virtual-storage", "relative-path", "primary", []string{"secondary"}, nil, false, false)) defer func() { _, _, err := globalRepositoryStore.DeleteRepository(ctx, "virtual-storage", "relative-path") require.NoError(t, err) @@ -214,7 +221,7 @@ func TestRepositoryStore_incrementGenerationConcurrently(t *testing.T) { firstTx := db.Begin(t) secondTx := db.Begin(t) - err := NewPostgresRepositoryStore(firstTx, nil).IncrementGeneration(ctx, 1, tc.first.primary, tc.first.secondaries) + err = NewPostgresRepositoryStore(firstTx, nil).IncrementGeneration(ctx, id, tc.first.primary, tc.first.secondaries) require.NoError(t, err) go func() { @@ -222,13 +229,13 @@ func TestRepositoryStore_incrementGenerationConcurrently(t *testing.T) { firstTx.Commit(t) }() - err = NewPostgresRepositoryStore(secondTx, nil).IncrementGeneration(ctx, 1, tc.second.primary, tc.second.secondaries) + err = NewPostgresRepositoryStore(secondTx, nil).IncrementGeneration(ctx, id, tc.second.primary, tc.second.secondaries) require.Equal(t, tc.error, err) secondTx.Commit(t) requireState(t, ctx, db, - virtualStorageState{"virtual-storage": {"relative-path": {repositoryID: 1, replicaPath: "relative-path"}}}, - tc.state, + virtualStorageState{"virtual-storage": {"relative-path": {repositoryID: id, replicaPath: "relative-path"}}}, + tc.state(id), ) }) } @@ -277,7 +284,7 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) { rs, requireState := newStore(t, nil) require.Equal(t, - rs.IncrementGeneration(ctx, 1, "primary", []string{"secondary-1"}), + rs.IncrementGeneration(ctx, 1e10, "primary", []string{"secondary-1"}), commonerr.ErrRepositoryNotFound, ) requireState(t, ctx, @@ -288,26 +295,27 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) { t.Run("write to outdated nodes", func(t *testing.T) { rs, requireState := newStore(t, nil) - - require.NoError(t, rs.CreateRepository(ctx, 1, vs, repo, "latest-node", []string{"outdated-primary", "outdated-secondary"}, nil, false, false)) - require.NoError(t, rs.SetGeneration(ctx, 1, "latest-node", 1)) + id, err := rs.ReserveRepositoryID(ctx, vs, repo) + require.NoError(t, err) + require.NoError(t, rs.CreateRepository(ctx, id, vs, repo, "latest-node", []string{"outdated-primary", "outdated-secondary"}, nil, false, false)) + require.NoError(t, rs.SetGeneration(ctx, id, "latest-node", 1)) require.Equal(t, - rs.IncrementGeneration(ctx, 1, "outdated-primary", []string{"outdated-secondary"}), + rs.IncrementGeneration(ctx, id, "outdated-primary", []string{"outdated-secondary"}), errWriteToOutdatedNodes, ) requireState(t, ctx, virtualStorageState{ "virtual-storage-1": { - "repository-1": {repositoryID: 1, replicaPath: "repository-1"}, + "repository-1": {repositoryID: id, replicaPath: "repository-1"}, }, }, storageState{ "virtual-storage-1": { "repository-1": { - "latest-node": {repositoryID: 1, generation: 1}, - "outdated-primary": {repositoryID: 1, generation: 0}, - "outdated-secondary": {repositoryID: 1, generation: 0}, + "latest-node": {repositoryID: id, generation: 1}, + "outdated-primary": {repositoryID: id, generation: 0}, + "outdated-secondary": {repositoryID: id, generation: 0}, }, }, }, @@ -317,82 +325,86 @@ func 
testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) { t.Run("increments generation for up to date nodes", func(t *testing.T) { rs, requireState := newStore(t, nil) - for id, pair := range []struct{ virtualStorage, relativePath string }{ + var ids []int64 + for _, pair := range []struct{ virtualStorage, relativePath string }{ {vs, repo}, // create records that don't get modified to ensure the query is correctly scoped by virtual storage // and relative path {vs, "other-relative-path"}, {"other-virtual-storage", repo}, } { - require.NoError(t, rs.CreateRepository(ctx, int64(id+1), pair.virtualStorage, pair.relativePath, "primary", []string{"up-to-date-secondary", "outdated-secondary"}, nil, false, false)) + id, err := rs.ReserveRepositoryID(ctx, pair.virtualStorage, pair.relativePath) + require.NoError(t, err) + ids = append(ids, id) + require.NoError(t, rs.CreateRepository(ctx, id, pair.virtualStorage, pair.relativePath, "primary", []string{"up-to-date-secondary", "outdated-secondary"}, nil, false, false)) } - require.NoError(t, rs.IncrementGeneration(ctx, 1, "primary", []string{"up-to-date-secondary"})) + require.NoError(t, rs.IncrementGeneration(ctx, ids[0], "primary", []string{"up-to-date-secondary"})) requireState(t, ctx, virtualStorageState{ "virtual-storage-1": { - "repository-1": {repositoryID: 1, replicaPath: "repository-1"}, - "other-relative-path": {repositoryID: 2, replicaPath: "other-relative-path"}, + "repository-1": {repositoryID: ids[0], replicaPath: "repository-1"}, + "other-relative-path": {repositoryID: ids[1], replicaPath: "other-relative-path"}, }, "other-virtual-storage": { - "repository-1": {repositoryID: 3, replicaPath: "repository-1"}, + "repository-1": {repositoryID: ids[2], replicaPath: "repository-1"}, }, }, storageState{ "virtual-storage-1": { "repository-1": { - "primary": {repositoryID: 1, generation: 1}, - "up-to-date-secondary": {repositoryID: 1, generation: 1}, - "outdated-secondary": {repositoryID: 1, generation: 0}, + "primary": {repositoryID: ids[0], generation: 1}, + "up-to-date-secondary": {repositoryID: ids[0], generation: 1}, + "outdated-secondary": {repositoryID: ids[0], generation: 0}, }, "other-relative-path": { - "primary": {repositoryID: 2}, - "up-to-date-secondary": {repositoryID: 2}, - "outdated-secondary": {repositoryID: 2}, + "primary": {repositoryID: ids[1]}, + "up-to-date-secondary": {repositoryID: ids[1]}, + "outdated-secondary": {repositoryID: ids[1]}, }, }, "other-virtual-storage": { "repository-1": { - "primary": {repositoryID: 3}, - "up-to-date-secondary": {repositoryID: 3}, - "outdated-secondary": {repositoryID: 3}, + "primary": {repositoryID: ids[2]}, + "up-to-date-secondary": {repositoryID: ids[2]}, + "outdated-secondary": {repositoryID: ids[2]}, }, }, }, ) - require.NoError(t, rs.IncrementGeneration(ctx, 1, "primary", []string{ + require.NoError(t, rs.IncrementGeneration(ctx, ids[0], "primary", []string{ "up-to-date-secondary", "outdated-secondary", "non-existing-secondary", })) requireState(t, ctx, virtualStorageState{ "virtual-storage-1": { - "repository-1": {repositoryID: 1, replicaPath: "repository-1"}, - "other-relative-path": {repositoryID: 2, replicaPath: "other-relative-path"}, + "repository-1": {repositoryID: ids[0], replicaPath: "repository-1"}, + "other-relative-path": {repositoryID: ids[1], replicaPath: "other-relative-path"}, }, "other-virtual-storage": { - "repository-1": {repositoryID: 3, replicaPath: "repository-1"}, + "repository-1": {repositoryID: ids[2], replicaPath: "repository-1"}, }, }, 
storageState{ "virtual-storage-1": { "repository-1": { - "primary": {repositoryID: 1, generation: 2}, - "up-to-date-secondary": {repositoryID: 1, generation: 2}, - "outdated-secondary": {repositoryID: 1, generation: 0}, + "primary": {repositoryID: ids[0], generation: 2}, + "up-to-date-secondary": {repositoryID: ids[0], generation: 2}, + "outdated-secondary": {repositoryID: ids[0], generation: 0}, }, "other-relative-path": { - "primary": {repositoryID: 2}, - "up-to-date-secondary": {repositoryID: 2}, - "outdated-secondary": {repositoryID: 2}, + "primary": {repositoryID: ids[1]}, + "up-to-date-secondary": {repositoryID: ids[1]}, + "outdated-secondary": {repositoryID: ids[1]}, }, }, "other-virtual-storage": { "repository-1": { - "primary": {repositoryID: 3}, - "up-to-date-secondary": {repositoryID: 3}, - "outdated-secondary": {repositoryID: 3}, + "primary": {repositoryID: ids[2]}, + "up-to-date-secondary": {repositoryID: ids[2]}, + "outdated-secondary": {repositoryID: ids[2]}, }, }, }, @@ -403,18 +415,19 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) { t.Run("SetGeneration", func(t *testing.T) { t.Run("creates a record for the replica", func(t *testing.T) { rs, requireState := newStore(t, nil) - - require.NoError(t, rs.CreateRepository(ctx, 1, vs, repo, stor, nil, nil, false, false)) - require.NoError(t, rs.SetGeneration(ctx, 1, "storage-2", 0)) + id, err := rs.ReserveRepositoryID(ctx, vs, repo) + require.NoError(t, err) + require.NoError(t, rs.CreateRepository(ctx, id, vs, repo, stor, nil, nil, false, false)) + require.NoError(t, rs.SetGeneration(ctx, id, "storage-2", 0)) requireState(t, ctx, virtualStorageState{"virtual-storage-1": { - "repository-1": {repositoryID: 1, replicaPath: "repository-1"}, + "repository-1": {repositoryID: id, replicaPath: "repository-1"}, }}, storageState{ "virtual-storage-1": { "repository-1": { - "storage-1": {repositoryID: 1, generation: 0}, - "storage-2": {repositoryID: 1, generation: 0}, + "storage-1": {repositoryID: id, generation: 0}, + "storage-2": {repositoryID: id, generation: 0}, }, }, }, @@ -423,20 +436,21 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) { t.Run("updates existing record", func(t *testing.T) { rs, requireState := newStore(t, nil) - - require.NoError(t, rs.CreateRepository(ctx, 1, vs, repo, "storage-1", nil, nil, false, false)) - require.NoError(t, rs.SetGeneration(ctx, 1, stor, 1)) - require.NoError(t, rs.SetGeneration(ctx, 1, stor, 0)) + id, err := rs.ReserveRepositoryID(ctx, vs, repo) + require.NoError(t, err) + require.NoError(t, rs.CreateRepository(ctx, id, vs, repo, "storage-1", nil, nil, false, false)) + require.NoError(t, rs.SetGeneration(ctx, id, stor, 1)) + require.NoError(t, rs.SetGeneration(ctx, id, stor, 0)) requireState(t, ctx, virtualStorageState{ "virtual-storage-1": { - "repository-1": {repositoryID: 1, replicaPath: "repository-1"}, + "repository-1": {repositoryID: id, replicaPath: "repository-1"}, }, }, storageState{ "virtual-storage-1": { "repository-1": { - "storage-1": {repositoryID: 1, generation: 0}, + "storage-1": {repositoryID: id, generation: 0}, }, }, }, @@ -456,19 +470,20 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) { t.Run("sets an existing replica as the latest", func(t *testing.T) { rs, requireState := newStore(t, nil) - - require.NoError(t, rs.CreateRepository(ctx, 1, vs, repo, "storage-1", []string{"storage-2"}, nil, false, false)) + id, err := rs.ReserveRepositoryID(ctx, vs, repo) + require.NoError(t, err) + 
require.NoError(t, rs.CreateRepository(ctx, id, vs, repo, "storage-1", []string{"storage-2"}, nil, false, false)) requireState(t, ctx, virtualStorageState{ "virtual-storage-1": { - "repository-1": {repositoryID: 1, replicaPath: "repository-1"}, + "repository-1": {repositoryID: id, replicaPath: "repository-1"}, }, }, storageState{ "virtual-storage-1": { "repository-1": { - "storage-1": {repositoryID: 1, generation: 0}, - "storage-2": {repositoryID: 1, generation: 0}, + "storage-1": {repositoryID: id, generation: 0}, + "storage-2": {repositoryID: id, generation: 0}, }, }, }, @@ -478,14 +493,14 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) { requireState(t, ctx, virtualStorageState{ "virtual-storage-1": { - "repository-1": {repositoryID: 1, replicaPath: "repository-1"}, + "repository-1": {repositoryID: id, replicaPath: "repository-1"}, }, }, storageState{ "virtual-storage-1": { "repository-1": { - "storage-1": {repositoryID: 1, generation: 1}, - "storage-2": {repositoryID: 1, generation: 0}, + "storage-1": {repositoryID: id, generation: 1}, + "storage-2": {repositoryID: id, generation: 0}, }, }, }, @@ -494,18 +509,19 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) { t.Run("sets a new replica as the latest", func(t *testing.T) { rs, requireState := newStore(t, nil) - - require.NoError(t, rs.CreateRepository(ctx, 1, vs, repo, "storage-1", nil, nil, false, false)) + id, err := rs.ReserveRepositoryID(ctx, vs, repo) + require.NoError(t, err) + require.NoError(t, rs.CreateRepository(ctx, id, vs, repo, "storage-1", nil, nil, false, false)) requireState(t, ctx, virtualStorageState{ "virtual-storage-1": { - "repository-1": {repositoryID: 1, replicaPath: "repository-1"}, + "repository-1": {repositoryID: id, replicaPath: "repository-1"}, }, }, storageState{ "virtual-storage-1": { "repository-1": { - "storage-1": {repositoryID: 1, generation: 0}, + "storage-1": {repositoryID: id, generation: 0}, }, }, }, @@ -515,14 +531,14 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) { requireState(t, ctx, virtualStorageState{ "virtual-storage-1": { - "repository-1": {repositoryID: 1, replicaPath: "repository-1"}, + "repository-1": {repositoryID: id, replicaPath: "repository-1"}, }, }, storageState{ "virtual-storage-1": { "repository-1": { - "storage-1": {repositoryID: 1, generation: 0}, - "storage-2": {repositoryID: 1, generation: 1}, + "storage-1": {repositoryID: id, generation: 0}, + "storage-2": {repositoryID: id, generation: 1}, }, }, }, @@ -533,13 +549,16 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) { t.Run("GetGeneration", func(t *testing.T) { rs, _ := newStore(t, nil) - generation, err := rs.GetGeneration(ctx, 1, stor) + id, err := rs.ReserveRepositoryID(ctx, vs, repo) + require.NoError(t, err) + + generation, err := rs.GetGeneration(ctx, id, stor) require.NoError(t, err) require.Equal(t, GenerationUnknown, generation) - require.NoError(t, rs.CreateRepository(ctx, 1, vs, repo, stor, nil, nil, false, false)) + require.NoError(t, rs.CreateRepository(ctx, id, vs, repo, stor, nil, nil, false, false)) - generation, err = rs.GetGeneration(ctx, 1, stor) + generation, err = rs.GetGeneration(ctx, id, stor) require.NoError(t, err) require.Equal(t, 0, generation) }) @@ -547,48 +566,54 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) { t.Run("GetReplicatedGeneration", func(t *testing.T) { t.Run("no previous record allowed", func(t *testing.T) { rs, _ := newStore(t, nil) + id, err := 
rs.ReserveRepositoryID(ctx, vs, repo) + require.NoError(t, err) - gen, err := rs.GetReplicatedGeneration(ctx, 1, "source", "target") + gen, err := rs.GetReplicatedGeneration(ctx, id, "source", "target") require.NoError(t, err) require.Equal(t, GenerationUnknown, gen) - require.NoError(t, rs.CreateRepository(ctx, 1, vs, repo, "source", nil, nil, false, false)) - gen, err = rs.GetReplicatedGeneration(ctx, 1, "source", "target") + require.NoError(t, rs.CreateRepository(ctx, id, vs, repo, "source", nil, nil, false, false)) + gen, err = rs.GetReplicatedGeneration(ctx, id, "source", "target") require.NoError(t, err) require.Equal(t, 0, gen) }) t.Run("upgrade allowed", func(t *testing.T) { rs, _ := newStore(t, nil) + id, err := rs.ReserveRepositoryID(ctx, vs, repo) + require.NoError(t, err) - require.NoError(t, rs.CreateRepository(ctx, 1, vs, repo, "source", nil, nil, false, false)) - require.NoError(t, rs.IncrementGeneration(ctx, 1, "source", nil)) + require.NoError(t, rs.CreateRepository(ctx, id, vs, repo, "source", nil, nil, false, false)) + require.NoError(t, rs.IncrementGeneration(ctx, id, "source", nil)) - gen, err := rs.GetReplicatedGeneration(ctx, 1, "source", "target") + gen, err := rs.GetReplicatedGeneration(ctx, id, "source", "target") require.NoError(t, err) require.Equal(t, 1, gen) - require.NoError(t, rs.SetGeneration(ctx, 1, "target", 0)) - gen, err = rs.GetReplicatedGeneration(ctx, 1, "source", "target") + require.NoError(t, rs.SetGeneration(ctx, id, "target", 0)) + gen, err = rs.GetReplicatedGeneration(ctx, id, "source", "target") require.NoError(t, err) require.Equal(t, 1, gen) }) t.Run("downgrade prevented", func(t *testing.T) { rs, _ := newStore(t, nil) + id, err := rs.ReserveRepositoryID(ctx, vs, repo) + require.NoError(t, err) - require.NoError(t, rs.CreateRepository(ctx, 1, vs, repo, "target", nil, nil, false, false)) - require.NoError(t, rs.IncrementGeneration(ctx, 1, "target", nil)) + require.NoError(t, rs.CreateRepository(ctx, id, vs, repo, "target", nil, nil, false, false)) + require.NoError(t, rs.IncrementGeneration(ctx, id, "target", nil)) - _, err := rs.GetReplicatedGeneration(ctx, 1, "source", "target") + _, err = rs.GetReplicatedGeneration(ctx, id, "source", "target") require.Equal(t, DowngradeAttemptedError{"target", 1, GenerationUnknown}, err) - require.NoError(t, rs.SetGeneration(ctx, 1, "source", 1)) - _, err = rs.GetReplicatedGeneration(ctx, 1, "source", "target") + require.NoError(t, rs.SetGeneration(ctx, id, "source", 1)) + _, err = rs.GetReplicatedGeneration(ctx, id, "source", "target") require.Equal(t, DowngradeAttemptedError{"target", 1, 1}, err) - require.NoError(t, rs.SetGeneration(ctx, 1, "source", 0)) - _, err = rs.GetReplicatedGeneration(ctx, 1, "source", "target") + require.NoError(t, rs.SetGeneration(ctx, id, "source", 0)) + _, err = rs.GetReplicatedGeneration(ctx, id, "source", "target") require.Equal(t, DowngradeAttemptedError{"target", 1, 0}, err) }) }) @@ -657,26 +682,28 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) { } { t.Run(tc.desc, func(t *testing.T) { rs, requireState := newStore(t, nil) + id, err := rs.ReserveRepositoryID(ctx, vs, repo) + require.NoError(t, err) - require.NoError(t, rs.CreateRepository(ctx, 1, vs, repo, "primary", tc.updatedSecondaries, tc.outdatedSecondaries, tc.storePrimary, tc.storeAssignments)) + require.NoError(t, rs.CreateRepository(ctx, id, vs, repo, "primary", tc.updatedSecondaries, tc.outdatedSecondaries, tc.storePrimary, tc.storeAssignments)) expectedStorageState := storageState{ 
vs: { repo: { - "primary": {repositoryID: 1, generation: 0}, + "primary": {repositoryID: id, generation: 0}, }, }, } for _, updatedSecondary := range tc.updatedSecondaries { - expectedStorageState[vs][repo][updatedSecondary] = replicaRecord{repositoryID: 1, generation: 0} + expectedStorageState[vs][repo][updatedSecondary] = replicaRecord{repositoryID: id, generation: 0} } requireState(t, ctx, virtualStorageState{ vs: { repo: { - repositoryID: 1, + repositoryID: id, replicaPath: repo, primary: tc.expectedPrimary, assignments: tc.expectedAssignments, @@ -691,21 +718,25 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) { t.Run("conflict due to virtual storage and relative path", func(t *testing.T) { rs, _ := newStore(t, nil) + id, err := rs.ReserveRepositoryID(ctx, vs, repo) + require.NoError(t, err) - require.NoError(t, rs.CreateRepository(ctx, 1, vs, repo, stor, nil, nil, false, false)) + require.NoError(t, rs.CreateRepository(ctx, id, vs, repo, stor, nil, nil, false, false)) require.Equal(t, RepositoryExistsError{vs, repo, stor}, - rs.CreateRepository(ctx, 2, vs, repo, stor, nil, nil, false, false), + rs.CreateRepository(ctx, -1, vs, repo, stor, nil, nil, false, false), ) }) t.Run("conflict due to repository id", func(t *testing.T) { rs, _ := newStore(t, nil) + id, err := rs.ReserveRepositoryID(ctx, vs, repo) + require.NoError(t, err) - require.NoError(t, rs.CreateRepository(ctx, 1, "virtual-storage-1", "relative-path-1", "storage-1", nil, nil, false, false)) + require.NoError(t, rs.CreateRepository(ctx, id, "virtual-storage-1", "relative-path-1", "storage-1", nil, nil, false, false)) require.Equal(t, - fmt.Errorf("repository id 1 already in use"), - rs.CreateRepository(ctx, 1, "virtual-storage-2", "relative-path-2", "storage-2", nil, nil, false, false), + fmt.Errorf("repository id %d already in use", id), + rs.CreateRepository(ctx, id, "virtual-storage-2", "relative-path-2", "storage-2", nil, nil, false, false), ) }) }) @@ -722,34 +753,40 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) { t.Run("delete existing", func(t *testing.T) { rs, requireState := newStore(t, nil) + id1, err := rs.ReserveRepositoryID(ctx, "virtual-storage-1", "repository-1") + require.NoError(t, err) + id2, err := rs.ReserveRepositoryID(ctx, "virtual-storage-2", "repository-2") + require.NoError(t, err) + id3, err := rs.ReserveRepositoryID(ctx, "virtual-storage-3", "repository-3") + require.NoError(t, err) - require.NoError(t, rs.CreateRepository(ctx, 1, "virtual-storage-1", "repository-1", "storage-1", nil, nil, false, false)) - require.NoError(t, rs.CreateRepository(ctx, 2, "virtual-storage-2", "repository-1", "storage-1", []string{"storage-2"}, nil, false, false)) - require.NoError(t, rs.CreateRepository(ctx, 3, "virtual-storage-2", "repository-2", "storage-1", nil, nil, false, false)) + require.NoError(t, rs.CreateRepository(ctx, id1, "virtual-storage-1", "repository-1", "storage-1", nil, nil, false, false)) + require.NoError(t, rs.CreateRepository(ctx, id2, "virtual-storage-2", "repository-1", "storage-1", []string{"storage-2"}, nil, false, false)) + require.NoError(t, rs.CreateRepository(ctx, id3, "virtual-storage-2", "repository-2", "storage-1", nil, nil, false, false)) requireState(t, ctx, virtualStorageState{ "virtual-storage-1": { - "repository-1": {repositoryID: 1, replicaPath: "repository-1"}, + "repository-1": {repositoryID: id1, replicaPath: "repository-1"}, }, "virtual-storage-2": { - "repository-1": {repositoryID: 2, replicaPath: "repository-1"}, - 
"repository-2": {repositoryID: 3, replicaPath: "repository-2"}, + "repository-1": {repositoryID: id2, replicaPath: "repository-1"}, + "repository-2": {repositoryID: id3, replicaPath: "repository-2"}, }, }, storageState{ "virtual-storage-1": { "repository-1": { - "storage-1": {repositoryID: 1}, + "storage-1": {repositoryID: id1}, }, }, "virtual-storage-2": { "repository-1": { - "storage-1": {repositoryID: 2}, - "storage-2": {repositoryID: 2}, + "storage-1": {repositoryID: id2}, + "storage-2": {repositoryID: id2}, }, "repository-2": { - "storage-1": {repositoryID: 3}, + "storage-1": {repositoryID: id3}, }, }, }, @@ -763,21 +800,21 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) { requireState(t, ctx, virtualStorageState{ "virtual-storage-1": { - "repository-1": {repositoryID: 1, replicaPath: "repository-1"}, + "repository-1": {repositoryID: id1, replicaPath: "repository-1"}, }, "virtual-storage-2": { - "repository-2": {repositoryID: 3, replicaPath: "repository-2"}, + "repository-2": {repositoryID: id3, replicaPath: "repository-2"}, }, }, storageState{ "virtual-storage-1": { "repository-1": { - "storage-1": {repositoryID: 1}, + "storage-1": {repositoryID: id1}, }, }, "virtual-storage-2": { "repository-2": { - "storage-1": {repositoryID: 3}, + "storage-1": {repositoryID: id3}, }, }, }, @@ -793,33 +830,39 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) { }) t.Run("delete existing", func(t *testing.T) { - require.NoError(t, rs.CreateRepository(ctx, 1, "virtual-storage-1", "relative-path-1", "storage-1", []string{"storage-2"}, nil, false, false)) - require.NoError(t, rs.CreateRepository(ctx, 2, "virtual-storage-1", "relative-path-2", "storage-1", nil, nil, false, false)) - require.NoError(t, rs.CreateRepository(ctx, 3, "virtual-storage-2", "relative-path-1", "storage-1", nil, nil, false, false)) + id1, err := rs.ReserveRepositoryID(ctx, "virtual-storage-1", "relative-path-1") + require.NoError(t, err) + id2, err := rs.ReserveRepositoryID(ctx, "virtual-storage-2", "relative-path-2") + require.NoError(t, err) + id3, err := rs.ReserveRepositoryID(ctx, "virtual-storage-3", "relative-path-3") + require.NoError(t, err) + require.NoError(t, rs.CreateRepository(ctx, id1, "virtual-storage-1", "relative-path-1", "storage-1", []string{"storage-2"}, nil, false, false)) + require.NoError(t, rs.CreateRepository(ctx, id2, "virtual-storage-1", "relative-path-2", "storage-1", nil, nil, false, false)) + require.NoError(t, rs.CreateRepository(ctx, id3, "virtual-storage-2", "relative-path-1", "storage-1", nil, nil, false, false)) requireState(t, ctx, virtualStorageState{ "virtual-storage-1": { - "relative-path-1": {repositoryID: 1, replicaPath: "relative-path-1"}, - "relative-path-2": {repositoryID: 2, replicaPath: "relative-path-2"}, + "relative-path-1": {repositoryID: id1, replicaPath: "relative-path-1"}, + "relative-path-2": {repositoryID: id2, replicaPath: "relative-path-2"}, }, "virtual-storage-2": { - "relative-path-1": {repositoryID: 3, replicaPath: "relative-path-1"}, + "relative-path-1": {repositoryID: id3, replicaPath: "relative-path-1"}, }, }, storageState{ "virtual-storage-1": { "relative-path-1": { - "storage-1": {repositoryID: 1, generation: 0}, - "storage-2": {repositoryID: 1, generation: 0}, + "storage-1": {repositoryID: id1, generation: 0}, + "storage-2": {repositoryID: id1, generation: 0}, }, "relative-path-2": { - "storage-1": {repositoryID: 2, generation: 0}, + "storage-1": {repositoryID: id2, generation: 0}, }, }, "virtual-storage-2": { 
"relative-path-1": { - "storage-1": {repositoryID: 3, generation: 0}, + "storage-1": {repositoryID: id3, generation: 0}, }, }, }, @@ -830,25 +873,25 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) { requireState(t, ctx, virtualStorageState{ "virtual-storage-1": { - "relative-path-1": {repositoryID: 1, replicaPath: "relative-path-1"}, - "relative-path-2": {repositoryID: 2, replicaPath: "relative-path-2"}, + "relative-path-1": {repositoryID: id1, replicaPath: "relative-path-1"}, + "relative-path-2": {repositoryID: id2, replicaPath: "relative-path-2"}, }, "virtual-storage-2": { - "relative-path-1": {repositoryID: 3, replicaPath: "relative-path-1"}, + "relative-path-1": {repositoryID: id3, replicaPath: "relative-path-1"}, }, }, storageState{ "virtual-storage-1": { "relative-path-1": { - "storage-2": {repositoryID: 1, generation: 0}, + "storage-2": {repositoryID: id1, generation: 0}, }, "relative-path-2": { - "storage-1": {repositoryID: 2, generation: 0}, + "storage-1": {repositoryID: id2, generation: 0}, }, }, "virtual-storage-2": { "relative-path-1": { - "storage-1": {repositoryID: 3, generation: 0}, + "storage-1": {repositoryID: id3, generation: 0}, }, }, }, @@ -868,25 +911,28 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) { t.Run("rename existing", func(t *testing.T) { rs, requireState := newStore(t, nil) - - require.NoError(t, rs.CreateRepository(ctx, 1, vs, "renamed-all", "storage-1", nil, nil, false, false)) - require.NoError(t, rs.CreateRepository(ctx, 2, vs, "renamed-some", "storage-1", []string{"storage-2"}, nil, false, false)) + id1, err := rs.ReserveRepositoryID(ctx, vs, "renamed-all") + require.NoError(t, err) + id2, err := rs.ReserveRepositoryID(ctx, vs, "renamed-some") + require.NoError(t, err) + require.NoError(t, rs.CreateRepository(ctx, id1, vs, "renamed-all", "storage-1", nil, nil, false, false)) + require.NoError(t, rs.CreateRepository(ctx, id2, vs, "renamed-some", "storage-1", []string{"storage-2"}, nil, false, false)) requireState(t, ctx, virtualStorageState{ "virtual-storage-1": { - "renamed-all": {repositoryID: 1, replicaPath: "renamed-all"}, - "renamed-some": {repositoryID: 2, replicaPath: "renamed-some"}, + "renamed-all": {repositoryID: id1, replicaPath: "renamed-all"}, + "renamed-some": {repositoryID: id2, replicaPath: "renamed-some"}, }, }, storageState{ "virtual-storage-1": { "renamed-all": { - "storage-1": {repositoryID: 1, generation: 0}, + "storage-1": {repositoryID: id1, generation: 0}, }, "renamed-some": { - "storage-1": {repositoryID: 2, generation: 0}, - "storage-2": {repositoryID: 2, generation: 0}, + "storage-1": {repositoryID: id2, generation: 0}, + "storage-2": {repositoryID: id2, generation: 0}, }, }, }, @@ -898,20 +944,20 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) { requireState(t, ctx, virtualStorageState{ "virtual-storage-1": { - "renamed-all-new": {repositoryID: 1, replicaPath: "renamed-all-new"}, - "renamed-some-new": {repositoryID: 2, replicaPath: "renamed-some-new"}, + "renamed-all-new": {repositoryID: id1, replicaPath: "renamed-all-new"}, + "renamed-some-new": {repositoryID: id2, replicaPath: "renamed-some-new"}, }, }, storageState{ "virtual-storage-1": { "renamed-all-new": { - "storage-1": {repositoryID: 1, generation: 0}, + "storage-1": {repositoryID: id1, generation: 0}, }, "renamed-some-new": { - "storage-1": {repositoryID: 2, generation: 0}, + "storage-1": {repositoryID: id2, generation: 0}, }, "renamed-some": { - "storage-2": {repositoryID: 2, generation: 
0}, + "storage-2": {repositoryID: id2, generation: 0}, }, }, }, @@ -930,27 +976,28 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) { require.Empty(t, replicaPath) require.Empty(t, secondaries) - replicaPath, secondaries, err = rs.GetConsistentStoragesByRepositoryID(ctx, 1) + replicaPath, secondaries, err = rs.GetConsistentStoragesByRepositoryID(ctx, 1e10) require.Equal(t, commonerr.ErrRepositoryNotFound, err) require.Empty(t, replicaPath) require.Empty(t, secondaries) }) - - require.NoError(t, rs.CreateRepository(ctx, 1, vs, repo, "primary", []string{"consistent-secondary"}, nil, false, false)) - require.NoError(t, rs.IncrementGeneration(ctx, 1, "primary", []string{"consistent-secondary"})) - require.NoError(t, rs.SetGeneration(ctx, 1, "inconsistent-secondary", 0)) + id, err := rs.ReserveRepositoryID(ctx, vs, repo) + require.NoError(t, err) + require.NoError(t, rs.CreateRepository(ctx, id, vs, repo, "primary", []string{"consistent-secondary"}, nil, false, false)) + require.NoError(t, rs.IncrementGeneration(ctx, id, "primary", []string{"consistent-secondary"})) + require.NoError(t, rs.SetGeneration(ctx, id, "inconsistent-secondary", 0)) requireState(t, ctx, virtualStorageState{ "virtual-storage-1": { - "repository-1": {repositoryID: 1, replicaPath: "repository-1"}, + "repository-1": {repositoryID: id, replicaPath: "repository-1"}, }, }, storageState{ "virtual-storage-1": { "repository-1": { - "primary": {repositoryID: 1, generation: 1}, - "consistent-secondary": {repositoryID: 1, generation: 1}, - "inconsistent-secondary": {repositoryID: 1, generation: 0}, + "primary": {repositoryID: id, generation: 1}, + "consistent-secondary": {repositoryID: id, generation: 1}, + "inconsistent-secondary": {repositoryID: id, generation: 0}, }, }, }, @@ -962,13 +1009,13 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) { require.Equal(t, map[string]struct{}{"primary": {}, "consistent-secondary": {}}, secondaries) require.Equal(t, repo, replicaPath) - replicaPath, secondaries, err = rs.GetConsistentStoragesByRepositoryID(ctx, 1) + replicaPath, secondaries, err = rs.GetConsistentStoragesByRepositoryID(ctx, id) require.NoError(t, err) require.Equal(t, map[string]struct{}{"primary": {}, "consistent-secondary": {}}, secondaries) require.Equal(t, repo, replicaPath) }) - require.NoError(t, rs.SetGeneration(ctx, 1, "primary", 0)) + require.NoError(t, rs.SetGeneration(ctx, id, "primary", 0)) t.Run("outdated primary", func(t *testing.T) { replicaPath, secondaries, err := rs.GetConsistentStorages(ctx, vs, repo) @@ -976,28 +1023,28 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) { require.Equal(t, map[string]struct{}{"consistent-secondary": {}}, secondaries) require.Equal(t, repo, replicaPath) - replicaPath, secondaries, err = rs.GetConsistentStoragesByRepositoryID(ctx, 1) + replicaPath, secondaries, err = rs.GetConsistentStoragesByRepositoryID(ctx, id) require.NoError(t, err) require.Equal(t, map[string]struct{}{"consistent-secondary": {}}, secondaries) require.Equal(t, repo, replicaPath) }) t.Run("storage with highest generation is not configured", func(t *testing.T) { - require.NoError(t, rs.SetGeneration(ctx, 1, "unknown", 2)) - require.NoError(t, rs.SetGeneration(ctx, 1, "primary", 1)) + require.NoError(t, rs.SetGeneration(ctx, id, "unknown", 2)) + require.NoError(t, rs.SetGeneration(ctx, id, "primary", 1)) requireState(t, ctx, virtualStorageState{ "virtual-storage-1": { - "repository-1": {repositoryID: 1, replicaPath: "repository-1"}, 
+ "repository-1": {repositoryID: id, replicaPath: "repository-1"}, }, }, storageState{ "virtual-storage-1": { "repository-1": { - "unknown": {repositoryID: 1, generation: 2}, - "primary": {repositoryID: 1, generation: 1}, - "consistent-secondary": {repositoryID: 1, generation: 1}, - "inconsistent-secondary": {repositoryID: 1, generation: 0}, + "unknown": {repositoryID: id, generation: 2}, + "primary": {repositoryID: id, generation: 1}, + "consistent-secondary": {repositoryID: id, generation: 1}, + "inconsistent-secondary": {repositoryID: id, generation: 0}, }, }, }, @@ -1008,7 +1055,7 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) { require.Equal(t, map[string]struct{}{"unknown": {}}, secondaries) require.Equal(t, repo, replicaPath) - replicaPath, secondaries, err = rs.GetConsistentStoragesByRepositoryID(ctx, 1) + replicaPath, secondaries, err = rs.GetConsistentStoragesByRepositoryID(ctx, id) require.NoError(t, err) require.Equal(t, map[string]struct{}{"unknown": {}}, secondaries) require.Equal(t, repo, replicaPath) @@ -1024,7 +1071,7 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) { require.Empty(t, secondaries) require.Empty(t, replicaPath) - replicaPath, secondaries, err = rs.GetConsistentStoragesByRepositoryID(ctx, 1) + replicaPath, secondaries, err = rs.GetConsistentStoragesByRepositoryID(ctx, id) require.Equal(t, commonerr.ErrRepositoryNotFound, err) require.Empty(t, secondaries) require.Empty(t, replicaPath) @@ -1034,25 +1081,29 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) { t.Run("DeleteInvalidRepository", func(t *testing.T) { t.Run("only replica", func(t *testing.T) { rs, requireState := newStore(t, nil) - require.NoError(t, rs.CreateRepository(ctx, 1, vs, repo, "invalid-storage", nil, nil, false, false)) - require.NoError(t, rs.DeleteInvalidRepository(ctx, 1, "invalid-storage")) + id, err := rs.ReserveRepositoryID(ctx, vs, repo) + require.NoError(t, err) + require.NoError(t, rs.CreateRepository(ctx, id, vs, repo, "invalid-storage", nil, nil, false, false)) + require.NoError(t, rs.DeleteInvalidRepository(ctx, id, "invalid-storage")) requireState(t, ctx, virtualStorageState{}, storageState{}) }) t.Run("another replica", func(t *testing.T) { rs, requireState := newStore(t, nil) - require.NoError(t, rs.CreateRepository(ctx, 1, vs, repo, "invalid-storage", []string{"other-storage"}, nil, false, false)) - require.NoError(t, rs.DeleteInvalidRepository(ctx, 1, "invalid-storage")) + id, err := rs.ReserveRepositoryID(ctx, vs, repo) + require.NoError(t, err) + require.NoError(t, rs.CreateRepository(ctx, id, vs, repo, "invalid-storage", []string{"other-storage"}, nil, false, false)) + require.NoError(t, rs.DeleteInvalidRepository(ctx, id, "invalid-storage")) requireState(t, ctx, virtualStorageState{ "virtual-storage-1": { - "repository-1": {repositoryID: 1, replicaPath: "repository-1"}, + "repository-1": {repositoryID: id, replicaPath: "repository-1"}, }, }, storageState{ "virtual-storage-1": { "repository-1": { - "other-storage": {repositoryID: 1, generation: 0}, + "other-storage": {repositoryID: id, generation: 0}, }, }, }, @@ -1067,7 +1118,9 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) { require.NoError(t, err) require.False(t, exists) - require.NoError(t, rs.CreateRepository(ctx, 1, vs, repo, stor, nil, nil, false, false)) + id, err := rs.ReserveRepositoryID(ctx, vs, repo) + require.NoError(t, err) + require.NoError(t, rs.CreateRepository(ctx, id, vs, repo, stor, nil, nil, 
false, false)) exists, err = rs.RepositoryExists(ctx, vs, repo) require.NoError(t, err) require.True(t, exists) @@ -1082,23 +1135,22 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) { t.Run("ReserveRepositoryID", func(t *testing.T) { rs, _ := newStore(t, nil) - id, err := rs.ReserveRepositoryID(ctx, vs, repo) + idInitial, err := rs.ReserveRepositoryID(ctx, vs, repo) require.NoError(t, err) - require.Equal(t, int64(1), id) - id, err = rs.ReserveRepositoryID(ctx, vs, repo) + idNext, err := rs.ReserveRepositoryID(ctx, vs, repo) require.NoError(t, err) - require.Equal(t, int64(2), id) + require.NotEqual(t, idNext, idInitial) - require.NoError(t, rs.CreateRepository(ctx, id, vs, repo, stor, nil, nil, false, false)) + require.NoError(t, rs.CreateRepository(ctx, idNext, vs, repo, stor, nil, nil, false, false)) - id, err = rs.ReserveRepositoryID(ctx, vs, repo) + id, err := rs.ReserveRepositoryID(ctx, vs, repo) require.Equal(t, commonerr.ErrRepositoryAlreadyExists, err) require.Equal(t, int64(0), id) id, err = rs.ReserveRepositoryID(ctx, vs, repo+"-2") require.NoError(t, err) - require.Equal(t, int64(3), id) + require.NotEqual(t, idNext, id) }) t.Run("GetRepositoryID", func(t *testing.T) { @@ -1108,11 +1160,13 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) { require.Equal(t, commonerr.NewRepositoryNotFoundError(vs, repo), err) require.Equal(t, int64(0), id) - require.NoError(t, rs.CreateRepository(ctx, 1, vs, repo, stor, nil, nil, false, false)) + id, err = rs.ReserveRepositoryID(ctx, vs, repo) + require.NoError(t, err) + require.NoError(t, rs.CreateRepository(ctx, id, vs, repo, stor, nil, nil, false, false)) - id, err = rs.GetRepositoryID(ctx, vs, repo) + idReturned, err := rs.GetRepositoryID(ctx, vs, repo) require.Nil(t, err) - require.Equal(t, int64(1), id) + require.Equal(t, id, idReturned) }) } diff --git a/internal/praefect/datastore/storage_cleanup_test.go b/internal/praefect/datastore/storage_cleanup_test.go index 1beb61af19a..4ff18a00ae6 100644 --- a/internal/praefect/datastore/storage_cleanup_test.go +++ b/internal/praefect/datastore/storage_cleanup_test.go @@ -207,8 +207,21 @@ func TestStorageCleanup_Exists(t *testing.T) { db := glsql.NewDB(t) repoStore := NewPostgresRepositoryStore(db.DB, nil) - require.NoError(t, repoStore.CreateRepository(ctx, 0, "vs", "p/1", "g1", []string{"g2", "g3"}, nil, false, false)) - require.NoError(t, repoStore.CreateRepository(ctx, 1, "vs", "p/2", "g1", []string{"g2", "g3"}, nil, false, false)) + const ( + virtualStorage = "vs" + relativePath1 = "p/1" + relativePath2 = "p/2" + storage1 = "g1" + storage2 = "g2" + storage3 = "g3" + ) + id1, err := repoStore.ReserveRepositoryID(ctx, virtualStorage, relativePath1) + require.NoError(t, err) + id2, err := repoStore.ReserveRepositoryID(ctx, virtualStorage, relativePath2) + require.NoError(t, err) + + require.NoError(t, repoStore.CreateRepository(ctx, id1, virtualStorage, relativePath1, storage1, []string{storage2, storage3}, nil, false, false)) + require.NoError(t, repoStore.CreateRepository(ctx, id2, virtualStorage, relativePath2, storage1, []string{storage2, storage3}, nil, false, false)) storageCleanup := NewStorageCleanup(db.DB) for _, tc := range []struct { @@ -220,57 +233,57 @@ func TestStorageCleanup_Exists(t *testing.T) { }{ { desc: "multiple doesn't exist", - virtualStorage: "vs", - storage: "g1", - relativeReplicaPaths: []string{"p/1", "p/2", "path/x", "path/y"}, + virtualStorage: virtualStorage, + storage: storage1, + relativeReplicaPaths: 
[]string{relativePath1, relativePath2, "path/x", "path/y"}, out: []RepositoryClusterPath{ - {ClusterPath: ClusterPath{VirtualStorage: "vs", Storage: "g1"}, RelativeReplicaPath: "path/x"}, - {ClusterPath: ClusterPath{VirtualStorage: "vs", Storage: "g1"}, RelativeReplicaPath: "path/y"}, + {ClusterPath: ClusterPath{VirtualStorage: virtualStorage, Storage: storage1}, RelativeReplicaPath: "path/x"}, + {ClusterPath: ClusterPath{VirtualStorage: virtualStorage, Storage: storage1}, RelativeReplicaPath: "path/y"}, }, }, { desc: "duplicates", - virtualStorage: "vs", - storage: "g1", - relativeReplicaPaths: []string{"p/1", "path/x", "path/x"}, + virtualStorage: virtualStorage, + storage: storage1, + relativeReplicaPaths: []string{relativePath1, "path/x", "path/x"}, out: []RepositoryClusterPath{ - {ClusterPath: ClusterPath{VirtualStorage: "vs", Storage: "g1"}, RelativeReplicaPath: "path/x"}, + {ClusterPath: ClusterPath{VirtualStorage: virtualStorage, Storage: storage1}, RelativeReplicaPath: "path/x"}, }, }, { desc: "all exist", - virtualStorage: "vs", - storage: "g1", - relativeReplicaPaths: []string{"p/1", "p/2"}, + virtualStorage: virtualStorage, + storage: storage1, + relativeReplicaPaths: []string{relativePath1, relativePath2}, out: nil, }, { desc: "all doesn't exist", - virtualStorage: "vs", - storage: "g1", + virtualStorage: virtualStorage, + storage: storage1, relativeReplicaPaths: []string{"path/x", "path/y", "path/z"}, out: []RepositoryClusterPath{ - {ClusterPath: ClusterPath{VirtualStorage: "vs", Storage: "g1"}, RelativeReplicaPath: "path/x"}, - {ClusterPath: ClusterPath{VirtualStorage: "vs", Storage: "g1"}, RelativeReplicaPath: "path/y"}, - {ClusterPath: ClusterPath{VirtualStorage: "vs", Storage: "g1"}, RelativeReplicaPath: "path/z"}, + {ClusterPath: ClusterPath{VirtualStorage: virtualStorage, Storage: storage1}, RelativeReplicaPath: "path/x"}, + {ClusterPath: ClusterPath{VirtualStorage: virtualStorage, Storage: storage1}, RelativeReplicaPath: "path/y"}, + {ClusterPath: ClusterPath{VirtualStorage: virtualStorage, Storage: storage1}, RelativeReplicaPath: "path/z"}, }, }, { desc: "doesn't exist because of storage", - virtualStorage: "vs", + virtualStorage: virtualStorage, storage: "stub", relativeReplicaPaths: []string{"path/x"}, out: []RepositoryClusterPath{ - {ClusterPath: ClusterPath{VirtualStorage: "vs", Storage: "stub"}, RelativeReplicaPath: "path/x"}, + {ClusterPath: ClusterPath{VirtualStorage: virtualStorage, Storage: "stub"}, RelativeReplicaPath: "path/x"}, }, }, { desc: "doesn't exist because of virtual storage", virtualStorage: "stub", - storage: "g1", + storage: storage1, relativeReplicaPaths: []string{"path/x"}, out: []RepositoryClusterPath{ - {ClusterPath: ClusterPath{VirtualStorage: "stub", Storage: "g1"}, RelativeReplicaPath: "path/x"}, + {ClusterPath: ClusterPath{VirtualStorage: "stub", Storage: storage1}, RelativeReplicaPath: "path/x"}, }, }, } { diff --git a/internal/praefect/info_service_test.go b/internal/praefect/info_service_test.go index a492841245b..e45bfc73a7c 100644 --- a/internal/praefect/info_service_test.go +++ b/internal/praefect/info_service_test.go @@ -78,8 +78,10 @@ func TestInfoService_RepositoryReplicas(t *testing.T) { elector := nodes.NewPerRepositoryElector(tx) conns := nodeSet.Connections() rs := datastore.NewPostgresRepositoryStore(tx, conf.StorageNames()) + id, err := rs.ReserveRepositoryID(ctx, virtualStorage, testRepo.GetRelativePath()) + require.NoError(t, err) require.NoError(t, - rs.CreateRepository(ctx, 1, virtualStorage, 
testRepo.GetRelativePath(), "g-1", []string{"g-2", "g-3"}, nil, true, false), + rs.CreateRepository(ctx, id, virtualStorage, testRepo.GetRelativePath(), "g-1", []string{"g-2", "g-3"}, nil, true, false), ) cc, _, cleanup := runPraefectServer(t, ctx, conf, buildOptions{ diff --git a/internal/praefect/nodes/health_manager_test.go b/internal/praefect/nodes/health_manager_test.go index b5c44aafaef..a36bb67590a 100644 --- a/internal/praefect/nodes/health_manager_test.go +++ b/internal/praefect/nodes/health_manager_test.go @@ -142,7 +142,7 @@ func TestHealthManager(t *testing.T) { }, }, { - After: failoverTimeout, + After: failoverTimeout + time.Millisecond, PraefectName: "praefect-1", LocalStatus: LocalStatus{ "virtual-storage-1": { @@ -178,7 +178,7 @@ func TestHealthManager(t *testing.T) { HealthConsensus: map[string][]string{}, }, { - After: activePraefectTimeout, + After: activePraefectTimeout + time.Millisecond, PraefectName: "praefect-3", LocalStatus: LocalStatus{ "virtual-storage-1": { @@ -456,7 +456,7 @@ func TestHealthManager(t *testing.T) { }, }, { - After: failoverTimeout, + After: failoverTimeout + time.Millisecond, PraefectName: "praefect-1", LocalStatus: LocalStatus{ "virtual-storage-1": { diff --git a/internal/praefect/nodes/per_repository_test.go b/internal/praefect/nodes/per_repository_test.go index 184d31614e5..426cf384d8a 100644 --- a/internal/praefect/nodes/per_repository_test.go +++ b/internal/praefect/nodes/per_repository_test.go @@ -485,29 +485,32 @@ func TestPerRepositoryElector(t *testing.T) { }, } { t.Run(tc.desc, func(t *testing.T) { + var repositoryID int64 rs := datastore.NewPostgresRepositoryStore(db, nil) for virtualStorage, relativePaths := range tc.state { for relativePath, storages := range relativePaths { - repositoryID, err := rs.ReserveRepositoryID(ctx, virtualStorage, relativePath) + reservedRepositoryID, err := rs.ReserveRepositoryID(ctx, virtualStorage, relativePath) require.NoError(t, err) repoCreated := false for storage, record := range storages { if !repoCreated { repoCreated = true - require.NoError(t, rs.CreateRepository(ctx, repositoryID, virtualStorage, relativePath, storage, nil, nil, false, false)) + require.NoError(t, rs.CreateRepository(ctx, reservedRepositoryID, virtualStorage, relativePath, storage, nil, nil, false, false)) defer func(virtualStorage, relativePath string) { _, _, err = rs.DeleteRepository(ctx, virtualStorage, relativePath) require.NoError(t, err) }(virtualStorage, relativePath) + repositoryID = reservedRepositoryID } - require.NoError(t, rs.SetGeneration(ctx, repositoryID, storage, record.generation)) + require.NoError(t, rs.SetGeneration(ctx, reservedRepositoryID, storage, record.generation)) if record.assigned { _, err := db.ExecContext(ctx, ` - INSERT INTO repository_assignments VALUES ($1, $2, $3, $4) - `, virtualStorage, relativePath, storage, repositoryID) + INSERT INTO repository_assignments (virtual_storage, relative_path, storage, repository_id) + VALUES ($1, $2, $3, $4) + `, virtualStorage, relativePath, storage, reservedRepositoryID) require.NoError(t, err) } } @@ -534,7 +537,6 @@ func TestPerRepositoryElector(t *testing.T) { } previousPrimary := "" - const repositoryID int64 = 1 for _, step := range tc.steps { runElection := func(tx *glsql.TxWrapper) (string, *logrus.Entry) { diff --git a/internal/praefect/reconciler/reconciler_test.go b/internal/praefect/reconciler/reconciler_test.go index cf769462d53..3d54fb316d0 100644 --- a/internal/praefect/reconciler/reconciler_test.go +++ 
b/internal/praefect/reconciler/reconciler_test.go @@ -3,9 +3,7 @@ package reconciler import ( "context" "database/sql" - "errors" "fmt" - "sort" "testing" "github.com/lib/pq" @@ -32,9 +30,11 @@ func TestReconciler(t *testing.T) { type repositories map[string]map[string]map[string]storageRecord type existingJobs []datastore.ReplicationEvent - // Jobs is a set of jobs that the reconciliation run might produce. Only one of the job - // sets is expected from a run at a time. - type jobs [][]datastore.ReplicationJob + type jobs []datastore.ReplicationJob + type jobsCondition struct { + oneOf bool + jobs jobs + } type storages map[string][]string configuredStorages := storages{ @@ -86,12 +86,12 @@ func TestReconciler(t *testing.T) { db := glsql.NewDB(t) for _, tc := range []struct { - desc string - healthyStorages storages - repositories repositories - existingJobs existingJobs - deletedRepositories map[string][]string - reconciliationJobs jobs + desc string + healthyStorages storages + repositories repositories + existingJobs existingJobs + deletedRepositories map[string][]string + reconciliationJobCondition jobsCondition }{ { desc: "no repositories", @@ -126,8 +126,8 @@ func TestReconciler(t *testing.T) { }, }, }, - reconciliationJobs: jobs{ - { + reconciliationJobCondition: jobsCondition{ + jobs: jobs{ { Change: datastore.UpdateRepo, VirtualStorage: "virtual-storage-1", @@ -160,20 +160,22 @@ func TestReconciler(t *testing.T) { return repos }(), - reconciliationJobs: func() jobs { - var generated []datastore.ReplicationJob - for i := 0; i < 2*logBatchSize+1; i++ { - generated = append(generated, datastore.ReplicationJob{ - Change: datastore.UpdateRepo, - VirtualStorage: "virtual-storage-1", - RelativePath: fmt.Sprintf("relative-path-%d", i), - SourceNodeStorage: "storage-1", - TargetNodeStorage: "storage-2", - }) - } + reconciliationJobCondition: jobsCondition{ + jobs: func() jobs { + var generated jobs + for i := 0; i < 2*logBatchSize+1; i++ { + generated = append(generated, datastore.ReplicationJob{ + Change: datastore.UpdateRepo, + VirtualStorage: "virtual-storage-1", + RelativePath: fmt.Sprintf("relative-path-%d", i), + SourceNodeStorage: "storage-1", + TargetNodeStorage: "storage-2", + }) + } - return jobs{generated} - }(), + return generated + }(), + }, }, { desc: "no healthy source to reconcile from", @@ -203,8 +205,8 @@ func TestReconciler(t *testing.T) { }, }, }, - reconciliationJobs: jobs{ - { + reconciliationJobCondition: jobsCondition{ + jobs: jobs{ { Change: datastore.UpdateRepo, VirtualStorage: "virtual-storage-1", @@ -226,8 +228,8 @@ func TestReconciler(t *testing.T) { }, }, }, - reconciliationJobs: jobs{ - { + reconciliationJobCondition: jobsCondition{ + jobs: jobs{ { Change: datastore.UpdateRepo, VirtualStorage: "virtual-storage-1", @@ -261,8 +263,8 @@ func TestReconciler(t *testing.T) { }, }, }, - reconciliationJobs: jobs{ - { + reconciliationJobCondition: jobsCondition{ + jobs: jobs{ { Change: datastore.UpdateRepo, VirtualStorage: "virtual-storage-1", @@ -320,8 +322,8 @@ func TestReconciler(t *testing.T) { TargetNodeStorage: "storage-1", }, ), - reconciliationJobs: jobs{ - { + reconciliationJobCondition: jobsCondition{ + jobs: jobs{ { Change: datastore.UpdateRepo, VirtualStorage: "virtual-storage-1", @@ -357,8 +359,8 @@ func TestReconciler(t *testing.T) { TargetNodeStorage: "storage-2", }, ), - reconciliationJobs: jobs{ - { + reconciliationJobCondition: jobsCondition{ + jobs: jobs{ { Change: datastore.UpdateRepo, VirtualStorage: "virtual-storage-1", @@ -404,8 +406,8 @@ func 
TestReconciler(t *testing.T) { TargetNodeStorage: "storage-3", }, ), - reconciliationJobs: jobs{ - { + reconciliationJobCondition: jobsCondition{ + jobs: jobs{ { Change: datastore.UpdateRepo, VirtualStorage: "virtual-storage-1", @@ -442,8 +444,8 @@ func TestReconciler(t *testing.T) { }, }, }, - reconciliationJobs: jobs{ - { + reconciliationJobCondition: jobsCondition{ + jobs: jobs{ { Change: datastore.UpdateRepo, VirtualStorage: "virtual-storage-1", @@ -473,8 +475,8 @@ func TestReconciler(t *testing.T) { }, }, }, - reconciliationJobs: jobs{ - { + reconciliationJobCondition: jobsCondition{ + jobs: jobs{ { Change: datastore.UpdateRepo, VirtualStorage: "virtual-storage-1", @@ -504,8 +506,8 @@ func TestReconciler(t *testing.T) { }, }, }, - reconciliationJobs: jobs{ - { + reconciliationJobCondition: jobsCondition{ + jobs: jobs{ { Change: datastore.DeleteReplica, VirtualStorage: "virtual-storage-1", @@ -527,17 +529,15 @@ func TestReconciler(t *testing.T) { }, }, }, - reconciliationJobs: jobs{ - { + reconciliationJobCondition: jobsCondition{ + oneOf: true, + jobs: jobs{ { Change: datastore.DeleteReplica, VirtualStorage: "virtual-storage-1", RelativePath: "relative-path-1", TargetNodeStorage: "storage-2", - }, - }, - { - { + }, { Change: datastore.DeleteReplica, VirtualStorage: "virtual-storage-1", RelativePath: "relative-path-1", @@ -569,8 +569,8 @@ func TestReconciler(t *testing.T) { }, }, }, - reconciliationJobs: jobs{ - { + reconciliationJobCondition: jobsCondition{ + jobs: jobs{ { Change: datastore.DeleteReplica, VirtualStorage: "virtual-storage-1", @@ -591,8 +591,8 @@ func TestReconciler(t *testing.T) { }, }, }, - reconciliationJobs: jobs{ - { + reconciliationJobCondition: jobsCondition{ + jobs: jobs{ { Change: datastore.UpdateRepo, VirtualStorage: "virtual-storage-1", @@ -626,8 +626,8 @@ func TestReconciler(t *testing.T) { }, }, }, - reconciliationJobs: jobs{ - { + reconciliationJobCondition: jobsCondition{ + jobs: jobs{ { Change: datastore.UpdateRepo, VirtualStorage: "virtual-storage-1", @@ -823,8 +823,8 @@ func TestReconciler(t *testing.T) { }, }, }, - reconciliationJobs: jobs{ - { + reconciliationJobCondition: jobsCondition{ + jobs: jobs{ { Change: datastore.DeleteReplica, VirtualStorage: "virtual-storage-1", @@ -1011,8 +1011,8 @@ func TestReconciler(t *testing.T) { }, }, }, - reconciliationJobs: jobs{ - { + reconciliationJobCondition: jobsCondition{ + jobs: jobs{ { Change: datastore.DeleteReplica, VirtualStorage: "virtual-storage-1", @@ -1028,19 +1028,15 @@ func TestReconciler(t *testing.T) { defer cancel() db.Truncate(t, "replication_queue") - db.SequenceReset(t) // set up the repository generation records expected by the test case rs := datastore.NewPostgresRepositoryStore(db, configuredStorages) for virtualStorage, relativePaths := range tc.repositories { for relativePath, storages := range relativePaths { var repositoryID int64 - repoCreated := false for storage, repo := range storages { if repo.generation >= 0 { - if !repoCreated { - repoCreated = true - + if repositoryID == 0 { var err error repositoryID, err = rs.ReserveRepositoryID(ctx, virtualStorage, relativePath) require.NoError(t, err) @@ -1049,9 +1045,7 @@ func TestReconciler(t *testing.T) { defer func(virtualStorage, relativePath string) { _, _, err := rs.DeleteRepository(ctx, virtualStorage, relativePath) - if !errors.As(err, &commonerr.RepositoryNotFoundError{}) { - require.NoError(t, err) - } + require.NoError(t, err) }(virtualStorage, relativePath) } @@ -1061,10 +1055,12 @@ func TestReconciler(t *testing.T) { for 
storage, repo := range storages { if repo.assigned { - _, err := db.ExecContext(ctx, ` - INSERT INTO repository_assignments (repository_id, virtual_storage, relative_path, storage) - VALUES ($1, $2, $3, $4) - `, repositoryID, virtualStorage, relativePath, storage) + _, err := db.ExecContext( + ctx, + `INSERT INTO repository_assignments (repository_id, virtual_storage, relative_path, storage) + VALUES ($1, $2, $3, $4)`, + repositoryID, virtualStorage, relativePath, storage, + ) require.NoError(t, err) } } @@ -1131,15 +1127,13 @@ func TestReconciler(t *testing.T) { } // Fill the expected reconciliation jobs with generated repository IDs. - for _, jobs := range tc.reconciliationJobs { - for i, job := range jobs { - id, err := rs.GetRepositoryID(ctx, job.VirtualStorage, job.RelativePath) - if err != nil { - require.Equal(t, commonerr.NewRepositoryNotFoundError(job.VirtualStorage, job.RelativePath), err) - } - - jobs[i].RepositoryID = id + for i, job := range tc.reconciliationJobCondition.jobs { + id, err := rs.GetRepositoryID(ctx, job.VirtualStorage, job.RelativePath) + if err != nil { + require.Equal(t, commonerr.NewRepositoryNotFoundError(job.VirtualStorage, job.RelativePath), err) } + + tc.reconciliationJobCondition.jobs[i].RepositoryID = id } // run reconcile in two concurrent transactions to ensure everything works @@ -1171,7 +1165,7 @@ func TestReconciler(t *testing.T) { require.NoError(t, err) defer rows.Close() - actualJobs := make([]datastore.ReplicationJob, 0, len(tc.reconciliationJobs)) + actualJobs := make(jobs, 0, len(tc.reconciliationJobCondition.jobs)) for rows.Next() { var job datastore.ReplicationJob var meta datastore.Params @@ -1181,22 +1175,11 @@ func TestReconciler(t *testing.T) { } require.NoError(t, rows.Err()) - - expectedJobs := tc.reconciliationJobs - if expectedJobs == nil { - // If the test case defined there are no jobs to be produced, just set an empty slice so the - // require.Contains matches the empty set. - expectedJobs = jobs{{}} - } - - // Sort the jobs so the require.Contains works below, the order of the produced jobs is not important. 
- for _, jobs := range append(expectedJobs, actualJobs) { - sort.Slice(jobs, func(i, j int) bool { - return jobs[i].VirtualStorage+jobs[i].RelativePath < jobs[j].VirtualStorage+jobs[j].RelativePath - }) + if tc.reconciliationJobCondition.oneOf { + require.Subset(t, tc.reconciliationJobCondition.jobs, actualJobs) + } else { + require.ElementsMatch(t, tc.reconciliationJobCondition.jobs, actualJobs) } - - require.Contains(t, expectedJobs, actualJobs) }) } } diff --git a/internal/praefect/replicator_pg_test.go b/internal/praefect/replicator_pg_test.go index 7f79191a62a..7e1d3bc1e42 100644 --- a/internal/praefect/replicator_pg_test.go +++ b/internal/praefect/replicator_pg_test.go @@ -51,7 +51,9 @@ func TestReplicatorInvalidSourceRepository(t *testing.T) { rs := datastore.NewPostgresRepositoryStore(glsql.NewDB(t), nil) - require.NoError(t, rs.CreateRepository(ctx, 1, "virtual-storage-1", "relative-path-1", "gitaly-1", nil, nil, true, false)) + id, err := rs.ReserveRepositoryID(ctx, "virtual-storage-1", "relative-path-1") + require.NoError(t, err) + require.NoError(t, rs.CreateRepository(ctx, id, "virtual-storage-1", "relative-path-1", "gitaly-1", nil, nil, true, false)) exists, err := rs.RepositoryExists(ctx, "virtual-storage-1", "relative-path-1") require.NoError(t, err) @@ -60,7 +62,7 @@ func TestReplicatorInvalidSourceRepository(t *testing.T) { r := &defaultReplicator{rs: rs, log: testhelper.DiscardTestLogger(t)} require.NoError(t, r.Replicate(ctx, datastore.ReplicationEvent{ Job: datastore.ReplicationJob{ - RepositoryID: 1, + RepositoryID: id, VirtualStorage: "virtual-storage-1", RelativePath: "relative-path-1", SourceNodeStorage: "gitaly-1", @@ -92,7 +94,9 @@ func TestReplicatorDestroy(t *testing.T) { ctx, cancel := testhelper.Context() defer cancel() - require.NoError(t, rs.CreateRepository(ctx, 1, "virtual-storage-1", "relative-path-1", "storage-1", []string{"storage-2"}, nil, false, false)) + id, err := rs.ReserveRepositoryID(ctx, "virtual-storage-1", "relative-path-1") + require.NoError(t, err) + require.NoError(t, rs.CreateRepository(ctx, id, "virtual-storage-1", "relative-path-1", "storage-1", []string{"storage-2"}, nil, false, false)) ln, err := net.Listen("tcp", "localhost:0") require.NoError(t, err) diff --git a/internal/praefect/repocleaner/repository_test.go b/internal/praefect/repocleaner/repository_test.go index 4cbed8cae78..a78803b3af3 100644 --- a/internal/praefect/repocleaner/repository_test.go +++ b/internal/praefect/repocleaner/repository_test.go @@ -89,7 +89,7 @@ func TestRunner_Run(t *testing.T) { defer cancel() repoStore := datastore.NewPostgresRepositoryStore(db.DB, nil) - for i, set := range []struct { + for _, set := range []struct { relativePath string primary string secondaries []string @@ -110,7 +110,9 @@ func TestRunner_Run(t *testing.T) { secondaries: []string{storage2, storage3}, }, } { - require.NoError(t, repoStore.CreateRepository(ctx, int64(i), conf.VirtualStorages[0].Name, set.relativePath, set.primary, set.secondaries, nil, false, false)) + id, err := repoStore.ReserveRepositoryID(ctx, conf.VirtualStorages[0].Name, set.relativePath) + require.NoError(t, err) + require.NoError(t, repoStore.CreateRepository(ctx, id, conf.VirtualStorages[0].Name, set.relativePath, set.primary, set.secondaries, nil, false, false)) } logger, loggerHook := test.NewNullLogger() diff --git a/internal/praefect/router_per_repository_test.go b/internal/praefect/router_per_repository_test.go index dd2be1ecc09..c23abea2c7e 100644 --- a/internal/praefect/router_per_repository_test.go 
+++ b/internal/praefect/router_per_repository_test.go
@@ -445,6 +445,8 @@ func TestPerRepositoryRouter_RouteRepositoryCreation(t *testing.T) {
 				return actual.Secondaries[i].Storage < actual.Secondaries[j].Storage
 			})
 			sort.Strings(actual.ReplicationTargets)
+			// Hard-code it here as there is no way to predict the value of the sequence.
+			actual.RepositoryID = 1
 			require.Contains(t, expected, actual)
 		}
 	}
@@ -601,14 +603,15 @@ func TestPerRepositoryRouter_RouteRepositoryCreation(t *testing.T) {
 			ctx, cancel := testhelper.Context()
 			defer cancel()
 
-			db.SequenceReset(t)
 			tx := db.Begin(t)
 			defer tx.Rollback(t)
 
 			rs := datastore.NewPostgresRepositoryStore(tx, nil)
 			if tc.repositoryExists {
+				id, err := rs.ReserveRepositoryID(ctx, "virtual-storage-1", "relative-path")
+				require.NoError(t, err)
 				require.NoError(t,
-					rs.CreateRepository(ctx, 1, "virtual-storage-1", relativePath, "primary", nil, nil, true, true),
+					rs.CreateRepository(ctx, id, "virtual-storage-1", relativePath, "primary", nil, nil, true, true),
 				)
 			}
-- 
GitLab


From a6bb648081ee3d4d3a30f50642589f49e0db55c5 Mon Sep 17 00:00:00 2001
From: Pavlo Strokov
Date: Tue, 19 Oct 2021 18:17:20 +0300
Subject: [PATCH 08/10] sql: Remove SequenceReset as not needed

Because the database should no longer be cleaned up, we don't need
the SequenceReset method anymore.
---
 internal/praefect/datastore/glsql/testing.go | 22 --------------------
 1 file changed, 22 deletions(-)

diff --git a/internal/praefect/datastore/glsql/testing.go b/internal/praefect/datastore/glsql/testing.go
index e1d37ec91b7..4335850ea3b 100644
--- a/internal/praefect/datastore/glsql/testing.go
+++ b/internal/praefect/datastore/glsql/testing.go
@@ -115,28 +115,6 @@ func (db DB) Close() error {
 	return nil
 }
 
-// SequenceReset restarts all sequences in the database.
-func (db DB) SequenceReset(t *testing.T) {
-	t.Helper()
-	prov := &StringProvider{}
-	rows, err := db.DB.Query(
-		`SELECT S.relname
-		FROM pg_class AS S, pg_depend AS D, pg_class AS T, pg_attribute AS C
-		WHERE S.relkind = 'S'
-		AND S.oid = D.objid
-		AND D.refobjid = T.oid
-		AND D.refobjid = C.attrelid
-		AND D.refobjsubid = C.attnum`,
-	)
-	require.NoError(t, err)
-	require.NoError(t, ScanAll(rows, prov))
-
-	for _, seqName := range prov.Values() {
-		_, err := db.DB.Exec(`ALTER SEQUENCE ` + seqName + ` RESTART`)
-		require.NoError(t, err)
-	}
-}
-
 // NewDB returns a wrapper around the database connection pool.
 // Must be used only for testing.
 // The new database with empty relations will be created for each call of this function.
-- 
GitLab


From bc7df325ecc85da1712ee18717b4df6f87589c01 Mon Sep 17 00:00:00 2001
From: Pavlo Strokov
Date: Tue, 26 Oct 2021 09:46:12 +0300
Subject: [PATCH 09/10] test: Fix flaky TestStorageCleanup_AcquireNextStorage

The verification of the value filled in for TriggeredAt is flaky
because the timestamp returned from the database has lower precision
than the one used in the code. That is why the value of the start
variable is often greater than the one contained in the TriggeredAt
column. As this check is not mandatory, we drop it and only verify
that the value is set.
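
For illustration, a minimal Go sketch of the mismatch. It is a toy
model only: the 200ns delay is arbitrary, and the truncation stands in
for the database round-trip under the assumption that the column stores
microsecond precision while time.Now carries nanoseconds:

	package main

	import (
		"fmt"
		"time"
	)

	func main() {
		// The test captured this first, with nanosecond precision.
		start := time.Now().UTC()

		// The code under test assigns TriggeredAt a moment later...
		triggeredAt := start.Add(200 * time.Nanosecond)

		// ...but the database hands it back with only microsecond
		// precision, which can push it below start again.
		stored := triggeredAt.Truncate(time.Microsecond)

		// This mirrors the removed After(start) assertion: it is
		// false unless the 200ns happened to cross a microsecond
		// boundary, so the check fails most of the time.
		fmt.Println(stored.After(start))
	}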
---
 internal/praefect/datastore/storage_cleanup_test.go | 2 --
 1 file changed, 2 deletions(-)

diff --git a/internal/praefect/datastore/storage_cleanup_test.go b/internal/praefect/datastore/storage_cleanup_test.go
index 4ff18a00ae6..94f042ac475 100644
--- a/internal/praefect/datastore/storage_cleanup_test.go
+++ b/internal/praefect/datastore/storage_cleanup_test.go
@@ -172,7 +172,6 @@ func TestStorageCleanup_AcquireNextStorage(t *testing.T) {
 		storageCleanup := NewStorageCleanup(tx)
 
 		require.NoError(t, storageCleanup.Populate(ctx, "vs", "g1"))
-		start := time.Now().UTC()
 		_, release, err := storageCleanup.AcquireNextStorage(ctx, 0, 200*time.Millisecond)
 		require.NoError(t, err)
 
@@ -180,7 +179,6 @@
 		check1 := getAllStoragesCleanup(t, ctx, tx)
 		require.Len(t, check1, 1)
 		require.True(t, check1[0].TriggeredAt.Valid)
-		require.Truef(t, check1[0].TriggeredAt.Time.After(start), "%s is not after %s", check1[0].TriggeredAt, start)
 
 		// Check the goroutine running in the background updates triggered_at column periodically.
 		time.Sleep(time.Second)
-- 
GitLab


From b12983d8beb22486fb21594ab6622914e6a89013 Mon Sep 17 00:00:00 2001
From: Pavlo Strokov
Date: Tue, 26 Oct 2021 09:49:58 +0300
Subject: [PATCH 10/10] test: Remove unused TruncateAll

The TruncateAll method is not used anymore, and it should not be
used, because we don't want the data in the database to be dropped.
That is why it is removed now.
---
 internal/praefect/datastore/glsql/testing.go | 16 ----------------
 1 file changed, 16 deletions(-)

diff --git a/internal/praefect/datastore/glsql/testing.go b/internal/praefect/datastore/glsql/testing.go
index 4335850ea3b..2152b3ee186 100644
--- a/internal/praefect/datastore/glsql/testing.go
+++ b/internal/praefect/datastore/glsql/testing.go
@@ -85,22 +85,6 @@ func (db DB) RequireRowsInTable(t *testing.T, tname string, n int) {
 	require.Equal(t, n, count, "unexpected amount of rows in table: %d instead of %d", count, n)
 }
 
-// TruncateAll removes all data from known set of tables.
-func (db DB) TruncateAll(t testing.TB) {
-	db.Truncate(t,
-		"replication_queue_job_lock",
-		"replication_queue",
-		"replication_queue_lock",
-		"node_status",
-		"shard_primaries",
-		"storage_repositories",
-		"repositories",
-		"virtual_storages",
-		"repository_assignments",
-		"storage_cleanups",
-	)
-}
-
 // MustExec executes `q` with `args` and verifies there are no errors.
 func (db DB) MustExec(t testing.TB, q string, args ...interface{}) {
 	_, err := db.DB.Exec(q, args...)
-- 
GitLab
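
With TruncateAll gone, tests clean up by truncating only the tables
they actually write to, via the Truncate helper that remains on the
test DB wrapper (as the reconciler tests above already do). A minimal
sketch of that pattern; the test name and chosen tables are
illustrative only, and the import path is an assumption:

	package example_test

	import (
		"testing"

		"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql"
	)

	func TestExample(t *testing.T) {
		db := glsql.NewDB(t)

		// Truncate only the tables this test touches, instead of the
		// removed TruncateAll, which wiped every known table.
		db.Truncate(t, "replication_queue", "replication_queue_lock")
	}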