|
| 1 | +#!/usr/bin/env python3 |
| 2 | + |
| 3 | +import argparse |
| 4 | +import configparser |
| 5 | +import datetime |
| 6 | +import os |
| 7 | +import posixpath |
| 8 | +import traceback |
| 9 | +import time |
| 10 | +import ydb |
| 11 | +from collections import Counter |
| 12 | + |
| 13 | +dir = os.path.dirname(__file__) |
| 14 | +config = configparser.ConfigParser() |
| 15 | +config_file_path = f"{dir}/../../config/ydb_qa_db.ini" |
| 16 | +config.read(config_file_path) |
| 17 | + |
| 18 | +build_preset = os.environ.get("build_preset") |
| 19 | +branch = os.environ.get("branch_to_compare") |
| 20 | + |
| 21 | +DATABASE_ENDPOINT = config["QA_DB"]["DATABASE_ENDPOINT"] |
| 22 | +DATABASE_PATH = config["QA_DB"]["DATABASE_PATH"] |
| 23 | + |
| 24 | + |
| 25 | +def create_tables(pool, table_path): |
| 26 | + print(f"> create table: {table_path}") |
| 27 | + |
| 28 | + def callee(session): |
| 29 | + session.execute_scheme(f""" |
| 30 | + CREATE table `{table_path}` ( |
| 31 | + `test_name` Utf8 NOT NULL, |
| 32 | + `suite_folder` Utf8 NOT NULL, |
| 33 | + `full_name` Utf8 NOT NULL, |
| 34 | + `date_window` Date NOT NULL, |
| 35 | + `days_ago_window` Uint64 NOT NULL, |
| 36 | + `history` String, |
| 37 | + `history_class` String, |
| 38 | + `pass_count` Uint64, |
| 39 | + `mute_count` Uint64, |
| 40 | + `fail_count` Uint64, |
| 41 | + `skip_count` Uint64, |
| 42 | + PRIMARY KEY (`test_name`, `suite_folder`, `full_name`,date_window) |
| 43 | + ) |
| 44 | + PARTITION BY HASH(`full_name`) |
| 45 | + WITH (STORE = COLUMN) |
| 46 | + """) |
| 47 | + |
| 48 | + return pool.retry_operation_sync(callee) |
| 49 | + |
| 50 | + |
| 51 | +def bulk_upsert(table_client, table_path, rows): |
| 52 | + print(f"> bulk upsert: {table_path}") |
| 53 | + column_types = ( |
| 54 | + ydb.BulkUpsertColumns() |
| 55 | + .add_column("test_name", ydb.OptionalType(ydb.PrimitiveType.Utf8)) |
| 56 | + .add_column("suite_folder", ydb.OptionalType(ydb.PrimitiveType.Utf8)) |
| 57 | + .add_column("full_name", ydb.OptionalType(ydb.PrimitiveType.Utf8)) |
| 58 | + .add_column("date_window", ydb.OptionalType(ydb.PrimitiveType.Date)) |
| 59 | + .add_column("days_ago_window", ydb.OptionalType(ydb.PrimitiveType.Uint64)) |
| 60 | + .add_column("history", ydb.OptionalType(ydb.PrimitiveType.String)) |
| 61 | + .add_column("history_class", ydb.OptionalType(ydb.PrimitiveType.String)) |
| 62 | + .add_column("pass_count", ydb.OptionalType(ydb.PrimitiveType.Uint64)) |
| 63 | + .add_column("mute_count", ydb.OptionalType(ydb.PrimitiveType.Uint64)) |
| 64 | + .add_column("fail_count", ydb.OptionalType(ydb.PrimitiveType.Uint64)) |
| 65 | + .add_column("skip_count", ydb.OptionalType(ydb.PrimitiveType.Uint64)) |
| 66 | + ) |
| 67 | + table_client.bulk_upsert(table_path, rows, column_types) |
| 68 | + |
| 69 | + |
| 70 | +def main(): |
| 71 | + parser = argparse.ArgumentParser() |
| 72 | + parser.add_argument('--days-window', default=5, type=int, help='how many days back we collecting history') |
| 73 | + |
| 74 | + args, unknown = parser.parse_known_args() |
| 75 | + history_for_n_day = args.days_window |
| 76 | + |
| 77 | + print(f'Getting hostory in window {history_for_n_day} days') |
| 78 | + |
| 79 | + |
| 80 | + if "CI_YDB_SERVICE_ACCOUNT_KEY_FILE_CREDENTIALS" not in os.environ: |
| 81 | + print( |
| 82 | + "Error: Env variable CI_YDB_SERVICE_ACCOUNT_KEY_FILE_CREDENTIALS is missing, skipping" |
| 83 | + ) |
| 84 | + return 1 |
| 85 | + else: |
| 86 | + # Do not set up 'real' variable from gh workflows because it interfere with ydb tests |
| 87 | + # So, set up it locally |
| 88 | + os.environ["YDB_SERVICE_ACCOUNT_KEY_FILE_CREDENTIALS"] = os.environ[ |
| 89 | + "CI_YDB_SERVICE_ACCOUNT_KEY_FILE_CREDENTIALS" |
| 90 | + ] |
| 91 | + with ydb.Driver( |
| 92 | + endpoint=DATABASE_ENDPOINT, |
| 93 | + database=DATABASE_PATH, |
| 94 | + credentials=ydb.credentials_from_env_variables(), |
| 95 | + ) as driver: |
| 96 | + driver.wait(timeout=10, fail_fast=True) |
| 97 | + session = ydb.retry_operation_sync( |
| 98 | + lambda: driver.table_client.session().create() |
| 99 | + ) |
| 100 | + |
| 101 | + # settings, paths, consts |
| 102 | + tc_settings = ydb.TableClientSettings().with_native_date_in_result_sets(enabled=True) |
| 103 | + table_client = ydb.TableClient(driver, tc_settings) |
| 104 | + |
| 105 | + table_path = f'test_results/analytics/flaky_tests_history_{history_for_n_day}_days' |
| 106 | + default_start_date = datetime.date(2024, 7, 1) |
| 107 | + |
| 108 | + with ydb.SessionPool(driver) as pool: |
| 109 | + create_tables(pool, table_path) |
| 110 | + |
| 111 | + # geting last date from history |
| 112 | + last_date_query = f"select max(date_window) as max_date_window from `{table_path}`" |
| 113 | + query = ydb.ScanQuery(last_date_query, {}) |
| 114 | + it = table_client.scan_query(query) |
| 115 | + results = [] |
| 116 | + while True: |
| 117 | + try: |
| 118 | + result = next(it) |
| 119 | + results = results + result.result_set.rows |
| 120 | + except StopIteration: |
| 121 | + break |
| 122 | + |
| 123 | + if results[0] and results[0].get( 'max_date_window', default_start_date) is not None: |
| 124 | + last_date = results[0].get( |
| 125 | + 'max_date_window', default_start_date).strftime('%Y-%m-%d') |
| 126 | + else: |
| 127 | + last_date = default_start_date.strftime('%Y-%m-%d') |
| 128 | + |
| 129 | + print(f'last hisotry date: {last_date}') |
| 130 | + # getting history for dates >= last_date |
| 131 | + query_get_history = f""" |
| 132 | + select |
| 133 | + full_name, |
| 134 | + date_base, |
| 135 | + history_list, |
| 136 | + dist_hist, |
| 137 | + suite_folder, |
| 138 | + test_name |
| 139 | + from ( |
| 140 | + select |
| 141 | + full_name, |
| 142 | + date_base, |
| 143 | + AGG_LIST(status) as history_list , |
| 144 | + String::JoinFromList( AGG_LIST_DISTINCT(status) ,',') as dist_hist, |
| 145 | + suite_folder, |
| 146 | + test_name |
| 147 | + from ( |
| 148 | + select * from ( |
| 149 | + select * from ( |
| 150 | + select |
| 151 | + DISTINCT suite_folder || '/' || test_name as full_name, |
| 152 | + suite_folder, |
| 153 | + test_name |
| 154 | + from `test_results/test_runs_results` |
| 155 | + where |
| 156 | + status in ('failure','mute') |
| 157 | + and job_name in ('Nightly-run', 'Postcommit_relwithdebinfo') |
| 158 | + and build_type = 'relwithdebinfo' and |
| 159 | + run_timestamp >= Date('{last_date}') -{history_for_n_day}*Interval("P1D") |
| 160 | + ) as tests_with_fails |
| 161 | + cross join ( |
| 162 | + select |
| 163 | + DISTINCT DateTime::MakeDate(run_timestamp) as date_base |
| 164 | + from `test_results/test_runs_results` |
| 165 | + where |
| 166 | + status in ('failure','mute') |
| 167 | + and job_name in ('Nightly-run', 'Postcommit_relwithdebinfo') |
| 168 | + and build_type = 'relwithdebinfo' |
| 169 | + and run_timestamp>= Date('{last_date}') |
| 170 | + ) as date_list |
| 171 | + ) as test_and_date |
| 172 | + left JOIN ( |
| 173 | + select * from ( |
| 174 | + select |
| 175 | + suite_folder || '/' || test_name as full_name, |
| 176 | + run_timestamp, |
| 177 | + status |
| 178 | + --ROW_NUMBER() OVER (PARTITION BY test_name ORDER BY run_timestamp DESC) AS rn |
| 179 | + from `test_results/test_runs_results` |
| 180 | + where |
| 181 | + run_timestamp >= Date('{last_date}') -{history_for_n_day}*Interval("P1D") and |
| 182 | + job_name in ('Nightly-run', 'Postcommit_relwithdebinfo') |
| 183 | + and build_type = 'relwithdebinfo' |
| 184 | + ) |
| 185 | + ) as hist |
| 186 | + ON test_and_date.full_name=hist.full_name |
| 187 | + where |
| 188 | + hist.run_timestamp >= test_and_date.date_base -{history_for_n_day}*Interval("P1D") AND |
| 189 | + hist.run_timestamp <= test_and_date.date_base |
| 190 | +
|
| 191 | + ) |
| 192 | + GROUP BY full_name,suite_folder,test_name,date_base |
| 193 | + |
| 194 | + ) |
| 195 | + """ |
| 196 | + query = ydb.ScanQuery(query_get_history, {}) |
| 197 | + # start transaction time |
| 198 | + start_time = time.time() |
| 199 | + it = driver.table_client.scan_query(query) |
| 200 | + # end transaction time |
| 201 | + |
| 202 | + results = [] |
| 203 | + prepared_for_update_rows = [] |
| 204 | + while True: |
| 205 | + try: |
| 206 | + result = next(it) |
| 207 | + results = results + result.result_set.rows |
| 208 | + except StopIteration: |
| 209 | + break |
| 210 | + end_time = time.time() |
| 211 | + print(f'transaction duration: {end_time - start_time}') |
| 212 | + |
| 213 | + print(f'history data captured, {len(results)} rows') |
| 214 | + for row in results: |
| 215 | + row['count'] = dict(zip(list(row['history_list']), [list( |
| 216 | + row['history_list']).count(i) for i in list(row['history_list'])])) |
| 217 | + prepared_for_update_rows.append({ |
| 218 | + 'suite_folder': row['suite_folder'], |
| 219 | + 'test_name': row['test_name'], |
| 220 | + 'full_name': row['full_name'], |
| 221 | + 'date_window': row['date_base'], |
| 222 | + 'days_ago_window': history_for_n_day, |
| 223 | + 'history': ','.join(row['history_list']).encode('utf8'), |
| 224 | + 'history_class': row['dist_hist'], |
| 225 | + 'pass_count': row['count'].get('passed', 0), |
| 226 | + 'mute_count': row['count'].get('mute', 0), |
| 227 | + 'fail_count': row['count'].get('failure', 0), |
| 228 | + 'skip_count': row['count'].get('skipped', 0), |
| 229 | + }) |
| 230 | + print('upserting history') |
| 231 | + with ydb.SessionPool(driver) as pool: |
| 232 | + |
| 233 | + create_tables(pool, table_path) |
| 234 | + full_path = posixpath.join(DATABASE_PATH, table_path) |
| 235 | + bulk_upsert(driver.table_client, full_path, |
| 236 | + prepared_for_update_rows) |
| 237 | + |
| 238 | + print('history updated') |
| 239 | + |
| 240 | + |
| 241 | +if __name__ == "__main__": |
| 242 | + main() |
0 commit comments