Skip to content

Commit ec58e20

Browse files
Merge pull request #6 from ttngu207/main
implement new "activation" mechanism -> using dict, module name or module for `requirement`
2 parents e9191dd + aa4064c commit ec58e20

File tree

1 file changed

+55
-42
lines changed

1 file changed

+55
-42
lines changed

elements_ephys/ephys.py

Lines changed: 55 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -5,76 +5,89 @@
55
import inspect
66
import uuid
77
import hashlib
8-
from collections.abc import Mapping
8+
import importlib
99

1010
from .readers import neuropixels, kilosort
1111
from . import probe
1212

1313
schema = dj.schema()
1414

15-
context = locals()
15+
_required_module = None
1616

17-
table_classes = (dj.Manual, dj.Lookup, dj.Imported, dj.Computed)
1817

18+
def activate(ephys_schema_name, probe_schema_name=None, *, create_schema=True,
19+
create_tables=True, required_module=None):
20+
"""
21+
activate(ephys_schema_name, probe_schema_name=None, *, create_schema=True, create_tables=True, required_module=None)
22+
:param ephys_schema_name: schema name to activate the `ephys` element
23+
:param probe_schema_name: schema name to activate the `probe` element
24+
- may be omitted if the `probe` element is already activated
25+
:param create_schema: create the schema if not yet existed (default = True)
26+
:param create_tables: create the tables if not yet existed (default = True)
27+
:param required_module: a module name or a module containing the
28+
required dependencies to activate the `ephys` element:
29+
Upstream tables:
30+
+ Session: parent table to ProbeInsertion, typically identifying a recording session
31+
+ SkullReference:
32+
Functions:
33+
+ get_neuropixels_data_directory(probe_insertion_key: dict) -> str
34+
Retrieve the recorded Neuropixels data directory for a given ProbeInsertion
35+
:param probe_insertion_key: a dictionary of one ProbeInsertion `key`
36+
:return: a string for full path to the resulting Neuropixels data directory
37+
+ get_kilosort_output_directory(clustering_task_key: dict) -> str
38+
Retrieve the Kilosort output directory for a given ClusteringTask
39+
:param clustering_task_key: a dictionary of one ClusteringTask `key`
40+
:return: a string for full path to the resulting Kilosort output directory
41+
+ get_paramset_idx(ephys_rec_key: dict) -> int
42+
Retrieve attribute `paramset_idx` from the ClusteringParamSet record for the given EphysRecording.
43+
:param ephys_rec_key: a dictionary of one EphysRecording `key`
44+
:return: int specifying the `paramset_idx`
45+
"""
1946

20-
def activate(ephys_schema_name, probe_schema_name=None, create_schema=True, create_tables=True, add_objects=None):
21-
assert isinstance(add_objects, Mapping)
22-
23-
upstream_tables = ("Session", "SkullReference")
24-
for name in upstream_tables:
25-
assert name in add_objects, "Upstream table %s is required in ephys.activate(add_objects=...)" % name
26-
table = add_objects[name]
27-
if inspect.isclass(table):
28-
table = table()
29-
assert isinstance(table, table_classes), "Upstream table %s must be a DataJoint table " \
30-
"object in ephys.activate(add_objects=...)" % name
47+
if isinstance(required_module, str):
48+
required_module = importlib.import_module(required_module)
49+
assert inspect.ismodule(required_module), "The argument 'required_module' must be a module's name or a module"
3150

32-
required_functions = ("get_neuropixels_data_directory", "get_paramset_idx", "get_kilosort_output_directory")
33-
for name in required_functions:
34-
assert name in add_objects, "Functions %s is required in ephys.activate(add_objects=...)" % name
35-
assert inspect.isfunction(add_objects[name]), "%s must be a function in ephys.activate(add_objects=...)" % name
36-
context.update(**{name: add_objects[name]})
51+
global _required_module
52+
_required_module = required_module
3753

