1
+ import logging
2
+ import re
3
+ from datetime import datetime
4
+ from marshmallow import fields , Schema
5
+ from marshmallow .validate import Range
6
+ from typing import Any
7
+ from sqlalchemy .engine .url import URL
8
+
9
+ from flask import current_app
10
+ from flask_babel import gettext as __
11
+ from sqlalchemy import types
12
+ from superset .db_engine_specs import BaseEngineSpec
13
+ from superset .models .core import Database
14
+
15
+ logger = logging .getLogger (__name__ )
16
+
17
+ try :
18
+ from timeplus_connect .common import set_setting
19
+ from timeplus_connect .datatypes .format import set_default_formats
20
+
21
+ # override default formats for compatibility
22
+ set_default_formats (
23
+ "fixed_string" ,
24
+ "string" ,
25
+ "ipv*" ,
26
+ "string" ,
27
+ "uint64" ,
28
+ "signed" ,
29
+ "UUID" ,
30
+ "string" ,
31
+ "*int256" ,
32
+ "string" ,
33
+ "*int128" ,
34
+ "string" ,
35
+ )
36
+ set_setting (
37
+ "product_name" ,
38
+ f"superset/{ current_app .config .get ('VERSION_STRING' , 'dev' )} " ,
39
+ )
40
+ except ImportError : # Timeplus Connect not installed, do nothing
41
+ pass
42
+
43
+ class TimeplusParametersSchema (Schema ):
44
+ username = fields .String (allow_none = True , metadata = {"description" : __ ("Username" )})
45
+ password = fields .String (allow_none = True , metadata = {"description" : __ ("Password" )})
46
+ host = fields .String (
47
+ required = True , metadata = {"description" : __ ("Hostname or IP address" )}
48
+ )
49
+ port = fields .Integer (
50
+ allow_none = True ,
51
+ metadata = {"description" : __ ("Database port" )},
52
+ validate = Range (min = 0 , max = 65535 ),
53
+ )
54
+ database = fields .String (
55
+ allow_none = True , metadata = {"description" : __ ("Database name" )}
56
+ )
57
+ encryption = fields .Boolean (
58
+ dump_default = True ,
59
+ metadata = {"description" : __ ("Use an encrypted connection to the database" )},
60
+ )
61
+ query = fields .Dict (
62
+ keys = fields .Str (),
63
+ values = fields .Raw (),
64
+ metadata = {"description" : __ ("Additional parameters" )},
65
+ )
66
+
67
+
68
+ class TimeplusEngineSpec (BaseEngineSpec ):
69
+ """Engine spec for timeplus-connect connector"""
70
+
71
+ engine = "timeplusdb"
72
+ engine_name = "Timeplus Connect (Superset)"
73
+
74
+ _show_functions_column = "name"
75
+ supports_file_upload = False
76
+
77
+ sqlalchemy_uri_placeholder = (
78
+ "timeplusdb://user:password@host[:port][/dbname][?secure=value&=value...]"
79
+ )
80
+
81
+ parameters_schema = TimeplusParametersSchema ()
82
+ encryption_parameters = {"secure" : "true" }
83
+
84
+ @classmethod
85
+ def epoch_to_dttm (cls ) -> str :
86
+ return "{col}"
87
+
88
+ @classmethod
89
+ def convert_dttm (
90
+ cls , target_type : str , dttm : datetime , db_extra : dict [str , Any ] | None = None
91
+ ) -> str | None :
92
+ sqla_type = cls .get_sqla_column_type (target_type )
93
+
94
+ if isinstance (sqla_type , types .Date ):
95
+ return f"to_date('{ dttm .date ().isoformat ()} ')"
96
+ if isinstance (sqla_type , types .DateTime ):
97
+ return f"""to_datetime('{ dttm .isoformat (sep = " " , timespec = "seconds" )} ')"""
98
+ return None
99
+
100
+
101
+ @classmethod
102
+ def get_function_names (cls , database : Database ) -> list [str ]:
103
+ # pylint: disable=import-outside-toplevel, import-error
104
+ from timeplus_connect .driver .exceptions import TimeplusError
105
+
106
+ if cls ._function_names :
107
+ return cls ._function_names
108
+ try :
109
+ names = database .get_df (
110
+ "SELECT name FROM system.functions UNION ALL "
111
+ + "SELECT name FROM system.table_functions"
112
+ )["name" ].tolist ()
113
+ cls ._function_names = names
114
+ return names
115
+ except TimeplusError :
116
+ logger .exception ("Error retrieving system.functions" )
117
+ return []
118
+
119
+ @classmethod
120
+ def get_datatype (cls , type_code : str ) -> str :
121
+ # keep it lowercase, as Timeplus types aren't typical SHOUTCASE ANSI SQL
122
+ return type_code
123
+
124
+ @classmethod
125
+ def build_sqlalchemy_uri (
126
+ cls ,
127
+ parameters : dict [str , str ],
128
+ encrypted_extra : dict [str , str ] | None = None ,
129
+ ) -> str :
130
+ url_params = parameters .copy ()
131
+ if url_params .get ("encryption" ):
132
+ query = parameters .get ("query" , {}).copy ()
133
+ query .update (cls .encryption_parameters )
134
+ url_params ["query" ] = query
135
+ if not url_params .get ("database" ):
136
+ url_params ["database" ] = "default"
137
+ url_params .pop ("encryption" , None )
138
+ return str (URL .create (f"{ cls .engine } +{ cls .default_driver } " , ** url_params ))
0 commit comments