Skip to content

Custom Remote Task Interface #1022

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Apr 3, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
139 changes: 137 additions & 2 deletions src/bloqade/analog/ir/routine/quera.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
import json
import time
from typing import TypeVar
from collections import OrderedDict, namedtuple

from beartype import beartype
from requests import Response, request
from beartype.typing import Any, Dict, List, Tuple, Union, Optional, NamedTuple
from pydantic.v1.dataclasses import dataclass

from bloqade.analog.task.base import CustomRemoteTaskABC
from bloqade.analog.task.batch import RemoteBatch
from bloqade.analog.task.quera import QuEraTask
from bloqade.analog.builder.typing import LiteralType
Expand Down Expand Up @@ -49,7 +51,7 @@

@dataclass(frozen=True, config=__pydantic_dataclass_config__)
class CustomSubmissionRoutine(RoutineBase):
def _compile(
def _compile_single(
self,
shots: int,
use_experimental: bool = False,
Expand Down Expand Up @@ -150,7 +152,7 @@
)

out = []
for metadata, task_ir in self._compile(shots, use_experimental, args):
for metadata, task_ir in self._compile_single(shots, use_experimental, args):
json_request_body = json_body_template.format(
task_ir=task_ir.json(exclude_none=True, exclude_unset=True)
)
Expand All @@ -161,6 +163,139 @@

return out

RemoteTaskType = TypeVar("RemoteTaskType", bound=CustomRemoteTaskABC)

def _compile_custom_batch(
self,
shots: int,
RemoteTask: type[RemoteTaskType],
use_experimental: bool = False,
args: Tuple[LiteralType, ...] = (),
name: Optional[str] = None,
) -> RemoteBatch:
from bloqade.analog.submission.capabilities import get_capabilities
from bloqade.analog.compiler.passes.hardware import (

Check warning on line 177 in src/bloqade/analog/ir/routine/quera.py

View check run for this annotation

Codecov / codecov/patch

src/bloqade/analog/ir/routine/quera.py#L176-L177

Added lines #L176 - L177 were not covered by tests
assign_circuit,
analyze_channels,
generate_ahs_code,
generate_quera_ir,
validate_waveforms,
canonicalize_circuit,
)

if not issubclass(RemoteTask, CustomRemoteTaskABC):
raise TypeError(f"{RemoteTask} must be a subclass of CustomRemoteTaskABC.")

Check warning on line 187 in src/bloqade/analog/ir/routine/quera.py

View check run for this annotation

Codecov / codecov/patch

src/bloqade/analog/ir/routine/quera.py#L186-L187

Added lines #L186 - L187 were not covered by tests

circuit, params = self.circuit, self.params
capabilities = get_capabilities(use_experimental)

Check warning on line 190 in src/bloqade/analog/ir/routine/quera.py

View check run for this annotation

Codecov / codecov/patch

src/bloqade/analog/ir/routine/quera.py#L189-L190

Added lines #L189 - L190 were not covered by tests

tasks = OrderedDict()

Check warning on line 192 in src/bloqade/analog/ir/routine/quera.py

View check run for this annotation

Codecov / codecov/patch

src/bloqade/analog/ir/routine/quera.py#L192

Added line #L192 was not covered by tests

for task_number, batch_params in enumerate(params.batch_assignments(*args)):
assignments = {**batch_params, **params.static_params}
final_circuit, metadata = assign_circuit(circuit, assignments)

Check warning on line 196 in src/bloqade/analog/ir/routine/quera.py

View check run for this annotation

Codecov / codecov/patch

src/bloqade/analog/ir/routine/quera.py#L194-L196

Added lines #L194 - L196 were not covered by tests

level_couplings = analyze_channels(final_circuit)
final_circuit = canonicalize_circuit(final_circuit, level_couplings)

Check warning on line 199 in src/bloqade/analog/ir/routine/quera.py

View check run for this annotation

Codecov / codecov/patch

src/bloqade/analog/ir/routine/quera.py#L198-L199

Added lines #L198 - L199 were not covered by tests

validate_waveforms(level_couplings, final_circuit)
ahs_components = generate_ahs_code(

Check warning on line 202 in src/bloqade/analog/ir/routine/quera.py

View check run for this annotation

Codecov / codecov/patch

src/bloqade/analog/ir/routine/quera.py#L201-L202

Added lines #L201 - L202 were not covered by tests
capabilities, level_couplings, final_circuit
)

task_ir = generate_quera_ir(ahs_components, shots).discretize(capabilities)

Check warning on line 206 in src/bloqade/analog/ir/routine/quera.py

View check run for this annotation

Codecov / codecov/patch

src/bloqade/analog/ir/routine/quera.py#L206

Added line #L206 was not covered by tests

tasks[task_number] = RemoteTask.from_compile_results(

Check warning on line 208 in src/bloqade/analog/ir/routine/quera.py

View check run for this annotation

Codecov / codecov/patch

src/bloqade/analog/ir/routine/quera.py#L208

Added line #L208 was not covered by tests
task_ir,
metadata,
ahs_components.lattice_data.parallel_decoder,
)

batch = RemoteBatch(source=self.source, tasks=tasks, name=name)

Check warning on line 214 in src/bloqade/analog/ir/routine/quera.py

View check run for this annotation

Codecov / codecov/patch

src/bloqade/analog/ir/routine/quera.py#L214

Added line #L214 was not covered by tests

return batch

Check warning on line 216 in src/bloqade/analog/ir/routine/quera.py

View check run for this annotation

Codecov / codecov/patch

src/bloqade/analog/ir/routine/quera.py#L216

Added line #L216 was not covered by tests

@beartype
def run_async(
self,
shots: int,
RemoteTask: type[RemoteTaskType],
args: Tuple[LiteralType, ...] = (),
name: Optional[str] = None,
use_experimental: bool = False,
shuffle: bool = False,
**kwargs,
) -> RemoteBatch:
"""
Compile to a RemoteBatch, which contain
QuEra backend specific tasks,
and run_async through QuEra service.

Args:
shots (int): number of shots
args (Tuple): additional arguments
name (str): custom name of the batch
shuffle (bool): shuffle the order of jobs

Return:
RemoteBatch

"""
batch = self._compile_custom_batch(

Check warning on line 244 in src/bloqade/analog/ir/routine/quera.py

View check run for this annotation

Codecov / codecov/patch

src/bloqade/analog/ir/routine/quera.py#L244

Added line #L244 was not covered by tests
shots, RemoteTask, use_experimental, args, name
)
batch._submit(shuffle, **kwargs)
return batch

Check warning on line 248 in src/bloqade/analog/ir/routine/quera.py

View check run for this annotation

Codecov / codecov/patch

src/bloqade/analog/ir/routine/quera.py#L247-L248

Added lines #L247 - L248 were not covered by tests

@beartype
def run(
self,
shots: int,
RemoteTask: type[RemoteTaskType],
args: Tuple[LiteralType, ...] = (),
name: Optional[str] = None,
use_experimental: bool = False,
shuffle: bool = False,
**kwargs,
) -> RemoteBatch:
"""Run the custom task and return the result.

Args:
shots (int): number of shots
RemoteTask (type): type of the remote task, must subclass of CustomRemoteTaskABC
args (Tuple): additional arguments for remaining un
name (str): name of the batch object
shuffle (bool): shuffle the order of jobs
"""
if not callable(getattr(RemoteTask, "pull", None)):
raise TypeError(

Check warning on line 271 in src/bloqade/analog/ir/routine/quera.py

View check run for this annotation

Codecov / codecov/patch

src/bloqade/analog/ir/routine/quera.py#L270-L271

Added lines #L270 - L271 were not covered by tests
f"{RemoteTask} must have a `pull` method for executing `run`."
)

batch = self.run_async(

Check warning on line 275 in src/bloqade/analog/ir/routine/quera.py

View check run for this annotation

Codecov / codecov/patch

src/bloqade/analog/ir/routine/quera.py#L275

Added line #L275 was not covered by tests
shots, RemoteTask, args, name, use_experimental, shuffle, **kwargs
)
batch.pull()
return batch

Check warning on line 279 in src/bloqade/analog/ir/routine/quera.py

View check run for this annotation

Codecov / codecov/patch

src/bloqade/analog/ir/routine/quera.py#L278-L279

Added lines #L278 - L279 were not covered by tests

@beartype
def __call__(
self,
*args: LiteralType,
RemoteTask: type[RemoteTaskType] | None = None,
shots: int = 1,
name: Optional[str] = None,
use_experimental: bool = False,
shuffle: bool = False,
**kwargs,
) -> RemoteBatch:
if RemoteTask is None:
raise ValueError("RemoteTask must be provided for custom submission.")

Check warning on line 293 in src/bloqade/analog/ir/routine/quera.py

View check run for this annotation

Codecov / codecov/patch

src/bloqade/analog/ir/routine/quera.py#L292-L293

Added lines #L292 - L293 were not covered by tests

return self.run(

Check warning on line 295 in src/bloqade/analog/ir/routine/quera.py

View check run for this annotation

Codecov / codecov/patch

src/bloqade/analog/ir/routine/quera.py#L295

Added line #L295 was not covered by tests
shots, RemoteTask, args, name, use_experimental, shuffle, **kwargs
)


@dataclass(frozen=True, config=__pydantic_dataclass_config__)
class QuEraHardwareRoutine(RoutineBase):
Expand Down
74 changes: 74 additions & 0 deletions src/bloqade/analog/task/base.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import abc
import datetime
from typing import Any
from numbers import Number
Expand All @@ -12,11 +13,13 @@

from bloqade.analog.serialize import Serializer
from bloqade.analog.visualization import display_report
from bloqade.analog.builder.typing import ParamType
from bloqade.analog.submission.ir.parallel import ParallelDecoder
from bloqade.analog.submission.ir.task_results import (
QuEraTaskResults,
QuEraTaskStatusCode,
)
from bloqade.analog.submission.ir.task_specification import QuEraTaskSpecification


@Serializer.register
Expand Down Expand Up @@ -97,6 +100,77 @@
raise NotImplementedError


class CustomRemoteTaskABC(abc.ABC):

@classmethod
@abc.abstractmethod
def from_compile_results(
cls,
task_ir: QuEraTaskSpecification,
metadata: Dict[str, ParamType],
parallel_decoder: Optional[ParallelDecoder],
): ...

@property
@abc.abstractmethod
def geometry(self) -> Geometry: ...

@property
@abc.abstractmethod
def parallel_decoder(self) -> ParallelDecoder: ...

@property
@abc.abstractmethod
def metadata(self) -> Dict[str, ParamType]: ...

@property
def task_result_ir(self) -> QuEraTaskResults | None:
if not hasattr(self, "_task_result_ir"):
self._task_result_ir = QuEraTaskResults(

Check warning on line 129 in src/bloqade/analog/task/base.py

View check run for this annotation

Codecov / codecov/patch

src/bloqade/analog/task/base.py#L128-L129

Added lines #L128 - L129 were not covered by tests
task_status=QuEraTaskStatusCode.Unaccepted
)

if self._result_exists():
self._task_result_ir = self.result()

Check warning on line 134 in src/bloqade/analog/task/base.py

View check run for this annotation

Codecov / codecov/patch

src/bloqade/analog/task/base.py#L133-L134

Added lines #L133 - L134 were not covered by tests

return self._task_result_ir

Check warning on line 136 in src/bloqade/analog/task/base.py

View check run for this annotation

Codecov / codecov/patch

src/bloqade/analog/task/base.py#L136

Added line #L136 was not covered by tests

@task_result_ir.setter
def set_task_result(self, task_result):
self._task_result_ir = task_result

Check warning on line 140 in src/bloqade/analog/task/base.py

View check run for this annotation

Codecov / codecov/patch

src/bloqade/analog/task/base.py#L140

Added line #L140 was not covered by tests

@property
@abc.abstractmethod
def task_id(self) -> str:
pass

Check warning on line 145 in src/bloqade/analog/task/base.py

View check run for this annotation

Codecov / codecov/patch

src/bloqade/analog/task/base.py#L145

Added line #L145 was not covered by tests

@property
@abc.abstractmethod
def task_ir(self) -> QuEraTaskSpecification: ...

@abc.abstractmethod
def result(self) -> QuEraTaskResults: ...

@abc.abstractmethod
def _result_exists(self) -> bool: ...

@abc.abstractmethod
def fetch(self) -> None: ...

def status(self) -> QuEraTaskStatusCode:
if self._result_exists():
return self.result().task_status

Check warning on line 162 in src/bloqade/analog/task/base.py

View check run for this annotation

Codecov / codecov/patch

src/bloqade/analog/task/base.py#L161-L162

Added lines #L161 - L162 were not covered by tests
else:
raise RuntimeError("Result does not exist yet.")

Check warning on line 164 in src/bloqade/analog/task/base.py

View check run for this annotation

Codecov / codecov/patch

src/bloqade/analog/task/base.py#L164

Added line #L164 was not covered by tests

@abc.abstractmethod
def _submit(self): ...

def submit(self, force: bool = False):
if not self._result_exists() or force:
self._submit()

Check warning on line 171 in src/bloqade/analog/task/base.py

View check run for this annotation

Codecov / codecov/patch

src/bloqade/analog/task/base.py#L170-L171

Added lines #L170 - L171 were not covered by tests


class LocalTask(Task):
"""`Task` to use for local executions for simulation purposes.."""

Expand Down
14 changes: 9 additions & 5 deletions src/bloqade/analog/task/batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
from beartype.typing import Any, Dict, List, Union, Optional

from bloqade.analog.serialize import Serializer
from bloqade.analog.task.base import Report
from bloqade.analog.task.base import Report, CustomRemoteTaskABC
from bloqade.analog.task.quera import QuEraTask
from bloqade.analog.task.braket import BraketTask
from bloqade.analog.builder.base import Builder
Expand Down Expand Up @@ -333,7 +333,11 @@ def _deserialize(obj: dict) -> BatchErrors:
@Serializer.register
class RemoteBatch(Serializable, Filter):
source: Builder
tasks: Union[OrderedDict[int, QuEraTask], OrderedDict[int, BraketTask]]
tasks: Union[
OrderedDict[int, QuEraTask],
OrderedDict[int, BraketTask],
OrderedDict[int, CustomRemoteTaskABC],
]
name: Optional[str] = None

class SubmissionException(Exception):
Expand Down Expand Up @@ -442,10 +446,10 @@ def tasks_metric(self) -> pd.DataFrame:
# offline, non-blocking
tid = []
data = []
for int, task in self.tasks.items():
tid.append(int)
for task_num, task in self.tasks.items():
tid.append(task_num)

dat = [None, None, None]
dat: list[int | str | None] = [None, None, None]
dat[0] = task.task_id
if task.task_result_ir is not None:
dat[1] = task.task_result_ir.task_status.name
Expand Down