Controlling Tuya Smart Plugs Locally with MicroPython on ESP32
No cloud. No dependencies. Pure local LAN control.
I spent a few days reverse-engineering the Tuya local LAN protocol to get my AUBESS Smart Socket 20A plugs talking to an ESP32 running MicroPython — without relying on the cloud, without any third-party MicroPython libraries. This post documents everything I found, including the quirks that cost me the most time.
Tuya's cloud API is rate-limited, requires internet, and adds ~500ms latency. For home automation on a microcontroller you want local LAN control: direct TCP on port 6668, sub-10ms response and solution that works offline.
There are Python libraries for this on desktop (tinytuya), but nothing clean for MicroPython. This post fills that gap.
I assume, you already have Tuya app installed on your phone :) Its the only way to get device's Local Key- developer account is needed at Tuya platform. Free trial account is totally suitable, no need to pay.
You need the Device ID (20 chars) and Local Key (16 chars) from Tuya's developer portal.
On your PC:
pip install tinytuya python -m tinytuya wizard
This walks you through linking your Tuya developer account and dumps all device credentials to devices.json and other JSON files. Dont forget to check those.
⚠️ The Local Key changes every time you remove and re-add a device in the Smart Life app. Don't do that after grabbing the key.
Tuya v3.3 uses plain TCP on port 6668 with AES-128-ECB encrypted JSON payloads. Each frame looks like:
[HEADER 4B][SEQUENCE 4B][COMMAND 4B][LENGHt 4B][AES'ed PAYLOAD][FOOTER 4B]
00 00 55 AA00 00 00 0100 00 00 0a00 00 00 7800 00 AA 55Lenght is measured PAYLOAD + FOOTER
All my AUBESS plugs are query-style - after connecting to socket Plug waits. You must send a status query packet first.
Standard Tuya v3.3 documentation says the encrypted payload should be prefixed with 3.3\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00 (15 bytes). This makes packets 151 bytes. It is required when sending set_switch command
After capturing tinytuya traffic I discovered my plugs want no prefix for the status query — 136-byte packets only. The prefix is only used for control packets (turning on/off).
This single detail isn't documented anywhere I could find.
| Packet type | Version prefix | Packet size |
Status query (cmd 0x0a) | ❌ No prefix | 136 bytes |
Heart-beat (cmd 0x09) | ❌ No prefix | 136 bytes |
Control (cmd 0x07) | ✅ With prefix | 151 bytes |
MicroPython's utime.time() epoch starts at 2000-01-01, not 1970-01-01 like Unix.
Tuya validates the t field in the payload and rejects queries where the timestamp is too far from real Unix time.
Fix this by syncing NTP on boot and adding the epoch offset:
import network, utime, ntptime sta = network.WLAN(network.STA_IF) sta.active(True) sta.connect('YOUR_SSID', 'YOUR_PASSWORD') while not sta.isconnected(): utime.sleep_ms(300) ntptime.settime() # sync to real UTC
# In your main script EPOCH_OFFSET = 946684800 # seconds between 1970 and 2000 def unix_now(): return utime.time() + EPOCH_OFFSET
The plug returns a dps dictionary. For AUBESS Smart Socket 20A:
| Key | Unit | Description |
| 1 | bool | Switch state |
| 9 | — | Countdown timer |
| 18 | mA | Current |
| 19 | W × 10 | Power (divide by 10) |
| 20 | V × 10 | Voltage (divide by 10) |
| 21 | — | Power-on mode |
| 22–24 | — | Calibration values |
| 25 | mA | Leakage current threshold |
Mapping could be found in files, generated by tinytuya wizard
data format error in decrypted response — key is correct but payload format is wrong. Check JSON structure.This monkey-patch script helped a lot. Using it i was able to prepare Microptyhon library for Tuya devices
# Run this on PC: python capture.py import tinytuya import socket import threading import time # Pinguin DEVICE_IP = '192.168.1.55' DEVICE_ID = '1<hidden>9' LOCAL_KEY = '5<hidden>b' # Monkey-patch socket to capture raw bytes _orig_connect = socket.socket.connect _orig_send = socket.socket.send _orig_recv = socket.socket.recv _orig_sendall = socket.socket.sendall def patched_connect(self, addr): print('[SOCKET] connect to', addr) return _orig_connect(self, addr) def patched_send(self, data, *args): print('[SOCKET] send %d bytes:' % len(data), ' '.join('%02x' % b for b in data)) return _orig_send(self, data, *args) def patched_recv(self, size, *args): data = _orig_recv(self, size, *args) print('[SOCKET] recv %d bytes:' % len(data), ' '.join('%02x' % b for b in data)) return data def patched_sendall(self, data, *args): print('[SOCKET] sendall %d bytes:' % len(data), ' '.join('%02x' % b for b in data)) return _orig_sendall(self, data, *args) socket.socket.connect = patched_connect socket.socket.send = patched_send socket.socket.recv = patched_recv socket.socket.sendall = patched_sendall d = tinytuya.OutletDevice( dev_id=DEVICE_ID, address=DEVICE_IP, local_key=LOCAL_KEY, version=3.3 ) d.set_socketTimeout(10) data = d.status() print('STATUS:', data)
0x09) before the status query — some plugs need it, mine didn'tutime.time() without NTP sync — plug rejects stale timestampsThis was tested on AUBESS Smart Socket 20A running Tuya firmware v3.3. Other Tuya devices may behave differently — some push status on connect without needing a query, some require a heartbeat first. The packet capture approach (monkey-patching Python's socket in tinytuya) is the most reliable way to debug unknown devices.
All code runs on standard MicroPython ESP32 builds with no additional packages. The only modules used are socket, time, struct, cryptolib, binascii, errno and json— all built-in.
""" uTuya - MicroPython Library for Local Tuya Device Control ========================================================== A lightweight MicroPython library for controlling Tuya-compatible smart devices (smart plugs, switches, lights, etc.) over local network without cloud dependency. Author: Ignas Bukys ignas.bukys [] gmail [] com License: MIT Version: 1.0.0 @ 2026-05-13 Features: --------- - Local network communication (no cloud required) - AES encryption/decryption for secure device communication - Support for Tuya protocol v3.3 - Energy monitoring (voltage, current, power) for compatible devices - DPS (Data Point) read/write operations - Heartbeat support to keep connections alive - Configurable debug logging - Optional connection pooling for frequent queries - Memory-optimized with memoryview to reduce allocations Hardware Requirements: --------------------- - ESP32, ESP8266, or other MicroPython-compatible microcontroller - Tuya-compatible smart device on the same local network Dependencies: ------------ - MicroPython standard library (socket, time, struct, json) - cryptolib (MicroPython AES encryption module) Usage Example: ------------- from tuya import uTuya # Initialize device connection device = uTuya( ip='192.168.1.100', dev='your_device_id', key='your_local_key', *version=3.3, *debug=True ) * Optional # Turn switch ON device.turn_on() device.set_dps(1, True) # Turn switch OFF device.turn_off() device.set_dps(1, False) # Get energy monitoring data print(f"Voltage: {device.voltage} V") print(f"Current: {device.current} A") print(f"Power: {device.power} W") # Check switch state if device.is_on: print("Device is ON") # Get all DPS values dps = device.get_dps() print(dps) # Get specific DPS value switch_state = device.get_dps(1) # Set custom DPS value device.set_dps(1, True) # Turn on # Print formatted status device.print_status() # Send heartbeat device.heartbeat() # Use keep_alive mode for multiple operations (automatic cleanup) device_persistent = uTuya( ip='192.168.1.100', dev='your_device_id', key='your_local_key', keep_alive=True # Reuse socket connection ) # Multiple operations reuse the same socket device_persistent.set_switch(True) print(device_persistent.voltage) device_persistent.heartbeat() # Close persistent connection when done device_persistent.close() How to Get Device Credentials: ------------------------------ 1. Device ID and Local Key: Read on http://bukys.eu blog post how to get Extract using tool tinytuya 2. IP Address: Check your router's DHCP table or use network scanner Protocol Notes: -------------- - Query packets (CMD_QUERY): NO version prefix in payload encryption - Control packets (CMD_CONTROL): REQUIRES version prefix (e.g., '3.3') - All packets use: HEADER + BODY + CRC32 + FOOTER structure - Responses are AES-encrypted and may include version prefix Memory Optimization Notes: -------------------------- - memoryview is used to avoid unnecessary data copying - Slicing memoryview creates a view, not a copy (zero-copy operations) - Must convert memoryview to bytes when passing to functions that need bytes - Pre-allocated buffers for CRC32 calculation reduce GC pressure - In-place operations where possible to minimize allocations MIT License: ----------- Copyright (c) 2025 Ignas Bukys Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. """ import socket, errno, time, binascii, struct import cryptolib # pyright: ignore[reportMissingImports] import json from micropython import const # pyright: ignore[reportMissingImports] # ── Socket Management ──────────────────────────────────────────────── class _TuyaSocket: """ Context manager for safe socket handling with automatic cleanup. Now handles the opening, connecting, and logic for persistent sockets. """ def __init__(self, parent): """ Initialize socket context manager. Args: parent (uTuya): Parent uTuya instance that provides IP, PORT, and stores the persistent socket state. """ self.parent = parent self.sock = None self.is_reused = False def _sock_open(self): """ Private helper to create and connect a new TCP socket. Returns socket object or None on failure. """ self.parent._log("Creating new socket object") sck = socket.socket() sck.settimeout(3) # Standard timeout for Tuya handshake try: self.parent._log(f"Connecting to {self.parent.ip}:{self.parent.PORT}") sck.connect((self.parent.ip, self.parent.PORT)) self.parent._log("Connected successfully") return sck except socket.error as e: # Handle common network errors with descriptive logs if e.errno == errno.EHOSTUNREACH: self.parent._log(f"Error: Host {self.parent.ip} unreachable.") elif e.errno == errno.ECONNREFUSED: self.parent._log(f"Error: Connection refused on port {self.parent.PORT}.") elif e.errno == errno.ECONNRESET: self.parent._log(f"Error: Connection reset by peer.") else: self.parent._log(f"Socket error: {e}") return None except Exception as e: self.parent._log(f"Unexpected error opening socket: {e}") return None def __enter__(self): """ Enter context: open or reuse socket connection. Returns: socket: Active socket connection Raises: OSError: If connection fails """ # 1. Try to reuse a persistent socket if keep_alive is active if self.parent.keep_alive and self.parent._socket: self.parent._log("Reusing persistent socket") self.sock = self.parent._socket self.is_reused = True return self.sock # Otherwise, open a fresh connection self.sock = self._sock_open() if self.sock is None: raise OSError("Failed to connect to device") # If keep_alive is enabled, store this socket for future reuse if self.parent.keep_alive: self.parent._socket = self.sock self.parent._log("Socket stored for keep_alive mode") self.is_reused = False return self.sock def __exit__(self, exc_type, exc_val, exc_tb): """ Exit context: close socket only if NOT in keep_alive mode or if error occurred. Args: exc_type: Exception type (if any) exc_val: Exception value (if any) exc_tb: Exception traceback (if any) """ # If an exception occurred, close and clear the socket even in keep_alive mode if exc_type is not None: self.parent._log(f"Exception in socket context: {exc_val}") if self.sock: try: self.sock.close() except: pass # Clear the persistent socket reference so next call opens fresh self.parent._socket = None return False # If NOT in keep_alive mode, close the socket now if not self.parent.keep_alive and self.sock: self.parent._log("Closing short-lived socket (not in keep_alive mode)") try: self.sock.close() except: pass else: self.parent._log("Socket kept open (keep_alive mode)") return False class uTuya: """ Main class for Tuya device communication over local network. Handles protocol encryption, packet crafting, and device state management. Memory-optimized with memoryview for reduced allocations. """ PORT = const(6668) # Standard Tuya device port HEADER = const(b'\x00\x00\x55\xaa') # Packet header signature FOOTER = const(b'\x00\x00\xaa\x55') # Packet footer signature CMD_HEARTBEAT = const(0x09) # Keep-alive command as heartbeay CMD_QUERY = const(0x0a) # Read device state command CMD_CONTROL = const(0x07) # Write/control command # Unix epoch (1970-01-01) vs MicroPython epoch (2000-01-01) offset in seconds # Used to convert MicroPython time.time() to Unix timestamp for Tuya protocol EPOCH_DIFF = const(946684800) def __init__(self, ip, dev, key, version=3.3, debug=False, keep_alive=False): """ Initialize Tuya device connection. Args: ip (str): Device IP address on local network dev (str): Tuya device ID (gwId/devId) key (str|bytes): AES encryption key for device version (float|str): Protocol version (default '3.3') debug (bool): Enable debug logging (default False) keep_alive (bool): Maintain persistent socket connection (default False) """ self.ip = ip self.did = dev # Ensure local_key is bytes for cryptolib self.key = key.encode() if isinstance(key, str) else key self.version = str(version) self.sequence = 0 # Packet sequence counter (incremented per request) self.debug = debug # Debug logging flag self.keep_alive = keep_alive self._socket = None # Persistent socket for keep_alive mode try: _ = binascii.crc32 self._crc32_available = True except (AttributeError, ImportError): self._crc32_available = True # ── Cryptography ──────────────────────────────────────────────────────── def _crc32(self, data): """ Calculate CRC32 checksum for packet integrity verification. Memory-optimized: Uses memoryview to avoid copying data when iterating. Uses polynomial 0xEDB88320 (reversed 0x04C11DB7) as per Tuya protocol. This is the same CRC32 algorithm used in ZIP, Ethernet, PNG, etc. Args: data (bytes): Data to calculate checksum for Returns: int: 32-bit CRC checksum """ # use the firmware's built-in CRC32 (written in C- faster) if self._crc32_available: return binascii.crc32(data) # use pure python implementation crc = 0xFFFFFFFF # Use memoryview to iterate without copying - saves memory mv = memoryview(data) for byte in mv: crc ^= byte for _ in range(8): if crc & 1: crc = (crc >> 1) ^ 0xEDB88320 else: crc >>= 1 return crc ^ 0xFFFFFFFF def _aes_encrypt(self, data): """ Encrypt data using AES-128-ECB with PKCS7 padding. Tuya uses ECB mode (not CBC) for simplicity, though less secure. PKCS7 padding ensures data length is multiple of 16 bytes. Args: data (str|bytes): Plaintext to encrypt Returns: bytes: Encrypted ciphertext """ if isinstance(data, str): data = data.encode() # PKCS7 padding: add N bytes of value N to reach 16-byte boundary pad = 16 - (len(data) % 16) data += bytes([pad] * pad) # cryptolib mode 1 = ECB encryption return cryptolib.aes(self.key, 1).encrypt(data) def _aes_decrypt(self, data): """ Decrypt AES-128-ECB encrypted data and remove PKCS7 padding. Args: data (bytes): Encrypted ciphertext Returns: bytes: Decrypted plaintext with padding removed """ dec = cryptolib.aes(self.key, 1).decrypt(data) # Remove PKCS7 padding (last byte indicates pad length) pad = dec[-1] if pad <= 16: # Valid padding must be 1-16 dec = dec[:-pad] return dec # ── Protocol ────────────────────────────────────────────────────────── def _craft_packet(self, cmd, payload_bytes): """ Construct a complete Tuya protocol packet. Packet structure: - HEADER (4 bytes): 0x000055aa - Sequence (4 bytes): Incrementing packet counter - Command (4 bytes): Command code (0x07, 0x09, 0x0a, etc.) - Length (4 bytes): Payload length + 8 (includes return code + payload) - Payload (variable): Encrypted data - CRC32 (4 bytes): Checksum of HEADER + body - FOOTER (4 bytes): 0x0000aa55 Args: cmd (int): Command code (CMD_QUERY, CMD_CONTROL, etc.) payload_bytes (bytes): Encrypted payload data Returns: bytes: Complete packet ready to send """ self.sequence += 1 # Increment for each new request # Length includes 8-byte overhead (4-byte return code + 4-byte CRC in some contexts) length = len(payload_bytes) + 8 # Body: sequence + command + length + payload body = struct.pack('>III', self.sequence, cmd, length) + payload_bytes # Calculate CRC over header + body crc = self._crc32(self.HEADER + body) & 0xFFFFFFFF packet = self.HEADER + body + struct.pack('>I', crc) + self.FOOTER self._log(f"Crafted packet: seq={self.sequence}, cmd=0x{cmd:02x}, len={length}, total={len(packet)} bytes") return packet def _encrypt_query(self, data, version_incl=False): """ Encrypt payload with optional version prefix. CRITICAL PROTOCOL DIFFERENCE: - Query packets (CMD_QUERY): NO version prefix (version_incl=False) - Control packets (CMD_CONTROL): REQUIRE version prefix (version_incl=True) Version prefix format: '3.3' + 12 null bytes + encrypted_data Args: data (str|bytes): JSON payload to encrypt version_incl (bool): Whether to prepend version header Returns: bytes: Encrypted payload (with version prefix if requested) """ self._log(f"Encrypting payload ({len(data)} bytes, version_prefix={version_incl})") enc = self._aes_encrypt(data) if version_incl: # Format: '3.3\x00\x00...\x00' (version string + 12 nulls) + encrypted # result = b'{%s}' % self.version.encode() + b'\x00' * 12 + enc result = b'%s' % self.version.encode() + b'\x00' * 12 + enc self._log(f"Added version prefix: total {len(result)} bytes") return result self._log(f"No version prefix: {len(enc)} bytes") return enc def _decrypt_response(self, resp): """ Decrypt and parse device response packet. MEMORY-OPTIMIZED: Uses memoryview to slice without copying data. Response may have various prefixes before the actual JSON: - 4-byte null prefix (0x00000000) - Version prefix ('3.3' + 12 nulls) Args: resp (bytes): Raw response packet from device Returns: dict|None: Parsed JSON data or None if decryption failed """ # ── Validation ── if not resp or len(resp) < 16: self._log("Response too short") return None # Create memoryview for zero-copy slicing mv = memoryview(resp) # Verify packet structure (header at start, footer at end) # Must convert memoryview slices to bytes for comparison if bytes(mv[:4]) != self.HEADER or bytes(mv[-4:]) != self.FOOTER: self._log("Invalid header/footer in response") return None # ── Extract Payload ── # Length field at offset 12 includes 8-byte overhead length = struct.unpack('>I', mv[12:16])[0] # Extract payload using memoryview (zero-copy) pl_view = mv[16:16 + length - 8] # ── Strip Prefixes ── # Some responses have 4-byte null prefix (0x00000000) if len(pl_view) >= 4 and bytes(pl_view[:2]) == b'\x00\x00': pl_view = pl_view[4:] # Strip version prefix if present ('3.3' + 12 nulls = 15 bytes) if pl_view[:3] == b'3.3': pl_view = pl_view[15:] # ── Decrypt ── # Payload must be multiple of 16 (AES block size) if not pl_view or len(pl_view) % 16 != 0: self._log(f"Invalid payload length: {len(pl_view)}") return None # Convert memoryview to bytes for decryption (cryptolib needs bytes) payload = bytes(pl_view) try: dec = self._aes_decrypt(payload) # Find JSON boundaries in decrypted data # (sometimes has garbage before/after the JSON) i = dec.find(b'{') j = dec.rfind(b'}') + 1 if i >= 0 and j > i: return json.loads(dec[i:j]) except Exception as e: self._log(f"Decryption error: {e}") return None # ── Helpers ──────────────────────────────────────────────────────── def _unix_now(self): """ Get current Unix timestamp (seconds since 1970-01-01). MicroPython's time.time() returns seconds since 2000-01-01, so we add EPOCH_DIFF to convert to Unix time. Returns: int: Current Unix timestamp """ return int(time.time() + self.EPOCH_DIFF) def _log(self, msg): """ Print debug log message if debug mode enabled. Args: msg (str): Message to log """ if self.debug: print(f"[uTuya] {msg}") def _is_ack(self, raw): """ Checks if a raw response packet is a successful ACK from the device. Args: raw (bytes): The raw data received from the socket. Returns: bool: True if this appears to be a valid ACK """ # Use memoryview for zero-copy header check mv = memoryview(raw) # Minimum valid packet size for an ACK is 28 bytes: # Header(4) + Seq(4) + Cmd(4) + Len(4) + RetCode(4) + CRC(4) + Footer(4) if not raw or len(raw) < 28: self._log("ACK Check: Response too short or empty") return False # 1. Verify Packet Integrity (Header and Footer) if bytes(mv[:4]) != self.HEADER or bytes(mv[-4:]) != self.FOOTER: self._log("ACK Check: Invalid Header or Footer") return False try: # 2. Extract Return Code # In the Tuya protocol, the Return Code is the first 4 bytes of the # payload area, which starts at index 16. ret_code = struct.unpack('>I', bytes(mv[16:20]))[0] if ret_code == 0: self._log("ACK Check: Success (Return Code 0)") return True else: self._log(f"ACK Check: Device returned error code {ret_code}") return False except Exception as e: self._log(f"ACK Check: Parsing error: {e}") return False # ── Public API ──────────────────────────────────────────────────────── def get_data_raw(self): """ Query device for current state (all DPS values). Sends CMD_QUERY command with device ID and timestamp. Does NOT include version prefix for query packets. Returns: dict|None: Response containing 'dps' dict with all data points, or None on error Example response: { 'dps': { '1': True, # Switch state '18': 2340, # Current (mA) '19': 523, # Power (W*10) '20': 2301 # Voltage (V*10) } } """ # Construct JSON payload with device ID and timestamp t = str(self._unix_now()) payload = ('{"gwId":"%s","devId":"%s","uid":"%s","t":"%s"}' % (self.did, self.did, self.did, t)) # Encrypt WITHOUT version prefix (critical for query packets) enc = self._encrypt_query(payload, version_incl=False) pkt = self._craft_packet(self.CMD_QUERY, enc) self._log(f"Sending QUERY command (seq={self.sequence})") try: # Context manager ensures socket is always closed properly with _TuyaSocket(self) as s: self._log(f"Request ({len(pkt)} bytes): {pkt.hex()}") s.send(pkt) raw = s.recv(512) # Max response size self._log(f"Response ({len(raw)} bytes): {raw.hex()}") return self._decrypt_response(raw) except OSError as e: print(f"Connection error in get_data_raw: {e}") return None except Exception as e: print(f'get_data_raw error: {e}') return None def get_dps(self, key=None): """ Get DPS (Data Point) values from device. Args: key (int|str|None): Specific DPS key to retrieve, or None for all Returns: dict: All DPS values if key=None any: Specific DPS value if key provided None: On error or if key not found Example: device.get_dps() # {'1': True, '18': 2340, '19': 523, '20': 2301} device.get_dps(1) # True device.get_dps('20') # 2301 """ data = self.get_data_raw() if not data: return None dps = data.get('dps', {}) # Return specific key if requested, otherwise return all DPS return dps.get(str(key)) if key is not None else dps def set_dps(self, key, value): """ Set a specific DPS (Data Point) value on device. Generic setter for any DPS key/value pair. Useful for advanced control beyond simple switch on/off (e.g., brightness, color, timer, etc.) Note: Some devices send TWO responses: 1. ACK packet (command accepted) 2. Status packet (updated DPS values) Args: key (int|str): DPS key to set value (any): Value to set (bool, int, str, etc.) Returns: dict|None: Response from device or None on error Example: device.set_dps(1, True) # Turn switch on device.set_dps(2, 50) # Set brightness to 50% device.set_dps(3, "white") # Set color mode """ t = str(self._unix_now()) # Convert value to JSON string if not already a string # (booleans need lowercase 'true'/'false', not Python's 'True'/'False') val_str = json.dumps(value) if not isinstance(value, str) else value.lower() payload = ('{"gwId":"%s","devId":"%s","uid":"%s","t":"%s","dps":{"%s":%s}}' % (self.did, self.did, self.did, t, key, val_str)) # Control commands REQUIRE version prefix (critical!) enc = self._encrypt_query(payload, version_incl=True) pkt = self._craft_packet(self.CMD_CONTROL, enc) self._log(f"Sending CONTROL (seq={self.sequence}): DPS {key}={value}") try: # Context manager ensures socket is always closed properly with _TuyaSocket(self) as s: self._log(f"Request ({len(pkt)} bytes): {pkt.hex()}") s.send(pkt) # Receive the first response (The ACK is expected) raw_ack = s.recv(512) self._log(f"ACK response ({len(raw_ack)} bytes): {raw_ack.hex()}") if self._is_ack(raw_ack): self._log("Command accepted by device.") # Receive actual response (expected status update) raw = s.recv(512) self._log(f"Response ({len(raw)} bytes): {raw.hex()}") return self._decrypt_response(raw) self._log("Command failed or timed out.") return None except OSError as e: print(f"Connection error in set_dps: {e}") return None except Exception as e: print(f'set_dps error: {e}') return None def heartbeat(self): """ Send heartbeat packet to device to keep connection alive. Some devices may drop idle connections. Send periodic heartbeats to maintain persistent connections. Returns: bool: True if heartbeat acknowledged, False on error """ # Heartbeat has empty payload pkt = self._craft_packet(self.CMD_HEARTBEAT, b'') self._log(f"Sending HEARTBEAT (seq={self.sequence})") try: # Context manager ensures socket is always closed properly with _TuyaSocket(self) as s: self._log(f"Heatbeat request ({len(pkt)} bytes): {pkt.hex()}") s.send(pkt) raw = s.recv(512) self._log(f"Heartbeat ACK response ({len(raw)} bytes): {raw.hex()}") return raw is not None except OSError as e: print(f"Connection error in heartbeat: {e}") return False except Exception as e: print(f'heartbeat error: {e}') return False def turn_on(self): """Convenience method to turn device ON""" return self.set_dps(1, True) def turn_off(self): """Convenience method to turn device OFF""" return self.set_dps(1, False) # ── Convenience Properties ──────────────────────────────────────────── # NOTE: Properties like voltage() call self.get_dps(), which calls self.get_data_raw(). # If you check voltage, current, and power sequentially, you are performing # three separate network requests. For efficiency, call get_dps() once and # calculate all values from the returned dict. @property def voltage(self): """ Get current voltage in Volts. Returns: float|None: Voltage in V (e.g., 230.1) or None on error """ dps = self.get_dps() return dps.get('20', 0) / 10 if dps else None @property def current(self): """ Get current draw in Amperes Returns: float|None: Current in A (e.g., 2.34) or None on error """ dps = self.get_dps() return dps.get('18', 0) / 1000 if dps else None @property def power(self): """ Get power consumption in Watts. Returns: float|None: Power in W (e.g., 52.3) or None on error """ dps = self.get_dps() return dps.get('19', 0) / 10 if dps else None @property def is_on(self): """ Check if device switch is currently ON. Returns: bool|None: True if ON, False if OFF, None on error """ dps = self.get_dps() return dps.get('1', False) if dps else None # ── Utility Methods ──────────────────────────────────────────────── def print_status(self): """ Print formatted device status to console. Displays switch state, voltage, current, power, and raw DPS values. Useful for debugging and monitoring. """ dps = self.get_dps() print('-' * 40) print('Device Status:') print('-' * 40) print(f' Switch : {dps.get("1", "N/A")}') print(f' Voltage: {dps.get("20", 0) / 10:.1f} V') print(f' Current: {dps.get("18", 0) / 1000:.3f} A') print(f' Power : {dps.get("19", 0) / 10:.1f} W') print('-' * 40) print(f' Raw DPS: {dps}') print('-' * 40)