The Squid worker python API
The squid.worker(sim_fn)
function starts a simulation loop:
- Receives an agent from the GA thread running on the Squid broker.
- Passes the agent's parameters as a JSON string to your simulation function
sim_fn
. - Sends the fitness score and optional additional simulation data (as a
dict
) back to the broker.
The loop terminates when there are no more agents to simulate.
To install the latest version:
pip install 'https://github.com/benbaarber/squid-worker.git'
To install a specific version, add the tag to the url:
pip install 'https://github.com/benbaarber/squid-worker.git@<tag>'
The generic worker takes in a simulation function and runs it as it receives agents from the GA.
-
Define your simulation function
Your function must take a genome JSON string and return a fitness score
float
and an optionaldict
for additional simulation data.The dict should mirror the
[csv_data]
section in your blueprint file and take the form:{ "csv_data_key": [row1, row2, ...], ... }
where the rows are lists of floats that line up with the headers specified in the blueprint. These rows will be appended to the csv files for the best agent after each generation.
In other words, your function must match this signature:
def my_simulation(genome: str) -> tuple[float, dict[str, list[list[float]]] | None]
An example:
def my_simulation(genome): # Deserialize genome ctrnn = CTRNN(7, 4, 2) ctrnn.load_genome_json(genome) # Simulate ... # Process agent and return a result fitness = 12.4 sim_data = { "sensor_outputs": [ [0.1, 0.2, 0.3, 0.4], [0.5, 0.6, 0.7, 0.8] ], "altitude": [ [0.1], [0.2], [0.3] ] } return fitness, sim_data
-
Create the entrypoint
In your entrypoint file, call
squid.worker()
with your simulation function:import squid ... if __name__ == "__main__": squid.worker(my_simulation)
The ROS worker is an extension of the Squid worker that is specific to running
a multiagent simulation in ROS and Gazebo. It takes in a class that inherits
from squid.ros.AgentNode
(which in turn inherits from the ROS Node
class)
as well as the SDF string of the model to be simulated.
-
Define your node class
In the same way you start a python ROS node project, begin by creating a node class. However, instead of inheriting from rclpy's
Node
, inherit from squid'sAgentNode
. Useself.model_name
to set up ros topics.import squid.ros class ExampleNode(squid.ros.AgentNode): def __init__(self, **kwargs): super().__init__(**kwargs) # ROS node setup ... # ROS node callbacks ...
-
Implement the
sim
methodAgentNode
also defines one abstract method calledsim
. This method has the same signature as the simulation function in the generic worker, and is called the same way internally.import squid.ros import rclpy class ExampleNode(squid.ros.AgentNode): ... def sim(self, genome: str) -> tuple[float, dict[str, list[list[float]]] | None]: # Deserialize genome self.ctrnn.load_genome_json(genome) # Reset drone self.reset() # Spin ros loop until exit condition is met, and collect data for fitness and evaluation self.sim_done = False while not self.sim_done: rclpy.spin_once(self, timeout_sec=0.1) # Evaluate agent fitness = self.compute_fitness() return fitness, self.data
AgentNode
also initializes a ZMQ PUSH socket to interface with the ResetModel plugin. This socket is connected to the correctGZ_PARTITION
and accessed atself.reset_sock
, so to reset your model's position and velocity, do this:self.reset_sock.send_string(self.model_name)
-
Create the entrypoint
Pass the node class and the model name template into the
ros_worker
function.def main(): squid.ros.ros_worker(ExampleNode, "x500_*")
-
Create launch file
The entrypoint of the squid container must call a python launch script that uses the util function
squid.ros.partitioned_gz_sims
. Ensure the main node where you call theros_worker
function is first in theLaunchDescription
, as it must connect to the broker quickly before it times out.def generate_launch_description(): ... gz_world_path = "path/to/world.sdf" # World must not include the model model_path = "path/to/model.sdf" bridge_template_path = "path/to/bridge_template.yaml" # File using the bridge template syntax (TODO document bridge template syntax) main_node = Node( package="your_ros_package", executable="your_main_entrypoint", output="screen", env=os.environ.copy() # must include env ) return LaunchDescription( [ main_node, *partitioned_gz_sims(gz_world_path, model_path, bridge_template_path), RegisterEventHandler( OnProcessExit( target_action=main_node, on_exit=[EmitEvent(event=Shutdown(reason="Experiment ended"))], ) ), ] )
- Python 3.11+
- pyzmq
- orjson
- rclpy