feat: optimize caching strategy and implement concurrency limits

This commit is contained in:
ExplodingDragon
2026-01-31 23:14:15 +08:00
parent 1aec59bfad
commit 662370e018
10 changed files with 148 additions and 41 deletions

View File

@@ -75,7 +75,7 @@ func main() {
provider, domain, memory, provider, domain, memory,
pkg.WithClient(http.DefaultClient), pkg.WithClient(http.DefaultClient),
pkg.WithEvent(subscriber), pkg.WithEvent(subscriber),
pkg.WithMetaCache(memory, 0), pkg.WithMetaCache(memory, 0, 0),
pkg.WithBlobCache(&nopCache{}, 0), pkg.WithBlobCache(&nopCache{}, 0),
pkg.WithErrorHandler(func(w http.ResponseWriter, r *http.Request, err error) { pkg.WithErrorHandler(func(w http.ResponseWriter, r *http.Request, err error) {
if errors.Is(err, os.ErrNotExist) { if errors.Is(err, os.ErrNotExist) {

View File

@@ -84,12 +84,15 @@ type ConfigEvent struct {
} }
type ConfigCache struct { type ConfigCache struct {
Meta string `yaml:"meta"` // 元数据缓存 Meta string `yaml:"meta"` // 元数据缓存
MetaTTL time.Duration `yaml:"meta_ttl"` // 缓存时间 MetaTTL time.Duration `yaml:"meta_ttl"` // 缓存时间
MetaRefresh time.Duration `yaml:"meta_refresh"` // 刷新时间
Blob string `yaml:"blob"` // 缓存归档位置 Blob string `yaml:"blob"` // 缓存归档位置
BlobTTL time.Duration `yaml:"blob_ttl"` // 缓存归档保留时间 BlobTTL time.Duration `yaml:"blob_ttl"` // 缓存归档保留时间
BlobLimit units.Base2Bytes `yaml:"blob_limit"` // 单个文件最大大小 BlobLimit units.Base2Bytes `yaml:"blob_limit"` // 单个文件最大大小
BlobConcurrent uint64 `yaml:"blob_concurrent"` // 并发缓存限制
BackendConcurrent uint64 `yaml:"backend_concurrent"` // 并发后端请求限制
} }
func LoadConfig(path string) (*Config, error) { func LoadConfig(path string) (*Config, error) {

View File

@@ -52,7 +52,10 @@ func main() {
} }
defer cacheBlob.Close() defer cacheBlob.Close()
backend := providers.NewProviderCache(gitea, backend := providers.NewProviderCache(gitea,
cacheBlob.Child("backend"), uint64(config.Cache.BlobLimit), cacheBlob.Child("backend"),
uint64(config.Cache.BlobLimit),
config.Cache.BlobConcurrent,
config.Cache.BackendConcurrent,
) )
defer backend.Close() defer backend.Close()
db, err := kv.NewKVFromURL(config.Database.URL) db, err := kv.NewKVFromURL(config.Database.URL)
@@ -74,7 +77,7 @@ func main() {
db, db,
pkg.WithClient(http.DefaultClient), pkg.WithClient(http.DefaultClient),
pkg.WithEvent(event), pkg.WithEvent(event),
pkg.WithMetaCache(cacheMeta, config.Cache.MetaTTL), pkg.WithMetaCache(cacheMeta, config.Cache.MetaTTL, config.Cache.MetaRefresh),
pkg.WithBlobCache(cacheBlob.Child("filter"), config.Cache.BlobTTL), pkg.WithBlobCache(cacheBlob.Child("filter"), config.Cache.BlobTTL),
pkg.WithErrorHandler(config.ErrorHandler), pkg.WithErrorHandler(config.ErrorHandler),
pkg.WithFilterConfig(config.Filters), pkg.WithFilterConfig(config.Filters),

2
go.mod
View File

@@ -15,7 +15,7 @@ require (
github.com/pkg/errors v0.9.1 github.com/pkg/errors v0.9.1
github.com/stretchr/testify v1.11.1 github.com/stretchr/testify v1.11.1
go.uber.org/zap v1.27.1 go.uber.org/zap v1.27.1
gopkg.d7z.net/middleware v0.0.0-20260131122058-3c200930af2d gopkg.d7z.net/middleware v0.0.0-20260131134426-cea18952b028
gopkg.in/yaml.v3 v3.0.1 gopkg.in/yaml.v3 v3.0.1
) )

2
go.sum
View File

@@ -166,6 +166,8 @@ google.golang.org/protobuf v1.36.11 h1:fV6ZwhNocDyBLK0dj+fg8ektcVegBBuEolpbTQyBN
google.golang.org/protobuf v1.36.11/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j23XfzDpco= google.golang.org/protobuf v1.36.11/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j23XfzDpco=
gopkg.d7z.net/middleware v0.0.0-20260131122058-3c200930af2d h1:nNXZgc02tab2+WuAn+ZN2pHYORm+q7vWvVgR1GaMUPg= gopkg.d7z.net/middleware v0.0.0-20260131122058-3c200930af2d h1:nNXZgc02tab2+WuAn+ZN2pHYORm+q7vWvVgR1GaMUPg=
gopkg.d7z.net/middleware v0.0.0-20260131122058-3c200930af2d/go.mod h1:TDqvtfgaXzOvm9gbG8t5FF0AKSKve8pcE9uBVix+1pU= gopkg.d7z.net/middleware v0.0.0-20260131122058-3c200930af2d/go.mod h1:TDqvtfgaXzOvm9gbG8t5FF0AKSKve8pcE9uBVix+1pU=
gopkg.d7z.net/middleware v0.0.0-20260131134426-cea18952b028 h1:BPm7q2ys8IPHAQe01HBSkYH+2itXuP6DvVPZlg45tM4=
gopkg.d7z.net/middleware v0.0.0-20260131134426-cea18952b028/go.mod h1:TDqvtfgaXzOvm9gbG8t5FF0AKSKve8pcE9uBVix+1pU=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=

View File

@@ -26,9 +26,10 @@ type ServerMeta struct {
Domain string Domain string
Alias *DomainAlias Alias *DomainAlias
client *http.Client client *http.Client
cache *tools.KVCache[PageMetaContent] cache *tools.KVCache[PageMetaContent]
locker *utils.Locker locker *utils.Locker
refresh time.Duration
} }
// PageConfig 配置 // PageConfig 配置
@@ -38,6 +39,7 @@ type PageMetaContent struct {
LastModified time.Time `json:"last_modified"` // 上次更新时间 LastModified time.Time `json:"last_modified"` // 上次更新时间
IsPage bool `json:"is_page"` // 是否为 Page IsPage bool `json:"is_page"` // 是否为 Page
ErrorMsg string `json:"error"` // 错误消息 (作为 500 错误日志暴露至前端) ErrorMsg string `json:"error"` // 错误消息 (作为 500 错误日志暴露至前端)
RefreshAt time.Time `json:"refresh_at"` // 下次刷新时间
Alias []string `json:"alias"` // alias Alias []string `json:"alias"` // alias
Filters []Filter `json:"filters"` // 路由消息 Filters []Filter `json:"filters"` // 路由消息
@@ -45,7 +47,8 @@ type PageMetaContent struct {
func NewEmptyPageMetaContent() *PageMetaContent { func NewEmptyPageMetaContent() *PageMetaContent {
return &PageMetaContent{ return &PageMetaContent{
IsPage: false, IsPage: false,
RefreshAt: time.Now(),
Filters: []Filter{ Filters: []Filter{
{ {
Path: "**", Path: "**",
@@ -78,6 +81,7 @@ func NewServerMeta(
alias *DomainAlias, alias *DomainAlias,
cache kv.KV, cache kv.KV,
ttl time.Duration, ttl time.Duration,
refresh time.Duration,
) *ServerMeta { ) *ServerMeta {
return &ServerMeta{ return &ServerMeta{
Backend: backend, Backend: backend,
@@ -86,32 +90,58 @@ func NewServerMeta(
client: client, client: client,
cache: tools.NewCache[PageMetaContent](cache, "meta", ttl), cache: tools.NewCache[PageMetaContent](cache, "meta", ttl),
locker: utils.NewLocker(), locker: utils.NewLocker(),
refresh: refresh,
} }
} }
func (s *ServerMeta) GetMeta(ctx context.Context, owner, repo string) (*PageMetaContent, error) { func (s *ServerMeta) GetMeta(ctx context.Context, owner, repo string) (*PageMetaContent, error) {
key := fmt.Sprintf("%s/%s", owner, repo) key := fmt.Sprintf("%s/%s", owner, repo)
if cache, found := s.cache.Load(ctx, key); found { if cache, found := s.cache.Load(ctx, key); found {
if time.Now().After(cache.RefreshAt) {
// 异步刷新
mux := s.locker.Open(key)
if mux.TryLock() {
go func() {
defer mux.Unlock()
bgCtx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()
_, _ = s.updateMetaWithLock(bgCtx, owner, repo)
}()
}
}
if cache.IsPage { if cache.IsPage {
return &cache, nil return &cache, nil
} }
return nil, os.ErrNotExist return nil, os.ErrNotExist
} }
return s.updateMeta(ctx, owner, repo)
}
func (s *ServerMeta) updateMeta(ctx context.Context, owner, repo string) (*PageMetaContent, error) {
key := fmt.Sprintf("%s/%s", owner, repo)
mux := s.locker.Open(key) mux := s.locker.Open(key)
mux.Lock() mux.Lock()
defer mux.Unlock() defer mux.Unlock()
if cache, found := s.cache.Load(ctx, key); found { return s.updateMetaWithLock(ctx, owner, repo)
}
func (s *ServerMeta) updateMetaWithLock(ctx context.Context, owner, repo string) (*PageMetaContent, error) {
key := fmt.Sprintf("%s/%s", owner, repo)
// 再次检查缓存
if cache, found := s.cache.Load(ctx, key); found && time.Now().Before(cache.RefreshAt) {
if cache.IsPage { if cache.IsPage {
return &cache, nil return &cache, nil
} }
return nil, os.ErrNotExist return nil, os.ErrNotExist
} }
rel := NewEmptyPageMetaContent() rel := NewEmptyPageMetaContent()
info, err := s.Meta(ctx, owner, repo) info, err := s.Meta(ctx, owner, repo)
if err != nil { if err != nil {
if errors.Is(err, os.ErrNotExist) { if errors.Is(err, os.ErrNotExist) {
rel.IsPage = false rel.IsPage = false
rel.RefreshAt = time.Now().Add(s.refresh)
_ = s.cache.Store(ctx, key, *rel) _ = s.cache.Store(ctx, key, *rel)
} }
return nil, err return nil, err
@@ -119,6 +149,7 @@ func (s *ServerMeta) GetMeta(ctx context.Context, owner, repo string) (*PageMeta
vfs := NewPageVFS(s.Backend, owner, repo, info.ID) vfs := NewPageVFS(s.Backend, owner, repo, info.ID)
rel.CommitID = info.ID rel.CommitID = info.ID
rel.LastModified = info.LastModified rel.LastModified = info.LastModified
rel.RefreshAt = time.Now().Add(s.refresh)
// 检查是否存在 index.html // 检查是否存在 index.html
if exists, _ := vfs.Exists(ctx, "index.html"); !exists { if exists, _ := vfs.Exists(ctx, "index.html"); !exists {

View File

@@ -22,6 +22,8 @@ type ProviderCache struct {
cacheBlob cache.Cache cacheBlob cache.Cache
cacheBlobLimit uint64 cacheBlobLimit uint64
cacheSem chan struct{}
backendSem chan struct{}
} }
func (c *ProviderCache) Close() error { func (c *ProviderCache) Close() error {
@@ -32,11 +34,21 @@ func NewProviderCache(
backend core.Backend, backend core.Backend,
cacheBlob cache.Cache, cacheBlob cache.Cache,
cacheBlobLimit uint64, cacheBlobLimit uint64,
cacheConcurrent uint64,
backendConcurrent uint64,
) *ProviderCache { ) *ProviderCache {
if cacheConcurrent == 0 {
cacheConcurrent = 16 // 默认限制 16 个并发缓存操作
}
if backendConcurrent == 0 {
backendConcurrent = 64 // 默认限制 64 个并发后端请求
}
return &ProviderCache{ return &ProviderCache{
parent: backend, parent: backend,
cacheBlob: cacheBlob, cacheBlob: cacheBlob,
cacheBlobLimit: cacheBlobLimit, cacheBlobLimit: cacheBlobLimit,
cacheSem: make(chan struct{}, cacheConcurrent),
backendSem: make(chan struct{}, backendConcurrent),
} }
} }
@@ -81,6 +93,22 @@ func (c *ProviderCache) Open(ctx context.Context, owner, repo, id, path string,
Header: respHeader, Header: respHeader,
}, nil }, nil
} }
// 获取后端并发锁
select {
case c.backendSem <- struct{}{}:
case <-ctx.Done():
return nil, ctx.Err()
}
releaseBackend := func() { <-c.backendSem }
success := false
defer func() {
if !success {
releaseBackend()
}
}()
open, err := c.parent.Open(ctx, owner, repo, id, path, http.Header{}) open, err := c.parent.Open(ctx, owner, repo, id, path, http.Header{})
if err != nil || open == nil { if err != nil || open == nil {
if open != nil { if open != nil {
@@ -96,6 +124,14 @@ func (c *ProviderCache) Open(ctx context.Context, owner, repo, id, path string,
} }
return nil, err return nil, err
} }
// 包装 Body 以在关闭时释放信号量
open.Body = &utils.CloserWrapper{
ReadCloser: open.Body,
OnClose: releaseBackend,
}
success = true
if open.StatusCode == http.StatusNotFound { if open.StatusCode == http.StatusNotFound {
// 缓存404路由 // 缓存404路由
if err = c.cacheBlob.Put(ctx, key, map[string]string{ if err = c.cacheBlob.Put(ctx, key, map[string]string{
@@ -119,20 +155,34 @@ func (c *ProviderCache) Open(ctx context.Context, owner, repo, id, path string,
} }
return open, nil return open, nil
} }
defer open.Body.Close()
allBytes, err := io.ReadAll(open.Body) // 尝试获取信号量进行缓存
if err != nil { select {
return nil, err case c.cacheSem <- struct{}{}:
defer func() { <-c.cacheSem }()
defer open.Body.Close()
allBytes, err := io.ReadAll(open.Body)
if err != nil {
return nil, err
}
if err = c.cacheBlob.Put(ctx, key, map[string]string{
"Content-Length": open.Header.Get("Content-Length"),
"Last-Modified": open.Header.Get("Last-Modified"),
"Content-Type": open.Header.Get("Content-Type"),
}, bytes.NewBuffer(allBytes), time.Hour); err != nil {
zap.L().Warn("缓存归档失败", zap.Error(err), zap.Int("Size", len(allBytes)), zap.Uint64("MaxSize", c.cacheBlobLimit))
}
open.Body = utils.NopCloser{
ReadSeeker: bytes.NewReader(allBytes),
}
return open, nil
default:
// 无法获取信号量,直接流式返回,不进行缓存
zap.L().Debug("跳过缓存,并发限制已达", zap.String("path", path))
open.Body = &utils.SizeReadCloser{
ReadCloser: open.Body,
Size: length,
}
return open, nil
} }
if err = c.cacheBlob.Put(ctx, key, map[string]string{
"Content-Length": open.Header.Get("Content-Length"),
"Last-Modified": open.Header.Get("Last-Modified"),
"Content-Type": open.Header.Get("Content-Type"),
}, bytes.NewBuffer(allBytes), time.Hour); err != nil {
zap.L().Warn("缓存归档失败", zap.Error(err), zap.Int("Size", len(allBytes)), zap.Uint64("MaxSize", c.cacheBlobLimit))
}
open.Body = utils.NopCloser{
ReadSeeker: bytes.NewReader(allBytes),
}
return open, nil
} }

View File

@@ -41,14 +41,15 @@ type Server struct {
} }
type serverConfig struct { type serverConfig struct {
client *http.Client client *http.Client
event subscribe.Subscriber event subscribe.Subscriber
cacheMeta kv.KV cacheMeta kv.KV
cacheMetaTTL time.Duration cacheMetaTTL time.Duration
cacheBlob cache.Cache cacheMetaRefresh time.Duration
cacheBlobTTL time.Duration cacheBlob cache.Cache
errorHandler func(w http.ResponseWriter, r *http.Request, err error) cacheBlobTTL time.Duration
filterConfig map[string]map[string]any errorHandler func(w http.ResponseWriter, r *http.Request, err error)
filterConfig map[string]map[string]any
} }
type ServerOption func(*serverConfig) type ServerOption func(*serverConfig)
@@ -65,10 +66,11 @@ func WithEvent(event subscribe.Subscriber) ServerOption {
} }
} }
func WithMetaCache(cache kv.KV, ttl time.Duration) ServerOption { func WithMetaCache(cache kv.KV, ttl time.Duration, refresh time.Duration) ServerOption {
return func(c *serverConfig) { return func(c *serverConfig) {
c.cacheMeta = cache c.cacheMeta = cache
c.cacheMetaTTL = ttl c.cacheMetaTTL = ttl
c.cacheMetaRefresh = refresh
} }
} }
@@ -117,6 +119,10 @@ func NewPageServer(
} }
} }
if cfg.cacheMetaRefresh == 0 {
cfg.cacheMetaRefresh = cfg.cacheMetaTTL / 2
}
if cfg.cacheBlob == nil { if cfg.cacheBlob == nil {
var err error var err error
cfg.cacheBlob, err = cache.NewMemoryCache(cache.MemoryCacheConfig{ cfg.cacheBlob, err = cache.NewMemoryCache(cache.MemoryCacheConfig{
@@ -135,7 +141,7 @@ func NewPageServer(
} }
alias := core.NewDomainAlias(db.Child("config", "alias")) alias := core.NewDomainAlias(db.Child("config", "alias"))
svcMeta := core.NewServerMeta(cfg.client, backend, domain, alias, cfg.cacheMeta, cfg.cacheMetaTTL) svcMeta := core.NewServerMeta(cfg.client, backend, domain, alias, cfg.cacheMeta, cfg.cacheMetaTTL, cfg.cacheMetaRefresh)
pageMeta := core.NewPageDomain(svcMeta, domain) pageMeta := core.NewPageDomain(svcMeta, domain)
globCache, err := lru.New[string, glob.Glob](512) globCache, err := lru.New[string, glob.Glob](512)
if err != nil { if err != nil {

View File

@@ -1,8 +1,20 @@
package utils package utils
import "io" import (
"io"
)
type SizeReadCloser struct { type SizeReadCloser struct {
io.ReadCloser io.ReadCloser
Size uint64 Size uint64
} }
type CloserWrapper struct {
io.ReadCloser
OnClose func()
}
func (c *CloserWrapper) Close() error {
defer c.OnClose()
return c.ReadCloser.Close()
}

View File

@@ -53,7 +53,7 @@ func NewTestServer(domain string) *TestServer {
memoryKV, memoryKV,
pkg.WithClient(http.DefaultClient), pkg.WithClient(http.DefaultClient),
pkg.WithEvent(subscribe.NewMemorySubscriber()), pkg.WithEvent(subscribe.NewMemorySubscriber()),
pkg.WithMetaCache(memoryKV.Child("cache"), 0), pkg.WithMetaCache(memoryKV.Child("cache"), 0, 0),
pkg.WithBlobCache(memoryCache, 0), pkg.WithBlobCache(memoryCache, 0),
pkg.WithErrorHandler(func(w http.ResponseWriter, r *http.Request, err error) { pkg.WithErrorHandler(func(w http.ResponseWriter, r *http.Request, err error) {
if errors.Is(err, os.ErrNotExist) { if errors.Is(err, os.ErrNotExist) {