Skip to content

Commit 87431eb

Browse files
Previously, the app was not notified when the client disconnection.
This caused issues especially in cases of websocket connections and SSE Events where the app continued to send data to the router, which could not deliver it to the client due to the disconnection. Changes made: Added functionality to send a port message to notify the app of client disconnection in form of port message(_NXT_PORT_MSG_CLIENT_ERROR). On the App side, handled this message and called the registered close_hanlder callback if registered.
1 parent b4201ab commit 87431eb

File tree

3 files changed

+50
-11
lines changed

3 files changed

+50
-11
lines changed

src/nxt_port.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,8 @@ struct nxt_port_handlers_s {
5959
/* Status report. */
6060
nxt_port_handler_t status;
6161

62+
nxt_port_handler_t client_error;
63+
6264
nxt_port_handler_t oosm;
6365
nxt_port_handler_t shm_ack;
6466
nxt_port_handler_t read_queue;
@@ -115,6 +117,8 @@ typedef enum {
115117
_NXT_PORT_MSG_APP_RESTART = nxt_port_handler_idx(app_restart),
116118
_NXT_PORT_MSG_STATUS = nxt_port_handler_idx(status),
117119

120+
_NXT_PORT_MSG_CLIENT_ERROR = nxt_port_handler_idx(client_error),
121+
118122
_NXT_PORT_MSG_OOSM = nxt_port_handler_idx(oosm),
119123
_NXT_PORT_MSG_SHM_ACK = nxt_port_handler_idx(shm_ack),
120124
_NXT_PORT_MSG_READ_QUEUE = nxt_port_handler_idx(read_queue),
@@ -160,6 +164,8 @@ typedef enum {
160164
NXT_PORT_MSG_APP_RESTART = nxt_msg_last(_NXT_PORT_MSG_APP_RESTART),
161165
NXT_PORT_MSG_STATUS = nxt_msg_last(_NXT_PORT_MSG_STATUS),
162166

167+
NXT_PORT_MSG_CLIENT_ERROR = nxt_msg_last(_NXT_PORT_MSG_CLIENT_ERROR),
168+
163169
NXT_PORT_MSG_OOSM = nxt_msg_last(_NXT_PORT_MSG_OOSM),
164170
NXT_PORT_MSG_SHM_ACK = nxt_msg_last(_NXT_PORT_MSG_SHM_ACK),
165171
NXT_PORT_MSG_READ_QUEUE = _NXT_PORT_MSG_READ_QUEUE,

src/nxt_router.c

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5300,7 +5300,16 @@ nxt_router_http_request_done(nxt_task_t *task, void *obj, void *data)
53005300
nxt_debug(task, "router http request done (rpc_data %p)", r->req_rpc_data);
53015301

53025302
if (r->req_rpc_data != NULL) {
5303-
nxt_request_rpc_data_unlink(task, r->req_rpc_data);
5303+
nxt_request_rpc_data_t *req_rpc_data = r->req_rpc_data;
5304+
5305+
if (r->error) {
5306+
nxt_port_socket_write(task, req_rpc_data->app_port,
5307+
NXT_PORT_MSG_CLIENT_ERROR,
5308+
-1, req_rpc_data->stream,
5309+
task->thread->engine->port->id, NULL);
5310+
}
5311+
5312+
nxt_request_rpc_data_unlink(task, req_rpc_data);
53045313
}
53055314

53065315
nxt_http_request_close_handler(task, r, r->proto.any);

src/nxt_unit.c

Lines changed: 34 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,8 @@ static int nxt_unit_request_check_response_port(nxt_unit_request_info_t *req,
7474
static int nxt_unit_send_req_headers_ack(nxt_unit_request_info_t *req);
7575
static int nxt_unit_process_websocket(nxt_unit_ctx_t *ctx,
7676
nxt_unit_recv_msg_t *recv_msg);
77+
static int nxt_unit_process_client_error(nxt_unit_ctx_t *ctx,
78+
nxt_unit_recv_msg_t *recv_msg);
7779
static int nxt_unit_process_shm_ack(nxt_unit_ctx_t *ctx);
7880
static nxt_unit_request_info_impl_t *nxt_unit_request_info_get(
7981
nxt_unit_ctx_t *ctx);
@@ -1121,6 +1123,10 @@ nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf,
11211123
rc = nxt_unit_process_websocket(ctx, &recv_msg);
11221124
break;
11231125

1126+
case _NXT_PORT_MSG_CLIENT_ERROR:
1127+
rc = nxt_unit_process_client_error(ctx, &recv_msg);
1128+
break;
1129+
11241130
case _NXT_PORT_MSG_REMOVE_PID:
11251131
if (nxt_slow_path(recv_msg.size != sizeof(pid))) {
11261132
nxt_unit_alert(ctx, "#%"PRIu32": remove_pid: invalid message size "
@@ -1377,18 +1383,16 @@ nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg,
13771383

13781384
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
13791385

1386+
res = nxt_unit_request_hash_add(ctx, req);
1387+
if (nxt_slow_path(res != NXT_UNIT_OK)) {
1388+
nxt_unit_req_warn(req, "failed to add request to hash");
1389+
nxt_unit_request_done(req, NXT_UNIT_ERROR);
1390+
return NXT_UNIT_ERROR;
1391+
}
1392+
13801393
if (req->content_length
13811394
> (uint64_t) (req->content_buf->end - req->content_buf->free))
13821395
{
1383-
res = nxt_unit_request_hash_add(ctx, req);
1384-
if (nxt_slow_path(res != NXT_UNIT_OK)) {
1385-
nxt_unit_req_warn(req, "failed to add request to hash");
1386-
1387-
nxt_unit_request_done(req, NXT_UNIT_ERROR);
1388-
1389-
return NXT_UNIT_ERROR;
1390-
}
1391-
13921396
/*
13931397
* If application have separate data handler, we may start
13941398
* request processing and process data when it is arrived.
@@ -1418,7 +1422,7 @@ nxt_unit_process_req_body(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
14181422
nxt_unit_mmap_buf_t *b;
14191423
nxt_unit_request_info_t *req;
14201424

1421-
req = nxt_unit_request_hash_find(ctx, recv_msg->stream, recv_msg->last);
1425+
req = nxt_unit_request_hash_find(ctx, recv_msg->stream, 0);
14221426
if (req == NULL) {
14231427
return NXT_UNIT_OK;
14241428
}
@@ -1722,6 +1726,26 @@ nxt_unit_process_websocket(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
17221726
return NXT_UNIT_OK;
17231727
}
17241728

1729+
static int
1730+
nxt_unit_process_client_error(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
1731+
{
1732+
nxt_unit_impl_t *lib;
1733+
nxt_unit_request_info_t *req;
1734+
1735+
req = nxt_unit_request_hash_find(ctx, recv_msg->stream, 0);
1736+
1737+
if (req == NULL) {
1738+
return NXT_UNIT_OK;
1739+
}
1740+
1741+
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1742+
1743+
if (lib->callbacks.close_handler) {
1744+
lib->callbacks.close_handler(req);
1745+
}
1746+
1747+
return NXT_UNIT_OK;
1748+
}
17251749

17261750
static int
17271751
nxt_unit_process_shm_ack(nxt_unit_ctx_t *ctx)

0 commit comments

Comments
 (0)