diff --git a/honeybee.go b/honeybee.go index a15dd6b..9073d03 100644 --- a/honeybee.go +++ b/honeybee.go @@ -58,17 +58,19 @@ type InboundPoolEventKind = inbound.PoolEventKind // Inbound Pool event constants const ( - InboundEventDisconnected = inbound.EventDisconnected - InboundEventDropped = inbound.EventDropped - InboundEventEvicted = inbound.EventEvicted + InboundEventDisconnected = inbound.EventDisconnected + InboundEventDroppedClose = inbound.EventDroppedClose + InboundEventDroppedError = inbound.EventDroppedError + InboundEventEvictedPolicy = inbound.EventEvictedPolicy ) // Inbound Worker exit kinds const ( - InboundExitDisconnected = inbound.ExitDisconnected - InboundExitError = inbound.ExitError - InboundExitPolicy = inbound.ExitPolicy + InboundExitDisconnected = inbound.ExitDisconnected + InboundExitUnexpectedClose = inbound.ExitUnexpectedClose + InboundExitReadError = inbound.ExitReadError + InboundExitPolicy = inbound.ExitPolicy ) // Connection constructors diff --git a/inbound/config.go b/inbound/config.go index 0ba11da..87ccb9c 100644 --- a/inbound/config.go +++ b/inbound/config.go @@ -1,4 +1,3 @@ -// responderpool/config.go package inbound import ( diff --git a/inbound/pool.go b/inbound/pool.go index c9dc2f3..a87d39d 100644 --- a/inbound/pool.go +++ b/inbound/pool.go @@ -16,15 +16,17 @@ import ( type PoolEventKind string const ( - EventDisconnected PoolEventKind = "disconnected" - EventDropped PoolEventKind = "dropped" - EventEvicted PoolEventKind = "evicted" + EventDisconnected PoolEventKind = "disconnected" + EventDroppedClose PoolEventKind = "dropped_close" + EventDroppedError PoolEventKind = "dropped_error" + EventEvictedPolicy PoolEventKind = "evicted_policy" ) var workerToPoolEvent = map[WorkerExitKind]PoolEventKind{ - ExitDisconnected: EventDisconnected, - ExitError: EventDropped, - ExitPolicy: EventEvicted, + ExitDisconnected: EventDisconnected, + ExitUnexpectedClose: EventDroppedClose, + ExitReadError: EventDroppedError, + ExitPolicy: EventEvictedPolicy, } type OnExitFunction func(kind WorkerExitKind) diff --git a/inbound/pool_test.go b/inbound/pool_test.go index 5cf473f..7a135bd 100644 --- a/inbound/pool_test.go +++ b/inbound/pool_test.go @@ -333,7 +333,7 @@ func TestPoolEvents(t *testing.T) { Err: &websocket.CloseError{Code: websocket.CloseProtocolError}, } - expectEvent(t, pool.Events(), "peer-1", EventDropped) + expectEvent(t, pool.Events(), "peer-1", EventDroppedClose) honeybeetest.Eventually(t, func() bool { return !slices.Contains(pool.Peers(), "peer-1") @@ -353,7 +353,7 @@ func TestPoolEvents(t *testing.T) { socket, _, _ := honeybeetest.SetupTestSocket(t) pool.Add("peer-1", socket) - expectEvent(t, pool.Events(), "peer-1", EventEvicted) + expectEvent(t, pool.Events(), "peer-1", EventEvictedPolicy) honeybeetest.Eventually(t, func() bool { return !slices.Contains(pool.Peers(), "peer-1") diff --git a/inbound/worker.go b/inbound/worker.go index dca42c4..dc52465 100644 --- a/inbound/worker.go +++ b/inbound/worker.go @@ -20,9 +20,10 @@ type Worker interface { type WorkerExitKind string const ( - ExitDisconnected WorkerExitKind = "disconnected" - ExitError WorkerExitKind = "error" - ExitPolicy WorkerExitKind = "policy" + ExitDisconnected WorkerExitKind = "disconnected" + ExitUnexpectedClose WorkerExitKind = "unexpected_close" + ExitReadError WorkerExitKind = "read_error" + ExitPolicy WorkerExitKind = "policy" ) type DefaultWorker struct { @@ -147,7 +148,7 @@ func RunReader( var err error // determine exit kind // by default, the peer dropped unexpectedly - kind := ExitError + kind := ExitUnexpectedClose select { // the peer-side error is sent before the connection is closed, // so a non-blocking call here is correct @@ -156,11 +157,17 @@ func RunReader( if errors.Is(err, transport.ErrPeerClosedClean) { kind = ExitDisconnected } + if errors.Is(err, transport.ErrPeerClosedUnexpected) { + kind = ExitUnexpectedClose + } + if errors.Is(err, transport.ErrReadError) { + kind = ExitReadError + } default: } if logger != nil { - if kind == ExitError { + if kind == ExitUnexpectedClose || kind == ExitReadError { logger.Error("reader: peer dropped", "event", kind, "error", err) } else { logger.Info("reader: peer disconnected", "event", kind) @@ -280,7 +287,7 @@ func RunWatchdog( case <-timer.C: // signal peer is inactive if logger != nil { - logger.Info("watchdog: peer is inactive") + logger.Info("watchdog: no activity observed") } onInactive(ExitPolicy) return diff --git a/inbound/worker_reader_test.go b/inbound/worker_reader_test.go index ab13f84..6c0b860 100644 --- a/inbound/worker_reader_test.go +++ b/inbound/worker_reader_test.go @@ -140,7 +140,7 @@ func TestRunReader(t *testing.T) { } }, "expected onPeerClose") - assert.Equal(t, ExitError, gotKind) + assert.Equal(t, ExitUnexpectedClose, gotKind) }) t.Run("read error calls onPeerClose with ExitUnexpectedDrop", func(t *testing.T) { @@ -177,7 +177,7 @@ func TestRunReader(t *testing.T) { } }, "expected onPeerClose") - assert.Equal(t, ExitError, gotKind) + assert.Equal(t, ExitReadError, gotKind) }) t.Run("ctx.Done exits without calling onPeerClose", func(t *testing.T) { diff --git a/inbound/worker_test.go b/inbound/worker_test.go index c9a2f71..8b3aef5 100644 --- a/inbound/worker_test.go +++ b/inbound/worker_test.go @@ -115,7 +115,7 @@ func TestWorkerStart(t *testing.T) { honeybeetest.Eventually(t, func() bool { val := v.exitKind.Load() - return val != nil && val.(WorkerExitKind) == ExitError + return val != nil && val.(WorkerExitKind) == ExitUnexpectedClose }, "expected ExitUnexpectedDrop") }) diff --git a/outbound/pool.go b/outbound/pool.go index b9bd579..bd8eee6 100644 --- a/outbound/pool.go +++ b/outbound/pool.go @@ -228,7 +228,7 @@ func (p *Pool) Connect(id string) error { p.peers[id] = &Peer{id: id, worker: worker} if p.logger != nil { - p.logger.Info("connected to peer", "peer", id) + p.logger.Info("registered peer", "peer", id) } return nil diff --git a/outbound/worker.go b/outbound/worker.go index 0dd87ae..8566a10 100644 --- a/outbound/worker.go +++ b/outbound/worker.go @@ -416,7 +416,7 @@ func RunKeepalive( case <-timer.C: // send keepalive signal, then reset the timer if logger != nil { - logger.Info("keepalive: peer is inactive") + logger.Info("keepalive: no activity observed") } select { case keepalive <- struct{}{}: diff --git a/transport/config.go b/transport/config.go index 384aedf..95cfc93 100644 --- a/transport/config.go +++ b/transport/config.go @@ -56,7 +56,7 @@ func GetDefaultRetryConfig() *RetryConfig { MaxRetries: 0, // Infinite retries InitialDelay: 1 * time.Second, MaxDelay: 60 * time.Second, - JitterFactor: 0.5, + JitterFactor: 0.2, } } diff --git a/transport/config_test.go b/transport/config_test.go index 86c24e8..e9455e0 100644 --- a/transport/config_test.go +++ b/transport/config_test.go @@ -56,7 +56,7 @@ func TestDefaultRetryConnectionConfig(t *testing.T) { MaxRetries: 0, InitialDelay: 1 * time.Second, MaxDelay: 60 * time.Second, - JitterFactor: 0.5, + JitterFactor: 0.2, }) } diff --git a/transport/connection.go b/transport/connection.go index 62b8983..e3b43d8 100644 --- a/transport/connection.go +++ b/transport/connection.go @@ -399,7 +399,6 @@ func (c *Connection) Send(data []byte) error { if c.logger != nil { c.logger.Error("write deadline error", "error", err) } - c.shutdownExternal() return NewConnectionError(fmt.Errorf("%w: %w", ErrFailedWriteDeadline, err)) } } diff --git a/transport/connection_send_test.go b/transport/connection_send_test.go index ada8823..fdda050 100644 --- a/transport/connection_send_test.go +++ b/transport/connection_send_test.go @@ -215,9 +215,9 @@ func TestConnectionSend(t *testing.T) { err = conn.Send([]byte("test")) assert.ErrorIs(t, err, ErrFailedWriteDeadline) - honeybeetest.Eventually(t, func() bool { + honeybeetest.Never(t, func() bool { return conn.State() == StateClosed - }, "expected closed state") + }, "write error does not close connection") }) t.Run("send fails on socket write error", func(t *testing.T) { diff --git a/transport/retry.go b/transport/retry.go index bfb5048..f12e2b0 100644 --- a/transport/retry.go +++ b/transport/retry.go @@ -14,7 +14,7 @@ type RetryManager struct { func NewRetryManager(config *RetryConfig) *RetryManager { // saturationCount: retry count at which base delay meets or exceeds MaxDelay. - // Conservative by one to preserve jitter variance near the boundary. + // Conservative by two to preserve jitter variance near the boundary. saturation := 0 if config != nil && config.InitialDelay > 0 &&