diff --git a/cmd/local/main.go b/cmd/local/main.go index 98564a4..26f9a9d 100644 --- a/cmd/local/main.go +++ b/cmd/local/main.go @@ -15,6 +15,7 @@ import ( "gopkg.d7z.net/gitea-pages/pkg/providers" "gopkg.d7z.net/middleware/cache" "gopkg.d7z.net/middleware/kv" + "gopkg.d7z.net/middleware/subscribe" ) var ( @@ -56,8 +57,9 @@ func main() { if err != nil { zap.L().Fatal("failed to init memory provider", zap.Error(err)) } + subscriber := subscribe.NewMemorySubscriber() server, err := pkg.NewPageServer(http.DefaultClient, - provider, domain, "gh-pages", memory, memory, 0, &nopCache{}, + provider, domain, "gh-pages", memory, subscriber, memory, 0, &nopCache{}, 0, func(w http.ResponseWriter, r *http.Request, err error) { if errors.Is(err, os.ErrNotExist) { http.Error(w, "page not found.", http.StatusNotFound) diff --git a/cmd/server/config.go b/cmd/server/config.go index 8bc9dfa..2ab2231 100644 --- a/cmd/server/config.go +++ b/cmd/server/config.go @@ -22,6 +22,7 @@ type Config struct { Domain string `yaml:"domain"` // 基础域名 Database ConfigDatabase `yaml:"database"` // 配置 + Event ConfigEvent `yaml:"event"` // 事件传递 Auth ConfigAuth `yaml:"auth"` // 后端认证配置 @@ -76,6 +77,9 @@ type ConfigPage struct { type ConfigDatabase struct { URL string `yaml:"url"` } +type ConfigEvent struct { + URL string `yaml:"url"` +} type ConfigProxy struct { Enable bool `yaml:"enable"` // 是否允许反向代理 @@ -113,6 +117,9 @@ func LoadConfig(path string) (*Config, error) { if c.Database.URL == "" { return nil, errors.New("c is required") } + if c.Event.URL == "" { + c.Event.URL = "memory://" + } if c.StaticDir != "" { stat, err := os.Stat(c.StaticDir) if err != nil { diff --git a/cmd/server/main.go b/cmd/server/main.go index dda9661..67defd6 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -16,6 +16,7 @@ import ( "gopkg.d7z.net/gitea-pages/pkg/providers" "gopkg.d7z.net/middleware/cache" "gopkg.d7z.net/middleware/kv" + "gopkg.d7z.net/middleware/subscribe" ) var ( @@ -64,15 +65,22 @@ func main() { if !ok { log.Fatalln(errors.New("database not support cursor")) } + event, err := subscribe.NewSubscriberFromURL(config.Event.URL) + if err != nil { + log.Fatalln(err) + } + defer event.Close() pageServer, err := pkg.NewPageServer( http.DefaultClient, backend, config.Domain, config.Page.DefaultBranch, cdb, + event, cacheMeta, config.Cache.MetaTTL, cacheBlob.Child("filter"), + config.Cache.BlobTTL, config.ErrorHandler, config.Filters, ) diff --git a/examples/js_ws_event/.pages.yaml b/examples/js_ws_event/.pages.yaml new file mode 100644 index 0000000..894dc6c --- /dev/null +++ b/examples/js_ws_event/.pages.yaml @@ -0,0 +1,7 @@ +routes: + - path: "sender" + js: + exec: "sender.js" + - path: "event" + js: + exec: "event.js" \ No newline at end of file diff --git a/examples/js_ws_event/event.js b/examples/js_ws_event/event.js new file mode 100644 index 0000000..2669bf3 --- /dev/null +++ b/examples/js_ws_event/event.js @@ -0,0 +1,23 @@ +let name=request.getQuery("name") +if (name===""){ + throw Error(`Missing name "${name}"`) +} +let ws = websocket(); +event.subscribe("messages").on(function (msg){ + ws.writeText(msg) +}) +let shouldExit = false; +while (!shouldExit) { + let data = ws.readText(); + switch (data) { + case "exit": + shouldExit = true; + break; + default: + event.put("messages",JSON.stringify({ + name:name, + data:data + })); + break; + } +} \ No newline at end of file diff --git a/examples/js_ws_event/index.html b/examples/js_ws_event/index.html new file mode 100644 index 0000000..17e557b --- /dev/null +++ b/examples/js_ws_event/index.html @@ -0,0 +1,459 @@ + + + + + + WebSocket聊天客户端 + + + +
+
+
+

WebSocket聊天客户端

+
+
+ 未连接 +
+
+
+
+ + +
+ +
+
+ +
+
+ 💬 +

连接服务器开始聊天

+
+
+ +
+ + +
+
+ + + + \ No newline at end of file diff --git a/examples/js_ws_event/sender.js b/examples/js_ws_event/sender.js new file mode 100644 index 0000000..53c2f07 --- /dev/null +++ b/examples/js_ws_event/sender.js @@ -0,0 +1,8 @@ +let name=request.getQuery("name") +let message=request.getQuery("data") +event.put("messages", JSON.stringify({ + name:name, + data:message +})); + +// response.write(event.subscribe("messages").get()) \ No newline at end of file diff --git a/go.mod b/go.mod index b171daf..5925af1 100644 --- a/go.mod +++ b/go.mod @@ -10,11 +10,12 @@ require ( github.com/go-task/slim-sprig/v3 v3.0.0 github.com/gobwas/glob v0.2.3 github.com/google/uuid v1.6.0 + github.com/gorilla/websocket v1.5.3 github.com/hashicorp/golang-lru/v2 v2.0.7 github.com/pkg/errors v0.9.1 github.com/stretchr/testify v1.11.1 go.uber.org/zap v1.27.0 - gopkg.d7z.net/middleware v0.0.0-20251114145539-bb74bd940f32 + gopkg.d7z.net/middleware v0.0.0-20251119134829-0c55a98e6495 gopkg.in/yaml.v3 v3.0.1 ) @@ -35,7 +36,6 @@ require ( github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/protobuf v1.5.4 // indirect github.com/google/pprof v0.0.0-20251114195745-4902fdda35c8 // indirect - github.com/gorilla/websocket v1.5.3 // indirect github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.3 // indirect github.com/hashicorp/go-version v1.7.0 // indirect github.com/klauspost/compress v1.18.1 // indirect @@ -59,6 +59,6 @@ require ( golang.org/x/text v0.31.0 // indirect google.golang.org/genproto/googleapis/api v0.0.0-20251111163417-95abcf5c77ba // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20251111163417-95abcf5c77ba // indirect - google.golang.org/grpc v1.76.0 // indirect + google.golang.org/grpc v1.77.0 // indirect google.golang.org/protobuf v1.36.10 // indirect ) diff --git a/go.sum b/go.sum index 0d33ea5..de89c2a 100644 --- a/go.sum +++ b/go.sum @@ -120,16 +120,22 @@ go.etcd.io/etcd/client/v3 v3.6.6 h1:G5z1wMf5B9SNexoxOHUGBaULurOZPIgGPsW6CN492ec= go.etcd.io/etcd/client/v3 v3.6.6/go.mod h1:36Qv6baQ07znPR3+n7t+Rk5VHEzVYPvFfGmfF4wBHV8= go.opentelemetry.io/auto/sdk v1.1.0 h1:cH53jehLUN6UFLY71z+NDOiNJqDdPRaXzTel0sJySYA= go.opentelemetry.io/auto/sdk v1.1.0/go.mod h1:3wSPjt5PWp2RhlCcmmOial7AvC4DQqZb7a7wCow3W8A= +go.opentelemetry.io/auto/sdk v1.2.1 h1:jXsnJ4Lmnqd11kwkBV2LgLoFMZKizbCi5fNZ/ipaZ64= go.opentelemetry.io/otel v1.37.0 h1:9zhNfelUvx0KBfu/gb+ZgeAfAgtWrfHJZcAqFC228wQ= go.opentelemetry.io/otel v1.37.0/go.mod h1:ehE/umFRLnuLa/vSccNq9oS1ErUlkkK71gMcN34UG8I= +go.opentelemetry.io/otel v1.38.0 h1:RkfdswUDRimDg0m2Az18RKOsnI8UDzppJAtj01/Ymk8= go.opentelemetry.io/otel/metric v1.37.0 h1:mvwbQS5m0tbmqML4NqK+e3aDiO02vsf/WgbsdpcPoZE= go.opentelemetry.io/otel/metric v1.37.0/go.mod h1:04wGrZurHYKOc+RKeye86GwKiTb9FKm1WHtO+4EVr2E= +go.opentelemetry.io/otel/metric v1.38.0 h1:Kl6lzIYGAh5M159u9NgiRkmoMKjvbsKtYRwgfrA6WpA= go.opentelemetry.io/otel/sdk v1.37.0 h1:ItB0QUqnjesGRvNcmAcU0LyvkVyGJ2xftD29bWdDvKI= go.opentelemetry.io/otel/sdk v1.37.0/go.mod h1:VredYzxUvuo2q3WRcDnKDjbdvmO0sCzOvVAiY+yUkAg= +go.opentelemetry.io/otel/sdk v1.38.0 h1:l48sr5YbNf2hpCUj/FoGhW9yDkl+Ma+LrVl8qaM5b+E= go.opentelemetry.io/otel/sdk/metric v1.37.0 h1:90lI228XrB9jCMuSdA0673aubgRobVZFhbjxHHspCPc= go.opentelemetry.io/otel/sdk/metric v1.37.0/go.mod h1:cNen4ZWfiD37l5NhS+Keb5RXVWZWpRE+9WyVCpbo5ps= +go.opentelemetry.io/otel/sdk/metric v1.38.0 h1:aSH66iL0aZqo//xXzQLYozmWrXxyFkBJ6qT5wthqPoM= go.opentelemetry.io/otel/trace v1.37.0 h1:HLdcFNbRQBE2imdSEgm/kwqmQj1Or1l/7bW6mxVK7z4= go.opentelemetry.io/otel/trace v1.37.0/go.mod h1:TlgrlQ+PtQO5XFerSPUYG0JSgGyryXewPGyayAWSBS0= +go.opentelemetry.io/otel/trace v1.38.0 h1:Fxk5bKrDZJUH+AMyyIXGcFAPah0oRcT+LuNtJrmcNLE= go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= @@ -183,10 +189,14 @@ google.golang.org/genproto/googleapis/rpc v0.0.0-20251111163417-95abcf5c77ba h1: google.golang.org/genproto/googleapis/rpc v0.0.0-20251111163417-95abcf5c77ba/go.mod h1:7i2o+ce6H/6BluujYR+kqX3GKH+dChPTQU19wjRPiGk= google.golang.org/grpc v1.76.0 h1:UnVkv1+uMLYXoIz6o7chp59WfQUYA2ex/BXQ9rHZu7A= google.golang.org/grpc v1.76.0/go.mod h1:Ju12QI8M6iQJtbcsV+awF5a4hfJMLi4X0JLo94ULZ6c= +google.golang.org/grpc v1.77.0 h1:wVVY6/8cGA6vvffn+wWK5ToddbgdU3d8MNENr4evgXM= +google.golang.org/grpc v1.77.0/go.mod h1:z0BY1iVj0q8E1uSQCjL9cppRj+gnZjzDnzV0dHhrNig= google.golang.org/protobuf v1.36.10 h1:AYd7cD/uASjIL6Q9LiTjz8JLcrh/88q5UObnmY3aOOE= google.golang.org/protobuf v1.36.10/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j23XfzDpco= gopkg.d7z.net/middleware v0.0.0-20251114145539-bb74bd940f32 h1:3JvqnWFLWzAoS57vLBT1LVePO3RqR32ijM3ZyjyoqyY= gopkg.d7z.net/middleware v0.0.0-20251114145539-bb74bd940f32/go.mod h1:/1/EuissKhUbuhUe01rcWuwpA5mt7jASb4uKVNOLtR8= +gopkg.d7z.net/middleware v0.0.0-20251119134829-0c55a98e6495 h1:LvjpmL0nkZZtrUXCFZGyoh8O2X9l2B7ZXFldOzN8ShI= +gopkg.d7z.net/middleware v0.0.0-20251119134829-0c55a98e6495/go.mod h1:/1/EuissKhUbuhUe01rcWuwpA5mt7jASb4uKVNOLtR8= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= diff --git a/pkg/core/filter.go b/pkg/core/filter.go index 80e27c2..b5a957d 100644 --- a/pkg/core/filter.go +++ b/pkg/core/filter.go @@ -10,6 +10,7 @@ import ( "go.uber.org/zap" "gopkg.d7z.net/middleware/kv" + "gopkg.d7z.net/middleware/subscribe" "gopkg.d7z.net/middleware/tools" ) @@ -20,6 +21,9 @@ type FilterContext struct { Cache *tools.TTLCache OrgDB kv.CursorPagedKV RepoDB kv.CursorPagedKV + Event subscribe.Subscriber + + Kill func() } type Params map[string]any diff --git a/pkg/filters/goja/goja.go b/pkg/filters/goja/goja.go index 454ba57..9492938 100644 --- a/pkg/filters/goja/goja.go +++ b/pkg/filters/goja/goja.go @@ -58,8 +58,8 @@ func FilterInstGoJa(gl core.Params) (core.FilterInstance, error) { stop := make(chan struct{}, 1) shutdown := make(chan struct{}, 1) defer close(shutdown) - timeout, cancelFunc := context.WithTimeout(ctx, global.Timeout) - defer cancelFunc() + timeout, timeoutCancelFunc := context.WithTimeout(ctx, global.Timeout) + defer timeoutCancelFunc() count := 0 closers := NewClosers() defer closers.Close() @@ -84,9 +84,12 @@ func FilterInstGoJa(gl core.Params) (core.FilterInstance, error) { if err = KVInject(ctx, vm); err != nil { panic(err) } + if err = EventInject(ctx, vm); err != nil { + panic(err) + } if global.EnableWebsocket { var closer io.Closer - closer, err = WebsocketInject(vm, debug, request, cancelFunc) + closer, err = WebsocketInject(ctx, vm, debug, request, timeoutCancelFunc) if err != nil { panic(err) } diff --git a/pkg/filters/goja/var_event.go b/pkg/filters/goja/var_event.go new file mode 100644 index 0000000..0d062d8 --- /dev/null +++ b/pkg/filters/goja/var_event.go @@ -0,0 +1,43 @@ +package goja + +import ( + "github.com/dop251/goja" + "gopkg.d7z.net/gitea-pages/pkg/core" +) + +func EventInject(ctx core.FilterContext, jsCtx *goja.Runtime) error { + return jsCtx.GlobalObject().Set("event", map[string]interface{}{ + "subscribe": func(key string) (map[string]any, error) { + subscribe, err := ctx.Event.Subscribe(ctx, key) + if err != nil { + return nil, err + } + return map[string]any{ + "on": func(f func(string)) { + go func() { + z: + for { + select { + case <-ctx.Done(): + break z + case data := <-subscribe: + f(data) + } + } + }() + }, + "get": func() (string, error) { + select { + case <-ctx.Done(): + return "", ctx.Err() + case data := <-subscribe: + return data, nil + } + }, + }, nil + }, + "put": func(key, value string) error { + return ctx.Event.Publish(ctx, key, value) + }, + }) +} diff --git a/pkg/filters/goja/var_request.go b/pkg/filters/goja/var_request.go index b78e7ec..db909a0 100644 --- a/pkg/filters/goja/var_request.go +++ b/pkg/filters/goja/var_request.go @@ -50,6 +50,9 @@ func RequestInject(ctx core.FilterContext, jsCtx *goja.Runtime, req *http.Reques } return nil }, + "getQuery": func(key string) string { + return req.URL.Query().Get(key) + }, "getHeader": func(name string) string { return req.Header.Get(name) }, diff --git a/pkg/filters/goja/var_websocket.go b/pkg/filters/goja/var_websocket.go index 93a098d..05e633e 100644 --- a/pkg/filters/goja/var_websocket.go +++ b/pkg/filters/goja/var_websocket.go @@ -4,14 +4,16 @@ import ( "context" "io" "net/http" + "time" "github.com/dop251/goja" "github.com/gorilla/websocket" "github.com/pkg/errors" "go.uber.org/zap" + "gopkg.d7z.net/gitea-pages/pkg/core" ) -func WebsocketInject(jsCtx *goja.Runtime, w http.ResponseWriter, request *http.Request, cancelFunc context.CancelFunc) (io.Closer, error) { +func WebsocketInject(ctx core.FilterContext, jsCtx *goja.Runtime, w http.ResponseWriter, request *http.Request, cancelFunc context.CancelFunc) (io.Closer, error) { closers := NewClosers() return closers, jsCtx.GlobalObject().Set("websocket", func() (any, error) { upgrader := websocket.Upgrader{} @@ -20,9 +22,44 @@ func WebsocketInject(jsCtx *goja.Runtime, w http.ResponseWriter, request *http.R return nil, err } cancelFunc() + go func() { + ticker := time.NewTicker(15 * time.Second) + defer ticker.Stop() + f: + for { + select { + case <-ctx.Done(): + break f + case <-ticker.C: + } + if err := conn.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(5*time.Second)); err != nil { + zap.L().Debug("websocket ping failed", zap.Error(err)) + ctx.Kill() + } + } + }() zap.L().Debug("websocket upgrader created") closers.AddCloser(conn.Close) return map[string]interface{}{ + "on": func(f func(mType int, message string)) { + go func() { + z: + for { + select { + case <-ctx.Done(): + break z + default: + messageType, p, err := conn.ReadMessage() + if err != nil { + break z + } + f(messageType, string(p)) + } + + } + + }() + }, "TypeTextMessage": websocket.TextMessage, "TypeBinaryMessage": websocket.BinaryMessage, "readText": func() (string, error) { @@ -60,6 +97,9 @@ func WebsocketInject(jsCtx *goja.Runtime, w http.ResponseWriter, request *http.R } return conn.WriteMessage(mType, dataRaw) }, + "ping": func() error { + return conn.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(1*time.Second)) + }, }, nil }) } diff --git a/pkg/server.go b/pkg/server.go index 48a1968..8a55302 100644 --- a/pkg/server.go +++ b/pkg/server.go @@ -1,6 +1,7 @@ package pkg import ( + "context" "errors" "fmt" "net/http" @@ -18,6 +19,7 @@ import ( "gopkg.d7z.net/gitea-pages/pkg/utils" "gopkg.d7z.net/middleware/cache" "gopkg.d7z.net/middleware/kv" + "gopkg.d7z.net/middleware/subscribe" "gopkg.d7z.net/middleware/tools" ) @@ -28,9 +30,12 @@ type Server struct { meta *core.PageDomain db kv.CursorPagedKV filterMgr map[string]core.FilterInstance - globCache *lru.Cache[string, glob.Glob] - cacheBlob cache.Cache + globCache *lru.Cache[string, glob.Glob] + cacheBlob cache.Cache + cacheBlobTtl time.Duration + + event subscribe.Subscriber errorHandler func(w http.ResponseWriter, r *http.Request, err error) } @@ -40,13 +45,15 @@ func NewPageServer( domain string, defaultBranch string, db kv.CursorPagedKV, + event subscribe.Subscriber, cacheMeta kv.KV, - cacheTTL time.Duration, + cacheMetaTTL time.Duration, cacheBlob cache.Cache, + cacheBlobTtl time.Duration, errorHandler func(w http.ResponseWriter, r *http.Request, err error), filterConfig map[string]map[string]any, ) (*Server, error) { - svcMeta := core.NewServerMeta(client, backend, domain, cacheMeta, cacheTTL) + svcMeta := core.NewServerMeta(client, backend, domain, cacheMeta, cacheMetaTTL) pageMeta := core.NewPageDomain(svcMeta, core.NewDomainAlias(db.Child("config").Child("alias")), domain, defaultBranch) globCache, err := lru.New[string, glob.Glob](256) if err != nil { @@ -64,6 +71,8 @@ func NewPageServer( filterMgr: defaultFilters, errorHandler: errorHandler, cacheBlob: cacheBlob, + cacheBlobTtl: cacheBlobTtl, + event: event, }, nil } @@ -100,13 +109,17 @@ func (s *Server) Serve(writer *utils.WrittenResponseWriter, request *http.Reques return err } + cancel, cancelFunc := context.WithCancel(request.Context()) filterCtx := core.FilterContext{ PageContent: meta, - Context: request.Context(), + Context: cancel, PageVFS: core.NewPageVFS(s.backend, meta.Owner, meta.Repo, meta.CommitID), Cache: tools.NewTTLCache(s.cacheBlob.Child("filter").Child(meta.Owner).Child(meta.Repo).Child(meta.CommitID), time.Minute), OrgDB: s.db.Child("org").Child(meta.Owner).(kv.CursorPagedKV), RepoDB: s.db.Child("repo").Child(meta.Owner).Child(meta.Repo).(kv.CursorPagedKV), + Event: s.event.Child("domain").Child(meta.Owner).Child(meta.Repo), + + Kill: cancelFunc, } zap.L().Debug("new request", zap.Any("request path", meta.Path)) diff --git a/tests/core/test.go b/tests/core/test.go index 518398d..187e77f 100644 --- a/tests/core/test.go +++ b/tests/core/test.go @@ -14,6 +14,7 @@ import ( "gopkg.d7z.net/gitea-pages/pkg" "gopkg.d7z.net/middleware/cache" "gopkg.d7z.net/middleware/kv" + "gopkg.d7z.net/middleware/subscribe" ) type TestServer struct { @@ -52,9 +53,11 @@ func NewTestServer(domain string) *TestServer { domain, "gh-pages", memoryKV, + subscribe.NewMemorySubscriber(), memoryKV.Child("cache"), 0, memoryCache, + 0, func(w http.ResponseWriter, r *http.Request, err error) { if errors.Is(err, os.ErrNotExist) { http.Error(w, "page not found.", http.StatusNotFound)