diff --git a/internal/praefect/cache_key.go b/internal/praefect/cache_key.go new file mode 100644 index 0000000000000000000000000000000000000000..e7593fe4896a6d7fda98c7e8cf7ee1596264cf24 --- /dev/null +++ b/internal/praefect/cache_key.go @@ -0,0 +1,14 @@ +package praefect + +import ( + "crypto/sha256" + "encoding/hex" + "fmt" +) + +func getKey(call grpcCall, generation int) string { + h := sha256.New() + sprintf := fmt.Sprintf("%s-%d-%s", call.fullMethodName, generation, call.msg) + h.Write([]byte(sprintf)) + return hex.EncodeToString(h.Sum(nil)) +} diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go index 0e00c03c0f9aaad7ece9079b0f84752b29ae36ab..37da619d933b0e7af929b1ec7209ef0421efbdb6 100644 --- a/internal/praefect/coordinator.go +++ b/internal/praefect/coordinator.go @@ -320,11 +320,17 @@ func (c *Coordinator) accessorStreamParameters(ctx context.Context, call grpcCal metrics.ReadDistribution.WithLabelValues(virtualStorage, route.Node.Storage).Inc() + var key *string + if call.fullMethodName != "/gitaly.SmartHTTPService/PostUploadPackWithSidechannel" { + cacheKey := getKey(call, route.Generation) + key = &cacheKey + } + return proxy.NewStreamParameters(proxy.Destination{ Ctx: streamParametersContext(ctx), Conn: route.Node.Connection, Msg: b, - }, nil, nil, nil), nil + }, nil, nil, nil, key), nil } func (c *Coordinator) registerTransaction(ctx context.Context, primary RouterNode, secondaries []RouterNode) (transactions.Transaction, transactions.CancelFunc, error) { @@ -517,7 +523,7 @@ func (c *Coordinator) mutatorStreamParameters(ctx context.Context, call grpcCall } return firstErr } - return proxy.NewStreamParameters(primaryDest, secondaryDests, reqFinalizer, nil), nil + return proxy.NewStreamParameters(primaryDest, secondaryDests, reqFinalizer, nil, nil), nil } // maintenanceStreamParameters returns stream parameters for a maintenance-style RPC. 
The RPC call @@ -582,7 +588,7 @@ func (c *Coordinator) maintenanceStreamParameters(ctx context.Context, call grpc } return nil - }, nil), nil + }, nil, nil), nil } // streamParametersContexts converts the contexts with incoming metadata into a context that is @@ -734,7 +740,7 @@ func (c *Coordinator) accessorStorageStreamParameters(ctx context.Context, mi pr Msg: b, } - return proxy.NewStreamParameters(primaryDest, nil, func() error { return nil }, nil), nil + return proxy.NewStreamParameters(primaryDest, nil, func() error { return nil }, nil, nil), nil } func (c *Coordinator) mutatorStorageStreamParameters(ctx context.Context, mi protoregistry.MethodInfo, msg proto.Message, virtualStorage string) (*proxy.StreamParameters, error) { @@ -768,7 +774,7 @@ func (c *Coordinator) mutatorStorageStreamParameters(ctx context.Context, mi pro secondaryDests[i] = proxy.Destination{Ctx: ctx, Conn: secondary.Connection, Msg: b} } - return proxy.NewStreamParameters(primaryDest, secondaryDests, func() error { return nil }, nil), nil + return proxy.NewStreamParameters(primaryDest, secondaryDests, func() error { return nil }, nil, nil), nil } // rewrittenRepositoryMessage rewrites the repository storages and relative paths. 
diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go index 19486f40cd206feda5462e724c68281f9f0072f0..9a33189c95505b968061bbe0dc154aed4de68f15 100644 --- a/internal/praefect/coordinator_test.go +++ b/internal/praefect/coordinator_test.go @@ -95,11 +95,11 @@ func TestStreamDirectorReadOnlyEnforcement(t *testing.T) { ctx := testhelper.Context(t) rs := datastore.MockRepositoryStore{ - GetConsistentStoragesFunc: func(context.Context, string, string) (string, map[string]struct{}, error) { + GetConsistentStoragesFunc: func(context.Context, string, string) (string, map[string]struct{}, int, error) { if tc.readOnly { - return "", map[string]struct{}{storage + "-other": {}}, nil + return "", map[string]struct{}{storage + "-other": {}}, 0, nil } - return "", map[string]struct{}{storage: {}}, nil + return "", map[string]struct{}{storage: {}}, 0, nil }, } @@ -323,8 +323,8 @@ func TestStreamDirectorMutator_StopTransaction(t *testing.T) { } rs := datastore.MockRepositoryStore{ - GetConsistentStoragesFunc: func(ctx context.Context, virtualStorage, relativePath string) (string, map[string]struct{}, error) { - return relativePath, map[string]struct{}{"primary": {}, "secondary": {}}, nil + GetConsistentStoragesFunc: func(ctx context.Context, virtualStorage, relativePath string) (string, map[string]struct{}, int, error) { + return relativePath, map[string]struct{}{"primary": {}, "secondary": {}}, 0, nil }, } @@ -985,8 +985,8 @@ func TestCoordinatorStreamDirector_distributesReads(t *testing.T) { entry := testhelper.NewDiscardingLogEntry(t) repoStore := datastore.MockRepositoryStore{ - GetConsistentStoragesFunc: func(ctx context.Context, virtualStorage, relativePath string) (string, map[string]struct{}, error) { - return relativePath, map[string]struct{}{primaryNodeConf.Storage: {}, secondaryNodeConf.Storage: {}}, nil + GetConsistentStoragesFunc: func(ctx context.Context, virtualStorage, relativePath string) (string, map[string]struct{}, int, error) 
{ + return relativePath, map[string]struct{}{primaryNodeConf.Storage: {}, secondaryNodeConf.Storage: {}}, 0, nil }, } @@ -1968,8 +1968,8 @@ func TestCoordinator_grpcErrorHandling(t *testing.T) { GetReplicaPathFunc: func(ctx context.Context, repositoryID int64) (string, error) { return repoProto.GetRelativePath(), nil }, - GetConsistentStoragesFunc: func(ctx context.Context, virtualStorage, relativePath string) (string, map[string]struct{}, error) { - return relativePath, map[string]struct{}{"primary": {}, "secondary-1": {}, "secondary-2": {}}, nil + GetConsistentStoragesFunc: func(ctx context.Context, virtualStorage, relativePath string) (string, map[string]struct{}, int, error) { + return relativePath, map[string]struct{}{"primary": {}, "secondary-1": {}, "secondary-2": {}}, 0, nil }, }, }) diff --git a/internal/praefect/datastore/repository_store.go b/internal/praefect/datastore/repository_store.go index 03e7a2d084e644212ce8acf8081bb4ce0be444b0..11812255c35acb8f37abf12874df05b3988b8de4 100644 --- a/internal/praefect/datastore/repository_store.go +++ b/internal/praefect/datastore/repository_store.go @@ -122,7 +122,7 @@ type RepositoryStore interface { // RenameRepository which can be removed in a later release. RenameRepositoryInPlace(ctx context.Context, virtualStorage, relativePath, newRelativePath string) error // GetConsistentStoragesByRepositoryID returns the replica path and the set of up to date storages for the given repository keyed by repository ID. - GetConsistentStoragesByRepositoryID(ctx context.Context, repositoryID int64) (string, map[string]struct{}, error) + GetConsistentStoragesByRepositoryID(ctx context.Context, repositoryID int64) (string, map[string]struct{}, int, error) ConsistentStoragesGetter // RepositoryExists returns whether the repository exists on a virtual storage. 
RepositoryExists(ctx context.Context, virtualStorage, relativePath string) (bool, error) @@ -608,44 +608,44 @@ AND storage = $3 } // GetConsistentStoragesByRepositoryID returns the replica path and the set of up to date storages for the given repository keyed by repository ID. -func (rs *PostgresRepositoryStore) GetConsistentStoragesByRepositoryID(ctx context.Context, repositoryID int64) (string, map[string]struct{}, error) { +func (rs *PostgresRepositoryStore) GetConsistentStoragesByRepositoryID(ctx context.Context, repositoryID int64) (string, map[string]struct{}, int, error) { return rs.getConsistentStorages(ctx, ` -SELECT replica_path, ARRAY_AGG(storage) +SELECT replica_path, ARRAY_AGG(storage), generation FROM repositories JOIN storage_repositories USING (repository_id, relative_path, generation) WHERE repository_id = $1 -GROUP BY replica_path +GROUP BY replica_path, generation `, repositoryID) } // GetConsistentStorages returns the replica path and the set of up to date storages for the given repository keyed by virtual storage and relative path. 
-func (rs *PostgresRepositoryStore) GetConsistentStorages(ctx context.Context, virtualStorage, relativePath string) (string, map[string]struct{}, error) { - replicaPath, storages, err := rs.getConsistentStorages(ctx, ` -SELECT replica_path, ARRAY_AGG(storage) +func (rs *PostgresRepositoryStore) GetConsistentStorages(ctx context.Context, virtualStorage, relativePath string) (string, map[string]struct{}, int, error) { + replicaPath, storages, generation, err := rs.getConsistentStorages(ctx, ` +SELECT replica_path, ARRAY_AGG(storage), generation FROM repositories JOIN storage_repositories USING (repository_id, relative_path, generation) WHERE repositories.virtual_storage = $1 AND repositories.relative_path = $2 -GROUP BY replica_path +GROUP BY replica_path, generation `, virtualStorage, relativePath) if errors.Is(err, commonerr.ErrRepositoryNotFound) { - return "", nil, commonerr.NewRepositoryNotFoundError(virtualStorage, relativePath) + return "", nil, 0, commonerr.NewRepositoryNotFoundError(virtualStorage, relativePath) } - return replicaPath, storages, err + return replicaPath, storages, generation, err } // getConsistentStorages is a helper for querying the consistent storages by different keys. 
-func (rs *PostgresRepositoryStore) getConsistentStorages(ctx context.Context, query string, params ...interface{}) (string, map[string]struct{}, error) { +func (rs *PostgresRepositoryStore) getConsistentStorages(ctx context.Context, query string, params ...interface{}) (string, map[string]struct{}, int, error) { var replicaPath string var storages glsql.StringArray - - if err := rs.db.QueryRowContext(ctx, query, params...).Scan(&replicaPath, &storages); err != nil { + var generation int + if err := rs.db.QueryRowContext(ctx, query, params...).Scan(&replicaPath, &storages, &generation); err != nil { if errors.Is(err, sql.ErrNoRows) { - return "", nil, commonerr.ErrRepositoryNotFound + return "", nil, 0, commonerr.ErrRepositoryNotFound } - return "", nil, fmt.Errorf("query: %w", err) + return "", nil, 0, fmt.Errorf("query: %w", err) } result := storages.Slice() @@ -654,7 +654,7 @@ func (rs *PostgresRepositoryStore) getConsistentStorages(ctx context.Context, qu consistentStorages[storage] = struct{}{} } - return replicaPath, consistentStorages, nil + return replicaPath, consistentStorages, generation, nil } //nolint: revive,stylecheck // This is unintentionally missing documentation. 
diff --git a/internal/praefect/datastore/repository_store_bm_test.go b/internal/praefect/datastore/repository_store_bm_test.go index 3f982e4176e300b1d5274975941e3bc2e40dcf1e..deb04cf4667d09bffcc38aeccc93a09c13ce256d 100644 --- a/internal/praefect/datastore/repository_store_bm_test.go +++ b/internal/praefect/datastore/repository_store_bm_test.go @@ -66,7 +66,7 @@ func benchmarkGetConsistentStorages(b *testing.B, nstorages, nrepositories int) require.NoError(b, err) b.StartTimer() - _, _, err = repoStore.GetConsistentStorages(ctx, "vs", "/path/repo/"+strconv.Itoa(nrepositories/2)) + _, _, _, err = repoStore.GetConsistentStorages(ctx, "vs", "/path/repo/"+strconv.Itoa(nrepositories/2)) b.StopTimer() require.NoError(b, err) diff --git a/internal/praefect/datastore/repository_store_mock.go b/internal/praefect/datastore/repository_store_mock.go index 42acafb6ecde19d9a5c93313f1a1007d9639d06a..23148f81925ea3f28d7fca84d8f5c5e411491767 100644 --- a/internal/praefect/datastore/repository_store_mock.go +++ b/internal/praefect/datastore/repository_store_mock.go @@ -16,8 +16,8 @@ type MockRepositoryStore struct { DeleteReplicaFunc func(ctx context.Context, repositoryID int64, storage string) error RenameRepositoryInPlaceFunc func(ctx context.Context, virtualStorage, relativePath, newRelativePath string) error RenameRepositoryFunc func(ctx context.Context, virtualStorage, relativePath, storage, newRelativePath string) error - GetConsistentStoragesByRepositoryIDFunc func(ctx context.Context, repositoryID int64) (string, map[string]struct{}, error) - GetConsistentStoragesFunc func(ctx context.Context, virtualStorage, relativePath string) (string, map[string]struct{}, error) + GetConsistentStoragesByRepositoryIDFunc func(ctx context.Context, repositoryID int64) (string, map[string]struct{}, int, error) + GetConsistentStoragesFunc func(ctx context.Context, virtualStorage, relativePath string) (string, map[string]struct{}, int, error) GetPartiallyAvailableRepositoriesFunc func(ctx 
context.Context, virtualStorage string) ([]RepositoryMetadata, error) DeleteInvalidRepositoryFunc func(ctx context.Context, repositoryID int64, storage string) error RepositoryExistsFunc func(ctx context.Context, virtualStorage, relativePath string) (bool, error) @@ -115,18 +115,18 @@ func (m MockRepositoryStore) RenameRepository(ctx context.Context, virtualStorag } // GetConsistentStoragesByRepositoryID returns result of execution of the GetConsistentStoragesByRepositoryIDFunc field if it is set or an empty map. -func (m MockRepositoryStore) GetConsistentStoragesByRepositoryID(ctx context.Context, repositoryID int64) (string, map[string]struct{}, error) { +func (m MockRepositoryStore) GetConsistentStoragesByRepositoryID(ctx context.Context, repositoryID int64) (string, map[string]struct{}, int, error) { if m.GetConsistentStoragesFunc == nil { - return "", map[string]struct{}{}, nil + return "", map[string]struct{}{}, 0, nil } return m.GetConsistentStoragesByRepositoryIDFunc(ctx, repositoryID) } // GetConsistentStorages returns result of execution of the GetConsistentStoragesFunc field if it is set or an empty map. 
-func (m MockRepositoryStore) GetConsistentStorages(ctx context.Context, virtualStorage, relativePath string) (string, map[string]struct{}, error) { +func (m MockRepositoryStore) GetConsistentStorages(ctx context.Context, virtualStorage, relativePath string) (string, map[string]struct{}, int, error) { if m.GetConsistentStoragesFunc == nil { - return "", map[string]struct{}{}, nil + return "", map[string]struct{}{}, 0, nil } return m.GetConsistentStoragesFunc(ctx, virtualStorage, relativePath) diff --git a/internal/praefect/datastore/repository_store_test.go b/internal/praefect/datastore/repository_store_test.go index 3b6ba6709dc4ae89d7386436e031f376855f92e5..21a7ef343f20f252ab19cfc9f4ec825610822559 100644 --- a/internal/praefect/datastore/repository_store_test.go +++ b/internal/praefect/datastore/repository_store_test.go @@ -880,12 +880,12 @@ func TestRepositoryStore_Postgres(t *testing.T) { }) t.Run("no records", func(t *testing.T) { - replicaPath, secondaries, err := rs.GetConsistentStorages(ctx, vs, repo) + replicaPath, secondaries, _, err := rs.GetConsistentStorages(ctx, vs, repo) require.Equal(t, commonerr.NewRepositoryNotFoundError(vs, repo), err) require.Empty(t, replicaPath) require.Empty(t, secondaries) - replicaPath, secondaries, err = rs.GetConsistentStoragesByRepositoryID(ctx, 1) + replicaPath, secondaries, _, err = rs.GetConsistentStoragesByRepositoryID(ctx, 1) require.Equal(t, commonerr.ErrRepositoryNotFound, err) require.Empty(t, replicaPath) require.Empty(t, secondaries) @@ -912,12 +912,12 @@ func TestRepositoryStore_Postgres(t *testing.T) { ) t.Run("consistent secondary", func(t *testing.T) { - replicaPath, secondaries, err := rs.GetConsistentStorages(ctx, vs, repo) + replicaPath, secondaries, _, err := rs.GetConsistentStorages(ctx, vs, repo) require.NoError(t, err) require.Equal(t, map[string]struct{}{"primary": {}, "consistent-secondary": {}}, secondaries) require.Equal(t, "replica-path", replicaPath) - replicaPath, secondaries, err = 
rs.GetConsistentStoragesByRepositoryID(ctx, 1) + replicaPath, secondaries, _, err = rs.GetConsistentStoragesByRepositoryID(ctx, 1) require.NoError(t, err) require.Equal(t, map[string]struct{}{"primary": {}, "consistent-secondary": {}}, secondaries) require.Equal(t, "replica-path", replicaPath) @@ -926,12 +926,12 @@ func TestRepositoryStore_Postgres(t *testing.T) { require.NoError(t, rs.SetGeneration(ctx, 1, "primary", repo, 0)) t.Run("outdated primary", func(t *testing.T) { - replicaPath, secondaries, err := rs.GetConsistentStorages(ctx, vs, repo) + replicaPath, secondaries, _, err := rs.GetConsistentStorages(ctx, vs, repo) require.NoError(t, err) require.Equal(t, map[string]struct{}{"consistent-secondary": {}}, secondaries) require.Equal(t, "replica-path", replicaPath) - replicaPath, secondaries, err = rs.GetConsistentStoragesByRepositoryID(ctx, 1) + replicaPath, secondaries, _, err = rs.GetConsistentStoragesByRepositoryID(ctx, 1) require.NoError(t, err) require.Equal(t, map[string]struct{}{"consistent-secondary": {}}, secondaries) require.Equal(t, "replica-path", replicaPath) @@ -958,12 +958,12 @@ func TestRepositoryStore_Postgres(t *testing.T) { }, ) - replicaPath, secondaries, err := rs.GetConsistentStorages(ctx, vs, repo) + replicaPath, secondaries, _, err := rs.GetConsistentStorages(ctx, vs, repo) require.NoError(t, err) require.Equal(t, map[string]struct{}{"unknown": {}}, secondaries) require.Equal(t, "replica-path", replicaPath) - replicaPath, secondaries, err = rs.GetConsistentStoragesByRepositoryID(ctx, 1) + replicaPath, secondaries, _, err = rs.GetConsistentStoragesByRepositoryID(ctx, 1) require.NoError(t, err) require.Equal(t, map[string]struct{}{"unknown": {}}, secondaries) require.Equal(t, "replica-path", replicaPath) @@ -974,12 +974,12 @@ func TestRepositoryStore_Postgres(t *testing.T) { require.NoError(t, err) requireState(t, ctx, db, virtualStorageState{}, storageState{}) - replicaPath, secondaries, err := rs.GetConsistentStorages(ctx, vs, repo) + 
replicaPath, secondaries, _, err := rs.GetConsistentStorages(ctx, vs, repo) require.Equal(t, commonerr.NewRepositoryNotFoundError(vs, repo), err) require.Empty(t, secondaries) require.Empty(t, replicaPath) - replicaPath, secondaries, err = rs.GetConsistentStoragesByRepositoryID(ctx, 1) + replicaPath, secondaries, _, err = rs.GetConsistentStoragesByRepositoryID(ctx, 1) require.Equal(t, commonerr.ErrRepositoryNotFound, err) require.Empty(t, secondaries) require.Empty(t, replicaPath) @@ -989,31 +989,31 @@ func TestRepositoryStore_Postgres(t *testing.T) { rs := newRepositoryStore(t, nil) require.NoError(t, rs.CreateRepository(ctx, 1, vs, "original-path", "replica-path", "storage-1", []string{"storage-2"}, nil, true, false)) - replicaPath, storages, err := rs.GetConsistentStorages(ctx, vs, "original-path") + replicaPath, storages, _, err := rs.GetConsistentStorages(ctx, vs, "original-path") require.NoError(t, err) require.Equal(t, "replica-path", replicaPath) require.Equal(t, map[string]struct{}{"storage-1": {}, "storage-2": {}}, storages) - replicaPath, storages, err = rs.GetConsistentStoragesByRepositoryID(ctx, 1) + replicaPath, storages, _, err = rs.GetConsistentStoragesByRepositoryID(ctx, 1) require.NoError(t, err) require.Equal(t, "replica-path", replicaPath) require.Equal(t, map[string]struct{}{"storage-1": {}, "storage-2": {}}, storages) require.NoError(t, rs.RenameRepository(ctx, vs, "original-path", "storage-1", "new-path")) - replicaPath, storages, err = rs.GetConsistentStorages(ctx, vs, "new-path") + replicaPath, storages, _, err = rs.GetConsistentStorages(ctx, vs, "new-path") require.NoError(t, err) require.Equal(t, "new-path", replicaPath) require.Equal(t, map[string]struct{}{"storage-1": {}}, storages) - replicaPath, storages, err = rs.GetConsistentStoragesByRepositoryID(ctx, 1) + replicaPath, storages, _, err = rs.GetConsistentStoragesByRepositoryID(ctx, 1) require.NoError(t, err) require.Equal(t, "new-path", replicaPath) require.Equal(t, 
map[string]struct{}{"storage-1": {}}, storages) require.NoError(t, rs.RenameRepository(ctx, vs, "original-path", "storage-2", "new-path")) - replicaPath, storages, err = rs.GetConsistentStorages(ctx, vs, "new-path") + replicaPath, storages, _, err = rs.GetConsistentStorages(ctx, vs, "new-path") require.NoError(t, err) require.Equal(t, "new-path", replicaPath) require.Equal(t, map[string]struct{}{"storage-1": {}, "storage-2": {}}, storages) - replicaPath, storages, err = rs.GetConsistentStoragesByRepositoryID(ctx, 1) + replicaPath, storages, _, err = rs.GetConsistentStoragesByRepositoryID(ctx, 1) require.NoError(t, err) require.Equal(t, "new-path", replicaPath) require.Equal(t, map[string]struct{}{"storage-1": {}, "storage-2": {}}, storages) diff --git a/internal/praefect/datastore/storage_provider.go b/internal/praefect/datastore/storage_provider.go index 1d0a3fb2869d8882999a399d032f6a6fefee7933..648e90afeb4b53c8d19248a0fb7c3e8c3f21d738 100644 --- a/internal/praefect/datastore/storage_provider.go +++ b/internal/praefect/datastore/storage_provider.go @@ -17,7 +17,7 @@ import ( // ConsistentStoragesGetter returns storages which contain the latest generation of a repository. type ConsistentStoragesGetter interface { // GetConsistentStorages returns the replica path and the set of up to date storages for the given repository keyed by virtual storage and relative path. - GetConsistentStorages(ctx context.Context, virtualStorage, relativePath string) (string, map[string]struct{}, error) + GetConsistentStorages(ctx context.Context, virtualStorage, relativePath string) (string, map[string]struct{}, int, error) } // errNotExistingVirtualStorage indicates that the requested virtual storage can't be found or not configured. @@ -133,7 +133,7 @@ func (c *CachingConsistentStoragesGetter) getCache(virtualStorage string) (*lru. 
return val, found } -func (c *CachingConsistentStoragesGetter) cacheMiss(ctx context.Context, virtualStorage, relativePath string) (string, map[string]struct{}, error) { +func (c *CachingConsistentStoragesGetter) cacheMiss(ctx context.Context, virtualStorage, relativePath string) (string, map[string]struct{}, int, error) { c.cacheAccessTotal.WithLabelValues(virtualStorage, "miss").Inc() return c.csg.GetConsistentStorages(ctx, virtualStorage, relativePath) } @@ -165,7 +165,7 @@ func (c *CachingConsistentStoragesGetter) isCacheEnabled() bool { } // GetConsistentStorages returns the replica path and the set of up to date storages for the given repository keyed by virtual storage and relative path. -func (c *CachingConsistentStoragesGetter) GetConsistentStorages(ctx context.Context, virtualStorage, relativePath string) (string, map[string]struct{}, error) { +func (c *CachingConsistentStoragesGetter) GetConsistentStorages(ctx context.Context, virtualStorage, relativePath string) (string, map[string]struct{}, int, error) { var cache *lru.Cache if c.isCacheEnabled() { @@ -177,20 +177,21 @@ func (c *CachingConsistentStoragesGetter) GetConsistentStorages(ctx context.Cont defer populationDone() if ok { c.cacheAccessTotal.WithLabelValues(virtualStorage, "hit").Inc() - return value.replicaPath, value.storages, nil + return value.replicaPath, value.storages, value.generation, nil } } - replicaPath, storages, err := c.cacheMiss(ctx, virtualStorage, relativePath) + replicaPath, storages, generation, err := c.cacheMiss(ctx, virtualStorage, relativePath) if err == nil && cache != nil { - cache.Add(relativePath, cacheValue{replicaPath: replicaPath, storages: storages}) + cache.Add(relativePath, cacheValue{replicaPath: replicaPath, storages: storages, generation: generation}) c.cacheAccessTotal.WithLabelValues(virtualStorage, "populate").Inc() } - return replicaPath, storages, err + return replicaPath, storages, generation, err } type cacheValue struct { replicaPath string + 
generation int storages map[string]struct{} } diff --git a/internal/praefect/datastore/storage_provider_test.go b/internal/praefect/datastore/storage_provider_test.go index 399ecc48af7e0b65d6b50679aaeaefc3c39195b7..b6a21bb4c6852024f10c3ad896720c1c6d78fb66 100644 --- a/internal/praefect/datastore/storage_provider_test.go +++ b/internal/praefect/datastore/storage_provider_test.go @@ -37,7 +37,7 @@ func TestCachingStorageProvider_GetSyncedNodes(t *testing.T) { cache.Connected() // empty cache should be populated - replicaPath, storages, err := cache.GetConsistentStorages(ctx, "unknown", "/repo/path") + replicaPath, storages, _, err := cache.GetConsistentStorages(ctx, "unknown", "/repo/path") require.NoError(t, err) require.Equal(t, map[string]struct{}{"g1": {}, "g2": {}, "g3": {}}, storages) require.Equal(t, "replica-path", replicaPath) @@ -61,7 +61,7 @@ func TestCachingStorageProvider_GetSyncedNodes(t *testing.T) { cache.Connected() // empty cache should be populated - replicaPath, storages, err := cache.GetConsistentStorages(ctx, "vs", "/repo/path") + replicaPath, storages, _, err := cache.GetConsistentStorages(ctx, "vs", "/repo/path") require.NoError(t, err) require.Equal(t, map[string]struct{}{"g1": {}, "g2": {}, "g3": {}}, storages) require.Equal(t, "replica-path", replicaPath) @@ -75,7 +75,7 @@ func TestCachingStorageProvider_GetSyncedNodes(t *testing.T) { require.NoError(t, err) // populated cache should return cached value - replicaPath, storages, err = cache.GetConsistentStorages(ctx, "vs", "/repo/path") + replicaPath, storages, _, err = cache.GetConsistentStorages(ctx, "vs", "/repo/path") require.NoError(t, err) require.Equal(t, map[string]struct{}{"g1": {}, "g2": {}, "g3": {}}, storages) require.Equal(t, "replica-path", replicaPath) @@ -99,7 +99,7 @@ func TestCachingStorageProvider_GetSyncedNodes(t *testing.T) { require.NoError(t, err) cache.Connected() - _, _, err = cache.GetConsistentStorages(ctx, "vs", "/repo/path") + _, _, _, err = 
cache.GetConsistentStorages(ctx, "vs", "/repo/path") require.Equal(t, commonerr.NewRepositoryNotFoundError("vs", "/repo/path"), err) // "populate" metric is not set as there was an error and we don't want this result to be cached @@ -126,7 +126,7 @@ func TestCachingStorageProvider_GetSyncedNodes(t *testing.T) { cache.Connected() // first access populates the cache - replicaPath, storages1, err := cache.GetConsistentStorages(ctx, "vs", "/repo/path/1") + replicaPath, storages1, _, err := cache.GetConsistentStorages(ctx, "vs", "/repo/path/1") require.NoError(t, err) require.Equal(t, map[string]struct{}{"g1": {}, "g2": {}, "g3": {}}, storages1) require.Equal(t, "replica-path", replicaPath) @@ -137,19 +137,19 @@ func TestCachingStorageProvider_GetSyncedNodes(t *testing.T) { expErr := json.Unmarshal([]byte(notification.Payload), new(struct{})) // second access omits cached data as caching should be disabled - replicaPath, storages2, err := cache.GetConsistentStorages(ctx, "vs", "/repo/path/1") + replicaPath, storages2, _, err := cache.GetConsistentStorages(ctx, "vs", "/repo/path/1") require.NoError(t, err) require.Equal(t, map[string]struct{}{"g1": {}, "g2": {}, "g3": {}}, storages2) require.Equal(t, "replica-path", replicaPath) // third access retrieves data and caches it - replicaPath, storages3, err := cache.GetConsistentStorages(ctx, "vs", "/repo/path/1") + replicaPath, storages3, _, err := cache.GetConsistentStorages(ctx, "vs", "/repo/path/1") require.NoError(t, err) require.Equal(t, map[string]struct{}{"g1": {}, "g2": {}, "g3": {}}, storages3) require.Equal(t, "replica-path", replicaPath) // fourth access retrieves data from cache - replicaPath, storages4, err := cache.GetConsistentStorages(ctx, "vs", "/repo/path/1") + replicaPath, storages4, _, err := cache.GetConsistentStorages(ctx, "vs", "/repo/path/1") require.NoError(t, err) require.Equal(t, map[string]struct{}{"g1": {}, "g2": {}, "g3": {}}, storages4) require.Equal(t, "replica-path", replicaPath) @@ -185,11 
+185,11 @@ func TestCachingStorageProvider_GetSyncedNodes(t *testing.T) { cache.Connected() // first access populates the cache - replicaPath, path1Storages1, err := cache.GetConsistentStorages(ctx, "vs", "/repo/path/1") + replicaPath, path1Storages1, _, err := cache.GetConsistentStorages(ctx, "vs", "/repo/path/1") require.NoError(t, err) require.Equal(t, map[string]struct{}{"g1": {}, "g2": {}, "g3": {}}, path1Storages1) require.Equal(t, "replica-path-1", replicaPath) - replicaPath, path2Storages1, err := cache.GetConsistentStorages(ctx, "vs", "/repo/path/2") + replicaPath, path2Storages1, _, err := cache.GetConsistentStorages(ctx, "vs", "/repo/path/2") require.NoError(t, err) require.Equal(t, map[string]struct{}{"g1": {}, "g2": {}}, path2Storages1) require.Equal(t, "replica-path-2", replicaPath) @@ -203,12 +203,12 @@ func TestCachingStorageProvider_GetSyncedNodes(t *testing.T) { ) // second access re-uses cached data for '/repo/path/1' - replicaPath1, path1Storages2, err := cache.GetConsistentStorages(ctx, "vs", "/repo/path/1") + replicaPath1, path1Storages2, _, err := cache.GetConsistentStorages(ctx, "vs", "/repo/path/1") require.NoError(t, err) require.Equal(t, map[string]struct{}{"g1": {}, "g2": {}, "g3": {}}, path1Storages2) require.Equal(t, "replica-path-1", replicaPath1) // second access populates the cache again for '/repo/path/2' - replicaPath, path2Storages2, err := cache.GetConsistentStorages(ctx, "vs", "/repo/path/2") + replicaPath, path2Storages2, _, err := cache.GetConsistentStorages(ctx, "vs", "/repo/path/2") require.NoError(t, err) require.Equal(t, map[string]struct{}{"g1": {}, "g2": {}}, path2Storages2) require.Equal(t, "replica-path-2", replicaPath) @@ -235,7 +235,7 @@ func TestCachingStorageProvider_GetSyncedNodes(t *testing.T) { cache.Connected() // first access populates the cache - replicaPath, storages1, err := cache.GetConsistentStorages(ctx, "vs", "/repo/path") + replicaPath, storages1, _, err := cache.GetConsistentStorages(ctx, "vs", 
"/repo/path") require.NoError(t, err) require.Equal(t, map[string]struct{}{"g1": {}, "g2": {}, "g3": {}}, storages1) require.Equal(t, "replica-path", replicaPath) @@ -244,7 +244,7 @@ func TestCachingStorageProvider_GetSyncedNodes(t *testing.T) { cache.Disconnect(assert.AnError) // second access retrieve data and doesn't populate the cache - replicaPath, storages2, err := cache.GetConsistentStorages(ctx, "vs", "/repo/path") + replicaPath, storages2, _, err := cache.GetConsistentStorages(ctx, "vs", "/repo/path") require.NoError(t, err) require.Equal(t, map[string]struct{}{"g1": {}, "g2": {}, "g3": {}}, storages2) require.Equal(t, "replica-path", replicaPath) @@ -279,12 +279,12 @@ func TestCachingStorageProvider_GetSyncedNodes(t *testing.T) { switch i % 6 { case 0, 1: f = func() { - _, _, err := cache.GetConsistentStorages(ctx, "vs", "/repo/path/1") + _, _, _, err := cache.GetConsistentStorages(ctx, "vs", "/repo/path/1") assert.NoError(t, err) } case 2, 3: f = func() { - _, _, err := cache.GetConsistentStorages(ctx, "vs", "/repo/path/2") + _, _, _, err := cache.GetConsistentStorages(ctx, "vs", "/repo/path/2") assert.NoError(t, err) } case 4:
diff --git a/internal/praefect/grpc-proxy/proxy/cache.go b/internal/praefect/grpc-proxy/proxy/cache.go
new file mode 100644
index 0000000000000000000000000000000000000000..94efb26283f27f94bf05a06160967bf0bdf54fbf
--- /dev/null
+++ b/internal/praefect/grpc-proxy/proxy/cache.go
@@ -0,0 +1,323 @@
+package proxy
+
+import (
+	"container/list"
+	"io"
+	"os"
+	"strconv"
+	"sync"
+
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/status"
+
+	"github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus/ctxlogrus"
+	"github.com/sirupsen/logrus"
+
+	"github.com/prometheus/client_golang/prometheus"
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/metadata"
+)
+
+// cache is the package-wide response cache; it is created in init.
+var cache *lru
+
+// init sizes the package-wide cache from the CACHE_SIZE environment variable
+// (interpreted as a number of GiB, defaulting to 8) and registers the cache
+// as a Prometheus collector. It panics when CACHE_SIZE is not a non-negative
+// integer.
+func init() {
+	cacheSize := os.Getenv("CACHE_SIZE")
+	if cacheSize == "" {
+		cacheSize = "8"
+	}
+	cacheSizeGB, err := strconv.Atoi(cacheSize)
+	if err != nil {
+		panic(err)
+	}
+	// A negative capacity can never be satisfied by evicting entries, so
+	// reject it up front instead of handing it to the LRU.
+	if cacheSizeGB < 0 {
+		panic("CACHE_SIZE must not be negative, got " + cacheSize)
+	}
+	cache = newLRU(cacheSizeGB << 30)
+	prometheus.MustRegister(cache)
+}
+
+// cacheData is one cached response: the header metadata, the response frames
+// in the order they were sent, the accounted size in bytes and, for cached
+// failures, the error to replay.
+type cacheData struct {
+	meta   metadata.MD
+	frames []*frame
+	size   int
+	error  error
+}
+
+// cacheStream wraps a grpc.ServerStream and records what is sent through it
+// so that the response can be stored in the cache by Close.
+//
+// NOTE(review): RecvMsg writes cacheKey/tooLong while SendHeader/SendMsg read
+// cacheKey without synchronization; if the handler calls them from different
+// goroutines this is a data race - confirm against the forwarding code.
+type cacheStream struct {
+	grpc.ServerStream
+	// cacheKey is nil when the stream must not be cached.
+	cacheKey *string
+	// data accumulates the response; it is only allocated when cacheKey is set.
+	data *cacheData
+	// firstMessage is set once RecvMsg has been called for the first time.
+	firstMessage bool
+	// tooLong is set when RecvMsg is called more than once.
+	tooLong bool
+}
+
+// tryCache attempts to answer the call on serverStream from the cache entry
+// stored under key.
+//
+// It returns (false, nil) on a cache miss, (true, err) when a cached error is
+// replayed, (true, nil) after the cached header and frames were sent, and
+// (false, err) when sending the cached response to the client failed.
+func tryCache(key string, serverStream grpc.ServerStream) (bool, error) {
+	cache.requestTotals.Inc()
+	cacheData, ok := cache.get(key)
+	if !ok {
+		cache.missTotals.Inc()
+		return false, nil
+	}
+
+	ctxlogrus.AddFields(serverStream.Context(), logrus.Fields{"cache": "true"})
+
+	if cacheData.error != nil {
+		return true, cacheData.error
+	}
+
+	cache.bytesFetchedtotals.Add(float64(cacheData.size))
+	if err := serverStream.SendHeader(cacheData.meta); err != nil {
+		return false, err
+	}
+	for _, frame := range cacheData.frames {
+		f := frame
+		if err := serverStream.SendMsg(f); err != nil {
+			return false, err
+		}
+	}
+	return true, nil
+}
+
+// newCacheGenStream wraps srv in a cacheStream. When key is nil the returned
+// stream only forwards calls to srv and records nothing.
+func newCacheGenStream(key *string, srv grpc.ServerStream) *cacheStream {
+	c := &cacheStream{
+		cacheKey:     key,
+		ServerStream: srv,
+		tooLong:      false,
+		firstMessage: false,
+	}
+
+	if key != nil {
+		c.data = &cacheData{
+			size:   0,
+			frames: []*frame{},
+		}
+	}
+
+	return c
+}
+
+// RecvMsg forwards to the wrapped stream. When caching is enabled, the first
+// call is only noted; any later call disables caching for this stream and
+// flags it as too long.
+func (s *cacheStream) RecvMsg(m interface{}) error {
+	if s.cacheKey == nil {
+		return s.ServerStream.RecvMsg(m)
+	}
+
+	// We choose to use the first message of the client as cache key
+	// More than one message would invalidate the cache key
+	if !s.firstMessage {
+		s.firstMessage = true
+	} else {
+		s.cacheKey = nil
+		s.tooLong = true
+	}
+
+	return s.ServerStream.RecvMsg(m)
+}
+
+// SendHeader records md for the cache entry (when caching is enabled) and
+// forwards it to the wrapped stream.
+func (s *cacheStream) SendHeader(md metadata.MD) error {
+	if s.cacheKey != nil {
+		s.data.meta = md
+	}
+	return s.ServerStream.SendHeader(md)
+}
+
+// SendMsg stores a copy of the frame payload for the cache entry (when
+// caching is enabled) and forwards m to the wrapped stream. With caching
+// enabled m must be a *frame; any other type panics on the type assertion.
+func (s *cacheStream) SendMsg(m interface{}) error {
+	if s.cacheKey != nil {
+		f := m.(*frame)
+		// Copy the payload so the cached frame does not alias f.payload.
+		payload := make([]byte, len(f.payload))
+		copy(payload, f.payload)
+		s.data.frames = append(s.data.frames, &frame{payload: payload})
+		s.data.size += len(f.payload)
+	}
+	return s.ServerStream.SendMsg(m)
+}
+
+// Close finishes recording and stores the response in the cache.
+//
+// Nothing is stored when the stream was flagged as too long, when caching is
+// disabled, or when err is an error other than io.EOF whose gRPC status code
+// is not NotFound. A NotFound error is itself cached so it can be replayed.
+//
+// NOTE(review): tryCache already counts a miss before the stream is proxied,
+// so the too-long branch looks like it counts the same request in missTotals
+// a second time - confirm whether that is intended.
+func (s *cacheStream) Close(err error) {
+	if s.tooLong {
+		cache.missTooLongTotals.Inc()
+		cache.missTotals.Inc()
+		return
+	}
+	if s.cacheKey == nil {
+		return
+	}
+
+	if err != nil && err != io.EOF {
+		status := status.Convert(err)
+		if status.Code() != codes.NotFound {
+			return
+		}
+		s.data.error = err
+		// Trying to compute the "error size" in memory (4 bytes for the int32)
+		s.data.size = len(status.Message()) + 4
+	}
+
+	cache.add(*s.cacheKey, s.data)
+	size := float64(s.data.size)
+	cache.cacheSizeHistogram.Observe(size)
+	cache.bytesStoredtotals.Add(size)
+}
+
+// lru is a size-bounded cache of cacheData keyed by string that also carries
+// its own Prometheus metrics. size is the capacity and currentSize the sum of
+// the sizes of the stored entries; evictList and items index the same
+// entries.
+type lru struct {
+	sync.RWMutex
+	cacheSizeHistogram      prometheus.Histogram
+	cacheSize               prometheus.Gauge
+	requestTotals           prometheus.Counter
+	missTotals              prometheus.Counter
+	bytesStoredtotals       prometheus.Counter
+	bytesFetchedtotals      prometheus.Counter
+	evictionIteratorSummary prometheus.Summary
+	missTooLongTotals       prometheus.Counter
+	size                    int
+	currentSize             int
+	evictList               *list.List
+	items                   map[string]*list.Element
+}
+
+// Describe is used to describe Prometheus metrics.
+func (c *lru) Describe(descs chan<- *prometheus.Desc) {
+	c.cacheSizeHistogram.Describe(descs)
+	c.cacheSize.Describe(descs)
+	c.requestTotals.Describe(descs)
+	c.missTotals.Describe(descs)
+	c.missTooLongTotals.Describe(descs)
+	c.bytesStoredtotals.Describe(descs)
+	c.bytesFetchedtotals.Describe(descs)
+	c.evictionIteratorSummary.Describe(descs)
+}
+
+// Collect is used to collect Prometheus metrics.
+func (c *lru) Collect(metrics chan<- prometheus.Metric) {
+	c.cacheSizeHistogram.Collect(metrics)
+	c.cacheSize.Collect(metrics)
+	c.requestTotals.Collect(metrics)
+	c.missTotals.Collect(metrics)
+	c.missTooLongTotals.Collect(metrics)
+	c.bytesStoredtotals.Collect(metrics)
+	c.bytesFetchedtotals.Collect(metrics)
+	c.evictionIteratorSummary.Collect(metrics)
+}
+
+// entry is the value stored in each evictList element; it keeps the key next
+// to the value so that an evicted element can be deleted from the items map.
+type entry struct {
+	key   string
+	value *cacheData
+}
+
+// newLRU returns an empty lru with a capacity of size (compared against the
+// sum of the stored cacheData sizes) and freshly created metrics.
+func newLRU(size int) *lru {
+	c := &lru{
+		size:      size,
+		evictList: list.New(),
+		items:     make(map[string]*list.Element),
+
+		cacheSizeHistogram: prometheus.NewHistogram(prometheus.HistogramOpts{
+			Name:    "gitaly_praefect_cache_size_histogram",
+			Help:    "Cache size histogram",
+			Buckets: []float64{0, 100, 1_000, 10_000, 100_000, 1_000_000, 5_000_000, 10_000_000},
+		}),
+		evictionIteratorSummary: prometheus.NewSummary(prometheus.SummaryOpts{
+			Name: "gitaly_praefect_eviction_iterator",
+			Help: "Eviction iterator summary",
+		}),
+		cacheSize: prometheus.NewGauge(prometheus.GaugeOpts{
+			Name: "gitaly_praefect_cache_size",
+			Help: "Cache size",
+		}),
+		requestTotals: prometheus.NewCounter(
+			prometheus.CounterOpts{
+				Name: "gitaly_praefect_cache_requests_total",
+				Help: "Total number of disk cache requests",
+			},
+		),
+		missTotals: prometheus.NewCounter(
+			prometheus.CounterOpts{
+				Name: "gitaly_praefect_cache_miss_total",
+				Help: "Total number of cache misses",
+			},
+		),
+		missTooLongTotals: prometheus.NewCounter(
+			prometheus.CounterOpts{
+				Name: "gitaly_praefect_cache_miss_too_long_total",
+				Help: "Total number of cache misses due to message too long",
+			},
+		),
+		bytesStoredtotals: prometheus.NewCounter(
+			prometheus.CounterOpts{
+				Name: "gitaly_praefect_cache_bytes_stored_total",
+				Help: "Total number of disk cache bytes stored",
+			},
+		),
+		bytesFetchedtotals: prometheus.NewCounter(
+			prometheus.CounterOpts{
+				Name: "gitaly_praefect_cache_bytes_fetched_total",
+				Help: "Total number of disk cache bytes fetched",
+			},
+		),
+	}
+	return c
+}
+
+// add stores value under key as the most recently used entry and then evicts
+// least recently used entries until the cache fits its capacity again.
+//
+// The whole operation runs under the write lock: the lookup, the insert and
+// the eviction must be atomic, otherwise concurrent callers can insert the
+// same key twice or evict the same element twice and corrupt currentSize.
+func (c *lru) add(key string, value *cacheData) {
+	c.Lock()
+	defer c.Unlock()
+
+	if elem, ok := c.items[key]; ok {
+		// Replacing an entry changes the accounted size by the difference
+		// between the new and the old value.
+		previous := elem.Value.(*entry)
+		c.currentSize += value.size - previous.value.size
+		elem.Value = &entry{key, value}
+		c.evictList.MoveToFront(elem)
+	} else {
+		c.items[key] = c.evictList.PushFront(&entry{key, value})
+		c.currentSize += value.size
+	}
+
+	evicted := 0
+	for c.currentSize > c.size {
+		oldest := c.evictList.Back()
+		if oldest == nil {
+			// Nothing left to evict; stop instead of spinning forever.
+			break
+		}
+		c.removeElementLocked(oldest)
+		evicted++
+	}
+
+	c.cacheSize.Set(float64(c.currentSize))
+	if evicted > 0 {
+		c.evictionIteratorSummary.Observe(float64(evicted))
+	}
+}
+
+// get returns the value stored under key and marks it as most recently used.
+// ok is false when the key is not cached.
+func (c *lru) get(key string) (value *cacheData, ok bool) {
+	// MoveToFront mutates the list, so the write lock is required even
+	// though this is a lookup.
+	c.Lock()
+	defer c.Unlock()
+
+	elem, found := c.items[key]
+	if !found {
+		return nil, false
+	}
+	c.evictList.MoveToFront(elem)
+
+	ent, isEntry := elem.Value.(*entry)
+	if !isEntry || ent == nil {
+		return nil, false
+	}
+	return ent.value, true
+}
+
+// removeOldest evicts the least recently used entry, if there is one.
+func (c *lru) removeOldest() {
+	c.Lock()
+	defer c.Unlock()
+
+	if oldest := c.evictList.Back(); oldest != nil {
+		c.removeElementLocked(oldest)
+	}
+}
+
+// removeElement removes e from the cache and updates the size accounting.
+func (c *lru) removeElement(e *list.Element) {
+	c.Lock()
+	defer c.Unlock()
+
+	c.removeElementLocked(e)
+}
+
+// removeElementLocked removes e from the list and the index and subtracts its
+// size. The caller must hold the write lock. Elements that are no longer the
+// one indexed under their key are ignored so that a stale element is never
+// subtracted from currentSize twice.
+func (c *lru) removeElementLocked(e *list.Element) {
+	kv := e.Value.(*entry)
+	if current, ok := c.items[kv.key]; !ok || current != e {
+		return
+	}
+	c.evictList.Remove(e)
+	delete(c.items, kv.key)
+	c.currentSize -= kv.value.size
+	c.cacheSize.Set(float64(c.currentSize))
+}
diff --git a/internal/praefect/grpc-proxy/proxy/cache_test.go b/internal/praefect/grpc-proxy/proxy/cache_test.go
new file mode 100644
index 0000000000000000000000000000000000000000..44e892b6733342d9968b7790eccad73e1fb62383
--- /dev/null
+++ b/internal/praefect/grpc-proxy/proxy/cache_test.go
@@ -0,0 +1,44 @@
+package proxy
+
+import (
+	"testing"
+
+	"github.com/stretchr/testify/require"
+)
+
+func TestLRU(t *testing.T) {
+	l := newLRU(10)
+
+	l.add("a", &cacheData{size: 1})
+	l.add("b", &cacheData{size: 2})
+	l.add("c", &cacheData{size: 3})
+	l.add("d", &cacheData{size: 4})
+	l.add("e", &cacheData{size: 5})
+
+	_, ok := l.get("a")
+	require.Falsef(t, ok, "should not be on LRU")
+
+	_, ok = l.get("b")
+	require.Falsef(t, ok, "should not be on LRU")
+
+	_, ok = l.get("c")
+	require.Falsef(t, ok, "should not be on LRU")
+
+	_, ok = l.get("d")
+	require.Truef(t, ok, "should be on LRU")
+
+	_, ok = l.get("e")
+	require.Truef(t, ok, "should be on LRU")
+
+	l.add("f", &cacheData{size: 5})
+	l.add("f", &cacheData{size: 5})
+	l.add("g", &cacheData{size: 9})
+
+	// e was the least recently used entry, so adding g evicted it
+	_, ok = l.get("e")
+	require.Falsef(t, ok, "should not be on LRU")
+
+	// f had to go as well: g (9) and f (5) do not fit into 10 together
+	_, ok = l.get("f")
+	require.Falsef(t, ok, "should not be on LRU")
+}
diff --git a/internal/praefect/grpc-proxy/proxy/director.go b/internal/praefect/grpc-proxy/proxy/director.go index 5f887213c388470488ccf6703ca4078bd44a093b..a921761727d14a5e09f96c1f463b281162ef7c49 100644 --- a/internal/praefect/grpc-proxy/proxy/director.go +++ b/internal/praefect/grpc-proxy/proxy/director.go @@ -27,6 +27,7 @@ type StreamDirector func(ctx context.Context, fullMethodName string, peeker Stre // StreamParameters encapsulates streaming parameters the praefect coordinator returns to the // proxy handler type StreamParameters struct { + cacheKey *string primary Destination reqFinalizer func() error callOptions []grpc.CallOption @@ -49,8 +50,9 @@ type Destination struct { } // NewStreamParameters returns a new instance of StreamParameters -func NewStreamParameters(primary Destination, secondaries []Destination, reqFinalizer func() error, callOpts []grpc.CallOption) *StreamParameters { +func NewStreamParameters(primary Destination, secondaries []Destination, reqFinalizer func() error, callOpts []grpc.CallOption, cacheKey *string) *StreamParameters { return &StreamParameters{ + cacheKey: cacheKey, primary: primary, secondaries: secondaries, reqFinalizer: reqFinalizer, diff --git a/internal/praefect/grpc-proxy/proxy/examples_test.go b/internal/praefect/grpc-proxy/proxy/examples_test.go index b8d5759bdbef052737e3f22924ee9e8706550bb3..81833cc9f8b7a4d2c6f8ff6c9cb2ee5db62f973b
100644 --- a/internal/praefect/grpc-proxy/proxy/examples_test.go +++ b/internal/praefect/grpc-proxy/proxy/examples_test.go @@ -51,13 +51,13 @@ func ExampleStreamDirector() { return proxy.NewStreamParameters(proxy.Destination{ Conn: conn, Ctx: metadata.IncomingToOutgoing(ctx), - }, nil, nil, nil), err + }, nil, nil, nil, nil), err } else if val, exists := md[":authority"]; exists && val[0] == "api.example.com" { conn, err := grpc.DialContext(ctx, "api-service.prod.svc.local", grpc.WithDefaultCallOptions(grpc.ForceCodec(proxy.NewCodec()))) return proxy.NewStreamParameters(proxy.Destination{ Conn: conn, Ctx: metadata.IncomingToOutgoing(ctx), - }, nil, nil, nil), err + }, nil, nil, nil, nil), err } } return nil, status.Errorf(codes.Unimplemented, "Unknown method") diff --git a/internal/praefect/grpc-proxy/proxy/handler.go b/internal/praefect/grpc-proxy/proxy/handler.go index 82c252526af19c3a565ec5fc0f785c50768d0a04..7f0b4be9ef256612b562dfae4407023d4ea50035 100644 --- a/internal/praefect/grpc-proxy/proxy/handler.go +++ b/internal/praefect/grpc-proxy/proxy/handler.go @@ -130,6 +130,18 @@ func (s *handler) handler(srv interface{}, serverStream grpc.ServerStream) (fina } }() + // Cache handling, if params contains a cache key, we try to use the cache + if params.cacheKey != nil { + ok, err := tryCache(*params.cacheKey, serverStream) + if ok { + return err + } + if err != nil { + failDestinationsWithError(params, fmt.Errorf("cache stream error: %w", err)) + return err + } + } + clientCtx, clientCancel := context.WithCancel(params.Primary().Ctx) defer clientCancel() // TODO(mwitkow): Add a `forwarded` header to metadata, https://en.wikipedia.org/wiki/X-Forwarded-For. @@ -168,8 +180,10 @@ func (s *handler) handler(srv interface{}, serverStream grpc.ServerStream) (fina // Explicitly *do not close* p2cErrChan and c2sErrChan, otherwise the select below will not terminate. 
// Channels do not have to be closed, it is just a control flow mechanism, see // https://groups.google.com/forum/#!msg/golang-nuts/pZwdYRGxCIk/qpbHxRRPJdUJ - c2sErrChan := forwardClientToServers(serverStream, allStreams) - p2cErrChan := forwardPrimaryToClient(primaryStream, serverStream) + // CacheGenStream is a cache generator that will wrap `serverStream` method in order to create a cache + cacheGenStream := newCacheGenStream(params.cacheKey, serverStream) + c2sErrChan := forwardClientToServers(cacheGenStream, allStreams) + p2cErrChan := forwardPrimaryToClient(primaryStream, cacheGenStream) secondaryErrChan := receiveSecondaryStreams(secondaryStreams) // We need to wait for the streams from the primary and secondaries. However, we don't need @@ -214,6 +228,9 @@ func (s *handler) handler(srv interface{}, serverStream grpc.ServerStream) (fina return status.Errorf(codes.Internal, "failed proxying c2s: %v", clientErr) } + // We only persist this cache if everything went ok. + cacheGenStream.Close(primaryErr) + if primaryErr != nil && primaryErr != io.EOF { // we must not propagate Gitaly errors into Sentry sentryhandler.MarkToSkip(serverStream.Context()) diff --git a/internal/praefect/grpc-proxy/proxy/handler_ext_test.go b/internal/praefect/grpc-proxy/proxy/handler_ext_test.go index 7e5234fcd5cf12354646d57cebb9ba09655fd390..5a323a20c0113612e728e5827e9005eca9d9dcd2 100644 --- a/internal/praefect/grpc-proxy/proxy/handler_ext_test.go +++ b/internal/praefect/grpc-proxy/proxy/handler_ext_test.go @@ -245,7 +245,7 @@ func setupProxy(t *testing.T) (context.Context, grpc_testing.TestServiceClient, md, ok := grpc_metadata.FromIncomingContext(ctx) if ok { if _, exists := md[rejectingMdKey]; exists { - return proxy.NewStreamParameters(proxy.Destination{Ctx: metadata.IncomingToOutgoing(ctx), Msg: payload}, nil, nil, nil), status.Errorf(codes.PermissionDenied, "testing rejection") + return proxy.NewStreamParameters(proxy.Destination{Ctx: metadata.IncomingToOutgoing(ctx), Msg: 
payload}, nil, nil, nil, nil), status.Errorf(codes.PermissionDenied, "testing rejection") } } @@ -254,7 +254,7 @@ func setupProxy(t *testing.T) (context.Context, grpc_testing.TestServiceClient, Ctx: metadata.IncomingToOutgoing(ctx), Conn: proxy2Server, Msg: payload, - }, nil, nil, nil), nil + }, nil, nil, nil, nil), nil } client2Proxy := newProxy(t, ctx, director, "mwitkow.testproto.TestService", "Ping") @@ -370,6 +370,7 @@ func TestProxyErrorPropagation(t *testing.T) { nil, func() error { return tc.requestFinalizerError }, nil, + nil, ), tc.directorError })), ) diff --git a/internal/praefect/grpc-proxy/proxy/peeker_test.go b/internal/praefect/grpc-proxy/proxy/peeker_test.go index 9b63653e87938715a83a9026eda4bf368c3d6284..d01aead138874e20c5ad739716f11611ddaa6386 100644 --- a/internal/praefect/grpc-proxy/proxy/peeker_test.go +++ b/internal/praefect/grpc-proxy/proxy/peeker_test.go @@ -50,7 +50,7 @@ func TestStreamPeeking(t *testing.T) { Ctx: metadata.IncomingToOutgoing(ctx), Conn: backendCC, Msg: peekedMessage, - }, nil, nil, nil), nil + }, nil, nil, nil, nil), nil } // The backend is supposed to still receive the message as expected without any modification @@ -123,7 +123,7 @@ func TestStreamInjecting(t *testing.T) { Ctx: metadata.IncomingToOutgoing(ctx), Conn: backendCC, Msg: replacedMessage, - }, nil, nil, nil), nil + }, nil, nil, nil, nil), nil } // Upon receiving the request the backend server should only ever see the changed request. 
diff --git a/internal/praefect/middleware/errorhandler_test.go b/internal/praefect/middleware/errorhandler_test.go index 9e221e7412cd62b41598a9df9b814d4b62d9fd75..3fcc8c5deb98a318c00f8eb3a32a299a3a2653aa 100644 --- a/internal/praefect/middleware/errorhandler_test.go +++ b/internal/praefect/middleware/errorhandler_test.go @@ -82,7 +82,7 @@ func TestStreamInterceptor(t *testing.T) { t.Cleanup(func() { testhelper.MustClose(t, cc) }) f, err := peeker.Peek() require.NoError(t, err) - return proxy.NewStreamParameters(proxy.Destination{Conn: cc, Ctx: ctx, Msg: f}, nil, func() error { return nil }, nil), nil + return proxy.NewStreamParameters(proxy.Destination{Conn: cc, Ctx: ctx, Msg: f}, nil, func() error { return nil }, nil, nil), nil })), } diff --git a/internal/praefect/nodes/manager.go b/internal/praefect/nodes/manager.go index 4a869299931c42206ef4370eedb6169d39f9aafa..174886816b173d32e1f19b98ee5fcfade3de68e4 100644 --- a/internal/praefect/nodes/manager.go +++ b/internal/praefect/nodes/manager.go @@ -262,7 +262,7 @@ func (n *Mgr) GetPrimary(ctx context.Context, virtualStorage string, _ int64) (s //nolint: revive,stylecheck // This is unintentionally missing documentation. 
func (n *Mgr) GetSyncedNode(ctx context.Context, virtualStorageName, repoPath string) (Node, error) { - _, upToDateStorages, err := n.csg.GetConsistentStorages(ctx, virtualStorageName, repoPath) + _, upToDateStorages, _, err := n.csg.GetConsistentStorages(ctx, virtualStorageName, repoPath) if err != nil && !errors.As(err, new(commonerr.RepositoryNotFoundError)) { return nil, err } diff --git a/internal/praefect/nodes/manager_test.go b/internal/praefect/nodes/manager_test.go index d59193d8baa69b3921ce92a1aff03e73a8d5dc2d..c25506c78be8df01e4487ac642714af586a41562 100644 --- a/internal/praefect/nodes/manager_test.go +++ b/internal/praefect/nodes/manager_test.go @@ -326,8 +326,8 @@ func TestMgr_GetSyncedNode(t *testing.T) { verify := func(failover bool, scenario func(t *testing.T, nm Manager, rs datastore.RepositoryStore)) func(*testing.T) { conf.Failover.Enabled = failover rs := datastore.MockRepositoryStore{ - GetConsistentStoragesFunc: func(ctx context.Context, virtualStorage, relativePath string) (string, map[string]struct{}, error) { - return relativePath, consistentStorages, consistentSecondariesErr + GetConsistentStoragesFunc: func(ctx context.Context, virtualStorage, relativePath string) (string, map[string]struct{}, int, error) { + return relativePath, consistentStorages, 0, consistentSecondariesErr }, } diff --git a/internal/praefect/router.go b/internal/praefect/router.go index 139bd62f923b6bd9d50aba086769dc3deb93ab68..22f5d558a6956e2b14134e32961af11734aa71bd 100644 --- a/internal/praefect/router.go +++ b/internal/praefect/router.go @@ -34,7 +34,8 @@ type RepositoryAccessorRoute struct { // ReplicaPath is the disk path where the replicas are stored. ReplicaPath string // Node contains the details of the node that should handle the request. 
- Node RouterNode + Node RouterNode + Generation int } func (r RepositoryAccessorRoute) addLogFields(ctx context.Context) { diff --git a/internal/praefect/router_node_manager.go b/internal/praefect/router_node_manager.go index 2f2a7a6e0931b8ab93d98e04a9dd5f00c3d00453..7d7756f78ad8bb4934636f71a304ee1bfa17a9f2 100644 --- a/internal/praefect/router_node_manager.go +++ b/internal/praefect/router_node_manager.go @@ -84,7 +84,7 @@ func (r *nodeManagerRouter) RouteRepositoryMutator(ctx context.Context, virtualS // set up a database, so the RepositoryStore here is always a mock. The mock doesn't know about the replica // paths of repositories and thus returns an empty string. This breaks the tests. Instead, we'll just keep // using the relative path in NodeManagerRouter. - _, consistentStorages, err := r.rs.GetConsistentStorages(ctx, virtualStorage, relativePath) + _, consistentStorages, _, err := r.rs.GetConsistentStorages(ctx, virtualStorage, relativePath) if err != nil && !errors.As(err, new(commonerr.RepositoryNotFoundError)) { return RepositoryMutatorRoute{}, fmt.Errorf("consistent storages: %w", err) } diff --git a/internal/praefect/router_per_repository.go b/internal/praefect/router_per_repository.go index 287d8d33b43a45bfe61282703245b6190efd127a..0922ad9959ab45b781358f58523b0e2829422805 100644 --- a/internal/praefect/router_per_repository.go +++ b/internal/praefect/router_per_repository.go @@ -153,7 +153,7 @@ func (r *PerRepositoryRouter) RouteRepositoryAccessor(ctx context.Context, virtu return RepositoryAccessorRoute{}, fmt.Errorf("get primary: %w", err) } - replicaPath, _, err := r.rs.GetConsistentStoragesByRepositoryID(ctx, repositoryID) + replicaPath, _, generation, err := r.rs.GetConsistentStoragesByRepositoryID(ctx, repositoryID) if err != nil { return RepositoryAccessorRoute{}, fmt.Errorf("get replica path: %w", err) } @@ -163,6 +163,7 @@ func (r *PerRepositoryRouter) RouteRepositoryAccessor(ctx context.Context, virtu return RepositoryAccessorRoute{ 
ReplicaPath: replicaPath, Node: node, + Generation: generation, }, nil } } @@ -170,7 +171,7 @@ func (r *PerRepositoryRouter) RouteRepositoryAccessor(ctx context.Context, virtu return RepositoryAccessorRoute{}, nodes.ErrPrimaryNotHealthy } - replicaPath, consistentStorages, err := r.csg.GetConsistentStorages(ctx, virtualStorage, relativePath) + replicaPath, consistentStorages, generation, err := r.csg.GetConsistentStorages(ctx, virtualStorage, relativePath) if err != nil { return RepositoryAccessorRoute{}, fmt.Errorf("consistent storages: %w", err) } @@ -192,6 +193,7 @@ func (r *PerRepositoryRouter) RouteRepositoryAccessor(ctx context.Context, virtu return RepositoryAccessorRoute{ ReplicaPath: replicaPath, Node: node, + Generation: generation, }, nil } @@ -239,7 +241,7 @@ func (r *PerRepositoryRouter) RouteRepositoryMutator(ctx context.Context, virtua return RepositoryMutatorRoute{}, nodes.ErrPrimaryNotHealthy } - replicaPath, consistentStorages, err := r.rs.GetConsistentStoragesByRepositoryID(ctx, repositoryID) + replicaPath, consistentStorages, _, err := r.rs.GetConsistentStoragesByRepositoryID(ctx, repositoryID) if err != nil { return RepositoryMutatorRoute{}, fmt.Errorf("consistent storages: %w", err) } diff --git a/internal/praefect/router_per_repository_test.go b/internal/praefect/router_per_repository_test.go index c4c8eb5d017faeb50a42ee36f2d7015bd3c462c2..959649f42154c1c5b852ffdbb5c2e24fe7643b56 100644 --- a/internal/praefect/router_per_repository_test.go +++ b/internal/praefect/router_per_repository_test.go @@ -248,6 +248,7 @@ func TestPerRepositoryRouter_RouteRepositoryAccessor(t *testing.T) { require.Equal(t, RepositoryAccessorRoute{ ReplicaPath: relativePath, + Generation: 1, Node: RouterNode{ Storage: tc.node, Connection: conns[tc.virtualStorage][tc.node], diff --git a/internal/praefect/server_test.go b/internal/praefect/server_test.go index efecddbab2d92516c5476917c0076d6fdc52f230..543c1398aa03082321bf4fc8f9b95c0200989d1d 100644 --- 
a/internal/praefect/server_test.go +++ b/internal/praefect/server_test.go @@ -538,8 +538,8 @@ func TestRemoveRepository(t *testing.T) { cc, _, cleanup := runPraefectServer(t, ctx, praefectCfg, buildOptions{ withQueue: queueInterceptor, withRepoStore: datastore.MockRepositoryStore{ - GetConsistentStoragesFunc: func(ctx context.Context, virtualStorage, relativePath string) (string, map[string]struct{}, error) { - return relativePath, nil, nil + GetConsistentStoragesFunc: func(ctx context.Context, virtualStorage, relativePath string) (string, map[string]struct{}, int, error) { + return relativePath, nil, 0, nil }, }, withNodeMgr: nodeMgr, @@ -825,8 +825,8 @@ func TestProxyWrites(t *testing.T) { _, repo, _ := testcfg.BuildWithRepo(t) rs := datastore.MockRepositoryStore{ - GetConsistentStoragesFunc: func(ctx context.Context, virtualStorage, relativePath string) (string, map[string]struct{}, error) { - return relativePath, map[string]struct{}{cfg0.Storages[0].Name: {}, cfg1.Storages[0].Name: {}, cfg2.Storages[0].Name: {}}, nil + GetConsistentStoragesFunc: func(ctx context.Context, virtualStorage, relativePath string) (string, map[string]struct{}, int, error) { + return relativePath, map[string]struct{}{cfg0.Storages[0].Name: {}, cfg1.Storages[0].Name: {}, cfg2.Storages[0].Name: {}}, 0, nil }, }