Skip to content

Bugfix/9 should use show stream for get table names #10

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
VERSION := $(shell git describe --tags --abbrev=0 | sed 's/^v//')

version:
echo "Version: $(VERSION)"

build:
python3 -m pip install --upgrade build
python3 -m build

install: build
python3 -m pip install --upgrade dist/timeplus_connect-$(VERSION).tar.gz

lint:
pylint --rcfile=./pylintrc timeplus_connect
26 changes: 26 additions & 0 deletions test_dist/llamaIndex_dbreader.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
from llama_index.readers.database import DatabaseReader
import timeplus_connect


# Ensure the timeplus-connect driver is registered
TIMEPLUS_URI = "timeplus://user:password@localhost:8123"
db_reader = DatabaseReader(
uri=TIMEPLUS_URI # Use the explicit SQLAlchemy engine
)

print(f"db reader type: {type(db_reader)}")
print(type(db_reader.load_data))

### SQLDatabase class ###
# db.sql is an instance of SQLDatabase:
print(type(db_reader.sql_database))
# SQLDatabase available methods:
print(type(db_reader.sql_database.from_uri))
print(type(db_reader.sql_database.get_single_table_info))
print(type(db_reader.sql_database.get_table_columns))
print(type(db_reader.sql_database.get_usable_table_names))
print(type(db_reader.sql_database.insert_into_table))
print(type(db_reader.sql_database.run_sql))
# SQLDatabase available properties:
print(type(db_reader.sql_database.dialect))
print(type(db_reader.sql_database.engine))
2 changes: 1 addition & 1 deletion timeplus_connect/cc_sqlalchemy/ddl/tableengine.py
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ def build_engine(full_engine: str) -> Optional[TableEngine]:
engine_cls = engine_map[name]
except KeyError:
if not name.startswith('System'):
logger.warning('Engine %s not found', name)
logger.warning('Engine %s not found, full_engine: %s', name, full_engine)
return None
engine = engine_cls.__new__(engine_cls)
engine.name = name
Expand Down
17 changes: 14 additions & 3 deletions timeplus_connect/cc_sqlalchemy/dialect.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@

from sqlalchemy.engine.default import DefaultDialect
from sqlalchemy.sql import text

from timeplus_connect import dbapi

Expand Down Expand Up @@ -37,24 +38,34 @@ class TimeplusDialect(DefaultDialect):
def dbapi(cls):
return dbapi

@classmethod
def import_dbapi(cls):
return dbapi

def initialize(self, connection):
pass

@staticmethod
def get_schema_names(connection, **_):
return [row.name for row in connection.execute('SHOW DATABASES')]
query = text('SHOW DATABASES')
return [row.name for row in connection.execute(query)]

@staticmethod
def has_database(connection, db_name):
return (connection.execute('SELECT name FROM system.databases ' +
f'WHERE name = {format_str(db_name)}')).rowcount > 0

def get_table_names(self, connection, schema=None, **kw):
cmd = 'SHOW STREAMS'
cmd = text('SHOW STREAMS') # Wrap in text() to make it an executable SQLAlchemy statement
if schema:
cmd += ' FROM ' + quote_identifier(schema)
cmd = text(f"SHOW STREAMS FROM {quote_identifier(schema)}") # Ensure schema is properly quoted

return [row.name for row in connection.execute(cmd)]

def get_columns(self, connection, table_name, schema=None, **kwargs):
inspector = self.inspector(connection)
return inspector.get_columns(table_name, schema, **kwargs)

def get_primary_keys(self, connection, table_name, schema=None, **kw):
return []

Expand Down
16 changes: 13 additions & 3 deletions timeplus_connect/cc_sqlalchemy/inspector.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import sqlalchemy.schema as sa_schema

from sqlalchemy import text
from sqlalchemy.engine.reflection import Inspector
from sqlalchemy.orm.exc import NoResultFound

Expand All @@ -12,8 +13,12 @@


def get_engine(connection, table_name, schema=None):
result_set = connection.execute(
f"SELECT engine_full FROM system.tables WHERE database = '{schema}' and name = '{table_name}'")
if schema is None:
schema = 'default'

query = text(f"SELECT engine_full FROM system.tables WHERE database = '{schema}' and name = '{table_name}'")

result_set = connection.execute(query)
row = next(result_set, None)
if not row:
raise NoResultFound(f'STREAM {schema}.{table_name} does not exist')
Expand All @@ -34,8 +39,13 @@ def reflect_table(self, table, include_columns, exclude_columns, *_args, **_kwar
table.engine = get_engine(self.bind, table.name, schema)

def get_columns(self, table_name, schema=None, **_kwargs):
if schema is None:
schema = 'default'

table_id = full_table(table_name, schema)
result_set = self.bind.execute(f'DESCRIBE {table_id}')
query = text(f"DESCRIBE {table_id}")

result_set = self.bind.execute(query)
if not result_set:
raise NoResultFound(f'STREAM {full_table} does not exist')
columns = []
Expand Down