From 0c08a7ce094fa8ec0bcf78891ffa63792e8b9e37 Mon Sep 17 00:00:00 2001 From: Jay Date: Sun, 10 May 2026 18:47:18 -0400 Subject: [PATCH] wrote postmaster --- adapter.go | 14 +-- courier_test.go | 2 +- embassy_test.go | 14 ++- post.go | 226 +++++++++++++++++++++++++++++++++++++-------- postmaster_test.go | 208 +++++++++++++++++++++++++++++++++++++++++ 5 files changed, 414 insertions(+), 50 deletions(-) create mode 100644 postmaster_test.go diff --git a/adapter.go b/adapter.go index ab9a6d1..5ebb92f 100644 --- a/adapter.go +++ b/adapter.go @@ -64,7 +64,7 @@ var convertPoolEvent = map[honeybee.OutboundPoolEventKind]PoolEventKind{ type Adapter interface { Peers() []string - HasPeer(id string) bool + HasPeer(id string) (string, bool) IsConnected(id string) bool Subscribe() <-chan PoolEvent Send(id string, data Envelope) error @@ -253,17 +253,17 @@ func (e *Embassy) Peers() []string { return peers } -func (e *Embassy) HasPeer(url string) bool { +func (e *Embassy) HasPeer(url string) (string, bool) { url, err := honeybee.NormalizeURL(url) if err != nil { - return false + return "", false } e.mu.RLock() defer e.mu.RUnlock() _, ok := e.peers[url] - return ok + return url, ok } func (e *Embassy) IsConnected(url string) bool { @@ -312,7 +312,7 @@ func (e *Embassy) runEventRouter() { continue } - if !e.HasPeer(url) { + if _, ok := e.HasPeer(url); !ok { continue } @@ -384,8 +384,8 @@ func (h *Hotel) Peers() []string { return nil } -func (h *Hotel) HasPeer(id string) bool { - return false +func (h *Hotel) HasPeer(id string) (string, bool) { + return "", false } func (h *Hotel) IsConnected(id string) bool { diff --git a/courier_test.go b/courier_test.go index 2900da0..0b4988f 100644 --- a/courier_test.go +++ b/courier_test.go @@ -78,7 +78,7 @@ func TestCourierMultipleSends(t *testing.T) { c.HandleConnect() outcomes := make([]LetterOutcome, 0, 2) - called := make(chan LetterOutcome, 4) + called := make(chan LetterOutcome, 2) c.Enqueue(newTestLetter(ctx, 1), func(o LetterOutcome) { called <- o }) c.Enqueue(newTestLetter(ctx, 2), func(o LetterOutcome) { called <- o }) diff --git a/embassy_test.go b/embassy_test.go index 4f1ec7c..cf8bec1 100644 --- a/embassy_test.go +++ b/embassy_test.go @@ -103,7 +103,9 @@ func TestEmbassyPeerRegistry(t *testing.T) { // add e.Dispatch("wss://test") - assert.True(t, e.HasPeer("wss://test")) + url, ok := e.HasPeer("wss://test/") + assert.Equal(t, "wss://test", url) + assert.True(t, ok) assert.False(t, e.IsConnected("wss://test")) // connect @@ -114,7 +116,7 @@ func TestEmbassyPeerRegistry(t *testing.T) { } Eventually(t, func() bool { - exists := e.HasPeer("wss://test") + _, exists := e.HasPeer("wss://test") connected := e.IsConnected("wss://test") return exists && connected }, "expected: exists, connected") @@ -127,7 +129,7 @@ func TestEmbassyPeerRegistry(t *testing.T) { } Eventually(t, func() bool { - exists := e.HasPeer("wss://test") + _, exists := e.HasPeer("wss://test") connected := e.IsConnected("wss://test") return exists && !connected }, "expected: exists, disconnected") @@ -135,7 +137,8 @@ func TestEmbassyPeerRegistry(t *testing.T) { // remove e.Dismiss("wss://test") - assert.False(t, e.HasPeer("wss://test")) + _, ok = e.HasPeer("wss://test") + assert.False(t, ok) assert.False(t, e.IsConnected("wss://test")) } @@ -236,7 +239,8 @@ func TestEmbassyClose(t *testing.T) { }, "expected peer removed") // peer list is empty - assert.False(t, e.HasPeer("wss://test")) + _, ok := e.HasPeer("wss://test") + assert.False(t, ok) assert.Len(t, e.Peers(), 0) // subs close diff --git a/post.go b/post.go index d9c18d8..b805c61 100644 --- a/post.go +++ b/post.go @@ -63,17 +63,19 @@ type LetterOutcome struct { // Postmaster type Postmaster struct { - couriers map[string]*Courier - events <-chan PoolEvent // Adapter.Subscribe - send PoolSendFunc // Adapter.Send - counter atomic.Uint64 + couriers map[string]*Courier + poolHasPeer func(id string) (string, bool) + poolEvents <-chan PoolEvent // Adapter.Subscribe + poolSend PoolSendFunc // Adapter.Send + counter atomic.Uint64 - ctx context.Context - cancel context.CancelFunc - mu sync.Mutex - wg sync.WaitGroup - cfg postmasterConfig - logger *slog.Logger + ctx context.Context + cancel context.CancelFunc + mu sync.RWMutex + wg sync.WaitGroup + cfg postmasterConfig + handler slog.Handler + logger *slog.Logger } // Courier @@ -102,33 +104,190 @@ type courierCommand interface { // Options +const ( + DefaultPostmasterDeadline = 30 * time.Second +) + type PostmasterOption func(*postmasterConfig) -type postmasterConfig struct{} +type postmasterConfig struct { + defaultDeadline time.Duration +} + +func WithDefaultDeadline(d time.Duration) PostmasterOption { + return func(c *postmasterConfig) { c.defaultDeadline = d } +} + +type SendOption func(*sendConfig) + +type sendConfig struct { + deadline time.Duration +} + +func WithDeadline(d time.Duration) SendOption { + return func(c *sendConfig) { c.deadline = d } +} // ---------------------------------------------------------------------------- // Postmaster // ---------------------------------------------------------------------------- func NewPostmaster( - pool *Adapter, - send PoolSendFunc, + ctx context.Context, + poolHasPeer func(id string) (string, bool), + poolEvents <-chan PoolEvent, + poolSendFunc PoolSendFunc, + handler slog.Handler, opts ...PostmasterOption, ) *Postmaster { - return nil + ctx, cancel := context.WithCancel( + component.MustNew(ctx, "prism", "postmaster")) + + cfg := postmasterConfig{ + defaultDeadline: DefaultPostmasterDeadline, + } + for _, opt := range opts { + opt(&cfg) + } + + pm := &Postmaster{ + couriers: make(map[string]*Courier), + poolHasPeer: poolHasPeer, + poolEvents: poolEvents, + poolSend: poolSendFunc, + ctx: ctx, + cancel: cancel, + cfg: cfg, + } + + if handler != nil { + comp, ok := component.Get(ctx) + if ok { + pm.handler = handler + pm.logger = slog.New(handler).With(slog.Any("component", comp)) + } + } + + pm.wg.Add(1) + go pm.handlePoolEvents() + + return pm } -func (m *Postmaster) Send( +func (pm *Postmaster) Send( ctx context.Context, peerID string, data Envelope, - deadline time.Duration, - onOutcome func(LetterOutcome), // should be non-blocking -) (LetterID, error) { - return 0, nil + onOutcome func(LetterOutcome), + opts ...SendOption, +) (context.CancelFunc, error) { + cfg := sendConfig{deadline: pm.cfg.defaultDeadline} + for _, opt := range opts { + opt(&cfg) + } + + pm.mu.RLock() + defer pm.mu.RUnlock() + + // check if peer courier exists + peerID, ok := pm.poolHasPeer(peerID) + if !ok { + return nil, fmt.Errorf("peer not found") + } + courier, ok := pm.couriers[peerID] + if !ok { + return nil, fmt.Errorf("peer not found") + } + + ctx, cancel := context.WithTimeout(ctx, cfg.deadline) + letter := OutboundLetter{ + id: pm.counter.Add(1), + peerID: peerID, + data: data, + ctx: ctx, + cancel: cancel, + } + + courier.Enqueue(letter, onOutcome) + + return cancel, nil } -func (m *Postmaster) Close() {} +func (pm *Postmaster) Peers() []string { + pm.mu.RLock() + defer pm.mu.RUnlock() + + peers := make([]string, 0, len(pm.couriers)) + for id, _ := range pm.couriers { + peers = append(peers, id) + } + return peers +} + +func (pm *Postmaster) Close() { + pm.cancel() + pm.wg.Wait() + + // close each courier + pm.mu.Lock() + couriers := pm.couriers + pm.couriers = make(map[string]*Courier) + pm.mu.Unlock() + + for _, courier := range couriers { + courier.Close() + } +} + +func (pm *Postmaster) handlePoolEvents() { + defer pm.wg.Done() + + for { + select { + case <-pm.ctx.Done(): + return + case ev := <-pm.poolEvents: + switch ev.Kind { + case EventAdded: + pm.mu.Lock() + _, exists := pm.couriers[ev.ID] + if exists { + pm.mu.Unlock() + continue + } + send := func(data Envelope) error { return pm.poolSend(ev.ID, data) } + courier := NewCourier(pm.ctx, send, pm.handler) + pm.couriers[ev.ID] = courier + pm.mu.Unlock() + + case EventRemoved: + pm.mu.Lock() + courier, exists := pm.couriers[ev.ID] + if exists { + delete(pm.couriers, ev.ID) + } + pm.mu.Unlock() + courier.Close() + + case EventConnected: + pm.mu.RLock() + courier, exists := pm.couriers[ev.ID] + if exists { + courier.HandleConnect() + } + pm.mu.RUnlock() + + case EventDisconnected: + pm.mu.RLock() + courier, exists := pm.couriers[ev.ID] + if exists { + courier.HandleDisconnect() + } + pm.mu.RUnlock() + } + } + } +} // ---------------------------------------------------------------------------- // Courier @@ -201,9 +360,18 @@ func (c *Courier) HandleDisconnect() { } func (c *Courier) Close() { - c.command(&cmdCloseCourier{}) c.cancel() c.wg.Wait() + // cancel remaining letters + for { + t, ok := c.pop() + if !ok { + break + } + t.letter.cancel() + t.setMissedAt(time.Now()) + c.doneOnce(t) + } } // Internal @@ -211,7 +379,6 @@ func (c *Courier) Close() { func (c *Courier) command(cmd courierCommand) { select { case <-c.ctx.Done(): - fmt.Println("here") case c.cmd <- cmd: } } @@ -368,18 +535,3 @@ func (cmd cmdHandleSendResult) apply(c *Courier) { c.doneOnce(cmd.traveller) } } - -type cmdCloseCourier struct{} - -func (cmd cmdCloseCourier) apply(c *Courier) { - // cancel remaining letters - for { - t, ok := c.pop() - if !ok { - break - } - t.letter.cancel() - t.setMissedAt(time.Now()) - c.doneOnce(t) - } -} diff --git a/postmaster_test.go b/postmaster_test.go new file mode 100644 index 0000000..d6f3668 --- /dev/null +++ b/postmaster_test.go @@ -0,0 +1,208 @@ +package prism + +import ( + "context" + // "git.wisehodl.dev/jay/go-mana-component" + "github.com/stretchr/testify/assert" + // "sync/atomic" + "testing" + "time" +) + +func TestPostmasterUnknownPeerSend(t *testing.T) { + ctx := context.Background() + poolHasPeer := func(id string) (string, bool) { return id, true } + poolEvents := make(chan PoolEvent, 4) + poolSendFunc := func(id string, data Envelope) error { return nil } + + pm := NewPostmaster(ctx, poolHasPeer, poolEvents, poolSendFunc, nil) + + _, err := pm.Send(ctx, "wss://test", []byte("[]"), func(LetterOutcome) {}) + assert.Error(t, err) +} + +func TestPostmasterSend(t *testing.T) { + ctx := context.Background() + poolHasPeer := func(id string) (string, bool) { return id, true } + poolEvents := make(chan PoolEvent, 4) + poolSendFunc := func(id string, data Envelope) error { return nil } + + pm := NewPostmaster(ctx, poolHasPeer, poolEvents, poolSendFunc, nil) + + poolEvents <- PoolEvent{ + ID: "wss://test", Kind: EventAdded, At: time.Now()} + poolEvents <- PoolEvent{ + ID: "wss://test", Kind: EventConnected, At: time.Now()} + + Eventually(t, func() bool { return len(pm.Peers()) > 0 }, + "should add peer") + + called := make(chan LetterOutcome, 1) + _, err := pm.Send( + ctx, "wss://test", []byte("[]"), func(o LetterOutcome) { called <- o }) + assert.NoError(t, err) + + var outcome LetterOutcome + Eventually(t, func() bool { + select { + default: + return false + case outcome = <-called: + return true + } + }, "should have received outcome") + + assert.Equal(t, OutcomeSent, outcome.Kind) +} + +func TestPostmasterPeerRemoved(t *testing.T) { + ctx := context.Background() + poolHasPeer := func(id string) (string, bool) { return id, true } + poolEvents := make(chan PoolEvent, 4) + poolSendFunc := func(id string, data Envelope) error { return nil } + + pm := NewPostmaster(ctx, poolHasPeer, poolEvents, poolSendFunc, nil) + + // add peer, but do not connect + poolEvents <- PoolEvent{ + ID: "wss://test", Kind: EventAdded, At: time.Now()} + Eventually(t, func() bool { return len(pm.Peers()) > 0 }, + "should add peer") + + // send two letters + outcomes := make([]LetterOutcome, 0, 2) + called := make(chan LetterOutcome, 2) + _, err := pm.Send( + ctx, "wss://test", []byte("[]"), func(o LetterOutcome) { called <- o }) + assert.NoError(t, err) + _, err = pm.Send( + ctx, "wss://test", []byte("[]"), func(o LetterOutcome) { called <- o }) + assert.NoError(t, err) + + // wait for them to hit the courier queue + time.Sleep(100 * time.Millisecond) + + // remove the peer + poolEvents <- PoolEvent{ + ID: "wss://test", Kind: EventRemoved, At: time.Now()} + + // expect each letter to return cancelled + Eventually(t, func() bool { + select { + default: + return false + case o := <-called: + outcomes = append(outcomes, o) + return len(outcomes) == 2 + } + }, "should have returned 2 outcomes") + + if len(outcomes) >= 2 { + assert.Equal(t, OutcomeCancelled, outcomes[0].Kind) + assert.Equal(t, OutcomeCancelled, outcomes[1].Kind) + } + + // subsequent sends should fail + _, err = pm.Send( + ctx, "wss://test", []byte("[]"), func(o LetterOutcome) { called <- o }) + assert.Error(t, err) +} + +func TestPostmasterCourierCloseRace(t *testing.T) { + ctx := context.Background() + poolHasPeer := func(id string) (string, bool) { return id, true } + poolEvents := make(chan PoolEvent, 4) + poolSendFunc := func(id string, data Envelope) error { return nil } + + pm := NewPostmaster(ctx, poolHasPeer, poolEvents, poolSendFunc, nil) + + // add peer, but do not connect + poolEvents <- PoolEvent{ + ID: "wss://test", Kind: EventAdded, At: time.Now()} + Eventually(t, func() bool { return len(pm.Peers()) > 0 }, + "should add peer") + + // remove the peer + poolEvents <- PoolEvent{ + ID: "wss://test", Kind: EventRemoved, At: time.Now()} + + // send a letter + time.Sleep(5 * time.Microsecond) // small wait lines up the race condition + var outcome LetterOutcome + called := make(chan LetterOutcome, 1) + _, err := pm.Send( + ctx, "wss://test", []byte("[]"), func(o LetterOutcome) { called <- o }) + + if err != nil { + // the close won the race, the letter was not sent + return + } + + // the letter might beat the courier close and return cancelled + Eventually(t, func() bool { + select { + default: + return false + case outcome = <-called: + return true + } + }, "should have returned 1 outcomes") + + if outcome.LetterID == 0 { + t.Fatal("did not receive an outcome") + } + + assert.Equal(t, OutcomeCancelled, outcome.Kind) +} + +func TestPostmasterClose(t *testing.T) { + ctx := context.Background() + poolHasPeer := func(id string) (string, bool) { return id, true } + poolEvents := make(chan PoolEvent, 4) + poolSendFunc := func(id string, data Envelope) error { return nil } + + pm := NewPostmaster(ctx, poolHasPeer, poolEvents, poolSendFunc, nil) + + // add peer, but do not connect + poolEvents <- PoolEvent{ + ID: "wss://test", Kind: EventAdded, At: time.Now()} + Eventually(t, func() bool { return len(pm.Peers()) > 0 }, + "should add peer") + + // send two letters + outcomes := make([]LetterOutcome, 0, 2) + called := make(chan LetterOutcome, 2) + _, err := pm.Send( + ctx, "wss://test", []byte("[]"), func(o LetterOutcome) { called <- o }) + assert.NoError(t, err) + _, err = pm.Send( + ctx, "wss://test", []byte("[]"), func(o LetterOutcome) { called <- o }) + assert.NoError(t, err) + + // wait for them to hit the courier queue + time.Sleep(100 * time.Millisecond) + + // close postmaster + pm.Close() + + // expect each letter to return cancelled + Eventually(t, func() bool { + select { + default: + return false + case o := <-called: + outcomes = append(outcomes, o) + return len(outcomes) == 2 + } + }, "should have returned 2 outcomes") + + if len(outcomes) >= 2 { + assert.Equal(t, OutcomeCancelled, outcomes[0].Kind) + assert.Equal(t, OutcomeCancelled, outcomes[1].Kind) + } + + // subsequent sends should fail + _, err = pm.Send( + ctx, "wss://test", []byte("[]"), func(o LetterOutcome) { called <- o }) + assert.Error(t, err) +}