diff --git a/internal/cli/praefect/serve.go b/internal/cli/praefect/serve.go
index 5904dc072d6021cf7bbf31141c98bd0c7d0fdf0e..d1fedab02ca2157a7020353efa8d75174946a802 100644
--- a/internal/cli/praefect/serve.go
+++ b/internal/cli/praefect/serve.go
@@ -10,6 +10,8 @@ import (
 	"runtime/debug"
 	"time"
 
+	"gitlab.com/gitlab-org/gitaly/v16/internal/praefect/rebalancer"
+
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/client_golang/prometheus/promauto"
 	"github.com/prometheus/client_golang/prometheus/promhttp"
@@ -507,6 +509,25 @@ func server(
 		}
 	}
 
+	if interval := conf.Rebalancer.SchedulingInterval.Duration(); interval > 0 {
+		r := rebalancer.NewRebalancer(
+			logger,
+			db,
+			nodeSet,
+			healthChecker,
+			conf.StorageNames(),
+			conf.Rebalancer.BatchSize,
+			conf.Rebalancer.Ratio,
+			conf.Rebalancer.HistogramBuckets,
+		)
+		promreg.MustRegister(r)
+		go func() {
+			if err := r.Run(ctx, helper.NewTimerTicker(interval)); err != nil {
+				logger.WithError(err).Error("rebalancer finished execution")
+			}
+		}()
+	}
+
 	if interval := conf.RepositoriesCleanup.RunInterval.Duration(); interval > 0 {
 		if db != nil {
 			go func() {
diff --git a/internal/praefect/config/config.go b/internal/praefect/config/config.go
index de415ecdc306247bf0d7ad0728438633f75e369c..4aec641a8d68c6e44c193b1a7f4c6e1db37d7088 100644
--- a/internal/praefect/config/config.go
+++ b/internal/praefect/config/config.go
@@ -219,6 +219,7 @@ type Config struct {
 	GracefulStopTimeout duration.Duration   `toml:"graceful_stop_timeout,omitempty"`
 	RepositoriesCleanup RepositoriesCleanup `toml:"repositories_cleanup,omitempty"`
 	Yamux               Yamux               `toml:"yamux,omitempty"`
+	Rebalancer          Rebalancer          `toml:"rebalancer,omitempty"`
 }
 
 // Yamux contains Yamux related configuration values.
@@ -557,3 +558,11 @@ func DefaultRepositoriesCleanup() RepositoriesCleanup {
 		RepositoriesInBatch: 16,
 	}
 }
+
+// Rebalancer contains rebalancer-specific configuration options.
+type Rebalancer struct {
+	SchedulingInterval duration.Duration `toml:"scheduling_interval,omitempty"`
+	BatchSize          int               `toml:"batch_size"`
+	Ratio              float64           `toml:"ratio"`
+	HistogramBuckets   []float64         `toml:"histogram_buckets,omitempty"`
+}
diff --git a/internal/praefect/datastore/datastore.go b/internal/praefect/datastore/datastore.go
index dfa9d586ef7692d7765a9364806e25a16925386b..17c6c74c97ab2a85e1fbeece07e2ccbb8c1c659e 100644
--- a/internal/praefect/datastore/datastore.go
+++ b/internal/praefect/datastore/datastore.go
@@ -48,6 +48,8 @@ const (
 	DeleteReplica = ChangeType("delete_replica")
 	// RenameRepo is when a replication renames repo
 	RenameRepo = ChangeType("rename")
+	// MoveRepo is when a replication moves a repository from one storage to another within a virtual storage
+	MoveRepo = ChangeType("move")
 )
 
 // GetAllChangeTypes is used to define and provide all the various ChangeType
diff --git a/internal/praefect/rebalancer/rebalancer.go b/internal/praefect/rebalancer/rebalancer.go
new file mode 100644
index 0000000000000000000000000000000000000000..c8ef1e413ddc93f631211342431ae0a3a31abcd9
--- /dev/null
+++ b/internal/praefect/rebalancer/rebalancer.go
@@ -0,0 +1,355 @@
+package rebalancer
+
+import (
+	"context"
+	"fmt"
+
+	"github.com/prometheus/client_golang/prometheus"
+	"github.com/sirupsen/logrus"
+	"gitlab.com/gitlab-org/gitaly/v16/internal/helper"
+	"gitlab.com/gitlab-org/gitaly/v16/internal/praefect"
+	"gitlab.com/gitlab-org/gitaly/v16/internal/praefect/datastore/advisorylock"
+	"gitlab.com/gitlab-org/gitaly/v16/internal/praefect/datastore/glsql"
+	"gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb"
+)
+
+// Rebalancer implements the logic for rebalancing repositories across the storages of a virtual storage.
+type Rebalancer struct {
+	log               logrus.FieldLogger
+	db                glsql.Querier
+	nodes             praefect.NodeSet
+	healthChecker     praefect.HealthChecker
+	storages          map[string][]string
+	batchSize         int
+	ratio             float64
+	rebalanceDuration prometheus.Histogram
+	rebalanceCounter  *prometheus.CounterVec
+}
+
+// NewRebalancer returns a Rebalancer that schedules repository moves away from the most used
+// storage of each virtual storage.
+func NewRebalancer(
+	log logrus.FieldLogger,
+	db glsql.Querier,
+	nodes praefect.NodeSet,
+	healthChecker praefect.HealthChecker,
+	storages map[string][]string,
+	batchSize int,
+	ratio float64,
+	buckets []float64) *Rebalancer {
+
+	log = log.WithField("component", "rebalancer")
+
+	r := &Rebalancer{
+		log:           log,
+		db:            db,
+		nodes:         nodes,
+		healthChecker: healthChecker,
+		storages:      storages,
+		batchSize:     batchSize,
+		ratio:         ratio,
+		rebalanceDuration: prometheus.NewHistogram(prometheus.HistogramOpts{
+			Name:    "gitaly_praefect_rebalance_scheduling_seconds",
+			Help:    "The time spent performing a single rebalance run.",
+			Buckets: buckets,
+		}),
+		rebalanceCounter: prometheus.NewCounterVec(
+			prometheus.CounterOpts{
+				Name: "gitaly_praefect_rebalance_count",
+				Help: "Total number of rebalance jobs scheduled per storage.",
+			},
+			[]string{"virtual_storage", "storage", "destination_storage"},
+		),
+	}
+
+	return r
+}
+
+//nolint:revive // This is unintentionally missing documentation.
+func (r *Rebalancer) Describe(ch chan<- *prometheus.Desc) {
+	prometheus.DescribeByCollect(r, ch)
+}
+
+//nolint:revive // This is unintentionally missing documentation.
+func (r *Rebalancer) Collect(ch chan<- prometheus.Metric) {
+	r.rebalanceDuration.Collect(ch)
+	r.rebalanceCounter.Collect(ch)
+}
+
+// Run performs a rebalance on each tick the Ticker emits. Run returns
+// when the context is canceled, returning the error from the context.
+func (r *Rebalancer) Run(ctx context.Context, ticker helper.Ticker) error {
+	if r.batchSize == 0 {
+		return nil
+	}
+	r.log.WithField("storages", r.storages).Info("automatic rebalancer started")
+	defer r.log.Info("automatic rebalancer stopped")
+
+	defer ticker.Stop()
+
+	for {
+		ticker.Reset()
+
+		select {
+		case <-ctx.Done():
+			return ctx.Err()
+		case <-ticker.C():
+			if err := r.rebalance(ctx); err != nil {
+				r.log.WithError(err).Error("automatic rebalancer failed")
+				return err
+			}
+		}
+	}
+}
+
+func (r *Rebalancer) rebalance(ctx context.Context) error {
+	defer prometheus.NewTimer(r.rebalanceDuration).ObserveDuration()
+
+	healthyNodes := r.healthChecker.HealthyNodes()
+	nodes := make(map[string]map[string]praefect.Node)
+	for virtualStorage, storages := range r.storages {
+		healthy := healthyNodes[virtualStorage]
+		if len(healthy) != len(storages) {
+			r.log.WithField("virtual_storage", virtualStorage).Info("virtual storage contains unhealthy nodes, skipping")
+			continue
+		}
+		n := r.nodes[virtualStorage]
+		nodes[virtualStorage] = n
+	}
+
+	if len(nodes) == 0 {
+		return nil
+	}
+
+	for virtualStorage, nodes := range nodes {
+		if err := r.rebalanceStorage(ctx, virtualStorage, nodes); err != nil {
+			r.log.WithError(err).Errorf("error rebalancing %s", virtualStorage)
+		}
+	}
+
+	return nil
+}
+
+func (r *Rebalancer) rebalanceStorage(ctx context.Context, virtualStorage string, nodes map[string]praefect.Node) error {
+	logger := r.log.WithField("virtual_storage", virtualStorage)
+
+	stats, err := getNodeStats(ctx, nodes)
+	if err != nil {
+		return err
+	}
+
+	highest := getHighest(stats)
+	lowests := getLowests(stats, highest, r.ratio)
+	if len(lowests) == 0 {
+		logger.Infof("storage is already balanced: %+v", stats)
+		return nil
+	}
+
+	// We take the same advisory lock as the reconciler because we rely on it to create the replication jobs.
+	// Not waiting for it would let the storages drift too far apart, resulting in large reconciliation queries.
+	rows, err := r.db.QueryContext(ctx, `
+-- Acquire a lock
+WITH rebalance_lock AS (
+	SELECT pg_try_advisory_xact_lock($1) AS acquired
+),
+
+-- Subquery to check whether replication jobs are already scheduled
+count_replication_queue AS (
+	SELECT count(1) AS count FROM replication_queue WHERE state = 'ready' OR state = 'in_progress'
+),
+
+-- Select random repositories from the most used storage.
+-- We join with repository_assignments and storage_repositories to check that the repository is in a nominal state:
+-- the current storage list must match the wanted storage list.
+-- We also join with storage_repositories to check that the generation is uniform across all storages.
+repositories AS (
+	SELECT repositories.repository_id, repositories.virtual_storage, repositories.relative_path, sr.storages AS storages
+	FROM repositories
+	JOIN repository_assignments USING (repository_id)
+
+	JOIN (
+		SELECT COUNT(DISTINCT(generation)) AS distinct_generation, repository_id FROM storage_repositories
+		GROUP BY repository_id
+	) AS g ON g.repository_id = repositories.repository_id
+
+	LEFT JOIN (
+		SELECT (job->>'repository_id')::bigint AS repository_id
+		FROM replication_queue
+		GROUP BY repository_id
+	) AS rq ON rq.repository_id = repositories.repository_id
+
+	JOIN (
+		SELECT STRING_AGG(storage, ',' ORDER BY storage) AS storages, repository_id FROM repository_assignments
+		GROUP BY repository_id
+	) AS ra ON ra.repository_id = repositories.repository_id
+
+	JOIN (
+		SELECT STRING_AGG(storage, ',' ORDER BY storage) AS storages, repository_id FROM storage_repositories
+		GROUP BY repository_id
+	) AS sr ON sr.repository_id = repositories.repository_id
+
+	WHERE storage = $3
+	AND repositories.virtual_storage = $2
+	AND g.distinct_generation = 1 -- Check that the generation is uniform
+	AND sr.storages = ra.storages -- Check that assigned and actual storages are the same
+	AND rq.repository_id IS NULL -- Check that no other job is running for this repo
+	AND NOT $4::text[] <@ string_to_array(ra.storages, ',') -- Only select repositories that are eligible to move to a destination
+	AND (SELECT count FROM count_replication_queue) <= $6 -- Check that the number of replication jobs is below the limit
+	AND (SELECT acquired FROM rebalance_lock) -- Check that we have acquired the lock
+	ORDER BY random()
+	LIMIT $5
+),
+
+-- This CTE selects distinct rows from the repositories subquery,
+-- cross joins them with the elements of the configured_storages array,
+-- filters out certain combinations of storage and repository_id,
+-- and finally inserts the resulting rows into the repository_assignments table.
+-- RETURNING * lets us act only on the repositories that were actually moved.
+created_assignments AS (
+	INSERT INTO repository_assignments
+	SELECT DISTINCT ON (repository_id) virtual_storage, repositories.relative_path, storage, repositories.repository_id
+	FROM repositories
+	CROSS JOIN (
+		SELECT unnest($4::text[]) AS storage
+	) AS configured_storages
+	-- Filter out storages that already hold the repository; possibly redundant, but safer
+	WHERE storage NOT IN (
+		SELECT unnest(string_to_array(repositories.storages, ',')) AS storages
+	)
+	ORDER BY repository_id, random()
+	RETURNING *
+),
+
+-- Delete the old replicas from repository_assignments.
+-- We only delete rows matching the created_assignments result.
+removed_assignments AS (
+	DELETE
+	FROM repository_assignments
+	USING created_assignments
+	WHERE repository_assignments.repository_id = created_assignments.repository_id
+	AND repository_assignments.storage = $3
+	RETURNING *
+),
+
+-- This step creates all the jobs in replication_queue.
+-- The current reconciler takes a long time to reconcile physical storages with assignments,
+-- so we generate the same replication jobs it would.
+reconciliation_jobs AS (
+	INSERT INTO replication_queue (lock_id, job, meta)
+	SELECT
+		(virtual_storage || '|' || target_node_storage || '|' || relative_path),
+		to_jsonb(reconciliation_jobs),
+		jsonb_build_object('correlation_id', encode(random()::text::bytea, 'base64'))
+	FROM (
+		SELECT
+			repository_id,
+			virtual_storage,
+			relative_path,
+			$3 AS source_node_storage,
+			storage AS target_node_storage,
+			'move' AS change
+		FROM created_assignments
+	) AS reconciliation_jobs
+	RETURNING lock_id, meta, job
+),
+
+create_locks AS (
+	INSERT INTO replication_queue_lock(id)
+	SELECT lock_id
+	FROM reconciliation_jobs
+	ON CONFLICT (id) DO NOTHING
+)
+
+SELECT meta->>'correlation_id', job->>'repository_id', job->>'target_node_storage'
+FROM reconciliation_jobs;
+`, advisorylock.Reconcile, virtualStorage, highest.name, lowests, r.batchSize, 10)
+	if err != nil {
+		return fmt.Errorf("query: %w", err)
+	}
+
+	defer func() {
+		if err := rows.Close(); err != nil {
+			r.log.WithError(err).Error("error closing rows")
+		}
+	}()
+
+	size := 0
+	for rows.Next() {
+		size++
+		var j rebalanceJob
+		if err := rows.Scan(&j.CorrelationID, &j.RepositoryID, &j.TargetStorage); err != nil {
+			return fmt.Errorf("scan: %w", err)
+		}
+
+		r.rebalanceCounter.WithLabelValues(virtualStorage, highest.name, j.TargetStorage).Inc()
+		logger.
+			WithField("rebalance_from", highest.name).
+ WithField("rebalance_to", j.TargetStorage). + WithField("repository", j.RepositoryID). + WithField("correlation_id", j.CorrelationID). + Infof("moving repo %d from %q to %q", j.RepositoryID, highest.name, j.TargetStorage) + } + + if err = rows.Err(); err != nil { + return fmt.Errorf("rows.Err: %w", err) + } + + if size > 0 { + logger.Infof("rebalancing node %q to %+v with %d events. Nodes: %+v", highest.name, lowests, size, stats) + } + + return nil +} + +type NodeStat struct { + name string + used float64 +} + +func getNodeStats(ctx context.Context, nodes map[string]praefect.Node) ([]NodeStat, error) { + var stats []NodeStat + for name, node := range nodes { + server := gitalypb.NewServerServiceClient(node.Connection) + stat, err := server.DiskStatistics(ctx, &gitalypb.DiskStatisticsRequest{}) + if err != nil { + return nil, err + } + for _, r := range stat.StorageStatuses { + if r.StorageName == name { + stats = append(stats, NodeStat{r.StorageName, float64(r.Used) / float64(r.Used+r.Available)}) + } + } + } + return stats, nil +} + +func getLowests(nodes []NodeStat, highest NodeStat, threshold float64) []string { + lowests := make([]string, 0) + + for _, node := range nodes { + if node.used <= highest.used*threshold { + lowests = append(lowests, node.name) + } + } + + return lowests +} + +func getHighest(nodes []NodeStat) NodeStat { + // Get the highest used node + highest := NodeStat{ + used: 0.0, + } + for _, node := range nodes { + node := node + if node.used > highest.used { + highest = node + } + } + return highest +} + +// job is an internal type for formatting log messages +type rebalanceJob struct { + RepositoryID int64 `json:"repository_id"` + CorrelationID string `json:"correlation_id"` + TargetStorage string `json:"target_storage"` +} diff --git a/internal/praefect/replicator.go b/internal/praefect/replicator.go index 5dbb8baac4c151dee28c725a7e02ad0e16900431..95595bbae8af64d9497dffd7614148c3a81353ff 100644 --- a/internal/praefect/replicator.go +++ b/internal/praefect/replicator.go @@ -639,6 +639,25 @@ func (r ReplMgr) processReplicationEvent(ctx context.Context, event datastore.Re err = r.replicator.Destroy(ctx, event, targetCC) case datastore.RenameRepo: err = r.replicator.Rename(ctx, event, targetCC) + case datastore.MoveRepo: + source, ok := r.nodes[event.Job.VirtualStorage][event.Job.SourceNodeStorage] + if !ok { + return fmt.Errorf("no connection to source node %q/%q", event.Job.VirtualStorage, event.Job.SourceNodeStorage) + } + + ctx, err = storage.InjectGitalyServers(ctx, event.Job.SourceNodeStorage, source.Address, source.Token) + if err != nil { + return fmt.Errorf("inject Gitaly servers into context: %w", err) + } + + // First we replicate + if err := r.replicator.Replicate(ctx, event, source.Connection, targetCC); err != nil { + return fmt.Errorf("error replicating: %w", err) + } + + // Then we delete, we need to switch set target from source + event.Job.TargetNodeStorage = event.Job.SourceNodeStorage + err = r.replicator.Destroy(ctx, event, source.Connection) default: err = fmt.Errorf("unknown replication change type encountered: %q", event.Job.Change) }