Skip to content

Commit 6ecf95d

Browse files
committed
v0.3.1 greynet API refactor
1 parent ff466b7 commit 6ecf95d

14 files changed

+122
-113
lines changed

examples/greynet/greynet_example_aggregations.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ class SalesTransaction:
1616
@builder.constraint("Sales Regional Analysis")
1717
def sales_analysis():
1818
return (
19-
builder.from_facts(SalesTransaction)
19+
builder.for_each(SalesTransaction)
2020
.group_by(
2121
lambda tx: tx.region,
2222
Collectors.compose({

examples/greynet/greynet_example_all_collectors.py

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ def count_collector_example():
5151
"""Demonstrates: CountCollector
5252
Counts the number of sales transactions for each product. Penalizes if a product has more than 3 sales.
5353
"""
54-
return (cb.from_facts(Sale)
54+
return (cb.for_each(Sale)
5555
.group_by(lambda s: s.product_id, Collectors.count())
5656
.filter(lambda product_id, count: count > 3)
5757
.penalize_simple(lambda product_id, count: count)
@@ -62,7 +62,7 @@ def sum_collector_example():
6262
"""Demonstrates: SumCollector
6363
Calculates the total revenue (price * quantity) for each product.
6464
"""
65-
return (cb.from_facts(Sale)
65+
return (cb.for_each(Sale)
6666
.group_by(lambda s: s.product_id, Collectors.sum(lambda s: s.price * s.quantity))
6767
.filter(lambda product_id, total_revenue: total_revenue > 0)
6868
.penalize_simple(lambda product_id, total_revenue: 0) # Use penalty 0 to just report
@@ -73,7 +73,7 @@ def min_max_avg_collectors_example():
7373
"""Demonstrates: MinCollector, MaxCollector, AvgCollector
7474
Finds the minimum, maximum, and average sale price for each product.
7575
"""
76-
return (cb.from_facts(Sale)
76+
return (cb.for_each(Sale)
7777
.group_by(lambda s: s.product_id, Collectors.compose({
7878
"min_price": Collectors.min(lambda s: s.price),
7979
"max_price": Collectors.max(lambda s: s.price),
@@ -88,7 +88,7 @@ def stddev_variance_collectors_example():
8888
"""Demonstrates: StdDevCollector, VarianceCollector
8989
Calculates the standard deviation and variance of prices for each product.
9090
"""
91-
return (cb.from_facts(Sale)
91+
return (cb.for_each(Sale)
9292
.group_by(lambda s: s.product_id, Collectors.compose({
9393
"price_stddev": Collectors.stddev(lambda s: s.price),
9494
"price_variance": Collectors.variance(lambda s: s.price)
@@ -102,7 +102,7 @@ def list_collector_example():
102102
"""Demonstrates: ListCollector
103103
Collects all `Sale` objects for each product into a list.
104104
"""
105-
return (cb.from_facts(Sale)
105+
return (cb.for_each(Sale)
106106
.group_by(lambda s: s.product_id, Collectors.to_list())
107107
.filter(lambda product_id, sales_list: len(sales_list) > 0)
108108
.penalize_simple(lambda product_id, sales_list: 0) # Reporting only
@@ -113,7 +113,7 @@ def set_collector_example():
113113
"""Demonstrates: SetCollector and MappingCollector
114114
Collects the unique set of customer IDs for each product.
115115
"""
116-
return (cb.from_facts(Sale)
116+
return (cb.for_each(Sale)
117117
.group_by(
118118
lambda s: s.product_id,
119119
Collectors.mapping(
@@ -130,7 +130,7 @@ def distinct_collector_example():
130130
"""Demonstrates: DistinctCollector
131131
Collects a list of unique customer IDs for each product, preserving insertion order.
132132
"""
133-
return (cb.from_facts(Sale)
133+
return (cb.for_each(Sale)
134134
.group_by(
135135
lambda s: s.product_id,
136136
Collectors.mapping(
@@ -147,7 +147,7 @@ def filtering_collector_example():
147147
"""Demonstrates: FilteringCollector
148148
Counts only the sales where the quantity is greater than 2.
149149
"""
150-
return (cb.from_facts(Sale)
150+
return (cb.for_each(Sale)
151151
.group_by(
152152
lambda s: s.product_id,
153153
Collectors.filtering(
@@ -164,7 +164,7 @@ def consecutive_sequences_collector_example():
164164
"""Demonstrates: consecutive_sequences
165165
Finds consecutive sequences of shipment numbers for each order.
166166
"""
167-
return (cb.from_facts(Shipment)
167+
return (cb.for_each(Shipment)
168168
.group_by(
169169
lambda s: s.order_id,
170170
Collectors.consecutive_sequences(lambda s: s.shipment_no)
@@ -178,7 +178,7 @@ def connected_ranges_collector_example():
178178
"""Demonstrates: connected_ranges
179179
Finds groups of overlapping or adjacent maintenance windows for each machine.
180180
"""
181-
return (cb.from_facts(Maintenance)
181+
return (cb.for_each(Maintenance)
182182
.group_by(
183183
lambda m: m.machine_id,
184184
Collectors.connected_ranges(
@@ -205,7 +205,7 @@ def get_window_key(timestamp: datetime) -> datetime:
205205
window_start_ts = epoch.timestamp() + window_index * window_size_sec
206206
return datetime.fromtimestamp(window_start_ts, tz=timezone.utc)
207207

208-
return (cb.from_facts(UserEvent)
208+
return (cb.for_each(UserEvent)
209209
.group_by(
210210
group_key_function=lambda e: get_window_key(e.timestamp),
211211
collector_supplier=Collectors.avg(lambda e: e.value)

examples/greynet/greynet_example_blom_filter.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,11 @@ class UserBan:
2020
@builder.constraint("login_not_banned")
2121
def login_not_banned():
2222
# Start from UserLogin facts
23-
login_stream = builder.from_facts(UserLogin)
23+
login_stream = builder.for_each(UserLogin)
2424
# Join with UserBan facts, using NOT_EQUAL on user_id
2525
# This will use the CountingBloomFilter under the hood
2626
not_banned_stream = login_stream.join(
27-
builder.from_facts(UserBan),
27+
builder.for_each(UserBan),
2828
JoinerType.NOT_EQUAL,
2929
left_key_func=lambda login: login.user_id,
3030
right_key_func=lambda ban: ban.user_id

examples/greynet/greynet_example_composite_keys.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ def room_double_booking():
5656
at the exact same time. This is a classic composite key equality join.
5757
The composite key is (room_id, start_time).
5858
"""
59-
appointments = builder.from_facts(Appointment)
59+
appointments = builder.for_each(Appointment)
6060
return (
6161
appointments.join(
6262
appointments,
@@ -78,10 +78,10 @@ def resource_allocation_mismatch():
7878
This demonstrates a multi-stage join process, followed by filtering.
7979
"""
8080
# Create streams for all our base facts
81-
appointments = builder.from_facts(Appointment)
82-
doctors = builder.from_facts(Doctor)
83-
rooms = builder.from_facts(Room)
84-
procedures = builder.from_facts(ProcedureInfo)
81+
appointments = builder.for_each(Appointment)
82+
doctors = builder.for_each(Doctor)
83+
rooms = builder.for_each(Room)
84+
procedures = builder.for_each(ProcedureInfo)
8585

8686
# Stage 1: Join Appointments with Doctors
8787
app_with_doc = appointments.join(
@@ -125,7 +125,7 @@ def scheduling_priority_inversion():
125125
case on the same day. This is achieved by first grouping appointments by
126126
doctor and day, and then filtering for priority/time inversions.
127127
"""
128-
appointments = builder.from_facts(Appointment)
128+
appointments = builder.for_each(Appointment)
129129

130130
# Define the composite key for grouping: (doctor_id, date)
131131
def get_grouping_key(app: Appointment) -> tuple:

examples/greynet/greynet_example_core.py

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -42,9 +42,9 @@ class CompanyPolicy:
4242
@builder.constraint("Required skill missing")
4343
def required_skill_missing():
4444
return (
45-
builder.from_facts(Shift)
45+
builder.for_each(Shift)
4646
.join(
47-
builder.from_facts(Employee),
47+
builder.for_each(Employee),
4848
JoinerType.EQUAL,
4949
left_key_func=lambda shift: shift.employee_name,
5050
right_key_func=lambda employee: employee.name
@@ -57,7 +57,7 @@ def required_skill_missing():
5757
@builder.constraint("Shift for non-existent employee")
5858
def shift_for_non_existent_employee():
5959
return (
60-
builder.from_facts(Shift)
60+
builder.for_each(Shift)
6161
.if_not_exists(
6262
Employee,
6363
# FIX: Removed JoinerType.EQUAL, which was an invalid argument.
@@ -71,9 +71,9 @@ def shift_for_non_existent_employee():
7171
@builder.constraint("Scheduled on unavailable day")
7272
def scheduled_on_unavailable_day():
7373
return (
74-
builder.from_facts(Shift)
74+
builder.for_each(Shift)
7575
.join(
76-
builder.from_facts(Employee),
76+
builder.for_each(Employee),
7777
JoinerType.EQUAL,
7878
left_key_func=lambda shift: shift.employee_name,
7979
right_key_func=lambda employee: employee.name
@@ -86,9 +86,9 @@ def scheduled_on_unavailable_day():
8686
@builder.constraint("Overlapping shifts")
8787
def overlapping_shifts():
8888
return (
89-
builder.from_facts(Shift)
89+
builder.for_each(Shift)
9090
.join(
91-
builder.from_facts(Shift),
91+
builder.for_each(Shift),
9292
JoinerType.EQUAL,
9393
left_key_func=lambda s: (s.employee_name, s.shift_date),
9494
right_key_func=lambda s: (s.employee_name, s.shift_date)
@@ -108,7 +108,7 @@ def max_consecutive_days():
108108
)
109109

110110
return (
111-
builder.from_facts(Shift)
111+
builder.for_each(Shift)
112112
.group_by(
113113
lambda shift: shift.employee_name,
114114
Collectors.consecutive_sequences(
@@ -117,7 +117,7 @@ def max_consecutive_days():
117117
)
118118
)
119119
.join(
120-
builder.from_facts(CompanyPolicy),
120+
builder.for_each(CompanyPolicy),
121121
JoinerType.GREATER_THAN, # A dummy join to bring the policy into the stream
122122
left_key_func=lambda name, sequences: 1,
123123
right_key_func=lambda policy: 0

examples/greynet/greynet_example_node_sharing.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ def create_rules(builder: ConstraintBuilder, num_rules: int, with_sharing: bool)
5656
# All rules share the exact same filter condition.
5757
@builder.constraint(constraint_id, default_weight=1.0)
5858
def shared_rule():
59-
return (builder.from_facts(Transaction)
59+
return (builder.for_each(Transaction)
6060
.filter(lambda t: t.amount > 5000)
6161
.penalize_simple(lambda t: t.amount))
6262
else:
@@ -68,7 +68,7 @@ def create_unique_rule(rule_index):
6868

6969
@builder.constraint(f"high_value_tx_unique_{rule_index}", default_weight=1.0)
7070
def unique_rule():
71-
return (builder.from_facts(Transaction)
71+
return (builder.for_each(Transaction)
7272
.filter(unique_filter) # Use the unique filter object
7373
.penalize_simple(lambda t: t.amount))
7474
return unique_rule

examples/greynet/greynet_example_stress_1.py

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -38,23 +38,23 @@ def define_constraints(builder: ConstraintBuilder):
3838
# Constraint 1: Simple filter for high-value transactions.
3939
@builder.constraint("high_value_transaction")
4040
def high_value_transaction():
41-
return builder.from_facts(Transaction)\
41+
return builder.for_each(Transaction)\
4242
.filter(lambda tx: tx.amount > 45000)\
4343
.penalize_simple(lambda tx: tx.amount / 1000)
4444

4545
# Constraint 2: Group transactions by customer and check for excessive activity.
4646
@builder.constraint("excessive_transactions_per_customer")
4747
def excessive_transactions():
48-
return builder.from_facts(Transaction)\
48+
return builder.for_each(Transaction)\
4949
.group_by(lambda tx: tx.customer_id, Collectors.count())\
5050
.filter(lambda cid, count: count > 25)\
5151
.penalize_simple(lambda cid, count: (count - 25) * 10)
5252

5353
# Constraint 3: Join transactions with security alerts on location.
5454
@builder.constraint("transaction_in_alerted_location")
5555
def suspicious_location_tx():
56-
return builder.from_facts(Transaction)\
57-
.join(builder.from_facts(SecurityAlert),
56+
return builder.for_each(Transaction)\
57+
.join(builder.for_each(SecurityAlert),
5858
JoinerType.EQUAL,
5959
lambda tx: tx.location,
6060
lambda alert: alert.location)\
@@ -63,9 +63,9 @@ def suspicious_location_tx():
6363
# Constraint 4: Join to find transactions from inactive customers.
6464
@builder.constraint("inactive_customer_transaction")
6565
def inactive_customer_tx():
66-
return builder.from_facts(Customer)\
66+
return builder.for_each(Customer)\
6767
.filter(lambda c: c.status == 'inactive')\
68-
.join(builder.from_facts(Transaction),
68+
.join(builder.for_each(Transaction),
6969
JoinerType.EQUAL,
7070
lambda c: c.id,
7171
lambda tx: tx.customer_id)\
@@ -76,9 +76,9 @@ def inactive_customer_tx():
7676
# that does NOT have a security alert.
7777
@builder.constraint("high_risk_transaction_without_alert")
7878
def high_risk_no_alert():
79-
return builder.from_facts(Customer)\
79+
return builder.for_each(Customer)\
8080
.filter(lambda c: c.risk_level == 'high')\
81-
.join(builder.from_facts(Transaction),
81+
.join(builder.for_each(Transaction),
8282
JoinerType.EQUAL,
8383
lambda c: c.id,
8484
lambda tx: tx.customer_id)\

examples/greynet/greynet_example_stress_2.py

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,7 @@ def define_advanced_constraints(builder: ConstraintBuilder):
114114
# --- Rule 1: Market Manipulation ("Painting the Tape") ---
115115
@builder.constraint("market_manipulation_painting_the_tape")
116116
def painting_the_tape():
117-
return builder.from_facts(Trade) \
117+
return builder.for_each(Trade) \
118118
.group_by(
119119
lambda trade: (trade.trader_id, trade.ticker),
120120
Collectors.to_list()
@@ -128,7 +128,7 @@ def painting_the_tape():
128128
# --- Rule 2: Exceeding Daily Risk Limits ---
129129
@builder.constraint("trader_exceeds_daily_risk_limit")
130130
def trader_risk_limit():
131-
daily_trade_volumes = builder.from_facts(Trade) \
131+
daily_trade_volumes = builder.for_each(Trade) \
132132
.group_by(
133133
lambda trade: (trade.trader_id, trade.timestamp.date()),
134134
Collectors.compose({
@@ -141,7 +141,7 @@ def trader_risk_limit():
141141
# and join the Trader facts to it. This uses a different node
142142
# that preserves all necessary data.
143143
return daily_trade_volumes \
144-
.join(builder.from_facts(Trader),
144+
.join(builder.for_each(Trader),
145145
JoinerType.EQUAL,
146146
# Left key func: extracts trader_id from the group key
147147
lambda key, result: key[0],
@@ -161,15 +161,15 @@ def trader_risk_limit():
161161
# --- Rule 3: Potential Insider Trading ---
162162
@builder.constraint("potential_insider_trading")
163163
def insider_trading():
164-
insider_trades = builder.from_facts(Trader)\
164+
insider_trades = builder.for_each(Trader)\
165165
.filter(lambda trader: trader.is_insider)\
166-
.join(builder.from_facts(Trade),
166+
.join(builder.for_each(Trade),
167167
JoinerType.EQUAL,
168168
lambda trader: trader.id,
169169
lambda trade: trade.trader_id
170170
)
171171

172-
return insider_trades.join(builder.from_facts(MarketNews),
172+
return insider_trades.join(builder.for_each(MarketNews),
173173
JoinerType.EQUAL,
174174
lambda trader, trade: trade.ticker,
175175
lambda news: news.related_tickers[0] if news.related_tickers else None
@@ -182,9 +182,9 @@ def insider_trading():
182182
# --- Rule 4: Restricted Security Trading Bursts ---
183183
@builder.constraint("restricted_security_trading_burst")
184184
def restricted_trading_burst():
185-
restricted_trades = builder.from_facts(Trade) \
185+
restricted_trades = builder.for_each(Trade) \
186186
.if_exists(
187-
builder.from_facts(Security).filter(lambda s: s.is_restricted),
187+
builder.for_each(Security).filter(lambda s: s.is_restricted),
188188
lambda trade: trade.ticker,
189189
lambda sec: sec.ticker
190190
)

examples/greynet/greynet_example_subqueries.py

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -57,16 +57,16 @@ class TrainingCompletion:
5757
# ---
5858
@builder.constraint("Missing Invoice for VIP Project")
5959
def missing_invoice_for_vip_project():
60-
active_assignments = builder.from_facts(Assignment).filter(lambda a: a.status == 'Active')
60+
active_assignments = builder.for_each(Assignment).filter(lambda a: a.status == 'Active')
6161

6262
vip_projects = (
6363
active_assignments.join(
64-
builder.from_facts(Project),
64+
builder.for_each(Project),
6565
JoinerType.EQUAL,
6666
left_key_func=lambda a: a.project_id,
6767
right_key_func=lambda p: p.project_id
6868
).join(
69-
builder.from_facts(Client),
69+
builder.for_each(Client),
7070
JoinerType.EQUAL,
7171
left_key_func=lambda a, p: p.client_id,
7272
right_key_func=lambda c: c.client_id
@@ -86,13 +86,13 @@ def missing_invoice_for_vip_project():
8686
# ---
8787
@builder.constraint("Training Dead-End Assignment")
8888
def training_dead_end():
89-
assignments = builder.from_facts(Assignment).join(
90-
builder.from_facts(Employee),
89+
assignments = builder.for_each(Assignment).join(
90+
builder.for_each(Employee),
9191
JoinerType.EQUAL,
9292
lambda a: a.employee_id,
9393
lambda e: e.employee_id
9494
).join(
95-
builder.from_facts(Project),
95+
builder.for_each(Project),
9696
JoinerType.EQUAL,
9797
lambda a, e: a.project_id,
9898
lambda p: p.project_id
@@ -117,10 +117,10 @@ def training_dead_end():
117117
@builder.constraint("Bonus Flag: VIP Project Completion by Trained Employee")
118118
def bonus_for_vip_completion():
119119
completed_vip_assignments = (
120-
builder.from_facts(Assignment)
120+
builder.for_each(Assignment)
121121
.filter(lambda a: a.status == 'Completed')
122-
.join(builder.from_facts(Project), JoinerType.EQUAL, lambda a: a.project_id, lambda p: p.project_id)
123-
.join(builder.from_facts(Client), JoinerType.EQUAL, lambda a, p: p.client_id, lambda c: c.client_id)
122+
.join(builder.for_each(Project), JoinerType.EQUAL, lambda a: a.project_id, lambda p: p.project_id)
123+
.join(builder.for_each(Client), JoinerType.EQUAL, lambda a, p: p.client_id, lambda c: c.client_id)
124124
.filter(lambda a, p, c: c.status == 'VIP')
125125
)
126126

0 commit comments

Comments
 (0)