import tensorflow.compat.v2 as tf
import tensorflow_datasets.public_api as tfds

+if six.PY3:
+  import bz2  # pylint:disable=g-import-not-at-top
+else:
+  # py2's built-in bz2 package does not support reading from file objects.
+  import bz2file as bz2  # pylint:disable=g-import-not-at-top
+

_CITATION = """\
@ONLINE {wikidump,
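Side note on the conditional import above: as the later hunks show, the builder now downloads the .bz2 dumps without extracting them and decompresses while reading, so the decompressor has to accept an already-open file object rather than a path. A minimal sketch of that pattern, hedged: the path and the helper name below are made up for illustration and are not part of this module.

import six
import tensorflow.compat.v2 as tf

if six.PY3:
  import bz2
else:
  # py2's bz2.BZ2File only accepts a filename, so fall back to bz2file.
  import bz2file as bz2


def peek_dump(path):
  """Illustration only: read one decompressed line from a .xml.bz2 dump."""
  with tf.io.gfile.GFile(path, "rb") as f:
    # BZ2File wraps the open file object and decompresses as it reads.
    return bz2.BZ2File(filename=f).readline()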
@@ -179,7 +185,7 @@ def _base_url(lang):
      xml_urls.append(_base_url(lang) + fname)

    # Use dictionary since testing mock always returns the same result.
-    downloaded_files = dl_manager.download_and_extract({"xml": xml_urls})
+    downloaded_files = dl_manager.download({"xml": xml_urls})

    return [
        tfds.core.SplitGenerator(  # pylint:disable=g-complex-comprehension
@@ -196,38 +202,46 @@ def _extract_content(filepath):
      """Extracts article content from a single WikiMedia XML file."""
      logging.info("generating examples from = %s", filepath)
      with tf.io.gfile.GFile(filepath, "rb") as f:
+        f = bz2.BZ2File(filename=f)
        if six.PY3:
          # Workaround due to:
          # https://github.com/tensorflow/tensorflow/issues/33563
          utf_f = codecs.getreader("utf-8")(f)
        else:
          utf_f = f
-        for _, elem in etree.iterparse(utf_f, events=("end",)):
+
+        # To clear root, to free-up more memory than just `elem.clear()`.
+        context = etree.iterparse(utf_f, events=("end",))
+        context = iter(context)
+        unused_event, root = next(context)
+        for unused_event, elem in context:
          if not elem.tag.endswith("page"):
            continue
          namespace = elem.tag[:-4]
          title = elem.find("./{0}title".format(namespace)).text
          ns = elem.find("./{0}ns".format(namespace)).text
+          id_ = elem.find("./{0}id".format(namespace)).text

          # Filter pages that are not in the "main" namespace.
          if ns != "0":
+            root.clear()
            continue

          raw_content = elem.find(
              "./{0}revision/{0}text".format(namespace)).text
-          elem.clear()
+          root.clear()

          # Filter redirects.
          if raw_content is None or raw_content.lower().startswith("#redirect"):
            beam.metrics.Metrics.counter(language, "filtered-redirects").inc()
            continue

          beam.metrics.Metrics.counter(language, "extracted-examples").inc()
-          yield (title, raw_content)
+          yield (id_, title, raw_content)

    def _clean_content(inputs):
      """Cleans raw wikicode to extract text."""
-      title, raw_content = inputs
+      id_, title, raw_content = inputs
      try:
        text = _parse_and_clean_wikicode(raw_content)
      except (
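This hunk switches to the streaming iterparse recipe: keep a reference to the element returned by the first parse event and call `root.clear()` after each processed page, which releases more memory than `elem.clear()` alone because finished subtrees are also detached from their parent. Below is a small self-contained sketch of the standard-library version of that recipe; it uses `xml.etree.ElementTree`, `start`/`end` events, and a tiny in-memory document, none of which come from this module.

import io
import xml.etree.ElementTree as ElementTree

SAMPLE = b"""<mediawiki>
  <page><title>First</title><ns>0</ns></page>
  <page><title>Second</title><ns>1</ns></page>
</mediawiki>"""

# Grab the root from the first "start" event so processed subtrees can be
# detached from it; "end" events mark fully parsed elements.
context = ElementTree.iterparse(io.BytesIO(SAMPLE), events=("start", "end"))
_, root = next(context)
for event, elem in context:
  if event != "end" or elem.tag != "page":
    continue
  print(elem.find("./title").text, elem.find("./ns").text)
  # Clearing the root drops references to already-processed <page> elements,
  # keeping memory flat while streaming a large dump.
  root.clear()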
@@ -242,7 +256,7 @@ def _clean_content(inputs):

      beam.metrics.Metrics.counter(language, "cleaned-examples").inc()

-      yield title, {
+      yield id_, {
          "title": title,
          "text": text
      }
@@ -251,6 +265,7 @@ def _clean_content(inputs):
        pipeline
        | beam.Create(filepaths)
        | beam.FlatMap(_extract_content)
+        | beam.transforms.Reshuffle()
        | beam.FlatMap(_clean_content)
    )
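The `beam.transforms.Reshuffle()` added between the two `FlatMap`s breaks Beam's stage fusion, so the expensive wikicode cleaning can be redistributed across workers instead of staying fused to whichever worker extracted a given dump file. A rough sketch of the same pipeline shape, hedged: it assumes `apache_beam` is installed and runs on the default runner, and the stand-in functions and element values are invented for illustration.

import apache_beam as beam

def _extract(filepath):
  # Stand-in for _extract_content: pretend each file yields two articles.
  for i in range(2):
    yield ("%s-%d" % (filepath, i), "title", "raw wikicode")

def _clean(inputs):
  # Stand-in for _clean_content: pass the text through unchanged.
  id_, title, raw = inputs
  yield id_, {"title": title, "text": raw}

with beam.Pipeline() as pipeline:
  _ = (
      pipeline
      | beam.Create(["dump-0.xml.bz2", "dump-1.xml.bz2"])
      | beam.FlatMap(_extract)
      | beam.transforms.Reshuffle()  # break fusion so cleaning redistributes
      | beam.FlatMap(_clean)
      | beam.Map(print)
  )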