Skip to content

Commit dba49c0

Browse files
committed
Connection pool and execute refactor
1 parent 8fc9a5d commit dba49c0

File tree

3 files changed

+277
-106
lines changed

3 files changed

+277
-106
lines changed

datasette/app.py

Lines changed: 20 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -24,13 +24,11 @@
2424

2525
from .utils import (
2626
QueryInterrupted,
27-
Results,
2827
escape_css_string,
2928
escape_sqlite,
3029
get_plugins,
3130
module_from_path,
3231
sqlite3,
33-
sqlite_timelimit,
3432
to_css_class,
3533
)
3634
from .utils.asgi import (
@@ -42,13 +40,12 @@
4240
asgi_send_json,
4341
asgi_send_redirect,
4442
)
45-
from .tracer import trace, AsgiTracer
43+
from .tracer import AsgiTracer
4644
from .plugins import pm, DEFAULT_PLUGINS
4745
from .version import __version__
4846

4947
app_root = Path(__file__).parent.parent
5048

51-
connections = threading.local()
5249
MEMORY = object()
5350

5451
ConfigOption = collections.namedtuple("ConfigOption", ("name", "default", "help"))
@@ -336,6 +333,25 @@ def prepare_connection(self, conn):
336333
# pylint: disable=no-member
337334
pm.hook.prepare_connection(conn=conn)
338335

336+
async def execute(
337+
self,
338+
db_name,
339+
sql,
340+
params=None,
341+
truncate=False,
342+
custom_time_limit=None,
343+
page_size=None,
344+
log_sql_errors=True,
345+
):
346+
return await self.databases[db_name].execute(
347+
sql,
348+
params=params,
349+
truncate=truncate,
350+
custom_time_limit=custom_time_limit,
351+
page_size=page_size,
352+
log_sql_errors=log_sql_errors,
353+
)
354+
339355
async def expand_foreign_keys(self, database, table, column, values):
340356
"Returns dict mapping (column, value) -> label"
341357
labeled_fks = {}
@@ -477,72 +493,6 @@ def table_metadata(self, database, table):
477493
.get(table, {})
478494
)
479495

480-
async def execute_against_connection_in_thread(self, db_name, fn):
481-
def in_thread():
482-
conn = getattr(connections, db_name, None)
483-
if not conn:
484-
conn = self.databases[db_name].connect()
485-
self.prepare_connection(conn)
486-
setattr(connections, db_name, conn)
487-
return fn(conn)
488-
489-
return await asyncio.get_event_loop().run_in_executor(self.executor, in_thread)
490-
491-
async def execute(
492-
self,
493-
db_name,
494-
sql,
495-
params=None,
496-
truncate=False,
497-
custom_time_limit=None,
498-
page_size=None,
499-
log_sql_errors=True,
500-
):
501-
"""Executes sql against db_name in a thread"""
502-
page_size = page_size or self.page_size
503-
504-
def sql_operation_in_thread(conn):
505-
time_limit_ms = self.sql_time_limit_ms
506-
if custom_time_limit and custom_time_limit < time_limit_ms:
507-
time_limit_ms = custom_time_limit
508-
509-
with sqlite_timelimit(conn, time_limit_ms):
510-
try:
511-
cursor = conn.cursor()
512-
cursor.execute(sql, params or {})
513-
max_returned_rows = self.max_returned_rows
514-
if max_returned_rows == page_size:
515-
max_returned_rows += 1
516-
if max_returned_rows and truncate:
517-
rows = cursor.fetchmany(max_returned_rows + 1)
518-
truncated = len(rows) > max_returned_rows
519-
rows = rows[:max_returned_rows]
520-
else:
521-
rows = cursor.fetchall()
522-
truncated = False
523-
except sqlite3.OperationalError as e:
524-
if e.args == ("interrupted",):
525-
raise QueryInterrupted(e, sql, params)
526-
if log_sql_errors:
527-
print(
528-
"ERROR: conn={}, sql = {}, params = {}: {}".format(
529-
conn, repr(sql), params, e
530-
)
531-
)
532-
raise
533-
534-
if truncate:
535-
return Results(rows, truncated, cursor.description)
536-
537-
else:
538-
return Results(rows, False, cursor.description)
539-
540-
with trace("sql", database=db_name, sql=sql.strip(), params=params):
541-
results = await self.execute_against_connection_in_thread(
542-
db_name, sql_operation_in_thread
543-
)
544-
return results
545-
546496
def register_renderers(self):
547497
""" Register output renderers which output data in custom formats. """
548498
# Built-in renderers

0 commit comments

Comments
 (0)