diff --git a/internal/serving/disk/reader.go b/internal/serving/disk/reader.go
index b9105f4b4b3cc6f72dce00857b762519adf53719..7c923fea53c46cf540054dda39d251fd388a3b5a 100644
--- a/internal/serving/disk/reader.go
+++ b/internal/serving/disk/reader.go
@@ -153,7 +153,15 @@ func (reader *Reader) serveFile(ctx context.Context, w http.ResponseWriter, r *h
 	reader.fileSizeMetric.Observe(float64(fi.Size()))
 
 	w.Header().Set("Content-Type", contentType)
-	http.ServeContent(w, r, origPath, fi.ModTime(), file)
+
+	// Use ServeContent (range-request support) when the VFS file can seek
+	if rs, ok := file.(io.ReadSeeker); ok {
+		http.ServeContent(w, r, origPath, fi.ModTime(), rs)
+	} else {
+		// TODO: support Seek (and thus range requests) here as well
+		w.Header().Set("Content-Length", strconv.FormatInt(fi.Size(), 10))
+		io.Copy(w, file)
+	}
 
 	return nil
 }
diff --git a/internal/serving/disk/serving.go b/internal/serving/disk/serving.go
index e7cfcec768810d39c7119437e55b5519cc9bff4b..535e03cbd69dc0a2fa3d01c85a2792b49d7e4e3d 100644
--- a/internal/serving/disk/serving.go
+++ b/internal/serving/disk/serving.go
@@ -5,6 +5,7 @@ import (
 	"gitlab.com/gitlab-org/gitlab-pages/internal/serving"
 	"gitlab.com/gitlab-org/gitlab-pages/internal/vfs"
 	"gitlab.com/gitlab-org/gitlab-pages/internal/vfs/local"
+	"gitlab.com/gitlab-org/gitlab-pages/internal/vfs/zip"
 	"gitlab.com/gitlab-org/gitlab-pages/metrics"
 )
 
@@ -16,6 +17,7 @@ var disk = &Disk{
 		vfs: map[string]vfs.VFS{
 			"":      localVFS, // default to use if not specified
 			"local": localVFS,
+			"zip":   vfs.Instrumented(zip.New(), "zip"),
 		},
 	},
 }
diff --git a/internal/source/disk/group.go b/internal/source/disk/group.go
index 9ec4267363d744f39e9bfe7e5c0d99399470d6b5..b2d952d195fba341836f43a95a39d666e1733e7d 100644
--- a/internal/source/disk/group.go
+++ b/internal/source/disk/group.go
@@ -97,6 +97,9 @@ func (g *Group) Resolve(r *http.Request) (*serving.Request, error) {
 		ProjectID: projectConfig.ID,
 	}
 
