removed pool errors channels
This commit is contained in:
@@ -125,7 +125,6 @@ func NewOutboundWorkerConfig(opts ...OutboundWorkerOption) (*OutboundWorkerConfi
|
||||
var (
|
||||
WithOutboundInboxBufferSize = outbound.WithInboxBufferSize
|
||||
WithOutboundEventsBufferSize = outbound.WithEventsBufferSize
|
||||
WithOutboundErrorsBufferSize = outbound.WithErrorsBufferSize
|
||||
WithOutboundPoolLoggingEnabled = outbound.WithPoolLoggingEnabled
|
||||
WithOutboundPoolLogLevel = outbound.WithPoolLogLevel
|
||||
WithOutboundConnectionConfig = outbound.WithConnectionConfig
|
||||
@@ -161,7 +160,6 @@ func NewInboundWorkerConfig(opts ...InboundWorkerOption) (*InboundWorkerConfig,
|
||||
var (
|
||||
WithInboundInboxBufferSize = inbound.WithInboxBufferSize
|
||||
WithInboundEventsBufferSize = inbound.WithEventsBufferSize
|
||||
WithInboundErrorsBufferSize = inbound.WithErrorsBufferSize
|
||||
WithInboundPoolLoggingEnabled = inbound.WithPoolLoggingEnabled
|
||||
WithInboundPoolLogLevel = inbound.WithPoolLogLevel
|
||||
WithInboundConnectionConfig = inbound.WithConnectionConfig
|
||||
|
||||
@@ -20,7 +20,6 @@ type WorkerFactory func(
|
||||
type PoolConfig struct {
|
||||
InboxBufferSize int
|
||||
EventsBufferSize int
|
||||
ErrorsBufferSize int
|
||||
LoggingEnabled bool
|
||||
LogLevel *slog.Level
|
||||
ConnectionConfig *transport.ConnectionConfig
|
||||
@@ -45,7 +44,6 @@ func GetDefaultPoolConfig() *PoolConfig {
|
||||
return &PoolConfig{
|
||||
InboxBufferSize: 256,
|
||||
EventsBufferSize: 10,
|
||||
ErrorsBufferSize: 10,
|
||||
LoggingEnabled: true,
|
||||
LogLevel: nil,
|
||||
ConnectionConfig: nil,
|
||||
@@ -104,16 +102,6 @@ func WithEventsBufferSize(value int) PoolOption {
|
||||
}
|
||||
}
|
||||
|
||||
func WithErrorsBufferSize(value int) PoolOption {
|
||||
return func(c *PoolConfig) error {
|
||||
if err := validateBufferSize(value); err != nil {
|
||||
return err
|
||||
}
|
||||
c.ErrorsBufferSize = value
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
func WithPoolLoggingEnabled(value bool) PoolOption {
|
||||
return func(c *PoolConfig) error {
|
||||
c.LoggingEnabled = value
|
||||
|
||||
@@ -105,7 +105,6 @@ func TestDefaultPoolConfig(t *testing.T) {
|
||||
assert.Equal(t, &PoolConfig{
|
||||
InboxBufferSize: 256,
|
||||
EventsBufferSize: 10,
|
||||
ErrorsBufferSize: 10,
|
||||
LoggingEnabled: true,
|
||||
LogLevel: nil,
|
||||
ConnectionConfig: nil,
|
||||
@@ -174,7 +173,6 @@ func TestWithBufferSizes(t *testing.T) {
|
||||
err := applyPoolOptions(conf,
|
||||
WithInboxBufferSize(100),
|
||||
WithEventsBufferSize(20),
|
||||
WithErrorsBufferSize(20),
|
||||
)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, 100, conf.InboxBufferSize)
|
||||
|
||||
@@ -41,7 +41,6 @@ type PoolEvent struct {
|
||||
type PoolStats struct {
|
||||
ChanInbox int
|
||||
ChanEvents int
|
||||
ChanErrors int
|
||||
|
||||
TotalReceived uint64
|
||||
TotalSent uint64
|
||||
@@ -59,7 +58,6 @@ type PeerStats struct {
|
||||
type PoolPlugin struct {
|
||||
Inbox chan<- types.InboxMessage
|
||||
Events chan<- PoolEvent
|
||||
Errors chan<- error
|
||||
InboxCounter *atomic.Uint64
|
||||
OnExit OnExitFunction
|
||||
Handler slog.Handler
|
||||
@@ -83,7 +81,6 @@ type Pool struct {
|
||||
peers map[string]*Peer
|
||||
inbox chan types.InboxMessage
|
||||
events chan PoolEvent
|
||||
errors chan error
|
||||
|
||||
inboxCounter *atomic.Uint64
|
||||
outgoingCount *atomic.Uint64
|
||||
@@ -140,7 +137,6 @@ func NewPool(ctx context.Context, id string, config *PoolConfig, handler slog.Ha
|
||||
peers: make(map[string]*Peer),
|
||||
inbox: make(chan types.InboxMessage, config.InboxBufferSize),
|
||||
events: make(chan PoolEvent, config.EventsBufferSize),
|
||||
errors: make(chan error, config.ErrorsBufferSize),
|
||||
inboxCounter: &atomic.Uint64{},
|
||||
outgoingCount: &atomic.Uint64{},
|
||||
config: config,
|
||||
@@ -169,10 +165,6 @@ func (p *Pool) Events() <-chan PoolEvent {
|
||||
return p.events
|
||||
}
|
||||
|
||||
func (p *Pool) Errors() <-chan error {
|
||||
return p.errors
|
||||
}
|
||||
|
||||
func (p *Pool) Stats() PoolStats {
|
||||
p.mu.RLock()
|
||||
defer p.mu.RUnlock()
|
||||
@@ -190,7 +182,6 @@ func (p *Pool) Stats() PoolStats {
|
||||
return PoolStats{
|
||||
ChanInbox: len(p.inbox),
|
||||
ChanEvents: len(p.events),
|
||||
ChanErrors: len(p.errors),
|
||||
|
||||
TotalReceived: p.inboxCounter.Load(),
|
||||
TotalSent: p.outgoingCount.Load(),
|
||||
@@ -245,7 +236,6 @@ func (p *Pool) Close() {
|
||||
p.wg.Wait()
|
||||
close(p.inbox)
|
||||
close(p.events)
|
||||
close(p.errors)
|
||||
|
||||
if p.logger != nil {
|
||||
p.logger.Info("closed")
|
||||
@@ -386,7 +376,6 @@ func (p *Pool) addLocked(id string, socket types.Socket) error {
|
||||
pool := PoolPlugin{
|
||||
Inbox: p.inbox,
|
||||
Events: p.events,
|
||||
Errors: p.errors,
|
||||
InboxCounter: p.inboxCounter,
|
||||
OnExit: onExit,
|
||||
Handler: p.handler,
|
||||
|
||||
@@ -20,7 +20,6 @@ type WorkerFactory func(
|
||||
type PoolConfig struct {
|
||||
InboxBufferSize int
|
||||
EventsBufferSize int
|
||||
ErrorsBufferSize int
|
||||
LoggingEnabled bool
|
||||
LogLevel *slog.Level
|
||||
ConnectionConfig *transport.ConnectionConfig
|
||||
@@ -45,7 +44,6 @@ func GetDefaultPoolConfig() *PoolConfig {
|
||||
return &PoolConfig{
|
||||
InboxBufferSize: 256,
|
||||
EventsBufferSize: 10,
|
||||
ErrorsBufferSize: 10,
|
||||
LoggingEnabled: true,
|
||||
LogLevel: nil,
|
||||
ConnectionConfig: nil,
|
||||
@@ -110,16 +108,6 @@ func WithEventsBufferSize(value int) PoolOption {
|
||||
}
|
||||
}
|
||||
|
||||
func WithErrorsBufferSize(value int) PoolOption {
|
||||
return func(c *PoolConfig) error {
|
||||
if err := validateBufferSize(value); err != nil {
|
||||
return err
|
||||
}
|
||||
c.ErrorsBufferSize = value
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
func WithPoolLoggingEnabled(value bool) PoolOption {
|
||||
return func(c *PoolConfig) error {
|
||||
c.LoggingEnabled = value
|
||||
|
||||
@@ -14,7 +14,6 @@ func TestNewPoolConfig(t *testing.T) {
|
||||
assert.Equal(t, conf, &PoolConfig{
|
||||
InboxBufferSize: 256,
|
||||
EventsBufferSize: 10,
|
||||
ErrorsBufferSize: 10,
|
||||
LoggingEnabled: true,
|
||||
LogLevel: nil,
|
||||
ConnectionConfig: nil,
|
||||
@@ -29,7 +28,6 @@ func TestDefaultPoolConfig(t *testing.T) {
|
||||
assert.Equal(t, conf, &PoolConfig{
|
||||
InboxBufferSize: 256,
|
||||
EventsBufferSize: 10,
|
||||
ErrorsBufferSize: 10,
|
||||
LoggingEnabled: true,
|
||||
LogLevel: nil,
|
||||
ConnectionConfig: nil,
|
||||
@@ -55,7 +53,6 @@ func TestWithBufferSizes(t *testing.T) {
|
||||
err := applyPoolOptions(conf,
|
||||
WithInboxBufferSize(100),
|
||||
WithEventsBufferSize(20),
|
||||
WithErrorsBufferSize(20),
|
||||
)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, 100, conf.InboxBufferSize)
|
||||
|
||||
@@ -29,7 +29,6 @@ type PoolEvent struct {
|
||||
type PoolStats struct {
|
||||
ChanInbox int
|
||||
ChanEvents int
|
||||
ChanErrors int
|
||||
|
||||
TotalReceived uint64
|
||||
TotalSent uint64
|
||||
@@ -47,7 +46,6 @@ type PoolPlugin struct {
|
||||
ID string
|
||||
Inbox chan<- types.InboxMessage
|
||||
Events chan<- PoolEvent
|
||||
Errors chan<- error
|
||||
InboxCounter *atomic.Uint64
|
||||
Dialer types.Dialer
|
||||
ConnectionConfig *transport.ConnectionConfig
|
||||
@@ -70,7 +68,6 @@ type Pool struct {
|
||||
peers map[string]*Peer
|
||||
inbox chan types.InboxMessage
|
||||
events chan PoolEvent
|
||||
errors chan error
|
||||
|
||||
inboxCounter *atomic.Uint64
|
||||
outgoingCount *atomic.Uint64
|
||||
@@ -124,7 +121,6 @@ func NewPool(ctx context.Context, id string, config *PoolConfig, handler slog.Ha
|
||||
peers: make(map[string]*Peer),
|
||||
inbox: make(chan types.InboxMessage, config.InboxBufferSize),
|
||||
events: make(chan PoolEvent, config.EventsBufferSize),
|
||||
errors: make(chan error, config.ErrorsBufferSize),
|
||||
inboxCounter: &atomic.Uint64{},
|
||||
outgoingCount: &atomic.Uint64{},
|
||||
dialer: transport.NewDialer(),
|
||||
@@ -153,10 +149,6 @@ func (p *Pool) Events() <-chan PoolEvent {
|
||||
return p.events
|
||||
}
|
||||
|
||||
func (p *Pool) Errors() <-chan error {
|
||||
return p.errors
|
||||
}
|
||||
|
||||
func (p *Pool) Stats() PoolStats {
|
||||
p.mu.RLock()
|
||||
defer p.mu.RUnlock()
|
||||
@@ -173,7 +165,6 @@ func (p *Pool) Stats() PoolStats {
|
||||
return PoolStats{
|
||||
ChanInbox: len(p.inbox),
|
||||
ChanEvents: len(p.events),
|
||||
ChanErrors: len(p.errors),
|
||||
|
||||
TotalReceived: p.inboxCounter.Load(),
|
||||
TotalSent: p.outgoingCount.Load(),
|
||||
@@ -228,7 +219,6 @@ func (p *Pool) Close() {
|
||||
p.wg.Wait()
|
||||
close(p.inbox)
|
||||
close(p.events)
|
||||
close(p.errors)
|
||||
|
||||
if p.logger != nil {
|
||||
p.logger.Info("closed")
|
||||
@@ -273,7 +263,6 @@ func (p *Pool) Connect(id string) error {
|
||||
ID: p.id,
|
||||
Inbox: p.inbox,
|
||||
Events: p.events,
|
||||
Errors: p.errors,
|
||||
InboxCounter: p.inboxCounter,
|
||||
Dialer: p.dialer,
|
||||
ConnectionConfig: p.config.ConnectionConfig,
|
||||
|
||||
@@ -96,8 +96,6 @@ func TestPoolClose(t *testing.T) {
|
||||
assert.False(t, ok)
|
||||
_, ok = <-pool.Events()
|
||||
assert.False(t, ok)
|
||||
_, ok = <-pool.Errors()
|
||||
assert.False(t, ok)
|
||||
})
|
||||
|
||||
t.Run("connect after close returns error", func(t *testing.T) {
|
||||
|
||||
@@ -556,10 +556,6 @@ func RunDialer(
|
||||
if logger != nil {
|
||||
logger.Warn("dialer: dial failed")
|
||||
}
|
||||
select {
|
||||
case pool.Errors <- err:
|
||||
case <-ctx.Done():
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
|
||||
@@ -24,7 +24,6 @@ func TestRunDialer(t *testing.T) {
|
||||
|
||||
mockSocket := honeybeetest.NewMockSocket()
|
||||
pool := PoolPlugin{
|
||||
Errors: make(chan error, 1),
|
||||
Dialer: &honeybeetest.MockDialer{
|
||||
DialContextFunc: func(context.Context, string, http.Header) (types.Socket, *http.Response, error) {
|
||||
return mockSocket, nil, nil
|
||||
@@ -61,7 +60,6 @@ func TestRunDialer(t *testing.T) {
|
||||
started := make(chan struct{})
|
||||
startOnce := sync.Once{}
|
||||
pool := PoolPlugin{
|
||||
Errors: make(chan error, 1),
|
||||
Dialer: &honeybeetest.MockDialer{
|
||||
DialContextFunc: func(context.Context, string, http.Header) (types.Socket, *http.Response, error) {
|
||||
dialCount.Add(1)
|
||||
@@ -114,7 +112,6 @@ func TestRunDialer(t *testing.T) {
|
||||
|
||||
t.Run("dial failure emits error, succeeds on next signal", func(t *testing.T) {
|
||||
url := "wss://test"
|
||||
errors := make(chan error, 1)
|
||||
dial := make(chan struct{}, 1)
|
||||
newConn := make(chan *transport.Connection, 1)
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
@@ -125,7 +122,6 @@ func TestRunDialer(t *testing.T) {
|
||||
mockSocket := honeybeetest.NewMockSocket()
|
||||
connConfig := &transport.ConnectionConfig{Retry: nil} // disable retry
|
||||
pool := PoolPlugin{
|
||||
Errors: errors,
|
||||
Dialer: &honeybeetest.MockDialer{
|
||||
DialContextFunc: func(
|
||||
context.Context, string, http.Header,
|
||||
@@ -143,16 +139,6 @@ func TestRunDialer(t *testing.T) {
|
||||
|
||||
go RunDialer(url, ctx, pool, dial, newConn, nil)
|
||||
dial <- struct{}{}
|
||||
|
||||
honeybeetest.Eventually(t, func() bool {
|
||||
select {
|
||||
case err := <-errors:
|
||||
return err != nil
|
||||
default:
|
||||
return false
|
||||
}
|
||||
}, "expected error")
|
||||
|
||||
dial <- struct{}{}
|
||||
|
||||
honeybeetest.Eventually(t, func() bool {
|
||||
@@ -171,7 +157,7 @@ func TestRunDialer(t *testing.T) {
|
||||
newConn := make(chan *transport.Connection, 1)
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
|
||||
pool := PoolPlugin{Errors: make(chan error, 1)}
|
||||
pool := PoolPlugin{}
|
||||
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
@@ -198,7 +184,6 @@ func TestRunDialer(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
|
||||
pool := PoolPlugin{
|
||||
Errors: make(chan error, 1),
|
||||
ConnectionConfig: &transport.ConnectionConfig{Retry: nil},
|
||||
Dialer: &honeybeetest.MockDialer{
|
||||
DialContextFunc: func(ctx context.Context, _ string, _ http.Header) (types.Socket, *http.Response, error) {
|
||||
|
||||
@@ -2,9 +2,7 @@ package outbound
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"git.wisehodl.dev/jay/go-honeybee/honeybeetest"
|
||||
"git.wisehodl.dev/jay/go-honeybee/transport"
|
||||
"git.wisehodl.dev/jay/go-honeybee/types"
|
||||
"github.com/gorilla/websocket"
|
||||
"github.com/stretchr/testify/assert"
|
||||
@@ -18,17 +16,14 @@ import (
|
||||
func makeWorkerContext(t *testing.T) (
|
||||
inbox chan types.InboxMessage,
|
||||
events chan PoolEvent,
|
||||
errors chan error,
|
||||
pool PoolPlugin,
|
||||
) {
|
||||
t.Helper()
|
||||
inbox = make(chan types.InboxMessage, 256)
|
||||
events = make(chan PoolEvent, 10)
|
||||
errors = make(chan error, 10)
|
||||
pool = PoolPlugin{
|
||||
Inbox: inbox,
|
||||
Events: events,
|
||||
Errors: errors,
|
||||
InboxCounter: &atomic.Uint64{},
|
||||
}
|
||||
return
|
||||
@@ -69,7 +64,7 @@ func TestWorkerStart(t *testing.T) {
|
||||
defer cancel()
|
||||
|
||||
w := makeWorker(t, ctx, cancel)
|
||||
_, events, _, pool := makeWorkerContext(t)
|
||||
_, events, pool := makeWorkerContext(t)
|
||||
mockSocket := honeybeetest.NewMockSocket()
|
||||
pool.Dialer = mockDialer(mockSocket)
|
||||
|
||||
@@ -95,7 +90,7 @@ func TestWorkerStart(t *testing.T) {
|
||||
defer cancel()
|
||||
|
||||
w := makeWorker(t, ctx, cancel)
|
||||
_, events, _, pool := makeWorkerContext(t)
|
||||
_, events, pool := makeWorkerContext(t)
|
||||
_, mockSocket, _, outgoingData := setupTestConnection(t)
|
||||
pool.Dialer = mockDialer(mockSocket)
|
||||
|
||||
@@ -133,7 +128,7 @@ func TestWorkerStart(t *testing.T) {
|
||||
defer cancel()
|
||||
|
||||
w := makeWorker(t, ctx, cancel)
|
||||
inbox, events, _, pool := makeWorkerContext(t)
|
||||
inbox, events, pool := makeWorkerContext(t)
|
||||
|
||||
incomingData := make(chan honeybeetest.MockIncomingData, 10)
|
||||
mockSocket := honeybeetest.NewMockSocket()
|
||||
@@ -188,7 +183,7 @@ func TestWorkerStart(t *testing.T) {
|
||||
defer cancel()
|
||||
|
||||
w := makeWorker(t, ctx, cancel)
|
||||
_, events, _, pool := makeWorkerContext(t)
|
||||
_, events, pool := makeWorkerContext(t)
|
||||
_, mockSocket, incomingData, _ := setupTestConnection(t)
|
||||
pool.Dialer = mockDialer(mockSocket)
|
||||
|
||||
@@ -234,7 +229,7 @@ func TestWorkerStart(t *testing.T) {
|
||||
defer cancel()
|
||||
|
||||
w := makeWorker(t, ctx, cancel)
|
||||
_, events, _, pool := makeWorkerContext(t)
|
||||
_, events, pool := makeWorkerContext(t)
|
||||
mockSocket := honeybeetest.NewMockSocket()
|
||||
pool.Dialer = mockDialer(mockSocket)
|
||||
|
||||
@@ -282,7 +277,7 @@ func TestWorkerStart(t *testing.T) {
|
||||
workerCtx, workerCancel := context.WithCancel(parentCtx)
|
||||
|
||||
w := makeWorker(t, workerCtx, workerCancel)
|
||||
_, events, _, pool := makeWorkerContext(t)
|
||||
_, events, pool := makeWorkerContext(t)
|
||||
mockSocket := honeybeetest.NewMockSocket()
|
||||
pool.Dialer = mockDialer(mockSocket)
|
||||
|
||||
@@ -317,34 +312,4 @@ func TestWorkerStart(t *testing.T) {
|
||||
}
|
||||
}, "expected wg to drain after parent cancel")
|
||||
})
|
||||
|
||||
t.Run("dial failure emits to Errors", func(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
w := makeWorker(t, ctx, cancel)
|
||||
_, _, errors, pool := makeWorkerContext(t)
|
||||
pool.ConnectionConfig = &transport.ConnectionConfig{Retry: nil}
|
||||
pool.Dialer = &honeybeetest.MockDialer{
|
||||
DialContextFunc: func(context.Context, string, http.Header) (types.Socket, *http.Response, error) {
|
||||
return nil, nil, fmt.Errorf("dial failed")
|
||||
},
|
||||
}
|
||||
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
w.Start(pool)
|
||||
wg.Done()
|
||||
}()
|
||||
|
||||
honeybeetest.Eventually(t, func() bool {
|
||||
select {
|
||||
case err := <-errors:
|
||||
return err != nil
|
||||
default:
|
||||
return false
|
||||
}
|
||||
}, "expected error on Errors channel")
|
||||
})
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user