File tree Expand file tree Collapse file tree 4 files changed +29
-3
lines changed
website/cue/reference/components/sinks/base Expand file tree Collapse file tree 4 files changed +29
-3
lines changed Original file line number Diff line number Diff line change 1
- The ` amqp ` sink now attempts to re-connect to the AMQP broker when the channel has been disconnected.
1
+ The ` amqp ` sink now attempts to re-connect to the AMQP broker when the channel has been disconnected. It will also create up to 4 channels in a pool (configurable with the ` max_channels ` configuration) to improve throughput.
2
2
3
3
authors: aramperes
Original file line number Diff line number Diff line change @@ -7,11 +7,22 @@ use lapin::options::ConfirmSelectOptions;
7
7
pub type AmqpSinkChannels = Pool < AmqpSinkChannelManager > ;
8
8
9
9
pub ( super ) fn new_channel_pool ( config : & AmqpSinkConfig ) -> crate :: Result < AmqpSinkChannels > {
10
+ let max_channels = config. max_channels . try_into ( ) . map_err ( |_| {
11
+ Box :: new ( AmqpError :: PoolError {
12
+ error : "max_channels must fit into usize" . into ( ) ,
13
+ } )
14
+ } ) ?;
15
+ if max_channels == 0 {
16
+ return Err ( Box :: new ( AmqpError :: PoolError {
17
+ error : "max_channels must be positive" . into ( ) ,
18
+ } ) ) ;
19
+ }
10
20
let channel_manager = AmqpSinkChannelManager :: new ( & config. connection ) ;
11
21
let channels = Pool :: builder ( channel_manager)
12
- . max_size ( 4 )
22
+ . max_size ( max_channels )
13
23
. runtime ( deadpool:: Runtime :: Tokio1 )
14
24
. build ( ) ?;
25
+ debug ! ( "AMQP channel pool created with max size: {}" , max_channels) ;
15
26
Ok ( channels)
16
27
}
17
28
Original file line number Diff line number Diff line change @@ -90,6 +90,14 @@ pub struct AmqpSinkConfig {
90
90
skip_serializing_if = "crate::serde::is_default"
91
91
) ]
92
92
pub ( crate ) acknowledgements : AcknowledgementsConfig ,
93
+
94
+ /// Maximum number of AMQP channels to keep active (channels are created as needed).
95
+ #[ serde( default = "default_max_channels" ) ]
96
+ pub ( crate ) max_channels : u32 ,
97
+ }
98
+
99
+ const fn default_max_channels ( ) -> u32 {
100
+ 4
93
101
}
94
102
95
103
impl Default for AmqpSinkConfig {
@@ -101,6 +109,7 @@ impl Default for AmqpSinkConfig {
101
109
encoding : TextSerializerConfig :: default ( ) . into ( ) ,
102
110
connection : AmqpConfig :: default ( ) ,
103
111
acknowledgements : AcknowledgementsConfig :: default ( ) ,
112
+ max_channels : default_max_channels ( ) ,
104
113
}
105
114
}
106
115
}
@@ -111,7 +120,8 @@ impl GenerateConfig for AmqpSinkConfig {
111
120
r#"connection_string = "amqp://localhost:5672/%2f"
112
121
routing_key = "user_id"
113
122
exchange = "test"
114
- encoding.codec = "json""# ,
123
+ encoding.codec = "json"
124
+ max_channels = 4"# ,
115
125
)
116
126
. unwrap ( )
117
127
}
Original file line number Diff line number Diff line change @@ -392,6 +392,11 @@ base: components: sinks: amqp: configuration: {
392
392
required : true
393
393
type : string : syntax : " template "
394
394
}
395
+ max_channels : {
396
+ description : " Maximum number of AMQP channels to keep active (channels are created as needed). "
397
+ required : false
398
+ type : uint : default : 4
399
+ }
395
400
properties : {
396
401
description : """
397
402
Configure the AMQP message properties.
You can’t perform that action at this time.
0 commit comments