Skip to content
This repository was archived by the owner on Mar 23, 2021. It is now read-only.

Commit 0995f99

Browse files
bors[bot]Tobin C. Harding
andauthored
Merge #1733
1733: Refactor Spawn r=tcharding a=tcharding The `Spawn` trait is implemented on `LedgerConnectors`, there are a bunch of refactorings that can be done, this patch does them all. Some highlights are: - Move `http_api::Dependencies` to `crate::Facade` - Inline `LedgerConnectors` fields into `Facade` - Create `init_accepted_swap()` that does the state store insert, initiates the state_machine for this swap, and spawns the swap Resolves: #1679 Co-authored-by: Tobin C. Harding <tobin.harding@coblox.tech>
2 parents 3d8f817 + e2a405f commit 0995f99

File tree

18 files changed

+386
-326
lines changed

18 files changed

+386
-326
lines changed

cnd/src/http_api/mod.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,11 @@ pub mod routes;
33
#[macro_use]
44
pub mod impl_serialize_http;
55
pub mod action;
6-
mod dependencies;
76
mod ethereum_network;
87
mod problem;
98
mod swap_resource;
109

1110
pub use self::{
12-
dependencies::*,
1311
problem::*,
1412
swap_resource::{SwapParameters, SwapResource, SwapStatus},
1513
};

cnd/src/http_api/route_factory.rs

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,10 @@ use crate::{
44
http_api,
55
network::{Network, SendRequest},
66
seed::SwapSeed,
7-
swap_protocols::{
8-
self,
9-
rfc003::{state_store::StateStore, Spawn},
10-
SwapId,
11-
},
7+
swap_protocols::{self, rfc003::state_store::StateStore, LedgerEventsCreator, SwapId},
128
};
139
use libp2p::PeerId;
10+
use tokio::executor::Executor;
1411
use warp::{self, filters::BoxedFilter, Filter, Reply};
1512

