Skip to content

update shardDistribution #336

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 59 commits into from
Jun 12, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
59 commits
Select commit Hold shift + click to select a range
1235013
update Work-report distribution
MacOMNI Apr 7, 2025
b94eaa1
update workReportRequest
MacOMNI Apr 7, 2025
daa2f98
update WorkReportRef
MacOMNI Apr 8, 2025
3a55564
update blockchain
MacOMNI Apr 8, 2025
c61f810
update workreport
MacOMNI Apr 8, 2025
abf31c4
update swift testing
MacOMNI Apr 8, 2025
64e90d0
update package
MacOMNI Apr 8, 2025
8f2ed97
fix xcode udpate issues
MacOMNI Apr 9, 2025
032e49f
update tests
MacOMNI Apr 9, 2025
fccdc11
update guaranteedWorkReport
MacOMNI Apr 9, 2025
b187b97
update work report
MacOMNI Apr 9, 2025
8d6087c
update db
MacOMNI Apr 9, 2025
876bc86
update OnSyncCompleted
MacOMNI Apr 10, 2025
b50d929
update networkmanager
MacOMNI Apr 11, 2025
3925ab0
update block request
MacOMNI Apr 14, 2025
0212d13
update more tests
MacOMNI Apr 14, 2025
fa89824
update open rpc
MacOMNI Apr 14, 2025
413d107
fix some unstable tests
MacOMNI Apr 14, 2025
bfb8164
fix some issues
MacOMNI Apr 14, 2025
4ff9b78
fix open rpc
MacOMNI Apr 14, 2025
2694049
update OpenRPC
MacOMNI Apr 14, 2025
96c0969
update OpenRPC
MacOMNI Apr 14, 2025
6cba5ef
update open rpc
MacOMNI Apr 15, 2025
645fc48
Merge branch 'master' into dev-JAMNP
MacOMNI Apr 15, 2025
223b7e7
update swiftlint
MacOMNI Apr 15, 2025
2a6476b
update OpenRPC
MacOMNI Apr 15, 2025
d9c93bc
update OpenRPC
MacOMNI Apr 15, 2025
81c2cd9
update vapor
MacOMNI Apr 15, 2025
438a454
update vapor
MacOMNI Apr 15, 2025
c1a3f6c
fix unstable tests
MacOMNI Apr 15, 2025
8b8a7f5
fix unstable tests
MacOMNI Apr 15, 2025
812a01b
fix unstable tests
MacOMNI Apr 16, 2025
6198b3e
update some issues
MacOMNI Apr 16, 2025
de831e8
update networkmanager
MacOMNI Apr 16, 2025
805bde6
update vapor
MacOMNI Apr 16, 2025
b4e1e6b
update vapor
MacOMNI Apr 17, 2025
2456272
update rpc package
MacOMNI Apr 17, 2025
0ed4dbb
update swift test
MacOMNI Apr 17, 2025
c1f6769
update swift pm
MacOMNI Apr 17, 2025
b32f842
update package
MacOMNI Apr 17, 2025
54be471
update swift testing
MacOMNI Apr 17, 2025
38f70df
update more tests
MacOMNI Apr 18, 2025
d3d277a
update swift testing
MacOMNI Apr 21, 2025
947e41f
update local
MacOMNI Apr 21, 2025
22f6e8c
Merge branch 'master' into dev-jamnps
MacOMNI Apr 23, 2025
2b57be8
update package
MacOMNI Apr 23, 2025
130cd2e
update handleShardDistributionReceived
MacOMNI Apr 25, 2025
92c690f
Merge branch 'master' into dev-jamnps
MacOMNI Apr 25, 2025
18a33dd
update networkmanager
MacOMNI Apr 27, 2025
9f1f41f
update shard
MacOMNI Apr 28, 2025
308a100
update data service
MacOMNI Apr 28, 2025
acca51d
update service
MacOMNI Apr 29, 2025
e55ae6b
update data service
MacOMNI May 7, 2025
07a5413
Merge branch 'master' into dev-jamnps
MacOMNI May 7, 2025
c32d39a
update TODO
May 19, 2025
ca79f2b
update rpc
May 19, 2025
1b02e18
fix some issues
May 20, 2025
f2f9ca0
Merge branch 'master' into dev-jamnps
xlc Jun 12, 2025
8773d34
remove bad test
xlc Jun 12, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Blockchain/Package.resolved

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -172,10 +172,14 @@

public func remove(hash: Data32) async throws {
logger.debug("removing block: \(hash)")

try await dataProvider.remove(hash: hash)
}

