@@ -524,6 +524,8 @@ def _workflow_event_to_nexus_link(
524
524
_LINK_URL_PATH_REGEX = re .compile (
525
525
r"^/namespaces/(?P<namespace>[^/]+)/workflows/(?P<workflow_id>[^/]+)/(?P<run_id>[^/]+)/history$"
526
526
)
527
+ LINK_EVENT_ID_PARAM_NAME = "eventID"
528
+ LINK_EVENT_TYPE_PARAM_NAME = "eventType"
527
529
528
530
529
531
def _nexus_link_to_workflow_event (
@@ -537,21 +539,10 @@ def _nexus_link_to_workflow_event(
537
539
)
538
540
return None
539
541
try :
540
- query_params = urllib .parse .parse_qs (url .query )
541
- [reference_type ] = query_params .get ("referenceType" , [])
542
- if reference_type != "EventReference" :
543
- raise ValueError (
544
- f"Expected Nexus link URL query parameter referenceType to be EventReference but got: { reference_type } "
545
- )
546
- [event_type_name ] = query_params .get ("eventType" , [])
547
- event_ref = temporalio .api .common .v1 .Link .WorkflowEvent .EventReference (
548
- # TODO(nexus-prerelease): confirm that it is correct not to use event_id.
549
- # Should the proto say explicitly that it's optional or how it behaves when it's missing?
550
- event_type = temporalio .api .enums .v1 .EventType .Value (event_type_name )
551
- )
542
+ event_ref = _event_reference_from_query_params (urllib .parse .parse_qs (url .query ))
552
543
except ValueError as err :
553
544
logger .warning (
554
- f"Failed to parse event type from Nexus link URL query parameters: { link } ({ err } )"
545
+ f"Failed to parse event reference from Nexus link URL query parameters: { link } ({ err } )"
555
546
)
556
547
event_ref = None
557
548
@@ -564,6 +555,55 @@ def _nexus_link_to_workflow_event(
564
555
)
565
556
566
557
558
+ def _event_reference_from_query_params (
559
+ query_params : Mapping [str , list [str ]],
560
+ ) -> temporalio .api .common .v1 .Link .WorkflowEvent .EventReference :
561
+ """
562
+ Return an EventReference from the query params or raise ValueError.
563
+ """
564
+ [reference_type ] = query_params .get ("referenceType" , [])
565
+ if reference_type != "EventReference" :
566
+ raise ValueError (
567
+ f"Expected Nexus link URL query parameter referenceType to be EventReference but got: { reference_type } "
568
+ )
569
+ # event type
570
+ [raw_event_type_name ] = query_params .get (LINK_EVENT_TYPE_PARAM_NAME , [None ])
571
+ if not raw_event_type_name :
572
+ raise ValueError (f"query params do not contain event type: { query_params } " )
573
+ if raw_event_type_name .startswith ("EVENT_TYPE_" ):
574
+ event_type_name = raw_event_type_name
575
+ elif re .match ("[A-Z][a-z]" , raw_event_type_name ):
576
+ event_type_name = "EVENT_TYPE_" + _pascal_case_to_constant_case (
577
+ raw_event_type_name
578
+ )
579
+ else :
580
+ raise ValueError (f"Invalid event type name: { raw_event_type_name } " )
581
+
582
+ # event id
583
+ event_id = 0
584
+ [raw_event_id ] = query_params .get (LINK_EVENT_ID_PARAM_NAME , [None ])
585
+ if raw_event_id :
586
+ try :
587
+ event_id = int (raw_event_id )
588
+ except ValueError :
589
+ raise ValueError (f"Query params contain invalid event id: { raw_event_id } " )
590
+
591
+ return temporalio .api .common .v1 .Link .WorkflowEvent .EventReference (
592
+ event_type = temporalio .api .enums .v1 .EventType .Value (event_type_name ),
593
+ event_id = event_id ,
594
+ )
595
+
596
+
597
+ def _pascal_case_to_constant_case (s : str ) -> str :
598
+ """
599
+ Convert a PascalCase string to CONSTANT_CASE.
600
+
601
+ >>> _pascal_case_to_constant_case("NexusOperationScheduled")
602
+ "NEXUS_OPERATION_SCHEDULED"
603
+ """
604
+ return re .sub (r"([^\b])([A-Z])" , lambda m : "_" .join (m .groups ()), s ).upper ()
605
+
606
+
567
607
class LoggerAdapter (logging .LoggerAdapter ):
568
608
"""Logger adapter that adds Nexus operation context information."""
569
609
0 commit comments