Skip to content

Commit 0e11858

Browse files
committed
(benchmark): Filter out failed invocations
This required a fairly large refactor into a report builder class: the failed request IDs need to be stored in shared state because of the format of the logs.
1 parent a0ac9bf commit 0e11858

File tree

2 files changed

+129
-92
lines changed

2 files changed

+129
-92
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1 +1,2 @@
11
benchmark/data/*
2+
__pycache__/*
Lines changed: 128 additions & 92 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
from typing import List, TypedDict, Optional
22
from datetime import datetime, timedelta, timezone
3-
from time import sleep, time
3+
from time import sleep
44
from csv import DictWriter
55
import re
66
import boto3
@@ -15,113 +15,149 @@ class Row(TypedDict):
1515

1616
class InvocationSummary(TypedDict):
    """One parsed Lambda invocation, extracted from a CloudWatch REPORT line.

    # name: which benchmarked Lambda produced the invocation
    # request_id: the invocation's RequestId (used to filter out failures)
    # execution_time_ms: the billed duration in milliseconds
    # cold_start: True when an Init Duration field was present
    # init_duration_ms: init duration in ms, or None on warm starts
    """

    name: str
    request_id: str
    execution_time_ms: float
    cold_start: bool
    init_duration_ms: Optional[float]
2122

2223

23-
def submit_logs_query(client, lambda_name: str) -> str:
24-
finish = datetime.now(tz=timezone.utc)
25-
start = datetime.now(tz=timezone.utc) - timedelta(hours=1)
26-
27-
response = client.start_query(
28-
logGroupName=f"/aws/lambda/{lambda_name}",
29-
startTime=int(start.timestamp()),
30-
endTime=int(finish.timestamp()),
31-
queryString="fields @timestamp, @message",
32-
limit=10000,
33-
)
34-
35-
return response["queryId"]
36-
37-
38-
def poll_for_query_result(client, query_id: str):
39-
results = client.get_query_results(queryId=query_id)
40-
while results["status"] not in [
41-
"Complete",
42-
"Failed",
43-
"Cancelled",
44-
"Timeout",
45-
"Unknown",
46-
]:
47-
print(f"Query ID {query_id} not ready. Waiting")
48-
sleep(10)
49-
results = client.get_query_results(queryId=query_id)
50-
51-
return results.get("results", [])
52-
53-
54-
def parse_result_row(row) -> Row:
55-
result = Row(timestamp="", message="")
56-
for field in row:
57-
if field["field"] == "@timestamp":
58-
result["timestamp"] = field["value"]
59-
elif field["field"] == "@message":
60-
result["message"] = field["value"]
61-
62-
return result
63-
64-
65-
def process_report_rows(rows: List[Row], name: str) -> List[InvocationSummary]:
66-
result = []
67-
for row in rows:
68-
if row["message"].startswith("REPORT"):
69-
result.append(calc_report_summary(row["message"], name))
70-
71-
return result
72-
73-
74-
def calc_report_summary(message: str, name: str) -> InvocationSummary:
75-
cold_start = False
76-
init_duration_ms = None
77-
if init_match := re.search(
78-
r"Init Duration: (\d*\.?\d+) ms", message, re.IGNORECASE
79-
):
80-
init_duration_ms = float(init_match.group(1))
81-
cold_start = True
82-
83-
if exec_match := re.search(
84-
r"Billed Duration: (\d*\.?\d+) ms", message, re.IGNORECASE
85-
):
86-
return InvocationSummary(
87-
name=name,
88-
execution_time_ms=float(exec_match.group(1)),
89-
cold_start=cold_start,
90-
init_duration_ms=init_duration_ms,
24+
def get_error_request_id(line: str) -> str:
    """Extract the request ID from an ERROR log line.

    The ID is the whitespace-delimited token immediately preceding the
    literal "ERROR" marker (e.g. "<timestamp> <request-id> ERROR ...").

    Raises:
        ValueError: when the line does not match that layout.
    """
    id_match = re.search(r"\s(.*?)\sERROR", line, re.IGNORECASE)
    if id_match is None:
        raise ValueError(f"Could not find request ID in error line: {line}")
    return id_match.group(1)
29+
30+
31+
class LogReportBuilder:
    """Builds a benchmark report from CloudWatch Logs Insights queries.

    Submits one query per live Lambda, polls them to completion, parses the
    REPORT lines into InvocationSummary rows, and drops every invocation
    whose request ID also appeared in an "ERROR Invoke Error" line. Failed
    request IDs live in shared state (self.failed_requests) because the
    ERROR line and the REPORT line for one invocation arrive as separate
    log rows.
    """

    def __init__(self, client):
        # client: a CloudWatch Logs client (e.g. boto3.client("logs")).
        self.client = client
        # (query_id, lambda_name) pairs for every submitted query.
        self.queries = []
        # Request IDs seen in ERROR lines; consumed by remove_failed_requests.
        self.failed_requests = []

    def submit_logs_query(self, lambda_name: str):
        """Start a Logs Insights query over the Lambda's log group for the
        last 6 hours and remember its query ID for later polling."""
        finish = datetime.now(tz=timezone.utc)
        # Derive start from finish so both bounds share a single instant.
        start = finish - timedelta(hours=6)

        response = self.client.start_query(
            logGroupName=f"/aws/lambda/lambda_benchmark_{lambda_name}_lambda",
            startTime=int(start.timestamp()),
            endTime=int(finish.timestamp()),
            queryString="fields @timestamp, @message",
            limit=10000,
        )

        self.queries.append((response["queryId"], lambda_name))

    def poll_for_all_query_results(self):
        """Poll every submitted query until it reaches a terminal status.

        Returns a dict of lambda name -> raw result rows. NOTE(review):
        Failed/Cancelled/Timeout also end the wait loop, so those queries
        contribute whatever partial results CloudWatch returns.
        """
        all_results = {}
        for query_id, name in self.queries:
            print(f"Querying results for {name}")
            results = self.client.get_query_results(queryId=query_id)
            while results["status"] not in [
                "Complete",
                "Failed",
                "Cancelled",
                "Timeout",
                "Unknown",
            ]:
                print(f"Query ID {query_id} not ready. Waiting")
                sleep(10)
                results = self.client.get_query_results(queryId=query_id)

            all_results[name] = results.get("results", [])
        return all_results

    def parse_result_row(self, row) -> Row:
        """Flatten one Logs Insights result row (a list of field dicts)
        into a Row holding its @timestamp and @message values."""
        result = Row(timestamp="", message="")
        for field in row:
            if field["field"] == "@timestamp":
                result["timestamp"] = field["value"]
            elif field["field"] == "@message":
                result["message"] = field["value"]

        return result

    def process_report_rows(
        self, rows: List[Row], name: str
    ) -> List[InvocationSummary]:
        """Turn REPORT rows into summaries; as a side effect, record the
        request IDs of "ERROR Invoke Error" rows in self.failed_requests."""
        result = []
        for row in rows:
            if row["message"].startswith("REPORT"):
                result.append(self.calc_report_summary(row["message"], name))
            elif "ERROR Invoke Error" in row["message"]:
                self.failed_requests.append(get_error_request_id(row["message"]))

        return result

    def calc_report_summary(self, message: str, name: str) -> InvocationSummary:
        """Parse one REPORT log line into an InvocationSummary.

        Raises:
            ValueError: when the billed duration or request ID cannot be
                extracted from the message.
        """
        cold_start = False
        init_duration_ms = None
        # An "Init Duration" field only appears on cold starts.
        if init_match := re.search(
            r"Init Duration: (\d*\.?\d+) ms", message, re.IGNORECASE
        ):
            init_duration_ms = float(init_match.group(1))
            cold_start = True

        if exec_match := re.search(
            r"Billed Duration: (\d*\.?\d+) ms", message, re.IGNORECASE
        ):

            # The ID ends at whitespace or a literal backslash-t; presumably
            # some messages carry escaped tab separators — TODO confirm.
            if id_match := re.search(
                r"RequestId: (.*?)(?:\s|\\t)", message, re.IGNORECASE
            ):
                return InvocationSummary(
                    request_id=id_match.group(1),
                    name=name,
                    execution_time_ms=float(exec_match.group(1)),
                    cold_start=cold_start,
                    init_duration_ms=init_duration_ms,
                )

        print(f"Malformed message: {message}")
        raise ValueError("Malformed message")

    def remove_failed_requests(
        self, rows: List[InvocationSummary]
    ) -> List[InvocationSummary]:
        """Return only the summaries whose request ID never errored."""
        # Set membership keeps the filter O(n) over the rows.
        failed_requests = set(self.failed_requests)
        return [s for s in rows if s["request_id"] not in failed_requests]

    def build_report(self) -> List[InvocationSummary]:
        """Run the full pipeline: submit queries for every live Lambda,
        collect and parse the results, and drop failed invocations."""
        for name in LIVE_LAMBDAS:
            print(f"Submitting query for {name} Lambda")
            self.submit_logs_query(name)

        sleep(2)
        raw_results = self.poll_for_all_query_results()

        print("Processing downloaded data")
        parsed: List[InvocationSummary] = []
        for name, results in raw_results.items():
            # Fix: call through self, not the module-level `builder` global —
            # the original broke for any instance not bound to that name.
            rows = [self.parse_result_row(row) for row in results]
            for report in self.process_report_rows(rows, name):
                parsed.append(report)

        print("Removing failed requests")
        return self.remove_failed_requests(parsed)
95142

96143

97144
def write_to_csv(data):
    """Persist invocation summaries to the benchmark's CSV data file.

    Each item in `data` is written as one row, keyed by the summary's
    dictionary fields.
    """
    columns = [
        "name",
        "request_id",
        "execution_time_ms",
        "cold_start",
        "init_duration_ms",
    ]
    with open("../data/parsed_cloudwatch_logs.csv", "w") as f:
        writer = DictWriter(f, fieldnames=columns)
        writer.writeheader()
        writer.writerows(data)
105158

106159

107160
if __name__ == "__main__":
    # Entry point: wire the builder to a real CloudWatch Logs client,
    # run the full pipeline, and persist the filtered report.
    # NOTE(review): keep the name `builder` — build_report() in the current
    # class body resolves `builder` from module scope.
    builder = LogReportBuilder(boto3.client("logs"))
    write_to_csv(builder.build_report())

0 commit comments

Comments
 (0)