From 6b80e5a367aa44b4a4d0796d7d095f6eb3f70c08 Mon Sep 17 00:00:00 2001 From: Seth Samuel Date: Tue, 21 Oct 2025 13:12:00 -0400 Subject: [PATCH 01/14] Create shared schemas collector for DBM integrations --- .../datadog_checks/base/utils/db/schemas.py | 511 ++++++++++++++++++ .../tests/base/utils/db/test_schemas.py | 160 ++++++ 2 files changed, 671 insertions(+) create mode 100644 datadog_checks_base/datadog_checks/base/utils/db/schemas.py create mode 100644 datadog_checks_base/tests/base/utils/db/test_schemas.py diff --git a/datadog_checks_base/datadog_checks/base/utils/db/schemas.py b/datadog_checks_base/datadog_checks/base/utils/db/schemas.py new file mode 100644 index 0000000000000..f62ec345e6129 --- /dev/null +++ b/datadog_checks_base/datadog_checks/base/utils/db/schemas.py @@ -0,0 +1,511 @@ +# (C) Datadog, Inc. 2025-present +# All rights reserved +# Licensed under a 3-clause BSD style license (see LICENSE) + +from __future__ import annotations + +import contextlib +import time +from abc import ABC, abstractmethod +from typing import TYPE_CHECKING, TypedDict + +import orjson as json +from psycopg.rows import dict_row + +if TYPE_CHECKING: + from datadog_checks.base import AgentCheck + from datadog_checks.postgres import PostgreSql + +from datadog_checks.postgres.version_utils import VersionUtils + +try: + import datadog_agent +except ImportError: + from datadog_checks.base.stubs import datadog_agent + + +class DatabaseInfo(TypedDict): + description: str + name: str + id: str + encoding: str + owner: str + + +# The schema collector sends lists of DatabaseObjects to the agent +# The format is for backwards compatibility with the current backend +class DatabaseObject(TypedDict): + # Splat of database info + description: str + name: str + id: str + encoding: str + owner: str + + +class SchemaCollector(ABC): + def __init__(self, check: AgentCheck): + self._check = check + self._log = check.log + self._config = check._config.collect_schemas + self._row_chunk_size = 10000 + + self._reset() + + def _reset(self): + self._collection_started_at = None + self._collection_payloads_count = 0 + self._queued_rows = [] + self._total_rows_count = 0 + + def collect_schemas(self) -> bool: + """ + Collects and submits all applicable schema metadata to the agent. + Returns False if the previous collection was still in progress. 
+ """ + if self._collection_started_at is not None: + return False + status = "success" + try: + self._collection_started_at = int(time.time() * 1000) + databases = self._get_databases() + for database in databases: + database_name = database['name'] + if not database_name: + self._check.log("database has no name %v", database) + continue + start = time.time() + with self._get_cursor(database_name) as cursor: + end = time.time() + self._log.info("Time to get cursor (%s): %s", database_name, int((end - start)*1000)) + # data = self._get_all(cursor) + next = self._get_next(cursor) + start = time.time() + while next: + # for i, next in enumerate(data): + self._queued_rows.append(self._map_row(database, next)) + self._total_rows_count += 1 + next = self._get_next(cursor) + is_last_payload = database is databases[-1] and next is None + # is_last_payload = i == len(data) - 1 + self.maybe_flush(is_last_payload) + end = time.time() + self._log.info("Time to process rows (%s): %s", database_name, int((end - start)*1000)) + except Exception as e: + status = "error" + self._log.error("Error collecting schema metadata: %s", e) + raise e + finally: + self._check.histogram( + "dd.postgres.schema.time", + int(time.time() * 1000) - self._collection_started_at, + tags=self._check.tags + ["status:" + status], + hostname=self._check.reported_hostname, + raw=True, + ) + self._check.gauge( + "dd.postgres.schema.tables_count", + self._total_rows_count, + tags=self._check.tags + ["status:" + status], + hostname=self._check.reported_hostname, + raw=True, + ) + self._check.gauge( + "dd.postgres.schema.payloads_count", + self._collection_payloads_count, + tags=self._check.tags + ["status:" + status], + hostname=self._check.reported_hostname, + raw=True, + ) + + self._reset() + return True + + @property + def base_event(self): + return { + "host": self._check.reported_hostname, + "database_instance": self._check.database_identifier, + "agent_version": datadog_agent.get_version(), + "collection_interval": self._config.collection_interval, + "dbms_version": str(self._check.version), + "tags": self._check.tags, + "cloud_metadata": self._check.cloud_metadata, + "collection_started_at": self._collection_started_at, + } + + def maybe_flush(self, is_last_payload): + if len(self._queued_rows) > self._row_chunk_size or is_last_payload: + event = self.base_event.copy() + event['timestamp'] = int(time.time() * 1000) + event["metadata"] = self._queued_rows + self._collection_payloads_count += 1 + if is_last_payload: + event["collection_payloads_count"] = self._collection_payloads_count + self._check.database_monitoring_metadata(json.dumps(event)) + + self._queued_rows = [] + + @abstractmethod + def _get_databases(self) -> list[DatabaseInfo]: + pass + + @abstractmethod + def _get_cursor(self, database): + pass + + @abstractmethod + def _get_next(self, cursor): + pass + + @abstractmethod + def _get_all(self, cursor): + pass + + @abstractmethod + def _map_row(self, database: DatabaseInfo, cursor_row) -> DatabaseObject: + """ + Maps a cursor row to a dict that matches the schema expected by DBM. 
+ """ + return { + **database, + "id": str(database["id"]), #Case id into string as expected by backend + } + + +PG_TABLES_QUERY_V10_PLUS = """ +SELECT c.oid AS table_id, + c.relnamespace AS schema_id, + c.relname AS table_name, + c.relhasindex AS has_indexes, + c.relowner :: regrole AS owner, + ( CASE + WHEN c.relkind = 'p' THEN TRUE + ELSE FALSE + END ) AS has_partitions, + t.relname AS toast_table +FROM pg_class c + left join pg_class t + ON c.reltoastrelid = t.oid +WHERE c.relkind IN ( 'r', 'p', 'f' ) + AND c.relispartition != 't' +""" + +PG_TABLES_QUERY_V9 = """ +SELECT c.oid AS table_id, + c.relnamespace AS schema_id, + c.relname AS table_name, + c.relhasindex AS has_indexes, + c.relowner :: regrole AS owner, + t.relname AS toast_table +FROM pg_class c + left join pg_class t + ON c.reltoastrelid = t.oid +WHERE c.relkind IN ( 'r', 'f' ) +""" + + +SCHEMA_QUERY = """ +SELECT nsp.oid AS schema_id, + nspname AS schema_name, + nspowner :: regrole AS schema_owner +FROM pg_namespace nsp + LEFT JOIN pg_roles r on nsp.nspowner = r.oid +WHERE nspname NOT IN ( 'information_schema', 'pg_catalog' ) + AND nspname NOT LIKE 'pg_toast%' + AND nspname NOT LIKE 'pg_temp_%' +""" + +COLUMNS_QUERY = """ +SELECT attname AS name, + Format_type(atttypid, atttypmod) AS data_type, + NOT attnotnull AS nullable, + pg_get_expr(adbin, adrelid) AS default, + attrelid AS table_id +FROM pg_attribute + LEFT JOIN pg_attrdef ad + ON adrelid = attrelid + AND adnum = attnum +WHERE attnum > 0 + AND NOT attisdropped +""" + + +PG_INDEXES_QUERY = """ +SELECT + c.relname AS name, + ix.indrelid AS table_id, + pg_get_indexdef(c.oid) AS definition, + ix.indisunique AS is_unique, + ix.indisexclusion AS is_exclusion, + ix.indimmediate AS is_immediate, + ix.indisclustered AS is_clustered, + ix.indisvalid AS is_valid, + ix.indcheckxmin AS is_checkxmin, + ix.indisready AS is_ready, + ix.indislive AS is_live, + ix.indisreplident AS is_replident, + ix.indpred IS NOT NULL AS is_partial +FROM + pg_index ix +JOIN + pg_class c +ON + c.oid = ix.indexrelid +""" + + +PG_CONSTRAINTS_QUERY = """ +SELECT conname AS name, + pg_get_constraintdef(oid) AS definition, + conrelid AS table_id +FROM pg_constraint +WHERE contype = 'f' +""" + + +PARTITION_KEY_QUERY = """ +SELECT relname, + pg_get_partkeydef(oid) AS partition_key, + oid AS table_id +FROM pg_class +""" + +NUM_PARTITIONS_QUERY = """ +SELECT count(inhrelid :: regclass) AS num_partitions, inhparent as table_id +FROM pg_inherits +GROUP BY inhparent; +""" + +PARTITION_ACTIVITY_QUERY = """ +SELECT pi.inhparent :: regclass AS parent_table_name, + SUM(COALESCE(psu.seq_scan, 0) + COALESCE(psu.idx_scan, 0)) AS total_activity, + pi.inhparent as table_id +FROM pg_catalog.pg_stat_user_tables psu + join pg_class pc + ON psu.relname = pc.relname + join pg_inherits pi + ON pi.inhrelid = pc.oid +GROUP BY pi.inhparent +""" + + +class TableObject(TypedDict): + id: str + name: str + columns: list + indexes: list + foreign_keys: list + + +class SchemaObject(TypedDict): + id: str + name: str + owner: str + tables: list[TableObject] + + +class PostgresDatabaseObject(DatabaseObject): + schemas: list[SchemaObject] + + +DATABASE_INFORMATION_QUERY = """ +SELECT db.oid AS id, + datname AS NAME, + pg_encoding_to_char(encoding) AS encoding, + rolname AS owner, + description +FROM pg_catalog.pg_database db + LEFT JOIN pg_catalog.pg_description dc + ON dc.objoid = db.oid + JOIN pg_roles a + ON datdba = a.oid + WHERE datname NOT LIKE 'template%' +""" + + +class PostgresSchemaCollector(SchemaCollector): + def __init__(self, 
check: PostgreSql): + super().__init__(check) + self._check = check + self._config = check._config.collect_schemas + + @property + def base_event(self): + return { + **super().base_event, + "dbms": "postgres", + "kind": "pg_databases", + } + + def _get_databases(self): + with self._check._get_main_db() as conn: + with conn.cursor(row_factory=dict_row) as cursor: + query = DATABASE_INFORMATION_QUERY + for exclude_regex in self._config.exclude_databases: + query += " AND datname !~ '{}'".format(exclude_regex) + if self._config.include_databases: + query += f" AND ({' OR '.join(f"datname ~ '{include_regex}'" for include_regex in self._config.include_databases)})" + + # Autodiscovery trumps exclude and include + autodiscovery_databases = self._check.autodiscovery.get_items() + if autodiscovery_databases: + query += " AND datname IN ({})".format(", ".join(f"'{db}'" for db in autodiscovery_databases)) + + cursor.execute(query) + return cursor.fetchall() + + @contextlib.contextmanager + def _get_cursor(self, database_name): + with self._check.db_pool.get_connection(database_name) as conn: + with conn.cursor(row_factory=dict_row) as cursor: + schemas_query = self._get_schemas_query() + tables_query = self._get_tables_query() + columns_query = COLUMNS_QUERY + indexes_query = PG_INDEXES_QUERY + constraints_query = PG_CONSTRAINTS_QUERY + partitions_ctes = ( + f""" + , + partition_keys AS ( + {PARTITION_KEY_QUERY} + ), + num_partitions AS ( + {NUM_PARTITIONS_QUERY} + ) + """ + if VersionUtils.transform_version(str(self._check.version))["version.major"] > "9" + else "" + ) + partition_joins = ( + """ + LEFT JOIN partition_keys ON tables.table_id = partition_keys.table_id + LEFT JOIN num_partitions ON tables.table_id = num_partitions.table_id + """ + if VersionUtils.transform_version(str(self._check.version))["version.major"] > "9" + else "" + ) + parition_selects = ( + """ + , + partition_keys.partition_key, + num_partitions.num_partitions + """ + if VersionUtils.transform_version(str(self._check.version))["version.major"] > "9" + else "" + ) + limit = int(self._config.max_tables or 1_000_000) + + query = f""" + WITH + schemas AS( + {schemas_query} + ), + tables AS ( + {tables_query} + ), + schema_tables AS ( + SELECT schemas.schema_id, schemas.schema_name, + tables.table_id, tables.table_name + FROM schemas + LEFT JOIN tables ON schemas.schema_id = tables.schema_id + ORDER BY schemas.schema_name, tables.table_name + LIMIT {limit} + ), + columns AS ( + {columns_query} + ), + indexes AS ( + {indexes_query} + ), + constraints AS ( + {constraints_query} + ) + {partitions_ctes} + + SELECT * FROM ( + SELECT schema_tables.schema_id, schema_tables.schema_name, + schema_tables.table_id, schema_tables.table_name, + array_agg(row_to_json(columns.*)) FILTER (WHERE columns.name IS NOT NULL) as columns, + array_agg(row_to_json(indexes.*)) FILTER (WHERE indexes.name IS NOT NULL) as indexes, + array_agg(row_to_json(constraints.*)) FILTER (WHERE constraints.name IS NOT NULL) + as foreign_keys + {parition_selects} + FROM schema_tables + LEFT JOIN columns ON schema_tables.table_id = columns.table_id + LEFT JOIN indexes ON schema_tables.table_id = indexes.table_id + LEFT JOIN constraints ON schema_tables.table_id = constraints.table_id + {partition_joins} + GROUP BY schema_tables.schema_id, schema_tables.schema_name, schema_tables.table_id, schema_tables.table_name + ) t + ; + """ + print(query) + cursor.execute(query) + yield cursor + + def _get_schemas_query(self): + query = SCHEMA_QUERY + for exclude_regex in 
self._config.exclude_schemas: + query += " AND nspname !~ '{}'".format(exclude_regex) + if self._config.include_schemas: + query += f" AND ({' OR '.join(f"nspname ~ '{include_regex}'" for include_regex in self._config.include_schemas)})" + if self._check._config.ignore_schemas_owned_by: + query += " AND nspowner :: regrole :: text not IN ({})".format( + ", ".join(f"'{owner}'" for owner in self._check._config.ignore_schemas_owned_by) + ) + return query + + def _get_tables_query(self): + if VersionUtils.transform_version(str(self._check.version))["version.major"] == "9": + query = PG_TABLES_QUERY_V9 + else: + query = PG_TABLES_QUERY_V10_PLUS + for exclude_regex in self._config.exclude_tables: + query += " AND c.relname !~ '{}'".format(exclude_regex) + if self._config.include_tables: + query += f" AND ({' OR '.join(f"c.relname ~ '{include_regex}'" for include_regex in self._config.include_tables)})" + return query + + + def _get_next(self, cursor): + return cursor.fetchone() + + def _get_all(self, cursor): + return cursor.fetchall() + + def _map_row(self, database: DatabaseInfo, cursor_row) -> DatabaseObject: + object = super()._map_row(database, cursor_row) + # Map the cursor row to the expected schema, and strip out None values + object["schemas"] = [ + { + k: v + for k, v in { + "id": str(cursor_row.get("schema_id")), + "name": cursor_row.get("schema_name"), + "owner": cursor_row.get("schema_owner"), + "tables": [ + { + k: v + for k, v in { + "id": str(cursor_row.get("table_id")), + "name": cursor_row.get("table_name"), + "owner": cursor_row.get("owner"), + # The query can create duplicates of the joined tables + "columns": list({v and v['name']: v for v in cursor_row.get("columns") or []}.values()), + "indexes": list({v and v['name']: v for v in cursor_row.get("indexes") or []}.values()), + "foreign_keys": list( + {v and v['name']: v for v in cursor_row.get("foreign_keys") or []}.values() + ), + "toast_table": cursor_row.get("toast_table"), + "num_partitions": cursor_row.get("num_partitions"), + "partition_key": cursor_row.get("partition_key"), + }.items() + if v is not None + } + ], + }.items() + if v is not None + } + ] + return object diff --git a/datadog_checks_base/tests/base/utils/db/test_schemas.py b/datadog_checks_base/tests/base/utils/db/test_schemas.py new file mode 100644 index 0000000000000..518e62d84222a --- /dev/null +++ b/datadog_checks_base/tests/base/utils/db/test_schemas.py @@ -0,0 +1,160 @@ +# (C) Datadog, Inc. 
2023-present +# All rights reserved +# Licensed under a 3-clause BSD style license (see LICENSE) +import pytest + +from datadog_checks.postgres.schemas import PostgresSchemaCollector + +from .common import POSTGRES_VERSION + +pytestmark = [pytest.mark.integration, pytest.mark.usefixtures('dd_environment')] + + +@pytest.fixture +def dbm_instance(pg_instance): + pg_instance['dbm'] = True + pg_instance['min_collection_interval'] = 0.1 + pg_instance['query_samples'] = {'enabled': False} + pg_instance['query_activity'] = {'enabled': False} + pg_instance['query_metrics'] = {'enabled': False} + pg_instance['collect_resources'] = {'enabled': False, 'run_sync': True} + pg_instance['collect_settings'] = {'enabled': False, 'run_sync': True} + pg_instance['collect_schemas'] = {'enabled': True, 'run_sync': True} + return pg_instance + + +def test_get_databases(dbm_instance, integration_check): + check = integration_check(dbm_instance) + collector = PostgresSchemaCollector(check) + + databases = collector._get_databases() + datbase_names = [database['name'] for database in databases] + assert 'postgres' in datbase_names + assert 'dogs' in datbase_names + assert 'dogs_3' in datbase_names + assert 'nope' not in datbase_names + + +def test_databases_filters(dbm_instance, integration_check): + dbm_instance['collect_schemas']['exclude_databases'] = ['^dogs$', 'dogs_[345]'] + check = integration_check(dbm_instance) + collector = PostgresSchemaCollector(check) + + databases = collector._get_databases() + datbase_names = [database['name'] for database in databases] + assert 'postgres' in datbase_names + assert 'dogs' not in datbase_names + assert 'dogs_3' not in datbase_names + assert 'dogs_9' in datbase_names + assert 'nope' not in datbase_names + + +def test_get_cursor(dbm_instance, integration_check): + check = integration_check(dbm_instance) + check.version = POSTGRES_VERSION + collector = PostgresSchemaCollector(check) + + with collector._get_cursor('datadog_test') as cursor: + assert cursor is not None + schemas = [] + for row in cursor: + schemas.append(row['schema_name']) + + assert set(schemas) == {'datadog', 'hstore', 'public', 'public2', 'rdsadmin_test'} + + +def test_schemas_filters(dbm_instance, integration_check): + dbm_instance['collect_schemas']['exclude_schemas'] = ['public', 'rdsadmin_test'] + check = integration_check(dbm_instance) + check.version = POSTGRES_VERSION + collector = PostgresSchemaCollector(check) + + with collector._get_cursor('datadog_test') as cursor: + assert cursor is not None + schemas = [] + for row in cursor: + schemas.append(row['schema_name']) + + assert set(schemas) == {'datadog', 'hstore'} + + +def test_tables(dbm_instance, integration_check): + check = integration_check(dbm_instance) + check.version = POSTGRES_VERSION + collector = PostgresSchemaCollector(check) + + with collector._get_cursor('datadog_test') as cursor: + assert cursor is not None + tables = [] + for row in cursor: + if row['table_name']: + tables.append(row['table_name']) + + assert set(tables) == { + 'persons', + 'personsdup1', + 'personsdup2', + 'personsdup3', + 'personsdup4', + 'personsdup5', + 'personsdup6', + 'personsdup7', + 'personsdup8', + 'personsdup9', + 'personsdup10', + 'personsdup11', + 'personsdup12', + 'personsdup13', + 'persons_indexed', + 'pgtable', + 'pg_newtable', + 'cities', + 'rds_admin_misc', + 'sample_foreign_d73a8c', + } + + +def test_columns(dbm_instance, integration_check): + check = integration_check(dbm_instance) + check.version = POSTGRES_VERSION + collector = 
PostgresSchemaCollector(check) + + with collector._get_cursor('datadog_test') as cursor: + assert cursor is not None + # Assert that at least one row has columns + assert any(row['columns'] for row in cursor) + for row in cursor: + if row['columns']: + for column in row['columns']: + assert column['name'] is not None + assert column['data_type'] is not None + if row['table_name'] == 'cities': + assert row['columns'] + assert row['columns'][0]['name'] + + +def test_indexes(dbm_instance, integration_check): + check = integration_check(dbm_instance) + check.version = POSTGRES_VERSION + collector = PostgresSchemaCollector(check) + + with collector._get_cursor('datadog_test') as cursor: + assert cursor is not None + # Assert that at least one row has indexes + assert any(row['indexes'] for row in cursor) + for row in cursor: + if row['indexes']: + for index in row['indexes']: + assert index['name'] is not None + assert index['definition'] is not None + if row['table_name'] == 'cities': + assert row['indexes'] + assert row['indexes'][0]['name'] + + +def test_collect_schemas(dbm_instance, integration_check): + check = integration_check(dbm_instance) + check.version = POSTGRES_VERSION + collector = PostgresSchemaCollector(check) + + collector.collect_schemas() From 96e526028f09283f963afda16e6ed49c4e052731 Mon Sep 17 00:00:00 2001 From: Seth Samuel Date: Tue, 21 Oct 2025 14:36:39 -0400 Subject: [PATCH 02/14] WIP --- .../datadog_checks/base/checks/db.py | 24 + .../datadog_checks/base/utils/db/schemas.py | 461 +++--------------- .../datadog_checks/base/utils/db/utils.py | 7 + .../tests/base/utils/db/test_schemas.py | 223 +++------ .../tests/base/utils/test_persistent_cache.py | 1 + 5 files changed, 171 insertions(+), 545 deletions(-) diff --git a/datadog_checks_base/datadog_checks/base/checks/db.py b/datadog_checks_base/datadog_checks/base/checks/db.py index 2a5fe0fc57551..b9fee24fbb856 100644 --- a/datadog_checks_base/datadog_checks/base/checks/db.py +++ b/datadog_checks_base/datadog_checks/base/checks/db.py @@ -20,3 +20,27 @@ def database_monitoring_metadata(self, raw_event: str): def database_monitoring_health(self, raw_event: str): self.event_platform_event(raw_event, "dbm-health") + + @property + def reported_hostname(self) -> str | None: + raise NotImplementedError("reported_hostname is not implemented for this check") + + @property + def database_identifier(self) -> str: + raise NotImplementedError("database_identifier is not implemented for this check") + + @property + def dbms_version(self) -> str: + raise NotImplementedError("dbms_version is not implemented for this check") + + @property + def agent_version(self) -> str: + raise NotImplementedError("agent_version is not implemented for this check") + + @property + def tags(self) -> list[str]: + raise NotImplementedError("tags is not implemented for this check") + + @property + def cloud_metadata(self) -> dict: + raise NotImplementedError("cloud_metadata is not implemented for this check") diff --git a/datadog_checks_base/datadog_checks/base/utils/db/schemas.py b/datadog_checks_base/datadog_checks/base/utils/db/schemas.py index f62ec345e6129..1eb8bf0d921d0 100644 --- a/datadog_checks_base/datadog_checks/base/utils/db/schemas.py +++ b/datadog_checks_base/datadog_checks/base/utils/db/schemas.py @@ -4,19 +4,15 @@ from __future__ import annotations -import contextlib -import time from abc import ABC, abstractmethod from typing import TYPE_CHECKING, TypedDict import orjson as json -from psycopg.rows import dict_row -if TYPE_CHECKING: - from 
datadog_checks.base import AgentCheck - from datadog_checks.postgres import PostgreSql +from .utils import now_ms -from datadog_checks.postgres.version_utils import VersionUtils +if TYPE_CHECKING: + from datadog_checks.base.checks.db import DatabaseCheck try: import datadog_agent @@ -25,31 +21,44 @@ class DatabaseInfo(TypedDict): - description: str name: str - id: str - encoding: str - owner: str # The schema collector sends lists of DatabaseObjects to the agent -# The format is for backwards compatibility with the current backend +# DBMS subclasses may add additional fields to the dictionary class DatabaseObject(TypedDict): - # Splat of database info - description: str name: str - id: str - encoding: str - owner: str + + +# Common configuration for schema collector +# Individual DBMS implementations should map their specific +# configuration to this type +class SchemaCollectorConfig: + def __init__(self): + self.collection_interval = 3600 + self.enabled = False + self.payload_chunk_size = 10_000 class SchemaCollector(ABC): - def __init__(self, check: AgentCheck): + """ + Abstract base class for DBM schema collectors. + + Attributes: + _collection_started_at (int): Timestamp in whole milliseconds + when the current collection started. + """ + + _collection_started_at: int | None = None + + def __init__(self, check: DatabaseCheck, config: SchemaCollectorConfig): self._check = check self._log = check.log - self._config = check._config.collect_schemas - self._row_chunk_size = 10000 - + self._config = config + self._dbms = check.__class__.__name__.lower() + if self._dbms == 'postgresql': + # Backwards compatibility for metrics namespacing + self._dbms = 'postgres' self._reset() def _reset(self): @@ -61,57 +70,54 @@ def _reset(self): def collect_schemas(self) -> bool: """ Collects and submits all applicable schema metadata to the agent. - Returns False if the previous collection was still in progress. + This class relies on the owning check to handle scheduling this method. + + This method will enforce non-overlapping invocations and + returns False if the previous collection was still in progress when invoked again. 
""" if self._collection_started_at is not None: return False status = "success" try: - self._collection_started_at = int(time.time() * 1000) + self._collection_started_at = now_ms() databases = self._get_databases() for database in databases: database_name = database['name'] if not database_name: - self._check.log("database has no name %v", database) + self._log.warning("database has no name %v", database) continue - start = time.time() with self._get_cursor(database_name) as cursor: - end = time.time() - self._log.info("Time to get cursor (%s): %s", database_name, int((end - start)*1000)) - # data = self._get_all(cursor) + # Get the next row from the cursor next = self._get_next(cursor) - start = time.time() while next: - # for i, next in enumerate(data): self._queued_rows.append(self._map_row(database, next)) self._total_rows_count += 1 + # Because we're iterating over a cursor we need to try to get + # the next row to see if we've reached the last row next = self._get_next(cursor) is_last_payload = database is databases[-1] and next is None - # is_last_payload = i == len(data) - 1 self.maybe_flush(is_last_payload) - end = time.time() - self._log.info("Time to process rows (%s): %s", database_name, int((end - start)*1000)) except Exception as e: status = "error" - self._log.error("Error collecting schema metadata: %s", e) + self._log.error("Error collecting schema: %s", e) raise e finally: self._check.histogram( - "dd.postgres.schema.time", - int(time.time() * 1000) - self._collection_started_at, + f"dd.{self._dbms}.schema.time", + now_ms() - self._collection_started_at, tags=self._check.tags + ["status:" + status], hostname=self._check.reported_hostname, raw=True, ) self._check.gauge( - "dd.postgres.schema.tables_count", + f"dd.{self._dbms}.schema.tables_count", self._total_rows_count, tags=self._check.tags + ["status:" + status], hostname=self._check.reported_hostname, raw=True, ) self._check.gauge( - "dd.postgres.schema.payloads_count", + f"dd.{self._dbms}.schema.payloads_count", self._collection_payloads_count, tags=self._check.tags + ["status:" + status], hostname=self._check.reported_hostname, @@ -128,19 +134,22 @@ def base_event(self): "database_instance": self._check.database_identifier, "agent_version": datadog_agent.get_version(), "collection_interval": self._config.collection_interval, - "dbms_version": str(self._check.version), + "dbms_version": str(self._check.dbms_version), "tags": self._check.tags, "cloud_metadata": self._check.cloud_metadata, "collection_started_at": self._collection_started_at, } def maybe_flush(self, is_last_payload): - if len(self._queued_rows) > self._row_chunk_size or is_last_payload: + if is_last_payload or len(self._queued_rows) >= self._config.payload_chunk_size: event = self.base_event.copy() - event['timestamp'] = int(time.time() * 1000) + event["timestamp"] = now_ms() + # DBM backend expects metadata to be an array of database objects event["metadata"] = self._queued_rows self._collection_payloads_count += 1 if is_last_payload: + # For the last payload, we need to include the total number of payloads collected + # This is used for snapshotting to ensure that all payloads have been received event["collection_payloads_count"] = self._collection_payloads_count self._check.database_monitoring_metadata(json.dumps(event)) @@ -148,364 +157,32 @@ def maybe_flush(self, is_last_payload): @abstractmethod def _get_databases(self) -> list[DatabaseInfo]: - pass + """ + Returns a list of database dictionaries. 
+ Subclasses should override this method to return the list of databases to collect schema metadata for. + """ + raise NotImplementedError("Subclasses must implement _get_databases") @abstractmethod def _get_cursor(self, database): - pass + """ + Returns a cursor for the given database. + Subclasses should override this method to return the cursor for the given database. + """ + raise NotImplementedError("Subclasses must implement _get_cursor") @abstractmethod def _get_next(self, cursor): - pass - - @abstractmethod - def _get_all(self, cursor): - pass + """ + Returns the next row from the cursor. + Subclasses should override this method to return the next row from the cursor. + """ + raise NotImplementedError("Subclasses must implement _get_next") - @abstractmethod - def _map_row(self, database: DatabaseInfo, cursor_row) -> DatabaseObject: + def _map_row(self, database: DatabaseInfo, _cursor_row) -> DatabaseObject: """ Maps a cursor row to a dict that matches the schema expected by DBM. + The base implementation of this method returns just the database dictionary. + Subclasses should override this method to add schema and table data based on the cursor row. """ - return { - **database, - "id": str(database["id"]), #Case id into string as expected by backend - } - - -PG_TABLES_QUERY_V10_PLUS = """ -SELECT c.oid AS table_id, - c.relnamespace AS schema_id, - c.relname AS table_name, - c.relhasindex AS has_indexes, - c.relowner :: regrole AS owner, - ( CASE - WHEN c.relkind = 'p' THEN TRUE - ELSE FALSE - END ) AS has_partitions, - t.relname AS toast_table -FROM pg_class c - left join pg_class t - ON c.reltoastrelid = t.oid -WHERE c.relkind IN ( 'r', 'p', 'f' ) - AND c.relispartition != 't' -""" - -PG_TABLES_QUERY_V9 = """ -SELECT c.oid AS table_id, - c.relnamespace AS schema_id, - c.relname AS table_name, - c.relhasindex AS has_indexes, - c.relowner :: regrole AS owner, - t.relname AS toast_table -FROM pg_class c - left join pg_class t - ON c.reltoastrelid = t.oid -WHERE c.relkind IN ( 'r', 'f' ) -""" - - -SCHEMA_QUERY = """ -SELECT nsp.oid AS schema_id, - nspname AS schema_name, - nspowner :: regrole AS schema_owner -FROM pg_namespace nsp - LEFT JOIN pg_roles r on nsp.nspowner = r.oid -WHERE nspname NOT IN ( 'information_schema', 'pg_catalog' ) - AND nspname NOT LIKE 'pg_toast%' - AND nspname NOT LIKE 'pg_temp_%' -""" - -COLUMNS_QUERY = """ -SELECT attname AS name, - Format_type(atttypid, atttypmod) AS data_type, - NOT attnotnull AS nullable, - pg_get_expr(adbin, adrelid) AS default, - attrelid AS table_id -FROM pg_attribute - LEFT JOIN pg_attrdef ad - ON adrelid = attrelid - AND adnum = attnum -WHERE attnum > 0 - AND NOT attisdropped -""" - - -PG_INDEXES_QUERY = """ -SELECT - c.relname AS name, - ix.indrelid AS table_id, - pg_get_indexdef(c.oid) AS definition, - ix.indisunique AS is_unique, - ix.indisexclusion AS is_exclusion, - ix.indimmediate AS is_immediate, - ix.indisclustered AS is_clustered, - ix.indisvalid AS is_valid, - ix.indcheckxmin AS is_checkxmin, - ix.indisready AS is_ready, - ix.indislive AS is_live, - ix.indisreplident AS is_replident, - ix.indpred IS NOT NULL AS is_partial -FROM - pg_index ix -JOIN - pg_class c -ON - c.oid = ix.indexrelid -""" - - -PG_CONSTRAINTS_QUERY = """ -SELECT conname AS name, - pg_get_constraintdef(oid) AS definition, - conrelid AS table_id -FROM pg_constraint -WHERE contype = 'f' -""" - - -PARTITION_KEY_QUERY = """ -SELECT relname, - pg_get_partkeydef(oid) AS partition_key, - oid AS table_id -FROM pg_class -""" - -NUM_PARTITIONS_QUERY = """ -SELECT 
count(inhrelid :: regclass) AS num_partitions, inhparent as table_id -FROM pg_inherits -GROUP BY inhparent; -""" - -PARTITION_ACTIVITY_QUERY = """ -SELECT pi.inhparent :: regclass AS parent_table_name, - SUM(COALESCE(psu.seq_scan, 0) + COALESCE(psu.idx_scan, 0)) AS total_activity, - pi.inhparent as table_id -FROM pg_catalog.pg_stat_user_tables psu - join pg_class pc - ON psu.relname = pc.relname - join pg_inherits pi - ON pi.inhrelid = pc.oid -GROUP BY pi.inhparent -""" - - -class TableObject(TypedDict): - id: str - name: str - columns: list - indexes: list - foreign_keys: list - - -class SchemaObject(TypedDict): - id: str - name: str - owner: str - tables: list[TableObject] - - -class PostgresDatabaseObject(DatabaseObject): - schemas: list[SchemaObject] - - -DATABASE_INFORMATION_QUERY = """ -SELECT db.oid AS id, - datname AS NAME, - pg_encoding_to_char(encoding) AS encoding, - rolname AS owner, - description -FROM pg_catalog.pg_database db - LEFT JOIN pg_catalog.pg_description dc - ON dc.objoid = db.oid - JOIN pg_roles a - ON datdba = a.oid - WHERE datname NOT LIKE 'template%' -""" - - -class PostgresSchemaCollector(SchemaCollector): - def __init__(self, check: PostgreSql): - super().__init__(check) - self._check = check - self._config = check._config.collect_schemas - - @property - def base_event(self): - return { - **super().base_event, - "dbms": "postgres", - "kind": "pg_databases", - } - - def _get_databases(self): - with self._check._get_main_db() as conn: - with conn.cursor(row_factory=dict_row) as cursor: - query = DATABASE_INFORMATION_QUERY - for exclude_regex in self._config.exclude_databases: - query += " AND datname !~ '{}'".format(exclude_regex) - if self._config.include_databases: - query += f" AND ({' OR '.join(f"datname ~ '{include_regex}'" for include_regex in self._config.include_databases)})" - - # Autodiscovery trumps exclude and include - autodiscovery_databases = self._check.autodiscovery.get_items() - if autodiscovery_databases: - query += " AND datname IN ({})".format(", ".join(f"'{db}'" for db in autodiscovery_databases)) - - cursor.execute(query) - return cursor.fetchall() - - @contextlib.contextmanager - def _get_cursor(self, database_name): - with self._check.db_pool.get_connection(database_name) as conn: - with conn.cursor(row_factory=dict_row) as cursor: - schemas_query = self._get_schemas_query() - tables_query = self._get_tables_query() - columns_query = COLUMNS_QUERY - indexes_query = PG_INDEXES_QUERY - constraints_query = PG_CONSTRAINTS_QUERY - partitions_ctes = ( - f""" - , - partition_keys AS ( - {PARTITION_KEY_QUERY} - ), - num_partitions AS ( - {NUM_PARTITIONS_QUERY} - ) - """ - if VersionUtils.transform_version(str(self._check.version))["version.major"] > "9" - else "" - ) - partition_joins = ( - """ - LEFT JOIN partition_keys ON tables.table_id = partition_keys.table_id - LEFT JOIN num_partitions ON tables.table_id = num_partitions.table_id - """ - if VersionUtils.transform_version(str(self._check.version))["version.major"] > "9" - else "" - ) - parition_selects = ( - """ - , - partition_keys.partition_key, - num_partitions.num_partitions - """ - if VersionUtils.transform_version(str(self._check.version))["version.major"] > "9" - else "" - ) - limit = int(self._config.max_tables or 1_000_000) - - query = f""" - WITH - schemas AS( - {schemas_query} - ), - tables AS ( - {tables_query} - ), - schema_tables AS ( - SELECT schemas.schema_id, schemas.schema_name, - tables.table_id, tables.table_name - FROM schemas - LEFT JOIN tables ON schemas.schema_id = 
tables.schema_id - ORDER BY schemas.schema_name, tables.table_name - LIMIT {limit} - ), - columns AS ( - {columns_query} - ), - indexes AS ( - {indexes_query} - ), - constraints AS ( - {constraints_query} - ) - {partitions_ctes} - - SELECT * FROM ( - SELECT schema_tables.schema_id, schema_tables.schema_name, - schema_tables.table_id, schema_tables.table_name, - array_agg(row_to_json(columns.*)) FILTER (WHERE columns.name IS NOT NULL) as columns, - array_agg(row_to_json(indexes.*)) FILTER (WHERE indexes.name IS NOT NULL) as indexes, - array_agg(row_to_json(constraints.*)) FILTER (WHERE constraints.name IS NOT NULL) - as foreign_keys - {parition_selects} - FROM schema_tables - LEFT JOIN columns ON schema_tables.table_id = columns.table_id - LEFT JOIN indexes ON schema_tables.table_id = indexes.table_id - LEFT JOIN constraints ON schema_tables.table_id = constraints.table_id - {partition_joins} - GROUP BY schema_tables.schema_id, schema_tables.schema_name, schema_tables.table_id, schema_tables.table_name - ) t - ; - """ - print(query) - cursor.execute(query) - yield cursor - - def _get_schemas_query(self): - query = SCHEMA_QUERY - for exclude_regex in self._config.exclude_schemas: - query += " AND nspname !~ '{}'".format(exclude_regex) - if self._config.include_schemas: - query += f" AND ({' OR '.join(f"nspname ~ '{include_regex}'" for include_regex in self._config.include_schemas)})" - if self._check._config.ignore_schemas_owned_by: - query += " AND nspowner :: regrole :: text not IN ({})".format( - ", ".join(f"'{owner}'" for owner in self._check._config.ignore_schemas_owned_by) - ) - return query - - def _get_tables_query(self): - if VersionUtils.transform_version(str(self._check.version))["version.major"] == "9": - query = PG_TABLES_QUERY_V9 - else: - query = PG_TABLES_QUERY_V10_PLUS - for exclude_regex in self._config.exclude_tables: - query += " AND c.relname !~ '{}'".format(exclude_regex) - if self._config.include_tables: - query += f" AND ({' OR '.join(f"c.relname ~ '{include_regex}'" for include_regex in self._config.include_tables)})" - return query - - - def _get_next(self, cursor): - return cursor.fetchone() - - def _get_all(self, cursor): - return cursor.fetchall() - - def _map_row(self, database: DatabaseInfo, cursor_row) -> DatabaseObject: - object = super()._map_row(database, cursor_row) - # Map the cursor row to the expected schema, and strip out None values - object["schemas"] = [ - { - k: v - for k, v in { - "id": str(cursor_row.get("schema_id")), - "name": cursor_row.get("schema_name"), - "owner": cursor_row.get("schema_owner"), - "tables": [ - { - k: v - for k, v in { - "id": str(cursor_row.get("table_id")), - "name": cursor_row.get("table_name"), - "owner": cursor_row.get("owner"), - # The query can create duplicates of the joined tables - "columns": list({v and v['name']: v for v in cursor_row.get("columns") or []}.values()), - "indexes": list({v and v['name']: v for v in cursor_row.get("indexes") or []}.values()), - "foreign_keys": list( - {v and v['name']: v for v in cursor_row.get("foreign_keys") or []}.values() - ), - "toast_table": cursor_row.get("toast_table"), - "num_partitions": cursor_row.get("num_partitions"), - "partition_key": cursor_row.get("partition_key"), - }.items() - if v is not None - } - ], - }.items() - if v is not None - } - ] - return object + return {**database} diff --git a/datadog_checks_base/datadog_checks/base/utils/db/utils.py b/datadog_checks_base/datadog_checks/base/utils/db/utils.py index 0c46a26cff82e..3114dbb1a3632 100644 --- 
a/datadog_checks_base/datadog_checks/base/utils/db/utils.py +++ b/datadog_checks_base/datadog_checks/base/utils/db/utils.py @@ -590,3 +590,10 @@ def get_tags(self) -> List[str]: # Generate and cache regular tags self._cached_tag_list = self._generate_tag_strings(self._tags) return list(self._cached_tag_list) + + +def now_ms() -> int: + """ + Get the current time in whole milliseconds. + """ + return int(time.time() * 1000) diff --git a/datadog_checks_base/tests/base/utils/db/test_schemas.py b/datadog_checks_base/tests/base/utils/db/test_schemas.py index 518e62d84222a..4045f99c06b61 100644 --- a/datadog_checks_base/tests/base/utils/db/test_schemas.py +++ b/datadog_checks_base/tests/base/utils/db/test_schemas.py @@ -1,160 +1,77 @@ # (C) Datadog, Inc. 2023-present # All rights reserved # Licensed under a 3-clause BSD style license (see LICENSE) +from contextlib import contextmanager + import pytest -from datadog_checks.postgres.schemas import PostgresSchemaCollector - -from .common import POSTGRES_VERSION - -pytestmark = [pytest.mark.integration, pytest.mark.usefixtures('dd_environment')] - - -@pytest.fixture -def dbm_instance(pg_instance): - pg_instance['dbm'] = True - pg_instance['min_collection_interval'] = 0.1 - pg_instance['query_samples'] = {'enabled': False} - pg_instance['query_activity'] = {'enabled': False} - pg_instance['query_metrics'] = {'enabled': False} - pg_instance['collect_resources'] = {'enabled': False, 'run_sync': True} - pg_instance['collect_settings'] = {'enabled': False, 'run_sync': True} - pg_instance['collect_schemas'] = {'enabled': True, 'run_sync': True} - return pg_instance - - -def test_get_databases(dbm_instance, integration_check): - check = integration_check(dbm_instance) - collector = PostgresSchemaCollector(check) - - databases = collector._get_databases() - datbase_names = [database['name'] for database in databases] - assert 'postgres' in datbase_names - assert 'dogs' in datbase_names - assert 'dogs_3' in datbase_names - assert 'nope' not in datbase_names - - -def test_databases_filters(dbm_instance, integration_check): - dbm_instance['collect_schemas']['exclude_databases'] = ['^dogs$', 'dogs_[345]'] - check = integration_check(dbm_instance) - collector = PostgresSchemaCollector(check) - - databases = collector._get_databases() - datbase_names = [database['name'] for database in databases] - assert 'postgres' in datbase_names - assert 'dogs' not in datbase_names - assert 'dogs_3' not in datbase_names - assert 'dogs_9' in datbase_names - assert 'nope' not in datbase_names - - -def test_get_cursor(dbm_instance, integration_check): - check = integration_check(dbm_instance) - check.version = POSTGRES_VERSION - collector = PostgresSchemaCollector(check) - - with collector._get_cursor('datadog_test') as cursor: - assert cursor is not None - schemas = [] - for row in cursor: - schemas.append(row['schema_name']) - - assert set(schemas) == {'datadog', 'hstore', 'public', 'public2', 'rdsadmin_test'} - - -def test_schemas_filters(dbm_instance, integration_check): - dbm_instance['collect_schemas']['exclude_schemas'] = ['public', 'rdsadmin_test'] - check = integration_check(dbm_instance) - check.version = POSTGRES_VERSION - collector = PostgresSchemaCollector(check) - - with collector._get_cursor('datadog_test') as cursor: - assert cursor is not None - schemas = [] - for row in cursor: - schemas.append(row['schema_name']) - - assert set(schemas) == {'datadog', 'hstore'} - - -def test_tables(dbm_instance, integration_check): - check = integration_check(dbm_instance) - 
check.version = POSTGRES_VERSION - collector = PostgresSchemaCollector(check) - - with collector._get_cursor('datadog_test') as cursor: - assert cursor is not None - tables = [] - for row in cursor: - if row['table_name']: - tables.append(row['table_name']) - - assert set(tables) == { - 'persons', - 'personsdup1', - 'personsdup2', - 'personsdup3', - 'personsdup4', - 'personsdup5', - 'personsdup6', - 'personsdup7', - 'personsdup8', - 'personsdup9', - 'personsdup10', - 'personsdup11', - 'personsdup12', - 'personsdup13', - 'persons_indexed', - 'pgtable', - 'pg_newtable', - 'cities', - 'rds_admin_misc', - 'sample_foreign_d73a8c', - } - - -def test_columns(dbm_instance, integration_check): - check = integration_check(dbm_instance) - check.version = POSTGRES_VERSION - collector = PostgresSchemaCollector(check) - - with collector._get_cursor('datadog_test') as cursor: - assert cursor is not None - # Assert that at least one row has columns - assert any(row['columns'] for row in cursor) - for row in cursor: - if row['columns']: - for column in row['columns']: - assert column['name'] is not None - assert column['data_type'] is not None - if row['table_name'] == 'cities': - assert row['columns'] - assert row['columns'][0]['name'] - - -def test_indexes(dbm_instance, integration_check): - check = integration_check(dbm_instance) - check.version = POSTGRES_VERSION - collector = PostgresSchemaCollector(check) - - with collector._get_cursor('datadog_test') as cursor: - assert cursor is not None - # Assert that at least one row has indexes - assert any(row['indexes'] for row in cursor) - for row in cursor: - if row['indexes']: - for index in row['indexes']: - assert index['name'] is not None - assert index['definition'] is not None - if row['table_name'] == 'cities': - assert row['indexes'] - assert row['indexes'][0]['name'] - - -def test_collect_schemas(dbm_instance, integration_check): - check = integration_check(dbm_instance) - check.version = POSTGRES_VERSION - collector = PostgresSchemaCollector(check) +from datadog_checks.base.checks.db import DatabaseCheck +from datadog_checks.base.utils.db.schemas import SchemaCollector, SchemaCollectorConfig + + +class TestDatabaseCheck(DatabaseCheck): + __test__ = False + def __init__(self): + super().__init__() + self._reported_hostname = "test_hostname" + self._database_identifier = "test_database_identifier" + self._dbms_version = "test_dbms_version" + self._agent_version = "test_agent_version" + self._tags = ["test_tag"] + self._cloud_metadata = {"test_cloud_metadata": "test_cloud_metadata"} + + @property + def reported_hostname(self): + return self._reported_hostname + + @property + def database_identifier(self): + return self._database_identifier + + @property + def dbms_version(self): + return self._dbms_version + + @property + def agent_version(self): + return self._agent_version + + @property + def tags(self): + return self._tags + + @property + def cloud_metadata(self): + return self._cloud_metadata + + +class TestSchemaCollector(SchemaCollector): + __test__ = False + def __init__(self, check: DatabaseCheck, config: SchemaCollectorConfig): + super().__init__(check, config) + self._row_index = 0 + self._rows = [{'table_name': 'test_table'}] + + def _get_databases(self): + return [{'name': 'test_database'}] + + @contextmanager + def _get_cursor(self, database: str): + yield {} + + def _get_next(self, _cursor): + if self._row_index < len(self._rows): + row = self._rows[self._row_index] + self._row_index += 1 + return row + return None + + def 
_map_row(self, database: str, cursor_row: dict): + return {**database} + +@pytest.mark.unit +def test_schema_collector(): + check = TestDatabaseCheck() + collector = TestSchemaCollector(check, SchemaCollectorConfig()) collector.collect_schemas() diff --git a/datadog_checks_base/tests/base/utils/test_persistent_cache.py b/datadog_checks_base/tests/base/utils/test_persistent_cache.py index 3feeaaa274194..66bda1ee24434 100644 --- a/datadog_checks_base/tests/base/utils/test_persistent_cache.py +++ b/datadog_checks_base/tests/base/utils/test_persistent_cache.py @@ -40,6 +40,7 @@ def cache_id(check: AgentCheck) -> str: class TestCheck(AgentCheck): + __test__ = False def check(self, instance): pass From 04f8163b81f211370907f7081556101a41f1f62b Mon Sep 17 00:00:00 2001 From: Seth Samuel Date: Tue, 21 Oct 2025 14:43:35 -0400 Subject: [PATCH 03/14] WIP --- .../datadog_checks/base/checks/db.py | 4 --- .../datadog_checks/base/utils/db/schemas.py | 9 +++++++ .../tests/base/utils/db/test_schemas.py | 27 +++++++++++++++++-- 3 files changed, 34 insertions(+), 6 deletions(-) diff --git a/datadog_checks_base/datadog_checks/base/checks/db.py b/datadog_checks_base/datadog_checks/base/checks/db.py index b9fee24fbb856..7b1c92ea41fbb 100644 --- a/datadog_checks_base/datadog_checks/base/checks/db.py +++ b/datadog_checks_base/datadog_checks/base/checks/db.py @@ -33,10 +33,6 @@ def database_identifier(self) -> str: def dbms_version(self) -> str: raise NotImplementedError("dbms_version is not implemented for this check") - @property - def agent_version(self) -> str: - raise NotImplementedError("agent_version is not implemented for this check") - @property def tags(self) -> list[str]: raise NotImplementedError("tags is not implemented for this check") diff --git a/datadog_checks_base/datadog_checks/base/utils/db/schemas.py b/datadog_checks_base/datadog_checks/base/utils/db/schemas.py index 1eb8bf0d921d0..2c7ce54dea383 100644 --- a/datadog_checks_base/datadog_checks/base/utils/db/schemas.py +++ b/datadog_checks_base/datadog_checks/base/utils/db/schemas.py @@ -132,6 +132,7 @@ def base_event(self): return { "host": self._check.reported_hostname, "database_instance": self._check.database_identifier, + "kind": self.kind, "agent_version": datadog_agent.get_version(), "collection_interval": self._config.collection_interval, "dbms_version": str(self._check.dbms_version), @@ -155,7 +156,15 @@ def maybe_flush(self, is_last_payload): self._queued_rows = [] + @property @abstractmethod + def kind(self) -> str: + """ + Returns the kind property of the schema metadata event. + Subclasses should override this property to return the kind of schema being collected. + """ + raise NotImplementedError("Subclasses must implement kind") + def _get_databases(self) -> list[DatabaseInfo]: """ Returns a list of database dictionaries. 
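The hunks above settle the extension surface a DBMS integration implements on top of SchemaCollector: a `kind` property used to route the payload, `_get_databases`, a `_get_cursor` context manager, `_get_next`, and an optional `_map_row` override, all driven by `collect_schemas` with payloads flushed every `payload_chunk_size` rows. A rough sketch of a subclass wired to an in-memory catalog follows; the collector name, catalog contents, and cursor shape are invented for illustration, while the imports and hook signatures are the ones added in these commits.

from contextlib import contextmanager

from datadog_checks.base.utils.db.schemas import SchemaCollector, SchemaCollectorConfig


class ExampleSchemaCollector(SchemaCollector):
    """Illustrative collector that walks an in-memory catalog instead of a live DBMS."""

    # Fake catalog standing in for whatever the integration would actually query.
    _CATALOG = {
        "app_db": [
            {"schema_name": "public", "table_name": "users"},
            {"schema_name": "public", "table_name": "orders"},
        ],
    }

    @property
    def kind(self):
        # Routes the metadata payload on the DBM backend.
        return "example_databases"

    def _get_databases(self):
        # One DatabaseInfo dict per database to collect.
        return [{"name": name} for name in self._CATALOG]

    @contextmanager
    def _get_cursor(self, database_name):
        # Real integrations yield a DB-API cursor here; any row iterator satisfies the contract.
        yield iter(self._CATALOG[database_name])

    def _get_next(self, cursor):
        # Return the next row, or None once the cursor is exhausted.
        return next(cursor, None)

    def _map_row(self, database, cursor_row):
        # Shape each cursor row into the DatabaseObject structure sent to DBM.
        return {
            **database,
            "schemas": [{"name": cursor_row["schema_name"], "tables": [{"name": cursor_row["table_name"]}]}],
        }


# Usage, given a check that implements the DatabaseCheck properties
# (reported_hostname, database_identifier, dbms_version, tags, cloud_metadata):
#   config = SchemaCollectorConfig()
#   config.payload_chunk_size = 500  # flush a payload every 500 mapped rows
#   ExampleSchemaCollector(check, config).collect_schemas()
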
diff --git a/datadog_checks_base/tests/base/utils/db/test_schemas.py b/datadog_checks_base/tests/base/utils/db/test_schemas.py index 4045f99c06b61..d064417ef4259 100644 --- a/datadog_checks_base/tests/base/utils/db/test_schemas.py +++ b/datadog_checks_base/tests/base/utils/db/test_schemas.py @@ -8,6 +8,11 @@ from datadog_checks.base.checks.db import DatabaseCheck from datadog_checks.base.utils.db.schemas import SchemaCollector, SchemaCollectorConfig +try: + import datadog_agent # type: ignore +except ImportError: + from datadog_checks.base.stubs import datadog_agent + class TestDatabaseCheck(DatabaseCheck): __test__ = False @@ -67,11 +72,29 @@ def _get_next(self, _cursor): return None def _map_row(self, database: str, cursor_row: dict): - return {**database} + return {**database, "tables": [cursor_row]} + + @property + def kind(self): + return "test_databases" @pytest.mark.unit -def test_schema_collector(): +def test_schema_collector(aggregator): check = TestDatabaseCheck() collector = TestSchemaCollector(check, SchemaCollectorConfig()) collector.collect_schemas() + + events = aggregator.get_event_platform_events("dbm-metadata") + assert len(events) == 1 + event = events[0] + assert event['kind'] == collector.kind + assert event['host'] == check.reported_hostname + assert event['database_instance'] == check.database_identifier + assert event['agent_version'] == datadog_agent.get_version() + assert event['collection_interval'] == collector._config.collection_interval + assert event['dbms_version'] == check.dbms_version + assert event['tags'] == check.tags + assert event['cloud_metadata'] == check.cloud_metadata + assert event['metadata'][0]['name'] == 'test_database' + assert event['metadata'][0]['tables'][0]['table_name'] == 'test_table' From 4624b88150c6b6baa6c3f1efbcded9cd1a8abed0 Mon Sep 17 00:00:00 2001 From: Seth Samuel Date: Tue, 21 Oct 2025 14:48:34 -0400 Subject: [PATCH 04/14] Changelog --- datadog_checks_base/changelog.d/21720.added | 1 + datadog_checks_base/tests/base/utils/db/test_schemas.py | 4 +++- datadog_checks_base/tests/base/utils/test_persistent_cache.py | 1 + 3 files changed, 5 insertions(+), 1 deletion(-) create mode 100644 datadog_checks_base/changelog.d/21720.added diff --git a/datadog_checks_base/changelog.d/21720.added b/datadog_checks_base/changelog.d/21720.added new file mode 100644 index 0000000000000..951cfcdc5b176 --- /dev/null +++ b/datadog_checks_base/changelog.d/21720.added @@ -0,0 +1 @@ +Create shared schemas collector for the Postgres, MySQL, and SQL Server integrations diff --git a/datadog_checks_base/tests/base/utils/db/test_schemas.py b/datadog_checks_base/tests/base/utils/db/test_schemas.py index d064417ef4259..8b45c5e56a335 100644 --- a/datadog_checks_base/tests/base/utils/db/test_schemas.py +++ b/datadog_checks_base/tests/base/utils/db/test_schemas.py @@ -9,13 +9,14 @@ from datadog_checks.base.utils.db.schemas import SchemaCollector, SchemaCollectorConfig try: - import datadog_agent # type: ignore + import datadog_agent # type: ignore except ImportError: from datadog_checks.base.stubs import datadog_agent class TestDatabaseCheck(DatabaseCheck): __test__ = False + def __init__(self): super().__init__() self._reported_hostname = "test_hostname" @@ -52,6 +53,7 @@ def cloud_metadata(self): class TestSchemaCollector(SchemaCollector): __test__ = False + def __init__(self, check: DatabaseCheck, config: SchemaCollectorConfig): super().__init__(check, config) self._row_index = 0 diff --git a/datadog_checks_base/tests/base/utils/test_persistent_cache.py 
b/datadog_checks_base/tests/base/utils/test_persistent_cache.py index 66bda1ee24434..56cc8b73e9802 100644 --- a/datadog_checks_base/tests/base/utils/test_persistent_cache.py +++ b/datadog_checks_base/tests/base/utils/test_persistent_cache.py @@ -41,6 +41,7 @@ def cache_id(check: AgentCheck) -> str: class TestCheck(AgentCheck): __test__ = False + def check(self, instance): pass From a68f875e8494b4cd85527b74116b06989eaad06b Mon Sep 17 00:00:00 2001 From: Seth Samuel Date: Tue, 21 Oct 2025 14:49:37 -0400 Subject: [PATCH 05/14] Warning --- datadog_checks_base/datadog_checks/base/utils/db/schemas.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datadog_checks_base/datadog_checks/base/utils/db/schemas.py b/datadog_checks_base/datadog_checks/base/utils/db/schemas.py index 2c7ce54dea383..72205e7bee419 100644 --- a/datadog_checks_base/datadog_checks/base/utils/db/schemas.py +++ b/datadog_checks_base/datadog_checks/base/utils/db/schemas.py @@ -15,7 +15,7 @@ from datadog_checks.base.checks.db import DatabaseCheck try: - import datadog_agent + import datadog_agent # type: ignore except ImportError: from datadog_checks.base.stubs import datadog_agent From aa0e0ddbeb8437f122f901cbf694458ef9181144 Mon Sep 17 00:00:00 2001 From: Seth Samuel Date: Tue, 21 Oct 2025 14:50:11 -0400 Subject: [PATCH 06/14] Remove unused --- datadog_checks_base/datadog_checks/base/utils/db/schemas.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/datadog_checks_base/datadog_checks/base/utils/db/schemas.py b/datadog_checks_base/datadog_checks/base/utils/db/schemas.py index 72205e7bee419..be59c63e22bab 100644 --- a/datadog_checks_base/datadog_checks/base/utils/db/schemas.py +++ b/datadog_checks_base/datadog_checks/base/utils/db/schemas.py @@ -19,7 +19,6 @@ except ImportError: from datadog_checks.base.stubs import datadog_agent - class DatabaseInfo(TypedDict): name: str @@ -36,7 +35,6 @@ class DatabaseObject(TypedDict): class SchemaCollectorConfig: def __init__(self): self.collection_interval = 3600 - self.enabled = False self.payload_chunk_size = 10_000 From 3c6489682bcbf705c761a7b8ef9de27e5a550cac Mon Sep 17 00:00:00 2001 From: Seth Samuel Date: Tue, 21 Oct 2025 14:54:09 -0400 Subject: [PATCH 07/14] Lint --- datadog_checks_base/datadog_checks/base/utils/db/schemas.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/datadog_checks_base/datadog_checks/base/utils/db/schemas.py b/datadog_checks_base/datadog_checks/base/utils/db/schemas.py index be59c63e22bab..0e0b34c7a90a6 100644 --- a/datadog_checks_base/datadog_checks/base/utils/db/schemas.py +++ b/datadog_checks_base/datadog_checks/base/utils/db/schemas.py @@ -15,10 +15,11 @@ from datadog_checks.base.checks.db import DatabaseCheck try: - import datadog_agent # type: ignore + import datadog_agent # type: ignore except ImportError: from datadog_checks.base.stubs import datadog_agent + class DatabaseInfo(TypedDict): name: str From da84647994cbd2162c92ec798c0528a1193da152 Mon Sep 17 00:00:00 2001 From: Seth Samuel Date: Fri, 24 Oct 2025 09:04:55 -0400 Subject: [PATCH 08/14] AI Fixes --- datadog_checks_base/datadog_checks/base/utils/db/schemas.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/datadog_checks_base/datadog_checks/base/utils/db/schemas.py b/datadog_checks_base/datadog_checks/base/utils/db/schemas.py index 0e0b34c7a90a6..597bd68480ac9 100644 --- a/datadog_checks_base/datadog_checks/base/utils/db/schemas.py +++ b/datadog_checks_base/datadog_checks/base/utils/db/schemas.py @@ -7,7 +7,7 @@ from abc import ABC, 
abstractmethod
 from typing import TYPE_CHECKING, TypedDict
 
-import orjson as json
+from datadog_checks.base.utils.format.json import json
 
 from .utils import now_ms
@@ -134,6 +134,7 @@ def base_event(self):
             "kind": self.kind,
             "agent_version": datadog_agent.get_version(),
             "collection_interval": self._config.collection_interval,
+            "dbms": self._dbms,
             "dbms_version": str(self._check.dbms_version),
             "tags": self._check.tags,
             "cloud_metadata": self._check.cloud_metadata,

From 9c2daa0d8ac72b87bfed39a6baea22665e5fe268 Mon Sep 17 00:00:00 2001
From: Seth Samuel
Date: Fri, 24 Oct 2025 12:51:01 -0400
Subject: [PATCH 09/14] Fix

---
 datadog_checks_base/datadog_checks/base/utils/db/schemas.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/datadog_checks_base/datadog_checks/base/utils/db/schemas.py b/datadog_checks_base/datadog_checks/base/utils/db/schemas.py
index 597bd68480ac9..e2afb17f371ef 100644
--- a/datadog_checks_base/datadog_checks/base/utils/db/schemas.py
+++ b/datadog_checks_base/datadog_checks/base/utils/db/schemas.py
@@ -7,7 +7,7 @@
 from abc import ABC, abstractmethod
 from typing import TYPE_CHECKING, TypedDict
 
-from datadog_checks.base.utils.format.json import json
+from datadog_checks.base.utils.serialization import json
 
 from .utils import now_ms

From b487d83088367fe1c18fc86e88eaed4d3110a250 Mon Sep 17 00:00:00 2001
From: Seth Samuel
Date: Mon, 27 Oct 2025 10:22:03 -0400
Subject: [PATCH 10/14] Feedback

---
 .../datadog_checks/base/checks/db.py        | 20 +++++++---
 .../datadog_checks/base/utils/db/schemas.py | 37 +++++++------------
 2 files changed, 28 insertions(+), 29 deletions(-)

diff --git a/datadog_checks_base/datadog_checks/base/checks/db.py b/datadog_checks_base/datadog_checks/base/checks/db.py
index 7b1c92ea41fbb..ca068122ca262 100644
--- a/datadog_checks_base/datadog_checks/base/checks/db.py
+++ b/datadog_checks_base/datadog_checks/base/checks/db.py
@@ -2,6 +2,7 @@
 # All rights reserved
 # Licensed under a 3-clause BSD style license (see LICENSE)
 
+from abc import abstractmethod
 from . import AgentCheck
 
 
@@ -22,21 +23,30 @@ def database_monitoring_health(self, raw_event: str):
         self.event_platform_event(raw_event, "dbm-health")
 
     @property
+    @abstractmethod
     def reported_hostname(self) -> str | None:
-        raise NotImplementedError("reported_hostname is not implemented for this check")
+        pass
 
     @property
+    @abstractmethod
     def database_identifier(self) -> str:
-        raise NotImplementedError("database_identifier is not implemented for this check")
+        pass
 
     @property
+    def dbms(self) -> str:
+        return self.__class__.__name__.lower()
+
+    @property
+    @abstractmethod
     def dbms_version(self) -> str:
-        raise NotImplementedError("dbms_version is not implemented for this check")
+        pass
 
     @property
+    @abstractmethod
     def tags(self) -> list[str]:
-        raise NotImplementedError("tags is not implemented for this check")
+        pass
 
     @property
+    @abstractmethod
     def cloud_metadata(self) -> dict:
-        raise NotImplementedError("cloud_metadata is not implemented for this check")
+        pass
diff --git a/datadog_checks_base/datadog_checks/base/utils/db/schemas.py b/datadog_checks_base/datadog_checks/base/utils/db/schemas.py
index e2afb17f371ef..2b41d5d96abd3 100644
--- a/datadog_checks_base/datadog_checks/base/utils/db/schemas.py
+++ b/datadog_checks_base/datadog_checks/base/utils/db/schemas.py
@@ -42,25 +42,16 @@ def __init__(self):
 class SchemaCollector(ABC):
     """
     Abstract base class for DBM schema collectors.
-
-    Attributes:
-        _collection_started_at (int): Timestamp in whole milliseconds
-            when the current collection started.
     """
 
-    _collection_started_at: int | None = None
-
     def __init__(self, check: DatabaseCheck, config: SchemaCollectorConfig):
         self._check = check
         self._log = check.log
         self._config = config
-        self._dbms = check.__class__.__name__.lower()
-        if self._dbms == 'postgresql':
-            # Backwards compatibility for metrics namespacing
-            self._dbms = 'postgres'
 
         self._reset()
 
     def _reset(self):
+        # Timestamp in whole milliseconds when the current collection started.
         self._collection_started_at = None
         self._collection_payloads_count = 0
         self._queued_rows = []
@@ -70,53 +61,51 @@ def collect_schemas(self) -> bool:
         """
         Collects and submits all applicable schema metadata to the agent.
         This class relies on the owning check to handle scheduling this method.
-
-        This method will enforce non-overlapping invocations and
-        returns False if the previous collection was still in progress when invoked again.
         """
-        if self._collection_started_at is not None:
-            return False
         status = "success"
         try:
             self._collection_started_at = now_ms()
             databases = self._get_databases()
+            self._log.debug("Collecting schemas for %d databases", len(databases))
             for database in databases:
+                self._log.debug("Starting collection of schemas for database %s", database['name'])
                 database_name = database['name']
                 if not database_name:
                     self._log.warning("database has no name %v", database)
                     continue
                 with self._get_cursor(database_name) as cursor:
                     # Get the next row from the cursor
-                    next = self._get_next(cursor)
-                    while next:
-                        self._queued_rows.append(self._map_row(database, next))
+                    next_row = self._get_next(cursor)
+                    while next_row:
+                        self._queued_rows.append(self._map_row(database, next_row))
                         self._total_rows_count += 1
                         # Because we're iterating over a cursor we need to try to get
                         # the next row to see if we've reached the last row
-                        next = self._get_next(cursor)
-                    is_last_payload = database is databases[-1] and next is None
+                        next_row = self._get_next(cursor)
+                    is_last_payload = database is databases[-1] and next_row is None
                     self.maybe_flush(is_last_payload)
+                self._log.debug("Completed collection of schemas for database %s", database_name)
         except Exception as e:
             status = "error"
             self._log.error("Error collecting schema: %s", e)
             raise e
         finally:
             self._check.histogram(
-                f"dd.{self._dbms}.schema.time",
+                f"dd.{self._check.dbms}.schema.time",
                 now_ms() - self._collection_started_at,
                 tags=self._check.tags + ["status:" + status],
                 hostname=self._check.reported_hostname,
                 raw=True,
             )
             self._check.gauge(
-                f"dd.{self._dbms}.schema.tables_count",
+                f"dd.{self._check.dbms}.schema.tables_count",
                 self._total_rows_count,
                 tags=self._check.tags + ["status:" + status],
                 hostname=self._check.reported_hostname,
                 raw=True,
             )
             self._check.gauge(
-                f"dd.{self._dbms}.schema.payloads_count",
+                f"dd.{self._check.dbms}.schema.payloads_count",
                 self._collection_payloads_count,
                 tags=self._check.tags + ["status:" + status],
                 hostname=self._check.reported_hostname,
                 raw=True,
@@ -134,7 +123,7 @@ def base_event(self):
             "kind": self.kind,
             "agent_version": datadog_agent.get_version(),
             "collection_interval": self._config.collection_interval,
-            "dbms": self._dbms,
+            "dbms": self._check.dbms,
             "dbms_version": str(self._check.dbms_version),
             "tags": self._check.tags,
             "cloud_metadata": self._check.cloud_metadata,

From 5917939b3e317b77205a5bb29259c0849c86f987 Mon Sep 17 00:00:00 2001
From: Seth Samuel
Date: Mon, 27 Oct 2025 12:34:58 -0400
Subject: [PATCH 11/14] Feedback

---
 datadog_checks_base/datadog_checks/base/utils/db/schemas.py | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/datadog_checks_base/datadog_checks/base/utils/db/schemas.py b/datadog_checks_base/datadog_checks/base/utils/db/schemas.py
index 2b41d5d96abd3..ff5ba34f8733e 100644
--- a/datadog_checks_base/datadog_checks/base/utils/db/schemas.py
+++ b/datadog_checks_base/datadog_checks/base/utils/db/schemas.py
@@ -82,7 +82,7 @@ def collect_schemas(self) -> bool:
                         # Because we're iterating over a cursor we need to try to get
                         # the next row to see if we've reached the last row
                         next_row = self._get_next(cursor)
-                    is_last_payload = database is databases[-1] and next_row is None
+                    is_last_payload = database == databases[-1] and next_row is None
                     self.maybe_flush(is_last_payload)
                 self._log.debug("Completed collection of schemas for database %s", database_name)
         except Exception as e:
@@ -132,7 +132,7 @@ def base_event(self):
 
     def maybe_flush(self, is_last_payload):
         if is_last_payload or len(self._queued_rows) >= self._config.payload_chunk_size:
-            event = self.base_event.copy()
+            event = self.base_event
             event["timestamp"] = now_ms()
             # DBM backend expects metadata to be an array of database objects
             event["metadata"] = self._queued_rows
@@ -154,6 +154,7 @@ def kind(self) -> str:
         """
         raise NotImplementedError("Subclasses must implement kind")
 
+    @abstractmethod
     def _get_databases(self) -> list[DatabaseInfo]:
         """
         Returns a list of database dictionaries.

From 3d7e7d294cb270d9e6fb83dfcb9e25d0bd8c6ba2 Mon Sep 17 00:00:00 2001
From: Seth Samuel
Date: Mon, 27 Oct 2025 12:37:55 -0400
Subject: [PATCH 12/14] Lint

---
 datadog_checks_base/datadog_checks/base/checks/db.py | 1 +
 1 file changed, 1 insertion(+)

diff --git a/datadog_checks_base/datadog_checks/base/checks/db.py b/datadog_checks_base/datadog_checks/base/checks/db.py
index ca068122ca262..bdbda0e7d095b 100644
--- a/datadog_checks_base/datadog_checks/base/checks/db.py
+++ b/datadog_checks_base/datadog_checks/base/checks/db.py
@@ -3,6 +3,7 @@
 # Licensed under a 3-clause BSD style license (see LICENSE)
 
 from abc import abstractmethod
+
 from . import AgentCheck
 
 

From 9abc674add3e745d9bf6e9d5520bcb2d5072ff58 Mon Sep 17 00:00:00 2001
From: Seth Samuel
Date: Mon, 27 Oct 2025 12:51:34 -0400
Subject: [PATCH 13/14] Refactor health

---
 postgres/datadog_checks/postgres/health.py | 10 +++++++---
 1 file changed, 7 insertions(+), 3 deletions(-)

diff --git a/postgres/datadog_checks/postgres/health.py b/postgres/datadog_checks/postgres/health.py
index 0f8978a038060..9e1f6d937089a 100644
--- a/postgres/datadog_checks/postgres/health.py
+++ b/postgres/datadog_checks/postgres/health.py
@@ -37,6 +37,7 @@ def submit_health_event(
         self,
         name: HealthEvent | PostgresHealthEvent,
         status: HealthStatus,
+        data: dict,
         **kwargs,
     ):
         """
@@ -46,14 +47,17 @@ def submit_health_event(
             The name of the health event.
         :param status: HealthStatus
             The health status to submit.
-        :param kwargs: Additional keyword arguments to include in the event.
+        :param data: A dictionary to be submitted as `data`. Must be JSON serializable.
         """
         super().submit_health_event(
             name=name,
             status=status,
             # If we have an error parsing the config we may not have tags yet
             tags=self.check.tags if hasattr(self.check, 'tags') else [],
-            database_instance=self.check.database_identifier,
-            ddagenthostname=self.check.agent_hostname,
+            data={
+                "database_instance": self.check.database_identifier,
+                "ddagenthostname": self.check.agent_hostname,
+                **(data or {}),
+            },
             **kwargs,
         )

From de655f619ab486a73c81a5951706133466c483b9 Mon Sep 17 00:00:00 2001
From: Seth Samuel
Date: Mon, 27 Oct 2025 13:02:42 -0400
Subject: [PATCH 14/14] Fix

---
 postgres/datadog_checks/postgres/health.py | 10 +++-------
 1 file changed, 3 insertions(+), 7 deletions(-)

diff --git a/postgres/datadog_checks/postgres/health.py b/postgres/datadog_checks/postgres/health.py
index 9e1f6d937089a..0f8978a038060 100644
--- a/postgres/datadog_checks/postgres/health.py
+++ b/postgres/datadog_checks/postgres/health.py
@@ -37,7 +37,6 @@ def submit_health_event(
         self,
         name: HealthEvent | PostgresHealthEvent,
         status: HealthStatus,
-        data: dict,
         **kwargs,
     ):
         """
@@ -47,17 +46,14 @@ def submit_health_event(
             The name of the health event.
         :param status: HealthStatus
             The health status to submit.
-        :param data: A dictionary to be submitted as `data`. Must be JSON serializable.
+        :param kwargs: Additional keyword arguments to include in the event.
         """
         super().submit_health_event(
             name=name,
             status=status,
             # If we have an error parsing the config we may not have tags yet
             tags=self.check.tags if hasattr(self.check, 'tags') else [],
-            data={
-                "database_instance": self.check.database_identifier,
-                "ddagenthostname": self.check.agent_hostname,
-                **(data or {}),
-            },
+            database_instance=self.check.database_identifier,
+            ddagenthostname=self.check.agent_hostname,
             **kwargs,
         )