public func remove(workReportHash: Data32) async throws {
logger.debug("removing workReportHash: \(workReportHash)")
try await dataProvider.remove(workReportHash: workReportHash)
}

Check warning on line 181 in Blockchain/Sources/Blockchain/BlockchainDataProvider/BlockchainDataProvider.swift

View check run for this annotation

Codecov / codecov/patch

Blockchain/Sources/Blockchain/BlockchainDataProvider/BlockchainDataProvider.swift#L178-L181

Added lines #L178 - L181 were not covered by tests

public nonisolated var genesisBlockHash: Data32 {
dataProvider.genesisBlockHash
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,5 +40,8 @@ public protocol BlockchainDataProviderProtocol: Sendable {
/// remove header, block, workReport, state
func remove(hash: Data32) async throws

/// remove workReport
func remove(workReportHash: Data32) async throws

var genesisBlockHash: Data32 { get }
}
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,10 @@
heads.insert(hash)
}

public func remove(workReportHash hash: Data32) {
guaranteedWorkReports.removeValue(forKey: hash)
}

Check warning on line 134 in Blockchain/Sources/Blockchain/BlockchainDataProvider/InMemoryDataProvider.swift

View check run for this annotation

Codecov / codecov/patch

Blockchain/Sources/Blockchain/BlockchainDataProvider/InMemoryDataProvider.swift#L132-L134

Added lines #L132 - L134 were not covered by tests

