
Commit 1d5f734

infra: add pipemode integ tests. (#302)
1 parent 6a4ddbb commit 1d5f734

File tree

3 files changed: +374 −0 lines changed

Lines changed: 144 additions & 0 deletions
@@ -0,0 +1,144 @@
# Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
# http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
from __future__ import absolute_import

import argparse
from random import randint
import struct
import sys

import numpy as np
import tensorflow as tf

# Utility functions for generating a RecordIO-encoded file of labeled numpy data
# for testing. Each file contains one or more records. Each record is a TensorFlow
# protobuf Example object. Each object contains an integer label and a numpy array
# encoded as a byte list.

# This file can be run in script mode to generate a single file, or used
# as a module to generate files via build_record_file.

_kmagic = 0xced7230a

padding = {}
for amount in range(4):
    if sys.version_info >= (3,):
        padding[amount] = bytes([0x00 for _ in range(amount)])
    else:
        padding[amount] = bytearray([0x00 for _ in range(amount)])


def write_recordio(f, data, header_flag=0):
    """Writes a single data point as a RecordIO record to the given file."""
    length = len(data)
    f.write(struct.pack('I', _kmagic))
    # The top three bits of the header word carry the multipart flag; the rest is the length.
    header = (header_flag << 29) | length
    f.write(struct.pack('I', header))
    # Records are padded to a four-byte boundary.
    pad = (((length + 3) >> 2) << 2) - length
    f.write(data)
    f.write(padding[pad])


def write_recordio_multipart(f, data):
    """Writes a single data point into three multipart records."""
    length = len(data)
    stride = int(length / 3)

    data_start = data[0:stride]
    data_middle = data[stride:2 * stride]
    data_end = data[2 * stride:]

    write_recordio(f, data_start, 1)
    write_recordio(f, data_middle, 2)
    write_recordio(f, data_end, 3)


def string_feature(value):
    return tf.train.Feature(bytes_list=tf.train.BytesList(value=[value.tostring()]))


def label_feature(value):
    return tf.train.Feature(int64_list=tf.train.Int64List(value=[value]))


def write_numpy_array(f, feature_name, label, arr, multipart=False):
    feature = {'labels': label_feature(label), feature_name: string_feature(arr)}
    example = tf.train.Example(features=tf.train.Features(feature=feature))
    if multipart:
        write_recordio_multipart(f, example.SerializeToString())
    else:
        write_recordio(f, example.SerializeToString())


def build_record_file(filename, num_records, dimension, classes=2, data_feature_name='data', multipart=False):
    """Builds a RecordIO-encoded file of TF protobuf Example objects. Each object
    is a labeled numpy array. Each example has two fields - a single int64 'labels'
    field and a single bytes list field containing a serialized numpy array.

    Each generated numpy array is a multidimensional normal with
    the specified dimension. The normal distribution is class specific: each class
    has a different mean, so it should be possible to learn a multiclass classifier
    on this data. Class means are deterministic - multiple calls to this function
    with the same number of classes will produce samples drawn from the same
    distribution for each class.

    Args:
        filename - the file to write to
        num_records - how many labeled numpy arrays to generate
        dimension - the size of each numpy array
        classes - the cardinality of labels
        data_feature_name - the name to give the numpy array in the Example object
        multipart - whether to split each record into three multipart RecordIO records
    """
    with open(filename, 'wb') as f:
        for i in range(num_records):
            cur_class = i % classes
            loc = int(cur_class - (classes / 2))
            write_numpy_array(f, data_feature_name, cur_class, np.random.normal(loc=loc, size=(dimension,)), multipart)


def build_single_record_file(filename, dimension, classes=2, data_feature_name='data'):
    cur_class = randint(0, classes - 1)
    loc = int(cur_class - (classes / 2))

    arr = np.random.normal(loc=loc, size=(dimension,))
    feature = {'labels': label_feature(cur_class), data_feature_name: string_feature(arr)}
    example = tf.train.Example(features=tf.train.Features(feature=feature))
    with open(filename, 'wb') as f:
        f.write(example.SerializeToString())


def validate_record_file(filename, dimension):
    data = open(filename, 'rb').read()
    magic_number, length = struct.unpack('II', data[0:8])
    encoded = data[8:8 + length]

    features = {
        'data': tf.io.FixedLenFeature([], tf.string),
        'labels': tf.io.FixedLenFeature([], tf.int64),
    }
    parsed = tf.io.parse_single_example(encoded, features)
    array = tf.io.decode_raw(parsed['data'], tf.float64)

    assert array.shape[0] == dimension


if __name__ == '__main__':
    parser = argparse.ArgumentParser(description="Generate synthetic multi-class training data")
    parser.add_argument('--dimension', default=65536, type=int)
    parser.add_argument('--classes', default=2, type=int)
    parser.add_argument('--num-records', default=4, type=int)
    parser.add_argument('--data-feature-name', default='data')
    parser.add_argument('filename', type=str)
    args = parser.parse_args()
    build_record_file(args.filename, args.num_records, args.dimension, args.classes, args.data_feature_name)
    validate_record_file(args.filename, args.dimension)
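
For reference, a minimal sketch of the framing that write_recordio produces, using only _kmagic and write_recordio from the file above (the 11-byte payload is an arbitrary illustration, not part of the tests):

import io
import struct

payload = b'hello world'                                 # arbitrary 11-byte example
buf = io.BytesIO()
write_recordio(buf, payload)                             # magic word, header word, data, padding

raw = buf.getvalue()
magic, header = struct.unpack('II', raw[:8])
assert magic == _kmagic                                  # 0xced7230a
assert header & ((1 << 29) - 1) == len(payload)          # low 29 bits carry the length
assert header >> 29 == 0                                 # top 3 bits carry the multipart flag
assert len(raw) == 8 + ((len(payload) + 3) // 4) * 4     # payload padded to a 4-byte boundary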
Lines changed: 114 additions & 0 deletions
@@ -0,0 +1,114 @@
# Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
# http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
from __future__ import absolute_import

import os
import shutil
import uuid

import pytest
from recordio_utils import build_record_file, build_single_record_file
from sagemaker import s3_input
from sagemaker.tensorflow import TensorFlow

from test.integration.utils import processor, py_version, unique_name_from_base  # noqa: F401
from timeout import timeout

DIMENSION = 5


def make_test_data(directory, name, num_files, num_records, dimension, sagemaker_session):
    if not os.path.exists(directory):
        os.makedirs(directory)
    for i in range(num_files):
        if num_records > 1:
            build_record_file(os.path.join(directory, name + str(i)),
                              num_records=num_records, dimension=dimension)
        else:
            build_single_record_file(os.path.join(directory, name + str(i)),
                                     dimension=dimension)

    return sagemaker_session.upload_data(path=os.path.join(directory),
                                         key_prefix='pipemode-{}-files'.format(name))


@pytest.fixture(scope='session')
def multi_records_test_data(sagemaker_session):
    test_data = 'test-data-' + str(uuid.uuid4())
    os.makedirs(test_data)
    s3_url = make_test_data(
        directory=test_data,
        name='multi',
        num_files=1,
        num_records=1000,
        dimension=DIMENSION,
        sagemaker_session=sagemaker_session)
    shutil.rmtree(test_data)
    return s3_url


@pytest.fixture(scope='session')
def single_record_test_data(sagemaker_session):
    test_data = 'test-data-' + str(uuid.uuid4())
    os.makedirs(test_data)
    s3_url = make_test_data(
        directory=test_data,
        name='single',
        num_files=100,
        num_records=1,
        dimension=DIMENSION,
        sagemaker_session=sagemaker_session)
    shutil.rmtree(test_data)
    return s3_url


def run_test(sagemaker_session, ecr_image, instance_type, framework_version, test_data,
             record_wrapper_type=None):
    source_path = os.path.join(os.path.dirname(__file__), '..', '..', 'resources', 'pipemode')
    script = os.path.join(source_path, 'pipemode.py')
    estimator = TensorFlow(entry_point=script,
                           role='SageMakerRole',
                           train_instance_type=instance_type,
                           train_instance_count=1,
                           sagemaker_session=sagemaker_session,
                           image_name=ecr_image,
                           framework_version=framework_version,
                           script_mode=True,
                           input_mode='Pipe',
                           hyperparameters={'dimension': DIMENSION})
    input = s3_input(s3_data=test_data,
                     distribution='FullyReplicated',
                     record_wrapping=record_wrapper_type,
                     input_mode='Pipe')
    with timeout(minutes=20):
        estimator.fit({'elizabeth': input},
                      job_name=unique_name_from_base('test-sagemaker-pipemode'))


def test_single_record(sagemaker_session, ecr_image, instance_type, framework_version,
                       single_record_test_data):
    run_test(sagemaker_session,
             ecr_image,
             instance_type,
             framework_version,
             single_record_test_data,
             'RecordIO')


def test_multi_records(sagemaker_session, ecr_image, instance_type, framework_version,
                       multi_records_test_data):
    run_test(sagemaker_session,
             ecr_image,
             instance_type,
             framework_version,
             multi_records_test_data)
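
A note on the two tests above: build_record_file already writes RecordIO framing, so test_multi_records passes no record_wrapper_type, while build_single_record_file writes a bare serialized Example, which is presumably why test_single_record sets record_wrapping='RecordIO' on the s3_input. A minimal local sketch of that difference, assuming recordio_utils is importable as in the imports above (the file names are arbitrary):

import struct
from recordio_utils import build_record_file, build_single_record_file

build_record_file('multi-example', num_records=2, dimension=5)   # already RecordIO-framed
build_single_record_file('single-example', dimension=5)          # bare serialized Example

with open('multi-example', 'rb') as f:
    print(hex(struct.unpack('I', f.read(4))[0]))   # expected: 0xced7230a, the RecordIO magic
with open('single-example', 'rb') as f:
    print(f.read(4))                               # raw protobuf bytes, no RecordIO magic expected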

test/resources/pipemode/pipemode.py

Lines changed: 116 additions & 0 deletions
@@ -0,0 +1,116 @@
import json
import multiprocessing
import os
import tempfile

import tensorflow as tf
from sagemaker_tensorflow import PipeModeDataset

print("Starting estimator script")

ds = PipeModeDataset("elizabeth", benchmark=True)


class BenchmarkConfig(object):

    def __init__(self):
        self.hp = json.load(open('/opt/ml/input/config/hyperparameters.json'))

    @property
    def batch_size(self):
        return int(self.hp.get('batch_size', 5))

    @property
    def prefetch_size(self):
        return int(self.hp.get('prefetch_size', 1000))

    @property
    def channel(self):
        return self.hp.get('channel', 'elizabeth')

    @property
    def dimension(self):
        return int(self.hp['dimension'])

    @property
    def epochs(self):
        return int(self.hp.get('epochs', 3))

    @property
    def parallel_transform_calls(self):
        return int(self.hp.get('parallel_transform_calls', max(1, multiprocessing.cpu_count() - 2)))

    def __repr__(self):
        """Return all properties."""
        return str(vars(self))


config = BenchmarkConfig()


def input_fn():
    features = {
        'data': tf.io.FixedLenFeature([], tf.string),
        'labels': tf.io.FixedLenFeature([], tf.int64),
    }

    def parse(record):
        parsed = tf.io.parse_single_example(serialized=record, features=features)
        return ({
            'data': tf.io.decode_raw(parsed['data'], tf.float64)
        }, parsed['labels'])

    ds = PipeModeDataset(config.channel)

    if config.epochs > 1:
        ds = ds.repeat(config.epochs)
    if config.prefetch_size > 0:
        ds = ds.prefetch(config.prefetch_size)
    ds = ds.map(parse, num_parallel_calls=config.parallel_transform_calls)
    ds = ds.batch(config.batch_size)
    return ds


# Perform Estimator training
column = tf.feature_column.numeric_column('data', shape=(config.dimension,))
model_dir = tempfile.mkdtemp()
estimator = tf.estimator.LinearClassifier(feature_columns=[column])

print("About to call train")
estimator.train(input_fn=input_fn)

# Confirm that we have read the correct number of pipes
assert os.path.exists('/opt/ml/input/data/{}_{}'.format(config.channel, config.epochs + 1))

print("About to call evaluate")
result = estimator.evaluate(input_fn=input_fn)
for key, value in sorted(result.items()):
    print('%s: %s' % (key, value))


# Test that we can create a new PipeModeDataset after training has run
print("Validate that new PipeModeDataset on existing channel can be created")
tf.compat.v1.disable_eager_execution()

ds = PipeModeDataset(config.channel, benchmark=True)
with tf.compat.v1.Session() as sess:
    it = tf.compat.v1.data.make_one_shot_iterator(ds)
    next = it.get_next()
    sess.run(next)

print("Validate create, read, discard, recreate")

# Test that we can create a PipeModeDataset, discard it, and read from a new one
ds = PipeModeDataset(config.channel, benchmark=True)
with tf.compat.v1.Session() as sess:
    it = tf.compat.v1.data.make_one_shot_iterator(ds)
    next = it.get_next()


with tf.compat.v1.Session() as sess:
    it = tf.compat.v1.data.make_one_shot_iterator(ds)
    next = it.get_next()
    sess.run(next)

print("Validate recreate")
ds = PipeModeDataset(config.channel, benchmark=True)
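
The assertion after estimator.train relies on Pipe mode exposing each successive pass over a channel as a new FIFO named /opt/ml/input/data/<channel>_<index>, so by the end of training a pipe with index config.epochs + 1 is expected to exist. A small sketch of inspecting those pipes inside the training container, reusing the config object above (the glob pattern is illustrative only):

import glob
import os

pipes = sorted(glob.glob('/opt/ml/input/data/{}_*'.format(config.channel)))
print(pipes)   # e.g. .../elizabeth_0, .../elizabeth_1, ...
print(os.path.exists('/opt/ml/input/data/{}_{}'.format(config.channel, config.epochs + 1)))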
