Skip to content

Rework the lighthouse geometry estimator #543

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 36 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
a1bf030
Corrected types
krichardsson May 14, 2025
ddad27f
Improve de-flipper to use all base stations
krichardsson May 14, 2025
6ff42e6
Added IPPE solutions to LhCfPoseSample for reuse and speed up
krichardsson May 14, 2025
f0e6a53
Refactor sample matcher
krichardsson May 14, 2025
6534344
Removed unused parameters
krichardsson May 14, 2025
ca419c5
Moved ippe estimation to LhCfPoseSample
krichardsson May 14, 2025
5276b2a
Added input data container
krichardsson May 15, 2025
7504863
Make sure mandatory samples are not discarded
krichardsson May 15, 2025
5d95092
Moved scaling into estimation manager
krichardsson May 16, 2025
f1ec5cc
Added functions for live sample matching
krichardsson May 16, 2025
24de4a5
Updated tests
krichardsson Jun 5, 2025
7853d2b
Corrected test
krichardsson Jun 5, 2025
3d7eb8e
Styling
krichardsson Jun 5, 2025
6fe04c3
styling
krichardsson Jun 5, 2025
1e81175
basic continuous geo estimation
krichardsson Jun 12, 2025
a524535
Unified solution data and added human readable information
krichardsson Jun 13, 2025
2b813e1
Fix problem in sweep angle reader
krichardsson Jun 14, 2025
da21262
Added base station link map
krichardsson Jun 16, 2025
9094f92
Added unit test
krichardsson Jun 16, 2025
eb6c623
Added matched lighthouse sample stream
krichardsson Jun 17, 2025
088c691
Added user action detector
krichardsson Jun 18, 2025
6cb0c27
Make thread daemon
krichardsson Jun 23, 2025
08ba567
Added missing sample from the origin
krichardsson Jun 23, 2025
482542c
Restored save/load functionality
krichardsson Jun 23, 2025
b28584b
Join optional in solver thread
krichardsson Jun 24, 2025
711dea2
Try to solve when the solver thread is started
krichardsson Jun 25, 2025
1cedb1d
Added sample count methods
krichardsson Jun 27, 2025
7c084ab
Corrected callback
krichardsson Jun 27, 2025
a03c120
Build link stats as early as possible
krichardsson Jun 27, 2025
97a012d
Added script for uploading geometries
krichardsson Jul 1, 2025
17e800d
Added user notification platform service
krichardsson Jul 2, 2025
ab5cfcc
Added timeout to sweep angle reader
krichardsson Jul 2, 2025
bd6576f
Touch up of multi bs estimation script
krichardsson Jul 3, 2025
6d2b1ef
Use crossing beam to calculate error of solution
krichardsson Jul 3, 2025
9688279
Added read/write of geo raw data as yaml
krichardsson Jul 4, 2025
09fbc48
Added session management
krichardsson Jul 4, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions cflib/crazyflie/localization.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ class Localization():
EXT_POSE_PACKED = 9
LH_ANGLE_STREAM = 10
LH_PERSIST_DATA = 11
LH_MATCHED_ANGLE_STREAM = 12

def __init__(self, crazyflie=None):
"""
Expand Down Expand Up @@ -105,6 +106,8 @@ def _incoming(self, packet):
decoded_data = bool(data[0])
elif pk_type == self.LH_ANGLE_STREAM:
decoded_data = self._decode_lh_angle(data)
elif pk_type == self.LH_MATCHED_ANGLE_STREAM:
decoded_data = self._decode_matched_lh_angle(data)

pk = LocalizationPacket(pk_type, data, decoded_data)
self.receivedLocationPacket.call(pk)
Expand All @@ -128,6 +131,27 @@ def _decode_lh_angle(self, data):

return decoded_data

def _decode_matched_lh_angle(self, data):
decoded_data = {}

raw_data = struct.unpack('<BfhhhfhhhB', data)

decoded_data['basestation'] = raw_data[0]
decoded_data['x'] = [0, 0, 0, 0]
decoded_data['x'][0] = raw_data[1]
decoded_data['x'][1] = raw_data[1] - fp16_to_float(raw_data[2])
decoded_data['x'][2] = raw_data[1] - fp16_to_float(raw_data[3])
decoded_data['x'][3] = raw_data[1] - fp16_to_float(raw_data[4])
decoded_data['y'] = [0, 0, 0, 0]
decoded_data['y'][0] = raw_data[5]
decoded_data['y'][1] = raw_data[5] - fp16_to_float(raw_data[6])
decoded_data['y'][2] = raw_data[5] - fp16_to_float(raw_data[7])
decoded_data['y'][3] = raw_data[5] - fp16_to_float(raw_data[8])
decoded_data['group_id'] = raw_data[9] >> 4
decoded_data['bs_count'] = raw_data[9] & 0x0F

return decoded_data

def send_extpos(self, pos):
"""
Send the current Crazyflie X, Y, Z position. This is going to be
Expand Down
12 changes: 12 additions & 0 deletions cflib/crazyflie/platformservice.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
PLATFORM_SET_CONT_WAVE = 0
PLATFORM_REQUEST_ARMING = 1
PLATFORM_REQUEST_CRASH_RECOVERY = 2
PLATFORM_REQUEST_USER_NOTIFICATION = 3