public func remove(hash: Data32) {
let timeslot = blockByHash[hash]?.header.timeslot ?? stateByBlockHash[hash]?.value.timeslot
stateByBlockHash.removeValue(forKey: hash)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -261,9 +261,9 @@

public struct ShardDistributionReceived: Event {
public var erasureRoot: Data32
public var shardIndex: UInt32
public var shardIndex: UInt16

public init(erasureRoot: Data32, shardIndex: UInt32) {
public init(erasureRoot: Data32, shardIndex: UInt16) {

Check warning on line 266 in Blockchain/Sources/Blockchain/RuntimeProtocols/RuntimeEvents.swift

View check run for this annotation

Codecov / codecov/patch

Blockchain/Sources/Blockchain/RuntimeProtocols/RuntimeEvents.swift#L266

Added line #L266 was not covered by tests
self.erasureRoot = erasureRoot
self.shardIndex = shardIndex
}
Expand All @@ -275,7 +275,7 @@

// Response to shard distribution
public struct ShardDistributionReceivedResponse: Event {
public var requestId: Data32
public let requestId: Data32

public let result: Result<(bundleShard: Data, segmentShards: [Data], justification: Justification), Error>

Expand Down
146 changes: 112 additions & 34 deletions Blockchain/Sources/Blockchain/Validator/DataAvailabilityService.swift
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@
case invalidWorkReportSlot
case invalidWorkReport
case insufficientSignatures
case invalidMerklePath
case emptySegmentShards
case invalidJustificationFormat
}

public final class DataAvailabilityService: ServiceBase2, @unchecked Sendable, OnSyncCompleted {
Expand Down Expand Up @@ -47,12 +50,21 @@
await subscribe(RuntimeEvents.WorkReportReceived.self, id: "DataAvailabilityService.WorkReportReceived") { [weak self] event in
await self?.handleWorkReportReceived(event)
}
await subscribe(RuntimeEvents.ShardDistributionReceived.self,
id: "DataAvailabilityService.ShardDistributionReceived")
{ [weak self] event in
await self?.handleShardDistributionReceived(event)
}

Check warning on line 57 in Blockchain/Sources/Blockchain/Validator/DataAvailabilityService.swift

View check run for this annotation

Codecov / codecov/patch

Blockchain/Sources/Blockchain/Validator/DataAvailabilityService.swift#L56-L57

Added lines #L56 - L57 were not covered by tests
}

public func handleWorkReportReceived(_ event: RuntimeEvents.WorkReportReceived) async {
await workReportDistribution(workReport: event.workReport, slot: event.slot, signatures: event.signatures)
}

public func handleShardDistributionReceived(_ event: RuntimeEvents.ShardDistributionReceived) async {
try? await shardDistribution(erasureRoot: event.erasureRoot, shardIndex: event.shardIndex)
}

Check warning on line 66 in Blockchain/Sources/Blockchain/Validator/DataAvailabilityService.swift

View check run for this annotation

Codecov / codecov/patch

Blockchain/Sources/Blockchain/Validator/DataAvailabilityService.swift#L64-L66

Added lines #L64 - L66 were not covered by tests

/// Purge old data from the data availability stores
/// - Parameter epoch: The current epoch index
public func purge(epoch _: EpochIndex) async {
Expand Down Expand Up @@ -130,21 +142,18 @@
/// - Parameter bundle: The bundle to export
/// - Returns: The erasure root and length of the bundle
public func exportWorkpackageBundle(bundle: WorkPackageBundle) async throws -> (erasureRoot: Data32, length: DataLength) {
// 1. Serialize the bundle
// Serialize the bundle

Check warning on line 145 in Blockchain/Sources/Blockchain/Validator/DataAvailabilityService.swift

View check run for this annotation

Codecov / codecov/patch

Blockchain/Sources/Blockchain/Validator/DataAvailabilityService.swift#L145

Added line #L145 was not covered by tests
let serializedData = try JamEncoder.encode(bundle)
let dataLength = DataLength(UInt32(serializedData.count))

// 2. Calculate the erasure root
// TODO: replace this with real implementation
let erasureRoot = serializedData.blake2b256hash()

// 3. Extract the work package hash from the bundle
let workPackageHash = bundle.workPackage.hash()

// 4. Store the serialized bundle in the audit store (short-term storage)

// chunk the bundle into segments

// Calculate the erasure root
// Work-package bundle shard hash
let bundleShards = try ErasureCoding.chunk(
data: serializedData,
basicSize: config.value.erasureCodedPieceSize,
recoveryCount: config.value.totalNumberOfValidators
)
// Chunk the bundle into segments

Check warning on line 156 in Blockchain/Sources/Blockchain/Validator/DataAvailabilityService.swift

View check run for this annotation

Codecov / codecov/patch

Blockchain/Sources/Blockchain/Validator/DataAvailabilityService.swift#L149-L156

Added lines #L149 - L156 were not covered by tests
let segmentCount = serializedData.count / 4104
var segments = [Data4104]()
for i in 0 ..< segmentCount {
Expand All @@ -159,19 +168,32 @@
segments.append(Data4104(segment)!)
}

// Calculate the segments root
let segmentsRoot = Merklization.constantDepthMerklize(segments.map(\.data))

var nodes = [Data]()
// workpackage bundle shard hash + segment shard root
for i in 0 ..< bundleShards.count {
let shardHash = bundleShards[i].blake2b256hash()
try nodes.append(JamEncoder.encode(shardHash) + JamEncoder.encode(segmentsRoot))
}

// ErasureRoot
let erasureRoot = Merklization.binaryMerklize(nodes)

// Extract the work package hash from the bundle
let workPackageHash = bundle.workPackage.hash()

// Store the serialized bundle in the audit store (short-term storage)

Check warning on line 187 in Blockchain/Sources/Blockchain/Validator/DataAvailabilityService.swift

View check run for this annotation

Codecov / codecov/patch

Blockchain/Sources/Blockchain/Validator/DataAvailabilityService.swift#L171-L187

Added lines #L171 - L187 were not covered by tests
// Store the segment in the data store
for (i, segment) in segments.enumerated() {
try await dataStore.set(data: segment, erasureRoot: erasureRoot, index: UInt16(i))
}

// 5. Calculate the segments root
// TODO: replace this with real implementation
let segmentsRoot = serializedData.blake2b256hash()

// 6. Map the work package hash to the segments root
// Map the work package hash to the segments root

Check warning on line 193 in Blockchain/Sources/Blockchain/Validator/DataAvailabilityService.swift

View check run for this annotation

Codecov / codecov/patch

Blockchain/Sources/Blockchain/Validator/DataAvailabilityService.swift#L193

Added line #L193 was not covered by tests
try await dataStore.setSegmentRoot(segmentRoot: segmentsRoot, forWorkPackageHash: workPackageHash)

// 7. Set the timestamp for retention tracking
// Set the timestamp for retention tracking

Check warning on line 196 in Blockchain/Sources/Blockchain/Validator/DataAvailabilityService.swift

View check run for this annotation

Codecov / codecov/patch

Blockchain/Sources/Blockchain/Validator/DataAvailabilityService.swift#L196

Added line #L196 was not covered by tests
// As per GP 14.3.1, items in the audit store are kept until finality (approx. 1 hour)
let currentTimestamp = Date()
try await dataStore.setTimestamp(erasureRoot: erasureRoot, timestamp: currentTimestamp)
Expand Down Expand Up @@ -238,23 +260,79 @@

// MARK: - Shard Distribution (CE 137)

/// Distribute shards to validators
/// - Parameters:
/// - shards: The shards to distribute
/// - erasureRoot: The erasure root of the data
/// - validators: The validators to distribute to
/// - Returns: Success status of the distribution
public func distributeShards(
shards _: [Data4104],
public func shardDistribution(
erasureRoot: Data32,
shardIndex: UInt16
) async throws {
// Generate request ID
let requestId = try JamEncoder.encode(erasureRoot, shardIndex).blake2b256hash()
do {
// TODO: Fetch shard data from local storage
let (bundleShard, segmentShards) = (Data(), [Data()])

// Generate Merkle proof justification
let justification = try await generateJustification(
erasureRoot: erasureRoot,
shardIndex: shardIndex,
bundleShard: bundleShard,
segmentShards: segmentShards
)

// Respond with shards + proof
publish(RuntimeEvents.ShardDistributionReceivedResponse(
requestId: requestId,
bundleShard: bundleShard,
segmentShards: segmentShards,
justification: justification
))

} catch {
publish(RuntimeEvents.ShardDistributionReceivedResponse(
requestId: requestId,
error: error
))
}
}

Check warning on line 295 in Blockchain/Sources/Blockchain/Validator/DataAvailabilityService.swift

View check run for this annotation

Codecov / codecov/patch

Blockchain/Sources/Blockchain/Validator/DataAvailabilityService.swift#L266-L295

Added lines #L266 - L295 were not covered by tests

private func generateJustification(
erasureRoot _: Data32,
validators _: [ValidatorIndex]
) async throws -> Bool {
// TODO: Implement shard distribution to validators
// 1. Determine which shards go to which validators
// 2. Send shards to validators over the network
// 3. Track distribution status
// 4. Return success status
throw DataAvailabilityError.distributionError
shardIndex: UInt16,
bundleShard _: Data,
segmentShards: [Data]
) async throws -> Justification {
guard !segmentShards.isEmpty else {
throw DataAvailabilityError.emptySegmentShards
}

// GP T(s,i,H)
let merklePath = Merklization.trace(
segmentShards,
index: Int(shardIndex),
hasher: Blake2b256.self
)

// TODO: Got Justification
switch merklePath.count {
case 1:
// 0 ++ Hash
guard case let .right(hash) = merklePath.first! else {
throw DataAvailabilityError.invalidMerklePath
}
return .singleHash(hash)

case 2:
// 1 ++ Hash ++ Hash
guard case let .right(hash1) = merklePath[0],
case let .right(hash2) = merklePath[1]
else {
throw DataAvailabilityError.invalidMerklePath
}
return .doubleHash(hash1, hash2)

default:
// TODO: 2 ++ Segment Shard (12 bytes)
return .segmentShard(Data12())
}

Check warning on line 335 in Blockchain/Sources/Blockchain/Validator/DataAvailabilityService.swift

View check run for this annotation

Codecov / codecov/patch

Blockchain/Sources/Blockchain/Validator/DataAvailabilityService.swift#L302-L335

Added lines #L302 - L335 were not covered by tests
}

// MARK: - Audit Shard Requests (CE 138)
Expand Down
6 changes: 5 additions & 1 deletion Database/Sources/Database/RocksDBBackend.swift
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,6 @@
// TODO: batch delete

try blocks.delete(key: hash)
try guaranteedWorkReports.delete(key: hash)
if let block = try await getBlock(hash: hash) {
try blockHashByTimeslot.delete(key: block.header.timeslot)
}
Expand All @@ -235,6 +234,11 @@
}
try blockNumberByHash.delete(key: hash)
}

public func remove(workReportHash: Data32) async throws {
logger.trace("remove() \(workReportHash)")
try guaranteedWorkReports.delete(key: workReportHash)
}

Check warning on line 241 in Database/Sources/Database/RocksDBBackend.swift

View check run for this annotation

Codecov / codecov/patch

Database/Sources/Database/RocksDBBackend.swift#L238-L241

Added lines #L238 - L241 were not covered by tests
}

extension RocksDBBackend: StateBackendProtocol {
Expand Down
6 changes: 4 additions & 2 deletions Networking/Tests/MsQuicSwiftTests/NetAddrTests.swift
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import Foundation
import MsQuicSwift
@testable import Networking
import Testing
import TracingUtils
import Utils

@testable import MsQuicSwift

struct NetAddrTests {
@Test
Expand Down
2 changes: 1 addition & 1 deletion Node/Package.swift
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ let package = Package(
.package(path: "../TracingUtils"),
.package(path: "../Utils"),
.package(path: "../Database"),
.package(url: "https://github.com/apple/swift-testing.git", branch: "6.0.3"),
.package(url: "https://github.com/apple/swift-testing.git", branch: "6.0.0"),
.package(url: "https://github.com/gh123man/Async-Channels.git", from: "1.0.2"),
],
targets: [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@

public struct ShardDistributionMessage: Codable, Sendable, Equatable, Hashable {
public var erasureRoot: Data32
public var shardIndex: UInt32
public var shardIndex: UInt16

public init(erasureRoot: Data32, shardIndex: UInt32) {
public init(erasureRoot: Data32, shardIndex: UInt16) {

Check warning on line 10 in Node/Sources/Node/NetworkingProtocol/CommonEphemeral/ShardDistributionMessage.swift

View check run for this annotation

Codecov / codecov/patch

Node/Sources/Node/NetworkingProtocol/CommonEphemeral/ShardDistributionMessage.swift#L10

Added line #L10 was not covered by tests
self.erasureRoot = erasureRoot
self.shardIndex = shardIndex
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ import Utils

public struct StateRequest: Codable, Sendable, Equatable, Hashable {
public var headerHash: Data32
public var startKey: Data31 // [u8; 31]
public var endKey: Data31 // [u8; 31]
public var startKey: Data31
public var endKey: Data31
public var maxSize: UInt32

public init(headerHash: Data32, startKey: Data31, endKey: Data31, maxSize: UInt32) {
Expand Down
21 changes: 11 additions & 10 deletions Node/Sources/Node/NetworkingProtocol/NetworkManager.swift
Original file line number Diff line number Diff line change
Expand Up @@ -412,9 +412,7 @@
let resp = try await blockchain.waitFor(RuntimeEvents.WorkReportReceivedResponse.self) { event in
hash == event.workReportHash
}
if case let .failure(error) = resp.result {
throw error // failed
}
_ = try resp.result.get()
return []
case let .workReportRequest(message):
let workReportRef = try await blockchain.dataProvider.getGuaranteedWorkReport(hash: message.workReportHash)
Expand All @@ -423,13 +421,16 @@
}
return []
case let .shardDistribution(message):
blockchain
.publish(event: RuntimeEvents.ShardDistributionReceived(erasureRoot: message.erasureRoot, shardIndex: message.shardIndex))
// TODO: waitfor ShardDistributionReceivedResponse
// let resp = try await blockchain.waitFor(RuntimeEvents.ShardDistributionReceivedResponse.self) { event in
//
// }
return []
let receivedEvent = RuntimeEvents.ShardDistributionReceived(erasureRoot: message.erasureRoot, shardIndex: message.shardIndex)
let requestId = try receivedEvent.generateRequestId()

blockchain.publish(event: receivedEvent)

let resp = try await blockchain.waitFor(RuntimeEvents.ShardDistributionReceivedResponse.self) { event in
requestId == event.requestId
}
let (bundleShard, segmentShards, justification) = try resp.result.get()
return try [JamEncoder.encode(bundleShard, segmentShards, justification)]

Check warning on line 433 in Node/Sources/Node/NetworkingProtocol/NetworkManager.swift

View check run for this annotation

Codecov / codecov/patch

Node/Sources/Node/NetworkingProtocol/NetworkManager.swift#L424-L433

Added lines #L424 - L433 were not covered by tests
case let .auditShardRequest(message):
blockchain
.publish(event: RuntimeEvents.AuditShardRequestReceived(erasureRoot: message.erasureRoot, shardIndex: message.shardIndex))
Expand Down
29 changes: 0 additions & 29 deletions Node/Tests/NodeTests/NetworkManagerTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -384,35 +384,6 @@ struct NetworkManagerTests {
#expect(data == [])
}

@Test
func testHandleShardDistribution() async throws {
let erasureRoot = Data32(repeating: 1)
let shardIndex: UInt32 = 2

let distributionMessage = CERequest.shardDistribution(ShardDistributionMessage(
erasureRoot: erasureRoot,
shardIndex: shardIndex
))

let message = try ShardDistributionMessage.decode(data: distributionMessage.encode(), config: services.config)
#expect(shardIndex == message.shardIndex)

_ = try await network.handler.handle(ceRequest: distributionMessage)

let events = await storeMiddleware.wait()

let receivedEvent = events.first {
if let event = $0 as? RuntimeEvents.ShardDistributionReceived {
return event.erasureRoot == erasureRoot && event.shardIndex == shardIndex
}
return false
} as? RuntimeEvents.ShardDistributionReceived

let event = try #require(receivedEvent)
#expect(event.erasureRoot == erasureRoot)
#expect(event.shardIndex == shardIndex)
}

@Test
func testHandleAuditShardRequest() async throws {
let testErasureRoot = Data32(repeating: 1)
Expand Down
Loading