Skip to content
Open
1 change: 1 addition & 0 deletions datadog_checks_base/changelog.d/21720.added
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Create shared schemas collector for the Postgres, MySQL, and SQL Server integrations
30 changes: 30 additions & 0 deletions datadog_checks_base/datadog_checks/base/checks/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
# All rights reserved
# Licensed under a 3-clause BSD style license (see LICENSE)

from abc import abstractmethod
from . import AgentCheck


Expand All @@ -20,3 +21,32 @@ def database_monitoring_metadata(self, raw_event: str):

def database_monitoring_health(self, raw_event: str):
self.event_platform_event(raw_event, "dbm-health")

@property
@abstractmethod
def reported_hostname(self) -> str | None:
    """Hostname attached to DBM payloads and telemetry metrics; may be None."""
    pass

@property
@abstractmethod
def database_identifier(self) -> str:
    """Unique identifier for this database instance (the "database_instance" field of DBM events)."""
    pass

@property
def dbms(self) -> str:
    """Lowercased class name, used as the DBMS identifier in DBM events and metric names."""
    return type(self).__name__.lower()

@property
@abstractmethod
def dbms_version(self) -> str:
    """Version of the monitored DBMS (stringified into the DBM event envelope)."""
    pass

@property
@abstractmethod
def tags(self) -> list[str]:
    """Tags applied to DBM events and telemetry emitted by this check."""
    pass

@property
@abstractmethod
def cloud_metadata(self) -> dict:
    """Cloud provider metadata included in DBM events."""
    pass
186 changes: 186 additions & 0 deletions datadog_checks_base/datadog_checks/base/utils/db/schemas.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
# (C) Datadog, Inc. 2025-present
# All rights reserved
# Licensed under a 3-clause BSD style license (see LICENSE)

from __future__ import annotations

from abc import ABC, abstractmethod
from contextlib import AbstractContextManager
from typing import TYPE_CHECKING, TypedDict

from datadog_checks.base.utils.serialization import json

from .utils import now_ms

if TYPE_CHECKING:
from datadog_checks.base.checks.db import DatabaseCheck

try:
import datadog_agent # type: ignore
except ImportError:
from datadog_checks.base.stubs import datadog_agent


class DatabaseInfo(TypedDict):
    """Minimal description of a database discovered by the collector."""

    name: str


# The schema collector sends lists of DatabaseObjects to the agent.
# DBMS subclasses may add additional fields to the dictionary.
class DatabaseObject(TypedDict):
    """A single metadata entry submitted to the DBM backend."""

    name: str


# Common configuration for schema collector.
# Individual DBMS implementations should map their specific
# configuration to this type.
class SchemaCollectorConfig:
    """Configuration shared by all DBM schema collectors.

    Args:
        collection_interval: Seconds between schema collections (default 3600).
        payload_chunk_size: Maximum number of rows per submitted payload
            (default 10_000).
    """

    def __init__(self, collection_interval: int = 3600, payload_chunk_size: int = 10_000):
        self.collection_interval = collection_interval
        self.payload_chunk_size = payload_chunk_size


class SchemaCollector(ABC):
"""
Abstract base class for DBM schema collectors.
"""

def __init__(self, check: DatabaseCheck, config: SchemaCollectorConfig):
    # The owning check supplies DBMS identity, tags, and submission APIs.
    self._check = check
    self._log = check.log
    self._config = config
    # Initialize per-collection state (timestamp, counters, queued rows).
    self._reset()

def _reset(self):
    """Clear all per-collection bookkeeping ahead of the next run."""
    # Whole-millisecond timestamp of when the current collection started.
    self._collection_started_at = None
    self._queued_rows = []
    self._collection_payloads_count = 0
    self._total_rows_count = 0

