Skip to content

Commit 803fbd6

Browse files
authored
Add netem chaos tests (MaterializeInc#3387)
Use pumba (a tool for chaos testing Docker containers) to add the following chaos networking tests: delay, rate, loss, duplication, corruption. Each new type of networking test is added as a new workflow in the chaos crate.
1 parent f1b3717 commit 803fbd6

File tree

2 files changed

+174
-37
lines changed

2 files changed

+174
-37
lines changed

misc/python/materialize/cli/mzconduct.py

Lines changed: 101 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import webbrowser
2020
from datetime import datetime, timezone
2121
from pathlib import Path
22+
from threading import Thread
2223
from typing import (
2324
Any,
2425
Callable,
@@ -946,27 +947,110 @@ def run(self, comp: Composition, workflow: Workflow) -> None:
946947
raise Failed(f"Unable to kill container {container_id}: {e}")
947948

948949

949-
@Steps.register("chaos-delay-docker")
950-
class ChaosDelayDockerStep(WorkflowStep):
951-
"""Delay the incoming and outgoing network traffic for a Docker service.
950+
class ChaosNetemStep(WorkflowStep):
    """Base class for running network chaos tests against a Docker container.

    We use pumba (a chaos testing tool for Docker) to run the various netem tests.
    pumba only supports Docker container names (not container ids), meaning that
    we have to pass the exact container name to these steps. We should fix this in
    the future.

    Subclasses must override `get_cmd` to build the pumba command line.

    Params:
        duration: how long to run the netem command, in minutes; -1 means
            "not provided" and selects a 7-day default
    """

    # 7 days expressed in minutes; used when no explicit duration is given.
    _DEFAULT_DURATION_MINUTES = 7 * 24 * 60

    def __init__(self, duration: int) -> None:
        self._duration = duration

    def run(self, comp: Composition, workflow: Workflow) -> None:
        """Start the netem command on a daemon thread and return immediately."""
        if self._duration == -1:
            # No duration was provided: fall back to the 7-day default.
            duration = self._DEFAULT_DURATION_MINUTES
        else:
            duration = self._duration

        # The command always runs on a background daemon thread, so this step
        # does not block the rest of the workflow.
        cmd = self.get_cmd(duration).split()
        netem_thread = Thread(target=self.threaded_netem, args=(cmd,), daemon=True)
        netem_thread.start()

    def get_cmd(self, duration: int) -> str:
        """Return the command line to run, as a whitespace-separated string.

        Raises:
            NotImplementedError: always, in this base class. Subclasses
                override this method.
        """
        # Failing loudly here beats returning None, which would surface as an
        # opaque AttributeError on `.split()` in `run`.
        raise NotImplementedError(f"{type(self).__name__} must implement get_cmd")

    def threaded_netem(self, cmd: List[str]) -> None:
        """Run `cmd` to completion; intended as the target of the netem thread.

        Raises:
            Failed: if the command exits unsuccessfully.
        """
        # NOTE: this executes on a background thread, so the exception raised
        # below is not propagated to the thread that called `run`.
        try:
            spawn.runv(cmd)
        except subprocess.CalledProcessError as e:
            raise Failed(f"Unable to run netem chaos command: {e}") from e
981+
982+
983+
@Steps.register("chaos-delay-docker")
class ChaosDelayDockerStep(ChaosNetemStep):
    """Delay the egress network traffic for a Docker container.

    Params:
        container: exact name of the Docker container to delay
        duration: minutes to apply the delay for (default: -1, which lets the
            base class pick its default duration)
        delay: value passed to pumba's `--time` option (default: 250)
            presumably milliseconds; confirm against the pumba docs
        jitter: value passed to pumba's `--jitter` option (default: 250)
            presumably milliseconds; confirm against the pumba docs
    """

    def __init__(
        self, container: str, duration: int = -1, delay: int = 250, jitter: int = 250,
    ) -> None:
        super().__init__(duration=duration)
        self._container = container
        self._delay = delay
        self._jitter = jitter

    def get_cmd(self, duration: int) -> str:
        """Return the pumba `netem delay` command for this container."""
        # Implicit string concatenation keeps the command free of the source
        # indentation that a backslash continuation inside the literal embeds.
        return (
            f"pumba --random netem --duration {duration}m"
            f" delay --time {self._delay}"
            f" --jitter {self._jitter} --distribution normal {self._container}"
        )
999+
1000+
1001+
@Steps.register("chaos-rate-docker")
class ChaosRateDockerStep(ChaosNetemStep):
    """Rate limit a Docker container's egress traffic via `pumba netem rate`.

    Params:
        container: exact name of the Docker container to rate limit
        duration: minutes to apply the limit for (default: -1, which lets the
            base class pick its default duration)
    """

    def __init__(self, container: str, duration: int = -1) -> None:
        self._container = container
        super().__init__(duration=duration)

    def get_cmd(self, duration: int) -> str:
        """Return the pumba `netem rate` command for this container."""
        command = f"pumba netem --duration {duration}m rate {self._container}"
        return command
1012+
1013+
1014+
@Steps.register("chaos-loss-docker")
class ChaosLossDockerStep(ChaosNetemStep):
    """Drop a percentage of a Docker container's packets via `pumba netem loss`.

    Params:
        container: exact name of the target Docker container
        percent: value passed to pumba's `--percent` option
        duration: minutes to apply the loss for (default: -1, which lets the
            base class pick its default duration)
    """

    def __init__(self, container: str, percent: int, duration: int = -1) -> None:
        self._container = container
        self._percent = percent
        super().__init__(duration=duration)

    def get_cmd(self, duration: int) -> str:
        """Return the pumba `netem loss` command for this container."""
        command = f"pumba netem --duration {duration}m loss --percent {self._percent} {self._container}"
        return command
1026+
1027+
1028+
@Steps.register("chaos-duplicate-docker")
class ChaosDuplicateDockerStep(ChaosNetemStep):
    """Duplicate a percentage of a Docker container's packets via `pumba netem duplicate`.

    Params:
        container: exact name of the target Docker container
        percent: value passed to pumba's `--percent` option
        duration: minutes to apply the duplication for (default: -1, which
            lets the base class pick its default duration)
    """

    def __init__(self, container: str, percent: int, duration: int = -1) -> None:
        self._container = container
        self._percent = percent
        super().__init__(duration=duration)

    def get_cmd(self, duration: int) -> str:
        """Return the pumba `netem duplicate` command for this container."""
        command = f"pumba netem --duration {duration}m duplicate --percent {self._percent} {self._container}"
        return command
1040+
1041+
1042+
@Steps.register("chaos-corrupt-docker")
class ChaosCorruptDockerStep(ChaosNetemStep):
    """Corrupt a percentage of a Docker container's packets via `pumba netem corrupt`.

    Params:
        container: exact name of the target Docker container
        percent: value passed to pumba's `--percent` option
        duration: minutes to apply the corruption for (default: -1, which
            lets the base class pick its default duration)
    """

    def __init__(self, container: str, percent: int, duration: int = -1) -> None:
        self._container = container
        self._percent = percent
        super().__init__(duration=duration)

    def get_cmd(self, duration: int) -> str:
        """Return the pumba `netem corrupt` command for this container."""
        command = f"pumba netem --duration {duration}m corrupt --percent {self._percent} {self._container}"
        return command
9701054

9711055

9721056
@Steps.register("chaos-confirm")
@@ -991,6 +1075,10 @@ def run(self, comp: Composition, workflow: Workflow) -> None:
9911075
if not comp.docker_container_is_running(container_id):
9921076
raise Failed(f"chaos-confirm: container {container_id} is not running")
9931077
else:
1078+
if comp.docker_container_is_running(container_id):
1079+
raise Failed(
1080+
f"chaos-confirm: expected {container_id} to have exited, is running"
1081+
)
9941082
actual_exit_code = comp.docker_inspect("{{.State.ExitCode}}", container_id)
9951083
if actual_exit_code != f"'{self._exit_code}'":
9961084
raise Failed(

test/chaos/mzcompose.yml

Lines changed: 73 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -69,35 +69,58 @@ services:
6969

7070
mzconduct:
7171
workflows:
72-
# This test is designed to delay inbound and outbound network
73-
# traffic of the Kafka broker.
74-
#
75-
# Success: Kafka broker traffic is delayed, materialize does not
76-
# crash, chaos container exits successfully.
77-
# Note: to confirm that broker traffic is delayed, you can run the
78-
# following on the Kafka broker's container: `tc qdisc show dev eth0`
79-
# Failure: Kafka broker traffic is not delayed, materialize crashes,
80-
# or chaos container exits unsuccessfully.
72+
# This test is designed to delay egress network traffic of the Kafka broker.
8173
delay-kafka:
8274
steps:
8375
- step: workflow
8476
workflow: start-everything
8577
- step: chaos-delay-docker
86-
service: kafka
87-
- step: run
88-
service: chaos
89-
command: >-
90-
--materialized-host materialized
91-
--materialized-port 6875
92-
--kafka-url kafka:9092
93-
--kafka-partitions 5
94-
--message-count 1000000
95-
- step: chaos-confirm
96-
service: materialized
97-
running: true
98-
- step: chaos-confirm
99-
service: chaos_run
100-
exit_code: 0
78+
container: chaos_kafka_1
79+
- step: workflow
80+
workflow: run-netem-and-confirm
81+
82+
# This test is designed to rate limit egress network traffic of the Kafka broker.
83+
rate-kafka:
84+
steps:
85+
- step: workflow
86+
workflow: start-everything
87+
- step: chaos-rate-docker
88+
container: chaos_kafka_1
89+
- step: workflow
90+
workflow: run-netem-and-confirm
91+
92+
# This test is designed to test packet loss from the Kafka broker.
93+
loss-kafka:
94+
steps:
95+
- step: workflow
96+
workflow: start-everything
97+
- step: chaos-loss-docker
98+
container: chaos_kafka_1
99+
percent: 10
100+
- step: workflow
101+
workflow: run-netem-and-confirm
102+
103+
# This test is designed to test packet duplication from the Kafka broker.
104+
duplicate-kafka:
105+
steps:
106+
- step: workflow
107+
workflow: start-everything
108+
- step: chaos-duplicate-docker
109+
container: chaos_kafka_1
110+
percent: 10
111+
- step: workflow
112+
workflow: run-netem-and-confirm
113+
114+
# This test is designed to test packet corruption from the Kafka broker.
115+
corrupt-kafka:
116+
steps:
117+
- step: workflow
118+
workflow: start-everything
119+
- step: chaos-corrupt-docker
120+
container: chaos_kafka_1
121+
percent: 10
122+
- step: workflow
123+
workflow: run-netem-and-confirm
101124

102125
# This test is designed to pause and unpause the running Kafka broker
103126
# (chaos-kill-container sends a SIGSTOP signal to the container).
@@ -192,3 +215,29 @@ mzconduct:
192215
- step: wait-for-tcp
193216
host: materialized
194217
port: 6875
218+
219+
# To verify a netem chaos test, we run the chaos container to completion
220+
# and check the following conditions:
221+
#
222+
# - the materialized container did not crash
223+
# - the kafka container did not crash
224+
# - the chaos container completed successfully
225+
run-netem-and-confirm:
226+
steps:
227+
- step: run
228+
service: chaos
229+
command: >-
230+
--materialized-host materialized
231+
--materialized-port 6875
232+
--kafka-url kafka:9092
233+
--kafka-partitions 100
234+
--message-count 100000000
235+
- step: chaos-confirm
236+
service: materialized
237+
running: true
238+
- step: chaos-confirm
239+
service: kafka
240+
running: true
241+
- step: chaos-confirm
242+
service: chaos_run
243+
exit_code: 0

0 commit comments

Comments
 (0)