"""
uTuya - MicroPython Library for Local Tuya Device Control
==========================================================
A lightweight MicroPython library for controlling Tuya-compatible smart devices
(smart plugs, switches, lights, etc.) over local network without cloud dependency.
Author: Ignas Bukys
ignas.bukys [] gmail [] com
License: MIT
Version: 1.0.0 @ 2026-05-13
Features:
---------
- Local network communication (no cloud required)
- AES encryption/decryption for secure device communication
- Support for Tuya protocol v3.3
- Energy monitoring (voltage, current, power) for compatible devices
- DPS (Data Point) read/write operations
- Heartbeat support to keep connections alive
- Configurable debug logging
- Optional connection pooling for frequent queries
- Memory-optimized with memoryview to reduce allocations
Hardware Requirements:
---------------------
- ESP32, ESP8266, or other MicroPython-compatible microcontroller
- Tuya-compatible smart device on the same local network
Dependencies:
------------
- MicroPython standard library (socket, errno, time, binascii, struct, json)
- cryptolib (MicroPython AES encryption module)
Usage Example:
-------------
from tuya import uTuya
# Initialize device connection
device = uTuya(
ip='192.168.1.100',
dev='your_device_id',
key='your_local_key',
version=3.3,  # optional, defaults to 3.3
debug=True,   # optional, defaults to False
)
# Turn switch ON
device.turn_on()
device.set_dps(1, True)
# Turn switch OFF
device.turn_off()
device.set_dps(1, False)
# Get energy monitoring data
print(f"Voltage: {device.voltage} V")
print(f"Current: {device.current} A")
print(f"Power: {device.power} W")
# Check switch state
if device.is_on:
print("Device is ON")
# Get all DPS values
dps = device.get_dps()
print(dps)
# Get specific DPS value
switch_state = device.get_dps(1)
# Set custom DPS value
device.set_dps(1, True) # Turn on
# Print formatted status
device.print_status()
# Send heartbeat
device.heartbeat()
# Use keep_alive mode for multiple operations (automatic cleanup)
device_persistent = uTuya(
ip='192.168.1.100',
dev='your_device_id',
key='your_local_key',
keep_alive=True # Reuse socket connection
)
# Multiple operations reuse the same socket
device_persistent.set_switch(True)
print(device_persistent.voltage)
device_persistent.heartbeat()
# Close persistent connection when done
device_persistent.close()
How to Get Device Credentials:
------------------------------
1. Device ID and Local Key: see the blog post on http://bukys.eu for how to extract them with the tinytuya tool
2. IP Address: Check your router's DHCP table or use network scanner
Protocol Notes:
--------------
- Query packets (CMD_QUERY): NO version prefix in payload encryption
- Control packets (CMD_CONTROL): REQUIRES version prefix (e.g., '3.3')
- All packets use: HEADER + BODY + CRC32 + FOOTER structure
- Responses are AES-encrypted and may include version prefix
Memory Optimization Notes:
--------------------------
- memoryview is used to avoid unnecessary data copying
- Slicing memoryview creates a view, not a copy (zero-copy operations)
- Must convert memoryview to bytes when passing to functions that need bytes
- Pre-allocated buffers for CRC32 calculation reduce GC pressure
- In-place operations where possible to minimize allocations
MIT License:
-----------
Copyright (c) 2025 Ignas Bukys
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
"""
import socket, errno, time, binascii, struct
import cryptolib # pyright: ignore[reportMissingImports]
import json
from micropython import const # pyright: ignore[reportMissingImports]
# ── Socket Management ────────────────────────────────────────────────
class _TuyaSocket:
    """
    Context manager that yields a connected TCP socket to the device.

    On entry it either reuses the parent's persistent socket (keep_alive
    mode) or opens a new connection. On exit the socket is closed unless
    keep_alive is enabled. If the managed block raised, the socket is always
    closed and the parent's persistent reference is cleared so that the next
    request reconnects.
    """

    def __init__(self, parent):
        """
        Initialize socket context manager.

        Args:
            parent (uTuya): Parent uTuya instance that provides ip, PORT,
                keep_alive, _log() and stores the persistent socket in
                parent._socket.
        """
        self.parent = parent
        self.sock = None
        self.is_reused = False

    def _close_quietly(self, sck):
        """
        Best-effort close of a socket; failures are logged, never raised.

        Args:
            sck: Socket object to close, or None (ignored).
        """
        if sck is None:
            return
        try:
            sck.close()
        except Exception as e:
            # Closing is cleanup only; a failure here must not mask the
            # outcome of the request that used the socket.
            self.parent._log(f"Ignoring error while closing socket: {e}")

    def _sock_open(self):
        """
        Create and connect a new TCP socket to parent.ip:parent.PORT.

        Returns:
            socket|None: Connected socket, or None on failure. A socket that
            was created but could not be connected is closed before
            returning, so failed attempts do not leak socket handles.
        """
        log = self.parent._log
        sck = None
        try:
            log("Creating new socket object")
            sck = socket.socket()
            sck.settimeout(3)  # Standard timeout for Tuya handshake
            log(f"Connecting to {self.parent.ip}:{self.parent.PORT}")
            sck.connect((self.parent.ip, self.parent.PORT))
            log("Connected successfully")
            return sck
        except OSError as e:
            # OSError is used instead of socket.error: MicroPython's socket
            # module does not define socket.error, and on CPython it is an
            # alias of OSError anyway.
            code = getattr(e, "errno", None)
            if code == errno.EHOSTUNREACH:
                log(f"Error: Host {self.parent.ip} unreachable.")
            elif code == errno.ECONNREFUSED:
                log(f"Error: Connection refused on port {self.parent.PORT}.")
            elif code == errno.ECONNRESET:
                log("Error: Connection reset by peer.")
            else:
                log(f"Socket error: {e}")
        except Exception as e:
            log(f"Unexpected error opening socket: {e}")
        # Only reached on failure: release the half-open socket.
        self._close_quietly(sck)
        return None

    def __enter__(self):
        """
        Enter context: open or reuse socket connection.

        Returns:
            socket: Active socket connection

        Raises:
            OSError: If connection fails
        """
        parent = self.parent
        # Reuse the persistent socket when keep_alive is active and one exists
        if parent.keep_alive and parent._socket:
            parent._log("Reusing persistent socket")
            self.sock = parent._socket
            self.is_reused = True
            return self.sock

        # Otherwise, open a fresh connection
        self.sock = self._sock_open()
        if self.sock is None:
            raise OSError("Failed to connect to device")

        # In keep_alive mode remember the socket for later requests
        if parent.keep_alive:
            parent._socket = self.sock
            parent._log("Socket stored for keep_alive mode")
        self.is_reused = False
        return self.sock

    def __exit__(self, exc_type, exc_val, exc_tb):
        """
        Exit context: close the socket unless it is kept for keep_alive.

        Args:
            exc_type: Exception type (if any)
            exc_val: Exception value (if any)
            exc_tb: Exception traceback (if any)

        Returns:
            bool: Always False, so exceptions from the block propagate.
        """
        parent = self.parent
        if exc_type is not None:
            # After an error the connection state is unknown: close it even
            # in keep_alive mode and forget it so the next call reconnects.
            parent._log(f"Exception in socket context: {exc_val}")
            self._close_quietly(self.sock)
            parent._socket = None
            return False

        if parent.keep_alive:
            parent._log("Socket kept open (keep_alive mode)")
        elif self.sock:
            parent._log("Closing short-lived socket (not in keep_alive mode)")
            self._close_quietly(self.sock)
        return False
