Phase 7: remove EventTraveller; introduce preparedWrite and excludedEvent

This commit is contained in:
Jay
2026-05-22 16:51:09 -04:00
parent bfd755c1e4
commit 9ff8ee0c9e
2 changed files with 58 additions and 63 deletions
+36 -32
View File
@@ -16,11 +16,14 @@ type WriteOptions struct {
BoltReadBatchSize int
}
type EventTraveller struct {
ID string
type preparedWrite struct {
Event roots.ValidatedEvent
Subgraph *EventSubgraph
Error error
}
type excludedEvent struct {
Event roots.ValidatedEvent
Reason error
}
type WriteResult struct {
@@ -29,7 +32,7 @@ type WriteResult struct {
}
type WriteReport struct {
ExcludedEvents []EventTraveller
ExcludedEvents []excludedEvent
CreatedEventCount int
Neo4jResultSummaries []neo4j.ResultSummary
Duration time.Duration
@@ -56,12 +59,7 @@ func WriteEvents(
return WriteReport{Error: fmt.Errorf("error setting up bolt db: %w", err)}
}
travellers := make([]EventTraveller, 0, len(events))
for _, e := range events {
travellers = append(travellers, EventTraveller{ID: e.ID(), Event: e})
}
queued, excluded := enforcePolicyRules(travellers, boltdb, opts.BoltReadBatchSize)
queued, excluded := enforcePolicyRules(events, boltdb, opts.BoltReadBatchSize)
converted := convertEventsToSubgraphs(queued, opts.Expanders)
writeResult := writeEventsToDatabases(driver, boltdb, converted)
@@ -83,7 +81,7 @@ func setDefaultWriteOptions(opts *WriteOptions) {
}
}
func enforcePolicyRules(in []EventTraveller, boltdb *bolt.DB, batchSize int) (queued []EventTraveller, excluded []EventTraveller) {
func enforcePolicyRules(in []roots.ValidatedEvent, boltdb *bolt.DB, batchSize int) (queued []roots.ValidatedEvent, excluded []excludedEvent) {
for i := 0; i < len(in); i += batchSize {
end := i + batchSize
if end > len(in) {
@@ -92,32 +90,38 @@ func enforcePolicyRules(in []EventTraveller, boltdb *bolt.DB, batchSize int) (qu
batch := in[i:end]
eventIDs := make([]string, 0, len(batch))
for _, traveller := range batch {
eventIDs = append(eventIDs, traveller.ID)
for _, e := range batch {
eventIDs = append(eventIDs, e.ID())
}
existsMap := BatchCheckEventsExist(boltdb, eventIDs)
for _, traveller := range batch {
if existsMap[traveller.ID] {
traveller.Error = fmt.Errorf("skipped: %w", ErrDuplicate)
excluded = append(excluded, traveller)
for _, e := range batch {
if existsMap[e.ID()] {
excluded = append(excluded, excludedEvent{
Event: e,
Reason: fmt.Errorf("skipped: %w", ErrDuplicate),
})
} else {
queued = append(queued, traveller)
queued = append(queued, e)
}
}
}
return queued, excluded
}
func convertEventsToSubgraphs(in []EventTraveller, expanders ExpanderPipeline) []EventTraveller {
for i, traveller := range in {
in[i].Subgraph = EventToSubgraph(traveller.Event, expanders)
func convertEventsToSubgraphs(in []roots.ValidatedEvent, expanders ExpanderPipeline) []preparedWrite {
out := make([]preparedWrite, 0, len(in))
for _, e := range in {
out = append(out, preparedWrite{
Event: e,
Subgraph: EventToSubgraph(e, expanders),
})
}
return in
return out
}
func writeEventsToDatabases(driver neo4j.Driver, boltdb *bolt.DB, travellers []EventTraveller) WriteResult {
func writeEventsToDatabases(driver neo4j.Driver, boltdb *bolt.DB, travellers []preparedWrite) WriteResult {
boltErr := writeEventsToBoltDB(boltdb, travellers)
if boltErr != nil {
return WriteResult{
@@ -132,27 +136,27 @@ func writeEventsToDatabases(driver neo4j.Driver, boltdb *bolt.DB, travellers []E
}
}
func writeEventsToBoltDB(boltdb *bolt.DB, travellers []EventTraveller) error {
func writeEventsToBoltDB(boltdb *bolt.DB, travellers []preparedWrite) error {
var events []EventBlob
for _, traveller := range travellers {
j, err := json.Marshal(traveller.Event)
for _, pw := range travellers {
j, err := json.Marshal(pw.Event)
if err != nil {
return fmt.Errorf("failed to serialize event %s: %w", traveller.ID, err)
return fmt.Errorf("failed to serialize event %s: %w", pw.Event.ID(), err)
}
events = append(events, EventBlob{ID: []byte(traveller.ID), JSON: j})
events = append(events, EventBlob{ID: []byte(pw.Event.ID()), JSON: j})
}
return BatchWriteEvents(boltdb, events)
}
func writeEventsToGraphDB(driver neo4j.Driver, travellers []EventTraveller) ([]neo4j.ResultSummary, error) {
func writeEventsToGraphDB(driver neo4j.Driver, travellers []preparedWrite) ([]neo4j.ResultSummary, error) {
matchKeys := NewSimpleMatchKeys()
batch := NewBatchSubgraph(matchKeys)
for _, traveller := range travellers {
for _, node := range traveller.Subgraph.Nodes() {
for _, pw := range travellers {
for _, node := range pw.Subgraph.Nodes() {
batch.AddNode(node)
}
for _, rel := range traveller.Subgraph.Rels() {
for _, rel := range pw.Subgraph.Rels() {
batch.AddRel(rel)
}
}