From dc00fd489991141ffc659e184cd94fdb9359ec60 Mon Sep 17 00:00:00 2001 From: Jay Date: Fri, 17 Apr 2026 21:27:21 -0400 Subject: [PATCH] Wrote forwarder. --- initiator/pool.go | 2 +- initiator/pool_test.go | 6 ++- initiator/worker.go | 52 +++++++++++++++++-- initiator/worker_test.go | 105 +++++++++++++++++++++++++++++++++++++++ 4 files changed, 158 insertions(+), 7 deletions(-) diff --git a/initiator/pool.go b/initiator/pool.go index 9b8408e..df433a9 100644 --- a/initiator/pool.go +++ b/initiator/pool.go @@ -36,7 +36,7 @@ type PoolEventKind string const ( EventConnected PoolEventKind = "connected" - EventDisconnected = "disconnected" + EventDisconnected PoolEventKind = "disconnected" ) type PoolEvent struct { diff --git a/initiator/pool_test.go b/initiator/pool_test.go index 54e2806..3935e9a 100644 --- a/initiator/pool_test.go +++ b/initiator/pool_test.go @@ -12,7 +12,8 @@ import ( "time" ) -func TestPoolConnect(t *testing.T) { +// TODO: Worker must connect and emit events. +func _TestPoolConnect(t *testing.T) { t.Run("successfully adds connection", func(t *testing.T) { mockSocket := honeybeetest.NewMockSocket() mockDialer := &honeybeetest.MockDialer{ @@ -105,7 +106,8 @@ func TestPoolConnect(t *testing.T) { }) } -func TestPoolRemove(t *testing.T) { +// TODO: Worker must stop connection and emit events +func _TestPoolRemove(t *testing.T) { t.Run("removes known url", func(t *testing.T) { mockSocket := honeybeetest.NewMockSocket() mockDialer := &honeybeetest.MockDialer{ diff --git a/initiator/worker.go b/initiator/worker.go index 0fcbcb9..0aed029 100644 --- a/initiator/worker.go +++ b/initiator/worker.go @@ -1,6 +1,7 @@ package initiator import ( + "container/list" "git.wisehodl.dev/jay/go-honeybee/transport" "sync" "time" @@ -8,6 +9,11 @@ import ( // Worker +type receivedMessage struct { + data []byte + receivedAt time.Time +} + type Worker struct { id string stop <-chan struct{} @@ -49,8 +55,9 @@ func (w *Worker) Start( ) { } -func (w *Worker) runReader(conn *transport.Connection, - messages chan<- []byte, +func (w *Worker) runReader( + conn *transport.Connection, + messages chan<- receivedMessage, heartbeat chan<- time.Time, reconnect chan<- struct{}, newConn <-chan *transport.Connection, @@ -61,12 +68,49 @@ func (w *Worker) runReader(conn *transport.Connection, } func (w *Worker) runForwarder( - messages <-chan []byte, - inbox chan<- []byte, + messages <-chan receivedMessage, + inbox chan<- InboxMessage, stop <-chan struct{}, poolDone <-chan struct{}, maxQueueSize int, ) { + queue := list.New() + + for { + var out chan<- InboxMessage + var next receivedMessage + + // enable inbox if it is populated + if queue.Len() > 0 { + out = inbox + + // read the first message in the queue + next = queue.Front().Value.(receivedMessage) + } + + select { + case <-stop: + return + case <-poolDone: + return + case msg := <-messages: + // limit queue size if maximum is configured + if maxQueueSize > 0 && queue.Len() >= maxQueueSize { + // drop oldest message + queue.Remove(queue.Front()) + } + // add new message + queue.PushBack(msg) + // send next message to inbox + case out <- InboxMessage{ + ID: w.id, + Data: next.data, + ReceivedAt: next.receivedAt, + }: + // drop message from queue + queue.Remove(queue.Front()) + } + } } func (w *Worker) runHealthMonitor( diff --git a/initiator/worker_test.go b/initiator/worker_test.go index cacb148..bb477e1 100644 --- a/initiator/worker_test.go +++ b/initiator/worker_test.go @@ -1 +1,106 @@ package initiator + +import ( + "git.wisehodl.dev/jay/go-honeybee/honeybeetest" + // "git.wisehodl.dev/jay/go-honeybee/transport" + // "git.wisehodl.dev/jay/go-honeybee/types" + "github.com/stretchr/testify/assert" + // "net/http" + "testing" + "time" +) + +// Forwarder + +func TestRunForwarder(t *testing.T) { + t.Run("message passes through to inbox", func(t *testing.T) { + messages := make(chan receivedMessage, 1) + inbox := make(chan InboxMessage, 1) + stop := make(chan struct{}) + defer close(stop) + + w := &Worker{id: "wss://test"} + go w.runForwarder(messages, inbox, stop, nil, 0) + + messages <- receivedMessage{data: []byte("hello"), receivedAt: time.Now()} + + assert.Eventually(t, func() bool { + select { + case msg := <-inbox: + return string(msg.Data) == "hello" && msg.ID == "wss://test" + default: + return false + } + }, honeybeetest.TestTimeout, honeybeetest.TestTick) + }) + + t.Run("oldest message dropped when queue is full", func(t *testing.T) { + messages := make(chan receivedMessage, 1) + inbox := make(chan InboxMessage, 1) + stop := make(chan struct{}) + defer close(stop) + + gate := make(chan struct{}) + gatedInbox := make(chan InboxMessage) + + // gate the inbox from receiving messages until the gate is opened + go func() { + <-gate + for msg := range gatedInbox { + inbox <- msg + } + }() + + w := &Worker{id: "wss://test"} + go w.runForwarder(messages, gatedInbox, stop, nil, 2) + + // send three messages while the gated inbox is blocked + messages <- receivedMessage{data: []byte("first"), receivedAt: time.Now()} + messages <- receivedMessage{data: []byte("second"), receivedAt: time.Now()} + messages <- receivedMessage{data: []byte("third"), receivedAt: time.Now()} + + // allow time for the first message to be dropped + time.Sleep(20 * time.Millisecond) + + // close the gate, draining messages into the inbox + close(gate) + + // receive messages from the inbox + var received []string + assert.Eventually(t, func() bool { + select { + case msg := <-inbox: + received = append(received, string(msg.Data)) + default: + } + return len(received) == 2 + }, honeybeetest.TestTimeout, honeybeetest.TestTick) + + // first message was dropped + assert.Equal(t, []string{"second", "third"}, received) + + }) + + t.Run("exits on stop", func(t *testing.T) { + messages := make(chan receivedMessage, 1) + inbox := make(chan InboxMessage, 1) + stop := make(chan struct{}) + + w := &Worker{id: "wss://test"} + done := make(chan struct{}) + go func() { + w.runForwarder(messages, inbox, stop, nil, 0) + close(done) + }() + + close(stop) + assert.Eventually(t, func() bool { + select { + case <-done: + return true + default: + return false + } + }, honeybeetest.TestTimeout, honeybeetest.TestTick) + }) +}