30
30
31
31
from .server import SalusServer
32
32
from .tfserver import TFDistServer
33
- from .utils import Popen , execute , snake_to_pascal , str2bool
33
+ from .utils import Popen , execute , snake_to_pascal , str2bool , remove_suffix
34
34
from .utils .compatiblity import pathlib , subprocess as sp
35
35
36
36
Path = pathlib .Path
40
40
flags .DEFINE_string ('tfbench_base' , '../tf_benchmarks' , 'Base dir of TFBenchmark based workloads' )
41
41
flags .DEFINE_string ('unit_base' , 'tests' , 'Base dir of unittest based workloads' )
42
42
flags .DEFINE_string ('fathom_base' , '../fathom' , 'Base dir of Fathom based workloads' )
43
+ flags .DEFINE_string ('tfweb_base' , '../tfweb' , 'Base dir of TFWeb based workloads' )
44
+ flags .DEFINE_string ('tfweb_saved_model_dir' , '~/../symbiotic/peifeng/tf_cnn_benchmarks_models/saved_models' ,
45
+ 'SavedModel dir of TFWeb based workloads' )
46
+ flags .DEFINE_string ('tfweb_request_body_dir' , '~/../symbiotic/peifeng/tf_cnn_benchmarks_models/reqeusts' ,
47
+ 'Predefined request body dir for TFWeb based workloads' )
43
48
flags .DEFINE_boolean ('no_capture' , False , 'Do not capture workload outputs' )
44
49
45
50
@@ -113,28 +118,61 @@ def __call__(self, executor, output_file):
113
118
'--num_batches={}' .format (self .wl .batch_num ),
114
119
'--batch_size={}' .format (self .wl .batch_size ),
115
120
]
116
- eval_interval = self .wl .env .pop ('SALUS_TFBENCH_EVAL_INTERVAL' , '0.1' )
117
- eval_rand_factor = self .wl .env .pop ('SALUS_TFBENCH_EVAL_RAND_FACTOR' , '5' )
121
+ eval_interval = self .wl .env .pop ('SALUS_TFBENCH_EVAL_INTERVAL' , None )
122
+ eval_rand_factor = self .wl .env .pop ('SALUS_TFBENCH_EVAL_RAND_FACTOR' , None )
118
123
eval_block = self .wl .env .pop ('SALUS_TFBENCH_EVAL_BLOCK' , 'true' )
124
+
125
+ eval_model_dir = self .wl .env .pop ('SALUS_TFBENCH_EVAL_MODEL_DIR' , 'models' )
126
+ eval_model_dir = str (Path (eval_model_dir ).joinpath (remove_suffix (self .wl .name , 'eval' )))
127
+
128
+ eval_saved_model_dir = self .wl .env .pop ('SALUS_TFBENCH_EVAL_SAVED_MODEL_DIR' , None )
129
+ if eval_saved_model_dir is not None :
130
+ eval_saved_model_dir = str (Path (eval_saved_model_dir ).joinpath (remove_suffix (self .wl .name , 'eval' )))
131
+
132
+ num_seconds = self .wl .env .pop ('SALUS_ITER_SECONDS' , None )
133
+ if num_seconds is not None :
134
+ cmd += [
135
+ '--num_seconds={}' .format (num_seconds )
136
+ ]
137
+
138
+ wait_for_signal = self .wl .env .pop ('SALUS_WAIT_FOR_SIGNAL' , None )
139
+ if wait_for_signal is not None :
140
+ cmd += [
141
+ '--wait_for_signal={}' .format (wait_for_signal )
142
+ ]
143
+
119
144
if self .wl .name .endswith ('eval' ):
120
- model_name = self .wl .name . rsplit ( 'eval' )[ 0 ]
145
+ model_name = remove_suffix ( self .wl .name , 'eval' )
121
146
cmd += [
122
- '--model_dir=models/{}' . format ( model_name ) ,
147
+ '--model_dir=' + eval_model_dir ,
123
148
'--model={}' .format (model_name ),
124
- '--eval_interval_secs={}' .format (eval_interval ),
125
- '--eval_interval_random_factor={}' .format (eval_rand_factor ),
126
149
'--eval_block={}' .format (eval_block ),
127
150
'--eval'
128
151
]
152
+ if eval_interval is not None :
153
+ cmd += [
154
+ '--eval_interval_secs={}' .format (eval_interval ),
155
+ ]
156
+ if eval_rand_factor is not None :
157
+ cmd += [
158
+ '--eval_interval_random_factor={}' .format (eval_rand_factor ),
159
+ ]
160
+ if eval_saved_model_dir is not None :
161
+ cmd += [
162
+ '--saved_model_dir=' + eval_saved_model_dir
163
+ ]
129
164
else :
130
165
cmd += [
131
166
'--model={}' .format (self .wl .name ),
132
167
]
133
168
if str2bool (self .wl .env .pop ('SALUS_SAVE_MODEL' , '' )):
134
169
cmd += [
135
- '--model_dir=models/{}' . format ( self . wl . name ) ,
170
+ '--model_dir=' + eval_model_dir ,
136
171
]
137
172
173
+ cmd += self .wl .extra_args
174
+ logger .info (f'Starting workload with cmd: { cmd } ' )
175
+
138
176
if FLAGS .no_capture :
139
177
return execute (cmd , cwd = str (cwd ), env = self .env )
140
178
else :
@@ -157,6 +195,7 @@ def __call__(self, executor, output_file):
157
195
# type: (Executor, Path) -> Popen
158
196
env = self .env .copy ()
159
197
env ['EXEC_ITER_NUMBER' ] = str (self .wl .batch_num )
198
+ env ['SALUS_BATCH_SIZE' ] = str (self .wl .batch_size )
160
199
if executor == Executor .TFDist :
161
200
env ['SALUS_TFDIST_ENDPOINT' ] = TFDistServer .current_server ().endpoint
162
201
@@ -166,12 +205,16 @@ def __call__(self, executor, output_file):
166
205
'stdbuf' , '-o0' , '-e0' , '--' ,
167
206
'python' , '-m' , pkg , method ,
168
207
]
208
+ cmd += self .wl .extra_args
209
+
210
+ logger .info (f'Starting workload with cmd: { cmd } ' )
169
211
if FLAGS .no_capture :
170
212
return execute (cmd , cwd = str (cwd ), env = self .env )
171
213
else :
172
214
output_file .parent .mkdir (exist_ok = True , parents = True )
173
215
with output_file .open ('w' ) as f :
174
- return execute (cmd , cwd = str (cwd ), env = env , stdout = f , stderr = sp .STDOUT )
216
+ # return execute(cmd, cwd=str(cwd), env=env, stdout=f, stderr=sp.STDOUT)
217
+ return execute (cmd , cwd = str (cwd ), env = env , stdout = f , stderr = None )
175
218
176
219
def _construct_test_name (self , executor ):
177
220
# type: (Executor) -> Tuple[str, str]
@@ -197,6 +240,12 @@ def _construct_test_name(self, executor):
197
240
})
198
241
}
199
242
243
+ variable_batch_size_models = {'vae' , 'superres' , 'seq2seq' , 'mnistsf' , 'mnistcv' , 'mnistlg' }
244
+ if remove_suffix (self .wl .name , 'eval' ) not in variable_batch_size_models :
245
+ if self .wl .batch_size not in self .wl .wtl .available_batch_sizes ():
246
+ raise ValueError (f"Batch size `{ self .wl .batch_size } ' is not supported for { self .wl .name } ,"
247
+ f" available ones: { self .wl .wtl .available_batch_sizes ()} " )
248
+
200
249
if executor == Executor .Salus :
201
250
prefix = 'test_rpc_'
202
251
elif executor == Executor .TF :
@@ -209,19 +258,26 @@ def _construct_test_name(self, executor):
209
258
if self .wl .name .endswith ('eval' ):
210
259
prefix += 'eval_'
211
260
212
- model_name = self .wl .name . rsplit ( 'eval' )[ 0 ]
261
+ model_name = remove_suffix ( self .wl .name , 'eval' )
213
262
214
263
if model_name in supported_model :
215
264
pkg , cls , names = supported_model [model_name ]
216
265
else :
217
266
# fallback to guessing
218
267
pkg = f'test_tf.test_{ model_name } '
219
268
cls = f'Test{ snake_to_pascal (model_name )} '
269
+
270
+ # get method name
220
271
names = {
221
272
s : str (idx )
222
273
for idx , s in enumerate (self .wl .wtl .available_batch_sizes ())
223
274
}
224
- method = f'{ cls } .{ prefix } { names [self .wl .batch_size ]} '
275
+
276
+ postfix = names .get (self .wl .batch_size , '0' )
277
+ if model_name == 'seq2seq' and postfix == '0' :
278
+ postfix = '2_large'
279
+
280
+ method = f'{ cls } .{ prefix } { postfix } '
225
281
return pkg , method
226
282
227
283
@@ -240,7 +296,7 @@ def __call__(self, executor, output_file):
240
296
cmd = [
241
297
'stdbuf' , '-o0' , '-e0' , '--' ,
242
298
'python' , '-m' , 'fathom.cli' ,
243
- '--workload' , self .wl .name . rsplit ( 'eval' )[ 0 ] ,
299
+ '--workload' , remove_suffix ( self .wl .name , 'eval' ),
244
300
'--action' , 'test' if self .wl .name .endswith ('eval' ) else 'train' ,
245
301
'--num_iters' , str (self .wl .batch_num ),
246
302
'--batch_size' , str (self .wl .batch_size ),
@@ -262,9 +318,153 @@ def __call__(self, executor, output_file):
262
318
else :
263
319
raise ValueError (f'Unknown executor: { executor } ' )
264
320
321
+ cmd += self .wl .extra_args
322
+ logger .info (f'Starting workload with cmd: { cmd } ' )
323
+
324
+ if FLAGS .no_capture :
325
+ return execute (cmd , cwd = str (cwd ), env = self .env )
326
+ else :
327
+ output_file .parent .mkdir (exist_ok = True , parents = True )
328
+ with output_file .open ('w' ) as f :
329
+ return execute (cmd , cwd = str (cwd ), env = self .env , stdout = f , stderr = sp .STDOUT )
330
+
331
+
332
+ class TFWebDirectRunner (Runner ):
333
+ """Using TFWeb's load infrastructure to directly run"""
334
+
335
+ def __init__ (self , wl , base_dir = None ):
336
+ super ().__init__ (wl )
337
+ self .base_dir = base_dir
338
+ if self .base_dir is None :
339
+ self .base_dir = FLAGS .tfweb_base
340
+
341
+ def __call__ (self , executor , output_file ):
342
+ model_name = remove_suffix (self .wl .name , 'eval' )
343
+ cwd = self .base_dir
344
+ cmd = [
345
+ 'stdbuf' , '-o0' , '-e0' , '--' ,
346
+ 'examples/direct/client' ,
347
+ '--model="{}"' .format (str (Path (FLAGS .tfweb_saved_model_dir ).joinpath (model_name ))),
348
+ '--batch_size={}' .format (self .wl .batch_size ),
349
+ '--batch_num={}' .format (self .wl .batch_num ),
350
+ ]
351
+
352
+ if executor == Executor .Salus :
353
+ cmd += [
354
+ '--sess_target' , SalusServer .current_server ().endpoint ,
355
+ ]
356
+ elif executor == Executor .TF :
357
+ cmd += [
358
+ '--sess_target' , '""' ,
359
+ ]
360
+ elif executor == Executor .TFDist :
361
+ cmd += [
362
+ '--sess_target' , TFDistServer .current_server ().endpoint ,
363
+ ]
364
+ else :
365
+ raise ValueError (f'Unknown executor: { executor } ' )
366
+ cmd += self .wl .extra_args
367
+ logger .info (f'Starting workload with cmd: { cmd } ' )
368
+
265
369
if FLAGS .no_capture :
266
370
return execute (cmd , cwd = str (cwd ), env = self .env )
267
371
else :
268
372
output_file .parent .mkdir (exist_ok = True , parents = True )
269
373
with output_file .open ('w' ) as f :
270
374
return execute (cmd , cwd = str (cwd ), env = self .env , stdout = f , stderr = sp .STDOUT )
375
+
376
+
377
+ class TFWebRunner (Runner ):
378
+ """
379
+ Run a TFWeb based inference job
380
+
381
+ We start several servers and a balancer on the same node.
382
+ The server commandline: tfweb --model=path/to/saved_model/network --sess_target=...
383
+ The client commandline: gobetween from-file xxx.toml
384
+ """
385
+
386
+ def __init__ (self , wl , base_dir = None ):
387
+ super ().__init__ (wl )
388
+ self .base_dir = base_dir
389
+ if self .base_dir is None :
390
+ self .base_dir = FLAGS .tfweb_base
391
+
392
+ def __call__ (self , executor , output_file ):
393
+ # type: (Executor, Path) -> Popen
394
+ model_name = remove_suffix (self .wl .name , 'web' )
395
+ cwd = self .base_dir
396
+ cmd = [
397
+ 'stdbuf' , '-o0' , '-e0' , '--' ,
398
+ 'examples/cluster/start_cluster' ,
399
+ '--model="{}"' .format (str (Path (FLAGS .tfweb_saved_model_dir ).joinpath (model_name ))),
400
+ ]
401
+
402
+ if executor == Executor .Salus :
403
+ cmd += [
404
+ '--sess_target' , SalusServer .current_server ().endpoint ,
405
+ ]
406
+ elif executor == Executor .TF :
407
+ cmd += [
408
+ '--sess_target' , '""' ,
409
+ ]
410
+ elif executor == Executor .TFDist :
411
+ cmd += [
412
+ '--sess_target' , TFDistServer .current_server ().endpoint ,
413
+ ]
414
+ else :
415
+ raise ValueError (f'Unknown executor: { executor } ' )
416
+
417
+ num_replicas = self .wl .env .pop ('SALUS_TFWEB_REPLICAS' , '1' )
418
+ cmd += [
419
+ '--num_replicas' , num_replicas
420
+ ]
421
+ cmd += self .wl .extra_args
422
+ logger .info (f'Starting workload with cmd: { cmd } ' )
423
+
424
+ if FLAGS .no_capture :
425
+ return execute (cmd , cwd = str (cwd ), env = self .env )
426
+ else :
427
+ output_file .parent .mkdir (exist_ok = True , parents = True )
428
+ with output_file .open ('w' ) as f :
429
+ return execute (cmd , cwd = str (cwd ), env = self .env , stdout = f , stderr = sp .STDOUT )
430
+
431
+
432
+ class TFWebClientRunner (Runner ):
433
+ """
434
+ Run a tfweb client attacker.
435
+ Command: examples/cluster/tfweb-client TARGET REQ_BODY PLANTXT
436
+ """
437
+
438
+ def __init__ (self , wl , base_dir = None ):
439
+ super ().__init__ (wl )
440
+ self .base_dir = base_dir
441
+ if self .base_dir is None :
442
+ self .base_dir = FLAGS .tfweb_base
443
+
444
+ def __call__ (self , executor , output_file ):
445
+ # type: (Executor, Path) -> Popen
446
+
447
+ model_name = remove_suffix (self .wl .name , 'client' )
448
+
449
+ cwd = self .base_dir
450
+ cmd = [
451
+ 'stdbuf' , '-o0' , '-e0' , '--' ,
452
+ 'examples/tfweb-client' ,
453
+ '-output' , str (output_file ),
454
+ self .wl .target ,
455
+ # request body
456
+ str (Path (FLAGS .tfweb_request_body_dir ).joinpath (model_name ).with_suffix ('.txt' )),
457
+ # always write plan to stdin
458
+ '-' ,
459
+ ]
460
+ cmd += self .wl .extra_args
461
+ logger .info (f'Starting workload with cmd: { cmd } ' )
462
+
463
+ proc = execute (cmd , cwd = str (cwd ), env = self .env , stdin = sp .PIPE )
464
+ proc .stdin .write (self ._plan_to_bytes ())
465
+ proc .stdin .close ()
466
+ return proc
467
+
468
+ def _plan_to_bytes (self ):
469
+ return ' ' .join (self .wl .plan ).encode ('utf-8' )
470
+
0 commit comments