
Sample code + merging + tombstone cleanup seems to misbehave #136

@tarasglek

Description


I added:

  • some logging to your sample code, to try to understand the merging and tombstone cleanup logic.
  • a merge call. I was a little confused about why alive files kept growing until I realized that merge works one partition at a time (unlike insert). You cover this in the docs, but it might be worth mentioning in the pydoc too.
  • a tombstone cleanup call to remove data and log files that shouldn't be needed after a merge.

Somehow this results in files that were already deleted still appearing in the alive set that DuckDB needs on the third run (i.e. after the S3 cleanup). This kind of failure currently seems impossible to debug with the stock logs, unless you can somehow infer it from them. Sorry if this is a newbie mistake; I'm still trying to figure out the details (e.g. what the _m in the log file name means, log priority) to build a mental picture of how merging works.
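The only way I can think of to spot these dangling entries is to diff the alive set from the log against what is actually in the bucket. A minimal, self-contained sketch of that check (pure Python; `alive_paths` would come from the log read and `bucket_keys` from e.g. `mc ls --recursive` or a ListObjects call — both variable names are mine, and the shortened paths below are illustrative):

```python
def find_dangling(alive_paths: list[str], bucket_keys: set[str]) -> list[str]:
    """Return alive-file paths that no longer exist in the bucket.

    alive_paths: paths reported alive by the log reader
    bucket_keys: object keys actually present in the bucket
    """
    return [p for p in alive_paths if p not in bucket_keys]


# Illustrative paths (shortened stand-ins for the UUID names above)
alive = [
    "example/_data/u=user_a/d=2023-06-07/103228e7.parquet",  # deleted by cleanup
    "example/_data/u=user_a/d=2023-06-07/14b31494.parquet",  # still present
]
present = {"example/_data/u=user_a/d=2023-06-07/14b31494.parquet"}
print(find_dangling(alive, present))
# -> ['example/_data/u=user_a/d=2023-06-07/103228e7.parquet']
```

Running this before the DuckDB query would at least turn the 404 into an actionable list of stale log entries.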

(icedb) taras@b550:~/Downloads/icedb$ docker compose up -d
[+] Running 2/4
 ⠦ Network backing-infra_default            Created                                                                                                                                                                                                                        0.6s
 ⠴ Volume "backing-infra_minio_storage"     Created                                                                                                                                                                                                                        0.5s
 ✔ Container minio                          Started                                                                                                                                                                                                                        0.3s
 ✔ Container backing-infra-createbuckets-1  Started                                                                                                                                                                                                                        0.5s
(icedb) taras@b550:~/Downloads/icedb$ python taras/sample.py ;mc ls myminio/testbucket/example/_log/
2 inserted files: ['example/_data/u=user_a/d=2023-06-07/103228e7-d363-4ec9-b9b9-fd965a06bdab.parquet', 'example/_data/u=user_b/d=2023-02-11/626680b5-50a5-4934-b640-acd495a35377.parquet']
2 alive files: ['example/_data/u=user_a/d=2023-06-07/103228e7-d363-4ec9-b9b9-fd965a06bdab.parquet', 'example/_data/u=user_b/d=2023-02-11/626680b5-50a5-4934-b640-acd495a35377.parquet']
0 tombstoned files: []
┌─────────┬──────────────┬──────────┐
│ user_id │ count_star() │   page   │
│ varchar │    int64     │ varchar  │
├─────────┼──────────────┼──────────┤
│ user_a  │            2 │ Home     │
│ user_a  │            1 │ Settings │
│ user_b  │            1 │ Home     │
└─────────┴──────────────┴──────────┘

Keeping 0 files
0 cleaned log files: []
0 deleted log files: []
0 deleted data files: []
[2024-12-22 14:44:19 EET]   388B STANDARD 1734871459837_dan-mbp.jsonl
[2024-12-22 14:44:19 EET]    49B STANDARD 1734871459880_m_dan-mbp.jsonl
(icedb) taras@b550:~/Downloads/icedb$ python taras/sample.py ;mc ls myminio/testbucket/example/_log/
2 inserted files: ['example/_data/u=user_a/d=2023-06-07/b72d0af4-6f1f-40f4-b646-a263f199abe6.parquet', 'example/_data/u=user_b/d=2023-02-11/38fabb73-acc0-489a-9d03-9e10391ca8a7.parquet']
4 alive files: ['example/_data/u=user_a/d=2023-06-07/103228e7-d363-4ec9-b9b9-fd965a06bdab.parquet', 'example/_data/u=user_b/d=2023-02-11/626680b5-50a5-4934-b640-acd495a35377.parquet', 'example/_data/u=user_a/d=2023-06-07/b72d0af4-6f1f-40f4-b646-a263f199abe6.parquet', 'example/_data/u=user_b/d=2023-02-11/38fabb73-acc0-489a-9d03-9e10391ca8a7.parquet']
0 tombstoned files: []
┌─────────┬──────────────┬──────────┐
│ user_id │ count_star() │   page   │
│ varchar │    int64     │ varchar  │
├─────────┼──────────────┼──────────┤
│ user_a  │            4 │ Home     │
│ user_a  │            2 │ Settings │
│ user_b  │            2 │ Home     │
└─────────┴──────────────┴──────────┘

Merged partition: u=user_a/d=2023-06-07
2 source files merged files: ['example/_data/u=user_a/d=2023-06-07/103228e7-d363-4ec9-b9b9-fd965a06bdab.parquet', 'example/_data/u=user_a/d=2023-06-07/b72d0af4-6f1f-40f4-b646-a263f199abe6.parquet']
1 new merged file files: ['example/_data/u=user_a/d=2023-06-07/14b31494-569e-4ad5-a72f-5efeac838c0f.parquet']
Merged partition: u=user_b/d=2023-02-11
2 source files merged files: ['example/_data/u=user_b/d=2023-02-11/626680b5-50a5-4934-b640-acd495a35377.parquet', 'example/_data/u=user_b/d=2023-02-11/38fabb73-acc0-489a-9d03-9e10391ca8a7.parquet']
1 new merged file files: ['example/_data/u=user_b/d=2023-02-11/b06dfbf4-ea0e-4e7b-a12d-21e204de54a0.parquet']
Keeping 4 files
3 cleaned log files: ['example/_log/1734871459880_m_dan-mbp.jsonl', 'example/_log/1734871476748_m_dan-mbp.jsonl', 'example/_log/1734871476797_m_dan-mbp.jsonl']
2 deleted log files: ['example/_data/u=user_a/d=2023-06-07/103228e7-d363-4ec9-b9b9-fd965a06bdab.parquet', 'example/_data/u=user_b/d=2023-02-11/626680b5-50a5-4934-b640-acd495a35377.parquet']
2 deleted data files: ['example/_data/u=user_a/d=2023-06-07/103228e7-d363-4ec9-b9b9-fd965a06bdab.parquet', 'example/_data/u=user_b/d=2023-02-11/626680b5-50a5-4934-b640-acd495a35377.parquet']
[2024-12-22 14:44:19 EET]   388B STANDARD 1734871459837_dan-mbp.jsonl
[2024-12-22 14:44:36 EET]   388B STANDARD 1734871476662_dan-mbp.jsonl
[2024-12-22 14:44:36 EET]   672B STANDARD 1734871476815_m_dan-mbp.jsonl
(icedb) taras@b550:~/Downloads/icedb$ python taras/sample.py ;mc ls myminio/testbucket/example/_log/
2 inserted files: ['example/_data/u=user_a/d=2023-06-07/95d918b0-fd16-4a11-b0c0-2e58ba869b4e.parquet', 'example/_data/u=user_b/d=2023-02-11/27fc46b6-f217-49c8-94b7-7743c177e691.parquet']
6 alive files: ['example/_data/u=user_a/d=2023-06-07/103228e7-d363-4ec9-b9b9-fd965a06bdab.parquet', 'example/_data/u=user_b/d=2023-02-11/626680b5-50a5-4934-b640-acd495a35377.parquet', 'example/_data/u=user_a/d=2023-06-07/14b31494-569e-4ad5-a72f-5efeac838c0f.parquet', 'example/_data/u=user_b/d=2023-02-11/b06dfbf4-ea0e-4e7b-a12d-21e204de54a0.parquet', 'example/_data/u=user_a/d=2023-06-07/95d918b0-fd16-4a11-b0c0-2e58ba869b4e.parquet', 'example/_data/u=user_b/d=2023-02-11/27fc46b6-f217-49c8-94b7-7743c177e691.parquet']
2 tombstoned files: ['example/_data/u=user_a/d=2023-06-07/b72d0af4-6f1f-40f4-b646-a263f199abe6.parquet', 'example/_data/u=user_b/d=2023-02-11/38fabb73-acc0-489a-9d03-9e10391ca8a7.parquet']
Traceback (most recent call last):
  File "/home/taras/Downloads/icedb/taras/sample.py", line 133, in <module>
    print(ddb.sql(query))
          ^^^^^^^^^^^^^^
duckdb.duckdb.HTTPException: HTTP Error: Unable to connect to URL "http://localhost:9900/testbucket/example/_data/u%3Duser_a/d%3D2023-06-07/103228e7-d363-4ec9-b9b9-fd965a06bdab.parquet": 404 (Not Found)
[2024-12-22 14:44:19 EET]   388B STANDARD 1734871459837_dan-mbp.jsonl
[2024-12-22 14:44:36 EET]   388B STANDARD 1734871476662_dan-mbp.jsonl
[2024-12-22 14:44:36 EET]   672B STANDARD 1734871476815_m_dan-mbp.jsonl
[2024-12-22 14:44:46 EET]   388B STANDARD 1734871486500_dan-mbp.jsonl

My modified sample

import duckdb
from icedb.log import S3Client, IceLogIO, FileMarker
from icedb import IceDBv3, CompressionCodec
from datetime import datetime
from time import time

def print_file_stats(description: str, files: list[FileMarker]):
    """Print summary and details of file markers.
    
    Args:
        description: Description of the file group (e.g. "alive", "tombstoned")
        files: List of FileMarker objects to summarize
    """
    paths = [f.path for f in files]
    print(f"{len(paths)} {description} files:", paths)

# S3 configuration dictionary
S3_CONFIG = {
    "s3_region": "us-east-1",
    "s3_endpoint": "http://localhost:9900",
    "s3_access_key_id": "user", 
    "s3_secret_access_key": "password",
    "s3_use_ssl": False,
    "s3_url_style": "path"  # can be 'path' or 'vhost'
}

# Bucket-specific S3 config not used by DuckDB
S3_BUCKET_CONFIG = {
    "bucket": "testbucket",
    "prefix": "example",
}

# create an s3 client to talk to minio
s3c = S3Client(
    s3prefix=S3_BUCKET_CONFIG["prefix"],
    s3bucket=S3_BUCKET_CONFIG["bucket"],
    s3region=S3_CONFIG["s3_region"],
    s3endpoint=S3_CONFIG["s3_endpoint"],
    s3accesskey=S3_CONFIG["s3_access_key_id"],
    s3secretkey=S3_CONFIG["s3_secret_access_key"]
)

example_events = [
    {
        "ts": 1686176939445,
        "event": "page_load",
        "user_id": "user_a",
        "properties": {
            "page_name": "Home"
        }
    }, {
        "ts": 1676126229999,
        "event": "page_load",
        "user_id": "user_b",
        "properties": {
            "page_name": "Home"
        }
    }, {
        "ts": 1686176939666,
        "event": "page_load",
        "user_id": "user_a",
        "properties": {
            "page_name": "Settings"
        }
    }, {
        "ts": 1686176941445,
        "event": "page_load",
        "user_id": "user_a",
        "properties": {
            "page_name": "Home"
        }
    }
]


def part_func(row: dict) -> str:
    """
    Partition by user_id, date
    """
    row_time = datetime.utcfromtimestamp(row['ts'] / 1000)
    part = f"u={row['user_id']}/d={row_time.strftime('%Y-%m-%d')}"
    return part


# Initialize the client
ice = IceDBv3(
    partition_function=part_func,  # Partitions by user_id and date
    sort_order=['event', 'ts'],   # Sort by event, then timestamp of the event within the data part
    # S3 settings from config
    s3_region=S3_CONFIG["s3_region"],
    s3_access_key=S3_CONFIG["s3_access_key_id"],
    s3_secret_key=S3_CONFIG["s3_secret_access_key"],
    s3_endpoint=S3_CONFIG["s3_endpoint"],
    s3_use_path=S3_CONFIG["s3_url_style"] == "path",
    # S3 client instance
    s3_client=s3c,
    # Other settings
    path_safe_hostname="dan-mbp",
    compression_codec=CompressionCodec.ZSTD,  # Use ZSTD for higher compression ratio compared to default SNAPPY
)

# Insert records
inserted = ice.insert(example_events)
print_file_stats("inserted", inserted)

# Read the log state
log = IceLogIO("demo-host")
_, file_markers, _, _ = log.read_at_max_time(s3c, round(time() * 1000))
alive_files = list(filter(lambda x: x.tombstone is None, file_markers))
tombstoned_files = list(filter(lambda x: x.tombstone, file_markers))
print_file_stats("alive", alive_files)
print_file_stats("tombstoned", tombstoned_files)

# Setup duckdb for querying local minio
ddb = duckdb.connect(":memory:")
ddb.execute("install httpfs")
ddb.execute("load httpfs")

# Set DuckDB S3 configuration from the config dictionary
for key, value in S3_CONFIG.items():
    if key == "s3_endpoint":
        # Strip protocol prefix by splitting on :// once
        value = value.split("://", 1)[1]
    ddb.execute(f"SET {key}='{value}'")

# Query alive files
query = ("select user_id, count(*), (properties::JSON)->>'page_name' as page "
         "from read_parquet([{}]) "
         "group by user_id, page "
         "order by count(*) desc").format(
    ', '.join(f"'s3://{ice.data_s3c.s3bucket}/{x.path}'" for x in alive_files)
)
print(ddb.sql(query))

# Merge runs one partition at a time, so loop until no partition is returned
while True:
    new_log, new_file_marker, partition, merged_file_markers, meta = ice.merge()
    if partition is None:  # nothing left to merge
        break
    print(f"Merged partition: {partition}")
    if merged_file_markers:
        print_file_stats("source files merged", merged_file_markers)
    if new_file_marker:
        print_file_stats("new merged file", [new_file_marker])
cleaned_logs, deleted_logs, deleted_data = ice.tombstone_cleanup(10_000)
print(f"{len(cleaned_logs)} cleaned log files:", cleaned_logs)
print(f"{len(deleted_logs)} deleted log files:", deleted_logs)
print(f"{len(deleted_data)} deleted data files:", deleted_data)
