Skip to content

Commit 4a21069

Browse files
author
Maor Kleinberger
authored
Support celery appids with queue workers (#332)
* Support celery app id with queue workers * Support short prefix args and add test case for multiple queues
1 parent 9765560 commit 4a21069

File tree

3 files changed

+56
-7
lines changed

3 files changed

+56
-7
lines changed

.gitignore

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -145,3 +145,6 @@ cython_debug/
145145
# Resources
146146
gprofiler/resources/*
147147
!gprofiler/resources/flamegraph
148+
149+
# IDEs
150+
.idea

gprofiler/metadata/application_identifiers.py

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818

1919
_logger = get_logger_adapter(__name__)
2020

21-
2221
_PYTHON_BIN_RE = re.compile(r"^python([23](\.\d{1,2})?)?$")
2322

2423

@@ -47,7 +46,9 @@ def _is_python_bin(bin_name: str) -> bool:
4746
return _PYTHON_BIN_RE.match(os.path.basename(bin_name)) is not None
4847

4948

50-
def _get_cli_arg_by_name(args: List[str], arg_name: str, check_for_equals_arg: bool = False) -> str:
49+
def _get_cli_arg_by_name(
50+
args: List[str], arg_name: str, check_for_equals_arg: bool = False, check_for_short_prefix_arg: bool = False
51+
) -> str:
5152
if arg_name in args:
5253
return args[args.index(arg_name) + 1]
5354

@@ -57,6 +58,11 @@ def _get_cli_arg_by_name(args: List[str], arg_name: str, check_for_equals_arg: b
5758
if arg_key == arg_name:
5859
return arg_val
5960

61+
if check_for_short_prefix_arg:
62+
for arg in args:
63+
if arg.startswith(arg_name):
64+
return arg[len(arg_name) :]
65+
6066
return _NON_AVAILABLE_ARG
6167

6268

@@ -201,9 +207,17 @@ def get_application_name(self, process: Process) -> Optional[str]:
201207
if not self.is_celery_process(process):
202208
return None
203209

204-
app_name = _get_cli_arg_by_name(process.cmdline(), "-A") or _get_cli_arg_by_name(
205-
process.cmdline(), "--app", check_for_equals_arg=True
206-
)
210+
app_name = _get_cli_arg_by_name(
211+
process.cmdline(), "-A", check_for_short_prefix_arg=True
212+
) or _get_cli_arg_by_name(process.cmdline(), "--app", check_for_equals_arg=True)
213+
if app_name is _NON_AVAILABLE_ARG:
214+
queue_name = _get_cli_arg_by_name(
215+
process.cmdline(), "-Q", check_for_short_prefix_arg=True
216+
) or _get_cli_arg_by_name(process.cmdline(), "--queues", check_for_equals_arg=True)
217+
# TODO: One worker can handle multiple queues, it could be useful to encode that into the app id.
218+
if queue_name is not _NON_AVAILABLE_ARG:
219+
# The queue handler routing is defined in the directory where the worker is run
220+
return f"celery queue: {queue_name} ({process.cwd()})"
207221
if app_name is _NON_AVAILABLE_ARG:
208222
_logger.warning(
209223
f"{self.__class__.__name__}: Couldn't find positional argument -A or --app for application indication",

tests/test_appids.py

Lines changed: 34 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -81,14 +81,17 @@ def get_uwsgi_config(process: Process, config_file: str) -> TextIO:
8181
assert "uwsgi: my.ini" == get_application_name(process_with_cmdline(["uwsgi", "a", "b", "--ini", "my.ini"]))
8282

8383

84-
def test_celery() -> None:
84+
def test_celery_with_app() -> None:
8585
# celery -A
8686
assert f"celery: app1 ({PROCESS_CWD}/app1.py)" == get_application_name(
8787
process_with_cmdline(["celery", "a", "b", "-A", "app1"])
8888
)
8989
assert "celery: /path/to/app1 (/path/to/app1.py)" == get_application_name(
9090
process_with_cmdline(["celery", "a", "b", "-A", "/path/to/app1"])
9191
)
92+
assert "celery: /path/to/app1 (/path/to/app1.py)" == get_application_name(
93+
process_with_cmdline(["celery", "a", "b", "-A/path/to/app1"])
94+
)
9295
# python celery -A
9396
assert f"celery: app1 ({PROCESS_CWD}/app1.py)" == get_application_name(
9497
process_with_cmdline(["python", "/path/to/celery", "a", "b", "-A", "app1"])
@@ -110,7 +113,36 @@ def test_celery() -> None:
110113
assert "celery: /path/to/app3 (/path/to/app3.py)" == get_application_name(
111114
process_with_cmdline(["celery", "a", "b", "--app=/path/to/app3"])
112115
)
113-
# No app
116+
117+
118+
def test_celery_with_queue() -> None:
119+
# celery -Q queue
120+
assert f"celery queue: qqq ({PROCESS_CWD})" == get_application_name(
121+
process_with_cmdline(["celery", "a", "b", "-Q", "qqq"])
122+
)
123+
# celery -Qqueue
124+
assert f"celery queue: qqq ({PROCESS_CWD})" == get_application_name(
125+
process_with_cmdline(["celery", "a", "b", "-Qqqq"])
126+
)
127+
# python celery -Q queue
128+
assert f"celery queue: qqq ({PROCESS_CWD})" == get_application_name(
129+
process_with_cmdline(["python", "/path/to/celery", "a", "b", "-Q", "qqq"])
130+
)
131+
# --queues queue
132+
assert f"celery queue: qqq ({PROCESS_CWD})" == get_application_name(
133+
process_with_cmdline(["celery", "a", "b", "--queues", "qqq"])
134+
)
135+
# --queues=queue
136+
assert f"celery queue: qqq ({PROCESS_CWD})" == get_application_name(
137+
process_with_cmdline(["celery", "a", "b", "--queues=qqq"])
138+
)
139+
# multiple queues
140+
assert f"celery queue: qqq,www ({PROCESS_CWD})" == get_application_name(
141+
process_with_cmdline(["celery", "a", "b", "-Q", "qqq,www"])
142+
)
143+
144+
145+
def test_celery_without_app() -> None:
114146
assert get_application_name(process_with_cmdline(["celery", "a", "b"])) is None
115147

116148

0 commit comments

Comments
 (0)