From 18fc314aa4f37d48c80667489566add646995ef6 Mon Sep 17 00:00:00 2001
From: Adrien
Date: Thu, 23 Feb 2023 09:58:36 +0100
Subject: [PATCH 1/2] Introduce a ready state in health_manager.go

This PR introduces a ready state for health_manager.go. The purpose is:
when a Gitaly node receives a termination signal (Kubernetes mode), its
health server switches to NOT_SERVING. health_manager.go detects the
not-ready state and keeps sending traffic to the node for 12s (keeping a
2s margin just in case), but marks it as unavailable in the database in
order to trigger a move of the primary for any mutator call. Once we are
safe, the node is marked unhealthy so that traffic to it stops.
---
 cmd/gitaly/main.go                          | 19 ++++++-
 internal/gitaly/service/setup/register.go   |  3 --
 internal/praefect/nodes/health_manager.go   | 52 +++++++++++++------
 .../praefect/nodes/health_manager_test.go   |  6 +--
 4 files changed, 58 insertions(+), 22 deletions(-)

diff --git a/cmd/gitaly/main.go b/cmd/gitaly/main.go
index 2577b1c028f..9d9ef61dae6 100644
--- a/cmd/gitaly/main.go
+++ b/cmd/gitaly/main.go
@@ -50,6 +50,8 @@ import (
 	"gitlab.com/gitlab-org/labkit/monitoring"
 	"gitlab.com/gitlab-org/labkit/tracing"
 	"google.golang.org/grpc"
+	"google.golang.org/grpc/health"
+	healthpb "google.golang.org/grpc/health/grpc_health_v1"
 )
 
 var flagVersion = flag.Bool("version", false, "Print version and exit")
@@ -327,6 +329,8 @@ func run(cfg config.Cfg) error {
 	concurrencyTracker := hook.NewConcurrencyTracker()
 	prometheus.MustRegister(concurrencyTracker)
 
+	var healthServers []*health.Server
+
 	for _, c := range []starter.Config{
 		{Name: starter.Unix, Addr: cfg.SocketPath, HandoverOnUpgrade: true},
 		{Name: starter.Unix, Addr: cfg.InternalSocketPath(), HandoverOnUpgrade: false},
@@ -368,6 +372,9 @@ func run(cfg config.Cfg) error {
 			HousekeepingManager: housekeepingManager,
 		})
 		b.RegisterStarter(starter.New(c, srv))
+		healthServer := health.NewServer()
+		healthServers = append(healthServers, healthServer)
+		healthpb.RegisterHealthServer(srv, healthServer)
 	}
 
 	if addr := cfg.PrometheusListenAddr; addr != "" {
@@ -427,5 +434,15 @@ func run(cfg config.Cfg) error {
 	gracefulStopTicker := helper.NewTimerTicker(cfg.GracefulRestartTimeout.Duration())
 	defer gracefulStopTicker.Stop()
 
-	return b.Wait(gracefulStopTicker, gitalyServerFactory.GracefulStop)
+	return b.Wait(gracefulStopTicker, func() {
+		log.Info("Making gitaly server unhealthy")
+		for _, healthServer := range healthServers {
+			healthServer.Shutdown()
+		}
+
+		// Wait for praefect to shut down
+		time.Sleep(20 * time.Second)
+
+		gitalyServerFactory.GracefulStop()
+	})
 }
diff --git a/internal/gitaly/service/setup/register.go b/internal/gitaly/service/setup/register.go
index 108ccd5e97d..3a9318a5e4e 100644
--- a/internal/gitaly/service/setup/register.go
+++ b/internal/gitaly/service/setup/register.go
@@ -23,8 +23,6 @@ import (
 	"gitlab.com/gitlab-org/gitaly/v15/internal/gitaly/service/ssh"
 	"gitlab.com/gitlab-org/gitaly/v15/proto/go/gitalypb"
 	"google.golang.org/grpc"
-	"google.golang.org/grpc/health"
-	healthpb "google.golang.org/grpc/health/grpc_health_v1"
 	"google.golang.org/grpc/reflection"
 )
 
@@ -147,7 +145,6 @@ func RegisterAll(srv *grpc.Server, deps *service.Dependencies) {
 	))
 
 	gitalypb.RegisterInternalGitalyServer(srv, internalgitaly.NewServer(deps.GetCfg().Storages))
-	healthpb.RegisterHealthServer(srv, health.NewServer())
 	reflection.Register(srv)
 	grpcprometheus.Register(srv)
 }
diff --git a/internal/praefect/nodes/health_manager.go b/internal/praefect/nodes/health_manager.go
index 21cdfc9fe18..7c2d3b72926 100644
--- a/internal/praefect/nodes/health_manager.go
+++ b/internal/praefect/nodes/health_manager.go
@@ -39,6 +39,8 @@ type HealthManager struct {
 	// clients contains connections to the configured physical storages within each
 	// virtual storage.
 	clients HealthClients
+	// notReadyTracker tracks the time of the first not-ready event per node.
+	notReadyTracker sync.Map
 	// praefectName is the identifier of the Praefect running the HealthManager. It should
 	// be stable through the restarts as they are used to identify quorum members.
 	praefectName string
@@ -73,6 +75,7 @@ func NewHealthManager(
 			log.WithError(err).Error("checking health failed")
 			return nil
 		},
+		notReadyTracker:    sync.Map{},
 		praefectName:       praefectName,
 		healthCheckTimeout: healthcheckTimeout,
 		databaseTimeout: func(ctx context.Context) (context.Context, func()) {
@@ -102,8 +105,8 @@ func (hm *HealthManager) Run(ctx context.Context, ticker helper.Ticker) error {
 		case <-ctx.Done():
 			return ctx.Err()
 		case <-ticker.C():
-			virtualStorages, physicalStorages, healthy := hm.performHealthChecks(ctx)
-			if err := hm.updateHealthChecks(ctx, virtualStorages, physicalStorages, healthy); err != nil {
+			virtualStorages, physicalStorages, healthy, ready := hm.performHealthChecks(ctx)
+			if err := hm.updateHealthChecks(ctx, virtualStorages, physicalStorages, healthy, ready); err != nil {
 				if err := hm.handleError(err); err != nil {
 					return err
 				}
@@ -126,7 +129,7 @@ func (hm *HealthManager) HealthyNodes() map[string][]string {
 	return hm.locallyHealthy.Load().(map[string][]string)
 }
 
-func (hm *HealthManager) updateHealthChecks(ctx context.Context, virtualStorages, physicalStorages []string, healthy []bool) error {
+func (hm *HealthManager) updateHealthChecks(ctx context.Context, virtualStorages, physicalStorages []string, healthy []bool, ready []bool) error {
 	locallyHealthy := map[string][]string{}
 	for i := range virtualStorages {
 		if !healthy[i] {
@@ -143,11 +146,11 @@ func (hm *HealthManager) updateHealthChecks(ctx context.Context, virtualStorages
 
 	if _, err := hm.db.ExecContext(ctx, `
 INSERT INTO node_status (praefect_name, shard_name, node_name, last_contact_attempt_at, last_seen_active_at)
-SELECT $1, shard_name, node_name, NOW(), CASE WHEN is_healthy THEN NOW() ELSE NULL END
+SELECT $1, shard_name, node_name, NOW(), CASE WHEN is_ready THEN NOW() ELSE NULL END
 FROM (
 	SELECT unnest($2::text[]) AS shard_name,
 	       unnest($3::text[]) AS node_name,
-	       unnest($4::boolean[]) AS is_healthy
+	       unnest($4::boolean[]) AS is_ready
 	ORDER BY shard_name, node_name
 ) AS results
 ON CONFLICT (praefect_name, shard_name, node_name)
@@ -158,7 +161,7 @@ ON CONFLICT (praefect_name, shard_name, node_name)
 		hm.praefectName,
 		virtualStorages,
 		physicalStorages,
-		healthy,
+		ready,
 	); err != nil {
 		return fmt.Errorf("update checks: %w", err)
 	}
@@ -171,7 +174,7 @@ ON CONFLICT (praefect_name, shard_name, node_name)
 	return nil
 }
 
-func (hm *HealthManager) performHealthChecks(ctx context.Context) ([]string, []string, []bool) {
+func (hm *HealthManager) performHealthChecks(ctx context.Context) ([]string, []string, []bool, []bool) {
 	nodeCount := 0
 	for _, physicalStorages := range hm.clients {
 		nodeCount += len(physicalStorages)
@@ -180,6 +183,7 @@ func (hm *HealthManager) performHealthChecks(ctx context.Context) ([]string, []s
 	virtualStorages := make([]string, nodeCount)
 	physicalStorages := make([]string, nodeCount)
 	healthy := make([]bool, nodeCount)
+	ready := make([]bool, nodeCount)
 
 	var wg sync.WaitGroup
 	wg.Add(nodeCount)
@@ -198,16 +202,34 @@ func (hm *HealthManager) performHealthChecks(ctx context.Context) ([]string, []s
 			correlationID := correlation.SafeRandomID()
 			ctx := correlation.ContextWithCorrelation(ctx, correlationID)
 			resp, err := client.Check(ctx, &grpc_health_v1.HealthCheckRequest{})
+			logger := hm.log.WithFields(logrus.Fields{
+				logrus.ErrorKey:   err,
+				"virtual_storage": virtualStorages[i],
+				"storage":         physicalStorages[i],
+				"correlation_id":  correlationID,
+			})
 			if err != nil {
-				hm.log.WithFields(logrus.Fields{
-					logrus.ErrorKey:   err,
-					"virtual_storage": virtualStorages[i],
-					"storage":         physicalStorages[i],
-					"correlation_id":  correlationID,
-				}).Error("failed checking node health")
+				logger.Error("failed checking node health")
 			}
 
-			healthy[i] = resp != nil && resp.Status == grpc_health_v1.HealthCheckResponse_SERVING
+			// We consider any response from the Gitaly node as healthy.
+			healthy[i] = resp != nil
+			// Only `HealthCheckResponse_SERVING` counts as ready.
+			ready[i] = resp != nil && resp.Status == grpc_health_v1.HealthCheckResponse_SERVING
+
+			key := fmt.Sprintf("%s-%s", virtualStorages[i], physicalStorages[i])
+			if !ready[i] && healthy[i] {
+				actual, _ := hm.notReadyTracker.LoadOrStore(key, time.Now())
+				lastReadyState := actual.(time.Time)
+				if time.Since(lastReadyState) > 12*time.Second {
+					healthy[i] = false
+					logger.Warn("node has not been ready for more than 12s, marking it unhealthy")
+				} else {
+					logger.Warnf("node has not been ready for %s", time.Since(lastReadyState))
+				}
+			} else {
+				hm.notReadyTracker.Delete(key)
+			}
 		}(i, client)
 		i++
 	}
@@ -215,5 +237,5 @@ func (hm *HealthManager) performHealthChecks(ctx context.Context) ([]string, []s
 
 	wg.Wait()
 
-	return virtualStorages, physicalStorages, healthy
+	return virtualStorages, physicalStorages, healthy, ready
 }
diff --git a/internal/praefect/nodes/health_manager_test.go b/internal/praefect/nodes/health_manager_test.go
index f53297854f6..7466ac945d5 100644
--- a/internal/praefect/nodes/health_manager_test.go
+++ b/internal/praefect/nodes/health_manager_test.go
@@ -637,20 +637,20 @@ func TestHealthManager_orderedWrites(t *testing.T) {
 
 	hm1 := NewHealthManager(testhelper.NewDiscardingLogger(t), tx1, praefectName, nil)
 	hm1.handleError = returnErr
-	require.NoError(t, hm1.updateHealthChecks(ctx, []string{virtualStorage}, []string{"gitaly-1"}, []bool{true}))
+	require.NoError(t, hm1.updateHealthChecks(ctx, []string{virtualStorage}, []string{"gitaly-1"}, []bool{true}, []bool{true}))
 
 	tx2Err := make(chan error, 1)
 	hm2 := NewHealthManager(testhelper.NewDiscardingLogger(t), tx2, praefectName, nil)
 	hm2.handleError = returnErr
 	go func() {
-		tx2Err <- hm2.updateHealthChecks(ctx, []string{virtualStorage, virtualStorage}, []string{"gitaly-2", "gitaly-1"}, []bool{true, true})
+		tx2Err <- hm2.updateHealthChecks(ctx, []string{virtualStorage, virtualStorage}, []string{"gitaly-2", "gitaly-1"}, []bool{true, true}, []bool{true, true})
 	}()
 
 	// Wait for tx2 to be blocked on the gitaly-1 lock acquired by tx1
 	testdb.WaitForBlockedQuery(t, ctx, db, "INSERT INTO node_status")
 
 	// Ensure tx1 can acquire lock on gitaly-2.
-	require.NoError(t, hm1.updateHealthChecks(ctx, []string{virtualStorage}, []string{"gitaly-2"}, []bool{true}))
+	require.NoError(t, hm1.updateHealthChecks(ctx, []string{virtualStorage}, []string{"gitaly-2"}, []bool{true}, []bool{true}))
 
 	// Committing tx1 releases locks and unblocks tx2.
 	require.NoError(t, tx1.Commit())
-- 
GitLab


From 94ee61c4d252214ab0affe0b04ead8eb0fe62952 Mon Sep 17 00:00:00 2001
From: Adrien
Date: Thu, 23 Feb 2023 21:36:24 +0100
Subject: [PATCH 2/2] Add graceful shutdown

---
 internal/bootstrap/bootstrap.go | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/internal/bootstrap/bootstrap.go b/internal/bootstrap/bootstrap.go
index a0251fee0a7..2489e882037 100644
--- a/internal/bootstrap/bootstrap.go
+++ b/internal/bootstrap/bootstrap.go
@@ -185,7 +185,10 @@ func (b *Bootstrap) Wait(gracePeriodTicker helper.Ticker, stopAction func()) err
 
 		err = fmt.Errorf("graceful upgrade: %v", waitError)
 	case s := <-immediateShutdown:
-		err = fmt.Errorf("received signal %q", s)
+		// In a Kubernetes context, a StatefulSet shuts down the old Gitaly before starting
+		// the new one, so we wait gracefully for in-flight requests to finish before quitting.
+		waitError := b.waitGracePeriod(gracePeriodTicker, immediateShutdown, stopAction)
+		err = fmt.Errorf("received signal %q %v", s, waitError)
 		b.upgrader.Stop()
 	case err = <-b.errChan:
 	}
-- 
GitLab
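
For reference, below is a minimal standalone sketch (not part of the patches) of the shutdown
sequence the first patch wires into cmd/gitaly/main.go: flip the gRPC health server to
NOT_SERVING so Praefect's health manager sees the not-ready state, give it time to mark the node
unavailable and move the primary away, then stop the server gracefully. The listener address, the
signal handling and the 20-second wait here are illustrative assumptions, not code taken from
Gitaly.

package main

import (
	"log"
	"net"
	"os"
	"os/signal"
	"syscall"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/health"
	healthpb "google.golang.org/grpc/health/grpc_health_v1"
)

func main() {
	// Illustrative listener; Gitaly itself serves on the sockets from its configuration.
	lis, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		log.Fatalf("listen: %v", err)
	}

	srv := grpc.NewServer()
	healthServer := health.NewServer()
	healthpb.RegisterHealthServer(srv, healthServer)

	go func() {
		if err := srv.Serve(lis); err != nil {
			log.Printf("serve: %v", err)
		}
	}()

	// Wait for SIGTERM, which Kubernetes sends before killing the pod.
	sigCh := make(chan os.Signal, 1)
	signal.Notify(sigCh, syscall.SIGTERM)
	<-sigCh

	// Report NOT_SERVING on health checks while existing RPCs keep being served.
	healthServer.Shutdown()

	// Give the health manager time to observe the not-ready state, mark the node
	// unavailable in the database and move the primary away (illustrative duration).
	time.Sleep(20 * time.Second)

	// Finish in-flight RPCs, then stop.
	srv.GracefulStop()
}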