@@ -20,36 +20,56 @@ package column
20
20
import (
21
21
"database/sql/driver"
22
22
"fmt"
23
- "github.com/ClickHouse/ch-go/proto"
24
- "github.com/ClickHouse/clickhouse-go/v2/lib/chcol"
25
23
"math"
26
24
"reflect"
25
+ "strconv"
27
26
"strings"
27
+
28
+ "github.com/ClickHouse/ch-go/proto"
29
+ "github.com/ClickHouse/clickhouse-go/v2/lib/chcol"
28
30
)
29
31
30
- const SupportedDynamicSerializationVersion = 3
31
- const DefaultMaxDynamicTypes = 32
32
+ const DynamicSerializationVersion = 3
33
+ const DynamicDeprecatedSerializationVersion = 1
32
34
const DynamicNullDiscriminator = - 1 // The Null index changes as data is being built, use -1 as placeholder for writes.
35
+ const DefaultMaxDynamicTypes = 32
36
+
37
+ func supportsFlatDynamicJSON (sc * ServerContext ) bool {
38
+ return sc .VersionMajor >= 25 && sc .VersionMinor >= 6
39
+ }
33
40
34
41
type Dynamic struct {
35
42
chType Type
36
43
sc * ServerContext
37
44
name string
38
45
46
+ serializationVersion uint64
47
+
39
48
totalTypes int // Null is last type index + 1, so this doubles as the Null type index for reads.
40
49
discriminators []int
41
50
offsets []int
42
51
43
52
columns []Interface
44
- columnIndexByName map [string ]int
53
+ columnIndexByType map [string ]int
54
+
55
+ deprecated deprecatedDynamic
45
56
}
46
57
47
58
func (c * Dynamic ) parse (t Type , sc * ServerContext ) (_ * Dynamic , err error ) {
48
59
c .chType = t
49
60
c .sc = sc
50
61
tStr := string (t )
51
62
52
- c .columnIndexByName = make (map [string ]int )
63
+ c .columnIndexByType = make (map [string ]int )
64
+
65
+ if ! supportsFlatDynamicJSON (sc ) {
66
+ // SharedVariant is special, and does not count against totalTypes
67
+ sv , _ := Type ("SharedVariant" ).Column ("" , sc )
68
+ c .addColumn (sv )
69
+
70
+ c .deprecated .maxTypes = DefaultMaxDynamicTypes
71
+ c .totalTypes = 0 // Reset to 0 after adding SharedVariant
72
+ }
53
73
54
74
if tStr == "Dynamic" {
55
75
return c , nil
@@ -59,13 +79,28 @@ func (c *Dynamic) parse(t Type, sc *ServerContext) (_ *Dynamic, err error) {
59
79
return nil , & UnsupportedColumnTypeError {t : t }
60
80
}
61
81
82
+ if ! supportsFlatDynamicJSON (sc ) {
83
+ typeParamsStr := strings .TrimPrefix (tStr , "Dynamic(" )
84
+ typeParamsStr = strings .TrimSuffix (typeParamsStr , ")" )
85
+
86
+ if strings .HasPrefix (typeParamsStr , "max_types=" ) {
87
+ v := strings .TrimPrefix (typeParamsStr , "max_types=" )
88
+ if maxTypes , err := strconv .Atoi (v ); err == nil {
89
+ c .deprecated .maxTypes = maxTypes
90
+ }
91
+ }
92
+ }
93
+
62
94
return c , nil
63
95
}
64
96
65
97
func (c * Dynamic ) addColumn (col Interface ) int {
66
- colIndex := len (c .columns )
98
+ typeName := string (col .Type ())
99
+ c .deprecated .typeNames = append (c .deprecated .typeNames , typeName )
100
+
101
+ colIndex := len (c .deprecated .typeNames ) - 1
67
102
c .columns = append (c .columns , col )
68
- c .columnIndexByName [ string ( col . Type ()) ] = colIndex
103
+ c .columnIndexByType [ typeName ] = colIndex
69
104
c .totalTypes ++
70
105
71
106
return colIndex
@@ -88,9 +123,16 @@ func (c *Dynamic) Row(i int, ptr bool) any {
88
123
offsetIndex := c .offsets [i ]
89
124
var value any
90
125
var chType string
91
- if typeIndex != c .totalTypes {
92
- value = c .columns [typeIndex ].Row (offsetIndex , ptr )
93
- chType = string (c .columns [typeIndex ].Type ())
126
+ if c .serializationVersion == DynamicDeprecatedSerializationVersion {
127
+ if typeIndex != DynamicNullDiscriminator {
128
+ value = c .columns [typeIndex ].Row (offsetIndex , ptr )
129
+ chType = string (c .columns [typeIndex ].Type ())
130
+ }
131
+ } else {
132
+ if typeIndex != c .totalTypes {
133
+ value = c .columns [typeIndex ].Row (offsetIndex , ptr )
134
+ chType = string (c .columns [typeIndex ].Type ())
135
+ }
94
136
}
95
137
96
138
dyn := chcol .NewDynamicWithType (value , chType )
@@ -106,9 +148,16 @@ func (c *Dynamic) ScanRow(dest any, row int) error {
106
148
offsetIndex := c .offsets [row ]
107
149
var value any
108
150
var chType string
109
- if typeIndex != c .totalTypes {
110
- value = c .columns [typeIndex ].Row (offsetIndex , false )
111
- chType = string (c .columns [typeIndex ].Type ())
151
+ if c .serializationVersion == DynamicDeprecatedSerializationVersion {
152
+ if typeIndex != DynamicNullDiscriminator {
153
+ value = c .columns [typeIndex ].Row (offsetIndex , false )
154
+ chType = string (c .columns [typeIndex ].Type ())
155
+ }
156
+ } else {
157
+ if typeIndex != c .totalTypes {
158
+ value = c .columns [typeIndex ].Row (offsetIndex , false )
159
+ chType = string (c .columns [typeIndex ].Type ())
160
+ }
112
161
}
113
162
114
163
switch v := dest .(type ) {
@@ -119,8 +168,14 @@ func (c *Dynamic) ScanRow(dest any, row int) error {
119
168
dyn := chcol .NewDynamicWithType (value , chType )
120
169
* * v = dyn
121
170
default :
122
- if typeIndex == c .totalTypes {
123
- return nil
171
+ if c .serializationVersion == DynamicDeprecatedSerializationVersion {
172
+ if typeIndex == DynamicNullDiscriminator {
173
+ return nil
174
+ }
175
+ } else {
176
+ if typeIndex == c .totalTypes {
177
+ return nil
178
+ }
124
179
}
125
180
126
181
if err := c .columns [typeIndex ].ScanRow (dest , offsetIndex ); err != nil {
@@ -206,7 +261,7 @@ func (c *Dynamic) AppendRow(v any) error {
206
261
207
262
if requestedType != "" {
208
263
var col Interface
209
- colIndex , ok := c .columnIndexByName [requestedType ]
264
+ colIndex , ok := c .columnIndexByType [requestedType ]
210
265
if ok {
211
266
col = c .columns [colIndex ]
212
267
} else {
@@ -229,6 +284,11 @@ func (c *Dynamic) AppendRow(v any) error {
229
284
230
285
// If preferred type wasn't provided, try each column
231
286
for i , col := range c .columns {
287
+ if c .deprecated .typeNames [i ] == "SharedVariant" {
288
+ // Do not try to fit into SharedVariant
289
+ continue
290
+ }
291
+
232
292
if err := col .AppendRow (v ); err == nil {
233
293
c .appendDiscriminatorRow (i )
234
294
return nil
@@ -245,7 +305,7 @@ func (c *Dynamic) AppendRow(v any) error {
245
305
}
246
306
247
307
func (c * Dynamic ) encodeHeader (buffer * proto.Buffer ) error {
248
- buffer .PutUInt64 (SupportedDynamicSerializationVersion )
308
+ buffer .PutUInt64 (DynamicSerializationVersion )
249
309
buffer .PutUVarInt (uint64 (c .totalTypes ))
250
310
251
311
for _ , col := range c .columns {
@@ -292,11 +352,20 @@ func (c *Dynamic) encodeData(buffer *proto.Buffer) {
292
352
}
293
353
294
354
func (c * Dynamic ) WriteStatePrefix (buffer * proto.Buffer ) error {
295
- return c .encodeHeader (buffer )
355
+ if supportsFlatDynamicJSON (c .sc ) {
356
+ return c .encodeHeader (buffer )
357
+ }
358
+
359
+ return c .encodeHeader_v1 (buffer )
296
360
}
297
361
298
362
func (c * Dynamic ) Encode (buffer * proto.Buffer ) {
299
- c .encodeData (buffer )
363
+ if supportsFlatDynamicJSON (c .sc ) {
364
+ c .encodeData (buffer )
365
+ return
366
+ }
367
+
368
+ c .encodeData_v1 (buffer )
300
369
}
301
370
302
371
func (c * Dynamic ) ScanType () reflect.Type {
@@ -312,24 +381,13 @@ func (c *Dynamic) Reset() {
312
381
}
313
382
314
383
func (c * Dynamic ) decodeHeader (reader * proto.Reader ) error {
315
- dynamicSerializationVersion , err := reader .UInt64 ()
316
- if err != nil {
317
- return fmt .Errorf ("failed to read dynamic serialization version: %w" , err )
318
- }
319
-
320
- if dynamicSerializationVersion == DeprecatedDynamicSerializationVersion {
321
- return fmt .Errorf ("deprecated dynamic serialization version: %d, enable \" output_format_native_use_flattened_dynamic_and_json_serialization\" in your settings" , dynamicSerializationVersion )
322
- } else if dynamicSerializationVersion != SupportedDynamicSerializationVersion {
323
- return fmt .Errorf ("unsupported dynamic serialization version: %d" , dynamicSerializationVersion )
324
- }
325
-
326
384
totalTypes , err := reader .UVarInt ()
327
385
if err != nil {
328
386
return fmt .Errorf ("failed to read total types for dynamic column: %w" , err )
329
387
}
330
388
331
389
c .columns = make ([]Interface , 0 , totalTypes )
332
- c .columnIndexByName = make (map [string ]int , totalTypes )
390
+ c .columnIndexByType = make (map [string ]int , totalTypes )
333
391
for i := uint64 (0 ); i < totalTypes ; i ++ {
334
392
typeName , err := reader .Str ()
335
393
if err != nil {
@@ -383,7 +441,7 @@ func discriminatorReader(totalTypes int, reader *proto.Reader) func() (int, erro
383
441
func (c * Dynamic ) decodeData (reader * proto.Reader , rows int ) error {
384
442
c .discriminators = make ([]int , rows )
385
443
c .offsets = make ([]int , rows )
386
- rowCountByType := make ([]int , len ( c . columns ) )
444
+ rowCountByType := make ([]int , c . totalTypes )
387
445
388
446
readDiscriminator := discriminatorReader (c .totalTypes , reader )
389
447
for i := 0 ; i < rows ; i ++ {
@@ -410,19 +468,29 @@ func (c *Dynamic) decodeData(reader *proto.Reader, rows int) error {
410
468
}
411
469
412
470
func (c * Dynamic ) ReadStatePrefix (reader * proto.Reader ) error {
413
- err := c . decodeHeader ( reader )
471
+ dynamicSerializationVersion , err := reader . UInt64 ( )
414
472
if err != nil {
415
- return fmt .Errorf ("failed to decode dynamic header : %w" , err )
473
+ return fmt .Errorf ("failed to read dynamic serialization version : %w" , err )
416
474
}
475
+ c .serializationVersion = dynamicSerializationVersion
417
476
418
- return nil
477
+ switch c .serializationVersion {
478
+ case DynamicSerializationVersion :
479
+ return c .decodeHeader (reader )
480
+ case DynamicDeprecatedSerializationVersion :
481
+ return c .decodeHeader_v1 (reader )
482
+ default :
483
+ return fmt .Errorf ("unsupported dynamic serialization version: %d" , dynamicSerializationVersion )
484
+ }
419
485
}
420
486
421
487
func (c * Dynamic ) Decode (reader * proto.Reader , rows int ) error {
422
- err := c .decodeData (reader , rows )
423
- if err != nil {
424
- return fmt .Errorf ("failed to decode dynamic data: %w" , err )
488
+ switch c .serializationVersion {
489
+ case DynamicSerializationVersion :
490
+ return c .decodeData (reader , rows )
491
+ case DynamicDeprecatedSerializationVersion :
492
+ return c .decodeData_v1 (reader , rows )
493
+ default :
494
+ return fmt .Errorf ("unsupported dynamic serialization version: %d" , c .serializationVersion )
425
495
}
426
-
427
- return nil
428
496
}
0 commit comments