Skip to content

Commit bf83ab7

Browse files
committed
monitor effects
1 parent f6754ed commit bf83ab7

File tree

2 files changed

+127
-21
lines changed

2 files changed

+127
-21
lines changed

code_graph/git_utils/git_utils.py

Lines changed: 126 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,117 @@
11
import os
2+
import time
3+
import redis
4+
import threading
5+
import subprocess
26
from git import Repo
37
from ..graph import Graph
48

9+
monitor_thread = None
10+
replica_process = None
11+
monitor_exit_event = threading.Event()
12+
13+
# setup replication to the master
14+
def setup_replication():
15+
global replica_process
16+
17+
# start replica server
18+
command = [
19+
"redis-server",
20+
"--port", "6380",
21+
"--replicaof", "localhost", "6379",
22+
"--loadmodule", "/Users/roilipman/Dev/FalkorDB/bin/macos-arm64v8-release/src/falkordb.so"
23+
]
24+
replica_process = subprocess.Popen(command)
25+
26+
# closes redis replica
27+
def teardown_replica():
28+
print("closing replica")
29+
30+
# Gracefully terminate the process
31+
replica_process.terminate()
32+
33+
# Wait for the process to exit
34+
replica_process.wait()
35+
36+
print("replica terminated.")
37+
38+
# runs on a dedicated thread, capture GRAPH.EFFECT commands
39+
def _monitor_effects(graph_name):
40+
# Connect to Redis server
41+
r = redis.Redis(host='localhost', port=6380)
42+
43+
# Start monitoring the Redis server
44+
with r.monitor() as m:
45+
print("MONITOR ACTIVATED!")
46+
# Print commands as they are executed on the Redis server
47+
for command in m.listen():
48+
print("listening for monitor commands")
49+
# check for exit signal
50+
if monitor_exit_event.is_set():
51+
print("exit signal recived")
52+
break
53+
54+
cmd = command['command']
55+
print(f"cmd: {cmd}")
56+
if "GRAPH.EFFECT" in cmd and graph_name in cmd:
57+
# "GRAPH.EFFECT" "FalkorDB" "\x01\x05\x00\x00\x00\x90\x14\x00\x00\x00\x00\x00\x00"
58+
print(f"Detected effect: {cmd}")
59+
60+
# save effect
61+
62+
#{'time': 1728840094.85141, 'db': 0, 'client_address': '[::1]', 'client_port': '6379', 'client_type': 'tcp'
63+
#, 'command': 'set x 7'}
64+
65+
# monitor graph.effect commands
66+
def start_monitor_effects(graph_name):
67+
print(f"graph_name: {graph_name}")
68+
r = redis.Redis(host='localhost', port=6380, decode_responses=True)
69+
70+
# wait for replica to become responsive
71+
connected = False
72+
while not connected:
73+
# wait one sec
74+
time.sleep(1)
75+
try:
76+
role = r.role()
77+
connected = (role[0] == 'slave' and role[1] == 'localhost' and role[2] == 6379 and role[3] == 'connected')
78+
except Exception:
79+
pass
80+
81+
print("starting monitor thread")
82+
83+
monitor_thread = threading.Thread(target=_monitor_effects, args=(graph_name,))
84+
monitor_thread.start()
85+
86+
print("monitor thread started")
87+
88+
# stop monitoring graph.effect commands
89+
def stop_monitor_effects():
90+
print("signaling monitor thread to exit")
91+
92+
# Signal the thread to exit
93+
monitor_exit_event.set()
94+
95+
# Wait for the thread to finish
96+
monitor_thread.join()
97+
98+
print("monitor thread exited")
99+
5100
# build a graph capturing the git commit history
6101
def build_commit_graph(path: str):
7102
repo = Repo(path)
8103

9104
repo_name = os.path.split(os.path.normpath(path))[-1]
10105
g = Graph(repo_name)
11106

107+
# TODO: need to wait for replication to sync with its master
108+
setup_replication()
109+
110+
# start monitoring graph effects
111+
# these capture the changes a graph goes through when moving from one
112+
# git commit to another
113+
start_monitor_effects(g.g.name)
114+
12115
head_commit = repo.commit("HEAD")
13116
while len(head_commit.parents) > 0:
14117
prev_commit = head_commit.parents[0]
@@ -24,41 +127,45 @@ def build_commit_graph(path: str):
24127
diff = head_commit.diff(prev_commit)
25128

26129
print(f"hash: {head_commit.hexsha}")
27-
print(f"message: {head_commit.message}")
130+
#print(f"message: {head_commit.message}")
28131

29-
for change in diff:
30-
added = []
31-
deleted = []
32-
modified = []
132+
added = []
133+
deleted = []
134+
modified = []
33135

136+
for change in diff:
34137
if change.new_file:
35138
#print(f"new_file: {change.b_path}")
36139
added.append(change.b_path)
37-
pass
38140
elif change.deleted_file:
39141
print(f"deleted_file: {change.a_path}")
40142
deleted.append(change.a_path)
41143
elif change.change_type == 'M':
42144
#print(f"modified_file: {change.a_path}")
43145
modified.append(change.a_path)
44-
pass
45146

46-
head_commit = prev_commit
147+
head_commit = prev_commit
148+
149+
#-----------------------------------------------------------------------
150+
# apply changes
151+
#-----------------------------------------------------------------------
47152

48-
#-------------------------------------------------------------------
49-
# apply changes
50-
#-------------------------------------------------------------------
153+
# apply deletions
51154

52-
# apply deletions
155+
if len(deleted) > 0:
156+
# TODO: a bit of a waste, compute in previous loop
157+
deleted_files = [
158+
{'path': os.path.dirname(path),
159+
'name': os.path.basename(path),
160+
'ext' : os.path.splitext(path)[1]} for path in deleted]
53161

54-
if len(deleted_file) > 0:
55-
deleted_files = [
56-
{'path': os.path.dirname(path),
57-
'name': os.path.basename(path),
58-
'ext' : os.path.splitext(path)[1]} for path in deleted]
162+
# remove deleted files from the graph
163+
g.delete_files(deleted_files)
164+
input("Press Enter to continue...")
59165

60-
# remove deleted files from the graph
61-
g.delete_files(deleted_files)
166+
# clean up
167+
stop_monitor_effects()
168+
teardown_replica()
62169

63170
if __name__ == "__main__":
64171
build_commit_graph("/Users/roilipman/Dev/FalkorDB")

code_graph/graph.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -260,8 +260,7 @@ def delete_files(self, files: List[dict]) -> None:
260260
"""
261261

262262
params = {'files': files}
263-
#self.g.query(q, params)
264-
self.g.explain(q, params)
263+
self.g.query(q, params)
265264

266265
def get_file(self, path: str, name: str, ext: str) -> Optional[File]:
267266
"""

0 commit comments

Comments
 (0)