|
1 |
| -pub mod send_request; |
2 | 1 | pub mod transport;
|
3 | 2 |
|
4 |
| -pub use send_request::*; |
5 |
| - |
6 | 3 | use crate::{
|
7 | 4 | btsieve::{bitcoin::BitcoindConnector, ethereum::Web3Connector},
|
8 | 5 | db::{Save, Saver, Sqlite, Swap},
|
@@ -31,7 +28,7 @@ use libp2p::{
|
31 | 28 | Multiaddr, NetworkBehaviour, PeerId, Swarm, Transport,
|
32 | 29 | };
|
33 | 30 | use libp2p_comit::{
|
34 |
| - frame::{OutboundRequest, Response, ValidatedInboundRequest}, |
| 31 | + frame::{self, OutboundRequest, Response, ValidatedInboundRequest}, |
35 | 32 | BehaviourOutEvent, Comit, PendingInboundRequest,
|
36 | 33 | };
|
37 | 34 | use std::{
|
@@ -79,6 +76,23 @@ impl Display for DialInformation {
|
79 | 76 | }
|
80 | 77 | }
|
81 | 78 |
|
| 79 | +#[derive(Debug, thiserror::Error)] |
| 80 | +pub enum RequestError { |
| 81 | + #[error("peer node had an internal error while processing the request")] |
| 82 | + InternalError, |
| 83 | + #[error("peer node produced an invalid response")] |
| 84 | + InvalidResponse, |
| 85 | + #[error("failed to establish a new connection to make the request")] |
| 86 | + Connecting(io::ErrorKind), |
| 87 | + #[error("unable to send the data on the existing connection")] |
| 88 | + Connection, |
| 89 | +} |
| 90 | + |
| 91 | +#[derive(Debug, serde::Deserialize)] |
| 92 | +pub struct Reason { |
| 93 | + pub value: SwapDeclineReason, |
| 94 | +} |
| 95 | + |
82 | 96 | impl<TSubstream> ComitNode<TSubstream> {
|
83 | 97 | pub fn new(
|
84 | 98 | bitcoin_connector: BitcoindConnector,
|
@@ -340,10 +354,22 @@ where
|
340 | 354 | Ok(())
|
341 | 355 | }
|
342 | 356 |
|
| 357 | +/// Defines all the operations the Comit Node can perform in regards to the |
| 358 | +/// Comit network. |
| 359 | +/// |
| 360 | +/// Ideally, this trait would not be necessary and we would instead have one |
| 361 | +/// trait per function. Unfortunately, an instance of `Swarm` is very hard to |
| 362 | +/// name (see the complex traits bound below). To avoid this kind of code 4 |
| 363 | +/// times, we bundle all these methods up into one trait. |
343 | 364 | pub trait Network: Send + Sync + 'static {
|
344 | 365 | fn comit_peers(&self) -> Box<dyn Iterator<Item = (PeerId, Vec<Multiaddr>)> + Send + 'static>;
|
345 | 366 | fn listen_addresses(&self) -> Vec<Multiaddr>;
|
346 | 367 | fn pending_request_for(&self, swap: SwapId) -> Option<oneshot::Sender<Response>>;
|
| 368 | + fn send_request<AL: rfc003::Ledger, BL: rfc003::Ledger, AA: Asset, BA: Asset>( |
| 369 | + &self, |
| 370 | + peer_identity: DialInformation, |
| 371 | + request: rfc003::Request<AL, BL, AA, BA>, |
| 372 | + ) -> Box<dyn Future<Item = rfc003::Response<AL, BL>, Error = RequestError> + Send>; |
347 | 373 | }
|
348 | 374 |
|
349 | 375 | impl<
|
@@ -380,6 +406,85 @@ where
|
380 | 406 |
|
381 | 407 | response_channels.remove(&swap)
|
382 | 408 | }
|
| 409 | + |
| 410 | + fn send_request<AL: rfc003::Ledger, BL: rfc003::Ledger, AA: Asset, BA: Asset>( |
| 411 | + &self, |
| 412 | + dial_information: DialInformation, |
| 413 | + request: rfc003::Request<AL, BL, AA, BA>, |
| 414 | + ) -> Box<dyn Future<Item = rfc003::Response<AL, BL>, Error = RequestError> + Send> { |
| 415 | + let id = request.swap_id; |
| 416 | + let request = build_outbound_request(request) |
| 417 | + .expect("constructing a frame::OutoingRequest should never fail!"); |
| 418 | + |
| 419 | + let response = { |
| 420 | + let mut swarm = self.lock().unwrap(); |
| 421 | + log::debug!( |
| 422 | + "Making swap request to {}: {:?}", |
| 423 | + dial_information.clone(), |
| 424 | + request |
| 425 | + ); |
| 426 | + |
| 427 | + swarm.send_request(dial_information.clone(), request) |
| 428 | + }; |
| 429 | + |
| 430 | + let response = |
| 431 | + response.then(move |result| match result { |
| 432 | + Ok(mut response) => { |
| 433 | + let decision = response |
| 434 | + .take_header("decision") |
| 435 | + .map(Decision::from_header) |
| 436 | + .map_or(Ok(None), |x| x.map(Some)) |
| 437 | + .map_err(|e| { |
| 438 | + log::error!( |
| 439 | + "Could not deserialize header in response {:?}: {}", |
| 440 | + response, |
| 441 | + e, |
| 442 | + ); |
| 443 | + RequestError::InvalidResponse |
| 444 | + })?; |
| 445 | + |
| 446 | + match decision { |
| 447 | + Some(Decision::Accepted) => { |
| 448 | + match serde_json::from_value::< |
| 449 | + rfc003::messages::AcceptResponseBody<AL, BL>, |
| 450 | + >(response.body().clone()) |
| 451 | + { |
| 452 | + Ok(body) => Ok(Ok(rfc003::Accept { |
| 453 | + swap_id: id, |
| 454 | + beta_ledger_refund_identity: body.beta_ledger_refund_identity, |
| 455 | + alpha_ledger_redeem_identity: body.alpha_ledger_redeem_identity, |
| 456 | + })), |
| 457 | + Err(_e) => Err(RequestError::InvalidResponse), |
| 458 | + } |
| 459 | + } |
| 460 | + |
| 461 | + Some(Decision::Declined) => { |
| 462 | + match serde_json::from_value::<rfc003::messages::DeclineResponseBody>( |
| 463 | + response.body().clone(), |
| 464 | + ) { |
| 465 | + Ok(body) => Ok(Err(rfc003::Decline { |
| 466 | + swap_id: id, |
| 467 | + reason: body.reason, |
| 468 | + })), |
| 469 | + Err(_e) => Err(RequestError::InvalidResponse), |
| 470 | + } |
| 471 | + } |
| 472 | + |
| 473 | + None => Err(RequestError::InvalidResponse), |
| 474 | + } |
| 475 | + } |
| 476 | + Err(e) => { |
| 477 | + log::error!( |
| 478 | + "Unable to request over connection {:?}:{:?}", |
| 479 | + dial_information.clone(), |
| 480 | + e |
| 481 | + ); |
| 482 | + Err(RequestError::Connection) |
| 483 | + } |
| 484 | + }); |
| 485 | + |
| 486 | + Box::new(response) |
| 487 | + } |
383 | 488 | }
|
384 | 489 |
|
385 | 490 | impl<TSubstream> NetworkBehaviourEventProcess<BehaviourOutEvent> for ComitNode<TSubstream> {
|
@@ -460,3 +565,32 @@ fn rfc003_swap_request<AL: rfc003::Ledger, BL: rfc003::Ledger, AA: Asset, BA: As
|
460 | 565 | secret_hash: body.secret_hash,
|
461 | 566 | }
|
462 | 567 | }
|
| 568 | + |
| 569 | +fn build_outbound_request<AL: rfc003::Ledger, BL: rfc003::Ledger, AA: Asset, BA: Asset>( |
| 570 | + request: rfc003::Request<AL, BL, AA, BA>, |
| 571 | +) -> Result<frame::OutboundRequest, serde_json::Error> { |
| 572 | + let alpha_ledger_refund_identity = request.alpha_ledger_refund_identity; |
| 573 | + let beta_ledger_redeem_identity = request.beta_ledger_redeem_identity; |
| 574 | + let alpha_expiry = request.alpha_expiry; |
| 575 | + let beta_expiry = request.beta_expiry; |
| 576 | + let secret_hash = request.secret_hash; |
| 577 | + let protocol = SwapProtocol::Rfc003(request.hash_function); |
| 578 | + |
| 579 | + Ok(frame::OutboundRequest::new("SWAP") |
| 580 | + .with_header("id", request.swap_id.to_header()?) |
| 581 | + .with_header("alpha_ledger", request.alpha_ledger.into().to_header()?) |
| 582 | + .with_header("beta_ledger", request.beta_ledger.into().to_header()?) |
| 583 | + .with_header("alpha_asset", request.alpha_asset.into().to_header()?) |
| 584 | + .with_header("beta_asset", request.beta_asset.into().to_header()?) |
| 585 | + .with_header("protocol", protocol.to_header()?) |
| 586 | + .with_body(serde_json::to_value(rfc003::messages::RequestBody::< |
| 587 | + AL, |
| 588 | + BL, |
| 589 | + > { |
| 590 | + alpha_ledger_refund_identity, |
| 591 | + beta_ledger_redeem_identity, |
| 592 | + alpha_expiry, |
| 593 | + beta_expiry, |
| 594 | + secret_hash, |
| 595 | + })?)) |
| 596 | +} |
0 commit comments