diff --git a/changelogs/unreleased/more-graceful.yml b/changelogs/unreleased/more-graceful.yml
new file mode 100644
index 0000000000000000000000000000000000000000..0b1d1e91754d55b597c225a328f428ee1475de77
--- /dev/null
+++ b/changelogs/unreleased/more-graceful.yml
@@ -0,0 +1,5 @@
+---
+title: Wait for all the sockets to terminate during a graceful restart
+merge_request: 1190
+author:
+type: fixed
diff --git a/cmd/gitaly/bootstrap.go b/cmd/gitaly/bootstrap.go
deleted file mode 100644
index d3b464052822b82f1f59e9dd4a4f99331a38ca2c..0000000000000000000000000000000000000000
--- a/cmd/gitaly/bootstrap.go
+++ /dev/null
@@ -1,205 +0,0 @@
-package main
-
-import (
-	"fmt"
-	"net"
-	"os"
-	"os/signal"
-	"syscall"
-	"time"
-
-	"github.com/cloudflare/tableflip"
-	log "github.com/sirupsen/logrus"
-	"gitlab.com/gitlab-org/gitaly/internal/config"
-	"gitlab.com/gitlab-org/gitaly/internal/connectioncounter"
-	"gitlab.com/gitlab-org/gitaly/internal/rubyserver"
-	"gitlab.com/gitlab-org/gitaly/internal/server"
-	"google.golang.org/grpc"
-)
-
-type bootstrap struct {
-	*tableflip.Upgrader
-
-	insecureListeners []net.Listener
-	secureListeners   []net.Listener
-
-	serversErrors chan error
-}
-
-// newBootstrap performs tableflip initialization
-//
-// first boot:
-// * gitaly starts as usual, we will refer to it as p1
-// * newBootstrap will build a tableflip.Upgrader, we will refer to it as upg
-// * sockets and files must be opened with upg.Fds
-// * p1 will trap SIGHUP and invoke upg.Upgrade()
-// * when ready to accept incoming connections p1 will call upg.Ready()
-// * upg.Exit() channel will be closed when an upgrades completed successfully and the process must terminate
-//
-// graceful upgrade:
-// * user replaces gitaly binary and/or config file
-// * user sends SIGHUP to p1
-// * p1 will fork and exec the new gitaly, we will refer to it as p2
-// * from now on p1 will ignore other SIGHUP
-// * if p2 terminates with a non-zero exit code, SIGHUP handling will be restored
-// * p2 will follow the "first boot" sequence but upg.Fds will provide sockets and files from p1, when available
-// * when p2 invokes upg.Ready() all the shared file descriptors not claimed by p2 will be closed
-// * upg.Exit() channel in p1 will be closed now and p1 can gracefully terminate already accepted connections
-// * upgrades cannot starts again if p1 and p2 are both running, an hard termination should be scheduled to overcome
-//   freezes during a graceful shutdown
-func newBootstrap(pidFile string, upgradesEnabled bool) (*bootstrap, error) {
-	// PIDFile is optional, if provided tableflip will keep it updated
-	upg, err := tableflip.New(tableflip.Options{PIDFile: pidFile})
-	if err != nil {
-		return nil, err
-	}
-
-	if upgradesEnabled {
-		go func() {
-			sig := make(chan os.Signal, 1)
-			signal.Notify(sig, syscall.SIGHUP)
-
-			for range sig {
-				err := upg.Upgrade()
-				if err != nil {
-					log.WithError(err).Error("Upgrade failed")
-					continue
-				}
-
-				log.Info("Upgrade succeeded")
-			}
-		}()
-	}
-
-	return &bootstrap{Upgrader: upg}, nil
-}
-
-func (b *bootstrap) listen() error {
-	if socketPath := config.Config.SocketPath; socketPath != "" {
-		l, err := b.createUnixListener(socketPath)
-		if err != nil {
-			return err
-		}
-
-		log.WithField("address", socketPath).Info("listening on unix socket")
-		b.insecureListeners = append(b.insecureListeners, l)
-	}
-
-	if addr := config.Config.ListenAddr; addr != "" {
-		l, err := b.Fds.Listen("tcp", addr)
-		if err != nil {
-			return err
-		}
-
-		log.WithField("address", addr).Info("listening at tcp address")
-		b.insecureListeners = append(b.insecureListeners, connectioncounter.New("tcp", l))
-	}
-
-	if addr := config.Config.TLSListenAddr; addr != "" {
-		tlsListener, err := b.Fds.Listen("tcp", addr)
-		if err != nil {
-			return err
-		}
-
-		b.secureListeners = append(b.secureListeners, connectioncounter.New("tls", tlsListener))
-	}
-
-	b.serversErrors = make(chan error, len(b.insecureListeners)+len(b.secureListeners))
-
-	return nil
-}
-
-func (b *bootstrap) prometheusListener() (net.Listener, error) {
-	log.WithField("address", config.Config.PrometheusListenAddr).Info("starting prometheus listener")
-
-	return b.Fds.Listen("tcp", config.Config.PrometheusListenAddr)
-}
-
-func (b *bootstrap) run() {
-	signals := []os.Signal{syscall.SIGTERM, syscall.SIGINT}
-	done := make(chan os.Signal, len(signals))
-	signal.Notify(done, signals...)
-
-	ruby, err := rubyserver.Start()
-	if err != nil {
-		log.WithError(err).Error("start ruby server")
-		return
-	}
-	defer ruby.Stop()
-
-	if len(b.insecureListeners) > 0 {
-		insecureServer := server.NewInsecure(ruby)
-		defer insecureServer.Stop()
-
-		serve(insecureServer, b.insecureListeners, b.Exit(), b.serversErrors)
-	}
-
-	if len(b.secureListeners) > 0 {
-		secureServer := server.NewSecure(ruby)
-		defer secureServer.Stop()
-
-		serve(secureServer, b.secureListeners, b.Exit(), b.serversErrors)
-	}
-
-	if err := b.Ready(); err != nil {
-		log.WithError(err).Error("incomplete bootstrap")
-		return
-	}
-
-	select {
-	case <-b.Exit():
-		// this is the old process and a graceful upgrade is in progress
-		// the new process signaled its readiness and we started a graceful stop
-		// however no further upgrades can be started until this process is running
-		// we set a grace period and then we force a termination.
-		b.waitGracePeriod(done)
-
-		err = fmt.Errorf("graceful upgrade")
-	case s := <-done:
-		err = fmt.Errorf("received signal %q", s)
-	case err = <-b.serversErrors:
-	}
-
-	log.WithError(err).Error("terminating")
-}
-
-func (b *bootstrap) waitGracePeriod(kill <-chan os.Signal) {
-	log.WithField("graceful_restart_timeout", config.Config.GracefulRestartTimeout).Warn("starting grace period")
-
-	select {
-	case <-time.After(config.Config.GracefulRestartTimeout):
-		log.Error("old process stuck on termination. Grace period expired.")
-	case <-kill:
-		log.Error("force shutdown")
-	case <-b.serversErrors:
-		log.Info("graceful stop completed")
-	}
-}
-
-func (b *bootstrap) createUnixListener(socketPath string) (net.Listener, error) {
-	if !b.HasParent() {
-		// During an update the unix socket exists and if we delete it tableflip will not create a new one
-		if err := os.Remove(socketPath); err != nil && !os.IsNotExist(err) {
-			return nil, err
-		}
-	}
-
-	l, err := b.Fds.Listen("unix", socketPath)
-	return connectioncounter.New("unix", l), err
-}
-
-func serve(server *grpc.Server, listeners []net.Listener, done <-chan struct{}, errors chan<- error) {
-	go func() {
-		<-done
-
-		server.GracefulStop()
-	}()
-
-	for _, listener := range listeners {
-		// Must pass the listener as a function argument because there is a race
-		// between 'go' and 'for'.
-		go func(l net.Listener) {
-			errors <- server.Serve(l)
-		}(listener)
-	}
-}
diff --git a/cmd/gitaly/main.go b/cmd/gitaly/main.go
index 10149365f26945a2bd27e2c0547c723a924185c4..d4b8b960e19206f76bd29550dce64f5c09bb79fc 100644
--- a/cmd/gitaly/main.go
+++ b/cmd/gitaly/main.go
@@ -3,19 +3,24 @@ package main
 import (
 	"flag"
 	"fmt"
+	"net"
 	"net/http"
 	"os"
 
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/client_golang/prometheus/promhttp"
 	log "github.com/sirupsen/logrus"
+	"gitlab.com/gitlab-org/gitaly/internal/bootstrap"
 	"gitlab.com/gitlab-org/gitaly/internal/config"
+	"gitlab.com/gitlab-org/gitaly/internal/connectioncounter"
 	"gitlab.com/gitlab-org/gitaly/internal/git"
 	"gitlab.com/gitlab-org/gitaly/internal/linguist"
+	"gitlab.com/gitlab-org/gitaly/internal/rubyserver"
 	"gitlab.com/gitlab-org/gitaly/internal/server"
 	"gitlab.com/gitlab-org/gitaly/internal/tempdir"
 	"gitlab.com/gitlab-org/gitaly/internal/version"
 	"gitlab.com/gitlab-org/labkit/tracing"
+	"google.golang.org/grpc"
 )
 
 var (
@@ -78,11 +83,10 @@ func main() {
 
 	// gitaly-wrapper is supposed to set config.EnvUpgradesEnabled in order to enable graceful upgrades
 	_, isWrapped := os.LookupEnv(config.EnvUpgradesEnabled)
-	b, err := newBootstrap(os.Getenv(config.EnvPidFile), isWrapped)
+	b, err := bootstrap.New(os.Getenv(config.EnvPidFile), isWrapped)
 	if err != nil {
 		log.WithError(err).Fatal("init bootstrap")
 	}
-	defer b.Stop()
 
 	// If invoked with -version
 	if *flagVersion {
@@ -111,30 +115,113 @@ func main() {
 
 	tempdir.StartCleaning()
 
-	if err = b.listen(); err != nil {
-		log.WithError(err).Fatal("bootstrap failed")
+	ruby, err := rubyserver.Start()
+	if err != nil {
+		log.WithError(err).Fatal("start ruby server")
 	}
+	defer ruby.Stop()
 
-	if config.Config.PrometheusListenAddr != "" {
-		l, err := b.prometheusListener()
-		if err != nil {
-			log.WithError(err).Fatal("configure prometheus listener")
-		}
+	insecureServer := server.NewInsecure(ruby)
+	secureServer := server.NewSecure(ruby)
+
+	for _, s := range []*grpc.Server{insecureServer, secureServer} {
+		go func(s *grpc.Server) {
+			<-b.GracefulStop
+			s.GracefulStop()
+		}(s)
+	}
+
+	if socketPath := config.Config.SocketPath; socketPath != "" {
+		b.RegisterStarter(func(listen bootstrap.ListenFunc, errCh chan<- error) error {
+			l, err := createUnixListener(listen, socketPath, b.IsFirstBoot())
+			if err != nil {
+				return err
+			}
 
-		promMux := http.NewServeMux()
-		promMux.Handle("/metrics", promhttp.Handler())
+			log.WithField("address", socketPath).Info("listening on unix socket")
+			go func() {
+				errCh <- insecureServer.Serve(connectioncounter.New("unix", l))
+			}()
+
+			return nil
+		})
+	}
+
+	type tcpConfig struct {
+		name, addr string
+		s          *grpc.Server
+	}
+
+	for _, cfg := range []tcpConfig{
+		{name: "tcp", addr: config.Config.ListenAddr, s: insecureServer},
+		{name: "tls", addr: config.Config.TLSListenAddr, s: secureServer},
+	} {
+		if cfg.addr == "" {
+			continue
+		}
 
-		server.AddPprofHandlers(promMux)
+		// be careful with closure over cfg inside for loop
+		func(cfg tcpConfig) {
+			b.RegisterStarter(func(listen bootstrap.ListenFunc, errCh chan<- error) error {
+				l, err := listen("tcp", cfg.addr)
+				if err != nil {
+					return err
+				}
+
+				log.WithField("address", cfg.addr).Infof("listening at %s address", cfg.name)
+				go func() {
+					errCh <- cfg.s.Serve(connectioncounter.New(cfg.name, l))
+				}()
+
+				return nil
+			})
+		}(cfg)
+	}
 
-		go func() {
-			err = http.Serve(l, promMux)
+	if addr := config.Config.PrometheusListenAddr; addr != "" {
+		b.RegisterStarter(func(listen bootstrap.ListenFunc, errCh chan<- error) error {
+			l, err := listen("tcp", addr)
 			if err != nil {
-				log.WithError(err).Fatal("Unable to serve prometheus")
+				return err
 			}
-		}()
+
+			log.WithField("address", addr).Info("starting prometheus listener")
+
+			promMux := http.NewServeMux()
+			promMux.Handle("/metrics", promhttp.Handler())
+
+			server.AddPprofHandlers(promMux)
+
+			go func() {
+				if err := http.Serve(l, promMux); err != nil {
+					log.WithError(err).Error("Unable to serve prometheus")
+				}
+			}()
+
+			// Prometheus listener should not block graceful shutdown
+			go func() {
+				<-b.GracefulStop
+				errCh <- nil
+			}()
+
+			return nil
+		})
 	}
 
-	b.run()
+	if err := b.Start(); err != nil {
+		log.WithError(err).Fatal("unable to start listeners")
+	}
+
+	log.WithError(b.Wait()).Error("shutting down")
+}
+
+func createUnixListener(listen bootstrap.ListenFunc, socketPath string, removeOld bool) (net.Listener, error) {
+	// During an update the unix socket exists and if we delete it tableflip will not create a new one
+	if removeOld {
+		if err := os.Remove(socketPath); err != nil && !os.IsNotExist(err) {
+			return nil, err
+		}
+	}
 
-	log.Fatal("shutting down")
+	return listen("unix", socketPath)
 }
diff --git a/internal/bootstrap/bootstrap.go b/internal/bootstrap/bootstrap.go
new file mode 100644
index 0000000000000000000000000000000000000000..3612a369494cac269f4c30cbffc8f69e24b42070
--- /dev/null
+++ b/internal/bootstrap/bootstrap.go
@@ -0,0 +1,185 @@
+package bootstrap
+
+import (
+	"fmt"
+	"net"
+	"os"
+	"os/signal"
+	"sync"
+	"syscall"
+	"time"
+
+	"github.com/cloudflare/tableflip"
+	log "github.com/sirupsen/logrus"
+	"gitlab.com/gitlab-org/gitaly/internal/config"
+)
+
+// Bootstrap drives the lifecycle of a gitaly process across graceful
+// upgrades: it owns the tableflip.Upgrader, runs the registered Starters and
+// waits for the servers they launch to terminate.
+type Bootstrap struct {
+	// GracefulStop is closed once tableflip's Exit channel fires, i.e. when
+	// this process must hand over to its successor; servers should then stop.
+	GracefulStop chan struct{}
+
+	upgrader *tableflip.Upgrader
+	wg       sync.WaitGroup
+	errChan  chan error
+	starters []Starter
+}
+
+// New performs tableflip initialization
+//
+// first boot:
+// * gitaly starts as usual, we will refer to it as p1
+// * New will build a tableflip.Upgrader, we will refer to it as upg
+// * sockets and files must be opened with upg.Fds
+// * p1 will trap SIGHUP and invoke upg.Upgrade()
+// * when ready to accept incoming connections p1 will call upg.Ready()
+// * upg.Exit() channel will be closed when an upgrade completed successfully and the process must terminate
+//
+// graceful upgrade:
+// * user replaces gitaly binary and/or config file
+// * user sends SIGHUP to p1
+// * p1 will fork and exec the new gitaly, we will refer to it as p2
+// * from now on p1 will ignore other SIGHUP
+// * if p2 terminates with a non-zero exit code, SIGHUP handling will be restored
+// * p2 will follow the "first boot" sequence but upg.Fds will provide sockets and files from p1, when available
+// * when p2 invokes upg.Ready() all the shared file descriptors not claimed by p2 will be closed
+// * upg.Exit() channel in p1 will be closed now and p1 can gracefully terminate already accepted connections
+// * upgrades cannot start again if p1 and p2 are both running, a hard termination should be scheduled to overcome
+//   freezes during a graceful shutdown
+func New(pidFile string, upgradesEnabled bool) (*Bootstrap, error) {
+	// PIDFile is optional, if provided tableflip will keep it updated
+	upg, err := tableflip.New(tableflip.Options{PIDFile: pidFile})
+	if err != nil {
+		return nil, err
+	}
+
+	if upgradesEnabled {
+		go func() {
+			sig := make(chan os.Signal, 1)
+			signal.Notify(sig, syscall.SIGHUP)
+
+			for range sig {
+				err := upg.Upgrade()
+				if err != nil {
+					log.WithError(err).Error("Upgrade failed")
+					continue
+				}
+
+				log.Info("Upgrade succeeded")
+			}
+		}()
+	}
+
+	gracefulStopCh := make(chan struct{})
+	go func() {
+		<-upg.Exit()
+		close(gracefulStopCh)
+	}()
+
+	return &Bootstrap{
+		upgrader:     upg,
+		GracefulStop: gracefulStopCh,
+	}, nil
+}
+
+// ListenFunc opens a listener on the given network and address. Start passes
+// tableflip's Fds.Listen, which reuses a socket inherited from the parent
+// process when one is available.
+type ListenFunc func(network, addr string) (net.Listener, error)
+
+// Starter opens its listeners through the given ListenFunc and serves them in
+// the background. It must send exactly one value on the error channel once its
+// server stops; returning a non-nil error aborts Bootstrap.Start.
+type Starter func(ListenFunc, chan<- error) error
+
+// IsFirstBoot reports whether this process has no tableflip parent, i.e. it
+// was not started by a graceful upgrade.
+func (b *Bootstrap) IsFirstBoot() bool { return !b.upgrader.HasParent() }
+
+// RegisterStarter adds a Starter to be invoked by Start. Starters registered
+// after Start has been called are never run.
+func (b *Bootstrap) RegisterStarter(starter Starter) {
+	b.starters = append(b.starters, starter)
+}
+
+// Start invokes every registered Starter in registration order and tracks the
+// servers they launch. It stops at, and returns, the first Starter error.
+func (b *Bootstrap) Start() error {
+	b.errChan = make(chan error, len(b.starters))
+
+	for _, start := range b.starters {
+		errCh := make(chan error)
+
+		if err := start(b.upgrader.Fds.Listen, errCh); err != nil {
+			return err
+		}
+
+		b.wg.Add(1)
+		// Relay the server termination to b.errChan; it is buffered with one
+		// slot per Starter, so this goroutine never blocks on the send.
+		go func(errCh chan error) {
+			err := <-errCh
+			b.wg.Done()
+			b.errChan <- err
+		}(errCh)
+	}
+
+	return nil
+}
+
+// Wait marks the process as ready for tableflip and blocks until it has to
+// terminate: on SIGTERM or SIGINT, when a Starter reports that its server
+// stopped, or when a graceful upgrade completes (after the grace period).
+// The returned error describes why the process is terminating.
+func (b *Bootstrap) Wait() error {
+	signals := []os.Signal{syscall.SIGTERM, syscall.SIGINT}
+	immediateShutdown := make(chan os.Signal, len(signals))
+	signal.Notify(immediateShutdown, signals...)
+	defer signal.Stop(immediateShutdown)
+
+	if err := b.upgrader.Ready(); err != nil {
+		return err
+	}
+
+	var err error
+	select {
+	case <-b.upgrader.Exit():
+		// this is the old process and a graceful upgrade is in progress
+		// the new process signaled its readiness and we started a graceful stop
+		// however no further upgrades can be started until this process is running
+		// we set a grace period and then we force a termination.
+		b.waitGracePeriod(immediateShutdown)
+
+		err = fmt.Errorf("graceful upgrade")
+	case s := <-immediateShutdown:
+		err = fmt.Errorf("received signal %q", s)
+	case err = <-b.errChan:
+	}
+
+	return err
+}
+
+// waitGracePeriod blocks until every Starter has reported that its server
+// stopped, config.Config.GracefulRestartTimeout expires, or a signal arrives
+// on kill, whichever happens first.
+func (b *Bootstrap) waitGracePeriod(kill <-chan os.Signal) {
+	log.WithField("graceful_restart_timeout", config.Config.GracefulRestartTimeout).Warn("starting grace period")
+
+	allServersDone := make(chan struct{})
+	go func() {
+		b.wg.Wait()
+		close(allServersDone)
+	}()
+
+	select {
+	case <-time.After(config.Config.GracefulRestartTimeout):
+		log.Error("old process stuck on termination. Grace period expired.")
+	case <-kill:
+		log.Error("force shutdown")
+	case <-allServersDone:
+		log.Info("graceful stop completed")
+	}
+}
diff --git a/cmd/gitaly/bootstrap_test.go b/internal/bootstrap/bootstrap_test.go
similarity index 95%
rename from cmd/gitaly/bootstrap_test.go
rename to internal/bootstrap/bootstrap_test.go
index f6fdf6f902e570748c040b2c4bf659114338a3d3..199fa12dbac9f2252deb402e579b6754f6a1b7a6 100644
--- a/cmd/gitaly/bootstrap_test.go
+++ b/internal/bootstrap/bootstrap_test.go
@@ -1,4 +1,4 @@
-package main
+package bootstrap
 
 import (
 	"context"
@@ -17,7 +17,7 @@ import (
 )
 
 // b is global because tableflip do not allow to init more than one Upgrader per process
-var b *bootstrap
+var b *Bootstrap
 var socketPath = path.Join(os.TempDir(), "test-unix-socket")
 
 // TestMain helps testing bootstrap.
@@ -25,13 +25,13 @@ var socketPath = path.Join(os.TempDir(), "test-unix-socket")
 // avoid the test suite and start a pid HTTP server on socketPath
 func TestMain(m *testing.M) {
 	var err error
-	b, err = newBootstrap("", true)
+	b, err = New("", true)
 	if err != nil {
 		panic(err)
 	}
 
-	if !b.HasParent() {
+	if b.IsFirstBoot() {
 		// Execute test suite if there is no parent.
 		os.Exit(m.Run())
 	}