Description
This ticket is to support dynamic packet definitions, meaning definitions that change per packet based on the header. This came out of conversations with @tloubrieu-jpl and @nischayn99 about a particular packet they have on Europa Clipper whose definition changes based on a header field (which isn't even the secondary header field).
I came up with two ways we can support this. I'm writing them here so others can discuss and weigh in.
Packet Factory
We can have a PacketFactory with its own load() method, which lets you provide a function reference that returns a packet definition based on the primary header. For each packet, the primary header would be parsed, the provided function would be called with the result, and the packet would then be re-parsed with the definition the function returns. An example of what this would look like is as follows.
def packet_factory_fun(headers):
    if headers[xxx] == yyy:
        return VariableLength([
            ....
        ])
    elif headers[xxx] == zzz:
        return VariableLength([
            ....  # different than before
        ])
    else:
        return VariableLength([
            ....  # even more different
        ])

pkt_fac = PacketFactory(packet_factory_fun)
result = pkt_fac.load('telemetry.bin')
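To make the per-packet flow above concrete (parse the primary header, call the user function, re-parse with the returned definition), here is a minimal standalone sketch. It does not use ccsdspy internals: `load_with_factory`, `iter_packets`, `demo_factory`, `demo_parse`, and `make_packet` are hypothetical names for illustration, and the "definitions" are plain `struct` format strings standing in for real packet definitions. The header keys follow the `CCSDS_*` naming used elsewhere in this ticket.

```python
import struct


def parse_primary_header(raw):
    """Decode the 6-byte CCSDS primary header into a dict."""
    word1, word2, length = struct.unpack(">HHH", raw[:6])
    return {
        "CCSDS_VERSION_NUMBER": word1 >> 13,
        "CCSDS_PACKET_TYPE": (word1 >> 12) & 0x1,
        "CCSDS_SECONDARY_FLAG": (word1 >> 11) & 0x1,
        "CCSDS_APID": word1 & 0x7FF,
        "CCSDS_SEQUENCE_FLAG": word2 >> 14,
        "CCSDS_SEQUENCE_COUNT": word2 & 0x3FFF,
        "CCSDS_PACKET_LENGTH": length,
    }


def iter_packets(stream):
    """Yield (headers, packet_bytes) for each packet in a byte string."""
    offset = 0
    while offset + 6 <= len(stream):
        headers = parse_primary_header(stream[offset:offset + 6])
        # The length field stores (number of bytes after the header) - 1.
        total = 6 + headers["CCSDS_PACKET_LENGTH"] + 1
        yield headers, stream[offset:offset + total]
        offset += total


def load_with_factory(stream, factory_fun, parse_fun):
    """Hypothetical core of PacketFactory.load(): pick a definition per packet."""
    results = []
    for headers, packet in iter_packets(stream):
        definition = factory_fun(headers)  # user decides based on the header
        results.append(parse_fun(definition, packet))  # re-parse the packet
    return results


# --- Demo with stand-in definitions (struct format strings) ---
def demo_factory(headers):
    return ">H" if headers["CCSDS_APID"] == 1 else ">BB"


def demo_parse(definition, packet):
    return struct.unpack(definition, packet[6:])


def make_packet(apid, body):
    # Version 0, type 0, no secondary header; unsegmented, sequence count 0.
    return struct.pack(">HHH", apid, 0xC000, len(body) - 1) + body


stream = make_packet(1, b"\x01\x02") + make_packet(2, b"\x03\x04")
result = load_with_factory(stream, demo_factory, demo_parse)
```

In this sketch, two packets with identical body lengths are decoded differently purely because of their APID, which is the behavior the factory is meant to enable. A real implementation would also need to decide how to group results, since packets parsed with different definitions do not share a common set of field arrays.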
If you wanted to parse more than the primary header and use that to decide on the rest of the packet, you could pass in an optional argument to provide that "decision" definition.
def packet_factory_fun(initial_results):
    # make decision based on result of parsing with initial_defs
    ...

initial_defs = VariableLength([
    # initial fields to parse
    ...
])

pkt_fac = PacketFactory(packet_factory_fun, initial_defs=initial_defs)
result = pkt_fac.load('telemetry.bin')
Define fields conditionally based on Expression
I don't think this is the best option because it's a bit complex, but I will mention it here. We could add a keyword argument condition= that you could set to some very simple expression, which would be evaluated for each packet to determine whether the field is included. For example, if a field is only included when the secondary header is present, you could add condition="CCSDS_SECONDARY_FLAG==1".
I don't think this is the best approach because adding the ability to parse these expressions is going to add a lot of complexity. We could also just replace the string expression with a callable (probably a lambda) and do something like condition=(lambda headers: headers['CCSDS_SECONDARY_FLAG'] == 1), but I also think that doing that multiple times might be messier than using a factory.
import ccsdspy
from ccsdspy import PacketField, PacketArray

# condition= is the proposed keyword argument; it does not exist yet
pkt = ccsdspy.FixedLength([
    PacketField(name='SHCOARSE', data_type='uint', bit_length=32, condition="CCSDS_SECONDARY_FLAG==1"),
    PacketField(name='SHFINE', data_type='uint', bit_length=20, condition="CCSDS_SECONDARY_FLAG==1"),
    PacketField(name='OPMODE', data_type='uint', bit_length=3),
    PacketField(name='SPACER', data_type='fill', bit_length=1),
    PacketField(name='VOLTAGE', data_type='int', bit_length=8),
    PacketArray(
        name='SENSOR_GRID',
        data_type='uint',
        bit_length=16,
        array_shape=(32, 32),
        array_order='C'
    ),
])
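For comparison, here is a rough sketch of how the callable variant of `condition` could be evaluated per packet. It is not ccsdspy code: `Field` is a hypothetical stand-in for `PacketField` carrying the proposed `condition` keyword, and `active_fields` and `has_secondary` are made-up helper names. The idea is simply to filter the field list against the parsed header before decoding the packet body.

```python
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass
class Field:
    """Hypothetical stand-in for PacketField with the proposed `condition`."""
    name: str
    bit_length: int
    condition: Optional[Callable[[dict], bool]] = None


def active_fields(fields, headers):
    """Return the fields that apply to a packet with the given header."""
    return [f for f in fields if f.condition is None or f.condition(headers)]


def has_secondary(headers):
    # Sharing one named predicate avoids repeating the same lambda per field.
    return headers["CCSDS_SECONDARY_FLAG"] == 1


fields = [
    Field("SHCOARSE", 32, condition=has_secondary),
    Field("SHFINE", 20, condition=has_secondary),
    Field("OPMODE", 3),
    Field("SPACER", 1),
    Field("VOLTAGE", 8),
]

with_sec = active_fields(fields, {"CCSDS_SECONDARY_FLAG": 1})
without_sec = active_fields(fields, {"CCSDS_SECONDARY_FLAG": 0})

# Total body size differs per packet, so bit offsets must be recomputed
# for every packet rather than once per definition.
bits_with = sum(f.bit_length for f in with_sec)
bits_without = sum(f.bit_length for f in without_sec)
```

One consequence visible even in this small sketch is that the body length varies from packet to packet, so a definition with conditional fields could no longer be treated as truly fixed length.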