-
Couldn't load subscription status.
- Fork 1.5k
Create shared schemas collector for DBM integrations #21720
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from 10 commits
6b80e5a
96e5260
04f8163
4624b88
a68f875
aa0e0dd
3c64896
da84647
9c2daa0
b487d83
5917939
3d7e7d2
9abc674
de655f6
cf247b5
2a65b0a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1 @@ | ||
| Create shared schemas collector for the Postgres, MySQL, and SQL Server integrations |
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
| @@ -0,0 +1,186 @@ | ||||||
| # (C) Datadog, Inc. 2025-present | ||||||
| # All rights reserved | ||||||
| # Licensed under a 3-clause BSD style license (see LICENSE) | ||||||
|
|
||||||
| from __future__ import annotations | ||||||
|
|
||||||
| from abc import ABC, abstractmethod | ||||||
| from typing import TYPE_CHECKING, TypedDict | ||||||
|
|
||||||
| from datadog_checks.base.utils.serialization import json | ||||||
|
|
||||||
| from .utils import now_ms | ||||||
|
|
||||||
| if TYPE_CHECKING: | ||||||
| from datadog_checks.base.checks.db import DatabaseCheck | ||||||
|
|
||||||
| try: | ||||||
| import datadog_agent # type: ignore | ||||||
| except ImportError: | ||||||
| from datadog_checks.base.stubs import datadog_agent | ||||||
|
|
||||||
|
|
||||||
class DatabaseInfo(TypedDict):
    """
    Describes a single database whose schemas should be collected.

    ``SchemaCollector._get_databases`` is annotated to return a list of these
    dictionaries; the ``name`` value is what ``SchemaCollector.collect_schemas``
    passes to ``_get_cursor``.
    """

    # Name of the database.
    name: str
|
|
||||||
|
|
||||||
# The schema collector sends lists of DatabaseObjects to the agent
# DBMS subclasses may add additional fields to the dictionary
class DatabaseObject(TypedDict):
    """
    One entry of the ``metadata`` list submitted by the schema collector.

    This is the declared return type of ``SchemaCollector._map_row``.
    """

    # Name of the database this object belongs to.
    name: str
|
|
||||||
|
|
||||||
# Common configuration for schema collector
# Individual DBMS implementations should map their specific
# configuration to this type
class SchemaCollectorConfig:
    """
    Settings shared by every schema collector implementation.

    Both settings can be passed as keyword arguments or assigned as attributes
    after construction; calling ``SchemaCollectorConfig()`` with no arguments
    yields the same defaults as before.

    Attributes:
        collection_interval: Value reported as ``collection_interval`` in every
            submitted event. Defaults to 3600.
        payload_chunk_size: Number of queued rows at which a payload is flushed.
            Defaults to 10,000.
    """

    def __init__(self, *, collection_interval: int = 3600, payload_chunk_size: int = 10_000):
        self.collection_interval = collection_interval
        self.payload_chunk_size = payload_chunk_size
