重构 websocket

This commit is contained in:
ExplodingDragon
2025-11-20 23:56:14 +08:00
parent 35df220bea
commit 827f9bd273
10 changed files with 161 additions and 84 deletions

View File

@@ -78,7 +78,7 @@ func main() {
type nopCache struct{} type nopCache struct{}
func (n *nopCache) Child(_ string) cache.Cache { func (n *nopCache) Child(_ ...string) cache.Cache {
return n return n
} }

View File

@@ -1,19 +1,17 @@
(async ()=>{ (async ()=>{
let ws = websocket(); let ws = websocket();
let shouldExit = false; while (true) {
while (!shouldExit) {
let data = await ws.readText(); let data = await ws.readText();
switch (data) { switch (data) {
case "exit": case "exit":
shouldExit = true; return
break;
case "panic": case "panic":
throw Error("错误"); throw Error("错误");
case "date": case "date":
ws.writeText(new Date().toJSON()) await ws.writeText(new Date().toJSON())
break break
default: default:
ws.writeText("收到信息:" + data) await ws.writeText("收到信息:" + data)
break; break;
} }
} }

View File

@@ -8,8 +8,8 @@ const ws = websocket();
async function eventPull() { async function eventPull() {
while (true) { while (true) {
const data = await event.pull('messages') const data = await event.load('messages')
ws.writeText(data); await ws.writeText(data);
} }
} }
@@ -27,5 +27,5 @@ async function messagePull() {
} }
(async () => { (async () => {
await Promise.all(eventPull(), messagePull()) await Promise.all([eventPull(), messagePull()])
})() })()

6
go.mod
View File

@@ -14,8 +14,8 @@ require (
github.com/hashicorp/golang-lru/v2 v2.0.7 github.com/hashicorp/golang-lru/v2 v2.0.7
github.com/pkg/errors v0.9.1 github.com/pkg/errors v0.9.1
github.com/stretchr/testify v1.11.1 github.com/stretchr/testify v1.11.1
go.uber.org/zap v1.27.0 go.uber.org/zap v1.27.1
gopkg.d7z.net/middleware v0.0.0-20251119134829-0c55a98e6495 gopkg.d7z.net/middleware v0.0.0-20251120123709-5d4e16e0d6fb
gopkg.in/yaml.v3 v3.0.1 gopkg.in/yaml.v3 v3.0.1
) )
@@ -53,7 +53,7 @@ require (
go.etcd.io/etcd/client/pkg/v3 v3.6.6 // indirect go.etcd.io/etcd/client/pkg/v3 v3.6.6 // indirect
go.etcd.io/etcd/client/v3 v3.6.6 // indirect go.etcd.io/etcd/client/v3 v3.6.6 // indirect
go.uber.org/multierr v1.11.0 // indirect go.uber.org/multierr v1.11.0 // indirect
golang.org/x/crypto v0.44.0 // indirect golang.org/x/crypto v0.45.0 // indirect
golang.org/x/net v0.47.0 // indirect golang.org/x/net v0.47.0 // indirect
golang.org/x/sys v0.38.0 // indirect golang.org/x/sys v0.38.0 // indirect
golang.org/x/text v0.31.0 // indirect golang.org/x/text v0.31.0 // indirect

6
go.sum
View File

@@ -142,12 +142,16 @@ go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0=
go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y=
go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8= go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8=
go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E= go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E=
go.uber.org/zap v1.27.1 h1:08RqriUEv8+ArZRYSTXy1LeBScaMpVSTBhCeaZYfMYc=
go.uber.org/zap v1.27.1/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20210513164829-c07d793c2f9a/go.mod h1:P+XmwS30IXTQdn5tA2iutPOUgjI07+tq3H3K9MVA1s8= golang.org/x/crypto v0.0.0-20210513164829-c07d793c2f9a/go.mod h1:P+XmwS30IXTQdn5tA2iutPOUgjI07+tq3H3K9MVA1s8=
golang.org/x/crypto v0.44.0 h1:A97SsFvM3AIwEEmTBiaxPPTYpDC47w720rdiiUvgoAU= golang.org/x/crypto v0.44.0 h1:A97SsFvM3AIwEEmTBiaxPPTYpDC47w720rdiiUvgoAU=
golang.org/x/crypto v0.44.0/go.mod h1:013i+Nw79BMiQiMsOPcVCB5ZIJbYkerPrGnOa00tvmc= golang.org/x/crypto v0.44.0/go.mod h1:013i+Nw79BMiQiMsOPcVCB5ZIJbYkerPrGnOa00tvmc=
golang.org/x/crypto v0.45.0 h1:jMBrvKuj23MTlT0bQEOBcAE0mjg8mK9RXFhRH6nyF3Q=
golang.org/x/crypto v0.45.0/go.mod h1:XTGrrkGJve7CYK7J8PEww4aY7gM3qMCElcJQ8n8JdX4=
golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
@@ -197,6 +201,8 @@ gopkg.d7z.net/middleware v0.0.0-20251114145539-bb74bd940f32 h1:3JvqnWFLWzAoS57vL
gopkg.d7z.net/middleware v0.0.0-20251114145539-bb74bd940f32/go.mod h1:/1/EuissKhUbuhUe01rcWuwpA5mt7jASb4uKVNOLtR8= gopkg.d7z.net/middleware v0.0.0-20251114145539-bb74bd940f32/go.mod h1:/1/EuissKhUbuhUe01rcWuwpA5mt7jASb4uKVNOLtR8=
gopkg.d7z.net/middleware v0.0.0-20251119134829-0c55a98e6495 h1:LvjpmL0nkZZtrUXCFZGyoh8O2X9l2B7ZXFldOzN8ShI= gopkg.d7z.net/middleware v0.0.0-20251119134829-0c55a98e6495 h1:LvjpmL0nkZZtrUXCFZGyoh8O2X9l2B7ZXFldOzN8ShI=
gopkg.d7z.net/middleware v0.0.0-20251119134829-0c55a98e6495/go.mod h1:/1/EuissKhUbuhUe01rcWuwpA5mt7jASb4uKVNOLtR8= gopkg.d7z.net/middleware v0.0.0-20251119134829-0c55a98e6495/go.mod h1:/1/EuissKhUbuhUe01rcWuwpA5mt7jASb4uKVNOLtR8=
gopkg.d7z.net/middleware v0.0.0-20251120123709-5d4e16e0d6fb h1:2+IskB2qGQshl67tHdnzEXCm46+9E/QevYL3xpMul0E=
gopkg.d7z.net/middleware v0.0.0-20251120123709-5d4e16e0d6fb/go.mod h1:/1/EuissKhUbuhUe01rcWuwpA5mt7jASb4uKVNOLtR8=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=

View File

@@ -2,6 +2,7 @@ package core
import ( import (
"context" "context"
"encoding/base64"
"encoding/json" "encoding/json"
"fmt" "fmt"
@@ -36,7 +37,7 @@ func (a *DomainAlias) Query(ctx context.Context, domain string) (*Alias, error)
func (a *DomainAlias) Bind(ctx context.Context, domains []string, owner, repo, branch string) error { func (a *DomainAlias) Bind(ctx context.Context, domains []string, owner, repo, branch string) error {
oldDomains := make([]string, 0) oldDomains := make([]string, 0)
rKey := fmt.Sprintf("%s/%s/%s", owner, repo, branch) rKey := base64.URLEncoding.EncodeToString([]byte(fmt.Sprintf("%s/%s/%s", owner, repo, branch)))
if oldStr, err := a.config.Get(ctx, rKey); err == nil { if oldStr, err := a.config.Get(ctx, rKey); err == nil {
_ = json.Unmarshal([]byte(oldStr), &oldDomains) _ = json.Unmarshal([]byte(oldStr), &oldDomains)
} }

View File

@@ -1,9 +1,7 @@
package goja package goja
import ( import (
"encoding/json"
"errors" "errors"
"fmt"
"io" "io"
"net/http" "net/http"
"path/filepath" "path/filepath"
@@ -15,7 +13,6 @@ import (
"github.com/dop251/goja_nodejs/eventloop" "github.com/dop251/goja_nodejs/eventloop"
"github.com/dop251/goja_nodejs/require" "github.com/dop251/goja_nodejs/require"
"github.com/dop251/goja_nodejs/url" "github.com/dop251/goja_nodejs/url"
"go.uber.org/zap"
"gopkg.d7z.net/gitea-pages/pkg/core" "gopkg.d7z.net/gitea-pages/pkg/core"
) )
@@ -109,16 +106,15 @@ func FilterInstGoJa(gl core.Params) (core.FilterInstance, error) {
go func() { go func() {
for { for {
switch promise.State() { switch promise.State() {
case goja.PromiseStateFulfilled, goja.PromiseStateRejected: case goja.PromiseStateFulfilled:
result := promise.Result().Export() stop <- nil
switch data := result.(type) { return
case goja.PromiseStateRejected:
switch data := promise.Result().Export().(type) {
case error: case error:
stop <- data stop <- data
default: default:
marshal, _ := json.Marshal(result) stop <- errors.New(promise.Result().String())
zap.L().Debug(fmt.Sprintf("js promise result %s", string(marshal)),
zap.Any("result", promise.Result().ExportType()))
stop <- nil
} }
return return
default: default:
@@ -126,7 +122,11 @@ func FilterInstGoJa(gl core.Params) (core.FilterInstance, error) {
} }
} }
}() }()
} else {
stop <- nil
} }
} else {
stop <- nil
} }
}) })
resultErr := <-stop resultErr := <-stop

View File

@@ -30,8 +30,19 @@ func EventInject(ctx core.FilterContext, jsCtx *goja.Runtime, loop *eventloop.Ev
}() }()
return promise return promise
}, },
"put": func(key, value string) error { "put": func(key, value string) *goja.Promise {
return ctx.Event.Publish(ctx, key, value) promise, resolve, reject := jsCtx.NewPromise()
go func() {
err := ctx.Event.Publish(ctx, key, value)
loop.RunOnLoop(func(runtime *goja.Runtime) {
if err != nil {
_ = reject(runtime.ToValue(err))
} else {
_ = resolve(goja.Undefined())
}
})
}()
return promise
}, },
}) })
} }