1613
pub const RFC003: &str = "rfc003";
@@ -26,12 +23,13 @@ pub fn new_action_link(id: &SwapId, action: &str) -> String {
2623
pub fn create<
2724
D: Clone
2825
+ StateStore
26+
+ Executor
2927
+ Network
3028
+ SendRequest
31-
+ Spawn
3229
+ SwapSeed
3330
+ DetermineTypes
3431
+ Retrieve
32+
+ LedgerEventsCreator
3533
+ Saver,
3634
>(
3735
peer_id: PeerId,

cnd/src/http_api/routes/rfc003/handlers/action.rs

Lines changed: 19 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -12,26 +12,35 @@ use crate::{
1212
network::Network,
1313
seed::SwapSeed,
1414
swap_protocols::{
15+
self,
1516
actions::Actions,
1617
rfc003::{
1718
self,
1819
actions::{Action, ActionKind},
1920
bob::State,
2021
messages::{Decision, IntoAcceptMessage},
2122
state_store::StateStore,
22-
Spawn,
2323
},
24-
SwapId,
24+
LedgerEventsCreator, SwapId,
2525
},
2626
};
2727
use anyhow::Context;
28-
use futures::Stream;
2928
use libp2p_comit::frame::Response;
3029
use std::fmt::Debug;
30+
use tokio::executor::Executor;
3131
use warp::http;
3232

3333
#[allow(clippy::unit_arg, clippy::let_unit_value, clippy::cognitive_complexity)]
34-
pub async fn handle_action<D: StateStore + Network + Spawn + SwapSeed + Saver + DetermineTypes>(
34+
pub async fn handle_action<
35+
D: StateStore
36+
+ Network
37+
+ SwapSeed
38+
+ Saver
39+
+ DetermineTypes
40+
+ LedgerEventsCreator
41+
+ Executor
42+
+ Clone,
43+
>(
3544
method: http::Method,
3645
swap_id: SwapId,
3746
action_kind: ActionKind,
@@ -76,16 +85,12 @@ pub async fn handle_action<D: StateStore + Network + Spawn + SwapSeed + Saver +
7685
})?;
7786

7887
let swap_request = state.request();
79-
let seed = dependencies.swap_seed(swap_id);
80-
let state = State::accepted(swap_request.clone(), accept_message, seed);
81-
StateStore::insert(&dependencies, swap_id, state);
82-
83-
let receiver = Spawn::spawn(&dependencies, swap_request, accept_message);
84-
85-
tokio::spawn(receiver.for_each(move |update| {
86-
StateStore::update::<State<AL, BL, AA, BA>>(&dependencies, &swap_id, update);
87-
Ok(())
88-
}));
88+
swap_protocols::init_accepted_swap(
89+
&dependencies,
90+
swap_request,
91+
accept_message,
92+
types.role,
93+
)?;
8994

9095
Ok(ActionResponseBody::None)
9196
}

cnd/src/http_api/routes/rfc003/handlers/post_swap.rs

Lines changed: 38 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -5,26 +5,38 @@ use crate::{
55
network::{DialInformation, SendRequest},
66
seed::SwapSeed,
77
swap_protocols::{
8+
self,
89
asset::Asset,
910
ledger,
1011
rfc003::{
11-
self, alice::State, create_ledger_events::CreateLedgerEvents, state_store::StateStore,
12-
Accept, Ledger, Request, SecretHash, SecretSource, Spawn,
12+
self, alice::State, state_store::StateStore, Accept, Decline, Ledger, Request,
13+
SecretHash, SecretSource,
1314
},
14-
HashFunction, LedgerConnectors, Role, SwapId,
15+
HashFunction, LedgerEventsCreator, Role, SwapId,
1516
},
1617
timestamp::Timestamp,
18+
CreateLedgerEvents,
1719
};
18-
use futures::Stream;
20+
use anyhow::Context;
21+
use futures::Future;
1922
use futures_core::{
2023
compat::Future01CompatExt,
2124
future::{FutureExt, TryFutureExt},
2225
};
2326
use serde::{Deserialize, Serialize};
2427
use std::str::FromStr;
28+
use tokio::executor::Executor;
2529

2630
pub async fn handle_post_swap<
27-
D: Clone + StateStore + Save<Swap> + SendRequest + Spawn + SwapSeed + Saver,
31+
D: Clone
32+
+ Executor
33+
+ StateStore
34+
+ Save<Swap>
35+
+ SendRequest
36+
+ SwapSeed
37+
+ Saver
38+
+ Clone
39+
+ LedgerEventsCreator,
2840
>(
2941
dependencies: D,
3042
body: serde_json::Value,
@@ -196,14 +208,18 @@ async fn initiate_request<D, AL, BL, AA, BA>(
196208
swap_request: rfc003::Request<AL, BL, AA, BA>,
197209
) -> anyhow::Result<()>
198210
where
199-
LedgerConnectors: CreateLedgerEvents<AL, AA> + CreateLedgerEvents<BL, BA>,
200211
D: StateStore
212+
+ Executor
201213
+ SendRequest
202-
+ Spawn
203214
+ SwapSeed
204-
+ Saver
205215
+ Save<Request<AL, BL, AA, BA>>
206-
+ Save<Accept<AL, BL>>,
216+
+ Save<Accept<AL, BL>>
217+
+ Save<Swap>
218+
+ Save<Decline>
219+
+ LedgerEventsCreator
220+
+ CreateLedgerEvents<AL, AA>
221+
+ CreateLedgerEvents<BL, BA>
222+
+ Clone,
207223
AL: Ledger,
208224
BL: Ledger,
209225
AA: Asset,
@@ -224,41 +240,32 @@ where
224240
.send_request(peer.clone(), swap_request.clone())
225241
.compat()
226242
.await
227-
.map_err(|e| {
228-
log::error!(
229-
"Failed to send swap request to {} because {:?}",
230-
peer.clone(),
231-
e
232-
);
233-
})?;
243+
.with_context(|| format!("Failed to send swap request to {}", peer.clone()))?;
234244

235245
match response {
236246
Ok(accept) => {
237-
let state = State::accepted(swap_request.clone(), accept, seed);
238-
StateStore::insert(&dependencies, id, state.clone());
239-
Save::save(&dependencies, accept)
240-
.await
241-
.expect("failed to save message to db");
242-
243-
let receiver = Spawn::spawn(&dependencies, swap_request, accept);
244-
tokio::spawn(receiver.for_each(move |update| {
245-
StateStore::update::<State<AL, BL, AA, BA>>(&dependencies, &id, update);
246-
Ok(())
247-
}));
247+
Save::save(&dependencies, accept).await?;
248+
249+
swap_protocols::init_accepted_swap(
250+
&dependencies,
251+
swap_request,
252+
accept,
253+
Role::Alice,
254+
)?;
248255
}
249256
Err(decline) => {
250257
log::info!("Swap declined: {:?}", decline);
251258
let state = State::declined(swap_request.clone(), decline.clone(), seed);
252259
StateStore::insert(&dependencies, id, state.clone());
253-
Save::save(&dependencies, decline.clone())
254-
.await
255-
.expect("failed to save message to db");
260+
Save::save(&dependencies, decline.clone()).await?;
256261
}
257262
};
258263
Ok(())
259264
}
260265
};
261-
tokio::spawn(future.boxed().compat());
266+
tokio::spawn(future.boxed().compat().map_err(|e: anyhow::Error| {
267+
log::error!("{:?}", e);
268+
}));
262269
Ok(())
263270
}
264271

