Skip to content

Commit 5ec179a

Browse files
committed
connectd: Implement sending of start_batch
Implement the sending of `start_batch` and `protocol_batch_element` from `channeld` to `connectd`. Each real peer wire message is prefixed with `protocol_batch_element` so connectd can know the size of the message that were batched together. `connectd` intercepts `protocol_batch_element` messages and eats them (doesn’t forward them to peer) to get individual messages out of the batch. It needs this to be able to encrypt them individiaully. Afterwards it recombines the now encrypted messages into a single message to send over the wire to the peer. `channeld` remains responsible for making `start_batch` the first message of the message bundle.
1 parent b812360 commit 5ec179a

File tree

2 files changed

+140
-5
lines changed

2 files changed

+140
-5
lines changed

channeld/channeld.c

Lines changed: 74 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1232,6 +1232,78 @@ static s64 sats_diff(struct amount_sat a, struct amount_sat b)
12321232
return (s64)a.satoshis - (s64)b.satoshis; /* Raw: splicing numbers can wrap! */
12331233
}
12341234

1235+
static void send_message_batch(struct peer *peer, u8 **msgs)
1236+
{
1237+
size_t size;
1238+
size_t hdr_size = tal_bytelen(towire_protocol_batch_element(tmpctx,
1239+
&peer->channel_id,
1240+
0));
1241+
u8 *batch_msg, *final_msg, *final_msg_ptr;
1242+
struct tlv_start_batch_tlvs *tlvs;
1243+
1244+
assert(tal_count(msgs) > 0);
1245+
1246+
/* When sending one message, no batching is required */
1247+
if (tal_count(msgs) == 1) {
1248+
peer_write(peer->pps, msgs[0]);
1249+
return;
1250+
}
1251+
1252+
/* We prefix each message with an interal wire type,
1253+
* protocol_batch_element. connectd will eat each message so they don't
1254+
* actually go out to the peer. It's just so connectd can chop up the
1255+
* message batch back out into individual messages. */
1256+
1257+
/* We start by calculating the total size */
1258+
size = 0;
1259+
1260+
/* Build the `start_batch` msg now so know it's size */
1261+
tlvs = tlv_start_batch_tlvs_new(tmpctx);
1262+
tlvs->batch_info = tal(tlvs, u16);
1263+
*tlvs->batch_info = WIRE_COMMITMENT_SIGNED;
1264+
batch_msg = towire_start_batch(tmpctx, &peer->channel_id,
1265+
tal_count(msgs), tlvs);
1266+
size += tal_bytelen(batch_msg) + hdr_size;
1267+
1268+
/* Count the size of all the messages in the batch */
1269+
for(u32 i = 0; i < tal_count(msgs); i++)
1270+
size += tal_bytelen(msgs[i]) + hdr_size;
1271+
1272+
/* Now we know the size of our `final_msg` so we allocate */
1273+
final_msg = tal_arr(tmpctx, u8, size);
1274+
final_msg_ptr = final_msg;
1275+
1276+
status_debug("proto_batch Building batch with %zu bytes, msgs: %zu",
1277+
size, tal_count(msgs));
1278+
1279+
/* Copy the bytes for `start_batch` prefix */
1280+
memcpy(final_msg_ptr,
1281+
towire_protocol_batch_element(tmpctx,
1282+
&peer->channel_id,
1283+
tal_bytelen(batch_msg)),
1284+
hdr_size);
1285+
final_msg_ptr += hdr_size;
1286+
1287+
memcpy(final_msg_ptr, batch_msg, tal_bytelen(batch_msg));
1288+
final_msg_ptr += tal_bytelen(batch_msg);
1289+
1290+
/* Now copy the bytes from all messages in `msgs` */
1291+
for(u32 i = 0; i < tal_count(msgs); i++) {
1292+
memcpy(final_msg_ptr,
1293+
towire_protocol_batch_element(tmpctx,
1294+
&peer->channel_id,
1295+
tal_bytelen(msgs[i])),
1296+
hdr_size);
1297+
final_msg_ptr += hdr_size;
1298+
1299+
memcpy(final_msg_ptr, msgs[i], tal_bytelen(msgs[i]));
1300+
final_msg_ptr += tal_bytelen(msgs[i]);
1301+
}
1302+
1303+
assert(final_msg + size == final_msg_ptr);
1304+
peer_write(peer->pps, take(final_msg));
1305+
}
1306+
12351307
static void send_commit(struct peer *peer)
12361308
{
12371309
const struct htlc **changed_htlcs;
@@ -1398,8 +1470,7 @@ static void send_commit(struct peer *peer)
13981470

13991471
peer->next_index[REMOTE]++;
14001472

1401-
for(u32 i = 0; i < tal_count(msgs); i++)
1402-
peer_write(peer->pps, take(msgs[i]));
1473+
send_message_batch(peer, msgs);
14031474

14041475
maybe_send_shutdown(peer);
14051476

@@ -5102,8 +5173,7 @@ static void resend_commitment(struct peer *peer, struct changed_htlc *last)
51025173
peer->splice_state->inflights[i]->remote_funding));
51035174
}
51045175

