Skip to content

Commit 10c5b26

Browse files
committed
Support data sampling and some cleanup
1 parent 6103cf0 commit 10c5b26

File tree

10 files changed

+91
-32
lines changed

10 files changed

+91
-32
lines changed

checks.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
version: "1"
22
validations:
3-
- dataset: ch-local@[nyc_taxi.trips_small]
3+
- dataset: ch@[nyc_taxi.trips_small]
44
where: "pickup_datetime > '2014-01-01'"
55
checks:
66
- id: row_count > 0

cmd/check.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ By automating these checks, you can proactively identify and address data qualit
5050
}
5151

5252
log.Printf(" [%d/%d] '%s': %s", cIdx+1, len(rule.Checks), check.ID, getCheckResultLabel(pass))
53-
if !pass && internal.IdOrDefault(string(check.OnFail), internal.OnFailActionError) == "error" {
53+
if !pass && internal.IdOrDefault(string(check.OnFail), internal.OnFailActionError) == internal.OnFailActionError {
5454
exitCode = 1
5555
}
5656
}

cmd/import.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ This command is useful for quickly onboarding data from external systems, allowi
5858

5959
cmd.Flags().StringVarP(&dataSource, "datasource", "d", "", "Datasource from which datasets will be imported")
6060
cmd.Flags().StringVarP(&filter, "filter", "f", "", "Filter applied for dataset selection")
61-
cmd.Flags().BoolVarP(&updateCfg, "update-checks", "u", false, "Update checks config file in place")
61+
cmd.Flags().BoolVarP(&updateCfg, "update-config", "u", false, "Update dbq config file in place")
6262

6363
return cmd
6464
}