cnd/src/http_api/routes/rfc003/mod.rs

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,8 @@ use crate::{
1616
network::{Network, SendRequest},
1717
seed::SwapSeed,
1818
swap_protocols::{
19-
rfc003::{actions::ActionKind, state_store::StateStore, Spawn},
20-
SwapId,
19+
rfc003::{actions::ActionKind, state_store::StateStore},
20+
LedgerEventsCreator, SwapId,
2121
},
2222
};
2323
use futures::Future;
@@ -27,9 +27,19 @@ use warp::{http, Rejection, Reply};
2727

2828
pub use self::swap_state::{LedgerState, SwapCommunication, SwapCommunicationState, SwapState};
2929
use crate::{db::Saver, http_api::problem};
30+
use tokio::executor::Executor;
3031

3132
#[allow(clippy::needless_pass_by_value)]
32-
pub fn post_swap<D: Clone + StateStore + Save<Swap> + SendRequest + Spawn + SwapSeed + Saver>(
33+
pub fn post_swap<
34+
D: Clone
35+
+ StateStore
36+
+ Executor
37+
+ Save<Swap>
38+
+ SendRequest
39+
+ SwapSeed
40+
+ Saver
41+
+ LedgerEventsCreator,
42+
>(
3343
dependencies: D,
3444
body: serde_json::Value,
3545
) -> impl Future<Item = impl Reply, Error = Rejection> {
@@ -60,7 +70,17 @@ pub fn get_swap<D: DetermineTypes + Retrieve + StateStore>(
6070
}
6171

6272
#[allow(clippy::needless_pass_by_value)]
63-
pub fn action<D: DetermineTypes + Retrieve + StateStore + Network + Spawn + SwapSeed + Saver>(
73+
pub fn action<
74+
D: DetermineTypes
75+
+ Retrieve
76+
+ StateStore
77+
+ Executor
78+
+ Clone
79+
+ Network
80+
+ SwapSeed
81+
+ Saver
82+
+ LedgerEventsCreator,
83+
>(
6484
method: http::Method,
6585
id: SwapId,
6686
action_kind: ActionKind,

cnd/src/lib.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,10 @@ pub mod spectral_ext;
3535
pub mod swap_protocols;
3636
pub mod timestamp;
3737

38+
use crate::swap_protocols::{
39+
asset::Asset,
40+
rfc003::{events::LedgerEvents, Ledger},
41+
};
3842
use anyhow::Context;
3943
use directories::ProjectDirs;
4044
use std::path::{Path, PathBuf};
@@ -63,3 +67,7 @@ pub fn default_config_path() -> anyhow::Result<PathBuf> {
6367
pub fn data_dir() -> Option<PathBuf> {
6468
ProjectDirs::from("", "", "comit").map(|proj_dirs| proj_dirs.data_dir().to_path_buf())
6569
}
70+
71+
pub trait CreateLedgerEvents<L: Ledger, A: Asset> {
72+
fn create_ledger_events(&self) -> Box<dyn LedgerEvents<L, A>>;
73+
}

cnd/src/load_swaps.rs

Lines changed: 28 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -1,71 +1,48 @@
11
#![allow(clippy::type_repetition_in_bounds)]
22
use crate::{
3-
db::{AcceptedSwap, DetermineTypes, LoadAcceptedSwap, Retrieve, Sqlite},
4-
seed::Seed,
3+
db::{DetermineTypes, LoadAcceptedSwap, Retrieve},
4+
ethereum::{Erc20Token, EtherQuantity},
5+
seed::SwapSeed,
56
swap_protocols::{
6-
ledger::LedgerConnectors,
7-
rfc003::{
8-
state_store::{InMemoryStateStore, StateStore},
9-
Spawn,
10-
},
7+
self,
8+
ledger::{Bitcoin, Ethereum},
9+
rfc003::state_store::StateStore,
10+
LedgerEventsCreator,
1111
},
1212
};
13-
use futures::Stream;
14-
use std::sync::Arc;
13+
use tokio::executor::Executor;
1514

1615
#[allow(clippy::cognitive_complexity)]
17-
pub async fn load_swaps_from_database(
18-
ledger_events: LedgerConnectors,
19-
state_store: Arc<InMemoryStateStore>,
20-
seed: Seed,
21-
db: Sqlite,
22-
) -> anyhow::Result<()> {
16+
pub async fn load_swaps_from_database<D>(dependencies: D) -> anyhow::Result<()>
17+
where
18+
D: StateStore
19+
+ Executor
20+
+ Clone
21+
+ SwapSeed
22+
+ LedgerEventsCreator
23+
+ Retrieve
24+
+ DetermineTypes
25+
+ LoadAcceptedSwap<Bitcoin, Ethereum, bitcoin::Amount, EtherQuantity>
26+
+ LoadAcceptedSwap<Ethereum, Bitcoin, EtherQuantity, bitcoin::Amount>
27+
+ LoadAcceptedSwap<Bitcoin, Ethereum, bitcoin::Amount, Erc20Token>
28+
+ LoadAcceptedSwap<Ethereum, Bitcoin, Erc20Token, bitcoin::Amount>,
29+
{
2330
log::debug!("loading swaps from database ...");
2431

25-
for swap in db.all().await?.iter() {
32+
for swap in Retrieve::all(&dependencies).await?.iter() {
2633
let swap_id = swap.swap_id;
2734
log::debug!("got swap from database: {}", swap_id);
2835

29-
let types = db.determine_types(&swap_id).await?;
36+
let types = DetermineTypes::determine_types(&dependencies, &swap_id).await?;
3037

3138
with_swap_types!(types, {
32-
let accepted: Result<AcceptedSwap<AL, BL, AA, BA>, anyhow::Error> =
33-
db.load_accepted_swap(&swap_id).await;
39+
let accepted =
40+
LoadAcceptedSwap::<AL, BL, AA, BA>::load_accepted_swap(&dependencies, &swap_id)
41+
.await;
3442

3543
match accepted {
3644
Ok((request, accept, _at)) => {
37-
match types.role {
38-
Role::Alice => {
39-
let state = alice::State::accepted(request.clone(), accept, seed);
40-
state_store.insert(swap_id, state);
41-
42-
let receiver = ledger_events.spawn(request, accept);
43-
44-
tokio::spawn(receiver.for_each({
45-
let state_store = state_store.clone();
46-
move |update| {
47-
state_store
48-
.update::<alice::State<AL, BL, AA, BA>>(&swap_id, update);
49-
Ok(())
50-
}
51-
}));
52-
}
53-
Role::Bob => {
54-
let state = bob::State::accepted(request.clone(), accept, seed);
55-
state_store.insert(swap_id, state);
56-
57-
let receiver = ledger_events.spawn(request, accept);
58-
59-
tokio::spawn(receiver.for_each({
60-
let state_store = state_store.clone();
61-
move |update| {
62-
state_store
63-
.update::<bob::State<AL, BL, AA, BA>>(&swap_id, update);
64-
Ok(())
65-
}
66-
}));
67-
}
68-
};
45+
swap_protocols::init_accepted_swap(&dependencies, request, accept, types.role)?;
6946
}
7047
Err(e) => log::error!("failed to load swap: {}, continuing ...", e),
7148
};

0 commit comments

Comments
 (0)