Skip to content

Commit 2546871

Browse files
authored
Minor updates (#186)
Fixes #162 Fixes #145 Fixes #141 Fixes #140
1 parent 929dc81 commit 2546871

File tree

10 files changed

+123
-46
lines changed

10 files changed

+123
-46
lines changed

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ build-develop = "python scripts/setup_bridge.py develop"
6565
build-develop-with-release = { cmd = "python scripts/setup_bridge.py develop", env = { TEMPORAL_BUILD_RELEASE = "1" }}
6666
fix-wheel = "python scripts/fix_wheel.py"
6767
format = [{cmd = "black ."}, {cmd = "isort ."}]
68-
gen-docs = "pydoctor"
68+
gen-docs = "python scripts/gen_docs.py"
6969
gen-protos = "python scripts/gen_protos.py"
7070
lint = [
7171
{cmd = "black --check ."},

scripts/_img/favicon.ico

106 KB
Binary file not shown.

scripts/gen_docs.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
import shutil
2+
import subprocess
3+
from pathlib import Path
4+
5+
base_dir = Path(__file__).parent.parent
6+
7+
if __name__ == "__main__":
8+
print("Generating documentation...")
9+
10+
# Run pydoctor
11+
subprocess.check_call("pydoctor")
12+
13+
# Copy favicon
14+
shutil.copyfile(
15+
base_dir / "scripts" / "_img" / "favicon.ico",
16+
base_dir / "build" / "apidocs" / "favicon.ico",
17+
)

temporalio/bridge/src/telemetry.rs

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,8 @@ use std::collections::HashMap;
44
use std::net::SocketAddr;
55
use std::str::FromStr;
66
use temporal_sdk_core::{
7-
telemetry_init, Logger, MetricsExporter, OtelCollectorOptions, TelemetryOptions,
8-
TelemetryOptionsBuilder, TraceExporter,
7+
telemetry_init, Logger, MetricTemporality, MetricsExporter, OtelCollectorOptions,
8+
TelemetryOptions, TelemetryOptionsBuilder, TraceExporter,
99
};
1010
use url::Url;
1111

@@ -23,6 +23,7 @@ pub struct TelemetryConfig {
2323
log_forwarding_level: Option<String>,
2424
otel_metrics: Option<OtelCollectorConfig>,
2525
prometheus_metrics: Option<PrometheusMetricsConfig>,
26+
metric_temporality: String,
2627
}
2728

2829
#[derive(FromPyObject)]
@@ -83,6 +84,19 @@ impl TryFrom<TelemetryConfig> for TelemetryOptions {
8384
})?,
8485
));
8586
}
87+
match conf.metric_temporality.as_str() {
88+
"cumulative" => {
89+
build.metric_temporality(MetricTemporality::Cumulative);
90+
}
91+
"delta" => {
92+
build.metric_temporality(MetricTemporality::Delta);
93+
}
94+
_ => {
95+
return Err(PyValueError::new_err(
96+
"Invalid metric temporality, expected 'cumulative' or 'delta'",
97+
));
98+
}
99+
}
86100
build
87101
.build()
88102
.map_err(|err| PyValueError::new_err(format!("Invalid telemetry config: {}", err)))

temporalio/bridge/telemetry.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@
99
from dataclasses import dataclass
1010
from typing import Mapping, Optional
1111

12+
from typing_extensions import Literal
13+
1214
import temporalio.bridge.temporal_sdk_bridge
1315

1416

@@ -22,6 +24,7 @@ class TelemetryConfig:
2224
log_forwarding_level: Optional[str] = None
2325
otel_metrics: Optional[OtelCollectorConfig] = None
2426
prometheus_metrics: Optional[PrometheusMetricsConfig] = None
27+
metric_temporality: Literal["cumulative", "delta"] = "cumulative"
2528

2629

2730
@dataclass

temporalio/client.py

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1990,6 +1990,20 @@ def status(self) -> Optional[WorkflowExecutionStatus]:
19901990
return self._status
19911991

19921992

1993+
class WorkflowQueryFailedError(temporalio.exceptions.TemporalError):
1994+
"""Error that occurs when a query fails."""
1995+
1996+
def __init__(self, message: str) -> None:
1997+
"""Create workflow query failed error."""
1998+
super().__init__(message)
1999+
self._message = message
2000+
2001+
@property
2002+
def message(self) -> str:
2003+
"""Get query failed message."""
2004+
return self._message
2005+
2006+
19932007
class AsyncActivityCancelledError(temporalio.exceptions.TemporalError):
19942008
"""Error that occurs when async activity attempted heartbeat but was cancelled."""
19952009

