Skip to content

Commit 1a7a495

Browse files
unexgecrisidev
andauthored
Python: Event Streaming (#2482)
* Add `RustType.Application` and `PythonType.Application` variants * Use `RustType.Application` for event streaming symbols * Call symbol provider to get `Receiver` type instead of hardcoding * Add Python specific event streaming symbol provider * Add event streaming wrapper generator * Generate correct type information for event streaming types * Add `CapturePokemon` operation to Python Pokemon service * Add `InternalServerError` variant to all event streaming errors * Use `PythonServerCargoDependency` for PyO3Asyncio imports * Return an attribute error instead of panicking in `IntoPy` impls of wrappers * Add `Sync` bound to `new` methods of wrappers * Revert "Add `InternalServerError` variant to all event streaming errors" This reverts commit b610cb2. * Add `PythonServerEventStreamErrorGenerator` to generate Python specific error types for unions * Try to extract error type or inner type from incoming streaming value and ignore the value otherwise for now * Allow missing type-stubs for `aiohttp` * Propogate modelled errors through stream * Raise modelled exceptions rather than sending through stream * Allow senders from Python side to raise modelled exceptions * Update `EventStreamSymbolProviderTest` to use `Application` type instead of `Opaque` type * Fix `ktlint` issues * Group up common variables in `codegenScope` * Document `RustType.Application` * Format `codegen-server-test/python/model/pokemon.smithy` * Use `tokio-stream` crate instead of `futures` crate * Use a better error message if user tries to reuse a stream * Add some details about event streams to example Pokemon service --------- Co-authored-by: Matteo Bigoi <1781140+crisidev@users.noreply.github.com>
1 parent 33b24bb commit 1a7a495

File tree

20 files changed

+972
-36
lines changed

20 files changed

+972
-36
lines changed

codegen-client/src/test/kotlin/software/amazon/smithy/rust/codegen/client/smithy/EventStreamSymbolProviderTest.kt

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import software.amazon.smithy.rust.codegen.client.testutil.testClientRustSetting
1515
import software.amazon.smithy.rust.codegen.core.rustlang.RustType
1616
import software.amazon.smithy.rust.codegen.core.smithy.CodegenTarget
1717
import software.amazon.smithy.rust.codegen.core.smithy.EventStreamSymbolProvider
18+
import software.amazon.smithy.rust.codegen.core.smithy.RuntimeType
1819
import software.amazon.smithy.rust.codegen.core.smithy.SymbolVisitor
1920
import software.amazon.smithy.rust.codegen.core.smithy.rustType
2021
import software.amazon.smithy.rust.codegen.core.smithy.transformers.OperationNormalizer
@@ -60,8 +61,17 @@ class EventStreamSymbolProviderTest {
6061
val inputType = provider.toSymbol(inputStream).rustType()
6162
val outputType = provider.toSymbol(outputStream).rustType()
6263

63-
inputType shouldBe RustType.Opaque("EventStreamSender<crate::types::SomeStream, crate::types::error::SomeStreamError>", "aws_smithy_http::event_stream")
64-
outputType shouldBe RustType.Opaque("Receiver<crate::types::SomeStream, crate::types::error::SomeStreamError>", "aws_smithy_http::event_stream")
64+
val someStream = RustType.Opaque("SomeStream", "crate::types")
65+
val someStreamError = RustType.Opaque("SomeStreamError", "crate::types::error")
66+
67+
inputType shouldBe RustType.Application(
68+
RuntimeType.eventStreamSender(TestRuntimeConfig).toSymbol().rustType(),
69+
listOf(someStream, someStreamError),
70+
)
71+
outputType shouldBe RustType.Application(
72+
RuntimeType.eventStreamReceiver(TestRuntimeConfig).toSymbol().rustType(),
73+
listOf(someStream, someStreamError),
74+
)
6575
}
6676

6777
@Test

codegen-core/src/main/kotlin/software/amazon/smithy/rust/codegen/core/rustlang/RustType.kt

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -172,6 +172,18 @@ sealed class RustType {
172172
}
173173

174174
data class Opaque(override val name: kotlin.String, override val namespace: kotlin.String? = null) : RustType()
175+
176+
/**
177+
* Represents application of a Rust type with the given arguments.
178+
*
179+
* For example, we can represent `HashMap<String, i64>` as
180+
* `RustType.Application(RustType.Opaque("HashMap"), listOf(RustType.String, RustType.Integer(64)))`.
181+
* This helps us to separate the type and the arguments which is useful in methods like [qualifiedName].
182+
*/
183+
data class Application(val type: RustType, val args: List<RustType>) : RustType() {
184+
override val name = type.name
185+
override val namespace = type.namespace
186+
}
175187
}
176188

