Compare commits
10 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| d04341bfa2 | |||
| 8c1371e3a0 | |||
| d4da16f82a | |||
| 695389798e | |||
| fac62c0675 | |||
| c82e0184f5 | |||
| c4d35fe6fa | |||
| bcbdb79b32 | |||
| a721eabd48 | |||
| f144a2a724 |
@@ -17,7 +17,7 @@ Logging can be controlled independently at the pool, worker, and connection leve
|
||||
| Connection | `ErrorsBufferSize` | 10 | — | Must be positive |
|
||||
| Connection | `LoggingEnabled` | true | `false` | |
|
||||
| Connection | `LogLevel` | nil | — | nil defers to handler's own filter |
|
||||
| Retry | enabled | yes | `WithoutRetry()` | Governs `Connect()` only |
|
||||
| Retry | enabled | yes | `WithRetryDisabled()` | Governs `Connect()` only |
|
||||
| Retry | `MaxRetries` | 0 | — | 0 means infinite |
|
||||
| Retry | `InitialDelay` | 1s | — | Must be positive |
|
||||
| Retry | `MaxDelay` | 60s | — | Must be ≥ InitialDelay |
|
||||
@@ -61,11 +61,16 @@ Sets the capacity of the channel that buffers inbound messages between the reade
|
||||
**`WithErrorsBufferSize(int)`**
|
||||
Sets the capacity of the channel that carries connection-level errors to the consumer. Must be at least 1.
|
||||
|
||||
### Dialer
|
||||
|
||||
**`WithConnectionDialer(types.Dialer)`**
|
||||
Overrides the dialer used to establish the WebSocket connection. When not set, the connection uses the default dialer. Useful in tests or when routing connections through a custom transport.
|
||||
|
||||
### Retry
|
||||
|
||||
The retry policy governs the `Connect()` call only. It does not affect worker reconnection, which is controlled by `ReconnectDelay` on the worker config.
|
||||
|
||||
**`WithoutRetry()`**
|
||||
**`WithRetryDisabled()`**
|
||||
Disables retry entirely. `Connect()` returns on the first dial failure.
|
||||
|
||||
**`WithRetryMaxRetries(int)`**
|
||||
@@ -124,13 +129,18 @@ Enables or disables worker-level logging.
|
||||
**`honeybee.WithWorkerLogLevel(slog.Level)`**
|
||||
Overrides the minimum log level for worker-scoped records only.
|
||||
|
||||
### Per-connection
|
||||
|
||||
**`honeybee.WithDialer(types.Dialer)`**
|
||||
Overrides the dialer for a single `Connect` call. Passed as a variadic option: `pool.Connect(id, honeybee.WithDialer(d))`. When provided, it takes precedence over the dialer resolved from `ConnectionConfig`. Existing callers that pass no options are unaffected.
|
||||
|
||||
### Wiring
|
||||
|
||||
**`honeybee.WithConnectionConfig(*transport.ConnectionConfig)`**
|
||||
Supplies a connection config used when dialing each peer.
|
||||
**`honeybee.WithConnectionConfig(transport.ConnectionConfig)`**
|
||||
Supplies a connection config used when dialing each peer. Accepted by value; the pool stores its own copy.
|
||||
|
||||
**`honeybee.WithWorkerConfig(*honeybee.WorkerConfig)`**
|
||||
Supplies a worker config applied to every worker the pool creates.
|
||||
**`honeybee.WithWorkerConfig(honeybee.WorkerConfig)`**
|
||||
Supplies a worker config applied to every worker the pool creates. Accepted by value; the pool stores its own copy.
|
||||
|
||||
**`honeybee.WithWorkerFactory(honeybee.WorkerFactory)`**
|
||||
Replaces the default worker constructor. See [EXTEND.md](EXTEND.md) for the factory contract.
|
||||
|
||||
@@ -35,8 +35,7 @@ type PoolPlugin struct {
|
||||
Inbox chan<- honeybee.InboxMessage
|
||||
Events chan<- honeybee.PoolEvent
|
||||
InboxCounter *atomic.Uint64
|
||||
Dialer honeybee.Dialer
|
||||
ConnectionConfig *transport.ConnectionConfig
|
||||
ConnectionConfig transport.ConnectionConfig
|
||||
Handler slog.Handler
|
||||
}
|
||||
```
|
||||
|
||||
@@ -240,7 +240,7 @@ Connections send periodic WebSocket ping frames and listen for the corresponding
|
||||
|
||||
Pong-derived heartbeats reset the keepalive timer alongside data messages and sends. A peer that sends no data but responds to pings will not be disconnected and reconnected by the keepalive mechanism.
|
||||
|
||||
The ping interval is configured via `transport.WithPingInterval` on the `transport.ConnectionConfig`. Import `git.wisehodl.dev/jay/go-honeybee/transport` to construct a `ConnectionConfig`, then pass it to the pool via `honeybee.WithConnectionConfig`, or supply it directly to `NewConnection` and `NewConnectionFromSocket`. The default is 20 seconds. Set to zero to disable pings entirely, in which case only data messages and outbound sends generate heartbeats.
|
||||
The ping interval is configured via `transport.WithPingInterval` on the `transport.ConnectionConfig`. Import `git.wisehodl.dev/jay/go-honeybee/transport` to construct a `ConnectionConfig`, then pass it to the pool by value via `honeybee.WithConnectionConfig`, or supply it directly to `NewConnection` and `NewConnectionFromSocket`. The default is 20 seconds. Set to zero to disable pings entirely, in which case only data messages and outbound sends generate heartbeats.
|
||||
|
||||
## Statistics
|
||||
|
||||
|
||||
@@ -14,9 +14,9 @@ import (
|
||||
type PoolConfig struct {
|
||||
InboxBufferSize int
|
||||
EventsBufferSize int
|
||||
ConnectionConfig *transport.ConnectionConfig
|
||||
ConnectionConfig transport.ConnectionConfig
|
||||
WorkerFactory WorkerFactory
|
||||
WorkerConfig *WorkerConfig
|
||||
WorkerConfig WorkerConfig
|
||||
}
|
||||
|
||||
type PoolOption func(*PoolConfig) error
|
||||
@@ -38,9 +38,9 @@ func GetDefaultPoolConfig() *PoolConfig {
|
||||
return &PoolConfig{
|
||||
InboxBufferSize: 256,
|
||||
EventsBufferSize: 10,
|
||||
ConnectionConfig: nil,
|
||||
ConnectionConfig: *transport.GetDefaultConnectionConfig(),
|
||||
WorkerFactory: nil,
|
||||
WorkerConfig: nil,
|
||||
WorkerConfig: *GetDefaultWorkerConfig(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -58,19 +58,15 @@ func applyPoolOptions(config *PoolConfig, options ...PoolOption) error {
|
||||
func ValidatePoolConfig(config *PoolConfig) error {
|
||||
var err error
|
||||
|
||||
if config.ConnectionConfig != nil {
|
||||
err = transport.ValidateConnectionConfig(config.ConnectionConfig)
|
||||
err = transport.ValidateConnectionConfig(&config.ConnectionConfig)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
if config.WorkerConfig != nil {
|
||||
err = ValidateWorkerConfig(config.WorkerConfig)
|
||||
err = ValidateWorkerConfig(&config.WorkerConfig)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
@@ -104,9 +100,9 @@ func WithEventsBufferSize(value int) PoolOption {
|
||||
}
|
||||
}
|
||||
|
||||
func WithConnectionConfig(cc *transport.ConnectionConfig) PoolOption {
|
||||
func WithConnectionConfig(cc transport.ConnectionConfig) PoolOption {
|
||||
return func(c *PoolConfig) error {
|
||||
err := transport.ValidateConnectionConfig(cc)
|
||||
err := transport.ValidateConnectionConfig(&cc)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -115,9 +111,9 @@ func WithConnectionConfig(cc *transport.ConnectionConfig) PoolOption {
|
||||
}
|
||||
}
|
||||
|
||||
func WithWorkerConfig(wc *WorkerConfig) PoolOption {
|
||||
func WithWorkerConfig(wc WorkerConfig) PoolOption {
|
||||
return func(c *PoolConfig) error {
|
||||
err := ValidateWorkerConfig(wc)
|
||||
err := ValidateWorkerConfig(&wc)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
+27
-13
@@ -14,8 +14,8 @@ func TestNewPoolConfig(t *testing.T) {
|
||||
assert.Equal(t, conf, &PoolConfig{
|
||||
InboxBufferSize: 256,
|
||||
EventsBufferSize: 10,
|
||||
ConnectionConfig: nil,
|
||||
WorkerConfig: nil,
|
||||
ConnectionConfig: *transport.GetDefaultConnectionConfig(),
|
||||
WorkerConfig: *GetDefaultWorkerConfig(),
|
||||
WorkerFactory: nil,
|
||||
})
|
||||
}
|
||||
@@ -26,8 +26,8 @@ func TestDefaultPoolConfig(t *testing.T) {
|
||||
assert.Equal(t, conf, &PoolConfig{
|
||||
InboxBufferSize: 256,
|
||||
EventsBufferSize: 10,
|
||||
ConnectionConfig: nil,
|
||||
WorkerConfig: nil,
|
||||
ConnectionConfig: *transport.GetDefaultConnectionConfig(),
|
||||
WorkerConfig: *GetDefaultWorkerConfig(),
|
||||
WorkerFactory: nil,
|
||||
})
|
||||
}
|
||||
@@ -36,7 +36,9 @@ func TestApplyPoolOptions(t *testing.T) {
|
||||
conf := &PoolConfig{}
|
||||
err := applyPoolOptions(
|
||||
conf,
|
||||
WithConnectionConfig(&transport.ConnectionConfig{}),
|
||||
WithConnectionConfig(transport.ConnectionConfig{
|
||||
Retry: transport.RetryConfig{Disabled: true},
|
||||
}),
|
||||
)
|
||||
|
||||
assert.NoError(t, err)
|
||||
@@ -57,15 +59,21 @@ func TestWithBufferSizes(t *testing.T) {
|
||||
|
||||
func TestWithConnectionConfig(t *testing.T) {
|
||||
conf := &PoolConfig{}
|
||||
opt := WithConnectionConfig(&transport.ConnectionConfig{WriteTimeout: 1 * time.Second})
|
||||
opt := WithConnectionConfig(transport.ConnectionConfig{
|
||||
WriteTimeout: 1 * time.Second,
|
||||
Retry: transport.RetryConfig{Disabled: true},
|
||||
})
|
||||
err := applyPoolOptions(conf, opt)
|
||||
assert.NoError(t, err)
|
||||
assert.NotNil(t, conf.ConnectionConfig)
|
||||
assert.Equal(t, 1*time.Second, conf.ConnectionConfig.WriteTimeout)
|
||||
|
||||
// invalid config is rejected
|
||||
conf = &PoolConfig{}
|
||||
opt = WithConnectionConfig(&transport.ConnectionConfig{WriteTimeout: -1 * time.Second})
|
||||
opt = WithConnectionConfig(
|
||||
transport.ConnectionConfig{
|
||||
WriteTimeout: -1 * time.Second,
|
||||
Retry: transport.RetryConfig{Disabled: true},
|
||||
})
|
||||
err = applyPoolOptions(conf, opt)
|
||||
assert.Error(t, err)
|
||||
}
|
||||
@@ -78,8 +86,12 @@ func TestValidatePoolConfig(t *testing.T) {
|
||||
wantErrText string
|
||||
}{
|
||||
{
|
||||
name: "valid empty",
|
||||
conf: *&PoolConfig{},
|
||||
name: "valid empty (retry disabled)",
|
||||
conf: PoolConfig{
|
||||
ConnectionConfig: transport.ConnectionConfig{
|
||||
Retry: transport.RetryConfig{Disabled: true},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "valid defaults",
|
||||
@@ -88,14 +100,16 @@ func TestValidatePoolConfig(t *testing.T) {
|
||||
{
|
||||
name: "valid complete",
|
||||
conf: PoolConfig{
|
||||
ConnectionConfig: &transport.ConnectionConfig{},
|
||||
ConnectionConfig: transport.ConnectionConfig{
|
||||
Retry: transport.RetryConfig{Disabled: true},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "invalid connection config",
|
||||
conf: PoolConfig{
|
||||
ConnectionConfig: &transport.ConnectionConfig{
|
||||
Retry: &transport.RetryConfig{
|
||||
ConnectionConfig: transport.ConnectionConfig{
|
||||
Retry: transport.RetryConfig{
|
||||
InitialDelay: 10 * time.Second,
|
||||
MaxDelay: 1 * time.Second,
|
||||
},
|
||||
|
||||
@@ -56,8 +56,7 @@ type PoolPlugin struct {
|
||||
Inbox chan<- types.InboxMessage
|
||||
Events chan<- PoolEvent
|
||||
InboxCounter *atomic.Uint64
|
||||
Dialer types.Dialer
|
||||
ConnectionConfig *transport.ConnectionConfig
|
||||
ConnectionConfig transport.ConnectionConfig
|
||||
}
|
||||
|
||||
// ----------------------------------------------------------------------------
|
||||
@@ -101,7 +100,8 @@ func NewPool(ctx context.Context, config *PoolConfig, handler slog.Handler,
|
||||
if config.WorkerFactory == nil {
|
||||
config.WorkerFactory = func(
|
||||
ctx context.Context, id string, handler slog.Handler) (Worker, error) {
|
||||
return NewWorker(ctx, id, config.WorkerConfig, handler)
|
||||
wc := config.WorkerConfig
|
||||
return NewWorker(ctx, id, &wc, handler)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -117,12 +117,19 @@ func NewPool(ctx context.Context, config *PoolConfig, handler slog.Handler,
|
||||
logger = slog.New(handler).With(slog.Any("component", c))
|
||||
}
|
||||
|
||||
var dialer types.Dialer
|
||||
if config.ConnectionConfig.Dialer != nil {
|
||||
dialer = config.ConnectionConfig.Dialer
|
||||
} else {
|
||||
dialer = transport.NewDialer()
|
||||
}
|
||||
|
||||
return &Pool{
|
||||
peers: make(map[string]*Peer),
|
||||
inbox: make(chan types.InboxMessage, config.InboxBufferSize),
|
||||
events: make(chan PoolEvent, config.EventsBufferSize),
|
||||
|
||||
dialer: transport.NewDialer(),
|
||||
dialer: dialer,
|
||||
config: config,
|
||||
handler: handler,
|
||||
logger: logger,
|
||||
@@ -194,16 +201,9 @@ func (p *Pool) PeerStats(id string) (PeerStats, error) {
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (p *Pool) SetDialer(d types.Dialer) {
|
||||
if d == nil {
|
||||
panic("dialer cannot be nil")
|
||||
}
|
||||
p.dialer = d
|
||||
}
|
||||
|
||||
func (p *Pool) Close() {
|
||||
if p.logger != nil {
|
||||
p.logger.Debug("closing")
|
||||
p.logger.Info("closing")
|
||||
}
|
||||
|
||||
p.mu.Lock()
|
||||
@@ -231,9 +231,24 @@ func (p *Pool) Close() {
|
||||
}()
|
||||
}
|
||||
|
||||
func (p *Pool) Connect(id string) error {
|
||||
// ConnectOption configures a single Connect call.
|
||||
type ConnectOption func(*connectOptions)
|
||||
|
||||
type connectOptions struct {
|
||||
dialer types.Dialer
|
||||
}
|
||||
|
||||
// WithDialer returns a ConnectOption that overrides the pool dialer for this
|
||||
// connection only.
|
||||
func WithDialer(d types.Dialer) ConnectOption {
|
||||
return func(o *connectOptions) {
|
||||
o.dialer = d
|
||||
}
|
||||
}
|
||||
|
||||
func (p *Pool) Connect(id string, opts ...ConnectOption) error {
|
||||
if p.logger != nil {
|
||||
p.logger.Debug("connecting to peer", "peer", id)
|
||||
p.logger.Info("connecting", "peer", id)
|
||||
}
|
||||
|
||||
id, err := transport.NormalizeURL(id)
|
||||
@@ -258,12 +273,23 @@ func (p *Pool) Connect(id string) error {
|
||||
return err
|
||||
}
|
||||
|
||||
o := &connectOptions{}
|
||||
for _, opt := range opts {
|
||||
opt(o)
|
||||
}
|
||||
effectiveDialer := p.dialer
|
||||
if o.dialer != nil {
|
||||
effectiveDialer = o.dialer
|
||||
}
|
||||
|
||||
cc := p.config.ConnectionConfig.Clone()
|
||||
cc.Dialer = effectiveDialer
|
||||
|
||||
pool := PoolPlugin{
|
||||
Inbox: p.inbox,
|
||||
Events: p.events,
|
||||
InboxCounter: p.inboxCounter,
|
||||
Dialer: p.dialer,
|
||||
ConnectionConfig: p.config.ConnectionConfig,
|
||||
ConnectionConfig: cc,
|
||||
}
|
||||
|
||||
p.wg.Go(func() {
|
||||
@@ -273,7 +299,7 @@ func (p *Pool) Connect(id string) error {
|
||||
p.peers[id] = &Peer{id: id, worker: worker}
|
||||
|
||||
if p.logger != nil {
|
||||
p.logger.Info("registered peer", "peer", id)
|
||||
p.logger.Debug("registered peer", "peer", id)
|
||||
}
|
||||
|
||||
return nil
|
||||
@@ -281,7 +307,7 @@ func (p *Pool) Connect(id string) error {
|
||||
|
||||
func (p *Pool) Remove(id string) error {
|
||||
if p.logger != nil {
|
||||
p.logger.Debug("disconnecting from peer", "peer", id)
|
||||
p.logger.Info("disconnecting", "peer", id)
|
||||
}
|
||||
|
||||
id, err := transport.NormalizeURL(id)
|
||||
@@ -305,7 +331,7 @@ func (p *Pool) Remove(id string) error {
|
||||
peer.worker.Stop()
|
||||
|
||||
if p.logger != nil {
|
||||
p.logger.Info("disconnected from peer", "peer", id)
|
||||
p.logger.Debug("disconnected from peer", "peer", id)
|
||||
}
|
||||
|
||||
return nil
|
||||
|
||||
+63
-5
@@ -4,6 +4,7 @@ import (
|
||||
"context"
|
||||
"fmt"
|
||||
"git.wisehodl.dev/jay/go-honeybee/honeybeetest"
|
||||
"git.wisehodl.dev/jay/go-honeybee/transport"
|
||||
"git.wisehodl.dev/jay/go-honeybee/types"
|
||||
"github.com/gorilla/websocket"
|
||||
"github.com/stretchr/testify/assert"
|
||||
@@ -15,14 +16,20 @@ import (
|
||||
|
||||
func setupPool(t *testing.T) (*Pool, *honeybeetest.MockDialer) {
|
||||
t.Helper()
|
||||
pool, err := NewPool(context.Background(), nil, nil)
|
||||
assert.NoError(t, err)
|
||||
dialer := &honeybeetest.MockDialer{
|
||||
DialContextFunc: func(context.Context, string, http.Header) (types.Socket, *http.Response, error) {
|
||||
return honeybeetest.NewMockSocket(), nil, nil
|
||||
},
|
||||
}
|
||||
pool.dialer = dialer
|
||||
cc := *transport.GetDefaultConnectionConfig()
|
||||
cc.Dialer = dialer
|
||||
pool, err := NewPool(context.Background(), &PoolConfig{
|
||||
InboxBufferSize: 256,
|
||||
EventsBufferSize: 10,
|
||||
ConnectionConfig: cc,
|
||||
WorkerConfig: *GetDefaultWorkerConfig(),
|
||||
}, nil)
|
||||
assert.NoError(t, err)
|
||||
return pool, dialer
|
||||
}
|
||||
|
||||
@@ -83,6 +90,51 @@ func TestPoolConnect(t *testing.T) {
|
||||
})
|
||||
}
|
||||
|
||||
func TestPoolConnectWithDialer(t *testing.T) {
|
||||
t.Run("per-call dialer is used instead of pool dialer", func(t *testing.T) {
|
||||
perCallUsed := false
|
||||
perCallDialer := &honeybeetest.MockDialer{
|
||||
DialContextFunc: func(ctx context.Context, url string, h http.Header) (types.Socket, *http.Response, error) {
|
||||
perCallUsed = true
|
||||
return honeybeetest.NewMockSocket(), nil, nil
|
||||
},
|
||||
}
|
||||
|
||||
// pool dialer should NOT be called
|
||||
poolDialer := &honeybeetest.MockDialer{
|
||||
DialContextFunc: func(context.Context, string, http.Header) (types.Socket, *http.Response, error) {
|
||||
t.Error("pool dialer should not be called when per-call dialer is provided")
|
||||
return nil, nil, fmt.Errorf("unexpected call")
|
||||
},
|
||||
}
|
||||
|
||||
cc := *transport.GetDefaultConnectionConfig()
|
||||
cc.Dialer = poolDialer
|
||||
pool, err := NewPool(context.Background(), &PoolConfig{
|
||||
InboxBufferSize: 256,
|
||||
EventsBufferSize: 10,
|
||||
ConnectionConfig: cc,
|
||||
WorkerConfig: *GetDefaultWorkerConfig(),
|
||||
}, nil)
|
||||
assert.NoError(t, err)
|
||||
|
||||
err = pool.Connect("wss://test", WithDialer(perCallDialer))
|
||||
assert.NoError(t, err)
|
||||
|
||||
honeybeetest.Eventually(t, func() bool {
|
||||
select {
|
||||
case e := <-pool.events:
|
||||
return e.ID == "wss://test" && e.Kind == EventConnected
|
||||
default:
|
||||
return false
|
||||
}
|
||||
}, "expected connected event")
|
||||
|
||||
assert.True(t, perCallUsed, "per-call dialer was not used")
|
||||
pool.Close()
|
||||
})
|
||||
}
|
||||
|
||||
func TestPoolClose(t *testing.T) {
|
||||
t.Run("channels close after pool close", func(t *testing.T) {
|
||||
pool, _ := NewPool(context.Background(), nil, nil)
|
||||
@@ -152,9 +204,15 @@ func TestPoolSend(t *testing.T) {
|
||||
},
|
||||
}
|
||||
|
||||
pool, err := NewPool(context.Background(), nil, nil)
|
||||
cc := *transport.GetDefaultConnectionConfig()
|
||||
cc.Dialer = mockDialer
|
||||
pool, err := NewPool(context.Background(), &PoolConfig{
|
||||
InboxBufferSize: 256,
|
||||
EventsBufferSize: 10,
|
||||
ConnectionConfig: cc,
|
||||
WorkerConfig: *GetDefaultWorkerConfig(),
|
||||
}, nil)
|
||||
assert.NoError(t, err)
|
||||
pool.dialer = mockDialer
|
||||
|
||||
err = pool.Connect("wss://test")
|
||||
assert.NoError(t, err)
|
||||
|
||||
+23
-26
@@ -1,6 +1,7 @@
|
||||
package transport
|
||||
|
||||
import (
|
||||
"git.wisehodl.dev/jay/go-honeybee/types"
|
||||
"net/http"
|
||||
"time"
|
||||
)
|
||||
@@ -20,10 +21,12 @@ type ConnectionConfig struct {
|
||||
PingInterval time.Duration
|
||||
IncomingBufferSize int
|
||||
ErrorsBufferSize int
|
||||
Retry *RetryConfig
|
||||
Retry RetryConfig
|
||||
Dialer types.Dialer
|
||||
}
|
||||
|
||||
type RetryConfig struct {
|
||||
Disabled bool
|
||||
MaxRetries int
|
||||
InitialDelay time.Duration
|
||||
MaxDelay time.Duration
|
||||
@@ -55,19 +58,22 @@ func GetDefaultConnectionConfig() *ConnectionConfig {
|
||||
PingInterval: 20 * time.Second,
|
||||
IncomingBufferSize: 100,
|
||||
ErrorsBufferSize: 10,
|
||||
Retry: GetDefaultRetryConfig(),
|
||||
}
|
||||
}
|
||||
|
||||
func GetDefaultRetryConfig() *RetryConfig {
|
||||
return &RetryConfig{
|
||||
Retry: RetryConfig{
|
||||
MaxRetries: 0, // Infinite retries
|
||||
InitialDelay: 1 * time.Second,
|
||||
MaxDelay: 60 * time.Second,
|
||||
JitterFactor: 0.2,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func (c ConnectionConfig) Clone() ConnectionConfig {
|
||||
if c.RequestHeader != nil {
|
||||
c.RequestHeader = c.RequestHeader.Clone()
|
||||
}
|
||||
return c
|
||||
}
|
||||
|
||||
func applyConnectionOptions(config *ConnectionConfig, options ...ConnectionOption) error {
|
||||
for _, option := range options {
|
||||
if err := option(config); err != nil {
|
||||
@@ -85,7 +91,7 @@ func ValidateConnectionConfig(config *ConnectionConfig) error {
|
||||
return err
|
||||
}
|
||||
|
||||
if config.Retry != nil {
|
||||
if !config.Retry.Disabled {
|
||||
err = validateMaxRetries(config.Retry.MaxRetries)
|
||||
if err != nil {
|
||||
return err
|
||||
@@ -223,19 +229,22 @@ func WithErrorsBufferSize(value int) ConnectionOption {
|
||||
}
|
||||
}
|
||||
|
||||
func WithoutRetry() ConnectionOption {
|
||||
func WithConnectionDialer(d types.Dialer) ConnectionOption {
|
||||
return func(c *ConnectionConfig) error {
|
||||
c.Retry = nil
|
||||
c.Dialer = d
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
func WithRetryDisabled() ConnectionOption {
|
||||
return func(c *ConnectionConfig) error {
|
||||
c.Retry.Disabled = true
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
func WithRetryMaxRetries(value int) ConnectionOption {
|
||||
return func(c *ConnectionConfig) error {
|
||||
if c.Retry == nil {
|
||||
c.Retry = GetDefaultRetryConfig()
|
||||
}
|
||||
|
||||
err := validateMaxRetries(value)
|
||||
if err != nil {
|
||||
return err
|
||||
@@ -248,10 +257,6 @@ func WithRetryMaxRetries(value int) ConnectionOption {
|
||||
|
||||
func WithRetryInitialDelay(value time.Duration) ConnectionOption {
|
||||
return func(c *ConnectionConfig) error {
|
||||
if c.Retry == nil {
|
||||
c.Retry = GetDefaultRetryConfig()
|
||||
}
|
||||
|
||||
err := validateInitialDelay(value)
|
||||
if err != nil {
|
||||
return err
|
||||
@@ -264,10 +269,6 @@ func WithRetryInitialDelay(value time.Duration) ConnectionOption {
|
||||
|
||||
func WithRetryMaxDelay(value time.Duration) ConnectionOption {
|
||||
return func(c *ConnectionConfig) error {
|
||||
if c.Retry == nil {
|
||||
c.Retry = GetDefaultRetryConfig()
|
||||
}
|
||||
|
||||
err := validateMaxDelay(value)
|
||||
if err != nil {
|
||||
return err
|
||||
@@ -280,10 +281,6 @@ func WithRetryMaxDelay(value time.Duration) ConnectionOption {
|
||||
|
||||
func WithRetryJitterFactor(value float64) ConnectionOption {
|
||||
return func(c *ConnectionConfig) error {
|
||||
if c.Retry == nil {
|
||||
c.Retry = GetDefaultRetryConfig()
|
||||
}
|
||||
|
||||
err := validateJitterFactor(value)
|
||||
if err != nil {
|
||||
return err
|
||||
|
||||
+35
-13
@@ -1,6 +1,7 @@
|
||||
package transport
|
||||
|
||||
import (
|
||||
"git.wisehodl.dev/jay/go-honeybee/honeybeetest"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"net/http"
|
||||
"testing"
|
||||
@@ -35,18 +36,12 @@ func TestDefaultConnectionConfig(t *testing.T) {
|
||||
PingInterval: 20 * time.Second,
|
||||
IncomingBufferSize: 100,
|
||||
ErrorsBufferSize: 10,
|
||||
Retry: GetDefaultRetryConfig(),
|
||||
})
|
||||
}
|
||||
|
||||
func TestDefaultRetryConnectionConfig(t *testing.T) {
|
||||
conf := GetDefaultRetryConfig()
|
||||
|
||||
assert.Equal(t, conf, &RetryConfig{
|
||||
Retry: RetryConfig{
|
||||
MaxRetries: 0,
|
||||
InitialDelay: 1 * time.Second,
|
||||
MaxDelay: 60 * time.Second,
|
||||
JitterFactor: 0.2,
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
@@ -114,10 +109,10 @@ func TestWithWriteTimeout(t *testing.T) {
|
||||
func TestWithRetry(t *testing.T) {
|
||||
t.Run("without retry", func(t *testing.T) {
|
||||
conf := GetDefaultConnectionConfig()
|
||||
opt := WithoutRetry()
|
||||
opt := WithRetryDisabled()
|
||||
err := applyConnectionOptions(conf, opt)
|
||||
assert.NoError(t, err)
|
||||
assert.Nil(t, conf.Retry)
|
||||
assert.True(t, conf.Retry.Disabled)
|
||||
})
|
||||
|
||||
t.Run("with attempts", func(t *testing.T) {
|
||||
@@ -209,7 +204,7 @@ func TestValidateConnectionConfig(t *testing.T) {
|
||||
}{
|
||||
{
|
||||
name: "valid empty",
|
||||
conf: *&ConnectionConfig{},
|
||||
conf: ConnectionConfig{Retry: RetryConfig{Disabled: true}},
|
||||
},
|
||||
{
|
||||
name: "valid defaults",
|
||||
@@ -220,7 +215,7 @@ func TestValidateConnectionConfig(t *testing.T) {
|
||||
conf: ConnectionConfig{
|
||||
CloseHandler: (func(code int, text string) error { return nil }),
|
||||
WriteTimeout: time.Duration(30),
|
||||
Retry: &RetryConfig{
|
||||
Retry: RetryConfig{
|
||||
MaxRetries: 0,
|
||||
InitialDelay: 2 * time.Second,
|
||||
MaxDelay: 10 * time.Second,
|
||||
@@ -231,7 +226,7 @@ func TestValidateConnectionConfig(t *testing.T) {
|
||||
{
|
||||
name: "invalid - initial delay > max delay",
|
||||
conf: ConnectionConfig{
|
||||
Retry: &RetryConfig{
|
||||
Retry: RetryConfig{
|
||||
InitialDelay: 10 * time.Second,
|
||||
MaxDelay: 1 * time.Second,
|
||||
},
|
||||
@@ -259,3 +254,30 @@ func TestValidateConnectionConfig(t *testing.T) {
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestConnectionConfigClone(t *testing.T) {
|
||||
header := http.Header{}
|
||||
header.Set("X-Test", "val")
|
||||
orig := ConnectionConfig{
|
||||
RequestHeader: header,
|
||||
WriteTimeout: 5 * time.Second,
|
||||
Retry: RetryConfig{Disabled: true},
|
||||
}
|
||||
|
||||
cloned := orig.Clone()
|
||||
|
||||
// values match
|
||||
assert.Equal(t, orig.WriteTimeout, cloned.WriteTimeout)
|
||||
assert.Equal(t, "val", cloned.RequestHeader.Get("X-Test"))
|
||||
|
||||
// header is a distinct copy
|
||||
cloned.RequestHeader.Set("X-Test", "mutated")
|
||||
assert.Equal(t, "val", orig.RequestHeader.Get("X-Test"))
|
||||
}
|
||||
|
||||
func TestWithConnectionDialer(t *testing.T) {
|
||||
mock := &honeybeetest.MockDialer{}
|
||||
conf, err := NewConnectionConfig(WithConnectionDialer(mock))
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, mock, conf.Dialer)
|
||||
}
|
||||
|
||||
+23
-18
@@ -65,7 +65,7 @@ type Connection struct {
|
||||
url *url.URL
|
||||
dialer types.Dialer
|
||||
socket types.Socket
|
||||
config *ConnectionConfig
|
||||
config ConnectionConfig
|
||||
logger *slog.Logger
|
||||
|
||||
incoming chan []byte
|
||||
@@ -107,14 +107,20 @@ func NewConnection(ctx context.Context, urlStr string, config *ConnectionConfig,
|
||||
ctx = component.MustExtend(ctx, "connection")
|
||||
}
|
||||
|
||||
// Clone config to ensure full ownership of all fields.
|
||||
cc := config.Clone()
|
||||
if cc.Dialer == nil {
|
||||
cc.Dialer = NewDialer()
|
||||
}
|
||||
|
||||
conn := &Connection{
|
||||
url: url,
|
||||
dialer: NewDialer(),
|
||||
dialer: cc.Dialer,
|
||||
socket: nil,
|
||||
config: config,
|
||||
incoming: make(chan []byte, config.IncomingBufferSize),
|
||||
config: cc,
|
||||
incoming: make(chan []byte, cc.IncomingBufferSize),
|
||||
heartbeat: make(chan struct{}, 1),
|
||||
errors: make(chan error, config.ErrorsBufferSize),
|
||||
errors: make(chan error, cc.ErrorsBufferSize),
|
||||
incomingCount: &atomic.Uint64{},
|
||||
outgoingCount: &atomic.Uint64{},
|
||||
heartbeatCount: &atomic.Uint64{},
|
||||
@@ -151,14 +157,17 @@ func NewConnectionFromSocket(
|
||||
ctx = component.MustExtend(ctx, "connection")
|
||||
}
|
||||
|
||||
// Clone config to ensure full ownership of all fields.
|
||||
cc := config.Clone()
|
||||
|
||||
conn := &Connection{
|
||||
url: nil,
|
||||
dialer: nil,
|
||||
socket: socket,
|
||||
config: config,
|
||||
incoming: make(chan []byte, config.IncomingBufferSize),
|
||||
config: cc,
|
||||
incoming: make(chan []byte, cc.IncomingBufferSize),
|
||||
heartbeat: make(chan struct{}, 1),
|
||||
errors: make(chan error, config.ErrorsBufferSize),
|
||||
errors: make(chan error, cc.ErrorsBufferSize),
|
||||
incomingCount: &atomic.Uint64{},
|
||||
outgoingCount: &atomic.Uint64{},
|
||||
heartbeatCount: &atomic.Uint64{},
|
||||
@@ -219,7 +228,7 @@ func (c *Connection) Connect(ctx context.Context) error {
|
||||
// socket acquisition failed
|
||||
c.state = StateDisconnected
|
||||
if c.logger != nil {
|
||||
c.logger.Error("connection failed", "error", err)
|
||||
c.logger.Warn("connection failed", "error", err)
|
||||
}
|
||||
return NewConnectionError(err)
|
||||
}
|
||||
@@ -244,7 +253,7 @@ func (c *Connection) Connect(ctx context.Context) error {
|
||||
c.state = StateConnected
|
||||
|
||||
if c.logger != nil {
|
||||
c.logger.Info("connected")
|
||||
c.logger.Debug("connected")
|
||||
}
|
||||
|
||||
return nil
|
||||
@@ -311,10 +320,6 @@ func (c *Connection) Stats() ConnectionStats {
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Connection) SetDialer(d types.Dialer) {
|
||||
c.dialer = d
|
||||
}
|
||||
|
||||
// ---------------------------/
|
||||
// Reader loop
|
||||
// -------------------------/
|
||||
@@ -357,7 +362,7 @@ func (c *Connection) classifyCloseError(err error) error {
|
||||
switch closeErr.Code {
|
||||
case websocket.CloseNormalClosure, websocket.CloseGoingAway:
|
||||
if c.logger != nil {
|
||||
c.logger.Info("connection closed by peer",
|
||||
c.logger.Debug("connection closed by peer",
|
||||
"code", closeErr.Code,
|
||||
"text", closeErr.Text,
|
||||
)
|
||||
@@ -366,7 +371,7 @@ func (c *Connection) classifyCloseError(err error) error {
|
||||
|
||||
default:
|
||||
if c.logger != nil {
|
||||
c.logger.Error("unexpected close",
|
||||
c.logger.Warn("unexpected close",
|
||||
"code", closeErr.Code,
|
||||
"text", closeErr.Text,
|
||||
)
|
||||
@@ -492,7 +497,7 @@ func (c *Connection) shutdownInner() {
|
||||
})
|
||||
|
||||
if c.logger != nil {
|
||||
c.logger.Info("closing")
|
||||
c.logger.Debug("closing")
|
||||
}
|
||||
|
||||
if c.socket != nil {
|
||||
@@ -518,7 +523,7 @@ func (c *Connection) shutdownCleanup() {
|
||||
close(c.errors)
|
||||
|
||||
if c.logger != nil {
|
||||
c.logger.Info("closed")
|
||||
c.logger.Debug("closed")
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
@@ -102,7 +102,7 @@ func TestConnectionSend(t *testing.T) {
|
||||
})
|
||||
|
||||
t.Run("write timeout disabled when zero", func(t *testing.T) {
|
||||
config := &ConnectionConfig{WriteTimeout: 0}
|
||||
config := &ConnectionConfig{WriteTimeout: 0, Retry: RetryConfig{Disabled: true}}
|
||||
|
||||
outgoingData := make(chan honeybeetest.MockOutgoingData, 10)
|
||||
mockSocket := honeybeetest.NewMockSocket()
|
||||
@@ -148,7 +148,7 @@ func TestConnectionSend(t *testing.T) {
|
||||
})
|
||||
|
||||
t.Run("write timeout sets deadline when positive", func(t *testing.T) {
|
||||
config := &ConnectionConfig{WriteTimeout: 30 * time.Millisecond}
|
||||
config := &ConnectionConfig{WriteTimeout: 30 * time.Millisecond, Retry: RetryConfig{Disabled: true}}
|
||||
|
||||
outgoingData := make(chan honeybeetest.MockOutgoingData, 10)
|
||||
mockSocket := honeybeetest.NewMockSocket()
|
||||
@@ -194,7 +194,7 @@ func TestConnectionSend(t *testing.T) {
|
||||
})
|
||||
|
||||
t.Run("send fails on deadline error", func(t *testing.T) {
|
||||
config := &ConnectionConfig{WriteTimeout: 1 * time.Millisecond}
|
||||
config := &ConnectionConfig{WriteTimeout: 1 * time.Millisecond, Retry: RetryConfig{Disabled: true}}
|
||||
|
||||
mockSocket := honeybeetest.NewMockSocket()
|
||||
|
||||
|
||||
@@ -69,7 +69,7 @@ func TestNewConnection(t *testing.T) {
|
||||
{
|
||||
name: "valid url, valid config",
|
||||
url: "wss://relay.example.com:8080/path",
|
||||
config: &ConnectionConfig{WriteTimeout: 30 * time.Second},
|
||||
config: &ConnectionConfig{WriteTimeout: 30 * time.Second, Retry: RetryConfig{Disabled: true}},
|
||||
},
|
||||
{
|
||||
name: "invalid url",
|
||||
@@ -82,7 +82,7 @@ func TestNewConnection(t *testing.T) {
|
||||
name: "invalid config",
|
||||
url: "ws://example.com",
|
||||
config: &ConnectionConfig{
|
||||
Retry: &RetryConfig{
|
||||
Retry: RetryConfig{
|
||||
InitialDelay: 10 * time.Second,
|
||||
MaxDelay: 1 * time.Second,
|
||||
},
|
||||
@@ -121,9 +121,13 @@ func TestNewConnection(t *testing.T) {
|
||||
|
||||
// Verify default config is used if nil is passed
|
||||
if tc.config == nil {
|
||||
assert.Equal(t, GetDefaultConnectionConfig(), conn.config)
|
||||
expected := *GetDefaultConnectionConfig()
|
||||
expected.Dialer = conn.config.Dialer // dialer resolved at construction
|
||||
assert.Equal(t, expected, conn.config)
|
||||
} else {
|
||||
assert.Equal(t, tc.config, conn.config)
|
||||
expected := *tc.config
|
||||
expected.Dialer = conn.config.Dialer
|
||||
assert.Equal(t, expected, conn.config)
|
||||
}
|
||||
})
|
||||
}
|
||||
@@ -152,13 +156,13 @@ func TestNewConnectionFromSocket(t *testing.T) {
|
||||
{
|
||||
name: "valid socket with valid config",
|
||||
socket: honeybeetest.NewMockSocket(),
|
||||
config: &ConnectionConfig{WriteTimeout: 30 * time.Second},
|
||||
config: &ConnectionConfig{WriteTimeout: 30 * time.Second, Retry: RetryConfig{Disabled: true}},
|
||||
},
|
||||
{
|
||||
name: "invalid config",
|
||||
socket: honeybeetest.NewMockSocket(),
|
||||
config: &ConnectionConfig{
|
||||
Retry: &RetryConfig{
|
||||
Retry: RetryConfig{
|
||||
InitialDelay: 10 * time.Second,
|
||||
MaxDelay: 1 * time.Second,
|
||||
},
|
||||
@@ -173,6 +177,7 @@ func TestNewConnectionFromSocket(t *testing.T) {
|
||||
CloseHandler: func(code int, text string) error {
|
||||
return nil
|
||||
},
|
||||
Retry: RetryConfig{Disabled: true},
|
||||
},
|
||||
},
|
||||
}
|
||||
@@ -219,11 +224,19 @@ func TestNewConnectionFromSocket(t *testing.T) {
|
||||
assert.Equal(t, StateConnected, conn.state)
|
||||
assert.False(t, conn.closed)
|
||||
|
||||
// Verify default config is used if nil is passed
|
||||
// Verify default config is used if nil is passed.
|
||||
// CloseHandler is a func; exclude it from the struct comparison
|
||||
// (identity is verified separately via closeHandlerSet).
|
||||
gotCfg := conn.config
|
||||
gotCfg.CloseHandler = nil
|
||||
if tc.config == nil {
|
||||
assert.Equal(t, GetDefaultConnectionConfig(), conn.config)
|
||||
expected := *GetDefaultConnectionConfig()
|
||||
expected.CloseHandler = nil
|
||||
assert.Equal(t, expected, gotCfg)
|
||||
} else {
|
||||
assert.Equal(t, tc.config, conn.config)
|
||||
expected := *tc.config
|
||||
expected.CloseHandler = nil
|
||||
assert.Equal(t, expected, gotCfg)
|
||||
}
|
||||
|
||||
// Verify close handler was set if provided
|
||||
@@ -260,9 +273,6 @@ func TestConnect(t *testing.T) {
|
||||
})
|
||||
|
||||
t.Run("connect succeeds and starts goroutines", func(t *testing.T) {
|
||||
conn, err := NewConnection(context.Background(), "ws://test", nil, nil)
|
||||
assert.NoError(t, err)
|
||||
|
||||
outgoingData := make(chan honeybeetest.MockOutgoingData, 10)
|
||||
|
||||
mockSocket := honeybeetest.NewMockSocket()
|
||||
@@ -276,7 +286,9 @@ func TestConnect(t *testing.T) {
|
||||
return mockSocket, nil, nil
|
||||
},
|
||||
}
|
||||
conn.dialer = mockDialer
|
||||
conn, err := NewConnection(context.Background(), "ws://test",
|
||||
&ConnectionConfig{Retry: RetryConfig{Disabled: true}, Dialer: mockDialer}, nil)
|
||||
assert.NoError(t, err)
|
||||
|
||||
err = conn.Connect(context.Background())
|
||||
assert.NoError(t, err)
|
||||
@@ -298,17 +310,6 @@ func TestConnect(t *testing.T) {
|
||||
})
|
||||
|
||||
t.Run("connect retries on dial failure", func(t *testing.T) {
|
||||
config := &ConnectionConfig{
|
||||
Retry: &RetryConfig{
|
||||
MaxRetries: 2,
|
||||
InitialDelay: 1 * time.Millisecond,
|
||||
MaxDelay: 5 * time.Millisecond,
|
||||
JitterFactor: 0.0,
|
||||
},
|
||||
}
|
||||
conn, err := NewConnection(context.Background(), "ws://test", config, nil)
|
||||
assert.NoError(t, err)
|
||||
|
||||
attemptCount := 0
|
||||
mockDialer := &honeybeetest.MockDialer{
|
||||
DialContextFunc: func(context.Context, string, http.Header) (types.Socket, *http.Response, error) {
|
||||
@@ -319,7 +320,17 @@ func TestConnect(t *testing.T) {
|
||||
return honeybeetest.NewMockSocket(), nil, nil
|
||||
},
|
||||
}
|
||||
conn.dialer = mockDialer
|
||||
config := &ConnectionConfig{
|
||||
Retry: RetryConfig{
|
||||
MaxRetries: 2,
|
||||
InitialDelay: 1 * time.Millisecond,
|
||||
MaxDelay: 5 * time.Millisecond,
|
||||
JitterFactor: 0.0,
|
||||
},
|
||||
Dialer: mockDialer,
|
||||
}
|
||||
conn, err := NewConnection(context.Background(), "ws://test", config, nil)
|
||||
assert.NoError(t, err)
|
||||
|
||||
err = conn.Connect(context.Background())
|
||||
assert.NoError(t, err)
|
||||
@@ -330,23 +341,22 @@ func TestConnect(t *testing.T) {
|
||||
})
|
||||
|
||||
t.Run("connect fails after max retries", func(t *testing.T) {
|
||||
config := &ConnectionConfig{
|
||||
Retry: &RetryConfig{
|
||||
MaxRetries: 2,
|
||||
InitialDelay: 1 * time.Millisecond,
|
||||
MaxDelay: 5 * time.Millisecond,
|
||||
JitterFactor: 0.0,
|
||||
},
|
||||
}
|
||||
conn, err := NewConnection(context.Background(), "ws://test", config, nil)
|
||||
assert.NoError(t, err)
|
||||
|
||||
mockDialer := &honeybeetest.MockDialer{
|
||||
DialContextFunc: func(context.Context, string, http.Header) (types.Socket, *http.Response, error) {
|
||||
return nil, nil, fmt.Errorf("dial failed")
|
||||
},
|
||||
}
|
||||
conn.dialer = mockDialer
|
||||
config := &ConnectionConfig{
|
||||
Retry: RetryConfig{
|
||||
MaxRetries: 2,
|
||||
InitialDelay: 1 * time.Millisecond,
|
||||
MaxDelay: 5 * time.Millisecond,
|
||||
JitterFactor: 0.0,
|
||||
},
|
||||
Dialer: mockDialer,
|
||||
}
|
||||
conn, err := NewConnection(context.Background(), "ws://test", config, nil)
|
||||
assert.NoError(t, err)
|
||||
|
||||
err = conn.Connect(context.Background())
|
||||
assert.Error(t, err)
|
||||
@@ -355,18 +365,20 @@ func TestConnect(t *testing.T) {
|
||||
})
|
||||
|
||||
t.Run("state transitions during connect", func(t *testing.T) {
|
||||
conn, err := NewConnection(context.Background(), "ws://test", nil, nil)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, StateDisconnected, conn.State())
|
||||
|
||||
stateDuringDial := StateDisconnected
|
||||
// conn captured after construction; closure safe because dialer runs during Connect
|
||||
var conn *Connection
|
||||
mockDialer := &honeybeetest.MockDialer{
|
||||
DialContextFunc: func(context.Context, string, http.Header) (types.Socket, *http.Response, error) {
|
||||
stateDuringDial = conn.state
|
||||
return honeybeetest.NewMockSocket(), nil, nil
|
||||
},
|
||||
}
|
||||
conn.dialer = mockDialer
|
||||
var err error
|
||||
conn, err = NewConnection(context.Background(), "ws://test",
|
||||
&ConnectionConfig{Retry: RetryConfig{Disabled: true}, Dialer: mockDialer}, nil)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, StateDisconnected, conn.State())
|
||||
|
||||
conn.Connect(context.Background())
|
||||
|
||||
@@ -378,25 +390,24 @@ func TestConnect(t *testing.T) {
|
||||
|
||||
t.Run("close handler configured when provided", func(t *testing.T) {
|
||||
handlerSet := false
|
||||
config := &ConnectionConfig{
|
||||
CloseHandler: func(code int, text string) error {
|
||||
return nil
|
||||
},
|
||||
}
|
||||
conn, err := NewConnection(context.Background(), "ws://test", config, nil)
|
||||
assert.NoError(t, err)
|
||||
|
||||
mockSocket := honeybeetest.NewMockSocket()
|
||||
mockSocket.SetCloseHandlerFunc = func(h func(int, string) error) {
|
||||
handlerSet = true
|
||||
}
|
||||
|
||||
mockDialer := &honeybeetest.MockDialer{
|
||||
DialContextFunc: func(context.Context, string, http.Header) (types.Socket, *http.Response, error) {
|
||||
return mockSocket, nil, nil
|
||||
},
|
||||
}
|
||||
conn.dialer = mockDialer
|
||||
config := &ConnectionConfig{
|
||||
CloseHandler: func(code int, text string) error {
|
||||
return nil
|
||||
},
|
||||
Retry: RetryConfig{Disabled: true},
|
||||
Dialer: mockDialer,
|
||||
}
|
||||
conn, err := NewConnection(context.Background(), "ws://test", config, nil)
|
||||
assert.NoError(t, err)
|
||||
|
||||
conn.Connect(context.Background())
|
||||
|
||||
@@ -407,17 +418,16 @@ func TestConnect(t *testing.T) {
|
||||
|
||||
t.Run("passes headers when configured", func(t *testing.T) {
|
||||
header := http.Header{"X-Custom": []string{"val"}}
|
||||
conf, _ := NewConnectionConfig(WithRequestHeader(header))
|
||||
conn, _ := NewConnection(context.Background(), "ws://test", conf, nil)
|
||||
|
||||
dialCalled := false
|
||||
conn.dialer = &honeybeetest.MockDialer{
|
||||
mockDialer := &honeybeetest.MockDialer{
|
||||
DialContextFunc: func(ctx context.Context, url string, h http.Header) (types.Socket, *http.Response, error) {
|
||||
assert.Equal(t, "val", h.Get("X-Custom"))
|
||||
dialCalled = true
|
||||
return honeybeetest.NewMockSocket(), nil, nil
|
||||
},
|
||||
}
|
||||
conf, _ := NewConnectionConfig(WithRequestHeader(header), WithConnectionDialer(mockDialer))
|
||||
conn, _ := NewConnection(context.Background(), "ws://test", conf, nil)
|
||||
|
||||
err := conn.Connect(context.Background())
|
||||
|
||||
@@ -429,25 +439,25 @@ func TestConnect(t *testing.T) {
|
||||
func TestConnectContextCancellation(t *testing.T) {
|
||||
t.Run("context cancelled during connect returns before retries exhaust", func(t *testing.T) {
|
||||
config := &ConnectionConfig{
|
||||
Retry: &RetryConfig{
|
||||
Retry: RetryConfig{
|
||||
MaxRetries: 100,
|
||||
InitialDelay: 500 * time.Millisecond,
|
||||
MaxDelay: 1 * time.Second,
|
||||
JitterFactor: 0.0,
|
||||
},
|
||||
}
|
||||
conn, err := NewConnection(context.Background(), "ws://test", config, nil)
|
||||
assert.NoError(t, err)
|
||||
|
||||
dialCount := atomic.Int32{}
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
|
||||
conn.dialer = &honeybeetest.MockDialer{
|
||||
mockDialer := &honeybeetest.MockDialer{
|
||||
DialContextFunc: func(ctx context.Context, _ string, _ http.Header) (types.Socket, *http.Response, error) {
|
||||
dialCount.Add(1)
|
||||
return nil, nil, fmt.Errorf("dial failed")
|
||||
},
|
||||
}
|
||||
config.Dialer = mockDialer
|
||||
conn, err := NewConnection(context.Background(), "ws://test", config, nil)
|
||||
assert.NoError(t, err)
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
|
||||
done := make(chan error, 1)
|
||||
go func() {
|
||||
|
||||
+43
-47
@@ -28,16 +28,15 @@ func TestConnectLogging(t *testing.T) {
|
||||
t.Run("success", func(t *testing.T) {
|
||||
mockHandler := honeybeetest.NewMockSlogHandler()
|
||||
|
||||
conn, err := NewConnection(context.Background(), "ws://test", nil, mockHandler)
|
||||
assert.NoError(t, err)
|
||||
|
||||
mockSocket := honeybeetest.NewMockSocket()
|
||||
mockDialer := &honeybeetest.MockDialer{
|
||||
DialContextFunc: func(context.Context, string, http.Header) (types.Socket, *http.Response, error) {
|
||||
return mockSocket, nil, nil
|
||||
},
|
||||
}
|
||||
conn.dialer = mockDialer
|
||||
conn, err := NewConnection(context.Background(), "ws://test",
|
||||
&ConnectionConfig{Retry: RetryConfig{Disabled: true}, Dialer: mockDialer}, mockHandler)
|
||||
assert.NoError(t, err)
|
||||
|
||||
err = conn.Connect(context.Background())
|
||||
assert.NoError(t, err)
|
||||
@@ -49,7 +48,7 @@ func TestConnectLogging(t *testing.T) {
|
||||
log(slog.LevelDebug, "connecting", map[string]any{}),
|
||||
log(slog.LevelDebug, "dialing", map[string]any{"attempt": 1}),
|
||||
log(slog.LevelDebug, "dial successful", map[string]any{"attempt": 1}),
|
||||
log(slog.LevelInfo, "connected", map[string]any{}),
|
||||
log(slog.LevelDebug, "connected", map[string]any{}),
|
||||
}
|
||||
|
||||
honeybeetest.AssertLogSequence(t, records, expected)
|
||||
@@ -58,25 +57,24 @@ func TestConnectLogging(t *testing.T) {
|
||||
t.Run("max retries failure", func(t *testing.T) {
|
||||
mockHandler := honeybeetest.NewMockSlogHandler()
|
||||
|
||||
config := &ConnectionConfig{
|
||||
Retry: &RetryConfig{
|
||||
MaxRetries: 2,
|
||||
InitialDelay: 1 * time.Millisecond,
|
||||
MaxDelay: 5 * time.Millisecond,
|
||||
JitterFactor: 0.0,
|
||||
},
|
||||
}
|
||||
|
||||
conn, err := NewConnection(context.Background(), "ws://test", config, mockHandler)
|
||||
assert.NoError(t, err)
|
||||
|
||||
dialErr := fmt.Errorf("dial error")
|
||||
mockDialer := &honeybeetest.MockDialer{
|
||||
DialContextFunc: func(context.Context, string, http.Header) (types.Socket, *http.Response, error) {
|
||||
return nil, nil, dialErr
|
||||
},
|
||||
}
|
||||
conn.dialer = mockDialer
|
||||
config := &ConnectionConfig{
|
||||
Retry: RetryConfig{
|
||||
MaxRetries: 2,
|
||||
InitialDelay: 1 * time.Millisecond,
|
||||
MaxDelay: 5 * time.Millisecond,
|
||||
JitterFactor: 0.0,
|
||||
},
|
||||
Dialer: mockDialer,
|
||||
}
|
||||
|
||||
conn, err := NewConnection(context.Background(), "ws://test", config, mockHandler)
|
||||
assert.NoError(t, err)
|
||||
|
||||
err = conn.Connect(context.Background())
|
||||
assert.Error(t, err)
|
||||
@@ -90,8 +88,8 @@ func TestConnectLogging(t *testing.T) {
|
||||
log(slog.LevelDebug, "dialing", map[string]any{"attempt": 2}),
|
||||
log(slog.LevelWarn, "dial failed, retrying", map[string]any{"attempt": 2, "error": dialErr}),
|
||||
log(slog.LevelDebug, "dialing", map[string]any{"attempt": 3}),
|
||||
log(slog.LevelError, "dial failed, max retries reached", map[string]any{"attempt": 3, "error": dialErr}),
|
||||
log(slog.LevelError, "connection failed", map[string]any{"error": dialErr}),
|
||||
log(slog.LevelDebug, "dial failed, max retries reached", map[string]any{"attempt": 3, "error": dialErr}),
|
||||
log(slog.LevelWarn, "connection failed", map[string]any{"error": dialErr}),
|
||||
}
|
||||
|
||||
honeybeetest.AssertLogSequence(t, records, expected)
|
||||
@@ -100,18 +98,6 @@ func TestConnectLogging(t *testing.T) {
|
||||
t.Run("success after retry", func(t *testing.T) {
|
||||
mockHandler := honeybeetest.NewMockSlogHandler()
|
||||
|
||||
config := &ConnectionConfig{
|
||||
Retry: &RetryConfig{
|
||||
MaxRetries: 3,
|
||||
InitialDelay: 1 * time.Millisecond,
|
||||
MaxDelay: 5 * time.Millisecond,
|
||||
JitterFactor: 0.0,
|
||||
},
|
||||
}
|
||||
|
||||
conn, err := NewConnection(context.Background(), "ws://test", config, mockHandler)
|
||||
assert.NoError(t, err)
|
||||
|
||||
attemptCount := 0
|
||||
dialErr := fmt.Errorf("dial error")
|
||||
mockDialer := &honeybeetest.MockDialer{
|
||||
@@ -123,7 +109,18 @@ func TestConnectLogging(t *testing.T) {
|
||||
return honeybeetest.NewMockSocket(), nil, nil
|
||||
},
|
||||
}
|
||||
conn.dialer = mockDialer
|
||||
config := &ConnectionConfig{
|
||||
Retry: RetryConfig{
|
||||
MaxRetries: 3,
|
||||
InitialDelay: 1 * time.Millisecond,
|
||||
MaxDelay: 5 * time.Millisecond,
|
||||
JitterFactor: 0.0,
|
||||
},
|
||||
Dialer: mockDialer,
|
||||
}
|
||||
|
||||
conn, err := NewConnection(context.Background(), "ws://test", config, mockHandler)
|
||||
assert.NoError(t, err)
|
||||
|
||||
err = conn.Connect(context.Background())
|
||||
assert.NoError(t, err)
|
||||
@@ -139,7 +136,7 @@ func TestConnectLogging(t *testing.T) {
|
||||
log(slog.LevelWarn, "dial failed, retrying", map[string]any{"attempt": 2, "error": dialErr}),
|
||||
log(slog.LevelDebug, "dialing", map[string]any{"attempt": 3}),
|
||||
log(slog.LevelDebug, "dial successful", map[string]any{"attempt": 3}),
|
||||
log(slog.LevelInfo, "connected", map[string]any{}),
|
||||
log(slog.LevelDebug, "connected", map[string]any{}),
|
||||
}
|
||||
|
||||
honeybeetest.AssertLogSequence(t, records, expected)
|
||||
@@ -158,14 +155,14 @@ func TestCloseLogging(t *testing.T) {
|
||||
|
||||
honeybeetest.Eventually(t, func() bool {
|
||||
return honeybeetest.FindLogRecord(
|
||||
mockHandler.GetRecords(), slog.LevelInfo, "closed") != nil
|
||||
mockHandler.GetRecords(), slog.LevelDebug, "closed") != nil
|
||||
}, "expected log")
|
||||
|
||||
records := mockHandler.GetRecords()
|
||||
|
||||
expected := []honeybeetest.ExpectedLog{
|
||||
log(slog.LevelInfo, "closing", map[string]any{}),
|
||||
log(slog.LevelInfo, "closed", map[string]any{}),
|
||||
log(slog.LevelDebug, "closing", map[string]any{}),
|
||||
log(slog.LevelDebug, "closed", map[string]any{}),
|
||||
}
|
||||
|
||||
honeybeetest.AssertLogSequence(t, records, expected)
|
||||
@@ -193,7 +190,7 @@ func TestCloseLogging(t *testing.T) {
|
||||
records := mockHandler.GetRecords()
|
||||
|
||||
expected := []honeybeetest.ExpectedLog{
|
||||
log(slog.LevelInfo, "closing", map[string]any{}),
|
||||
log(slog.LevelDebug, "closing", map[string]any{}),
|
||||
log(slog.LevelError, "socket close failed", map[string]any{"error": closeErr}),
|
||||
}
|
||||
|
||||
@@ -219,10 +216,10 @@ func TestReaderLogging(t *testing.T) {
|
||||
|
||||
honeybeetest.Eventually(t, func() bool {
|
||||
return honeybeetest.FindLogRecord(
|
||||
mockHandler.GetRecords(), slog.LevelInfo, "connection closed by peer") != nil
|
||||
mockHandler.GetRecords(), slog.LevelDebug, "connection closed by peer") != nil
|
||||
}, "expected log")
|
||||
|
||||
record := honeybeetest.FindLogRecord(mockHandler.GetRecords(), slog.LevelInfo, "connection closed by peer")
|
||||
record := honeybeetest.FindLogRecord(mockHandler.GetRecords(), slog.LevelDebug, "connection closed by peer")
|
||||
assert.NotNil(t, record)
|
||||
honeybeetest.AssertAttributePresent(t, *record, "code", websocket.CloseNormalClosure)
|
||||
honeybeetest.AssertAttributePresent(t, *record, "text", "goodbye")
|
||||
@@ -246,10 +243,10 @@ func TestReaderLogging(t *testing.T) {
|
||||
|
||||
honeybeetest.Eventually(t, func() bool {
|
||||
return honeybeetest.FindLogRecord(
|
||||
mockHandler.GetRecords(), slog.LevelError, "unexpected close") != nil
|
||||
mockHandler.GetRecords(), slog.LevelWarn, "unexpected close") != nil
|
||||
}, "expected log")
|
||||
|
||||
record := honeybeetest.FindLogRecord(mockHandler.GetRecords(), slog.LevelError, "unexpected close")
|
||||
record := honeybeetest.FindLogRecord(mockHandler.GetRecords(), slog.LevelWarn, "unexpected close")
|
||||
assert.NotNil(t, record)
|
||||
honeybeetest.AssertAttributePresent(t, *record, "code", websocket.CloseProtocolError)
|
||||
honeybeetest.AssertAttributePresent(t, *record, "text", "bad protocol")
|
||||
@@ -279,7 +276,7 @@ func TestWriterLogging(t *testing.T) {
|
||||
t.Run("write deadline error", func(t *testing.T) {
|
||||
mockHandler := honeybeetest.NewMockSlogHandler()
|
||||
|
||||
config := &ConnectionConfig{WriteTimeout: 1 * time.Millisecond}
|
||||
config := &ConnectionConfig{WriteTimeout: 1 * time.Millisecond, Retry: RetryConfig{Disabled: true}}
|
||||
|
||||
deadlineErr := fmt.Errorf("deadline error")
|
||||
mockSocket := honeybeetest.NewMockSocket()
|
||||
@@ -341,16 +338,15 @@ func TestLoggingDisabled(t *testing.T) {
|
||||
t.Run("nil logger produces no logs", func(t *testing.T) {
|
||||
mockHandler := honeybeetest.NewMockSlogHandler()
|
||||
|
||||
conn, err := NewConnection(context.Background(), "ws://test", nil, nil)
|
||||
assert.NoError(t, err)
|
||||
|
||||
mockSocket := honeybeetest.NewMockSocket()
|
||||
mockDialer := &honeybeetest.MockDialer{
|
||||
DialContextFunc: func(context.Context, string, http.Header) (types.Socket, *http.Response, error) {
|
||||
return mockSocket, nil, nil
|
||||
},
|
||||
}
|
||||
conn.dialer = mockDialer
|
||||
conn, err := NewConnection(context.Background(), "ws://test",
|
||||
&ConnectionConfig{Retry: RetryConfig{Disabled: true}, Dialer: mockDialer}, nil)
|
||||
assert.NoError(t, err)
|
||||
|
||||
err = conn.Connect(context.Background())
|
||||
assert.NoError(t, err)
|
||||
|
||||
+6
-6
@@ -7,16 +7,16 @@ import (
|
||||
)
|
||||
|
||||
type RetryManager struct {
|
||||
config *RetryConfig
|
||||
config RetryConfig
|
||||
retryCount int
|
||||
saturation int
|
||||
}
|
||||
|
||||
func NewRetryManager(config *RetryConfig) *RetryManager {
|
||||
func NewRetryManager(config RetryConfig) *RetryManager {
|
||||
// saturationCount: retry count at which base delay meets or exceeds MaxDelay.
|
||||
// Conservative by two to preserve jitter variance near the boundary.
|
||||
saturation := 0
|
||||
if config != nil &&
|
||||
if !config.Disabled &&
|
||||
config.InitialDelay > 0 &&
|
||||
config.InitialDelay <= config.MaxDelay {
|
||||
ratio := float64(config.MaxDelay) / float64(config.InitialDelay)
|
||||
@@ -31,7 +31,7 @@ func NewRetryManager(config *RetryConfig) *RetryManager {
|
||||
}
|
||||
|
||||
func (r *RetryManager) ShouldRetry() bool {
|
||||
if r.config == nil {
|
||||
if r.config.Disabled {
|
||||
return false
|
||||
}
|
||||
|
||||
@@ -43,7 +43,7 @@ func (r *RetryManager) ShouldRetry() bool {
|
||||
}
|
||||
|
||||
func (r *RetryManager) CalculateDelay() time.Duration {
|
||||
if r.config == nil {
|
||||
if r.config.Disabled {
|
||||
return time.Second
|
||||
}
|
||||
|
||||
@@ -54,7 +54,7 @@ func (r *RetryManager) CalculateDelay() time.Duration {
|
||||
|
||||
// if saturation is reached, calculated backoff will always be higher than
|
||||
// the maximum delay
|
||||
if r.config != nil && r.retryCount >= r.saturation {
|
||||
if r.retryCount >= r.saturation {
|
||||
return r.config.MaxDelay
|
||||
}
|
||||
|
||||
|
||||
+13
-13
@@ -7,7 +7,7 @@ import (
|
||||
)
|
||||
|
||||
func TestNewRetryManager(t *testing.T) {
|
||||
config := &RetryConfig{
|
||||
config := RetryConfig{
|
||||
MaxRetries: 0,
|
||||
}
|
||||
|
||||
@@ -16,14 +16,14 @@ func TestNewRetryManager(t *testing.T) {
|
||||
assert.Equal(t, config, mgr.config)
|
||||
assert.Equal(t, 0, mgr.retryCount)
|
||||
|
||||
// Should accept nil config
|
||||
mgr = NewRetryManager(nil)
|
||||
assert.Nil(t, mgr.config)
|
||||
// Should accept a disabled config
|
||||
mgr = NewRetryManager(RetryConfig{Disabled: true})
|
||||
assert.True(t, mgr.config.Disabled)
|
||||
assert.Equal(t, 0, mgr.retryCount)
|
||||
}
|
||||
|
||||
func TestRecordRetry(t *testing.T) {
|
||||
mgr := NewRetryManager(nil)
|
||||
mgr := NewRetryManager(RetryConfig{Disabled: true})
|
||||
assert.Equal(t, mgr.retryCount, 0)
|
||||
|
||||
mgr.RecordRetry()
|
||||
@@ -34,13 +34,13 @@ func TestRecordRetry(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestShouldRetry(t *testing.T) {
|
||||
// never retry if config is nil
|
||||
mgr := NewRetryManager(nil)
|
||||
// never retry if config is disabled
|
||||
mgr := NewRetryManager(RetryConfig{Disabled: true})
|
||||
assert.False(t, mgr.ShouldRetry())
|
||||
|
||||
// always retry if max attempt count is zero
|
||||
mgr = &RetryManager{
|
||||
config: &RetryConfig{
|
||||
config: RetryConfig{
|
||||
MaxRetries: 0,
|
||||
},
|
||||
retryCount: 1000,
|
||||
@@ -49,7 +49,7 @@ func TestShouldRetry(t *testing.T) {
|
||||
|
||||
// retry if below max attempt count
|
||||
mgr = &RetryManager{
|
||||
config: &RetryConfig{
|
||||
config: RetryConfig{
|
||||
MaxRetries: 10,
|
||||
},
|
||||
retryCount: 5,
|
||||
@@ -58,7 +58,7 @@ func TestShouldRetry(t *testing.T) {
|
||||
|
||||
// do not retry if above max attempt count
|
||||
mgr = &RetryManager{
|
||||
config: &RetryConfig{
|
||||
config: RetryConfig{
|
||||
MaxRetries: 10,
|
||||
},
|
||||
retryCount: 11,
|
||||
@@ -68,12 +68,12 @@ func TestShouldRetry(t *testing.T) {
|
||||
|
||||
func TestCalculateDelayDisabled(t *testing.T) {
|
||||
// default delay if retry is disabled
|
||||
mgr := NewRetryManager(nil)
|
||||
mgr := NewRetryManager(RetryConfig{Disabled: true})
|
||||
assert.Equal(t, time.Second, mgr.CalculateDelay())
|
||||
}
|
||||
|
||||
func TestCalculateDelayWithoutJitter(t *testing.T) {
|
||||
mgr := NewRetryManager(&RetryConfig{
|
||||
mgr := NewRetryManager(RetryConfig{
|
||||
MaxRetries: 0,
|
||||
InitialDelay: 1 * time.Second,
|
||||
MaxDelay: 5 * time.Second,
|
||||
@@ -105,7 +105,7 @@ func TestCalculateDelayWithoutJitter(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestCalculateDelayWithJitter(t *testing.T) {
|
||||
mgr := NewRetryManager(&RetryConfig{
|
||||
mgr := NewRetryManager(RetryConfig{
|
||||
MaxRetries: 0,
|
||||
InitialDelay: 1 * time.Second,
|
||||
MaxDelay: 5 * time.Second,
|
||||
|
||||
+1
-1
@@ -82,7 +82,7 @@ func AcquireSocket(
|
||||
if !retryMgr.ShouldRetry() {
|
||||
// retry policy expired
|
||||
if logger != nil {
|
||||
logger.Error("dial failed, max retries reached",
|
||||
logger.Debug("dial failed, max retries reached",
|
||||
"error", err,
|
||||
"attempt", retryMgr.RetryCount()+1)
|
||||
}
|
||||
|
||||
@@ -77,7 +77,7 @@ func TestAcquireSocket(t *testing.T) {
|
||||
},
|
||||
}
|
||||
|
||||
retryMgr := NewRetryManager(&RetryConfig{
|
||||
retryMgr := NewRetryManager(RetryConfig{
|
||||
MaxRetries: tc.maxRetries,
|
||||
InitialDelay: 1 * time.Millisecond,
|
||||
MaxDelay: 5 * time.Millisecond,
|
||||
@@ -106,7 +106,8 @@ func TestAcquireSocketGuards(t *testing.T) {
|
||||
return honeybeetest.NewMockSocket(), nil, nil
|
||||
},
|
||||
}
|
||||
validRetryMgr := NewRetryManager(GetDefaultRetryConfig())
|
||||
validRetryConfig := GetDefaultConnectionConfig().Retry
|
||||
validRetryMgr := NewRetryManager(validRetryConfig)
|
||||
|
||||
cases := []struct {
|
||||
name string
|
||||
@@ -167,7 +168,7 @@ func TestAcquireSocketContextCancellation(t *testing.T) {
|
||||
// cancel before acquiring socket
|
||||
cancel()
|
||||
|
||||
retryMgr := NewRetryManager(GetDefaultRetryConfig())
|
||||
retryMgr := NewRetryManager(GetDefaultConnectionConfig().Retry)
|
||||
_, _, err := AcquireSocket(ctx, retryMgr, mockDialer, "ws://test", nil, nil)
|
||||
|
||||
assert.ErrorIs(t, err, context.Canceled)
|
||||
@@ -186,7 +187,7 @@ func TestAcquireSocketContextCancellation(t *testing.T) {
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
|
||||
retryMgr := NewRetryManager(&RetryConfig{
|
||||
retryMgr := NewRetryManager(RetryConfig{
|
||||
MaxRetries: 10,
|
||||
InitialDelay: 1 * time.Second,
|
||||
MaxDelay: 1 * time.Second,
|
||||
@@ -230,7 +231,7 @@ func TestAcquireSocketContextCancellation(t *testing.T) {
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
|
||||
retryMgr := NewRetryManager(GetDefaultRetryConfig())
|
||||
retryMgr := NewRetryManager(GetDefaultConnectionConfig().Retry)
|
||||
done := make(chan error, 1)
|
||||
go func() {
|
||||
_, _, err := AcquireSocket(ctx, retryMgr, mockDialer, "ws://test", nil, nil)
|
||||
@@ -263,7 +264,7 @@ func TestAcquireSocketPassesHeaders(t *testing.T) {
|
||||
},
|
||||
}
|
||||
|
||||
retryMgr := NewRetryManager(&RetryConfig{MaxRetries: 0})
|
||||
retryMgr := NewRetryManager(RetryConfig{MaxRetries: 0, InitialDelay: 1 * time.Millisecond, MaxDelay: 5 * time.Millisecond})
|
||||
_, _, err := AcquireSocket(context.Background(), retryMgr, mockDialer, "ws://test", header, nil)
|
||||
|
||||
assert.NoError(t, err)
|
||||
|
||||
@@ -2,6 +2,7 @@ package honeybee
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"log/slog"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
@@ -94,7 +95,6 @@ func NewWorker(
|
||||
ctx: ctx,
|
||||
cancel: cancel,
|
||||
config: config,
|
||||
handler: handler,
|
||||
|
||||
processedCount: &atomic.Uint64{},
|
||||
outgoingCount: &atomic.Uint64{},
|
||||
@@ -103,7 +103,8 @@ func NewWorker(
|
||||
|
||||
if handler != nil {
|
||||
comp := component.FromContext(ctx)
|
||||
w.logger = slog.New(handler).With(slog.Any("component", comp), slog.String("peer_id", id))
|
||||
w.handler = handler.WithAttrs([]slog.Attr{slog.String("peer", id)})
|
||||
w.logger = slog.New(w.handler).With(slog.Any("component", comp))
|
||||
}
|
||||
|
||||
return w, nil
|
||||
@@ -124,13 +125,13 @@ func (w *DefaultWorker) Start(pool PoolPlugin) {
|
||||
})
|
||||
|
||||
if w.logger != nil {
|
||||
w.logger.Info("started")
|
||||
w.logger.Debug("started")
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
|
||||
if w.logger != nil {
|
||||
w.logger.Info("stopped")
|
||||
w.logger.Debug("stopped")
|
||||
}
|
||||
}
|
||||
|
||||
@@ -162,7 +163,7 @@ func (w *DefaultWorker) runSession(ctx context.Context, pool PoolPlugin) {
|
||||
|
||||
case conn = <-newConn:
|
||||
if w.logger != nil {
|
||||
w.logger.Debug("session: connected")
|
||||
w.logger.Info("connected")
|
||||
}
|
||||
break preConn
|
||||
|
||||
@@ -171,7 +172,7 @@ func (w *DefaultWorker) runSession(ctx context.Context, pool PoolPlugin) {
|
||||
|
||||
case <-inactive():
|
||||
if w.logger != nil {
|
||||
w.logger.Info("keepalive: no activity observed")
|
||||
w.logger.Warn("keepalive: no activity observed")
|
||||
}
|
||||
timer.Reset(w.config.KeepaliveTimeout)
|
||||
spawnDialer()
|
||||
@@ -183,7 +184,7 @@ func (w *DefaultWorker) runSession(ctx context.Context, pool PoolPlugin) {
|
||||
pool.Events <- PoolEvent{ID: w.id, Kind: EventConnected, At: time.Now()}
|
||||
|
||||
if w.logger != nil {
|
||||
w.logger.Info("session: started")
|
||||
w.logger.Debug("session: started")
|
||||
}
|
||||
|
||||
// run session loop
|
||||
@@ -195,8 +196,14 @@ func (w *DefaultWorker) runSession(ctx context.Context, pool PoolPlugin) {
|
||||
|
||||
case data, ok := <-conn.Incoming():
|
||||
if !ok {
|
||||
var reason error
|
||||
select {
|
||||
case reason = <-conn.Errors():
|
||||
default:
|
||||
reason = fmt.Errorf("unknown")
|
||||
}
|
||||
if w.logger != nil {
|
||||
w.logger.Debug("reader: disconnected")
|
||||
w.logger.Info("websocket: closed", "reason", reason)
|
||||
}
|
||||
break conn_loop
|
||||
}
|
||||
@@ -210,9 +217,6 @@ func (w *DefaultWorker) runSession(ctx context.Context, pool PoolPlugin) {
|
||||
heartbeat()
|
||||
|
||||
case <-conn.Heartbeat():
|
||||
if w.logger != nil {
|
||||
w.logger.Debug("ping-pong heartbeat")
|
||||
}
|
||||
heartbeat()
|
||||
|
||||
case <-w.sendHeartbeat:
|
||||
@@ -220,7 +224,7 @@ func (w *DefaultWorker) runSession(ctx context.Context, pool PoolPlugin) {
|
||||
|
||||
case <-inactive():
|
||||
if w.logger != nil {
|
||||
w.logger.Info("keepalive: no activity observed")
|
||||
w.logger.Warn("keepalive: no activity observed")
|
||||
}
|
||||
timer.Reset(w.config.KeepaliveTimeout)
|
||||
break conn_loop
|
||||
@@ -231,7 +235,10 @@ func (w *DefaultWorker) runSession(ctx context.Context, pool PoolPlugin) {
|
||||
conn.Close()
|
||||
|
||||
if w.logger != nil {
|
||||
w.logger.Info("session: ended")
|
||||
w.logger.Info("disconnected")
|
||||
}
|
||||
if w.logger != nil {
|
||||
w.logger.Debug("session: ended")
|
||||
}
|
||||
|
||||
// tear down connection
|
||||
@@ -301,16 +308,13 @@ func (w *DefaultWorker) spawnDialer(
|
||||
dialCtx, dialCancel := context.WithCancel(ctx)
|
||||
|
||||
if w.logger != nil {
|
||||
w.logger.Debug("session: requesting connection")
|
||||
w.logger.Debug("session: dialing")
|
||||
}
|
||||
|
||||
go func() {
|
||||
conn, err := connect(w.id, dialCtx, pool, w.handler)
|
||||
|
||||
if err != nil {
|
||||
if w.logger != nil {
|
||||
w.logger.Warn("dialer: dial failed", "error", err)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
@@ -330,12 +334,11 @@ func connect(
|
||||
pool PoolPlugin,
|
||||
handler slog.Handler,
|
||||
) (*transport.Connection, error) {
|
||||
conn, err := transport.NewConnection(ctx, id, pool.ConnectionConfig, handler)
|
||||
cc := pool.ConnectionConfig
|
||||
conn, err := transport.NewConnection(ctx, id, &cc, handler)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
conn.SetDialer(pool.Dialer)
|
||||
return conn, conn.Connect(ctx)
|
||||
}
|
||||
|
||||
@@ -345,7 +348,7 @@ func connect(
|
||||
|
||||
func (w *DefaultWorker) Stop() {
|
||||
if w.logger != nil {
|
||||
w.logger.Debug("shutting down")
|
||||
w.logger.Info("shutting down")
|
||||
}
|
||||
w.cancel()
|
||||
}
|
||||
|
||||
+20
-19
@@ -28,6 +28,7 @@ func makeWorkerContext(t *testing.T) (
|
||||
Inbox: inbox,
|
||||
Events: events,
|
||||
InboxCounter: &atomic.Uint64{},
|
||||
ConnectionConfig: *transport.GetDefaultConnectionConfig(),
|
||||
}
|
||||
return
|
||||
}
|
||||
@@ -65,7 +66,7 @@ func TestWorkerSession(t *testing.T) {
|
||||
w := makeWorker(t, ctx, cancel)
|
||||
_, events, pool := makeWorkerContext(t)
|
||||
mockSocket := honeybeetest.NewMockSocket()
|
||||
pool.Dialer = mockDialer(mockSocket)
|
||||
pool.ConnectionConfig.Dialer = mockDialer(mockSocket)
|
||||
|
||||
var wg sync.WaitGroup
|
||||
wg.Go(func() {
|
||||
@@ -89,13 +90,13 @@ func TestWorkerSession(t *testing.T) {
|
||||
w := makeWorker(t, ctx, cancel)
|
||||
_, events, pool := makeWorkerContext(t)
|
||||
|
||||
pool.Dialer = &honeybeetest.MockDialer{
|
||||
cc, _ := transport.NewConnectionConfig(transport.WithRetryDisabled())
|
||||
pool.ConnectionConfig = *cc
|
||||
pool.ConnectionConfig.Dialer = &honeybeetest.MockDialer{
|
||||
DialContextFunc: func(context.Context, string, http.Header) (types.Socket, *http.Response, error) {
|
||||
return nil, nil, errors.New("connection refused")
|
||||
},
|
||||
}
|
||||
cc, _ := transport.NewConnectionConfig(transport.WithoutRetry())
|
||||
pool.ConnectionConfig = cc
|
||||
|
||||
var wg sync.WaitGroup
|
||||
wg.Go(func() { w.Start(pool) })
|
||||
@@ -143,15 +144,15 @@ func TestWorkerSession(t *testing.T) {
|
||||
_, _, pool := makeWorkerContext(t)
|
||||
|
||||
var dialCount atomic.Uint64
|
||||
pool.Dialer = &honeybeetest.MockDialer{
|
||||
cc, _ := transport.NewConnectionConfig(transport.WithRetryDisabled())
|
||||
pool.ConnectionConfig = *cc
|
||||
pool.ConnectionConfig.Dialer = &honeybeetest.MockDialer{
|
||||
DialContextFunc: func(dialCtx context.Context, _ string, _ http.Header) (types.Socket, *http.Response, error) {
|
||||
dialCount.Add(1)
|
||||
<-dialCtx.Done()
|
||||
return nil, nil, dialCtx.Err()
|
||||
},
|
||||
}
|
||||
cc, _ := transport.NewConnectionConfig(transport.WithoutRetry())
|
||||
pool.ConnectionConfig = cc
|
||||
|
||||
var wg sync.WaitGroup
|
||||
wg.Go(func() { w.Start(pool) })
|
||||
@@ -169,14 +170,14 @@ func TestWorkerSession(t *testing.T) {
|
||||
w := makeWorker(t, ctx, cancel)
|
||||
_, events, pool := makeWorkerContext(t)
|
||||
|
||||
pool.Dialer = &honeybeetest.MockDialer{
|
||||
cc, _ := transport.NewConnectionConfig(transport.WithRetryDisabled())
|
||||
pool.ConnectionConfig = *cc
|
||||
pool.ConnectionConfig.Dialer = &honeybeetest.MockDialer{
|
||||
DialContextFunc: func(dialCtx context.Context, _ string, _ http.Header) (types.Socket, *http.Response, error) {
|
||||
<-dialCtx.Done()
|
||||
return nil, nil, dialCtx.Err()
|
||||
},
|
||||
}
|
||||
cc, _ := transport.NewConnectionConfig(transport.WithoutRetry())
|
||||
pool.ConnectionConfig = cc
|
||||
|
||||
var wg sync.WaitGroup
|
||||
wg.Go(func() { w.Start(pool) })
|
||||
@@ -204,7 +205,7 @@ func TestWorkerSession(t *testing.T) {
|
||||
w := makeWorker(t, ctx, cancel)
|
||||
_, events, pool := makeWorkerContext(t)
|
||||
_, mockSocket, _, outgoingData := setupTestConnection(t)
|
||||
pool.Dialer = mockDialer(mockSocket)
|
||||
pool.ConnectionConfig.Dialer = mockDialer(mockSocket)
|
||||
|
||||
var wg sync.WaitGroup
|
||||
wg.Go(func() {
|
||||
@@ -255,7 +256,7 @@ func TestWorkerSession(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
pool.Dialer = mockDialer(mockSocket)
|
||||
pool.ConnectionConfig.Dialer = mockDialer(mockSocket)
|
||||
|
||||
var wg sync.WaitGroup
|
||||
wg.Go(func() {
|
||||
@@ -311,7 +312,7 @@ func TestWorkerSession(t *testing.T) {
|
||||
}
|
||||
_, events, pool := makeWorkerContext(t)
|
||||
_, mockSocket, incomingData, _ := setupTestConnection(t)
|
||||
pool.Dialer = mockDialer(mockSocket)
|
||||
pool.ConnectionConfig.Dialer = mockDialer(mockSocket)
|
||||
|
||||
var wg sync.WaitGroup
|
||||
wg.Go(func() { w.Start(pool) })
|
||||
@@ -377,7 +378,7 @@ func TestWorkerSession(t *testing.T) {
|
||||
var pongHandler func(string) error
|
||||
mockSocket, incomingData, _ := honeybeetest.SetupTestSocket(t)
|
||||
mockSocket.SetPongHandlerFunc = func(h func(string) error) { pongHandler = h }
|
||||
pool.Dialer = mockDialer(mockSocket)
|
||||
pool.ConnectionConfig.Dialer = mockDialer(mockSocket)
|
||||
|
||||
var wg sync.WaitGroup
|
||||
wg.Go(func() { w.Start(pool) })
|
||||
@@ -439,7 +440,7 @@ func TestWorkerSession(t *testing.T) {
|
||||
}
|
||||
_, events, pool := makeWorkerContext(t)
|
||||
_, mockSocket, _, _ := setupTestConnection(t)
|
||||
pool.Dialer = mockDialer(mockSocket)
|
||||
pool.ConnectionConfig.Dialer = mockDialer(mockSocket)
|
||||
|
||||
var wg sync.WaitGroup
|
||||
wg.Go(func() { w.Start(pool) })
|
||||
@@ -481,7 +482,7 @@ func TestWorkerSession(t *testing.T) {
|
||||
w := makeWorker(t, ctx, cancel)
|
||||
_, events, pool := makeWorkerContext(t)
|
||||
_, mockSocket, incomingData, _ := setupTestConnection(t)
|
||||
pool.Dialer = mockDialer(mockSocket)
|
||||
pool.ConnectionConfig.Dialer = mockDialer(mockSocket)
|
||||
|
||||
var wg sync.WaitGroup
|
||||
wg.Go(func() {
|
||||
@@ -525,7 +526,7 @@ func TestWorkerSession(t *testing.T) {
|
||||
w := makeWorker(t, ctx, cancel)
|
||||
_, events, pool := makeWorkerContext(t)
|
||||
_, mockSocket, incomingData, _ := setupTestConnection(t)
|
||||
pool.Dialer = mockDialer(mockSocket)
|
||||
pool.ConnectionConfig.Dialer = mockDialer(mockSocket)
|
||||
|
||||
var wg sync.WaitGroup
|
||||
wg.Go(func() { w.Start(pool) })
|
||||
@@ -561,7 +562,7 @@ func TestWorkerSession(t *testing.T) {
|
||||
w := makeWorker(t, ctx, cancel)
|
||||
_, events, pool := makeWorkerContext(t)
|
||||
mockSocket := honeybeetest.NewMockSocket()
|
||||
pool.Dialer = mockDialer(mockSocket)
|
||||
pool.ConnectionConfig.Dialer = mockDialer(mockSocket)
|
||||
|
||||
var wg sync.WaitGroup
|
||||
wg.Go(func() {
|
||||
@@ -607,7 +608,7 @@ func TestWorkerSession(t *testing.T) {
|
||||
w := makeWorker(t, workerCtx, workerCancel)
|
||||
_, events, pool := makeWorkerContext(t)
|
||||
mockSocket := honeybeetest.NewMockSocket()
|
||||
pool.Dialer = mockDialer(mockSocket)
|
||||
pool.ConnectionConfig.Dialer = mockDialer(mockSocket)
|
||||
|
||||
var wg sync.WaitGroup
|
||||
wg.Go(func() {
|
||||
|
||||
Reference in New Issue
Block a user