Skip to content

first commit water meter code #2963

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Feb 5, 2025
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
262 changes: 262 additions & 0 deletions Espresso_Water_Meter/code.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,262 @@
# SPDX-FileCopyrightText: 2025 John Park for Adafruit Industries
#
# SPDX-License-Identifier: MIT
'''
Espresso Tank Meter
Feather ESP32-S2 with RCWL-1601 Ultrasonic distance sensor
'''

import time
import os
import ssl
import microcontroller
import supervisor
import socketpool
import wifi
import board
import alarm
import neopixel
import adafruit_hcsr04
import adafruit_minimqtt.adafruit_minimqtt as MQTT
from adafruit_io.adafruit_io import IO_MQTT
import adafruit_requests
import adafruit_max1704x

# Initialize the sonar sensor
sonar = adafruit_hcsr04.HCSR04(trigger_pin=board.A0, echo_pin=board.A1)

# Initialize the battery monitor
i2c = board.I2C() # uses board.SCL and board.SDA
battery_monitor = adafruit_max1704x.MAX17048(i2c)

# Define colors (hex values)
WHITE = 0xFFFFFF
BLUE = 0x0000FF
GREEN = 0x00FF00
YELLOW = 0xFFFF00
RED = 0xFF0000
PINK = 0xbb00bb
CYAN = 0x00bbbb
OFF = 0x000000

# Initialize the NeoPixel
pixel = neopixel.NeoPixel(board.NEOPIXEL, 1, brightness=0.25)
# Show yellow on startup
pixel.fill(YELLOW)

# Operating hours (24-hour format with minutes, e.g., "6:35" and "16:00")
OPENING_TIME = "6:00"
CLOSING_TIME = "22:30"
# Normal operation check interval
NORMAL_CHECK_MINUTES = 5
# Sleep duration in seconds during operating hours
SLEEP_DURATION = 60 * NORMAL_CHECK_MINUTES
# Display duration in seconds
DISPLAY_DURATION = 1
# Number of samples to average
NUM_SAMPLES = 5

def parse_time(time_str):
"""Convert time string (HH:MM format) to hours and minutes."""
# pylint: disable=redefined-outer-name
parts = time_str.split(':')
return int(parts[0]), int(parts[1])

def get_average_distance():
"""Take multiple distance readings and return the average."""
distances = []
for _ in range(NUM_SAMPLES):
try:
distance = sonar.distance
distances.append(distance)
time.sleep(0.1) # Short delay between readings
except RuntimeError:
print("Error reading distance")
continue

# Only average valid readings
if distances:
return sum(distances) / len(distances)
return None

def set_pixel_color(distance):
"""Set NeoPixel color based on distance."""
if distance is None:
pixel.fill(OFF)
return

if distance < 2:
pixel.fill(WHITE)
elif 2 <= distance < 10:
pixel.fill(BLUE)
elif 10 <= distance < 16:
pixel.fill(GREEN)
elif 18 <= distance < 20:
pixel.fill(YELLOW)
else: # distance >= 22
pixel.fill(RED)

# Wait for things to settle before reading sonar
time.sleep(0.1)

# Get average distance
avg_distance = get_average_distance()

if avg_distance is not None:

if avg_distance >= 22:
# pylint: disable=invalid-name
avg_distance = 22
print(f"Average distance: {avg_distance:.1f} cm")
# Set color based on average distance
set_pixel_color(avg_distance)

# Check battery status
battery_voltage = battery_monitor.cell_voltage
battery_percent = battery_monitor.cell_percent
print(f"Battery: {battery_percent:.1f}% ({battery_voltage:.2f}V)")

# Try connecting to WiFi
try:

print("Connecting to %s" % os.getenv("CIRCUITPY_WIFI_SSID"))
# Show pink while attempting to connect
pixel.fill(PINK)
wifi.radio.connect(os.getenv("CIRCUITPY_WIFI_SSID"), os.getenv("CIRCUITPY_WIFI_PASSWORD"))
print("Connected to %s" % os.getenv("CIRCUITPY_WIFI_SSID"))
# Show cyan on successful connection
pixel.fill(CYAN)
time.sleep(1) # Brief pause to show the connection success
# pylint: disable=broad-except
except Exception as e:
print("Failed to connect to WiFi. Error:", e, "\nBoard will hard reset in 30 seconds.")
pixel.fill(OFF)
time.sleep(10)
microcontroller.reset()

# Create a socket pool
pool = socketpool.SocketPool(wifi.radio)
requests = adafruit_requests.Session(pool, ssl.create_default_context())