def collect_schemas(self) -> bool:
    """
    Collect and submit all applicable schema metadata to the agent.

    Iterates the databases returned by ``_get_databases``, streams rows from a
    per-database cursor, and submits payloads in chunks via ``maybe_flush``.
    The owning check is responsible for scheduling this method.

    Returns:
        True when the collection loop completes.

    Raises:
        Exception: any collection error is re-raised after telemetry metrics
        are emitted in the ``finally`` block.
    """
    status = "success"
    try:
        self._collection_started_at = now_ms()
        databases = self._get_databases()
        self._log.debug("Collecting schemas for %d databases", len(databases))
        for database in databases:
            # Guard before any use of the name so a malformed entry (missing
            # or empty 'name') cannot raise and abort the whole collection.
            database_name = database.get('name')
            if not database_name:
                # %s (printf-style) — %v is a Go format verb, not Python.
                self._log.warning("database has no name %s", database)
                continue
            self._log.debug("Starting collection of schemas for database %s", database_name)
            with self._get_cursor(database_name) as cursor:
                # Cursors are consumed row by row; fetch ahead so the loop
                # terminates on the first falsy row.
                next_row = self._get_next(cursor)
                while next_row:
                    self._queued_rows.append(self._map_row(database, next_row))
                    self._total_rows_count += 1
                    next_row = self._get_next(cursor)
                    # Chunk-size flushes only; the guaranteed final flush below
                    # handles the last payload, so an empty or skipped final
                    # database cannot strand queued rows.
                    self.maybe_flush(False)
            self._log.debug("Completed collection of schemas for database %s", database_name)
        if databases:
            # Always emit a final payload carrying collection_payloads_count,
            # which the backend uses to verify the snapshot is complete.
            self.maybe_flush(True)
    except Exception as e:
        status = "error"
        self._log.error("Error collecting schema: %s", e)
        # Bare raise preserves the original traceback.
        raise
    finally:
        tags = self._check.tags + ["status:" + status]
        self._check.histogram(
            f"dd.{self._check.dbms}.schema.time",
            now_ms() - self._collection_started_at,
            tags=tags,
            hostname=self._check.reported_hostname,
            raw=True,
        )
        self._check.gauge(
            f"dd.{self._check.dbms}.schema.tables_count",
            self._total_rows_count,
            tags=tags,
            hostname=self._check.reported_hostname,
            raw=True,
        )
        self._check.gauge(
            f"dd.{self._check.dbms}.schema.payloads_count",
            self._collection_payloads_count,
            tags=tags,
            hostname=self._check.reported_hostname,
            raw=True,
        )

        self._reset()
    return True

@property
def base_event(self):
    """Common envelope fields included in every schema metadata payload."""
    check = self._check
    return dict(
        host=check.reported_hostname,
        database_instance=check.database_identifier,
        kind=self.kind,
        agent_version=datadog_agent.get_version(),
        collection_interval=self._config.collection_interval,
        dbms=check.dbms,
        dbms_version=str(check.dbms_version),
        tags=check.tags,
        cloud_metadata=check.cloud_metadata,
        collection_started_at=self._collection_started_at,
    )

def maybe_flush(self, is_last_payload):
    """
    Submit queued rows as a dbm-metadata event when the chunk is full or this
    is the final payload of the collection, then clear the queue.

    :param is_last_payload: True when no further rows will be produced for
        this collection; forces a flush and stamps the total payload count.
    """
    if is_last_payload or len(self._queued_rows) >= self._config.payload_chunk_size:
        event = self.base_event.copy()
        event["timestamp"] = now_ms()
        # DBM backend expects metadata to be an array of database objects
        event["metadata"] = self._queued_rows
        self._collection_payloads_count += 1
        if is_last_payload:
            # For the last payload, we need to include the total number of payloads collected
            # This is used for snapshotting to ensure that all payloads have been received
            event["collection_payloads_count"] = self._collection_payloads_count
        self._check.database_monitoring_metadata(json.dumps(event))

        # Cleared only after a flush so pending rows are never discarded.
        self._queued_rows = []

@property
@abstractmethod
def kind(self) -> str:
    """
    The ``kind`` field of the schema metadata event, identifying which flavor
    of schema payload this collector produces.
    Subclasses must override this property.
    """
    raise NotImplementedError("Subclasses must implement kind")

def _get_databases(self) -> list[DatabaseInfo]:
"""
Returns a list of database dictionaries.
Subclasses should override this method to return the list of databases to collect schema metadata for.
"""
raise NotImplementedError("Subclasses must implement _get_databases")

@abstractmethod
def _get_cursor(self, database):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
def _get_cursor(self, database):
def _get_cursor(self, database) -> AbstractContextManager[Any]:

I think we can type this as requiring a context manager using from contextlib import AbstractContextManager

"""
Returns a cursor for the given database.
Subclasses should override this method to return the cursor for the given database.
"""
raise NotImplementedError("Subclasses must implement _get_cursor")

@abstractmethod
def _get_next(self, cursor):
    """
    Returns the next row from the cursor.
    Subclasses should override this method to return the next row from the
    cursor, or a falsy value (e.g. None) when the cursor is exhausted —
    collect_schemas relies on a falsy row to terminate its loop.
    """
    raise NotImplementedError("Subclasses must implement _get_next")

