Skip to content

Commit 83305bf

Browse files
committed
Main logic for saving data & events to storage
1 parent a50f0b3 commit 83305bf

File tree

3 files changed

+194
-0
lines changed

3 files changed

+194
-0
lines changed

.gitignore

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
blocks/*.json
2+
latest_height.txt
3+
__pycache__/

main.py

Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
import json
2+
import os
3+
4+
import rel
5+
import websocket
6+
7+
from util import (
8+
decode_txs,
9+
get_block_events,
10+
get_block_height,
11+
get_block_txs,
12+
get_unique_event_addresses,
13+
remove_useless_data,
14+
update_latest_height,
15+
)
16+
17+
# TODO: Save Txs & events to postgres?
18+
# maybe a redis cache as well so other people can subscribe to redis for events?
19+
20+
# https://docs.tendermint.com/v0.34/rpc/
21+
RPC_IP = "15.204.143.232:26657"
22+
RPC_URL = f"ws://{RPC_IP}/websocket"
23+
WALLET_PREFIX = "juno1"
24+
COSMOS_BINARY_FILE = "junod"
25+
26+
current_dir = os.path.dirname(os.path.realpath(__file__))
27+
blocks = os.path.join(current_dir, "blocks")
28+
os.makedirs(blocks, exist_ok=True)
29+
30+
31+
# TODO: get current height. If there is a difference, then we need to query the RPC for the missing blocks.
32+
latest_height_file = os.path.join(current_dir, "latest_height.txt")
33+
latest_height = -1
34+
if os.path.exists(latest_height_file):
35+
with open(latest_height_file, "r") as f:
36+
latest_height = int(f.read())
37+
print(f"Latest Height: {latest_height}")
38+
39+
40+
def on_message(ws, message):
41+
msg = dict(json.loads(message))
42+
43+
if msg.get("result") == {}:
44+
print("Subscribed to New Block...")
45+
return
46+
47+
block_data = msg.get("result", {})
48+
49+
# Get height & save latest to file
50+
block_height = get_block_height(block_data)
51+
update_latest_height(latest_height_file, block_height)
52+
53+
# Gets block transactions, decodes them to JSON, and saves them to the block_data
54+
block_txs = get_block_txs(block_data)
55+
decoded_txs = decode_txs(COSMOS_BINARY_FILE, block_txs)
56+
block_data["data"]["value"]["block"]["data"]["txs"] = decoded_txs
57+
58+
# Gets unique addresses from events (users/contracts interacted with during this time frame)
59+
# Useful for cache solutions. So if a user does not have any changes here, then we can keep them cached longer
60+
block_events = get_block_events(block_data)
61+
unique_event_addresses = get_unique_event_addresses(WALLET_PREFIX, block_events)
62+
block_data["events"]["all_unique_event_addresses"] = list(unique_event_addresses)
63+
64+
# Removes useless events we do not need to cache which take up lots of space
65+
updated_data = remove_useless_data(block_data)
66+
67+
# Saves data to a file
68+
# TODO: learn postgres and save with relations
69+
with open(os.path.join(blocks, f"{block_height}.json"), "w") as f:
70+
f.write(json.dumps(updated_data))
71+
72+
73+
def on_error(ws, error):
74+
print("error", error)
75+
76+
77+
def on_close(ws, close_status_code, close_msg):
78+
print("### closed ###")
79+
80+
81+
def on_open(ws):
82+
print("Opened connection")
83+
ws.send(
84+
'{"jsonrpc": "2.0", "method": "subscribe", "params": ["tm.event=\'NewBlock\'"], "id": 1}'
85+
)
86+
print("Sent subscribe request")
87+
88+
89+
# from websocket import create_connection
90+
if __name__ == "__main__":
91+
websocket.enableTrace(False) # toggle to show or hide output
92+
ws = websocket.WebSocketApp(
93+
f"{RPC_URL}",
94+
on_open=on_open,
95+
on_message=on_message,
96+
on_error=on_error,
97+
on_close=on_close,
98+
)
99+
100+
ws.run_forever(
101+
dispatcher=rel, reconnect=5
102+
) # Set dispatcher to automatic reconnection, 5 second reconnect delay if connection closed unexpectedly
103+
rel.signal(2, rel.abort) # Keyboard Interrupt
104+
rel.dispatch()

util.py

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
import json
2+
import os
3+
4+
current_dir = os.path.dirname(os.path.realpath(__file__))
5+
6+
7+
def __run_decode(cosmos_binary: str, tx: str) -> dict:
8+
# check for max len tx (store code breaks this for CLI usage on linux)
9+
if len(tx) > 32766:
10+
print("TX too long. Skipping...")
11+
return {}
12+
13+
res = os.popen(f"{cosmos_binary} tx decode {tx} --output json").read()
14+
return json.loads(res)
15+
16+
17+
def decode_txs(COSMOS_BINARY_FILE: str, block_txs: dict) -> list:
18+
decoded_txs = []
19+
20+
# iterate through each and convert to json with junod
21+
for tx in block_txs:
22+
tx = __run_decode(COSMOS_BINARY_FILE, tx)
23+
if isinstance(tx, dict) and len(tx) > 0:
24+
decoded_txs.append(tx)
25+
26+
return decoded_txs
27+
28+
def get_block_txs(block_data: dict) -> list:
29+
return (
30+
block_data.get("data", {})
31+
.get("value", {})
32+
.get("block", {})
33+
.get("data", {})
34+
.get("txs", [])
35+
)
36+
37+
38+
def get_block_events(block_data: dict) -> dict:
39+
return block_data.get("events", {})
40+
41+
42+
def get_block_height(block_data: dict) -> int:
43+
height = block_data["data"]["value"]["block"]["header"]["height"]
44+
print(f"Block Height: {height}")
45+
46+
return height
47+
48+
49+
def update_latest_height(latest_height_file: str, height: int) -> None:
50+
with open(latest_height_file, "w") as f:
51+
f.write(str(height))
52+
53+
54+
def get_unique_event_addresses(wallet_prefix: str, block_events: dict) -> list[str]:
55+
# any address which had some action in the block
56+
event_addresses: list[str] = []
57+
58+
for event_key, value in block_events.items():
59+
if not isinstance(value, list):
60+
continue
61+
62+
for v in value:
63+
if isinstance(v, str) and v.startswith(wallet_prefix):
64+
if v not in event_addresses:
65+
event_addresses.append(v)
66+
67+
return event_addresses
68+
69+
70+
def remove_useless_data(block_data: dict) -> dict:
71+
# remove result_begin_block in the value section
72+
del block_data["data"]["value"]["result_begin_block"]
73+
del block_data["data"]["value"]["result_end_block"]
74+
del block_data["data"]["value"]["block"]["last_commit"]["signatures"]
75+
76+
# remove usless events for us
77+
del block_data["events"]["commission.amount"]
78+
del block_data["events"]["commission.validator"]
79+
del block_data["events"]["rewards.amount"]
80+
del block_data["events"]["rewards.validator"]
81+
del block_data["events"]["coin_spent.amount"]
82+
del block_data["events"]["coin_received.receiver"]
83+
del block_data["events"]["proposer_reward.amount"]
84+
del block_data["events"]["mint.amount"]
85+
del block_data["events"]["coinbase.amount"]
86+
87+
return block_data

0 commit comments

Comments
 (0)