diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go index 0e691ccca544c4ab62cc559dd042fe598995872d..52a7826e62cd606af44a442a5b3bcbc5ec1e2141 100644 --- a/internal/praefect/coordinator.go +++ b/internal/praefect/coordinator.go @@ -475,21 +475,21 @@ func (c *Coordinator) mutatorStreamParameters(ctx context.Context, call grpcCall return nil, fmt.Errorf("mutator call: replication details: %w", err) } + var additionalRepoRelativePath string + if additionalRepo, ok, err := call.methodInfo.AdditionalRepo(call.msg); err != nil { + return nil, helper.ErrInvalidArgument(err) + } else if ok { + additionalRepoRelativePath = additionalRepo.GetRelativePath() + } + var route RepositoryMutatorRoute switch change { case datastore.CreateRepo: - route, err = c.router.RouteRepositoryCreation(ctx, virtualStorage, targetRepo.RelativePath) + route, err = c.router.RouteRepositoryCreation(ctx, virtualStorage, targetRepo.RelativePath, additionalRepoRelativePath) if err != nil { return nil, fmt.Errorf("route repository creation: %w", err) } default: - var additionalRepoRelativePath string - if additionalRepo, ok, err := call.methodInfo.AdditionalRepo(call.msg); err != nil { - return nil, helper.ErrInvalidArgument(err) - } else if ok { - additionalRepoRelativePath = additionalRepo.GetRelativePath() - } - route, err = c.router.RouteRepositoryMutator(ctx, virtualStorage, targetRepo.RelativePath, additionalRepoRelativePath) if err != nil { if errors.Is(err, ErrRepositoryReadOnly) { diff --git a/internal/praefect/router.go b/internal/praefect/router.go index 8c3ec9a5c663b98c12f00e8d3fc3973258b40b8e..1b6f19387d35b96761ce26a15907ec22bb45879e 100644 --- a/internal/praefect/router.go +++ b/internal/praefect/router.go @@ -65,5 +65,5 @@ type Router interface { RouteRepositoryMutator(ctx context.Context, virtualStorage, relativePath, additionalRepoRelativePath string) (RepositoryMutatorRoute, error) // RouteRepositoryCreation decides returns the primary and secondaries that should handle the repository creation // request. It is up to the caller to store the assignments and primary information after finishing the RPC. - RouteRepositoryCreation(ctx context.Context, virtualStorage, relativePath string) (RepositoryMutatorRoute, error) + RouteRepositoryCreation(ctx context.Context, virtualStorage, relativePath, additionalRepoRelativePath string) (RepositoryMutatorRoute, error) } diff --git a/internal/praefect/router_node_manager.go b/internal/praefect/router_node_manager.go index 7793c4d6d4f01dff3322f0bc602fa21239892980..9cceaea0aea0a4ea6f1ab313c3fa0acc80de83b0 100644 --- a/internal/praefect/router_node_manager.go +++ b/internal/praefect/router_node_manager.go @@ -125,7 +125,7 @@ func (r *nodeManagerRouter) RouteRepositoryMutator(ctx context.Context, virtualS // RouteRepositoryCreation includes healthy secondaries in the transaction and sets the unhealthy secondaries as // replication targets. The virtual storage's primary acts as the primary for every repository. -func (r *nodeManagerRouter) RouteRepositoryCreation(ctx context.Context, virtualStorage, relativePath string) (RepositoryMutatorRoute, error) { +func (r *nodeManagerRouter) RouteRepositoryCreation(ctx context.Context, virtualStorage, relativePath, additionalRepoRelativePath string) (RepositoryMutatorRoute, error) { shard, err := r.mgr.GetShard(ctx, virtualStorage) if err != nil { return RepositoryMutatorRoute{}, fmt.Errorf("get shard: %w", err) @@ -144,9 +144,10 @@ func (r *nodeManagerRouter) RouteRepositoryCreation(ctx context.Context, virtual } return RepositoryMutatorRoute{ - Primary: toRouterNode(shard.Primary), - ReplicaPath: relativePath, - Secondaries: secondaries, - ReplicationTargets: replicationTargets, + Primary: toRouterNode(shard.Primary), + ReplicaPath: relativePath, + AdditionalReplicaPath: additionalRepoRelativePath, + Secondaries: secondaries, + ReplicationTargets: replicationTargets, }, nil } diff --git a/internal/praefect/router_per_repository.go b/internal/praefect/router_per_repository.go index f6d3f89ed580a5b09283dce21d81dcb718c86723..bd3a37f666f2091e11d278dafc78ae4dd6b2e2fd 100644 --- a/internal/praefect/router_per_repository.go +++ b/internal/praefect/router_per_repository.go @@ -190,6 +190,19 @@ func (r *PerRepositoryRouter) RouteRepositoryAccessor(ctx context.Context, virtu }, nil } +func (r *PerRepositoryRouter) resolveAdditionalReplicaPath(ctx context.Context, virtualStorage, additionalRelativePath string) (string, error) { + if additionalRelativePath == "" { + return "", nil + } + + additionalRepositoryID, err := r.rs.GetRepositoryID(ctx, virtualStorage, additionalRelativePath) + if err != nil { + return "", fmt.Errorf("get additional repository id: %w", err) + } + + return r.rs.GetReplicaPath(ctx, additionalRepositoryID) +} + //nolint: revive,stylecheck // This is unintentionally missing documentation. func (r *PerRepositoryRouter) RouteRepositoryMutator(ctx context.Context, virtualStorage, relativePath, additionalRelativePath string) (RepositoryMutatorRoute, error) { healthyNodes, err := r.healthyNodes(virtualStorage) @@ -202,17 +215,9 @@ func (r *PerRepositoryRouter) RouteRepositoryMutator(ctx context.Context, virtua return RepositoryMutatorRoute{}, fmt.Errorf("get repository id: %w", err) } - var additionalReplicaPath string - if additionalRelativePath != "" { - additionalRepositoryID, err := r.rs.GetRepositoryID(ctx, virtualStorage, additionalRelativePath) - if err != nil { - return RepositoryMutatorRoute{}, fmt.Errorf("get additional repository id: %w", err) - } - - additionalReplicaPath, err = r.rs.GetReplicaPath(ctx, additionalRepositoryID) - if err != nil { - return RepositoryMutatorRoute{}, fmt.Errorf("get additional repository replica path: %w", err) - } + additionalReplicaPath, err := r.resolveAdditionalReplicaPath(ctx, virtualStorage, additionalRelativePath) + if err != nil { + return RepositoryMutatorRoute{}, fmt.Errorf("resolve additional replica path: %w", err) } primary, err := r.pg.GetPrimary(ctx, virtualStorage, repositoryID) @@ -280,7 +285,12 @@ func (r *PerRepositoryRouter) RouteRepositoryMutator(ctx context.Context, virtua // RouteRepositoryCreation picks a random healthy node to act as the primary node and selects the secondary nodes // if assignments are enabled. Healthy secondaries take part in the transaction, unhealthy secondaries are set as // replication targets. -func (r *PerRepositoryRouter) RouteRepositoryCreation(ctx context.Context, virtualStorage, relativePath string) (RepositoryMutatorRoute, error) { +func (r *PerRepositoryRouter) RouteRepositoryCreation(ctx context.Context, virtualStorage, relativePath, additionalRelativePath string) (RepositoryMutatorRoute, error) { + additionalReplicaPath, err := r.resolveAdditionalReplicaPath(ctx, virtualStorage, additionalRelativePath) + if err != nil { + return RepositoryMutatorRoute{}, fmt.Errorf("resolve additional replica path: %w", err) + } + healthyNodes, err := r.healthyNodes(virtualStorage) if err != nil { return RepositoryMutatorRoute{}, err @@ -348,10 +358,11 @@ func (r *PerRepositoryRouter) RouteRepositoryCreation(ctx context.Context, virtu } return RepositoryMutatorRoute{ - RepositoryID: id, - ReplicaPath: relativePath, - Primary: primary, - Secondaries: secondaries, - ReplicationTargets: replicationTargets, + RepositoryID: id, + ReplicaPath: relativePath, + AdditionalReplicaPath: additionalReplicaPath, + Primary: primary, + Secondaries: secondaries, + ReplicationTargets: replicationTargets, }, nil } diff --git a/internal/praefect/router_per_repository_test.go b/internal/praefect/router_per_repository_test.go index b98d941001891a51143aee6f9cb0f9967ae14309..dfc68552d11036f7795d9f9c9fb7ffa1de5a0ddf 100644 --- a/internal/praefect/router_per_repository_test.go +++ b/internal/praefect/router_per_repository_test.go @@ -473,19 +473,24 @@ func TestPerRepositoryRouter_RouteRepositoryCreation(t *testing.T) { db := testdb.New(t) - const relativePath = "relative-path" + const ( + relativePath = "relative-path" + additionalRelativePath = "additional-relative-path" + additionalReplicaPath = "additional-replica-path" + ) for _, tc := range []struct { - desc string - virtualStorage string - healthyNodes StaticHealthChecker - replicationFactor int - primaryCandidates int - primaryPick int - secondaryCandidates int - repositoryExists bool - matchRoute matcher - error error + desc string + virtualStorage string + healthyNodes StaticHealthChecker + replicationFactor int + primaryCandidates int + primaryPick int + secondaryCandidates int + repositoryExists bool + additionalRelativePath string + matchRoute matcher + error error }{ { desc: "no healthy nodes", @@ -499,17 +504,19 @@ func TestPerRepositoryRouter_RouteRepositoryCreation(t *testing.T) { error: nodes.ErrVirtualStorageNotExist, }, { - desc: "no healthy secondaries", - virtualStorage: "virtual-storage-1", - healthyNodes: StaticHealthChecker{"virtual-storage-1": {"primary"}}, - primaryCandidates: 1, - primaryPick: 0, + desc: "no healthy secondaries", + virtualStorage: "virtual-storage-1", + healthyNodes: StaticHealthChecker{"virtual-storage-1": {"primary"}}, + primaryCandidates: 1, + primaryPick: 0, + additionalRelativePath: additionalRelativePath, matchRoute: requireOneOf( RepositoryMutatorRoute{ - RepositoryID: 1, - ReplicaPath: relativePath, - Primary: RouterNode{Storage: "primary", Connection: primaryConn}, - ReplicationTargets: []string{"secondary-1", "secondary-2"}, + RepositoryID: 1, + ReplicaPath: relativePath, + AdditionalReplicaPath: additionalReplicaPath, + Primary: RouterNode{Storage: "primary", Connection: primaryConn}, + ReplicationTargets: []string{"secondary-1", "secondary-2"}, }, ), }, @@ -614,6 +621,18 @@ func TestPerRepositoryRouter_RouteRepositoryCreation(t *testing.T) { repositoryExists: true, error: fmt.Errorf("reserve repository id: %w", commonerr.ErrRepositoryAlreadyExists), }, + { + desc: "additional repository doesn't exist", + virtualStorage: "virtual-storage-1", + additionalRelativePath: "non-existent", + error: fmt.Errorf( + "resolve additional replica path: %w", + fmt.Errorf( + "get additional repository id: %w", + commonerr.NewRepositoryNotFoundError("virtual-storage-1", "non-existent"), + ), + ), + }, } { t.Run(tc.desc, func(t *testing.T) { ctx := testhelper.Context(t) @@ -627,6 +646,8 @@ func TestPerRepositoryRouter_RouteRepositoryCreation(t *testing.T) { ) } + require.NoError(t, rs.CreateRepository(ctx, 2, "virtual-storage-1", additionalRelativePath, additionalReplicaPath, "primary", nil, nil, true, true)) + route, err := NewPerRepositoryRouter( Connections{ "virtual-storage-1": { @@ -650,7 +671,7 @@ func TestPerRepositoryRouter_RouteRepositoryCreation(t *testing.T) { nil, rs, map[string]int{"virtual-storage-1": tc.replicationFactor}, - ).RouteRepositoryCreation(ctx, tc.virtualStorage, relativePath) + ).RouteRepositoryCreation(ctx, tc.virtualStorage, relativePath, tc.additionalRelativePath) if tc.error != nil { require.Equal(t, tc.error, err) return