Skip to content

Commit 47be211

Browse files
authored
Update SDK Core and PyO3 (#31)
1 parent f1aa1c9 commit 47be211

File tree

8 files changed

+390
-388
lines changed

8 files changed

+390
-388
lines changed

temporalio/bridge/Cargo.lock

Lines changed: 274 additions & 358 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

temporalio/bridge/Cargo.toml

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,11 @@ crate-type = ["cdylib"]
99

1010
[dependencies]
1111
log = "0.4"
12+
parking_lot = "0.12"
1213
prost = "0.9"
1314
prost-types = "0.9"
14-
pyo3 = { version = "0.15", features = ["extension-module", "abi3-py37"] }
15-
pyo3-asyncio = { version = "0.15", features = ["tokio-runtime"] }
15+
pyo3 = { version = "0.16", features = ["extension-module", "abi3-py37"] }
16+
pyo3-asyncio = { version = "0.16", features = ["tokio-runtime"] }
1617
temporal-client = { version = "0.1.0", path = "./sdk-core/client" }
1718
temporal-sdk-core = { version = "0.1.0", path = "./sdk-core/core" }
1819
temporal-sdk-core-api = { version = "0.1.0", path = "./sdk-core/core-api" }

temporalio/bridge/sdk-core

Submodule sdk-core updated 104 files

temporalio/bridge/src/client.rs

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
1+
use parking_lot::RwLock;
12
use pyo3::exceptions::{PyException, PyRuntimeError, PyValueError};
23
use pyo3::prelude::*;
34
use pyo3_asyncio::tokio::future_into_py;
45
use std::collections::HashMap;
6+
use std::sync::Arc;
57
use std::time::Duration;
68
use temporal_client::{
79
ClientOptions, ClientOptionsBuilder, ConfiguredClient, RetryClient, RetryConfig, TlsConfig,
@@ -51,12 +53,20 @@ struct ClientRetryConfig {
5153

5254
pub fn connect_client(py: Python, config: ClientConfig) -> PyResult<&PyAny> {
5355
// TODO(cretz): Add metrics_meter?
56+
let headers = if config.static_headers.is_empty() {
57+
None
58+
} else {
59+
Some(Arc::new(RwLock::new(config.static_headers.clone())))
60+
};
5461
let opts: ClientOptions = config.try_into()?;
5562
future_into_py(py, async move {
5663
Ok(ClientRef {
57-
retry_client: opts.connect_no_namespace(None).await.map_err(|err| {
58-
PyRuntimeError::new_err(format!("Failed client connect: {}", err))
59-
})?,
64+
retry_client: opts
65+
.connect_no_namespace(None, headers)
66+
.await
67+
.map_err(|err| {
68+
PyRuntimeError::new_err(format!("Failed client connect: {}", err))
69+
})?,
6070
})
6171
})
6272
}
@@ -96,9 +106,14 @@ impl ClientRef {
96106
"get_search_attributes" => {
97107
rpc_call!(retry_client, retry, get_search_attributes, req)
98108
}
109+
"get_system_info" => rpc_call!(retry_client, retry, get_system_info, req),
99110
"get_workflow_execution_history" => {
100111
rpc_call!(retry_client, retry, get_workflow_execution_history, req)
101112
}
113+
// TODO(cretz): Fix when https://github.com/temporalio/sdk-core/issues/335 fixed
114+
// "get_workflow_execution_history_reverse" => {
115+
// rpc_call!(retry_client, retry, get_workflow_execution_history_reverse, req)
116+
// }
102117
"list_archived_workflow_executions" => {
103118
rpc_call!(retry_client, retry, list_archived_workflow_executions, req)
104119
}
@@ -238,7 +253,6 @@ impl TryFrom<ClientConfig> for ClientOptions {
238253
)
239254
.client_name(opts.client_name)
240255
.client_version(opts.client_version)
241-
.static_headers(opts.static_headers)
242256
.identity(opts.identity)
243257
.worker_binary_id(opts.worker_binary_id)
244258
.retry_config(

temporalio/bridge/src/telemetry.rs

Lines changed: 59 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,39 @@
11
use pyo3::exceptions::{PyRuntimeError, PyValueError};
22
use pyo3::prelude::*;
3+
use std::collections::HashMap;
34
use std::net::SocketAddr;
45
use std::str::FromStr;
5-
use temporal_sdk_core::{telemetry_init, TelemetryOptions, TelemetryOptionsBuilder};
6+
use temporal_sdk_core::{
7+
telemetry_init, Logger, MetricsExporter, OtelCollectorOptions, TelemetryOptions,
8+
TelemetryOptionsBuilder, TraceExporter,
9+
};
610
use url::Url;
711

812
#[pyclass]
913
pub struct TelemetryRef {
1014
// TODO(cretz): This is private
11-
// telemetry: &'static temporal_sdk_core::telemetry::GlobalTelemDat,
15+
// telemetry: &'static temporal_sdk_core::telemetry::GlobalTelemDat,
1216
}
1317

1418
#[derive(FromPyObject)]
1519
pub struct TelemetryConfig {
16-
otel_collector_url: Option<String>,
1720
tracing_filter: Option<String>,
21+
otel_tracing: Option<OtelCollectorConfig>,
22+
log_console: bool,
1823
log_forwarding_level: Option<String>,
19-
prometheus_export_bind_address: Option<String>,
24+
otel_metrics: Option<OtelCollectorConfig>,
25+
prometheus_metrics: Option<PrometheusMetricsConfig>,
26+
}
27+
28+
#[derive(FromPyObject)]
29+
pub struct OtelCollectorConfig {
30+
url: String,
31+
headers: HashMap<String, String>,
32+
}
33+
34+
#[derive(FromPyObject)]
35+
pub struct PrometheusMetricsConfig {
36+
bind_address: String,
2037
}
2138

2239
pub fn init_telemetry(config: TelemetryConfig) -> PyResult<TelemetryRef> {
@@ -34,28 +51,52 @@ impl TryFrom<TelemetryConfig> for TelemetryOptions {
3451

3552
fn try_from(conf: TelemetryConfig) -> PyResult<Self> {
3653
let mut build = TelemetryOptionsBuilder::default();
37-
if let Some(ref v) = conf.otel_collector_url {
38-
build.otel_collector_url(
39-
Url::parse(v)
40-
.map_err(|err| PyValueError::new_err(format!("Invalid OTel URL: {}", err)))?,
41-
);
42-
}
4354
if let Some(v) = conf.tracing_filter {
4455
build.tracing_filter(v);
4556
}
57+
if let Some(v) = conf.otel_tracing {
58+
build.tracing(TraceExporter::Otel(v.try_into()?));
59+
}
4660
if let Some(ref v) = conf.log_forwarding_level {
47-
build.log_forwarding_level(
48-
log::LevelFilter::from_str(v)
49-
.map_err(|err| PyValueError::new_err(format!("Invalid log level: {}", err)))?,
50-
);
61+
if conf.log_console {
62+
return Err(PyValueError::new_err(
63+
"Cannot have log forwarding level and log console",
64+
));
65+
}
66+
build.logging(Logger::Forward(log::LevelFilter::from_str(v).map_err(
67+
|err| PyValueError::new_err(format!("Invalid log level: {}", err)),
68+
)?));
69+
} else if conf.log_console {
70+
build.logging(Logger::Console);
5171
}
52-
if let Some(ref v) = conf.prometheus_export_bind_address {
53-
build.prometheus_export_bind_address(SocketAddr::from_str(v).map_err(|err| {
54-
PyValueError::new_err(format!("Invalid Prometheus address: {}", err))
55-
})?);
72+
if let Some(v) = conf.otel_metrics {
73+
if conf.prometheus_metrics.is_some() {
74+
return Err(PyValueError::new_err(
75+
"Cannot have OTel and Prometheus metrics",
76+
));
77+
}
78+
build.metrics(MetricsExporter::Otel(v.try_into()?));
79+
} else if let Some(v) = conf.prometheus_metrics {
80+
build.metrics(MetricsExporter::Prometheus(
81+
SocketAddr::from_str(&v.bind_address).map_err(|err| {
82+
PyValueError::new_err(format!("Invalid Prometheus address: {}", err))
83+
})?,
84+
));
5685
}
5786
build
5887
.build()
5988
.map_err(|err| PyValueError::new_err(format!("Invalid telemetry config: {}", err)))
6089
}
6190
}
91+
92+
impl TryFrom<OtelCollectorConfig> for OtelCollectorOptions {
93+
type Error = PyErr;
94+
95+
fn try_from(conf: OtelCollectorConfig) -> PyResult<Self> {
96+
Ok(OtelCollectorOptions {
97+
url: Url::parse(&conf.url)
98+
.map_err(|err| PyValueError::new_err(format!("Invalid OTel URL: {}", err)))?,
99+
headers: conf.headers,
100+
})
101+
}
102+
}

temporalio/bridge/telemetry.py

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
"""Telemetry for SDK Core."""
22

3+
from __future__ import annotations
4+
35
import warnings
46
from dataclasses import dataclass
5-
from typing import Optional
7+
from typing import Mapping, Optional
68

79
import temporalio.bridge.temporal_sdk_bridge
810

@@ -11,10 +13,27 @@
1113
class TelemetryConfig:
1214
"""Python representation of the Rust struct for configuring telemetry."""
1315

14-
otel_collector_url: Optional[str] = None
1516
tracing_filter: Optional[str] = None
17+
otel_tracing: Optional[OtelCollectorConfig] = None
18+
log_console: bool = False
1619
log_forwarding_level: Optional[str] = None
17-
prometheus_export_bind_address: Optional[str] = None
20+
otel_metrics: Optional[OtelCollectorConfig] = None
21+
prometheus_metrics: Optional[PrometheusMetricsConfig] = None
22+
23+
24+
@dataclass
25+
class OtelCollectorConfig:
26+
"""Python representation of the Rust struct for configuring OTel."""
27+
28+
url: str
29+
headers: Mapping[str, str]
30+
31+
32+
@dataclass
33+
class PrometheusMetricsConfig:
34+
"""Python representation of the Rust struct for configuring Prometheus."""
35+
36+
bind_address: str
1837

1938

2039
_inited = False

temporalio/workflow_service.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -171,11 +171,22 @@ def __init__(self, config: ConnectConfig) -> None:
171171
wsv1.GetSearchAttributesRequest,
172172
wsv1.GetSearchAttributesResponse,
173173
)
174+
self.get_system_info = self._new_call(
175+
"get_system_info",
176+
wsv1.GetSystemInfoRequest,
177+
wsv1.GetSystemInfoResponse,
178+
)
174179
self.get_workflow_execution_history = self._new_call(
175180
"get_workflow_execution_history",
176181
wsv1.GetWorkflowExecutionHistoryRequest,
177182
wsv1.GetWorkflowExecutionHistoryResponse,
178183
)
184+
# TODO(cretz): Fix when https://github.com/temporalio/sdk-core/issues/335 fixed
185+
# self.get_workflow_execution_history_reverse = self._new_call(
186+
# "get_workflow_execution_history_reverse",
187+
# wsv1.GetWorkflowExecutionHistoryReverseRequest,
188+
# wsv1.GetWorkflowExecutionHistoryReverseResponse,
189+
# )
179190
self.list_archived_workflow_executions = self._new_call(
180191
"list_archived_workflow_executions",
181192
wsv1.ListArchivedWorkflowExecutionsRequest,

tests/test_workflow_service.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,8 @@ def test_all_grpc_calls_present(client: Client):
2727
# Collect gRPC service calls with a fake channel
2828
channel = CallCollectingChannel()
2929
temporalio.api.workflowservice.v1.WorkflowServiceStub(channel)
30-
# TODO(cretz): Remove once get_system_info is in core
31-
del channel.calls["get_system_info"]
30+
# TODO(cretz): Remove once https://github.com/temporalio/sdk-core/issues/335 fixed
31+
del channel.calls["get_workflow_execution_history_reverse"]
3232

3333
assert channel.calls == workflow_service_calls
3434

0 commit comments

Comments
 (0)