How to implement __aiter__ ? #5261
-
How to implement #[pyclass]
struct Events {
inner: Pin<Box<dyn Stream<Item = PyObject> + Send + Sync + Unpin + 'static>>,
} |
Beta Was this translation helpful? Give feedback.
Replies: 3 comments 4 replies
-
I think something like this: pub fn __aiter__(slf: PyRef<'_, Self>) -> PyResult<PyRef<'_, Self>> {
Ok(slf)
} |
Beta Was this translation helpful? Give feedback.
-
Sorry for the badly phrased question. The question should have been: #[pyclass]
struct Events {
inner: Pin<Box<dyn Stream<Item = PyObject> + Send + Sync + Unpin + 'static>>,
} I did it like this:
// Python-visible class (`#[pyclass]`) that wraps a stream of `PyObject` items.
#[pyclass]
struct Events {
// Pinned, boxed, type-erased stream placed behind `Arc<RwLock<..>>`.
// NOTE(review): `RwLock` is presumably the async `tokio::sync::RwLock`, since
// `next_event` awaits `write()` on it; confirm against the file's imports.
inner: Arc<RwLock<Pin<Box<dyn Stream<Item = PyObject> + Send + Sync + Unpin + 'static>>>>,
}
// Cloning copies only the `Arc` handle, so the clone refers to the same
// lock-guarded stream as the original rather than to an independent copy.
impl Clone for Events {
    fn clone(&self) -> Self {
        let inner = Arc::clone(&self.inner);
        Self { inner }
    }
}
/// Returns the Python object produced by `future_into_py` for a future that
/// pulls the next item from `events`.
///
/// The future resolves to the next stream item, or to `None` once the stream
/// has no more items (`StreamExt::next` yields `Option`).
#[pyfunction]
#[pyo3(signature=(events))]
fn next_event<'p>(py: Python<'p>, events: &'p Events) -> PyResult<Bound<'p, PyAny>> {
    // Take an owned handle so the future does not borrow from the argument.
    let handle = events.clone();
    let pending = async move {
        // The write guard is held across the await so only one caller polls
        // the stream at a time.
        let mut stream = handle.inner.write().await;
        let item = stream.next().await;
        Ok(item)
    };
    pyo3_async_runtimes::tokio::future_into_py(py, pending)
}
import the_bindings
class EventStream:
    """Async-iterator adapter around an events handle from ``the_bindings``.

    Each ``__anext__`` call awaits ``the_bindings.next_event`` on the wrapped
    handle; a ``None`` result is treated as the end of the stream.
    """

    def __init__(self, events):
        # Handle that is passed to ``the_bindings.next_event`` on every step.
        self._events = events

    def __aiter__(self):
        # The object is its own async iterator.
        return self

    async def __anext__(self):
        """Return the next event, raising ``StopAsyncIteration`` on ``None``."""
        event = await the_bindings.next_event(self._events)
        if event is None:
            raise StopAsyncIteration
        return event
# Somehow get the `events` from `the_bindings` library
event_stream = EventStream(events)
async for event in event_stream:
print(f"Received event: {event}") Is there a better way to do it? |
Beta Was this translation helpful? Give feedback.
-
I don't know whether this is what you want to achieve, but I think this example shows it: use std::pin::Pin;
use std::sync::Arc;
use futures::Stream;
use futures::stream::StreamExt;
use pyo3::exceptions::PyStopAsyncIteration;
use pyo3::prelude::*;
use pyo3_async_runtimes::tokio::future_into_py;
use tokio::sync::RwLock;
// Python-visible class (`#[pyclass]`) that wraps a stream of `PyObject` items.
// `#[derive(Clone)]` clones the `Arc` field, so a clone refers to the same
// lock-guarded stream as the original.
#[pyclass]
#[derive(Clone)]
pub struct Events {
// Pinned, boxed, type-erased stream behind `Arc<RwLock<..>>`.
inner: Arc<RwLock<Pin<Box<dyn Stream<Item = PyObject> + Send + Sync + Unpin>>>>,
}
#[pymethods]
impl Events {
#[new]
pub fn new(py:Python, events:Vec<Py<PyAny>>) -> PyResult<Self> {
let py_iter = events.iter();
let mut items = vec![];
for item in py_iter {
items.push(item.into_pyobject(py));
}
let stream = futures::stream::iter(events);
Ok(Self {
inner: Arc::new(RwLock::new(Box::pin(stream))),
})
}
fn __aiter__(slf: PyRef<'_, Self>) -> PyResult<PyRef<'_, Self>> {
Ok(slf)
}
fn __anext__<'p>(&self, py: Python<'p>) -> PyResult<Bound<'p, PyAny>> {
let events = self.clone();
let fut = future_into_py(py, async move {
let mut lock = events.inner.write().await;
match lock.next().await {
Some(item) => Ok(item),
None => Err(PyStopAsyncIteration::new_err("Stream exhausted")),
}
})?;
Ok(fut)
}
}
On the Python side it looks like this: class Events:
def __init__(self,events: List[Any]) -> None: ...
def __aiter__(self) -> 'Events': ...
async def __anext__(self): ...
|
Beta Was this translation helpful? Give feedback.
I don't know whether this is what you want to achieve, but I think this example shows it