diff --git a/honeybee.go b/honeybee.go index c6b0933..a360448 100644 --- a/honeybee.go +++ b/honeybee.go @@ -52,17 +52,17 @@ type InboundPoolEventKind = inbound.PoolEventKind // Inbound Pool event constants const ( - EventPeerDisconnected = inbound.EventPeerDisconnected - EventPeerDropped = inbound.EventPeerDropped - EventPeerEvicted = inbound.EventPeerEvicted + EventPeerDisconnected = inbound.EventDisconnected + EventPeerDropped = inbound.EventDropped + EventPeerEvicted = inbound.EventEvicted ) // Inbound Worker exit kinds const ( - ExitCleanDisconnect = inbound.ExitCleanDisconnect - ExitUnexpectedDrop = inbound.ExitUnexpectedDrop - ExitInactive = inbound.ExitInactive + ExitCleanDisconnect = inbound.ExitDisconnected + ExitUnexpectedDrop = inbound.ExitError + ExitInactive = inbound.ExitPolicy ) // Connection constructors diff --git a/inbound/config.go b/inbound/config.go index 3e9a7ad..fb23d1c 100644 --- a/inbound/config.go +++ b/inbound/config.go @@ -10,8 +10,8 @@ import ( // Worker Config type WorkerConfig struct { - MaxQueueSize int - DeadTimeout time.Duration + MaxQueueSize int + InactivityTimeout time.Duration } type WorkerOption func(*WorkerConfig) error @@ -29,8 +29,8 @@ func NewWorkerConfig(options ...WorkerOption) (*WorkerConfig, error) { func GetDefaultWorkerConfig() *WorkerConfig { return &WorkerConfig{ - MaxQueueSize: 0, // queue can grow indefinitely by default - DeadTimeout: 0, // eviction disabled by default + MaxQueueSize: 0, // queue can grow indefinitely by default + InactivityTimeout: 0, // eviction disabled by default } } @@ -47,7 +47,7 @@ func ValidateWorkerConfig(config *WorkerConfig) error { if err := validateMaxQueueSize(config.MaxQueueSize); err != nil { return err } - if err := validateDeadTimeout(config.DeadTimeout); err != nil { + if err := validateDeadTimeout(config.InactivityTimeout); err != nil { return err } return nil @@ -84,7 +84,7 @@ func WithDeadTimeout(value time.Duration) WorkerOption { if err := validateDeadTimeout(value); err != nil { return err } - c.DeadTimeout = value + c.InactivityTimeout = value return nil } } diff --git a/inbound/config_test.go b/inbound/config_test.go index d4d5c22..8af0fc3 100644 --- a/inbound/config_test.go +++ b/inbound/config_test.go @@ -17,8 +17,8 @@ func TestNewWorkerConfig(t *testing.T) { func TestDefaultWorkerConfig(t *testing.T) { conf := GetDefaultWorkerConfig() assert.Equal(t, &WorkerConfig{ - MaxQueueSize: 0, - DeadTimeout: 0, + MaxQueueSize: 0, + InactivityTimeout: 0, }, conf) } @@ -34,11 +34,11 @@ func TestValidateWorkerConfig(t *testing.T) { }, { name: "zero dead timeout disabled", - conf: WorkerConfig{DeadTimeout: 0}, + conf: WorkerConfig{InactivityTimeout: 0}, }, { name: "positive dead timeout", - conf: WorkerConfig{DeadTimeout: 30 * time.Second}, + conf: WorkerConfig{InactivityTimeout: 30 * time.Second}, }, { name: "negative max queue size", @@ -47,7 +47,7 @@ func TestValidateWorkerConfig(t *testing.T) { }, { name: "negative dead timeout", - conf: WorkerConfig{DeadTimeout: -1 * time.Second}, + conf: WorkerConfig{InactivityTimeout: -1 * time.Second}, wantErr: InvalidDeadTimeout, }, } @@ -83,7 +83,7 @@ func TestWithDeadTimeout(t *testing.T) { err := applyWorkerOptions(conf, WithDeadTimeout(30*time.Second)) assert.NoError(t, err) - assert.Equal(t, 30*time.Second, conf.DeadTimeout) + assert.Equal(t, 30*time.Second, conf.InactivityTimeout) err = applyWorkerOptions(conf, WithDeadTimeout(0)) assert.NoError(t, err) @@ -179,9 +179,9 @@ func TestWithConnectionConfig(t *testing.T) { func TestWithWorkerConfig(t *testing.T) { conf := &PoolConfig{} - err := applyPoolOptions(conf, WithWorkerConfig(&WorkerConfig{DeadTimeout: 30 * time.Second})) + err := applyPoolOptions(conf, WithWorkerConfig(&WorkerConfig{InactivityTimeout: 30 * time.Second})) assert.NoError(t, err) - assert.Equal(t, 30*time.Second, conf.WorkerConfig.DeadTimeout) + assert.Equal(t, 30*time.Second, conf.WorkerConfig.InactivityTimeout) err = applyPoolOptions(conf, WithWorkerConfig(&WorkerConfig{MaxQueueSize: -1})) assert.Error(t, err) diff --git a/inbound/pool.go b/inbound/pool.go index 35b96c0..1596b56 100644 --- a/inbound/pool.go +++ b/inbound/pool.go @@ -12,18 +12,18 @@ import ( // Types -type PoolEventKind string +type PoolEventKind int const ( - EventPeerDisconnected PoolEventKind = "disconnected" - EventPeerDropped PoolEventKind = "dropped" - EventPeerEvicted PoolEventKind = "evicted" + EventDisconnected PoolEventKind = iota + EventDropped + EventEvicted ) var workerToPoolEvent = map[WorkerExitKind]PoolEventKind{ - ExitCleanDisconnect: EventPeerDisconnected, - ExitUnexpectedDrop: EventPeerDropped, - ExitInactive: EventPeerEvicted, + ExitDisconnected: EventDisconnected, + ExitError: EventDropped, + ExitPolicy: EventEvicted, } type OnExitFunction func(kind WorkerExitKind) diff --git a/inbound/pool_test.go b/inbound/pool_test.go index 8be7ab2..d485d6a 100644 --- a/inbound/pool_test.go +++ b/inbound/pool_test.go @@ -310,7 +310,7 @@ func TestPoolEvents(t *testing.T) { Err: &websocket.CloseError{Code: websocket.CloseNormalClosure}, } - expectEvent(t, pool.Events(), "peer-1", EventPeerDisconnected) + expectEvent(t, pool.Events(), "peer-1", EventDisconnected) honeybeetest.Eventually(t, func() bool { return !slices.Contains(pool.Peers(), "peer-1") @@ -328,7 +328,7 @@ func TestPoolEvents(t *testing.T) { Err: &websocket.CloseError{Code: websocket.CloseProtocolError}, } - expectEvent(t, pool.Events(), "peer-1", EventPeerDropped) + expectEvent(t, pool.Events(), "peer-1", EventDropped) honeybeetest.Eventually(t, func() bool { return !slices.Contains(pool.Peers(), "peer-1") @@ -337,7 +337,7 @@ func TestPoolEvents(t *testing.T) { t.Run("EventPeerEvicted emitted on watchdog timeout", func(t *testing.T) { config, err := NewPoolConfig( - WithWorkerConfig(&WorkerConfig{DeadTimeout: 20 * time.Millisecond}), + WithWorkerConfig(&WorkerConfig{InactivityTimeout: 20 * time.Millisecond}), ) assert.NoError(t, err) @@ -348,7 +348,7 @@ func TestPoolEvents(t *testing.T) { socket, _, _ := honeybeetest.SetupTestSocket(t) pool.Add("peer-1", socket) - expectEvent(t, pool.Events(), "peer-1", EventPeerEvicted) + expectEvent(t, pool.Events(), "peer-1", EventEvicted) honeybeetest.Eventually(t, func() bool { return !slices.Contains(pool.Peers(), "peer-1") diff --git a/inbound/worker.go b/inbound/worker.go index 45fa208..82d01be 100644 --- a/inbound/worker.go +++ b/inbound/worker.go @@ -15,12 +15,12 @@ type Worker interface { Send(data []byte) error } -type WorkerExitKind string +type WorkerExitKind int const ( - ExitCleanDisconnect WorkerExitKind = "disconnected" - ExitUnexpectedDrop WorkerExitKind = "dropped" - ExitInactive WorkerExitKind = "inactive" + ExitDisconnected WorkerExitKind = iota + ExitError + ExitPolicy ) type ReceivedMessage struct { @@ -79,7 +79,7 @@ func (w *DefaultWorker) Start(pool PoolPlugin, wg *sync.WaitGroup) { go func() { defer owg.Done() - RunWatchdog(w.ctx, pool.OnExit, w.heartbeat, w.config.DeadTimeout) + RunWatchdog(w.ctx, pool.OnExit, w.heartbeat, w.config.InactivityTimeout) }() owg.Wait() @@ -119,14 +119,14 @@ func RunReader( if !ok { // determine exit kind // by default, the peer dropped unexpectedly - kind := ExitUnexpectedDrop + kind := ExitError select { // the peer-side error is sent before the connection is closed, // so a non-blocking call here is correct // if an error is not sent, then assume the default event kind case err := <-conn.Errors(): if errors.Is(err, transport.ErrPeerClosedClean) { - kind = ExitCleanDisconnect + kind = ExitDisconnected } default: } @@ -228,7 +228,7 @@ func RunWatchdog( // timer completed case <-timer.C: // signal peer is inactive - onInactive(ExitInactive) + onInactive(ExitPolicy) return } } diff --git a/inbound/worker_reader_test.go b/inbound/worker_reader_test.go index 0be4ed4..f62d000 100644 --- a/inbound/worker_reader_test.go +++ b/inbound/worker_reader_test.go @@ -102,7 +102,7 @@ func TestRunReader(t *testing.T) { } }, "expected onPeerClose") - assert.Equal(t, ExitCleanDisconnect, gotKind) + assert.Equal(t, ExitDisconnected, gotKind) }) t.Run("unexpected close calls onPeerClose with ExitUnexpectedDrop", func(t *testing.T) { @@ -139,7 +139,7 @@ func TestRunReader(t *testing.T) { } }, "expected onPeerClose") - assert.Equal(t, ExitUnexpectedDrop, gotKind) + assert.Equal(t, ExitError, gotKind) }) t.Run("read error calls onPeerClose with ExitUnexpectedDrop", func(t *testing.T) { @@ -176,7 +176,7 @@ func TestRunReader(t *testing.T) { } }, "expected onPeerClose") - assert.Equal(t, ExitUnexpectedDrop, gotKind) + assert.Equal(t, ExitError, gotKind) }) t.Run("ctx.Done exits without calling onPeerClose", func(t *testing.T) { diff --git a/inbound/worker_test.go b/inbound/worker_test.go index 10129b9..040d10b 100644 --- a/inbound/worker_test.go +++ b/inbound/worker_test.go @@ -99,7 +99,7 @@ func TestWorkerStart(t *testing.T) { honeybeetest.Eventually(t, func() bool { val := v.exitKind.Load() - return val != nil && val.(WorkerExitKind) == ExitCleanDisconnect + return val != nil && val.(WorkerExitKind) == ExitDisconnected }, "expected ExitCleanDisconnect") }) @@ -115,7 +115,7 @@ func TestWorkerStart(t *testing.T) { honeybeetest.Eventually(t, func() bool { val := v.exitKind.Load() - return val != nil && val.(WorkerExitKind) == ExitUnexpectedDrop + return val != nil && val.(WorkerExitKind) == ExitError }, "expected ExitUnexpectedDrop") }) @@ -124,7 +124,7 @@ func TestWorkerStart(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) worker, err := NewWorker(ctx, "peer-1", conn, &WorkerConfig{ - DeadTimeout: 20 * time.Millisecond, + InactivityTimeout: 20 * time.Millisecond, }) assert.NoError(t, err) worker.cancel = cancel @@ -146,7 +146,7 @@ func TestWorkerStart(t *testing.T) { honeybeetest.Eventually(t, func() bool { val := exitKind.Load() - return val != nil && val.(WorkerExitKind) == ExitInactive + return val != nil && val.(WorkerExitKind) == ExitPolicy }, "expected ExitInactive") }) } diff --git a/inbound/worker_watchdog_test.go b/inbound/worker_watchdog_test.go index 614270e..6d0d0fe 100644 --- a/inbound/worker_watchdog_test.go +++ b/inbound/worker_watchdog_test.go @@ -52,7 +52,7 @@ func TestRunWatchdog(t *testing.T) { }, "expected onInactive") assert.Equal(t, int32(1), count.Load()) - assert.Equal(t, ExitInactive, gotKind) + assert.Equal(t, ExitPolicy, gotKind) }) t.Run("ctx.Done exits without calling onInactive", func(t *testing.T) {