修复部分问题

This commit is contained in:
dragon
2025-11-20 17:33:18 +08:00
parent dedcd58f8e
commit 35df220bea
6 changed files with 133 additions and 105 deletions

View File

@@ -1,4 +1,4 @@
async function run() { (async ()=>{
let ws = websocket(); let ws = websocket();
let shouldExit = false; let shouldExit = false;
while (!shouldExit) { while (!shouldExit) {
@@ -17,5 +17,4 @@ async function run() {
break; break;
} }
} }
} })()
run().then(r => {});

View File

@@ -1,4 +1,4 @@
const name = (await request.getQuery("name"))?.trim(); const name = (request.getQuery("name"))?.trim();
if (!name) { if (!name) {
throw new Error(`Missing or empty name parameter`); throw new Error(`Missing or empty name parameter`);
@@ -6,23 +6,26 @@ if (!name) {
const ws = websocket(); const ws = websocket();
try { async function eventPull() {
// 事件处理 while (true) {
event.subscribe("messages").on((msg) => { const data = await event.pull('messages')
ws.writeText(msg); ws.writeText(data);
}); }
}
// 主循环 async function messagePull() {
for await (const data of ws.readText()) { while (true) {
const data = await ws.readText()
if (data === "exit") break; if (data === "exit") break;
if (data?.trim()) { if (data?.trim()) {
await event.put("messages", JSON.stringify({ await event.put("messages", JSON.stringify({
name, name:name,
data: data.trim() data: data.trim()
})); }));
} }
} }
} finally { }
ws.close();
} (async () => {
await Promise.all(eventPull(), messagePull())
})()

View File

@@ -51,7 +51,9 @@ func NextCallWrapper(call FilterCall, parentCall NextCall, stack Filter) NextCal
return func(ctx FilterContext, writer http.ResponseWriter, request *http.Request) error { return func(ctx FilterContext, writer http.ResponseWriter, request *http.Request) error {
zap.L().Debug(fmt.Sprintf("call filter(%s) before", stack.Type), zap.Any("filter", stack)) zap.L().Debug(fmt.Sprintf("call filter(%s) before", stack.Type), zap.Any("filter", stack))
err := call(ctx, writer, request, parentCall) err := call(ctx, writer, request, parentCall)
zap.L().Debug(fmt.Sprintf("call filter(%s) after", stack.Type), zap.Any("filter", stack), zap.Error(err)) zap.L().Debug(fmt.Sprintf("call filter(%s) after", stack.Type),
zap.Any("filter", stack),
zap.Error(err))
return err return err
} }
} }

View File

