diff --git a/internal/praefect/common.go b/internal/praefect/common.go new file mode 100644 index 0000000000000000000000000000000000000000..17667283e3e5afbeb7e0856633b14aa95a70b562 --- /dev/null +++ b/internal/praefect/common.go @@ -0,0 +1,15 @@ +package praefect + +// Repository contains all the information necessary to address a specific +// repository +type Repository struct { + Project string // e.g. gitlab.com/gitaly-org/gitaly + Storage string // e.g. Default +} + +// ReplJob indicates which repo replicas require syncing +type ReplJob struct { + ID string // unique ID to track job progress in datastore + Primary Repository + Replica Node +} diff --git a/internal/praefect/node.go b/internal/praefect/node.go new file mode 100644 index 0000000000000000000000000000000000000000..7100e2559ad6b99800edffc173ec4bb330c43067 --- /dev/null +++ b/internal/praefect/node.go @@ -0,0 +1,20 @@ +package praefect + +import ( + "context" + + "google.golang.org/grpc" +) + +// Node is a backend Gitaly node that is responsible for hosting repositories +// in a specific storage location +type Node struct { + storage string // storage location ID (e.g. default) + cc *grpc.ClientConn +} + +// PullReplication will attempt to replicate changes from a primary replica +func (n Node) PullReplication(ctx context.Context, primary Repository) error { + // TODO: replication logic in #1484 + return nil +} diff --git a/internal/praefect/replication.go b/internal/praefect/replication.go new file mode 100644 index 0000000000000000000000000000000000000000..b0d289040a7f80ab02facbc7f12a4a7250a13557 --- /dev/null +++ b/internal/praefect/replication.go @@ -0,0 +1,77 @@ +package praefect + +import ( + "context" +) + +// ReplMan is a replication manager for handling replication jobs +type ReplMan struct { + l Logger + // whitelist contains the project names of the repos we wish to replicate + whitelist map[string]struct{} +} + +type ReplManOpt func(*ReplMan) + +func NewReplMan(opts ...ReplManOpt) *ReplMan { + m := &ReplMan{ + whitelist: map[string]struct{}{}, + } + + for _, opt := range opts { + opt(m) + } + + return m +} + +// WithWhitelist will configure a whitelist for repos to allow replication +func WithWhitelist(whitelistedRepos []string) ReplManOpt { + return func(m *ReplMan) { + for _, r := range whitelistedRepos { + m.whitelist[r] = struct{}{} + } + } +} + +// ReplCoordinator represents all the coordinator functionality the replication +// manager relies on +type ReplCoordinator interface { + // ReplicationQueue returns a stream of jobs from + ReplicationQueue(context.Context) (<-chan ReplJob, error) + + // CompleteJob reports if a job was completed. A non-nil jobErr indicates + // the job was not successful. + CompleteJob(ctx context.Context, ID string, jobErr error) error +} + +func (rm *ReplMan) ProcessJobs(ctx context.Context, rc ReplCoordinator) error { + jobQ, err := rc.ReplicationQueue(ctx) + + for { + var ( + job ReplJob + ok bool + ) + + select { + + // context cancelled + case <-ctx.Done(): + return ctx.Err() + + case job, ok = <-jobQ: + if !ok { // channel closed + return nil + } + + } + + jobErr := job.Replica.PullReplication(ctx, job.Primary) + + err = rc.CompleteJob(ctx, job.ID, jobErr) + if err != nil { + rm.l.Errorf("unable to report replication job completion for %v", job.ID) + } + } +} diff --git a/internal/praefect/server.go b/internal/praefect/server.go index 04779af2cef0595f045d2613066d8d4d214810be..6266c159913519b94e7c2b39e3909fec031901a1 100644 --- a/internal/praefect/server.go +++ b/internal/praefect/server.go @@ -26,6 +26,7 @@ import ( // into the praefect server type Logger interface { Debugf(format string, args ...interface{}) + Errorf(format string, args ...interface{}) } // Coordinator takes care of directing client requests to the appropriate