class uTuya:
"""
Main class for Tuya device communication over local network.
Handles protocol encryption, packet crafting, and device state management.
Memory-optimized with memoryview for reduced allocations.
The class-level names below are protocol constants shared by all instances.
"""
PORT = const(6668) # Standard Tuya device port (TCP)
HEADER = const(b'\x00\x00\x55\xaa') # Packet header signature (first 4 bytes of every packet)
FOOTER = const(b'\x00\x00\xaa\x55') # Packet footer signature (last 4 bytes of every packet)
CMD_HEARTBEAT = const(0x09) # Keep-alive (heartbeat) command
CMD_QUERY = const(0x0a) # Read device state command
CMD_CONTROL = const(0x07) # Write/control command
# Unix epoch (1970-01-01) vs MicroPython epoch (2000-01-01) offset in seconds
# Used to convert MicroPython time.time() to Unix timestamp for Tuya protocol
EPOCH_DIFF = const(946684800)
def __init__(self, ip, dev, key, version=3.3, debug=False, keep_alive=False):
    """
    Initialize Tuya device connection settings (no network I/O happens here).

    Args:
        ip (str): Device IP address on local network
        dev (str): Tuya device ID (gwId/devId)
        key (str|bytes): AES encryption key for device
        version (float|str): Protocol version (default 3.3), stored as str
        debug (bool): Enable debug logging (default False)
        keep_alive (bool): Maintain persistent socket connection (default False)
    """
    self.ip = ip
    self.did = dev
    # cryptolib needs the key as bytes
    self.key = key.encode() if isinstance(key, str) else key
    self.version = str(version)
    self.sequence = 0  # Packet sequence counter (incremented per request)
    self.debug = debug  # Debug logging flag
    self.keep_alive = keep_alive
    self._socket = None  # Persistent socket for keep_alive mode
    # Not every firmware build ships binascii.crc32; when it is missing,
    # _crc32() must use its pure-Python fallback, so this flag has to be
    # False in that case.
    self._crc32_available = hasattr(binascii, "crc32")
