@@ -93,30 +93,42 @@ private async Task HandleMessage(NpgsqlConnection connection, CancellationToken
93
93
// firstMessage that was already read from the channel
94
94
ct . ThrowIfCancellationRequested ( ) ;
95
95
96
- await using var batch = connection . CreateBatch ( ) ;
96
+ var payloads = new string [ messages . Count ] ;
97
97
98
- foreach ( var message in messages )
98
+ for ( var i = 0 ; i < messages . Count ; i ++ )
99
99
{
100
- var command = batch . CreateBatchCommand ( ) ;
100
+ payloads [ i ] = messages [ i ] . FormattedPayload ;
101
+ }
102
+
103
+ const string sql =
104
+ """
105
+ SELECT
106
+ pg_notify(t.channel, t.message)
107
+ from
108
+ (select
109
+ @channel as channel,
110
+ unnest(@messages) as message
111
+ ) as t;
112
+ """ ;
113
+
114
+ await using var command = connection . CreateCommand ( ) ;
101
115
102
- command . CommandText = "SELECT pg_notify(@channel, @message);" ;
116
+ command . CommandText = sql ;
103
117
104
- var channel = new NpgsqlParameter ( "channel" , DbType . String )
118
+ command . Parameters . Add (
119
+ new NpgsqlParameter ( "channel" , NpgsqlDbType . Text )
105
120
{
106
121
Value = _channelName
107
- } ;
108
- var msg = new NpgsqlParameter ( "message" , DbType . String )
109
- {
110
- Value = message . FormattedPayload
111
- } ;
112
- command . Parameters . Add ( channel ) ;
113
- command . Parameters . Add ( msg ) ;
122
+ } ) ;
114
123
115
- batch . BatchCommands . Add ( command ) ;
116
- }
124
+ command . Parameters . Add (
125
+ new NpgsqlParameter ( "messages" , NpgsqlDbType . Array | NpgsqlDbType . Varchar )
126
+ {
127
+ Value = payloads
128
+ } ) ;
117
129
118
- await batch . PrepareAsync ( ct ) ;
119
- await batch . ExecuteNonQueryAsync ( ct ) ;
130
+ await command . PrepareAsync ( ct ) ;
131
+ await command . ExecuteNonQueryAsync ( ct ) ;
120
132
}
121
133
catch ( Exception ex )
122
134
{
0 commit comments