diff --git a/cmd/praefect/main.go b/cmd/praefect/main.go index a4692647c54c24860934922c0b72936736cf44de..ed94f6fe49cac5965287537892c84fb5bd10957c 100644 --- a/cmd/praefect/main.go +++ b/cmd/praefect/main.go @@ -361,6 +361,7 @@ func run(cfgs []starter.Config, conf config.Config) error { transactionManager, conf, protoregistry.GitalyProtoPreregistered, + healthChecker, ) repl = praefect.NewReplMgr( diff --git a/internal/praefect/auth_test.go b/internal/praefect/auth_test.go index 37d8b6e619511e896651edc5ad7d3778fbbbfe47..8642e259b12478dd7a5d4f941616251ee0e4a035 100644 --- a/internal/praefect/auth_test.go +++ b/internal/praefect/auth_test.go @@ -166,7 +166,7 @@ func runServer(t *testing.T, token string, required bool) (*grpc.Server, string, registry, err := protoregistry.New(fd) require.NoError(t, err) - coordinator := NewCoordinator(queue, nil, NewNodeManagerRouter(nodeMgr, nil), txMgr, conf, registry) + coordinator := NewCoordinator(queue, nil, NewNodeManagerRouter(nodeMgr, nil), txMgr, conf, registry, nodeMgr) srv := NewGRPCServer(conf, logEntry, registry, coordinator.StreamDirector, nodeMgr, txMgr, queue, nil, nil) diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go index 4cdf2d8c2e8d925de7d91d74cc16b46b3ed3da0e..d86f7ead537e0f5461fea628623544ae99f61452 100644 --- a/internal/praefect/coordinator.go +++ b/internal/praefect/coordinator.go @@ -215,6 +215,7 @@ type Coordinator struct { registry *protoregistry.Registry conf config.Config votersMetric *prometheus.HistogramVec + hc nodes.HealthChecker } // NewCoordinator returns a new Coordinator that utilizes the provided logger @@ -225,6 +226,7 @@ func NewCoordinator( txMgr *transactions.Manager, conf config.Config, r *protoregistry.Registry, + hc nodes.HealthChecker, ) *Coordinator { maxVoters := 1 for _, storage := range conf.VirtualStorages { @@ -248,6 +250,7 @@ func NewCoordinator( }, []string{"virtual_storage"}, ), + hc: hc, } return coordinator @@ -752,7 +755,17 @@ func (c 
*Coordinator) newRequestFinalizer( ctxlogrus.Extract(ctx).WithError(err).Info("deleted repository does not have a store entry") } case datastore.CreateRepo: - if err := c.rs.CreateRepository(ctx, virtualStorage, targetRepo.GetRelativePath(), primary); err != nil { + healthyNodes := c.hc.HealthyNodes() + + var virtualStorages, physicalStorages []string + for virtualStorage, nodes := range healthyNodes { + for _, node := range nodes { + virtualStorages = append(virtualStorages, virtualStorage) + physicalStorages = append(physicalStorages, node) + } + } + + if err := c.rs.CreateRepository(ctx, virtualStorage, targetRepo.GetRelativePath(), primary, virtualStorages, physicalStorages); err != nil { if !errors.Is(err, datastore.RepositoryExistsError{}) { return fmt.Errorf("create repository: %w", err) } diff --git a/internal/praefect/coordinator_pg_test.go b/internal/praefect/coordinator_pg_test.go index e00b7066000d118a8cee5de114cea67f210be4bb..af76c1ac57a01597ce49662ee43ac6bd419b9074 100644 --- a/internal/praefect/coordinator_pg_test.go +++ b/internal/praefect/coordinator_pg_test.go @@ -170,6 +170,7 @@ func TestStreamDirectorMutator_Transaction(t *testing.T) { txMgr, conf, protoregistry.GitalyProtoPreregistered, + nodeMgr, ) fullMethod := "/gitaly.SmartHTTPService/PostReceivePack" diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go index f4057e3c391b232d867465e08ff147cc4efa6f38..f5b549d4a8e864dcff20da8828fe35928e1212ae 100644 --- a/internal/praefect/coordinator_test.go +++ b/internal/praefect/coordinator_test.go @@ -98,6 +98,7 @@ func TestStreamDirectorReadOnlyEnforcement(t *testing.T) { transactions.NewManager(conf), conf, protoregistry.GitalyProtoPreregistered, + nil, ) frame, err := proto.Marshal(&gitalypb.CleanupRequest{Repository: &gitalypb.Repository{ @@ -168,6 +169,7 @@ func TestStreamDirectorMutator(t *testing.T) { txMgr, conf, protoregistry.GitalyProtoPreregistered, + nodeMgr, ) frame, err := 
proto.Marshal(&gitalypb.CreateObjectPoolRequest{ @@ -280,6 +282,7 @@ func TestStreamDirectorMutator_StopTransaction(t *testing.T) { txMgr, conf, protoregistry.GitalyProtoPreregistered, + nodeMgr, ) fullMethod := "/gitaly.SmartHTTPService/PostReceivePack" @@ -385,6 +388,7 @@ func TestStreamDirectorAccessor(t *testing.T) { txMgr, conf, protoregistry.GitalyProtoPreregistered, + nodeMgr, ) frame, err := proto.Marshal(&gitalypb.FindAllBranchesRequest{Repository: &targetRepo}) @@ -476,6 +480,7 @@ func TestCoordinatorStreamDirector_distributesReads(t *testing.T) { txMgr, conf, protoregistry.GitalyProtoPreregistered, + nodeMgr, ) t.Run("forwards accessor operations", func(t *testing.T) { @@ -638,7 +643,7 @@ func TestStreamDirector_repo_creation(t *testing.T) { var createRepositoryCalled int64 rs := datastore.MockRepositoryStore{ - CreateRepositoryFunc: func(ctx context.Context, virtualStorage, relativePath, storage string) error { + CreateRepositoryFunc: func(ctx context.Context, virtualStorage, relativePath, storage string, healthyVirtualStorages, healthyPhysicalStorages []string) error { atomic.AddInt64(&createRepositoryCalled, 1) assert.Equal(t, targetRepo.StorageName, virtualStorage) assert.Equal(t, targetRepo.RelativePath, relativePath) @@ -654,6 +659,7 @@ func TestStreamDirector_repo_creation(t *testing.T) { txMgr, conf, protoregistry.GitalyProtoPreregistered, + nodeMgr, ) frame, err := proto.Marshal(&gitalypb.CreateRepositoryRequest{ @@ -796,6 +802,7 @@ func TestAbsentCorrelationID(t *testing.T) { txMgr, conf, protoregistry.GitalyProtoPreregistered, + nodeMgr, ) frame, err := proto.Marshal(&gitalypb.CreateObjectPoolRequest{ @@ -918,6 +925,7 @@ func TestStreamDirectorStorageScope(t *testing.T) { nil, conf, protoregistry.GitalyProtoPreregistered, + nodeMgr, ) ctx, cancel := testhelper.Context() @@ -984,6 +992,7 @@ func TestStreamDirectorStorageScopeError(t *testing.T) { nil, config.Config{}, protoregistry.GitalyProtoPreregistered, + mgr, ) frame, err := 
proto.Marshal(&gitalypb.RemoveNamespaceRequest{StorageName: "", Name: "stub"}) @@ -1013,6 +1022,7 @@ func TestStreamDirectorStorageScopeError(t *testing.T) { nil, config.Config{}, protoregistry.GitalyProtoPreregistered, + mgr, ) frame, err := proto.Marshal(&gitalypb.RemoveNamespaceRequest{StorageName: "fake", Name: "stub"}) @@ -1043,6 +1053,7 @@ func TestStreamDirectorStorageScopeError(t *testing.T) { nil, config.Config{}, protoregistry.GitalyProtoPreregistered, + mgr, ) fullMethod := "/gitaly.NamespaceService/NamespaceExists" @@ -1074,6 +1085,7 @@ func TestStreamDirectorStorageScopeError(t *testing.T) { nil, config.Config{}, protoregistry.GitalyProtoPreregistered, + mgr, ) fullMethod := "/gitaly.NamespaceService/RemoveNamespace" diff --git a/internal/praefect/datastore/repository_store.go b/internal/praefect/datastore/repository_store.go index d3cf1e514d13d591e0a851b59d2c8764b491b22b..7efe0370849ed7ee848d87ca1af8444b5f278e3b 100644 --- a/internal/praefect/datastore/repository_store.go +++ b/internal/praefect/datastore/repository_store.go @@ -99,7 +99,7 @@ type RepositoryStore interface { GetReplicatedGeneration(ctx context.Context, virtualStorage, relativePath, source, target string) (int, error) // CreateRepository creates the repository for the virtual storage and the storage. Returns // RepositoryExistsError when trying to create a repository which already has a matching record. - CreateRepository(ctx context.Context, virtualStorage, relativePath, storage string) error + CreateRepository(ctx context.Context, virtualStorage, relativePath, storage string, healthyVirtualStorages, healthyPhysicalStorages []string) error // DeleteRepository deletes the repository from the virtual storage and the storage. Returns // RepositoryNotExistsError when trying to delete a repository which has no record in the virtual storage // or the storage. 
@@ -297,14 +297,33 @@ AND storage = ANY($3) //nolint:stylecheck //nolint:golint -func (rs *PostgresRepositoryStore) CreateRepository(ctx context.Context, virtualStorage, relativePath, storage string) error { +func (rs *PostgresRepositoryStore) CreateRepository(ctx context.Context, virtualStorage, relativePath, storage string, healthyVirtualStorages, healthyPhysicalStorages []string) error { const q = ` -WITH repo AS ( +WITH healthy_storages AS ( + SELECT unnest($4::text[]) AS virtual_storage, unnest($5::text[]) AS storage +), primaries AS ( + SELECT storage + FROM healthy_storages + LEFT JOIN storage_repositories USING (virtual_storage, storage) + WHERE virtual_storage = $1 + AND storage_repositories.relative_path = $2 + AND ( + -- If assignments exist for the repository, only the assigned storages can be elected as primary. + -- If no assignments exist, any healthy node can be elected as the primary. + SELECT COUNT(*) = 0 OR COUNT(*) FILTER (WHERE storage = storage_repositories.storage) = 1 + FROM repository_assignments + WHERE repository_assignments.virtual_storage = storage_repositories.virtual_storage + AND repository_assignments.relative_path = storage_repositories.relative_path + ) + ORDER BY generation DESC NULLS LAST, random() + LIMIT 1 +), repo AS ( INSERT INTO repositories ( virtual_storage, relative_path, - generation - ) VALUES ($1, $2, 0) + generation, + "primary" + ) VALUES ($1, $2, 0, (SELECT storage FROM primaries)) ) INSERT INTO storage_repositories ( virtual_storage, @@ -315,7 +334,7 @@ INSERT INTO storage_repositories ( VALUES ($1, $2, $3, 0) ` - _, err := rs.db.ExecContext(ctx, q, virtualStorage, relativePath, storage) + _, err := rs.db.ExecContext(ctx, q, virtualStorage, relativePath, storage, pq.StringArray(healthyVirtualStorages), pq.StringArray(healthyPhysicalStorages)) var pqerr *pq.Error if errors.As(err, &pqerr) && pqerr.Code.Name() == "unique_violation" { diff --git a/internal/praefect/datastore/repository_store_mock.go
b/internal/praefect/datastore/repository_store_mock.go index 5809be6eeda23a46789f097bea14a0d45e396a5b..6f805d5cd81ddff3fba95b551895acebaa200c0c 100644 --- a/internal/praefect/datastore/repository_store_mock.go +++ b/internal/praefect/datastore/repository_store_mock.go @@ -9,7 +9,7 @@ type MockRepositoryStore struct { IncrementGenerationFunc func(ctx context.Context, virtualStorage, relativePath, primary string, secondaries []string) error GetReplicatedGenerationFunc func(ctx context.Context, virtualStorage, relativePath, source, target string) (int, error) SetGenerationFunc func(ctx context.Context, virtualStorage, relativePath, storage string, generation int) error - CreateRepositoryFunc func(ctx context.Context, virtualStorage, relativePath, storage string) error + CreateRepositoryFunc func(ctx context.Context, virtualStorage, relativePath, storage string, healthyVirtualStorages, healthyPhysicalStorages []string) error DeleteRepositoryFunc func(ctx context.Context, virtualStorage, relativePath, storage string) error RenameRepositoryFunc func(ctx context.Context, virtualStorage, relativePath, storage, newRelativePath string) error GetConsistentStoragesFunc func(ctx context.Context, virtualStorage, relativePath string) (map[string]struct{}, error) @@ -52,12 +52,12 @@ func (m MockRepositoryStore) SetGeneration(ctx context.Context, virtualStorage, //nolint:stylecheck //nolint:golint -func (m MockRepositoryStore) CreateRepository(ctx context.Context, virtualStorage, relativePath, storage string) error { +func (m MockRepositoryStore) CreateRepository(ctx context.Context, virtualStorage, relativePath, storage string, healthyVirtualStorages, healthyPhysicalStorages []string) error { if m.CreateRepositoryFunc == nil { return nil } - return m.CreateRepositoryFunc(ctx, virtualStorage, relativePath, storage) + return m.CreateRepositoryFunc(ctx, virtualStorage, relativePath, storage, healthyVirtualStorages, healthyPhysicalStorages) } func (m MockRepositoryStore) 
DeleteRepository(ctx context.Context, virtualStorage, relativePath, storage string) error { diff --git a/internal/praefect/datastore/repository_store_test.go b/internal/praefect/datastore/repository_store_test.go index e8d9019d96b4abab8562eaf60ba5f06747b330b3..c453d6c1d4a3b57439dce1ec687fde38e11bb598 100644 --- a/internal/praefect/datastore/repository_store_test.go +++ b/internal/praefect/datastore/repository_store_test.go @@ -289,7 +289,7 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) { t.Run("create", func(t *testing.T) { rs, requireState := newStore(t, nil) - require.NoError(t, rs.CreateRepository(ctx, vs, repo, stor)) + require.NoError(t, rs.CreateRepository(ctx, vs, repo, stor, []string{vs}, []string{stor})) requireState(t, ctx, virtualStorageState{ @@ -310,10 +310,10 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) { t.Run("conflict", func(t *testing.T) { rs, _ := newStore(t, nil) - require.NoError(t, rs.CreateRepository(ctx, vs, repo, stor)) + require.NoError(t, rs.CreateRepository(ctx, vs, repo, stor, []string{vs}, []string{stor})) require.Equal(t, RepositoryExistsError{vs, repo, stor}, - rs.CreateRepository(ctx, vs, repo, stor), + rs.CreateRepository(ctx, vs, repo, stor, []string{vs}, []string{stor}), ) }) }) diff --git a/internal/praefect/helper_test.go b/internal/praefect/helper_test.go index d5ffebf93a13495b1cc7cd6dd27f83f5c73c7abf..32c7aa2c803cc91ab1c5fe4eceeb16507c92a206 100644 --- a/internal/praefect/helper_test.go +++ b/internal/praefect/helper_test.go @@ -229,6 +229,7 @@ func runPraefectServer(t testing.TB, conf config.Config, opt buildOptions) (*grp opt.withTxMgr, conf, opt.withAnnotations, + opt.withNodeMgr, ) // TODO: run a replmgr for EVERY virtual storage diff --git a/internal/praefect/replicator_test.go b/internal/praefect/replicator_test.go index 39bcb96eb66aa0fd021a2ab6db7755b72a92209f..d2d9ee2f89809cebb893577882b6818900d4bc24 100644 --- a/internal/praefect/replicator_test.go +++ 
b/internal/praefect/replicator_test.go @@ -360,6 +360,7 @@ func TestPropagateReplicationJob(t *testing.T) { txMgr, conf, protoregistry.GitalyProtoPreregistered, + nodeMgr, ) replmgr := NewReplMgr(logEntry, conf.VirtualStorageNames(), queue, rs, nodeMgr, NodeSetFromNodeManager(nodeMgr)) diff --git a/internal/praefect/server_factory_test.go b/internal/praefect/server_factory_test.go index f8ff2823a7872b448026b0c6d62a6d28f40766eb..9ed86e349ffe2cc5f20a63da427f2ffcce354206 100644 --- a/internal/praefect/server_factory_test.go +++ b/internal/praefect/server_factory_test.go @@ -96,6 +96,7 @@ func TestServerFactory(t *testing.T) { txMgr, conf, registry, + nodeMgr, ) checkOwnRegisteredServices := func(ctx context.Context, t *testing.T, cc *grpc.ClientConn) healthpb.HealthClient { diff --git a/internal/praefect/server_test.go b/internal/praefect/server_test.go index cad10cc6b2e9d46714b38415d967751764da1daa..ae42320697791348b7de12a77e6949020ee070fd 100644 --- a/internal/praefect/server_test.go +++ b/internal/praefect/server_test.go @@ -769,6 +769,7 @@ func TestProxyWrites(t *testing.T) { txMgr, conf, protoregistry.GitalyProtoPreregistered, + nodeMgr, ) server := grpc.NewServer( @@ -928,6 +929,7 @@ func TestErrorThreshold(t *testing.T) { nil, conf, registry, + nodeMgr, ) server := grpc.NewServer(