From 4c17b9539a38c5d0401a394262342eb01c2a85b1 Mon Sep 17 00:00:00 2001 From: Jay Date: Sat, 18 Apr 2026 12:44:49 -0400 Subject: [PATCH] Renamed package, set up for worker session pattern. --- {initiator => initiatorpool}/config.go | 24 ++--- .../config_pool_test.go | 2 +- {initiator => initiatorpool}/errors.go | 6 +- {initiator => initiatorpool}/pool.go | 5 +- {initiator => initiatorpool}/pool_test.go | 2 +- {initiator => initiatorpool}/worker.go | 102 ++++++++++++++---- {initiator => initiatorpool}/worker_test.go | 34 +++--- 7 files changed, 117 insertions(+), 58 deletions(-) rename {initiator => initiatorpool}/config.go (86%) rename {initiator => initiatorpool}/config_pool_test.go (99%) rename {initiator => initiatorpool}/errors.go (56%) rename {initiator => initiatorpool}/pool.go (98%) rename {initiator => initiatorpool}/pool_test.go (99%) rename {initiator => initiatorpool}/worker.go (61%) rename {initiator => initiatorpool}/worker_test.go (81%) diff --git a/initiator/config.go b/initiatorpool/config.go similarity index 86% rename from initiator/config.go rename to initiatorpool/config.go index dc3943b..68cee65 100644 --- a/initiator/config.go +++ b/initiatorpool/config.go @@ -1,4 +1,4 @@ -package initiator +package initiatorpool import ( "git.wisehodl.dev/jay/go-honeybee/transport" @@ -99,8 +99,8 @@ func WithWorkerFactory(wf WorkerFactory) PoolOption { // Worker Config type WorkerConfig struct { - ReconnectTimeout time.Duration - MaxQueueSize int + IdleTimeout time.Duration + MaxQueueSize int } type WorkerOption func(*WorkerConfig) error @@ -118,8 +118,8 @@ func NewWorkerConfig(options ...WorkerOption) (*WorkerConfig, error) { func GetDefaultWorkerConfig() *WorkerConfig { return &WorkerConfig{ - ReconnectTimeout: 20 * time.Second, - MaxQueueSize: 0, // disabled by default + IdleTimeout: 20 * time.Second, + MaxQueueSize: 0, // disabled by default } } @@ -133,7 +133,7 @@ func applyWorkerOptions(config *WorkerConfig, options ...WorkerOption) error { } func ValidateWorkerConfig(config *WorkerConfig) error { - err := validateReconnectTimeout(config.ReconnectTimeout) + err := validateIdleTimeout(config.IdleTimeout) if err != nil { return err } @@ -153,21 +153,21 @@ func validateMaxQueueSize(value int) error { return nil } -func validateReconnectTimeout(value time.Duration) error { +func validateIdleTimeout(value time.Duration) error { if value < 0 { - return InvalidReconnectTimeout + return InvalidIdleTimeout } return nil } -// When ReconnectTimeout is set to zero, idle timeouts are disabled. -func WithReconnectTimeout(value time.Duration) WorkerOption { +// When IdleTimeout is set to zero, idle timeouts are disabled. +func WithIdleTimeout(value time.Duration) WorkerOption { return func(c *WorkerConfig) error { - err := validateReconnectTimeout(value) + err := validateIdleTimeout(value) if err != nil { return err } - c.ReconnectTimeout = value + c.IdleTimeout = value return nil } } diff --git a/initiator/config_pool_test.go b/initiatorpool/config_pool_test.go similarity index 99% rename from initiator/config_pool_test.go rename to initiatorpool/config_pool_test.go index a1d5bdd..740e73f 100644 --- a/initiator/config_pool_test.go +++ b/initiatorpool/config_pool_test.go @@ -1,4 +1,4 @@ -package initiator +package initiatorpool import ( "git.wisehodl.dev/jay/go-honeybee/transport" diff --git a/initiator/errors.go b/initiatorpool/errors.go similarity index 56% rename from initiator/errors.go rename to initiatorpool/errors.go index 5c4c352..bf80c96 100644 --- a/initiator/errors.go +++ b/initiatorpool/errors.go @@ -1,11 +1,11 @@ -package initiator +package initiatorpool import "errors" import "fmt" var ( - InvalidReconnectTimeout = errors.New("idle timeout cannot be negative") - InvalidMaxQueueSize = errors.New("maximum queue size cannot be negative") + InvalidIdleTimeout = errors.New("idle timeout cannot be negative") + InvalidMaxQueueSize = errors.New("maximum queue size cannot be negative") ) func NewConfigError(text string) error { diff --git a/initiator/pool.go b/initiatorpool/pool.go similarity index 98% rename from initiator/pool.go rename to initiatorpool/pool.go index df433a9..d7607d1 100644 --- a/initiator/pool.go +++ b/initiatorpool/pool.go @@ -1,4 +1,4 @@ -package initiator +package initiatorpool import ( "git.wisehodl.dev/jay/go-honeybee/transport" @@ -111,6 +111,9 @@ func (p *Pool) Errors() chan error { } func (p *Pool) SetDialer(d types.Dialer) { + if d == nil { + panic("dialer cannot be nil") + } p.dialer = d } diff --git a/initiator/pool_test.go b/initiatorpool/pool_test.go similarity index 99% rename from initiator/pool_test.go rename to initiatorpool/pool_test.go index 3935e9a..7f5aac1 100644 --- a/initiator/pool_test.go +++ b/initiatorpool/pool_test.go @@ -1,4 +1,4 @@ -package initiator +package initiatorpool import ( "fmt" diff --git a/initiator/worker.go b/initiatorpool/worker.go similarity index 61% rename from initiator/worker.go rename to initiatorpool/worker.go index 6388536..aebf1a5 100644 --- a/initiator/worker.go +++ b/initiatorpool/worker.go @@ -1,4 +1,4 @@ -package initiator +package initiatorpool import ( "container/list" @@ -15,10 +15,10 @@ type receivedMessage struct { } type Worker struct { - id string - stop <-chan struct{} - config *WorkerConfig - conn *transport.Connection + id string + stop <-chan struct{} + config *WorkerConfig + outbound chan []byte } func NewWorker( @@ -37,16 +37,27 @@ func NewWorker( } w := &Worker{ - id: id, - stop: stop, - config: config, + id: id, + stop: stop, + outbound: make(chan []byte, 64), + config: config, } return w, nil } +func (w *Worker) dial(ctx WorkerContext) (*transport.Connection, error) { + conn, err := transport.NewConnection(w.id, ctx.ConnectionConfig, ctx.Logger) + if err != nil { + return nil, err + } + + conn.SetDialer(ctx.Dialer) + return conn, conn.Connect() +} + func (w *Worker) Send(data []byte) error { - return w.conn.Send(data) + return nil } func (w *Worker) Start( @@ -55,15 +66,61 @@ func (w *Worker) Start( ) { } -func (w *Worker) runReader( +func (w *Worker) runSession( conn *transport.Connection, + messages chan<- receivedMessage, heartbeat chan<- struct{}, reconnect chan<- struct{}, - newConn <-chan *transport.Connection, - stop <-chan struct{}, - poolDone <-chan struct{}, + outbound <-chan []byte, + idle <-chan struct{}, + newConn <-chan *transport.Connection, + + ctx WorkerContext, + + workerDone <-chan struct{}, + poolDone <-chan struct{}, +) + +func (w *Worker) runReader( + conn *transport.Connection, + + messages chan<- receivedMessage, + heartbeat chan<- struct{}, + + workerDone <-chan struct{}, + poolDone <-chan struct{}, + sessionDone <-chan struct{}, + + onStop func(), +) { +} + +func (w *Worker) runWriter( + conn *transport.Connection, + + outbound <-chan []byte, + heartbeat chan<- struct{}, + + workerDone <-chan struct{}, + poolDone <-chan struct{}, + sessionDone <-chan struct{}, + + onStop func(), +) { +} + +func (w *Worker) runStopMonitor( + conn *transport.Connection, + + stop <-chan struct{}, + + workerDone <-chan struct{}, + poolDone <-chan struct{}, + sessionDone <-chan struct{}, + + onStop func(), ) { } @@ -113,14 +170,14 @@ func (w *Worker) runForwarder( } } -func (w *Worker) runHealthMonitor( +func (w *Worker) runIdleMonitor( heartbeat <-chan struct{}, - reconnect chan<- struct{}, + idle chan<- struct{}, stop <-chan struct{}, poolDone <-chan struct{}, ) { - // disable if reconnect timeout is disabled - if w.config.ReconnectTimeout <= 0 { + // disable idle timeout if not configured + if w.config.IdleTimeout <= 0 { // wait for stop signal and exit select { case <-stop: @@ -129,7 +186,7 @@ func (w *Worker) runHealthMonitor( return } - timer := time.NewTimer(w.config.ReconnectTimeout) + timer := time.NewTimer(w.config.IdleTimeout) defer timer.Stop() for { @@ -146,16 +203,15 @@ func (w *Worker) runHealthMonitor( default: } } - timer.Reset(w.config.ReconnectTimeout) + timer.Reset(w.config.IdleTimeout) // timer completed case <-timer.C: - // initiate reconnect, then reset the timer - // multiple reconnect signals during reconnection is idempotent + // send idle signal, then reset the timer select { - case reconnect <- struct{}{}: + case idle <- struct{}{}: default: } - timer.Reset(w.config.ReconnectTimeout) + timer.Reset(w.config.IdleTimeout) } } } diff --git a/initiator/worker_test.go b/initiatorpool/worker_test.go similarity index 81% rename from initiator/worker_test.go rename to initiatorpool/worker_test.go index 033062a..f632560 100644 --- a/initiator/worker_test.go +++ b/initiatorpool/worker_test.go @@ -1,4 +1,4 @@ -package initiator +package initiatorpool import ( "git.wisehodl.dev/jay/go-honeybee/honeybeetest" @@ -105,15 +105,15 @@ func TestRunForwarder(t *testing.T) { }) } -func TestRunHealthMonitor(t *testing.T) { - t.Run("heartbeat resets timer, no reconnect fired", func(t *testing.T) { +func TestRunIdleMonitor(t *testing.T) { + t.Run("heartbeat resets timer, no idle signal fired", func(t *testing.T) { heartbeat := make(chan struct{}, 3) - reconnect := make(chan struct{}, 1) + idle := make(chan struct{}, 1) stop := make(chan struct{}) defer close(stop) - w := &Worker{config: &WorkerConfig{ReconnectTimeout: 100 * time.Millisecond}} - go w.runHealthMonitor(heartbeat, reconnect, stop, nil) + w := &Worker{config: &WorkerConfig{IdleTimeout: 100 * time.Millisecond}} + go w.runIdleMonitor(heartbeat, idle, stop, nil) // send heartbeats faster than the timeout for i := 0; i < 5; i++ { @@ -121,10 +121,10 @@ func TestRunHealthMonitor(t *testing.T) { heartbeat <- struct{}{} } - // because the timer is being reset, reconnect should never occur + // because the timer is being reset, idle signal should not be sent assert.Never(t, func() bool { select { - case <-reconnect: + case <-idle: return true default: return false @@ -132,19 +132,19 @@ func TestRunHealthMonitor(t *testing.T) { }, honeybeetest.NegativeTestTimeout, honeybeetest.TestTick) }) - t.Run("reconnect timeout fires reconnect", func(t *testing.T) { + t.Run("idle timeout fires signal", func(t *testing.T) { heartbeat := make(chan struct{}) - reconnect := make(chan struct{}, 1) + idle := make(chan struct{}, 1) stop := make(chan struct{}) defer close(stop) - w := &Worker{config: &WorkerConfig{ReconnectTimeout: 20 * time.Millisecond}} - go w.runHealthMonitor(heartbeat, reconnect, stop, nil) + w := &Worker{config: &WorkerConfig{IdleTimeout: 20 * time.Millisecond}} + go w.runIdleMonitor(heartbeat, idle, stop, nil) - // send no heartbeats, wait for timeout and reconnect signal + // send no heartbeats, wait for timeout and idle signal assert.Eventually(t, func() bool { select { - case <-reconnect: + case <-idle: return true default: return false @@ -154,13 +154,13 @@ func TestRunHealthMonitor(t *testing.T) { t.Run("exits on stop", func(t *testing.T) { heartbeat := make(chan struct{}) - reconnect := make(chan struct{}, 1) + idle := make(chan struct{}, 1) stop := make(chan struct{}) - w := &Worker{config: &WorkerConfig{ReconnectTimeout: 20 * time.Second}} + w := &Worker{config: &WorkerConfig{IdleTimeout: 20 * time.Second}} done := make(chan struct{}) go func() { - w.runHealthMonitor(heartbeat, reconnect, stop, nil) + w.runIdleMonitor(heartbeat, idle, stop, nil) close(done) }()