From 35df220beab8ef29e7e6f0c3790eb60a30516130 Mon Sep 17 00:00:00 2001 From: dragon Date: Thu, 20 Nov 2025 17:33:18 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E9=83=A8=E5=88=86=E9=97=AE?= =?UTF-8?q?=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- examples/js_ws/index.js | 5 +- examples/js_ws_event/event.js | 29 ++++--- pkg/core/filter.go | 4 +- pkg/filters/goja/goja.go | 133 ++++++++++++++++++------------ pkg/filters/goja/var_event.go | 52 ++++++------ pkg/filters/goja/var_websocket.go | 15 ++-- 6 files changed, 133 insertions(+), 105 deletions(-) diff --git a/examples/js_ws/index.js b/examples/js_ws/index.js index 8a3b1ae..794bdb5 100644 --- a/examples/js_ws/index.js +++ b/examples/js_ws/index.js @@ -1,4 +1,4 @@ -async function run() { +(async ()=>{ let ws = websocket(); let shouldExit = false; while (!shouldExit) { @@ -17,5 +17,4 @@ async function run() { break; } } -} -run().then(r => {}); +})() \ No newline at end of file diff --git a/examples/js_ws_event/event.js b/examples/js_ws_event/event.js index 320b42b..ebee90a 100644 --- a/examples/js_ws_event/event.js +++ b/examples/js_ws_event/event.js @@ -1,4 +1,4 @@ -const name = (await request.getQuery("name"))?.trim(); +const name = (request.getQuery("name"))?.trim(); if (!name) { throw new Error(`Missing or empty name parameter`); @@ -6,23 +6,26 @@ if (!name) { const ws = websocket(); -try { - // 事件处理 - event.subscribe("messages").on((msg) => { - ws.writeText(msg); - }); +async function eventPull() { + while (true) { + const data = await event.pull('messages') + ws.writeText(data); + } +} - // 主循环 - for await (const data of ws.readText()) { +async function messagePull() { + while (true) { + const data = await ws.readText() if (data === "exit") break; - if (data?.trim()) { await event.put("messages", JSON.stringify({ - name, + name:name, data: data.trim() })); } } -} finally { - ws.close(); -} \ No newline at end of file +} + +(async () => { + await Promise.all(eventPull(), messagePull()) +})() \ No newline at end of file diff --git a/pkg/core/filter.go b/pkg/core/filter.go index b5a957d..54d70ce 100644 --- a/pkg/core/filter.go +++ b/pkg/core/filter.go @@ -51,7 +51,9 @@ func NextCallWrapper(call FilterCall, parentCall NextCall, stack Filter) NextCal return func(ctx FilterContext, writer http.ResponseWriter, request *http.Request) error { zap.L().Debug(fmt.Sprintf("call filter(%s) before", stack.Type), zap.Any("filter", stack)) err := call(ctx, writer, request, parentCall) - zap.L().Debug(fmt.Sprintf("call filter(%s) after", stack.Type), zap.Any("filter", stack), zap.Error(err)) + zap.L().Debug(fmt.Sprintf("call filter(%s) after", stack.Type), + zap.Any("filter", stack), + zap.Error(err)) return err } } diff --git a/pkg/filters/goja/goja.go b/pkg/filters/goja/goja.go index 036ece4..a76b745 100644 --- a/pkg/filters/goja/goja.go +++ b/pkg/filters/goja/goja.go @@ -1,8 +1,9 @@ package goja import ( - "context" + "encoding/json" "errors" + "fmt" "io" "net/http" "path/filepath" @@ -14,6 +15,7 @@ import ( "github.com/dop251/goja_nodejs/eventloop" "github.com/dop251/goja_nodejs/require" "github.com/dop251/goja_nodejs/url" + "go.uber.org/zap" "gopkg.d7z.net/gitea-pages/pkg/core" ) @@ -47,68 +49,93 @@ func FilterInstGoJa(gl core.Params) (core.FilterInstance, error) { if err != nil { return err } - prg, err := goja.Compile("main.js", js, false) + + debug := NewDebug(global.EnableDebug && param.Debug && request.URL.Query(). + Get("debug") == "true", request, w) + program, err := goja.Compile("main.js", js, false) if err != nil { - return err + return debug.Flush(err) } - debug := NewDebug(global.EnableDebug && param.Debug && request.URL.Query().Get("debug") == "true", request, w) - registry := newRegistry(ctx) - registry.RegisterNativeModule(console.ModuleName, console.RequireWithPrinter(debug)) - loop := eventloop.NewEventLoop(eventloop.WithRegistry(registry), eventloop.EnableConsole(true)) - stop := make(chan struct{}, 1) - shutdown := make(chan struct{}, 1) - defer close(shutdown) - timeout, timeoutCancelFunc := context.WithTimeout(ctx, global.Timeout) - defer timeoutCancelFunc() - count := 0 + registry := newRegistry(ctx, debug) + jsLoop := eventloop.NewEventLoop(eventloop.WithRegistry(registry), + eventloop.EnableConsole(true)) + + jsLoop.Start() + defer jsLoop.Stop() + closers := NewClosers() defer closers.Close() - go func() { - defer func() { - shutdown <- struct{}{} - }() - select { - case <-timeout.Done(): - case <-stop: - } - count = loop.Stop() - }() - loop.Run(func(vm *goja.Runtime) { - url.Enable(vm) - if err = RequestInject(ctx, vm, request); err != nil { - panic(err) - } - if err = ResponseInject(vm, debug, request); err != nil { - panic(err) - } - if err = KVInject(ctx, vm); err != nil { - panic(err) - } - if err = EventInject(ctx, vm); err != nil { - panic(err) - } - if global.EnableWebsocket { - var closer io.Closer - closer, err = WebsocketInject(ctx, vm, debug, request, loop, timeoutCancelFunc) - if err != nil { - panic(err) + + stop := make(chan error, 1) + defer close(stop) + + jsLoop.RunOnLoop(func(vm *goja.Runtime) { + err := func() error { + url.Enable(vm) + if err = RequestInject(ctx, vm, request); err != nil { + return err + } + if err = ResponseInject(vm, debug, request); err != nil { + return err + } + if err = KVInject(ctx, vm); err != nil { + return err + } + if err = EventInject(ctx, vm, jsLoop); err != nil { + return err + } + if global.EnableWebsocket { + var closer io.Closer + closer, err = WebsocketInject(ctx, vm, debug, request, jsLoop) + if err != nil { + return err + } + closers.AddCloser(closer.Close) + } + return nil + }() + if err != nil { + stop <- errors.Join(err, errors.New("js init failed")) + return + } + result, err := vm.RunProgram(program) + if err != nil { + stop <- err + return + } + export := result.Export() + if export != nil { + if promise, ok := export.(*goja.Promise); ok { + go func() { + for { + switch promise.State() { + case goja.PromiseStateFulfilled, goja.PromiseStateRejected: + result := promise.Result().Export() + switch data := result.(type) { + case error: + stop <- data + default: + marshal, _ := json.Marshal(result) + zap.L().Debug(fmt.Sprintf("js promise result %s", string(marshal)), + zap.Any("result", promise.Result().ExportType())) + stop <- nil + } + return + default: + time.Sleep(time.Millisecond * 5) + } + } + }() } - closers.AddCloser(closer.Close) } - _, err = vm.RunProgram(prg) }) - stop <- struct{}{} - close(stop) - <-shutdown - if count != 0 { - err = errors.Join(context.DeadlineExceeded, err) - } - return debug.Flush(err) + resultErr := <-stop + return debug.Flush(resultErr) }, nil }, nil } -func newRegistry(ctx core.FilterContext) *require.Registry { +func newRegistry(ctx core.FilterContext, printer console.Printer) *require.Registry { registry := require.NewRegistry( require.WithLoader(func(path string) ([]byte, error) { return ctx.PageVFS.Read(ctx, path) @@ -116,6 +143,8 @@ func newRegistry(ctx core.FilterContext) *require.Registry { require.WithPathResolver(func(base, path string) string { return filepath.Join(base, filepath.FromSlash(path)) })) + registry.RegisterNativeModule(console.ModuleName, console.RequireWithPrinter(printer)) + return registry } diff --git a/pkg/filters/goja/var_event.go b/pkg/filters/goja/var_event.go index 0d062d8..9b42340 100644 --- a/pkg/filters/goja/var_event.go +++ b/pkg/filters/goja/var_event.go @@ -2,39 +2,33 @@ package goja import ( "github.com/dop251/goja" + "github.com/dop251/goja_nodejs/eventloop" "gopkg.d7z.net/gitea-pages/pkg/core" ) -func EventInject(ctx core.FilterContext, jsCtx *goja.Runtime) error { +func EventInject(ctx core.FilterContext, jsCtx *goja.Runtime, loop *eventloop.EventLoop) error { return jsCtx.GlobalObject().Set("event", map[string]interface{}{ - "subscribe": func(key string) (map[string]any, error) { - subscribe, err := ctx.Event.Subscribe(ctx, key) - if err != nil { - return nil, err - } - return map[string]any{ - "on": func(f func(string)) { - go func() { - z: - for { - select { - case <-ctx.Done(): - break z - case data := <-subscribe: - f(data) - } - } - }() - }, - "get": func() (string, error) { - select { - case <-ctx.Done(): - return "", ctx.Err() - case data := <-subscribe: - return data, nil - } - }, - }, nil + "load": func(key string) *goja.Promise { + promise, resolve, reject := jsCtx.NewPromise() + go func() { + subscribe, err := ctx.Event.Subscribe(ctx, key) + if err != nil { + loop.RunOnLoop(func(runtime *goja.Runtime) { + _ = reject(runtime.ToValue(err)) + }) + } + select { + case data := <-subscribe: + loop.RunOnLoop(func(runtime *goja.Runtime) { + _ = resolve(runtime.ToValue(data)) + }) + case <-ctx.Done(): + loop.RunOnLoop(func(runtime *goja.Runtime) { + _ = reject(runtime.ToValue(ctx.Err())) + }) + } + }() + return promise }, "put": func(key, value string) error { return ctx.Event.Publish(ctx, key, value) diff --git a/pkg/filters/goja/var_websocket.go b/pkg/filters/goja/var_websocket.go index 1a59e26..5e9f17d 100644 --- a/pkg/filters/goja/var_websocket.go +++ b/pkg/filters/goja/var_websocket.go @@ -1,7 +1,6 @@ package goja import ( - "context" "io" "net/http" "time" @@ -14,7 +13,7 @@ import ( "gopkg.d7z.net/gitea-pages/pkg/core" ) -func WebsocketInject(ctx core.FilterContext, jsCtx *goja.Runtime, w http.ResponseWriter, request *http.Request, loop *eventloop.EventLoop, cancelFunc context.CancelFunc) (io.Closer, error) { +func WebsocketInject(ctx core.FilterContext, jsCtx *goja.Runtime, w http.ResponseWriter, request *http.Request, loop *eventloop.EventLoop) (io.Closer, error) { closers := NewClosers() return closers, jsCtx.GlobalObject().Set("websocket", func() (any, error) { upgrader := websocket.Upgrader{} @@ -22,7 +21,6 @@ func WebsocketInject(ctx core.FilterContext, jsCtx *goja.Runtime, w http.Respons if err != nil { return nil, err } - cancelFunc() go func() { ticker := time.NewTicker(15 * time.Second) defer ticker.Stop() @@ -63,24 +61,27 @@ func WebsocketInject(ctx core.FilterContext, jsCtx *goja.Runtime, w http.Respons }, "TypeTextMessage": websocket.TextMessage, "TypeBinaryMessage": websocket.BinaryMessage, - "readText": func() goja.Value { + "readText": func() *goja.Promise { promise, resolve, reject := jsCtx.NewPromise() go func() { select { case <-ctx.Done(): + loop.RunOnLoop(func(runtime *goja.Runtime) { + _ = reject(runtime.ToValue(ctx.Err())) + }) return default: } defer func() { if r := recover(); r != nil { zap.L().Debug("websocket panic", zap.Any("panic", r)) - loop.Run(func(runtime *goja.Runtime) { + loop.RunOnLoop(func(runtime *goja.Runtime) { _ = reject(runtime.ToValue(r)) }) } }() _, p, err := conn.ReadMessage() - loop.Run(func(runtime *goja.Runtime) { + loop.RunOnLoop(func(runtime *goja.Runtime) { if err != nil { _ = reject(runtime.ToValue(err)) } else { @@ -88,7 +89,7 @@ func WebsocketInject(ctx core.FilterContext, jsCtx *goja.Runtime, w http.Respons } }) }() - return promise.Result() + return promise }, "read": func() (any, error) { messageType, p, err := conn.ReadMessage()