View File

@@ -21,6 +21,8 @@ func WebsocketInject(ctx core.FilterContext, jsCtx *goja.Runtime, w http.Respons
if err != nil { if err != nil {
return nil, err return nil, err
} }
zap.L().Debug("websocket upgrader created")
closers.AddCloser(conn.Close)
go func() { go func() {
ticker := time.NewTicker(15 * time.Second) ticker := time.NewTicker(15 * time.Second)
defer ticker.Stop() defer ticker.Stop()
@@ -37,28 +39,7 @@ func WebsocketInject(ctx core.FilterContext, jsCtx *goja.Runtime, w http.Respons
} }
} }
}() }()
zap.L().Debug("websocket upgrader created")
closers.AddCloser(conn.Close)
return map[string]interface{}{ return map[string]interface{}{
"on": func(f func(mType int, message string)) {
go func() {
z:
for {
select {
case <-ctx.Done():
break z
default:
messageType, p, err := conn.ReadMessage()
if err != nil {
break z
}
f(messageType, string(p))
}
}
}()
},
"TypeTextMessage": websocket.TextMessage, "TypeTextMessage": websocket.TextMessage,
"TypeBinaryMessage": websocket.BinaryMessage, "TypeBinaryMessage": websocket.BinaryMessage,
"readText": func() *goja.Promise { "readText": func() *goja.Promise {
@@ -91,36 +72,115 @@ func WebsocketInject(ctx core.FilterContext, jsCtx *goja.Runtime, w http.Respons
}() }()
return promise return promise
}, },
"read": func() (any, error) { "read": func() *goja.Promise {
messageType, p, err := conn.ReadMessage() promise, resolve, reject := jsCtx.NewPromise()
if err != nil { go func() {
return nil, err select {
} case <-ctx.Done():
return map[string]interface{}{ loop.RunOnLoop(func(runtime *goja.Runtime) {
"type": messageType, _ = reject(runtime.ToValue(ctx.Err()))
"data": p, })
}, nil return
default:
}
defer func() {
if r := recover(); r != nil {
zap.L().Debug("websocket panic", zap.Any("panic", r))
loop.RunOnLoop(func(runtime *goja.Runtime) {
_ = reject(runtime.ToValue(r))
})
}
}()
messageType, p, err := conn.ReadMessage()
loop.RunOnLoop(func(runtime *goja.Runtime) {
if err != nil {
_ = reject(runtime.ToValue(err))
} else {
_ = resolve(runtime.ToValue(map[string]interface{}{
"type": messageType,
"data": p,
}))
}
})
}()
return promise
}, },
"writeText": func(data string) error { "writeText": func(data string) *goja.Promise {
return conn.WriteMessage(websocket.TextMessage, []byte(data)) promise, resolve, reject := jsCtx.NewPromise()
go func() {
select {
case <-ctx.Done():
loop.RunOnLoop(func(runtime *goja.Runtime) {
_ = reject(runtime.ToValue(ctx.Err()))
})
return
default:
}
defer func() {
if r := recover(); r != nil {
zap.L().Debug("websocket panic", zap.Any("panic", r))
loop.RunOnLoop(func(runtime *goja.Runtime) {
_ = reject(runtime.ToValue(r))
})
}
}()
err := conn.WriteMessage(websocket.TextMessage, []byte(data))
loop.RunOnLoop(func(runtime *goja.Runtime) {
if err != nil {
_ = reject(runtime.ToValue(err))
} else {
_ = resolve(runtime.ToValue(nil))
}
})
}()
return promise
}, },
"write": func(mType int, data any) error { "write": func(mType int, data any) *goja.Promise {
if item, ok := data.(goja.Value); ok { promise, resolve, reject := jsCtx.NewPromise()
data = item.Export() go func() {
} select {
var dataRaw []byte case <-ctx.Done():
switch it := data.(type) { loop.RunOnLoop(func(runtime *goja.Runtime) {
case []byte: _ = reject(runtime.ToValue(ctx.Err()))
dataRaw = it })
case string: return
dataRaw = []byte(it) default:
default: }
return errors.Errorf("invalid type for websocket text: %T", data) defer func() {
} if r := recover(); r != nil {
return conn.WriteMessage(mType, dataRaw) zap.L().Debug("websocket panic", zap.Any("panic", r))
}, loop.RunOnLoop(func(runtime *goja.Runtime) {
"ping": func() error { _ = reject(runtime.ToValue(r))
return conn.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(1*time.Second)) })
}
}()
if item, ok := data.(goja.Value); ok {
data = item.Export()
}
var dataRaw []byte
switch it := data.(type) {
case []byte:
dataRaw = it
case string:
dataRaw = []byte(it)
default:
loop.RunOnLoop(func(runtime *goja.Runtime) {
_ = reject(runtime.ToValue(errors.Errorf("invalid type for websocket text: %T", data)))
})
return
}
err := conn.WriteMessage(mType, dataRaw)
loop.RunOnLoop(func(runtime *goja.Runtime) {
if err != nil {
_ = reject(runtime.ToValue(err))
} else {
_ = resolve(goja.Undefined())
}
})
}()
return promise
}, },
}, nil }, nil
}) })

