From a8fb4789713d1169576374368c7ebf77e6f3f188 Mon Sep 17 00:00:00 2001 From: Jay Date: Fri, 17 Apr 2026 21:46:22 -0400 Subject: [PATCH] Wrote health monitor. --- initiator/worker.go | 44 ++++++++++++++++++++++-- initiator/worker_test.go | 73 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 115 insertions(+), 2 deletions(-) diff --git a/initiator/worker.go b/initiator/worker.go index 0aed029..aba53a2 100644 --- a/initiator/worker.go +++ b/initiator/worker.go @@ -58,7 +58,7 @@ func (w *Worker) Start( func (w *Worker) runReader( conn *transport.Connection, messages chan<- receivedMessage, - heartbeat chan<- time.Time, + heartbeat chan<- struct{}, reconnect chan<- struct{}, newConn <-chan *transport.Connection, stop <-chan struct{}, @@ -114,10 +114,50 @@ func (w *Worker) runForwarder( } func (w *Worker) runHealthMonitor( - heartbeat <-chan time.Time, + heartbeat <-chan struct{}, + reconnect chan<- struct{}, stop <-chan struct{}, poolDone <-chan struct{}, ) { + // disable if idle timeout is disabled + if w.config.IdleTimeout <= 0 { + // wait for stop signal and exit + select { + case <-stop: + case <-poolDone: + } + return + } + + timer := time.NewTimer(w.config.IdleTimeout) + defer timer.Stop() + + for { + select { + case <-stop: + return + case <-poolDone: + return + case <-heartbeat: + // drain the timer channel and reset + if !timer.Stop() { + select { + case <-timer.C: + default: + } + } + timer.Reset(w.config.IdleTimeout) + // timer completed + case <-timer.C: + // initiate reconnect, then reset the timer + // multiple reconnect signals during reconnection is idempotent + select { + case reconnect <- struct{}{}: + default: + } + timer.Reset(w.config.IdleTimeout) + } + } } func (w *Worker) runReconnector( diff --git a/initiator/worker_test.go b/initiator/worker_test.go index bb477e1..aa213dc 100644 --- a/initiator/worker_test.go +++ b/initiator/worker_test.go @@ -104,3 +104,76 @@ func TestRunForwarder(t *testing.T) { }, honeybeetest.TestTimeout, honeybeetest.TestTick) }) } + +func TestRunHealthMonitor(t *testing.T) { + t.Run("heartbeat resets timer, no reconnect fired", func(t *testing.T) { + heartbeat := make(chan struct{}, 3) + reconnect := make(chan struct{}, 1) + stop := make(chan struct{}) + defer close(stop) + + w := &Worker{config: &WorkerConfig{IdleTimeout: 100 * time.Millisecond}} + go w.runHealthMonitor(heartbeat, reconnect, stop, nil) + + // send heartbeats faster than the timeout + for i := 0; i < 5; i++ { + time.Sleep(30 * time.Millisecond) + heartbeat <- struct{}{} + } + + // because the timer is being reset, reconnect should never occur + assert.Never(t, func() bool { + select { + case <-reconnect: + return true + default: + return false + } + }, honeybeetest.NegativeTestTimeout, honeybeetest.TestTick) + }) + + t.Run("idle timeout fires reconnect", func(t *testing.T) { + heartbeat := make(chan struct{}) + reconnect := make(chan struct{}, 1) + stop := make(chan struct{}) + defer close(stop) + + w := &Worker{config: &WorkerConfig{IdleTimeout: 20 * time.Millisecond}} + go w.runHealthMonitor(heartbeat, reconnect, stop, nil) + + // send no heartbeats, wait for timeout and reconnect signal + assert.Eventually(t, func() bool { + select { + case <-reconnect: + return true + default: + return false + } + }, honeybeetest.TestTimeout, honeybeetest.TestTick) + }) + + t.Run("exits on stop", func(t *testing.T) { + heartbeat := make(chan struct{}) + reconnect := make(chan struct{}, 1) + stop := make(chan struct{}) + + w := &Worker{config: &WorkerConfig{IdleTimeout: 20 * time.Second}} + done := make(chan struct{}) + go func() { + w.runHealthMonitor(heartbeat, reconnect, stop, nil) + close(done) + }() + + // send stop signal + close(stop) + assert.Eventually(t, func() bool { + select { + case <-done: + return true + default: + return false + } + }, honeybeetest.TestTimeout, honeybeetest.TestTick) + }) + +}