Wrote forwarder.

This commit is contained in:
Jay
2026-04-17 21:27:21 -04:00
parent 8113a050e0
commit dc00fd4899
4 changed files with 158 additions and 7 deletions

View File

@@ -1,6 +1,7 @@
package initiator
import (
"container/list"
"git.wisehodl.dev/jay/go-honeybee/transport"
"sync"
"time"
@@ -8,6 +9,11 @@ import (
// Worker
type receivedMessage struct {
data []byte
receivedAt time.Time
}
type Worker struct {
id string
stop <-chan struct{}
@@ -49,8 +55,9 @@ func (w *Worker) Start(
) {
}
func (w *Worker) runReader(conn *transport.Connection,
messages chan<- []byte,
func (w *Worker) runReader(
conn *transport.Connection,
messages chan<- receivedMessage,
heartbeat chan<- time.Time,
reconnect chan<- struct{},
newConn <-chan *transport.Connection,
@@ -61,12 +68,49 @@ func (w *Worker) runReader(conn *transport.Connection,
}
func (w *Worker) runForwarder(
messages <-chan []byte,
inbox chan<- []byte,
messages <-chan receivedMessage,
inbox chan<- InboxMessage,
stop <-chan struct{},
poolDone <-chan struct{},
maxQueueSize int,
) {
queue := list.New()
for {
var out chan<- InboxMessage
var next receivedMessage
// enable inbox if it is populated
if queue.Len() > 0 {
out = inbox
// read the first message in the queue
next = queue.Front().Value.(receivedMessage)
}
select {
case <-stop:
return
case <-poolDone:
return
case msg := <-messages:
// limit queue size if maximum is configured
if maxQueueSize > 0 && queue.Len() >= maxQueueSize {
// drop oldest message
queue.Remove(queue.Front())
}
// add new message
queue.PushBack(msg)
// send next message to inbox
case out <- InboxMessage{
ID: w.id,
Data: next.data,
ReceivedAt: next.receivedAt,
}:
// drop message from queue
queue.Remove(queue.Front())
}
}
}
func (w *Worker) runHealthMonitor(