From 662370e0184704898ab74ee3eec15cd904ea9198 Mon Sep 17 00:00:00 2001 From: ExplodingDragon Date: Sat, 31 Jan 2026 23:14:15 +0800 Subject: [PATCH] feat: optimize caching strategy and implement concurrency limits --- cmd/local/main.go | 2 +- cmd/server/config.go | 13 ++++--- cmd/server/main.go | 7 ++-- go.mod | 2 +- go.sum | 2 ++ pkg/core/meta.go | 41 +++++++++++++++++++--- pkg/providers/cache.go | 80 ++++++++++++++++++++++++++++++++++-------- pkg/server.go | 26 ++++++++------ pkg/utils/reader.go | 14 +++++++- tests/core/test.go | 2 +- 10 files changed, 148 insertions(+), 41 deletions(-) diff --git a/cmd/local/main.go b/cmd/local/main.go index 098ff39..7d25b2b 100644 --- a/cmd/local/main.go +++ b/cmd/local/main.go @@ -75,7 +75,7 @@ func main() { provider, domain, memory, pkg.WithClient(http.DefaultClient), pkg.WithEvent(subscriber), - pkg.WithMetaCache(memory, 0), + pkg.WithMetaCache(memory, 0, 0), pkg.WithBlobCache(&nopCache{}, 0), pkg.WithErrorHandler(func(w http.ResponseWriter, r *http.Request, err error) { if errors.Is(err, os.ErrNotExist) { diff --git a/cmd/server/config.go b/cmd/server/config.go index 724c712..4d5274c 100644 --- a/cmd/server/config.go +++ b/cmd/server/config.go @@ -84,12 +84,15 @@ type ConfigEvent struct { } type ConfigCache struct { - Meta string `yaml:"meta"` // 元数据缓存 - MetaTTL time.Duration `yaml:"meta_ttl"` // 缓存时间 + Meta string `yaml:"meta"` // 元数据缓存 + MetaTTL time.Duration `yaml:"meta_ttl"` // 缓存时间 + MetaRefresh time.Duration `yaml:"meta_refresh"` // 刷新时间 - Blob string `yaml:"blob"` // 缓存归档位置 - BlobTTL time.Duration `yaml:"blob_ttl"` // 缓存归档位置 - BlobLimit units.Base2Bytes `yaml:"blob_limit"` // 单个文件最大大小 + Blob string `yaml:"blob"` // 缓存归档位置 + BlobTTL time.Duration `yaml:"blob_ttl"` // 缓存归档位置 + BlobLimit units.Base2Bytes `yaml:"blob_limit"` // 单个文件最大大小 + BlobConcurrent uint64 `yaml:"blob_concurrent"` // 并发缓存限制 + BackendConcurrent uint64 `yaml:"backend_concurrent"` // 并发后端请求限制 } func LoadConfig(path string) (*Config, error) { diff --git a/cmd/server/main.go b/cmd/server/main.go index 179c490..a027afb 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -52,7 +52,10 @@ func main() { } defer cacheBlob.Close() backend := providers.NewProviderCache(gitea, - cacheBlob.Child("backend"), uint64(config.Cache.BlobLimit), + cacheBlob.Child("backend"), + uint64(config.Cache.BlobLimit), + config.Cache.BlobConcurrent, + config.Cache.BackendConcurrent, ) defer backend.Close() db, err := kv.NewKVFromURL(config.Database.URL) @@ -74,7 +77,7 @@ func main() { db, pkg.WithClient(http.DefaultClient), pkg.WithEvent(event), - pkg.WithMetaCache(cacheMeta, config.Cache.MetaTTL), + pkg.WithMetaCache(cacheMeta, config.Cache.MetaTTL, config.Cache.MetaRefresh), pkg.WithBlobCache(cacheBlob.Child("filter"), config.Cache.BlobTTL), pkg.WithErrorHandler(config.ErrorHandler), pkg.WithFilterConfig(config.Filters), diff --git a/go.mod b/go.mod index 40b6fb7..65c2dfa 100644 --- a/go.mod +++ b/go.mod @@ -15,7 +15,7 @@ require ( github.com/pkg/errors v0.9.1 github.com/stretchr/testify v1.11.1 go.uber.org/zap v1.27.1 - gopkg.d7z.net/middleware v0.0.0-20260131122058-3c200930af2d + gopkg.d7z.net/middleware v0.0.0-20260131134426-cea18952b028 gopkg.in/yaml.v3 v3.0.1 ) diff --git a/go.sum b/go.sum index cc7b3f6..3be44e7 100644 --- a/go.sum +++ b/go.sum @@ -166,6 +166,8 @@ google.golang.org/protobuf v1.36.11 h1:fV6ZwhNocDyBLK0dj+fg8ektcVegBBuEolpbTQyBN google.golang.org/protobuf v1.36.11/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j23XfzDpco= gopkg.d7z.net/middleware v0.0.0-20260131122058-3c200930af2d h1:nNXZgc02tab2+WuAn+ZN2pHYORm+q7vWvVgR1GaMUPg= gopkg.d7z.net/middleware v0.0.0-20260131122058-3c200930af2d/go.mod h1:TDqvtfgaXzOvm9gbG8t5FF0AKSKve8pcE9uBVix+1pU= +gopkg.d7z.net/middleware v0.0.0-20260131134426-cea18952b028 h1:BPm7q2ys8IPHAQe01HBSkYH+2itXuP6DvVPZlg45tM4= +gopkg.d7z.net/middleware v0.0.0-20260131134426-cea18952b028/go.mod h1:TDqvtfgaXzOvm9gbG8t5FF0AKSKve8pcE9uBVix+1pU= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= diff --git a/pkg/core/meta.go b/pkg/core/meta.go index 563a7ad..7b113d7 100644 --- a/pkg/core/meta.go +++ b/pkg/core/meta.go @@ -26,9 +26,10 @@ type ServerMeta struct { Domain string Alias *DomainAlias - client *http.Client - cache *tools.KVCache[PageMetaContent] - locker *utils.Locker + client *http.Client + cache *tools.KVCache[PageMetaContent] + locker *utils.Locker + refresh time.Duration } // PageConfig 配置 @@ -38,6 +39,7 @@ type PageMetaContent struct { LastModified time.Time `json:"last_modified"` // 上次更新时间 IsPage bool `json:"is_page"` // 是否为 Page ErrorMsg string `json:"error"` // 错误消息 (作为 500 错误日志暴露至前端) + RefreshAt time.Time `json:"refresh_at"` // 下次刷新时间 Alias []string `json:"alias"` // alias Filters []Filter `json:"filters"` // 路由消息 @@ -45,7 +47,8 @@ type PageMetaContent struct { func NewEmptyPageMetaContent() *PageMetaContent { return &PageMetaContent{ - IsPage: false, + IsPage: false, + RefreshAt: time.Now(), Filters: []Filter{ { Path: "**", @@ -78,6 +81,7 @@ func NewServerMeta( alias *DomainAlias, cache kv.KV, ttl time.Duration, + refresh time.Duration, ) *ServerMeta { return &ServerMeta{ Backend: backend, @@ -86,32 +90,58 @@ func NewServerMeta( client: client, cache: tools.NewCache[PageMetaContent](cache, "meta", ttl), locker: utils.NewLocker(), + refresh: refresh, } } func (s *ServerMeta) GetMeta(ctx context.Context, owner, repo string) (*PageMetaContent, error) { key := fmt.Sprintf("%s/%s", owner, repo) if cache, found := s.cache.Load(ctx, key); found { + if time.Now().After(cache.RefreshAt) { + // 异步刷新 + mux := s.locker.Open(key) + if mux.TryLock() { + go func() { + defer mux.Unlock() + bgCtx, cancel := context.WithTimeout(context.Background(), time.Minute) + defer cancel() + _, _ = s.updateMetaWithLock(bgCtx, owner, repo) + }() + } + } if cache.IsPage { return &cache, nil } return nil, os.ErrNotExist } + return s.updateMeta(ctx, owner, repo) +} + +func (s *ServerMeta) updateMeta(ctx context.Context, owner, repo string) (*PageMetaContent, error) { + key := fmt.Sprintf("%s/%s", owner, repo) mux := s.locker.Open(key) mux.Lock() defer mux.Unlock() - if cache, found := s.cache.Load(ctx, key); found { + return s.updateMetaWithLock(ctx, owner, repo) +} + +func (s *ServerMeta) updateMetaWithLock(ctx context.Context, owner, repo string) (*PageMetaContent, error) { + key := fmt.Sprintf("%s/%s", owner, repo) + // 再次检查缓存 + if cache, found := s.cache.Load(ctx, key); found && time.Now().Before(cache.RefreshAt) { if cache.IsPage { return &cache, nil } return nil, os.ErrNotExist } + rel := NewEmptyPageMetaContent() info, err := s.Meta(ctx, owner, repo) if err != nil { if errors.Is(err, os.ErrNotExist) { rel.IsPage = false + rel.RefreshAt = time.Now().Add(s.refresh) _ = s.cache.Store(ctx, key, *rel) } return nil, err @@ -119,6 +149,7 @@ func (s *ServerMeta) GetMeta(ctx context.Context, owner, repo string) (*PageMeta vfs := NewPageVFS(s.Backend, owner, repo, info.ID) rel.CommitID = info.ID rel.LastModified = info.LastModified + rel.RefreshAt = time.Now().Add(s.refresh) // 检查是否存在 index.html if exists, _ := vfs.Exists(ctx, "index.html"); !exists { diff --git a/pkg/providers/cache.go b/pkg/providers/cache.go index e630a3e..2c1ce27 100644 --- a/pkg/providers/cache.go +++ b/pkg/providers/cache.go @@ -22,6 +22,8 @@ type ProviderCache struct { cacheBlob cache.Cache cacheBlobLimit uint64 + cacheSem chan struct{} + backendSem chan struct{} } func (c *ProviderCache) Close() error { @@ -32,11 +34,21 @@ func NewProviderCache( backend core.Backend, cacheBlob cache.Cache, cacheBlobLimit uint64, + cacheConcurrent uint64, + backendConcurrent uint64, ) *ProviderCache { + if cacheConcurrent == 0 { + cacheConcurrent = 16 // 默认限制 16 个并发缓存操作 + } + if backendConcurrent == 0 { + backendConcurrent = 64 // 默认限制 64 个并发后端请求 + } return &ProviderCache{ parent: backend, cacheBlob: cacheBlob, cacheBlobLimit: cacheBlobLimit, + cacheSem: make(chan struct{}, cacheConcurrent), + backendSem: make(chan struct{}, backendConcurrent), } } @@ -81,6 +93,22 @@ func (c *ProviderCache) Open(ctx context.Context, owner, repo, id, path string, Header: respHeader, }, nil } + + // 获取后端并发锁 + select { + case c.backendSem <- struct{}{}: + case <-ctx.Done(): + return nil, ctx.Err() + } + + releaseBackend := func() { <-c.backendSem } + success := false + defer func() { + if !success { + releaseBackend() + } + }() + open, err := c.parent.Open(ctx, owner, repo, id, path, http.Header{}) if err != nil || open == nil { if open != nil { @@ -96,6 +124,14 @@ func (c *ProviderCache) Open(ctx context.Context, owner, repo, id, path string, } return nil, err } + + // 包装 Body 以在关闭时释放信号量 + open.Body = &utils.CloserWrapper{ + ReadCloser: open.Body, + OnClose: releaseBackend, + } + success = true + if open.StatusCode == http.StatusNotFound { // 缓存404路由 if err = c.cacheBlob.Put(ctx, key, map[string]string{ @@ -119,20 +155,34 @@ func (c *ProviderCache) Open(ctx context.Context, owner, repo, id, path string, } return open, nil } - defer open.Body.Close() - allBytes, err := io.ReadAll(open.Body) - if err != nil { - return nil, err + + // 尝试获取信号量进行缓存 + select { + case c.cacheSem <- struct{}{}: + defer func() { <-c.cacheSem }() + defer open.Body.Close() + allBytes, err := io.ReadAll(open.Body) + if err != nil { + return nil, err + } + if err = c.cacheBlob.Put(ctx, key, map[string]string{ + "Content-Length": open.Header.Get("Content-Length"), + "Last-Modified": open.Header.Get("Last-Modified"), + "Content-Type": open.Header.Get("Content-Type"), + }, bytes.NewBuffer(allBytes), time.Hour); err != nil { + zap.L().Warn("缓存归档失败", zap.Error(err), zap.Int("Size", len(allBytes)), zap.Uint64("MaxSize", c.cacheBlobLimit)) + } + open.Body = utils.NopCloser{ + ReadSeeker: bytes.NewReader(allBytes), + } + return open, nil + default: + // 无法获取信号量,直接流式返回,不进行缓存 + zap.L().Debug("跳过缓存,并发限制已达", zap.String("path", path)) + open.Body = &utils.SizeReadCloser{ + ReadCloser: open.Body, + Size: length, + } + return open, nil } - if err = c.cacheBlob.Put(ctx, key, map[string]string{ - "Content-Length": open.Header.Get("Content-Length"), - "Last-Modified": open.Header.Get("Last-Modified"), - "Content-Type": open.Header.Get("Content-Type"), - }, bytes.NewBuffer(allBytes), time.Hour); err != nil { - zap.L().Warn("缓存归档失败", zap.Error(err), zap.Int("Size", len(allBytes)), zap.Uint64("MaxSize", c.cacheBlobLimit)) - } - open.Body = utils.NopCloser{ - ReadSeeker: bytes.NewReader(allBytes), - } - return open, nil } diff --git a/pkg/server.go b/pkg/server.go index 4408670..89287ab 100644 --- a/pkg/server.go +++ b/pkg/server.go @@ -41,14 +41,15 @@ type Server struct { } type serverConfig struct { - client *http.Client - event subscribe.Subscriber - cacheMeta kv.KV - cacheMetaTTL time.Duration - cacheBlob cache.Cache - cacheBlobTTL time.Duration - errorHandler func(w http.ResponseWriter, r *http.Request, err error) - filterConfig map[string]map[string]any + client *http.Client + event subscribe.Subscriber + cacheMeta kv.KV + cacheMetaTTL time.Duration + cacheMetaRefresh time.Duration + cacheBlob cache.Cache + cacheBlobTTL time.Duration + errorHandler func(w http.ResponseWriter, r *http.Request, err error) + filterConfig map[string]map[string]any } type ServerOption func(*serverConfig) @@ -65,10 +66,11 @@ func WithEvent(event subscribe.Subscriber) ServerOption { } } -func WithMetaCache(cache kv.KV, ttl time.Duration) ServerOption { +func WithMetaCache(cache kv.KV, ttl time.Duration, refresh time.Duration) ServerOption { return func(c *serverConfig) { c.cacheMeta = cache c.cacheMetaTTL = ttl + c.cacheMetaRefresh = refresh } } @@ -117,6 +119,10 @@ func NewPageServer( } } + if cfg.cacheMetaRefresh == 0 { + cfg.cacheMetaRefresh = cfg.cacheMetaTTL / 2 + } + if cfg.cacheBlob == nil { var err error cfg.cacheBlob, err = cache.NewMemoryCache(cache.MemoryCacheConfig{ @@ -135,7 +141,7 @@ func NewPageServer( } alias := core.NewDomainAlias(db.Child("config", "alias")) - svcMeta := core.NewServerMeta(cfg.client, backend, domain, alias, cfg.cacheMeta, cfg.cacheMetaTTL) + svcMeta := core.NewServerMeta(cfg.client, backend, domain, alias, cfg.cacheMeta, cfg.cacheMetaTTL, cfg.cacheMetaRefresh) pageMeta := core.NewPageDomain(svcMeta, domain) globCache, err := lru.New[string, glob.Glob](512) if err != nil { diff --git a/pkg/utils/reader.go b/pkg/utils/reader.go index 7b5a786..c8757f5 100644 --- a/pkg/utils/reader.go +++ b/pkg/utils/reader.go @@ -1,8 +1,20 @@ package utils -import "io" +import ( + "io" +) type SizeReadCloser struct { io.ReadCloser Size uint64 } + +type CloserWrapper struct { + io.ReadCloser + OnClose func() +} + +func (c *CloserWrapper) Close() error { + defer c.OnClose() + return c.ReadCloser.Close() +} diff --git a/tests/core/test.go b/tests/core/test.go index 6d68608..7c37bff 100644 --- a/tests/core/test.go +++ b/tests/core/test.go @@ -53,7 +53,7 @@ func NewTestServer(domain string) *TestServer { memoryKV, pkg.WithClient(http.DefaultClient), pkg.WithEvent(subscribe.NewMemorySubscriber()), - pkg.WithMetaCache(memoryKV.Child("cache"), 0), + pkg.WithMetaCache(memoryKV.Child("cache"), 0, 0), pkg.WithBlobCache(memoryCache, 0), pkg.WithErrorHandler(func(w http.ResponseWriter, r *http.Request, err error) { if errors.Is(err, os.ErrNotExist) {