From c0c23715e6fedbf2e2deb2417ca2e39476e9fdb2 Mon Sep 17 00:00:00 2001 From: Jay Date: Sat, 9 May 2026 19:08:16 -0400 Subject: [PATCH] wrote embassy journal emissions --- adapter.go | 67 ++++++++++++++++++++++++++++--- embassy_test.go | 102 ++++++++++++++++++++++++++++++++++++++++++++++++ journal.go | 8 ++-- journal_test.go | 4 +- 4 files changed, 169 insertions(+), 12 deletions(-) diff --git a/adapter.go b/adapter.go index e25db27..ab9a6d1 100644 --- a/adapter.go +++ b/adapter.go @@ -154,11 +154,21 @@ func (e *Embassy) Dispatch(url string) error { subs := e.eventSubs e.mu.Unlock() + at := time.Now() + if e.journals != nil { + c, _ := component.Get(e.ctx) + select { + case <-e.ctx.Done(): + return fmt.Errorf("closing") + case e.journals <- NewPeerAddedJournal(url, c, PeerAddedData{At: at}): + } + } + for _, ch := range subs { select { case <-e.ctx.Done(): return fmt.Errorf("closing") - case ch <- NewPoolEvent(url, EventAdded, time.Now()): + case ch <- NewPoolEvent(url, EventAdded, at): } } @@ -180,11 +190,21 @@ func (e *Embassy) Dismiss(url string) error { subs := e.eventSubs e.mu.Unlock() + at := time.Now() + if e.journals != nil { + c, _ := component.Get(e.ctx) + select { + case <-e.ctx.Done(): + return fmt.Errorf("closing") + case e.journals <- NewPeerRemovedJournal(url, c, PeerRemovedData{At: at}): + } + } + for _, ch := range subs { select { case <-e.ctx.Done(): return fmt.Errorf("closing") - case ch <- NewPoolEvent(url, EventRemoved, time.Now()): + case ch <- NewPoolEvent(url, EventRemoved, at): } } @@ -194,7 +214,6 @@ func (e *Embassy) Dismiss(url string) error { func (e *Embassy) Close() { e.mu.Lock() peers := e.peers - e.peers = make(map[string]bool) e.mu.Unlock() // dismiss peers @@ -206,6 +225,9 @@ func (e *Embassy) Close() { e.wg.Wait() e.mu.Lock() + // reset peers after dismissal + e.peers = make(map[string]bool) + subs := e.eventSubs e.eventSubs = make([]chan PoolEvent, 0) e.mu.Unlock() @@ -214,6 +236,10 @@ func (e *Embassy) Close() { for _, sub := range subs { close(sub) } + + if e.journals != nil { + close(e.journals) + } } func (e *Embassy) Peers() []string { @@ -281,22 +307,51 @@ func (e *Embassy) runEventRouter() { return } + url, err := honeybee.NormalizeURL(ev.ID) + if err != nil { + continue + } + + if !e.HasPeer(url) { + continue + } + kind := convertPoolEvent[ev.Kind] e.mu.Lock() switch kind { case EventConnected: - e.peers[ev.ID] = true + e.peers[url] = true case EventDisconnected: - e.peers[ev.ID] = false + e.peers[url] = false } subs := e.eventSubs + canJournal := e.journals != nil e.mu.Unlock() + if canJournal { + switch kind { + case EventConnected: + c, _ := component.Get(e.ctx) + select { + case <-e.ctx.Done(): + case e.journals <- NewPeerConnectedJournal( + url, c, PeerConnectedData{At: ev.At}): + } + case EventDisconnected: + c, _ := component.Get(e.ctx) + select { + case <-e.ctx.Done(): + case e.journals <- NewPeerDisconnectedJournal( + url, c, PeerDisconnectedData{At: ev.At}): + } + } + } + for _, ch := range subs { select { case <-e.ctx.Done(): - case ch <- NewPoolEvent(ev.ID, kind, ev.At): + case ch <- NewPoolEvent(url, kind, ev.At): } } } diff --git a/embassy_test.go b/embassy_test.go index 6b8becb..2bff79e 100644 --- a/embassy_test.go +++ b/embassy_test.go @@ -50,6 +50,8 @@ func TestEmbassyPoolEvents(t *testing.T) { }) t.Run("connected", func(t *testing.T) { + e.Dispatch("wss://test") + eventsCh <- honeybee.OutboundPoolEvent{ ID: "wss://test", Kind: honeybee.OutboundEventConnected, @@ -67,6 +69,8 @@ func TestEmbassyPoolEvents(t *testing.T) { }) t.Run("disconnected", func(t *testing.T) { + e.Dispatch("wss://test") + eventsCh <- honeybee.OutboundPoolEvent{ ID: "wss://test", Kind: honeybee.OutboundEventDisconnected, @@ -243,3 +247,101 @@ func TestEmbassyClose(t *testing.T) { return !ok1 && !ok2 }, "subs should close") } + +func TestEmbassyJournals(t *testing.T) { + ctx := context.Background() + jc := NewJournalCollector() + eventsCh := make(chan honeybee.OutboundPoolEvent, 1) + + pool := EmbassyPlugin{ + Connect: func(id string) error { return nil }, + Remove: func(id string) error { return nil }, + Send: func(id string, data []byte) error { return nil }, + Events: eventsCh, + } + + e := NewEmbassy(ctx, pool, jc, nil) + out := jc.Out() + peer := "wss://test" + + // added + e.Dispatch(peer) + Eventually(t, func() bool { + select { + case entry := <-out: + _, ok := entry.(PeerAddedJournal) + return ok + default: + return false + } + }, "expected PeerAddedJournal") + + // connected + eventsCh <- honeybee.OutboundPoolEvent{ + ID: peer, + Kind: honeybee.OutboundEventConnected, + At: time.Now(), + } + Eventually(t, func() bool { + select { + case entry := <-out: + e, ok := entry.(PeerConnectedJournal) + + // ensure fields are correct + peerOk := e.PeerID() == "wss://test" + modOk := e.Component().Module() == "prism" + pathOk := e.Component().PathString() == "embassy" + + return ok && peerOk && modOk && pathOk + default: + return false + } + }, "expected PeerConnectedJournal") + + // disconnected + eventsCh <- honeybee.OutboundPoolEvent{ + ID: peer, + Kind: honeybee.OutboundEventDisconnected, + At: time.Now(), + } + Eventually(t, func() bool { + select { + case entry := <-out: + _, ok := entry.(PeerDisconnectedJournal) + return ok + default: + return false + } + }, "expected PeerDisconnectedJournal") + + // removed + e.Dismiss(peer) + Eventually(t, func() bool { + select { + case entry := <-out: + _, ok := entry.(PeerRemovedJournal) + return ok + default: + return false + } + }, "expected PeerRemovedJournal") + + // close embassy: closes journal channel + e.Close() + + // Ensure jc can close now that embassy has closed its journal channel + jcClosed := make(chan struct{}) + go func() { + jc.Close() + close(jcClosed) + }() + + Eventually(t, func() bool { + select { + case <-jcClosed: + return true + default: + return false + } + }, "JournalCollector.Close() should return after Embassy.Close()") +} diff --git a/journal.go b/journal.go index 791a6ec..8afb21f 100644 --- a/journal.go +++ b/journal.go @@ -26,7 +26,7 @@ type JournalCollector struct { type JournalEntry interface { PeerID() string SealedAt() time.Time - Author() component.Component + Component() component.Component } type entry struct { @@ -35,9 +35,9 @@ type entry struct { component component.Component } -func (e *entry) PeerID() string { return e.peerID } -func (e *entry) SealedAt() time.Time { return e.sealedAt } -func (e *entry) Author() component.Component { return e.component } +func (e *entry) PeerID() string { return e.peerID } +func (e *entry) SealedAt() time.Time { return e.sealedAt } +func (e *entry) Component() component.Component { return e.component } // ---------------------------------------------------------------------------- // Journal Collector diff --git a/journal_test.go b/journal_test.go index 1234767..407c111 100644 --- a/journal_test.go +++ b/journal_test.go @@ -158,8 +158,8 @@ func TestJournalCollector_ComponentIdentity(t *testing.T) { typed, ok := received.(*testJournalEntry) assert.True(t, ok, "should be correct concrete type") - assert.Equal(t, mod, typed.Author().Module()) - assert.Equal(t, path, typed.Author().PathString()) + assert.Equal(t, mod, typed.Component().Module()) + assert.Equal(t, path, typed.Component().PathString()) jc.Close() }