|
2 | 2 |
|
3 | 3 | use super::ColumnFamily;
|
4 | 4 | use crate::{rpc::RpcClient, BoundRange, Error, Key, KvPair, Result, Value};
|
| 5 | +use futures::future::Either; |
5 | 6 | use futures::prelude::*;
|
6 | 7 | use futures::task::{Context, Poll};
|
7 | 8 | use std::{pin::Pin, sync::Arc, u32};
|
8 | 9 |
|
9 | 10 | const MAX_RAW_KV_SCAN_LIMIT: u32 = 10240;
|
10 | 11 |
|
11 |
| -type BoxTryFuture<Resp> = Box<dyn Future<Output = Result<Resp>> + Send>; |
12 |
| - |
13 | 12 | trait RequestInner: Sized {
|
14 | 13 | type Resp;
|
| 14 | + type F: Future<Output = Result<Self::Resp>>; |
15 | 15 |
|
16 |
| - fn execute(self, client: Arc<RpcClient>, cf: Option<ColumnFamily>) -> BoxTryFuture<Self::Resp>; |
| 16 | + fn execute(self, client: Arc<RpcClient>, cf: Option<ColumnFamily>) -> Self::F; |
17 | 17 | }
|
18 | 18 |
|
19 | 19 | enum RequestState<Inner>
|
20 | 20 | where
|
21 | 21 | Inner: RequestInner,
|
22 | 22 | {
|
23 | 23 | Uninitiated(Option<(Arc<RpcClient>, Inner, Option<ColumnFamily>)>),
|
24 |
| - Initiated(BoxTryFuture<Inner::Resp>), |
| 24 | + Initiated(Inner::F), |
25 | 25 | }
|
26 | 26 |
|
27 | 27 | impl<Inner> RequestState<Inner>
|
|
61 | 61 | ) -> Pin<&'a mut dyn Future<Output = Result<Inner::Resp>>> {
|
62 | 62 | unsafe {
|
63 | 63 | match Pin::get_unchecked_mut(self) {
|
64 |
| - RequestState::Initiated(future) => Pin::new_unchecked(&mut **future), |
| 64 | + RequestState::Initiated(future) => Pin::new_unchecked(&mut *future), |
65 | 65 | _ => unreachable!(),
|
66 | 66 | }
|
67 | 67 | }
|
@@ -109,13 +109,10 @@ struct GetInner {
|
109 | 109 |
|
110 | 110 | impl RequestInner for GetInner {
|
111 | 111 | type Resp = Option<Value>;
|
| 112 | + existential type F: Future<Output = Result<Option<Value>>>; |
112 | 113 |
|
113 |
| - fn execute( |
114 |
| - self, |
115 |
| - client: Arc<RpcClient>, |
116 |
| - cf: Option<ColumnFamily>, |
117 |
| - ) -> BoxTryFuture<Option<Value>> { |
118 |
| - Box::new(client.raw_get(self.key, cf)) |
| 114 | + fn execute(self, client: Arc<RpcClient>, cf: Option<ColumnFamily>) -> Self::F { |
| 115 | + client.raw_get(self.key, cf) |
119 | 116 | }
|
120 | 117 | }
|
121 | 118 |
|
@@ -155,13 +152,10 @@ struct BatchGetInner {
|
155 | 152 |
|
156 | 153 | impl RequestInner for BatchGetInner {
|
157 | 154 | type Resp = Vec<KvPair>;
|
| 155 | + existential type F: Future<Output = Result<Vec<KvPair>>>; |
158 | 156 |
|
159 |
| - fn execute( |
160 |
| - self, |
161 |
| - client: Arc<RpcClient>, |
162 |
| - cf: Option<ColumnFamily>, |
163 |
| - ) -> BoxTryFuture<Vec<KvPair>> { |
164 |
| - Box::new(client.raw_batch_get(self.keys, cf)) |
| 157 | + fn execute(self, client: Arc<RpcClient>, cf: Option<ColumnFamily>) -> Self::F { |
| 158 | + client.raw_batch_get(self.keys, cf) |
165 | 159 | }
|
166 | 160 | }
|
167 | 161 |
|
@@ -202,10 +196,11 @@ struct PutInner {
|
202 | 196 |
|
203 | 197 | impl RequestInner for PutInner {
|
204 | 198 | type Resp = ();
|
| 199 | + existential type F: Future<Output = Result<()>>; |
205 | 200 |
|
206 |
| - fn execute(self, client: Arc<RpcClient>, cf: Option<ColumnFamily>) -> BoxTryFuture<()> { |
| 201 | + fn execute(self, client: Arc<RpcClient>, cf: Option<ColumnFamily>) -> Self::F { |
207 | 202 | let (key, value) = (self.key, self.value);
|
208 |
| - Box::new(client.raw_put(key, value, cf)) |
| 203 | + client.raw_put(key, value, cf) |
209 | 204 | }
|
210 | 205 | }
|
211 | 206 |
|
@@ -244,9 +239,10 @@ struct BatchPutInner {
|
244 | 239 |
|
245 | 240 | impl RequestInner for BatchPutInner {
|
246 | 241 | type Resp = ();
|
| 242 | + existential type F: Future<Output = Result<()>>; |
247 | 243 |
|
248 |
| - fn execute(self, client: Arc<RpcClient>, cf: Option<ColumnFamily>) -> BoxTryFuture<()> { |
249 |
| - Box::new(client.raw_batch_put(self.pairs, cf)) |
| 244 | + fn execute(self, client: Arc<RpcClient>, cf: Option<ColumnFamily>) -> Self::F { |
| 245 | + client.raw_batch_put(self.pairs, cf) |
250 | 246 | }
|
251 | 247 | }
|
252 | 248 |
|
@@ -285,9 +281,10 @@ struct DeleteInner {
|
285 | 281 |
|
286 | 282 | impl RequestInner for DeleteInner {
|
287 | 283 | type Resp = ();
|
| 284 | + existential type F: Future<Output = Result<()>>; |
288 | 285 |
|
289 |
| - fn execute(self, client: Arc<RpcClient>, cf: Option<ColumnFamily>) -> BoxTryFuture<()> { |
290 |
| - Box::new(client.raw_delete(self.key, cf)) |
| 286 | + fn execute(self, client: Arc<RpcClient>, cf: Option<ColumnFamily>) -> Self::F { |
| 287 | + client.raw_delete(self.key, cf) |
291 | 288 | }
|
292 | 289 | }
|
293 | 290 |
|
@@ -326,9 +323,10 @@ struct BatchDeleteInner {
|
326 | 323 |
|
327 | 324 | impl RequestInner for BatchDeleteInner {
|
328 | 325 | type Resp = ();
|
| 326 | + existential type F: Future<Output = Result<()>>; |
329 | 327 |
|
330 |
| - fn execute(self, client: Arc<RpcClient>, cf: Option<ColumnFamily>) -> BoxTryFuture<()> { |
331 |
| - Box::new(client.raw_batch_delete(self.keys, cf)) |
| 328 | + fn execute(self, client: Arc<RpcClient>, cf: Option<ColumnFamily>) -> Self::F { |
| 329 | + client.raw_batch_delete(self.keys, cf) |
332 | 330 | }
|
333 | 331 | }
|
334 | 332 |
|
@@ -386,19 +384,16 @@ impl ScanInner {
|
386 | 384 |
|
387 | 385 | impl RequestInner for ScanInner {
|
388 | 386 | type Resp = Vec<KvPair>;
|
| 387 | + existential type F: Future<Output = Result<Vec<KvPair>>>; |
389 | 388 |
|
390 |
| - fn execute( |
391 |
| - self, |
392 |
| - client: Arc<RpcClient>, |
393 |
| - cf: Option<ColumnFamily>, |
394 |
| - ) -> BoxTryFuture<Vec<KvPair>> { |
| 389 | + fn execute(self, client: Arc<RpcClient>, cf: Option<ColumnFamily>) -> Self::F { |
395 | 390 | if self.limit > MAX_RAW_KV_SCAN_LIMIT {
|
396 |
| - Box::new(future::err(Error::max_scan_limit_exceeded( |
| 391 | + Either::Right(future::err(Error::max_scan_limit_exceeded( |
397 | 392 | self.limit,
|
398 | 393 | MAX_RAW_KV_SCAN_LIMIT,
|
399 | 394 | )))
|
400 | 395 | } else {
|
401 |
| - Box::new(client.raw_scan(self.range, self.limit, self.key_only, cf)) |
| 396 | + Either::Left(client.raw_scan(self.range, self.limit, self.key_only, cf)) |
402 | 397 | }
|
403 | 398 | }
|
404 | 399 | }
|
@@ -457,19 +452,16 @@ impl BatchScanInner {
|
457 | 452 |
|
458 | 453 | impl RequestInner for BatchScanInner {
|
459 | 454 | type Resp = Vec<KvPair>;
|
| 455 | + existential type F: Future<Output = Result<Vec<KvPair>>>; |
460 | 456 |
|
461 |
| - fn execute( |
462 |
| - self, |
463 |
| - client: Arc<RpcClient>, |
464 |
| - cf: Option<ColumnFamily>, |
465 |
| - ) -> BoxTryFuture<Vec<KvPair>> { |
| 457 | + fn execute(self, client: Arc<RpcClient>, cf: Option<ColumnFamily>) -> Self::F { |
466 | 458 | if self.each_limit > MAX_RAW_KV_SCAN_LIMIT {
|
467 |
| - Box::new(future::err(Error::max_scan_limit_exceeded( |
| 459 | + Either::Right(future::err(Error::max_scan_limit_exceeded( |
468 | 460 | self.each_limit,
|
469 | 461 | MAX_RAW_KV_SCAN_LIMIT,
|
470 | 462 | )))
|
471 | 463 | } else {
|
472 |
| - Box::new(client.raw_batch_scan(self.ranges, self.each_limit, self.key_only, cf)) |
| 464 | + Either::Left(client.raw_batch_scan(self.ranges, self.each_limit, self.key_only, cf)) |
473 | 465 | }
|
474 | 466 | }
|
475 | 467 | }
|
@@ -510,8 +502,9 @@ pub struct DeleteRangeInner {
|
510 | 502 |
|
511 | 503 | impl RequestInner for DeleteRangeInner {
|
512 | 504 | type Resp = ();
|
| 505 | + existential type F: Future<Output = Result<()>>; |
513 | 506 |
|
514 |
| - fn execute(self, client: Arc<RpcClient>, cf: Option<ColumnFamily>) -> BoxTryFuture<()> { |
515 |
| - Box::new(client.raw_delete_range(self.range, cf)) |
| 507 | + fn execute(self, client: Arc<RpcClient>, cf: Option<ColumnFamily>) -> Self::F { |
| 508 | + client.raw_delete_range(self.range, cf) |
516 | 509 | }
|
517 | 510 | }
|
0 commit comments