|
|
||||||
|
|
||||||
class SchemaCollector(ABC):
    """
    Abstract base class for DBM schema collectors.

    A collection walks every database returned by ``_get_databases``, reads rows
    from the cursor returned by ``_get_cursor`` via ``_get_next``, converts each
    row with ``_map_row`` and submits the converted rows to the agent in payloads
    of at most ``config.payload_chunk_size`` rows.
    """

    def __init__(self, check: DatabaseCheck, config: SchemaCollectorConfig):
        """
        :param check: The owning check; used for logging, metric submission and event metadata.
        :param config: Collection settings (interval and payload chunk size).
        """
        self._check = check
        self._log = check.log
        self._config = config
        self._reset()

    def _reset(self):
        """Clear all per-collection state so the next collection starts from scratch."""
        # Timestamp in whole milliseconds when the current collection started.
        self._collection_started_at = None
        self._collection_payloads_count = 0
        self._queued_rows = []
        self._total_rows_count = 0

    def collect_schemas(self) -> bool:
        """
        Collects and submits all applicable schema metadata to the agent.
        This class relies on the owning check to handle scheduling this method.

        Databases without a name are skipped with a warning. Timing, row-count and
        payload-count metrics tagged with ``status:success`` or ``status:error`` are
        always emitted, and the per-collection state is always reset afterwards.

        :return: ``True`` when the collection completed.
        :raises Exception: Any error raised while collecting is logged and re-raised.
        """
        status = "success"
        # Record the start time before entering the try block so the finally block
        # can always compute the elapsed time.
        self._collection_started_at = now_ms()
        try:
            databases = self._get_databases()
            self._log.debug("Collecting schemas for %d databases", len(databases))
            for database in databases:
                database_name = database.get('name')
                if not database_name:
                    self._log.warning("database has no name %s", database)
                    continue
                self._log.debug("Starting collection of schemas for database %s", database_name)
                with self._get_cursor(database_name) as cursor:
                    next_row = self._get_next(cursor)
                    while next_row:
                        # Flush a full chunk only once another row is known to exist,
                        # so the final payload of the collection is never empty.
                        if self._queued_rows:
                            self.maybe_flush(False)
                        self._queued_rows.append(self._map_row(database, next_row))
                        self._total_rows_count += 1
                        next_row = self._get_next(cursor)
                self._log.debug("Completed collection of schemas for database %s", database_name)
            # Send whatever is still queued as the last payload. Doing this after the
            # loop (instead of detecting the last row of the last database) keeps rows
            # from being dropped when the final database is empty or unnamed.
            if self._queued_rows:
                self.maybe_flush(True)
        except Exception as e:
            status = "error"
            self._log.error("Error collecting schema: %s", e)
            raise
        finally:
            elapsed_ms = now_ms() - self._collection_started_at
            metric_prefix = f"dd.{self._check.dbms}.schema"
            metric_tags = self._check.tags + ["status:" + status]
            hostname = self._check.reported_hostname
            self._check.histogram(
                f"{metric_prefix}.time",
                elapsed_ms,
                tags=metric_tags,
                hostname=hostname,
                raw=True,
            )
            self._check.gauge(
                f"{metric_prefix}.tables_count",
                self._total_rows_count,
                tags=metric_tags,
                hostname=hostname,
                raw=True,
            )
            self._check.gauge(
                f"{metric_prefix}.payloads_count",
                self._collection_payloads_count,
                tags=metric_tags,
                hostname=hostname,
                raw=True,
            )

            self._reset()
        return True

    @property
    def base_event(self):
        """Return a new dict holding the fields shared by every payload of the current collection."""
        return {
            "host": self._check.reported_hostname,
            "database_instance": self._check.database_identifier,
            "kind": self.kind,
            "agent_version": datadog_agent.get_version(),
            "collection_interval": self._config.collection_interval,
            "dbms": self._check.dbms,
            "dbms_version": str(self._check.dbms_version),
            "tags": self._check.tags,
            "cloud_metadata": self._check.cloud_metadata,
            "collection_started_at": self._collection_started_at,
        }

    def maybe_flush(self, is_last_payload):
        """
        Submit the queued rows when the chunk is full or this is the last payload.

        Does nothing when ``is_last_payload`` is false and fewer than
        ``payload_chunk_size`` rows are queued. After a submission the queue is emptied.

        :param is_last_payload: Whether this payload closes the current collection.
        """
        if not is_last_payload and len(self._queued_rows) < self._config.payload_chunk_size:
            return

        # base_event builds a new dict on every access, so it can be mutated directly.
        event = self.base_event
        event["timestamp"] = now_ms()
        # DBM backend expects metadata to be an array of database objects
        event["metadata"] = self._queued_rows
        self._collection_payloads_count += 1
        if is_last_payload:
            # For the last payload, we need to include the total number of payloads collected
            # This is used for snapshotting to ensure that all payloads have been received
            event["collection_payloads_count"] = self._collection_payloads_count
        self._check.database_monitoring_metadata(json.dumps(event))

        self._queued_rows = []

    @property
    @abstractmethod
    def kind(self) -> str:
        """
        Returns the kind property of the schema metadata event.
        Subclasses should override this property to return the kind of schema being collected.
        """
        raise NotImplementedError("Subclasses must implement kind")

    @abstractmethod
    def _get_databases(self) -> list[DatabaseInfo]:
        """
        Returns a list of database dictionaries.
        Subclasses should override this method to return the list of databases to collect schema metadata for.
        """
        raise NotImplementedError("Subclasses must implement _get_databases")

    @abstractmethod
    def _get_cursor(self, database):
        """
        Returns a cursor for the given database.
        Subclasses should override this method to return the cursor for the given database.

        The return value is used in a ``with`` statement, so it must be a context
        manager; the object it yields is what gets passed to ``_get_next``.

        :param database: Name of the database to open a cursor on.
        """
        raise NotImplementedError("Subclasses must implement _get_cursor")

    @abstractmethod
    def _get_next(self, cursor):
        """
        Returns the next row from the cursor.
        Subclasses should override this method to return the next row from the cursor.

        Iteration over a cursor stops at the first falsy return value (for example ``None``).
        """
        raise NotImplementedError("Subclasses must implement _get_next")

    def _map_row(self, database: DatabaseInfo, _cursor_row) -> DatabaseObject:
        """
        Maps a cursor row to a dict that matches the schema expected by DBM.
        The base implementation of this method returns just the database dictionary.
        Subclasses should override this method to add schema and table data based on the cursor row.
        """
        return {**database}
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,102 @@ | ||
| # (C) Datadog, Inc. 2023-present | ||
| # All rights reserved | ||
| # Licensed under a 3-clause BSD style license (see LICENSE) | ||
| from contextlib import contextmanager | ||
|
|
||
| import pytest | ||
|
|
||
| from datadog_checks.base.checks.db import DatabaseCheck | ||
| from datadog_checks.base.utils.db.schemas import SchemaCollector, SchemaCollectorConfig | ||
|
|
||
| try: | ||
| import datadog_agent # type: ignore | ||
| except ImportError: | ||
| from datadog_checks.base.stubs import datadog_agent | ||
|
|
||
|
|
||
class TestDatabaseCheck(DatabaseCheck):
    """DatabaseCheck subclass that serves fixed values through its properties for unit tests."""

    # The class name starts with "Test"; this keeps pytest from collecting it as a test case.
    __test__ = False

    def __init__(self):
        super().__init__()
        # Fixed values returned by the properties below.
        self._reported_hostname = "test_hostname"
        self._database_identifier = "test_database_identifier"
        self._dbms_version = "test_dbms_version"
        self._agent_version = "test_agent_version"
        self._tags = ["test_tag"]
        self._cloud_metadata = {"test_cloud_metadata": "test_cloud_metadata"}

    @property
    def reported_hostname(self):
        """Fixed hostname value."""
        return self._reported_hostname

    @property
    def database_identifier(self):
        """Fixed database identifier value."""
        return self._database_identifier

    @property
    def dbms_version(self):
        """Fixed DBMS version string."""
        return self._dbms_version

    @property
    def agent_version(self):
        """Fixed agent version string."""
        return self._agent_version

    @property
    def tags(self):
        """Fixed tag list."""
        return self._tags

    @property
    def cloud_metadata(self):
        """Fixed cloud metadata dictionary."""
        return self._cloud_metadata
