From 9331879e97898119b57ceda98285df9901616e0d Mon Sep 17 00:00:00 2001 From: Adrien Date: Wed, 21 Jun 2023 11:41:18 +0200 Subject: [PATCH 1/3] Rebalancer draft used in huggingface --- internal/cli/praefect/serve.go | 21 ++ internal/praefect/config/config.go | 9 + internal/praefect/datastore/datastore.go | 2 + internal/praefect/rebalancer/rebalancer.go | 355 +++++++++++++++++++++ internal/praefect/replicator.go | 19 ++ 5 files changed, 406 insertions(+) create mode 100644 internal/praefect/rebalancer/rebalancer.go diff --git a/internal/cli/praefect/serve.go b/internal/cli/praefect/serve.go index 5904dc072d6..d1fedab02ca 100644 --- a/internal/cli/praefect/serve.go +++ b/internal/cli/praefect/serve.go @@ -10,6 +10,8 @@ import ( "runtime/debug" "time" + "gitlab.com/gitlab-org/gitaly/v16/internal/praefect/rebalancer" + "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/client_golang/prometheus/promhttp" @@ -507,6 +509,25 @@ func server( } } + if interval := conf.Rebalancer.SchedulingInterval.Duration(); interval > 0 { + r := rebalancer.NewRebalancer( + logger, + db, + nodeSet, + healthChecker, + conf.StorageNames(), + conf.Rebalancer.BatchSize, + conf.Rebalancer.Ratio, + conf.Rebalancer.HistogramBuckets, + ) + promreg.MustRegister(r) + go func() { + if err := r.Run(ctx, helper.NewTimerTicker(interval)); err != nil { + logger.WithError(err).Error("rebalancer finished execution") + } + }() + } + if interval := conf.RepositoriesCleanup.RunInterval.Duration(); interval > 0 { if db != nil { go func() { diff --git a/internal/praefect/config/config.go b/internal/praefect/config/config.go index de415ecdc30..4aec641a8d6 100644 --- a/internal/praefect/config/config.go +++ b/internal/praefect/config/config.go @@ -219,6 +219,7 @@ type Config struct { GracefulStopTimeout duration.Duration `toml:"graceful_stop_timeout,omitempty"` RepositoriesCleanup RepositoriesCleanup `toml:"repositories_cleanup,omitempty"` Yamux Yamux `toml:"yamux,omitempty"` + Rebalancer Rebalancer `toml:"rebalancer,omitempty"` } // Yamux contains Yamux related configuration values. @@ -557,3 +558,11 @@ func DefaultRepositoriesCleanup() RepositoriesCleanup { RepositoriesInBatch: 16, } } + +// Rebalancer contains rebalancer specific configuration options. 
+type Rebalancer struct {
+    SchedulingInterval duration.Duration `toml:"scheduling_interval,omitempty"`
+    BatchSize          int               `toml:"batch_size"`
+    Ratio              float64           `toml:"ratio"`
+    HistogramBuckets   []float64         `toml:"histogram_buckets,omitempty"`
+}
diff --git a/internal/praefect/datastore/datastore.go b/internal/praefect/datastore/datastore.go
index dfa9d586ef7..17c6c74c97a 100644
--- a/internal/praefect/datastore/datastore.go
+++ b/internal/praefect/datastore/datastore.go
@@ -48,6 +48,8 @@ const (
 	DeleteReplica = ChangeType("delete_replica")
 	// RenameRepo is when a replication renames repo
 	RenameRepo = ChangeType("rename")
+	// MoveRepo is when a replication moves a repository to another storage within the virtual storage
+	MoveRepo = ChangeType("move")
 )
 
 // GetAllChangeTypes is used to define and provide all the various ChangeType
diff --git a/internal/praefect/rebalancer/rebalancer.go b/internal/praefect/rebalancer/rebalancer.go
new file mode 100644
index 00000000000..7c30cd69726
--- /dev/null
+++ b/internal/praefect/rebalancer/rebalancer.go
@@ -0,0 +1,355 @@
+package rebalancer
+
+import (
+    "context"
+    "fmt"
+
+    "github.com/prometheus/client_golang/prometheus"
+    "github.com/sirupsen/logrus"
+    "gitlab.com/gitlab-org/gitaly/v16/internal/helper"
+    "gitlab.com/gitlab-org/gitaly/v16/internal/praefect"
+    "gitlab.com/gitlab-org/gitaly/v16/internal/praefect/datastore/advisorylock"
+    "gitlab.com/gitlab-org/gitaly/v16/internal/praefect/datastore/glsql"
+    "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb"
+)
+
+// Rebalancer implements the logic for rebalancing repositories across the storages of a virtual storage
+type Rebalancer struct {
+    log               logrus.FieldLogger
+    db                glsql.Querier
+    nodes             praefect.NodeSet
+    healthChecker     praefect.HealthChecker
+    storages          map[string][]string
+    batchSize         int
+    ratio             float64
+    rebalanceDuration prometheus.Histogram
+    rebalanceCounter  *prometheus.CounterVec
+}
+
+func NewRebalancer(
+    log logrus.FieldLogger,
+    db glsql.Querier,
+    nodes praefect.NodeSet,
+    healthChecker praefect.HealthChecker,
+    storages map[string][]string,
+    batchSize int,
+    ratio float64,
+    buckets []float64,
+) *Rebalancer {
+    log = log.WithField("component", "rebalancer")
+
+    r := &Rebalancer{
+        log:           log,
+        db:            db,
+        nodes:         nodes,
+        healthChecker: healthChecker,
+        storages:      storages,
+        batchSize:     batchSize,
+        ratio:         ratio,
+        rebalanceDuration: prometheus.NewHistogram(prometheus.HistogramOpts{
+            Name:    "gitaly_praefect_rebalance_scheduling_seconds",
+            Help:    "The time spent performing a single rebalance run.",
+            Buckets: buckets,
+        }),
+        rebalanceCounter: prometheus.NewCounterVec(
+            prometheus.CounterOpts{
+                Name: "gitaly_praefect_rebalance_count",
+                Help: "Total number of scheduled rebalance jobs per storage.",
+            },
+            []string{"virtual_storage", "storage", "destination_storage"},
+        ),
+    }
+
+    return r
+}
+
+//nolint:revive // This is unintentionally missing documentation.
+func (r *Rebalancer) Describe(ch chan<- *prometheus.Desc) {
+    prometheus.DescribeByCollect(r, ch)
+}
+
+//nolint:revive // This is unintentionally missing documentation.
+func (r *Rebalancer) Collect(ch chan<- prometheus.Metric) {
+    r.rebalanceDuration.Collect(ch)
+    r.rebalanceCounter.Collect(ch)
+}
+
+// Run runs a rebalance pass on each tick the Ticker emits. Run returns
+// when the context is canceled, returning the error from the context.
+func (r *Rebalancer) Run(ctx context.Context, ticker helper.Ticker) error {
+    if r.batchSize == 0 {
+        return nil
+    }
+    r.log.WithField("storages", r.storages).Info("automatic rebalancer started")
+    defer r.log.Info("automatic rebalancer stopped")
+
+    defer ticker.Stop()
+
+    for {
+        ticker.Reset()
+
+        select {
+        case <-ctx.Done():
+            return ctx.Err()
+        case <-ticker.C():
+            if err := r.rebalance(ctx); err != nil {
+                r.log.WithError(err).Error("automatic rebalancer failed")
+                return err
+            }
+        }
+    }
+}
+
+func (r *Rebalancer) rebalance(ctx context.Context) error {
+    defer prometheus.NewTimer(r.rebalanceDuration).ObserveDuration()
+
+    healthyNodes := r.healthChecker.HealthyNodes()
+    nodes := make(map[string]map[string]praefect.Node)
+    for virtualStorage, storages := range r.storages {
+        healthy := healthyNodes[virtualStorage]
+        if len(healthy) != len(storages) {
+            r.log.WithField("virtual_storage", virtualStorage).Info("virtual storage contains unhealthy nodes, skipping")
+            continue
+        }
+        n := r.nodes[virtualStorage]
+        nodes[virtualStorage] = n
+    }
+
+    if len(nodes) == 0 {
+        return nil
+    }
+
+    for virtualStorage, nodes := range nodes {
+        if err := r.rebalanceStorage(ctx, virtualStorage, nodes); err != nil {
+            r.log.WithError(err).Errorf("error rebalancing %s", virtualStorage)
+        }
+    }
+
+    return nil
+}
+
+func (r *Rebalancer) rebalanceStorage(ctx context.Context, virtualStorage string, nodes map[string]praefect.Node) error {
+    logger := r.log.WithField("virtual_storage", virtualStorage)
+
+    stats, err := getNodeStats(ctx, nodes)
+    if err != nil {
+        return err
+    }
+
+    highest := getHighest(stats)
+    lowests := getLowests(stats, highest, r.ratio)
+    if len(lowests) == 0 {
+        logger.Infof("storage is already balanced: %+v", stats)
+        return nil
+    }
+
+    logger.Infof("rebalancing node %q to %+v. Nodes :%+v", highest.name, lowests, stats)
+
+    // We use the same advisory lock as the reconciler, as we rely on it to create the replication jobs
+    // Not waiting for it would let the difference grow too large, resulting in big reconcile queries
+    rows, err := r.db.QueryContext(ctx, `
+WITH
+
+rebalance_lock AS (
+    SELECT pg_try_advisory_xact_lock($1) AS acquired
+),
+
+-- Subquery to check whether replication jobs are already scheduled
+count_replication_queue AS (
+    SELECT count(1) as count FROM replication_queue
+),
+
+-- Select random repositories from highest storage
+-- We join with repository_assignments and storage_repositories in order to check if the repository is in a nominal state
+-- For that, we compare the current storage list with the wanted storage list
+-- We also join with storage_repositories in order to check if the generation is uniform across all storages
+repository AS (
+    SELECT repositories.repository_id, repositories.virtual_storage, repositories.relative_path
+    FROM repositories
+    JOIN repository_assignments USING (repository_id)
+    JOIN (
+        SELECT COUNT(distinct(generation)) AS distinct_generation, repository_id FROM storage_repositories
+        GROUP BY repository_id
+    ) AS g ON g.repository_id = repositories.repository_id
+
+    JOIN (
+        SELECT STRING_AGG(storage, ',' ORDER BY storage) AS storages, repository_id FROM repository_assignments
+        GROUP BY repository_id
+    ) AS ra ON ra.repository_id = repositories.repository_id
+
+    JOIN (
+        SELECT STRING_AGG(storage, ',' ORDER BY storage) AS storages, repository_id FROM storage_repositories
+        GROUP BY repository_id
+    ) AS sr ON sr.repository_id = repositories.repository_id
+
+    WHERE storage = $3
+    AND repositories.virtual_storage = $2
+    AND g.distinct_generation = 1 -- Check that generation is uniform
+    AND sr.storages = ra.storages -- Check that storages are the same
+    AND (SELECT count FROM count_replication_queue) = 0 -- Check that no replication job exists
+    AND (SELECT acquired FROM rebalance_lock) -- Check that we have acquired the lock
+    ORDER BY random()
+    LIMIT $5
+),
+
+-- Get all current physical storages; we will need them to select a source/target node for the replication
+existing_storages AS (
+    SELECT storage, repository.repository_id
+    FROM repository
+    JOIN storage_repositories USING (repository_id, virtual_storage, relative_path)
+    WHERE repository.repository_id = storage_repositories.repository_id
+),
+
+-- this function selects distinct rows from the repository table,
+-- cross joins them with the elements from the configured_storages array,
+-- applies filters to exclude certain combinations of storage and repository_id,
+-- and finally inserts the resulting rows into the repository_assignments table.
+-- The RETURNING * will allow us to filter action only on moved repositories
+created_assignments AS (
+    INSERT INTO repository_assignments
+    SELECT DISTINCT ON (repository_id) virtual_storage, repository.relative_path, storage, repository.repository_id
+    FROM repository
+    CROSS JOIN (
+        SELECT unnest($4::text[]) AS storage
+    ) AS configured_storages
+    -- Filter out existing physical storages; possibly redundant, but safer
+    WHERE storage NOT IN (
+        SELECT storage
+        FROM existing_storages
+        WHERE repository_id = repository.repository_id
+    )
+    ORDER BY repository_id, random()
+    RETURNING *
+),
+
+-- Delete the old replicas from repository_assignments
+-- We only delete rows matched by the created_assignments result
+removed_assignments AS (
+    DELETE
+    FROM repository_assignments
+    USING created_assignments
+    WHERE repository_assignments.repository_id = created_assignments.repository_id
+    AND repository_assignments.storage = $3
+    RETURNING *
+),
+
+-- This step will create all jobs in replication_queue
+-- The current reconciler takes a long time to reconcile physical storages with assignments,
+-- so we just generate the same replication jobs it would
+reconciliation_jobs AS (
+    INSERT INTO replication_queue (lock_id, job, meta)
+    SELECT
+        (virtual_storage || '|' || target_node_storage || '|' || relative_path),
+        to_jsonb(reconciliation_jobs),
+        jsonb_build_object('correlation_id', encode(random()::text::bytea, 'base64'))
+    FROM (
+        SELECT
+            repository_id,
+            virtual_storage,
+            relative_path,
+            $3 AS source_node_storage,
+            storage as target_node_storage,
+            'move' AS change
+        FROM created_assignments
+    ) AS reconciliation_jobs
+    RETURNING lock_id, meta, job
+),
+
+create_locks AS (
+    INSERT INTO replication_queue_lock(id)
+    SELECT lock_id
+    FROM reconciliation_jobs
+    ON CONFLICT (id) DO NOTHING
+)
+
+SELECT meta->>'correlation_id', job->>'repository_id', job->>'target_node_storage'
+FROM reconciliation_jobs;
+`, advisorylock.Reconcile, virtualStorage, highest.name, lowests, r.batchSize)
+    if err != nil {
+        return fmt.Errorf("query: %w", err)
+    }
+
+    defer func() {
+        if err := rows.Close(); err != nil {
+            r.log.WithError(err).Error("error closing rows")
+        }
+    }()
+
+    for rows.Next() {
+        var j rebalanceJob
+        if err := rows.Scan(&j.CorrelationID, &j.RepositoryID, &j.TargetStorage); err != nil {
+            return fmt.Errorf("scan: %w", err)
+        }
+
+        r.rebalanceCounter.WithLabelValues(virtualStorage, highest.name, j.TargetStorage).Inc()
+        logger.
+            WithField("rebalance_from", highest.name).
+            WithField("rebalance_to", j.TargetStorage).
+            WithField("repository", j.RepositoryID).
+            WithField("correlation_id", j.CorrelationID).
+            Infof("moving repo %d from %q to %q", j.RepositoryID, highest.name, j.TargetStorage)
+    }
+
+    if err = rows.Err(); err != nil {
+        return fmt.Errorf("rows.Err: %w", err)
+    }
+
+    logger.Infof("rebalance done")
+
+    return nil
+}
+
+type NodeStat struct {
+    name string
+    used float64
+}
+
+func getNodeStats(ctx context.Context, nodes map[string]praefect.Node) ([]NodeStat, error) {
+    var stats []NodeStat
+    for name, node := range nodes {
+        server := gitalypb.NewServerServiceClient(node.Connection)
+        stat, err := server.DiskStatistics(ctx, &gitalypb.DiskStatisticsRequest{})
+        if err != nil {
+            return nil, err
+        }
+        for _, r := range stat.StorageStatuses {
+            if r.StorageName == name {
+                stats = append(stats, NodeStat{r.StorageName, float64(r.Used) / float64(r.Used+r.Available)})
+            }
+        }
+    }
+    return stats, nil
+}
+
+func getLowests(nodes []NodeStat, highest NodeStat, threshold float64) []string {
+    lowests := make([]string, 0)
+
+    for _, node := range nodes {
+        if node.used <= highest.used*threshold {
+            lowests = append(lowests, node.name)
+        }
+    }
+
+    return lowests
+}
+
+func getHighest(nodes []NodeStat) NodeStat {
+    // Get the highest used node
+    highest := NodeStat{
+        used: 0.0,
+    }
+    for _, node := range nodes {
+        node := node
+        if node.used > highest.used {
+            highest = node
+        }
+    }
+    return highest
+}
+
+// rebalanceJob is an internal type used to format log messages
+type rebalanceJob struct {
+    RepositoryID  int64  `json:"repository_id"`
+    CorrelationID string `json:"correlation_id"`
+    TargetStorage string `json:"target_storage"`
+}
diff --git a/internal/praefect/replicator.go b/internal/praefect/replicator.go
index 5dbb8baac4c..95595bbae8a 100644
--- a/internal/praefect/replicator.go
+++ b/internal/praefect/replicator.go
@@ -639,6 +639,25 @@ func (r ReplMgr) processReplicationEvent(ctx context.Context, event datastore.Re
         err = r.replicator.Destroy(ctx, event, targetCC)
     case datastore.RenameRepo:
         err = r.replicator.Rename(ctx, event, targetCC)
+    case datastore.MoveRepo:
+        source, ok := r.nodes[event.Job.VirtualStorage][event.Job.SourceNodeStorage]
+        if !ok {
+            return fmt.Errorf("no connection to source node %q/%q", event.Job.VirtualStorage, event.Job.SourceNodeStorage)
+        }
+
+        ctx, err = storage.InjectGitalyServers(ctx, event.Job.SourceNodeStorage, source.Address, source.Token)
+        if err != nil {
+            return fmt.Errorf("inject Gitaly servers into context: %w", err)
+        }
+
+        // First we replicate
+        if err := r.replicator.Replicate(ctx, event, source.Connection, targetCC); err != nil {
+            return fmt.Errorf("error replicating: %w", err)
+        }
+
+        // Then we delete the source replica: point the job's target at the source before destroying it
+        event.Job.TargetNodeStorage = event.Job.SourceNodeStorage
+        err = r.replicator.Destroy(ctx, event, source.Connection)
     default:
         err = fmt.Errorf("unknown replication change type encountered: %q", event.Job.Change)
     }
-- 
GitLab

From 1ab01dda1ac1f8c42299f2af8f594fb8763b5180 Mon Sep 17 00:00:00 2001
From: Adrien
Date: Wed, 21 Jun 2023 17:45:35 +0200
Subject: [PATCH 2/3] update query

---
 internal/praefect/rebalancer/rebalancer.go | 63 +++++++++++-----------
 1 file changed, 33 insertions(+), 30 deletions(-)

diff --git a/internal/praefect/rebalancer/rebalancer.go b/internal/praefect/rebalancer/rebalancer.go
index 7c30cd69726..6e1e2a43174 100644
--- a/internal/praefect/rebalancer/rebalancer.go
+++ b/internal/praefect/rebalancer/rebalancer.go
@@ -3,7 +3,6 @@ package rebalancer
 import (
     "context"
     "fmt"
-
     "github.com/prometheus/client_golang/prometheus"
     "github.com/sirupsen/logrus"
"gitlab.com/gitlab-org/gitaly/v16/internal/helper" @@ -34,8 +33,8 @@ func NewRebalancer( storages map[string][]string, batchSize int, ratio float64, - buckets []float64, -) *Rebalancer { + buckets []float64) *Rebalancer { + log = log.WithField("component", "rebalancer") r := &Rebalancer{ @@ -143,35 +142,38 @@ func (r *Rebalancer) rebalanceStorage(ctx context.Context, virtualStorage string return nil } - logger.Infof("rebalancing node %q to %+v. Nodes :%+v", highest.name, lowests, stats) - // We use the same lock as the reconciler as we need him to create the replication jobs // Not waiting for him will create a too high difference resulting on big reconcile queries rows, err := r.db.QueryContext(ctx, ` -WITH - -rebalance_lock AS ( +-- Acquire a lock +WITH rebalance_lock AS ( SELECT pg_try_advisory_xact_lock($1) AS acquired ), -- subquery to check if we already have job scheduled count_replication_queue AS ( - SELECT count(1) as count FROM replication_queue + SELECT count(1) as count FROM replication_queue WHERE state = 'ready' OR state = 'in_progress' ), --- Select random repositories from highest storage -- We join with repository_assignments and storage_repositories in order to check if the repository is on a nominal state -- For that, we compare the current storage list with the wanted storage list -- We also join with storage_repositories in order to check if the generation is uniform within all storage -repository AS ( - SELECT repositories.repository_id, repositories.virtual_storage, repositories.relative_path +eligible_repositories AS ( + SELECT repositories.repository_id, repositories.virtual_storage, repositories.relative_path, sr.storages as storages FROM repositories JOIN repository_assignments USING (repository_id) + JOIN ( - SELECT COUNT(distinct(generation)) AS distinct_generation, repository_id FROM storage_repositories + SELECT COUNT(DISTINCT(generation)) AS distinct_generation, repository_id FROM storage_repositories GROUP BY repository_id ) AS g ON g.repository_id = repositories.repository_id + LEFT JOIN ( + SELECT (job->>'repository_id')::bigint AS repository_id + FROM replication_queue + GROUP BY repository_id + ) AS rq ON rq.repository_id = repositories.repository_id + JOIN ( SELECT STRING_AGG(storage, ',' ORDER BY storage) AS storages, repository_id FROM repository_assignments GROUP BY repository_id @@ -186,37 +188,33 @@ repository AS ( AND repositories.virtual_storage = $2 AND g.distinct_generation = 1 -- Check that generation is uniform AND sr.storages = ra.storages -- Check that storages are the same - AND (SELECT count FROM count_replication_queue) = 0 -- Check that no replication job exists + AND rq.repository_id IS NULL -- Check that no other job is running for this repo + AND (SELECT count FROM count_replication_queue) <= $6 -- Check that replication job are lower than limit AND (SELECT acquired FROM rebalance_lock) -- Check that we have acquired the lock - ORDER BY random() - LIMIT $5 ), --- Get all current physical storage, we will need to select a source_target_node from it for the replication -existing_storages AS ( - SELECT storage, repository.repository_id - FROM repository - JOIN storage_repositories USING (repository_id, virtual_storage, relative_path) - WHERE repository.repository_id = storage_repositories.repository_id +-- Select random repositories from highest storage +repositories AS ( + SELECT * FROM eligible_repositories + ORDER BY random() + LIMIT $5 ), --- this function selects distinct rows from the repository table, +-- this function selects 
distinct rows from the repositories subquery, -- cross joins them with the elements from the configured_storages array, -- applies filters to exclude certain combinations of storage and repository_id, -- and finally inserts the resulting rows into the repository_assignments table. -- The RETURNING * will allow us to filter action only on moved repositories created_assignments AS ( INSERT INTO repository_assignments - SELECT DISTINCT ON (repository_id) virtual_storage, repository.relative_path, storage, repository.repository_id - FROM repository + SELECT DISTINCT ON (repository_id) virtual_storage, repositories.relative_path, storage, repositories.repository_id + FROM repositories CROSS JOIN ( SELECT unnest($4::text[]) AS storage ) AS configured_storages -- Filter on existing physical storage, maybe useless but safer WHERE storage NOT IN ( - SELECT storage - FROM existing_storages - WHERE repository_id = repository.repository_id + SELECT unnest(string_to_array(repositories.storages, ',')) AS storages ) ORDER BY repository_id, random() RETURNING * @@ -264,7 +262,8 @@ create_locks AS ( SELECT meta->>'correlation_id', job->>'repository_id', job->>'target_node_storage' FROM reconciliation_jobs; -`, advisorylock.Reconcile, virtualStorage, highest.name, lowests, r.batchSize) +`, advisorylock.Reconcile, virtualStorage, highest.name, lowests, r.batchSize, 10) + if err != nil { return fmt.Errorf("query: %w", err) } @@ -275,7 +274,9 @@ FROM reconciliation_jobs; } }() + size := 0 for rows.Next() { + size++ var j rebalanceJob if err := rows.Scan(&j.CorrelationID, &j.RepositoryID, &j.TargetStorage); err != nil { return fmt.Errorf("scan: %w", err) @@ -294,7 +295,9 @@ FROM reconciliation_jobs; return fmt.Errorf("rows.Err: %w", err) } - logger.Infof("rebalance done") + if size > 0 { + logger.Infof("rebalancing node %q to %+v with %d events. 
Nodes: %+v", highest.name, lowests, size, stats) + } return nil } -- GitLab From eaada4b807e4f106a188f71da972421a41fb969b Mon Sep 17 00:00:00 2001 From: Adrien Date: Wed, 21 Jun 2023 18:32:49 +0200 Subject: [PATCH 3/3] remove useless subquery --- internal/praefect/rebalancer/rebalancer.go | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/internal/praefect/rebalancer/rebalancer.go b/internal/praefect/rebalancer/rebalancer.go index 6e1e2a43174..c8ef1e413dd 100644 --- a/internal/praefect/rebalancer/rebalancer.go +++ b/internal/praefect/rebalancer/rebalancer.go @@ -155,10 +155,11 @@ count_replication_queue AS ( SELECT count(1) as count FROM replication_queue WHERE state = 'ready' OR state = 'in_progress' ), +-- Select random repositories from highest storage -- We join with repository_assignments and storage_repositories in order to check if the repository is on a nominal state -- For that, we compare the current storage list with the wanted storage list -- We also join with storage_repositories in order to check if the generation is uniform within all storage -eligible_repositories AS ( +repositories AS ( SELECT repositories.repository_id, repositories.virtual_storage, repositories.relative_path, sr.storages as storages FROM repositories JOIN repository_assignments USING (repository_id) @@ -189,15 +190,11 @@ eligible_repositories AS ( AND g.distinct_generation = 1 -- Check that generation is uniform AND sr.storages = ra.storages -- Check that storages are the same AND rq.repository_id IS NULL -- Check that no other job is running for this repo + AND NOT $4::text[] <@ string_to_array(ra.storages, ',') -- Only get repository that are eligible to move to destination AND (SELECT count FROM count_replication_queue) <= $6 -- Check that replication job are lower than limit AND (SELECT acquired FROM rebalance_lock) -- Check that we have acquired the lock -), - --- Select random repositories from highest storage -repositories AS ( - SELECT * FROM eligible_repositories - ORDER BY random() - LIMIT $5 + ORDER BY random() + LIMIT $5 ), -- this function selects distinct rows from the repositories subquery, -- GitLab