From 6819c6dcf9f2fd7b19e2e0e0bf7b166edc2a08e9 Mon Sep 17 00:00:00 2001
From: John Cai
Date: Thu, 27 Jun 2019 15:29:19 -0700
Subject: [PATCH 1/2] Naive failover handler

---
 cmd/praefect/main.go                | 10 +++++
 config.praefect.toml.example        |  4 ++
 internal/praefect/admin/failover.go | 69 +++++++++++++++++++++++++++++
 internal/praefect/admin/server.go   | 34 ++++++++++++++
 internal/praefect/config/config.go  |  6 +++
 internal/praefect/coordinator.go    | 40 ++++++++++++++---
 6 files changed, 157 insertions(+), 6 deletions(-)
 create mode 100644 internal/praefect/admin/failover.go
 create mode 100644 internal/praefect/admin/server.go

diff --git a/cmd/praefect/main.go b/cmd/praefect/main.go
index b4efeeae96d..f499e2dd209 100644
--- a/cmd/praefect/main.go
+++ b/cmd/praefect/main.go
@@ -16,6 +16,7 @@ import (
 	"github.com/prometheus/client_golang/prometheus/promhttp"
 	"github.com/sirupsen/logrus"
 	"gitlab.com/gitlab-org/gitaly/internal/praefect"
+	"gitlab.com/gitlab-org/gitaly/internal/praefect/admin"
 	"gitlab.com/gitlab-org/gitaly/internal/praefect/config"
 	"gitlab.com/gitlab-org/labkit/tracing"
 )
@@ -92,6 +93,15 @@ func run(listeners []net.Listener, conf config.Config) error {
 		serverErrors = make(chan error, 1)
 	)
 
+	// The admin section is optional; only start the admin server when it is configured.
+	if conf.Admin != nil {
+		adminSrv := admin.NewServer(conf.Admin.Port, datastore, coordinator, conf)
+		go func() {
+			// ListenAndServe always returns a non-nil error once the server stops.
+			logrus.WithError(adminSrv.ListenAndServe()).Error("admin server stopped")
+		}()
+	}
+
 	signal.Notify(termCh, signals...)
 
 	for _, l := range listeners {
diff --git a/config.praefect.toml.example b/config.praefect.toml.example
index 59e7563f1db..a9a8eee22fa 100644
--- a/config.praefect.toml.example
+++ b/config.praefect.toml.example
@@ -34,3 +34,7 @@ listen_addr = "127.0.0.1:2305"
 # [[secondary_server]]
 #   name = "backup"
 #   listen_addr = "tcp://gitaly-backup2.example.com"
+
+
+[praefect_admin]
+  port = 9093
diff --git a/internal/praefect/admin/failover.go b/internal/praefect/admin/failover.go
new file mode 100644
index 00000000000..131270f141b
--- /dev/null
+++ b/internal/praefect/admin/failover.go
@@ -0,0 +1,69 @@
+package admin
+
+import (
+	"encoding/json"
+	"net/http"
+)
+
+// FailoverRequest is a request for FailoverHandler
+type FailoverRequest struct {
+	NewPrimary string `json:"new_primary"`
+}
+
+// FailoverHandler takes a FailoverRequest and removes the current primary, and promotes a new primary
+func (s *Server) FailoverHandler(w http.ResponseWriter, r *http.Request) {
+	var req FailoverRequest
+
+	if r.Method != http.MethodPost {
+		w.WriteHeader(http.StatusMethodNotAllowed)
+		return
+	}
+
+	if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
+		w.WriteHeader(http.StatusBadRequest)
+		return
+	}
+
+	existingPrimary, err := s.d.GetPrimary()
+	if err != nil {
+		w.WriteHeader(http.StatusInternalServerError)
+		return
+	}
+
+	if req.NewPrimary == existingPrimary {
+		w.WriteHeader(http.StatusBadRequest)
+		w.Write([]byte("new_primary is already the primary"))
+		return
+	}
+
+	// Only a configured secondary may be promoted.
+	var found bool
+	for _, secondary := range s.cfg.SecondaryServers {
+		if secondary.Name == req.NewPrimary {
+			found = true
+			break
+		}
+	}
+
+	if !found {
+		w.WriteHeader(http.StatusBadRequest)
+		w.Write([]byte("new_primary is not a configured secondary"))
+		return
+	}
+
+	// Hold the coordinator lock so no request is routed while the primary changes.
+	s.coordinator.Lock()
+	defer s.coordinator.Unlock()
+
+	if err := s.d.SetPrimary(req.NewPrimary); err != nil {
+		w.WriteHeader(http.StatusInternalServerError)
+		return
+	}
+
+	if err := s.coordinator.UnregisterNode(existingPrimary); err != nil {
+		w.WriteHeader(http.StatusInternalServerError)
+		return
+	}
+
+	w.WriteHeader(http.StatusOK)
+}
diff --git a/internal/praefect/admin/server.go b/internal/praefect/admin/server.go
new file mode 100644
index 00000000000..30e6a6bbd9a
--- /dev/null
+++ b/internal/praefect/admin/server.go
@@ -0,0 +1,34 @@
+package admin
+
+import (
+	"fmt"
+	"net/http"
+
+	"gitlab.com/gitlab-org/gitaly/internal/praefect"
+	"gitlab.com/gitlab-org/gitaly/internal/praefect/config"
+)
+
+type Server struct {
+	d praefect.Datastore
+	*http.Server
+	cfg         config.Config
+	coordinator *praefect.Coordinator
+}
+
+// NewServer creates a new admin server
+func NewServer(port int, d praefect.Datastore, coordinator *praefect.Coordinator, c config.Config) *Server {
+	s := &Server{
+		d: d,
+		Server: &http.Server{
+			Addr: fmt.Sprintf("0.0.0.0:%d", port),
+		},
+		cfg:         c,
+		coordinator: coordinator,
+	}
+
+	router := http.NewServeMux()
+	router.Handle("/failover", http.HandlerFunc(s.FailoverHandler))
+	s.Handler = router
+
+	return s
+}
diff --git a/internal/praefect/config/config.go b/internal/praefect/config/config.go
index 768104ed163..c009e0b1839 100644
--- a/internal/praefect/config/config.go
+++ b/internal/praefect/config/config.go
@@ -22,6 +22,7 @@ type Config struct {
 
 	Logging              config.Logging `toml:"logging"`
 	PrometheusListenAddr string         `toml:"prometheus_listen_addr"`
+	Admin                *Admin         `toml:"praefect_admin"`
 }
 
 // GitalyServer allows configuring the servers that RPCs are proxied to
