From f88982a0b79c5ebf5b7bb96f2912873d376e13a6 Mon Sep 17 00:00:00 2001 From: Jay Date: Wed, 4 Mar 2026 13:57:06 -0500 Subject: [PATCH] Wrote coordinator and stubs for event writer pipeline. --- go.mod | 4 +- go.sum | 4 + graphstore/connect.go | 4 +- write.go | 227 ++++++++++++++++++++++++++++++++++++++++++ write_test.go | 1 + 5 files changed, 237 insertions(+), 3 deletions(-) create mode 100644 write.go create mode 100644 write_test.go diff --git a/go.mod b/go.mod index bc6434a..5506f8d 100644 --- a/go.mod +++ b/go.mod @@ -1,9 +1,10 @@ module git.wisehodl.dev/jay/go-heartwood -go 1.24 +go 1.24.0 require ( git.wisehodl.dev/jay/go-roots v0.3.1 + github.com/boltdb/bolt v1.3.1 github.com/neo4j/neo4j-go-driver/v6 v6.0.0 github.com/stretchr/testify v1.11.1 ) @@ -15,5 +16,6 @@ require ( github.com/decred/dcrd/crypto/blake256 v1.1.0 // indirect github.com/decred/dcrd/dcrec/secp256k1/v4 v4.4.0 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect + golang.org/x/sys v0.41.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index 31ffac5..68374b9 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,7 @@ git.wisehodl.dev/jay/go-roots v0.3.1 h1:5UiG3g1S3XCkMB+W2rbNZGpl4IiSlRvae2cLHGfjVcA= git.wisehodl.dev/jay/go-roots v0.3.1/go.mod h1:TQXk/V8MRSw4khMlNSINM8dU5/ARR1Wov+kGw0237rQ= +github.com/boltdb/bolt v1.3.1 h1:JQmyP4ZBrce+ZQu0dY660FMfatumYDLun9hBCUVIkF4= +github.com/boltdb/bolt v1.3.1/go.mod h1:clJnj/oiGkjum5o1McbSZDSLxVThjynRyGBgiAx27Ps= github.com/btcsuite/btcd/btcec/v2 v2.3.5 h1:dpAlnAwmT1yIBm3exhT1/8iUSD98RDJM5vqJVQDQLiU= github.com/btcsuite/btcd/btcec/v2 v2.3.5/go.mod h1:m22FrOAiuxl/tht9wIqAoGHcbnCCaPWyauO8y2LGGtQ= github.com/btcsuite/btcd/chaincfg/chainhash v1.0.1 h1:q0rUy8C/TYNBQS1+CGKw68tLOFYSNEs0TFnxxnS9+4U= @@ -16,6 +18,8 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= +golang.org/x/sys v0.41.0 h1:Ivj+2Cp/ylzLiEU89QhWblYnOE9zerudt9Ftecq2C6k= +golang.org/x/sys v0.41.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= diff --git a/graphstore/connect.go b/graphstore/connect.go index 29757eb..c3b5720 100644 --- a/graphstore/connect.go +++ b/graphstore/connect.go @@ -6,7 +6,7 @@ import ( ) // ConnectNeo4j creates a new Neo4j driver and verifies its connectivity. -func ConnectNeo4j(ctx context.Context, uri, user, password string) (neo4j.Driver, error) { +func ConnectNeo4j(ctx context.Context, uri, user, password string) (*neo4j.Driver, error) { driver, err := neo4j.NewDriver( uri, neo4j.BasicAuth(user, password, "")) @@ -19,5 +19,5 @@ func ConnectNeo4j(ctx context.Context, uri, user, password string) (neo4j.Driver return nil, err } - return driver, nil + return &driver, nil } diff --git a/write.go b/write.go new file mode 100644 index 0000000..4325f83 --- /dev/null +++ b/write.go @@ -0,0 +1,227 @@ +package heartwood + +import ( + "fmt" + roots "git.wisehodl.dev/jay/go-roots/events" + "github.com/boltdb/bolt" + "github.com/neo4j/neo4j-go-driver/v6/neo4j" + "sync" + // "git.wisehodl.dev/jay/go-heartwood/graph" + "time" +) + +type EventFollower struct { + ID string + JSON string + Event roots.Event + Subgraph EventSubgraph + Error error +} + +type WriteResult struct { + ResultSummaries []neo4j.ResultSummary + Error error +} + +type WriteReport struct { + InvalidEvents []EventFollower + SkippedEvents []EventFollower + CreatedEventCount int + Neo4jResultSummaries []neo4j.ResultSummary + Duration time.Duration + Error error +} + +func WriteEvents( + events []string, + driver *neo4j.Driver, boltdb *bolt.DB, +) (WriteReport, error) { + start := time.Now() + + err := setupBoltDB(boltdb) + if err != nil { + return WriteReport{}, fmt.Errorf("error setting up bolt db: %w", err) + } + + var wg sync.WaitGroup + + // Create Event Followers + jsonChan := make(chan string, 10) + eventChan := make(chan EventFollower, 10) + + wg.Add(1) + go func() { + defer wg.Done() + createEventFollowers(jsonChan, eventChan) + }() + + // Parse Event JSON + parsedChan := make(chan EventFollower, 10) + invalidChan := make(chan EventFollower, 10) + + wg.Add(1) + go func() { + defer wg.Done() + parseEventJSON(eventChan, parsedChan, invalidChan) + }() + + // Collect Invalid Events + collectedInvalidChan := make(chan []EventFollower) + + wg.Add(1) + go func() { + defer wg.Done() + collectEvents(invalidChan, collectedInvalidChan) + }() + + // Enforce Policy Rules + queuedChan := make(chan EventFollower, 10) + skippedChan := make(chan EventFollower, 10) + + wg.Add(1) + go func() { + defer wg.Done() + enforcePolicyRules(driver, boltdb, parsedChan, queuedChan, skippedChan) + }() + + // Collect Skipped Events + collectedSkippedChan := make(chan []EventFollower) + + wg.Add(1) + go func() { + defer wg.Done() + collectEvents(skippedChan, collectedSkippedChan) + }() + + // Convert Events To Subgraphs + convertedChan := make(chan EventFollower, 10) + + wg.Add(1) + go func() { + defer wg.Done() + convertEventsToSubgraphs(queuedChan, convertedChan) + }() + + // Write Events To Databases + writeResultChan := make(chan WriteResult) + + wg.Add(1) + go func() { + defer wg.Done() + writeEventsToDatabases( + driver, boltdb, + convertedChan, writeResultChan) + }() + + // Send event jsons into pipeline + go func() { + for _, json := range events { + jsonChan <- json + } + close(jsonChan) + }() + + // Wait for pipeline to complete + wg.Wait() + + // Collect results + invalid := <-collectedInvalidChan + skipped := <-collectedSkippedChan + writeResult := <-writeResultChan + + return WriteReport{ + InvalidEvents: invalid, + SkippedEvents: skipped, + CreatedEventCount: len(events) - len(invalid) - len(skipped), + Neo4jResultSummaries: writeResult.ResultSummaries, + Duration: time.Since(start), + Error: writeResult.Error, + }, writeResult.Error +} + +func setupBoltDB(boltdb *bolt.DB) error + +func createEventFollowers(jsonChan chan string, eventChan chan EventFollower) + +func parseEventJSON(inChan, parsedChan, invalidChan chan EventFollower) + +func enforcePolicyRules( + driver *neo4j.Driver, boltdb *bolt.DB, + inChan, queuedChan, skippedChan chan EventFollower) + +func convertEventsToSubgraphs(inChan, convertedChan chan EventFollower) + +func writeEventsToDatabases( + driver *neo4j.Driver, boltdb *bolt.DB, + inChan chan EventFollower, + resultChan chan WriteResult, +) { + var wg sync.WaitGroup + + kvEventChan := make(chan EventFollower, 10) + graphEventChan := make(chan EventFollower, 10) + + kvWriteDone := make(chan struct{}) + + kvErrorChan := make(chan error) + graphResultChan := make(chan WriteResult) + + wg.Add(2) + go func() { + defer wg.Done() + writeEventsToKVStore( + boltdb, + kvEventChan, kvErrorChan) + }() + go func() { + defer wg.Done() + writeEventsToGraphStore( + driver, + graphEventChan, kvWriteDone, graphResultChan) + }() + + // Fan out events to both writers + for follower := range inChan { + kvEventChan <- follower + graphEventChan <- follower + } + close(kvEventChan) + close(graphEventChan) + + wg.Wait() + + kvError := <-kvErrorChan + if kvError != nil { + close(kvWriteDone) // signal abort + resultChan <- WriteResult{Error: kvError} + return + } + + // Signal graph writer to proceed + kvWriteDone <- struct{}{} + close(kvWriteDone) + + graphResult := <-graphResultChan + if graphResult.Error != nil { + resultChan <- WriteResult{Error: graphResult.Error} + return + } + + resultChan <- graphResult + +} + +func writeEventsToKVStore( + boltdb *bolt.DB, + inChan chan EventFollower, + resultChan chan error, +) + +func writeEventsToGraphStore( + driver *neo4j.Driver, + inChan chan EventFollower, + start chan struct{}, + resultChan chan WriteResult, +) + +func collectEvents(inChan chan EventFollower, resultChan chan []EventFollower) diff --git a/write_test.go b/write_test.go new file mode 100644 index 0000000..2cb93b5 --- /dev/null +++ b/write_test.go @@ -0,0 +1 @@ +package heartwood