diff --git a/doc/source/modules.rst b/doc/source/modules.rst index 2a5b001f..182c4652 100644 --- a/doc/source/modules.rst +++ b/doc/source/modules.rst @@ -50,6 +50,10 @@ host :class:`testinfra.modules.interface.Interface` class + .. attribute:: iproute2 + + :class:`testinfra.modules.iproute2.IProute2` class + .. attribute:: iptables :class:`testinfra.modules.iptables.Iptables` class @@ -176,6 +180,13 @@ Interface :undoc-members: :exclude-members: get_module_class +IProute2 +~~~~~~~~~ + +.. autoclass:: testinfra.modules.iproute2.IProute2 + :members: + :undoc-members: + :exclude-members: get_module_class Iptables ~~~~~~~~~ diff --git a/test/test_modules.py b/test/test_modules.py index 3106479f..9cb576f1 100644 --- a/test/test_modules.py +++ b/test/test_modules.py @@ -633,7 +633,7 @@ def test_addr_namespace(host): ) def test_interface(host, family): # exist - assert host.interface("eth0", family=family).exists + assert host.interface("tap0", family=family).exists assert not host.interface("does_not_exist", family=family).exists # addresses addresses = host.interface.default(family).addresses @@ -648,3 +648,129 @@ def test_interface(host, family): default_itf = host.interface.default(family) assert default_itf.name == "eth0" assert default_itf.exists + + +@pytest.mark.parametrize( + "family", + ["inet", None], +) +def test_iproute2_addresses(host, family): + assert host.iproute2.exists + + addresses = host.iproute2(family=family).addresses() + + assert len(addresses) > 0 + assert addresses[0].get("ifname") and addresses[0].get("ifindex") + + filtered_addresses = host.iproute2(family=family).addresses(ifname="lo") + assert filtered_addresses[0].get("ifname") == "lo" and len(filtered_addresses) == 1 + + filtered_addresses2 = host.iproute2(family=family).addresses(local="127.0.0.1") + assert ( + filtered_addresses2[0].get("ifname") == "lo" and len(filtered_addresses2) == 1 + ) + + +def test_iproute2_links(host): + assert host.iproute2.exists + + links = host.iproute2().links() + assert len(links) > 0 and len(links) < 4 + + assert links[0].get("ifname") and links[0].get("ifindex") + + +def test_iproute2_routes(host): + assert host.iproute2.exists + + routes = host.iproute2().routes() + assert len(routes) > 0 + + filtered_routes = host.iproute2().routes( + table="local", scope="host", src="127.0.0.1" + ) + assert filtered_routes[0].get("protocol") == "kernel" and len(filtered_routes) > 1 + + +def test_iproute2_rules(host): + assert host.iproute2.exists + + rules = host.iproute2().rules() + assert len(rules) > 0 and len(rules) < 4 + assert rules[0].get("priority") == 0 + assert rules[0].get("src") == "all" + assert rules[0].get("table") == "local" + + cmd = host.run("ip rule add from 1.2.3.4/32 table 123") + assert cmd.exit_status == 0, f"{cmd.stdout}\n{cmd.stderr}" + + rules_123 = host.iproute2().rules(src="1.2.3.4/32") + assert len(rules_123) > 0 + assert rules_123[0].get("src") == "1.2.3.4" + + +def test_iproute2_tunnels(host): + assert host.iproute2.exists + + tunnels = host.iproute2().tunnels() + assert len(tunnels) > 0 + + cmd = host.run("ip tunnel add test mode ipip remote 127.0.0.1") + assert cmd.exit_status == 0, f"{cmd.stdout}\n{cmd.stderr}" + + tunnels = host.iproute2().tunnels(ifname="test") + assert len(tunnels) > 0 + assert tunnels[0].get("ifname") == "test" + assert tunnels[0].get("mode") == "ip/ip" + assert tunnels[0].get("remote") == "127.0.0.1" + + +def test_iproute2_vrfs(host): + assert host.iproute2.exists + + vrfs = host.iproute2().vrfs() + assert len(vrfs) == 0 + + +def test_iproute2_netns(host): + assert host.iproute2.exists + + namespaces = host.iproute2().netns() + assert len(namespaces) == 0 + + cmd = host.run("ip netns add test") + assert cmd.exit_status == 0, f"{cmd.stdout}\n{cmd.stderr}" + + namespaces = host.iproute2().netns() + assert len(namespaces) == 1 + assert namespaces[0].get("name") == "test" + + +def test_iproute2_bridge_vlan(host): + assert host.iproute2.bridge_exists + + vlans = host.iproute2().bridge_vlan() + assert len(vlans) == 0 + + +def test_iproute2_bridge_fdb(host): + assert host.iproute2.bridge_exists + + fdb = host.iproute2().bridge_fdb() + assert len(fdb) > 0 + + +def test_iproute2_bridge_mdb(host): + assert host.iproute2.bridge_exists + + mdb = host.iproute2().bridge_mdb() + assert len(mdb) == 1 + assert len(mdb[0].get("mdb")) == 0 + assert len(mdb[0].get("router")) == 0 + + +def test_iproute2_bridge_link(host): + assert host.iproute2.bridge_exists + + links = host.iproute2().bridge_link() + assert len(links) == 0 diff --git a/testinfra/modules/__init__.py b/testinfra/modules/__init__.py index 94df79cb..d38aaee3 100644 --- a/testinfra/modules/__init__.py +++ b/testinfra/modules/__init__.py @@ -27,6 +27,7 @@ "group": "group:Group", "interface": "interface:Interface", "iptables": "iptables:Iptables", + "iproute2": "iproute2:IProute2", "mount_point": "mountpoint:MountPoint", "package": "package:Package", "pip": "pip:Pip", diff --git a/testinfra/modules/iproute2.py b/testinfra/modules/iproute2.py new file mode 100644 index 00000000..436a8f8d --- /dev/null +++ b/testinfra/modules/iproute2.py @@ -0,0 +1,610 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import functools +import json + +from testinfra.modules.base import Module + + +class IProute2(Module): + """Tests network configuration via iproute2 commands + + Currently supported: + + * ip-address + * ip-link + * ip-route + * ip-rule + * ip-vrf + * ip-tunnel + * ip-netns + * bridge vlan + * bridge link + * bridge fdb + * bridge mdb + + Optional module-level arguments can also be provided to control execution: + + * **family**: force iproute2 tools to use a specific protocol family + + >>> host.iproute2(family="inet").addresses() + + * **namespace**: execute iproute2 tools inside the provided namespace + + >>> host.iproute2(namespace="test").addresses() + + """ + + def __init__(self, family=None, namespace=None): + self.family = family + self.namespace = namespace + super().__init__() + + def __repr__(self): + return "" + + @classmethod + def get_module_class(cls, host): + if host.system_info.type == "linux": + return LinuxIProute2 + raise NotImplementedError + + @property + def exists(self): + """Returns True if ip -V succeeds + + >>> host.iproute2.exists + True + + """ + + @property + def bridge_exists(self): + """Returns True if bridge -V succeeds + + >>> host.iproute2.bridge_exists + True + + """ + + def addresses(self, address=None, ifname=None, local=None): + """Returns the addresses associated with interfaces + + >>> host.iproute2().addresses() + [{'ifindex': 1, + 'ifname': 'lo', + 'flags': ['LOOPBACK', 'UP', 'LOWER_UP'], + 'mtu': 65536, + 'qdisc': 'noqueue', + 'operstate': 'UNKNOWN', + 'group': 'default', + 'txqlen': 1000, + 'link_type': 'loopback', + 'address': '00:00:00:00:00:00', + 'broadcast': '00:00:00:00:00:00', + 'addr_info': [{'family': 'inet', + 'local': '127.0.0.1', + 'prefixlen': 8, + 'scope': 'host', + 'label': 'lo', + 'valid_life_time': 4294967295, + 'preferred_life_time': 4294967295}, + {'family': 'inet6', + 'local': '::1', + 'prefixlen': 128, + 'scope': 'host', + 'noprefixroute': True, + 'valid_life_time': 4294967295, + 'preferred_life_time': 4294967295}]}] + + Optionally, results can be filtered with the following selectors: + + * address + * ifname + * local + + """ + raise NotImplementedError + + def links(self): + """Returns links and their state. + + >>> host.iproute2().links() + [{'ifindex': 1, + 'ifname': 'lo', + 'flags': ['LOOPBACK', 'UP', 'LOWER_UP'], + 'mtu': 65536, + 'qdisc': 'noqueue', + 'operstate': 'UNKNOWN', + 'linkmode': 'DEFAULT', + 'group': 'default', + 'txqlen': 1000, + 'link_type': 'loopback', + 'address': '00:00:00:00:00:00', + 'broadcast': '00:00:00:00:00:00'}] + + """ + raise NotImplementedError + + def routes( + self, table="all", device=None, scope=None, proto=None, src=None, metric=None + ): + """Returns the routes installed in *all* routing tables. + + >>> host.iproute2().routes() + [{'dst': '169.254.0.0/16', + 'dev': 'wlp4s0', + 'scope': 'link', + 'metric': 1000, + 'flags': []}, + {'type': 'multicast', + 'dst': 'ff00::/8', + 'dev': 'wlp4s0', + 'table': 'local', + 'protocol': 'kernel', + 'metric': 256, + 'flags': [], + 'pref': 'medium'}] + + Optionally, routes returned can be filtered with the following + selectors. This can be useful in busy routing tables. + + * table + * device (maps to ip-route's 'dev' selector) + * scope + * proto + * src + * metric + + """ + raise NotImplementedError + + def rules( + self, + src=None, + to=None, + tos=None, + fwmark=None, + iif=None, + oif=None, + pref=None, + uidrange=None, + ipproto=None, + sport=None, + dport=None, + ): + """Returns the rules our routing policy consists of. + + >>> host.iproute2().rules() + [{'priority': 0, 'src': 'all', 'table': 'local'}, + {'priority': 32765, 'src': '1.2.3.4', 'table': '123'}, + {'priority': 32766, 'src': 'all', 'table': 'main'}, + {'priority': 32767, 'src': 'all', 'table': 'default'}] + + Optionally, rules returned can be filtered with the following + selectors. This can be useful in busy rulesets. + + * src (maps to ip-rule's 'from' selector) + * to + * tos + * fwmark + * iif + * oif + * pref + * uidrange + * ipproto + * sport + * dport + + """ + raise NotImplementedError + + def tunnels(self, ifname=None): + """Returns all configured tunnels + + >>> host.iproute2().tunnels() + [{'ifname': 'test1', + 'mode': 'ip/ip', + 'remote': '127.0.0.2', + 'local': '0.0.0.0'}] + + Optionally, tunnels returned can be filtered with the interface name. + This can be faster in busy tunnel installations. + + * ifname + + """ + raise NotImplementedError + + def vrfs(self): + """Returns all configured vrfs""" + raise NotImplementedError + + def netns(self): + """Returns all configured network namespaces + + >>> host.iproute2().netns() + [{'name': 'test'}] + """ + raise NotImplementedError + + def bridge_vlan(self): + """Returns all configured vlans + + >>> host.iproute2().bridge_vlan() + [] + """ + raise NotImplementedError + + def bridge_fdb(self): + """Returns all configured fdb entries + + >>> host.iproute2().bridge_fdb() + [{'mac': '33:33:00:00:00:01', + 'ifname': 'enp0s31f6', + 'flags': ['self'], + 'state': 'permanent'}] + """ + raise NotImplementedError + + def bridge_mdb(self): + """Returns all configured mdb entries + + >>> host.iproute2().bridge_mdb() + [{'mdb': [], 'router': {}}] + + """ + raise NotImplementedError + + def bridge_link(self): + """Returns all configured links + + >>> host.iproute2().bridge_link() + [] + """ + raise NotImplementedError + + +class LinuxIProute2(IProute2): + @functools.cached_property + def _ip(self): + ip_cmd = self.find_command("ip") + if self.namespace is not None: + ip_cmd = f"{ip_cmd} -n {self.namespace}" + if self.family is not None: + ip_cmd = f"{ip_cmd} -f {self.family}" + return ip_cmd + + @functools.cached_property + def _bridge(self): + bridge_cmd = self.find_command("bridge") + if self.namespace is not None: + bridge_cmd = f"{bridge_cmd} -n {self.namespace}" + return bridge_cmd + + @property + def exists(self): + """Returns True if ip -V succeeds + + >>> host.iproute2.exists + True + + """ + return self.run_test("{} -V".format(self._ip)).rc == 0 + + @property + def bridge_exists(self): + """Returns True if bridge -V succeeds + + >>> host.iproute2.bridge_exists + True + + """ + return self.run_test("{} -V".format(self._bridge)).rc == 0 + + def addresses(self, address=None, ifname=None, local=None): + """Returns the addresses associated with interfaces + + >>> host.iproute2().addresses() + [{'ifindex': 1, + 'ifname': 'lo', + 'flags': ['LOOPBACK', 'UP', 'LOWER_UP'], + 'mtu': 65536, + 'qdisc': 'noqueue', + 'operstate': 'UNKNOWN', + 'group': 'default', + 'txqlen': 1000, + 'link_type': 'loopback', + 'address': '00:00:00:00:00:00', + 'broadcast': '00:00:00:00:00:00', + 'addr_info': [{'family': 'inet', + 'local': '127.0.0.1', + 'prefixlen': 8, + 'scope': 'host', + 'label': 'lo', + 'valid_life_time': 4294967295, + 'preferred_life_time': 4294967295}, + {'family': 'inet6', + 'local': '::1', + 'prefixlen': 128, + 'scope': 'host', + 'noprefixroute': True, + 'valid_life_time': 4294967295, + 'preferred_life_time': 4294967295}]}] + + Optionally, results can be filtered with the following selectors: + + * address + * ifname + * local + + """ + cmd = f"{self._ip} --json address show" + out = self.check_output(cmd) + j = json.loads(out) + o = [] + if address is None and ifname is None and local is None: + # no filters, bail out early + return j + if address is not None: + [o.append(x) for x in j if x["address"] == address] + if ifname is not None: + [o.append(x) for x in j if x["ifname"] == ifname] + if local is not None: + for x in j: + for addr in x["addr_info"]: # multiple IPs in an interface + if addr["local"] == local: + o.append(x) + return o + + def links(self): + """Returns links and their state. + + >>> host.iproute2().links() + [{'ifindex': 1, + 'ifname': 'lo', + 'flags': ['LOOPBACK', 'UP', 'LOWER_UP'], + 'mtu': 65536, + 'qdisc': 'noqueue', + 'operstate': 'UNKNOWN', + 'linkmode': 'DEFAULT', + 'group': 'default', + 'txqlen': 1000, + 'link_type': 'loopback', + 'address': '00:00:00:00:00:00', + 'broadcast': '00:00:00:00:00:00'}] + + """ + cmd = f"{self._ip} --json link show" + out = self.check_output(cmd) + return json.loads(out) + + def routes( + self, table="all", device=None, scope=None, proto=None, src=None, metric=None + ): + """Returns the routes installed in *all* routing tables. + + >>> host.iproute2().routes() + [{'dst': '169.254.0.0/16', + 'dev': 'wlp4s0', + 'scope': 'link', + 'metric': 1000, + 'flags': []}, + {'type': 'multicast', + 'dst': 'ff00::/8', + 'dev': 'wlp4s0', + 'table': 'local', + 'protocol': 'kernel', + 'metric': 256, + 'flags': [], + 'pref': 'medium'}] + + Optionally, routes returned can be filtered with the following + selectors. This can be useful in busy routing tables. + + * table + * device (maps to ip-route's 'dev' selector) + * scope + * proto + * src + * metric + + """ + cmd = f"{self._ip} --json route show " + options = [] + if table is not None: + options += ["table", table] + if device is not None: + options += ["dev", device] + if scope is not None: + options += ["scope", scope] + if proto is not None: + options += ["proto", proto] + if src is not None: + options += ["src", src] + if metric is not None: + options += ["metric", metric] + + cmd += " ".join(options) + out = self.check_output(cmd) + return json.loads(out) + + def rules( + self, + src=None, + to=None, + tos=None, + fwmark=None, + iif=None, + oif=None, + pref=None, + uidrange=None, + ipproto=None, + sport=None, + dport=None, + ): + """Returns the rules our routing policy consists of. + + >>> host.iproute2().rules() + [{'priority': 0, 'src': 'all', 'table': 'local'}, + {'priority': 32765, 'src': '1.2.3.4', 'table': '123'}, + {'priority': 32766, 'src': 'all', 'table': 'main'}, + {'priority': 32767, 'src': 'all', 'table': 'default'}] + + Optionally, rules returned can be filtered with the following + selectors. This can be useful in busy rulesets. + + * src (maps to ip-rule's 'from' selector) + * to + * tos + * fwmark + * iif + * oif + * pref + * uidrange + * ipproto + * sport + * dport + + """ + cmd = f"{self._ip} --json rule show " + + options = [] + if src is not None: + options += ["from", src] + + if to is not None: + options += ["to", to] + + if tos is not None: + options += ["tos", tos] + + if fwmark is not None: + options += ["fwmark", fwmark] + + if iif is not None: + options += ["iif", iif] + + if oif is not None: + options += ["oif", oif] + + if pref is not None: + options += ["pref", pref] + + if uidrange is not None: + options += ["uidrange", uidrange] + + if ipproto is not None: + options += ["ipproto", ipproto] + + if sport is not None: + options += ["sport", sport] + + if dport is not None: + options += ["dport", dport] + + cmd += " ".join(options) + out = self.check_output(cmd) + return json.loads(out) + + def tunnels(self, ifname=None): + """Returns all configured tunnels + + >>> host.iproute2().tunnels() + [{'ifname': 'test1', + 'mode': 'ip/ip', + 'remote': '127.0.0.2', + 'local': '0.0.0.0'}] + + Optionally, tunnels returned can be filtered with the interface name. + This can be faster in busy tunnel installations. + + * ifname + + """ + cmd = f"{self._ip} --json tunnel show " + + options = [] + if ifname is not None: + options += [ifname] + + cmd += " ".join(options) + out = self.check_output(cmd) + return json.loads(out) + + def vrfs(self): + """Returns all configured vrfs""" + cmd = f"{self._ip} --json vrf show" + out = self.check_output(cmd) + return json.loads(out) + + def netns(self): + """Returns all configured network namespaces + + >>> host.iproute2().netns() + [{'name': 'test'}] + """ + + cmd = f"{self._ip} --json netns show" + out = self.check_output(cmd) + if not out: # ip netns returns null instead of [] in json mode + return json.loads("[]\n") + return json.loads(out) + + def bridge_vlan(self): + """Returns all configured vlans + + >>> host.iproute2().bridge_vlan() + [] + """ + + cmd = f"{self._bridge} -json vlan show" + out = self.check_output(cmd) + return json.loads(out) + + def bridge_fdb(self): + """Returns all configured fdb entries + + >>> host.iproute2().bridge_fdb() + [{'mac': '33:33:00:00:00:01', + 'ifname': 'enp0s31f6', + 'flags': ['self'], + 'state': 'permanent'}] + """ + + cmd = f"{self._bridge} -json fdb show" + out = self.check_output(cmd) + return json.loads(out) + + def bridge_mdb(self): + """Returns all configured mdb entries + + >>> host.iproute2().bridge_mdb() + [{'mdb': [], 'router': {}}] + + """ + + cmd = f"{self._bridge} -json mdb show" + out = self.check_output(cmd) + return json.loads(out) + + def bridge_link(self): + """Returns all configured links + + >>> host.iproute2().bridge_link() + [] + """ + + cmd = f"{self._bridge} -json link show" + out = self.check_output(cmd) + return json.loads(out)