5105-
for(i = 0; i < tal_count(msgs); i++)
5106-
peer_write(peer->pps, take(msgs[i]));
5176+
send_message_batch(peer, msgs);
51075177

51085178
/* If we have already received the revocation for the previous, the
51095179
* other side shouldn't be asking for a retransmit! */

connectd/multiplex.c

Lines changed: 66 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -399,6 +399,66 @@ static struct io_plan *io_sock_shutdown_cb(struct io_conn *conn, struct peer *un
399399
return io_sock_shutdown(conn);
400400
}
401401

402+
/* Process and eat protocol_batch_element messages, encrypt each element message
403+
* and return the encrypted messages as one long byte array. */
404+
static u8 *process_batch_elements(struct peer *peer, const u8 *msg TAKES)
405+
{
406+
u8 *ret = tal_arr(peer, u8, 0);
407+
size_t ret_size = 0;
408+
const u8 *cursor = msg;
409+
size_t plen = tal_count(msg);
410+
411+
status_debug("Processing batch elements of %zu bytes. %s", plen,
412+
tal_hex(tmpctx, msg));
413+
414+
do {
415+
u8 *element_bytes;
416+
u16 element_size;
417+
struct channel_id channel_id;
418+
u8 *enc_msg;
419+
420+
if (fromwire_u16(&cursor, &plen) != WIRE_PROTOCOL_BATCH_ELEMENT)
421+
status_failed(STATUS_FAIL_PROTO_BATCH,
422+
"process_batch_elements on msg that is"
423+
" not WIRE_PROTOCOL_BATCH_ELEMENT. %s",
424+
tal_hexstr(tmpctx, cursor, plen));
425+
426+
fromwire_channel_id(&cursor, &plen, &channel_id);
427+
428+
element_size = fromwire_u16(&cursor, &plen);
429+
if (!element_size)
430+
status_failed(STATUS_FAIL_PROTO_BATCH,
431+
"process_batch_elements cannot have zero"
432+
" length elements. %s",
433+
tal_hexstr(tmpctx, cursor, plen));
434+
435+
element_bytes = fromwire_tal_arrn(NULL, &cursor, &plen,
436+
element_size);
437+
if (!element_bytes)
438+
status_failed(STATUS_FAIL_PROTO_BATCH,
439+
"process_batch_elements fromwire_tal_arrn"
440+
" %s",
441+
tal_hexstr(tmpctx, cursor, plen));
442+
443+
status_debug("Processing batch extracted item %s. %s",
444+
peer_wire_name(fromwire_peektype(element_bytes)),
445+
tal_hex(tmpctx, element_bytes));
446+
447+
enc_msg = cryptomsg_encrypt_msg(tmpctx, &peer->cs,
448+
take(element_bytes));
449+
450+
tal_resize(&ret, ret_size + tal_bytelen(enc_msg));
451+
memcpy(&ret[ret_size], enc_msg, tal_bytelen(enc_msg));
452+
ret_size += tal_bytelen(enc_msg);
453+
454+
} while(plen);
455+
456+
if (taken(msg))
457+
tal_free(msg);
458+
459+
return ret;
460+
}
461+
402462
static struct io_plan *encrypt_and_send(struct peer *peer,
403463
const u8 *msg TAKES,
404464
struct io_plan *(*next)
@@ -442,8 +502,13 @@ static struct io_plan *encrypt_and_send(struct peer *peer,
442502

443503
set_urgent_flag(peer, is_urgent(type));
444504

505+
/* Special message type directing us to process batch items. */
506+
if (type == WIRE_PROTOCOL_BATCH_ELEMENT)
507+
peer->sent_to_peer = process_batch_elements(peer, msg);
508+
else
509+
peer->sent_to_peer = cryptomsg_encrypt_msg(peer, &peer->cs, msg);
445510
/* We free this and the encrypted version in next write_to_peer */
446-
peer->sent_to_peer = cryptomsg_encrypt_msg(peer, &peer->cs, msg);
511+
447512
return io_write(peer->to_peer,
448513
peer->sent_to_peer,
449514
tal_bytelen(peer->sent_to_peer),

0 commit comments

Comments
 (0)