From cc11fcf106da33c16a84972aa9d9e6b72ec952c6 Mon Sep 17 00:00:00 2001
From: Paul Okstad
Date: Mon, 25 Feb 2019 08:03:55 -0800
Subject: [PATCH 1/6] update design doc for replication

---
 doc/design_ha.md             |  2 +-
 internal/praefect/replman.go | 53 ++++++++++++++++++++++++++++++++++++
 2 files changed, 54 insertions(+), 1 deletion(-)
 create mode 100644 internal/praefect/replman.go

diff --git a/doc/design_ha.md b/doc/design_ha.md
index e446dc85e8f..7e174612aca 100644
--- a/doc/design_ha.md
+++ b/doc/design_ha.md
@@ -11,7 +11,7 @@ The following terminology may be used within the context of the Gitaly HA projec
 - Praefect - a transparent front end to all Gitaly shards. This reverse proxy ensures that all gRPC calls are forwarded to the correct shard by consulting the coordinator. The reverse proxy also ensures that write actions are performed transactionally when needed.
   - etymology: from Latin praefectus for _a person appointed to any of various positions of command, authority, or superintendence, as a chief magistrate in ancient Rome or the chief administrative official of a department of France or Italy._
   - [pronounced _pree-fect_](https://www.youtube.com/watch?v=MHszCZjPmTQ)
-- Node (TODO: we probably need a similar latin name here) - performs the actual git read/write operations to/from disk. Has no knowledge of shards/prafects/coordinators just as the Gitaly service existed prior to HA.
+- Backend Node (a.k.a. Gitaly) - performs the actual git read/write operations to/from disk. Has no knowledge of shards/praefects/coordinators, behaving just as the Gitaly service did prior to HA. It sits behind a Praefect server in relation to a request, which is why we consider it a "backend" node.
 - RPC categories (#1496):
   - Accessor - a side effect free (or read-only) RPC; does not modify the git repo (!228)
   - Mutator - an RPC that modifies the data in the git repo (!228)
diff --git a/internal/praefect/replman.go b/internal/praefect/replman.go
new file mode 100644
index 00000000000..e202c219eca
--- /dev/null
+++ b/internal/praefect/replman.go
@@ -0,0 +1,53 @@
+package praefect
+
+import (
+	"context"
+	"sync"
+)
+
+// ReplicationManager tracks the progress of RPCs being applied to multiple
+// downstream servers that make up a shard.
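+//
+// A sketch of the intended flow (an assumption; not implemented in this
+// commit): when a mutator RPC arrives, StartMutation records the target
+// project and storage, the proxied RPC is applied to every replica in the
+// shard, and Done or Abort resolves the mutation. isConsistent can then
+// compare each replica's state against the primary to detect divergence.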
+type ReplicationManager struct {
+	mu     sync.Mutex
+	shards map[string]*Shard // maps a project to a shard
+}
+
+type State struct {
+	Checksum []byte
+}
+
+type Shard struct {
+	primary string // the storage location for the primary replica
+
+	// replicas maps a storage location to a state
+	replicas map[string]State
+}
+
+func (s *Shard) isConsistent() bool {
+	for _, r := range s.replicas {
+		// TODO: compare each replica's state against the primary's
+		_ = r
+	}
+	return true
+}
+
+type MutationTracker struct {
+}
+
+func (mt *MutationTracker) Abort() {
+}
+
+func (mt *MutationTracker) Done() {
+}
+
+func (rm *ReplicationManager) StartMutation(ctx context.Context, project, storage string) {
+}
+
+func (rm *ReplicationManager) Access(ctx context.Context) {
+}
--
GitLab


From 8ab8a9990da7da7d1bc50c7437de6dac4313caa5 Mon Sep 17 00:00:00 2001
From: Paul Okstad
Date: Wed, 13 Mar 2019 00:28:01 -0700
Subject: [PATCH 2/6] PoC for transaction manager

---
 internal/praefect/transaction.go              | 324 ++++++++++++++++++
 vendor/golang.org/x/sync/errgroup/errgroup.go |  66 ++++
 vendor/vendor.json                            |   6 +
 3 files changed, 396 insertions(+)
 create mode 100644 internal/praefect/transaction.go
 create mode 100644 vendor/golang.org/x/sync/errgroup/errgroup.go

diff --git a/internal/praefect/transaction.go b/internal/praefect/transaction.go
new file mode 100644
index 00000000000..27ba57ba6b7
--- /dev/null
+++ b/internal/praefect/transaction.go
@@ -0,0 +1,324 @@
+/*Package praefect provides transaction management functionality to coordinate
+one-to-many clients attempting to modify the shards concurrently.
+
+While Git is distributed in nature, there are some repository wide data points
+that can conflict between replicas if something goes wrong. This includes
+references, which is why the transaction manager provides an API that allows
+an RPC transaction to read/write lock the references being accessed to prevent
+contention.
+*/
+package praefect
+
+import (
+	"context"
+	"errors"
+	"sync"
+
+	"golang.org/x/sync/errgroup"
+)
+
+// Repository represents the identity and location of a repository as requested
+// by the client
+type Repository struct {
+	ProjectHash string
+	StorageLoc  string // storage location
+}
+
+// Verifier verifies the project repository state
+type Verifier interface {
+	// CheckSum will return the checksum for a project in a storage location
+	CheckSum(context.Context, Repository) ([]byte, error)
+}
+
+// ReplicationManager provides services to handle degraded nodes
+type ReplicationManager interface {
+	NotifyDegradation(context.Context, Repository, Node) error
+}
+
+// TransactionManager tracks the progress of RPCs being applied to multiple
+// downstream servers that make up a shard. It prevents conflicts that may arise
+// from contention between multiple clients trying to modify the same
+// references.
+type TransactionManager struct {
+	mu     sync.Mutex
+	shards map[string]*Shard // shards keyed by project
+
+	verifier    Verifier
+	coordinator Coordinator
+	replman     ReplicationManager
+}
+
+func NewTransactionManager(v Verifier, c Coordinator, r ReplicationManager) *TransactionManager {
+	return &TransactionManager{
+		shards:      map[string]*Shard{},
+		verifier:    v,
+		coordinator: c,
+		replman:     r,
+	}
+}
+
+// State represents the current state of a backend node
+// Note: in the future this may be extended to include refs
+type State struct {
+	Checksum []byte
+}
+
+type RPC struct {
+	// @zj how do we abstract an RPC invocation? 
🤔 +} + +type Node interface { + Storage() string // storage location the node hosts + ForwardRPC(ctx context.Context, rpc *RPC) error +} + +// Shard represents +type Shard struct { + repo Repository + + primary string // the designated primary node name + + // Replicas maps a storage location to the node replicas + storageReplicas map[string]Node + + // refLocks coordinates changes between many clients attempting to mutate + // a reference + refLocks map[string]*sync.RWMutex + + // used to check shard for inconsistencies + verifier Verifier +} + +// ErrShardInconsistent indicates a mutating operation is unable to be executed +// due to lack of quorum of consistent nodes. +var ErrShardInconsistent = errors.New("majority of shard nodes are in inconsistent state") + +func (s Shard) validate(ctx context.Context) (good, bad []Node, err error) { + var ( + mu sync.RWMutex + checksums = map[string][]byte{} + ) + + eg, eCtx := errgroup.WithContext(ctx) + + for storage, _ := range s.storageReplicas { + storage := storage // rescope iterator vars + + eg.Go(func() error { + cs, err := s.verifier.CheckSum(eCtx, s.repo) + if err != nil { + return err + } + + mu.Lock() + checksums[storage] = cs + mu.Unlock() + + return nil + }) + + } + + if err := eg.Wait(); err != nil { + return nil, nil, err + } + + pCS := string(checksums[s.primary]) + for storage, cs := range checksums { + n := s.storageReplicas[storage] + if string(cs) != pCS { + bad = append(bad, n) + continue + } + good = append(good, n) + } + + return good, bad, nil +} + +// MutateTx represents the resources available during a mutator transaction +type MutateTx interface { + AccessTx + // LockRef acquires a write lock on a reference + LockRef(context.Context, string) error + Shard() *Shard +} + +// AccessTx represents the resources available during an accessor transaction +type AccessTx interface { + // Replicas returns all replicas without distinguishing the primary + Replicas() map[string]Node + // RLockRef acquires a read lock on a reference + RLockRef(context.Context, string) error +} + +type transaction struct { + shard *Shard + refLocks map[string]*sync.RWMutex +} + +func (t transaction) Shard() *Shard { return t.shard } + +// LockRef attempts to acquire a write lock on the reference name provided +// (ref). If the context is cancelled first, the lock acquisition will be +// aborted. +func (t transaction) LockRef(ctx context.Context, ref string) error { + lockQ := make(chan *sync.RWMutex) + + go func() { + l := t.shard.refLocks[ref] + l.Lock() + lockQ <- l + }() + + // ensure lockQ is consumed in all code paths so that goroutine doesn't + // stray + select { + + case <-ctx.Done(): + // unlock before aborting + l := <-lockQ + l.Unlock() + + return ctx.Err() + + case l := <-lockQ: + t.refLocks[ref] = l + + } + + return nil +} + +// unlockAll will unlock all acquired locks at the end of a transaction +func (t transaction) unlockAll() { + // unlock all refs + for _, rl := range t.refLocks { + rl.Unlock() + } +} + +func (t transaction) Replicas() map[string]Node { + return t.shard.storageReplicas +} + +// RLockRef attempts to acquire a read lock on the reference name provided +// (ref). If the context is cancelled first, the lock acquisition will be +// aborted. 
+func (t transaction) RLockRef(ctx context.Context, ref string) error { + lockQ := make(chan *sync.RWMutex) + + go func() { + l := t.shard.refLocks[ref] + l.RLock() + lockQ <- l + }() + + // ensure lockQ is consumed in all code paths so that goroutine doesn't + // stray + select { + + case <-ctx.Done(): + // unlock before aborting + l := <-lockQ + l.RUnlock() + + return ctx.Err() + + case l := <-lockQ: + t.refLocks[ref] = l + + } + + return nil +} + +// Mutate accepts a closure whose environment is a snapshot of the repository +// before the transaction begins. It is the responsibility of the closure +// author to mark all relevant assets being modified during a mutating +// transaction to ensure they are locked and protected from other closures +// modifying the same. +func (tm *TransactionManager) Mutate(ctx context.Context, repo Repository, fn func(MutateTx) error) error { + shard, err := tm.coordinator.FetchShard(ctx, repo) + if err != nil { + return err + } + + good, bad, err := shard.validate(ctx) + if err != nil { + return err + } + + // are a majority of the nodes good (consistent)? + if len(bad) >= len(good) { + return ErrShardInconsistent + } + + eg, eCtx := errgroup.WithContext(ctx) + for _, node := range bad { + node := node // rescope iterator var for goroutine closure + + eg.Go(func() error { + return tm.replman.NotifyDegradation(eCtx, repo, node) + }) + } + + tx := transaction{ + shard: shard, + refLocks: map[string]*sync.RWMutex{}, + } + defer tx.unlockAll() + + // run the transaction function + err = fn(tx) + if err != nil { + return err + } + + return nil +} + +func (tm *TransactionManager) Access(ctx context.Context, repo Repository, fn func(AccessTx) error) error { + shard, err := tm.coordinator.FetchShard(ctx, repo) + if err != nil { + return err + } + + good, bad, err := shard.validate(ctx) + if err != nil { + return err + } + + // are a majority of the nodes good (consistent)? + if len(bad) >= len(good) { + return ErrShardInconsistent + } + + eg, eCtx := errgroup.WithContext(ctx) + for _, node := range bad { + node := node // rescope iterator var for goroutine closure + + eg.Go(func() error { + return tm.replman.NotifyDegradation(eCtx, repo, node) + }) + } + + tx := transaction{ + shard: shard, + refLocks: map[string]*sync.RWMutex{}, + } + defer tx.unlockAll() + + // run the transaction function + err = fn(tx) + if err != nil { + return err + } + + return nil +} + +func (c *Coordinator) FetchShard(ctx context.Context, repo Repository) (*Shard, error) { + // TODO: move this to coordinator.go + return nil, nil +} diff --git a/vendor/golang.org/x/sync/errgroup/errgroup.go b/vendor/golang.org/x/sync/errgroup/errgroup.go new file mode 100644 index 00000000000..9857fe53d3c --- /dev/null +++ b/vendor/golang.org/x/sync/errgroup/errgroup.go @@ -0,0 +1,66 @@ +// Copyright 2016 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// Package errgroup provides synchronization, error propagation, and Context +// cancelation for groups of goroutines working on subtasks of a common task. +package errgroup + +import ( + "context" + "sync" +) + +// A Group is a collection of goroutines working on subtasks that are part of +// the same overall task. +// +// A zero Group is valid and does not cancel on error. +type Group struct { + cancel func() + + wg sync.WaitGroup + + errOnce sync.Once + err error +} + +// WithContext returns a new Group and an associated Context derived from ctx. 
+//
+// The derived Context is canceled the first time a function passed to Go
+// returns a non-nil error or the first time Wait returns, whichever occurs
+// first.
+func WithContext(ctx context.Context) (*Group, context.Context) {
+	ctx, cancel := context.WithCancel(ctx)
+	return &Group{cancel: cancel}, ctx
+}
+
+// Wait blocks until all function calls from the Go method have returned, then
+// returns the first non-nil error (if any) from them.
+func (g *Group) Wait() error {
+	g.wg.Wait()
+	if g.cancel != nil {
+		g.cancel()
+	}
+	return g.err
+}
+
+// Go calls the given function in a new goroutine.
+//
+// The first call to return a non-nil error cancels the group; its error will be
+// returned by Wait.
+func (g *Group) Go(f func() error) {
+	g.wg.Add(1)
+
+	go func() {
+		defer g.wg.Done()
+
+		if err := f(); err != nil {
+			g.errOnce.Do(func() {
+				g.err = err
+				if g.cancel != nil {
+					g.cancel()
+				}
+			})
+		}
+	}()
+}
diff --git a/vendor/vendor.json b/vendor/vendor.json
index 64ca3b25ba7..a8bf94f7d94 100644
--- a/vendor/vendor.json
+++ b/vendor/vendor.json
@@ -588,6 +588,12 @@
 			"revision": "c44066c5c816ec500d459a2a324a753f78531ae0",
 			"revisionTime": "2018-10-29T02:24:31Z"
 		},
+		{
+			"checksumSHA1": "iEK5hCRfrkdc1JOJsaiWuymHmeQ=",
+			"path": "golang.org/x/sync/errgroup",
+			"revision": "e225da77a7e68af35c70ccbf71af2b83e6acac3c",
+			"revisionTime": "2019-02-15T22:36:53Z"
+		},
 		{
 			"checksumSHA1": "vcN67ZjTbGpLLwSghHCbAEvmzMo=",
 			"path": "golang.org/x/sync/semaphore",
--
GitLab


From 3561b41098f34d364ccc544e4d54e9f8d1b953aa Mon Sep 17 00:00:00 2001
From: Paul Okstad
Date: Wed, 13 Mar 2019 00:49:33 -0700
Subject: [PATCH 3/6] remove old replman

---
 internal/praefect/replman.go | 53 ------------------------------------
 1 file changed, 53 deletions(-)
 delete mode 100644 internal/praefect/replman.go

diff --git a/internal/praefect/replman.go b/internal/praefect/replman.go
deleted file mode 100644
index e202c219eca..00000000000
--- a/internal/praefect/replman.go
+++ /dev/null
@@ -1,53 +0,0 @@
-package praefect
-
-import (
-	"context"
-	"sync"
-)
-
-// ReplicationManager tracks the progress of RPCs being applied to multiple
-// downstream servers that make up a shard.
-//
-// A sketch of the intended flow (an assumption; not implemented in this
-// commit): when a mutator RPC arrives, StartMutation records the target
-// project and storage, the proxied RPC is applied to every replica in the
-// shard, and Done or Abort resolves the mutation. isConsistent can then
-// compare each replica's state against the primary to detect divergence.
-type ReplicationManager struct {
-	mu     sync.Mutex
-	shards map[string]*Shard // maps a project to a shard
-}
-
-type State struct {
-	Checksum []byte
-}
-
-type Shard struct {
-	primary string // the storage location for the primary replica
-
-	// replicas maps a storage location to a state
-	replicas map[string]State
-}
-
-func (s *Shard) isConsistent() bool {
-	for _, r := range s.replicas {
-		// TODO: compare each replica's state against the primary's
-		_ = r
-	}
-	return true
-}
-
-type MutationTracker struct {
-}
-
-func (mt *MutationTracker) Abort() {
-}
-
-func (mt *MutationTracker) Done() {
-}
-
-func (rm *ReplicationManager) StartMutation(ctx context.Context, project, storage string) {
-}
-
-func (rm *ReplicationManager) Access(ctx context.Context) {
-}
--
GitLab


From d9ff755e526298a03042d236c578b359802af459 Mon Sep 17 00:00:00 2001
From: Paul Okstad
Date: Wed, 13 Mar 2019 11:03:31 -0700
Subject: [PATCH 4/6] move transaction to dedicated package

Keeping the transaction code in a dedicated package makes it easy to
isolate and iterate on as a WIP. It also means leaning heavily on
interfaces to avoid depending on features that do not exist yet
(coordinator, replication). Coordinator is now an interface that
abstracts the requirements of transactions.
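
For illustration, a caller-side sketch of the intended API after this
change (hypothetical wiring; verifier, coordinator, replman, and the rpc
value stand in for the real dependencies):

    mgr := transaction.NewManager(verifier, coordinator, replman)

    err := mgr.Mutate(ctx, repo, func(tx transaction.MutateTx) error {
        // serialize with other writers of the same reference
        if err := tx.LockRef(ctx, "refs/heads/master"); err != nil {
            return err
        }
        // forward the mutator RPC to every replica in the shard
        for _, node := range tx.Replicas() {
            if err := node.ForwardRPC(ctx, rpc); err != nil {
                return err
            }
        }
        return nil
    })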
--- .../praefect/{ => transaction}/transaction.go | 39 ++++++----- .../praefect/transaction/transaction_test.go | 64 +++++++++++++++++++ 2 files changed, 83 insertions(+), 20 deletions(-) rename internal/praefect/{ => transaction}/transaction.go (86%) create mode 100644 internal/praefect/transaction/transaction_test.go diff --git a/internal/praefect/transaction.go b/internal/praefect/transaction/transaction.go similarity index 86% rename from internal/praefect/transaction.go rename to internal/praefect/transaction/transaction.go index 27ba57ba6b7..9bc49bc24be 100644 --- a/internal/praefect/transaction.go +++ b/internal/praefect/transaction/transaction.go @@ -1,5 +1,5 @@ -/*Package praefect provides transaction management functionality to coordinate -one-to-many clients attempting to modify the shards concurrently. +/*Package transaction provides transaction management functionality to +coordinate one-to-many clients attempting to modify the shards concurrently. While Git is distributed in nature, there are some repository wide data points that can conflict between replicas if something goes wrong. This includes @@ -7,7 +7,7 @@ references, which is why the transaction manager provides an API that allows an RPC transaction to read/write lock the references being accessed to prevent contention. */ -package praefect +package transaction import ( "context" @@ -30,16 +30,22 @@ type Verifier interface { CheckSum(context.Context, Repository) ([]byte, error) } +// Coordinator allows the transaction manager to look up the shard for a repo +// at the beginning of each transaction +type Coordinator interface { + FetchShard(ctx context.Context, repo Repository) (*Shard, error) +} + // ReplicationManager provides services to handle degraded nodes type ReplicationManager interface { NotifyDegradation(context.Context, Repository, Node) error } -// TransactionManager tracks the progress of RPCs being applied to multiple +// Manager tracks the progress of RPCs being applied to multiple // downstream servers that make up a shard. It prevents conflicts that may arise // from contention between multiple clients trying to modify the same // references. -type TransactionManager struct { +type Manager struct { mu sync.Mutex shards map[string]*Shard // shards keyed by project @@ -48,8 +54,8 @@ type TransactionManager struct { replman ReplicationManager } -func NewTransactionManager(v Verifier, c Coordinator, r ReplicationManager) *TransactionManager { - return &TransactionManager{ +func NewManager(v Verifier, c Coordinator, r ReplicationManager) *Manager { + return &Manager{ shards: map[string]*Shard{}, verifier: v, coordinator: c, @@ -238,8 +244,8 @@ func (t transaction) RLockRef(ctx context.Context, ref string) error { // author to mark all relevant assets being modified during a mutating // transaction to ensure they are locked and protected from other closures // modifying the same. 
-func (tm *TransactionManager) Mutate(ctx context.Context, repo Repository, fn func(MutateTx) error) error { - shard, err := tm.coordinator.FetchShard(ctx, repo) +func (m *Manager) Mutate(ctx context.Context, repo Repository, fn func(MutateTx) error) error { + shard, err := m.coordinator.FetchShard(ctx, repo) if err != nil { return err } @@ -259,7 +265,7 @@ func (tm *TransactionManager) Mutate(ctx context.Context, repo Repository, fn fu node := node // rescope iterator var for goroutine closure eg.Go(func() error { - return tm.replman.NotifyDegradation(eCtx, repo, node) + return m.replman.NotifyDegradation(eCtx, repo, node) }) } @@ -269,7 +275,6 @@ func (tm *TransactionManager) Mutate(ctx context.Context, repo Repository, fn fu } defer tx.unlockAll() - // run the transaction function err = fn(tx) if err != nil { return err @@ -278,8 +283,8 @@ func (tm *TransactionManager) Mutate(ctx context.Context, repo Repository, fn fu return nil } -func (tm *TransactionManager) Access(ctx context.Context, repo Repository, fn func(AccessTx) error) error { - shard, err := tm.coordinator.FetchShard(ctx, repo) +func (m *Manager) Access(ctx context.Context, repo Repository, fn func(AccessTx) error) error { + shard, err := m.coordinator.FetchShard(ctx, repo) if err != nil { return err } @@ -299,7 +304,7 @@ func (tm *TransactionManager) Access(ctx context.Context, repo Repository, fn fu node := node // rescope iterator var for goroutine closure eg.Go(func() error { - return tm.replman.NotifyDegradation(eCtx, repo, node) + return m.replman.NotifyDegradation(eCtx, repo, node) }) } @@ -309,7 +314,6 @@ func (tm *TransactionManager) Access(ctx context.Context, repo Repository, fn fu } defer tx.unlockAll() - // run the transaction function err = fn(tx) if err != nil { return err @@ -317,8 +321,3 @@ func (tm *TransactionManager) Access(ctx context.Context, repo Repository, fn fu return nil } - -func (c *Coordinator) FetchShard(ctx context.Context, repo Repository) (*Shard, error) { - // TODO: move this to coordinator.go - return nil, nil -} diff --git a/internal/praefect/transaction/transaction_test.go b/internal/praefect/transaction/transaction_test.go new file mode 100644 index 00000000000..ca7064e41db --- /dev/null +++ b/internal/praefect/transaction/transaction_test.go @@ -0,0 +1,64 @@ +package transaction_test + +import ( + "context" + "testing" + + "gitlab.com/gitlab-org/gitaly/internal/praefect" +) + +func TestReplMan(t *testing.T) { + const ( + projA = "project-A" + stor1 = "storage-1" + stor2 = "storage-2" + stor3 = "storage-3" + ) + + mv := &mockVerifier{ + checksums: map[string]map[string][][]byte{ + projA: map[string][][]byte{ + stor1: {[]byte{1}}, + stor2: {[]byte{1}}, + stor3: {[]byte{1}}, + }, + }, + } + + // A replication manager needs to have the ability to verify the state of + // replicas, so it needs a Verifier. + rm := praefect.NewReplicationManager(mv) + + // replication managers are typically used within the context of a request + // when a mutator RPC is received. 
+ ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + +} + +type mockVerifier struct { + // checksums contains ordered checksums keyed by project and then storage + checksums map[string]map[string][][]byte +} + +func (mv *mockVerifier) CheckSum(_ context.Context, project, storage string) ([]byte, error) { + storages, ok := mv.checksums[project] + if !ok { + panic("no project " + project) + } + + sums, ok := storages[storage] + if !ok { + panic("no storage " + storage) + } + + if len(sums) < 1 { + panic("no more checksums for " + project) + } + + // pop first checksum off list + var sum []byte + sum, mv.checksums[project][storage] = sums[len(sums)-1], sums[:len(sums)-1] + + return sum, nil +} -- GitLab From 7ac86407d32afd258cb4eb63be0454ded71f404f Mon Sep 17 00:00:00 2001 From: Paul Okstad Date: Wed, 13 Mar 2019 22:29:22 -0700 Subject: [PATCH 5/6] split up transaction file for maintainability --- internal/praefect/transaction/doc.go | 10 + internal/praefect/transaction/manager.go | 151 +++++++++ internal/praefect/transaction/shard.go | 101 ++++++ internal/praefect/transaction/transaction.go | 298 +++++------------- .../praefect/transaction/transaction_test.go | 35 +- 5 files changed, 365 insertions(+), 230 deletions(-) create mode 100644 internal/praefect/transaction/doc.go create mode 100644 internal/praefect/transaction/manager.go create mode 100644 internal/praefect/transaction/shard.go diff --git a/internal/praefect/transaction/doc.go b/internal/praefect/transaction/doc.go new file mode 100644 index 00000000000..a90a456682e --- /dev/null +++ b/internal/praefect/transaction/doc.go @@ -0,0 +1,10 @@ +/*Package transaction provides transaction management functionality to +coordinate one-to-many clients attempting to modify the shards concurrently. + +While Git is distributed in nature, there are some repository wide data points +that can conflict between replicas if something goes wrong. This includes +references, which is why the transaction manager provides an API that allows +an RPC transaction to read/write lock the references being accessed to prevent +contention. +*/ +package transaction diff --git a/internal/praefect/transaction/manager.go b/internal/praefect/transaction/manager.go new file mode 100644 index 00000000000..12a92d610ec --- /dev/null +++ b/internal/praefect/transaction/manager.go @@ -0,0 +1,151 @@ +package transaction + +import ( + "context" + "sync" + + "golang.org/x/sync/errgroup" +) + +// Verifier verifies the project repository state by performing a checksum +type Verifier interface { + // CheckSum will return checksum of all refs for a repo + CheckSum(context.Context, Repository) ([]byte, error) +} + +// Coordinator allows the transaction manager to look up the shard for a repo +// at the beginning of each transaction +type Coordinator interface { + FetchShard(ctx context.Context, repo Repository) (*Shard, error) +} + +// ReplicationManager provides services to handle degraded nodes +type ReplicationManager interface { + NotifyDegradation(context.Context, Repository) error +} + +// Manager tracks the progress of RPCs being applied to multiple +// downstream servers that make up a shard. It prevents conflicts that may arise +// from contention between multiple clients trying to modify the same +// references. 
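+//
+// The Mutate flow, sketched from the implementation below: FetchShard
+// resolves the shard's replicas, validate collects per-node checksums to
+// split nodes into good and bad sets, degradations are reported to the
+// ReplicationManager, the caller's closure runs against the consistent
+// nodes, and a closing validate checks that the replicas converged.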
+type Manager struct {
+	mu     sync.Mutex
+	shards map[string]*Shard // shards keyed by project
+
+	verifier    Verifier
+	coordinator Coordinator
+	replman     ReplicationManager
+}
+
+// NewManager returns a manager for coordinating transactions between shards
+// that are fetched from the coordinator. Any inconsistencies found will be
+// reported to the provided replication manager.
+func NewManager(v Verifier, c Coordinator, r ReplicationManager) *Manager {
+	return &Manager{
+		shards:      map[string]*Shard{},
+		verifier:    v,
+		coordinator: c,
+		replman:     r,
+	}
+}
+
+// Mutate accepts a closure whose environment is a snapshot of the repository
+// before the transaction begins. It is the responsibility of the closure
+// author to mark all relevant assets being modified during a mutating
+// transaction to ensure they are locked and protected from other closures
+// modifying the same.
+func (m *Manager) Mutate(ctx context.Context, repo Repository, fn func(MutateTx) error) error {
+	shard, err := m.coordinator.FetchShard(ctx, repo)
+	if err != nil {
+		return err
+	}
+
+	omits := make(map[string]struct{})
+
+	// TODO: some smart caching needs to be done to eliminate this check. We
+	// already check the shard consistency at the end of each transaction so
+	// we shouldn't need to do it twice except for the first time we fetch a
+	// shard.
+	good, bad, err := shard.validate(ctx, omits)
+	if err != nil {
+		return err
+	}
+
+	// are a majority of the nodes good (consistent)?
+	if len(bad) >= len(good) {
+		return ErrShardInconsistent
+	}
+
+	omits, err = m.notifyDegradations(ctx, repo, bad)
+	if err != nil {
+		return err
+	}
+
+	tx := newTransaction(shard, good)
+	defer tx.unlockAll()
+
+	err = fn(tx)
+	if err != nil {
+		return err
+	}
+
+	// make sure all changes made to replicas are consistent
+	good, bad, err = shard.validate(ctx, omits)
+	if err != nil {
+		return err
+	}
+
+	_, err = m.notifyDegradations(ctx, repo, bad)
+	if err != nil {
+		return err
+	}
+
+	return nil
+}
+
+func (m *Manager) notifyDegradations(ctx context.Context, repo Repository, degradeds []Node) (map[string]struct{}, error) {
+	reported := make(map[string]struct{})
+
+	eg, eCtx := errgroup.WithContext(ctx)
+	for _, node := range degradeds {
+		node := node // rescope iterator var for goroutine closure
+		reported[node.Storage()] = struct{}{}
+
+		eg.Go(func() error {
+			return m.replman.NotifyDegradation(eCtx, repo)
+		})
+	}
+
+	return reported, eg.Wait()
+}
+
+func (m *Manager) Access(ctx context.Context, repo Repository, fn func(AccessTx) error) error {
+	shard, err := m.coordinator.FetchShard(ctx, repo)
+	if err != nil {
+		return err
+	}
+
+	// TODO: some smart caching needs to be done to eliminate this check. We
+	// already check the shard consistency at the end of each transaction so
+	// we shouldn't need to do it twice except for the first time we fetch a
+	// shard.
+ good, bad, err := shard.validate(ctx, map[string]struct{}{}) + if err != nil { + return err + } + + _, err = m.notifyDegradations(ctx, repo, bad) + if err != nil { + return err + } + + tx := newTransaction(shard, good) + defer tx.unlockAll() + + err = fn(tx) + if err != nil { + return err + } + + return nil +} diff --git a/internal/praefect/transaction/shard.go b/internal/praefect/transaction/shard.go new file mode 100644 index 00000000000..af6ec2c29e3 --- /dev/null +++ b/internal/praefect/transaction/shard.go @@ -0,0 +1,101 @@ +package transaction + +import ( + "context" + "errors" + "sync" + + "golang.org/x/sync/errgroup" +) + +// Shard represents a set of Gitaly replicas for repository. Each shard has a +// designated primary and maintains locks to resources that can cause contention +// between clients writing to the same repo. +type Shard struct { + repo Repository + + primary string // the designated primary node name + + storageReplicas map[string]Node // maps storage location to a replica + + refLocks struct { + *sync.RWMutex + m map[string]*sync.RWMutex // maps ref name to a lock + } + + // used to check shard for inconsistencies + verifier Verifier +} + +func NewShard(r Repository, primary string, replicas []Node, v Verifier) *Shard { + return &Shard{ + repo: r, + primary: primary, + storageReplicas: make(map[string]Node), + refLocks: struct { + *sync.RWMutex + m map[string]*sync.RWMutex + }{ + RWMutex: new(sync.RWMutex), + m: make(map[string]*sync.RWMutex), + }, + verifier: v, + } +} + +// ErrShardInconsistent indicates a mutating operation is unable to be executed +// due to lack of quorum of consistent nodes. +var ErrShardInconsistent = errors.New("majority of shard nodes are in inconsistent state") + +// validate will concurrently fetch the checksum of each node in the shard and +// compare against the primary. All replicas consistent with the primary will +// be returned as "good", and the rest "bad". If only a partial check needs to +// be done, a list of Node storage keys can be provided to exclude those from +// the checks. +func (s Shard) validate(ctx context.Context, omits map[string]struct{}) (good, bad []Node, err error) { + var ( + mu sync.RWMutex + checksums = map[string][]byte{} + ) + + eg, eCtx := errgroup.WithContext(ctx) + + for storage, _ := range s.storageReplicas { + _, ok := omits[storage] + if ok { + continue + } + + storage := storage // rescope iterator vars + + eg.Go(func() error { + cs, err := s.verifier.CheckSum(eCtx, s.repo) + if err != nil { + return err + } + + mu.Lock() + checksums[storage] = cs + mu.Unlock() + + return nil + }) + + } + + if err := eg.Wait(); err != nil { + return nil, nil, err + } + + pCS := string(checksums[s.primary]) + for storage, cs := range checksums { + n := s.storageReplicas[storage] + if string(cs) != pCS { + bad = append(bad, n) + continue + } + good = append(good, n) + } + + return good, bad, nil +} diff --git a/internal/praefect/transaction/transaction.go b/internal/praefect/transaction/transaction.go index 9bc49bc24be..2ddff2180c3 100644 --- a/internal/praefect/transaction/transaction.go +++ b/internal/praefect/transaction/transaction.go @@ -1,20 +1,8 @@ -/*Package transaction provides transaction management functionality to -coordinate one-to-many clients attempting to modify the shards concurrently. - -While Git is distributed in nature, there are some repository wide data points -that can conflict between replicas if something goes wrong. 
This includes -references, which is why the transaction manager provides an API that allows -an RPC transaction to read/write lock the references being accessed to prevent -contention. -*/ package transaction import ( "context" - "errors" "sync" - - "golang.org/x/sync/errgroup" ) // Repository represents the identity and location of a repository as requested @@ -24,45 +12,6 @@ type Repository struct { StorageLoc string // storage location } -// Verifier verifies the project repository state -type Verifier interface { - // CheckSum will return the checksum for a project in a storage location - CheckSum(context.Context, Repository) ([]byte, error) -} - -// Coordinator allows the transaction manager to look up the shard for a repo -// at the beginning of each transaction -type Coordinator interface { - FetchShard(ctx context.Context, repo Repository) (*Shard, error) -} - -// ReplicationManager provides services to handle degraded nodes -type ReplicationManager interface { - NotifyDegradation(context.Context, Repository, Node) error -} - -// Manager tracks the progress of RPCs being applied to multiple -// downstream servers that make up a shard. It prevents conflicts that may arise -// from contention between multiple clients trying to modify the same -// references. -type Manager struct { - mu sync.Mutex - shards map[string]*Shard // shards keyed by project - - verifier Verifier - coordinator Coordinator - replman ReplicationManager -} - -func NewManager(v Verifier, c Coordinator, r ReplicationManager) *Manager { - return &Manager{ - shards: map[string]*Shard{}, - verifier: v, - coordinator: c, - replman: r, - } -} - // State represents the current state of a backend node // Note: in the future this may be extended to include refs type State struct { @@ -76,70 +25,9 @@ type RPC struct { type Node interface { Storage() string // storage location the node hosts ForwardRPC(ctx context.Context, rpc *RPC) error -} - -// Shard represents -type Shard struct { - repo Repository - - primary string // the designated primary node name - - // Replicas maps a storage location to the node replicas - storageReplicas map[string]Node - - // refLocks coordinates changes between many clients attempting to mutate - // a reference - refLocks map[string]*sync.RWMutex - - // used to check shard for inconsistencies - verifier Verifier -} - -// ErrShardInconsistent indicates a mutating operation is unable to be executed -// due to lack of quorum of consistent nodes. 
-var ErrShardInconsistent = errors.New("majority of shard nodes are in inconsistent state") - -func (s Shard) validate(ctx context.Context) (good, bad []Node, err error) { - var ( - mu sync.RWMutex - checksums = map[string][]byte{} - ) - - eg, eCtx := errgroup.WithContext(ctx) - for storage, _ := range s.storageReplicas { - storage := storage // rescope iterator vars - - eg.Go(func() error { - cs, err := s.verifier.CheckSum(eCtx, s.repo) - if err != nil { - return err - } - - mu.Lock() - checksums[storage] = cs - mu.Unlock() - - return nil - }) - - } - - if err := eg.Wait(); err != nil { - return nil, nil, err - } - - pCS := string(checksums[s.primary]) - for storage, cs := range checksums { - n := s.storageReplicas[storage] - if string(cs) != pCS { - bad = append(bad, n) - continue - } - good = append(good, n) - } - - return good, bad, nil + CheckSum(context.Context, Repository) ([]byte, error) + WriteRef(ctx context.Context, ref, value string) error } // MutateTx represents the resources available during a mutator transaction @@ -147,7 +35,7 @@ type MutateTx interface { AccessTx // LockRef acquires a write lock on a reference LockRef(context.Context, string) error - Shard() *Shard + Primary() Node } // AccessTx represents the resources available during an accessor transaction @@ -159,20 +47,57 @@ type AccessTx interface { } type transaction struct { - shard *Shard - refLocks map[string]*sync.RWMutex + shard *Shard + + replicas map[string]Node // only nodes verified to be consistent + + // unlocks contains callbacks to unlock all locks acquired + unlocks struct { + *sync.RWMutex + m map[string]func() + } + + // refRollbacks contains all reference values before modification + refRollbacks map[string][]byte } -func (t transaction) Shard() *Shard { return t.shard } +func newTransaction(shard *Shard, good []Node) transaction { + replicas := map[string]Node{} + for _, node := range good { + replicas[node.Storage()] = node + } -// LockRef attempts to acquire a write lock on the reference name provided -// (ref). If the context is cancelled first, the lock acquisition will be + return transaction{ + shard: shard, + replicas: replicas, + unlocks: struct { + *sync.RWMutex + m map[string]func() + }{ + RWMutex: new(sync.RWMutex), + m: make(map[string]func()), + }, + } +} + +// LockRef attempts to acquire a write lock on the reference name provided. +// If the context is cancelled first, the lock acquisition will be // aborted. 
func (t transaction) LockRef(ctx context.Context, ref string) error { lockQ := make(chan *sync.RWMutex) go func() { - l := t.shard.refLocks[ref] + t.shard.refLocks.RLock() + l, ok := t.shard.refLocks.m[ref] + t.shard.refLocks.RUnlock() + + if !ok { + l = new(sync.RWMutex) + t.shard.refLocks.Lock() + t.shard.refLocks.m[ref] = l + t.shard.refLocks.Unlock() + } + l.Lock() lockQ <- l }() @@ -182,40 +107,62 @@ func (t transaction) LockRef(ctx context.Context, ref string) error { select { case <-ctx.Done(): - // unlock before aborting l := <-lockQ l.Unlock() - return ctx.Err() case l := <-lockQ: - t.refLocks[ref] = l + t.unlocks.Lock() + t.unlocks.m[ref] = func() { l.Unlock() } + t.unlocks.Unlock() + return nil } - - return nil } // unlockAll will unlock all acquired locks at the end of a transaction func (t transaction) unlockAll() { // unlock all refs - for _, rl := range t.refLocks { - rl.Unlock() + t.unlocks.RLock() + for _, unlock := range t.unlocks.m { + unlock() } + t.unlocks.RUnlock() +} + +func (t transaction) rollback(ctx context.Context) error { + // for ref, value := range t.refRollbacks { + // + // } + return nil } func (t transaction) Replicas() map[string]Node { - return t.shard.storageReplicas + return t.replicas +} + +func (t transaction) Primary() Node { + return t.replicas[t.shard.primary] } // RLockRef attempts to acquire a read lock on the reference name provided -// (ref). If the context is cancelled first, the lock acquisition will be +// (ref). If the context is cancelled first, the lock acquisition will be // aborted. func (t transaction) RLockRef(ctx context.Context, ref string) error { lockQ := make(chan *sync.RWMutex) go func() { - l := t.shard.refLocks[ref] + t.shard.refLocks.RLock() + l, ok := t.shard.refLocks.m[ref] + t.shard.refLocks.RUnlock() + + if !ok { + l = new(sync.RWMutex) + t.shard.refLocks.Lock() + t.shard.refLocks.m[ref] = l + t.shard.refLocks.Unlock() + } + l.RLock() lockQ <- l }() @@ -232,92 +179,11 @@ func (t transaction) RLockRef(ctx context.Context, ref string) error { return ctx.Err() case l := <-lockQ: - t.refLocks[ref] = l - - } - - return nil -} - -// Mutate accepts a closure whose environment is a snapshot of the repository -// before the transaction begins. It is the responsibility of the closure -// author to mark all relevant assets being modified during a mutating -// transaction to ensure they are locked and protected from other closures -// modifying the same. -func (m *Manager) Mutate(ctx context.Context, repo Repository, fn func(MutateTx) error) error { - shard, err := m.coordinator.FetchShard(ctx, repo) - if err != nil { - return err - } - - good, bad, err := shard.validate(ctx) - if err != nil { - return err - } - - // are a majority of the nodes good (consistent)? 
- if len(bad) >= len(good) { - return ErrShardInconsistent - } - - eg, eCtx := errgroup.WithContext(ctx) - for _, node := range bad { - node := node // rescope iterator var for goroutine closure + t.unlocks.Lock() + t.unlocks.m[ref] = func() { l.RUnlock() } + t.unlocks.Unlock() - eg.Go(func() error { - return m.replman.NotifyDegradation(eCtx, repo, node) - }) - } - - tx := transaction{ - shard: shard, - refLocks: map[string]*sync.RWMutex{}, - } - defer tx.unlockAll() - - err = fn(tx) - if err != nil { - return err - } + return nil - return nil -} - -func (m *Manager) Access(ctx context.Context, repo Repository, fn func(AccessTx) error) error { - shard, err := m.coordinator.FetchShard(ctx, repo) - if err != nil { - return err } - - good, bad, err := shard.validate(ctx) - if err != nil { - return err - } - - // are a majority of the nodes good (consistent)? - if len(bad) >= len(good) { - return ErrShardInconsistent - } - - eg, eCtx := errgroup.WithContext(ctx) - for _, node := range bad { - node := node // rescope iterator var for goroutine closure - - eg.Go(func() error { - return m.replman.NotifyDegradation(eCtx, repo, node) - }) - } - - tx := transaction{ - shard: shard, - refLocks: map[string]*sync.RWMutex{}, - } - defer tx.unlockAll() - - err = fn(tx) - if err != nil { - return err - } - - return nil } diff --git a/internal/praefect/transaction/transaction_test.go b/internal/praefect/transaction/transaction_test.go index ca7064e41db..34666f807cb 100644 --- a/internal/praefect/transaction/transaction_test.go +++ b/internal/praefect/transaction/transaction_test.go @@ -4,7 +4,7 @@ import ( "context" "testing" - "gitlab.com/gitlab-org/gitaly/internal/praefect" + "gitlab.com/gitlab-org/gitaly/internal/praefect/transaction" ) func TestReplMan(t *testing.T) { @@ -25,40 +25,47 @@ func TestReplMan(t *testing.T) { }, } - // A replication manager needs to have the ability to verify the state of + // A transaction manager needs to have the ability to verify the state of // replicas, so it needs a Verifier. - rm := praefect.NewReplicationManager(mv) + rm := transaction.NewManager(mv, mockCoordinator{}, mockReplMan{}) + rm.Access(context.Background(), transaction.Repository{}, func(transaction.AccessTx) error { + return nil + }) +} - // replication managers are typically used within the context of a request - // when a mutator RPC is received. 
-	ctx, cancel := context.WithCancel(context.Background())
-	defer cancel()
-
-}
 
+type mockCoordinator struct{}
+
+func (_ mockCoordinator) FetchShard(context.Context, transaction.Repository) (*transaction.Shard, error) {
+	return nil, nil
+}
+
+type mockReplMan struct{}
+
+func (_ mockReplMan) NotifyDegradation(context.Context, transaction.Repository) error { return nil }
+
 type mockVerifier struct {
 	// checksums contains ordered checksums keyed by project and then storage
 	checksums map[string]map[string][][]byte
 }
 
-func (mv *mockVerifier) CheckSum(_ context.Context, project, storage string) ([]byte, error) {
-	storages, ok := mv.checksums[project]
+func (mv *mockVerifier) CheckSum(_ context.Context, repo transaction.Repository) ([]byte, error) {
+	storages, ok := mv.checksums[repo.ProjectHash]
 	if !ok {
-		panic("no project " + project)
+		panic("no project " + repo.ProjectHash)
 	}
 
-	sums, ok := storages[storage]
+	sums, ok := storages[repo.StorageLoc]
 	if !ok {
-		panic("no storage " + storage)
+		panic("no storage " + repo.StorageLoc)
 	}
 
 	if len(sums) < 1 {
-		panic("no more checksums for " + project)
+		panic("no more checksums for " + repo.ProjectHash)
 	}
 
 	// pop first checksum off list
 	var sum []byte
-	sum, mv.checksums[project][storage] = sums[len(sums)-1], sums[:len(sums)-1]
+	sum, mv.checksums[repo.ProjectHash][repo.StorageLoc] = sums[len(sums)-1], sums[:len(sums)-1]
 
 	return sum, nil
 }
--
GitLab


From 824fa90ab9d0df5601fa9067bcb43eb1002e9151 Mon Sep 17 00:00:00 2001
From: Paul Okstad
Date: Fri, 15 Mar 2019 01:04:24 -0700
Subject: [PATCH 6/6] major simplification of transaction manager

---
 internal/praefect/transaction/manager.go      |  23 +--
 internal/praefect/transaction/mock_test.go    |  79 ++++++++++
 internal/praefect/transaction/shard.go        |  37 ++---
 internal/praefect/transaction/transaction.go  | 134 +++--------------
 .../praefect/transaction/transaction_test.go  | 140 ++++++++++++------
 5 files changed, 218 insertions(+), 195 deletions(-)
 create mode 100644 internal/praefect/transaction/mock_test.go

diff --git a/internal/praefect/transaction/manager.go b/internal/praefect/transaction/manager.go
index 12a92d610ec..7fe38442e1e 100644
--- a/internal/praefect/transaction/manager.go
+++ b/internal/praefect/transaction/manager.go
@@ -32,7 +32,6 @@ type Manager struct {
 	mu     sync.Mutex
 	shards map[string]*Shard // shards keyed by project
 
-	verifier    Verifier
 	coordinator Coordinator
 	replman     ReplicationManager
 }
@@ -40,10 +39,9 @@ type Manager struct {
 // NewManager returns a manager for coordinating transactions between shards
 // that are fetched from the coordinator. Any inconsistencies found will be
 // reported to the provided replication manager.
-func NewManager(v Verifier, c Coordinator, r ReplicationManager) *Manager {
+func NewManager(c Coordinator, r ReplicationManager) *Manager {
 	return &Manager{
 		shards:      map[string]*Shard{},
-		verifier:    v,
 		coordinator: c,
 		replman:     r,
 	}
@@ -54,12 +52,18 @@ func NewManager(v Verifier, c Coordinator, r ReplicationManager) *Manager {
 // author to mark all relevant assets being modified during a mutating
 // transaction to ensure they are locked and protected from other closures
 // modifying the same.
-func (m *Manager) Mutate(ctx context.Context, repo Repository, fn func(MutateTx) error) error {
+func (m *Manager) Mutate(ctx context.Context, repo Repository, fn func(Tx) error) error {
 	shard, err := m.coordinator.FetchShard(ctx, repo)
 	if err != nil {
 		return err
 	}
 
+	// Prevent other clients from modifying the same shard until finished.
+ // TODO: serialize access at the reference scope: + // https://gitlab.com/gitlab-org/gitaly/issues/1530 + shard.lock.Lock() + defer shard.lock.Unlock() + omits := make(map[string]struct{}) // TODO: some smart caching needs to be done to eliminate this check. We @@ -81,8 +85,7 @@ func (m *Manager) Mutate(ctx context.Context, repo Repository, fn func(MutateTx) return err } - tx := newTransaction(shard, good) - defer tx.unlockAll() + tx := newTx(shard, good, txCatMutator) err = fn(tx) if err != nil { @@ -119,12 +122,15 @@ func (m *Manager) notifyDegradations(ctx context.Context, repo Repository, degra return reported, eg.Wait() } -func (m *Manager) Access(ctx context.Context, repo Repository, fn func(AccessTx) error) error { +func (m *Manager) Access(ctx context.Context, repo Repository, fn func(Tx) error) error { shard, err := m.coordinator.FetchShard(ctx, repo) if err != nil { return err } + shard.lock.RLock() + defer shard.lock.RUnlock() + // TODO: some smart caching needs to be done to eliminate this check. We // already check the shard consistency at the end of each transaction so // we shouldn't need to do it twice except for the first time we fetch a @@ -139,8 +145,7 @@ func (m *Manager) Access(ctx context.Context, repo Repository, fn func(AccessTx) return err } - tx := newTransaction(shard, good) - defer tx.unlockAll() + tx := newTx(shard, good, txCatAccessor) err = fn(tx) if err != nil { diff --git a/internal/praefect/transaction/mock_test.go b/internal/praefect/transaction/mock_test.go new file mode 100644 index 00000000000..2aab6fcd3a4 --- /dev/null +++ b/internal/praefect/transaction/mock_test.go @@ -0,0 +1,79 @@ +package transaction_test + +import ( + "context" + "fmt" + "sync" + "testing" + + "gitlab.com/gitlab-org/gitaly/internal/praefect/transaction" +) + +type mockCoordinator struct { + shards map[transaction.Repository]*transaction.Shard +} + +func (mc mockCoordinator) FetchShard(_ context.Context, repo transaction.Repository) (*transaction.Shard, error) { + s, ok := mc.shards[repo] + if !ok { + return nil, fmt.Errorf("shard doesn't exist for repo %+v", repo) + } + + return s, nil +} + +type mockReplMan struct{} + +func (_ mockReplMan) NotifyDegradation(context.Context, transaction.Repository) error { return nil } + +type mockNode struct { + sync.RWMutex + + tb testing.TB + rpc *transaction.RPC + + // set the following values in test cases + storage string + checksums map[transaction.Repository][]string +} + +func (mn *mockNode) CheckSum(_ context.Context, repo transaction.Repository) ([]byte, error) { + mn.Lock() + defer mn.Unlock() + + checksums, ok := mn.checksums[repo] + if !ok { + panic( + fmt.Sprintf( + "test setup problem: missing checksums in mock node %s for repo %+v", + mn.storage, repo, + ), + ) + } + + if len(checksums) < 1 { + panic( + fmt.Sprintf( + "test setup problem: not enough checksums for mock node %s for repo %+v", + mn.storage, repo, + ), + ) + } + + cs := checksums[0] + mn.checksums[repo] = checksums[1:len(checksums)] + + mn.tb.Logf("mock node %s returning checksum %s for repo %s", mn.storage, cs, repo) + + return []byte(cs), nil +} + +func (mn *mockNode) ForwardRPC(_ context.Context, rpc *transaction.RPC) error { + mn.Lock() + mn.rpc = rpc + mn.Unlock() + + return nil +} + +func (mn *mockNode) Storage() string { return mn.storage } diff --git a/internal/praefect/transaction/shard.go b/internal/praefect/transaction/shard.go index af6ec2c29e3..a583c61a7fe 100644 --- a/internal/praefect/transaction/shard.go +++ b/internal/praefect/transaction/shard.go 
@@ -8,38 +8,27 @@ import (
 	"golang.org/x/sync/errgroup"
 )
 
-// Shard represents a set of Gitaly replicas for repository. Each shard has a
+// Shard represents a set of Gitaly replicas for a repository. Each shard has a
 // designated primary and maintains locks to resources that can cause contention
 // between clients writing to the same repo.
 type Shard struct {
-	repo Repository
-
-	primary string // the designated primary node name
-
+	lock            *sync.RWMutex   // all mutations require a write lock
+	repo            Repository      // the repo this replica backs
+	primary         string          // the storage location of the primary node
 	storageReplicas map[string]Node // maps storage location to a replica
+}
 
-	refLocks struct {
-		*sync.RWMutex
-		m map[string]*sync.RWMutex // maps ref name to a lock
+func NewShard(r Repository, primary string, replicas []Node) *Shard {
+	sreps := make(map[string]Node)
+	for _, r := range replicas {
+		sreps[r.Storage()] = r
 	}
 
-	// used to check shard for inconsistencies
-	verifier Verifier
-}
-
-func NewShard(r Repository, primary string, replicas []Node, v Verifier) *Shard {
 	return &Shard{
 		repo:            r,
 		primary:         primary,
-		storageReplicas: make(map[string]Node),
-		refLocks: struct {
-			*sync.RWMutex
-			m map[string]*sync.RWMutex
-		}{
-			RWMutex: new(sync.RWMutex),
-			m:       make(map[string]*sync.RWMutex),
-		},
-		verifier: v,
+		storageReplicas: sreps,
+		lock:            new(sync.RWMutex),
 	}
 }
@@ -60,7 +49,7 @@ func (s Shard) validate(ctx context.Context, omits map[string]struct{}) (good, b
 
 	eg, eCtx := errgroup.WithContext(ctx)
 
-	for storage, _ := range s.storageReplicas {
+	for storage, node := range s.storageReplicas {
 		_, ok := omits[storage]
 		if ok {
 			continue
@@ -69,7 +58,7 @@ func (s Shard) validate(ctx context.Context, omits map[string]struct{}) (good, b
-		storage := storage // rescope iterator vars
+		storage, node := storage, node // rescope iterator vars
 
 		eg.Go(func() error {
-			cs, err := s.verifier.CheckSum(eCtx, s.repo)
+			cs, err := node.CheckSum(eCtx, s.repo)
 			if err != nil {
 				return err
 			}
diff --git a/internal/praefect/transaction/transaction.go b/internal/praefect/transaction/transaction.go
index 2ddff2180c3..36bf0c00a18 100644
--- a/internal/praefect/transaction/transaction.go
+++ b/internal/praefect/transaction/transaction.go
@@ -2,6 +2,7 @@ package transaction
 
 import (
 	"context"
+	"errors"
 	"sync"
 )
 
@@ -25,28 +26,20 @@ type RPC struct {
 type Node interface {
 	Storage() string // storage location the node hosts
 	ForwardRPC(ctx context.Context, rpc *RPC) error
 	CheckSum(context.Context, Repository) ([]byte, error)
-	WriteRef(ctx context.Context, ref, value string) error
 }
 
-// MutateTx represents the resources available during a mutator transaction
-type MutateTx interface {
-	AccessTx
-	// LockRef acquires a write lock on a reference
-	LockRef(context.Context, string) error
-	Primary() Node
-}
+type txCategory int
 
-// AccessTx represents the resources available during an accessor transaction
-type AccessTx interface {
-	// Replicas returns all replicas without distinguishing the primary
-	Replicas() map[string]Node
-	// RLockRef acquires a read lock on a reference
-	RLockRef(context.Context, string) error
-}
+const (
+	txCatAccessor = iota
+	txCatMutator
+)
+
+type Tx struct {
+	category txCategory
+
-type transaction struct {
 	shard *Shard
 
 	replicas map[string]Node // only nodes verified to be consistent
 
@@ -61,13 +53,14 @@ type transaction struct {
 	// refRollbacks contains all reference values before modification
 	refRollbacks map[string][]byte
 }
 
-func newTransaction(shard *Shard, good []Node) transaction {
+func newTx(shard *Shard, good []Node, txCat txCategory) Tx {
 	replicas := map[string]Node{}
 	for _, node := range good {
replicas[node.Storage()] = node } - return transaction{ + return Tx{ + category: txCat, shard: shard, replicas: replicas, unlocks: struct { @@ -80,110 +73,23 @@ func newTransaction(shard *Shard, good []Node) transaction { } } -// LockRef attempts to acquire a write lock on the reference name provided. -// If the context is cancelled first, the lock acquisition will be -// aborted. -func (t transaction) LockRef(ctx context.Context, ref string) error { - lockQ := make(chan *sync.RWMutex) - - go func() { - t.shard.refLocks.RLock() - l, ok := t.shard.refLocks.m[ref] - t.shard.refLocks.RUnlock() - - if !ok { - l = new(sync.RWMutex) - t.shard.refLocks.Lock() - t.shard.refLocks.m[ref] = l - t.shard.refLocks.Unlock() - } - - l.Lock() - lockQ <- l - }() - - // ensure lockQ is consumed in all code paths so that goroutine doesn't - // stray - select { - - case <-ctx.Done(): - l := <-lockQ - l.Unlock() - return ctx.Err() - - case l := <-lockQ: - t.unlocks.Lock() - t.unlocks.m[ref] = func() { l.Unlock() } - t.unlocks.Unlock() - - return nil - } -} - -// unlockAll will unlock all acquired locks at the end of a transaction -func (t transaction) unlockAll() { - // unlock all refs - t.unlocks.RLock() - for _, unlock := range t.unlocks.m { - unlock() - } - t.unlocks.RUnlock() -} - -func (t transaction) rollback(ctx context.Context) error { +func (t Tx) rollback(ctx context.Context) error { // for ref, value := range t.refRollbacks { // // } return nil } -func (t transaction) Replicas() map[string]Node { +func (t Tx) Replicas() map[string]Node { return t.replicas } -func (t transaction) Primary() Node { - return t.replicas[t.shard.primary] -} - -// RLockRef attempts to acquire a read lock on the reference name provided -// (ref). If the context is cancelled first, the lock acquisition will be -// aborted. 
-func (t transaction) RLockRef(ctx context.Context, ref string) error { - lockQ := make(chan *sync.RWMutex) - - go func() { - t.shard.refLocks.RLock() - l, ok := t.shard.refLocks.m[ref] - t.shard.refLocks.RUnlock() - - if !ok { - l = new(sync.RWMutex) - t.shard.refLocks.Lock() - t.shard.refLocks.m[ref] = l - t.shard.refLocks.Unlock() - } - - l.RLock() - lockQ <- l - }() - - // ensure lockQ is consumed in all code paths so that goroutine doesn't - // stray - select { - - case <-ctx.Done(): - // unlock before aborting - l := <-lockQ - l.RUnlock() - - return ctx.Err() - - case l := <-lockQ: - t.unlocks.Lock() - t.unlocks.m[ref] = func() { l.RUnlock() } - t.unlocks.Unlock() - - return nil +var ErrAccessorNotPermitted = errors.New("a mutator operation was attempted by an accessor") +func (t Tx) Primary() (Node, error) { + if t.category != txCatMutator { + return nil, ErrAccessorNotPermitted } + + return t.replicas[t.shard.primary], nil } diff --git a/internal/praefect/transaction/transaction_test.go b/internal/praefect/transaction/transaction_test.go index 34666f807cb..ccbbae99d59 100644 --- a/internal/praefect/transaction/transaction_test.go +++ b/internal/praefect/transaction/transaction_test.go @@ -3,69 +3,113 @@ package transaction_test import ( "context" "testing" + "time" "gitlab.com/gitlab-org/gitaly/internal/praefect/transaction" + + "github.com/stretchr/testify/require" ) -func TestReplMan(t *testing.T) { - const ( - projA = "project-A" - stor1 = "storage-1" - stor2 = "storage-2" - stor3 = "storage-3" - ) +const ( + projA = "project-A" + stor1 = "storage-1" + stor2 = "storage-2" + stor3 = "storage-3" +) - mv := &mockVerifier{ - checksums: map[string]map[string][][]byte{ - projA: map[string][][]byte{ - stor1: {[]byte{1}}, - stor2: {[]byte{1}}, - stor3: {[]byte{1}}, - }, - }, +type testCase struct { + name string + shards map[transaction.Repository]*transaction.Shard + txList []struct { + mutator bool + repo transaction.Repository + txFn func(transaction.Tx) error } - - // A transaction manager needs to have the ability to verify the state of - // replicas, so it needs a Verifier. 
- rm := transaction.NewManager(mv, mockCoordinator{}, mockReplMan{}) - rm.Access(context.Background(), transaction.Repository{}, func(transaction.AccessTx) error { - return nil - }) + expectErr error } -type mockCoordinator struct{} +var ( + repo1 = transaction.Repository{ + ProjectHash: projA, + StorageLoc: stor1, + } -func (_ mockCoordinator) FetchShard(context.Context, transaction.Repository) (*transaction.Shard, error) { - return nil, nil -} + testCases = []func(testing.TB) testCase{ + func(t testing.TB) testCase { + var ( + node1 = &mockNode{ + tb: t, + storage: stor1, + checksums: map[transaction.Repository][]string{ + repo1: []string{"1"}, + }, + } + ) -type mockReplMan struct{} + return testCase{ + name: "one node shard: no-op access tx", + shards: map[transaction.Repository]*transaction.Shard{ + repo1: transaction.NewShard( + repo1, + node1.storage, + []transaction.Node{node1}, + ), + }, + txList: []struct { + mutator bool + repo transaction.Repository + txFn func(transaction.Tx) error + }{ + { + repo: repo1, + txFn: func(_ transaction.Tx) error { + t.Log("this is a no-op transaction") + // checksum should get consumed during consistency + // check + require.Len(t, node1.checksums[repo1], 0) -func (_ mockReplMan) NotifyDegradation(context.Context, transaction.Repository) error { return nil } + return nil + }, + }, + }, + } + }, + } +) -type mockVerifier struct { - // checksums contains ordered checksums keyed by project and then storage - checksums map[string]map[string][][]byte -} +func TestManager(t *testing.T) { + for _, ttFn := range testCases { + tt := ttFn(t) + t.Run(tt.name, func(t *testing.T) { + var ( + mc = mockCoordinator{tt.shards} + mrm = mockReplMan{} + rm = transaction.NewManager(mc, mrm) + ) -func (mv *mockVerifier) CheckSum(_ context.Context, repo transaction.Repository) ([]byte, error) { - storages, ok := mv.checksums[repo.ProjectHash] - if !ok { - panic("no project " + repo.ProjectHash) - } + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() - sums, ok := storages[repo.StorageLoc] - if !ok { - panic("no storage " + repo.StorageLoc) - } + var err error + for _, tx := range tt.txList { - if len(sums) < 1 { - panic("no more checksums for " + repo.ProjectHash) - } + if tx.mutator { + err = rm.Mutate(ctx, tx.repo, tx.txFn) + } else { + err = rm.Access(ctx, tx.repo, tx.txFn) + } - // pop first checksum off list - var sum []byte - sum, mv.checksums[repo.ProjectHash][repo.StorageLoc] = sums[len(sums)-1], sums[:len(sums)-1] + if err != nil { + break + } + } - return sum, nil + if tt.expectErr != nil { + require.Error(t, err) + require.EqualError(t, err, tt.expectErr.Error()) + } else { + require.NoError(t, err) + } + }) + } } -- GitLab