From cd6f464f28ed53369778bfb0d39083bae12ef684 Mon Sep 17 00:00:00 2001
From: John Hao
Date: Thu, 24 Jul 2025 17:46:34 -0400
Subject: [PATCH 01/10] fix gzip by executing redirect in shell

---
 assets/gzip/main.py | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/assets/gzip/main.py b/assets/gzip/main.py
index bbb9e4f..3fc453f 100755
--- a/assets/gzip/main.py
+++ b/assets/gzip/main.py
@@ -37,7 +37,7 @@ def ingest(self):
         """
         Ingests the dataset at self.datasets_path
         """
-        self.docker_execute(f"gzip -c {self.datasets_path} > {GZIP_FILE_PATH} ")
+        self.docker_execute(f"gzip -c {self.datasets_path} > {GZIP_FILE_PATH} ", shell=True)
 
     def search(self, query):
         """
@@ -45,7 +45,7 @@ def search(self, query):
         """
         assert query == "notasearch"
 
-        self.docker_execute(f"gunzip -c {GZIP_FILE_PATH} > {DECOMPRESSED_FILE_PATH}")
+        self.docker_execute(f"gunzip -c {GZIP_FILE_PATH} > {DECOMPRESSED_FILE_PATH}", shell=True)
         return 0
 
     def bench_search(self, cold=True):
         if not cold:
@@ -53,7 +53,7 @@ def bench_search(self, cold=True):
             return
 
         self.bench_start(ingest=False)
-        self.docker_execute(f"gunzip -c {GZIP_FILE_PATH} > {DECOMPRESSED_FILE_PATH}")
+        self.docker_execute(f"gunzip -c {GZIP_FILE_PATH} > {DECOMPRESSED_FILE_PATH}", shell=True)
         self.bench_stop()
         logger.info("Decompression done")

From 0a4e6d5e374cecbdaf4ce3c97f08834d5dd53b30 Mon Sep 17 00:00:00 2001
From: John Hao
Date: Thu, 24 Jul 2025 17:46:53 -0400
Subject: [PATCH 02/10] change ubuntu references to ubuntu:jammy

---
 assets/elasticsearch/Dockerfile | 2 +-
 assets/gzip/Dockerfile          | 4 ++--
 assets/overhead_test/Dockerfile | 2 +-
 assets/template/Dockerfile      | 2 +-
 4 files changed, 5 insertions(+), 5 deletions(-)

diff --git a/assets/elasticsearch/Dockerfile b/assets/elasticsearch/Dockerfile
index 6ca38cd..cd0a99f 100644
--- a/assets/elasticsearch/Dockerfile
+++ b/assets/elasticsearch/Dockerfile
@@ -1,4 +1,4 @@
-FROM ghcr.io/y-scope/clp/clp-core-dependencies-x86-ubuntu-jammy:main
+FROM ubuntu:jammy
 
 RUN apt-get update \
     && DEBIAN_FRONTEND=noninteractive apt-get install -y \
diff --git a/assets/gzip/Dockerfile b/assets/gzip/Dockerfile
index 18ea6a5..8b7c41c 100644
--- a/assets/gzip/Dockerfile
+++ b/assets/gzip/Dockerfile
@@ -2,11 +2,11 @@
 # dependencies
 # If there is any dedicated image available, you should build the benchmarking image on top of that
 
-FROM ghcr.io/y-scope/clp/clp-core-dependencies-x86-ubuntu-jammy:main
+FROM ubuntu:jammy
 
 # Install necessary packages
 RUN apt-get update \
-    && DEBIAN_FRONTEND=noninteractive apt-get install -y \
+    && DEBIAN_FRONTEND=noninteractive apt-get install -y --allow-downgrades \
     tmux \
     vim \
     gzip
diff --git a/assets/overhead_test/Dockerfile b/assets/overhead_test/Dockerfile
index 2428ce5..3237030 100644
--- a/assets/overhead_test/Dockerfile
+++ b/assets/overhead_test/Dockerfile
@@ -2,7 +2,7 @@
 # dependencies
 # If there is any dedicated image available, you should build the benchmarking image on top of that
 
-FROM ghcr.io/y-scope/clp/clp-core-dependencies-x86-ubuntu-jammy:main
+FROM ubuntu:jammy
 
 # Install necessary packages
 RUN apt-get update \
diff --git a/assets/template/Dockerfile b/assets/template/Dockerfile
index 2428ce5..3237030 100644
--- a/assets/template/Dockerfile
+++ b/assets/template/Dockerfile
@@ -2,7 +2,7 @@
 # dependencies
 # If there is any dedicated image available, you should build the benchmarking image on top of that
 
-FROM ghcr.io/y-scope/clp/clp-core-dependencies-x86-ubuntu-jammy:main
+FROM ubuntu:jammy
 
 # Install necessary packages
 RUN apt-get update \
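
[Note on patch 01] The > redirect is parsed by a shell, never by gzip itself, so the command string has to reach a shell for the redirection to happen. A minimal standalone sketch of the distinction, with plain subprocess standing in for docker_execute (whose wrapper semantics are assumed here):

    import subprocess

    # Without a shell, ">" and "out.gz" reach gzip as literal file
    # arguments: nothing is redirected and gzip reports a missing file.
    subprocess.run(["gzip", "-c", "input.log", ">", "out.gz"])

    # With a shell, /bin/sh interprets the redirect and out.gz is written.
    subprocess.run("gzip -c input.log > out.gz", shell=True)
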
From 61d181a1d946402610fab8c93fc9547e3f5cbfda Mon Sep 17 00:00:00 2001
From: John Hao
Date: Thu, 24 Jul 2025 17:47:14 -0400
Subject: [PATCH 03/10] fix class names and imports after asset renaming

---
 assets/clickhouse/main.py |  4 ++--
 assets/clp/main.py        |  4 ++--
 assets/presto_clp/main.py |  4 ++--
 scripts/benchall.py       | 25 +++++++++++++------------
 4 files changed, 19 insertions(+), 18 deletions(-)

diff --git a/assets/clickhouse/main.py b/assets/clickhouse/main.py
index 75bce8c..fdf55b6 100755
--- a/assets/clickhouse/main.py
+++ b/assets/clickhouse/main.py
@@ -13,7 +13,7 @@
 """
 CLICKHOUSE_COLLECTION_NAME = "clickhouse_bench"
 
-class clickhouse_native_json_bench(Benchmark):
+class clickhouse_bench(Benchmark):
     # add any parameters to the tool here
     def __init__(self, dataset, manual_column_names=True, keys=[], additional_order_by=[], timestamp_key=False):
         super().__init__(dataset)
@@ -154,7 +154,7 @@ def run_applicable(self, dataset_name):
 
 
 def main():
-    bench = clickhouse_native_json_bench(sys.argv[1])
+    bench = clickhouse_bench(sys.argv[1])
     bench.run_everything()
 
 if __name__ == "__main__":
diff --git a/assets/clp/main.py b/assets/clp/main.py
index 7ad36e6..0e148a0 100755
--- a/assets/clp/main.py
+++ b/assets/clp/main.py
@@ -5,7 +5,7 @@
 CLP_OUT_PATH = f"{WORK_DIR}/archives"
 CLP_S_BINARY = "/clp/clp-s"
 
-class clp_s_bench(Benchmark):
+class clp_bench(Benchmark):
     def __init__(self, dataset, target_encoded_size=268435456):
         super().__init__(dataset)
 
@@ -61,7 +61,7 @@ def terminate_procs(self):
 
 
 def main():
-    bench = clp_s_bench(sys.argv[1])
+    bench = clp_bench(sys.argv[1])
     bench.run_everything()
 
 if __name__ == "__main__":
diff --git a/assets/presto_clp/main.py b/assets/presto_clp/main.py
index 487ae6b..4a5255c 100755
--- a/assets/presto_clp/main.py
+++ b/assets/presto_clp/main.py
@@ -16,7 +16,7 @@
 CLP_PRESTO_HOST_STORAGE = os.path.abspath(os.path.expanduser("~/clp-json-x86_64-v0.4.0-dev"))
 SQL_PASSWORD = "wqEGPyBdx_w"
 HOST_IP = "127.0.0.1"
-class clp_presto_bench(Benchmark):
+class presto_clp_bench(Benchmark):
     # add any parameters to the tool here
     def __init__(self, dataset, dataset_variation='cleaned_log'):
         super().__init__(dataset, dataset_variation=dataset_variation)
@@ -102,7 +102,7 @@ def terminate(self):
 
 
 def main():
-    bench = clp_presto_bench(sys.argv[1])
+    bench = presto_clp_bench(sys.argv[1])
     bench.run_everything()
 
 if __name__ == "__main__":
diff --git a/scripts/benchall.py b/scripts/benchall.py
index 6854f17..8b46e83 100755
--- a/scripts/benchall.py
+++ b/scripts/benchall.py
@@ -1,12 +1,12 @@
 #!/usr/bin/env python3
 
-from assets.clp_s.main import clp_s_bench
-from assets.clickhouse_native_json.main import clickhouse_native_json_bench
+from assets.clp.main import clp_bench
+from assets.clickhouse.main import clickhouse_bench
 from assets.sparksql.main import sparksql_bench
-from assets.parquet.main import parquet_bench
+from assets.presto_parquet.main import parquet_bench
 from assets.zstandard.main import zstandard_bench
 from assets.elasticsearch.main import elasticsearch_bench
-from assets.clp_presto.main import clp_presto_bench
+from assets.presto_clp.main import presto_clp_bench
 from assets.overhead_test.main import overhead_test_bench
 from assets.gzip.main import gzip_bench
 from src.jsonsync import JsonItem
@@ -33,14 +33,14 @@ def get_target_from_name(name):
 
 benchmarks = [
     # benchmark object, arguments
-    (clp_s_bench, {}),
-    (clickhouse_native_json_bench, {
+    (clp_bench, {}),
+    (clickhouse_bench, {
         'manual_column_names': False,
         'keys': [],
         'additional_order_by': [],
         'timestamp_key': True
     }),
-    (clp_presto_bench, {
+    (presto_clp_bench, {
         'dataset_variation': "cleaned_log"
     }),
     (parquet_bench, {'mode': 'json string'}),
@@ -52,7 +52,7 @@
     (gzip_bench, {}),
 ]
 
-def run(bencher, kwargs, bench_target, attach=False):
+def run(bencher, kwargs, bench_target, attach=False, attach_on_error=False):
     dataset_name = 'error when finding dataset name'
     bench = None
     try:
@@ -70,7 +70,7 @@
         with open((current_dir / 'exceptions.log').resolve(), 'a') as file:
             file.write(f"{statement}\n")
         print(statement)
-        if attach:
+        if attach or attach_on_error:
             if bench is not None:
                 bench.docker_attach()
             else:
@@ -80,9 +80,10 @@
 for bench_target in bench_target_dirs:
     dataset_name = os.path.basename(bench_target.resolve()).strip()
 
-    #if dataset_name != 'mongod': # only use mongod for now
-    #    continue
-    run(bencher, kwargs, bench_target)
+    if dataset_name != 'mongod': # only use mongod for now
+        continue
+    #run(bencher, kwargs, bench_target)
+    run(bencher, kwargs, bench_target, attach_on_error=True)
     #run(bencher, kwargs, bench_target, attach=True)
 
 #run(sparksql_bench, {}, get_target_from_name('mongod'))
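
[Note on patch 03] Renaming an asset directory currently means chasing hardcoded imports and class names through benchall.py. A hedged sketch of a rename-tolerant lookup, assuming each asset keeps exactly one Benchmark subclass in assets/<name>/main.py (layout inferred from the diffs; the helper itself is hypothetical):

    import importlib

    def load_bench(asset_name, class_name):
        # Resolve assets.<asset_name>.main at run time, so a directory
        # rename only changes the strings passed in here.
        module = importlib.import_module(f"assets.{asset_name}.main")
        return getattr(module, class_name)

    clp_bench = load_bench("clp", "clp_bench")
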
From ce51b9995a90ec49e78c07bb6e98928e306d6354 Mon Sep 17 00:00:00 2001
From: John Hao
Date: Fri, 25 Jul 2025 10:20:47 -0400
Subject: [PATCH 04/10] presto_parquet: build from the y-scope presto fork

---
 assets/presto_parquet/Dockerfile         | 20 ++++++++++++-------
 .../include/etc_worker/config.properties |  2 +-
 assets/presto_parquet/main.py            |  8 ++++----
 3 files changed, 18 insertions(+), 12 deletions(-)

diff --git a/assets/presto_parquet/Dockerfile b/assets/presto_parquet/Dockerfile
index c792d39..03c87e5 100644
--- a/assets/presto_parquet/Dockerfile
+++ b/assets/presto_parquet/Dockerfile
@@ -1,4 +1,4 @@
-FROM presto/prestissimo-dependency:ubuntu-22.04
+FROM presto/prestissimo-dependency:ubuntu-22.04-presto-0.293
 
 # Install necessary packages
 RUN apt-get update;
@@ -35,22 +35,28 @@
 RUN cd /usr/local && sh -c "$(curl --location https://taskfile.dev/install.sh)"
 RUN pip3 install pyarrow;
 
 WORKDIR /home
-RUN git clone https://github.com/anlowee/presto.git;
+RUN git clone https://github.com/y-scope/presto.git;
 WORKDIR /home/presto
-RUN git checkout xwei/benchmark-test;
+RUN git checkout 89ce0f3b4ec713d658f3544e75aeb92fbd3a397d;
 WORKDIR /home/presto/presto-native-execution
 RUN mkdir build;
 RUN rm -rf velox;
-RUN git clone https://github.com/anlowee/velox.git;
+RUN git clone https://github.com/y-scope/velox.git;
 WORKDIR /home/presto/presto-native-execution/velox
-RUN git checkout xwei/benchmark-test;
+RUN git checkout 27ee4bcaad449fd1c8b90c48787f4eaf8e92395f;
 
 WORKDIR /home/presto
 RUN ./mvnw clean install -DskipTests -pl -presto-docs -T1C;
-WORKDIR /home/presto/presto-native-execution/build
-RUN cmake .. && make -j$(nproc) presto_server;
+WORKDIR /home/presto/presto-native-execution
+# RUN cmake .. && make -j$(nproc) presto_server;
+RUN NUM_THREADS=8 make release;
 
 ENV JAVA_HOME=/usr/lib/jvm/java-11-openjdk-amd64
 ENV PATH=$JAVA_HOME/bin:$PATH
+
+RUN apt-get update;
+
+RUN apt-get install -y \
+    mariadb-client \
+    net-tools;
diff --git a/assets/presto_parquet/include/etc_worker/config.properties b/assets/presto_parquet/include/etc_worker/config.properties
index 24a455b..76c8a4e 100644
--- a/assets/presto_parquet/include/etc_worker/config.properties
+++ b/assets/presto_parquet/include/etc_worker/config.properties
@@ -1,5 +1,5 @@
 discovery.uri=http://127.0.0.1:8080
-presto.version=0.293-SNAPSHOT-faae543
+presto.version=0.293-89ce0f3
 http-server.http.port=7777
 shutdown-onset-sec=1
 register-test-functions=false
diff --git a/assets/presto_parquet/main.py b/assets/presto_parquet/main.py
index 671953f..192dd68 100755
--- a/assets/presto_parquet/main.py
+++ b/assets/presto_parquet/main.py
@@ -63,20 +63,20 @@ def launch(self):
         """
         Runs the benchmarked tool
         """
-        self.docker_execute("bash -c \"python3 /home/presto/presto-server/target/presto-server-0.293-SNAPSHOT/bin/launcher.py run --etc-dir=/home/include/etc_coordinator\"", background=True)
+        self.docker_execute("bash -c \"python3 /home/presto/presto-server/target/presto-server-0.293/bin/launcher.py run --etc-dir=/home/include/etc_coordinator\"", background=True)
         self.wait_for_port(8080)
-        self.docker_execute("/home/presto/presto-native-execution/build/presto_cpp/main/presto_server --logtostderr=1 --etc_dir=/home/include/etc_worker", background=True)
+        self.docker_execute("/home/presto/presto-native-execution/_build/release/presto_cpp/main/presto_server --logtostderr=1 --etc_dir=/home/include/etc_worker", background=True)
         self.wait_for_port(7777)
         time.sleep(60) # this needs to be more than 10
 
     def hive_execute(self, query, check=True):
-        return self.docker_execute(f'/home/presto/presto-cli/target/presto-cli-0.293-SNAPSHOT-executable.jar --catalog hive --schema {PARQUET_SCHEMA_NAME} --execute "{query}"', check)
+        return self.docker_execute(f'/home/presto/presto-cli/target/presto-cli-0.293-executable.jar --catalog hive --schema {PARQUET_SCHEMA_NAME} --execute "{query}"', check)
 
     def ingest(self):
         """
         Ingests the dataset at self.datasets_path
         """
-        self.docker_execute(f'/home/presto/presto-cli/target/presto-cli-0.293-SNAPSHOT-executable.jar --catalog hive --schema {PARQUET_SCHEMA_NAME} --execute "CREATE SCHEMA IF NOT EXISTS hive.{PARQUET_SCHEMA_NAME};"')
+        self.docker_execute(f'/home/presto/presto-cli/target/presto-cli-0.293-executable.jar --catalog hive --schema {PARQUET_SCHEMA_NAME} --execute "CREATE SCHEMA IF NOT EXISTS hive.{PARQUET_SCHEMA_NAME};"')
 
         if self.pairwise_arrays:
             self.hive_execute(f""" \
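
[Note on patch 04] main.py blocks on wait_for_port(8080) before starting the native worker and on wait_for_port(7777) afterwards. The helper's implementation is not part of this series; a minimal sketch of what such a poll usually looks like (hypothetical, the framework's real version may differ):

    import socket
    import time

    def wait_for_port(port, host="127.0.0.1", timeout=300):
        # Poll until something accepts TCP connections on host:port,
        # or give up after `timeout` seconds.
        deadline = time.monotonic() + timeout
        while time.monotonic() < deadline:
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
                sock.settimeout(1)
                if sock.connect_ex((host, port)) == 0:
                    return
            time.sleep(1)
        raise TimeoutError(f"nothing listening on port {port}")
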
From 63eb6bb7cd1d517e3bf5cf6a4028b5a72385ccbf Mon Sep 17 00:00:00 2001
From: John Hao
Date: Mon, 28 Jul 2025 14:52:15 -0400
Subject: [PATCH 05/10] lock clickhouse version

---
 assets/clickhouse/Dockerfile     | 3 +--
 assets/clickhouse/methodology.md | 4 ++--
 2 files changed, 3 insertions(+), 4 deletions(-)

diff --git a/assets/clickhouse/Dockerfile b/assets/clickhouse/Dockerfile
index 0fafbbd..4719f0f 100644
--- a/assets/clickhouse/Dockerfile
+++ b/assets/clickhouse/Dockerfile
@@ -1,2 +1 @@
-FROM clickhouse/clickhouse-server:latest
-
+FROM clickhouse/clickhouse-server:25.6@sha256:7566eea413755c8fc43a7b39078b05f16ed350bb33c98f2f087c5f66b0e8163f
diff --git a/assets/clickhouse/methodology.md b/assets/clickhouse/methodology.md
index ce16885..2977936 100644
--- a/assets/clickhouse/methodology.md
+++ b/assets/clickhouse/methodology.md
@@ -1,6 +1,6 @@
 # ClickHouse methodology
 
-**Version: [23.3.1.2823](https://hub.docker.com/layers/clickhouse/clickhouse-server/23.3.1.2823/images/sha256-b88fd8c71b64d3158751337557ff089ff7b0d1ebf81d9c4c7aa1f0b37a31ee64?context=explore)**
+**Version: [25.6](https://hub.docker.com/layers/clickhouse/clickhouse-server/25.6/images/sha256-77ff1f2054e27bec1b4c5eccbf701b63f8409831fea71f162ae2f25872dee1f4)**
 
 **File with Formatted Queries:** [Config File](/assets/clickhouse/config.yaml)
 
@@ -67,4 +67,4 @@ On launch we execute ClickHouse’s entrypoint.sh to start ClickHouse in the bac
 On shutdown, we simply send ClickHouse a stop command.
 
 **Clearing Caches:**
-Before starting any queries we ask ClickHouse to drop its UNCOMPRESSED CACHE and MARK CACHE. We also flush the file system buffers and clear the filesystem caches.
\ No newline at end of file
+Before starting any queries we ask ClickHouse to drop its UNCOMPRESSED CACHE and MARK CACHE. We also flush the file system buffers and clear the filesystem caches.
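
[Note on patch 05] The cache clearing described in the methodology maps onto concrete commands; a sketch in the style of the repo's Python drivers (assumes clickhouse-client is available inside the pinned server image and that the process is privileged enough to write /proc/sys/vm/drop_caches):

    import subprocess

    # Drop ClickHouse's own in-memory caches before timing cold queries.
    subprocess.run(["clickhouse-client", "--query", "SYSTEM DROP UNCOMPRESSED CACHE"], check=True)
    subprocess.run(["clickhouse-client", "--query", "SYSTEM DROP MARK CACHE"], check=True)

    # Flush dirty pages, then drop the page cache, dentries and inodes.
    subprocess.run(["sync"], check=True)
    with open("/proc/sys/vm/drop_caches", "w") as f:
        f.write("3\n")
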
From c163497657f35b01ea186de3f1b4a5714eeaa433 Mon Sep 17 00:00:00 2001
From: John Hao
Date: Mon, 28 Jul 2025 14:52:48 -0400
Subject: [PATCH 06/10] improve benchall.py to run on only one dataset

---
 scripts/benchall.py | 8 +++++---
 1 file changed, 5 insertions(+), 3 deletions(-)

diff --git a/scripts/benchall.py b/scripts/benchall.py
index 8b46e83..0825499 100755
--- a/scripts/benchall.py
+++ b/scripts/benchall.py
@@ -12,6 +12,7 @@
 from src.jsonsync import JsonItem
 
 import os
+import sys
 import datetime
 from pathlib import Path
 
@@ -79,9 +80,10 @@ for bencher, kwargs in benchmarks:
 for bench_target in bench_target_dirs:
     dataset_name = os.path.basename(bench_target.resolve()).strip()
-
-    if dataset_name != 'mongod': # only use mongod for now
-        continue
+
+    if len(sys.argv) > 1:
+        if dataset_name != sys.argv[1].strip():
+            continue
     #run(bencher, kwargs, bench_target)
     run(bencher, kwargs, bench_target, attach_on_error=True)
     #run(bencher, kwargs, bench_target, attach=True)
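
[Note on patch 06] Dataset selection now comes from the command line instead of a hardcoded name check: ./scripts/benchall.py mongod restricts the sweep to the mongod dataset (the argument must match a bench-target directory name), while running the script with no argument benchmarks every dataset directory found.
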
From b5f7edb96a55def1c9ccb04275f4b1e5c79a9b66 Mon Sep 17 00:00:00 2001
From: John Hao
Date: Thu, 31 Jul 2025 14:56:40 -0400
Subject: [PATCH 07/10] elasticsearch changes: upgrade to 9.x and add a logsdb
 index template

---
 assets/elasticsearch/Dockerfile |  8 ++++----
 assets/elasticsearch/ingest.py  | 19 +++++++++++++++++++
 scripts/benchall.py             | 32 ++++++++++++++++----------------
 3 files changed, 39 insertions(+), 20 deletions(-)

diff --git a/assets/elasticsearch/Dockerfile b/assets/elasticsearch/Dockerfile
index cd0a99f..d760f96 100644
--- a/assets/elasticsearch/Dockerfile
+++ b/assets/elasticsearch/Dockerfile
@@ -8,12 +8,12 @@
 RUN curl -fsSL https://artifacts.elastic.co/GPG-KEY-elasticsearch \
     | gpg --dearmor -o /usr/share/keyrings/elasticsearch-keyring.gpg \
     && echo "deb [signed-by=/usr/share/keyrings/elasticsearch-keyring.gpg]" \
-    "https://artifacts.elastic.co/packages/8.x/apt stable main" \
-    | tee /etc/apt/sources.list.d/elastic-8.x.list;
+    "https://artifacts.elastic.co/packages/9.x/apt stable main" \
+    | tee /etc/apt/sources.list.d/elastic-9.x.list;
 
 RUN apt-get update \
     && DEBIAN_FRONTEND=noninteractive apt-get install -y \
-    elasticsearch=8.6.2 \
+    elasticsearch=9.0.3 \
     libcurl4 \
     libcurl4-openssl-dev \
     python3-pip \
@@ -21,4 +21,4 @@
     tmux \
     vim;
 
-RUN pip3 install elasticsearch==8.6.2 requests;
+RUN pip3 install elasticsearch==9.0.3 requests;
diff --git a/assets/elasticsearch/ingest.py b/assets/elasticsearch/ingest.py
index 2598266..b1c75c5 100644
--- a/assets/elasticsearch/ingest.py
+++ b/assets/elasticsearch/ingest.py
@@ -64,6 +64,25 @@ def traverse_data(collection_name):
 
 def ingest_dataset():
     es = Elasticsearch("http://localhost:9202", request_timeout=1200, retry_on_timeout=True)
+
+    template_body = {
+        #"index_patterns": ["logs-*"],
+        "template": {
+            "settings": {
+                "index": {
+                    "mode": "logsdb"
+                }
+            },
+            #"mappings": {
+            #    "properties": {
+            #        "message": {"type": "text"},
+            #        "timestamp": {"type": "date"}
+            #    }
+            #}
+        }
+    }
+
+    es.indices.put_index_template(name=collection_name, body=template_body)
 
     count = 0
     for success, info in streaming_bulk(
         es,
diff --git a/scripts/benchall.py b/scripts/benchall.py
index 0825499..9137aa0 100755
--- a/scripts/benchall.py
+++ b/scripts/benchall.py
@@ -34,23 +34,23 @@ def get_target_from_name(name):
 
 benchmarks = [
     # benchmark object, arguments
-    (clp_bench, {}),
-    (clickhouse_bench, {
-        'manual_column_names': False,
-        'keys': [],
-        'additional_order_by': [],
-        'timestamp_key': True
-    }),
-    (presto_clp_bench, {
-        'dataset_variation': "cleaned_log"
-    }),
-    (parquet_bench, {'mode': 'json string'}),
-    (parquet_bench, {'mode': 'pairwise arrays'}),
+    #(clp_bench, {}),
+    #(clickhouse_bench, {
+    #    'manual_column_names': False,
+    #    'keys': [],
+    #    'additional_order_by': [],
+    #    'timestamp_key': True
+    #    }),
+    #(presto_clp_bench, {
+    #    'dataset_variation': "cleaned_log"
+    #    }),
+    #(parquet_bench, {'mode': 'json string'}),
+    #(parquet_bench, {'mode': 'pairwise arrays'}),
     (elasticsearch_bench, {}),
-    (overhead_test_bench, {}),
-    (zstandard_bench, {}),
-    (sparksql_bench, {}),
-    (gzip_bench, {}),
+    #(overhead_test_bench, {}),
+    #(zstandard_bench, {}),
+    #(sparksql_bench, {}),
+    #(gzip_bench, {}),
 ]

From cad3f9d933da1f44daa4355f1b9f164dc07e1fad Mon Sep 17 00:00:00 2001
From: John Hao
Date: Thu, 31 Jul 2025 17:47:39 -0400
Subject: [PATCH 08/10] get logsdb working

---
 assets/elasticsearch/ingest.py | 10 ++--------
 1 file changed, 2 insertions(+), 8 deletions(-)

diff --git a/assets/elasticsearch/ingest.py b/assets/elasticsearch/ingest.py
index b1c75c5..c375f4f 100644
--- a/assets/elasticsearch/ingest.py
+++ b/assets/elasticsearch/ingest.py
@@ -65,19 +65,13 @@ def ingest_dataset():
     es = Elasticsearch("http://localhost:9202", request_timeout=1200, retry_on_timeout=True)
 
     template_body = {
-        #"index_patterns": ["logs-*"],
+        "index_patterns": [collection_name],
         "template": {
             "settings": {
                 "index": {
                     "mode": "logsdb"
                 }
-            },
-            #"mappings": {
-            #    "properties": {
-            #        "message": {"type": "text"},
-            #        "timestamp": {"type": "date"}
-            #    }
-            #}
+            }
         }
     }
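
[Note on patches 07-08] Once the index template is registered, the mode can be verified against the running node (port 9202 as configured in ingest.py; "mongod" below stands in for whatever collection_name was used):

    from elasticsearch import Elasticsearch

    es = Elasticsearch("http://localhost:9202")
    # Indices created during ingest should report the logsdb index mode.
    settings = es.indices.get_settings(index="mongod")
    print(settings["mongod"]["settings"]["index"]["mode"])  # expect "logsdb"
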
From 03772f115d6fb9bb3dba583ca0658da330ca16ba Mon Sep 17 00:00:00 2001
From: John Hao
Date: Mon, 4 Aug 2025 02:48:12 -0400
Subject: [PATCH 09/10] get all elasticsearch changes working (9.0 and logsdb)

---
 assets/elasticsearch/ingest.py      | 29 +++++++++++++++++++++++++++--
 assets/elasticsearch/main.py        |  2 +-
 assets/elasticsearch/methodology.md |  4 +++-
 scripts/benchall.py                 |  1 +
 4 files changed, 32 insertions(+), 4 deletions(-)

diff --git a/assets/elasticsearch/ingest.py b/assets/elasticsearch/ingest.py
index c375f4f..1a576bc 100644
--- a/assets/elasticsearch/ingest.py
+++ b/assets/elasticsearch/ingest.py
@@ -10,6 +10,14 @@
 
 log_path = sys.argv[1]
 
+def pop_by_path(obj, path):
+    keys = path.split('.')
+    for key in keys[:-1]:
+        obj = obj[key]
+    out = obj[keys[-1]]
+    del obj[keys[-1]]
+    return out
+
 def traverse_data(collection_name):
     with open(log_path, encoding="utf-8") as f:
         for line in f:
@@ -55,8 +63,17 @@ def traverse_data(collection_name):
             id_value = str(attr["query"]["_id"])
             json_line["attr"]["query"]["_id"] = {}
             json_line["attr"]["query"]["_id"]["_ooid"] = id_value
+
+        try:
+            timestamp_val = pop_by_path(json_line, sys.argv[2])
+            json_line["@timestamp"] = timestamp_val
+        except KeyError:
+            # no such timestamp, ignore
+            json_line["@timestamp"] = 0
+
         yield {
             "_index": collection_name,
+            "_op_type": "create",
             "_source": json_line,
         }
@@ -70,11 +87,19 @@ def ingest_dataset():
     template_body = {
         "index_patterns": [collection_name],
         "template": {
             "settings": {
                 "index": {
                     "mode": "logsdb"
-                }
-            }
-        }
+                },
+            },
+            "mappings": {
+                "properties": {
+                    "@timestamp": {
+                        "type": "date",
+                        "format": "date_optional_time||epoch_second||epoch_millis||yyyy-MM-dd HH:mm:ss.SSS zzz"
+                    }
+                }
+            }
+        },
+        "priority": 101
     }
     es.indices.put_index_template(name=collection_name, body=template_body)
diff --git a/assets/elasticsearch/main.py b/assets/elasticsearch/main.py
index 3fb23c7..297236f 100755
--- a/assets/elasticsearch/main.py
+++ b/assets/elasticsearch/main.py
@@ -36,7 +36,7 @@ def ingest(self):
         Ingests the dataset at self.datasets_path
         """
         self.docker_execute([
-            f"python3 {ASSETS_DIR}/ingest.py {self.datasets_path}"
+            f"python3 {ASSETS_DIR}/ingest.py {self.datasets_path} {self.dataset_meta['timestamp']}"
         ])
 
     def search(self, query):
diff --git a/assets/elasticsearch/methodology.md b/assets/elasticsearch/methodology.md
index 2e2c6a1..968a8b1 100644
--- a/assets/elasticsearch/methodology.md
+++ b/assets/elasticsearch/methodology.md
@@ -1,6 +1,6 @@
 # Elasticsearch methodology
 
-**Version:** [8.6.2](https://www.elastic.co/downloads/past-releases/elasticsearch-8-6-2)
+**Version:** [9.0.3](https://www.elastic.co/downloads/past-releases/elasticsearch-9-0-3)
 
 **File with Formatted Queries:** [Config File](/assets/elasticsearch/config.yaml)
 
@@ -16,6 +16,8 @@ We disable the [xpack](https://www.elastic.co/guide/en/elasticsearch/reference/c
 
 Some preprocessing is necessary to make the dataset searchable in Elasticsearch. For more details, refer to the `traverse_data` function in [ingest.py](/assets/elasticsearch/ingest.py). This process generally involves reorganizing specific fields, moving them into outer or inner objects to ensure proper query functionality.
 
+We use the [logs data stream](https://www.elastic.co/docs/manage-data/data-store/data-streams/logs-data-stream), which is optimized for timestamped logs.
+
 ### Launch & Shutdown
 On launch the benchmark framework calls the [launch.sh](/assets/elasticsearch/launch.sh) script. This script automates the configuration of an Elasticsearch instance by modifying its settings to change the HTTP port, disable security features, and ensure it runs in single-node mode. It also updates the `elasticsearch` user settings to allow login and starts the Elasticsearch service in the background.
diff --git a/scripts/benchall.py b/scripts/benchall.py
index 9137aa0..0dabff2 100755
--- a/scripts/benchall.py
+++ b/scripts/benchall.py
@@ -65,6 +65,7 @@ def run(bencher, kwargs, bench_target, attach=False, attach_on_error=False):
         bench = bencher(bench_target, **kwargs)
         bench.attach = attach
         bench.run_applicable(dataset_name)
+        #bench.run_everything([])
         #bench.run_everything(['ingest', 'cold'])
     except Exception as e:
         statement = f"{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}: {bencher.__name__} with argument {kwargs} failed on dataset {dataset_name}: {type(e).__name__}: {str(e)}"
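
[Note on patch 09] pop_by_path removes a dotted-path key in place and returns its value, which is how the configured timestamp field gets hoisted into @timestamp. A behaviour sketch using the definition from this patch (values illustrative):

    record = {"attr": {"ts": "2025-07-24 17:46:34.000 UTC"}, "msg": "x"}
    value = pop_by_path(record, "attr.ts")
    # value  == "2025-07-24 17:46:34.000 UTC"
    # record == {"attr": {}, "msg": "x"}, i.e. the key was deleted in place
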
From 58025014a1e94a96439b6c1ad8b08f4e776993c7 Mon Sep 17 00:00:00 2001
From: John Hao
Date: Mon, 4 Aug 2025 08:22:09 -0400
Subject: [PATCH 10/10] add no-logsdb option

---
 assets/elasticsearch/ingest.py | 37 +++++++++++++++++++-----------------
 assets/elasticsearch/main.py   | 14 +++++++++++---
 scripts/benchall.py            |  7 ++++---
 3 files changed, 35 insertions(+), 23 deletions(-)

diff --git a/assets/elasticsearch/ingest.py b/assets/elasticsearch/ingest.py
index 1a576bc..287a8d4 100644
--- a/assets/elasticsearch/ingest.py
+++ b/assets/elasticsearch/ingest.py
@@ -81,26 +81,27 @@ def ingest_dataset():
     es = Elasticsearch("http://localhost:9202", request_timeout=1200, retry_on_timeout=True)
 
-    template_body = {
-        "index_patterns": [collection_name],
-        "template": {
-            "settings": {
-                "index": {
-                    "mode": "logsdb"
-                },
-            },
-            "mappings": {
-                "properties": {
-                    "@timestamp": {
-                        "type": "date",
-                        "format": "date_optional_time||epoch_second||epoch_millis||yyyy-MM-dd HH:mm:ss.SSS zzz"
-                    }
-                }
-            }
-        },
-        "priority": 101
-    }
-    es.indices.put_index_template(name=collection_name, body=template_body)
+    if sys.argv[3] != "no_logsdb":
+        template_body = {
+            "index_patterns": [collection_name],
+            "template": {
+                "settings": {
+                    "index": {
+                        "mode": "logsdb"
+                    },
+                },
+                "mappings": {
+                    "properties": {
+                        "@timestamp": {
+                            "type": "date",
+                            "format": "date_optional_time||epoch_second||epoch_millis||yyyy-MM-dd HH:mm:ss.SSS zzz"
+                        }
+                    }
+                }
+            },
+            "priority": 101
+        }
+        es.indices.put_index_template(name=collection_name, body=template_body)
 
     count = 0
     for success, info in streaming_bulk(
diff --git a/assets/elasticsearch/main.py b/assets/elasticsearch/main.py
index 297236f..9970982 100755
--- a/assets/elasticsearch/main.py
+++ b/assets/elasticsearch/main.py
@@ -13,9 +13,14 @@
 class elasticsearch_bench(Benchmark):
     # add any parameters to the tool here
-    def __init__(self, dataset):
+    def __init__(self, dataset, logsdb=True):
         super().__init__(dataset)
 
+        self.logsdb = logsdb
+
+        if not logsdb:
+            self.properties["notes"] = "no logsdb"
+
     @property
     def compressed_size(self):
         """
@@ -35,8 +40,13 @@ def ingest(self):
         """
         Ingests the dataset at self.datasets_path
         """
+        if self.logsdb:
+            logsdb = "anything"
+        else:
+            logsdb = "no_logsdb"
+
         self.docker_execute([
-            f"python3 {ASSETS_DIR}/ingest.py {self.datasets_path} {self.dataset_meta['timestamp']}"
+            f"python3 {ASSETS_DIR}/ingest.py {self.datasets_path} {self.dataset_meta['timestamp']} {logsdb}"
         ])
 
     def search(self, query):
diff --git a/scripts/benchall.py b/scripts/benchall.py
index 0dabff2..0565fe4 100755
--- a/scripts/benchall.py
+++ b/scripts/benchall.py
@@ -46,7 +46,8 @@ def get_target_from_name(name):
     #    }),
     #(parquet_bench, {'mode': 'json string'}),
     #(parquet_bench, {'mode': 'pairwise arrays'}),
-    (elasticsearch_bench, {}),
+    (elasticsearch_bench, {'logsdb': False}),
+    (elasticsearch_bench, {'logsdb': True}),
     #(overhead_test_bench, {}),
     #(zstandard_bench, {}),
     #(sparksql_bench, {}),
@@ -85,8 +86,8 @@ def run(bencher, kwargs, bench_target, attach=False, attach_on_error=False):
     if len(sys.argv) > 1:
         if dataset_name != sys.argv[1].strip():
             continue
-    #run(bencher, kwargs, bench_target)
-    run(bencher, kwargs, bench_target, attach_on_error=True)
+    run(bencher, kwargs, bench_target)
+    #run(bencher, kwargs, bench_target, attach_on_error=True)
     #run(bencher, kwargs, bench_target, attach=True)
 
 #run(sparksql_bench, {}, get_target_from_name('mongod'))
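
[Note on patch 10] ingest.py now takes three positional arguments: the dataset path, the dotted timestamp key, and a logsdb toggle. The literal string no_logsdb disables the index template, and any other value (main.py passes "anything") enables it, e.g. python3 assets/elasticsearch/ingest.py /datasets/mongod.json attr.ts no_logsdb, where the dataset path and timestamp key are illustrative. benchall.py then queues elasticsearch_bench twice per dataset, once with logsdb disabled and once enabled, so the two index modes can be compared directly.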