diff --git a/Blockchain/Package.resolved b/Blockchain/Package.resolved index be2c74e1..96ecff90 100644 --- a/Blockchain/Package.resolved +++ b/Blockchain/Package.resolved @@ -58,7 +58,7 @@ { "identity" : "swift-numerics", "kind" : "remoteSourceControl", - "location" : "https://github.com/apple/swift-numerics", + "location" : "https://github.com/apple/swift-numerics.git", "state" : { "branch" : "main", "revision" : "e30276bff2ff5ed80566fbdca49f50aa160b0e83" diff --git a/Blockchain/Sources/Blockchain/BlockchainDataProvider/BlockchainDataProvider.swift b/Blockchain/Sources/Blockchain/BlockchainDataProvider/BlockchainDataProvider.swift index 90b8a1a7..fec89005 100644 --- a/Blockchain/Sources/Blockchain/BlockchainDataProvider/BlockchainDataProvider.swift +++ b/Blockchain/Sources/Blockchain/BlockchainDataProvider/BlockchainDataProvider.swift @@ -172,10 +172,14 @@ extension BlockchainDataProvider { public func remove(hash: Data32) async throws { logger.debug("removing block: \(hash)") - try await dataProvider.remove(hash: hash) } + public func remove(workReportHash: Data32) async throws { + logger.debug("removing workReportHash: \(workReportHash)") + try await dataProvider.remove(workReportHash: workReportHash) + } + public nonisolated var genesisBlockHash: Data32 { dataProvider.genesisBlockHash } diff --git a/Blockchain/Sources/Blockchain/BlockchainDataProvider/BlockchainDataProviderProtocol.swift b/Blockchain/Sources/Blockchain/BlockchainDataProvider/BlockchainDataProviderProtocol.swift index bfc5dc2d..90db05bf 100644 --- a/Blockchain/Sources/Blockchain/BlockchainDataProvider/BlockchainDataProviderProtocol.swift +++ b/Blockchain/Sources/Blockchain/BlockchainDataProvider/BlockchainDataProviderProtocol.swift @@ -40,5 +40,8 @@ public protocol BlockchainDataProviderProtocol: Sendable { /// remove header, block, workReport, state func remove(hash: Data32) async throws + /// remove workReport + func remove(workReportHash: Data32) async throws + var genesisBlockHash: Data32 { get } } diff --git a/Blockchain/Sources/Blockchain/BlockchainDataProvider/InMemoryDataProvider.swift b/Blockchain/Sources/Blockchain/BlockchainDataProvider/InMemoryDataProvider.swift index 0aa4bd59..6d5b3d4a 100644 --- a/Blockchain/Sources/Blockchain/BlockchainDataProvider/InMemoryDataProvider.swift +++ b/Blockchain/Sources/Blockchain/BlockchainDataProvider/InMemoryDataProvider.swift @@ -129,6 +129,10 @@ extension InMemoryDataProvider: BlockchainDataProviderProtocol { heads.insert(hash) } + public func remove(workReportHash hash: Data32) { + guaranteedWorkReports.removeValue(forKey: hash) + } + public func remove(hash: Data32) { let timeslot = blockByHash[hash]?.header.timeslot ?? stateByBlockHash[hash]?.value.timeslot stateByBlockHash.removeValue(forKey: hash) diff --git a/Blockchain/Sources/Blockchain/RuntimeProtocols/RuntimeEvents.swift b/Blockchain/Sources/Blockchain/RuntimeProtocols/RuntimeEvents.swift index 25594416..703fe0d2 100644 --- a/Blockchain/Sources/Blockchain/RuntimeProtocols/RuntimeEvents.swift +++ b/Blockchain/Sources/Blockchain/RuntimeProtocols/RuntimeEvents.swift @@ -261,9 +261,9 @@ public enum RuntimeEvents { public struct ShardDistributionReceived: Event { public var erasureRoot: Data32 - public var shardIndex: UInt32 + public var shardIndex: UInt16 - public init(erasureRoot: Data32, shardIndex: UInt32) { + public init(erasureRoot: Data32, shardIndex: UInt16) { self.erasureRoot = erasureRoot self.shardIndex = shardIndex } @@ -275,7 +275,7 @@ public enum RuntimeEvents { // Response to shard distribution public struct ShardDistributionReceivedResponse: Event { - public var requestId: Data32 + public let requestId: Data32 public let result: Result<(bundleShard: Data, segmentShards: [Data], justification: Justification), Error> diff --git a/Blockchain/Sources/Blockchain/Validator/DataAvailabilityService.swift b/Blockchain/Sources/Blockchain/Validator/DataAvailabilityService.swift index 56d7990a..e67b1918 100644 --- a/Blockchain/Sources/Blockchain/Validator/DataAvailabilityService.swift +++ b/Blockchain/Sources/Blockchain/Validator/DataAvailabilityService.swift @@ -19,6 +19,9 @@ public enum DataAvailabilityError: Error { case invalidWorkReportSlot case invalidWorkReport case insufficientSignatures + case invalidMerklePath + case emptySegmentShards + case invalidJustificationFormat } public final class DataAvailabilityService: ServiceBase2, @unchecked Sendable, OnSyncCompleted { @@ -47,12 +50,21 @@ public final class DataAvailabilityService: ServiceBase2, @unchecked Sendable, O await subscribe(RuntimeEvents.WorkReportReceived.self, id: "DataAvailabilityService.WorkReportReceived") { [weak self] event in await self?.handleWorkReportReceived(event) } + await subscribe(RuntimeEvents.ShardDistributionReceived.self, + id: "DataAvailabilityService.ShardDistributionReceived") + { [weak self] event in + await self?.handleShardDistributionReceived(event) + } } public func handleWorkReportReceived(_ event: RuntimeEvents.WorkReportReceived) async { await workReportDistribution(workReport: event.workReport, slot: event.slot, signatures: event.signatures) } + public func handleShardDistributionReceived(_ event: RuntimeEvents.ShardDistributionReceived) async { + try? await shardDistribution(erasureRoot: event.erasureRoot, shardIndex: event.shardIndex) + } + /// Purge old data from the data availability stores /// - Parameter epoch: The current epoch index public func purge(epoch _: EpochIndex) async { @@ -130,21 +142,18 @@ public final class DataAvailabilityService: ServiceBase2, @unchecked Sendable, O /// - Parameter bundle: The bundle to export /// - Returns: The erasure root and length of the bundle public func exportWorkpackageBundle(bundle: WorkPackageBundle) async throws -> (erasureRoot: Data32, length: DataLength) { - // 1. Serialize the bundle + // Serialize the bundle let serializedData = try JamEncoder.encode(bundle) let dataLength = DataLength(UInt32(serializedData.count)) - // 2. Calculate the erasure root - // TODO: replace this with real implementation - let erasureRoot = serializedData.blake2b256hash() - - // 3. Extract the work package hash from the bundle - let workPackageHash = bundle.workPackage.hash() - - // 4. Store the serialized bundle in the audit store (short-term storage) - - // chunk the bundle into segments - + // Calculate the erasure root + // Work-package bundle shard hash + let bundleShards = try ErasureCoding.chunk( + data: serializedData, + basicSize: config.value.erasureCodedPieceSize, + recoveryCount: config.value.totalNumberOfValidators + ) + // Chunk the bundle into segments let segmentCount = serializedData.count / 4104 var segments = [Data4104]() for i in 0 ..< segmentCount { @@ -159,19 +168,32 @@ public final class DataAvailabilityService: ServiceBase2, @unchecked Sendable, O segments.append(Data4104(segment)!) } + // Calculate the segments root + let segmentsRoot = Merklization.constantDepthMerklize(segments.map(\.data)) + + var nodes = [Data]() + // workpackage bundle shard hash + segment shard root + for i in 0 ..< bundleShards.count { + let shardHash = bundleShards[i].blake2b256hash() + try nodes.append(JamEncoder.encode(shardHash) + JamEncoder.encode(segmentsRoot)) + } + + // ErasureRoot + let erasureRoot = Merklization.binaryMerklize(nodes) + + // Extract the work package hash from the bundle + let workPackageHash = bundle.workPackage.hash() + + // Store the serialized bundle in the audit store (short-term storage) // Store the segment in the data store for (i, segment) in segments.enumerated() { try await dataStore.set(data: segment, erasureRoot: erasureRoot, index: UInt16(i)) } - // 5. Calculate the segments root - // TODO: replace this with real implementation - let segmentsRoot = serializedData.blake2b256hash() - - // 6. Map the work package hash to the segments root + // Map the work package hash to the segments root try await dataStore.setSegmentRoot(segmentRoot: segmentsRoot, forWorkPackageHash: workPackageHash) - // 7. Set the timestamp for retention tracking + // Set the timestamp for retention tracking // As per GP 14.3.1, items in the audit store are kept until finality (approx. 1 hour) let currentTimestamp = Date() try await dataStore.setTimestamp(erasureRoot: erasureRoot, timestamp: currentTimestamp) @@ -238,23 +260,79 @@ public final class DataAvailabilityService: ServiceBase2, @unchecked Sendable, O // MARK: - Shard Distribution (CE 137) - /// Distribute shards to validators - /// - Parameters: - /// - shards: The shards to distribute - /// - erasureRoot: The erasure root of the data - /// - validators: The validators to distribute to - /// - Returns: Success status of the distribution - public func distributeShards( - shards _: [Data4104], + public func shardDistribution( + erasureRoot: Data32, + shardIndex: UInt16 + ) async throws { + // Generate request ID + let requestId = try JamEncoder.encode(erasureRoot, shardIndex).blake2b256hash() + do { + // TODO: Fetch shard data from local storage + let (bundleShard, segmentShards) = (Data(), [Data()]) + + // Generate Merkle proof justification + let justification = try await generateJustification( + erasureRoot: erasureRoot, + shardIndex: shardIndex, + bundleShard: bundleShard, + segmentShards: segmentShards + ) + + // Respond with shards + proof + publish(RuntimeEvents.ShardDistributionReceivedResponse( + requestId: requestId, + bundleShard: bundleShard, + segmentShards: segmentShards, + justification: justification + )) + + } catch { + publish(RuntimeEvents.ShardDistributionReceivedResponse( + requestId: requestId, + error: error + )) + } + } + + private func generateJustification( erasureRoot _: Data32, - validators _: [ValidatorIndex] - ) async throws -> Bool { - // TODO: Implement shard distribution to validators - // 1. Determine which shards go to which validators - // 2. Send shards to validators over the network - // 3. Track distribution status - // 4. Return success status - throw DataAvailabilityError.distributionError + shardIndex: UInt16, + bundleShard _: Data, + segmentShards: [Data] + ) async throws -> Justification { + guard !segmentShards.isEmpty else { + throw DataAvailabilityError.emptySegmentShards + } + + // GP T(s,i,H) + let merklePath = Merklization.trace( + segmentShards, + index: Int(shardIndex), + hasher: Blake2b256.self + ) + + // TODO: Got Justification + switch merklePath.count { + case 1: + // 0 ++ Hash + guard case let .right(hash) = merklePath.first! else { + throw DataAvailabilityError.invalidMerklePath + } + return .singleHash(hash) + + case 2: + // 1 ++ Hash ++ Hash + guard case let .right(hash1) = merklePath[0], + case let .right(hash2) = merklePath[1] + else { + throw DataAvailabilityError.invalidMerklePath + } + return .doubleHash(hash1, hash2) + + default: + // TODO: 2 ++ Segment Shard (12 bytes) + return .segmentShard(Data12()) + } } // MARK: - Audit Shard Requests (CE 138) diff --git a/Database/Sources/Database/RocksDBBackend.swift b/Database/Sources/Database/RocksDBBackend.swift index a24883bd..ebd45bd6 100644 --- a/Database/Sources/Database/RocksDBBackend.swift +++ b/Database/Sources/Database/RocksDBBackend.swift @@ -225,7 +225,6 @@ extension RocksDBBackend: BlockchainDataProviderProtocol { // TODO: batch delete try blocks.delete(key: hash) - try guaranteedWorkReports.delete(key: hash) if let block = try await getBlock(hash: hash) { try blockHashByTimeslot.delete(key: block.header.timeslot) } @@ -235,6 +234,11 @@ extension RocksDBBackend: BlockchainDataProviderProtocol { } try blockNumberByHash.delete(key: hash) } + + public func remove(workReportHash: Data32) async throws { + logger.trace("remove() \(workReportHash)") + try guaranteedWorkReports.delete(key: workReportHash) + } } extension RocksDBBackend: StateBackendProtocol { diff --git a/Networking/Tests/MsQuicSwiftTests/NetAddrTests.swift b/Networking/Tests/MsQuicSwiftTests/NetAddrTests.swift index dbd3104c..4bfc9247 100644 --- a/Networking/Tests/MsQuicSwiftTests/NetAddrTests.swift +++ b/Networking/Tests/MsQuicSwiftTests/NetAddrTests.swift @@ -1,7 +1,9 @@ import Foundation -import MsQuicSwift -@testable import Networking import Testing +import TracingUtils +import Utils + +@testable import MsQuicSwift struct NetAddrTests { @Test diff --git a/Node/Package.swift b/Node/Package.swift index e895c5d1..018120c6 100644 --- a/Node/Package.swift +++ b/Node/Package.swift @@ -22,7 +22,7 @@ let package = Package( .package(path: "../TracingUtils"), .package(path: "../Utils"), .package(path: "../Database"), - .package(url: "https://github.com/apple/swift-testing.git", branch: "6.0.3"), + .package(url: "https://github.com/apple/swift-testing.git", branch: "6.0.0"), .package(url: "https://github.com/gh123man/Async-Channels.git", from: "1.0.2"), ], targets: [ diff --git a/Node/Sources/Node/NetworkingProtocol/CommonEphemeral/ShardDistributionMessage.swift b/Node/Sources/Node/NetworkingProtocol/CommonEphemeral/ShardDistributionMessage.swift index dadf3be2..8c802069 100644 --- a/Node/Sources/Node/NetworkingProtocol/CommonEphemeral/ShardDistributionMessage.swift +++ b/Node/Sources/Node/NetworkingProtocol/CommonEphemeral/ShardDistributionMessage.swift @@ -5,9 +5,9 @@ import Utils public struct ShardDistributionMessage: Codable, Sendable, Equatable, Hashable { public var erasureRoot: Data32 - public var shardIndex: UInt32 + public var shardIndex: UInt16 - public init(erasureRoot: Data32, shardIndex: UInt32) { + public init(erasureRoot: Data32, shardIndex: UInt16) { self.erasureRoot = erasureRoot self.shardIndex = shardIndex } diff --git a/Node/Sources/Node/NetworkingProtocol/CommonEphemeral/StateRequest.swift b/Node/Sources/Node/NetworkingProtocol/CommonEphemeral/StateRequest.swift index 9dcd6622..ae97335d 100644 --- a/Node/Sources/Node/NetworkingProtocol/CommonEphemeral/StateRequest.swift +++ b/Node/Sources/Node/NetworkingProtocol/CommonEphemeral/StateRequest.swift @@ -5,8 +5,8 @@ import Utils public struct StateRequest: Codable, Sendable, Equatable, Hashable { public var headerHash: Data32 - public var startKey: Data31 // [u8; 31] - public var endKey: Data31 // [u8; 31] + public var startKey: Data31 + public var endKey: Data31 public var maxSize: UInt32 public init(headerHash: Data32, startKey: Data31, endKey: Data31, maxSize: UInt32) { diff --git a/Node/Sources/Node/NetworkingProtocol/NetworkManager.swift b/Node/Sources/Node/NetworkingProtocol/NetworkManager.swift index af5abf3d..09c945c5 100644 --- a/Node/Sources/Node/NetworkingProtocol/NetworkManager.swift +++ b/Node/Sources/Node/NetworkingProtocol/NetworkManager.swift @@ -412,9 +412,7 @@ struct HandlerImpl: NetworkProtocolHandler { let resp = try await blockchain.waitFor(RuntimeEvents.WorkReportReceivedResponse.self) { event in hash == event.workReportHash } - if case let .failure(error) = resp.result { - throw error // failed - } + _ = try resp.result.get() return [] case let .workReportRequest(message): let workReportRef = try await blockchain.dataProvider.getGuaranteedWorkReport(hash: message.workReportHash) @@ -423,13 +421,16 @@ struct HandlerImpl: NetworkProtocolHandler { } return [] case let .shardDistribution(message): - blockchain - .publish(event: RuntimeEvents.ShardDistributionReceived(erasureRoot: message.erasureRoot, shardIndex: message.shardIndex)) - // TODO: waitfor ShardDistributionReceivedResponse - // let resp = try await blockchain.waitFor(RuntimeEvents.ShardDistributionReceivedResponse.self) { event in - // - // } - return [] + let receivedEvent = RuntimeEvents.ShardDistributionReceived(erasureRoot: message.erasureRoot, shardIndex: message.shardIndex) + let requestId = try receivedEvent.generateRequestId() + + blockchain.publish(event: receivedEvent) + + let resp = try await blockchain.waitFor(RuntimeEvents.ShardDistributionReceivedResponse.self) { event in + requestId == event.requestId + } + let (bundleShard, segmentShards, justification) = try resp.result.get() + return try [JamEncoder.encode(bundleShard, segmentShards, justification)] case let .auditShardRequest(message): blockchain .publish(event: RuntimeEvents.AuditShardRequestReceived(erasureRoot: message.erasureRoot, shardIndex: message.shardIndex)) diff --git a/Node/Tests/NodeTests/NetworkManagerTests.swift b/Node/Tests/NodeTests/NetworkManagerTests.swift index 0a73d343..a314a75e 100644 --- a/Node/Tests/NodeTests/NetworkManagerTests.swift +++ b/Node/Tests/NodeTests/NetworkManagerTests.swift @@ -384,35 +384,6 @@ struct NetworkManagerTests { #expect(data == []) } - @Test - func testHandleShardDistribution() async throws { - let erasureRoot = Data32(repeating: 1) - let shardIndex: UInt32 = 2 - - let distributionMessage = CERequest.shardDistribution(ShardDistributionMessage( - erasureRoot: erasureRoot, - shardIndex: shardIndex - )) - - let message = try ShardDistributionMessage.decode(data: distributionMessage.encode(), config: services.config) - #expect(shardIndex == message.shardIndex) - - _ = try await network.handler.handle(ceRequest: distributionMessage) - - let events = await storeMiddleware.wait() - - let receivedEvent = events.first { - if let event = $0 as? RuntimeEvents.ShardDistributionReceived { - return event.erasureRoot == erasureRoot && event.shardIndex == shardIndex - } - return false - } as? RuntimeEvents.ShardDistributionReceived - - let event = try #require(receivedEvent) - #expect(event.erasureRoot == erasureRoot) - #expect(event.shardIndex == shardIndex) - } - @Test func testHandleAuditShardRequest() async throws { let testErasureRoot = Data32(repeating: 1) diff --git a/RPC/.swiftpm/xcode/xcshareddata/xcschemes/RPC.xcscheme b/RPC/.swiftpm/xcode/xcshareddata/xcschemes/RPC.xcscheme new file mode 100644 index 00000000..0d6c1c65 --- /dev/null +++ b/RPC/.swiftpm/xcode/xcshareddata/xcschemes/RPC.xcscheme @@ -0,0 +1,67 @@ + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/RPC/.swiftpm/xcode/xcshareddata/xcschemes/RPCTests.xcscheme b/RPC/.swiftpm/xcode/xcshareddata/xcschemes/RPCTests.xcscheme new file mode 100644 index 00000000..4c208cb6 --- /dev/null +++ b/RPC/.swiftpm/xcode/xcshareddata/xcschemes/RPCTests.xcscheme @@ -0,0 +1,54 @@ + + + + + + + + + + + + + + + + + + + + + diff --git a/boka.xcodeproj/project.xcworkspace/xcshareddata/swiftpm/Package.resolved b/boka.xcodeproj/project.xcworkspace/xcshareddata/swiftpm/Package.resolved index d625cece..df1b92eb 100644 --- a/boka.xcodeproj/project.xcworkspace/xcshareddata/swiftpm/Package.resolved +++ b/boka.xcodeproj/project.xcworkspace/xcshareddata/swiftpm/Package.resolved @@ -1,5 +1,5 @@ { - "originHash" : "29f4f0920a303b91d3ab3d9a73e42e497b83eeafb0e20a02a020e5dbbc9bd47c", + "originHash" : "b24bc350cefbc4a11375aec1aa77c5f0b7f73e4db0612fec8566457591c3980d", "pins" : [ { "identity" : "async-channels", @@ -301,7 +301,7 @@ { "identity" : "swift-syntax", "kind" : "remoteSourceControl", - "location" : "https://github.com/apple/swift-syntax.git", + "location" : "https://github.com/swiftlang/swift-syntax.git", "state" : { "revision" : "0687f71944021d616d34d922343dcef086855920", "version" : "600.0.1" @@ -321,8 +321,8 @@ "kind" : "remoteSourceControl", "location" : "https://github.com/apple/swift-testing.git", "state" : { - "branch" : "0.10.0", - "revision" : "69d59cfc76e5daf498ca61f5af409f594768eef9" + "branch" : "6.0.0", + "revision" : "9aa8076dff01b66bcff9335cde02380d59acacc0" } }, { @@ -330,8 +330,8 @@ "kind" : "remoteSourceControl", "location" : "https://github.com/vapor/vapor.git", "state" : { - "revision" : "87b0edd2633c35de543cb7573efe5fbf456181bc", - "version" : "4.114.1" + "branch" : "4.107.0", + "revision" : "ec23f07eb2eda35f6f179a8c4607769287073f8d" } }, {