diff --git a/journal.go b/journal.go index d62acfd..791a6ec 100644 --- a/journal.go +++ b/journal.go @@ -1,6 +1,8 @@ package prism import ( + "fmt" + "git.wisehodl.dev/jay/go-mana-component" "sync" "time" ) @@ -9,19 +11,11 @@ import ( // Types // ---------------------------------------------------------------------------- -type JournalAuthor string - -const ( - AuthorEmbassy JournalAuthor = "prism.embassy" - AuthorReqManager JournalAuthor = "prism.req_manager" - AuthorStreamReq JournalAuthor = "prism.stream_req" - AuthorQueryReq JournalAuthor = "prism.query_req" -) - // JournalCollector type JournalCollector struct { - entries chan JournalEntry + out chan JournalEntry + buffer chan JournalEntry mu sync.Mutex wg sync.WaitGroup closing bool @@ -32,45 +26,117 @@ type JournalCollector struct { type JournalEntry interface { PeerID() string SealedAt() time.Time - Author() JournalAuthor + Author() component.Component } type entry struct { - peerID string - sealedAt time.Time - author JournalAuthor + peerID string + sealedAt time.Time + component component.Component } -func (e *entry) PeerID() string { return e.peerID } -func (e *entry) SealedAt() time.Time { return e.sealedAt } -func (e *entry) Author() JournalAuthor { return e.author } +func (e *entry) PeerID() string { return e.peerID } +func (e *entry) SealedAt() time.Time { return e.sealedAt } +func (e *entry) Author() component.Component { return e.component } // ---------------------------------------------------------------------------- // Journal Collector // ---------------------------------------------------------------------------- func NewJournalCollector() *JournalCollector { + c := &JournalCollector{ + out: make(chan JournalEntry), + buffer: make(chan JournalEntry, 1024), + } + + go func() { + bufferedPipe(c.buffer, c.out) + close(c.out) + }() + + return c +} + +func (c *JournalCollector) Enroll(ch <-chan JournalEntry) error { + c.mu.Lock() + defer c.mu.Unlock() + + if c.closing { + return fmt.Errorf("journal collector is closing") + } + + c.wg.Add(1) + go func() { + defer c.wg.Done() + for e := range ch { + c.buffer <- e + } + }() + return nil } -func (c *JournalCollector) Enroll(ch <-chan JournalEntry) {} +func (c *JournalCollector) Close() { + c.mu.Lock() + if c.closing { + c.mu.Unlock() + return + } + c.closing = true + c.mu.Unlock() -func (c *JournalCollector) Close() {} + c.wg.Wait() + close(c.buffer) +} -func (c *JournalCollector) Entries() {} +func (c *JournalCollector) Out() <-chan JournalEntry { return c.out } // ---------------------------------------------------------------------------- // Journal Entries // ---------------------------------------------------------------------------- -func newEntry(peerID string, author JournalAuthor) *entry { +func newEntry(peerID string, component component.Component) *entry { return &entry{ - peerID: peerID, - author: author, - sealedAt: time.Now(), + peerID: peerID, + component: component, + sealedAt: time.Now(), } } +// PeerAdded + +type PeerAddedJournal struct { + *entry + Data PeerAddedData +} + +type PeerAddedData struct { + At time.Time +} + +func NewPeerAddedJournal( + peerID string, component component.Component, data PeerAddedData, +) PeerAddedJournal { + return PeerAddedJournal{entry: newEntry(peerID, component), Data: data} +} + +// PeerRemoved + +type PeerRemovedJournal struct { + *entry + Data PeerRemovedData +} + +type PeerRemovedData struct { + At time.Time +} + +func NewPeerRemovedJournal( + peerID string, component component.Component, data PeerRemovedData, +) PeerRemovedJournal { + return PeerRemovedJournal{entry: newEntry(peerID, component), Data: data} +} + // PeerConnected type PeerConnectedJournal struct { @@ -83,9 +149,9 @@ type PeerConnectedData struct { } func NewPeerConnectedJournal( - peerID string, author JournalAuthor, data PeerConnectedData, + peerID string, component component.Component, data PeerConnectedData, ) PeerConnectedJournal { - return PeerConnectedJournal{entry: newEntry(peerID, author), Data: data} + return PeerConnectedJournal{entry: newEntry(peerID, component), Data: data} } // PeerDisconnected @@ -100,9 +166,9 @@ type PeerDisconnectedData struct { } func NewPeerDisconnectedJournal( - peerID string, author JournalAuthor, data PeerDisconnectedData, + peerID string, component component.Component, data PeerDisconnectedData, ) PeerDisconnectedJournal { - return PeerDisconnectedJournal{entry: newEntry(peerID, author), Data: data} + return PeerDisconnectedJournal{entry: newEntry(peerID, component), Data: data} } // ReqQueued @@ -120,9 +186,9 @@ type ReqQueuedData struct { } func NewReqQueuedJournal( - peerID string, author JournalAuthor, data ReqQueuedData, + peerID string, component component.Component, data ReqQueuedData, ) ReqQueuedJournal { - return ReqQueuedJournal{entry: newEntry(peerID, author), Data: data} + return ReqQueuedJournal{entry: newEntry(peerID, component), Data: data} } // CloseQueued @@ -140,9 +206,9 @@ type CloseQueuedData struct { } func NewCloseQueuedJournal( - peerID string, author JournalAuthor, data CloseQueuedData, + peerID string, component component.Component, data CloseQueuedData, ) CloseQueuedJournal { - return CloseQueuedJournal{entry: newEntry(peerID, author), Data: data} + return CloseQueuedJournal{entry: newEntry(peerID, component), Data: data} } // ReqSendOutcome @@ -163,9 +229,9 @@ type ReqSendOutcomeData struct { } func NewReqSendOutcomeJournal( - peerID string, author JournalAuthor, data ReqSendOutcomeData, + peerID string, component component.Component, data ReqSendOutcomeData, ) ReqSendOutcomeJournal { - return ReqSendOutcomeJournal{entry: newEntry(peerID, author), Data: data} + return ReqSendOutcomeJournal{entry: newEntry(peerID, component), Data: data} } // CloseSendOutcome @@ -186,9 +252,9 @@ type CloseSendOutcomeData struct { } func NewCloseSendOutcomeJournal( - peerID string, author JournalAuthor, data CloseSendOutcomeData, + peerID string, component component.Component, data CloseSendOutcomeData, ) CloseSendOutcomeJournal { - return CloseSendOutcomeJournal{entry: newEntry(peerID, author), Data: data} + return CloseSendOutcomeJournal{entry: newEntry(peerID, component), Data: data} } // ReceivedEOSE @@ -204,9 +270,9 @@ type ReceivedEOSEData struct { } func NewReceivedEOSEJournal( - peerID string, author JournalAuthor, data ReceivedEOSEData, + peerID string, component component.Component, data ReceivedEOSEData, ) ReceivedEOSEJournal { - return ReceivedEOSEJournal{entry: newEntry(peerID, author), Data: data} + return ReceivedEOSEJournal{entry: newEntry(peerID, component), Data: data} } // MissedEOSE @@ -222,9 +288,9 @@ type MissedEOSEData struct { } func NewMissedEOSEJournal( - peerID string, author JournalAuthor, data MissedEOSEData, + peerID string, component component.Component, data MissedEOSEData, ) MissedEOSEJournal { - return MissedEOSEJournal{entry: newEntry(peerID, author), Data: data} + return MissedEOSEJournal{entry: newEntry(peerID, component), Data: data} } // ReceivedClosed @@ -241,9 +307,9 @@ type ReceivedClosedData struct { } func NewReceivedClosedJournal( - peerID string, author JournalAuthor, data ReceivedClosedData, + peerID string, component component.Component, data ReceivedClosedData, ) ReceivedClosedJournal { - return ReceivedClosedJournal{entry: newEntry(peerID, author), Data: data} + return ReceivedClosedJournal{entry: newEntry(peerID, component), Data: data} } // ReqClosed @@ -259,7 +325,7 @@ type ReqClosedData struct { } func NewReqClosedJournal( - peerID string, author JournalAuthor, data ReqClosedData, + peerID string, component component.Component, data ReqClosedData, ) ReqClosedJournal { - return ReqClosedJournal{entry: newEntry(peerID, author), Data: data} + return ReqClosedJournal{entry: newEntry(peerID, component), Data: data} } diff --git a/journal_test.go b/journal_test.go new file mode 100644 index 0000000..1234767 --- /dev/null +++ b/journal_test.go @@ -0,0 +1,165 @@ +package prism + +import ( + "context" + "git.wisehodl.dev/jay/go-mana-component" + "github.com/stretchr/testify/assert" + "testing" + "time" +) + +type testJournalEntry struct { + *entry +} + +func newTestEntry(peerID string, comp component.Component) JournalEntry { + return &testJournalEntry{entry: newEntry(peerID, comp)} +} + +func TestJournalCollector_SingleProducer(t *testing.T) { + jc := NewJournalCollector() + ch := make(chan JournalEntry, 10) + jc.Enroll(ch) + + ctx := component.MustNew(context.Background(), "test", "emitter") + comp, _ := component.Get(ctx) + e1 := newTestEntry("peer1", comp) + e2 := newTestEntry("peer2", comp) + + ch <- e1 + ch <- e2 + close(ch) + + var received []JournalEntry + out := jc.Out() + + // Wait for entries + Eventually(t, func() bool { + select { + case e := <-out: + received = append(received, e) + default: + } + return len(received) == 2 + }, "should receive all entries") +} + +func TestJournalCollector_MultipleProducers(t *testing.T) { + jc := NewJournalCollector() + ch1 := make(chan JournalEntry, 5) + ch2 := make(chan JournalEntry, 5) + jc.Enroll(ch1) + jc.Enroll(ch2) + + ctx := component.MustNew(context.Background(), "test", "emitter") + comp, _ := component.Get(ctx) + + ch1 <- newTestEntry("p1", comp) + ch2 <- newTestEntry("p2", comp) + ch1 <- newTestEntry("p3", comp) + ch2 <- newTestEntry("p4", comp) + + close(ch1) + close(ch2) + + count := 0 + out := jc.Out() + Eventually(t, func() bool { + select { + case <-out: + count++ + default: + } + return count == 4 + }, "should merge entries from all producers") +} + +func TestJournalCollector_EnrollAfterClose(t *testing.T) { + jc := NewJournalCollector() + jc.Close() + + ch := make(chan JournalEntry) + err := jc.Enroll(ch) + assert.Error(t, err) + assert.Contains(t, err.Error(), "closing") +} + +func TestJournalCollector_CloseBlocks(t *testing.T) { + jc := NewJournalCollector() + ch := make(chan JournalEntry) + jc.Enroll(ch) + + closed := make(chan struct{}) + go func() { + jc.Close() + close(closed) + }() + + // Output (Out()) should still be open because the producer (ch) is open + select { + case <-jc.Out(): + t.Fatal("output channel closed prematurely") + case <-time.After(NegativeTestTimeout): + } + + // Output should not be reached yet + select { + case <-closed: + t.Fatal("Close() returned before producer closed") + default: + } + + close(ch) + + Eventually(t, func() bool { + select { + case _, ok := <-jc.Out(): + return !ok + default: + return false + } + }, "Out() should close after all producers close") + + Eventually(t, func() bool { + select { + case <-closed: + return true + default: + return false + } + }, "Close() should return after producers finish") +} + +func TestJournalCollector_ComponentIdentity(t *testing.T) { + jc := NewJournalCollector() + ch := make(chan JournalEntry, 1) + jc.Enroll(ch) + + mod := "test-mod" + path := "a.b.c" + ctx := component.MustNew(context.Background(), mod, path) + comp, _ := component.Get(ctx) + + entry := newTestEntry("peer-id", comp) + ch <- entry + close(ch) + + out := jc.Out() + var received JournalEntry + Eventually(t, func() bool { + select { + case e := <-out: + received = e + return true + default: + return false + } + }, "should receive the entry") + + typed, ok := received.(*testJournalEntry) + assert.True(t, ok, "should be correct concrete type") + assert.Equal(t, mod, typed.Author().Module()) + assert.Equal(t, path, typed.Author().PathString()) + + jc.Close() +} diff --git a/util.go b/util.go new file mode 100644 index 0000000..dc891c7 --- /dev/null +++ b/util.go @@ -0,0 +1,34 @@ +package prism + +func bufferedPipe[T any](input <-chan T, output chan<- T) { + var buffer []T + + for { + var outOrNil chan<- T + var next T + + // toggle send channel + if len(buffer) > 0 { + outOrNil = output + next = buffer[0] + } else if input == nil { + // input closed + return + } + + select { + case item, ok := <-input: + if !ok { + // input is closed, set input nil + input = nil + continue + } + buffer = append(buffer, item) + case outOrNil <- next: + // discard element, set to zero to free memory + var zero T + buffer[0] = zero + buffer = buffer[1:] + } + } +}