Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
Added
-----

- Add a ``FlowTimer`` class to aid in creating timers that run flows.
6 changes: 5 additions & 1 deletion docs/services/timer.rst
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,11 @@ Globus Timers
Helper Objects
--------------

A helper is provided for constructing Transfer Timers:
A helper is provided for constructing Transfer and Flows timers:

.. autoclass:: FlowTimer
:members:
:show-inheritance:

.. autoclass:: TransferTimer
:members:
Expand Down
2 changes: 2 additions & 0 deletions src/globus_sdk/__init__.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ from .services.search import (
)
from .services.timer import TimerAPIError, TimerClient
from .services.timers import (
FlowTimer,
OnceTimerSchedule,
RecurringTimerSchedule,
TimerJob,
Expand Down Expand Up @@ -239,6 +240,7 @@ __all__ = (
"TimerJob",
"TimersAPIError",
"TimersClient",
"FlowTimer",
"TransferTimer",
"ActivationRequirementsResponse",
"DeleteData",
Expand Down
35 changes: 35 additions & 0 deletions src/globus_sdk/_testing/data/timer/_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,41 @@
},
"status": "new",
"submitted_at": "2023-10-26T20:31:09+00:00",
"timer_type": "transfer",
}


FLOW_ID = str(uuid.uuid4())
V2_FLOW_TIMER = {
"callback_body": {
"body": {"input_key": "input_value"},
"run_managers": [f"urn:globus:auth:identity:{uuid.uuid4()}"],
},
"callback_url": f"https://flows.automate.globus.org/flows/{FLOW_ID}/run",
"inactive_reason": None,
"interval": None,
"job_id": str(uuid.uuid4()),
"last_ran_at": None,
"n_errors": 0,
"n_runs": 0,
"name": "Very Cool Timer",
"next_run": "2025-10-27T05:00:00+00:00",
"results": [],
"schedule": {
"datetime": "2025-10-27T05:00:00+00:00",
"type": "once",
},
"scope": (
f"https://auth.globus.org/scopes/{FLOW_ID}"
f"/flow_{FLOW_ID.replace('-', '_')}_user"
),
"status": "new",
"stop_after": {
"date": None,
"n_runs": 1,
},
"submitted_at": "2025-08-01T20:31:09+00:00",
"timer_type": "flow",
}


Expand Down
24 changes: 23 additions & 1 deletion src/globus_sdk/_testing/data/timer/create_timer.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,13 @@
from globus_sdk._testing.models import RegisteredResponse, ResponseSet

from ._common import DEST_EP_ID, SOURCE_EP_ID, TIMER_ID, V2_TRANSFER_TIMER
from ._common import (
DEST_EP_ID,
FLOW_ID,
SOURCE_EP_ID,
TIMER_ID,
V2_FLOW_TIMER,
V2_TRANSFER_TIMER,
)

RESPONSES = ResponseSet(
default=RegisteredResponse(
Expand All @@ -17,4 +24,19 @@
"destination_endpoint": DEST_EP_ID,
},
),
flow_timer_success=RegisteredResponse(
service="timer",
path="/v2/timer",
method="POST",
json={
"timer": V2_FLOW_TIMER,
},
status=201,
metadata={
"timer_id": V2_FLOW_TIMER["job_id"],
"flow_id": FLOW_ID,
"callback_body": V2_FLOW_TIMER["callback_body"],
"schedule": V2_FLOW_TIMER["schedule"],
},
),
)
9 changes: 8 additions & 1 deletion src/globus_sdk/services/timers/__init__.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,15 @@
from .client import TimersClient
from .data import OnceTimerSchedule, RecurringTimerSchedule, TimerJob, TransferTimer
from .data import (
FlowTimer,
OnceTimerSchedule,
RecurringTimerSchedule,
TimerJob,
TransferTimer,
)
from .errors import TimersAPIError

__all__ = (
"FlowTimer",
"TimersAPIError",
"TimersClient",
"OnceTimerSchedule",
Expand Down
4 changes: 2 additions & 2 deletions src/globus_sdk/services/timers/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
TransferScopes,
)

from .data import TimerJob, TransferTimer
from .data import FlowTimer, TimerJob, TransferTimer
from .errors import TimersAPIError