177189
/**
@@ -242,7 +254,10 @@ fun RustType.render(fullyQualified: Boolean = true): String {
242254
"&${this.lifetime?.let { "'$it" } ?: ""} ${this.member.render(fullyQualified)}"
243255
}
244256
}
245-
257+
is RustType.Application -> {
258+
val args = this.args.joinToString(", ") { it.render(fullyQualified) }
259+
"${this.name}<$args>"
260+
}
246261
is RustType.Option -> "${this.name}<${this.member.render(fullyQualified)}>"
247262
is RustType.Box -> "${this.name}<${this.member.render(fullyQualified)}>"
248263
is RustType.Dyn -> "${this.name} ${this.member.render(fullyQualified)}"

codegen-core/src/main/kotlin/software/amazon/smithy/rust/codegen/core/smithy/EventStreamSymbolProvider.kt

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ import software.amazon.smithy.model.shapes.OperationShape
1111
import software.amazon.smithy.model.shapes.Shape
1212
import software.amazon.smithy.rust.codegen.core.rustlang.CargoDependency
1313
import software.amazon.smithy.rust.codegen.core.rustlang.RustType
14-
import software.amazon.smithy.rust.codegen.core.rustlang.render
1514
import software.amazon.smithy.rust.codegen.core.rustlang.stripOuter
1615
import software.amazon.smithy.rust.codegen.core.smithy.traits.SyntheticInputTrait
1716
import software.amazon.smithy.rust.codegen.core.smithy.traits.SyntheticOutputTrait
@@ -48,15 +47,15 @@ class EventStreamSymbolProvider(
4847
} else {
4948
symbolForEventStreamError(unionShape)
5049
}
51-
val errorFmt = error.rustType().render(fullyQualified = true)
52-
val innerFmt = initial.rustType().stripOuter<RustType.Option>().render(fullyQualified = true)
50+
val errorT = error.rustType()
51+
val innerT = initial.rustType().stripOuter<RustType.Option>()
5352
val isSender = (shape.isInputEventStream(model) && target == CodegenTarget.CLIENT) ||
5453
(shape.isOutputEventStream(model) && target == CodegenTarget.SERVER)
5554
val outer = when (isSender) {
56-
true -> "EventStreamSender<$innerFmt, $errorFmt>"
57-
else -> "Receiver<$innerFmt, $errorFmt>"
55+
true -> RuntimeType.eventStreamSender(runtimeConfig).toSymbol().rustType()
56+
else -> RuntimeType.eventStreamReceiver(runtimeConfig).toSymbol().rustType()
5857
}
59-
val rustType = RustType.Opaque(outer, "aws_smithy_http::event_stream")
58+
val rustType = RustType.Application(outer, listOf(innerT, errorT))
6059
return initial.toBuilder()
6160
.name(rustType.name)
6261
.rustType(rustType)

codegen-core/src/main/kotlin/software/amazon/smithy/rust/codegen/core/smithy/RuntimeType.kt

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -218,6 +218,9 @@ data class RuntimeType(val path: String, val dependency: RustDependency? = null)
218218
val Bool = std.resolve("primitive::bool")
219219
val TryFrom = stdConvert.resolve("TryFrom")
220220
val Vec = std.resolve("vec::Vec")
221+
val Arc = std.resolve("sync::Arc")
222+
val Send = std.resolve("marker::Send")
223+
val Sync = std.resolve("marker::Sync")
221224

222225
// external cargo dependency types
223226
val Bytes = CargoDependency.Bytes.toType().resolve("Bytes")
@@ -277,6 +280,7 @@ data class RuntimeType(val path: String, val dependency: RustDependency? = null)
277280
fun document(runtimeConfig: RuntimeConfig): RuntimeType = smithyTypes(runtimeConfig).resolve("Document")
278281
fun retryErrorKind(runtimeConfig: RuntimeConfig) = smithyTypes(runtimeConfig).resolve("retry::ErrorKind")
279282
fun eventStreamReceiver(runtimeConfig: RuntimeConfig): RuntimeType = smithyHttp(runtimeConfig).resolve("event_stream::Receiver")
283+
fun eventStreamSender(runtimeConfig: RuntimeConfig): RuntimeType = smithyHttp(runtimeConfig).resolve("event_stream::EventStreamSender")
280284
fun errorMetadata(runtimeConfig: RuntimeConfig) = smithyTypes(runtimeConfig).resolve("error::ErrorMetadata")
281285
fun errorMetadataBuilder(runtimeConfig: RuntimeConfig) = smithyTypes(runtimeConfig).resolve("error::metadata::Builder")
282286
fun provideErrorMetadataTrait(runtimeConfig: RuntimeConfig) = smithyTypes(runtimeConfig).resolve("error::metadata::ProvideErrorMetadata")

codegen-core/src/main/kotlin/software/amazon/smithy/rust/codegen/core/smithy/generators/http/HttpBindingGenerator.kt

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import software.amazon.smithy.rust.codegen.core.rustlang.RustType
3030
import software.amazon.smithy.rust.codegen.core.rustlang.RustWriter
3131
import software.amazon.smithy.rust.codegen.core.rustlang.Writable
3232
import software.amazon.smithy.rust.codegen.core.rustlang.asOptional
33+
import software.amazon.smithy.rust.codegen.core.rustlang.qualifiedName
3334
import software.amazon.smithy.rust.codegen.core.rustlang.render
3435
import software.amazon.smithy.rust.codegen.core.rustlang.rust
3536
import software.amazon.smithy.rust.codegen.core.rustlang.rustBlock
@@ -222,7 +223,7 @@ class HttpBindingGenerator(
222223
// Streaming unions are Event Streams and should be handled separately
223224
val target = model.expectShape(binding.member.target)
224225
if (target is UnionShape) {
225-
bindEventStreamOutput(operationShape, target)
226+
bindEventStreamOutput(operationShape, outputT, target)
226227
} else {
227228
deserializeStreamingBody(binding)
228229
}
@@ -243,22 +244,22 @@ class HttpBindingGenerator(
243244
}
244245
}
245246

246-
private fun RustWriter.bindEventStreamOutput(operationShape: OperationShape, targetShape: UnionShape) {
247+
private fun RustWriter.bindEventStreamOutput(operationShape: OperationShape, outputT: Symbol, targetShape: UnionShape) {
247248
val unmarshallerConstructorFn = EventStreamUnmarshallerGenerator(
248249
protocol,
249250
codegenContext,
250251
operationShape,
251252
targetShape,
252253
).render()
254+
val receiver = outputT.rustType().qualifiedName()
253255
rustTemplate(
254256
"""
255257
let unmarshaller = #{unmarshallerConstructorFn}();
256258
let body = std::mem::replace(body, #{SdkBody}::taken());
257-
Ok(#{Receiver}::new(unmarshaller, body))
259+
Ok($receiver::new(unmarshaller, body))
258260
""",
259261
"SdkBody" to RuntimeType.sdkBody(runtimeConfig),
260262
"unmarshallerConstructorFn" to unmarshallerConstructorFn,
261-
"Receiver" to RuntimeType.eventStreamReceiver(runtimeConfig),
262263
)
263264
}
264265

codegen-server-test/python/model/pokemon.smithy

Lines changed: 87 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,18 @@
11
/// TODO(https://github.com/awslabs/smithy-rs/issues/1508)
2-
/// $econcile this model with the main one living inside codegen-server-test/model/pokemon.smithy
2+
/// Reconcile this model with the main one living inside codegen-server-test/model/pokemon.smithy
33
/// once the Python implementation supports Streaming and Union shapes.
44
$version: "1.0"
55

66
namespace com.aws.example.python
77

88
use aws.protocols#restJson1
9+
use com.aws.example#CheckHealth
10+
use com.aws.example#DoNothing
11+
use com.aws.example#GetServerStatistics
912
use com.aws.example#PokemonSpecies
1013
use com.aws.example#Storage
11-
use com.aws.example#GetServerStatistics
12-
use com.aws.example#DoNothing
13-
use com.aws.example#CheckHealth
1414
use smithy.framework#ValidationException
1515

16-
1716
/// The Pokémon Service allows you to retrieve information about Pokémon species.
1817
@title("Pokémon Service")
1918
@restJson1
@@ -23,11 +22,93 @@ service PokemonService {
2322
operations: [
2423
GetServerStatistics
2524
DoNothing
25+
CapturePokemon
2626
CheckHealth
2727
StreamPokemonRadio
28-
],
28+
]
29+
}
30+
31+
/// Capture Pokémons via event streams.
32+
@http(uri: "/capture-pokemon-event/{region}", method: "POST")
33+
operation CapturePokemon {
34+
input: CapturePokemonEventsInput
35+
output: CapturePokemonEventsOutput
36+
errors: [
37+
UnsupportedRegionError
38+
ThrottlingError
39+
ValidationException
40+
]
2941
}
3042

43+
@input
44+
structure CapturePokemonEventsInput {
45+
@httpPayload
46+
events: AttemptCapturingPokemonEvent
47+
@httpLabel
48+
@required
49+
region: String
50+
}
51+
52+
@output
53+
structure CapturePokemonEventsOutput {
54+
@httpPayload
55+
events: CapturePokemonEvents
56+
}
57+
58+
@streaming
59+
union AttemptCapturingPokemonEvent {
60+
event: CapturingEvent
61+
masterball_unsuccessful: MasterBallUnsuccessful
62+
}
63+
64+
structure CapturingEvent {
65+
@eventPayload
66+
payload: CapturingPayload
67+
}
68+
69+
structure CapturingPayload {
70+
name: String
71+
pokeball: String
72+
}
73+
74+
@streaming
75+
union CapturePokemonEvents {
76+
event: CaptureEvent
77+
invalid_pokeball: InvalidPokeballError
78+
throttlingError: ThrottlingError
79+
}
80+
81+
structure CaptureEvent {
82+
@eventHeader
83+
name: String
84+
@eventHeader
85+
captured: Boolean
86+
@eventHeader
87+
shiny: Boolean
88+
@eventPayload
89+
pokedex_update: Blob
90+
}
91+
92+
@error("server")
93+
structure UnsupportedRegionError {
94+
@required
95+
region: String
96+
}
97+
98+
@error("client")
99+
structure InvalidPokeballError {
100+
@required
101+
pokeball: String
102+
}
103+
104+
@error("server")
105+
structure MasterBallUnsuccessful {
106+
message: String
107+
}
108+
109+
@error("client")
110+
structure ThrottlingError {}
111+
31112
/// Fetch the radio song from the database and stream it back as a playable audio.
32113
@readonly
33114
@http(uri: "/radio", method: "GET")
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package software.amazon.smithy.rust.codegen.server.python.smithy
7+
8+
import software.amazon.smithy.codegen.core.Symbol
9+
import software.amazon.smithy.model.shapes.MemberShape
10+
import software.amazon.smithy.model.shapes.OperationShape
11+
import software.amazon.smithy.model.shapes.Shape
12+
import software.amazon.smithy.rust.codegen.core.rustlang.CargoDependency
13+
import software.amazon.smithy.rust.codegen.core.rustlang.RustType
14+
import software.amazon.smithy.rust.codegen.core.rustlang.stripOuter
15+
import software.amazon.smithy.rust.codegen.core.smithy.RuntimeConfig
16+
import software.amazon.smithy.rust.codegen.core.smithy.RuntimeType
17+
import software.amazon.smithy.rust.codegen.core.smithy.RustSymbolProvider
18+
import software.amazon.smithy.rust.codegen.core.smithy.WrappingSymbolProvider
19+
import software.amazon.smithy.rust.codegen.core.smithy.rustType
20+
import software.amazon.smithy.rust.codegen.core.smithy.traits.SyntheticInputTrait
21+
import software.amazon.smithy.rust.codegen.core.smithy.traits.SyntheticOutputTrait
22+
import software.amazon.smithy.rust.codegen.core.smithy.transformers.eventStreamErrors
23+
import software.amazon.smithy.rust.codegen.core.util.getTrait
24+
import software.amazon.smithy.rust.codegen.core.util.isEventStream
25+
import software.amazon.smithy.rust.codegen.core.util.isOutputEventStream
26+
import software.amazon.smithy.rust.codegen.core.util.toPascalCase
27+
28+
/**
29+
* Symbol provider for Python that maps event streaming member shapes to their respective Python wrapper types.
30+
*
31+
* For example given a model:
32+
* ```smithy
33+
* @input
34+
* structure CapturePokemonInput {
35+
* @httpPayload
36+
* events: AttemptCapturingPokemonEvent,
37+
* }
38+
*
39+
* @streaming
40+
* union AttemptCapturingPokemonEvent {
41+
* ...
42+
* }
43+
* ```
44+
* for the member shape `CapturePokemonInput$events` it will return a symbol that points to
45+
* `crate::python_event_stream::CapturePokemonInputEventsReceiver`.
46+
*/
47+
class PythonEventStreamSymbolProvider(
48+
private val runtimeConfig: RuntimeConfig,
49+
base: RustSymbolProvider,
50+
) : WrappingSymbolProvider(base) {
51+
override fun toSymbol(shape: Shape): Symbol {
52+
val initial = super.toSymbol(shape)
53+
54+
// We only want to wrap with Event Stream types when dealing with member shapes
55+
if (shape !is MemberShape || !shape.isEventStream(model)) {
56+
return initial
57+
}
58+
59+
// We can only wrap the type if it's either an input or an output that used in an operation
60+
model.expectShape(shape.container).let { maybeInputOutput ->
61+
val operationId = maybeInputOutput.getTrait<SyntheticInputTrait>()?.operation
62+
?: maybeInputOutput.getTrait<SyntheticOutputTrait>()?.operation
63+
operationId?.let { model.expectShape(it, OperationShape::class.java) }
64+
} ?: return initial
65+
66+
val unionShape = model.expectShape(shape.target).asUnionShape().get()
67+
val error = if (unionShape.eventStreamErrors().isEmpty()) {
68+
RuntimeType.smithyHttp(runtimeConfig).resolve("event_stream::MessageStreamError").toSymbol()
69+
} else {
70+
symbolForEventStreamError(unionShape)
71+
}
72+
val inner = initial.rustType().stripOuter<RustType.Option>()
73+
val innerSymbol = Symbol.builder().name(inner.name).rustType(inner).build()
74+
val containerName = shape.container.name
75+
val memberName = shape.memberName.toPascalCase()
76+
val outer = when (shape.isOutputEventStream(model)) {
77+
true -> "${containerName}${memberName}EventStreamSender"
78+
else -> "${containerName}${memberName}Receiver"
79+
}
80+
val rustType = RustType.Opaque(outer, PythonServerRustModule.PythonEventStream.fullyQualifiedPath())
81+
return Symbol.builder()
82+
.name(rustType.name)
83+
.rustType(rustType)
84+
.addReference(innerSymbol)
85+
.addReference(error)
86+
.addDependency(CargoDependency.smithyHttp(runtimeConfig).withFeature("event-stream"))
87+
.addDependency(PythonServerCargoDependency.TokioStream)
88+
.addDependency(PythonServerCargoDependency.PyO3Asyncio.withFeature("unstable-streams"))
89+
.build()
90+
}
91+
92+
companion object {
93+
data class EventStreamSymbol(val innerT: RustType, val errorT: RustType)
94+
95+
fun parseSymbol(symbol: Symbol): EventStreamSymbol {
96+
check(symbol.references.size >= 2) {
97+
"`PythonEventStreamSymbolProvider` adds inner type and error type as references to resulting symbol"
98+
}
99+
val innerT = symbol.references[0].symbol.rustType()
100+
val errorT = symbol.references[1].symbol.rustType()
101+
return EventStreamSymbol(innerT, errorT)
102+
}
103+
}
104+
}

codegen-server/python/src/main/kotlin/software/amazon/smithy/rust/codegen/server/python/smithy/PythonServerCargoDependency.kt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ object PythonServerCargoDependency {
1818
val PyO3: CargoDependency = CargoDependency("pyo3", CratesIo("0.17"))
1919
val PyO3Asyncio: CargoDependency = CargoDependency("pyo3-asyncio", CratesIo("0.17"), features = setOf("attributes", "tokio-runtime"))
2020
val Tokio: CargoDependency = CargoDependency("tokio", CratesIo("1.20.1"), features = setOf("full"))
21+
val TokioStream: CargoDependency = CargoDependency("tokio-stream", CratesIo("0.1.12"))
2122
val Tracing: CargoDependency = CargoDependency("tracing", CratesIo("0.1"))
2223
val Tower: CargoDependency = CargoDependency("tower", CratesIo("0.4"))
2324
val TowerHttp: CargoDependency = CargoDependency("tower-http", CratesIo("0.3"), features = setOf("trace"))

0 commit comments

Comments
 (0)