@@ -2410,9 +2424,17 @@ async def query_workflow(self, input: QueryWorkflowInput) -> Any:
24102424
)
24112425
if input.headers is not None:
24122426
temporalio.common._apply_headers(input.headers, req.query.header.fields)
2413-
resp = await self._client.workflow_service.query_workflow(
2414-
req, retry=True, metadata=input.rpc_metadata, timeout=input.rpc_timeout
2415-
)
2427+
try:
2428+
resp = await self._client.workflow_service.query_workflow(
2429+
req, retry=True, metadata=input.rpc_metadata, timeout=input.rpc_timeout
2430+
)
2431+
except RPCError as err:
2432+
# If the status is INVALID_ARGUMENT, we can assume it's a query
2433+
# failed error
2434+
if err.status == RPCStatusCode.INVALID_ARGUMENT:
2435+
raise WorkflowQueryFailedError(err.message)
2436+
else:
2437+
raise
24162438
if resp.HasField("query_rejected"):
24172439
raise WorkflowQueryRejectedError(
24182440
WorkflowExecutionStatus(resp.query_rejected.status)

temporalio/service.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -662,7 +662,6 @@ async def __call__(
662662
class _BridgeServiceClient(ServiceClient):
663663
@staticmethod
664664
async def connect(config: ConnectConfig) -> _BridgeServiceClient:
665-
# TODO(cretz): Expose telemetry init config
666665
temporalio.bridge.telemetry.init_telemetry(
667666
temporalio.bridge.telemetry.TelemetryConfig(),
668667
warn_if_already_inited=False,
@@ -754,9 +753,15 @@ class RPCError(temporalio.exceptions.TemporalError):
754753
def __init__(self, message: str, status: RPCStatusCode, details: bytes) -> None:
755754
"""Initialize RPC error."""
756755
super().__init__(message)
756+
self._message = message
757757
self._status = status
758758
self._details = details
759759

760+
@property
761+
def message(self) -> str:
762+
"""Message for the error."""
763+
return self._message
764+
760765
@property
761766
def status(self) -> RPCStatusCode:
762767
"""Status code for the error."""

temporalio/worker/_workflow_instance.py

Lines changed: 42 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -1202,45 +1202,53 @@ async def _run_top_level_workflow_function(self, coro: Awaitable[None]) -> None:
12021202
except _ContinueAsNewError as err:
12031203
logger.debug("Workflow requested continue as new")
12041204
err._apply_command(self._add_command())
1205-
except temporalio.exceptions.FailureError as err:
1205+
1206+
# Note in some Python versions, cancelled error does not extend
1207+
# exception
1208+
# TODO(cretz): Should I fail the task on BaseException too (e.g.
1209+
# KeyboardInterrupt)?
1210+
except (Exception, asyncio.CancelledError) as err:
12061211
logger.debug(
12071212
f"Workflow raised failure with run ID {self._info.run_id}",
12081213
exc_info=True,
12091214
)
1210-
# If a cancel was requested, and the failure is from an activity or
1211-
# child, and its cause was a cancellation, we want to use that cause
1212-
# instead because it means a cancel bubbled up while waiting on an
1213-
# activity or child.
1214-
if (
1215-
self._cancel_requested
1216-
and (
1217-
isinstance(err, temporalio.exceptions.ActivityError)
1218-
or isinstance(err, temporalio.exceptions.ChildWorkflowError)
1219-
)
1220-
and isinstance(err.cause, temporalio.exceptions.CancelledError)
1221-
):
1222-
err = err.cause
12231215

1224-
command = self._add_command()
1225-
command.fail_workflow_execution.failure.SetInParent()
1226-
try:
1227-
self._failure_converter.to_failure(
1228-
err,
1229-
self._payload_converter,
1230-
command.fail_workflow_execution.failure,
1216+
# All asyncio cancelled errors become Temporal cancelled errors
1217+
if isinstance(err, asyncio.CancelledError):
1218+
err = temporalio.exceptions.CancelledError(str(err))
1219+
1220+
# If a cancel was ever requested and this is a cancellation, or an
1221+
# activity/child cancellation, we add a cancel command. Technically
1222+
# this means that a swallowed cancel followed by, say, an activity
1223+
# cancel later on will show the workflow as cancelled. But this is
1224+
# a Temporal limitation in that cancellation is a state not an
1225+
# event.
1226+
if self._cancel_requested and (
1227+
isinstance(err, temporalio.exceptions.CancelledError)
1228+
or (
1229+
(
1230+
isinstance(err, temporalio.exceptions.ActivityError)
1231+
or isinstance(err, temporalio.exceptions.ChildWorkflowError)
1232+
)
1233+
and isinstance(err.cause, temporalio.exceptions.CancelledError)
12311234
)
1232-
except Exception as inner_err:
1233-
raise ValueError("Failed converting workflow exception") from inner_err
1234-
except asyncio.CancelledError as err:
1235-
command = self._add_command()
1236-
command.fail_workflow_execution.failure.SetInParent()
1237-
self._failure_converter.to_failure(
1238-
temporalio.exceptions.CancelledError(str(err)),
1239-
self._payload_converter,
1240-
command.fail_workflow_execution.failure,
1241-
)
1242-
except Exception as err:
1243-
self._current_activation_error = err
1235+
):
1236+
self._add_command().cancel_workflow_execution.SetInParent()
1237+
elif isinstance(err, temporalio.exceptions.FailureError):
1238+
# All other failure errors fail the workflow
1239+
failure = self._add_command().fail_workflow_execution.failure
1240+
failure.SetInParent()
1241+
try:
1242+
self._failure_converter.to_failure(
1243+
err, self._payload_converter, failure
1244+
)
1245+
except Exception as inner_err:
1246+
raise ValueError(
1247+
"Failed converting workflow exception"
1248+
) from inner_err
1249+
else:
1250+
# All other exceptions fail the task
1251+
self._current_activation_error = err
12441252

12451253
async def _signal_external_workflow(
12461254
self,
@@ -1471,7 +1479,7 @@ async def handle_query(self, input: HandleQueryInput) -> Any:
14711479
# an # interceptor could have changed the name
14721480
if not handler:
14731481
raise RuntimeError(
1474-
f"Query handler for {input.query} expected but not found"
1482+
f"Query handler for '{input.query}' expected but not found"
14751483
)
14761484
# Put name first if dynamic
14771485
args = list(input.args) if not dynamic else [input.query] + list(input.args)

tests/test_client.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
WorkflowExecutionStatus,
4141
WorkflowFailureError,
4242
WorkflowHandle,
43+
WorkflowQueryFailedError,
4344
WorkflowQueryRejectedError,
4445
_history_from_json,
4546
)
@@ -246,10 +247,8 @@ async def test_query(client: Client, worker: ExternalWorker):
246247
await handle.result()
247248
assert "some query arg" == await handle.query("some query", "some query arg")
248249
# Try a query not on the workflow
249-
with pytest.raises(RPCError) as err:
250+
with pytest.raises(WorkflowQueryFailedError) as err:
250251
await handle.query("does not exist")
251-
# TODO(cretz): Is this the status we expect all SDKs to report?
252-
assert err.value.status == RPCStatusCode.INVALID_ARGUMENT
253252

254253

255254
async def test_query_rejected(client: Client, worker: ExternalWorker):

tests/worker/test_workflow.py

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,10 @@
4242
Client,
4343
RPCError,
4444
RPCStatusCode,
45+
WorkflowExecutionStatus,
4546
WorkflowFailureError,
4647
WorkflowHandle,
48+
WorkflowQueryFailedError,
4749
)
4850
from temporalio.common import RetryPolicy, SearchAttributes
4951
from temporalio.converter import (
@@ -366,10 +368,16 @@ async def test_workflow_signal_and_query_errors(client: Client):
366368
assert isinstance(err.value.cause, ApplicationError)
367369
assert list(err.value.cause.details) == [123]
368370
# Fail query (no details on query failure)
369-
with pytest.raises(RPCError) as rpc_err:
371+
with pytest.raises(WorkflowQueryFailedError) as rpc_err:
370372
await handle.query(SignalAndQueryErrorsWorkflow.bad_query)
371-
assert rpc_err.value.status is RPCStatusCode.INVALID_ARGUMENT
372373
assert str(rpc_err.value) == "query fail"
374+
# Unrecognized query
375+
with pytest.raises(WorkflowQueryFailedError) as rpc_err:
376+
await handle.query("non-existent query")
377+
assert (
378+
str(rpc_err.value)
379+
== "Query handler for 'non-existent query' expected but not found"
380+
)
373381

374382

375383
@workflow.defn
@@ -724,6 +732,7 @@ async def started() -> bool:
724732
with pytest.raises(WorkflowFailureError) as err:
725733
await handle.result()
726734
assert isinstance(err.value.cause, CancelledError)
735+
assert (await handle.describe()).status == WorkflowExecutionStatus.CANCELED
727736

728737

729738
@workflow.defn

0 commit comments

Comments
 (0)