From e909e140a8a77cfe416c7eecdf3a3a4dac24a83a Mon Sep 17 00:00:00 2001 From: Jay Date: Sat, 9 May 2026 17:36:03 -0400 Subject: [PATCH] Wrote embassy --- adapter.go | 195 ++++++++++++++++++++++++++++++++++++-- embassy_test.go | 245 ++++++++++++++++++++++++++++++++++++++++++++++++ go.mod | 13 ++- go.sum | 14 ++- helpers_test.go | 23 +++++ journal.go | 2 +- 6 files changed, 477 insertions(+), 15 deletions(-) create mode 100644 embassy_test.go create mode 100644 helpers_test.go diff --git a/adapter.go b/adapter.go index 373f524..e25db27 100644 --- a/adapter.go +++ b/adapter.go @@ -2,7 +2,9 @@ package prism import ( "context" + "fmt" "git.wisehodl.dev/jay/go-honeybee" + "git.wisehodl.dev/jay/go-mana-component" "log/slog" "sync" "time" @@ -39,6 +41,7 @@ type PoolEventKind = int const ( EventConnected PoolEventKind = iota EventDisconnected + EventAdded EventRemoved ) @@ -48,6 +51,15 @@ type PoolEvent struct { At time.Time } +func NewPoolEvent(id string, kind PoolEventKind, at time.Time) PoolEvent { + return PoolEvent{ID: id, Kind: kind, At: at} +} + +var convertPoolEvent = map[honeybee.OutboundPoolEventKind]PoolEventKind{ + honeybee.OutboundEventConnected: EventConnected, + honeybee.OutboundEventDisconnected: EventDisconnected, +} + // Adapter type Adapter interface { @@ -92,40 +104,205 @@ type Hotel struct { // Embassy (Outbound Adapter) // ---------------------------------------------------------------------------- -func NewEmbassy() *Embassy { - return nil +func NewEmbassy( + ctx context.Context, + pool EmbassyPlugin, + jc *JournalCollector, + handler slog.Handler, +) *Embassy { + ctx, cancel := context.WithCancel( + component.MustNew(ctx, "prism", "embassy")) + + e := &Embassy{ + pool: pool, + peers: make(map[string]bool), + eventSubs: make([]chan PoolEvent, 0), + ctx: ctx, + cancel: cancel, + } + + if jc != nil { + e.journals = make(chan JournalEntry, 16) + jc.Enroll(e.journals) + } + + if handler != nil { + c, ok := component.Get(ctx) + if ok { + e.logger = slog.New(handler).With(slog.Any("component", c)) + } + } + + e.wg.Add(1) + go e.runEventRouter() + + return e } func (e *Embassy) Dispatch(url string) error { + url, err := honeybee.NormalizeURL(url) + if err != nil { + return fmt.Errorf("invalid url: %s", url) + } + + if err := e.pool.Connect(url); err != nil { + return fmt.Errorf("dispatch: %w", err) + } + + e.mu.Lock() + e.peers[url] = false + subs := e.eventSubs + e.mu.Unlock() + + for _, ch := range subs { + select { + case <-e.ctx.Done(): + return fmt.Errorf("closing") + case ch <- NewPoolEvent(url, EventAdded, time.Now()): + } + } + return nil } func (e *Embassy) Dismiss(url string) error { + url, err := honeybee.NormalizeURL(url) + if err != nil { + return fmt.Errorf("invalid url: %s", url) + } + + if err := e.pool.Remove(url); err != nil { + return fmt.Errorf("dismiss: %w", err) + } + + e.mu.Lock() + delete(e.peers, url) + subs := e.eventSubs + e.mu.Unlock() + + for _, ch := range subs { + select { + case <-e.ctx.Done(): + return fmt.Errorf("closing") + case ch <- NewPoolEvent(url, EventRemoved, time.Now()): + } + } + return nil } -func (e *Embassy) Close() {} +func (e *Embassy) Close() { + e.mu.Lock() + peers := e.peers + e.peers = make(map[string]bool) + e.mu.Unlock() + + // dismiss peers + for peer, _ := range peers { + e.Dismiss(peer) + } + + e.cancel() + e.wg.Wait() + + e.mu.Lock() + subs := e.eventSubs + e.eventSubs = make([]chan PoolEvent, 0) + e.mu.Unlock() + + // close subs + for _, sub := range subs { + close(sub) + } +} func (e *Embassy) Peers() []string { - return nil + e.mu.RLock() + defer e.mu.RUnlock() + + peers := make([]string, 0, len(e.peers)) + for p, _ := range e.peers { + peers = append(peers, p) + } + return peers } -func (e *Embassy) HasPeer(id string) bool { - return false +func (e *Embassy) HasPeer(url string) bool { + url, err := honeybee.NormalizeURL(url) + if err != nil { + return false + } + + e.mu.RLock() + defer e.mu.RUnlock() + + _, ok := e.peers[url] + return ok } -func (e *Embassy) IsConnected(id string) bool { - return false +func (e *Embassy) IsConnected(url string) bool { + url, err := honeybee.NormalizeURL(url) + if err != nil { + return false + } + + e.mu.RLock() + defer e.mu.RUnlock() + + connected, _ := e.peers[url] + return connected } func (e *Embassy) Subscribe() <-chan PoolEvent { - return nil + e.mu.Lock() + defer e.mu.Unlock() + + ch := make(chan PoolEvent, 16) + e.eventSubs = append(e.eventSubs, ch) + + return ch } func (e *Embassy) Send(id string, data Envelope) error { return nil } +// Internal + +func (e *Embassy) runEventRouter() { + defer e.wg.Done() + + for { + select { + case <-e.ctx.Done(): + return + case ev, ok := <-e.pool.Events: + if !ok { + return + } + + kind := convertPoolEvent[ev.Kind] + + e.mu.Lock() + switch kind { + case EventConnected: + e.peers[ev.ID] = true + case EventDisconnected: + e.peers[ev.ID] = false + } + subs := e.eventSubs + e.mu.Unlock() + + for _, ch := range subs { + select { + case <-e.ctx.Done(): + case ch <- NewPoolEvent(ev.ID, kind, ev.At): + } + } + } + } +} + // ---------------------------------------------------------------------------- // Hotel (Inbound Adapter) // ---------------------------------------------------------------------------- diff --git a/embassy_test.go b/embassy_test.go new file mode 100644 index 0000000..6b8becb --- /dev/null +++ b/embassy_test.go @@ -0,0 +1,245 @@ +package prism + +import ( + "context" + // "fmt" + "git.wisehodl.dev/jay/go-honeybee" + "github.com/stretchr/testify/assert" + "testing" + "time" +) + +func TestEmbassyPoolEvents(t *testing.T) { + ctx := context.Background() + eventsCh := make(chan honeybee.OutboundPoolEvent) + + pool := EmbassyPlugin{ + Connect: func(id string) error { return nil }, + Remove: func(id string) error { return nil }, + Send: func(id string, data []byte) error { return nil }, + Events: eventsCh, + } + + e := NewEmbassy(ctx, pool, nil, nil) + sub := e.Subscribe() + + t.Run("added then removed", func(t *testing.T) { + err := e.Dispatch("wss://test") + assert.NoError(t, err) + + Eventually(t, func() bool { + select { + default: + return false + case ev := <-sub: + return ev.Kind == EventAdded + } + }, "expected added event") + + err = e.Dismiss("wss://test") + assert.NoError(t, err) + + Eventually(t, func() bool { + select { + default: + return false + case ev := <-sub: + return ev.Kind == EventRemoved + } + }, "expected removed event") + }) + + t.Run("connected", func(t *testing.T) { + eventsCh <- honeybee.OutboundPoolEvent{ + ID: "wss://test", + Kind: honeybee.OutboundEventConnected, + At: time.Now(), + } + + Eventually(t, func() bool { + select { + default: + return false + case ev := <-sub: + return ev.Kind == EventConnected + } + }, "expected connected event") + }) + + t.Run("disconnected", func(t *testing.T) { + eventsCh <- honeybee.OutboundPoolEvent{ + ID: "wss://test", + Kind: honeybee.OutboundEventDisconnected, + At: time.Now(), + } + + Eventually(t, func() bool { + select { + default: + return false + case ev := <-sub: + return ev.Kind == EventDisconnected + } + }, "expected disconnected event") + }) +} + +func TestEmbassyPeerRegistry(t *testing.T) { + ctx := context.Background() + eventsCh := make(chan honeybee.OutboundPoolEvent) + + pool := EmbassyPlugin{ + Connect: func(id string) error { return nil }, + Remove: func(id string) error { return nil }, + Send: func(id string, data []byte) error { return nil }, + Events: eventsCh, + } + + e := NewEmbassy(ctx, pool, nil, nil) + + // add + e.Dispatch("wss://test") + + assert.True(t, e.HasPeer("wss://test")) + assert.False(t, e.IsConnected("wss://test")) + + // connect + eventsCh <- honeybee.OutboundPoolEvent{ + ID: "wss://test", + Kind: honeybee.OutboundEventConnected, + At: time.Now(), + } + + Eventually(t, func() bool { + exists := e.HasPeer("wss://test") + connected := e.IsConnected("wss://test") + return exists && connected + }, "expected: exists, connected") + + // disconnect + eventsCh <- honeybee.OutboundPoolEvent{ + ID: "wss://test", + Kind: honeybee.OutboundEventDisconnected, + At: time.Now(), + } + + Eventually(t, func() bool { + exists := e.HasPeer("wss://test") + connected := e.IsConnected("wss://test") + return exists && !connected + }, "expected: exists, disconnected") + + // remove + e.Dismiss("wss://test") + + assert.False(t, e.HasPeer("wss://test")) + assert.False(t, e.IsConnected("wss://test")) +} + +func TestEmbassyPeers(t *testing.T) { + ctx := context.Background() + + pool := EmbassyPlugin{ + Connect: func(id string) error { return nil }, + Remove: func(id string) error { return nil }, + Send: func(id string, data []byte) error { return nil }, + Events: nil, + } + + e := NewEmbassy(ctx, pool, nil, nil) + + assert.Len(t, e.Peers(), 0) + + e.Dispatch("wss://test1") + e.Dispatch("wss://test2") + assert.Len(t, e.Peers(), 2) + + e.Dismiss("wss://test2") + assert.Len(t, e.Peers(), 1) +} + +func TestEmbassySubFanout(t *testing.T) { + ctx := context.Background() + eventsCh := make(chan honeybee.OutboundPoolEvent) + + pool := EmbassyPlugin{ + Connect: func(id string) error { return nil }, + Remove: func(id string) error { return nil }, + Send: func(id string, data []byte) error { return nil }, + Events: eventsCh, + } + + e := NewEmbassy(ctx, pool, nil, nil) + sub1 := e.Subscribe() + sub2 := e.Subscribe() + + e.Dispatch("wss://test") + + Eventually(t, func() bool { + select { + default: + return false + case ev := <-sub1: + return ev.Kind == EventAdded + } + }, "expected added event on sub1") + + Eventually(t, func() bool { + select { + default: + return false + case ev := <-sub2: + return ev.Kind == EventAdded + } + }, "expected added event on sub2") +} + +func TestEmbassyClose(t *testing.T) { + ctx := context.Background() + eventsCh := make(chan honeybee.OutboundPoolEvent, 1) + + pool := EmbassyPlugin{ + Connect: func(id string) error { return nil }, + Remove: func(id string) error { return nil }, + Send: func(id string, data []byte) error { return nil }, + Events: eventsCh, + } + + e := NewEmbassy(ctx, pool, nil, nil) + sub1 := e.Subscribe() + sub2 := e.Subscribe() + + e.Dispatch("wss://test") + + e.Close() + + // peer gets removed + Eventually(t, func() bool { + select { + default: + return false + case ev := <-sub1: + return ev.ID == "wss://test" && ev.Kind == EventRemoved + } + }, "expected peer removed") + + Eventually(t, func() bool { + select { + default: + return false + case ev := <-sub2: + return ev.ID == "wss://test" && ev.Kind == EventRemoved + } + }, "expected peer removed") + + // peer list is empty + assert.False(t, e.HasPeer("wss://test")) + assert.Len(t, e.Peers(), 0) + + // subs close + Eventually(t, func() bool { + _, ok1 := <-sub1 + _, ok2 := <-sub2 + return !ok1 && !ok2 + }, "subs should close") +} diff --git a/go.mod b/go.mod index 32a9853..fc179df 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,14 @@ module git.wisehodl.dev/jay/go-mana-prism go 1.25.0 require ( - git.wisehodl.dev/jay/go-honeybee v0.2.0 // indirect - git.wisehodl.dev/jay/go-roots-ws v0.1.0 // indirect - github.com/gorilla/websocket v1.5.3 // indirect + git.wisehodl.dev/jay/go-honeybee v0.2.0 + github.com/stretchr/testify v1.11.1 +) + +require ( + git.wisehodl.dev/jay/go-mana-component v0.1.0 // indirect + github.com/davecgh/go-spew v1.1.1 // indirect + github.com/gorilla/websocket v1.5.3 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index 82ee211..33935db 100644 --- a/go.sum +++ b/go.sum @@ -1,6 +1,16 @@ git.wisehodl.dev/jay/go-honeybee v0.2.0 h1:bF+/7WQzJnGBv5VuPBkWjshWWMbK4PZy8gia7AtVxt0= git.wisehodl.dev/jay/go-honeybee v0.2.0/go.mod h1:Xf3atUWJ2JgWVYpTBBxSgzL3ELdAo0znpqwpBZk9DlA= -git.wisehodl.dev/jay/go-roots-ws v0.1.0 h1:p1veCkpOmL26N//Qz7ekJOYj1Ck30ai4OKq9dxLjodk= -git.wisehodl.dev/jay/go-roots-ws v0.1.0/go.mod h1:ANQOOP13lHs2uQwYhrSQGAlL7+zR6QvbLzNPmNBJssQ= +git.wisehodl.dev/jay/go-mana-component v0.1.0 h1:wWYN5MzC9Hq3tEt4z7FjrwNuQz3rZY3RWAmgmNE8EZE= +git.wisehodl.dev/jay/go-mana-component v0.1.0/go.mod h1:r2ZaTjKzwV5JJfC5boikxtjAKusPrzlJU/7qul0EUqA= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg= github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= +github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/helpers_test.go b/helpers_test.go new file mode 100644 index 0000000..d820c12 --- /dev/null +++ b/helpers_test.go @@ -0,0 +1,23 @@ +package prism + +import ( + "github.com/stretchr/testify/assert" + "testing" + "time" +) + +const ( + TestTimeout = 2 * time.Second + TestTick = 10 * time.Millisecond + NegativeTestTimeout = 100 * time.Millisecond +) + +func Eventually(t *testing.T, condition func() bool, msg string) { + t.Helper() + assert.Eventually(t, condition, TestTimeout, TestTick, msg) +} + +func Never(t *testing.T, condition func() bool, msg string) { + t.Helper() + assert.Never(t, condition, NegativeTestTimeout, TestTick, msg) +} diff --git a/journal.go b/journal.go index d53a133..d62acfd 100644 --- a/journal.go +++ b/journal.go @@ -53,7 +53,7 @@ func NewJournalCollector() *JournalCollector { return nil } -func (c *JournalCollector) Enroll() {} +func (c *JournalCollector) Enroll(ch <-chan JournalEntry) {} func (c *JournalCollector) Close() {}