log = logging.getLogger(__name__)
Expand Down Expand Up @@ -138,7 +138,7 @@ def get_job(
return self.get(f"/jobs/{job_id}", query_params=query_params)

def create_timer(
self, timer: dict[str, t.Any] | TransferTimer
self, timer: dict[str, t.Any] | TransferTimer | FlowTimer
) -> response.GlobusHTTPResponse:
"""
:param timer: a document defining the new timer
Expand Down
109 changes: 109 additions & 0 deletions src/globus_sdk/services/timers/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import datetime as dt
import logging
import typing as t
import uuid

from globus_sdk.config import get_service_url
from globus_sdk.exc import warn_deprecated
Expand Down Expand Up @@ -123,6 +124,114 @@ def _preprocess_body(
return new_body


class FlowTimer(PayloadWrapper):
"""
A helper for defining a payload for Flow Timer creation.
Use this along with :meth:`create_timer <globus_sdk.TimersClient.create_timer>` to
create a timer.

.. note::

``TimersClient`` has two methods for creating timers:
``create_timer`` and ``create_job``.

This helper class only works with the ``create_timer`` method.

:param flow_id: The flow ID to run when the timer runs.
:param name: A name to identify this timer.
:param schedule: The schedule on which the timer runs
:param body: A transfer payload for the timer to use.

The ``schedule`` field determines when the timer will run.
Timers may be "run once" or "recurring", and "recurring" timers may specify an end
date or the number of executions after which the timer will stop.
A ``schedule`` is specified as a dict, but the SDK provides two helpers
for constructing these data.

**Example Schedules**

.. tab-set::

.. tab-item:: Run Once, Right Now

.. code-block:: python

schedule = OnceTimerSchedule()

.. tab-item:: Run Once, At a Specific Time

.. code-block:: python

schedule = OnceTimerSchedule(datetime="2023-09-22T00:00:00Z")

.. tab-item:: Run Every 5 Minutes, Until a Specific Time

.. code-block:: python

schedule = RecurringTimerSchedule(
interval_seconds=300,
end={"condition": "time", "datetime": "2023-10-01T00:00:00Z"},
)

.. tab-item:: Run Every 30 Minutes, 10 Times

.. code-block:: python

schedule = RecurringTimerSchedule(
interval_seconds=1800,
end={"condition": "iterations", "iterations": 10},
)

.. tab-item:: Run Every 10 Minutes, Indefinitely

.. code-block:: python

schedule = RecurringTimerSchedule(interval_seconds=600)

Using these schedules, you can create a timer:

.. code-block:: pycon

>>> from globus_sdk import FlowTimer
>>> schedule = ...
>>> timer = FlowTimer(
... name="my timer",
... flow_id="00000000-19a9-44e6-9c1a-867da59d84ab",
... schedule=schedule,
... body={
... "body": {
... "input_key": "input_value",
... },
... "run_managers": [
... "urn:globus:auth:identity:11111111-be6a-473a-a027-4cfe4ceeafe3"
... ],
... },
... )

Submit the timer to the Timers service with
:meth:`create_timer <globus_sdk.TimersClient.create_timer>`.
"""

def __init__(
self,
*,
flow_id: uuid.UUID | str,
name: str | MissingType = MISSING,
schedule: dict[str, t.Any] | RecurringTimerSchedule | OnceTimerSchedule,
body: dict[str, t.Any],
) -> None:
super().__init__()
self["timer_type"] = "flow"
self["flow_id"] = flow_id
self["name"] = name
self["schedule"] = schedule
self["body"] = self._preprocess_body(body)

def _preprocess_body(self, body: dict[str, t.Any]) -> dict[str, t.Any]:
# Additional processing may be added in the future.
return body.copy()


class RecurringTimerSchedule(PayloadWrapper):
"""
A helper used as part of a *timer* to define when the *timer* will run.
Expand Down
21 changes: 21 additions & 0 deletions tests/functional/services/timers/test_create_timer.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,3 +46,24 @@ def test_transfer_timer_creation(client):
assert sent["timer"]["body"] == {
k: v for k, v in body.items() if k != "skip_activation_check"
}


def test_flow_timer_creation(client):
# Setup
meta = load_response(client.create_timer, case="flow_timer_success").metadata

# Act
client.create_timer(
timer=globus_sdk.FlowTimer(
flow_id=meta["flow_id"],
body=meta["callback_body"],
schedule=meta["schedule"],
)
)

# Verify
req = get_last_request()
sent = json.loads(req.body)
assert sent["timer"]["flow_id"] == meta["flow_id"]
assert sent["timer"]["body"] == meta["callback_body"]
assert sent["timer"]["schedule"] == meta["schedule"]
Loading