From cb6346dd6b44cba09e960883fef8d69c8c83d084 Mon Sep 17 00:00:00 2001 From: Alessio Caiazza Date: Mon, 8 Apr 2019 10:34:35 +0200 Subject: [PATCH 01/12] Add a WaitGroup to wait for a graceful restart We terminate gitaly on the first failure from a listening socket, but on a graceful restart we must wait for all the sockets to properly terminate active connections. --- changelogs/unreleased/more-graceful.yml | 5 +++++ cmd/gitaly/bootstrap.go | 28 ++++++++++++++++++------- 2 files changed, 26 insertions(+), 7 deletions(-) create mode 100644 changelogs/unreleased/more-graceful.yml diff --git a/changelogs/unreleased/more-graceful.yml b/changelogs/unreleased/more-graceful.yml new file mode 100644 index 00000000000..b950aba6aaa --- /dev/null +++ b/changelogs/unreleased/more-graceful.yml @@ -0,0 +1,5 @@ +--- +title: Wait for all the socket to terminate during a graceful restart +merge_request: +author: +type: fixed diff --git a/cmd/gitaly/bootstrap.go b/cmd/gitaly/bootstrap.go index d3b46405282..4ea0ec94559 100644 --- a/cmd/gitaly/bootstrap.go +++ b/cmd/gitaly/bootstrap.go @@ -1,10 +1,12 @@ package main import ( + "context" "fmt" "net" "os" "os/signal" + "sync" "syscall" "time" @@ -24,6 +26,7 @@ type bootstrap struct { secureListeners []net.Listener serversErrors chan error + wg sync.WaitGroup } // newBootstrap performs tableflip initialization @@ -104,7 +107,9 @@ func (b *bootstrap) listen() error { b.secureListeners = append(b.secureListeners, connectioncounter.New("tls", tlsListener)) } - b.serversErrors = make(chan error, len(b.insecureListeners)+len(b.secureListeners)) + cnt := len(b.insecureListeners) + len(b.secureListeners) + b.serversErrors = make(chan error, cnt) + b.wg.Add(cnt) return nil } @@ -131,14 +136,14 @@ func (b *bootstrap) run() { insecureServer := server.NewInsecure(ruby) defer insecureServer.Stop() - serve(insecureServer, b.insecureListeners, b.Exit(), b.serversErrors) + b.serve(insecureServer, b.insecureListeners) } if len(b.secureListeners) > 0 {
secureServer := server.NewSecure(ruby) defer secureServer.Stop() - serve(secureServer, b.secureListeners, b.Exit(), b.serversErrors) + b.serve(secureServer, b.secureListeners) } if err := b.Ready(); err != nil { @@ -166,12 +171,20 @@ func (b *bootstrap) run() { func (b *bootstrap) waitGracePeriod(kill <-chan os.Signal) { log.WithField("graceful_restart_timeout", config.Config.GracefulRestartTimeout).Warn("starting grace period") + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + go func() { + b.wg.Wait() + cancel() + }() + select { case <-time.After(config.Config.GracefulRestartTimeout): log.Error("old process stuck on termination. Grace period expired.") case <-kill: log.Error("force shutdown") - case <-b.serversErrors: + case <-ctx.Done(): log.Info("graceful stop completed") } } @@ -188,9 +201,9 @@ func (b *bootstrap) createUnixListener(socketPath string) (net.Listener, error) return connectioncounter.New("unix", l), err } -func serve(server *grpc.Server, listeners []net.Listener, done <-chan struct{}, errors chan<- error) { +func (b *bootstrap) serve(server *grpc.Server, listeners []net.Listener) { go func() { - <-done + <-b.Exit() server.GracefulStop() }() @@ -199,7 +212,8 @@ func serve(server *grpc.Server, listeners []net.Listener, done <-chan struct{}, // Must pass the listener as a function argument because there is a race // between 'go' and 'for'. 
go func(l net.Listener) { - errors <- server.Serve(l) + b.serversErrors <- server.Serve(l) + b.wg.Done() }(listener) } } -- GitLab From 924fd0a3419a2086ff24901938ed149c432920a5 Mon Sep 17 00:00:00 2001 From: Alessio Caiazza Date: Mon, 8 Apr 2019 09:33:36 +0000 Subject: [PATCH 02/12] set MR number in changelog --- changelogs/unreleased/more-graceful.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/changelogs/unreleased/more-graceful.yml b/changelogs/unreleased/more-graceful.yml index b950aba6aaa..0b1d1e91754 100644 --- a/changelogs/unreleased/more-graceful.yml +++ b/changelogs/unreleased/more-graceful.yml @@ -1,5 +1,5 @@ --- title: Wait for all the socket to terminate during a graceful restart -merge_request: +merge_request: 1190 author: type: fixed -- GitLab From ea76de1576f47b498cc844e52291aec5878d74f4 Mon Sep 17 00:00:00 2001 From: Alessio Caiazza Date: Mon, 8 Apr 2019 11:54:50 +0200 Subject: [PATCH 03/12] avoid context.Context --- cmd/gitaly/bootstrap.go | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/cmd/gitaly/bootstrap.go b/cmd/gitaly/bootstrap.go index 4ea0ec94559..f2ad0c5829c 100644 --- a/cmd/gitaly/bootstrap.go +++ b/cmd/gitaly/bootstrap.go @@ -1,7 +1,6 @@ package main import ( - "context" "fmt" "net" "os" @@ -171,12 +170,10 @@ func (b *bootstrap) run() { func (b *bootstrap) waitGracePeriod(kill <-chan os.Signal) { log.WithField("graceful_restart_timeout", config.Config.GracefulRestartTimeout).Warn("starting grace period") - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - + allServersDone := make(chan struct{}) go func() { b.wg.Wait() - cancel() + close(allServersDone) }() select { @@ -184,7 +181,7 @@ func (b *bootstrap) waitGracePeriod(kill <-chan os.Signal) { log.Error("old process stuck on termination. 
Grace period expired.") case <-kill: log.Error("force shutdown") - case <-ctx.Done(): + case <-allServersDone: log.Info("graceful stop completed") } } -- GitLab From b28508f4db315abd601b5756b0c41aaf40e73655 Mon Sep 17 00:00:00 2001 From: Jacob Vosmaer Date: Mon, 8 Apr 2019 17:02:23 +0200 Subject: [PATCH 04/12] Sketch refactor of bootstrap code --- cmd/gitaly/bootstrap.go | 216 ------------------ cmd/gitaly/main.go | 119 ++++++++-- internal/bootstrap/bootstrap.go | 172 ++++++++++++++ .../bootstrap}/bootstrap_test.go | 9 +- 4 files changed, 278 insertions(+), 238 deletions(-) delete mode 100644 cmd/gitaly/bootstrap.go create mode 100644 internal/bootstrap/bootstrap.go rename {cmd/gitaly => internal/bootstrap}/bootstrap_test.go (95%) diff --git a/cmd/gitaly/bootstrap.go b/cmd/gitaly/bootstrap.go deleted file mode 100644 index f2ad0c5829c..00000000000 --- a/cmd/gitaly/bootstrap.go +++ /dev/null @@ -1,216 +0,0 @@ -package main - -import ( - "fmt" - "net" - "os" - "os/signal" - "sync" - "syscall" - "time" - - "github.com/cloudflare/tableflip" - log "github.com/sirupsen/logrus" - "gitlab.com/gitlab-org/gitaly/internal/config" - "gitlab.com/gitlab-org/gitaly/internal/connectioncounter" - "gitlab.com/gitlab-org/gitaly/internal/rubyserver" - "gitlab.com/gitlab-org/gitaly/internal/server" - "google.golang.org/grpc" -) - -type bootstrap struct { - *tableflip.Upgrader - - insecureListeners []net.Listener - secureListeners []net.Listener - - serversErrors chan error - wg sync.WaitGroup -} - -// newBootstrap performs tableflip initialization -// -// first boot: -// * gitaly starts as usual, we will refer to it as p1 -// * newBootstrap will build a tableflip.Upgrader, we will refer to it as upg -// * sockets and files must be opened with upg.Fds -// * p1 will trap SIGHUP and invoke upg.Upgrade() -// * when ready to accept incoming connections p1 will call upg.Ready() -// * upg.Exit() channel will be closed when an upgrades completed successfully and the process must terminate -// 
-// graceful upgrade: -// * user replaces gitaly binary and/or config file -// * user sends SIGHUP to p1 -// * p1 will fork and exec the new gitaly, we will refer to it as p2 -// * from now on p1 will ignore other SIGHUP -// * if p2 terminates with a non-zero exit code, SIGHUP handling will be restored -// * p2 will follow the "first boot" sequence but upg.Fds will provide sockets and files from p1, when available -// * when p2 invokes upg.Ready() all the shared file descriptors not claimed by p2 will be closed -// * upg.Exit() channel in p1 will be closed now and p1 can gracefully terminate already accepted connections -// * upgrades cannot starts again if p1 and p2 are both running, an hard termination should be scheduled to overcome -// freezes during a graceful shutdown -func newBootstrap(pidFile string, upgradesEnabled bool) (*bootstrap, error) { - // PIDFile is optional, if provided tableflip will keep it updated - upg, err := tableflip.New(tableflip.Options{PIDFile: pidFile}) - if err != nil { - return nil, err - } - - if upgradesEnabled { - go func() { - sig := make(chan os.Signal, 1) - signal.Notify(sig, syscall.SIGHUP) - - for range sig { - err := upg.Upgrade() - if err != nil { - log.WithError(err).Error("Upgrade failed") - continue - } - - log.Info("Upgrade succeeded") - } - }() - } - - return &bootstrap{Upgrader: upg}, nil -} - -func (b *bootstrap) listen() error { - if socketPath := config.Config.SocketPath; socketPath != "" { - l, err := b.createUnixListener(socketPath) - if err != nil { - return err - } - - log.WithField("address", socketPath).Info("listening on unix socket") - b.insecureListeners = append(b.insecureListeners, l) - } - - if addr := config.Config.ListenAddr; addr != "" { - l, err := b.Fds.Listen("tcp", addr) - if err != nil { - return err - } - - log.WithField("address", addr).Info("listening at tcp address") - b.insecureListeners = append(b.insecureListeners, connectioncounter.New("tcp", l)) - } - - if addr := 
config.Config.TLSListenAddr; addr != "" { - tlsListener, err := b.Fds.Listen("tcp", addr) - if err != nil { - return err - } - - b.secureListeners = append(b.secureListeners, connectioncounter.New("tls", tlsListener)) - } - - cnt := len(b.insecureListeners) + len(b.secureListeners) - b.serversErrors = make(chan error, cnt) - b.wg.Add(cnt) - - return nil -} - -func (b *bootstrap) prometheusListener() (net.Listener, error) { - log.WithField("address", config.Config.PrometheusListenAddr).Info("starting prometheus listener") - - return b.Fds.Listen("tcp", config.Config.PrometheusListenAddr) -} - -func (b *bootstrap) run() { - signals := []os.Signal{syscall.SIGTERM, syscall.SIGINT} - done := make(chan os.Signal, len(signals)) - signal.Notify(done, signals...) - - ruby, err := rubyserver.Start() - if err != nil { - log.WithError(err).Error("start ruby server") - return - } - defer ruby.Stop() - - if len(b.insecureListeners) > 0 { - insecureServer := server.NewInsecure(ruby) - defer insecureServer.Stop() - - b.serve(insecureServer, b.insecureListeners) - } - - if len(b.secureListeners) > 0 { - secureServer := server.NewSecure(ruby) - defer secureServer.Stop() - - b.serve(secureServer, b.secureListeners) - } - - if err := b.Ready(); err != nil { - log.WithError(err).Error("incomplete bootstrap") - return - } - - select { - case <-b.Exit(): - // this is the old process and a graceful upgrade is in progress - // the new process signaled its readiness and we started a graceful stop - // however no further upgrades can be started until this process is running - // we set a grace period and then we force a termination. 
- b.waitGracePeriod(done) - - err = fmt.Errorf("graceful upgrade") - case s := <-done: - err = fmt.Errorf("received signal %q", s) - case err = <-b.serversErrors: - } - - log.WithError(err).Error("terminating") -} - -func (b *bootstrap) waitGracePeriod(kill <-chan os.Signal) { - log.WithField("graceful_restart_timeout", config.Config.GracefulRestartTimeout).Warn("starting grace period") - - allServersDone := make(chan struct{}) - go func() { - b.wg.Wait() - close(allServersDone) - }() - - select { - case <-time.After(config.Config.GracefulRestartTimeout): - log.Error("old process stuck on termination. Grace period expired.") - case <-kill: - log.Error("force shutdown") - case <-allServersDone: - log.Info("graceful stop completed") - } -} - -func (b *bootstrap) createUnixListener(socketPath string) (net.Listener, error) { - if !b.HasParent() { - // During an update the unix socket exists and if we delete it tableflip will not create a new one - if err := os.Remove(socketPath); err != nil && !os.IsNotExist(err) { - return nil, err - } - } - - l, err := b.Fds.Listen("unix", socketPath) - return connectioncounter.New("unix", l), err -} - -func (b *bootstrap) serve(server *grpc.Server, listeners []net.Listener) { - go func() { - <-b.Exit() - - server.GracefulStop() - }() - - for _, listener := range listeners { - // Must pass the listener as a function argument because there is a race - // between 'go' and 'for'. 
- go func(l net.Listener) { - b.serversErrors <- server.Serve(l) - b.wg.Done() - }(listener) - } -} diff --git a/cmd/gitaly/main.go b/cmd/gitaly/main.go index 10149365f26..e97b7b2a21f 100644 --- a/cmd/gitaly/main.go +++ b/cmd/gitaly/main.go @@ -3,19 +3,24 @@ package main import ( "flag" "fmt" + "net" "net/http" "os" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" log "github.com/sirupsen/logrus" + "gitlab.com/gitlab-org/gitaly/internal/bootstrap" "gitlab.com/gitlab-org/gitaly/internal/config" + "gitlab.com/gitlab-org/gitaly/internal/connectioncounter" "gitlab.com/gitlab-org/gitaly/internal/git" "gitlab.com/gitlab-org/gitaly/internal/linguist" + "gitlab.com/gitlab-org/gitaly/internal/rubyserver" "gitlab.com/gitlab-org/gitaly/internal/server" "gitlab.com/gitlab-org/gitaly/internal/tempdir" "gitlab.com/gitlab-org/gitaly/internal/version" "gitlab.com/gitlab-org/labkit/tracing" + "google.golang.org/grpc" ) var ( @@ -78,11 +83,10 @@ func main() { // gitaly-wrapper is supposed to set config.EnvUpgradesEnabled in order to enable graceful upgrades _, isWrapped := os.LookupEnv(config.EnvUpgradesEnabled) - b, err := newBootstrap(os.Getenv(config.EnvPidFile), isWrapped) + b, err := bootstrap.New(os.Getenv(config.EnvPidFile), isWrapped) if err != nil { log.WithError(err).Fatal("init bootstrap") } - defer b.Stop() // If invoked with -version if *flagVersion { @@ -111,30 +115,111 @@ func main() { tempdir.StartCleaning() - if err = b.listen(); err != nil { - log.WithError(err).Fatal("bootstrap failed") + ruby, err := rubyserver.Start() + if err != nil { + log.WithError(err).Fatal("start ruby server") + } + + go func() { + <-b.Stop + ruby.Stop() + }() + + insecureServer := server.NewInsecure(ruby) + secureServer := server.NewSecure(ruby) + + for _, s := range []*grpc.Server{insecureServer, secureServer} { + go func(s *grpc.Server) { + select { + case <-b.Stop: + s.Stop() + case <-b.GracefulStop: + s.GracefulStop() + } + 
}(s) } - if config.Config.PrometheusListenAddr != "" { - l, err := b.prometheusListener() - if err != nil { - log.WithError(err).Fatal("configure prometheus listener") + if socketPath := config.Config.SocketPath; socketPath != "" { + b.RegisterStarter(func(listen bootstrap.ListenFunc, errCh chan<- error) error { + l, err := createUnixListener(listen, socketPath, !b.IsFirstBoot()) + if err != nil { + return err + } + + log.WithField("address", socketPath).Info("listening on unix socket") + go func() { + errCh <- insecureServer.Serve(connectioncounter.New("unix", l)) + }() + + return nil + }) + } + + for _, cfg := range []struct { + name, addr string + s *grpc.Server + }{ + {name: "tcp", addr: config.Config.ListenAddr, s: insecureServer}, + {name: "tls", addr: config.Config.TLSListenAddr, s: secureServer}, + } { + if cfg.addr == "" { + continue } - promMux := http.NewServeMux() - promMux.Handle("/metrics", promhttp.Handler()) + b.RegisterStarter(func(listen bootstrap.ListenFunc, errCh chan<- error) error { + l, err := listen("tcp", cfg.addr) + if err != nil { + return err + } + + log.WithField("address", cfg.addr).Infof("listening at %s address", cfg.name) + go func() { + errCh <- cfg.s.Serve(connectioncounter.New(cfg.name, l)) + }() - server.AddPprofHandlers(promMux) + return nil + }) + } - go func() { - err = http.Serve(l, promMux) + if addr := config.Config.PrometheusListenAddr; addr != "" { + b.RegisterExtraStarter(func(listen bootstrap.ListenFunc) error { + l, err := listen("tcp", addr) if err != nil { - log.WithError(err).Fatal("Unable to serve prometheus") + return err } - }() + + log.WithField("address", addr).Info("starting prometheus listener") + + promMux := http.NewServeMux() + promMux.Handle("/metrics", promhttp.Handler()) + + server.AddPprofHandlers(promMux) + + go func() { + if err := http.Serve(l, promMux); err != nil { + log.WithError(err).Fatal("Unable to serve prometheus") + } + }() + + return nil + }) } - b.run() + if err := b.Start(); err != nil 
{ + log.WithError(err).Fatal("unable to start listeners") + } + + b.Run() + + log.Error("shutting down") +} + +func createUnixListener(listen bootstrap.ListenFunc, socketPath string, removeOld bool) (net.Listener, error) { + if removeOld { + if err := os.Remove(socketPath); err != nil && !os.IsNotExist(err) { + return nil, err + } + } - log.Fatal("shutting down") + return listen("unix", socketPath) } diff --git a/internal/bootstrap/bootstrap.go b/internal/bootstrap/bootstrap.go new file mode 100644 index 00000000000..b883ac4f141 --- /dev/null +++ b/internal/bootstrap/bootstrap.go @@ -0,0 +1,172 @@ +package bootstrap + +import ( + "fmt" + "net" + "os" + "os/signal" + "sync" + "syscall" + "time" + + "github.com/cloudflare/tableflip" + log "github.com/sirupsen/logrus" + "gitlab.com/gitlab-org/gitaly/internal/config" +) + +type Bootstrap struct { + Stop chan struct{} + GracefulStop chan struct{} + + upgrader *tableflip.Upgrader + wg sync.WaitGroup + errChan chan error + starters []Starter + extraStarters []ExtraStarter +} + +// newBootstrap performs tableflip initialization +// +// first boot: +// * gitaly starts as usual, we will refer to it as p1 +// * newBootstrap will build a tableflip.Upgrader, we will refer to it as upg +// * sockets and files must be opened with upg.Fds +// * p1 will trap SIGHUP and invoke upg.Upgrade() +// * when ready to accept incoming connections p1 will call upg.Ready() +// * upg.Exit() channel will be closed when an upgrades completed successfully and the process must terminate +// +// graceful upgrade: +// * user replaces gitaly binary and/or config file +// * user sends SIGHUP to p1 +// * p1 will fork and exec the new gitaly, we will refer to it as p2 +// * from now on p1 will ignore other SIGHUP +// * if p2 terminates with a non-zero exit code, SIGHUP handling will be restored +// * p2 will follow the "first boot" sequence but upg.Fds will provide sockets and files from p1, when available +// * when p2 invokes upg.Ready() all the shared 
file descriptors not claimed by p2 will be closed +// * upg.Exit() channel in p1 will be closed now and p1 can gracefully terminate already accepted connections +// * upgrades cannot starts again if p1 and p2 are both running, an hard termination should be scheduled to overcome +// freezes during a graceful shutdown +func New(pidFile string, upgradesEnabled bool) (*Bootstrap, error) { + // PIDFile is optional, if provided tableflip will keep it updated + upg, err := tableflip.New(tableflip.Options{PIDFile: pidFile}) + if err != nil { + return nil, err + } + + if upgradesEnabled { + go func() { + sig := make(chan os.Signal, 1) + signal.Notify(sig, syscall.SIGHUP) + + for range sig { + err := upg.Upgrade() + if err != nil { + log.WithError(err).Error("Upgrade failed") + continue + } + + log.Info("Upgrade succeeded") + } + }() + } + + gracefulStopCh := make(chan struct{}) + go func() { <-upg.Exit(); close(gracefulStopCh) }() + + return &Bootstrap{ + upgrader: upg, + Stop: make(chan struct{}), + GracefulStop: gracefulStopCh, + }, nil +} + +type ListenFunc func(net, addr string) (net.Listener, error) + +type Starter func(ListenFunc, chan<- error) error +type ExtraStarter func(ListenFunc) error + +func (b *Bootstrap) IsFirstBoot() bool { return !b.upgrader.HasParent() } + +func (b *Bootstrap) RegisterStarter(starter Starter) { + b.starters = append(b.starters, starter) +} + +func (b *Bootstrap) RegisterExtraStarter(extra ExtraStarter) { + b.extraStarters = append(b.extraStarters, extra) +} + +func (b *Bootstrap) Start() error { + for _, s := range b.extraStarters { + if err := s(b.upgrader.Fds.Listen); err != nil { + return err + } + } + + b.errChan = make(chan error, len(b.starters)) + + for _, start := range b.starters { + errCh := make(chan error) + + if err := start(b.upgrader.Fds.Listen, errCh); err != nil { + return err + } + + b.wg.Add(1) + go func(errCh chan error) { + err := <-errCh + b.wg.Done() + b.errChan <- err + }(errCh) + } + + return nil +} + +func (b 
*Bootstrap) Run() { + signals := []os.Signal{syscall.SIGTERM, syscall.SIGINT} + immediateShutdown := make(chan os.Signal, len(signals)) + signal.Notify(immediateShutdown, signals...) + + if err := b.upgrader.Ready(); err != nil { + log.WithError(err).Error("incomplete bootstrap") + return + } + + var err error + select { + case <-b.upgrader.Exit(): + // this is the old process and a graceful upgrade is in progress + // the new process signaled its readiness and we started a graceful stop + // however no further upgrades can be started until this process is running + // we set a grace period and then we force a termination. + b.waitGracePeriod(immediateShutdown) + + err = fmt.Errorf("graceful upgrade") + case s := <-immediateShutdown: + err = fmt.Errorf("received signal %q", s) + case err = <-b.errChan: + } + + close(b.Stop) + + log.WithError(err).Error("terminating") +} + +func (b *Bootstrap) waitGracePeriod(kill <-chan os.Signal) { + log.WithField("graceful_restart_timeout", config.Config.GracefulRestartTimeout).Warn("starting grace period") + + allServersDone := make(chan struct{}) + go func() { + b.wg.Wait() + close(allServersDone) + }() + + select { + case <-time.After(config.Config.GracefulRestartTimeout): + log.Error("old process stuck on termination. 
Grace period expired.") + case <-kill: + log.Error("force shutdown") + case <-allServersDone: + log.Info("graceful stop completed") + } +} diff --git a/cmd/gitaly/bootstrap_test.go b/internal/bootstrap/bootstrap_test.go similarity index 95% rename from cmd/gitaly/bootstrap_test.go rename to internal/bootstrap/bootstrap_test.go index f6fdf6f902e..199fa12dbac 100644 --- a/cmd/gitaly/bootstrap_test.go +++ b/internal/bootstrap/bootstrap_test.go @@ -1,4 +1,4 @@ -package main +package bootstrap import ( "context" @@ -17,7 +17,7 @@ import ( ) // b is global because tableflip do not allow to init more than one Upgrader per process -var b *bootstrap +var b *Bootstrap var socketPath = path.Join(os.TempDir(), "test-unix-socket") // TestMain helps testing bootstrap. @@ -25,13 +25,12 @@ var socketPath = path.Join(os.TempDir(), "test-unix-socket") // avoid the test suite and start a pid HTTP server on socketPath func TestMain(m *testing.M) { var err error - b, err = newBootstrap("", true) + b, err = New("", true) if err != nil { panic(err) } - if !b.HasParent() { - // Execute test suite if there is no parent. 
+ if b.IsFirstBoot() { os.Exit(m.Run()) } -- GitLab From 76fcd64e721cd4ec14a514c10e61659a949b7a60 Mon Sep 17 00:00:00 2001 From: Jacob Vosmaer Date: Mon, 8 Apr 2019 17:09:16 +0200 Subject: [PATCH 05/12] Return error from Run() --- cmd/gitaly/main.go | 4 +--- internal/bootstrap/bootstrap.go | 7 +++---- 2 files changed, 4 insertions(+), 7 deletions(-) diff --git a/cmd/gitaly/main.go b/cmd/gitaly/main.go index e97b7b2a21f..29e3d2b8163 100644 --- a/cmd/gitaly/main.go +++ b/cmd/gitaly/main.go @@ -209,9 +209,7 @@ func main() { log.WithError(err).Fatal("unable to start listeners") } - b.Run() - - log.Error("shutting down") + log.WithError(b.Run()).Error("shutting down") } func createUnixListener(listen bootstrap.ListenFunc, socketPath string, removeOld bool) (net.Listener, error) { diff --git a/internal/bootstrap/bootstrap.go b/internal/bootstrap/bootstrap.go index b883ac4f141..7c70635bbbd 100644 --- a/internal/bootstrap/bootstrap.go +++ b/internal/bootstrap/bootstrap.go @@ -122,14 +122,13 @@ func (b *Bootstrap) Start() error { return nil } -func (b *Bootstrap) Run() { +func (b *Bootstrap) Run() error { signals := []os.Signal{syscall.SIGTERM, syscall.SIGINT} immediateShutdown := make(chan os.Signal, len(signals)) signal.Notify(immediateShutdown, signals...) 
if err := b.upgrader.Ready(); err != nil { - log.WithError(err).Error("incomplete bootstrap") - return + return err } var err error @@ -149,7 +148,7 @@ func (b *Bootstrap) Run() { close(b.Stop) - log.WithError(err).Error("terminating") + return err } func (b *Bootstrap) waitGracePeriod(kill <-chan os.Signal) { -- GitLab From 331aea8e497335630548fb816d89395d9313012c Mon Sep 17 00:00:00 2001 From: Jacob Vosmaer Date: Mon, 8 Apr 2019 17:12:12 +0200 Subject: [PATCH 06/12] Use defer --- cmd/gitaly/main.go | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/cmd/gitaly/main.go b/cmd/gitaly/main.go index 29e3d2b8163..ff1e0f3d6e2 100644 --- a/cmd/gitaly/main.go +++ b/cmd/gitaly/main.go @@ -119,11 +119,7 @@ func main() { if err != nil { log.WithError(err).Fatal("start ruby server") } - - go func() { - <-b.Stop - ruby.Stop() - }() + defer ruby.Stop() insecureServer := server.NewInsecure(ruby) secureServer := server.NewSecure(ruby) -- GitLab From 6ebfe26769a59850c1023f3573d70feaa8a1956a Mon Sep 17 00:00:00 2001 From: Jacob Vosmaer Date: Mon, 8 Apr 2019 17:15:57 +0200 Subject: [PATCH 07/12] Rename Run to Wait --- cmd/gitaly/main.go | 2 +- internal/bootstrap/bootstrap.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/cmd/gitaly/main.go b/cmd/gitaly/main.go index ff1e0f3d6e2..6ee0928e41b 100644 --- a/cmd/gitaly/main.go +++ b/cmd/gitaly/main.go @@ -205,7 +205,7 @@ func main() { log.WithError(err).Fatal("unable to start listeners") } - log.WithError(b.Run()).Error("shutting down") + log.WithError(b.Wait()).Error("shutting down") } func createUnixListener(listen bootstrap.ListenFunc, socketPath string, removeOld bool) (net.Listener, error) { diff --git a/internal/bootstrap/bootstrap.go b/internal/bootstrap/bootstrap.go index 7c70635bbbd..002fca92014 100644 --- a/internal/bootstrap/bootstrap.go +++ b/internal/bootstrap/bootstrap.go @@ -122,7 +122,7 @@ func (b *Bootstrap) Start() error { return nil } -func (b *Bootstrap) Run() error { +func (b 
*Bootstrap) Wait() error { signals := []os.Signal{syscall.SIGTERM, syscall.SIGINT} immediateShutdown := make(chan os.Signal, len(signals)) signal.Notify(immediateShutdown, signals...) -- GitLab From 231fbf560f83a8c6fe5e2884a015b5505daa3ee0 Mon Sep 17 00:00:00 2001 From: Jacob Vosmaer Date: Mon, 8 Apr 2019 17:20:40 +0200 Subject: [PATCH 08/12] Remove race-prone *grpc.Server.Stop --- cmd/gitaly/main.go | 8 ++------ internal/bootstrap/bootstrap.go | 4 ---- 2 files changed, 2 insertions(+), 10 deletions(-) diff --git a/cmd/gitaly/main.go b/cmd/gitaly/main.go index 6ee0928e41b..ce1669c8d65 100644 --- a/cmd/gitaly/main.go +++ b/cmd/gitaly/main.go @@ -126,12 +126,8 @@ func main() { for _, s := range []*grpc.Server{insecureServer, secureServer} { go func(s *grpc.Server) { - select { - case <-b.Stop: - s.Stop() - case <-b.GracefulStop: - s.GracefulStop() - } + <-b.GracefulStop + s.GracefulStop() }(s) } diff --git a/internal/bootstrap/bootstrap.go b/internal/bootstrap/bootstrap.go index 002fca92014..f48d9bff8ab 100644 --- a/internal/bootstrap/bootstrap.go +++ b/internal/bootstrap/bootstrap.go @@ -15,7 +15,6 @@ import ( ) type Bootstrap struct { - Stop chan struct{} GracefulStop chan struct{} upgrader *tableflip.Upgrader @@ -75,7 +74,6 @@ func New(pidFile string, upgradesEnabled bool) (*Bootstrap, error) { return &Bootstrap{ upgrader: upg, - Stop: make(chan struct{}), GracefulStop: gracefulStopCh, }, nil } @@ -146,8 +144,6 @@ func (b *Bootstrap) Wait() error { case err = <-b.errChan: } - close(b.Stop) - return err } -- GitLab From 296574163b33076e770a5119cc0288140759e0ae Mon Sep 17 00:00:00 2001 From: Jacob Vosmaer Date: Mon, 8 Apr 2019 17:27:53 +0200 Subject: [PATCH 09/12] Fix bugs --- cmd/gitaly/main.go | 33 +++++++++++++++++++-------------- 1 file changed, 19 insertions(+), 14 deletions(-) diff --git a/cmd/gitaly/main.go b/cmd/gitaly/main.go index ce1669c8d65..42f2eb436fc 100644 --- a/cmd/gitaly/main.go +++ b/cmd/gitaly/main.go @@ -133,7 +133,7 @@ func main() { if 
socketPath := config.Config.SocketPath; socketPath != "" { b.RegisterStarter(func(listen bootstrap.ListenFunc, errCh chan<- error) error { - l, err := createUnixListener(listen, socketPath, !b.IsFirstBoot()) + l, err := createUnixListener(listen, socketPath, b.IsFirstBoot()) if err != nil { return err } @@ -147,10 +147,12 @@ func main() { }) } - for _, cfg := range []struct { + type tcpConfig struct { name, addr string s *grpc.Server - }{ + } + + for _, cfg := range []tcpConfig{ {name: "tcp", addr: config.Config.ListenAddr, s: insecureServer}, {name: "tls", addr: config.Config.TLSListenAddr, s: secureServer}, } { @@ -158,19 +160,22 @@ func main() { continue } - b.RegisterStarter(func(listen bootstrap.ListenFunc, errCh chan<- error) error { - l, err := listen("tcp", cfg.addr) - if err != nil { - return err - } + // be careful with closure over cfg inside for loop + func(cfg tcpConfig) { + b.RegisterStarter(func(listen bootstrap.ListenFunc, errCh chan<- error) error { + l, err := listen("tcp", cfg.addr) + if err != nil { + return err + } - log.WithField("address", cfg.addr).Infof("listening at %s address", cfg.name) - go func() { - errCh <- cfg.s.Serve(connectioncounter.New(cfg.name, l)) - }() + log.WithField("address", cfg.addr).Infof("listening at %s address", cfg.name) + go func() { + errCh <- cfg.s.Serve(connectioncounter.New(cfg.name, l)) + }() - return nil - }) + return nil + }) + }(cfg) } if addr := config.Config.PrometheusListenAddr; addr != "" { -- GitLab From c5c5b826781e73dc9c1024e7aef1563a3cd4c658 Mon Sep 17 00:00:00 2001 From: Jacob Vosmaer Date: Mon, 8 Apr 2019 17:37:50 +0200 Subject: [PATCH 10/12] Remove ExtraStarter concept --- cmd/gitaly/main.go | 10 ++++++++-- internal/bootstrap/bootstrap.go | 19 ++++--------------- 2 files changed, 12 insertions(+), 17 deletions(-) diff --git a/cmd/gitaly/main.go b/cmd/gitaly/main.go index 42f2eb436fc..d4b8b960e19 100644 --- a/cmd/gitaly/main.go +++ b/cmd/gitaly/main.go @@ -179,7 +179,7 @@ func main() { } if addr 
:= config.Config.PrometheusListenAddr; addr != "" { - b.RegisterExtraStarter(func(listen bootstrap.ListenFunc) error { + b.RegisterStarter(func(listen bootstrap.ListenFunc, errCh chan<- error) error { l, err := listen("tcp", addr) if err != nil { return err @@ -194,10 +194,16 @@ func main() { go func() { if err := http.Serve(l, promMux); err != nil { - log.WithError(err).Fatal("Unable to serve prometheus") + log.WithError(err).Error("Unable to serve prometheus") } }() + // Prometheus listener should not block graceful shutdown + go func() { + <-b.GracefulStop + errCh <- nil + }() + return nil }) } diff --git a/internal/bootstrap/bootstrap.go b/internal/bootstrap/bootstrap.go index f48d9bff8ab..42e66c74a60 100644 --- a/internal/bootstrap/bootstrap.go +++ b/internal/bootstrap/bootstrap.go @@ -17,11 +17,10 @@ import ( type Bootstrap struct { GracefulStop chan struct{} - upgrader *tableflip.Upgrader - wg sync.WaitGroup - errChan chan error - starters []Starter - extraStarters []ExtraStarter + upgrader *tableflip.Upgrader + wg sync.WaitGroup + errChan chan error + starters []Starter } // newBootstrap performs tableflip initialization @@ -89,17 +88,7 @@ func (b *Bootstrap) RegisterStarter(starter Starter) { b.starters = append(b.starters, starter) } -func (b *Bootstrap) RegisterExtraStarter(extra ExtraStarter) { - b.extraStarters = append(b.extraStarters, extra) -} - func (b *Bootstrap) Start() error { - for _, s := range b.extraStarters { - if err := s(b.upgrader.Fds.Listen); err != nil { - return err - } - } - b.errChan = make(chan error, len(b.starters)) for _, start := range b.starters { -- GitLab From 28392aff1fc6c879f156c1aecba084b823dce2bd Mon Sep 17 00:00:00 2001 From: Jacob Vosmaer Date: Mon, 8 Apr 2019 18:29:36 +0200 Subject: [PATCH 11/12] Remove unused type --- internal/bootstrap/bootstrap.go | 1 - 1 file changed, 1 deletion(-) diff --git a/internal/bootstrap/bootstrap.go b/internal/bootstrap/bootstrap.go index 42e66c74a60..d7620eed054 100644 --- 
a/internal/bootstrap/bootstrap.go +++ b/internal/bootstrap/bootstrap.go @@ -80,7 +80,6 @@ func New(pidFile string, upgradesEnabled bool) (*Bootstrap, error) { type ListenFunc func(net, addr string) (net.Listener, error) type Starter func(ListenFunc, chan<- error) error -type ExtraStarter func(ListenFunc) error func (b *Bootstrap) IsFirstBoot() bool { return !b.upgrader.HasParent() } -- GitLab From dad6406ff19ea5c5485f27b8e43760bb34873f23 Mon Sep 17 00:00:00 2001 From: Jacob Vosmaer Date: Mon, 8 Apr 2019 18:31:00 +0200 Subject: [PATCH 12/12] Don't use semicolon --- internal/bootstrap/bootstrap.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/internal/bootstrap/bootstrap.go b/internal/bootstrap/bootstrap.go index d7620eed054..3612a369494 100644 --- a/internal/bootstrap/bootstrap.go +++ b/internal/bootstrap/bootstrap.go @@ -69,7 +69,10 @@ func New(pidFile string, upgradesEnabled bool) (*Bootstrap, error) { } gracefulStopCh := make(chan struct{}) - go func() { <-upg.Exit(); close(gracefulStopCh) }() + go func() { + <-upg.Exit() + close(gracefulStopCh) + }() return &Bootstrap{ upgrader: upg, -- GitLab