From 8d312aa3a29d9a3223f512c0d1e5c41fcc8fb1ec Mon Sep 17 00:00:00 2001 From: Costas Drogos Date: Tue, 22 Aug 2023 16:42:45 +0200 Subject: [PATCH 01/10] First version of the ip module --- test/test_modules.py | 39 +++++++++- testinfra/modules/__init__.py | 1 + testinfra/modules/ip.py | 129 ++++++++++++++++++++++++++++++++++ 3 files changed, 168 insertions(+), 1 deletion(-) create mode 100644 testinfra/modules/ip.py diff --git a/test/test_modules.py b/test/test_modules.py index 3106479f..8bb85b1b 100644 --- a/test/test_modules.py +++ b/test/test_modules.py @@ -633,7 +633,7 @@ def test_addr_namespace(host): ) def test_interface(host, family): # exist - assert host.interface("eth0", family=family).exists + assert host.interface("tap0", family=family).exists assert not host.interface("does_not_exist", family=family).exists # addresses addresses = host.interface.default(family).addresses @@ -648,3 +648,40 @@ def test_interface(host, family): default_itf = host.interface.default(family) assert default_itf.name == "eth0" assert default_itf.exists + + +def test_ip_links(host): + assert host.ip.exists + iphandle = host.ip() + + links = iphandle.links() + assert len(links) > 0 and len(links) < 4 + + assert links[0].get("ifname") and links[0].get("ifindex") + + +def test_ip_routes(host): + assert host.ip.exists + iphandle = host.ip() + + routes = iphandle.routes() + assert len(routes) > 0 + + +def test_ip_rules(host): + assert host.ip.exists + iphandle = host.ip() + + rules = iphandle.rules() + assert len(rules) > 0 and len(rules) < 4 + assert rules[0].get("priority") == 0 + assert rules[0].get("src") == "all" + assert rules[0].get("table") == "local" + + +def test_ip_tunnels(host): + assert host.ip.exists + iphandle = host.ip() + + tunnels = iphandle.tunnels() + assert len(tunnels) == 0 diff --git a/testinfra/modules/__init__.py b/testinfra/modules/__init__.py index 94df79cb..0b6d3749 100644 --- a/testinfra/modules/__init__.py +++ b/testinfra/modules/__init__.py @@ -27,6 +27,7 @@ "group": "group:Group", "interface": "interface:Interface", "iptables": "iptables:Iptables", + "ip": "ip:IP", "mount_point": "mountpoint:MountPoint", "package": "package:Package", "pip": "pip:Pip", diff --git a/testinfra/modules/ip.py b/testinfra/modules/ip.py new file mode 100644 index 00000000..228ee680 --- /dev/null +++ b/testinfra/modules/ip.py @@ -0,0 +1,129 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import functools +import json +import re + +from testinfra.modules.base import Module + +class IP(Module): + """Test network configuration via ip commands + + >>> host.ip.rules() + + host.ip.rules(from,to,tos,fwmark,iif,oif,pref, uidrange, ipproto, sport, dport) + host.ip.routes(table, device, scope, proto, src, metric) + host.ip.links() + host.ip.addresses() + host.ip.tunnels() + + Optionally, the protocol family can be provided: + >>> host.ip.routes("inet6", table="main") + ...FIX + + Optionally, this can work inside a different network namespace: + >>> host.ip.routes("inet6", "vpn") + ...FIX + """ + + def __init__(self, family=None, netns=None): + self.family = family + self.netns = netns + super().__init__() + + @property + def exists(self): + raise NotImplementedError + + def addresses(self): + """Return the addresses associated with interfaces + """ + raise NotImplementedError + + def links(self): + """Return links and their state + """ + raise NotImplementedError + + def routes(self): + """Return the routes associated with the routing table + """ + raise NotImplementedError + + def rules(self): + """Return all configured ip rules + """ + raise NotImplementedError + + def tunnels(self): + """Return all configured tunnels + """ + raise NotImplementedError + + def __repr__(self): + return "" + + @classmethod + def get_module_class(cls, host): + if host.system_info.type == "linux": + return LinuxIP + raise NotImplementedError + +class LinuxIP(IP): + @functools.cached_property + def _ip(self): + ip_cmd = self.find_command("ip") + if self.netns is not None: + ip_cmd = f"{ip_cmd} netns exec {self.netns} {ip_cmd}" + if self.family is not None: + ip_cmd = f"{ip_cmd} -f {self.family}" + return ip_cmd + + @property + def exists(self): + return self.run_test("{} -V".format(self._ip), self.name).rc == 0 + + def addresses(self): + """Return the addresses associated with interfaces + """ + cmd = f"{self._ip} --json address show" + out = self.check_output(cmd) + return json.loads(out) + + def links(self): + """Return links and their state + """ + cmd = f"{self._ip} --json link show" + out = self.check_output(cmd) + return json.loads(out) + + def routes(self): + """Return the routes installed + """ + cmd = f"{self._ip} --json route show table all" + out = self.check_output(cmd) + return json.loads(out) + + def rules(self): + """Return the rules our routing policy consists of + """ + cmd = f"{self._ip} --json rule show" + out = self.check_output(cmd) + return json.loads(out) + + def tunnels(self): + """Return all configured tunnels + """ + cmd = f"{self._ip} --json tunnel show" + out = self.check_output(cmd) + return json.loads(out) From 93599d87fcff8f2e05f05341d5670f9b22797b37 Mon Sep 17 00:00:00 2001 From: Costas Drogos Date: Fri, 25 Aug 2023 23:43:09 +0300 Subject: [PATCH 02/10] more rules --- test/test_modules.py | 34 +++++++++++------ testinfra/modules/ip.py | 85 ++++++++++++++--------------------------- 2 files changed, 51 insertions(+), 68 deletions(-) diff --git a/test/test_modules.py b/test/test_modules.py index 8bb85b1b..445d781e 100644 --- a/test/test_modules.py +++ b/test/test_modules.py @@ -650,38 +650,48 @@ def test_interface(host, family): assert default_itf.exists -def test_ip_links(host): +def test_iproute2_links(host): assert host.ip.exists - iphandle = host.ip() - links = iphandle.links() + links = host.ip.links() assert len(links) > 0 and len(links) < 4 assert links[0].get("ifname") and links[0].get("ifindex") -def test_ip_routes(host): +def test_iproute2_routes(host): assert host.ip.exists - iphandle = host.ip() - routes = iphandle.routes() + routes = host.ip.routes() assert len(routes) > 0 -def test_ip_rules(host): +def test_iproute2_rules(host): assert host.ip.exists - iphandle = host.ip() - rules = iphandle.rules() + rules = host.ip.rules() assert len(rules) > 0 and len(rules) < 4 assert rules[0].get("priority") == 0 assert rules[0].get("src") == "all" assert rules[0].get("table") == "local" -def test_ip_tunnels(host): +def test_iproute2_tunnels(host): assert host.ip.exists - iphandle = host.ip() - tunnels = iphandle.tunnels() + tunnels = host.ip.tunnels() assert len(tunnels) == 0 + + +def test_iproute2_vrfs(host): + assert host.ip.exists + + vrfs = host.ip.vrfs() + assert len(vrfs) == 0 + + +def test_iproute2_netns(host): + assert host.ip.exists + + namespaces = host.ip.netns() + assert len(namespaces) == 0 diff --git a/testinfra/modules/ip.py b/testinfra/modules/ip.py index 228ee680..34ea0b87 100644 --- a/testinfra/modules/ip.py +++ b/testinfra/modules/ip.py @@ -12,12 +12,12 @@ import functools import json -import re -from testinfra.modules.base import Module +from testinfra.modules.base import InstanceModule -class IP(Module): - """Test network configuration via ip commands + +class IP(InstanceModule): + """Test network configuration via iproute2 commands >>> host.ip.rules() @@ -27,7 +27,7 @@ class IP(Module): host.ip.addresses() host.ip.tunnels() - Optionally, the protocol family can be provided: + Optionally, the protocol family can be provided to reduce the number of routes returned: >>> host.ip.routes("inet6", table="main") ...FIX @@ -36,94 +36,67 @@ class IP(Module): ...FIX """ - def __init__(self, family=None, netns=None): + def __init__(self, family=None, namespace=None): self.family = family - self.netns = netns + self.namespace = namespace super().__init__() - @property - def exists(self): - raise NotImplementedError - - def addresses(self): - """Return the addresses associated with interfaces - """ - raise NotImplementedError - - def links(self): - """Return links and their state - """ - raise NotImplementedError - - def routes(self): - """Return the routes associated with the routing table - """ - raise NotImplementedError - - def rules(self): - """Return all configured ip rules - """ - raise NotImplementedError - - def tunnels(self): - """Return all configured tunnels - """ - raise NotImplementedError - def __repr__(self): return "" - @classmethod - def get_module_class(cls, host): - if host.system_info.type == "linux": - return LinuxIP - raise NotImplementedError - -class LinuxIP(IP): @functools.cached_property def _ip(self): ip_cmd = self.find_command("ip") - if self.netns is not None: - ip_cmd = f"{ip_cmd} netns exec {self.netns} {ip_cmd}" + if self.namespace is not None: + ip_cmd = f"{ip_cmd} netns exec {self.namespace} {ip_cmd}" if self.family is not None: ip_cmd = f"{ip_cmd} -f {self.family}" return ip_cmd @property def exists(self): - return self.run_test("{} -V".format(self._ip), self.name).rc == 0 + return self.run_test("{} -V".format(self._ip)).rc == 0 def addresses(self): - """Return the addresses associated with interfaces - """ + """Return the addresses associated with interfaces""" cmd = f"{self._ip} --json address show" out = self.check_output(cmd) return json.loads(out) def links(self): - """Return links and their state - """ + """Return links and their state""" cmd = f"{self._ip} --json link show" out = self.check_output(cmd) return json.loads(out) def routes(self): - """Return the routes installed - """ + """Return the routes installed""" cmd = f"{self._ip} --json route show table all" out = self.check_output(cmd) return json.loads(out) def rules(self): - """Return the rules our routing policy consists of - """ + """Return the rules our routing policy consists of""" cmd = f"{self._ip} --json rule show" out = self.check_output(cmd) return json.loads(out) def tunnels(self): - """Return all configured tunnels - """ + """Return all configured tunnels""" cmd = f"{self._ip} --json tunnel show" out = self.check_output(cmd) return json.loads(out) + + def vrfs(self): + """Return all configured vrfs""" + cmd = f"{self._ip} --json vrf show" + out = self.check_output(cmd) + return json.loads(out) + + def netns(self): + """Return all configured network namespaces""" + cmd = f"{self._ip} --json netns show" + out = self.check_output(cmd) + if out is None: # ip netns returns null instead of [] in json mode + return json.loads("[]") + return json.loads(out) From 8aec2e4f24d22aef850cb83589715748ee87d7d4 Mon Sep 17 00:00:00 2001 From: Costas Drogos Date: Fri, 25 Aug 2023 23:59:27 +0300 Subject: [PATCH 03/10] rename to iproute2 --- test/test_modules.py | 24 ++++++++++++------------ testinfra/modules/__init__.py | 2 +- testinfra/modules/{ip.py => iproute2.py} | 12 ++++++------ 3 files changed, 19 insertions(+), 19 deletions(-) rename testinfra/modules/{ip.py => iproute2.py} (91%) diff --git a/test/test_modules.py b/test/test_modules.py index 445d781e..9db8122f 100644 --- a/test/test_modules.py +++ b/test/test_modules.py @@ -651,25 +651,25 @@ def test_interface(host, family): def test_iproute2_links(host): - assert host.ip.exists + assert host.iproute2.exists - links = host.ip.links() + links = host.iproute2.links() assert len(links) > 0 and len(links) < 4 assert links[0].get("ifname") and links[0].get("ifindex") def test_iproute2_routes(host): - assert host.ip.exists + assert host.iproute2.exists - routes = host.ip.routes() + routes = host.iproute2.routes() assert len(routes) > 0 def test_iproute2_rules(host): - assert host.ip.exists + assert host.iproute2.exists - rules = host.ip.rules() + rules = host.iproute2.rules() assert len(rules) > 0 and len(rules) < 4 assert rules[0].get("priority") == 0 assert rules[0].get("src") == "all" @@ -677,21 +677,21 @@ def test_iproute2_rules(host): def test_iproute2_tunnels(host): - assert host.ip.exists + assert host.iproute2.exists - tunnels = host.ip.tunnels() + tunnels = host.iproute2.tunnels() assert len(tunnels) == 0 def test_iproute2_vrfs(host): - assert host.ip.exists + assert host.iproute2.exists - vrfs = host.ip.vrfs() + vrfs = host.iproute2.vrfs() assert len(vrfs) == 0 def test_iproute2_netns(host): - assert host.ip.exists + assert host.iproute2.exists - namespaces = host.ip.netns() + namespaces = host.iproute2.netns() assert len(namespaces) == 0 diff --git a/testinfra/modules/__init__.py b/testinfra/modules/__init__.py index 0b6d3749..d38aaee3 100644 --- a/testinfra/modules/__init__.py +++ b/testinfra/modules/__init__.py @@ -27,7 +27,7 @@ "group": "group:Group", "interface": "interface:Interface", "iptables": "iptables:Iptables", - "ip": "ip:IP", + "iproute2": "iproute2:IProute2", "mount_point": "mountpoint:MountPoint", "package": "package:Package", "pip": "pip:Pip", diff --git a/testinfra/modules/ip.py b/testinfra/modules/iproute2.py similarity index 91% rename from testinfra/modules/ip.py rename to testinfra/modules/iproute2.py index 34ea0b87..f146adc9 100644 --- a/testinfra/modules/ip.py +++ b/testinfra/modules/iproute2.py @@ -16,10 +16,10 @@ from testinfra.modules.base import InstanceModule -class IP(InstanceModule): +class IProute2(InstanceModule): """Test network configuration via iproute2 commands - >>> host.ip.rules() + >>> host.iproute2.rules() host.ip.rules(from,to,tos,fwmark,iif,oif,pref, uidrange, ipproto, sport, dport) host.ip.routes(table, device, scope, proto, src, metric) @@ -28,11 +28,11 @@ class IP(InstanceModule): host.ip.tunnels() Optionally, the protocol family can be provided to reduce the number of routes returned: - >>> host.ip.routes("inet6", table="main") + >>> host.iproute2.routes("inet6", table="main") ...FIX Optionally, this can work inside a different network namespace: - >>> host.ip.routes("inet6", "vpn") + >>> host.iproute2.routes("inet6", "vpn") ...FIX """ @@ -97,6 +97,6 @@ def netns(self): """Return all configured network namespaces""" cmd = f"{self._ip} --json netns show" out = self.check_output(cmd) - if out is None: # ip netns returns null instead of [] in json mode - return json.loads("[]") + if not out: # ip netns returns null instead of [] in json mode + return json.loads("[]\n") return json.loads(out) From 0d82bec105cf7b8eadd174b8c878ba10df822671 Mon Sep 17 00:00:00 2001 From: Costas Drogos Date: Sat, 26 Aug 2023 01:28:20 +0300 Subject: [PATCH 04/10] modules/iproute2: Add filters in routes & addresses --- test/test_modules.py | 27 ++++++++++ testinfra/modules/iproute2.py | 94 ++++++++++++++++++++++++++++++++--- 2 files changed, 114 insertions(+), 7 deletions(-) diff --git a/test/test_modules.py b/test/test_modules.py index 9db8122f..15c3004d 100644 --- a/test/test_modules.py +++ b/test/test_modules.py @@ -650,6 +650,23 @@ def test_interface(host, family): assert default_itf.exists +def test_iproute2_addresses(host): + assert host.iproute2.exists + + addresses = host.iproute2.addresses() + + assert len(addresses) > 0 + assert addresses[0].get("ifname") and addresses[0].get("ifindex") + + filtered_addresses = host.iproute2.addresses(ifname="lo") + assert filtered_addresses[0].get("ifname") == "lo" and len(filtered_addresses) == 1 + + filtered_addresses2 = host.iproute2.addresses(local="127.0.0.1") + assert ( + filtered_addresses2[0].get("ifname") == "lo" and len(filtered_addresses2) == 1 + ) + + def test_iproute2_links(host): assert host.iproute2.exists @@ -665,6 +682,9 @@ def test_iproute2_routes(host): routes = host.iproute2.routes() assert len(routes) > 0 + filtered_routes = host.iproute2.routes(table="local", scope="host", src="127.0.0.1") + assert filtered_routes[0].get("protocol") == "kernel" and len(filtered_routes) > 1 + def test_iproute2_rules(host): assert host.iproute2.exists @@ -675,6 +695,13 @@ def test_iproute2_rules(host): assert rules[0].get("src") == "all" assert rules[0].get("table") == "local" + cmd = host.run("ip rule add from 1.2.3.4/32 table 123") + assert cmd.exit_status == 0, f"{cmd.stdout}\n{cmd.stderr}" + + rules_123 = host.iproute2.rules(src="1.2.3.4/32") + assert len(rules_123) > 0 + assert rules_123[0].get("src") == "1.2.3.4" + def test_iproute2_tunnels(host): assert host.iproute2.exists diff --git a/testinfra/modules/iproute2.py b/testinfra/modules/iproute2.py index f146adc9..3e437ef7 100644 --- a/testinfra/modules/iproute2.py +++ b/testinfra/modules/iproute2.py @@ -48,7 +48,7 @@ def __repr__(self): def _ip(self): ip_cmd = self.find_command("ip") if self.namespace is not None: - ip_cmd = f"{ip_cmd} netns exec {self.namespace} {ip_cmd}" + ip_cmd = f"{ip_cmd} -n {self.namespace}" if self.family is not None: ip_cmd = f"{ip_cmd} -f {self.family}" return ip_cmd @@ -57,11 +57,25 @@ def _ip(self): def exists(self): return self.run_test("{} -V".format(self._ip)).rc == 0 - def addresses(self): + def addresses(self, address=None, ifname=None, local=None): """Return the addresses associated with interfaces""" cmd = f"{self._ip} --json address show" out = self.check_output(cmd) - return json.loads(out) + j = json.loads(out) + o = [] + if address is None and ifname is None and local is None: + # no filters, bail out early + return j + if address is not None: + [o.append(x) for x in j if x["address"] == address] + if ifname is not None: + [o.append(x) for x in j if x["ifname"] == ifname] + if local is not None: + for x in j: + for addr in x["addr_info"]: # multiple IPs in an interface + if addr["local"] == local: + o.append(x) + return o def links(self): """Return links and their state""" @@ -69,15 +83,81 @@ def links(self): out = self.check_output(cmd) return json.loads(out) - def routes(self): + def routes( + self, table="all", device=None, scope=None, proto=None, src=None, metric=None + ): """Return the routes installed""" - cmd = f"{self._ip} --json route show table all" + cmd = f"{self._ip} --json route show " + options = [] + if table is not None: + options += ["table", table] + if device is not None: + options += ["dev", device] + if scope is not None: + options += ["scope", scope] + if proto is not None: + options += ["proto", proto] + if src is not None: + options += ["src", src] + if metric is not None: + options += ["metric", metric] + + cmd += " ".join(options) out = self.check_output(cmd) return json.loads(out) - def rules(self): + def rules( + self, + src=None, + to=None, + tos=None, + fwmark=None, + iif=None, + oif=None, + pref=None, + uidrange=None, + ipproto=None, + sport=None, + dport=None, + ): """Return the rules our routing policy consists of""" - cmd = f"{self._ip} --json rule show" + cmd = f"{self._ip} --json rule show " + + options = [] + if src is not None: + options += ["from", src] + + if to is not None: + options += ["to", to] + + if tos is not None: + options += ["tos", tos] + + if fwmark is not None: + options += ["fwmark", fwmark] + + if iif is not None: + options += ["iif", iif] + + if oif is not None: + options += ["oif", oif] + + if pref is not None: + options += ["pref", pref] + + if uidrange is not None: + options += ["uidrange", uidrange] + + if ipproto is not None: + options += ["ipproto", ipproto] + + if sport is not None: + options += ["sport", sport] + + if dport is not None: + options += ["dport", dport] + + cmd += " ".join(options) out = self.check_output(cmd) return json.loads(out) From 35d568c2d7a0d7c807feb5e8f4cb9461066fcfa0 Mon Sep 17 00:00:00 2001 From: Costas Drogos Date: Thu, 31 Aug 2023 00:12:18 +0300 Subject: [PATCH 05/10] iproute2: filtering in tunnels, more tests for netns & tunnels --- test/test_modules.py | 18 +++++++++++++++++- testinfra/modules/iproute2.py | 10 ++++++++-- 2 files changed, 25 insertions(+), 3 deletions(-) diff --git a/test/test_modules.py b/test/test_modules.py index 15c3004d..6a37acc2 100644 --- a/test/test_modules.py +++ b/test/test_modules.py @@ -707,7 +707,16 @@ def test_iproute2_tunnels(host): assert host.iproute2.exists tunnels = host.iproute2.tunnels() - assert len(tunnels) == 0 + assert len(tunnels) > 0 + + cmd = host.run("ip tunnel add test mode ipip remote 127.0.0.1") + assert cmd.exit_status == 0, f"{cmd.stdout}\n{cmd.stderr}" + + tunnels = host.iproute2.tunnels(ifname="test") + assert len(tunnels) > 0 + assert tunnels[0].get("ifname") == "test" + assert tunnels[0].get("mode") == "ip/ip" + assert tunnels[0].get("remote") == "127.0.0.1" def test_iproute2_vrfs(host): @@ -722,3 +731,10 @@ def test_iproute2_netns(host): namespaces = host.iproute2.netns() assert len(namespaces) == 0 + + cmd = host.run("ip netns add test") + assert cmd.exit_status == 0, f"{cmd.stdout}\n{cmd.stderr}" + + namespaces = host.iproute2.netns() + assert len(namespaces) == 1 + assert namespaces[0].get("name") == "test" diff --git a/testinfra/modules/iproute2.py b/testinfra/modules/iproute2.py index 3e437ef7..e5f150d0 100644 --- a/testinfra/modules/iproute2.py +++ b/testinfra/modules/iproute2.py @@ -161,9 +161,15 @@ def rules( out = self.check_output(cmd) return json.loads(out) - def tunnels(self): + def tunnels(self, ifname=None): """Return all configured tunnels""" - cmd = f"{self._ip} --json tunnel show" + cmd = f"{self._ip} --json tunnel show " + + options = [] + if ifname is not None: + options += [ifname] + + cmd += " ".join(options) out = self.check_output(cmd) return json.loads(out) From d187821f9f835b00b3277127db9802fe0dc74f3b Mon Sep 17 00:00:00 2001 From: Costas Drogos Date: Wed, 30 Aug 2023 23:51:57 +0200 Subject: [PATCH 06/10] iproute2: add some documentation --- doc/source/modules.rst | 4 ++ testinfra/modules/iproute2.py | 75 +++++++++++++++++++++++++---------- 2 files changed, 59 insertions(+), 20 deletions(-) diff --git a/doc/source/modules.rst b/doc/source/modules.rst index 2a5b001f..9d6d7106 100644 --- a/doc/source/modules.rst +++ b/doc/source/modules.rst @@ -50,6 +50,10 @@ host :class:`testinfra.modules.interface.Interface` class + .. attribute:: iproute2 + + :class:`testinfra.modules.iproute2.IProute2` class + .. attribute:: iptables :class:`testinfra.modules.iptables.Iptables` class diff --git a/testinfra/modules/iproute2.py b/testinfra/modules/iproute2.py index e5f150d0..6401be05 100644 --- a/testinfra/modules/iproute2.py +++ b/testinfra/modules/iproute2.py @@ -19,21 +19,10 @@ class IProute2(InstanceModule): """Test network configuration via iproute2 commands - >>> host.iproute2.rules() + Optional arguments: + - family: force iproute2 tools to use a specific protocol family + - namespace: execute iproute2 tools inside the provided namespace - host.ip.rules(from,to,tos,fwmark,iif,oif,pref, uidrange, ipproto, sport, dport) - host.ip.routes(table, device, scope, proto, src, metric) - host.ip.links() - host.ip.addresses() - host.ip.tunnels() - - Optionally, the protocol family can be provided to reduce the number of routes returned: - >>> host.iproute2.routes("inet6", table="main") - ...FIX - - Optionally, this can work inside a different network namespace: - >>> host.iproute2.routes("inet6", "vpn") - ...FIX """ def __init__(self, family=None, namespace=None): @@ -58,7 +47,14 @@ def exists(self): return self.run_test("{} -V".format(self._ip)).rc == 0 def addresses(self, address=None, ifname=None, local=None): - """Return the addresses associated with interfaces""" + """Return the addresses associated with interfaces + + Optionally, results can be filtered by: + - address + - ifname + - local + + """ cmd = f"{self._ip} --json address show" out = self.check_output(cmd) j = json.loads(out) @@ -86,7 +82,20 @@ def links(self): def routes( self, table="all", device=None, scope=None, proto=None, src=None, metric=None ): - """Return the routes installed""" + """Returns the routes installed + + Optionally, routes returned can be filtered with the following + selectors. This can be useful in busy routing tables. + + Selectors: + - table + - device (maps to ip-route's 'dev' selector) + - scope + - proto + - src + - metric + + """ cmd = f"{self._ip} --json route show " options = [] if table is not None: @@ -120,7 +129,25 @@ def rules( sport=None, dport=None, ): - """Return the rules our routing policy consists of""" + """Returns the rules our routing policy consists of + + Optionally, rules returned can be filtered with the following + selectors. This can be useful in busy rulesets. + + Selectors: + - src (maps to ip-rule's 'from' selector) + - to + - tos + - fwmark + - iif + - oif + - pref + - uidrange + - ipproto + - sport + - dport + + """ cmd = f"{self._ip} --json rule show " options = [] @@ -162,7 +189,15 @@ def rules( return json.loads(out) def tunnels(self, ifname=None): - """Return all configured tunnels""" + """Returns all configured tunnels + + Optionally, tunnels returned can be filtered with the interface name. + This can be faster in busy tunnel installations. + + Selectors: + - ifname + + """ cmd = f"{self._ip} --json tunnel show " options = [] @@ -174,13 +209,13 @@ def tunnels(self, ifname=None): return json.loads(out) def vrfs(self): - """Return all configured vrfs""" + """Returns all configured vrfs""" cmd = f"{self._ip} --json vrf show" out = self.check_output(cmd) return json.loads(out) def netns(self): - """Return all configured network namespaces""" + """Returns all configured network namespaces""" cmd = f"{self._ip} --json netns show" out = self.check_output(cmd) if not out: # ip netns returns null instead of [] in json mode From eb2b08b367ad49bba1170de6279086e3728d716b Mon Sep 17 00:00:00 2001 From: Costas Drogos Date: Thu, 31 Aug 2023 01:49:19 +0200 Subject: [PATCH 07/10] iproute2: add bridge --- test/test_modules.py | 26 ++++++++++++++++++++++++++ testinfra/modules/iproute2.py | 24 ++++++++++++++++++++++++ 2 files changed, 50 insertions(+) diff --git a/test/test_modules.py b/test/test_modules.py index 6a37acc2..cfad279f 100644 --- a/test/test_modules.py +++ b/test/test_modules.py @@ -738,3 +738,29 @@ def test_iproute2_netns(host): namespaces = host.iproute2.netns() assert len(namespaces) == 1 assert namespaces[0].get("name") == "test" + +def test_iproute2_bridge_vlan(host): + assert host.iproute2.bridge_exists + + vlans = host.iproute2.bridge_vlan() + assert len(vlans) == 0 + +def test_iproute2_bridge_fdb(host): + assert host.iproute2.bridge_exists + + fdb = host.iproute2.bridge_fdb() + assert len(fdb) > 0 + +def test_iproute2_bridge_mdb(host): + assert host.iproute2.bridge_exists + + mdb = host.iproute2.bridge_mdb() + assert len(mdb) == 1 + assert len(mdb[0].get("mdb")) == 0 + assert len(mdb[0].get("router")) == 0 + +def test_iproute2_bridge_link(host): + assert host.iproute2.bridge_exists + + links = host.iproute2.bridge_link() + assert len(links) == 0 diff --git a/testinfra/modules/iproute2.py b/testinfra/modules/iproute2.py index 6401be05..b88a539a 100644 --- a/testinfra/modules/iproute2.py +++ b/testinfra/modules/iproute2.py @@ -221,3 +221,27 @@ def netns(self): if not out: # ip netns returns null instead of [] in json mode return json.loads("[]\n") return json.loads(out) + + def bridge_vlan(self): + """Returns all configured vlans""" + cmd = f"{self._bridge} -json vlan show" + out = self.check_output(cmd) + return json.loads(out) + + def bridge_fdb(self): + """Returns all configured fdb entries""" + cmd = f"{self._bridge} -json fdb show" + out = self.check_output(cmd) + return json.loads(out) + + def bridge_mdb(self): + """Returns all configured mdb entries""" + cmd = f"{self._bridge} -json mdb show" + out = self.check_output(cmd) + return json.loads(out) + + def bridge_link(self): + """Returns all configured links""" + cmd = f"{self._bridge} -json link show" + out = self.check_output(cmd) + return json.loads(out) From 09d4cb0e8d2b79892e1551e96542e49adaee0578 Mon Sep 17 00:00:00 2001 From: Costas Drogos Date: Thu, 31 Aug 2023 01:54:59 +0200 Subject: [PATCH 08/10] iproute2: updated docs --- doc/source/modules.rst | 7 + testinfra/modules/iproute2.py | 234 +++++++++++++++++++++++++++------- 2 files changed, 193 insertions(+), 48 deletions(-) diff --git a/doc/source/modules.rst b/doc/source/modules.rst index 9d6d7106..562fa529 100644 --- a/doc/source/modules.rst +++ b/doc/source/modules.rst @@ -180,6 +180,13 @@ Interface :undoc-members: :exclude-members: get_module_class +IProute2 +~~~~~~~~~ + +.. autoclass:: testinfra.modules.iproute2.IProute2 + :members: + :undoc-members: + Iptables ~~~~~~~~~ diff --git a/testinfra/modules/iproute2.py b/testinfra/modules/iproute2.py index b88a539a..a3a7ec77 100644 --- a/testinfra/modules/iproute2.py +++ b/testinfra/modules/iproute2.py @@ -19,9 +19,27 @@ class IProute2(InstanceModule): """Test network configuration via iproute2 commands - Optional arguments: - - family: force iproute2 tools to use a specific protocol family - - namespace: execute iproute2 tools inside the provided namespace + Currently supported: + + * ip-address + * ip-link + * ip-route + * ip-rule + * ip-vrf + * ip-tunnel + * ip-netns + * bridge vlan + * bridge link + * bridge fdb + * bridge mdb + + Optional module-level arguments can also be provided to contro; execution: + + * family: force iproute2 tools to use a specific protocol family + >>> host.iproute2(family="inet").addresses() + + * namespace: execute iproute2 tools inside the provided namespace + >>> host.iproute2(namespace="test").addresses() """ @@ -42,17 +60,68 @@ def _ip(self): ip_cmd = f"{ip_cmd} -f {self.family}" return ip_cmd + @functools.cached_property + def _bridge(self): + bridge_cmd = self.find_command("bridge") + if self.namespace is not None: + bridge_cmd = f"{bridge_cmd} -n {self.namespace}" + return bridge_cmd + @property def exists(self): + """Returns True if ip -V succeeds + + >>> host.iproute2.exists + True + + """ return self.run_test("{} -V".format(self._ip)).rc == 0 - def addresses(self, address=None, ifname=None, local=None): - """Return the addresses associated with interfaces + @property + def bridge_exists(self): + """Returns True if bridge -V succeeds + + >>> host.iproute2.bridge_exists + True - Optionally, results can be filtered by: - - address - - ifname - - local + """ + return self.run_test("{} -V".format(self._bridge)).rc == 0 + + def addresses(self, address=None, ifname=None, local=None): + """Returns the addresses associated with interfaces + + >>> host.iproute2.addresses() + [{'ifindex': 1, + 'ifname': 'lo', + 'flags': ['LOOPBACK', 'UP', 'LOWER_UP'], + 'mtu': 65536, + 'qdisc': 'noqueue', + 'operstate': 'UNKNOWN', + 'group': 'default', + 'txqlen': 1000, + 'link_type': 'loopback', + 'address': '00:00:00:00:00:00', + 'broadcast': '00:00:00:00:00:00', + 'addr_info': [{'family': 'inet', + 'local': '127.0.0.1', + 'prefixlen': 8, + 'scope': 'host', + 'label': 'lo', + 'valid_life_time': 4294967295, + 'preferred_life_time': 4294967295}, + {'family': 'inet6', + 'local': '::1', + 'prefixlen': 128, + 'scope': 'host', + 'noprefixroute': True, + 'valid_life_time': 4294967295, + 'preferred_life_time': 4294967295}]}] + + Optionally, results can be filtered with the following selectors: + + * address + * ifname + * local """ cmd = f"{self._ip} --json address show" @@ -74,7 +143,23 @@ def addresses(self, address=None, ifname=None, local=None): return o def links(self): - """Return links and their state""" + """Returns links and their state. + + >>> host.iproute2.links() + [{'ifindex': 1, + 'ifname': 'lo', + 'flags': ['LOOPBACK', 'UP', 'LOWER_UP'], + 'mtu': 65536, + 'qdisc': 'noqueue', + 'operstate': 'UNKNOWN', + 'linkmode': 'DEFAULT', + 'group': 'default', + 'txqlen': 1000, + 'link_type': 'loopback', + 'address': '00:00:00:00:00:00', + 'broadcast': '00:00:00:00:00:00'}] + + """ cmd = f"{self._ip} --json link show" out = self.check_output(cmd) return json.loads(out) @@ -82,18 +167,32 @@ def links(self): def routes( self, table="all", device=None, scope=None, proto=None, src=None, metric=None ): - """Returns the routes installed - - Optionally, routes returned can be filtered with the following - selectors. This can be useful in busy routing tables. - - Selectors: - - table - - device (maps to ip-route's 'dev' selector) - - scope - - proto - - src - - metric + """Returns the routes installed in *all* routing tables. + + >>> host.iproute2.routes() + [{'dst': '169.254.0.0/16', + 'dev': 'wlp4s0', + 'scope': 'link', + 'metric': 1000, + 'flags': []}, + {'type': 'multicast', + 'dst': 'ff00::/8', + 'dev': 'wlp4s0', + 'table': 'local', + 'protocol': 'kernel', + 'metric': 256, + 'flags': [], + 'pref': 'medium'}] + + Optionally, routes returned can be filtered with the following + selectors. This can be useful in busy routing tables. + + * table + * device (maps to ip-route's 'dev' selector) + * scope + * proto + * src + * metric """ cmd = f"{self._ip} --json route show " @@ -129,23 +228,28 @@ def rules( sport=None, dport=None, ): - """Returns the rules our routing policy consists of - - Optionally, rules returned can be filtered with the following - selectors. This can be useful in busy rulesets. - - Selectors: - - src (maps to ip-rule's 'from' selector) - - to - - tos - - fwmark - - iif - - oif - - pref - - uidrange - - ipproto - - sport - - dport + """Returns the rules our routing policy consists of. + + >>> host.iproute2.rules() + [{'priority': 0, 'src': 'all', 'table': 'local'}, + {'priority': 32765, 'src': '1.2.3.4', 'table': '123'}, + {'priority': 32766, 'src': 'all', 'table': 'main'}, + {'priority': 32767, 'src': 'all', 'table': 'default'}] + + Optionally, rules returned can be filtered with the following + selectors. This can be useful in busy rulesets. + + * src (maps to ip-rule's 'from' selector) + * to + * tos + * fwmark + * iif + * oif + * pref + * uidrange + * ipproto + * sport + * dport """ cmd = f"{self._ip} --json rule show " @@ -191,11 +295,16 @@ def rules( def tunnels(self, ifname=None): """Returns all configured tunnels - Optionally, tunnels returned can be filtered with the interface name. - This can be faster in busy tunnel installations. + >>> host.iproute2.tunnels() + [{'ifname': 'test1', + 'mode': 'ip/ip', + 'remote': '127.0.0.2', + 'local': '0.0.0.0'}] + + Optionally, tunnels returned can be filtered with the interface name. + This can be faster in busy tunnel installations. - Selectors: - - ifname + * ifname """ cmd = f"{self._ip} --json tunnel show " @@ -215,7 +324,12 @@ def vrfs(self): return json.loads(out) def netns(self): - """Returns all configured network namespaces""" + """Returns all configured network namespaces + + >>> host.iproute2.netns() + [{'name': 'test'}] + """ + cmd = f"{self._ip} --json netns show" out = self.check_output(cmd) if not out: # ip netns returns null instead of [] in json mode @@ -223,25 +337,49 @@ def netns(self): return json.loads(out) def bridge_vlan(self): - """Returns all configured vlans""" + """Returns all configured vlans + + >>> host.iproute2.bridge_vlan() + [] + """ + cmd = f"{self._bridge} -json vlan show" out = self.check_output(cmd) return json.loads(out) def bridge_fdb(self): - """Returns all configured fdb entries""" + """Returns all configured fdb entries + + >>> host.iproute2.bridge_fdb() + [{'mac': '33:33:00:00:00:01', + 'ifname': 'enp0s31f6', + 'flags': ['self'], + 'state': 'permanent'}] + """ + cmd = f"{self._bridge} -json fdb show" out = self.check_output(cmd) return json.loads(out) def bridge_mdb(self): - """Returns all configured mdb entries""" + """Returns all configured mdb entries + + >>> host.iproute2.bridge_mdb() + [{'mdb': [], 'router': {}}] + + """ + cmd = f"{self._bridge} -json mdb show" out = self.check_output(cmd) return json.loads(out) def bridge_link(self): - """Returns all configured links""" + """Returns all configured links + + >>> host.iproute2.bridge_link() + [] + """ + cmd = f"{self._bridge} -json link show" out = self.check_output(cmd) return json.loads(out) From fb3301d8ce118e13ffd4d8f0ba1b184613214077 Mon Sep 17 00:00:00 2001 From: Costas Drogos Date: Thu, 31 Aug 2023 02:15:38 +0200 Subject: [PATCH 09/10] iproute2: convert to Module from InstanceModule --- doc/source/modules.rst | 2 +- test/test_modules.py | 4 + testinfra/modules/iproute2.py | 263 +++++++++++++++++++++++++++++++--- 3 files changed, 245 insertions(+), 24 deletions(-) diff --git a/doc/source/modules.rst b/doc/source/modules.rst index 562fa529..182c4652 100644 --- a/doc/source/modules.rst +++ b/doc/source/modules.rst @@ -186,7 +186,7 @@ IProute2 .. autoclass:: testinfra.modules.iproute2.IProute2 :members: :undoc-members: - + :exclude-members: get_module_class Iptables ~~~~~~~~~ diff --git a/test/test_modules.py b/test/test_modules.py index cfad279f..1bbf2d45 100644 --- a/test/test_modules.py +++ b/test/test_modules.py @@ -739,18 +739,21 @@ def test_iproute2_netns(host): assert len(namespaces) == 1 assert namespaces[0].get("name") == "test" + def test_iproute2_bridge_vlan(host): assert host.iproute2.bridge_exists vlans = host.iproute2.bridge_vlan() assert len(vlans) == 0 + def test_iproute2_bridge_fdb(host): assert host.iproute2.bridge_exists fdb = host.iproute2.bridge_fdb() assert len(fdb) > 0 + def test_iproute2_bridge_mdb(host): assert host.iproute2.bridge_exists @@ -759,6 +762,7 @@ def test_iproute2_bridge_mdb(host): assert len(mdb[0].get("mdb")) == 0 assert len(mdb[0].get("router")) == 0 + def test_iproute2_bridge_link(host): assert host.iproute2.bridge_exists diff --git a/testinfra/modules/iproute2.py b/testinfra/modules/iproute2.py index a3a7ec77..4e17ba66 100644 --- a/testinfra/modules/iproute2.py +++ b/testinfra/modules/iproute2.py @@ -13,11 +13,11 @@ import functools import json -from testinfra.modules.base import InstanceModule +from testinfra.modules.base import Module -class IProute2(InstanceModule): - """Test network configuration via iproute2 commands +class IProute2(Module): + """Tests network configuration via iproute2 commands Currently supported: @@ -33,12 +33,14 @@ class IProute2(InstanceModule): * bridge fdb * bridge mdb - Optional module-level arguments can also be provided to contro; execution: + Optional module-level arguments can also be provided to control execution: + + * **family**: force iproute2 tools to use a specific protocol family - * family: force iproute2 tools to use a specific protocol family >>> host.iproute2(family="inet").addresses() - * namespace: execute iproute2 tools inside the provided namespace + * **namespace**: execute iproute2 tools inside the provided namespace + >>> host.iproute2(namespace="test").addresses() """ @@ -51,6 +53,221 @@ def __init__(self, family=None, namespace=None): def __repr__(self): return "" + @classmethod + def get_module_class(cls, host): + if host.system_info.type == "linux": + return LinuxIProute2 + raise NotImplementedError + + @property + def exists(self): + """Returns True if ip -V succeeds + + >>> host.iproute2.exists + True + + """ + + @property + def bridge_exists(self): + """Returns True if bridge -V succeeds + + >>> host.iproute2.bridge_exists + True + + """ + + def addresses(self, address=None, ifname=None, local=None): + """Returns the addresses associated with interfaces + + >>> host.iproute2.addresses() + [{'ifindex': 1, + 'ifname': 'lo', + 'flags': ['LOOPBACK', 'UP', 'LOWER_UP'], + 'mtu': 65536, + 'qdisc': 'noqueue', + 'operstate': 'UNKNOWN', + 'group': 'default', + 'txqlen': 1000, + 'link_type': 'loopback', + 'address': '00:00:00:00:00:00', + 'broadcast': '00:00:00:00:00:00', + 'addr_info': [{'family': 'inet', + 'local': '127.0.0.1', + 'prefixlen': 8, + 'scope': 'host', + 'label': 'lo', + 'valid_life_time': 4294967295, + 'preferred_life_time': 4294967295}, + {'family': 'inet6', + 'local': '::1', + 'prefixlen': 128, + 'scope': 'host', + 'noprefixroute': True, + 'valid_life_time': 4294967295, + 'preferred_life_time': 4294967295}]}] + + Optionally, results can be filtered with the following selectors: + + * address + * ifname + * local + + """ + + def links(self): + """Returns links and their state. + + >>> host.iproute2.links() + [{'ifindex': 1, + 'ifname': 'lo', + 'flags': ['LOOPBACK', 'UP', 'LOWER_UP'], + 'mtu': 65536, + 'qdisc': 'noqueue', + 'operstate': 'UNKNOWN', + 'linkmode': 'DEFAULT', + 'group': 'default', + 'txqlen': 1000, + 'link_type': 'loopback', + 'address': '00:00:00:00:00:00', + 'broadcast': '00:00:00:00:00:00'}] + + """ + + def routes( + self, table="all", device=None, scope=None, proto=None, src=None, metric=None + ): + """Returns the routes installed in *all* routing tables. + + >>> host.iproute2.routes() + [{'dst': '169.254.0.0/16', + 'dev': 'wlp4s0', + 'scope': 'link', + 'metric': 1000, + 'flags': []}, + {'type': 'multicast', + 'dst': 'ff00::/8', + 'dev': 'wlp4s0', + 'table': 'local', + 'protocol': 'kernel', + 'metric': 256, + 'flags': [], + 'pref': 'medium'}] + + Optionally, routes returned can be filtered with the following + selectors. This can be useful in busy routing tables. + + * table + * device (maps to ip-route's 'dev' selector) + * scope + * proto + * src + * metric + + """ + + def rules( + self, + src=None, + to=None, + tos=None, + fwmark=None, + iif=None, + oif=None, + pref=None, + uidrange=None, + ipproto=None, + sport=None, + dport=None, + ): + """Returns the rules our routing policy consists of. + + >>> host.iproute2.rules() + [{'priority': 0, 'src': 'all', 'table': 'local'}, + {'priority': 32765, 'src': '1.2.3.4', 'table': '123'}, + {'priority': 32766, 'src': 'all', 'table': 'main'}, + {'priority': 32767, 'src': 'all', 'table': 'default'}] + + Optionally, rules returned can be filtered with the following + selectors. This can be useful in busy rulesets. + + * src (maps to ip-rule's 'from' selector) + * to + * tos + * fwmark + * iif + * oif + * pref + * uidrange + * ipproto + * sport + * dport + + """ + + def tunnels(self, ifname=None): + """Returns all configured tunnels + + >>> host.iproute2.tunnels() + [{'ifname': 'test1', + 'mode': 'ip/ip', + 'remote': '127.0.0.2', + 'local': '0.0.0.0'}] + + Optionally, tunnels returned can be filtered with the interface name. + This can be faster in busy tunnel installations. + + * ifname + + """ + + def vrfs(self): + """Returns all configured vrfs""" + cmd = f"{self._ip} --json vrf show" + out = self.check_output(cmd) + return json.loads(out) + + def netns(self): + """Returns all configured network namespaces + + >>> host.iproute2.netns() + [{'name': 'test'}] + """ + + def bridge_vlan(self): + """Returns all configured vlans + + >>> host.iproute2.bridge_vlan() + [] + """ + + def bridge_fdb(self): + """Returns all configured fdb entries + + >>> host.iproute2.bridge_fdb() + [{'mac': '33:33:00:00:00:01', + 'ifname': 'enp0s31f6', + 'flags': ['self'], + 'state': 'permanent'}] + """ + + def bridge_mdb(self): + """Returns all configured mdb entries + + >>> host.iproute2.bridge_mdb() + [{'mdb': [], 'router': {}}] + + """ + + def bridge_link(self): + """Returns all configured links + + >>> host.iproute2.bridge_link() + [] + """ + + +class LinuxIProute2(IProute2): @functools.cached_property def _ip(self): ip_cmd = self.find_command("ip") @@ -326,9 +543,9 @@ def vrfs(self): def netns(self): """Returns all configured network namespaces - >>> host.iproute2.netns() - [{'name': 'test'}] - """ + >>> host.iproute2.netns() + [{'name': 'test'}] + """ cmd = f"{self._ip} --json netns show" out = self.check_output(cmd) @@ -339,9 +556,9 @@ def netns(self): def bridge_vlan(self): """Returns all configured vlans - >>> host.iproute2.bridge_vlan() - [] - """ + >>> host.iproute2.bridge_vlan() + [] + """ cmd = f"{self._bridge} -json vlan show" out = self.check_output(cmd) @@ -350,11 +567,11 @@ def bridge_vlan(self): def bridge_fdb(self): """Returns all configured fdb entries - >>> host.iproute2.bridge_fdb() - [{'mac': '33:33:00:00:00:01', - 'ifname': 'enp0s31f6', - 'flags': ['self'], - 'state': 'permanent'}] + >>> host.iproute2.bridge_fdb() + [{'mac': '33:33:00:00:00:01', + 'ifname': 'enp0s31f6', + 'flags': ['self'], + 'state': 'permanent'}] """ cmd = f"{self._bridge} -json fdb show" @@ -364,10 +581,10 @@ def bridge_fdb(self): def bridge_mdb(self): """Returns all configured mdb entries - >>> host.iproute2.bridge_mdb() - [{'mdb': [], 'router': {}}] + >>> host.iproute2.bridge_mdb() + [{'mdb': [], 'router': {}}] - """ + """ cmd = f"{self._bridge} -json mdb show" out = self.check_output(cmd) @@ -376,9 +593,9 @@ def bridge_mdb(self): def bridge_link(self): """Returns all configured links - >>> host.iproute2.bridge_link() - [] - """ + >>> host.iproute2.bridge_link() + [] + """ cmd = f"{self._bridge} -json link show" out = self.check_output(cmd) From 1f7b1aff67f92e38b64d66f7584fba8293564028 Mon Sep 17 00:00:00 2001 From: Costas Drogos Date: Thu, 31 Aug 2023 10:59:41 +0200 Subject: [PATCH 10/10] iproute2: fix tests after moving Module --- test/test_modules.py | 42 +++++++++++++++------------ testinfra/modules/iproute2.py | 54 ++++++++++++++++++++--------------- 2 files changed, 55 insertions(+), 41 deletions(-) diff --git a/test/test_modules.py b/test/test_modules.py index 1bbf2d45..9cb576f1 100644 --- a/test/test_modules.py +++ b/test/test_modules.py @@ -650,18 +650,22 @@ def test_interface(host, family): assert default_itf.exists -def test_iproute2_addresses(host): +@pytest.mark.parametrize( + "family", + ["inet", None], +) +def test_iproute2_addresses(host, family): assert host.iproute2.exists - addresses = host.iproute2.addresses() + addresses = host.iproute2(family=family).addresses() assert len(addresses) > 0 assert addresses[0].get("ifname") and addresses[0].get("ifindex") - filtered_addresses = host.iproute2.addresses(ifname="lo") + filtered_addresses = host.iproute2(family=family).addresses(ifname="lo") assert filtered_addresses[0].get("ifname") == "lo" and len(filtered_addresses) == 1 - filtered_addresses2 = host.iproute2.addresses(local="127.0.0.1") + filtered_addresses2 = host.iproute2(family=family).addresses(local="127.0.0.1") assert ( filtered_addresses2[0].get("ifname") == "lo" and len(filtered_addresses2) == 1 ) @@ -670,7 +674,7 @@ def test_iproute2_addresses(host): def test_iproute2_links(host): assert host.iproute2.exists - links = host.iproute2.links() + links = host.iproute2().links() assert len(links) > 0 and len(links) < 4 assert links[0].get("ifname") and links[0].get("ifindex") @@ -679,17 +683,19 @@ def test_iproute2_links(host): def test_iproute2_routes(host): assert host.iproute2.exists - routes = host.iproute2.routes() + routes = host.iproute2().routes() assert len(routes) > 0 - filtered_routes = host.iproute2.routes(table="local", scope="host", src="127.0.0.1") + filtered_routes = host.iproute2().routes( + table="local", scope="host", src="127.0.0.1" + ) assert filtered_routes[0].get("protocol") == "kernel" and len(filtered_routes) > 1 def test_iproute2_rules(host): assert host.iproute2.exists - rules = host.iproute2.rules() + rules = host.iproute2().rules() assert len(rules) > 0 and len(rules) < 4 assert rules[0].get("priority") == 0 assert rules[0].get("src") == "all" @@ -698,7 +704,7 @@ def test_iproute2_rules(host): cmd = host.run("ip rule add from 1.2.3.4/32 table 123") assert cmd.exit_status == 0, f"{cmd.stdout}\n{cmd.stderr}" - rules_123 = host.iproute2.rules(src="1.2.3.4/32") + rules_123 = host.iproute2().rules(src="1.2.3.4/32") assert len(rules_123) > 0 assert rules_123[0].get("src") == "1.2.3.4" @@ -706,13 +712,13 @@ def test_iproute2_rules(host): def test_iproute2_tunnels(host): assert host.iproute2.exists - tunnels = host.iproute2.tunnels() + tunnels = host.iproute2().tunnels() assert len(tunnels) > 0 cmd = host.run("ip tunnel add test mode ipip remote 127.0.0.1") assert cmd.exit_status == 0, f"{cmd.stdout}\n{cmd.stderr}" - tunnels = host.iproute2.tunnels(ifname="test") + tunnels = host.iproute2().tunnels(ifname="test") assert len(tunnels) > 0 assert tunnels[0].get("ifname") == "test" assert tunnels[0].get("mode") == "ip/ip" @@ -722,20 +728,20 @@ def test_iproute2_tunnels(host): def test_iproute2_vrfs(host): assert host.iproute2.exists - vrfs = host.iproute2.vrfs() + vrfs = host.iproute2().vrfs() assert len(vrfs) == 0 def test_iproute2_netns(host): assert host.iproute2.exists - namespaces = host.iproute2.netns() + namespaces = host.iproute2().netns() assert len(namespaces) == 0 cmd = host.run("ip netns add test") assert cmd.exit_status == 0, f"{cmd.stdout}\n{cmd.stderr}" - namespaces = host.iproute2.netns() + namespaces = host.iproute2().netns() assert len(namespaces) == 1 assert namespaces[0].get("name") == "test" @@ -743,21 +749,21 @@ def test_iproute2_netns(host): def test_iproute2_bridge_vlan(host): assert host.iproute2.bridge_exists - vlans = host.iproute2.bridge_vlan() + vlans = host.iproute2().bridge_vlan() assert len(vlans) == 0 def test_iproute2_bridge_fdb(host): assert host.iproute2.bridge_exists - fdb = host.iproute2.bridge_fdb() + fdb = host.iproute2().bridge_fdb() assert len(fdb) > 0 def test_iproute2_bridge_mdb(host): assert host.iproute2.bridge_exists - mdb = host.iproute2.bridge_mdb() + mdb = host.iproute2().bridge_mdb() assert len(mdb) == 1 assert len(mdb[0].get("mdb")) == 0 assert len(mdb[0].get("router")) == 0 @@ -766,5 +772,5 @@ def test_iproute2_bridge_mdb(host): def test_iproute2_bridge_link(host): assert host.iproute2.bridge_exists - links = host.iproute2.bridge_link() + links = host.iproute2().bridge_link() assert len(links) == 0 diff --git a/testinfra/modules/iproute2.py b/testinfra/modules/iproute2.py index 4e17ba66..436a8f8d 100644 --- a/testinfra/modules/iproute2.py +++ b/testinfra/modules/iproute2.py @@ -80,7 +80,7 @@ def bridge_exists(self): def addresses(self, address=None, ifname=None, local=None): """Returns the addresses associated with interfaces - >>> host.iproute2.addresses() + >>> host.iproute2().addresses() [{'ifindex': 1, 'ifname': 'lo', 'flags': ['LOOPBACK', 'UP', 'LOWER_UP'], @@ -114,11 +114,12 @@ def addresses(self, address=None, ifname=None, local=None): * local """ + raise NotImplementedError def links(self): """Returns links and their state. - >>> host.iproute2.links() + >>> host.iproute2().links() [{'ifindex': 1, 'ifname': 'lo', 'flags': ['LOOPBACK', 'UP', 'LOWER_UP'], @@ -133,13 +134,14 @@ def links(self): 'broadcast': '00:00:00:00:00:00'}] """ + raise NotImplementedError def routes( self, table="all", device=None, scope=None, proto=None, src=None, metric=None ): """Returns the routes installed in *all* routing tables. - >>> host.iproute2.routes() + >>> host.iproute2().routes() [{'dst': '169.254.0.0/16', 'dev': 'wlp4s0', 'scope': 'link', @@ -165,6 +167,7 @@ def routes( * metric """ + raise NotImplementedError def rules( self, @@ -182,7 +185,7 @@ def rules( ): """Returns the rules our routing policy consists of. - >>> host.iproute2.rules() + >>> host.iproute2().rules() [{'priority': 0, 'src': 'all', 'table': 'local'}, {'priority': 32765, 'src': '1.2.3.4', 'table': '123'}, {'priority': 32766, 'src': 'all', 'table': 'main'}, @@ -204,11 +207,12 @@ def rules( * dport """ + raise NotImplementedError def tunnels(self, ifname=None): """Returns all configured tunnels - >>> host.iproute2.tunnels() + >>> host.iproute2().tunnels() [{'ifname': 'test1', 'mode': 'ip/ip', 'remote': '127.0.0.2', @@ -220,51 +224,55 @@ def tunnels(self, ifname=None): * ifname """ + raise NotImplementedError def vrfs(self): """Returns all configured vrfs""" - cmd = f"{self._ip} --json vrf show" - out = self.check_output(cmd) - return json.loads(out) + raise NotImplementedError def netns(self): """Returns all configured network namespaces - >>> host.iproute2.netns() + >>> host.iproute2().netns() [{'name': 'test'}] """ + raise NotImplementedError def bridge_vlan(self): """Returns all configured vlans - >>> host.iproute2.bridge_vlan() + >>> host.iproute2().bridge_vlan() [] """ + raise NotImplementedError def bridge_fdb(self): """Returns all configured fdb entries - >>> host.iproute2.bridge_fdb() + >>> host.iproute2().bridge_fdb() [{'mac': '33:33:00:00:00:01', 'ifname': 'enp0s31f6', 'flags': ['self'], 'state': 'permanent'}] """ + raise NotImplementedError def bridge_mdb(self): """Returns all configured mdb entries - >>> host.iproute2.bridge_mdb() + >>> host.iproute2().bridge_mdb() [{'mdb': [], 'router': {}}] """ + raise NotImplementedError def bridge_link(self): """Returns all configured links - >>> host.iproute2.bridge_link() + >>> host.iproute2().bridge_link() [] """ + raise NotImplementedError class LinuxIProute2(IProute2): @@ -307,7 +315,7 @@ def bridge_exists(self): def addresses(self, address=None, ifname=None, local=None): """Returns the addresses associated with interfaces - >>> host.iproute2.addresses() + >>> host.iproute2().addresses() [{'ifindex': 1, 'ifname': 'lo', 'flags': ['LOOPBACK', 'UP', 'LOWER_UP'], @@ -362,7 +370,7 @@ def addresses(self, address=None, ifname=None, local=None): def links(self): """Returns links and their state. - >>> host.iproute2.links() + >>> host.iproute2().links() [{'ifindex': 1, 'ifname': 'lo', 'flags': ['LOOPBACK', 'UP', 'LOWER_UP'], @@ -386,7 +394,7 @@ def routes( ): """Returns the routes installed in *all* routing tables. - >>> host.iproute2.routes() + >>> host.iproute2().routes() [{'dst': '169.254.0.0/16', 'dev': 'wlp4s0', 'scope': 'link', @@ -447,7 +455,7 @@ def rules( ): """Returns the rules our routing policy consists of. - >>> host.iproute2.rules() + >>> host.iproute2().rules() [{'priority': 0, 'src': 'all', 'table': 'local'}, {'priority': 32765, 'src': '1.2.3.4', 'table': '123'}, {'priority': 32766, 'src': 'all', 'table': 'main'}, @@ -512,7 +520,7 @@ def rules( def tunnels(self, ifname=None): """Returns all configured tunnels - >>> host.iproute2.tunnels() + >>> host.iproute2().tunnels() [{'ifname': 'test1', 'mode': 'ip/ip', 'remote': '127.0.0.2', @@ -543,7 +551,7 @@ def vrfs(self): def netns(self): """Returns all configured network namespaces - >>> host.iproute2.netns() + >>> host.iproute2().netns() [{'name': 'test'}] """ @@ -556,7 +564,7 @@ def netns(self): def bridge_vlan(self): """Returns all configured vlans - >>> host.iproute2.bridge_vlan() + >>> host.iproute2().bridge_vlan() [] """ @@ -567,7 +575,7 @@ def bridge_vlan(self): def bridge_fdb(self): """Returns all configured fdb entries - >>> host.iproute2.bridge_fdb() + >>> host.iproute2().bridge_fdb() [{'mac': '33:33:00:00:00:01', 'ifname': 'enp0s31f6', 'flags': ['self'], @@ -581,7 +589,7 @@ def bridge_fdb(self): def bridge_mdb(self): """Returns all configured mdb entries - >>> host.iproute2.bridge_mdb() + >>> host.iproute2().bridge_mdb() [{'mdb': [], 'router': {}}] """ @@ -593,7 +601,7 @@ def bridge_mdb(self): def bridge_link(self): """Returns all configured links - >>> host.iproute2.bridge_link() + >>> host.iproute2().bridge_link() [] """