# ── Cryptography ────────────────────────────────────────────────────────
def _crc32(self, data):
    """
    Compute the CRC32 checksum used for packet integrity.

    Delegates to binascii.crc32 when self._crc32_available is set; otherwise
    runs a bitwise pure-Python CRC32 with the reflected polynomial
    0xEDB88320, initial value 0xFFFFFFFF and a final XOR with 0xFFFFFFFF.

    Args:
        data (bytes): Data to calculate checksum for

    Returns:
        int: 32-bit CRC checksum
    """
    if self._crc32_available:
        # Firmware implementation (written in C, faster)
        return binascii.crc32(data)

    checksum = 0xFFFFFFFF
    # Iterating a memoryview walks the buffer without copying it
    for octet in memoryview(data):
        checksum ^= octet
        bit = 0
        while bit < 8:
            carry = checksum & 1
            checksum >>= 1
            if carry:
                checksum ^= 0xEDB88320
            bit += 1
    return checksum ^ 0xFFFFFFFF
def _aes_encrypt(self, data):
    """
    Encrypt data with self.key in ECB mode after applying PKCS7 padding.

    The input is padded with N bytes of value N (1..16) so that its length
    becomes a multiple of the 16-byte AES block size; an already aligned
    input receives a full extra block of padding.

    Args:
        data (str|bytes): Plaintext to encrypt (str is encoded first)

    Returns:
        bytes: Encrypted ciphertext
    """
    plaintext = data.encode() if isinstance(data, str) else data
    fill = 16 - len(plaintext) % 16
    plaintext += bytes([fill]) * fill
    # cryptolib mode 1 = ECB
    cipher = cryptolib.aes(self.key, 1)
    return cipher.encrypt(plaintext)
def _aes_decrypt(self, data):
    """
    Decrypt ECB-encrypted data with self.key and strip PKCS7 padding.

    Args:
        data (bytes): Encrypted ciphertext

    Returns:
        bytes: Decrypted plaintext. Padding is removed only when the last
        byte is a plausible PKCS7 length (1..16 and not longer than the
        data); otherwise the decrypted bytes are returned unchanged.
    """
    # cryptolib mode 1 = ECB
    dec = cryptolib.aes(self.key, 1).decrypt(data)
    if not dec:
        # Nothing to unpad; indexing dec[-1] would raise IndexError
        return dec
    pad = dec[-1]
    # A pad byte of 0 is invalid, and dec[:-0] would yield an empty result,
    # so the lower bound must be checked as well as the upper one.
    if 1 <= pad <= 16 and pad <= len(dec):
        dec = dec[:-pad]
    return dec
