Skip to content

Commit 36e5356

Browse files
authored
[CHRON-9324] Send RabbitMQ error message on transaction error (#37)
* Send RabbitMQ error message on transaction error * Add transaction error type field * Use operation id as a correlation id
1 parent fbcdd03 commit 36e5356

File tree

3 files changed

+64
-24
lines changed

3 files changed

+64
-24
lines changed

rabbitmq/src/client.rs

Lines changed: 56 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,12 @@ use common::try_spawn;
44
use enclose::enclose;
55
use ethcore::client::{BlockChainClient, BlockId, ChainNotify, ChainRouteType, NewBlocks};
66
use ethcore::miner;
7-
use failure::{Error, ResultExt};
8-
use futures::future::lazy;
7+
use ethereum_types::H256;
8+
use failure::{format_err, Error};
9+
use futures::future::{err, lazy};
910
use handler::{Handler, Sender};
1011
use parity_runtime::Executor;
11-
use rabbitmq_adaptor::{ConsumerResult, RabbitConnection, RabbitExt};
12+
use rabbitmq_adaptor::{ConsumerResult, DeliveryExt, RabbitConnection, RabbitExt};
1213
use serde::Deserialize;
1314
use serde_json;
1415
use std::sync::Arc;
@@ -21,7 +22,10 @@ use DEFAULT_REPLY_QUEUE;
2122
use LOG_TARGET;
2223
use NEW_BLOCK_EXCHANGE_NAME;
2324
use NEW_BLOCK_ROUTING_KEY;
25+
use OPERATION_ID;
2426
use PUBLIC_TRANSACTION_QUEUE;
27+
use TX_ERROR_EXCHANGE_NAME;
28+
use TX_ERROR_ROUTING_KEY;
2529

2630
#[derive(Debug, Default, Clone, PartialEq)]
2731
pub struct RabbitMqConfig {
@@ -38,6 +42,20 @@ pub struct PubSubClient<C> {
3842
#[derive(Deserialize)]
3943
struct TransactionMessage {
4044
pub data: Bytes,
45+
pub transaction_hash: H256,
46+
}
47+
48+
#[derive(Serialize)]
49+
struct TransactionErrorMessage {
50+
pub transaction_hash: H256,
51+
pub error_message: String,
52+
pub error_type: ErrorType,
53+
}
54+
55+
#[derive(Serialize)]
56+
pub enum ErrorType {
57+
LocalTransactionError,
58+
TransactionRejected,
4159
}
4260

4361
impl<C: 'static + miner::BlockChainClient + BlockChainClient> PubSubClient<C> {
@@ -58,21 +76,42 @@ impl<C: 'static + miner::BlockChainClient + BlockChainClient> PubSubClient<C> {
5876
.clone()
5977
.register_consumer(
6078
PUBLIC_TRANSACTION_QUEUE.to_string(),
61-
enclose!(() move |message| {
62-
Box::new(std::str::from_utf8(&message.data)
63-
.map_err(Error::from)
64-
.and_then(|payload| {
65-
serde_json::from_str(payload)
66-
.context("Could not deserialize AMQP message payload")
67-
.map_err(Error::from)
68-
})
69-
.and_then(|transaction_message: TransactionMessage| {
70-
sender_handler.send_transaction(transaction_message.data)
71-
.context(format!("Failed to send transaction"))
72-
.map_err(Error::from)
73-
})
79+
enclose!((rabbit) move |message| {
80+
let operation_id = message.get_header(OPERATION_ID);
81+
if operation_id.is_none() {
82+
return Box::new(err(format_err!("Missing protocol-id header")));
83+
}
84+
let operation_id = operation_id.unwrap();
85+
let payload = std::str::from_utf8(&message.data);
86+
if payload.is_err() {
87+
return Box::new(err(format_err!("Could not parse AMQP message")));
88+
}
89+
let payload = payload.unwrap();
90+
let transaction_message = serde_json::from_str(payload);
91+
if transaction_message.is_err() {
92+
return Box::new(err(format_err!("Could not deserialize AMQP message payload")));
93+
}
94+
let transaction_message: TransactionMessage = transaction_message.unwrap();
95+
let transaction_hash = transaction_message.transaction_hash;
96+
Box::new(sender_handler.send_transaction(transaction_message.data)
97+
.map(|_| ())
7498
.into_future()
75-
.map(|_| ConsumerResult::ACK))
99+
.or_else(enclose!((rabbit) move |error| {
100+
let tx_error = TransactionErrorMessage {
101+
transaction_hash,
102+
error_message: format!("{}", error),
103+
error_type: ErrorType::LocalTransactionError,
104+
};
105+
let serialized_message = serde_json::to_string(&tx_error).unwrap();
106+
rabbit.clone().publish(
107+
TX_ERROR_EXCHANGE_NAME.to_string(),
108+
TX_ERROR_ROUTING_KEY.to_string(),
109+
serialized_message.into(),
110+
vec![(OPERATION_ID.to_string(), operation_id)],
111+
).map(|_| ())
112+
}))
113+
.map(|_| ConsumerResult::ACK)
114+
)
76115
}),
77116
)
78117
.map_err(Error::from)

rabbitmq/src/handler.rs

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,11 @@
1-
use failure::{Error, ResultExt};
21
use std::sync::Arc;
32

43
use ethcore::client::BlockChainClient;
54
use ethcore::miner::{self, MinerService};
65
use ethereum_types::H256;
7-
use hex;
86
use rlp::Rlp;
97
use common_types::transaction::{PendingTransaction, SignedTransaction};
8+
use common_types::transaction::Error as TransactionError;
109
use types::Bytes;
1110

1211
/// A Sender which uses references to a client and miner in order to send transactions
@@ -24,20 +23,19 @@ impl<C, M> Sender<C, M> {
2423
}
2524

2625
pub trait Handler: Sync + Send {
27-
fn send_transaction(&self, payload: Bytes) -> Result<H256, Error>;
26+
fn send_transaction(&self, payload: Bytes) -> Result<H256, TransactionError>;
2827
}
2928

3029
impl<C: miner::BlockChainClient + BlockChainClient, M: MinerService> Handler for Sender<C, M> {
31-
fn send_transaction(&self, raw: Bytes) -> Result<H256, Error> {
30+
fn send_transaction(&self, raw: Bytes) -> Result<H256, TransactionError> {
3231
let signed_transaction = Rlp::new(&raw.into_vec())
3332
.as_val()
34-
.map_err(Error::from)
35-
.and_then(|tx| SignedTransaction::new(tx).map_err(Error::from))?;
33+
.map_err(TransactionError::from)
34+
.and_then(|tx| SignedTransaction::new(tx).map_err(TransactionError::from))?;
3635
let pending_transaction: PendingTransaction = signed_transaction.into();
3736
let hash = pending_transaction.transaction.hash();
3837
self.miner
3938
.import_claimed_local_transaction(&*self.client, pending_transaction, false)
40-
.map_err(Error::from)
4139
.map(|_| hash)
4240
}
4341
}

rabbitmq/src/lib.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,4 +36,7 @@ const DEFAULT_REPLY_QUEUE: &'static str = "BlockchainInterface.default";
3636
const LOG_TARGET: &'static str = "rabbitmq";
3737
const NEW_BLOCK_EXCHANGE_NAME: &'static str = "BlockchainInterface.Output";
3838
const NEW_BLOCK_ROUTING_KEY: &'static str = "interface.in.new-block";
39+
const OPERATION_ID: &'static str = "operation-id";
3940
const PUBLIC_TRANSACTION_QUEUE: &'static str = "BlockchainInterface.public-tx";
41+
const TX_ERROR_EXCHANGE_NAME: &'static str = "BlockchainInterface.TransactionError";
42+
const TX_ERROR_ROUTING_KEY: &'static str = "interface.out.public-tx.error";

0 commit comments

Comments
 (0)