Skip to content

Commit 137dc33

Browse files
committed
Finish README and initial impl
Missing tests
1 parent b172fdd commit 137dc33

File tree

6 files changed

+320
-1638
lines changed

6 files changed

+320
-1638
lines changed

execute/tokendata/cctp/v2/README.md

Lines changed: 19 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -143,68 +143,22 @@ cctpV2Message2 with attestation2 and DepositHashX
143143
then `attestation1` can be assigned to **either** `tokenPayloadA` or `tokenPayloadB`, and `attestation2` assigned to the
144144
remaining `tokenPayload`.
145145

146-
147-
148-
??? Random thoughts / WIP below ???``
149-
Need: collection of `SourceTokenDataPayloadV2` and `CCTPv2Messages`
150-
- SeqNum maps to SourceTokenDataPayloadV2, but TxHash maps to CCTPv2Messages
151-
`map[TxHash][]SeqNum`?
152-
```
153-
getTxHashes(messages map[cciptypes.SeqNum]cciptypes.Message) map[TxHash][]SeqNum
154-
func (txHashes []TxHash, sourceDomainID, uint32) map[TxHash]CCTPv2Messages
155-
map[TxHash]CCTPv2Messages + map[TxHash][]SeqNum + map[cciptypes.SeqNum]map[int]SourceTokenDataPayloadV2
156-
func
157-
158-
func assignAttestationsToV2TokenPayloads(
159-
messages map[cciptypes.SeqNum]Message,
160-
txHashToSeqNums map[TxHash][]SeqNum,
161-
sourceDomainID uint32,
162-
v2TokenPayloads map[cciptypes.SeqNum]map[int]SourceTokenDataPayloadV2
163-
) map[cciptypes.SeqNum]map[int]AttestationStatus {
164-
result := map[cciptypes.SeqNum]map[int]AttestationStatus
165-
for txHash, seqNums in txHashToSeqNums {
166-
cctpv2Messages := getCCTPv2Messages(sourceDomainID uint32, txHash string) CCTPv2Messages
167-
attestations := extractAttestations(cctpv2Messages)
168-
assignedAttestations := make(map[int]AttestationStatus)
169-
for seqNum, v2TokenPayloads in v2TokenPayloads {
170-
for idx, v2TokenPayload in v2TokenPayloads {
171-
assignedAttestation := assignAttestationForV2TokenPayload(attestations, v2TokenPayload)
172-
assignedAttestation.ID = messages[SeqNum].ID
173-
assignedAttestations[idx] = assignedAttestation
174-
}
175-
result[SeqNum] = assignedAttestations
176-
}
177-
}
178-
return result
179-
}
180-
181-
// For a single Tx
182-
// Maybe fill-in messageID later
183-
func extractAttestations(cctpV2Messages CCTPv2Messages) map[depositHash][]AttestationStatus {
184-
// filter out not complete, not v2
185-
for cctpV2Message in cctpV2Messages {
186-
187-
}
188-
}
189-
190-
191-
// Need to mutate attestations
192-
func assignAttestationForV2TokenPayload (
193-
attestations map[depositHash][]AttestationStatus,
194-
v2TokenPayload SourceTokenDataPayloadV2
195-
) AttestationStatus {
196-
attestationStatuses, ok := attestations.get(v2TokenPayload.DepositHash)
197-
if !ok || len(attestationStatuses) == 0 {
198-
return ErrorAttestationStatus(tokendata.ErrDataMissing)
199-
}
200-
201-
attestation := attestationStatuses[0]
202-
attestations[v2TokenPayload.DepositHash] = attestationStatuses[1:]
203-
return attestation
204-
}
205-
```
206-
You can transform `CCTPv2Messages -> map[depositHash][]AttestationStatus`
207-
208-
Key point: attestations are fungible for the same depositHashes *in the same tx*
209-
210-
Need: `map[cciptypes.SeqNum]map[int](ID, MessageBody, Attestation)`
146+
### Putting it all together
147+
With this understanding we can write the high-level design of `Observe()`. For each chain:
148+
1. Extract data from `messages`: v2TokenPayloads, sourceDomainID, txHashes
149+
2. Fetch attestations from the CCTPv2 API and assign an attestation to each v2TokenPayload
150+
3. Convert each attestation to `TokenData`
151+
152+
Step 2 is implemented by `assignAttestationsToV2TokenPayloads` which, for each txHash:
153+
- fetch `CCTPv2Messages` from the API
154+
- converts `CCTPv2Messages` to a map from `DepositHash` to `[]AttestationStatus`
155+
- Again note that `DepositHash` is not unique and may map to multiple `AttestationStatus`
156+
- `AttestationStatus` acts here as a container for `MessageBody` and `Attestation`
157+
- iterate over each `v2TokenPayload`
158+
- look up `v2TokenPayload.DepositHash` in the `DepositHash` to `[]AttestationStatus` map
159+
- if the lookup returns a non-empty list, destructively pop() an `AttestationStatus`
160+
- **The popped `AttestationStatus` is now assigned to this specific `v2TokenPayload`**
161+
- The pop() being destructive ensures the `AttestationStatus` won't be assigned again
162+
- At the end of this iteration, each `v2TokenPayload` has been assigned a `AttestationStatus` (for this specific TxHash)
163+
164+
Step 3 converts these `AttestationStatus` into `TokenData` and ensures structure is preserved.

