From be3223968e2c28dcbae7568aee619113e514c1d9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kamil=20Trzci=C5=84ski?= Date: Mon, 17 Aug 2020 15:34:08 +0200 Subject: [PATCH 1/8] Add disk ZIP fs --- internal/serving/disk/reader.go | 10 ++- internal/vfs/zip/archive.go | 113 ++++++++++++++++++++++++++++++++ internal/vfs/zip/vfs.go | 44 +++++++++++++ 3 files changed, 166 insertions(+), 1 deletion(-) create mode 100644 internal/vfs/zip/archive.go create mode 100644 internal/vfs/zip/vfs.go diff --git a/internal/serving/disk/reader.go b/internal/serving/disk/reader.go index b9105f4b4..7c923fea5 100644 --- a/internal/serving/disk/reader.go +++ b/internal/serving/disk/reader.go @@ -153,7 +153,15 @@ func (reader *Reader) serveFile(ctx context.Context, w http.ResponseWriter, r *h reader.fileSizeMetric.Observe(float64(fi.Size())) w.Header().Set("Content-Type", contentType) - http.ServeContent(w, r, origPath, fi.ModTime(), file) + + // TODO: Support it here + if rs, ok := file.(io.ReadSeeker); ok { + http.ServeContent(w, r, origPath, fi.ModTime(), rs) + } else { + // Support ReadSeeker if available + w.Header().Set("Content-Length", strconv.FormatInt(fi.Size(), 10)) + io.Copy(w, file) + } return nil } diff --git a/internal/vfs/zip/archive.go b/internal/vfs/zip/archive.go new file mode 100644 index 000000000..aeaf515aa --- /dev/null +++ b/internal/vfs/zip/archive.go @@ -0,0 +1,113 @@ +package zip + +import ( + "archive/zip" + "context" + "io" + "io/ioutil" + "os" + "strings" + "sync" + + "gitlab.com/gitlab-org/gitlab-pages/internal/vfs" +) + +const dirPrefix = "public/" +const maxSymlinkSize = 256 + +type zipArchive struct { + path string + once sync.Once + done chan struct{} + zip *zip.ReadCloser + files map[string]*zip.File + zipErr error +} + +func (a *zipArchive) Open(ctx context.Context) error { + a.once.Do(func() { + a.zip, a.zipErr = zip.OpenReader(a.path) + if a.zip != nil { + a.processZip() + } + close(a.done) + }) + + // wait for it to close + // or exit early + select { + case <-a.done: + case <-ctx.Done(): + } + return a.zipErr +} + +func (a *zipArchive) processZip() { + for _, file := range a.zip.File { + if !strings.HasPrefix(file.Name, dirPrefix) { + continue + } + + a.files[file.Name] = file + } + + // recycle memory + a.zip.File = nil +} + +func (a *zipArchive) Close() { + if a.zip != nil { + a.zip.Close() + a.zip = nil + } +} + +func (a *zipArchive) Lstat(ctx context.Context, name string) (os.FileInfo, error) { + file := a.files[name] + if file == nil { + return nil, os.ErrNotExist + } + + return file.FileInfo(), nil +} + +func (a *zipArchive) Readlink(ctx context.Context, name string) (string, error) { + file := a.files[name] + if file == nil { + return "", os.ErrNotExist + } + + if file.FileInfo().Mode()&os.ModeSymlink != os.ModeSymlink { + return "", os.ErrInvalid + } + + rc, err := file.Open() + if err != nil { + return "", err + } + + data, err := ioutil.ReadAll(&io.LimitedReader{R: rc, N: maxSymlinkSize}) + if err != nil { + return "", err + } + + return string(data), nil +} + +func (a *zipArchive) Open(ctx context.Context, name string) (vfs.File, error) { + file := a.files[name] + if file == nil { + return nil, os.ErrNotExist + } + + rc, err := file.Open() + // TODO: We can support `io.Seeker` if file would not be compressed + return rc, err +} + +func newArchive(path string) zipArchive { + return &zipArchive{ + path: path, + done: make(chan struct{}), + } +} diff --git a/internal/vfs/zip/vfs.go b/internal/vfs/zip/vfs.go new file mode 100644 index 000000000..b5ba720fc --- /dev/null +++ b/internal/vfs/zip/vfs.go @@ -0,0 +1,44 @@ +package zip + +import ( + "context" + "time" + + "github.com/patrickmn/go-cache" + "gitlab.com/gitlab-org/gitlab-pages/internal/vfs" +) + +type zipVFS struct { + cache *cache.Cache +} + +func (fs *zipVFS) Dir(ctx context.Context, path string) (vfs.Dir, error) { + // we do it in loop to not use any additional locks + for { + dir, found := fs.cache.Get(path) + if !found { + dir = newArchive(path) + + // if it errors, it means that it is already added + // retry again to get it + if fs.cache.Add(path, dir, cache.DefaultExpiration) != nil { + continue + } + } + + err := dir.(*zipArchive).Open(ctx) + return dir, err + } +} + +func New() vfs.VFS { + vfs := &zipVFS{ + cache: cache.New(time.Minute, 2*time.Minute), + } + vfs.cache.OnEvicted(func(path string, object interface{}) { + if archive, ok := object.(*Archive); archive != nil && ok { + archive.Close() + } + }) + return vfs +} -- GitLab From e573d3ea296fd09b4a0ab1a8e48ab469126a8876 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kamil=20Trzci=C5=84ski?= Date: Mon, 17 Aug 2020 16:38:54 +0200 Subject: [PATCH 2/8] Support remote ZIP --- internal/vfs/zip/archive.go | 60 +++++++++---- internal/vfs/zip/deflate_reader.go | 27 ++++++ internal/vfs/zip/http_reader.go | 140 +++++++++++++++++++++++++++++ internal/vfs/zip/vfs.go | 26 ++++-- metrics/metrics.go | 12 +++ 5 files changed, 243 insertions(+), 22 deletions(-) create mode 100644 internal/vfs/zip/deflate_reader.go create mode 100644 internal/vfs/zip/http_reader.go diff --git a/internal/vfs/zip/archive.go b/internal/vfs/zip/archive.go index aeaf515aa..4f7ef6992 100644 --- a/internal/vfs/zip/archive.go +++ b/internal/vfs/zip/archive.go @@ -7,6 +7,7 @@ import ( "io/ioutil" "os" "strings" + "fmt" "sync" "gitlab.com/gitlab-org/gitlab-pages/internal/vfs" @@ -16,17 +17,18 @@ const dirPrefix = "public/" const maxSymlinkSize = 256 type zipArchive struct { - path string - once sync.Once - done chan struct{} - zip *zip.ReadCloser - files map[string]*zip.File - zipErr error + path string + once sync.Once + done chan struct{} + zip *zip.Reader + zipCloser io.Closer + files map[string]*zip.File + zipErr error } -func (a *zipArchive) Open(ctx context.Context) error { +func (a *zipArchive) openArchive(ctx context.Context) error { a.once.Do(func() { - a.zip, a.zipErr = zip.OpenReader(a.path) + a.zip, a.zipCloser, a.zipErr = openZIPArchive(a.path) if a.zip != nil { a.processZip() } @@ -55,11 +57,12 @@ func (a *zipArchive) processZip() { a.zip.File = nil } -func (a *zipArchive) Close() { - if a.zip != nil { - a.zip.Close() - a.zip = nil +func (a *zipArchive) close() { + if a.zipCloser != nil { + a.zipCloser.Close() } + a.zipCloser = nil + a.zip = nil } func (a *zipArchive) Lstat(ctx context.Context, name string) (os.FileInfo, error) { @@ -100,12 +103,39 @@ func (a *zipArchive) Open(ctx context.Context, name string) (vfs.File, error) { return nil, os.ErrNotExist } - rc, err := file.Open() + dataOffset, err := file.DataOffset() + if err != nil { + return nil, err + } + // TODO: We can support `io.Seeker` if file would not be compressed - return rc, err + + if !isHTTPArchive(a.path) { + return file.Open() + } + + var reader io.ReadCloser + reader = &httpReader{ + URL: a.path, + Off: dataOffset, + N: int64(file.UncompressedSize64), + } + + switch file.Method { + case zip.Deflate: + reader = newDeflateReader(reader) + + case zip.Store: + // no-op + + default: + return nil, fmt.Errorf("unsupported compression: %x", file.Method) + } + + return reader, nil } -func newArchive(path string) zipArchive { +func newArchive(path string) *zipArchive { return &zipArchive{ path: path, done: make(chan struct{}), diff --git a/internal/vfs/zip/deflate_reader.go b/internal/vfs/zip/deflate_reader.go new file mode 100644 index 000000000..2e55ee5a3 --- /dev/null +++ b/internal/vfs/zip/deflate_reader.go @@ -0,0 +1,27 @@ +package zip + +import ( + "compress/flate" + "io" +) + +type deflateReader struct { + R io.ReadCloser + D io.ReadCloser +} + +func (r *deflateReader) Read(p []byte) (n int, err error) { + return r.D.Read(p) +} + +func (r *deflateReader) Close() error { + r.R.Close() + return r.D.Close() +} + +func newDeflateReader(r io.ReadCloser) *deflateReader { + return &deflateReader{ + R: r, + D: flate.NewReader(r), + } +} diff --git a/internal/vfs/zip/http_reader.go b/internal/vfs/zip/http_reader.go new file mode 100644 index 000000000..99c94b119 --- /dev/null +++ b/internal/vfs/zip/http_reader.go @@ -0,0 +1,140 @@ +package zip + +import ( + "archive/zip" + "fmt" + "io" + "io/ioutil" + "net/http" + "strings" + "time" + + "gitlab.com/gitlab-org/gitlab-pages/internal/httptransport" + "gitlab.com/gitlab-org/gitlab-pages/metrics" +) + +type httpReader struct { + URL string + Off int64 + N int64 + res *http.Response +} + +var httpClient = &http.Client{ + // TODO: we need connect timeout + // The longest time the request can be executed + Timeout: 30 * time.Minute, + Transport: httptransport.NewTransportWithMetrics(metrics.ZIPHttpReaderReqDuration, metrics.ZIPHttpReaderReqTotal), +} + +func (h *httpReader) ensureRequest() error { + if h.res != nil { + return nil + } + + req, err := http.NewRequest("GET", h.URL, nil) + if err != nil { + return err + } + + req.Header.Set("Range", fmt.Sprintf("%d-%d", h.Off, h.Off+h.N-1)) + res, err := httpClient.Do(req) + if err != nil { + return err + } + if res.StatusCode != http.StatusOK { + res.Body.Close() + // TODO: sanitize URL + return fmt.Errorf("the %q failed with %d: %q", h.URL, res.StatusCode, res.Status) + } + + return nil +} + +func (h *httpReader) Read(p []byte) (n int, err error) { + if len(p) == 0 { + return 0, nil + } + + if err := h.ensureRequest(); err != nil { + return 0, err + } + + return h.res.Body.Read(p) +} + +func (h *httpReader) Close() error { + if h.res != nil { + // TODO: should we read till end? + return h.res.Body.Close() + } + return nil +} + +type httpReadAt struct { + URL string +} + +func (h *httpReadAt) ReadAt(p []byte, off int64) (n int, err error) { + r := httpReader{URL: h.URL, Off: off, N: int64(len(p))} + defer r.Close() + + // TODO: + // Even if ReadAt returns n < len(p), it may use all of p as scratch space during the call. + // If some data is available but not len(p) bytes, ReadAt blocks until either all the data + // is available or an error occurs. In this respect ReadAt is different from Read. + return r.Read(p) +} + +func isHTTPArchive(path string) bool { + return strings.HasPrefix(path, "https://") +} + +func httpSize(path string) (int64, error) { + // the `h.URL` is likely presigned only for GET + req, err := http.NewRequest("GET", path, nil) + if err != nil { + return 0, err + } + + req.Header.Set("Range", fmt.Sprintf("%d-%d", 0, 0)) + res, err := httpClient.Do(req) + if err != nil { + return 0, err + } + defer io.Copy(ioutil.Discard, res.Body) + defer res.Body.Close() + + if res.StatusCode != http.StatusOK { + // TODO: sanitize URL + return 0, fmt.Errorf("the %q failed with %d: %q", path, res.StatusCode, res.Status) + } + + return res.ContentLength, nil +} + +func openZIPHTTPArchive(url string) (*zip.Reader, io.Closer, error) { + size, err := httpSize(url) + if err != nil { + return nil, nil, err + } + + r, err := zip.NewReader(&httpReadAt{URL: url}, size) + return r, nil, err +} + +func openZIPDiskArchive(path string) (*zip.Reader, io.Closer, error) { + r, err := zip.OpenReader(path) + if err != nil { + return nil, nil, err + } + return &r.Reader, r, nil +} + +func openZIPArchive(path string) (*zip.Reader, io.Closer, error) { + if isHTTPArchive(path) { + return openZIPHTTPArchive(path) + } + + return openZIPDiskArchive(path) +} diff --git a/internal/vfs/zip/vfs.go b/internal/vfs/zip/vfs.go index b5ba720fc..acdb8de38 100644 --- a/internal/vfs/zip/vfs.go +++ b/internal/vfs/zip/vfs.go @@ -8,6 +8,10 @@ import ( "gitlab.com/gitlab-org/gitlab-pages/internal/vfs" ) +const cacheExpirationInterval = time.Minute +const cacheRefreshInterval = time.Minute / 2 +const cacheEvictInterval = time.Minute + type zipVFS struct { cache *cache.Cache } @@ -15,8 +19,13 @@ type zipVFS struct { func (fs *zipVFS) Dir(ctx context.Context, path string) (vfs.Dir, error) { // we do it in loop to not use any additional locks for { - dir, found := fs.cache.Get(path) - if !found { + dir, till, found := fs.cache.GetWithExpiration(path) + if found { + if till.Sub(time.Now()) < cacheRefreshInterval { + // refresh item + fs.cache.Set(path, dir, cache.DefaultExpiration) + } + } else { dir = newArchive(path) // if it errors, it means that it is already added @@ -26,18 +35,21 @@ func (fs *zipVFS) Dir(ctx context.Context, path string) (vfs.Dir, error) { } } - err := dir.(*zipArchive).Open(ctx) - return dir, err + zipDir := dir.(*zipArchive) + + err := zipDir.openArchive(ctx) + return zipDir, err } } func New() vfs.VFS { vfs := &zipVFS{ - cache: cache.New(time.Minute, 2*time.Minute), + cache: cache.New(cacheExpirationInterval, cacheRefreshInterval), } + vfs.cache.OnEvicted(func(path string, object interface{}) { - if archive, ok := object.(*Archive); archive != nil && ok { - archive.Close() + if archive, ok := object.(*zipArchive); archive != nil && ok { + archive.close() } }) return vfs diff --git a/metrics/metrics.go b/metrics/metrics.go index 0792a41f4..f6f82014b 100644 --- a/metrics/metrics.go +++ b/metrics/metrics.go @@ -77,6 +77,18 @@ var ( Help: "The time (in seconds) it takes to get a response from the GitLab domains API", }, []string{"status_code"}) + // DomainsSourceAPIReqTotal is the number of calls made to the Object Storage that returned a 4XX error + ZIPHttpReaderReqTotal = prometheus.NewCounterVec(prometheus.CounterOpts{ + Name: "gitlab_pages_zip_reader_requests_total", + Help: "The number of Object Storage API calls with different status codes", + }, []string{"status_code"}) + + // DomainsSourceAPICallDuration is the time it takes to get a response from the Object Storage in seconds + ZIPHttpReaderReqDuration = prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Name: "gitlab_pages_zip_reader_requests_duration", + Help: "The time (in seconds) it takes to get a response from the Object Storage", + }, []string{"status_code"}) + // DiskServingFileSize metric for file size serving. serving_types: disk and object_storage DiskServingFileSize = prometheus.NewHistogram(prometheus.HistogramOpts{ Name: "gitlab_pages_disk_serving_file_size_bytes", -- GitLab From 1b11c70544bc3f8be1e59b3c4ce090b0508feb5d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kamil=20Trzci=C5=84ski?= Date: Tue, 18 Aug 2020 17:50:09 +0200 Subject: [PATCH 3/8] Update ZIP support --- internal/vfs/zip/archive.go | 34 ++++++++++++++++------------------ internal/vfs/zip/vfs.go | 2 +- 2 files changed, 17 insertions(+), 19 deletions(-) diff --git a/internal/vfs/zip/archive.go b/internal/vfs/zip/archive.go index 4f7ef6992..ad61a88d5 100644 --- a/internal/vfs/zip/archive.go +++ b/internal/vfs/zip/archive.go @@ -3,11 +3,12 @@ package zip import ( "archive/zip" "context" + "errors" + "fmt" "io" - "io/ioutil" "os" + "path/filepath" "strings" - "fmt" "sync" "gitlab.com/gitlab-org/gitlab-pages/internal/vfs" @@ -65,8 +66,14 @@ func (a *zipArchive) close() { a.zip = nil } +func (a *zipArchive) findFile(name string) *zip.File { + name = filepath.Clean(name) + name = strings.TrimPrefix(name, "/") + return a.files[name] +} + func (a *zipArchive) Lstat(ctx context.Context, name string) (os.FileInfo, error) { - file := a.files[name] + file := a.findFile(name) if file == nil { return nil, os.ErrNotExist } @@ -74,27 +81,18 @@ func (a *zipArchive) Lstat(ctx context.Context, name string) (os.FileInfo, error return file.FileInfo(), nil } -func (a *zipArchive) Readlink(ctx context.Context, name string) (string, error) { - file := a.files[name] +func (a *zipArchive) EvalSymlinks(ctx context.Context, name string) (string, error) { + file := a.findFile(name) if file == nil { return "", os.ErrNotExist } if file.FileInfo().Mode()&os.ModeSymlink != os.ModeSymlink { - return "", os.ErrInvalid + return name, nil } - rc, err := file.Open() - if err != nil { - return "", err - } - - data, err := ioutil.ReadAll(&io.LimitedReader{R: rc, N: maxSymlinkSize}) - if err != nil { - return "", err - } - - return string(data), nil + // TODO: to be implemented + return "", errors.New("to be implemented") } func (a *zipArchive) Open(ctx context.Context, name string) (vfs.File, error) { @@ -124,7 +122,7 @@ func (a *zipArchive) Open(ctx context.Context, name string) (vfs.File, error) { switch file.Method { case zip.Deflate: reader = newDeflateReader(reader) - + case zip.Store: // no-op diff --git a/internal/vfs/zip/vfs.go b/internal/vfs/zip/vfs.go index acdb8de38..b5f4df7e5 100644 --- a/internal/vfs/zip/vfs.go +++ b/internal/vfs/zip/vfs.go @@ -16,7 +16,7 @@ type zipVFS struct { cache *cache.Cache } -func (fs *zipVFS) Dir(ctx context.Context, path string) (vfs.Dir, error) { +func (fs *zipVFS) Root(ctx context.Context, path string) (vfs.Root, error) { // we do it in loop to not use any additional locks for { dir, till, found := fs.cache.GetWithExpiration(path) -- GitLab From ce2b17ed8b8a6d860494f9461fda389800c2fcdb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kamil=20Trzci=C5=84ski?= Date: Thu, 20 Aug 2020 16:13:06 +0200 Subject: [PATCH 4/8] Add `zip` to `LookupPath` --- internal/serving/disk/serving.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/internal/serving/disk/serving.go b/internal/serving/disk/serving.go index e7cfcec76..a27b1bc4a 100644 --- a/internal/serving/disk/serving.go +++ b/internal/serving/disk/serving.go @@ -5,6 +5,7 @@ import ( "gitlab.com/gitlab-org/gitlab-pages/internal/serving" "gitlab.com/gitlab-org/gitlab-pages/internal/vfs" "gitlab.com/gitlab-org/gitlab-pages/internal/vfs/local" + "gitlab.com/gitlab-org/gitlab-pages/internal/vfs/zip" "gitlab.com/gitlab-org/gitlab-pages/metrics" ) @@ -14,8 +15,9 @@ var disk = &Disk{ reader: Reader{ fileSizeMetric: metrics.DiskServingFileSize, vfs: map[string]vfs.VFS{ - "": localVFS, // default to use if not specified + "": localVFS, "local": localVFS, + "zip": vfs.Instrumented(zip.New(), "zip"), }, }, } -- GitLab From 30cba86dc73bed7da197dbe50cc55d39905b4429 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kamil=20Trzci=C5=84ski?= Date: Tue, 18 Aug 2020 20:24:20 +0200 Subject: [PATCH 5/8] Add efficient ZIP reader --- internal/source/disk/group.go | 3 + internal/vfs/zip/archive.go | 22 ++++-- internal/vfs/zip/archive_test.go | 30 ++++++++ internal/vfs/zip/http_read_at.go | 58 ++++++++++++++++ internal/vfs/zip/http_reader.go | 100 ++++++--------------------- internal/vfs/zip/http_reader_test.go | 46 ++++++++++++ internal/vfs/zip/http_zip.go | 74 ++++++++++++++++++++ 7 files changed, 249 insertions(+), 84 deletions(-) create mode 100644 internal/vfs/zip/archive_test.go create mode 100644 internal/vfs/zip/http_read_at.go create mode 100644 internal/vfs/zip/http_reader_test.go create mode 100644 internal/vfs/zip/http_zip.go diff --git a/internal/source/disk/group.go b/internal/source/disk/group.go index 9ec426736..b2d952d19 100644 --- a/internal/source/disk/group.go +++ b/internal/source/disk/group.go @@ -97,6 +97,9 @@ func (g *Group) Resolve(r *http.Request) (*serving.Request, error) { ProjectID: projectConfig.ID, } + lookupPath.VFS = "zip" + lookupPath.Path = "http://192.168.88.233:9000/test-bucket/doc-gitlab-com.zip?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=TEST_KEY%2F20200818%2F%2Fs3%2Faws4_request&X-Amz-Date=20200818T173935Z&X-Amz-Expires=432000&X-Amz-SignedHeaders=host&X-Amz-Signature=95810918d1b2441a07385838ebba5a0f01fdf4dcdf94ea9c602f8e7d06c84019" + return &serving.Request{ Serving: disk.Instance(), LookupPath: lookupPath, diff --git a/internal/vfs/zip/archive.go b/internal/vfs/zip/archive.go index ad61a88d5..39acb2c86 100644 --- a/internal/vfs/zip/archive.go +++ b/internal/vfs/zip/archive.go @@ -50,7 +50,6 @@ func (a *zipArchive) processZip() { if !strings.HasPrefix(file.Name, dirPrefix) { continue } - a.files[file.Name] = file } @@ -67,9 +66,17 @@ func (a *zipArchive) close() { } func (a *zipArchive) findFile(name string) *zip.File { - name = filepath.Clean(name) - name = strings.TrimPrefix(name, "/") - return a.files[name] + name = filepath.Join("public", name) + + if file := a.files[name]; file != nil { + return file + } + + if dir := a.files[name+"/"]; dir != nil { + return dir + } + + return nil } func (a *zipArchive) Lstat(ctx context.Context, name string) (os.FileInfo, error) { @@ -96,7 +103,7 @@ func (a *zipArchive) EvalSymlinks(ctx context.Context, name string) (string, err } func (a *zipArchive) Open(ctx context.Context, name string) (vfs.File, error) { - file := a.files[name] + file := a.findFile(name) if file == nil { return nil, os.ErrNotExist } @@ -135,7 +142,8 @@ func (a *zipArchive) Open(ctx context.Context, name string) (vfs.File, error) { func newArchive(path string) *zipArchive { return &zipArchive{ - path: path, - done: make(chan struct{}), + path: path, + done: make(chan struct{}), + files: make(map[string]*zip.File), } } diff --git a/internal/vfs/zip/archive_test.go b/internal/vfs/zip/archive_test.go new file mode 100644 index 000000000..6cecb1a86 --- /dev/null +++ b/internal/vfs/zip/archive_test.go @@ -0,0 +1,30 @@ +package zip + +import ( + "context" + "io/ioutil" + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func TestOpenArchive(t *testing.T) { + zip := newArchive(URL) + defer zip.close() + + ts := time.Now() + println("OpenArchive") + err := zip.openArchive(context.Background()) + require.NoError(t, err) + + println("Open") + rc, err := zip.Open(context.Background(), "public/sitemap.xml") + require.NoError(t, err) + + println("ReadAll") + data, err := ioutil.ReadAll(rc) + require.NoError(t, err) + require.NotNil(t, data) + require.Contains(t, string(data), "") +} diff --git a/internal/vfs/zip/http_read_at.go b/internal/vfs/zip/http_read_at.go new file mode 100644 index 000000000..fa2194fe7 --- /dev/null +++ b/internal/vfs/zip/http_read_at.go @@ -0,0 +1,58 @@ +package zip + +import ( + "errors" + "io" +) + +type httpReadAt struct { + URL string + Size int64 + cached bool + cachedReader *httpReader +} + +func (h *httpReadAt) cachedRead(p []byte, off int64) (n int, err error) { + if off < 0 || off > h.Size { + return 0, errors.New("outside of bounds") + } + + if h.cachedReader != nil && (h.cachedReader.Off != off || h.cachedReader.N < int64(len(p))) { + h.cachedReader.Close() + h.cachedReader = nil + } + + if h.cachedReader == nil { + h.cachedReader = &httpReader{URL: h.URL, Off: off, N: h.Size - off} + } + + return io.ReadFull(h.cachedReader, p) +} + +func (h *httpReadAt) ephemeralRead(p []byte, off int64) (n int, err error) { + r := httpReader{URL: h.URL, Off: off, N: int64(len(p))} + defer r.Close() + + return io.ReadFull(&r, p) +} + +func (h *httpReadAt) ReadAt(p []byte, off int64) (n int, err error) { + if h.cached { + return h.cachedRead(p, off) + } + + return h.ephemeralRead(p, off) +} + +func (h *httpReadAt) withCachedReader(fn func()) { + h.cached = true + + defer func() { + if h.cachedReader != nil { + h.cachedReader.Close() + } + h.cached = false + }() + + fn() +} diff --git a/internal/vfs/zip/http_reader.go b/internal/vfs/zip/http_reader.go index 99c94b119..c1d140164 100644 --- a/internal/vfs/zip/http_reader.go +++ b/internal/vfs/zip/http_reader.go @@ -1,12 +1,10 @@ package zip import ( - "archive/zip" "fmt" "io" - "io/ioutil" "net/http" - "strings" + "os" "time" "gitlab.com/gitlab-org/gitlab-pages/internal/httptransport" @@ -27,7 +25,9 @@ var httpClient = &http.Client{ Transport: httptransport.NewTransportWithMetrics(metrics.ZIPHttpReaderReqDuration, metrics.ZIPHttpReaderReqTotal), } -func (h *httpReader) ensureRequest() error { +var requests int + +func (h *httpReader) ensureRequest(requestedSize int) error { if h.res != nil { return nil } @@ -36,31 +36,45 @@ func (h *httpReader) ensureRequest() error { if err != nil { return err } + requests++ + if requests > 10000 { + println("Too many requests") + os.Exit(1) + } - req.Header.Set("Range", fmt.Sprintf("%d-%d", h.Off, h.Off+h.N-1)) + req.Header.Set("Range", fmt.Sprintf("bytes=%d-%d", h.Off, h.Off+h.N-1)) res, err := httpClient.Do(req) if err != nil { return err } - if res.StatusCode != http.StatusOK { + + println("HTTP Request", "Range", "off=", h.Off, "n=", h.N, "requestedSize=", requestedSize, "statusCode=", res.StatusCode, "requests=", requests) + if res.StatusCode != http.StatusPartialContent { res.Body.Close() // TODO: sanitize URL return fmt.Errorf("the %q failed with %d: %q", h.URL, res.StatusCode, res.Status) } + h.res = res return nil } -func (h *httpReader) Read(p []byte) (n int, err error) { +func (h *httpReader) Read(p []byte) (int, error) { if len(p) == 0 { return 0, nil } - if err := h.ensureRequest(); err != nil { + if err := h.ensureRequest(len(p)); err != nil { return 0, err } - return h.res.Body.Read(p) + n, err := h.res.Body.Read(p) + + if err == nil || err == io.EOF { + h.Off += int64(n) + h.N -= int64(n) + } + return n, err } func (h *httpReader) Close() error { @@ -70,71 +84,3 @@ func (h *httpReader) Close() error { } return nil } - -type httpReadAt struct { - URL string -} - -func (h *httpReadAt) ReadAt(p []byte, off int64) (n int, err error) { - r := httpReader{URL: h.URL, Off: off, N: int64(len(p))} - defer r.Close() - - // TODO: - // Even if ReadAt returns n < len(p), it may use all of p as scratch space during the call. - // If some data is available but not len(p) bytes, ReadAt blocks until either all the data - // is available or an error occurs. In this respect ReadAt is different from Read. - return r.Read(p) -} - -func isHTTPArchive(path string) bool { - return strings.HasPrefix(path, "https://") -} - -func httpSize(path string) (int64, error) { - // the `h.URL` is likely presigned only for GET - req, err := http.NewRequest("GET", path, nil) - if err != nil { - return 0, err - } - - req.Header.Set("Range", fmt.Sprintf("%d-%d", 0, 0)) - res, err := httpClient.Do(req) - if err != nil { - return 0, err - } - defer io.Copy(ioutil.Discard, res.Body) - defer res.Body.Close() - - if res.StatusCode != http.StatusOK { - // TODO: sanitize URL - return 0, fmt.Errorf("the %q failed with %d: %q", path, res.StatusCode, res.Status) - } - - return res.ContentLength, nil -} - -func openZIPHTTPArchive(url string) (*zip.Reader, io.Closer, error) { - size, err := httpSize(url) - if err != nil { - return nil, nil, err - } - - r, err := zip.NewReader(&httpReadAt{URL: url}, size) - return r, nil, err -} - -func openZIPDiskArchive(path string) (*zip.Reader, io.Closer, error) { - r, err := zip.OpenReader(path) - if err != nil { - return nil, nil, err - } - return &r.Reader, r, nil -} - -func openZIPArchive(path string) (*zip.Reader, io.Closer, error) { - if isHTTPArchive(path) { - return openZIPHTTPArchive(path) - } - - return openZIPDiskArchive(path) -} diff --git a/internal/vfs/zip/http_reader_test.go b/internal/vfs/zip/http_reader_test.go new file mode 100644 index 000000000..719d059e3 --- /dev/null +++ b/internal/vfs/zip/http_reader_test.go @@ -0,0 +1,46 @@ +package zip + +import ( + "archive/zip" + "io/ioutil" + "testing" + + "github.com/stretchr/testify/require" +) + +const URL = "http://192.168.88.233:9000/test-bucket/doc-gitlab-com.zip?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=TEST_KEY%2F20200818%2F%2Fs3%2Faws4_request&X-Amz-Date=20200818T173935Z&X-Amz-Expires=432000&X-Amz-SignedHeaders=host&X-Amz-Signature=95810918d1b2441a07385838ebba5a0f01fdf4dcdf94ea9c602f8e7d06c84019" + +func findZipFile(zip *zip.Reader, name string) *zip.File { + for _, file := range zip.File { + if file.Name == name { + return file + } + } + return nil +} + +func TestOpenArchiveHTTP(t *testing.T) { + zip, closer, err := openZIPHTTPArchive(URL) + require.NoError(t, err) + defer closer.Close() + + require.NotNil(t, zip) + require.NotEmpty(t, zip.File) + + sitemap := findZipFile(zip, "public/sitemap.xml") + require.NotNil(t, sitemap) + + println("DataOffset") + _, err = sitemap.DataOffset() + require.NoError(t, err) + + println("Open") + rc, err := sitemap.Open() + require.NoError(t, err) + defer rc.Close() + + println("ReadAll") + data, err := ioutil.ReadAll(rc) + require.NoError(t, err) + require.NotNil(t, data) +} diff --git a/internal/vfs/zip/http_zip.go b/internal/vfs/zip/http_zip.go new file mode 100644 index 000000000..d17a0e7b7 --- /dev/null +++ b/internal/vfs/zip/http_zip.go @@ -0,0 +1,74 @@ +package zip + +import ( + "archive/zip" + "fmt" + "io" + "io/ioutil" + "net/http" + "strconv" + "strings" +) + +func isHTTPArchive(path string) bool { + return strings.HasPrefix(path, "http://") || strings.HasPrefix(path, "https://") +} + +func httpSize(path string) (int64, error) { + // the `h.URL` is likely presigned only for GET + req, err := http.NewRequest("GET", path, nil) + if err != nil { + return 0, err + } + + req.Header.Set("Range", fmt.Sprintf("bytes=%d-%d", 0, 0)) + res, err := httpClient.Do(req) + if err != nil { + return 0, err + } + defer io.Copy(ioutil.Discard, res.Body) + defer res.Body.Close() + + if res.StatusCode != http.StatusPartialContent { + // TODO: sanitize URL + return 0, fmt.Errorf("the %q failed with %d: %q", path, res.StatusCode, res.Status) + } + + contentRange := res.Header.Get("Content-Range") + ranges := strings.SplitN(contentRange, "/", 2) + if len(ranges) != 2 { + return 0, fmt.Errorf("the %q has invalid `Content-Range`: %q", path, contentRange) + } + + return strconv.ParseInt(ranges[1], 0, 64) +} + +func openZIPHTTPArchive(url string) (zipReader *zip.Reader, closer io.Closer, err error) { + size, err := httpSize(url) + if err != nil { + return nil, nil, err + } + + httpReader := &httpReadAt{URL: url, Size: size, cached: true} + httpReader.withCachedReader(func() { + zipReader, err = zip.NewReader(httpReader, size) + }) + + return zipReader, ioutil.NopCloser(nil), err +} + +func openZIPDiskArchive(path string) (*zip.Reader, io.Closer, error) { + r, err := zip.OpenReader(path) + if err != nil { + return nil, nil, err + } + return &r.Reader, r, nil +} + +func openZIPArchive(path string) (*zip.Reader, io.Closer, error) { + if isHTTPArchive(path) { + return openZIPHTTPArchive(path) + } + + return openZIPDiskArchive(path) +} -- GitLab From 032054cbd6d86566d6ae1d63958c36fac0e033f8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kamil=20Trzci=C5=84ski?= Date: Thu, 20 Aug 2020 12:47:10 +0200 Subject: [PATCH 6/8] WIP --- internal/vfs/zip/archive_test.go | 9 +++++++-- internal/vfs/zip/http_reader.go | 5 ----- internal/vfs/zip/http_reader_test.go | 1 + 3 files changed, 8 insertions(+), 7 deletions(-) diff --git a/internal/vfs/zip/archive_test.go b/internal/vfs/zip/archive_test.go index 6cecb1a86..d550b6357 100644 --- a/internal/vfs/zip/archive_test.go +++ b/internal/vfs/zip/archive_test.go @@ -13,17 +13,22 @@ func TestOpenArchive(t *testing.T) { zip := newArchive(URL) defer zip.close() - ts := time.Now() println("OpenArchive") + ts := time.Now() err := zip.openArchive(context.Background()) + println(time.Since(ts).String()) require.NoError(t, err) println("Open") - rc, err := zip.Open(context.Background(), "public/sitemap.xml") + ts = time.Now() + rc, err := zip.Open(context.Background(), "sitemap.xml") + println(time.Since(ts).String()) require.NoError(t, err) println("ReadAll") + ts = time.Now() data, err := ioutil.ReadAll(rc) + println(time.Since(ts).String()) require.NoError(t, err) require.NotNil(t, data) require.Contains(t, string(data), "") diff --git a/internal/vfs/zip/http_reader.go b/internal/vfs/zip/http_reader.go index c1d140164..d7824f838 100644 --- a/internal/vfs/zip/http_reader.go +++ b/internal/vfs/zip/http_reader.go @@ -4,7 +4,6 @@ import ( "fmt" "io" "net/http" - "os" "time" "gitlab.com/gitlab-org/gitlab-pages/internal/httptransport" @@ -37,10 +36,6 @@ func (h *httpReader) ensureRequest(requestedSize int) error { return err } requests++ - if requests > 10000 { - println("Too many requests") - os.Exit(1) - } req.Header.Set("Range", fmt.Sprintf("bytes=%d-%d", h.Off, h.Off+h.N-1)) res, err := httpClient.Do(req) diff --git a/internal/vfs/zip/http_reader_test.go b/internal/vfs/zip/http_reader_test.go index 719d059e3..32fdc1dc8 100644 --- a/internal/vfs/zip/http_reader_test.go +++ b/internal/vfs/zip/http_reader_test.go @@ -8,6 +8,7 @@ import ( "github.com/stretchr/testify/require" ) +//const URL = "https://storage.googleapis.com/gitlab-gprd-artifacts/5b/f5/5bf596ed115cd2bf53b9ac39f77563ee26275e449f62c0b96a0fdb5d719ac6da/2020_08_18/692126387/760066867/artifacts.zip?response-content-disposition=attachment%3B%20filename%3D%22artifacts.zip%22%3B%20filename%2A%3DUTF-8%27%27artifacts.zip&response-content-type=application%2Fzip&GoogleAccessId=gitlab-object-storage-prd@gitlab-production.iam.gserviceaccount.com&Signature=2yWByo4dT4Ic6fkrm2asb0TinnFHnELcciZ6qB6Nhc8eNyTYxaZe9aOQblwR%0AWz9yiI84zaD%2F0eZtiJI06OqGz3u%2Bchsc7Mn%2BEdthhmcR9lIUJrbQh96BUEJf%0A0GniiYOGhEdr2gK9sr%2FYPiX7jv4ABkMmyr%2BZdxCPd1%2F%2FnIGFVyTdX07CbsrI%0AYA67RGOez1w9RqcF0wAy5qKs57E9fXshc%2BWqRZD%2Fbtd4PysOHGAT47i7Vslt%0AuzcaGXyiN7Hl8Ckq3WimeacCoB9L%2FNntsLcwx3llKdE0gpzAH04vjiVi705p%0AKV16FuRsF4qbhjYcKjzUao33QVGGXpdOsikF7JrnmA%3D%3D&Expires=1597782702" const URL = "http://192.168.88.233:9000/test-bucket/doc-gitlab-com.zip?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=TEST_KEY%2F20200818%2F%2Fs3%2Faws4_request&X-Amz-Date=20200818T173935Z&X-Amz-Expires=432000&X-Amz-SignedHeaders=host&X-Amz-Signature=95810918d1b2441a07385838ebba5a0f01fdf4dcdf94ea9c602f8e7d06c84019" func findZipFile(zip *zip.Reader, name string) *zip.File { -- GitLab From 71a39de011a7b2d19470c32ed8f73afc372636bf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kamil=20Trzci=C5=84ski?= Date: Thu, 20 Aug 2020 16:18:08 +0200 Subject: [PATCH 7/8] Support symlinks --- internal/vfs/file.go | 3 ++- internal/vfs/zip/archive.go | 22 ++++++++++++++++++---- 2 files changed, 20 insertions(+), 5 deletions(-) diff --git a/internal/vfs/file.go b/internal/vfs/file.go index 5260c847e..3e06130cb 100644 --- a/internal/vfs/file.go +++ b/internal/vfs/file.go @@ -5,6 +5,7 @@ import "io" // File represents an open file, which will typically be the response body of a Pages request. type File interface { io.Reader - io.Seeker + // TODO: this is currently unsupported + // io.Seeker io.Closer } diff --git a/internal/vfs/zip/archive.go b/internal/vfs/zip/archive.go index 39acb2c86..3a53c417a 100644 --- a/internal/vfs/zip/archive.go +++ b/internal/vfs/zip/archive.go @@ -88,18 +88,32 @@ func (a *zipArchive) Lstat(ctx context.Context, name string) (os.FileInfo, error return file.FileInfo(), nil } -func (a *zipArchive) EvalSymlinks(ctx context.Context, name string) (string, error) { +func (a *zipArchive) Readlink(ctx context.Context, name string) (string, error) { file := a.findFile(name) if file == nil { return "", os.ErrNotExist } if file.FileInfo().Mode()&os.ModeSymlink != os.ModeSymlink { - return name, nil + return "", errors.New("not a symlink") } - // TODO: to be implemented - return "", errors.New("to be implemented") + rc, err := file.Open() + if err != nil { + return "", err + } + defer rc.Close() + + symlink := make([]byte, maxSymlinkSize+1) + _, err = io.ReadFull(rc, symlink) + if err != nil { + return "", err + } + if len(symlink) > maxSymlinkSize { + return "", errors.New("symlink too long") + } + + return string(symlink), nil } func (a *zipArchive) Open(ctx context.Context, name string) (vfs.File, error) { -- GitLab From f59aec9d526f05bde08f5c90cc4a1c4c1d8766eb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kamil=20Trzci=C5=84ski?= Date: Thu, 20 Aug 2020 17:31:30 +0200 Subject: [PATCH 8/8] Refactor ZIP support to be more OOM --- internal/serving/disk/serving.go | 2 +- internal/vfs/zip/archive.go | 77 ++++++------ internal/vfs/zip/archive_test.go | 2 + internal/vfs/zip/http_range/http_read_at.go | 54 ++++++++ internal/vfs/zip/http_range/http_reader.go | 129 ++++++++++++++++++++ internal/vfs/zip/http_range/resource.go | 70 +++++++++++ internal/vfs/zip/http_read_at.go | 58 --------- internal/vfs/zip/http_reader.go | 81 ------------ internal/vfs/zip/http_reader_test.go | 47 ------- internal/vfs/zip/http_zip.go | 74 ----------- 10 files changed, 294 insertions(+), 300 deletions(-) create mode 100644 internal/vfs/zip/http_range/http_read_at.go create mode 100644 internal/vfs/zip/http_range/http_reader.go create mode 100644 internal/vfs/zip/http_range/resource.go delete mode 100644 internal/vfs/zip/http_read_at.go delete mode 100644 internal/vfs/zip/http_reader.go delete mode 100644 internal/vfs/zip/http_reader_test.go delete mode 100644 internal/vfs/zip/http_zip.go diff --git a/internal/serving/disk/serving.go b/internal/serving/disk/serving.go index a27b1bc4a..535e03cbd 100644 --- a/internal/serving/disk/serving.go +++ b/internal/serving/disk/serving.go @@ -15,7 +15,7 @@ var disk = &Disk{ reader: Reader{ fileSizeMetric: metrics.DiskServingFileSize, vfs: map[string]vfs.VFS{ - "": localVFS, + "": localVFS, // default to use if not specified "local": localVFS, "zip": vfs.Instrumented(zip.New(), "zip"), }, diff --git a/internal/vfs/zip/archive.go b/internal/vfs/zip/archive.go index 3a53c417a..627c59f2a 100644 --- a/internal/vfs/zip/archive.go +++ b/internal/vfs/zip/archive.go @@ -12,29 +12,27 @@ import ( "sync" "gitlab.com/gitlab-org/gitlab-pages/internal/vfs" + "gitlab.com/gitlab-org/gitlab-pages/internal/vfs/zip/http_range" ) const dirPrefix = "public/" const maxSymlinkSize = 256 type zipArchive struct { - path string - once sync.Once - done chan struct{} - zip *zip.Reader - zipCloser io.Closer - files map[string]*zip.File - zipErr error + path string + once sync.Once + done chan struct{} + + resource *http_range.Resource + reader *http_range.ReadAtReader + archive *zip.Reader + err error + + files map[string]*zip.File } func (a *zipArchive) openArchive(ctx context.Context) error { - a.once.Do(func() { - a.zip, a.zipCloser, a.zipErr = openZIPArchive(a.path) - if a.zip != nil { - a.processZip() - } - close(a.done) - }) + a.once.Do(a.readArchive) // wait for it to close // or exit early @@ -42,31 +40,41 @@ func (a *zipArchive) openArchive(ctx context.Context) error { case <-a.done: case <-ctx.Done(): } - return a.zipErr + return a.err } -func (a *zipArchive) processZip() { - for _, file := range a.zip.File { - if !strings.HasPrefix(file.Name, dirPrefix) { - continue +func (a *zipArchive) readArchive() { + a.resource, a.err = http_range.NewResource(context.Background(), a.path) + if a.err != nil { + return + } + + a.reader = http_range.NewReadAt(a.resource) + a.reader.WithCachedReader(func() { + a.archive, a.err = zip.NewReader(a.reader, a.resource.Size) + }) + + if a.archive != nil { + for _, file := range a.archive.File { + if !strings.HasPrefix(file.Name, dirPrefix) { + continue + } + a.files[file.Name] = file } - a.files[file.Name] = file + + // recycle memory + a.archive.File = nil } - // recycle memory - a.zip.File = nil + close(a.done) } func (a *zipArchive) close() { - if a.zipCloser != nil { - a.zipCloser.Close() - } - a.zipCloser = nil - a.zip = nil + // no-op: everything can be GC recycled } func (a *zipArchive) findFile(name string) *zip.File { - name = filepath.Join("public", name) + name = filepath.Join(dirPrefix, name) if file := a.files[name]; file != nil { return file @@ -128,17 +136,8 @@ func (a *zipArchive) Open(ctx context.Context, name string) (vfs.File, error) { } // TODO: We can support `io.Seeker` if file would not be compressed - - if !isHTTPArchive(a.path) { - return file.Open() - } - - var reader io.ReadCloser - reader = &httpReader{ - URL: a.path, - Off: dataOffset, - N: int64(file.UncompressedSize64), - } + var reader vfs.File + reader = a.reader.SectionReader(dataOffset, int64(file.CompressedSize64)) switch file.Method { case zip.Deflate: diff --git a/internal/vfs/zip/archive_test.go b/internal/vfs/zip/archive_test.go index d550b6357..fe7b0d502 100644 --- a/internal/vfs/zip/archive_test.go +++ b/internal/vfs/zip/archive_test.go @@ -9,6 +9,8 @@ import ( "github.com/stretchr/testify/require" ) +const URL = "http://192.168.88.233:9000/test-bucket/doc-gitlab-com.zip?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=TEST_KEY%2F20200820%2F%2Fs3%2Faws4_request&X-Amz-Date=20200820T152420Z&X-Amz-Expires=432000&X-Amz-SignedHeaders=host&X-Amz-Signature=fcf49604f53564ce1648e5a0c2d8f1186ba3d9dd5e40d2c3244c57053e0348e9" + func TestOpenArchive(t *testing.T) { zip := newArchive(URL) defer zip.close() diff --git a/internal/vfs/zip/http_range/http_read_at.go b/internal/vfs/zip/http_range/http_read_at.go new file mode 100644 index 000000000..85d6205e9 --- /dev/null +++ b/internal/vfs/zip/http_range/http_read_at.go @@ -0,0 +1,54 @@ +package http_range + +import ( + "io" +) + +type ReadAtReader struct { + R *Resource + cachedReader *Reader +} + +func (h *ReadAtReader) cachedRead(p []byte, off int64) (n int, err error) { + if !h.cachedReader.WithinRange(off, int64(len(p))) { + h.cachedReader.Close() + h.cachedReader = NewReader(h.R, off, h.R.Size-off) + } + + return io.ReadFull(h.cachedReader, p) +} + +func (h *ReadAtReader) ephemeralRead(p []byte, off int64) (n int, err error) { + reader := NewReader(h.R, off, int64(len(p))) + defer reader.Close() + + return io.ReadFull(reader, p) +} + +func (h *ReadAtReader) SectionReader(off, n int64) *Reader { + return NewReader(h.R, off, n) +} + +func (h *ReadAtReader) ReadAt(p []byte, off int64) (n int, err error) { + if h.cachedReader != nil { + return h.cachedRead(p, off) + } + + return h.ephemeralRead(p, off) +} + +func (h *ReadAtReader) WithCachedReader(fn func()) { + h.cachedReader = NewReader(h.R, 0, h.R.Size) + + defer func() { + h.cachedReader.Close() + h.cachedReader = nil + }() + + fn() +} + +// NewReadAt creates a ReadAt object on a given resource +func NewReadAt(resource *Resource) *ReadAtReader { + return &ReadAtReader{R: resource} +} diff --git a/internal/vfs/zip/http_range/http_reader.go b/internal/vfs/zip/http_range/http_reader.go new file mode 100644 index 000000000..e9db1255b --- /dev/null +++ b/internal/vfs/zip/http_range/http_reader.go @@ -0,0 +1,129 @@ +package http_range + +import ( + "errors" + "fmt" + "io" + "net/http" + "time" + + "gitlab.com/gitlab-org/gitlab-pages/internal/httptransport" + "gitlab.com/gitlab-org/gitlab-pages/metrics" +) + +var ( + // ErrRangeRequestsNotSupported is returned by Seek and Read + // when the remote server does not allow range requests (Accept-Ranges was not set) + ErrRangeRequestsNotSupported = errors.New("range requests are not supported by the remote server") + // ErrInvalidRange is returned by Read when trying to read past the end of the file + ErrInvalidRange = errors.New("invalid range") + // ErrContentHasChanged is returned by Read when the content has changed since the first request + ErrContentHasChanged = errors.New("content has changed since first request") +) + +type Reader struct { + R *Resource + offset, size int64 + res *http.Response +} + +var httpClient = &http.Client{ + // TODO: we need connect timeout + // The longest time the request can be executed + Timeout: 30 * time.Minute, + Transport: httptransport.NewTransportWithMetrics(metrics.ZIPHttpReaderReqDuration, metrics.ZIPHttpReaderReqTotal), +} + +func (h *Reader) ensureRequest() (err error) { + if h.res != nil { + return nil + } + + if h.offset < 0 || h.size < 0 || h.offset+h.size > h.R.Size { + return ErrInvalidRange + } + + req, err := http.NewRequest("GET", h.R.URL, nil) + if err != nil { + return err + } + + if h.R.LastModified != "" { + req.Header.Set("If-Range", h.R.LastModified) + } else if h.R.Etag != "" { + req.Header.Set("If-Range", h.R.Etag) + } + + req.Header.Set("Range", fmt.Sprintf("bytes=%d-%d", h.offset, h.offset+h.size-1)) + + res, err := httpClient.Do(req) + if err != nil { + return err + } + + // cleanup body on failure to avoid memory leak + defer func() { + if err != nil { + res.Body.Close() + } + }() + + switch res.StatusCode { + case http.StatusOK: + // some servers return 200 OK for bytes=0- + if h.offset > 0 || h.R.Etag != "" && h.R.Etag != res.Header.Get("ETag") { + return ErrContentHasChanged + } + break + + case http.StatusPartialContent: + break + + case http.StatusRequestedRangeNotSatisfiable: + return ErrRangeRequestsNotSupported + + default: + return fmt.Errorf("failed with %d: %q", res.StatusCode, res.Status) + } + + h.res = res + return nil +} + +// WithinRange checks if a given data can be read efficiently +func (h *Reader) WithinRange(offset, n int64) bool { + return h.offset == offset && n <= h.size +} + +// Read reads a data into a given buffer +func (h *Reader) Read(p []byte) (int, error) { + if len(p) == 0 { + return 0, nil + } + + if err := h.ensureRequest(); err != nil { + return 0, err + } + + n, err := h.res.Body.Read(p) + + if err == nil || err == io.EOF { + h.offset += int64(n) + h.size -= int64(n) + } + return n, err +} + +// Close closes a requests body +func (h *Reader) Close() error { + if h.res != nil { + // TODO: should we read till end? + return h.res.Body.Close() + } + return nil +} + +// NewReader creates a Reader object on a given resource for a given range +func NewReader(resource *Resource, offset, n int64) *Reader { + return &Reader{R: resource, offset: offset, size: n} +} diff --git a/internal/vfs/zip/http_range/resource.go b/internal/vfs/zip/http_range/resource.go new file mode 100644 index 000000000..d35563b0f --- /dev/null +++ b/internal/vfs/zip/http_range/resource.go @@ -0,0 +1,70 @@ +package http_range + +import ( + "context" + "fmt" + "io" + "io/ioutil" + "net/http" + "strconv" + "strings" +) + +type Resource struct { + URL string + Etag string + LastModified string + Size int64 +} + +func NewResource(ctx context.Context, URL string) (*Resource, error) { + // the `h.URL` is likely presigned only for GET + req, err := http.NewRequest("GET", URL, nil) + if err != nil { + return nil, err + } + + req = req.WithContext(ctx) + + // we fetch a single byte and ensure that range requests is additionally supported + req.Header.Set("Range", fmt.Sprintf("bytes=%d-%d", 0, 0)) + res, err := httpClient.Do(req) + if err != nil { + return nil, err + } + defer io.Copy(ioutil.Discard, res.Body) + defer res.Body.Close() + + resource := &Resource{ + URL: URL, + Etag: res.Header.Get("ETag"), + LastModified: res.Header.Get("Last-Modified"), + } + + switch res.StatusCode { + case http.StatusOK: + resource.Size = res.ContentLength + println(resource.URL, resource.Etag, resource.LastModified, resource.Size) + return resource, nil + + case http.StatusPartialContent: + contentRange := res.Header.Get("Content-Range") + ranges := strings.SplitN(contentRange, "/", 2) + if len(ranges) != 2 { + return nil, fmt.Errorf("invalid `Content-Range`: %q", contentRange) + } + + resource.Size, err = strconv.ParseInt(ranges[1], 0, 64) + if err != nil { + return nil, err + } + + return resource, nil + + case http.StatusRequestedRangeNotSatisfiable: + return nil, ErrRangeRequestsNotSupported + + default: + return nil, fmt.Errorf("failed with %d: %q", res.StatusCode, res.Status) + } +} diff --git a/internal/vfs/zip/http_read_at.go b/internal/vfs/zip/http_read_at.go deleted file mode 100644 index fa2194fe7..000000000 --- a/internal/vfs/zip/http_read_at.go +++ /dev/null @@ -1,58 +0,0 @@ -package zip - -import ( - "errors" - "io" -) - -type httpReadAt struct { - URL string - Size int64 - cached bool - cachedReader *httpReader -} - -func (h *httpReadAt) cachedRead(p []byte, off int64) (n int, err error) { - if off < 0 || off > h.Size { - return 0, errors.New("outside of bounds") - } - - if h.cachedReader != nil && (h.cachedReader.Off != off || h.cachedReader.N < int64(len(p))) { - h.cachedReader.Close() - h.cachedReader = nil - } - - if h.cachedReader == nil { - h.cachedReader = &httpReader{URL: h.URL, Off: off, N: h.Size - off} - } - - return io.ReadFull(h.cachedReader, p) -} - -func (h *httpReadAt) ephemeralRead(p []byte, off int64) (n int, err error) { - r := httpReader{URL: h.URL, Off: off, N: int64(len(p))} - defer r.Close() - - return io.ReadFull(&r, p) -} - -func (h *httpReadAt) ReadAt(p []byte, off int64) (n int, err error) { - if h.cached { - return h.cachedRead(p, off) - } - - return h.ephemeralRead(p, off) -} - -func (h *httpReadAt) withCachedReader(fn func()) { - h.cached = true - - defer func() { - if h.cachedReader != nil { - h.cachedReader.Close() - } - h.cached = false - }() - - fn() -} diff --git a/internal/vfs/zip/http_reader.go b/internal/vfs/zip/http_reader.go deleted file mode 100644 index d7824f838..000000000 --- a/internal/vfs/zip/http_reader.go +++ /dev/null @@ -1,81 +0,0 @@ -package zip - -import ( - "fmt" - "io" - "net/http" - "time" - - "gitlab.com/gitlab-org/gitlab-pages/internal/httptransport" - "gitlab.com/gitlab-org/gitlab-pages/metrics" -) - -type httpReader struct { - URL string - Off int64 - N int64 - res *http.Response -} - -var httpClient = &http.Client{ - // TODO: we need connect timeout - // The longest time the request can be executed - Timeout: 30 * time.Minute, - Transport: httptransport.NewTransportWithMetrics(metrics.ZIPHttpReaderReqDuration, metrics.ZIPHttpReaderReqTotal), -} - -var requests int - -func (h *httpReader) ensureRequest(requestedSize int) error { - if h.res != nil { - return nil - } - - req, err := http.NewRequest("GET", h.URL, nil) - if err != nil { - return err - } - requests++ - - req.Header.Set("Range", fmt.Sprintf("bytes=%d-%d", h.Off, h.Off+h.N-1)) - res, err := httpClient.Do(req) - if err != nil { - return err - } - - println("HTTP Request", "Range", "off=", h.Off, "n=", h.N, "requestedSize=", requestedSize, "statusCode=", res.StatusCode, "requests=", requests) - if res.StatusCode != http.StatusPartialContent { - res.Body.Close() - // TODO: sanitize URL - return fmt.Errorf("the %q failed with %d: %q", h.URL, res.StatusCode, res.Status) - } - - h.res = res - return nil -} - -func (h *httpReader) Read(p []byte) (int, error) { - if len(p) == 0 { - return 0, nil - } - - if err := h.ensureRequest(len(p)); err != nil { - return 0, err - } - - n, err := h.res.Body.Read(p) - - if err == nil || err == io.EOF { - h.Off += int64(n) - h.N -= int64(n) - } - return n, err -} - -func (h *httpReader) Close() error { - if h.res != nil { - // TODO: should we read till end? - return h.res.Body.Close() - } - return nil -} diff --git a/internal/vfs/zip/http_reader_test.go b/internal/vfs/zip/http_reader_test.go deleted file mode 100644 index 32fdc1dc8..000000000 --- a/internal/vfs/zip/http_reader_test.go +++ /dev/null @@ -1,47 +0,0 @@ -package zip - -import ( - "archive/zip" - "io/ioutil" - "testing" - - "github.com/stretchr/testify/require" -) - -//const URL = "https://storage.googleapis.com/gitlab-gprd-artifacts/5b/f5/5bf596ed115cd2bf53b9ac39f77563ee26275e449f62c0b96a0fdb5d719ac6da/2020_08_18/692126387/760066867/artifacts.zip?response-content-disposition=attachment%3B%20filename%3D%22artifacts.zip%22%3B%20filename%2A%3DUTF-8%27%27artifacts.zip&response-content-type=application%2Fzip&GoogleAccessId=gitlab-object-storage-prd@gitlab-production.iam.gserviceaccount.com&Signature=2yWByo4dT4Ic6fkrm2asb0TinnFHnELcciZ6qB6Nhc8eNyTYxaZe9aOQblwR%0AWz9yiI84zaD%2F0eZtiJI06OqGz3u%2Bchsc7Mn%2BEdthhmcR9lIUJrbQh96BUEJf%0A0GniiYOGhEdr2gK9sr%2FYPiX7jv4ABkMmyr%2BZdxCPd1%2F%2FnIGFVyTdX07CbsrI%0AYA67RGOez1w9RqcF0wAy5qKs57E9fXshc%2BWqRZD%2Fbtd4PysOHGAT47i7Vslt%0AuzcaGXyiN7Hl8Ckq3WimeacCoB9L%2FNntsLcwx3llKdE0gpzAH04vjiVi705p%0AKV16FuRsF4qbhjYcKjzUao33QVGGXpdOsikF7JrnmA%3D%3D&Expires=1597782702" -const URL = "http://192.168.88.233:9000/test-bucket/doc-gitlab-com.zip?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=TEST_KEY%2F20200818%2F%2Fs3%2Faws4_request&X-Amz-Date=20200818T173935Z&X-Amz-Expires=432000&X-Amz-SignedHeaders=host&X-Amz-Signature=95810918d1b2441a07385838ebba5a0f01fdf4dcdf94ea9c602f8e7d06c84019" - -func findZipFile(zip *zip.Reader, name string) *zip.File { - for _, file := range zip.File { - if file.Name == name { - return file - } - } - return nil -} - -func TestOpenArchiveHTTP(t *testing.T) { - zip, closer, err := openZIPHTTPArchive(URL) - require.NoError(t, err) - defer closer.Close() - - require.NotNil(t, zip) - require.NotEmpty(t, zip.File) - - sitemap := findZipFile(zip, "public/sitemap.xml") - require.NotNil(t, sitemap) - - println("DataOffset") - _, err = sitemap.DataOffset() - require.NoError(t, err) - - println("Open") - rc, err := sitemap.Open() - require.NoError(t, err) - defer rc.Close() - - println("ReadAll") - data, err := ioutil.ReadAll(rc) - require.NoError(t, err) - require.NotNil(t, data) -} diff --git a/internal/vfs/zip/http_zip.go b/internal/vfs/zip/http_zip.go deleted file mode 100644 index d17a0e7b7..000000000 --- a/internal/vfs/zip/http_zip.go +++ /dev/null @@ -1,74 +0,0 @@ -package zip - -import ( - "archive/zip" - "fmt" - "io" - "io/ioutil" - "net/http" - "strconv" - "strings" -) - -func isHTTPArchive(path string) bool { - return strings.HasPrefix(path, "http://") || strings.HasPrefix(path, "https://") -} - -func httpSize(path string) (int64, error) { - // the `h.URL` is likely presigned only for GET - req, err := http.NewRequest("GET", path, nil) - if err != nil { - return 0, err - } - - req.Header.Set("Range", fmt.Sprintf("bytes=%d-%d", 0, 0)) - res, err := httpClient.Do(req) - if err != nil { - return 0, err - } - defer io.Copy(ioutil.Discard, res.Body) - defer res.Body.Close() - - if res.StatusCode != http.StatusPartialContent { - // TODO: sanitize URL - return 0, fmt.Errorf("the %q failed with %d: %q", path, res.StatusCode, res.Status) - } - - contentRange := res.Header.Get("Content-Range") - ranges := strings.SplitN(contentRange, "/", 2) - if len(ranges) != 2 { - return 0, fmt.Errorf("the %q has invalid `Content-Range`: %q", path, contentRange) - } - - return strconv.ParseInt(ranges[1], 0, 64) -} - -func openZIPHTTPArchive(url string) (zipReader *zip.Reader, closer io.Closer, err error) { - size, err := httpSize(url) - if err != nil { - return nil, nil, err - } - - httpReader := &httpReadAt{URL: url, Size: size, cached: true} - httpReader.withCachedReader(func() { - zipReader, err = zip.NewReader(httpReader, size) - }) - - return zipReader, ioutil.NopCloser(nil), err -} - -func openZIPDiskArchive(path string) (*zip.Reader, io.Closer, error) { - r, err := zip.OpenReader(path) - if err != nil { - return nil, nil, err - } - return &r.Reader, r, nil -} - -func openZIPArchive(path string) (*zip.Reader, io.Closer, error) { - if isHTTPArchive(path) { - return openZIPHTTPArchive(path) - } - - return openZIPDiskArchive(path) -} -- GitLab