Enum and field renames.
This commit is contained in:
+6
-6
@@ -52,17 +52,17 @@ type InboundPoolEventKind = inbound.PoolEventKind
|
|||||||
// Inbound Pool event constants
|
// Inbound Pool event constants
|
||||||
|
|
||||||
const (
|
const (
|
||||||
EventPeerDisconnected = inbound.EventPeerDisconnected
|
EventPeerDisconnected = inbound.EventDisconnected
|
||||||
EventPeerDropped = inbound.EventPeerDropped
|
EventPeerDropped = inbound.EventDropped
|
||||||
EventPeerEvicted = inbound.EventPeerEvicted
|
EventPeerEvicted = inbound.EventEvicted
|
||||||
)
|
)
|
||||||
|
|
||||||
// Inbound Worker exit kinds
|
// Inbound Worker exit kinds
|
||||||
|
|
||||||
const (
|
const (
|
||||||
ExitCleanDisconnect = inbound.ExitCleanDisconnect
|
ExitCleanDisconnect = inbound.ExitDisconnected
|
||||||
ExitUnexpectedDrop = inbound.ExitUnexpectedDrop
|
ExitUnexpectedDrop = inbound.ExitError
|
||||||
ExitInactive = inbound.ExitInactive
|
ExitInactive = inbound.ExitPolicy
|
||||||
)
|
)
|
||||||
|
|
||||||
// Connection constructors
|
// Connection constructors
|
||||||
|
|||||||
+6
-6
@@ -10,8 +10,8 @@ import (
|
|||||||
// Worker Config
|
// Worker Config
|
||||||
|
|
||||||
type WorkerConfig struct {
|
type WorkerConfig struct {
|
||||||
MaxQueueSize int
|
MaxQueueSize int
|
||||||
DeadTimeout time.Duration
|
InactivityTimeout time.Duration
|
||||||
}
|
}
|
||||||
|
|
||||||
type WorkerOption func(*WorkerConfig) error
|
type WorkerOption func(*WorkerConfig) error
|
||||||
@@ -29,8 +29,8 @@ func NewWorkerConfig(options ...WorkerOption) (*WorkerConfig, error) {
|
|||||||
|
|
||||||
func GetDefaultWorkerConfig() *WorkerConfig {
|
func GetDefaultWorkerConfig() *WorkerConfig {
|
||||||
return &WorkerConfig{
|
return &WorkerConfig{
|
||||||
MaxQueueSize: 0, // queue can grow indefinitely by default
|
MaxQueueSize: 0, // queue can grow indefinitely by default
|
||||||
DeadTimeout: 0, // eviction disabled by default
|
InactivityTimeout: 0, // eviction disabled by default
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -47,7 +47,7 @@ func ValidateWorkerConfig(config *WorkerConfig) error {
|
|||||||
if err := validateMaxQueueSize(config.MaxQueueSize); err != nil {
|
if err := validateMaxQueueSize(config.MaxQueueSize); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if err := validateDeadTimeout(config.DeadTimeout); err != nil {
|
if err := validateDeadTimeout(config.InactivityTimeout); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
@@ -84,7 +84,7 @@ func WithDeadTimeout(value time.Duration) WorkerOption {
|
|||||||
if err := validateDeadTimeout(value); err != nil {
|
if err := validateDeadTimeout(value); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
c.DeadTimeout = value
|
c.InactivityTimeout = value
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -17,8 +17,8 @@ func TestNewWorkerConfig(t *testing.T) {
|
|||||||
func TestDefaultWorkerConfig(t *testing.T) {
|
func TestDefaultWorkerConfig(t *testing.T) {
|
||||||
conf := GetDefaultWorkerConfig()
|
conf := GetDefaultWorkerConfig()
|
||||||
assert.Equal(t, &WorkerConfig{
|
assert.Equal(t, &WorkerConfig{
|
||||||
MaxQueueSize: 0,
|
MaxQueueSize: 0,
|
||||||
DeadTimeout: 0,
|
InactivityTimeout: 0,
|
||||||
}, conf)
|
}, conf)
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -34,11 +34,11 @@ func TestValidateWorkerConfig(t *testing.T) {
|
|||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "zero dead timeout disabled",
|
name: "zero dead timeout disabled",
|
||||||
conf: WorkerConfig{DeadTimeout: 0},
|
conf: WorkerConfig{InactivityTimeout: 0},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "positive dead timeout",
|
name: "positive dead timeout",
|
||||||
conf: WorkerConfig{DeadTimeout: 30 * time.Second},
|
conf: WorkerConfig{InactivityTimeout: 30 * time.Second},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "negative max queue size",
|
name: "negative max queue size",
|
||||||
@@ -47,7 +47,7 @@ func TestValidateWorkerConfig(t *testing.T) {
|
|||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "negative dead timeout",
|
name: "negative dead timeout",
|
||||||
conf: WorkerConfig{DeadTimeout: -1 * time.Second},
|
conf: WorkerConfig{InactivityTimeout: -1 * time.Second},
|
||||||
wantErr: InvalidDeadTimeout,
|
wantErr: InvalidDeadTimeout,
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
@@ -83,7 +83,7 @@ func TestWithDeadTimeout(t *testing.T) {
|
|||||||
|
|
||||||
err := applyWorkerOptions(conf, WithDeadTimeout(30*time.Second))
|
err := applyWorkerOptions(conf, WithDeadTimeout(30*time.Second))
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
assert.Equal(t, 30*time.Second, conf.DeadTimeout)
|
assert.Equal(t, 30*time.Second, conf.InactivityTimeout)
|
||||||
|
|
||||||
err = applyWorkerOptions(conf, WithDeadTimeout(0))
|
err = applyWorkerOptions(conf, WithDeadTimeout(0))
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
@@ -179,9 +179,9 @@ func TestWithConnectionConfig(t *testing.T) {
|
|||||||
func TestWithWorkerConfig(t *testing.T) {
|
func TestWithWorkerConfig(t *testing.T) {
|
||||||
conf := &PoolConfig{}
|
conf := &PoolConfig{}
|
||||||
|
|
||||||
err := applyPoolOptions(conf, WithWorkerConfig(&WorkerConfig{DeadTimeout: 30 * time.Second}))
|
err := applyPoolOptions(conf, WithWorkerConfig(&WorkerConfig{InactivityTimeout: 30 * time.Second}))
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
assert.Equal(t, 30*time.Second, conf.WorkerConfig.DeadTimeout)
|
assert.Equal(t, 30*time.Second, conf.WorkerConfig.InactivityTimeout)
|
||||||
|
|
||||||
err = applyPoolOptions(conf, WithWorkerConfig(&WorkerConfig{MaxQueueSize: -1}))
|
err = applyPoolOptions(conf, WithWorkerConfig(&WorkerConfig{MaxQueueSize: -1}))
|
||||||
assert.Error(t, err)
|
assert.Error(t, err)
|
||||||
|
|||||||
+7
-7
@@ -12,18 +12,18 @@ import (
|
|||||||
|
|
||||||
// Types
|
// Types
|
||||||
|
|
||||||
type PoolEventKind string
|
type PoolEventKind int
|
||||||
|
|
||||||
const (
|
const (
|
||||||
EventPeerDisconnected PoolEventKind = "disconnected"
|
EventDisconnected PoolEventKind = iota
|
||||||
EventPeerDropped PoolEventKind = "dropped"
|
EventDropped
|
||||||
EventPeerEvicted PoolEventKind = "evicted"
|
EventEvicted
|
||||||
)
|
)
|
||||||
|
|
||||||
var workerToPoolEvent = map[WorkerExitKind]PoolEventKind{
|
var workerToPoolEvent = map[WorkerExitKind]PoolEventKind{
|
||||||
ExitCleanDisconnect: EventPeerDisconnected,
|
ExitDisconnected: EventDisconnected,
|
||||||
ExitUnexpectedDrop: EventPeerDropped,
|
ExitError: EventDropped,
|
||||||
ExitInactive: EventPeerEvicted,
|
ExitPolicy: EventEvicted,
|
||||||
}
|
}
|
||||||
|
|
||||||
type OnExitFunction func(kind WorkerExitKind)
|
type OnExitFunction func(kind WorkerExitKind)
|
||||||
|
|||||||
@@ -310,7 +310,7 @@ func TestPoolEvents(t *testing.T) {
|
|||||||
Err: &websocket.CloseError{Code: websocket.CloseNormalClosure},
|
Err: &websocket.CloseError{Code: websocket.CloseNormalClosure},
|
||||||
}
|
}
|
||||||
|
|
||||||
expectEvent(t, pool.Events(), "peer-1", EventPeerDisconnected)
|
expectEvent(t, pool.Events(), "peer-1", EventDisconnected)
|
||||||
|
|
||||||
honeybeetest.Eventually(t, func() bool {
|
honeybeetest.Eventually(t, func() bool {
|
||||||
return !slices.Contains(pool.Peers(), "peer-1")
|
return !slices.Contains(pool.Peers(), "peer-1")
|
||||||
@@ -328,7 +328,7 @@ func TestPoolEvents(t *testing.T) {
|
|||||||
Err: &websocket.CloseError{Code: websocket.CloseProtocolError},
|
Err: &websocket.CloseError{Code: websocket.CloseProtocolError},
|
||||||
}
|
}
|
||||||
|
|
||||||
expectEvent(t, pool.Events(), "peer-1", EventPeerDropped)
|
expectEvent(t, pool.Events(), "peer-1", EventDropped)
|
||||||
|
|
||||||
honeybeetest.Eventually(t, func() bool {
|
honeybeetest.Eventually(t, func() bool {
|
||||||
return !slices.Contains(pool.Peers(), "peer-1")
|
return !slices.Contains(pool.Peers(), "peer-1")
|
||||||
@@ -337,7 +337,7 @@ func TestPoolEvents(t *testing.T) {
|
|||||||
|
|
||||||
t.Run("EventPeerEvicted emitted on watchdog timeout", func(t *testing.T) {
|
t.Run("EventPeerEvicted emitted on watchdog timeout", func(t *testing.T) {
|
||||||
config, err := NewPoolConfig(
|
config, err := NewPoolConfig(
|
||||||
WithWorkerConfig(&WorkerConfig{DeadTimeout: 20 * time.Millisecond}),
|
WithWorkerConfig(&WorkerConfig{InactivityTimeout: 20 * time.Millisecond}),
|
||||||
)
|
)
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
|
|
||||||
@@ -348,7 +348,7 @@ func TestPoolEvents(t *testing.T) {
|
|||||||
socket, _, _ := honeybeetest.SetupTestSocket(t)
|
socket, _, _ := honeybeetest.SetupTestSocket(t)
|
||||||
pool.Add("peer-1", socket)
|
pool.Add("peer-1", socket)
|
||||||
|
|
||||||
expectEvent(t, pool.Events(), "peer-1", EventPeerEvicted)
|
expectEvent(t, pool.Events(), "peer-1", EventEvicted)
|
||||||
|
|
||||||
honeybeetest.Eventually(t, func() bool {
|
honeybeetest.Eventually(t, func() bool {
|
||||||
return !slices.Contains(pool.Peers(), "peer-1")
|
return !slices.Contains(pool.Peers(), "peer-1")
|
||||||
|
|||||||
+8
-8
@@ -15,12 +15,12 @@ type Worker interface {
|
|||||||
Send(data []byte) error
|
Send(data []byte) error
|
||||||
}
|
}
|
||||||
|
|
||||||
type WorkerExitKind string
|
type WorkerExitKind int
|
||||||
|
|
||||||
const (
|
const (
|
||||||
ExitCleanDisconnect WorkerExitKind = "disconnected"
|
ExitDisconnected WorkerExitKind = iota
|
||||||
ExitUnexpectedDrop WorkerExitKind = "dropped"
|
ExitError
|
||||||
ExitInactive WorkerExitKind = "inactive"
|
ExitPolicy
|
||||||
)
|
)
|
||||||
|
|
||||||
type ReceivedMessage struct {
|
type ReceivedMessage struct {
|
||||||
@@ -79,7 +79,7 @@ func (w *DefaultWorker) Start(pool PoolPlugin, wg *sync.WaitGroup) {
|
|||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
defer owg.Done()
|
defer owg.Done()
|
||||||
RunWatchdog(w.ctx, pool.OnExit, w.heartbeat, w.config.DeadTimeout)
|
RunWatchdog(w.ctx, pool.OnExit, w.heartbeat, w.config.InactivityTimeout)
|
||||||
}()
|
}()
|
||||||
|
|
||||||
owg.Wait()
|
owg.Wait()
|
||||||
@@ -119,14 +119,14 @@ func RunReader(
|
|||||||
if !ok {
|
if !ok {
|
||||||
// determine exit kind
|
// determine exit kind
|
||||||
// by default, the peer dropped unexpectedly
|
// by default, the peer dropped unexpectedly
|
||||||
kind := ExitUnexpectedDrop
|
kind := ExitError
|
||||||
select {
|
select {
|
||||||
// the peer-side error is sent before the connection is closed,
|
// the peer-side error is sent before the connection is closed,
|
||||||
// so a non-blocking call here is correct
|
// so a non-blocking call here is correct
|
||||||
// if an error is not sent, then assume the default event kind
|
// if an error is not sent, then assume the default event kind
|
||||||
case err := <-conn.Errors():
|
case err := <-conn.Errors():
|
||||||
if errors.Is(err, transport.ErrPeerClosedClean) {
|
if errors.Is(err, transport.ErrPeerClosedClean) {
|
||||||
kind = ExitCleanDisconnect
|
kind = ExitDisconnected
|
||||||
}
|
}
|
||||||
default:
|
default:
|
||||||
}
|
}
|
||||||
@@ -228,7 +228,7 @@ func RunWatchdog(
|
|||||||
// timer completed
|
// timer completed
|
||||||
case <-timer.C:
|
case <-timer.C:
|
||||||
// signal peer is inactive
|
// signal peer is inactive
|
||||||
onInactive(ExitInactive)
|
onInactive(ExitPolicy)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -102,7 +102,7 @@ func TestRunReader(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}, "expected onPeerClose")
|
}, "expected onPeerClose")
|
||||||
|
|
||||||
assert.Equal(t, ExitCleanDisconnect, gotKind)
|
assert.Equal(t, ExitDisconnected, gotKind)
|
||||||
})
|
})
|
||||||
|
|
||||||
t.Run("unexpected close calls onPeerClose with ExitUnexpectedDrop", func(t *testing.T) {
|
t.Run("unexpected close calls onPeerClose with ExitUnexpectedDrop", func(t *testing.T) {
|
||||||
@@ -139,7 +139,7 @@ func TestRunReader(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}, "expected onPeerClose")
|
}, "expected onPeerClose")
|
||||||
|
|
||||||
assert.Equal(t, ExitUnexpectedDrop, gotKind)
|
assert.Equal(t, ExitError, gotKind)
|
||||||
})
|
})
|
||||||
|
|
||||||
t.Run("read error calls onPeerClose with ExitUnexpectedDrop", func(t *testing.T) {
|
t.Run("read error calls onPeerClose with ExitUnexpectedDrop", func(t *testing.T) {
|
||||||
@@ -176,7 +176,7 @@ func TestRunReader(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}, "expected onPeerClose")
|
}, "expected onPeerClose")
|
||||||
|
|
||||||
assert.Equal(t, ExitUnexpectedDrop, gotKind)
|
assert.Equal(t, ExitError, gotKind)
|
||||||
})
|
})
|
||||||
|
|
||||||
t.Run("ctx.Done exits without calling onPeerClose", func(t *testing.T) {
|
t.Run("ctx.Done exits without calling onPeerClose", func(t *testing.T) {
|
||||||
|
|||||||
@@ -99,7 +99,7 @@ func TestWorkerStart(t *testing.T) {
|
|||||||
|
|
||||||
honeybeetest.Eventually(t, func() bool {
|
honeybeetest.Eventually(t, func() bool {
|
||||||
val := v.exitKind.Load()
|
val := v.exitKind.Load()
|
||||||
return val != nil && val.(WorkerExitKind) == ExitCleanDisconnect
|
return val != nil && val.(WorkerExitKind) == ExitDisconnected
|
||||||
}, "expected ExitCleanDisconnect")
|
}, "expected ExitCleanDisconnect")
|
||||||
})
|
})
|
||||||
|
|
||||||
@@ -115,7 +115,7 @@ func TestWorkerStart(t *testing.T) {
|
|||||||
|
|
||||||
honeybeetest.Eventually(t, func() bool {
|
honeybeetest.Eventually(t, func() bool {
|
||||||
val := v.exitKind.Load()
|
val := v.exitKind.Load()
|
||||||
return val != nil && val.(WorkerExitKind) == ExitUnexpectedDrop
|
return val != nil && val.(WorkerExitKind) == ExitError
|
||||||
}, "expected ExitUnexpectedDrop")
|
}, "expected ExitUnexpectedDrop")
|
||||||
})
|
})
|
||||||
|
|
||||||
@@ -124,7 +124,7 @@ func TestWorkerStart(t *testing.T) {
|
|||||||
|
|
||||||
ctx, cancel := context.WithCancel(context.Background())
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
worker, err := NewWorker(ctx, "peer-1", conn, &WorkerConfig{
|
worker, err := NewWorker(ctx, "peer-1", conn, &WorkerConfig{
|
||||||
DeadTimeout: 20 * time.Millisecond,
|
InactivityTimeout: 20 * time.Millisecond,
|
||||||
})
|
})
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
worker.cancel = cancel
|
worker.cancel = cancel
|
||||||
@@ -146,7 +146,7 @@ func TestWorkerStart(t *testing.T) {
|
|||||||
|
|
||||||
honeybeetest.Eventually(t, func() bool {
|
honeybeetest.Eventually(t, func() bool {
|
||||||
val := exitKind.Load()
|
val := exitKind.Load()
|
||||||
return val != nil && val.(WorkerExitKind) == ExitInactive
|
return val != nil && val.(WorkerExitKind) == ExitPolicy
|
||||||
}, "expected ExitInactive")
|
}, "expected ExitInactive")
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -52,7 +52,7 @@ func TestRunWatchdog(t *testing.T) {
|
|||||||
}, "expected onInactive")
|
}, "expected onInactive")
|
||||||
|
|
||||||
assert.Equal(t, int32(1), count.Load())
|
assert.Equal(t, int32(1), count.Load())
|
||||||
assert.Equal(t, ExitInactive, gotKind)
|
assert.Equal(t, ExitPolicy, gotKind)
|
||||||
})
|
})
|
||||||
|
|
||||||
t.Run("ctx.Done exits without calling onInactive", func(t *testing.T) {
|
t.Run("ctx.Done exits without calling onInactive", func(t *testing.T) {
|
||||||
|
|||||||
Reference in New Issue
Block a user