execute/tokendata/cctp/v2/http_client.go

Lines changed: 23 additions & 133 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ package v2
77

88
import (
99
"context"
10-
"encoding/hex"
1110
"encoding/json"
1211
"fmt"
1312
"net/http"
@@ -32,9 +31,6 @@ type CCTPv2HTTPClient interface {
3231
GetMessages(
3332
ctx context.Context, sourceChain cciptypes.ChainSelector, sourceDomainID uint32, transactionHash string,
3433
) (CCTPv2Messages, error)
35-
GetProcessedMessages(
36-
ctx context.Context, sourceChain cciptypes.ChainSelector, sourceDomainID uint32, transactionHash string,
37-
) ([]ProcessedCCTPMessage, error)
3834
}
3935

4036
// MetricsReporter provides metrics reporting for attestation API calls
@@ -133,8 +129,30 @@ func (c *CCTPv2HTTPClientImpl) GetMessages(
133129
return CCTPv2Messages{}, err
134130
}
135131

132+
// Filter messages to only include complete V2 messages
133+
filteredMessages := make([]CCTPv2Message, 0, len(result.Messages))
134+
for _, msg := range result.Messages {
135+
// Only include messages that are:
136+
// 1. CCTPv2 (version = 2)
137+
// 2. Complete (status = "complete")
138+
if msg.CCTPVersion == 2 && strings.EqualFold(msg.Status, "complete") {
139+
filteredMessages = append(filteredMessages, msg)
140+
}
141+
}
142+
143+
// Log if we filtered out any messages
144+
if len(filteredMessages) != len(result.Messages) {
145+
c.lggr.Debugw(
146+
"Filtered CCTPv2 messages",
147+
"originalCount", len(result.Messages),
148+
"filteredCount", len(filteredMessages),
149+
"transactionHash", transactionHash,
150+
"sourceDomainID", sourceDomainID,
151+
)
152+
}
153+
136154
success = true
137-
return result, nil
155+
return CCTPv2Messages{Messages: filteredMessages}, nil
138156
}
139157

