From dedcd58f8eccdaded022bf994580f4ce1e7cdb32 Mon Sep 17 00:00:00 2001 From: ExplodingDragon Date: Thu, 20 Nov 2025 12:42:05 +0800 Subject: [PATCH] =?UTF-8?q?=E9=87=8D=E6=9E=84=20async?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- examples/js_ws/index.js | 37 ++++++++++++++------------ examples/js_ws_event/.pages.yaml | 3 --- examples/js_ws_event/event.js | 43 +++++++++++++++++-------------- examples/js_ws_event/sender.js | 8 ------ pkg/filters/goja/goja.go | 2 +- pkg/filters/goja/var_websocket.go | 35 ++++++++++++++++++++----- 6 files changed, 73 insertions(+), 55 deletions(-) delete mode 100644 examples/js_ws_event/sender.js diff --git a/examples/js_ws/index.js b/examples/js_ws/index.js index afea883..8a3b1ae 100644 --- a/examples/js_ws/index.js +++ b/examples/js_ws/index.js @@ -1,18 +1,21 @@ -let ws = websocket(); -let shouldExit = false; -while (!shouldExit) { - let data = ws.readText(); - switch (data) { - case "exit": - shouldExit = true; - break; - case "panic": - throw Error("错误"); - case "date": - ws.writeText(new Date().toJSON()) - break - default: - ws.writeText("收到信息:" + data) - break; +async function run() { + let ws = websocket(); + let shouldExit = false; + while (!shouldExit) { + let data = await ws.readText(); + switch (data) { + case "exit": + shouldExit = true; + break; + case "panic": + throw Error("错误"); + case "date": + ws.writeText(new Date().toJSON()) + break + default: + ws.writeText("收到信息:" + data) + break; + } } -} \ No newline at end of file +} +run().then(r => {}); diff --git a/examples/js_ws_event/.pages.yaml b/examples/js_ws_event/.pages.yaml index 894dc6c..b49e46f 100644 --- a/examples/js_ws_event/.pages.yaml +++ b/examples/js_ws_event/.pages.yaml @@ -1,7 +1,4 @@ routes: - - path: "sender" - js: - exec: "sender.js" - path: "event" js: exec: "event.js" \ No newline at end of file diff --git a/examples/js_ws_event/event.js b/examples/js_ws_event/event.js index 2669bf3..320b42b 100644 --- a/examples/js_ws_event/event.js +++ b/examples/js_ws_event/event.js @@ -1,23 +1,28 @@ -let name=request.getQuery("name") -if (name===""){ - throw Error(`Missing name "${name}"`) +const name = (await request.getQuery("name"))?.trim(); + +if (!name) { + throw new Error(`Missing or empty name parameter`); } -let ws = websocket(); -event.subscribe("messages").on(function (msg){ - ws.writeText(msg) -}) -let shouldExit = false; -while (!shouldExit) { - let data = ws.readText(); - switch (data) { - case "exit": - shouldExit = true; - break; - default: - event.put("messages",JSON.stringify({ - name:name, - data:data + +const ws = websocket(); + +try { + // 事件处理 + event.subscribe("messages").on((msg) => { + ws.writeText(msg); + }); + + // 主循环 + for await (const data of ws.readText()) { + if (data === "exit") break; + + if (data?.trim()) { + await event.put("messages", JSON.stringify({ + name, + data: data.trim() })); - break; + } } +} finally { + ws.close(); } \ No newline at end of file diff --git a/examples/js_ws_event/sender.js b/examples/js_ws_event/sender.js deleted file mode 100644 index 53c2f07..0000000 --- a/examples/js_ws_event/sender.js +++ /dev/null @@ -1,8 +0,0 @@ -let name=request.getQuery("name") -let message=request.getQuery("data") -event.put("messages", JSON.stringify({ - name:name, - data:message -})); - -// response.write(event.subscribe("messages").get()) \ No newline at end of file diff --git a/pkg/filters/goja/goja.go b/pkg/filters/goja/goja.go index 9492938..036ece4 100644 --- a/pkg/filters/goja/goja.go +++ b/pkg/filters/goja/goja.go @@ -89,7 +89,7 @@ func FilterInstGoJa(gl core.Params) (core.FilterInstance, error) { } if global.EnableWebsocket { var closer io.Closer - closer, err = WebsocketInject(ctx, vm, debug, request, timeoutCancelFunc) + closer, err = WebsocketInject(ctx, vm, debug, request, loop, timeoutCancelFunc) if err != nil { panic(err) } diff --git a/pkg/filters/goja/var_websocket.go b/pkg/filters/goja/var_websocket.go index 05e633e..1a59e26 100644 --- a/pkg/filters/goja/var_websocket.go +++ b/pkg/filters/goja/var_websocket.go @@ -7,13 +7,14 @@ import ( "time" "github.com/dop251/goja" + "github.com/dop251/goja_nodejs/eventloop" "github.com/gorilla/websocket" "github.com/pkg/errors" "go.uber.org/zap" "gopkg.d7z.net/gitea-pages/pkg/core" ) -func WebsocketInject(ctx core.FilterContext, jsCtx *goja.Runtime, w http.ResponseWriter, request *http.Request, cancelFunc context.CancelFunc) (io.Closer, error) { +func WebsocketInject(ctx core.FilterContext, jsCtx *goja.Runtime, w http.ResponseWriter, request *http.Request, loop *eventloop.EventLoop, cancelFunc context.CancelFunc) (io.Closer, error) { closers := NewClosers() return closers, jsCtx.GlobalObject().Set("websocket", func() (any, error) { upgrader := websocket.Upgrader{} @@ -62,12 +63,32 @@ func WebsocketInject(ctx core.FilterContext, jsCtx *goja.Runtime, w http.Respons }, "TypeTextMessage": websocket.TextMessage, "TypeBinaryMessage": websocket.BinaryMessage, - "readText": func() (string, error) { - _, p, err := conn.ReadMessage() - if err != nil { - return "", err - } - return string(p), nil + "readText": func() goja.Value { + promise, resolve, reject := jsCtx.NewPromise() + go func() { + select { + case <-ctx.Done(): + return + default: + } + defer func() { + if r := recover(); r != nil { + zap.L().Debug("websocket panic", zap.Any("panic", r)) + loop.Run(func(runtime *goja.Runtime) { + _ = reject(runtime.ToValue(r)) + }) + } + }() + _, p, err := conn.ReadMessage() + loop.Run(func(runtime *goja.Runtime) { + if err != nil { + _ = reject(runtime.ToValue(err)) + } else { + _ = resolve(runtime.ToValue(string(p))) + } + }) + }() + return promise.Result() }, "read": func() (any, error) { messageType, p, err := conn.ReadMessage()