[Feature Request] Tasmota with SML Support should be able to emulate a "shelly metering" #23565
Replies: 2 comments 3 replies
+1
Here is a Berry script generated by Claude AI. It is absolutely untested and has not been verified for correctness of any kind!
# Shelly Energy Meter Emulator for Tasmota
# Emulates Shelly EM/EM1 devices via UDP protocol
import json  # needed for json.load() / json.dump() below
class ShellyEmulator
  var udp_port
  var device_id
  var powermeters
  var client_filters
  var udp_server
  var running

  def init(udp_port, device_id)
    self.udp_port = udp_port
    self.device_id = device_id
    self.powermeters = []
    self.client_filters = []
    self.running = false
    # Initialize UDP server
    self.udp_server = udp()
    print(f"Shelly Emulator initialized on port {self.udp_port}")
  end

  def add_powermeter(powermeter_func, client_filter)
    # Add powermeter function and corresponding client filter
    self.powermeters.push(powermeter_func)
    self.client_filters.push(client_filter)
  end
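
  def _calculate_derived_values(power)
    import math  # abs() is not a Berry builtin; it lives in the math module
    var decimal_point_enforcer = 0.001
    if math.abs(power) < 0.1
      return decimal_point_enforcer
    end
    # Truncate to one decimal place
    var rounded = real(int(power * 10)) / 10.0
    # Whole numbers get a small offset so they still carry a fractional part
    if power == int(power) || power == 0
      return rounded + decimal_point_enforcer
    end
    return rounded
  end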

  def _create_em_response(request_id, powers)
    var power_count = size(powers)
    # Ensure we have exactly 3 power values
    if power_count == 1
      powers = [powers[0], 0, 0]
    elif power_count != 3
      powers = [0, 0, 0]
    end
    var a = self._calculate_derived_values(powers[0])
    var b = self._calculate_derived_values(powers[1])
    var c = self._calculate_derived_values(powers[2])
    var total_act_power = powers[0] + powers[1] + powers[2]
    total_act_power = real(int(total_act_power * 1000)) / 1000.0  # Round to 3 decimals
    if total_act_power == int(total_act_power) || total_act_power == 0
      total_act_power += 0.001
    end
    return {
      "id": request_id,
      "src": self.device_id,
      "dst": "unknown",
      "result": {
        "a_act_power": a,
        "b_act_power": b,
        "c_act_power": c,
        "total_act_power": total_act_power
      }
    }
  end

  def _create_em1_response(request_id, powers)
    var total_power = 0
    for power : powers
      total_power += power
    end
    total_power = real(int(total_power * 1000)) / 1000.0  # Round to 3 decimals
    if total_power == int(total_power) || total_power == 0
      total_power += 0.001
    end
    return {
      "id": request_id,
      "src": self.device_id,
      "dst": "unknown",
      "result": {
        "act_power": total_power
      }
    }
  end

  def _find_matching_powermeter(client_ip)
    # Find powermeter for matching client filter
    for i : 0..size(self.client_filters)-1
      var filter = self.client_filters[i]
      if self._client_matches(client_ip, filter)
        return self.powermeters[i]
      end
    end
    return nil
  end

  def _client_matches(client_ip, filter)
    # Simple IP matching - can be extended for more complex filters
    # For now, match all clients (return true)
    # In real implementation, you would check IP ranges, subnets, etc.
    return true
  end

  def _handle_request(data, remote_ip, remote_port)
    try
      var request_str = data.asstring()
      print(f"Received UDP request from {remote_ip}:{remote_port}: {request_str}")
      var request = json.load(request_str)
      if !request
        print("Invalid JSON received")
        return
      end
      # Check if request has valid structure
      var method = request.find("method")
      var request_id = request.find("id")
      var params = request.find("params", {})
      if !method || request_id == nil
        print("Invalid request structure")
        return
      end
      # Find matching powermeter
      var powermeter_func = self._find_matching_powermeter(remote_ip)
      if !powermeter_func
        print(f"No powermeter found for client {remote_ip}")
        return
      end
      # Get power values from powermeter function
      var powers = powermeter_func()
      if !powers
        powers = [0]
      end
      var response = nil
      if method == "EM.GetStatus"
        response = self._create_em_response(request_id, powers)
      elif method == "EM1.GetStatus"
        response = self._create_em1_response(request_id, powers)
      else
        print(f"Unknown method: {method}")
        return
      end
      # Send response
      var response_json = json.dump(response)
      print(f"Sending response: {response_json}")
      self.udp_server.send(remote_ip, remote_port, bytes().fromstring(response_json))
    except .. as e, msg
      # e is the exception name, msg the accompanying message
      print(f"Error processing request: {e} {msg}")
    end
  end

  var loop_cb  # closure registered with the fast loop; kept so stop() can remove the same object

  def _udp_server_loop()
    # Called once per fast-loop tick, so it polls once and returns.
    # A `while self.running` loop here would never hand control back to Tasmota.
    if !self.running
      return
    end
    # Check for an incoming UDP packet
    var packet = self.udp_server.read()
    if packet != nil
      var remote_ip = self.udp_server.remote_ip
      var remote_port = self.udp_server.remote_port
      # Handle request in next tick to avoid blocking
      tasmota.set_timer(0, / -> self._handle_request(packet, remote_ip, remote_port))
    end
  end

  def start()
    if self.running
      print("Shelly emulator already running")
      return
    end
    # Start UDP server
    if !self.udp_server.begin("", self.udp_port)
      print(f"Failed to start UDP server on port {self.udp_port}")
      return false
    end
    self.running = true
    print(f"Shelly emulator started on UDP port {self.udp_port}")
    # Start server loop using fast_loop for responsive UDP handling
    self.loop_cb = / -> self._udp_server_loop()
    tasmota.add_fast_loop(self.loop_cb)
    return true
  end

  def stop()
    if !self.running
      return
    end
    self.running = false
    # Remove the same closure object that start() registered;
    # a freshly created closure would not match it
    tasmota.remove_fast_loop(self.loop_cb)
    self.loop_cb = nil
    # Close UDP server
    self.udp_server.close()
    print("Shelly emulator stopped")
  end
end
# Example usage and configuration
class PowermeterExample
  var power_values

  def init()
    self.power_values = [0, 0, 0]  # Three-phase power values
  end

  def get_power_watts()
    # Get actual power values from Tasmota energy monitoring
    if energy != nil
      # Use Tasmota's energy monitoring if available.
      # Phase A must come from the per-phase array too; using the total here
      # would count phases B and C twice when the values are summed.
      return [
        energy.active_power_phases != nil ? energy.active_power_phases[0] : energy.active_power,
        energy.active_power_phases != nil ? energy.active_power_phases[1] : 0,
        energy.active_power_phases != nil ? energy.active_power_phases[2] : 0
      ]
    else
      # Return simulated values for testing
      import math
      var time_factor = tasmota.millis() / 10000.0
      return [
        100 + 50 * math.sin(time_factor),
        75 + 25 * math.cos(time_factor),
        120 + 30 * math.sin(time_factor + 1)
      ]
    end
  end

  def set_power_values(powers)
    self.power_values = powers
  end
end
# Global emulator instance
var shelly_emulator = nil
# Command to start the emulator
def start_shelly_emulator(cmd, idx, payload, payload_json)
  import string
  var port = payload != "" ? int(payload) : 5683
  # Berry strings have no tr() method; strip the colons with string.replace()
  var mac = string.replace(tasmota.wifi()['mac'], ':', '')
  var device_id = f"shellyem-{mac}"
  if shelly_emulator != nil
    shelly_emulator.stop()
  end
  shelly_emulator = ShellyEmulator(port, device_id)
  # Add powermeter with example client filter
  var powermeter = PowermeterExample()
  shelly_emulator.add_powermeter(/ -> powermeter.get_power_watts(), "all")
  if shelly_emulator.start()
    tasmota.resp_cmnd_str(f"Shelly emulator started on port {port}")
  else
    tasmota.resp_cmnd_error()
  end
end
# Command to stop the emulator
def stop_shelly_emulator(cmd, idx, payload, payload_json)
  if shelly_emulator != nil
    shelly_emulator.stop()
    shelly_emulator = nil
    tasmota.resp_cmnd_done()
  else
    tasmota.resp_cmnd_str("Emulator not running")
  end
end
# Command to get emulator status
def shelly_emulator_status(cmd, idx, payload, payload_json)
  if shelly_emulator != nil && shelly_emulator.running
    var status = {
      "running": true,
      "port": shelly_emulator.udp_port,
      "device_id": shelly_emulator.device_id,
      "powermeters": size(shelly_emulator.powermeters)
    }
    tasmota.resp_cmnd(json.dump(status))
  else
    tasmota.resp_cmnd_str("Not running")
  end
end
# Register custom commands
tasmota.add_cmd("ShellyStart", start_shelly_emulator)
tasmota.add_cmd("ShellyStop", stop_shelly_emulator)
tasmota.add_cmd("ShellyStatus", shelly_emulator_status)
print("Shelly emulator loaded. Use 'ShellyStart [port]' to start.")

Tasmota Berry Script - Shelly Energy Meter Emulator
Usage Instructions:
Key Differences from Python Original:
Functional Equivalence:
Features:
The script provides the same functionality as the Python original but is optimally adapted for the Tasmota environment and utilizes the available Berry APIs and Tasmota functions.
Command Reference:
Supported Protocols:
Power Value Sources:
The emulator automatically detects available energy monitoring hardware and adapts accordingly.
Installation Notes:
Troubleshooting:
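One way to check whether the emulator answers at all is to send it a request from a PC, the way a battery or charger would. The following is a minimal Python sketch (Python because the original implementation mentioned in this thread is Python); it is not part of the Berry script. The helper names `build_request`, `extract_power` and `query` and the `params` payload are assumptions made for illustration. The request fields (`id`, `method`, `params`) and the reply fields are simply the ones the script above reads and writes.

```python
import json
import socket


def build_request(method="EM.GetStatus", request_id=1):
    """Encode a request with the fields the Berry script looks for."""
    # The content of "params" is an assumption; the script reads it but never uses it.
    request = {"id": request_id, "method": method, "params": {"id": 0}}
    return json.dumps(request).encode()


def extract_power(reply_bytes):
    """Return the total power in watts from an EM or EM1 style reply."""
    result = json.loads(reply_bytes.decode())["result"]
    # EM replies carry total_act_power, EM1 replies carry act_power.
    if "total_act_power" in result:
        return result["total_act_power"]
    return result["act_power"]


def query(host, port, method="EM.GetStatus", timeout=2.0):
    """Send one request over UDP and return the reported power.

    Raises socket.timeout if no reply arrives within `timeout` seconds.
    """
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
        sock.settimeout(timeout)
        sock.sendto(build_request(method), (host, port))
        reply, _addr = sock.recvfrom(2048)
    return extract_power(reply)
```

For example, `query("192.168.1.50", 5683)` would ask a device at that (hypothetical) address on the script's default port. If the call times out, the Tasmota console output of the script shows whether the request arrived and why it was rejected.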
Most devices that need a power meter reading as input support their own metering hardware and usually, as an alternative, any Shelly metering data on UDP ports 1010 and/or 2220.
Several power stations and EV chargers with a solar charging mode support this "Shelly protocol".
When an IR reader is already attached to the power meter, no additional hardware should be required.
This could maximize the solar usage of many homes.
tomquist has a Python implementation in b2500-meter, but I cannot find one to add to the Tasmota routing.
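To make the requested "Shelly metering" reply a bit more concrete, here is a small Python sketch of the three-phase payload as the Berry script in this thread builds it. It is a port of that script's logic, not Shelly's official specification; the function names `enforce_decimal` and `em_reply` and the device id in the example are made up for illustration. The 0.001 offset presumably exists so that whole-number readings still serialize with a fractional part.

```python
import json


def enforce_decimal(power):
    """Truncate to one decimal and keep a fractional part, like the script's helper."""
    if abs(power) < 0.1:
        return 0.001
    rounded = int(power * 10) / 10.0  # truncates toward zero
    if power == int(power):
        return rounded + 0.001
    return rounded


def em_reply(request_id, device_id, phases):
    """Build an EM.GetStatus style reply from three per-phase readings in watts."""
    a, b, c = (enforce_decimal(p) for p in phases)
    total = int(sum(phases) * 1000) / 1000.0  # keep three decimals
    if total == int(total):
        total += 0.001
    return {
        "id": request_id,
        "src": device_id,
        "dst": "unknown",
        "result": {
            "a_act_power": a,
            "b_act_power": b,
            "c_act_power": c,
            "total_act_power": total,
        },
    }


# Hypothetical readings: 100 W, 75.5 W and an idle third phase
print(json.dumps(em_reply(1, "shellyem-test", [100, 75.5, 0])))
```

A single-phase `EM1.GetStatus` reply would follow the same pattern with one `act_power` field holding the sum.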