# ── Protocol ──────────────────────────────────────────────────────────
def _craft_packet(self, cmd, payload_bytes):
    """
    Build a complete protocol packet around an already encrypted payload.

    Layout produced by this method:
    - HEADER (4 bytes)
    - Sequence (4 bytes, big-endian): self.sequence after incrementing
    - Command (4 bytes, big-endian): command code
    - Length (4 bytes, big-endian): len(payload) + 8, i.e. the payload plus
      the 4-byte CRC and the 4-byte FOOTER that follow the length field
    - Payload (variable)
    - CRC32 (4 bytes, big-endian): checksum over everything before it
    - FOOTER (4 bytes)

    Args:
        cmd (int): Command code (CMD_QUERY, CMD_CONTROL, etc.)
        payload_bytes (bytes): Encrypted payload data (may be empty)

    Returns:
        bytes: Complete packet ready to send
    """
    # Every request gets its own sequence number
    self.sequence += 1
    length = len(payload_bytes) + 8
    fields = struct.pack('>III', self.sequence, cmd, length)
    # The CRC covers header, fixed fields and payload
    frame = self.HEADER + fields + payload_bytes
    checksum = self._crc32(frame) & 0xFFFFFFFF
    packet = frame + struct.pack('>I', checksum) + self.FOOTER
    self._log(f"Crafted packet: seq={self.sequence}, cmd=0x{cmd:02x}, len={length}, total={len(packet)} bytes")
    return packet
def _encrypt_query(self, data, version_incl=False):
    """
    Encrypt a payload and optionally prepend the protocol version header.

    Callers in this class use version_incl=False for CMD_QUERY and
    version_incl=True for CMD_CONTROL. The header is the encoded
    self.version string followed by 12 null bytes, placed in front of the
    ciphertext.

    Args:
        data (str|bytes): JSON payload to encrypt
        version_incl (bool): Whether to prepend version header

    Returns:
        bytes: Encrypted payload (with version prefix if requested)
    """
    self._log(f"Encrypting payload ({len(data)} bytes, version_prefix={version_incl})")
    ciphertext = self._aes_encrypt(data)
    if not version_incl:
        self._log(f"No version prefix: {len(ciphertext)} bytes")
        return ciphertext

    # e.g. b'3.3' + 12 * b'\x00' + ciphertext
    prefixed = self.version.encode() + b'\x00' * 12 + ciphertext
    self._log(f"Added version prefix: total {len(prefixed)} bytes")
    return prefixed
def _decrypt_response(self, resp):
    """
    Validate, decrypt and parse a response packet from the device.

    The payload may be preceded by a 4-byte return code and/or a version
    header (self.version followed by 12 null bytes); both are stripped
    before decryption. memoryview slices are used so that only the final
    ciphertext is copied.

    Args:
        resp (bytes): Raw response packet from device

    Returns:
        dict|None: Parsed JSON data, or None if the packet is malformed,
        cannot be decrypted, or contains no JSON object
    """
    # ── Validation ──
    if not resp or len(resp) < 16:
        self._log("Response too short")
        return None
    mv = memoryview(resp)
    # Slices are converted to bytes before comparing with the signatures
    if bytes(mv[:4]) != self.HEADER or bytes(mv[-4:]) != self.FOOTER:
        self._log("Invalid header/footer in response")
        return None

    # ── Extract Payload ──
    # The length field counts everything after itself: payload + CRC + footer
    length = struct.unpack('>I', bytes(mv[12:16]))[0]
    if length < 8 or 16 + length > len(resp):
        self._log(f"Invalid length field: {length}")
        return None
    pl_view = mv[16:16 + length - 8]

    # ── Strip Prefixes ──
    # Return code: only its two high bytes are tested, so small non-zero
    # codes are stripped as well as 0x00000000.
    if len(pl_view) >= 4 and bytes(pl_view[:2]) == b'\x00\x00':
        pl_view = pl_view[4:]
    # Version header for the configured protocol version + 12 null bytes
    ver = self.version.encode()
    if bytes(pl_view[:len(ver)]) == ver:
        pl_view = pl_view[len(ver) + 12:]

    # ── Decrypt ──
    # Ciphertext must be a non-empty multiple of the AES block size
    if len(pl_view) == 0 or len(pl_view) % 16 != 0:
        self._log(f"Invalid payload length: {len(pl_view)}")
        return None
    payload = bytes(pl_view)  # cryptolib needs bytes
    try:
        dec = self._aes_decrypt(payload)
        # Locate the JSON object; anything before/after it is ignored
        i = dec.find(b'{')
        j = dec.rfind(b'}') + 1
        if i >= 0 and j > i:
            return json.loads(dec[i:j])
    except Exception as e:
        self._log(f"Decryption error: {e}")
    return None
