This commit is contained in:
Jay
2026-05-20 23:03:06 -04:00
parent f1afca7921
commit c8c8a528f6
+15 -16
View File
@@ -85,29 +85,27 @@ func NewWorker(
ctx = component.MustExtend(ctx, "worker") ctx = component.MustExtend(ctx, "worker")
} }
var logger *slog.Logger ctx, cancel := context.WithCancel(ctx)
if handler != nil {
c := component.FromContext(ctx)
logger = slog.New(handler).With(slog.Any("component", c), slog.String("peer_id", id))
}
wctx, wcancel := context.WithCancel(ctx)
w := &DefaultWorker{ w := &DefaultWorker{
id: id, id: id,
sendHeartbeat: make(chan struct{}), sendHeartbeat: make(chan struct{}),
ctx: wctx, ctx: ctx,
cancel: wcancel, cancel: cancel,
config: config, config: config,
handler: handler, handler: handler,
logger: logger,
processedCount: &atomic.Uint64{}, processedCount: &atomic.Uint64{},
outgoingCount: &atomic.Uint64{}, outgoingCount: &atomic.Uint64{},
restartCount: &atomic.Uint64{}, restartCount: &atomic.Uint64{},
} }
if handler != nil {
comp := component.FromContext(ctx)
w.logger = slog.New(handler).With(slog.Any("component", comp), slog.String("peer_id", id))
}
return w, nil return w, nil
} }
@@ -143,7 +141,7 @@ func (w *DefaultWorker) runSession(ctx context.Context, pool PoolPlugin) {
spawnDialer := func() { dialCancel = w.spawnDialer(ctx, dialCancel, newConn, pool) } spawnDialer := func() { dialCancel = w.spawnDialer(ctx, dialCancel, newConn, pool) }
// setup heartbeat // setup heartbeat
timer, timerC, heartbeat := w.setupHeartbeat() timer, inactive, heartbeat := w.setupHeartbeat()
defer timer.Stop() defer timer.Stop()
// main loop // main loop
@@ -171,7 +169,7 @@ func (w *DefaultWorker) runSession(ctx context.Context, pool PoolPlugin) {
case <-w.sendHeartbeat: case <-w.sendHeartbeat:
heartbeat() heartbeat()
case <-timerC(): case <-inactive():
if w.logger != nil { if w.logger != nil {
w.logger.Info("keepalive: no activity observed") w.logger.Info("keepalive: no activity observed")
} }
@@ -220,7 +218,7 @@ func (w *DefaultWorker) runSession(ctx context.Context, pool PoolPlugin) {
case <-w.sendHeartbeat: case <-w.sendHeartbeat:
heartbeat() heartbeat()
case <-timerC(): case <-inactive():
if w.logger != nil { if w.logger != nil {
w.logger.Info("keepalive: no activity observed") w.logger.Info("keepalive: no activity observed")
} }
@@ -254,7 +252,7 @@ func (w *DefaultWorker) runSession(ctx context.Context, pool PoolPlugin) {
} }
func (w *DefaultWorker) setupHeartbeat() ( func (w *DefaultWorker) setupHeartbeat() (
timer *time.Timer, timerC func() <-chan time.Time, heartbeat func(), timer *time.Timer, inactive func() <-chan time.Time, heartbeat func(),
) { ) {
if w.config.KeepaliveTimeout > 0 { if w.config.KeepaliveTimeout > 0 {
if w.logger != nil { if w.logger != nil {
@@ -280,7 +278,7 @@ func (w *DefaultWorker) setupHeartbeat() (
timer.Reset(w.config.KeepaliveTimeout) timer.Reset(w.config.KeepaliveTimeout)
} }
timerC = func() <-chan time.Time { inactive = func() <-chan time.Time {
if timer == nil { if timer == nil {
return nil return nil
} }
@@ -359,7 +357,8 @@ func (w *DefaultWorker) Send(data []byte) error {
return NewWorkerError(w.id, ErrConnectionUnavailable) return NewWorkerError(w.id, ErrConnectionUnavailable)
} }
if err := conn.Send(data); err != nil { err := conn.Send(data)
if err != nil {
return NewWorkerError(w.id, err) return NewWorkerError(w.id, err)
} }