140158
// parseResponseBody parses the JSON response from Circle's attestation API
@@ -194,131 +212,3 @@ type CCTPv2DecodedMessageBody struct {
194212
ExpirationBlock string `json:"expirationBlock,omitempty"`
195213
HookData string `json:"hookData,omitempty"`
196214
}
197-
198-
// ProcessedCCTPMessage contains validated and decoded CCTP message data ready for use.
199-
// This is the result of processing a raw CCTPv2Message from the API by:
200-
// - Filtering for complete status and CCTP version 2
201-
// - Calculating the depositHash from the decoded message
202-
// - Decoding hex-encoded message and attestation bytes
203-
type ProcessedCCTPMessage struct {
204-
DepositHash [32]byte // Content-addressable hash identifying the transfer
205-
MessageBytes []byte // Decoded CCTP message bytes
206-
AttestationBytes []byte // Decoded attestation signature bytes
207-
}
208-
209-
// GetProcessedMessages fetches CCTP messages for a transaction and processes them.
210-
// Returns only complete, version 2 messages with decoded hex data.
211-
// This method:
212-
// 1. Calls GetMessages() to fetch raw messages from Circle API
213-
// 2. Filters for messages with CCTPVersion == 2 and Status == "complete"
214-
// 3. Calculates depositHash for each message using CalculateDepositHash()
215-
// 4. Decodes hex-encoded message and attestation bytes
216-
// 5. Returns a list of ProcessedCCTPMessage ready for use
217-
func (c *CCTPv2HTTPClientImpl) GetProcessedMessages(
218-
ctx context.Context,
219-
sourceChain cciptypes.ChainSelector,
220-
sourceDomainID uint32,
221-
transactionHash string,
222-
) ([]ProcessedCCTPMessage, error) {
223-
// Fetch raw messages from Circle API
224-
rawMessages, err := c.GetMessages(ctx, sourceChain, sourceDomainID, transactionHash)
225-
if err != nil {
226-
return nil, fmt.Errorf("failed to get messages: %w", err)
227-
}
228-
229-
var processedMessages []ProcessedCCTPMessage
230-
231-
for i, msg := range rawMessages.Messages {
232-
// Filter: Only process CCTP version 2 messages
233-
if msg.CCTPVersion != 2 {
234-
c.lggr.Debugw(
235-
"Skipping message with non-v2 CCTP version",
236-
"sourceChain", sourceChain,
237-
"sourceDomainID", sourceDomainID,
238-
"transactionHash", transactionHash,
239-
"messageIndex", i,
240-
"cctpVersion", msg.CCTPVersion,
241-
)
242-
continue
243-
}
244-
245-
// Filter: Only process complete messages
246-
if msg.Status != "complete" {
247-
c.lggr.Debugw(
248-
"Skipping message with incomplete status",
249-
"sourceChain", sourceChain,
250-
"sourceDomainID", sourceDomainID,
251-
"transactionHash", transactionHash,
252-
"messageIndex", i,
253-
"status", msg.Status,
254-
)
255-
continue
256-
}
257-
258-
// Calculate depositHash from decoded message
259-
depositHash, err := CalculateDepositHash(msg.DecodedMessage)
260-
if err != nil {
261-
c.lggr.Warnw(
262-
"Failed to calculate depositHash for message",
263-
"sourceChain", sourceChain,
264-
"sourceDomainID", sourceDomainID,
265-
"transactionHash", transactionHash,
266-
"messageIndex", i,
267-
"error", err,
268-
)
269-
continue
270-
}
271-
272-
// Decode hex-encoded message bytes
273-
messageBytes, err := hexDecode(msg.Message)
274-
if err != nil {
275-
c.lggr.Warnw(
276-
"Failed to decode message hex",
277-
"sourceChain", sourceChain,
278-
"sourceDomainID", sourceDomainID,
279-
"transactionHash", transactionHash,
280-
"messageIndex", i,
281-
"error", err,
282-
)
283-
continue
284-
}
285-
286-
// Decode hex-encoded attestation bytes
287-
attestationBytes, err := hexDecode(msg.Attestation)
288-
if err != nil {
289-
c.lggr.Warnw(
290-
"Failed to decode attestation hex",
291-
"sourceChain", sourceChain,
292-
"sourceDomainID", sourceDomainID,
293-
"transactionHash", transactionHash,
294-
"messageIndex", i,
295-
"error", err,
296-
)
297-
continue
298-
}
299-
300-
// Create processed message
301-
processedMessages = append(processedMessages, ProcessedCCTPMessage{
302-
DepositHash: depositHash,
303-
MessageBytes: messageBytes,
304-
AttestationBytes: attestationBytes,
305-
})
306-
}
307-
308-
c.lggr.Debugw(
309-
"Processed CCTP messages",
310-
"sourceChain", sourceChain,
311-
"sourceDomainID", sourceDomainID,
312-
"transactionHash", transactionHash,
313-
"totalMessages", len(rawMessages.Messages),
314-
"processedMessages", len(processedMessages),
315-
)
316-
317-
return processedMessages, nil
318-
}
319-
320-
// hexDecode decodes a hex string (with or without 0x prefix) to bytes.
321-
func hexDecode(hexStr string) ([]byte, error) {
322-
hexStr = strings.TrimPrefix(hexStr, "0x")
323-
return hex.DecodeString(hexStr)
324-
}

0 commit comments

Comments
 (0)