chore: refine config error messages and adjust internal config structures
This commit is contained in:
@@ -75,7 +75,7 @@ func main() {
|
||||
provider, domain, memory,
|
||||
pkg.WithClient(http.DefaultClient),
|
||||
pkg.WithEvent(subscriber),
|
||||
pkg.WithMetaCache(memory, 0, 0),
|
||||
pkg.WithMetaCache(memory, 0, 0, 0),
|
||||
pkg.WithBlobCache(&nopCache{}, 0),
|
||||
pkg.WithErrorHandler(func(w http.ResponseWriter, r *http.Request, err error) {
|
||||
if errors.Is(err, os.ErrNotExist) {
|
||||
|
||||
@@ -84,14 +84,16 @@ type ConfigEvent struct {
|
||||
}
|
||||
|
||||
type ConfigCache struct {
|
||||
Meta string `yaml:"meta"` // 元数据缓存
|
||||
MetaTTL time.Duration `yaml:"meta_ttl"` // 缓存时间
|
||||
MetaRefresh time.Duration `yaml:"meta_refresh"` // 刷新时间
|
||||
Meta string `yaml:"meta"` // 元数据缓存
|
||||
MetaTTL time.Duration `yaml:"meta_ttl"` // 缓存时间
|
||||
MetaRefresh time.Duration `yaml:"meta_refresh"` // 刷新时间
|
||||
MetaRefreshConcurrent int `yaml:"meta_refresh_concurrent"` // 并发刷新限制
|
||||
|
||||
Blob string `yaml:"blob"` // 缓存归档位置
|
||||
BlobTTL time.Duration `yaml:"blob_ttl"` // 缓存归档位置
|
||||
BlobLimit units.Base2Bytes `yaml:"blob_limit"` // 单个文件最大大小
|
||||
BlobConcurrent uint64 `yaml:"blob_concurrent"` // 并发缓存限制
|
||||
BlobNotFoundTTL time.Duration `yaml:"blob_not_found_ttl"` // 404 缓存时间
|
||||
BackendConcurrent uint64 `yaml:"backend_concurrent"` // 并发后端请求限制
|
||||
}
|
||||
|
||||
@@ -108,11 +110,8 @@ func LoadConfig(path string) (*Config, error) {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if c.Domain == "" {
|
||||
return nil, errors.New("domain is required")
|
||||
}
|
||||
if c.Database.URL == "" {
|
||||
return nil, errors.New("c is required")
|
||||
return nil, errors.New("database.url is required")
|
||||
}
|
||||
if c.Event.URL == "" {
|
||||
c.Event.URL = "memory://"
|
||||
|
||||
@@ -56,6 +56,7 @@ func main() {
|
||||
uint64(config.Cache.BlobLimit),
|
||||
config.Cache.BlobConcurrent,
|
||||
config.Cache.BackendConcurrent,
|
||||
config.Cache.BlobNotFoundTTL,
|
||||
)
|
||||
defer backend.Close()
|
||||
db, err := kv.NewKVFromURL(config.Database.URL)
|
||||
@@ -77,7 +78,7 @@ func main() {
|
||||
db,
|
||||
pkg.WithClient(http.DefaultClient),
|
||||
pkg.WithEvent(event),
|
||||
pkg.WithMetaCache(cacheMeta, config.Cache.MetaTTL, config.Cache.MetaRefresh),
|
||||
pkg.WithMetaCache(cacheMeta, config.Cache.MetaTTL, config.Cache.MetaRefresh, config.Cache.MetaRefreshConcurrent),
|
||||
pkg.WithBlobCache(cacheBlob.Child("filter"), config.Cache.BlobTTL),
|
||||
pkg.WithErrorHandler(config.ErrorHandler),
|
||||
pkg.WithFilterConfig(config.Filters),
|
||||
|
||||
@@ -26,10 +26,11 @@ type ServerMeta struct {
|
||||
Domain string
|
||||
Alias *DomainAlias
|
||||
|
||||
client *http.Client
|
||||
cache *tools.KVCache[PageMetaContent]
|
||||
locker *utils.Locker
|
||||
refresh time.Duration
|
||||
client *http.Client
|
||||
cache *tools.KVCache[PageMetaContent]
|
||||
locker *utils.Locker
|
||||
refresh time.Duration
|
||||
refreshSem chan struct{}
|
||||
}
|
||||
|
||||
// PageConfig 配置
|
||||
@@ -82,15 +83,20 @@ func NewServerMeta(
|
||||
cache kv.KV,
|
||||
ttl time.Duration,
|
||||
refresh time.Duration,
|
||||
refreshConcurrent int,
|
||||
) *ServerMeta {
|
||||
if refreshConcurrent <= 0 {
|
||||
refreshConcurrent = 16
|
||||
}
|
||||
return &ServerMeta{
|
||||
Backend: backend,
|
||||
Domain: domain,
|
||||
Alias: alias,
|
||||
client: client,
|
||||
cache: tools.NewCache[PageMetaContent](cache, "meta", ttl),
|
||||
locker: utils.NewLocker(),
|
||||
refresh: refresh,
|
||||
Backend: backend,
|
||||
Domain: domain,
|
||||
Alias: alias,
|
||||
client: client,
|
||||
cache: tools.NewCache[PageMetaContent](cache, "meta", ttl),
|
||||
locker: utils.NewLocker(),
|
||||
refresh: refresh,
|
||||
refreshSem: make(chan struct{}, refreshConcurrent),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -101,12 +107,19 @@ func (s *ServerMeta) GetMeta(ctx context.Context, owner, repo string) (*PageMeta
|
||||
// 异步刷新
|
||||
mux := s.locker.Open(key)
|
||||
if mux.TryLock() {
|
||||
go func() {
|
||||
defer mux.Unlock()
|
||||
bgCtx, cancel := context.WithTimeout(context.Background(), time.Minute)
|
||||
defer cancel()
|
||||
_, _ = s.updateMetaWithLock(bgCtx, owner, repo)
|
||||
}()
|
||||
select {
|
||||
case s.refreshSem <- struct{}{}:
|
||||
go func() {
|
||||
defer func() { <-s.refreshSem }()
|
||||
defer mux.Unlock()
|
||||
bgCtx, cancel := context.WithTimeout(context.Background(), time.Minute)
|
||||
defer cancel()
|
||||
_, _ = s.updateMetaWithLock(bgCtx, owner, repo)
|
||||
}()
|
||||
default:
|
||||
// 达到并发限制,跳过本次异步刷新,直接返回旧缓存
|
||||
mux.Unlock()
|
||||
}
|
||||
}
|
||||
}
|
||||
if cache.IsPage {
|
||||
|
||||
@@ -24,6 +24,7 @@ type ProviderCache struct {
|
||||
cacheBlobLimit uint64
|
||||
cacheSem chan struct{}
|
||||
backendSem chan struct{}
|
||||
notFoundTTL time.Duration
|
||||
}
|
||||
|
||||
func (c *ProviderCache) Close() error {
|
||||
@@ -36,6 +37,7 @@ func NewProviderCache(
|
||||
cacheBlobLimit uint64,
|
||||
cacheConcurrent uint64,
|
||||
backendConcurrent uint64,
|
||||
notFoundTTL time.Duration,
|
||||
) *ProviderCache {
|
||||
if cacheConcurrent == 0 {
|
||||
cacheConcurrent = 16 // 默认限制 16 个并发缓存操作
|
||||
@@ -43,16 +45,27 @@ func NewProviderCache(
|
||||
if backendConcurrent == 0 {
|
||||
backendConcurrent = 64 // 默认限制 64 个并发后端请求
|
||||
}
|
||||
if notFoundTTL == 0 {
|
||||
notFoundTTL = time.Hour // 默认 404 缓存 1 小时
|
||||
}
|
||||
return &ProviderCache{
|
||||
parent: backend,
|
||||
cacheBlob: cacheBlob,
|
||||
cacheBlobLimit: cacheBlobLimit,
|
||||
cacheSem: make(chan struct{}, cacheConcurrent),
|
||||
backendSem: make(chan struct{}, backendConcurrent),
|
||||
notFoundTTL: notFoundTTL,
|
||||
}
|
||||
}
|
||||
|
||||
func (c *ProviderCache) Meta(ctx context.Context, owner, repo string) (*core.Metadata, error) {
|
||||
// 获取后端并发锁
|
||||
select {
|
||||
case c.backendSem <- struct{}{}:
|
||||
defer func() { <-c.backendSem }()
|
||||
case <-ctx.Done():
|
||||
return nil, ctx.Err()
|
||||
}
|
||||
return c.parent.Meta(ctx, owner, repo)
|
||||
}
|
||||
|
||||
@@ -118,7 +131,7 @@ func (c *ProviderCache) Open(ctx context.Context, owner, repo, id, path string,
|
||||
if errors.Is(err, os.ErrNotExist) {
|
||||
if err = c.cacheBlob.Put(ctx, key, map[string]string{
|
||||
"404": "true",
|
||||
}, bytes.NewBuffer(nil), time.Hour); err != nil {
|
||||
}, bytes.NewBuffer(nil), c.notFoundTTL); err != nil {
|
||||
zap.L().Warn("缓存404失败", zap.Error(err))
|
||||
}
|
||||
}
|
||||
@@ -136,7 +149,7 @@ func (c *ProviderCache) Open(ctx context.Context, owner, repo, id, path string,
|
||||
// 缓存404路由
|
||||
if err = c.cacheBlob.Put(ctx, key, map[string]string{
|
||||
"404": "true",
|
||||
}, bytes.NewBuffer(nil), time.Hour); err != nil {
|
||||
}, bytes.NewBuffer(nil), c.notFoundTTL); err != nil {
|
||||
zap.L().Warn("缓存404失败", zap.Error(err))
|
||||
}
|
||||
_ = open.Body.Close()
|
||||
|
||||
@@ -41,15 +41,16 @@ type Server struct {
|
||||
}
|
||||
|
||||
type serverConfig struct {
|
||||
client *http.Client
|
||||
event subscribe.Subscriber
|
||||
cacheMeta kv.KV
|
||||
cacheMetaTTL time.Duration
|
||||
cacheMetaRefresh time.Duration
|
||||
cacheBlob cache.Cache
|
||||
cacheBlobTTL time.Duration
|
||||
errorHandler func(w http.ResponseWriter, r *http.Request, err error)
|
||||
filterConfig map[string]map[string]any
|
||||
client *http.Client
|
||||
event subscribe.Subscriber
|
||||
cacheMeta kv.KV
|
||||
cacheMetaTTL time.Duration
|
||||
cacheMetaRefresh time.Duration
|
||||
cacheMetaRefreshConcurrent int
|
||||
cacheBlob cache.Cache
|
||||
cacheBlobTTL time.Duration
|
||||
errorHandler func(w http.ResponseWriter, r *http.Request, err error)
|
||||
filterConfig map[string]map[string]any
|
||||
}
|
||||
|
||||
type ServerOption func(*serverConfig)
|
||||
@@ -66,11 +67,12 @@ func WithEvent(event subscribe.Subscriber) ServerOption {
|
||||
}
|
||||
}
|
||||
|
||||
func WithMetaCache(cache kv.KV, ttl time.Duration, refresh time.Duration) ServerOption {
|
||||
func WithMetaCache(cache kv.KV, ttl time.Duration, refresh time.Duration, refreshConcurrent int) ServerOption {
|
||||
return func(c *serverConfig) {
|
||||
c.cacheMeta = cache
|
||||
c.cacheMetaTTL = ttl
|
||||
c.cacheMetaRefresh = refresh
|
||||
c.cacheMetaRefreshConcurrent = refreshConcurrent
|
||||
}
|
||||
}
|
||||
|
||||
@@ -123,6 +125,10 @@ func NewPageServer(
|
||||
cfg.cacheMetaRefresh = cfg.cacheMetaTTL / 2
|
||||
}
|
||||
|
||||
if cfg.cacheMetaRefreshConcurrent == 0 {
|
||||
cfg.cacheMetaRefreshConcurrent = 16
|
||||
}
|
||||
|
||||
if cfg.cacheBlob == nil {
|
||||
var err error
|
||||
cfg.cacheBlob, err = cache.NewMemoryCache(cache.MemoryCacheConfig{
|
||||
@@ -141,7 +147,7 @@ func NewPageServer(
|
||||
}
|
||||
|
||||
alias := core.NewDomainAlias(db.Child("config", "alias"))
|
||||
svcMeta := core.NewServerMeta(cfg.client, backend, domain, alias, cfg.cacheMeta, cfg.cacheMetaTTL, cfg.cacheMetaRefresh)
|
||||
svcMeta := core.NewServerMeta(cfg.client, backend, domain, alias, cfg.cacheMeta, cfg.cacheMetaTTL, cfg.cacheMetaRefresh, cfg.cacheMetaRefreshConcurrent)
|
||||
pageMeta := core.NewPageDomain(svcMeta, domain)
|
||||
globCache, err := lru.New[string, glob.Glob](512)
|
||||
if err != nil {
|
||||
|
||||
@@ -53,7 +53,7 @@ func NewTestServer(domain string) *TestServer {
|
||||
memoryKV,
|
||||
pkg.WithClient(http.DefaultClient),
|
||||
pkg.WithEvent(subscribe.NewMemorySubscriber()),
|
||||
pkg.WithMetaCache(memoryKV.Child("cache"), 0, 0),
|
||||
pkg.WithMetaCache(memoryKV.Child("cache"), 0, 0, 0),
|
||||
pkg.WithBlobCache(memoryCache, 0),
|
||||
pkg.WithErrorHandler(func(w http.ResponseWriter, r *http.Request, err error) {
|
||||
if errors.Is(err, os.ErrNotExist) {
|
||||
|
||||
Reference in New Issue
Block a user