3
3
from pathlib import Path
4
4
import json
5
5
import os
6
+ import subprocess
7
+ import gzip
8
+ import logging
9
+ from datetime import datetime
10
+ import csv
11
+
12
# Setup logging
def setup_logging():
    """Configure and return the shared 'vv' logger.

    Attaches a DEBUG-level file handler (vv.log) and an INFO-level console
    handler. ``logging.getLogger('vv')`` always returns the same cached
    logger, so this is guarded against adding duplicate handlers on
    repeated calls (which would emit every log line multiple times).

    Returns:
        logging.Logger: the configured 'vv' logger.
    """
    logger = logging.getLogger('vv')
    logger.setLevel(logging.DEBUG)  # Capture everything; handlers filter.

    # Idempotence guard: only attach handlers the first time.
    if not logger.handlers:
        # File handler: full DEBUG detail goes to vv.log
        fh = logging.FileHandler('vv.log')
        fh.setLevel(logging.DEBUG)

        # Console handler: keep console at INFO
        ch = logging.StreamHandler()
        ch.setLevel(logging.INFO)

        # Shared formatting for both handlers
        formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
        fh.setFormatter(formatter)
        ch.setFormatter(formatter)

        logger.addHandler(fh)
        logger.addHandler(ch)

    return logger

logger = setup_logging()
6
36
7
37
# Output structure config
8
38
STRUCTURE = {
53
83
}
54
84
}
55
85
86
class ValidationLogger:
    """Handles logging validation results in a structured format.

    Each check result is kept in memory (``self.results``) and appended as
    one row to a CSV log file. Quantitative stats, when supplied, are
    tracked per component/sample/check in ``self.stats``.
    """

    # CSV column order for the log file header and every data row.
    _FIELDS = ["component", "sample_id", "check_name", "status", "message", "details"]

    def __init__(self, log_file="VV_log.csv"):
        """Create/overwrite `log_file` and write the CSV header row."""
        self.log_file = log_file
        self.results = []  # list of per-check result dicts (see log())
        self.stats = {}    # component -> sample -> check_name -> stats dict

        # csv.writer handles all quoting/escaping; newline='' per csv docs.
        with open(log_file, "w", newline="") as f:
            csv.writer(f).writerow(self._FIELDS)

    def log(self, component, sample_id, check_name, status, message, details="", stats=None):
        """Log a validation result with optional quantitative stats.

        Args:
            component: pipeline component name (e.g. "raw_reads").
            sample_id: sample name; if a whole CSV row string slipped
                through, the last comma-separated token is used.
            check_name: identifier of the individual check.
            status: severity flag ("GREEN", "YELLOW", "RED", "HALT").
            message: human-readable summary of the result.
            details: optional extra detail text.
            stats: optional dict of quantitative values for this check.
        """
        vv_logger = logging.getLogger('vv')  # shared configured logger

        # Extract just the sample name from a CSV row, if one was passed.
        sample_name = sample_id.split(',')[-1].strip() if ',' in sample_id else sample_id

        entry = {
            "component": component,
            "sample_id": sample_name,  # Use clean sample name
            "check_name": check_name,
            "status": status,
            "message": message,
            "details": details,
        }
        self.results.append(entry)

        # Track stats if provided.
        if stats:
            self.stats.setdefault(component, {}) \
                      .setdefault(sample_name, {})[check_name] = stats

        # Mirror the result to the text log (vv.log / console).
        if stats:
            # Quantitative check: one line per stat value.
            vv_logger.info(f"{component} - {sample_name} - {check_name}:")
            for stat_name, value in stats.items():
                vv_logger.info(f"  {stat_name}: {value}")
        else:
            # Qualitative check.
            vv_logger.info(f"{component} - {sample_name} - {check_name}: {status}")
            if details:
                vv_logger.info(f"  Details: {details}")

        # Append one CSV row; csv.writer correctly escapes commas, quotes
        # and newlines (the previous hand-rolled quoting missed several
        # cases, e.g. embedded quotes without commas).
        with open(self.log_file, "a", newline="") as f:
            csv.writer(f).writerow(
                [component, sample_name, check_name, status, message, details]
            )

    def get_status(self):
        """Get overall validation status: failed > warning > passed."""
        statuses = {r["status"] for r in self.results}
        if "HALT" in statuses or "RED" in statuses:
            return "failed"
        if "YELLOW" in statuses:
            return "warning"
        return "passed"