# ── Helpers ────────────────────────────────────────────────────────
def _unix_now(self):
    """
    Get current Unix timestamp (seconds since 1970-01-01).

    Ports whose time.time() counts from 2000-01-01 need EPOCH_DIFF added;
    ports that already use the 1970 epoch must not be shifted. The epoch
    year is detected at runtime via time.gmtime(0).

    Returns:
        int: Current Unix timestamp
    """
    now = time.time()
    if time.gmtime(0)[0] == 2000:
        now += self.EPOCH_DIFF
    return int(now)
def _log(self, msg):
    """
    Emit a debug message prefixed with "[uTuya]" when self.debug is truthy.

    Args:
        msg (str): Message to log
    """
    if not self.debug:
        return
    print(f"[uTuya] {msg}")
def _is_ack(self, raw):
    """
    Check whether a raw response is a well-formed packet with return code 0.

    Args:
        raw (bytes|None): The raw data received from the socket.

    Returns:
        bool: True if the packet has a valid header/footer and its 4-byte
        return code (offset 16) is 0; False otherwise, including for
        None, empty or too-short input.
    """
    # Minimum valid packet size for an ACK is 28 bytes:
    # Header(4) + Seq(4) + Cmd(4) + Len(4) + RetCode(4) + CRC(4) + Footer(4)
    # This guard must run before memoryview(): memoryview(None) raises.
    if not raw or len(raw) < 28:
        self._log("ACK Check: Response too short or empty")
        return False

    mv = memoryview(raw)
    # 1. Verify packet framing (header and footer)
    if bytes(mv[:4]) != self.HEADER or bytes(mv[-4:]) != self.FOOTER:
        self._log("ACK Check: Invalid Header or Footer")
        return False

    try:
        # 2. The return code is the first 4 bytes after the 16-byte prefix
        ret_code = struct.unpack('>I', bytes(mv[16:20]))[0]
    except Exception as e:
        self._log(f"ACK Check: Parsing error: {e}")
        return False

    if ret_code == 0:
        self._log("ACK Check: Success (Return Code 0)")
        return True
    self._log(f"ACK Check: Device returned error code {ret_code}")
    return False
# ── Public API ────────────────────────────────────────────────────────
def get_data_raw(self):
    """
    Query device for current state (all DPS values).

    Sends a CMD_QUERY packet whose JSON payload carries the device ID and a
    timestamp. Query payloads are encrypted WITHOUT the version prefix.

    Returns:
        dict|None: Response containing 'dps' dict with all data points,
        or None on error

    Example response:
        {
            'dps': {
                '1': True,    # Switch state
                '18': 2340,   # Current (mA)
                '19': 523,    # Power (W*10)
                '20': 2301    # Voltage (V*10)
            }
        }
    """
    t = str(self._unix_now())
    payload = ('{"gwId":"%s","devId":"%s","uid":"%s","t":"%s"}'
               % (self.did, self.did, self.did, t))
    # Encrypt WITHOUT version prefix (critical for query packets)
    enc = self._encrypt_query(payload, version_incl=False)
    pkt = self._craft_packet(self.CMD_QUERY, enc)
    self._log(f"Sending QUERY command (seq={self.sequence})")
    try:
        # The context manager closes (or keeps, in keep_alive mode) the socket
        with _TuyaSocket(self) as s:
            if self.debug:
                # Hex dumps are built only when they will actually be printed
                self._log(f"Request ({len(pkt)} bytes): {pkt.hex()}")
            s.send(pkt)
            raw = s.recv(512)  # Max response size
            if not raw:
                # Empty read means the peer closed the connection. Raising
                # inside the with-block makes the context manager discard a
                # dead persistent socket instead of caching it.
                raise OSError("connection closed by device")
            if self.debug:
                self._log(f"Response ({len(raw)} bytes): {raw.hex()}")
            return self._decrypt_response(raw)
    except OSError as e:
        print(f"Connection error in get_data_raw: {e}")
        return None
    except Exception as e:
        print(f'get_data_raw error: {e}')
        return None
