diff --git a/config.toml.example b/config.toml.example
index 0bc4dc46748d79c4f566735af827996aca24fc91..e056123a50b12ef8a5c1de10accda206210b661b 100644
--- a/config.toml.example
+++ b/config.toml.example
@@ -161,6 +161,9 @@ self_signed_cert = false
 # # Optional: defaults to 1
 # # The number of goroutines performing write-ahead log backups.
 # wal_backup_worker_count = 1
+# # Optional: defaults to 5,242,880 bytes (5MiB). If 0, the driver will choose a reasonable default.
+# # The size of each part in a multipart upload to object storage.
+# buffer_size = 0
 
 # # Bundle-URI
 # [bundle_uri]
diff --git a/internal/backup/sink.go b/internal/backup/sink.go
index 557e8254d0e17ba9f44f000b66455dbc47646783..d0fafc3279c2d183412718d8afa46572c1838879 100644
--- a/internal/backup/sink.go
+++ b/internal/backup/sink.go
@@ -24,7 +24,7 @@ import (
 // The storage engine is chosen based on the provided uri.
 // It is the caller's responsibility to provide all required environment
 // variables in order to get properly initialized storage engine driver.
-func ResolveSink(ctx context.Context, uri string) (*Sink, error) {
+func ResolveSink(ctx context.Context, uri string, opts ...SinkOption) (*Sink, error) {
 	parsed, err := url.Parse(uri)
 	if err != nil {
 		return nil, err
@@ -36,23 +36,44 @@ func ResolveSink(ctx context.Context, uri string) (*Sink, error) {
 		// a full set of variations. Instead we trim it up to the service option only.
 		scheme = scheme[i+1:]
 	}
-
+	var sink *Sink
 	switch scheme {
 	case s3blob.Scheme, azureblob.Scheme, gcsblob.Scheme, memblob.Scheme:
-		return newSink(ctx, uri)
+		sink, err = newSink(ctx, uri)
 	case fileblob.Scheme, "":
 		// fileblob.OpenBucket requires a bare path without 'file://'.
-		return newFileblobSink(parsed.Path)
+		sink, err = newFileblobSink(parsed.Path)
 	default:
 		return nil, fmt.Errorf("unsupported sink URI scheme: %q", scheme)
 	}
+
+	if err != nil {
+		return nil, fmt.Errorf("failed to create object storage sink: %w", err)
+	}
+
+	for _, opt := range opts {
+		opt(sink)
+	}
+
+	return sink, nil
+}
+
+// WithBufferSize sets the buffer size for the sink.
+func WithBufferSize(size int) SinkOption {
+	return func(s *Sink) {
+		s.bufferSize = size
+	}
 }
 
 // Sink uses a storage engine that can be defined by the construction url on creation.
 type Sink struct {
-	bucket *blob.Bucket
+	bucket     *blob.Bucket
+	bufferSize int
 }
 
+// SinkOption is a function that configures a Sink.
+type SinkOption func(*Sink)
+
 // newSink returns initialized instance of Sink instance.
 func newSink(ctx context.Context, url string) (*Sink, error) {
 	bucket, err := blob.OpenBucket(ctx, url)
@@ -98,6 +119,7 @@ func (s Sink) Close() error {
 // bucket. It is the callers responsibility to Close the reader after usage.
 func (s Sink) GetWriter(ctx context.Context, relativePath string) (io.WriteCloser, error) {
 	writer, err := s.bucket.NewWriter(ctx, relativePath, &blob.WriterOptions{
+		BufferSize: s.bufferSize,
 		// 'no-store' - we don't want the backup to be cached as the content could be changed,
 		// so we always want a fresh and up to date data
 		// https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Cache-Control#cacheability
diff --git a/internal/cli/gitaly/serve.go b/internal/cli/gitaly/serve.go
index 9c90110e74478b90636719888438b28ddffa5c87..11b25e6015384648bc475d23e9fa497c9656d263 100644
--- a/internal/cli/gitaly/serve.go
+++ b/internal/cli/gitaly/serve.go
@@ -536,7 +536,7 @@ func run(appCtx *cli.Context, cfg config.Cfg, logger log.Logger) error {
 	var backupLocator backup.Locator
 	if cfg.Backup.GoCloudURL != "" {
 		var err error
-		backupSink, err = backup.ResolveSink(ctx, cfg.Backup.GoCloudURL)
+		backupSink, err = backup.ResolveSink(ctx, cfg.Backup.GoCloudURL, backup.WithBufferSize(cfg.Backup.BufferSize))
 		if err != nil {
 			return fmt.Errorf("resolve backup sink: %w", err)
 		}
diff --git a/internal/gitaly/config/config.go b/internal/gitaly/config/config.go
index 8b775f39b892ecebe3ff76667df3815b56431f2d..89b040ebb2f21d6092e614febc4f897bec8d3cbf 100644
--- a/internal/gitaly/config/config.go
+++ b/internal/gitaly/config/config.go
@@ -608,6 +608,8 @@ type BackupConfig struct {
 	WALWorkerCount uint `json:"wal_backup_worker_count,omitempty" toml:"wal_backup_worker_count,omitempty"`
 	// Layout determines how backup files are located.
 	Layout string `json:"layout,omitempty" toml:"layout,omitempty"`
+	// BufferSize specifies the size of the buffer used when uploading backup parts to object storage.
+	BufferSize int `json:"buffer_size,omitempty" toml:"buffer_size,omitempty"`
 }
 
 // Validate runs validation on all fields and returns any errors found.
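
A minimal usage sketch of the new option (illustration only, not part of the patch). It assumes the code lives inside the Gitaly module, since internal/backup cannot be imported from outside it; the module version in the import path, the bucket URL, the 8 MiB part size, and the object path are all hypothetical. Inside Gitaly itself the option is wired from cfg.Backup.BufferSize, as serve.go above shows.

// Sketch only: resolves a sink with an explicit multipart part size and writes
// one object through it. The import path assumes Gitaly's v16 module; adjust
// to the actual module version.
package example

import (
	"context"
	"fmt"

	"gitlab.com/gitlab-org/gitaly/v16/internal/backup"
)

func writeBackup(ctx context.Context, data []byte) error {
	// Hypothetical bucket URL; 8 MiB parts. Passing 0 instead would let the
	// object storage driver choose its own default part size.
	sink, err := backup.ResolveSink(ctx, "s3://my-backup-bucket", backup.WithBufferSize(8*1024*1024))
	if err != nil {
		return fmt.Errorf("resolve backup sink: %w", err)
	}
	defer sink.Close()

	// Hypothetical relative path within the bucket.
	w, err := sink.GetWriter(ctx, "relative/path/to/backup.bundle")
	if err != nil {
		return fmt.Errorf("get writer: %w", err)
	}
	if _, err := w.Write(data); err != nil {
		_ = w.Close()
		return fmt.Errorf("write backup: %w", err)
	}
	return w.Close()
}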