# Initialize a new MQTT Client object
mqtt_client = MQTT.MQTT(
broker="io.adafruit.com",
username=os.getenv("ADAFRUIT_AIO_USERNAME"),
password=os.getenv("ADAFRUIT_AIO_KEY"),
socket_pool=pool,
ssl_context=ssl.create_default_context(),
)

# Initialize Adafruit IO MQTT "helper"
io = IO_MQTT(mqtt_client)

try:
# If Adafruit IO is not connected...
if not io.is_connected:
print("Connecting to Adafruit IO...")
io.connect()

# Get current time from AIO time service
aio_username = os.getenv("ADAFRUIT_AIO_USERNAME")
aio_key = os.getenv("ADAFRUIT_AIO_KEY")
timezone = os.getenv("TIMEZONE")
# pylint: disable=line-too-long
TIME_URL = f"https://io.adafruit.com/api/v2/{aio_username}/integrations/time/strftime?x-aio-key={aio_key}&tz={timezone}"
TIME_URL += "&fmt=%25Y-%25m-%25d+%25H%3A%25M%3A%25S.%25L+%25j+%25u+%25z+%25Z"

print("Getting time from Adafruit IO...")
response = requests.get(TIME_URL)
time_str = response.text.strip() # Remove any leading/trailing whitespace
print("Current time:", time_str)

# Parse the current time from the time string
time_parts = time_str.split()
current_time = time_parts[1].split(':')
current_hour = int(current_time[0])
current_minute = int(current_time[1])

# Get opening and closing hours and minutes
opening_hour, opening_minute = parse_time(OPENING_TIME)
closing_hour, closing_minute = parse_time(CLOSING_TIME)

# Convert all times to minutes for easier comparison
current_minutes = current_hour * 60 + current_minute
opening_minutes = opening_hour * 60 + opening_minute
closing_minutes = closing_hour * 60 + closing_minute

# Check if we're within operating hours
if opening_minutes <= current_minutes < closing_minutes:
print(f"Within operating hours ({OPENING_TIME} to {CLOSING_TIME}), proceeding with measurement")

# Explicitly pump the message loop
io.loop()

# Send the distance data
print(f"Publishing {avg_distance:.1f} to espresso water level feed")
io.publish("espresso-water-tank-level", f"{avg_distance:.1f}")

# Send the battery data
print(f"Publishing {battery_percent:.1f} to battery level feed")
io.publish("espresso-water-sensor-battery", f"{battery_percent:.1f}")


# Make sure the message gets sent
io.loop()

print("Water level sent successfully")

# Keep NeoPixel lit for DISPLAY_DURATION seconds
time.sleep(DISPLAY_DURATION)

# Use normal check interval during operating hours
# # pylint: disable=invalid-name
sleep_seconds = SLEEP_DURATION
print(f"Next check in {NORMAL_CHECK_MINUTES} minutes")
else:
print(f"Outside operating hours ({OPENING_TIME} to {CLOSING_TIME}), going back to sleep")
# Calculate time until next opening
if current_minutes >= closing_minutes:
# After closing, calculate time until opening tomorrow
minutes_until_open = (24 * 60 - current_minutes) + opening_minutes
else:
# Before opening, calculate time until opening today
minutes_until_open = opening_minutes - current_minutes

# Convert minutes to seconds for sleep duration
sleep_seconds = minutes_until_open * 60
hours_until_open = minutes_until_open // 60
minutes_remaining = minutes_until_open % 60
if minutes_remaining:
print(f"Sleeping until {OPENING_TIME} ({hours_until_open} hours, {minutes_remaining} minutes)")
else:
print(f"Sleeping until {OPENING_TIME} ({hours_until_open} hours)")

response.close()

# pylint: disable=broad-except
except Exception as e:
print("Failed to get or send data, or connect. Error:", e,
"\nBoard will hard reset in 30 seconds.")
pixel.fill(OFF)
time.sleep(30)
microcontroller.reset()

else:
print("Failed to get valid distance readings")
pixel.fill(OFF)
# pylint: disable=invalid-name
sleep_seconds = SLEEP_DURATION # Use normal interval if we couldn't get readings

# Prepare for deep sleep
pixel.brightness = 0 # Turn off NeoPixel

# Flush the serial output before sleep
# pylint: disable=pointless-statement
supervisor.runtime.serial_bytes_available
time.sleep(0.05)

# Create time alarm
time_alarm = alarm.time.TimeAlarm(monotonic_time=time.monotonic() + sleep_seconds)

# Enter deep sleep
alarm.exit_and_deep_sleep_until_alarms(time_alarm)
Loading