def get_dps(self, key=None):
    """
    Fetch DPS (Data Point) values via get_data_raw().

    Args:
        key (int|str|None): Specific DPS key to retrieve, or None for all.
            The key is converted to str before the lookup.

    Returns:
        dict: All DPS values if key is None
        any: The value for the requested key
        None: If the query failed/returned nothing, or the key is absent

    Example:
        device.get_dps()      # {'1': True, '18': 2340, '19': 523, '20': 2301}
        device.get_dps(1)     # True
        device.get_dps('20')  # 2301
    """
    response = self.get_data_raw()
    if not response:
        return None
    points = response.get('dps', {})
    if key is None:
        return points
    return points.get(str(key))
def set_dps(self, key, value):
    """
    Set a specific DPS (Data Point) value on device.

    Generic setter for any DPS key/value pair. Useful for advanced control
    beyond simple switch on/off (e.g., brightness, color, timer, etc.)

    Value encoding:
    - Non-string values are serialized with json.dumps (True -> true).
    - A string whose lowercase form is already a valid JSON literal
      ("True", "false", "42", '"x"') is sent as that lowercase literal.
    - Any other string is sent as a JSON string, so "white" becomes
      "white" (quoted) rather than an invalid bare token.

    Note: Some devices send TWO responses:
    1. ACK packet (command accepted)
    2. Status packet (updated DPS values)
    Both may arrive in a single read; that case is handled.

    Args:
        key (int|str): DPS key to set
        value (any): Value to set (bool, int, str, etc.)

    Returns:
        dict|None: Response from device or None on error

    Example:
        device.set_dps(1, True)      # Turn switch on
        device.set_dps(2, 50)        # Set brightness to 50%
        device.set_dps(3, "white")   # Set color mode
    """
    t = str(self._unix_now())
    if isinstance(value, str):
        val_str = value.lower()
        try:
            json.loads(val_str)
        except ValueError:
            # Not a JSON literal: quote/escape it as a JSON string
            val_str = json.dumps(value)
    else:
        val_str = json.dumps(value)
    payload = ('{"gwId":"%s","devId":"%s","uid":"%s","t":"%s","dps":{"%s":%s}}'
               % (self.did, self.did, self.did, t, key, val_str))
    # Control commands REQUIRE version prefix (critical!)
    enc = self._encrypt_query(payload, version_incl=True)
    pkt = self._craft_packet(self.CMD_CONTROL, enc)
    self._log(f"Sending CONTROL (seq={self.sequence}): DPS {key}={value}")
    try:
        # The context manager closes (or keeps, in keep_alive mode) the socket
        with _TuyaSocket(self) as s:
            if self.debug:
                self._log(f"Request ({len(pkt)} bytes): {pkt.hex()}")
            s.send(pkt)
            # First response: the ACK is expected
            raw_ack = s.recv(512)
            if not raw_ack:
                # Peer closed the connection; raising lets the context
                # manager drop a dead persistent socket.
                raise OSError("connection closed by device")
            if self.debug:
                self._log(f"ACK response ({len(raw_ack)} bytes): {raw_ack.hex()}")
            if not self._is_ack(raw_ack):
                self._log("Command failed or timed out.")
                return None
            self._log("Command accepted by device.")
            # The status packet can share a TCP read with the ACK. The
            # first packet spans 16 prefix bytes plus its length field.
            first_len = 16 + struct.unpack('>I', raw_ack[12:16])[0]
            raw = raw_ack[first_len:]
            if not raw:
                # Status update arrives separately
                raw = s.recv(512)
            if self.debug:
                self._log(f"Response ({len(raw)} bytes): {raw.hex()}")
            return self._decrypt_response(raw)
    except OSError as e:
        print(f"Connection error in set_dps: {e}")
        return None
    except Exception as e:
        print(f'set_dps error: {e}')
        return None