@@ -1,8 +1,9 @@
package goja package goja
import ( import (
"context" "encoding/json"
"errors" "errors"
"fmt"
"io" "io"
"net/http" "net/http"
"path/filepath" "path/filepath"
@@ -14,6 +15,7 @@ import (
"github.com/dop251/goja_nodejs/eventloop" "github.com/dop251/goja_nodejs/eventloop"
"github.com/dop251/goja_nodejs/require" "github.com/dop251/goja_nodejs/require"
"github.com/dop251/goja_nodejs/url" "github.com/dop251/goja_nodejs/url"
"go.uber.org/zap"
"gopkg.d7z.net/gitea-pages/pkg/core" "gopkg.d7z.net/gitea-pages/pkg/core"
) )
@@ -47,68 +49,93 @@ func FilterInstGoJa(gl core.Params) (core.FilterInstance, error) {
if err != nil { if err != nil {
return err return err
} }
prg, err := goja.Compile("main.js", js, false)
debug := NewDebug(global.EnableDebug && param.Debug && request.URL.Query().
Get("debug") == "true", request, w)
program, err := goja.Compile("main.js", js, false)
if err != nil { if err != nil {
return err return debug.Flush(err)
} }
debug := NewDebug(global.EnableDebug && param.Debug && request.URL.Query().Get("debug") == "true", request, w) registry := newRegistry(ctx, debug)
registry := newRegistry(ctx) jsLoop := eventloop.NewEventLoop(eventloop.WithRegistry(registry),
registry.RegisterNativeModule(console.ModuleName, console.RequireWithPrinter(debug)) eventloop.EnableConsole(true))
loop := eventloop.NewEventLoop(eventloop.WithRegistry(registry), eventloop.EnableConsole(true))
stop := make(chan struct{}, 1) jsLoop.Start()
shutdown := make(chan struct{}, 1) defer jsLoop.Stop()
defer close(shutdown)
timeout, timeoutCancelFunc := context.WithTimeout(ctx, global.Timeout)
defer timeoutCancelFunc()
count := 0
closers := NewClosers() closers := NewClosers()
defer closers.Close() defer closers.Close()
go func() {
defer func() { stop := make(chan error, 1)
shutdown <- struct{}{} defer close(stop)
}()
select { jsLoop.RunOnLoop(func(vm *goja.Runtime) {
case <-timeout.Done(): err := func() error {
case <-stop: url.Enable(vm)
} if err = RequestInject(ctx, vm, request); err != nil {
count = loop.Stop() return err
}() }
loop.Run(func(vm *goja.Runtime) { if err = ResponseInject(vm, debug, request); err != nil {
url.Enable(vm) return err
if err = RequestInject(ctx, vm, request); err != nil { }
panic(err) if err = KVInject(ctx, vm); err != nil {
} return err
if err = ResponseInject(vm, debug, request); err != nil { }
panic(err) if err = EventInject(ctx, vm, jsLoop); err != nil {
} return err
if err = KVInject(ctx, vm); err != nil { }
panic(err) if global.EnableWebsocket {
} var closer io.Closer
if err = EventInject(ctx, vm); err != nil { closer, err = WebsocketInject(ctx, vm, debug, request, jsLoop)
panic(err) if err != nil {
} return err
if global.EnableWebsocket { }
var closer io.Closer closers.AddCloser(closer.Close)
closer, err = WebsocketInject(ctx, vm, debug, request, loop, timeoutCancelFunc) }
if err != nil { return nil
panic(err) }()
if err != nil {
stop <- errors.Join(err, errors.New("js init failed"))
return
}
result, err := vm.RunProgram(program)
if err != nil {
stop <- err
return
}
export := result.Export()
if export != nil {
if promise, ok := export.(*goja.Promise); ok {
go func() {
for {
switch promise.State() {
case goja.PromiseStateFulfilled, goja.PromiseStateRejected:
result := promise.Result().Export()
switch data := result.(type) {
case error:
stop <- data
default:
marshal, _ := json.Marshal(result)
zap.L().Debug(fmt.Sprintf("js promise result %s", string(marshal)),
zap.Any("result", promise.Result().ExportType()))
stop <- nil
}
return
default:
time.Sleep(time.Millisecond * 5)
}
}
}()
} }
closers.AddCloser(closer.Close)
} }
_, err = vm.RunProgram(prg)
}) })
stop <- struct{}{} resultErr := <-stop
close(stop) return debug.Flush(resultErr)
<-shutdown
if count != 0 {
err = errors.Join(context.DeadlineExceeded, err)
}
return debug.Flush(err)
}, nil }, nil
}, nil }, nil
} }
func newRegistry(ctx core.FilterContext) *require.Registry { func newRegistry(ctx core.FilterContext, printer console.Printer) *require.Registry {
registry := require.NewRegistry( registry := require.NewRegistry(
require.WithLoader(func(path string) ([]byte, error) { require.WithLoader(func(path string) ([]byte, error) {
return ctx.PageVFS.Read(ctx, path) return ctx.PageVFS.Read(ctx, path)
@@ -116,6 +143,8 @@ func newRegistry(ctx core.FilterContext) *require.Registry {
require.WithPathResolver(func(base, path string) string { require.WithPathResolver(func(base, path string) string {
return filepath.Join(base, filepath.FromSlash(path)) return filepath.Join(base, filepath.FromSlash(path))
})) }))
registry.RegisterNativeModule(console.ModuleName, console.RequireWithPrinter(printer))
return registry return registry
} }

View File

@@ -2,39 +2,33 @@ package goja
import ( import (
"github.com/dop251/goja" "github.com/dop251/goja"
"github.com/dop251/goja_nodejs/eventloop"
"gopkg.d7z.net/gitea-pages/pkg/core" "gopkg.d7z.net/gitea-pages/pkg/core"
) )
func EventInject(ctx core.FilterContext, jsCtx *goja.Runtime) error { func EventInject(ctx core.FilterContext, jsCtx *goja.Runtime, loop *eventloop.EventLoop) error {
return jsCtx.GlobalObject().Set("event", map[string]interface{}{ return jsCtx.GlobalObject().Set("event", map[string]interface{}{
"subscribe": func(key string) (map[string]any, error) { "load": func(key string) *goja.Promise {
subscribe, err := ctx.Event.Subscribe(ctx, key) promise, resolve, reject := jsCtx.NewPromise()
if err != nil { go func() {
return nil, err subscribe, err := ctx.Event.Subscribe(ctx, key)
} if err != nil {
return map[string]any{ loop.RunOnLoop(func(runtime *goja.Runtime) {
"on": func(f func(string)) { _ = reject(runtime.ToValue(err))
go func() { })
z: }
for { select {
select { case data := <-subscribe:
case <-ctx.Done(): loop.RunOnLoop(func(runtime *goja.Runtime) {
break z _ = resolve(runtime.ToValue(data))
case data := <-subscribe: })
f(data) case <-ctx.Done():
} loop.RunOnLoop(func(runtime *goja.Runtime) {
} _ = reject(runtime.ToValue(ctx.Err()))
}() })
}, }
"get": func() (string, error) { }()
select { return promise
case <-ctx.Done():
return "", ctx.Err()
case data := <-subscribe:
return data, nil
}
},
}, nil
}, },
"put": func(key, value string) error { "put": func(key, value string) error {
return ctx.Event.Publish(ctx, key, value) return ctx.Event.Publish(ctx, key, value)

View File

@@ -1,7 +1,6 @@
package goja package goja
import ( import (
"context"
"io" "io"
"net/http" "net/http"
"time" "time"
@@ -14,7 +13,7 @@ import (
"gopkg.d7z.net/gitea-pages/pkg/core" "gopkg.d7z.net/gitea-pages/pkg/core"
) )
func WebsocketInject(ctx core.FilterContext, jsCtx *goja.Runtime, w http.ResponseWriter, request *http.Request, loop *eventloop.EventLoop, cancelFunc context.CancelFunc) (io.Closer, error) { func WebsocketInject(ctx core.FilterContext, jsCtx *goja.Runtime, w http.ResponseWriter, request *http.Request, loop *eventloop.EventLoop) (io.Closer, error) {
closers := NewClosers() closers := NewClosers()
return closers, jsCtx.GlobalObject().Set("websocket", func() (any, error) { return closers, jsCtx.GlobalObject().Set("websocket", func() (any, error) {
upgrader := websocket.Upgrader{} upgrader := websocket.Upgrader{}
@@ -22,7 +21,6 @@ func WebsocketInject(ctx core.FilterContext, jsCtx *goja.Runtime, w http.Respons
if err != nil { if err != nil {
return nil, err return nil, err
} }
cancelFunc()
go func() { go func() {
ticker := time.NewTicker(15 * time.Second) ticker := time.NewTicker(15 * time.Second)
defer ticker.Stop() defer ticker.Stop()
@@ -63,24 +61,27 @@ func WebsocketInject(ctx core.FilterContext, jsCtx *goja.Runtime, w http.Respons
}, },
"TypeTextMessage": websocket.TextMessage, "TypeTextMessage": websocket.TextMessage,
"TypeBinaryMessage": websocket.BinaryMessage, "TypeBinaryMessage": websocket.BinaryMessage,
"readText": func() goja.Value { "readText": func() *goja.Promise {
promise, resolve, reject := jsCtx.NewPromise() promise, resolve, reject := jsCtx.NewPromise()
go func() { go func() {
select { select {
case <-ctx.Done(): case <-ctx.Done():
loop.RunOnLoop(func(runtime *goja.Runtime) {
_ = reject(runtime.ToValue(ctx.Err()))
})
return return
default: default:
} }
defer func() { defer func() {
if r := recover(); r != nil { if r := recover(); r != nil {
zap.L().Debug("websocket panic", zap.Any("panic", r)) zap.L().Debug("websocket panic", zap.Any("panic", r))
loop.Run(func(runtime *goja.Runtime) { loop.RunOnLoop(func(runtime *goja.Runtime) {
_ = reject(runtime.ToValue(r)) _ = reject(runtime.ToValue(r))
}) })
} }
}() }()
_, p, err := conn.ReadMessage() _, p, err := conn.ReadMessage()
loop.Run(func(runtime *goja.Runtime) { loop.RunOnLoop(func(runtime *goja.Runtime) {
if err != nil { if err != nil {
_ = reject(runtime.ToValue(err)) _ = reject(runtime.ToValue(err))
} else { } else {
@@ -88,7 +89,7 @@ func WebsocketInject(ctx core.FilterContext, jsCtx *goja.Runtime, w http.Respons
} }
}) })
}() }()
return promise.Result() return promise
}, },
"read": func() (any, error) { "read": func() (any, error) {
messageType, p, err := conn.ReadMessage() messageType, p, err := conn.ReadMessage()