An AWS Athena backend for pySigma that converts Sigma detection rules into Athena-compatible SQL queries.
While designed for AWS Athena, it likely works with any Trino-based SQL engine.
Most standard Sigma rule features are supported, including logical and comparison operators, wildcards, regular expressions, CIDR matching, field reference matching, existence checks and case-sensitive matching. A few operations, such as full-text (value-only) expressions and distinct-count window functions, are not implemented; see the limitations listed below.
- Full-text searches (value-only expressions) are not supported.
- At present, only the event_count correlation rule type is supported.
From the CLI:
sigma convert -t athena rules/cloud/aws/cloudtrail/
From Python:
from sigma.backends.athena import athenaBackend
from sigma.collection import SigmaCollection
# Initialize the backend, optionally specifying table, fields, and time field
backend = athenaBackend()
# Load your Sigma rule collection
collection = SigmaCollection.from_yaml("...")
# Convert to Athena SQL
queries = backend.convert(collection)
for q in queries:
    print(q)
There are a few ways of setting the table name that will appear in the SQL.
If all of your rules use the same table name, regardless of the logsource set, you can pass a backend-option of table.
For example:
athena_backend = athenaBackend(table="events")
This will result in:
SELECT * FROM events WHERE ...
If you need to set your table name based on the logsource seen, you can use a pipeline. The backend will look for a processing state value of table_name and, if set, will use this table name.
For example:
name: AWS Security Lake Table Mapping
priority: 100
vars:
  region: eu_west_2
transformations:
  - id: cloudtrail_mgmt_table
    type: set_state
    key: table_name
    val: "amazon_security_lake_table_{region}_cloud_trail_mgmt_2_0"
    rule_conditions:
      - type: logsource
        product: aws
        service: cloudtrail
If you are using this backend with AWS Security Lake, a helper plugin is also provided for setting the correct table name.
For example, the following will map an AWS CloudTrail source to amazon_security_lake_table_eu_west_2_cloud_trail_mgmt_2_0:
from sigma.pipelines.athena import athena_pipeline_security_lake_table_name
from sigma.backends.athena import athenaBackend
from sigma.collection import SigmaCollection
pipeline = athena_pipeline_security_lake_table_name()
backend = athenaBackend(
    aws_table_region="eu-west-2",  # Replace with your AWS region
    processing_pipeline=pipeline,
)
rules = SigmaCollection.load_ruleset("path/to/rules/")
queries = backend.convert(rules)
This supports the following log sources:
- product: aws, service: cloudtrail
- product: aws, service: cloudtrail_s3
- product: aws, service: cloudtrail_lambda
- product: aws, service: route53
- product: aws, service: security_hub
- product: aws, service: vpc_flow_logs
- product: aws, service: waf
- product: aws, service: eks_audit
You can also use this pipeline from the CLI:
sigma convert -t athena -O aws_table_region=eu-west-2 -p athena_security_lake_table_name rules/cloud/aws/cloudtrail/
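The region-to-table-name step can be pictured with a small standalone sketch. It is not the plugin's implementation: `security_lake_table_name` and `KNOWN_SUFFIXES` are hypothetical names, and only the CloudTrail management suffix is taken from the example above. The suffixes for the other supported log sources are not stated here, so they are deliberately left out.

```python
# Hypothetical helper for illustration; not part of the backend or the plugin.
# Only the CloudTrail management suffix is known from the example above.
KNOWN_SUFFIXES = {
    ("aws", "cloudtrail"): "cloud_trail_mgmt_2_0",
}


def security_lake_table_name(region: str, product: str, service: str) -> str:
    """Build a Security Lake table name for a log source known to this sketch."""
    try:
        suffix = KNOWN_SUFFIXES[(product, service)]
    except KeyError:
        raise ValueError(f"no table mapping for {product}/{service}") from None
    # The table name uses underscores where the AWS region uses hyphens.
    normalized_region = region.replace("-", "_")
    return f"amazon_security_lake_table_{normalized_region}_{suffix}"


print(security_lake_table_name("eu-west-2", "aws", "cloudtrail"))
# amazon_security_lake_table_eu_west_2_cloud_trail_mgmt_2_0
```

Raising on an unknown log source, rather than falling back to a default table, keeps a missing mapping from silently producing a query against the wrong table.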
The backend supports the fields attribute in Sigma rules.
If a Sigma rule specifies a list of fields, the generated Athena SQL query will SELECT only those specified fields instead of using SELECT *.
This is useful for optimising queries by retrieving only necessary data.
For example, if your Sigma rule includes:
fields:
- uid
- time_dt
- process.command_line as command_line
The generated query will start with:
SELECT uid, time_dt, process.command_line as command_line FROM ...
If no fields are specified in the rule, the query defaults to SELECT *.
This feature is automatically applied when converting rules with the backend.
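The field-selection behaviour can be summarised in a few lines of Python. This is a minimal sketch of the rule described above, not the backend's code: `build_select` is a hypothetical helper, and the table name `events` is only an example value.

```python
from typing import Optional, Sequence


def build_select(fields: Optional[Sequence[str]], table: str) -> str:
    """Return the SELECT ... FROM prefix for a rule's optional field list."""
    # A missing or empty field list falls back to selecting every column.
    columns = ", ".join(fields) if fields else "*"
    return f"SELECT {columns} FROM {table}"


rule_fields = ["uid", "time_dt", "process.command_line as command_line"]
print(build_select(rule_fields, "events"))
# SELECT uid, time_dt, process.command_line as command_line FROM events
print(build_select(None, "events"))
# SELECT * FROM events
```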
The Athena backend currently supports event_count correlation rules. Other types (value_count, temporal, temporal_ordered) are not implemented and will raise NotImplementedError.
- Type: event_count
- Parameters:
  - rules: List of Sigma rules to correlate
  - group-by: Fields to partition by
  - timespan: Window duration (e.g. 15m, 1h)
  - condition: Comparison operator and threshold (e.g. gte: 10)
This generates a query that uses a sliding time-window function to count matching events within the specified timespan.
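As an illustration of how a timespan relates to the window, the sketch below converts values such as 15m or 1h into seconds and renders a RANGE frame clause in the same shape as the generated SQL shown later in this section. The helpers `timespan_to_seconds` and `window_frame` are hypothetical, and this sketch assumes only the s, m, h and d units; it is not how the backend itself parses timespans.

```python
import re

# Seconds per unit. Assumption: only s/m/h/d are handled in this sketch.
_UNIT_SECONDS = {"s": 1, "m": 60, "h": 3600, "d": 86400}
_TIMESPAN_RE = re.compile(r"(\d+)([smhd])")


def timespan_to_seconds(timespan: str) -> int:
    """Convert a timespan such as '15m' or '1h' into a number of seconds."""
    match = _TIMESPAN_RE.fullmatch(timespan)
    if match is None:
        raise ValueError(f"unsupported timespan: {timespan!r}")
    count, unit = match.groups()
    return int(count) * _UNIT_SECONDS[unit]


def window_frame(timespan: str) -> str:
    """Render a trailing RANGE frame covering the given timespan."""
    seconds = timespan_to_seconds(timespan)
    return f"RANGE BETWEEN INTERVAL '{seconds}' SECOND PRECEDING AND CURRENT ROW"


print(timespan_to_seconds("15m"))  # 900
print(window_frame("10m"))
# RANGE BETWEEN INTERVAL '600' SECOND PRECEDING AND CURRENT ROW
```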
title: Windows Failed Logon
id: dc9f1a2e-7d3b-4f1c-b8f4-1e23eadf4567
status: stable
logsource:
  product: windows
  service: security
detection:
  selection:
    EventID: 4625
  condition: selection
---
title: Possible Brute-Force Attack via Repeated Logon Failures
id: e8b2c3d4-5f6a-7b8c-9d0e-f1a2b3c4d5e6
status: stable
description: Detects 5 or more failed Windows logon attempts from the same source IP within 10 minutes.
correlation:
  type: event_count
  rules:
    - dc9f1a2e-7d3b-4f1c-b8f4-1e23eadf4567
  group-by:
    - SourceIp
  timespan: 10m
  condition:
    gte: 5
Gives:
WITH combined_events AS (
    SELECT *
    FROM <TABLE>
    WHERE EventID = 4625
),
event_counts AS (
    SELECT *,
        COUNT(*) OVER (
            PARTITION BY SourceIp
            ORDER BY time
            RANGE BETWEEN INTERVAL '600' SECOND PRECEDING AND CURRENT ROW
        ) AS correlation_event_count
    FROM combined_events
)
SELECT *
FROM event_counts
WHERE correlation_event_count >= 5;
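To make the semantics of the query above concrete, the following sketch reproduces the same count in plain Python. It is an illustration only: `correlate_event_count` is a hypothetical function, events are assumed to be dictionaries, and timestamps are assumed to be numeric seconds. For each event it counts the events in the same group whose timestamp lies within the trailing window, then keeps the events whose count reaches the threshold.

```python
def correlate_event_count(events, group_by, window_seconds, threshold, time_field="time"):
    """Return events whose trailing-window count per group is >= threshold."""
    matches = []
    for event in events:
        key = [event[field] for field in group_by]
        now = event[time_field]
        # Count same-group events from (now - window_seconds) up to and
        # including now, mirroring RANGE ... PRECEDING AND CURRENT ROW.
        count = sum(
            1
            for other in events
            if [other[field] for field in group_by] == key
            and now - window_seconds <= other[time_field] <= now
        )
        if count >= threshold:
            matches.append({**event, "correlation_event_count": count})
    return matches


# Five failures from one address within 10 minutes, plus unrelated events.
events = [{"SourceIp": "10.0.0.1", "time": t} for t in (0, 100, 200, 300, 400)]
events.append({"SourceIp": "10.0.0.2", "time": 50})
events.append({"SourceIp": "10.0.0.1", "time": 2000})

matches = correlate_event_count(events, ["SourceIp"], window_seconds=600, threshold=5)
print([(m["time"], m["correlation_event_count"]) for m in matches])
# [(400, 5)]
```

The nested scan is quadratic in the number of events; it is written that way for readability, whereas the SQL window function lets the engine do the same work over sorted partitions.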
By default, the time column is used for the rule's sliding time window. This can be changed by setting the time_field backend-option.
For example:
athena_backend = athenaBackend(time_field="time_dt")
Run the test suite:
pytest
Licensed under the MIT License.