Add Pipeline Scheduler #1522
| @@ -0,0 +1,28 @@ | ||
| --- | ||
| sidebar_position: 1 | ||
| --- | ||
|  | ||
| # Installing the Usage Collection Scheduler | ||
|  | ||
| ## Why do we need a usage collection scheduler? | ||
| - Some data warehouses do not preserve query history for longer than a few days or are limited to only a few rows. As an example, Amazon Redshift only preserves usage history for around 2-5 days | ||
| - Having adequate usage history is critical to provide an accurate Total Cost of Ownership (TCO) estimate | ||
| - Having too little history could produce an inaccurate representation of data warehouse query patterns and system utilization | ||
|  | ||
| ## Supported operating systems | ||
| The usage collection scheduler can be installed on the following operating systems: | ||
|  | ||
| 1. MacOS | ||
| 2. Linux | ||
|  | ||
| ## Local vs. Remote Deployments | ||
|  | ||
| The usage collection scheduler can be executed in the following deployments: | ||
|  | ||
| 1. **Local scheduler** - Scheduler is installed on a local laptop or workstation, provided that the local system can connect to a bastion host and forward connectivity to the data warehouse | ||
| 2. **Remote scheduler** - Scheduler is installed directly on a bastion host in a separate subnet or a compute instance deployed within the data warehouse's subnet | ||
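
For the local scheduler option, "forward connectivity to the data warehouse" typically means an SSH tunnel through the bastion host. A minimal sketch, assuming an SSH-reachable bastion and a Redshift-style endpoint; the hostnames, user, and port below are placeholders and are not defined anywhere in this PR:

```bash
# Hypothetical hosts and port -- substitute your own environment's values.
# Forwards local port 5439 through the bastion to the warehouse endpoint,
# so the locally installed scheduler can connect to localhost:5439.
ssh -N -L 5439:warehouse.internal.example.com:5439 ec2-user@bastion.example.com
```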
|  | ||
|  | ||
| ## Automatic restart | ||
|  | ||
| The usage collection scheduler relies on the host operating system to run it as a background process. As a result, it automatically resumes processing after a system restart with no manual intervention. The scheduler will also detect gaps in usage history and proactively backfill them when possible. | ||
| @@ -0,0 +1,73 @@ | ||
| --- | ||
| sidebar_position: 2 | ||
| title: Linux Installation | ||
| --- | ||
|  | ||
| # (Linux) Installing the Usage Collection Scheduler | ||
|  | ||
| ## Installation Steps: | ||
|  | ||
| 1. Open a new Terminal window and navigate to the remorph repository | ||
|  | ||
| 2. Copy the unit file to `/etc/systemd/system/`: | ||
|  | ||
| ```bash | ||
| $ sudo cp ./src/databricks/labs/remorph/assessments/scheduler/install/linux/remorph_usage_collection.service /etc/systemd/system/ | ||
| ``` | ||
|  | ||
| 3. Copy the timer file to `/etc/systemd/system/`: | ||
|  | ||
| ```bash | ||
| $ sudo cp ./src/databricks/labs/remorph/assessments/scheduler/install/linux/remorph_usage_collection.timer /etc/systemd/system/ | ||
| ``` | ||
|  | ||
| 4. Reload systemd to pick up the new unit files: | ||
|  | ||
| ```bash | ||
| $ sudo systemctl daemon-reload | ||
| ``` | ||
|  | ||
| 5. Enable the system timer | ||
|  | ||
| ```bash | ||
| $ sudo systemctl enable remorph_usage_collection.timer | ||
| ``` | ||
|  | ||
| 6. Start the system timer | ||
|  | ||
| ```bash | ||
| $ sudo systemctl start remorph_usage_collection.timer | ||
| ``` | ||
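
As a quick sanity check after enabling and starting the timer, standard systemctl queries should show it as registered; a sketch using the unit name from the files copied above:

```bash
# List the timer and its next scheduled trigger
sudo systemctl list-timers remorph_usage_collection.timer

# Inspect the timer unit's current status
sudo systemctl status remorph_usage_collection.timer
```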
|  | ||
| ## Description of the Unit File Elements | ||
|  | ||
| 1. **Description**: A description of the service | ||
|  | ||
| 2. **After**: Ensures the service starts only after the specified system service is up (network in this case) | ||
|  | ||
| 3. **ExecStart**: Specifies the command and arguments (python3 interpreter and the script path) | ||
|  | ||
| 4. **WorkingDirectory**: The directory where the script should run | ||
|  | ||
| 5. **StandardOutput**: Redirects standard output to the system journal | ||
|  | ||
| 6. **StandardError**: Redirects standard error to the system journal | ||
|  | ||
| 7. **Restart**: Automatically restarts the service if the collector process exits | ||
|  | ||
| 8. **User**: Runs the script as the specified user | ||
|  | ||
| 9. **WantedBy**: Ensures the service starts when the system reaches multi-user mode | ||
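
Because **StandardOutput** and **StandardError** are redirected to the journal, the collector's logs can be inspected with journalctl; a sketch assuming the service name used in this PR:

```bash
# Follow the usage collector's output and errors in the system journal
sudo journalctl -u remorph_usage_collection.service -f
```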
|  | ||
|  | ||
| ## Description of the Timer File Elements | ||
|  | ||
| 1. **Description**: A description of the timer | ||
|  | ||
| 2. **Persistent**: Ensures runs missed while the system was off are triggered after a reboot | ||
|  | ||
| 3. **OnBootSec**: Ensures the script runs 1 minute after booting | ||
|  | ||
| 4. **OnUnitActiveSec**: Runs the script every 15 minutes | ||
|  | ||
| 5. **WantedBy**: Ensures the timer is activated as part of the system's timers target | ||
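
The review discussion at the end of this PR also asks for removal instructions; for the Linux deployment, a cleanup sketch (not part of the committed docs) under the same unit names would look roughly like this:

```bash
# Stop and disable the timer, remove both unit files, then reload systemd
sudo systemctl stop remorph_usage_collection.timer
sudo systemctl disable remorph_usage_collection.timer
sudo rm /etc/systemd/system/remorph_usage_collection.timer /etc/systemd/system/remorph_usage_collection.service
sudo systemctl daemon-reload
```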
| @@ -0,0 +1,42 @@ | ||
| --- | ||
| sidebar_position: 1 | ||
| title: MacOS Installation | ||
| --- | ||
|  | ||
| # (MacOS) Installing the Usage Collection Scheduler | ||
|  | ||
| ## Installation Steps: | ||
|  | ||
| 1. Open a new Terminal window and navigate to the remorph repository | ||
|  | ||
| 2. Copy the plist file to `~/Library/LaunchAgents/` for a user-specific task or `/Library/LaunchDaemons/` for a system-wide task: | ||

> **Reviewer:** Most users won't know the difference, I don't think? My advice here is to advise one of these, or help the user choose. (My preference: user-specific, given the rest of the instructions assume this.)
>
> **Author:** Yep, totally agree. I'll update the PR to install only a user-specific agent.
|  | ||
| ```bash | ||
| $ cp ./src/databricks/labs/remorph/assessments/scheduler/install/macos/com.remorph.usagecollection.plist ~/Library/LaunchAgents/ | ||
| ``` | ||
|  | ||
| 3. Load the launch agent by executing the following command: | ||
|  | ||
| ```bash | ||
| $ launchctl load ~/Library/LaunchAgents/com.remorph.usagecollection.plist | ||
| ``` | ||

> **Reviewer:** Does this work in Sequoia and above?
>
> **Author:** Seems to work. I've been running this process on my Mac for over a month now. Happy to update to another approach if you feel it's more appropriate.
|  | ||
| 4. Grant the usage collection script execute permissions: | ||
|  | ||
| ```bash | ||
| $ chmod +x ./src/databricks/labs/remorph/assessments/scheduler/usage_collector.py | ||
| ``` | ||

> **Reviewer (on lines +24 to +28):** I think this should be earlier… step 2? Otherwise I think the first run will already be triggered (and fail).
>
> **Author:** Great catch. Thanks.
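
Once the agent is loaded and the script is executable, a quick way to confirm that launchd registered the job is to list it by its label (standard launchctl usage; the label matches the plist in this PR):

```bash
# The job should appear with the label defined in the plist
launchctl list | grep com.remorph.usagecollection
```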
|  | ||
| ## Description of the `plist` Elements | ||
|  | ||
| 1. **Label**: A unique identifier for the job | ||
|  | ||
| 2. **ProgramArguments**: Specifies the command and arguments (python3 interpreter and the script path) | ||
|  | ||
| 3. **StartInterval**: Runs the script every 900 seconds (15 minutes) | ||
|  | ||
| 4. **RunAtLoad**: Ensures the script runs immediately after loading | ||
|  | ||
| 5. **StandardOutPath**: Logs output to `~/Downloads/remorph/scheduler/stdout` for debugging | ||
|  | ||
| 6. **StandardErrorPath**: Logs errors to `~/Downloads/remorph/scheduler/stderr` for debugging | ||
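
Mirroring the cleanup request in the review discussion at the end of this PR, unloading and removing the agent on macOS would look roughly like this (a sketch, not part of the committed docs):

```bash
# Unload the launch agent and delete its plist
launchctl unload ~/Library/LaunchAgents/com.remorph.usagecollection.plist
rm ~/Library/LaunchAgents/com.remorph.usagecollection.plist
```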
| @@ -0,0 +1,27 @@ | ||
| import duckdb | ||
|  | ||
|  | ||
| class DuckDBManager: | ||
| """Manages a connection to a single DuckDB database instance.""" | ||
|  | ||
| _instance = None | ||
| _connection = None | ||
|  | ||
| def __new__(cls, db_path: str = "remorph_profiler_db.duckdb"): | ||
| if cls._instance is None: | ||
| cls._instance = super(DuckDBManager, cls).__new__(cls) | ||
| cls._instance._init_connection(db_path) | ||
| return cls._instance | ||
|  | ||
| def _init_connection(self, db_path: str): | ||
| self._db_path = db_path | ||
| self._connection = duckdb.connect(database=db_path) | ||
|  | ||
| def get_connection(self): | ||
| return self._connection | ||
|  | ||
| def close_connection(self): | ||
| if self._connection is not None: | ||
| self._connection.close() | ||
| DuckDBManager._instance = None | ||
| DuckDBManager._connection = None | 
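
For ad-hoc inspection of the database file this manager opens, the DuckDB CLI can be pointed at the same path; a sketch assuming the CLI is installed and the default `remorph_profiler_db.duckdb` file name from the constructor above:

```bash
# Open the profiler database and list whatever tables have been written to it
duckdb remorph_profiler_db.duckdb "SHOW TABLES;"
```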
|  | @@ -5,6 +5,7 @@ | |
| import subprocess | ||
| import yaml | ||
| import duckdb | ||
| from enum import StrEnum | ||
|  | ||
| from databricks.labs.remorph.connections.credential_manager import cred_file | ||
|  | ||
|  | @@ -17,6 +18,12 @@ | |
| DB_NAME = "profiler_extract.db" | ||
|  | ||
|  | ||
| class StepExecutionStatus(StrEnum): | ||
| COMPLETE = "COMPLETE" | ||
| ERROR = "ERROR" | ||
| SKIPPED = "SKIPPED" | ||
|  | ||
|  | ||
| class PipelineClass: | ||
| def __init__(self, config: PipelineConfig, executor: DatabaseManager): | ||
| self.config = config | ||
|  | @@ -28,16 +35,26 @@ def execute(self): | |
| for step in self.config.steps: | ||
| if step.flag == "active": | ||
| logging.debug(f"Executing step: {step.name}") | ||
| self._execute_step(step) | ||
| self.execute_step(step) | ||

> **Reviewer:** Do you expect people to run … ?
>
> **Author:** Sorry, I meant to sync with you. I had to make this a public function so that the scheduler or another process can parse the pipeline config and execute the step based upon the defined frequency. What do you think? I know you made this private so that the pipeline class can execute entirely, but there needs to be a hook for the scheduler class, or the pipeline class needs to parse the scheduled frequency info.
| logging.info("Pipeline execution completed") | ||
|  | ||
| def _execute_step(self, step: Step): | ||
| if step.type == "sql": | ||
| self._execute_sql_step(step) | ||
| elif step.type == "python": | ||
| self._execute_python_step(step) | ||
| else: | ||
| logging.error(f"Unsupported step type: {step.type}") | ||
| def execute_step(self, step: Step) -> StrEnum: | ||
| try: | ||
| if step.type == "sql": | ||
| logging.info(f"Executing SQL step {step.name}") | ||
| self._execute_sql_step(step) | ||
| status = StepExecutionStatus.COMPLETE | ||
| elif step.type == "python": | ||
| logging.info(f"Executing Python step {step.name}") | ||
| self._execute_python_step(step) | ||
| status = StepExecutionStatus.COMPLETE | ||
| else: | ||
| logging.error(f"Unsupported step type: {step.type}") | ||
| raise RuntimeError(f"Unsupported step type: {step.type}") | ||
| except RuntimeError as e: | ||
| logging.error(e) | ||
| status = StepExecutionStatus.ERROR | ||
| return status | ||
|  | ||
| def _execute_sql_step(self, step: Step): | ||
| logging.debug(f"Reading query from file: {step.extract_source}") | ||
|  | @@ -86,7 +103,8 @@ def _save_to_db(self, result, step_name: str, mode: str, batch_size: int = 1000) | |
| self._create_dir(self.db_path_prefix) | ||
| conn = duckdb.connect(str(self.db_path_prefix) + '/' + DB_NAME) | ||
| columns = result.keys() | ||
| # TODO: Add support for figuring out data types from SQLALCHEMY result object result.cursor.description is not reliable | ||
| # TODO: Add support for figuring out data types from SQLALCHEMY | ||
| # result object result.cursor.description is not reliable | ||
| schema = ' STRING, '.join(columns) + ' STRING' | ||
|  | ||
| # Handle write modes | ||
|  | ||
| @@ -0,0 +1,14 @@ | ||
| [Unit] | ||
| Description=Remorph usage collection service | ||
| After=network.target | ||
|  | ||
| [Service] | ||
| ExecStart=/usr/bin/python3 ~/Downloads/remorph/scheduler/usage_collector.py | ||
| WorkingDirectory=~/Downloads/remorph/scheduler/ | ||
| StandardOutput=journal | ||
| StandardError=journal | ||
| Restart=always | ||
| User=root | ||
|  | ||
| [Install] | ||
| WantedBy=multi-user.target | 
| @@ -0,0 +1,14 @@ | ||
| [Unit] | ||
| Description=Remorph usage collection every 15 minutes | ||
| After=network.target | ||
|  | ||
| [Timer] | ||
| Persistent=true | ||
| OnBootSec=1min | ||
| OnUnitActiveSec=15min | ||
|  | ||
| [Install] | ||
| WantedBy=timers.target | ||
|          | ||
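
If either unit file is edited, systemd's analyzer can catch mistakes before the files are copied into `/etc/systemd/system/`; shown here as an optional check using the repository path from the installation steps:

```bash
# Optional: verify the unit file parses cleanly before installing it
systemd-analyze verify ./src/databricks/labs/remorph/assessments/scheduler/install/linux/remorph_usage_collection.service
```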
| @@ -0,0 +1,27 @@ | ||
| <?xml version="1.0" encoding="UTF-8"?> | ||
| <!DOCTYPE plist PUBLIC "-//Apple//DTD PLIST 1.0//EN" "http://www.apple.com/DTDs/PropertyList-1.0.dtd"> | ||
| <plist version="1.0"> | ||
| <dict> | ||
| <key>Label</key> | ||
| <string>com.remorph.usagecollection</string> | ||
|  | ||
| <key>ProgramArguments</key> | ||
| <array> | ||
| <string>/Library/Frameworks/Python.framework/Versions/3.11/bin/python3</string> | ||

> **Reviewer:** This isn't something you can assume, unfortunately: …
>
> **Author:** Yes, great catch. Will update this to be dynamic.
| <string>~/Downloads/remorph/scheduler/usage_collector.py</string> | ||
|          | ||
| </array> | ||
|  | ||
| <key>StartInterval</key> | ||
| <integer>900</integer> | ||
|  | ||
| <key>RunAtLoad</key> | ||
| <true/> | ||
|  | ||
| <key>StandardOutPath</key> | ||
| <string>~/Downloads/remorph/scheduler/stdout/usagecollection.log</string> | ||
|  | ||
| <key>StandardErrorPath</key> | ||
| <string>~/Downloads/remorph/scheduler/stderr/usagecollection_error.log</string> | ||
| </dict> | ||
| </plist> | ||
|  | ||
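
Before loading, the plist's syntax can be validated with Apple's plutil; a sketch assuming the file has already been copied into `~/Library/LaunchAgents/` as in the macOS installation steps:

```bash
# Validate the property list syntax
plutil -lint ~/Library/LaunchAgents/com.remorph.usagecollection.plist
```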

> **Reviewer:** These instructions are clear, although I think that we also need to describe how to remove it and clean up afterwards.
>
> **Author:** Great point. I didn't even think about uninstallation.