+	lookupPath.VFS = "zip" // TODO: temporary hard-coded override for the zip PoC — must be removed before merging
+	lookupPath.Path = "http://192.168.88.233:9000/test-bucket/doc-gitlab-com.zip?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=TEST_KEY%2F20200818%2F%2Fs3%2Faws4_request&X-Amz-Date=20200818T173935Z&X-Amz-Expires=432000&X-Amz-SignedHeaders=host&X-Amz-Signature=95810918d1b2441a07385838ebba5a0f01fdf4dcdf94ea9c602f8e7d06c84019"
+
 	return &serving.Request{
 		Serving:    disk.Instance(),
 		LookupPath: lookupPath,
diff --git a/internal/vfs/file.go b/internal/vfs/file.go
index 5260c847e3e5c18151ea5227b27403b2309bde04..3e06130cbf6081e448043e8b593297e3282ca1e4 100644
--- a/internal/vfs/file.go
+++ b/internal/vfs/file.go
@@ -5,6 +5,7 @@ import "io"
 // File represents an open file, which will typically be the response body of a Pages request.
 type File interface {
 	io.Reader
-	io.Seeker
+	// TODO: this is currently unsupported
+	// io.Seeker
 	io.Closer
 }
diff --git a/internal/vfs/zip/archive.go b/internal/vfs/zip/archive.go
new file mode 100644
index 0000000000000000000000000000000000000000..627c59f2ab2df7414e7f6b2cc6d73139eba76d16
--- /dev/null
+++ b/internal/vfs/zip/archive.go
@@ -0,0 +1,165 @@
+package zip
+
+import (
+	"archive/zip"
+	"context"
+	"errors"
+	"fmt"
+	"io"
+	"os"
+	"path/filepath"
+	"strings"
+	"sync"
+
+	"gitlab.com/gitlab-org/gitlab-pages/internal/vfs"
+	"gitlab.com/gitlab-org/gitlab-pages/internal/vfs/zip/http_range"
+)
+
+const dirPrefix = "public/"
+const maxSymlinkSize = 256
+
+type zipArchive struct {
+	path string
+	once sync.Once
+	done chan struct{}
+
+	resource *http_range.Resource
+	reader   *http_range.ReadAtReader
+	archive  *zip.Reader
+	err      error
+
+	files map[string]*zip.File
+}
+
+func (a *zipArchive) openArchive(ctx context.Context) error {
+	a.once.Do(a.readArchive)
+
+	// wait for the archive read to finish,
+	// or exit early on context cancellation
+	select {
+	case <-a.done:
+	case <-ctx.Done():
+		return ctx.Err()
+	}
+	return a.err
+}
+
+func (a *zipArchive) readArchive() {
+	// always signal completion, also on error paths (openArchive waits on `done`)
+	defer close(a.done)
+
+	a.resource, a.err = http_range.NewResource(context.Background(), a.path)
+	if a.err != nil {
+		return
+	}
+
+	a.reader = http_range.NewReadAt(a.resource)
+	a.reader.WithCachedReader(func() {
+		a.archive, a.err = zip.NewReader(a.reader, a.resource.Size)
+	})
+
+	if a.archive != nil {
+		for _, file := range a.archive.File {
+			if !strings.HasPrefix(file.Name, dirPrefix) {
+				continue
+			}
+			a.files[file.Name] = file
+		}
+
+		// recycle memory
+		a.archive.File = nil
+	}
+}
+
+func (a *zipArchive) close() {
+	// no-op: everything can be GC recycled
+}
+
+func (a *zipArchive) findFile(name string) *zip.File {
+	name = filepath.Join(dirPrefix, name)
+
+	if file := a.files[name]; file != nil {
+		return file
+	}
+
+	if dir := a.files[name+"/"]; dir != nil {
+		return dir
+	}
+
+	return nil
+}
+
+func (a *zipArchive) Lstat(ctx context.Context, name string) (os.FileInfo, error) {
+	file := a.findFile(name)
+	if file == nil {
+		return nil, os.ErrNotExist
+	}
+
+	return file.FileInfo(), nil
+}
+
+func (a *zipArchive) Readlink(ctx context.Context, name string) (string, error) {
+	file := a.findFile(name)
+	if file == nil {
+		return "", os.ErrNotExist
+	}
+
+	if file.FileInfo().Mode()&os.ModeSymlink != os.ModeSymlink {
+		return "", errors.New("not a symlink")
+	}
+
+	rc, err := file.Open()
+	if err != nil {
+		return "", err
+	}
+	defer rc.Close()
+
+	// read up to maxSymlinkSize+1 bytes: filling the whole buffer means the target is too long
+	symlink := make([]byte, maxSymlinkSize+1)
+	n, err := io.ReadFull(rc, symlink)
+	if err != nil && err != io.ErrUnexpectedEOF {
+		return "", err
+	}
+	if n > maxSymlinkSize {
+		return "", errors.New("symlink too long")
+	}
+
+	return string(symlink[:n]), nil
+}
+
+func (a *zipArchive) Open(ctx context.Context, name string) (vfs.File, error) {
+	file := a.findFile(name)
+	if file == nil {
+		return nil, os.ErrNotExist
+	}
+
+	dataOffset, err := file.DataOffset()
+	if err != nil {
+		return nil, err
+	}
+
+	// TODO: We can support `io.Seeker` if file would not be compressed
+	var reader vfs.File
+	reader = a.reader.SectionReader(dataOffset, int64(file.CompressedSize64))
+
+	switch file.Method {
+	case zip.Deflate:
+		reader = newDeflateReader(reader)
+
+	case zip.Store:
+		// no-op
+
+	default:
+		return nil, fmt.Errorf("unsupported compression: %x", file.Method)
+	}
+
+	return reader, nil
+}
+
+func newArchive(path string) *zipArchive {
+	return &zipArchive{
+		path:  path,
+		done:  make(chan struct{}),
+		files: make(map[string]*zip.File),
+	}
+}
diff --git a/internal/vfs/zip/archive_test.go b/internal/vfs/zip/archive_test.go
new file mode 100644
index 0000000000000000000000000000000000000000..fe7b0d502587df50514e9bf8c82ac1c783324b1c
--- /dev/null
+++ b/internal/vfs/zip/archive_test.go
@@ -0,0 +1,37 @@
+package zip
+
+import (
+	"context"
+	"io/ioutil"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/require"
+)
+
+const URL = "http://192.168.88.233:9000/test-bucket/doc-gitlab-com.zip?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=TEST_KEY%2F20200820%2F%2Fs3%2Faws4_request&X-Amz-Date=20200820T152420Z&X-Amz-Expires=432000&X-Amz-SignedHeaders=host&X-Amz-Signature=fcf49604f53564ce1648e5a0c2d8f1186ba3d9dd5e40d2c3244c57053e0348e9"
+
+func TestOpenArchive(t *testing.T) {
+	zip := newArchive(URL)
+	defer zip.close()
+
+	println("OpenArchive")
+	ts := time.Now()
+	err := zip.openArchive(context.Background())
+	println(time.Since(ts).String())
+	require.NoError(t, err)
+
+	println("Open")
+	ts = time.Now()
+	rc, err := zip.Open(context.Background(), "sitemap.xml")
+	println(time.Since(ts).String())
+	require.NoError(t, err)
+
+	println("ReadAll")
+	ts = time.Now()
+	data, err := ioutil.ReadAll(rc)
+	println(time.Since(ts).String())
+	require.NoError(t, err)
+	require.NotNil(t, data)
+	require.Contains(t, string(data), "") // TODO: assert on real sitemap content — an empty substring always matches
+}
diff --git a/internal/vfs/zip/deflate_reader.go b/internal/vfs/zip/deflate_reader.go
new file mode 100644
index 0000000000000000000000000000000000000000..2e55ee5a307a703d1bad1de71083c5be51b4edb0
--- /dev/null
+++ b/internal/vfs/zip/deflate_reader.go
@@ -0,0 +1,27 @@
+package zip
+
+import (
+	"compress/flate"
+	"io"
+)
+
+type deflateReader struct {
+	R io.ReadCloser
+	D io.ReadCloser
+}
+
+func (r *deflateReader) Read(p []byte) (n int, err error) {
+	return r.D.Read(p)
+}
+
+func (r *deflateReader) Close() error {
+	r.R.Close() // best-effort: the underlying reader's Close error is intentionally dropped
+	return r.D.Close()
+}
+
+func newDeflateReader(r io.ReadCloser) *deflateReader {
+	return &deflateReader{
+		R: r,
+		D: flate.NewReader(r),
+	}
+}
diff --git a/internal/vfs/zip/http_range/http_read_at.go b/internal/vfs/zip/http_range/http_read_at.go
new file mode 100644
index 0000000000000000000000000000000000000000..85d6205e9a43760f904f2b8de30391f1a457b247
--- /dev/null
+++ b/internal/vfs/zip/http_range/http_read_at.go
@@ -0,0 +1,54 @@
+package http_range
+
+import (
+	"io"
+)
+
+type ReadAtReader struct {
+	R            *Resource
+	cachedReader *Reader
+}
+
+func (h *ReadAtReader) cachedRead(p []byte, off int64) (n int, err error) {
+	if !h.cachedReader.WithinRange(off, int64(len(p))) {
+		h.cachedReader.Close()
+		h.cachedReader = NewReader(h.R, off, h.R.Size-off)
+	}
+
+	return io.ReadFull(h.cachedReader, p)
+}
+
+func (h *ReadAtReader) ephemeralRead(p []byte, off int64) (n int, err error) {
+	reader := NewReader(h.R, off, int64(len(p)))
+	defer reader.Close()
+
+	return io.ReadFull(reader, p)
+}
+
+func (h *ReadAtReader) SectionReader(off, n int64) *Reader {
+	return NewReader(h.R, off, n)
+}
+
+func (h *ReadAtReader) ReadAt(p []byte, off int64) (n int, err error) {
+	if h.cachedReader != nil {
+		return h.cachedRead(p, off)
+	}
+
+	return h.ephemeralRead(p, off)
+}
+
+func (h *ReadAtReader) WithCachedReader(fn func()) {
+	h.cachedReader = NewReader(h.R, 0, h.R.Size)
+
+	defer func() {
+		h.cachedReader.Close()
+		h.cachedReader = nil
+	}()
+
+	fn()
+}
+
+// NewReadAt creates a ReadAt object on a given resource
+func NewReadAt(resource *Resource) *ReadAtReader {
+	return &ReadAtReader{R: resource}
+}
diff --git a/internal/vfs/zip/http_range/http_reader.go b/internal/vfs/zip/http_range/http_reader.go
new file mode 100644
index 0000000000000000000000000000000000000000..e9db1255bc91225f9f6974e0053fb8ad2f8e6639
--- /dev/null
+++ b/internal/vfs/zip/http_range/http_reader.go
@@ -0,0 +1,129 @@
+package http_range
+
+import (
+	"errors"
+	"fmt"
+	"io"
+	"net/http"
+	"time"
+
+	"gitlab.com/gitlab-org/gitlab-pages/internal/httptransport"
+	"gitlab.com/gitlab-org/gitlab-pages/metrics"
+)
+
+var (
+	// ErrRangeRequestsNotSupported is returned by Seek and Read
+	// when the remote server does not allow range requests (Accept-Ranges was not set)
+	ErrRangeRequestsNotSupported = errors.New("range requests are not supported by the remote server")
+	// ErrInvalidRange is returned by Read when trying to read past the end of the file
+	ErrInvalidRange = errors.New("invalid range")
+	// ErrContentHasChanged is returned by Read when the content has changed since the first request
+	ErrContentHasChanged = errors.New("content has changed since first request")
+)
+
+type Reader struct {
+	R            *Resource
+	offset, size int64
+	res          *http.Response
+}
+
+var httpClient = &http.Client{
+	// TODO: we need connect timeout
+	// The longest time the request can be executed
+	Timeout:   30 * time.Minute,
+	Transport: httptransport.NewTransportWithMetrics(metrics.ZIPHttpReaderReqDuration, metrics.ZIPHttpReaderReqTotal),
+}
+
+func (h *Reader) ensureRequest() (err error) {
+	if h.res != nil {
+		return nil
+	}
+
+	if h.offset < 0 || h.size < 0 || h.offset+h.size > h.R.Size {
+		return ErrInvalidRange
+	}
+
+	req, err := http.NewRequest("GET", h.R.URL, nil)
+	if err != nil {
+		return err
+	}
+
+	if h.R.LastModified != "" {
+		req.Header.Set("If-Range", h.R.LastModified)
+	} else if h.R.Etag != "" {
+		req.Header.Set("If-Range", h.R.Etag)
+	}
+
+	req.Header.Set("Range", fmt.Sprintf("bytes=%d-%d", h.offset, h.offset+h.size-1))
+
+	res, err := httpClient.Do(req)
+	if err != nil {
+		return err
+	}
+
+	// cleanup body on failure to avoid memory leak
+	defer func() {
+		if err != nil {
+			res.Body.Close()
+		}
+	}()
+
+	switch res.StatusCode {
+	case http.StatusOK:
+		// some servers return 200 OK for bytes=0-
+		if h.offset > 0 || (h.R.Etag != "" && h.R.Etag != res.Header.Get("ETag")) {
+			return ErrContentHasChanged
+		}
+		break
+
+	case http.StatusPartialContent:
+		break
+
+	case http.StatusRequestedRangeNotSatisfiable:
+		return ErrRangeRequestsNotSupported
+
+	default:
+		return fmt.Errorf("failed with %d: %q", res.StatusCode, res.Status)
+	}
+
+	h.res = res
+	return nil
+}
+
+// WithinRange checks if a given data can be read efficiently
+func (h *Reader) WithinRange(offset, n int64) bool {
+	return h.offset == offset && n <= h.size
+}
+
+// Read reads a data into a given buffer
+func (h *Reader) Read(p []byte) (int, error) {
+	if len(p) == 0 {
+		return 0, nil
+	}
+
+	if err := h.ensureRequest(); err != nil {
+		return 0, err
+	}
+
+	n, err := h.res.Body.Read(p)
+
+	if err == nil || err == io.EOF {
+		h.offset += int64(n)
+		h.size -= int64(n)
+	}
+	return n, err
+}
+
+// Close closes a requests body
+func (h *Reader) Close() error {
+	if h.res != nil {
+		// TODO: should we read till end?
+		return h.res.Body.Close()
+	}
+	return nil
+}
+
+// NewReader creates a Reader object on a given resource for a given range
+func NewReader(resource *Resource, offset, n int64) *Reader {
+	return &Reader{R: resource, offset: offset, size: n}
+}
diff --git a/internal/vfs/zip/http_range/resource.go b/internal/vfs/zip/http_range/resource.go
new file mode 100644
index 0000000000000000000000000000000000000000..d35563b0fb0100448766d2e9f5e0fdbbad689239
--- /dev/null
+++ b/internal/vfs/zip/http_range/resource.go
@@ -0,0 +1,70 @@
+package http_range
+
+import (
+	"context"
+	"fmt"
+	"io"
+	"io/ioutil"
+	"net/http"
+	"strconv"
+	"strings"
+)
+
+type Resource struct {
+	URL          string
+	Etag         string
+	LastModified string
+	Size         int64
+}
+
+func NewResource(ctx context.Context, URL string) (*Resource, error) {
+	// the `h.URL` is likely presigned only for GET
+	req, err := http.NewRequest("GET", URL, nil)
+	if err != nil {
+		return nil, err
+	}
+
+	req = req.WithContext(ctx)
+
+	// we fetch a single byte and ensure that range requests is additionally supported
+	req.Header.Set("Range", fmt.Sprintf("bytes=%d-%d", 0, 0))
+	res, err := httpClient.Do(req)
+	if err != nil {
+		return nil, err
+	}
+	defer res.Body.Close()                  // LIFO: runs second, after the body is drained
+	defer io.Copy(ioutil.Discard, res.Body) // LIFO: runs first — drain so the connection can be reused
+
+	resource := &Resource{
+		URL:          URL,
+		Etag:         res.Header.Get("ETag"),
+		LastModified: res.Header.Get("Last-Modified"),
+	}
+
+	switch res.StatusCode {
+	case http.StatusOK:
+		resource.Size = res.ContentLength
+		println(resource.URL, resource.Etag, resource.LastModified, resource.Size) // TODO: remove debug output
+		return resource, nil
+
+	case http.StatusPartialContent:
+		contentRange := res.Header.Get("Content-Range")
+		ranges := strings.SplitN(contentRange, "/", 2)
+		if len(ranges) != 2 {
+			return nil, fmt.Errorf("invalid `Content-Range`: %q", contentRange)
+		}
+
+		resource.Size, err = strconv.ParseInt(ranges[1], 10, 64) // total size after "/" is decimal
+		if err != nil {
+			return nil, err
+		}
+
+		return resource, nil
+
+	case http.StatusRequestedRangeNotSatisfiable:
+		return nil, ErrRangeRequestsNotSupported
+
+	default:
+		return nil, fmt.Errorf("failed with %d: %q", res.StatusCode, res.Status)
+	}
+}
diff --git a/internal/vfs/zip/vfs.go b/internal/vfs/zip/vfs.go
new file mode 100644
index 0000000000000000000000000000000000000000..b5f4df7e5f46f21cc7e1a8a7d40798d026fa7fb9
--- /dev/null
+++ b/internal/vfs/zip/vfs.go
@@ -0,0 +1,56 @@
+package zip
+
+import (
+	"context"
+	"time"
+
+	"github.com/patrickmn/go-cache"
+	"gitlab.com/gitlab-org/gitlab-pages/internal/vfs"
+)
+
+const cacheExpirationInterval = time.Minute
+const cacheRefreshInterval = time.Minute / 2
+const cacheEvictInterval = time.Minute
+
+type zipVFS struct {
+	cache *cache.Cache
+}
+
+func (fs *zipVFS) Root(ctx context.Context, path string) (vfs.Root, error) {
+	// we do it in loop to not use any additional locks
+	for {
+		dir, till, found := fs.cache.GetWithExpiration(path)
+		if found {
+			if time.Until(till) < cacheRefreshInterval {
+				// refresh item
+				fs.cache.Set(path, dir, cache.DefaultExpiration)
+			}
+		} else {
+			dir = newArchive(path)
+
+			// if it errors, it means that it is already added
+			// retry again to get it
+			if fs.cache.Add(path, dir, cache.DefaultExpiration) != nil {
+				continue
+			}
+		}
+
+		zipDir := dir.(*zipArchive)
+
+		err := zipDir.openArchive(ctx)
+		return zipDir, err
+	}
+}
+
+func New() vfs.VFS {
+	z := &zipVFS{
+		cache: cache.New(cacheExpirationInterval, cacheEvictInterval), // cleanup on the evict interval (was mistakenly the refresh interval)
+	}
+
+	z.cache.OnEvicted(func(path string, object interface{}) {
+		if archive, ok := object.(*zipArchive); archive != nil && ok {
+			archive.close()
+		}
+	})
+	return z
+}
diff --git a/metrics/metrics.go b/metrics/metrics.go
index 0792a41f415f623b2607cae554d1837c9bf8c86a..f6f82014b54a65222268ad68c25b1ea609c1b7dd 100644
--- a/metrics/metrics.go
+++ b/metrics/metrics.go
@@ -77,6 +77,18 @@ var (
 		Help: "The time (in seconds) it takes to get a response from the GitLab domains API",
 	}, []string{"status_code"})
 
+	// ZIPHttpReaderReqTotal is the number of ZIP reader calls made to the Object Storage, by status code
+	ZIPHttpReaderReqTotal = prometheus.NewCounterVec(prometheus.CounterOpts{
+		Name: "gitlab_pages_zip_reader_requests_total",
+		Help: "The number of Object Storage API calls with different status codes",
+	}, []string{"status_code"})
+
+	// ZIPHttpReaderReqDuration is the time it takes to get a response from the Object Storage in seconds
+	ZIPHttpReaderReqDuration = prometheus.NewGaugeVec(prometheus.GaugeOpts{
+		Name: "gitlab_pages_zip_reader_requests_duration",
+		Help: "The time (in seconds) it takes to get a response from the Object Storage",
+	}, []string{"status_code"})
+
 	// DiskServingFileSize metric for file size serving. serving_types: disk and object_storage
 	DiskServingFileSize = prometheus.NewHistogram(prometheus.HistogramOpts{
 		Name: "gitlab_pages_disk_serving_file_size_bytes",