51
51
/* This is a lazy thread implementation for Win32 */
52
52
53
53
/* Thread server common information */
54
- typedef struct {
55
- CRITICAL_SECTION lock ;
56
- HANDLE filled ;
57
- HANDLE killed ;
58
54
59
- blas_queue_t * queue ; /* Parameter Pointer */
60
- int shutdown ; /* server shutdown flag */
61
-
62
- } blas_pool_t ;
55
+ static blas_queue_t * work_queue = NULL ;
56
+ static HANDLE kickoff_event = NULL ;
57
+ static CRITICAL_SECTION queue_lock ;
63
58
64
59
/* We need this global for checking if initialization is finished. */
65
60
int blas_server_avail = 0 ;
66
61
67
62
/* Local Variables */
68
63
static BLASULONG server_lock = 0 ;
69
64
70
- static blas_pool_t pool ;
71
65
static HANDLE blas_threads [MAX_CPU_NUMBER ];
72
66
static DWORD blas_threads_id [MAX_CPU_NUMBER ];
67
+ static volatile int thread_target ; // target num of live threads, volatile for cross-thread reads
73
68
74
-
69
+ #if defined (__GNUC__ ) && (__GNUC__ < 6 )
70
+ #define WIN_CAS (dest , exch , comp ) __sync_val_compare_and_swap(dest, comp, exch)
71
+ #else
72
+ #if defined(_WIN64 )
73
+ #define WIN_CAS (dest , exch , comp ) InterlockedCompareExchange64(dest, exch, comp)
74
+ #else
75
+ #define WIN_CAS (dest , exch , comp ) InterlockedCompareExchange(dest, exch, comp)
76
+ #endif
77
+ #endif
75
78
76
79
static void legacy_exec (void * func , int mode , blas_arg_t * args , void * sb ){
77
80
@@ -202,14 +205,10 @@ static void legacy_exec(void *func, int mode, blas_arg_t *args, void *sb){
202
205
static DWORD WINAPI blas_thread_server (void * arg ){
203
206
204
207
/* Thread identifier */
205
- #ifdef SMP_DEBUG
206
208
BLASLONG cpu = (BLASLONG )arg ;
207
- #endif
208
209
209
210
void * buffer , * sa , * sb ;
210
211
blas_queue_t * queue ;
211
- DWORD action ;
212
- HANDLE handles [] = {pool .filled , pool .killed };
213
212
214
213
/* Each server thread needs its own buffer */
215
214
buffer = blas_memory_alloc (2 );
@@ -225,29 +224,44 @@ static DWORD WINAPI blas_thread_server(void *arg){
225
224
#ifdef SMP_DEBUG
226
225
fprintf (STDERR , "Server[%2ld] Waiting for Queue.\n" , cpu );
227
226
#endif
227
+ // sleep until the kickoff event is raised: either work was added to the queue or the thread target was lowered
228
+ WaitForSingleObject (kickoff_event , INFINITE );
228
229
229
- do {
230
- action = WaitForMultipleObjects ( 2 , handles , FALSE, INFINITE );
231
- } while (( action != WAIT_OBJECT_0 ) && ( action != WAIT_OBJECT_0 + 1 ) );
232
-
233
- if ( action == WAIT_OBJECT_0 + 1 ) break ;
230
+ if ( cpu > thread_target - 2 )
231
+ {
232
+ //printf("thread [%d] exiting.\n", cpu );
233
+ break ; // excess thread, so worker thread exits
234
+ }
234
235
235
236
#ifdef SMP_DEBUG
236
237
fprintf (STDERR , "Server[%2ld] Got it.\n" , cpu );
237
238
#endif
238
239
239
- EnterCriticalSection (& pool .lock );
240
+ #if 1
241
+ EnterCriticalSection (& queue_lock );
242
+
243
+ queue = work_queue ;
244
+ if (queue )
245
+ work_queue = work_queue -> next ;
246
+
247
+ LeaveCriticalSection (& queue_lock );
248
+ #else
249
+ volatile blas_queue_t * queue_next ;
240
250
241
- queue = pool .queue ;
242
- if (queue ) pool .queue = queue -> next ;
251
+ INT_PTR prev_value ;
252
+ do {
253
+ queue = (volatile blas_queue_t * )work_queue ;
254
+ if (!queue )
255
+ break ;
243
256
244
- LeaveCriticalSection (& pool .lock );
257
+ queue_next = (volatile blas_queue_t * )queue -> next ;
258
+ prev_value = WIN_CAS ((INT_PTR * )& work_queue , (INT_PTR )queue_next , (INT_PTR )queue );
259
+ } while (prev_value != queue );
260
+ #endif
245
261
246
262
if (queue ) {
247
263
int (* routine )(blas_arg_t * , void * , void * , void * , void * , BLASLONG ) = queue -> routine ;
248
264
249
- if (pool .queue ) SetEvent (pool .filled );
250
-
251
265
sa = queue -> sa ;
252
266
sb = queue -> sb ;
253
267
@@ -331,14 +345,9 @@ static DWORD WINAPI blas_thread_server(void *arg){
331
345
#ifdef SMP_DEBUG
332
346
fprintf (STDERR , "Server[%2ld] Finished!\n" , cpu );
333
347
#endif
348
+
349
+ queue -> finished = 1 ;
334
350
335
- EnterCriticalSection (& queue -> lock );
336
-
337
- queue -> status = BLAS_STATUS_FINISHED ;
338
-
339
- LeaveCriticalSection (& queue -> lock );
340
-
341
- SetEvent (queue -> finish );
342
351
}
343
352
344
353
/* Shutdown procedure */
@@ -366,15 +375,16 @@ int blas_thread_init(void){
366
375
#endif
367
376
368
377
if (!blas_server_avail ){
378
+ // create the kickoff Event
379
+ kickoff_event = CreateEvent (NULL , TRUE, FALSE, NULL );
369
380
370
- InitializeCriticalSection (& pool .lock );
371
- pool .filled = CreateEvent (NULL , FALSE, FALSE, NULL );
372
- pool .killed = CreateEvent (NULL , TRUE, FALSE, NULL );
381
+ thread_target = blas_cpu_number ;
373
382
374
- pool .shutdown = 0 ;
375
- pool .queue = NULL ;
383
+ InitializeCriticalSection (& queue_lock );
376
384
377
385
for (i = 0 ; i < blas_cpu_number - 1 ; i ++ ){
386
+ //printf("thread_init: creating thread [%d]\n", i);
387
+
378
388
blas_threads [i ] = CreateThread (NULL , 0 ,
379
389
blas_thread_server , (void * )i ,
380
390
0 , & blas_threads_id [i ]);
@@ -409,32 +419,39 @@ int exec_blas_async(BLASLONG pos, blas_queue_t *queue){
409
419
current = queue ;
410
420
411
421
while (current ) {
412
- InitializeCriticalSection (& current -> lock );
413
- current -> finish = CreateEvent (NULL , FALSE, FALSE, NULL );
414
422
current -> position = pos ;
415
423
416
424
#ifdef CONSISTENT_FPCSR
417
425
__asm__ __volatile__ ("fnstcw %0" : "=m" (current -> x87_mode ));
418
426
__asm__ __volatile__ ("stmxcsr %0" : "=m" (current -> sse_mode ));
419
427
#endif
420
428
429
+ current -> finished = 0 ;
421
430
current = current -> next ;
422
431
pos ++ ;
423
432
}
424
433
425
- EnterCriticalSection (& pool .lock );
434
+ EnterCriticalSection (& queue_lock );
435
+
436
+ if (!work_queue )
437
+ {
438
+ work_queue = queue ;
439
+ }
440
+ else
441
+ {
442
+ blas_queue_t * next_item = work_queue ;
443
+
444
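+ // find the end of the work queue
+ // NOTE(review): the loop below advances next_item until it is NULL, so the
+ // assignment after it only changes the local pointer and never links the new
+ // work onto the list -- confirm, and walk to the last node instead.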
445
+ while (next_item )
446
+ next_item = next_item -> next ;
426
447
427
- if (pool .queue ) {
428
- current = pool .queue ;
429
- while (current -> next ) current = current -> next ;
430
- current -> next = queue ;
431
- } else {
432
- pool .queue = queue ;
448
+ // add new work to the end
449
+ next_item = queue ;
433
450
}
434
451
435
- LeaveCriticalSection (& pool . lock );
452
+ LeaveCriticalSection (& queue_lock );
436
453
437
- SetEvent (pool . filled );
454
+ SetEvent (kickoff_event );
438
455
439
456
return 0 ;
440
457
}
@@ -449,21 +466,26 @@ int exec_blas_async_wait(BLASLONG num, blas_queue_t *queue){
449
466
#ifdef SMP_DEBUG
450
467
fprintf (STDERR , "Waiting Queue ..\n" );
451
468
#endif
469
+ while (!queue -> finished )
470
+ YIELDING ;
452
471
453
- WaitForSingleObject (queue -> finish , INFINITE );
454
-
455
- CloseHandle (queue -> finish );
456
- DeleteCriticalSection (& queue -> lock );
457
-
458
- queue = queue -> next ;
459
- num -- ;
472
+ queue = queue -> next ;
473
+ num -- ;
460
474
}
461
475
462
476
#ifdef SMP_DEBUG
463
477
fprintf (STDERR , "Completely Done.\n\n" );
464
478
#endif
479
+ // Only reset the event (which puts the worker threads back to sleep) when the
480
+ // work queue is empty; work added after this batch must keep them awake.
481
+ EnterCriticalSection (& queue_lock );
465
482
466
- return 0 ;
483
+ if (work_queue == NULL )
484
+ ResetEvent (kickoff_event );
485
+
486
+ LeaveCriticalSection (& queue_lock );
487
+
488
+ return 0 ;
467
489
}
468
490
469
491
/* Execute Threads */
@@ -512,8 +534,6 @@ int BLASFUNC(blas_thread_shutdown)(void){
512
534
513
535
if (blas_server_avail ){
514
536
515
- SetEvent (pool .killed );
516
-
517
537
for (i = 0 ; i < blas_num_threads - 1 ; i ++ ){
518
538
// Could also just use WaitForMultipleObjects
519
539
DWORD wait_thread_value = WaitForSingleObject (blas_threads [i ], 50 );
@@ -528,9 +548,6 @@ int BLASFUNC(blas_thread_shutdown)(void){
528
548
CloseHandle (blas_threads [i ]);
529
549
}
530
550
531
- CloseHandle (pool .filled );
532
- CloseHandle (pool .killed );
533
-
534
551
blas_server_avail = 0 ;
535
552
}
536
553
@@ -552,23 +569,48 @@ void goto_set_num_threads(int num_threads)
552
569
553
570
if (num_threads > MAX_CPU_NUMBER ) num_threads = MAX_CPU_NUMBER ;
554
571
572
+ if (blas_server_avail && num_threads < blas_num_threads ) {
573
+ LOCK_COMMAND (& server_lock );
574
+
575
+ thread_target = num_threads ;
576
+
577
+ SetEvent (kickoff_event );
578
+
579
+ for (i = num_threads - 1 ; i < blas_num_threads - 1 ; i ++ ) {
580
+ //printf("set_num_threads: waiting on thread [%d] to quit.\n", i);
581
+
582
+ WaitForSingleObject (blas_threads [i ], INFINITE );
583
+
584
+ //printf("set_num_threads: thread [%d] has quit.\n", i);
585
+
586
+ CloseHandle (blas_threads [i ]);
587
+ }
588
+
589
+ blas_num_threads = num_threads ;
590
+
591
+ ResetEvent (kickoff_event );
592
+
593
+ UNLOCK_COMMAND (& server_lock );
594
+ }
595
+
555
596
if (num_threads > blas_num_threads ) {
556
597
557
598
LOCK_COMMAND (& server_lock );
558
599
600
+ thread_target = num_threads ;
601
+
559
602
//increased_threads = 1;
560
603
if (!blas_server_avail ){
604
+ // create the kickoff Event
605
+ kickoff_event = CreateEvent (NULL , TRUE, FALSE, NULL );
561
606
562
- InitializeCriticalSection (& pool .lock );
563
- pool .filled = CreateEvent (NULL , FALSE, FALSE, NULL );
564
- pool .killed = CreateEvent (NULL , TRUE, FALSE, NULL );
607
+ InitializeCriticalSection (& queue_lock );
565
608
566
- pool .shutdown = 0 ;
567
- pool .queue = NULL ;
568
609
blas_server_avail = 1 ;
569
610
}
570
611
571
612
for (i = (blas_num_threads > 0 ) ? blas_num_threads - 1 : 0 ; i < num_threads - 1 ; i ++ ){
613
+ //printf("set_num_threads: creating thread [%d]\n", i);
572
614
573
615
blas_threads [i ] = CreateThread (NULL , 0 ,
574
616
blas_thread_server , (void * )i ,
0 commit comments