start

Hello

Welcome to my corner on the web! My name is Ignas Bukys and I am excited to share my thoughts, experiences, and insights with you.
On this blog, you will find a diverse range of topics including technology and programming. All my projects (ongoing & finished) are listed here.
Whether you are here for personal growth, to learn something new, or simply to be entertained, I hope you find value in what I have to share. Thank you for stopping by and I hope you enjoy your visit.

—- Visit blog archive

Latest Blog messages

Controlling Tuya Smart Plugs Locally with MicroPython on ESP32

No cloud. No dependencies. Pure local LAN control.

I spent a few days reverse-engineering the Tuya local LAN protocol to get my AUBESS Smart Socket 20A plugs talking to an ESP32 running MicroPython — without relying on the cloud, without any third-party MicroPython libraries. This post documents everything I found, including the quirks that cost me the most time.

Tuya's cloud API is rate-limited, requires internet, and adds ~500ms latency. For home automation on a microcontroller you want local LAN control: direct TCP on port 6668, sub-10ms response and solution that works offline.

There are Python libraries for this on desktop (tinytuya), but nothing clean for MicroPython. This post fills that gap.

  • ESP32 board running MicroPython
  • Tuya-based smart plug (tested: AUBESS Smart Socket 20A)
  • Your plug's Device ID and Local Key (see below)
  • The plug's local IP address

I assume, you already have Tuya app installed on your phone :) Its the only way to get device's Local Key- developer account is needed at Tuya platform. Free trial account is totally suitable, no need to pay.

You need the Device ID (20 chars) and Local Key (16 chars) from Tuya's developer portal.

On your PC:

pip install tinytuya
python -m tinytuya wizard

This walks you through linking your Tuya developer account and dumps all device credentials to devices.json and other JSON files. Dont forget to check those.

⚠️ The Local Key changes every time you remove and re-add a device in the Smart Life app. Don't do that after grabbing the key.

Tuya v3.3 uses plain TCP on port 6668 with AES-128-ECB encrypted JSON payloads. Each frame looks like:

[HEADER 4B][SEQUENCE 4B][COMMAND 4B][LENGHt 4B][AES'ed PAYLOAD][FOOTER 4B]

  • Header: 00 00 55 AA
  • Sequence: 00 00 00 01
  • Command: 00 00 00 0a
  • Lenght (hex): 00 00 00 78
  • Payload:
  • Footer: 00 00 AA 55

Lenght is measured PAYLOAD + FOOTER

All my AUBESS plugs are query-style - after connecting to socket Plug waits. You must send a status query packet first.

Standard Tuya v3.3 documentation says the encrypted payload should be prefixed with 3.3\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00 (15 bytes). This makes packets 151 bytes. It is required when sending set_switch command

After capturing tinytuya traffic I discovered my plugs want no prefix for the status query — 136-byte packets only. The prefix is only used for control packets (turning on/off).

This single detail isn't documented anywhere I could find.

Packet type Version prefix Packet size
Status query (cmd 0x0a) ❌ No prefix 136 bytes
Heart-beat (cmd 0x09) ❌ No prefix 136 bytes
Control (cmd 0x07) ✅ With prefix 151 bytes

MicroPython's utime.time() epoch starts at 2000-01-01, not 1970-01-01 like Unix.

Tuya validates the t field in the payload and rejects queries where the timestamp is too far from real Unix time.

Fix this by syncing NTP on boot and adding the epoch offset:

boot.py
import network, utime, ntptime
 
sta = network.WLAN(network.STA_IF)
sta.active(True)
sta.connect('YOUR_SSID', 'YOUR_PASSWORD')
while not sta.isconnected():
    utime.sleep_ms(300)
 
ntptime.settime()  # sync to real UTC
# In your main script
EPOCH_OFFSET = 946684800  # seconds between 1970 and 2000
 
def unix_now():
    return utime.time() + EPOCH_OFFSET

The plug returns a dps dictionary. For AUBESS Smart Socket 20A:

Key Unit Description
1 bool Switch state
9 Countdown timer
18 mA Current
19 W × 10 Power (divide by 10)
20 V × 10 Voltage (divide by 10)
21 Power-on mode
22–24 Calibration values
25 mA Leakage current threshold

Mapping could be found in files, generated by tinytuya wizard

  • ECONNRESET immediately — plug rejects the packet content. Check key order in JSON and version prefix.
  • ETIMEDOUT — plug ignores the packet entirely. Check timestamp (NTP sync), and whether you're using prefix or no-prefix for the right packet type.
  • data format error in decrypted response — key is correct but payload format is wrong. Check JSON structure.
  • Only one connection at a time — close the Smart Life app on your phone before connecting from ESP32.

This monkey-patch script helped a lot. Using it i was able to prepare Microptyhon library for Tuya devices

capture.py
# Run this on PC: python capture.py
import tinytuya
import socket
import threading
import time
 
# Pinguin
DEVICE_IP = '192.168.1.55'
DEVICE_ID = '1<hidden>9'
LOCAL_KEY  = '5<hidden>b'
 
# Monkey-patch socket to capture raw bytes
_orig_connect = socket.socket.connect
_orig_send    = socket.socket.send
_orig_recv    = socket.socket.recv
_orig_sendall = socket.socket.sendall
 
def patched_connect(self, addr):
    print('[SOCKET] connect to', addr)
    return _orig_connect(self, addr)
 
def patched_send(self, data, *args):
    print('[SOCKET] send %d bytes:' % len(data),
          ' '.join('%02x' % b for b in data))
    return _orig_send(self, data, *args)
 
def patched_recv(self, size, *args):
    data = _orig_recv(self, size, *args)
    print('[SOCKET] recv %d bytes:' % len(data),
          ' '.join('%02x' % b for b in data))
    return data
 
def patched_sendall(self, data, *args):
    print('[SOCKET] sendall %d bytes:' % len(data), 
          ' '.join('%02x' % b for b in data))
    return _orig_sendall(self, data, *args)
 
socket.socket.connect = patched_connect
socket.socket.send    = patched_send
socket.socket.recv    = patched_recv
socket.socket.sendall = patched_sendall
 
d = tinytuya.OutletDevice(
    dev_id=DEVICE_ID,
    address=DEVICE_IP,
    local_key=LOCAL_KEY,
    version=3.3
)
d.set_socketTimeout(10)
data = d.status()
print('STATUS:', data)
  • Sending heartbeat (0x09) before the status query — some plugs need it, mine didn't
  • utime.time() without NTP sync — plug rejects stale timestamps
  • Version prefix on status query — adds 15 bytes that this plug firmware rejects

This was tested on AUBESS Smart Socket 20A running Tuya firmware v3.3. Other Tuya devices may behave differently — some push status on connect without needing a query, some require a heartbeat first. The packet capture approach (monkey-patching Python's socket in tinytuya) is the most reliable way to debug unknown devices.

All code runs on standard MicroPython ESP32 builds with no additional packages. The only modules used are socket, time, struct, cryptolib, binascii, errno and json— all built-in.

uTuya.py
"""
uTuya - MicroPython Library for Local Tuya Device Control
==========================================================
 
A lightweight MicroPython library for controlling Tuya-compatible smart devices
(smart plugs, switches, lights, etc.) over local network without cloud dependency.
 
Author: Ignas Bukys
ignas.bukys [] gmail [] com
License: MIT
Version: 1.0.0 @ 2026-05-13
 
Features:
---------
- Local network communication (no cloud required)
- AES encryption/decryption for secure device communication
- Support for Tuya protocol v3.3
- Energy monitoring (voltage, current, power) for compatible devices
- DPS (Data Point) read/write operations
- Heartbeat support to keep connections alive
- Configurable debug logging
- Optional connection pooling for frequent queries
- Memory-optimized with memoryview to reduce allocations
 
Hardware Requirements:
---------------------
- ESP32, ESP8266, or other MicroPython-compatible microcontroller
- Tuya-compatible smart device on the same local network
 
Dependencies:
------------
- MicroPython standard library (socket, time, struct, json)
- cryptolib (MicroPython AES encryption module)
 
Usage Example:
-------------
    from tuya import uTuya
 
    # Initialize device connection
    device = uTuya(
        ip='192.168.1.100',
        dev='your_device_id',
        key='your_local_key',
        *version=3.3,
        *debug=True
    )
    * Optional
 
    # Turn switch ON
    device.turn_on()
    device.set_dps(1, True)
 
    # Turn switch OFF
    device.turn_off()
    device.set_dps(1, False)
 
 
    # Get energy monitoring data
    print(f"Voltage: {device.voltage} V")
    print(f"Current: {device.current} A")
    print(f"Power: {device.power} W")
 
    # Check switch state
    if device.is_on:
        print("Device is ON")
 
    # Get all DPS values
    dps = device.get_dps()
    print(dps)
 
    # Get specific DPS value
    switch_state = device.get_dps(1)
 
    # Set custom DPS value
    device.set_dps(1, True)  # Turn on
 
    # Print formatted status
    device.print_status()
 
    # Send heartbeat
    device.heartbeat()
 
    # Use keep_alive mode for multiple operations (automatic cleanup)
    device_persistent = uTuya(
        ip='192.168.1.100',
        dev='your_device_id',
        key='your_local_key',
        keep_alive=True  # Reuse socket connection
    )
 
    # Multiple operations reuse the same socket
    device_persistent.set_switch(True)
    print(device_persistent.voltage)
    device_persistent.heartbeat()
 
    # Close persistent connection when done
    device_persistent.close()
 
How to Get Device Credentials:
------------------------------
1. Device ID and Local Key: Read on http://bukys.eu blog post how to get Extract using tool tinytuya
2. IP Address: Check your router's DHCP table or use network scanner
 
Protocol Notes:
--------------
- Query packets (CMD_QUERY): NO version prefix in payload encryption
- Control packets (CMD_CONTROL): REQUIRES version prefix (e.g., '3.3')
- All packets use: HEADER + BODY + CRC32 + FOOTER structure
- Responses are AES-encrypted and may include version prefix
 
Memory Optimization Notes:
--------------------------
- memoryview is used to avoid unnecessary data copying
- Slicing memoryview creates a view, not a copy (zero-copy operations)
- Must convert memoryview to bytes when passing to functions that need bytes
- Pre-allocated buffers for CRC32 calculation reduce GC pressure
- In-place operations where possible to minimize allocations
 
MIT License:
-----------
Copyright (c) 2025 Ignas Bukys
 
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
 
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
 
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
"""
 
import socket, errno, time, binascii, struct
import cryptolib               # pyright: ignore[reportMissingImports]
import json
from micropython import const   # pyright: ignore[reportMissingImports]
 
# ── Socket Management ────────────────────────────────────────────────
class _TuyaSocket:
    """
    Context manager for safe socket handling with automatic cleanup.
    Now handles the opening, connecting, and logic for persistent sockets.
    """
 
    def __init__(self, parent):
        """
        Initialize socket context manager.
 
        Args:
            parent (uTuya): Parent uTuya instance that provides IP, PORT, 
                           and stores the persistent socket state.
        """
        self.parent = parent
        self.sock = None
        self.is_reused = False
 
 
    def _sock_open(self):
        """
        Private helper to create and connect a new TCP socket.
        Returns socket object or None on failure.
        """
        self.parent._log("Creating new socket object")
        sck = socket.socket()
        sck.settimeout(3)  # Standard timeout for Tuya handshake
 
        try:
            self.parent._log(f"Connecting to {self.parent.ip}:{self.parent.PORT}")
            sck.connect((self.parent.ip, self.parent.PORT))
            self.parent._log("Connected successfully")
            return sck
 
        except socket.error as e:
            # Handle common network errors with descriptive logs
            if e.errno == errno.EHOSTUNREACH:
                self.parent._log(f"Error: Host {self.parent.ip} unreachable.")
            elif e.errno == errno.ECONNREFUSED:
                self.parent._log(f"Error: Connection refused on port {self.parent.PORT}.")
            elif e.errno == errno.ECONNRESET:
                self.parent._log(f"Error: Connection reset by peer.")
            else:
                self.parent._log(f"Socket error: {e}")
            return None
        except Exception as e:
            self.parent._log(f"Unexpected error opening socket: {e}")
            return None
 
    def __enter__(self):
        """
        Enter context: open or reuse socket connection.
 
        Returns:
            socket: Active socket connection
 
        Raises:
            OSError: If connection fails
        """
        # 1. Try to reuse a persistent socket if keep_alive is active
        if self.parent.keep_alive and self.parent._socket:
            self.parent._log("Reusing persistent socket")
            self.sock = self.parent._socket
            self.is_reused = True
            return self.sock
 
        # Otherwise, open a fresh connection
        self.sock = self._sock_open()
 
        if self.sock is None:
            raise OSError("Failed to connect to device")
 
        # If keep_alive is enabled, store this socket for future reuse
        if self.parent.keep_alive:
            self.parent._socket = self.sock
            self.parent._log("Socket stored for keep_alive mode")
 
        self.is_reused = False
        return self.sock
 
    def __exit__(self, exc_type, exc_val, exc_tb):
        """
        Exit context: close socket only if NOT in keep_alive mode or if error occurred.
 
        Args:
            exc_type: Exception type (if any)
            exc_val: Exception value (if any)
            exc_tb: Exception traceback (if any)
        """
        # If an exception occurred, close and clear the socket even in keep_alive mode
        if exc_type is not None:
            self.parent._log(f"Exception in socket context: {exc_val}")
            if self.sock:
                try: self.sock.close()
                except: pass
 
            # Clear the persistent socket reference so next call opens fresh
            self.parent._socket = None
            return False
 
        # If NOT in keep_alive mode, close the socket now
        if not self.parent.keep_alive and self.sock:
            self.parent._log("Closing short-lived socket (not in keep_alive mode)")
            try:
                self.sock.close()
            except:
                pass
        else:
            self.parent._log("Socket kept open (keep_alive mode)")
 
        return False
 
class uTuya:
    """
    Main class for Tuya device communication over local network.
 
    Handles protocol encryption, packet crafting, and device state management.
    Memory-optimized with memoryview for reduced allocations.
    """
 
    PORT = const(6668)                      # Standard Tuya device port
    HEADER = const(b'\x00\x00\x55\xaa')     # Packet header signature
    FOOTER = const(b'\x00\x00\xaa\x55')     # Packet footer signature
    CMD_HEARTBEAT = const(0x09)             # Keep-alive command as heartbeay
    CMD_QUERY = const(0x0a)                 # Read device state command
    CMD_CONTROL = const(0x07)               # Write/control command
 
    # Unix epoch (1970-01-01) vs MicroPython epoch (2000-01-01) offset in seconds
    # Used to convert MicroPython time.time() to Unix timestamp for Tuya protocol
    EPOCH_DIFF = const(946684800)
 
    def __init__(self, ip, dev, key, version=3.3, debug=False, keep_alive=False):
        """
        Initialize Tuya device connection.
 
        Args:
            ip (str): Device IP address on local network
            dev (str): Tuya device ID (gwId/devId)
            key (str|bytes): AES encryption key for device
            version (float|str): Protocol version (default '3.3')
            debug (bool): Enable debug logging (default False)
            keep_alive (bool): Maintain persistent socket connection (default False)
        """
        self.ip = ip
        self.did = dev
        # Ensure local_key is bytes for cryptolib
        self.key = key.encode() if isinstance(key, str) else key
        self.version = str(version)
        self.sequence = 0          # Packet sequence counter (incremented per request)
        self.debug = debug         # Debug logging flag
        self.keep_alive = keep_alive
        self._socket = None        # Persistent socket for keep_alive mode
 
        try:
            _ = binascii.crc32
            self._crc32_available = True
        except (AttributeError, ImportError):
            self._crc32_available = True
 
    # ── Cryptography ────────────────────────────────────────────────────────
    def _crc32(self, data):
        """
        Calculate CRC32 checksum for packet integrity verification.
 
        Memory-optimized: Uses memoryview to avoid copying data when iterating.
 
        Uses polynomial 0xEDB88320 (reversed 0x04C11DB7) as per Tuya protocol.
        This is the same CRC32 algorithm used in ZIP, Ethernet, PNG, etc.
 
        Args:
            data (bytes): Data to calculate checksum for
 
        Returns:
            int: 32-bit CRC checksum
        """
        # use the firmware's built-in CRC32 (written in C- faster)
        if self._crc32_available:
            return binascii.crc32(data)
 
        # use pure python implementation
        crc = 0xFFFFFFFF
 
        # Use memoryview to iterate without copying - saves memory
        mv = memoryview(data)
        for byte in mv:
            crc ^= byte
            for _ in range(8):
                if crc & 1:
                    crc = (crc >> 1) ^ 0xEDB88320
                else:
                    crc >>= 1
 
        return crc ^ 0xFFFFFFFF
 
 
    def _aes_encrypt(self, data):
        """
        Encrypt data using AES-128-ECB with PKCS7 padding.
 
        Tuya uses ECB mode (not CBC) for simplicity, though less secure.
        PKCS7 padding ensures data length is multiple of 16 bytes.
 
        Args:
            data (str|bytes): Plaintext to encrypt
 
        Returns:
            bytes: Encrypted ciphertext
        """
        if isinstance(data, str):
            data = data.encode()
 
        # PKCS7 padding: add N bytes of value N to reach 16-byte boundary
        pad = 16 - (len(data) % 16)
        data += bytes([pad] * pad)
 
        # cryptolib mode 1 = ECB encryption
        return cryptolib.aes(self.key, 1).encrypt(data)
 
 
    def _aes_decrypt(self, data):
        """
        Decrypt AES-128-ECB encrypted data and remove PKCS7 padding.
 
        Args:
            data (bytes): Encrypted ciphertext
 
        Returns:
            bytes: Decrypted plaintext with padding removed
        """
        dec = cryptolib.aes(self.key, 1).decrypt(data)
 
        # Remove PKCS7 padding (last byte indicates pad length)
        pad = dec[-1]
        if pad <= 16:  # Valid padding must be 1-16
            dec = dec[:-pad]
        return dec
 
    # ── Protocol ──────────────────────────────────────────────────────────
    def _craft_packet(self, cmd, payload_bytes):
        """
        Construct a complete Tuya protocol packet.
 
        Packet structure:
        - HEADER (4 bytes): 0x000055aa
        - Sequence (4 bytes): Incrementing packet counter
        - Command (4 bytes): Command code (0x07, 0x09, 0x0a, etc.)
        - Length (4 bytes): Payload length + 8 (includes return code + payload)
        - Payload (variable): Encrypted data
        - CRC32 (4 bytes): Checksum of HEADER + body
        - FOOTER (4 bytes): 0x0000aa55
 
        Args:
            cmd (int): Command code (CMD_QUERY, CMD_CONTROL, etc.)
            payload_bytes (bytes): Encrypted payload data
 
        Returns:
            bytes: Complete packet ready to send
        """
        self.sequence += 1  # Increment for each new request
 
        # Length includes 8-byte overhead (4-byte return code + 4-byte CRC in some contexts)
        length = len(payload_bytes) + 8
 
        # Body: sequence + command + length + payload
        body = struct.pack('>III', self.sequence, cmd, length) + payload_bytes
 
        # Calculate CRC over header + body
        crc = self._crc32(self.HEADER + body) & 0xFFFFFFFF
        packet = self.HEADER + body + struct.pack('>I', crc) + self.FOOTER
 
        self._log(f"Crafted packet: seq={self.sequence}, cmd=0x{cmd:02x}, len={length}, total={len(packet)} bytes")
        return packet
 
 
    def _encrypt_query(self, data, version_incl=False):
        """
        Encrypt payload with optional version prefix.
 
        CRITICAL PROTOCOL DIFFERENCE:
        - Query packets (CMD_QUERY): NO version prefix (version_incl=False)
        - Control packets (CMD_CONTROL): REQUIRE version prefix (version_incl=True)
 
        Version prefix format: '3.3' + 12 null bytes + encrypted_data
 
        Args:
            data (str|bytes): JSON payload to encrypt
            version_incl (bool): Whether to prepend version header
 
        Returns:
            bytes: Encrypted payload (with version prefix if requested)
        """
        self._log(f"Encrypting payload ({len(data)} bytes, version_prefix={version_incl})")
 
        enc = self._aes_encrypt(data)
 
        if version_incl:
            # Format: '3.3\x00\x00...\x00' (version string + 12 nulls) + encrypted
            # result = b'{%s}' % self.version.encode() + b'\x00' * 12 + enc
            result = b'%s' % self.version.encode() + b'\x00' * 12 + enc
            self._log(f"Added version prefix: total {len(result)} bytes")
            return result
 
        self._log(f"No version prefix: {len(enc)} bytes")
        return enc
 
 
    def _decrypt_response(self, resp):
        """
        Decrypt and parse device response packet.
 
        MEMORY-OPTIMIZED: Uses memoryview to slice without copying data.
 
        Response may have various prefixes before the actual JSON:
        - 4-byte null prefix (0x00000000)
        - Version prefix ('3.3' + 12 nulls)
 
        Args:
            resp (bytes): Raw response packet from device
 
        Returns:
            dict|None: Parsed JSON data or None if decryption failed
        """
        # ── Validation ──
        if not resp or len(resp) < 16:
            self._log("Response too short")
            return None
 
        # Create memoryview for zero-copy slicing
        mv = memoryview(resp)
 
        # Verify packet structure (header at start, footer at end)
        # Must convert memoryview slices to bytes for comparison
        if bytes(mv[:4]) != self.HEADER or bytes(mv[-4:]) != self.FOOTER:
            self._log("Invalid header/footer in response")
            return None
 
        # ── Extract Payload ──
        # Length field at offset 12 includes 8-byte overhead
        length = struct.unpack('>I', mv[12:16])[0]
 
        # Extract payload using memoryview (zero-copy)
        pl_view = mv[16:16 + length - 8]
 
        # ── Strip Prefixes ──
        # Some responses have 4-byte null prefix (0x00000000)
        if len(pl_view) >= 4 and bytes(pl_view[:2]) == b'\x00\x00':
            pl_view = pl_view[4:]
 
        # Strip version prefix if present ('3.3' + 12 nulls = 15 bytes)
        if pl_view[:3] == b'3.3':
            pl_view = pl_view[15:]
 
        # ── Decrypt ──
        # Payload must be multiple of 16 (AES block size)
        if not pl_view or len(pl_view) % 16 != 0:
            self._log(f"Invalid payload length: {len(pl_view)}")
            return None
 
        # Convert memoryview to bytes for decryption (cryptolib needs bytes)
        payload = bytes(pl_view)
        try:
            dec = self._aes_decrypt(payload)
 
            # Find JSON boundaries in decrypted data
            # (sometimes has garbage before/after the JSON)
            i = dec.find(b'{')
            j = dec.rfind(b'}') + 1
 
            if i >= 0 and j > i:
                return json.loads(dec[i:j])
        except Exception as e:
            self._log(f"Decryption error: {e}")
 
        return None
 
    # ── Helpers ────────────────────────────────────────────────────────
    def _unix_now(self):
        """
        Get current Unix timestamp (seconds since 1970-01-01).
 
        MicroPython's time.time() returns seconds since 2000-01-01,
        so we add EPOCH_DIFF to convert to Unix time.
 
        Returns:
            int: Current Unix timestamp
        """
        return int(time.time() + self.EPOCH_DIFF)
 
    def _log(self, msg):
        """
        Print debug log message if debug mode enabled.
 
        Args:
            msg (str): Message to log
        """
        if self.debug:
            print(f"[uTuya] {msg}")
 
    def _is_ack(self, raw):
        """
        Checks if a raw response packet is a successful ACK from the device.
 
        Args:
            raw (bytes): The raw data received from the socket.
 
        Returns:
            bool: True if this appears to be a valid ACK
        """
 
        # Use memoryview for zero-copy header check
        mv = memoryview(raw)
 
        # Minimum valid packet size for an ACK is 28 bytes:
        # Header(4) + Seq(4) + Cmd(4) + Len(4) + RetCode(4) + CRC(4) + Footer(4)
        if not raw or len(raw) < 28:
            self._log("ACK Check: Response too short or empty")
            return False
 
        # 1. Verify Packet Integrity (Header and Footer)
        if bytes(mv[:4]) != self.HEADER or bytes(mv[-4:]) != self.FOOTER:
            self._log("ACK Check: Invalid Header or Footer")
            return False
 
        try:
            # 2. Extract Return Code
            # In the Tuya protocol, the Return Code is the first 4 bytes of the 
            # payload area, which starts at index 16.
            ret_code = struct.unpack('>I', bytes(mv[16:20]))[0]
 
            if ret_code == 0:
                self._log("ACK Check: Success (Return Code 0)")
                return True
            else:
                self._log(f"ACK Check: Device returned error code {ret_code}")
                return False
 
        except Exception as e:
            self._log(f"ACK Check: Parsing error: {e}")
            return False
 
    # ── Public API ────────────────────────────────────────────────────────
    def get_data_raw(self):
        """
        Query device for current state (all DPS values).
 
        Sends CMD_QUERY command with device ID and timestamp.
        Does NOT include version prefix for query packets.
 
        Returns:
            dict|None: Response containing 'dps' dict with all data points, or None on error
 
        Example response:
            {
                'dps': {
                    '1': True,      # Switch state
                    '18': 2340,     # Current (mA)
                    '19': 523,      # Power (W*10)
                    '20': 2301      # Voltage (V*10)
                }
            }
        """
        # Construct JSON payload with device ID and timestamp
        t = str(self._unix_now())
        payload = ('{"gwId":"%s","devId":"%s","uid":"%s","t":"%s"}'
                   % (self.did, self.did, self.did, t))
 
        # Encrypt WITHOUT version prefix (critical for query packets)
        enc = self._encrypt_query(payload, version_incl=False)
        pkt = self._craft_packet(self.CMD_QUERY, enc)
 
        self._log(f"Sending QUERY command (seq={self.sequence})")
 
        try:
            # Context manager ensures socket is always closed properly
            with _TuyaSocket(self) as s:
                self._log(f"Request ({len(pkt)} bytes): {pkt.hex()}")
                s.send(pkt)
                raw = s.recv(512)  # Max response size
                self._log(f"Response ({len(raw)} bytes): {raw.hex()}")
                return self._decrypt_response(raw)
 
        except OSError as e:
            print(f"Connection error in get_data_raw: {e}")
            return None
        except Exception as e:
            print(f'get_data_raw error: {e}')
            return None
 
 
    def get_dps(self, key=None):
        """
        Get DPS (Data Point) values from device.
 
        Args:
            key (int|str|None): Specific DPS key to retrieve, or None for all
 
        Returns:
            dict: All DPS values if key=None
            any: Specific DPS value if key provided
            None: On error or if key not found
 
        Example:
            device.get_dps()      # {'1': True, '18': 2340, '19': 523, '20': 2301}
            device.get_dps(1)     # True
            device.get_dps('20')  # 2301
        """
        data = self.get_data_raw()
        if not data:
            return None
 
        dps = data.get('dps', {})
 
        # Return specific key if requested, otherwise return all DPS
        return dps.get(str(key)) if key is not None else dps
 
 
    def set_dps(self, key, value):
        """
        Set a specific DPS (Data Point) value on device.
 
        Generic setter for any DPS key/value pair. Useful for advanced control
        beyond simple switch on/off (e.g., brightness, color, timer, etc.)
 
        Note: Some devices send TWO responses:
        1. ACK packet (command accepted)
        2. Status packet (updated DPS values)
 
        Args:
            key (int|str): DPS key to set
            value (any): Value to set (bool, int, str, etc.)
 
        Returns:
            dict|None: Response from device or None on error
 
        Example:
            device.set_dps(1, True)        # Turn switch on
            device.set_dps(2, 50)          # Set brightness to 50%
            device.set_dps(3, "white")     # Set color mode
        """
        t = str(self._unix_now())
 
        # Convert value to JSON string if not already a string
        # (booleans need lowercase 'true'/'false', not Python's 'True'/'False')
        val_str = json.dumps(value) if not isinstance(value, str) else value.lower()
 
        payload = ('{"gwId":"%s","devId":"%s","uid":"%s","t":"%s","dps":{"%s":%s}}'
                   % (self.did, self.did, self.did, t, key, val_str))
 
        # Control commands REQUIRE version prefix (critical!)
        enc = self._encrypt_query(payload, version_incl=True)
        pkt = self._craft_packet(self.CMD_CONTROL, enc)
 
        self._log(f"Sending CONTROL (seq={self.sequence}): DPS {key}={value}")
 
        try:
            # Context manager ensures socket is always closed properly
            with _TuyaSocket(self) as s:
                self._log(f"Request ({len(pkt)} bytes): {pkt.hex()}")
                s.send(pkt)
 
                # Receive the first response (The ACK is expected)
                raw_ack = s.recv(512)
                self._log(f"ACK response ({len(raw_ack)} bytes): {raw_ack.hex()}")
 
                if self._is_ack(raw_ack):
                    self._log("Command accepted by device.")
 
                    # Receive actual response (expected status update)
                    raw = s.recv(512)
                    self._log(f"Response ({len(raw)} bytes): {raw.hex()}")
 
                    return self._decrypt_response(raw)
 
                self._log("Command failed or timed out.")
                return None
 
        except OSError as e:
            print(f"Connection error in set_dps: {e}")
            return None
        except Exception as e:
            print(f'set_dps error: {e}')
            return None
 
 
    def heartbeat(self):
        """
        Send heartbeat packet to device to keep connection alive.
 
        Some devices may drop idle connections. Send periodic heartbeats
        to maintain persistent connections.
 
        Returns:
            bool: True if heartbeat acknowledged, False on error
        """
        # Heartbeat has empty payload
        pkt = self._craft_packet(self.CMD_HEARTBEAT, b'')
 
        self._log(f"Sending HEARTBEAT (seq={self.sequence})")
 
        try:
            # Context manager ensures socket is always closed properly
            with _TuyaSocket(self) as s:
                self._log(f"Heatbeat request ({len(pkt)} bytes): {pkt.hex()}")
                s.send(pkt)
                raw = s.recv(512)
                self._log(f"Heartbeat ACK response ({len(raw)} bytes): {raw.hex()}")
                return raw is not None
 
        except OSError as e:
            print(f"Connection error in heartbeat: {e}")
            return False
        except Exception as e:
            print(f'heartbeat error: {e}')
            return False
 
    def turn_on(self):
        """Convenience method to turn device ON"""
        return self.set_dps(1, True)
 
    def turn_off(self):
        """Convenience method to turn device OFF"""
        return self.set_dps(1, False)
 
    # ── Convenience Properties ────────────────────────────────────────────
    # NOTE: Properties like voltage() call self.get_dps(), which calls self.get_data_raw().
    # If you check voltage, current, and power sequentially, you are performing 
    # three separate network requests. For efficiency, call get_dps() once and 
    # calculate all values from the returned dict.
 
    @property
    def voltage(self):
        """
        Get current voltage in Volts.
 
        Returns:
            float|None: Voltage in V (e.g., 230.1) or None on error
        """
        dps = self.get_dps()
        return dps.get('20', 0) / 10 if dps else None
 
    @property
    def current(self):
        """
        Get current draw in Amperes
 
        Returns:
            float|None: Current in A (e.g., 2.34) or None on error
        """
        dps = self.get_dps()
        return dps.get('18', 0) / 1000 if dps else None
 
    @property
    def power(self):
        """
        Get power consumption in Watts.
 
        Returns:
            float|None: Power in W (e.g., 52.3) or None on error
        """
        dps = self.get_dps()
        return dps.get('19', 0) / 10 if dps else None
 
    @property
    def is_on(self):
        """
        Check if device switch is currently ON.
 
        Returns:
            bool|None: True if ON, False if OFF, None on error
        """
        dps = self.get_dps()
        return dps.get('1', False) if dps else None
 
    # ── Utility Methods ────────────────────────────────────────────────
 
    def print_status(self):
        """
        Print formatted device status to console.
 
        Displays switch state, voltage, current, power, and raw DPS values.
        Useful for debugging and monitoring.
        """
        dps = self.get_dps()
 
        print('-' * 40)
        print('Device Status:')
        print('-' * 40)
        print(f'  Switch : {dps.get("1", "N/A")}')
        print(f'  Voltage: {dps.get("20", 0) / 10:.1f} V')
        print(f'  Current: {dps.get("18", 0) / 1000:.3f} A')
        print(f'  Power  : {dps.get("19", 0) / 10:.1f} W')
        print('-' * 40)
        print(f'  Raw DPS: {dps}')
        print('-' * 40)
2026/05/13 18:26 · Ignas · 0 Comments

I like to try various gadgets for my bike. Some enhance my side, some are just, well, meh… :)

With a new bike computer, I have logged more than 1000 kilometers this season, and I’m eager to give my opinion, based on my personal experience. The gadget I’m talking about is the Geoid CC600. The Geoid CC600 uses a ESP32 chip which serves as the CPU. This allows for a feature set that, for its price point, is very impressive.

  • 2.4' color screen with 3 physical buttons
  • Backlight screen with ambient light sensor
  • IPX7 waterproof level, but I would not keep it on rain
  • GPS, Galileo, Beidou, Glonass and QZSS positioning systems
  • Type-C standard charging port
  • 17-24 hours battery life, based on settings and usage scenario
  • Wifi, Bluetooth and ANT+ wireless protocols
  • 100 hours of storage
  • Supported sensors: Speed*, Cadence*, Heartrate* Power meter, Smart trainer, Electronic shifting, Tail light, Radar*

* - Tested: I own XOSS Vortex Speed and Cadence sensors (works in both BT and ANT+), Garmin Heartrate sensor and W100 Radar.

Thing that stood out to me about the device was the display. It is a bright, colored screen that is easy to navigate through even in direct sunlight, which is something I have struggled with in the past. The screen’s brightness coupled with the resolution means that all the essential metrics and even navigation details are squinted in.

I have to say that the CC600's capability to interface with ANT+ and Bluetooth Low Energy (BLE) is a given advantage. I have seamlessly connected it with my heart rate monitor, speed sensor, and cadence sensor, and all of the connections have remained consistent with no surprise disconnections during my rides. Also, the unit has quite a number of data fields which gives me the possibility to customize my display pages to show the information that I want to see at a glance.

The navigation feature is a breadcrumb style which is good for following a set route. I have used it to follow GPX files that I have uploaded from my phone and it has worked unbelievably well. While it lacks the detailed, full-map view of more expensive devices, its simple outline track and turn-by-turn prompts are adequate to keep the user on track. This is a no-frills navigation system that gets the job done quite well. One more thing that I like is that the navigation track is shown in color, which is good for visibility. Also, worth to mention, that routes created in OneLapFit application has additional information on map about sideroads, which will help you to orient where to make your turn. If you drive away from your route, your route will be rerouted if OneLapFit application is open on your phone

After 1000 kilometers, the Geoid CC600 has proven to be a reliable and capable bike computer. It's not perfect—the companion app can be a little clunky at times, and the setup was a bit cryptic initially. However, the hardware itself is solid, and for its price, it offers a great balance of features, especially for someone who primarily wants a device for tracking stats and following simple routes. If you're in the market for a budget-friendly bike computer with a vibrant display and essential features, the Geoid CC600 is definitely worth considering.

2025/10/01 12:57 · Ignas · 0 Comments

In this post, I want to share my personal experience in choosing and configuring a 4G router for my home. Bigger project I was on, required me to install a network solution in a house, and the choice was determined by the ratio of the offered functionality and price.

Out of many choices, the ZBT WE826 model caught my eye. This router had an attractive price and seemed suitable for my needs. With a 4G module, the router costs about €60. I found information that it supports OpenWRT software. I purchased from the Aliexpress platform and received the package.

However, after starting to use this router, problems arose. The 4G connection worked only for 3-8 hours (very randomly), and then disappeared. I wanted to make sure that the device was working properly and fairly stably, so at first I didn't change any settings and tried to use the one I received. I tried to reconfigure the device according to the seller's notes and advice, but it didn't help. The seller shared the firmware for the router based on the LEDE Reboot version from 2018. I decided to install it, but it wasn't successful either - the device worked for 8 minutes and rebooted. Apparently some script was working.

Finally, I decided to install the latest OpenWRT firmware version specifically for my device. I chose the “ZBT W826 T 16M factory image” version. This solution turned out to be correct and my device with the new firmware started up. As expected, the 4G modem did not work and needed to be configured. Two sources helped me do this:

You need to understand that in order to install some packages into router, you need an internet connection. I didn't had a wired connection, so I thought, maybe I can share the internet on my mobile and make the router a Wifi client. It was fairly straight forward. I've managed to enable 4G support by following documentation. Left router to hang overnight to evaluate stability. To my deep disappointment, the initial situation repeated itself - the 4G connection disappeared after a few hours.

Since I was using the latest version of OpenWRT, I decided to describe my problem on the OpenWRT forum. A forum member, guru, AndrewZ, responded quite quickly and asked for additional data from the router. After an initial analysis, he suggested changing the 4G module mode and reconfigure the router. I read the procedure and decided to go this route. I have nothing to lose, even if I break my router!

I reloaded the OpenWRT firmware once again to start from fresh, shared the Internet via my mobile phone. I changed the module configuration to support the QMI protocol using AT commands. I configured the router according to the documentation. This process was a challenge, but in the end, I've managed to achieve the desired result.

Now I have router that gives me low level configuration, has wide support for various software and active community in case something goes wrong. My ZBT WE826 router works stably and without problems. The 4G connection is stable, and the Wi-Fi coverage is very good. In addition, the form factor of the router is perfect for hanging on the wall, which is a big advantage.

This project showed that even with an inexpensive device, good results can be achieved if you are ready to invest some time and effort. The main thing is not to give up and look for solutions, even if everything seems complicated at first.

New blog entry:
  • start.txt
  • Last modified: 2026/05/13 18:26
  • by Ignas