10 Commits

19 changed files with 449 additions and 311 deletions
+16 -6
View File
@@ -17,7 +17,7 @@ Logging can be controlled independently at the pool, worker, and connection leve
| Connection | `ErrorsBufferSize` | 10 | — | Must be positive |
| Connection | `LoggingEnabled` | true | `false` | |
| Connection | `LogLevel` | nil | — | nil defers to handler's own filter |
| Retry | enabled | yes | `WithoutRetry()` | Governs `Connect()` only |
| Retry | enabled | yes | `WithRetryDisabled()` | Governs `Connect()` only |
| Retry | `MaxRetries` | 0 | — | 0 means infinite |
| Retry | `InitialDelay` | 1s | — | Must be positive |
| Retry | `MaxDelay` | 60s | — | Must be ≥ InitialDelay |
@@ -61,11 +61,16 @@ Sets the capacity of the channel that buffers inbound messages between the reade
**`WithErrorsBufferSize(int)`**
Sets the capacity of the channel that carries connection-level errors to the consumer. Must be at least 1.
### Dialer
**`WithConnectionDialer(types.Dialer)`**
Overrides the dialer used to establish the WebSocket connection. When not set, the connection uses the default dialer. Useful in tests or when routing connections through a custom transport.
### Retry
The retry policy governs the `Connect()` call only. It does not affect worker reconnection, which is controlled by `ReconnectDelay` on the worker config.
**`WithoutRetry()`**
**`WithRetryDisabled()`**
Disables retry entirely. `Connect()` returns on the first dial failure.
**`WithRetryMaxRetries(int)`**
@@ -124,13 +129,18 @@ Enables or disables worker-level logging.
**`honeybee.WithWorkerLogLevel(slog.Level)`**
Overrides the minimum log level for worker-scoped records only.
### Per-connection
**`honeybee.WithDialer(types.Dialer)`**
Overrides the dialer for a single `Connect` call. Passed as a variadic option: `pool.Connect(id, honeybee.WithDialer(d))`. When provided, it takes precedence over the dialer resolved from `ConnectionConfig`. Existing callers that pass no options are unaffected.
### Wiring
**`honeybee.WithConnectionConfig(*transport.ConnectionConfig)`**
Supplies a connection config used when dialing each peer.
**`honeybee.WithConnectionConfig(transport.ConnectionConfig)`**
Supplies a connection config used when dialing each peer. Accepted by value; the pool stores its own copy.
**`honeybee.WithWorkerConfig(*honeybee.WorkerConfig)`**
Supplies a worker config applied to every worker the pool creates.
**`honeybee.WithWorkerConfig(honeybee.WorkerConfig)`**
Supplies a worker config applied to every worker the pool creates. Accepted by value; the pool stores its own copy.
**`honeybee.WithWorkerFactory(honeybee.WorkerFactory)`**
Replaces the default worker constructor. See [EXTEND.md](EXTEND.md) for the factory contract.
+1 -2
View File
@@ -35,8 +35,7 @@ type PoolPlugin struct {
Inbox chan<- honeybee.InboxMessage
Events chan<- honeybee.PoolEvent
InboxCounter *atomic.Uint64
Dialer honeybee.Dialer
ConnectionConfig *transport.ConnectionConfig
ConnectionConfig transport.ConnectionConfig
Handler slog.Handler
}
```
+1 -1
View File
@@ -240,7 +240,7 @@ Connections send periodic WebSocket ping frames and listen for the corresponding
Pong-derived heartbeats reset the keepalive timer alongside data messages and sends. A peer that sends no data but responds to pings will not be disconnected and reconnected by the keepalive mechanism.
The ping interval is configured via `transport.WithPingInterval` on the `transport.ConnectionConfig`. Import `git.wisehodl.dev/jay/go-honeybee/transport` to construct a `ConnectionConfig`, then pass it to the pool via `honeybee.WithConnectionConfig`, or supply it directly to `NewConnection` and `NewConnectionFromSocket`. The default is 20 seconds. Set to zero to disable pings entirely, in which case only data messages and outbound sends generate heartbeats.
The ping interval is configured via `transport.WithPingInterval` on the `transport.ConnectionConfig`. Import `git.wisehodl.dev/jay/go-honeybee/transport` to construct a `ConnectionConfig`, then pass it to the pool by value via `honeybee.WithConnectionConfig`, or supply it directly to `NewConnection` and `NewConnectionFromSocket`. The default is 20 seconds. Set to zero to disable pings entirely, in which case only data messages and outbound sends generate heartbeats.
## Statistics
+10 -14
View File
@@ -14,9 +14,9 @@ import (
type PoolConfig struct {
InboxBufferSize int
EventsBufferSize int
ConnectionConfig *transport.ConnectionConfig
ConnectionConfig transport.ConnectionConfig
WorkerFactory WorkerFactory
WorkerConfig *WorkerConfig
WorkerConfig WorkerConfig
}
type PoolOption func(*PoolConfig) error
@@ -38,9 +38,9 @@ func GetDefaultPoolConfig() *PoolConfig {
return &PoolConfig{
InboxBufferSize: 256,
EventsBufferSize: 10,
ConnectionConfig: nil,
ConnectionConfig: *transport.GetDefaultConnectionConfig(),
WorkerFactory: nil,
WorkerConfig: nil,
WorkerConfig: *GetDefaultWorkerConfig(),
}
}
@@ -58,19 +58,15 @@ func applyPoolOptions(config *PoolConfig, options ...PoolOption) error {
func ValidatePoolConfig(config *PoolConfig) error {
var err error
if config.ConnectionConfig != nil {
err = transport.ValidateConnectionConfig(config.ConnectionConfig)
err = transport.ValidateConnectionConfig(&config.ConnectionConfig)
if err != nil {
return err
}
}
if config.WorkerConfig != nil {
err = ValidateWorkerConfig(config.WorkerConfig)
err = ValidateWorkerConfig(&config.WorkerConfig)
if err != nil {
return err
}
}
return nil
}
@@ -104,9 +100,9 @@ func WithEventsBufferSize(value int) PoolOption {
}
}
func WithConnectionConfig(cc *transport.ConnectionConfig) PoolOption {
func WithConnectionConfig(cc transport.ConnectionConfig) PoolOption {
return func(c *PoolConfig) error {
err := transport.ValidateConnectionConfig(cc)
err := transport.ValidateConnectionConfig(&cc)
if err != nil {
return err
}
@@ -115,9 +111,9 @@ func WithConnectionConfig(cc *transport.ConnectionConfig) PoolOption {
}
}
func WithWorkerConfig(wc *WorkerConfig) PoolOption {
func WithWorkerConfig(wc WorkerConfig) PoolOption {
return func(c *PoolConfig) error {
err := ValidateWorkerConfig(wc)
err := ValidateWorkerConfig(&wc)
if err != nil {
return err
}
+27 -13
View File
@@ -14,8 +14,8 @@ func TestNewPoolConfig(t *testing.T) {
assert.Equal(t, conf, &PoolConfig{
InboxBufferSize: 256,
EventsBufferSize: 10,
ConnectionConfig: nil,
WorkerConfig: nil,
ConnectionConfig: *transport.GetDefaultConnectionConfig(),
WorkerConfig: *GetDefaultWorkerConfig(),
WorkerFactory: nil,
})
}
@@ -26,8 +26,8 @@ func TestDefaultPoolConfig(t *testing.T) {
assert.Equal(t, conf, &PoolConfig{
InboxBufferSize: 256,
EventsBufferSize: 10,
ConnectionConfig: nil,
WorkerConfig: nil,
ConnectionConfig: *transport.GetDefaultConnectionConfig(),
WorkerConfig: *GetDefaultWorkerConfig(),
WorkerFactory: nil,
})
}
@@ -36,7 +36,9 @@ func TestApplyPoolOptions(t *testing.T) {
conf := &PoolConfig{}
err := applyPoolOptions(
conf,
WithConnectionConfig(&transport.ConnectionConfig{}),
WithConnectionConfig(transport.ConnectionConfig{
Retry: transport.RetryConfig{Disabled: true},
}),
)
assert.NoError(t, err)
@@ -57,15 +59,21 @@ func TestWithBufferSizes(t *testing.T) {
func TestWithConnectionConfig(t *testing.T) {
conf := &PoolConfig{}
opt := WithConnectionConfig(&transport.ConnectionConfig{WriteTimeout: 1 * time.Second})
opt := WithConnectionConfig(transport.ConnectionConfig{
WriteTimeout: 1 * time.Second,
Retry: transport.RetryConfig{Disabled: true},
})
err := applyPoolOptions(conf, opt)
assert.NoError(t, err)
assert.NotNil(t, conf.ConnectionConfig)
assert.Equal(t, 1*time.Second, conf.ConnectionConfig.WriteTimeout)
// invalid config is rejected
conf = &PoolConfig{}
opt = WithConnectionConfig(&transport.ConnectionConfig{WriteTimeout: -1 * time.Second})
opt = WithConnectionConfig(
transport.ConnectionConfig{
WriteTimeout: -1 * time.Second,
Retry: transport.RetryConfig{Disabled: true},
})
err = applyPoolOptions(conf, opt)
assert.Error(t, err)
}
@@ -78,8 +86,12 @@ func TestValidatePoolConfig(t *testing.T) {
wantErrText string
}{
{
name: "valid empty",
conf: *&PoolConfig{},
name: "valid empty (retry disabled)",
conf: PoolConfig{
ConnectionConfig: transport.ConnectionConfig{
Retry: transport.RetryConfig{Disabled: true},
},
},
},
{
name: "valid defaults",
@@ -88,14 +100,16 @@ func TestValidatePoolConfig(t *testing.T) {
{
name: "valid complete",
conf: PoolConfig{
ConnectionConfig: &transport.ConnectionConfig{},
ConnectionConfig: transport.ConnectionConfig{
Retry: transport.RetryConfig{Disabled: true},
},
},
},
{
name: "invalid connection config",
conf: PoolConfig{
ConnectionConfig: &transport.ConnectionConfig{
Retry: &transport.RetryConfig{
ConnectionConfig: transport.ConnectionConfig{
Retry: transport.RetryConfig{
InitialDelay: 10 * time.Second,
MaxDelay: 1 * time.Second,
},
+45 -19
View File
@@ -56,8 +56,7 @@ type PoolPlugin struct {
Inbox chan<- types.InboxMessage
Events chan<- PoolEvent
InboxCounter *atomic.Uint64
Dialer types.Dialer
ConnectionConfig *transport.ConnectionConfig
ConnectionConfig transport.ConnectionConfig
}
// ----------------------------------------------------------------------------
@@ -101,7 +100,8 @@ func NewPool(ctx context.Context, config *PoolConfig, handler slog.Handler,
if config.WorkerFactory == nil {
config.WorkerFactory = func(
ctx context.Context, id string, handler slog.Handler) (Worker, error) {
return NewWorker(ctx, id, config.WorkerConfig, handler)
wc := config.WorkerConfig
return NewWorker(ctx, id, &wc, handler)
}
}
@@ -117,12 +117,19 @@ func NewPool(ctx context.Context, config *PoolConfig, handler slog.Handler,
logger = slog.New(handler).With(slog.Any("component", c))
}
var dialer types.Dialer
if config.ConnectionConfig.Dialer != nil {
dialer = config.ConnectionConfig.Dialer
} else {
dialer = transport.NewDialer()
}
return &Pool{
peers: make(map[string]*Peer),
inbox: make(chan types.InboxMessage, config.InboxBufferSize),
events: make(chan PoolEvent, config.EventsBufferSize),
dialer: transport.NewDialer(),
dialer: dialer,
config: config,
handler: handler,
logger: logger,
@@ -194,16 +201,9 @@ func (p *Pool) PeerStats(id string) (PeerStats, error) {
}, nil
}
func (p *Pool) SetDialer(d types.Dialer) {
if d == nil {
panic("dialer cannot be nil")
}
p.dialer = d
}
func (p *Pool) Close() {
if p.logger != nil {
p.logger.Debug("closing")
p.logger.Info("closing")
}
p.mu.Lock()
@@ -231,9 +231,24 @@ func (p *Pool) Close() {
}()
}
func (p *Pool) Connect(id string) error {
// ConnectOption configures a single Connect call.
type ConnectOption func(*connectOptions)
type connectOptions struct {
dialer types.Dialer
}
// WithDialer returns a ConnectOption that overrides the pool dialer for this
// connection only.
func WithDialer(d types.Dialer) ConnectOption {
return func(o *connectOptions) {
o.dialer = d
}
}
func (p *Pool) Connect(id string, opts ...ConnectOption) error {
if p.logger != nil {
p.logger.Debug("connecting to peer", "peer", id)
p.logger.Info("connecting", "peer", id)
}
id, err := transport.NormalizeURL(id)
@@ -258,12 +273,23 @@ func (p *Pool) Connect(id string) error {
return err
}
o := &connectOptions{}
for _, opt := range opts {
opt(o)
}
effectiveDialer := p.dialer
if o.dialer != nil {
effectiveDialer = o.dialer
}
cc := p.config.ConnectionConfig.Clone()
cc.Dialer = effectiveDialer
pool := PoolPlugin{
Inbox: p.inbox,
Events: p.events,
InboxCounter: p.inboxCounter,
Dialer: p.dialer,
ConnectionConfig: p.config.ConnectionConfig,
ConnectionConfig: cc,
}
p.wg.Go(func() {
@@ -273,7 +299,7 @@ func (p *Pool) Connect(id string) error {
p.peers[id] = &Peer{id: id, worker: worker}
if p.logger != nil {
p.logger.Info("registered peer", "peer", id)
p.logger.Debug("registered peer", "peer", id)
}
return nil
@@ -281,7 +307,7 @@ func (p *Pool) Connect(id string) error {
func (p *Pool) Remove(id string) error {
if p.logger != nil {
p.logger.Debug("disconnecting from peer", "peer", id)
p.logger.Info("disconnecting", "peer", id)
}
id, err := transport.NormalizeURL(id)
@@ -305,7 +331,7 @@ func (p *Pool) Remove(id string) error {
peer.worker.Stop()
if p.logger != nil {
p.logger.Info("disconnected from peer", "peer", id)
p.logger.Debug("disconnected from peer", "peer", id)
}
return nil
+63 -5
View File
@@ -4,6 +4,7 @@ import (
"context"
"fmt"
"git.wisehodl.dev/jay/go-honeybee/honeybeetest"
"git.wisehodl.dev/jay/go-honeybee/transport"
"git.wisehodl.dev/jay/go-honeybee/types"
"github.com/gorilla/websocket"
"github.com/stretchr/testify/assert"
@@ -15,14 +16,20 @@ import (
func setupPool(t *testing.T) (*Pool, *honeybeetest.MockDialer) {
t.Helper()
pool, err := NewPool(context.Background(), nil, nil)
assert.NoError(t, err)
dialer := &honeybeetest.MockDialer{
DialContextFunc: func(context.Context, string, http.Header) (types.Socket, *http.Response, error) {
return honeybeetest.NewMockSocket(), nil, nil
},
}
pool.dialer = dialer
cc := *transport.GetDefaultConnectionConfig()
cc.Dialer = dialer
pool, err := NewPool(context.Background(), &PoolConfig{
InboxBufferSize: 256,
EventsBufferSize: 10,
ConnectionConfig: cc,
WorkerConfig: *GetDefaultWorkerConfig(),
}, nil)
assert.NoError(t, err)
return pool, dialer
}
@@ -83,6 +90,51 @@ func TestPoolConnect(t *testing.T) {
})
}
func TestPoolConnectWithDialer(t *testing.T) {
t.Run("per-call dialer is used instead of pool dialer", func(t *testing.T) {
perCallUsed := false
perCallDialer := &honeybeetest.MockDialer{
DialContextFunc: func(ctx context.Context, url string, h http.Header) (types.Socket, *http.Response, error) {
perCallUsed = true
return honeybeetest.NewMockSocket(), nil, nil
},
}
// pool dialer should NOT be called
poolDialer := &honeybeetest.MockDialer{
DialContextFunc: func(context.Context, string, http.Header) (types.Socket, *http.Response, error) {
t.Error("pool dialer should not be called when per-call dialer is provided")
return nil, nil, fmt.Errorf("unexpected call")
},
}
cc := *transport.GetDefaultConnectionConfig()
cc.Dialer = poolDialer
pool, err := NewPool(context.Background(), &PoolConfig{
InboxBufferSize: 256,
EventsBufferSize: 10,
ConnectionConfig: cc,
WorkerConfig: *GetDefaultWorkerConfig(),
}, nil)
assert.NoError(t, err)
err = pool.Connect("wss://test", WithDialer(perCallDialer))
assert.NoError(t, err)
honeybeetest.Eventually(t, func() bool {
select {
case e := <-pool.events:
return e.ID == "wss://test" && e.Kind == EventConnected
default:
return false
}
}, "expected connected event")
assert.True(t, perCallUsed, "per-call dialer was not used")
pool.Close()
})
}
func TestPoolClose(t *testing.T) {
t.Run("channels close after pool close", func(t *testing.T) {
pool, _ := NewPool(context.Background(), nil, nil)
@@ -152,9 +204,15 @@ func TestPoolSend(t *testing.T) {
},
}
pool, err := NewPool(context.Background(), nil, nil)
cc := *transport.GetDefaultConnectionConfig()
cc.Dialer = mockDialer
pool, err := NewPool(context.Background(), &PoolConfig{
InboxBufferSize: 256,
EventsBufferSize: 10,
ConnectionConfig: cc,
WorkerConfig: *GetDefaultWorkerConfig(),
}, nil)
assert.NoError(t, err)
pool.dialer = mockDialer
err = pool.Connect("wss://test")
assert.NoError(t, err)
+23 -26
View File
@@ -1,6 +1,7 @@
package transport
import (
"git.wisehodl.dev/jay/go-honeybee/types"
"net/http"
"time"
)
@@ -20,10 +21,12 @@ type ConnectionConfig struct {
PingInterval time.Duration
IncomingBufferSize int
ErrorsBufferSize int
Retry *RetryConfig
Retry RetryConfig
Dialer types.Dialer
}
type RetryConfig struct {
Disabled bool
MaxRetries int
InitialDelay time.Duration
MaxDelay time.Duration
@@ -55,19 +58,22 @@ func GetDefaultConnectionConfig() *ConnectionConfig {
PingInterval: 20 * time.Second,
IncomingBufferSize: 100,
ErrorsBufferSize: 10,
Retry: GetDefaultRetryConfig(),
}
}
func GetDefaultRetryConfig() *RetryConfig {
return &RetryConfig{
Retry: RetryConfig{
MaxRetries: 0, // Infinite retries
InitialDelay: 1 * time.Second,
MaxDelay: 60 * time.Second,
JitterFactor: 0.2,
},
}
}
func (c ConnectionConfig) Clone() ConnectionConfig {
if c.RequestHeader != nil {
c.RequestHeader = c.RequestHeader.Clone()
}
return c
}
func applyConnectionOptions(config *ConnectionConfig, options ...ConnectionOption) error {
for _, option := range options {
if err := option(config); err != nil {
@@ -85,7 +91,7 @@ func ValidateConnectionConfig(config *ConnectionConfig) error {
return err
}
if config.Retry != nil {
if !config.Retry.Disabled {
err = validateMaxRetries(config.Retry.MaxRetries)
if err != nil {
return err
@@ -223,19 +229,22 @@ func WithErrorsBufferSize(value int) ConnectionOption {
}
}
func WithoutRetry() ConnectionOption {
func WithConnectionDialer(d types.Dialer) ConnectionOption {
return func(c *ConnectionConfig) error {
c.Retry = nil
c.Dialer = d
return nil
}
}
func WithRetryDisabled() ConnectionOption {
return func(c *ConnectionConfig) error {
c.Retry.Disabled = true
return nil
}
}
func WithRetryMaxRetries(value int) ConnectionOption {
return func(c *ConnectionConfig) error {
if c.Retry == nil {
c.Retry = GetDefaultRetryConfig()
}
err := validateMaxRetries(value)
if err != nil {
return err
@@ -248,10 +257,6 @@ func WithRetryMaxRetries(value int) ConnectionOption {
func WithRetryInitialDelay(value time.Duration) ConnectionOption {
return func(c *ConnectionConfig) error {
if c.Retry == nil {
c.Retry = GetDefaultRetryConfig()
}
err := validateInitialDelay(value)
if err != nil {
return err
@@ -264,10 +269,6 @@ func WithRetryInitialDelay(value time.Duration) ConnectionOption {
func WithRetryMaxDelay(value time.Duration) ConnectionOption {
return func(c *ConnectionConfig) error {
if c.Retry == nil {
c.Retry = GetDefaultRetryConfig()
}
err := validateMaxDelay(value)
if err != nil {
return err
@@ -280,10 +281,6 @@ func WithRetryMaxDelay(value time.Duration) ConnectionOption {
func WithRetryJitterFactor(value float64) ConnectionOption {
return func(c *ConnectionConfig) error {
if c.Retry == nil {
c.Retry = GetDefaultRetryConfig()
}
err := validateJitterFactor(value)
if err != nil {
return err
+35 -13
View File
@@ -1,6 +1,7 @@
package transport
import (
"git.wisehodl.dev/jay/go-honeybee/honeybeetest"
"github.com/stretchr/testify/assert"
"net/http"
"testing"
@@ -35,18 +36,12 @@ func TestDefaultConnectionConfig(t *testing.T) {
PingInterval: 20 * time.Second,
IncomingBufferSize: 100,
ErrorsBufferSize: 10,
Retry: GetDefaultRetryConfig(),
})
}
func TestDefaultRetryConnectionConfig(t *testing.T) {
conf := GetDefaultRetryConfig()
assert.Equal(t, conf, &RetryConfig{
Retry: RetryConfig{
MaxRetries: 0,
InitialDelay: 1 * time.Second,
MaxDelay: 60 * time.Second,
JitterFactor: 0.2,
},
})
}
@@ -114,10 +109,10 @@ func TestWithWriteTimeout(t *testing.T) {
func TestWithRetry(t *testing.T) {
t.Run("without retry", func(t *testing.T) {
conf := GetDefaultConnectionConfig()
opt := WithoutRetry()
opt := WithRetryDisabled()
err := applyConnectionOptions(conf, opt)
assert.NoError(t, err)
assert.Nil(t, conf.Retry)
assert.True(t, conf.Retry.Disabled)
})
t.Run("with attempts", func(t *testing.T) {
@@ -209,7 +204,7 @@ func TestValidateConnectionConfig(t *testing.T) {
}{
{
name: "valid empty",
conf: *&ConnectionConfig{},
conf: ConnectionConfig{Retry: RetryConfig{Disabled: true}},
},
{
name: "valid defaults",
@@ -220,7 +215,7 @@ func TestValidateConnectionConfig(t *testing.T) {
conf: ConnectionConfig{
CloseHandler: (func(code int, text string) error { return nil }),
WriteTimeout: time.Duration(30),
Retry: &RetryConfig{
Retry: RetryConfig{
MaxRetries: 0,
InitialDelay: 2 * time.Second,
MaxDelay: 10 * time.Second,
@@ -231,7 +226,7 @@ func TestValidateConnectionConfig(t *testing.T) {
{
name: "invalid - initial delay > max delay",
conf: ConnectionConfig{
Retry: &RetryConfig{
Retry: RetryConfig{
InitialDelay: 10 * time.Second,
MaxDelay: 1 * time.Second,
},
@@ -259,3 +254,30 @@ func TestValidateConnectionConfig(t *testing.T) {
})
}
}
func TestConnectionConfigClone(t *testing.T) {
header := http.Header{}
header.Set("X-Test", "val")
orig := ConnectionConfig{
RequestHeader: header,
WriteTimeout: 5 * time.Second,
Retry: RetryConfig{Disabled: true},
}
cloned := orig.Clone()
// values match
assert.Equal(t, orig.WriteTimeout, cloned.WriteTimeout)
assert.Equal(t, "val", cloned.RequestHeader.Get("X-Test"))
// header is a distinct copy
cloned.RequestHeader.Set("X-Test", "mutated")
assert.Equal(t, "val", orig.RequestHeader.Get("X-Test"))
}
func TestWithConnectionDialer(t *testing.T) {
mock := &honeybeetest.MockDialer{}
conf, err := NewConnectionConfig(WithConnectionDialer(mock))
assert.NoError(t, err)
assert.Equal(t, mock, conf.Dialer)
}
+23 -18
View File
@@ -65,7 +65,7 @@ type Connection struct {
url *url.URL
dialer types.Dialer
socket types.Socket
config *ConnectionConfig
config ConnectionConfig
logger *slog.Logger
incoming chan []byte
@@ -107,14 +107,20 @@ func NewConnection(ctx context.Context, urlStr string, config *ConnectionConfig,
ctx = component.MustExtend(ctx, "connection")
}
// Clone config to ensure full ownership of all fields.
cc := config.Clone()
if cc.Dialer == nil {
cc.Dialer = NewDialer()
}
conn := &Connection{
url: url,
dialer: NewDialer(),
dialer: cc.Dialer,
socket: nil,
config: config,
incoming: make(chan []byte, config.IncomingBufferSize),
config: cc,
incoming: make(chan []byte, cc.IncomingBufferSize),
heartbeat: make(chan struct{}, 1),
errors: make(chan error, config.ErrorsBufferSize),
errors: make(chan error, cc.ErrorsBufferSize),
incomingCount: &atomic.Uint64{},
outgoingCount: &atomic.Uint64{},
heartbeatCount: &atomic.Uint64{},
@@ -151,14 +157,17 @@ func NewConnectionFromSocket(
ctx = component.MustExtend(ctx, "connection")
}
// Clone config to ensure full ownership of all fields.
cc := config.Clone()
conn := &Connection{
url: nil,
dialer: nil,
socket: socket,
config: config,
incoming: make(chan []byte, config.IncomingBufferSize),
config: cc,
incoming: make(chan []byte, cc.IncomingBufferSize),
heartbeat: make(chan struct{}, 1),
errors: make(chan error, config.ErrorsBufferSize),
errors: make(chan error, cc.ErrorsBufferSize),
incomingCount: &atomic.Uint64{},
outgoingCount: &atomic.Uint64{},
heartbeatCount: &atomic.Uint64{},
@@ -219,7 +228,7 @@ func (c *Connection) Connect(ctx context.Context) error {
// socket acquisition failed
c.state = StateDisconnected
if c.logger != nil {
c.logger.Error("connection failed", "error", err)
c.logger.Warn("connection failed", "error", err)
}
return NewConnectionError(err)
}
@@ -244,7 +253,7 @@ func (c *Connection) Connect(ctx context.Context) error {
c.state = StateConnected
if c.logger != nil {
c.logger.Info("connected")
c.logger.Debug("connected")
}
return nil
@@ -311,10 +320,6 @@ func (c *Connection) Stats() ConnectionStats {
}
}
func (c *Connection) SetDialer(d types.Dialer) {
c.dialer = d
}
// ---------------------------/
// Reader loop
// -------------------------/
@@ -357,7 +362,7 @@ func (c *Connection) classifyCloseError(err error) error {
switch closeErr.Code {
case websocket.CloseNormalClosure, websocket.CloseGoingAway:
if c.logger != nil {
c.logger.Info("connection closed by peer",
c.logger.Debug("connection closed by peer",
"code", closeErr.Code,
"text", closeErr.Text,
)
@@ -366,7 +371,7 @@ func (c *Connection) classifyCloseError(err error) error {
default:
if c.logger != nil {
c.logger.Error("unexpected close",
c.logger.Warn("unexpected close",
"code", closeErr.Code,
"text", closeErr.Text,
)
@@ -492,7 +497,7 @@ func (c *Connection) shutdownInner() {
})
if c.logger != nil {
c.logger.Info("closing")
c.logger.Debug("closing")
}
if c.socket != nil {
@@ -518,7 +523,7 @@ func (c *Connection) shutdownCleanup() {
close(c.errors)
if c.logger != nil {
c.logger.Info("closed")
c.logger.Debug("closed")
}
})
}
+3 -3
View File
@@ -102,7 +102,7 @@ func TestConnectionSend(t *testing.T) {
})
t.Run("write timeout disabled when zero", func(t *testing.T) {
config := &ConnectionConfig{WriteTimeout: 0}
config := &ConnectionConfig{WriteTimeout: 0, Retry: RetryConfig{Disabled: true}}
outgoingData := make(chan honeybeetest.MockOutgoingData, 10)
mockSocket := honeybeetest.NewMockSocket()
@@ -148,7 +148,7 @@ func TestConnectionSend(t *testing.T) {
})
t.Run("write timeout sets deadline when positive", func(t *testing.T) {
config := &ConnectionConfig{WriteTimeout: 30 * time.Millisecond}
config := &ConnectionConfig{WriteTimeout: 30 * time.Millisecond, Retry: RetryConfig{Disabled: true}}
outgoingData := make(chan honeybeetest.MockOutgoingData, 10)
mockSocket := honeybeetest.NewMockSocket()
@@ -194,7 +194,7 @@ func TestConnectionSend(t *testing.T) {
})
t.Run("send fails on deadline error", func(t *testing.T) {
config := &ConnectionConfig{WriteTimeout: 1 * time.Millisecond}
config := &ConnectionConfig{WriteTimeout: 1 * time.Millisecond, Retry: RetryConfig{Disabled: true}}
mockSocket := honeybeetest.NewMockSocket()
+73 -63
View File
@@ -69,7 +69,7 @@ func TestNewConnection(t *testing.T) {
{
name: "valid url, valid config",
url: "wss://relay.example.com:8080/path",
config: &ConnectionConfig{WriteTimeout: 30 * time.Second},
config: &ConnectionConfig{WriteTimeout: 30 * time.Second, Retry: RetryConfig{Disabled: true}},
},
{
name: "invalid url",
@@ -82,7 +82,7 @@ func TestNewConnection(t *testing.T) {
name: "invalid config",
url: "ws://example.com",
config: &ConnectionConfig{
Retry: &RetryConfig{
Retry: RetryConfig{
InitialDelay: 10 * time.Second,
MaxDelay: 1 * time.Second,
},
@@ -121,9 +121,13 @@ func TestNewConnection(t *testing.T) {
// Verify default config is used if nil is passed
if tc.config == nil {
assert.Equal(t, GetDefaultConnectionConfig(), conn.config)
expected := *GetDefaultConnectionConfig()
expected.Dialer = conn.config.Dialer // dialer resolved at construction
assert.Equal(t, expected, conn.config)
} else {
assert.Equal(t, tc.config, conn.config)
expected := *tc.config
expected.Dialer = conn.config.Dialer
assert.Equal(t, expected, conn.config)
}
})
}
@@ -152,13 +156,13 @@ func TestNewConnectionFromSocket(t *testing.T) {
{
name: "valid socket with valid config",
socket: honeybeetest.NewMockSocket(),
config: &ConnectionConfig{WriteTimeout: 30 * time.Second},
config: &ConnectionConfig{WriteTimeout: 30 * time.Second, Retry: RetryConfig{Disabled: true}},
},
{
name: "invalid config",
socket: honeybeetest.NewMockSocket(),
config: &ConnectionConfig{
Retry: &RetryConfig{
Retry: RetryConfig{
InitialDelay: 10 * time.Second,
MaxDelay: 1 * time.Second,
},
@@ -173,6 +177,7 @@ func TestNewConnectionFromSocket(t *testing.T) {
CloseHandler: func(code int, text string) error {
return nil
},
Retry: RetryConfig{Disabled: true},
},
},
}
@@ -219,11 +224,19 @@ func TestNewConnectionFromSocket(t *testing.T) {
assert.Equal(t, StateConnected, conn.state)
assert.False(t, conn.closed)
// Verify default config is used if nil is passed
// Verify default config is used if nil is passed.
// CloseHandler is a func; exclude it from the struct comparison
// (identity is verified separately via closeHandlerSet).
gotCfg := conn.config
gotCfg.CloseHandler = nil
if tc.config == nil {
assert.Equal(t, GetDefaultConnectionConfig(), conn.config)
expected := *GetDefaultConnectionConfig()
expected.CloseHandler = nil
assert.Equal(t, expected, gotCfg)
} else {
assert.Equal(t, tc.config, conn.config)
expected := *tc.config
expected.CloseHandler = nil
assert.Equal(t, expected, gotCfg)
}
// Verify close handler was set if provided
@@ -260,9 +273,6 @@ func TestConnect(t *testing.T) {
})
t.Run("connect succeeds and starts goroutines", func(t *testing.T) {
conn, err := NewConnection(context.Background(), "ws://test", nil, nil)
assert.NoError(t, err)
outgoingData := make(chan honeybeetest.MockOutgoingData, 10)
mockSocket := honeybeetest.NewMockSocket()
@@ -276,7 +286,9 @@ func TestConnect(t *testing.T) {
return mockSocket, nil, nil
},
}
conn.dialer = mockDialer
conn, err := NewConnection(context.Background(), "ws://test",
&ConnectionConfig{Retry: RetryConfig{Disabled: true}, Dialer: mockDialer}, nil)
assert.NoError(t, err)
err = conn.Connect(context.Background())
assert.NoError(t, err)
@@ -298,17 +310,6 @@ func TestConnect(t *testing.T) {
})
t.Run("connect retries on dial failure", func(t *testing.T) {
config := &ConnectionConfig{
Retry: &RetryConfig{
MaxRetries: 2,
InitialDelay: 1 * time.Millisecond,
MaxDelay: 5 * time.Millisecond,
JitterFactor: 0.0,
},
}
conn, err := NewConnection(context.Background(), "ws://test", config, nil)
assert.NoError(t, err)
attemptCount := 0
mockDialer := &honeybeetest.MockDialer{
DialContextFunc: func(context.Context, string, http.Header) (types.Socket, *http.Response, error) {
@@ -319,7 +320,17 @@ func TestConnect(t *testing.T) {
return honeybeetest.NewMockSocket(), nil, nil
},
}
conn.dialer = mockDialer
config := &ConnectionConfig{
Retry: RetryConfig{
MaxRetries: 2,
InitialDelay: 1 * time.Millisecond,
MaxDelay: 5 * time.Millisecond,
JitterFactor: 0.0,
},
Dialer: mockDialer,
}
conn, err := NewConnection(context.Background(), "ws://test", config, nil)
assert.NoError(t, err)
err = conn.Connect(context.Background())
assert.NoError(t, err)
@@ -330,23 +341,22 @@ func TestConnect(t *testing.T) {
})
t.Run("connect fails after max retries", func(t *testing.T) {
config := &ConnectionConfig{
Retry: &RetryConfig{
MaxRetries: 2,
InitialDelay: 1 * time.Millisecond,
MaxDelay: 5 * time.Millisecond,
JitterFactor: 0.0,
},
}
conn, err := NewConnection(context.Background(), "ws://test", config, nil)
assert.NoError(t, err)
mockDialer := &honeybeetest.MockDialer{
DialContextFunc: func(context.Context, string, http.Header) (types.Socket, *http.Response, error) {
return nil, nil, fmt.Errorf("dial failed")
},
}
conn.dialer = mockDialer
config := &ConnectionConfig{
Retry: RetryConfig{
MaxRetries: 2,
InitialDelay: 1 * time.Millisecond,
MaxDelay: 5 * time.Millisecond,
JitterFactor: 0.0,
},
Dialer: mockDialer,
}
conn, err := NewConnection(context.Background(), "ws://test", config, nil)
assert.NoError(t, err)
err = conn.Connect(context.Background())
assert.Error(t, err)
@@ -355,18 +365,20 @@ func TestConnect(t *testing.T) {
})
t.Run("state transitions during connect", func(t *testing.T) {
conn, err := NewConnection(context.Background(), "ws://test", nil, nil)
assert.NoError(t, err)
assert.Equal(t, StateDisconnected, conn.State())
stateDuringDial := StateDisconnected
// conn captured after construction; closure safe because dialer runs during Connect
var conn *Connection
mockDialer := &honeybeetest.MockDialer{
DialContextFunc: func(context.Context, string, http.Header) (types.Socket, *http.Response, error) {
stateDuringDial = conn.state
return honeybeetest.NewMockSocket(), nil, nil
},
}
conn.dialer = mockDialer
var err error
conn, err = NewConnection(context.Background(), "ws://test",
&ConnectionConfig{Retry: RetryConfig{Disabled: true}, Dialer: mockDialer}, nil)
assert.NoError(t, err)
assert.Equal(t, StateDisconnected, conn.State())
conn.Connect(context.Background())
@@ -378,25 +390,24 @@ func TestConnect(t *testing.T) {
t.Run("close handler configured when provided", func(t *testing.T) {
handlerSet := false
config := &ConnectionConfig{
CloseHandler: func(code int, text string) error {
return nil
},
}
conn, err := NewConnection(context.Background(), "ws://test", config, nil)
assert.NoError(t, err)
mockSocket := honeybeetest.NewMockSocket()
mockSocket.SetCloseHandlerFunc = func(h func(int, string) error) {
handlerSet = true
}
mockDialer := &honeybeetest.MockDialer{
DialContextFunc: func(context.Context, string, http.Header) (types.Socket, *http.Response, error) {
return mockSocket, nil, nil
},
}
conn.dialer = mockDialer
config := &ConnectionConfig{
CloseHandler: func(code int, text string) error {
return nil
},
Retry: RetryConfig{Disabled: true},
Dialer: mockDialer,
}
conn, err := NewConnection(context.Background(), "ws://test", config, nil)
assert.NoError(t, err)
conn.Connect(context.Background())
@@ -407,17 +418,16 @@ func TestConnect(t *testing.T) {
t.Run("passes headers when configured", func(t *testing.T) {
header := http.Header{"X-Custom": []string{"val"}}
conf, _ := NewConnectionConfig(WithRequestHeader(header))
conn, _ := NewConnection(context.Background(), "ws://test", conf, nil)
dialCalled := false
conn.dialer = &honeybeetest.MockDialer{
mockDialer := &honeybeetest.MockDialer{
DialContextFunc: func(ctx context.Context, url string, h http.Header) (types.Socket, *http.Response, error) {
assert.Equal(t, "val", h.Get("X-Custom"))
dialCalled = true
return honeybeetest.NewMockSocket(), nil, nil
},
}
conf, _ := NewConnectionConfig(WithRequestHeader(header), WithConnectionDialer(mockDialer))
conn, _ := NewConnection(context.Background(), "ws://test", conf, nil)
err := conn.Connect(context.Background())
@@ -429,25 +439,25 @@ func TestConnect(t *testing.T) {
func TestConnectContextCancellation(t *testing.T) {
t.Run("context cancelled during connect returns before retries exhaust", func(t *testing.T) {
config := &ConnectionConfig{
Retry: &RetryConfig{
Retry: RetryConfig{
MaxRetries: 100,
InitialDelay: 500 * time.Millisecond,
MaxDelay: 1 * time.Second,
JitterFactor: 0.0,
},
}
conn, err := NewConnection(context.Background(), "ws://test", config, nil)
assert.NoError(t, err)
dialCount := atomic.Int32{}
ctx, cancel := context.WithCancel(context.Background())
conn.dialer = &honeybeetest.MockDialer{
mockDialer := &honeybeetest.MockDialer{
DialContextFunc: func(ctx context.Context, _ string, _ http.Header) (types.Socket, *http.Response, error) {
dialCount.Add(1)
return nil, nil, fmt.Errorf("dial failed")
},
}
config.Dialer = mockDialer
conn, err := NewConnection(context.Background(), "ws://test", config, nil)
assert.NoError(t, err)
ctx, cancel := context.WithCancel(context.Background())
done := make(chan error, 1)
go func() {
+43 -47
View File
@@ -28,16 +28,15 @@ func TestConnectLogging(t *testing.T) {
t.Run("success", func(t *testing.T) {
mockHandler := honeybeetest.NewMockSlogHandler()
conn, err := NewConnection(context.Background(), "ws://test", nil, mockHandler)
assert.NoError(t, err)
mockSocket := honeybeetest.NewMockSocket()
mockDialer := &honeybeetest.MockDialer{
DialContextFunc: func(context.Context, string, http.Header) (types.Socket, *http.Response, error) {
return mockSocket, nil, nil
},
}
conn.dialer = mockDialer
conn, err := NewConnection(context.Background(), "ws://test",
&ConnectionConfig{Retry: RetryConfig{Disabled: true}, Dialer: mockDialer}, mockHandler)
assert.NoError(t, err)
err = conn.Connect(context.Background())
assert.NoError(t, err)
@@ -49,7 +48,7 @@ func TestConnectLogging(t *testing.T) {
log(slog.LevelDebug, "connecting", map[string]any{}),
log(slog.LevelDebug, "dialing", map[string]any{"attempt": 1}),
log(slog.LevelDebug, "dial successful", map[string]any{"attempt": 1}),
log(slog.LevelInfo, "connected", map[string]any{}),
log(slog.LevelDebug, "connected", map[string]any{}),
}
honeybeetest.AssertLogSequence(t, records, expected)
@@ -58,25 +57,24 @@ func TestConnectLogging(t *testing.T) {
t.Run("max retries failure", func(t *testing.T) {
mockHandler := honeybeetest.NewMockSlogHandler()
config := &ConnectionConfig{
Retry: &RetryConfig{
MaxRetries: 2,
InitialDelay: 1 * time.Millisecond,
MaxDelay: 5 * time.Millisecond,
JitterFactor: 0.0,
},
}
conn, err := NewConnection(context.Background(), "ws://test", config, mockHandler)
assert.NoError(t, err)
dialErr := fmt.Errorf("dial error")
mockDialer := &honeybeetest.MockDialer{
DialContextFunc: func(context.Context, string, http.Header) (types.Socket, *http.Response, error) {
return nil, nil, dialErr
},
}
conn.dialer = mockDialer
config := &ConnectionConfig{
Retry: RetryConfig{
MaxRetries: 2,
InitialDelay: 1 * time.Millisecond,
MaxDelay: 5 * time.Millisecond,
JitterFactor: 0.0,
},
Dialer: mockDialer,
}
conn, err := NewConnection(context.Background(), "ws://test", config, mockHandler)
assert.NoError(t, err)
err = conn.Connect(context.Background())
assert.Error(t, err)
@@ -90,8 +88,8 @@ func TestConnectLogging(t *testing.T) {
log(slog.LevelDebug, "dialing", map[string]any{"attempt": 2}),
log(slog.LevelWarn, "dial failed, retrying", map[string]any{"attempt": 2, "error": dialErr}),
log(slog.LevelDebug, "dialing", map[string]any{"attempt": 3}),
log(slog.LevelError, "dial failed, max retries reached", map[string]any{"attempt": 3, "error": dialErr}),
log(slog.LevelError, "connection failed", map[string]any{"error": dialErr}),
log(slog.LevelDebug, "dial failed, max retries reached", map[string]any{"attempt": 3, "error": dialErr}),
log(slog.LevelWarn, "connection failed", map[string]any{"error": dialErr}),
}
honeybeetest.AssertLogSequence(t, records, expected)
@@ -100,18 +98,6 @@ func TestConnectLogging(t *testing.T) {
t.Run("success after retry", func(t *testing.T) {
mockHandler := honeybeetest.NewMockSlogHandler()
config := &ConnectionConfig{
Retry: &RetryConfig{
MaxRetries: 3,
InitialDelay: 1 * time.Millisecond,
MaxDelay: 5 * time.Millisecond,
JitterFactor: 0.0,
},
}
conn, err := NewConnection(context.Background(), "ws://test", config, mockHandler)
assert.NoError(t, err)
attemptCount := 0
dialErr := fmt.Errorf("dial error")
mockDialer := &honeybeetest.MockDialer{
@@ -123,7 +109,18 @@ func TestConnectLogging(t *testing.T) {
return honeybeetest.NewMockSocket(), nil, nil
},
}
conn.dialer = mockDialer
config := &ConnectionConfig{
Retry: RetryConfig{
MaxRetries: 3,
InitialDelay: 1 * time.Millisecond,
MaxDelay: 5 * time.Millisecond,
JitterFactor: 0.0,
},
Dialer: mockDialer,
}
conn, err := NewConnection(context.Background(), "ws://test", config, mockHandler)
assert.NoError(t, err)
err = conn.Connect(context.Background())
assert.NoError(t, err)
@@ -139,7 +136,7 @@ func TestConnectLogging(t *testing.T) {
log(slog.LevelWarn, "dial failed, retrying", map[string]any{"attempt": 2, "error": dialErr}),
log(slog.LevelDebug, "dialing", map[string]any{"attempt": 3}),
log(slog.LevelDebug, "dial successful", map[string]any{"attempt": 3}),
log(slog.LevelInfo, "connected", map[string]any{}),
log(slog.LevelDebug, "connected", map[string]any{}),
}
honeybeetest.AssertLogSequence(t, records, expected)
@@ -158,14 +155,14 @@ func TestCloseLogging(t *testing.T) {
honeybeetest.Eventually(t, func() bool {
return honeybeetest.FindLogRecord(
mockHandler.GetRecords(), slog.LevelInfo, "closed") != nil
mockHandler.GetRecords(), slog.LevelDebug, "closed") != nil
}, "expected log")
records := mockHandler.GetRecords()
expected := []honeybeetest.ExpectedLog{
log(slog.LevelInfo, "closing", map[string]any{}),
log(slog.LevelInfo, "closed", map[string]any{}),
log(slog.LevelDebug, "closing", map[string]any{}),
log(slog.LevelDebug, "closed", map[string]any{}),
}
honeybeetest.AssertLogSequence(t, records, expected)
@@ -193,7 +190,7 @@ func TestCloseLogging(t *testing.T) {
records := mockHandler.GetRecords()
expected := []honeybeetest.ExpectedLog{
log(slog.LevelInfo, "closing", map[string]any{}),
log(slog.LevelDebug, "closing", map[string]any{}),
log(slog.LevelError, "socket close failed", map[string]any{"error": closeErr}),
}
@@ -219,10 +216,10 @@ func TestReaderLogging(t *testing.T) {
honeybeetest.Eventually(t, func() bool {
return honeybeetest.FindLogRecord(
mockHandler.GetRecords(), slog.LevelInfo, "connection closed by peer") != nil
mockHandler.GetRecords(), slog.LevelDebug, "connection closed by peer") != nil
}, "expected log")
record := honeybeetest.FindLogRecord(mockHandler.GetRecords(), slog.LevelInfo, "connection closed by peer")
record := honeybeetest.FindLogRecord(mockHandler.GetRecords(), slog.LevelDebug, "connection closed by peer")
assert.NotNil(t, record)
honeybeetest.AssertAttributePresent(t, *record, "code", websocket.CloseNormalClosure)
honeybeetest.AssertAttributePresent(t, *record, "text", "goodbye")
@@ -246,10 +243,10 @@ func TestReaderLogging(t *testing.T) {
honeybeetest.Eventually(t, func() bool {
return honeybeetest.FindLogRecord(
mockHandler.GetRecords(), slog.LevelError, "unexpected close") != nil
mockHandler.GetRecords(), slog.LevelWarn, "unexpected close") != nil
}, "expected log")
record := honeybeetest.FindLogRecord(mockHandler.GetRecords(), slog.LevelError, "unexpected close")
record := honeybeetest.FindLogRecord(mockHandler.GetRecords(), slog.LevelWarn, "unexpected close")
assert.NotNil(t, record)
honeybeetest.AssertAttributePresent(t, *record, "code", websocket.CloseProtocolError)
honeybeetest.AssertAttributePresent(t, *record, "text", "bad protocol")
@@ -279,7 +276,7 @@ func TestWriterLogging(t *testing.T) {
t.Run("write deadline error", func(t *testing.T) {
mockHandler := honeybeetest.NewMockSlogHandler()
config := &ConnectionConfig{WriteTimeout: 1 * time.Millisecond}
config := &ConnectionConfig{WriteTimeout: 1 * time.Millisecond, Retry: RetryConfig{Disabled: true}}
deadlineErr := fmt.Errorf("deadline error")
mockSocket := honeybeetest.NewMockSocket()
@@ -341,16 +338,15 @@ func TestLoggingDisabled(t *testing.T) {
t.Run("nil logger produces no logs", func(t *testing.T) {
mockHandler := honeybeetest.NewMockSlogHandler()
conn, err := NewConnection(context.Background(), "ws://test", nil, nil)
assert.NoError(t, err)
mockSocket := honeybeetest.NewMockSocket()
mockDialer := &honeybeetest.MockDialer{
DialContextFunc: func(context.Context, string, http.Header) (types.Socket, *http.Response, error) {
return mockSocket, nil, nil
},
}
conn.dialer = mockDialer
conn, err := NewConnection(context.Background(), "ws://test",
&ConnectionConfig{Retry: RetryConfig{Disabled: true}, Dialer: mockDialer}, nil)
assert.NoError(t, err)
err = conn.Connect(context.Background())
assert.NoError(t, err)
+6 -6
View File
@@ -7,16 +7,16 @@ import (
)
type RetryManager struct {
config *RetryConfig
config RetryConfig
retryCount int
saturation int
}
func NewRetryManager(config *RetryConfig) *RetryManager {
func NewRetryManager(config RetryConfig) *RetryManager {
// saturationCount: retry count at which base delay meets or exceeds MaxDelay.
// Conservative by two to preserve jitter variance near the boundary.
saturation := 0
if config != nil &&
if !config.Disabled &&
config.InitialDelay > 0 &&
config.InitialDelay <= config.MaxDelay {
ratio := float64(config.MaxDelay) / float64(config.InitialDelay)
@@ -31,7 +31,7 @@ func NewRetryManager(config *RetryConfig) *RetryManager {
}
func (r *RetryManager) ShouldRetry() bool {
if r.config == nil {
if r.config.Disabled {
return false
}
@@ -43,7 +43,7 @@ func (r *RetryManager) ShouldRetry() bool {
}
func (r *RetryManager) CalculateDelay() time.Duration {
if r.config == nil {
if r.config.Disabled {
return time.Second
}
@@ -54,7 +54,7 @@ func (r *RetryManager) CalculateDelay() time.Duration {
// if saturation is reached, calculated backoff will always be higher than
// the maximum delay
if r.config != nil && r.retryCount >= r.saturation {
if r.retryCount >= r.saturation {
return r.config.MaxDelay
}
+13 -13
View File
@@ -7,7 +7,7 @@ import (
)
func TestNewRetryManager(t *testing.T) {
config := &RetryConfig{
config := RetryConfig{
MaxRetries: 0,
}
@@ -16,14 +16,14 @@ func TestNewRetryManager(t *testing.T) {
assert.Equal(t, config, mgr.config)
assert.Equal(t, 0, mgr.retryCount)
// Should accept nil config
mgr = NewRetryManager(nil)
assert.Nil(t, mgr.config)
// Should accept a disabled config
mgr = NewRetryManager(RetryConfig{Disabled: true})
assert.True(t, mgr.config.Disabled)
assert.Equal(t, 0, mgr.retryCount)
}
func TestRecordRetry(t *testing.T) {
mgr := NewRetryManager(nil)
mgr := NewRetryManager(RetryConfig{Disabled: true})
assert.Equal(t, mgr.retryCount, 0)
mgr.RecordRetry()
@@ -34,13 +34,13 @@ func TestRecordRetry(t *testing.T) {
}
func TestShouldRetry(t *testing.T) {
// never retry if config is nil
mgr := NewRetryManager(nil)
// never retry if config is disabled
mgr := NewRetryManager(RetryConfig{Disabled: true})
assert.False(t, mgr.ShouldRetry())
// always retry if max attempt count is zero
mgr = &RetryManager{
config: &RetryConfig{
config: RetryConfig{
MaxRetries: 0,
},
retryCount: 1000,
@@ -49,7 +49,7 @@ func TestShouldRetry(t *testing.T) {
// retry if below max attempt count
mgr = &RetryManager{
config: &RetryConfig{
config: RetryConfig{
MaxRetries: 10,
},
retryCount: 5,
@@ -58,7 +58,7 @@ func TestShouldRetry(t *testing.T) {
// do not retry if above max attempt count
mgr = &RetryManager{
config: &RetryConfig{
config: RetryConfig{
MaxRetries: 10,
},
retryCount: 11,
@@ -68,12 +68,12 @@ func TestShouldRetry(t *testing.T) {
func TestCalculateDelayDisabled(t *testing.T) {
// default delay if retry is disabled
mgr := NewRetryManager(nil)
mgr := NewRetryManager(RetryConfig{Disabled: true})
assert.Equal(t, time.Second, mgr.CalculateDelay())
}
func TestCalculateDelayWithoutJitter(t *testing.T) {
mgr := NewRetryManager(&RetryConfig{
mgr := NewRetryManager(RetryConfig{
MaxRetries: 0,
InitialDelay: 1 * time.Second,
MaxDelay: 5 * time.Second,
@@ -105,7 +105,7 @@ func TestCalculateDelayWithoutJitter(t *testing.T) {
}
func TestCalculateDelayWithJitter(t *testing.T) {
mgr := NewRetryManager(&RetryConfig{
mgr := NewRetryManager(RetryConfig{
MaxRetries: 0,
InitialDelay: 1 * time.Second,
MaxDelay: 5 * time.Second,
+1 -1
View File
@@ -82,7 +82,7 @@ func AcquireSocket(
if !retryMgr.ShouldRetry() {
// retry policy expired
if logger != nil {
logger.Error("dial failed, max retries reached",
logger.Debug("dial failed, max retries reached",
"error", err,
"attempt", retryMgr.RetryCount()+1)
}
+7 -6
View File
@@ -77,7 +77,7 @@ func TestAcquireSocket(t *testing.T) {
},
}
retryMgr := NewRetryManager(&RetryConfig{
retryMgr := NewRetryManager(RetryConfig{
MaxRetries: tc.maxRetries,
InitialDelay: 1 * time.Millisecond,
MaxDelay: 5 * time.Millisecond,
@@ -106,7 +106,8 @@ func TestAcquireSocketGuards(t *testing.T) {
return honeybeetest.NewMockSocket(), nil, nil
},
}
validRetryMgr := NewRetryManager(GetDefaultRetryConfig())
validRetryConfig := GetDefaultConnectionConfig().Retry
validRetryMgr := NewRetryManager(validRetryConfig)
cases := []struct {
name string
@@ -167,7 +168,7 @@ func TestAcquireSocketContextCancellation(t *testing.T) {
// cancel before acquiring socket
cancel()
retryMgr := NewRetryManager(GetDefaultRetryConfig())
retryMgr := NewRetryManager(GetDefaultConnectionConfig().Retry)
_, _, err := AcquireSocket(ctx, retryMgr, mockDialer, "ws://test", nil, nil)
assert.ErrorIs(t, err, context.Canceled)
@@ -186,7 +187,7 @@ func TestAcquireSocketContextCancellation(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
retryMgr := NewRetryManager(&RetryConfig{
retryMgr := NewRetryManager(RetryConfig{
MaxRetries: 10,
InitialDelay: 1 * time.Second,
MaxDelay: 1 * time.Second,
@@ -230,7 +231,7 @@ func TestAcquireSocketContextCancellation(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
retryMgr := NewRetryManager(GetDefaultRetryConfig())
retryMgr := NewRetryManager(GetDefaultConnectionConfig().Retry)
done := make(chan error, 1)
go func() {
_, _, err := AcquireSocket(ctx, retryMgr, mockDialer, "ws://test", nil, nil)
@@ -263,7 +264,7 @@ func TestAcquireSocketPassesHeaders(t *testing.T) {
},
}
retryMgr := NewRetryManager(&RetryConfig{MaxRetries: 0})
retryMgr := NewRetryManager(RetryConfig{MaxRetries: 0, InitialDelay: 1 * time.Millisecond, MaxDelay: 5 * time.Millisecond})
_, _, err := AcquireSocket(context.Background(), retryMgr, mockDialer, "ws://test", header, nil)
assert.NoError(t, err)
+24 -21
View File
@@ -2,6 +2,7 @@ package honeybee
import (
"context"
"fmt"
"log/slog"
"sync"
"sync/atomic"
@@ -94,7 +95,6 @@ func NewWorker(
ctx: ctx,
cancel: cancel,
config: config,
handler: handler,
processedCount: &atomic.Uint64{},
outgoingCount: &atomic.Uint64{},
@@ -103,7 +103,8 @@ func NewWorker(
if handler != nil {
comp := component.FromContext(ctx)
w.logger = slog.New(handler).With(slog.Any("component", comp), slog.String("peer_id", id))
w.handler = handler.WithAttrs([]slog.Attr{slog.String("peer", id)})
w.logger = slog.New(w.handler).With(slog.Any("component", comp))
}
return w, nil
@@ -124,13 +125,13 @@ func (w *DefaultWorker) Start(pool PoolPlugin) {
})
if w.logger != nil {
w.logger.Info("started")
w.logger.Debug("started")
}
wg.Wait()
if w.logger != nil {
w.logger.Info("stopped")
w.logger.Debug("stopped")
}
}
@@ -162,7 +163,7 @@ func (w *DefaultWorker) runSession(ctx context.Context, pool PoolPlugin) {
case conn = <-newConn:
if w.logger != nil {
w.logger.Debug("session: connected")
w.logger.Info("connected")
}
break preConn
@@ -171,7 +172,7 @@ func (w *DefaultWorker) runSession(ctx context.Context, pool PoolPlugin) {
case <-inactive():
if w.logger != nil {
w.logger.Info("keepalive: no activity observed")
w.logger.Warn("keepalive: no activity observed")
}
timer.Reset(w.config.KeepaliveTimeout)
spawnDialer()
@@ -183,7 +184,7 @@ func (w *DefaultWorker) runSession(ctx context.Context, pool PoolPlugin) {
pool.Events <- PoolEvent{ID: w.id, Kind: EventConnected, At: time.Now()}
if w.logger != nil {
w.logger.Info("session: started")
w.logger.Debug("session: started")
}
// run session loop
@@ -195,8 +196,14 @@ func (w *DefaultWorker) runSession(ctx context.Context, pool PoolPlugin) {
case data, ok := <-conn.Incoming():
if !ok {
var reason error
select {
case reason = <-conn.Errors():
default:
reason = fmt.Errorf("unknown")
}
if w.logger != nil {
w.logger.Debug("reader: disconnected")
w.logger.Info("websocket: closed", "reason", reason)
}
break conn_loop
}
@@ -210,9 +217,6 @@ func (w *DefaultWorker) runSession(ctx context.Context, pool PoolPlugin) {
heartbeat()
case <-conn.Heartbeat():
if w.logger != nil {
w.logger.Debug("ping-pong heartbeat")
}
heartbeat()
case <-w.sendHeartbeat:
@@ -220,7 +224,7 @@ func (w *DefaultWorker) runSession(ctx context.Context, pool PoolPlugin) {
case <-inactive():
if w.logger != nil {
w.logger.Info("keepalive: no activity observed")
w.logger.Warn("keepalive: no activity observed")
}
timer.Reset(w.config.KeepaliveTimeout)
break conn_loop
@@ -231,7 +235,10 @@ func (w *DefaultWorker) runSession(ctx context.Context, pool PoolPlugin) {
conn.Close()
if w.logger != nil {
w.logger.Info("session: ended")
w.logger.Info("disconnected")
}
if w.logger != nil {
w.logger.Debug("session: ended")
}
// tear down connection
@@ -301,16 +308,13 @@ func (w *DefaultWorker) spawnDialer(
dialCtx, dialCancel := context.WithCancel(ctx)
if w.logger != nil {
w.logger.Debug("session: requesting connection")
w.logger.Debug("session: dialing")
}
go func() {
conn, err := connect(w.id, dialCtx, pool, w.handler)
if err != nil {
if w.logger != nil {
w.logger.Warn("dialer: dial failed", "error", err)
}
return
}
@@ -330,12 +334,11 @@ func connect(
pool PoolPlugin,
handler slog.Handler,
) (*transport.Connection, error) {
conn, err := transport.NewConnection(ctx, id, pool.ConnectionConfig, handler)
cc := pool.ConnectionConfig
conn, err := transport.NewConnection(ctx, id, &cc, handler)
if err != nil {
return nil, err
}
conn.SetDialer(pool.Dialer)
return conn, conn.Connect(ctx)
}
@@ -345,7 +348,7 @@ func connect(
func (w *DefaultWorker) Stop() {
if w.logger != nil {
w.logger.Debug("shutting down")
w.logger.Info("shutting down")
}
w.cancel()
}
+20 -19
View File
@@ -28,6 +28,7 @@ func makeWorkerContext(t *testing.T) (
Inbox: inbox,
Events: events,
InboxCounter: &atomic.Uint64{},
ConnectionConfig: *transport.GetDefaultConnectionConfig(),
}
return
}
@@ -65,7 +66,7 @@ func TestWorkerSession(t *testing.T) {
w := makeWorker(t, ctx, cancel)
_, events, pool := makeWorkerContext(t)
mockSocket := honeybeetest.NewMockSocket()
pool.Dialer = mockDialer(mockSocket)
pool.ConnectionConfig.Dialer = mockDialer(mockSocket)
var wg sync.WaitGroup
wg.Go(func() {
@@ -89,13 +90,13 @@ func TestWorkerSession(t *testing.T) {
w := makeWorker(t, ctx, cancel)
_, events, pool := makeWorkerContext(t)
pool.Dialer = &honeybeetest.MockDialer{
cc, _ := transport.NewConnectionConfig(transport.WithRetryDisabled())
pool.ConnectionConfig = *cc
pool.ConnectionConfig.Dialer = &honeybeetest.MockDialer{
DialContextFunc: func(context.Context, string, http.Header) (types.Socket, *http.Response, error) {
return nil, nil, errors.New("connection refused")
},
}
cc, _ := transport.NewConnectionConfig(transport.WithoutRetry())
pool.ConnectionConfig = cc
var wg sync.WaitGroup
wg.Go(func() { w.Start(pool) })
@@ -143,15 +144,15 @@ func TestWorkerSession(t *testing.T) {
_, _, pool := makeWorkerContext(t)
var dialCount atomic.Uint64
pool.Dialer = &honeybeetest.MockDialer{
cc, _ := transport.NewConnectionConfig(transport.WithRetryDisabled())
pool.ConnectionConfig = *cc
pool.ConnectionConfig.Dialer = &honeybeetest.MockDialer{
DialContextFunc: func(dialCtx context.Context, _ string, _ http.Header) (types.Socket, *http.Response, error) {
dialCount.Add(1)
<-dialCtx.Done()
return nil, nil, dialCtx.Err()
},
}
cc, _ := transport.NewConnectionConfig(transport.WithoutRetry())
pool.ConnectionConfig = cc
var wg sync.WaitGroup
wg.Go(func() { w.Start(pool) })
@@ -169,14 +170,14 @@ func TestWorkerSession(t *testing.T) {
w := makeWorker(t, ctx, cancel)
_, events, pool := makeWorkerContext(t)
pool.Dialer = &honeybeetest.MockDialer{
cc, _ := transport.NewConnectionConfig(transport.WithRetryDisabled())
pool.ConnectionConfig = *cc
pool.ConnectionConfig.Dialer = &honeybeetest.MockDialer{
DialContextFunc: func(dialCtx context.Context, _ string, _ http.Header) (types.Socket, *http.Response, error) {
<-dialCtx.Done()
return nil, nil, dialCtx.Err()
},
}
cc, _ := transport.NewConnectionConfig(transport.WithoutRetry())
pool.ConnectionConfig = cc
var wg sync.WaitGroup
wg.Go(func() { w.Start(pool) })
@@ -204,7 +205,7 @@ func TestWorkerSession(t *testing.T) {
w := makeWorker(t, ctx, cancel)
_, events, pool := makeWorkerContext(t)
_, mockSocket, _, outgoingData := setupTestConnection(t)
pool.Dialer = mockDialer(mockSocket)
pool.ConnectionConfig.Dialer = mockDialer(mockSocket)
var wg sync.WaitGroup
wg.Go(func() {
@@ -255,7 +256,7 @@ func TestWorkerSession(t *testing.T) {
}
}
pool.Dialer = mockDialer(mockSocket)
pool.ConnectionConfig.Dialer = mockDialer(mockSocket)
var wg sync.WaitGroup
wg.Go(func() {
@@ -311,7 +312,7 @@ func TestWorkerSession(t *testing.T) {
}
_, events, pool := makeWorkerContext(t)
_, mockSocket, incomingData, _ := setupTestConnection(t)
pool.Dialer = mockDialer(mockSocket)
pool.ConnectionConfig.Dialer = mockDialer(mockSocket)
var wg sync.WaitGroup
wg.Go(func() { w.Start(pool) })
@@ -377,7 +378,7 @@ func TestWorkerSession(t *testing.T) {
var pongHandler func(string) error
mockSocket, incomingData, _ := honeybeetest.SetupTestSocket(t)
mockSocket.SetPongHandlerFunc = func(h func(string) error) { pongHandler = h }
pool.Dialer = mockDialer(mockSocket)
pool.ConnectionConfig.Dialer = mockDialer(mockSocket)
var wg sync.WaitGroup
wg.Go(func() { w.Start(pool) })
@@ -439,7 +440,7 @@ func TestWorkerSession(t *testing.T) {
}
_, events, pool := makeWorkerContext(t)
_, mockSocket, _, _ := setupTestConnection(t)
pool.Dialer = mockDialer(mockSocket)
pool.ConnectionConfig.Dialer = mockDialer(mockSocket)
var wg sync.WaitGroup
wg.Go(func() { w.Start(pool) })
@@ -481,7 +482,7 @@ func TestWorkerSession(t *testing.T) {
w := makeWorker(t, ctx, cancel)
_, events, pool := makeWorkerContext(t)
_, mockSocket, incomingData, _ := setupTestConnection(t)
pool.Dialer = mockDialer(mockSocket)
pool.ConnectionConfig.Dialer = mockDialer(mockSocket)
var wg sync.WaitGroup
wg.Go(func() {
@@ -525,7 +526,7 @@ func TestWorkerSession(t *testing.T) {
w := makeWorker(t, ctx, cancel)
_, events, pool := makeWorkerContext(t)
_, mockSocket, incomingData, _ := setupTestConnection(t)
pool.Dialer = mockDialer(mockSocket)
pool.ConnectionConfig.Dialer = mockDialer(mockSocket)
var wg sync.WaitGroup
wg.Go(func() { w.Start(pool) })
@@ -561,7 +562,7 @@ func TestWorkerSession(t *testing.T) {
w := makeWorker(t, ctx, cancel)
_, events, pool := makeWorkerContext(t)
mockSocket := honeybeetest.NewMockSocket()
pool.Dialer = mockDialer(mockSocket)
pool.ConnectionConfig.Dialer = mockDialer(mockSocket)
var wg sync.WaitGroup
wg.Go(func() {
@@ -607,7 +608,7 @@ func TestWorkerSession(t *testing.T) {
w := makeWorker(t, workerCtx, workerCancel)
_, events, pool := makeWorkerContext(t)
mockSocket := honeybeetest.NewMockSocket()
pool.Dialer = mockDialer(mockSocket)
pool.ConnectionConfig.Dialer = mockDialer(mockSocket)
var wg sync.WaitGroup
wg.Go(func() {