diff --git a/poetry.lock b/poetry.lock index 3d2615d26..889ef581a 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1800,6 +1800,18 @@ requests-oauthlib = ">=0.5.0" [package.extras] async = ["aiodns", "aiohttp (>=3.0)"] +[[package]] +name = "networkx" +version = "2.6.3" +description = "Python package for creating and manipulating graphs and networks" +category = "main" +optional = true +python-versions = ">=3.7" +files = [ + {file = "networkx-2.6.3-py3-none-any.whl", hash = "sha256:80b6b89c77d1dfb64a4c7854981b60aeea6360ac02c6d4e4913319e0a313abef"}, + {file = "networkx-2.6.3.tar.gz", hash = "sha256:c0946ed31d71f1b732b5aaa6da5a0388a345019af232ce2f49c766e2d6795c51"}, +] + [[package]] name = "mypy-extensions" version = "1.0.0" @@ -2903,7 +2915,7 @@ azure = ["azure-identity", "azure-mgmt-authorization", "azure-mgmt-compute", "az gateway = ["flask", "lz4", "pynacl", "pyopenssl", "werkzeug"] gcp = ["google-api-python-client", "google-auth", "google-cloud-compute", "google-cloud-storage"] ibm = ["ibm-cloud-sdk-core", "ibm-cos-sdk", "ibm-vpc"] -solver = ["cvxpy", "graphviz", "matplotlib", "numpy"] +solver = ["networkx", "cvxpy", "graphviz", "matplotlib", "numpy"] [metadata] lock-version = "2.0" diff --git a/pyproject.toml b/pyproject.toml index 00ee8be99..82e75cf52 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -54,6 +54,7 @@ cvxpy = { version = ">=1.1.0", optional = true } graphviz = { version = ">=0.15", optional = true } matplotlib = { version = ">=3.0.0", optional = true } numpy = { version = ">=1.19.0", optional = true } +networkx = { version = ">=2.5", optional = true } # gateway dependencies flask = { version = "^2.1.2", optional = true } @@ -70,7 +71,7 @@ gcp = ["google-api-python-client", "google-auth", "google-cloud-compute", "googl ibm = ["ibm-cloud-sdk-core", "ibm-cos-sdk", "ibm-vpc"] all = ["boto3", "azure-identity", "azure-mgmt-authorization", "azure-mgmt-compute", "azure-mgmt-network", "azure-mgmt-resource", "azure-mgmt-storage", "azure-mgmt-subscription", "azure-storage-blob", "google-api-python-client", "google-auth", "google-cloud-compute", "google-cloud-storage", "ibm-cloud-sdk-core", "ibm-cos-sdk", "ibm-vpc"] gateway = ["flask", "lz4", "pynacl", "pyopenssl", "werkzeug"] -solver = ["cvxpy", "graphviz", "matplotlib", "numpy"] +solver = ["networkx", "cvxpy", "graphviz", "matplotlib", "numpy"] [tool.poetry.dev-dependencies] pytest = ">=6.0.0" diff --git a/scripts/requirements-gateway.txt b/scripts/requirements-gateway.txt index 687f3aeeb..be1e298a1 100644 --- a/scripts/requirements-gateway.txt +++ b/scripts/requirements-gateway.txt @@ -33,3 +33,4 @@ numpy pandas pyarrow typer +networkx \ No newline at end of file diff --git a/skyplane/api/dataplane.py b/skyplane/api/dataplane.py index c8253a922..e1ca5e797 100644 --- a/skyplane/api/dataplane.py +++ b/skyplane/api/dataplane.py @@ -8,6 +8,7 @@ import nacl.secret import nacl.utils +import typer import urllib3 from pathlib import Path from typing import TYPE_CHECKING, Dict, List, Optional @@ -89,7 +90,6 @@ def _start_gateway( gateway_server: compute.Server, gateway_log_dir: Optional[PathLike], authorize_ssh_pub_key: Optional[str] = None, - e2ee_key_bytes: Optional[str] = None, ): # map outgoing ports setup_args = {} @@ -119,9 +119,7 @@ def _start_gateway( gateway_docker_image=gateway_docker_image, gateway_program_path=str(gateway_program_filename), gateway_info_path=f"{gateway_log_dir}/gateway_info.json", - e2ee_key_bytes=e2ee_key_bytes, # TODO: remove use_bbr=self.transfer_config.use_bbr, # TODO: remove - 
use_compression=self.transfer_config.use_compression,
             use_socket_tls=self.transfer_config.use_socket_tls,
         )
@@ -202,6 +200,10 @@ def provision(
         # todo: move server.py:start_gateway here
         logger.fs.info(f"Using docker image {gateway_docker_image}")
         e2ee_key_bytes = nacl.utils.random(nacl.secret.SecretBox.KEY_SIZE)
+        # save E2EE keys
+        e2ee_key_file = "e2ee_key"
+        with open(f"/tmp/{e2ee_key_file}", "wb") as f:
+            f.write(e2ee_key_bytes)

         # create gateway logging dir
         gateway_program_dir = f"{self.log_dir}/programs"
@@ -218,7 +220,7 @@ def provision(
         jobs = []
         for node, server in gateway_bound_nodes.items():
             jobs.append(
-                partial(self._start_gateway, gateway_docker_image, node, server, gateway_program_dir, authorize_ssh_pub_key, e2ee_key_bytes)
+                partial(self._start_gateway, gateway_docker_image, node, server, gateway_program_dir, authorize_ssh_pub_key)
            )
         logger.fs.debug(f"[Dataplane.provision] Starting gateways on {len(jobs)} servers")
         try:
diff --git a/skyplane/api/pipeline.py b/skyplane/api/pipeline.py
index 6fa3face4..4744f7bf5 100644
--- a/skyplane/api/pipeline.py
+++ b/skyplane/api/pipeline.py
@@ -10,7 +10,15 @@
 from skyplane.api.transfer_job import CopyJob, SyncJob, TransferJob
 from skyplane.api.config import TransferConfig

-from skyplane.planner.planner import MulticastDirectPlanner, DirectPlannerSourceOneSided, DirectPlannerDestOneSided
+from skyplane.planner.planner import (
+    MulticastDirectPlanner,
+    DirectPlannerSourceOneSided,
+    DirectPlannerDestOneSided,
+    UnicastDirectPlanner,
+    UnicastILPPlanner,
+    MulticastILPPlanner,
+    MulticastMDSTPlanner,
+)
 from skyplane.planner.topology import TopologyPlanGateway
 from skyplane.utils import logger
 from skyplane.utils.definitions import tmp_log_dir
@@ -61,12 +69,23 @@ def __init__(
         # planner
         self.planning_algorithm = planning_algorithm
+
         if self.planning_algorithm == "direct":
-            self.planner = MulticastDirectPlanner(self.max_instances, self.n_connections, self.transfer_config)
+            self.planner = MulticastDirectPlanner(self.transfer_config, self.max_instances, self.n_connections)
         elif self.planning_algorithm == "src_one_sided":
-            self.planner = DirectPlannerSourceOneSided(self.max_instances, self.n_connections, self.transfer_config)
+            self.planner = DirectPlannerSourceOneSided(self.transfer_config, self.max_instances, self.n_connections)
         elif self.planning_algorithm == "dst_one_sided":
-            self.planner = DirectPlannerDestOneSided(self.max_instances, self.n_connections, self.transfer_config)
+            self.planner = DirectPlannerDestOneSided(self.transfer_config, self.max_instances, self.n_connections)
+            # TODO: should find some ways to merge direct / Ndirect
+            self.planner = UnicastDirectPlanner(self.transfer_config, self.max_instances, self.n_connections)
+        elif self.planning_algorithm == "multi_direct":
+            self.planner = MulticastDirectPlanner(self.transfer_config, self.max_instances, self.n_connections)
+        elif self.planning_algorithm == "multi_dst":
+            self.planner = MulticastMDSTPlanner(self.transfer_config, self.max_instances, self.n_connections)
+        elif self.planning_algorithm == "multi_ilp":
+            self.planner = MulticastILPPlanner(self.transfer_config, self.max_instances, self.n_connections)
+        elif self.planning_algorithm == "uni_ilp":
+            self.planner = UnicastILPPlanner(self.transfer_config, self.max_instances, self.n_connections)
         else:
             raise ValueError(f"No such planning algorithm {planning_algorithm}")
@@ -185,3 +204,4 @@ def estimate_total_cost(self):
         # return size
         return total_size * topo.cost_per_gb
+
diff --git
a/skyplane/api/transfer_job.py b/skyplane/api/transfer_job.py index 0155b7c17..8a7424f84 100644 --- a/skyplane/api/transfer_job.py +++ b/skyplane/api/transfer_job.py @@ -102,6 +102,7 @@ def _run_multipart_chunk_thread( src_object = transfer_pair.src_obj dest_objects = transfer_pair.dst_objs dest_key = transfer_pair.dst_key + print("dest_key: ", dest_key) if isinstance(self.src_iface, ObjectStoreInterface): mime_type = self.src_iface.get_obj_mime_type(src_object.key) # create multipart upload request per destination @@ -283,10 +284,10 @@ def transfer_pair_generator( dest_provider, dest_region = dst_iface.region_tag().split(":") try: dest_key = self.map_object_key_prefix(src_prefix, obj.key, dst_prefix, recursive=recursive) - assert ( - dest_key[: len(dst_prefix)] == dst_prefix - ), f"Destination key {dest_key} does not start with destination prefix {dst_prefix}" - dest_keys.append(dest_key[len(dst_prefix) :]) + # TODO: why is it changed here? + # dest_keys.append(dest_key[len(dst_prefix) :]) + + dest_keys.append(dest_key) except exceptions.MissingObjectException as e: logger.fs.exception(e) raise e from None @@ -508,8 +509,12 @@ def dst_prefixes(self) -> List[str]: if not hasattr(self, "_dst_prefix"): if self.transfer_type == "unicast": self._dst_prefix = [str(parse_path(self.dst_paths[0])[2])] + print("return dst_prefixes for unicast", self._dst_prefix) else: + for path in self.dst_paths: + print("Parsing result for multicast", parse_path(path)) self._dst_prefix = [str(parse_path(path)[2]) for path in self.dst_paths] + print("return dst_prefixes for multicast", self._dst_prefix) return self._dst_prefix @property diff --git a/skyplane/api/usage.py b/skyplane/api/usage.py index f2e17482a..465bbe10e 100644 --- a/skyplane/api/usage.py +++ b/skyplane/api/usage.py @@ -11,7 +11,7 @@ import requests from rich import print as rprint -from typing import Optional, Dict, List +from typing import Optional, Dict import skyplane from skyplane.utils.definitions import tmp_log_dir diff --git a/skyplane/chunk.py b/skyplane/chunk.py index 8b425a1a1..66bf15b62 100644 --- a/skyplane/chunk.py +++ b/skyplane/chunk.py @@ -26,12 +26,11 @@ class Chunk: part_number: Optional[int] = None upload_id: Optional[str] = None # TODO: for broadcast, this is not used - def to_wire_header(self, n_chunks_left_on_socket: int, wire_length: int, raw_wire_length: int, is_compressed: bool = False): + def to_wire_header(self, n_chunks_left_on_socket: int, wire_length: int, raw_wire_length: int): return WireProtocolHeader( chunk_id=self.chunk_id, data_len=wire_length, raw_data_len=raw_wire_length, - is_compressed=is_compressed, n_chunks_left_on_socket=n_chunks_left_on_socket, ) @@ -99,7 +98,6 @@ class WireProtocolHeader: chunk_id: str # 128bit UUID data_len: int # long raw_data_len: int # long (uncompressed, unecrypted) - is_compressed: bool # char n_chunks_left_on_socket: int # long @staticmethod @@ -115,8 +113,8 @@ def protocol_version(): @staticmethod def length_bytes(): - # magic (8) + protocol_version (4) + chunk_id (16) + data_len (8) + raw_data_len(8) + is_compressed (1) + n_chunks_left_on_socket (8) - return 8 + 4 + 16 + 8 + 8 + 1 + 8 + # magic (8) + protocol_version (4) + chunk_id (16) + data_len (8) + raw_data_len(8) + n_chunks_left_on_socket (8) + return 8 + 4 + 16 + 8 + 8 + 8 @staticmethod def from_bytes(data: bytes): @@ -130,13 +128,11 @@ def from_bytes(data: bytes): chunk_id = data[12:28].hex() chunk_len = int.from_bytes(data[28:36], byteorder="big") raw_chunk_len = int.from_bytes(data[36:44], byteorder="big") - 
is_compressed = bool(int.from_bytes(data[44:45], byteorder="big"))
-        n_chunks_left_on_socket = int.from_bytes(data[45:53], byteorder="big")
+        n_chunks_left_on_socket = int.from_bytes(data[44:52], byteorder="big")
         return WireProtocolHeader(
             chunk_id=chunk_id,
             data_len=chunk_len,
             raw_data_len=raw_chunk_len,
-            is_compressed=is_compressed,
             n_chunks_left_on_socket=n_chunks_left_on_socket,
         )
@@ -149,7 +145,6 @@ def to_bytes(self):
         out_bytes += chunk_id_bytes
         out_bytes += self.data_len.to_bytes(8, byteorder="big")
         out_bytes += self.raw_data_len.to_bytes(8, byteorder="big")
-        out_bytes += self.is_compressed.to_bytes(1, byteorder="big")
         out_bytes += self.n_chunks_left_on_socket.to_bytes(8, byteorder="big")
         assert len(out_bytes) == WireProtocolHeader.length_bytes(), f"{len(out_bytes)} != {WireProtocolHeader.length_bytes()}"
         return out_bytes
diff --git a/skyplane/compute/server.py b/skyplane/compute/server.py
index 585773105..fade94e90 100644
--- a/skyplane/compute/server.py
+++ b/skyplane/compute/server.py
@@ -289,8 +289,6 @@ def start_gateway(
         gateway_info_path: str,
         log_viewer_port=8888,
         use_bbr=False,
-        use_compression=False,
-        e2ee_key_bytes=None,
         use_socket_tls=False,
     ):
         def check_stderr(tup):
@@ -338,13 +336,6 @@ def check_stderr(tup):
         if self.provider == "aws":
             docker_envs["AWS_DEFAULT_REGION"] = self.region_tag.split(":")[1]
-        # copy E2EE keys
-        if e2ee_key_bytes is not None:
-            e2ee_key_file = "e2ee_key"
-            self.write_file(e2ee_key_bytes, f"/tmp/{e2ee_key_file}")
-            docker_envs["E2EE_KEY_FILE"] = f"/pkg/data/{e2ee_key_file}"
-            docker_run_flags += f" -v /tmp/{e2ee_key_file}:/pkg/data/{e2ee_key_file}"
-
         # upload gateway programs and gateway info
         gateway_program_file = os.path.basename(gateway_program_path).replace(":", "_")
         gateway_info_file = os.path.basename(gateway_info_path).replace(":", "_")
@@ -359,8 +350,7 @@ def check_stderr(tup):
         # update docker flags
         docker_run_flags += " " + " ".join(f"--env {k}={v}" for k, v in docker_envs.items())
-        gateway_daemon_cmd += f" --region {self.region_tag} {'--use-compression' if use_compression else ''}"
-        gateway_daemon_cmd += f" {'--disable-e2ee' if e2ee_key_bytes is None else ''}"
+        gateway_daemon_cmd += f" --region {self.region_tag}"
         gateway_daemon_cmd += f" {'--disable-tls' if not use_socket_tls else ''}"
         escaped_gateway_daemon_cmd = gateway_daemon_cmd.replace('"', '\\"')
         docker_launch_cmd = (
@@ -378,7 +368,7 @@ def check_stderr(tup):
         logger.fs.debug(f"{self.uuid()} gateway_api_url = {self.gateway_api_url}")
         # wait for gateways to start (check status API)
-        http_pool = urllib3.PoolManager()
+        http_pool = urllib3.PoolManager(retries=urllib3.Retry(total=10))
         def is_api_ready():
             try:
diff --git a/skyplane/gateway/gateway_daemon.py b/skyplane/gateway/gateway_daemon.py
index 08e6562df..058ad037f 100644
--- a/skyplane/gateway/gateway_daemon.py
+++ b/skyplane/gateway/gateway_daemon.py
@@ -23,6 +23,10 @@
     GatewayObjStoreReadOperator,
     GatewayObjStoreWriteOperator,
     GatewayWaitReceiver,
+    GatewayCompressor,
+    GatewayDecompressor,
+    GatewayEncrypter,
+    GatewayDecrypter,
 )
 from skyplane.gateway.operators.gateway_receiver import GatewayReceiver
 from skyplane.utils import logger
@@ -38,8 +42,6 @@ def __init__(
         chunk_dir: PathLike,
         max_incoming_ports=64,
         use_tls=True,
-        use_e2ee=True,  # TODO: read from operator field
-        use_compression=True,  # TODO: read from operator field
     ):
         # read gateway program
         gateway_program_path = Path(os.environ["GATEWAY_PROGRAM_FILE"]).expanduser()
@@ -68,13 +70,6 @@ def __init__(
         self.error_event = Event()
         self.error_queue = Queue()
-        if use_e2ee:
-            e2ee_key_path = Path(os.environ["E2EE_KEY_FILE"]).expanduser()
- with open(e2ee_key_path, "rb") as f: - self.e2ee_key_bytes = f.read() - print("Server side E2EE key loaded: ", self.e2ee_key_bytes) - else: - self.e2ee_key_bytes = None # create gateway operators self.terminal_operators = defaultdict(list) # track terminal operators per partition @@ -90,8 +85,6 @@ def __init__( error_queue=self.error_queue, max_pending_chunks=max_incoming_ports, use_tls=self.use_tls, - use_compression=use_compression, - e2ee_key_bytes=self.e2ee_key_bytes, ) # API server @@ -232,8 +225,6 @@ def create_gateway_operators_helper(input_queue, program: List[Dict], partition_ error_queue=self.error_queue, chunk_store=self.chunk_store, use_tls=self.use_tls, - use_compression=op["compress"], - e2ee_key_bytes=self.e2ee_key_bytes, n_processes=op["num_connections"], ) total_p += op["num_connections"] @@ -264,6 +255,54 @@ def create_gateway_operators_helper(input_queue, program: List[Dict], partition_ chunk_store=self.chunk_store, ) total_p += 1 + elif op["op_type"] == "compress": + operators[handle] = GatewayCompressor( + handle=handle, + region=self.region, + input_queue=input_queue, + output_queue=output_queue, + error_event=self.error_event, + error_queue=self.error_queue, + chunk_store=self.chunk_store, + use_compression=op["compress"], + ) + total_p += 1 + elif op["op_type"] == "decompress": + operators[handle] = GatewayDecompressor( + handle=handle, + region=self.region, + input_queue=input_queue, + output_queue=output_queue, + error_event=self.error_event, + error_queue=self.error_queue, + chunk_store=self.chunk_store, + use_compression=op["compress"], + ) + total_p += 1 + elif op["op_type"] == "encrypt": + operators[handle] = GatewayEncrypter( + handle=handle, + region=self.region, + input_queue=input_queue, + output_queue=output_queue, + error_event=self.error_event, + error_queue=self.error_queue, + chunk_store=self.chunk_store, + e2ee_key_bytes=op["e2ee_key_bytes"], + ) + total_p += 1 + elif op["op_type"] == "decrypt": + operators[handle] = GatewayDecrypter( + handle=handle, + region=self.region, + input_queue=input_queue, + output_queue=output_queue, + error_event=self.error_event, + error_queue=self.error_queue, + chunk_store=self.chunk_store, + e2ee_key_bytes=op["e2ee_key_bytes"], + ) + total_p += 1 else: raise ValueError(f"Unsupported op_type {op['op_type']}") # recursively create for child operators @@ -346,8 +385,6 @@ def exit_handler(signum, frame): parser.add_argument("--region", type=str, required=True, help="Region tag (provider:region") parser.add_argument("--chunk-dir", type=Path, default="/tmp/skyplane/chunks", help="Directory to store chunks") parser.add_argument("--disable-tls", action="store_true") - parser.add_argument("--use-compression", action="store_true") # TODO: remove - parser.add_argument("--disable-e2ee", action="store_true") # TODO: remove args = parser.parse_args() os.makedirs(args.chunk_dir) diff --git a/skyplane/gateway/gateway_program.py b/skyplane/gateway/gateway_program.py index c27427fd9..674396ff6 100644 --- a/skyplane/gateway/gateway_program.py +++ b/skyplane/gateway/gateway_program.py @@ -37,24 +37,18 @@ def __init__( target_gateway_id: str, region: str, num_connections: int = 32, - compress: bool = False, - encrypt: bool = False, private_ip: bool = False, ): super().__init__("send") self.target_gateway_id = target_gateway_id # gateway to send to self.region = region # region to send to self.num_connections = num_connections # default this for now - self.compress = compress - self.encrypt = encrypt self.private_ip = private_ip # 
whether to send to private or public IP (private for GCP->GCP) class GatewayReceive(GatewayOperator): - def __init__(self, decompress: bool = False, decrypt: bool = False, max_pending_chunks: int = 1000): + def __init__(self, max_pending_chunks: int = 1000): super().__init__("receive") - self.decompress = decompress - self.decrypt = decrypt self.max_pending_chunks = max_pending_chunks @@ -97,6 +91,32 @@ def __init__(self): super().__init__("mux_or") +class GatewayCompress(GatewayOperator): + def __init__(self, compress: bool = False): + super().__init__("compress") + self.compress = compress + + +class GatewayDecompress(GatewayOperator): + def __init__(self, compress: bool = False): + super().__init__("decompress") + self.compress = compress + + +class GatewayEncrypt(GatewayOperator): + def __init__(self, encrypt: bool = False, e2ee_key_bytes: Optional[str] = None): + super().__init__("encrypt") + self.encrypt = encrypt + self.e2ee_key_bytes = e2ee_key_bytes + + +class GatewayDecrypt(GatewayOperator): + def __init__(self, decrypt: bool = False, e2ee_key_bytes: Optional[str] = None): + super().__init__("decrypt") + self.decrypt = decrypt + self.e2ee_key_bytes = e2ee_key_bytes + + class GatewayProgram: """ @@ -117,7 +137,7 @@ def add_operators(self, ops: List[GatewayOperator], parent_handle: Optional[str] parent_op = self._ops[parent_handle] if parent_handle else None ops_handles = [] for op in ops: - ops_handles.append(self.add_operator(op, parent_op, partition_id)) + ops_handles.append(self.add_operator(op, parent_op.handle, partition_id)) return ops_handles @@ -138,6 +158,8 @@ def to_dict(self): """ program_all = [] for partition_id, op_list in self._plan.items(): + partition_id = list(partition_id) # convert tuple to list + # build gateway program representation program = [] for op in op_list: @@ -147,11 +169,12 @@ def to_dict(self): exists = False for p in program_all: if p["value"] == program: # equivalent partition exists - p["partitions"].append(partition_id) + for pid in partition_id: + p["partitions"].append(pid) exists = True break if not exists: - program_all.append({"value": program, "partitions": [partition_id]}) + program_all.append({"value": program, "partitions": partition_id}) return program_all diff --git a/skyplane/gateway/operators/gateway_operator.py b/skyplane/gateway/operators/gateway_operator.py index c574caebc..967d3da45 100644 --- a/skyplane/gateway/operators/gateway_operator.py +++ b/skyplane/gateway/operators/gateway_operator.py @@ -90,7 +90,14 @@ def worker_loop(self, worker_id: int, *args): # TODO: status logging self.chunk_store.log_chunk_state(chunk_req, ChunkState.in_progress, operator_handle=self.handle, worker_id=worker_id) # process chunk - succ = self.process(chunk_req, *args) + succ = retry_backoff( + partial( + self.process, + chunk_req, + *args + ), + max_retries=1, + ) # place in output queue if succ: @@ -162,15 +169,11 @@ def __init__( chunk_store: ChunkStore, ip_addr: str, use_tls: Optional[bool] = True, - use_compression: Optional[bool] = True, - e2ee_key_bytes: Optional[bytes] = None, n_processes: Optional[int] = 32, ): super().__init__(handle, region, input_queue, output_queue, error_event, error_queue, chunk_store, n_processes) self.ip_addr = ip_addr self.use_tls = use_tls - self.use_compression = use_compression - self.e2ee_key_bytes = e2ee_key_bytes self.args = (ip_addr,) # provider = region.split(":")[0] @@ -179,12 +182,6 @@ def __init__( # elif provider == "azure": # self.n_processes = 24 # due to throttling limits from authentication - # 
encryption
-        if e2ee_key_bytes is None:
-            self.e2ee_secretbox = None
-        else:
-            self.e2ee_secretbox = nacl.secret.SecretBox(e2ee_key_bytes)
-
         # SSL context
         if use_tls:
             self.ssl_context = ssl.create_default_context(ssl.Purpose.SERVER_AUTH)
@@ -330,22 +327,12 @@ def process(self, chunk_req: ChunkRequest, dst_host: str):
                 wire_length = len(data)
                 raw_wire_length = wire_length
-                compressed_length = None
-
-                if self.use_compression:
-                    data = lz4.frame.compress(data)
-                    wire_length = len(data)
-                    compressed_length = wire_length
-                if self.e2ee_secretbox is not None:
-                    data = self.e2ee_secretbox.encrypt(data)
-                    wire_length = len(data)
                 # send chunk header
                 header = chunk.to_wire_header(
                     n_chunks_left_on_socket=len(chunk_ids) - idx - 1,
                     wire_length=wire_length,
                     raw_wire_length=raw_wire_length,
-                    is_compressed=(compressed_length is not None),
                 )
                 # print(f"[sender-{self.worker_id}]:{chunk_id} sending chunk header {header}")
                 header.to_socket(sock)
@@ -600,3 +587,184 @@ def process(self, chunk_req: ChunkRequest):
             f"[obj_store:{self.worker_id}] Uploaded {chunk_req.chunk.chunk_id} partition {chunk_req.chunk.part_number} to {self.bucket_name}"
         )
         return True
+
+
+class GatewayCompressor(GatewayOperator):
+    def __init__(
+        self,
+        handle: str,
+        region: str,
+        input_queue: GatewayQueue,
+        output_queue: GatewayQueue,
+        error_event,
+        error_queue: Queue,
+        chunk_store: Optional[ChunkStore] = None,
+        use_compression: Optional[bool] = True,
+        prefix: Optional[str] = "",
+    ):
+        super().__init__(
+            handle, region, input_queue, output_queue, error_event, error_queue, chunk_store)
+        self.chunk_store = chunk_store
+        self.use_compression = use_compression
+        self.prefix = prefix
+
+    def process(self, chunk_req: ChunkRequest):
+        if not self.use_compression: return True
+        logger.debug(
+            f"[{self.handle}:{self.worker_id}] Compressing chunk {chunk_req.chunk.chunk_id}"
+        )
+        chunk_reqs = [chunk_req]
+        for idx, chunk_req in enumerate(chunk_reqs):
+            chunk_id = chunk_req.chunk.chunk_id
+            chunk = chunk_req.chunk
+            chunk_file_path = self.chunk_store.get_chunk_file_path(chunk_id)
+
+            with open(chunk_file_path, "rb") as f:
+                data = f.read()
+
+            data = lz4.frame.compress(data)
+            wire_length = len(data)
+            compressed_length = wire_length
+
+            with open(chunk_file_path, "wb") as f:
+                f.write(data)
+        return True
+
+
+class GatewayDecompressor(GatewayOperator):
+    def __init__(
+        self,
+        handle: str,
+        region: str,
+        input_queue: GatewayQueue,
+        output_queue: GatewayQueue,
+        error_event,
+        error_queue: Queue,
+        chunk_store: Optional[ChunkStore] = None,
+        use_compression: Optional[bool] = True,
+        prefix: Optional[str] = "",
+    ):
+        super().__init__(
+            handle, region, input_queue, output_queue, error_event, error_queue, chunk_store)
+        self.chunk_store = chunk_store
+        self.use_compression = use_compression
+        self.prefix = prefix
+
+    def process(self, chunk_req: ChunkRequest):
+        if not self.use_compression: return True
+        logger.debug(
+            f"[{self.handle}:{self.worker_id}] Decompressing chunk {chunk_req.chunk.chunk_id}"
+        )
+        chunk_reqs = [chunk_req]
+        for idx, chunk_req in enumerate(chunk_reqs):
+            chunk_id = chunk_req.chunk.chunk_id
+            chunk = chunk_req.chunk
+            chunk_file_path = self.chunk_store.get_chunk_file_path(chunk_id)
+
+            with open(chunk_file_path, "rb") as f:
+                data = f.read()
+
+            data = lz4.frame.decompress(data)
+            wire_length = len(data)
+            compressed_length = wire_length
+
+            with open(chunk_file_path, "wb") as f:
+                f.write(data)
+        return True
+
+class GatewayEncrypter(GatewayOperator):
+    def __init__(
+        self,
+        handle: str,
+        region: str,
+        input_queue: GatewayQueue,
+        output_queue: GatewayQueue,
+        error_event,
+        error_queue: Queue,
+        chunk_store: Optional[ChunkStore] = None,
+        e2ee_key_bytes: Optional[bytes] = None,
+        prefix: Optional[str] = "",
+    ):
+        super().__init__(
+            handle, region, input_queue, output_queue, error_event, error_queue, chunk_store)
+        self.chunk_store = chunk_store
+        self.prefix = prefix
+
+        # encryption
+        if e2ee_key_bytes is None:
+            self.e2ee_secretbox = None
+        else:
+            self.e2ee_secretbox = nacl.secret.SecretBox(e2ee_key_bytes)
+
+    def process(self, chunk_req: ChunkRequest):
+        if not self.e2ee_secretbox: return True
+        logger.debug(
+            f"[{self.handle}:{self.worker_id}] Encrypting chunk {chunk_req.chunk.chunk_id}"
+        )
+        chunk_reqs = [chunk_req]
+        for idx, chunk_req in enumerate(chunk_reqs):
+            chunk_id = chunk_req.chunk.chunk_id
+            chunk = chunk_req.chunk
+            chunk_file_path = self.chunk_store.get_chunk_file_path(chunk_id)
+
+            with open(chunk_file_path, "rb") as f:
+                data = f.read()
+
+            data = self.e2ee_secretbox.encrypt(data)
+            wire_length = len(data)
+            encrypted_length = wire_length
+
+            with open(chunk_file_path, "wb") as f:
+                f.write(data)
+        return True
+
+
+class GatewayDecrypter(GatewayOperator):
+    def __init__(
+        self,
+        handle: str,
+        region: str,
+        input_queue: GatewayQueue,
+        output_queue: GatewayQueue,
+        error_event,
+        error_queue: Queue,
+        chunk_store: Optional[ChunkStore] = None,
+        e2ee_key_bytes: Optional[bytes] = None,
+        prefix: Optional[str] = "",
+    ):
+        super().__init__(
+            handle, region, input_queue, output_queue, error_event, error_queue, chunk_store)
+        self.chunk_store = chunk_store
+        self.prefix = prefix
+
+        # encryption
+        if e2ee_key_bytes is None:
+            self.e2ee_secretbox = None
+        else:
+            self.e2ee_secretbox = nacl.secret.SecretBox(e2ee_key_bytes)
+
+    def process(self, chunk_req: ChunkRequest):
+        if not self.e2ee_secretbox: return True
+        logger.debug(
+            f"[{self.handle}:{self.worker_id}] Decrypting chunk {chunk_req.chunk.chunk_id}"
+        )
+        chunk_reqs = [chunk_req]
+        for idx, chunk_req in enumerate(chunk_reqs):
+            chunk_id = chunk_req.chunk.chunk_id
+            chunk = chunk_req.chunk
+            chunk_file_path = self.chunk_store.get_chunk_file_path(chunk_id)
+
+            with open(chunk_file_path, "rb") as f:
+                data = f.read()
+
+            data = self.e2ee_secretbox.decrypt(data)
+            wire_length = len(data)
+            encrypted_length = wire_length
+
+            with open(chunk_file_path, "wb") as f:
+                f.write(data)
+        return True
diff --git a/skyplane/gateway/operators/gateway_receiver.py b/skyplane/gateway/operators/gateway_receiver.py
index aeed0e341..21d3c8c98 100644
--- a/skyplane/gateway/operators/gateway_receiver.py
+++ b/skyplane/gateway/operators/gateway_receiver.py
@@ -31,8 +31,6 @@ def __init__(
         recv_block_size=4 * MB,
         max_pending_chunks=1,
         use_tls: Optional[bool] = True,
-        use_compression: Optional[bool] = True,
-        e2ee_key_bytes: Optional[bytes] = None,
     ):
         self.handle = handle
         self.region = region
@@ -42,11 +40,6 @@ def __init__(
         self.recv_block_size = recv_block_size
         self.max_pending_chunks = max_pending_chunks
         print("Max pending chunks", self.max_pending_chunks)
-        self.use_compression = use_compression
-        if e2ee_key_bytes is None:
-            self.e2ee_secretbox = None
-        else:
-            self.e2ee_secretbox = nacl.secret.SecretBox(e2ee_key_bytes)
         self.server_processes = []
         self.server_ports = []
         self.next_gateway_worker_id = 0
@@ -153,9 +146,6 @@
def recv_chunks(self, conn: socket.socket, addr: Tuple[str, int]): # TODO: this wont work # chunk_request = self.chunk_store.get_chunk_request(chunk_header.chunk_id) - should_decrypt = self.e2ee_secretbox is not None # and chunk_request.dst_region == self.region - should_decompress = chunk_header.is_compressed # and chunk_request.dst_region == self.region - # wait for space # while self.chunk_store.remaining_bytes() < chunk_header.data_len * self.max_pending_chunks: # print( @@ -171,7 +161,7 @@ def recv_chunks(self, conn: socket.socket, addr: Tuple[str, int]): fpath = self.chunk_store.get_chunk_file_path(chunk_header.chunk_id) with fpath.open("wb") as f: socket_data_len = chunk_header.data_len - chunk_received_size, chunk_received_size_decompressed = 0, 0 + chunk_received_size = 0 to_write = bytearray(socket_data_len) to_write_view = memoryview(to_write) while socket_data_len > 0: @@ -188,18 +178,6 @@ def recv_chunks(self, conn: socket.socket, addr: Tuple[str, int]): ) to_write = bytes(to_write) - if should_decrypt: - to_write = self.e2ee_secretbox.decrypt(to_write) - print(f"[receiver:{server_port}]:{chunk_header.chunk_id} Decrypting {len(to_write)} bytes") - - if should_decompress: - data_batch_decompressed = lz4.frame.decompress(to_write) - chunk_received_size_decompressed += len(data_batch_decompressed) - to_write = data_batch_decompressed - print( - f"[receiver:{server_port}]:{chunk_header.chunk_id} Decompressing {len(to_write)} bytes to {chunk_received_size_decompressed} bytes" - ) - # try to write data until successful while True: try: diff --git a/skyplane/obj_store/s3_interface.py b/skyplane/obj_store/s3_interface.py index f502cd77e..3ba6a9ee6 100644 --- a/skyplane/obj_store/s3_interface.py +++ b/skyplane/obj_store/s3_interface.py @@ -199,6 +199,8 @@ def upload_object( ): dst_object_name, src_file_path = str(dst_object_name), str(src_file_path) s3_client = self._s3_client() + print("Destination object name: ", dst_object_name) + print("Source file path: ", src_file_path) assert len(dst_object_name) > 0, f"Destination object name must be non-empty: '{dst_object_name}'" b64_md5sum = base64.b64encode(check_md5).decode("utf-8") if check_md5 else None checksum_args = dict(ContentMD5=b64_md5sum) if b64_md5sum else dict() diff --git a/skyplane/planner/planner.py b/skyplane/planner/planner.py index 2bb21e0ac..89e3db6cb 100644 --- a/skyplane/planner/planner.py +++ b/skyplane/planner/planner.py @@ -1,13 +1,16 @@ from collections import defaultdict -from importlib.resources import path -from typing import Dict, List, Optional, Tuple, Tuple import os import csv - -from skyplane import compute from skyplane.api.config import TransferConfig from skyplane.utils import logger - +from importlib.resources import path +from typing import Dict, List, Optional, Tuple +import numpy as np +import collections +from skyplane.planner.solver import ThroughputProblem, BroadcastProblem, \ + BroadcastSolution, GBIT_PER_GBYTE, ThroughputSolution, ThroughputSolver +from skyplane.planner.solver_ilp import ThroughputSolverILP +from skyplane import compute from skyplane.planner.topology import TopologyPlan from skyplane.gateway.gateway_program import ( GatewayProgram, @@ -17,21 +20,41 @@ GatewayWriteObjectStore, GatewayReceive, GatewaySend, + GatewayCompress, + GatewayDecompress, + GatewayEncrypt, + GatewayDecrypt ) - +import networkx as nx +import math from skyplane.api.transfer_job import TransferJob import json +import functools from skyplane.utils.fn import do_parallel from skyplane.config_paths import 
config_path, azure_standardDv5_quota_path, aws_quota_path, gcp_quota_path from skyplane.config import SkyplaneConfig +from pathlib import Path +from random import sample +import matplotlib.pyplot as plt class Planner: - def __init__(self, transfer_config: TransferConfig, quota_limits_file: Optional[str] = None): + def __init__(self, + transfer_config: TransferConfig, + n_instances:int, + n_connections: int, + n_partitions: int = 1, + quota_limits_file: Optional[str] = None, + tp_grid_path: Optional[Path] = Path("skyplane.data", "throughput.csv")): self.transfer_config = transfer_config self.config = SkyplaneConfig.load_config(config_path) - self.n_instances = self.config.get_flag("max_instances") + self.n_instances = n_instances + self.n_connections = n_connections + self.n_partitions = n_partitions + self.solution_graph = None + self.regions = None + self.tp_grid_path = tp_grid_path # Loading the quota information, add ibm cloud when it is supported quota_limits = {} @@ -63,7 +86,8 @@ def __init__(self, transfer_config: TransferConfig, quota_limits_file: Optional[ vcpu_cost = int(vcpu_cost) self.vcpu_info[cloud_provider][instance_name] = vcpu_cost - def plan(self) -> TopologyPlan: + def plan(self, jobs: List[TransferJob]) -> TopologyPlan: + # create physical plan in TopologyPlan format raise NotImplementedError def _vm_to_vcpus(self, cloud_provider: str, vm: str) -> int: @@ -116,7 +140,8 @@ def _calculate_vm_types(self, region_tag: str) -> Optional[Tuple[str, int]]: # Get the quota limit quota_limit = self._get_quota_limits_for( - cloud_provider=cloud_provider, region=region, spot=getattr(self.transfer_config, f"{cloud_provider}_use_spot_instances") + cloud_provider=cloud_provider, region=region, + spot=getattr(self.transfer_config, f"{cloud_provider}_use_spot_instances") ) config_vm_type = getattr(self.transfer_config, f"{cloud_provider}_instance_class") @@ -134,7 +159,8 @@ def _calculate_vm_types(self, region_tag: str) -> Optional[Tuple[str, int]]: return config_vm_type, quota_limit // config_vcpus vm_type, vcpus = None, None - for instance_name, vcpu_cost in sorted(self.vcpu_info[cloud_provider].items(), key=lambda x: x[1], reverse=True): + for instance_name, vcpu_cost in sorted(self.vcpu_info[cloud_provider].items(), key=lambda x: x[1], + reverse=True): if vcpu_cost <= quota_limit: vm_type, vcpus = instance_name, vcpu_cost break @@ -152,7 +178,7 @@ def _calculate_vm_types(self, region_tag: str) -> Optional[Tuple[str, int]]: return (vm_type, n_instances) def _get_vm_type_and_instances( - self, src_region_tag: Optional[str] = None, dst_region_tags: Optional[List[str]] = None + self, src_region_tag: Optional[str] = None, dst_region_tags: Optional[List[str]] = None ) -> Tuple[Dict[str, str], int]: """Dynamically calculates the vm type each region can use (both the source region and all destination regions) based on their quota limits and calculates the number of vms to launch in all regions by conservatively @@ -170,10 +196,11 @@ def _get_vm_type_and_instances( dst_tags = dst_region_tags if dst_region_tags is not None else [] if src_region_tag: - assert len(src_region_tag.split(":")) == 2, f"Source region tag {src_region_tag} must be in the form of `cloud_provider:region`" + assert len(src_region_tag.split( + ":")) == 2, f"Source region tag {src_region_tag} must be in the form of `cloud_provider:region`" if dst_region_tags: assert ( - len(dst_region_tags[0].split(":")) == 2 + len(dst_region_tags[0].split(":")) == 2 ), f"Destination region tag {dst_region_tags} must be in the form of 
`cloud_provider:region`" # do_parallel returns tuples of (region_tag, (vm_type, n_instances)) @@ -184,25 +211,14 @@ def _get_vm_type_and_instances( n_instances = min([self.n_instances] + [v[1][1] for v in vm_info]) # type: ignore return vm_types, n_instances - -class UnicastDirectPlanner(Planner): - # DO NOT USE THIS - broken for single-region transfers - def __init__(self, n_instances: int, n_connections: int, transfer_config: TransferConfig, quota_limits_file: Optional[str] = None): - super().__init__(transfer_config, quota_limits_file) - self.n_instances = n_instances - self.n_connections = n_connections - - def plan(self, jobs: List[TransferJob]) -> TopologyPlan: - # make sure only single destination - for job in jobs: - assert len(job.dst_ifaces) == 1, f"DirectPlanner only support single destination jobs, got {len(job.dst_ifaces)}" - + def verify_job_src_dsts(self, jobs: List[TransferJob], multicast=False) -> Tuple[str, List[str]]: src_region_tag = jobs[0].src_iface.region_tag() dst_region_tag = jobs[0].dst_ifaces[0].region_tag() - assert len(src_region_tag.split(":")) == 2, f"Source region tag {src_region_tag} must be in the form of `cloud_provider:region`" + assert len(src_region_tag.split( + ":")) == 2, f"Source region tag {src_region_tag} must be in the form of `cloud_provider:region`" assert ( - len(dst_region_tag.split(":")) == 2 + len(dst_region_tag.split(":")) == 2 ), f"Destination region tag {dst_region_tag} must be in the form of `cloud_provider:region`" # jobs must have same sources and destinations @@ -213,65 +229,380 @@ def plan(self, jobs: List[TransferJob]) -> TopologyPlan: plan = TopologyPlan(src_region_tag=src_region_tag, dest_region_tags=[dst_region_tag]) # Dynammically calculate n_instances based on quota limits - vm_types, n_instances = self._get_vm_type_and_instances(src_region_tag=src_region_tag, dst_region_tags=[dst_region_tag]) + vm_types, n_instances = self._get_vm_type_and_instances(src_region_tag=src_region_tag, + dst_region_tags=[dst_region_tag]) # TODO: support on-sided transfers but not requiring VMs to be created in source/destination regions for i in range(n_instances): plan.add_gateway(src_region_tag, vm_types[src_region_tag]) plan.add_gateway(dst_region_tag, vm_types[dst_region_tag]) - # ids of gateways in dst region - dst_gateways = plan.get_region_gateways(dst_region_tag) - - src_program = GatewayProgram() - dst_program = GatewayProgram() - - for job in jobs: - src_bucket = job.src_iface.bucket() - dst_bucket = job.dst_ifaces[0].bucket() - - # give each job a different partition id, so we can read/write to different buckets - partition_id = job.uuid + if multicast: + # multicast checking + dst_region_tags = [iface.region_tag() for iface in jobs[0].dst_ifaces] - # source region gateway program - obj_store_read = src_program.add_operator( - GatewayReadObjectStore(src_bucket, src_region_tag, self.n_connections), partition_id=partition_id + # jobs must have same sources and destinations + for job in jobs[1:]: + assert job.src_iface.region_tag() == src_region_tag, "All jobs must have same source region" + assert [iface.region_tag() for iface in + job.dst_ifaces] == dst_region_tags, "Add jobs must have same destination set" + else: + # unicast checking + for job in jobs: + assert len( + job.dst_ifaces) == 1, f"DirectPlanner only support single destination jobs, got {len(job.dst_ifaces)}" + + # jobs must have same sources and destinations + dst_region_tag = jobs[0].dst_ifaces[0].region_tag() + for job in jobs[1:]: + assert job.src_iface.region_tag() == 
src_region_tag, "All jobs must have same source region" + assert job.dst_ifaces[0].region_tag() == dst_region_tag, "All jobs must have same destination region" + dst_region_tags = [dst_region_tag] + + return src_region_tag, dst_region_tags + + @functools.lru_cache(maxsize=None) + def make_nx_graph(self) -> nx.DiGraph: + # create throughput / cost graph for all regions for planner + G = nx.DiGraph() + dataSolver = ThroughputSolver(self.tp_grid_path) + self.regions = dataSolver.get_regions() + for i, src in enumerate(self.regions): + for j, dst in enumerate(self.regions): + if i != j: + throughput_grid = dataSolver.get_path_throughput(src, dst) if dataSolver.get_path_throughput(src,dst) is not None else 0 + G.add_edge(self.regions[i], self.regions[j], cost=None, throughput=throughput_grid / 1e9) + + for edge in G.edges.data(): + if edge[-1]["cost"] is None: + edge[-1]["cost"] = dataSolver.get_path_cost(edge[0], edge[1]) + + assert all([edge[-1]["cost"] is not None for edge in G.edges.data()]) + return G + + def add_src_operator(self, + solution_graph: nx.DiGraph, + gateway_program: GatewayProgram, + region: str, + partition_ids: List[int], + plan: TopologyPlan, + bucket_info: Optional[Tuple[str, str]] = None, + dst_op: Optional[GatewayReceive] = None, + ) -> bool: + return self.add_src_or_overlay_operator( solution_graph, gateway_program, region, partition_ids, plan, bucket_info, dst_op, is_src=True) + + def add_overlay_operator(self, + solution_graph: nx.DiGraph, + gateway_program: GatewayProgram, + region: str, + partition_ids: List[int], + plan: TopologyPlan, + bucket_info: Optional[Tuple[str, str]] = None, + dst_op: Optional[GatewayReceive] = None, + ) -> bool: + return self.add_src_or_overlay_operator( solution_graph, gateway_program, region, partition_ids, plan, bucket_info, dst_op, is_src=False) + + + def add_src_or_overlay_operator( + self, + solution_graph: nx.DiGraph, + gateway_program: GatewayProgram, + region: str, + partition_ids: List[int], + plan: TopologyPlan, + bucket_info: Optional[Tuple[str, str]] = None, + dst_op: Optional[GatewayReceive] = None, + is_src: bool = False + ) -> bool: + """ + :param solution_graph: nx.DiGraph of solution + :param gateway_program: GatewayProgram of region to add operator to + :param region: region to add operator to + :param partition_ids: list of partition ids to add operator to + :param partition_offset: offset of partition ids + :param plan: TopologyPlan of solution [for getting gateway ids] + :param bucket_info: tuple of (bucket_name, bucket_region) for object store + :param dst_op: if None, then this is either the source node or a overlay node; otherwise, this is the destination overlay node + """ + # partition_ids are set of ids that follow the same path from the out edges of the region + # any_id = partition_ids[0] - partition_offset + any_id = partition_ids[0] + # g = self.make_nx_graph() + next_regions = set( + [edge[1] for edge in solution_graph.out_edges(region, data=True) if str(any_id) in edge[-1]["partitions"]]) + + # if partition_ids does not have a next region, then we cannot add an operator + if len(next_regions) == 0: + print( + f"Region {region}, any id: {any_id}, partition ids: {partition_ids}, has no next region to forward data to: {solution_graph.out_edges(region, data=True)}" ) - mux_or = src_program.add_operator(GatewayMuxOr(), parent_handle=obj_store_read, partition_id=partition_id) - for i in range(n_instances): - src_program.add_operator( - GatewaySend( - target_gateway_id=dst_gateways[i].gateway_id, - 
region=src_region_tag, - num_connections=self.n_connections, - compress=True, - encrypt=True, - ), - parent_handle=mux_or, - partition_id=partition_id, + return False + + # identify if this is a destination overlay node or not + if dst_op is None: + # source node or overlay node + # TODO: add generate data locally operator + if bucket_info is None: + receive_op = GatewayReceive() + else: + receive_op = GatewayReadObjectStore( + bucket_name=bucket_info[0], bucket_region=bucket_info[1], num_connections=self.n_connections ) - - # dst region gateway program - recv_op = dst_program.add_operator(GatewayReceive(decompress=True, decrypt=True), partition_id=partition_id) - dst_program.add_operator( - GatewayWriteObjectStore(dst_bucket, dst_region_tag, self.n_connections), parent_handle=recv_op, partition_id=partition_id + else: + # destination overlay node, dst_op is the parent node + receive_op = dst_op + + # find set of regions to send to for all partitions in partition_ids + region_to_id_map = {} + for next_region in next_regions: + region_to_id_map[next_region] = [] + for i in range(solution_graph.nodes[next_region]["num_vms"]): + region_to_id_map[next_region].append(plan.get_region_gateways(next_region)[i].gateway_id) + + # use muxand or muxor for partition_id + operation = "MUX_AND" if len(next_regions) > 1 else "MUX_OR" + mux_op = GatewayMuxAnd() if len(next_regions) > 1 else GatewayMuxOr() + + # non-dst node: add receive_op into gateway program + if dst_op is None: + gateway_program.add_operator(op=receive_op, partition_id=tuple(partition_ids)) + + # MUX_AND: send this partition to multiple regions + if operation == "MUX_AND": + if is_src and self.transfer_config.use_e2ee: + e2ee_key_file = "e2ee_key" + with open(f"/tmp/{e2ee_key_file}", 'rb') as f: + e2ee_key_bytes = f.read() + + if dst_op is not None and dst_op.op_type == "mux_and": + mux_op = receive_op + else: # do not add any nested mux_and if dst_op parent is mux_and + gateway_program.add_operator(op=mux_op, parent_handle=receive_op.handle, + partition_id=tuple(partition_ids)) + + for next_region, next_region_ids in region_to_id_map.items(): + send_ops = [ + GatewaySend(target_gateway_id=id, region=next_region, num_connections=self.n_connections) for id in + next_region_ids + ] + + # if there is more than one region to forward data to, add MUX_OR + if len(next_region_ids) > 1: + mux_or_op = GatewayMuxOr() + gateway_program.add_operator(op=mux_or_op, parent_handle=mux_op.handle, + partition_id=tuple(partition_ids)) + if is_src: + for id in next_region_ids: + compress_op = GatewayCompress(compress=self.transfer_config.use_compression) + gateway_program.add_operator(op=compress_op, parent_handle=mux_or_op.handle, + partition_id=tuple(partition_ids)) + encrypt_op = GatewayEncrypt(encrypt=self.transfer_config.use_e2ee, e2ee_key_bytes=e2ee_key_bytes) + gateway_program.add_operator(op=encrypt_op, parent_handle=encrypt_op.handle, + partition_id=tuple(partition_ids)) + send_op = GatewaySend(target_gateway_id=id, region=next_region, num_connections=self.n_connections) + gateway_program.add_operator(ops=send_op, parent_handle=encrypt_op.handle, + partition_id=tuple(partition_ids)) + else: + # otherwise, the parent of send_op is mux_op ("MUX_AND") + assert len(send_ops) == 1 + if is_src: + compress_op = GatewayCompress(compress=self.transfer_config.use_compression) + gateway_program.add_operator(op=compress_op, parent_handle=mux_op.handle, + partition_id=tuple(partition_ids)) + encrypt_op = GatewayEncrypt(encrypt=self.transfer_config.use_e2ee, 
e2ee_key_bytes=e2ee_key_bytes) + gateway_program.add_operator(op=encrypt_op, parent_handle=compress_op.handle, + partition_id=tuple(partition_ids)) + + gateway_program.add_operator(op=send_ops[0], parent_handle=encrypt_op.handle if is_src else mux_op.handle, + partition_id=tuple(partition_ids)) + else: + # only send this partition to a single region + assert len(region_to_id_map) == 1 + next_region = list(region_to_id_map.keys())[0] + ids = [id for next_region_ids in region_to_id_map.values() for id in next_region_ids] + send_ops = [GatewaySend(target_gateway_id=id, region=next_region, num_connections=self.n_connections) for id + in + ids] + + # if num of gateways > 1, then connect to MUX_OR + if len(ids) > 1: + gateway_program.add_operator(op=mux_op, parent_handle=receive_op.handle, + partition_id=tuple(partition_ids)) + gateway_program.add_operators(ops=send_ops, parent_handle=mux_op.handle) + else: + gateway_program.add_operators(ops=send_ops, parent_handle=receive_op.handle, + partition_id=tuple(partition_ids)) + + return True + + def add_dst_operator( + self, + solution_graph, + gateway_program: GatewayProgram, + region: str, + partition_ids: List[int], + partition_offset: int, + plan: TopologyPlan, + obj_store: Tuple[str, str] = None, + ): + # operator that receives data + receive_op = GatewayReceive() + gateway_program.add_operator(receive_op, partition_id=tuple(partition_ids)) + + # operator that writes to the object store + write_op = GatewayWriteObjectStore(bucket_name=obj_store[0], bucket_region=obj_store[1], + num_connections=self.n_connections) + + g = solution_graph + + # partition_ids are ids that follow the same path from the out edges of the region + # any_id = partition_ids[0] - partition_offset + any_id = partition_ids[0] + next_regions = set( + [edge[1] for edge in g.out_edges(region, data=True) if str(any_id) in edge[-1]["partitions"]]) + + # if no regions to forward data to, write to the object store + if len(next_regions) == 0: + if self.transfer_config.use_e2ee: + e2ee_key_file = "e2ee_key" + with open(f"/tmp/{e2ee_key_file}", 'rb') as f: + e2ee_key_bytes = f.read() + + decrypt_op = GatewayDecrypt(decrypt=self.transfer_config.use_e2ee, e2ee_key_bytes=e2ee_key_bytes) + gateway_program.add_operator(op=decrypt_op, parent_handle=receive_op.handle, + partition_id=tuple(partition_ids)) + decompress_op = GatewayDecompress(compress=self.transfer_config.use_compression) + gateway_program.add_operator(op=decompress_op, parent_handle=decrypt_op.handle, + partition_id=tuple(partition_ids)) + + gateway_program.add_operator(write_op, parent_handle=decompress_op.handle, partition_id=tuple(partition_ids)) + + # otherwise, receive and write to the object store, then forward data to next regions + else: + mux_and_op = GatewayMuxAnd() + # receive and write + gateway_program.add_operator(mux_and_op, parent_handle=receive_op.handle, partition_id=tuple(partition_ids)) + gateway_program.add_operator(write_op, parent_handle=mux_and_op.handle, partition_id=tuple(partition_ids)) + + # forward: destination nodes are also forwarders + self.add_src_or_overlay_operator( + solution_graph, gateway_program, region, partition_ids, partition_offset, plan, dst_op=mux_and_op, is_src=False ) - # update cost per GB - plan.cost_per_gb += compute.CloudProvider.get_transfer_cost(src_region_tag, dst_region_tag) - - # set gateway programs - plan.set_gateway_program(src_region_tag, src_program) - plan.set_gateway_program(dst_region_tag, dst_program) + def logical_plan_to_topology_plan(self, jobs: 
List[TransferJob], solution_graph: nx.graph) -> TopologyPlan: + """ + Given a logical plan, construct a gateway program for each region in the logical plan for the given jobs. + """ + # get source and destination regions + src_ifaces, dst_ifaces = [job.src_iface for job in jobs], [job.dst_ifaces for job in jobs] + src_region_tag = src_ifaces[0].region_tag() + dst_region_tags = [dst_iface.region_tag() for dst_iface in dst_ifaces[0]] + + # map from the node to the gateway program + region_to_gateway_program = {region: GatewayProgram() for region in solution_graph.nodes} + + # construct TopologyPlan for all the regions in solution_graph + overlay_region_tags = [node for node in solution_graph.nodes if + node != src_region_tag and node not in dst_region_tags] + plan = TopologyPlan(src_region_tag=src_region_tag, dest_region_tags=dst_region_tags, + overlay_region_tags=overlay_region_tags) + + for node in solution_graph.nodes: + for i in range(solution_graph.nodes[node]["num_vms"]): + plan.add_gateway(node) + + # iterate through all the jobs + for i in range(len(src_ifaces)): + src_bucket = src_ifaces[i].bucket() + dst_buckets = {dst_iface[i].region_tag(): dst_iface[i].bucket() for dst_iface in dst_ifaces} + + # iterate through all the regions in the solution graph + for node in solution_graph.nodes: + node_gateway_program = region_to_gateway_program[node] + partition_to_next_regions = {} + + # give each job a different partition offset i, so we can read/write to different buckets + partition_to_next_regions[jobs[i].uuid] = set( + [edge[1] for edge in solution_graph.out_edges(node, data=True) + if str(jobs[i].uuid) in edge[-1]["partitions"]]) + + keys_per_set = collections.defaultdict(list) + for key, value in partition_to_next_regions.items(): + keys_per_set[frozenset(value)].append(key) + + list_of_partitions = list(keys_per_set.values()) + + # source node: read from object store or generate random data, then forward data + for partitions in list_of_partitions: + if node == src_region_tag: + self.add_src_operator( + solution_graph, + node_gateway_program, + node, + partitions, + partition_offset=i, + plan=plan, + bucket_info=(src_bucket, src_region_tag) + ) + + # dst receive data, write to object store, forward data if needed + elif node in dst_region_tags: + dst_bucket = dst_buckets[node] + self.add_dst_operator( + solution_graph, + node_gateway_program, + node, + partitions, + partition_offset=i, + plan=plan, + obj_store=(dst_bucket, node), + ) + + # overlay node only forward data + else: + self.add_overlay_operator( + solution_graph, node_gateway_program, node, partitions, partition_offset=i, plan=plan + ) + region_to_gateway_program[node] = node_gateway_program + assert len(region_to_gateway_program) > 0, f"Empty gateway program {node}" + + for node in solution_graph.nodes: + plan.set_gateway_program(node, region_to_gateway_program[node]) + + for edge in solution_graph.edges.data(): + src_region, dst_region = edge[0], edge[1] + plan.cost_per_gb += compute.CloudProvider.get_transfer_cost(src_region, dst_region) * ( + len(edge[-1]["partitions"]) / self.n_partitions) return plan + def draw_nxgrapg(self, save_fig=True): + if self.solution_graph is None: + raise Exception('please run plan method first to get nx_graph') + nx.draw(self.solution_graph, with_labels=True) + if save_fig: + os.makedirs('graph', exist_ok=True) + plt.savefig(f'graph/figure.png') + else: + plt.show() + + def draw_di_graph(self, di_graph, save_name = "test.png"): + # Specify the edges you want here + pos = 
nx.spring_layout(di_graph) + nx.draw_networkx_nodes(di_graph, pos, node_size = 50) + nx.draw_networkx_labels(di_graph, pos, font_size = 8) + nx.draw_networkx_edges(di_graph, pos, edgelist=di_graph.edges(), edge_color='blue', arrows=True) + + plt.savefig(save_name) + class MulticastDirectPlanner(Planner): - def __init__(self, n_instances: int, n_connections: int, transfer_config: TransferConfig, quota_limits_file: Optional[str] = None): - super().__init__(transfer_config, quota_limits_file) - self.n_instances = n_instances - self.n_connections = n_connections + def __init__(self, transfer_config: TransferConfig, + n_instances: int, + n_connections: int, + n_partitions: int, + quota_limits_file: Optional[str] = None): + super().__init__(transfer_config, n_instances, n_connections, n_partitions, quota_limits_file) def plan(self, jobs: List[TransferJob]) -> TopologyPlan: src_region_tag = jobs[0].src_iface.region_tag() @@ -280,7 +611,8 @@ def plan(self, jobs: List[TransferJob]) -> TopologyPlan: # jobs must have same sources and destinations for job in jobs[1:]: assert job.src_iface.region_tag() == src_region_tag, "All jobs must have same source region" - assert [iface.region_tag() for iface in job.dst_ifaces] == dst_region_tags, "Add jobs must have same destination set" + assert [iface.region_tag() for iface in + job.dst_ifaces] == dst_region_tags, "Add jobs must have same destination set" plan = TopologyPlan(src_region_tag=src_region_tag, dest_region_tags=dst_region_tags) @@ -289,7 +621,8 @@ def plan(self, jobs: List[TransferJob]) -> TopologyPlan: vm_types = None n_instances = self.n_instances else: - vm_types, n_instances = self._get_vm_type_and_instances(src_region_tag=src_region_tag, dst_region_tags=dst_region_tags) + vm_types, n_instances = self._get_vm_type_and_instances(src_region_tag=src_region_tag, + dst_region_tags=dst_region_tags) # TODO: support on-sided transfers but not requiring VMs to be created in source/destination regions for i in range(n_instances): @@ -312,11 +645,11 @@ def plan(self, jobs: List[TransferJob]) -> TopologyPlan: # source region gateway program obj_store_read = src_program.add_operator( - GatewayReadObjectStore(src_bucket, src_region_tag, self.n_connections), partition_id=partition_id - ) + GatewayReadObjectStore(src_bucket, src_region_tag, self.n_connections), partition_id=(partition_id,)) # send to all destination - mux_and = src_program.add_operator(GatewayMuxAnd(), parent_handle=obj_store_read, partition_id=partition_id) + mux_and = src_program.add_operator(GatewayMuxAnd(), parent_handle=obj_store_read, + partition_id=(partition_id,)) dst_prefixes = job.dst_prefixes for i in range(len(job.dst_ifaces)): dst_iface = job.dst_ifaces[i] @@ -330,15 +663,16 @@ def plan(self, jobs: List[TransferJob]) -> TopologyPlan: src_program.add_operator( GatewayWriteObjectStore(dst_bucket, dst_region_tag, self.n_connections, key_prefix=dst_prefix), parent_handle=mux_and, - partition_id=partition_id, + partition_id=(partition_id,) ) continue # can send to any gateway in region - mux_or = src_program.add_operator(GatewayMuxOr(), parent_handle=mux_and, partition_id=partition_id) + mux_or = src_program.add_operator(GatewayMuxOr(), parent_handle=mux_and, partition_id=(partition_id,)) for i in range(n_instances): private_ip = False - if dst_gateways[i].provider == "gcp" and src_provider == "gcp": + # TODO pcnl openstack|tencent|hw|ali|cloudflare|ibm|gcp|azure|aws + if dst_gateways[i].provider == src_provider and src_provider in {"gcp", "openstack"}: # print("Using private IP for GCP 
to GCP transfer", src_region_tag, dst_region_tag) private_ip = True src_program.add_operator( @@ -351,7 +685,7 @@ def plan(self, jobs: List[TransferJob]) -> TopologyPlan: encrypt=self.transfer_config.use_e2ee, ), parent_handle=mux_or, - partition_id=partition_id, + partition_id=(partition_id,), ) # each gateway also recieves data from source @@ -362,7 +696,7 @@ def plan(self, jobs: List[TransferJob]) -> TopologyPlan: dst_program[dst_region_tag].add_operator( GatewayWriteObjectStore(dst_bucket, dst_region_tag, self.n_connections, key_prefix=dst_prefix), parent_handle=recv_op, - partition_id=partition_id, + partition_id=(partition_id,), ) # update cost per GB @@ -385,7 +719,8 @@ def plan(self, jobs: List[TransferJob]) -> TopologyPlan: # jobs must have same sources and destinations for job in jobs[1:]: assert job.src_iface.region_tag() == src_region_tag, "All jobs must have same source region" - assert [iface.region_tag() for iface in job.dst_ifaces] == dst_region_tags, "Add jobs must have same destination set" + assert [iface.region_tag() for iface in + job.dst_ifaces] == dst_region_tags, "Add jobs must have same destination set" plan = TopologyPlan(src_region_tag=src_region_tag, dest_region_tags=dst_region_tags) @@ -410,10 +745,11 @@ def plan(self, jobs: List[TransferJob]) -> TopologyPlan: # source region gateway program obj_store_read = src_program.add_operator( - GatewayReadObjectStore(src_bucket, src_region_tag, self.n_connections), partition_id=partition_id + GatewayReadObjectStore(src_bucket, src_region_tag, self.n_connections), partition_id=(partition_id,) ) # send to all destination - mux_and = src_program.add_operator(GatewayMuxAnd(), parent_handle=obj_store_read, partition_id=partition_id) + mux_and = src_program.add_operator(GatewayMuxAnd(), parent_handle=obj_store_read, + partition_id=(partition_id,)) dst_prefixes = job.dst_prefixes for i in range(len(job.dst_ifaces)): dst_iface = job.dst_ifaces[i] @@ -426,7 +762,7 @@ def plan(self, jobs: List[TransferJob]) -> TopologyPlan: src_program.add_operator( GatewayWriteObjectStore(dst_bucket, dst_region_tag, self.n_connections, key_prefix=dst_prefix), parent_handle=mux_and, - partition_id=partition_id, + partition_id=(partition_id,), ) # update cost per GB plan.cost_per_gb += compute.CloudProvider.get_transfer_cost(src_region_tag, dst_region_tag) @@ -446,17 +782,15 @@ def plan(self, jobs: List[TransferJob]) -> TopologyPlan: # jobs must have same sources and destinations for job in jobs[1:]: assert job.src_iface.region_tag() == src_region_tag, "All jobs must have same source region" - assert [iface.region_tag() for iface in job.dst_ifaces] == dst_region_tags, "Add jobs must have same destination set" + assert [iface.region_tag() for iface in + job.dst_ifaces] == dst_region_tags, "Add jobs must have same destination set" plan = TopologyPlan(src_region_tag=src_region_tag, dest_region_tags=dst_region_tags) - - # Dynammically calculate n_instances based on quota limits - vm_types, n_instances = self._get_vm_type_and_instances(dst_region_tags=dst_region_tags) - + # TODO: use VM limits to determine how many instances to create in each region # TODO: support on-sided transfers but not requiring VMs to be created in source/destination regions - for i in range(n_instances): + for i in range(self.n_instances): for dst_region_tag in dst_region_tags: - plan.add_gateway(dst_region_tag, vm_types[dst_region_tag]) + plan.add_gateway(dst_region_tag) # initialize gateway programs per region dst_program = {dst_region: GatewayProgram() for dst_region in 
dst_region_tags} @@ -467,7 +801,7 @@ def plan(self, jobs: List[TransferJob]) -> TopologyPlan: src_region_tag = job.src_iface.region_tag() src_provider = src_region_tag.split(":")[0] - partition_id = job.uuid + partition_id = jobs.index(job) # send to all destination dst_prefixes = job.dst_prefixes @@ -496,3 +830,382 @@ def plan(self, jobs: List[TransferJob]) -> TopologyPlan: for dst_region_tag, program in dst_program.items(): plan.set_gateway_program(dst_region_tag, program) return plan + + +class MulticastILPPlanner(Planner): + def __init__( + self, + transfer_config: TransferConfig, + n_instances: int, + n_connections: int, + n_partitions: int, + target_time, + gbyte_to_transfer: int, + filter_node: bool = False, + filter_edge: bool = False, + solver_verbose: bool = False, + save_lp_path: Optional[str] = None, + ): + super().__init__(transfer_config, n_connections, n_partitions) + self.target_time = target_time + self.G = super().make_nx_graph() + self.gbyte_to_transfer = gbyte_to_transfer + self.filter_node = filter_node + self.filter_edge = filter_edge + self.solver_verbose = solver_verbose + self.save_lp_path = save_lp_path + self.n_instances = n_instances + + def multicast_solution_to_nxgraph(self, solution: BroadcastSolution) -> nx.DiGraph: + """ + Convert ILP solution to logical plan in nx graph + """ + v_result = solution.var_instances_per_region + result = np.array(solution.var_edge_partitions) + result_g = nx.DiGraph() # solution nx graph + for i in range(result.shape[0]): + edge = solution.var_edges[i] + partitions = [str(partition_i) for partition_i in range(result.shape[1]) if result[i][partition_i] > 0.5] + + if len(partitions) == 0: + continue + + src_node, dst_node = edge[0], edge[1] + result_g.add_edge( + src_node, + dst_node, + partitions=partitions, + throughput=self.G[src_node][dst_node]["throughput"], + cost=self.G[src_node][dst_node]["cost"], + ) + + for i in range(len(v_result)): + num_vms = int(v_result[i]) + node = solution.var_nodes[i] + if node in result_g.nodes: + result_g.nodes[node]["num_vms"] = num_vms + + return result_g + + def logical_plan( + self, + src_region: str, + dst_regions: List[str], + ) -> nx.DiGraph: + import cvxpy as cp + solver = cp.CBC + + problem = BroadcastProblem( + src=src_region, + dsts=dst_regions, + gbyte_to_transfer=self.gbyte_to_transfer, + instance_limit=self.n_instances, + num_partitions=self.n_partitions, + required_time_budget=self.target_time, + ) + + g = self.G + + # node-approximation + if self.filter_node: + src_dst_li = [problem.src] + problem.dsts + sampled = [i for i in sample(list(self.G.nodes), 15) if i not in src_dst_li] + g = g.subgraph(src_dst_li + sampled).copy() + print(f"Filter node (only use): {src_dst_li + sampled}") + + cost = np.array([e[2] for e in g.edges(data="cost")]) + tp = np.array([e[2] for e in g.edges(data="throughput")]) + + edges = list(g.edges) + nodes = list(g.nodes) + num_edges, num_nodes = len(edges), len(nodes) + num_dest = len(problem.dsts) + print(f"Num edges: {num_edges}, num nodes: {num_nodes}, num dest: {num_dest}, runtime budget: {problem.required_time_budget}s") + + partition_size_gb = problem.gbyte_to_transfer / problem.num_partitions + partition_size_gbit = partition_size_gb * GBIT_PER_GBYTE + print("Partition size (gbit): ", partition_size_gbit) + + # define variables + p = cp.Variable((num_edges, problem.num_partitions), boolean=True) # whether edge is carrying partition + n = cp.Variable((num_nodes), boolean=True) # whether node transfers partition + f = cp.Variable((num_nodes * 
problem.num_partitions, num_nodes + 1), integer=True) # enforce flow conservation + v = cp.Variable((num_nodes), integer=True) # number of VMs per region + + # define objective + egress_cost = cp.sum(cost @ p) * partition_size_gb + instance_cost = cp.sum(v) * (problem.cost_per_instance_hr / 3600) * problem.required_time_budget + tot_cost = egress_cost + instance_cost + obj = cp.Minimize(tot_cost) + + # define constants + constraints = [] + + # constraints on VM per region + for i in range(num_nodes): + constraints.append(v[i] <= problem.instance_limit) + constraints.append(v[i] >= 0) + + # constraints to enforce flow between source/dest nodes + for c in range(problem.num_partitions): + for i in range(num_nodes): + for j in range(num_nodes + 1): + if i != j: + if j != num_nodes: + edge = (nodes[i], nodes[j]) + + constraints.append(f[c * num_nodes + i][j] <= p[edges.index(edge)][c] * num_dest) + # p = 0 -> f <= 0 + # p = 1 -> f <= num_dest + constraints.append( + f[c * num_nodes + i][j] >= (p[edges.index(edge)][c] - 1) * (num_dest + 1) + 1) + # p = 0 -> f >= -(num_dest) + # p = 1 -> f >= 1 + + constraints.append(f[c * num_nodes + i][j] == -f[c * num_nodes + j][i]) + + # capacity constraint for special node + else: + if nodes[i] in problem.dsts: # only connected to destination nodes + constraints.append(f[c * num_nodes + i][j] <= 1) + else: + constraints.append(f[c * num_nodes + i][j] <= 0) + else: + constraints.append(f[c * num_nodes + i][i] == 0) + + # flow conservation + if nodes[i] != problem.src and i != num_nodes + 1: + constraints.append(cp.sum(f[c * num_nodes + i]) == 0) + + # source must have outgoing flow + constraints.append(cp.sum(f[c * num_nodes + nodes.index(problem.src), :]) == num_dest) + + # special node (connected to all destinations) must recieve all flow + constraints.append(cp.sum(f[c * num_nodes: (c + 1) * num_nodes, -1]) == num_dest) + + # node contained if edge is contained + for edge in edges: + constraints.append(n[nodes.index(edge[0])] >= cp.max(p[edges.index(edge)])) + constraints.append(n[nodes.index(edge[1])] >= cp.max(p[edges.index(edge)])) + + # edge approximation + if self.filter_edge: + for edge in edges: + if edge[0] != problem.src and edge[1] not in problem.dsts: + # cannot be in graph + constraints.append(p[edges.index(edge)] == 0) + + # throughput constraint + for edge_i in range(num_edges): + node_i = nodes.index(edge[0]) + constraints.append( + cp.sum(p[edge_i] * partition_size_gbit) <= problem.required_time_budget * tp[edge_i] * v[node_i]) + + # instance limits + for node in nodes: + region = node.split(":")[0] + if region == "aws": + ingress_limit_gbps, egress_limit_gbps = problem.aws_instance_throughput_limit + elif region == "gcp": + ingress_limit_gbps, egress_limit_gbps = problem.gcp_instance_throughput_limit + elif region == "azure": + ingress_limit_gbps, egress_limit_gbps = problem.azure_instance_throughput_limit + elif region == "cloudflare": # TODO: not supported yet in the tput / cost graph + ingress_limit_gbps, egress_limit_gbps = 1, 1 + else: + raise Exception('region limit gbps is not given') + + node_i = nodes.index(node) + # egress + i = np.zeros(num_edges) + for e in g.edges: + if e[0] == node: # edge goes to dest + i[edges.index(e)] = 1 + + constraints.append( + cp.sum(i @ p) * partition_size_gbit <= problem.required_time_budget * egress_limit_gbps * v[node_i]) + + # ingress + i = np.zeros(num_edges) + for e in g.edges: + # edge goes to dest + if e[1] == node: + i[edges.index(e)] = 1 + constraints.append( + cp.sum(i @ p) * 
partition_size_gbit <= problem.required_time_budget * ingress_limit_gbps * v[node_i]) + + print("Define problem done.") + + # solve + prob = cp.Problem(obj, constraints) + if solver == cp.GUROBI or solver == "gurobi": + solver_options = {} + solver_options["Threads"] = 1 + if self.save_lp_path: + solver_options["ResultFile"] = str(self.save_lp_path) + if not self.solver_verbose: + solver_options["OutputFlag"] = 0 + cost = prob.solve(verbose=self.solver_verbose, qcp=True, solver=cp.GUROBI, reoptimize=True, + **solver_options) + elif solver == cp.CBC or solver == "cbc": + solver_options = {} + solver_options["maximumSeconds"] = 60 + solver_options["numberThreads"] = 1 + cost = prob.solve(verbose=self.solver_verbose, solver=cp.CBC, **solver_options) + else: + cost = prob.solve(solver=solver, verbose=self.solver_verbose) + + if prob.status == "optimal": + solution = BroadcastSolution( + problem=problem, + is_feasible=True, + var_edges=edges, + var_nodes=nodes, + var_edge_partitions=p.value, + var_node_transfer_partitions=n.value, + var_instances_per_region=v.value, + var_flow=f.value, + cost_egress=egress_cost.value, + cost_instance=instance_cost.value, + cost_total=tot_cost.value, + ) + else: + solution = BroadcastSolution(problem=problem, is_feasible=False, extra_data=dict(status=prob.status)) + + if not solution.is_feasible: + raise Exception('no feasible solution found') + + return self.multicast_solution_to_nxgraph(solution) + + def plan(self, jobs: List[TransferJob]) -> TopologyPlan: + src_region_tag, dst_region_tags = self.verify_job_src_dsts(jobs, multicast=True) + # src_provider, src_region = src_region_tag.split(":") + # dst_regions = [dst_region_tag.split(":")[1] for dst_region_tag in dst_region_tags] + solution_graph = self.logical_plan(src_region_tag, dst_region_tags) + self.solution_graph = solution_graph + return self.logical_plan_to_topology_plan(jobs, solution_graph) + + +class MulticastMDSTPlanner(Planner): + def __init__(self, transfer_config, n_instances: int, n_connections: int, n_partitions:int): + super().__init__(transfer_config, n_instances, n_connections, n_partitions) + self.G = super().make_nx_graph() + + def logical_plan(self, src_region: str, dst_regions: List[str]) -> nx.DiGraph: + h = self.G.copy() + h.remove_edges_from(list(h.in_edges(src_region)) + list(nx.selfloop_edges(h))) + + DST_graph = nx.algorithms.tree.Edmonds(h.subgraph([src_region] + dst_regions)) + opt_DST = DST_graph.find_optimum(attr="cost", kind="min", preserve_attrs=True, style="arborescence") + + # Construct MDST graph + MDST_graph = nx.DiGraph() + for edge in list(opt_DST.edges()): + s, d = edge[0], edge[1] + MDST_graph.add_edge(s, d, partitions=[str(i) for i in list(range(self.n_partitions))]) + + for node in MDST_graph.nodes: + MDST_graph.nodes[node]["num_vms"] = self.n_instances + + return MDST_graph + + def plan(self, jobs: List[TransferJob]) -> TopologyPlan: + src_region_tag, dst_region_tags = self.verify_job_src_dsts(jobs, multicast=True) + # src_provider, src_region = src_region_tag.split(":") + # dst_regions = [dst_region_tag.split(":")[1] for dst_region_tag in dst_region_tags] + solution_graph = self.logical_plan(src_region_tag, dst_region_tags) + self.solution_graph = solution_graph + return self.logical_plan_to_topology_plan(jobs, solution_graph) + +class UnicastILPPlanner(Planner): + def __init__(self, required_throughput_gbits, *args, **kwargs): + super().__init__(*args, **kwargs) + self.solver_required_throughput_gbits = required_throughput_gbits + self.G = 
super().make_nx_graph() + + def logical_plan(self, src_region: str, dst_regions: List[str], jobs) -> nx.DiGraph: + + if isinstance(dst_regions, List): + dst_regions = dst_regions[0] + + problem = ThroughputProblem( + src=src_region, + dst=dst_regions, + required_throughput_gbits=self.solver_required_throughput_gbits, + gbyte_to_transfer=1, + instance_limit=self.n_instances, + ) + + with path("skyplane.data", "throughput.csv") as solver_throughput_grid: + tput = ThroughputSolverILP(solver_throughput_grid) + # solver = tput.choose_solver() + solution = tput.solve_min_cost(problem) + + if not solution.is_feasible: + raise RuntimeError("No feasible solution found") + + regions = tput.get_regions() + return self.unicast_solution_to_nxgraph(solution, regions, jobs) + + def unicast_solution_to_nxgraph(self, solution: ThroughputSolution, regions, jobs) -> nx.DiGraph: + """ + Convert ILP solution to logical plan in nx graph + """ + v_result = solution.var_instances_per_region + result_g = nx.DiGraph() # solution nx graph + for i, src_node in enumerate(regions): + for j, dst_node in enumerate(regions): + if solution.var_edge_flow_gigabits[i, j] > 0: + result_g.add_edge( + src_node, + dst_node, + partitions=[jobs[0].uuid], + throughput=self.G[src_node][dst_node]["throughput"], + cost=self.G[src_node][dst_node]["cost"], + num_conn=int(solution.var_conn[i][j]) + ) + for i in range(len(v_result)): + num_vms = math.ceil(v_result[i]) + node = regions[i] + if node in result_g.nodes: + result_g.nodes[node]["num_vms"] = num_vms + + return result_g + + def plan(self, jobs: List[TransferJob]) -> TopologyPlan: + src_region_tag, dst_region_tags = self.verify_job_src_dsts(jobs) + # src_provider, src_region = src_region_tag.split(":") + # dst_regions = [dst_region_tag.split(":")[1] for dst_region_tag in dst_region_tags] + solution_graph = self.logical_plan(src_region_tag, dst_region_tags, jobs) + self.solution_graph = solution_graph + return self.logical_plan_to_topology_plan(jobs, solution_graph) + +class UnicastDirectPlanner(Planner): + + def __init__(self, transfer_config: TransferConfig, + n_instances: int, + n_connections: int, + n_partitions: Optional[int] = 1, + quota_limits_file: Optional[str] = None): + super().__init__(transfer_config, n_instances, n_connections, n_partitions, quota_limits_file) + + def logical_plan(self, src_region: str, dst_regions: List[str], jobs) -> nx.DiGraph: + graph = nx.DiGraph() + graph.add_node(src_region) + for dst_region in dst_regions: + graph.add_node(dst_region) + graph.add_edge(src_region, dst_region, partitions=[jobs[0].uuid]) + + for node in graph.nodes: + graph.nodes[node]["num_vms"] = self.n_instances + + return graph + + def plan(self, jobs: List[TransferJob]) -> TopologyPlan: + src_region_tag, dst_region_tags = self.verify_job_src_dsts(jobs) + # src_provider, src_region = src_region_tag.split(":") + # dst_regions = [dst_region_tag.split(":")[1] for dst_region_tag in dst_region_tags] + solution_graph = self.logical_plan(src_region_tag, dst_region_tags, jobs) + self.solution_graph = solution_graph + return self.logical_plan_to_topology_plan(jobs, solution_graph) + diff --git a/skyplane/planner/solver.py b/skyplane/planner/solver.py index 5a18f1ed4..65e1a740d 100644 --- a/skyplane/planner/solver.py +++ b/skyplane/planner/solver.py @@ -15,6 +15,86 @@ GBIT_PER_GBYTE = 8 +@dataclass +class BroadcastProblem: + src: str + dsts: List[str] + + gbyte_to_transfer: float + instance_limit: int # max # of vms per region + num_partitions: int + + required_time_budget: float = 10 # 
ILP specific, default to 10s + + const_throughput_grid_gbits: Optional[np.ndarray] = None # if not set, load from profiles + const_cost_per_gb_grid: Optional[np.ndarray] = None # if not set, load from profiles + + # provider bandwidth limits (ingress, egress) + aws_instance_throughput_limit: Tuple[float, float] = (10, 5) + gcp_instance_throughput_limit: Tuple[float, float] = (16, 7) # limited to 12.5 gbps due to CPU limit + azure_instance_throughput_limit: Tuple[float, float] = (16, 16) # limited to 12.5 gbps due to CPU limit + + # benchmarked_throughput_connections is the number of connections that the iperf3 throughput grid was run at, + # we assume throughput is linear up to this connection limit + benchmarked_throughput_connections = 64 + cost_per_instance_hr = 0.54 # based on m5.8xlarge spot + instance_cost_multiplier = 1.0 + # instance_provision_time_s = 0.0 + + def to_summary_dict(self): + """Simple summary of the problem""" + return { + "src": self.src, + "dsts": self.dsts, + "gbyte_to_transfer": self.gbyte_to_transfer, + "instance_limit": self.instance_limit, + "num_partitions": self.num_partitions, + "required_time_budget": self.required_time_budget, + "aws_instance_throughput_limit": self.aws_instance_throughput_limit, + "gcp_instance_throughput_limit": self.gcp_instance_throughput_limit, + "azure_instance_throughput_limit": self.azure_instance_throughput_limit, + "benchmarked_throughput_connections": self.benchmarked_throughput_connections, + "cost_per_instance_hr": self.cost_per_instance_hr, + "instance_cost_multiplier": self.instance_cost_multiplier + # "instance_provision_time_s": self.instance_provision_time_s, + } + + +@dataclass +class BroadcastSolution: + problem: BroadcastProblem + is_feasible: bool + extra_data: Optional[Dict] = None + + var_edges: Optional[List] = None # need to fix this, just for testing + var_nodes: Optional[List] = None # need to fix this, just for testing + + # solution variables + var_edge_partitions: Optional[np.ndarray] = None # each edge carries each partition or not + var_node_transfer_partitions: Optional[np.ndarray] = None # whether node transfers partition + var_instances_per_region: Optional[np.ndarray] = None # number of VMs per region + var_flow: Optional[np.ndarray] = None # enforce flow conservation, just used for checking + + # solution values + cost_egress: Optional[float] = None + cost_instance: Optional[float] = None + cost_total: Optional[float] = None + transfer_runtime_s: Optional[float] = None # NOTE: might not be able to calculate here + throughput_achieved_gbits: Optional[List[float]] = None # NOTE: might not be able to calculate here + + def to_summary_dict(self): + """Print simple summary of solution.""" + return { + "is_feasible": self.is_feasible, + "solution": { + "cost_egress": self.cost_egress, + "cost_instance": self.cost_instance, + "cost_total": self.cost_total, + "time_budget": self.problem.required_time_budget, + }, + } + + @dataclass class ThroughputProblem: src: str diff --git a/skyplane/planner/solver_ilp.py b/skyplane/planner/solver_ilp.py index e81dbac98..b159572b5 100644 --- a/skyplane/planner/solver_ilp.py +++ b/skyplane/planner/solver_ilp.py @@ -1,18 +1,23 @@ -import cvxpy as cp # type: ignore - from skyplane.planner.solver import ThroughputSolver, ThroughputProblem, GBIT_PER_GBYTE, ThroughputSolution class ThroughputSolverILP(ThroughputSolver): @staticmethod def choose_solver(): + import cvxpy as cp # type: ignore + installed = cp.installed_solvers() order = ["GUROBI", "CBC", "GLPK_MI"] for package in 
order: if package in installed: return getattr(cp, package) - def solve_min_cost(self, p: ThroughputProblem, solver=cp.GLPK, solver_verbose=False, save_lp_path=None) -> ThroughputSolution: + def solve_min_cost(self, p: ThroughputProblem, solver=None, solver_verbose=False, save_lp_path=None) -> ThroughputSolution: + import cvxpy as cp # type: ignore + + if solver is None: + solver = cp.GLPK # default + regions = self.get_regions() sources = [regions.index(p.src)] sinks = [regions.index(p.dst)] diff --git a/skyplane/planner/topology.py b/skyplane/planner/topology.py index fd5a1898f..e485ce3d3 100644 --- a/skyplane/planner/topology.py +++ b/skyplane/planner/topology.py @@ -63,9 +63,10 @@ class TopologyPlan: The TopologyPlan contains a list of gateway programs corresponding to each gateway in the dataplane. """ - def __init__(self, src_region_tag: str, dest_region_tags: List[str], cost_per_gb: float = 0.0): + def __init__(self, src_region_tag: str, dest_region_tags: List[str], overlay_region_tags: List[str] = [], cost_per_gb: float = 0.0): self.src_region_tag = src_region_tag self.dest_region_tags = dest_region_tags + self.overlay_region_tags = overlay_region_tags self.gateways = {} self.cost_per_gb = cost_per_gb
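The plotting helper at the top of the planner.py hunk relies on networkx and matplotlib from the new solver extra, and the hunk starts after the layout step. A minimal self-contained sketch of that helper, with an assumed function name (save_nx_graph) and the missing pos = layout line filled in:

import matplotlib.pyplot as plt
import networkx as nx

def save_nx_graph(di_graph: nx.DiGraph, save_name: str) -> None:
    # compute a layout once; all draw_* calls below share the same positions
    pos = nx.spring_layout(di_graph)
    nx.draw_networkx_nodes(di_graph, pos, node_size=50)
    nx.draw_networkx_labels(di_graph, pos, font_size=8)
    nx.draw_networkx_edges(di_graph, pos, edgelist=di_graph.edges(), edge_color="blue", arrows=True)
    plt.savefig(save_name)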
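MulticastILPPlanner builds a cvxpy mixed-integer program whose core pattern is: boolean edge-per-partition variables p, an integer VM count v per region, an egress-plus-instance cost objective, and throughput constraints of the form partition_size_gbit * (partitions on an edge) <= time_budget * per-VM limit * VMs. The toy program below illustrates only that pattern on two candidate edges; the numbers are illustrative, and a mixed-integer-capable cvxpy solver (e.g. CBC or GLPK_MI) must be installed.

import cvxpy as cp
import numpy as np

cost = np.array([0.09, 0.12])     # $/GB for edge 0 and edge 1 (illustrative)
partition_gbit = 8.0              # one 1 GB partition
budget_s = 10.0                   # time budget in seconds
egress_limit_gbps = 5.0           # per-VM egress limit

p = cp.Variable(2, boolean=True)  # does edge i carry the partition?
v = cp.Variable(integer=True)     # VMs provisioned at the source region

constraints = [
    cp.sum(p) >= 1,                                                 # the partition must leave the source
    partition_gbit * cp.sum(p) <= budget_s * egress_limit_gbps * v,  # per-VM egress throughput limit
    v >= 0,
    v <= 2,                                                         # instance_limit
]
# objective: egress cost plus instance cost (0.54 $/hr, as in BroadcastProblem)
prob = cp.Problem(cp.Minimize(cost @ p + 0.54 / 3600 * budget_s * v), constraints)
prob.solve()
print(prob.status, p.value, v.value)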
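MulticastMDSTPlanner derives its broadcast tree as a minimum spanning arborescence (Edmonds' algorithm) over the cost-annotated region graph, after removing self-loops and edges into the source. A minimal sketch of the same idea using networkx's top-level helper, with made-up region names and per-GB costs:

import networkx as nx

g = nx.DiGraph()
for s, d, c in [
    ("aws:us-east-1", "gcp:us-central1-a", 0.09),
    ("aws:us-east-1", "azure:eastus", 0.09),
    ("gcp:us-central1-a", "azure:eastus", 0.08),
    ("azure:eastus", "gcp:us-central1-a", 0.08),
]:
    g.add_edge(s, d, cost=c)

source = "aws:us-east-1"
dests = ["gcp:us-central1-a", "azure:eastus"]

h = g.copy()
h.remove_edges_from(list(h.in_edges(source)) + list(nx.selfloop_edges(h)))
tree = nx.minimum_spanning_arborescence(h.subgraph([source] + dests), attr="cost")
print(sorted(tree.edges()))  # cheapest tree reaching every destination from the source

As in the patch's logical_plan, every edge of the resulting tree would then be annotated with all partition ids and every node with the configured number of VMs.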
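Assuming the new BroadcastProblem dataclass in skyplane/planner/solver.py is importable from an installed checkout, a small sketch of how a problem instance could be built and summarized before handing it to the ILP; the regions, sizes, and time budget here are made up:

from skyplane.planner.solver import GBIT_PER_GBYTE, BroadcastProblem

problem = BroadcastProblem(
    src="aws:us-east-1",
    dsts=["gcp:us-central1-a", "azure:eastus"],
    gbyte_to_transfer=100.0,
    instance_limit=2,
    num_partitions=10,
    required_time_budget=300.0,
)
# partition size in gigabits, mirroring the computation inside logical_plan
partition_size_gbit = problem.gbyte_to_transfer / problem.num_partitions * GBIT_PER_GBYTE
print(problem.to_summary_dict())
print("partition size (gbit):", partition_size_gbit)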
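The solver_ilp.py hunk moves the cvxpy import inside the methods so that importing the module no longer requires the optional solver extra, and replaces the solver=cp.GLPK default (which needed cvxpy at import time) with a None sentinel. The same pattern shown standalone, as a sketch with a hypothetical pick_solver helper:

def pick_solver(preferred=None):
    import cvxpy as cp  # deferred so the base install does not need cvxpy

    if preferred is not None:
        return preferred
    installed = cp.installed_solvers()
    for name in ["GUROBI", "CBC", "GLPK_MI"]:
        if name in installed:
            return getattr(cp, name)
    return cp.GLPK  # fall back to the previous default

Deferring the import keeps the package importable without the solver extra while preserving the previous default solver behavior.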
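The topology.py change adds an overlay_region_tags argument to TopologyPlan. A brief usage sketch with made-up region tags; note that the default is a shared mutable list, so callers that later mutate the plan may prefer to always pass an explicit list:

from skyplane.planner.topology import TopologyPlan

plan = TopologyPlan(
    src_region_tag="aws:us-east-1",
    dest_region_tags=["gcp:us-central1-a"],
    overlay_region_tags=["aws:us-west-2"],  # relay region(s) used by an overlay route
)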