From 0ff7e8266e8ea9ada4d0296d027a60bc5d6620ef Mon Sep 17 00:00:00 2001 From: Adrien Date: Thu, 4 Aug 2022 18:55:05 +0200 Subject: [PATCH] Adding a cache layer on top of Praefect The purpose of this PR is to create a cache layer on top of Gitaly that avoids calling Gitaly nodes when no changes have occurred. For that, we can use the generation from the Praefect database: each update increases the generation value, which automatically invalidates the cache. In order to get the generation, we use the current query that gets the list of nodes with the repository. We add the generation to the `select` query. Praefect uses an internal cache and listens to the database to invalidate this generation. The hash key is composed of: method / repo generation / serialized message. Also handle NotFound as a cache entry. --- internal/praefect/cache_key.go | 14 + internal/praefect/coordinator.go | 16 +- internal/praefect/coordinator_test.go | 18 +- .../praefect/datastore/repository_store.go | 32 +- .../datastore/repository_store_bm_test.go | 2 +- .../datastore/repository_store_mock.go | 12 +- .../datastore/repository_store_test.go | 32 +- .../praefect/datastore/storage_provider.go | 15 +- .../datastore/storage_provider_test.go | 32 +- internal/praefect/grpc-proxy/proxy/cache.go | 323 ++++++++++++++++++ .../praefect/grpc-proxy/proxy/cache_test.go | 44 +++ .../praefect/grpc-proxy/proxy/director.go | 4 +- .../grpc-proxy/proxy/examples_test.go | 4 +- internal/praefect/grpc-proxy/proxy/handler.go | 21 +- .../grpc-proxy/proxy/handler_ext_test.go | 5 +- .../praefect/grpc-proxy/proxy/peeker_test.go | 4 +- .../praefect/middleware/errorhandler_test.go | 2 +- internal/praefect/nodes/manager.go | 2 +- internal/praefect/nodes/manager_test.go | 4 +- internal/praefect/router.go | 3 +- internal/praefect/router_node_manager.go | 2 +- internal/praefect/router_per_repository.go | 8 +- .../praefect/router_per_repository_test.go | 1 + internal/praefect/server_test.go | 8 +- 24 files changed, 510 insertions(+), 98 deletions(-) create mode 
100644 internal/praefect/cache_key.go create mode 100644 internal/praefect/grpc-proxy/proxy/cache.go create mode 100644 internal/praefect/grpc-proxy/proxy/cache_test.go diff --git a/internal/praefect/cache_key.go b/internal/praefect/cache_key.go new file mode 100644 index 00000000000..e7593fe4896 --- /dev/null +++ b/internal/praefect/cache_key.go @@ -0,0 +1,14 @@ +package praefect + +import ( + "crypto/sha256" + "encoding/hex" + "fmt" +) + +func getKey(call grpcCall, generation int) string { + h := sha256.New() + sprintf := fmt.Sprintf("%s-%d-%s", call.fullMethodName, generation, call.msg) + h.Write([]byte(sprintf)) + return hex.EncodeToString(h.Sum(nil)) +} diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go index 0e00c03c0f9..37da619d933 100644 --- a/internal/praefect/coordinator.go +++ b/internal/praefect/coordinator.go @@ -320,11 +320,17 @@ func (c *Coordinator) accessorStreamParameters(ctx context.Context, call grpcCal metrics.ReadDistribution.WithLabelValues(virtualStorage, route.Node.Storage).Inc() + var key *string + if call.fullMethodName != "/gitaly.SmartHTTPService/PostUploadPackWithSidechannel" { + cacheKey := getKey(call, route.Generation) + key = &cacheKey + } + return proxy.NewStreamParameters(proxy.Destination{ Ctx: streamParametersContext(ctx), Conn: route.Node.Connection, Msg: b, - }, nil, nil, nil), nil + }, nil, nil, nil, key), nil } func (c *Coordinator) registerTransaction(ctx context.Context, primary RouterNode, secondaries []RouterNode) (transactions.Transaction, transactions.CancelFunc, error) { @@ -517,7 +523,7 @@ func (c *Coordinator) mutatorStreamParameters(ctx context.Context, call grpcCall } return firstErr } - return proxy.NewStreamParameters(primaryDest, secondaryDests, reqFinalizer, nil), nil + return proxy.NewStreamParameters(primaryDest, secondaryDests, reqFinalizer, nil, nil), nil } // maintenanceStreamParameters returns stream parameters for a maintenance-style RPC. 
The RPC call @@ -582,7 +588,7 @@ func (c *Coordinator) maintenanceStreamParameters(ctx context.Context, call grpc } return nil - }, nil), nil + }, nil, nil), nil } // streamParametersContexts converts the contexts with incoming metadata into a context that is @@ -734,7 +740,7 @@ func (c *Coordinator) accessorStorageStreamParameters(ctx context.Context, mi pr Msg: b, } - return proxy.NewStreamParameters(primaryDest, nil, func() error { return nil }, nil), nil + return proxy.NewStreamParameters(primaryDest, nil, func() error { return nil }, nil, nil), nil } func (c *Coordinator) mutatorStorageStreamParameters(ctx context.Context, mi protoregistry.MethodInfo, msg proto.Message, virtualStorage string) (*proxy.StreamParameters, error) { @@ -768,7 +774,7 @@ func (c *Coordinator) mutatorStorageStreamParameters(ctx context.Context, mi pro secondaryDests[i] = proxy.Destination{Ctx: ctx, Conn: secondary.Connection, Msg: b} } - return proxy.NewStreamParameters(primaryDest, secondaryDests, func() error { return nil }, nil), nil + return proxy.NewStreamParameters(primaryDest, secondaryDests, func() error { return nil }, nil, nil), nil } // rewrittenRepositoryMessage rewrites the repository storages and relative paths. 
diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go index 19486f40cd2..9a33189c955 100644 --- a/internal/praefect/coordinator_test.go +++ b/internal/praefect/coordinator_test.go @@ -95,11 +95,11 @@ func TestStreamDirectorReadOnlyEnforcement(t *testing.T) { ctx := testhelper.Context(t) rs := datastore.MockRepositoryStore{ - GetConsistentStoragesFunc: func(context.Context, string, string) (string, map[string]struct{}, error) { + GetConsistentStoragesFunc: func(context.Context, string, string) (string, map[string]struct{}, int, error) { if tc.readOnly { - return "", map[string]struct{}{storage + "-other": {}}, nil + return "", map[string]struct{}{storage + "-other": {}}, 0, nil } - return "", map[string]struct{}{storage: {}}, nil + return "", map[string]struct{}{storage: {}}, 0, nil }, } @@ -323,8 +323,8 @@ func TestStreamDirectorMutator_StopTransaction(t *testing.T) { } rs := datastore.MockRepositoryStore{ - GetConsistentStoragesFunc: func(ctx context.Context, virtualStorage, relativePath string) (string, map[string]struct{}, error) { - return relativePath, map[string]struct{}{"primary": {}, "secondary": {}}, nil + GetConsistentStoragesFunc: func(ctx context.Context, virtualStorage, relativePath string) (string, map[string]struct{}, int, error) { + return relativePath, map[string]struct{}{"primary": {}, "secondary": {}}, 0, nil }, } @@ -985,8 +985,8 @@ func TestCoordinatorStreamDirector_distributesReads(t *testing.T) { entry := testhelper.NewDiscardingLogEntry(t) repoStore := datastore.MockRepositoryStore{ - GetConsistentStoragesFunc: func(ctx context.Context, virtualStorage, relativePath string) (string, map[string]struct{}, error) { - return relativePath, map[string]struct{}{primaryNodeConf.Storage: {}, secondaryNodeConf.Storage: {}}, nil + GetConsistentStoragesFunc: func(ctx context.Context, virtualStorage, relativePath string) (string, map[string]struct{}, int, error) { + return relativePath, 
map[string]struct{}{primaryNodeConf.Storage: {}, secondaryNodeConf.Storage: {}}, 0, nil }, } @@ -1968,8 +1968,8 @@ func TestCoordinator_grpcErrorHandling(t *testing.T) { GetReplicaPathFunc: func(ctx context.Context, repositoryID int64) (string, error) { return repoProto.GetRelativePath(), nil }, - GetConsistentStoragesFunc: func(ctx context.Context, virtualStorage, relativePath string) (string, map[string]struct{}, error) { - return relativePath, map[string]struct{}{"primary": {}, "secondary-1": {}, "secondary-2": {}}, nil + GetConsistentStoragesFunc: func(ctx context.Context, virtualStorage, relativePath string) (string, map[string]struct{}, int, error) { + return relativePath, map[string]struct{}{"primary": {}, "secondary-1": {}, "secondary-2": {}}, 0, nil }, }, }) diff --git a/internal/praefect/datastore/repository_store.go b/internal/praefect/datastore/repository_store.go index 03e7a2d084e..11812255c35 100644 --- a/internal/praefect/datastore/repository_store.go +++ b/internal/praefect/datastore/repository_store.go @@ -122,7 +122,7 @@ type RepositoryStore interface { // RenameRepository which can be removed in a later release. RenameRepositoryInPlace(ctx context.Context, virtualStorage, relativePath, newRelativePath string) error // GetConsistentStoragesByRepositoryID returns the replica path and the set of up to date storages for the given repository keyed by repository ID. - GetConsistentStoragesByRepositoryID(ctx context.Context, repositoryID int64) (string, map[string]struct{}, error) + GetConsistentStoragesByRepositoryID(ctx context.Context, repositoryID int64) (string, map[string]struct{}, int, error) ConsistentStoragesGetter // RepositoryExists returns whether the repository exists on a virtual storage. 
RepositoryExists(ctx context.Context, virtualStorage, relativePath string) (bool, error) @@ -608,44 +608,44 @@ AND storage = $3 } // GetConsistentStoragesByRepositoryID returns the replica path and the set of up to date storages for the given repository keyed by repository ID. -func (rs *PostgresRepositoryStore) GetConsistentStoragesByRepositoryID(ctx context.Context, repositoryID int64) (string, map[string]struct{}, error) { +func (rs *PostgresRepositoryStore) GetConsistentStoragesByRepositoryID(ctx context.Context, repositoryID int64) (string, map[string]struct{}, int, error) { return rs.getConsistentStorages(ctx, ` -SELECT replica_path, ARRAY_AGG(storage) +SELECT replica_path, ARRAY_AGG(storage), generation FROM repositories JOIN storage_repositories USING (repository_id, relative_path, generation) WHERE repository_id = $1 -GROUP BY replica_path +GROUP BY replica_path, generation `, repositoryID) } // GetConsistentStorages returns the replica path and the set of up to date storages for the given repository keyed by virtual storage and relative path. 
-func (rs *PostgresRepositoryStore) GetConsistentStorages(ctx context.Context, virtualStorage, relativePath string) (string, map[string]struct{}, error) { - replicaPath, storages, err := rs.getConsistentStorages(ctx, ` -SELECT replica_path, ARRAY_AGG(storage) +func (rs *PostgresRepositoryStore) GetConsistentStorages(ctx context.Context, virtualStorage, relativePath string) (string, map[string]struct{}, int, error) { + replicaPath, storages, generation, err := rs.getConsistentStorages(ctx, ` +SELECT replica_path, ARRAY_AGG(storage), generation FROM repositories JOIN storage_repositories USING (repository_id, relative_path, generation) WHERE repositories.virtual_storage = $1 AND repositories.relative_path = $2 -GROUP BY replica_path +GROUP BY replica_path, generation `, virtualStorage, relativePath) if errors.Is(err, commonerr.ErrRepositoryNotFound) { - return "", nil, commonerr.NewRepositoryNotFoundError(virtualStorage, relativePath) + return "", nil, 0, commonerr.NewRepositoryNotFoundError(virtualStorage, relativePath) } - return replicaPath, storages, err + return replicaPath, storages, generation, err } // getConsistentStorages is a helper for querying the consistent storages by different keys. 
-func (rs *PostgresRepositoryStore) getConsistentStorages(ctx context.Context, query string, params ...interface{}) (string, map[string]struct{}, error) { +func (rs *PostgresRepositoryStore) getConsistentStorages(ctx context.Context, query string, params ...interface{}) (string, map[string]struct{}, int, error) { var replicaPath string var storages glsql.StringArray - - if err := rs.db.QueryRowContext(ctx, query, params...).Scan(&replicaPath, &storages); err != nil { + var generation int + if err := rs.db.QueryRowContext(ctx, query, params...).Scan(&replicaPath, &storages, &generation); err != nil { if errors.Is(err, sql.ErrNoRows) { - return "", nil, commonerr.ErrRepositoryNotFound + return "", nil, 0, commonerr.ErrRepositoryNotFound } - return "", nil, fmt.Errorf("query: %w", err) + return "", nil, 0, fmt.Errorf("query: %w", err) } result := storages.Slice() @@ -654,7 +654,7 @@ func (rs *PostgresRepositoryStore) getConsistentStorages(ctx context.Context, qu consistentStorages[storage] = struct{}{} } - return replicaPath, consistentStorages, nil + return replicaPath, consistentStorages, generation, nil } //nolint: revive,stylecheck // This is unintentionally missing documentation. 
diff --git a/internal/praefect/datastore/repository_store_bm_test.go b/internal/praefect/datastore/repository_store_bm_test.go index 3f982e4176e..deb04cf4667 100644 --- a/internal/praefect/datastore/repository_store_bm_test.go +++ b/internal/praefect/datastore/repository_store_bm_test.go @@ -66,7 +66,7 @@ func benchmarkGetConsistentStorages(b *testing.B, nstorages, nrepositories int) require.NoError(b, err) b.StartTimer() - _, _, err = repoStore.GetConsistentStorages(ctx, "vs", "/path/repo/"+strconv.Itoa(nrepositories/2)) + _, _, _, err = repoStore.GetConsistentStorages(ctx, "vs", "/path/repo/"+strconv.Itoa(nrepositories/2)) b.StopTimer() require.NoError(b, err) diff --git a/internal/praefect/datastore/repository_store_mock.go b/internal/praefect/datastore/repository_store_mock.go index 42acafb6ecd..23148f81925 100644 --- a/internal/praefect/datastore/repository_store_mock.go +++ b/internal/praefect/datastore/repository_store_mock.go @@ -16,8 +16,8 @@ type MockRepositoryStore struct { DeleteReplicaFunc func(ctx context.Context, repositoryID int64, storage string) error RenameRepositoryInPlaceFunc func(ctx context.Context, virtualStorage, relativePath, newRelativePath string) error RenameRepositoryFunc func(ctx context.Context, virtualStorage, relativePath, storage, newRelativePath string) error - GetConsistentStoragesByRepositoryIDFunc func(ctx context.Context, repositoryID int64) (string, map[string]struct{}, error) - GetConsistentStoragesFunc func(ctx context.Context, virtualStorage, relativePath string) (string, map[string]struct{}, error) + GetConsistentStoragesByRepositoryIDFunc func(ctx context.Context, repositoryID int64) (string, map[string]struct{}, int, error) + GetConsistentStoragesFunc func(ctx context.Context, virtualStorage, relativePath string) (string, map[string]struct{}, int, error) GetPartiallyAvailableRepositoriesFunc func(ctx context.Context, virtualStorage string) ([]RepositoryMetadata, error) DeleteInvalidRepositoryFunc func(ctx 
context.Context, repositoryID int64, storage string) error RepositoryExistsFunc func(ctx context.Context, virtualStorage, relativePath string) (bool, error) @@ -115,18 +115,18 @@ func (m MockRepositoryStore) RenameRepository(ctx context.Context, virtualStorag } // GetConsistentStoragesByRepositoryID returns result of execution of the GetConsistentStoragesByRepositoryIDFunc field if it is set or an empty map. -func (m MockRepositoryStore) GetConsistentStoragesByRepositoryID(ctx context.Context, repositoryID int64) (string, map[string]struct{}, error) { +func (m MockRepositoryStore) GetConsistentStoragesByRepositoryID(ctx context.Context, repositoryID int64) (string, map[string]struct{}, int, error) { if m.GetConsistentStoragesFunc == nil { - return "", map[string]struct{}{}, nil + return "", map[string]struct{}{}, 0, nil } return m.GetConsistentStoragesByRepositoryIDFunc(ctx, repositoryID) } // GetConsistentStorages returns result of execution of the GetConsistentStoragesFunc field if it is set or an empty map. 
-func (m MockRepositoryStore) GetConsistentStorages(ctx context.Context, virtualStorage, relativePath string) (string, map[string]struct{}, error) { +func (m MockRepositoryStore) GetConsistentStorages(ctx context.Context, virtualStorage, relativePath string) (string, map[string]struct{}, int, error) { if m.GetConsistentStoragesFunc == nil { - return "", map[string]struct{}{}, nil + return "", map[string]struct{}{}, 0, nil } return m.GetConsistentStoragesFunc(ctx, virtualStorage, relativePath) diff --git a/internal/praefect/datastore/repository_store_test.go b/internal/praefect/datastore/repository_store_test.go index 3b6ba6709dc..21a7ef343f2 100644 --- a/internal/praefect/datastore/repository_store_test.go +++ b/internal/praefect/datastore/repository_store_test.go @@ -880,12 +880,12 @@ func TestRepositoryStore_Postgres(t *testing.T) { }) t.Run("no records", func(t *testing.T) { - replicaPath, secondaries, err := rs.GetConsistentStorages(ctx, vs, repo) + replicaPath, secondaries, _, err := rs.GetConsistentStorages(ctx, vs, repo) require.Equal(t, commonerr.NewRepositoryNotFoundError(vs, repo), err) require.Empty(t, replicaPath) require.Empty(t, secondaries) - replicaPath, secondaries, err = rs.GetConsistentStoragesByRepositoryID(ctx, 1) + replicaPath, secondaries, _, err = rs.GetConsistentStoragesByRepositoryID(ctx, 1) require.Equal(t, commonerr.ErrRepositoryNotFound, err) require.Empty(t, replicaPath) require.Empty(t, secondaries) @@ -912,12 +912,12 @@ func TestRepositoryStore_Postgres(t *testing.T) { ) t.Run("consistent secondary", func(t *testing.T) { - replicaPath, secondaries, err := rs.GetConsistentStorages(ctx, vs, repo) + replicaPath, secondaries, _, err := rs.GetConsistentStorages(ctx, vs, repo) require.NoError(t, err) require.Equal(t, map[string]struct{}{"primary": {}, "consistent-secondary": {}}, secondaries) require.Equal(t, "replica-path", replicaPath) - replicaPath, secondaries, err = rs.GetConsistentStoragesByRepositoryID(ctx, 1) + replicaPath, 
secondaries, _, err = rs.GetConsistentStoragesByRepositoryID(ctx, 1) require.NoError(t, err) require.Equal(t, map[string]struct{}{"primary": {}, "consistent-secondary": {}}, secondaries) require.Equal(t, "replica-path", replicaPath) @@ -926,12 +926,12 @@ func TestRepositoryStore_Postgres(t *testing.T) { require.NoError(t, rs.SetGeneration(ctx, 1, "primary", repo, 0)) t.Run("outdated primary", func(t *testing.T) { - replicaPath, secondaries, err := rs.GetConsistentStorages(ctx, vs, repo) + replicaPath, secondaries, _, err := rs.GetConsistentStorages(ctx, vs, repo) require.NoError(t, err) require.Equal(t, map[string]struct{}{"consistent-secondary": {}}, secondaries) require.Equal(t, "replica-path", replicaPath) - replicaPath, secondaries, err = rs.GetConsistentStoragesByRepositoryID(ctx, 1) + replicaPath, secondaries, _, err = rs.GetConsistentStoragesByRepositoryID(ctx, 1) require.NoError(t, err) require.Equal(t, map[string]struct{}{"consistent-secondary": {}}, secondaries) require.Equal(t, "replica-path", replicaPath) @@ -958,12 +958,12 @@ func TestRepositoryStore_Postgres(t *testing.T) { }, ) - replicaPath, secondaries, err := rs.GetConsistentStorages(ctx, vs, repo) + replicaPath, secondaries, _, err := rs.GetConsistentStorages(ctx, vs, repo) require.NoError(t, err) require.Equal(t, map[string]struct{}{"unknown": {}}, secondaries) require.Equal(t, "replica-path", replicaPath) - replicaPath, secondaries, err = rs.GetConsistentStoragesByRepositoryID(ctx, 1) + replicaPath, secondaries, _, err = rs.GetConsistentStoragesByRepositoryID(ctx, 1) require.NoError(t, err) require.Equal(t, map[string]struct{}{"unknown": {}}, secondaries) require.Equal(t, "replica-path", replicaPath) @@ -974,12 +974,12 @@ func TestRepositoryStore_Postgres(t *testing.T) { require.NoError(t, err) requireState(t, ctx, db, virtualStorageState{}, storageState{}) - replicaPath, secondaries, err := rs.GetConsistentStorages(ctx, vs, repo) + replicaPath, secondaries, _, err := 
rs.GetConsistentStorages(ctx, vs, repo) require.Equal(t, commonerr.NewRepositoryNotFoundError(vs, repo), err) require.Empty(t, secondaries) require.Empty(t, replicaPath) - replicaPath, secondaries, err = rs.GetConsistentStoragesByRepositoryID(ctx, 1) + replicaPath, secondaries, _, err = rs.GetConsistentStoragesByRepositoryID(ctx, 1) require.Equal(t, commonerr.ErrRepositoryNotFound, err) require.Empty(t, secondaries) require.Empty(t, replicaPath) @@ -989,31 +989,31 @@ func TestRepositoryStore_Postgres(t *testing.T) { rs := newRepositoryStore(t, nil) require.NoError(t, rs.CreateRepository(ctx, 1, vs, "original-path", "replica-path", "storage-1", []string{"storage-2"}, nil, true, false)) - replicaPath, storages, err := rs.GetConsistentStorages(ctx, vs, "original-path") + replicaPath, storages, _, err := rs.GetConsistentStorages(ctx, vs, "original-path") require.NoError(t, err) require.Equal(t, "replica-path", replicaPath) require.Equal(t, map[string]struct{}{"storage-1": {}, "storage-2": {}}, storages) - replicaPath, storages, err = rs.GetConsistentStoragesByRepositoryID(ctx, 1) + replicaPath, storages, _, err = rs.GetConsistentStoragesByRepositoryID(ctx, 1) require.NoError(t, err) require.Equal(t, "replica-path", replicaPath) require.Equal(t, map[string]struct{}{"storage-1": {}, "storage-2": {}}, storages) require.NoError(t, rs.RenameRepository(ctx, vs, "original-path", "storage-1", "new-path")) - replicaPath, storages, err = rs.GetConsistentStorages(ctx, vs, "new-path") + replicaPath, storages, _, err = rs.GetConsistentStorages(ctx, vs, "new-path") require.NoError(t, err) require.Equal(t, "new-path", replicaPath) require.Equal(t, map[string]struct{}{"storage-1": {}}, storages) - replicaPath, storages, err = rs.GetConsistentStoragesByRepositoryID(ctx, 1) + replicaPath, storages, _, err = rs.GetConsistentStoragesByRepositoryID(ctx, 1) require.NoError(t, err) require.Equal(t, "new-path", replicaPath) require.Equal(t, map[string]struct{}{"storage-1": {}}, storages) 
require.NoError(t, rs.RenameRepository(ctx, vs, "original-path", "storage-2", "new-path")) - replicaPath, storages, err = rs.GetConsistentStorages(ctx, vs, "new-path") + replicaPath, storages, _, err = rs.GetConsistentStorages(ctx, vs, "new-path") require.NoError(t, err) require.Equal(t, "new-path", replicaPath) require.Equal(t, map[string]struct{}{"storage-1": {}, "storage-2": {}}, storages) - replicaPath, storages, err = rs.GetConsistentStoragesByRepositoryID(ctx, 1) + replicaPath, storages, _, err = rs.GetConsistentStoragesByRepositoryID(ctx, 1) require.NoError(t, err) require.Equal(t, "new-path", replicaPath) require.Equal(t, map[string]struct{}{"storage-1": {}, "storage-2": {}}, storages) diff --git a/internal/praefect/datastore/storage_provider.go b/internal/praefect/datastore/storage_provider.go index 1d0a3fb2869..648e90afeb4 100644 --- a/internal/praefect/datastore/storage_provider.go +++ b/internal/praefect/datastore/storage_provider.go @@ -17,7 +17,7 @@ import ( // ConsistentStoragesGetter returns storages which contain the latest generation of a repository. type ConsistentStoragesGetter interface { // GetConsistentStorages returns the replica path and the set of up to date storages for the given repository keyed by virtual storage and relative path. - GetConsistentStorages(ctx context.Context, virtualStorage, relativePath string) (string, map[string]struct{}, error) + GetConsistentStorages(ctx context.Context, virtualStorage, relativePath string) (string, map[string]struct{}, int, error) } // errNotExistingVirtualStorage indicates that the requested virtual storage can't be found or not configured. @@ -133,7 +133,7 @@ func (c *CachingConsistentStoragesGetter) getCache(virtualStorage string) (*lru. 
return val, found } -func (c *CachingConsistentStoragesGetter) cacheMiss(ctx context.Context, virtualStorage, relativePath string) (string, map[string]struct{}, error) { +func (c *CachingConsistentStoragesGetter) cacheMiss(ctx context.Context, virtualStorage, relativePath string) (string, map[string]struct{}, int, error) { c.cacheAccessTotal.WithLabelValues(virtualStorage, "miss").Inc() return c.csg.GetConsistentStorages(ctx, virtualStorage, relativePath) } @@ -165,7 +165,7 @@ func (c *CachingConsistentStoragesGetter) isCacheEnabled() bool { } // GetConsistentStorages returns the replica path and the set of up to date storages for the given repository keyed by virtual storage and relative path. -func (c *CachingConsistentStoragesGetter) GetConsistentStorages(ctx context.Context, virtualStorage, relativePath string) (string, map[string]struct{}, error) { +func (c *CachingConsistentStoragesGetter) GetConsistentStorages(ctx context.Context, virtualStorage, relativePath string) (string, map[string]struct{}, int, error) { var cache *lru.Cache if c.isCacheEnabled() { @@ -177,20 +177,21 @@ func (c *CachingConsistentStoragesGetter) GetConsistentStorages(ctx context.Cont defer populationDone() if ok { c.cacheAccessTotal.WithLabelValues(virtualStorage, "hit").Inc() - return value.replicaPath, value.storages, nil + return value.replicaPath, value.storages, value.generation, nil } } - replicaPath, storages, err := c.cacheMiss(ctx, virtualStorage, relativePath) + replicaPath, storages, generation, err := c.cacheMiss(ctx, virtualStorage, relativePath) if err == nil && cache != nil { - cache.Add(relativePath, cacheValue{replicaPath: replicaPath, storages: storages}) + cache.Add(relativePath, cacheValue{replicaPath: replicaPath, storages: storages, generation: generation}) c.cacheAccessTotal.WithLabelValues(virtualStorage, "populate").Inc() } - return replicaPath, storages, err + return replicaPath, storages, generation, err } type cacheValue struct { replicaPath string + 
generation int storages map[string]struct{} } diff --git a/internal/praefect/datastore/storage_provider_test.go b/internal/praefect/datastore/storage_provider_test.go index 399ecc48af7..b6a21bb4c68 100644 --- a/internal/praefect/datastore/storage_provider_test.go +++ b/internal/praefect/datastore/storage_provider_test.go @@ -37,7 +37,7 @@ func TestCachingStorageProvider_GetSyncedNodes(t *testing.T) { cache.Connected() // empty cache should be populated - replicaPath, storages, err := cache.GetConsistentStorages(ctx, "unknown", "/repo/path") + replicaPath, storages, _, err := cache.GetConsistentStorages(ctx, "unknown", "/repo/path") require.NoError(t, err) require.Equal(t, map[string]struct{}{"g1": {}, "g2": {}, "g3": {}}, storages) require.Equal(t, "replica-path", replicaPath) @@ -61,7 +61,7 @@ func TestCachingStorageProvider_GetSyncedNodes(t *testing.T) { cache.Connected() // empty cache should be populated - replicaPath, storages, err := cache.GetConsistentStorages(ctx, "vs", "/repo/path") + replicaPath, storages, _, err := cache.GetConsistentStorages(ctx, "vs", "/repo/path") require.NoError(t, err) require.Equal(t, map[string]struct{}{"g1": {}, "g2": {}, "g3": {}}, storages) require.Equal(t, "replica-path", replicaPath) @@ -75,7 +75,7 @@ func TestCachingStorageProvider_GetSyncedNodes(t *testing.T) { require.NoError(t, err) // populated cache should return cached value - replicaPath, storages, err = cache.GetConsistentStorages(ctx, "vs", "/repo/path") + replicaPath, storages, _, err = cache.GetConsistentStorages(ctx, "vs", "/repo/path") require.NoError(t, err) require.Equal(t, map[string]struct{}{"g1": {}, "g2": {}, "g3": {}}, storages) require.Equal(t, "replica-path", replicaPath) @@ -99,7 +99,7 @@ func TestCachingStorageProvider_GetSyncedNodes(t *testing.T) { require.NoError(t, err) cache.Connected() - _, _, err = cache.GetConsistentStorages(ctx, "vs", "/repo/path") + _, _, _, err = cache.GetConsistentStorages(ctx, "vs", "/repo/path") require.Equal(t, 
commonerr.NewRepositoryNotFoundError("vs", "/repo/path"), err) // "populate" metric is not set as there was an error and we don't want this result to be cached @@ -126,7 +126,7 @@ func TestCachingStorageProvider_GetSyncedNodes(t *testing.T) { cache.Connected() // first access populates the cache - replicaPath, storages1, err := cache.GetConsistentStorages(ctx, "vs", "/repo/path/1") + replicaPath, storages1, _, err := cache.GetConsistentStorages(ctx, "vs", "/repo/path/1") require.NoError(t, err) require.Equal(t, map[string]struct{}{"g1": {}, "g2": {}, "g3": {}}, storages1) require.Equal(t, "replica-path", replicaPath) @@ -137,19 +137,19 @@ func TestCachingStorageProvider_GetSyncedNodes(t *testing.T) { expErr := json.Unmarshal([]byte(notification.Payload), new(struct{})) // second access omits cached data as caching should be disabled - replicaPath, storages2, err := cache.GetConsistentStorages(ctx, "vs", "/repo/path/1") + replicaPath, storages2, _, err := cache.GetConsistentStorages(ctx, "vs", "/repo/path/1") require.NoError(t, err) require.Equal(t, map[string]struct{}{"g1": {}, "g2": {}, "g3": {}}, storages2) require.Equal(t, "replica-path", replicaPath) // third access retrieves data and caches it - replicaPath, storages3, err := cache.GetConsistentStorages(ctx, "vs", "/repo/path/1") + replicaPath, storages3, _, err := cache.GetConsistentStorages(ctx, "vs", "/repo/path/1") require.NoError(t, err) require.Equal(t, map[string]struct{}{"g1": {}, "g2": {}, "g3": {}}, storages3) require.Equal(t, "replica-path", replicaPath) // fourth access retrieves data from cache - replicaPath, storages4, err := cache.GetConsistentStorages(ctx, "vs", "/repo/path/1") + replicaPath, storages4, _, err := cache.GetConsistentStorages(ctx, "vs", "/repo/path/1") require.NoError(t, err) require.Equal(t, map[string]struct{}{"g1": {}, "g2": {}, "g3": {}}, storages4) require.Equal(t, "replica-path", replicaPath) @@ -185,11 +185,11 @@ func TestCachingStorageProvider_GetSyncedNodes(t *testing.T) 
{ cache.Connected() // first access populates the cache - replicaPath, path1Storages1, err := cache.GetConsistentStorages(ctx, "vs", "/repo/path/1") + replicaPath, path1Storages1, _, err := cache.GetConsistentStorages(ctx, "vs", "/repo/path/1") require.NoError(t, err) require.Equal(t, map[string]struct{}{"g1": {}, "g2": {}, "g3": {}}, path1Storages1) require.Equal(t, "replica-path-1", replicaPath) - replicaPath, path2Storages1, err := cache.GetConsistentStorages(ctx, "vs", "/repo/path/2") + replicaPath, path2Storages1, _, err := cache.GetConsistentStorages(ctx, "vs", "/repo/path/2") require.NoError(t, err) require.Equal(t, map[string]struct{}{"g1": {}, "g2": {}}, path2Storages1) require.Equal(t, "replica-path-2", replicaPath) @@ -203,12 +203,12 @@ func TestCachingStorageProvider_GetSyncedNodes(t *testing.T) { ) // second access re-uses cached data for '/repo/path/1' - replicaPath1, path1Storages2, err := cache.GetConsistentStorages(ctx, "vs", "/repo/path/1") + replicaPath1, path1Storages2, _, err := cache.GetConsistentStorages(ctx, "vs", "/repo/path/1") require.NoError(t, err) require.Equal(t, map[string]struct{}{"g1": {}, "g2": {}, "g3": {}}, path1Storages2) require.Equal(t, "replica-path-1", replicaPath1) // second access populates the cache again for '/repo/path/2' - replicaPath, path2Storages2, err := cache.GetConsistentStorages(ctx, "vs", "/repo/path/2") + replicaPath, path2Storages2, _, err := cache.GetConsistentStorages(ctx, "vs", "/repo/path/2") require.NoError(t, err) require.Equal(t, map[string]struct{}{"g1": {}, "g2": {}}, path2Storages2) require.Equal(t, "replica-path-2", replicaPath) @@ -235,7 +235,7 @@ func TestCachingStorageProvider_GetSyncedNodes(t *testing.T) { cache.Connected() // first access populates the cache - replicaPath, storages1, err := cache.GetConsistentStorages(ctx, "vs", "/repo/path") + replicaPath, storages1, _, err := cache.GetConsistentStorages(ctx, "vs", "/repo/path") require.NoError(t, err) require.Equal(t, 
map[string]struct{}{"g1": {}, "g2": {}, "g3": {}}, storages1) require.Equal(t, "replica-path", replicaPath) @@ -244,7 +244,7 @@ func TestCachingStorageProvider_GetSyncedNodes(t *testing.T) { cache.Disconnect(assert.AnError) // second access retrieve data and doesn't populate the cache - replicaPath, storages2, err := cache.GetConsistentStorages(ctx, "vs", "/repo/path") + replicaPath, storages2, _, err := cache.GetConsistentStorages(ctx, "vs", "/repo/path") require.NoError(t, err) require.Equal(t, map[string]struct{}{"g1": {}, "g2": {}, "g3": {}}, storages2) require.Equal(t, "replica-path", replicaPath) @@ -279,12 +279,12 @@ func TestCachingStorageProvider_GetSyncedNodes(t *testing.T) { switch i % 6 { case 0, 1: f = func() { - _, _, err := cache.GetConsistentStorages(ctx, "vs", "/repo/path/1") + _, _, _, err := cache.GetConsistentStorages(ctx, "vs", "/repo/path/1") assert.NoError(t, err) } case 2, 3: f = func() { - _, _, err := cache.GetConsistentStorages(ctx, "vs", "/repo/path/2") + _, _, _, err := cache.GetConsistentStorages(ctx, "vs", "/repo/path/2") assert.NoError(t, err) } case 4: diff --git a/internal/praefect/grpc-proxy/proxy/cache.go b/internal/praefect/grpc-proxy/proxy/cache.go new file mode 100644 index 00000000000..94efb26283f --- /dev/null +++ b/internal/praefect/grpc-proxy/proxy/cache.go @@ -0,0 +1,323 @@ +package proxy + +import ( + "container/list" + "io" + "os" + "strconv" + "sync" + + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + + "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus/ctxlogrus" + "github.com/sirupsen/logrus" + + "github.com/prometheus/client_golang/prometheus" + "google.golang.org/grpc" + "google.golang.org/grpc/metadata" +) + +var cache *lru + +func init() { + cacheSize := os.Getenv("CACHE_SIZE") + if cacheSize == "" { + cacheSize = "8" + } + cacheSizeGB, err := strconv.Atoi(cacheSize) + if err != nil { + panic(err) + } + cache = newLRU(cacheSizeGB << 30) + prometheus.MustRegister(cache) +} + +type 
cacheData struct { + meta metadata.MD + frames []*frame + size int + error error +} + +type cacheStream struct { + grpc.ServerStream + cacheKey *string + data *cacheData + firstMessage bool + tooLong bool +} + +func tryCache(key string, serverStream grpc.ServerStream) (bool, error) { + cache.requestTotals.Inc() + cacheData, ok := cache.get(key) + if !ok { + cache.missTotals.Inc() + return false, nil + } + + ctxlogrus.AddFields(serverStream.Context(), logrus.Fields{"cache": "true"}) + + if cacheData.error != nil { + return true, cacheData.error + } + + cache.bytesFetchedtotals.Add(float64(cacheData.size)) + if err := serverStream.SendHeader(cacheData.meta); err != nil { + return false, err + } + for _, frame := range cacheData.frames { + f := frame + if err := serverStream.SendMsg(f); err != nil { + return false, err + } + } + return true, nil +} + +func newCacheGenStream(key *string, srv grpc.ServerStream) *cacheStream { + c := &cacheStream{ + cacheKey: key, + ServerStream: srv, + tooLong: false, + firstMessage: false, + } + + if key != nil { + c.data = &cacheData{ + size: 0, + frames: []*frame{}, + } + } + + return c +} + +func (s *cacheStream) RecvMsg(m interface{}) error { + if s.cacheKey == nil { + return s.ServerStream.RecvMsg(m) + } + + // We choose to use the first message of the client as cache key + // More than one message would invalidate the cache key + if !s.firstMessage { + s.firstMessage = true + } else { + s.cacheKey = nil + s.tooLong = true + } + + return s.ServerStream.RecvMsg(m) +} + +func (s *cacheStream) SendHeader(md metadata.MD) error { + if s.cacheKey != nil { + s.data.meta = md + } + return s.ServerStream.SendHeader(md) +} + +func (s *cacheStream) SendMsg(m interface{}) error { + if s.cacheKey != nil { + f := m.(*frame) + payload := make([]byte, len(f.payload)) + copy(payload, f.payload) + s.data.frames = append(s.data.frames, &frame{payload: payload}) + s.data.size += len(f.payload) + } + return s.ServerStream.SendMsg(m) +} + +func (s 
*cacheStream) Close(err error) { + if s.tooLong { + cache.missTooLongTotals.Inc() + cache.missTotals.Inc() + return + } + if s.cacheKey == nil { + return + } + + if err != nil && err != io.EOF { + status := status.Convert(err) + if status.Code() != codes.NotFound { + return + } + s.data.error = err + // Trying to compute the "error size" in memory (4 bytes for the int32) + s.data.size = len(status.Message()) + 4 + } + + cache.add(*s.cacheKey, s.data) + size := float64(s.data.size) + cache.cacheSizeHistogram.Observe(size) + cache.bytesStoredtotals.Add(size) +} + +type lru struct { + sync.RWMutex + cacheSizeHistogram prometheus.Histogram + cacheSize prometheus.Gauge + requestTotals prometheus.Counter + missTotals prometheus.Counter + bytesStoredtotals prometheus.Counter + bytesFetchedtotals prometheus.Counter + evictionIteratorSummary prometheus.Summary + missTooLongTotals prometheus.Counter + size int + currentSize int + evictList *list.List + items map[string]*list.Element +} + +// Describe is used to describe Prometheus metrics. +func (c *lru) Describe(descs chan<- *prometheus.Desc) { + c.cacheSizeHistogram.Describe(descs) + c.cacheSize.Describe(descs) + c.requestTotals.Describe(descs) + c.missTotals.Describe(descs) + c.missTooLongTotals.Describe(descs) + c.bytesStoredtotals.Describe(descs) + c.bytesFetchedtotals.Describe(descs) + c.evictionIteratorSummary.Describe(descs) +} + +// Collect is used to collect Prometheus metrics. 
+func (c *lru) Collect(metrics chan<- prometheus.Metric) { + c.cacheSizeHistogram.Collect(metrics) + c.cacheSize.Collect(metrics) + c.requestTotals.Collect(metrics) + c.missTotals.Collect(metrics) + c.missTooLongTotals.Collect(metrics) + c.bytesStoredtotals.Collect(metrics) + c.bytesFetchedtotals.Collect(metrics) + c.evictionIteratorSummary.Collect(metrics) +} + +type entry struct { + key string + value *cacheData +} + +func newLRU(size int) *lru { + c := &lru{ + size: size, + evictList: list.New(), + items: make(map[string]*list.Element), + + cacheSizeHistogram: prometheus.NewHistogram(prometheus.HistogramOpts{ + Name: "gitaly_praefect_cache_size_histogram", + Help: "Cache size histogram", + Buckets: []float64{0, 100, 1_000, 10_000, 100_000, 1_000_000, 5_000_000, 10_000_000}, + }), + evictionIteratorSummary: prometheus.NewSummary(prometheus.SummaryOpts{ + Name: "gitaly_praefect_eviction_iterator", + Help: "Eviction iterator sumarry", + }), + cacheSize: prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "gitaly_praefect_cache_size", + Help: "Cache size", + }), + requestTotals: prometheus.NewCounter( + prometheus.CounterOpts{ + Name: "gitaly_praefect_cache_requests_total", + Help: "Total number of disk cache requests", + }, + ), + missTotals: prometheus.NewCounter( + prometheus.CounterOpts{ + Name: "gitaly_praefect_cache_miss_total", + Help: "Total number of cache misses", + }, + ), + missTooLongTotals: prometheus.NewCounter( + prometheus.CounterOpts{ + Name: "gitaly_praefect_cache_miss_too_long_total", + Help: "Total number of cache misses due to message too long", + }, + ), + bytesStoredtotals: prometheus.NewCounter( + prometheus.CounterOpts{ + Name: "gitaly_praefect_cache_bytes_stored_total", + Help: "Total number of disk cache bytes stored", + }, + ), + bytesFetchedtotals: prometheus.NewCounter( + prometheus.CounterOpts{ + Name: "gitaly_praefect_cache_bytes_fetched_total", + Help: "Total number of disk cache bytes fetched", + }, + ), + } + return c +} + +func (c 
*lru) add(key string, value *cacheData) { + c.RLock() + // Check for existing item + if ent, ok := c.items[key]; ok { + c.RUnlock() + c.Lock() + c.evictList.MoveToFront(ent) + ent.Value = &entry{key, value} + c.Unlock() + return + } + + c.RUnlock() + c.Lock() + + // Add new item + ent := &entry{key, value} + entry := c.evictList.PushFront(ent) + c.items[key] = entry + c.currentSize += value.size + c.cacheSize.Set(float64(c.currentSize)) + c.Unlock() + + for i := 0; ; i++ { + if c.currentSize > c.size { + c.removeOldest() + } else { + if i > 0 { + c.evictionIteratorSummary.Observe(float64(i)) + } + break + } + } +} + +func (c *lru) get(key string) (value *cacheData, ok bool) { + c.RLock() + if ent, ok := c.items[key]; ok { + c.RUnlock() + c.Lock() + c.evictList.MoveToFront(ent) + c.Unlock() + if ent.Value.(*entry) == nil { + return nil, false + } + return ent.Value.(*entry).value, true + } + c.RUnlock() + return +} + +func (c *lru) removeOldest() { + c.RLock() + ent := c.evictList.Back() + c.RUnlock() + if ent != nil { + c.removeElement(ent) + } +} + +func (c *lru) removeElement(e *list.Element) { + c.Lock() + defer c.Unlock() + c.evictList.Remove(e) + kv := e.Value.(*entry) + c.currentSize -= kv.value.size + c.cacheSize.Set(float64(c.currentSize)) + delete(c.items, kv.key) +} diff --git a/internal/praefect/grpc-proxy/proxy/cache_test.go b/internal/praefect/grpc-proxy/proxy/cache_test.go new file mode 100644 index 00000000000..44e892b6733 --- /dev/null +++ b/internal/praefect/grpc-proxy/proxy/cache_test.go @@ -0,0 +1,44 @@ +package proxy + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestLRU(t *testing.T) { + l := newLRU(10) + + l.add("a", &cacheData{size: 1}) + l.add("b", &cacheData{size: 2}) + l.add("c", &cacheData{size: 3}) + l.add("d", &cacheData{size: 4}) + l.add("e", &cacheData{size: 5}) + + _, ok := l.get("a") + require.Falsef(t, ok, "should not be on LRU") + + _, ok = l.get("b") + require.Falsef(t, ok, "should not be on LRU") + 
+ _, ok = l.get("c") + require.Falsef(t, ok, "should not be on LRU") + + _, ok = l.get("d") + require.Truef(t, ok, "should be on LRU") + + _, ok = l.get("e") + require.Truef(t, ok, "should be on LRU") + + l.add("f", &cacheData{size: 5}) + l.add("f", &cacheData{size: 5}) + l.add("g", &cacheData{size: 9}) + + // e should not be as it's the last one + _, ok = l.get("e") + require.Falsef(t, ok, "should not be on LRU") + + // e should not be as it's the last one + _, ok = l.get("f") + require.Falsef(t, ok, "should not be on LRU") +} diff --git a/internal/praefect/grpc-proxy/proxy/director.go b/internal/praefect/grpc-proxy/proxy/director.go index 5f887213c38..a921761727d 100644 --- a/internal/praefect/grpc-proxy/proxy/director.go +++ b/internal/praefect/grpc-proxy/proxy/director.go @@ -27,6 +27,7 @@ type StreamDirector func(ctx context.Context, fullMethodName string, peeker Stre // StreamParameters encapsulates streaming parameters the praefect coordinator returns to the // proxy handler type StreamParameters struct { + cacheKey *string primary Destination reqFinalizer func() error callOptions []grpc.CallOption @@ -49,8 +50,9 @@ type Destination struct { } // NewStreamParameters returns a new instance of StreamParameters -func NewStreamParameters(primary Destination, secondaries []Destination, reqFinalizer func() error, callOpts []grpc.CallOption) *StreamParameters { +func NewStreamParameters(primary Destination, secondaries []Destination, reqFinalizer func() error, callOpts []grpc.CallOption, cacheKey *string) *StreamParameters { return &StreamParameters{ + cacheKey: cacheKey, primary: primary, secondaries: secondaries, reqFinalizer: reqFinalizer, diff --git a/internal/praefect/grpc-proxy/proxy/examples_test.go b/internal/praefect/grpc-proxy/proxy/examples_test.go index b8d5759bdbe..81833cc9f8b 100644 --- a/internal/praefect/grpc-proxy/proxy/examples_test.go +++ b/internal/praefect/grpc-proxy/proxy/examples_test.go @@ -51,13 +51,13 @@ func ExampleStreamDirector() { 
return proxy.NewStreamParameters(proxy.Destination{ Conn: conn, Ctx: metadata.IncomingToOutgoing(ctx), - }, nil, nil, nil), err + }, nil, nil, nil, nil), err } else if val, exists := md[":authority"]; exists && val[0] == "api.example.com" { conn, err := grpc.DialContext(ctx, "api-service.prod.svc.local", grpc.WithDefaultCallOptions(grpc.ForceCodec(proxy.NewCodec()))) return proxy.NewStreamParameters(proxy.Destination{ Conn: conn, Ctx: metadata.IncomingToOutgoing(ctx), - }, nil, nil, nil), err + }, nil, nil, nil, nil), err } } return nil, status.Errorf(codes.Unimplemented, "Unknown method") diff --git a/internal/praefect/grpc-proxy/proxy/handler.go b/internal/praefect/grpc-proxy/proxy/handler.go index 82c252526af..7f0b4be9ef2 100644 --- a/internal/praefect/grpc-proxy/proxy/handler.go +++ b/internal/praefect/grpc-proxy/proxy/handler.go @@ -130,6 +130,18 @@ func (s *handler) handler(srv interface{}, serverStream grpc.ServerStream) (fina } }() + // Cache handling, if params contains a cache key, we try to use the cache + if params.cacheKey != nil { + ok, err := tryCache(*params.cacheKey, serverStream) + if ok { + return err + } + if err != nil { + failDestinationsWithError(params, fmt.Errorf("cache stream error: %w", err)) + return err + } + } + clientCtx, clientCancel := context.WithCancel(params.Primary().Ctx) defer clientCancel() // TODO(mwitkow): Add a `forwarded` header to metadata, https://en.wikipedia.org/wiki/X-Forwarded-For. @@ -168,8 +180,10 @@ func (s *handler) handler(srv interface{}, serverStream grpc.ServerStream) (fina // Explicitly *do not close* p2cErrChan and c2sErrChan, otherwise the select below will not terminate. 
// Channels do not have to be closed, it is just a control flow mechanism, see // https://groups.google.com/forum/#!msg/golang-nuts/pZwdYRGxCIk/qpbHxRRPJdUJ - c2sErrChan := forwardClientToServers(serverStream, allStreams) - p2cErrChan := forwardPrimaryToClient(primaryStream, serverStream) + // CacheGenStream is a cache generator that will wrap `serverStream` method in order to create a cache + cacheGenStream := newCacheGenStream(params.cacheKey, serverStream) + c2sErrChan := forwardClientToServers(cacheGenStream, allStreams) + p2cErrChan := forwardPrimaryToClient(primaryStream, cacheGenStream) secondaryErrChan := receiveSecondaryStreams(secondaryStreams) // We need to wait for the streams from the primary and secondaries. However, we don't need @@ -214,6 +228,9 @@ func (s *handler) handler(srv interface{}, serverStream grpc.ServerStream) (fina return status.Errorf(codes.Internal, "failed proxying c2s: %v", clientErr) } + // We only persist this cache if everything went ok. + cacheGenStream.Close(primaryErr) + if primaryErr != nil && primaryErr != io.EOF { // we must not propagate Gitaly errors into Sentry sentryhandler.MarkToSkip(serverStream.Context()) diff --git a/internal/praefect/grpc-proxy/proxy/handler_ext_test.go b/internal/praefect/grpc-proxy/proxy/handler_ext_test.go index 7e5234fcd5c..5a323a20c01 100644 --- a/internal/praefect/grpc-proxy/proxy/handler_ext_test.go +++ b/internal/praefect/grpc-proxy/proxy/handler_ext_test.go @@ -245,7 +245,7 @@ func setupProxy(t *testing.T) (context.Context, grpc_testing.TestServiceClient, md, ok := grpc_metadata.FromIncomingContext(ctx) if ok { if _, exists := md[rejectingMdKey]; exists { - return proxy.NewStreamParameters(proxy.Destination{Ctx: metadata.IncomingToOutgoing(ctx), Msg: payload}, nil, nil, nil), status.Errorf(codes.PermissionDenied, "testing rejection") + return proxy.NewStreamParameters(proxy.Destination{Ctx: metadata.IncomingToOutgoing(ctx), Msg: payload}, nil, nil, nil, nil), 
status.Errorf(codes.PermissionDenied, "testing rejection") } } @@ -254,7 +254,7 @@ func setupProxy(t *testing.T) (context.Context, grpc_testing.TestServiceClient, Ctx: metadata.IncomingToOutgoing(ctx), Conn: proxy2Server, Msg: payload, - }, nil, nil, nil), nil + }, nil, nil, nil, nil), nil } client2Proxy := newProxy(t, ctx, director, "mwitkow.testproto.TestService", "Ping") @@ -370,6 +370,7 @@ func TestProxyErrorPropagation(t *testing.T) { nil, func() error { return tc.requestFinalizerError }, nil, + nil, ), tc.directorError })), ) diff --git a/internal/praefect/grpc-proxy/proxy/peeker_test.go b/internal/praefect/grpc-proxy/proxy/peeker_test.go index 9b63653e879..d01aead1388 100644 --- a/internal/praefect/grpc-proxy/proxy/peeker_test.go +++ b/internal/praefect/grpc-proxy/proxy/peeker_test.go @@ -50,7 +50,7 @@ func TestStreamPeeking(t *testing.T) { Ctx: metadata.IncomingToOutgoing(ctx), Conn: backendCC, Msg: peekedMessage, - }, nil, nil, nil), nil + }, nil, nil, nil, nil), nil } // The backend is supposed to still receive the message as expected without any modification @@ -123,7 +123,7 @@ func TestStreamInjecting(t *testing.T) { Ctx: metadata.IncomingToOutgoing(ctx), Conn: backendCC, Msg: replacedMessage, - }, nil, nil, nil), nil + }, nil, nil, nil, nil), nil } // Upon receiving the request the backend server should only ever see the changed request. 
diff --git a/internal/praefect/middleware/errorhandler_test.go b/internal/praefect/middleware/errorhandler_test.go index 9e221e7412c..3fcc8c5deb9 100644 --- a/internal/praefect/middleware/errorhandler_test.go +++ b/internal/praefect/middleware/errorhandler_test.go @@ -82,7 +82,7 @@ func TestStreamInterceptor(t *testing.T) { t.Cleanup(func() { testhelper.MustClose(t, cc) }) f, err := peeker.Peek() require.NoError(t, err) - return proxy.NewStreamParameters(proxy.Destination{Conn: cc, Ctx: ctx, Msg: f}, nil, func() error { return nil }, nil), nil + return proxy.NewStreamParameters(proxy.Destination{Conn: cc, Ctx: ctx, Msg: f}, nil, func() error { return nil }, nil, nil), nil })), } diff --git a/internal/praefect/nodes/manager.go b/internal/praefect/nodes/manager.go index 4a869299931..174886816b1 100644 --- a/internal/praefect/nodes/manager.go +++ b/internal/praefect/nodes/manager.go @@ -262,7 +262,7 @@ func (n *Mgr) GetPrimary(ctx context.Context, virtualStorage string, _ int64) (s //nolint: revive,stylecheck // This is unintentionally missing documentation. 
func (n *Mgr) GetSyncedNode(ctx context.Context, virtualStorageName, repoPath string) (Node, error) { - _, upToDateStorages, err := n.csg.GetConsistentStorages(ctx, virtualStorageName, repoPath) + _, upToDateStorages, _, err := n.csg.GetConsistentStorages(ctx, virtualStorageName, repoPath) if err != nil && !errors.As(err, new(commonerr.RepositoryNotFoundError)) { return nil, err } diff --git a/internal/praefect/nodes/manager_test.go b/internal/praefect/nodes/manager_test.go index d59193d8baa..c25506c78be 100644 --- a/internal/praefect/nodes/manager_test.go +++ b/internal/praefect/nodes/manager_test.go @@ -326,8 +326,8 @@ func TestMgr_GetSyncedNode(t *testing.T) { verify := func(failover bool, scenario func(t *testing.T, nm Manager, rs datastore.RepositoryStore)) func(*testing.T) { conf.Failover.Enabled = failover rs := datastore.MockRepositoryStore{ - GetConsistentStoragesFunc: func(ctx context.Context, virtualStorage, relativePath string) (string, map[string]struct{}, error) { - return relativePath, consistentStorages, consistentSecondariesErr + GetConsistentStoragesFunc: func(ctx context.Context, virtualStorage, relativePath string) (string, map[string]struct{}, int, error) { + return relativePath, consistentStorages, 0, consistentSecondariesErr }, } diff --git a/internal/praefect/router.go b/internal/praefect/router.go index 139bd62f923..22f5d558a69 100644 --- a/internal/praefect/router.go +++ b/internal/praefect/router.go @@ -34,7 +34,8 @@ type RepositoryAccessorRoute struct { // ReplicaPath is the disk path where the replicas are stored. ReplicaPath string // Node contains the details of the node that should handle the request. 
- Node RouterNode + Node RouterNode + Generation int } func (r RepositoryAccessorRoute) addLogFields(ctx context.Context) { diff --git a/internal/praefect/router_node_manager.go b/internal/praefect/router_node_manager.go index 2f2a7a6e093..7d7756f78ad 100644 --- a/internal/praefect/router_node_manager.go +++ b/internal/praefect/router_node_manager.go @@ -84,7 +84,7 @@ func (r *nodeManagerRouter) RouteRepositoryMutator(ctx context.Context, virtualS // set up a database, so the RepositoryStore here is always a mock. The mock doesn't know about the replica // paths of repositories and thus returns an empty string. This breaks the tests. Instead, we'll just keep // using the relative path in NodeManagerRouter. - _, consistentStorages, err := r.rs.GetConsistentStorages(ctx, virtualStorage, relativePath) + _, consistentStorages, _, err := r.rs.GetConsistentStorages(ctx, virtualStorage, relativePath) if err != nil && !errors.As(err, new(commonerr.RepositoryNotFoundError)) { return RepositoryMutatorRoute{}, fmt.Errorf("consistent storages: %w", err) } diff --git a/internal/praefect/router_per_repository.go b/internal/praefect/router_per_repository.go index 287d8d33b43..0922ad9959a 100644 --- a/internal/praefect/router_per_repository.go +++ b/internal/praefect/router_per_repository.go @@ -153,7 +153,7 @@ func (r *PerRepositoryRouter) RouteRepositoryAccessor(ctx context.Context, virtu return RepositoryAccessorRoute{}, fmt.Errorf("get primary: %w", err) } - replicaPath, _, err := r.rs.GetConsistentStoragesByRepositoryID(ctx, repositoryID) + replicaPath, _, generation, err := r.rs.GetConsistentStoragesByRepositoryID(ctx, repositoryID) if err != nil { return RepositoryAccessorRoute{}, fmt.Errorf("get replica path: %w", err) } @@ -163,6 +163,7 @@ func (r *PerRepositoryRouter) RouteRepositoryAccessor(ctx context.Context, virtu return RepositoryAccessorRoute{ ReplicaPath: replicaPath, Node: node, + Generation: generation, }, nil } } @@ -170,7 +171,7 @@ func (r 
*PerRepositoryRouter) RouteRepositoryAccessor(ctx context.Context, virtu return RepositoryAccessorRoute{}, nodes.ErrPrimaryNotHealthy } - replicaPath, consistentStorages, err := r.csg.GetConsistentStorages(ctx, virtualStorage, relativePath) + replicaPath, consistentStorages, generation, err := r.csg.GetConsistentStorages(ctx, virtualStorage, relativePath) if err != nil { return RepositoryAccessorRoute{}, fmt.Errorf("consistent storages: %w", err) } @@ -192,6 +193,7 @@ func (r *PerRepositoryRouter) RouteRepositoryAccessor(ctx context.Context, virtu return RepositoryAccessorRoute{ ReplicaPath: replicaPath, Node: node, + Generation: generation, }, nil } @@ -239,7 +241,7 @@ func (r *PerRepositoryRouter) RouteRepositoryMutator(ctx context.Context, virtua return RepositoryMutatorRoute{}, nodes.ErrPrimaryNotHealthy } - replicaPath, consistentStorages, err := r.rs.GetConsistentStoragesByRepositoryID(ctx, repositoryID) + replicaPath, consistentStorages, _, err := r.rs.GetConsistentStoragesByRepositoryID(ctx, repositoryID) if err != nil { return RepositoryMutatorRoute{}, fmt.Errorf("consistent storages: %w", err) } diff --git a/internal/praefect/router_per_repository_test.go b/internal/praefect/router_per_repository_test.go index c4c8eb5d017..959649f4215 100644 --- a/internal/praefect/router_per_repository_test.go +++ b/internal/praefect/router_per_repository_test.go @@ -248,6 +248,7 @@ func TestPerRepositoryRouter_RouteRepositoryAccessor(t *testing.T) { require.Equal(t, RepositoryAccessorRoute{ ReplicaPath: relativePath, + Generation: 1, Node: RouterNode{ Storage: tc.node, Connection: conns[tc.virtualStorage][tc.node], diff --git a/internal/praefect/server_test.go b/internal/praefect/server_test.go index efecddbab2d..543c1398aa0 100644 --- a/internal/praefect/server_test.go +++ b/internal/praefect/server_test.go @@ -538,8 +538,8 @@ func TestRemoveRepository(t *testing.T) { cc, _, cleanup := runPraefectServer(t, ctx, praefectCfg, buildOptions{ withQueue: queueInterceptor, 
withRepoStore: datastore.MockRepositoryStore{ - GetConsistentStoragesFunc: func(ctx context.Context, virtualStorage, relativePath string) (string, map[string]struct{}, error) { - return relativePath, nil, nil + GetConsistentStoragesFunc: func(ctx context.Context, virtualStorage, relativePath string) (string, map[string]struct{}, int, error) { + return relativePath, nil, 0, nil }, }, withNodeMgr: nodeMgr, @@ -825,8 +825,8 @@ func TestProxyWrites(t *testing.T) { _, repo, _ := testcfg.BuildWithRepo(t) rs := datastore.MockRepositoryStore{ - GetConsistentStoragesFunc: func(ctx context.Context, virtualStorage, relativePath string) (string, map[string]struct{}, error) { - return relativePath, map[string]struct{}{cfg0.Storages[0].Name: {}, cfg1.Storages[0].Name: {}, cfg2.Storages[0].Name: {}}, nil + GetConsistentStoragesFunc: func(ctx context.Context, virtualStorage, relativePath string) (string, map[string]struct{}, int, error) { + return relativePath, map[string]struct{}{cfg0.Storages[0].Name: {}, cfg1.Storages[0].Name: {}, cfg2.Storages[0].Name: {}}, 0, nil }, } -- GitLab