refactor to peer-centric architecture

This commit is contained in:
Jay
2026-05-13 16:51:09 -04:00
parent 49ce2eb2ac
commit a096450fc7
17 changed files with 1063 additions and 3959 deletions
+1
View File
@@ -0,0 +1 @@
draft
-399
View File
@@ -1,399 +0,0 @@
package prism
import (
"context"
"fmt"
"git.wisehodl.dev/jay/go-honeybee"
"git.wisehodl.dev/jay/go-mana-component"
"log/slog"
"sync"
"time"
)
// ----------------------------------------------------------------------------
// Types
// ----------------------------------------------------------------------------
type Envelope = []byte
type PoolSendFunc = func(id string, data Envelope) error
// Pool Plugins
type EmbassyPlugin struct {
Connect func(id string) error
Remove func(id string) error
Send PoolSendFunc
Events <-chan honeybee.OutboundPoolEvent
}
type HotelPlugin struct {
Add func(id string, socket honeybee.Socket) error
Replace func(id string, socket honeybee.Socket) error
Remove func(id string) error
Send PoolSendFunc
Events <-chan honeybee.InboundPoolEvent
}
// Events
type PoolEventKind = int
const (
EventConnected PoolEventKind = iota
EventDisconnected
EventAdded
EventRemoved
)
type PoolEvent struct {
ID string
Kind PoolEventKind
At time.Time
}
func NewPoolEvent(id string, kind PoolEventKind, at time.Time) PoolEvent {
return PoolEvent{ID: id, Kind: kind, At: at}
}
var convertPoolEvent = map[honeybee.OutboundPoolEventKind]PoolEventKind{
honeybee.OutboundEventConnected: EventConnected,
honeybee.OutboundEventDisconnected: EventDisconnected,
}
// Adapter
type Adapter interface {
Peers() []string
HasPeer(id string) (string, bool)
IsConnected(id string) bool
Subscribe() <-chan PoolEvent
Send(id string, data Envelope) error
}
// Embassy
type Embassy struct {
pool EmbassyPlugin
peers map[string]bool // peerID: isConnected
journals chan JournalEntry
eventSubs []chan PoolEvent
ctx context.Context
cancel context.CancelFunc
mu sync.RWMutex
wg sync.WaitGroup
logger *slog.Logger
}
// Hotel
type Hotel struct {
pool HotelPlugin
peers map[string]bool // peerID: isConnected
journals chan JournalEntry
eventSubs []chan PoolEvent
ctx context.Context
cancel context.CancelFunc
mu sync.RWMutex
wg sync.WaitGroup
logger *slog.Logger
}
// ----------------------------------------------------------------------------
// Embassy (Outbound Adapter)
// ----------------------------------------------------------------------------
func NewEmbassy(
ctx context.Context,
pool EmbassyPlugin,
collector *JournalCollector,
handler slog.Handler,
) *Embassy {
ctx, cancel := context.WithCancel(
component.MustNew(ctx, "prism", "embassy"))
e := &Embassy{
pool: pool,
peers: make(map[string]bool),
eventSubs: make([]chan PoolEvent, 0),
ctx: ctx,
cancel: cancel,
}
if collector != nil {
e.journals = make(chan JournalEntry, 16)
collector.Enroll(e.journals)
}
if handler != nil {
c := component.FromContext(ctx)
e.logger = slog.New(handler).With(slog.Any("component", c))
}
e.wg.Add(1)
go e.runEventRouter()
return e
}
func (e *Embassy) Dispatch(url string) error {
url, err := honeybee.NormalizeURL(url)
if err != nil {
return fmt.Errorf("invalid url: %s", url)
}
if err := e.pool.Connect(url); err != nil {
return fmt.Errorf("dispatch: %w", err)
}
e.mu.Lock()
e.peers[url] = false
subs := e.eventSubs
e.mu.Unlock()
at := time.Now()
if e.journals != nil {
c := component.FromContext(e.ctx)
select {
case <-e.ctx.Done():
return fmt.Errorf("closing")
case e.journals <- NewPeerAddedJournal(url, c, PeerAddedData{At: at}):
}
}
for _, ch := range subs {
select {
case <-e.ctx.Done():
return fmt.Errorf("closing")
case ch <- NewPoolEvent(url, EventAdded, at):
}
}
return nil
}
func (e *Embassy) Dismiss(url string) error {
url, err := honeybee.NormalizeURL(url)
if err != nil {
return fmt.Errorf("invalid url: %s", url)
}
if err := e.pool.Remove(url); err != nil {
return fmt.Errorf("dismiss: %w", err)
}
e.mu.Lock()
delete(e.peers, url)
subs := e.eventSubs
e.mu.Unlock()
at := time.Now()
if e.journals != nil {
c := component.FromContext(e.ctx)
select {
case <-e.ctx.Done():
return fmt.Errorf("closing")
case e.journals <- NewPeerRemovedJournal(url, c, PeerRemovedData{At: at}):
}
}
for _, ch := range subs {
select {
case <-e.ctx.Done():
return fmt.Errorf("closing")
case ch <- NewPoolEvent(url, EventRemoved, at):
}
}
return nil
}
func (e *Embassy) Close() {
e.mu.Lock()
peers := e.peers
e.mu.Unlock()
// dismiss peers
for peer, _ := range peers {
e.Dismiss(peer)
}
e.cancel()
e.wg.Wait()
e.mu.Lock()
// reset peers after dismissal
e.peers = make(map[string]bool)
subs := e.eventSubs
e.eventSubs = make([]chan PoolEvent, 0)
e.mu.Unlock()
// close subs
for _, sub := range subs {
close(sub)
}
if e.journals != nil {
close(e.journals)
}
}
func (e *Embassy) Peers() []string {
e.mu.RLock()
defer e.mu.RUnlock()
peers := make([]string, 0, len(e.peers))
for p, _ := range e.peers {
peers = append(peers, p)
}
return peers
}
func (e *Embassy) HasPeer(url string) (string, bool) {
url, err := honeybee.NormalizeURL(url)
if err != nil {
return "", false
}
e.mu.RLock()
defer e.mu.RUnlock()
_, ok := e.peers[url]
return url, ok
}
func (e *Embassy) IsConnected(url string) bool {
url, err := honeybee.NormalizeURL(url)
if err != nil {
return false
}
e.mu.RLock()
defer e.mu.RUnlock()
connected, _ := e.peers[url]
return connected
}
func (e *Embassy) Subscribe() <-chan PoolEvent {
e.mu.Lock()
defer e.mu.Unlock()
ch := make(chan PoolEvent, 16)
e.eventSubs = append(e.eventSubs, ch)
return ch
}
func (e *Embassy) Send(id string, data Envelope) error {
return nil
}
// Internal
func (e *Embassy) runEventRouter() {
defer e.wg.Done()
for {
select {
case <-e.ctx.Done():
return
case ev, ok := <-e.pool.Events:
if !ok {
return
}
url, err := honeybee.NormalizeURL(ev.ID)
if err != nil {
continue
}
if _, ok := e.HasPeer(url); !ok {
continue
}
kind := convertPoolEvent[ev.Kind]
e.mu.Lock()
switch kind {
case EventConnected:
e.peers[url] = true
case EventDisconnected:
e.peers[url] = false
}
subs := e.eventSubs
canJournal := e.journals != nil
e.mu.Unlock()
if canJournal {
switch kind {
case EventConnected:
c := component.FromContext(e.ctx)
select {
case <-e.ctx.Done():
case e.journals <- NewPeerConnectedJournal(
url, c, PeerConnectedData{At: ev.At}):
}
case EventDisconnected:
c := component.FromContext(e.ctx)
select {
case <-e.ctx.Done():
case e.journals <- NewPeerDisconnectedJournal(
url, c, PeerDisconnectedData{At: ev.At}):
}
}
}
for _, ch := range subs {
select {
case <-e.ctx.Done():
case ch <- NewPoolEvent(url, kind, ev.At):
}
}
}
}
}
// ----------------------------------------------------------------------------
// Hotel (Inbound Adapter)
// ----------------------------------------------------------------------------
func NewHotel() *Hotel {
return nil
}
func (h *Hotel) Welcome(id string, socket honeybee.Socket) error {
return nil
}
func (h *Hotel) WelcomeBack(id string, socket honeybee.Socket) error {
return nil
}
func (h *Hotel) Farewell(id string) error {
return nil
}
func (h *Hotel) Close() {}
func (h *Hotel) Peers() []string {
return nil
}
func (h *Hotel) HasPeer(id string) (string, bool) {
return "", false
}
func (h *Hotel) IsConnected(id string) bool {
return false
}
func (h *Hotel) Subscribe() <-chan PoolEvent {
return nil
}
func (h *Hotel) Send(id string, data Envelope) error {
return nil
}
-209
View File
@@ -1,209 +0,0 @@
package prism
import (
"context"
"errors"
"fmt"
"git.wisehodl.dev/jay/go-honeybee"
"git.wisehodl.dev/jay/go-mana-component"
"git.wisehodl.dev/jay/go-roots-ws"
"log/slog"
"sync"
"time"
)
// ----------------------------------------------------------------------------
// Errors
// ----------------------------------------------------------------------------
var (
ErrAlreadyStarted = errors.New("clerk already started")
ErrUnknownLabel = errors.New("unknown label")
)
// ----------------------------------------------------------------------------
// Types
// ----------------------------------------------------------------------------
// Letters
type InboundLetter struct {
ID string
Data Envelope
At time.Time
}
// Clerk
type Clerk struct {
inbox <-chan honeybee.InboxMessage
// wiring phase
mu sync.Mutex
started bool
pending []clerkSub
known map[string]struct{}
// runtime phase
routes clerkRoutes
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
logger *slog.Logger
}
type clerkSub struct {
ch chan InboundLetter
labels map[string]struct{}
}
type clerkRoutes = map[string][]chan InboundLetter
// ----------------------------------------------------------------------------
// Clerk
// ----------------------------------------------------------------------------
func NewClerk(
ctx context.Context,
inbox <-chan honeybee.InboxMessage,
knownLabels map[string]struct{},
handler slog.Handler,
) *Clerk {
ctx, cancel := context.WithCancel(
component.MustNew(ctx, "prism", "clerk"))
known := make(map[string]struct{}, len(knownLabels))
for label := range knownLabels {
known[label] = struct{}{}
}
c := &Clerk{
inbox: inbox,
known: known,
ctx: ctx,
cancel: cancel,
}
if handler != nil {
comp := component.FromContext(ctx)
c.logger = slog.New(handler).With(slog.Any("component", comp))
}
return c
}
func (c *Clerk) Subscribe(
labels map[string]struct{},
buffer int,
) (<-chan InboundLetter, error) {
c.mu.Lock()
defer c.mu.Unlock()
if c.started {
return nil, ErrAlreadyStarted
}
for label := range labels {
if _, ok := c.known[label]; !ok {
return nil, fmt.Errorf("%w: %s", ErrUnknownLabel, label)
}
}
subLabels := make(map[string]struct{}, len(labels))
for label := range labels {
subLabels[label] = struct{}{}
}
ch := make(chan InboundLetter, buffer)
c.pending = append(c.pending, clerkSub{ch: ch, labels: subLabels})
return ch, nil
}
func (c *Clerk) Start() error {
c.mu.Lock()
defer c.mu.Unlock()
if c.started {
return ErrAlreadyStarted
}
routes := make(clerkRoutes, len(c.known))
for _, sub := range c.pending {
for label := range sub.labels {
routes[label] = append(routes[label], sub.ch)
}
}
c.routes = routes
c.started = true
c.wg.Add(1)
go c.run()
return nil
}
func (c *Clerk) Close() {
c.cancel()
c.wg.Wait()
c.mu.Lock()
defer c.mu.Unlock()
for _, sub := range c.pending {
close(sub.ch)
}
// prevent double channel closes if Close() is called twice
c.pending = nil
}
func (c *Clerk) run() {
defer c.wg.Done()
for {
select {
case <-c.ctx.Done():
return
case msg, ok := <-c.inbox:
if !ok {
// inbox closed externally, close clerk
c.cancel()
return
}
labelBytes, err := envelope.GetLabel(msg.Data)
if err != nil {
if c.logger != nil {
c.logger.Warn("invalid envelope",
"peer_id", msg.ID,
"received_at", msg.ReceivedAt,
)
}
continue
}
subs, ok := c.routes[string(labelBytes)]
if !ok {
continue
}
letter := InboundLetter{
ID: msg.ID,
Data: msg.Data,
At: msg.ReceivedAt,
}
for _, ch := range subs {
select {
case ch <- letter:
case <-c.ctx.Done():
return
}
}
}
}
}
-171
View File
@@ -1,171 +0,0 @@
package prism
import (
"context"
"git.wisehodl.dev/jay/go-honeybee"
"github.com/stretchr/testify/assert"
"testing"
"time"
)
// ----------------------------------------------------------------------------
// Helpers
// ----------------------------------------------------------------------------
func mockInbox() (chan honeybee.InboxMessage, func(label string)) {
ch := make(chan honeybee.InboxMessage, 8)
inject := func(label string) {
ch <- honeybee.InboxMessage{
ID: "wss://test",
Data: []byte(`["` + label + `","payload"]`),
ReceivedAt: time.Now(),
}
}
return ch, inject
}
func makeClerk(inbox chan honeybee.InboxMessage) *Clerk {
known := map[string]struct{}{
"EVENT": {},
"EOSE": {},
"CLOSE": {},
}
return NewClerk(context.Background(), inbox, known, nil)
}
// ----------------------------------------------------------------------------
// Tests
// ----------------------------------------------------------------------------
func TestClerkRouting(t *testing.T) {
inbox, inject := mockInbox()
c := makeClerk(inbox)
subA, err := c.Subscribe(map[string]struct{}{"EVENT": {}}, 4)
assert.NoError(t, err)
subB, err := c.Subscribe(map[string]struct{}{"EVENT": {}, "EOSE": {}}, 4)
assert.NoError(t, err)
assert.NoError(t, c.Start())
inject("EVENT")
inject("EOSE")
// A receives exactly one letter (EVENT only)
Eventually(t, func() bool {
select {
case l := <-subA:
return string(l.Data) == `["EVENT","payload"]`
default:
return false
}
}, "subA should receive the EVENT letter")
Never(t, func() bool {
select {
case <-subA:
return true
default:
return false
}
}, "subA should receive no further letters")
// B receives two letters (EVENT and EOSE)
count := 0
Eventually(t, func() bool {
select {
case <-subB:
count++
default:
}
return count == 2
}, "subB should receive both letters")
}
func TestClerkStartup(t *testing.T) {
inbox, _ := mockInbox()
c := makeClerk(inbox)
assert.NoError(t, c.Start())
_, err := c.Subscribe(map[string]struct{}{"EVENT": {}}, 4)
assert.ErrorIs(t, err, ErrAlreadyStarted)
c.Close()
}
func TestClerkUnknownSubscriptionLabel(t *testing.T) {
inbox, _ := mockInbox()
c := makeClerk(inbox)
_, err := c.Subscribe(map[string]struct{}{"UNKNOWN": {}}, 4)
assert.ErrorIs(t, err, ErrUnknownLabel)
}
func TestClerkUnknownInboxLabel(t *testing.T) {
inbox, inject := mockInbox()
c := makeClerk(inbox)
// subscribe to every known label
sub, err := c.Subscribe(
map[string]struct{}{"EVENT": {}, "EOSE": {}, "CLOSE": {}}, 4)
assert.NoError(t, err)
assert.NoError(t, c.Start())
// inject a valid nostr label, but is not in the test label set
inject("NOTICE")
Never(t, func() bool {
select {
case <-sub:
return true
default:
return false
}
}, "no subscriber should receive an unknown label")
}
func TestClerkInboxClose(t *testing.T) {
inbox, _ := mockInbox()
c := makeClerk(inbox)
sub, err := c.Subscribe(map[string]struct{}{"EVENT": {}}, 4)
assert.NoError(t, err)
assert.NoError(t, c.Start())
// close the inbox as the pool would on shutdown
close(inbox)
// internal waitgroup should clear
Eventually(t, func() bool {
c.wg.Wait()
return true
}, "wg should clear")
// subscriptions remain open. Close() must be called to completely shut down
Never(t, func() bool {
_, ok := <-sub
return !ok
}, "sub should remain open")
}
func TestClerkClose(t *testing.T) {
inbox, _ := mockInbox()
c := makeClerk(inbox)
subA, err := c.Subscribe(map[string]struct{}{"EVENT": {}}, 4)
assert.NoError(t, err)
subB, err := c.Subscribe(map[string]struct{}{"EOSE": {}}, 4)
assert.NoError(t, err)
assert.NoError(t, c.Start())
c.Close()
Eventually(t, func() bool {
_, okA := <-subA
_, okB := <-subB
return !okA && !okB
}, "all subscriber channels should be closed after Close()")
}
-251
View File
@@ -1,251 +0,0 @@
package prism
import (
"context"
"fmt"
"git.wisehodl.dev/jay/go-mana-component"
"github.com/stretchr/testify/assert"
"sync/atomic"
"testing"
"time"
)
// Helpers
func newTestLetter(ctx context.Context, id uint64) OutboundLetter {
ctx, cancel := context.WithCancel(
component.MustExtend(ctx, "test_letter"))
return OutboundLetter{
id: id,
peerID: "wss://test",
data: []byte("[]"),
ctx: ctx,
cancel: cancel,
}
}
// Tests
func TestCourierSendsAfterConnect(t *testing.T) {
ctx := component.MustNew(context.Background(), "prism", "test")
var sendCount atomic.Uint32
sendFunc := func(data Envelope) error {
sendCount.Add(1)
return nil
}
c := NewCourier(ctx, sendFunc, nil)
called := make(chan LetterOutcome, 1)
c.Enqueue(newTestLetter(ctx, 1), func(o LetterOutcome) { called <- o })
Never(t, func() bool { return sendCount.Load() > 0 },
"should not have sent while disconnected")
c.HandleConnect()
Eventually(t, func() bool { return sendCount.Load() > 0 },
"should have sent after connect")
var outcome LetterOutcome
Eventually(t, func() bool {
select {
default:
return false
case outcome = <-called:
return true
}
}, "should have returned outcome")
assert.Equal(t, uint64(1), outcome.LetterID)
assert.Equal(t, "wss://test", outcome.PeerID)
assert.Equal(t, OutcomeSent, outcome.Kind)
assert.False(t, outcome.SentAt.IsZero())
assert.True(t, outcome.MissedAt.IsZero())
assert.Equal(t, 0, outcome.Retries)
}
func TestCourierMultipleSends(t *testing.T) {
ctx := component.MustNew(context.Background(), "prism", "test")
var sendCount atomic.Uint32
sendFunc := func(data Envelope) error {
sendCount.Add(1)
return nil
}
c := NewCourier(ctx, sendFunc, nil)
c.HandleConnect()
outcomes := make([]LetterOutcome, 0, 2)
called := make(chan LetterOutcome, 2)
c.Enqueue(newTestLetter(ctx, 1), func(o LetterOutcome) { called <- o })
c.Enqueue(newTestLetter(ctx, 2), func(o LetterOutcome) { called <- o })
Eventually(t, func() bool { return sendCount.Load() == 2 },
"should have sent letters")
Eventually(t, func() bool {
select {
default:
return false
case o := <-called:
outcomes = append(outcomes, o)
return len(outcomes) == 2
}
}, "should have returned 2 outcomes")
// callbacks are called in goroutines and may arrive out of order
assert.Equal(t, OutcomeSent, outcomes[0].Kind)
assert.Equal(t, OutcomeSent, outcomes[1].Kind)
}
func TestCourierSkipsCancelledLetter(t *testing.T) {
ctx := component.MustNew(context.Background(), "prism", "test")
var sendCount atomic.Uint32
sendFunc := func(data Envelope) error {
sendCount.Add(1)
return nil
}
c := NewCourier(ctx, sendFunc, nil)
c.HandleConnect()
l := newTestLetter(ctx, 1)
l.cancel()
called := make(chan LetterOutcome, 1)
c.Enqueue(l, func(o LetterOutcome) { called <- o })
var outcome LetterOutcome
Eventually(t, func() bool {
select {
default:
return false
case outcome = <-called:
return true
}
}, "should have returned outcome")
assert.Equal(t, OutcomeCancelled, outcome.Kind)
}
func TestCourierRetryOnFailure(t *testing.T) {
ctx := component.MustNew(context.Background(), "prism", "test")
var sendCount atomic.Uint32
sendFunc := func(data Envelope) error {
sendCount.Add(1)
if sendCount.Load() < 3 {
return fmt.Errorf("transient failure")
}
return nil
}
c := NewCourier(ctx, sendFunc, nil)
c.HandleConnect()
called := make(chan LetterOutcome, 1)
c.Enqueue(newTestLetter(ctx, 1), func(o LetterOutcome) { called <- o })
Eventually(t, func() bool { return sendCount.Load() > 0 },
"should send eventually")
var outcome LetterOutcome
Eventually(t, func() bool {
select {
default:
return false
case outcome = <-called:
return true
}
}, "should have returned outcome")
assert.Equal(t, OutcomeSent, outcome.Kind)
assert.Equal(t, 2, outcome.Retries)
}
func TestCourierPauseOnDisconnect(t *testing.T) {
ctx := component.MustNew(context.Background(), "prism", "test")
var sendCount atomic.Uint32
var gate atomic.Bool
gate.Store(false)
sendFunc := func(data Envelope) error {
// gated send
if gate.Load() {
sendCount.Add(1)
return nil
}
return fmt.Errorf("gate is closed")
}
c := NewCourier(ctx, sendFunc, nil)
c.HandleConnect()
// queue a letter
called := make(chan LetterOutcome, 1)
c.Enqueue(newTestLetter(ctx, 1), func(o LetterOutcome) { called <- o })
// manually wait for letters to queue
time.Sleep(100 * time.Millisecond)
// manually wait for disconnect toggle
c.HandleDisconnect()
time.Sleep(100 * time.Millisecond)
// open gate
gate.Store(true)
// should never have sent in this time
Never(t, func() bool { return sendCount.Load() > 0 },
"should not have sent while disconnected")
// reconnect, gate is open, letter should send
c.HandleConnect()
Eventually(t, func() bool { return sendCount.Load() > 0 },
"should have sent")
}
func TestCourierDrainOnClose(t *testing.T) {
ctx := component.MustNew(context.Background(), "prism", "test")
var sendCount atomic.Uint32
sendFunc := func(data Envelope) error {
sendCount.Add(1)
return nil
}
c := NewCourier(ctx, sendFunc, nil)
// do not connect, queue some letters
outcomes := make([]LetterOutcome, 0, 2)
called := make(chan LetterOutcome, 4)
c.Enqueue(newTestLetter(ctx, 1), func(o LetterOutcome) { called <- o })
c.Enqueue(newTestLetter(ctx, 2), func(o LetterOutcome) { called <- o })
// should not send any letters
Never(t, func() bool { return sendCount.Load() > 0 },
"should not have sent letters")
// close the courier
c.Close()
// expect each letter to return cancelled
Eventually(t, func() bool {
select {
default:
return false
case o := <-called:
outcomes = append(outcomes, o)
return len(outcomes) == 2
}
}, "should have returned 2 outcomes")
if len(outcomes) >= 2 {
assert.Equal(t, OutcomeCancelled, outcomes[0].Kind)
assert.Equal(t, OutcomeCancelled, outcomes[1].Kind)
}
}
+467
View File
@@ -0,0 +1,467 @@
package prism
import (
"context"
"fmt"
"git.wisehodl.dev/jay/go-honeybee"
"git.wisehodl.dev/jay/go-mana-component"
"git.wisehodl.dev/jay/go-roots-ws"
"log/slog"
"sync"
"time"
)
// ----------------------------------------------------------------------------
// Types
// ----------------------------------------------------------------------------
type EmbassyPlugin struct {
Connect func(url string) error
Remove func(url string) error
Send func(url string, data []byte) error
Events <-chan honeybee.OutboundPoolEvent
Inbox <-chan honeybee.InboxMessage
}
type EmbassyEventKind int
const (
EventConnected EmbassyEventKind = iota
EventDisconnected
EventEmbassyUnknown
)
func mapEmbassyEvent(kind honeybee.OutboundPoolEventKind) EmbassyEventKind {
switch kind {
case honeybee.OutboundEventConnected:
return EventConnected
case honeybee.OutboundEventDisconnected:
return EventDisconnected
default:
return EventEmbassyUnknown
}
}
type OutboundPoolEvent struct {
ID string
Kind EmbassyEventKind
At time.Time
}
type InboxMessage struct {
ID string
Data []byte
ReceivedAt time.Time
}
type Embassy struct {
pool EmbassyPlugin
envoys map[string]*Envoy
eventSubs map[string]chan<- OutboundPoolEvent
inboxSubs map[string]chan<- InboxMessage
ctx context.Context
cancel context.CancelFunc
mu sync.RWMutex
wg sync.WaitGroup
handler slog.Handler
logger *slog.Logger
}
type Envoy struct {
url string
connected bool
terminate func()
queue chan []byte
send func(data []byte) error
events <-chan OutboundPoolEvent
inbox <-chan InboxMessage
eventSubs []chan<- OutboundPoolEvent
inboxSubs map[string][]chan<- InboxMessage
ctx context.Context
cancel context.CancelFunc
mu sync.RWMutex
wg sync.WaitGroup
handler slog.Handler
logger *slog.Logger
}
// ----------------------------------------------------------------------------
// Embassy
// ----------------------------------------------------------------------------
func NewEmbassy(
ctx context.Context,
pool EmbassyPlugin,
handler slog.Handler,
) *Embassy {
ctx, cancel := context.WithCancel(component.MustNew(ctx, "prism", "embassy"))
e := &Embassy{
pool: pool,
envoys: make(map[string]*Envoy),
eventSubs: make(map[string]chan<- OutboundPoolEvent),
inboxSubs: make(map[string]chan<- InboxMessage),
ctx: ctx,
cancel: cancel,
}
if handler != nil {
comp := component.FromContext(ctx)
e.handler = handler
e.logger = slog.New(handler).With(slog.Any("component", comp))
}
e.wg.Add(2)
go e.routeEvents()
go e.routeInbox()
return e
}
func (e *Embassy) Dispatch(url string) error {
url, err := honeybee.NormalizeURL(url)
if err != nil {
return fmt.Errorf("invalid url: %w", err)
}
e.mu.RLock()
_, exists := e.envoys[url]
if exists {
e.mu.RUnlock()
return fmt.Errorf("already dispatched: %s", url)
}
e.mu.RUnlock()
e.mu.Lock()
e.pool.Connect(url)
terminate := func() { e.dismiss(url) }
send := func(data []byte) error { return e.send(url, data) }
events := e.subscribeEventsLock(url)
inbox := e.subscribeInboxLock(url)
e.envoys[url] = newEnvoy(e.ctx, url, terminate, send, events, inbox, e.handler)
e.mu.Unlock()
return nil
}
func (e *Embassy) Envoys() []string {
e.mu.RLock()
defer e.mu.RUnlock()
var envoys []string
for url := range e.envoys {
envoys = append(envoys, url)
}
return envoys
}
func (e *Embassy) Call(url string) *Envoy {
url, err := honeybee.NormalizeURL(url)
if err != nil {
return nil
}
e.mu.RLock()
defer e.mu.RUnlock()
envoy, ok := e.envoys[url]
if !ok {
return nil
}
return envoy
}
func (e *Embassy) Close() {
e.cancel()
e.wg.Wait()
}
func (e *Embassy) dismiss(url string) {
e.mu.Lock()
defer e.mu.Unlock()
e.pool.Remove(url)
e.unsubscribeEventsLock(url)
e.unsubscribeInboxLock(url)
delete(e.envoys, url)
}
func (e *Embassy) send(url string, data []byte) error {
return e.pool.Send(url, data)
}
func (e *Embassy) subscribeEventsLock(url string) <-chan OutboundPoolEvent {
ch := make(chan OutboundPoolEvent)
e.eventSubs[url] = ch
return ch
}
func (e *Embassy) unsubscribeEventsLock(url string) {
ch, ok := e.eventSubs[url]
if !ok {
return
}
close(ch)
delete(e.eventSubs, url)
}
func (e *Embassy) subscribeInboxLock(url string) <-chan InboxMessage {
ch := make(chan InboxMessage)
e.inboxSubs[url] = ch
return ch
}
func (e *Embassy) unsubscribeInboxLock(url string) {
ch, ok := e.inboxSubs[url]
if !ok {
return
}
close(ch)
delete(e.inboxSubs, url)
}
func (e *Embassy) routeEvents() {
defer e.wg.Done()
for {
select {
case <-e.ctx.Done():
return
case ev, ok := <-e.pool.Events:
if !ok {
return
}
url, err := honeybee.NormalizeURL(ev.ID)
if err != nil {
continue
}
e.mu.RLock()
sub, ok := e.eventSubs[url]
e.mu.RUnlock()
if !ok {
continue
}
select {
case <-e.ctx.Done():
return
case sub <- OutboundPoolEvent{
ID: ev.ID, Kind: mapEmbassyEvent(ev.Kind), At: ev.At,
}:
}
}
}
}
func (e *Embassy) routeInbox() {
defer e.wg.Done()
for {
select {
case <-e.ctx.Done():
return
case ev, ok := <-e.pool.Inbox:
if !ok {
return
}
url, err := honeybee.NormalizeURL(ev.ID)
if err != nil {
continue
}
e.mu.RLock()
sub, ok := e.inboxSubs[url]
e.mu.RUnlock()
if !ok {
continue
}
select {
case <-e.ctx.Done():
return
case sub <- InboxMessage{
ID: ev.ID, Data: ev.Data, ReceivedAt: ev.ReceivedAt}:
}
}
}
}
// ----------------------------------------------------------------------------
// Envoy
// ----------------------------------------------------------------------------
func newEnvoy(
ctx context.Context,
url string,
terminate func(),
send func(data []byte) error,
events <-chan OutboundPoolEvent,
inbox <-chan InboxMessage,
handler slog.Handler,
) *Envoy {
ctx, cancel := context.WithCancel(component.MustExtend(ctx, "envoy"))
e := &Envoy{
url: url,
terminate: terminate,
queue: make(chan []byte),
send: send,
events: events,
inbox: inbox,
inboxSubs: make(map[string][]chan<- InboxMessage),
ctx: ctx,
cancel: cancel,
}
if handler != nil {
comp := component.FromContext(ctx)
e.handler = handler
e.logger = slog.New(handler).With(slog.Any("component", comp)).With("peer", url)
}
e.wg.Add(2)
go e.publishEvents()
go e.routeInbox()
return e
}
func (e *Envoy) IsConnected() bool {
e.mu.RLock()
defer e.mu.RUnlock()
return e.connected
}
func (e *Envoy) Context() context.Context {
return e.ctx
}
func (e *Envoy) Handler() slog.Handler {
return e.handler
}
func (e *Envoy) Dismiss() {
e.terminate()
e.cancel()
e.wg.Wait()
e.mu.Lock()
defer e.mu.Unlock()
for _, sub := range e.eventSubs {
close(sub)
}
for _, subs := range e.inboxSubs {
for _, sub := range subs {
close(sub)
}
}
e.eventSubs = nil
e.inboxSubs = make(map[string][]chan<- InboxMessage)
}
func (e *Envoy) Send(data []byte) error {
return e.send(data)
}
func (e *Envoy) SubscribeEvents() <-chan OutboundPoolEvent {
e.mu.Lock()
defer e.mu.Unlock()
ch := make(chan OutboundPoolEvent)
e.eventSubs = append(e.eventSubs, ch)
return ch
}
func (e *Envoy) SubscribeInbox(labels []string) <-chan InboxMessage {
e.mu.Lock()
defer e.mu.Unlock()
ch := make(chan InboxMessage)
for _, label := range labels {
if _, ok := e.inboxSubs[label]; !ok {
e.inboxSubs[label] = make([]chan<- InboxMessage, 0)
}
e.inboxSubs[label] = append(e.inboxSubs[label], ch)
}
return ch
}
func (e *Envoy) publishEvents() {
defer e.wg.Done()
for {
select {
case <-e.ctx.Done():
return
case ev, ok := <-e.events:
if !ok {
return
}
e.mu.Lock()
switch ev.Kind {
case EventConnected:
e.connected = true
case EventDisconnected:
e.connected = false
}
subs := e.eventSubs
e.mu.Unlock()
for _, ch := range subs {
select {
case <-e.ctx.Done():
return
case ch <- ev:
}
}
}
}
}
func (e *Envoy) routeInbox() {
defer e.wg.Done()
for {
select {
case <-e.ctx.Done():
return
case msg, ok := <-e.inbox:
if !ok {
return
}
label, err := envelope.GetLabel(msg.Data)
if err != nil {
continue
}
e.mu.RLock()
subs, ok := e.inboxSubs[string(label)]
e.mu.RUnlock()
if !ok {
continue
}
for _, ch := range subs {
select {
case <-e.ctx.Done():
return
case ch <- msg:
}
}
}
}
}
+105 -320
View File
@@ -3,348 +3,133 @@ package prism
import (
"context"
"git.wisehodl.dev/jay/go-honeybee"
"git.wisehodl.dev/jay/go-roots-ws"
"github.com/stretchr/testify/assert"
"sync/atomic"
"testing"
"time"
)
func TestEmbassyPoolEvents(t *testing.T) {
func TestEmbassy_TEMPLATE(t *testing.T) {
ctx := context.Background()
eventsCh := make(chan honeybee.OutboundPoolEvent)
url := "wss://test"
connect := func(url string) error { return nil }
remove := func(url string) error { return nil }
sent := false
_ = sent
send := func(url string, data []byte) error {
sent = true
return nil
}
events := make(chan honeybee.OutboundPoolEvent)
inbox := make(chan honeybee.InboxMessage)
pool := EmbassyPlugin{
Connect: func(id string) error { return nil },
Remove: func(id string) error { return nil },
Send: func(id string, data []byte) error { return nil },
Events: eventsCh,
Connect: connect,
Remove: remove,
Send: send,
Events: events,
Inbox: inbox,
}
e := NewEmbassy(ctx, pool, nil, nil)
sub := e.Subscribe()
t.Run("added then removed", func(t *testing.T) {
err := e.Dispatch("wss://test")
assert.NoError(t, err)
Eventually(t, func() bool {
select {
default:
return false
case ev := <-sub:
return ev.Kind == EventAdded
}
}, "expected added event")
err = e.Dismiss("wss://test")
assert.NoError(t, err)
Eventually(t, func() bool {
select {
default:
return false
case ev := <-sub:
return ev.Kind == EventRemoved
}
}, "expected removed event")
})
t.Run("connected", func(t *testing.T) {
e.Dispatch("wss://test")
eventsCh <- honeybee.OutboundPoolEvent{
ID: "wss://test",
Kind: honeybee.OutboundEventConnected,
At: time.Now(),
}
Eventually(t, func() bool {
select {
default:
return false
case ev := <-sub:
return ev.Kind == EventConnected
}
}, "expected connected event")
})
t.Run("disconnected", func(t *testing.T) {
e.Dispatch("wss://test")
eventsCh <- honeybee.OutboundPoolEvent{
ID: "wss://test",
Kind: honeybee.OutboundEventDisconnected,
At: time.Now(),
}
Eventually(t, func() bool {
select {
default:
return false
case ev := <-sub:
return ev.Kind == EventDisconnected
}
}, "expected disconnected event")
})
embassy := NewEmbassy(ctx, pool, nil)
embassy.Dispatch(url)
envoy := embassy.Call(url)
assert.NotNil(t, envoy)
}
func TestEmbassyPeerRegistry(t *testing.T) {
func TestEmbassy_Dispatch(t *testing.T) {
ctx := context.Background()
eventsCh := make(chan honeybee.OutboundPoolEvent)
url := "wss://test"
connectCalled := make(chan struct{})
removeCalled := make(chan struct{})
connect := func(url string) error { close(connectCalled); return nil }
remove := func(url string) error { close(removeCalled); return nil }
sent := false
send := func(url string, data []byte) error {
sent = true
return nil
}
events := make(chan honeybee.OutboundPoolEvent)
inbox := make(chan honeybee.InboxMessage)
pool := EmbassyPlugin{
Connect: func(id string) error { return nil },
Remove: func(id string) error { return nil },
Send: func(id string, data []byte) error { return nil },
Events: eventsCh,
Connect: connect,
Remove: remove,
Send: send,
Events: events,
Inbox: inbox,
}
e := NewEmbassy(ctx, pool, nil, nil)
embassy := NewEmbassy(ctx, pool, nil)
embassy.Dispatch(url)
envoy := embassy.Call(url)
assert.NotNil(t, envoy)
// add
e.Dispatch("wss://test")
url, ok := e.HasPeer("wss://test/")
assert.Equal(t, "wss://test", url)
assert.True(t, ok)
assert.False(t, e.IsConnected("wss://test"))
// connect
eventsCh <- honeybee.OutboundPoolEvent{
ID: "wss://test",
Kind: honeybee.OutboundEventConnected,
At: time.Now(),
}
Eventually(t, func() bool {
_, exists := e.HasPeer("wss://test")
connected := e.IsConnected("wss://test")
return exists && connected
}, "expected: exists, connected")
// disconnect
eventsCh <- honeybee.OutboundPoolEvent{
ID: "wss://test",
Kind: honeybee.OutboundEventDisconnected,
At: time.Now(),
}
Eventually(t, func() bool {
_, exists := e.HasPeer("wss://test")
connected := e.IsConnected("wss://test")
return exists && !connected
}, "expected: exists, disconnected")
// remove
e.Dismiss("wss://test")
_, ok = e.HasPeer("wss://test")
_, ok := <-connectCalled
assert.False(t, ok)
assert.False(t, e.IsConnected("wss://test"))
}
func TestEmbassyPeers(t *testing.T) {
ctx := context.Background()
eventSub := envoy.SubscribeEvents()
inboxSub := envoy.SubscribeInbox([]string{"EVENT"})
pool := EmbassyPlugin{
Connect: func(id string) error { return nil },
Remove: func(id string) error { return nil },
Send: func(id string, data []byte) error { return nil },
Events: nil,
}
e := NewEmbassy(ctx, pool, nil, nil)
assert.Len(t, e.Peers(), 0)
e.Dispatch("wss://test1")
e.Dispatch("wss://test2")
assert.Len(t, e.Peers(), 2)
e.Dismiss("wss://test2")
assert.Len(t, e.Peers(), 1)
}
func TestEmbassySubFanout(t *testing.T) {
ctx := context.Background()
eventsCh := make(chan honeybee.OutboundPoolEvent)
pool := EmbassyPlugin{
Connect: func(id string) error { return nil },
Remove: func(id string) error { return nil },
Send: func(id string, data []byte) error { return nil },
Events: eventsCh,
}
e := NewEmbassy(ctx, pool, nil, nil)
sub1 := e.Subscribe()
sub2 := e.Subscribe()
e.Dispatch("wss://test")
Eventually(t, func() bool {
select {
default:
return false
case ev := <-sub1:
return ev.Kind == EventAdded
}
}, "expected added event on sub1")
Eventually(t, func() bool {
select {
default:
return false
case ev := <-sub2:
return ev.Kind == EventAdded
}
}, "expected added event on sub2")
}
func TestEmbassyClose(t *testing.T) {
ctx := context.Background()
eventsCh := make(chan honeybee.OutboundPoolEvent, 1)
pool := EmbassyPlugin{
Connect: func(id string) error { return nil },
Remove: func(id string) error { return nil },
Send: func(id string, data []byte) error { return nil },
Events: eventsCh,
}
e := NewEmbassy(ctx, pool, nil, nil)
sub1 := e.Subscribe()
sub2 := e.Subscribe()
e.Dispatch("wss://test")
e.Close()
// peer gets removed
Eventually(t, func() bool {
select {
default:
return false
case ev := <-sub1:
return ev.ID == "wss://test" && ev.Kind == EventRemoved
}
}, "expected peer removed")
Eventually(t, func() bool {
select {
default:
return false
case ev := <-sub2:
return ev.ID == "wss://test" && ev.Kind == EventRemoved
}
}, "expected peer removed")
// peer list is empty
_, ok := e.HasPeer("wss://test")
assert.False(t, ok)
assert.Len(t, e.Peers(), 0)
// subs close
Eventually(t, func() bool {
_, ok1 := <-sub1
_, ok2 := <-sub2
return !ok1 && !ok2
}, "subs should close")
}
func TestEmbassyJournals(t *testing.T) {
ctx := context.Background()
jc := NewJournalCollector()
eventsCh := make(chan honeybee.OutboundPoolEvent, 1)
pool := EmbassyPlugin{
Connect: func(id string) error { return nil },
Remove: func(id string) error { return nil },
Send: func(id string, data []byte) error { return nil },
Events: eventsCh,
}
e := NewEmbassy(ctx, pool, jc, nil)
out := jc.Out()
peer := "wss://test"
// added
e.Dispatch(peer)
Eventually(t, func() bool {
select {
case entry := <-out:
_, ok := entry.(PeerAddedJournal)
return ok
default:
return false
}
}, "expected PeerAddedJournal")
// connected
eventsCh <- honeybee.OutboundPoolEvent{
ID: peer,
Kind: honeybee.OutboundEventConnected,
At: time.Now(),
}
Eventually(t, func() bool {
select {
case entry := <-out:
e, ok := entry.(PeerConnectedJournal)
// ensure fields are correct
peerOk := e.PeerID() == "wss://test"
modOk := e.Component().Module() == "prism"
pathOk := e.Component().PathString() == "embassy"
return ok && peerOk && modOk && pathOk
default:
return false
}
}, "expected PeerConnectedJournal")
// disconnected
eventsCh <- honeybee.OutboundPoolEvent{
ID: peer,
Kind: honeybee.OutboundEventDisconnected,
At: time.Now(),
}
Eventually(t, func() bool {
select {
case entry := <-out:
_, ok := entry.(PeerDisconnectedJournal)
return ok
default:
return false
}
}, "expected PeerDisconnectedJournal")
// removed
e.Dismiss(peer)
Eventually(t, func() bool {
select {
case entry := <-out:
_, ok := entry.(PeerRemovedJournal)
return ok
default:
return false
}
}, "expected PeerRemovedJournal")
// close embassy: closes journal channel
e.Close()
// Ensure jc can close now that embassy has closed its journal channel
jcClosed := make(chan struct{})
gotEvent := atomic.Int64{}
gotInbox := atomic.Int64{}
eventDone := make(chan struct{})
inboxDone := make(chan struct{})
go func() {
jc.Close()
close(jcClosed)
for range eventSub {
gotEvent.Add(1)
}
close(eventDone)
}()
go func() {
for range inboxSub {
gotInbox.Add(1)
}
close(inboxDone)
}()
Eventually(t, func() bool {
select {
case <-jcClosed:
return true
default:
return false
}
}, "JournalCollector.Close() should return after Embassy.Close()")
events <- honeybee.OutboundPoolEvent{
ID: url, Kind: honeybee.OutboundEventConnected, At: time.Now()}
events <- honeybee.OutboundPoolEvent{
ID: "wss://other", Kind: honeybee.OutboundEventConnected, At: time.Now()}
inbox <- honeybee.InboxMessage{
ID: url,
Data: envelope.EncloseEvent([]byte("{}")),
ReceivedAt: time.Now(),
}
inbox <- honeybee.InboxMessage{
ID: "wss://other",
Data: envelope.EncloseEvent([]byte("{}")),
ReceivedAt: time.Now(),
}
Eventually(t, func() bool { return gotEvent.Load() > 0 },
"should have gotten event")
Eventually(t, func() bool { return gotInbox.Load() > 0 },
"should have gotten inbox message")
Eventually(t, func() bool { return envoy.IsConnected() },
"state should have toggled")
Never(t, func() bool { return gotEvent.Load() > 1 },
"should have only gotten one event")
Never(t, func() bool { return gotInbox.Load() > 1 },
"should have only gotten one inbox message")
envoy.Send([]byte("hello"))
assert.True(t, sent)
envoy.Dismiss()
_, ok = <-removeCalled
assert.False(t, ok)
_, ok = <-eventDone
assert.False(t, ok)
_, ok = <-inboxDone
assert.False(t, ok)
// envoy no longer in embassy
envoy = embassy.Call(url)
assert.Nil(t, envoy)
}
+151
View File
@@ -0,0 +1,151 @@
package prism
import (
"context"
"git.wisehodl.dev/jay/go-honeybee"
"git.wisehodl.dev/jay/go-mana-component"
"git.wisehodl.dev/jay/go-roots-ws"
"github.com/stretchr/testify/assert"
"sync"
"testing"
"time"
)
func TestEnvoy_Dismiss(t *testing.T) {
ctx := component.MustNew(context.Background(), "prism", "test")
mu := sync.RWMutex{}
url := "wss://test"
terminated := false
terminate := func() {
mu.Lock()
defer mu.Unlock()
terminated = true
}
envoy := newEnvoy(ctx, url, terminate, nil, nil, nil, nil)
envoy.Dismiss()
mu.RLock()
defer mu.RUnlock()
assert.True(t, terminated)
}
func TestEnvoy_Send(t *testing.T) {
ctx := component.MustNew(context.Background(), "prism", "test")
url := "wss://test"
var sent bool
send := func(data []byte) error {
sent = true
return nil
}
envoy := newEnvoy(ctx, url, nil, send, nil, nil, nil)
envoy.Send([]byte("hello"))
Eventually(t, func() bool {
return sent
}, "should have sent")
}
func TestEnvoy_IsConnected(t *testing.T) {
ctx := component.MustNew(context.Background(), "prism", "test")
mu := sync.RWMutex{}
url := "wss://test"
events := make(chan honeybee.OutboundPoolEvent)
envoy := newEnvoy(ctx, url, nil, nil, events, nil, nil)
eventSub := envoy.SubscribeEvents()
gotEvents := []honeybee.OutboundPoolEvent{}
go func() {
for ev := range eventSub {
mu.Lock()
gotEvents = append(gotEvents, ev)
mu.Unlock()
}
}()
events <- honeybee.OutboundPoolEvent{
ID: url, Kind: honeybee.OutboundEventConnected, At: time.Now()}
Eventually(t, func() bool {
return envoy.IsConnected()
}, "state should have toggled")
}
func TestEnvoy_Events(t *testing.T) {
ctx := component.MustNew(context.Background(), "prism", "test")
mu := sync.RWMutex{}
url := "wss://test"
events := make(chan honeybee.OutboundPoolEvent)
envoy := newEnvoy(ctx, url, nil, nil, events, nil, nil)
eventSub := envoy.SubscribeEvents()
gotEvents := []honeybee.OutboundPoolEvent{}
go func() {
for ev := range eventSub {
mu.Lock()
gotEvents = append(gotEvents, ev)
mu.Unlock()
}
}()
events <- honeybee.OutboundPoolEvent{
ID: url, Kind: honeybee.OutboundEventConnected, At: time.Now()}
Eventually(t, func() bool {
mu.RLock()
defer mu.RUnlock()
return len(gotEvents) > 0
}, "should have gotten event")
mu.RLock()
assert.Equal(t, honeybee.OutboundEventConnected, gotEvents[0].Kind)
mu.RUnlock()
}
func TestEnvoy_Inbox(t *testing.T) {
ctx := component.MustNew(context.Background(), "prism", "test")
mu := sync.RWMutex{}
url := "wss://test"
inbox := make(chan honeybee.InboxMessage)
envoy := newEnvoy(ctx, url, nil, nil, nil, inbox, nil)
inboxSub := envoy.SubscribeInbox([]string{"EVENT"})
gotInbox := []honeybee.InboxMessage{}
go func() {
for ev := range inboxSub {
mu.Lock()
gotInbox = append(gotInbox, ev)
mu.Unlock()
}
}()
inbox <- honeybee.InboxMessage{
ID: url,
Data: envelope.EncloseEvent([]byte("{}")),
ReceivedAt: time.Now(),
}
inbox <- honeybee.InboxMessage{
ID: url,
Data: envelope.EncloseOK("id", true, "ok"),
ReceivedAt: time.Now(),
}
Eventually(t, func() bool {
mu.RLock()
defer mu.RUnlock()
return len(gotInbox) > 0
}, "should have gotten inbox message")
// should only receive the EVENT message
assert.Len(t, gotInbox, 1)
mu.RLock()
data, err := envelope.FindEvent(gotInbox[0].Data)
mu.RUnlock()
assert.NoError(t, err)
assert.Equal(t, "{}", string(data))
}
-327
View File
@@ -1,327 +0,0 @@
package prism
import (
"fmt"
"git.wisehodl.dev/jay/go-mana-component"
"sync"
"time"
)
// ----------------------------------------------------------------------------
// Types
// ----------------------------------------------------------------------------
// JournalCollector
type JournalCollector struct {
out chan JournalEntry
buffer chan JournalEntry
mu sync.Mutex
wg sync.WaitGroup
closing bool
}
// JournalEntry
type JournalEntry interface {
PeerID() string
SealedAt() time.Time
Component() component.Component
}
type entry struct {
peerID string
sealedAt time.Time
component component.Component
}
func (e *entry) PeerID() string { return e.peerID }
func (e *entry) SealedAt() time.Time { return e.sealedAt }
func (e *entry) Component() component.Component { return e.component }
// ----------------------------------------------------------------------------
// Journal Collector
// ----------------------------------------------------------------------------
func NewJournalCollector() *JournalCollector {
c := &JournalCollector{
out: make(chan JournalEntry),
buffer: make(chan JournalEntry, 1024),
}
go func() {
bufferedPipe(c.buffer, c.out)
close(c.out)
}()
return c
}
func (c *JournalCollector) Enroll(ch <-chan JournalEntry) error {
c.mu.Lock()
defer c.mu.Unlock()
if c.closing {
return fmt.Errorf("journal collector is closing")
}
c.wg.Add(1)
go func() {
defer c.wg.Done()
for e := range ch {
c.buffer <- e
}
}()
return nil
}
func (c *JournalCollector) Close() {
c.mu.Lock()
if c.closing {
c.mu.Unlock()
return
}
c.closing = true
c.mu.Unlock()
c.wg.Wait()
close(c.buffer)
}
func (c *JournalCollector) Out() <-chan JournalEntry { return c.out }
// ----------------------------------------------------------------------------
// Journal Entries
// ----------------------------------------------------------------------------
func newEntry(peerID string, component component.Component) *entry {
return &entry{
peerID: peerID,
component: component,
sealedAt: time.Now(),
}
}
// PeerAdded
type PeerAddedJournal struct {
*entry
Data PeerAddedData
}
type PeerAddedData struct {
At time.Time
}
func NewPeerAddedJournal(
peerID string, component component.Component, data PeerAddedData,
) PeerAddedJournal {
return PeerAddedJournal{entry: newEntry(peerID, component), Data: data}
}
// PeerRemoved
type PeerRemovedJournal struct {
*entry
Data PeerRemovedData
}
type PeerRemovedData struct {
At time.Time
}
func NewPeerRemovedJournal(
peerID string, component component.Component, data PeerRemovedData,
) PeerRemovedJournal {
return PeerRemovedJournal{entry: newEntry(peerID, component), Data: data}
}
// PeerConnected
type PeerConnectedJournal struct {
*entry
Data PeerConnectedData
}
type PeerConnectedData struct {
At time.Time
}
func NewPeerConnectedJournal(
peerID string, component component.Component, data PeerConnectedData,
) PeerConnectedJournal {
return PeerConnectedJournal{entry: newEntry(peerID, component), Data: data}
}
// PeerDisconnected
type PeerDisconnectedJournal struct {
*entry
Data PeerDisconnectedData
}
type PeerDisconnectedData struct {
At time.Time
}
func NewPeerDisconnectedJournal(
peerID string, component component.Component, data PeerDisconnectedData,
) PeerDisconnectedJournal {
return PeerDisconnectedJournal{entry: newEntry(peerID, component), Data: data}
}
// ReqQueued
type ReqQueuedJournal struct {
*entry
Data ReqQueuedData
}
type ReqQueuedData struct {
SubID string
LetterID uint64
QueuedAt time.Time
}
func NewReqQueuedJournal(
peerID string, component component.Component, data ReqQueuedData,
) ReqQueuedJournal {
return ReqQueuedJournal{entry: newEntry(peerID, component), Data: data}
}
// CloseQueued
type CloseQueuedJournal struct {
*entry
Data CloseQueuedData
}
type CloseQueuedData struct {
SubID string
LetterID uint64
QueuedAt time.Time
}
func NewCloseQueuedJournal(
peerID string, component component.Component, data CloseQueuedData,
) CloseQueuedJournal {
return CloseQueuedJournal{entry: newEntry(peerID, component), Data: data}
}
// ReqSendOutcome
type ReqSendOutcomeJournal struct {
*entry
Data ReqSendOutcomeData
}
type ReqSendOutcomeData struct {
SubID string
LetterID uint64
Outcome LetterOutcomeKind
SentAt time.Time
MissedAt time.Time
RetryCount int
}
func NewReqSendOutcomeJournal(
peerID string, component component.Component, data ReqSendOutcomeData,
) ReqSendOutcomeJournal {
return ReqSendOutcomeJournal{entry: newEntry(peerID, component), Data: data}
}
// CloseSendOutcome
type CloseSendOutcomeJournal struct {
*entry
Data CloseSendOutcomeData
}
type CloseSendOutcomeData struct {
SubID string
LetterID uint64
Outcome LetterOutcomeKind
SentAt time.Time
MissedAt time.Time
RetryCount int
}
func NewCloseSendOutcomeJournal(
peerID string, component component.Component, data CloseSendOutcomeData,
) CloseSendOutcomeJournal {
return CloseSendOutcomeJournal{entry: newEntry(peerID, component), Data: data}
}
// ReceivedEOSE
type ReceivedEOSEJournal struct {
*entry
Data ReceivedEOSEData
}
type ReceivedEOSEData struct {
SubID string
At time.Time
}
func NewReceivedEOSEJournal(
peerID string, component component.Component, data ReceivedEOSEData,
) ReceivedEOSEJournal {
return ReceivedEOSEJournal{entry: newEntry(peerID, component), Data: data}
}
// MissedEOSE
type MissedEOSEJournal struct {
*entry
Data MissedEOSEData
}
type MissedEOSEData struct {
SubID string
At time.Time
}
func NewMissedEOSEJournal(
peerID string, component component.Component, data MissedEOSEData,
) MissedEOSEJournal {
return MissedEOSEJournal{entry: newEntry(peerID, component), Data: data}
}
// ReceivedClosed
type ReceivedClosedJournal struct {
*entry
Data ReceivedClosedData
}
type ReceivedClosedData struct {
SubID string
At time.Time
Message string
}
func NewReceivedClosedJournal(
peerID string, component component.Component, data ReceivedClosedData,
) ReceivedClosedJournal {
return ReceivedClosedJournal{entry: newEntry(peerID, component), Data: data}
}
// ReqClosed
type ReqClosedJournal struct {
*entry
Data ReqClosedData
}
type ReqClosedData struct {
SubID string
At time.Time
}
func NewReqClosedJournal(
peerID string, component component.Component, data ReqClosedData,
) ReqClosedJournal {
return ReqClosedJournal{entry: newEntry(peerID, component), Data: data}
}
-165
View File
@@ -1,165 +0,0 @@
package prism
import (
"context"
"git.wisehodl.dev/jay/go-mana-component"
"github.com/stretchr/testify/assert"
"testing"
"time"
)
type testJournalEntry struct {
*entry
}
func newTestEntry(peerID string, comp component.Component) JournalEntry {
return &testJournalEntry{entry: newEntry(peerID, comp)}
}
func TestJournalCollector_SingleProducer(t *testing.T) {
jc := NewJournalCollector()
ch := make(chan JournalEntry, 10)
jc.Enroll(ch)
ctx := component.MustNew(context.Background(), "test", "emitter")
comp := component.FromContext(ctx)
e1 := newTestEntry("peer1", comp)
e2 := newTestEntry("peer2", comp)
ch <- e1
ch <- e2
close(ch)
var received []JournalEntry
out := jc.Out()
// Wait for entries
Eventually(t, func() bool {
select {
case e := <-out:
received = append(received, e)
default:
}
return len(received) == 2
}, "should receive all entries")
}
func TestJournalCollector_MultipleProducers(t *testing.T) {
jc := NewJournalCollector()
ch1 := make(chan JournalEntry, 5)
ch2 := make(chan JournalEntry, 5)
jc.Enroll(ch1)
jc.Enroll(ch2)
ctx := component.MustNew(context.Background(), "test", "emitter")
comp := component.FromContext(ctx)
ch1 <- newTestEntry("p1", comp)
ch2 <- newTestEntry("p2", comp)
ch1 <- newTestEntry("p3", comp)
ch2 <- newTestEntry("p4", comp)
close(ch1)
close(ch2)
count := 0
out := jc.Out()
Eventually(t, func() bool {
select {
case <-out:
count++
default:
}
return count == 4
}, "should merge entries from all producers")
}
func TestJournalCollector_EnrollAfterClose(t *testing.T) {
jc := NewJournalCollector()
jc.Close()
ch := make(chan JournalEntry)
err := jc.Enroll(ch)
assert.Error(t, err)
assert.Contains(t, err.Error(), "closing")
}
func TestJournalCollector_CloseBlocks(t *testing.T) {
jc := NewJournalCollector()
ch := make(chan JournalEntry)
jc.Enroll(ch)
closed := make(chan struct{})
go func() {
jc.Close()
close(closed)
}()
// Output (Out()) should still be open because the producer (ch) is open
select {
case <-jc.Out():
t.Fatal("output channel closed prematurely")
case <-time.After(NegativeTestTimeout):
}
// Output should not be reached yet
select {
case <-closed:
t.Fatal("Close() returned before producer closed")
default:
}
close(ch)
Eventually(t, func() bool {
select {
case _, ok := <-jc.Out():
return !ok
default:
return false
}
}, "Out() should close after all producers close")
Eventually(t, func() bool {
select {
case <-closed:
return true
default:
return false
}
}, "Close() should return after producers finish")
}
func TestJournalCollector_ComponentIdentity(t *testing.T) {
jc := NewJournalCollector()
ch := make(chan JournalEntry, 1)
jc.Enroll(ch)
mod := "test-mod"
path := "a.b.c"
ctx := component.MustNew(context.Background(), mod, path)
comp := component.FromContext(ctx)
entry := newTestEntry("peer-id", comp)
ch <- entry
close(ch)
out := jc.Out()
var received JournalEntry
Eventually(t, func() bool {
select {
case e := <-out:
received = e
return true
default:
return false
}
}, "should receive the entry")
typed, ok := received.(*testJournalEntry)
assert.True(t, ok, "should be correct concrete type")
assert.Equal(t, mod, typed.Component().Module())
assert.Equal(t, path, typed.Component().PathString())
jc.Close()
}
-537
View File
@@ -1,537 +0,0 @@
package prism
import (
"container/list"
"context"
"git.wisehodl.dev/jay/go-mana-component"
"log/slog"
"sync"
"sync/atomic"
"time"
)
// ----------------------------------------------------------------------------
// Types
// ----------------------------------------------------------------------------
// Letters
type LetterID = uint64
type OutboundLetter struct {
id uint64
peerID string
data Envelope
ctx context.Context
cancel context.CancelFunc
}
type LetterOutcomeKind int
const (
OutcomeSent LetterOutcomeKind = iota
OutcomeExpired
OutcomeCancelled
OutcomeRejected
)
func (k LetterOutcomeKind) String() string {
switch k {
case OutcomeSent:
return "sent"
case OutcomeExpired:
return "expired"
case OutcomeCancelled:
return "cancelled"
case OutcomeRejected:
return "rejected"
default:
return "unknown"
}
}
type LetterOutcome struct {
LetterID uint64
PeerID string
Kind LetterOutcomeKind
SentAt time.Time
MissedAt time.Time
Retries int
}
// Postmaster
type Postmaster struct {
couriers map[string]*Courier
poolHasPeer func(id string) (string, bool)
poolEvents <-chan PoolEvent // Adapter.Subscribe
poolSend PoolSendFunc // Adapter.Send
counter atomic.Uint64
ctx context.Context
cancel context.CancelFunc
mu sync.RWMutex
wg sync.WaitGroup
cfg postmasterConfig
handler slog.Handler
logger *slog.Logger
}
// Courier
type Courier struct {
tasks chan courierTask
sendFunc func(data Envelope) error
// state
queue list.List
connected bool
sending bool
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
logger *slog.Logger
}
// Messages
type courierTask interface {
dispatch(c *Courier)
}
// Options
const (
DefaultPostmasterDeadline = 30 * time.Second
)
type PostmasterOption func(*postmasterConfig)
type postmasterConfig struct {
defaultDeadline time.Duration
}
func WithDefaultDeadline(d time.Duration) PostmasterOption {
return func(c *postmasterConfig) { c.defaultDeadline = d }
}
type SendOption func(*sendConfig)
type sendConfig struct {
deadline time.Duration
}
func WithDeadline(d time.Duration) SendOption {
return func(c *sendConfig) { c.deadline = d }
}
// ----------------------------------------------------------------------------
// Postmaster
// ----------------------------------------------------------------------------
func NewPostmaster(
ctx context.Context,
poolHasPeer func(id string) (string, bool),
poolEvents <-chan PoolEvent,
poolSendFunc PoolSendFunc,
handler slog.Handler,
opts ...PostmasterOption,
) *Postmaster {
ctx, cancel := context.WithCancel(
component.MustNew(ctx, "prism", "postmaster"))
cfg := postmasterConfig{
defaultDeadline: DefaultPostmasterDeadline,
}
for _, opt := range opts {
opt(&cfg)
}
pm := &Postmaster{
couriers: make(map[string]*Courier),
poolHasPeer: poolHasPeer,
poolEvents: poolEvents,
poolSend: poolSendFunc,
ctx: ctx,
cancel: cancel,
cfg: cfg,
}
if handler != nil {
comp := component.FromContext(ctx)
pm.handler = handler
pm.logger = slog.New(handler).With(slog.Any("component", comp))
}
pm.wg.Add(1)
go pm.handlePoolEvents()
return pm
}
func (pm *Postmaster) Send(
ctx context.Context,
peerID string,
data Envelope,
callback func(LetterOutcome),
opts ...SendOption,
) (uint64, context.CancelFunc) {
cfg := sendConfig{deadline: pm.cfg.defaultDeadline}
for _, opt := range opts {
opt(&cfg)
}
pm.mu.RLock()
defer pm.mu.RUnlock()
// check if peer courier exists
peerID, ok := pm.poolHasPeer(peerID)
if !ok {
go callback(LetterOutcome{PeerID: peerID, Kind: OutcomeRejected})
return 0, func() {}
}
courier, ok := pm.couriers[peerID]
if !ok {
go callback(LetterOutcome{PeerID: peerID, Kind: OutcomeRejected})
return 0, func() {}
}
ctx, cancel := context.WithTimeout(ctx, cfg.deadline)
letter := OutboundLetter{
id: pm.counter.Add(1),
peerID: peerID,
data: data,
ctx: ctx,
cancel: cancel,
}
courier.Enqueue(letter, callback)
return letter.id, cancel
}
func (pm *Postmaster) Peers() []string {
pm.mu.RLock()
defer pm.mu.RUnlock()
peers := make([]string, 0, len(pm.couriers))
for id, _ := range pm.couriers {
peers = append(peers, id)
}
return peers
}
func (pm *Postmaster) Close() {
pm.cancel()
pm.wg.Wait()
// close each courier
pm.mu.Lock()
couriers := pm.couriers
pm.couriers = make(map[string]*Courier)
pm.mu.Unlock()
for _, courier := range couriers {
courier.Close()
}
}
func (pm *Postmaster) handlePoolEvents() {
defer pm.wg.Done()
for {
select {
case <-pm.ctx.Done():
return
case ev := <-pm.poolEvents:
switch ev.Kind {
case EventAdded:
pm.mu.Lock()
_, exists := pm.couriers[ev.ID]
if exists {
pm.mu.Unlock()
continue
}
send := func(data Envelope) error { return pm.poolSend(ev.ID, data) }
courier := NewCourier(pm.ctx, send, pm.handler)
pm.couriers[ev.ID] = courier
pm.mu.Unlock()
case EventRemoved:
pm.mu.Lock()
courier, exists := pm.couriers[ev.ID]
if exists {
delete(pm.couriers, ev.ID)
}
pm.mu.Unlock()
courier.Close()
case EventConnected:
pm.mu.RLock()
courier, exists := pm.couriers[ev.ID]
if exists {
courier.HandleConnect()
}
pm.mu.RUnlock()
case EventDisconnected:
pm.mu.RLock()
courier, exists := pm.couriers[ev.ID]
if exists {
courier.HandleDisconnect()
}
pm.mu.RUnlock()
}
}
}
}
// ----------------------------------------------------------------------------
// Courier
// ----------------------------------------------------------------------------
// Letter State
type letterState struct {
letter OutboundLetter
onOutcome func(LetterOutcome)
sentAt time.Time
missedAt time.Time
retries int
once sync.Once
}
func (s *letterState) isCancelled() bool {
return s.letter.ctx.Err() != nil
}
func (s *letterState) countRetry() { s.retries++ }
func (s *letterState) setSentAt(at time.Time) { s.sentAt = at }
func (s *letterState) setMissedAt(at time.Time) { s.missedAt = at }
// Courier
func NewCourier(
ctx context.Context,
sendFunc func(data Envelope) error, // func => PoolSendFunc(id)
handler slog.Handler,
) *Courier {
ctx, cancel := context.WithCancel(
component.MustExtend(ctx, "courier"))
c := &Courier{
tasks: make(chan courierTask, 64),
sendFunc: sendFunc,
ctx: ctx,
cancel: cancel,
}
if handler != nil {
comp := component.FromContext(ctx)
c.logger = slog.New(handler).With(slog.Any("component", comp))
}
c.wg.Add(1)
go c.run()
return c
}
func (c *Courier) Enqueue(letter OutboundLetter, onOutcome func(LetterOutcome)) {
wrappedLetter := &letterState{
letter: letter,
onOutcome: onOutcome,
}
c.order(taskEnqueue{letter: wrappedLetter})
}
func (c *Courier) HandleConnect() {
c.order(taskConnected{})
}
func (c *Courier) HandleDisconnect() {
c.order(taskDisconnected{})
}
func (c *Courier) Close() {
c.cancel()
c.wg.Wait()
c.terminate()
}
// Internal
func (c *Courier) order(task courierTask) {
select {
case <-c.ctx.Done():
case c.tasks <- task:
}
}
func (c *Courier) run() {
defer c.wg.Done()
for {
select {
case <-c.ctx.Done():
return
case task := <-c.tasks:
task.dispatch(c)
c.maybeSend()
}
}
}
func (c *Courier) maybeSend() {
if !c.preflight() {
c.drain()
return
}
s, ok := c.pop()
if !ok {
return
}
c.sending = true
c.wg.Add(1)
go c.sendOnce(s)
}
func (c *Courier) sendOnce(s *letterState) {
defer c.wg.Done()
err := c.sendFunc(s.letter.data)
c.order(taskHandleSendResult{letter: s, at: time.Now(), err: err})
}
func (c *Courier) doneOnce(s *letterState) {
var kind LetterOutcomeKind
if s.isCancelled() {
// letter was cancelled
if s.letter.ctx.Err() == context.DeadlineExceeded {
// letter expired
kind = OutcomeExpired
} else {
// letter was cancelled externally
kind = OutcomeCancelled
}
} else {
// letter was sent
kind = OutcomeSent
}
outcome := LetterOutcome{
LetterID: s.letter.id,
PeerID: s.letter.peerID,
Kind: kind,
SentAt: s.sentAt,
MissedAt: s.missedAt,
Retries: s.retries,
}
s.once.Do(func() {
s.letter.cancel()
go s.onOutcome(outcome)
})
}
func (c *Courier) terminate() {
// cancel remaining letters
for {
s, ok := c.pop()
if !ok {
break
}
s.letter.cancel()
s.setMissedAt(time.Now())
c.doneOnce(s)
}
}
// Helpers
func (c *Courier) preflight() bool {
isConnected := c.connected
notAlreadySending := !c.sending
hasQueuedLetters := c.queue.Len() > 0
return isConnected && notAlreadySending && hasQueuedLetters
}
func (c *Courier) drain() {
for {
front := c.queue.Front()
if front == nil {
return
}
s := front.Value.(*letterState)
if !s.isCancelled() {
return
}
s.setMissedAt(time.Now())
c.doneOnce(s)
c.queue.Remove(front)
}
}
func (c *Courier) pop() (*letterState, bool) {
for {
front := c.queue.Front()
if front == nil {
return nil, false
}
s := front.Value.(*letterState)
c.queue.Remove(front)
if !s.isCancelled() {
return s, true
}
s.setMissedAt(time.Now())
c.doneOnce(s)
}
}
// ----------------------------------------------------------------------------
// Courier Messages
// ----------------------------------------------------------------------------
type taskEnqueue struct{ letter *letterState }
func (t taskEnqueue) dispatch(c *Courier) {
c.queue.PushBack(t.letter)
}
type taskConnected struct{}
func (t taskConnected) dispatch(c *Courier) {
c.connected = true
}
type taskDisconnected struct{}
func (t taskDisconnected) dispatch(c *Courier) {
c.connected = false
}
type taskHandleSendResult struct {
letter *letterState
at time.Time
err error
}
func (t taskHandleSendResult) dispatch(c *Courier) {
c.sending = false
if t.err != nil {
t.letter.countRetry()
c.queue.PushFront(t.letter)
} else {
t.letter.setSentAt(t.at)
c.doneOnce(t.letter)
}
}
-225
View File
@@ -1,225 +0,0 @@
package prism
import (
"context"
"fmt"
"github.com/stretchr/testify/assert"
"testing"
"time"
)
// Helpers
func mockPostmaster(
ctx context.Context,
) (pm *Postmaster, poolEvents chan PoolEvent) {
poolHasPeer := func(id string) (string, bool) { return id, true }
poolEvents = make(chan PoolEvent, 4)
poolSendFunc := func(id string, data Envelope) error { return nil }
pm = NewPostmaster(ctx, poolHasPeer, poolEvents, poolSendFunc, nil)
return
}
func expectLetterOutcome(
t *testing.T, ch chan LetterOutcome, kind LetterOutcomeKind,
) {
t.Helper()
var outcome LetterOutcome
Eventually(t, func() bool {
select {
default:
return false
case outcome = <-ch:
return true
}
}, "should have received outcome")
assert.Equal(t, kind, outcome.Kind)
}
func expectAllLetterOutcomes(
t *testing.T, ch chan LetterOutcome, kind LetterOutcomeKind, count int,
) {
t.Helper()
outcomes := make([]LetterOutcome, 0, count)
Eventually(t, func() bool {
select {
default:
return false
case o := <-ch:
outcomes = append(outcomes, o)
return len(outcomes) == count
}
}, fmt.Sprintf("should have returned %d outcomes", count))
if len(outcomes) >= count {
for i := range count {
assert.Equal(t, OutcomeCancelled, outcomes[i].Kind)
}
}
}
// Tests
func TestPostmasterUnknownPeerSend(t *testing.T) {
ctx := context.Background()
pm, _ := mockPostmaster(ctx)
called := make(chan LetterOutcome, 1)
pm.Send(ctx, "peer", nil, func(o LetterOutcome) { called <- o })
expectLetterOutcome(t, called, OutcomeRejected)
}
func TestPostmasterSend(t *testing.T) {
ctx := context.Background()
pm, poolEvents := mockPostmaster(ctx)
poolEvents <- PoolEvent{ID: "peer", Kind: EventAdded, At: time.Now()}
poolEvents <- PoolEvent{ID: "peer", Kind: EventConnected, At: time.Now()}
Eventually(t, func() bool { return len(pm.Peers()) > 0 }, "should add peer")
called := make(chan LetterOutcome, 1)
pm.Send(ctx, "peer", nil, func(o LetterOutcome) { called <- o })
expectLetterOutcome(t, called, OutcomeSent)
}
func TestPostmasterCancelInFlight(t *testing.T) {
ctx := context.Background()
pm, poolEvents := mockPostmaster(ctx)
poolEvents <- PoolEvent{ID: "peer", Kind: EventAdded, At: time.Now()}
Eventually(t, func() bool { return len(pm.Peers()) > 0 }, "should add peer")
called := make(chan LetterOutcome, 1)
_, cancel := pm.Send(ctx, "peer", nil, func(o LetterOutcome) { called <- o })
// wait for letter to queue
time.Sleep(100 * time.Millisecond)
// cancel the letter using its callback
cancel()
// connect the pool
poolEvents <- PoolEvent{ID: "peer", Kind: EventConnected, At: time.Now()}
expectLetterOutcome(t, called, OutcomeCancelled)
}
func TestPostmasterExpire(t *testing.T) {
ctx := context.Background()
pm, poolEvents := mockPostmaster(ctx)
poolEvents <- PoolEvent{ID: "peer", Kind: EventAdded, At: time.Now()}
Eventually(t, func() bool { return len(pm.Peers()) > 0 }, "should add peer")
called := make(chan LetterOutcome, 1)
pm.Send(ctx, "peer", nil, func(o LetterOutcome) { called <- o },
WithDeadline(1*time.Millisecond))
// wait for letter to queue and expire
time.Sleep(100 * time.Millisecond)
// connect the pool
poolEvents <- PoolEvent{ID: "peer", Kind: EventConnected, At: time.Now()}
expectLetterOutcome(t, called, OutcomeExpired)
}
func TestPostmasterPeerRemoved(t *testing.T) {
ctx := context.Background()
pm, poolEvents := mockPostmaster(ctx)
// add peer, but do not connect
poolEvents <- PoolEvent{ID: "peer", Kind: EventAdded, At: time.Now()}
Eventually(t, func() bool { return len(pm.Peers()) > 0 }, "should add peer")
// send two letters
called := make(chan LetterOutcome, 2)
pm.Send(ctx, "peer", nil, func(o LetterOutcome) { called <- o })
pm.Send(ctx, "peer", nil, func(o LetterOutcome) { called <- o })
// wait for them to hit the courier queue
time.Sleep(100 * time.Millisecond)
// remove the peer
poolEvents <- PoolEvent{ID: "peer", Kind: EventRemoved, At: time.Now()}
// expect each letter to return cancelled
expectAllLetterOutcomes(t, called, OutcomeCancelled, 2)
// subsequent sends should fail
pm.Send(ctx, "peer", nil, func(o LetterOutcome) { called <- o })
expectLetterOutcome(t, called, OutcomeRejected)
}
func TestPostmasterCourierCloseRace(t *testing.T) {
ctx := context.Background()
pm, poolEvents := mockPostmaster(ctx)
// add peer, but do not connect
poolEvents <- PoolEvent{ID: "peer", Kind: EventAdded, At: time.Now()}
Eventually(t, func() bool { return len(pm.Peers()) > 0 }, "should add peer")
// remove the peer
poolEvents <- PoolEvent{ID: "peer", Kind: EventRemoved, At: time.Now()}
// send a letter
time.Sleep(5 * time.Microsecond) // small wait lines up the race condition
var outcome *LetterOutcome
called := make(chan LetterOutcome, 1)
pm.Send(ctx, "peer", nil, func(o LetterOutcome) { called <- o })
Eventually(t, func() bool {
select {
default:
return false
case o := <-called:
outcome = &o
return true
}
}, "should have returned 1 outcomes")
if outcome == nil {
t.Fatal("did not receive an outcome")
}
// depending on the race, the outcome could be:
// close, then send: send is rejected by the postmaster
// send, then close: send is cancelled by the courier
assert.Contains(t,
[]LetterOutcomeKind{OutcomeCancelled, OutcomeRejected},
outcome.Kind,
)
}
func TestPostmasterClose(t *testing.T) {
ctx := context.Background()
pm, poolEvents := mockPostmaster(ctx)
// add peer, but do not connect
poolEvents <- PoolEvent{ID: "peer", Kind: EventAdded, At: time.Now()}
Eventually(t, func() bool { return len(pm.Peers()) > 0 }, "should add peer")
// send two letters
called := make(chan LetterOutcome, 2)
pm.Send(ctx, "peer", nil, func(o LetterOutcome) { called <- o })
pm.Send(ctx, "peer", nil, func(o LetterOutcome) { called <- o })
// wait for them to hit the courier queue
time.Sleep(100 * time.Millisecond)
// close postmaster
pm.Close()
// expect each letter to return cancelled
expectAllLetterOutcomes(t, called, OutcomeCancelled, 2)
// subsequent sends should be rejected
pm.Send(ctx, "peer", nil, func(o LetterOutcome) { called <- o })
expectLetterOutcome(t, called, OutcomeRejected)
}
-1
View File
@@ -1 +0,0 @@
package prism
-740
View File
@@ -1,740 +0,0 @@
package prism
import (
"context"
"encoding/base32"
"fmt"
"git.wisehodl.dev/jay/go-mana-component"
"git.wisehodl.dev/jay/go-roots-ws"
"log/slog"
"sync"
"time"
)
var (
_ fmt.Formatter
)
// ----------------------------------------------------------------------------
// Types
// ----------------------------------------------------------------------------
// Outputs
type ReqEvent struct {
PeerID string
ReceivedAt time.Time
Data []byte
}
type ReqMessage struct {
PeerID string
ReceivedAt time.Time
Data string
}
// Options
const (
defaultLabel = "REQ"
defaultEOSETimeout = 30 * time.Second
)
type reqConfig struct {
id string
label string
eoseTimeout time.Duration
}
type ReqOption func(*reqConfig)
// Request Manager
type ReqManager struct {
subs map[string]Request
byPeer map[string]map[string]struct{} // peerID -> subID set
postmaster *Postmaster
collector *JournalCollector
journals chan JournalEntry // JournalCollector.Enroll
isConnected func(peerID string) bool // Adapter.IsConnected
poolEvents <-chan PoolEvent // Adapter.Subscribe
poolInbox <-chan InboundLetter // Clerk.Subscribe
ctx context.Context
cancel context.CancelFunc
mu sync.RWMutex
wg sync.WaitGroup
handler slog.Handler
logger *slog.Logger
}
type Request interface {
order(reqTask)
Peers() []string
Close()
}
// Base Request
type request struct {
id string
req Envelope
tasks chan reqTask
closing bool
done chan struct{}
buffer chan ReqEvent
events chan ReqEvent
messages chan ReqMessage
postmaster *Postmaster
journals chan JournalEntry
isConnected func(peerID string) bool
onClose func()
ctx context.Context
wg sync.WaitGroup
peerWg sync.WaitGroup
logger *slog.Logger
}
// Stream Request
type StreamReq struct {
*request
peers map[string]*streamPeer
}
type streamPeer struct {
reqSent bool
closeSent bool
closed bool
closeOnce sync.Once
}
// Query Request
type QueryReq struct {
*request
peers map[string]*queryPeer
eoseTimeout time.Duration
}
type queryPeer struct {
reqSent bool
eoseTimer *time.Timer
closeSent bool
closed bool
closeOnce sync.Once
}
// ----------------------------------------------------------------------------
// Request Options
// ----------------------------------------------------------------------------
func newReqConfig(opts ...ReqOption) reqConfig {
cfg := reqConfig{
id: "",
label: defaultLabel,
eoseTimeout: defaultEOSETimeout,
}
for _, opt := range opts {
opt(&cfg)
}
return cfg
}
func WithID(id string) ReqOption {
return func(c *reqConfig) {
c.id = id
}
}
func WithLabel(label string) ReqOption {
return func(c *reqConfig) {
c.label = label
}
}
func WithEOSETimeout(timeout time.Duration) ReqOption {
return func(c *reqConfig) {
c.eoseTimeout = timeout
}
}
// ----------------------------------------------------------------------------
// Request Manager
// ----------------------------------------------------------------------------
func NewReqManager(
ctx context.Context,
postmaster *Postmaster,
isConnected func(string) bool,
poolEvents <-chan PoolEvent,
poolInbox <-chan InboundLetter,
collector *JournalCollector,
handler slog.Handler,
) *ReqManager {
return nil
}
func (m *ReqManager) OpenStream(
filters [][]byte,
peers []string,
opts ...ReqOption,
) (
id string,
events <-chan ReqEvent,
messages <-chan ReqMessage,
err error,
) {
return "", nil, nil, nil
}
func (m *ReqManager) OpenQuery(
filters [][]byte,
peers []string,
opts ...ReqOption,
) (
id string,
events <-chan ReqEvent,
messages <-chan ReqMessage,
err error,
) {
return "", nil, nil, nil
}
func (m *ReqManager) CloseReq(id string) error {
return nil
}
func (m *ReqManager) Close() {}
func (m *ReqManager) makeOnClose(subID string, peers []string) func() {
return func() {}
}
func (m *ReqManager) routeInbox() {
// parses envelope label and sub ID from the letter
// looks up in sub registry
// calls req.order()
}
func (m *ReqManager) routeEvents() {
// reads PoolEvent
// looks up in m.byPeer
// calls req.order() on each matching request
}
// Helpers
func cleanPeers(peers []string) []string {
return nil
}
var encoder = base32.StdEncoding.WithPadding(base32.NoPadding)
func generateID(prefix string) string {
return ""
}
// ----------------------------------------------------------------------------
// Base Request
// ----------------------------------------------------------------------------
func (r *request) runReturnEvents() {
defer r.wg.Done()
defer close(r.events)
defer close(r.messages)
bufferedPipe(r.buffer, r.events)
}
func (r *request) dispatchEvent(task taskEvent) {
select {
case <-r.done:
case r.buffer <- ReqEvent{
PeerID: task.peerID, ReceivedAt: task.at, Data: task.data}:
}
}
func (r *request) emit(entry JournalEntry) {
select {
case <-r.done:
case r.journals <- entry:
}
}
func (r *request) order(task reqTask) {
select {
case <-r.done:
case r.tasks <- task:
}
}
func (r *request) Close() {
r.order(newCloseReq())
r.wg.Wait()
}
func (r *request) terminate() {
defer r.wg.Done()
r.peerWg.Wait()
close(r.done)
close(r.buffer)
if r.journals != nil {
close(r.journals)
}
r.onClose()
}
// ----------------------------------------------------------------------------
// Stream Request
// ----------------------------------------------------------------------------
func NewStreamReq(
ctx context.Context,
id string,
filters [][]byte,
peers []string,
postmaster *Postmaster,
isConnected func(string) bool,
collector *JournalCollector,
onClose func(),
handler slog.Handler,
) *StreamReq {
ctx = component.MustExtend(ctx, "stream")
r := &StreamReq{
request: &request{
id: id,
req: envelope.EncloseReq(id, filters),
tasks: make(chan reqTask, len(peers)*16),
done: make(chan struct{}),
buffer: make(chan ReqEvent, len(peers)*16),
events: make(chan ReqEvent),
messages: make(chan ReqMessage, len(peers)),
postmaster: postmaster,
isConnected: isConnected,
onClose: onClose,
ctx: ctx,
},
peers: make(map[string]*streamPeer),
}
if collector != nil {
r.journals = make(chan JournalEntry, len(peers)*16)
collector.Enroll(r.journals)
}
if handler != nil {
c := component.FromContext(ctx)
r.logger = slog.New(handler).With(slog.Any("component", c))
}
for _, peerID := range peers {
r.peers[peerID] = &streamPeer{}
r.peerWg.Add(1)
}
r.wg.Add(2)
go r.run()
go r.runReturnEvents()
// send initial REQs
for id := range r.peers {
if r.isConnected(id) {
r.sendReq(id)
}
}
return r
}
func (r *StreamReq) run() {
defer r.wg.Done()
for {
select {
case <-r.done:
return
case t := <-r.tasks:
r.dispatch(t)
}
}
}
func (r *StreamReq) Peers() []string {
peers := make([]string, 0, len(r.peers))
for p := range r.peers {
peers = append(peers, p)
}
return peers
}
func (r *StreamReq) sendReq(peerID string) {
_, ok := r.peers[peerID]
if !ok {
return
}
id, _ := r.postmaster.Send(r.ctx, peerID, r.req,
func(o LetterOutcome) { r.order(newReqOutcomeTask(peerID, o)) })
c := component.FromContext(r.ctx)
r.emit(NewReqQueuedJournal(peerID, c, ReqQueuedData{
SubID: r.id, LetterID: id, QueuedAt: time.Now(),
}))
}
func (r *StreamReq) sendClose(peerID string) {
peer, ok := r.peers[peerID]
if !ok || peer.closeSent {
return
}
if !peer.reqSent {
r.closePeer(peerID)
return
}
id, _ := r.postmaster.Send(r.ctx, peerID, envelope.EncloseClose(r.id),
func(o LetterOutcome) { r.order(newCloseOutcomeTask(peerID, o)) })
peer.closeSent = true
c := component.FromContext(r.ctx)
r.emit(NewCloseQueuedJournal(peerID, c, CloseQueuedData{
SubID: r.id, LetterID: id, QueuedAt: time.Now(),
}))
}
func (r *StreamReq) closePeer(peerID string) {
peer, ok := r.peers[peerID]
if !ok {
return
}
peer.closeOnce.Do(func() {
r.peerWg.Done()
peer.closed = true
})
}
func (r *StreamReq) dispatch(task reqTask) {
switch t := task.(type) {
case taskReqOutcome:
r.dispatchReqOutcome(t)
case taskCloseOutcome:
r.dispatchCloseOutcome(t)
case taskEvent:
r.dispatchEvent(t)
case taskEOSE:
r.dispatchEOSE(t)
case taskClosed:
r.dispatchClosed(t)
case taskClosePeer:
r.dispatchClosePeer(t)
case taskCloseReq:
r.dispatchCloseReq(t)
case taskHandleReconnect:
r.dispatchHandleReconnect(t)
}
}
func (r *StreamReq) dispatchReqOutcome(task taskReqOutcome) {
peer := r.peers[task.peerID]
if task.outcome.Kind == OutcomeSent {
peer.reqSent = true
}
c := component.FromContext(r.ctx)
r.emit(NewReqSendOutcomeJournal(task.peerID, c, ReqSendOutcomeData{
SubID: r.id,
LetterID: task.outcome.LetterID,
Outcome: task.outcome.Kind,
SentAt: task.outcome.SentAt,
MissedAt: task.outcome.MissedAt,
RetryCount: task.outcome.Retries,
}))
}
func (r *StreamReq) dispatchCloseOutcome(task taskCloseOutcome) {
r.closePeer(task.peerID)
c := component.FromContext(r.ctx)
r.emit(NewCloseSendOutcomeJournal(task.peerID, c, CloseSendOutcomeData{
SubID: r.id,
LetterID: task.outcome.LetterID,
Outcome: task.outcome.Kind,
SentAt: task.outcome.SentAt,
MissedAt: task.outcome.MissedAt,
RetryCount: task.outcome.Retries,
}))
}
func (r *StreamReq) dispatchEOSE(task taskEOSE) {
c := component.FromContext(r.ctx)
r.emit(NewReceivedEOSEJournal(task.peerID, c, ReceivedEOSEData{
SubID: r.id, At: task.at,
}))
}
func (r *StreamReq) dispatchClosed(task taskClosed) {
c := component.FromContext(r.ctx)
r.emit(NewReceivedClosedJournal(task.peerID, c, ReceivedClosedData{
SubID: r.id, At: task.at, Message: task.message,
}))
peer := r.peers[task.peerID]
if peer.closed {
return
}
select {
case <-r.done:
case r.messages <- ReqMessage{
PeerID: task.peerID,
ReceivedAt: task.at,
Data: task.message,
}:
}
r.closePeer(task.peerID)
}
func (r *StreamReq) dispatchClosePeer(task taskClosePeer) {
r.closePeer(task.peerID)
}
func (r *StreamReq) dispatchCloseReq(task taskCloseReq) {
if r.closing {
return
}
r.closing = true
for id, peer := range r.peers {
if !peer.closed {
if !r.isConnected(id) {
r.closePeer(id)
} else {
r.sendClose(id)
}
}
}
r.wg.Add(1)
go r.terminate()
}
func (r *StreamReq) dispatchHandleReconnect(task taskHandleReconnect) {
peer, ok := r.peers[task.peerID]
if !ok || peer.closed || r.closing || peer.closeSent {
return
}
r.sendReq(task.peerID)
}
// ----------------------------------------------------------------------------
// Query Request
// ----------------------------------------------------------------------------
func NewQueryReq(
ctx context.Context,
id string,
filters [][]byte,
peers []string,
postmaster *Postmaster,
isConnected func(string) bool,
journals chan<- JournalEntry,
eoseTimeout time.Duration,
onClose func(),
handler slog.Handler,
) *QueryReq {
// start buffered pipe to event output
// pipe return drives channel closures
return nil
}
func (r *QueryReq) Peers() []string {
peers := make([]string, 0, len(r.peers))
for p := range r.peers {
peers = append(peers, p)
}
return peers
}
func (r *QueryReq) sendReq(peerID string) error {
return nil
}
func (r *QueryReq) sendClose(peerID string) error {
return nil
}
func (r *QueryReq) run() {
defer r.wg.Done()
for {
select {
case <-r.done:
return
case t := <-r.tasks:
r.dispatch(t)
}
}
}
func (r *QueryReq) dispatch(task reqTask) {
switch t := task.(type) {
case taskReqOutcome:
r.dispatchReqOutcome(t)
case taskCloseOutcome:
r.dispatchCloseOutcome(t)
case taskEvent:
r.dispatchEvent(t)
case taskEOSE:
r.dispatchEOSE(t)
case taskClosed:
r.dispatchClosed(t)
case taskClosePeer:
r.dispatchClosePeer(t)
case taskCloseReq:
r.dispatchCloseReq(t)
case taskMissedEOSE:
r.dispatchMissedEOSE(t)
}
}
func (r *QueryReq) dispatchReqOutcome(task taskReqOutcome) {}
func (r *QueryReq) dispatchCloseOutcome(task taskCloseOutcome) {}
func (r *QueryReq) dispatchEOSE(task taskEOSE) {}
func (r *QueryReq) dispatchClosed(task taskClosed) {}
func (r *QueryReq) dispatchClosePeer(task taskClosePeer) {}
func (r *QueryReq) dispatchCloseReq(task taskCloseReq) {}
func (r *QueryReq) dispatchMissedEOSE(task taskMissedEOSE) {}
// ----------------------------------------------------------------------------
// Request Tasks
// ----------------------------------------------------------------------------
// Types
type reqTask interface{ reqTask() } // gates task channel
type taskReqOutcome struct {
peerID string
outcome LetterOutcome
}
func (taskReqOutcome) reqTask() {}
type taskCloseOutcome struct {
peerID string
outcome LetterOutcome
}
func (taskCloseOutcome) reqTask() {}
type taskEvent struct {
peerID string
at time.Time
data Envelope
}
func (taskEvent) reqTask() {}
type taskEOSE struct {
peerID string
at time.Time
}
func (taskEOSE) reqTask() {}
type taskClosed struct {
peerID string
at time.Time
message string
}
func (taskClosed) reqTask() {}
type taskClosePeer struct{ peerID string }
func (taskClosePeer) reqTask() {}
type taskCloseReq struct{}
func (taskCloseReq) reqTask() {}
type taskHandleReconnect struct{ peerID string }
func (taskHandleReconnect) reqTask() {}
type taskMissedEOSE struct{ peerID string }
func (taskMissedEOSE) reqTask() {}
// Constructors
func newReqOutcomeTask(peerID string, outcome LetterOutcome) taskReqOutcome {
return taskReqOutcome{peerID: peerID, outcome: outcome}
}
func newCloseOutcomeTask(peerID string, outcome LetterOutcome) taskCloseOutcome {
return taskCloseOutcome{peerID: peerID, outcome: outcome}
}
func newEventTask(peerID string, at time.Time, data Envelope) taskEvent {
return taskEvent{peerID: peerID, at: at, data: data}
}
func newEOSETask(peerID string, at time.Time) taskEOSE {
return taskEOSE{peerID: peerID, at: at}
}
func newClosedTask(peerID string, at time.Time, message string) taskClosed {
return taskClosed{peerID: peerID, at: at, message: message}
}
func newClosePeerTask(peerID string) taskClosePeer {
return taskClosePeer{peerID: peerID}
}
func newCloseReq() taskCloseReq {
return taskCloseReq{}
}
func newHandleReconnect(peerID string) taskHandleReconnect {
return taskHandleReconnect{peerID: peerID}
}
func newMissedEOSETask(peerID string) taskMissedEOSE {
return taskMissedEOSE{peerID: peerID}
}
-1
View File
@@ -1 +0,0 @@
package prism
+339
View File
@@ -0,0 +1,339 @@
package prism
import (
"context"
"encoding/base32"
"fmt"
"git.wisehodl.dev/jay/go-honeybee"
"git.wisehodl.dev/jay/go-mana-component"
"git.wisehodl.dev/jay/go-roots-ws"
"log/slog"
"sync"
"time"
)
// ----------------------------------------------------------------------------
// Types
// ----------------------------------------------------------------------------
type ReqEvent struct {
PeerID string
ReceivedAt time.Time
Data []byte
}
type ReqClosed struct {
PeerID string
ReceivedAt time.Time
Data string
}
type RequestManager struct {
reqs map[string][][]byte
inboxSubs map[string]chan<- InboxMessage
done chan struct{}
reqWg sync.WaitGroup
envoy *Envoy
events <-chan OutboundPoolEvent
inbox <-chan InboxMessage
ctx context.Context
cancel context.CancelFunc
mu sync.RWMutex
wg sync.WaitGroup
handler slog.Handler
logger *slog.Logger
}
type request struct {
id string
req []byte
query bool
inbox <-chan InboxMessage
stop chan struct{}
terminate func()
events chan ReqEvent
closed chan ReqClosed
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
logger *slog.Logger
}
// ----------------------------------------------------------------------------
// Helpers
// ----------------------------------------------------------------------------
var encoder = base32.StdEncoding.WithPadding(base32.NoPadding)
func generateID() string {
return ""
}
// ----------------------------------------------------------------------------
// Request Manager
// ----------------------------------------------------------------------------
func NewRequestManager(envoy *Envoy) *RequestManager {
ctx, cancel := context.WithCancel(
component.MustExtend(envoy.Context(), "request_manager"))
m := &RequestManager{
reqs: make(map[string]*request),
envoy: envoy,
events: envoy.SubscribeEvents(),
inbox: envoy.SubscribeInbox([]string{"EVENT", "EOSE", "CLOSED"}),
ctx: ctx,
cancel: cancel,
}
if h := envoy.Handler(); h != nil {
comp := component.FromContext(ctx)
m.handler = h
m.logger = slog.New(h).With(slog.Any("component", comp))
}
m.wg.Add(1)
go m.handleEvents()
return m
}
func (m *RequestManager) Stream(
filters [][]byte,
) (
reqID string,
events <-chan ReqEvent,
closed <-chan ReqClosed,
) {
ctx := component.MustExtend(m.ctx, "stream")
id := generateID()
terminate := func() {
m.mu.Lock()
defer m.mu.Unlock()
m.unsubscribeInboxLock(id)
delete(m.reqs, id)
m.reqWg.Done()
}
m.mu.Lock()
defer m.mu.Unlock()
m.reqWg.Add(1)
r := newStreamRequest(ctx, id, envelope.EncloseReq(id, filters),
m.subscribeInboxLock(id), m.done, terminate, m.handler)
m.reqs[id] = r
return id, r.Events(), r.Closed()
}
func (m *RequestManager) Query(
filters [][]byte,
timeout time.Duration,
) (
events []ReqEvent,
closed *ReqClosed,
) {
ctx, _ := context.WithTimeout(component.MustExtend(m.ctx, "query"), timeout)
id := generateID()
terminate := func() {
m.mu.Lock()
defer m.mu.Unlock()
m.unsubscribeInboxLock(id)
}
m.mu.Lock()
r := newQueryRequest(ctx, id, envelope.EncloseReq(id, filters),
m.subscribeInboxLock(id), m.done, terminate, m.handler)
m.mu.Unlock()
for {
select {
case <-m.ctx.Done():
return
case rEvent, ok := <-r.Events():
if !ok {
return
}
events = append(events, rEvent)
case rClosed := <-r.Closed():
closed = &rClosed
return
}
}
}
func (m *RequestManager) Cancel(id string) error {
req, ok := m.reqs[id]
if !ok {
return fmt.Errorf("req not found: %s", id)
}
req.Close()
return nil
}
func (m *RequestManager) Close() {
m.cancel()
m.wg.Wait()
}
func (m *RequestManager) start() {
}
func (m *RequestManager) stop() {
}
func (m *RequestManager) subscribeInboxLock(id string) <-chan InboxMessage {
ch := make(chan InboxMessage)
m.inboxSubs[id] = ch
return ch
}
func (m *RequestManager) unsubscribeInboxLock(id string) {
ch, ok := m.inboxSubs[id]
if !ok {
return
}
close(ch)
delete(m.inboxSubs, id)
}
func (m *RequestManager) handleEvents() {
defer m.wg.Done()
for {
select {
case <-m.ctx.Done():
return
case ev := <-m.events:
switch ev.Kind {
case EventConnected:
m.start()
case EventDisconnected:
m.stop()
}
}
}
}
func (m *RequestManager) routeInbox() {
defer m.wg.Done()
for {
select {
case <-m.ctx.Done():
return
case ev, ok := <-m.inbox:
if !ok {
return
}
url, err := honeybee.NormalizeURL(ev.ID)
if err != nil {
continue
}
m.mu.RLock()
sub, ok := m.inboxSubs[url]
m.mu.RUnlock()
if !ok {
continue
}
select {
case <-m.ctx.Done():
return
case sub <- ev:
}
}
}
}
// ----------------------------------------------------------------------------
// Request
// ----------------------------------------------------------------------------
func newStreamRequest(
ctx context.Context,
id string,
req []byte,
inbox <-chan InboxMessage,
stop chan struct{},
terminate func(),
handler slog.Handler,
) *request {
ctx, cancel := context.WithCancel(component.MustExtend(ctx, "request"))
r := &request{
id: id,
req: req,
query: false,
inbox: inbox,
stop: stop,
terminate: terminate,
ctx: ctx,
cancel: cancel,
}
if handler != nil {
comp := component.FromContext(ctx)
r.logger = slog.New(handler).With(slog.Any("component", comp))
}
return r
}
func newQueryRequest(
ctx context.Context,
id string,
req []byte,
inbox <-chan InboxMessage,
stop chan struct{},
terminate func(),
handler slog.Handler,
) *request {
ctx, cancel := context.WithCancel(component.MustExtend(ctx, "request"))
r := &request{
id: id,
req: req,
query: true,
inbox: inbox,
stop: stop,
terminate: terminate,
ctx: ctx,
cancel: cancel,
}
if handler != nil {
comp := component.FromContext(ctx)
r.logger = slog.New(handler).With(slog.Any("component", comp))
}
return r
}
func (r *request) Close() {
r.cancel()
r.wg.Wait()
r.terminate()
}
func (r *request) Events() <-chan ReqEvent {
return r.events
}
func (r *request) Closed() <-chan ReqClosed {
return r.closed
}
-613
View File
@@ -1,613 +0,0 @@
package prism
import (
"context"
"fmt"
"git.wisehodl.dev/jay/go-mana-component"
"git.wisehodl.dev/jay/go-roots-ws"
"github.com/stretchr/testify/assert"
"reflect"
"slices"
"sync"
"sync/atomic"
"testing"
"time"
)
// TODO: remove
var (
_ context.Context
_ assert.Assertions
_ testing.T
_ time.Time
_ fmt.Formatter
)
// Helpers
type reqTestHarness struct {
ctx context.Context
pm *Postmaster
events chan PoolEvent
sent map[string][]string
sentMu *sync.RWMutex
isConnected func(string) bool
collector *JournalCollector
journals <-chan JournalEntry
closed atomic.Bool
}
func setupReqHarness(t *testing.T, peers []string) reqTestHarness {
ctx := component.MustNew(context.Background(), "prism", "test")
pm, poolEvents, sent, sentMu, isConnected := mockReqPostmaster(t, ctx, peers)
collector := NewJournalCollector()
journals := collector.Out()
return reqTestHarness{
ctx: ctx,
pm: pm,
events: poolEvents,
sent: sent,
sentMu: sentMu,
isConnected: isConnected,
collector: collector,
journals: journals,
}
}
func mockReqPostmaster(
t *testing.T,
ctx context.Context,
peers []string,
) (
pm *Postmaster,
poolEvents chan PoolEvent,
sent map[string][]string,
sentMu *sync.RWMutex,
isConnected func(id string) bool,
) {
t.Helper()
poolHasPeer := func(id string) (string, bool) {
if ok := slices.Contains(peers, id); ok {
return id, true
}
return "", false
}
poolEvents = make(chan PoolEvent, 4)
pmEvents := make(chan PoolEvent, 4)
connected := make(map[string]bool)
connMu := sync.RWMutex{}
isConnected = func(id string) bool {
connMu.RLock()
defer connMu.RUnlock()
return connected[id]
}
go func() {
for ev := range poolEvents {
connMu.Lock()
switch ev.Kind {
case EventConnected:
connected[ev.ID] = true
case EventDisconnected:
connected[ev.ID] = false
}
connMu.Unlock()
pmEvents <- ev
}
}()
sent = make(map[string][]string)
sentMu = &sync.RWMutex{}
poolSendFunc := func(id string, data Envelope) error {
sentMu.Lock()
defer sentMu.Unlock()
sent[id] = append(sent[id], string(data))
return nil
}
pm = NewPostmaster(ctx, poolHasPeer, pmEvents, poolSendFunc, nil)
for _, id := range peers {
poolEvents <- PoolEvent{ID: id, Kind: EventAdded, At: time.Now()}
connected[id] = false
}
Eventually(t, func() bool { return len(pm.Peers()) == len(peers) },
"should add peers")
return
}
func expectSentMessage(t *testing.T,
sent map[string][]string,
mu *sync.RWMutex,
peerID string,
msg []byte,
index int,
) {
t.Helper()
Eventually(t, func() bool {
mu.RLock()
defer mu.RUnlock()
if len(sent[peerID]) <= index {
return false
}
return sent[peerID][index] == string(msg)
}, fmt.Sprintf("expected message to be sent to %q: %s", peerID, string(msg)))
}
func neverSentMessage(t *testing.T,
sent map[string][]string,
mu *sync.RWMutex,
peerID string,
msg []byte,
index int,
) {
t.Helper()
Never(t, func() bool {
mu.RLock()
defer mu.RUnlock()
if len(sent[peerID]) <= index {
return false
}
return sent[peerID][index] == string(msg)
}, fmt.Sprintf("unexpected message sent to %q: %s", peerID, string(msg)))
}
func expectJournalEntry(t *testing.T,
journals <-chan JournalEntry,
expected reflect.Type,
) {
t.Helper()
Eventually(t, func() bool {
select {
default:
return false
case entry := <-journals:
got := reflect.TypeOf(entry)
return expected == got
}
}, fmt.Sprintf("expected journal entry: %s", expected))
}
// Tests
func TestStreamReq_InitialReq(t *testing.T) {
t.Run("sends req to one peer", func(t *testing.T) {
peers := []string{"peer"}
h := setupReqHarness(t, peers)
// connect to peer
h.events <- PoolEvent{ID: "peer", Kind: EventConnected, At: time.Now()}
Eventually(t, func() bool { return h.isConnected("peer") },
"expected peer to connect")
// open req
filters := [][]byte{[]byte("{}")}
req := NewStreamReq(
h.ctx, "REQ", filters, peers, h.pm, h.isConnected,
h.collector, func() { h.closed.Store(true) }, nil)
expectedReq := envelope.EncloseReq("REQ", filters)
expectedClose := envelope.EncloseClose("REQ")
expectJournalEntry(t, h.journals, reflect.TypeOf(ReqQueuedJournal{}))
expectJournalEntry(t, h.journals, reflect.TypeOf(ReqSendOutcomeJournal{}))
expectSentMessage(t, h.sent, h.sentMu, "peer", expectedReq, 0)
// close req
req.Close()
expectJournalEntry(t, h.journals, reflect.TypeOf(CloseQueuedJournal{}))
expectJournalEntry(t, h.journals, reflect.TypeOf(CloseSendOutcomeJournal{}))
expectSentMessage(t, h.sent, h.sentMu, "peer", expectedClose, 1)
Eventually(t, func() bool { return h.closed.Load() },
"expected close callback to be called")
})
t.Run("doesn't send to disconnected peer", func(t *testing.T) {
peers := []string{"peer"}
h := setupReqHarness(t, peers)
// open req
filters := [][]byte{[]byte("{}")}
req := NewStreamReq(
h.ctx, "REQ", filters, peers, h.pm, h.isConnected,
h.collector, func() { h.closed.Store(true) }, nil)
expectedReq := envelope.EncloseReq("REQ", filters)
expectedClose := envelope.EncloseClose("REQ")
neverSentMessage(t, h.sent, h.sentMu, "peer", expectedReq, 0)
// close req
req.Close()
neverSentMessage(t, h.sent, h.sentMu, "peer", expectedClose, 0)
Eventually(t, func() bool { return h.closed.Load() },
"expected close callback to be called")
})
t.Run("sends req to multiple peers", func(t *testing.T) {
peers := []string{"peer1", "peer2"}
h := setupReqHarness(t, peers)
// connect to peers
h.events <- PoolEvent{ID: "peer1", Kind: EventConnected, At: time.Now()}
h.events <- PoolEvent{ID: "peer2", Kind: EventConnected, At: time.Now()}
Eventually(t, func() bool { return h.isConnected("peer1") },
"expected peer 1 to connect")
Eventually(t, func() bool { return h.isConnected("peer2") },
"expected peer 2 to connect")
// open req
filters := [][]byte{[]byte("{}")}
req := NewStreamReq(
h.ctx, "REQ", filters, peers, h.pm, h.isConnected,
h.collector, func() { h.closed.Store(true) }, nil)
expectedReq := envelope.EncloseReq("REQ", filters)
expectedClose := envelope.EncloseClose("REQ")
expectSentMessage(t, h.sent, h.sentMu, "peer1", expectedReq, 0)
expectSentMessage(t, h.sent, h.sentMu, "peer2", expectedReq, 0)
// expect two req outcomes
expectJournalEntry(t, h.journals, reflect.TypeOf(ReqSendOutcomeJournal{}))
expectJournalEntry(t, h.journals, reflect.TypeOf(ReqSendOutcomeJournal{}))
// close req
req.Close()
expectSentMessage(t, h.sent, h.sentMu, "peer1", expectedClose, 1)
expectSentMessage(t, h.sent, h.sentMu, "peer2", expectedClose, 1)
Eventually(t, func() bool { return h.closed.Load() },
"expected close callback to be called")
})
}
func TestStreamReq_EventForwarding(t *testing.T) {
t.Run("events are forwarded", func(t *testing.T) {
peers := []string{"peer"}
h := setupReqHarness(t, peers)
filters := [][]byte{[]byte("{}")}
req := NewStreamReq(
h.ctx, "REQ", filters, peers, h.pm, h.isConnected,
h.collector, func() { h.closed.Store(true) }, nil)
// simulate receive event
req.order(newEventTask("peer", time.Now(), []byte("event")))
// receive event
var event ReqEvent
Eventually(t, func() bool {
select {
default:
return false
case event = <-req.events:
return true
}
}, "expected event")
assert.Equal(t, "peer", event.PeerID)
assert.False(t, event.ReceivedAt.IsZero())
assert.Equal(t, []byte("event"), event.Data)
})
t.Run("events channel closes on close", func(t *testing.T) {
peers := []string{"peer"}
h := setupReqHarness(t, peers)
// open req
filters := [][]byte{[]byte("{}")}
req := NewStreamReq(
h.ctx, "REQ", filters, peers, h.pm, h.isConnected,
h.collector, func() { h.closed.Store(true) }, nil)
// close req
req.Close()
Eventually(t, func() bool {
select {
default:
return false
case _, ok := <-req.events:
// expect channel close
return !ok
}
}, "expected event channel to close")
})
}
func TestStreamReq_EOSEHandling(t *testing.T) {
t.Run("eose emits journal", func(t *testing.T) {
peers := []string{"peer"}
h := setupReqHarness(t, peers)
// open req
filters := [][]byte{[]byte("{}")}
req := NewStreamReq(
h.ctx, "REQ", filters, peers, h.pm, h.isConnected,
h.collector, func() { h.closed.Store(true) }, nil)
// simulate EOSE
req.order(newEOSETask("peer", time.Now()))
expectJournalEntry(t, h.journals, reflect.TypeOf(ReceivedEOSEJournal{}))
})
}
func TestStreamReq_ClosedHandling(t *testing.T) {
t.Run("closed forwards message once", func(t *testing.T) {
peers := []string{"peer"}
h := setupReqHarness(t, peers)
// open req
filters := [][]byte{[]byte("{}")}
req := NewStreamReq(
h.ctx, "REQ", filters, peers, h.pm, h.isConnected,
h.collector, func() { h.closed.Store(true) }, nil)
// simulate closed
req.order(newClosedTask("peer", time.Now(), "closed"))
expectJournalEntry(t, h.journals, reflect.TypeOf(ReceivedClosedJournal{}))
// receive message
var message ReqMessage
Eventually(t, func() bool {
select {
default:
return false
case message = <-req.messages:
return true
}
}, "expected closed message")
assert.Equal(t, "peer", message.PeerID)
assert.False(t, message.ReceivedAt.IsZero())
assert.Equal(t, "closed", message.Data)
// multiple closed emit journals
req.order(newClosedTask("peer", time.Now(), "closed"))
expectJournalEntry(t, h.journals, reflect.TypeOf(ReceivedClosedJournal{}))
// but do not emit more than one message to the caller
Never(t, func() bool {
select {
default:
return false
case <-req.messages:
return true
}
}, "second closed message should not arrive")
// close req
req.Close()
// expect messages channel to close
Eventually(t, func() bool {
select {
default:
return false
case _, ok := <-req.messages:
// expect channel close
return !ok
}
}, "expected messages channel to close")
})
}
func TestStreamReq_Reconnect(t *testing.T) {
t.Run("req replays after reconnect", func(t *testing.T) {
peers := []string{"peer"}
h := setupReqHarness(t, peers)
h.events <- PoolEvent{ID: "peer", Kind: EventConnected, At: time.Now()}
Eventually(t, func() bool { return h.isConnected("peer") },
"expected peer to connect")
// open req
filters := [][]byte{[]byte("{}")}
req := NewStreamReq(
h.ctx, "REQ", filters, peers, h.pm, h.isConnected,
h.collector, func() { h.closed.Store(true) }, nil)
expectedReq := envelope.EncloseReq("REQ", filters)
expectedClose := envelope.EncloseClose("REQ")
// initial req is sent
expectSentMessage(t, h.sent, h.sentMu, "peer", expectedReq, 0)
expectJournalEntry(t, h.journals, reflect.TypeOf(ReqSendOutcomeJournal{}))
// cycle disconnect-reconnect
h.events <- PoolEvent{ID: "peer", Kind: EventDisconnected, At: time.Now()}
Eventually(t, func() bool { return !h.isConnected("peer") },
"expected peer to disconnect")
h.events <- PoolEvent{ID: "peer", Kind: EventConnected, At: time.Now()}
Eventually(t, func() bool { return h.isConnected("peer") },
"expected peer to connect")
// simulate req manager handling connect event
req.order(newHandleReconnect("peer"))
// expect replayed req
expectSentMessage(t, h.sent, h.sentMu, "peer", expectedReq, 1)
expectJournalEntry(t, h.journals, reflect.TypeOf(ReqSendOutcomeJournal{}))
// close req
req.Close()
expectSentMessage(t, h.sent, h.sentMu, "peer", expectedClose, 2)
Eventually(t, func() bool { return h.closed.Load() },
"expected close callback to be called")
})
t.Run("delayed connection sends req", func(t *testing.T) {
peers := []string{"peer"}
h := setupReqHarness(t, peers)
// open req
filters := [][]byte{[]byte("{}")}
req := NewStreamReq(
h.ctx, "REQ", filters, peers, h.pm, h.isConnected,
h.collector, func() { h.closed.Store(true) }, nil)
expectedReq := envelope.EncloseReq("REQ", filters)
expectedClose := envelope.EncloseClose("REQ")
neverSentMessage(t, h.sent, h.sentMu, "peer", expectedReq, 0)
// postmaster-side connect
h.events <- PoolEvent{ID: "peer", Kind: EventConnected, At: time.Now()}
Eventually(t, func() bool { return h.isConnected("peer") },
"expected peer to connect")
// simulate req manager handling connect event
req.order(newHandleReconnect("peer"))
expectSentMessage(t, h.sent, h.sentMu, "peer", expectedReq, 0)
expectJournalEntry(t, h.journals, reflect.TypeOf(ReqSendOutcomeJournal{}))
// close req
req.Close()
expectSentMessage(t, h.sent, h.sentMu, "peer", expectedClose, 1)
Eventually(t, func() bool { return h.closed.Load() },
"expected close callback to be called")
})
t.Run("no replay when closing", func(t *testing.T) {
peers := []string{"peer"}
h := setupReqHarness(t, peers)
// open req
filters := [][]byte{[]byte("{}")}
req := NewStreamReq(
h.ctx, "REQ", filters, peers, h.pm, h.isConnected,
h.collector, func() { h.closed.Store(true) }, nil)
expectedReq := envelope.EncloseReq("REQ", filters)
neverSentMessage(t, h.sent, h.sentMu, "peer", expectedReq, 0)
// postmaster-side connect
h.events <- PoolEvent{ID: "peer", Kind: EventConnected, At: time.Now()}
Eventually(t, func() bool { return h.isConnected("peer") },
"expected peer to connect")
// close req
req.Close()
// reconnect during or after close
req.order(newHandleReconnect("peer"))
neverSentMessage(t, h.sent, h.sentMu, "peer", expectedReq, 0)
Eventually(t, func() bool { return h.closed.Load() },
"expected close callback to be called")
})
}
func TestStreamReq_Terminal(t *testing.T) {
}
func TestStreamReq_Close(t *testing.T) {
t.Run("close is idempotent", func(t *testing.T) {
peers := []string{"peer"}
h := setupReqHarness(t, peers)
// connect to peer
h.events <- PoolEvent{ID: "peer", Kind: EventConnected, At: time.Now()}
Eventually(t, func() bool { return h.isConnected("peer") },
"expected peer to connect")
// open req
filters := [][]byte{[]byte("{}")}
req := NewStreamReq(
h.ctx, "REQ", filters, peers, h.pm, h.isConnected,
h.collector, func() { h.closed.Store(true) }, nil)
expectedReq := envelope.EncloseReq("REQ", filters)
expectedClose := envelope.EncloseClose("REQ")
expectSentMessage(t, h.sent, h.sentMu, "peer", expectedReq, 0)
expectJournalEntry(t, h.journals, reflect.TypeOf(ReqSendOutcomeJournal{}))
// close req twice
req.Close()
req.Close()
// only expect one close
expectSentMessage(t, h.sent, h.sentMu, "peer", expectedClose, 1)
// second close never arrives
neverSentMessage(t, h.sent, h.sentMu, "peer", expectedClose, 2)
Eventually(t, func() bool { return h.closed.Load() },
"expected close callback to be called")
})
t.Run("close not sent if req was never sent", func(t *testing.T) {
peers := []string{"peer"}
h := setupReqHarness(t, peers)
// open req
filters := [][]byte{[]byte("{}")}
req := NewStreamReq(
h.ctx, "REQ", filters, peers, h.pm, h.isConnected,
h.collector, func() { h.closed.Store(true) }, nil)
expectedReq := envelope.EncloseReq("REQ", filters)
expectedClose := envelope.EncloseClose("REQ")
neverSentMessage(t, h.sent, h.sentMu, "peer", expectedReq, 0)
// close req
req.Close()
// req was never sent, so a close should not be sent
neverSentMessage(t, h.sent, h.sentMu, "peer", expectedClose, 0)
Eventually(t, func() bool { return h.closed.Load() },
"expected close callback to be called")
})
t.Run("close not sent if req was cancelled", func(t *testing.T) {
peers := []string{"peer"}
h := setupReqHarness(t, peers)
// open req
filters := [][]byte{[]byte("{}")}
req := NewStreamReq(
h.ctx, "REQ", filters, peers, h.pm, h.isConnected,
h.collector, func() { h.closed.Store(true) }, nil)
expectedClose := envelope.EncloseClose("REQ")
// simulate cancelled req outcome
req.order(newReqOutcomeTask("peer", LetterOutcome{Kind: OutcomeCancelled}))
// close req
req.Close()
// req was never sent, so a close should not be sent
neverSentMessage(t, h.sent, h.sentMu, "peer", expectedClose, 0)
Eventually(t, func() bool { return h.closed.Load() },
"expected close callback to be called")
})
}
func TestStreamReq_Journals(t *testing.T) {
}