
Commit 055b71e

Save Scalars With Mirrored Strategy (#259)
1 parent e635944 commit 055b71e

File tree

4 files changed: +53 −20 lines


smdebug/core/hook.py

Lines changed: 14 additions & 3 deletions

@@ -294,6 +294,10 @@ def _get_worker_name(self):
     def _get_num_workers(self):
         pass
 
+    @abstractmethod
+    def _is_not_supported(self):
+        pass
+
     #### Save Manager methods ####
 
     def _should_collection_be_saved(self, coll_name: str) -> bool:
@@ -418,7 +422,10 @@ def _close_writers(self) -> None:
         for mode in to_delete_writers:
             del self.tb_writers[mode]
 
-    def _initialize_writers(self) -> None:
+    def _initialize_writers(self, only_initialize_if_missing=False) -> None:
+        # Function is overridden in smdebug/tensorflow/base_hook.py
+        if only_initialize_if_missing and self.writer:
+            return
         if self.dry_run:
             return
         if self.first_process is False:
@@ -630,6 +637,11 @@ def _write_scalars(self):
         If sm_metric is set to True for certain scalars, then that scalar is written to
         SageMaker as well. By default, loss values are sm_metric.
         """
+        if self._is_not_supported():
+            # Do not log scalars if smdebug hook is not supported
+            # Like when TFDistributionStrategy.UNSUPPORTED
+            self.scalar_cache = []
+            return
         for scalar_obj in self.scalar_cache:
             scalar_name = scalar_obj.name
             scalar_val = scalar_obj.value
@@ -652,8 +664,7 @@ def _write_scalars(self):
                     scalar_name, scalar_val, self.step, timestamp=timestamp
                 )
            if write_event:
-                if self.writer is None:
-                    self._initialize_writers()
+                self._initialize_writers(only_initialize_if_missing=True)
                self._write_raw_tensor_simple(scalar_name, scalar_val, timestamp=timestamp)
 
        self.scalar_cache = []
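
The hook.py change boils down to two behaviors: buffered scalars are dropped instead of written when the hook is not supported (e.g. TFDistributionStrategy.UNSUPPORTED), and a writer is created lazily only if one is missing. Below is a minimal, self-contained sketch of that pattern; the class and the placeholder writer are illustrative stand-ins, not the actual smdebug implementation.

from abc import ABC, abstractmethod


class ScalarCachingHook(ABC):
    """Hypothetical stand-in for the smdebug base hook, reduced to the scalar path."""

    def __init__(self):
        self.writer = None
        self.scalar_cache = []  # buffered (name, value) pairs from save_scalar()

    @abstractmethod
    def _is_not_supported(self) -> bool:
        """Framework hooks report whether the current setup is unsupported."""

    def save_scalar(self, name, value):
        self.scalar_cache.append((name, value))

    def _initialize_writers(self, only_initialize_if_missing=False):
        # Skip re-initialization when a writer already exists and the caller
        # only asked for one to be created on demand.
        if only_initialize_if_missing and self.writer is not None:
            return
        self.writer = object()  # placeholder for a real FileWriter

    def _write_scalars(self):
        if self._is_not_supported():
            # Unsupported setup: discard the buffered scalars instead of writing.
            self.scalar_cache = []
            return
        for name, value in self.scalar_cache:
            self._initialize_writers(only_initialize_if_missing=True)
            print(f"writing {name}={value} with {self.writer}")
        self.scalar_cache = []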

smdebug/tensorflow/base_hook.py

Lines changed: 4 additions & 1 deletion

@@ -282,7 +282,10 @@ def _get_writers(self, tensor_name, tensor_ref) -> List[FileWriter]:
         elif self.distribution_strategy == TFDistributionStrategy.MIRRORED:
             if len(self.device_map):
                 # else is for metrics in Keras
-                worker = tensor_ref.tf_obj.device if tensor_ref.tf_obj is not None else "CPU"
+                if tensor_ref is not None and tensor_ref.tf_obj is not None:
+                    worker = tensor_ref.tf_obj.device
+                else:
+                    worker = "CPU"
                 # if device str is empty or cpu in worker
                 if not bool(worker) or "CPU" in worker:
                     if self.save_all_workers:
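
The base_hook.py hunk hardens the MirroredStrategy worker lookup so that tensor_ref itself being None (which happens for Keras metrics) no longer raises before the CPU fallback applies. A small sketch of just that selection logic, treating tensor_ref as a hypothetical object exposing a tf_obj attribute:

def resolve_worker(tensor_ref) -> str:
    # Only dereference .device when both the ref and its TF object exist;
    # Keras metrics arrive with no tf_obj, so they fall back to "CPU".
    if tensor_ref is not None and tensor_ref.tf_obj is not None:
        return tensor_ref.tf_obj.device  # e.g. ".../device:GPU:0"
    return "CPU"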

tests/tensorflow2/test_keras.py

Lines changed: 12 additions & 3 deletions

@@ -6,6 +6,9 @@
 This was tested with TensorFlow 2.1, by running
 `python tests/tensorflow2/test_keras.py` from the main directory.
 """
+# Standard Library
+import time
+
 # Third Party
 import pytest
 import tensorflow.compat.v2 as tf
@@ -394,6 +397,10 @@ def test_gradtape_persistent(out_dir, saveall):
 @pytest.mark.parametrize("saveall", [True, False])
 def test_keras_fit(out_dir, tf_eager_mode, saveall):
     hook = smd.KerasHook(out_dir=out_dir, save_all=saveall)
+    ts = time.time()
+    hook.save_scalar("foobar", 1, sm_metric=True, timestamp=ts)
+    scalars_to_be_saved = dict()
+    scalars_to_be_saved["scalar/foobar"] = (ts, 0)
     helper_keras_fit(
         trial_dir=out_dir,
         hook=hook,
@@ -403,9 +410,9 @@ def test_keras_fit(out_dir, tf_eager_mode, saveall):
 
     trial = smd.create_trial(path=out_dir)
     # can't save gradients in TF 2.x eager mode
-    if saveall:  # save losses, metrics, weights, biases
+    if saveall:  # save losses, metrics, weights, biases, scalar
         if tf_eager_mode:
-            assert len(trial.tensor_names()) == (12 if is_tf_2_2() else 13)
+            assert len(trial.tensor_names()) == (13 if is_tf_2_2() else 14)
         else:
             assert len(trial.tensor_names()) == 21
         assert len(trial.tensor_names(collection=CollectionKeys.BIASES)) == 2
@@ -421,11 +428,13 @@ def test_keras_fit(out_dir, tf_eager_mode, saveall):
             "No Optimizer Variables Should be Saved in EVAL Mode",
         )
     else:  # save the default losses and metrics
-        assert len(trial.tensor_names()) == (3 if is_tf_2_2() and tf_eager_mode else 4)
+        assert len(trial.tensor_names()) == (4 if is_tf_2_2() and tf_eager_mode else 5)
         assert len(trial.tensor_names(collection=CollectionKeys.LOSSES)) == 1
         assert len(trial.tensor_names(collection=CollectionKeys.METRICS)) == (
             2 if is_tf_2_2() and tf_eager_mode else 3
         )
+    for tname in trial.tensor_names():
+        assert trial.tensor(tname).value(0) is not None
 
 
 @pytest.mark.slow
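
The test now writes a custom scalar before training and expects it to appear in the trial (hence the +1 tensor counts). Roughly the user-facing call sequence it exercises; the out_dir path and the commented training step are illustrative, while KerasHook, save_scalar, and create_trial are the calls shown in the diff above:

import time

import smdebug.tensorflow as smd

hook = smd.KerasHook(out_dir="/tmp/smdebug_out", save_all=True)
ts = time.time()
# sm_metric=True also emits the value as a SageMaker metric; the tensor is
# later readable from the trial under the name "scalar/foobar".
hook.save_scalar("foobar", 1, sm_metric=True, timestamp=ts)
# After training (model.fit(..., callbacks=[hook]) as in helper_keras_fit),
# the scalar can be read back:
#   trial = smd.create_trial(path="/tmp/smdebug_out")
#   trial.tensor("scalar/foobar").value(0)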

tests/tensorflow2/test_keras_mirrored.py

Lines changed: 23 additions & 13 deletions

@@ -3,12 +3,14 @@
 
 # Standard Library
 import os
+import time
 
 # Third Party
 import pytest
 import tensorflow.compat.v2 as tf
 import tensorflow_datasets as tfds
 from tensorflow.python.client import device_lib
+from tests.core.utils import verify_files
 from tests.tensorflow2.utils import is_tf_2_2
 from tests.tensorflow.utils import create_trial_fast_refresh
@@ -119,6 +121,10 @@ def scale(image, label):
     )
 
     hooks.append(hook)
+    scalars_to_be_saved = dict()
+    ts = time.time()
+    scalars_to_be_saved["scalar/foobar"] = (ts, steps)
+    hook.save_scalar("foobar", 1, sm_metric=True, timestamp=ts)
 
     if steps is None:
         steps = ["train"]
@@ -131,7 +137,7 @@ def scale(image, label):
         model.predict(train_dataset, steps=4, callbacks=hooks, verbose=0)
 
     smd.get_hook().close()
-    return strategy
+    return strategy, scalars_to_be_saved
 
 
 def exhaustive_check(trial_dir, include_workers="one", eager=True):
@@ -144,7 +150,7 @@ def exhaustive_check(trial_dir, include_workers="one", eager=True):
         CollectionKeys.METRICS,
         CollectionKeys.OPTIMIZER_VARIABLES,
     ]
-    strategy = train_model(
+    strategy, _ = train_model(
         trial_dir,
         include_collections=include_collections,
         steps=["train", "eval", "predict", "train"],
@@ -158,9 +164,11 @@ def exhaustive_check(trial_dir, include_workers="one", eager=True):
     if include_workers == "all":
         assert len(tr.workers()) == strategy.num_replicas_in_sync
         if eager:
-            assert len(tr.tensor_names()) == (6 + 1 + 2 + 5 if is_tf_2_2() else 6 + 1 + 3 + 5)
-            # 6 weights, 1 loss, 3 metrics, 5 optimizer variables for Tf 2.1
-            # 6 weights, 1 loss, 2 metrics, 5 optimizer variables for Tf 2.2
+            assert len(tr.tensor_names()) == (
+                6 + 1 + 2 + 5 + 1 if is_tf_2_2() else 6 + 1 + 3 + 5 + 1
+            )
+            # 6 weights, 1 loss, 3 metrics, 5 optimizer variables for Tf 2.1, 1 scalar
+            # 6 weights, 1 loss, 2 metrics, 5 optimizer variables for Tf 2.2, 1 scalar
         else:
             assert len(tr.tensor_names()) == (6 + 6 + 1 + 3 + strategy.num_replicas_in_sync * 3 + 5)
     else:
@@ -235,20 +243,21 @@ def test_tf_keras(out_dir, tf_eager_mode, include_workers="all"):
 @pytest.mark.slow
 @pytest.mark.parametrize("workers", ["one", "all"])
 def test_save_all(out_dir, tf_eager_mode, workers):
-    strategy = train_model(
+    save_config = SaveConfig(save_steps=[5])
+    strategy, saved_scalars = train_model(
         out_dir,
         include_collections=None,
         save_all=True,
-        save_config=SaveConfig(save_steps=[5]),
+        save_config=save_config,
         steps=["train"],
         eager=tf_eager_mode,
         include_workers=workers,
     )
     tr = create_trial_fast_refresh(out_dir)
     print(tr.tensor_names())
     if tf_eager_mode:
-        assert len(tr.tensor_names()) == (6 + 2 + 1 + 5 if is_tf_2_2() else 6 + 3 + 1 + 5)
-        # weights, metrics, losses, optimizer variables
+        assert len(tr.tensor_names()) == (6 + 2 + 1 + 5 + 1 if is_tf_2_2() else 6 + 3 + 1 + 5 + 1)
+        # weights, metrics, losses, optimizer variables, scalar
     else:
         assert (
             len(tr.tensor_names())
@@ -266,6 +275,7 @@ def test_save_all(out_dir, tf_eager_mode, workers):
             assert len(tr.tensor(tname).workers(0)) == (
                 1 if workers == "one" else strategy.num_replicas_in_sync
             )
+    verify_files(out_dir, save_config, saved_scalars)
 
 
 @pytest.mark.slow
@@ -350,7 +360,7 @@ def test_include_regex(out_dir, tf_eager_mode, workers):
         include_workers=workers,
     )
     hook.get_collection("custom_coll").include("dense")
-    strategy = train_model(out_dir, hook=hook, steps=["train"], eager=tf_eager_mode)
+    strategy, _ = train_model(out_dir, hook=hook, steps=["train"], eager=tf_eager_mode)
 
     tr = create_trial_fast_refresh(out_dir)
     tnames = tr.tensor_names(collection="custom_coll")
@@ -378,7 +388,7 @@ def test_include_regex_opt_var(out_dir, tf_eager_mode, workers):
         include_workers=workers,
    )
    hook.get_collection("custom_optimizer_variables").include("Adam")
-    strategy = train_model(out_dir, hook=hook, steps=["train"], eager=tf_eager_mode)
+    strategy, _ = train_model(out_dir, hook=hook, steps=["train"], eager=tf_eager_mode)
 
    tr = create_trial_fast_refresh(out_dir)
    tnames = tr.tensor_names(collection="custom_optimizer_variables")
@@ -411,11 +421,11 @@ def test_clash_with_tb_callback(out_dir):
         add_callbacks=["tensorboard"],
     )
     tr = create_trial_fast_refresh(out_dir)
-    assert len(tr.tensor_names()) == (9 if is_tf_2_2() else 10)
+    assert len(tr.tensor_names()) == (10 if is_tf_2_2() else 11)
 
 
 def test_one_device(out_dir, tf_eager_mode):
-    strategy = train_model(
+    strategy, _ = train_model(
         out_dir,
         include_collections=[
             CollectionKeys.WEIGHTS,
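
Because train_model now returns a (strategy, saved_scalars) tuple, callers that only need the strategy unpack and discard the second element, while test_save_all feeds the dict into verify_files. A condensed view of that call pattern, based only on the diff above and assuming the test module's imports (SaveConfig, train_model, verify_files) are in scope:

save_config = SaveConfig(save_steps=[5])
strategy, saved_scalars = train_model(
    out_dir,
    include_collections=None,
    save_all=True,
    save_config=save_config,
    steps=["train"],
    eager=tf_eager_mode,
    include_workers=workers,
)
# saved_scalars maps "scalar/foobar" to (timestamp, steps); verify_files from
# tests.core.utils checks that those scalars were actually written to out_dir.
verify_files(out_dir, save_config, saved_scalars)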
