From dcef9be6c46194b56f3e622835dc596ac57ef7fa Mon Sep 17 00:00:00 2001 From: Shu Liu Date: Thu, 27 Apr 2023 22:35:19 -0700 Subject: [PATCH 01/11] import related planners --- skyplane/api/pipeline.py | 13 ++++- skyplane/planner/planner.py | 96 ++++++++++++++++++++++++++++++++++-- skyplane/planner/topology.py | 3 +- 3 files changed, 106 insertions(+), 6 deletions(-) diff --git a/skyplane/api/pipeline.py b/skyplane/api/pipeline.py index b68b97b01..7d2269945 100644 --- a/skyplane/api/pipeline.py +++ b/skyplane/api/pipeline.py @@ -17,7 +17,7 @@ from skyplane.api.transfer_job import CopyJob, SyncJob, TransferJob from skyplane.api.config import TransferConfig -from skyplane.planner.planner import MulticastDirectPlanner +from skyplane.planner.planner import MulticastDirectPlanner, UnicastDirectPlanner, UnicastILPPlanner, MulticastILPPlanner, MulticastMDSTPlanner, MulticastSteinerTreePlanner from skyplane.planner.topology import TopologyPlanGateway from skyplane.utils import logger from skyplane.utils.definitions import gateway_docker_image, tmp_log_dir @@ -39,6 +39,7 @@ def __init__( transfer_config: TransferConfig, # cloud_regions: dict, max_instances: Optional[int] = 1, + num_connections: Optional[int] = 32, planning_algorithm: Optional[str] = "direct", debug: Optional[bool] = False, ): @@ -68,7 +69,15 @@ def __init__( # planner self.planning_algorithm = planning_algorithm if self.planning_algorithm == "direct": - self.planner = MulticastDirectPlanner(self.max_instances, 32) + self.planner = MulticastDirectPlanner(self.max_instances, num_connections) + elif self.planning_algorithm == "MDST": + self.planner = MulticastMDSTPlanner(self.max_instances, num_connections) + elif self.planning_algorithm == "SteinerTree": + self.planning_algorithm = MulticastSteinerTreePlanner(self.max_instances, num_connections) + elif self.planning_algorithm == "ILP": + self.planning_algorithm = MulticastILPPlanner(self.max_instances, num_connections) + elif self.planning_algorithm == "UnicastILP": + self.planning_algorithm = UnicastILPPlanner(self.max_instances, num_connections) else: raise ValueError(f"No such planning algorithm {planning_algorithm}") diff --git a/skyplane/planner/planner.py b/skyplane/planner/planner.py index fd74a97e2..dd64f8677 100644 --- a/skyplane/planner/planner.py +++ b/skyplane/planner/planner.py @@ -1,6 +1,7 @@ from importlib.resources import path -from typing import List, Optional, Tuple - +from typing import List +from skyplane.planner.solver_ilp import ThroughputSolverILP +from skyplane.planner.solver import ThroughputProblem from skyplane import compute from skyplane.planner.topology import TopologyPlan @@ -174,8 +175,97 @@ def __init__(self, n_instances: int, n_connections: int, required_throughput_gbi super().__init__() def plan(self, jobs: List[TransferJob]) -> TopologyPlan: - raise NotImplementedError("ILP solver not implemented yet") + # make sure only single destination + for job in jobs: + assert len(job.dst_ifaces) == 1, f"DirectPlanner only support single destination jobs, got {len(job.dst_ifaces)}" + + src_region_tag = jobs[0].src_iface.region_tag() + src_provider, src_region = src_region_tag.split(":") + dst_region_tag = jobs[0].dst_ifaces[0].region_tag() + dst_provider, dst_region = dst_region_tag.split(":") + + # jobs must have same sources and destinations + for job in jobs[1:]: + assert job.src_iface.region_tag() == src_region_tag, "All jobs must have same source region" + assert job.dst_ifaces[0].region_tag() == dst_region_tag, "All jobs must have same destination region" 
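+        # The steps below reuse the unicast throughput solver: load the profiled
+        # inter-region throughput grid, solve a min-cost VM/path assignment that meets
+        # self.solver_required_throughput_gbits, then provision gateways for the
+        # resulting topology (gbyte_to_transfer=1 normalizes egress cost per GB)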
+ + + # NOTE: compute ILP plans here + problem = ThroughputProblem( + src=f"{src_provider}:{src_region}", + dst=f"{dst_provider}:{dst_region}", + required_throughput_gbits=self.solver_required_throughput_gbits, + gbyte_to_transfer=1, + instance_limit=self.n_instances, + ) + + with path("skyplane.data", "throughput.csv") as solver_throughput_grid: + tput = ThroughputSolverILP(solver_throughput_grid) + solution = tput.solve_min_cost(problem, solver=ThroughputSolverILP.choose_solver(), save_lp_path=None) + if not solution.is_feasible: + raise ValueError("ILP solver failed to find a solution, try solving with fewer constraints") + + topo, _ = tput.to_replication_topology(solution) + + + plan = TopologyPlan(src_region_tag=src_region_tag, dest_region_tags=[dst_region_tag]) + + # TODO: use VM limits to determine how many instances to create in each region + # TODO: support on-sided transfers but not requiring VMs to be created in source/destination regions + for i in range(self.n_instances): + plan.add_gateway(src_region_tag) + plan.add_gateway(dst_region_tag) + + # ids of gateways in dst region + dst_gateways = plan.get_region_gateways(dst_region_tag) + + src_program = GatewayProgram() + dst_program = GatewayProgram() + + for job in jobs: + src_bucket = job.src_iface.bucket() + dst_bucket = job.dst_ifaces[0].bucket() + + # give each job a different partition id, so we can read/write to different buckets + partition_id = jobs.index(job) + # source region gateway program + obj_store_read = src_program.add_operator( + GatewayReadObjectStore(src_bucket, src_region_tag, self.n_connections), partition_id=partition_id + ) + mux_or = src_program.add_operator(GatewayMuxOr(), parent_handle=obj_store_read, partition_id=partition_id) + for i in range(self.n_instances): + src_program.add_operator( + GatewaySend(target_gateway_id=dst_gateways[i].gateway_id, region=src_region_tag, num_connections=self.n_connections), + parent_handle=mux_or, + partition_id=partition_id, + ) + + # dst region gateway program + recv_op = dst_program.add_operator(GatewayReceive(), partition_id=partition_id) + dst_program.add_operator( + GatewayWriteObjectStore(dst_bucket, dst_region_tag, self.n_connections), parent_handle=recv_op, partition_id=partition_id + ) + + # update cost per GB + plan.cost_per_gb += compute.CloudProvider.get_transfer_cost(src_region_tag, dst_region_tag) + + # set gateway programs + plan.set_gateway_program(src_region_tag, src_program) + plan.set_gateway_program(dst_region_tag, dst_program) + + return plan + + +class UnicastRONSolverPlanner(Planner): + def __init__(self, n_instances: int, n_connections: int, required_throughput_gbits: float): + self.n_instances = n_instances + self.n_connections = n_connections + self.solver_required_throughput_gbits = required_throughput_gbits + super().__init__() + + def plan(self, jobs: List[TransferJob]) -> TopologyPlan: + raise NotImplementedError("RON solver not implemented yet") class MulticastILPPlanner(Planner): def __init__(self, n_instances: int, n_connections: int, required_throughput_gbits: float): diff --git a/skyplane/planner/topology.py b/skyplane/planner/topology.py index f8b72de9f..1551f9ce6 100644 --- a/skyplane/planner/topology.py +++ b/skyplane/planner/topology.py @@ -52,9 +52,10 @@ class TopologyPlan: The TopologyPlan constains a list of gateway programs corresponding to each gateway in the dataplane. 
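+    The optional overlay_region_tags lists intermediate regions that only forward data;
+    they are neither the source region nor one of the destination regions.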
""" - def __init__(self, src_region_tag: str, dest_region_tags: List[str], cost_per_gb: float = 0.0): + def __init__(self, src_region_tag: str, dest_region_tags: List[str], overlay_region_tags: List[str] = [], cost_per_gb: float = 0.0): self.src_region_tag = src_region_tag self.dest_region_tags = dest_region_tags + self.overlay_region_tags = overlay_region_tags self.gateways = {} self.cost_per_gb = cost_per_gb From 31a3cb9e8ea4f926f2cd42242447ae0018c47a56 Mon Sep 17 00:00:00 2001 From: Shu Liu Date: Mon, 1 May 2023 23:38:25 -0700 Subject: [PATCH 02/11] multicast planners --- skyplane/api/pipeline.py | 16 +- skyplane/gateway/gateway_program.py | 4 +- skyplane/planner/planner.py | 736 +++++++++++++++++++++------- skyplane/planner/solver.py | 80 +++ 4 files changed, 660 insertions(+), 176 deletions(-) diff --git a/skyplane/api/pipeline.py b/skyplane/api/pipeline.py index 7d2269945..a47167a29 100644 --- a/skyplane/api/pipeline.py +++ b/skyplane/api/pipeline.py @@ -17,7 +17,13 @@ from skyplane.api.transfer_job import CopyJob, SyncJob, TransferJob from skyplane.api.config import TransferConfig -from skyplane.planner.planner import MulticastDirectPlanner, UnicastDirectPlanner, UnicastILPPlanner, MulticastILPPlanner, MulticastMDSTPlanner, MulticastSteinerTreePlanner +from skyplane.planner.planner import ( + MulticastDirectPlanner, + UnicastDirectPlanner, + UnicastILPPlanner, + MulticastILPPlanner, + MulticastMDSTPlanner +) from skyplane.planner.topology import TopologyPlanGateway from skyplane.utils import logger from skyplane.utils.definitions import gateway_docker_image, tmp_log_dir @@ -39,7 +45,7 @@ def __init__( transfer_config: TransferConfig, # cloud_regions: dict, max_instances: Optional[int] = 1, - num_connections: Optional[int] = 32, + num_connections: Optional[int] = 32, planning_algorithm: Optional[str] = "direct", debug: Optional[bool] = False, ): @@ -68,12 +74,12 @@ def __init__( # planner self.planning_algorithm = planning_algorithm - if self.planning_algorithm == "direct": + if self.planning_algorithm == "Ndirect": self.planner = MulticastDirectPlanner(self.max_instances, num_connections) + if self.planning_algorithm == "direct": + self.planner = UnicastDirectPlanner(self.max_instances, num_connections) elif self.planning_algorithm == "MDST": self.planner = MulticastMDSTPlanner(self.max_instances, num_connections) - elif self.planning_algorithm == "SteinerTree": - self.planning_algorithm = MulticastSteinerTreePlanner(self.max_instances, num_connections) elif self.planning_algorithm == "ILP": self.planning_algorithm = MulticastILPPlanner(self.max_instances, num_connections) elif self.planning_algorithm == "UnicastILP": diff --git a/skyplane/gateway/gateway_program.py b/skyplane/gateway/gateway_program.py index 3afeb325f..3714b6162 100644 --- a/skyplane/gateway/gateway_program.py +++ b/skyplane/gateway/gateway_program.py @@ -104,7 +104,7 @@ def __init__(self): def get_operators(self) -> List[GatewayOperator]: return list(self._ops.values()) - def add_operators(self, ops: List[GatewayOperator], parent_handle: Optional[str] = None, partition_id: Optional[int] = 0): + def add_operators(self, ops: List[GatewayOperator], parent_handle: Optional[str] = None, partition_id: Optional[Tuple] = None): parent_op = self._ops[parent_handle] if parent_handle else None ops_handles = [] for op in ops: @@ -112,7 +112,7 @@ def add_operators(self, ops: List[GatewayOperator], parent_handle: Optional[str] return ops_handles - def add_operator(self, op: GatewayOperator, parent_handle: Optional[str] = None, 
partition_id: Optional[int] = 0): + def add_operator(self, op: GatewayOperator, parent_handle: Optional[str] = None, partition_id: Optional[Tuple] = None): parent_op = self._ops[parent_handle] if parent_handle else None if not parent_op: # root operation self._plan[partition_id].append(op) diff --git a/skyplane/planner/planner.py b/skyplane/planner/planner.py index dd64f8677..28d0180e4 100644 --- a/skyplane/planner/planner.py +++ b/skyplane/planner/planner.py @@ -1,9 +1,13 @@ +import functools from importlib.resources import path -from typing import List +from typing import List, Optional, Tuple +import numpy as np + +import pandas as pd +from skyplane.obj_store.storage_interface import StorageInterface from skyplane.planner.solver_ilp import ThroughputSolverILP -from skyplane.planner.solver import ThroughputProblem +from skyplane.planner.solver import ThroughputProblem, BroadcastProblem, BroadcastSolution, GBIT_PER_GBYTE from skyplane import compute - from skyplane.planner.topology import TopologyPlan from skyplane.gateway.gateway_program import ( GatewayProgram, @@ -14,165 +18,590 @@ GatewayReceive, GatewaySend, ) - +import networkx as nx from skyplane.api.transfer_job import TransferJob +from pathlib import Path +from importlib.resources import files class Planner: - def plan(self) -> TopologyPlan: - raise NotImplementedError - - -class UnicastDirectPlanner(Planner): - def __init__(self, n_instances: int, n_connections: int): + def __init__(self, n_instances: int, n_connections: int, n_partitions: Optional[int] = 1): self.n_instances = n_instances self.n_connections = n_connections - super().__init__() + self.n_partitions = n_partitions + self.G = self.make_nx_graph() + + def logical_plan(self) -> nx.DiGraph: + raise NotImplementedError def plan(self, jobs: List[TransferJob]) -> TopologyPlan: - # make sure only single destination - for job in jobs: - assert len(job.dst_ifaces) == 1, f"DirectPlanner only support single destination jobs, got {len(job.dst_ifaces)}" + raise NotImplementedError + def check_src_dst(self, jobs: List[TransferJob], multicast=False) -> Tuple[str, List[str]]: src_region_tag = jobs[0].src_iface.region_tag() - dst_region_tag = jobs[0].dst_ifaces[0].region_tag() - # jobs must have same sources and destinations - for job in jobs[1:]: - assert job.src_iface.region_tag() == src_region_tag, "All jobs must have same source region" - assert job.dst_ifaces[0].region_tag() == dst_region_tag, "All jobs must have same destination region" - plan = TopologyPlan(src_region_tag=src_region_tag, dest_region_tags=[dst_region_tag]) - # TODO: use VM limits to determine how many instances to create in each region - # TODO: support on-sided transfers but not requiring VMs to be created in source/destination regions - for i in range(self.n_instances): - plan.add_gateway(src_region_tag) - plan.add_gateway(dst_region_tag) - - # ids of gateways in dst region - dst_gateways = plan.get_region_gateways(dst_region_tag) - - src_program = GatewayProgram() - dst_program = GatewayProgram() + # jobs must have same sources and destinations + if not multicast: + for job in jobs: + assert len(job.dst_ifaces) == 1, f"DirectPlanner only support single destination jobs, got {len(job.dst_ifaces)}" + + dst_region_tag = jobs[0].dst_ifaces[0].region_tag() + for job in jobs[1:]: + assert job.src_iface.region_tag() == src_region_tag, "All jobs must have same source region" + assert job.dst_ifaces[0].region_tag() == dst_region_tag, "All jobs must have same destination region" + dst_region_tag = [dst_region_tag] + 
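+            # bind dst_region_tags here as well: the shared return statement after the
+            # if/else expects it, and dst_region_tag was just normalized to a list
+            dst_region_tags = dst_region_tag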
+        else:
+            dst_region_tags = [iface.region_tag() for iface in jobs[0].dst_ifaces]
+            for job in jobs[1:]:
+                assert job.src_iface.region_tag() == src_region_tag, "All jobs must have same source region"
+                assert [iface.region_tag() for iface in job.dst_ifaces] == dst_region_tags, "All jobs must have same destination set"
+
+        return src_region_tag, dst_region_tags
+
+    @functools.lru_cache(maxsize=None)
+    def get_path_cost(self, src, dst, src_tier="PREMIUM", dst_tier="PREMIUM"):
+        from skyplane.compute.cloud_provider import CloudProvider
+
+        assert src_tier == "PREMIUM" and dst_tier == "PREMIUM"
+        return CloudProvider.get_transfer_cost(src, dst)
+
+    def make_nx_graph(self, tp_grid_path: Optional[Path] = files("skyplane.data") / "throughput.csv") -> nx.DiGraph:
+        G = nx.DiGraph()
+        throughput = pd.read_csv(tp_grid_path)
+        for _, row in throughput.iterrows():
+            if row["src_region"] == row["dst_region"]:
+                continue
+            G.add_edge(row["src_region"], row["dst_region"], cost=None, throughput=row["throughput_sent"] / 1e9)
+
+        # fill in missing edge costs using skyplane.compute (see get_path_cost above)
+        for edge in G.edges.data():
+            if edge[-1]["cost"] is None:
+                edge[-1]["cost"] = self.get_path_cost(edge[0], edge[1])
+
+        assert all([edge[-1]["cost"] is not None for edge in G.edges.data()])
+        return G
+
+    def add_operator_receive_send(
+        self,
+        solution_graph,
+        gateway_program: GatewayProgram,
+        region: str,
+        partition_ids: List[int],
+        partition_offset: int,
+        plan: TopologyPlan,
+        obj_store: Optional[Tuple[str, str]] = None,
+        dst_op: Optional[GatewayReceive] = None,
+    ) -> bool:
+        if dst_op is not None:
+            receive_op = dst_op
+        else:
+            if obj_store is None:
+                receive_op = GatewayReceive()
+            else:
+                receive_op = GatewayReadObjectStore(
+                    bucket_name=obj_store[0], bucket_region=obj_store[1], num_connections=self.n_connections
+                )
+
+        # find set of regions to send to for all partitions in partition_ids
+        g = solution_graph
+
+        # all partitions in partition_ids share the same next hops, so one representative id suffices
+        any_id = partition_ids[0] - partition_offset
+        next_regions = set([edge[1] for edge in g.out_edges(region, data=True) if str(any_id) in edge[-1]["partitions"]])
+
+        # if no regions to forward data to
+        if len(next_regions) == 0:
+            print(
+                f"Region {region}, any id: {any_id}, partition ids: {partition_ids}, has no next region to forward data to: {g.out_edges(region, data=True)}"
+            )
+            return False
+
+        # map each next-hop region to the ids of all gateways provisioned there
+        region_to_id_map = {}
+        for next_region in next_regions:
+            region_to_id_map[next_region] = [gateway.gateway_id for gateway in plan.get_region_gateways(next_region)]
+
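+        # Illustrative shape (hypothetical ids): with two next-hop regions, one gateway each,
+        # region_to_id_map == {"aws:us-east-1": ["gw-use1-0"], "gcp:us-central1-a": ["gw-usc1-0"]}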
+        # fan out via MUX_AND when there are multiple next regions, otherwise MUX_OR
+        operation = "MUX_AND" if len(next_regions) > 1 else "MUX_OR"
+        mux_op = GatewayMuxAnd() if len(next_regions) > 1 else GatewayMuxOr()
+
+        # non-dst node: add receive_op into gateway program
+        if dst_op is None:
+            gateway_program.add_operator(op=receive_op, partition_id=tuple(partition_ids))
+
+        # MUX_AND: send this partition to multiple regions
+        if operation == "MUX_AND":
+            if dst_op is not None and dst_op.op_type == "mux_and":
+                mux_op = receive_op
+            else:  # do not add any nested mux_and if dst_op parent is mux_and
+                gateway_program.add_operator(op=mux_op, parent_handle=receive_op, partition_id=tuple(partition_ids))
+
+            for next_region, next_region_ids in region_to_id_map.items():
+                send_ops = [GatewaySend(ip, num_connections=self.n_connections, region=next_region) for ip in next_region_ids]
+
+                # if next region has >1 gateways, add MUX_OR
+                if len(next_region_ids) > 1:
+                    mux_or_op = GatewayMuxOr()
+                    gateway_program.add_operator(op=mux_or_op, parent_handle=mux_op, partition_id=tuple(partition_ids))
+                    gateway_program.add_operators(ops=send_ops, parent_handle=mux_or_op, partition_id=tuple(partition_ids))
+                else:
+                    # otherwise, the parent of send_op is mux_op ("MUX_AND")
+                    assert len(send_ops) == 1
+                    gateway_program.add_operator(op=send_ops[0], parent_handle=mux_op, partition_id=tuple(partition_ids))
+        else:
+            # only send this partition to a single region
+            assert len(region_to_id_map) == 1
+
+            next_region = list(region_to_id_map.keys())[0]
+            ips = [ip for next_region_ids in region_to_id_map.values() for ip in next_region_ids]
+
+            num_connections = int(self.n_connections / len(ips))
+            send_ops = [GatewaySend(ip, num_connections=num_connections, region=next_region) for ip in ips]
+
+            # if num of gateways > 1, then connect to MUX_OR
+            if len(ips) > 1:
+                gateway_program.add_operator(op=mux_op, parent_handle=receive_op, partition_id=tuple(partition_ids))
+                gateway_program.add_operators(ops=send_ops, parent_handle=mux_op, partition_id=tuple(partition_ids))
+            else:
+                gateway_program.add_operators(ops=send_ops, parent_handle=receive_op, partition_id=tuple(partition_ids))
+
+        return True
+
+    def add_dst_operator(
+        self,
+        solution_graph,
+        gateway_program: GatewayProgram,
+        region: str,
+        partition_ids: List[int],
+        partition_offset: int,
+        plan: TopologyPlan,
+        obj_store: Optional[Tuple[str, str]] = None,
+    ):
+        receive_op = GatewayReceive()
+        gateway_program.add_operator(receive_op, partition_id=tuple(partition_ids))
+
+        # write
+        write_op = GatewayWriteObjectStore(bucket_name=obj_store[0], bucket_region=obj_store[1], num_connections=self.n_connections)
+
+        g = solution_graph
+        any_id = partition_ids[0] - partition_offset
+        next_regions = set([edge[1] for edge in g.out_edges(region, data=True) if str(any_id) in edge[-1]["partitions"]])
+
+        # if no regions to forward data to, just write
+        if len(next_regions) == 0:
+            gateway_program.add_operator(write_op, receive_op, partition_id=tuple(partition_ids))
+        else:  # otherwise, "and" --> write and forward
+            mux_and_op = GatewayMuxAnd()
+            gateway_program.add_operator(mux_and_op, receive_op, partition_id=tuple(partition_ids))
+            gateway_program.add_operator(write_op, mux_and_op, partition_id=tuple(partition_ids))
+            self.add_operator_receive_send(
+                solution_graph, gateway_program, region, partition_ids, partition_offset, plan, dst_op=mux_and_op
+            )
+
+    def solution_to_gateway_programs(
+        self,
+        src_ifaces: List[StorageInterface],
+        dst_ifaces: 
List[List[StorageInterface]], + solution_graph: nx.graph, + num_partitions: int = 1, + ) -> TopologyPlan: + src_region_tag = src_ifaces[0].region_tag() + dst_region_tags = [dst_iface.region_tag() for dst_iface in dst_ifaces[0]] + + # map from the node to the gateway program + node_to_program = {node: GatewayProgram() for node in solution_graph.nodes} + + # construct a plan for each node + overlay_region_tags = [node for node in solution_graph.nodes if node != src_region_tag and node not in dst_region_tags] + plan = TopologyPlan(src_region_tag=src_region_tag, dest_region_tags=dst_region_tags, overlay_region_tags=overlay_region_tags) + + # iterate through all the nodes + for node in solution_graph.nodes: + plan.add_gateway(node) + + def remap_keys(self, mapping): + return [{"partitions": k, "value": v} for k, v in mapping.items()] + + # iterate through all the jobs + for i in range(len(src_ifaces)): + src_bucket = src_ifaces[i].bucket() + dst_buckets = {dst_iface.region_tag(): dst_iface[i].bucket() for dst_iface in dst_ifaces} + + # iterate through all the nodes + for node in solution_graph.nodes: + node_gateway_program = node_to_program[node] + partition_to_next_regions = {} + + for j in range(i, i + num_partitions): + partition_to_next_regions[j] = set( + [edge[1] for edge in solution_graph.out_edges(node, data=True) if str(j) in edge[-1]["partitions"]] + ) - # set gateway programs - plan.set_gateway_program(src_region_tag, src_program) - plan.set_gateway_program(dst_region_tag, dst_program) + import collections + + keys_per_set = collections.defaultdict(list) + for key, value in partition_to_next_regions.items(): + keys_per_set[frozenset(value)].append(key) + + list_of_partitions = list(keys_per_set.values()) + + # source node: read from object store or generate random data, then forward data + for partitions in list_of_partitions: + if node == src_region_tag: + self.add_operator_receive_send( + solution_graph, + node_gateway_program, + node, + partitions, + partition_offset=i, + plan=plan, + obj_store=(src_bucket, node), + ) + + # dst receive data, write to object store, forward data if needed + elif node in dst_region_tags: + dst_bucket = dst_buckets[node] + self.add_dst_operator( + solution_graph, + node_gateway_program, + node, + partitions, + partition_offset=i, + plan=plan, + obj_store=(dst_bucket, node), + ) + + # overlay node only forward data + else: + self.add_operator_receive_send( + solution_graph, node_gateway_program, node, partitions, partition_offset=i, plan=plan, obj_store=None + ) + + node_to_program[node] = remap_keys(node_gateway_program.to_dict()) + assert len(node_to_program[node]) > 0, f"Empty gateway program {node}" + + for node in solution_graph.nodes: + plan.set_gateway_program(node, node_to_program[node]) + + for edge in solution_graph.edges.data(): + src_region, dst_region = edge[0], edge[1] + plan.cost_per_gb += compute.CloudProvider.get_transfer_cost(src_region, dst_region) * ( + len(edge[-1]["partitions"]) / num_partitions + ) return plan class MulticastDirectPlanner(Planner): - def __init__(self, n_instances: int, n_connections: int): - self.n_instances = n_instances - self.n_connections = n_connections - super().__init__() + def __init__(self, n_instances: int, n_connections: int, n_partitions: Optional[int] = 1): + super().__init__(n_instances, n_connections, n_partitions) - def plan(self, jobs: List[TransferJob]) -> TopologyPlan: - src_region_tag = jobs[0].src_iface.region_tag() - dst_region_tags = [iface.region_tag() for iface in jobs[0].dst_ifaces] - # 
jobs must have same sources and destinations
-        for job in jobs[1:]:
-            assert job.src_iface.region_tag() == src_region_tag, "All jobs must have same source region"
-            assert [iface.region_tag() for iface in job.dst_ifaces] == dst_region_tags, "Add jobs must have same destination set"
+    def logical_plan(self, src_region: str, dst_regions: List[str]) -> nx.DiGraph:
+        graph = nx.DiGraph()
+        graph.add_node(src_region)
+        for dst_region in dst_regions:
+            graph.add_node(dst_region)
+            graph.add_edge(src_region, dst_region, partitions=list(range(self.n_partitions)))
+        return graph
 
-        plan = TopologyPlan(src_region_tag=src_region_tag, dest_region_tags=dst_region_tags)
-        # TODO: use VM limits to determine how many instances to create in each region
-        # TODO: support on-sided transfers but not requiring VMs to be created in source/destination regions
-        for i in range(self.n_instances):
-            plan.add_gateway(src_region_tag)
-            for dst_region_tag in dst_region_tags:
-                plan.add_gateway(dst_region_tag)
+    def plan(self, jobs: List[TransferJob]) -> TopologyPlan:
+        src_region_tag, dst_region_tags = self.check_src_dst(jobs)
+        solution_graph = self.logical_plan(src_region_tag, dst_region_tags)
+        return self.solution_to_gateway_programs([job.src_iface for job in jobs], [job.dst_ifaces for job in jobs], solution_graph)
 
-        # initialize gateway programs per region
-        dst_program = {dst_region: GatewayProgram() for dst_region in dst_region_tags}
-        src_program = GatewayProgram()
 
-        # iterate through all jobs
-        for job in jobs:
-            src_bucket = job.src_iface.bucket()
-            src_region_tag = job.src_iface.region_tag()
+class MulticastILPPlanner(Planner):
+    def __init__(
+        self,
+        n_instances: int,
+        n_connections: int,
+        target_time: float,
+        n_partitions: Optional[int] = 1,
+        aws_only: bool = False,
+        gcp_only: bool = False,
+        azure_only: bool = False,
+    ):
+        super().__init__(n_instances, n_connections, n_partitions)
+        self.target_time = target_time
+        self.aws_only = aws_only
+        self.gcp_only = gcp_only
+        self.azure_only = azure_only
+
+    def multicast_solution_to_nxgraph(self, solution: BroadcastSolution) -> nx.DiGraph:
+        """
+        Convert an ILP solution into the corresponding solution graph (nx.DiGraph)
+        """
+        v_result = solution.var_instances_per_region
+        result = np.array(solution.var_edge_partitions)
+        result_g = nx.DiGraph()  # solution nx graph
+        for i in range(result.shape[0]):
+            edge = solution.var_edges[i]
+            partitions = [str(partition_i) for partition_i in range(result.shape[1]) if result[i][partition_i] > 0.5]
+
+            if len(partitions) == 0:
+                continue
+
+            src_node, dst_node = edge[0], edge[1]
+            result_g.add_edge(
+                src_node,
+                dst_node,
+                partitions=partitions,
+                throughput=self.G[src_node][dst_node]["throughput"],
+                cost=self.G[src_node][dst_node]["cost"],
+            )
 
-            # give each job a different partition id, so we can read/write to different buckets
-            partition_id = jobs.index(job)
+        for i in range(len(v_result)):
+            num_vms = int(v_result[i])
+            node = solution.var_nodes[i]
+            if node in result_g.nodes:
+                result_g.nodes[node]["num_vms"] = num_vms
+
+        return result_g
+
+    def logical_plan(
+        self,
+        src_region: str,
+        dst_regions: List[str],
+        gbyte_to_transfer: int = 1,
+        filter_node: bool = False,
+        filter_edge: bool = False,
+        solver=None,
+        solver_verbose: bool = False,
+        save_lp_path: Optional[str] = None,
+    ) -> nx.DiGraph:
+        import cvxpy as cp
+
+        if solver is None:
+            solver = cp.GUROBI
+
+        problem = BroadcastProblem(
+            src=src_region,
+            dsts=dst_regions,
+            gbyte_to_transfer=gbyte_to_transfer,
+            instance_limit=self.n_instances,
+            num_partitions=self.n_partitions,
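+            # required_time_budget is in seconds: it caps the transfer time and scales
+            # the per-second instance cost term in the objective below
+            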
required_time_budget=self.target_time, + ) - # source region gateway program - obj_store_read = src_program.add_operator( - GatewayReadObjectStore(src_bucket, src_region_tag, self.n_connections), partition_id=partition_id + g = self.G + + # node-approximation + from random import sample + + if filter_node: + src_dst_li = [problem.src] + problem.dsts + sampled = [i for i in sample(list(self.G.nodes), 15) if i not in src_dst_li] + g = g.subgraph(src_dst_li + sampled).copy() + print(f"Filter node (only use): {src_dst_li + sampled}") + + cost = np.array([e[2] for e in g.edges(data="cost")]) + tp = np.array([e[2] for e in g.edges(data="throughput")]) + + edges = list(g.edges) + nodes = list(g.nodes) + num_edges, num_nodes = len(edges), len(nodes) + num_dest = len(problem.dsts) + print(f"Num edges: {num_edges}, num nodes: {num_nodes}, num dest: {num_dest}, runtime budget: {problem.required_time_budget}s") + + partition_size_gb = problem.gbyte_to_transfer / problem.num_partitions + partition_size_gbit = partition_size_gb * GBIT_PER_GBYTE + print("Partition size (gbit): ", partition_size_gbit) + + # define variables + p = cp.Variable((num_edges, problem.num_partitions), boolean=True) # whether edge is carrying partition + n = cp.Variable((num_nodes), boolean=True) # whether node transfers partition + f = cp.Variable((num_nodes * problem.num_partitions, num_nodes + 1), integer=True) # enforce flow conservation + v = cp.Variable((num_nodes), integer=True) # number of VMs per region + + # define objective + egress_cost = cp.sum(cost @ p) * partition_size_gb + instance_cost = cp.sum(v) * (problem.cost_per_instance_hr / 3600) * problem.required_time_budget + tot_cost = egress_cost + instance_cost + obj = cp.Minimize(tot_cost) + + # define constants + constraints = [] + + # constraints on VM per region + for i in range(num_nodes): + constraints.append(v[i] <= problem.instance_limit) + constraints.append(v[i] >= 0) + + # constraints to enforce flow between source/dest nodes + for c in range(problem.num_partitions): + for i in range(num_nodes): + for j in range(num_nodes + 1): + if i != j: + if j != num_nodes: + edge = (nodes[i], nodes[j]) + + constraints.append(f[c * num_nodes + i][j] <= p[edges.index(edge)][c] * num_dest) + # p = 0 -> f <= 0 + # p = 1 -> f <= num_dest + constraints.append(f[c * num_nodes + i][j] >= (p[edges.index(edge)][c] - 1) * (num_dest + 1) + 1) + # p = 0 -> f >= -(num_dest) + # p = 1 -> f >= 1 + + constraints.append(f[c * num_nodes + i][j] == -f[c * num_nodes + j][i]) + + # capacity constraint for special node + else: + if nodes[i] in problem.dsts: # only connected to destination nodes + constraints.append(f[c * num_nodes + i][j] <= 1) + else: + constraints.append(f[c * num_nodes + i][j] <= 0) + else: + constraints.append(f[c * num_nodes + i][i] == 0) + + # flow conservation + if nodes[i] != problem.src and i != num_nodes + 1: + constraints.append(cp.sum(f[c * num_nodes + i]) == 0) + + # source must have outgoing flow + constraints.append(cp.sum(f[c * num_nodes + nodes.index(problem.src), :]) == num_dest) + + # special node (connected to all destinations) must recieve all flow + constraints.append(cp.sum(f[c * num_nodes : (c + 1) * num_nodes, -1]) == num_dest) + + # node contained if edge is contained + for edge in edges: + constraints.append(n[nodes.index(edge[0])] >= cp.max(p[edges.index(edge)])) + constraints.append(n[nodes.index(edge[1])] >= cp.max(p[edges.index(edge)])) + + # edge approximation + if filter_edge: + # hop limit = 2: either source is source node, and/or dest is 
terminal node + # all other edges must be 0 + # alternative: filter edges to matchi this + print("Filter edge") + for edge in edges: + if edge[0] != problem.src and edge[1] not in problem.dsts: + # cannot be in graph + constraints.append(p[edges.index(edge)] == 0) + + # throughput constraint + for edge_i in range(num_edges): + node_i = nodes.index(edge[0]) + constraints.append(cp.sum(p[edge_i] * partition_size_gbit) <= problem.required_time_budget * tp[edge_i] * v[node_i]) + + # instance limits + for node in nodes: + region = node.split(":")[0] + if region == "aws": + ingress_limit_gbps, egress_limit_gbps = problem.aws_instance_throughput_limit + elif region == "gcp": + ingress_limit_gbps, egress_limit_gbps = problem.gcp_instance_throughput_limit + elif region == "azure": + ingress_limit_gbps, egress_limit_gbps = problem.azure_instance_throughput_limit + elif region == "cloudflare": # TODO: not supported yet in the tput / cost graph + ingress_limit_gbps, egress_limit_gbps = 1, 1 + + node_i = nodes.index(node) + # egress + i = np.zeros(num_edges) + for e in g.edges: + if e[0] == node: # edge goes to dest + i[edges.index(e)] = 1 + + constraints.append(cp.sum(i @ p) * partition_size_gbit <= problem.required_time_budget * egress_limit_gbps * v[node_i]) + + # ingress + i = np.zeros(num_edges) + for e in g.edges: + # edge goes to dest + if e[1] == node: + i[edges.index(e)] = 1 + constraints.append(cp.sum(i @ p) * partition_size_gbit <= problem.required_time_budget * ingress_limit_gbps * v[node_i]) + + print("Define problem done.") + + # solve + prob = cp.Problem(obj, constraints) + if solver == cp.GUROBI or solver == "gurobi": + solver_options = {} + solver_options["Threads"] = 1 + if save_lp_path: + solver_options["ResultFile"] = str(save_lp_path) + if not solver_verbose: + solver_options["OutputFlag"] = 0 + cost = prob.solve(verbose=solver_verbose, qcp=True, solver=cp.GUROBI, reoptimize=True, **solver_options) + elif solver == cp.CBC or solver == "cbc": + solver_options = {} + solver_options["maximumSeconds"] = 60 + solver_options["numberThreads"] = 1 + cost = prob.solve(verbose=solver_verbose, solver=cp.CBC, **solver_options) + else: + cost = prob.solve(solver=solver, verbose=solver_verbose) + + if prob.status == "optimal": + solution = BroadcastSolution( + problem=problem, + is_feasible=True, + var_edges=edges, + var_nodes=nodes, + var_edge_partitions=p.value, + var_node_transfer_partitions=n.value, + var_instances_per_region=v.value, + var_flow=f.value, + cost_egress=egress_cost.value, + cost_instance=instance_cost.value, + cost_total=tot_cost.value, ) - # send to all destination - mux_and = src_program.add_operator(GatewayMuxAnd(), parent_handle=obj_store_read, partition_id=partition_id) - dst_prefixes = job.dst_prefixes - for i in range(len(job.dst_ifaces)): - dst_iface = job.dst_ifaces[i] - dst_prefix = dst_prefixes[i] - dst_region_tag = dst_iface.region_tag() - dst_bucket = dst_iface.bucket() - dst_gateways = plan.get_region_gateways(dst_region_tag) - - # can send to any gateway in region - mux_or = src_program.add_operator(GatewayMuxOr(), parent_handle=mux_and, partition_id=partition_id) - for i in range(self.n_instances): - src_program.add_operator( - GatewaySend( - target_gateway_id=dst_gateways[i].gateway_id, region=dst_region_tag, num_connections=self.n_connections - ), - parent_handle=mux_or, - partition_id=partition_id, - ) + else: + solution = BroadcastSolution(problem=problem, is_feasible=False, extra_data=dict(status=prob.status)) - # each gateway also recieves data from 
source - recv_op = dst_program[dst_region_tag].add_operator(GatewayReceive(), partition_id=partition_id) - dst_program[dst_region_tag].add_operator( - GatewayWriteObjectStore(dst_bucket, dst_region_tag, self.n_connections, key_prefix=dst_prefix), - parent_handle=recv_op, - partition_id=partition_id, - ) + return self.multicast_solution_to_nxgraph(solution) - # update cost per GB - plan.cost_per_gb += compute.CloudProvider.get_transfer_cost(src_region_tag, dst_region_tag) + def plan(self, jobs: List[TransferJob]) -> TopologyPlan: + src_region_tag, dst_region_tags = self.check_src_dst(jobs, multicast=True) + solution_graph = self.logical_plan(src_region_tag, dst_region_tags) + return self.solution_to_gateway_programs([job.src_iface for job in jobs], [job.dst_ifaces for job in jobs], solution_graph) - # set gateway programs - plan.set_gateway_program(src_region_tag, src_program) - for dst_region_tag, program in dst_program.items(): - plan.set_gateway_program(dst_region_tag, program) - return plan +class MulticastMDSTPlanner(Planner): + def __init__(self, n_instances: int, n_connections: int, n_partitions: Optional[int] = 1): + super().__init__(n_instances, n_connections, n_partitions) + + def logical_plan(self, src_region: str, dst_regions: List[str]) -> nx.DiGraph: + h = self.G.copy() + h.remove_edges_from(list(h.in_edges(src_region)) + list(nx.selfloop_edges(h))) + + DST_graph = nx.algorithms.tree.Edmonds(h.subgraph([src_region] + dst_regions)) + opt_DST = DST_graph.find_optimum(attr="cost", kind="min", preserve_attrs=True, style="arborescence") + + # Construct MDST graph + MDST_graph = nx.DiGraph() + for edge in list(opt_DST.edges()): + s, d = edge[0], edge[1] + MDST_graph.add_edge(s, d, partitions=[str(i) for i in list(range(self.num_partitions))]) + + return MDST_graph + + def plan(self, jobs: List[TransferJob]) -> TopologyPlan: + src_region_tag, dst_region_tags = self.check_src_dst(jobs, multicast=True) + solution_graph = self.logical_plan(src_region_tag, dst_region_tags) + return self.solution_to_gateway_programs([job.src_iface for job in jobs], [job.dst_ifaces for job in jobs], solution_graph) + + +class UnicastDirectPlanner(Planner): + def __init__(self, n_instances: int, n_connections: int, n_partitions: Optional[int] = 1): + super().__init__(n_instances, n_connections, n_partitions) + + def logical_plan(self, src_region: str, dst_regions: List[str]) -> nx.DiGraph: + graph = nx.DiGraph() + graph.add_node(src_region) + for dst_region in dst_regions: + graph.add_node(dst_region) + graph.add_edge(src_region, dst_region, partitions=list(range(self.n_partitions))) + return graph + + def plan(self, jobs: List[TransferJob]) -> TopologyPlan: + src_region_tag, dst_region_tag = self.check_src_dst(jobs) + solution_graph = self.logical_plan(src_region_tag, dst_region_tag) + return self.solution_to_gateway_programs([job.src_iface for job in jobs], [job.dst_ifaces for job in jobs], solution_graph) class UnicastILPPlanner(Planner): - def __init__(self, n_instances: int, n_connections: int, required_throughput_gbits: float): - self.n_instances = n_instances - self.n_connections = n_connections + def __init__(self, n_instances: int, n_connections: int, required_throughput_gbits: float, n_partitions: Optional[int] = 1): + super().__init__(n_instances, n_connections, n_partitions) self.solver_required_throughput_gbits = required_throughput_gbits - super().__init__() def plan(self, jobs: List[TransferJob]) -> TopologyPlan: # make sure only single destination @@ -183,14 +612,13 @@ def plan(self, jobs: 
List[TransferJob]) -> TopologyPlan: src_provider, src_region = src_region_tag.split(":") dst_region_tag = jobs[0].dst_ifaces[0].region_tag() dst_provider, dst_region = dst_region_tag.split(":") - + # jobs must have same sources and destinations for job in jobs[1:]: assert job.src_iface.region_tag() == src_region_tag, "All jobs must have same source region" assert job.dst_ifaces[0].region_tag() == dst_region_tag, "All jobs must have same destination region" - - # NOTE: compute ILP plans here + # NOTE: compute ILP plans here problem = ThroughputProblem( src=f"{src_provider}:{src_region}", dst=f"{dst_provider}:{dst_region}", @@ -198,27 +626,26 @@ def plan(self, jobs: List[TransferJob]) -> TopologyPlan: gbyte_to_transfer=1, instance_limit=self.n_instances, ) - + with path("skyplane.data", "throughput.csv") as solver_throughput_grid: tput = ThroughputSolverILP(solver_throughput_grid) solution = tput.solve_min_cost(problem, solver=ThroughputSolverILP.choose_solver(), save_lp_path=None) if not solution.is_feasible: raise ValueError("ILP solver failed to find a solution, try solving with fewer constraints") - + topo, _ = tput.to_replication_topology(solution) - - + plan = TopologyPlan(src_region_tag=src_region_tag, dest_region_tags=[dst_region_tag]) - + # TODO: use VM limits to determine how many instances to create in each region # TODO: support on-sided transfers but not requiring VMs to be created in source/destination regions for i in range(self.n_instances): plan.add_gateway(src_region_tag) plan.add_gateway(dst_region_tag) - + # ids of gateways in dst region dst_gateways = plan.get_region_gateways(dst_region_tag) - + src_program = GatewayProgram() dst_program = GatewayProgram() @@ -255,44 +682,15 @@ def plan(self, jobs: List[TransferJob]) -> TopologyPlan: plan.set_gateway_program(dst_region_tag, dst_program) return plan - -class UnicastRONSolverPlanner(Planner): - def __init__(self, n_instances: int, n_connections: int, required_throughput_gbits: float): - self.n_instances = n_instances - self.n_connections = n_connections - self.solver_required_throughput_gbits = required_throughput_gbits - super().__init__() - - def plan(self, jobs: List[TransferJob]) -> TopologyPlan: - raise NotImplementedError("RON solver not implemented yet") -class MulticastILPPlanner(Planner): - def __init__(self, n_instances: int, n_connections: int, required_throughput_gbits: float): - self.n_instances = n_instances - self.n_connections = n_connections +class UnicastRONSolverPlanner(Planner): + def __init__(self, n_instances: int, n_connections: int, required_throughput_gbits: float, n_partitions: Optional[int] = 1): + super().__init__(n_instances, n_connections, n_partitions) self.solver_required_throughput_gbits = required_throughput_gbits - super().__init__() - - def plan(self, jobs: List[TransferJob]) -> TopologyPlan: - raise NotImplementedError("ILP solver not implemented yet") - - -class MulticastMDSTPlanner(Planner): - def __init__(self, n_instances: int, n_connections: int): - self.n_instances = n_instances - self.n_connections = n_connections - super().__init__() - - def plan(self, jobs: List[TransferJob]) -> TopologyPlan: - raise NotImplementedError("MDST solver not implemented yet") - -class MulticastSteinerTreePlanner(Planner): - def __init__(self, n_instances: int, n_connections: int): - self.n_instances = n_instances - self.n_connections = n_connections - super().__init__() + def logical_plan(self, src_region: str, dst_regions: List[str]) -> nx.DiGraph: + return super().logical_plan(src_region, 
dst_regions) def plan(self, jobs: List[TransferJob]) -> TopologyPlan: - raise NotImplementedError("Steiner tree solver not implemented yet") + raise NotImplementedError("RON solver not implemented yet") diff --git a/skyplane/planner/solver.py b/skyplane/planner/solver.py index 5a18f1ed4..65e1a740d 100644 --- a/skyplane/planner/solver.py +++ b/skyplane/planner/solver.py @@ -15,6 +15,86 @@ GBIT_PER_GBYTE = 8 +@dataclass +class BroadcastProblem: + src: str + dsts: List[str] + + gbyte_to_transfer: float + instance_limit: int # max # of vms per region + num_partitions: int + + required_time_budget: float = 10 # ILP specific, default to 10s + + const_throughput_grid_gbits: Optional[np.ndarray] = None # if not set, load from profiles + const_cost_per_gb_grid: Optional[np.ndarray] = None # if not set, load from profiles + + # provider bandwidth limits (ingress, egress) + aws_instance_throughput_limit: Tuple[float, float] = (10, 5) + gcp_instance_throughput_limit: Tuple[float, float] = (16, 7) # limited to 12.5 gbps due to CPU limit + azure_instance_throughput_limit: Tuple[float, float] = (16, 16) # limited to 12.5 gbps due to CPU limit + + # benchmarked_throughput_connections is the number of connections that the iperf3 throughput grid was run at, + # we assume throughput is linear up to this connection limit + benchmarked_throughput_connections = 64 + cost_per_instance_hr = 0.54 # based on m5.8xlarge spot + instance_cost_multiplier = 1.0 + # instance_provision_time_s = 0.0 + + def to_summary_dict(self): + """Simple summary of the problem""" + return { + "src": self.src, + "dsts": self.dsts, + "gbyte_to_transfer": self.gbyte_to_transfer, + "instance_limit": self.instance_limit, + "num_partitions": self.num_partitions, + "required_time_budget": self.required_time_budget, + "aws_instance_throughput_limit": self.aws_instance_throughput_limit, + "gcp_instance_throughput_limit": self.gcp_instance_throughput_limit, + "azure_instance_throughput_limit": self.azure_instance_throughput_limit, + "benchmarked_throughput_connections": self.benchmarked_throughput_connections, + "cost_per_instance_hr": self.cost_per_instance_hr, + "instance_cost_multiplier": self.instance_cost_multiplier + # "instance_provision_time_s": self.instance_provision_time_s, + } + + +@dataclass +class BroadcastSolution: + problem: BroadcastProblem + is_feasible: bool + extra_data: Optional[Dict] = None + + var_edges: Optional[List] = None # need to fix this, just for testing + var_nodes: Optional[List] = None # need to fix this, just for testing + + # solution variables + var_edge_partitions: Optional[np.ndarray] = None # each edge carries each partition or not + var_node_transfer_partitions: Optional[np.ndarray] = None # whether node transfers partition + var_instances_per_region: Optional[np.ndarray] = None # number of VMs per region + var_flow: Optional[np.ndarray] = None # enforce flow conservation, just used for checking + + # solution values + cost_egress: Optional[float] = None + cost_instance: Optional[float] = None + cost_total: Optional[float] = None + transfer_runtime_s: Optional[float] = None # NOTE: might not be able to calculate here + throughput_achieved_gbits: Optional[List[float]] = None # NOTE: might not be able to calculate here + + def to_summary_dict(self): + """Print simple summary of solution.""" + return { + "is_feasible": self.is_feasible, + "solution": { + "cost_egress": self.cost_egress, + "cost_instance": self.cost_instance, + "cost_total": self.cost_total, + "time_budget": self.problem.required_time_budget, 
+ }, + } + + @dataclass class ThroughputProblem: src: str From c59723c480f5be2d0067c57c69885115040c5204 Mon Sep 17 00:00:00 2001 From: Shu Liu Date: Tue, 2 May 2023 18:59:19 -0700 Subject: [PATCH 03/11] fix gateway operators --- poetry.lock | 14 +- pyproject.toml | 3 +- scripts/requirements-gateway.txt | 1 + skyplane/api/client.py | 4 - skyplane/api/config.py | 2 +- skyplane/api/dataplane.py | 6 +- skyplane/api/pipeline.py | 22 +- skyplane/api/tracker.py | 5 +- skyplane/api/transfer_job.py | 17 +- skyplane/api/usage.py | 2 +- skyplane/cli/impl/progress_bar.py | 2 +- skyplane/compute/server.py | 3 - skyplane/gateway/gateway_daemon.py | 6 +- skyplane/gateway/gateway_daemon_api.py | 3 +- skyplane/gateway/gateway_onprem.py | 3 - skyplane/gateway/gateway_program.py | 9 +- skyplane/obj_store/object_store_interface.py | 3 +- skyplane/obj_store/s3_interface.py | 2 + skyplane/planner/planner.py | 264 ++++++++----------- skyplane/planner/solver_ilp.py | 11 +- 20 files changed, 175 insertions(+), 207 deletions(-) diff --git a/poetry.lock b/poetry.lock index 4d0ec884e..1c60518da 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1790,6 +1790,18 @@ requests-oauthlib = ">=0.5.0" [package.extras] async = ["aiodns", "aiohttp (>=3.0)"] +[[package]] +name = "networkx" +version = "2.6.3" +description = "Python package for creating and manipulating graphs and networks" +category = "main" +optional = true +python-versions = ">=3.7" +files = [ + {file = "networkx-2.6.3-py3-none-any.whl", hash = "sha256:80b6b89c77d1dfb64a4c7854981b60aeea6360ac02c6d4e4913319e0a313abef"}, + {file = "networkx-2.6.3.tar.gz", hash = "sha256:c0946ed31d71f1b732b5aaa6da5a0388a345019af232ce2f49c766e2d6795c51"}, +] + [[package]] name = "mypy-extensions" version = "1.0.0" @@ -2891,7 +2903,7 @@ azure = ["azure-identity", "azure-mgmt-authorization", "azure-mgmt-compute", "az gateway = ["flask", "lz4", "pynacl", "pyopenssl", "werkzeug"] gcp = ["google-api-python-client", "google-auth", "google-cloud-compute", "google-cloud-storage"] ibm = ["ibm-cloud-sdk-core", "ibm-cos-sdk", "ibm-vpc"] -solver = ["cvxpy", "graphviz", "matplotlib", "numpy"] +solver = ["networkx", "cvxpy", "graphviz", "matplotlib", "numpy"] [metadata] lock-version = "2.0" diff --git a/pyproject.toml b/pyproject.toml index 6443f3ff3..e05a2b9f6 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -54,6 +54,7 @@ cvxpy = { version = ">=1.1.0", optional = true } graphviz = { version = ">=0.15", optional = true } matplotlib = { version = ">=3.0.0", optional = true } numpy = { version = ">=1.19.0", optional = true } +networkx = { version = ">=2.5", optional = true } # gateway dependencies flask = { version = "^2.1.2", optional = true } @@ -70,7 +71,7 @@ gcp = ["google-api-python-client", "google-auth", "google-cloud-compute", "googl ibm = ["ibm-cloud-sdk-core", "ibm-cos-sdk", "ibm-vpc"] all = ["boto3", "azure-identity", "azure-mgmt-authorization", "azure-mgmt-compute", "azure-mgmt-network", "azure-mgmt-resource", "azure-mgmt-storage", "azure-mgmt-subscription", "azure-storage-blob", "google-api-python-client", "google-auth", "google-cloud-compute", "google-cloud-storage", "ibm-cloud-sdk-core", "ibm-cos-sdk", "ibm-vpc"] gateway = ["flask", "lz4", "pynacl", "pyopenssl", "werkzeug"] -solver = ["cvxpy", "graphviz", "matplotlib", "numpy"] +solver = ["networkx", "cvxpy", "graphviz", "matplotlib", "numpy"] [tool.poetry.dev-dependencies] pytest = ">=6.0.0" diff --git a/scripts/requirements-gateway.txt b/scripts/requirements-gateway.txt index 687f3aeeb..be1e298a1 100644 --- 
a/scripts/requirements-gateway.txt +++ b/scripts/requirements-gateway.txt @@ -33,3 +33,4 @@ numpy pandas pyarrow typer +networkx \ No newline at end of file diff --git a/skyplane/api/client.py b/skyplane/api/client.py index f7afae7e1..6156ded25 100644 --- a/skyplane/api/client.py +++ b/skyplane/api/client.py @@ -5,15 +5,11 @@ from typing import TYPE_CHECKING, Optional from skyplane.api.config import TransferConfig -from skyplane.api.dataplane import Dataplane from skyplane.api.provisioner import Provisioner from skyplane.api.obj_store import ObjectStore from skyplane.api.usage import get_clientid -from skyplane.obj_store.object_store_interface import ObjectStoreInterface -from skyplane.planner.planner import MulticastDirectPlanner from skyplane.utils import logger from skyplane.utils.definitions import tmp_log_dir -from skyplane.utils.path import parse_path from skyplane.api.pipeline import Pipeline diff --git a/skyplane/api/config.py b/skyplane/api/config.py index 2d1ff42ac..f2d69acd2 100644 --- a/skyplane/api/config.py +++ b/skyplane/api/config.py @@ -1,6 +1,6 @@ from dataclasses import dataclass -from typing import Optional, List +from typing import Optional from skyplane import compute diff --git a/skyplane/api/dataplane.py b/skyplane/api/dataplane.py index 6dd7bc0ad..a2d4f9fd0 100644 --- a/skyplane/api/dataplane.py +++ b/skyplane/api/dataplane.py @@ -1,7 +1,7 @@ import json import os import threading -from collections import defaultdict, Counter +from collections import defaultdict from datetime import datetime from functools import partial from datetime import datetime @@ -14,7 +14,7 @@ from skyplane import compute from skyplane.api.tracker import TransferProgressTracker, TransferHook -from skyplane.api.transfer_job import CopyJob, SyncJob, TransferJob +from skyplane.api.transfer_job import TransferJob from skyplane.api.config import TransferConfig from skyplane.planner.topology import TopologyPlan, TopologyPlanGateway from skyplane.utils import logger @@ -219,6 +219,8 @@ def copy_gateway_logs(self): # copy logs from all gateways in parallel def copy_log(instance): instance.run_command("sudo docker logs -t skyplane_gateway 2> /tmp/gateway.stderr > /tmp/gateway.stdout") + print("Downloading files...: ", self.transfer_dir / f"gateway_{instance.uuid()}.stdout") + print("Downloading file....: ", self.transfer_dir / f"gateway_{instance.uuid()}.stderr") instance.download_file("/tmp/gateway.stdout", self.transfer_dir / f"gateway_{instance.uuid()}.stdout") instance.download_file("/tmp/gateway.stderr", self.transfer_dir / f"gateway_{instance.uuid()}.stderr") diff --git a/skyplane/api/pipeline.py b/skyplane/api/pipeline.py index a47167a29..b19965f25 100644 --- a/skyplane/api/pipeline.py +++ b/skyplane/api/pipeline.py @@ -1,19 +1,12 @@ -import json -import time -import os import threading -from collections import defaultdict, Counter from datetime import datetime -from functools import partial from datetime import datetime -import nacl.secret -import nacl.utils import urllib3 from typing import TYPE_CHECKING, Dict, List, Optional from skyplane import compute -from skyplane.api.tracker import TransferProgressTracker, TransferHook +from skyplane.api.tracker import TransferProgressTracker from skyplane.api.transfer_job import CopyJob, SyncJob, TransferJob from skyplane.api.config import TransferConfig @@ -22,12 +15,11 @@ UnicastDirectPlanner, UnicastILPPlanner, MulticastILPPlanner, - MulticastMDSTPlanner + MulticastMDSTPlanner, ) from skyplane.planner.topology import TopologyPlanGateway from 
skyplane.utils import logger -from skyplane.utils.definitions import gateway_docker_image, tmp_log_dir -from skyplane.utils.fn import PathLike, do_parallel +from skyplane.utils.definitions import tmp_log_dir from skyplane.api.dataplane import Dataplane @@ -74,10 +66,12 @@ def __init__( # planner self.planning_algorithm = planning_algorithm - if self.planning_algorithm == "Ndirect": - self.planner = MulticastDirectPlanner(self.max_instances, num_connections) + if self.planning_algorithm == "direct": + # TODO: should find some ways to merge direct / Ndirect self.planner = UnicastDirectPlanner(self.max_instances, num_connections) + elif self.planning_algorithm == "Ndirect": + self.planner = MulticastDirectPlanner(self.max_instances, num_connections) elif self.planning_algorithm == "MDST": self.planner = MulticastMDSTPlanner(self.max_instances, num_connections) elif self.planning_algorithm == "ILP": @@ -127,7 +121,7 @@ def start(self, debug=False, progress=False): # copy gateway logs if debug: dp.copy_gateway_logs() - except Exception as e: + except Exception: dp.copy_gateway_logs() dp.deprovision(spinner=True) return dp diff --git a/skyplane/api/tracker.py b/skyplane/api/tracker.py index d98741191..10ce2d485 100644 --- a/skyplane/api/tracker.py +++ b/skyplane/api/tracker.py @@ -1,5 +1,4 @@ import functools -from pprint import pprint import json import time from abc import ABC @@ -214,7 +213,7 @@ def monitor_single_dst_helper(dst_region): } self.hooks.on_transfer_end() - start_time = int(time.time()) + int(time.time()) try: for job in self.jobs.values(): logger.fs.debug(f"[TransferProgressTracker] Finalizing job {job.uuid}") @@ -229,7 +228,7 @@ def monitor_single_dst_helper(dst_region): session_start_timestamp_ms, ) raise e - end_time = int(time.time()) + int(time.time()) # verify transfer try: diff --git a/skyplane/api/transfer_job.py b/skyplane/api/transfer_job.py index b4686178a..5611f96ea 100644 --- a/skyplane/api/transfer_job.py +++ b/skyplane/api/transfer_job.py @@ -12,14 +12,14 @@ from queue import Queue from typing import TYPE_CHECKING, Callable, Generator, List, Optional, Tuple, TypeVar, Dict -from abc import ABC, abstractmethod +from abc import ABC import urllib3 from rich import print as rprint from skyplane import exceptions from skyplane.api.config import TransferConfig -from skyplane.chunk import Chunk, ChunkRequest +from skyplane.chunk import Chunk from skyplane.obj_store.azure_blob_interface import AzureBlobObject from skyplane.obj_store.gcs_interface import GCSObject from skyplane.obj_store.storage_interface import StorageInterface @@ -97,6 +97,7 @@ def _run_multipart_chunk_thread( src_object = transfer_pair.src_obj dest_objects = transfer_pair.dst_objs dest_key = transfer_pair.dst_key + print("dest_key: ", dest_key) if isinstance(self.src_iface, ObjectStoreInterface): mime_type = self.src_iface.get_obj_mime_type(src_object.key) # create multipart upload request per destination @@ -269,10 +270,10 @@ def transfer_pair_generator( dest_provider, dest_region = dst_iface.region_tag().split(":") try: dest_key = self.map_object_key_prefix(src_prefix, obj.key, dst_prefix, recursive=recursive) - assert ( - dest_key[: len(dst_prefix)] == dst_prefix - ), f"Destination key {dest_key} does not start with destination prefix {dst_prefix}" - dest_keys.append(dest_key[len(dst_prefix) :]) + # TODO: why is this here? 
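+            # (best guess) the old code kept dest keys relative to dst_prefix and let the
+            # write operator re-add the prefix via key_prefix; passing the full key below
+            # assumes the writer no longer prepends one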
+ # dest_keys.append(dest_key[len(dst_prefix) :]) + + dest_keys.append(dest_key) except exceptions.MissingObjectException as e: logger.fs.exception(e) raise e from None @@ -471,8 +472,12 @@ def dst_prefixes(self) -> List[str]: if not hasattr(self, "_dst_prefix"): if self.transfer_type == "unicast": self._dst_prefix = [str(parse_path(self.dst_paths[0])[2])] + print("return dst_prefixes for unicast", self._dst_prefix) else: + for path in self.dst_paths: + print("Parsing result for multicast", parse_path(path)) self._dst_prefix = [str(parse_path(path)[2]) for path in self.dst_paths] + print("return dst_prefixes for multicast", self._dst_prefix) return self._dst_prefix @property diff --git a/skyplane/api/usage.py b/skyplane/api/usage.py index 8fed9092f..cef075965 100644 --- a/skyplane/api/usage.py +++ b/skyplane/api/usage.py @@ -11,7 +11,7 @@ import requests from rich import print as rprint -from typing import Optional, Dict, List +from typing import Optional, Dict import skyplane from skyplane.utils.definitions import tmp_log_dir diff --git a/skyplane/cli/impl/progress_bar.py b/skyplane/cli/impl/progress_bar.py index 62fdb6632..d101d36cd 100644 --- a/skyplane/cli/impl/progress_bar.py +++ b/skyplane/cli/impl/progress_bar.py @@ -3,7 +3,7 @@ from rich.progress import Progress, SpinnerColumn, TextColumn, BarColumn, DownloadColumn, TransferSpeedColumn, TimeRemainingColumn from skyplane import exceptions from skyplane.chunk import Chunk -from skyplane.cli.impl.common import console, print_stats_completed +from skyplane.cli.impl.common import console from skyplane.utils.definitions import format_bytes from skyplane.api.tracker import TransferHook diff --git a/skyplane/compute/server.py b/skyplane/compute/server.py index c4f9aacff..585773105 100644 --- a/skyplane/compute/server.py +++ b/skyplane/compute/server.py @@ -1,6 +1,5 @@ import json import logging -from pprint import pprint import os import socket from contextlib import closing @@ -14,12 +13,10 @@ from skyplane import compute from skyplane.compute.const_cmds import make_autoshutdown_script, make_dozzle_command, make_sysctl_tcp_tuning_command from skyplane.config_paths import config_path, cloud_config, __config_root__ -from skyplane.gateway.gateway_program import GatewayProgram from skyplane.utils import logger from skyplane.utils.fn import PathLike, wait_for from skyplane.utils.retry import retry_backoff from skyplane.utils.timer import Timer -from skyplane.planner.topology import TopologyPlanGateway tmp_log_dir = Path("/tmp/skyplane") diff --git a/skyplane/gateway/gateway_daemon.py b/skyplane/gateway/gateway_daemon.py index 98b739498..a76a4ee99 100644 --- a/skyplane/gateway/gateway_daemon.py +++ b/skyplane/gateway/gateway_daemon.py @@ -9,7 +9,7 @@ from multiprocessing import Event, Queue from os import PathLike from pathlib import Path -from typing import Dict, List, Optional +from typing import Dict, List from skyplane.utils import logger @@ -218,11 +218,11 @@ def create_gateway_operators_helper(input_queue, program: List[Dict], partition_ elif op["op_type"] == "send": # TODO: handle private ips for GCP->GCP target_gateway_info = self.gateway_info[op["target_gateway_id"]] - print("Gateway sender sending to ", target_gateway_info["private_ip_address"]) + print("Gateway sender sending to ", target_gateway_info["public_ip_address"]) operators[handle] = GatewaySender( handle, region=self.region, - ip_addr=target_gateway_info["private_ip_address"], + ip_addr=target_gateway_info["public_ip_address"], input_queue=input_queue, 
output_queue=output_queue, error_event=self.error_event, diff --git a/skyplane/gateway/gateway_daemon_api.py index c31cd1892..dbe784937 100644 --- a/skyplane/gateway/gateway_daemon_api.py +++ b/skyplane/gateway/gateway_daemon_api.py @@ -7,8 +7,7 @@ from multiprocessing.managers import DictProxy from queue import Empty from traceback import TracebackException -from typing import Dict, List, Tuple, Optional -import json +from typing import Dict, List, Tuple from flask import Flask, jsonify, request from werkzeug.serving import make_server diff --git a/skyplane/gateway/gateway_onprem.py index 9bd0e1d01..af6ffd0a2 100644 --- a/skyplane/gateway/gateway_onprem.py +++ b/skyplane/gateway/gateway_onprem.py @@ -1,6 +1,3 @@ -import psutil -from multiprocessing import Process - # TODO: migrate to programmable gateways # from skyplane.gateway.gateway_sender import GatewaySender # diff --git a/skyplane/gateway/gateway_program.py index 3714b6162..f24c1c35e 100644 --- a/skyplane/gateway/gateway_program.py +++ b/skyplane/gateway/gateway_program.py @@ -108,7 +108,7 @@ def add_operators(self, ops: List[GatewayOperator], parent_handle: Optional[str] parent_op = self._ops[parent_handle] if parent_handle else None ops_handles = [] for op in ops: - ops_handles.append(self.add_operator(op, parent_op, partition_id)) + ops_handles.append(self.add_operator(op, parent_op.handle if parent_op else None, partition_id)) return ops_handles @@ -129,6 +129,8 @@ def to_dict(self): """ program_all = [] for partition_id, op_list in self._plan.items(): + partition_id = list(partition_id) # convert tuple to list + # build gateway program representation program = [] for op in op_list: @@ -138,11 +140,12 @@ def to_dict(self): exists = False for p in program_all: if p["value"] == program: # equivalent partition exists - p["partitions"].append(partition_id) + for pid in partition_id: + p["partitions"].append(pid) exists = True break if not exists: - program_all.append({"value": program, "partitions": [partition_id]}) + program_all.append({"value": program, "partitions": partition_id}) return program_all diff --git a/skyplane/obj_store/object_store_interface.py index 99b52501e..eb6466159 100644 --- a/skyplane/obj_store/object_store_interface.py +++ b/skyplane/obj_store/object_store_interface.py @@ -1,9 +1,8 @@ from dataclasses import dataclass -from typing import Iterator, List, Optional, Tuple +from typing import List, Optional, Tuple from skyplane.obj_store.storage_interface import StorageInterface -from skyplane.utils import logger @dataclass diff --git a/skyplane/obj_store/s3_interface.py index 07c74af5a..97976a27c 100644 --- a/skyplane/obj_store/s3_interface.py +++ b/skyplane/obj_store/s3_interface.py @@ -193,6 +193,8 @@ def upload_object( ): dst_object_name, src_file_path = str(dst_object_name), str(src_file_path) s3_client = self._s3_client() + print("Destination object name: ", dst_object_name) + print("Source file path: ", src_file_path) assert len(dst_object_name) > 0, f"Destination object name must be non-empty: '{dst_object_name}'" b64_md5sum = base64.b64encode(check_md5).decode("utf-8") if check_md5 else None checksum_args = dict(ContentMD5=b64_md5sum) if b64_md5sum else dict() diff --git a/skyplane/planner/planner.py index 28d0180e4..257776444 100644 --- a/skyplane/planner/planner.py +++ b/skyplane/planner/planner.py
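Note: the planner.py hunks below fold get_path_cost into make_nx_graph. A self-contained sketch of the graph being built, assuming the throughput.csv schema visible in the hunk (src_region, dst_region, throughput_sent in bits per second):

```python
# Sketch under the assumptions above; not the exact Skyplane implementation.
import networkx as nx
import pandas as pd

def make_graph(csv_path: str) -> nx.DiGraph:
    G = nx.DiGraph()
    for _, row in pd.read_csv(csv_path).iterrows():
        if row["src_region"] == row["dst_region"]:
            continue  # skip self-edges
        G.add_edge(
            row["src_region"],
            row["dst_region"],
            cost=None,  # later filled in from the providers' egress price tables
            throughput=row["throughput_sent"] / 1e9,  # bits/s -> Gbps
        )
    return G
```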
@@ -2,9 +2,8 @@ from importlib.resources import path from typing import List, Optional, Tuple import numpy as np - +import collections import pandas as pd -from skyplane.obj_store.storage_interface import StorageInterface from skyplane.planner.solver_ilp import ThroughputSolverILP from skyplane.planner.solver import ThroughputProblem, BroadcastProblem, BroadcastSolution, GBIT_PER_GBYTE from skyplane import compute @@ -22,6 +21,7 @@ from skyplane.api.transfer_job import TransferJob from pathlib import Path from importlib.resources import files +from random import sample class Planner: @@ -29,44 +29,43 @@ def __init__(self, n_instances: int, n_connections: int, n_partitions: Optional[ self.n_instances = n_instances self.n_connections = n_connections self.n_partitions = n_partitions - self.G = self.make_nx_graph() def logical_plan(self) -> nx.DiGraph: + # create logical plan in nx.DiGraph format raise NotImplementedError def plan(self, jobs: List[TransferJob]) -> TopologyPlan: + # create physical plan in TopologyPlan format raise NotImplementedError - def check_src_dst(self, jobs: List[TransferJob], multicast=False) -> Tuple[str, List[str]]: + def verify_job_src_dsts(self, jobs: List[TransferJob], multicast=False) -> Tuple[str, List[str]]: src_region_tag = jobs[0].src_iface.region_tag() - # jobs must have same sources and destinations - if not multicast: + if multicast: + # multicast checking + dst_region_tags = [iface.region_tag() for iface in jobs[0].dst_ifaces] + + # jobs must have same sources and destinations + for job in jobs[1:]: + assert job.src_iface.region_tag() == src_region_tag, "All jobs must have same source region" + assert [iface.region_tag() for iface in job.dst_ifaces] == dst_region_tags, "All jobs must have same destination set" + else: + # unicast checking for job in jobs: assert len(job.dst_ifaces) == 1, f"DirectPlanner only supports single destination jobs, got {len(job.dst_ifaces)}" + # jobs must have same sources and destinations dst_region_tag = jobs[0].dst_ifaces[0].region_tag() for job in jobs[1:]: assert job.src_iface.region_tag() == src_region_tag, "All jobs must have same source region" assert job.dst_ifaces[0].region_tag() == dst_region_tag, "All jobs must have same destination region" - dst_region_tag = [dst_region_tag] - - else: - dst_region_tags = [iface.region_tag() for iface in jobs[0].dst_ifaces] - for job in jobs[1:]: - assert job.src_iface.region_tag() == src_region_tag, "All jobs must have same source region" - assert [iface.region_tag() for iface in job.dst_ifaces] == dst_region_tags, "Add jobs must have same destination set" + dst_region_tags = [dst_region_tag] return src_region_tag, dst_region_tags @functools.lru_cache(maxsize=None) - def get_path_cost(self, src, dst, src_tier="PREMIUM", dst_tier="PREMIUM"): - from skyplane.compute.cloud_provider import CloudProvider - - assert src_tier == "PREMIUM" and dst_tier == "PREMIUM" - return CloudProvider.get_transfer_cost(src, dst) - def make_nx_graph(self, tp_grid_path: Optional[Path] = files("skyplane.data") / "throughput.csv") -> nx.DiGraph: + # create throughput / cost graph for all regions for planner G = nx.DiGraph() throughput = pd.read_csv(tp_grid_path) for _, row in throughput.iterrows(): @@ -74,29 +73,29 @@ def make_nx_graph(self, tp_grid_path: Optional[Path] = files("skyplane.data") / continue G.add_edge(row["src_region"], row["dst_region"], cost=None, throughput=row["throughput_sent"] / 1e9) - # update the cost using skyplane.compute tools [i.e.
in utils.py] (need to build the skyplane repo first) for edge in G.edges.data(): if edge[-1]["cost"] is None: - edge[-1]["cost"] = self.get_path_cost(edge[0], edge[1]) + edge[-1]["cost"] = compute.CloudProvider.get_transfer_cost(edge[0], edge[1]) assert all([edge[-1]["cost"] is not None for edge in G.edges.data()]) return G - def add_operator_receive_send( + def add_src_or_overlay_operator( self, - solution_graph, + solution_graph: nx.DiGraph, gateway_program: GatewayProgram, region: str, partition_ids: List[int], partition_offset: int, plan: TopologyPlan, - obj_store: Optional[Tuple[str, str]] = None, - dst_op: Optional[GatewayReceive] = None, + obj_store: Optional[Tuple[str, str]] = None, # obj_store is None if this is not the source node + dst_op: Optional[GatewayReceive] = None, # dst_op is None if this is not the destination node ) -> bool: if dst_op is not None: receive_op = dst_op else: if obj_store is None: + # TODO: add generate data locally operator receive_op = GatewayReceive() else: receive_op = GatewayReadObjectStore( @@ -106,7 +105,7 @@ def add_operator_receive_send( # find set of regions to send to for all partitions in partition_ids g = solution_graph - # check correctness + # partition_ids are ids that follow the same path from the out edges of the region any_id = partition_ids[0] - partition_offset next_regions = set([edge[1] for edge in g.out_edges(region, data=True) if str(any_id) in edge[-1]["partitions"]]) @@ -119,7 +118,9 @@ def add_operator_receive_send( region_to_id_map = {} for next_region in next_regions: - region_to_id_map[next_region] = plan.get_region_gateways(next_region).gateway_id + region_to_id_map[next_region] = [] + for i in range(solution_graph.nodes[next_region]["num_vms"]): + region_to_id_map[next_region].append(plan.get_region_gateways(next_region)[i].gateway_id) # use muxand or muxor for partition_id operation = "MUX_AND" if len(next_regions) > 1 else "MUX_OR" @@ -134,36 +135,38 @@ def add_operator_receive_send( if dst_op is not None and dst_op.op_type == "mux_and": mux_op = receive_op else: # do not add any nested mux_and if dst_op parent is mux_and - gateway_program.add_operator(op=mux_op, parent_handle=receive_op, partition_id=tuple(partition_ids)) + gateway_program.add_operator(op=mux_op, parent_handle=receive_op.handle, partition_id=tuple(partition_ids)) for next_region, next_region_ids in region_to_id_map.items(): - send_ops = [GatewaySend(ip, num_connections=self.n_connections, region=next_region) for ip in next_region_ids] + send_ops = [ + GatewaySend(target_gateway_id=id, region=next_region, num_connections=self.n_connections) for id in next_region_ids + ] - # if next region has >1 gateways, add MUX_OR + # if there is more than one region to forward data to, add MUX_OR if len(next_region_ids) > 1: mux_or_op = GatewayMuxOr() - gateway_program.add_operator(op=mux_or_op, parent_handle=mux_op, partition_id=tuple(partition_ids)) - gateway_program.add_operators(ops=send_ops, parent_handle=mux_or_op, partition_id=tuple(partition_ids)) + gateway_program.add_operator(op=mux_or_op, parent_handle=mux_op.handle, partition_id=tuple(partition_ids)) + gateway_program.add_operators(ops=send_ops, parent_handle=mux_or_op.handle, partition_id=tuple(partition_ids)) else: # otherwise, the parent of send_op is mux_op ("MUX_AND") assert len(send_ops) == 1 - gateway_program.add_operator(op=send_ops[0], parent_handle=mux_op, partition_id=tuple(partition_ids)) + gateway_program.add_operator(op=send_ops[0], parent_handle=mux_op.handle, 
partition_id=tuple(partition_ids)) else: # only send this partition to a single region assert len(region_to_id_map) == 1 next_region = list(region_to_id_map.keys())[0] - ips = [ip for next_region_ids in region_to_id_map.values() for ip in next_region_ids] + ids = [id for next_region_ids in region_to_id_map.values() for id in next_region_ids] - num_connections = int(self.n_connections / len(ips)) - send_ops = [GatewaySend(ip, num_connections=num_connections, region=next_region) for ip in ips] + num_connections = int(self.n_connections / len(ids)) + send_ops = [GatewaySend(target_gateway_id=id, region=next_region, num_connections=num_connections) for id in ids] # if num of gateways > 1, then connect to MUX_OR - if len(ips) > 1: - gateway_program.add_operator(op=mux_op, parent_handle=receive_op, partition_id=tuple(partition_ids)) - gateway_program.add_operators(ops=send_ops, parent_handle=mux_op) + if len(ids) > 1: + gateway_program.add_operator(op=mux_op, parent_handle=receive_op.handle, partition_id=tuple(partition_ids)) + gateway_program.add_operators(ops=send_ops, parent_handle=mux_op.handle) else: - gateway_program.add_operators(op=send_ops, parent_handle=receive_op, partition_id=tuple(partition_ids)) + gateway_program.add_operators(ops=send_ops, parent_handle=receive_op.handle, partition_id=tuple(partition_ids)) return True @@ -177,68 +180,71 @@ def add_dst_operator( plan: TopologyPlan, obj_store: Tuple[str, str] = None, ): + # operator that receives data receive_op = GatewayReceive() gateway_program.add_operator(receive_op, partition_id=tuple(partition_ids)) - # write + # operator that writes to the object store write_op = GatewayWriteObjectStore(bucket_name=obj_store[0], bucket_region=obj_store[1], num_connections=self.n_connections) g = solution_graph + + # partition_ids are ids that follow the same path from the out edges of the region any_id = partition_ids[0] - partition_offset next_regions = set([edge[1] for edge in g.out_edges(region, data=True) if str(any_id) in edge[-1]["partitions"]]) - # if no regions to forward data to, just write + # if no regions to forward data to, write to the object store if len(next_regions) == 0: - gateway_program.add_operator(write_op, receive_op, partition_id=tuple(partition_ids)) - else: # otherwise, "and" --> write and forward + gateway_program.add_operator(write_op, receive_op.handle, partition_id=tuple(partition_ids)) + + # otherwise, receive and write to the object store, then forward data to next regions + else: mux_and_op = GatewayMuxAnd() - gateway_program.add_operator(mux_and_op, receive_op, partition_id=tuple(partition_ids)) - gateway_program.add_operator(write_op, mux_and_op, partition_id=tuple(partition_ids)) - self.add_operator_receive_send( + # receive and write + gateway_program.add_operator(mux_and_op, receive_op.handle, partition_id=tuple(partition_ids)) + gateway_program.add_operator(write_op, mux_and_op.handle, partition_id=tuple(partition_ids)) + + # forward: destination nodes are also forwarders + self.add_src_or_overlay_operator( solution_graph, gateway_program, region, partition_ids, partition_offset, plan, dst_op=mux_and_op ) - def solution_to_gateway_programs( + def logical_plan_to_gateway_programs( self, - src_ifaces: List[StorageInterface], - dst_ifaces: List[List[StorageInterface]], + jobs: List[TransferJob], solution_graph: nx.graph, num_partitions: int = 1, ) -> TopologyPlan: + # get source and destination regions + src_ifaces, dst_ifaces = [job.src_iface for job in jobs], [job.dst_ifaces for job in jobs] 
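Note: the grouping step a few lines below batches partitions that exit a region toward the same set of next-hop regions, so they can share one operator chain. A worked example of that frozenset idiom (region names are illustrative):

```python
import collections

# partition id -> set of regions it is forwarded to
partition_to_next_regions = {
    0: {"gcp:us-central1"},
    1: {"gcp:us-central1"},
    2: {"aws:us-west-2", "gcp:us-central1"},
}

keys_per_set = collections.defaultdict(list)
for partition, next_regions in partition_to_next_regions.items():
    keys_per_set[frozenset(next_regions)].append(partition)  # frozenset is hashable

print(dict(keys_per_set))
# {frozenset({'gcp:us-central1'}): [0, 1],
#  frozenset({'aws:us-west-2', 'gcp:us-central1'}): [2]}
```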
src_region_tag = src_ifaces[0].region_tag() dst_region_tags = [dst_iface.region_tag() for dst_iface in dst_ifaces[0]] # map from the node to the gateway program - node_to_program = {node: GatewayProgram() for node in solution_graph.nodes} + region_to_gateway_program = {region: GatewayProgram() for region in solution_graph.nodes} - # construct a plan for each node + # construct TopologyPlan for all the regions in solution_graph overlay_region_tags = [node for node in solution_graph.nodes if node != src_region_tag and node not in dst_region_tags] plan = TopologyPlan(src_region_tag=src_region_tag, dest_region_tags=dst_region_tags, overlay_region_tags=overlay_region_tags) - - # iterate through all the nodes for node in solution_graph.nodes: plan.add_gateway(node) - def remap_keys(self, mapping): - return [{"partitions": k, "value": v} for k, v in mapping.items()] - # iterate through all the jobs for i in range(len(src_ifaces)): src_bucket = src_ifaces[i].bucket() - dst_buckets = {dst_iface.region_tag(): dst_iface[i].bucket() for dst_iface in dst_ifaces} + dst_buckets = {dst_iface.region_tag(): dst_iface.bucket() for dst_iface in dst_ifaces[i]} - # iterate through all the nodes + # iterate through all the regions in the solution graph for node in solution_graph.nodes: - node_gateway_program = node_to_program[node] + node_gateway_program = region_to_gateway_program[node] partition_to_next_regions = {} + # give each job a different partition offset i, so we can read/write to different buckets for j in range(i, i + num_partitions): partition_to_next_regions[j] = set( [edge[1] for edge in solution_graph.out_edges(node, data=True) if str(j) in edge[-1]["partitions"]] ) - import collections - keys_per_set = collections.defaultdict(list) for key, value in partition_to_next_regions.items(): keys_per_set[frozenset(value)].append(key) @@ -248,7 +254,7 @@ def remap_keys(self, mapping): # source node: read from object store or generate random data, then forward data for partitions in list_of_partitions: if node == src_region_tag: - self.add_operator_receive_send( + self.add_src_or_overlay_operator( solution_graph, node_gateway_program, node, @@ -273,15 +279,15 @@ def remap_keys(self, mapping): # overlay node only forward data else: - self.add_operator_receive_send( + self.add_src_or_overlay_operator( solution_graph, node_gateway_program, node, partitions, partition_offset=i, plan=plan, obj_store=None ) - - node_to_program[node] = remap_keys(node_gateway_program.to_dict()) - assert len(node_to_program[node]) > 0, f"Empty gateway program {node}" + region_to_gateway_program[node] = node_gateway_program + assert len(region_to_gateway_program[node].to_dict()) > 0, f"Empty gateway program {node}" for node in solution_graph.nodes: - plan.set_gateway_program(node, node_to_program[node]) + plan.set_gateway_program(node, region_to_gateway_program[node]) + # print(f"Gateway program for {node}: {region_to_gateway_program[node].to_dict()}") for edge in solution_graph.edges.data(): src_region, dst_region = edge[0], edge[1] @@ -301,13 +307,17 @@ def logical_plan(self, src_region: str, dst_regions: List[str]) -> nx.DiGraph: graph.add_node(src_region) for dst_region in dst_regions: graph.add_node(dst_region) - graph.add_edge(src_region, dst_region, partitions=list(range(self.n_partitions))) + graph.add_edge(src_region, dst_region, partitions=[str(i) for i in range(self.n_partitions)]) + + for node in graph.nodes: + graph.nodes[node]["num_vms"] = self.n_instances + return graph def plan(self, jobs: List[TransferJob]) -> TopologyPlan: -
src_region_tag, dst_region_tags = self.check_src_dst(jobs) + src_region_tag, dst_region_tags = self.verify_job_src_dsts(jobs) solution_graph = self.logical_plan(src_region_tag, dst_region_tags) - return self.solution_to_gateway_programs([job.src_iface for job in jobs], [job.dst_ifaces for job in jobs], solution_graph) + return self.logical_plan_to_gateway_programs(jobs, solution_graph) class MulticastILPPlanner(Planner): @@ -315,7 +325,7 @@ def __init__( self, n_instances: int, n_connections: int, - target_time: float, + target_time: Optional[float] = 10000, n_partitions: Optional[int] = 1, aws_only: bool = False, gcp_only: bool = False, @@ -326,6 +336,7 @@ def __init__( self.aws_only = aws_only self.gcp_only = gcp_only self.azure_only = azure_only + self.G = super().make_nx_graph() def multicast_solution_to_nxgraph(self, solution: BroadcastSolution) -> nx.DiGraph: """ Convert ILP solution to BroadcastReplicationTopology """ v_result = solution.var_instances_per_region result = np.array(solution.var_edge_partitions) @@ -383,8 +394,6 @@ def logical_plan( g = self.G # node-approximation - from random import sample - if filter_node: src_dst_li = [problem.src] + problem.dsts sampled = [i for i in sample(list(self.G.nodes), 15) if i not in src_dst_li] @@ -467,10 +476,6 @@ def logical_plan( # edge approximation if filter_edge: - # hop limit = 2: either source is source node, and/or dest is terminal node - # all other edges must be 0 - # alternative: filter edges to matchi this - print("Filter edge") for edge in edges: if edge[0] != problem.src and edge[1] not in problem.dsts: # cannot be in graph @@ -550,14 +555,15 @@ def logical_plan( return self.multicast_solution_to_nxgraph(solution) def plan(self, jobs: List[TransferJob]) -> TopologyPlan: - src_region_tag, dst_region_tags = self.check_src_dst(jobs, multicast=True) + src_region_tag, dst_region_tags = self.verify_job_src_dsts(jobs, multicast=True) solution_graph = self.logical_plan(src_region_tag, dst_region_tags) - return self.solution_to_gateway_programs([job.src_iface for job in jobs], [job.dst_ifaces for job in jobs], solution_graph) + return self.logical_plan_to_gateway_programs(jobs, solution_graph) class MulticastMDSTPlanner(Planner): def __init__(self, n_instances: int, n_connections: int, n_partitions: Optional[int] = 1): super().__init__(n_instances, n_connections, n_partitions) + self.G = super().make_nx_graph() def logical_plan(self, src_region: str, dst_regions: List[str]) -> nx.DiGraph: h = self.G.copy() @@ -572,12 +578,15 @@ def logical_plan(self, src_region: str, dst_regions: List[str]) -> nx.DiGraph: s, d = edge[0], edge[1] MDST_graph.add_edge(s, d, partitions=[str(i) for i in list(range(self.n_partitions))]) + for node in MDST_graph.nodes: + MDST_graph.nodes[node]["num_vms"] = self.n_instances + return MDST_graph def plan(self, jobs: List[TransferJob]) -> TopologyPlan: - src_region_tag, dst_region_tags = self.check_src_dst(jobs, multicast=True) + src_region_tag, dst_region_tags = self.verify_job_src_dsts(jobs, multicast=True) solution_graph = self.logical_plan(src_region_tag, dst_region_tags) - return self.solution_to_gateway_programs([job.src_iface for job in jobs], [job.dst_ifaces for job in jobs], solution_graph) + return self.logical_plan_to_gateway_programs(jobs, solution_graph) class UnicastDirectPlanner(Planner): @@ -589,13 +598,17 @@ def logical_plan(self, src_region: str, dst_regions: List[str]) -> nx.DiGraph: graph.add_node(src_region) for dst_region in dst_regions: graph.add_node(dst_region) - graph.add_edge(src_region, dst_region, partitions=list(range(self.n_partitions))) + graph.add_edge(src_region, dst_region, partitions=[str(i) for i in
range(self.n_partitions)]) + + for node in graph.nodes: + graph.nodes[node]["num_vms"] = self.n_instances + return graph def plan(self, jobs: List[TransferJob]) -> TopologyPlan: - src_region_tag, dst_region_tag = self.check_src_dst(jobs) + src_region_tag, dst_region_tag = self.verify_job_src_dsts(jobs) solution_graph = self.logical_plan(src_region_tag, dst_region_tag) - return self.solution_to_gateway_programs([job.src_iface for job in jobs], [job.dst_ifaces for job in jobs], solution_graph) + return self.logical_plan_to_gateway_programs(jobs, solution_graph) class UnicastILPPlanner(Planner): @@ -603,25 +616,10 @@ def __init__(self, n_instances: int, n_connections: int, required_throughput_gbi super().__init__(n_instances, n_connections, n_partitions) self.solver_required_throughput_gbits = required_throughput_gbits - def plan(self, jobs: List[TransferJob]) -> TopologyPlan: - # make sure only single destination - for job in jobs: - assert len(job.dst_ifaces) == 1, f"DirectPlanner only support single destination jobs, got {len(job.dst_ifaces)}" - - src_region_tag = jobs[0].src_iface.region_tag() - src_provider, src_region = src_region_tag.split(":") - dst_region_tag = jobs[0].dst_ifaces[0].region_tag() - dst_provider, dst_region = dst_region_tag.split(":") - - # jobs must have same sources and destinations - for job in jobs[1:]: - assert job.src_iface.region_tag() == src_region_tag, "All jobs must have same source region" - assert job.dst_ifaces[0].region_tag() == dst_region_tag, "All jobs must have same destination region" - - # NOTE: compute ILP plans here + def logical_plan(self, src_region: str, dst_regions: List[str]) -> nx.DiGraph: problem = ThroughputProblem( - src=f"{src_provider}:{src_region}", - dst=f"{dst_provider}:{dst_region}", + src=src_region, + dst=dst_regions, required_throughput_gbits=self.solver_required_throughput_gbits, gbyte_to_transfer=1, instance_limit=self.n_instances, @@ -629,59 +627,17 @@ def plan(self, jobs: List[TransferJob]) -> TopologyPlan: with path("skyplane.data", "throughput.csv") as solver_throughput_grid: tput = ThroughputSolverILP(solver_throughput_grid) - solution = tput.solve_min_cost(problem, solver=ThroughputSolverILP.choose_solver(), save_lp_path=None) - if not solution.is_feasible: - raise ValueError("ILP solver failed to find a solution, try solving with fewer constraints") - - topo, _ = tput.to_replication_topology(solution) - - plan = TopologyPlan(src_region_tag=src_region_tag, dest_region_tags=[dst_region_tag]) - - # TODO: use VM limits to determine how many instances to create in each region - # TODO: support on-sided transfers but not requiring VMs to be created in source/destination regions - for i in range(self.n_instances): - plan.add_gateway(src_region_tag) - plan.add_gateway(dst_region_tag) - - # ids of gateways in dst region - dst_gateways = plan.get_region_gateways(dst_region_tag) + solution = tput.solve_min_cost(problem) - src_program = GatewayProgram() - dst_program = GatewayProgram() - - for job in jobs: - src_bucket = job.src_iface.bucket() - dst_bucket = job.dst_ifaces[0].bucket() - - # give each job a different partition id, so we can read/write to different buckets - partition_id = jobs.index(job) - - # source region gateway program - obj_store_read = src_program.add_operator( - GatewayReadObjectStore(src_bucket, src_region_tag, self.n_connections), partition_id=partition_id - ) - mux_or = src_program.add_operator(GatewayMuxOr(), parent_handle=obj_store_read, partition_id=partition_id) - for i in range(self.n_instances): - 
src_program.add_operator( - GatewaySend(target_gateway_id=dst_gateways[i].gateway_id, region=src_region_tag, num_connections=self.n_connections), - parent_handle=mux_or, - partition_id=partition_id, - ) - - # dst region gateway program - recv_op = dst_program.add_operator(GatewayReceive(), partition_id=partition_id) - dst_program.add_operator( - GatewayWriteObjectStore(dst_bucket, dst_region_tag, self.n_connections), parent_handle=recv_op, partition_id=partition_id - ) - - # update cost per GB - plan.cost_per_gb += compute.CloudProvider.get_transfer_cost(src_region_tag, dst_region_tag) + if not solution.is_feasible: + raise RuntimeError("No feasible solution found") - # set gateway programs - plan.set_gateway_program(src_region_tag, src_program) - plan.set_gateway_program(dst_region_tag, dst_program) + return tput.to_replication_topology(solution) - return plan + def plan(self, jobs: List[TransferJob]) -> TopologyPlan: + src_region_tag, dst_region_tag = self.verify_job_src_dsts(jobs) + solution_graph = self.logical_plan(src_region_tag, dst_region_tag) + return self.logical_plan_to_gateway_programs(jobs, solution_graph) class UnicastRONSolverPlanner(Planner): @@ -690,7 +646,7 @@ def __init__(self, n_instances: int, n_connections: int, required_throughput_gbi self.solver_required_throughput_gbits = required_throughput_gbits def logical_plan(self, src_region: str, dst_regions: List[str]) -> nx.DiGraph: - return super().logical_plan(src_region, dst_regions) + raise NotImplementedError("RON solver not implemented yet") def plan(self, jobs: List[TransferJob]) -> TopologyPlan: raise NotImplementedError("RON solver not implemented yet") diff --git a/skyplane/planner/solver_ilp.py b/skyplane/planner/solver_ilp.py index e81dbac98..b159572b5 100644 --- a/skyplane/planner/solver_ilp.py +++ b/skyplane/planner/solver_ilp.py @@ -1,18 +1,23 @@ -import cvxpy as cp # type: ignore - from skyplane.planner.solver import ThroughputSolver, ThroughputProblem, GBIT_PER_GBYTE, ThroughputSolution class ThroughputSolverILP(ThroughputSolver): @staticmethod def choose_solver(): + import cvxpy as cp # type: ignore + installed = cp.installed_solvers() order = ["GUROBI", "CBC", "GLPK_MI"] for package in order: if package in installed: return getattr(cp, package) - def solve_min_cost(self, p: ThroughputProblem, solver=cp.GLPK, solver_verbose=False, save_lp_path=None) -> ThroughputSolution: + def solve_min_cost(self, p: ThroughputProblem, solver=None, solver_verbose=False, save_lp_path=None) -> ThroughputSolution: + import cvxpy as cp # type: ignore + + if solver is None: + solver = cp.GLPK # default + regions = self.get_regions() sources = [regions.index(p.src)] sinks = [regions.index(p.dst)] From 7592f5f23523870864b0fce16ea07dd5a077da54 Mon Sep 17 00:00:00 2001 From: Shu Liu Date: Thu, 4 May 2023 11:51:51 -0700 Subject: [PATCH 04/11] reformat gateway program generator --- skyplane/api/dataplane.py | 6 ++- skyplane/api/transfer_job.py | 2 +- skyplane/planner/planner.py | 89 +++++++++++++++++++----------------- 3 files changed, 52 insertions(+), 45 deletions(-) diff --git a/skyplane/api/dataplane.py b/skyplane/api/dataplane.py index a2d4f9fd0..688d061a4 100644 --- a/skyplane/api/dataplane.py +++ b/skyplane/api/dataplane.py @@ -8,6 +8,7 @@ import nacl.secret import nacl.utils +import typer import urllib3 from pathlib import Path from typing import TYPE_CHECKING, Dict, List, Optional @@ -218,9 +219,10 @@ def provision( def copy_gateway_logs(self): # copy logs from all gateways in parallel def copy_log(instance): +
typer.secho(f"Downloading log: {self.transfer_dir}/gateway_{instance.uuid()}.stdout", fg="bright_black") + typer.secho(f"Downloading log: {self.transfer_dir}/gateway_{instance.uuid()}.stderr", fg="bright_black") + instance.run_command("sudo docker logs -t skyplane_gateway 2> /tmp/gateway.stderr > /tmp/gateway.stdout") - print("Downloading files...: ", self.transfer_dir / f"gateway_{instance.uuid()}.stdout") - print("Downloading file....: ", self.transfer_dir / f"gateway_{instance.uuid()}.stderr") instance.download_file("/tmp/gateway.stdout", self.transfer_dir / f"gateway_{instance.uuid()}.stdout") instance.download_file("/tmp/gateway.stderr", self.transfer_dir / f"gateway_{instance.uuid()}.stderr") diff --git a/skyplane/api/transfer_job.py b/skyplane/api/transfer_job.py index 5611f96ea..596f437f9 100644 --- a/skyplane/api/transfer_job.py +++ b/skyplane/api/transfer_job.py @@ -270,7 +270,7 @@ def transfer_pair_generator( dest_provider, dest_region = dst_iface.region_tag().split(":") try: dest_key = self.map_object_key_prefix(src_prefix, obj.key, dst_prefix, recursive=recursive) - # TODO: why is this here? + # TODO: why is it changed here? # dest_keys.append(dest_key[len(dst_prefix) :]) dest_keys.append(dest_key) diff --git a/skyplane/planner/planner.py b/skyplane/planner/planner.py index 257776444..0cd7f67d0 100644 --- a/skyplane/planner/planner.py +++ b/skyplane/planner/planner.py @@ -88,34 +88,46 @@ def add_src_or_overlay_operator( partition_ids: List[int], partition_offset: int, plan: TopologyPlan, - obj_store: Optional[Tuple[str, str]] = None, # obj_store is None if this is not the source node - dst_op: Optional[GatewayReceive] = None, # dst_op is None if this is not the destination node + bucket_info: Optional[Tuple[str, str]] = None, + dst_op: Optional[GatewayReceive] = None, ) -> bool: - if dst_op is not None: - receive_op = dst_op - else: - if obj_store is None: - # TODO: add generate data locally operator - receive_op = GatewayReceive() - else: - receive_op = GatewayReadObjectStore( - bucket_name=obj_store[0], bucket_region=obj_store[1], num_connections=self.n_connections - ) - - # find set of regions to send to for all partitions in partition_ids - g = solution_graph - - # partition_ids are ids that follow the same path from the out edges of the region + """ + :param solution_graph: nx.DiGraph of solution + :param gateway_program: GatewayProgram of region to add operator to + :param region: region to add operator to + :param partition_ids: list of partition ids to add operator to + :param partition_offset: offset of partition ids + :param plan: TopologyPlan of solution [for getting gateway ids] + :param bucket_info: tuple of (bucket_name, bucket_region) for object store + :param dst_op: if None, then this is either the source node or a overlay node; otherwise, this is the destination overlay node + """ + # partition_ids are set of ids that follow the same path from the out edges of the region any_id = partition_ids[0] - partition_offset next_regions = set([edge[1] for edge in g.out_edges(region, data=True) if str(any_id) in edge[-1]["partitions"]]) - # if no regions to forward data to + # if partition_ids does not have a next region, then we cannot add an operator if len(next_regions) == 0: print( f"Region {region}, any id: {any_id}, partition ids: {partition_ids}, has no next region to forward data to: {g.out_edges(region, data=True)}" ) - return False + return + # identify if this is a destination overlay node or not + if dst_op is None: + # source node or overlay node + # TODO: 
add generate data locally operator + if bucket_info is None: + receive_op = GatewayReceive() + else: + receive_op = GatewayReadObjectStore( + bucket_name=bucket_info[0], bucket_region=bucket_info[1], num_connections=self.n_connections + ) + else: + # destination overlay node, dst_op is the parent node + receive_op = dst_op + + # find set of regions to send to for all partitions in partition_ids + g = solution_graph region_to_id_map = {} for next_region in next_regions: region_to_id_map[next_region] = [] @@ -154,12 +166,9 @@ def add_src_or_overlay_operator( else: # only send this partition to a single region assert len(region_to_id_map) == 1 - next_region = list(region_to_id_map.keys())[0] ids = [id for next_region_ids in region_to_id_map.values() for id in next_region_ids] - - num_connections = int(self.n_connections / len(ids)) - send_ops = [GatewaySend(target_gateway_id=id, region=next_region, num_connections=num_connections) for id in ids] + send_ops = [GatewaySend(target_gateway_id=id, region=next_region, num_connections=self.n_connections) for id in ids] # if num of gateways > 1, then connect to MUX_OR if len(ids) > 1: @@ -195,26 +204,24 @@ def add_dst_operator( # if no regions to forward data to, write to the object store if len(next_regions) == 0: - gateway_program.add_operator(write_op, receive_op.handle, partition_id=tuple(partition_ids)) + gateway_program.add_operator(write_op, parent_handle=receive_op.handle, partition_id=tuple(partition_ids)) # otherwise, receive and write to the object store, then forward data to next regions else: mux_and_op = GatewayMuxAnd() # receive and write - gateway_program.add_operator(mux_and_op, receive_op.handle, partition_id=tuple(partition_ids)) - gateway_program.add_operator(write_op, mux_and_op.handle, partition_id=tuple(partition_ids)) + gateway_program.add_operator(mux_and_op, parent_handle=receive_op.handle, partition_id=tuple(partition_ids)) + gateway_program.add_operator(write_op, parent_handle=mux_and_op.handle, partition_id=tuple(partition_ids)) # forward: destination nodes are also forwarders self.add_src_or_overlay_operator( solution_graph, gateway_program, region, partition_ids, partition_offset, plan, dst_op=mux_and_op ) - def logical_plan_to_gateway_programs( - self, - jobs: List[TransferJob], - solution_graph: nx.graph, - num_partitions: int = 1, - ) -> TopologyPlan: + def logical_plan_to_topology_plan(self, jobs: List[TransferJob], solution_graph: nx.graph) -> TopologyPlan: + """ + Given a logical plan, construct a gateway program for each region in the logical plan for the given jobs. 
+ """ # get source and destination regions src_ifaces, dst_ifaces = [job.src_iface for job in jobs], [job.dst_ifaces for job in jobs] src_region_tag = src_ifaces[0].region_tag() @@ -240,7 +247,7 @@ def logical_plan_to_gateway_programs( partition_to_next_regions = {} # give each job a different partition offset i, so we can read/write to different buckets - for j in range(i, i + num_partitions): + for j in range(i, i + self.n_partitions): partition_to_next_regions[j] = set( [edge[1] for edge in solution_graph.out_edges(node, data=True) if str(j) in edge[-1]["partitions"]] ) @@ -287,12 +294,11 @@ def logical_plan_to_gateway_programs( for node in solution_graph.nodes: plan.set_gateway_program(node, region_to_gateway_program[node]) - # print(f"Gateway program for {node}: {region_to_gateway_program[node].to_dict()}") for edge in solution_graph.edges.data(): src_region, dst_region = edge[0], edge[1] plan.cost_per_gb += compute.CloudProvider.get_transfer_cost(src_region, dst_region) * ( - len(edge[-1]["partitions"]) / num_partitions + len(edge[-1]["partitions"]) / self.n_partitions ) return plan @@ -311,13 +317,12 @@ def logical_plan(self, src_region: str, dst_regions: List[str]) -> nx.DiGraph: for node in graph.nodes: graph.nodes[node]["num_vms"] = self.n_instances - return graph def plan(self, jobs: List[TransferJob]) -> TopologyPlan: src_region_tag, dst_region_tags = self.verify_job_src_dsts(jobs) solution_graph = self.logical_plan(src_region_tag, dst_region_tags) - return self.logical_plan_to_gateway_programs(jobs, solution_graph) + return self.logical_plan_to_topology_plan(jobs, solution_graph) class MulticastILPPlanner(Planner): @@ -340,7 +345,7 @@ def __init__( def multicast_solution_to_nxgraph(self, solution: BroadcastSolution) -> nx.DiGraph: """ - Convert ILP solution to BroadcastReplicationTopology + Convert ILP solution to logical plan in nx graph """ v_result = solution.var_instances_per_region result = np.array(solution.var_edge_partitions) @@ -557,7 +562,7 @@ def logical_plan( def plan(self, jobs: List[TransferJob]) -> TopologyPlan: src_region_tag, dst_region_tags = self.verify_job_src_dsts(jobs, multicast=True) solution_graph = self.logical_plan(src_region_tag, dst_region_tags) - return self.logical_plan_to_gateway_programs(jobs, solution_graph) + return self.logical_plan_to_topology_plan(jobs, solution_graph) class MulticastMDSTPlanner(Planner): @@ -586,7 +591,7 @@ def logical_plan(self, src_region: str, dst_regions: List[str]) -> nx.DiGraph: def plan(self, jobs: List[TransferJob]) -> TopologyPlan: src_region_tag, dst_region_tags = self.verify_job_src_dsts(jobs, multicast=True) solution_graph = self.logical_plan(src_region_tag, dst_region_tags) - return self.logical_plan_to_gateway_programs(jobs, solution_graph) + return self.logical_plan_to_topology_plan(jobs, solution_graph) class UnicastDirectPlanner(Planner): @@ -608,7 +613,7 @@ def logical_plan(self, src_region: str, dst_regions: List[str]) -> nx.DiGraph: def plan(self, jobs: List[TransferJob]) -> TopologyPlan: src_region_tag, dst_region_tag = self.verify_job_src_dsts(jobs) solution_graph = self.logical_plan(src_region_tag, dst_region_tag) - return self.logical_plan_to_gateway_programs(jobs, solution_graph) + return self.logical_plan_to_topology_plan(jobs, solution_graph) class UnicastILPPlanner(Planner): @@ -637,7 +642,7 @@ def logical_plan(self, src_region: str, dst_regions: List[str]) -> nx.DiGraph: def plan(self, jobs: List[TransferJob]) -> TopologyPlan: src_region_tag, dst_region_tag = self.verify_job_src_dsts(jobs) 
solution_graph = self.logical_plan(src_region_tag, dst_region_tag) - return self.logical_plan_to_gateway_programs(jobs, solution_graph) + return self.logical_plan_to_topology_plan(jobs, solution_graph) class UnicastRONSolverPlanner: From 0617ed1d72480535ccdccdff974dd32522070c7d Mon Sep 17 00:00:00 2001 From: huangwei Date: Mon, 31 Jul 2023 11:36:08 +0800 Subject: [PATCH 05/11] separate compress and encryption --- skyplane/gateway/gateway_daemon.py | 56 ++++- skyplane/gateway/gateway_program.py | 32 ++- .../gateway/operators/gateway_operator.py | 203 ++++++++++++++++-- .../gateway/operators/gateway_receiver.py | 24 +-- skyplane/planner/planner.py | 54 ++++- 5 files changed, 312 insertions(+), 57 deletions(-) diff --git a/skyplane/gateway/gateway_daemon.py b/skyplane/gateway/gateway_daemon.py index 06ef3e1ad..343bad6ac 100644 --- a/skyplane/gateway/gateway_daemon.py +++ b/skyplane/gateway/gateway_daemon.py @@ -23,6 +23,10 @@ GatewayObjStoreReadOperator, GatewayObjStoreWriteOperator, GatewayWaitReceiver, + GatewayCompress, + GatewayDecompress, + GatewayEncrypt, + GatewayDecrypt, ) from skyplane.gateway.operators.gateway_receiver import GatewayReceiver from skyplane.utils import logger @@ -90,8 +94,6 @@ def __init__( error_queue=self.error_queue, max_pending_chunks=max_incoming_ports, use_tls=self.use_tls, - use_compression=use_compression, - e2ee_key_bytes=self.e2ee_key_bytes, ) # API server @@ -232,8 +234,6 @@ def create_gateway_operators_helper(input_queue, program: List[Dict], partition_ error_queue=self.error_queue, chunk_store=self.chunk_store, use_tls=self.use_tls, - use_compression=op["compress"], - e2ee_key_bytes=self.e2ee_key_bytes, n_processes=op["num_connections"], ) total_p += op["num_connections"] @@ -264,6 +264,54 @@ def create_gateway_operators_helper(input_queue, program: List[Dict], partition_ chunk_store=self.chunk_store, ) total_p += 1 + elif op["op_type"] == "compress": + operators[handle] = GatewayCompress( + handle=handle, + region=self.region, + input_queue=input_queue, + output_queue=output_queue, + error_event=self.error_event, + error_queue=self.error_queue, + chunk_store=self.chunk_store, + use_compression=op["compress"], + ) + total_p += 1 + elif op["op_type"] == "decompress": + operators[handle] = GatewayDecompress( + handle=handle, + region=self.region, + input_queue=input_queue, + output_queue=output_queue, + error_event=self.error_event, + error_queue=self.error_queue, + chunk_store=self.chunk_store, + use_compression=op["compress"], + ) + total_p += 1 + elif op["op_type"] == "encrypt": + operators[handle] = GatewayEncrypt( + handle=handle, + region=self.region, + input_queue=input_queue, + output_queue=output_queue, + error_event=self.error_event, + error_queue=self.error_queue, + chunk_store=self.chunk_store, + e2ee_key_bytes=self.e2ee_key_bytes, + ) + total_p += 1 + elif op["op_type"] == "decrypt": + operators[handle] = GatewayDecrypt( + handle=handle, + region=self.region, + input_queue=input_queue, + output_queue=output_queue, + error_event=self.error_event, + error_queue=self.error_queue, + chunk_store=self.chunk_store, + e2ee_key_bytes=self.e2ee_key_bytes, + ) + total_p += 1 else: raise ValueError(f"Unsupported op_type {op['op_type']}") # recursively create for child operators diff --git a/skyplane/gateway/gateway_program.py b/skyplane/gateway/gateway_program.py index f275b82e8..f2432df95 100644 --- a/skyplane/gateway/gateway_program.py +++ b/skyplane/gateway/gateway_program.py @@ -37,24 +37,18 @@ def __init__( target_gateway_id: str, region:
str, num_connections: int = 32, - compress: bool = False, - encrypt: bool = False, private_ip: bool = False, ): super().__init__("send") self.target_gateway_id = target_gateway_id # gateway to send to self.region = region # region to send to self.num_connections = num_connections # default this for now - self.compress = compress - self.encrypt = encrypt self.private_ip = private_ip # whether to send to private or public IP (private for GCP->GCP) class GatewayReceive(GatewayOperator): - def __init__(self, decompress: bool = False, decrypt: bool = False, max_pending_chunks: int = 1000): + def __init__(self, max_pending_chunks: int = 1000): super().__init__("receive") - self.decompress = decompress - self.decrypt = decrypt self.max_pending_chunks = max_pending_chunks @@ -97,6 +91,30 @@ def __init__(self): super().__init__("mux_or") +class GatewayCompress(GatewayOperator): + def __init__(self, compress: bool = False): + super().__init__("compress") + self.compress = compress + + +class GatewayDecompress(GatewayOperator): + def __init__(self, compress: bool = False): + super().__init__("decompress") + self.compress = compress + + +class GatewayEncrypt(GatewayOperator): + def __init__(self, encrypt: bool = False): + super().__init__("encrypt") + self.encrypt = encrypt + + +class GatewayDecrypt(GatewayOperator): + def __init__(self, decrypt: bool = False): + super().__init__("decrypt") + self.decrypt = decrypt + + class GatewayProgram: """ diff --git a/skyplane/gateway/operators/gateway_operator.py b/skyplane/gateway/operators/gateway_operator.py index c574caebc..49417ad48 100644 --- a/skyplane/gateway/operators/gateway_operator.py +++ b/skyplane/gateway/operators/gateway_operator.py @@ -162,15 +162,11 @@ def __init__( chunk_store: ChunkStore, ip_addr: str, use_tls: Optional[bool] = True, - use_compression: Optional[bool] = True, - e2ee_key_bytes: Optional[bytes] = None, n_processes: Optional[int] = 32, ): super().__init__(handle, region, input_queue, output_queue, error_event, error_queue, chunk_store, n_processes) self.ip_addr = ip_addr self.use_tls = use_tls - self.use_compression = use_compression - self.e2ee_key_bytes = e2ee_key_bytes self.args = (ip_addr,) # provider = region.split(":")[0] @@ -179,12 +175,6 @@ def __init__( # elif provider == "azure": # self.n_processes = 24 # due to throttling limits from authentication - # encryption - if e2ee_key_bytes is None: - self.e2ee_secretbox = None - else: - self.e2ee_secretbox = nacl.secret.SecretBox(e2ee_key_bytes) - # SSL context if use_tls: self.ssl_context = ssl.create_default_context(ssl.Purpose.SERVER_AUTH) @@ -332,14 +322,6 @@ def process(self, chunk_req: ChunkRequest, dst_host: str): raw_wire_length = wire_length compressed_length = None - if self.use_compression: - data = lz4.frame.compress(data) - wire_length = len(data) - compressed_length = wire_length - if self.e2ee_secretbox is not None: - data = self.e2ee_secretbox.encrypt(data) - wire_length = len(data) - # send chunk header header = chunk.to_wire_header( n_chunks_left_on_socket=len(chunk_ids) - idx - 1, @@ -600,3 +582,188 @@ def process(self, chunk_req: ChunkRequest): f"[obj_store:{self.worker_id}] Uploaded {chunk_req.chunk.chunk_id} partition {chunk_req.chunk.part_number} to {self.bucket_name}" ) return True + + +class GatewayCompress(GatewayOperator): + def __init__( + self, + handle: str, + region: str, + input_queue: GatewayQueue, + output_queue: GatewayQueue, + error_event, + error_queue: Queue, + chunk_store: Optional[ChunkStore] = None, + use_compression: Optional[bool] = 
True, + prefix: Optional[str] = "", + ): + super().__init__( + handle, region, input_queue, output_queue, error_event, error_queue, chunk_store + ) + self.chunk_store = chunk_store + self.use_compression = use_compression + self.prefix = prefix + + def process(self, chunk_req: ChunkRequest): + if not self.use_compression: + return True + logger.debug(f"[{self.handle}:{self.worker_id}] Compressing chunk {chunk_req.chunk.chunk_id}") + chunk_reqs = [chunk_req] + for idx, chunk_req in enumerate(chunk_reqs): + chunk_id = chunk_req.chunk.chunk_id + chunk = chunk_req.chunk + chunk_file_path = self.chunk_store.get_chunk_file_path(chunk_id) + + with open(chunk_file_path, "rb") as f: + data = f.read() + assert len(data) == chunk.chunk_length_bytes, f"chunk {chunk_id} has size {len(data)} but should be {chunk.chunk_length_bytes}" + + data = lz4.frame.compress(data) + + # rewrite the chunk file in place with the compressed payload + with open(chunk_file_path, "wb") as f: + f.write(data) + return True + + +class GatewayDecompress(GatewayOperator): + def __init__( + self, + handle: str, + region: str, + input_queue: GatewayQueue, + output_queue: GatewayQueue, + error_event, + error_queue: Queue, + chunk_store: Optional[ChunkStore] = None, + use_compression: Optional[bool] = True, + prefix: Optional[str] = "", + ): + super().__init__( + handle, region, input_queue, output_queue, error_event, error_queue, chunk_store + ) + self.chunk_store = chunk_store + self.use_compression = use_compression + self.prefix = prefix + + def process(self, chunk_req: ChunkRequest): + if not self.use_compression: + return True + logger.debug(f"[{self.handle}:{self.worker_id}] Decompressing chunk {chunk_req.chunk.chunk_id}") + chunk_reqs = [chunk_req] + for idx, chunk_req in enumerate(chunk_reqs): + chunk_id = chunk_req.chunk.chunk_id + chunk = chunk_req.chunk + chunk_file_path = self.chunk_store.get_chunk_file_path(chunk_id) + + with open(chunk_file_path, "rb") as f: + data = f.read() + # no length assert here: the on-disk payload is compressed, so its size differs from chunk.chunk_length_bytes + + data = lz4.frame.decompress(data) + + # rewrite the chunk file in place with the decompressed payload + with open(chunk_file_path, "wb") as f: + f.write(data) + return True + +class GatewayEncrypt(GatewayOperator): + def __init__( + self, + handle: str, + region: str, + input_queue: GatewayQueue, + output_queue: GatewayQueue, + error_event, + error_queue: Queue, + chunk_store: Optional[ChunkStore] = None, + e2ee_key_bytes: Optional[bytes] = None, + prefix: Optional[str] = "", + ): + super().__init__( + handle, region, input_queue, output_queue, error_event, error_queue, chunk_store + ) + self.chunk_store = chunk_store + self.prefix = prefix + + # encryption + if e2ee_key_bytes is None: + self.e2ee_secretbox = None + else: + self.e2ee_secretbox = nacl.secret.SecretBox(e2ee_key_bytes) + + def process(self, chunk_req: ChunkRequest): + if not self.e2ee_secretbox: + return True + logger.debug(f"[{self.handle}:{self.worker_id}] Encrypting chunk {chunk_req.chunk.chunk_id}") + chunk_reqs = [chunk_req] + for idx, chunk_req in enumerate(chunk_reqs): + chunk_id = chunk_req.chunk.chunk_id + chunk = chunk_req.chunk + chunk_file_path = self.chunk_store.get_chunk_file_path(chunk_id) + + with open(chunk_file_path, "rb") as f: + data = f.read()
+ # no length assert here: compression may already have changed the on-disk size + + data = self.e2ee_secretbox.encrypt(data) + + # rewrite the chunk file in place with the encrypted payload + with open(chunk_file_path, "wb") as f: + f.write(data) + return True + + +class GatewayDecrypt(GatewayOperator): + def __init__( + self, + handle: str, + region: str, + input_queue: GatewayQueue, + output_queue: GatewayQueue, + error_event, + error_queue: Queue, + chunk_store: Optional[ChunkStore] = None, + e2ee_key_bytes: Optional[bytes] = None, + prefix: Optional[str] = "", + ): + super().__init__( + handle, region, input_queue, output_queue, error_event, error_queue, chunk_store + ) + self.chunk_store = chunk_store + self.prefix = prefix + + # encryption + if e2ee_key_bytes is None: + self.e2ee_secretbox = None + else: + self.e2ee_secretbox = nacl.secret.SecretBox(e2ee_key_bytes) + + def process(self, chunk_req: ChunkRequest): + if not self.e2ee_secretbox: + return True + logger.debug(f"[{self.handle}:{self.worker_id}] Decrypting chunk {chunk_req.chunk.chunk_id}") + chunk_reqs = [chunk_req] + for idx, chunk_req in enumerate(chunk_reqs): + chunk_id = chunk_req.chunk.chunk_id + chunk = chunk_req.chunk + chunk_file_path = self.chunk_store.get_chunk_file_path(chunk_id) + + with open(chunk_file_path, "rb") as f: + data = f.read() + # no length assert here: the on-disk payload is encrypted (and possibly compressed), so its size differs from chunk.chunk_length_bytes + + data = self.e2ee_secretbox.decrypt(data) + + # rewrite the chunk file in place with the decrypted payload + with open(chunk_file_path, "wb") as f: + f.write(data) + return True diff --git a/skyplane/gateway/operators/gateway_receiver.py b/skyplane/gateway/operators/gateway_receiver.py index aeed0e341..21d3c8c98 100644 --- a/skyplane/gateway/operators/gateway_receiver.py +++ b/skyplane/gateway/operators/gateway_receiver.py @@ -31,8 +31,6 @@ def __init__( recv_block_size=4 * MB, max_pending_chunks=1, use_tls: Optional[bool] = True, - use_compression: Optional[bool] = True, - e2ee_key_bytes: Optional[bytes] = None, ): self.handle = handle self.region = region @@ -42,11 +40,6 @@ def __init__( self.recv_block_size = recv_block_size self.max_pending_chunks = max_pending_chunks print("Max pending chunks", self.max_pending_chunks) - self.use_compression = use_compression - if e2ee_key_bytes is None: - self.e2ee_secretbox = None - else: - self.e2ee_secretbox = nacl.secret.SecretBox(e2ee_key_bytes) self.server_processes = [] self.server_ports = [] self.next_gateway_worker_id = 0 @@ -153,9 +146,6 @@ def recv_chunks(self, conn: socket.socket, addr: Tuple[str, int]): # TODO: this wont work # chunk_request = self.chunk_store.get_chunk_request(chunk_header.chunk_id) - should_decrypt = self.e2ee_secretbox is not None # and chunk_request.dst_region == self.region - should_decompress = chunk_header.is_compressed # and chunk_request.dst_region == self.region - # wait for space # while self.chunk_store.remaining_bytes() < chunk_header.data_len * self.max_pending_chunks: # print( @@ -171,7 +161,7 @@ def recv_chunks(self, conn: socket.socket, addr: Tuple[str, int]): fpath = self.chunk_store.get_chunk_file_path(chunk_header.chunk_id) with fpath.open("wb") as f: socket_data_len = chunk_header.data_len - chunk_received_size, chunk_received_size_decompressed = 0, 0 + chunk_received_size = 0 to_write = bytearray(socket_data_len) to_write_view = memoryview(to_write) while socket_data_len > 0: @@
-188,18 +178,6 @@ def recv_chunks(self, conn: socket.socket, addr: Tuple[str, int]): ) to_write = bytes(to_write) - if should_decrypt: - to_write = self.e2ee_secretbox.decrypt(to_write) - print(f"[receiver:{server_port}]:{chunk_header.chunk_id} Decrypting {len(to_write)} bytes") - - if should_decompress: - data_batch_decompressed = lz4.frame.decompress(to_write) - chunk_received_size_decompressed += len(data_batch_decompressed) - to_write = data_batch_decompressed - print( - f"[receiver:{server_port}]:{chunk_header.chunk_id} Decompressing {len(to_write)} bytes to {chunk_received_size_decompressed} bytes" - ) - # try to write data until successful while True: try: diff --git a/skyplane/planner/planner.py b/skyplane/planner/planner.py index 6bfb5233a..03a400092 100644 --- a/skyplane/planner/planner.py +++ b/skyplane/planner/planner.py @@ -17,6 +17,10 @@ GatewayWriteObjectStore, GatewayReceive, GatewaySend, + GatewayEncrypt, + GatewayDecrypt, + GatewayCompress, + GatewayDecompress, ) from skyplane.api.transfer_job import TransferJob @@ -239,6 +243,16 @@ def plan(self, jobs: List[TransferJob]) -> TopologyPlan: ) mux_or = src_program.add_operator(GatewayMuxOr(), parent_handle=obj_store_read, partition_id=partition_id) for i in range(n_instances): + compress_op = src_program.add_operator( + GatewayCompress(compress=self.transfer_config.use_compression), + parent_handle=mux_or, + partition_id=partition_id, + ) + encrypt_op = src_program.add_operator( + GatewayEncrypt(encrypt=self.transfer_config.use_e2ee), + parent_handle=compress_op, + partition_id=partition_id, + ) src_program.add_operator( GatewaySend( target_gateway_id=dst_gateways[i].gateway_id, @@ -247,14 +261,24 @@ def plan(self, jobs: List[TransferJob]) -> TopologyPlan: - compress=True, - encrypt=True, ), - parent_handle=mux_or, + parent_handle=encrypt_op, partition_id=partition_id, ) # dst region gateway program - recv_op = dst_program.add_operator(GatewayReceive(decompress=True, decrypt=True), partition_id=partition_id) + recv_op = dst_program.add_operator(GatewayReceive(), partition_id=partition_id) + decrypt_op = dst_program.add_operator( + GatewayDecrypt(decrypt=self.transfer_config.use_e2ee), + parent_handle=recv_op, + partition_id=partition_id, + ) + decompress_op = dst_program.add_operator( + GatewayDecompress(compress=self.transfer_config.use_compression), + parent_handle=decrypt_op, + partition_id=partition_id, + ) dst_program.add_operator( - GatewayWriteObjectStore(dst_bucket, dst_region_tag, self.n_connections), parent_handle=recv_op, partition_id=partition_id + GatewayWriteObjectStore(dst_bucket, dst_region_tag, self.n_connections), parent_handle=decompress_op, partition_id=partition_id ) # update cost per GB @@ -341,6 +365,16 @@ def plan(self, jobs: List[TransferJob]) -> TopologyPlan: if dst_gateways[i].provider == "gcp" and src_provider == "gcp": # print("Using private IP for GCP to GCP transfer", src_region_tag, dst_region_tag) private_ip = True + compress_op = src_program.add_operator( + GatewayCompress(compress=self.transfer_config.use_compression), + parent_handle=mux_or, + partition_id=partition_id, + ) + encrypt_op = src_program.add_operator( + GatewayEncrypt(encrypt=self.transfer_config.use_e2ee), + parent_handle=compress_op, + partition_id=partition_id, + ) src_program.add_operator( GatewaySend( target_gateway_id=dst_gateways[i].gateway_id, @@ -350,18 +384,28 @@ def plan(self, jobs: List[TransferJob]) -> TopologyPlan: - compress=self.transfer_config.use_compression,
- encrypt=self.transfer_config.use_e2ee, ), - parent_handle=mux_or, + parent_handle=encrypt_op, partition_id=partition_id, ) # each gateway also receives data from source recv_op = dst_program[dst_region_tag].add_operator( - GatewayReceive(decompress=self.transfer_config.use_compression, decrypt=self.transfer_config.use_e2ee), + GatewayReceive(), + partition_id=partition_id, + ) + decrypt_op = dst_program[dst_region_tag].add_operator( + GatewayDecrypt(decrypt=self.transfer_config.use_e2ee), + parent_handle=recv_op, + partition_id=partition_id, + ) + decompress_op = dst_program[dst_region_tag].add_operator( + GatewayDecompress(compress=self.transfer_config.use_compression), + parent_handle=decrypt_op, partition_id=partition_id, ) dst_program[dst_region_tag].add_operator( GatewayWriteObjectStore(dst_bucket, dst_region_tag, self.n_connections, key_prefix=dst_prefix), - parent_handle=recv_op, + parent_handle=decompress_op, partition_id=partition_id, ) From ce4926bfd8fe36fba27900728b83a6034545eef8 Mon Sep 17 00:00:00 2001 From: huangwei Date: Mon, 31 Jul 2023 15:07:05 +0800 Subject: [PATCH 06/11] clear compress and decrypt from code --- skyplane/api/dataplane.py | 9 +++--- skyplane/chunk.py | 11 ++----- skyplane/compute/server.py | 12 +------ skyplane/gateway/gateway_daemon.py | 31 ++++++------------- skyplane/gateway/gateway_program.py | 6 ++-- .../gateway/operators/gateway_operator.py | 10 +++--- skyplane/planner/planner.py | 17 +++++++--- 7 files changed, 40 insertions(+), 56 deletions(-) diff --git a/skyplane/api/dataplane.py b/skyplane/api/dataplane.py index df6fab67b..568b226df 100644 --- a/skyplane/api/dataplane.py +++ b/skyplane/api/dataplane.py @@ -89,7 +89,6 @@ def _start_gateway( gateway_server: compute.Server, gateway_log_dir: Optional[PathLike], authorize_ssh_pub_key: Optional[str] = None, - e2ee_key_bytes: Optional[str] = None, ): # map outgoing ports setup_args = {} @@ -119,9 +118,7 @@ def _start_gateway( gateway_docker_image=gateway_docker_image, gateway_program_path=str(gateway_program_filename), gateway_info_path=f"{gateway_log_dir}/gateway_info.json", - e2ee_key_bytes=e2ee_key_bytes, # TODO: remove use_bbr=self.transfer_config.use_bbr, # TODO: remove - use_compression=self.transfer_config.use_compression, use_socket_tls=self.transfer_config.use_socket_tls, ) @@ -202,6 +199,10 @@ def provision( # todo: move server.py:start_gateway here logger.fs.info(f"Using docker image {gateway_docker_image}") e2ee_key_bytes = nacl.utils.random(nacl.secret.SecretBox.KEY_SIZE) + # save E2EE keys + e2ee_key_file = "e2ee_key" + with open(f"/tmp/{e2ee_key_file}", "wb") as f: + f.write(e2ee_key_bytes) # create gateway logging dir gateway_program_dir = f"{self.log_dir}/programs" @@ -218,7 +219,7 @@ def provision( jobs = [] for node, server in gateway_bound_nodes.items(): jobs.append( - partial(self._start_gateway, gateway_docker_image, node, server, gateway_program_dir, authorize_ssh_pub_key, e2ee_key_bytes) + partial(self._start_gateway, gateway_docker_image, node, server, gateway_program_dir, authorize_ssh_pub_key) ) logger.fs.debug(f"[Dataplane.provision] Starting gateways on {len(jobs)} servers") try: diff --git a/skyplane/chunk.py b/skyplane/chunk.py index 8b425a1a1..66bf15b62 100644 --- a/skyplane/chunk.py +++ b/skyplane/chunk.py @@ -26,12 +26,11 @@ class Chunk: part_number: Optional[int] = None upload_id: Optional[str] = None # TODO: for broadcast, this is not used - def to_wire_header(self, n_chunks_left_on_socket: int, wire_length: int, raw_wire_length: int, is_compressed: bool = False):
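Note: with the is_compressed byte gone, the wire header shrinks to 52 bytes and every field offset after raw_data_len shifts down by one, so the from_bytes slice for n_chunks_left_on_socket is data[44:52]. The layout, worked out:

```python
# WireProtocolHeader layout after dropping is_compressed (sketch):
#   magic                    bytes  0:8
#   protocol_version         bytes  8:12
#   chunk_id                 bytes 12:28
#   data_len                 bytes 28:36
#   raw_data_len             bytes 36:44
#   n_chunks_left_on_socket  bytes 44:52
assert 8 + 4 + 16 + 8 + 8 + 8 == 52  # matches the updated length_bytes()
```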
diff --git a/skyplane/chunk.py b/skyplane/chunk.py
index 8b425a1a1..66bf15b62 100644
--- a/skyplane/chunk.py
+++ b/skyplane/chunk.py
@@ -26,12 +26,11 @@ class Chunk:
     part_number: Optional[int] = None
     upload_id: Optional[str] = None  # TODO: for broadcast, this is not used
 
-    def to_wire_header(self, n_chunks_left_on_socket: int, wire_length: int, raw_wire_length: int, is_compressed: bool = False):
+    def to_wire_header(self, n_chunks_left_on_socket: int, wire_length: int, raw_wire_length: int):
         return WireProtocolHeader(
             chunk_id=self.chunk_id,
             data_len=wire_length,
             raw_data_len=raw_wire_length,
-            is_compressed=is_compressed,
             n_chunks_left_on_socket=n_chunks_left_on_socket,
         )
 
@@ -99,7 +98,6 @@ class WireProtocolHeader:
     chunk_id: str  # 128bit UUID
     data_len: int  # long
     raw_data_len: int  # long (uncompressed, unencrypted)
-    is_compressed: bool  # char
     n_chunks_left_on_socket: int  # long
 
     @staticmethod
@@ -115,8 +113,8 @@ def protocol_version():
 
     @staticmethod
     def length_bytes():
-        # magic (8) + protocol_version (4) + chunk_id (16) + data_len (8) + raw_data_len (8) + is_compressed (1) + n_chunks_left_on_socket (8)
-        return 8 + 4 + 16 + 8 + 8 + 1 + 8
+        # magic (8) + protocol_version (4) + chunk_id (16) + data_len (8) + raw_data_len (8) + n_chunks_left_on_socket (8)
+        return 8 + 4 + 16 + 8 + 8 + 8
 
     @staticmethod
     def from_bytes(data: bytes):
@@ -130,13 +128,11 @@ def from_bytes(data: bytes):
         chunk_id = data[12:28].hex()
         chunk_len = int.from_bytes(data[28:36], byteorder="big")
         raw_chunk_len = int.from_bytes(data[36:44], byteorder="big")
-        is_compressed = bool(int.from_bytes(data[44:45], byteorder="big"))
-        n_chunks_left_on_socket = int.from_bytes(data[45:53], byteorder="big")
+        n_chunks_left_on_socket = int.from_bytes(data[44:52], byteorder="big")
         return WireProtocolHeader(
             chunk_id=chunk_id,
             data_len=chunk_len,
             raw_data_len=raw_chunk_len,
-            is_compressed=is_compressed,
             n_chunks_left_on_socket=n_chunks_left_on_socket,
         )
@@ -149,7 +145,6 @@ def to_bytes(self):
         out_bytes += chunk_id_bytes
         out_bytes += self.data_len.to_bytes(8, byteorder="big")
         out_bytes += self.raw_data_len.to_bytes(8, byteorder="big")
-        out_bytes += self.is_compressed.to_bytes(1, byteorder="big")
         out_bytes += self.n_chunks_left_on_socket.to_bytes(8, byteorder="big")
         assert len(out_bytes) == WireProtocolHeader.length_bytes(), f"{len(out_bytes)} != {WireProtocolHeader.length_bytes()}"
         return out_bytes
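With `is_compressed` dropped, the wire header is a fixed 52 bytes (8 + 4 + 16 + 8 + 8 + 8), and every field after `raw_data_len` shifts down one byte, so `n_chunks_left_on_socket` now lives at bytes 44:52. A standalone sketch of the layout; `MAGIC` and `VERSION` are placeholder values, while the field order and widths follow the patched `WireProtocolHeader`:

    import struct

    MAGIC = 0x5359504C414E4531  # placeholder; the real value comes from WireProtocolHeader
    VERSION = 3                 # placeholder protocol version

    def pack_header(chunk_id: bytes, data_len: int, raw_data_len: int, n_left: int) -> bytes:
        assert len(chunk_id) == 16  # 128-bit UUID
        # big-endian: magic (8) + version (4) + chunk_id (16) + data_len (8) + raw_data_len (8) + n_chunks_left (8)
        header = struct.pack(">QI", MAGIC, VERSION) + chunk_id + struct.pack(">QQQ", data_len, raw_data_len, n_left)
        assert len(header) == 52
        return header

    def unpack_header(data: bytes):
        chunk_id = data[12:28].hex()
        data_len = int.from_bytes(data[28:36], byteorder="big")
        raw_data_len = int.from_bytes(data[36:44], byteorder="big")
        n_left = int.from_bytes(data[44:52], byteorder="big")  # one byte earlier than before
        return chunk_id, data_len, raw_data_len, n_left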
diff --git a/skyplane/compute/server.py b/skyplane/compute/server.py
index c4f9aacff..613a2448d 100644
--- a/skyplane/compute/server.py
+++ b/skyplane/compute/server.py
@@ -292,8 +292,6 @@ def start_gateway(
         gateway_info_path: str,
         log_viewer_port=8888,
         use_bbr=False,
-        use_compression=False,
-        e2ee_key_bytes=None,
         use_socket_tls=False,
     ):
         def check_stderr(tup):
@@ -341,13 +339,6 @@ def check_stderr(tup):
         if self.provider == "aws":
             docker_envs["AWS_DEFAULT_REGION"] = self.region_tag.split(":")[1]
 
-        # copy E2EE keys
-        if e2ee_key_bytes is not None:
-            e2ee_key_file = "e2ee_key"
-            self.write_file(e2ee_key_bytes, f"/tmp/{e2ee_key_file}")
-            docker_envs["E2EE_KEY_FILE"] = f"/pkg/data/{e2ee_key_file}"
-            docker_run_flags += f" -v /tmp/{e2ee_key_file}:/pkg/data/{e2ee_key_file}"
-
         # upload gateway programs and gateway info
         gateway_program_file = os.path.basename(gateway_program_path).replace(":", "_")
         gateway_info_file = os.path.basename(gateway_info_path).replace(":", "_")
@@ -362,8 +353,7 @@ def check_stderr(tup):
 
         # update docker flags
         docker_run_flags += " " + " ".join(f"--env {k}={v}" for k, v in docker_envs.items())
-        gateway_daemon_cmd += f" --region {self.region_tag} {'--use-compression' if use_compression else ''}"
-        gateway_daemon_cmd += f" {'--disable-e2ee' if e2ee_key_bytes is None else ''}"
+        gateway_daemon_cmd += f" --region {self.region_tag}"
        gateway_daemon_cmd += f" {'--disable-tls' if not use_socket_tls else ''}"
         escaped_gateway_daemon_cmd = gateway_daemon_cmd.replace('"', '\\"')
         docker_launch_cmd = (
diff --git a/skyplane/gateway/gateway_daemon.py b/skyplane/gateway/gateway_daemon.py
index 343bad6ac..966a5ea2d 100644
--- a/skyplane/gateway/gateway_daemon.py
+++ b/skyplane/gateway/gateway_daemon.py
@@ -23,10 +23,10 @@
     GatewayObjStoreReadOperator,
     GatewayObjStoreWriteOperator,
     GatewayWaitReceiver,
-    GatewayCompress,
-    GatewayDecompress,
-    GatewayEncrypt,
-    GatewayDecrypt,
+    GatewayCompressor,
+    GatewayDecompressor,
+    GatewayEncrypter,
+    GatewayDecrypter,
 )
 from skyplane.gateway.operators.gateway_receiver import GatewayReceiver
 from skyplane.utils import logger
@@ -42,8 +42,6 @@ def __init__(
         chunk_dir: PathLike,
         max_incoming_ports=64,
         use_tls=True,
-        use_e2ee=True,  # TODO: read from operator field
-        use_compression=True,  # TODO: read from operator field
     ):
         # read gateway program
         gateway_program_path = Path(os.environ["GATEWAY_PROGRAM_FILE"]).expanduser()
@@ -72,13 +70,6 @@ def __init__(
 
         self.error_event = Event()
         self.error_queue = Queue()
-        if use_e2ee:
-            e2ee_key_path = Path(os.environ["E2EE_KEY_FILE"]).expanduser()
-            with open(e2ee_key_path, "rb") as f:
-                self.e2ee_key_bytes = f.read()
-            print("Server side E2EE key loaded: ", self.e2ee_key_bytes)
-        else:
-            self.e2ee_key_bytes = None
 
         # create gateway operators
         self.terminal_operators = defaultdict(list)  # track terminal operators per partition
@@ -265,7 +256,7 @@ def create_gateway_operators_helper(input_queue, program: List[Dict], partition_
                 )
                 total_p += 1
             elif op["op_type"] == "compress":
-                operators[handle] = GatewayCompress(
+                operators[handle] = GatewayCompressor(
                     handle=handle,
                     region=self.region,
                     input_queue=input_queue,
@@ -277,7 +268,7 @@ def create_gateway_operators_helper(input_queue, program: List[Dict], partition_
                 )
                 total_p += 1
             elif op["op_type"] == "decompress":
-                operators[handle] = GatewayDecompress(
+                operators[handle] = GatewayDecompressor(
                     handle=handle,
                     region=self.region,
                     input_queue=input_queue,
@@ -289,7 +280,7 @@ def create_gateway_operators_helper(input_queue, program: List[Dict], partition_
                 )
                 total_p += 1
             elif op["op_type"] == "encrypt":
-                operators[handle] = GatewayEncrypt(
+                operators[handle] = GatewayEncrypter(
                     handle=handle,
                     region=self.region,
                     input_queue=input_queue,
@@ -297,11 +288,11 @@ def create_gateway_operators_helper(input_queue, program: List[Dict], partition_
                     error_event=self.error_event,
                     error_queue=self.error_queue,
                     chunk_store=self.chunk_store,
-                    e2ee_key_bytes=self.e2ee_key_bytes,
+                    e2ee_key_bytes=op["e2ee_key_bytes"],
                 )
                 total_p += 1
             elif op["op_type"] == "decrypt":
-                operators[handle] = GatewayDecrypt(
+                operators[handle] = GatewayDecrypter(
                     handle=handle,
                     region=self.region,
                     input_queue=input_queue,
@@ -309,7 +300,7 @@ def create_gateway_operators_helper(input_queue, program: List[Dict], partition_
                     error_event=self.error_event,
                     error_queue=self.error_queue,
                     chunk_store=self.chunk_store,
-                    e2ee_key_bytes=self.e2ee_key_bytes,
+                    e2ee_key_bytes=op["e2ee_key_bytes"],
                 )
                 total_p += 1
             else:
@@ -394,8 +385,6 @@ def exit_handler(signum, frame):
     parser.add_argument("--region", type=str, required=True, help="Region tag (provider:region)")
     parser.add_argument("--chunk-dir", type=Path, default="/tmp/skyplane/chunks", help="Directory to store chunks")
     parser.add_argument("--disable-tls", action="store_true")
-    parser.add_argument("--use-compression", action="store_true")  # TODO: remove
-    parser.add_argument("--disable-e2ee", action="store_true")  # TODO: remove
     args = parser.parse_args()
 
     os.makedirs(args.chunk_dir)
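Each encrypt/decrypt operator is now keyed from its own serialized program entry (`op["e2ee_key_bytes"]`) rather than from a daemon-wide `E2EE_KEY_FILE`. A self-contained sketch of that flow, using the same PyNaCl primitives the operators wrap; the `op` dict shape mirrors the patch, the payload is illustrative:

    import nacl.secret
    import nacl.utils

    key = nacl.utils.random(nacl.secret.SecretBox.KEY_SIZE)

    # what the planner serializes into the gateway program
    encrypt_op = {"op_type": "encrypt", "encrypt": True, "e2ee_key_bytes": key}
    decrypt_op = {"op_type": "decrypt", "decrypt": True, "e2ee_key_bytes": key}

    # what GatewayEncrypter/GatewayDecrypter do with it per chunk
    box = nacl.secret.SecretBox(encrypt_op["e2ee_key_bytes"])
    ciphertext = box.encrypt(b"chunk data")
    assert nacl.secret.SecretBox(decrypt_op["e2ee_key_bytes"]).decrypt(ciphertext) == b"chunk data"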
diff --git a/skyplane/gateway/gateway_program.py b/skyplane/gateway/gateway_program.py
index f2432df95..2ef43c0cb 100644
--- a/skyplane/gateway/gateway_program.py
+++ b/skyplane/gateway/gateway_program.py
@@ -104,15 +104,17 @@ def __init__(self, compress: bool = False):
 
 
 class GatewayEncrypt(GatewayOperator):
-    def __init__(self, encrypt: bool = False):
+    def __init__(self, encrypt: bool = False, e2ee_key_bytes: Optional[bytes] = None):
         super().__init__("encrypt")
         self.encrypt = encrypt
+        self.e2ee_key_bytes = e2ee_key_bytes
 
 
 class GatewayDecrypt(GatewayOperator):
-    def __init__(self, decrypt: bool = False):
+    def __init__(self, decrypt: bool = False, e2ee_key_bytes: Optional[bytes] = None):
         super().__init__("decrypt")
         self.decrypt = decrypt
+        self.e2ee_key_bytes = e2ee_key_bytes
 
 
 class GatewayProgram:
diff --git a/skyplane/gateway/operators/gateway_operator.py b/skyplane/gateway/operators/gateway_operator.py
index 49417ad48..e02cb647a 100644
--- a/skyplane/gateway/operators/gateway_operator.py
+++ b/skyplane/gateway/operators/gateway_operator.py
@@ -320,14 +320,12 @@ def process(self, chunk_req: ChunkRequest, dst_host: str):
 
             wire_length = len(data)
             raw_wire_length = wire_length
-            compressed_length = None
 
             # send chunk header
             header = chunk.to_wire_header(
                 n_chunks_left_on_socket=len(chunk_ids) - idx - 1,
                 wire_length=wire_length,
                 raw_wire_length=raw_wire_length,
-                is_compressed=(compressed_length is not None),
             )
             # print(f"[sender-{self.worker_id}]:{chunk_id} sending chunk header {header}")
             header.to_socket(sock)
@@ -584,7 +582,7 @@ def process(self, chunk_req: ChunkRequest):
         return True
 
 
-class GatewayCompress(GatewayOperator):
+class GatewayCompressor(GatewayOperator):
     def __init__(
         self,
         handle: str,
@@ -628,7 +626,7 @@ def process(self, chunk_req: ChunkRequest):
         return True
 
 
-class GatewayDecompress(GatewayOperator):
+class GatewayDecompressor(GatewayOperator):
     def __init__(
         self,
         handle: str,
@@ -671,7 +669,7 @@ def process(self, chunk_req: ChunkRequest):
             data = f.write()
         return True
 
-class GatewayEncrypt(GatewayOperator):
+class GatewayEncrypter(GatewayOperator):
     def __init__(
         self,
         handle: str,
@@ -720,7 +718,7 @@ def process(self, chunk_req: ChunkRequest):
         return True
 
 
-class GatewayDecrypt(GatewayOperator):
+class GatewayDecrypter(GatewayOperator):
     def __init__(
         self,
         handle: str,
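The rename separates the two layers cleanly: `GatewayEncrypt`/`GatewayDecrypt` remain plan-level descriptions in gateway_program.py, while `GatewayEncrypter`/`GatewayDecrypter` do the work at runtime. A minimal sketch of how a destination-side chain is wired up through parent handles; `Op` and `Program` are simplified stand-ins for the patch's `GatewayOperator` and `GatewayProgram`:

    from typing import List, Optional

    class Op:
        """Simplified stand-in for a gateway_program operator node."""
        def __init__(self, op_type: str, **fields):
            self.op_type = op_type
            self.fields = fields
            self.children: List["Op"] = []

    class Program:
        """Simplified stand-in for GatewayProgram; handles are list indices."""
        def __init__(self):
            self.ops: List[Op] = []

        def add_operator(self, op: Op, parent_handle: Optional[int] = None) -> int:
            if parent_handle is not None:
                self.ops[parent_handle].children.append(op)
            self.ops.append(op)
            return len(self.ops) - 1

    # destination-side chain after this series: receive -> decrypt -> decompress -> write
    prog = Program()
    recv = prog.add_operator(Op("receive"))
    dec = prog.add_operator(Op("decrypt", decrypt=True, e2ee_key_bytes=b"\x00" * 32), parent_handle=recv)
    unz = prog.add_operator(Op("decompress", decompress=True), parent_handle=dec)
    prog.add_operator(Op("write_object_store", bucket="dst-bucket"), parent_handle=unz)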
diff --git a/skyplane/planner/planner.py b/skyplane/planner/planner.py
index 03a400092..c8ce9158e 100644
--- a/skyplane/planner/planner.py
+++ b/skyplane/planner/planner.py
@@ -1,5 +1,6 @@
 from collections import defaultdict
 from importlib.resources import path
+from pathlib import Path
 from typing import Dict, List, Optional, Tuple
 import os
 import csv
@@ -230,6 +231,10 @@ def plan(self, jobs: List[TransferJob]) -> TopologyPlan:
         src_program = GatewayProgram()
         dst_program = GatewayProgram()
 
+        e2ee_key_file = "e2ee_key"
+        with open(f"/tmp/{e2ee_key_file}", "rb") as f:
+            self.e2ee_key_bytes = f.read()
+
         for job in jobs:
             src_bucket = job.src_iface.bucket()
             dst_bucket = job.dst_ifaces[0].bucket()
@@ -249,7 +254,7 @@ def plan(self, jobs: List[TransferJob]) -> TopologyPlan:
                 partition_id=partition_id,
             )
             encrypt_op = dst_program[dst_region_tag].add_operator(
-                GatewayEncrypt(decrypt=self.transfer_config.use_e2ee),
+                GatewayEncrypt(encrypt=self.transfer_config.use_e2ee, e2ee_key_bytes=self.e2ee_key_bytes),
                 parent_handle=compress_op,
                 partition_id=partition_id,
             )
@@ -268,7 +273,7 @@ def plan(self, jobs: List[TransferJob]) -> TopologyPlan:
         # dst region gateway program
         recv_op = dst_program.add_operator(GatewayReceive(decompress=True, decrypt=True), partition_id=partition_id)
         decrypt_op = dst_program[dst_region_tag].add_operator(
-            GatewayDecrypt(decrypt=self.transfer_config.use_e2ee),
+            GatewayDecrypt(decrypt=self.transfer_config.use_e2ee, e2ee_key_bytes=self.e2ee_key_bytes),
             parent_handle=recv_op,
             partition_id=partition_id,
         )
@@ -325,6 +330,10 @@ def plan(self, jobs: List[TransferJob]) -> TopologyPlan:
         dst_program = {dst_region: GatewayProgram() for dst_region in dst_region_tags}
         src_program = GatewayProgram()
 
+        e2ee_key_file = "e2ee_key"
+        with open(f"/tmp/{e2ee_key_file}", "rb") as f:
+            self.e2ee_key_bytes = f.read()
+
         # iterate through all jobs
         for job in jobs:
             src_bucket = job.src_iface.bucket()
@@ -366,12 +375,12 @@ def plan(self, jobs: List[TransferJob]) -> TopologyPlan:
                 # print("Using private IP for GCP to GCP transfer", src_region_tag, dst_region_tag)
                 private_ip = True
             compress_op = dst_program[dst_region_tag].add_operator(
-                GatewayDecompress(decompress=self.transfer_config.use_compression),
+                GatewayCompress(decompress=self.transfer_config.use_compression),
                 parent_handle=mux_or,
                 partition_id=partition_id,
             )
             encrypt_op = dst_program[dst_region_tag].add_operator(
-                GatewayDecrypt(decrypt=self.transfer_config.use_e2ee),
+                GatewayEncrypt(encrypt=self.transfer_config.use_e2ee, e2ee_key_bytes=self.e2ee_key_bytes),
                 parent_handle=compress_op,
                 partition_id=partition_id,
             )

From b097ecc234ff410c588658552e1872f0782ed3b1 Mon Sep 17 00:00:00 2001
From: huangwei
Date: Mon, 31 Jul 2023 16:50:58 +0800
Subject: [PATCH 07/11] fix operator arguments

---
 skyplane/planner/planner.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/skyplane/planner/planner.py b/skyplane/planner/planner.py
index c8ce9158e..3f1f2ec33 100644
--- a/skyplane/planner/planner.py
+++ b/skyplane/planner/planner.py
@@ -249,7 +249,7 @@ def plan(self, jobs: List[TransferJob]) -> TopologyPlan:
         mux_or = src_program.add_operator(GatewayMuxOr(), parent_handle=obj_store_read, partition_id=partition_id)
         for i in range(n_instances):
             compress_op = dst_program[dst_region_tag].add_operator(
-                GatewayCompress(decompress=self.transfer_config.use_compression),
+                GatewayCompress(compress=self.transfer_config.use_compression),
                 parent_handle=mux_or,
                 partition_id=partition_id,
             )
@@ -375,7 +375,7 @@ def plan(self, jobs: List[TransferJob]) -> TopologyPlan:
                 # print("Using private IP for GCP to GCP transfer", src_region_tag, dst_region_tag)
                 private_ip = True
             compress_op = dst_program[dst_region_tag].add_operator(
-                GatewayCompress(decompress=self.transfer_config.use_compression),
+                GatewayCompress(compress=self.transfer_config.use_compression),
                 parent_handle=mux_or,
                 partition_id=partition_id,
             )
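The corrected keyword arguments also make the ordering explicit: the source side compresses before it encrypts, because ciphertext is effectively incompressible. A standalone sketch of the per-chunk pipeline the operators implement, using the same lz4/PyNaCl calls as `GatewayCompressor` and `GatewayEncrypter` (the sample payload is illustrative):

    import lz4.frame
    import nacl.secret
    import nacl.utils

    key = nacl.utils.random(nacl.secret.SecretBox.KEY_SIZE)
    box = nacl.secret.SecretBox(key)
    payload = b"0123456789" * 1000  # highly compressible sample chunk

    # source gateway: compress, then encrypt
    compressed = lz4.frame.compress(payload)
    wire_data = box.encrypt(compressed)

    # destination gateway: decrypt, then decompress (mirror order)
    assert lz4.frame.decompress(box.decrypt(wire_data)) == payload
    assert len(compressed) < len(payload)  # encrypting first would forfeit this saving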
From dbbc10f77eb914798d0a1dedf35e04e66191c63f Mon Sep 17 00:00:00 2001
From: "Huang, Wei"
Date: Mon, 31 Jul 2023 19:21:46 +0800
Subject: [PATCH 08/11] remove stale chunk-length assertions from compression
 and encryption operators

---
 skyplane/gateway/operators/gateway_operator.py | 4 ----
 1 file changed, 4 deletions(-)

diff --git a/skyplane/gateway/operators/gateway_operator.py b/skyplane/gateway/operators/gateway_operator.py
index e02cb647a..958befdb0 100644
--- a/skyplane/gateway/operators/gateway_operator.py
+++ b/skyplane/gateway/operators/gateway_operator.py
@@ -615,7 +615,6 @@ def process(self, chunk_req: ChunkRequest):
 
         with open(chunk_file_path, "rb") as f:
             data = f.read()
-            assert len(data) == chunk.chunk_length_bytes, f"chunk {chunk_id} has size {len(data)} but should be {chunk.chunk_length_bytes}"
 
             data = lz4.frame.compress(data)
             wire_length = len(data)
@@ -658,7 +657,6 @@ def process(self, chunk_req: ChunkRequest):
 
         with open(chunk_file_path, "rb") as f:
             data = f.read()
-            assert len(data) == chunk.chunk_length_bytes, f"chunk {chunk_id} has size {len(data)} but should be {chunk.chunk_length_bytes}"
 
             data = lz4.frame.decompress(data)
             wire_length = len(data)
@@ -707,7 +705,6 @@ def process(self, chunk_req: ChunkRequest):
 
         with open(chunk_file_path, "rb") as f:
             data = f.read()
-            assert len(data) == chunk.chunk_length_bytes, f"chunk {chunk_id} has size {len(data)} but should be {chunk.chunk_length_bytes}"
 
             data = self.e2ee_secretbox.encrypt(data)
             wire_length = len(data)
@@ -756,7 +753,6 @@ def process(self, chunk_req: ChunkRequest):
 
         with open(chunk_file_path, "rb") as f:
             data = f.read()
-            assert len(data) == chunk.chunk_length_bytes, f"chunk {chunk_id} has size {len(data)} but should be {chunk.chunk_length_bytes}"
 
             data = self.e2ee_secretbox.decrypt(data)
             wire_length = len(data)

From 7399a0fc8720e208572ec52ac999f3efb3ea4076 Mon Sep 17 00:00:00 2001
From: huangwei
Date: Fri, 4 Aug 2023 11:06:20 +0800
Subject: [PATCH 09/11] [fix] #907: gateway status requests fail; add retries

---
 skyplane/compute/server.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/skyplane/compute/server.py b/skyplane/compute/server.py
index 613a2448d..3caf49fb8 100644
--- a/skyplane/compute/server.py
+++ b/skyplane/compute/server.py
@@ -371,7 +371,7 @@ def check_stderr(tup):
         logger.fs.debug(f"{self.uuid()} gateway_api_url = {self.gateway_api_url}")
 
         # wait for gateways to start (check status API)
-        http_pool = urllib3.PoolManager()
+        http_pool = urllib3.PoolManager(retries=urllib3.Retry(total=10))
 
         def is_api_ready():
             try:

From ae9a42200b8a87893007b7d4269c2f21b2679e8a Mon Sep 17 00:00:00 2001
From: huangwei
Date: Fri, 4 Aug 2023 18:31:18 +0800
Subject: [PATCH 10/11] [fix] BrokenPipeError in socket sender: retry chunk
 processing

---
 skyplane/gateway/operators/gateway_operator.py | 9 ++++++++-
 1 file changed, 8 insertions(+), 1 deletion(-)

diff --git a/skyplane/gateway/operators/gateway_operator.py b/skyplane/gateway/operators/gateway_operator.py
index 958befdb0..967d3da45 100644
--- a/skyplane/gateway/operators/gateway_operator.py
+++ b/skyplane/gateway/operators/gateway_operator.py
@@ -90,7 +90,14 @@ def worker_loop(self, worker_id: int, *args):
                 # TODO: status logging
                 self.chunk_store.log_chunk_state(chunk_req, ChunkState.in_progress, operator_handle=self.handle, worker_id=worker_id)
                 # process chunk
-                succ = self.process(chunk_req, *args)
+                succ = retry_backoff(
+                    partial(
+                        self.process,
+                        chunk_req,
+                        *args
+                    ),
+                    max_retries=1,
+                )
 
                 # place in output queue
                 if succ:

From a99ea0f7b9c4d43db67f90301ef1ddc09fe98996 Mon Sep 17 00:00:00 2001
From: huangwei
Date: Fri, 4 Aug 2023 18:31:53 +0800
Subject: [PATCH 11/11] fix typo: use GatewayDecompress in add_dst_operator

---
 skyplane/planner/planner.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/skyplane/planner/planner.py b/skyplane/planner/planner.py
index ac69f8d3c..9ce19329e 100644
--- a/skyplane/planner/planner.py
+++ b/skyplane/planner/planner.py
@@ -471,7 +471,7 @@ def add_dst_operator(
         decrypt_op = GatewayDecrypt(decrypt=self.transfer_config.use_e2ee, e2ee_key_bytes=e2ee_key_bytes)
         gateway_program.add_operator(op=decrypt_op, parent_handle=receive_op.handle, partition_id=tuple(partition_ids))
 
-        decompress_op = GatewayCompress(compress=self.transfer_config.use_compression)
+        decompress_op = GatewayDecompress(compress=self.transfer_config.use_compression)
         gateway_program.add_operator(op=decompress_op, parent_handle=decrypt_op.handle, partition_id=tuple(partition_ids))
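`retry_backoff` (from skyplane.utils.retry) retries a zero-argument callable with growing sleeps between attempts, which is why patch 10 binds the arguments with `functools.partial` first. A minimal self-contained stand-in that illustrates the shape of that pattern; only the calling convention is taken from the patch:

    import time
    from functools import partial
    from typing import Callable, TypeVar

    T = TypeVar("T")

    def retry_backoff_sketch(fn: Callable[[], T], max_retries: int = 1, initial_backoff: float = 0.1) -> T:
        backoff = initial_backoff
        for attempt in range(max_retries + 1):
            try:
                return fn()
            except BrokenPipeError:
                if attempt == max_retries:
                    raise
                time.sleep(backoff)  # give the remote socket time to recover
                backoff *= 2

    # simulate one dropped socket followed by a successful retry
    attempts = {"n": 0}

    def flaky_process(chunk_req: str) -> bool:
        attempts["n"] += 1
        if attempts["n"] == 1:
            raise BrokenPipeError("simulated dropped socket")
        return True

    assert retry_backoff_sketch(partial(flaky_process, "chunk-0"), max_retries=1) is True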