@@ -30,6 +31,11 @@ type GitalyServer struct {
 	ListenAddr string `toml:"listen_addr" split_words:"true"`
 }
 
+// Admin configures the admin service
+type Admin struct {
+	Port int `toml:"port"`
+}
+
 // FromFile loads the config for the passed file path
 func FromFile(filePath string) (Config, error) {
 	config := &Config{}
diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go
index b2e6704d591..8f623a46584 100644
--- a/internal/praefect/coordinator.go
+++ b/internal/praefect/coordinator.go
@@ -22,8 +22,9 @@ import (
 // downstream server. The coordinator is thread safe; concurrent calls to
 // register nodes are safe.
 type Coordinator struct {
-	log  *logrus.Logger
-	lock sync.RWMutex
+	sync.RWMutex
+	log      *logrus.Logger
+	connLock sync.RWMutex
 
 	datastore PrimaryDatastore
 
@@ -66,6 +67,11 @@ func (c *Coordinator) GetStorageNode(storage string) (Node, error) {
 func (c *Coordinator) streamDirector(ctx context.Context, fullMethodName string, _ proxy.StreamPeeker) (context.Context, *grpc.ClientConn, error) {
 	// For phase 1, we need to route messages based on the storage location
 	// to the appropriate Gitaly node.
+	// Take the read side of the coordinator lock: requests may be routed
+	// concurrently, but never while a failover holds the write lock.
+	c.RLock()
+	defer c.RUnlock()
+
 	c.log.Debugf("Stream director received method %s", fullMethodName)
 
 	storageName, err := c.datastore.GetPrimary()
@@ -105,16 +111,38 @@ func (c *Coordinator) RegisterNode(storageName, listenAddr string) error {
 	return nil
 }
 
+// UnregisterNode removes a node and closes its connection
+func (c *Coordinator) UnregisterNode(storageName string) error {
+	conn, ok := c.delConn(storageName)
+	if !ok {
+		return nil
+	}
+
+	return conn.Close()
+}
+
 func (c *Coordinator) setConn(storageName string, conn *grpc.ClientConn) {
-	c.lock.Lock()
+	c.connLock.Lock()
 	c.nodes[storageName] = conn
-	c.lock.Unlock()
+	c.connLock.Unlock()
 }
 
 func (c *Coordinator) getConn(storageName string) (*grpc.ClientConn, bool) {
-	c.lock.RLock()
+	c.connLock.RLock()
 	cc, ok := c.nodes[storageName]
-	c.lock.RUnlock()
+	c.connLock.RUnlock()
 
 	return cc, ok
 }
+
+// delConn removes the connection for storageName and hands it back so the
+// caller can close it.
+func (c *Coordinator) delConn(storageName string) (*grpc.ClientConn, bool) {
+	c.connLock.Lock()
+	defer c.connLock.Unlock()
+
+	cc, ok := c.nodes[storageName]
+	delete(c.nodes, storageName)
+
+	return cc, ok
+}
-- 
GitLab

From 6e1925500e9592545179749d526522ecbe180926 Mon Sep 17 00:00:00 2001
From: John Cai
Date: Thu, 27 Jun 2019 18:01:32 -0700
Subject: [PATCH 2/2] Add comment

---
 internal/praefect/admin/server.go | 1 +
 1 file changed, 1 insertion(+)

diff --git a/internal/praefect/admin/server.go b/internal/praefect/admin/server.go
index 30e6a6bbd9a..cbc6be31205 100644
--- a/internal/praefect/admin/server.go
+++ b/internal/praefect/admin/server.go
@@ -8,6 +8,7 @@ import (
 	"gitlab.com/gitlab-org/gitaly/internal/praefect/config"
 )
 
+// Server is the struct for the admin server
 type Server struct {
 	d praefect.Datastore
 	*http.Server
-- 
GitLab