diff --git a/.gitignore b/.gitignore index b7fbb0b4cb7ff119381c6d969ed2919488ee3867..9fb2c7e44a533f93968beb481a7976d0cbc551c6 100644 --- a/.gitignore +++ b/.gitignore @@ -21,6 +21,7 @@ cmd/gitaly-remote/gitaly-remote git-env /gitaly-debug /praefect +/praefect-migrate gitaly.pid /vendor/github.com/libgit2/git2go/vendor /vendor diff --git a/NOTICE b/NOTICE index 5a34cfece57d3a9e34a3050b8084b3df9337184a..9c6f4d5406d07e6ae133859651e9d501fd417576 100644 --- a/NOTICE +++ b/NOTICE @@ -904,6 +904,16 @@ The above copyright notice and this permission notice shall be included in all c THE SOFTWARE IS PROVIDED 'AS IS', WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +LICENSE.md - gitlab.com/gitlab-org/gitaly/vendor/github.com/lib/pq +Copyright (c) 2011-2013, 'pq' Contributors +Portions Copyright (C) 2011 Blake Mizerany + +Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ LICENSE - gitlab.com/gitlab-org/gitaly/vendor/github.com/libgit2/git2go The MIT License diff --git a/_support/praefect-cluster/.gitignore b/_support/praefect-cluster/.gitignore index 06b873206ee11141ad5ed102b9a4e5e1cd58587f..bd035c2b756830dd9ba935df982fbe2c331ef992 100644 --- a/_support/praefect-cluster/.gitignore +++ b/_support/praefect-cluster/.gitignore @@ -1,3 +1,3 @@ -/gitaly-backup-1 -/gitaly-backup-2 -/gitaly-primary +/gitaly-1 +/gitaly-2 +/gitaly-3 diff --git a/_support/praefect-cluster/gitaly-primary.toml b/_support/praefect-cluster/config.gitaly.toml similarity index 100% rename from _support/praefect-cluster/gitaly-primary.toml rename to _support/praefect-cluster/config.gitaly.toml diff --git a/_support/praefect-cluster/config.praefect.toml b/_support/praefect-cluster/config.praefect.toml index e0f163178b39cb14a436c7ccb63be2ff6c424b1e..2a4297248b1ce6a43e23bd93823f9d90dfa0eac0 100644 --- a/_support/praefect-cluster/config.praefect.toml +++ b/_support/praefect-cluster/config.praefect.toml @@ -24,17 +24,22 @@ whitelist = ["@hashed/2c/62/2c624232cdd221771294dfbb310aca000a0df6ac8b66b696d90e # as shard. 
listen_addr should be unique for all nodes. # Requires the protocol to be defined, e.g. tcp://host.tld:1234 -[primary_server] - name = "default" +[[storage_node]] # listen_addr = "tcp://gitaly-primary:9999" - listen_addr = "tcp://127.0.0.1:9999" + storage = "praefect-internal-1" + address = "tcp://127.0.0.1:9999" -[[secondary_server]] - name = "backup1" +[[storage_node]] # listen_addr = "tcp://gitaly-backup-1:9999" - listen_addr = "tcp://127.0.0.1:9998" + storage = "praefect-internal-2" + address = "tcp://127.0.0.1:9998" -[[secondary_server]] - name = "backup2" +[[storage_node]] # listen_addr = "tcp://gitaly-backup-2:9999" - listen_addr = "tcp://127.0.0.1:9997" \ No newline at end of file + storage = "praefect-internal-3" + address = "tcp://127.0.0.1:9997" + +[postgres] + user = "johncai" + address = "127.0.0.1:5432" + database = "praefect_test" \ No newline at end of file diff --git a/_support/praefect-cluster/docker-compose.yml b/_support/praefect-cluster/docker-compose.yml index 6eb81be47e33aeb4755e651e27d34089b0abd0ae..09745ea410fa7bf7717ce967be87b57b5f1ebe9e 100644 --- a/_support/praefect-cluster/docker-compose.yml +++ b/_support/praefect-cluster/docker-compose.yml @@ -6,15 +6,15 @@ services: # dockerfile: Dockerfile.praefect # image: praefect:latest # depends_on: -# - gitaly-primary -# - gitaly-backup-1 -# - gitaly-backup-2 +# - gitaly-1 +# - gitaly-2 +# - gitaly-3 # command: ["/etc/gitaly/praefect", "-config", "/etc/gitaly/config.praefect.toml"] # ports: # - "2305:2305" # volumes: # - ./config.praefect.toml:/etc/gitaly/config.praefect.toml - gitaly-primary: + gitaly-1: image: registry.gitlab.com/gitlab-org/build/cng/gitaly:latest environment: - GITALY_TESTING_NO_GIT_HOOKS=1 @@ -24,9 +24,9 @@ services: - "9999:9999" command: ["/usr/local/bin/gitaly", "/etc/config/config.toml"] volumes: - - ./gitaly-primary/data:/home/git/repositories - - ./gitaly-primary.toml:/etc/config/config.toml - gitaly-backup-1: + - ./gitaly-1/data:/home/git/repositories + - ./config.gitaly.toml:/etc/config/config.toml + gitaly-2: image: registry.gitlab.com/gitlab-org/build/cng/gitaly:latest environment: - GITALY_TESTING_NO_GIT_HOOKS=1 @@ -36,9 +36,9 @@ services: - "9998:9999" command: ["/usr/local/bin/gitaly", "/etc/config/config.toml"] volumes: - - ./gitaly-backup-1/data:/home/git/repositories - - ./gitaly-backup-1.toml:/etc/config/config.toml - gitaly-backup-2: + - ./gitaly-2/data:/home/git/repositories + - ./config.gitaly.toml:/etc/config/config.toml + gitaly-3: image: registry.gitlab.com/gitlab-org/build/cng/gitaly:latest environment: - GITALY_TESTING_NO_GIT_HOOKS=1 @@ -48,5 +48,5 @@ services: - "9997:9999" command: ["/usr/local/bin/gitaly", "/etc/config/config.toml"] volumes: - - ./gitaly-backup-2/data:/home/git/repositories - - ./gitaly-backup-2.toml:/etc/config/config.toml \ No newline at end of file + - ./gitaly-3/data:/home/git/repositories + - ./config.gitaly.toml:/etc/config/config.toml \ No newline at end of file diff --git a/_support/praefect-cluster/gitaly-backup-1.toml b/_support/praefect-cluster/gitaly-backup-1.toml deleted file mode 100644 index 89d1884e362a659acb8e2b7667fd3326b4f4644a..0000000000000000000000000000000000000000 --- a/_support/praefect-cluster/gitaly-backup-1.toml +++ /dev/null @@ -1,49 +0,0 @@ -# Example Gitaly configuration file - -# The directory where Gitaly's executables are stored -bin_dir = "/usr/local/bin" - -# listen on a TCP socket. 
This is insecure (no authentication) -listen_addr = "0.0.0.0:9999" - -# # Optional: export metrics via Prometheus -# prometheus_listen_addr = "localhost:9236" -# - -# # Git executable settings -# [git] -# bin_path = "/usr/bin/git" - -[[storage]] -name = "backup1" -path = "/home/git/repositories" - -# # You can optionally configure more storages for this Gitaly instance to serve up -# -# [[storage]] -# name = "other_storage" -# path = "/mnt/other_storage/repositories" -# - -# # You can optionally configure Gitaly to output JSON-formatted log messages to stdout -# [logging] -# format = "json" -# # Additionally exceptions can be reported to Sentry -# sentry_dsn = "https://:@sentry.io/" - -# # You can optionally configure Gitaly to record histogram latencies on GRPC method calls -# [prometheus] -# grpc_latency_buckets = [0.001, 0.005, 0.025, 0.1, 0.5, 1.0, 10.0, 30.0, 60.0, 300.0, 1500.0] - -[gitaly-ruby] -# The directory where gitaly-ruby is installed -dir = "/srv/gitaly-ruby" - -[gitlab-shell] -# The directory where gitlab-shell is installed -dir = "/srv/gitlab-shell" - -# # You can adjust the concurrency of each RPC endpoint -# [[concurrency]] -# rpc = "/gitaly.RepositoryService/GarbageCollect" -# max_per_repo = 1 diff --git a/_support/praefect-cluster/gitaly-backup-2.toml b/_support/praefect-cluster/gitaly-backup-2.toml deleted file mode 100644 index 1b5ce8d209da656366a57e97d7fe33183aeb9ed1..0000000000000000000000000000000000000000 --- a/_support/praefect-cluster/gitaly-backup-2.toml +++ /dev/null @@ -1,49 +0,0 @@ -# Example Gitaly configuration file - -# The directory where Gitaly's executables are stored -bin_dir = "/usr/local/bin" - -# listen on a TCP socket. This is insecure (no authentication) -listen_addr = "0.0.0.0:9999" - -# # Optional: export metrics via Prometheus -# prometheus_listen_addr = "localhost:9236" -# - -# # Git executable settings -# [git] -# bin_path = "/usr/bin/git" - -[[storage]] -name = "backup2" -path = "/home/git/repositories" - -# # You can optionally configure more storages for this Gitaly instance to serve up -# -# [[storage]] -# name = "other_storage" -# path = "/mnt/other_storage/repositories" -# - -# # You can optionally configure Gitaly to output JSON-formatted log messages to stdout -# [logging] -# format = "json" -# # Additionally exceptions can be reported to Sentry -# sentry_dsn = "https://:@sentry.io/" - -# # You can optionally configure Gitaly to record histogram latencies on GRPC method calls -# [prometheus] -# grpc_latency_buckets = [0.001, 0.005, 0.025, 0.1, 0.5, 1.0, 10.0, 30.0, 60.0, 300.0, 1500.0] - -[gitaly-ruby] -# The directory where gitaly-ruby is installed -dir = "/srv/gitaly-ruby" - -[gitlab-shell] -# The directory where gitlab-shell is installed -dir = "/srv/gitlab-shell" - -# # You can adjust the concurrency of each RPC endpoint -# [[concurrency]] -# rpc = "/gitaly.RepositoryService/GarbageCollect" -# max_per_repo = 1 diff --git a/changelogs/unreleased/jc-sql-data-store.yml b/changelogs/unreleased/jc-sql-data-store.yml new file mode 100644 index 0000000000000000000000000000000000000000..e9ccb210defdd1afc915f82814e97f3207e23fc1 --- /dev/null +++ b/changelogs/unreleased/jc-sql-data-store.yml @@ -0,0 +1,5 @@ +--- +title: SQL datastore for praefect +merge_request: 1370 +author: +type: added diff --git a/cmd/praefect/main.go b/cmd/praefect/main.go index acf53d2fad2e1e62f44557550be1b558cacebfad..d4ac47f18126c6a85545c35b1b322100a4a637f3 100644 --- a/cmd/praefect/main.go +++ b/cmd/praefect/main.go @@ -19,6 +19,8 @@ import ( 
"gitlab.com/gitlab-org/gitaly/internal/praefect" "gitlab.com/gitlab-org/gitaly/internal/praefect/config" + "gitlab.com/gitlab-org/gitaly/internal/praefect/database" + "gitlab.com/gitlab-org/gitaly/internal/praefect/protoregistry" "gitlab.com/gitlab-org/gitaly/internal/version" "gitlab.com/gitlab-org/labkit/tracing" ) @@ -92,19 +94,33 @@ func configure() (config.Config, error) { func run(listeners []net.Listener, conf config.Config) error { + sqlDatastore, err := database.NewSQLDatastore( + conf.Postgres.User, + conf.Postgres.Password, + conf.Postgres.Address, + conf.Postgres.Database, + ) + + if err != nil { + return fmt.Errorf("failed to create sql datastore: %v", err) + } + var ( // top level server dependencies - datastore = praefect.NewMemoryDatastore(conf) - coordinator = praefect.NewCoordinator(logger, datastore) - repl = praefect.NewReplMgr("default", logger, datastore, coordinator, praefect.WithWhitelist(conf.Whitelist)) + datastore = praefect.NewMemoryDatastore() + coordinator = praefect.NewCoordinator(logger, sqlDatastore, protoregistry.GitalyProtoFileDescriptors...) + repl = praefect.NewReplMgr("default", logger, sqlDatastore, datastore, coordinator, praefect.WithWhitelist(conf.Whitelist)) srv = praefect.NewServer(coordinator, repl, nil, logger) - // signal related signals = []os.Signal{syscall.SIGTERM, syscall.SIGINT} termCh = make(chan os.Signal, len(signals)) serverErrors = make(chan error, 1) ) + if err := sqlDatastore.LoadFromConfig(conf); err != nil { + return fmt.Errorf("loading config for database: %v", err) + } + signal.Notify(termCh, signals...) for _, l := range listeners { @@ -114,14 +130,12 @@ func run(listeners []net.Listener, conf config.Config) error { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - allBackendServers := append(conf.SecondaryServers, conf.PrimaryServer) - - for _, gitaly := range allBackendServers { - if err := coordinator.RegisterNode(gitaly.Name, gitaly.ListenAddr); err != nil { - return fmt.Errorf("failed to register %s: %s", gitaly.Name, err) + for _, node := range conf.StorageNodes { + if err := coordinator.RegisterNode(node.Storage, node.Address); err != nil { + return fmt.Errorf("failed to register %s: %s", node.Address, err) } - logger.WithField("node_name", gitaly.Name).WithField("gitaly listen addr", gitaly.ListenAddr).Info("registered gitaly node") + logger.WithField("node_address", node.Address).Info("registered gitaly node") } go func() { serverErrors <- repl.ProcessBacklog(ctx) }() diff --git a/go.mod b/go.mod index a8b29a4a1a94a21845854751826293c70e10a22f..de8b013fdeef50f3da15a3a9736b4784e15fc70b 100644 --- a/go.mod +++ b/go.mod @@ -10,6 +10,7 @@ require ( github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 github.com/kelseyhightower/envconfig v1.3.0 github.com/kr/pretty v0.1.0 // indirect + github.com/lib/pq v1.2.0 github.com/libgit2/git2go v0.0.0-20190104134018-ecaeb7a21d47 github.com/prometheus/client_golang v1.0.0 github.com/sirupsen/logrus v1.2.0 diff --git a/go.sum b/go.sum index d97d828f7af8b7b3c388109ab6a182d6c0b1a6c2..91aab83da8d4a549c4680dd0957709675b16eacf 100644 --- a/go.sum +++ b/go.sum @@ -56,6 +56,8 @@ github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORN github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/lib/pq v1.2.0 h1:LXpIM/LZ5xGFhOpXAQUIMM1HdyqzVYM13zNdjCEEcA0= 
+github.com/lib/pq v1.2.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= github.com/libgit2/git2go v0.0.0-20190104134018-ecaeb7a21d47 h1:HDt7WT3kpXSHq4mlOuLzgXH9LeOK1qlhyFdKIAzxxeM= github.com/libgit2/git2go v0.0.0-20190104134018-ecaeb7a21d47/go.mod h1:4bKN42efkbNYMZlvDfxGDxzl066GhpvIircZDsm8Y+Y= github.com/lightstep/lightstep-tracer-go v0.15.6 h1:D0GGa7afJ7GcQvu5as6ssLEEKYXvRgKI5d5cevtz8r4= diff --git a/internal/helper/storage.go b/internal/helper/storage.go index 4e535a5d61acd994d24480797a30c3a724ed2fd0..f3f0d0ba001916ef5155b86e1f6234f57945061e 100644 --- a/internal/helper/storage.go +++ b/internal/helper/storage.go @@ -34,6 +34,17 @@ func ExtractGitalyServers(ctx context.Context) (gitalyServersInfo storage.Gitaly return } +// IncomingToOutgoing creates an outgoing context out of an incoming context with the same storage metadata +func IncomingToOutgoing(ctx context.Context) context.Context { + md, ok := metadata.FromIncomingContext(ctx) + if !ok { + return ctx + } + + return metadata.NewOutgoingContext(ctx, md) + +} + // InjectGitalyServers injects gitaly-servers metadata into an outgoing context func InjectGitalyServers(ctx context.Context, name, address, token string) (context.Context, error) { diff --git a/internal/praefect/common.go b/internal/praefect/common.go index 2df2a48237300b94f60cc80d882f97cca0a1fd9d..a09a292adaef77047414b459d41af3ad6a837bfc 100644 --- a/internal/praefect/common.go +++ b/internal/praefect/common.go @@ -1,13 +1,5 @@ package praefect -import "google.golang.org/grpc" - -// Node is a wrapper around the grpc client connection for a backend Gitaly node -type Node struct { - Storage string - cc *grpc.ClientConn -} - // logging keys to use with logrus WithField const ( logKeyProjectPath = "ProjectPath" diff --git a/internal/praefect/config/config.go b/internal/praefect/config/config.go index 6a2a5b5d5b84293aed8c2a44bfe1246abb579a30..eb044d308763ef5f92c88b924eac7c56fc6d2083 100644 --- a/internal/praefect/config/config.go +++ b/internal/praefect/config/config.go @@ -15,8 +15,7 @@ type Config struct { ListenAddr string `toml:"listen_addr"` SocketPath string `toml:"socket_path"` - PrimaryServer *models.GitalyServer `toml:"primary_server"` - SecondaryServers []*models.GitalyServer `toml:"secondary_server"` + StorageNodes []*models.StorageNode `toml:"storage_node"` // Whitelist is a list of relative project paths (paths comprised of project // hashes) that are permitted to use high availability features @@ -24,13 +23,16 @@ type Config struct { Logging config.Logging `toml:"logging"` PrometheusListenAddr string `toml:"prometheus_listen_addr"` + + Postgres *Postgres `toml:"postgres"` } -// GitalyServer allows configuring the servers that RPCs are proxied to -type GitalyServer struct { - Name string `toml:"name"` - ListenAddr string `toml:"listen_addr" split_words:"true"` - Token string `toml:"token"` +// Postgres contains details for connecting to a postgres database +type Postgres struct { + User string `toml:"user"` + Password string `toml:"password"` + Address string `toml:"address"` + Database string `toml:"database"` } // FromFile loads the config for the passed file path @@ -50,32 +52,35 @@ var ( errNoListener = errors.New("no listen address or socket path configured") errNoGitalyServers = errors.New("no primary gitaly backends configured") errDuplicateGitalyAddr = errors.New("gitaly listen addresses are not unique") - errGitalyWithoutName = errors.New("all gitaly servers must have a name") + errGitalyWithoutAddr = errors.New("all gitaly nodes must have an 
address") + errNoPostgres = errors.New("postgres configuration missing") ) -var emptyServer = &models.GitalyServer{} - // Validate establishes if the config is valid func (c Config) Validate() error { if c.ListenAddr == "" && c.SocketPath == "" { return errNoListener } - if c.PrimaryServer == nil || c.PrimaryServer == emptyServer { + if len(c.StorageNodes) == 0 { return errNoGitalyServers } - listenAddrs := make(map[string]bool, len(c.SecondaryServers)+1) - for _, gitaly := range append(c.SecondaryServers, c.PrimaryServer) { - if gitaly.Name == "" { - return errGitalyWithoutName + listenAddrs := make(map[string]bool, len(c.StorageNodes)) + for _, node := range c.StorageNodes { + if node.Address == "" { + return errGitalyWithoutAddr } - if _, found := listenAddrs[gitaly.ListenAddr]; found { + if _, found := listenAddrs[node.Address]; found { return errDuplicateGitalyAddr } - listenAddrs[gitaly.ListenAddr] = true + listenAddrs[node.Address] = true + } + + if c.Postgres == nil { + return errNoPostgres } return nil diff --git a/internal/praefect/config/config_test.go b/internal/praefect/config/config_test.go index eace5eb2f3e4f04ff3ea4c86456d9e2fcd8caab6..c6de879fcb28daab180de8cf4657ea8e83b38a71 100644 --- a/internal/praefect/config/config_test.go +++ b/internal/praefect/config/config_test.go @@ -9,10 +9,10 @@ import ( ) func TestConfigValidation(t *testing.T) { - primarySrv := &models.GitalyServer{"test", "localhost:23456", "secret-token"} - secondarySrvs := []*models.GitalyServer{ - {"test1", "localhost:23457", "secret-token"}, - {"test2", "localhost:23458", "secret-token"}, + nodes := []*models.StorageNode{ + {ID: 1, Address: "localhost:23456", Token: "secret-token"}, + {ID: 2, Address: "localhost:23457", Token: "secret-token"}, + {ID: 3, Address: "localhost:23458", Token: "secret-token"}, } testCases := []struct { @@ -22,27 +22,32 @@ func TestConfigValidation(t *testing.T) { }{ { desc: "No ListenAddr or SocketPath", - config: Config{ListenAddr: "", PrimaryServer: primarySrv, SecondaryServers: secondarySrvs}, + config: Config{ListenAddr: "", StorageNodes: nodes, Postgres: &Postgres{}}, err: errNoListener, }, { desc: "Only a SocketPath", - config: Config{SocketPath: "/tmp/praefect.socket", PrimaryServer: primarySrv, SecondaryServers: secondarySrvs}, + config: Config{SocketPath: "/tmp/praefect.socket", StorageNodes: nodes, Postgres: &Postgres{}}, err: nil, }, { desc: "No servers", - config: Config{ListenAddr: "localhost:1234"}, + config: Config{ListenAddr: "localhost:1234", Postgres: &Postgres{}}, err: errNoGitalyServers, }, { desc: "duplicate address", - config: Config{ListenAddr: "localhost:1234", PrimaryServer: primarySrv, SecondaryServers: []*models.GitalyServer{primarySrv}}, + config: Config{ListenAddr: "localhost:1234", StorageNodes: append(nodes, &models.StorageNode{Address: nodes[0].Address}), Postgres: &Postgres{}}, err: errDuplicateGitalyAddr, }, + { + desc: "Missing Postgres", + config: Config{ListenAddr: "localhost:1234", StorageNodes: nodes}, + err: errNoPostgres, + }, { desc: "Valid config", - config: Config{ListenAddr: "localhost:1234", PrimaryServer: primarySrv, SecondaryServers: secondarySrvs}, + config: Config{ListenAddr: "localhost:1234", StorageNodes: nodes, Postgres: &Postgres{}}, err: nil, }, } @@ -63,21 +68,27 @@ func TestConfigParsing(t *testing.T) { { filePath: "testdata/config.toml", expected: Config{ - PrimaryServer: &models.GitalyServer{ - Name: "default", - ListenAddr: "tcp://gitaly-primary.example.com", - }, - SecondaryServers: []*models.GitalyServer{ + 
StorageNodes: []*models.StorageNode{ { - Name: "default", - ListenAddr: "tcp://gitaly-backup1.example.com", + Address: "tcp://gitaly-internal-1.example.com", + Storage: "praefect-internal-1", }, { - Name: "backup", - ListenAddr: "tcp://gitaly-backup2.example.com", + Address: "tcp://gitaly-internal-2.example.com", + Storage: "praefect-internal-2", + }, + { + Address: "tcp://gitaly-internal-3.example.com", + Storage: "praefect-internal-3", }, }, Whitelist: []string{"abcd1234", "edfg5678"}, + Postgres: &Postgres{ + User: "pg_user", + Password: "password", + Address: "/tmp/postgres.socket", + Database: "praefect_test", + }, }, }, } diff --git a/internal/praefect/config/testdata/config.toml b/internal/praefect/config/testdata/config.toml index 81701a359c91b2a493ae6d396638f51d6592e8d0..4464c566365b83ddf1f24f801b199e7025c4c2dc 100644 --- a/internal/praefect/config/testdata/config.toml +++ b/internal/praefect/config/testdata/config.toml @@ -3,20 +3,26 @@ socket_path = "" whitelist = ["abcd1234", "edfg5678"] prometheus_listen_addr = "" -[primary_server] - name = "default" - listen_addr = "tcp://gitaly-primary.example.com" - -[[secondary_server]] - name = "default" - listen_addr = "tcp://gitaly-backup1.example.com" - -[[secondary_server]] - name = "backup" - listen_addr = "tcp://gitaly-backup2.example.com" - [logging] format = "" sentry_dsn = "" ruby_sentry_dsn = "" level = "" + +[[storage_node]] + address = "tcp://gitaly-internal-1.example.com" + storage = "praefect-internal-1" + +[[storage_node]] + address = "tcp://gitaly-internal-2.example.com" + storage = "praefect-internal-2" + +[[storage_node]] + address = "tcp://gitaly-internal-3.example.com" + storage = "praefect-internal-3" + +[postgres] + user = "pg_user" + password = "password" + address = "/tmp/postgres.socket" + database = "praefect_test" diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go index 8f64022cbbf88da411a0d84ed197dd091b4989d5..053e178cbadfa9375d2989ec2e5ea2d9a64fbecb 100644 --- a/internal/praefect/coordinator.go +++ b/internal/praefect/coordinator.go @@ -2,12 +2,15 @@ package praefect import ( "context" + "database/sql" + "errors" "fmt" "os" "os/signal" "sync" "syscall" + "gitlab.com/gitlab-org/gitaly-proto/go/gitalypb" gitalyauth "gitlab.com/gitlab-org/gitaly/auth" gitalyconfig "gitlab.com/gitlab-org/gitaly/internal/config" "gitlab.com/gitlab-org/gitaly/internal/helper" @@ -19,8 +22,6 @@ import ( "gitlab.com/gitlab-org/gitaly/client" "gitlab.com/gitlab-org/gitaly/internal/praefect/grpc-proxy/proxy" "google.golang.org/grpc" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" ) // Coordinator takes care of directing client requests to the appropriate @@ -31,14 +32,14 @@ type Coordinator struct { failoverMutex sync.RWMutex connMutex sync.RWMutex - datastore Datastore + datastore ReplicasDatastore nodes map[string]*grpc.ClientConn registry *protoregistry.Registry } // NewCoordinator returns a new Coordinator that utilizes the provided logger -func NewCoordinator(l *logrus.Logger, datastore Datastore, fileDescriptors ...*descriptor.FileDescriptorProto) *Coordinator { +func NewCoordinator(l *logrus.Logger, datastore ReplicasDatastore, fileDescriptors ...*descriptor.FileDescriptorProto) *Coordinator { registry := protoregistry.New() registry.RegisterFiles(fileDescriptors...) @@ -55,17 +56,18 @@ func (c *Coordinator) RegisterProtos(protos ...*descriptor.FileDescriptorProto) return c.registry.RegisterFiles(protos...) 
} -// GetStorageNode returns the registered node for the given storage location -func (c *Coordinator) GetStorageNode(storage string) (Node, error) { - cc, ok := c.getConn(storage) - if !ok { - return Node{}, fmt.Errorf("no node registered for storage location %q", storage) +func targetRepo(mi protoregistry.MethodInfo, frame []byte) (*gitalypb.Repository, error) { + m, err := mi.UnmarshalRequestProto(frame) + if err != nil { + return nil, err } - return Node{ - Storage: storage, - cc: cc, - }, nil + targetRepo, err := mi.TargetRepo(m) + if err != nil { + return nil, err + } + + return targetRepo, nil } // streamDirector determines which downstream servers receive requests @@ -77,34 +79,77 @@ func (c *Coordinator) streamDirector(ctx context.Context, fullMethodName string, c.failoverMutex.RLock() defer c.failoverMutex.RUnlock() - serverConfig, err := c.datastore.GetDefaultPrimary() + frames, err := peeker.Peek(ctx, 1) if err != nil { - err := status.Error( - codes.FailedPrecondition, - "no downstream node registered", - ) return nil, nil, err } - // We only need the primary node, as there's only one primary storage - // location per praefect at this time - cc, ok := c.getConn(serverConfig.Name) - if !ok { - return nil, nil, fmt.Errorf("unable to find existing client connection for %s", serverConfig.Name) + mi, err := c.registry.LookupMethod(fullMethodName) + if err != nil { + return nil, nil, err } - ctx, err = helper.InjectGitalyServers(ctx, serverConfig.Name, serverConfig.ListenAddr, serverConfig.Token) + var primary *models.StorageNode + + if mi.Scope == protoregistry.ScopeRepository { + targetRepo, err := targetRepo(mi, frames[0]) + if err != nil { + return nil, nil, err + } + + primary, err = c.datastore.GetPrimary(targetRepo.GetRelativePath()) + + if err != nil { + if err != sql.ErrNoRows { + return nil, nil, err + } + // if there are no primaries for this repository, pick one + nodes, err := c.datastore.GetStorageNodes() + if err != nil { + return nil, nil, err + } + + if len(nodes) == 0 { + return nil, nil, fmt.Errorf("no nodes serve storage %s", targetRepo.GetStorageName()) + + } + //newPrimary := nodes[rand.New(rand.NewSource(time.Now().Unix())).Intn(len(nodes))] + newPrimary := nodes[0] + + // set the primary + if err = c.datastore.SetPrimary(targetRepo.GetRelativePath(), newPrimary.ID); err != nil { + return nil, nil, err + } + + primary = &newPrimary + } + } else { + //TODO: For now we just pick a random storage node for a non repository scoped RPC, but we will need to figure out exactly how to + // proxy requests that are not repository scoped + node, err := c.datastore.GetStorageNodes() + if err != nil { + return nil, nil, err + } + if len(node) == 0 { + return nil, nil, errors.New("no node storages found") + } + primary = &node[0] + } + + // We only need the primary node, as there's only one primary storage + // location per praefect at this time + cc, err := c.GetConnection(primary.Storage) if err != nil { - return nil, nil, err + return nil, nil, fmt.Errorf("unable to find existing client connection for %s", primary.Storage) } - return ctx, cc, nil + return helper.IncomingToOutgoing(ctx), cc, nil } // RegisterNode will direct traffic to the supplied downstream connection when the storage location // is encountered. 
-func (c *Coordinator) RegisterNode(storageName, listenAddr string) error {
-	conn, err := client.Dial(listenAddr,
+func (c *Coordinator) RegisterNode(storage, address string) error {
+	conn, err := client.Dial(address,
 		[]grpc.DialOption{
 			grpc.WithDefaultCallOptions(grpc.CallCustomCodec(proxy.Codec())),
 			grpc.WithPerRPCCredentials(gitalyauth.RPCCredentials(gitalyconfig.Config.Auth.Token)),
@@ -114,23 +159,28 @@
 		return err
 	}
 
-	c.setConn(storageName, conn)
+	c.setConn(storage, conn)
 
 	return nil
 }
 
-func (c *Coordinator) setConn(storageName string, conn *grpc.ClientConn) {
+func (c *Coordinator) setConn(storage string, conn *grpc.ClientConn) {
 	c.connMutex.Lock()
-	c.nodes[storageName] = conn
+	c.nodes[storage] = conn
 	c.connMutex.Unlock()
 }
 
-func (c *Coordinator) getConn(storageName string) (*grpc.ClientConn, bool) {
+// GetConnection gets the grpc client connection for a registered storage name
+func (c *Coordinator) GetConnection(storage string) (*grpc.ClientConn, error) {
 	c.connMutex.RLock()
-	cc, ok := c.nodes[storageName]
+	cc, ok := c.nodes[storage]
 	c.connMutex.RUnlock()
 
+	if !ok {
+		return nil, errors.New("client connection not found")
+	}
+
+	return cc, nil
-	return cc, ok
 }
 
 // FailoverRotation waits for the SIGUSR1 signal, then promotes the next secondary to be primary
@@ -146,55 +196,7 @@ func (c *Coordinator) handleSignalAndRotate() {
 		<-failoverChan
 
 		c.failoverMutex.Lock()
-		primary, err := c.datastore.GetDefaultPrimary()
-		if err != nil {
-			c.log.Fatalf("error when getting default primary: %v", err)
-		}
-
-		if err := c.rotateSecondaryToPrimary(primary); err != nil {
-			c.log.WithError(err).Error("rotating secondary")
-		}
+		c.log.Info("failover signal received")
 		c.failoverMutex.Unlock()
 	}
 }
-
-func (c *Coordinator) rotateSecondaryToPrimary(primary models.GitalyServer) error {
-	repositories, err := c.datastore.GetRepositoriesForPrimary(primary)
-	if err != nil {
-		return err
-	}
-
-	for _, repoPath := range repositories {
-		secondaries, err := c.datastore.GetShardSecondaries(models.Repository{
-			RelativePath: repoPath,
-		})
-		if err != nil {
-			return fmt.Errorf("getting secondaries: %v", err)
-		}
-
-		newPrimary := secondaries[0]
-		secondaries = append(secondaries[1:], primary)
-
-		if err = c.datastore.SetShardPrimary(models.Repository{
-			RelativePath: repoPath,
-		}, newPrimary); err != nil {
-			return fmt.Errorf("setting primary: %v", err)
-		}
-
-		if err = c.datastore.SetShardSecondaries(models.Repository{
-			RelativePath: repoPath,
-		}, secondaries); err != nil {
-			return fmt.Errorf("setting secondaries: %v", err)
-		}
-	}
-
-	// set the new default primary
-	primary, err = c.datastore.GetShardPrimary(models.Repository{
-		RelativePath: repositories[0],
-	})
-	if err != nil {
-		return fmt.Errorf("getting shard primary: %v", err)
-	}
-
-	return c.datastore.SetDefaultPrimary(primary)
-}
diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go
index 50045f8a005e13d4cfaa85eb62a6f6b8de5aaffb..0275c604845a5475f7def28a920a55652c0451ff 100644
--- a/internal/praefect/coordinator_test.go
+++ b/internal/praefect/coordinator_test.go
@@ -5,9 +5,6 @@ import (
 	"testing"
 
 	"github.com/sirupsen/logrus"
-	"github.com/stretchr/testify/require"
-	"gitlab.com/gitlab-org/gitaly/internal/praefect/config"
-	"gitlab.com/gitlab-org/gitaly/internal/praefect/models"
 )
 
 var testLogger = logrus.New()
@@ -17,31 +14,5 @@ func init() {
 }
 
 func TestSecondaryRotation(t *testing.T) {
-	cfg := config.Config{
-		PrimaryServer:    &models.GitalyServer{Name: "primary"},
&models.GitalyServer{Name: "primary"}, - SecondaryServers: []*models.GitalyServer{&models.GitalyServer{Name: "secondary_1"}, &models.GitalyServer{Name: "secondary_2"}}, - Whitelist: []string{"/repoA", "/repoB"}, - } - d := NewMemoryDatastore(cfg) - c := NewCoordinator(testLogger, d) - - primary, err := d.GetDefaultPrimary() - require.NoError(t, err) - - require.NoError(t, c.rotateSecondaryToPrimary(primary)) - - primary, err = d.GetDefaultPrimary() - require.NoError(t, err) - require.Equal(t, *cfg.SecondaryServers[0], primary, "the first secondary should have gotten promoted to be primary") - - repositories, err := d.GetRepositoriesForPrimary(primary) - require.NoError(t, err) - - for _, repository := range repositories { - shardSecondaries, err := d.GetShardSecondaries(models.Repository{RelativePath: repository}) - require.NoError(t, err) - - require.Len(t, shardSecondaries, 2) - require.Equal(t, *cfg.SecondaryServers[1], shardSecondaries[0]) - } + t.Skip("secondary rotation will change with the new data model") } diff --git a/internal/praefect/database/migrations/1_initial_up.sql b/internal/praefect/database/migrations/1_initial_up.sql new file mode 100644 index 0000000000000000000000000000000000000000..9a9437a622c6731d0ddbf4e936f5f6b09a7e5f2b --- /dev/null +++ b/internal/praefect/database/migrations/1_initial_up.sql @@ -0,0 +1,16 @@ +CREATE TABLE IF NOT EXISTS storage_nodes ( + id SERIAL PRIMARY KEY, + storage TEXT NOT NULL +); + +CREATE TABLE IF NOT EXISTS repositories ( + id SERIAL PRIMARY KEY, + relative_path TEXT NOT NULL, + "primary" INTEGER REFERENCES storage_nodes (id), +); + +CREATE TABLE IF NOT EXISTS repository_replicas ( + repository_id INTEGER REFERENCES repositories(id), + storage_node_id INTEGER REFERENCES storage_nodes(id), + PRIMARY KEY(repository_id, storage_node_id) +); \ No newline at end of file diff --git a/internal/praefect/database/sql_datastore.go b/internal/praefect/database/sql_datastore.go new file mode 100644 index 0000000000000000000000000000000000000000..a6cc902a658977239b1be093ad5532e385125b1e --- /dev/null +++ b/internal/praefect/database/sql_datastore.go @@ -0,0 +1,248 @@ +package database + +import ( + "errors" + "fmt" + "strings" + + "database/sql" + + // the lib/pg package provides postgres bindings for the sql package + _ "github.com/lib/pq" + + "gitlab.com/gitlab-org/gitaly/internal/praefect/config" + "gitlab.com/gitlab-org/gitaly/internal/praefect/models" +) + +// SQLDatastore is a sql based datastore that conforms to the ReplicasDatastore interface +type SQLDatastore struct { + db *sql.DB +} + +// NewSQLDatastore instantiates a new sql datastore with environment variables +func NewSQLDatastore(user, password, address, database string) (*SQLDatastore, error) { + connStr := fmt.Sprintf("postgres://%s:%s@%s/%s?sslmode=disable", user, password, address, database) + db, err := sql.Open("postgres", connStr) + if err != nil { + return nil, err + } + + return &SQLDatastore{db: db}, nil +} + +// LoadFromConfig loads the config into the database +func (sd *SQLDatastore) LoadFromConfig(cfg config.Config) error { + _, err := sd.db.Exec(insertStorageNodesQuery(cfg.StorageNodes)) + if err != nil { + return fmt.Errorf("loading StorageNodes: %v", err) + } + + return nil +} + +func insertStorageNodesQuery(storageNodes []*models.StorageNode) string { + q := `INSERT INTO storage_nodes (storage) VALUES %s ON CONFLICT (storage) DO NOTHING` + + var values []string + for _, storageNode := range storageNodes { + values = append(values, fmt.Sprintf(`('%s')`, 
storageNode.Storage)) + } + + return fmt.Sprintf(q, strings.Join(values, ",")) +} + +// GetReplicas gets the replicas for a repository based on the relative path +func (sd *SQLDatastore) GetReplicas(relativePath string) ([]models.StorageNode, error) { + var replicas []models.StorageNode + + rows, err := sd.db.Query(` + SELECT storage_nodes.id FROM repositories + INNER JOIN repository_replicas ON repositories.id = repository_replicas.repository_id + INNER JOIN storage_nodes ON storage_nodes.id = repository_replicas.storage_node_id WHERE repositories.relative_path = $1`, relativePath) + + if err != nil { + return nil, err + } + + for rows.Next() { + var s models.StorageNode + err = rows.Scan(&s.ID) + if err != nil { + return nil, err + } + replicas = append(replicas, s) + } + + return replicas, nil +} + +// GetStorageNode gets all storage storage_nodes +func (sd *SQLDatastore) GetStorageNode(nodeID int) (models.StorageNode, error) { + var node models.StorageNode + + row := sd.db.QueryRow("SELECT storage_nodes.id, storage_nodes.storage FROM storage_nodes WHERE storage_nodes.id = $1", nodeID) + + err := row.Scan(&node.ID, &node.Storage) + if err != nil { + return node, err + } + + return node, nil + +} + +// GetStorageNodes gets all storage storage_nodes +func (sd *SQLDatastore) GetStorageNodes() ([]models.StorageNode, error) { + var nodeStorages []models.StorageNode + + rows, err := sd.db.Query("SELECT storage_nodes.id, storage_nodes.storage FROM storage_nodes") + + if err != nil { + return nil, err + } + + for rows.Next() { + var nodeStorage models.StorageNode + err = rows.Scan(&nodeStorage.ID, &nodeStorage.Storage) + if err != nil { + return nil, err + } + nodeStorages = append(nodeStorages, nodeStorage) + } + + return nodeStorages, nil + +} + +// GetPrimary gets the primary storage node for a repository of a repository relative path +func (sd *SQLDatastore) GetPrimary(relativePath string) (*models.StorageNode, error) { + + row := sd.db.QueryRow(` + SELECT storage_nodes.id, storage_nodes.storage FROM repositories + INNER JOIN storage_nodes ON repositories.primary = storage_nodes.id + WHERE repositories.relative_path = $1 + `, relativePath) + + var s models.StorageNode + if err := row.Scan(&s.ID, &s.Storage); err != nil { + return nil, err + } + + return &s, nil +} + +// SetPrimary sets the primary storagee node for a repository of a repository relative path +func (sd *SQLDatastore) SetPrimary(relativePath string, storageNodeID int) error { + res, err := sd.db.Exec(`UPDATE repositories SET "primary" = $1 WHERE relative_path = $2`, storageNodeID, relativePath) + if err != nil { + return err + } + + if n, err := res.RowsAffected(); err != nil { + return err + } else if n == 0 { + res, err = sd.db.Exec(`INSERT INTO repositories (storage, relative_path, "primary") VALUES ($1, $2)`, relativePath, storageNodeID) + if err != nil { + return err + } + if n, err := res.RowsAffected(); err != nil { + return err + } else if n == 0 { + return errors.New("failed to set primary") + } + } + + return nil +} + +// AddReplica adds a replica to a repository of a repository relative path +func (sd *SQLDatastore) AddReplica(relativePath string, storageNodeID int) error { + res, err := sd.db.Exec(` + INSERT INTO repository_replicas (repository_id, storage_node_id) + VALUES (SELECT id, $1 FROM repositories WHERE relative_path = $2)`, storageNodeID, relativePath) + if err != nil { + return err + } + + if n, err := res.RowsAffected(); err != nil { + return err + } else if n == 0 { + return errors.New("replica already 
exists") + } + + return nil +} + +// RemoveReplica removes a replica from a repository of a repository relative path +func (sd *SQLDatastore) RemoveReplica(relativePath string, storageNodeID int) error { + res, err := sd.db.Exec(` + DELETE FROM repository_replicas (repository_relative_path, node_storage_id) + WHERE repository_id = (SELECT id FROM repository WHERE relative_path = $1) AND storage_node_id = $2`, relativePath, storageNodeID) + if err != nil { + return err + } + + if n, err := res.RowsAffected(); err != nil { + return err + } else if n == 0 { + return errors.New("replica did not exist") + } + + return nil +} + +// GetRepository gets the repository for a repository relative path +func (sd *SQLDatastore) GetRepository(relativePath string) (*models.Repository, error) { + primary, err := sd.GetPrimary(relativePath) + if err != nil { + return nil, fmt.Errorf("getting primary: %v", err) + } + + replicas, err := sd.GetReplicas(relativePath) + if err != nil { + return nil, fmt.Errorf("getting replicas: %v", err) + } + + return &models.Repository{RelativePath: relativePath, Primary: *primary, Replicas: replicas}, nil +} + +// RotatePrimary rotates a primary out of being primary, and picks a replica of each repository at random to promote to the new primary +func (sd *SQLDatastore) RotatePrimary(primaryNodeStorageID int) error { + + // Add the primary as a replica + res, err := sd.db.Exec(` + INSERT INTO repository_replicas (repository_id, node_storage_id) VALUES (SELECT repositories.id, repositories.primary FROM repositories WHERE repositories.primary = $1) + `, primaryNodeStorageID) + if err != nil { + return err + } + + affected, err := res.RowsAffected() + if err != nil { + return err + } + + if affected == 0 { + return fmt.Errorf("no repositories with primary %d found", primaryNodeStorageID) + } + + // Choose a new replica + res, err = sd.db.Exec(`UPDATE repositories SET "primary" = + (SELECT repository_replicas.storage_node_id FROM repository_replicas + INNER JOIN repositories ON repository_replicas.repository_id = repositories.id + WHERE repositories.primary = $1 AND repositories.primary != repository_replicas.storage_node_id LIMIT 1) + `, primaryNodeStorageID) + if err != nil { + return err + } + + affected, err = res.RowsAffected() + if err != nil { + return err + } + + if affected == 0 { + return errors.New("no replicas available to rotate") + } + return nil +} diff --git a/internal/praefect/datastore.go b/internal/praefect/datastore.go index 5678c6a247c1e598bf8388b5c152edb2f0ed38fb..6730ca1e40f38c66d928b7655400638d8e30038b 100644 --- a/internal/praefect/datastore.go +++ b/internal/praefect/datastore.go @@ -11,7 +11,6 @@ import ( "sort" "sync" - "gitlab.com/gitlab-org/gitaly/internal/praefect/config" "gitlab.com/gitlab-org/gitaly/internal/praefect/models" ) @@ -42,10 +41,11 @@ const ( // meant for updating the repository so that it is synced with the primary // copy. Scheduled indicates when a replication job should be performed. type ReplJob struct { - ID uint64 // autoincrement ID - Target string // which storage location to replicate to? - Source models.Repository // source for replication - State JobState + ID uint64 // autoincrement ID + TargetNodeID int // which node to replicate to? 
+ SourceStorage string + Source models.Repository // source for replication + State JobState } // replJobs provides sort manipulation behavior @@ -64,32 +64,26 @@ func (b byJobID) Less(i, j int) bool { return b.replJobs[i].ID < b.replJobs[j].I type Datastore interface { ReplJobsDatastore ReplicasDatastore - TemporaryDatastore -} - -// TemporaryDatastore contains methods that will go away once we move to a SQL datastore -type TemporaryDatastore interface { - GetDefaultPrimary() (models.GitalyServer, error) - SetDefaultPrimary(primary models.GitalyServer) error } // ReplicasDatastore manages accessing and setting which secondary replicas // backup a repository type ReplicasDatastore interface { - // GetSecondaries will retrieve all secondary replica storage locations for - // a primary replica - GetShardSecondaries(repo models.Repository) ([]models.GitalyServer, error) + GetReplicas(relativePath string) ([]models.StorageNode, error) + + GetStorageNode(nodeID int) (models.StorageNode, error) + + GetStorageNodes() ([]models.StorageNode, error) + + GetPrimary(relativePath string) (*models.StorageNode, error) - GetShardPrimary(repo models.Repository) (models.GitalyServer, error) + SetPrimary(relativePath string, storageNodeID int) error - // SetSecondaries will set the secondary storage locations for a repository - // in a primary replica. - SetShardSecondaries(repo models.Repository, secondaries []models.GitalyServer) error + AddReplica(relativePath string, storageNodeID int) error - SetShardPrimary(repo models.Repository, primary models.GitalyServer) error + RemoveReplica(relativePath string, storageNodeID int) error - // GetRepositoriesForPrimary returns a map of all of the active shards for a given primary - GetRepositoriesForPrimary(primary models.GitalyServer) ([]string, error) + GetRepository(relativePath string) (*models.Repository, error) } // ReplJobsDatastore represents the behavior needed for fetching and updating @@ -98,58 +92,53 @@ type ReplJobsDatastore interface { // GetJobs fetches a list of chronologically ordered replication // jobs for the given storage replica. The returned list will be at most // count-length. - GetJobs(flag JobState, node string, count int) ([]ReplJob, error) + GetJobs(flag JobState, nodeID int, count int) ([]ReplJob, error) - // CreateSecondaryJobs will create replication jobs for each secondary + // CreateReplicaJobs will create replication jobs for each secondary // replica of a repository known to the datastore. A set of replication job // ID's for the created jobs will be returned upon success. - CreateSecondaryReplJobs(source models.Repository) ([]uint64, error) + CreateReplicaReplJobs(relativePath string) ([]uint64, error) // UpdateReplJob updates the state of an existing replication job UpdateReplJob(jobID uint64, newState JobState) error } -// shard is a set of primary and secondary storage replicas for a project -type shard struct { - primary models.GitalyServer - secondaries []models.GitalyServer -} - type jobRecord struct { - relativePath string // project's relative path - targetNode string - state JobState + sourceStorage string + relativePath string // project's relative path + targetNodeID int + state JobState } // MemoryDatastore is a simple datastore that isn't persisted to disk. 
It is // only intended for early beta requirements and as a reference implementation // for the eventual SQL implementation type MemoryDatastore struct { - replicas *struct { - sync.RWMutex - m map[string]shard // keyed by project's relative path - } - jobs *struct { sync.RWMutex next uint64 records map[uint64]jobRecord // all jobs indexed by ID } - primary *struct { + storageNodes *struct { sync.RWMutex - server models.GitalyServer + m map[int]models.StorageNode + } + + repositories *struct { + sync.RWMutex + m map[string]models.Repository } } // NewMemoryDatastore returns an initialized in-memory datastore -func NewMemoryDatastore(cfg config.Config) *MemoryDatastore { - m := &MemoryDatastore{ - replicas: &struct { +func NewMemoryDatastore() *MemoryDatastore { + return &MemoryDatastore{ + storageNodes: &struct { sync.RWMutex - m map[string]shard + m map[int]models.StorageNode }{ - m: map[string]shard{}, + m: map[int]models.StorageNode{}, }, jobs: &struct { sync.RWMutex @@ -159,114 +148,157 @@ func NewMemoryDatastore(cfg config.Config) *MemoryDatastore { next: 0, records: map[uint64]jobRecord{}, }, - primary: &struct { + repositories: &struct { sync.RWMutex - server models.GitalyServer + m map[string]models.Repository }{ - server: models.GitalyServer{ - Name: cfg.PrimaryServer.Name, - ListenAddr: cfg.PrimaryServer.ListenAddr, - Token: cfg.PrimaryServer.Token, - }, + m: map[string]models.Repository{}, }, } +} - secondaryServers := make([]models.GitalyServer, len(cfg.SecondaryServers)) - for i, server := range cfg.SecondaryServers { - secondaryServers[i] = *server +// GetReplicas gets the secondaries for a shard based on the relative path +func (md *MemoryDatastore) GetReplicas(relativePath string) ([]models.StorageNode, error) { + md.repositories.RLock() + md.storageNodes.RLock() + defer md.storageNodes.RUnlock() + defer md.repositories.RUnlock() + + shard, ok := md.repositories.m[relativePath] + if !ok { + return nil, errors.New("shard not found") } - for _, repo := range cfg.Whitelist { - // store the configuration file specified shard - m.replicas.m[repo] = shard{ - primary: *cfg.PrimaryServer, - secondaries: secondaryServers, - } + return shard.Replicas, nil +} - // initialize replication job queue to replicate all whitelisted repos - // to every secondary server - for _, secondary := range cfg.SecondaryServers { - m.jobs.next++ - m.jobs.records[m.jobs.next] = jobRecord{ - state: JobStateReady, - targetNode: secondary.Name, - relativePath: repo, - } - } +// GetStorageNode gets all storage nodes +func (md *MemoryDatastore) GetStorageNode(nodeID int) (models.StorageNode, error) { + md.storageNodes.RLock() + defer md.storageNodes.RUnlock() + + node, ok := md.storageNodes.m[nodeID] + if !ok { + return models.StorageNode{}, errors.New("node not found") } - return m + return node, nil } -// GetShardSecondaries will return the set of secondary storage locations for a -// given repository if they exist -func (md *MemoryDatastore) GetShardSecondaries(primary models.Repository) ([]models.GitalyServer, error) { - shard, _ := md.getShard(primary.RelativePath) +// GetStorageNodes gets all storage nodes +func (md *MemoryDatastore) GetStorageNodes() ([]models.StorageNode, error) { + md.storageNodes.RLock() + defer md.storageNodes.RUnlock() - return shard.secondaries, nil + var storageNodes []models.StorageNode + for _, storageNode := range md.storageNodes.m { + storageNodes = append(storageNodes, storageNode) + } + + return storageNodes, nil } -// SetShardSecondaries will replace the set of replicas 
for a repository -func (md *MemoryDatastore) SetShardSecondaries(repo models.Repository, secondaries []models.GitalyServer) error { - md.replicas.Lock() - defer md.replicas.Unlock() +// GetPrimary gets the primary storage node for a shard of a repository relative path +func (md *MemoryDatastore) GetPrimary(relativePath string) (*models.StorageNode, error) { + md.repositories.RLock() + defer md.repositories.RUnlock() - shard := md.replicas.m[repo.RelativePath] - shard.secondaries = secondaries - md.replicas.m[repo.RelativePath] = shard + shard, ok := md.repositories.m[relativePath] + if !ok { + return nil, errors.New("shard not found") + } + + storageNode, ok := md.storageNodes.m[shard.Primary.ID] + if !ok { + return nil, errors.New("node storage not found") + } + return &storageNode, nil - return nil } -// SetShardPrimary sets the primary for a repository -func (md *MemoryDatastore) SetShardPrimary(repo models.Repository, primary models.GitalyServer) error { - md.replicas.Lock() - defer md.replicas.Unlock() +// SetPrimary sets the primary storagee node for a shard of a repository relative path +func (md *MemoryDatastore) SetPrimary(relativePath string, storageNodeID int) error { + md.repositories.Lock() + defer md.repositories.Unlock() + + shard, ok := md.repositories.m[relativePath] + if !ok { + return errors.New("shard not found") + } + + storageNode, ok := md.storageNodes.m[storageNodeID] + if !ok { + return errors.New("node storage not found") + } - shard := md.replicas.m[repo.RelativePath] - shard.primary = primary - md.replicas.m[repo.RelativePath] = shard + shard.Primary = storageNode + md.repositories.m[relativePath] = shard return nil } -// GetShardPrimary gets the primary for a repository -func (md *MemoryDatastore) GetShardPrimary(repo models.Repository) (models.GitalyServer, error) { - md.replicas.Lock() - defer md.replicas.Unlock() +// AddReplica adds a secondary to a shard of a repository relative path +func (md *MemoryDatastore) AddReplica(relativePath string, storageNodeID int) error { + md.repositories.Lock() + defer md.repositories.Unlock() - shard := md.replicas.m[repo.RelativePath] - return shard.primary, nil + shard, ok := md.repositories.m[relativePath] + if !ok { + return errors.New("shard not found") + } + + storageNode, ok := md.storageNodes.m[storageNodeID] + if !ok { + return errors.New("node storage not found") + } + + shard.Replicas = append(shard.Replicas, storageNode) + + md.repositories.m[relativePath] = shard + return nil } -// GetRepositoriesForPrimary gets all repositories -func (md *MemoryDatastore) GetRepositoriesForPrimary(primary models.GitalyServer) ([]string, error) { - md.replicas.Lock() - defer md.replicas.Unlock() +// RemoveReplica removes a secondary from a shard of a repository relative path +func (md *MemoryDatastore) RemoveReplica(relativePath string, storageNodeID int) error { + md.repositories.Lock() + defer md.repositories.Unlock() - repositories := make([]string, 0, len(md.replicas.m)) + shard, ok := md.repositories.m[relativePath] + if !ok { + return errors.New("shard not found") + } - for repository := range md.replicas.m { - repositories = append(repositories, repository) + var secondaries []models.StorageNode + for _, secondary := range shard.Replicas { + if secondary.ID != storageNodeID { + secondaries = append(secondaries, secondary) + } } - return repositories, nil + shard.Replicas = secondaries + md.repositories.m[relativePath] = shard + return nil } -func (md *MemoryDatastore) getShard(project string) (shard, bool) { - 
md.replicas.RLock() - replicas, ok := md.replicas.m[project] - md.replicas.RUnlock() +// GetRepository gets the shard for a repository relative path +func (md *MemoryDatastore) GetRepository(relativePath string) (*models.Repository, error) { + md.repositories.Lock() + defer md.repositories.Unlock() - return replicas, ok + shard, ok := md.repositories.m[relativePath] + if !ok { + return nil, errors.New("shard not found") + } + + return &shard, nil } -// ErrSecondariesMissing indicates the repository does not have any backup +// ErrReplicasMissing indicates the repository does not have any backup // replicas -var ErrSecondariesMissing = errors.New("repository missing secondary replicas") +var ErrReplicasMissing = errors.New("repository missing secondary replicas") // GetJobs is a more general method to retrieve jobs of a certain state from the datastore -func (md *MemoryDatastore) GetJobs(state JobState, storage string, count int) ([]ReplJob, error) { +func (md *MemoryDatastore) GetJobs(state JobState, targetNodeID int, count int) ([]ReplJob, error) { md.jobs.RLock() defer md.jobs.RUnlock() @@ -274,7 +306,7 @@ func (md *MemoryDatastore) GetJobs(state JobState, storage string, count int) ([ for i, record := range md.jobs.records { // state is a bitmap that is a combination of one or more JobStates - if record.state&state != 0 && record.targetNode == storage { + if record.state&state != 0 && record.targetNodeID == targetNodeID { job, err := md.replJobFromRecord(i, record) if err != nil { return nil, err @@ -295,58 +327,50 @@ func (md *MemoryDatastore) GetJobs(state JobState, storage string, count int) ([ // replJobFromRecord constructs a replication job from a record and by cross // referencing the current shard for the project being replicated func (md *MemoryDatastore) replJobFromRecord(jobID uint64, record jobRecord) (ReplJob, error) { - shard, ok := md.getShard(record.relativePath) - if !ok { - return ReplJob{}, fmt.Errorf( - "unable to find shard for project at relative path %q", - record.relativePath, - ) - } - return ReplJob{ ID: jobID, Source: models.Repository{ RelativePath: record.relativePath, - Storage: shard.primary.Name, }, - State: record.state, - Target: record.targetNode, + SourceStorage: record.sourceStorage, + State: record.state, + TargetNodeID: record.targetNodeID, }, nil } -// ErrInvalidReplTarget indicates a targetNode repository cannot be chosen because +// ErrInvalidReplTarget indicates a targetStorage repository cannot be chosen because // it fails preconditions for being replicatable -var ErrInvalidReplTarget = errors.New("targetNode repository fails preconditions for replication") +var ErrInvalidReplTarget = errors.New("targetStorage repository fails preconditions for replication") -// CreateSecondaryReplJobs creates a replication job for each secondary that +// CreateReplicaReplJobs creates a replication job for each secondary that // backs the specified repository. Upon success, the job IDs will be returned. 
-func (md *MemoryDatastore) CreateSecondaryReplJobs(source models.Repository) ([]uint64, error) { +func (md *MemoryDatastore) CreateReplicaReplJobs(relativePath string) ([]uint64, error) { md.jobs.Lock() defer md.jobs.Unlock() - emptyRepo := models.Repository{} - if source == emptyRepo { + if relativePath == "" { return nil, errors.New("invalid source repository") } - shard, ok := md.getShard(source.RelativePath) - if !ok { + shard, err := md.GetRepository(relativePath) + if err != nil { return nil, fmt.Errorf( "unable to find shard for project at relative path %q", - source.RelativePath, + relativePath, ) } var jobIDs []uint64 - for _, secondary := range shard.secondaries { + for _, secondary := range shard.Replicas { nextID := uint64(len(md.jobs.records) + 1) md.jobs.next++ md.jobs.records[md.jobs.next] = jobRecord{ - targetNode: secondary.Name, - state: JobStatePending, - relativePath: source.RelativePath, + targetNodeID: secondary.ID, + state: JobStatePending, + relativePath: relativePath, + sourceStorage: shard.Primary.Storage, } jobIDs = append(jobIDs, nextID) @@ -375,36 +399,3 @@ func (md *MemoryDatastore) UpdateReplJob(jobID uint64, newState JobState) error md.jobs.records[jobID] = job return nil } - -// SetPrimary sets the primary datastore location -func (md *MemoryDatastore) SetPrimary(primary models.GitalyServer) error { - md.primary.Lock() - defer md.primary.Unlock() - - md.primary.server = primary - - return nil -} - -// GetDefaultPrimary gets the primary datastore location -func (md *MemoryDatastore) GetDefaultPrimary() (models.GitalyServer, error) { - md.primary.RLock() - defer md.primary.RUnlock() - - primary := md.primary.server - if primary == (models.GitalyServer{}) { - return primary, ErrPrimaryNotSet - } - - return primary, nil -} - -// SetDefaultPrimary gets the primary datastore location -func (md *MemoryDatastore) SetDefaultPrimary(primary models.GitalyServer) error { - md.primary.RLock() - defer md.primary.RUnlock() - - md.primary.server = primary - - return nil -} diff --git a/internal/praefect/datastore_memory_test.go b/internal/praefect/datastore_memory_test.go index 6099a8328e5114912770508a2829062a84070a8f..a337b9995ee3955b4bdd4444f897c3a0f7d2a3a0 100644 --- a/internal/praefect/datastore_memory_test.go +++ b/internal/praefect/datastore_memory_test.go @@ -1,93 +1,109 @@ -package praefect_test +package praefect import ( "testing" "github.com/stretchr/testify/require" - "gitlab.com/gitlab-org/gitaly/internal/praefect" - "gitlab.com/gitlab-org/gitaly/internal/praefect/config" "gitlab.com/gitlab-org/gitaly/internal/praefect/models" ) // TestMemoryDatastoreWhitelist verifies that the in-memory datastore will -// populate itself with the correct replication jobs and shards when initialized +// populate itself with the correct replication jobs and repositories when initialized // with a configuration file specifying the shard and whitelisted repositories. 
func TestMemoryDatastoreWhitelist(t *testing.T) { - cfg := config.Config{ - PrimaryServer: &models.GitalyServer{ - Name: "default", - }, - SecondaryServers: []*models.GitalyServer{ - { - Name: "backup-1", - }, - { - Name: "backup-2", - }, - }, - Whitelist: []string{"abcd1234", "5678efgh"}, + repo1 := models.Repository{ + RelativePath: "abcd1234", } - mds := praefect.NewMemoryDatastore(cfg) + repo2 := models.Repository{ + RelativePath: "5678efgh", + } + mds := NewMemoryDatastore() - repo1 := models.Repository{ - RelativePath: cfg.Whitelist[0], - Storage: cfg.PrimaryServer.Name, + mds.storageNodes.m[1] = models.StorageNode{ + ID: 1, + Address: "tcp://default", + Storage: "praefect-internal-1", + } + mds.storageNodes.m[2] = models.StorageNode{ + ID: 2, + Address: "tcp://backup-1", + Storage: "praefect-internal-2", + } + mds.storageNodes.m[3] = models.StorageNode{ + ID: 3, + Address: "tcp://backup-2", + Storage: "praefect-internal-3", + } + mds.repositories.m[repo1.RelativePath] = models.Repository{ + RelativePath: repo1.RelativePath, + Primary: mds.storageNodes.m[1], + Replicas: []models.StorageNode{mds.storageNodes.m[2], mds.storageNodes.m[3]}, + } + mds.repositories.m[repo2.RelativePath] = models.Repository{ + RelativePath: repo2.RelativePath, + Primary: mds.storageNodes.m[1], + Replicas: []models.StorageNode{mds.storageNodes.m[2], mds.storageNodes.m[3]}, } - repo2 := models.Repository{ - RelativePath: cfg.Whitelist[1], - Storage: cfg.PrimaryServer.Name, + for _, repo := range []models.Repository{repo1, repo2} { + jobIDs, err := mds.CreateReplicaReplJobs(repo.RelativePath) + require.NoError(t, err) + require.Len(t, jobIDs, 2) } - expectSecondaries := []models.GitalyServer{ - models.GitalyServer{Name: cfg.SecondaryServers[0].Name}, - models.GitalyServer{Name: cfg.SecondaryServers[1].Name}, + expectReplicas := []models.StorageNode{ + mds.storageNodes.m[2], + mds.storageNodes.m[3], } for _, repo := range []models.Repository{repo1, repo2} { - actualSecondaries, err := mds.GetShardSecondaries(repo) + actualReplicas, err := mds.GetReplicas(repo.RelativePath) require.NoError(t, err) - require.ElementsMatch(t, expectSecondaries, actualSecondaries) + require.ElementsMatch(t, expectReplicas, actualReplicas) } - backup1 := cfg.SecondaryServers[0] - backup2 := cfg.SecondaryServers[1] + backup1 := mds.storageNodes.m[2] + backup2 := mds.storageNodes.m[3] - backup1ExpectedJobs := []praefect.ReplJob{ - praefect.ReplJob{ - ID: 1, - Target: backup1.Name, - Source: repo1, - State: praefect.JobStateReady, + backup1ExpectedJobs := []ReplJob{ + ReplJob{ + ID: 1, + TargetNodeID: backup1.ID, + Source: models.Repository{RelativePath: repo1.RelativePath}, + SourceStorage: "praefect-internal-1", + State: JobStatePending, }, - praefect.ReplJob{ - ID: 3, - Target: backup1.Name, - Source: repo2, - State: praefect.JobStateReady, + ReplJob{ + ID: 3, + TargetNodeID: backup1.ID, + Source: models.Repository{RelativePath: repo2.RelativePath}, + SourceStorage: "praefect-internal-1", + State: JobStatePending, }, } - backup2ExpectedJobs := []praefect.ReplJob{ - praefect.ReplJob{ - ID: 2, - Target: backup2.Name, - Source: repo1, - State: praefect.JobStateReady, + backup2ExpectedJobs := []ReplJob{ + ReplJob{ + ID: 2, + TargetNodeID: backup2.ID, + Source: models.Repository{RelativePath: repo1.RelativePath}, + SourceStorage: "praefect-internal-1", + State: JobStatePending, }, - praefect.ReplJob{ - ID: 4, - Target: backup2.Name, - Source: repo2, - State: praefect.JobStateReady, + ReplJob{ + ID: 4, + TargetNodeID: backup2.ID, + 
Source: models.Repository{RelativePath: repo2.RelativePath}, + SourceStorage: "praefect-internal-1", + State: JobStatePending, }, } - backup1ActualJobs, err := mds.GetJobs(praefect.JobStatePending|praefect.JobStateReady, backup1.Name, 10) + backup1ActualJobs, err := mds.GetJobs(JobStatePending|JobStateReady, backup1.ID, 10) require.NoError(t, err) require.Equal(t, backup1ExpectedJobs, backup1ActualJobs) - backup2ActualJobs, err := mds.GetJobs(praefect.JobStatePending|praefect.JobStateReady, backup2.Name, 10) + backup2ActualJobs, err := mds.GetJobs(JobStatePending|JobStateReady, backup2.ID, 10) require.NoError(t, err) require.Equal(t, backup2ActualJobs, backup2ExpectedJobs) diff --git a/internal/praefect/datastore_test.go b/internal/praefect/datastore_test.go index 417a04be2e901566340f72b050821c8329cc9730..320a401033d70f9a55c38b355977fc67290ade26 100644 --- a/internal/praefect/datastore_test.go +++ b/internal/praefect/datastore_test.go @@ -1,95 +1,104 @@ -package praefect_test +package praefect import ( "testing" "github.com/stretchr/testify/require" - "gitlab.com/gitlab-org/gitaly/internal/praefect" - "gitlab.com/gitlab-org/gitaly/internal/praefect/config" "gitlab.com/gitlab-org/gitaly/internal/praefect/models" ) -const ( - stor1 = "default" // usually the primary storage location - stor2 = "backup-1" // usually the seoncary storage location +var ( + stor1 = models.StorageNode{ + ID: 1, + Address: "tcp://address-1", + Storage: "praefect-storage-1", + } + stor2 = models.StorageNode{ + ID: 2, + Address: "tcp://address-2", + Storage: "praefect-storage-2", + } proj1 = "abcd1234" // imagine this is a legit project hash ) var ( - repo1Primary = models.Repository{ + repo1Repository = models.Repository{ RelativePath: proj1, - Storage: stor1, } ) var operations = []struct { desc string - opFn func(*testing.T, praefect.Datastore) + opFn func(*testing.T, Datastore) }{ { desc: "query an empty datastore", - opFn: func(t *testing.T, ds praefect.Datastore) { - jobs, err := ds.GetJobs(praefect.JobStatePending|praefect.JobStateReady, stor1, 1) + opFn: func(t *testing.T, ds Datastore) { + jobs, err := ds.GetJobs(JobStatePending|JobStateReady, stor1.ID, 1) require.NoError(t, err) require.Len(t, jobs, 0) }, }, { - desc: "insert first replication job before secondary mapped to primary", - opFn: func(t *testing.T, ds praefect.Datastore) { - _, err := ds.CreateSecondaryReplJobs(repo1Primary) - require.Error(t, err, praefect.ErrInvalidReplTarget) + desc: "creating replication jobs before secondaries are added results in no jobs added", + opFn: func(t *testing.T, ds Datastore) { + jobIDs, err := ds.CreateReplicaReplJobs(repo1Repository.RelativePath) + require.NoError(t, err) + require.Empty(t, jobIDs) }, }, { desc: "set the primary for the shard", - opFn: func(t *testing.T, ds praefect.Datastore) { - err := ds.SetShardPrimary(repo1Primary, models.GitalyServer{Name: stor1}) + opFn: func(t *testing.T, ds Datastore) { + err := ds.SetPrimary(repo1Repository.RelativePath, stor1.ID) require.NoError(t, err) }, }, { - desc: "associate the replication job target with a primary", - opFn: func(t *testing.T, ds praefect.Datastore) { - err := ds.SetShardSecondaries(repo1Primary, []models.GitalyServer{models.GitalyServer{Name: stor2}}) + desc: "add a secondary replica for the shard", + opFn: func(t *testing.T, ds Datastore) { + err := ds.AddReplica(repo1Repository.RelativePath, stor2.ID) require.NoError(t, err) }, }, { desc: "insert first replication job after secondary mapped to primary", - opFn: func(t *testing.T, ds 
praefect.Datastore) { - ids, err := ds.CreateSecondaryReplJobs(repo1Primary) + opFn: func(t *testing.T, ds Datastore) { + ids, err := ds.CreateReplicaReplJobs(repo1Repository.RelativePath) require.NoError(t, err) require.Equal(t, []uint64{1}, ids) }, }, { desc: "fetch inserted replication jobs after primary mapped", - opFn: func(t *testing.T, ds praefect.Datastore) { - jobs, err := ds.GetJobs(praefect.JobStatePending|praefect.JobStateReady, stor2, 10) + opFn: func(t *testing.T, ds Datastore) { + jobs, err := ds.GetJobs(JobStatePending|JobStateReady, stor2.ID, 10) require.NoError(t, err) require.Len(t, jobs, 1) - expectedJob := praefect.ReplJob{ - ID: 1, - Source: repo1Primary, - Target: stor2, - State: praefect.JobStatePending, + expectedJob := ReplJob{ + ID: 1, + Source: models.Repository{ + RelativePath: repo1Repository.RelativePath, + }, + SourceStorage: "praefect-storage-1", + TargetNodeID: stor2.ID, + State: JobStatePending, } require.Equal(t, expectedJob, jobs[0]) }, }, { desc: "mark replication job done", - opFn: func(t *testing.T, ds praefect.Datastore) { - err := ds.UpdateReplJob(1, praefect.JobStateComplete) + opFn: func(t *testing.T, ds Datastore) { + err := ds.UpdateReplJob(1, JobStateComplete) require.NoError(t, err) }, }, { desc: "try fetching completed replication job", - opFn: func(t *testing.T, ds praefect.Datastore) { - jobs, err := ds.GetJobs(praefect.JobStatePending|praefect.JobStateReady, stor1, 1) + opFn: func(t *testing.T, ds Datastore) { + jobs, err := ds.GetJobs(JobStatePending|JobStateReady, stor1.ID, 1) require.NoError(t, err) require.Len(t, jobs, 0) }, @@ -97,14 +106,15 @@ var operations = []struct { } // TODO: add SQL datastore flavor -var flavors = map[string]func() praefect.Datastore{ - "in-memory-datastore": func() praefect.Datastore { - return praefect.NewMemoryDatastore( - config.Config{ - PrimaryServer: &models.GitalyServer{ - Name: "default", - }, - }) +var flavors = map[string]func() Datastore{ + "in-memory-datastore": func() Datastore { + ds := NewMemoryDatastore() + + ds.repositories.m[repo1Repository.RelativePath] = repo1Repository + ds.storageNodes.m[stor1.ID] = stor1 + ds.storageNodes.m[stor2.ID] = stor2 + + return ds }, } diff --git a/internal/praefect/mock/mock.pb.go b/internal/praefect/mock/mock.pb.go index b8a8afb01224eed093a8e7a8429f1bccd1873092..f6d19f296e02211fa1c726952d823e273e23c72f 100644 --- a/internal/praefect/mock/mock.pb.go +++ b/internal/praefect/mock/mock.pb.go @@ -9,7 +9,10 @@ import ( math "math" proto "github.com/golang/protobuf/proto" + _ "gitlab.com/gitlab-org/gitaly-proto/go/gitalypb" grpc "google.golang.org/grpc" + codes "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" ) // Reference imports to suppress errors if they are not otherwise used. 
@@ -109,16 +112,17 @@ func init() { func init() { proto.RegisterFile("mock/mock.proto", fileDescriptor_5ed43251284e3118) } var fileDescriptor_5ed43251284e3118 = []byte{ - // 139 bytes of a gzipped FileDescriptorProto + // 157 bytes of a gzipped FileDescriptorProto 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xe2, 0xcf, 0xcd, 0x4f, 0xce, - 0xd6, 0x07, 0x11, 0x7a, 0x05, 0x45, 0xf9, 0x25, 0xf9, 0x42, 0x2c, 0x20, 0xb6, 0x92, 0x2a, 0x17, - 0x6f, 0x70, 0x66, 0x6e, 0x41, 0x4e, 0x6a, 0x50, 0x6a, 0x61, 0x69, 0x6a, 0x71, 0x89, 0x90, 0x08, - 0x17, 0x6b, 0x59, 0x62, 0x4e, 0x69, 0xaa, 0x04, 0xa3, 0x02, 0xa3, 0x06, 0x6b, 0x10, 0x84, 0xa3, - 0xa4, 0xc6, 0xc5, 0x07, 0x53, 0x56, 0x5c, 0x90, 0x9f, 0x57, 0x9c, 0x8a, 0x50, 0xc7, 0x84, 0xa4, - 0xce, 0x28, 0x00, 0x66, 0x5c, 0x70, 0x6a, 0x51, 0x59, 0x66, 0x72, 0xaa, 0x90, 0x3d, 0x97, 0x00, - 0x44, 0x20, 0x34, 0x2f, 0xb1, 0xa8, 0x12, 0x4c, 0x08, 0x09, 0xeb, 0x81, 0x9d, 0x81, 0x62, 0xaf, - 0x94, 0x08, 0xaa, 0x20, 0xc4, 0x16, 0x25, 0x86, 0x24, 0x36, 0xb0, 0x6b, 0x8d, 0x01, 0x01, 0x00, - 0x00, 0xff, 0xff, 0xb7, 0xeb, 0x46, 0xfb, 0xc0, 0x00, 0x00, 0x00, + 0xd6, 0x07, 0x11, 0x7a, 0x05, 0x45, 0xf9, 0x25, 0xf9, 0x42, 0x2c, 0x20, 0xb6, 0x14, 0x4f, 0x71, + 0x46, 0x62, 0x51, 0x6a, 0x0a, 0x44, 0x4c, 0x49, 0x95, 0x8b, 0x37, 0x38, 0x33, 0xb7, 0x20, 0x27, + 0x35, 0x28, 0xb5, 0xb0, 0x34, 0xb5, 0xb8, 0x44, 0x48, 0x84, 0x8b, 0xb5, 0x2c, 0x31, 0xa7, 0x34, + 0x55, 0x82, 0x51, 0x81, 0x51, 0x83, 0x35, 0x08, 0xc2, 0x51, 0x52, 0xe3, 0xe2, 0x83, 0x29, 0x2b, + 0x2e, 0xc8, 0xcf, 0x2b, 0x4e, 0x45, 0xa8, 0x63, 0x42, 0x52, 0x67, 0x14, 0x01, 0x33, 0x2e, 0x38, + 0xb5, 0xa8, 0x2c, 0x33, 0x39, 0x55, 0xc8, 0x9d, 0x4b, 0x00, 0x22, 0x10, 0x9a, 0x97, 0x58, 0x54, + 0x09, 0x26, 0x84, 0x84, 0xf5, 0xc0, 0x8e, 0x42, 0xb1, 0x57, 0x4a, 0x04, 0x55, 0x10, 0x62, 0x8b, + 0x12, 0xc7, 0xaf, 0xe9, 0x1a, 0x2c, 0x1c, 0x4c, 0x02, 0x8c, 0x49, 0x6c, 0x60, 0xf7, 0x1a, 0x03, + 0x02, 0x00, 0x00, 0xff, 0xff, 0xb5, 0x14, 0x6a, 0x14, 0xd6, 0x00, 0x00, 0x00, } // Reference imports to suppress errors if they are not otherwise used. @@ -160,6 +164,14 @@ type SimpleServiceServer interface { SimpleUnaryUnary(context.Context, *SimpleRequest) (*SimpleResponse, error) } +// UnimplementedSimpleServiceServer can be embedded to have forward compatible implementations. 
+type UnimplementedSimpleServiceServer struct { +} + +func (*UnimplementedSimpleServiceServer) SimpleUnaryUnary(ctx context.Context, req *SimpleRequest) (*SimpleResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method SimpleUnaryUnary not implemented") +} + func RegisterSimpleServiceServer(s *grpc.Server, srv SimpleServiceServer) { s.RegisterService(&_SimpleService_serviceDesc, srv) } diff --git a/internal/praefect/mock/mock.proto b/internal/praefect/mock/mock.proto index aa6ec842a113040c1cf93d0610c579d30865b2ad..59e79d3b91d56781672683ece874f39bd8ef2a26 100644 --- a/internal/praefect/mock/mock.proto +++ b/internal/praefect/mock/mock.proto @@ -7,6 +7,8 @@ syntax = "proto3"; package mock; +import "shared.proto"; + message SimpleRequest { int32 value = 1; } @@ -17,7 +19,10 @@ message SimpleResponse { service SimpleService { // SimpleUnaryUnary is a simple unary request with unary response rpc SimpleUnaryUnary(SimpleRequest) returns (SimpleResponse) { - option (gitaly.op_type).op = ACCESSOR; + option (gitaly.op_type) = { + op: ACCESSOR + scope_level: SERVER + }; } } diff --git a/internal/praefect/mocksvc_test.go b/internal/praefect/mocksvc_test.go index f6e01811b44212dff133ad4c6018c5bc308b1176..adcf7a65e4b589304aa5b2c47944f271fa5bb355 100644 --- a/internal/praefect/mocksvc_test.go +++ b/internal/praefect/mocksvc_test.go @@ -1,4 +1,4 @@ -package praefect_test +package praefect import ( "context" @@ -11,7 +11,7 @@ type simpleUnaryUnaryCallback func(context.Context, *mock.SimpleRequest) (*mock. // mockSvc is an implementation of mock.SimpleServer for testing purposes. The // gRPC stub can be updated via go generate: // -//go:generate protoc --go_out=plugins=grpc:. mock/mock.proto +//go:generate protoc --go_out=plugins=grpc:. -I../../proto -I./ mock/mock.proto //go:generate goimports -w mock/mock.pb.go type mockSvc struct { simpleUnaryUnary simpleUnaryUnaryCallback diff --git a/internal/praefect/models/node.go b/internal/praefect/models/node.go new file mode 100644 index 0000000000000000000000000000000000000000..db3224c4c995fd4ada9461942518120dc5bf8878 --- /dev/null +++ b/internal/praefect/models/node.go @@ -0,0 +1,17 @@ +package models + +// StorageNode describes an address that serves a storage +type StorageNode struct { + ID int + Storage string `toml:"storage"` + Address string `toml:"address"` + Token string `toml:"token"` +} + +// Repository describes a repository's relative path and its primary and list of secondaries +type Repository struct { + ID int + RelativePath string + Primary StorageNode + Replicas []StorageNode +} diff --git a/internal/praefect/models/nodes.go b/internal/praefect/models/nodes.go deleted file mode 100644 index 854254d8773843a2b3b4e60dc6d61da6354fb7e0..0000000000000000000000000000000000000000 --- a/internal/praefect/models/nodes.go +++ /dev/null @@ -1,8 +0,0 @@ -package models - -// GitalyServer allows configuring the servers that RPCs are proxied to -type GitalyServer struct { - Name string `toml:"name"` - ListenAddr string `toml:"listen_addr" split_words:"true"` - Token string `toml:"token"` -} diff --git a/internal/praefect/models/repository.go b/internal/praefect/models/repository.go deleted file mode 100644 index e11cdbf0a7916ff9c9a160e8f946458a9b273ed8..0000000000000000000000000000000000000000 --- a/internal/praefect/models/repository.go +++ /dev/null @@ -1,8 +0,0 @@ -package models - -// Repository provides all necessary information to address a repository hosted -// in a specific Gitaly replica -type Repository struct { - RelativePath 
string `toml:"relative_path"` // relative path of repository
- Storage string `toml:"storage"` // storage location, e.g. default
-}
diff --git a/internal/praefect/protoregistry/protoregistry.go b/internal/praefect/protoregistry/protoregistry.go
index 909b2e2b2efdb907c299c392f07acda781bbb8b8..9817904dc68a44d1b8c7c65d692dd86ae11ff46e 100644
--- a/internal/praefect/protoregistry/protoregistry.go
+++ b/internal/praefect/protoregistry/protoregistry.go
@@ -43,11 +43,24 @@ const (
 OpMutator
 )
+// Scope represents the scope for an RPC method
+type Scope int
+
+const (
+ // ScopeRepository = repository scope
+ ScopeRepository = iota
+ // ScopeStorage = storage scope
+ ScopeStorage
+ // ScopeServer = server scope
+ ScopeServer
+)
+
 // MethodInfo contains metadata about the RPC method. Refer to documentation
 // for message type "OperationMsg" shared.proto in gitlab-org/gitaly-proto for
 // more documentation.
 type MethodInfo struct {
 Operation OpType
+ Scope Scope
 targetRepo []int
 requestName string // protobuf message name for input type
 requestFactory protoFactory
@@ -55,13 +68,6 @@ type MethodInfo struct {
 // TargetRepo returns the target repository for a protobuf message if it exists
 func (mi MethodInfo) TargetRepo(msg proto.Message) (*gitalypb.Repository, error) {
- if mi.requestName != proto.MessageName(msg) {
- return nil, fmt.Errorf(
- "proto message %s does not match expected RPC request message %s",
- proto.MessageName(msg), mi.requestName,
- )
- }
-
 return reflectFindRepoTarget(msg, mi.targetRepo)
 }
@@ -179,6 +185,11 @@ func parseMethodInfo(methodDesc *descriptor.MethodDescriptorProto) (MethodInfo,
 return MethodInfo{}, err
 }
+ scope, err := parseScope(opMsg.GetScopeLevel())
+ if err != nil {
+ return MethodInfo{}, err
+ }
+
 // for some reason, the protobuf descriptor contains an extra dot in front
 // of the request name that the generated code does not. This trimming keeps
 // the two copies consistent for comparisons.
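// Illustrative sketch (assumed usage, not part of this diff): the hunk above adds a
// Scope field to MethodInfo so that callers such as the praefect coordinator can tell
// repository-scoped RPCs apart from storage- and server-scoped ones. The function and
// return values below are hypothetical names used only to show how the field might be
// consumed; only MethodInfo.Scope, MethodInfo.TargetRepo and the Scope constants come
// from this change.
package main

import (
	"fmt"

	"github.com/golang/protobuf/proto"
	"gitlab.com/gitlab-org/gitaly/internal/praefect/protoregistry"
)

func describeRouting(mi protoregistry.MethodInfo, msg proto.Message) (string, error) {
	switch mi.Scope {
	case protoregistry.ScopeRepository:
		// repository-scoped RPCs carry a target repository in the request,
		// so a shard can be chosen per repository
		repo, err := mi.TargetRepo(msg)
		if err != nil {
			return "", err
		}
		return fmt.Sprintf("route by repository %q", repo.GetRelativePath()), nil
	case protoregistry.ScopeStorage, protoregistry.ScopeServer:
		// storage- and server-scoped RPCs are not tied to a single repository
		return "route to a node for the whole storage/server", nil
	default:
		return "", fmt.Errorf("unknown scope %d", mi.Scope)
	}
}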
@@ -194,9 +205,21 @@ func parseMethodInfo(methodDesc *descriptor.MethodDescriptorProto) (MethodInfo, targetRepo: targetRepo, requestName: requestName, requestFactory: reqFactory, + Scope: scope, }, nil } +func parseScope(scope gitalypb.OperationMsg_Scope) (Scope, error) { + switch scope { + case gitalypb.OperationMsg_REPOSITORY: + return ScopeRepository, nil + case gitalypb.OperationMsg_SERVER: + return ScopeServer, nil + } + + return ScopeRepository, errors.New("scope not found") +} + // parses a string like "1.1" and returns a slice of ints func parseOID(rawFieldOID string) ([]int, error) { var fieldNos []int diff --git a/internal/praefect/protoregistry/targetrepo_test.go b/internal/praefect/protoregistry/targetrepo_test.go index 66b39e2ccb7247d99e5ba1d11f09a1f54381f010..9e2ee02bac948dab278cb38122e25722746fdcd5 100644 --- a/internal/praefect/protoregistry/targetrepo_test.go +++ b/internal/praefect/protoregistry/targetrepo_test.go @@ -56,7 +56,7 @@ func TestProtoRegistryTargetRepo(t *testing.T) { svc: "RepositoryService", method: "RepackIncremental", pbMsg: &gitalypb.RepackIncrementalResponse{}, - expectErr: errors.New("proto message gitaly.RepackIncrementalResponse does not match expected RPC request message gitaly.RepackIncrementalRequest"), + expectErr: errors.New("unable to descend OID [1] into message gitaly.RepackIncrementalResponse: unable to find protobuf field 1 in message RepackIncrementalResponse"), }, { desc: "target nested in oneOf", diff --git a/internal/praefect/replicator.go b/internal/praefect/replicator.go index d3244619d1ee45077e6b4a9ae7770ddae64d3f62..c854d8ac25995a27870b8a8c75c4b65e94b5287d 100644 --- a/internal/praefect/replicator.go +++ b/internal/praefect/replicator.go @@ -8,32 +8,33 @@ import ( "gitlab.com/gitlab-org/gitaly-proto/go/gitalypb" "gitlab.com/gitlab-org/gitaly/internal/helper" "gitlab.com/gitlab-org/gitaly/internal/praefect/models" + "google.golang.org/grpc" "github.com/sirupsen/logrus" ) // Replicator performs the actual replication logic between two nodes type Replicator interface { - Replicate(ctx context.Context, source models.Repository, target Node) error + Replicate(ctx context.Context, source models.Repository, sourceStorage, targetStorage string, target *grpc.ClientConn) error } type defaultReplicator struct { log *logrus.Logger } -func (dr defaultReplicator) Replicate(ctx context.Context, source models.Repository, target Node) error { +func (dr defaultReplicator) Replicate(ctx context.Context, source models.Repository, sourceStorage, targetStorage string, target *grpc.ClientConn) error { repository := &gitalypb.Repository{ - StorageName: target.Storage, + StorageName: targetStorage, RelativePath: source.RelativePath, } remoteRepository := &gitalypb.Repository{ - StorageName: source.Storage, + StorageName: sourceStorage, RelativePath: source.RelativePath, } - repositoryClient := gitalypb.NewRepositoryServiceClient(target.cc) - remoteClient := gitalypb.NewRemoteServiceClient(target.cc) + repositoryClient := gitalypb.NewRepositoryServiceClient(target) + remoteClient := gitalypb.NewRemoteServiceClient(target) // CreateRepository is idempotent if _, err := repositoryClient.CreateRepository(ctx, &gitalypb.CreateRepositoryRequest{ @@ -60,7 +61,8 @@ func (dr defaultReplicator) Replicate(ctx context.Context, source models.Reposit // ReplMgr is a replication manager for handling replication jobs type ReplMgr struct { log *logrus.Logger - dataStore Datastore + replicasDS ReplicasDatastore + replJobsDS ReplJobsDatastore coordinator *Coordinator 
targetNode string // which replica is this replicator responsible for? replicator Replicator // does the actual replication logic @@ -74,10 +76,11 @@ type ReplMgrOpt func(*ReplMgr) // NewReplMgr initializes a replication manager with the provided dependencies // and options -func NewReplMgr(targetNode string, log *logrus.Logger, ds Datastore, c *Coordinator, opts ...ReplMgrOpt) ReplMgr { +func NewReplMgr(targetNode string, log *logrus.Logger, replicasDS ReplicasDatastore, jobsDS ReplJobsDatastore, c *Coordinator, opts ...ReplMgrOpt) ReplMgr { r := ReplMgr{ log: log, - dataStore: ds, + replicasDS: replicasDS, + replJobsDS: jobsDS, whitelist: map[string]struct{}{}, replicator: defaultReplicator{log}, targetNode: targetNode, @@ -118,7 +121,7 @@ func (r ReplMgr) ScheduleReplication(ctx context.Context, repo models.Repository return nil } - id, err := r.dataStore.CreateSecondaryReplJobs(repo) + id, err := r.replJobsDS.CreateReplicaReplJobs(repo.RelativePath) if err != nil { return err } @@ -140,58 +143,63 @@ const ( // ProcessBacklog will process queued jobs. It will block while processing jobs. func (r ReplMgr) ProcessBacklog(ctx context.Context) error { for { - jobs, err := r.dataStore.GetJobs(JobStatePending|JobStateReady, r.targetNode, 10) + nodes, err := r.replicasDS.GetStorageNodes() if err != nil { - return err + return nil } - if len(jobs) == 0 { - r.log.Tracef("no jobs for %s, checking again in %s", r.targetNode, jobFetchInterval) - - select { - // TODO: exponential backoff when no queries are returned - case <-time.After(jobFetchInterval): - continue - - case <-ctx.Done(): - return ctx.Err() - } - } - - r.log.Debugf("fetched replication jobs: %#v", jobs) - - for _, job := range jobs { - r.log.WithField(logWithReplJobID, job.ID). - Infof("processing replication job %#v", job) - node, err := r.coordinator.GetStorageNode(job.Target) - r.log.WithField(logWithReplJobID, job.ID).Infof("got storage node? %+v %v", node, err) + for _, node := range nodes { + jobs, err := r.replJobsDS.GetJobs(JobStatePending|JobStateReady, node.ID, 10) if err != nil { return err } - if err := r.dataStore.UpdateReplJob(job.ID, JobStateInProgress); err != nil { - return err - } + if len(jobs) == 0 { + r.log.Tracef("no jobs for %s, checking again in %s", r.targetNode, jobFetchInterval) - primary, err := r.dataStore.GetShardPrimary(job.Source) - if err != nil { - return err - } - - ctx, err = helper.InjectGitalyServers(ctx, job.Source.Storage, primary.ListenAddr, primary.Token) - if err != nil { - return err - } + select { + // TODO: exponential backoff when no queries are returned + case <-time.After(jobFetchInterval): + continue - if err := r.replicator.Replicate(ctx, job.Source, node); err != nil { - r.log.WithField(logWithReplJobID, job.ID).WithError(err).Error("error when replicating") - return err + case <-ctx.Done(): + return ctx.Err() + } } - r.log.WithField(logWithReplJobID, job.ID). - Info("completed replication") - if err := r.dataStore.UpdateReplJob(job.ID, JobStateComplete); err != nil { - return err + r.log.WithField("node", node).Debugf("fetched replication jobs: %#v", jobs) + + for _, job := range jobs { + r.log.WithField(logWithReplJobID, job.ID). 
+ Infof("processing replication job %#v", job) + node, err := r.replicasDS.GetStorageNode(job.TargetNodeID) + if err != nil { + return err + } + r.log.WithField(logWithReplJobID, job.ID).WithField("storage", node).Info("got storage") + + if err := r.replJobsDS.UpdateReplJob(job.ID, JobStateInProgress); err != nil { + return err + } + + ctx, err = helper.InjectGitalyServers(ctx, "default", node.Address, "") + if err != nil { + return err + } + + cc, err := r.coordinator.GetConnection(node.Storage) + if err != nil { + return err + } + + if err := r.replicator.Replicate(ctx, job.Source, job.SourceStorage, node.Storage, cc); err != nil { + r.log.WithField(logWithReplJobID, job.ID).WithError(err).Error("error when replicating") + return err + } + + if err := r.replJobsDS.UpdateReplJob(job.ID, JobStateComplete); err != nil { + return err + } } } } diff --git a/internal/praefect/replicator_test.go b/internal/praefect/replicator_test.go index 1294bc9890f2c8906f819f8798e7a04b8c9a6cd3..0e8a75a47411d6482dbe0fc0a428b0e794c7eaa6 100644 --- a/internal/praefect/replicator_test.go +++ b/internal/praefect/replicator_test.go @@ -18,7 +18,6 @@ import ( gitalyauth "gitlab.com/gitlab-org/gitaly/auth" gitaly_config "gitlab.com/gitlab-org/gitaly/internal/config" "gitlab.com/gitlab-org/gitaly/internal/helper" - "gitlab.com/gitlab-org/gitaly/internal/praefect/config" "gitlab.com/gitlab-org/gitaly/internal/praefect/models" "gitlab.com/gitlab-org/gitaly/internal/rubyserver" serverPkg "gitlab.com/gitlab-org/gitaly/internal/server" @@ -28,43 +27,53 @@ import ( // TestReplicatorProcessJobs verifies that a replicator will schedule jobs for // all whitelisted repos func TestReplicatorProcessJobsWhitelist(t *testing.T) { - var ( - cfg = config.Config{ - PrimaryServer: &models.GitalyServer{ - Name: "default", - ListenAddr: "tcp://gitaly-primary.example.com", - }, - SecondaryServers: []*models.GitalyServer{ - { - Name: "backup1", - ListenAddr: "tcp://gitaly-backup1.example.com", - }, - { - Name: "backup2", - ListenAddr: "tcp://gitaly-backup2.example.com", - }, - }, - Whitelist: []string{"abcd1234", "edfg5678"}, - } - datastore = NewMemoryDatastore(cfg) - coordinator = NewCoordinator(logrus.New(), datastore) - resultsCh = make(chan result) - replman = NewReplMgr( - cfg.SecondaryServers[1].Name, - logrus.New(), - datastore, - coordinator, - WithWhitelist(cfg.Whitelist), - WithReplicator(&mockReplicator{resultsCh}), - ) + datastore := NewMemoryDatastore() + datastore.storageNodes.m[1] = models.StorageNode{ + ID: 1, + Address: "tcp://gitaly-primary.example.com", + Storage: "praefect-internal-1", + } + datastore.storageNodes.m[2] = models.StorageNode{ + ID: 2, + Address: "tcp://gitaly-backup1.example.com", + Storage: "praefect-internal-2", + } + datastore.storageNodes.m[3] = models.StorageNode{ + ID: 3, + Address: "tcp://gitaly-backup2.example.com", + Storage: "praefect-internal-3", + } + + datastore.repositories.m["abcd1234"] = models.Repository{ + RelativePath: "abcd1234", + Primary: datastore.storageNodes.m[1], + Replicas: []models.StorageNode{datastore.storageNodes.m[2], datastore.storageNodes.m[3]}, + } + datastore.repositories.m["edfg5678"] = models.Repository{ + RelativePath: "edfg5678", + Primary: datastore.storageNodes.m[1], + Replicas: []models.StorageNode{datastore.storageNodes.m[2], datastore.storageNodes.m[3]}, + } + + for _, repo := range []string{"abcd1234", "edfg5678"} { + jobIDs, err := datastore.CreateReplicaReplJobs(repo) + require.NoError(t, err) + require.Len(t, jobIDs, 2) + } + + coordinator := 
NewCoordinator(logrus.New(), datastore) + resultsCh := make(chan result) + replman := NewReplMgr( + "default", + logrus.New(), + datastore, + datastore, + coordinator, + WithReplicator(&mockReplicator{resultsCh}), ) - for _, node := range []*models.GitalyServer{ - cfg.PrimaryServer, - cfg.SecondaryServers[0], - cfg.SecondaryServers[1], - } { - err := coordinator.RegisterNode(node.Name, node.ListenAddr) + for _, node := range datastore.storageNodes.m { + err := coordinator.RegisterNode(node.Storage, node.Address) require.NoError(t, err) } @@ -78,14 +87,27 @@ func TestReplicatorProcessJobsWhitelist(t *testing.T) { success := make(chan struct{}) + var expectedResults []result + // we expect one job per whitelisted repo with each backend server + for _, shard := range datastore.repositories.m { + for _, secondary := range shard.Replicas { + cc, err := coordinator.GetConnection(secondary.Storage) + require.NoError(t, err) + expectedResults = append(expectedResults, + result{source: models.Repository{RelativePath: shard.RelativePath}, + targetStorage: secondary.Storage, + targetCC: cc, + }) + } + } + go func() { // we expect one job per whitelisted repo with each backend server - for i := 0; i < len(cfg.Whitelist); i++ { - result := <-resultsCh - - assert.Contains(t, cfg.Whitelist, result.source.RelativePath) - assert.Equal(t, cfg.SecondaryServers[1].Name, result.target.Storage) - assert.Equal(t, cfg.PrimaryServer.Name, result.source.Storage) + for _, shard := range datastore.repositories.m { + for range shard.Replicas { + result := <-resultsCh + assert.Contains(t, expectedResults, result) + } } cancel() @@ -106,18 +128,19 @@ func TestReplicatorProcessJobsWhitelist(t *testing.T) { } type result struct { - source models.Repository - target Node + source models.Repository + targetStorage string + targetCC *grpc.ClientConn } type mockReplicator struct { resultsCh chan<- result } -func (mr *mockReplicator) Replicate(ctx context.Context, source models.Repository, target Node) error { +func (mr *mockReplicator) Replicate(ctx context.Context, source models.Repository, sourceStorage, targetStorage string, target *grpc.ClientConn) error { select { - case mr.resultsCh <- result{source, target}: + case mr.resultsCh <- result{source, targetStorage, target}: return nil case <-ctx.Done(): @@ -178,11 +201,11 @@ func TestReplicate(t *testing.T) { var replicator defaultReplicator require.NoError(t, replicator.Replicate( ctx, - models.Repository{Storage: "default", RelativePath: testRepo.GetRelativePath()}, - Node{ - cc: conn, - Storage: backupStorageName, - })) + models.Repository{RelativePath: testRepo.GetRelativePath()}, + "default", + backupStorageName, + conn, + )) replicatedPath := filepath.Join(backupDir, filepath.Base(testRepoPath)) testhelper.MustRunCommand(t, nil, "git", "-C", replicatedPath, "cat-file", "-e", commitID) diff --git a/internal/praefect/server_test.go b/internal/praefect/server_test.go index 915d7281ad73a65f2664a722d7b41bec0e76c963..656196e65a51da7baf9a9d97b081d9ce68703ee9 100644 --- a/internal/praefect/server_test.go +++ b/internal/praefect/server_test.go @@ -1,4 +1,4 @@ -package praefect_test +package praefect import ( "context" @@ -7,14 +7,14 @@ import ( "testing" "time" + "github.com/golang/protobuf/proto" "github.com/sirupsen/logrus" "github.com/stretchr/testify/require" "gitlab.com/gitlab-org/gitaly/client" - "gitlab.com/gitlab-org/gitaly/internal/praefect" - "gitlab.com/gitlab-org/gitaly/internal/praefect/config" "gitlab.com/gitlab-org/gitaly/internal/praefect/grpc-proxy/proxy" 
"gitlab.com/gitlab-org/gitaly/internal/praefect/mock" "gitlab.com/gitlab-org/gitaly/internal/praefect/models" + "gitlab.com/gitlab-org/gitaly/internal/praefect/protoregistry" "google.golang.org/grpc" ) @@ -44,26 +44,47 @@ func TestServerSimpleUnaryUnary(t *testing.T) { }, } + gz := proto.FileDescriptor("mock/mock.proto") + fd, err := protoregistry.ExtractFileDescriptor(gz) + if err != nil { + panic(err) + } + for _, tt := range testCases { t.Run(tt.name, func(t *testing.T) { const ( storagePrimary = "default" - storageBackup = "backup" ) - datastore := praefect.NewMemoryDatastore(config.Config{ - PrimaryServer: &models.GitalyServer{ - Name: "default", - }, - }) - coordinator := praefect.NewCoordinator(logrus.New(), datastore) - replmgr := praefect.NewReplMgr( + datastore := NewMemoryDatastore() + datastore.storageNodes.m[1] = models.StorageNode{ + ID: 1, + Storage: "praefect-internal-1", + } + datastore.storageNodes.m[2] = models.StorageNode{ + ID: 2, + Storage: "praefect-internal-2", + } + + coordinator := NewCoordinator(logrus.New(), datastore, fd) + + for id, nodeStorage := range datastore.storageNodes.m { + backend, cleanup := newMockDownstream(t, tt.callback) + defer cleanup() // clean up mock downstream server resources + + coordinator.RegisterNode(nodeStorage.Storage, backend) + nodeStorage.Address = backend + datastore.storageNodes.m[id] = nodeStorage + } + + replmgr := NewReplMgr( storagePrimary, logrus.New(), datastore, + datastore, coordinator, ) - prf := praefect.NewServer( + prf := NewServer( coordinator, replmgr, nil, @@ -85,13 +106,6 @@ func TestServerSimpleUnaryUnary(t *testing.T) { defer cc.Close() cli := mock.NewSimpleServiceClient(cc) - for _, replica := range []string{storagePrimary, storageBackup} { - backend, cleanup := newMockDownstream(t, tt.callback) - defer cleanup() // clean up mock downstream server resources - - coordinator.RegisterNode(replica, backend) - } - ctx, cancel := context.WithTimeout(context.Background(), time.Second) defer cancel()