diff --git a/embassy.go b/embassy.go index 21ca5c8..8d74dba 100644 --- a/embassy.go +++ b/embassy.go @@ -380,6 +380,10 @@ func (e *Envoy) Context() context.Context { return e.ctx } +func (e *Envoy) PeerID() string { + return e.url +} + func (e *Envoy) Handler() slog.Handler { return e.handler } diff --git a/request.go b/request.go index a4435a7..55b3196 100644 --- a/request.go +++ b/request.go @@ -29,6 +29,47 @@ type ReqClosed struct { Data string } +// ---------------------------------------------------------------------------- +// Observables +// ---------------------------------------------------------------------------- + +type ReqDispatched struct { + SubID string + DispatchedAt time.Time +} + +type ReqSendFailed struct { + SubID string + Err error + At time.Time +} + +type FirstEventReceived struct { + SubID string + ReceivedAt time.Time +} + +type StreamEOSEReceived struct { + SubID string + ReceivedAt time.Time +} + +type QueryEOSEReceived struct { + SubID string + ReceivedAt time.Time +} + +type MissedEOSE struct { + SubID string + At time.Time +} + +type ClosedReceived struct { + SubID string + ReceivedAt time.Time + Message string +} + // ---------------------------------------------------------------------------- // ID Generation // ---------------------------------------------------------------------------- @@ -90,8 +131,9 @@ type request struct { id string filters [][]byte - isQuery bool - active bool + isQuery bool + active bool + firstEventSeen bool buffer chan ReqEvent events chan ReqEvent @@ -217,6 +259,9 @@ func (m *RequestManager) Query( } return result, &cl, nil case <-ctx.Done(): + // query timed out + m.observer.Record(m.envoy.PeerID(), + MissedEOSE{SubID: id, At: time.Now()}) m.Cancel(id) return result, nil, nil } @@ -267,7 +312,16 @@ func (m *RequestManager) Close() { func (m *RequestManager) activate(req *request) { req.active = true - go m.envoy.Send(envelope.EncloseReq(req.id, req.filters)) + go func() { + err := m.envoy.Send(envelope.EncloseReq(req.id, req.filters)) + if err != nil { + m.observer.Record(m.envoy.PeerID(), + ReqSendFailed{SubID: req.id, Err: err, At: time.Now()}) + return + } + m.observer.Record(m.envoy.PeerID(), + ReqDispatched{SubID: req.id, DispatchedAt: time.Now()}) + }() } func (m *RequestManager) deregister(req *request) { @@ -345,12 +399,21 @@ func (m *RequestManager) routeEvent(msg InboxMessage) { if err != nil { return } - m.mu.RLock() + m.mu.Lock() req, ok := m.reqs[subID] - m.mu.RUnlock() if !ok { + m.mu.Unlock() return } + if !req.firstEventSeen { + req.firstEventSeen = true + reqSubID := req.id + receivedAt := msg.ReceivedAt + go m.observer.Record(m.envoy.PeerID(), + FirstEventReceived{SubID: reqSubID, ReceivedAt: receivedAt}) + } + m.mu.Unlock() + req.buffer <- ReqEvent{ PeerID: msg.ID, ReceivedAt: msg.ReceivedAt, @@ -372,6 +435,15 @@ func (m *RequestManager) routeEOSE(msg InboxMessage) { if !ok { return } + reqSubID := req.id + receivedAt := msg.ReceivedAt + if req.isQuery { + go m.observer.Record(m.envoy.PeerID(), + QueryEOSEReceived{SubID: reqSubID, ReceivedAt: receivedAt}) + } else { + go m.observer.Record(m.envoy.PeerID(), + StreamEOSEReceived{SubID: reqSubID, ReceivedAt: receivedAt}) + } if req.active && req.isQuery { // manually cleanup query state // specifically, do not close req.closed or events can be missed @@ -393,6 +465,14 @@ func (m *RequestManager) routeClosed(msg InboxMessage) { if !ok { return } + reqSubID := req.id + receivedAt := msg.ReceivedAt + go m.observer.Record(m.envoy.PeerID(), + ClosedReceived{ + SubID: reqSubID, + ReceivedAt: receivedAt, + Message: message, + }) req.closedOnce.Do(func() { req.closed <- ReqClosed{ PeerID: msg.ID,