|
|
||
|
|
||
class TestSchemaCollector(SchemaCollector):
    """In-memory SchemaCollector for unit tests: one database, one row, no real cursor."""

    # The class name starts with "Test"; this keeps pytest from collecting it as a test case.
    __test__ = False

    def __init__(self, check: DatabaseCheck, config: SchemaCollectorConfig):
        super().__init__(check, config)
        # Position of the next row that _get_next will hand out.
        self._row_index = 0
        self._rows = [{'table_name': 'test_table'}]

    def _get_databases(self) -> list:
        """Return the single fake database to collect from."""
        return [{'name': 'test_database'}]

    @contextmanager
    def _get_cursor(self, database: str):
        """Yield a placeholder cursor; rows are served from ``self._rows`` instead."""
        yield {}

    def _get_next(self, _cursor):
        """Return the next unread row, or ``None`` once every row has been returned."""
        if self._row_index < len(self._rows):
            row = self._rows[self._row_index]
            self._row_index += 1
            return row
        return None

    def _map_row(self, database: dict, cursor_row: dict) -> dict:
        """Return a copy of the database dict with the row attached under ``tables``."""
        # `database` is the database dictionary, not its name: it is unpacked with `**`.
        return {**database, "tables": [cursor_row]}

    @property
    def kind(self) -> str:
        """Event kind reported by this collector."""
        return "test_databases"
|
|
||
|
|
||
@pytest.mark.unit
def test_schema_collector(aggregator):
    """A single collection submits exactly one dbm-metadata event carrying the check's metadata."""
    db_check = TestDatabaseCheck()
    schema_collector = TestSchemaCollector(db_check, SchemaCollectorConfig())
    schema_collector.collect_schemas()

    submitted = aggregator.get_event_platform_events("dbm-metadata")
    assert len(submitted) == 1
    payload = submitted[0]

    # Top-level event fields and the values they must carry.
    expected_fields = {
        'kind': schema_collector.kind,
        'host': db_check.reported_hostname,
        'database_instance': db_check.database_identifier,
        'agent_version': datadog_agent.get_version(),
        'collection_interval': schema_collector._config.collection_interval,
        'dbms_version': db_check.dbms_version,
        'tags': db_check.tags,
        'cloud_metadata': db_check.cloud_metadata,
    }
    for field, expected in expected_fields.items():
        assert payload[field] == expected

    first_object = payload['metadata'][0]
    assert first_object['name'] == 'test_database'
    assert first_object['tables'][0]['table_name'] == 'test_table'
Uh oh!
There was an error while loading. Please reload this page.