@@ -245,22 +245,23 @@ accept(Socket, Timeout) ->
245
245
case ? MODULE :nif_select_read (Socket , Ref ) of
246
246
ok ->
247
247
receive
248
- {select , _AcceptedSocket , Ref , ready_input } ->
248
+ {'$socket' , Socket , select , Ref } ->
249
249
case ? MODULE :nif_accept (Socket ) of
250
250
{error , closed } = E ->
251
251
? MODULE :nif_select_stop (Socket ),
252
252
E ;
253
253
R ->
254
254
R
255
255
end ;
256
- {closed , Ref } ->
256
+ {'$socket' , Socket , abort , { Ref , closed } } ->
257
257
% socket was closed by another process
258
258
% TODO: we need to handle:
259
259
% (a) SELECT_STOP being scheduled
260
- % (b) flush of messages as we can have both
261
- % {closed, Ref} and {select, _, Ref, _} in the
260
+ % (b) flush of messages as we can have both in the
262
261
% queue
263
- {error , closed }
262
+ {error , closed };
263
+ Other ->
264
+ {error , {accept , unexpected , Other , {'$socket' , Socket , select , Ref }}}
264
265
after Timeout ->
265
266
{error , timeout }
266
267
end ;
@@ -299,25 +300,60 @@ recv(Socket, Length) ->
299
300
% % `{ok, Data} = socket:recv(ConnectedSocket)'
300
301
% % @end
301
302
% %-----------------------------------------------------------------------------
302
- -spec recv (Socket :: socket (), Length :: non_neg_integer (), Timeout :: timeout ()) ->
303
- {ok , Data :: binary ()} | {error , Reason :: term ()}.
303
+ -spec recv (
304
+ Socket :: socket (), Length :: non_neg_integer (), Timeout :: timeout () | nowait | reference ()
305
+ ) ->
306
+ {ok , Data :: binary ()}
307
+ | {select , {select_info , recvfrom , reference ()}}
308
+ | {select , {{select_info , recvfrom , reference ()}, Data :: binary ()}}
309
+ | {error , Reason :: term ()}.
310
+ recv (Socket , Length , 0 ) ->
311
+ recv0_noselect (Socket , Length );
312
+ recv (Socket , 0 , Timeout ) when is_integer (Timeout ) orelse Timeout =:= infinity ->
313
+ recv0 (Socket , 0 , Timeout );
314
+ recv (Socket , Length , nowait ) ->
315
+ recv0_nowait (Socket , Length , erlang :make_ref ());
316
+ recv (Socket , Length , Ref ) when is_reference (Ref ) ->
317
+ recv0_nowait (Socket , Length , Ref );
304
318
recv (Socket , Length , Timeout ) ->
319
+ case ? MODULE :getopt (Socket , {socket , type }) of
320
+ {ok , stream } when Timeout =/= infinity ->
321
+ recv0_r (Socket , Length , Timeout , erlang :system_time (millisecond ) + Timeout , []);
322
+ {ok , stream } when Timeout =:= infinity ->
323
+ recv0_r (Socket , Length , Timeout , undefined , []);
324
+ _ ->
325
+ recv0 (Socket , Length , Timeout )
326
+ end .
327
+
328
+ recv0_noselect (Socket , Length ) ->
329
+ case ? MODULE :nif_recv (Socket , Length ) of
330
+ {error , _ } = E ->
331
+ E ;
332
+ {ok , Data } when Length =:= 0 orelse byte_size (Data ) =:= Length ->
333
+ {ok , Data };
334
+ {ok , Data } ->
335
+ case ? MODULE :getopt (Socket , {socket , type }) of
336
+ {ok , stream } ->
337
+ {error , {timeout , Data }};
338
+ {ok , dgram } ->
339
+ {ok , Data }
340
+ end
341
+ end .
342
+
343
+ recv0 (Socket , Length , Timeout ) ->
305
344
Ref = erlang :make_ref (),
306
- ? TRACE (" select read for recv. self=~p ref=~p~n " , [self (), Ref ]),
307
345
case ? MODULE :nif_select_read (Socket , Ref ) of
308
346
ok ->
309
347
receive
310
- {select , _AcceptedSocket , Ref , ready_input } ->
348
+ {'$socket' , Socket , select , Ref } ->
311
349
case ? MODULE :nif_recv (Socket , Length ) of
312
350
{error , _ } = E ->
313
351
? MODULE :nif_select_stop (Socket ),
314
352
E ;
315
- % TODO: Assemble data to have more if Length > byte_size(Data)
316
- % as long as timeout did not expire
317
353
{ok , Data } ->
318
354
{ok , Data }
319
355
end ;
320
- {closed , Ref } ->
356
+ {'$socket' , Socket , abort , { Ref , closed } } ->
321
357
% socket was closed by another process
322
358
% TODO: see above in accept/2
323
359
{error , closed }
@@ -328,6 +364,72 @@ recv(Socket, Length, Timeout) ->
328
364
Error
329
365
end .
330
366
367
+ recv0_nowait (Socket , Length , Ref ) ->
368
+ case ? MODULE :nif_recv (Socket , Length ) of
369
+ {error , timeout } ->
370
+ case ? MODULE :nif_select_read (Socket , Ref ) of
371
+ ok ->
372
+ {select , {select_info , recv , Ref }};
373
+ {error , _ } = Error1 ->
374
+ Error1
375
+ end ;
376
+ {error , _ } = E ->
377
+ E ;
378
+ {ok , Data } when byte_size (Data ) < Length ->
379
+ case ? MODULE :getopt (Socket , {socket , type }) of
380
+ {ok , stream } ->
381
+ case ? MODULE :nif_select_read (Socket , Ref ) of
382
+ ok ->
383
+ {select , {{select_info , recv , Ref }, Data }};
384
+ {error , _ } = Error1 ->
385
+ Error1
386
+ end ;
387
+ {ok , dgram } ->
388
+ {ok , Data }
389
+ end ;
390
+ {ok , Data } ->
391
+ {ok , Data }
392
+ end .
393
+
394
+ recv0_r (Socket , Length , Timeout , EndQuery , Acc ) ->
395
+ Ref = erlang :make_ref (),
396
+ case ? MODULE :nif_select_read (Socket , Ref ) of
397
+ ok ->
398
+ receive
399
+ {'$socket' , Socket , select , Ref } ->
400
+ case ? MODULE :nif_recv (Socket , Length ) of
401
+ {error , _ } = E ->
402
+ ? MODULE :nif_select_stop (Socket ),
403
+ E ;
404
+ {ok , Data } ->
405
+ NewAcc = [Data | Acc ],
406
+ Remaining = Length - byte_size (Data ),
407
+ case Remaining of
408
+ 0 ->
409
+ {ok , list_to_binary (lists :reverse (NewAcc ))};
410
+ _ ->
411
+ NewTimeout =
412
+ case Timeout of
413
+ infinity -> infinity ;
414
+ _ -> EndQuery - erlang :system_time (millisecond )
415
+ end ,
416
+ recv0_r (Socket , Remaining , NewTimeout , EndQuery , NewAcc )
417
+ end
418
+ end ;
419
+ {'$socket' , Socket , abort , {Ref , closed }} ->
420
+ % socket was closed by another process
421
+ % TODO: see above in accept/2
422
+ {error , closed }
423
+ after Timeout ->
424
+ case Acc of
425
+ [] -> {error , timeout };
426
+ _ -> {error , {timeout , list_to_binary (lists :reverse (Acc ))}}
427
+ end
428
+ end ;
429
+ {error , _Reason } = Error ->
430
+ Error
431
+ end .
432
+
331
433
% %-----------------------------------------------------------------------------
332
434
% % @equiv socket:recvfrom(Socket, 0)
333
435
% % @end
@@ -370,25 +472,43 @@ recvfrom(Socket, Length) ->
370
472
% % bytes are available and return these bytes.
371
473
% % @end
372
474
% %-----------------------------------------------------------------------------
373
- -spec recvfrom (Socket :: socket (), Length :: non_neg_integer (), Timeout :: timeout ()) ->
374
- {ok , {Address :: sockaddr (), Data :: binary ()}} | {error , Reason :: term ()}.
475
+ -spec recvfrom (
476
+ Socket :: socket (), Length :: non_neg_integer (), Timeout :: timeout () | nowait | reference ()
477
+ ) ->
478
+ {ok , {Address :: sockaddr (), Data :: binary ()}}
479
+ | {select , {select_info , recvfrom , reference ()}}
480
+ | {error , Reason :: term ()}.
481
+ recvfrom (Socket , Length , 0 ) ->
482
+ recvfrom0_noselect (Socket , Length );
483
+ recvfrom (Socket , Length , nowait ) ->
484
+ recvfrom0_nowait (Socket , Length , erlang :make_ref ());
485
+ recvfrom (Socket , Length , Ref ) when is_reference (Ref ) ->
486
+ recvfrom0_nowait (Socket , Length , Ref );
375
487
recvfrom (Socket , Length , Timeout ) ->
488
+ recvfrom0 (Socket , Length , Timeout ).
489
+
490
+ recvfrom0_noselect (Socket , Length ) ->
491
+ case ? MODULE :nif_recvfrom (Socket , Length ) of
492
+ {error , _ } = E ->
493
+ E ;
494
+ {ok , {_Address , _Data }} = Reply ->
495
+ Reply
496
+ end .
497
+
498
+ recvfrom0 (Socket , Length , Timeout ) ->
376
499
Ref = erlang :make_ref (),
377
- ? TRACE (" select read for recvfrom. self=~p ref=~p " , [self (), Ref ]),
378
500
case ? MODULE :nif_select_read (Socket , Ref ) of
379
501
ok ->
380
502
receive
381
- {select , _AcceptedSocket , Ref , ready_input } ->
503
+ {'$socket' , Socket , select , Ref } ->
382
504
case ? MODULE :nif_recvfrom (Socket , Length ) of
383
505
{error , _ } = E ->
384
506
? MODULE :nif_select_stop (Socket ),
385
507
E ;
386
- % TODO: Assemble data to have more if Length > byte_size(Data)
387
- % as long as timeout did not expire
388
- {ok , {Address , Data }} ->
389
- {ok , {Address , Data }}
508
+ {ok , {_Address , _Data }} = Reply ->
509
+ Reply
390
510
end ;
391
- {closed , Ref } ->
511
+ {'$socket' , Socket , abort , { Ref , closed } } ->
392
512
% socket was closed by another process
393
513
% TODO: see above in accept/2
394
514
{error , closed }
@@ -399,6 +519,21 @@ recvfrom(Socket, Length, Timeout) ->
399
519
Error
400
520
end .
401
521
522
+ recvfrom0_nowait (Socket , Length , Ref ) ->
523
+ case ? MODULE :nif_recvfrom (Socket , Length ) of
524
+ {error , timeout } ->
525
+ case ? MODULE :nif_select_read (Socket , Ref ) of
526
+ ok ->
527
+ {select , {select_info , recvfrom , Ref }};
528
+ {error , _ } = SelectError ->
529
+ SelectError
530
+ end ;
531
+ {error , _ } = RecvError ->
532
+ RecvError ;
533
+ {ok , {_Address , _Data }} = Reply ->
534
+ Reply
535
+ end .
536
+
402
537
% %-----------------------------------------------------------------------------
403
538
% % @param Socket the socket
404
539
% % @param Data the data to send
0 commit comments