Added logging to pools and workers.
This commit is contained in:
+29
-4
@@ -13,12 +13,12 @@ import (
|
||||
|
||||
// Types
|
||||
|
||||
type PoolEventKind int
|
||||
type PoolEventKind string
|
||||
|
||||
const (
|
||||
EventDisconnected PoolEventKind = iota
|
||||
EventDropped
|
||||
EventEvicted
|
||||
EventDisconnected PoolEventKind = "disconnected"
|
||||
EventDropped PoolEventKind = "dropped"
|
||||
EventEvicted PoolEventKind = "evicted"
|
||||
)
|
||||
|
||||
var workerToPoolEvent = map[WorkerExitKind]PoolEventKind{
|
||||
@@ -152,6 +152,10 @@ func (p *Pool) Errors() <-chan error {
|
||||
}
|
||||
|
||||
func (p *Pool) Close() {
|
||||
if p.logger != nil {
|
||||
p.logger.Debug("closing")
|
||||
}
|
||||
|
||||
p.mu.Lock()
|
||||
if p.closed {
|
||||
p.mu.Unlock()
|
||||
@@ -177,6 +181,10 @@ func (p *Pool) Close() {
|
||||
close(p.inbox)
|
||||
close(p.events)
|
||||
close(p.errors)
|
||||
|
||||
if p.logger != nil {
|
||||
p.logger.Info("closed")
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
@@ -205,6 +213,11 @@ func (p *Pool) Replace(id string, socket types.Socket) error {
|
||||
|
||||
if peer, exists := p.peers[id]; exists {
|
||||
p.removeLocked(peer)
|
||||
|
||||
if p.logger != nil {
|
||||
p.logger.Info("removed peer", "peer", id)
|
||||
}
|
||||
|
||||
} else {
|
||||
return ErrPeerNotFound
|
||||
}
|
||||
@@ -213,6 +226,10 @@ func (p *Pool) Replace(id string, socket types.Socket) error {
|
||||
}
|
||||
|
||||
func (p *Pool) Remove(id string) error {
|
||||
if p.logger != nil {
|
||||
p.logger.Debug("removing peer", "peer", id)
|
||||
}
|
||||
|
||||
p.mu.Lock()
|
||||
defer p.mu.Unlock()
|
||||
|
||||
@@ -227,6 +244,10 @@ func (p *Pool) Remove(id string) error {
|
||||
|
||||
p.removeLocked(peer)
|
||||
|
||||
if p.logger != nil {
|
||||
p.logger.Info("removed peer", "peer", id)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -316,6 +337,10 @@ func (p *Pool) addLocked(id string, socket types.Socket) error {
|
||||
|
||||
p.peers[id] = peer
|
||||
|
||||
if p.logger != nil {
|
||||
p.logger.Info("added peer", "peer", id)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
+44
-7
@@ -17,12 +17,12 @@ type Worker interface {
|
||||
Send(data []byte) error
|
||||
}
|
||||
|
||||
type WorkerExitKind int
|
||||
type WorkerExitKind string
|
||||
|
||||
const (
|
||||
ExitDisconnected WorkerExitKind = iota
|
||||
ExitError
|
||||
ExitPolicy
|
||||
ExitDisconnected WorkerExitKind = "disconnected"
|
||||
ExitError WorkerExitKind = "error"
|
||||
ExitPolicy WorkerExitKind = "policy"
|
||||
)
|
||||
|
||||
type DefaultWorker struct {
|
||||
@@ -62,6 +62,10 @@ func NewWorker(
|
||||
}
|
||||
|
||||
func (w *DefaultWorker) Start(pool PoolPlugin) {
|
||||
if w.logger != nil {
|
||||
w.logger.Debug("starting")
|
||||
}
|
||||
|
||||
toQueue := make(chan types.ReceivedMessage, 256)
|
||||
toForwarder := make(chan types.ReceivedMessage, 256)
|
||||
|
||||
@@ -70,7 +74,7 @@ func (w *DefaultWorker) Start(pool PoolPlugin) {
|
||||
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
RunReader(w.ctx, pool.OnExit, w.conn, toQueue, w.heartbeat)
|
||||
RunReader(w.ctx, pool.OnExit, w.conn, toQueue, w.heartbeat, w.logger)
|
||||
}()
|
||||
|
||||
go func() {
|
||||
@@ -85,13 +89,24 @@ func (w *DefaultWorker) Start(pool PoolPlugin) {
|
||||
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
RunWatchdog(w.ctx, pool.OnExit, w.heartbeat, w.config.InactivityTimeout)
|
||||
RunWatchdog(w.ctx, pool.OnExit, w.heartbeat, w.config.InactivityTimeout, w.logger)
|
||||
}()
|
||||
|
||||
if w.logger != nil {
|
||||
w.logger.Info("started")
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
|
||||
if w.logger != nil {
|
||||
w.logger.Info("stopped")
|
||||
}
|
||||
}
|
||||
|
||||
func (w *DefaultWorker) Stop() {
|
||||
if w.logger != nil {
|
||||
w.logger.Debug("shutting down")
|
||||
}
|
||||
w.cancel()
|
||||
}
|
||||
|
||||
@@ -115,6 +130,8 @@ func RunReader(
|
||||
conn *transport.Connection,
|
||||
messages chan<- types.ReceivedMessage,
|
||||
heartbeat chan<- struct{},
|
||||
|
||||
logger *slog.Logger,
|
||||
) {
|
||||
for {
|
||||
select {
|
||||
@@ -122,6 +139,7 @@ func RunReader(
|
||||
return
|
||||
case data, ok := <-conn.Incoming():
|
||||
if !ok {
|
||||
var err error
|
||||
// determine exit kind
|
||||
// by default, the peer dropped unexpectedly
|
||||
kind := ExitError
|
||||
@@ -129,13 +147,21 @@ func RunReader(
|
||||
// the peer-side error is sent before the connection is closed,
|
||||
// so a non-blocking call here is correct
|
||||
// if an error is not sent, then assume the default event kind
|
||||
case err := <-conn.Errors():
|
||||
case err = <-conn.Errors():
|
||||
if errors.Is(err, transport.ErrPeerClosedClean) {
|
||||
kind = ExitDisconnected
|
||||
}
|
||||
default:
|
||||
}
|
||||
|
||||
if logger != nil {
|
||||
if kind == ExitError {
|
||||
logger.Error("reader: peer dropped", "event", kind, "error", err)
|
||||
} else {
|
||||
logger.Info("reader: peer disconnected", "event", kind)
|
||||
}
|
||||
}
|
||||
|
||||
onPeerClose(kind)
|
||||
return
|
||||
}
|
||||
@@ -184,9 +210,13 @@ func RunWatchdog(
|
||||
onInactive OnExitFunction,
|
||||
heartbeat <-chan struct{},
|
||||
timeout time.Duration,
|
||||
logger *slog.Logger,
|
||||
) {
|
||||
// disable watchdog timeout if not configured
|
||||
if timeout <= 0 {
|
||||
if logger != nil {
|
||||
logger.Debug("watchdog: disabled")
|
||||
}
|
||||
// drain heartbeats
|
||||
// wait for cancel and exit
|
||||
for {
|
||||
@@ -198,6 +228,10 @@ func RunWatchdog(
|
||||
}
|
||||
}
|
||||
|
||||
if logger != nil {
|
||||
logger.Debug("watchdog: enabled", "timeout", timeout)
|
||||
}
|
||||
|
||||
timer := time.NewTimer(timeout)
|
||||
defer timer.Stop()
|
||||
|
||||
@@ -217,6 +251,9 @@ func RunWatchdog(
|
||||
// timer completed
|
||||
case <-timer.C:
|
||||
// signal peer is inactive
|
||||
if logger != nil {
|
||||
logger.Info("watchdog: peer is inactive")
|
||||
}
|
||||
onInactive(ExitPolicy)
|
||||
return
|
||||
}
|
||||
|
||||
@@ -23,7 +23,7 @@ func TestRunReader(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
go RunReader(ctx, func(WorkerExitKind) {}, conn, messages, heartbeat)
|
||||
go RunReader(ctx, func(WorkerExitKind) {}, conn, messages, heartbeat, nil)
|
||||
|
||||
before := time.Now()
|
||||
incoming <- honeybeetest.MockIncomingData{MsgType: websocket.TextMessage, Data: []byte("hello")}
|
||||
@@ -57,7 +57,7 @@ func TestRunReader(t *testing.T) {
|
||||
for range messages {
|
||||
}
|
||||
}()
|
||||
go RunReader(ctx, func(WorkerExitKind) {}, conn, messages, heartbeat)
|
||||
go RunReader(ctx, func(WorkerExitKind) {}, conn, messages, heartbeat, nil)
|
||||
|
||||
const n = 3
|
||||
for i := 0; i < n; i++ {
|
||||
@@ -92,7 +92,7 @@ func TestRunReader(t *testing.T) {
|
||||
go RunReader(ctx, func(kind WorkerExitKind) {
|
||||
gotKind = kind
|
||||
close(done)
|
||||
}, conn, messages, heartbeat)
|
||||
}, conn, messages, heartbeat, nil)
|
||||
|
||||
honeybeetest.Eventually(t, func() bool {
|
||||
select {
|
||||
@@ -129,7 +129,7 @@ func TestRunReader(t *testing.T) {
|
||||
go RunReader(ctx, func(kind WorkerExitKind) {
|
||||
gotKind = kind
|
||||
close(done)
|
||||
}, conn, messages, heartbeat)
|
||||
}, conn, messages, heartbeat, nil)
|
||||
|
||||
honeybeetest.Eventually(t, func() bool {
|
||||
select {
|
||||
@@ -166,7 +166,7 @@ func TestRunReader(t *testing.T) {
|
||||
go RunReader(ctx, func(kind WorkerExitKind) {
|
||||
gotKind = kind
|
||||
close(done)
|
||||
}, conn, messages, heartbeat)
|
||||
}, conn, messages, heartbeat, nil)
|
||||
|
||||
honeybeetest.Eventually(t, func() bool {
|
||||
select {
|
||||
@@ -193,7 +193,7 @@ func TestRunReader(t *testing.T) {
|
||||
go func() {
|
||||
RunReader(ctx, func(WorkerExitKind) {
|
||||
called.Store(true)
|
||||
}, conn, messages, heartbeat)
|
||||
}, conn, messages, heartbeat, nil)
|
||||
close(done)
|
||||
}()
|
||||
|
||||
|
||||
@@ -16,7 +16,7 @@ func TestRunWatchdog(t *testing.T) {
|
||||
defer cancel()
|
||||
|
||||
called := atomic.Bool{}
|
||||
go RunWatchdog(ctx, func(WorkerExitKind) { called.Store(true) }, heartbeat, 200*time.Millisecond)
|
||||
go RunWatchdog(ctx, func(WorkerExitKind) { called.Store(true) }, heartbeat, 200*time.Millisecond, nil)
|
||||
|
||||
for i := 0; i < 5; i++ {
|
||||
time.Sleep(20 * time.Millisecond)
|
||||
@@ -40,7 +40,7 @@ func TestRunWatchdog(t *testing.T) {
|
||||
count.Add(1)
|
||||
gotKind = kind
|
||||
close(done)
|
||||
}, heartbeat, 20*time.Millisecond)
|
||||
}, heartbeat, 20*time.Millisecond, nil)
|
||||
|
||||
honeybeetest.Eventually(t, func() bool {
|
||||
select {
|
||||
@@ -62,7 +62,7 @@ func TestRunWatchdog(t *testing.T) {
|
||||
called := atomic.Bool{}
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
RunWatchdog(ctx, func(WorkerExitKind) { called.Store(true) }, heartbeat, 20*time.Second)
|
||||
RunWatchdog(ctx, func(WorkerExitKind) { called.Store(true) }, heartbeat, 20*time.Second, nil)
|
||||
close(done)
|
||||
}()
|
||||
|
||||
@@ -87,7 +87,7 @@ func TestRunWatchdog(t *testing.T) {
|
||||
called := atomic.Bool{}
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
RunWatchdog(ctx, func(WorkerExitKind) { called.Store(true) }, heartbeat, 0)
|
||||
RunWatchdog(ctx, func(WorkerExitKind) { called.Store(true) }, heartbeat, 0, nil)
|
||||
close(done)
|
||||
}()
|
||||
|
||||
@@ -112,7 +112,7 @@ func TestRunWatchdog(t *testing.T) {
|
||||
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
RunWatchdog(ctx, func(WorkerExitKind) {}, heartbeat, 0)
|
||||
RunWatchdog(ctx, func(WorkerExitKind) {}, heartbeat, 0, nil)
|
||||
close(done)
|
||||
}()
|
||||
|
||||
|
||||
Reference in New Issue
Block a user