From b9535a4663563f19189920ea37195c79f709a7bc Mon Sep 17 00:00:00 2001 From: Phillip Weinberg Date: Mon, 24 Mar 2025 09:56:25 -0400 Subject: [PATCH 1/2] implement interface --- src/bloqade/analog/task/base.py | 52 ++++++++++++++++++++++++++++++++ src/bloqade/analog/task/batch.py | 8 +++-- 2 files changed, 58 insertions(+), 2 deletions(-) diff --git a/src/bloqade/analog/task/base.py b/src/bloqade/analog/task/base.py index a4985e2b0..c351167dc 100644 --- a/src/bloqade/analog/task/base.py +++ b/src/bloqade/analog/task/base.py @@ -1,3 +1,4 @@ +import abc import datetime from typing import Any from numbers import Number @@ -12,11 +13,13 @@ from bloqade.analog.serialize import Serializer from bloqade.analog.visualization import display_report +from bloqade.analog.builder.typing import ParamType from bloqade.analog.submission.ir.parallel import ParallelDecoder from bloqade.analog.submission.ir.task_results import ( QuEraTaskResults, QuEraTaskStatusCode, ) +from bloqade.analog.submission.ir.task_specification import QuEraTaskSpecification @Serializer.register @@ -97,6 +100,55 @@ def _result_exists(self) -> bool: raise NotImplementedError +class CustomRemoteTaskABC(RemoteTask, abc.ABC): + + @property + @abc.abstractmethod + def metadata(self) -> Dict[str, ParamType]: ... + + @property + def task_result_ir(self) -> QuEraTaskResults: + return self.result() + + @task_result_ir.setter + def set_task_result(self, task_result): + self._set_result(task_result) + + @abc.abstractmethod + def _set_result(self, task_result_ir: QuEraTaskResults) -> None: ... + + @property + @abc.abstractmethod + def task_id(self) -> str: + pass + + @property + @abc.abstractmethod + def task_ir(self) -> QuEraTaskSpecification: ... + + @abc.abstractmethod + def result(self) -> QuEraTaskResults: ... + + @abc.abstractmethod + def _result_exists(self) -> bool: ... + + @abc.abstractmethod + def fetch(self) -> None: ... + + def status(self) -> QuEraTaskStatusCode: + if self._result_exists(): + return self.result().task_status + else: + raise RuntimeError("Result does not exist yet.") + + @abc.abstractmethod + def _submit(self): ... + + def submit(self, force: bool = False): + if not self._result_exists() or force: + self._submit() + + class LocalTask(Task): """`Task` to use for local executions for simulation purposes..""" diff --git a/src/bloqade/analog/task/batch.py b/src/bloqade/analog/task/batch.py index d173602f7..42b4c0b0e 100644 --- a/src/bloqade/analog/task/batch.py +++ b/src/bloqade/analog/task/batch.py @@ -17,7 +17,7 @@ from beartype.typing import Any, Dict, List, Union, Optional from bloqade.analog.serialize import Serializer -from bloqade.analog.task.base import Report +from bloqade.analog.task.base import Report, CustomRemoteTaskABC from bloqade.analog.task.quera import QuEraTask from bloqade.analog.task.braket import BraketTask from bloqade.analog.builder.base import Builder @@ -333,7 +333,11 @@ def _deserialize(obj: dict) -> BatchErrors: @Serializer.register class RemoteBatch(Serializable, Filter): source: Builder - tasks: Union[OrderedDict[int, QuEraTask], OrderedDict[int, BraketTask]] + tasks: Union[ + OrderedDict[int, QuEraTask], + OrderedDict[int, BraketTask], + OrderedDict[int, CustomRemoteTaskABC], + ] name: Optional[str] = None class SubmissionException(Exception): From 50a2e45179185900a0cfcf32ac853d84193a8660 Mon Sep 17 00:00:00 2001 From: Phillip Weinberg Date: Mon, 24 Mar 2025 14:22:16 -0400 Subject: [PATCH 2/2] adding custom , routes for CustomSubmission using new . --- src/bloqade/analog/ir/routine/quera.py | 139 ++++++++++++++++++++++++- src/bloqade/analog/task/base.py | 36 +++++-- src/bloqade/analog/task/batch.py | 6 +- 3 files changed, 169 insertions(+), 12 deletions(-) diff --git a/src/bloqade/analog/ir/routine/quera.py b/src/bloqade/analog/ir/routine/quera.py index c6ff82608..5d82aa7f5 100644 --- a/src/bloqade/analog/ir/routine/quera.py +++ b/src/bloqade/analog/ir/routine/quera.py @@ -1,5 +1,6 @@ import json import time +from typing import TypeVar from collections import OrderedDict, namedtuple from beartype import beartype @@ -7,6 +8,7 @@ from beartype.typing import Any, Dict, List, Tuple, Union, Optional, NamedTuple from pydantic.v1.dataclasses import dataclass +from bloqade.analog.task.base import CustomRemoteTaskABC from bloqade.analog.task.batch import RemoteBatch from bloqade.analog.task.quera import QuEraTask from bloqade.analog.builder.typing import LiteralType @@ -49,7 +51,7 @@ def custom(self) -> "CustomSubmissionRoutine": @dataclass(frozen=True, config=__pydantic_dataclass_config__) class CustomSubmissionRoutine(RoutineBase): - def _compile( + def _compile_single( self, shots: int, use_experimental: bool = False, @@ -150,7 +152,7 @@ def submit( ) out = [] - for metadata, task_ir in self._compile(shots, use_experimental, args): + for metadata, task_ir in self._compile_single(shots, use_experimental, args): json_request_body = json_body_template.format( task_ir=task_ir.json(exclude_none=True, exclude_unset=True) ) @@ -161,6 +163,139 @@ def submit( return out + RemoteTaskType = TypeVar("RemoteTaskType", bound=CustomRemoteTaskABC) + + def _compile_custom_batch( + self, + shots: int, + RemoteTask: type[RemoteTaskType], + use_experimental: bool = False, + args: Tuple[LiteralType, ...] = (), + name: Optional[str] = None, + ) -> RemoteBatch: + from bloqade.analog.submission.capabilities import get_capabilities + from bloqade.analog.compiler.passes.hardware import ( + assign_circuit, + analyze_channels, + generate_ahs_code, + generate_quera_ir, + validate_waveforms, + canonicalize_circuit, + ) + + if not issubclass(RemoteTask, CustomRemoteTaskABC): + raise TypeError(f"{RemoteTask} must be a subclass of CustomRemoteTaskABC.") + + circuit, params = self.circuit, self.params + capabilities = get_capabilities(use_experimental) + + tasks = OrderedDict() + + for task_number, batch_params in enumerate(params.batch_assignments(*args)): + assignments = {**batch_params, **params.static_params} + final_circuit, metadata = assign_circuit(circuit, assignments) + + level_couplings = analyze_channels(final_circuit) + final_circuit = canonicalize_circuit(final_circuit, level_couplings) + + validate_waveforms(level_couplings, final_circuit) + ahs_components = generate_ahs_code( + capabilities, level_couplings, final_circuit + ) + + task_ir = generate_quera_ir(ahs_components, shots).discretize(capabilities) + + tasks[task_number] = RemoteTask.from_compile_results( + task_ir, + metadata, + ahs_components.lattice_data.parallel_decoder, + ) + + batch = RemoteBatch(source=self.source, tasks=tasks, name=name) + + return batch + + @beartype + def run_async( + self, + shots: int, + RemoteTask: type[RemoteTaskType], + args: Tuple[LiteralType, ...] = (), + name: Optional[str] = None, + use_experimental: bool = False, + shuffle: bool = False, + **kwargs, + ) -> RemoteBatch: + """ + Compile to a RemoteBatch, which contain + QuEra backend specific tasks, + and run_async through QuEra service. + + Args: + shots (int): number of shots + args (Tuple): additional arguments + name (str): custom name of the batch + shuffle (bool): shuffle the order of jobs + + Return: + RemoteBatch + + """ + batch = self._compile_custom_batch( + shots, RemoteTask, use_experimental, args, name + ) + batch._submit(shuffle, **kwargs) + return batch + + @beartype + def run( + self, + shots: int, + RemoteTask: type[RemoteTaskType], + args: Tuple[LiteralType, ...] = (), + name: Optional[str] = None, + use_experimental: bool = False, + shuffle: bool = False, + **kwargs, + ) -> RemoteBatch: + """Run the custom task and return the result. + + Args: + shots (int): number of shots + RemoteTask (type): type of the remote task, must subclass of CustomRemoteTaskABC + args (Tuple): additional arguments for remaining un + name (str): name of the batch object + shuffle (bool): shuffle the order of jobs + """ + if not callable(getattr(RemoteTask, "pull", None)): + raise TypeError( + f"{RemoteTask} must have a `pull` method for executing `run`." + ) + + batch = self.run_async( + shots, RemoteTask, args, name, use_experimental, shuffle, **kwargs + ) + batch.pull() + return batch + + @beartype + def __call__( + self, + *args: LiteralType, + RemoteTask: type[RemoteTaskType] | None = None, + shots: int = 1, + name: Optional[str] = None, + use_experimental: bool = False, + shuffle: bool = False, + **kwargs, + ) -> RemoteBatch: + if RemoteTask is None: + raise ValueError("RemoteTask must be provided for custom submission.") + + return self.run( + shots, RemoteTask, args, name, use_experimental, shuffle, **kwargs + ) + @dataclass(frozen=True, config=__pydantic_dataclass_config__) class QuEraHardwareRoutine(RoutineBase): diff --git a/src/bloqade/analog/task/base.py b/src/bloqade/analog/task/base.py index c351167dc..aab4cc323 100644 --- a/src/bloqade/analog/task/base.py +++ b/src/bloqade/analog/task/base.py @@ -100,22 +100,44 @@ def _result_exists(self) -> bool: raise NotImplementedError -class CustomRemoteTaskABC(RemoteTask, abc.ABC): +class CustomRemoteTaskABC(abc.ABC): + + @classmethod + @abc.abstractmethod + def from_compile_results( + cls, + task_ir: QuEraTaskSpecification, + metadata: Dict[str, ParamType], + parallel_decoder: Optional[ParallelDecoder], + ): ... + + @property + @abc.abstractmethod + def geometry(self) -> Geometry: ... + + @property + @abc.abstractmethod + def parallel_decoder(self) -> ParallelDecoder: ... @property @abc.abstractmethod def metadata(self) -> Dict[str, ParamType]: ... @property - def task_result_ir(self) -> QuEraTaskResults: - return self.result() + def task_result_ir(self) -> QuEraTaskResults | None: + if not hasattr(self, "_task_result_ir"): + self._task_result_ir = QuEraTaskResults( + task_status=QuEraTaskStatusCode.Unaccepted + ) + + if self._result_exists(): + self._task_result_ir = self.result() + + return self._task_result_ir @task_result_ir.setter def set_task_result(self, task_result): - self._set_result(task_result) - - @abc.abstractmethod - def _set_result(self, task_result_ir: QuEraTaskResults) -> None: ... + self._task_result_ir = task_result @property @abc.abstractmethod diff --git a/src/bloqade/analog/task/batch.py b/src/bloqade/analog/task/batch.py index 42b4c0b0e..d85dcd2d6 100644 --- a/src/bloqade/analog/task/batch.py +++ b/src/bloqade/analog/task/batch.py @@ -446,10 +446,10 @@ def tasks_metric(self) -> pd.DataFrame: # offline, non-blocking tid = [] data = [] - for int, task in self.tasks.items(): - tid.append(int) + for task_num, task in self.tasks.items(): + tid.append(task_num) - dat = [None, None, None] + dat: list[int | str | None] = [None, None, None] dat[0] = task.task_id if task.task_result_ir is not None: dat[1] = task.task_result_ir.task_status.name