支持 js websocket

This commit is contained in:
dragon
2025-11-19 15:56:32 +08:00
parent 268f21c2af
commit 043b00bbb7
25 changed files with 617 additions and 69 deletions

View File

@@ -27,7 +27,7 @@ func DefaultFilters(config map[string]map[string]any) (map[string]core.FilterIns
if !ok {
item = make(map[string]any)
}
if item["_disable"] == true {
if it, ok := item["Enabled"]; ok && it == false {
zap.L().Debug("skip filter", zap.String("key", key))
continue
}

View File

@@ -3,8 +3,10 @@ package goja
import (
"context"
"errors"
"io"
"net/http"
"path/filepath"
"sync"
"time"
"github.com/dop251/goja"
@@ -15,7 +17,20 @@ import (
"gopkg.d7z.net/gitea-pages/pkg/core"
)
func FilterInstGoJa(_ core.Params) (core.FilterInstance, error) {
func FilterInstGoJa(gl core.Params) (core.FilterInstance, error) {
var global struct {
Timeout time.Duration `json:"timeout"`
EnableDebug bool `json:"debug"`
EnableWebsocket bool `json:"websocket"`
}
global.EnableDebug = true
global.EnableWebsocket = true
if err := gl.Unmarshal(&global); err != nil {
return nil, err
}
if global.Timeout == 0 {
global.Timeout = 60 * time.Second
}
return func(config core.Params) (core.FilterCall, error) {
var param struct {
Exec string `json:"exec"`
@@ -36,19 +51,21 @@ func FilterInstGoJa(_ core.Params) (core.FilterInstance, error) {
if err != nil {
return err
}
debug := NewDebug(param.Debug && request.URL.Query().Get("debug") == "true", request, w)
debug := NewDebug(global.EnableDebug && param.Debug && request.URL.Query().Get("debug") == "true", request, w)
registry := newRegistry(ctx)
registry.RegisterNativeModule(console.ModuleName, console.RequireWithPrinter(debug))
loop := eventloop.NewEventLoop(eventloop.WithRegistry(registry), eventloop.EnableConsole(true))
stop := make(chan struct{}, 1)
shutdown := make(chan struct{}, 1)
timeout, cancelFunc := context.WithTimeout(ctx, 3*time.Second)
defer close(shutdown)
timeout, cancelFunc := context.WithTimeout(ctx, global.Timeout)
defer cancelFunc()
count := 0
closers := NewClosers()
defer closers.Close()
go func() {
defer func() {
shutdown <- struct{}{}
close(shutdown)
}()
select {
case <-timeout.Done():
@@ -67,6 +84,14 @@ func FilterInstGoJa(_ core.Params) (core.FilterInstance, error) {
if err = KVInject(ctx, vm); err != nil {
panic(err)
}
if global.EnableWebsocket {
var closer io.Closer
closer, err = WebsocketInject(vm, debug, request, cancelFunc)
if err != nil {
panic(err)
}
closers.AddCloser(closer.Close)
}
_, err = vm.RunProgram(prg)
})
stop <- struct{}{}
@@ -90,3 +115,36 @@ func newRegistry(ctx core.FilterContext) *require.Registry {
}))
return registry
}
type Closers struct {
mu sync.Mutex
closers []func() error
}
func NewClosers() *Closers {
return &Closers{
closers: make([]func() error, 0),
}
}
func (c *Closers) AddCloser(closer func() error) {
c.mu.Lock()
c.closers = append(c.closers, closer)
c.mu.Unlock()
}
func (c *Closers) Close() error {
c.mu.Lock()
defer c.mu.Unlock()
var errs []error
for i := len(c.closers) - 1; i >= 0; i-- {
if err := c.closers[i](); err != nil {
errs = append(errs, err)
}
}
c.closers = nil
if len(errs) > 0 {
return errors.Join(errs...)
}
return nil
}

View File

@@ -1,11 +1,15 @@
package goja
import (
"bufio"
"bytes"
_ "embed"
"html/template"
"net"
"net/http"
"time"
"github.com/pkg/errors"
)
//go:embed debug.tmpl
@@ -89,6 +93,13 @@ func (d *DebugData) Write(i []byte) (int, error) {
return d.parent.Write(i)
}
func (d *DebugData) Hijack() (net.Conn, *bufio.ReadWriter, error) {
if hijacker, ok := d.parent.(http.Hijacker); ok {
return hijacker.Hijack()
}
return nil, nil, errors.New("not hijackable")
}
func (d *DebugData) WriteHeader(statusCode int) {
if d.enabled {
d.status = statusCode

View File

@@ -12,61 +12,46 @@ import (
func KVInject(ctx core.FilterContext, jsCtx *goja.Runtime) error {
return jsCtx.GlobalObject().Set("kv", map[string]interface{}{
"repo": func(group string) goja.Value {
"repo": func(group string) (goja.Value, error) {
return kvResult(ctx.RepoDB)(ctx, jsCtx, group)
},
"org": func(group string) goja.Value {
"org": func(group string) (goja.Value, error) {
return kvResult(ctx.OrgDB)(ctx, jsCtx, group)
},
})
}
func kvResult(db kv.CursorPagedKV) func(ctx core.FilterContext, jsCtx *goja.Runtime, group string) goja.Value {
return func(ctx core.FilterContext, jsCtx *goja.Runtime, group string) goja.Value {
func kvResult(db kv.CursorPagedKV) func(ctx core.FilterContext, jsCtx *goja.Runtime, group string) (goja.Value, error) {
return func(ctx core.FilterContext, jsCtx *goja.Runtime, group string) (goja.Value, error) {
group = strings.TrimSpace(group)
if group == "" {
panic("kv: invalid group name")
return goja.Undefined(), errors.New("invalid group")
}
db := db.Child(group).(kv.CursorPagedKV)
return jsCtx.ToValue(map[string]interface{}{
"get": func(key string) goja.Value {
"get": func(key string) (goja.Value, error) {
get, err := db.Get(ctx, key)
if err != nil {
if !errors.Is(err, os.ErrNotExist) {
panic(err)
return nil, err
}
return goja.Null()
return goja.Null(), nil
}
return jsCtx.ToValue(get)
return jsCtx.ToValue(get), nil
},
"set": func(key, value string) {
err := db.Put(ctx, key, value, kv.TTLKeep)
if err != nil {
panic(err)
}
"set": func(key, value string) error {
return db.Put(ctx, key, value, kv.TTLKeep)
},
"delete": func(key string) bool {
b, err := db.Delete(ctx, key)
if err != nil {
panic(err)
}
return b
"delete": func(key string) (bool, error) {
return db.Delete(ctx, key)
},
"putIfNotExists": func(key, value string) bool {
exists, err := db.PutIfNotExists(ctx, key, value, kv.TTLKeep)
if err != nil {
panic(err)
}
return exists
"putIfNotExists": func(key, value string) (bool, error) {
return db.PutIfNotExists(ctx, key, value, kv.TTLKeep)
},
"compareAndSwap": func(key, oldValue, newValue string) bool {
swap, err := db.CompareAndSwap(ctx, key, oldValue, newValue)
if err != nil {
panic(err)
}
return swap
"compareAndSwap": func(key, oldValue, newValue string) (bool, error) {
return db.CompareAndSwap(ctx, key, oldValue, newValue)
},
"list": func(limit int64, cursor string) map[string]any {
"list": func(limit int64, cursor string) (map[string]any, error) {
if limit <= 0 {
limit = 100
}
@@ -75,14 +60,14 @@ func kvResult(db kv.CursorPagedKV) func(ctx core.FilterContext, jsCtx *goja.Runt
Cursor: cursor,
})
if err != nil {
panic(err)
return nil, err
}
return map[string]any{
"keys": list.Keys,
"cursor": list.Cursor,
"hasNext": list.HasMore,
}
}, nil
},
})
}), nil
}
}

View File

@@ -18,7 +18,6 @@ func ResponseInject(jsCtx *goja.Runtime, writer http.ResponseWriter, req *http.R
"getHeader": func(key string) string {
return writer.Header().Get(key)
},
"removeHeader": func(key string) {
writer.Header().Del(key)
},
@@ -38,11 +37,12 @@ func ResponseInject(jsCtx *goja.Runtime, writer http.ResponseWriter, req *http.R
},
// 写入响应
"write": func(data string) {
"write": func(data string) error {
_, err := writer.Write([]byte(data))
if err != nil {
panic(err)
return err
}
return nil
},
"writeHead": func(statusCode int, headers ...map[string]string) {
@@ -55,14 +55,14 @@ func ResponseInject(jsCtx *goja.Runtime, writer http.ResponseWriter, req *http.R
writer.WriteHeader(statusCode)
},
"end": func(data ...string) {
"end": func(data ...string) error {
if len(data) > 0 {
_, err := writer.Write([]byte(data[0]))
if err != nil {
panic(err)
return err
}
}
// 在实际的 HTTP 处理中,我们通常不手动结束响应
return nil
},
// 重定向
@@ -75,7 +75,7 @@ func ResponseInject(jsCtx *goja.Runtime, writer http.ResponseWriter, req *http.R
},
// JSON 响应
"json": func(data goja.Value) {
"json": func(data goja.Value) error {
writer.Header().Set("Content-Type", "application/json")
var jsonStr string
@@ -86,14 +86,12 @@ func ResponseInject(jsCtx *goja.Runtime, writer http.ResponseWriter, req *http.R
default:
marshal, err := json.Marshal(v)
if err != nil {
panic(err)
return err
}
jsonStr = string(marshal)
}
_, err := writer.Write([]byte(jsonStr))
if err != nil {
panic(err)
}
return err
},
// 设置 cookie

View File

@@ -0,0 +1,65 @@
package goja
import (
"context"
"io"
"net/http"
"github.com/dop251/goja"
"github.com/gorilla/websocket"
"github.com/pkg/errors"
"go.uber.org/zap"
)
func WebsocketInject(jsCtx *goja.Runtime, w http.ResponseWriter, request *http.Request, cancelFunc context.CancelFunc) (io.Closer, error) {
closers := NewClosers()
return closers, jsCtx.GlobalObject().Set("websocket", func() (any, error) {
upgrader := websocket.Upgrader{}
conn, err := upgrader.Upgrade(w, request, nil)
if err != nil {
return nil, err
}
cancelFunc()
zap.L().Debug("websocket upgrader created")
closers.AddCloser(conn.Close)
return map[string]interface{}{
"TypeTextMessage": websocket.TextMessage,
"TypeBinaryMessage": websocket.BinaryMessage,
"readText": func() (string, error) {
_, p, err := conn.ReadMessage()
if err != nil {
return "", err
}
return string(p), nil
},
"read": func() (any, error) {
messageType, p, err := conn.ReadMessage()
if err != nil {
return nil, err
}
return map[string]interface{}{
"type": messageType,
"data": p,
}, nil
},
"writeText": func(data string) error {
return conn.WriteMessage(websocket.TextMessage, []byte(data))
},
"write": func(mType int, data any) error {
if item, ok := data.(goja.Value); ok {
data = item.Export()
}
var dataRaw []byte
switch it := data.(type) {
case []byte:
dataRaw = it
case string:
dataRaw = []byte(it)
default:
return errors.Errorf("invalid type for websocket text: %T", data)
}
return conn.WriteMessage(mType, dataRaw)
},
}, nil
})
}

View File

@@ -102,7 +102,7 @@ func (c *ProviderCache) Open(ctx context.Context, owner, repo, commit, path stri
return nil, os.ErrNotExist
} else if lastCache != nil {
h := lastCache.Metadata
if h["Not-Found"] == "true" {
if h["_404_"] == "true" {
return nil, os.ErrNotExist
}
respHeader := make(http.Header)
@@ -130,11 +130,23 @@ func (c *ProviderCache) Open(ctx context.Context, owner, repo, commit, path stri
if open != nil {
_ = open.Body.Close()
}
// 当上游返回错误时缓存404结果
if errors.Is(err, os.ErrNotExist) {
if err = c.cacheBlob.Put(ctx, key, map[string]string{
"_404_": "true",
}, bytes.NewBuffer(nil), time.Hour); err != nil {
zap.L().Warn("缓存404失败", zap.Error(err))
}
}
return nil, err
}
if open.StatusCode == http.StatusNotFound {
// TODO: 缓存 404 路由
//_ = c.cache.Put(ctx, key, nil, time.Hour)
// 缓存404路由
if err = c.cacheBlob.Put(ctx, key, map[string]string{
"_404_": "true",
}, bytes.NewBuffer(nil), time.Hour); err != nil {
zap.L().Warn("缓存404失败", zap.Error(err))
}
_ = open.Body.Close()
return nil, os.ErrNotExist
}

View File

@@ -1,6 +1,12 @@
package utils
import "net/http"
import (
"bufio"
"net"
"net/http"
"github.com/pkg/errors"
)
type WrittenResponseWriter struct {
write bool
@@ -18,6 +24,14 @@ func (w *WrittenResponseWriter) Header() http.Header {
return w.base.Header()
}
func (w *WrittenResponseWriter) Hijack() (net.Conn, *bufio.ReadWriter, error) {
w.write = true
if hijacker, ok := w.base.(http.Hijacker); ok {
return hijacker.Hijack()
}
return nil, nil, errors.New("not hijackable")
}
func (w *WrittenResponseWriter) Write(b []byte) (int, error) {
w.write = true
return w.base.Write(b)