Compare commits
3 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| b4f79f68c8 | |||
| 0c3564442f | |||
| 5ba60e28ad |
+2
-2
@@ -355,12 +355,12 @@ func newEnvoy(
|
|||||||
ctx: ctx,
|
ctx: ctx,
|
||||||
cancel: cancel,
|
cancel: cancel,
|
||||||
observer: observer,
|
observer: observer,
|
||||||
handler: handler,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if handler != nil {
|
if handler != nil {
|
||||||
comp := component.FromContext(ctx)
|
comp := component.FromContext(ctx)
|
||||||
e.logger = slog.New(handler).With(slog.Any("component", comp)).With("peer", url)
|
e.handler = handler.WithAttrs([]slog.Attr{slog.String("peer", url)})
|
||||||
|
e.logger = slog.New(e.handler).With(slog.Any("component", comp))
|
||||||
}
|
}
|
||||||
|
|
||||||
e.wg.Add(2)
|
e.wg.Add(2)
|
||||||
|
|||||||
@@ -3,7 +3,7 @@ module git.wisehodl.dev/jay/go-mana-prism
|
|||||||
go 1.25.0
|
go 1.25.0
|
||||||
|
|
||||||
require (
|
require (
|
||||||
git.wisehodl.dev/jay/go-honeybee v0.4.0
|
git.wisehodl.dev/jay/go-honeybee v0.5.0
|
||||||
git.wisehodl.dev/jay/go-mana-component v0.1.0
|
git.wisehodl.dev/jay/go-mana-component v0.1.0
|
||||||
git.wisehodl.dev/jay/go-roots-ws v0.1.0
|
git.wisehodl.dev/jay/go-roots-ws v0.1.0
|
||||||
github.com/stretchr/testify v1.11.1
|
github.com/stretchr/testify v1.11.1
|
||||||
|
|||||||
@@ -1,5 +1,5 @@
|
|||||||
git.wisehodl.dev/jay/go-honeybee v0.4.0 h1:+aA2GbCQzgB2mzx0fcEmg319T2KmdHKsSxF8aXAbG7E=
|
git.wisehodl.dev/jay/go-honeybee v0.5.0 h1:tAhXMLJnxMMhWJstA1f3OYVsdQf0rstl5/AaJVzuCWk=
|
||||||
git.wisehodl.dev/jay/go-honeybee v0.4.0/go.mod h1:91H66sH5t1BHERGMihybeL1CDMu6vJgGuxjkUxfaqi4=
|
git.wisehodl.dev/jay/go-honeybee v0.5.0/go.mod h1:91H66sH5t1BHERGMihybeL1CDMu6vJgGuxjkUxfaqi4=
|
||||||
git.wisehodl.dev/jay/go-mana-component v0.1.0 h1:wWYN5MzC9Hq3tEt4z7FjrwNuQz3rZY3RWAmgmNE8EZE=
|
git.wisehodl.dev/jay/go-mana-component v0.1.0 h1:wWYN5MzC9Hq3tEt4z7FjrwNuQz3rZY3RWAmgmNE8EZE=
|
||||||
git.wisehodl.dev/jay/go-mana-component v0.1.0/go.mod h1:r2ZaTjKzwV5JJfC5boikxtjAKusPrzlJU/7qul0EUqA=
|
git.wisehodl.dev/jay/go-mana-component v0.1.0/go.mod h1:r2ZaTjKzwV5JJfC5boikxtjAKusPrzlJU/7qul0EUqA=
|
||||||
git.wisehodl.dev/jay/go-roots-ws v0.1.0 h1:p1veCkpOmL26N//Qz7ekJOYj1Ck30ai4OKq9dxLjodk=
|
git.wisehodl.dev/jay/go-roots-ws v0.1.0 h1:p1veCkpOmL26N//Qz7ekJOYj1Ck30ai4OKq9dxLjodk=
|
||||||
|
|||||||
+36
-2
@@ -262,6 +262,9 @@ func (m *RequestManager) Query(
|
|||||||
// query timed out
|
// query timed out
|
||||||
m.observer.Record(m.envoy.PeerID(),
|
m.observer.Record(m.envoy.PeerID(),
|
||||||
MissedEOSE{SubID: id, At: time.Now()})
|
MissedEOSE{SubID: id, At: time.Now()})
|
||||||
|
if m.logger != nil {
|
||||||
|
m.logger.Warn("missed eose", "req", id)
|
||||||
|
}
|
||||||
m.Cancel(id)
|
m.Cancel(id)
|
||||||
return result, nil, nil
|
return result, nil, nil
|
||||||
}
|
}
|
||||||
@@ -278,7 +281,18 @@ func (m *RequestManager) Cancel(id string) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if req.active {
|
if req.active {
|
||||||
go m.envoy.Send(envelope.EncloseClose(id))
|
go func() {
|
||||||
|
err := m.envoy.Send(envelope.EncloseClose(id))
|
||||||
|
if err != nil {
|
||||||
|
if m.logger != nil {
|
||||||
|
m.logger.Warn("close send failed", "req", req.id, "error", err)
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if m.logger != nil {
|
||||||
|
m.logger.Debug("close sent", "req", req.id)
|
||||||
|
}
|
||||||
|
}()
|
||||||
req.active = false
|
req.active = false
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -317,10 +331,16 @@ func (m *RequestManager) activate(req *request) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
m.observer.Record(m.envoy.PeerID(),
|
m.observer.Record(m.envoy.PeerID(),
|
||||||
ReqSendFailed{SubID: req.id, Err: err, At: time.Now()})
|
ReqSendFailed{SubID: req.id, Err: err, At: time.Now()})
|
||||||
|
if m.logger != nil {
|
||||||
|
m.logger.Warn("req send failed", "req", req.id, "error", err)
|
||||||
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
m.observer.Record(m.envoy.PeerID(),
|
m.observer.Record(m.envoy.PeerID(),
|
||||||
ReqDispatched{SubID: req.id, DispatchedAt: time.Now()})
|
ReqDispatched{SubID: req.id, DispatchedAt: time.Now()})
|
||||||
|
if m.logger != nil {
|
||||||
|
m.logger.Debug("req sent", "req", req.id)
|
||||||
|
}
|
||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -450,7 +470,18 @@ func (m *RequestManager) routeEOSE(msg InboxMessage) {
|
|||||||
req.active = false
|
req.active = false
|
||||||
close(req.buffer)
|
close(req.buffer)
|
||||||
delete(m.reqs, req.id)
|
delete(m.reqs, req.id)
|
||||||
go m.envoy.Send(envelope.EncloseClose(subID))
|
go func() {
|
||||||
|
err := m.envoy.Send(envelope.EncloseClose(subID))
|
||||||
|
if err != nil {
|
||||||
|
if m.logger != nil {
|
||||||
|
m.logger.Warn("close send failed", "req", req.id, "error", err)
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if m.logger != nil {
|
||||||
|
m.logger.Debug("close sent", "req", req.id)
|
||||||
|
}
|
||||||
|
}()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -473,6 +504,9 @@ func (m *RequestManager) routeClosed(msg InboxMessage) {
|
|||||||
ReceivedAt: receivedAt,
|
ReceivedAt: receivedAt,
|
||||||
Message: message,
|
Message: message,
|
||||||
})
|
})
|
||||||
|
if m.logger != nil {
|
||||||
|
m.logger.Warn("req closed by peer", "req", req.id, "message", message)
|
||||||
|
}
|
||||||
req.closedOnce.Do(func() {
|
req.closedOnce.Do(func() {
|
||||||
req.closed <- ReqClosed{
|
req.closed <- ReqClosed{
|
||||||
PeerID: msg.ID,
|
PeerID: msg.ID,
|
||||||
|
|||||||
Reference in New Issue
Block a user