From d7283c1c61663cb1bf33d22397eaa9189ffd4455 Mon Sep 17 00:00:00 2001 From: Jay Date: Mon, 11 May 2026 09:56:46 -0400 Subject: [PATCH] cleaned up terminology, added cancel and expire tests, fixed send outcome behavior --- post.go | 168 +++++++++++++++++++------------------ postmaster_test.go | 203 +++++++++++++++++++++++++++++++-------------- 2 files changed, 229 insertions(+), 142 deletions(-) diff --git a/post.go b/post.go index b805c61..1d17a10 100644 --- a/post.go +++ b/post.go @@ -3,7 +3,6 @@ package prism import ( "container/list" "context" - "fmt" "git.wisehodl.dev/jay/go-mana-component" "log/slog" "sync" @@ -81,7 +80,7 @@ type Postmaster struct { // Courier type Courier struct { - cmd chan courierCommand + task chan courierTask sendFunc func(data Envelope) error // state @@ -91,15 +90,14 @@ type Courier struct { ctx context.Context cancel context.CancelFunc - mu sync.Mutex wg sync.WaitGroup logger *slog.Logger } -// Commands +// Messages -type courierCommand interface { - apply(c *Courier) +type courierTask interface { + dispatch(c *Courier) } // Options @@ -178,9 +176,9 @@ func (pm *Postmaster) Send( ctx context.Context, peerID string, data Envelope, - onOutcome func(LetterOutcome), + callback func(LetterOutcome), opts ...SendOption, -) (context.CancelFunc, error) { +) context.CancelFunc { cfg := sendConfig{deadline: pm.cfg.defaultDeadline} for _, opt := range opts { opt(&cfg) @@ -192,11 +190,13 @@ func (pm *Postmaster) Send( // check if peer courier exists peerID, ok := pm.poolHasPeer(peerID) if !ok { - return nil, fmt.Errorf("peer not found") + go callback(LetterOutcome{PeerID: peerID, Kind: OutcomeRejected}) + return func() {} } courier, ok := pm.couriers[peerID] if !ok { - return nil, fmt.Errorf("peer not found") + go callback(LetterOutcome{PeerID: peerID, Kind: OutcomeRejected}) + return func() {} } ctx, cancel := context.WithTimeout(ctx, cfg.deadline) @@ -208,9 +208,9 @@ func (pm *Postmaster) Send( cancel: cancel, } - courier.Enqueue(letter, onOutcome) + courier.Enqueue(letter, callback) - return cancel, nil + return cancel } func (pm *Postmaster) Peers() []string { @@ -293,9 +293,9 @@ func (pm *Postmaster) handlePoolEvents() { // Courier // ---------------------------------------------------------------------------- -// Traveller +// Letter State -type letterTraveller struct { +type letterState struct { letter OutboundLetter onOutcome func(LetterOutcome) @@ -305,13 +305,13 @@ type letterTraveller struct { once sync.Once } -func (t *letterTraveller) isCancelled() bool { - return t.letter.ctx.Err() != nil +func (s *letterState) isCancelled() bool { + return s.letter.ctx.Err() != nil } -func (t *letterTraveller) countRetry() { t.retries++ } -func (t *letterTraveller) setSentAt(at time.Time) { t.sentAt = at } -func (t *letterTraveller) setMissedAt(at time.Time) { t.missedAt = at } +func (s *letterState) countRetry() { s.retries++ } +func (s *letterState) setSentAt(at time.Time) { s.sentAt = at } +func (s *letterState) setMissedAt(at time.Time) { s.missedAt = at } // Courier @@ -324,7 +324,7 @@ func NewCourier( component.MustExtend(ctx, "courier")) c := &Courier{ - cmd: make(chan courierCommand, 64), + task: make(chan courierTask, 64), sendFunc: sendFunc, ctx: ctx, cancel: cancel, @@ -344,42 +344,33 @@ func NewCourier( } func (c *Courier) Enqueue(letter OutboundLetter, onOutcome func(LetterOutcome)) { - traveller := &letterTraveller{ + wrappedLetter := &letterState{ letter: letter, onOutcome: onOutcome, } - c.command(cmdEnqueue{traveller: traveller}) + c.order(taskEnqueue{letter: wrappedLetter}) } func (c *Courier) HandleConnect() { - c.command(cmdHandleConnect{}) + c.order(taskConnected{}) } func (c *Courier) HandleDisconnect() { - c.command(cmdHandleDisconnect{}) + c.order(taskDisconnected{}) } func (c *Courier) Close() { c.cancel() c.wg.Wait() - // cancel remaining letters - for { - t, ok := c.pop() - if !ok { - break - } - t.letter.cancel() - t.setMissedAt(time.Now()) - c.doneOnce(t) - } + c.terminate() } // Internal -func (c *Courier) command(cmd courierCommand) { +func (c *Courier) order(task courierTask) { select { case <-c.ctx.Done(): - case c.cmd <- cmd: + case c.task <- task: } } @@ -390,8 +381,8 @@ func (c *Courier) run() { select { case <-c.ctx.Done(): return - case cmd := <-c.cmd: - cmd.apply(c) + case task := <-c.task: + task.dispatch(c) c.maybeSend() } } @@ -403,28 +394,28 @@ func (c *Courier) maybeSend() { return } - t, ok := c.pop() + s, ok := c.pop() if !ok { return } c.sending = true c.wg.Add(1) - go c.sendOnce(t) + go c.sendOnce(s) } -func (c *Courier) sendOnce(t *letterTraveller) { +func (c *Courier) sendOnce(s *letterState) { defer c.wg.Done() - err := c.sendFunc(t.letter.data) - c.command(cmdHandleSendResult{traveller: t, at: time.Now(), err: err}) + err := c.sendFunc(s.letter.data) + c.order(taskHandleSendResult{letter: s, at: time.Now(), err: err}) } -func (c *Courier) doneOnce(t *letterTraveller) { +func (c *Courier) doneOnce(s *letterState) { var kind LetterOutcomeKind - if t.isCancelled() { + if s.isCancelled() { // letter was cancelled - if t.letter.ctx.Err() == context.DeadlineExceeded { + if s.letter.ctx.Err() == context.DeadlineExceeded { // letter expired kind = OutcomeExpired } else { @@ -437,20 +428,33 @@ func (c *Courier) doneOnce(t *letterTraveller) { } outcome := LetterOutcome{ - LetterID: t.letter.id, - PeerID: t.letter.peerID, + LetterID: s.letter.id, + PeerID: s.letter.peerID, Kind: kind, - SentAt: t.sentAt, - MissedAt: t.missedAt, - Retries: t.retries, + SentAt: s.sentAt, + MissedAt: s.missedAt, + Retries: s.retries, } - t.once.Do(func() { - t.letter.cancel() - go t.onOutcome(outcome) + s.once.Do(func() { + s.letter.cancel() + go s.onOutcome(outcome) }) } +func (c *Courier) terminate() { + // cancel remaining letters + for { + s, ok := c.pop() + if !ok { + break + } + s.letter.cancel() + s.setMissedAt(time.Now()) + c.doneOnce(s) + } +} + // Helpers func (c *Courier) preflight() bool { @@ -467,71 +471,71 @@ func (c *Courier) drain() { return } - t := front.Value.(*letterTraveller) - if !t.isCancelled() { + s := front.Value.(*letterState) + if !s.isCancelled() { return } - t.setMissedAt(time.Now()) - c.doneOnce(t) + s.setMissedAt(time.Now()) + c.doneOnce(s) c.queue.Remove(front) } } -func (c *Courier) pop() (*letterTraveller, bool) { +func (c *Courier) pop() (*letterState, bool) { for { front := c.queue.Front() if front == nil { return nil, false } - t := front.Value.(*letterTraveller) + s := front.Value.(*letterState) c.queue.Remove(front) - if !t.isCancelled() { - return t, true + if !s.isCancelled() { + return s, true } - t.setMissedAt(time.Now()) - c.doneOnce(t) + s.setMissedAt(time.Now()) + c.doneOnce(s) } } // ---------------------------------------------------------------------------- -// Commands +// Courier Messages // ---------------------------------------------------------------------------- -type cmdEnqueue struct{ traveller *letterTraveller } +type taskEnqueue struct{ letter *letterState } -func (cmd cmdEnqueue) apply(c *Courier) { - c.queue.PushBack(cmd.traveller) +func (t taskEnqueue) dispatch(c *Courier) { + c.queue.PushBack(t.letter) } -type cmdHandleConnect struct{} +type taskConnected struct{} -func (cmd cmdHandleConnect) apply(c *Courier) { +func (t taskConnected) dispatch(c *Courier) { c.connected = true } -type cmdHandleDisconnect struct{} +type taskDisconnected struct{} -func (cmd cmdHandleDisconnect) apply(c *Courier) { +func (t taskDisconnected) dispatch(c *Courier) { c.connected = false } -type cmdHandleSendResult struct { - traveller *letterTraveller - at time.Time - err error +type taskHandleSendResult struct { + letter *letterState + at time.Time + err error } -func (cmd cmdHandleSendResult) apply(c *Courier) { +func (t taskHandleSendResult) dispatch(c *Courier) { c.sending = false - if cmd.err != nil { - cmd.traveller.countRetry() - c.queue.PushFront(cmd.traveller) + if t.err != nil { + t.letter.countRetry() + c.queue.PushFront(t.letter) } else { - cmd.traveller.setSentAt(cmd.at) - c.doneOnce(cmd.traveller) + t.letter.setSentAt(t.at) + c.doneOnce(t.letter) } } diff --git a/postmaster_test.go b/postmaster_test.go index d6f3668..9fbd14c 100644 --- a/postmaster_test.go +++ b/postmaster_test.go @@ -2,13 +2,13 @@ package prism import ( "context" - // "git.wisehodl.dev/jay/go-mana-component" "github.com/stretchr/testify/assert" - // "sync/atomic" "testing" "time" ) +const testURL = "wss://test" + func TestPostmasterUnknownPeerSend(t *testing.T) { ctx := context.Background() poolHasPeer := func(id string) (string, bool) { return id, true } @@ -17,8 +17,20 @@ func TestPostmasterUnknownPeerSend(t *testing.T) { pm := NewPostmaster(ctx, poolHasPeer, poolEvents, poolSendFunc, nil) - _, err := pm.Send(ctx, "wss://test", []byte("[]"), func(LetterOutcome) {}) - assert.Error(t, err) + called := make(chan LetterOutcome, 1) + pm.Send(ctx, testURL, nil, func(o LetterOutcome) { called <- o }) + + var outcome LetterOutcome + Eventually(t, func() bool { + select { + default: + return false + case outcome = <-called: + return true + } + }, "should have received outcome") + + assert.Equal(t, OutcomeRejected, outcome.Kind) } func TestPostmasterSend(t *testing.T) { @@ -29,18 +41,13 @@ func TestPostmasterSend(t *testing.T) { pm := NewPostmaster(ctx, poolHasPeer, poolEvents, poolSendFunc, nil) - poolEvents <- PoolEvent{ - ID: "wss://test", Kind: EventAdded, At: time.Now()} - poolEvents <- PoolEvent{ - ID: "wss://test", Kind: EventConnected, At: time.Now()} + poolEvents <- PoolEvent{ID: testURL, Kind: EventAdded, At: time.Now()} + poolEvents <- PoolEvent{ID: testURL, Kind: EventConnected, At: time.Now()} - Eventually(t, func() bool { return len(pm.Peers()) > 0 }, - "should add peer") + Eventually(t, func() bool { return len(pm.Peers()) > 0 }, "should add peer") called := make(chan LetterOutcome, 1) - _, err := pm.Send( - ctx, "wss://test", []byte("[]"), func(o LetterOutcome) { called <- o }) - assert.NoError(t, err) + pm.Send(ctx, testURL, nil, func(o LetterOutcome) { called <- o }) var outcome LetterOutcome Eventually(t, func() bool { @@ -55,6 +62,78 @@ func TestPostmasterSend(t *testing.T) { assert.Equal(t, OutcomeSent, outcome.Kind) } +func TestPostmasterCancelInFlight(t *testing.T) { + ctx := context.Background() + poolHasPeer := func(id string) (string, bool) { return id, true } + poolEvents := make(chan PoolEvent, 4) + poolSendFunc := func(id string, data Envelope) error { return nil } + + pm := NewPostmaster(ctx, poolHasPeer, poolEvents, poolSendFunc, nil) + + poolEvents <- PoolEvent{ID: testURL, Kind: EventAdded, At: time.Now()} + Eventually(t, func() bool { return len(pm.Peers()) > 0 }, "should add peer") + + called := make(chan LetterOutcome, 1) + cancel := pm.Send(ctx, testURL, nil, func(o LetterOutcome) { called <- o }) + + // wait for letter to queue + time.Sleep(100 * time.Millisecond) + + // cancel the letter using its callback + cancel() + + // connect the pool + poolEvents <- PoolEvent{ID: testURL, Kind: EventConnected, At: time.Now()} + + var outcome LetterOutcome + Eventually(t, func() bool { + select { + default: + return false + case outcome = <-called: + return true + } + }, "should have received outcome") + + // letter should drain out of the queue and return cancelled + assert.Equal(t, OutcomeCancelled, outcome.Kind) +} + +func TestPostmasterExpire(t *testing.T) { + ctx := context.Background() + poolHasPeer := func(id string) (string, bool) { return id, true } + poolEvents := make(chan PoolEvent, 4) + poolSendFunc := func(id string, data Envelope) error { return nil } + + pm := NewPostmaster(ctx, poolHasPeer, poolEvents, poolSendFunc, nil) + + poolEvents <- PoolEvent{ID: testURL, Kind: EventAdded, At: time.Now()} + Eventually(t, func() bool { return len(pm.Peers()) > 0 }, "should add peer") + + called := make(chan LetterOutcome, 1) + pm.Send(ctx, testURL, nil, func(o LetterOutcome) { called <- o }, + WithDeadline(1*time.Millisecond)) + + // wait for letter to queue and expire + time.Sleep(100 * time.Millisecond) + + // connect the pool + poolEvents <- PoolEvent{ID: testURL, Kind: EventConnected, At: time.Now()} + + var outcome LetterOutcome + Eventually(t, func() bool { + select { + default: + return false + case outcome = <-called: + return true + } + }, "should have received outcome") + + // letter should drain out of the queue and return expired + assert.Equal(t, OutcomeExpired, outcome.Kind) +} + func TestPostmasterPeerRemoved(t *testing.T) { ctx := context.Background() poolHasPeer := func(id string) (string, bool) { return id, true } @@ -64,27 +143,20 @@ func TestPostmasterPeerRemoved(t *testing.T) { pm := NewPostmaster(ctx, poolHasPeer, poolEvents, poolSendFunc, nil) // add peer, but do not connect - poolEvents <- PoolEvent{ - ID: "wss://test", Kind: EventAdded, At: time.Now()} - Eventually(t, func() bool { return len(pm.Peers()) > 0 }, - "should add peer") + poolEvents <- PoolEvent{ID: testURL, Kind: EventAdded, At: time.Now()} + Eventually(t, func() bool { return len(pm.Peers()) > 0 }, "should add peer") // send two letters outcomes := make([]LetterOutcome, 0, 2) called := make(chan LetterOutcome, 2) - _, err := pm.Send( - ctx, "wss://test", []byte("[]"), func(o LetterOutcome) { called <- o }) - assert.NoError(t, err) - _, err = pm.Send( - ctx, "wss://test", []byte("[]"), func(o LetterOutcome) { called <- o }) - assert.NoError(t, err) + pm.Send(ctx, testURL, nil, func(o LetterOutcome) { called <- o }) + pm.Send(ctx, testURL, nil, func(o LetterOutcome) { called <- o }) // wait for them to hit the courier queue time.Sleep(100 * time.Millisecond) // remove the peer - poolEvents <- PoolEvent{ - ID: "wss://test", Kind: EventRemoved, At: time.Now()} + poolEvents <- PoolEvent{ID: testURL, Kind: EventRemoved, At: time.Now()} // expect each letter to return cancelled Eventually(t, func() bool { @@ -103,9 +175,19 @@ func TestPostmasterPeerRemoved(t *testing.T) { } // subsequent sends should fail - _, err = pm.Send( - ctx, "wss://test", []byte("[]"), func(o LetterOutcome) { called <- o }) - assert.Error(t, err) + pm.Send(ctx, testURL, nil, func(o LetterOutcome) { called <- o }) + + var outcome LetterOutcome + Eventually(t, func() bool { + select { + default: + return false + case outcome = <-called: + return true + } + }, "should have received outcome") + + assert.Equal(t, OutcomeRejected, outcome.Kind) } func TestPostmasterCourierCloseRace(t *testing.T) { @@ -117,42 +199,39 @@ func TestPostmasterCourierCloseRace(t *testing.T) { pm := NewPostmaster(ctx, poolHasPeer, poolEvents, poolSendFunc, nil) // add peer, but do not connect - poolEvents <- PoolEvent{ - ID: "wss://test", Kind: EventAdded, At: time.Now()} - Eventually(t, func() bool { return len(pm.Peers()) > 0 }, - "should add peer") + poolEvents <- PoolEvent{ID: testURL, Kind: EventAdded, At: time.Now()} + Eventually(t, func() bool { return len(pm.Peers()) > 0 }, "should add peer") // remove the peer - poolEvents <- PoolEvent{ - ID: "wss://test", Kind: EventRemoved, At: time.Now()} + poolEvents <- PoolEvent{ID: testURL, Kind: EventRemoved, At: time.Now()} // send a letter time.Sleep(5 * time.Microsecond) // small wait lines up the race condition - var outcome LetterOutcome + var outcome *LetterOutcome called := make(chan LetterOutcome, 1) - _, err := pm.Send( - ctx, "wss://test", []byte("[]"), func(o LetterOutcome) { called <- o }) + pm.Send(ctx, testURL, nil, func(o LetterOutcome) { called <- o }) - if err != nil { - // the close won the race, the letter was not sent - return - } - - // the letter might beat the courier close and return cancelled Eventually(t, func() bool { select { default: return false - case outcome = <-called: + case o := <-called: + outcome = &o return true } }, "should have returned 1 outcomes") - if outcome.LetterID == 0 { + if outcome == nil { t.Fatal("did not receive an outcome") } - assert.Equal(t, OutcomeCancelled, outcome.Kind) + // depending on the race, the outcome could be: + // close, then send: send is rejected by the postmaster + // send, then close: send is cancelled by the courier + assert.Contains(t, + []LetterOutcomeKind{OutcomeCancelled, OutcomeRejected}, + outcome.Kind, + ) } func TestPostmasterClose(t *testing.T) { @@ -164,20 +243,14 @@ func TestPostmasterClose(t *testing.T) { pm := NewPostmaster(ctx, poolHasPeer, poolEvents, poolSendFunc, nil) // add peer, but do not connect - poolEvents <- PoolEvent{ - ID: "wss://test", Kind: EventAdded, At: time.Now()} - Eventually(t, func() bool { return len(pm.Peers()) > 0 }, - "should add peer") + poolEvents <- PoolEvent{ID: testURL, Kind: EventAdded, At: time.Now()} + Eventually(t, func() bool { return len(pm.Peers()) > 0 }, "should add peer") // send two letters outcomes := make([]LetterOutcome, 0, 2) called := make(chan LetterOutcome, 2) - _, err := pm.Send( - ctx, "wss://test", []byte("[]"), func(o LetterOutcome) { called <- o }) - assert.NoError(t, err) - _, err = pm.Send( - ctx, "wss://test", []byte("[]"), func(o LetterOutcome) { called <- o }) - assert.NoError(t, err) + pm.Send(ctx, testURL, nil, func(o LetterOutcome) { called <- o }) + pm.Send(ctx, testURL, nil, func(o LetterOutcome) { called <- o }) // wait for them to hit the courier queue time.Sleep(100 * time.Millisecond) @@ -201,8 +274,18 @@ func TestPostmasterClose(t *testing.T) { assert.Equal(t, OutcomeCancelled, outcomes[1].Kind) } - // subsequent sends should fail - _, err = pm.Send( - ctx, "wss://test", []byte("[]"), func(o LetterOutcome) { called <- o }) - assert.Error(t, err) + // subsequent sends should be rejected + pm.Send(ctx, testURL, nil, func(o LetterOutcome) { called <- o }) + + var outcome LetterOutcome + Eventually(t, func() bool { + select { + default: + return false + case outcome = <-called: + return true + } + }, "should have received outcome") + + assert.Equal(t, OutcomeRejected, outcome.Kind) }