Skip to content

Commit 2ea6ecc

Browse files
authored
Merge pull request #73 from nrc/box-futures
Refactor everything
2 parents 56bddc2 + fb55c45 commit 2ea6ecc

File tree

21 files changed

+607
-1127
lines changed

21 files changed

+607
-1127
lines changed

examples/raw.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
// Copyright 2018 TiKV Project Authors. Licensed under Apache-2.0.
22

33
#![feature(async_await, await_macro)]
4+
#![type_length_limit = "3081103"]
45

56
mod common;
67

@@ -84,8 +85,8 @@ async fn main() -> Result<()> {
8485
let start = "k1";
8586
let end = "k2";
8687
let pairs = client
88+
.with_key_only(true)
8789
.scan(start..=end, 10)
88-
.key_only()
8990
.await
9091
.expect("Could not scan");
9192

src/compat.rs

Lines changed: 78 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
66
use futures::prelude::*;
77
use futures::task::{Context, Poll};
8-
use futures::try_ready;
8+
use futures::{ready, try_ready};
99
use std::pin::Pin;
1010

1111
/// The status of a `loop_fn` loop.
@@ -62,6 +62,83 @@ where
6262
}
6363
}
6464

65+
pub(crate) fn stream_fn<S, T, A, F, E>(initial_state: S, mut func: F) -> LoopFn<A, F>
66+
where
67+
F: FnMut(S) -> A,
68+
A: Future<Output = Result<Option<(S, T)>, E>>,
69+
{
70+
LoopFn {
71+
future: func(initial_state),
72+
func,
73+
}
74+
}
75+
76+
impl<S, T, A, F, E> Stream for LoopFn<A, F>
77+
where
78+
F: FnMut(S) -> A,
79+
A: Future<Output = Result<Option<(S, T)>, E>>,
80+
{
81+
type Item = Result<T, E>;
82+
fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
83+
unsafe {
84+
let this = Pin::get_unchecked_mut(self);
85+
match ready!(Pin::new_unchecked(&mut this.future).poll(cx)) {
86+
Err(e) => Poll::Ready(Some(Err(e))),
87+
Ok(None) => Poll::Ready(None),
88+
Ok(Some((s, t))) => {
89+
this.future = (this.func)(s);
90+
Poll::Ready(Some(Ok(t)))
91+
}
92+
}
93+
}
94+
}
95+
}
96+
97+
/// A future created by the `ok_and_then` method.
98+
#[derive(Debug)]
99+
#[must_use = "futures do nothing unless polled"]
100+
pub(crate) struct OkAndThen<A, F> {
101+
future: A,
102+
func: F,
103+
}
104+
105+
impl<U, T, A, F, E> Future for OkAndThen<A, F>
106+
where
107+
F: FnMut(U) -> Result<T, E>,
108+
A: Future<Output = Result<U, E>>,
109+
{
110+
type Output = Result<T, E>;
111+
112+
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<T, E>> {
113+
unsafe {
114+
let this = Pin::get_unchecked_mut(self);
115+
let result = try_ready!(Pin::new_unchecked(&mut this.future).poll(cx));
116+
Poll::Ready((this.func)(result))
117+
}
118+
}
119+
}
120+
121+
/// An extension crate to make using our combinator functions more ergonomic.
122+
pub(crate) trait ClientFutureExt {
123+
/// This function is similar to `map_ok` combinator. Provide a function which
124+
/// is applied after the `self` future is resolved, only if that future
125+
/// resolves to `Ok`. Similar to `Result::and_then`, the supplied function
126+
/// must return a Result (c.f., `map_ok`, which returns the underlying type,
127+
/// `T`).
128+
///
129+
/// Note that unlike `and_then`, the supplied function returns a resolved
130+
/// value, not a closure.
131+
fn ok_and_then<U, T, F, E>(self, func: F) -> OkAndThen<Self, F>
132+
where
133+
F: FnMut(U) -> Result<T, E>,
134+
Self: Future<Output = Result<U, E>> + Sized,
135+
{
136+
OkAndThen { future: self, func }
137+
}
138+
}
139+
140+
impl<T: TryFuture> ClientFutureExt for T {}
141+
65142
/// Emulate `send_all`/`SendAll` from futures 0.1 since the 0.3 versions don't
66143
/// work with Tokio `Handle`s due to ownership differences.
67144
pub(crate) trait SinkCompat<I, E> {

src/kv.rs

Lines changed: 21 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
22

3+
use derive_new::new;
34
use std::cmp::{Eq, PartialEq};
45
use std::convert::TryFrom;
56
use std::ops::{Bound, Deref, DerefMut, Range, RangeFrom, RangeInclusive};
@@ -55,20 +56,10 @@ impl<'a> fmt::Display for HexRepr<'a> {
5556
/// **But, you should not need to worry about all this:** Many functions which accept a `Key`
5657
/// accept an `Into<Key>`, which means all of the above types can be passed directly to those
5758
/// functions.
58-
#[derive(Default, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)]
59+
#[derive(new, Default, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)]
5960
pub struct Key(Vec<u8>);
6061

6162
impl Key {
62-
#[inline]
63-
pub fn new(value: Vec<u8>) -> Self {
64-
Key(value)
65-
}
66-
67-
#[inline]
68-
pub(crate) fn into_inner(self) -> Vec<u8> {
69-
self.0
70-
}
71-
7263
#[inline]
7364
fn zero_terminated(&self) -> bool {
7465
self.0.last().map(|i| *i == 0).unwrap_or(false)
@@ -142,6 +133,12 @@ impl AsMut<[u8]> for Key {
142133
}
143134
}
144135

136+
impl Into<Vec<u8>> for Key {
137+
fn into(self) -> Vec<u8> {
138+
self.0
139+
}
140+
}
141+
145142
impl Deref for Key {
146143
type Target = [u8];
147144

@@ -199,21 +196,9 @@ impl fmt::Debug for Key {
199196
/// **But, you should not need to worry about all this:** Many functions which accept a `Value`
200197
/// accept an `Into<Value>`, which means all of the above types can be passed directly to those
201198
/// functions.
202-
#[derive(Default, Clone, Eq, PartialEq, Hash)]
199+
#[derive(new, Default, Clone, Eq, PartialEq, Hash)]
203200
pub struct Value(Vec<u8>);
204201

205-
impl Value {
206-
#[inline]
207-
pub fn new(value: Vec<u8>) -> Self {
208-
Value(value)
209-
}
210-
211-
#[inline]
212-
pub(crate) fn into_inner(self) -> Vec<u8> {
213-
self.0
214-
}
215-
}
216-
217202
impl From<Vec<u8>> for Value {
218203
fn from(v: Vec<u8>) -> Self {
219204
Value(v)
@@ -232,6 +217,12 @@ impl From<&'static str> for Value {
232217
}
233218
}
234219

220+
impl Into<Vec<u8>> for Value {
221+
fn into(self) -> Vec<u8> {
222+
self.0
223+
}
224+
}
225+
235226
impl Deref for Value {
236227
type Target = [u8];
237228

@@ -290,11 +281,6 @@ impl KvPair {
290281
&self.1
291282
}
292283

293-
#[inline]
294-
pub fn into_inner(self) -> (Key, Value) {
295-
(self.0, self.1)
296-
}
297-
298284
#[inline]
299285
pub fn into_key(self) -> Key {
300286
self.0
@@ -340,6 +326,12 @@ where
340326
}
341327
}
342328

329+
impl Into<(Key, Value)> for KvPair {
330+
fn into(self) -> (Key, Value) {
331+
(self.0, self.1)
332+
}
333+
}
334+
343335
impl fmt::Debug for KvPair {
344336
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
345337
let KvPair(key, value) = self;

src/lib.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,13 @@ pub mod raw;
8282
mod rpc;
8383
pub mod transaction;
8484

85+
#[macro_use]
86+
extern crate lazy_static;
87+
#[macro_use]
88+
extern crate log;
89+
#[macro_use]
90+
extern crate prometheus;
91+
8592
#[doc(inline)]
8693
pub use crate::config::Config;
8794
#[doc(inline)]

0 commit comments

Comments
 (0)