From 27d0653a41e9b4c605acdb09e6a27f52e1878c9f Mon Sep 17 00:00:00 2001 From: Phillip Weinberg Date: Mon, 14 Apr 2025 16:22:55 -0400 Subject: [PATCH 01/17] Removing uneeded interface --- src/bloqade/analog/task/base.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/src/bloqade/analog/task/base.py b/src/bloqade/analog/task/base.py index aab4cc323..2aed8827c 100644 --- a/src/bloqade/analog/task/base.py +++ b/src/bloqade/analog/task/base.py @@ -115,10 +115,6 @@ def from_compile_results( @abc.abstractmethod def geometry(self) -> Geometry: ... - @property - @abc.abstractmethod - def parallel_decoder(self) -> ParallelDecoder: ... - @property @abc.abstractmethod def metadata(self) -> Dict[str, ParamType]: ... From b9d5db838697951be335571bb1ca042560666de5 Mon Sep 17 00:00:00 2001 From: Phillip Weinberg Date: Mon, 14 Apr 2025 16:23:51 -0400 Subject: [PATCH 02/17] WIP:exlcusive access class --- src/bloqade/analog/task/exclusive.py | 113 +++++++++++++++++++++++++++ 1 file changed, 113 insertions(+) create mode 100644 src/bloqade/analog/task/exclusive.py diff --git a/src/bloqade/analog/task/exclusive.py b/src/bloqade/analog/task/exclusive.py new file mode 100644 index 000000000..da1e55b03 --- /dev/null +++ b/src/bloqade/analog/task/exclusive.py @@ -0,0 +1,113 @@ +import abc +import os +import uuid + +from beartype.typing import Dict + +from bloqade.analog.task.base import CustomRemoteTaskABC +from bloqade.analog.builder.typing import ParamType +from bloqade.analog.submission.ir.parallel import ParallelDecoder +from bloqade.analog.submission.ir.task_specification import QuEraTaskSpecification +from bloqade.analog.submission.ir.task_results import QuEraTaskResults, QuEraTaskStatusCode +from bloqade.analog.task.base import Geometry + + +class HTTPHandlerABC: + @abc.abstractmethod + def submit_task_via_zapier(task_ir: QuEraTaskSpecification, task_id: str): + """Submit a task and add task_id to the task fields for querying later. + + args: + task_ir: The task to be submitted. + task_id: The task id to be added to the task fields. + + returns + response: The response from the Zapier webhook. used for error handling + + """ + ... + + @abc.abstractmethod + def query_task_status(task_id: str): + """Query the task status from the AirTable. + + args: + task_id: The task id to be queried. + + returns + response: The response from the AirTable. used for error handling + + """ + ... + + @abc.abstractmethod + def fetch_results(task_id: str): + """Fetch the task results from the AirTable. + + args: + task_id: The task id to be queried. + + returns + response: The response from the AirTable. used for error handling + + """ + ... + +class HTTPHandler(HTTPHandlerABC): + pass + +class TestHTTPHandler(HTTPHandlerABC): + pass + + + +class ExclusiveRemoteTask(CustomRemoteTaskABC): + def __init__( + self, + task_ir: QuEraTaskSpecification, + metadata: Dict[str, ParamType], + parallel_decoder: ParallelDecoder | None, + http_handler: HTTPHandlerABC| None = None, + ): + if http_handler is None: + http_handler = HTTPHandler() + + self._task_ir = task_ir + self._metadata = metadata + self._parallel_decoder = parallel_decoder + float_sites = list(map(lambda x: (float(x[0]), float(x[1])), task_ir.lattice.sites)) + self._geometry = Geometry( + float_sites, task_ir.lattice.filling, parallel_decoder) + self._task_id = None + self._task_result_ir = None + self.air_table_url = os.environ["AIR_TABLE_URL"] + self.air_table_api_key = os.environ["AIR_TABLE_API_KEY"] + self.zapier_webhook_url = os.environ["ZAPIER_WEBHOOK_URL"] + self.zapier_webhook_key = os.environ["ZAPIER_WEBHOOK_KEY"] + + def pull(self): + """Block execution until the task is completed and fetch the results.""" + raise NotImplementedError + + + @property + def geometry(self): + return self._geometry + + @property + def task_ir(self): + return self._task_ir + + @property + def task_id(self) -> str: + assert isinstance(self._task_id, str), "Task ID is not set" + return self._task_id + + @property + def task_result_ir(self): + return self._task_result_ir + + def _submit(self): + self._task_id = str(uuid.uuid4()) + + From b2eb5171f903883a314567fe9b31f6a7c2691792 Mon Sep 17 00:00:00 2001 From: Phillip Weinberg Date: Mon, 14 Apr 2025 16:28:32 -0400 Subject: [PATCH 03/17] WIP: saving results --- src/bloqade/analog/task/exclusive.py | 59 +++++++++++++++------------- 1 file changed, 31 insertions(+), 28 deletions(-) diff --git a/src/bloqade/analog/task/exclusive.py b/src/bloqade/analog/task/exclusive.py index da1e55b03..42875f665 100644 --- a/src/bloqade/analog/task/exclusive.py +++ b/src/bloqade/analog/task/exclusive.py @@ -1,83 +1,89 @@ -import abc import os +import abc import uuid from beartype.typing import Dict -from bloqade.analog.task.base import CustomRemoteTaskABC +from bloqade.analog.task.base import Geometry, CustomRemoteTaskABC from bloqade.analog.builder.typing import ParamType from bloqade.analog.submission.ir.parallel import ParallelDecoder +from bloqade.analog.submission.ir.task_results import ( + QuEraTaskResults, + QuEraTaskStatusCode, +) from bloqade.analog.submission.ir.task_specification import QuEraTaskSpecification -from bloqade.analog.submission.ir.task_results import QuEraTaskResults, QuEraTaskStatusCode -from bloqade.analog.task.base import Geometry class HTTPHandlerABC: @abc.abstractmethod def submit_task_via_zapier(task_ir: QuEraTaskSpecification, task_id: str): """Submit a task and add task_id to the task fields for querying later. - + args: task_ir: The task to be submitted. task_id: The task id to be added to the task fields. - - returns + + returns response: The response from the Zapier webhook. used for error handling - + """ ... - + @abc.abstractmethod def query_task_status(task_id: str): """Query the task status from the AirTable. - + args: task_id: The task id to be queried. - - returns + + returns response: The response from the AirTable. used for error handling - + """ ... - + @abc.abstractmethod def fetch_results(task_id: str): """Fetch the task results from the AirTable. - + args: task_id: The task id to be queried. - - returns + + returns response: The response from the AirTable. used for error handling - + """ ... + class HTTPHandler(HTTPHandlerABC): pass + class TestHTTPHandler(HTTPHandlerABC): pass - class ExclusiveRemoteTask(CustomRemoteTaskABC): def __init__( self, task_ir: QuEraTaskSpecification, metadata: Dict[str, ParamType], parallel_decoder: ParallelDecoder | None, - http_handler: HTTPHandlerABC| None = None, + http_handler: HTTPHandlerABC | None = None, ): if http_handler is None: http_handler = HTTPHandler() - + self._task_ir = task_ir self._metadata = metadata self._parallel_decoder = parallel_decoder - float_sites = list(map(lambda x: (float(x[0]), float(x[1])), task_ir.lattice.sites)) + float_sites = list( + map(lambda x: (float(x[0]), float(x[1])), task_ir.lattice.sites) + ) self._geometry = Geometry( - float_sites, task_ir.lattice.filling, parallel_decoder) + float_sites, task_ir.lattice.filling, parallel_decoder + ) self._task_id = None self._task_result_ir = None self.air_table_url = os.environ["AIR_TABLE_URL"] @@ -87,13 +93,12 @@ def __init__( def pull(self): """Block execution until the task is completed and fetch the results.""" - raise NotImplementedError - + raise NotImplementedError @property def geometry(self): return self._geometry - + @property def task_ir(self): return self._task_ir @@ -109,5 +114,3 @@ def task_result_ir(self): def _submit(self): self._task_id = str(uuid.uuid4()) - - From 9b9e20be71c9293121326ab9cbe7350836957cb5 Mon Sep 17 00:00:00 2001 From: Jing Chen Date: Thu, 24 Apr 2025 07:14:36 -0400 Subject: [PATCH 04/17] exclusive access v1 --- src/bloqade/analog/task/batch.py | 3 +- src/bloqade/analog/task/exclusive.py | 259 +++++++++++++++++++++++++-- 2 files changed, 249 insertions(+), 13 deletions(-) diff --git a/src/bloqade/analog/task/batch.py b/src/bloqade/analog/task/batch.py index d85dcd2d6..c0bb25589 100644 --- a/src/bloqade/analog/task/batch.py +++ b/src/bloqade/analog/task/batch.py @@ -553,7 +553,8 @@ def _submit( # saving ? save(errors, error_file) - save(self, future_file) + # TODO DEBUG uncomment this line once serialize and unserialize is defined + # save(self, future_file) if ignore_submission_error: warnings.warn( diff --git a/src/bloqade/analog/task/exclusive.py b/src/bloqade/analog/task/exclusive.py index 42875f665..ecb93cb7f 100644 --- a/src/bloqade/analog/task/exclusive.py +++ b/src/bloqade/analog/task/exclusive.py @@ -1,6 +1,8 @@ import os import abc import uuid +import json +import re from beartype.typing import Dict @@ -12,6 +14,7 @@ QuEraTaskStatusCode, ) from bloqade.analog.submission.ir.task_specification import QuEraTaskSpecification +from requests import Response, request, get class HTTPHandlerABC: @@ -53,11 +56,118 @@ def fetch_results(task_id: str): response: The response from the AirTable. used for error handling """ + ... +def convert_preview_to_download(preview_url): + # help function to convert the googledrive preview URL to download URL + # Only used in http handler + match = re.search(r"/d/([^/]+)/", preview_url) + if not match: + raise ValueError("Invalid preview URL format") + file_id = match.group(1) + return f"https://drive.usercontent.google.com/download?id={file_id}&export=download" + + class HTTPHandler(HTTPHandlerABC): - pass + def __init__(self, zapier_webhook_url: str, zapier_webhook_key: str, vercel_api_url: str): + self.zapier_webhook_url = zapier_webhook_url + self.zapier_webhook_key = zapier_webhook_key + self.verrcel_api_url = vercel_api_url + + def submit_task_via_zapier(self, task_ir: QuEraTaskSpecification, task_id: str, task_note: str): + # implement http request logic to submit task via Zapier + request_options = dict( + params={"key": self.zapier_webhook_key, "note": task_id}) + + # for metadata, task_ir in self._compile_single(shots, use_experimental, args): + json_request_body = task_ir.json(exclude_none=True, exclude_unset=True) + + request_options.update(data=json_request_body) + response = request("POST", self.zapier_webhook_url, **request_options) + + if response.status_code == 200: + response_data = response.json() + submit_status = response_data.get("status", None) + return submit_status + else: + print( + f"HTTP request failed with status code: {response.status_code}") + print("HTTP responce: ", response.text) + return "Failed" + + def query_task_status(self, task_id: str): + response = request( + "GET", + self.verrcel_api_url, + params={ + "searchPattern": task_id, + "magicToken": self.zapier_webhook_key, + "useRegex": False, + }, + ) + if response.status_code != 200: + return "Not Found" + response_data = response.json() + # Get "matched" from the response + matches = response_data.get("matches", None) + # The return is a list of dictionaries + # Verify if the list contains only one element + if matches is None: + print("No task found with the given ID.") + return "Failed" + elif len(matches) > 1: + print("Multiple tasks found with the given ID.") + return "Failed" + + # Extract the status from the first dictionary + return matches[0].get("status", None) + + def fetch_results(self, task_id: str): + response = request( + "GET", + self.verrcel_api_url, + params={ + "searchPattern": task_id, + "magicToken": self.zapier_webhook_key, + "useRegex": False, + }, + ) + if response.status_code != 200: + print( + f"HTTP request failed with status code: {response.status_code}") + print("HTTP responce: ", response.text) + return None + + response_data = response.json() + # Get "matched" from the response + matches = response_data.get("matches", None) + # The return is a list of dictionaries + # Verify if the list contains only one element + if matches is None: + print("No task found with the given ID.") + return None + elif len(matches) > 1: + print("Multiple tasks found with the given ID.") + return None + record = matches[0] + print("The record = ", record.get("status")) + if record.get("status") == "Completed": + # test_google_doc = "https://drive.usercontent.google.com/download?id=1hvUlzzpIpl5FIsXDjetGeQQKoYW9LrAn&export=download&authuser=0" + googledoc = record.get("resultsFileUrl") + + # convert the preview URL to download URL + googledoc = convert_preview_to_download( + googledoc) + print("googledoc = ", googledoc) + res = get(googledoc) + res.raise_for_status() + print("res = ", res.text) + data = res.json() + + task_results = QuEraTaskResults(**data) + return task_results class TestHTTPHandler(HTTPHandlerABC): @@ -72,9 +182,12 @@ def __init__( parallel_decoder: ParallelDecoder | None, http_handler: HTTPHandlerABC | None = None, ): - if http_handler is None: - http_handler = HTTPHandler() - + self.zapier_webhook_url = os.environ["ZAPIER_WEBHOOK_URL"] + self.zapier_webhook_key = os.environ["ZAPIER_WEBHOOK_KEY"] + self.vercel_api_url = os.environ["VERCEL_API_URL"] + self._http_handler = http_handler or HTTPHandler(zapier_webhook_url=self.zapier_webhook_url, + zapier_webhook_key=self.zapier_webhook_key, + vercel_api_url=self.vercel_api_url) self._task_ir = task_ir self._metadata = metadata self._parallel_decoder = parallel_decoder @@ -86,14 +199,96 @@ def __init__( ) self._task_id = None self._task_result_ir = None - self.air_table_url = os.environ["AIR_TABLE_URL"] - self.air_table_api_key = os.environ["AIR_TABLE_API_KEY"] - self.zapier_webhook_url = os.environ["ZAPIER_WEBHOOK_URL"] - self.zapier_webhook_key = os.environ["ZAPIER_WEBHOOK_KEY"] + # self.air_table_url = os.environ["AIR_TABLE_URL"] + # self.air_table_api_key = os.environ["AIR_TABLE_API_KEY"] + + @classmethod + def from_compile_results(cls, task_ir, metadata, parallel_decoder): + return cls( + task_ir=task_ir, + metadata=metadata, + parallel_decoder=parallel_decoder, + ) + + def _submit(self, force: bool = False) -> "ExclusiveRemoteTask": + if not force: + if self._task_id is not None: + raise ValueError( + "the task is already submitted with %s" % (self._task_id) + ) + self._task_id = str(uuid.uuid4()) + if self._http_handler.submit_task_via_zapier(self._task_ir, self._task_id, None) == "success": + self._task_result_ir = QuEraTaskResults( + task_status=QuEraTaskStatusCode.Accepted) + else: + self._task_result_ir = QuEraTaskResults( + task_status=QuEraTaskStatusCode.Failed) + print(self.task_result_ir) + return self + + def fetch(self): + if self.task_result_ir.task_status is QuEraTaskStatusCode.Unsubmitted: + raise ValueError("Task ID not found.") + + if self.task_result_ir.task_status in [ + QuEraTaskStatusCode.Completed, + QuEraTaskStatusCode.Partial, + QuEraTaskStatusCode.Failed, + QuEraTaskStatusCode.Unaccepted, + QuEraTaskStatusCode.Cancelled, + ]: + return self + + status = self.status() + print("status = ", status) + if status in [QuEraTaskStatusCode.Completed, QuEraTaskStatusCode.Partial]: + self.task_result_ir = self._http_handler.fetch_results( + self.task_id) + else: + self.task_result_ir = QuEraTaskResults(task_status=status) + + return self def pull(self): - """Block execution until the task is completed and fetch the results.""" - raise NotImplementedError + # Please avoid use this method, it's blocking and the wating time is hours long + pass + + def cancel(self): + pass + + def validate(self) -> str: + pass + + def status(self) -> QuEraTaskStatusCode: + if self._task_id is None: + return QuEraTaskStatusCode.Unsubmitted + print("status: self._task_id = ", self._task_id) + res = self._http_handler.query_task_status(self._task_id) + if res != "Not Found": + return res + elif res == "Failed": + # through an error + raise ValueError("Query task status failed.") + else: + return self.task_result_ir.task_status + + def _result_exists(self): + if self.task_result_ir is None: + return False + else: + if self.task_result_ir.task_status == QuEraTaskStatusCode.Completed: + return True + else: + return False + + def result(self): + if self._task_result_ir is None: + raise ValueError("Task result not found.") + return self._task_result_ir + + @property + def metadata(self): + return self._metadata @property def geometry(self): @@ -112,5 +307,45 @@ def task_id(self) -> str: def task_result_ir(self): return self._task_result_ir - def _submit(self): - self._task_id = str(uuid.uuid4()) + @property + def parallel_decoder(self): + return self._parallel_decoder + + @task_result_ir.setter + def task_result_ir(self, task_result_ir: QuEraTaskResults): + self._task_result_ir = task_result_ir + + +# @ExclusiveRemoteTask.set_serializer +# def _serialze(obj: ExclusiveRemoteTask) -> Dict[str, ParamType]: +# # TODO: Not tested, once it's done, resolve the DEBUG flag +# return { +# "task_id": obj.task_id or None, +# "task_ir": obj.task_ir.dict(by_alias=True, exclude_none=True), +# "metadata": obj.metadata, +# "zapier_webhook_url": obj.zapier_webhook_url, +# "zapier_webhook_key": obj.zapier_webhook_key, +# "vercel_api_url": obj.vercel_api_url, +# "parallel_decoder": ( +# obj.parallel_decoder.dict() or None +# ), +# "geometry": obj.geometry.dict() or None, +# "task_result_ir": obj.task_result_ir.dict() if obj.task_result_ir else None, +# } + + +# @ExclusiveRemoteTask.set_deserializer +# def _deserializer(d: Dict[str, Any]) -> ExclusiveRemoteTask: +# # TODO: Not tested, once it's done, resolve the DEBUG flag +# d["task_ir"] = QuEraTaskSpecification(**d["task_ir"]) +# d["parallel_decoder"] = ( +# ParallelDecoder(**d["parallel_decoder"] +# ) if d["parallel_decoder"] else None +# ) +# d["http_handler"] = HTTPHandler( +# zapier_webhook_url=d["zapier_webhook_url"], +# zapier_webhook_key=d["zapier_webhook_key"], +# vercel_api_url=d["vercel_api_url"], +# ) + +# return ExclusiveRemoteTask(**d) From 590c517d6d570a730390d834a090e80fef69f673 Mon Sep 17 00:00:00 2001 From: Jing Chen Date: Thu, 24 Apr 2025 07:37:37 -0400 Subject: [PATCH 05/17] remove prints for debug --- src/bloqade/analog/task/exclusive.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/src/bloqade/analog/task/exclusive.py b/src/bloqade/analog/task/exclusive.py index ecb93cb7f..d7af38393 100644 --- a/src/bloqade/analog/task/exclusive.py +++ b/src/bloqade/analog/task/exclusive.py @@ -152,7 +152,6 @@ def fetch_results(self, task_id: str): print("Multiple tasks found with the given ID.") return None record = matches[0] - print("The record = ", record.get("status")) if record.get("status") == "Completed": # test_google_doc = "https://drive.usercontent.google.com/download?id=1hvUlzzpIpl5FIsXDjetGeQQKoYW9LrAn&export=download&authuser=0" googledoc = record.get("resultsFileUrl") @@ -160,10 +159,8 @@ def fetch_results(self, task_id: str): # convert the preview URL to download URL googledoc = convert_preview_to_download( googledoc) - print("googledoc = ", googledoc) res = get(googledoc) res.raise_for_status() - print("res = ", res.text) data = res.json() task_results = QuEraTaskResults(**data) @@ -240,7 +237,6 @@ def fetch(self): return self status = self.status() - print("status = ", status) if status in [QuEraTaskStatusCode.Completed, QuEraTaskStatusCode.Partial]: self.task_result_ir = self._http_handler.fetch_results( self.task_id) From b56194be42a4ba7925ce5db3ade4c7068c5df228 Mon Sep 17 00:00:00 2001 From: Jing Chen Date: Thu, 24 Apr 2025 07:46:39 -0400 Subject: [PATCH 06/17] fix: status return code --- src/bloqade/analog/task/exclusive.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/bloqade/analog/task/exclusive.py b/src/bloqade/analog/task/exclusive.py index d7af38393..647be5aa2 100644 --- a/src/bloqade/analog/task/exclusive.py +++ b/src/bloqade/analog/task/exclusive.py @@ -122,7 +122,9 @@ def query_task_status(self, task_id: str): return "Failed" # Extract the status from the first dictionary - return matches[0].get("status", None) + status = matches[0].get("status") + return QuEraTaskStatusCode(status) + def fetch_results(self, task_id: str): response = request( From 7dcb7af1c39d8f759a8db1a7e6c033b331237120 Mon Sep 17 00:00:00 2001 From: Jing Chen Date: Thu, 24 Apr 2025 08:06:43 -0400 Subject: [PATCH 07/17] status code "Submitted" -> Enqueued --- src/bloqade/analog/task/exclusive.py | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/src/bloqade/analog/task/exclusive.py b/src/bloqade/analog/task/exclusive.py index 647be5aa2..2c33dd651 100644 --- a/src/bloqade/analog/task/exclusive.py +++ b/src/bloqade/analog/task/exclusive.py @@ -123,7 +123,7 @@ def query_task_status(self, task_id: str): # Extract the status from the first dictionary status = matches[0].get("status") - return QuEraTaskStatusCode(status) + return status def fetch_results(self, task_id: str): @@ -267,6 +267,14 @@ def status(self) -> QuEraTaskStatusCode: elif res == "Failed": # through an error raise ValueError("Query task status failed.") + elif res == "Submitted": + return QuEraTaskStatusCode.Enqueued + # TODO: please add all possible status + elif res == "Completed": + return QuEraTaskStatusCode.Completed + elif res == "Running": + # Not covered by test + return QuEraTaskStatusCode.Executing else: return self.task_result_ir.task_status From aaa575decd0e01da5ca264a63594b35978282826 Mon Sep 17 00:00:00 2001 From: Jing Chen Date: Mon, 28 Apr 2025 16:26:52 -0400 Subject: [PATCH 08/17] refactor: enable saving and serialization and ExclusiveRemoteTask --- src/bloqade/analog/task/batch.py | 3 +- src/bloqade/analog/task/exclusive.py | 110 +++++++++++++-------------- 2 files changed, 53 insertions(+), 60 deletions(-) diff --git a/src/bloqade/analog/task/batch.py b/src/bloqade/analog/task/batch.py index c0bb25589..d85dcd2d6 100644 --- a/src/bloqade/analog/task/batch.py +++ b/src/bloqade/analog/task/batch.py @@ -553,8 +553,7 @@ def _submit( # saving ? save(errors, error_file) - # TODO DEBUG uncomment this line once serialize and unserialize is defined - # save(self, future_file) + save(self, future_file) if ignore_submission_error: warnings.warn( diff --git a/src/bloqade/analog/task/exclusive.py b/src/bloqade/analog/task/exclusive.py index 2c33dd651..d97516e0a 100644 --- a/src/bloqade/analog/task/exclusive.py +++ b/src/bloqade/analog/task/exclusive.py @@ -15,6 +15,9 @@ ) from bloqade.analog.submission.ir.task_specification import QuEraTaskSpecification from requests import Response, request, get +from bloqade.analog.serialize import Serializer +from bloqade.analog.builder.base import ParamType + class HTTPHandlerABC: @@ -71,10 +74,12 @@ def convert_preview_to_download(preview_url): class HTTPHandler(HTTPHandlerABC): - def __init__(self, zapier_webhook_url: str, zapier_webhook_key: str, vercel_api_url: str): - self.zapier_webhook_url = zapier_webhook_url - self.zapier_webhook_key = zapier_webhook_key - self.verrcel_api_url = vercel_api_url + def __init__(self, zapier_webhook_url: str = None, + zapier_webhook_key: str = None, + vercel_api_url: str = None): + self.zapier_webhook_url = zapier_webhook_url or os.environ["ZAPIER_WEBHOOK_URL"] + self.zapier_webhook_key = zapier_webhook_key or os.environ["ZAPIER_WEBHOOK_KEY"] + self.verrcel_api_url = vercel_api_url or os.environ["VERCEL_API_URL"] def submit_task_via_zapier(self, task_ir: QuEraTaskSpecification, task_id: str, task_note: str): # implement http request logic to submit task via Zapier @@ -155,7 +160,6 @@ def fetch_results(self, task_id: str): return None record = matches[0] if record.get("status") == "Completed": - # test_google_doc = "https://drive.usercontent.google.com/download?id=1hvUlzzpIpl5FIsXDjetGeQQKoYW9LrAn&export=download&authuser=0" googledoc = record.get("resultsFileUrl") # convert the preview URL to download URL @@ -172,21 +176,18 @@ def fetch_results(self, task_id: str): class TestHTTPHandler(HTTPHandlerABC): pass - +@Serializer.register class ExclusiveRemoteTask(CustomRemoteTaskABC): def __init__( self, task_ir: QuEraTaskSpecification, metadata: Dict[str, ParamType], parallel_decoder: ParallelDecoder | None, - http_handler: HTTPHandlerABC | None = None, + http_handler: HTTPHandlerABC | None = HTTPHandler(), + task_id: str = None, + task_result_ir: QuEraTaskResults = None, ): - self.zapier_webhook_url = os.environ["ZAPIER_WEBHOOK_URL"] - self.zapier_webhook_key = os.environ["ZAPIER_WEBHOOK_KEY"] - self.vercel_api_url = os.environ["VERCEL_API_URL"] - self._http_handler = http_handler or HTTPHandler(zapier_webhook_url=self.zapier_webhook_url, - zapier_webhook_key=self.zapier_webhook_key, - vercel_api_url=self.vercel_api_url) + self._http_handler = http_handler self._task_ir = task_ir self._metadata = metadata self._parallel_decoder = parallel_decoder @@ -196,10 +197,8 @@ def __init__( self._geometry = Geometry( float_sites, task_ir.lattice.filling, parallel_decoder ) - self._task_id = None - self._task_result_ir = None - # self.air_table_url = os.environ["AIR_TABLE_URL"] - # self.air_table_api_key = os.environ["AIR_TABLE_API_KEY"] + self._task_id = task_id + self._task_result_ir = task_result_ir @classmethod def from_compile_results(cls, task_ir, metadata, parallel_decoder): @@ -248,20 +247,23 @@ def fetch(self): return self def pull(self): - # Please avoid use this method, it's blocking and the wating time is hours long - pass - + # Please avoid using this method, it's blocking and the waiting time is hours long + # Throw an error saying this is not supported + raise NotImplementedError( + "Pulling is not supported. Please use fetch() instead." + ) + def cancel(self): - pass - - def validate(self) -> str: - pass + # This is not supported + raise NotImplementedError( + "Cancelling is not supported." + ) def status(self) -> QuEraTaskStatusCode: if self._task_id is None: return QuEraTaskStatusCode.Unsubmitted - print("status: self._task_id = ", self._task_id) res = self._http_handler.query_task_status(self._task_id) + print("Query task status: ", res) if res != "Not Found": return res elif res == "Failed": @@ -322,36 +324,28 @@ def task_result_ir(self, task_result_ir: QuEraTaskResults): self._task_result_ir = task_result_ir -# @ExclusiveRemoteTask.set_serializer -# def _serialze(obj: ExclusiveRemoteTask) -> Dict[str, ParamType]: -# # TODO: Not tested, once it's done, resolve the DEBUG flag -# return { -# "task_id": obj.task_id or None, -# "task_ir": obj.task_ir.dict(by_alias=True, exclude_none=True), -# "metadata": obj.metadata, -# "zapier_webhook_url": obj.zapier_webhook_url, -# "zapier_webhook_key": obj.zapier_webhook_key, -# "vercel_api_url": obj.vercel_api_url, -# "parallel_decoder": ( -# obj.parallel_decoder.dict() or None -# ), -# "geometry": obj.geometry.dict() or None, -# "task_result_ir": obj.task_result_ir.dict() if obj.task_result_ir else None, -# } - - -# @ExclusiveRemoteTask.set_deserializer -# def _deserializer(d: Dict[str, Any]) -> ExclusiveRemoteTask: -# # TODO: Not tested, once it's done, resolve the DEBUG flag -# d["task_ir"] = QuEraTaskSpecification(**d["task_ir"]) -# d["parallel_decoder"] = ( -# ParallelDecoder(**d["parallel_decoder"] -# ) if d["parallel_decoder"] else None -# ) -# d["http_handler"] = HTTPHandler( -# zapier_webhook_url=d["zapier_webhook_url"], -# zapier_webhook_key=d["zapier_webhook_key"], -# vercel_api_url=d["vercel_api_url"], -# ) - -# return ExclusiveRemoteTask(**d) +@ExclusiveRemoteTask.set_serializer + +def _serialze(obj: ExclusiveRemoteTask) -> Dict[str, ParamType]: + return { + "task_id": obj.task_id or None, + "task_ir": obj.task_ir.dict(by_alias=True, exclude_none=True), + "metadata": obj.metadata, + "parallel_decoder": ( + obj.parallel_decoder.dict() if obj.parallel_decoder else None + ), + "geometry": obj.geometry, + "task_result_ir": obj.task_result_ir.dict() if obj.task_result_ir else None, + } + + +@ExclusiveRemoteTask.set_deserializer +def _deserializer(d: Dict[str, any]) -> ExclusiveRemoteTask: + # TODO: Not tested, once it's done, resolve the DEBUG flag + d["task_ir"] = QuEraTaskSpecification(**d["task_ir"]) + d["parallel_decoder"] = ( + ParallelDecoder(**d["parallel_decoder"] + ) if d["parallel_decoder"] else None + ) + return ExclusiveRemoteTask(**d) + From a460d238df318545ef34904983ba45cb7a5e74de Mon Sep 17 00:00:00 2001 From: Jing Chen Date: Mon, 28 Apr 2025 16:40:32 -0400 Subject: [PATCH 09/17] refactor: comment out debug print statement in ExclusiveRemoteTask --- src/bloqade/analog/task/exclusive.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/bloqade/analog/task/exclusive.py b/src/bloqade/analog/task/exclusive.py index d97516e0a..5921d18e6 100644 --- a/src/bloqade/analog/task/exclusive.py +++ b/src/bloqade/analog/task/exclusive.py @@ -263,7 +263,7 @@ def status(self) -> QuEraTaskStatusCode: if self._task_id is None: return QuEraTaskStatusCode.Unsubmitted res = self._http_handler.query_task_status(self._task_id) - print("Query task status: ", res) + #print("Query task status: ", res) if res != "Not Found": return res elif res == "Failed": From 74c53fa139da3e55a2533a8dd29ffcc0daac601d Mon Sep 17 00:00:00 2001 From: Jing Chen Date: Tue, 29 Apr 2025 09:45:20 -0400 Subject: [PATCH 10/17] refactor: streamline task status query logic and remove unused geometry field --- src/bloqade/analog/task/exclusive.py | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/src/bloqade/analog/task/exclusive.py b/src/bloqade/analog/task/exclusive.py index 5921d18e6..0acd160fb 100644 --- a/src/bloqade/analog/task/exclusive.py +++ b/src/bloqade/analog/task/exclusive.py @@ -264,10 +264,7 @@ def status(self) -> QuEraTaskStatusCode: return QuEraTaskStatusCode.Unsubmitted res = self._http_handler.query_task_status(self._task_id) #print("Query task status: ", res) - if res != "Not Found": - return res - elif res == "Failed": - # through an error + if res == "Failed": raise ValueError("Query task status failed.") elif res == "Submitted": return QuEraTaskStatusCode.Enqueued @@ -334,7 +331,6 @@ def _serialze(obj: ExclusiveRemoteTask) -> Dict[str, ParamType]: "parallel_decoder": ( obj.parallel_decoder.dict() if obj.parallel_decoder else None ), - "geometry": obj.geometry, "task_result_ir": obj.task_result_ir.dict() if obj.task_result_ir else None, } From 2927372f8bb387fe7cc89a02fafbd042f0f0a05d Mon Sep 17 00:00:00 2001 From: Jing Chen Date: Wed, 30 Apr 2025 17:42:55 -0400 Subject: [PATCH 11/17] use dataclass for ExclusiveCustomRemoteTask --- src/bloqade/analog/task/base.py | 4 ++ src/bloqade/analog/task/exclusive.py | 72 ++++++++++++++-------------- 2 files changed, 41 insertions(+), 35 deletions(-) diff --git a/src/bloqade/analog/task/base.py b/src/bloqade/analog/task/base.py index 2aed8827c..aab4cc323 100644 --- a/src/bloqade/analog/task/base.py +++ b/src/bloqade/analog/task/base.py @@ -115,6 +115,10 @@ def from_compile_results( @abc.abstractmethod def geometry(self) -> Geometry: ... + @property + @abc.abstractmethod + def parallel_decoder(self) -> ParallelDecoder: ... + @property @abc.abstractmethod def metadata(self) -> Dict[str, ParamType]: ... diff --git a/src/bloqade/analog/task/exclusive.py b/src/bloqade/analog/task/exclusive.py index 0acd160fb..bc236d58c 100644 --- a/src/bloqade/analog/task/exclusive.py +++ b/src/bloqade/analog/task/exclusive.py @@ -5,6 +5,7 @@ import re from beartype.typing import Dict +from dataclasses import dataclass, field from bloqade.analog.task.base import Geometry, CustomRemoteTaskABC from bloqade.analog.builder.typing import ParamType @@ -176,36 +177,31 @@ def fetch_results(self, task_id: str): class TestHTTPHandler(HTTPHandlerABC): pass +@dataclass @Serializer.register class ExclusiveRemoteTask(CustomRemoteTaskABC): - def __init__( - self, - task_ir: QuEraTaskSpecification, - metadata: Dict[str, ParamType], - parallel_decoder: ParallelDecoder | None, - http_handler: HTTPHandlerABC | None = HTTPHandler(), - task_id: str = None, - task_result_ir: QuEraTaskResults = None, - ): - self._http_handler = http_handler - self._task_ir = task_ir - self._metadata = metadata - self._parallel_decoder = parallel_decoder + _task_ir: QuEraTaskSpecification | None + _metadata: Dict[str, ParamType] + _parallel_decoder: ParallelDecoder | None + _http_handler: HTTPHandlerABC = field(default_factory=HTTPHandler) + _task_id: str | None = None + _task_result_ir: QuEraTaskResults | None = None + + def __post_init__(self): float_sites = list( - map(lambda x: (float(x[0]), float(x[1])), task_ir.lattice.sites) + map(lambda x: (float(x[0]), float(x[1])), self._task_ir.lattice.sites) ) self._geometry = Geometry( - float_sites, task_ir.lattice.filling, parallel_decoder + float_sites, self._task_ir.lattice.filling, self._parallel_decoder ) - self._task_id = task_id - self._task_result_ir = task_result_ir + @classmethod def from_compile_results(cls, task_ir, metadata, parallel_decoder): return cls( - task_ir=task_ir, - metadata=metadata, - parallel_decoder=parallel_decoder, + _task_ir=task_ir, + _metadata=metadata, + _parallel_decoder=parallel_decoder, ) def _submit(self, force: bool = False) -> "ExclusiveRemoteTask": @@ -215,20 +211,20 @@ def _submit(self, force: bool = False) -> "ExclusiveRemoteTask": "the task is already submitted with %s" % (self._task_id) ) self._task_id = str(uuid.uuid4()) + if self._http_handler.submit_task_via_zapier(self._task_ir, self._task_id, None) == "success": self._task_result_ir = QuEraTaskResults( task_status=QuEraTaskStatusCode.Accepted) else: self._task_result_ir = QuEraTaskResults( task_status=QuEraTaskStatusCode.Failed) - print(self.task_result_ir) return self def fetch(self): - if self.task_result_ir.task_status is QuEraTaskStatusCode.Unsubmitted: + if self._task_result_ir.task_status is QuEraTaskStatusCode.Unsubmitted: raise ValueError("Task ID not found.") - if self.task_result_ir.task_status in [ + if self._task_result_ir.task_status in [ QuEraTaskStatusCode.Completed, QuEraTaskStatusCode.Partial, QuEraTaskStatusCode.Failed, @@ -239,10 +235,10 @@ def fetch(self): status = self.status() if status in [QuEraTaskStatusCode.Completed, QuEraTaskStatusCode.Partial]: - self.task_result_ir = self._http_handler.fetch_results( - self.task_id) + self._task_result_ir = self._http_handler.fetch_results( + self._task_id) else: - self.task_result_ir = QuEraTaskResults(task_status=status) + self._task_result_ir = QuEraTaskResults(task_status=status) return self @@ -275,13 +271,13 @@ def status(self) -> QuEraTaskStatusCode: # Not covered by test return QuEraTaskStatusCode.Executing else: - return self.task_result_ir.task_status + return self._task_result_ir.task_status def _result_exists(self): - if self.task_result_ir is None: + if self._task_result_ir is None: return False else: - if self.task_result_ir.task_status == QuEraTaskStatusCode.Completed: + if self._task_result_ir.task_status == QuEraTaskStatusCode.Completed: return True else: return False @@ -322,7 +318,6 @@ def task_result_ir(self, task_result_ir: QuEraTaskResults): @ExclusiveRemoteTask.set_serializer - def _serialze(obj: ExclusiveRemoteTask) -> Dict[str, ParamType]: return { "task_id": obj.task_id or None, @@ -337,11 +332,18 @@ def _serialze(obj: ExclusiveRemoteTask) -> Dict[str, ParamType]: @ExclusiveRemoteTask.set_deserializer def _deserializer(d: Dict[str, any]) -> ExclusiveRemoteTask: - # TODO: Not tested, once it's done, resolve the DEBUG flag - d["task_ir"] = QuEraTaskSpecification(**d["task_ir"]) - d["parallel_decoder"] = ( + d1 = dict() + d1["_task_ir"] = QuEraTaskSpecification(**d["task_ir"]) + d1["_parallel_decoder"] = ( ParallelDecoder(**d["parallel_decoder"] - ) if d["parallel_decoder"] else None + ) if d["parallel_decoder"] else None ) + d1["_metadata"] = d["metadata"] + d1["_task_result_ir"] = ( + QuEraTaskResults(**d["task_result_ir"]) + if d["task_result_ir"] + else None ) - return ExclusiveRemoteTask(**d) + d1["_task_id"] = d["task_id"] + + return ExclusiveRemoteTask(**d1) From 1870b89897c2519e8d5f853937925c860afcf5d0 Mon Sep 17 00:00:00 2001 From: Jing Chen Date: Wed, 30 Apr 2025 17:48:07 -0400 Subject: [PATCH 12/17] refactor: use black to clean up whitespace and improve code formatting in HTTPHandler and ExclusiveRemoteTask --- src/bloqade/analog/task/exclusive.py | 63 ++++++++++++++-------------- 1 file changed, 31 insertions(+), 32 deletions(-) diff --git a/src/bloqade/analog/task/exclusive.py b/src/bloqade/analog/task/exclusive.py index bc236d58c..85eb5a817 100644 --- a/src/bloqade/analog/task/exclusive.py +++ b/src/bloqade/analog/task/exclusive.py @@ -20,7 +20,6 @@ from bloqade.analog.builder.base import ParamType - class HTTPHandlerABC: @abc.abstractmethod def submit_task_via_zapier(task_ir: QuEraTaskSpecification, task_id: str): @@ -75,17 +74,21 @@ def convert_preview_to_download(preview_url): class HTTPHandler(HTTPHandlerABC): - def __init__(self, zapier_webhook_url: str = None, - zapier_webhook_key: str = None, - vercel_api_url: str = None): + def __init__( + self, + zapier_webhook_url: str = None, + zapier_webhook_key: str = None, + vercel_api_url: str = None, + ): self.zapier_webhook_url = zapier_webhook_url or os.environ["ZAPIER_WEBHOOK_URL"] self.zapier_webhook_key = zapier_webhook_key or os.environ["ZAPIER_WEBHOOK_KEY"] self.verrcel_api_url = vercel_api_url or os.environ["VERCEL_API_URL"] - def submit_task_via_zapier(self, task_ir: QuEraTaskSpecification, task_id: str, task_note: str): + def submit_task_via_zapier( + self, task_ir: QuEraTaskSpecification, task_id: str, task_note: str + ): # implement http request logic to submit task via Zapier - request_options = dict( - params={"key": self.zapier_webhook_key, "note": task_id}) + request_options = dict(params={"key": self.zapier_webhook_key, "note": task_id}) # for metadata, task_ir in self._compile_single(shots, use_experimental, args): json_request_body = task_ir.json(exclude_none=True, exclude_unset=True) @@ -98,8 +101,7 @@ def submit_task_via_zapier(self, task_ir: QuEraTaskSpecification, task_id: str, submit_status = response_data.get("status", None) return submit_status else: - print( - f"HTTP request failed with status code: {response.status_code}") + print(f"HTTP request failed with status code: {response.status_code}") print("HTTP responce: ", response.text) return "Failed" @@ -131,7 +133,6 @@ def query_task_status(self, task_id: str): status = matches[0].get("status") return status - def fetch_results(self, task_id: str): response = request( "GET", @@ -143,8 +144,7 @@ def fetch_results(self, task_id: str): }, ) if response.status_code != 200: - print( - f"HTTP request failed with status code: {response.status_code}") + print(f"HTTP request failed with status code: {response.status_code}") print("HTTP responce: ", response.text) return None @@ -164,8 +164,7 @@ def fetch_results(self, task_id: str): googledoc = record.get("resultsFileUrl") # convert the preview URL to download URL - googledoc = convert_preview_to_download( - googledoc) + googledoc = convert_preview_to_download(googledoc) res = get(googledoc) res.raise_for_status() data = res.json() @@ -177,6 +176,7 @@ def fetch_results(self, task_id: str): class TestHTTPHandler(HTTPHandlerABC): pass + @dataclass @Serializer.register class ExclusiveRemoteTask(CustomRemoteTaskABC): @@ -186,7 +186,7 @@ class ExclusiveRemoteTask(CustomRemoteTaskABC): _http_handler: HTTPHandlerABC = field(default_factory=HTTPHandler) _task_id: str | None = None _task_result_ir: QuEraTaskResults | None = None - + def __post_init__(self): float_sites = list( map(lambda x: (float(x[0]), float(x[1])), self._task_ir.lattice.sites) @@ -195,7 +195,6 @@ def __post_init__(self): float_sites, self._task_ir.lattice.filling, self._parallel_decoder ) - @classmethod def from_compile_results(cls, task_ir, metadata, parallel_decoder): return cls( @@ -212,12 +211,19 @@ def _submit(self, force: bool = False) -> "ExclusiveRemoteTask": ) self._task_id = str(uuid.uuid4()) - if self._http_handler.submit_task_via_zapier(self._task_ir, self._task_id, None) == "success": + if ( + self._http_handler.submit_task_via_zapier( + self._task_ir, self._task_id, None + ) + == "success" + ): self._task_result_ir = QuEraTaskResults( - task_status=QuEraTaskStatusCode.Accepted) + task_status=QuEraTaskStatusCode.Accepted + ) else: self._task_result_ir = QuEraTaskResults( - task_status=QuEraTaskStatusCode.Failed) + task_status=QuEraTaskStatusCode.Failed + ) return self def fetch(self): @@ -235,8 +241,7 @@ def fetch(self): status = self.status() if status in [QuEraTaskStatusCode.Completed, QuEraTaskStatusCode.Partial]: - self._task_result_ir = self._http_handler.fetch_results( - self._task_id) + self._task_result_ir = self._http_handler.fetch_results(self._task_id) else: self._task_result_ir = QuEraTaskResults(task_status=status) @@ -248,18 +253,15 @@ def pull(self): raise NotImplementedError( "Pulling is not supported. Please use fetch() instead." ) - + def cancel(self): # This is not supported - raise NotImplementedError( - "Cancelling is not supported." - ) + raise NotImplementedError("Cancelling is not supported.") def status(self) -> QuEraTaskStatusCode: if self._task_id is None: return QuEraTaskStatusCode.Unsubmitted res = self._http_handler.query_task_status(self._task_id) - #print("Query task status: ", res) if res == "Failed": raise ValueError("Query task status failed.") elif res == "Submitted": @@ -335,15 +337,12 @@ def _deserializer(d: Dict[str, any]) -> ExclusiveRemoteTask: d1 = dict() d1["_task_ir"] = QuEraTaskSpecification(**d["task_ir"]) d1["_parallel_decoder"] = ( - ParallelDecoder(**d["parallel_decoder"] - ) if d["parallel_decoder"] else None ) + ParallelDecoder(**d["parallel_decoder"]) if d["parallel_decoder"] else None + ) d1["_metadata"] = d["metadata"] d1["_task_result_ir"] = ( - QuEraTaskResults(**d["task_result_ir"]) - if d["task_result_ir"] - else None + QuEraTaskResults(**d["task_result_ir"]) if d["task_result_ir"] else None ) d1["_task_id"] = d["task_id"] return ExclusiveRemoteTask(**d1) - From 2f98ecf6a3269d83740907c2e900e9e30b93889e Mon Sep 17 00:00:00 2001 From: Jing Chen Date: Wed, 30 Apr 2025 18:01:57 -0400 Subject: [PATCH 13/17] clean up duplicated imports --- src/bloqade/analog/task/exclusive.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/src/bloqade/analog/task/exclusive.py b/src/bloqade/analog/task/exclusive.py index 85eb5a817..2f7a9254c 100644 --- a/src/bloqade/analog/task/exclusive.py +++ b/src/bloqade/analog/task/exclusive.py @@ -1,7 +1,6 @@ import os import abc import uuid -import json import re from beartype.typing import Dict @@ -15,10 +14,8 @@ QuEraTaskStatusCode, ) from bloqade.analog.submission.ir.task_specification import QuEraTaskSpecification -from requests import Response, request, get +from requests import request, get from bloqade.analog.serialize import Serializer -from bloqade.analog.builder.base import ParamType - class HTTPHandlerABC: @abc.abstractmethod From 06b090c0b7ca68b9de008a819a3883493fff0ef0 Mon Sep 17 00:00:00 2001 From: Jing Chen Date: Wed, 30 Apr 2025 18:06:53 -0400 Subject: [PATCH 14/17] fix: add missing newline for improved code readability --- src/bloqade/analog/task/exclusive.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/bloqade/analog/task/exclusive.py b/src/bloqade/analog/task/exclusive.py index 2f7a9254c..91783f8d5 100644 --- a/src/bloqade/analog/task/exclusive.py +++ b/src/bloqade/analog/task/exclusive.py @@ -17,6 +17,7 @@ from requests import request, get from bloqade.analog.serialize import Serializer + class HTTPHandlerABC: @abc.abstractmethod def submit_task_via_zapier(task_ir: QuEraTaskSpecification, task_id: str): From 0a0f9116088e3adce9b086c12a2939169833f86e Mon Sep 17 00:00:00 2001 From: Jing Chen Date: Mon, 16 Jun 2025 11:38:15 -0400 Subject: [PATCH 15/17] merge --- src/bloqade/analog/task/exclusive.py | 15 --------------- 1 file changed, 15 deletions(-) diff --git a/src/bloqade/analog/task/exclusive.py b/src/bloqade/analog/task/exclusive.py index 271b29de8..7bcdadee4 100644 --- a/src/bloqade/analog/task/exclusive.py +++ b/src/bloqade/analog/task/exclusive.py @@ -1,13 +1,4 @@ import os -<<<<<<< HEAD -import abc -import uuid -import re - -from beartype.typing import Dict -from dataclasses import dataclass, field - -======= import re import abc import time @@ -18,7 +9,6 @@ from beartype.typing import Dict from bloqade.analog.serialize import Serializer ->>>>>>> main from bloqade.analog.task.base import Geometry, CustomRemoteTaskABC from bloqade.analog.builder.typing import ParamType from bloqade.analog.submission.ir.parallel import ParallelDecoder @@ -27,11 +17,6 @@ QuEraTaskStatusCode, ) from bloqade.analog.submission.ir.task_specification import QuEraTaskSpecification -<<<<<<< HEAD -from requests import request, get -from bloqade.analog.serialize import Serializer -======= ->>>>>>> main class HTTPHandlerABC: From d4c2a23b0d461000fcaca4cb6f198d71a5083d5c Mon Sep 17 00:00:00 2001 From: Jing Chen Date: Sun, 22 Jun 2025 13:42:47 -0400 Subject: [PATCH 16/17] add validation error message --- src/bloqade/analog/task/exclusive.py | 31 ++++++++++++++++++++++------ 1 file changed, 25 insertions(+), 6 deletions(-) diff --git a/src/bloqade/analog/task/exclusive.py b/src/bloqade/analog/task/exclusive.py index 7bcdadee4..2df017777 100644 --- a/src/bloqade/analog/task/exclusive.py +++ b/src/bloqade/analog/task/exclusive.py @@ -102,7 +102,7 @@ def submit_task_via_zapier( else: print(f"HTTP request failed with status code: {response.status_code}") print("HTTP responce: ", response.text) - return "Failed" + return "HTTP Request Failed" def query_task_status(self, task_id: str): response = request( @@ -115,7 +115,7 @@ def query_task_status(self, task_id: str): }, ) if response.status_code != 200: - return "Not Found" + return "HTTP Request Failed." response_data = response.json() # Get "matched" from the response matches = response_data.get("matches", None) @@ -123,13 +123,29 @@ def query_task_status(self, task_id: str): # Verify if the list contains only one element if matches is None: print("No task found with the given ID.") - return "Failed" + return "Task searching Failed" elif len(matches) > 1: print("Multiple tasks found with the given ID.") - return "Failed" + return "Task searching Failed" + + record = matches[0] # Extract the status from the first dictionary - status = matches[0].get("status") + status = record.get("status") + + if status == "Failed validation": + googledoc = record.get("resultsFileUrl") + + # convert the preview URL to download URL + googledoc = convert_preview_to_download(googledoc) + res = get(googledoc) + res.raise_for_status() + data = res.json() + # get the "statusCode" and "message" from the data and print them out. + status_code = data.get("statusCode", "NA") + message = data.get("message", "NA") + print(f"Task validation failed with status code: {status_code}, message: {message}") + return status def fetch_results(self, task_id: str): @@ -283,7 +299,10 @@ def status(self) -> QuEraTaskStatusCode: return QuEraTaskStatusCode.Unsubmitted res = self._http_handler.query_task_status(self._task_id) if res == "Failed": - raise ValueError("Query task status failed.") + return QuEraTaskStatusCode.Failed + elif res == "Failed validation": + + return QuEraTaskStatusCode.Failed elif res == "Submitted": return QuEraTaskStatusCode.Enqueued # TODO: please add all possible status From 1fc7e94adb8057393b22a8508ee788510b56c435 Mon Sep 17 00:00:00 2001 From: Jing Chen Date: Sun, 22 Jun 2025 14:55:37 -0400 Subject: [PATCH 17/17] black reformat --- src/bloqade/analog/task/exclusive.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/src/bloqade/analog/task/exclusive.py b/src/bloqade/analog/task/exclusive.py index 2df017777..3ff9d06f5 100644 --- a/src/bloqade/analog/task/exclusive.py +++ b/src/bloqade/analog/task/exclusive.py @@ -141,10 +141,12 @@ def query_task_status(self, task_id: str): res = get(googledoc) res.raise_for_status() data = res.json() - # get the "statusCode" and "message" from the data and print them out. + # get the "statusCode" and "message" from the data and print them out. status_code = data.get("statusCode", "NA") message = data.get("message", "NA") - print(f"Task validation failed with status code: {status_code}, message: {message}") + print( + f"Task validation failed with status code: {status_code}, message: {message}" + ) return status @@ -301,7 +303,7 @@ def status(self) -> QuEraTaskStatusCode: if res == "Failed": return QuEraTaskStatusCode.Failed elif res == "Failed validation": - + return QuEraTaskStatusCode.Failed elif res == "Submitted": return QuEraTaskStatusCode.Enqueued