diff --git a/cmd/local/main.go b/cmd/local/main.go index 7d25b2b..efc2486 100644 --- a/cmd/local/main.go +++ b/cmd/local/main.go @@ -75,7 +75,7 @@ func main() { provider, domain, memory, pkg.WithClient(http.DefaultClient), pkg.WithEvent(subscriber), - pkg.WithMetaCache(memory, 0, 0), + pkg.WithMetaCache(memory, 0, 0, 0), pkg.WithBlobCache(&nopCache{}, 0), pkg.WithErrorHandler(func(w http.ResponseWriter, r *http.Request, err error) { if errors.Is(err, os.ErrNotExist) { diff --git a/cmd/server/config.go b/cmd/server/config.go index 4d5274c..2008c78 100644 --- a/cmd/server/config.go +++ b/cmd/server/config.go @@ -84,14 +84,16 @@ type ConfigEvent struct { } type ConfigCache struct { - Meta string `yaml:"meta"` // 元数据缓存 - MetaTTL time.Duration `yaml:"meta_ttl"` // 缓存时间 - MetaRefresh time.Duration `yaml:"meta_refresh"` // 刷新时间 + Meta string `yaml:"meta"` // 元数据缓存 + MetaTTL time.Duration `yaml:"meta_ttl"` // 缓存时间 + MetaRefresh time.Duration `yaml:"meta_refresh"` // 刷新时间 + MetaRefreshConcurrent int `yaml:"meta_refresh_concurrent"` // 并发刷新限制 Blob string `yaml:"blob"` // 缓存归档位置 BlobTTL time.Duration `yaml:"blob_ttl"` // 缓存归档位置 BlobLimit units.Base2Bytes `yaml:"blob_limit"` // 单个文件最大大小 BlobConcurrent uint64 `yaml:"blob_concurrent"` // 并发缓存限制 + BlobNotFoundTTL time.Duration `yaml:"blob_not_found_ttl"` // 404 缓存时间 BackendConcurrent uint64 `yaml:"backend_concurrent"` // 并发后端请求限制 } @@ -108,11 +110,8 @@ func LoadConfig(path string) (*Config, error) { return nil, err } - if c.Domain == "" { - return nil, errors.New("domain is required") - } if c.Database.URL == "" { - return nil, errors.New("c is required") + return nil, errors.New("database.url is required") } if c.Event.URL == "" { c.Event.URL = "memory://" diff --git a/cmd/server/main.go b/cmd/server/main.go index a027afb..f26d455 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -56,6 +56,7 @@ func main() { uint64(config.Cache.BlobLimit), config.Cache.BlobConcurrent, config.Cache.BackendConcurrent, + config.Cache.BlobNotFoundTTL, ) defer backend.Close() db, err := kv.NewKVFromURL(config.Database.URL) @@ -77,7 +78,7 @@ func main() { db, pkg.WithClient(http.DefaultClient), pkg.WithEvent(event), - pkg.WithMetaCache(cacheMeta, config.Cache.MetaTTL, config.Cache.MetaRefresh), + pkg.WithMetaCache(cacheMeta, config.Cache.MetaTTL, config.Cache.MetaRefresh, config.Cache.MetaRefreshConcurrent), pkg.WithBlobCache(cacheBlob.Child("filter"), config.Cache.BlobTTL), pkg.WithErrorHandler(config.ErrorHandler), pkg.WithFilterConfig(config.Filters), diff --git a/pkg/core/meta.go b/pkg/core/meta.go index 7b113d7..9513c99 100644 --- a/pkg/core/meta.go +++ b/pkg/core/meta.go @@ -26,10 +26,11 @@ type ServerMeta struct { Domain string Alias *DomainAlias - client *http.Client - cache *tools.KVCache[PageMetaContent] - locker *utils.Locker - refresh time.Duration + client *http.Client + cache *tools.KVCache[PageMetaContent] + locker *utils.Locker + refresh time.Duration + refreshSem chan struct{} } // PageConfig 配置 @@ -82,15 +83,20 @@ func NewServerMeta( cache kv.KV, ttl time.Duration, refresh time.Duration, + refreshConcurrent int, ) *ServerMeta { + if refreshConcurrent <= 0 { + refreshConcurrent = 16 + } return &ServerMeta{ - Backend: backend, - Domain: domain, - Alias: alias, - client: client, - cache: tools.NewCache[PageMetaContent](cache, "meta", ttl), - locker: utils.NewLocker(), - refresh: refresh, + Backend: backend, + Domain: domain, + Alias: alias, + client: client, + cache: tools.NewCache[PageMetaContent](cache, "meta", ttl), + locker: utils.NewLocker(), + refresh: refresh, + refreshSem: make(chan struct{}, refreshConcurrent), } } @@ -101,12 +107,19 @@ func (s *ServerMeta) GetMeta(ctx context.Context, owner, repo string) (*PageMeta // 异步刷新 mux := s.locker.Open(key) if mux.TryLock() { - go func() { - defer mux.Unlock() - bgCtx, cancel := context.WithTimeout(context.Background(), time.Minute) - defer cancel() - _, _ = s.updateMetaWithLock(bgCtx, owner, repo) - }() + select { + case s.refreshSem <- struct{}{}: + go func() { + defer func() { <-s.refreshSem }() + defer mux.Unlock() + bgCtx, cancel := context.WithTimeout(context.Background(), time.Minute) + defer cancel() + _, _ = s.updateMetaWithLock(bgCtx, owner, repo) + }() + default: + // 达到并发限制,跳过本次异步刷新,直接返回旧缓存 + mux.Unlock() + } } } if cache.IsPage { diff --git a/pkg/providers/cache.go b/pkg/providers/cache.go index 2c1ce27..b3b1f98 100644 --- a/pkg/providers/cache.go +++ b/pkg/providers/cache.go @@ -24,6 +24,7 @@ type ProviderCache struct { cacheBlobLimit uint64 cacheSem chan struct{} backendSem chan struct{} + notFoundTTL time.Duration } func (c *ProviderCache) Close() error { @@ -36,6 +37,7 @@ func NewProviderCache( cacheBlobLimit uint64, cacheConcurrent uint64, backendConcurrent uint64, + notFoundTTL time.Duration, ) *ProviderCache { if cacheConcurrent == 0 { cacheConcurrent = 16 // 默认限制 16 个并发缓存操作 @@ -43,16 +45,27 @@ func NewProviderCache( if backendConcurrent == 0 { backendConcurrent = 64 // 默认限制 64 个并发后端请求 } + if notFoundTTL == 0 { + notFoundTTL = time.Hour // 默认 404 缓存 1 小时 + } return &ProviderCache{ parent: backend, cacheBlob: cacheBlob, cacheBlobLimit: cacheBlobLimit, cacheSem: make(chan struct{}, cacheConcurrent), backendSem: make(chan struct{}, backendConcurrent), + notFoundTTL: notFoundTTL, } } func (c *ProviderCache) Meta(ctx context.Context, owner, repo string) (*core.Metadata, error) { + // 获取后端并发锁 + select { + case c.backendSem <- struct{}{}: + defer func() { <-c.backendSem }() + case <-ctx.Done(): + return nil, ctx.Err() + } return c.parent.Meta(ctx, owner, repo) } @@ -118,7 +131,7 @@ func (c *ProviderCache) Open(ctx context.Context, owner, repo, id, path string, if errors.Is(err, os.ErrNotExist) { if err = c.cacheBlob.Put(ctx, key, map[string]string{ "404": "true", - }, bytes.NewBuffer(nil), time.Hour); err != nil { + }, bytes.NewBuffer(nil), c.notFoundTTL); err != nil { zap.L().Warn("缓存404失败", zap.Error(err)) } } @@ -136,7 +149,7 @@ func (c *ProviderCache) Open(ctx context.Context, owner, repo, id, path string, // 缓存404路由 if err = c.cacheBlob.Put(ctx, key, map[string]string{ "404": "true", - }, bytes.NewBuffer(nil), time.Hour); err != nil { + }, bytes.NewBuffer(nil), c.notFoundTTL); err != nil { zap.L().Warn("缓存404失败", zap.Error(err)) } _ = open.Body.Close() diff --git a/pkg/server.go b/pkg/server.go index 89287ab..0b2a78c 100644 --- a/pkg/server.go +++ b/pkg/server.go @@ -41,15 +41,16 @@ type Server struct { } type serverConfig struct { - client *http.Client - event subscribe.Subscriber - cacheMeta kv.KV - cacheMetaTTL time.Duration - cacheMetaRefresh time.Duration - cacheBlob cache.Cache - cacheBlobTTL time.Duration - errorHandler func(w http.ResponseWriter, r *http.Request, err error) - filterConfig map[string]map[string]any + client *http.Client + event subscribe.Subscriber + cacheMeta kv.KV + cacheMetaTTL time.Duration + cacheMetaRefresh time.Duration + cacheMetaRefreshConcurrent int + cacheBlob cache.Cache + cacheBlobTTL time.Duration + errorHandler func(w http.ResponseWriter, r *http.Request, err error) + filterConfig map[string]map[string]any } type ServerOption func(*serverConfig) @@ -66,11 +67,12 @@ func WithEvent(event subscribe.Subscriber) ServerOption { } } -func WithMetaCache(cache kv.KV, ttl time.Duration, refresh time.Duration) ServerOption { +func WithMetaCache(cache kv.KV, ttl time.Duration, refresh time.Duration, refreshConcurrent int) ServerOption { return func(c *serverConfig) { c.cacheMeta = cache c.cacheMetaTTL = ttl c.cacheMetaRefresh = refresh + c.cacheMetaRefreshConcurrent = refreshConcurrent } } @@ -123,6 +125,10 @@ func NewPageServer( cfg.cacheMetaRefresh = cfg.cacheMetaTTL / 2 } + if cfg.cacheMetaRefreshConcurrent == 0 { + cfg.cacheMetaRefreshConcurrent = 16 + } + if cfg.cacheBlob == nil { var err error cfg.cacheBlob, err = cache.NewMemoryCache(cache.MemoryCacheConfig{ @@ -141,7 +147,7 @@ func NewPageServer( } alias := core.NewDomainAlias(db.Child("config", "alias")) - svcMeta := core.NewServerMeta(cfg.client, backend, domain, alias, cfg.cacheMeta, cfg.cacheMetaTTL, cfg.cacheMetaRefresh) + svcMeta := core.NewServerMeta(cfg.client, backend, domain, alias, cfg.cacheMeta, cfg.cacheMetaTTL, cfg.cacheMetaRefresh, cfg.cacheMetaRefreshConcurrent) pageMeta := core.NewPageDomain(svcMeta, domain) globCache, err := lru.New[string, glob.Glob](512) if err != nil { diff --git a/tests/core/test.go b/tests/core/test.go index 7c37bff..f2283ad 100644 --- a/tests/core/test.go +++ b/tests/core/test.go @@ -53,7 +53,7 @@ func NewTestServer(domain string) *TestServer { memoryKV, pkg.WithClient(http.DefaultClient), pkg.WithEvent(subscribe.NewMemorySubscriber()), - pkg.WithMetaCache(memoryKV.Child("cache"), 0, 0), + pkg.WithMetaCache(memoryKV.Child("cache"), 0, 0, 0), pkg.WithBlobCache(memoryCache, 0), pkg.WithErrorHandler(func(w http.ResponseWriter, r *http.Request, err error) { if errors.Is(err, os.ErrNotExist) {