View File

@@ -31,9 +31,10 @@ type Server struct {
db kv.CursorPagedKV db kv.CursorPagedKV
filterMgr map[string]core.FilterInstance filterMgr map[string]core.FilterInstance
globCache *lru.Cache[string, glob.Glob] globCache *lru.Cache[string, glob.Glob]
cacheBlob cache.Cache cacheBlob cache.Cache
cacheBlobTtl time.Duration cacheBlobTTL time.Duration
event subscribe.Subscriber event subscribe.Subscriber
errorHandler func(w http.ResponseWriter, r *http.Request, err error) errorHandler func(w http.ResponseWriter, r *http.Request, err error)
@@ -49,7 +50,7 @@ func NewPageServer(
cacheMeta kv.KV, cacheMeta kv.KV,
cacheMetaTTL time.Duration, cacheMetaTTL time.Duration,
cacheBlob cache.Cache, cacheBlob cache.Cache,
cacheBlobTtl time.Duration, cacheBlobTTL time.Duration,
errorHandler func(w http.ResponseWriter, r *http.Request, err error), errorHandler func(w http.ResponseWriter, r *http.Request, err error),
filterConfig map[string]map[string]any, filterConfig map[string]map[string]any,
) (*Server, error) { ) (*Server, error) {
@@ -71,7 +72,7 @@ func NewPageServer(
filterMgr: defaultFilters, filterMgr: defaultFilters,
errorHandler: errorHandler, errorHandler: errorHandler,
cacheBlob: cacheBlob, cacheBlob: cacheBlob,
cacheBlobTtl: cacheBlobTtl, cacheBlobTTL: cacheBlobTTL,
event: event, event: event,
}, nil }, nil
} }
@@ -94,7 +95,7 @@ func (s *Server) ServeHTTP(w http.ResponseWriter, request *http.Request) {
}() }()
err := s.Serve(writer, request) err := s.Serve(writer, request)
if err != nil { if err != nil {
zap.L().Debug("bad request.", zap.Error(err), zap.Any("request", request.RequestURI), zap.Any("id", sessionID)) zap.L().Debug("bad request", zap.Error(err), zap.Any("request", request.RequestURI), zap.Any("id", sessionID))
if !writer.IsWritten() { if !writer.IsWritten() {
s.errorHandler(writer, request, err) s.errorHandler(writer, request, err)
} }
@@ -114,10 +115,10 @@ func (s *Server) Serve(writer *utils.WrittenResponseWriter, request *http.Reques
PageContent: meta, PageContent: meta,
Context: cancel, Context: cancel,
PageVFS: core.NewPageVFS(s.backend, meta.Owner, meta.Repo, meta.CommitID), PageVFS: core.NewPageVFS(s.backend, meta.Owner, meta.Repo, meta.CommitID),
Cache: tools.NewTTLCache(s.cacheBlob.Child("filter").Child(meta.Owner).Child(meta.Repo).Child(meta.CommitID), time.Minute), Cache: tools.NewTTLCache(s.cacheBlob.Child("filter", meta.Owner, meta.Repo, meta.CommitID), time.Minute),
OrgDB: s.db.Child("org").Child(meta.Owner).(kv.CursorPagedKV), OrgDB: s.db.Child("org", meta.Owner).(kv.CursorPagedKV),
RepoDB: s.db.Child("repo").Child(meta.Owner).Child(meta.Repo).(kv.CursorPagedKV), RepoDB: s.db.Child("repo", meta.Owner, meta.Repo).(kv.CursorPagedKV),
Event: s.event.Child("domain").Child(meta.Owner).Child(meta.Repo), Event: s.event.Child("domain", meta.Owner, meta.Repo),
Kill: cancelFunc, Kill: cancelFunc,
} }