Skip to content

Commit 27e2e4d

Browse files
committed
Flesh out logging
1 parent f477873 commit 27e2e4d

File tree

3 files changed

+50
-42
lines changed

3 files changed

+50
-42
lines changed
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
import hail as hl
2+
import luigi
3+
4+
from v03_pipeline.lib.logger import get_logger
5+
from v03_pipeline.lib.model import DatasetType, Env, ReferenceGenome, SampleType
6+
from v03_pipeline.lib.tasks.files import GCSorLocalFolderTarget
7+
8+
logger = get_logger(__name__)
9+
10+
class BaseTask(luigi.Task):
    """Common base for v03 pipeline tasks.

    Bundles the pipeline-wide luigi parameters, folder-based completion
    checking, and hail session initialization shared by update/write tasks.
    """

    # Pipeline-wide parameters every concrete task is keyed on.
    reference_genome = luigi.EnumParameter(enum=ReferenceGenome)
    dataset_type = luigi.EnumParameter(enum=DatasetType)
    sample_type = luigi.EnumParameter(enum=SampleType)

    def output(self) -> luigi.Target:
        """Return the target this task produces; subclasses must override."""
        raise NotImplementedError

    def complete(self) -> bool:
        # Completion is defined by existence of the output folder, which may
        # live on GCS or on the local filesystem.
        return GCSorLocalFolderTarget(self.output().path).exists()

    def init_hail(self):
        # Need to use the GCP bucket as temp storage for very large callset joins
        hl.init(tmp_dir=Env.HAIL_TMPDIR, idempotent=True)

        # Interval ref data join causes shuffle death, this prevents it
        hl._set_flags(use_new_shuffle='1', no_whole_stage_codegen='1')  # noqa: SLF001
27+
28+
@luigi.Task.event_handler(luigi.Event.DEPENDENCY_MISSING)
def dependency_missing(task):
    """Log when luigi reports one of *task*'s dependencies as missing."""
    # Lazy %-args defer str(task) until the record is actually emitted.
    logger.info('%s dependency_missing', task)
31+
32+
@luigi.Task.event_handler(luigi.Event.DEPENDENCY_PRESENT)
def dependency_present(task):
    """Log when luigi finds one of *task*'s dependencies already satisfied."""
    # Lazy %-args defer str(task) until the record is actually emitted.
    logger.info('%s dependency_present', task)
35+
36+
@luigi.Task.event_handler(luigi.Event.START)
def start(task):
    """Log when *task* starts running."""
    # Lazy %-args defer str(task) until the record is actually emitted.
    logger.info('%s start', task)
39+
40+
@luigi.Task.event_handler(luigi.Event.FAILURE)
def failure(task, exception):
    """Log a task failure along with the exception luigi hands the handler."""
    # logger.exception() only captures a traceback when called inside an
    # `except` block; this handler runs outside one, so the original call
    # logged `NoneType: None` and dropped the `exception` argument entirely.
    # Passing exc_info=exception records the real traceback with the message.
    logger.error('%s failure', task, exc_info=exception)
43+
44+
@luigi.Task.event_handler(luigi.Event.SUCCESS)
def success(task):
    """Log when *task* finishes successfully."""
    # Lazy %-args defer str(task) until the record is actually emitted.
    logger.info('%s success', task)

v03_pipeline/lib/tasks/base/base_update_task.py

Lines changed: 2 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,29 +1,10 @@
11
import hail as hl
2-
import luigi
32

43
from v03_pipeline.lib.misc.io import write
5-
from v03_pipeline.lib.model import DatasetType, Env, ReferenceGenome, SampleType
6-
from v03_pipeline.lib.tasks.files import GCSorLocalFolderTarget
4+
from v03_pipeline.lib.tasks.base.base_task import BaseTask
75

86

9-
class BaseUpdateTask(luigi.Task):
10-
reference_genome = luigi.EnumParameter(enum=ReferenceGenome)
11-
dataset_type = luigi.EnumParameter(enum=DatasetType)
12-
sample_type = luigi.EnumParameter(enum=SampleType)
13-
14-
def output(self) -> luigi.Target:
15-
raise NotImplementedError
16-
17-
def complete(self) -> bool:
18-
return GCSorLocalFolderTarget(self.output().path).exists()
19-
20-
def init_hail(self):
21-
# Need to use the GCP bucket as temp storage for very large callset joins
22-
hl.init(tmp_dir=Env.HAIL_TMPDIR, idempotent=True)
23-
24-
# Interval ref data join causes shuffle death, this prevents it
25-
hl._set_flags(use_new_shuffle='1', no_whole_stage_codegen='1') # noqa: SLF001
26-
7+
class BaseUpdateTask(BaseTask):
278
def run(self) -> None:
289
self.init_hail()
2910
if not self.output().exists():

v03_pipeline/lib/tasks/base/base_write_task.py

Lines changed: 2 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,29 +1,10 @@
11
import hail as hl
2-
import luigi
32

43
from v03_pipeline.lib.misc.io import write
5-
from v03_pipeline.lib.model import DatasetType, Env, ReferenceGenome, SampleType
6-
from v03_pipeline.lib.tasks.files import GCSorLocalFolderTarget
4+
from v03_pipeline.lib.tasks.base.base_task import BaseTask
75

86

9-
class BaseWriteTask(luigi.Task):
10-
reference_genome = luigi.EnumParameter(enum=ReferenceGenome)
11-
dataset_type = luigi.EnumParameter(enum=DatasetType)
12-
sample_type = luigi.EnumParameter(enum=SampleType)
13-
14-
def output(self) -> luigi.Target:
15-
raise NotImplementedError
16-
17-
def complete(self) -> bool:
18-
return GCSorLocalFolderTarget(self.output().path).exists()
19-
20-
def init_hail(self):
21-
# Need to use the GCP bucket as temp storage for very large callset joins
22-
hl.init(tmp_dir=Env.HAIL_TMPDIR, idempotent=True)
23-
24-
# Interval ref data join causes shuffle death, this prevents it
25-
hl._set_flags(use_new_shuffle='1', no_whole_stage_codegen='1') # noqa: SLF001
26-
7+
class BaseWriteTask(BaseTask):
278
def run(self) -> None:
289
self.init_hail()
2910
ht = self.create_table()

0 commit comments

Comments
 (0)