Skip to content

Commit af2ea3a

Browse files
authored
add async python interlayer and bump 0.0.3 (#26)
* add async python interlayer closes #21 Signed-off-by: andylokandy <andylokandy@hotmail.com> * fix typo in python interlayer closes #6 Signed-off-by: andylokandy <andylokandy@hotmail.com> * allow multiple pd endpoints closes #22 Signed-off-by: andylokandy <andylokandy@hotmail.com> * remove whitespace Signed-off-by: andylokandy <andylokandy@hotmail.com>
1 parent 763cf9f commit af2ea3a

File tree

9 files changed

+172
-32
lines changed

9 files changed

+172
-32
lines changed

CHANGELOG

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
# Change Log
2+
3+
* 0.0.3
4+
5+
* Add `Transaction.rollback()` to abort a transaction.
6+
7+
* Support multiple PD endpoints.
8+
9+
Example:
10+
11+
```python
12+
from tikv_client import RawClient
13+
14+
client = RawClient.connect(["127.0.0.1:2379"])
15+
```

Cargo.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
authors = ["Andy Lok <andylokandy@hotmail.com>"]
33
edition = "2018"
44
name = "tikv-client"
5-
version = "0.0.2"
5+
version = "0.0.3"
66

77
[dependencies]
88
futures = "0.3"

examples/run.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,11 @@
11
from tikv_client import RawClient
22

3-
client = RawClient.connect("127.0.0.1:2379")
3+
client = RawClient.connect(["127.0.0.1:2379"])
44

55
client.put(b"k1", b"v1")
66
client.put(b"k2", b"v2")
77
client.put(b"k3", b"v3")
8-
client.put(b"k4", b"v4")
9-
client.put(b"k5", b"v5")
8+
client.batch_put({b"k4": b"v4", b"k5": b"v5"})
109

1110
print(client.get(b"k3"))
1211
print(client.batch_get([b"k1", b"k4"]))

examples/run_async.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
from tikv_client.asynchronous import TransactionClient
33

44
async def main():
5-
client = await TransactionClient.connect("127.0.0.1:2379")
5+
client = await TransactionClient.connect(["127.0.0.1:2379"])
66

77
txn = await client.begin(pessimistic=True)
88
await txn.put(b"k1", b"v1")

src/raw.rs

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
use std::convert::TryInto;
44
use std::sync::Arc;
55

6-
use pyo3::exceptions::PyException;
76
use pyo3::prelude::*;
87
use pyo3::types::*;
98
use pyo3::ToPyObject;
@@ -18,17 +17,14 @@ pub struct RawClient {
1817

1918
#[pymethods]
2019
impl RawClient {
21-
#[new]
22-
pub fn new() -> PyResult<Self> {
23-
Err(PyException::new_err(
24-
"Please use `RawClient.connect()` instead.",
25-
))
26-
}
27-
2820
#[classmethod]
29-
pub fn connect<'p>(_cls: &PyType, py: Python<'p>, pd_endpoint: String) -> PyResult<&'p PyAny> {
21+
pub fn connect<'p>(
22+
_cls: &PyType,
23+
py: Python<'p>,
24+
pd_endpoints: Vec<String>,
25+
) -> PyResult<&'p PyAny> {
3026
future_into_py(py, async move {
31-
let inner = tikv_client::RawClient::new(vec![pd_endpoint], None)
27+
let inner = tikv_client::RawClient::new(pd_endpoints, None)
3228
.await
3329
.map_err(to_py_execption)?;
3430
let client = RawClient {

src/transaction.rs

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22

33
use std::sync::Arc;
44

5-
use pyo3::exceptions::PyException;
65
use pyo3::prelude::*;
76
use pyo3::types::*;
87
use pyo3::ToPyObject;
@@ -20,17 +19,14 @@ pub struct TransactionClient {
2019

2120
#[pymethods]
2221
impl TransactionClient {
23-
#[new]
24-
pub fn new() -> PyResult<Self> {
25-
Err(PyException::new_err(
26-
"Please use `TransactionClient.connect()` instead.",
27-
))
28-
}
29-
3022
#[classmethod]
31-
pub fn connect<'p>(_cls: &PyType, py: Python<'p>, pd_endpoint: String) -> PyResult<&'p PyAny> {
23+
pub fn connect<'p>(
24+
_cls: &PyType,
25+
py: Python<'p>,
26+
pd_endpoints: Vec<String>,
27+
) -> PyResult<&'p PyAny> {
3228
future_into_py(py, async move {
33-
let inner = tikv_client::TransactionClient::new(vec![pd_endpoint], None)
29+
let inner = tikv_client::TransactionClient::new(pd_endpoints, None)
3430
.await
3531
.map_err(to_py_execption)?;
3632
let client = TransactionClient {

tikv_client/__init__.py

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,10 @@ def __init__(self):
99
raise Exception("Please use `RawClient.connect()` instead.")
1010

1111
@classmethod
12-
def connect(cls, pd_endpoint):
12+
def connect(cls, pd_endpoints):
1313
event_loop = asyncio.get_event_loop()
1414
inner = event_loop.run_until_complete(
15-
asynchronous.RawClient.connect(pd_endpoint))
15+
asynchronous.RawClient.connect(pd_endpoints))
1616
self = cls.__new__(cls)
1717
self.inner = inner
1818
return self
@@ -39,7 +39,7 @@ def put(self, key, value, cf="default"):
3939

4040
def batch_put(self, pairs, cf="default"):
4141
event_loop = asyncio.get_event_loop()
42-
event_loop.run_until_complete(self.inner.put(pairs, cf))
42+
event_loop.run_until_complete(self.inner.batch_put(pairs, cf))
4343

4444
def delete(self, key, cf="default"):
4545
event_loop = asyncio.get_event_loop()
@@ -59,10 +59,10 @@ def __init__(self):
5959
raise Exception("Please use `TransactionClient.connect()` instead.")
6060

6161
@classmethod
62-
def connect(cls, pd_endpoint):
62+
def connect(cls, pd_endpoints):
6363
event_loop = asyncio.get_event_loop()
6464
inner = event_loop.run_until_complete(
65-
asynchronous.TransactionClient.connect(pd_endpoint))
65+
asynchronous.TransactionClient.connect(pd_endpoints))
6666
self = cls.__new__(cls)
6767
self.inner = inner
6868
return self
@@ -159,3 +159,7 @@ def delete(self, key):
159159
def commit(self):
160160
event_loop = asyncio.get_event_loop()
161161
event_loop.run_until_complete(self.inner.commit())
162+
163+
def rollback(self):
164+
event_loop = asyncio.get_event_loop()
165+
event_loop.run_until_complete(self.inner.rollback())

tikv_client/asynchronous/__init__.py

Lines changed: 131 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,133 @@
11
# Copyright 2020 TiKV Project Authors. Licensed under Apache-2.0.
22

3-
from ..tikv_client import (RawClient, TransactionClient)
3+
from .. import tikv_client
4+
5+
6+
class RawClient:
7+
def __init__(self):
8+
raise Exception("Please use `RawClient.connect()` instead.")
9+
10+
@classmethod
11+
async def connect(cls, pd_endpoints):
12+
if not isinstance(pd_endpoints, list):
13+
raise Exception("Please use list as pd_endpoints. For example: `RawClient.connect([127.0.0.1:2379])`.")
14+
inner = await tikv_client.RawClient.connect(pd_endpoints)
15+
self = cls.__new__(cls)
16+
self.inner = inner
17+
return self
18+
19+
async def get(self, key, cf="default"):
20+
return await self.inner.get(key, cf)
21+
22+
async def batch_get(self, keys, cf="default"):
23+
return await self.inner.batch_get(keys, cf)
24+
25+
async def scan(self, start, end, limit, include_start=True, include_end=False, cf="default"):
26+
return await self.inner.scan(start, end, limit, include_start, include_end, cf)
27+
28+
async def scan_keys(self, start, end, limit, include_start=True, include_end=False, cf="default"):
29+
return await self.inner.scan_keys(start, end, limit, include_start, include_end, cf)
30+
31+
async def put(self, key, value, cf="default"):
32+
await self.inner.put(key, value, cf)
33+
34+
async def batch_put(self, pairs, cf="default"):
35+
await self.inner.batch_put(pairs, cf)
36+
37+
async def delete(self, key, cf="default"):
38+
await self.inner.delete(key, cf)
39+
40+
async def batch_delete(self, keys, cf="default"):
41+
return await self.inner.batch_delete(keys, cf)
42+
43+
async def delete_range(self, start, end=None, include_start=True, include_end=False, cf="default"):
44+
return await self.inner.delete_range(start, end, include_start, include_end, cf)
45+
46+
47+
class TransactionClient:
48+
def __init__(self):
49+
raise Exception("Please use `TransactionClient.connect()` instead.")
50+
51+
@classmethod
52+
async def connect(cls, pd_endpoints):
53+
if not isinstance(pd_endpoints, list):
54+
raise Exception("Please use list as pd_endpoints. For example: `TransactionClient.connect([127.0.0.1:2379])`.")
55+
inner = await tikv_client.TransactionClient.connect(pd_endpoints)
56+
self = cls.__new__(cls)
57+
self.inner = inner
58+
return self
59+
60+
async def begin(self, pessimistic=False):
61+
transaction = await self.inner.begin(pessimistic)
62+
return Transaction(transaction)
63+
64+
async def current_timestamp(self):
65+
return await self.inner.current_timestamp()
66+
67+
def snapshot(self, timestamp, pessimistic):
68+
snapshot = self.inner.snapshot(timestamp, pessimistic)
69+
return Snapshot(snapshot)
70+
71+
72+
class Snapshot:
73+
def __init__(self, inner):
74+
self.inner = inner
75+
76+
async def get(self, key):
77+
return await self.inner.get(key)
78+
79+
async def key_exists(self, key):
80+
return await self.inner.key_exists(key)
81+
82+
async def batch_get(self, keys):
83+
return await self.inner.batch_get(keys)
84+
85+
async def scan(self, start, end, limit, include_start=True, include_end=False):
86+
return await self.inner.scan(start, end, limit, include_start, include_end)
87+
88+
async def scan_keys(self, start, end, limit, include_start=True, include_end=False):
89+
return await self.inner.scan_keys(start, end, limit, include_start, include_end)
90+
91+
92+
class Transaction:
93+
def __init__(self, inner):
94+
self.inner = inner
95+
96+
async def get(self, key):
97+
return await self.inner.get(key)
98+
99+
async def get_for_update(self, key):
100+
return await self.inner.get_for_update(key)
101+
102+
async def key_exists(self, key):
103+
return await self.inner.key_exists(key)
104+
105+
async def batch_get(self, keys):
106+
return await self.inner.batch_get(keys)
107+
108+
async def batch_get_for_update(self, keys):
109+
return await self.inner.batch_get_for_update(keys)
110+
111+
async def scan(self, start, end, limit, include_start=True, include_end=False):
112+
return await self.inner.scan(start, end, limit, include_start, include_end)
113+
114+
async def scan_keys(self, start, end, limit, include_start=True, include_end=False):
115+
return await self.inner.scan_keys(start, end, limit, include_start, include_end)
116+
117+
async def lock_keys(self, keys):
118+
await self.inner.lock_keys(keys)
119+
120+
async def put(self, key, value):
121+
await self.inner.put(key, value)
122+
123+
async def insert(self, key, value):
124+
await self.inner.insert(key, value)
125+
126+
async def delete(self, key):
127+
await self.inner.delete(key)
128+
129+
async def commit(self):
130+
await self.inner.commit()
131+
132+
async def rollback(self):
133+
await self.inner.rollback()

0 commit comments

Comments
 (0)