diff --git a/cmd/local/main.go b/cmd/local/main.go index 26f9a9d..502dd84 100644 --- a/cmd/local/main.go +++ b/cmd/local/main.go @@ -78,7 +78,7 @@ func main() { type nopCache struct{} -func (n *nopCache) Child(_ string) cache.Cache { +func (n *nopCache) Child(_ ...string) cache.Cache { return n } diff --git a/examples/js_ws/index.js b/examples/js_ws/index.js index 794bdb5..9fdc196 100644 --- a/examples/js_ws/index.js +++ b/examples/js_ws/index.js @@ -1,19 +1,17 @@ (async ()=>{ let ws = websocket(); - let shouldExit = false; - while (!shouldExit) { + while (true) { let data = await ws.readText(); switch (data) { case "exit": - shouldExit = true; - break; + return case "panic": throw Error("错误"); case "date": - ws.writeText(new Date().toJSON()) + await ws.writeText(new Date().toJSON()) break default: - ws.writeText("收到信息:" + data) + await ws.writeText("收到信息:" + data) break; } } diff --git a/examples/js_ws_event/event.js b/examples/js_ws_event/event.js index ebee90a..e858baa 100644 --- a/examples/js_ws_event/event.js +++ b/examples/js_ws_event/event.js @@ -8,8 +8,8 @@ const ws = websocket(); async function eventPull() { while (true) { - const data = await event.pull('messages') - ws.writeText(data); + const data = await event.load('messages') + await ws.writeText(data); } } @@ -27,5 +27,5 @@ async function messagePull() { } (async () => { - await Promise.all(eventPull(), messagePull()) + await Promise.all([eventPull(), messagePull()]) })() \ No newline at end of file diff --git a/go.mod b/go.mod index 5925af1..df3d597 100644 --- a/go.mod +++ b/go.mod @@ -14,8 +14,8 @@ require ( github.com/hashicorp/golang-lru/v2 v2.0.7 github.com/pkg/errors v0.9.1 github.com/stretchr/testify v1.11.1 - go.uber.org/zap v1.27.0 - gopkg.d7z.net/middleware v0.0.0-20251119134829-0c55a98e6495 + go.uber.org/zap v1.27.1 + gopkg.d7z.net/middleware v0.0.0-20251120123709-5d4e16e0d6fb gopkg.in/yaml.v3 v3.0.1 ) @@ -53,7 +53,7 @@ require ( go.etcd.io/etcd/client/pkg/v3 v3.6.6 // indirect go.etcd.io/etcd/client/v3 v3.6.6 // indirect go.uber.org/multierr v1.11.0 // indirect - golang.org/x/crypto v0.44.0 // indirect + golang.org/x/crypto v0.45.0 // indirect golang.org/x/net v0.47.0 // indirect golang.org/x/sys v0.38.0 // indirect golang.org/x/text v0.31.0 // indirect diff --git a/go.sum b/go.sum index de89c2a..d2ca74e 100644 --- a/go.sum +++ b/go.sum @@ -142,12 +142,16 @@ go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8= go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E= +go.uber.org/zap v1.27.1 h1:08RqriUEv8+ArZRYSTXy1LeBScaMpVSTBhCeaZYfMYc= +go.uber.org/zap v1.27.1/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20210513164829-c07d793c2f9a/go.mod h1:P+XmwS30IXTQdn5tA2iutPOUgjI07+tq3H3K9MVA1s8= golang.org/x/crypto v0.44.0 h1:A97SsFvM3AIwEEmTBiaxPPTYpDC47w720rdiiUvgoAU= golang.org/x/crypto v0.44.0/go.mod h1:013i+Nw79BMiQiMsOPcVCB5ZIJbYkerPrGnOa00tvmc= +golang.org/x/crypto v0.45.0 h1:jMBrvKuj23MTlT0bQEOBcAE0mjg8mK9RXFhRH6nyF3Q= +golang.org/x/crypto v0.45.0/go.mod h1:XTGrrkGJve7CYK7J8PEww4aY7gM3qMCElcJQ8n8JdX4= golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= @@ -197,6 +201,8 @@ gopkg.d7z.net/middleware v0.0.0-20251114145539-bb74bd940f32 h1:3JvqnWFLWzAoS57vL gopkg.d7z.net/middleware v0.0.0-20251114145539-bb74bd940f32/go.mod h1:/1/EuissKhUbuhUe01rcWuwpA5mt7jASb4uKVNOLtR8= gopkg.d7z.net/middleware v0.0.0-20251119134829-0c55a98e6495 h1:LvjpmL0nkZZtrUXCFZGyoh8O2X9l2B7ZXFldOzN8ShI= gopkg.d7z.net/middleware v0.0.0-20251119134829-0c55a98e6495/go.mod h1:/1/EuissKhUbuhUe01rcWuwpA5mt7jASb4uKVNOLtR8= +gopkg.d7z.net/middleware v0.0.0-20251120123709-5d4e16e0d6fb h1:2+IskB2qGQshl67tHdnzEXCm46+9E/QevYL3xpMul0E= +gopkg.d7z.net/middleware v0.0.0-20251120123709-5d4e16e0d6fb/go.mod h1:/1/EuissKhUbuhUe01rcWuwpA5mt7jASb4uKVNOLtR8= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= diff --git a/pkg/core/alias.go b/pkg/core/alias.go index 54fa47a..64055a1 100644 --- a/pkg/core/alias.go +++ b/pkg/core/alias.go @@ -2,6 +2,7 @@ package core import ( "context" + "encoding/base64" "encoding/json" "fmt" @@ -36,7 +37,7 @@ func (a *DomainAlias) Query(ctx context.Context, domain string) (*Alias, error) func (a *DomainAlias) Bind(ctx context.Context, domains []string, owner, repo, branch string) error { oldDomains := make([]string, 0) - rKey := fmt.Sprintf("%s/%s/%s", owner, repo, branch) + rKey := base64.URLEncoding.EncodeToString([]byte(fmt.Sprintf("%s/%s/%s", owner, repo, branch))) if oldStr, err := a.config.Get(ctx, rKey); err == nil { _ = json.Unmarshal([]byte(oldStr), &oldDomains) } diff --git a/pkg/filters/goja/goja.go b/pkg/filters/goja/goja.go index a76b745..77b2818 100644 --- a/pkg/filters/goja/goja.go +++ b/pkg/filters/goja/goja.go @@ -1,9 +1,7 @@ package goja import ( - "encoding/json" "errors" - "fmt" "io" "net/http" "path/filepath" @@ -15,7 +13,6 @@ import ( "github.com/dop251/goja_nodejs/eventloop" "github.com/dop251/goja_nodejs/require" "github.com/dop251/goja_nodejs/url" - "go.uber.org/zap" "gopkg.d7z.net/gitea-pages/pkg/core" ) @@ -109,16 +106,15 @@ func FilterInstGoJa(gl core.Params) (core.FilterInstance, error) { go func() { for { switch promise.State() { - case goja.PromiseStateFulfilled, goja.PromiseStateRejected: - result := promise.Result().Export() - switch data := result.(type) { + case goja.PromiseStateFulfilled: + stop <- nil + return + case goja.PromiseStateRejected: + switch data := promise.Result().Export().(type) { case error: stop <- data default: - marshal, _ := json.Marshal(result) - zap.L().Debug(fmt.Sprintf("js promise result %s", string(marshal)), - zap.Any("result", promise.Result().ExportType())) - stop <- nil + stop <- errors.New(promise.Result().String()) } return default: @@ -126,7 +122,11 @@ func FilterInstGoJa(gl core.Params) (core.FilterInstance, error) { } } }() + } else { + stop <- nil } + } else { + stop <- nil } }) resultErr := <-stop diff --git a/pkg/filters/goja/var_event.go b/pkg/filters/goja/var_event.go index 9b42340..80db412 100644 --- a/pkg/filters/goja/var_event.go +++ b/pkg/filters/goja/var_event.go @@ -30,8 +30,19 @@ func EventInject(ctx core.FilterContext, jsCtx *goja.Runtime, loop *eventloop.Ev }() return promise }, - "put": func(key, value string) error { - return ctx.Event.Publish(ctx, key, value) + "put": func(key, value string) *goja.Promise { + promise, resolve, reject := jsCtx.NewPromise() + go func() { + err := ctx.Event.Publish(ctx, key, value) + loop.RunOnLoop(func(runtime *goja.Runtime) { + if err != nil { + _ = reject(runtime.ToValue(err)) + } else { + _ = resolve(goja.Undefined()) + } + }) + }() + return promise }, }) } diff --git a/pkg/filters/goja/var_websocket.go b/pkg/filters/goja/var_websocket.go index 5e9f17d..63f7929 100644 --- a/pkg/filters/goja/var_websocket.go +++ b/pkg/filters/goja/var_websocket.go @@ -21,6 +21,8 @@ func WebsocketInject(ctx core.FilterContext, jsCtx *goja.Runtime, w http.Respons if err != nil { return nil, err } + zap.L().Debug("websocket upgrader created") + closers.AddCloser(conn.Close) go func() { ticker := time.NewTicker(15 * time.Second) defer ticker.Stop() @@ -37,28 +39,7 @@ func WebsocketInject(ctx core.FilterContext, jsCtx *goja.Runtime, w http.Respons } } }() - zap.L().Debug("websocket upgrader created") - closers.AddCloser(conn.Close) return map[string]interface{}{ - "on": func(f func(mType int, message string)) { - go func() { - z: - for { - select { - case <-ctx.Done(): - break z - default: - messageType, p, err := conn.ReadMessage() - if err != nil { - break z - } - f(messageType, string(p)) - } - - } - - }() - }, "TypeTextMessage": websocket.TextMessage, "TypeBinaryMessage": websocket.BinaryMessage, "readText": func() *goja.Promise { @@ -91,36 +72,115 @@ func WebsocketInject(ctx core.FilterContext, jsCtx *goja.Runtime, w http.Respons }() return promise }, - "read": func() (any, error) { - messageType, p, err := conn.ReadMessage() - if err != nil { - return nil, err - } - return map[string]interface{}{ - "type": messageType, - "data": p, - }, nil + "read": func() *goja.Promise { + promise, resolve, reject := jsCtx.NewPromise() + go func() { + select { + case <-ctx.Done(): + loop.RunOnLoop(func(runtime *goja.Runtime) { + _ = reject(runtime.ToValue(ctx.Err())) + }) + return + default: + } + defer func() { + if r := recover(); r != nil { + zap.L().Debug("websocket panic", zap.Any("panic", r)) + loop.RunOnLoop(func(runtime *goja.Runtime) { + _ = reject(runtime.ToValue(r)) + }) + } + }() + messageType, p, err := conn.ReadMessage() + loop.RunOnLoop(func(runtime *goja.Runtime) { + if err != nil { + _ = reject(runtime.ToValue(err)) + } else { + _ = resolve(runtime.ToValue(map[string]interface{}{ + "type": messageType, + "data": p, + })) + } + }) + }() + return promise }, - "writeText": func(data string) error { - return conn.WriteMessage(websocket.TextMessage, []byte(data)) + "writeText": func(data string) *goja.Promise { + promise, resolve, reject := jsCtx.NewPromise() + go func() { + select { + case <-ctx.Done(): + loop.RunOnLoop(func(runtime *goja.Runtime) { + _ = reject(runtime.ToValue(ctx.Err())) + }) + return + default: + } + defer func() { + if r := recover(); r != nil { + zap.L().Debug("websocket panic", zap.Any("panic", r)) + loop.RunOnLoop(func(runtime *goja.Runtime) { + _ = reject(runtime.ToValue(r)) + }) + } + }() + err := conn.WriteMessage(websocket.TextMessage, []byte(data)) + loop.RunOnLoop(func(runtime *goja.Runtime) { + if err != nil { + _ = reject(runtime.ToValue(err)) + } else { + _ = resolve(runtime.ToValue(nil)) + } + }) + }() + return promise }, - "write": func(mType int, data any) error { - if item, ok := data.(goja.Value); ok { - data = item.Export() - } - var dataRaw []byte - switch it := data.(type) { - case []byte: - dataRaw = it - case string: - dataRaw = []byte(it) - default: - return errors.Errorf("invalid type for websocket text: %T", data) - } - return conn.WriteMessage(mType, dataRaw) - }, - "ping": func() error { - return conn.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(1*time.Second)) + "write": func(mType int, data any) *goja.Promise { + promise, resolve, reject := jsCtx.NewPromise() + go func() { + select { + case <-ctx.Done(): + loop.RunOnLoop(func(runtime *goja.Runtime) { + _ = reject(runtime.ToValue(ctx.Err())) + }) + return + default: + } + defer func() { + if r := recover(); r != nil { + zap.L().Debug("websocket panic", zap.Any("panic", r)) + loop.RunOnLoop(func(runtime *goja.Runtime) { + _ = reject(runtime.ToValue(r)) + }) + } + }() + + if item, ok := data.(goja.Value); ok { + data = item.Export() + } + var dataRaw []byte + switch it := data.(type) { + case []byte: + dataRaw = it + case string: + dataRaw = []byte(it) + default: + loop.RunOnLoop(func(runtime *goja.Runtime) { + _ = reject(runtime.ToValue(errors.Errorf("invalid type for websocket text: %T", data))) + }) + return + } + + err := conn.WriteMessage(mType, dataRaw) + loop.RunOnLoop(func(runtime *goja.Runtime) { + if err != nil { + _ = reject(runtime.ToValue(err)) + } else { + _ = resolve(goja.Undefined()) + } + }) + }() + return promise }, }, nil }) diff --git a/pkg/server.go b/pkg/server.go index 8a55302..28d1610 100644 --- a/pkg/server.go +++ b/pkg/server.go @@ -31,9 +31,10 @@ type Server struct { db kv.CursorPagedKV filterMgr map[string]core.FilterInstance - globCache *lru.Cache[string, glob.Glob] + globCache *lru.Cache[string, glob.Glob] + cacheBlob cache.Cache - cacheBlobTtl time.Duration + cacheBlobTTL time.Duration event subscribe.Subscriber errorHandler func(w http.ResponseWriter, r *http.Request, err error) @@ -49,7 +50,7 @@ func NewPageServer( cacheMeta kv.KV, cacheMetaTTL time.Duration, cacheBlob cache.Cache, - cacheBlobTtl time.Duration, + cacheBlobTTL time.Duration, errorHandler func(w http.ResponseWriter, r *http.Request, err error), filterConfig map[string]map[string]any, ) (*Server, error) { @@ -71,7 +72,7 @@ func NewPageServer( filterMgr: defaultFilters, errorHandler: errorHandler, cacheBlob: cacheBlob, - cacheBlobTtl: cacheBlobTtl, + cacheBlobTTL: cacheBlobTTL, event: event, }, nil } @@ -94,7 +95,7 @@ func (s *Server) ServeHTTP(w http.ResponseWriter, request *http.Request) { }() err := s.Serve(writer, request) if err != nil { - zap.L().Debug("bad request.", zap.Error(err), zap.Any("request", request.RequestURI), zap.Any("id", sessionID)) + zap.L().Debug("bad request", zap.Error(err), zap.Any("request", request.RequestURI), zap.Any("id", sessionID)) if !writer.IsWritten() { s.errorHandler(writer, request, err) } @@ -114,10 +115,10 @@ func (s *Server) Serve(writer *utils.WrittenResponseWriter, request *http.Reques PageContent: meta, Context: cancel, PageVFS: core.NewPageVFS(s.backend, meta.Owner, meta.Repo, meta.CommitID), - Cache: tools.NewTTLCache(s.cacheBlob.Child("filter").Child(meta.Owner).Child(meta.Repo).Child(meta.CommitID), time.Minute), - OrgDB: s.db.Child("org").Child(meta.Owner).(kv.CursorPagedKV), - RepoDB: s.db.Child("repo").Child(meta.Owner).Child(meta.Repo).(kv.CursorPagedKV), - Event: s.event.Child("domain").Child(meta.Owner).Child(meta.Repo), + Cache: tools.NewTTLCache(s.cacheBlob.Child("filter", meta.Owner, meta.Repo, meta.CommitID), time.Minute), + OrgDB: s.db.Child("org", meta.Owner).(kv.CursorPagedKV), + RepoDB: s.db.Child("repo", meta.Owner, meta.Repo).(kv.CursorPagedKV), + Event: s.event.Child("domain", meta.Owner, meta.Repo), Kill: cancelFunc, }