I added:
- some logging to your sample code, to try to understand the merging and tombstone cleanup logic.
- a merge call. I was a little confused about why alive files kept growing until I realized that merge handles one partition at a time (unlike insert), hence the while loop in the sample below. You cover this in the docs, but it might be worth mentioning in the pydoc too.
- a tombstone cleanup call to remove the data and log files that shouldn't be needed after a merge.
Somehow that results in files that were already deleted still being listed as alive files needed by DuckDB on the 3rd re-run (i.e. after the S3 cleanup). This kind of failure currently seems impossible to debug with the stock logs, unless maybe you can infer it from them; see the sketch below for the kind of check I mean. Sorry if this is some newbie mistake, I'm still trying to figure out the details (e.g. what _m in a file name means, log priority) to build a mental picture of how merging works.
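To confirm what that failure looks like, something like this can probe S3 directly for every path the log claims is alive (a minimal sketch; verify_alive is a hypothetical helper, and it assumes boto3 is pointed at the same minio endpoint and credentials as the sample below):

import boto3
from botocore.exceptions import ClientError

def verify_alive(paths: list[str]) -> None:
    """HEAD every supposedly-alive object and report the ones that are gone (hypothetical helper)."""
    s3 = boto3.client(
        "s3",
        endpoint_url="http://localhost:9900",  # assumed: same minio endpoint/credentials as the sample
        aws_access_key_id="user",
        aws_secret_access_key="password",
        region_name="us-east-1",
    )
    for path in paths:
        try:
            s3.head_object(Bucket="testbucket", Key=path)
        except ClientError:
            print("alive in log but missing in S3:", path)

Running that over the alive list on the 3rd run should flag the two already-deleted files, matching the 404 in the transcript below.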
(icedb) taras@b550:~/Downloads/icedb$ docker compose up -d
[+] Running 2/4
⠦ Network backing-infra_default Created 0.6s
⠴ Volume "backing-infra_minio_storage" Created 0.5s
✔ Container minio Started 0.3s
✔ Container backing-infra-createbuckets-1 Started 0.5s
(icedb) taras@b550:~/Downloads/icedb$ python taras/sample.py ;mc ls myminio/testbucket/example/_log/
2 inserted files: ['example/_data/u=user_a/d=2023-06-07/103228e7-d363-4ec9-b9b9-fd965a06bdab.parquet', 'example/_data/u=user_b/d=2023-02-11/626680b5-50a5-4934-b640-acd495a35377.parquet']
2 alive files: ['example/_data/u=user_a/d=2023-06-07/103228e7-d363-4ec9-b9b9-fd965a06bdab.parquet', 'example/_data/u=user_b/d=2023-02-11/626680b5-50a5-4934-b640-acd495a35377.parquet']
0 tombstoned files: []
┌─────────┬──────────────┬──────────┐
│ user_id │ count_star() │ page │
│ varchar │ int64 │ varchar │
├─────────┼──────────────┼──────────┤
│ user_a │ 2 │ Home │
│ user_a │ 1 │ Settings │
│ user_b │ 1 │ Home │
└─────────┴──────────────┴──────────┘
Keeping 0 files
0 cleaned log files: []
0 deleted log files: []
0 deleted data files: []
[2024-12-22 14:44:19 EET] 388B STANDARD 1734871459837_dan-mbp.jsonl
[2024-12-22 14:44:19 EET] 49B STANDARD 1734871459880_m_dan-mbp.jsonl
(icedb) taras@b550:~/Downloads/icedb$ python taras/sample.py ;mc ls myminio/testbucket/example/_log/
2 inserted files: ['example/_data/u=user_a/d=2023-06-07/b72d0af4-6f1f-40f4-b646-a263f199abe6.parquet', 'example/_data/u=user_b/d=2023-02-11/38fabb73-acc0-489a-9d03-9e10391ca8a7.parquet']
4 alive files: ['example/_data/u=user_a/d=2023-06-07/103228e7-d363-4ec9-b9b9-fd965a06bdab.parquet', 'example/_data/u=user_b/d=2023-02-11/626680b5-50a5-4934-b640-acd495a35377.parquet', 'example/_data/u=user_a/d=2023-06-07/b72d0af4-6f1f-40f4-b646-a263f199abe6.parquet', 'example/_data/u=user_b/d=2023-02-11/38fabb73-acc0-489a-9d03-9e10391ca8a7.parquet']
0 tombstoned files: []
┌─────────┬──────────────┬──────────┐
│ user_id │ count_star() │ page │
│ varchar │ int64 │ varchar │
├─────────┼──────────────┼──────────┤
│ user_a │ 4 │ Home │
│ user_a │ 2 │ Settings │
│ user_b │ 2 │ Home │
└─────────┴──────────────┴──────────┘
Merged partition: u=user_a/d=2023-06-07
2 source files merged files: ['example/_data/u=user_a/d=2023-06-07/103228e7-d363-4ec9-b9b9-fd965a06bdab.parquet', 'example/_data/u=user_a/d=2023-06-07/b72d0af4-6f1f-40f4-b646-a263f199abe6.parquet']
1 new merged file files: ['example/_data/u=user_a/d=2023-06-07/14b31494-569e-4ad5-a72f-5efeac838c0f.parquet']
Merged partition: u=user_b/d=2023-02-11
2 source files merged files: ['example/_data/u=user_b/d=2023-02-11/626680b5-50a5-4934-b640-acd495a35377.parquet', 'example/_data/u=user_b/d=2023-02-11/38fabb73-acc0-489a-9d03-9e10391ca8a7.parquet']
1 new merged file files: ['example/_data/u=user_b/d=2023-02-11/b06dfbf4-ea0e-4e7b-a12d-21e204de54a0.parquet']
Keeping 4 files
3 cleaned log files: ['example/_log/1734871459880_m_dan-mbp.jsonl', 'example/_log/1734871476748_m_dan-mbp.jsonl', 'example/_log/1734871476797_m_dan-mbp.jsonl']
2 deleted log files: ['example/_data/u=user_a/d=2023-06-07/103228e7-d363-4ec9-b9b9-fd965a06bdab.parquet', 'example/_data/u=user_b/d=2023-02-11/626680b5-50a5-4934-b640-acd495a35377.parquet']
2 deleted data files: ['example/_data/u=user_a/d=2023-06-07/103228e7-d363-4ec9-b9b9-fd965a06bdab.parquet', 'example/_data/u=user_b/d=2023-02-11/626680b5-50a5-4934-b640-acd495a35377.parquet']
[2024-12-22 14:44:19 EET] 388B STANDARD 1734871459837_dan-mbp.jsonl
[2024-12-22 14:44:36 EET] 388B STANDARD 1734871476662_dan-mbp.jsonl
[2024-12-22 14:44:36 EET] 672B STANDARD 1734871476815_m_dan-mbp.jsonl
(icedb) taras@b550:~/Downloads/icedb$ python taras/sample.py ;mc ls myminio/testbucket/example/_log/
2 inserted files: ['example/_data/u=user_a/d=2023-06-07/95d918b0-fd16-4a11-b0c0-2e58ba869b4e.parquet', 'example/_data/u=user_b/d=2023-02-11/27fc46b6-f217-49c8-94b7-7743c177e691.parquet']
6 alive files: ['example/_data/u=user_a/d=2023-06-07/103228e7-d363-4ec9-b9b9-fd965a06bdab.parquet', 'example/_data/u=user_b/d=2023-02-11/626680b5-50a5-4934-b640-acd495a35377.parquet', 'example/_data/u=user_a/d=2023-06-07/14b31494-569e-4ad5-a72f-5efeac838c0f.parquet', 'example/_data/u=user_b/d=2023-02-11/b06dfbf4-ea0e-4e7b-a12d-21e204de54a0.parquet', 'example/_data/u=user_a/d=2023-06-07/95d918b0-fd16-4a11-b0c0-2e58ba869b4e.parquet', 'example/_data/u=user_b/d=2023-02-11/27fc46b6-f217-49c8-94b7-7743c177e691.parquet']
2 tombstoned files: ['example/_data/u=user_a/d=2023-06-07/b72d0af4-6f1f-40f4-b646-a263f199abe6.parquet', 'example/_data/u=user_b/d=2023-02-11/38fabb73-acc0-489a-9d03-9e10391ca8a7.parquet']
Traceback (most recent call last):
File "/home/taras/Downloads/icedb/taras/sample.py", line 133, in <module>
print(ddb.sql(query))
^^^^^^^^^^^^^^
duckdb.duckdb.HTTPException: HTTP Error: Unable to connect to URL "http://localhost:9900/testbucket/example/_data/u%3Duser_a/d%3D2023-06-07/103228e7-d363-4ec9-b9b9-fd965a06bdab.parquet": 404 (Not Found)
[2024-12-22 14:44:19 EET] 388B STANDARD 1734871459837_dan-mbp.jsonl
[2024-12-22 14:44:36 EET] 388B STANDARD 1734871476662_dan-mbp.jsonl
[2024-12-22 14:44:36 EET] 672B STANDARD 1734871476815_m_dan-mbp.jsonl
[2024-12-22 14:44:46 EET] 388B STANDARD 1734871486500_dan-mbp.jsonl
My modified sample:
import duckdb
from icedb.log import S3Client, IceLogIO, FileMarker
from icedb import IceDBv3, CompressionCodec
from datetime import datetime
from time import time
def print_file_stats(description: str, files: list[FileMarker]):
    """Print summary and details of file markers.

    Args:
        description: Description of the file group (e.g. "alive", "tombstoned")
        files: List of FileMarker objects to summarize
    """
    paths = list(map(lambda x: x.path, files))
    print(f"{len(paths)} {description} files:", paths)
# S3 configuration dictionary
S3_CONFIG = {
    "s3_region": "us-east-1",
    "s3_endpoint": "http://localhost:9900",
    "s3_access_key_id": "user",
    "s3_secret_access_key": "password",
    "s3_use_ssl": False,
    "s3_url_style": "path"  # can be 'path' or 'vhost'
}
# Bucket-specific S3 config not used by DuckDB
S3_BUCKET_CONFIG = {
    "bucket": "testbucket",
    "prefix": "example",
}
# create an s3 client to talk to minio
s3c = S3Client(
    s3prefix=S3_BUCKET_CONFIG["prefix"],
    s3bucket=S3_BUCKET_CONFIG["bucket"],
    s3region=S3_CONFIG["s3_region"],
    s3endpoint=S3_CONFIG["s3_endpoint"],
    s3accesskey=S3_CONFIG["s3_access_key_id"],
    s3secretkey=S3_CONFIG["s3_secret_access_key"]
)
example_events = [
    {
        "ts": 1686176939445,
        "event": "page_load",
        "user_id": "user_a",
        "properties": {
            "page_name": "Home"
        }
    }, {
        "ts": 1676126229999,
        "event": "page_load",
        "user_id": "user_b",
        "properties": {
            "page_name": "Home"
        }
    }, {
        "ts": 1686176939666,
        "event": "page_load",
        "user_id": "user_a",
        "properties": {
            "page_name": "Settings"
        }
    }, {
        "ts": 1686176941445,
        "event": "page_load",
        "user_id": "user_a",
        "properties": {
            "page_name": "Home"
        }
    }
]
def part_func(row: dict) -> str:
    """
    Partition by user_id, date
    """
    row_time = datetime.utcfromtimestamp(row['ts'] / 1000)
    part = f"u={row['user_id']}/d={row_time.strftime('%Y-%m-%d')}"
    return part
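# For example, the first event above (ts=1686176939445, user_id="user_a") lands in
# partition "u=user_a/d=2023-06-07", which matches the data paths in the log output above.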
# Initialize the client
ice = IceDBv3(
    partition_function=part_func,  # Partitions by user_id and date
    sort_order=['event', 'ts'],  # Sort by event, then timestamp of the event within the data part
    # S3 settings from config
    s3_region=S3_CONFIG["s3_region"],
    s3_access_key=S3_CONFIG["s3_access_key_id"],
    s3_secret_key=S3_CONFIG["s3_secret_access_key"],
    s3_endpoint=S3_CONFIG["s3_endpoint"],
    s3_use_path=S3_CONFIG["s3_url_style"] == "path",
    # S3 client instance
    s3_client=s3c,
    # Other settings
    path_safe_hostname="dan-mbp",
    compression_codec=CompressionCodec.ZSTD,  # Use ZSTD for a higher compression ratio than the default SNAPPY
)
# Insert records
inserted = ice.insert(example_events)
print_file_stats("inserted", inserted)
# Read the log state
log = IceLogIO("demo-host")
_, file_markers, _, _ = log.read_at_max_time(s3c, round(time() * 1000))
alive_files = list(filter(lambda x: x.tombstone is None, file_markers))
tombstoned_files = list(filter(lambda x: x.tombstone, file_markers))
print_file_stats("alive", alive_files)
print_file_stats("tombstoned", tombstoned_files)
# Setup duckdb for querying local minio
ddb = duckdb.connect(":memory:")
ddb.execute("install httpfs")
ddb.execute("load httpfs")
# Set DuckDB S3 configuration from the config dictionary
for key, value in S3_CONFIG.items():
    if key == "s3_endpoint":
        # Strip the protocol prefix by splitting on "://" once
        value = value.split("://", 1)[1]
    ddb.execute(f"SET {key}='{value}'")
# Query alive files
query = ("select user_id, count(*), (properties::JSON)->>'page_name' as page "
         "from read_parquet([{}]) "
         "group by user_id, page "
         "order by count(*) desc").format(
    ', '.join(list(map(lambda x: "'s3://" + ice.data_s3c.s3bucket + "/" + x.path + "'", alive_files)))
)
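# The formatted query reads the alive files directly, e.g.:
#   read_parquet(['s3://testbucket/example/_data/u=user_a/d=2023-06-07/<uuid>.parquet', ...])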
print(ddb.sql(query))
while True:
    new_log, new_file_marker, partition, merged_file_markers, meta = ice.merge()
    if partition is None:  # nothing left to merge in any partition
        break
    print(f"Merged partition: {partition}")
    if merged_file_markers:
        print_file_stats("source files merged", merged_file_markers)
    if new_file_marker:
        print_file_stats("new merged file", [new_file_marker])
cleaned_logs, deleted_logs, deleted_data = ice.tombstone_cleanup(10_000)
print(f"{len(cleaned_logs)} cleaned log files:", cleaned_logs)
print(f"{len(deleted_logs)} deleted log files:", deleted_logs)
print(f"{len(deleted_data)} deleted data files:", deleted_data)