wrote journal collector

This commit is contained in:
Jay
2026-05-09 18:16:48 -04:00
parent e909e140a8
commit f96e872e4b
3 changed files with 309 additions and 44 deletions
+104 -38
View File
@@ -1,6 +1,8 @@
package prism package prism
import ( import (
"fmt"
"git.wisehodl.dev/jay/go-mana-component"
"sync" "sync"
"time" "time"
) )
@@ -9,19 +11,11 @@ import (
// Types // Types
// ---------------------------------------------------------------------------- // ----------------------------------------------------------------------------
type JournalAuthor string
const (
AuthorEmbassy JournalAuthor = "prism.embassy"
AuthorReqManager JournalAuthor = "prism.req_manager"
AuthorStreamReq JournalAuthor = "prism.stream_req"
AuthorQueryReq JournalAuthor = "prism.query_req"
)
// JournalCollector // JournalCollector
type JournalCollector struct { type JournalCollector struct {
entries chan JournalEntry out chan JournalEntry
buffer chan JournalEntry
mu sync.Mutex mu sync.Mutex
wg sync.WaitGroup wg sync.WaitGroup
closing bool closing bool
@@ -32,45 +26,117 @@ type JournalCollector struct {
type JournalEntry interface { type JournalEntry interface {
PeerID() string PeerID() string
SealedAt() time.Time SealedAt() time.Time
Author() JournalAuthor Author() component.Component
} }
type entry struct { type entry struct {
peerID string peerID string
sealedAt time.Time sealedAt time.Time
author JournalAuthor component component.Component
} }
func (e *entry) PeerID() string { return e.peerID } func (e *entry) PeerID() string { return e.peerID }
func (e *entry) SealedAt() time.Time { return e.sealedAt } func (e *entry) SealedAt() time.Time { return e.sealedAt }
func (e *entry) Author() JournalAuthor { return e.author } func (e *entry) Author() component.Component { return e.component }
// ---------------------------------------------------------------------------- // ----------------------------------------------------------------------------
// Journal Collector // Journal Collector
// ---------------------------------------------------------------------------- // ----------------------------------------------------------------------------
func NewJournalCollector() *JournalCollector { func NewJournalCollector() *JournalCollector {
c := &JournalCollector{
out: make(chan JournalEntry),
buffer: make(chan JournalEntry, 1024),
}
go func() {
bufferedPipe(c.buffer, c.out)
close(c.out)
}()
return c
}
func (c *JournalCollector) Enroll(ch <-chan JournalEntry) error {
c.mu.Lock()
defer c.mu.Unlock()
if c.closing {
return fmt.Errorf("journal collector is closing")
}
c.wg.Add(1)
go func() {
defer c.wg.Done()
for e := range ch {
c.buffer <- e
}
}()
return nil return nil
} }
func (c *JournalCollector) Enroll(ch <-chan JournalEntry) {} func (c *JournalCollector) Close() {
c.mu.Lock()
if c.closing {
c.mu.Unlock()
return
}
c.closing = true
c.mu.Unlock()
func (c *JournalCollector) Close() {} c.wg.Wait()
close(c.buffer)
}
func (c *JournalCollector) Entries() {} func (c *JournalCollector) Out() <-chan JournalEntry { return c.out }
// ---------------------------------------------------------------------------- // ----------------------------------------------------------------------------
// Journal Entries // Journal Entries
// ---------------------------------------------------------------------------- // ----------------------------------------------------------------------------
func newEntry(peerID string, author JournalAuthor) *entry { func newEntry(peerID string, component component.Component) *entry {
return &entry{ return &entry{
peerID: peerID, peerID: peerID,
author: author, component: component,
sealedAt: time.Now(), sealedAt: time.Now(),
} }
} }
// PeerAdded
type PeerAddedJournal struct {
*entry
Data PeerAddedData
}
type PeerAddedData struct {
At time.Time
}
func NewPeerAddedJournal(
peerID string, component component.Component, data PeerAddedData,
) PeerAddedJournal {
return PeerAddedJournal{entry: newEntry(peerID, component), Data: data}
}
// PeerRemoved
type PeerRemovedJournal struct {
*entry
Data PeerRemovedData
}
type PeerRemovedData struct {
At time.Time
}
func NewPeerRemovedJournal(
peerID string, component component.Component, data PeerRemovedData,
) PeerRemovedJournal {
return PeerRemovedJournal{entry: newEntry(peerID, component), Data: data}
}
// PeerConnected // PeerConnected
type PeerConnectedJournal struct { type PeerConnectedJournal struct {
@@ -83,9 +149,9 @@ type PeerConnectedData struct {
} }
func NewPeerConnectedJournal( func NewPeerConnectedJournal(
peerID string, author JournalAuthor, data PeerConnectedData, peerID string, component component.Component, data PeerConnectedData,
) PeerConnectedJournal { ) PeerConnectedJournal {
return PeerConnectedJournal{entry: newEntry(peerID, author), Data: data} return PeerConnectedJournal{entry: newEntry(peerID, component), Data: data}
} }
// PeerDisconnected // PeerDisconnected
@@ -100,9 +166,9 @@ type PeerDisconnectedData struct {
} }
func NewPeerDisconnectedJournal( func NewPeerDisconnectedJournal(
peerID string, author JournalAuthor, data PeerDisconnectedData, peerID string, component component.Component, data PeerDisconnectedData,
) PeerDisconnectedJournal { ) PeerDisconnectedJournal {
return PeerDisconnectedJournal{entry: newEntry(peerID, author), Data: data} return PeerDisconnectedJournal{entry: newEntry(peerID, component), Data: data}
} }
// ReqQueued // ReqQueued
@@ -120,9 +186,9 @@ type ReqQueuedData struct {
} }
func NewReqQueuedJournal( func NewReqQueuedJournal(
peerID string, author JournalAuthor, data ReqQueuedData, peerID string, component component.Component, data ReqQueuedData,
) ReqQueuedJournal { ) ReqQueuedJournal {
return ReqQueuedJournal{entry: newEntry(peerID, author), Data: data} return ReqQueuedJournal{entry: newEntry(peerID, component), Data: data}
} }
// CloseQueued // CloseQueued
@@ -140,9 +206,9 @@ type CloseQueuedData struct {
} }
func NewCloseQueuedJournal( func NewCloseQueuedJournal(
peerID string, author JournalAuthor, data CloseQueuedData, peerID string, component component.Component, data CloseQueuedData,
) CloseQueuedJournal { ) CloseQueuedJournal {
return CloseQueuedJournal{entry: newEntry(peerID, author), Data: data} return CloseQueuedJournal{entry: newEntry(peerID, component), Data: data}
} }
// ReqSendOutcome // ReqSendOutcome
@@ -163,9 +229,9 @@ type ReqSendOutcomeData struct {
} }
func NewReqSendOutcomeJournal( func NewReqSendOutcomeJournal(
peerID string, author JournalAuthor, data ReqSendOutcomeData, peerID string, component component.Component, data ReqSendOutcomeData,
) ReqSendOutcomeJournal { ) ReqSendOutcomeJournal {
return ReqSendOutcomeJournal{entry: newEntry(peerID, author), Data: data} return ReqSendOutcomeJournal{entry: newEntry(peerID, component), Data: data}
} }
// CloseSendOutcome // CloseSendOutcome
@@ -186,9 +252,9 @@ type CloseSendOutcomeData struct {
} }
func NewCloseSendOutcomeJournal( func NewCloseSendOutcomeJournal(
peerID string, author JournalAuthor, data CloseSendOutcomeData, peerID string, component component.Component, data CloseSendOutcomeData,
) CloseSendOutcomeJournal { ) CloseSendOutcomeJournal {
return CloseSendOutcomeJournal{entry: newEntry(peerID, author), Data: data} return CloseSendOutcomeJournal{entry: newEntry(peerID, component), Data: data}
} }
// ReceivedEOSE // ReceivedEOSE
@@ -204,9 +270,9 @@ type ReceivedEOSEData struct {
} }
func NewReceivedEOSEJournal( func NewReceivedEOSEJournal(
peerID string, author JournalAuthor, data ReceivedEOSEData, peerID string, component component.Component, data ReceivedEOSEData,
) ReceivedEOSEJournal { ) ReceivedEOSEJournal {
return ReceivedEOSEJournal{entry: newEntry(peerID, author), Data: data} return ReceivedEOSEJournal{entry: newEntry(peerID, component), Data: data}
} }
// MissedEOSE // MissedEOSE
@@ -222,9 +288,9 @@ type MissedEOSEData struct {
} }
func NewMissedEOSEJournal( func NewMissedEOSEJournal(
peerID string, author JournalAuthor, data MissedEOSEData, peerID string, component component.Component, data MissedEOSEData,
) MissedEOSEJournal { ) MissedEOSEJournal {
return MissedEOSEJournal{entry: newEntry(peerID, author), Data: data} return MissedEOSEJournal{entry: newEntry(peerID, component), Data: data}
} }
// ReceivedClosed // ReceivedClosed
@@ -241,9 +307,9 @@ type ReceivedClosedData struct {
} }
func NewReceivedClosedJournal( func NewReceivedClosedJournal(
peerID string, author JournalAuthor, data ReceivedClosedData, peerID string, component component.Component, data ReceivedClosedData,
) ReceivedClosedJournal { ) ReceivedClosedJournal {
return ReceivedClosedJournal{entry: newEntry(peerID, author), Data: data} return ReceivedClosedJournal{entry: newEntry(peerID, component), Data: data}
} }
// ReqClosed // ReqClosed
@@ -259,7 +325,7 @@ type ReqClosedData struct {
} }
func NewReqClosedJournal( func NewReqClosedJournal(
peerID string, author JournalAuthor, data ReqClosedData, peerID string, component component.Component, data ReqClosedData,
) ReqClosedJournal { ) ReqClosedJournal {
return ReqClosedJournal{entry: newEntry(peerID, author), Data: data} return ReqClosedJournal{entry: newEntry(peerID, component), Data: data}
} }
+165
View File
@@ -0,0 +1,165 @@
package prism
import (
"context"
"git.wisehodl.dev/jay/go-mana-component"
"github.com/stretchr/testify/assert"
"testing"
"time"
)
type testJournalEntry struct {
*entry
}
func newTestEntry(peerID string, comp component.Component) JournalEntry {
return &testJournalEntry{entry: newEntry(peerID, comp)}
}
func TestJournalCollector_SingleProducer(t *testing.T) {
jc := NewJournalCollector()
ch := make(chan JournalEntry, 10)
jc.Enroll(ch)
ctx := component.MustNew(context.Background(), "test", "emitter")
comp, _ := component.Get(ctx)
e1 := newTestEntry("peer1", comp)
e2 := newTestEntry("peer2", comp)
ch <- e1
ch <- e2
close(ch)
var received []JournalEntry
out := jc.Out()
// Wait for entries
Eventually(t, func() bool {
select {
case e := <-out:
received = append(received, e)
default:
}
return len(received) == 2
}, "should receive all entries")
}
func TestJournalCollector_MultipleProducers(t *testing.T) {
jc := NewJournalCollector()
ch1 := make(chan JournalEntry, 5)
ch2 := make(chan JournalEntry, 5)
jc.Enroll(ch1)
jc.Enroll(ch2)
ctx := component.MustNew(context.Background(), "test", "emitter")
comp, _ := component.Get(ctx)
ch1 <- newTestEntry("p1", comp)
ch2 <- newTestEntry("p2", comp)
ch1 <- newTestEntry("p3", comp)
ch2 <- newTestEntry("p4", comp)
close(ch1)
close(ch2)
count := 0
out := jc.Out()
Eventually(t, func() bool {
select {
case <-out:
count++
default:
}
return count == 4
}, "should merge entries from all producers")
}
func TestJournalCollector_EnrollAfterClose(t *testing.T) {
jc := NewJournalCollector()
jc.Close()
ch := make(chan JournalEntry)
err := jc.Enroll(ch)
assert.Error(t, err)
assert.Contains(t, err.Error(), "closing")
}
func TestJournalCollector_CloseBlocks(t *testing.T) {
jc := NewJournalCollector()
ch := make(chan JournalEntry)
jc.Enroll(ch)
closed := make(chan struct{})
go func() {
jc.Close()
close(closed)
}()
// Output (Out()) should still be open because the producer (ch) is open
select {
case <-jc.Out():
t.Fatal("output channel closed prematurely")
case <-time.After(NegativeTestTimeout):
}
// Output should not be reached yet
select {
case <-closed:
t.Fatal("Close() returned before producer closed")
default:
}
close(ch)
Eventually(t, func() bool {
select {
case _, ok := <-jc.Out():
return !ok
default:
return false
}
}, "Out() should close after all producers close")
Eventually(t, func() bool {
select {
case <-closed:
return true
default:
return false
}
}, "Close() should return after producers finish")
}
func TestJournalCollector_ComponentIdentity(t *testing.T) {
jc := NewJournalCollector()
ch := make(chan JournalEntry, 1)
jc.Enroll(ch)
mod := "test-mod"
path := "a.b.c"
ctx := component.MustNew(context.Background(), mod, path)
comp, _ := component.Get(ctx)
entry := newTestEntry("peer-id", comp)
ch <- entry
close(ch)
out := jc.Out()
var received JournalEntry
Eventually(t, func() bool {
select {
case e := <-out:
received = e
return true
default:
return false
}
}, "should receive the entry")
typed, ok := received.(*testJournalEntry)
assert.True(t, ok, "should be correct concrete type")
assert.Equal(t, mod, typed.Author().Module())
assert.Equal(t, path, typed.Author().PathString())
jc.Close()
}
+34
View File
@@ -0,0 +1,34 @@
package prism
func bufferedPipe[T any](input <-chan T, output chan<- T) {
var buffer []T
for {
var outOrNil chan<- T
var next T
// toggle send channel
if len(buffer) > 0 {
outOrNil = output
next = buffer[0]
} else if input == nil {
// input closed
return
}
select {
case item, ok := <-input:
if !ok {
// input is closed, set input nil
input = nil
continue
}
buffer = append(buffer, item)
case outOrNil <- next:
// discard element, set to zero to free memory
var zero T
buffer[0] = zero
buffer = buffer[1:]
}
}
}