新增 event , 优化 websocket

This commit is contained in:
ExplodingDragon
2025-11-20 01:19:47 +08:00
parent 043b00bbb7
commit d6440ebb02
16 changed files with 646 additions and 13 deletions

View File

@@ -10,6 +10,7 @@ import (
"go.uber.org/zap"
"gopkg.d7z.net/middleware/kv"
"gopkg.d7z.net/middleware/subscribe"
"gopkg.d7z.net/middleware/tools"
)
@@ -20,6 +21,9 @@ type FilterContext struct {
Cache *tools.TTLCache
OrgDB kv.CursorPagedKV
RepoDB kv.CursorPagedKV
Event subscribe.Subscriber
Kill func()
}
type Params map[string]any

View File

@@ -58,8 +58,8 @@ func FilterInstGoJa(gl core.Params) (core.FilterInstance, error) {
stop := make(chan struct{}, 1)
shutdown := make(chan struct{}, 1)
defer close(shutdown)
timeout, cancelFunc := context.WithTimeout(ctx, global.Timeout)
defer cancelFunc()
timeout, timeoutCancelFunc := context.WithTimeout(ctx, global.Timeout)
defer timeoutCancelFunc()
count := 0
closers := NewClosers()
defer closers.Close()
@@ -84,9 +84,12 @@ func FilterInstGoJa(gl core.Params) (core.FilterInstance, error) {
if err = KVInject(ctx, vm); err != nil {
panic(err)
}
if err = EventInject(ctx, vm); err != nil {
panic(err)
}
if global.EnableWebsocket {
var closer io.Closer
closer, err = WebsocketInject(vm, debug, request, cancelFunc)
closer, err = WebsocketInject(ctx, vm, debug, request, timeoutCancelFunc)
if err != nil {
panic(err)
}

View File

@@ -0,0 +1,43 @@
package goja
import (
"github.com/dop251/goja"
"gopkg.d7z.net/gitea-pages/pkg/core"
)
func EventInject(ctx core.FilterContext, jsCtx *goja.Runtime) error {
return jsCtx.GlobalObject().Set("event", map[string]interface{}{
"subscribe": func(key string) (map[string]any, error) {
subscribe, err := ctx.Event.Subscribe(ctx, key)
if err != nil {
return nil, err
}
return map[string]any{
"on": func(f func(string)) {
go func() {
z:
for {
select {
case <-ctx.Done():
break z
case data := <-subscribe:
f(data)
}
}
}()
},
"get": func() (string, error) {
select {
case <-ctx.Done():
return "", ctx.Err()
case data := <-subscribe:
return data, nil
}
},
}, nil
},
"put": func(key, value string) error {
return ctx.Event.Publish(ctx, key, value)
},
})
}

View File

@@ -50,6 +50,9 @@ func RequestInject(ctx core.FilterContext, jsCtx *goja.Runtime, req *http.Reques
}
return nil
},
"getQuery": func(key string) string {
return req.URL.Query().Get(key)
},
"getHeader": func(name string) string {
return req.Header.Get(name)
},

View File

@@ -4,14 +4,16 @@ import (
"context"
"io"
"net/http"
"time"
"github.com/dop251/goja"
"github.com/gorilla/websocket"
"github.com/pkg/errors"
"go.uber.org/zap"
"gopkg.d7z.net/gitea-pages/pkg/core"
)
func WebsocketInject(jsCtx *goja.Runtime, w http.ResponseWriter, request *http.Request, cancelFunc context.CancelFunc) (io.Closer, error) {
func WebsocketInject(ctx core.FilterContext, jsCtx *goja.Runtime, w http.ResponseWriter, request *http.Request, cancelFunc context.CancelFunc) (io.Closer, error) {
closers := NewClosers()
return closers, jsCtx.GlobalObject().Set("websocket", func() (any, error) {
upgrader := websocket.Upgrader{}
@@ -20,9 +22,44 @@ func WebsocketInject(jsCtx *goja.Runtime, w http.ResponseWriter, request *http.R
return nil, err
}
cancelFunc()
go func() {
ticker := time.NewTicker(15 * time.Second)
defer ticker.Stop()
f:
for {
select {
case <-ctx.Done():
break f
case <-ticker.C:
}
if err := conn.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(5*time.Second)); err != nil {
zap.L().Debug("websocket ping failed", zap.Error(err))
ctx.Kill()
}
}
}()
zap.L().Debug("websocket upgrader created")
closers.AddCloser(conn.Close)
return map[string]interface{}{
"on": func(f func(mType int, message string)) {
go func() {
z:
for {
select {
case <-ctx.Done():
break z
default:
messageType, p, err := conn.ReadMessage()
if err != nil {
break z
}
f(messageType, string(p))
}
}
}()
},
"TypeTextMessage": websocket.TextMessage,
"TypeBinaryMessage": websocket.BinaryMessage,
"readText": func() (string, error) {
@@ -60,6 +97,9 @@ func WebsocketInject(jsCtx *goja.Runtime, w http.ResponseWriter, request *http.R
}
return conn.WriteMessage(mType, dataRaw)
},
"ping": func() error {
return conn.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(1*time.Second))
},
}, nil
})
}

View File

@@ -1,6 +1,7 @@
package pkg
import (
"context"
"errors"
"fmt"
"net/http"
@@ -18,6 +19,7 @@ import (
"gopkg.d7z.net/gitea-pages/pkg/utils"
"gopkg.d7z.net/middleware/cache"
"gopkg.d7z.net/middleware/kv"
"gopkg.d7z.net/middleware/subscribe"
"gopkg.d7z.net/middleware/tools"
)
@@ -28,9 +30,12 @@ type Server struct {
meta *core.PageDomain
db kv.CursorPagedKV
filterMgr map[string]core.FilterInstance
globCache *lru.Cache[string, glob.Glob]
cacheBlob cache.Cache
globCache *lru.Cache[string, glob.Glob]
cacheBlob cache.Cache
cacheBlobTtl time.Duration
event subscribe.Subscriber
errorHandler func(w http.ResponseWriter, r *http.Request, err error)
}
@@ -40,13 +45,15 @@ func NewPageServer(
domain string,
defaultBranch string,
db kv.CursorPagedKV,
event subscribe.Subscriber,
cacheMeta kv.KV,
cacheTTL time.Duration,
cacheMetaTTL time.Duration,
cacheBlob cache.Cache,
cacheBlobTtl time.Duration,
errorHandler func(w http.ResponseWriter, r *http.Request, err error),
filterConfig map[string]map[string]any,
) (*Server, error) {
svcMeta := core.NewServerMeta(client, backend, domain, cacheMeta, cacheTTL)
svcMeta := core.NewServerMeta(client, backend, domain, cacheMeta, cacheMetaTTL)
pageMeta := core.NewPageDomain(svcMeta, core.NewDomainAlias(db.Child("config").Child("alias")), domain, defaultBranch)
globCache, err := lru.New[string, glob.Glob](256)
if err != nil {
@@ -64,6 +71,8 @@ func NewPageServer(
filterMgr: defaultFilters,
errorHandler: errorHandler,
cacheBlob: cacheBlob,
cacheBlobTtl: cacheBlobTtl,
event: event,
}, nil
}
@@ -100,13 +109,17 @@ func (s *Server) Serve(writer *utils.WrittenResponseWriter, request *http.Reques
return err
}
cancel, cancelFunc := context.WithCancel(request.Context())
filterCtx := core.FilterContext{
PageContent: meta,
Context: request.Context(),
Context: cancel,
PageVFS: core.NewPageVFS(s.backend, meta.Owner, meta.Repo, meta.CommitID),
Cache: tools.NewTTLCache(s.cacheBlob.Child("filter").Child(meta.Owner).Child(meta.Repo).Child(meta.CommitID), time.Minute),
OrgDB: s.db.Child("org").Child(meta.Owner).(kv.CursorPagedKV),
RepoDB: s.db.Child("repo").Child(meta.Owner).Child(meta.Repo).(kv.CursorPagedKV),
Event: s.event.Child("domain").Child(meta.Owner).Child(meta.Repo),
Kill: cancelFunc,
}
zap.L().Debug("new request", zap.Any("request path", meta.Path))