Wrote health monitor.
This commit is contained in:
@@ -58,7 +58,7 @@ func (w *Worker) Start(
|
||||
func (w *Worker) runReader(
|
||||
conn *transport.Connection,
|
||||
messages chan<- receivedMessage,
|
||||
heartbeat chan<- time.Time,
|
||||
heartbeat chan<- struct{},
|
||||
reconnect chan<- struct{},
|
||||
newConn <-chan *transport.Connection,
|
||||
stop <-chan struct{},
|
||||
@@ -114,10 +114,50 @@ func (w *Worker) runForwarder(
|
||||
}
|
||||
|
||||
func (w *Worker) runHealthMonitor(
|
||||
heartbeat <-chan time.Time,
|
||||
heartbeat <-chan struct{},
|
||||
reconnect chan<- struct{},
|
||||
stop <-chan struct{},
|
||||
poolDone <-chan struct{},
|
||||
) {
|
||||
// disable if idle timeout is disabled
|
||||
if w.config.IdleTimeout <= 0 {
|
||||
// wait for stop signal and exit
|
||||
select {
|
||||
case <-stop:
|
||||
case <-poolDone:
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
timer := time.NewTimer(w.config.IdleTimeout)
|
||||
defer timer.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-stop:
|
||||
return
|
||||
case <-poolDone:
|
||||
return
|
||||
case <-heartbeat:
|
||||
// drain the timer channel and reset
|
||||
if !timer.Stop() {
|
||||
select {
|
||||
case <-timer.C:
|
||||
default:
|
||||
}
|
||||
}
|
||||
timer.Reset(w.config.IdleTimeout)
|
||||
// timer completed
|
||||
case <-timer.C:
|
||||
// initiate reconnect, then reset the timer
|
||||
// multiple reconnect signals during reconnection is idempotent
|
||||
select {
|
||||
case reconnect <- struct{}{}:
|
||||
default:
|
||||
}
|
||||
timer.Reset(w.config.IdleTimeout)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (w *Worker) runReconnector(
|
||||
|
||||
@@ -104,3 +104,76 @@ func TestRunForwarder(t *testing.T) {
|
||||
}, honeybeetest.TestTimeout, honeybeetest.TestTick)
|
||||
})
|
||||
}
|
||||
|
||||
func TestRunHealthMonitor(t *testing.T) {
|
||||
t.Run("heartbeat resets timer, no reconnect fired", func(t *testing.T) {
|
||||
heartbeat := make(chan struct{}, 3)
|
||||
reconnect := make(chan struct{}, 1)
|
||||
stop := make(chan struct{})
|
||||
defer close(stop)
|
||||
|
||||
w := &Worker{config: &WorkerConfig{IdleTimeout: 100 * time.Millisecond}}
|
||||
go w.runHealthMonitor(heartbeat, reconnect, stop, nil)
|
||||
|
||||
// send heartbeats faster than the timeout
|
||||
for i := 0; i < 5; i++ {
|
||||
time.Sleep(30 * time.Millisecond)
|
||||
heartbeat <- struct{}{}
|
||||
}
|
||||
|
||||
// because the timer is being reset, reconnect should never occur
|
||||
assert.Never(t, func() bool {
|
||||
select {
|
||||
case <-reconnect:
|
||||
return true
|
||||
default:
|
||||
return false
|
||||
}
|
||||
}, honeybeetest.NegativeTestTimeout, honeybeetest.TestTick)
|
||||
})
|
||||
|
||||
t.Run("idle timeout fires reconnect", func(t *testing.T) {
|
||||
heartbeat := make(chan struct{})
|
||||
reconnect := make(chan struct{}, 1)
|
||||
stop := make(chan struct{})
|
||||
defer close(stop)
|
||||
|
||||
w := &Worker{config: &WorkerConfig{IdleTimeout: 20 * time.Millisecond}}
|
||||
go w.runHealthMonitor(heartbeat, reconnect, stop, nil)
|
||||
|
||||
// send no heartbeats, wait for timeout and reconnect signal
|
||||
assert.Eventually(t, func() bool {
|
||||
select {
|
||||
case <-reconnect:
|
||||
return true
|
||||
default:
|
||||
return false
|
||||
}
|
||||
}, honeybeetest.TestTimeout, honeybeetest.TestTick)
|
||||
})
|
||||
|
||||
t.Run("exits on stop", func(t *testing.T) {
|
||||
heartbeat := make(chan struct{})
|
||||
reconnect := make(chan struct{}, 1)
|
||||
stop := make(chan struct{})
|
||||
|
||||
w := &Worker{config: &WorkerConfig{IdleTimeout: 20 * time.Second}}
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
w.runHealthMonitor(heartbeat, reconnect, stop, nil)
|
||||
close(done)
|
||||
}()
|
||||
|
||||
// send stop signal
|
||||
close(stop)
|
||||
assert.Eventually(t, func() bool {
|
||||
select {
|
||||
case <-done:
|
||||
return true
|
||||
default:
|
||||
return false
|
||||
}
|
||||
}, honeybeetest.TestTimeout, honeybeetest.TestTick)
|
||||
})
|
||||
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user