feat: replace monotonic counter with base32 random suffix; default label REQ
This commit is contained in:
+19
-5
@@ -2,12 +2,13 @@ package prism
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"crypto/rand"
|
||||||
|
"encoding/base32"
|
||||||
"fmt"
|
"fmt"
|
||||||
"git.wisehodl.dev/jay/go-mana-component"
|
"git.wisehodl.dev/jay/go-mana-component"
|
||||||
"git.wisehodl.dev/jay/go-roots-ws"
|
"git.wisehodl.dev/jay/go-roots-ws"
|
||||||
"log/slog"
|
"log/slog"
|
||||||
"sync"
|
"sync"
|
||||||
"sync/atomic"
|
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -28,8 +29,7 @@ type ReqClosed struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type RequestManager struct {
|
type RequestManager struct {
|
||||||
reqs map[string]*request
|
reqs map[string]*request
|
||||||
counter atomic.Uint64
|
|
||||||
|
|
||||||
envoy *Envoy
|
envoy *Envoy
|
||||||
events <-chan OutboundPoolEvent
|
events <-chan OutboundPoolEvent
|
||||||
@@ -43,6 +43,20 @@ type RequestManager struct {
|
|||||||
logger *slog.Logger
|
logger *slog.Logger
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ----------------------------------------------------------------------------
|
||||||
|
// ID Generation
|
||||||
|
// ----------------------------------------------------------------------------
|
||||||
|
|
||||||
|
var b32 = base32.StdEncoding.WithPadding(base32.NoPadding)
|
||||||
|
|
||||||
|
func generateID() string {
|
||||||
|
b := make([]byte, 5)
|
||||||
|
if _, err := rand.Read(b); err != nil {
|
||||||
|
panic(fmt.Sprintf("generateID: %v", err))
|
||||||
|
}
|
||||||
|
return b32.EncodeToString(b)
|
||||||
|
}
|
||||||
|
|
||||||
// ----------------------------------------------------------------------------
|
// ----------------------------------------------------------------------------
|
||||||
// Options
|
// Options
|
||||||
// ----------------------------------------------------------------------------
|
// ----------------------------------------------------------------------------
|
||||||
@@ -137,9 +151,9 @@ func (m *RequestManager) newStream(
|
|||||||
} else {
|
} else {
|
||||||
label := o.label
|
label := o.label
|
||||||
if label == "" {
|
if label == "" {
|
||||||
label = "req"
|
label = "REQ"
|
||||||
}
|
}
|
||||||
id = fmt.Sprintf("%s:%d", label, m.counter.Add(1))
|
id = fmt.Sprintf("%s:%s", label, generateID())
|
||||||
}
|
}
|
||||||
buffer := make(chan ReqEvent, 64)
|
buffer := make(chan ReqEvent, 64)
|
||||||
closed := make(chan ReqClosed, 1)
|
closed := make(chan ReqClosed, 1)
|
||||||
|
|||||||
+7
-18
@@ -8,7 +8,7 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
func TestRequestManager_Options(t *testing.T) {
|
func TestRequestManager_Options(t *testing.T) {
|
||||||
t.Run("default id uses req label and monotonic counter", func(t *testing.T) {
|
t.Run("default id uses REQ label and base32 suffix", func(t *testing.T) {
|
||||||
_, envoy := newMockEnvoy(t)
|
_, envoy := newMockEnvoy(t)
|
||||||
m := NewRequestManager(envoy)
|
m := NewRequestManager(envoy)
|
||||||
t.Cleanup(func() { m.Close() })
|
t.Cleanup(func() { m.Close() })
|
||||||
@@ -19,8 +19,9 @@ func TestRequestManager_Options(t *testing.T) {
|
|||||||
idB, _, _, err := m.Stream(filters)
|
idB, _, _, err := m.Stream(filters)
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
|
|
||||||
assert.Equal(t, "req:1", idA)
|
assert.Regexp(t, `^REQ:[A-Z2-7]{8}$`, idA)
|
||||||
assert.Equal(t, "req:2", idB)
|
assert.Regexp(t, `^REQ:[A-Z2-7]{8}$`, idB)
|
||||||
|
assert.NotEqual(t, idA, idB)
|
||||||
})
|
})
|
||||||
|
|
||||||
t.Run("WithLabel sets prefix", func(t *testing.T) {
|
t.Run("WithLabel sets prefix", func(t *testing.T) {
|
||||||
@@ -34,8 +35,9 @@ func TestRequestManager_Options(t *testing.T) {
|
|||||||
idB, _, _, err := m.Stream(filters, WithLabel("profile"))
|
idB, _, _, err := m.Stream(filters, WithLabel("profile"))
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
|
|
||||||
assert.Equal(t, "feed:1", idA)
|
assert.Regexp(t, `^feed:[A-Z2-7]{8}$`, idA)
|
||||||
assert.Equal(t, "profile:2", idB)
|
assert.Regexp(t, `^profile:[A-Z2-7]{8}$`, idB)
|
||||||
|
assert.NotEqual(t, idA, idB)
|
||||||
})
|
})
|
||||||
|
|
||||||
t.Run("WithID uses caller id", func(t *testing.T) {
|
t.Run("WithID uses caller id", func(t *testing.T) {
|
||||||
@@ -72,19 +74,6 @@ func TestRequestManager_Options(t *testing.T) {
|
|||||||
_, _, _, err = m.Stream(filters, WithID("dup"))
|
_, _, _, err = m.Stream(filters, WithID("dup"))
|
||||||
assert.Error(t, err)
|
assert.Error(t, err)
|
||||||
})
|
})
|
||||||
|
|
||||||
t.Run("WithID does not advance counter", func(t *testing.T) {
|
|
||||||
_, envoy := newMockEnvoy(t)
|
|
||||||
m := NewRequestManager(envoy)
|
|
||||||
t.Cleanup(func() { m.Close() })
|
|
||||||
|
|
||||||
filters := [][]byte{[]byte(`{}`)}
|
|
||||||
_, _, _, err := m.Stream(filters, WithID("explicit"))
|
|
||||||
assert.NoError(t, err)
|
|
||||||
id, _, _, err := m.Stream(filters)
|
|
||||||
assert.NoError(t, err)
|
|
||||||
assert.Equal(t, "req:1", id)
|
|
||||||
})
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestRequestManager_Stream(t *testing.T) {
|
func TestRequestManager_Stream(t *testing.T) {
|
||||||
|
|||||||
Reference in New Issue
Block a user