Skip to content

Commit 7f285bc

Browse files
committed
fix opaque type
Signed-off-by: xxchan <xxchan22f@gmail.com>
1 parent 34902d5 commit 7f285bc

File tree

5 files changed

+94
-59
lines changed

5 files changed

+94
-59
lines changed

src/connector/src/sink/kafka.rs

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -403,7 +403,16 @@ struct KafkaPayloadWriter<'a> {
403403
config: &'a KafkaConfig,
404404
}
405405

406-
pub type KafkaSinkDeliveryFuture = impl TryFuture<Ok = (), Error = SinkError> + Unpin + 'static;
406+
mod opaque_type {
407+
use super::*;
408+
pub type KafkaSinkDeliveryFuture = impl TryFuture<Ok = (), Error = SinkError> + Unpin + 'static;
409+
410+
pub(super) fn map_delivery_future(future: DeliveryFuture) -> KafkaSinkDeliveryFuture {
411+
future.map(KafkaPayloadWriter::<'static>::map_future_result)
412+
}
413+
}
414+
use opaque_type::map_delivery_future;
415+
pub use opaque_type::KafkaSinkDeliveryFuture;
407416

408417
pub struct KafkaSinkWriter {
409418
formatter: SinkFormatterImpl,
@@ -482,7 +491,7 @@ impl<'w> KafkaPayloadWriter<'w> {
482491
Ok(delivery_future) => {
483492
if self
484493
.add_future
485-
.add_future_may_await(Self::map_delivery_future(delivery_future))
494+
.add_future_may_await(map_delivery_future(delivery_future))
486495
.await?
487496
{
488497
tracing::warn!(
@@ -567,10 +576,6 @@ impl<'w> KafkaPayloadWriter<'w> {
567576
Err(_) => Err(KafkaError::Canceled.into()),
568577
}
569578
}
570-
571-
fn map_delivery_future(future: DeliveryFuture) -> KafkaSinkDeliveryFuture {
572-
future.map(KafkaPayloadWriter::<'static>::map_future_result)
573-
}
574579
}
575580

576581
impl<'a> FormattedSink for KafkaPayloadWriter<'a> {

src/connector/src/sink/kinesis.rs

Lines changed: 30 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -201,8 +201,36 @@ impl KinesisSinkWriter {
201201
}
202202
}
203203

204-
pub type KinesisSinkPayloadWriterDeliveryFuture =
205-
impl TryFuture<Ok = (), Error = SinkError> + Unpin + Send + 'static;
204+
mod opaque_type {
205+
use super::*;
206+
pub type KinesisSinkPayloadWriterDeliveryFuture =
207+
impl TryFuture<Ok = (), Error = SinkError> + Unpin + Send + 'static;
208+
209+
impl KinesisSinkPayloadWriter {
210+
pub(super) fn finish(self) -> KinesisSinkPayloadWriterDeliveryFuture {
211+
async move {
212+
let builder = self.builder.expect("should not be None");
213+
let context_fmt = format!(
214+
"failed to put record to {}",
215+
builder
216+
.get_stream_name()
217+
.as_ref()
218+
.expect("should have set stream name")
219+
);
220+
Retry::spawn(
221+
ExponentialBackoff::from_millis(100).map(jitter).take(3),
222+
|| builder.clone().send(),
223+
)
224+
.await
225+
.with_context(|| context_fmt.clone())
226+
.map_err(SinkError::Kinesis)?;
227+
Ok(())
228+
}
229+
.boxed()
230+
}
231+
}
232+
}
233+
pub use opaque_type::KinesisSinkPayloadWriterDeliveryFuture;
206234

207235
impl KinesisSinkPayloadWriter {
208236
fn put_record(&mut self, key: String, payload: Vec<u8>) {
@@ -216,28 +244,6 @@ impl KinesisSinkPayloadWriter {
216244
),
217245
);
218246
}
219-
220-
fn finish(self) -> KinesisSinkPayloadWriterDeliveryFuture {
221-
async move {
222-
let builder = self.builder.expect("should not be None");
223-
let context_fmt = format!(
224-
"failed to put record to {}",
225-
builder
226-
.get_stream_name()
227-
.as_ref()
228-
.expect("should have set stream name")
229-
);
230-
Retry::spawn(
231-
ExponentialBackoff::from_millis(100).map(jitter).take(3),
232-
|| builder.clone().send(),
233-
)
234-
.await
235-
.with_context(|| context_fmt.clone())
236-
.map_err(SinkError::Kinesis)?;
237-
Ok(())
238-
}
239-
.boxed()
240-
}
241247
}
242248

243249
impl FormattedSink for KinesisSinkPayloadWriter {

src/connector/src/sink/pulsar.rs

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -235,15 +235,20 @@ struct PulsarPayloadWriter<'w> {
235235
add_future: DeliveryFutureManagerAddFuture<'w, PulsarDeliveryFuture>,
236236
}
237237

238-
pub type PulsarDeliveryFuture = impl TryFuture<Ok = (), Error = SinkError> + Unpin + 'static;
239-
240-
fn may_delivery_future(future: SendFuture) -> PulsarDeliveryFuture {
241-
future.map(|result| {
242-
result
243-
.map(|_| ())
244-
.map_err(|e: pulsar::Error| SinkError::Pulsar(anyhow!(e)))
245-
})
238+
mod opaque_type {
239+
use super::*;
240+
pub type PulsarDeliveryFuture = impl TryFuture<Ok = (), Error = SinkError> + Unpin + 'static;
241+
242+
pub(super) fn may_delivery_future(future: SendFuture) -> PulsarDeliveryFuture {
243+
future.map(|result| {
244+
result
245+
.map(|_| ())
246+
.map_err(|e: pulsar::Error| SinkError::Pulsar(anyhow!(e)))
247+
})
248+
}
246249
}
250+
use opaque_type::may_delivery_future;
251+
pub use opaque_type::PulsarDeliveryFuture;
247252

248253
impl PulsarSinkWriter {
249254
pub async fn new(

src/jni_core/src/lib.rs

Lines changed: 16 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -242,25 +242,28 @@ struct JavaClassMethodCache {
242242
utc: OnceLock<GlobalRef>,
243243
}
244244

245-
// TODO: may only return a RowRef
246-
pub type StreamChunkRowIterator<'a> = impl Iterator<Item = (Op, OwnedRow)> + 'a;
245+
mod opaque_type {
246+
use super::*;
247+
// TODO: may only return a RowRef
248+
pub type StreamChunkRowIterator<'a> = impl Iterator<Item = (Op, OwnedRow)> + 'a;
249+
250+
impl<'a> JavaBindingIteratorInner<'a> {
251+
pub(super) fn from_chunk(chunk: &'a StreamChunk) -> JavaBindingIteratorInner<'a> {
252+
JavaBindingIteratorInner::StreamChunk(
253+
chunk
254+
.rows()
255+
.map(|(op, row)| (op.to_protobuf(), row.to_owned_row())),
256+
)
257+
}
258+
}
259+
}
260+
pub use opaque_type::StreamChunkRowIterator;
247261
pub type HummockJavaBindingIterator = BoxStream<'static, anyhow::Result<(Bytes, OwnedRow)>>;
248-
249262
pub enum JavaBindingIteratorInner<'a> {
250263
Hummock(HummockJavaBindingIterator),
251264
StreamChunk(StreamChunkRowIterator<'a>),
252265
}
253266

254-
impl<'a> JavaBindingIteratorInner<'a> {
255-
fn from_chunk(chunk: &'a StreamChunk) -> JavaBindingIteratorInner<'a> {
256-
JavaBindingIteratorInner::StreamChunk(
257-
chunk
258-
.rows()
259-
.map(|(op, row)| (op.to_protobuf(), row.to_owned_row())),
260-
)
261-
}
262-
}
263-
264267
enum RowExtra {
265268
Op(Op),
266269
Key(Bytes),

src/storage/src/store_impl.rs

Lines changed: 24 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -42,9 +42,27 @@ use crate::monitor::{
4242
use crate::opts::StorageOpts;
4343
use crate::StateStore;
4444

45-
pub type HummockStorageType = impl StateStore + AsHummock;
46-
pub type MemoryStateStoreType = impl StateStore + AsHummock;
47-
pub type SledStateStoreType = impl StateStore + AsHummock;
45+
mod opaque_type {
46+
use super::*;
47+
48+
pub type HummockStorageType = impl StateStore + AsHummock;
49+
pub type MemoryStateStoreType = impl StateStore + AsHummock;
50+
pub type SledStateStoreType = impl StateStore + AsHummock;
51+
52+
pub fn in_memory(state_store: MemoryStateStore) -> MemoryStateStoreType {
53+
may_dynamic_dispatch(state_store)
54+
}
55+
56+
pub fn hummock(state_store: HummockStorage) -> HummockStorageType {
57+
may_dynamic_dispatch(may_verify(state_store))
58+
}
59+
60+
pub fn sled(state_store: SledStateStore) -> SledStateStoreType {
61+
may_dynamic_dispatch(state_store)
62+
}
63+
}
64+
use opaque_type::{hummock, in_memory, sled};
65+
pub use opaque_type::{HummockStorageType, MemoryStateStoreType, SledStateStoreType};
4866

4967
/// The type erased [`StateStore`].
5068
#[derive(Clone, EnumAsInner)]
@@ -114,24 +132,22 @@ impl StateStoreImpl {
114132
storage_metrics: Arc<MonitoredStorageMetrics>,
115133
) -> Self {
116134
// The specific type of MemoryStateStoreType in deducted here.
117-
Self::MemoryStateStore(may_dynamic_dispatch(state_store).monitored(storage_metrics))
135+
Self::MemoryStateStore(in_memory(state_store).monitored(storage_metrics))
118136
}
119137

120138
pub fn hummock(
121139
state_store: HummockStorage,
122140
storage_metrics: Arc<MonitoredStorageMetrics>,
123141
) -> Self {
124142
// The specific type of HummockStateStoreType in deducted here.
125-
Self::HummockStateStore(
126-
may_dynamic_dispatch(may_verify(state_store)).monitored(storage_metrics),
127-
)
143+
Self::HummockStateStore(hummock(state_store).monitored(storage_metrics))
128144
}
129145

130146
pub fn sled(
131147
state_store: SledStateStore,
132148
storage_metrics: Arc<MonitoredStorageMetrics>,
133149
) -> Self {
134-
Self::SledStateStore(may_dynamic_dispatch(state_store).monitored(storage_metrics))
150+
Self::SledStateStore(sled(state_store).monitored(storage_metrics))
135151
}
136152

137153
pub fn shared_in_memory_store(storage_metrics: Arc<MonitoredStorageMetrics>) -> Self {

0 commit comments

Comments
 (0)