diff --git a/cmd/gitaly/main.go b/cmd/gitaly/main.go
index 2577b1c028f6cd10a8b3ba6ae0ab8598ae8688bd..9d9ef61dae65b16c84289013030171a845297593 100644
--- a/cmd/gitaly/main.go
+++ b/cmd/gitaly/main.go
@@ -50,6 +50,8 @@ import (
 	"gitlab.com/gitlab-org/labkit/monitoring"
 	"gitlab.com/gitlab-org/labkit/tracing"
 	"google.golang.org/grpc"
+	"google.golang.org/grpc/health"
+	healthpb "google.golang.org/grpc/health/grpc_health_v1"
 )
 
 var flagVersion = flag.Bool("version", false, "Print version and exit")
@@ -327,6 +329,8 @@ func run(cfg config.Cfg) error {
 	concurrencyTracker := hook.NewConcurrencyTracker()
 	prometheus.MustRegister(concurrencyTracker)
 
+	var healthServers []*health.Server
+
 	for _, c := range []starter.Config{
 		{Name: starter.Unix, Addr: cfg.SocketPath, HandoverOnUpgrade: true},
 		{Name: starter.Unix, Addr: cfg.InternalSocketPath(), HandoverOnUpgrade: false},
@@ -368,6 +372,9 @@ func run(cfg config.Cfg) error {
 			HousekeepingManager: housekeepingManager,
 		})
 		b.RegisterStarter(starter.New(c, srv))
+		healthServer := health.NewServer()
+		healthServers = append(healthServers, healthServer)
+		healthpb.RegisterHealthServer(srv, healthServer)
 	}
 
 	if addr := cfg.PrometheusListenAddr; addr != "" {
@@ -427,5 +434,15 @@ func run(cfg config.Cfg) error {
 	gracefulStopTicker := helper.NewTimerTicker(cfg.GracefulRestartTimeout.Duration())
 	defer gracefulStopTicker.Stop()
 
-	return b.Wait(gracefulStopTicker, gitalyServerFactory.GracefulStop)
+	return b.Wait(gracefulStopTicker, func() {
+		log.Info("Making gitaly server unhealthy")
+		for _, healthServer := range healthServers {
+			healthServer.Shutdown()
+		}
+
+		// Give Praefect time to notice the node is no longer serving and to stop routing requests to it.
+		time.Sleep(20 * time.Second)
+
+		gitalyServerFactory.GracefulStop()
+	})
 }
diff --git a/internal/bootstrap/bootstrap.go b/internal/bootstrap/bootstrap.go
index a0251fee0a7b9a0de56b5f4ed830ad8d44cdf7b2..2489e88203799d1c3e67aa16838cc39735db0353 100644
--- a/internal/bootstrap/bootstrap.go
+++ b/internal/bootstrap/bootstrap.go
@@ -185,7 +185,10 @@ func (b *Bootstrap) Wait(gracePeriodTicker helper.Ticker, stopAction func()) err
 		err = fmt.Errorf("graceful upgrade: %v", waitError)
 	case s := <-immediateShutdown:
-		err = fmt.Errorf("received signal %q", s)
+		// In a Kubernetes context, a StatefulSet shuts down the old Gitaly pod before starting the new one,
+		// so we wait gracefully for in-flight requests to finish before quitting.
+		waitError := b.waitGracePeriod(gracePeriodTicker, immediateShutdown, stopAction)
+		err = fmt.Errorf("received signal %q %v", s, waitError)
 		b.upgrader.Stop()
 	case err = <-b.errChan:
 	}
diff --git a/internal/gitaly/service/setup/register.go b/internal/gitaly/service/setup/register.go
index 108ccd5e97dffe7041550901b775c81234002920..3a9318a5e4e5d81341dae13082630c8e99f7b4ce 100644
--- a/internal/gitaly/service/setup/register.go
+++ b/internal/gitaly/service/setup/register.go
@@ -23,8 +23,6 @@ import (
 	"gitlab.com/gitlab-org/gitaly/v15/internal/gitaly/service/ssh"
 	"gitlab.com/gitlab-org/gitaly/v15/proto/go/gitalypb"
 	"google.golang.org/grpc"
-	"google.golang.org/grpc/health"
-	healthpb "google.golang.org/grpc/health/grpc_health_v1"
 	"google.golang.org/grpc/reflection"
 )
 
@@ -147,7 +145,6 @@ func RegisterAll(srv *grpc.Server, deps *service.Dependencies) {
 	))
 	gitalypb.RegisterInternalGitalyServer(srv, internalgitaly.NewServer(deps.GetCfg().Storages))
-	healthpb.RegisterHealthServer(srv, health.NewServer())
 	reflection.Register(srv)
 
 	grpcprometheus.Register(srv)
 }
diff --git a/internal/praefect/nodes/health_manager.go b/internal/praefect/nodes/health_manager.go
index 21cdfc9fe181d8ed4b8c7657ebaf45341321129b..7c2d3b72926d7b7b702439fa1dfed358d1525b04 100644
--- a/internal/praefect/nodes/health_manager.go
+++ b/internal/praefect/nodes/health_manager.go
@@ -39,6 +39,8 @@ type HealthManager struct {
 	// clients contains connections to the configured physical storages within each
 	// virtual storage.
 	clients HealthClients
+	// notReadyTracker tracks the time of the first not-ready event per node.
+	notReadyTracker sync.Map
 	// praefectName is the identifier of the Praefect running the HealthManager. It should
 	// be stable through the restarts as they are used to identify quorum members.
 	praefectName string
@@ -73,6 +75,7 @@ func NewHealthManager(
 			log.WithError(err).Error("checking health failed")
 			return nil
 		},
+		notReadyTracker:    sync.Map{},
 		praefectName:       praefectName,
 		healthCheckTimeout: healthcheckTimeout,
 		databaseTimeout: func(ctx context.Context) (context.Context, func()) {
@@ -102,8 +105,8 @@ func (hm *HealthManager) Run(ctx context.Context, ticker helper.Ticker) error {
 		case <-ctx.Done():
 			return ctx.Err()
 		case <-ticker.C():
-			virtualStorages, physicalStorages, healthy := hm.performHealthChecks(ctx)
-			if err := hm.updateHealthChecks(ctx, virtualStorages, physicalStorages, healthy); err != nil {
+			virtualStorages, physicalStorages, healthy, ready := hm.performHealthChecks(ctx)
+			if err := hm.updateHealthChecks(ctx, virtualStorages, physicalStorages, healthy, ready); err != nil {
 				if err := hm.handleError(err); err != nil {
 					return err
 				}
@@ -126,7 +129,7 @@ func (hm *HealthManager) HealthyNodes() map[string][]string {
 	return hm.locallyHealthy.Load().(map[string][]string)
 }
 
-func (hm *HealthManager) updateHealthChecks(ctx context.Context, virtualStorages, physicalStorages []string, healthy []bool) error {
+func (hm *HealthManager) updateHealthChecks(ctx context.Context, virtualStorages, physicalStorages []string, healthy []bool, ready []bool) error {
 	locallyHealthy := map[string][]string{}
 	for i := range virtualStorages {
 		if !healthy[i] {
@@ -143,11 +146,11 @@ func (hm *HealthManager) updateHealthChecks(ctx context.Context, virtualStorages
 
 	if _, err := hm.db.ExecContext(ctx, `
 INSERT INTO node_status (praefect_name, shard_name, node_name, last_contact_attempt_at, last_seen_active_at)
-SELECT $1, shard_name, node_name, NOW(), CASE WHEN is_healthy THEN NOW() ELSE NULL END
+SELECT $1, shard_name, node_name, NOW(), CASE WHEN is_ready THEN NOW() ELSE NULL END
 FROM (
 	SELECT unnest($2::text[]) AS shard_name,
 	       unnest($3::text[]) AS node_name,
-	       unnest($4::boolean[]) AS is_healthy
+	       unnest($4::boolean[]) AS is_ready
 	ORDER BY shard_name, node_name
 ) AS results
 ON CONFLICT (praefect_name, shard_name, node_name)
@@ -158,7 +161,7 @@ ON CONFLICT (praefect_name, shard_name, node_name)
 		hm.praefectName,
 		virtualStorages,
 		physicalStorages,
-		healthy,
+		ready,
 	); err != nil {
 		return fmt.Errorf("update checks: %w", err)
 	}
@@ -171,7 +174,7 @@ ON CONFLICT (praefect_name, shard_name, node_name)
 	return nil
 }
 
-func (hm *HealthManager) performHealthChecks(ctx context.Context) ([]string, []string, []bool) {
+func (hm *HealthManager) performHealthChecks(ctx context.Context) ([]string, []string, []bool, []bool) {
 	nodeCount := 0
 	for _, physicalStorages := range hm.clients {
 		nodeCount += len(physicalStorages)
@@ -180,6 +183,7 @@ func (hm *HealthManager) performHealthChecks(ctx context.Context) ([]string, []s
 	virtualStorages := make([]string, nodeCount)
 	physicalStorages := make([]string, nodeCount)
 	healthy := make([]bool, nodeCount)
+	ready := make([]bool, nodeCount)
 
 	var wg sync.WaitGroup
 	wg.Add(nodeCount)
@@ -198,16 +202,34 @@ func (hm *HealthManager) performHealthChecks(ctx context.Context) ([]string, []s
 			correlationID := correlation.SafeRandomID()
 			ctx := correlation.ContextWithCorrelation(ctx, correlationID)
 			resp, err := client.Check(ctx, &grpc_health_v1.HealthCheckRequest{})
+			logger := hm.log.WithFields(logrus.Fields{
+				logrus.ErrorKey:   err,
+				"virtual_storage": virtualStorages[i],
+				"storage":         physicalStorages[i],
+				"correlation_id":  correlationID,
+			})
 			if err != nil {
-				hm.log.WithFields(logrus.Fields{
-					logrus.ErrorKey:   err,
-					"virtual_storage": virtualStorages[i],
-					"storage":         physicalStorages[i],
-					"correlation_id":  correlationID,
-				}).Error("failed checking node health")
+				logger.Error("failed checking node health")
 			}
 
-			healthy[i] = resp != nil && resp.Status == grpc_health_v1.HealthCheckResponse_SERVING
+			// We consider any response from the Gitaly node as healthy.
+			healthy[i] = resp != nil
+			// We consider only `HealthCheckResponse_SERVING` as ready.
+			ready[i] = resp != nil && resp.Status == grpc_health_v1.HealthCheckResponse_SERVING
+
+			key := fmt.Sprintf("%s-%s", virtualStorages[i], physicalStorages[i])
+			if !ready[i] && healthy[i] {
+				actual, _ := hm.notReadyTracker.LoadOrStore(key, time.Now())
+				firstNotReady := actual.(time.Time)
+				if time.Since(firstNotReady) > 12*time.Second {
+					healthy[i] = false
+					logger.Warn("node not ready for more than 12s, marking it unhealthy")
+				} else {
+					logger.Warnf("node not ready for %s", time.Since(firstNotReady))
+				}
+			} else {
+				hm.notReadyTracker.Delete(key)
+			}
 		}(i, client)
 		i++
 	}
@@ -215,5 +237,5 @@ func (hm *HealthManager) performHealthChecks(ctx context.Context) ([]string, []s
 
 	wg.Wait()
 
-	return virtualStorages, physicalStorages, healthy
+	return virtualStorages, physicalStorages, healthy, ready
 }
diff --git a/internal/praefect/nodes/health_manager_test.go b/internal/praefect/nodes/health_manager_test.go
index f53297854f66dc42163e85165df803c1c484a79e..7466ac945d58dc7a25720e3e3b079e06e2628d62 100644
--- a/internal/praefect/nodes/health_manager_test.go
+++ b/internal/praefect/nodes/health_manager_test.go
@@ -637,20 +637,20 @@ func TestHealthManager_orderedWrites(t *testing.T) {
 	hm1 := NewHealthManager(testhelper.NewDiscardingLogger(t), tx1, praefectName, nil)
 	hm1.handleError = returnErr
-	require.NoError(t, hm1.updateHealthChecks(ctx, []string{virtualStorage}, []string{"gitaly-1"}, []bool{true}))
+	require.NoError(t, hm1.updateHealthChecks(ctx, []string{virtualStorage}, []string{"gitaly-1"}, []bool{true}, []bool{true}))
 
 	tx2Err := make(chan error, 1)
 	hm2 := NewHealthManager(testhelper.NewDiscardingLogger(t), tx2, praefectName, nil)
 	hm2.handleError = returnErr
 	go func() {
-		tx2Err <- hm2.updateHealthChecks(ctx, []string{virtualStorage, virtualStorage}, []string{"gitaly-2", "gitaly-1"}, []bool{true, true})
+		tx2Err <- hm2.updateHealthChecks(ctx, []string{virtualStorage, virtualStorage}, []string{"gitaly-2", "gitaly-1"}, []bool{true, true}, []bool{true, true})
 	}()
 
 	// Wait for tx2 to be blocked on the gitaly-1 lock acquired by tx1
 	testdb.WaitForBlockedQuery(t, ctx, db, "INSERT INTO node_status")
 
 	// Ensure tx1 can acquire lock on gitaly-2.
-	require.NoError(t, hm1.updateHealthChecks(ctx, []string{virtualStorage}, []string{"gitaly-2"}, []bool{true}))
+	require.NoError(t, hm1.updateHealthChecks(ctx, []string{virtualStorage}, []string{"gitaly-2"}, []bool{true}, []bool{true}))
 
 	// Committing tx1 releases locks and unblocks tx2.
 	require.NoError(t, tx1.Commit())
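
The Gitaly-side change registers a gRPC health server per listener and flips it to NOT_SERVING before the graceful stop, so that anything polling `grpc.health.v1.Health/Check` (Praefect here) can drain traffic first. Below is a minimal, standalone sketch of that shutdown ordering, not Gitaly's actual code: the listen address, the 2-second drain window, and the log messages are illustrative only, whereas the diff's stop action sleeps a fixed 20 seconds.

```go
// Sketch: mark a gRPC health server NOT_SERVING before GracefulStop so a
// health-checking proxy can stop routing new requests while in-flight RPCs finish.
package main

import (
	"log"
	"net"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/health"
	healthpb "google.golang.org/grpc/health/grpc_health_v1"
)

func main() {
	lis, err := net.Listen("tcp", "localhost:0") // illustrative address
	if err != nil {
		log.Fatal(err)
	}

	srv := grpc.NewServer()
	healthServer := health.NewServer()
	// The empty service name is the conventional "overall server" health entry.
	healthServer.SetServingStatus("", healthpb.HealthCheckResponse_SERVING)
	healthpb.RegisterHealthServer(srv, healthServer)

	go func() {
		if err := srv.Serve(lis); err != nil {
			log.Printf("serve: %v", err)
		}
	}()

	// ... later, on a shutdown signal:
	// Shutdown() sets every registered service to NOT_SERVING, so health checks
	// start failing while existing RPCs keep running.
	healthServer.Shutdown()

	// Illustrative drain window; the diff uses a fixed 20s sleep in Gitaly's stop action.
	time.Sleep(2 * time.Second)

	// GracefulStop waits for in-flight RPCs to complete before closing listeners.
	srv.GracefulStop()
}
```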
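On the Praefect side, the diff splits "healthy" (the node answered at all) from "ready" (the node answered SERVING), and demotes a node to unhealthy only after it has stayed not-ready past a grace window. The sketch below isolates that classification; `readinessTracker`, `classify`, and `graceWindow` are hypothetical names introduced here for illustration, since the real logic lives inline in `performHealthChecks` and hard-codes a 12-second window.

```go
// Sketch of the healthy/ready split: any response means reachable ("healthy"),
// only SERVING means routable ("ready"); prolonged not-ready becomes unhealthy.
package main

import (
	"fmt"
	"sync"
	"time"

	healthpb "google.golang.org/grpc/health/grpc_health_v1"
)

// readinessTracker is an illustrative stand-in for the diff's notReadyTracker sync.Map.
type readinessTracker struct {
	firstNotReady sync.Map // node key -> time.Time of the first not-ready observation
	graceWindow   time.Duration
}

// classify mirrors the logic added to performHealthChecks for a single node.
func (t *readinessTracker) classify(key string, resp *healthpb.HealthCheckResponse) (healthy, ready bool) {
	healthy = resp != nil
	ready = resp != nil && resp.Status == healthpb.HealthCheckResponse_SERVING

	if healthy && !ready {
		// Remember when this node first stopped being ready.
		v, _ := t.firstNotReady.LoadOrStore(key, time.Now())
		if time.Since(v.(time.Time)) > t.graceWindow {
			// Not ready for longer than the grace window: report it unhealthy
			// so failover logic can take over.
			healthy = false
		}
	} else {
		// Ready again (or fully unreachable): reset the tracker.
		t.firstNotReady.Delete(key)
	}
	return healthy, ready
}

func main() {
	tracker := &readinessTracker{graceWindow: 12 * time.Second}
	resp := &healthpb.HealthCheckResponse{Status: healthpb.HealthCheckResponse_NOT_SERVING}
	healthy, ready := tracker.classify("default/gitaly-1", resp)
	fmt.Printf("healthy=%v ready=%v\n", healthy, ready) // inside the grace window: healthy=true ready=false
}
```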