|
| 1 | +# SPDX-FileCopyrightText: 2023 Trevor Beaton for Adafruit Industries |
| 2 | +# |
| 3 | +# SPDX-License-Identifier: MIT |
| 4 | +import os |
| 5 | +import ssl |
| 6 | +import time |
| 7 | +import board |
| 8 | +import wifi |
| 9 | +import terminalio |
| 10 | +import socketpool |
| 11 | +import adafruit_requests |
| 12 | +import displayio |
| 13 | +import rgbmatrix |
| 14 | +import framebufferio |
| 15 | +import adafruit_display_text.label |
| 16 | +from displayio import OnDiskBitmap, TileGrid, Group |
| 17 | + |
| 18 | +# Release any existing displays |
| 19 | +displayio.release_displays() |
| 20 | + |
| 21 | +# --- Matrix Properties --- |
| 22 | +DISPLAY_WIDTH = 128 |
| 23 | +DISPLAY_HEIGHT = 64 |
| 24 | + |
| 25 | +# 432 Minutes - 7.2 Hours |
| 26 | +NETWORK_CALL_INTERVAL = 25920 |
| 27 | + |
| 28 | +# --- Icon Properties --- |
| 29 | +ICON_WIDTH = 26 # Width of the icons |
| 30 | +ICON_HEIGHT = 26 # Height of the icons |
| 31 | +# Calculate the gap between icons |
| 32 | +gap_between_icons = 5 |
| 33 | + |
| 34 | +GAP_BETWEEN_ICONS = 15 # Gap between the icons |
| 35 | +NUMBER_OF_ICONS = 2 # Number of icons to display |
| 36 | +PLACEHOLDER_ICON_PATH = "/airline_logos/placeholder.bmp" |
| 37 | + |
| 38 | +# --- Text Properties --- |
| 39 | +TEXT_START_X = ICON_WIDTH + 4 |
| 40 | +TEXT_RESET_X = 170 |
| 41 | +FONT = terminalio.FONT |
| 42 | +TEXT_COLOR = 0x22FF00 # e.g., Green |
| 43 | + |
| 44 | +# Initialize the main display group |
| 45 | +main_group = Group() |
| 46 | + |
| 47 | +# Initialize the icon group (this remains static on the display) |
| 48 | +static_icon_group = Group() |
| 49 | + |
| 50 | +# Sample Bounding Box |
| 51 | +bounding_box = { |
| 52 | + "min_latitude": 40.633013, # Southernmost latitude |
| 53 | + "max_latitude": 44.953469, # Northernmost latitude |
| 54 | + "min_longitude": -111.045360, # Westernmost longitude |
| 55 | + "max_longitude": -104.046570, # Easternmost longitude |
| 56 | +} |
| 57 | + |
| 58 | +# --- Matrix setup --- |
| 59 | +BIT_DEPTH = 2 |
| 60 | +matrix = rgbmatrix.RGBMatrix( |
| 61 | + width=DISPLAY_WIDTH, |
| 62 | + height=DISPLAY_HEIGHT, |
| 63 | + bit_depth=BIT_DEPTH, |
| 64 | + rgb_pins=[ |
| 65 | + board.MTX_B1, |
| 66 | + board.MTX_G1, |
| 67 | + board.MTX_R1, |
| 68 | + board.MTX_B2, |
| 69 | + board.MTX_G2, |
| 70 | + board.MTX_R2, |
| 71 | + ], |
| 72 | + addr_pins=[ |
| 73 | + board.MTX_ADDRA, |
| 74 | + board.MTX_ADDRB, |
| 75 | + board.MTX_ADDRC, |
| 76 | + board.MTX_ADDRD, |
| 77 | + board.MTX_ADDRE, |
| 78 | + ], |
| 79 | + clock_pin=board.MTX_CLK, |
| 80 | + latch_pin=board.MTX_LAT, |
| 81 | + output_enable_pin=board.MTX_OE, |
| 82 | + tile=1, |
| 83 | + serpentine=True, |
| 84 | + doublebuffer=True, |
| 85 | +) |
| 86 | + |
| 87 | +display = framebufferio.FramebufferDisplay(matrix, auto_refresh=True) |
| 88 | + |
| 89 | +# --- Wi-Fi setup --- |
| 90 | +wifi.radio.connect( |
| 91 | + os.getenv("CIRCUITPY_WIFI_SSID"), os.getenv("CIRCUITPY_WIFI_PASSWORD") |
| 92 | +) |
| 93 | +print(f"Connected to {os.getenv('CIRCUITPY_WIFI_SSID')}") |
| 94 | + |
| 95 | +# --- Networking setup --- |
| 96 | +context = ssl.create_default_context() |
| 97 | + |
| 98 | +with open("/ssl.com-root.pem", "rb") as certfile: |
| 99 | + context.load_verify_locations(cadata=certfile.read()) |
| 100 | + |
| 101 | +pool = socketpool.SocketPool(wifi.radio) |
| 102 | +requests = adafruit_requests.Session(pool, context) |
| 103 | + |
| 104 | +# --- Icon Positioning --- |
| 105 | +total_icons_height = (ICON_HEIGHT * NUMBER_OF_ICONS) + ( |
| 106 | + GAP_BETWEEN_ICONS * (NUMBER_OF_ICONS - 1) |
| 107 | +) |
| 108 | + |
| 109 | +# Function to scroll objects |
| 110 | +def scroll_text_labels(text_labels): |
| 111 | + for label in text_labels: |
| 112 | + label.x -= 1 # Move label left. |
| 113 | + if label.x < -300: # If label has moved off screen. |
| 114 | + label.x = TEXT_RESET_X |
| 115 | + |
| 116 | + |
| 117 | +def construct_query_string(params): |
| 118 | + return "&".join(f"{key}={value}" for key, value in params.items()) |
| 119 | + |
| 120 | +def fetch_flight_data(): |
| 121 | + print("Running fetch_flight_data") |
| 122 | + |
| 123 | + base_url = "https://aeroapi.flightaware.com/aeroapi/flights/search" |
| 124 | + query_prefix = "-latlong+\"" |
| 125 | + query_suffix = ( |
| 126 | + str(bounding_box['min_latitude']) + "+" + |
| 127 | + str(bounding_box['min_longitude']) + "+" + |
| 128 | + str(bounding_box['max_latitude']) + "+" + |
| 129 | + str(bounding_box['max_longitude']) + "\"") |
| 130 | + query = query_prefix + query_suffix |
| 131 | + |
| 132 | + params = { |
| 133 | + "query": query, |
| 134 | + "max_pages": "1",} |
| 135 | + |
| 136 | + |
| 137 | + headers = { |
| 138 | + "Accept": "application/json; charset=UTF-8", |
| 139 | + "x-apikey": os.getenv("AREO_API_KEY"), # Replace with your actual API key |
| 140 | + } |
| 141 | + full_url = f"{base_url}?{construct_query_string(params)}" |
| 142 | + response = requests.get(full_url, headers=headers) |
| 143 | + |
| 144 | + if response.status_code == 200: |
| 145 | + json_response = response.json() # Parse JSON only once |
| 146 | + return process_flight_data(json_response) # Process flights and return |
| 147 | + else: |
| 148 | + print(f"Request failed with status code {response.status_code}") |
| 149 | + if response.content: |
| 150 | + print(f"Response content: {response.content}") |
| 151 | + return [] |
| 152 | + |
| 153 | +def process_flight_data(json_data): |
| 154 | + # Initialize an empty list to hold processed flight data |
| 155 | + processed_flights = [] |
| 156 | + |
| 157 | + for flight in json_data.get("flights", []): |
| 158 | + # Use 'get' with default values to avoid KeyError |
| 159 | + flight_info = { |
| 160 | + "ident": flight.get("ident", "N/A"), |
| 161 | + "ident_icao": flight.get("ident_icao", "N/A"), |
| 162 | + "fa_flight_id": flight.get("fa_flight_id", "N/A"), |
| 163 | + "actual_off": flight.get("actual_off", "N/A"), |
| 164 | + "actual_on": flight.get("actual_on", "N/A"), |
| 165 | + "origin_code": flight.get("origin", {}).get("code", "UnknownA"), |
| 166 | + "origin_city": flight.get("origin", {}).get("city", "UnknownB"), |
| 167 | + "origin_country": flight.get("origin", {}).get("country", "UnknownC"), |
| 168 | + "destination_code": flight.get("destination", {}).get("code", "UnknownD") |
| 169 | + if flight.get("destination") |
| 170 | + else "UnknownE", |
| 171 | + "destination_city": flight.get("destination", {}).get("city", "UnknownH") |
| 172 | + if flight.get("destination") |
| 173 | + else "Unknown Destination", |
| 174 | + "destination_country": flight.get("destination", {}).get( |
| 175 | + "country", "UnknownZ" |
| 176 | + ) |
| 177 | + if flight.get("destination") |
| 178 | + else "UnknownG", |
| 179 | + "altitude": flight.get("last_position", {}).get("altitude", "N/A"), |
| 180 | + "groundspeed": flight.get("last_position", {}).get("groundspeed", "N/A"), |
| 181 | + "heading": flight.get("last_position", {}).get("heading", "N/A"), |
| 182 | + "latitude": flight.get("last_position", {}).get("latitude", "N/A"), |
| 183 | + "longitude": flight.get("last_position", {}).get("longitude", "N/A"), |
| 184 | + "timestamp": flight.get("last_position", {}).get("timestamp", "N/A"), |
| 185 | + "aircraft_type": flight.get("aircraft_type", "N/A"), |
| 186 | + } |
| 187 | + # Only add flight_info if the 'ident_icao' is present and not 'N/A' |
| 188 | + if flight_info["ident_icao"] != "N/A": |
| 189 | + processed_flights.append(flight_info) |
| 190 | + |
| 191 | + return processed_flights |
| 192 | + |
| 193 | + |
| 194 | +def create_text_labels(flight_data, Ypositions): |
| 195 | + local_text_labels = [] |
| 196 | + for i, flight in enumerate(flight_data): |
| 197 | + y_position = Ypositions[i] + GAP_BETWEEN_ICONS |
| 198 | + |
| 199 | + # Since 'country' is not present, use 'origin_city' and 'destination_city' instead |
| 200 | + origin_city = flight.get("origin_city", "Unknown City") |
| 201 | + destination_city = flight.get("destination_city", "Unknown City") |
| 202 | + |
| 203 | + # Construct the display text for each flight |
| 204 | + single_line_text = ( |
| 205 | + f"{flight['ident']} | From: {origin_city} To: {destination_city}" |
| 206 | + ) |
| 207 | + |
| 208 | + text_label = adafruit_display_text.label.Label( |
| 209 | + FONT, color=TEXT_COLOR, x=TEXT_START_X, y=y_position, text=single_line_text |
| 210 | + ) |
| 211 | + local_text_labels.append(text_label) |
| 212 | + return local_text_labels |
| 213 | + |
| 214 | + |
| 215 | + |
| 216 | +def create_icon_tilegrid(ident): |
| 217 | + airline_code = ident[:3].upper() # Use the first three characters of 'ident' |
| 218 | + icon_path = f"/airline_logos/{airline_code}.bmp" |
| 219 | + |
| 220 | + try: |
| 221 | + file = open(icon_path, "rb") |
| 222 | + icon_bitmap = OnDiskBitmap(file) |
| 223 | + except OSError: |
| 224 | + print(f"Icon for {airline_code} not found. Using placeholder.") |
| 225 | + file = open(PLACEHOLDER_ICON_PATH, "rb") |
| 226 | + icon_bitmap = OnDiskBitmap(file) |
| 227 | + |
| 228 | + icon_tilegrid = TileGrid(icon_bitmap, pixel_shader=icon_bitmap.pixel_shader, x=0, y=0) |
| 229 | + return icon_tilegrid |
| 230 | + |
| 231 | + |
| 232 | +def update_display_with_flight_data(flight_data, icon_group, display_group): |
| 233 | + # Clear previous display items |
| 234 | + while len(display_group): |
| 235 | + display_group.pop() |
| 236 | + |
| 237 | + # Clear previous icon items |
| 238 | + while len(icon_group): |
| 239 | + icon_group.pop() |
| 240 | + |
| 241 | + # Limit flight data to the adjusted number of icons |
| 242 | + flight_data = flight_data[:NUMBER_OF_ICONS] |
| 243 | + |
| 244 | + # Calculate the y position for each icon |
| 245 | + y_positions = [ |
| 246 | + gap_between_icons + (ICON_HEIGHT + gap_between_icons) * i |
| 247 | + for i in range(NUMBER_OF_ICONS) |
| 248 | + ] |
| 249 | + |
| 250 | + # Create text labels for up to NUMBER_OF_ICONS flights |
| 251 | + text_labels = create_text_labels(flight_data, y_positions) |
| 252 | + |
| 253 | + # Add text labels to the display group first so they are behind icons |
| 254 | + for label in text_labels: |
| 255 | + display_group.append(label) |
| 256 | + |
| 257 | + # Load icons and create icon tilegrids for up to NUMBER_OF_ICONS flights |
| 258 | + for i, flight in enumerate(flight_data): |
| 259 | + # Calculate the y position for each icon |
| 260 | + y_position = y_positions[i] |
| 261 | + |
| 262 | + # Load the icon dynamically |
| 263 | + icon_tilegrid = create_icon_tilegrid(flight["ident"]) |
| 264 | + if icon_tilegrid: |
| 265 | + icon_tilegrid.y = y_position |
| 266 | + icon_group.append(icon_tilegrid) |
| 267 | + |
| 268 | + # Add the icon group to the main display group after text labels |
| 269 | + display_group.append(icon_group) |
| 270 | + |
| 271 | + # Show the updated group on the display |
| 272 | + display.show(display_group) |
| 273 | + display.refresh() |
| 274 | + return text_labels |
| 275 | + |
| 276 | + |
| 277 | + |
| 278 | +def display_no_flights(): |
| 279 | + # Clear the previous group content |
| 280 | + while len(main_group): |
| 281 | + main_group.pop() |
| 282 | + |
| 283 | + # Create a label for "Looking for flights..." |
| 284 | + looking_label = adafruit_display_text.label.Label( |
| 285 | + FONT, color=TEXT_COLOR, text="LOOKING FOR FLIGHTS", x=8, y=DISPLAY_HEIGHT // 2 |
| 286 | + ) |
| 287 | + main_group.append(looking_label) |
| 288 | + |
| 289 | + # Update the display with the new group |
| 290 | + display.show(main_group) |
| 291 | + display.refresh() |
| 292 | + |
| 293 | + |
| 294 | +display_no_flights() |
| 295 | + |
| 296 | +flight_json_response = fetch_flight_data() |
| 297 | + |
| 298 | +# Check if we received any flight data |
| 299 | +if flight_json_response: |
| 300 | + flight_data_labels = update_display_with_flight_data( |
| 301 | + flight_json_response, static_icon_group, main_group |
| 302 | + ) |
| 303 | + |
| 304 | +last_network_call_time = time.monotonic() |
| 305 | + |
| 306 | + |
| 307 | +while True: |
| 308 | + # Scroll the text labels |
| 309 | + scroll_text_labels(flight_data_labels) |
| 310 | + # Refresh the display |
| 311 | + display.refresh(minimum_frames_per_second=0) |
| 312 | + current_time = time.monotonic() |
| 313 | + |
| 314 | + # Check if NETWORK_CALL_INTERVAL seconds have passed |
| 315 | + if (current_time - last_network_call_time) >= NETWORK_CALL_INTERVAL: |
| 316 | + print("Fetching new flight data...") |
| 317 | + new_flight_data = fetch_flight_data() |
| 318 | + |
| 319 | + if new_flight_data: |
| 320 | + # If flight data is found, update the display with it |
| 321 | + new_text_labels = update_display_with_flight_data( |
| 322 | + new_flight_data, static_icon_group, main_group |
| 323 | + ) |
| 324 | + else: |
| 325 | + # If no flight data is found, display the "Looking for flights..." message |
| 326 | + display_no_flights() |
| 327 | + |
| 328 | + # Reset the last network call time |
| 329 | + last_network_call_time = current_time |
| 330 | + |
| 331 | + # Sleep for a short period to prevent maxing out your CPU |
| 332 | + time.sleep(1) # Sleep for 1 second |
0 commit comments