
Commit 54c4759

output format improvements

Parent: e633103

8 files changed: +58 -65 lines

checks.yaml

Lines changed: 5 additions & 1 deletion
@@ -1,6 +1,6 @@
 version: "1"
 validations:
-  - dataset: ch-local@[nyc_taxi.trips_small, nyc_taxi.trips_big]
+  - dataset: ch-local@[nyc_taxi.trips_small]
     where: "pickup_datetime > '2014-01-01'"
     checks:
       - id: row_count > 0
@@ -27,6 +27,10 @@ validations:
         description: "sum of value"
         severity: error
 
+      - id: countIf(trip_id == 1) == 1
+        description: "check trip id"
+        severity: warn
+
       - id: raw_query
         description: "some raw query description here"
         severity: error
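
For reference, each entry above decodes into the Check value that cmd/check.go iterates over. A minimal sketch of that type follows; only ID and Severity are visible in this commit's Go code, so the yaml tags and the Description field are assumptions:

```go
// Sketch of the Check type the YAML entries decode into; field names
// beyond ID and Severity (both used in cmd/check.go) are assumed here.
type Check struct {
	ID          string `yaml:"id"`
	Description string `yaml:"description"`
	Severity    string `yaml:"severity"` // "error" now fails the run; "warn" only reports
}
```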

cmd/check.go

Lines changed: 24 additions & 8 deletions
@@ -4,6 +4,7 @@ import (
 	"dbq/internal"
 	"fmt"
 	"log"
+	"os"
 	"strings"
 
 	"github.com/spf13/cobra"
@@ -28,9 +29,8 @@ By automating these checks, you can proactively identify and address data qualit
 			return fmt.Errorf("error while loading checks configuration file: %w", err)
 		}
 
-		for i, rule := range checksCfg.Validations {
-			log.Printf("Running check for %s [%d/%d]", rule.Dataset, i+1, len(checksCfg.Validations))
-
+		shouldFail := false
+		for _, rule := range checksCfg.Validations {
 			dataSourceId, datasets, err := parseDatasetString(rule.Dataset)
 			if err != nil {
 				return fmt.Errorf("error while parsing dataset property: %w", err)
@@ -42,18 +42,26 @@ By automating these checks, you can proactively identify and address data qualit
 			}
 
 			for dsIdx, dataset := range datasets {
-				log.Printf(" [%d/%d] Running checks for: %s", dsIdx+1, len(datasets), dataset)
-				for _, check := range rule.Checks {
-					_, err := app.RunCheck(&check, dataSource, dataset, rule.Where)
+				log.Printf("[%d/%d] Running quality checks for: %s", dsIdx+1, len(datasets), dataset)
+				for cIdx, check := range rule.Checks {
+					pass, _, err := app.RunCheck(&check, dataSource, dataset, rule.Where)
 					if err != nil {
 						log.Printf("Failed to run check: %s", err.Error())
 					}
-					// todo: act on check result
-					// if check.Severity {...}
+
+					log.Printf(" [%d/%d] '%s': %s", cIdx+1, len(rule.Checks), check.ID, getCheckResultLabel(pass))
+					if !pass && check.Severity == "error" {
+						shouldFail = true
+					}
 				}
 			}
 		}
 
+		if shouldFail {
+			log.Printf("One or more checks with 'error' severity have failed, exiting...")
+			os.Exit(1)
+		}
+
 		return nil
 	},
 }
@@ -98,3 +106,11 @@ func parseDatasetString(input string) (datasource string, datasets []string, err
 
 	return datasource, datasets, nil
 }
+
+func getCheckResultLabel(passed bool) string {
+	if passed {
+		return "passed"
+	} else {
+		return "failed"
+	}
+}
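
A design note on the new exit path: os.Exit(1) inside cobra's RunE terminates the process immediately, skipping any deferred cleanup and cobra's own error reporting. A hedged alternative sketch (ErrChecksFailed and run are illustrative names, not part of this commit) returns a sentinel error and decides the exit code in main:

```go
package main

import (
	"errors"
	"fmt"
	"os"
)

// ErrChecksFailed is a hypothetical sentinel error; returning it from
// RunE instead of calling os.Exit(1) lets deferred cleanup run and
// keeps the process exit-code decision in one place.
var ErrChecksFailed = errors.New("one or more checks with 'error' severity failed")

func run(checksFailed bool) error {
	if checksFailed {
		return ErrChecksFailed
	}
	return nil
}

func main() {
	if err := run(true); err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1) // a non-zero exit still gates CI pipelines
	}
}
```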

internal/app.go

Lines changed: 3 additions & 3 deletions
@@ -16,7 +16,7 @@ type DbqApp interface {
 	GetDbqConfig() *DbqConfig
 	SaveDbqConfig() error
 	FindDataSourceById(srcId string) *DataSource
-	RunCheck(check *Check, dataSource *DataSource, dataset string, defaultWhere string) (string, error)
+	RunCheck(check *Check, dataSource *DataSource, dataset string, defaultWhere string) (bool, string, error)
 }
 
 type DbqAppImpl struct {
@@ -94,10 +94,10 @@ func (app *DbqAppImpl) FindDataSourceById(srcId string) *DataSource {
 	return nil
 }
 
-func (app *DbqAppImpl) RunCheck(check *Check, dataSource *DataSource, dataset string, defaultWhere string) (string, error) {
+func (app *DbqAppImpl) RunCheck(check *Check, dataSource *DataSource, dataset string, defaultWhere string) (bool, string, error) {
 	cnn, err := getDbqConnector(*dataSource)
 	if err != nil {
-		return "", err
+		return false, "", err
 	}
 	return cnn.RunCheck(check, dataset, defaultWhere)
 }
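
Since DbqApp is an interface, the widened signature stays easy to stub in tests. A minimal sketch (fakeApp and its field are hypothetical; the other DbqApp methods are omitted for brevity):

```go
// fakeApp stubs the updated RunCheck contract so the cmd/check.go loop
// can be exercised without a live database; only the signature itself
// comes from this commit.
type fakeApp struct {
	pass bool
}

func (f *fakeApp) RunCheck(check *Check, dataSource *DataSource, dataset string, defaultWhere string) (bool, string, error) {
	return f.pass, "", nil
}
```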

internal/checks_config.go

Lines changed: 0 additions & 3 deletions
@@ -2,7 +2,6 @@ package internal
 
 import (
 	"gopkg.in/yaml.v3"
-	"log"
 	"os"
 )
 
@@ -28,14 +27,12 @@ func LoadChecksConfig(fileName string) (*ChecksConfig, error) {
 	file, err := os.Open(fileName)
 	defer file.Close()
 	if err != nil {
-		log.Printf("Error opening file: %v\n", err)
 		return nil, err
 	}
 
 	var cfg ChecksConfig
 	decoder := yaml.NewDecoder(file)
 	if err := decoder.Decode(&cfg); err != nil {
-		log.Printf("Error decoding YAML: %v\n", err)
 		return nil, err
 	}
 
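
With the in-place logging gone, callers now see only the bare os/yaml errors. A sketch of the same loader with wrapped errors for context (the message text is illustrative, and "fmt" would need adding to the import block); it also moves the defer after the error check, which the original only gets away with because Close on a nil *os.File returns os.ErrInvalid rather than panicking:

```go
// Sketch: wrap errors with %w so callers keep the file-name context,
// matching the wrapping style already used in cmd/check.go.
func LoadChecksConfig(fileName string) (*ChecksConfig, error) {
	file, err := os.Open(fileName)
	if err != nil {
		return nil, fmt.Errorf("opening checks config %q: %w", fileName, err)
	}
	defer file.Close() // deferred only once Open has succeeded

	var cfg ChecksConfig
	if err := yaml.NewDecoder(file).Decode(&cfg); err != nil {
		return nil, fmt.Errorf("decoding checks config %q: %w", fileName, err)
	}
	return &cfg, nil
}
```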

internal/clickhouse.go

Lines changed: 18 additions & 18 deletions
@@ -7,6 +7,7 @@ import (
 	"github.com/ClickHouse/clickhouse-go/v2"
 	"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
 	"log"
+	"log/slog"
 	"regexp"
 	"strings"
 	"time"
@@ -101,15 +102,15 @@ func (c *ClickhouseDbqConnector) ProfileDataset(dataset string) (*TableMetrics,
 		ColumnsMetrics: make(map[string]*ColumnMetrics),
 	}
 
-	log.Printf("Calculating metrics for table: %s", dataset)
+	slog.Debug("Calculating metrics for table:", dataset)
 
 	// Total Row Count
-	log.Printf("Fetching total row count...")
+	slog.Debug("Fetching total row count...")
 	err := c.cnn.QueryRow(ctx, fmt.Sprintf("SELECT count() FROM %s", dataset)).Scan(&metrics.TotalRows)
 	if err != nil {
 		return nil, fmt.Errorf("failed to get total row count for %s: %w", dataset, err)
 	}
-	log.Printf("Total rows: %d", metrics.TotalRows)
+	slog.Debug("Total rows: %d", metrics.TotalRows)
 
 	// Get Column Information (Name and Type)
 	columnsToProcess, err := fetchColumns(c.cnn, ctx, databaseName, tableName)
@@ -118,7 +119,7 @@ func (c *ClickhouseDbqConnector) ProfileDataset(dataset string) (*TableMetrics,
 	}
 
 	if len(columnsToProcess) == 0 {
-		log.Printf("Warning: No columns found for table %s. Returning basic info.", dataset)
+		slog.Warn("Warning: No columns found for table %s. Returning basic info.", dataset)
 		metrics.ProfilingDurationMs = time.Since(startTime).Milliseconds()
 		return metrics, nil
 	}
@@ -221,40 +222,39 @@ func (c *ClickhouseDbqConnector) ProfileDataset(dataset string) (*TableMetrics,
 	return metrics, nil
 }
 
-func (c *ClickhouseDbqConnector) RunCheck(check *Check, dataset string, defaultWhere string) (string, error) {
+func (c *ClickhouseDbqConnector) RunCheck(check *Check, dataset string, defaultWhere string) (bool, string, error) {
 	if c.cnn == nil {
-		return "", fmt.Errorf("database connection is not initialized")
+		return false, "", fmt.Errorf("database connection is not initialized")
 	}
 
 	query, err := generateDataCheckQuery(check, dataset, defaultWhere)
 	if err != nil {
-		return "", fmt.Errorf("failed to generate SQL for check (%s)/(%s): %s", check.ID, dataset, err.Error())
+		return false, "", fmt.Errorf("failed to generate SQL for check (%s)/(%s): %s", check.ID, dataset, err.Error())
 	}
 
-	log.Printf("Executing SQL for '%s': %s", check.ID, query)
+	// todo: debug
+	// log.Printf("Executing SQL for '%s': %s", check.ID, query)
 
-	startTime := time.Now()
+	// startTime := time.Now()
 	rows, err := c.cnn.Query(context.Background(), query)
 	if err != nil {
-		return "", fmt.Errorf("failed to query database: %w", err)
+		return false, "", fmt.Errorf("failed to query database: %w", err)
 	}
 	defer rows.Close()
-	elapsed := time.Since(startTime).Milliseconds()
+	// _ := time.Since(startTime).Milliseconds()
 
+	var checkPassed bool
 	for rows.Next() {
-		var checkPassed bool
 		if err := rows.Scan(&checkPassed); err != nil {
-			return "", fmt.Errorf("failed to scan row: %w", err)
+			return false, "", fmt.Errorf("failed to scan row: %w", err)
 		}
-		log.Printf("Check passed: %t (in %d ms)", checkPassed, elapsed)
-		log.Printf("---")
 	}
 
 	if err = rows.Err(); err != nil {
-		return "", fmt.Errorf("error occurred during row iteration: %w", err)
+		return false, "", fmt.Errorf("error occurred during row iteration: %w", err)
 	}
 
-	return "", nil
+	return checkPassed, "", nil
 }
 
 func fetchColumns(cnn driver.Conn, ctx context.Context, databaseName string, tableName string) ([]ColumnInfo, error) {
@@ -379,7 +379,7 @@ func isStringCHType(dataType string) bool {
 
 func startWithAnyOf(prefixes []string, s string) bool {
 	for _, prefix := range prefixes {
-		if strings.HasPrefix(s, prefix) {
+		if strings.HasPrefix(s, strings.ToLower(prefix)) {
 			return true
 		}
 	}
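
One caveat with the new slog calls: slog does not interpolate printf verbs, so "Total rows: %d" is logged literally and the trailing argument becomes a stray, badly-keyed attribute. A small self-contained sketch of structured equivalents (message wording is illustrative):

```go
package main

import "log/slog"

func main() {
	dataset := "nyc_taxi.trips_small"
	var totalRows uint64 = 42

	// slog takes alternating key/value pairs, not printf verbs. Note
	// that Debug sits below the default Info level, so a handler set
	// to slog.LevelDebug is needed for the Debug lines to appear.
	slog.Debug("calculating metrics for table", "table", dataset)
	slog.Debug("fetched total row count", "rows", totalRows)
	slog.Warn("no columns found, returning basic info", "table", dataset)
}
```

Separately, the startWithAnyOf fix lowercases only the prefix; if s can arrive in mixed case, lowercasing s as well would make the match fully case-insensitive.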

internal/dbq_config.go

Lines changed: 0 additions & 24 deletions
@@ -1,11 +1,5 @@
 package internal
 
-import (
-	"gopkg.in/yaml.v3"
-	"log"
-	"os"
-)
-
 type DbqConfig struct {
 	Version string `yaml:"version"`
 	DataSources []DataSource `yaml:"datasources"`
@@ -25,21 +19,3 @@ type ConfigDetails struct {
 	Password string `yaml:"password"`
 	Database string `yaml:"database,omitempty"`
 }
-
-func LoadDbqSetting(fileName string) (*DbqConfig, error) {
-	file, err := os.Open(fileName)
-	defer file.Close()
-	if err != nil {
-		log.Printf("Error opening file: %v\n", err)
-		return nil, err
-	}
-
-	var settings DbqConfig
-	decoder := yaml.NewDecoder(file)
-	if err := decoder.Decode(&settings); err != nil {
-		log.Printf("Error decoding YAML: %v\n", err)
-		return nil, err
-	}
-
-	return &settings, nil
-}

internal/dbq_connector.go

Lines changed: 7 additions & 7 deletions
@@ -4,7 +4,7 @@ type DbqConnector interface {
 	Ping() (string, error)
 	ImportDatasets(filter string) ([]string, error)
 	ProfileDataset(dataset string) (*TableMetrics, error)
-	RunCheck(check *Check, dataset string, defaultWhere string) (string, error)
+	RunCheck(check *Check, dataset string, defaultWhere string) (bool, string, error)
 }
 
 const (
@@ -16,12 +16,12 @@ type ColumnMetrics struct {
 	ColumnComment string `json:"column_comment"`
 	DataType string `json:"data_type"`
 	NullCount uint64 `json:"null_count"`
-	BlankCount *int64 `json:"blank_count,omitempty"` // Applicable only for String types
-	MinValue *float64 `json:"min_value,omitempty"` // Numeric only
-	MaxValue *float64 `json:"max_value,omitempty"` // Numeric only
-	AvgValue *float64 `json:"avg_value,omitempty"` // Numeric only
-	StddevValue *float64 `json:"stddev_value,omitempty"` // Numeric only (Population StdDev)
-	MostFrequentValue *string `json:"most_frequent_value,omitempty"` // Using NullString to handle NULL as most frequent
+	BlankCount *int64 `json:"blank_count,omitempty"` // string only
+	MinValue *float64 `json:"min_value,omitempty"` // numeric only
+	MaxValue *float64 `json:"max_value,omitempty"` // numeric only
+	AvgValue *float64 `json:"avg_value,omitempty"` // numeric only
+	StddevValue *float64 `json:"stddev_value,omitempty"` // numeric only (Population StdDev)
+	MostFrequentValue *string `json:"most_frequent_value,omitempty"` // pointer to handle NULL as most frequent
 	ProfilingDurationMs int64 `json:"profiling_duration_ms"`
 }
 
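
The pointer fields here are what keep "metric not applicable" distinguishable from "metric equals zero" in the JSON output: a nil pointer is dropped by omitempty, while a pointer to zero is still emitted. A self-contained illustration using an assumed two-field slice of the struct:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Two representative fields from ColumnMetrics, enough to show the
// nil-pointer-versus-zero distinction.
type columnMetrics struct {
	NullCount uint64   `json:"null_count"`
	MinValue  *float64 `json:"min_value,omitempty"`
}

func main() {
	zero := 0.0
	a, _ := json.Marshal(columnMetrics{NullCount: 3})
	b, _ := json.Marshal(columnMetrics{NullCount: 3, MinValue: &zero})
	fmt.Println(string(a)) // {"null_count":3}, min_value omitted
	fmt.Println(string(b)) // {"null_count":3,"min_value":0}
}
```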

readme.md

Lines changed: 1 addition & 1 deletion
@@ -28,11 +28,11 @@ It is designed to be easy to use and integrate into your existing workflow.
 - [x] implement aliases for common checks based on raw sql check
 - [x] fix cmd descriptions
 - [x] review todos
+- [x] improve output
 - [ ] basic cross validation (dataset is defined)
 - [ ] review logs
 - [ ] review crashes (wrong arguments)
 - [ ] default values (e.g. severity)
-- [ ] improve output
 - [ ] quiet/verbose mode for logs
 - [ ] docs