3854
# activate
39-
if probe.schema.database is not None:
40-
probe.schema.activate(probe_schema_name or ephys_schema_name,
41-
create_schema=create_schema, create_tables=create_tables)
42-
55+
probe.schema.activate(probe_schema_name, create_schema=create_schema, create_tables=create_tables)
4356
schema.activate(ephys_schema_name, create_schema=create_schema,
44-
create_tables=create_tables, add_objects=add_objects)
57+
create_tables=create_tables, add_objects=_required_module.__dict__)
4558

4659

4760
# -------------- Functions required by the elements-ephys ---------------
4861

4962

5063
def get_neuropixels_data_directory(probe_insertion_key: dict) -> str:
5164
"""
52-
Retrieve the recorded Neuropixels data directory for a given ProbeInsertion
53-
:param probe_insertion_key: a dictionary of one ProbeInsertion `key`
54-
:return: a string for full path to the resulting Neuropixels data directory
65+
get_neuropixels_data_directory(probe_insertion_key: dict) -> str
66+
Retrieve the recorded Neuropixels data directory for a given ProbeInsertion
67+
:param probe_insertion_key: a dictionary of one ProbeInsertion `key`
68+
:return: a string for full path to the resulting Neuropixels data directory
5569
"""
56-
assert set(ProbeInsertion().primary_key) <= set(probe_insertion_key)
57-
raise NotImplementedError('Workflow module should define function: "get_neuropixels_data_directory"')
70+
return _required_module.get_neuropixels_data_directory(probe_insertion_key)
5871

5972

6073
def get_kilosort_output_directory(clustering_task_key: dict) -> str:
6174
"""
62-
Retrieve the Kilosort output directory for a given ClusteringTask
63-
:param clustering_task_key: a dictionary of one ClusteringTask `key`
64-
:return: a string for full path to the resulting Kilosort output directory
75+
get_kilosort_output_directory(clustering_task_key: dict) -> str
76+
Retrieve the Kilosort output directory for a given ClusteringTask
77+
:param clustering_task_key: a dictionary of one ClusteringTask `key`
78+
:return: a string for full path to the resulting Kilosort output directory
6579
"""
66-
assert set(EphysRecording().primary_key) <= set(clustering_task_key)
67-
raise NotImplementedError('Workflow module should define function: "get_kilosort_output_directory"')
80+
return _required_module.get_kilosort_output_directory(clustering_task_key)
6881

6982

7083
def get_paramset_idx(ephys_rec_key: dict) -> int:
7184
"""
72-
Retrieve attribute `paramset_idx` from the ClusteringParamSet record for the given EphysRecording.
73-
:param ephys_rec_key: a dictionary of one EphysRecording `key`
74-
:return: int specifying the `paramset_idx`
85+
get_paramset_idx(ephys_rec_key: dict) -> int
86+
Retrieve attribute `paramset_idx` from the ClusteringParamSet record for the given EphysRecording.
87+
:param ephys_rec_key: a dictionary of one EphysRecording `key`
88+
:return: int specifying the `paramset_idx`
7589
"""
76-
assert set(EphysRecording().primary_key) <= set(ephys_rec_key)
77-
raise NotImplementedError('Workflow module should define function: get_paramset_idx')
90+
return _required_module.get_paramset_idx(ephys_rec_key)
7891

7992

8093
# ----------------------------- Table declarations ----------------------
@@ -234,11 +247,11 @@ def insert_new_params(cls, processing_method: str, paramset_idx: int, paramset_d
234247
q_param = cls & {'param_set_hash': param_dict['param_set_hash']}
235248

236249
if q_param: # If the specified param-set already exists
237-
pname = q_param.fetch1('param_set_name')
238-
if pname == paramset_idx: # If the existed set has the same name: job done
250+
pname = q_param.fetch1('paramset_idx')
251+
if pname == paramset_idx: # If the existing set has the same paramset_idx: job done
239252
return
240253
else: # If not same paramset_idx: human error, trying to add the same paramset with a different paramset_idx
241-
raise dj.DataJointError('The specified param-set already exists - name: {}'.format(pname))
254+
raise dj.DataJointError('The specified param-set already exists - paramset_idx: {}'.format(pname))
242255
else:
243256
cls.insert1(param_dict)
244257

0 commit comments

Comments
 (0)