diff --git a/courier_test.go b/courier_test.go new file mode 100644 index 0000000..e8c4c49 --- /dev/null +++ b/courier_test.go @@ -0,0 +1,84 @@ +package prism + +import ( + "context" + "git.wisehodl.dev/jay/go-mana-component" + "github.com/stretchr/testify/assert" + "testing" + // "time" +) + +// Helpers + +func newTestLetter(ctx context.Context, id uint64) OutboundLetter { + ctx, cancel := context.WithCancel( + component.MustExtend(ctx, "test_letter")) + return OutboundLetter{ + id: id, + peerID: "wss://test", + data: []byte("[]"), + ctx: ctx, + cancel: cancel, + } +} + +// Tests + +func TestCourierSendsAfterConnect(t *testing.T) { + ctx := component.MustNew(context.Background(), "prism", "test") + + sent := make(chan []byte, 1) + sendFunc := func(data Envelope) error { + sent <- data + return nil + } + + c := NewCourier(ctx, sendFunc, nil) + called := make(chan LetterOutcome, 1) + c.Enqueue(newTestLetter(ctx, 1), func(o LetterOutcome) { called <- o }) + + Never(t, func() bool { return len(sent) > 0 }, + "should not have sent while disconnected") + + c.HandleConnect() + + Eventually(t, func() bool { return len(sent) > 0 }, + "should have sent after connect") + + var outcome LetterOutcome + Eventually(t, func() bool { + select { + default: + return false + case outcome = <-called: + return true + } + }, "should have returned outcome") + + assert.Equal(t, uint64(1), outcome.LetterID) + assert.Equal(t, "wss://test", outcome.PeerID) + assert.Equal(t, "sent", outcome.Kind.String()) + assert.False(t, outcome.SentAt.IsZero()) + assert.True(t, outcome.MissedAt.IsZero()) + assert.Equal(t, 0, outcome.Retries) +} + +func TestCourierSequentialSends(t *testing.T) { + +} + +func TestCourierSkipsCancelledLetter(t *testing.T) { + +} + +func TestCourierRetryOnFailure(t *testing.T) { + +} + +func TestCourierPauseOnDisconnect(t *testing.T) { + +} + +func TestCourierDrainOnClose(t *testing.T) { + +} diff --git a/post.go b/post.go index c4ef679..464031c 100644 --- a/post.go +++ b/post.go @@ -3,6 +3,7 @@ package prism import ( "container/list" "context" + "git.wisehodl.dev/jay/go-mana-component" "log/slog" "sync" "sync/atomic" @@ -18,21 +19,50 @@ import ( type LetterID = uint64 type OutboundLetter struct { -} - -type letterRecord struct { + id uint64 + peerID string + data Envelope + ctx context.Context + cancel context.CancelFunc } type LetterOutcomeKind int +const ( + OutcomeSent LetterOutcomeKind = iota + OutcomeExpired + OutcomeCancelled + OutcomeRejected +) + +func (k LetterOutcomeKind) String() string { + switch k { + case OutcomeSent: + return "sent" + case OutcomeExpired: + return "expired" + case OutcomeCancelled: + return "cancelled" + case OutcomeRejected: + return "rejected" + default: + return "unknown" + } +} + type LetterOutcome struct { + LetterID uint64 + PeerID string + Kind LetterOutcomeKind + SentAt time.Time + MissedAt time.Time + Retries int } // Postmaster type Postmaster struct { couriers map[string]*Courier - letters map[LetterID]letterRecord events <-chan PoolEvent // Adapter.Subscribe send PoolSendFunc // Adapter.Send counter atomic.Uint64 @@ -48,10 +78,8 @@ type Postmaster struct { // Courier type Courier struct { - id string // peer id - master *Postmaster - cmd chan courierCommand - send func(data Envelope) error + cmd chan courierCommand + sendFunc func(data Envelope) error // state queue list.List @@ -104,14 +132,235 @@ func (m *Postmaster) Close() {} // Courier // ---------------------------------------------------------------------------- -func NewCourier() *Courier { - return nil +// Traveller + +type letterTraveller struct { + letter OutboundLetter + onOutcome func(LetterOutcome) + + sentAt time.Time + missedAt time.Time + retries int + once sync.Once } -func (c *Courier) Enqueue(letter OutboundLetter) {} +func (t *letterTraveller) isCancelled() bool { + return t.letter.ctx.Err() != nil +} -func (c *Courier) Close() {} +func (t *letterTraveller) countRetry() { t.retries++ } +func (t *letterTraveller) setSentAt(at time.Time) { t.sentAt = at } +func (t *letterTraveller) setMissedAt(at time.Time) { t.missedAt = at } + +// Courier + +func NewCourier( + ctx context.Context, + sendFunc func(data Envelope) error, // func => PoolSendFunc(id) + handler slog.Handler, +) *Courier { + ctx, cancel := context.WithCancel( + component.MustExtend(ctx, "courier")) + + c := &Courier{ + cmd: make(chan courierCommand, 64), + sendFunc: sendFunc, + ctx: ctx, + cancel: cancel, + } + + if handler != nil { + comp, ok := component.Get(ctx) + if ok { + c.logger = slog.New(handler).With(slog.Any("component", comp)) + } + } + + c.wg.Add(1) + go c.run() + + return c +} + +func (c *Courier) Enqueue(letter OutboundLetter, onOutcome func(LetterOutcome)) { + traveller := &letterTraveller{ + letter: letter, + onOutcome: onOutcome, + } + c.command(cmdEnqueue{traveller: traveller}) +} + +func (c *Courier) HandleConnect() { + c.command(cmdHandleConnect{}) +} + +func (c *Courier) HandleDisconnect() { + c.command(cmdHandleDisconnect{}) +} + +func (c *Courier) Close() { + c.cancel() + c.wg.Wait() +} + +// Internal + +func (c *Courier) command(cmd courierCommand) { + select { + case <-c.ctx.Done(): + case c.cmd <- cmd: + } +} + +func (c *Courier) run() { + defer c.wg.Done() + + for { + select { + case <-c.ctx.Done(): + return + case cmd := <-c.cmd: + cmd.apply(c) + c.maybeSend() + } + } +} + +func (c *Courier) maybeSend() { + if !c.preflight() { + c.drain() + return + } + + t, ok := c.pop() + if !ok { + return + } + c.sending = true + + c.wg.Add(1) + go c.sendOnce(t) + +} + +func (c *Courier) sendOnce(t *letterTraveller) { + defer c.wg.Done() + err := c.sendFunc(t.letter.data) + c.command(cmdHandleSendResult{traveller: t, at: time.Now(), err: err}) +} + +func (c *Courier) doneOnce(t *letterTraveller) { + var kind LetterOutcomeKind + if t.isCancelled() { + // letter was cancelled + if t.letter.ctx.Err() == context.DeadlineExceeded { + // letter expired + kind = OutcomeExpired + } else { + // letter was cancelled externally + kind = OutcomeCancelled + } + } else { + // letter was sent + kind = OutcomeSent + } + + outcome := LetterOutcome{ + LetterID: t.letter.id, + PeerID: t.letter.peerID, + Kind: kind, + SentAt: t.sentAt, + MissedAt: t.missedAt, + Retries: t.retries, + } + + t.once.Do(func() { + t.letter.cancel() + go t.onOutcome(outcome) + }) +} + +// Helpers + +func (c *Courier) preflight() bool { + isConnected := c.connected + notAlreadySending := !c.sending + hasQueuedLetters := c.queue.Len() > 0 + return isConnected && notAlreadySending && hasQueuedLetters +} + +func (c *Courier) drain() { + for { + front := c.queue.Front() + if front == nil { + return + } + + t := front.Value.(*letterTraveller) + if !t.isCancelled() { + return + } + + t.setMissedAt(time.Now()) + c.doneOnce(t) + c.queue.Remove(front) + } +} + +func (c *Courier) pop() (*letterTraveller, bool) { + for { + front := c.queue.Front() + if front == nil { + return nil, false + } + + t := front.Value.(*letterTraveller) + c.queue.Remove(front) + + if !t.isCancelled() { + return t, true + } + + t.setMissedAt(time.Now()) + c.doneOnce(t) + } +} // ---------------------------------------------------------------------------- // Commands // ---------------------------------------------------------------------------- + +type cmdEnqueue struct{ traveller *letterTraveller } + +func (cmd cmdEnqueue) apply(c *Courier) { + c.queue.PushBack(cmd.traveller) +} + +type cmdHandleConnect struct{} + +func (cmd cmdHandleConnect) apply(c *Courier) { + c.connected = true +} + +type cmdHandleDisconnect struct{} + +func (cmd cmdHandleDisconnect) apply(c *Courier) { + c.connected = false +} + +type cmdHandleSendResult struct { + traveller *letterTraveller + at time.Time + err error +} + +func (cmd cmdHandleSendResult) apply(c *Courier) { + c.sending = false + if cmd.err != nil { + cmd.traveller.countRetry() + c.queue.PushFront(cmd.traveller) + } else { + cmd.traveller.setSentAt(cmd.at) + c.doneOnce(cmd.traveller) + } +}