diff --git a/initiatorpool/config.go b/initiatorpool/config.go index 68cee65..b2ae5a2 100644 --- a/initiatorpool/config.go +++ b/initiatorpool/config.go @@ -99,8 +99,8 @@ func WithWorkerFactory(wf WorkerFactory) PoolOption { // Worker Config type WorkerConfig struct { - IdleTimeout time.Duration - MaxQueueSize int + KeepaliveTimeout time.Duration + MaxQueueSize int } type WorkerOption func(*WorkerConfig) error @@ -118,8 +118,8 @@ func NewWorkerConfig(options ...WorkerOption) (*WorkerConfig, error) { func GetDefaultWorkerConfig() *WorkerConfig { return &WorkerConfig{ - IdleTimeout: 20 * time.Second, - MaxQueueSize: 0, // disabled by default + KeepaliveTimeout: 20 * time.Second, + MaxQueueSize: 0, // disabled by default } } @@ -133,7 +133,7 @@ func applyWorkerOptions(config *WorkerConfig, options ...WorkerOption) error { } func ValidateWorkerConfig(config *WorkerConfig) error { - err := validateIdleTimeout(config.IdleTimeout) + err := validateKeepaliveTimeout(config.KeepaliveTimeout) if err != nil { return err } @@ -153,21 +153,21 @@ func validateMaxQueueSize(value int) error { return nil } -func validateIdleTimeout(value time.Duration) error { +func validateKeepaliveTimeout(value time.Duration) error { if value < 0 { - return InvalidIdleTimeout + return InvalidKeepaliveTimeout } return nil } -// When IdleTimeout is set to zero, idle timeouts are disabled. -func WithIdleTimeout(value time.Duration) WorkerOption { +// When KeepaliveTimeout is set to zero, keepalive timeouts are disabled. +func WithKeepaliveTimeout(value time.Duration) WorkerOption { return func(c *WorkerConfig) error { - err := validateIdleTimeout(value) + err := validateKeepaliveTimeout(value) if err != nil { return err } - c.IdleTimeout = value + c.KeepaliveTimeout = value return nil } } diff --git a/initiatorpool/errors.go b/initiatorpool/errors.go index bf80c96..2044cce 100644 --- a/initiatorpool/errors.go +++ b/initiatorpool/errors.go @@ -4,8 +4,8 @@ import "errors" import "fmt" var ( - InvalidIdleTimeout = errors.New("idle timeout cannot be negative") - InvalidMaxQueueSize = errors.New("maximum queue size cannot be negative") + InvalidKeepaliveTimeout = errors.New("keepalive timeout cannot be negative") + InvalidMaxQueueSize = errors.New("maximum queue size cannot be negative") ) func NewConfigError(text string) error { @@ -15,3 +15,7 @@ func NewConfigError(text string) error { func NewPoolError(text string) error { return fmt.Errorf("pool error: %s", text) } + +func NewWorkerError(id string, text string) error { + return fmt.Errorf("worker %q error: %s", id, text) +} diff --git a/initiatorpool/worker.go b/initiatorpool/worker.go index aebf1a5..87da7bc 100644 --- a/initiatorpool/worker.go +++ b/initiatorpool/worker.go @@ -57,7 +57,14 @@ func (w *Worker) dial(ctx WorkerContext) (*transport.Connection, error) { } func (w *Worker) Send(data []byte) error { - return nil + select { + case w.outbound <- data: + return nil + case <-w.stop: + return NewWorkerError(w.id, "worker is stopped") + default: + return NewWorkerError(w.id, "outbound queue full") + } } func (w *Worker) Start( @@ -67,59 +74,45 @@ func (w *Worker) Start( } func (w *Worker) runSession( - conn *transport.Connection, - messages chan<- receivedMessage, heartbeat chan<- struct{}, - reconnect chan<- struct{}, + dial chan<- struct{}, + keepalive <-chan struct{}, outbound <-chan []byte, - idle <-chan struct{}, newConn <-chan *transport.Connection, ctx WorkerContext, - workerDone <-chan struct{}, + workerStop <-chan struct{}, poolDone <-chan struct{}, -) +) { +} func (w *Worker) runReader( conn *transport.Connection, - messages chan<- receivedMessage, heartbeat chan<- struct{}, - - workerDone <-chan struct{}, - poolDone <-chan struct{}, sessionDone <-chan struct{}, - onStop func(), ) { } func (w *Worker) runWriter( conn *transport.Connection, - outbound <-chan []byte, heartbeat chan<- struct{}, - - workerDone <-chan struct{}, - poolDone <-chan struct{}, sessionDone <-chan struct{}, - onStop func(), ) { } func (w *Worker) runStopMonitor( conn *transport.Connection, - - stop <-chan struct{}, - - workerDone <-chan struct{}, + keepalive <-chan struct{}, + workerStop <-chan struct{}, poolDone <-chan struct{}, sessionDone <-chan struct{}, - onStop func(), ) { } @@ -170,14 +163,14 @@ func (w *Worker) runForwarder( } } -func (w *Worker) runIdleMonitor( +func (w *Worker) runKeepalive( heartbeat <-chan struct{}, - idle chan<- struct{}, + keepalive chan<- struct{}, stop <-chan struct{}, poolDone <-chan struct{}, ) { - // disable idle timeout if not configured - if w.config.IdleTimeout <= 0 { + // disable keepalive timeout if not configured + if w.config.KeepaliveTimeout <= 0 { // wait for stop signal and exit select { case <-stop: @@ -186,7 +179,7 @@ func (w *Worker) runIdleMonitor( return } - timer := time.NewTimer(w.config.IdleTimeout) + timer := time.NewTimer(w.config.KeepaliveTimeout) defer timer.Stop() for { @@ -203,22 +196,23 @@ func (w *Worker) runIdleMonitor( default: } } - timer.Reset(w.config.IdleTimeout) + timer.Reset(w.config.KeepaliveTimeout) // timer completed case <-timer.C: - // send idle signal, then reset the timer + // send keepalive signal, then reset the timer select { - case idle <- struct{}{}: + case keepalive <- struct{}{}: default: } - timer.Reset(w.config.IdleTimeout) + timer.Reset(w.config.KeepaliveTimeout) } } } -func (w *Worker) runReconnector( - reconnect <-chan struct{}, +func (w *Worker) runDialer( + dial <-chan struct{}, newConn chan<- *transport.Connection, + ctx WorkerContext, stop <-chan struct{}, poolDone <-chan struct{}, ) { diff --git a/initiatorpool/worker_test.go b/initiatorpool/worker_test.go index f632560..07b554a 100644 --- a/initiatorpool/worker_test.go +++ b/initiatorpool/worker_test.go @@ -105,15 +105,15 @@ func TestRunForwarder(t *testing.T) { }) } -func TestRunIdleMonitor(t *testing.T) { - t.Run("heartbeat resets timer, no idle signal fired", func(t *testing.T) { +func TestRunKeepalive(t *testing.T) { + t.Run("heartbeat resets timer, no keepalive signal fired", func(t *testing.T) { heartbeat := make(chan struct{}, 3) - idle := make(chan struct{}, 1) + keepalive := make(chan struct{}, 1) stop := make(chan struct{}) defer close(stop) - w := &Worker{config: &WorkerConfig{IdleTimeout: 100 * time.Millisecond}} - go w.runIdleMonitor(heartbeat, idle, stop, nil) + w := &Worker{config: &WorkerConfig{KeepaliveTimeout: 100 * time.Millisecond}} + go w.runKeepalive(heartbeat, keepalive, stop, nil) // send heartbeats faster than the timeout for i := 0; i < 5; i++ { @@ -121,10 +121,10 @@ func TestRunIdleMonitor(t *testing.T) { heartbeat <- struct{}{} } - // because the timer is being reset, idle signal should not be sent + // because the timer is being reset, keepalive signal should not be sent assert.Never(t, func() bool { select { - case <-idle: + case <-keepalive: return true default: return false @@ -132,19 +132,19 @@ func TestRunIdleMonitor(t *testing.T) { }, honeybeetest.NegativeTestTimeout, honeybeetest.TestTick) }) - t.Run("idle timeout fires signal", func(t *testing.T) { + t.Run("keepalive timeout fires signal", func(t *testing.T) { heartbeat := make(chan struct{}) - idle := make(chan struct{}, 1) + keepalive := make(chan struct{}, 1) stop := make(chan struct{}) defer close(stop) - w := &Worker{config: &WorkerConfig{IdleTimeout: 20 * time.Millisecond}} - go w.runIdleMonitor(heartbeat, idle, stop, nil) + w := &Worker{config: &WorkerConfig{KeepaliveTimeout: 20 * time.Millisecond}} + go w.runKeepalive(heartbeat, keepalive, stop, nil) - // send no heartbeats, wait for timeout and idle signal + // send no heartbeats, wait for timeout and keepalive signal assert.Eventually(t, func() bool { select { - case <-idle: + case <-keepalive: return true default: return false @@ -154,13 +154,13 @@ func TestRunIdleMonitor(t *testing.T) { t.Run("exits on stop", func(t *testing.T) { heartbeat := make(chan struct{}) - idle := make(chan struct{}, 1) + keepalive := make(chan struct{}, 1) stop := make(chan struct{}) - w := &Worker{config: &WorkerConfig{IdleTimeout: 20 * time.Second}} + w := &Worker{config: &WorkerConfig{KeepaliveTimeout: 20 * time.Second}} done := make(chan struct{}) go func() { - w.runIdleMonitor(heartbeat, idle, stop, nil) + w.runKeepalive(heartbeat, keepalive, stop, nil) close(done) }() @@ -177,9 +177,3 @@ func TestRunIdleMonitor(t *testing.T) { }) } - -func TestRunReconnector(t *testing.T) { - t.Run("reconnect emits events, new connection", func(t *testing.T) {}) - t.Run("dial failure emits error", func(t *testing.T) {}) - t.Run("exits on stop", func(t *testing.T) {}) -}