
Commit 17635e7

Merge pull request #18 from awslabs/cloudwatchlogs
Add cloudwatchlogs module and bumping version to 0.0.1! 🚀
2 parents 6c1fe73 + 0a40bef commit 17635e7

22 files changed: +623 -51 lines

README.md

Lines changed: 55 additions & 17 deletions
@@ -1,4 +1,4 @@
-# AWS Data Wrangler (beta)
+# AWS Data Wrangler
 
 > Utility belt to handle data on AWS.
 
@@ -14,16 +14,28 @@
 
 ## Use Cases
 
-* Pandas -> Parquet (S3)
-* Pandas -> CSV (S3)
+### Pandas
+* Pandas -> Parquet (S3) (Parallel :rocket:)
+* Pandas -> CSV (S3) (Parallel :rocket:)
 * Pandas -> Glue Catalog
-* Pandas -> Athena
-* Pandas -> Redshift
+* Pandas -> Athena (Parallel :rocket:)
+* Pandas -> Redshift (Parallel :rocket:)
 * CSV (S3) -> Pandas (One shot or Batching)
 * Athena -> Pandas (One shot or Batching)
-* PySpark -> Redshift
-* Delete S3 objects (parallel :rocket:)
-* Encrypt S3 data with KMS keys
+* CloudWatch Logs Insights -> Pandas (NEW :star:)
+* Encrypt Pandas Dataframes on S3 with KMS keys (NEW :star:)
+
+### PySpark
+* PySpark -> Redshift (Parallel :rocket:) (NEW :star:)
+
+### General
+* List S3 objects (Parallel :rocket:)
+* Delete S3 objects (Parallel :rocket:)
+* Delete listed S3 objects (Parallel :rocket:)
+* Delete NOT listed S3 objects (Parallel :rocket:)
+* Copy listed S3 objects (Parallel :rocket:)
+* Get the size of S3 objects (Parallel :rocket:)
+* Get CloudWatch Logs Insights query results (NEW :star:)
 
 ## Installation
 
@@ -37,7 +49,9 @@ Runs anywhere (AWS Lambda, AWS Glue, EMR, EC2, on-premises, local, etc).
 
 ## Examples
 
-### Writing Pandas Dataframe to S3 + Glue Catalog
+### Pandas
+
+#### Writing Pandas Dataframe to S3 + Glue Catalog
 
 ```py3
 session = awswrangler.Session()
@@ -51,7 +65,7 @@ session.pandas.to_parquet(
 
 If a Glue Database name is passed, all the metadata will be created in the Glue Catalog. If not, only the s3 data write will be done.
 
-### Writing Pandas Dataframe to S3 as Parquet encrypting with a KMS key
+#### Writing Pandas Dataframe to S3 as Parquet encrypting with a KMS key
 
 ```py3
 extra_args = {
@@ -64,7 +78,7 @@ session.pandas.to_parquet(
 )
 ```
 
-### Reading from AWS Athena to Pandas
+#### Reading from AWS Athena to Pandas
 
 ```py3
 session = awswrangler.Session()
@@ -74,7 +88,7 @@ dataframe = session.pandas.read_sql_athena(
 )
 ```
 
-### Reading from AWS Athena to Pandas in chunks (For memory restrictions)
+#### Reading from AWS Athena to Pandas in chunks (For memory restrictions)
 
 ```py3
 session = awswrangler.Session()
@@ -87,14 +101,14 @@ for dataframe in dataframe_iter:
     print(dataframe) # Do whatever you want
 ```
 
-### Reading from S3 (CSV) to Pandas
+#### Reading from S3 (CSV) to Pandas
 
 ```py3
 session = awswrangler.Session()
 dataframe = session.pandas.read_csv(path="s3://...")
 ```
 
-### Reading from S3 (CSV) to Pandas in chunks (For memory restrictions)
+#### Reading from S3 (CSV) to Pandas in chunks (For memory restrictions)
 
 ```py3
 session = awswrangler.Session()
@@ -106,7 +120,17 @@ for dataframe in dataframe_iter:
     print(dataframe) # Do whatever you want
 ```
 
-### Typical Pandas ETL
+#### Reading from CloudWatch Logs Insights to Pandas
+
+```py3
+session = awswrangler.Session()
+dataframe = session.pandas.read_log_query(
+    log_group_names=[LOG_GROUP_NAME],
+    query="fields @timestamp, @message | sort @timestamp desc | limit 5",
+)
+```
+
+#### Typical Pandas ETL
 
 ```py3
 import pandas
@@ -125,7 +149,9 @@ session.pandas.to_parquet( # Storing the data and metadata to Data Lake
 )
 ```
 
-### Loading Pyspark Dataframe to Redshift
+### PySpark
+
+#### Loading PySpark Dataframe to Redshift
 
 ```py3
 session = awswrangler.Session(spark_session=spark)
@@ -140,13 +166,25 @@ session.spark.to_redshift(
 )
 ```
 
-### Deleting a bunch of S3 objects
+### General
+
+#### Deleting a bunch of S3 objects (parallel :rocket:)
 
 ```py3
 session = awswrangler.Session()
 session.s3.delete_objects(path="s3://...")
 ```
 
+#### Get CloudWatch Logs Insights query results
+
+```py3
+session = awswrangler.Session()
+results = session.cloudwatchlogs.query(
+    log_group_names=[LOG_GROUP_NAME],
+    query="fields @timestamp, @message | sort @timestamp desc | limit 5",
+)
+```
+
 ## Diving Deep
 
 ### Pandas to Redshift Flow
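Note that `read_log_query` (added in `awswrangler/pandas.py` below) also takes optional `start_time`, `end_time`, and `limit` arguments that the README example leaves at their defaults. A minimal sketch of a time-bounded query, assuming a hypothetical log group name:

```py3
from datetime import datetime, timedelta

import awswrangler

session = awswrangler.Session()
dataframe = session.pandas.read_log_query(
    log_group_names=["/aws/lambda/my-function"],  # hypothetical log group
    query="fields @timestamp, @message | sort @timestamp desc",
    start_time=datetime.utcnow() - timedelta(hours=1),  # only the last hour
    end_time=datetime.utcnow(),
    limit=50,  # cap the number of returned log events
)
```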

awswrangler/__init__.py

Lines changed: 1 addition & 0 deletions
@@ -6,6 +6,7 @@
 from awswrangler.pandas import Pandas  # noqa
 from awswrangler.s3 import S3  # noqa
 from awswrangler.athena import Athena  # noqa
+from awswrangler.cloudwatchlogs import CloudWatchLogs  # noqa
 from awswrangler.glue import Glue  # noqa
 from awswrangler.redshift import Redshift  # noqa
 import awswrangler.utils  # noqa
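The `# noqa` comment suppresses the unused-import lint warning; the import exists solely to re-export the class from the package root. A quick sketch of what that enables:

```py3
import awswrangler

# CloudWatchLogs is now importable from the package root; it is the same
# class that session.cloudwatchlogs instantiates lazily (see session.py below).
client = awswrangler.CloudWatchLogs(session=awswrangler.Session())
```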

awswrangler/__version__.py

Lines changed: 1 addition & 1 deletion
@@ -1,4 +1,4 @@
 __title__ = "awswrangler"
 __description__ = "Utility belt to handle data on AWS."
-__version__ = "0.0b32"
+__version__ = "0.0.1"
 __license__ = "Apache License 2.0"

awswrangler/athena.py

Lines changed: 20 additions & 3 deletions
@@ -1,7 +1,7 @@
 from time import sleep
 import logging
 
-from awswrangler.exceptions import UnsupportedType
+from awswrangler.exceptions import UnsupportedType, QueryFailed, QueryCancelled
 
 logger = logging.getLogger(__name__)
 
@@ -86,12 +86,29 @@ def run_query(self, query, database, s3_output=None):
         return response["QueryExecutionId"]
 
     def wait_query(self, query_execution_id):
+        """
+        Wait until the query reaches a final state
+
+        :param query_execution_id: Query execution ID
+        :return: Query response
+        """
         final_states = ["FAILED", "SUCCEEDED", "CANCELLED"]
         response = self._client_athena.get_query_execution(
             QueryExecutionId=query_execution_id)
-        while (response.get("QueryExecution").get("Status").get("State") not in
-               final_states):
+        state = response["QueryExecution"]["Status"]["State"]
+        while state not in final_states:
             sleep(QUERY_WAIT_POLLING_DELAY)
             response = self._client_athena.get_query_execution(
                 QueryExecutionId=query_execution_id)
+            state = response["QueryExecution"]["Status"]["State"]
+        logger.debug(f"state: {state}")
+        logger.debug(
+            f"StateChangeReason: {response['QueryExecution']['Status'].get('StateChangeReason')}"
+        )
+        if state == "FAILED":
+            raise QueryFailed(
+                response["QueryExecution"]["Status"].get("StateChangeReason"))
+        elif state == "CANCELLED":
+            raise QueryCancelled(
+                response["QueryExecution"]["Status"].get("StateChangeReason"))
         return response
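With this change, `wait_query` raises on unsuccessful terminal states instead of returning them silently. A minimal sketch of handling that, assuming a hypothetical Athena database named `my_db`:

```py3
import awswrangler
from awswrangler.exceptions import QueryCancelled, QueryFailed

session = awswrangler.Session()
query_execution_id = session.athena.run_query(query="SELECT 1", database="my_db")
try:
    response = session.athena.wait_query(query_execution_id=query_execution_id)
except (QueryFailed, QueryCancelled) as e:
    print(f"Query did not succeed: {e}")  # message carries Athena's StateChangeReason
```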

awswrangler/cloudwatchlogs.py

Lines changed: 94 additions & 0 deletions
@@ -0,0 +1,94 @@
+from time import sleep
+from datetime import datetime
+import logging
+
+from awswrangler.exceptions import QueryFailed, QueryCancelled
+
+logger = logging.getLogger(__name__)
+
+QUERY_WAIT_POLLING_DELAY = 0.2  # SECONDS (200 ms between polls)
+
+
+class CloudWatchLogs:
+    def __init__(self, session):
+        self._session = session
+        self._client_logs = session.boto3_session.client(
+            service_name="logs", config=session.botocore_config)
+
+    def start_query(self,
+                    query,
+                    log_group_names,
+                    start_time=datetime(year=1970, month=1, day=1),
+                    end_time=datetime.utcnow(),
+                    limit=None):
+        """
+        Start a query against AWS CloudWatch Logs Insights
+        https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/CWL_QuerySyntax.html
+
+        :param query: The query string to use.
+        :param log_group_names: The list of log groups to be queried. You can include up to 20 log groups.
+        :param start_time: The beginning of the time range to query (datetime.datetime object)
+        :param end_time: The end of the time range to query (datetime.datetime object)
+        :param limit: The maximum number of log events to return in the query.
+        :return: Query ID
+        """
+        start_timestamp = int(1000 * start_time.timestamp())
+        end_timestamp = int(1000 * end_time.timestamp())
+        logger.debug(f"start_timestamp: {start_timestamp}")
+        logger.debug(f"end_timestamp: {end_timestamp}")
+        args = {
+            "logGroupNames": log_group_names,
+            "startTime": start_timestamp,
+            "endTime": end_timestamp,
+            "queryString": query
+        }
+        if limit:
+            args["limit"] = limit
+        response = self._client_logs.start_query(**args)
+        return response["queryId"]
+
+    def wait_query(self, query_id):
+        """
+        Wait until the query reaches a final state
+
+        :param query_id: Query ID
+        :return: Query results
+        """
+        final_states = ["Complete", "Failed", "Cancelled"]
+        response = self._client_logs.get_query_results(queryId=query_id)
+        status = response["status"]
+        while status not in final_states:
+            sleep(QUERY_WAIT_POLLING_DELAY)
+            response = self._client_logs.get_query_results(queryId=query_id)
+            status = response["status"]
+        logger.debug(f"status: {status}")
+        if status == "Failed":
+            raise QueryFailed(f"query ID: {query_id}")
+        elif status == "Cancelled":
+            raise QueryCancelled(f"query ID: {query_id}")
+        return response
+
+    def query(self,
+              query,
+              log_group_names,
+              start_time=datetime(year=1970, month=1, day=1),
+              end_time=datetime.utcnow(),
+              limit=None):
+        """
+        Run a query against AWS CloudWatch Logs Insights and wait for the results
+        https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/CWL_QuerySyntax.html
+
+        :param query: The query string to use.
+        :param log_group_names: The list of log groups to be queried. You can include up to 20 log groups.
+        :param start_time: The beginning of the time range to query (datetime.datetime object)
+        :param end_time: The end of the time range to query (datetime.datetime object)
+        :param limit: The maximum number of log events to return in the query.
+        :return: Results
+        """
+        query_id = self.start_query(query=query,
+                                    log_group_names=log_group_names,
+                                    start_time=start_time,
+                                    end_time=end_time,
+                                    limit=limit)
+        response = self.wait_query(query_id=query_id)
+        return response["results"]
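Two things worth noting. First, the `end_time=datetime.utcnow()` default is evaluated once at import time (standard Python default-argument behavior), so long-lived processes should pass `end_time` explicitly. Second, `query` returns the raw `results` payload: a list of rows, each row a list of `{"field": ..., "value": ...}` mappings, which is exactly the shape `Pandas.read_log_query` flattens below. A sketch of consuming it directly, with a hypothetical log group:

```py3
from datetime import datetime

import awswrangler

session = awswrangler.Session()
results = session.cloudwatchlogs.query(
    log_group_names=["/aws/lambda/my-function"],  # hypothetical log group
    query="fields @timestamp, @message | sort @timestamp desc | limit 5",
    end_time=datetime.utcnow(),  # passed explicitly; see the note above
)
for row in results:  # each row is a list of {"field": ..., "value": ...} dicts
    record = {col["field"]: col["value"] for col in row}
    print(record.get("@timestamp"), record.get("@message"))
```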

awswrangler/exceptions.py

Lines changed: 8 additions & 0 deletions
@@ -56,3 +56,11 @@ class InvalidRedshiftSortkey(Exception):
 
 class EmptyDataframe(Exception):
     pass
+
+
+class QueryCancelled(Exception):
+    pass
+
+
+class QueryFailed(Exception):
+    pass

awswrangler/pandas.py

Lines changed: 35 additions & 0 deletions
@@ -4,6 +4,7 @@
 from math import floor
 import copy
 import csv
+from datetime import datetime
 
 import pandas
 import pyarrow
@@ -833,3 +834,37 @@ def to_redshift(
             mode=mode,
         )
         self._session.s3.delete_objects(path=path)
+
+    def read_log_query(self,
+                       query,
+                       log_group_names,
+                       start_time=datetime(year=1970, month=1, day=1),
+                       end_time=datetime.utcnow(),
+                       limit=None):
+        """
+        Run a query against AWS CloudWatch Logs Insights and convert the results to a Pandas DataFrame
+
+        :param query: The query string to use. https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/CWL_QuerySyntax.html
+        :param log_group_names: The list of log groups to be queried. You can include up to 20 log groups.
+        :param start_time: The beginning of the time range to query (datetime.datetime object)
+        :param end_time: The end of the time range to query (datetime.datetime object)
+        :param limit: The maximum number of log events to return in the query. If the query string uses the fields command, only the specified fields and their values are returned.
+        :return: Results as a Pandas DataFrame
+        """
+        results = self._session.cloudwatchlogs.query(
+            query=query,
+            log_group_names=log_group_names,
+            start_time=start_time,
+            end_time=end_time,
+            limit=limit)
+        pre_df = []
+        for row in results:
+            new_row = {}
+            for col in row:
+                if col["field"].startswith("@"):
+                    col_name = col["field"].replace("@", "", 1)
+                else:
+                    col_name = col["field"]
+                new_row[col_name] = col["value"]
+            pre_df.append(new_row)
+        return pandas.DataFrame(pre_df)
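Note the column naming: only the first `@` is stripped, so Insights fields like `@timestamp` and `@message` arrive as plain `timestamp` and `message` columns. A self-contained sketch of the flattening on a hand-built result, with no AWS call involved:

```py3
import pandas

# Hypothetical raw Insights result: one row with three returned fields.
results = [[
    {"field": "@timestamp", "value": "2019-07-22 00:00:00.000"},
    {"field": "@message", "value": "hello"},
    {"field": "custom_field", "value": "42"},
]]
pre_df = []
for row in results:
    new_row = {}
    for col in row:
        # Strip only the leading "@", exactly as read_log_query does.
        if col["field"].startswith("@"):
            col_name = col["field"].replace("@", "", 1)
        else:
            col_name = col["field"]
        new_row[col_name] = col["value"]
    pre_df.append(new_row)
print(pandas.DataFrame(pre_df).columns.tolist())  # ['timestamp', 'message', 'custom_field']
```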

awswrangler/session.py

Lines changed: 8 additions & 0 deletions
@@ -7,6 +7,7 @@
 
 from awswrangler.s3 import S3
 from awswrangler.athena import Athena
+from awswrangler.cloudwatchlogs import CloudWatchLogs
 from awswrangler.pandas import Pandas
 from awswrangler.glue import Glue
 from awswrangler.redshift import Redshift
@@ -88,6 +89,7 @@ def __init__(
         self._load_new_boto3_session()
         self._s3 = None
         self._athena = None
+        self._cloudwatchlogs = None
         self._pandas = None
         self._glue = None
         self._redshift = None
@@ -202,6 +204,12 @@ def athena(self):
             self._athena = Athena(session=self)
         return self._athena
 
+    @property
+    def cloudwatchlogs(self):
+        if not self._cloudwatchlogs:
+            self._cloudwatchlogs = CloudWatchLogs(session=self)
+        return self._cloudwatchlogs
+
     @property
     def pandas(self):
         if not self._pandas:
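The new property follows the same lazy-initialization pattern as `s3`, `athena`, and the other clients: built on first access, cached on the session afterwards. A small sketch:

```py3
import awswrangler

session = awswrangler.Session()
logs = session.cloudwatchlogs          # the CloudWatchLogs client is created here
assert logs is session.cloudwatchlogs  # later accesses reuse the cached instance
```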

building/Dockerfile

Lines changed: 1 addition & 1 deletion
@@ -1,4 +1,4 @@
-FROM lambci/lambda:build-python3.6
+FROM lambci/lambda:build-python3.7
 
 RUN pip install --upgrade pip
 
