From e5cfa92a988288f652eb2440ea5cce2385968a1b Mon Sep 17 00:00:00 2001 From: John Cai Date: Wed, 17 Jul 2019 15:35:11 -0700 Subject: [PATCH 1/3] Updating data model for praefect --- .gitignore | 1 + _support/praefect-cluster/.gitignore | 6 +- ...gitaly-primary.toml => config.gitaly.toml} | 0 .../praefect-cluster/config.praefect.toml | 23 +- _support/praefect-cluster/docker-compose.yml | 24 +- .../praefect-cluster/gitaly-backup-1.toml | 49 --- .../praefect-cluster/gitaly-backup-2.toml | 49 --- .../unreleased/jc-data-model-changes.yml | 5 + cmd/praefect/main.go | 16 +- internal/helper/storage.go | 11 + internal/praefect/common.go | 8 - internal/praefect/config/config.go | 28 +- internal/praefect/config/config_test.go | 34 +- internal/praefect/config/testdata/config.toml | 24 +- internal/praefect/coordinator.go | 178 ++++----- internal/praefect/coordinator_test.go | 31 +- internal/praefect/datastore.go | 365 ++++++++++-------- internal/praefect/datastore_memory_test.go | 113 +++--- internal/praefect/datastore_test.go | 86 +++-- internal/praefect/mock/mock.pb.go | 30 +- internal/praefect/mock/mock.proto | 7 +- internal/praefect/mocksvc_test.go | 4 +- internal/praefect/models/node.go | 17 + internal/praefect/models/nodes.go | 8 - internal/praefect/models/repository.go | 8 - .../praefect/protoregistry/protoregistry.go | 37 +- .../praefect/protoregistry/targetrepo_test.go | 2 +- internal/praefect/replicator.go | 112 +++--- internal/praefect/replicator_test.go | 104 ++--- internal/praefect/server_test.go | 51 ++- 30 files changed, 707 insertions(+), 724 deletions(-) rename _support/praefect-cluster/{gitaly-primary.toml => config.gitaly.toml} (100%) delete mode 100644 _support/praefect-cluster/gitaly-backup-1.toml delete mode 100644 _support/praefect-cluster/gitaly-backup-2.toml create mode 100644 changelogs/unreleased/jc-data-model-changes.yml create mode 100644 internal/praefect/models/node.go delete mode 100644 internal/praefect/models/nodes.go delete mode 100644 
internal/praefect/models/repository.go diff --git a/.gitignore b/.gitignore index b7fbb0b4cb7..9fb2c7e44a5 100644 --- a/.gitignore +++ b/.gitignore @@ -21,6 +21,7 @@ cmd/gitaly-remote/gitaly-remote git-env /gitaly-debug /praefect +/praefect-migrate gitaly.pid /vendor/github.com/libgit2/git2go/vendor /vendor diff --git a/_support/praefect-cluster/.gitignore b/_support/praefect-cluster/.gitignore index 06b873206ee..bd035c2b756 100644 --- a/_support/praefect-cluster/.gitignore +++ b/_support/praefect-cluster/.gitignore @@ -1,3 +1,3 @@ -/gitaly-backup-1 -/gitaly-backup-2 -/gitaly-primary +/gitaly-1 +/gitaly-2 +/gitaly-3 diff --git a/_support/praefect-cluster/gitaly-primary.toml b/_support/praefect-cluster/config.gitaly.toml similarity index 100% rename from _support/praefect-cluster/gitaly-primary.toml rename to _support/praefect-cluster/config.gitaly.toml diff --git a/_support/praefect-cluster/config.praefect.toml b/_support/praefect-cluster/config.praefect.toml index e0f163178b3..2a4297248b1 100644 --- a/_support/praefect-cluster/config.praefect.toml +++ b/_support/praefect-cluster/config.praefect.toml @@ -24,17 +24,22 @@ whitelist = ["@hashed/2c/62/2c624232cdd221771294dfbb310aca000a0df6ac8b66b696d90e # as shard. listen_addr should be unique for all nodes. # Requires the protocol to be defined, e.g. 
tcp://host.tld:1234 -[primary_server] - name = "default" +[[storage_node]] # listen_addr = "tcp://gitaly-primary:9999" - listen_addr = "tcp://127.0.0.1:9999" + storage = "praefect-internal-1" + address = "tcp://127.0.0.1:9999" -[[secondary_server]] - name = "backup1" +[[storage_node]] # listen_addr = "tcp://gitaly-backup-1:9999" - listen_addr = "tcp://127.0.0.1:9998" + storage = "praefect-internal-2" + address = "tcp://127.0.0.1:9998" -[[secondary_server]] - name = "backup2" +[[storage_node]] # listen_addr = "tcp://gitaly-backup-2:9999" - listen_addr = "tcp://127.0.0.1:9997" \ No newline at end of file + storage = "praefect-internal-3" + address = "tcp://127.0.0.1:9997" + +[postgres] + user = "johncai" + address = "127.0.0.1:5432" + database = "praefect_test" \ No newline at end of file diff --git a/_support/praefect-cluster/docker-compose.yml b/_support/praefect-cluster/docker-compose.yml index 6eb81be47e3..09745ea410f 100644 --- a/_support/praefect-cluster/docker-compose.yml +++ b/_support/praefect-cluster/docker-compose.yml @@ -6,15 +6,15 @@ services: # dockerfile: Dockerfile.praefect # image: praefect:latest # depends_on: -# - gitaly-primary -# - gitaly-backup-1 -# - gitaly-backup-2 +# - gitaly-1 +# - gitaly-2 +# - gitaly-3 # command: ["/etc/gitaly/praefect", "-config", "/etc/gitaly/config.praefect.toml"] # ports: # - "2305:2305" # volumes: # - ./config.praefect.toml:/etc/gitaly/config.praefect.toml - gitaly-primary: + gitaly-1: image: registry.gitlab.com/gitlab-org/build/cng/gitaly:latest environment: - GITALY_TESTING_NO_GIT_HOOKS=1 @@ -24,9 +24,9 @@ services: - "9999:9999" command: ["/usr/local/bin/gitaly", "/etc/config/config.toml"] volumes: - - ./gitaly-primary/data:/home/git/repositories - - ./gitaly-primary.toml:/etc/config/config.toml - gitaly-backup-1: + - ./gitaly-1/data:/home/git/repositories + - ./config.gitaly.toml:/etc/config/config.toml + gitaly-2: image: registry.gitlab.com/gitlab-org/build/cng/gitaly:latest environment: - 
GITALY_TESTING_NO_GIT_HOOKS=1 @@ -36,9 +36,9 @@ services: - "9998:9999" command: ["/usr/local/bin/gitaly", "/etc/config/config.toml"] volumes: - - ./gitaly-backup-1/data:/home/git/repositories - - ./gitaly-backup-1.toml:/etc/config/config.toml - gitaly-backup-2: + - ./gitaly-2/data:/home/git/repositories + - ./config.gitaly.toml:/etc/config/config.toml + gitaly-3: image: registry.gitlab.com/gitlab-org/build/cng/gitaly:latest environment: - GITALY_TESTING_NO_GIT_HOOKS=1 @@ -48,5 +48,5 @@ services: - "9997:9999" command: ["/usr/local/bin/gitaly", "/etc/config/config.toml"] volumes: - - ./gitaly-backup-2/data:/home/git/repositories - - ./gitaly-backup-2.toml:/etc/config/config.toml \ No newline at end of file + - ./gitaly-3/data:/home/git/repositories + - ./config.gitaly.toml:/etc/config/config.toml \ No newline at end of file diff --git a/_support/praefect-cluster/gitaly-backup-1.toml b/_support/praefect-cluster/gitaly-backup-1.toml deleted file mode 100644 index 89d1884e362..00000000000 --- a/_support/praefect-cluster/gitaly-backup-1.toml +++ /dev/null @@ -1,49 +0,0 @@ -# Example Gitaly configuration file - -# The directory where Gitaly's executables are stored -bin_dir = "/usr/local/bin" - -# listen on a TCP socket. 
This is insecure (no authentication) -listen_addr = "0.0.0.0:9999" - -# # Optional: export metrics via Prometheus -# prometheus_listen_addr = "localhost:9236" -# - -# # Git executable settings -# [git] -# bin_path = "/usr/bin/git" - -[[storage]] -name = "backup1" -path = "/home/git/repositories" - -# # You can optionally configure more storages for this Gitaly instance to serve up -# -# [[storage]] -# name = "other_storage" -# path = "/mnt/other_storage/repositories" -# - -# # You can optionally configure Gitaly to output JSON-formatted log messages to stdout -# [logging] -# format = "json" -# # Additionally exceptions can be reported to Sentry -# sentry_dsn = "https://:@sentry.io/" - -# # You can optionally configure Gitaly to record histogram latencies on GRPC method calls -# [prometheus] -# grpc_latency_buckets = [0.001, 0.005, 0.025, 0.1, 0.5, 1.0, 10.0, 30.0, 60.0, 300.0, 1500.0] - -[gitaly-ruby] -# The directory where gitaly-ruby is installed -dir = "/srv/gitaly-ruby" - -[gitlab-shell] -# The directory where gitlab-shell is installed -dir = "/srv/gitlab-shell" - -# # You can adjust the concurrency of each RPC endpoint -# [[concurrency]] -# rpc = "/gitaly.RepositoryService/GarbageCollect" -# max_per_repo = 1 diff --git a/_support/praefect-cluster/gitaly-backup-2.toml b/_support/praefect-cluster/gitaly-backup-2.toml deleted file mode 100644 index 1b5ce8d209d..00000000000 --- a/_support/praefect-cluster/gitaly-backup-2.toml +++ /dev/null @@ -1,49 +0,0 @@ -# Example Gitaly configuration file - -# The directory where Gitaly's executables are stored -bin_dir = "/usr/local/bin" - -# listen on a TCP socket. 
This is insecure (no authentication) -listen_addr = "0.0.0.0:9999" - -# # Optional: export metrics via Prometheus -# prometheus_listen_addr = "localhost:9236" -# - -# # Git executable settings -# [git] -# bin_path = "/usr/bin/git" - -[[storage]] -name = "backup2" -path = "/home/git/repositories" - -# # You can optionally configure more storages for this Gitaly instance to serve up -# -# [[storage]] -# name = "other_storage" -# path = "/mnt/other_storage/repositories" -# - -# # You can optionally configure Gitaly to output JSON-formatted log messages to stdout -# [logging] -# format = "json" -# # Additionally exceptions can be reported to Sentry -# sentry_dsn = "https://:@sentry.io/" - -# # You can optionally configure Gitaly to record histogram latencies on GRPC method calls -# [prometheus] -# grpc_latency_buckets = [0.001, 0.005, 0.025, 0.1, 0.5, 1.0, 10.0, 30.0, 60.0, 300.0, 1500.0] - -[gitaly-ruby] -# The directory where gitaly-ruby is installed -dir = "/srv/gitaly-ruby" - -[gitlab-shell] -# The directory where gitlab-shell is installed -dir = "/srv/gitlab-shell" - -# # You can adjust the concurrency of each RPC endpoint -# [[concurrency]] -# rpc = "/gitaly.RepositoryService/GarbageCollect" -# max_per_repo = 1 diff --git a/changelogs/unreleased/jc-data-model-changes.yml b/changelogs/unreleased/jc-data-model-changes.yml new file mode 100644 index 00000000000..e40d3dd664d --- /dev/null +++ b/changelogs/unreleased/jc-data-model-changes.yml @@ -0,0 +1,5 @@ +--- +title: Praefect data model changes with replication +merge_request: 1394 +author: +type: added diff --git a/cmd/praefect/main.go b/cmd/praefect/main.go index acf53d2fad2..8ce68530d37 100644 --- a/cmd/praefect/main.go +++ b/cmd/praefect/main.go @@ -19,6 +19,7 @@ import ( "gitlab.com/gitlab-org/gitaly/internal/praefect" "gitlab.com/gitlab-org/gitaly/internal/praefect/config" + "gitlab.com/gitlab-org/gitaly/internal/praefect/protoregistry" "gitlab.com/gitlab-org/gitaly/internal/version" 
"gitlab.com/gitlab-org/labkit/tracing" ) @@ -95,10 +96,9 @@ func run(listeners []net.Listener, conf config.Config) error { var ( // top level server dependencies datastore = praefect.NewMemoryDatastore(conf) - coordinator = praefect.NewCoordinator(logger, datastore) - repl = praefect.NewReplMgr("default", logger, datastore, coordinator, praefect.WithWhitelist(conf.Whitelist)) + coordinator = praefect.NewCoordinator(logger, datastore, protoregistry.GitalyProtoFileDescriptors...) + repl = praefect.NewReplMgr("default", logger, datastore, datastore, coordinator, praefect.WithWhitelist(conf.Whitelist)) srv = praefect.NewServer(coordinator, repl, nil, logger) - // signal related signals = []os.Signal{syscall.SIGTERM, syscall.SIGINT} termCh = make(chan os.Signal, len(signals)) @@ -114,14 +114,12 @@ func run(listeners []net.Listener, conf config.Config) error { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - allBackendServers := append(conf.SecondaryServers, conf.PrimaryServer) - - for _, gitaly := range allBackendServers { - if err := coordinator.RegisterNode(gitaly.Name, gitaly.ListenAddr); err != nil { - return fmt.Errorf("failed to register %s: %s", gitaly.Name, err) + for _, node := range conf.StorageNodes { + if err := coordinator.RegisterNode(node.Storage, node.Address); err != nil { + return fmt.Errorf("failed to register %s: %s", node.Address, err) } - logger.WithField("node_name", gitaly.Name).WithField("gitaly listen addr", gitaly.ListenAddr).Info("registered gitaly node") + logger.WithField("node_address", node.Address).Info("registered gitaly node") } go func() { serverErrors <- repl.ProcessBacklog(ctx) }() diff --git a/internal/helper/storage.go b/internal/helper/storage.go index 4e535a5d61a..f3f0d0ba001 100644 --- a/internal/helper/storage.go +++ b/internal/helper/storage.go @@ -34,6 +34,17 @@ func ExtractGitalyServers(ctx context.Context) (gitalyServersInfo storage.Gitaly return } +// IncomingToOutgoing creates an outgoing context 
out of an incoming context with the same storage metadata +func IncomingToOutgoing(ctx context.Context) context.Context { + md, ok := metadata.FromIncomingContext(ctx) + if !ok { + return ctx + } + + return metadata.NewOutgoingContext(ctx, md) + +} + // InjectGitalyServers injects gitaly-servers metadata into an outgoing context func InjectGitalyServers(ctx context.Context, name, address, token string) (context.Context, error) { diff --git a/internal/praefect/common.go b/internal/praefect/common.go index 2df2a482373..a09a292adae 100644 --- a/internal/praefect/common.go +++ b/internal/praefect/common.go @@ -1,13 +1,5 @@ package praefect -import "google.golang.org/grpc" - -// Node is a wrapper around the grpc client connection for a backend Gitaly node -type Node struct { - Storage string - cc *grpc.ClientConn -} - // logging keys to use with logrus WithField const ( logKeyProjectPath = "ProjectPath" diff --git a/internal/praefect/config/config.go b/internal/praefect/config/config.go index 6a2a5b5d5b8..eb0fad56b36 100644 --- a/internal/praefect/config/config.go +++ b/internal/praefect/config/config.go @@ -15,8 +15,7 @@ type Config struct { ListenAddr string `toml:"listen_addr"` SocketPath string `toml:"socket_path"` - PrimaryServer *models.GitalyServer `toml:"primary_server"` - SecondaryServers []*models.GitalyServer `toml:"secondary_server"` + StorageNodes []*models.StorageNode `toml:"storage_node"` // Whitelist is a list of relative project paths (paths comprised of project // hashes) that are permitted to use high availability features @@ -26,13 +25,6 @@ type Config struct { PrometheusListenAddr string `toml:"prometheus_listen_addr"` } -// GitalyServer allows configuring the servers that RPCs are proxied to -type GitalyServer struct { - Name string `toml:"name"` - ListenAddr string `toml:"listen_addr" split_words:"true"` - Token string `toml:"token"` -} - // FromFile loads the config for the passed file path func FromFile(filePath string) (Config, error) { config 
:= &Config{} @@ -50,32 +42,30 @@ var ( errNoListener = errors.New("no listen address or socket path configured") errNoGitalyServers = errors.New("no primary gitaly backends configured") errDuplicateGitalyAddr = errors.New("gitaly listen addresses are not unique") - errGitalyWithoutName = errors.New("all gitaly servers must have a name") + errGitalyWithoutAddr = errors.New("all gitaly nodes must have an address") ) -var emptyServer = &models.GitalyServer{} - // Validate establishes if the config is valid func (c Config) Validate() error { if c.ListenAddr == "" && c.SocketPath == "" { return errNoListener } - if c.PrimaryServer == nil || c.PrimaryServer == emptyServer { + if len(c.StorageNodes) == 0 { return errNoGitalyServers } - listenAddrs := make(map[string]bool, len(c.SecondaryServers)+1) - for _, gitaly := range append(c.SecondaryServers, c.PrimaryServer) { - if gitaly.Name == "" { - return errGitalyWithoutName + listenAddrs := make(map[string]bool, len(c.StorageNodes)) + for _, node := range c.StorageNodes { + if node.Address == "" { + return errGitalyWithoutAddr } - if _, found := listenAddrs[gitaly.ListenAddr]; found { + if _, found := listenAddrs[node.Address]; found { return errDuplicateGitalyAddr } - listenAddrs[gitaly.ListenAddr] = true + listenAddrs[node.Address] = true } return nil diff --git a/internal/praefect/config/config_test.go b/internal/praefect/config/config_test.go index eace5eb2f3e..b89bdd64805 100644 --- a/internal/praefect/config/config_test.go +++ b/internal/praefect/config/config_test.go @@ -9,10 +9,10 @@ import ( ) func TestConfigValidation(t *testing.T) { - primarySrv := &models.GitalyServer{"test", "localhost:23456", "secret-token"} - secondarySrvs := []*models.GitalyServer{ - {"test1", "localhost:23457", "secret-token"}, - {"test2", "localhost:23458", "secret-token"}, + nodes := []*models.StorageNode{ + {ID: 1, Address: "localhost:23456", Token: "secret-token"}, + {ID: 2, Address: "localhost:23457", Token: "secret-token"}, + {ID: 3, 
Address: "localhost:23458", Token: "secret-token"}, } testCases := []struct { @@ -22,12 +22,12 @@ func TestConfigValidation(t *testing.T) { }{ { desc: "No ListenAddr or SocketPath", - config: Config{ListenAddr: "", PrimaryServer: primarySrv, SecondaryServers: secondarySrvs}, + config: Config{ListenAddr: "", StorageNodes: nodes}, err: errNoListener, }, { desc: "Only a SocketPath", - config: Config{SocketPath: "/tmp/praefect.socket", PrimaryServer: primarySrv, SecondaryServers: secondarySrvs}, + config: Config{SocketPath: "/tmp/praefect.socket", StorageNodes: nodes}, err: nil, }, { @@ -37,12 +37,12 @@ func TestConfigValidation(t *testing.T) { }, { desc: "duplicate address", - config: Config{ListenAddr: "localhost:1234", PrimaryServer: primarySrv, SecondaryServers: []*models.GitalyServer{primarySrv}}, + config: Config{ListenAddr: "localhost:1234", StorageNodes: append(nodes, &models.StorageNode{Address: nodes[0].Address})}, err: errDuplicateGitalyAddr, }, { desc: "Valid config", - config: Config{ListenAddr: "localhost:1234", PrimaryServer: primarySrv, SecondaryServers: secondarySrvs}, + config: Config{ListenAddr: "localhost:1234", StorageNodes: nodes}, err: nil, }, } @@ -63,18 +63,18 @@ func TestConfigParsing(t *testing.T) { { filePath: "testdata/config.toml", expected: Config{ - PrimaryServer: &models.GitalyServer{ - Name: "default", - ListenAddr: "tcp://gitaly-primary.example.com", - }, - SecondaryServers: []*models.GitalyServer{ + StorageNodes: []*models.StorageNode{ + { + Address: "tcp://gitaly-internal-1.example.com", + Storage: "praefect-internal-1", + }, { - Name: "default", - ListenAddr: "tcp://gitaly-backup1.example.com", + Address: "tcp://gitaly-internal-2.example.com", + Storage: "praefect-internal-2", }, { - Name: "backup", - ListenAddr: "tcp://gitaly-backup2.example.com", + Address: "tcp://gitaly-internal-3.example.com", + Storage: "praefect-internal-3", }, }, Whitelist: []string{"abcd1234", "edfg5678"}, diff --git 
a/internal/praefect/config/testdata/config.toml b/internal/praefect/config/testdata/config.toml index 81701a359c9..defebcca906 100644 --- a/internal/praefect/config/testdata/config.toml +++ b/internal/praefect/config/testdata/config.toml @@ -3,20 +3,20 @@ socket_path = "" whitelist = ["abcd1234", "edfg5678"] prometheus_listen_addr = "" -[primary_server] - name = "default" - listen_addr = "tcp://gitaly-primary.example.com" - -[[secondary_server]] - name = "default" - listen_addr = "tcp://gitaly-backup1.example.com" - -[[secondary_server]] - name = "backup" - listen_addr = "tcp://gitaly-backup2.example.com" - [logging] format = "" sentry_dsn = "" ruby_sentry_dsn = "" level = "" + +[[storage_node]] + address = "tcp://gitaly-internal-1.example.com" + storage = "praefect-internal-1" + +[[storage_node]] + address = "tcp://gitaly-internal-2.example.com" + storage = "praefect-internal-2" + +[[storage_node]] + address = "tcp://gitaly-internal-3.example.com" + storage = "praefect-internal-3" diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go index c238604f363..f00a469b68f 100644 --- a/internal/praefect/coordinator.go +++ b/internal/praefect/coordinator.go @@ -2,11 +2,14 @@ package praefect import ( "context" + "errors" "fmt" + "math/rand" "os" "os/signal" "sync" "syscall" + "time" gitalyauth "gitlab.com/gitlab-org/gitaly/auth" gitalyconfig "gitlab.com/gitlab-org/gitaly/internal/config" @@ -19,8 +22,6 @@ import ( "gitlab.com/gitlab-org/gitaly/client" "gitlab.com/gitlab-org/gitaly/internal/praefect/grpc-proxy/proxy" "google.golang.org/grpc" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" ) // Coordinator takes care of directing client requests to the appropriate @@ -31,14 +32,14 @@ type Coordinator struct { failoverMutex sync.RWMutex connMutex sync.RWMutex - datastore Datastore + datastore ReplicasDatastore nodes map[string]*grpc.ClientConn registry *protoregistry.Registry } // NewCoordinator returns a new Coordinator that 
utilizes the provided logger -func NewCoordinator(l *logrus.Logger, datastore Datastore, fileDescriptors ...*descriptor.FileDescriptorProto) *Coordinator { +func NewCoordinator(l *logrus.Logger, datastore ReplicasDatastore, fileDescriptors ...*descriptor.FileDescriptorProto) *Coordinator { registry := protoregistry.New() registry.RegisterFiles(fileDescriptors...) @@ -55,19 +56,6 @@ func (c *Coordinator) RegisterProtos(protos ...*descriptor.FileDescriptorProto) return c.registry.RegisterFiles(protos...) } -// GetStorageNode returns the registered node for the given storage location -func (c *Coordinator) GetStorageNode(storage string) (Node, error) { - cc, ok := c.getConn(storage) - if !ok { - return Node{}, fmt.Errorf("no node registered for storage location %q", storage) - } - - return Node{ - Storage: storage, - cc: cc, - }, nil -} - // streamDirector determines which downstream servers receive requests func (c *Coordinator) streamDirector(ctx context.Context, fullMethodName string, peeker proxy.StreamModifier) (context.Context, *grpc.ClientConn, error) { // For phase 1, we need to route messages based on the storage location @@ -77,34 +65,93 @@ func (c *Coordinator) streamDirector(ctx context.Context, fullMethodName string, c.failoverMutex.RLock() defer c.failoverMutex.RUnlock() - serverConfig, err := c.datastore.GetDefaultPrimary() + frame, err := peeker.Peek() if err != nil { - err := status.Error( - codes.FailedPrecondition, - "no downstream node registered", - ) return nil, nil, err } - // We only need the primary node, as there's only one primary storage - // location per praefect at this time - cc, ok := c.getConn(serverConfig.Name) - if !ok { - return nil, nil, fmt.Errorf("unable to find existing client connection for %s", serverConfig.Name) + mi, err := c.registry.LookupMethod(fullMethodName) + if err != nil { + return nil, nil, err } - ctx, err = helper.InjectGitalyServers(ctx, serverConfig.Name, serverConfig.ListenAddr, serverConfig.Token) + var 
primary *models.StorageNode + + if mi.Scope == protoregistry.ScopeRepository { + m, err := mi.UnmarshalRequestProto(frame) + if err != nil { + return nil, nil, err + } + + targetRepo, err := mi.TargetRepo(m) + if err != nil { + return nil, nil, err + } + + primary, err = c.datastore.GetPrimary(targetRepo.GetRelativePath()) + + if err != nil { + if err != ErrPrimaryNotSet { + return nil, nil, err + } + // if there are no primaries for this repository, pick one + nodes, err := c.datastore.GetStorageNodes() + if err != nil { + return nil, nil, err + } + + if len(nodes) == 0 { + return nil, nil, fmt.Errorf("no nodes serve storage %s", targetRepo.GetStorageName()) + + } + newPrimary := nodes[rand.New(rand.NewSource(time.Now().Unix())).Intn(len(nodes))] + //newPrimary := nodes[0] + + // set the primary + if err = c.datastore.SetPrimary(targetRepo.GetRelativePath(), newPrimary.ID); err != nil { + return nil, nil, err + } + + primary = &newPrimary + } + + targetRepo.StorageName = primary.Storage + + b, err := proxy.Codec().Marshal(m) + if err != nil { + return nil, nil, err + } + if err = peeker.Modify(b); err != nil { + return nil, nil, err + } + + } else { + //TODO: For now we just pick a random storage node for a non repository scoped RPC, but we will need to figure out exactly how to + // proxy requests that are not repository scoped + node, err := c.datastore.GetStorageNodes() + if err != nil { + return nil, nil, err + } + if len(node) == 0 { + return nil, nil, errors.New("no node storages found") + } + primary = &node[0] + } + + // We only need the primary node, as there's only one primary storage + // location per praefect at this time + cc, err := c.GetConnection(primary.Storage) if err != nil { - return nil, nil, err + return nil, nil, fmt.Errorf("unable to find existing client connection for %s", primary.Storage) } - return ctx, cc, nil + return helper.IncomingToOutgoing(ctx), cc, nil } // RegisterNode will direct traffic to the supplied downstream connection 
when the storage location // is encountered. -func (c *Coordinator) RegisterNode(storageName, listenAddr string) error { - conn, err := client.Dial(listenAddr, +func (c *Coordinator) RegisterNode(storage, address string) error { + conn, err := client.Dial(address, []grpc.DialOption{ grpc.WithDefaultCallOptions(grpc.CallCustomCodec(proxy.Codec())), grpc.WithPerRPCCredentials(gitalyauth.RPCCredentials(gitalyconfig.Config.Auth.Token)), @@ -114,23 +161,28 @@ func (c *Coordinator) RegisterNode(storageName, listenAddr string) error { return err } - c.setConn(storageName, conn) + c.setConn(storage, conn) return nil } -func (c *Coordinator) setConn(storageName string, conn *grpc.ClientConn) { +func (c *Coordinator) setConn(storage string, conn *grpc.ClientConn) { c.connMutex.Lock() - c.nodes[storageName] = conn + c.nodes[storage] = conn c.connMutex.Unlock() } -func (c *Coordinator) getConn(storageName string) (*grpc.ClientConn, bool) { +// GetConnection gets the grpc client connection based on an address +func (c *Coordinator) GetConnection(storage string) (*grpc.ClientConn, error) { c.connMutex.RLock() - cc, ok := c.nodes[storageName] + cc, ok := c.nodes[storage] c.connMutex.RUnlock() + if !ok { + return nil, errors.New("client connection not found") + } + + return cc, nil - return cc, ok } // FailoverRotation waits for the SIGUSR1 signal, then promotes the next secondary to be primary @@ -146,55 +198,7 @@ func (c *Coordinator) handleSignalAndRotate() { <-failoverChan c.failoverMutex.Lock() - primary, err := c.datastore.GetDefaultPrimary() - if err != nil { - c.log.Fatalf("error when getting default primary: %v", err) - } - - if err := c.rotateSecondaryToPrimary(primary); err != nil { - c.log.WithError(err).Error("rotating secondary") - } + c.log.Info("failover happens") c.failoverMutex.Unlock() } } - -func (c *Coordinator) rotateSecondaryToPrimary(primary models.GitalyServer) error { - repositories, err := c.datastore.GetRepositoriesForPrimary(primary) - if err != nil { 
- return err - } - - for _, repoPath := range repositories { - secondaries, err := c.datastore.GetShardSecondaries(models.Repository{ - RelativePath: repoPath, - }) - if err != nil { - return fmt.Errorf("getting secondaries: %v", err) - } - - newPrimary := secondaries[0] - secondaries = append(secondaries[1:], primary) - - if err = c.datastore.SetShardPrimary(models.Repository{ - RelativePath: repoPath, - }, newPrimary); err != nil { - return fmt.Errorf("setting primary: %v", err) - } - - if err = c.datastore.SetShardSecondaries(models.Repository{ - RelativePath: repoPath, - }, secondaries); err != nil { - return fmt.Errorf("setting secondaries: %v", err) - } - } - - // set the new default primary - primary, err = c.datastore.GetShardPrimary(models.Repository{ - RelativePath: repositories[0], - }) - if err != nil { - return fmt.Errorf("getting shard primary: %v", err) - } - - return c.datastore.SetDefaultPrimary(primary) -} diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go index 50045f8a005..0275c604845 100644 --- a/internal/praefect/coordinator_test.go +++ b/internal/praefect/coordinator_test.go @@ -5,9 +5,6 @@ import ( "testing" "github.com/sirupsen/logrus" - "github.com/stretchr/testify/require" - "gitlab.com/gitlab-org/gitaly/internal/praefect/config" - "gitlab.com/gitlab-org/gitaly/internal/praefect/models" ) var testLogger = logrus.New() @@ -17,31 +14,5 @@ func init() { } func TestSecondaryRotation(t *testing.T) { - cfg := config.Config{ - PrimaryServer: &models.GitalyServer{Name: "primary"}, - SecondaryServers: []*models.GitalyServer{&models.GitalyServer{Name: "secondary_1"}, &models.GitalyServer{Name: "secondary_2"}}, - Whitelist: []string{"/repoA", "/repoB"}, - } - d := NewMemoryDatastore(cfg) - c := NewCoordinator(testLogger, d) - - primary, err := d.GetDefaultPrimary() - require.NoError(t, err) - - require.NoError(t, c.rotateSecondaryToPrimary(primary)) - - primary, err = d.GetDefaultPrimary() - require.NoError(t, 
err) - require.Equal(t, *cfg.SecondaryServers[0], primary, "the first secondary should have gotten promoted to be primary") - - repositories, err := d.GetRepositoriesForPrimary(primary) - require.NoError(t, err) - - for _, repository := range repositories { - shardSecondaries, err := d.GetShardSecondaries(models.Repository{RelativePath: repository}) - require.NoError(t, err) - - require.Len(t, shardSecondaries, 2) - require.Equal(t, *cfg.SecondaryServers[1], shardSecondaries[0]) - } + t.Skip("secondary rotation will change with the new data model") } diff --git a/internal/praefect/datastore.go b/internal/praefect/datastore.go index 5678c6a247c..85fa22a2325 100644 --- a/internal/praefect/datastore.go +++ b/internal/praefect/datastore.go @@ -34,7 +34,7 @@ const ( // JobStateComplete indicates the job is now complete JobStateComplete // JobStateCancelled indicates the job was cancelled. This can occur if the - // job is no longer relevant (e.g. a node is moved out of a shard) + // job is no longer relevant (e.g. a node is moved out of a repository) JobStateCancelled ) @@ -42,10 +42,11 @@ const ( // meant for updating the repository so that it is synced with the primary // copy. Scheduled indicates when a replication job should be performed. type ReplJob struct { - ID uint64 // autoincrement ID - Target string // which storage location to replicate to? - Source models.Repository // source for replication - State JobState + ID uint64 // autoincrement ID + TargetNodeID int // which node to replicate to? 
+ SourceStorage string + Source models.Repository // source for replication + State JobState } // replJobs provides sort manipulation behavior @@ -64,32 +65,26 @@ func (b byJobID) Less(i, j int) bool { return b.replJobs[i].ID < b.replJobs[j].I type Datastore interface { ReplJobsDatastore ReplicasDatastore - TemporaryDatastore -} - -// TemporaryDatastore contains methods that will go away once we move to a SQL datastore -type TemporaryDatastore interface { - GetDefaultPrimary() (models.GitalyServer, error) - SetDefaultPrimary(primary models.GitalyServer) error } // ReplicasDatastore manages accessing and setting which secondary replicas // backup a repository type ReplicasDatastore interface { - // GetSecondaries will retrieve all secondary replica storage locations for - // a primary replica - GetShardSecondaries(repo models.Repository) ([]models.GitalyServer, error) + GetReplicas(relativePath string) ([]models.StorageNode, error) + + GetStorageNode(nodeID int) (models.StorageNode, error) + + GetStorageNodes() ([]models.StorageNode, error) + + GetPrimary(relativePath string) (*models.StorageNode, error) - GetShardPrimary(repo models.Repository) (models.GitalyServer, error) + SetPrimary(relativePath string, storageNodeID int) error - // SetSecondaries will set the secondary storage locations for a repository - // in a primary replica. 
- SetShardSecondaries(repo models.Repository, secondaries []models.GitalyServer) error + AddReplica(relativePath string, storageNodeID int) error - SetShardPrimary(repo models.Repository, primary models.GitalyServer) error + RemoveReplica(relativePath string, storageNodeID int) error - // GetRepositoriesForPrimary returns a map of all of the active shards for a given primary - GetRepositoriesForPrimary(primary models.GitalyServer) ([]string, error) + GetRepository(relativePath string) (*models.Repository, error) } // ReplJobsDatastore represents the behavior needed for fetching and updating @@ -98,58 +93,53 @@ type ReplJobsDatastore interface { // GetJobs fetches a list of chronologically ordered replication // jobs for the given storage replica. The returned list will be at most // count-length. - GetJobs(flag JobState, node string, count int) ([]ReplJob, error) + GetJobs(flag JobState, nodeID int, count int) ([]ReplJob, error) - // CreateSecondaryJobs will create replication jobs for each secondary + // CreateReplicaJobs will create replication jobs for each secondary // replica of a repository known to the datastore. A set of replication job // ID's for the created jobs will be returned upon success. - CreateSecondaryReplJobs(source models.Repository) ([]uint64, error) + CreateReplicaReplJobs(relativePath string) ([]uint64, error) // UpdateReplJob updates the state of an existing replication job UpdateReplJob(jobID uint64, newState JobState) error } -// shard is a set of primary and secondary storage replicas for a project -type shard struct { - primary models.GitalyServer - secondaries []models.GitalyServer -} - type jobRecord struct { - relativePath string // project's relative path - targetNode string - state JobState + sourceStorage string + relativePath string // project's relative path + targetNodeID int + state JobState } // MemoryDatastore is a simple datastore that isn't persisted to disk. 
It is // only intended for early beta requirements and as a reference implementation // for the eventual SQL implementation type MemoryDatastore struct { - replicas *struct { - sync.RWMutex - m map[string]shard // keyed by project's relative path - } - jobs *struct { sync.RWMutex next uint64 records map[uint64]jobRecord // all jobs indexed by ID } - primary *struct { + storageNodes *struct { sync.RWMutex - server models.GitalyServer + m map[int]models.StorageNode + } + + repositories *struct { + sync.RWMutex + m map[string]models.Repository } } // NewMemoryDatastore returns an initialized in-memory datastore func NewMemoryDatastore(cfg config.Config) *MemoryDatastore { m := &MemoryDatastore{ - replicas: &struct { + storageNodes: &struct { sync.RWMutex - m map[string]shard + m map[int]models.StorageNode }{ - m: map[string]shard{}, + m: map[int]models.StorageNode{}, }, jobs: &struct { sync.RWMutex @@ -159,114 +149,190 @@ func NewMemoryDatastore(cfg config.Config) *MemoryDatastore { next: 0, records: map[uint64]jobRecord{}, }, - primary: &struct { + repositories: &struct { sync.RWMutex - server models.GitalyServer + m map[string]models.Repository }{ - server: models.GitalyServer{ - Name: cfg.PrimaryServer.Name, - ListenAddr: cfg.PrimaryServer.ListenAddr, - Token: cfg.PrimaryServer.Token, - }, + m: map[string]models.Repository{}, }, } - secondaryServers := make([]models.GitalyServer, len(cfg.SecondaryServers)) - for i, server := range cfg.SecondaryServers { - secondaryServers[i] = *server + for i, storageNode := range cfg.StorageNodes { + storageNode.ID = i + m.storageNodes.m[i] = *storageNode } - for _, repo := range cfg.Whitelist { - // store the configuration file specified shard - m.replicas.m[repo] = shard{ - primary: *cfg.PrimaryServer, - secondaries: secondaryServers, + for _, repoPath := range cfg.Whitelist { + repo := models.Repository{ + RelativePath: repoPath, } - - // initialize replication job queue to replicate all whitelisted repos - // to every 
secondary server - for _, secondary := range cfg.SecondaryServers { - m.jobs.next++ - m.jobs.records[m.jobs.next] = jobRecord{ - state: JobStateReady, - targetNode: secondary.Name, - relativePath: repo, + for storageID, storageNode := range cfg.StorageNodes { + + // By default, pick the first storage node to be the primary. We can change this later to pick a randomly selected node + // to be the primary + if repo.Primary == (models.StorageNode{}) { + repo.Primary = *storageNode + } else { + repo.Replicas = append(repo.Replicas, *storageNode) + // initialize replication job queue to replicate all whitelisted repos + // to every replica + m.jobs.next++ + m.jobs.records[m.jobs.next] = jobRecord{ + state: JobStateReady, + targetNodeID: storageID, + sourceStorage: repo.Primary.Storage, + relativePath: repoPath, + } } } + m.repositories.m[repoPath] = repo } return m } -// GetShardSecondaries will return the set of secondary storage locations for a -// given repository if they exist -func (md *MemoryDatastore) GetShardSecondaries(primary models.Repository) ([]models.GitalyServer, error) { - shard, _ := md.getShard(primary.RelativePath) +// GetReplicas gets the secondaries for a repository based on the relative path +func (md *MemoryDatastore) GetReplicas(relativePath string) ([]models.StorageNode, error) { + md.repositories.RLock() + md.storageNodes.RLock() + defer md.storageNodes.RUnlock() + defer md.repositories.RUnlock() + + repository, ok := md.repositories.m[relativePath] + if !ok { + return nil, errors.New("repository not found") + } + + return repository.Replicas, nil +} + +// GetStorageNode gets all storage nodes +func (md *MemoryDatastore) GetStorageNode(nodeID int) (models.StorageNode, error) { + md.storageNodes.RLock() + defer md.storageNodes.RUnlock() - return shard.secondaries, nil + node, ok := md.storageNodes.m[nodeID] + if !ok { + return models.StorageNode{}, errors.New("node not found") + } + + return node, nil } -// SetShardSecondaries will replace the 
set of replicas for a repository -func (md *MemoryDatastore) SetShardSecondaries(repo models.Repository, secondaries []models.GitalyServer) error { - md.replicas.Lock() - defer md.replicas.Unlock() +// GetStorageNodes gets all storage nodes +func (md *MemoryDatastore) GetStorageNodes() ([]models.StorageNode, error) { + md.storageNodes.RLock() + defer md.storageNodes.RUnlock() - shard := md.replicas.m[repo.RelativePath] - shard.secondaries = secondaries - md.replicas.m[repo.RelativePath] = shard + var storageNodes []models.StorageNode + for _, storageNode := range md.storageNodes.m { + storageNodes = append(storageNodes, storageNode) + } - return nil + return storageNodes, nil } -// SetShardPrimary sets the primary for a repository -func (md *MemoryDatastore) SetShardPrimary(repo models.Repository, primary models.GitalyServer) error { - md.replicas.Lock() - defer md.replicas.Unlock() +// GetPrimary gets the primary storage node for a repository, given the repository's relative path +func (md *MemoryDatastore) GetPrimary(relativePath string) (*models.StorageNode, error) { + md.repositories.RLock() + defer md.repositories.RUnlock() - shard := md.replicas.m[repo.RelativePath] - shard.primary = primary - md.replicas.m[repo.RelativePath] = shard + repository, ok := md.repositories.m[relativePath] + if !ok { + return nil, ErrPrimaryNotSet + } + storageNode, ok := md.storageNodes.m[repository.Primary.ID] + if !ok { + return nil, errors.New("node storage not found") + } + return &storageNode, nil + +} + +// SetPrimary sets the primary storage node for a repository, given the repository's relative path +func (md *MemoryDatastore) SetPrimary(relativePath string, storageNodeID int) error { + md.repositories.Lock() + defer md.repositories.Unlock() + + repository, ok := md.repositories.m[relativePath] + if !ok { + repository = models.Repository{RelativePath: relativePath} + } + + storageNode, ok := md.storageNodes.m[storageNodeID] + if !ok { + return errors.New("node storage not found") + } + 
+ repository.Primary = storageNode + + md.repositories.m[relativePath] = repository return nil } -// GetShardPrimary gets the primary for a repository -func (md *MemoryDatastore) GetShardPrimary(repo models.Repository) (models.GitalyServer, error) { - md.replicas.Lock() - defer md.replicas.Unlock() +// AddReplica adds a secondary to a repository of a repository relative path +func (md *MemoryDatastore) AddReplica(relativePath string, storageNodeID int) error { + md.repositories.Lock() + defer md.repositories.Unlock() + + repository, ok := md.repositories.m[relativePath] + if !ok { + return errors.New("repository not found") + } + + storageNode, ok := md.storageNodes.m[storageNodeID] + if !ok { + return errors.New("node storage not found") + } - shard := md.replicas.m[repo.RelativePath] - return shard.primary, nil + repository.Replicas = append(repository.Replicas, storageNode) + + md.repositories.m[relativePath] = repository + return nil } -// GetRepositoriesForPrimary gets all repositories -func (md *MemoryDatastore) GetRepositoriesForPrimary(primary models.GitalyServer) ([]string, error) { - md.replicas.Lock() - defer md.replicas.Unlock() +// RemoveReplica removes a secondary from a repository of a repository relative path +func (md *MemoryDatastore) RemoveReplica(relativePath string, storageNodeID int) error { + md.repositories.Lock() + defer md.repositories.Unlock() - repositories := make([]string, 0, len(md.replicas.m)) + repository, ok := md.repositories.m[relativePath] + if !ok { + return errors.New("repository not found") + } - for repository := range md.replicas.m { - repositories = append(repositories, repository) + var secondaries []models.StorageNode + for _, secondary := range repository.Replicas { + if secondary.ID != storageNodeID { + secondaries = append(secondaries, secondary) + } } - return repositories, nil + repository.Replicas = secondaries + md.repositories.m[relativePath] = repository + return nil } -func (md *MemoryDatastore) 
getShard(project string) (shard, bool) { - md.replicas.RLock() - replicas, ok := md.replicas.m[project] - md.replicas.RUnlock() +// GetRepository gets the repository for a repository relative path +func (md *MemoryDatastore) GetRepository(relativePath string) (*models.Repository, error) { + md.repositories.Lock() + defer md.repositories.Unlock() - return replicas, ok + repository, ok := md.repositories.m[relativePath] + if !ok { + return nil, errors.New("repository not found") + } + + return &repository, nil } -// ErrSecondariesMissing indicates the repository does not have any backup +// ErrReplicasMissing indicates the repository does not have any backup // replicas -var ErrSecondariesMissing = errors.New("repository missing secondary replicas") +var ErrReplicasMissing = errors.New("repository missing secondary replicas") // GetJobs is a more general method to retrieve jobs of a certain state from the datastore -func (md *MemoryDatastore) GetJobs(state JobState, storage string, count int) ([]ReplJob, error) { +func (md *MemoryDatastore) GetJobs(state JobState, targetNodeID int, count int) ([]ReplJob, error) { md.jobs.RLock() defer md.jobs.RUnlock() @@ -274,7 +340,7 @@ func (md *MemoryDatastore) GetJobs(state JobState, storage string, count int) ([ for i, record := range md.jobs.records { // state is a bitmap that is a combination of one or more JobStates - if record.state&state != 0 && record.targetNode == storage { + if record.state&state != 0 && record.targetNodeID == targetNodeID { job, err := md.replJobFromRecord(i, record) if err != nil { return nil, err @@ -293,60 +359,52 @@ func (md *MemoryDatastore) GetJobs(state JobState, storage string, count int) ([ } // replJobFromRecord constructs a replication job from a record and by cross -// referencing the current shard for the project being replicated +// referencing the current repository for the project being replicated func (md *MemoryDatastore) replJobFromRecord(jobID uint64, record jobRecord) (ReplJob, 
error) { - shard, ok := md.getShard(record.relativePath) - if !ok { - return ReplJob{}, fmt.Errorf( - "unable to find shard for project at relative path %q", - record.relativePath, - ) - } - return ReplJob{ ID: jobID, Source: models.Repository{ RelativePath: record.relativePath, - Storage: shard.primary.Name, }, - State: record.state, - Target: record.targetNode, + SourceStorage: record.sourceStorage, + State: record.state, + TargetNodeID: record.targetNodeID, }, nil } -// ErrInvalidReplTarget indicates a targetNode repository cannot be chosen because +// ErrInvalidReplTarget indicates a targetStorage repository cannot be chosen because // it fails preconditions for being replicatable -var ErrInvalidReplTarget = errors.New("targetNode repository fails preconditions for replication") +var ErrInvalidReplTarget = errors.New("targetStorage repository fails preconditions for replication") -// CreateSecondaryReplJobs creates a replication job for each secondary that +// CreateReplicaReplJobs creates a replication job for each secondary that // backs the specified repository. Upon success, the job IDs will be returned. 
-func (md *MemoryDatastore) CreateSecondaryReplJobs(source models.Repository) ([]uint64, error) { +func (md *MemoryDatastore) CreateReplicaReplJobs(relativePath string) ([]uint64, error) { md.jobs.Lock() defer md.jobs.Unlock() - emptyRepo := models.Repository{} - if source == emptyRepo { + if relativePath == "" { return nil, errors.New("invalid source repository") } - shard, ok := md.getShard(source.RelativePath) - if !ok { + repository, err := md.GetRepository(relativePath) + if err != nil { return nil, fmt.Errorf( - "unable to find shard for project at relative path %q", - source.RelativePath, + "unable to find repository for project at relative path %q", + relativePath, ) } var jobIDs []uint64 - for _, secondary := range shard.secondaries { + for _, secondary := range repository.Replicas { nextID := uint64(len(md.jobs.records) + 1) md.jobs.next++ md.jobs.records[md.jobs.next] = jobRecord{ - targetNode: secondary.Name, - state: JobStatePending, - relativePath: source.RelativePath, + targetNodeID: secondary.ID, + state: JobStatePending, + relativePath: relativePath, + sourceStorage: repository.Primary.Storage, } jobIDs = append(jobIDs, nextID) @@ -375,36 +433,3 @@ func (md *MemoryDatastore) UpdateReplJob(jobID uint64, newState JobState) error md.jobs.records[jobID] = job return nil } - -// SetPrimary sets the primary datastore location -func (md *MemoryDatastore) SetPrimary(primary models.GitalyServer) error { - md.primary.Lock() - defer md.primary.Unlock() - - md.primary.server = primary - - return nil -} - -// GetDefaultPrimary gets the primary datastore location -func (md *MemoryDatastore) GetDefaultPrimary() (models.GitalyServer, error) { - md.primary.RLock() - defer md.primary.RUnlock() - - primary := md.primary.server - if primary == (models.GitalyServer{}) { - return primary, ErrPrimaryNotSet - } - - return primary, nil -} - -// SetDefaultPrimary gets the primary datastore location -func (md *MemoryDatastore) SetDefaultPrimary(primary models.GitalyServer) 
error { - md.primary.RLock() - defer md.primary.RUnlock() - - md.primary.server = primary - - return nil -} diff --git a/internal/praefect/datastore_memory_test.go b/internal/praefect/datastore_memory_test.go index 6099a8328e5..a618d946644 100644 --- a/internal/praefect/datastore_memory_test.go +++ b/internal/praefect/datastore_memory_test.go @@ -1,93 +1,94 @@ -package praefect_test +package praefect import ( "testing" "github.com/stretchr/testify/require" - "gitlab.com/gitlab-org/gitaly/internal/praefect" "gitlab.com/gitlab-org/gitaly/internal/praefect/config" "gitlab.com/gitlab-org/gitaly/internal/praefect/models" ) // TestMemoryDatastoreWhitelist verifies that the in-memory datastore will -// populate itself with the correct replication jobs and shards when initialized +// populate itself with the correct replication jobs and repositories when initialized // with a configuration file specifying the shard and whitelisted repositories. func TestMemoryDatastoreWhitelist(t *testing.T) { - cfg := config.Config{ - PrimaryServer: &models.GitalyServer{ - Name: "default", - }, - SecondaryServers: []*models.GitalyServer{ - { - Name: "backup-1", - }, - { - Name: "backup-2", - }, - }, - Whitelist: []string{"abcd1234", "5678efgh"}, - } - - mds := praefect.NewMemoryDatastore(cfg) - repo1 := models.Repository{ - RelativePath: cfg.Whitelist[0], - Storage: cfg.PrimaryServer.Name, + RelativePath: "abcd1234", } - repo2 := models.Repository{ - RelativePath: cfg.Whitelist[1], - Storage: cfg.PrimaryServer.Name, + RelativePath: "5678efgh", } + mds := NewMemoryDatastore(config.Config{ + StorageNodes: []*models.StorageNode{ + &models.StorageNode{ + ID: 0, + Address: "tcp://default", + Storage: "praefect-internal-1", + }, + &models.StorageNode{ + ID: 1, + Address: "tcp://backup-2", + Storage: "praefect-internal-2", + }, &models.StorageNode{ + ID: 2, + Address: "tcp://backup-2", + Storage: "praefect-internal-3", + }}, + Whitelist: []string{repo1.RelativePath, repo2.RelativePath}, + }) - 
expectSecondaries := []models.GitalyServer{ - models.GitalyServer{Name: cfg.SecondaryServers[0].Name}, - models.GitalyServer{Name: cfg.SecondaryServers[1].Name}, + expectReplicas := []models.StorageNode{ + mds.storageNodes.m[1], + mds.storageNodes.m[2], } for _, repo := range []models.Repository{repo1, repo2} { - actualSecondaries, err := mds.GetShardSecondaries(repo) + actualReplicas, err := mds.GetReplicas(repo.RelativePath) require.NoError(t, err) - require.ElementsMatch(t, expectSecondaries, actualSecondaries) + require.ElementsMatch(t, expectReplicas, actualReplicas) } - backup1 := cfg.SecondaryServers[0] - backup2 := cfg.SecondaryServers[1] + backup1 := mds.storageNodes.m[1] + backup2 := mds.storageNodes.m[2] - backup1ExpectedJobs := []praefect.ReplJob{ - praefect.ReplJob{ - ID: 1, - Target: backup1.Name, - Source: repo1, - State: praefect.JobStateReady, + backup1ExpectedJobs := []ReplJob{ + ReplJob{ + ID: 1, + TargetNodeID: backup1.ID, + Source: models.Repository{RelativePath: repo1.RelativePath}, + SourceStorage: "praefect-internal-1", + State: JobStateReady, }, - praefect.ReplJob{ - ID: 3, - Target: backup1.Name, - Source: repo2, - State: praefect.JobStateReady, + ReplJob{ + ID: 3, + TargetNodeID: backup1.ID, + Source: models.Repository{RelativePath: repo2.RelativePath}, + SourceStorage: "praefect-internal-1", + State: JobStateReady, }, } - backup2ExpectedJobs := []praefect.ReplJob{ - praefect.ReplJob{ - ID: 2, - Target: backup2.Name, - Source: repo1, - State: praefect.JobStateReady, + backup2ExpectedJobs := []ReplJob{ + ReplJob{ + ID: 2, + TargetNodeID: backup2.ID, + Source: models.Repository{RelativePath: repo1.RelativePath}, + SourceStorage: "praefect-internal-1", + State: JobStateReady, }, - praefect.ReplJob{ - ID: 4, - Target: backup2.Name, - Source: repo2, - State: praefect.JobStateReady, + ReplJob{ + ID: 4, + TargetNodeID: backup2.ID, + Source: models.Repository{RelativePath: repo2.RelativePath}, + SourceStorage: "praefect-internal-1", + State: 
JobStateReady, }, } - backup1ActualJobs, err := mds.GetJobs(praefect.JobStatePending|praefect.JobStateReady, backup1.Name, 10) + backup1ActualJobs, err := mds.GetJobs(JobStatePending|JobStateReady, backup1.ID, 10) require.NoError(t, err) require.Equal(t, backup1ExpectedJobs, backup1ActualJobs) - backup2ActualJobs, err := mds.GetJobs(praefect.JobStatePending|praefect.JobStateReady, backup2.Name, 10) + backup2ActualJobs, err := mds.GetJobs(JobStatePending|JobStateReady, backup2.ID, 10) require.NoError(t, err) require.Equal(t, backup2ActualJobs, backup2ExpectedJobs) diff --git a/internal/praefect/datastore_test.go b/internal/praefect/datastore_test.go index 417a04be2e9..654d5ea7ed1 100644 --- a/internal/praefect/datastore_test.go +++ b/internal/praefect/datastore_test.go @@ -1,95 +1,104 @@ -package praefect_test +package praefect import ( "testing" "github.com/stretchr/testify/require" - "gitlab.com/gitlab-org/gitaly/internal/praefect" "gitlab.com/gitlab-org/gitaly/internal/praefect/config" "gitlab.com/gitlab-org/gitaly/internal/praefect/models" ) -const ( - stor1 = "default" // usually the primary storage location - stor2 = "backup-1" // usually the seoncary storage location +var ( + stor1 = models.StorageNode{ + ID: 0, + Address: "tcp://address-1", + Storage: "praefect-storage-1", + } + stor2 = models.StorageNode{ + ID: 1, + Address: "tcp://address-2", + Storage: "praefect-storage-2", + } proj1 = "abcd1234" // imagine this is a legit project hash ) var ( - repo1Primary = models.Repository{ + repo1Repository = models.Repository{ RelativePath: proj1, - Storage: stor1, } ) var operations = []struct { desc string - opFn func(*testing.T, praefect.Datastore) + opFn func(*testing.T, Datastore) }{ { desc: "query an empty datastore", - opFn: func(t *testing.T, ds praefect.Datastore) { - jobs, err := ds.GetJobs(praefect.JobStatePending|praefect.JobStateReady, stor1, 1) + opFn: func(t *testing.T, ds Datastore) { + jobs, err := ds.GetJobs(JobStatePending|JobStateReady, 
stor1.ID, 1) require.NoError(t, err) require.Len(t, jobs, 0) }, }, { desc: "insert first replication job before secondary mapped to primary", - opFn: func(t *testing.T, ds praefect.Datastore) { - _, err := ds.CreateSecondaryReplJobs(repo1Primary) - require.Error(t, err, praefect.ErrInvalidReplTarget) + opFn: func(t *testing.T, ds Datastore) { + _, err := ds.CreateReplicaReplJobs(repo1Repository.RelativePath) + require.Error(t, err, ErrInvalidReplTarget) }, }, { - desc: "set the primary for the shard", - opFn: func(t *testing.T, ds praefect.Datastore) { - err := ds.SetShardPrimary(repo1Primary, models.GitalyServer{Name: stor1}) + desc: "set the primary for the repository", + opFn: func(t *testing.T, ds Datastore) { + err := ds.SetPrimary(repo1Repository.RelativePath, stor1.ID) require.NoError(t, err) }, }, { - desc: "associate the replication job target with a primary", - opFn: func(t *testing.T, ds praefect.Datastore) { - err := ds.SetShardSecondaries(repo1Primary, []models.GitalyServer{models.GitalyServer{Name: stor2}}) + desc: "add a secondary replica for the repository", + opFn: func(t *testing.T, ds Datastore) { + err := ds.AddReplica(repo1Repository.RelativePath, stor2.ID) require.NoError(t, err) }, }, { desc: "insert first replication job after secondary mapped to primary", - opFn: func(t *testing.T, ds praefect.Datastore) { - ids, err := ds.CreateSecondaryReplJobs(repo1Primary) + opFn: func(t *testing.T, ds Datastore) { + ids, err := ds.CreateReplicaReplJobs(repo1Repository.RelativePath) require.NoError(t, err) require.Equal(t, []uint64{1}, ids) }, }, { desc: "fetch inserted replication jobs after primary mapped", - opFn: func(t *testing.T, ds praefect.Datastore) { - jobs, err := ds.GetJobs(praefect.JobStatePending|praefect.JobStateReady, stor2, 10) + opFn: func(t *testing.T, ds Datastore) { + jobs, err := ds.GetJobs(JobStatePending|JobStateReady, stor2.ID, 10) require.NoError(t, err) require.Len(t, jobs, 1) - expectedJob := praefect.ReplJob{ - ID: 1, - 
Source: repo1Primary, - Target: stor2, - State: praefect.JobStatePending, + expectedJob := ReplJob{ + ID: 1, + Source: models.Repository{ + RelativePath: repo1Repository.RelativePath, + }, + SourceStorage: "praefect-storage-1", + TargetNodeID: stor2.ID, + State: JobStatePending, } require.Equal(t, expectedJob, jobs[0]) }, }, { desc: "mark replication job done", - opFn: func(t *testing.T, ds praefect.Datastore) { - err := ds.UpdateReplJob(1, praefect.JobStateComplete) + opFn: func(t *testing.T, ds Datastore) { + err := ds.UpdateReplJob(1, JobStateComplete) require.NoError(t, err) }, }, { desc: "try fetching completed replication job", - opFn: func(t *testing.T, ds praefect.Datastore) { - jobs, err := ds.GetJobs(praefect.JobStatePending|praefect.JobStateReady, stor1, 1) + opFn: func(t *testing.T, ds Datastore) { + jobs, err := ds.GetJobs(JobStatePending|JobStateReady, stor1.ID, 1) require.NoError(t, err) require.Len(t, jobs, 0) }, @@ -97,14 +106,11 @@ var operations = []struct { } // TODO: add SQL datastore flavor -var flavors = map[string]func() praefect.Datastore{ - "in-memory-datastore": func() praefect.Datastore { - return praefect.NewMemoryDatastore( - config.Config{ - PrimaryServer: &models.GitalyServer{ - Name: "default", - }, - }) +var flavors = map[string]func() Datastore{ + "in-memory-datastore": func() Datastore { + return NewMemoryDatastore(config.Config{ + StorageNodes: []*models.StorageNode{&stor1, &stor2}, + }) }, } diff --git a/internal/praefect/mock/mock.pb.go b/internal/praefect/mock/mock.pb.go index b8a8afb0122..7324cbcaf68 100644 --- a/internal/praefect/mock/mock.pb.go +++ b/internal/praefect/mock/mock.pb.go @@ -9,7 +9,10 @@ import ( math "math" proto "github.com/golang/protobuf/proto" + _ "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb" grpc "google.golang.org/grpc" + codes "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" ) // Reference imports to suppress errors if they are not otherwise used. 
@@ -109,16 +112,17 @@ func init() { func init() { proto.RegisterFile("mock/mock.proto", fileDescriptor_5ed43251284e3118) } var fileDescriptor_5ed43251284e3118 = []byte{ - // 139 bytes of a gzipped FileDescriptorProto + // 157 bytes of a gzipped FileDescriptorProto 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xe2, 0xcf, 0xcd, 0x4f, 0xce, - 0xd6, 0x07, 0x11, 0x7a, 0x05, 0x45, 0xf9, 0x25, 0xf9, 0x42, 0x2c, 0x20, 0xb6, 0x92, 0x2a, 0x17, - 0x6f, 0x70, 0x66, 0x6e, 0x41, 0x4e, 0x6a, 0x50, 0x6a, 0x61, 0x69, 0x6a, 0x71, 0x89, 0x90, 0x08, - 0x17, 0x6b, 0x59, 0x62, 0x4e, 0x69, 0xaa, 0x04, 0xa3, 0x02, 0xa3, 0x06, 0x6b, 0x10, 0x84, 0xa3, - 0xa4, 0xc6, 0xc5, 0x07, 0x53, 0x56, 0x5c, 0x90, 0x9f, 0x57, 0x9c, 0x8a, 0x50, 0xc7, 0x84, 0xa4, - 0xce, 0x28, 0x00, 0x66, 0x5c, 0x70, 0x6a, 0x51, 0x59, 0x66, 0x72, 0xaa, 0x90, 0x3d, 0x97, 0x00, - 0x44, 0x20, 0x34, 0x2f, 0xb1, 0xa8, 0x12, 0x4c, 0x08, 0x09, 0xeb, 0x81, 0x9d, 0x81, 0x62, 0xaf, - 0x94, 0x08, 0xaa, 0x20, 0xc4, 0x16, 0x25, 0x86, 0x24, 0x36, 0xb0, 0x6b, 0x8d, 0x01, 0x01, 0x00, - 0x00, 0xff, 0xff, 0xb7, 0xeb, 0x46, 0xfb, 0xc0, 0x00, 0x00, 0x00, + 0xd6, 0x07, 0x11, 0x7a, 0x05, 0x45, 0xf9, 0x25, 0xf9, 0x42, 0x2c, 0x20, 0xb6, 0x14, 0x4f, 0x71, + 0x46, 0x62, 0x51, 0x6a, 0x0a, 0x44, 0x4c, 0x49, 0x95, 0x8b, 0x37, 0x38, 0x33, 0xb7, 0x20, 0x27, + 0x35, 0x28, 0xb5, 0xb0, 0x34, 0xb5, 0xb8, 0x44, 0x48, 0x84, 0x8b, 0xb5, 0x2c, 0x31, 0xa7, 0x34, + 0x55, 0x82, 0x51, 0x81, 0x51, 0x83, 0x35, 0x08, 0xc2, 0x51, 0x52, 0xe3, 0xe2, 0x83, 0x29, 0x2b, + 0x2e, 0xc8, 0xcf, 0x2b, 0x4e, 0x45, 0xa8, 0x63, 0x42, 0x52, 0x67, 0x14, 0x01, 0x33, 0x2e, 0x38, + 0xb5, 0xa8, 0x2c, 0x33, 0x39, 0x55, 0xc8, 0x9d, 0x4b, 0x00, 0x22, 0x10, 0x9a, 0x97, 0x58, 0x54, + 0x09, 0x26, 0x84, 0x84, 0xf5, 0xc0, 0x8e, 0x42, 0xb1, 0x57, 0x4a, 0x04, 0x55, 0x10, 0x62, 0x8b, + 0x12, 0xc7, 0xaf, 0xe9, 0x1a, 0x2c, 0x1c, 0x4c, 0x02, 0x8c, 0x49, 0x6c, 0x60, 0xf7, 0x1a, 0x03, + 0x02, 0x00, 0x00, 0xff, 0xff, 0xb5, 0x14, 0x6a, 0x14, 0xd6, 0x00, 0x00, 0x00, } // Reference 
imports to suppress errors if they are not otherwise used. @@ -160,6 +164,14 @@ type SimpleServiceServer interface { SimpleUnaryUnary(context.Context, *SimpleRequest) (*SimpleResponse, error) } +// UnimplementedSimpleServiceServer can be embedded to have forward compatible implementations. +type UnimplementedSimpleServiceServer struct { +} + +func (*UnimplementedSimpleServiceServer) SimpleUnaryUnary(ctx context.Context, req *SimpleRequest) (*SimpleResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method SimpleUnaryUnary not implemented") +} + func RegisterSimpleServiceServer(s *grpc.Server, srv SimpleServiceServer) { s.RegisterService(&_SimpleService_serviceDesc, srv) } diff --git a/internal/praefect/mock/mock.proto b/internal/praefect/mock/mock.proto index aa6ec842a11..59e79d3b91d 100644 --- a/internal/praefect/mock/mock.proto +++ b/internal/praefect/mock/mock.proto @@ -7,6 +7,8 @@ syntax = "proto3"; package mock; +import "shared.proto"; + message SimpleRequest { int32 value = 1; } @@ -17,7 +19,10 @@ message SimpleResponse { service SimpleService { // SimpleUnaryUnary is a simple unary request with unary response rpc SimpleUnaryUnary(SimpleRequest) returns (SimpleResponse) { - option (gitaly.op_type).op = ACCESSOR; + option (gitaly.op_type) = { + op: ACCESSOR + scope_level: SERVER + }; } } diff --git a/internal/praefect/mocksvc_test.go b/internal/praefect/mocksvc_test.go index f6e01811b44..adcf7a65e4b 100644 --- a/internal/praefect/mocksvc_test.go +++ b/internal/praefect/mocksvc_test.go @@ -1,4 +1,4 @@ -package praefect_test +package praefect import ( "context" @@ -11,7 +11,7 @@ type simpleUnaryUnaryCallback func(context.Context, *mock.SimpleRequest) (*mock. // mockSvc is an implementation of mock.SimpleServer for testing purposes. The // gRPC stub can be updated via go generate: // -//go:generate protoc --go_out=plugins=grpc:. mock/mock.proto +//go:generate protoc --go_out=plugins=grpc:. 
-I../../proto -I./ mock/mock.proto //go:generate goimports -w mock/mock.pb.go type mockSvc struct { simpleUnaryUnary simpleUnaryUnaryCallback diff --git a/internal/praefect/models/node.go b/internal/praefect/models/node.go new file mode 100644 index 00000000000..db3224c4c99 --- /dev/null +++ b/internal/praefect/models/node.go @@ -0,0 +1,17 @@ +package models + +// StorageNode describes an address that serves a storage +type StorageNode struct { + ID int + Storage string `toml:"storage"` + Address string `toml:"address"` + Token string `toml:"token"` +} + +// Repository describes a repository's relative path and its primary and list of secondaries +type Repository struct { + ID int + RelativePath string + Primary StorageNode + Replicas []StorageNode +} diff --git a/internal/praefect/models/nodes.go b/internal/praefect/models/nodes.go deleted file mode 100644 index 854254d8773..00000000000 --- a/internal/praefect/models/nodes.go +++ /dev/null @@ -1,8 +0,0 @@ -package models - -// GitalyServer allows configuring the servers that RPCs are proxied to -type GitalyServer struct { - Name string `toml:"name"` - ListenAddr string `toml:"listen_addr" split_words:"true"` - Token string `toml:"token"` -} diff --git a/internal/praefect/models/repository.go b/internal/praefect/models/repository.go deleted file mode 100644 index e11cdbf0a79..00000000000 --- a/internal/praefect/models/repository.go +++ /dev/null @@ -1,8 +0,0 @@ -package models - -// Repository provides all necessary information to address a repository hosted -// in a specific Gitaly replica -type Repository struct { - RelativePath string `toml:"relative_path"` // relative path of repository - Storage string `toml:"storage"` // storage location, e.g. 
default -} diff --git a/internal/praefect/protoregistry/protoregistry.go b/internal/praefect/protoregistry/protoregistry.go index 811b56140cd..770f3ddd0fe 100644 --- a/internal/praefect/protoregistry/protoregistry.go +++ b/internal/praefect/protoregistry/protoregistry.go @@ -43,11 +43,24 @@ const ( OpMutator ) +// Scope represents the scope for an RPC method +type Scope int + +const ( + // ScopeRepository = repository scope + ScopeRepository = iota + // ScopeStorage = storage scope + ScopeStorage + // ScopeServer = server scope + ScopeServer +) + // MethodInfo contains metadata about the RPC method. Refer to documentation // for message type "OperationMsg" shared.proto in gitlab-org/gitaly-proto for // more documentation. type MethodInfo struct { Operation OpType + Scope Scope targetRepo []int requestName string // protobuf message name for input type requestFactory protoFactory @@ -55,13 +68,6 @@ type MethodInfo struct { // TargetRepo returns the target repository for a protobuf message if it exists func (mi MethodInfo) TargetRepo(msg proto.Message) (*gitalypb.Repository, error) { - if mi.requestName != proto.MessageName(msg) { - return nil, fmt.Errorf( - "proto message %s does not match expected RPC request message %s", - proto.MessageName(msg), mi.requestName, - ) - } - return reflectFindRepoTarget(msg, mi.targetRepo) } @@ -179,6 +185,11 @@ func parseMethodInfo(methodDesc *descriptor.MethodDescriptorProto) (MethodInfo, return MethodInfo{}, err } + scope, err := parseScope(opMsg.GetScopeLevel()) + if err != nil { + return MethodInfo{}, err + } + // for some reason, the protobuf descriptor contains an extra dot in front // of the request name that the generated code does not. This trimming keeps // the two copies consistent for comparisons. 
@@ -194,9 +205,21 @@ func parseMethodInfo(methodDesc *descriptor.MethodDescriptorProto) (MethodInfo, targetRepo: targetRepo, requestName: requestName, requestFactory: reqFactory, + Scope: scope, }, nil } +func parseScope(scope gitalypb.OperationMsg_Scope) (Scope, error) { + switch scope { + case gitalypb.OperationMsg_REPOSITORY: + return ScopeRepository, nil + case gitalypb.OperationMsg_SERVER: + return ScopeServer, nil + } + + return ScopeRepository, errors.New("scope not found") +} + // parses a string like "1.1" and returns a slice of ints func parseOID(rawFieldOID string) ([]int, error) { var fieldNos []int diff --git a/internal/praefect/protoregistry/targetrepo_test.go b/internal/praefect/protoregistry/targetrepo_test.go index 8d6629524b4..286ebcf415e 100644 --- a/internal/praefect/protoregistry/targetrepo_test.go +++ b/internal/praefect/protoregistry/targetrepo_test.go @@ -56,7 +56,7 @@ func TestProtoRegistryTargetRepo(t *testing.T) { svc: "RepositoryService", method: "RepackIncremental", pbMsg: &gitalypb.RepackIncrementalResponse{}, - expectErr: errors.New("proto message gitaly.RepackIncrementalResponse does not match expected RPC request message gitaly.RepackIncrementalRequest"), + expectErr: errors.New("unable to descend OID [1] into message gitaly.RepackIncrementalResponse: unable to find protobuf field 1 in message RepackIncrementalResponse"), }, { desc: "target nested in oneOf", diff --git a/internal/praefect/replicator.go b/internal/praefect/replicator.go index c56f0488c69..d873fee462e 100644 --- a/internal/praefect/replicator.go +++ b/internal/praefect/replicator.go @@ -8,32 +8,33 @@ import ( "gitlab.com/gitlab-org/gitaly/internal/helper" "gitlab.com/gitlab-org/gitaly/internal/praefect/models" "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb" + "google.golang.org/grpc" "github.com/sirupsen/logrus" ) // Replicator performs the actual replication logic between two nodes type Replicator interface { - Replicate(ctx context.Context, source 
models.Repository, target Node) error + Replicate(ctx context.Context, source models.Repository, sourceStorage, targetStorage string, target *grpc.ClientConn) error } type defaultReplicator struct { log *logrus.Logger } -func (dr defaultReplicator) Replicate(ctx context.Context, source models.Repository, target Node) error { +func (dr defaultReplicator) Replicate(ctx context.Context, source models.Repository, sourceStorage, targetStorage string, target *grpc.ClientConn) error { repository := &gitalypb.Repository{ - StorageName: target.Storage, + StorageName: targetStorage, RelativePath: source.RelativePath, } remoteRepository := &gitalypb.Repository{ - StorageName: source.Storage, + StorageName: sourceStorage, RelativePath: source.RelativePath, } - repositoryClient := gitalypb.NewRepositoryServiceClient(target.cc) - remoteClient := gitalypb.NewRemoteServiceClient(target.cc) + repositoryClient := gitalypb.NewRepositoryServiceClient(target) + remoteClient := gitalypb.NewRemoteServiceClient(target) // CreateRepository is idempotent if _, err := repositoryClient.CreateRepository(ctx, &gitalypb.CreateRepositoryRequest{ @@ -60,7 +61,8 @@ func (dr defaultReplicator) Replicate(ctx context.Context, source models.Reposit // ReplMgr is a replication manager for handling replication jobs type ReplMgr struct { log *logrus.Logger - dataStore Datastore + replicasDS ReplicasDatastore + replJobsDS ReplJobsDatastore coordinator *Coordinator targetNode string // which replica is this replicator responsible for? 
replicator Replicator // does the actual replication logic @@ -74,10 +76,11 @@ type ReplMgrOpt func(*ReplMgr) // NewReplMgr initializes a replication manager with the provided dependencies // and options -func NewReplMgr(targetNode string, log *logrus.Logger, ds Datastore, c *Coordinator, opts ...ReplMgrOpt) ReplMgr { +func NewReplMgr(targetNode string, log *logrus.Logger, replicasDS ReplicasDatastore, jobsDS ReplJobsDatastore, c *Coordinator, opts ...ReplMgrOpt) ReplMgr { r := ReplMgr{ log: log, - dataStore: ds, + replicasDS: replicasDS, + replJobsDS: jobsDS, whitelist: map[string]struct{}{}, replicator: defaultReplicator{log}, targetNode: targetNode, @@ -118,7 +121,7 @@ func (r ReplMgr) ScheduleReplication(ctx context.Context, repo models.Repository return nil } - id, err := r.dataStore.CreateSecondaryReplJobs(repo) + id, err := r.replJobsDS.CreateReplicaReplJobs(repo.RelativePath) if err != nil { return err } @@ -140,58 +143,65 @@ const ( // ProcessBacklog will process queued jobs. It will block while processing jobs. func (r ReplMgr) ProcessBacklog(ctx context.Context) error { for { - jobs, err := r.dataStore.GetJobs(JobStatePending|JobStateReady, r.targetNode, 10) + nodes, err := r.replicasDS.GetStorageNodes() if err != nil { - return err + return nil } - if len(jobs) == 0 { - r.log.Tracef("no jobs for %s, checking again in %s", r.targetNode, jobFetchInterval) - - select { - // TODO: exponential backoff when no queries are returned - case <-time.After(jobFetchInterval): - continue - - case <-ctx.Done(): - return ctx.Err() - } - } - - r.log.Debugf("fetched replication jobs: %#v", jobs) - - for _, job := range jobs { - r.log.WithField(logWithReplJobID, job.ID). - Infof("processing replication job %#v", job) - node, err := r.coordinator.GetStorageNode(job.Target) - r.log.WithField(logWithReplJobID, job.ID).Infof("got storage node? 
%+v %v", node, err) + for _, node := range nodes { + jobs, err := r.replJobsDS.GetJobs(JobStatePending|JobStateReady, node.ID, 10) if err != nil { return err } - if err := r.dataStore.UpdateReplJob(job.ID, JobStateInProgress); err != nil { - return err - } + if len(jobs) == 0 { + r.log.Tracef("no jobs for %d, checking again in %s", node.ID, jobFetchInterval) - primary, err := r.dataStore.GetShardPrimary(job.Source) - if err != nil { - return err - } - - ctx, err = helper.InjectGitalyServers(ctx, job.Source.Storage, primary.ListenAddr, primary.Token) - if err != nil { - return err - } + select { + // TODO: exponential backoff when no queries are returned + case <-time.After(jobFetchInterval): + continue - if err := r.replicator.Replicate(ctx, job.Source, node); err != nil { - r.log.WithField(logWithReplJobID, job.ID).WithError(err).Error("error when replicating") - return err + case <-ctx.Done(): + return ctx.Err() + } } - r.log.WithField(logWithReplJobID, job.ID). - Info("completed replication") - if err := r.dataStore.UpdateReplJob(job.ID, JobStateComplete); err != nil { - return err + for _, job := range jobs { + r.log.WithField(logWithReplJobID, job.ID). 
+ Infof("processing replication job %#v", job) + node, err := r.replicasDS.GetStorageNode(job.TargetNodeID) + if err != nil { + return err + } + + repository, err := r.replicasDS.GetRepository(job.Source.RelativePath) + if err != nil { + return err + } + + if err := r.replJobsDS.UpdateReplJob(job.ID, JobStateInProgress); err != nil { + return err + } + + ctx, err = helper.InjectGitalyServers(ctx, job.SourceStorage, repository.Primary.Address, "") + if err != nil { + return err + } + + cc, err := r.coordinator.GetConnection(node.Storage) + if err != nil { + return err + } + + if err := r.replicator.Replicate(ctx, job.Source, job.SourceStorage, node.Storage, cc); err != nil { + r.log.WithField(logWithReplJobID, job.ID).WithError(err).Error("error when replicating") + return err + } + + if err := r.replJobsDS.UpdateReplJob(job.ID, JobStateComplete); err != nil { + return err + } } } } diff --git a/internal/praefect/replicator_test.go b/internal/praefect/replicator_test.go index 1294bc9890f..b3a4615390d 100644 --- a/internal/praefect/replicator_test.go +++ b/internal/praefect/replicator_test.go @@ -28,43 +28,33 @@ import ( // TestReplicatorProcessJobs verifies that a replicator will schedule jobs for // all whitelisted repos func TestReplicatorProcessJobsWhitelist(t *testing.T) { - var ( - cfg = config.Config{ - PrimaryServer: &models.GitalyServer{ - Name: "default", - ListenAddr: "tcp://gitaly-primary.example.com", - }, - SecondaryServers: []*models.GitalyServer{ - { - Name: "backup1", - ListenAddr: "tcp://gitaly-backup1.example.com", - }, - { - Name: "backup2", - ListenAddr: "tcp://gitaly-backup2.example.com", - }, - }, - Whitelist: []string{"abcd1234", "edfg5678"}, - } - datastore = NewMemoryDatastore(cfg) - coordinator = NewCoordinator(logrus.New(), datastore) - resultsCh = make(chan result) - replman = NewReplMgr( - cfg.SecondaryServers[1].Name, - logrus.New(), - datastore, - coordinator, - WithWhitelist(cfg.Whitelist), - 
WithReplicator(&mockReplicator{resultsCh}), - ) + datastore := NewMemoryDatastore(config.Config{ + StorageNodes: []*models.StorageNode{ + &models.StorageNode{ + ID: 1, + Address: "tcp://gitaly-primary.example.com", + Storage: "praefect-internal-1", + }, &models.StorageNode{ + ID: 2, + Address: "tcp://gitaly-backup1.example.com", + Storage: "praefect-internal-2", + }}, + Whitelist: []string{"abcd1234", "edfg5678"}, + }) + + coordinator := NewCoordinator(logrus.New(), datastore) + resultsCh := make(chan result) + replman := NewReplMgr( + "default", + logrus.New(), + datastore, + datastore, + coordinator, + WithReplicator(&mockReplicator{resultsCh}), ) - for _, node := range []*models.GitalyServer{ - cfg.PrimaryServer, - cfg.SecondaryServers[0], - cfg.SecondaryServers[1], - } { - err := coordinator.RegisterNode(node.Name, node.ListenAddr) + for _, node := range datastore.storageNodes.m { + err := coordinator.RegisterNode(node.Storage, node.Address) require.NoError(t, err) } @@ -78,14 +68,27 @@ func TestReplicatorProcessJobsWhitelist(t *testing.T) { success := make(chan struct{}) + var expectedResults []result + // we expect one job per whitelisted repo with each backend server + for _, shard := range datastore.repositories.m { + for _, secondary := range shard.Replicas { + cc, err := coordinator.GetConnection(secondary.Storage) + require.NoError(t, err) + expectedResults = append(expectedResults, + result{source: models.Repository{RelativePath: shard.RelativePath}, + targetStorage: secondary.Storage, + targetCC: cc, + }) + } + } + go func() { // we expect one job per whitelisted repo with each backend server - for i := 0; i < len(cfg.Whitelist); i++ { - result := <-resultsCh - - assert.Contains(t, cfg.Whitelist, result.source.RelativePath) - assert.Equal(t, cfg.SecondaryServers[1].Name, result.target.Storage) - assert.Equal(t, cfg.PrimaryServer.Name, result.source.Storage) + for _, shard := range datastore.repositories.m { + for range shard.Replicas { + result := 
<-resultsCh + assert.Contains(t, expectedResults, result) + } } cancel() @@ -106,18 +109,19 @@ func TestReplicatorProcessJobsWhitelist(t *testing.T) { } type result struct { - source models.Repository - target Node + source models.Repository + targetStorage string + targetCC *grpc.ClientConn } type mockReplicator struct { resultsCh chan<- result } -func (mr *mockReplicator) Replicate(ctx context.Context, source models.Repository, target Node) error { +func (mr *mockReplicator) Replicate(ctx context.Context, source models.Repository, sourceStorage, targetStorage string, target *grpc.ClientConn) error { select { - case mr.resultsCh <- result{source, target}: + case mr.resultsCh <- result{source, targetStorage, target}: return nil case <-ctx.Done(): @@ -178,11 +182,11 @@ func TestReplicate(t *testing.T) { var replicator defaultReplicator require.NoError(t, replicator.Replicate( ctx, - models.Repository{Storage: "default", RelativePath: testRepo.GetRelativePath()}, - Node{ - cc: conn, - Storage: backupStorageName, - })) + models.Repository{RelativePath: testRepo.GetRelativePath()}, + "default", + backupStorageName, + conn, + )) replicatedPath := filepath.Join(backupDir, filepath.Base(testRepoPath)) testhelper.MustRunCommand(t, nil, "git", "-C", replicatedPath, "cat-file", "-e", commitID) diff --git a/internal/praefect/server_test.go b/internal/praefect/server_test.go index 915d7281ad7..5952a66ce9d 100644 --- a/internal/praefect/server_test.go +++ b/internal/praefect/server_test.go @@ -1,4 +1,4 @@ -package praefect_test +package praefect import ( "context" @@ -7,14 +7,15 @@ import ( "testing" "time" + "github.com/golang/protobuf/proto" "github.com/sirupsen/logrus" "github.com/stretchr/testify/require" "gitlab.com/gitlab-org/gitaly/client" - "gitlab.com/gitlab-org/gitaly/internal/praefect" "gitlab.com/gitlab-org/gitaly/internal/praefect/config" "gitlab.com/gitlab-org/gitaly/internal/praefect/grpc-proxy/proxy" "gitlab.com/gitlab-org/gitaly/internal/praefect/mock" 
"gitlab.com/gitlab-org/gitaly/internal/praefect/models" + "gitlab.com/gitlab-org/gitaly/internal/praefect/protoregistry" "google.golang.org/grpc" ) @@ -44,26 +45,49 @@ func TestServerSimpleUnaryUnary(t *testing.T) { }, } + gz := proto.FileDescriptor("mock/mock.proto") + fd, err := protoregistry.ExtractFileDescriptor(gz) + if err != nil { + panic(err) + } + for _, tt := range testCases { t.Run(tt.name, func(t *testing.T) { const ( storagePrimary = "default" - storageBackup = "backup" ) - datastore := praefect.NewMemoryDatastore(config.Config{ - PrimaryServer: &models.GitalyServer{ - Name: "default", - }, + datastore := NewMemoryDatastore(config.Config{ + StorageNodes: []*models.StorageNode{ + &models.StorageNode{ + ID: 1, + Storage: "praefect-internal-1", + }, + &models.StorageNode{ + ID: 2, + Storage: "praefect-internal-2", + }}, }) - coordinator := praefect.NewCoordinator(logrus.New(), datastore) - replmgr := praefect.NewReplMgr( + + coordinator := NewCoordinator(logrus.New(), datastore, fd) + + for id, nodeStorage := range datastore.storageNodes.m { + backend, cleanup := newMockDownstream(t, tt.callback) + defer cleanup() // clean up mock downstream server resources + + coordinator.RegisterNode(nodeStorage.Storage, backend) + nodeStorage.Address = backend + datastore.storageNodes.m[id] = nodeStorage + } + + replmgr := NewReplMgr( storagePrimary, logrus.New(), datastore, + datastore, coordinator, ) - prf := praefect.NewServer( + prf := NewServer( coordinator, replmgr, nil, @@ -85,13 +109,6 @@ func TestServerSimpleUnaryUnary(t *testing.T) { defer cc.Close() cli := mock.NewSimpleServiceClient(cc) - for _, replica := range []string{storagePrimary, storageBackup} { - backend, cleanup := newMockDownstream(t, tt.callback) - defer cleanup() // clean up mock downstream server resources - - coordinator.RegisterNode(replica, backend) - } - ctx, cancel := context.WithTimeout(context.Background(), time.Second) defer cancel() -- GitLab From 
ed1e85f31caa3456458ad1cc2ca90e8de9f6f51d Mon Sep 17 00:00:00 2001 From: John Cai Date: Thu, 1 Aug 2019 18:57:38 -0700 Subject: [PATCH 2/3] Enable real time replication --- cmd/praefect/main.go | 2 +- internal/praefect/coordinator.go | 52 ++++++++++++------- internal/praefect/datastore.go | 2 + .../praefect/grpc-proxy/proxy/director.go | 2 +- .../grpc-proxy/proxy/examples_test.go | 11 ++-- internal/praefect/grpc-proxy/proxy/handler.go | 3 +- .../praefect/grpc-proxy/proxy/handler_test.go | 7 +-- .../praefect/grpc-proxy/proxy/peeker_test.go | 12 +++-- internal/praefect/replicator_test.go | 2 +- internal/praefect/server_test.go | 2 +- 10 files changed, 58 insertions(+), 37 deletions(-) diff --git a/cmd/praefect/main.go b/cmd/praefect/main.go index 8ce68530d37..06b3a1477a5 100644 --- a/cmd/praefect/main.go +++ b/cmd/praefect/main.go @@ -96,7 +96,7 @@ func run(listeners []net.Listener, conf config.Config) error { var ( // top level server dependencies datastore = praefect.NewMemoryDatastore(conf) - coordinator = praefect.NewCoordinator(logger, datastore, protoregistry.GitalyProtoFileDescriptors...) + coordinator = praefect.NewCoordinator(logger, datastore, datastore, protoregistry.GitalyProtoFileDescriptors...) 
repl = praefect.NewReplMgr("default", logger, datastore, datastore, coordinator, praefect.WithWhitelist(conf.Whitelist)) srv = praefect.NewServer(coordinator, repl, nil, logger) // signal related diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go index f00a469b68f..6a00ef425b5 100644 --- a/internal/praefect/coordinator.go +++ b/internal/praefect/coordinator.go @@ -4,12 +4,10 @@ import ( "context" "errors" "fmt" - "math/rand" "os" "os/signal" "sync" "syscall" - "time" gitalyauth "gitlab.com/gitlab-org/gitaly/auth" gitalyconfig "gitlab.com/gitlab-org/gitaly/internal/config" @@ -33,13 +31,14 @@ type Coordinator struct { connMutex sync.RWMutex datastore ReplicasDatastore + jobs ReplJobsDatastore nodes map[string]*grpc.ClientConn registry *protoregistry.Registry } // NewCoordinator returns a new Coordinator that utilizes the provided logger -func NewCoordinator(l *logrus.Logger, datastore ReplicasDatastore, fileDescriptors ...*descriptor.FileDescriptorProto) *Coordinator { +func NewCoordinator(l *logrus.Logger, datastore ReplicasDatastore, jobsDatastore ReplJobsDatastore, fileDescriptors ...*descriptor.FileDescriptorProto) *Coordinator { registry := protoregistry.New() registry.RegisterFiles(fileDescriptors...) 
@@ -48,6 +47,7 @@ func NewCoordinator(l *logrus.Logger, datastore ReplicasDatastore, fileDescripto datastore: datastore, nodes: make(map[string]*grpc.ClientConn), registry: registry, + jobs: jobsDatastore, } } @@ -57,7 +57,7 @@ func (c *Coordinator) RegisterProtos(protos ...*descriptor.FileDescriptorProto) } // streamDirector determines which downstream servers receive requests -func (c *Coordinator) streamDirector(ctx context.Context, fullMethodName string, peeker proxy.StreamModifier) (context.Context, *grpc.ClientConn, error) { +func (c *Coordinator) streamDirector(ctx context.Context, fullMethodName string, peeker proxy.StreamModifier) (context.Context, *grpc.ClientConn, func(), error) { // For phase 1, we need to route messages based on the storage location // to the appropriate Gitaly node. c.log.Debugf("Stream director received method %s", fullMethodName) @@ -67,12 +67,14 @@ func (c *Coordinator) streamDirector(ctx context.Context, fullMethodName string, frame, err := peeker.Peek() if err != nil { - return nil, nil, err + return nil, nil, nil, err } + jobUpdateFunc := func() {} + mi, err := c.registry.LookupMethod(fullMethodName) if err != nil { - return nil, nil, err + return nil, nil, nil, err } var primary *models.StorageNode @@ -80,36 +82,35 @@ func (c *Coordinator) streamDirector(ctx context.Context, fullMethodName string, if mi.Scope == protoregistry.ScopeRepository { m, err := mi.UnmarshalRequestProto(frame) if err != nil { - return nil, nil, err + return nil, nil, nil, err } targetRepo, err := mi.TargetRepo(m) if err != nil { - return nil, nil, err + return nil, nil, nil, err } primary, err = c.datastore.GetPrimary(targetRepo.GetRelativePath()) if err != nil { if err != ErrPrimaryNotSet { - return nil, nil, err + return nil, nil, nil, err } // if there are no primaries for this repository, pick one nodes, err := c.datastore.GetStorageNodes() if err != nil { - return nil, nil, err + return nil, nil, nil, err } if len(nodes) == 0 { - return nil, nil, 
fmt.Errorf("no nodes serve storage %s", targetRepo.GetStorageName()) + return nil, nil, nil, fmt.Errorf("no nodes serve storage %s", targetRepo.GetStorageName()) } - newPrimary := nodes[rand.New(rand.NewSource(time.Now().Unix())).Intn(len(nodes))] - //newPrimary := nodes[0] + newPrimary := nodes[0] // set the primary if err = c.datastore.SetPrimary(targetRepo.GetRelativePath(), newPrimary.ID); err != nil { - return nil, nil, err + return nil, nil, nil, err } primary = &newPrimary @@ -119,21 +120,32 @@ func (c *Coordinator) streamDirector(ctx context.Context, fullMethodName string, b, err := proxy.Codec().Marshal(m) if err != nil { - return nil, nil, err + return nil, nil, nil, err } if err = peeker.Modify(b); err != nil { - return nil, nil, err + return nil, nil, nil, err } + if mi.Operation == protoregistry.OpMutator { + jobIDs, err := c.jobs.CreateReplicaReplJobs(targetRepo.RelativePath) + if err != nil { + return nil, nil, nil, err + } + jobUpdateFunc = func() { + for _, jobID := range jobIDs { + c.jobs.UpdateReplJob(jobID, JobStateReady) + } + } + } } else { //TODO: For now we just pick a random storage node for a non repository scoped RPC, but we will need to figure out exactly how to // proxy requests that are not repository scoped node, err := c.datastore.GetStorageNodes() if err != nil { - return nil, nil, err + return nil, nil, nil, err } if len(node) == 0 { - return nil, nil, errors.New("no node storages found") + return nil, nil, nil, errors.New("no node storages found") } primary = &node[0] } @@ -142,10 +154,10 @@ func (c *Coordinator) streamDirector(ctx context.Context, fullMethodName string, // location per praefect at this time cc, err := c.GetConnection(primary.Storage) if err != nil { - return nil, nil, fmt.Errorf("unable to find existing client connection for %s", primary.Storage) + return nil, nil, nil, fmt.Errorf("unable to find existing client connection for %s", primary.Storage) } - return helper.IncomingToOutgoing(ctx), cc, nil + return 
helper.IncomingToOutgoing(ctx), cc, jobUpdateFunc, nil } // RegisterNode will direct traffic to the supplied downstream connection when the storage location diff --git a/internal/praefect/datastore.go b/internal/praefect/datastore.go index 85fa22a2325..69f077b3539 100644 --- a/internal/praefect/datastore.go +++ b/internal/praefect/datastore.go @@ -229,6 +229,8 @@ func (md *MemoryDatastore) GetStorageNodes() ([]models.StorageNode, error) { storageNodes = append(storageNodes, storageNode) } + sort.Slice(storageNodes, func(i, j int) bool { return storageNodes[i].ID < storageNodes[j].ID }) + return storageNodes, nil } diff --git a/internal/praefect/grpc-proxy/proxy/director.go b/internal/praefect/grpc-proxy/proxy/director.go index 10a63b22890..82712765f65 100644 --- a/internal/praefect/grpc-proxy/proxy/director.go +++ b/internal/praefect/grpc-proxy/proxy/director.go @@ -21,4 +21,4 @@ import ( // are invoked. So decisions around authorization, monitoring etc. are better to be handled there. // // See the rather rich example. -type StreamDirector func(ctx context.Context, fullMethodName string, peeker StreamModifier) (context.Context, *grpc.ClientConn, error) +type StreamDirector func(ctx context.Context, fullMethodName string, peeker StreamModifier) (context.Context, *grpc.ClientConn, func(), error) diff --git a/internal/praefect/grpc-proxy/proxy/examples_test.go b/internal/praefect/grpc-proxy/proxy/examples_test.go index 2c20903636a..1eca8cf39a1 100644 --- a/internal/praefect/grpc-proxy/proxy/examples_test.go +++ b/internal/praefect/grpc-proxy/proxy/examples_test.go @@ -39,10 +39,11 @@ func ExampleTransparentHandler() { // Provide sa simple example of a director that shields internal services and dials a staging or production backend. // This is a *very naive* implementation that creates a new connection on every request. Consider using pooling. 
func ExampleStreamDirector() { - director = func(ctx context.Context, fullMethodName string, _ proxy.StreamModifier) (context.Context, *grpc.ClientConn, error) { + director = func(ctx context.Context, fullMethodName string, _ proxy.StreamModifier) (context.Context, *grpc.ClientConn, func(), error) { + f := func() {} // Make sure we never forward internal services. if strings.HasPrefix(fullMethodName, "/com.example.internal.") { - return nil, nil, grpc.Errorf(codes.Unimplemented, "Unknown method") + return nil, nil, f, grpc.Errorf(codes.Unimplemented, "Unknown method") } md, ok := metadata.FromIncomingContext(ctx) // Copy the inbound metadata explicitly. @@ -53,12 +54,12 @@ func ExampleStreamDirector() { if val, exists := md[":authority"]; exists && val[0] == "staging.api.example.com" { // Make sure we use DialContext so the dialing can be cancelled/time out together with the context. conn, err := grpc.DialContext(ctx, "api-service.staging.svc.local", grpc.WithCodec(proxy.Codec())) - return outCtx, conn, err + return outCtx, conn, f, err } else if val, exists := md[":authority"]; exists && val[0] == "api.example.com" { conn, err := grpc.DialContext(ctx, "api-service.prod.svc.local", grpc.WithCodec(proxy.Codec())) - return outCtx, conn, err + return outCtx, conn, f, err } } - return nil, nil, grpc.Errorf(codes.Unimplemented, "Unknown method") + return nil, nil, f, grpc.Errorf(codes.Unimplemented, "Unknown method") } } diff --git a/internal/praefect/grpc-proxy/proxy/handler.go b/internal/praefect/grpc-proxy/proxy/handler.go index daf12d4b183..e527a4a5c97 100644 --- a/internal/praefect/grpc-proxy/proxy/handler.go +++ b/internal/praefect/grpc-proxy/proxy/handler.go @@ -71,7 +71,7 @@ func (s *handler) handler(srv interface{}, serverStream grpc.ServerStream) error peeker := newPeeker(serverStream) // We require that the director's returned context inherits from the serverStream.Context(). 
- outgoingCtx, backendConn, err := s.director(serverStream.Context(), fullMethodName, peeker) + outgoingCtx, backendConn, updateJobFunc, err := s.director(serverStream.Context(), fullMethodName, peeker) if err != nil { return err } @@ -111,6 +111,7 @@ func (s *handler) handler(srv interface{}, serverStream grpc.ServerStream) error if c2sErr != io.EOF { return c2sErr } + updateJobFunc() return nil } } diff --git a/internal/praefect/grpc-proxy/proxy/handler_test.go b/internal/praefect/grpc-proxy/proxy/handler_test.go index 0fff36ed4e7..345ba917746 100644 --- a/internal/praefect/grpc-proxy/proxy/handler_test.go +++ b/internal/praefect/grpc-proxy/proxy/handler_test.go @@ -207,17 +207,18 @@ func (s *ProxyHappySuite) SetupSuite() { // Setup of the proxy's Director. s.serverClientConn, err = grpc.Dial(s.serverListener.Addr().String(), grpc.WithInsecure(), grpc.WithCodec(proxy.Codec())) require.NoError(s.T(), err, "must not error on deferred client Dial") - director := func(ctx context.Context, fullName string, _ proxy.StreamModifier) (context.Context, *grpc.ClientConn, error) { + director := func(ctx context.Context, fullName string, _ proxy.StreamModifier) (context.Context, *grpc.ClientConn, func(), error) { + f := func() {} md, ok := metadata.FromIncomingContext(ctx) if ok { if _, exists := md[rejectingMdKey]; exists { - return ctx, nil, grpc.Errorf(codes.PermissionDenied, "testing rejection") + return ctx, nil, f, grpc.Errorf(codes.PermissionDenied, "testing rejection") } } // Explicitly copy the metadata, otherwise the tests will fail. 
outCtx, _ := context.WithCancel(ctx) outCtx = metadata.NewOutgoingContext(outCtx, md.Copy()) - return outCtx, s.serverClientConn, nil + return outCtx, s.serverClientConn, f, nil } s.proxy = grpc.NewServer( grpc.CustomCodec(proxy.Codec()), diff --git a/internal/praefect/grpc-proxy/proxy/peeker_test.go b/internal/praefect/grpc-proxy/proxy/peeker_test.go index e274f31e209..86463892c2b 100644 --- a/internal/praefect/grpc-proxy/proxy/peeker_test.go +++ b/internal/praefect/grpc-proxy/proxy/peeker_test.go @@ -28,9 +28,11 @@ func TestStreamPeeking(t *testing.T) { pingReqSent := &testservice.PingRequest{Value: "hi"} // director will peek into stream before routing traffic - director := func(ctx context.Context, fullMethodName string, peeker proxy.StreamModifier) (context.Context, *grpc.ClientConn, error) { + director := func(ctx context.Context, fullMethodName string, peeker proxy.StreamModifier) (context.Context, *grpc.ClientConn, func(), error) { t.Logf("director routing method %s to backend", fullMethodName) + f := func() {} + peekedMsg, err := peeker.Peek() require.NoError(t, err) @@ -39,7 +41,7 @@ func TestStreamPeeking(t *testing.T) { require.NoError(t, err) require.Equal(t, pingReqSent, peekedRequest) - return ctx, backendCC, nil + return ctx, backendCC, f, nil } pingResp := &testservice.PingResponse{ @@ -87,9 +89,11 @@ func TestStreamInjecting(t *testing.T) { newValue := "bye" // director will peek into stream and change some frames - director := func(ctx context.Context, fullMethodName string, peeker proxy.StreamModifier) (context.Context, *grpc.ClientConn, error) { + director := func(ctx context.Context, fullMethodName string, peeker proxy.StreamModifier) (context.Context, *grpc.ClientConn, func(), error) { t.Logf("modifying request for method %s", fullMethodName) + f := func() {} + peekedMsg, err := peeker.Peek() require.NoError(t, err) @@ -104,7 +108,7 @@ func TestStreamInjecting(t *testing.T) { require.NoError(t, peeker.Modify(newPayload)) - return ctx, 
backendCC, nil + return ctx, backendCC, f, nil } pingResp := &testservice.PingResponse{ diff --git a/internal/praefect/replicator_test.go b/internal/praefect/replicator_test.go index b3a4615390d..70154329c44 100644 --- a/internal/praefect/replicator_test.go +++ b/internal/praefect/replicator_test.go @@ -42,7 +42,7 @@ func TestReplicatorProcessJobsWhitelist(t *testing.T) { Whitelist: []string{"abcd1234", "edfg5678"}, }) - coordinator := NewCoordinator(logrus.New(), datastore) + coordinator := NewCoordinator(logrus.New(), datastore, datastore) resultsCh := make(chan result) replman := NewReplMgr( "default", diff --git a/internal/praefect/server_test.go b/internal/praefect/server_test.go index 5952a66ce9d..07d090f1b4e 100644 --- a/internal/praefect/server_test.go +++ b/internal/praefect/server_test.go @@ -69,7 +69,7 @@ func TestServerSimpleUnaryUnary(t *testing.T) { }}, }) - coordinator := NewCoordinator(logrus.New(), datastore, fd) + coordinator := NewCoordinator(logrus.New(), datastore, datastore, fd) for id, nodeStorage := range datastore.storageNodes.m { backend, cleanup := newMockDownstream(t, tt.callback) -- GitLab From 38142319c6eeac0e60f5cfe016cefd6ab9adb595 Mon Sep 17 00:00:00 2001 From: John Cai Date: Fri, 2 Aug 2019 08:58:13 -0700 Subject: [PATCH 3/3] Remove praefect docker --- .gitignore | 1 - _support/praefect-cluster/.gitignore | 3 -- _support/praefect-cluster/README.md | 8 --- _support/praefect-cluster/config.gitaly.toml | 49 ----------------- .../praefect-cluster/config.praefect.toml | 45 ---------------- _support/praefect-cluster/docker-compose.yml | 52 ------------------- 6 files changed, 158 deletions(-) delete mode 100644 _support/praefect-cluster/.gitignore delete mode 100644 _support/praefect-cluster/README.md delete mode 100644 _support/praefect-cluster/config.gitaly.toml delete mode 100644 _support/praefect-cluster/config.praefect.toml delete mode 100644 _support/praefect-cluster/docker-compose.yml diff --git a/.gitignore b/.gitignore index 
9fb2c7e44a5..b7fbb0b4cb7 100644 --- a/.gitignore +++ b/.gitignore @@ -21,7 +21,6 @@ cmd/gitaly-remote/gitaly-remote git-env /gitaly-debug /praefect -/praefect-migrate gitaly.pid /vendor/github.com/libgit2/git2go/vendor /vendor diff --git a/_support/praefect-cluster/.gitignore b/_support/praefect-cluster/.gitignore deleted file mode 100644 index bd035c2b756..00000000000 --- a/_support/praefect-cluster/.gitignore +++ /dev/null @@ -1,3 +0,0 @@ -/gitaly-1 -/gitaly-2 -/gitaly-3 diff --git a/_support/praefect-cluster/README.md b/_support/praefect-cluster/README.md deleted file mode 100644 index cab4614b61d..00000000000 --- a/_support/praefect-cluster/README.md +++ /dev/null @@ -1,8 +0,0 @@ -# Test cluster with Praefect and multiple Gitaly servers - -This directory contains a -[docker-compose.yml](https://docs.docker.com/compose/) that has Praefect and 3 Gitaly servers -behind it. This setup is meant for testing purposes only and SHOULD NOT be used -in production environments. - -Boot the cluster with `docker-compose up`. After some time you can connect to praefect on port 2305 diff --git a/_support/praefect-cluster/config.gitaly.toml b/_support/praefect-cluster/config.gitaly.toml deleted file mode 100644 index 2379b6951b2..00000000000 --- a/_support/praefect-cluster/config.gitaly.toml +++ /dev/null @@ -1,49 +0,0 @@ -# Example Gitaly configuration file - -# The directory where Gitaly's executables are stored -bin_dir = "/usr/local/bin" - -# listen on a TCP socket. 
This is insecure (no authentication) -listen_addr = "0.0.0.0:9999" - -# # Optional: export metrics via Prometheus -# prometheus_listen_addr = "localhost:9236" -# - -# # Git executable settings -# [git] -# bin_path = "/usr/bin/git" - -[[storage]] -name = "default" -path = "/home/git/repositories" - -# # You can optionally configure more storages for this Gitaly instance to serve up -# -# [[storage]] -# name = "other_storage" -# path = "/mnt/other_storage/repositories" -# - -# # You can optionally configure Gitaly to output JSON-formatted log messages to stdout -# [logging] -# format = "json" -# # Additionally exceptions can be reported to Sentry -# sentry_dsn = "https://:@sentry.io/" - -# # You can optionally configure Gitaly to record histogram latencies on GRPC method calls -# [prometheus] -# grpc_latency_buckets = [0.001, 0.005, 0.025, 0.1, 0.5, 1.0, 10.0, 30.0, 60.0, 300.0, 1500.0] - -[gitaly-ruby] -# The directory where gitaly-ruby is installed -dir = "/srv/gitaly-ruby" - -[gitlab-shell] -# The directory where gitlab-shell is installed -dir = "/srv/gitlab-shell" - -# # You can adjust the concurrency of each RPC endpoint -# [[concurrency]] -# rpc = "/gitaly.RepositoryService/GarbageCollect" -# max_per_repo = 1 diff --git a/_support/praefect-cluster/config.praefect.toml b/_support/praefect-cluster/config.praefect.toml deleted file mode 100644 index 2a4297248b1..00000000000 --- a/_support/praefect-cluster/config.praefect.toml +++ /dev/null @@ -1,45 +0,0 @@ -# Example Praefect configuration file - -# # TCP address to listen on -listen_addr = ":2305" - -# # Praefect can listen on a socket when placed on the same machine as all clients - #socket_path = "/etc/gitaly/praefect/socket" -# # Praefect will only replicate whitelisted repositories -whitelist = ["@hashed/2c/62/2c624232cdd221771294dfbb310aca000a0df6ac8b66b696d90ef06fdefb64a3.git"] -# # Optional: export metrics via Prometheus -# prometheus_listen_addr = "127.0.01:10101" - -# # You can optionally configure 
Praefect to output JSON-formatted log messages to stdout -[logging] -# format = "json" - level = "info" -# # Optional: Set log level to only log entries with that severity or above -# # One of, in order: debug, info, warn, errror, fatal, panic -# # Defaults to "info" -# level = "warn" - -# # One or more Gitaly servers need to be configured to be managed. The names -# of each server are used to link multiple nodes, or `gitaly_server`s together -# as shard. listen_addr should be unique for all nodes. -# Requires the protocol to be defined, e.g. tcp://host.tld:1234 - -[[storage_node]] -# listen_addr = "tcp://gitaly-primary:9999" - storage = "praefect-internal-1" - address = "tcp://127.0.0.1:9999" - -[[storage_node]] -# listen_addr = "tcp://gitaly-backup-1:9999" - storage = "praefect-internal-2" - address = "tcp://127.0.0.1:9998" - -[[storage_node]] -# listen_addr = "tcp://gitaly-backup-2:9999" - storage = "praefect-internal-3" - address = "tcp://127.0.0.1:9997" - -[postgres] - user = "johncai" - address = "127.0.0.1:5432" - database = "praefect_test" \ No newline at end of file diff --git a/_support/praefect-cluster/docker-compose.yml b/_support/praefect-cluster/docker-compose.yml deleted file mode 100644 index 09745ea410f..00000000000 --- a/_support/praefect-cluster/docker-compose.yml +++ /dev/null @@ -1,52 +0,0 @@ -version: "3.5" -services: -# praefect: -# build: -# context: ../../ -# dockerfile: Dockerfile.praefect -# image: praefect:latest -# depends_on: -# - gitaly-1 -# - gitaly-2 -# - gitaly-3 -# command: ["/etc/gitaly/praefect", "-config", "/etc/gitaly/config.praefect.toml"] -# ports: -# - "2305:2305" -# volumes: -# - ./config.praefect.toml:/etc/gitaly/config.praefect.toml - gitaly-1: - image: registry.gitlab.com/gitlab-org/build/cng/gitaly:latest - environment: - - GITALY_TESTING_NO_GIT_HOOKS=1 - expose: - - "9999" - ports: - - "9999:9999" - command: ["/usr/local/bin/gitaly", "/etc/config/config.toml"] - volumes: - - ./gitaly-1/data:/home/git/repositories - - 
./config.gitaly.toml:/etc/config/config.toml - gitaly-2: - image: registry.gitlab.com/gitlab-org/build/cng/gitaly:latest - environment: - - GITALY_TESTING_NO_GIT_HOOKS=1 - expose: - - "9999" - ports: - - "9998:9999" - command: ["/usr/local/bin/gitaly", "/etc/config/config.toml"] - volumes: - - ./gitaly-2/data:/home/git/repositories - - ./config.gitaly.toml:/etc/config/config.toml - gitaly-3: - image: registry.gitlab.com/gitlab-org/build/cng/gitaly:latest - environment: - - GITALY_TESTING_NO_GIT_HOOKS=1 - expose: - - "9999" - ports: - - "9997:9999" - command: ["/usr/local/bin/gitaly", "/etc/config/config.toml"] - volumes: - - ./gitaly-3/data:/home/git/repositories - - ./config.gitaly.toml:/etc/config/config.toml \ No newline at end of file -- GitLab