def _map_row(self, database: DatabaseInfo, _cursor_row) -> DatabaseObject:
"""
Maps a cursor row to a dict that matches the schema expected by DBM.
The base implementation of this method returns just the database dictionary.
Subclasses should override this method to add schema and table data based on the cursor row.
"""
return {**database}
7 changes: 7 additions & 0 deletions datadog_checks_base/datadog_checks/base/utils/db/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -590,3 +590,10 @@ def get_tags(self) -> List[str]:
# Generate and cache regular tags
self._cached_tag_list = self._generate_tag_strings(self._tags)
return list(self._cached_tag_list)


def now_ms() -> int:
    """
    Get the current time in whole milliseconds since the Unix epoch.
    """
    # time.time_ns() avoids the float rounding of time.time() * 1000, which
    # can be off by a millisecond near chunk boundaries.
    return time.time_ns() // 1_000_000
102 changes: 102 additions & 0 deletions datadog_checks_base/tests/base/utils/db/test_schemas.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
# (C) Datadog, Inc. 2023-present
# All rights reserved
# Licensed under a 3-clause BSD style license (see LICENSE)
from contextlib import contextmanager

import pytest

from datadog_checks.base.checks.db import DatabaseCheck
from datadog_checks.base.utils.db.schemas import SchemaCollector, SchemaCollectorConfig

try:
import datadog_agent # type: ignore
except ImportError:
from datadog_checks.base.stubs import datadog_agent


class TestDatabaseCheck(DatabaseCheck):
    """Minimal concrete DatabaseCheck used as a fixture for collector tests."""

    # Prevent pytest from collecting this helper class as a test case.
    __test__ = False

    def __init__(self):
        super().__init__()
        # Fixed values exposed by the read-only properties below; the
        # schema-collector test asserts these appear in the emitted payload.
        self._reported_hostname = "test_hostname"
        self._database_identifier = "test_database_identifier"
        self._dbms_version = "test_dbms_version"
        self._agent_version = "test_agent_version"
        self._tags = ["test_tag"]
        self._cloud_metadata = {"test_cloud_metadata": "test_cloud_metadata"}

    @property
    def reported_hostname(self):
        return self._reported_hostname

    @property
    def database_identifier(self):
        return self._database_identifier

    @property
    def dbms_version(self):
        return self._dbms_version

    @property
    def agent_version(self):
        return self._agent_version

    @property
    def tags(self):
        return self._tags

    @property
    def cloud_metadata(self):
        return self._cloud_metadata


class TestSchemaCollector(SchemaCollector):
    """Concrete SchemaCollector yielding one fixed database with one table row."""

    # Keep pytest from treating this fixture as a test case.
    __test__ = False

    def __init__(self, check: DatabaseCheck, config: SchemaCollectorConfig):
        super().__init__(check, config)
        self._rows = [{'table_name': 'test_table'}]
        self._row_index = 0

    @property
    def kind(self):
        return "test_databases"

    def _get_databases(self):
        return [{'name': 'test_database'}]

    @contextmanager
    def _get_cursor(self, database: str):
        # The fake cursor carries no state; rows come from self._rows.
        yield {}

    def _get_next(self, _cursor):
        if self._row_index >= len(self._rows):
            return None
        row = self._rows[self._row_index]
        self._row_index += 1
        return row

    def _map_row(self, database: str, cursor_row: dict):
        return dict(database, tables=[cursor_row])


@pytest.mark.unit
def test_schema_collector(aggregator):
    """End-to-end check: one collection over the single-database fixture emits
    exactly one dbm-metadata event with the expected envelope and metadata."""
    check = TestDatabaseCheck()
    collector = TestSchemaCollector(check, SchemaCollectorConfig())
    collector.collect_schemas()

    # One database with one row and the default chunk size -> one payload.
    events = aggregator.get_event_platform_events("dbm-metadata")
    assert len(events) == 1
    event = events[0]
    # Envelope fields come from the check via SchemaCollector.base_event.
    assert event['kind'] == collector.kind
    assert event['host'] == check.reported_hostname
    assert event['database_instance'] == check.database_identifier
    assert event['agent_version'] == datadog_agent.get_version()
    assert event['collection_interval'] == collector._config.collection_interval
    assert event['dbms_version'] == check.dbms_version
    assert event['tags'] == check.tags
    assert event['cloud_metadata'] == check.cloud_metadata
    # Metadata rows are produced by TestSchemaCollector._map_row.
    assert event['metadata'][0]['name'] == 'test_database'
    assert event['metadata'][0]['tables'][0]['table_name'] == 'test_table'
2 changes: 2 additions & 0 deletions datadog_checks_base/tests/base/utils/test_persistent_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ def cache_id(check: AgentCheck) -> str:


class TestCheck(AgentCheck):
__test__ = False

def check(self, instance):
pass

Expand Down
Loading