From 9ac57641c08ee9d239e217d40b9d6c7be9409cd2 Mon Sep 17 00:00:00 2001 From: Ali Behjati Date: Fri, 27 Jun 2025 10:53:50 +0200 Subject: [PATCH 1/2] refactor: optimize deduplication logic --- write.go | 24 +++++++++++++----------- 1 file changed, 13 insertions(+), 11 deletions(-) diff --git a/write.go b/write.go index b78c9a7..cd183ad 100644 --- a/write.go +++ b/write.go @@ -4,6 +4,7 @@ import ( "fmt" "time" + "github.com/ethereum/go-ethereum/crypto" "github.com/nats-io/nats.go" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" @@ -44,28 +45,29 @@ func WriteMessages(channel chan *vaa.VAA, natsURL string, natsStream string, bat Help: "Latency (milliseconds) of batch acks from the stream server", }) - for message := range channel { - bytes, err := message.Marshal() + for vaa := range channel { + vaaBytes, err := vaa.Marshal() if err != nil { - log.Panic().Err(err).Str("id", message.MessageID()).Msg("Failed to marshal VAA") + log.Panic().Err(err).Str("id", vaa.MessageID()).Msg("Failed to marshal VAA") } - // Use the signing digest of the VAA as the NATS message ID to prevent - // DoS against valid messages by taking advantage of stream - // deduplication. - signingDigest := message.SigningDigest().Hex() + vaaHash := crypto.Keccak256Hash(vaaBytes).Hex() _, err = js.PublishMsgAsync(&nats.Msg{ Subject: natsStream, - Header: nats.Header{"Nats-Msg-Id": []string{signingDigest}}, - Data: bytes, + // This header allows dedup using the vaa hash + Header: nats.Header{"Nats-Msg-Id": []string{vaaHash}}, + Data: vaaBytes, }) + // Signing digest is the hash of the payload of the VAA + signingDigest := vaa.SigningDigest().Hex() + if err != nil { - log.Error().Str("id", message.MessageID()).Str("signing_digest", signingDigest).Err(err).Msg("Failed to publish message") + log.Error().Str("id", vaa.MessageID()).Str("signing_digest", signingDigest).Err(err).Msg("Failed to publish message") } else { - log.Debug().Str("id", message.MessageID()).Str("signing_digest", signingDigest).Msg("Published message") + log.Debug().Str("id", vaa.MessageID()).Str("signing_digest", signingDigest).Msg("Published message") } batchCounter++ From 2c3e9ea93ef50a700d264256bbec1f829130b485 Mon Sep 17 00:00:00 2001 From: Ali Behjati Date: Fri, 27 Jun 2025 11:45:53 +0200 Subject: [PATCH 2/2] fix: update precommit action version --- .github/workflows/ci-lint-and-format.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/ci-lint-and-format.yaml b/.github/workflows/ci-lint-and-format.yaml index c1f45ac..f2e9b03 100644 --- a/.github/workflows/ci-lint-and-format.yaml +++ b/.github/workflows/ci-lint-and-format.yaml @@ -19,4 +19,4 @@ jobs: with: version: v1.54 args: --timeout=30m - - uses: pre-commit/action@v2.0.3 + - uses: pre-commit/action@v3.0.0