From c8c8a528f62daf1790b4dccbe5cd4cd1f901020d Mon Sep 17 00:00:00 2001 From: Jay Date: Wed, 20 May 2026 23:03:06 -0400 Subject: [PATCH] cleanup --- worker.go | 31 +++++++++++++++---------------- 1 file changed, 15 insertions(+), 16 deletions(-) diff --git a/worker.go b/worker.go index 01ab7af..443fb99 100644 --- a/worker.go +++ b/worker.go @@ -85,29 +85,27 @@ func NewWorker( ctx = component.MustExtend(ctx, "worker") } - var logger *slog.Logger - if handler != nil { - c := component.FromContext(ctx) - logger = slog.New(handler).With(slog.Any("component", c), slog.String("peer_id", id)) - } - - wctx, wcancel := context.WithCancel(ctx) + ctx, cancel := context.WithCancel(ctx) w := &DefaultWorker{ id: id, sendHeartbeat: make(chan struct{}), - ctx: wctx, - cancel: wcancel, + ctx: ctx, + cancel: cancel, config: config, handler: handler, - logger: logger, processedCount: &atomic.Uint64{}, outgoingCount: &atomic.Uint64{}, restartCount: &atomic.Uint64{}, } + if handler != nil { + comp := component.FromContext(ctx) + w.logger = slog.New(handler).With(slog.Any("component", comp), slog.String("peer_id", id)) + } + return w, nil } @@ -143,7 +141,7 @@ func (w *DefaultWorker) runSession(ctx context.Context, pool PoolPlugin) { spawnDialer := func() { dialCancel = w.spawnDialer(ctx, dialCancel, newConn, pool) } // setup heartbeat - timer, timerC, heartbeat := w.setupHeartbeat() + timer, inactive, heartbeat := w.setupHeartbeat() defer timer.Stop() // main loop @@ -171,7 +169,7 @@ func (w *DefaultWorker) runSession(ctx context.Context, pool PoolPlugin) { case <-w.sendHeartbeat: heartbeat() - case <-timerC(): + case <-inactive(): if w.logger != nil { w.logger.Info("keepalive: no activity observed") } @@ -220,7 +218,7 @@ func (w *DefaultWorker) runSession(ctx context.Context, pool PoolPlugin) { case <-w.sendHeartbeat: heartbeat() - case <-timerC(): + case <-inactive(): if w.logger != nil { w.logger.Info("keepalive: no activity observed") } @@ -254,7 +252,7 @@ func (w *DefaultWorker) runSession(ctx context.Context, pool PoolPlugin) { } func (w *DefaultWorker) setupHeartbeat() ( - timer *time.Timer, timerC func() <-chan time.Time, heartbeat func(), + timer *time.Timer, inactive func() <-chan time.Time, heartbeat func(), ) { if w.config.KeepaliveTimeout > 0 { if w.logger != nil { @@ -280,7 +278,7 @@ func (w *DefaultWorker) setupHeartbeat() ( timer.Reset(w.config.KeepaliveTimeout) } - timerC = func() <-chan time.Time { + inactive = func() <-chan time.Time { if timer == nil { return nil } @@ -359,7 +357,8 @@ func (w *DefaultWorker) Send(data []byte) error { return NewWorkerError(w.id, ErrConnectionUnavailable) } - if err := conn.Send(data); err != nil { + err := conn.Send(data) + if err != nil { return NewWorkerError(w.id, err) }