
Commit 42f5bf8

adarob authored and copybara-github committed
Reduce disk usage for C4 generation by deduping on lines (instead of sentence windows) and using hashed text as key.
PiperOrigin-RevId: 281192218
1 parent bda77c7 commit 42f5bf8
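The disk saving comes from two changes visible in the diff below: each page now emits one record per line rather than one per sliding window of sentences, and the shuffle key is a fixed-size MD5 digest of the normalized line rather than the text itself. A minimal sketch of the hashed-key idea (the sample line is purely illustrative):

import hashlib

line = "Published by Universities."
# Normalize roughly the way the pipeline does (strip + lower-case), then hash.
key = hashlib.md5(line.strip().lower().encode("utf-8")).hexdigest()
# The key is always a 32-character hex digest, no matter how long the line is.
print(key, len(key))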

3 files changed: +93, -183 lines changed


tensorflow_datasets/text/c4.py

Lines changed: 4 additions & 2 deletions
@@ -48,9 +48,11 @@
   }
   """
 _VERSION = tfds.core.Version(
-    "1.0.1", experiments={tfds.core.Experiment.S3: False})
+    "1.1.0", experiments={tfds.core.Experiment.S3: False})
 _SUPPORTED_VERSIONS = [
-    tfds.core.Version("1.0.0", experiments={tfds.core.Experiment.S3: False})]
+    tfds.core.Version("1.0.0", experiments={tfds.core.Experiment.S3: False}),
+    tfds.core.Version("1.0.1", experiments={tfds.core.Experiment.S3: False}),
+]
 
 _DOWNLOAD_HOST = "https://commoncrawl.s3.amazonaws.com"
 _WET_PATH_URL = "https://commoncrawl.s3.amazonaws.com/crawl-data/CC-MAIN-{cc_version}/wet.paths.gz"
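The default version therefore moves to 1.1.0 (line-based dedup), while 1.0.0 and 1.0.1 remain listed in `_SUPPORTED_VERSIONS`. A sketch of how a caller could pin the older output, assuming the standard TFDS `name/config:version` selector and that data for that version has already been generated:

import tensorflow_datasets as tfds

# Default now resolves to the new 1.1.0 line-deduped data.
builder = tfds.builder("c4/en")

# Explicitly request the previous sentence-window dedup output instead.
legacy_builder = tfds.builder("c4/en:1.0.1")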

tensorflow_datasets/text/c4_utils.py

Lines changed: 50 additions & 63 deletions
@@ -209,115 +209,102 @@ def clean_page(url_and_features,
     yield url, features
 
 
-def _emit_url_to_sentences(page, max_window_size):
-  """Emits url to all (lower-cased) sentences grouped by sliding window."""
+def _hash_line(line):
+  m = hashlib.md5()
+  m.update(tf.compat.as_text(line).encode("utf-8").strip().lower())
+  return m.hexdigest()
+
+
+def _emit_url_to_lines(page):
+  """Emits url to all (lower-cased, hashed) lines."""
   url, features = page
   text = features["text"]
-  for sentences in _get_sentences_by_line(text, lower=True):
-    # We don't want to emit windows where all "sentences" are just endmarks
-    # (e.g., "! ! !").
-    is_solo_endmark = [w in _END_MARKS for w in sentences]
-    for i in range(len(sentences) - min(len(sentences), max_window_size) + 1):
-      if not all(is_solo_endmark[i:i+max_window_size]):
-        yield tuple(sentences[i:i+max_window_size]), url
-
-
-def _emit_sentences_to_urls(el, counter_inc_fn, skip_n=1):
-  """Emits sentences to all but `skip_n` urls."""
-  sentences, urls = el
+  for line in text.split("\n"):
+    yield _hash_line(line), url
+
+
+def _emit_line_to_urls(el, counter_inc_fn, skip_n=1):
+  """Emits (hashed) line to all but `skip_n` urls."""
+  line, urls = el
   # Hash urls and sort to have a consistent, but unbiased, selection when the
-  # same urls exist for multiple sentences.
+  # same urls exist for multiple lines.
   sorted_urls = sorted(
       urls,
       key=lambda x: hashlib.md5(tf.compat.as_text(x).encode("utf-8")).
      hexdigest())
   del sorted_urls[:skip_n]
   if sorted_urls:
-    counter_inc_fn("emitted-sentences-duplicate")
-    logging.info(
-        "Emitting sentences to %d documents: %s", len(sorted_urls), sentences)
+    counter_inc_fn("emitted-lines-duplicate")
   for url in sorted_urls:
-    yield url, sentences
+    yield url, line
 
 
-def _remove_sentences_from_text(
-    el, counter_inc_fn, max_window_size,
-    min_num_sentences=_MIN_NUM_SENTENCES):
-  """Removes matching sentence windows from the page.
+def _remove_lines_from_text(
+    el, counter_inc_fn, min_num_sentences=_MIN_NUM_SENTENCES):
+  """Removes matching lines from the page.
 
   Process the result of a join containing a single value for 'features' and zero
-  or more values for 'sentences'. Each value in 'sentences' is a tuple
-  containing a window of one or more sentences.
+  or more values for 'lines'. Each value in 'lines' is a lower-cased, hashed
+  line.
 
   If a line has fewer sentences than `max_window_size`, the full line is
   compared for a match.
 
   Args:
-    el: `(string, {'features': [string], 'sentences': [tuple(string)]})`,
+    el: `(string, {'features': features_dict, 'lines': [string]})`,
       element containing the result of a join on key with both the page text
-      and lower-cased sentence windows to remove.
+      and lower-cased, hashed lines to remove.
     counter_inc_fn: function, a function taking the name of a counter to be
      incremented and the (optional) amount.
-    max_window_size: int, the maximum size of a sentence window to slide across
-      lines.
    min_num_sentences: int, the minimum number of sentences a page needs to not
      be skipped.
 
   Yields:
     url: The URL of the page.
-    features: The page features with sentences removed.
+    features: The page features with lines removed from text.
   """
   url, join_values = el
   features = join_values["features"]
 
-  assert len(features) == 1, "Invalid page count (%d) for %s" % (len(features),
-                                                                 url)
+  assert len(features) == 1, "Invalid page count (%d) for %s" % (
+      len(features), url)
   features = features[0]
   text = features["text"]
-  sentences_to_remove = set(join_values["sentences"])
-  sentences_by_line = _get_sentences_by_line(text, lower=False)
-  new_sentences_by_line = []
-  for line_sentences in sentences_by_line:
-    indices_to_remove = set()
-    for i in range(
-        len(line_sentences) - min(len(line_sentences), max_window_size) + 1):
-      sentence_window = tuple(
-          s.lower() for s in line_sentences[i:i+max_window_size])
-      if sentence_window in sentences_to_remove:
-        indices_to_remove.update(range(i, i+len(sentence_window)))
-    counter_inc_fn("filtered-sentence-duplicate", len(indices_to_remove))
-    new_line_sentences = [
-        s for i, s in enumerate(line_sentences) if i not in indices_to_remove]
-    if new_line_sentences:
-      new_sentences_by_line.append(new_line_sentences)
-  if sum(len(sents) for sents in new_sentences_by_line) < min_num_sentences:
+  lines_to_remove = set(join_values["lines"])
+  new_lines = []
+  for line in text.split("\n"):
+    if _hash_line(line) in lines_to_remove:
+      counter_inc_fn("filtered-lines-duplicate")
+    else:
+      new_lines.append(line)
+  new_text = "\n".join(new_lines)
+  if len(_get_sentences(new_text)) < min_num_sentences:
     counter_inc_fn("filtered-doc-toofewsentences")
     return
-  features["text"] = "\n".join(" ".join(sent) for sent in new_sentences_by_line)
-  yield (url, features)
+  new_features = features.copy()
+  new_features["text"] = new_text
+  yield (url, new_features)
 
 
-def remove_duplicate_text(pages, sentence_window_size=3):
-  """Utility to remove duplicate sentence windows across text documents."""
-  # Output: url, sentence
+def remove_duplicate_text(pages):
+  """Utility to remove duplicate lines across text documents."""
+  # Output: url, lines
   beam = tfds.core.lazy_imports.apache_beam
-  counter_inc_fn = get_counter_inc_fn("dedupe-sentences")
-  sentences_to_remove = (
+  counter_inc_fn = get_counter_inc_fn("dedupe-lines")
+  lines_to_remove = (
       pages
-      | beam.FlatMap(_emit_url_to_sentences,
-                     max_window_size=sentence_window_size)
+      | beam.FlatMap(_emit_url_to_lines)
      | "group_sentences" >> beam.GroupByKey()
-      | beam.FlatMap(_emit_sentences_to_urls, counter_inc_fn=counter_inc_fn))
+      | beam.FlatMap(_emit_line_to_urls, counter_inc_fn=counter_inc_fn))
 
   # Output: url, text
   final_docs = ({
       "features": pages,
-      "sentences": sentences_to_remove
+      "lines": lines_to_remove
   }
-               | "group_text_and_sentences_by_url" >> beam.CoGroupByKey()
+               | "group_features_and_lines_by_url" >> beam.CoGroupByKey()
                | beam.FlatMap(
-                   _remove_sentences_from_text,
-                   max_window_size=sentence_window_size,
+                   _remove_lines_from_text,
                   counter_inc_fn=counter_inc_fn))
 
   return final_docs
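In plain terms, the new pipeline keys every line by an MD5 hash of its stripped, lower-cased text, keeps the line on the URL whose own MD5 hash sorts first, and drops it from every other page. A minimal, Beam-free sketch of that logic (helper names are illustrative, and the `min_num_sentences` page filter is omitted):

import hashlib

def hash_line(line):
  # Key on stripped, lower-cased text so trivially different copies collide.
  return hashlib.md5(line.strip().lower().encode("utf-8")).hexdigest()

def dedupe_lines(pages, skip_n=1):
  """pages: iterable of (url, text). Yields (url, text) with duplicate lines removed."""
  pages = list(pages)
  # Group URLs by hashed line, mirroring _emit_url_to_lines + GroupByKey.
  urls_by_line = {}
  for url, text in pages:
    for line in text.split("\n"):
      urls_by_line.setdefault(hash_line(line), []).append(url)
  # Keep each line on the first `skip_n` URLs (ordered by hashed URL) and mark
  # it for removal everywhere else, mirroring _emit_line_to_urls.
  to_remove = set()
  for line_hash, urls in urls_by_line.items():
    ordered = sorted(set(urls), key=lambda u: hashlib.md5(u.encode("utf-8")).hexdigest())
    for url in ordered[skip_n:]:
      to_remove.add((url, line_hash))
  # Rebuild each page without its removed lines, mirroring _remove_lines_from_text.
  for url, text in pages:
    kept = [line for line in text.split("\n") if (url, hash_line(line)) not in to_remove]
    yield url, "\n".join(kept)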

tensorflow_datasets/text/c4_utils_test.py

Lines changed: 39 additions & 118 deletions
@@ -25,6 +25,7 @@
 
 import six
 from tensorflow_datasets import testing
+from tensorflow_datasets.core.lazy_imports_lib import lazy_imports
 from tensorflow_datasets.text import c4_utils
 
 EN_TEXT = """This line has enough words and ends in punctuation, Dr. Roberts!
@@ -230,125 +231,45 @@ def test_clean_page_policy(self):
     self.assertEqual(expected_clean_text, out["text"])
     self.assertEqual(expected_counters, dict(counters))
 
-  def test_emit_url_to_sentences(self):
-    # Try with punkt language (en).
-    expected_sentences = (
-        ("this line has enough words and ends in punctuation, dr. roberts!",),
-        ("\"open access.", "powered by scholars.",
-         "published by universities.\""),
-        ("sentence 1.", "sentence 2.", "sentence 3."),
-        ("sentence 2.", "sentence 3.", "sentence 4."),
-        ("another sentence.", ".", "."),
-    )
-    results = c4_utils._emit_url_to_sentences(("url", {
-        "text":
-            EXPECTED_CLEAN_EN +
-            "\nSentence 1. Sentence 2. Sentence 3. Sentence 4."
-            "\nAnother sentence. . . ? ? ! ! !",
-        "content-type":
-            FAKE_CONTENT_TYPE,
-        "content-length":
-            FAKE_CONTENT_LENGTH,
-        "timestamp":
-            FAKE_TIMESTAMP
-    }),
-                                              max_window_size=3)
-    ret_sentences, ret_urls = zip(*results)
-    self.assertEqual(("url",) * len(expected_sentences), ret_urls)
-    self.assertEqual(expected_sentences, ret_sentences)
-
-  def test_emit_sentences_to_urls(self):
-    counters, counter_inc_fn = _get_counters()
-    urls = ["urlA", "urlB", "urlC", "urlD"]
-    sentence = "test sentence."
-    expected_urls = ("urlA", "urlD")
-    results = c4_utils._emit_sentences_to_urls((sentence, urls),
-                                               counter_inc_fn,
-                                               skip_n=2)
-    ret_urls, ret_sentences = zip(*results)
-    self.assertEqual(expected_urls, ret_urls)
-    self.assertEqual((sentence,) * 2, ret_sentences)
-    self.assertEqual({"emitted-sentences-duplicate": 1}, dict(counters))
-
-  def test_remove_sentences_from_page(self):
-    counters, counter_inc_fn = _get_counters()
-    sentences_to_remove = [
-        ("this line has enough words and ends in punctuation, dr. roberts!",),
-        ("sentence 1.", "sentence 2.", "sentence 3."),
-        ("sentence 3.", "sentence 4."),  # no match
-        ("sentence 1.", "sentence 3.", "sentence 4."),  # no match
-        ("sentence 3.", "sentence 4.", "sentence 5."),  # no match
+  def test_remove_duplicate_text(self):
+    import apache_beam.testing.util as beam_testing_util  # pylint:disable=g-import-not-at-top
+    beam = lazy_imports.apache_beam
+    input_urls_and_text = [
+        ("url/1-0",
+         "This is a duplicated line.\nThis is a unique line.\n"
+         "This one comes first and so it stays."),
+        ("url/2-1",
+         "This is 2nd unique line.\nThis one comes second so it is removed "
+         "even though the capitalizaiton is different.\n"
+         "this is a Duplicated line. "),
+        ("url/3-4",
+         "This is a 3rd unique line.\nThis is a duplicated line.\n"
+         "This one comes third and so it is removed. But the page stays "
+         "because there are still 3 sentences remaining."),
+        ("url/4-4",
+         "This is a 4th unique line.\nThis is a duplicated line.\n"
+         "This one comes third and so it is removed, and the page is too "
+         "since there aren't enough sentences left."),
     ]
-    text = (
-        EXPECTED_CLEAN_EN + "\nSentence 1. Sentence 2. Sentence 3. Sentence 4.")
-    expected_features = {
-        "text": ("\"Open Access. Powered by Scholars. "
-                 "Published by Universities.\"\nSentence 4."),
-        "content-type": FAKE_CONTENT_TYPE,
-        "content-length": FAKE_CONTENT_LENGTH,
-        "timestamp": FAKE_TIMESTAMP
-    }
-    result = list(
-        c4_utils._remove_sentences_from_text(("url", {
-            "features": [{
-                "text": text,
-                "content-type": FAKE_CONTENT_TYPE,
-                "content-length": FAKE_CONTENT_LENGTH,
-                "timestamp": FAKE_TIMESTAMP
-            }],
-            "sentences": sentences_to_remove
-        }),
-                                             max_window_size=3,
-                                             counter_inc_fn=counter_inc_fn))
-    self.assertEqual([("url", expected_features)], result)
-    self.assertEqual({"filtered-sentence-duplicate": 4}, dict(counters))
-
-    counters.clear()
-    sentences_to_remove.append(("sentence 2.", "sentence 3.", "sentence 4."))
-    expected_features = {
-        "text":
-            ("\"Open Access. Powered by Scholars. Published by Universities.\""
-            ),
-        "content-type": FAKE_CONTENT_TYPE,
-        "content-length": FAKE_CONTENT_LENGTH,
-        "timestamp": FAKE_TIMESTAMP
-    }
-    result = list(
-        c4_utils._remove_sentences_from_text(("url", {
-            "features": [{
-                "text": text,
-                "content-type": FAKE_CONTENT_TYPE,
-                "content-length": FAKE_CONTENT_LENGTH,
-                "timestamp": FAKE_TIMESTAMP
-            }],
-            "sentences": sentences_to_remove
-        }),
-                                             counter_inc_fn=counter_inc_fn,
-                                             max_window_size=3,
-                                             min_num_sentences=3))
-    self.assertEqual([("url", expected_features)], result)
-    self.assertEqual({"filtered-sentence-duplicate": 5}, dict(counters))
-
-    counters.clear()
-    result = list(
-        c4_utils._remove_sentences_from_text(("url", {
-            "features": [{
-                "text": text,
-                "content-type": FAKE_CONTENT_TYPE,
-                "content-length": FAKE_CONTENT_LENGTH,
-                "timestamp": FAKE_TIMESTAMP
-            }],
-            "sentences": sentences_to_remove
-        }),
-                                             counter_inc_fn=counter_inc_fn,
-                                             max_window_size=3,
-                                             min_num_sentences=4))
-    self.assertEqual([], result)
-    self.assertEqual(
-        {
-            "filtered-sentence-duplicate": 5,
-            "filtered-doc-toofewsentences": 1
-        }, dict(counters))
+    expected_urls_and_text = [
+        ("url/1-0",
+         "This is a duplicated line.\nThis is a unique line.\n"
+         "This one comes first and so it stays."),
+        ("url/3-4",
+         "This is a 3rd unique line.\n"
+         "This one comes third and so it is removed. But the page stays "
+         "because there are still 3 sentences remaining."),
+    ]
+    with beam.Pipeline() as pipeline:
+      pages = pipeline | beam.Create([
+          (url, {"text": text}) for url, text in input_urls_and_text
+      ])
+      deduped_pages = c4_utils.remove_duplicate_text(pages)
+      beam_testing_util.assert_that(
+          deduped_pages,
+          beam_testing_util.equal_to([
+              (url, {"text": text}) for url, text in expected_urls_and_text
+          ]))
 
   def test_split_wet_file(self):
     if six.PY2:
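The new test runs the whole dedup pipeline in memory: `assert_that` from `apache_beam.testing.util` attaches a verification step to the pipeline, and `equal_to` checks the resulting PCollection against the expected pairs (order-independently) when the pipeline executes at the end of the `with` block. A minimal, self-contained sketch of that testing pattern, using toy data unrelated to C4:

import apache_beam as beam
from apache_beam.testing.util import assert_that, equal_to

with beam.Pipeline() as pipeline:
  # Build a small PCollection and apply the transform under test.
  squares = (pipeline
             | beam.Create([1, 2, 3])
             | beam.Map(lambda x: x * x))
  # The assertion runs when the pipeline finishes; it raises if the contents differ.
  assert_that(squares, equal_to([1, 4, 9]))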
