diff --git a/doc/design_ha.md b/doc/design_ha.md
index e446dc85e8fda4cd0285a2ec03cc618b07073add..7e174612acaaacb6cdd585ac0a765f619b97fb93 100644
--- a/doc/design_ha.md
+++ b/doc/design_ha.md
@@ -11,7 +11,7 @@ The following terminology may be used within the context of the Gitaly HA projec
 - Praefect - a transparent front end to all Gitaly shards. This reverse proxy ensures that all gRPC calls are forwarded to the correct shard by consulting the coordinator. The reverse proxy also ensures that write actions are performed transactionally when needed.
   - etymology: from Latin praefectus for _a person appointed to any of various positions of command, authority, or superintendence, as a chief magistrate in ancient Rome or the chief administrative official of a department of France or Italy._
   - [pronounced _pree-fect_](https://www.youtube.com/watch?v=MHszCZjPmTQ)
-- Node (TODO: we probably need a similar latin name here) - performs the actual git read/write operations to/from disk. Has no knowledge of shards/prafects/coordinators just as the Gitaly service existed prior to HA.
+- Backend Node (a.k.a. Gitaly) - performs the actual git read/write operations to/from disk. Has no knowledge of shards/praefects/coordinators, just as the Gitaly service existed prior to HA. It sits behind a Praefect server in the request path, which is why we consider it a "backend" node.
 - RPC categories (#1496):
   - Accessor - a side effect free (or read-only) RPC; does not modify the git repo (!228)
   - Mutator - an RPC that modifies the data in the git repo (!228)
diff --git a/internal/praefect/transaction/doc.go b/internal/praefect/transaction/doc.go
new file mode 100644
index 0000000000000000000000000000000000000000..a90a456682eee9e8e9a86b13d0c9030960af2a2b
--- /dev/null
+++ b/internal/praefect/transaction/doc.go
@@ -0,0 +1,13 @@
+/*Package transaction provides transaction management functionality to
+coordinate one-to-many clients attempting to modify the shards concurrently.
+
+While Git is distributed in nature, there are some repository-wide data points
+that can conflict between replicas if something goes wrong. This includes
+references, which is why the transaction manager provides an API that allows
+an RPC transaction to read/write lock the references being accessed to prevent
+contention.
+
+Note: the current implementation still locks the whole shard rather than
+individual references (see https://gitlab.com/gitlab-org/gitaly/issues/1530).
+*/
+package transaction
diff --git a/internal/praefect/transaction/manager.go b/internal/praefect/transaction/manager.go
new file mode 100644
index 0000000000000000000000000000000000000000..7fe38442e1ef70eed6e0201d628b3d6b752da6f5
--- /dev/null
+++ b/internal/praefect/transaction/manager.go
@@ -0,0 +1,156 @@
+package transaction
+
+import (
+	"context"
+	"sync"
+
+	"golang.org/x/sync/errgroup"
+)
+
+// Verifier verifies the project repository state by performing a checksum.
+type Verifier interface {
+	// CheckSum returns a checksum of all refs for a repo.
+	CheckSum(context.Context, Repository) ([]byte, error)
+}
+
+// Coordinator allows the transaction manager to look up the shard for a repo
+// at the beginning of each transaction.
+type Coordinator interface {
+	FetchShard(ctx context.Context, repo Repository) (*Shard, error)
+}
+
+// ReplicationManager provides services to handle degraded nodes.
+type ReplicationManager interface {
+	NotifyDegradation(context.Context, Repository) error
+}
+
+// Manager tracks the progress of RPCs being applied to multiple
+// downstream servers that make up a shard. It prevents conflicts that may arise
+// from contention between multiple clients trying to modify the same
+// references.
+type Manager struct {
+	mu     sync.Mutex        // NOTE(review): not referenced by any method yet
+	shards map[string]*Shard // shards keyed by project
+
+	coordinator Coordinator
+	replman     ReplicationManager
+}
+
+// NewManager returns a manager for coordinating transactions between shards
+// that are fetched from the coordinator. Any inconsistencies found will be
+// reported to the provided replication manager.
+func NewManager(c Coordinator, r ReplicationManager) *Manager {
+	return &Manager{
+		shards:      map[string]*Shard{},
+		coordinator: c,
+		replman:     r,
+	}
+}
+
+// Mutate accepts a closure whose environment is a snapshot of the repository
+// before the transaction begins. It is the responsibility of the closure
+// author to mark all relevant assets being modified during a mutating
+// transaction to ensure they are locked and protected from other closures
+// modifying the same.
+//
+// The shard is write-locked for the whole transaction. Unless a strict
+// majority of replicas agree with the primary, ErrShardInconsistent is
+// returned and fn is never called; otherwise inconsistent nodes are reported
+// to the replication manager and excluded from the Tx and the final check.
+func (m *Manager) Mutate(ctx context.Context, repo Repository, fn func(Tx) error) error {
+	shard, err := m.coordinator.FetchShard(ctx, repo)
+	if err != nil {
+		return err
+	}
+
+	// Prevent other clients from modifying the same shard until finished.
+	// TODO: serialize access at the reference scope:
+	// https://gitlab.com/gitlab-org/gitaly/issues/1530
+	shard.lock.Lock()
+	defer shard.lock.Unlock()
+
+	omits := make(map[string]struct{})
+
+	// TODO: some smart caching needs to be done to eliminate this check. We
+	// already check the shard consistency at the end of each transaction so
+	// we shouldn't need to do it twice except for the first time we fetch a
+	// shard.
+	good, bad, err := shard.validate(ctx, omits)
+	if err != nil {
+		return err
+	}
+
+	// Require a strict majority of nodes to be consistent with the primary.
+	if len(bad) >= len(good) {
+		return ErrShardInconsistent
+	}
+
+	omits, err = m.notifyDegradations(ctx, repo, bad)
+	if err != nil {
+		return err
+	}
+
+	tx := newTx(shard, good, txCatMutator)
+
+	if err = fn(tx); err != nil {
+		return err
+	}
+
+	// Make sure all changes made to the remaining replicas are consistent.
+	_, bad, err = shard.validate(ctx, omits)
+	if err != nil {
+		return err
+	}
+
+	_, err = m.notifyDegradations(ctx, repo, bad)
+	return err
+}
+
+// notifyDegradations calls the replication manager once per degraded node,
+// concurrently, and returns the storage names of those nodes as a set.
+// NOTE(review): NotifyDegradation only receives the repo, not the node, so
+// the replication manager cannot tell which replica was degraded.
+func (m *Manager) notifyDegradations(ctx context.Context, repo Repository, degradeds []Node) (map[string]struct{}, error) {
+	reported := make(map[string]struct{}, len(degradeds))
+
+	eg, eCtx := errgroup.WithContext(ctx)
+	for _, node := range degradeds {
+		reported[node.Storage()] = struct{}{}
+
+		eg.Go(func() error {
+			return m.replman.NotifyDegradation(eCtx, repo)
+		})
+	}
+
+	return reported, eg.Wait()
+}
+
+// Access runs fn inside a read-only (accessor) transaction for repo while
+// holding the shard's read lock, so it can overlap other Access calls but not
+// a Mutate on the same shard. Inconsistent nodes are reported to the
+// replication manager and left out of the Tx handed to fn.
+func (m *Manager) Access(ctx context.Context, repo Repository, fn func(Tx) error) error {
+	shard, err := m.coordinator.FetchShard(ctx, repo)
+	if err != nil {
+		return err
+	}
+
+	shard.lock.RLock()
+	defer shard.lock.RUnlock()
+
+	// TODO: some smart caching needs to be done to eliminate this check. We
+	// already check the shard consistency at the end of each transaction so
+	// we shouldn't need to do it twice except for the first time we fetch a
+	// shard.
+	good, bad, err := shard.validate(ctx, map[string]struct{}{})
+	if err != nil {
+		return err
+	}
+
+	_, err = m.notifyDegradations(ctx, repo, bad)
+	if err != nil {
+		return err
+	}
+
+	return fn(newTx(shard, good, txCatAccessor))
+}
diff --git a/internal/praefect/transaction/mock_test.go b/internal/praefect/transaction/mock_test.go
new file mode 100644
index 0000000000000000000000000000000000000000..2aab6fcd3a42085ee579d6024d48bc6291c1aab8
--- /dev/null
+++ b/internal/praefect/transaction/mock_test.go
@@ -0,0 +1,92 @@
+package transaction_test
+
+import (
+	"context"
+	"fmt"
+	"sync"
+	"testing"
+
+	"gitlab.com/gitlab-org/gitaly/internal/praefect/transaction"
+)
+
+// Compile-time checks that the mocks implement the interfaces they replace.
+var (
+	_ transaction.Coordinator        = mockCoordinator{}
+	_ transaction.ReplicationManager = mockReplMan{}
+	_ transaction.Node               = (*mockNode)(nil)
+)
+
+// mockCoordinator serves shards from a fixed map populated by each test case.
+type mockCoordinator struct {
+	shards map[transaction.Repository]*transaction.Shard
+}
+
+func (mc mockCoordinator) FetchShard(_ context.Context, repo transaction.Repository) (*transaction.Shard, error) {
+	s, ok := mc.shards[repo]
+	if !ok {
+		return nil, fmt.Errorf("shard doesn't exist for repo %+v", repo)
+	}
+
+	return s, nil
+}
+
+// mockReplMan accepts every degradation notification and does nothing.
+type mockReplMan struct{}
+
+func (mockReplMan) NotifyDegradation(context.Context, transaction.Repository) error { return nil }
+
+// mockNode is a scripted transaction.Node: every CheckSum call for a repo
+// consumes the next value from checksums[repo].
+type mockNode struct {
+	sync.RWMutex
+
+	tb  testing.TB
+	rpc *transaction.RPC
+
+	// set the following values in test cases
+	storage   string
+	checksums map[transaction.Repository][]string
+}
+
+// CheckSum panics on a test setup problem instead of calling tb.Fatal because
+// it is invoked from errgroup goroutines, where FailNow must not be called.
+func (mn *mockNode) CheckSum(_ context.Context, repo transaction.Repository) ([]byte, error) {
+	mn.Lock()
+	defer mn.Unlock()
+
+	checksums, ok := mn.checksums[repo]
+	if !ok {
+		panic(
+			fmt.Sprintf(
+				"test setup problem: missing checksums in mock node %s for repo %+v",
+				mn.storage, repo,
+			),
+		)
+	}
+
+	if len(checksums) < 1 {
+		panic(
+			fmt.Sprintf(
+				"test setup problem: not enough checksums for mock node %s for repo %+v",
+				mn.storage, repo,
+			),
+		)
+	}
+
+	cs := checksums[0]
+	mn.checksums[repo] = checksums[1:]
+
+	mn.tb.Logf("mock node %s returning checksum %s for repo %+v", mn.storage, cs, repo)
+
+	return []byte(cs), nil
+}
+
+func (mn *mockNode) ForwardRPC(_ context.Context, rpc *transaction.RPC) error {
+	mn.Lock()
+	mn.rpc = rpc
+	mn.Unlock()
+
+	return nil
+}
+
+func (mn *mockNode) Storage() string { return mn.storage }
diff --git a/internal/praefect/transaction/shard.go b/internal/praefect/transaction/shard.go
new file mode 100644
index 0000000000000000000000000000000000000000..a583c61a7fe660f81d33c6159d6333cf8e756e68
--- /dev/null
+++ b/internal/praefect/transaction/shard.go
@@ -0,0 +1,94 @@
+package transaction
+
+import (
+	"context"
+	"errors"
+	"sync"
+
+	"golang.org/x/sync/errgroup"
+)
+
+// Shard represents a set of Gitaly replicas for a repository. Each shard has a
+// designated primary and maintains locks to resources that can cause contention
+// between clients writing to the same repo.
+type Shard struct {
+	lock            *sync.RWMutex   // all mutations require a write lock
+	repo            Repository      // the repo this replica backs
+	primary         string          // the storage location of the primary node
+	storageReplicas map[string]Node // maps storage location to a replica
+}
+
+// NewShard returns a shard for repo r whose primary is the replica hosted on
+// the given storage. Replicas are keyed by their Storage(); if several report
+// the same storage, the last one wins.
+func NewShard(r Repository, primary string, replicas []Node) *Shard {
+	sreps := make(map[string]Node, len(replicas))
+	for _, node := range replicas {
+		sreps[node.Storage()] = node
+	}
+
+	return &Shard{
+		repo:            r,
+		primary:         primary,
+		storageReplicas: sreps,
+		lock:            new(sync.RWMutex),
+	}
+}
+
+// ErrShardInconsistent indicates a mutating operation is unable to be executed
+// due to lack of quorum of consistent nodes.
+var ErrShardInconsistent = errors.New("majority of shard nodes are in inconsistent state")
+
+// validate concurrently fetches the checksum of each node in the shard and
+// compares it against the primary's. Replicas consistent with the primary are
+// returned as "good" and the rest as "bad". Storages present as keys in omits
+// are skipped entirely, which allows checking only part of the shard.
+// NOTE(review): assumes the primary is one of the replicas and not omitted;
+// otherwise every node is compared against an empty checksum.
+func (s Shard) validate(ctx context.Context, omits map[string]struct{}) (good, bad []Node, err error) {
+	var (
+		mu        sync.Mutex
+		checksums = make(map[string][]byte, len(s.storageReplicas))
+	)
+
+	eg, eCtx := errgroup.WithContext(ctx)
+
+	for storage, node := range s.storageReplicas {
+		if _, skip := omits[storage]; skip {
+			continue
+		}
+
+		// Rescope both iterator vars for the goroutine closure: before Go 1.22
+		// they are shared across iterations, so node could be the wrong replica.
+		storage, node := storage, node
+
+		eg.Go(func() error {
+			cs, err := node.CheckSum(eCtx, s.repo)
+			if err != nil {
+				return err
+			}
+
+			mu.Lock()
+			checksums[storage] = cs
+			mu.Unlock()
+
+			return nil
+		})
+	}
+
+	if err := eg.Wait(); err != nil {
+		return nil, nil, err
+	}
+
+	pCS := string(checksums[s.primary])
+	for storage, cs := range checksums {
+		n := s.storageReplicas[storage]
+		if string(cs) != pCS {
+			bad = append(bad, n)
+			continue
+		}
+		good = append(good, n)
+	}
+
+	return good, bad, nil
+}
diff --git a/internal/praefect/transaction/transaction.go b/internal/praefect/transaction/transaction.go
new file mode 100644
index 0000000000000000000000000000000000000000..36bf0c00a1880eb84d8e6b7e27a9a546e9ee750b
--- /dev/null
+++ b/internal/praefect/transaction/transaction.go
@@ -0,0 +1,113 @@
+package transaction
+
+import (
+	"context"
+	"errors"
+	"sync"
+)
+
+// Repository represents the identity and location of a repository as requested
+// by the client.
+type Repository struct {
+	ProjectHash string
+	StorageLoc  string // storage location
+}
+
+// State represents the current state of a backend node.
+// Note: in the future this may be extended to include refs.
+type State struct {
+	Checksum []byte
+}
+
+// RPC describes an RPC invocation to be forwarded to a backend node.
+// TODO: decide how to abstract an RPC invocation; this is a placeholder.
+type RPC struct{}
+
+// Node is a backend server that hosts one storage location.
+type Node interface {
+	Storage() string // storage location the node hosts
+	ForwardRPC(ctx context.Context, rpc *RPC) error
+	CheckSum(context.Context, Repository) ([]byte, error)
+}
+
+// txCategory distinguishes read-only (accessor) from mutating transactions.
+type txCategory int
+
+const (
+	txCatAccessor txCategory = iota
+	txCatMutator
+)
+
+// Tx is the handle passed to the closures run by Manager.Mutate and
+// Manager.Access. It only exposes the replicas that were consistent with the
+// primary when the transaction started.
+type Tx struct {
+	category txCategory
+
+	shard *Shard
+
+	replicas map[string]Node // only nodes verified to be consistent
+
+	// unlocks contains callbacks to unlock all locks acquired
+	unlocks struct {
+		*sync.RWMutex
+		m map[string]func()
+	}
+
+	// refRollbacks contains all reference values before modification
+	refRollbacks map[string][]byte
+}
+
+// newTx builds a transaction of the given category over the good replicas.
+func newTx(shard *Shard, good []Node, txCat txCategory) Tx {
+	replicas := make(map[string]Node, len(good))
+	for _, node := range good {
+		replicas[node.Storage()] = node
+	}
+
+	tx := Tx{
+		category: txCat,
+		shard:    shard,
+		replicas: replicas,
+	}
+	tx.unlocks.RWMutex = new(sync.RWMutex)
+	tx.unlocks.m = make(map[string]func())
+
+	return tx
+}
+
+// rollback is meant to restore the reference values saved in refRollbacks.
+// TODO: not implemented yet; it currently does nothing and returns nil.
+func (t Tx) rollback(ctx context.Context) error {
+	return nil
+}
+
+// Replicas returns the replicas verified to be consistent at the start of the
+// transaction, keyed by storage location. The map is owned by the Tx and must
+// not be modified by callers.
+func (t Tx) Replicas() map[string]Node {
+	return t.replicas
+}
+
+// ErrAccessorNotPermitted is returned when an accessor (read-only) transaction
+// attempts an operation reserved for mutators.
+var ErrAccessorNotPermitted = errors.New("a mutator operation was attempted by an accessor")
+
+// ErrPrimaryUnavailable is returned by Tx.Primary when the shard's primary is
+// not among the transaction's consistent replicas.
+var ErrPrimaryUnavailable = errors.New("primary node is not among the consistent replicas")
+
+// Primary returns the shard's primary node. Only mutator transactions may
+// access it; accessor transactions get ErrAccessorNotPermitted.
+func (t Tx) Primary() (Node, error) {
+	if t.category != txCatMutator {
+		return nil, ErrAccessorNotPermitted
+	}
+
+	primary, ok := t.replicas[t.shard.primary]
+	if !ok {
+		return nil, ErrPrimaryUnavailable
+	}
+
+	return primary, nil
+}
diff --git a/internal/praefect/transaction/transaction_test.go b/internal/praefect/transaction/transaction_test.go
new file mode 100644
index 0000000000000000000000000000000000000000..ccbbae99d596acee61c7fe7e3b1e3c9ca75472a0
--- /dev/null
+++ b/internal/praefect/transaction/transaction_test.go
@@ -0,0 +1,118 @@
+package transaction_test
+
+import (
+	"context"
+	"testing"
+	"time"
+
+	"gitlab.com/gitlab-org/gitaly/internal/praefect/transaction"
+
+	"github.com/stretchr/testify/require"
+)
+
+const (
+	projA = "project-A"
+	stor1 = "storage-1"
+	stor2 = "storage-2"
+	stor3 = "storage-3"
+)
+
+// txStep is one transaction executed by a test case; steps run in order.
+type txStep struct {
+	mutator bool // run via Manager.Mutate instead of Manager.Access
+	repo    transaction.Repository
+	txFn    func(transaction.Tx) error
+}
+
+// testCase is the fixture returned by a test case's setup function.
+type testCase struct {
+	shards    map[transaction.Repository]*transaction.Shard
+	txList    []txStep
+	expectErr error
+}
+
+var (
+	repo1 = transaction.Repository{
+		ProjectHash: projA,
+		StorageLoc:  stor1,
+	}
+
+	// Each setup receives the running subtest's testing.TB, so that mocks and
+	// assertions inside transaction closures report to the right test.
+	testCases = []struct {
+		name  string
+		setup func(testing.TB) testCase
+	}{
+		{
+			name: "one node shard: no-op access tx",
+			setup: func(t testing.TB) testCase {
+				node1 := &mockNode{
+					tb:      t,
+					storage: stor1,
+					checksums: map[transaction.Repository][]string{
+						repo1: {"1"},
+					},
+				}
+
+				return testCase{
+					shards: map[transaction.Repository]*transaction.Shard{
+						repo1: transaction.NewShard(
+							repo1,
+							node1.storage,
+							[]transaction.Node{node1},
+						),
+					},
+					txList: []txStep{
+						{
+							repo: repo1,
+							txFn: func(_ transaction.Tx) error {
+								t.Log("this is a no-op transaction")
+								// checksum should get consumed during consistency
+								// check
+								require.Len(t, node1.checksums[repo1], 0)
+
+								return nil
+							},
+						},
+					},
+				}
+			},
+		},
+	}
+)
+
+func TestManager(t *testing.T) {
+	for _, tc := range testCases {
+		t.Run(tc.name, func(t *testing.T) {
+			tt := tc.setup(t)
+
+			var (
+				mc  = mockCoordinator{tt.shards}
+				mrm = mockReplMan{}
+				mgr = transaction.NewManager(mc, mrm)
+			)
+
+			ctx, cancel := context.WithTimeout(context.Background(), time.Second)
+			defer cancel()
+
+			var err error
+			for _, step := range tt.txList {
+				if step.mutator {
+					err = mgr.Mutate(ctx, step.repo, step.txFn)
+				} else {
+					err = mgr.Access(ctx, step.repo, step.txFn)
+				}
+
+				if err != nil {
+					break
+				}
+			}
+
+			if tt.expectErr != nil {
+				require.EqualError(t, err, tt.expectErr.Error())
+				return
+			}
+			require.NoError(t, err)
+		})
+	}
+}
diff --git a/vendor/golang.org/x/sync/errgroup/errgroup.go b/vendor/golang.org/x/sync/errgroup/errgroup.go
new file mode 100644
index 0000000000000000000000000000000000000000..9857fe53d3c91c527ee9b363df2c6d3ed8c52576
--- /dev/null
+++ b/vendor/golang.org/x/sync/errgroup/errgroup.go
@@ -0,0 +1,66 @@
+// Copyright 2016 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// Package errgroup provides synchronization, error propagation, and Context
+// cancelation for groups of goroutines working on subtasks of a common task.
+package errgroup
+
+import (
+	"context"
+	"sync"
+)
+
+// A Group is a collection of goroutines working on subtasks that are part of
+// the same overall task.
+//
+// A zero Group is valid and does not cancel on error.
+type Group struct {
+	cancel func()
+
+	wg sync.WaitGroup
+
+	errOnce sync.Once
+	err     error
+}
+
+// WithContext returns a new Group and an associated Context derived from ctx.
+//
+// The derived Context is canceled the first time a function passed to Go
+// returns a non-nil error or the first time Wait returns, whichever occurs
+// first.
+func WithContext(ctx context.Context) (*Group, context.Context) {
+	ctx, cancel := context.WithCancel(ctx)
+	return &Group{cancel: cancel}, ctx
+}
+
+// Wait blocks until all function calls from the Go method have returned, then
+// returns the first non-nil error (if any) from them.
+func (g *Group) Wait() error {
+	g.wg.Wait()
+	if g.cancel != nil {
+		g.cancel()
+	}
+	return g.err
+}
+
+// Go calls the given function in a new goroutine.
+//
+// The first call to return a non-nil error cancels the group; its error will be
+// returned by Wait.
+func (g *Group) Go(f func() error) {
+	g.wg.Add(1)
+
+	go func() {
+		defer g.wg.Done()
+
+		if err := f(); err != nil {
+			g.errOnce.Do(func() {
+				g.err = err
+				if g.cancel != nil {
+					g.cancel()
+				}
+			})
+		}
+	}()
+}
diff --git a/vendor/vendor.json b/vendor/vendor.json
index 64ca3b25ba754cee2a7b5d1c4b7de1f289c76740..a8bf94f7d943a961abc8c0ba6e402ef3e1532245 100644
--- a/vendor/vendor.json
+++ b/vendor/vendor.json
@@ -588,6 +588,12 @@
 			"revision": "c44066c5c816ec500d459a2a324a753f78531ae0",
 			"revisionTime": "2018-10-29T02:24:31Z"
 		},
+		{
+			"checksumSHA1": "iEK5hCRfrkdc1JOJsaiWuymHmeQ=",
+			"path": "golang.org/x/sync/errgroup",
+			"revision": "e225da77a7e68af35c70ccbf71af2b83e6acac3c",
+			"revisionTime": "2019-02-15T22:36:53Z"
+		},
 		{
 			"checksumSHA1": "vcN67ZjTbGpLLwSghHCbAEvmzMo=",
 			"path": "golang.org/x/sync/semaphore",