diff --git a/honeybee.go b/honeybee.go index 439dee8..a4c3edb 100644 --- a/honeybee.go +++ b/honeybee.go @@ -125,7 +125,6 @@ func NewOutboundWorkerConfig(opts ...OutboundWorkerOption) (*OutboundWorkerConfi var ( WithOutboundInboxBufferSize = outbound.WithInboxBufferSize WithOutboundEventsBufferSize = outbound.WithEventsBufferSize - WithOutboundErrorsBufferSize = outbound.WithErrorsBufferSize WithOutboundPoolLoggingEnabled = outbound.WithPoolLoggingEnabled WithOutboundPoolLogLevel = outbound.WithPoolLogLevel WithOutboundConnectionConfig = outbound.WithConnectionConfig @@ -161,7 +160,6 @@ func NewInboundWorkerConfig(opts ...InboundWorkerOption) (*InboundWorkerConfig, var ( WithInboundInboxBufferSize = inbound.WithInboxBufferSize WithInboundEventsBufferSize = inbound.WithEventsBufferSize - WithInboundErrorsBufferSize = inbound.WithErrorsBufferSize WithInboundPoolLoggingEnabled = inbound.WithPoolLoggingEnabled WithInboundPoolLogLevel = inbound.WithPoolLogLevel WithInboundConnectionConfig = inbound.WithConnectionConfig diff --git a/inbound/config.go b/inbound/config.go index 87ccb9c..967ec5f 100644 --- a/inbound/config.go +++ b/inbound/config.go @@ -20,7 +20,6 @@ type WorkerFactory func( type PoolConfig struct { InboxBufferSize int EventsBufferSize int - ErrorsBufferSize int LoggingEnabled bool LogLevel *slog.Level ConnectionConfig *transport.ConnectionConfig @@ -45,7 +44,6 @@ func GetDefaultPoolConfig() *PoolConfig { return &PoolConfig{ InboxBufferSize: 256, EventsBufferSize: 10, - ErrorsBufferSize: 10, LoggingEnabled: true, LogLevel: nil, ConnectionConfig: nil, @@ -104,16 +102,6 @@ func WithEventsBufferSize(value int) PoolOption { } } -func WithErrorsBufferSize(value int) PoolOption { - return func(c *PoolConfig) error { - if err := validateBufferSize(value); err != nil { - return err - } - c.ErrorsBufferSize = value - return nil - } -} - func WithPoolLoggingEnabled(value bool) PoolOption { return func(c *PoolConfig) error { c.LoggingEnabled = value diff --git a/inbound/config_test.go b/inbound/config_test.go index 4f804e9..ef46f01 100644 --- a/inbound/config_test.go +++ b/inbound/config_test.go @@ -105,7 +105,6 @@ func TestDefaultPoolConfig(t *testing.T) { assert.Equal(t, &PoolConfig{ InboxBufferSize: 256, EventsBufferSize: 10, - ErrorsBufferSize: 10, LoggingEnabled: true, LogLevel: nil, ConnectionConfig: nil, @@ -174,7 +173,6 @@ func TestWithBufferSizes(t *testing.T) { err := applyPoolOptions(conf, WithInboxBufferSize(100), WithEventsBufferSize(20), - WithErrorsBufferSize(20), ) assert.NoError(t, err) assert.Equal(t, 100, conf.InboxBufferSize) diff --git a/inbound/pool.go b/inbound/pool.go index 95b004b..b29d4ac 100644 --- a/inbound/pool.go +++ b/inbound/pool.go @@ -41,7 +41,6 @@ type PoolEvent struct { type PoolStats struct { ChanInbox int ChanEvents int - ChanErrors int TotalReceived uint64 TotalSent uint64 @@ -59,7 +58,6 @@ type PeerStats struct { type PoolPlugin struct { Inbox chan<- types.InboxMessage Events chan<- PoolEvent - Errors chan<- error InboxCounter *atomic.Uint64 OnExit OnExitFunction Handler slog.Handler @@ -83,7 +81,6 @@ type Pool struct { peers map[string]*Peer inbox chan types.InboxMessage events chan PoolEvent - errors chan error inboxCounter *atomic.Uint64 outgoingCount *atomic.Uint64 @@ -140,7 +137,6 @@ func NewPool(ctx context.Context, id string, config *PoolConfig, handler slog.Ha peers: make(map[string]*Peer), inbox: make(chan types.InboxMessage, config.InboxBufferSize), events: make(chan PoolEvent, config.EventsBufferSize), - errors: make(chan error, config.ErrorsBufferSize), inboxCounter: &atomic.Uint64{}, outgoingCount: &atomic.Uint64{}, config: config, @@ -169,10 +165,6 @@ func (p *Pool) Events() <-chan PoolEvent { return p.events } -func (p *Pool) Errors() <-chan error { - return p.errors -} - func (p *Pool) Stats() PoolStats { p.mu.RLock() defer p.mu.RUnlock() @@ -190,7 +182,6 @@ func (p *Pool) Stats() PoolStats { return PoolStats{ ChanInbox: len(p.inbox), ChanEvents: len(p.events), - ChanErrors: len(p.errors), TotalReceived: p.inboxCounter.Load(), TotalSent: p.outgoingCount.Load(), @@ -245,7 +236,6 @@ func (p *Pool) Close() { p.wg.Wait() close(p.inbox) close(p.events) - close(p.errors) if p.logger != nil { p.logger.Info("closed") @@ -386,7 +376,6 @@ func (p *Pool) addLocked(id string, socket types.Socket) error { pool := PoolPlugin{ Inbox: p.inbox, Events: p.events, - Errors: p.errors, InboxCounter: p.inboxCounter, OnExit: onExit, Handler: p.handler, diff --git a/outbound/config.go b/outbound/config.go index 7e4b2f3..dae398e 100644 --- a/outbound/config.go +++ b/outbound/config.go @@ -20,7 +20,6 @@ type WorkerFactory func( type PoolConfig struct { InboxBufferSize int EventsBufferSize int - ErrorsBufferSize int LoggingEnabled bool LogLevel *slog.Level ConnectionConfig *transport.ConnectionConfig @@ -45,7 +44,6 @@ func GetDefaultPoolConfig() *PoolConfig { return &PoolConfig{ InboxBufferSize: 256, EventsBufferSize: 10, - ErrorsBufferSize: 10, LoggingEnabled: true, LogLevel: nil, ConnectionConfig: nil, @@ -110,16 +108,6 @@ func WithEventsBufferSize(value int) PoolOption { } } -func WithErrorsBufferSize(value int) PoolOption { - return func(c *PoolConfig) error { - if err := validateBufferSize(value); err != nil { - return err - } - c.ErrorsBufferSize = value - return nil - } -} - func WithPoolLoggingEnabled(value bool) PoolOption { return func(c *PoolConfig) error { c.LoggingEnabled = value diff --git a/outbound/config_pool_test.go b/outbound/config_pool_test.go index b4ba604..df503fb 100644 --- a/outbound/config_pool_test.go +++ b/outbound/config_pool_test.go @@ -14,7 +14,6 @@ func TestNewPoolConfig(t *testing.T) { assert.Equal(t, conf, &PoolConfig{ InboxBufferSize: 256, EventsBufferSize: 10, - ErrorsBufferSize: 10, LoggingEnabled: true, LogLevel: nil, ConnectionConfig: nil, @@ -29,7 +28,6 @@ func TestDefaultPoolConfig(t *testing.T) { assert.Equal(t, conf, &PoolConfig{ InboxBufferSize: 256, EventsBufferSize: 10, - ErrorsBufferSize: 10, LoggingEnabled: true, LogLevel: nil, ConnectionConfig: nil, @@ -55,7 +53,6 @@ func TestWithBufferSizes(t *testing.T) { err := applyPoolOptions(conf, WithInboxBufferSize(100), WithEventsBufferSize(20), - WithErrorsBufferSize(20), ) assert.NoError(t, err) assert.Equal(t, 100, conf.InboxBufferSize) diff --git a/outbound/pool.go b/outbound/pool.go index 27e1db0..270653f 100644 --- a/outbound/pool.go +++ b/outbound/pool.go @@ -29,7 +29,6 @@ type PoolEvent struct { type PoolStats struct { ChanInbox int ChanEvents int - ChanErrors int TotalReceived uint64 TotalSent uint64 @@ -47,7 +46,6 @@ type PoolPlugin struct { ID string Inbox chan<- types.InboxMessage Events chan<- PoolEvent - Errors chan<- error InboxCounter *atomic.Uint64 Dialer types.Dialer ConnectionConfig *transport.ConnectionConfig @@ -70,7 +68,6 @@ type Pool struct { peers map[string]*Peer inbox chan types.InboxMessage events chan PoolEvent - errors chan error inboxCounter *atomic.Uint64 outgoingCount *atomic.Uint64 @@ -124,7 +121,6 @@ func NewPool(ctx context.Context, id string, config *PoolConfig, handler slog.Ha peers: make(map[string]*Peer), inbox: make(chan types.InboxMessage, config.InboxBufferSize), events: make(chan PoolEvent, config.EventsBufferSize), - errors: make(chan error, config.ErrorsBufferSize), inboxCounter: &atomic.Uint64{}, outgoingCount: &atomic.Uint64{}, dialer: transport.NewDialer(), @@ -153,10 +149,6 @@ func (p *Pool) Events() <-chan PoolEvent { return p.events } -func (p *Pool) Errors() <-chan error { - return p.errors -} - func (p *Pool) Stats() PoolStats { p.mu.RLock() defer p.mu.RUnlock() @@ -173,7 +165,6 @@ func (p *Pool) Stats() PoolStats { return PoolStats{ ChanInbox: len(p.inbox), ChanEvents: len(p.events), - ChanErrors: len(p.errors), TotalReceived: p.inboxCounter.Load(), TotalSent: p.outgoingCount.Load(), @@ -228,7 +219,6 @@ func (p *Pool) Close() { p.wg.Wait() close(p.inbox) close(p.events) - close(p.errors) if p.logger != nil { p.logger.Info("closed") @@ -273,7 +263,6 @@ func (p *Pool) Connect(id string) error { ID: p.id, Inbox: p.inbox, Events: p.events, - Errors: p.errors, InboxCounter: p.inboxCounter, Dialer: p.dialer, ConnectionConfig: p.config.ConnectionConfig, diff --git a/outbound/pool_test.go b/outbound/pool_test.go index 0583a4b..75dcd8f 100644 --- a/outbound/pool_test.go +++ b/outbound/pool_test.go @@ -96,8 +96,6 @@ func TestPoolClose(t *testing.T) { assert.False(t, ok) _, ok = <-pool.Events() assert.False(t, ok) - _, ok = <-pool.Errors() - assert.False(t, ok) }) t.Run("connect after close returns error", func(t *testing.T) { diff --git a/outbound/worker.go b/outbound/worker.go index 06fb092..8a01352 100644 --- a/outbound/worker.go +++ b/outbound/worker.go @@ -556,10 +556,6 @@ func RunDialer( if logger != nil { logger.Warn("dialer: dial failed") } - select { - case pool.Errors <- err: - case <-ctx.Done(): - } continue } diff --git a/outbound/worker_dialer_test.go b/outbound/worker_dialer_test.go index ad51bc1..3da774a 100644 --- a/outbound/worker_dialer_test.go +++ b/outbound/worker_dialer_test.go @@ -24,7 +24,6 @@ func TestRunDialer(t *testing.T) { mockSocket := honeybeetest.NewMockSocket() pool := PoolPlugin{ - Errors: make(chan error, 1), Dialer: &honeybeetest.MockDialer{ DialContextFunc: func(context.Context, string, http.Header) (types.Socket, *http.Response, error) { return mockSocket, nil, nil @@ -61,7 +60,6 @@ func TestRunDialer(t *testing.T) { started := make(chan struct{}) startOnce := sync.Once{} pool := PoolPlugin{ - Errors: make(chan error, 1), Dialer: &honeybeetest.MockDialer{ DialContextFunc: func(context.Context, string, http.Header) (types.Socket, *http.Response, error) { dialCount.Add(1) @@ -114,7 +112,6 @@ func TestRunDialer(t *testing.T) { t.Run("dial failure emits error, succeeds on next signal", func(t *testing.T) { url := "wss://test" - errors := make(chan error, 1) dial := make(chan struct{}, 1) newConn := make(chan *transport.Connection, 1) ctx, cancel := context.WithCancel(context.Background()) @@ -125,7 +122,6 @@ func TestRunDialer(t *testing.T) { mockSocket := honeybeetest.NewMockSocket() connConfig := &transport.ConnectionConfig{Retry: nil} // disable retry pool := PoolPlugin{ - Errors: errors, Dialer: &honeybeetest.MockDialer{ DialContextFunc: func( context.Context, string, http.Header, @@ -143,16 +139,6 @@ func TestRunDialer(t *testing.T) { go RunDialer(url, ctx, pool, dial, newConn, nil) dial <- struct{}{} - - honeybeetest.Eventually(t, func() bool { - select { - case err := <-errors: - return err != nil - default: - return false - } - }, "expected error") - dial <- struct{}{} honeybeetest.Eventually(t, func() bool { @@ -171,7 +157,7 @@ func TestRunDialer(t *testing.T) { newConn := make(chan *transport.Connection, 1) ctx, cancel := context.WithCancel(context.Background()) - pool := PoolPlugin{Errors: make(chan error, 1)} + pool := PoolPlugin{} done := make(chan struct{}) go func() { @@ -198,7 +184,6 @@ func TestRunDialer(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) pool := PoolPlugin{ - Errors: make(chan error, 1), ConnectionConfig: &transport.ConnectionConfig{Retry: nil}, Dialer: &honeybeetest.MockDialer{ DialContextFunc: func(ctx context.Context, _ string, _ http.Header) (types.Socket, *http.Response, error) { diff --git a/outbound/worker_start_test.go b/outbound/worker_start_test.go index cff479a..5f4fff3 100644 --- a/outbound/worker_start_test.go +++ b/outbound/worker_start_test.go @@ -2,9 +2,7 @@ package outbound import ( "context" - "fmt" "git.wisehodl.dev/jay/go-honeybee/honeybeetest" - "git.wisehodl.dev/jay/go-honeybee/transport" "git.wisehodl.dev/jay/go-honeybee/types" "github.com/gorilla/websocket" "github.com/stretchr/testify/assert" @@ -18,17 +16,14 @@ import ( func makeWorkerContext(t *testing.T) ( inbox chan types.InboxMessage, events chan PoolEvent, - errors chan error, pool PoolPlugin, ) { t.Helper() inbox = make(chan types.InboxMessage, 256) events = make(chan PoolEvent, 10) - errors = make(chan error, 10) pool = PoolPlugin{ Inbox: inbox, Events: events, - Errors: errors, InboxCounter: &atomic.Uint64{}, } return @@ -69,7 +64,7 @@ func TestWorkerStart(t *testing.T) { defer cancel() w := makeWorker(t, ctx, cancel) - _, events, _, pool := makeWorkerContext(t) + _, events, pool := makeWorkerContext(t) mockSocket := honeybeetest.NewMockSocket() pool.Dialer = mockDialer(mockSocket) @@ -95,7 +90,7 @@ func TestWorkerStart(t *testing.T) { defer cancel() w := makeWorker(t, ctx, cancel) - _, events, _, pool := makeWorkerContext(t) + _, events, pool := makeWorkerContext(t) _, mockSocket, _, outgoingData := setupTestConnection(t) pool.Dialer = mockDialer(mockSocket) @@ -133,7 +128,7 @@ func TestWorkerStart(t *testing.T) { defer cancel() w := makeWorker(t, ctx, cancel) - inbox, events, _, pool := makeWorkerContext(t) + inbox, events, pool := makeWorkerContext(t) incomingData := make(chan honeybeetest.MockIncomingData, 10) mockSocket := honeybeetest.NewMockSocket() @@ -188,7 +183,7 @@ func TestWorkerStart(t *testing.T) { defer cancel() w := makeWorker(t, ctx, cancel) - _, events, _, pool := makeWorkerContext(t) + _, events, pool := makeWorkerContext(t) _, mockSocket, incomingData, _ := setupTestConnection(t) pool.Dialer = mockDialer(mockSocket) @@ -234,7 +229,7 @@ func TestWorkerStart(t *testing.T) { defer cancel() w := makeWorker(t, ctx, cancel) - _, events, _, pool := makeWorkerContext(t) + _, events, pool := makeWorkerContext(t) mockSocket := honeybeetest.NewMockSocket() pool.Dialer = mockDialer(mockSocket) @@ -282,7 +277,7 @@ func TestWorkerStart(t *testing.T) { workerCtx, workerCancel := context.WithCancel(parentCtx) w := makeWorker(t, workerCtx, workerCancel) - _, events, _, pool := makeWorkerContext(t) + _, events, pool := makeWorkerContext(t) mockSocket := honeybeetest.NewMockSocket() pool.Dialer = mockDialer(mockSocket) @@ -317,34 +312,4 @@ func TestWorkerStart(t *testing.T) { } }, "expected wg to drain after parent cancel") }) - - t.Run("dial failure emits to Errors", func(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - w := makeWorker(t, ctx, cancel) - _, _, errors, pool := makeWorkerContext(t) - pool.ConnectionConfig = &transport.ConnectionConfig{Retry: nil} - pool.Dialer = &honeybeetest.MockDialer{ - DialContextFunc: func(context.Context, string, http.Header) (types.Socket, *http.Response, error) { - return nil, nil, fmt.Errorf("dial failed") - }, - } - - var wg sync.WaitGroup - wg.Add(1) - go func() { - w.Start(pool) - wg.Done() - }() - - honeybeetest.Eventually(t, func() bool { - select { - case err := <-errors: - return err != nil - default: - return false - } - }, "expected error on Errors channel") - }) }