From 9464e58d90487794cea5c916d20c35002b58dbba Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kamil=20Trzci=C5=84ski?= Date: Wed, 14 Oct 2020 10:50:13 +0200 Subject: [PATCH 1/4] Allow refreshing existing cached archives when accessed If the archive is broken (which should in fact never happen), we fail the first request and mark the cache entry as invalid. It will be refreshed on the next try. --- internal/httprange/http_reader.go | 13 ++----- internal/httprange/resource.go | 56 +++++++++++++++++++++++++++-- internal/httprange/resource_test.go | 19 ++++++---- internal/vfs/zip/archive.go | 26 ++++++++++---- internal/vfs/zip/archive_test.go | 12 +++---- internal/vfs/zip/vfs.go | 33 ++++++++++++----- internal/vfs/zip/vfs_test.go | 2 +- 7 files changed, 119 insertions(+), 42 deletions(-) diff --git a/internal/httprange/http_reader.go b/internal/httprange/http_reader.go index 467256a0f..d49411365 100644 --- a/internal/httprange/http_reader.go +++ b/internal/httprange/http_reader.go @@ -106,21 +106,12 @@ func (r *Reader) prepareRequest() (*http.Request, error) { return nil, ErrInvalidRange } - req, err := http.NewRequest("GET", r.Resource.URL, nil) + req, err := r.Resource.Request() if err != nil { return nil, err } req = req.WithContext(r.ctx) - - if r.Resource.ETag != "" { - req.Header.Set("ETag", r.Resource.ETag) - } else if r.Resource.LastModified != "" { - // Last-Modified should be a fallback mechanism in case ETag is not present - // https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Last-Modified - req.Header.Set("If-Range", r.Resource.LastModified) - } - req.Header.Set("Range", fmt.Sprintf("bytes=%d-%d", r.offset, r.rangeStart+r.rangeSize-1)) return req, nil @@ -133,6 +124,7 @@ func (r *Reader) setResponse(res *http.Response) error { // some servers return 200 OK for bytes=0- // TODO: should we handle r.Resource.Last-Modified as well? 
if r.offset > 0 || r.Resource.ETag != "" && r.Resource.ETag != res.Header.Get("ETag") { + r.Resource.setError(ErrContentHasChanged) return ErrContentHasChanged } case http.StatusNotFound: @@ -141,6 +133,7 @@ func (r *Reader) setResponse(res *http.Response) error { // Requested `Range` request succeeded https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/206 break case http.StatusRequestedRangeNotSatisfiable: + r.Resource.setError(ErrRangeRequestsNotSupported) return ErrRangeRequestsNotSupported default: return fmt.Errorf("httprange: read response %d: %q", res.StatusCode, res.Status) diff --git a/internal/httprange/resource.go b/internal/httprange/resource.go index d2dbd340c..f4e8c6806 100644 --- a/internal/httprange/resource.go +++ b/internal/httprange/resource.go @@ -8,15 +8,67 @@ import ( "net/http" "strconv" "strings" + "sync" ) // Resource represents any HTTP resource that can be read by a GET operation. // It holds the resource's URL and metadata about it. type Resource struct { - URL string + url string ETag string LastModified string Size int64 + err error + + lock sync.RWMutex +} + +func (r *Resource) GetURL() string { + r.lock.Lock() + defer r.lock.Unlock() + + return r.url +} + +func (r *Resource) SetURL(url string) { + r.lock.Lock() + defer r.lock.Unlock() + + r.url = url +} + +func (r *Resource) Err() error { + r.lock.RLock() + defer r.lock.RUnlock() + + return r.err +} + +func (r *Resource) setError(err error) { + r.lock.Lock() + defer r.lock.Unlock() + + r.err = err +} + +func (r *Resource) Request() (*http.Request, error) { + r.lock.RLock() + defer r.lock.RUnlock() + + req, err := http.NewRequest("GET", r.url, nil) + if err != nil { + return nil, err + } + + if r.ETag != "" { + req.Header.Set("ETag", r.ETag) + } else if r.LastModified != "" { + // Last-Modified should be a fallback mechanism in case ETag is not present + // https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Last-Modified + req.Header.Set("If-Range", r.LastModified) + } + 
+ return req, nil } func NewResource(ctx context.Context, url string) (*Resource, error) { @@ -44,7 +96,7 @@ func NewResource(ctx context.Context, url string) (*Resource, error) { }() resource := &Resource{ - URL: url, + url: url, ETag: res.Header.Get("ETag"), LastModified: res.Header.Get("Last-Modified"), } diff --git a/internal/httprange/resource_test.go b/internal/httprange/resource_test.go index ace8f92cd..c5790c97f 100644 --- a/internal/httprange/resource_test.go +++ b/internal/httprange/resource_test.go @@ -10,8 +10,8 @@ import ( ) func TestNewResource(t *testing.T) { - resource := Resource{ - URL: "/some/resource", + resource := &Resource{ + url: "/some/resource", ETag: "etag", LastModified: "Wed, 21 Oct 2015 07:28:00 GMT", Size: 1, @@ -21,7 +21,7 @@ func TestNewResource(t *testing.T) { url string status int contentRange string - want Resource + want *Resource expectedErrMsg string }{ "status_ok": { @@ -33,10 +33,10 @@ func TestNewResource(t *testing.T) { url: "/some/resource", status: http.StatusPartialContent, contentRange: "bytes 200-1000/67589", - want: func() Resource { - r := resource + want: func() *Resource { + r := *resource r.Size = 67589 - return r + return &r }(), }, "status_partial_content_invalid_content_range": { @@ -44,26 +44,31 @@ func TestNewResource(t *testing.T) { status: http.StatusPartialContent, contentRange: "invalid", expectedErrMsg: "invalid `Content-Range`:", + want: resource, }, "status_partial_content_content_range_not_a_number": { url: "/some/resource", status: http.StatusPartialContent, contentRange: "bytes 200-1000/notanumber", expectedErrMsg: "invalid `Content-Range`:", + want: resource, }, "StatusRequestedRangeNotSatisfiable": { url: "/some/resource", status: http.StatusRequestedRangeNotSatisfiable, expectedErrMsg: ErrRangeRequestsNotSupported.Error(), + want: resource, }, "not_found": { url: "/some/resource", status: http.StatusNotFound, expectedErrMsg: ErrNotFound.Error(), + want: resource, }, "invalid_url": { url: "/%", 
expectedErrMsg: "invalid URL escape", + want: resource, }, } @@ -86,7 +91,7 @@ func TestNewResource(t *testing.T) { } require.NoError(t, err) - require.Contains(t, got.URL, tt.want.URL) + require.Contains(t, got.url, tt.want.url) require.Equal(t, tt.want.LastModified, got.LastModified) require.Equal(t, tt.want.ETag, got.ETag) require.Equal(t, tt.want.Size, got.Size) diff --git a/internal/vfs/zip/archive.go b/internal/vfs/zip/archive.go index df1757644..017ce1db3 100644 --- a/internal/vfs/zip/archive.go +++ b/internal/vfs/zip/archive.go @@ -38,7 +38,6 @@ var ( type zipArchive struct { fs *zipVFS - path string once sync.Once done chan struct{} openTimeout time.Duration @@ -54,10 +53,9 @@ type zipArchive struct { directories map[string]*zip.FileHeader } -func newArchive(fs *zipVFS, path string, openTimeout time.Duration) *zipArchive { +func newArchive(fs *zipVFS, openTimeout time.Duration) *zipArchive { return &zipArchive{ fs: fs, - path: path, done: make(chan struct{}), files: make(map[string]*zip.File), directories: make(map[string]*zip.FileHeader), @@ -66,7 +64,21 @@ func newArchive(fs *zipVFS, path string, openTimeout time.Duration) *zipArchive } } -func (a *zipArchive) openArchive(parentCtx context.Context) (err error) { +func (a *zipArchive) isValid() bool { + if a.resource != nil { + return a.resource.Err() == nil + } + + // until resource is opened, it is valid + return true +} + +func (a *zipArchive) openArchive(parentCtx context.Context, url string) (err error) { + // always try to update URL on resource + if a.resource != nil { + a.resource.SetURL(url) + } + // return early if openArchive was done already in a concurrent request if ok, err := a.openStatus(); ok { return err @@ -78,7 +90,7 @@ func (a *zipArchive) openArchive(parentCtx context.Context) (err error) { a.once.Do(func() { // read archive once in its own routine with its own timeout // if parentCtx is canceled, readArchive will continue regardless and will be cached in memory - go a.readArchive() 
+ go a.readArchive(url) }) // wait for readArchive to be done or return if the parent context is canceled @@ -100,14 +112,14 @@ func (a *zipArchive) openArchive(parentCtx context.Context) (err error) { // readArchive creates an httprange.Resource that can read the archive's contents and stores a slice of *zip.Files // that can be accessed later when calling any of th vfs.VFS operations -func (a *zipArchive) readArchive() { +func (a *zipArchive) readArchive(url string) { defer close(a.done) // readArchive with a timeout separate from openArchive's ctx, cancel := context.WithTimeout(context.Background(), a.openTimeout) defer cancel() - a.resource, a.err = httprange.NewResource(ctx, a.path) + a.resource, a.err = httprange.NewResource(ctx, url) if a.err != nil { metrics.ZipOpened.WithLabelValues("error").Inc() return diff --git a/internal/vfs/zip/archive_test.go b/internal/vfs/zip/archive_test.go index 2474d419a..e6543be31 100644 --- a/internal/vfs/zip/archive_test.go +++ b/internal/vfs/zip/archive_test.go @@ -246,11 +246,11 @@ func TestArchiveCanBeReadAfterOpenCtxCanceled(t *testing.T) { defer cleanup() fs := New().(*zipVFS) - zip := newArchive(fs, testServerURL+"/public.zip", time.Second) + zip := newArchive(fs, time.Second) ctx, cancel := context.WithCancel(context.Background()) cancel() - err := zip.openArchive(ctx) + err := zip.openArchive(ctx, testServerURL+"/public.zip") require.EqualError(t, err, context.Canceled.Error()) <-zip.done @@ -269,9 +269,9 @@ func TestReadArchiveFails(t *testing.T) { defer cleanup() fs := New().(*zipVFS) - zip := newArchive(fs, testServerURL+"/unkown.html", time.Second) + zip := newArchive(fs, time.Second) - err := zip.openArchive(context.Background()) + err := zip.openArchive(context.Background(), testServerURL+"/unkown.html") require.Error(t, err) require.Contains(t, err.Error(), httprange.ErrNotFound.Error()) @@ -289,9 +289,9 @@ func openZipArchive(t *testing.T, requests *int64) (*zipArchive, func()) { testServerURL, cleanup := 
newZipFileServerURL(t, "group/zip.gitlab.io/public-without-dirs.zip", requests) fs := New().(*zipVFS) - zip := newArchive(fs, testServerURL+"/public.zip", time.Second) + zip := newArchive(fs, time.Second) - err := zip.openArchive(context.Background()) + err := zip.openArchive(context.Background(), testServerURL+"/public.zip") require.NoError(t, err) // public/ public/index.html public/404.html public/symlink.html diff --git a/internal/vfs/zip/vfs.go b/internal/vfs/zip/vfs.go index 3b69d1e9e..af2aabb83 100644 --- a/internal/vfs/zip/vfs.go +++ b/internal/vfs/zip/vfs.go @@ -110,6 +110,15 @@ func New(options ...Option) vfs.VFS { return zipVFS } +func (fs *zipVFS) keyFromURL(url *url.URL) string { + // We assume that our URL is https://.../artifacts.zip?content-sign=aaa + // our caching key is `https://.../artifacts.zip` + newURL := *url + newURL.RawQuery = "" + newURL.Fragment = "" + return newURL.String() +} + // Root opens an archive given a URL path and returns an instance of zipArchive // that implements the vfs.VFS interface. // To avoid using locks, the findOrOpenArchive function runs inside of a for @@ -123,9 +132,11 @@ func (fs *zipVFS) Root(ctx context.Context, path string) (vfs.Root, error) { return nil, err } + key := fs.keyFromURL(urlPath) + // we do it in loop to not use any additional locks for { - root, err := fs.findOrOpenArchive(ctx, urlPath.String()) + root, err := fs.findOrOpenArchive(ctx, key, urlPath.String()) if err == errAlreadyCached { continue } @@ -147,15 +158,15 @@ func (fs *zipVFS) Name() string { // otherwise creates the archive entry in a cache and try to save it, // if saving fails it's because the archive has already been cached // (e.g. 
by another concurrent request) -func (fs *zipVFS) findOrCreateArchive(ctx context.Context, path string) (*zipArchive, error) { +func (fs *zipVFS) findOrCreateArchive(ctx context.Context, key string) (*zipArchive, error) { // This needs to happen in lock to ensure that // concurrent access will not remove it // it is needed due to the bug https://github.com/patrickmn/go-cache/issues/48 fs.cacheLock.Lock() defer fs.cacheLock.Unlock() - archive, expiry, found := fs.cache.GetWithExpiration(path) - if found { + archive, expiry, found := fs.cache.GetWithExpiration(key) + if found && archive.(*zipArchive).isValid() { metrics.ZipCacheRequests.WithLabelValues("archive", "hit").Inc() if opened, err := archive.(*zipArchive).openStatus(); opened && err == nil { @@ -170,7 +181,7 @@ func (fs *zipVFS) findOrCreateArchive(ctx context.Context, path string) (*zipArc // We call delete to ensure that expired item // is properly evicted as there's a bug in a cache library: // https://github.com/patrickmn/go-cache/issues/48 - fs.cache.Delete(path) + fs.cache.Delete(key) // if adding the archive to the cache fails it means it's already been added before // this is done to find concurrent additions. 
@@ -178,7 +189,11 @@ func (fs *zipVFS) findOrCreateArchive(ctx context.Context, path string) (*zipArc return nil, errAlreadyCached } - metrics.ZipCacheRequests.WithLabelValues("archive", "miss").Inc() + if found { + metrics.ZipCacheRequests.WithLabelValues("archive", "invalid").Inc() + } else { + metrics.ZipCacheRequests.WithLabelValues("archive", "miss").Inc() + } metrics.ZipCachedEntries.WithLabelValues("archive").Inc() } @@ -186,13 +201,13 @@ func (fs *zipVFS) findOrCreateArchive(ctx context.Context, path string) (*zipArc } // findOrOpenArchive gets archive from cache and tries to open it -func (fs *zipVFS) findOrOpenArchive(ctx context.Context, path string) (*zipArchive, error) { - zipArchive, err := fs.findOrCreateArchive(ctx, path) +func (fs *zipVFS) findOrOpenArchive(ctx context.Context, key, path string) (*zipArchive, error) { + zipArchive, err := fs.findOrCreateArchive(ctx, key) if err != nil { return nil, err } - err = zipArchive.openArchive(ctx) + err = zipArchive.openArchive(ctx, key) if err != nil { return nil, err } diff --git a/internal/vfs/zip/vfs_test.go b/internal/vfs/zip/vfs_test.go index 8a5e77a8d..52efd4c93 100644 --- a/internal/vfs/zip/vfs_test.go +++ b/internal/vfs/zip/vfs_test.go @@ -96,7 +96,7 @@ func TestVFSFindOrOpenArchiveConcurrentAccess(t *testing.T) { }() require.Eventually(t, func() bool { - _, err := vfs.findOrOpenArchive(context.Background(), path) + _, err := vfs.findOrOpenArchive(context.Background(), path, path) return err == errAlreadyCached }, time.Second, time.Nanosecond) } -- GitLab From 6a7416314ac024e464cb52e9fececc6904415c00 Mon Sep 17 00:00:00 2001 From: Jaime Martinez Date: Fri, 16 Oct 2020 16:12:02 +1100 Subject: [PATCH 2/4] Fix go vet issue Remove GetURL Add new to-do Check err in http_reader tests --- internal/httprange/http_reader_test.go | 4 ++++ internal/httprange/resource.go | 7 ------- internal/httprange/resource_test.go | 11 ++++++----- internal/vfs/zip/vfs.go | 2 ++ 4 files changed, 12 insertions(+), 12 
deletions(-) diff --git a/internal/httprange/http_reader_test.go b/internal/httprange/http_reader_test.go index 9c8f11d0d..867dff10e 100644 --- a/internal/httprange/http_reader_test.go +++ b/internal/httprange/http_reader_test.go @@ -245,6 +245,10 @@ func TestReaderSetResponse(t *testing.T) { if tt.expectedErrMsg != "" { require.Error(t, err) require.Contains(t, err.Error(), tt.expectedErrMsg) + if res.StatusCode == http.StatusOK || res.StatusCode == http.StatusRequestedRangeNotSatisfiable { + require.Equal(t, err, r.Resource.err) + } + return } diff --git a/internal/httprange/resource.go b/internal/httprange/resource.go index f4e8c6806..10e8a54c1 100644 --- a/internal/httprange/resource.go +++ b/internal/httprange/resource.go @@ -23,13 +23,6 @@ type Resource struct { lock sync.RWMutex } -func (r *Resource) GetURL() string { - r.lock.Lock() - defer r.lock.Unlock() - - return r.url -} - func (r *Resource) SetURL(url string) { r.lock.Lock() defer r.lock.Unlock() diff --git a/internal/httprange/resource_test.go b/internal/httprange/resource_test.go index c5790c97f..ba19fcadf 100644 --- a/internal/httprange/resource_test.go +++ b/internal/httprange/resource_test.go @@ -33,11 +33,12 @@ func TestNewResource(t *testing.T) { url: "/some/resource", status: http.StatusPartialContent, contentRange: "bytes 200-1000/67589", - want: func() *Resource { - r := *resource - r.Size = 67589 - return &r - }(), + want: &Resource{ + url: "/some/resource", + ETag: "etag", + LastModified: "Wed, 21 Oct 2015 07:28:00 GMT", + Size: 67589, + }, }, "status_partial_content_invalid_content_range": { url: "/some/resource", diff --git a/internal/vfs/zip/vfs.go b/internal/vfs/zip/vfs.go index af2aabb83..1d5b8236f 100644 --- a/internal/vfs/zip/vfs.go +++ b/internal/vfs/zip/vfs.go @@ -113,6 +113,8 @@ func New(options ...Option) vfs.VFS { func (fs *zipVFS) keyFromURL(url *url.URL) string { // We assume that our URL is https://.../artifacts.zip?content-sign=aaa // our caching key is 
`https://.../artifacts.zip` + // TODO: replace caching key with file_sha256 + // https://gitlab.com/gitlab-org/gitlab-pages/-/issues/489 newURL := *url newURL.RawQuery = "" newURL.Fragment = "" -- GitLab From 05282303a15f56227294286a6b1daf99956b7027 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kamil=20Trzci=C5=84ski?= Date: Fri, 23 Oct 2020 16:42:57 +0200 Subject: [PATCH 3/4] Improve test suite around archive refresh This: - removes `ErrContentHasChanged` from `httprange` - reworks how we test file-open to discover content change - validates that archive is being refreshed (correctly) - adds cache configuration methods --- internal/httprange/http_reader.go | 11 +-- internal/httprange/http_reader_test.go | 4 +- internal/vfs/zip/archive_test.go | 108 ++++++++++++++++++++----- internal/vfs/zip/vfs.go | 34 ++++---- 4 files changed, 114 insertions(+), 43 deletions(-) diff --git a/internal/httprange/http_reader.go b/internal/httprange/http_reader.go index d49411365..9852e1fb1 100644 --- a/internal/httprange/http_reader.go +++ b/internal/httprange/http_reader.go @@ -18,15 +18,12 @@ var ( ErrNotFound = errors.New("resource not found") // ErrRangeRequestsNotSupported is returned by Seek and Read - // when the remote server does not allow range requests (Accept-Ranges was not set) - ErrRangeRequestsNotSupported = errors.New("range requests are not supported by the remote server") + // when the remote server does not allow range requests for a given request parameters + ErrRangeRequestsNotSupported = errors.New("requests range is not supported by the remote server") // ErrInvalidRange is returned by Read when trying to read past the end of the file ErrInvalidRange = errors.New("invalid range") - // ErrContentHasChanged is returned by Read when the content has changed since the first request - ErrContentHasChanged = errors.New("content has changed since first request") - // seek errors no need to export them errSeekInvalidWhence = errors.New("invalid whence") errSeekOutsideRange = 
errors.New("outside of range") @@ -124,8 +121,8 @@ func (r *Reader) setResponse(res *http.Response) error { // some servers return 200 OK for bytes=0- // TODO: should we handle r.Resource.Last-Modified as well? if r.offset > 0 || r.Resource.ETag != "" && r.Resource.ETag != res.Header.Get("ETag") { - r.Resource.setError(ErrContentHasChanged) - return ErrContentHasChanged + r.Resource.setError(ErrRangeRequestsNotSupported) + return ErrRangeRequestsNotSupported } case http.StatusNotFound: return ErrNotFound diff --git a/internal/httprange/http_reader_test.go b/internal/httprange/http_reader_test.go index 867dff10e..a371ab9a4 100644 --- a/internal/httprange/http_reader_test.go +++ b/internal/httprange/http_reader_test.go @@ -214,13 +214,13 @@ func TestReaderSetResponse(t *testing.T) { "status_ok_previous_response_invalid_offset": { status: http.StatusOK, offset: 1, - expectedErrMsg: ErrContentHasChanged.Error(), + expectedErrMsg: ErrRangeRequestsNotSupported.Error(), }, "status_ok_previous_response_different_etag": { status: http.StatusOK, prevETag: "old", resEtag: "new", - expectedErrMsg: ErrContentHasChanged.Error(), + expectedErrMsg: ErrRangeRequestsNotSupported.Error(), }, "requested_range_not_satisfiable": { status: http.StatusRequestedRangeNotSatisfiable, diff --git a/internal/vfs/zip/archive_test.go b/internal/vfs/zip/archive_test.go index e6543be31..366c3741b 100644 --- a/internal/vfs/zip/archive_test.go +++ b/internal/vfs/zip/archive_test.go @@ -72,30 +72,93 @@ func TestOpen(t *testing.T) { func TestOpenCached(t *testing.T) { var requests int64 - zip, cleanup := openZipArchive(t, &requests) + testServerURL, cleanup := newZipFileServerURL(t, "group/zip.gitlab.io/public-without-dirs.zip", &requests) defer cleanup() - t.Run("open file first time", func(t *testing.T) { - requestsStart := requests - f, err := zip.Open(context.Background(), "index.html") - require.NoError(t, err) - defer f.Close() + fs := New() + + // We use array instead of map to ensure + // 
predictable ordering of test execution + tests := []struct { + name string + vfsPath string + filePath string + expectedOpenErr error + expectedReadErr error + expectedRequests int64 + }{ + { + name: "open file first time", + vfsPath: testServerURL + "/public.zip", + filePath: "index.html", + // we expect five requests to: + // read resource and zip metadata + // read file: data offset and content + expectedRequests: 5, + }, + { + name: "open file second time", + vfsPath: testServerURL + "/public.zip", + filePath: "index.html", + // we expect one request to read file with cached data offset + expectedRequests: 1, + }, + { + name: "when the URL changes", + vfsPath: testServerURL + "/public.zip?new-secret", + filePath: "index.html", + expectedRequests: 1, + }, + { + name: "when opening cached file and content changes", + vfsPath: testServerURL + "/public.zip?changed-content=1", + filePath: "index.html", + expectedRequests: 1, + // we receive an error on `read` as `open` offset is already cached + expectedReadErr: httprange.ErrRangeRequestsNotSupported, + }, + { + name: "after content change archive is reloaded", + vfsPath: testServerURL + "/public.zip?new-secret", + filePath: "index.html", + expectedRequests: 5, + }, + { + name: "when opening non-cached file and content changes", + vfsPath: testServerURL + "/public.zip?changed-content=1", + filePath: "subdir/hello.html", + expectedRequests: 1, + // we receive an error on `read` as `open` offset is already cached + expectedOpenErr: httprange.ErrRangeRequestsNotSupported, + }, + } - _, err = ioutil.ReadAll(f) - require.NoError(t, err) - require.Equal(t, int64(2), atomic.LoadInt64(&requests)-requestsStart, "we expect two requests to read file: data offset and content") - }) + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + start := atomic.LoadInt64(&requests) + zip, err := fs.Root(context.Background(), test.vfsPath) + require.NoError(t, err) - t.Run("open file second time", func(t *testing.T) { - 
requestsStart := atomic.LoadInt64(&requests) - f, err := zip.Open(context.Background(), "index.html") - require.NoError(t, err) - defer f.Close() + f, err := zip.Open(context.Background(), test.filePath) + if test.expectedOpenErr != nil { + require.Equal(t, test.expectedOpenErr, err) + return + } - _, err = ioutil.ReadAll(f) - require.NoError(t, err) - require.Equal(t, int64(1), atomic.LoadInt64(&requests)-requestsStart, "we expect one request to read file with cached data offset") - }) + require.NoError(t, err) + defer f.Close() + + _, err = ioutil.ReadAll(f) + if test.expectedReadErr != nil { + require.Equal(t, test.expectedReadErr, err) + return + } + require.NoError(t, err) + + end := atomic.LoadInt64(&requests) + require.Equal(t, test.expectedRequests, end-start) + }) + } } func TestLstat(t *testing.T) { @@ -312,6 +375,13 @@ func newZipFileServerURL(t *testing.T, zipFilePath string, requests *int64) (str m := http.NewServeMux() m.HandleFunc("/public.zip", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + r.ParseForm() + + if changedContent := r.Form.Get("changed-content"); changedContent != "" { + w.WriteHeader(http.StatusRequestedRangeNotSatisfiable) + return + } + http.ServeFile(w, r, zipFilePath) if requests != nil { atomic.AddInt64(requests, 1) diff --git a/internal/vfs/zip/vfs.go b/internal/vfs/zip/vfs.go index 1d5b8236f..15058fb13 100644 --- a/internal/vfs/zip/vfs.go +++ b/internal/vfs/zip/vfs.go @@ -110,15 +110,18 @@ func New(options ...Option) vfs.VFS { return zipVFS } -func (fs *zipVFS) keyFromURL(url *url.URL) string { +func (fs *zipVFS) keyFromPath(path string) (string, error) { // We assume that our URL is https://.../artifacts.zip?content-sign=aaa // our caching key is `https://.../artifacts.zip` // TODO: replace caching key with file_sha256 // https://gitlab.com/gitlab-org/gitlab-pages/-/issues/489 - newURL := *url - newURL.RawQuery = "" - newURL.Fragment = "" - return newURL.String() + key, err := url.Parse(path) + if err != nil 
{ + return "", err + } + key.RawQuery = "" + key.Fragment = "" + return key.String(), nil } // Root opens an archive given a URL path and returns an instance of zipArchive @@ -129,16 +132,14 @@ func (fs *zipVFS) keyFromURL(url *url.URL) string { // to try and find the cached archive or return if there's an error, for example // if the context is canceled. func (fs *zipVFS) Root(ctx context.Context, path string) (vfs.Root, error) { - urlPath, err := url.Parse(path) + key, err := fs.keyFromPath(path) if err != nil { return nil, err } - key := fs.keyFromURL(urlPath) - // we do it in loop to not use any additional locks for { - root, err := fs.findOrOpenArchive(ctx, key, urlPath.String()) + root, err := fs.findOrOpenArchive(ctx, key, path) if err == errAlreadyCached { continue } @@ -169,16 +170,19 @@ func (fs *zipVFS) findOrCreateArchive(ctx context.Context, key string) (*zipArch archive, expiry, found := fs.cache.GetWithExpiration(key) if found && archive.(*zipArchive).isValid() { - metrics.ZipCacheRequests.WithLabelValues("archive", "hit").Inc() - if opened, err := archive.(*zipArchive).openStatus(); opened && err == nil { if time.Until(expiry) < fs.cacheRefreshInterval { // refresh item that has been opened successfully - fs.cache.SetDefault(path, archive) + fs.cache.SetDefault(key, archive) + metrics.ZipCacheRequests.WithLabelValues("archive", "hit-refresh").Inc() + } else { + metrics.ZipCacheRequests.WithLabelValues("archive", "hit").Inc() } + } else { + metrics.ZipCacheRequests.WithLabelValues("archive", "hit").Inc() } } else { - archive = newArchive(fs, path, fs.openTimeout) + archive = newArchive(fs, key, fs.openTimeout) // We call delete to ensure that expired item // is properly evicted as there's a bug in a cache library: @@ -187,12 +191,12 @@ func (fs *zipVFS) findOrCreateArchive(ctx context.Context, key string) (*zipArch // if adding the archive to the cache fails it means it's already been added before // this is done to find concurrent additions. 
- if fs.cache.Add(path, archive, fs.cacheExpirationInterval) != nil { + if fs.cache.Add(key, archive, fs.cacheExpirationInterval) != nil { return nil, errAlreadyCached } if found { - metrics.ZipCacheRequests.WithLabelValues("archive", "invalid").Inc() + metrics.ZipCacheRequests.WithLabelValues("archive", "miss-invalid").Inc() } else { metrics.ZipCacheRequests.WithLabelValues("archive", "miss").Inc() } -- GitLab From 60954c6029aeae992094e023ba0cfb1728d8e566 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kamil=20Trzci=C5=84ski?= Date: Thu, 29 Oct 2020 15:22:01 +0100 Subject: [PATCH 4/4] Fix failures --- internal/httprange/http_reader.go | 1 + internal/httprange/http_reader_test.go | 61 ++++++++++++++------------ internal/httprange/resource.go | 42 ++++++++++-------- internal/httprange/resource_test.go | 13 ++++-- internal/vfs/zip/archive.go | 34 ++++++++------ internal/vfs/zip/archive_test.go | 57 +++++++++++++++--------- internal/vfs/zip/vfs.go | 38 ++++++++++------ internal/vfs/zip/vfs_test.go | 6 +-- 8 files changed, 153 insertions(+), 99 deletions(-) diff --git a/internal/httprange/http_reader.go b/internal/httprange/http_reader.go index 9852e1fb1..589351fa1 100644 --- a/internal/httprange/http_reader.go +++ b/internal/httprange/http_reader.go @@ -125,6 +125,7 @@ func (r *Reader) setResponse(res *http.Response) error { return ErrRangeRequestsNotSupported } case http.StatusNotFound: + r.Resource.setError(ErrNotFound) return ErrNotFound case http.StatusPartialContent: // Requested `Range` request succeeded https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/206 diff --git a/internal/httprange/http_reader_test.go b/internal/httprange/http_reader_test.go index a371ab9a4..97bfbf24a 100644 --- a/internal/httprange/http_reader_test.go +++ b/internal/httprange/http_reader_test.go @@ -199,61 +199,68 @@ func TestSeekAndRead(t *testing.T) { func TestReaderSetResponse(t *testing.T) { tests := map[string]struct { - status int - offset int64 - prevETag string - resEtag string - 
expectedErrMsg string + status int + offset int64 + prevETag string + resEtag string + expectedErrMsg string + expectedIsValid bool }{ "partial_content_success": { - status: http.StatusPartialContent, + status: http.StatusPartialContent, + expectedIsValid: true, }, "status_ok_success": { - status: http.StatusOK, + status: http.StatusOK, + expectedIsValid: true, }, "status_ok_previous_response_invalid_offset": { - status: http.StatusOK, - offset: 1, - expectedErrMsg: ErrRangeRequestsNotSupported.Error(), + status: http.StatusOK, + offset: 1, + expectedErrMsg: ErrRangeRequestsNotSupported.Error(), + expectedIsValid: false, }, "status_ok_previous_response_different_etag": { - status: http.StatusOK, - prevETag: "old", - resEtag: "new", - expectedErrMsg: ErrRangeRequestsNotSupported.Error(), + status: http.StatusOK, + prevETag: "old", + resEtag: "new", + expectedErrMsg: ErrRangeRequestsNotSupported.Error(), + expectedIsValid: false, }, "requested_range_not_satisfiable": { - status: http.StatusRequestedRangeNotSatisfiable, - expectedErrMsg: ErrRangeRequestsNotSupported.Error(), + status: http.StatusRequestedRangeNotSatisfiable, + expectedErrMsg: ErrRangeRequestsNotSupported.Error(), + expectedIsValid: false, }, "not_found": { - status: http.StatusNotFound, - expectedErrMsg: ErrNotFound.Error(), + status: http.StatusNotFound, + expectedErrMsg: ErrNotFound.Error(), + expectedIsValid: false, }, "unhandled_status_code": { - status: http.StatusInternalServerError, - expectedErrMsg: "httprange: read response 500:", + status: http.StatusInternalServerError, + expectedErrMsg: "httprange: read response 500:", + expectedIsValid: true, }, } for name, tt := range tests { t.Run(name, func(t *testing.T) { - r := NewReader(context.Background(), &Resource{ETag: tt.prevETag}, tt.offset, 0) + resource := &Resource{ETag: tt.prevETag} + reader := NewReader(context.Background(), resource, tt.offset, 0) res := &http.Response{StatusCode: tt.status, Header: map[string][]string{}} 
res.Header.Set("ETag", tt.resEtag) - err := r.setResponse(res) + err := reader.setResponse(res) + + require.Equal(t, tt.expectedIsValid, resource.Valid()) + if tt.expectedErrMsg != "" { require.Error(t, err) require.Contains(t, err.Error(), tt.expectedErrMsg) - if res.StatusCode == http.StatusOK || res.StatusCode == http.StatusRequestedRangeNotSatisfiable { - require.Equal(t, err, r.Resource.err) - } - return } require.NoError(t, err) - require.Equal(t, r.res, res) }) } } diff --git a/internal/httprange/resource.go b/internal/httprange/resource.go index 10e8a54c1..8b908fe85 100644 --- a/internal/httprange/resource.go +++ b/internal/httprange/resource.go @@ -8,47 +8,50 @@ import ( "net/http" "strconv" "strings" - "sync" + "sync/atomic" ) // Resource represents any HTTP resource that can be read by a GET operation. // It holds the resource's URL and metadata about it. type Resource struct { - url string ETag string LastModified string Size int64 - err error - lock sync.RWMutex + url atomic.Value + err atomic.Value +} + +func (r *Resource) URL() string { + url, _ := r.url.Load().(string) + return url } func (r *Resource) SetURL(url string) { - r.lock.Lock() - defer r.lock.Unlock() + if r.URL() == url { + // We want to avoid cache lines invalidation + // on CPU due to value change + return + } - r.url = url + r.url.Store(url) } func (r *Resource) Err() error { - r.lock.RLock() - defer r.lock.RUnlock() + err, _ := r.err.Load().(error) + return err +} - return r.err +func (r *Resource) Valid() bool { + return r.Err() == nil } func (r *Resource) setError(err error) { - r.lock.Lock() - defer r.lock.Unlock() - - r.err = err + r.err.Store(err) } func (r *Resource) Request() (*http.Request, error) { - r.lock.RLock() - defer r.lock.RUnlock() - - req, err := http.NewRequest("GET", r.url, nil) + req, err := http.NewRequest("GET", r.URL(), nil) if err != nil { return nil, err } @@ -89,11 +92,12 @@ func NewResource(ctx context.Context, url string) (*Resource, error) { }() resource 
:= &Resource{ - url: url, ETag: res.Header.Get("ETag"), LastModified: res.Header.Get("Last-Modified"), } + resource.SetURL(url) + switch res.StatusCode { case http.StatusOK: resource.Size = res.ContentLength diff --git a/internal/httprange/resource_test.go b/internal/httprange/resource_test.go index ba19fcadf..1d6481fca 100644 --- a/internal/httprange/resource_test.go +++ b/internal/httprange/resource_test.go @@ -4,14 +4,21 @@ import ( "context" "net/http" "net/http/httptest" + "sync/atomic" "testing" "github.com/stretchr/testify/require" ) +func urlValue(url string) atomic.Value { + v := atomic.Value{} + v.Store(url) + return v +} + func TestNewResource(t *testing.T) { resource := &Resource{ - url: "/some/resource", + url: urlValue("/some/resource"), ETag: "etag", LastModified: "Wed, 21 Oct 2015 07:28:00 GMT", Size: 1, @@ -34,7 +41,7 @@ func TestNewResource(t *testing.T) { status: http.StatusPartialContent, contentRange: "bytes 200-1000/67589", want: &Resource{ - url: "/some/resource", + url: urlValue("/some/resource"), ETag: "etag", LastModified: "Wed, 21 Oct 2015 07:28:00 GMT", Size: 67589, @@ -92,7 +99,7 @@ func TestNewResource(t *testing.T) { } require.NoError(t, err) - require.Contains(t, got.url, tt.want.url) + require.Contains(t, got.URL(), tt.want.URL()) require.Equal(t, tt.want.LastModified, got.LastModified) require.Equal(t, tt.want.ETag, got.ETag) require.Equal(t, tt.want.Size, got.Size) diff --git a/internal/vfs/zip/archive.go b/internal/vfs/zip/archive.go index 017ce1db3..1137f0041 100644 --- a/internal/vfs/zip/archive.go +++ b/internal/vfs/zip/archive.go @@ -32,6 +32,15 @@ var ( errNotFile = errors.New("not a file") ) +type archiveStatus int + +const ( + archiveOpening archiveStatus = iota + archiveOpenError + archiveOpened + archiveCorrupted +) + // zipArchive implements the vfs.Root interface. // It represents a zip archive saving all its files in memory. // It holds an httprange.Resource that can be read with httprange.RangedReader in chunks. 
@@ -64,15 +73,6 @@ func newArchive(fs *zipVFS, openTimeout time.Duration) *zipArchive { } } -func (a *zipArchive) isValid() bool { - if a.resource != nil { - return a.resource.Err() == nil - } - - // until resource is opened, it is valid - return true -} - func (a *zipArchive) openArchive(parentCtx context.Context, url string) (err error) { // always try to update URL on resource if a.resource != nil { @@ -80,7 +80,7 @@ func (a *zipArchive) openArchive(parentCtx context.Context, url string) (err err } // return early if openArchive was done already in a concurrent request - if ok, err := a.openStatus(); ok { + if status, err := a.openStatus(); status != archiveOpening { return err } @@ -293,12 +293,20 @@ func (a *zipArchive) onEvicted() { metrics.ZipArchiveEntriesCached.Sub(float64(len(a.files))) } -func (a *zipArchive) openStatus() (bool, error) { +func (a *zipArchive) openStatus() (archiveStatus, error) { select { case <-a.done: - return true, a.err + if a.err != nil { + return archiveOpenError, a.err + } + + if a.resource != nil && a.resource.Err() != nil { + return archiveCorrupted, a.resource.Err() + } + + return archiveOpened, nil default: - return false, nil + return archiveOpening, nil } } diff --git a/internal/vfs/zip/archive_test.go b/internal/vfs/zip/archive_test.go index 366c3741b..e1b0d116b 100644 --- a/internal/vfs/zip/archive_test.go +++ b/internal/vfs/zip/archive_test.go @@ -80,12 +80,13 @@ func TestOpenCached(t *testing.T) { // We use array instead of map to ensure // predictable ordering of test execution tests := []struct { - name string - vfsPath string - filePath string - expectedOpenErr error - expectedReadErr error - expectedRequests int64 + name string + vfsPath string + filePath string + expectedArchiveStatus archiveStatus + expectedOpenErr error + expectedReadErr error + expectedRequests int64 }{ { name: "open file first time", @@ -94,20 +95,23 @@ func TestOpenCached(t *testing.T) { // we expect five requests to: // read resource and zip 
metadata // read file: data offset and content - expectedRequests: 5, + expectedRequests: 5, + expectedArchiveStatus: archiveOpened, }, { name: "open file second time", vfsPath: testServerURL + "/public.zip", filePath: "index.html", // we expect one request to read file with cached data offset - expectedRequests: 1, + expectedRequests: 1, + expectedArchiveStatus: archiveOpened, }, { - name: "when the URL changes", - vfsPath: testServerURL + "/public.zip?new-secret", - filePath: "index.html", - expectedRequests: 1, + name: "when the URL changes", + vfsPath: testServerURL + "/public.zip?new-secret", + filePath: "index.html", + expectedRequests: 1, + expectedArchiveStatus: archiveOpened, }, { name: "when opening cached file and content changes", @@ -115,13 +119,15 @@ func TestOpenCached(t *testing.T) { filePath: "index.html", expectedRequests: 1, // we receive an error on `read` as `open` offset is already cached - expectedReadErr: httprange.ErrRangeRequestsNotSupported, + expectedReadErr: httprange.ErrRangeRequestsNotSupported, + expectedArchiveStatus: archiveCorrupted, }, { - name: "after content change archive is reloaded", - vfsPath: testServerURL + "/public.zip?new-secret", - filePath: "index.html", - expectedRequests: 5, + name: "after content change archive is reloaded", + vfsPath: testServerURL + "/public.zip?new-secret", + filePath: "index.html", + expectedRequests: 5, + expectedArchiveStatus: archiveOpened, }, { name: "when opening non-cached file and content changes", @@ -129,7 +135,8 @@ func TestOpenCached(t *testing.T) { filePath: "subdir/hello.html", expectedRequests: 1, // we receive an error on `read` as `open` offset is already cached - expectedOpenErr: httprange.ErrRangeRequestsNotSupported, + expectedOpenErr: httprange.ErrRangeRequestsNotSupported, + expectedArchiveStatus: archiveCorrupted, }, } @@ -142,6 +149,8 @@ func TestOpenCached(t *testing.T) { f, err := zip.Open(context.Background(), test.filePath) if test.expectedOpenErr != nil { 
require.Equal(t, test.expectedOpenErr, err) + status, _ := zip.(*zipArchive).openStatus() + require.Equal(t, test.expectedArchiveStatus, status) return } @@ -151,9 +160,14 @@ func TestOpenCached(t *testing.T) { _, err = ioutil.ReadAll(f) if test.expectedReadErr != nil { require.Equal(t, test.expectedReadErr, err) + status, _ := zip.(*zipArchive).openStatus() + require.Equal(t, test.expectedArchiveStatus, status) return } + require.NoError(t, err) + status, _ := zip.(*zipArchive).openStatus() + require.Equal(t, test.expectedArchiveStatus, status) end := atomic.LoadInt64(&requests) require.Equal(t, test.expectedRequests, end-start) @@ -375,6 +389,10 @@ func newZipFileServerURL(t *testing.T, zipFilePath string, requests *int64) (str m := http.NewServeMux() m.HandleFunc("/public.zip", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if requests != nil { + atomic.AddInt64(requests, 1) + } + r.ParseForm() if changedContent := r.Form.Get("changed-content"); changedContent != "" { @@ -383,9 +401,6 @@ func newZipFileServerURL(t *testing.T, zipFilePath string, requests *int64) (str } http.ServeFile(w, r, zipFilePath) - if requests != nil { - atomic.AddInt64(requests, 1) - } })) testServer := httptest.NewServer(m) diff --git a/internal/vfs/zip/vfs.go b/internal/vfs/zip/vfs.go index 15058fb13..692a4a69d 100644 --- a/internal/vfs/zip/vfs.go +++ b/internal/vfs/zip/vfs.go @@ -169,20 +169,35 @@ func (fs *zipVFS) findOrCreateArchive(ctx context.Context, key string) (*zipArch defer fs.cacheLock.Unlock() archive, expiry, found := fs.cache.GetWithExpiration(key) - if found && archive.(*zipArchive).isValid() { - if opened, err := archive.(*zipArchive).openStatus(); opened && err == nil { + if found { + status, _ := archive.(*zipArchive).openStatus() + switch status { + case archiveOpening: + metrics.ZipCacheRequests.WithLabelValues("archive", "hit-opening").Inc() + + case archiveOpenError: + // this means that archive is likely corrupted + // we keep it for duration of 
cache entry expiry (negative cache) + metrics.ZipCacheRequests.WithLabelValues("archive", "hit-open-error").Inc() + + case archiveOpened: if time.Until(expiry) < fs.cacheRefreshInterval { - // refresh item that has been opened successfully fs.cache.SetDefault(key, archive) metrics.ZipCacheRequests.WithLabelValues("archive", "hit-refresh").Inc() } else { metrics.ZipCacheRequests.WithLabelValues("archive", "hit").Inc() } - } else { - metrics.ZipCacheRequests.WithLabelValues("archive", "hit").Inc() + + case archiveCorrupted: + // this means that archive is likely changed + // we should invalidate it immediately + metrics.ZipCacheRequests.WithLabelValues("archive", "corrupted").Inc() + archive = nil } - } else { - archive = newArchive(fs, key, fs.openTimeout) + } + + if archive == nil { + archive = newArchive(fs, fs.openTimeout) // We call delete to ensure that expired item // is properly evicted as there's a bug in a cache library: @@ -192,14 +207,11 @@ func (fs *zipVFS) findOrCreateArchive(ctx context.Context, key string) (*zipArch // if adding the archive to the cache fails it means it's already been added before // this is done to find concurrent additions. 
if fs.cache.Add(key, archive, fs.cacheExpirationInterval) != nil { + metrics.ZipCacheRequests.WithLabelValues("archive", "already-cached").Inc() return nil, errAlreadyCached } - if found { - metrics.ZipCacheRequests.WithLabelValues("archive", "miss-invalid").Inc() - } else { - metrics.ZipCacheRequests.WithLabelValues("archive", "miss").Inc() - } + metrics.ZipCacheRequests.WithLabelValues("archive", "miss").Inc() metrics.ZipCachedEntries.WithLabelValues("archive").Inc() } @@ -213,7 +225,7 @@ func (fs *zipVFS) findOrOpenArchive(ctx context.Context, key, path string) (*zip return nil, err } - err = zipArchive.openArchive(ctx, key) + err = zipArchive.openArchive(ctx, path) if err != nil { return nil, err } diff --git a/internal/vfs/zip/vfs_test.go b/internal/vfs/zip/vfs_test.go index 52efd4c93..dff2ff433 100644 --- a/internal/vfs/zip/vfs_test.go +++ b/internal/vfs/zip/vfs_test.go @@ -166,7 +166,7 @@ func TestVFSFindOrOpenArchiveRefresh(t *testing.T) { path := testServerURL + test.path // create a new archive and increase counters - archive1, err1 := vfs.findOrOpenArchive(context.Background(), path) + archive1, err1 := vfs.findOrOpenArchive(context.Background(), path, path) if test.expectOpenError { require.Error(t, err1) require.Nil(t, archive1) @@ -182,7 +182,7 @@ func TestVFSFindOrOpenArchiveRefresh(t *testing.T) { if test.expectNewArchive { // should return a new archive - archive2, err2 := vfs.findOrOpenArchive(context.Background(), path) + archive2, err2 := vfs.findOrOpenArchive(context.Background(), path, path) if test.expectOpenError { require.Error(t, err2) require.Nil(t, archive2) @@ -194,7 +194,7 @@ func TestVFSFindOrOpenArchiveRefresh(t *testing.T) { } // should return exactly the same archive - archive2, err2 := vfs.findOrOpenArchive(context.Background(), path) + archive2, err2 := vfs.findOrOpenArchive(context.Background(), path, path) require.Equal(t, archive1, archive2, "same archive is returned") require.Equal(t, err1, err2, "same error for the same 
archive") -- GitLab