From d0febac0870e6aabb32c11be7694d5dbfd1563eb Mon Sep 17 00:00:00 2001
From: Paul Okstad
Date: Mon, 18 Mar 2019 23:12:10 -0700
Subject: [PATCH 1/2] replication manager draft

---
Review note: replication.go was amended during review. ProcessJobs now returns
the ReplicationQueue error instead of dropping it, the completion log call is
guarded against a nil logger, and missing doc comments were added. The blob ids
on the replication.go index lines were not regenerated after these changes.

 internal/praefect/common.go      | 15 ++++++
 internal/praefect/node.go        | 20 +++++++
 internal/praefect/replication.go | 91 ++++++++++++++++++++++++++++++++
 internal/praefect/server.go      |  1 +
 4 files changed, 127 insertions(+)
 create mode 100644 internal/praefect/common.go
 create mode 100644 internal/praefect/node.go
 create mode 100644 internal/praefect/replication.go

diff --git a/internal/praefect/common.go b/internal/praefect/common.go
new file mode 100644
index 00000000000..17667283e3e
--- /dev/null
+++ b/internal/praefect/common.go
@@ -0,0 +1,15 @@
+package praefect
+
+// Repository contains all the information necessary to address a specific
+// repository
+type Repository struct {
+	Project string // e.g. gitlab.com/gitaly-org/gitaly
+	Storage string // e.g. Default
+}
+
+// ReplJob indicates which repo replicas require syncing
+type ReplJob struct {
+	ID      string // unique ID to track job progress in datastore
+	Primary Repository
+	Replica Node
+}
diff --git a/internal/praefect/node.go b/internal/praefect/node.go
new file mode 100644
index 00000000000..7100e2559ad
--- /dev/null
+++ b/internal/praefect/node.go
@@ -0,0 +1,20 @@
+package praefect
+
+import (
+	"context"
+
+	"google.golang.org/grpc"
+)
+
+// Node is a backend Gitaly node that is responsible for hosting repositories
+// in a specific storage location
+type Node struct {
+	storage string // storage location ID (e.g. default)
+	cc      *grpc.ClientConn
+}
+
+// PullReplication will attempt to replicate changes from a primary replica
+func (n Node) PullReplication(ctx context.Context, primary Repository) error {
+	// TODO: replication logic in #1484
+	return nil
+}
diff --git a/internal/praefect/replication.go b/internal/praefect/replication.go
new file mode 100644
index 00000000000..e8f2dc49650
--- /dev/null
+++ b/internal/praefect/replication.go
@@ -0,0 +1,91 @@
+package praefect
+
+import (
+	"context"
+)
+
+// ReplMan is a replication manager for handling replication jobs
+type ReplMan struct {
+	l Logger
+	// whitelist contains the project names of the repos we wish to replicate
+	whitelist map[string]struct{}
+}
+
+// ReplManOpt is a functional option used to configure a ReplMan
+type ReplManOpt func(*ReplMan)
+
+// NewReplMan returns a replication manager with an empty whitelist and then
+// applies the provided options in order
+func NewReplMan(opts ...ReplManOpt) *ReplMan {
+	m := &ReplMan{
+		whitelist: map[string]struct{}{},
+	}
+
+	for _, opt := range opts {
+		opt(m)
+	}
+
+	return m
+}
+
+// WithWhitelist will configure a whitelist for repos to allow replication
+func WithWhitelist(whitelistedRepos []string) ReplManOpt {
+	return func(m *ReplMan) {
+		for _, r := range whitelistedRepos {
+			m.whitelist[r] = struct{}{}
+		}
+	}
+}
+
+// ReplCoordinator represents all the coordinator functionality the replication
+// manager relies on
+type ReplCoordinator interface {
+	// ReplicationQueue returns a stream of replication jobs to be processed
+	ReplicationQueue(context.Context) (<-chan ReplJob, error)
+
+	// CompleteJob reports if a job was completed. A non-nil jobErr indicates
+	// the job was not successful.
+	CompleteJob(ctx context.Context, ID string, jobErr error) error
+}
+
+// ProcessJobs receives jobs from the coordinator's replication queue and runs
+// them one at a time, reporting each result back via CompleteJob. It returns
+// the error from ReplicationQueue if the queue cannot be obtained, ctx.Err()
+// once the context is done, and nil when the queue is closed. A failure to
+// report completion is logged (when a logger is set) and does not stop
+// processing.
+func (rm *ReplMan) ProcessJobs(ctx context.Context, rc ReplCoordinator) error {
+	jobQ, err := rc.ReplicationQueue(ctx)
+	if err != nil {
+		// the channel cannot be trusted when err is non-nil; receiving from
+		// a nil channel would block until the context is cancelled
+		return err
+	}
+
+	for {
+		var (
+			job ReplJob
+			ok  bool
+		)
+
+		select {
+
+		// context cancelled
+		case <-ctx.Done():
+			return ctx.Err()
+
+		case job, ok = <-jobQ:
+			if !ok { // channel closed
+				return nil
+			}
+
+		}
+
+		jobErr := job.Replica.PullReplication(ctx, job.Primary)
+
+		err = rc.CompleteJob(ctx, job.ID, jobErr)
+		if err != nil && rm.l != nil {
+			rm.l.Errorf("unable to report replication job completion for %+v", job.ID)
+		}
+	}
+}
diff --git a/internal/praefect/server.go b/internal/praefect/server.go
index 04779af2cef..6266c159913 100644
--- a/internal/praefect/server.go
+++ b/internal/praefect/server.go
@@ -26,6 +26,7 @@ import (
 // into the praefect server
 type Logger interface {
 	Debugf(format string, args ...interface{})
+	Errorf(format string, args ...interface{})
 }
 
 // Coordinator takes care of directing client requests to the appropriate
-- 
GitLab

From 2e5e023029609380cf4ab7234d25efe978e6648a Mon Sep 17 00:00:00 2001
From: Paul Okstad
Date: Tue, 19 Mar 2019 20:07:26 +0000
Subject: [PATCH 2/2] Apply suggestion to internal/praefect/replication.go

---
Review note: hunk offset and context follow the amended PATCH 1/2; the blob
ids on the index line were not regenerated.

 internal/praefect/replication.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/internal/praefect/replication.go b/internal/praefect/replication.go
index e8f2dc49650..b0d289040a7 100644
--- a/internal/praefect/replication.go
+++ b/internal/praefect/replication.go
@@ -85,7 +85,7 @@ func (rm *ReplMan) ProcessJobs(ctx context.Context, rc ReplCoordinator) error {
 
 		err = rc.CompleteJob(ctx, job.ID, jobErr)
 		if err != nil && rm.l != nil {
-			rm.l.Errorf("unable to report replication job completion for %+v", job.ID)
+			rm.l.Errorf("unable to report replication job completion for %v", job.ID)
 		}
 	}
 }
-- 
GitLab