Skip to content

Commit c001360

Browse files
authored
Merge pull request #2 from MikPisula/latest
chore: Update Python typing
2 parents c4126b5 + 9ccf3af commit c001360

File tree

15 files changed

+72
-73
lines changed

15 files changed

+72
-73
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ src/*.egg-info
55
dist/
66
.idea/
77
.vscode/
8+
__pycache__
89

910
testdata/custom_*
1011
scripts/cli_custom.sh

src/firewall_test/plugins/system/abstract_rule_match.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,10 @@ class RuleMatchResult:
1111
def __init__(
1212
self,
1313
matched: bool,
14-
action: (type[RuleAction], None),
15-
target_chain_name: (str, None),
16-
target_nat_ip: ((IPv4Address, IPv6Address), None),
17-
target_nat_port: (int, None),
14+
action: type[RuleAction]|None,
15+
target_chain_name: str|None,
16+
target_nat_ip: IPv4Address|IPv6Address|None,
17+
target_nat_port: int|None,
1818
):
1919
self.matched = matched
2020
self.action = action

src/firewall_test/plugins/system/firewall_netfilter.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,15 @@
22
from plugins.system.abstract_rule_match import RuleMatcher, RuleMatchResult
33
from plugins.translate.abstract import Rule
44
from plugins.translate.netfilter.parse import NftRule
5-
from simulator.packet import PacketIP, PacketTCPUDP, PacketICMP
5+
from simulator.packet import PacketIP, PacketTCPUDP
66
from utils.logger import log_debug, log_warn
77

88
# todo: add explicit match-tests
99

1010

1111
# pylint: disable=R0912
1212
class RuleMatcherNetfilter(RuleMatcher):
13-
def matches(self, packet: (PacketIP, PacketTCPUDP, PacketICMP), rule: Rule) -> RuleMatchResult:
13+
def matches(self, packet: PacketIP, rule: Rule) -> RuleMatchResult:
1414
"""
1515
:param packet: Packet to match
1616
:param rule: Rule to check

src/firewall_test/plugins/system/firewall_opnsense.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
from config import ProtoL3IP4IP6, RuleActionGoTo
2-
from simulator.packet import PACKET_KINDS, PacketTCPUDP
2+
from simulator.packet import PacketIP, PacketTCPUDP
33
from plugins.translate.abstract import Rule
44
from plugins.system.abstract_rule_match import RuleMatcher, RuleMatchResult
55
from plugins.translate.opnsense.rule import OPNsenseRule, RULE_SEQUENCE_NEXT_CHAIN
@@ -10,7 +10,7 @@
1010

1111
# pylint: disable=R0912
1212
class RuleMatcherOPNsense(RuleMatcher):
13-
def matches(self, packet: PACKET_KINDS, rule: Rule) -> RuleMatchResult:
13+
def matches(self, packet: PacketIP, rule: Rule) -> RuleMatchResult:
1414
"""
1515
:param packet: Packet to match
1616
:param rule: Rule to check

src/firewall_test/plugins/translate/abstract.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,13 +14,13 @@ def __init__(self, raw: any):
1414
self.raw = raw
1515

1616
@abstractmethod
17-
def get(self) -> (dict, list[dict]):
17+
def get(self) -> dict|list[dict]:
1818
pass
1919

2020

2121
class TranslateOutput(ABC):
2222
@abstractmethod
23-
def dump(self) -> (dict, list[dict]):
23+
def dump(self) -> dict|list[dict]:
2424
pass
2525

2626
@abstractmethod
@@ -304,12 +304,12 @@ class Chain(TranslateOutput):
304304

305305
# pylint: disable=W0622
306306
def __init__(
307-
self, name: str, hook: (str, None), policy: (None, RuleActionAccept, RuleActionDrop, RuleActionReject),
307+
self, name: str, hook: str|None, policy: RuleActionAccept|RuleActionDrop|RuleActionReject|None,
308308
rules: list[Rule], priority: int = 0, type: str = 'filter', family: type[ProtoL3] = ProtoL3IP4IP6,
309309
):
310310
self.name = name
311311
self.type = type
312-
self.family: type[ProtoL3] = family
312+
self.family = family
313313
self.hook = hook
314314
self.priority = priority
315315
self.policy = policy
@@ -388,7 +388,7 @@ def __init__(
388388
self.type = type
389389
self.priority = priority
390390
self.chains = chains
391-
self.family: type[ProtoL3] = family
391+
self.family = family
392392

393393
def dump(self) -> dict:
394394
return {

src/firewall_test/plugins/translate/netfilter/elements.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ class NftMatch:
9494
OP_GT = '>'
9595
OP_LT = '<'
9696

97-
def __init__(self, operator: str, left: (str, dict), right: (str, dict, list)):
97+
def __init__(self, operator: str, left: str|dict, right: str|dict|list):
9898
self.operator = operator
9999
self._left = left
100100

@@ -270,7 +270,7 @@ def update_value_type(self):
270270

271271
self.value = values
272272

273-
def get_matches(self) -> (dict, None):
273+
def get_matches(self) -> dict|None:
274274
matches = {}
275275
if self.match_proto_l3:
276276
if self.value_proto_l3:

src/firewall_test/plugins/translate/opnsense/rule.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
from ipaddress import IPv4Network, IPv6Network
22

3-
from config import ProtoL3IP4IP6, PROTOS_L3, PROTOS_L4
3+
from config import ProtoL3IP4IP6, ProtoL3, ProtoL4
44
from utils.logger import rule_repr
55

66
# pylint: disable=R0801
@@ -24,8 +24,8 @@ def __init__(
2424
nis: list[str] = None,
2525
ni_direction: str = None,
2626
desc: str = None,
27-
ipprotocol: PROTOS_L3 = ProtoL3IP4IP6,
28-
protocol: PROTOS_L4 = None,
27+
ipprotocol: ProtoL3 = ProtoL3IP4IP6,
28+
protocol: ProtoL4 = None,
2929
source: list[(IPv4Network, IPv6Network)] = None,
3030
destination: list[(IPv4Network, IPv6Network)] = None,
3131
source_port: list[int] = None,
@@ -85,7 +85,7 @@ def match_ip_daddr(self) -> bool:
8585

8686
return False
8787

88-
def get_matches(self) -> (dict, None):
88+
def get_matches(self) -> dict|None:
8989
matches = {}
9090
if self.ipp is not None:
9191
matches['proto_l3'] = self.ipp.N

src/firewall_test/plugins/translate/opnsense/ruleset.py

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
from ipaddress import ip_network, summarize_address_range, ip_address
33
import xml.etree.ElementTree as ET
44
from xml.etree.ElementTree import Element
5+
from typing import Any
56

67
from requests import get as http_get
78
from requests import Response as HttpResponse
@@ -167,7 +168,7 @@ def get(self) -> Ruleset:
167168

168169
### RULES ###
169170

170-
def _parse_rule_address(self, value: str) -> (list, None):
171+
def _parse_rule_address(self, value: str) -> list|None:
171172
values = split_csv(value)
172173
out = []
173174

@@ -197,7 +198,7 @@ def _parse_rule_address(self, value: str) -> (list, None):
197198

198199
return out
199200

200-
def _parse_rule_network(self, value: str) -> (list, None):
201+
def _parse_rule_network(self, value: str) -> list|None:
201202
values = split_csv(value)
202203
out = []
203204

@@ -227,7 +228,7 @@ def _parse_rule_network(self, value: str) -> (list, None):
227228

228229
return out
229230

230-
def _parse_rule_port(self, value: str) -> (list, None):
231+
def _parse_rule_port(self, value: str) -> list|None:
231232
values = split_csv(value)
232233
out = []
233234

@@ -252,7 +253,7 @@ def _parse_rule_port(self, value: str) -> (list, None):
252253
return out
253254

254255
@staticmethod
255-
def log_unsupported_rule(chain: Chain, rule_raw: dict, rule: dict, result: (any, None), invalid: bool) -> bool:
256+
def log_unsupported_rule(chain: Chain, rule_raw: dict, rule: dict, result: Any, invalid: bool) -> bool:
256257
if invalid:
257258
return invalid
258259

@@ -592,7 +593,7 @@ def _parse_alias_iplist_plain(self, url: str) -> (list[str], None):
592593
return content
593594

594595
@staticmethod
595-
def _download_alias_iplist(url: str) -> (HttpResponse, None):
596+
def _download_alias_iplist(url: str) -> HttpResponse|None:
596597
url = url.strip()
597598
if not url.startswith('http'):
598599
log_warn('Firewall Plugin', f'Unsupported alias-type "urltable" URL: {url}')

src/firewall_test/simulator/firewall.py

Lines changed: 24 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
from plugins.system.abstract import FirewallSystem
88
from plugins.system.abstract_rule_match import RuleMatchResult
99
from plugins.translate.abstract import Ruleset, Table, Chain, Rule
10-
from simulator.packet import PACKET_KINDS, PacketTCPUDP
10+
from simulator.packet import PacketIP, PacketTCPUDP
1111
from utils.logger import log_debug, log_info, log_warn
1212

1313

@@ -16,7 +16,7 @@ def __init__(self, fw, run_tables):
1616
self._fw = fw
1717
self._run_tables = run_tables
1818

19-
def _get_chain_by_name_and_family(self, packet: PACKET_KINDS, table: Table, name: str, family: str) -> (Chain, None):
19+
def _get_chain_by_name_and_family(self, packet: PacketIP, table: Table, name: str, family: str) -> Chain|None:
2020
for t in self._run_tables.get_tables(packet):
2121
if t.name != table.name or t.family != table.family or t.priority != table.priority:
2222
continue
@@ -41,7 +41,7 @@ def _log_match(self, chain: Chain, rule: Rule, debug: bool = False):
4141
log_info(label='Firewall', v1=msg, v2=v2)
4242

4343
# pylint: disable=R0911,R0912
44-
def process(self, chain: Chain, packet: PACKET_KINDS) -> (bool, (Rule, None)):
44+
def process(self, chain: Chain, packet: PacketIP) -> tuple[bool, Rule|None]:
4545
"""
4646
:param chain: Firewall chain to process; if any rule has an action that targets another chain - it will also be processed
4747
:param packet: Packet to match
@@ -52,8 +52,8 @@ def process(self, chain: Chain, packet: PACKET_KINDS) -> (bool, (Rule, None)):
5252

5353
rule_matcher = self._fw.system.get_rule_matcher()(chain.run_table)
5454
chain.rules.sort(key=lambda r: r.seq, reverse=False)
55-
lazy_action: (None, RuleAction) = None
56-
lazy_rule: (None, Rule) = None
55+
lazy_action: RuleAction|None = None
56+
lazy_rule: Rule|None = None
5757

5858
for rule in chain.rules:
5959
result: RuleMatchResult = rule_matcher.matches(packet=packet, rule=rule)
@@ -162,7 +162,7 @@ def __init__(self, fw):
162162
self._run_chains = RunFirewallChain(fw=fw, run_tables=self)
163163

164164
@staticmethod
165-
def _is_matching_table(packet: PACKET_KINDS, table: Table, ignore_type: list[str] = None) -> bool:
165+
def _is_matching_table(packet: PacketIP, table: Table, ignore_type: list[str] = None) -> bool:
166166
if ignore_type is not None and table.type in ignore_type:
167167
return False
168168

@@ -177,14 +177,14 @@ def _is_matching_table(packet: PACKET_KINDS, table: Table, ignore_type: list[str
177177

178178
return False
179179

180-
def get_tables(self, packet: PACKET_KINDS, ignore_type: list[str] = None) -> list[Table]:
180+
def get_tables(self, packet: PacketIP, ignore_type: list[str] = None) -> list[Table]:
181181
return [
182182
t for t in self._fw.ruleset.tables
183183
if self._is_matching_table(packet=packet, table=t, ignore_type=ignore_type)
184184
]
185185

186186
@staticmethod
187-
def _is_matching_chain(packet: PACKET_KINDS, chain: Chain, ignore_type: list[str] = None) -> bool:
187+
def _is_matching_chain(packet: PacketIP, chain: Chain, ignore_type: list[str] = None) -> bool:
188188
if ignore_type is not None and chain.type in ignore_type:
189189
return False
190190

@@ -200,7 +200,7 @@ def _is_matching_chain(packet: PACKET_KINDS, chain: Chain, ignore_type: list[str
200200

201201
return False
202202

203-
def get_chains(self, packet: PACKET_KINDS, table: Table, ignore_type: list[str] = None):
203+
def get_chains(self, packet: PacketIP, table: Table, ignore_type: list[str] = None):
204204
return [
205205
c for c in table.chains
206206
if self._is_matching_chain(packet=packet, chain=c, ignore_type=ignore_type)
@@ -283,8 +283,8 @@ def _inherit_table_priority_to_chain(table: Table, chain: Chain):
283283
chain.priority = table.priority
284284

285285
def _process_by_table_prio(
286-
self, tables: list[Table], callback_chain_filter: Callable[[Chain], bool], packet: PACKET_KINDS,
287-
) -> (bool, (Rule, None)):
286+
self, tables: list[Table], callback_chain_filter: Callable[[Chain], bool], packet: PacketIP,
287+
) -> tuple[bool, Rule|None]:
288288
for table in self._sort_tables_by_priority(tables):
289289
chains = [
290290
c for c in self.get_chains(packet=packet, table=table)
@@ -300,8 +300,8 @@ def _process_by_table_prio(
300300
return True, None
301301

302302
def _process_by_chain_prio(
303-
self, tables: list[Table], callback_chain_filter: Callable[[Chain], bool], packet: PACKET_KINDS,
304-
) -> (bool, (Rule, None)):
303+
self, tables: list[Table], callback_chain_filter: Callable[[Chain], bool], packet: PacketIP,
304+
) -> tuple[bool, Rule|None]:
305305
chains: list[Chain] = []
306306
for table in tables:
307307
for chain in self.get_chains(packet=packet, table=table):
@@ -323,7 +323,7 @@ def _process_by_chain_prio(
323323

324324
return True, None
325325

326-
def _process_chain(self, chain: Chain, packet: PACKET_KINDS) -> (bool, (Rule, None)):
326+
def _process_chain(self, chain: Chain, packet: PacketIP) -> tuple[bool, Rule|None]:
327327
"""
328328
see: RunFirewallChain.process
329329
"""
@@ -343,7 +343,7 @@ def _chain_filter_pre_routing(self, chain: Chain, flow: type[Flow]) -> bool:
343343
before_dnat = self._is_chain_before_eq(chain=chain, **self._fw.system.FIREWALL_NAT[flow]['dnat'])
344344
return chain.type != chain.TYPE_NAT and before_dnat
345345

346-
def process_pre_routing(self, packet: PACKET_KINDS, flow: type[Flow]) -> (bool, (Rule, None)):
346+
def process_pre_routing(self, packet: PacketIP, flow: type[Flow]) -> tuple[bool, Rule|None]:
347347
"""
348348
:param packet: Packet to process
349349
:param flow: traffic flow-type
@@ -368,7 +368,7 @@ def _chain_filter_dnat(self, chain: Chain, flow: type[Flow]) -> bool:
368368
chain.hook == chain_dnat['hook'] and \
369369
chain.priority == chain_dnat['priority']
370370

371-
def process_dnat(self, packet: PACKET_KINDS, flow: type[Flow]) -> (bool, (Rule, None)):
371+
def process_dnat(self, packet: PacketIP, flow: type[Flow]) -> tuple[bool, Rule|None]:
372372
"""
373373
:param packet: Packet to process
374374
:param flow: traffic flow-type
@@ -400,7 +400,7 @@ def _chain_filter_main(self, chain: Chain, flow: type[Flow]) -> bool:
400400

401401
return chain.type != chain.TYPE_NAT and after_dnat and before_snat
402402

403-
def process_main(self, packet: PACKET_KINDS, flow: type[Flow]) -> (bool, (Rule, None)):
403+
def process_main(self, packet: PacketIP, flow: type[Flow]) -> tuple[bool, Rule|None]:
404404
"""
405405
:param packet: Packet to process
406406
:param flow: traffic flow-type
@@ -425,7 +425,7 @@ def _chain_filter_snat(self, chain: Chain, flow: type[Flow]) -> bool:
425425
chain.hook == chain_snat['hook'] and \
426426
chain.priority == chain_snat['priority']
427427

428-
def process_snat(self, packet: PACKET_KINDS, flow: type[Flow]) -> (bool, (Rule, None)):
428+
def process_snat(self, packet: PacketIP, flow: type[Flow]) -> tuple[bool, Rule|None]:
429429
"""
430430
:param packet: Packet to process
431431
:param flow: traffic flow-type
@@ -451,7 +451,7 @@ def _chain_filter_egress(self, chain: Chain, flow: type[Flow]) -> bool:
451451
after_snat = self._is_chain_after(chain=chain, **self._fw.system.FIREWALL_NAT[flow]['snat'])
452452
return chain.type != chain.TYPE_NAT and after_snat
453453

454-
def process_egress(self, packet: PACKET_KINDS, flow: type[Flow]) -> (bool, (Rule, None)):
454+
def process_egress(self, packet: PacketIP, flow: type[Flow]) -> tuple[bool, Rule|None]:
455455
"""
456456
:param packet: Packet to process
457457
:param flow: traffic flow-type
@@ -475,15 +475,15 @@ def __init__(self, system: type[FirewallSystem], ruleset: Ruleset):
475475

476476
self._run_tables = RunFirewallTables(self)
477477

478-
def process_pre_routing(self, packet: PACKET_KINDS, flow: type[Flow]) -> (bool, (Rule, None)):
478+
def process_pre_routing(self, packet: PacketIP, flow: type[Flow]) -> tuple[bool, Rule|None]:
479479
log_info('Firewall', v3='Processing Pre-Routing Filter-Hooks')
480480
if flow == FlowInputForward:
481481
# before DNAT we cannot know for sure
482482
flow = FlowInput
483483

484484
return self._run_tables.process_pre_routing(packet=packet, flow=flow)
485485

486-
def process_dnat(self, packet: PACKET_KINDS, flow: type[Flow]) -> (bool, (Rule, None)):
486+
def process_dnat(self, packet: PacketIP, flow: type[Flow]) -> tuple[bool, Rule|None]:
487487
if flow == FlowInputForward:
488488
# before DNAT we cannot know for sure
489489
flow = FlowInput
@@ -496,12 +496,12 @@ def process_dnat(self, packet: PACKET_KINDS, flow: type[Flow]) -> (bool, (Rule,
496496

497497
return self._run_tables.process_dnat(packet=packet, flow=flow)
498498

499-
def process_main(self, packet: PACKET_KINDS, flow: type[Flow]) -> (bool, (Rule, None)):
499+
def process_main(self, packet: PacketIP, flow: type[Flow]) -> tuple[bool, Rule|None]:
500500
log_info('Firewall', v3='Processing Main Filter-Hooks')
501501

502502
return self._run_tables.process_main(packet=packet, flow=flow)
503503

504-
def process_snat(self, packet: PACKET_KINDS, flow: type[Flow]) -> (bool, (Rule, None)):
504+
def process_snat(self, packet: PacketIP, flow: type[Flow]) -> tuple[bool, Rule|None]:
505505
if not self.system.FIREWALL_SNAT or 'snat' not in self.system.FIREWALL_NAT[flow]:
506506
# system or flow has no SNAT capability
507507
return False, None
@@ -510,7 +510,7 @@ def process_snat(self, packet: PACKET_KINDS, flow: type[Flow]) -> (bool, (Rule,
510510

511511
return self._run_tables.process_snat(packet=packet, flow=flow)
512512

513-
def process_egress(self, packet: PACKET_KINDS, flow: type[Flow]) -> (bool, (Rule, None)):
513+
def process_egress(self, packet: PacketIP, flow: type[Flow]) -> tuple[bool, Rule|None]:
514514
if 'snat' not in self.system.FIREWALL_NAT[flow]:
515515
# already processed all chains
516516
return True, None

0 commit comments

Comments
 (0)