Skip to content

Add gotData and endOfProcess handlers #1564

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions conn_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ type onProcess struct {
progress func(*Progress)
profileInfo func(*ProfileInfo)
profileEvents func([]ProfileEvent)
gotData func()
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the difference of gotData vs data? Other than the function signature?

Copy link
Author

@velom velom Jun 2, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

data is "internal" handler set by library itself during query start:

var (
errors = make(chan error, 1)
stream = make(chan *proto.Block, bufferSize)
)
go func() {
onProcess.data = func(b *proto.Block) {
stream <- b
}
err := c.process(ctx, onProcess)
if err != nil {
c.debugf("[query] process error: %v", err)
errors <- err
}
close(stream)
close(errors)
release(c, err)
}()

endOfProcess func()
}

func (c *connect) firstBlock(ctx context.Context, on *onProcess) (*proto.Block, error) {
Expand Down Expand Up @@ -143,6 +145,9 @@ func (c *connect) process(ctx context.Context, on *onProcess) error {
func (c *connect) processImpl(ctx context.Context, on *onProcess) error {
c.readerMutex.Lock()
defer c.readerMutex.Unlock()
if on.endOfProcess != nil {
defer on.endOfProcess()
}

for {
if c.reader == nil {
Expand Down Expand Up @@ -178,6 +183,9 @@ func (c *connect) handle(ctx context.Context, packet byte, on *onProcess) error
}
if block.Rows() != 0 && on.data != nil {
on.data(block)
if on.gotData != nil {
on.gotData()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
on.gotData()
on.gotData(block)

why doesn't gotData take block?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you'd like to, I can add it

}
}
case proto.ServerException:
return c.exception()
Expand Down
29 changes: 28 additions & 1 deletion context.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,9 @@ import (
"maps"
"time"

"github.com/ClickHouse/clickhouse-go/v2/ext"
"go.opentelemetry.io/otel/trace"

"github.com/ClickHouse/clickhouse-go/v2/ext"
)

var _contextOptionKey = &QueryOptions{
Expand Down Expand Up @@ -59,6 +60,8 @@ type (
progress func(*Progress)
profileInfo func(*ProfileInfo)
profileEvents func([]ProfileEvent)
gotData func()
endOfProcess func()
}
settings Settings
parameters Parameters
Expand Down Expand Up @@ -147,6 +150,20 @@ func WithProfileEvents(fn func([]ProfileEvent)) QueryOption {
}
}

func WithGotData(fn func()) QueryOption {
return func(o *QueryOptions) error {
o.events.gotData = fn
return nil
}
}

func WithEndOfProcess(fn func()) QueryOption {
return func(o *QueryOptions) error {
o.events.endOfProcess = fn
return nil
}
}

func WithExternalTable(t ...*ext.Table) QueryOption {
return func(o *QueryOptions) error {
o.external = append(o.external, t...)
Expand Down Expand Up @@ -273,6 +290,16 @@ func (q *QueryOptions) onProcess() *onProcess {
q.events.profileEvents(events)
}
},
gotData: func() {
if q.events.gotData != nil {
q.events.gotData()
}
},
endOfProcess: func() {
if q.events.endOfProcess != nil {
q.events.endOfProcess()
}
},
}
}

Expand Down
53 changes: 53 additions & 0 deletions examples/clickhouse_api/end_of_process.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
// Licensed to ClickHouse, Inc. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. ClickHouse, Inc. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package clickhouse_api

import (
"context"
"fmt"

"github.com/ClickHouse/clickhouse-go/v2"
)

func EndOfProcessAndGotData() error {
conn, err := GetNativeConnection(clickhouse.Settings{
"send_logs_level": "trace",
}, nil, nil)
if err != nil {
return err
}
var totalBlocks int
// use context to pass a call back for end of process and got data
ctx := clickhouse.Context(context.Background(), clickhouse.WithEndOfProcess(func() {
fmt.Println("process is finished")
}), clickhouse.WithGotData(func() {
totalBlocks++
}))

rows, err := conn.Query(ctx, "SELECT number from numbers(1000000) LIMIT 1000000")
if err != nil {
return err
}
defer rows.Close()

for rows.Next() {
}

fmt.Printf("Total data blocks: %d\n", totalBlocks)
return rows.Err()
}
10 changes: 8 additions & 2 deletions examples/clickhouse_api/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,13 @@ package clickhouse_api
import (
"context"
"fmt"
clickhouse_tests "github.com/ClickHouse/clickhouse-go/v2/tests"
"github.com/stretchr/testify/require"
"os"
"strconv"
"testing"

"github.com/stretchr/testify/require"

clickhouse_tests "github.com/ClickHouse/clickhouse-go/v2/tests"
)

func TestMain(m *testing.M) {
Expand Down Expand Up @@ -237,3 +239,7 @@ func TestJSONStringExample(t *testing.T) {
t.Skip("client cannot receive JSON strings")
require.NoError(t, JSONStringExample())
}

func TestEndOfProcessAndGotBlock(t *testing.T) {
require.NoError(t, EndOfProcessAndGotData())
}
53 changes: 53 additions & 0 deletions examples/std/end_of_process.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
// Licensed to ClickHouse, Inc. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. ClickHouse, Inc. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package std

import (
"context"
"fmt"

"github.com/ClickHouse/clickhouse-go/v2"
)

func EndOfProcessAndGotData() error {
conn, err := GetStdOpenDBConnection(clickhouse.Native, clickhouse.Settings{
"send_logs_level": "trace",
}, nil, nil)
if err != nil {
return err
}
var totalBlocks int
// use context to pass a call back for end of process and got data
ctx := clickhouse.Context(context.Background(), clickhouse.WithEndOfProcess(func() {
fmt.Println("process is finished")
}), clickhouse.WithGotData(func() {
totalBlocks++
}))

rows, err := conn.QueryContext(ctx, "SELECT number from numbers(1000000) LIMIT 1000000")
if err != nil {
return err
}
defer rows.Close()

for rows.Next() {
}

fmt.Printf("Total data blocks: %d\n", totalBlocks)
return rows.Err()
}
7 changes: 6 additions & 1 deletion examples/std/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,9 @@ import (
"testing"
"time"

clickhouse_tests "github.com/ClickHouse/clickhouse-go/v2/tests"
"github.com/stretchr/testify/require"

clickhouse_tests "github.com/ClickHouse/clickhouse-go/v2/tests"
)

func TestMain(m *testing.M) {
Expand Down Expand Up @@ -169,3 +170,7 @@ func TestJSONStringExample(t *testing.T) {
t.Skip("client cannot receive JSON strings")
require.NoError(t, JSONStringExample())
}

func TestEndOfProcessAndGotBlock(t *testing.T) {
require.NoError(t, EndOfProcessAndGotData())
}
Loading