VERSION_GET_PROTOCOL = 0
VERSION_GET_FIRMWARE = 1
Expand Down Expand Up @@ -110,6 +111,17 @@ def send_crash_recovery_request(self):
pk.data = (PLATFORM_REQUEST_CRASH_RECOVERY, )
self._cf.send_packet(pk)

def send_user_notification(self, success: bool = True):
"""
Send a user notification to the Crazyflie. This is used to notify a user of some sort of event by using the
means available on the Crazyflie.
"""
pk = CRTPPacket()
pk.set_header(CRTPPort.PLATFORM, PLATFORM_COMMAND)
notification_type = 1 if success else 0
pk.data = (PLATFORM_REQUEST_USER_NOTIFICATION, notification_type)
self._cf.send_packet(pk)

def get_protocol_version(self):
"""
Return version of the CRTP protocol
Expand Down
6 changes: 5 additions & 1 deletion cflib/localization/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,19 @@
from .lighthouse_bs_vector import LighthouseBsVector
from .lighthouse_config_manager import LighthouseConfigFileManager
from .lighthouse_config_manager import LighthouseConfigWriter
from .lighthouse_sweep_angle_reader import LighthouseMatchedSweepAngleReader
from .lighthouse_sweep_angle_reader import LighthouseSweepAngleAverageReader
from .lighthouse_sweep_angle_reader import LighthouseSweepAngleReader
from .lighthouse_utils import LighthouseCrossingBeam
from .param_io import ParamFileManager

__all__ = [
'LighthouseBsGeoEstimator',
'LighthouseBsVector',
'LighthouseSweepAngleAverageReader',
'LighthouseSweepAngleReader',
'LighthouseMatchedSweepAngleReader',
'LighthouseConfigFileManager',
'LighthouseConfigWriter',
'ParamFileManager']
'ParamFileManager',
'LighthouseCrossingBeam']
2 changes: 1 addition & 1 deletion cflib/localization/ippe_cf.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ def solve(U_cf: npt.ArrayLike, Q_cf: npt.ArrayLike) -> list[Solution]:

First param: Y (positive to the left)
Second param: Z (positive up)
:return: A list that contains 2 sets of pose solution from IPPE including rotation matrix
:return: A list that contains 2 sets of pose solutions from IPPE including rotation matrix
translation matrix, and reprojection error. The first solution in the list has
the smallest reprojection error.
"""
Expand Down
46 changes: 44 additions & 2 deletions cflib/localization/lighthouse_bs_vector.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

import numpy as np
import numpy.typing as npt
import yaml


class LighthouseBsVector:
Expand Down Expand Up @@ -137,14 +138,40 @@ def projection(self) -> npt.NDArray[np.float32]:
def _q(self):
return math.tan(self._lh_v1_vert_angle) / math.sqrt(1 + math.tan(self._lh_v1_horiz_angle) ** 2)

def __eq__(self, other):
if not isinstance(other, LighthouseBsVector):
return NotImplemented

return (self._lh_v1_horiz_angle == other._lh_v1_horiz_angle and
self._lh_v1_vert_angle == other._lh_v1_vert_angle)

@staticmethod
def yaml_representer(dumper, data: 'LighthouseBsVector'):
return dumper.represent_mapping('!LighthouseBsVector', {
'lh_v1_angles': [data.lh_v1_horiz_angle, data.lh_v1_vert_angle],
})

@staticmethod
def yaml_constructor(loader, node):
values = loader.construct_mapping(node, deep=True)
lh_v1_angles = values.get('lh_v1_angles', [0.0, 0.0])
if len(lh_v1_angles) != 2:
raise ValueError('lh_v1_angles must be a list of two angles')
lh_v1_horiz_angle, lh_v1_vert_angle = lh_v1_angles
return LighthouseBsVector(lh_v1_horiz_angle, lh_v1_vert_angle)


yaml.add_representer(LighthouseBsVector, LighthouseBsVector.yaml_representer)
yaml.add_constructor('!LighthouseBsVector', LighthouseBsVector.yaml_constructor)


class LighthouseBsVectors(list):
"""A list of 4 LighthouseBsVector, one for each sensor.
LighthouseBsVectors is essentially the same as list[LighthouseBsVector]"""

def projection_pair_list(self) -> npt.NDArray:
"""
Genereate a list of projection pairs for all vectors
Generate a list of projection pairs for all vectors
"""
result = np.empty((len(self), 2), dtype=float)
for i, vector in enumerate(self):
Expand All @@ -154,11 +181,26 @@ def projection_pair_list(self) -> npt.NDArray:

def angle_list(self) -> npt.NDArray:
"""
Genereate a list of angles for all vectors, the order is horizontal, vertical, horizontal, vertical...
Generate a list of angles for all vectors, the order is horizontal, vertical, horizontal, vertical...
"""
result = np.empty((len(self) * 2), dtype=float)
for i, vector in enumerate(self):
result[i * 2] = vector.lh_v1_horiz_angle
result[i * 2 + 1] = vector.lh_v1_vert_angle

