From 2d5e55ccaa84b01bf3a7b86690aeb7bce24d8c91 Mon Sep 17 00:00:00 2001 From: Jay Date: Sat, 18 Apr 2026 10:59:45 -0400 Subject: [PATCH] Rename to reconnect timeout. --- initiator/config.go | 22 +++++++++++----------- initiator/errors.go | 4 ++-- initiator/worker.go | 10 +++++----- initiator/worker_test.go | 14 ++++++++++---- 4 files changed, 28 insertions(+), 22 deletions(-) diff --git a/initiator/config.go b/initiator/config.go index 35810c4..dc3943b 100644 --- a/initiator/config.go +++ b/initiator/config.go @@ -99,8 +99,8 @@ func WithWorkerFactory(wf WorkerFactory) PoolOption { // Worker Config type WorkerConfig struct { - IdleTimeout time.Duration - MaxQueueSize int + ReconnectTimeout time.Duration + MaxQueueSize int } type WorkerOption func(*WorkerConfig) error @@ -118,8 +118,8 @@ func NewWorkerConfig(options ...WorkerOption) (*WorkerConfig, error) { func GetDefaultWorkerConfig() *WorkerConfig { return &WorkerConfig{ - IdleTimeout: 20 * time.Second, - MaxQueueSize: 0, // disabled by default + ReconnectTimeout: 20 * time.Second, + MaxQueueSize: 0, // disabled by default } } @@ -133,7 +133,7 @@ func applyWorkerOptions(config *WorkerConfig, options ...WorkerOption) error { } func ValidateWorkerConfig(config *WorkerConfig) error { - err := validateIdleTimeout(config.IdleTimeout) + err := validateReconnectTimeout(config.ReconnectTimeout) if err != nil { return err } @@ -153,21 +153,21 @@ func validateMaxQueueSize(value int) error { return nil } -func validateIdleTimeout(value time.Duration) error { +func validateReconnectTimeout(value time.Duration) error { if value < 0 { - return InvalidIdleTimeout + return InvalidReconnectTimeout } return nil } -// When IdleTimeout is set to zero, idle timeouts are disabled. -func WithIdleTimeout(value time.Duration) WorkerOption { +// When ReconnectTimeout is set to zero, idle timeouts are disabled. +func WithReconnectTimeout(value time.Duration) WorkerOption { return func(c *WorkerConfig) error { - err := validateIdleTimeout(value) + err := validateReconnectTimeout(value) if err != nil { return err } - c.IdleTimeout = value + c.ReconnectTimeout = value return nil } } diff --git a/initiator/errors.go b/initiator/errors.go index cf31318..5c4c352 100644 --- a/initiator/errors.go +++ b/initiator/errors.go @@ -4,8 +4,8 @@ import "errors" import "fmt" var ( - InvalidIdleTimeout = errors.New("idle timeout cannot be negative") - InvalidMaxQueueSize = errors.New("maximum queue size cannot be negative") + InvalidReconnectTimeout = errors.New("idle timeout cannot be negative") + InvalidMaxQueueSize = errors.New("maximum queue size cannot be negative") ) func NewConfigError(text string) error { diff --git a/initiator/worker.go b/initiator/worker.go index aba53a2..6388536 100644 --- a/initiator/worker.go +++ b/initiator/worker.go @@ -119,8 +119,8 @@ func (w *Worker) runHealthMonitor( stop <-chan struct{}, poolDone <-chan struct{}, ) { - // disable if idle timeout is disabled - if w.config.IdleTimeout <= 0 { + // disable if reconnect timeout is disabled + if w.config.ReconnectTimeout <= 0 { // wait for stop signal and exit select { case <-stop: @@ -129,7 +129,7 @@ func (w *Worker) runHealthMonitor( return } - timer := time.NewTimer(w.config.IdleTimeout) + timer := time.NewTimer(w.config.ReconnectTimeout) defer timer.Stop() for { @@ -146,7 +146,7 @@ func (w *Worker) runHealthMonitor( default: } } - timer.Reset(w.config.IdleTimeout) + timer.Reset(w.config.ReconnectTimeout) // timer completed case <-timer.C: // initiate reconnect, then reset the timer @@ -155,7 +155,7 @@ func (w *Worker) runHealthMonitor( case reconnect <- struct{}{}: default: } - timer.Reset(w.config.IdleTimeout) + timer.Reset(w.config.ReconnectTimeout) } } } diff --git a/initiator/worker_test.go b/initiator/worker_test.go index aa213dc..033062a 100644 --- a/initiator/worker_test.go +++ b/initiator/worker_test.go @@ -112,7 +112,7 @@ func TestRunHealthMonitor(t *testing.T) { stop := make(chan struct{}) defer close(stop) - w := &Worker{config: &WorkerConfig{IdleTimeout: 100 * time.Millisecond}} + w := &Worker{config: &WorkerConfig{ReconnectTimeout: 100 * time.Millisecond}} go w.runHealthMonitor(heartbeat, reconnect, stop, nil) // send heartbeats faster than the timeout @@ -132,13 +132,13 @@ func TestRunHealthMonitor(t *testing.T) { }, honeybeetest.NegativeTestTimeout, honeybeetest.TestTick) }) - t.Run("idle timeout fires reconnect", func(t *testing.T) { + t.Run("reconnect timeout fires reconnect", func(t *testing.T) { heartbeat := make(chan struct{}) reconnect := make(chan struct{}, 1) stop := make(chan struct{}) defer close(stop) - w := &Worker{config: &WorkerConfig{IdleTimeout: 20 * time.Millisecond}} + w := &Worker{config: &WorkerConfig{ReconnectTimeout: 20 * time.Millisecond}} go w.runHealthMonitor(heartbeat, reconnect, stop, nil) // send no heartbeats, wait for timeout and reconnect signal @@ -157,7 +157,7 @@ func TestRunHealthMonitor(t *testing.T) { reconnect := make(chan struct{}, 1) stop := make(chan struct{}) - w := &Worker{config: &WorkerConfig{IdleTimeout: 20 * time.Second}} + w := &Worker{config: &WorkerConfig{ReconnectTimeout: 20 * time.Second}} done := make(chan struct{}) go func() { w.runHealthMonitor(heartbeat, reconnect, stop, nil) @@ -177,3 +177,9 @@ func TestRunHealthMonitor(t *testing.T) { }) } + +func TestRunReconnector(t *testing.T) { + t.Run("reconnect emits events, new connection", func(t *testing.T) {}) + t.Run("dial failure emits error", func(t *testing.T) {}) + t.Run("exits on stop", func(t *testing.T) {}) +}