56
158
@click .command ()
57
159
@click .option ('--assay-type' , type = click .Choice (['rnaseq' , 'scrna' ]), default = 'rnaseq' )
58
160
@click .option ('--assay-suffix' , type = click .STRING , default = "_GLbulkRNAseq" )
@@ -79,8 +181,24 @@ def vv(assay_type, assay_suffix, runsheet_path, outdir, paired_end, mode, run_co
79
181
80
182
# Run validation if component specified
81
183
if run_components :
82
- with open ("VV_log.tsv" , "w" ) as f :
83
- f .write (f"Stub validation log for { run_components } \n " )
184
+ if 'raw_reads' in run_components :
185
+ # Convert paired_end string to bool
186
+ is_paired = paired_end .lower () == 'true' if isinstance (paired_end , str ) else paired_end
187
+
188
+ # Run validation
189
+ results = validate_raw_reads (
190
+ outdir = outdir ,
191
+ samples_txt = Path (runsheet_path ), # Using runsheet as samples file for now
192
+ paired_end = is_paired ,
193
+ assay_suffix = assay_suffix
194
+ )
195
+
196
+ # Create symlink from VV_log.csv to VV_log.tsv for Nextflow
197
+ if os .path .exists ("VV_log.tsv" ):
198
+ os .remove ("VV_log.tsv" )
199
+ os .symlink ("VV_log.csv" , "VV_log.tsv" )
200
+
201
+ return results
84
202
85
203
def stage_files (assay_type , section , ** file_paths ):
86
204
"""
@@ -122,6 +240,196 @@ def get_target_dir(structure, file_type):
122
240
# Implementation depends on your exact structure format
123
241
pass
124
242
243
def get_expected_files(sample_id: str, paired_end: bool, assay_suffix: str) -> dict:
    """
    Get expected file patterns for a sample.

    Args:
        sample_id: Sample name from runsheet
        paired_end: Whether data is paired-end
        assay_suffix: Assay suffix (e.g. "_GLbulkRNAseq")

    Returns:
        dict with keys "fastq" and "fastqc" (one entry per read, so two
        entries for paired-end) and "multiqc" (single-element list).
    """
    if paired_end:
        fastq = [f"{sample_id}_R1_raw.fastq.gz", f"{sample_id}_R2_raw.fastq.gz"]
        fastqc = [f"{sample_id}_R1_raw_fastqc.zip", f"{sample_id}_R2_raw_fastqc.zip"]
    else:
        fastq = [f"{sample_id}_raw.fastq.gz"]
        fastqc = [f"{sample_id}_raw_fastqc.zip"]

    expected = {
        # Raw FastQ files
        "fastq": fastq,
        # FastQC outputs
        "fastqc": fastqc,
        # MultiQC report (one per dataset, named by assay suffix)
        "multiqc": [f"raw_multiqc{assay_suffix}_report.zip"],
    }
    # Use the configured 'vv' logger; the root logger has no handlers here,
    # so a bare logging.debug() would be silently dropped.
    logging.getLogger('vv').debug(f"Expected files for {sample_id}: {expected}")
    return expected
270
+
271
def validate_raw_reads(outdir: Path,
                       samples_txt: Path,
                       paired_end: bool = True,
                       assay_suffix: str = "_GLbulkRNAseq") -> dict:
    """
    Original dp_tools raw reads validation checks.

    Args:
        outdir: Pipeline output directory containing 00-RawData/.
        samples_txt: Runsheet CSV with a 'Sample Name' column.
        paired_end: Whether the data is paired-end.
        assay_suffix: Suffix used in the MultiQC report file name.

    Returns:
        dict with overall "status" ('passed'/'warning'/'failed'), all
        check "messages", and "failures" mapping check name -> message
        for HALT/RED results.
    """
    # Use the configured 'vv' logger; bare logging.info() hits the
    # unconfigured root logger and the messages are dropped.
    log = logging.getLogger('vv')
    val_logger = ValidationLogger()
    read_counts = {}  # fastq path -> read count, for paired-end comparison

    # Log validation parameters
    log.info("Starting raw reads validation:")
    log.info(f"  Output directory: {outdir}")
    log.info(f"  Samples file: {samples_txt}")
    log.info(f"  Paired-end: {paired_end}")
    log.info(f"  Assay suffix: {assay_suffix}")

    try:
        # 1. Basic file existence checks
        fastq_dir = outdir / "00-RawData/Fastq"
        fastqc_dir = outdir / "00-RawData/FastQC_Reports"

        # Read samples from the runsheet's 'Sample Name' column
        samples = []
        with open(samples_txt) as f:
            reader = csv.DictReader(f)
            for row in reader:
                if 'Sample Name' not in row:
                    raise ValueError("Runsheet missing required 'Sample Name' column")
                sample_name = row['Sample Name'].strip()
                if sample_name:  # Only add non-empty sample names
                    samples.append(sample_name)
                    log.debug(f"Added sample: {sample_name}")

        log.info(f"Found {len(samples)} samples to validate")

        # 2. Check each sample's files
        for sample in samples:
            expected = get_expected_files(sample, paired_end, assay_suffix)
            log.info(f"\nValidating sample: {sample}")

            # 2a. FASTQ files: existence, gzip integrity, format
            for fastq in expected["fastq"]:
                fastq_path = fastq_dir / fastq

                # Check existence
                if not fastq_path.exists():
                    val_logger.log("raw_reads", sample, "file_exists", "HALT",
                                   f"Missing FastQ file: {fastq}")
                    continue

                # GZIP integrity: `gzip -t` exits non-zero on corruption
                # (diagnostics go to stderr); with check=True that raises,
                # so the success path simply logs GREEN.
                try:
                    subprocess.run(["gzip", "-t", str(fastq_path)],
                                   capture_output=True, check=True)
                    val_logger.log("raw_reads", sample, "gzip_integrity", "GREEN",
                                   "GZIP integrity check passed")
                except subprocess.CalledProcessError as e:
                    val_logger.log("raw_reads", sample, "gzip_integrity", "HALT",
                                   "GZIP integrity check failed", e.stderr.decode())

                # FASTQ format: every 4th line must be an '@' header
                try:
                    issues = []
                    total_reads = 0
                    lines_checked = 0  # explicit counter: safe on empty files
                    with gzip.open(fastq_path, "rt") as f:
                        for i, line in enumerate(f):
                            lines_checked = i + 1
                            if i % 4 == 0:  # Header lines only
                                total_reads += 1
                                if not line.startswith('@'):
                                    issues.append(i + 1)
                            if i % 2_000_000 == 0:  # Log progress
                                log.debug(f"Checked {i} lines in {fastq_path}")

                    stats = {
                        "total_reads": total_reads,
                        "invalid_headers": len(issues),
                        "checked_lines": lines_checked,
                    }

                    if issues:
                        shown = f"{issues[:10]}..." if len(issues) > 10 else f"{issues}"
                        val_logger.log("raw_reads", sample, "fastq_format", "HALT",
                                       f"Invalid FASTQ format - headers missing @ at lines: {shown}",
                                       stats=stats)
                    else:
                        val_logger.log("raw_reads", sample, "fastq_format", "GREEN",
                                       "FASTQ format check passed",
                                       stats=stats)

                    # Store read count for paired-end check later
                    read_counts[fastq_path] = total_reads

                except Exception as e:
                    val_logger.log("raw_reads", sample, "fastq_format", "HALT",
                                   "Error checking FASTQ format", str(e))

            # 2b. If paired-end, R1/R2 read counts must match
            if paired_end:
                try:
                    r1_path = fastq_dir / expected["fastq"][0]
                    r2_path = fastq_dir / expected["fastq"][1]

                    # Use counts collected during format checking
                    r1_count = read_counts.get(r1_path, 0)
                    r2_count = read_counts.get(r2_path, 0)

                    stats = {
                        "r1_reads": r1_count,
                        "r2_reads": r2_count,
                        "difference": abs(r1_count - r2_count),
                    }

                    if r1_count != r2_count:
                        val_logger.log("raw_reads", sample, "paired_counts", "HALT",
                                       "Paired read counts don't match",
                                       f"R1={r1_count}, R2={r2_count}",
                                       stats=stats)
                    else:
                        val_logger.log("raw_reads", sample, "paired_counts", "GREEN",
                                       f"Paired read counts match ({r1_count} reads)",
                                       stats=stats)
                except Exception as e:
                    val_logger.log("raw_reads", sample, "paired_counts", "HALT",
                                   "Error checking read counts", str(e))

            # 2c. FastQC outputs must exist
            for fastqc in expected["fastqc"]:
                fastqc_path = fastqc_dir / fastqc
                if not fastqc_path.exists():
                    val_logger.log("raw_reads", sample, "fastqc_exists", "HALT",
                                   f"Missing FastQC output: {fastqc}")
                else:
                    val_logger.log("raw_reads", sample, "fastqc_exists", "GREEN",
                                   f"FastQC output found: {fastqc}")

        # 3. MultiQC report (one per dataset). Computed directly from the
        # suffix rather than reusing `expected` from the last loop
        # iteration, which was unbound when the sample list was empty.
        multiqc = f"raw_multiqc{assay_suffix}_report.zip"
        multiqc_path = fastqc_dir / multiqc
        if not multiqc_path.exists():
            val_logger.log("raw_reads", "ALL", "multiqc_exists", "HALT",
                           f"Missing MultiQC report: {multiqc}")
        else:
            val_logger.log("raw_reads", "ALL", "multiqc_exists", "GREEN",
                           f"MultiQC report found: {multiqc}")

    except Exception as e:
        # Catch-all so a single unexpected error still yields a report.
        val_logger.log("raw_reads", "ALL", "validation", "HALT",
                       "Validation error", str(e))

    return {
        "status": val_logger.get_status(),
        "messages": [r["message"] for r in val_logger.results],
        "failures": {r["check_name"]: r["message"]
                     for r in val_logger.results
                     if r["status"] in ["HALT", "RED"]},
    }
432
+
125
433
# Script entry point: invoke the click command-line interface.
if __name__ == '__main__':
    vv()
127
435
0 commit comments