From b077745be923e993695b0f1438f56e174149aeac Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Romuald=20Atchade=CC=81?= Date: Tue, 8 Jul 2025 15:48:12 -0400 Subject: [PATCH 1/4] Improve current implementation --- .../internal/upload/artifacts_uploader.go | 7 ++- .../internal/upload/multipart_uploader.go | 18 ++++--- workhorse/internal/upload/rewrite.go | 12 ++--- workhorse/internal/upload/uploads.go | 13 +++-- workhorse/internal/upload/uploads_test.go | 47 ++++++++++++------- workhorse/internal/upstream/routes.go | 10 ++-- 6 files changed, 68 insertions(+), 39 deletions(-) diff --git a/workhorse/internal/upload/artifacts_uploader.go b/workhorse/internal/upload/artifacts_uploader.go index a88cac9071584e..6e087874e45077 100644 --- a/workhorse/internal/upload/artifacts_uploader.go +++ b/workhorse/internal/upload/artifacts_uploader.go @@ -55,7 +55,12 @@ func Artifacts(myAPI *api.API, h http.Handler, p Preparer, cfg *config.Config) h tempDir: a.TempPath, SavedFileTracker: SavedFileTracker{Request: r}, } - interceptMultipartFiles(w, r, h, mg, &eagerAuthorizer{a}, p, cfg) + interceptMultipartFiles(w, r, h, &multipartUploaderOpts{ + formProcessor: mg, + fileAuthorizer: &eagerAuthorizer{a}, + preparer: p, + config: cfg, + }) }, "/authorize") } diff --git a/workhorse/internal/upload/multipart_uploader.go b/workhorse/internal/upload/multipart_uploader.go index 635740c2c79fdc..53fe1cea2b1a9d 100644 --- a/workhorse/internal/upload/multipart_uploader.go +++ b/workhorse/internal/upload/multipart_uploader.go @@ -20,9 +20,12 @@ import ( // with a reference to the temporary location. func Multipart(rails api.PreAuthorizer, h http.Handler, p Preparer, cfg *config.Config) http.Handler { return rails.PreAuthorizeHandler(func(w http.ResponseWriter, r *http.Request, a *api.Response) { - s := &SavedFileTracker{Request: r} - - interceptMultipartFiles(w, r, h, s, &eagerAuthorizer{a}, p, cfg) + interceptMultipartFiles(w, r, h, &multipartUploaderOpts{ + formProcessor: &SavedFileTracker{Request: r}, + fileAuthorizer: &eagerAuthorizer{a}, + preparer: p, + config: cfg, + }) }, "/authorize") } @@ -33,8 +36,11 @@ func Multipart(rails api.PreAuthorizer, h http.Handler, p Preparer, cfg *config. // using FixedPreAuthMultipart implies disk buffering. func FixedPreAuthMultipart(myAPI *api.API, h http.Handler, p Preparer, cfg *config.Config) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - s := &SavedFileTracker{Request: r} - fa := &apiAuthorizer{myAPI} - interceptMultipartFiles(w, r, h, s, fa, p, cfg) + interceptMultipartFiles(w, r, h, &multipartUploaderOpts{ + formProcessor: &SavedFileTracker{Request: r}, + fileAuthorizer: &apiAuthorizer{myAPI}, + preparer: p, + config: cfg, + }) }) } diff --git a/workhorse/internal/upload/rewrite.go b/workhorse/internal/upload/rewrite.go index 7f5c8323ddd0a0..86dff416055a70 100644 --- a/workhorse/internal/upload/rewrite.go +++ b/workhorse/internal/upload/rewrite.go @@ -64,7 +64,7 @@ type rewriter struct { finalizedFields map[string]bool } -func rewriteFormFilesFromMultipart(r *http.Request, writer *multipart.Writer, filter MultipartFormProcessor, fa fileAuthorizer, preparer Preparer, cfg *config.Config) error { +func rewriteFormFilesFromMultipart(r *http.Request, writer *multipart.Writer, multipartOpts *multipartUploaderOpts) error { // Create multipart reader reader, err := r.MultipartReader() if err != nil { @@ -72,13 +72,13 @@ func rewriteFormFilesFromMultipart(r *http.Request, writer *multipart.Writer, fi return err } - multipartUploadRequests.WithLabelValues(filter.Name()).Inc() + multipartUploadRequests.WithLabelValues(multipartOpts.formProcessor.Name()).Inc() rew := &rewriter{ writer: writer, - fileAuthorizer: fa, - Preparer: preparer, - filter: filter, + fileAuthorizer: multipartOpts.fileAuthorizer, + Preparer: multipartOpts.preparer, + filter: multipartOpts.formProcessor, finalizedFields: make(map[string]bool), } @@ -110,7 +110,7 @@ func rewriteFormFilesFromMultipart(r *http.Request, writer *multipart.Writer, fi } if filename != "" { - err = rew.handleFilePart(r, name, p, cfg) + err = rew.handleFilePart(r, name, p, multipartOpts.config) } else { err = rew.copyPart(r.Context(), name, p) } diff --git a/workhorse/internal/upload/uploads.go b/workhorse/internal/upload/uploads.go index fd37fd0c04f23e..e0c28a8be9b5fd 100644 --- a/workhorse/internal/upload/uploads.go +++ b/workhorse/internal/upload/uploads.go @@ -26,6 +26,13 @@ import ( "gitlab.com/gitlab-org/gitlab/workhorse/internal/zipartifacts" ) +type multipartUploaderOpts struct { + formProcessor MultipartFormProcessor + fileAuthorizer fileAuthorizer + preparer Preparer + config *config.Config +} + // RewrittenFieldsHeader is the HTTP header used to indicate multipart form fields // that have been rewritten by GitLab Workhorse. const RewrittenFieldsHeader = "Gitlab-Workhorse-Multipart-Fields" @@ -49,7 +56,7 @@ type MultipartFormProcessor interface { // interceptMultipartFiles is the core of the implementation of // Multipart. -func interceptMultipartFiles(w http.ResponseWriter, r *http.Request, h http.Handler, filter MultipartFormProcessor, fa fileAuthorizer, p Preparer, cfg *config.Config) { +func interceptMultipartFiles(w http.ResponseWriter, r *http.Request, h http.Handler, multipartOpts *multipartUploaderOpts) { var body bytes.Buffer writer := multipart.NewWriter(&body) defer func() { @@ -59,7 +66,7 @@ func interceptMultipartFiles(w http.ResponseWriter, r *http.Request, h http.Hand }() // Rewrite multipart form data - err := rewriteFormFilesFromMultipart(r, writer, filter, fa, p, cfg) + err := rewriteFormFilesFromMultipart(r, writer, multipartOpts) if err != nil { switch err { case http.ErrNotMultipart: @@ -101,7 +108,7 @@ func interceptMultipartFiles(w http.ResponseWriter, r *http.Request, h http.Hand r.ContentLength = int64(body.Len()) r.Header.Set("Content-Type", writer.FormDataContentType()) - if err := filter.Finalize(r.Context()); err != nil { + if err := multipartOpts.formProcessor.Finalize(r.Context()); err != nil { fail.Request(w, r, fmt.Errorf("handleFileUploads: Finalize: %v", err)) return } diff --git a/workhorse/internal/upload/uploads_test.go b/workhorse/internal/upload/uploads_test.go index f65c620f45acb7..a49848e6057227 100644 --- a/workhorse/internal/upload/uploads_test.go +++ b/workhorse/internal/upload/uploads_test.go @@ -79,10 +79,13 @@ func TestUploadHandlerForwardingRawData(t *testing.T) { response := httptest.NewRecorder() handler := newProxy(ts.URL) - fa := &eagerAuthorizer{&api.Response{TempPath: tempPath}} - preparer := &DefaultPreparer{} - interceptMultipartFiles(response, httpRequest, handler, nil, fa, preparer, config.NewDefaultConfig()) + interceptMultipartFiles(response, httpRequest, handler, &multipartUploaderOpts{ + formProcessor: nil, + fileAuthorizer: &eagerAuthorizer{&api.Response{TempPath: tempPath}}, + preparer: &DefaultPreparer{}, + config: config.NewDefaultConfig(), + }) require.Equal(t, 202, response.Code) require.Equal(t, "RESPONSE", response.Body.String(), "response body") @@ -146,11 +149,12 @@ func TestUploadHandlerRewritingMultiPartData(t *testing.T) { response := httptest.NewRecorder() handler := newProxy(ts.URL) - - fa := &eagerAuthorizer{&api.Response{TempPath: tempPath}} - preparer := &DefaultPreparer{} - - interceptMultipartFiles(response, httpRequest, handler, &testFormProcessor{}, fa, preparer, config.NewDefaultConfig()) + interceptMultipartFiles(response, httpRequest, handler, &multipartUploaderOpts{ + formProcessor: &testFormProcessor{}, + fileAuthorizer: &eagerAuthorizer{&api.Response{TempPath: tempPath}}, + preparer: &DefaultPreparer{}, + config: config.NewDefaultConfig(), + }) require.Equal(t, 202, response.Code) cancel() // this will trigger an async cleanup @@ -307,10 +311,12 @@ func TestUploadProcessingFile(t *testing.T) { httpRequest.Header.Set("Content-Type", writer.FormDataContentType()) response := httptest.NewRecorder() - fa := &eagerAuthorizer{test.preauth} - preparer := &DefaultPreparer{} - - interceptMultipartFiles(response, httpRequest, nilHandler, &testFormProcessor{}, fa, preparer, config.NewDefaultConfig()) + interceptMultipartFiles(response, httpRequest, nilHandler, &multipartUploaderOpts{ + formProcessor: &testFormProcessor{}, + fileAuthorizer: &eagerAuthorizer{test.preauth}, + preparer: &DefaultPreparer{}, + config: config.NewDefaultConfig(), + }) require.Equal(t, 200, response.Code) require.Equal(t, "test", string(test.content(t))) @@ -408,7 +414,12 @@ func TestUnauthorizedMultipartHeader(t *testing.T) { defer ts.Close() api := api.NewAPI(helper.URLMustParse(ts.URL), "123", http.DefaultTransport) - interceptMultipartFiles(response, httpRequest, nilHandler, &testFormProcessor{}, &apiAuthorizer{api}, &DefaultPreparer{}, config.NewDefaultConfig()) + interceptMultipartFiles(response, httpRequest, nilHandler, &multipartUploaderOpts{ + formProcessor: &testFormProcessor{}, + fileAuthorizer: &apiAuthorizer{api}, + preparer: &DefaultPreparer{}, + config: config.NewDefaultConfig(), + }) require.Equal(t, 401, response.Code) require.Equal(t, "401 Unauthorized\n", response.Body.String()) @@ -626,10 +637,12 @@ func waitUntilDeleted(t *testing.T, path string) { func testInterceptMultipartFiles(t *testing.T, w http.ResponseWriter, r *http.Request, h http.Handler, filter MultipartFormProcessor) { t.Helper() - fa := &eagerAuthorizer{&api.Response{TempPath: t.TempDir()}} - preparer := &DefaultPreparer{} - - interceptMultipartFiles(w, r, h, filter, fa, preparer, config.NewDefaultConfig()) + interceptMultipartFiles(w, r, h, &multipartUploaderOpts{ + formProcessor: filter, + fileAuthorizer: &eagerAuthorizer{&api.Response{TempPath: t.TempDir()}}, + preparer: &DefaultPreparer{}, + config: config.NewDefaultConfig(), + }) } func setupMultipleFiles(t *testing.T) (*http.Request, *httptest.ResponseRecorder) { diff --git a/workhorse/internal/upstream/routes.go b/workhorse/internal/upstream/routes.go index 44514b486070da..a35f93fff49952 100644 --- a/workhorse/internal/upstream/routes.go +++ b/workhorse/internal/upstream/routes.go @@ -285,6 +285,7 @@ func configureRoutes(u *upstream) { preparer := upload.NewObjectStoragePreparer(u.Config) requestBodyUploader := upload.RequestBody(api, signingProxy, preparer) mimeMultipartUploader := upload.Multipart(api, signingProxy, preparer, &u.Config) + artifactsUploader := contentEncodingHandler(upload.Artifacts(api, signingProxy, preparer, &u.Config)) tempfileMultipartProxy := upload.FixedPreAuthMultipart(api, proxy, preparer, &u.Config) ciAPIProxyQueue := queueing.QueueRequests("ci_api_job_requests", tempfileMultipartProxy, u.APILimit, u.APIQueueLimit, u.APIQueueTimeout, prometheus.DefaultRegisterer) @@ -326,16 +327,13 @@ func configureRoutes(u *upstream) { // CI Artifacts u.route("POST", - newRoute(apiPattern+`v4/jobs/[0-9]+/artifacts\z`, "api_jobs_request", railsBackend), - contentEncodingHandler(upload.Artifacts(api, signingProxy, preparer, &u.Config))), + newRoute(apiPattern+`v4/jobs/[0-9]+/artifacts\z`, "api_jobs_request", railsBackend), artifactsUploader), // ActionCable websocket - u.wsRoute(newRoute(`^/-/cable\z`, "action_cable", railsBackend), - cableProxy), + u.wsRoute(newRoute(`^/-/cable\z`, "action_cable", railsBackend), cableProxy), // Terminal websocket - u.wsRoute( - newRoute(projectPattern+`-/environments/[0-9]+/terminal.ws\z`, "project_environments_terminal_ws", railsBackend), + u.wsRoute(newRoute(projectPattern+`-/environments/[0-9]+/terminal.ws\z`, "project_environments_terminal_ws", railsBackend), channel.Handler(api)), u.wsRoute(newRoute(projectPattern+`-/jobs/[0-9]+/terminal.ws\z`, "project_jobs_terminal_ws", railsBackend), channel.Handler(api)), -- GitLab From 36404ed593926d7fbcae2f1cf927715c8b648184 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Romuald=20Atchade=CC=81?= Date: Tue, 8 Jul 2025 20:04:40 -0400 Subject: [PATCH 2/4] Process metadata upload asynchronously --- .../internal/upload/artifacts_uploader.go | 95 +++++++++++++++---- 1 file changed, 78 insertions(+), 17 deletions(-) diff --git a/workhorse/internal/upload/artifacts_uploader.go b/workhorse/internal/upload/artifacts_uploader.go index 6e087874e45077..4fd33068be510f 100644 --- a/workhorse/internal/upload/artifacts_uploader.go +++ b/workhorse/internal/upload/artifacts_uploader.go @@ -42,18 +42,33 @@ type artifactsUploadProcessor struct { processLSIF bool tempDir string + metadataProcessing chan *metadataInfo + SavedFileTracker } +type metadataInfo struct { + fields map[string]string + // Artifact added here if we want to add a retry mechanism in case of failure to generate the metadata. + artifact *destination.FileHandler + metadata *destination.FileHandler + err error +} + // Artifacts is like a Multipart but specific for artifacts upload. func Artifacts(myAPI *api.API, h http.Handler, p Preparer, cfg *config.Config) http.Handler { return myAPI.PreAuthorizeHandler(func(w http.ResponseWriter, r *http.Request, a *api.Response) { + ctx, cancel := context.WithCancel(r.Context()) + defer cancel() + format := r.URL.Query().Get(ArtifactFormatKey) mg := &artifactsUploadProcessor{ format: format, processLSIF: a.ProcessLsif, tempDir: a.TempPath, SavedFileTracker: SavedFileTracker{Request: r}, + + metadataProcessing: make(chan *metadataInfo, 1), } interceptMultipartFiles(w, r, h, &multipartUploaderOpts{ formProcessor: mg, @@ -61,9 +76,31 @@ func Artifacts(myAPI *api.API, h http.Handler, p Preparer, cfg *config.Config) h preparer: p, config: cfg, }) + + go mg.metadataAsyncFinalize(ctx) }, "/authorize") } +func (a *artifactsUploadProcessor) metadataAsyncFinalize(ctx context.Context) { + select { + case metadataInfo, ok := <-a.metadataProcessing: + if !ok { + return + } + if metadataInfo.err != nil { + log.ContextLogger(ctx).WithError(metadataInfo.err).Error("Failed to process zip-metadata") + return + } + + if metadataInfo.metadata != nil { + a.sendMetadataToRailsBackend(ctx, metadataInfo) + } + case <-ctx.Done(): + log.ContextLogger(ctx).Warning("metadata async finalizer finished due to context done") + return + } +} + func (a *artifactsUploadProcessor) generateMetadataFromZip(ctx context.Context, file *destination.FileHandler, readerLimit int64) (*destination.FileHandler, error) { //nolint: funlen metaOpts := &destination.UploadOpts{ LocalTempPath: a.tempDir, @@ -134,6 +171,23 @@ func (a *artifactsUploadProcessor) generateMetadataFromZip(ctx context.Context, } func (a *artifactsUploadProcessor) ProcessFile(ctx context.Context, formName string, file *destination.FileHandler, writer *multipart.Writer, cfg *config.Config) error { + err := a.processArtifact(ctx, formName, file) + if err != nil { + return err + } + + if !strings.EqualFold(a.format, ArtifactFormatZip) && a.format != ArtifactFormatDefault { + return nil + } + + go func(ctx context.Context, file *destination.FileHandler, cfg *config.Config) { + a.processMetadata(ctx, file, cfg) + }(ctx, file, cfg) + + return nil +} + +func (a *artifactsUploadProcessor) processArtifact(ctx context.Context, formName string, file *destination.FileHandler) error { // ProcessFile for artifacts requires file form-data field name to eq `file` if formName != "file" { return fmt.Errorf("invalid form field: %q", formName) @@ -150,31 +204,38 @@ func (a *artifactsUploadProcessor) ProcessFile(ctx context.Context, formName str default: } - if !strings.EqualFold(a.format, ArtifactFormatZip) && a.format != ArtifactFormatDefault { - return nil - } + return nil +} + +func (a *artifactsUploadProcessor) processMetadata(ctx context.Context, file *destination.FileHandler, cfg *config.Config) { + info := &metadataInfo{artifact: file} + defer close(a.metadataProcessing) metadata, err := a.generateMetadataFromZip(ctx, file, cfg.MetadataConfig.ZipReaderLimitBytes) if err != nil { - return err + info.err = fmt.Errorf("generating zip metadata error: %v", err) + a.metadataProcessing <- info + return } - if metadata != nil { - fields, err := metadata.GitLabFinalizeFields("metadata") - if err != nil { - return fmt.Errorf("finalize metadata field error: %v", err) - } - - for k, v := range fields { - if err := writer.WriteField(k, v); err != nil { - return fmt.Errorf("write metadata field error: %v", err) - } - } + if metadata == nil { + a.metadataProcessing <- &metadataInfo{} + return + } - a.Track("metadata", metadata.LocalPath) + info.metadata = metadata + fields, err := metadata.GitLabFinalizeFields("metadata") + if err != nil { + info.err = fmt.Errorf("finalize metadata field error: %v", err) + a.metadataProcessing <- info + return } - return nil + info.fields = fields + a.metadataProcessing <- info +} + +func (a *artifactsUploadProcessor) sendMetadataToRailsBackend(ctx context.Context, info *metadataInfo) { } func (a *artifactsUploadProcessor) Name() string { return "artifacts" } -- GitLab From 27e2f509b168ea2c9c40468e589e03e5fcd3b27f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Romuald=20Atchade=CC=81?= Date: Wed, 9 Jul 2025 12:11:51 -0400 Subject: [PATCH 3/4] Remove metadata handling from workhorse --- .../internal/upload/artifacts_uploader.go | 175 +----------------- workhorse/internal/upload/rewrite.go | 7 +- .../internal/upload/saved_file_tracker.go | 3 +- workhorse/internal/upload/uploads.go | 2 +- 4 files changed, 9 insertions(+), 178 deletions(-) diff --git a/workhorse/internal/upload/artifacts_uploader.go b/workhorse/internal/upload/artifacts_uploader.go index 4fd33068be510f..83efee76a04755 100644 --- a/workhorse/internal/upload/artifacts_uploader.go +++ b/workhorse/internal/upload/artifacts_uploader.go @@ -4,24 +4,13 @@ import ( "context" "fmt" "io" - "mime/multipart" "net/http" - "os" - "os/exec" - "strconv" "strings" - "syscall" - - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promauto" - "gitlab.com/gitlab-org/labkit/log" "gitlab.com/gitlab-org/gitlab/workhorse/internal/api" "gitlab.com/gitlab-org/gitlab/workhorse/internal/config" - "gitlab.com/gitlab-org/gitlab/workhorse/internal/helper/command" "gitlab.com/gitlab-org/gitlab/workhorse/internal/lsif_transformer/parser" "gitlab.com/gitlab-org/gitlab/workhorse/internal/upload/destination" - "gitlab.com/gitlab-org/gitlab/workhorse/internal/zipartifacts" ) // Sent by the runner: https://gitlab.com/gitlab-org/gitlab-runner/blob/c24da19ecce8808d9d2950896f70c94f5ea1cc2e/network/gitlab.go#L580 @@ -31,44 +20,23 @@ const ( ArtifactFormatDefault = "" ) -var zipSubcommandsErrorsCounter = promauto.NewCounterVec( - prometheus.CounterOpts{ - Name: "gitlab_workhorse_zip_subcommand_errors_total", - Help: "Errors coming from subcommands used for processing ZIP archives", - }, []string{"error"}) - type artifactsUploadProcessor struct { format string processLSIF bool tempDir string - metadataProcessing chan *metadataInfo - SavedFileTracker } -type metadataInfo struct { - fields map[string]string - // Artifact added here if we want to add a retry mechanism in case of failure to generate the metadata. - artifact *destination.FileHandler - metadata *destination.FileHandler - err error -} - // Artifacts is like a Multipart but specific for artifacts upload. func Artifacts(myAPI *api.API, h http.Handler, p Preparer, cfg *config.Config) http.Handler { return myAPI.PreAuthorizeHandler(func(w http.ResponseWriter, r *http.Request, a *api.Response) { - ctx, cancel := context.WithCancel(r.Context()) - defer cancel() - format := r.URL.Query().Get(ArtifactFormatKey) mg := &artifactsUploadProcessor{ format: format, processLSIF: a.ProcessLsif, tempDir: a.TempPath, SavedFileTracker: SavedFileTracker{Request: r}, - - metadataProcessing: make(chan *metadataInfo, 1), } interceptMultipartFiles(w, r, h, &multipartUploaderOpts{ formProcessor: mg, @@ -76,118 +44,10 @@ func Artifacts(myAPI *api.API, h http.Handler, p Preparer, cfg *config.Config) h preparer: p, config: cfg, }) - - go mg.metadataAsyncFinalize(ctx) }, "/authorize") } -func (a *artifactsUploadProcessor) metadataAsyncFinalize(ctx context.Context) { - select { - case metadataInfo, ok := <-a.metadataProcessing: - if !ok { - return - } - if metadataInfo.err != nil { - log.ContextLogger(ctx).WithError(metadataInfo.err).Error("Failed to process zip-metadata") - return - } - - if metadataInfo.metadata != nil { - a.sendMetadataToRailsBackend(ctx, metadataInfo) - } - case <-ctx.Done(): - log.ContextLogger(ctx).Warning("metadata async finalizer finished due to context done") - return - } -} - -func (a *artifactsUploadProcessor) generateMetadataFromZip(ctx context.Context, file *destination.FileHandler, readerLimit int64) (*destination.FileHandler, error) { //nolint: funlen - metaOpts := &destination.UploadOpts{ - LocalTempPath: a.tempDir, - } - if metaOpts.LocalTempPath == "" { - metaOpts.LocalTempPath = os.TempDir() - } - - fileName := file.LocalPath - if fileName == "" { - fileName = file.RemoteURL - } - - logWriter := log.ContextLogger(ctx).Writer() - defer func() { - if closeErr := logWriter.Close(); closeErr != nil { - log.ContextLogger(ctx).WithError(closeErr).Error("failed to close gitlab-zip-metadata log writer") - } - }() - - zipMd := exec.CommandContext(ctx, "gitlab-zip-metadata", "-zip-reader-limit", strconv.FormatInt(readerLimit, 10), fileName) - zipMd.Stderr = logWriter - zipMd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true} - - zipMdOut, err := zipMd.StdoutPipe() - if err != nil { - return nil, err - } - defer func() { - if err = zipMdOut.Close(); err != nil { - log.ContextLogger(ctx).WithError(err).Error("Failed to close zip-metadata stdout") - } - }() - - if err = zipMd.Start(); err != nil { - return nil, err - } - defer func() { - if err = command.KillProcessGroup(zipMd); err != nil { - log.ContextLogger(ctx).WithError(err).Error("Failed to kill zip-metadata process group") - } - }() - - fh, err := destination.Upload(ctx, zipMdOut, -1, "metadata.gz", metaOpts) - if err != nil { - return nil, err - } - - if err := zipMd.Wait(); err != nil { - st, ok := command.ExitStatus(err) - - if !ok { - return nil, err - } - - zipSubcommandsErrorsCounter.WithLabelValues(zipartifacts.ErrorLabelByCode(st)).Inc() - - if st == zipartifacts.CodeNotZip { - return nil, nil - } - - if st == zipartifacts.CodeLimitsReached { - return nil, zipartifacts.ErrBadMetadata - } - } - - return fh, nil -} - -func (a *artifactsUploadProcessor) ProcessFile(ctx context.Context, formName string, file *destination.FileHandler, writer *multipart.Writer, cfg *config.Config) error { - err := a.processArtifact(ctx, formName, file) - if err != nil { - return err - } - - if !strings.EqualFold(a.format, ArtifactFormatZip) && a.format != ArtifactFormatDefault { - return nil - } - - go func(ctx context.Context, file *destination.FileHandler, cfg *config.Config) { - a.processMetadata(ctx, file, cfg) - }(ctx, file, cfg) - - return nil -} - -func (a *artifactsUploadProcessor) processArtifact(ctx context.Context, formName string, file *destination.FileHandler) error { +func (a *artifactsUploadProcessor) ProcessFile(ctx context.Context, formName string, file *destination.FileHandler) error { // ProcessFile for artifacts requires file form-data field name to eq `file` if formName != "file" { return fmt.Errorf("invalid form field: %q", formName) @@ -204,38 +64,11 @@ func (a *artifactsUploadProcessor) processArtifact(ctx context.Context, formName default: } - return nil -} - -func (a *artifactsUploadProcessor) processMetadata(ctx context.Context, file *destination.FileHandler, cfg *config.Config) { - info := &metadataInfo{artifact: file} - defer close(a.metadataProcessing) - - metadata, err := a.generateMetadataFromZip(ctx, file, cfg.MetadataConfig.ZipReaderLimitBytes) - if err != nil { - info.err = fmt.Errorf("generating zip metadata error: %v", err) - a.metadataProcessing <- info - return - } - - if metadata == nil { - a.metadataProcessing <- &metadataInfo{} - return - } - - info.metadata = metadata - fields, err := metadata.GitLabFinalizeFields("metadata") - if err != nil { - info.err = fmt.Errorf("finalize metadata field error: %v", err) - a.metadataProcessing <- info - return + if !strings.EqualFold(a.format, ArtifactFormatZip) && a.format != ArtifactFormatDefault { + return nil } - info.fields = fields - a.metadataProcessing <- info -} - -func (a *artifactsUploadProcessor) sendMetadataToRailsBackend(ctx context.Context, info *metadataInfo) { + return nil } func (a *artifactsUploadProcessor) Name() string { return "artifacts" } diff --git a/workhorse/internal/upload/rewrite.go b/workhorse/internal/upload/rewrite.go index 86dff416055a70..10b4c76cfaadf7 100644 --- a/workhorse/internal/upload/rewrite.go +++ b/workhorse/internal/upload/rewrite.go @@ -16,7 +16,6 @@ import ( "github.com/prometheus/client_golang/prometheus/promauto" "gitlab.com/gitlab-org/gitlab/workhorse/internal/api" - "gitlab.com/gitlab-org/gitlab/workhorse/internal/config" "gitlab.com/gitlab-org/gitlab/workhorse/internal/upload/destination" "gitlab.com/gitlab-org/gitlab/workhorse/internal/upload/exif" ) @@ -110,7 +109,7 @@ func rewriteFormFilesFromMultipart(r *http.Request, writer *multipart.Writer, mu } if filename != "" { - err = rew.handleFilePart(r, name, p, multipartOpts.config) + err = rew.handleFilePart(r, name, p) } else { err = rew.copyPart(r.Context(), name, p) } @@ -130,7 +129,7 @@ func parseAndNormalizeContentDisposition(header textproto.MIMEHeader) (string, s return params["name"], params["filename"] } -func (rew *rewriter) handleFilePart(r *http.Request, name string, p *multipart.Part, cfg *config.Config) error { +func (rew *rewriter) handleFilePart(r *http.Request, name string, p *multipart.Part) error { if rew.filter.Count() >= maxFilesAllowed { return ErrTooManyFilesUploaded } @@ -187,7 +186,7 @@ func (rew *rewriter) handleFilePart(r *http.Request, name string, p *multipart.P multipartFileUploadBytes.WithLabelValues(rew.filter.Name()).Add(float64(fh.Size)) - return rew.filter.ProcessFile(ctx, name, fh, rew.writer, cfg) + return rew.filter.ProcessFile(ctx, name, fh) } func (rew *rewriter) copyPart(ctx context.Context, name string, p *multipart.Part) error { diff --git a/workhorse/internal/upload/saved_file_tracker.go b/workhorse/internal/upload/saved_file_tracker.go index ea321be62d1b79..ea8489c9a17128 100644 --- a/workhorse/internal/upload/saved_file_tracker.go +++ b/workhorse/internal/upload/saved_file_tracker.go @@ -7,7 +7,6 @@ import ( "mime/multipart" "net/http" - "gitlab.com/gitlab-org/gitlab/workhorse/internal/config" "gitlab.com/gitlab-org/gitlab/workhorse/internal/secret" "gitlab.com/gitlab-org/gitlab/workhorse/internal/upload/destination" "gitlab.com/gitlab-org/gitlab/workhorse/internal/upload/exif" @@ -34,7 +33,7 @@ func (s *SavedFileTracker) Count() int { // ProcessFile processes the uploaded file and tracks its local path. // It returns an error if the field name has already been processed. -func (s *SavedFileTracker) ProcessFile(_ context.Context, fieldName string, file *destination.FileHandler, _ *multipart.Writer, _ *config.Config) error { +func (s *SavedFileTracker) ProcessFile(_ context.Context, fieldName string, file *destination.FileHandler) error { if _, ok := s.rewrittenFields[fieldName]; ok { return fmt.Errorf("the %v field has already been processed", fieldName) } diff --git a/workhorse/internal/upload/uploads.go b/workhorse/internal/upload/uploads.go index e0c28a8be9b5fd..6bb114eda49602 100644 --- a/workhorse/internal/upload/uploads.go +++ b/workhorse/internal/upload/uploads.go @@ -46,7 +46,7 @@ type MultipartClaims struct { // MultipartFormProcessor abstracts away implementation differences // between generic MIME multipart file uploads and CI artifact uploads. type MultipartFormProcessor interface { - ProcessFile(ctx context.Context, formName string, file *destination.FileHandler, writer *multipart.Writer, cfg *config.Config) error + ProcessFile(ctx context.Context, formName string, file *destination.FileHandler) error ProcessField(ctx context.Context, formName string, writer *multipart.Writer) error Finalize(ctx context.Context) error Name() string -- GitLab From 3cb8f42354a3963b1211bb434e51360f7497f65d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Romuald=20Atchade=CC=81?= Date: Wed, 9 Jul 2025 12:16:51 -0400 Subject: [PATCH 4/4] Fix failing tests --- spec/features/file_uploads/ci_artifact_spec.rb | 2 +- workhorse/_support/lint_last_known_acceptable.txt | 1 - workhorse/internal/upload/artifacts_store_test.go | 2 +- workhorse/internal/upload/artifacts_upload_test.go | 12 ++++++------ workhorse/internal/upload/saved_file_tracker_test.go | 7 +++---- 5 files changed, 11 insertions(+), 13 deletions(-) diff --git a/spec/features/file_uploads/ci_artifact_spec.rb b/spec/features/file_uploads/ci_artifact_spec.rb index eaf4e5e2d1cc83..c6c8d02b985844 100644 --- a/spec/features/file_uploads/ci_artifact_spec.rb +++ b/spec/features/file_uploads/ci_artifact_spec.rb @@ -20,7 +20,7 @@ end RSpec.shared_examples 'for ci artifact' do - it { expect { subject }.to change { ::Ci::JobArtifact.count }.by(2) } + it { expect { subject }.to change { ::Ci::JobArtifact.count }.by(1) } it { expect(subject.code).to eq(201) } end diff --git a/workhorse/_support/lint_last_known_acceptable.txt b/workhorse/_support/lint_last_known_acceptable.txt index 47dacd956aec0c..36761a18b6c773 100644 --- a/workhorse/_support/lint_last_known_acceptable.txt +++ b/workhorse/_support/lint_last_known_acceptable.txt @@ -60,7 +60,6 @@ internal/testhelper/testhelper.go:258:39: G115: integer overflow conversion uint internal/testhelper/testhelper.go:272:39: G115: integer overflow conversion uintptr -> int (gosec) internal/transport/transport.go:147: Function 'validateIPAddress' is too long (77 > 60) (funlen) internal/upload/artifacts_upload_test.go:49:1: cognitive complexity 32 of func `testArtifactsUploadServer` is high (> 20) (gocognit) -internal/upload/artifacts_uploader.go:82:11: G204: Subprocess launched with a potential tainted input or cmd arguments (gosec) internal/upload/destination/destination.go:72: internal/upload/destination/destination.go:72: Line contains TODO/BUG/FIXME/NOTE/OPTIMIZE/HACK: "TODO: remove `data` these once rails ful..." (godox) internal/upload/destination/destination.go:117: Function 'Upload' is too long (62 > 60) (funlen) internal/upload/destination/multi_hash.go:4:2: G501: Blocklisted import crypto/md5: weak cryptographic primitive (gosec) diff --git a/workhorse/internal/upload/artifacts_store_test.go b/workhorse/internal/upload/artifacts_store_test.go index 6fee9f0ba597cf..8f4f454bba503d 100644 --- a/workhorse/internal/upload/artifacts_store_test.go +++ b/workhorse/internal/upload/artifacts_store_test.go @@ -128,7 +128,7 @@ func TestUploadHandlerSendingToExternalStorage(t *testing.T) { contentBuffer, contentType := createTestMultipartForm(t, archiveData) response := testUploadArtifacts(t, contentType, ts.URL+Path+qs, &contentBuffer) require.Equal(t, http.StatusOK, response.Code) - testhelper.RequireResponseHeader(t, response, MetadataHeaderKey, MetadataHeaderPresent) + testhelper.RequireResponseHeader(t, response, MetadataHeaderKey, MetadataHeaderMissing) require.Equal(t, 1, storeServerCalled, "store should be called only once") require.Equal(t, 1, responseProcessorCalled, "response processor should be called only once") }) diff --git a/workhorse/internal/upload/artifacts_upload_test.go b/workhorse/internal/upload/artifacts_upload_test.go index 92e35a6c535cd5..f00ba364819a88 100644 --- a/workhorse/internal/upload/artifacts_upload_test.go +++ b/workhorse/internal/upload/artifacts_upload_test.go @@ -175,7 +175,7 @@ func testUploadArtifacts(t *testing.T, contentType, url string, body io.Reader) return response } -func TestUploadHandlerAddingMetadata(t *testing.T) { +func TestUploadHandlerWithoutMetadata(t *testing.T) { testCases := []struct { desc string format string @@ -206,12 +206,12 @@ func TestUploadHandlerAddingMetadata(t *testing.T) { assert.NoError(t, err) rewrittenFields := token.Claims.(*MultipartClaims).RewrittenFields - assert.Len(t, rewrittenFields, 2) + assert.Len(t, rewrittenFields, 1) assert.Contains(t, rewrittenFields, "file") - assert.Contains(t, rewrittenFields, "metadata") + assert.NotContains(t, rewrittenFields, "metadata") assert.Contains(t, r.PostForm, "file.gitlab-workhorse-upload") - assert.Contains(t, r.PostForm, "metadata.gitlab-workhorse-upload") + assert.NotContains(t, r.PostForm, "metadata.gitlab-workhorse-upload") }, ) @@ -225,7 +225,7 @@ func TestUploadHandlerAddingMetadata(t *testing.T) { response := testUploadArtifacts(t, s.writer.FormDataContentType(), s.url, s.buffer) assert.Equal(t, http.StatusOK, response.Code) - testhelper.RequireResponseHeader(t, response, MetadataHeaderKey, MetadataHeaderPresent) + testhelper.RequireResponseHeader(t, response, MetadataHeaderKey, MetadataHeaderMissing) }) } } @@ -301,7 +301,7 @@ func TestLsifFileProcessing(t *testing.T) { response := testUploadArtifacts(t, s.writer.FormDataContentType(), s.url, s.buffer) require.Equal(t, http.StatusOK, response.Code) - testhelper.RequireResponseHeader(t, response, MetadataHeaderKey, MetadataHeaderPresent) + testhelper.RequireResponseHeader(t, response, MetadataHeaderKey, MetadataHeaderMissing) } func TestInvalidLsifFileProcessing(t *testing.T) { diff --git a/workhorse/internal/upload/saved_file_tracker_test.go b/workhorse/internal/upload/saved_file_tracker_test.go index a82d9eebc4e53d..06993de28d218f 100644 --- a/workhorse/internal/upload/saved_file_tracker_test.go +++ b/workhorse/internal/upload/saved_file_tracker_test.go @@ -10,7 +10,6 @@ import ( "github.com/stretchr/testify/require" - "gitlab.com/gitlab-org/gitlab/workhorse/internal/config" "gitlab.com/gitlab-org/gitlab/workhorse/internal/testhelper" "gitlab.com/gitlab-org/gitlab/workhorse/internal/upload/destination" ) @@ -28,7 +27,7 @@ func TestSavedFileTracking(t *testing.T) { require.Equal(t, "accelerate", tracker.Name()) file := &destination.FileHandler{} - tracker.ProcessFile(ctx, "test", file, nil, config.NewDefaultConfig()) + tracker.ProcessFile(ctx, "test", file) require.Equal(t, 1, tracker.Count()) tracker.Finalize(ctx) @@ -45,9 +44,9 @@ func TestDuplicatedFileProcessing(t *testing.T) { tracker := SavedFileTracker{} file := &destination.FileHandler{} - require.NoError(t, tracker.ProcessFile(context.Background(), "file", file, nil, config.NewDefaultConfig())) + require.NoError(t, tracker.ProcessFile(context.Background(), "file", file)) - err := tracker.ProcessFile(context.Background(), "file", file, nil, config.NewDefaultConfig()) + err := tracker.ProcessFile(context.Background(), "file", file) require.Error(t, err) require.Equal(t, "the file field has already been processed", err.Error()) } -- GitLab