cmd/profile.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
func NewProfileCommand(app internal.DbqApp) *cobra.Command {
1111
var dataSource string
1212
var dataSet string
13+
var sample bool
1314

1415
cmd := &cobra.Command{
1516
Use: "profile",
@@ -39,7 +40,7 @@ and helps in making better decisions about data processing and analysis.
3940
}
4041

4142
for _, curDataSet := range dataSetsToProfile {
42-
metrics, err := app.ProfileDataset(dataSource, curDataSet)
43+
metrics, err := app.ProfileDataset(dataSource, curDataSet, sample)
4344
if err != nil {
4445
log.Printf("Failed to profile %s: %s\n", curDataSet, err)
4546
} else {
@@ -62,6 +63,7 @@ and helps in making better decisions about data processing and analysis.
6263
_ = cmd.MarkFlagRequired("datasource")
6364

6465
cmd.Flags().StringVarP(&dataSet, "dataset", "s", "", "Dataset within specified data source")
66+
cmd.Flags().BoolVarP(&sample, "sample", "m", false, "Include data samples in profiling report")
6567

6668
return cmd
6769
}

cmd/root.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,14 @@ package cmd
22

33
import (
44
"dbq/internal"
5+
"log/slog"
56
"os"
67

78
"github.com/spf13/cobra"
89
)
910

11+
var verbose bool
12+
1013
var rootCmd = &cobra.Command{
1114
Use: "dbq",
1215
Short: "dbq is a CLI tool for profiling data and running quality checks across various data sources",
@@ -25,10 +28,15 @@ func AddCommands(app internal.DbqApp) {
2528
rootCmd.AddCommand(NewCheckCommand(app))
2629
rootCmd.AddCommand(NewProfileCommand(app))
2730
rootCmd.AddCommand(NewVersionCommand())
31+
32+
if verbose {
33+
app.SetLogLevel(slog.LevelInfo)
34+
}
2835
}
2936

3037
func init() {
3138
// workaround for bootstrap config flag & unsupported flag issue
3239
var dbqConfigFile string
3340
rootCmd.PersistentFlags().StringVar(&dbqConfigFile, "config", "", "config file (default is $HOME/.dbq.yaml or ./dbq.yaml)")
41+
rootCmd.PersistentFlags().BoolVarP(&verbose, "verbose", "v", false, "Enables verbose logging")
3442
}

dbq.yaml

Lines changed: 1 addition & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
version: "1"
22
datasources:
3-
- id: ch-local
3+
- id: ch
44
type: clickhouse
55
configuration:
66
host: 0.0.0.0:19000
@@ -9,14 +9,4 @@ datasources:
99
password: changeme
1010
database: default
1111
datasets:
12-
- nyc_taxi.trips_big
1312
- nyc_taxi.trips_small
14-
- id: pgsql
15-
type: postgres
16-
configuration:
17-
host: 127.0.0.1
18-
port: 9004
19-
username: simple
20-
password: simple_pass
21-
datasets:
22-
- public.table_1

internal/app.go

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,30 +5,34 @@ import (
55
"github.com/spf13/cobra"
66
"github.com/spf13/viper"
77
"gopkg.in/yaml.v3"
8+
"log/slog"
89
"os"
910
"strings"
1011
)
1112

1213
type DbqApp interface {
1314
PingDataSource(srcId string) (string, error)
1415
ImportDatasets(srcId string, filter string) ([]string, error)
15-
ProfileDataset(srcId string, dataset string) (*TableMetrics, error)
16+
ProfileDataset(srcId string, dataset string, sample bool) (*TableMetrics, error)
1617
GetDbqConfig() *DbqConfig
1718
SaveDbqConfig() error
1819
FindDataSourceById(srcId string) *DataSource
1920
RunCheck(check *Check, dataSource *DataSource, dataset string, defaultWhere string) (bool, string, error)
21+
SetLogLevel(level slog.Level)
2022
}
2123

2224
type DbqAppImpl struct {
2325
dbqConfigPath string
2426
dbqConfig *DbqConfig
27+
logLevel slog.Level
2528
}
2629

2730
func NewDbqApp(dbqConfigPath string) DbqApp {
2831
dbqConfig, dbqConfigUsedPath := initConfig(dbqConfigPath)
2932
return &DbqAppImpl{
3033
dbqConfigPath: dbqConfigUsedPath,
3134
dbqConfig: dbqConfig,
35+
logLevel: slog.LevelError,
3236
}
3337
}
3438

@@ -58,13 +62,13 @@ func (app *DbqAppImpl) ImportDatasets(srcId string, filter string) ([]string, er
5862
return cnn.ImportDatasets(filter)
5963
}
6064

61-
func (app *DbqAppImpl) ProfileDataset(srcId string, dataset string) (*TableMetrics, error) {
65+
func (app *DbqAppImpl) ProfileDataset(srcId string, dataset string, sample bool) (*TableMetrics, error) {
6266
var dataSource = app.FindDataSourceById(srcId)
6367
cnn, err := getDbqConnector(*dataSource)
6468
if err != nil {
6569
return nil, err
6670
}
67-
return cnn.ProfileDataset(dataset)
71+
return cnn.ProfileDataset(dataset, sample)
6872
}
6973

7074
func (app *DbqAppImpl) GetDbqConfig() *DbqConfig {
@@ -102,6 +106,10 @@ func (app *DbqAppImpl) RunCheck(check *Check, dataSource *DataSource, dataset st
102106
return cnn.RunCheck(check, dataset, defaultWhere)
103107
}
104108

109+
func (app *DbqAppImpl) SetLogLevel(logLevel slog.Level) {
110+
app.logLevel = logLevel
111+
}
112+
105113
func initConfig(dbqConfigPath string) (*DbqConfig, string) {
106114
v := viper.New()
107115

internal/clickhouse.go

Lines changed: 50 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
99
"log"
1010
"log/slog"
11+
"reflect"
1112
"regexp"
1213
"strings"
1314
"time"
@@ -84,7 +85,7 @@ func (c *ClickhouseDbqConnector) ImportDatasets(filter string) ([]string, error)
8485
return datasets, nil
8586
}
8687

87-
func (c *ClickhouseDbqConnector) ProfileDataset(dataset string) (*TableMetrics, error) {
88+
func (c *ClickhouseDbqConnector) ProfileDataset(dataset string, sample bool) (*TableMetrics, error) {
8889
startTime := time.Now()
8990
ctx := context.Background()
9091

@@ -112,6 +113,46 @@ func (c *ClickhouseDbqConnector) ProfileDataset(dataset string) (*TableMetrics,
112113
}
113114
slog.Debug("Total rows: %d", metrics.TotalRows)
114115

116+
// sample data if enabled
117+
if sample {
118+
sampleQuery := fmt.Sprintf("select * from %s.%s order by rand() limit 100", databaseName, tableName)
119+
120+
toCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
121+
defer cancel()
122+
123+
rows, err := c.cnn.Query(toCtx, sampleQuery)
124+
if err != nil {
125+
log.Printf("Warning: Failed to sample data %s: %v", err)
126+
}
127+
defer rows.Close()
128+
129+
var allRows []map[string]interface{}
130+
for rows.Next() {
131+
scanArgs := make([]interface{}, len(rows.Columns()))
132+
for i, colType := range rows.ColumnTypes() {
133+
scanType := colType.ScanType()
134+
valuePtr := reflect.New(scanType).Interface()
135+
scanArgs[i] = valuePtr
136+
}
137+
138+
err = rows.Scan(scanArgs...)
139+
if err != nil {
140+
log.Printf("Warning: Failed to scan row: %v", err)
141+
continue
142+
}
143+
144+
rowData := make(map[string]interface{})
145+
for i, colName := range rows.Columns() {
146+
scannedValue := reflect.ValueOf(scanArgs[i]).Elem().Interface()
147+
rowData[colName] = scannedValue
148+
}
149+
150+
allRows = append(allRows, rowData)
151+
}
152+
153+
metrics.RowsSample = allRows
154+
}
155+
115156
// Get Column Information (Name and Type)
116157
columnsToProcess, err := fetchColumns(c.cnn, ctx, databaseName, tableName)
117158
if err != nil {
@@ -131,9 +172,10 @@ func (c *ClickhouseDbqConnector) ProfileDataset(dataset string) (*TableMetrics,
131172
colStartTime := time.Now()
132173
log.Printf("Processing column: %s (Type: %s)", col.Name, col.Type)
133174
colMetrics := &ColumnMetrics{
134-
ColumnName: col.Name,
135-
DataType: col.Type,
136-
ColumnComment: col.Comment,
175+
ColumnName: col.Name,
176+
DataType: col.Type,
177+
ColumnComment: col.Comment,
178+
ColumnPosition: col.Position,
137179
}
138180

139181
// Null Count (all types)
@@ -259,7 +301,7 @@ func (c *ClickhouseDbqConnector) RunCheck(check *Check, dataset string, defaultW
259301

260302
func fetchColumns(cnn driver.Conn, ctx context.Context, databaseName string, tableName string) ([]ColumnInfo, error) {
261303
columnQuery := `
262-
SELECT name, type, comment
304+
SELECT name, type, comment, position
263305
FROM system.columns
264306
WHERE database = ? AND table = ?
265307
ORDER BY position`
@@ -273,10 +315,11 @@ func fetchColumns(cnn driver.Conn, ctx context.Context, databaseName string, tab
273315
var cols []ColumnInfo
274316
for rows.Next() {
275317
var colName, colType, comment string
276-
if err := rows.Scan(&colName, &colType, &comment); err != nil {
318+
var pos uint64
319+
if err := rows.Scan(&colName, &colType, &comment, &pos); err != nil {
277320
return nil, fmt.Errorf("failed to scan column info: %w", err)
278321
}
279-
cols = append(cols, ColumnInfo{Name: colName, Type: colType, Comment: comment})
322+
cols = append(cols, ColumnInfo{Name: colName, Type: colType, Comment: comment, Position: uint(pos)})
280323
}
281324

282325
if err = rows.Err(); err != nil {

internal/dbq_connector.go

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ package internal
33
type DbqConnector interface {
44
Ping() (string, error)
55
ImportDatasets(filter string) ([]string, error)
6-
ProfileDataset(dataset string) (*TableMetrics, error)
6+
ProfileDataset(dataset string, sample bool) (*TableMetrics, error)
77
RunCheck(check *Check, dataset string, defaultWhere string) (bool, string, error)
88
}
99

@@ -12,8 +12,9 @@ const (
1212
)
1313

1414
type ColumnMetrics struct {
15-
ColumnName string `json:"column_name"`
16-
ColumnComment string `json:"column_comment"`
15+
ColumnName string `json:"col_name"`
16+
ColumnComment string `json:"col_comment"`
17+
ColumnPosition uint `json:"col_position"`
1718
DataType string `json:"data_type"`
1819
NullCount uint64 `json:"null_count"`
1920
BlankCount *int64 `json:"blank_count,omitempty"` // string only
@@ -31,13 +32,15 @@ type TableMetrics struct {
3132
DatabaseName string `json:"database_name"`
3233
TotalRows uint64 `json:"total_rows"`
3334
ColumnsMetrics map[string]*ColumnMetrics `json:"columns_metrics"`
35+
RowsSample []map[string]interface{} `json:"rows_sample"`
3436
ProfilingDurationMs int64 `json:"profiling_duration_ms"`
3537
}
3638

3739
type ColumnInfo struct {
38-
Name string
39-
Type string
40-
Comment string
40+
Name string
41+
Type string
42+
Comment string
43+
Position uint
4144
}
4245

4346
type ProfileResultOutput struct {

readme.md

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,11 @@ datasources:
5454
5555
### Checks example
5656
57+
```yaml
58+
# checks.yaml
59+
60+
```
61+
5762
### Commands
5863

5964
```bash
@@ -76,9 +81,9 @@ Available Commands:
7681
Flags:
7782
--config string config file (default is $HOME/.dbq.yaml or ./dbq.yaml)
7883
-h, --help help for dbq
84+
-v, --verbose Enables verbose logging
7985
8086
Use "dbq [command] --help" for more information about a command.
81-
8287
```
8388

8489
### Quick start

0 commit comments

Comments
 (0)