Deduplication is non-deterministic (and destructive) #778

@BramVanroy

While working with the large FineWeb-2 corpora, I came across an inconsistency: extracted data did not seem to be reproducible. After chasing it down with help from @guipenedo, I found that it is caused by trafilatura's deduplicate option.

  • When enabled, it does not seem to be deterministic: running extract(..., deduplicate=True) on the same data can return different results.
  • Some (destructive) caching seems to be going on. In the code below, a loop runs 11 times, and each iteration reads a file's HTML contents and calls trafilatura's extract (with deduplication). After a few runs the extracted content sometimes changes (often in runs 4 and 7), and in run 7 the extraction often ends up as None.

So it seems to me that trafilatura keeps some kind of cache and, after a while, updates it with new information in a destructive manner, so that more data is removed at each cache update. What I find problematic is that this happens even with freshly read data: at each iteration I read the file contents from disk again, and still some form of caching seems to carry over between calls and ignore the new input.
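To illustrate the kind of behavior I suspect, here is a minimal toy model. It does not use trafilatura and is not its actual implementation: `_SEEN`, `MAX_REPETITIONS` and `toy_extract` are hypothetical names, and the threshold of 2 is an assumption chosen for illustration. It only shows how a process-wide repetition counter could make identical input produce progressively smaller output.

```python
from collections import Counter

# Hypothetical stand-in for a process-wide deduplication cache.
# It survives between calls, which is the whole point of the sketch.
_SEEN = Counter()
MAX_REPETITIONS = 2  # assumed threshold, for illustration only


def toy_extract(segments, deduplicate=False):
    """Join the surviving segments with newlines, or return None if none survive."""
    kept = []
    for segment in segments:
        if deduplicate:
            _SEEN[segment] += 1
            # Drop a segment once it has been seen too often, across ALL calls.
            if _SEEN[segment] > MAX_REPETITIONS:
                continue
        kept.append(segment)
    return "\n".join(kept) or None


# The same "page" is passed in fresh on every call, as in my loop.
page = ["Nav", "Body", "Nav"]
results = [toy_extract(page, deduplicate=True) for _ in range(4)]
for run, text in enumerate(results, start=1):
    print(run, repr(text))
```

In this toy model the first call returns everything, the second call loses the repeated "Nav" segment, and from the third call on the result is None, even though the input never changes. With deduplicate=False the counter is never touched and every call returns the same text.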

I am not entirely sure whether this is expected behavior of the deduplication algorithm, but I do not think many people would expect it. No such issue occurs when deduplication is disabled.

Note: I am on trafilatura==1.11.0 but I also tried on 2.0.0 and the issue persists.


Reproducible code (you can also decrease the number of items in uid_and_file_paths), available at https://gist.github.com/BramVanroy/c7e9778aa1f4259f7066e22e2cd1aa3a or here:

from concurrent.futures import ProcessPoolExecutor, as_completed
from pathlib import Path
from tqdm import tqdm
import s3fs
from warcio.archiveiterator import ArchiveIterator
from trafilatura import extract
from commoncrawl_cc_annotation.utils import extract_uuid


crawl = 'CC-MAIN-2024-10'

uid_and_file_paths = [
    ['<urn:uuid:02111860-def3-4ebd-a7c6-1d414c5ff523>', 's3://commoncrawl/crawl-data/CC-MAIN-2024-10/segments/1707947474533.12/warc/CC-MAIN-20240224112548-20240224142548-00100.warc.gz'],
    ['<urn:uuid:0ba01026-a2f0-4edc-8af8-c70aad5ec4fa>', 's3://commoncrawl/crawl-data/CC-MAIN-2024-10/segments/1707948223038.94/warc/CC-MAIN-20240305060427-20240305090427-00899.warc.gz'],
    ['<urn:uuid:0e1e10f3-be6d-47a1-9b6b-7fd6cbe6e700>', 's3://commoncrawl/crawl-data/CC-MAIN-2024-10/segments/1707948235171.95/warc/CC-MAIN-20240305124045-20240305154045-00400.warc.gz'],
    ['<urn:uuid:0e7553f1-2cd4-458d-b0e3-cb25b4e0fa55>', 's3://commoncrawl/crawl-data/CC-MAIN-2024-10/segments/1707947474690.22/warc/CC-MAIN-20240228012542-20240228042542-00792.warc.gz'],
    ['<urn:uuid:1062f399-eac1-41e4-845e-9ad6d8e69306>', 's3://commoncrawl/crawl-data/CC-MAIN-2024-10/segments/1707947474671.63/warc/CC-MAIN-20240227053544-20240227083544-00199.warc.gz'],
    ['<urn:uuid:12080c8f-978a-43ef-b16b-d17cdd72e584>', 's3://commoncrawl/crawl-data/CC-MAIN-2024-10/segments/1707947476137.72/warc/CC-MAIN-20240302215752-20240303005752-00570.warc.gz'],
    ['<urn:uuid:19ebb70e-6a89-4e8a-bce8-b9e5724a8c2a>', 's3://commoncrawl/crawl-data/CC-MAIN-2024-10/segments/1707947474808.39/warc/CC-MAIN-20240229103115-20240229133115-00808.warc.gz'],
    ['<urn:uuid:3a4823bb-9005-4d98-9ec7-e9e41a4f0e32>', 's3://commoncrawl/crawl-data/CC-MAIN-2024-10/segments/1707947474686.54/warc/CC-MAIN-20240227184934-20240227214934-00067.warc.gz'],
]

def extract_html_from_s3_warc(s3_path, record_id):
    # Create an S3 filesystem instance
    fs = s3fs.S3FileSystem()
    
    # Open the WARC file directly from S3
    with fs.open(s3_path, 'rb') as s3file:
        # Iterate through the WARC file using warcio
        for record in ArchiveIterator(s3file):
            # Check if the WARC record ID matches
            if record.rec_headers.get('WARC-Record-ID') == record_id:
                # Extract and return the HTML content
                payload = record.content_stream().read()
                return payload.decode('utf-8')
    return None

pdout = Path("tmp")
pdout.mkdir(parents=True, exist_ok=True)  # make sure the output directory exists before writing
def process_record(uid, s3_path):
    file_uid = extract_uuid(uid)
    pfout = pdout / f"{file_uid}.html"
    if pfout.exists():
        return
    html = extract_html_from_s3_warc(s3_path, uid)
    if html:
        # Write as UTF-8 so it matches the encoding used when reading the file back
        with open(pfout, "w", encoding="utf-8") as f:
            f.write(html)
    else:
        print(f"Could not find HTML for {uid} in {s3_path}")

with ProcessPoolExecutor(max_workers=len(uid_and_file_paths)) as executor:
    futures = []
    for uid, s3_path in uid_and_file_paths:
        futures.append(executor.submit(process_record, uid, s3_path))
    
    for future in tqdm(as_completed(futures), total=len(futures), desc="Processing", unit="record"):
        future.result()

print("Done downloading")

def extract_html(phtml: Path, favor_precision: bool = False, include_images: bool = False, deduplicate: bool = False):
    # The HTML is re-read from disk inside the loop, so every run starts from fresh input
    prev_text = ""
    for idx in range(1, 12):
        html = phtml.read_text(encoding="utf-8")
        text = extract(
            html,
            favor_precision=favor_precision,
            include_images=include_images,
            deduplicate=deduplicate,
        )
        if prev_text and text != prev_text:
            print(f"Extracted text for {phtml.stem} was different in run {idx}\n")
            print("PREVIOUS (1st 100chars):", prev_text[:100])
            print("CURRENT (1st 100chars):", text[:100] if isinstance(text, str) else text)
            print("\n--------------------\n")
        prev_text = text

for pfin in pdout.glob("*.html"):
    extract_html(
        pfin,
        favor_precision=True,
        include_images=False,
        deduplicate=True, # Change to False = NO ISSUE
    )
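The loop above compares each run only with the previous one, and it skips the comparison once the previous result is None. A stricter check compares every run against the first one. Below is a minimal sketch of such a helper; `digest` and `diverging_runs` are hypothetical names I made up for this example, not part of trafilatura.

```python
import hashlib
from typing import Callable, List, Optional


def digest(text: Optional[str]) -> str:
    """Stable fingerprint of an extraction result; None gets its own marker."""
    if text is None:
        return "<None>"
    return hashlib.sha256(text.encode("utf-8")).hexdigest()


def diverging_runs(run: Callable[[], Optional[str]], n_runs: int = 11) -> List[int]:
    """Call `run` n_runs times and return the 1-based indices of all runs
    whose output differs from the output of run 1."""
    baseline = digest(run())
    return [idx for idx in range(2, n_runs + 1) if digest(run()) != baseline]
```

With the names from the script above, it could be called as diverging_runs(lambda: extract(phtml.read_text(encoding="utf-8"), deduplicate=True)). An empty list means all runs agreed with the first one.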
