diff --git a/assets/clickhouse/Dockerfile b/assets/clickhouse/Dockerfile index 0fafbbd..4719f0f 100644 --- a/assets/clickhouse/Dockerfile +++ b/assets/clickhouse/Dockerfile @@ -1,2 +1 @@ -FROM clickhouse/clickhouse-server:latest - +FROM clickhouse/clickhouse-server:25.6@sha256:7566eea413755c8fc43a7b39078b05f16ed350bb33c98f2f087c5f66b0e8163f diff --git a/assets/clickhouse/main.py b/assets/clickhouse/main.py index 75bce8c..fdf55b6 100755 --- a/assets/clickhouse/main.py +++ b/assets/clickhouse/main.py @@ -13,7 +13,7 @@ """ CLICKHOUSE_COLLECTION_NAME = "clickhouse_bench" -class clickhouse_native_json_bench(Benchmark): +class clickhouse_bench(Benchmark): # add any parameters to the tool here def __init__(self, dataset, manual_column_names=True, keys=[], additional_order_by=[], timestamp_key=False): super().__init__(dataset) @@ -154,7 +154,7 @@ def run_applicable(self, dataset_name): def main(): - bench = clickhouse_native_json_bench(sys.argv[1]) + bench = clickhouse_bench(sys.argv[1]) bench.run_everything() if __name__ == "__main__": diff --git a/assets/clickhouse/methodology.md b/assets/clickhouse/methodology.md index ce16885..2977936 100644 --- a/assets/clickhouse/methodology.md +++ b/assets/clickhouse/methodology.md @@ -1,6 +1,6 @@ # ClickHouse methodology -**Version: [23.3.1.2823](https://hub.docker.com/layers/clickhouse/clickhouse-server/23.3.1.2823/images/sha256-b88fd8c71b64d3158751337557ff089ff7b0d1ebf81d9c4c7aa1f0b37a31ee64?context=explore)** +**Version: [25.6](https://hub.docker.com/layers/clickhouse/clickhouse-server/25.6/images/sha256-77ff1f2054e27bec1b4c5eccbf701b63f8409831fea71f162ae2f25872dee1f4)** **File with Formatted Queries:** [Config File](/assets/clickhouse/config.yaml) @@ -67,4 +67,4 @@ On launch we execute ClickHouse’s entrypoint.sh to start ClickHouse in the bac On shutdown, we simply send ClickHouse a stop command. **Clearing Caches:** -Before starting any queries we ask ClickHouse to drop its UNCOMPRESSED CACHE and MARK CACHE. We also flush the file system buffers and clear the filesystem caches. \ No newline at end of file +Before starting any queries we ask ClickHouse to drop its UNCOMPRESSED CACHE and MARK CACHE. We also flush the file system buffers and clear the filesystem caches. diff --git a/assets/clp/main.py b/assets/clp/main.py index 7ad36e6..0e148a0 100755 --- a/assets/clp/main.py +++ b/assets/clp/main.py @@ -5,7 +5,7 @@ CLP_OUT_PATH = f"{WORK_DIR}/archives" CLP_S_BINARY = "/clp/clp-s" -class clp_s_bench(Benchmark): +class clp_bench(Benchmark): def __init__(self, dataset, target_encoded_size=268435456): super().__init__(dataset) @@ -61,7 +61,7 @@ def terminate_procs(self): def main(): - bench = clp_s_bench(sys.argv[1]) + bench = clp_bench(sys.argv[1]) bench.run_everything() if __name__ == "__main__": diff --git a/assets/elasticsearch/Dockerfile b/assets/elasticsearch/Dockerfile index 6ca38cd..d760f96 100644 --- a/assets/elasticsearch/Dockerfile +++ b/assets/elasticsearch/Dockerfile @@ -1,4 +1,4 @@ -FROM ghcr.io/y-scope/clp/clp-core-dependencies-x86-ubuntu-jammy:main +from ubuntu:jammy RUN apt-get update \ && DEBIAN_FRONTEND=noninteractive apt-get install -y \ @@ -8,12 +8,12 @@ RUN apt-get update \ RUN curl -fsSL https://artifacts.elastic.co/GPG-KEY-elasticsearch \ | gpg --dearmor -o /usr/share/keyrings/elasticsearch-keyring.gpg \ && echo "deb [signed-by=/usr/share/keyrings/elasticsearch-keyring.gpg]" \ - "https://artifacts.elastic.co/packages/8.x/apt stable main" \ - | tee /etc/apt/sources.list.d/elastic-8.x.list; + "https://artifacts.elastic.co/packages/9.x/apt stable main" \ + | tee /etc/apt/sources.list.d/elastic-9.x.list; RUN apt-get update \ && DEBIAN_FRONTEND=noninteractive apt-get install -y \ - elasticsearch=8.6.2 \ + elasticsearch=9.0.3 \ libcurl4 \ libcurl4-openssl-dev \ python3-pip \ @@ -21,4 +21,4 @@ RUN apt-get update \ tmux \ vim; -RUN pip3 install elasticsearch==8.6.2 requests; +RUN pip3 install elasticsearch==9.0.3 requests; diff --git a/assets/elasticsearch/ingest.py b/assets/elasticsearch/ingest.py index 2598266..287a8d4 100644 --- a/assets/elasticsearch/ingest.py +++ b/assets/elasticsearch/ingest.py @@ -10,6 +10,14 @@ log_path = sys.argv[1] +def pop_by_path(obj, path): + keys = path.split('.') + for key in keys[:-1]: + obj = obj[key] + out = obj[keys[-1]] + del obj[keys[-1]] + return out + def traverse_data(collection_name): with open(log_path, encoding="utf-8") as f: for line in f: @@ -55,8 +63,17 @@ def traverse_data(collection_name): id_value = str(attr["query"]["_id"]) json_line["attr"]["query"]["_id"] = {} json_line["attr"]["query"]["_id"]["_ooid"] = id_value + + try: + timestamp_val = pop_by_path(json_line, sys.argv[2]) + json_line["@timestamp"] = timestamp_val + except KeyError: + # no such timestamp, ignore + json_line["@timestamp"] = 0 + yield { "_index": collection_name, + "_op_type": "create", "_source": json_line, } @@ -64,6 +81,28 @@ def traverse_data(collection_name): def ingest_dataset(): es = Elasticsearch("http://localhost:9202", request_timeout=1200, retry_on_timeout=True) + if sys.argv[3] != "no_logsdb": + template_body = { + "index_patterns": [collection_name], + "template": { + "settings": { + "index": { + "mode": "logsdb" + }, + }, + "mappings": { + "properties": { + "@timestamp": { + "type": "date", + "format": "date_optional_time||epoch_second||epoch_millis||yyyy-MM-dd HH:mm:ss.SSS zzz" + } + } + } + }, + "priority": 101 + } + es.indices.put_index_template(name=collection_name, body=template_body) + count = 0 for success, info in streaming_bulk( es, diff --git a/assets/elasticsearch/main.py b/assets/elasticsearch/main.py index 3fb23c7..9970982 100755 --- a/assets/elasticsearch/main.py +++ b/assets/elasticsearch/main.py @@ -13,9 +13,14 @@ class elasticsearch_bench(Benchmark): # add any parameters to the tool here - def __init__(self, dataset): + def __init__(self, dataset, logsdb=True): super().__init__(dataset) + self.logsdb = logsdb + + if not logsdb: + self.properties["notes"] = "no logsdb" + @property def compressed_size(self): """ @@ -35,8 +40,13 @@ def ingest(self): """ Ingests the dataset at self.datasets_path """ + if self.logsdb: + logsdb = "anything" + else: + logsdb = "no_logsdb" + self.docker_execute([ - f"python3 {ASSETS_DIR}/ingest.py {self.datasets_path}" + f"python3 {ASSETS_DIR}/ingest.py {self.datasets_path} {self.dataset_meta['timestamp']} {logsdb}" ]) def search(self, query): diff --git a/assets/elasticsearch/methodology.md b/assets/elasticsearch/methodology.md index 2e2c6a1..968a8b1 100644 --- a/assets/elasticsearch/methodology.md +++ b/assets/elasticsearch/methodology.md @@ -1,6 +1,6 @@ # Elasticsearch methodology -**Version:** [8.6.2](https://www.elastic.co/downloads/past-releases/elasticsearch-8-6-2) +**Version:** [9.0.3](https://www.elastic.co/downloads/past-releases/elasticsearch-9-0-3) **File with Formatted Queries:** [Config File](/assets/elasticsearch/config.yaml) @@ -16,6 +16,8 @@ We disable the [xpack](https://www.elastic.co/guide/en/elasticsearch/reference/c Some preprocessing is necessary to make the dataset searchable in Elasticsearch. For more details, refer to the `traverse_data` function in [ingest.py](/assets/elasticsearch/ingest.py). This process generally involves reorganizing specific fields, moving them into outer or inner objects to ensure proper query functionality. +We use the [logs data stream](https://www.elastic.co/docs/manage-data/data-store/data-streams/logs-data-stream) which is optimized for timestamped logs + ### Launch & Shutdown On launch the benchmark framework calls the [launch.sh](/assets/elasticsearch/launch.sh) script. This script automates the configuration of an Elasticsearch instance by modifying its settings to change the HTTP port, disable security features, and ensure it runs in single-node mode. It also updates the `elasticsearch` user settings to allow login and starts the Elasticsearch service in the background. diff --git a/assets/gzip/Dockerfile b/assets/gzip/Dockerfile index 18ea6a5..8b7c41c 100644 --- a/assets/gzip/Dockerfile +++ b/assets/gzip/Dockerfile @@ -2,11 +2,11 @@ # dependencies # If there is any dedicated image available, you should build the benchmarking image on top of that -FROM ghcr.io/y-scope/clp/clp-core-dependencies-x86-ubuntu-jammy:main +FROM ubuntu:jammy # Install necessary packages RUN apt-get update \ - && DEBIAN_FRONTEND=noninteractive apt-get install -y \ + && DEBIAN_FRONTEND=noninteractive apt-get install -y --allow-downgrades \ tmux \ vim \ gzip diff --git a/assets/gzip/main.py b/assets/gzip/main.py index bbb9e4f..3fc453f 100755 --- a/assets/gzip/main.py +++ b/assets/gzip/main.py @@ -37,7 +37,7 @@ def ingest(self): """ Ingests the dataset at self.datasets_path """ - self.docker_execute(f"gzip -c {self.datasets_path} > {GZIP_FILE_PATH} ") + self.docker_execute(f"gzip -c {self.datasets_path} > {GZIP_FILE_PATH} ", shell=True) def search(self, query): """ @@ -45,7 +45,7 @@ def search(self, query): """ assert query == "notasearch" - self.docker_execute(f"gunzip -c {GZIP_FILE_PATH} > {DECOMPRESSED_FILE_PATH}") + self.docker_execute(f"gunzip -c {GZIP_FILE_PATH} > {DECOMPRESSED_FILE_PATH}", shell=True) return 0 def bench_search(self, cold=True): if not cold: @@ -53,7 +53,7 @@ def bench_search(self, cold=True): return self.bench_start(ingest=False) - self.docker_execute(f"gunzip -c {GZIP_FILE_PATH} > {DECOMPRESSED_FILE_PATH}") + self.docker_execute(f"gunzip -c {GZIP_FILE_PATH} > {DECOMPRESSED_FILE_PATH}", shell=True) self.bench_stop() logger.info("Decompression done") diff --git a/assets/overhead_test/Dockerfile b/assets/overhead_test/Dockerfile index 2428ce5..3237030 100644 --- a/assets/overhead_test/Dockerfile +++ b/assets/overhead_test/Dockerfile @@ -2,7 +2,7 @@ # dependencies # If there is any dedicated image available, you should build the benchmarking image on top of that -FROM ghcr.io/y-scope/clp/clp-core-dependencies-x86-ubuntu-jammy:main +FROM ubuntu:jammy # Install necessary packages RUN apt-get update \ diff --git a/assets/presto_clp/main.py b/assets/presto_clp/main.py index 487ae6b..4a5255c 100755 --- a/assets/presto_clp/main.py +++ b/assets/presto_clp/main.py @@ -16,7 +16,7 @@ CLP_PRESTO_HOST_STORAGE = os.path.abspath(os.path.expanduser("~/clp-json-x86_64-v0.4.0-dev")) SQL_PASSWORD = "wqEGPyBdx_w" HOST_IP = "127.0.0.1" -class clp_presto_bench(Benchmark): +class presto_clp_bench(Benchmark): # add any parameters to the tool here def __init__(self, dataset, dataset_variation='cleaned_log'): super().__init__(dataset, dataset_variation=dataset_variation) @@ -102,7 +102,7 @@ def terminate(self): def main(): - bench = clp_presto_bench(sys.argv[1]) + bench = presto_clp_bench(sys.argv[1]) bench.run_everything() if __name__ == "__main__": diff --git a/assets/presto_parquet/Dockerfile b/assets/presto_parquet/Dockerfile index c792d39..03c87e5 100644 --- a/assets/presto_parquet/Dockerfile +++ b/assets/presto_parquet/Dockerfile @@ -1,4 +1,4 @@ -FROM presto/prestissimo-dependency:ubuntu-22.04 +FROM presto/prestissimo-dependency:ubuntu-22.04-presto-0.293 # Install necessary packages RUN apt-get update; @@ -35,22 +35,28 @@ RUN cd /usr/local && sh -c "$(curl --location https://taskfile.dev/install.sh)" RUN pip3 install pyarrow; WORKDIR /home -RUN git clone https://github.com/anlowee/presto.git; +RUN git clone https://github.com/y-scope/presto.git; WORKDIR /home/presto -RUN git checkout xwei/benchmark-test; +RUN git checkout 89ce0f3b4ec713d658f3544e75aeb92fbd3a397d; WORKDIR /home/presto/presto-native-execution RUN mkdir build; RUN rm -rf velox; -RUN git clone https://github.com/anlowee/velox.git; +RUN git clone https://github.com/y-scope/velox.git; WORKDIR /home/presto/presto-native-execution/velox -RUN git checkout xwei/benchmark-test; +RUN git checkout 27ee4bcaad449fd1c8b90c48787f4eaf8e92395f; WORKDIR /home/presto RUN ./mvnw clean install -DskipTests -pl -presto-docs -T1C; -WORKDIR /home/presto/presto-native-execution/build -RUN cmake .. && make -j$(nproc) presto_server; +WORKDIR /home/presto/presto-native-execution +# RUN cmake .. && make -j$(nproc) presto_server; +RUN NUM_THREADS=8 make release; ENV JAVA_HOME=/usr/lib/jvm/java-11-openjdk-amd64 ENV PATH=$JAVA_HOME/bin:$PATH +RUN apt-get update; + +RUN apt-get install -y \ + mariadb-client \ + net-tools; diff --git a/assets/presto_parquet/include/etc_worker/config.properties b/assets/presto_parquet/include/etc_worker/config.properties index 24a455b..76c8a4e 100644 --- a/assets/presto_parquet/include/etc_worker/config.properties +++ b/assets/presto_parquet/include/etc_worker/config.properties @@ -1,5 +1,5 @@ discovery.uri=http://127.0.0.1:8080 -presto.version=0.293-SNAPSHOT-faae543 +presto.version=0.293-89ce0f3 http-server.http.port=7777 shutdown-onset-sec=1 register-test-functions=false diff --git a/assets/presto_parquet/main.py b/assets/presto_parquet/main.py index 671953f..192dd68 100755 --- a/assets/presto_parquet/main.py +++ b/assets/presto_parquet/main.py @@ -63,20 +63,20 @@ def launch(self): """ Runs the benchmarked tool """ - self.docker_execute("bash -c \"python3 /home/presto/presto-server/target/presto-server-0.293-SNAPSHOT/bin/launcher.py run --etc-dir=/home/include/etc_coordinator\"", background=True) + self.docker_execute("bash -c \"python3 /home/presto/presto-server/target/presto-server-0.293/bin/launcher.py run --etc-dir=/home/include/etc_coordinator\"", background=True) self.wait_for_port(8080) - self.docker_execute("/home/presto/presto-native-execution/build/presto_cpp/main/presto_server --logtostderr=1 --etc_dir=/home/include/etc_worker", background=True) + self.docker_execute("/home/presto/presto-native-execution/_build/release/presto_cpp/main/presto_server --logtostderr=1 --etc_dir=/home/include/etc_worker", background=True) self.wait_for_port(7777) time.sleep(60) # this needs to be more than 10 def hive_execute(self, query, check=True): - return self.docker_execute(f'/home/presto/presto-cli/target/presto-cli-0.293-SNAPSHOT-executable.jar --catalog hive --schema {PARQUET_SCHEMA_NAME} --execute "{query}"', check) + return self.docker_execute(f'/home/presto/presto-cli/target/presto-cli-0.293-executable.jar --catalog hive --schema {PARQUET_SCHEMA_NAME} --execute "{query}"', check) def ingest(self): """ Ingests the dataset at self.datasets_path """ - self.docker_execute(f'/home/presto/presto-cli/target/presto-cli-0.293-SNAPSHOT-executable.jar --catalog hive --schema {PARQUET_SCHEMA_NAME} --execute "CREATE SCHEMA IF NOT EXISTS hive.{PARQUET_SCHEMA_NAME};"') + self.docker_execute(f'/home/presto/presto-cli/target/presto-cli-0.293-executable.jar --catalog hive --schema {PARQUET_SCHEMA_NAME} --execute "CREATE SCHEMA IF NOT EXISTS hive.{PARQUET_SCHEMA_NAME};"') if self.pairwise_arrays: self.hive_execute(f""" \ diff --git a/assets/template/Dockerfile b/assets/template/Dockerfile index 2428ce5..3237030 100644 --- a/assets/template/Dockerfile +++ b/assets/template/Dockerfile @@ -2,7 +2,7 @@ # dependencies # If there is any dedicated image available, you should build the benchmarking image on top of that -FROM ghcr.io/y-scope/clp/clp-core-dependencies-x86-ubuntu-jammy:main +FROM ubuntu:jammy # Install necessary packages RUN apt-get update \ diff --git a/scripts/benchall.py b/scripts/benchall.py index 6854f17..0565fe4 100755 --- a/scripts/benchall.py +++ b/scripts/benchall.py @@ -1,17 +1,18 @@ #!/usr/bin/env python3 -from assets.clp_s.main import clp_s_bench -from assets.clickhouse_native_json.main import clickhouse_native_json_bench +from assets.clp.main import clp_bench +from assets.clickhouse.main import clickhouse_bench from assets.sparksql.main import sparksql_bench -from assets.parquet.main import parquet_bench +from assets.presto_parquet.main import parquet_bench from assets.zstandard.main import zstandard_bench from assets.elasticsearch.main import elasticsearch_bench -from assets.clp_presto.main import clp_presto_bench +from assets.presto_clp.main import presto_clp_bench from assets.overhead_test.main import overhead_test_bench from assets.gzip.main import gzip_bench from src.jsonsync import JsonItem import os +import sys import datetime from pathlib import Path @@ -33,26 +34,27 @@ def get_target_from_name(name): benchmarks = [ # benchmark object, arguments - (clp_s_bench, {}), - (clickhouse_native_json_bench, { - 'manual_column_names': False, - 'keys': [], - 'additional_order_by': [], - 'timestamp_key': True - }), - (clp_presto_bench, { - 'dataset_variation': "cleaned_log" - }), - (parquet_bench, {'mode': 'json string'}), - (parquet_bench, {'mode': 'pairwise arrays'}), - (elasticsearch_bench, {}), - (overhead_test_bench, {}), - (zstandard_bench, {}), - (sparksql_bench, {}), - (gzip_bench, {}), + #(clp_bench, {}), + #(clickhouse_bench, { + # 'manual_column_names': False, + # 'keys': [], + # 'additional_order_by': [], + # 'timestamp_key': True + # }), + #(presto_clp_bench, { + # 'dataset_variation': "cleaned_log" + # }), + #(parquet_bench, {'mode': 'json string'}), + #(parquet_bench, {'mode': 'pairwise arrays'}), + (elasticsearch_bench, {'logsdb': False}), + (elasticsearch_bench, {'logsdb': True}), + #(overhead_test_bench, {}), + #(zstandard_bench, {}), + #(sparksql_bench, {}), + #(gzip_bench, {}), ] -def run(bencher, kwargs, bench_target, attach=False): +def run(bencher, kwargs, bench_target, attach=False, attach_on_error=False): dataset_name = 'error when finding dataset name' bench = None try: @@ -64,13 +66,14 @@ def run(bencher, kwargs, bench_target, attach=False): bench = bencher(bench_target, **kwargs) bench.attach = attach bench.run_applicable(dataset_name) + #bench.run_everything([]) #bench.run_everything(['ingest', 'cold']) except Exception as e: statement = f"{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}: {bencher.__name__} with argument {kwargs} failed on dataset {dataset_name}: {type(e).__name__}: {str(e)}" with open((current_dir / 'exceptions.log').resolve(), 'a') as file: file.write(f"{statement}\n") print(statement) - if attach: + if attach or attach_on_error: if bench is not None: bench.docker_attach() else: @@ -79,10 +82,12 @@ def run(bencher, kwargs, bench_target, attach=False): for bencher, kwargs in benchmarks: for bench_target in bench_target_dirs: dataset_name = os.path.basename(bench_target.resolve()).strip() - - #if dataset_name != 'mongod': # only use mongod for now - # continue + + if len(sys.argv) > 1: + if dataset_name != sys.argv[1].strip(): + continue run(bencher, kwargs, bench_target) + #run(bencher, kwargs, bench_target, attach_on_error=True) #run(bencher, kwargs, bench_target, attach=True) #run(sparksql_bench, {}, get_target_from_name('mongod'))