def heartbeat(self):
    """
    Send a heartbeat packet (empty payload) and wait for any reply.

    Some devices may drop idle connections. Send periodic heartbeats
    to maintain persistent connections.

    Returns:
        bool: True if the device sent a non-empty reply; False on any
        connection error or if the device closed the connection
    """
    # Heartbeat has empty payload
    pkt = self._craft_packet(self.CMD_HEARTBEAT, b'')
    self._log(f"Sending HEARTBEAT (seq={self.sequence})")
    try:
        # The context manager closes (or keeps, in keep_alive mode) the socket
        with _TuyaSocket(self) as s:
            if self.debug:
                self._log(f"Heatbeat request ({len(pkt)} bytes): {pkt.hex()}")
            s.send(pkt)
            raw = s.recv(512)
            if not raw:
                # recv() returns b'' (never None) when the peer has closed
                # the connection, so "raw is not None" cannot detect it.
                # Raising also makes the context manager discard a dead
                # persistent socket.
                raise OSError("connection closed by device")
            if self.debug:
                self._log(f"Heartbeat ACK response ({len(raw)} bytes): {raw.hex()}")
            return True
    except OSError as e:
        print(f"Connection error in heartbeat: {e}")
        return False
    except Exception as e:
        print(f'heartbeat error: {e}')
        return False
def turn_on(self):
    """Switch the device ON by writing True to DPS 1; returns set_dps()'s result."""
    desired_state = True
    return self.set_dps(1, desired_state)
def turn_off(self):
    """Switch the device OFF by writing False to DPS 1; returns set_dps()'s result."""
    desired_state = False
    return self.set_dps(1, desired_state)
# ── Convenience Properties ────────────────────────────────────────────
# NOTE: Properties like voltage() call self.get_dps(), which calls self.get_data_raw().
# If you check voltage, current, and power sequentially, you are performing
# three separate network requests. For efficiency, call get_dps() once and
# calculate all values from the returned dict.
@property
def voltage(self):
    """
    Voltage in Volts, read from DPS '20' and divided by 10.

    Each access performs a device query through get_dps().

    Returns:
        float|None: Voltage in V (e.g., 230.1); 0.0 if DPS '20' is missing
        from a non-empty reply; None when no DPS data was obtained
    """
    dps = self.get_dps()
    if not dps:
        return None
    return dps.get('20', 0) / 10
@property
def current(self):
    """
    Current draw in Amperes, read from DPS '18' and divided by 1000.

    Each access performs a device query through get_dps().

    Returns:
        float|None: Current in A (e.g., 2.34); 0.0 if DPS '18' is missing
        from a non-empty reply; None when no DPS data was obtained
    """
    dps = self.get_dps()
    if not dps:
        return None
    return dps.get('18', 0) / 1000
@property
def power(self):
    """
    Power consumption in Watts, read from DPS '19' and divided by 10.

    Each access performs a device query through get_dps().

    Returns:
        float|None: Power in W (e.g., 52.3); 0.0 if DPS '19' is missing
        from a non-empty reply; None when no DPS data was obtained
    """
    dps = self.get_dps()
    if not dps:
        return None
    return dps.get('19', 0) / 10
@property
def is_on(self):
    """
    Switch state as reported by DPS '1'.

    Each access performs a device query through get_dps().

    Returns:
        bool|None: Value of DPS '1' (False if that key is missing from a
        non-empty reply); None when no DPS data was obtained
    """
    dps = self.get_dps()
    if not dps:
        return None
    return dps.get('1', False)
# ── Utility Methods ────────────────────────────────────────────────
def print_status(self):
    """
    Print formatted device status to console.

    Queries the device once through get_dps() and prints switch state,
    voltage, current, power and the raw DPS dict. When no data could be
    obtained (get_dps() returned None or an empty dict) a short notice is
    printed instead of raising.
    """
    dps = self.get_dps()
    rule = '-' * 40
    print(rule)
    print('Device Status:')
    print(rule)
    if not dps:
        # get_dps() returns None on connection/decryption failure; calling
        # .get() on it would raise AttributeError.
        print(' No data received from device')
        print(rule)
        return
    print(f' Switch : {dps.get("1", "N/A")}')
    print(f' Voltage: {dps.get("20", 0) / 10:.1f} V')
    print(f' Current: {dps.get("18", 0) / 1000:.3f} A')
    print(f' Power : {dps.get("19", 0) / 10:.1f} W')
    print(rule)
    print(f' Raw DPS: {dps}')
    print(rule)