return result

@staticmethod
def yaml_representer(dumper, data: 'LighthouseBsVectors'):
# Instead of using a sequence of LighthouseBsVector, we represent it as a sequence of lists to make it more
# compact
return dumper.represent_sequence('!LighthouseBsVectors', [list(vector.lh_v1_angle_pair) for vector in data])

@staticmethod
def yaml_constructor(loader, node):
values = loader.construct_sequence(node, deep=True)
return LighthouseBsVectors([LighthouseBsVector(pair[0], pair[1]) for pair in values])


yaml.add_representer(LighthouseBsVectors, LighthouseBsVectors.yaml_representer)
yaml.add_constructor('!LighthouseBsVectors', LighthouseBsVectors.yaml_constructor)
109 changes: 109 additions & 0 deletions cflib/localization/lighthouse_cf_pose_sample.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
from typing import NamedTuple

import numpy as np
import numpy.typing as npt
import yaml

from .ippe_cf import IppeCf
from cflib.localization.lighthouse_bs_vector import LighthouseBsVectors
from cflib.localization.lighthouse_types import Pose

ArrayFloat = npt.NDArray[np.float_]


class BsPairPoses(NamedTuple):
"""A type representing the poses of a pair of base stations"""
bs1: Pose
bs2: Pose


class LhCfPoseSample:
""" Represents a sample of a Crazyflie pose in space, it contains:
- a timestamp (if applicable)
- lighthouse angles from one or more base stations
- The the two solutions found by IPPE for each base station, in the cf ref frame.

The ippe solution is somewhat heavy and is only created on demand by calling augment_with_ippe()
"""

def __init__(self, angles_calibrated: dict[int, LighthouseBsVectors], timestamp: float = 0.0,
is_mandatory: bool = False) -> None:
self.timestamp: float = timestamp

# Angles measured by the Crazyflie and compensated using calibration data
# Stored in a dictionary using base station id as the key
self.angles_calibrated: dict[int, LighthouseBsVectors] = angles_calibrated

# A dictionary from base station id to BsPairPoses, The poses represents the two possible poses of the base
# stations found by IPPE, in the crazyflie reference frame.
self.ippe_solutions: dict[int, BsPairPoses] = {}
self.is_augmented = False

# Some samples are mandatory and must not be removed, even if they appear to be outliers. For instance the
# the samples that define the origin or x-axis
self.is_mandatory = is_mandatory

def augment_with_ippe(self, sensor_positions: ArrayFloat) -> None:
if not self.is_augmented:
self.ippe_solutions = self._find_ippe_solutions(self.angles_calibrated, sensor_positions)
self.is_augmented = True

def is_empty(self) -> bool:
"""Checks if no angles are set

Returns:
bool: True if no angles are set
"""
return len(self.angles_calibrated) == 0

def _find_ippe_solutions(self, angles_calibrated: dict[int, LighthouseBsVectors],
sensor_positions: ArrayFloat) -> dict[int, BsPairPoses]:

solutions: dict[int, BsPairPoses] = {}
for bs, angles in angles_calibrated.items():
projections = angles.projection_pair_list()
estimates_ref_bs = IppeCf.solve(sensor_positions, projections)
estimates_ref_cf = self._convert_estimates_to_cf_reference_frame(estimates_ref_bs)
solutions[bs] = estimates_ref_cf

return solutions

def _convert_estimates_to_cf_reference_frame(self, estimates_ref_bs: list[IppeCf.Solution]) -> BsPairPoses:
"""
Convert the two ippe solutions from the base station reference frame to the CF reference frame
"""
rot_1 = estimates_ref_bs[0].R.transpose()
t_1 = np.dot(rot_1, -estimates_ref_bs[0].t)

rot_2 = estimates_ref_bs[1].R.transpose()
t_2 = np.dot(rot_2, -estimates_ref_bs[1].t)

return BsPairPoses(Pose(rot_1, t_1), Pose(rot_2, t_2))

def __eq__(self, other):
if not isinstance(other, LhCfPoseSample):
return NotImplemented

return (self.timestamp == other.timestamp and
self.angles_calibrated == other.angles_calibrated and
self.is_mandatory == other.is_mandatory)

@staticmethod
def yaml_representer(dumper, data: 'LhCfPoseSample'):
return dumper.represent_mapping('!LhCfPoseSample', {
'timestamp': data.timestamp,
'angles_calibrated': data.angles_calibrated,
'is_mandatory': data.is_mandatory
})

@staticmethod
def yaml_constructor(loader, node):
values = loader.construct_mapping(node, deep=True)
timestamp = values.get('timestamp', 0.0)
angles_calibrated = values.get('angles_calibrated', {})
is_mandatory = values.get('is_mandatory', False)
return LhCfPoseSample(angles_calibrated, timestamp, is_mandatory)


yaml.add_representer(LhCfPoseSample, LhCfPoseSample.yaml_representer)
yaml.add_constructor('!LhCfPoseSample', LhCfPoseSample.yaml_constructor)
Loading
Loading