重构项目
This commit is contained in:
@@ -22,19 +22,32 @@ type CacheBackend struct {
|
||||
backend Backend
|
||||
cacheRepo *tools.Cache[map[string]string]
|
||||
cacheBranch *tools.Cache[map[string]*BranchInfo]
|
||||
|
||||
cacheBlob cache.Cache
|
||||
cacheBlobLimit uint64
|
||||
}
|
||||
|
||||
func (c *CacheBackend) Close() error {
|
||||
return c.backend.Close()
|
||||
}
|
||||
|
||||
func NewCacheBackend(backend Backend, cache kv.KV, ttl time.Duration) *CacheBackend {
|
||||
repoCache := tools.NewCache[map[string]string](cache, "repos", ttl)
|
||||
branchCache := tools.NewCache[map[string]*BranchInfo](cache, "branches", ttl)
|
||||
func NewCacheBackend(
|
||||
backend Backend,
|
||||
cacheMeta kv.KV,
|
||||
cacheMetaTtl time.Duration,
|
||||
|
||||
cacheBlob cache.Cache,
|
||||
cacheBlobLimit uint64,
|
||||
) *CacheBackend {
|
||||
repoCache := tools.NewCache[map[string]string](cacheMeta, "repos", cacheMetaTtl)
|
||||
branchCache := tools.NewCache[map[string]*BranchInfo](cacheMeta, "branches", cacheMetaTtl)
|
||||
return &CacheBackend{
|
||||
backend: backend,
|
||||
cacheRepo: repoCache,
|
||||
cacheBranch: branchCache,
|
||||
|
||||
cacheBlob: cacheBlob,
|
||||
cacheBlobLimit: cacheBlobLimit,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -76,37 +89,43 @@ func (c *CacheBackend) Branches(ctx context.Context, owner, repo string) (map[st
|
||||
}
|
||||
|
||||
func (c *CacheBackend) Open(ctx context.Context, client *http.Client, owner, repo, commit, path string, headers http.Header) (*http.Response, error) {
|
||||
return c.backend.Open(ctx, client, owner, repo, commit, path, headers)
|
||||
}
|
||||
|
||||
type CacheBackendBlobReader struct {
|
||||
client *http.Client
|
||||
cache cache.Cache
|
||||
base Backend
|
||||
limit uint64
|
||||
}
|
||||
|
||||
func NewCacheBackendBlobReader(
|
||||
client *http.Client,
|
||||
base Backend,
|
||||
cache cache.Cache,
|
||||
limit uint64,
|
||||
) *CacheBackendBlobReader {
|
||||
return &CacheBackendBlobReader{client: client, base: base, cache: cache, limit: limit}
|
||||
}
|
||||
|
||||
func (c *CacheBackendBlobReader) Open(ctx context.Context, owner, repo, commit, path string) (io.ReadCloser, error) {
|
||||
if headers != nil && headers.Get("Range") != "" {
|
||||
// ignore custom header
|
||||
return c.backend.Open(ctx, client, owner, repo, commit, path, headers)
|
||||
}
|
||||
key := fmt.Sprintf("%s/%s/%s/%s", owner, repo, commit, path)
|
||||
lastCache, err := c.cache.Get(ctx, key)
|
||||
lastCache, err := c.cacheBlob.Get(ctx, key)
|
||||
if err != nil && !errors.Is(err, os.ErrNotExist) {
|
||||
return nil, err
|
||||
} else if lastCache == nil && err == nil {
|
||||
// 边界缓存
|
||||
return nil, os.ErrNotExist
|
||||
} else if lastCache != nil {
|
||||
return lastCache, nil
|
||||
h := lastCache.Metadata
|
||||
if h["Not-Found"] == "true" {
|
||||
return nil, os.ErrNotExist
|
||||
}
|
||||
respHeader := make(http.Header)
|
||||
respHeader.Set("Last-Modified", h["Last-Modified"])
|
||||
respHeader.Set("Content-Type", h["Content-Type"])
|
||||
respHeader.Set("Content-Length", h["Content-Length"])
|
||||
atoi, err := strconv.Atoi(h["Content-Length"])
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &http.Response{
|
||||
Status: "200 OK",
|
||||
StatusCode: 200,
|
||||
Proto: "HTTP/1.1",
|
||||
ProtoMajor: 1,
|
||||
ProtoMinor: 1,
|
||||
Body: lastCache,
|
||||
ContentLength: int64(atoi),
|
||||
Request: nil,
|
||||
Header: respHeader,
|
||||
}, nil
|
||||
}
|
||||
open, err := c.base.Open(ctx, c.client, owner, repo, commit, path, http.Header{})
|
||||
open, err := c.backend.Open(ctx, client, owner, repo, commit, path, http.Header{})
|
||||
if err != nil || open == nil {
|
||||
if open != nil {
|
||||
_ = open.Body.Close()
|
||||
@@ -119,38 +138,33 @@ func (c *CacheBackendBlobReader) Open(ctx context.Context, owner, repo, commit,
|
||||
_ = open.Body.Close()
|
||||
return nil, os.ErrNotExist
|
||||
}
|
||||
|
||||
lastMod, err := time.Parse(http.TimeFormat, open.Header.Get("Last-Modified"))
|
||||
if err != nil {
|
||||
// 无时间,跳过
|
||||
return open.Body, nil
|
||||
}
|
||||
length, err := strconv.ParseUint(open.Header.Get("Content-Length"), 10, 64)
|
||||
// 无法计算大小,跳过
|
||||
if err != nil {
|
||||
return open.Body, nil
|
||||
return open, nil
|
||||
}
|
||||
if length > c.limit {
|
||||
if length > c.cacheBlobLimit {
|
||||
// 超过最大大小,跳过
|
||||
return &utils.SizeReadCloser{
|
||||
open.Body = &utils.SizeReadCloser{
|
||||
ReadCloser: open.Body,
|
||||
Size: length,
|
||||
}, nil
|
||||
}
|
||||
return open, nil
|
||||
}
|
||||
|
||||
defer open.Body.Close()
|
||||
allBytes, err := io.ReadAll(open.Body)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err = c.cache.Put(ctx, key, bytes.NewBuffer(allBytes), time.Hour); err != nil {
|
||||
zap.L().Warn("缓存归档失败", zap.Error(err), zap.Int("Size", len(allBytes)), zap.Uint64("MaxSize", c.limit))
|
||||
if err = c.cacheBlob.Put(ctx, key, map[string]string{
|
||||
"Content-Length": open.Header.Get("Content-Length"),
|
||||
"Last-Modified": open.Header.Get("Last-Modified"),
|
||||
"Content-Type": open.Header.Get("Content-Type"),
|
||||
}, bytes.NewBuffer(allBytes), time.Hour); err != nil {
|
||||
zap.L().Warn("缓存归档失败", zap.Error(err), zap.Int("Size", len(allBytes)), zap.Uint64("MaxSize", c.cacheBlobLimit))
|
||||
}
|
||||
return &cache.Content{
|
||||
ReadSeekCloser: utils.NopCloser{
|
||||
ReadSeeker: bytes.NewReader(allBytes),
|
||||
},
|
||||
LastModified: lastMod,
|
||||
Length: length,
|
||||
}, nil
|
||||
open.Body = utils.NopCloser{
|
||||
ReadSeeker: bytes.NewReader(allBytes),
|
||||
}
|
||||
return open, nil
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user