Skip to content

Commit 19e6f4e

Browse files
committed
Connector changes for grolt
1 parent 3d40acc commit 19e6f4e

File tree

1 file changed

+39
-25
lines changed

1 file changed

+39
-25
lines changed

py2neo/client/__init__.py

Lines changed: 39 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -940,14 +940,12 @@ def __init__(self, profile=None, user_agent=None, init_size=None,
940940
self._routing_refresh_ttl = routing_refresh_ttl
941941
self._pools = {}
942942
if self._profile.routing:
943-
self._routing = Router()
944-
self._routing_tables = {}
943+
self._router = Router()
945944
else:
946-
self._routing = None
947-
self._routing_tables = None
948-
self.add_pools(*self._initial_routers)
945+
self._router = None
946+
self._add_pools(*self._initial_routers)
949947
if self._profile.routing:
950-
self._refresh_routing_table(None)
948+
self.refresh_routing_table(None)
951949

952950
def __repr__(self):
953951
return "<{} to {!r}>".format(self.__class__.__name__, self.profile)
@@ -959,7 +957,7 @@ def __str__(self):
959957
def __hash__(self):
960958
return hash(self.profile)
961959

962-
def add_pools(self, *profiles):
960+
def _add_pools(self, *profiles):
963961
""" Adds connection pools for one or more connection profiles.
964962
Pools that already exist will be skipped.
965963
"""
@@ -981,16 +979,16 @@ def add_pools(self, *profiles):
981979
def invalidate_routing_table(self, graph_name):
982980
""" Invalidate the routing table for the given graph.
983981
"""
984-
if self._routing is not None:
985-
self._routing.invalidate_routing_table(graph_name)
982+
if self._router is not None:
983+
self._router.invalidate_routing_table(graph_name)
986984

987985
def _get_profiles(self, graph_name=None, readonly=False):
988-
if self._routing is None:
986+
if self._router is None:
989987
# If routing isn't enabled, just return a
990988
# simple list of pools.
991989
return self._pools.keys(), self._pools.keys()
992990

993-
rt = self._routing.table(graph_name)
991+
rt = self._router.get_routing_table(graph_name)
994992
while True: # TODO: some limit to this, maybe with repeater?
995993
ro_profiles, rw_profiles, expired = rt.runners()
996994
if not expired:
@@ -1003,15 +1001,22 @@ def _get_profiles(self, graph_name=None, readonly=False):
10031001
else:
10041002
rt.wait_until_updated()
10051003
else:
1006-
self._refresh_routing_table(graph_name)
1004+
self.refresh_routing_table(graph_name)
10071005

1008-
def _refresh_routing_table(self, graph_name=None):
1006+
def refresh_routing_table(self, graph_name):
1007+
""" Refresh the routing table for a given graph database.
1008+
1009+
:param graph_name:
1010+
graph database for which to refresh the routing table
1011+
:returns:
1012+
:class:`.RoutingTable` instance for the graph database
1013+
"""
10091014
log.debug("Attempting to refresh routing table for %s", _repr_graph_name(graph_name))
1010-
assert self._routing is not None
1011-
rt = self._routing.table(graph_name)
1015+
assert self._router is not None
1016+
rt = self._router.get_routing_table(graph_name)
10121017
rt.set_updating()
10131018
try:
1014-
known_routers = self._routing.routers + self._initial_routers # TODO de-dupe
1019+
known_routers = self._router.routers + self._initial_routers # TODO de-dupe
10151020
log.debug("Known routers are: %s", ", ".join(map(repr, known_routers)))
10161021
for router in known_routers:
10171022
log.debug("Asking %r for routing table", router)
@@ -1033,20 +1038,27 @@ def _refresh_routing_table(self, graph_name=None):
10331038
continue
10341039
else:
10351040
# TODO: comment this algorithm
1036-
self.add_pools(*routers)
1037-
self.add_pools(*ro_runners)
1038-
self.add_pools(*rw_runners)
1039-
old_profiles = self._routing.update(graph_name, routers, ro_runners, rw_runners, ttl)
1041+
self._add_pools(*routers)
1042+
self._add_pools(*ro_runners)
1043+
self._add_pools(*rw_runners)
1044+
old_profiles = self._router.update(graph_name, routers, ro_runners, rw_runners, ttl)
10401045
for profile in old_profiles:
10411046
self.prune(profile)
1042-
return
1047+
return self._router.get_routing_table(graph_name)
10431048
finally:
10441049
cx.release()
10451050
else:
10461051
raise ServiceUnavailable("Cannot connect to any known routers")
10471052
finally:
10481053
rt.set_not_updating()
10491054

1055+
def get_router_profiles(self):
1056+
""" Get the last known router profiles.
1057+
"""
1058+
if self._router is None:
1059+
return None
1060+
return self._router.routers
1061+
10501062
@property
10511063
def profile(self):
10521064
""" The initial connection profile for this connector.
@@ -1261,7 +1273,7 @@ def prune(self, profile):
12611273
pass
12621274
else:
12631275
pool.prune()
1264-
if self._routing is not None and pool.size == 0:
1276+
if self._router is not None and pool.size == 0:
12651277
log.debug("Removing connection pool for profile %r", profile)
12661278
try:
12671279
del self._pools[profile]
@@ -1298,8 +1310,8 @@ def _on_broken(self, profile, message):
12981310
"""
12991311
log.debug("Connection to %r broken\n%s", profile, message)
13001312
# TODO: clean up broken connections from reader and writer entries too
1301-
if self._routing is not None:
1302-
self._routing.set_broken(profile)
1313+
if self._router is not None:
1314+
self._router.set_broken(profile)
13031315
self.prune(profile)
13041316

13051317
def auto_run(self, cypher, parameters=None, pull=-1, graph_name=None, readonly=False,
@@ -1524,7 +1536,9 @@ def __init__(self):
15241536
def routers(self):
15251537
return self._routers
15261538

1527-
def table(self, graph_name):
1539+
def get_routing_table(self, graph_name):
1540+
""" Return the routing table for the given graph.
1541+
"""
15281542
with self._lock:
15291543
try:
15301544
return self._routing_tables[graph_name]

0 commit comments

Comments
 (0)