From 19c62682b9a7594c334ea0869c3c95da421098da Mon Sep 17 00:00:00 2001 From: Jay Date: Sat, 9 May 2026 20:15:00 -0400 Subject: [PATCH] wrote clerk --- clerk.go | 157 ++++++++++++++++++++++++++++++++++++++++++-- clerk_test.go | 171 ++++++++++++++++++++++++++++++++++++++++++++++++ embassy_test.go | 1 - go.mod | 1 + go.sum | 2 + 5 files changed, 327 insertions(+), 5 deletions(-) create mode 100644 clerk_test.go diff --git a/clerk.go b/clerk.go index 74a03e7..d46f79b 100644 --- a/clerk.go +++ b/clerk.go @@ -2,12 +2,25 @@ package prism import ( "context" + "errors" + "fmt" "git.wisehodl.dev/jay/go-honeybee" + "git.wisehodl.dev/jay/go-mana-component" + "git.wisehodl.dev/jay/go-roots-ws" "log/slog" "sync" "time" ) +// ---------------------------------------------------------------------------- +// Errors +// ---------------------------------------------------------------------------- + +var ( + ErrAlreadyStarted = errors.New("clerk already started") + ErrUnknownLabel = errors.New("unknown label") +) + // ---------------------------------------------------------------------------- // Types // ---------------------------------------------------------------------------- @@ -51,12 +64,148 @@ type clerkRoutes = map[string][]chan InboundLetter // Clerk // ---------------------------------------------------------------------------- -func NewClerk() *Clerk { +func NewClerk( + ctx context.Context, + inbox <-chan honeybee.InboxMessage, + knownLabels map[string]struct{}, + handler slog.Handler, +) *Clerk { + ctx, cancel := context.WithCancel( + component.MustNew(ctx, "prism", "clerk")) + + known := make(map[string]struct{}, len(knownLabels)) + for label := range knownLabels { + known[label] = struct{}{} + } + + c := &Clerk{ + inbox: inbox, + known: known, + ctx: ctx, + cancel: cancel, + } + + if handler != nil { + comp, ok := component.Get(ctx) + if ok { + c.logger = slog.New(handler).With(slog.Any("component", comp)) + } + } + + return c +} + +func (c *Clerk) Subscribe( + labels map[string]struct{}, + buffer int, +) (<-chan InboundLetter, error) { + c.mu.Lock() + defer c.mu.Unlock() + + if c.started { + return nil, ErrAlreadyStarted + } + + for label := range labels { + if _, ok := c.known[label]; !ok { + return nil, fmt.Errorf("%w: %s", ErrUnknownLabel, label) + } + } + + subLabels := make(map[string]struct{}, len(labels)) + for label := range labels { + subLabels[label] = struct{}{} + } + + ch := make(chan InboundLetter, buffer) + c.pending = append(c.pending, clerkSub{ch: ch, labels: subLabels}) + + return ch, nil +} + +func (c *Clerk) Start() error { + c.mu.Lock() + defer c.mu.Unlock() + + if c.started { + return ErrAlreadyStarted + } + + routes := make(clerkRoutes, len(c.known)) + for _, sub := range c.pending { + for label := range sub.labels { + routes[label] = append(routes[label], sub.ch) + } + } + + c.routes = routes + c.started = true + + c.wg.Add(1) + go c.run() + return nil } -func (c *Clerk) Subscribe() {} +func (c *Clerk) Close() { + c.cancel() + c.wg.Wait() -func (c *Clerk) Start() {} + c.mu.Lock() + defer c.mu.Unlock() -func (c *Clerk) Close() {} + for _, sub := range c.pending { + close(sub.ch) + } + + // prevent double channel closes if Close() is called twice + c.pending = nil +} + +func (c *Clerk) run() { + defer c.wg.Done() + + for { + select { + case <-c.ctx.Done(): + return + + case msg, ok := <-c.inbox: + if !ok { + // inbox closed externally, close clerk + c.cancel() + return + } + + labelBytes, err := envelope.GetLabel(msg.Data) + if err != nil { + if c.logger != nil { + c.logger.Warn("invalid envelope", + "peer_id", msg.ID, + "received_at", msg.ReceivedAt, + ) + } + continue + } + + subs, ok := c.routes[string(labelBytes)] + if !ok { + continue + } + + letter := InboundLetter{ + ID: msg.ID, + Data: msg.Data, + At: msg.ReceivedAt, + } + + for _, ch := range subs { + select { + case ch <- letter: + case <-c.ctx.Done(): + return + } + } + } + } +} diff --git a/clerk_test.go b/clerk_test.go new file mode 100644 index 0000000..5ea374f --- /dev/null +++ b/clerk_test.go @@ -0,0 +1,171 @@ +package prism + +import ( + "context" + "git.wisehodl.dev/jay/go-honeybee" + "github.com/stretchr/testify/assert" + "testing" + "time" +) + +// ---------------------------------------------------------------------------- +// Helpers +// ---------------------------------------------------------------------------- + +func mockInbox() (chan honeybee.InboxMessage, func(label string)) { + ch := make(chan honeybee.InboxMessage, 8) + inject := func(label string) { + ch <- honeybee.InboxMessage{ + ID: "wss://test", + Data: []byte(`["` + label + `","payload"]`), + ReceivedAt: time.Now(), + } + } + return ch, inject +} + +func makeClerk(inbox chan honeybee.InboxMessage) *Clerk { + known := map[string]struct{}{ + "EVENT": {}, + "EOSE": {}, + "CLOSE": {}, + } + return NewClerk(context.Background(), inbox, known, nil) +} + +// ---------------------------------------------------------------------------- +// Tests +// ---------------------------------------------------------------------------- + +func TestClerkRouting(t *testing.T) { + inbox, inject := mockInbox() + c := makeClerk(inbox) + + subA, err := c.Subscribe(map[string]struct{}{"EVENT": {}}, 4) + assert.NoError(t, err) + + subB, err := c.Subscribe(map[string]struct{}{"EVENT": {}, "EOSE": {}}, 4) + assert.NoError(t, err) + + assert.NoError(t, c.Start()) + + inject("EVENT") + inject("EOSE") + + // A receives exactly one letter (EVENT only) + Eventually(t, func() bool { + select { + case l := <-subA: + return string(l.Data) == `["EVENT","payload"]` + default: + return false + } + }, "subA should receive the EVENT letter") + + Never(t, func() bool { + select { + case <-subA: + return true + default: + return false + } + }, "subA should receive no further letters") + + // B receives two letters (EVENT and EOSE) + count := 0 + Eventually(t, func() bool { + select { + case <-subB: + count++ + default: + } + return count == 2 + }, "subB should receive both letters") +} + +func TestClerkStartup(t *testing.T) { + inbox, _ := mockInbox() + c := makeClerk(inbox) + + assert.NoError(t, c.Start()) + + _, err := c.Subscribe(map[string]struct{}{"EVENT": {}}, 4) + assert.ErrorIs(t, err, ErrAlreadyStarted) + + c.Close() +} + +func TestClerkUnknownSubscriptionLabel(t *testing.T) { + inbox, _ := mockInbox() + c := makeClerk(inbox) + + _, err := c.Subscribe(map[string]struct{}{"UNKNOWN": {}}, 4) + assert.ErrorIs(t, err, ErrUnknownLabel) +} + +func TestClerkUnknownInboxLabel(t *testing.T) { + inbox, inject := mockInbox() + c := makeClerk(inbox) + + // subscribe to every known label + sub, err := c.Subscribe( + map[string]struct{}{"EVENT": {}, "EOSE": {}, "CLOSE": {}}, 4) + assert.NoError(t, err) + assert.NoError(t, c.Start()) + + // inject a valid nostr label, but is not in the test label set + inject("NOTICE") + + Never(t, func() bool { + select { + case <-sub: + return true + default: + return false + } + }, "no subscriber should receive an unknown label") +} + +func TestClerkInboxClose(t *testing.T) { + inbox, _ := mockInbox() + c := makeClerk(inbox) + + sub, err := c.Subscribe(map[string]struct{}{"EVENT": {}}, 4) + assert.NoError(t, err) + assert.NoError(t, c.Start()) + + // close the inbox as the pool would on shutdown + close(inbox) + + // internal waitgroup should clear + Eventually(t, func() bool { + c.wg.Wait() + return true + }, "wg should clear") + + // subscriptions remain open. Close() must be called to completely shut down + Never(t, func() bool { + _, ok := <-sub + return !ok + }, "sub should remain open") +} + +func TestClerkClose(t *testing.T) { + inbox, _ := mockInbox() + c := makeClerk(inbox) + + subA, err := c.Subscribe(map[string]struct{}{"EVENT": {}}, 4) + assert.NoError(t, err) + subB, err := c.Subscribe(map[string]struct{}{"EOSE": {}}, 4) + assert.NoError(t, err) + + assert.NoError(t, c.Start()) + + c.Close() + + Eventually(t, func() bool { + _, okA := <-subA + _, okB := <-subB + return !okA && !okB + }, "all subscriber channels should be closed after Close()") +} diff --git a/embassy_test.go b/embassy_test.go index 2bff79e..4f1ec7c 100644 --- a/embassy_test.go +++ b/embassy_test.go @@ -2,7 +2,6 @@ package prism import ( "context" - // "fmt" "git.wisehodl.dev/jay/go-honeybee" "github.com/stretchr/testify/assert" "testing" diff --git a/go.mod b/go.mod index fc179df..0786321 100644 --- a/go.mod +++ b/go.mod @@ -9,6 +9,7 @@ require ( require ( git.wisehodl.dev/jay/go-mana-component v0.1.0 // indirect + git.wisehodl.dev/jay/go-roots-ws v0.1.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/gorilla/websocket v1.5.3 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect diff --git a/go.sum b/go.sum index 33935db..992dfaf 100644 --- a/go.sum +++ b/go.sum @@ -2,6 +2,8 @@ git.wisehodl.dev/jay/go-honeybee v0.2.0 h1:bF+/7WQzJnGBv5VuPBkWjshWWMbK4PZy8gia7 git.wisehodl.dev/jay/go-honeybee v0.2.0/go.mod h1:Xf3atUWJ2JgWVYpTBBxSgzL3ELdAo0znpqwpBZk9DlA= git.wisehodl.dev/jay/go-mana-component v0.1.0 h1:wWYN5MzC9Hq3tEt4z7FjrwNuQz3rZY3RWAmgmNE8EZE= git.wisehodl.dev/jay/go-mana-component v0.1.0/go.mod h1:r2ZaTjKzwV5JJfC5boikxtjAKusPrzlJU/7qul0EUqA= +git.wisehodl.dev/jay/go-roots-ws v0.1.0 h1:p1veCkpOmL26N//Qz7ekJOYj1Ck30ai4OKq9dxLjodk= +git.wisehodl.dev/jay/go-roots-ws v0.1.0/go.mod h1:ANQOOP13lHs2uQwYhrSQGAlL7+zR6QvbLzNPmNBJssQ= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg=