====== KY-040 - Rotary Encoder ======
360 Degrees Rotary Encoder Module with 15×16.5 mm Potentiometer Rotary Knob Cap
{{:components:ky-040.jpg?nolink&200|}}
**Specification**
* Working voltage: 5V
* Pulses per rotation: 20
* Size: 26 x 19 x 29mm
* Diameter of the thread: 7mm
* Height of the thread: 7mm
====== Code ======
===== Library =====
# Author: Ignas Bukys
# Copyright Ignas Bukys 2022 Released under the MIT license
# Based on Kevin Köck and Peter Hinch's works for rotary encoder and async switch.
# Created on 2022-09-10
__updated__ = "2022-09-10"
__version__ = "0.1"
import uasyncio as asyncio
import time
import micropython
from machine import Pin
# Direction flags. They are OR-ed into a transition-table entry and read back
# from the stored state with _DIR_MASK.
_DIR_CW = const(0x10) # Clockwise step
_DIR_CCW = const(0x20) # Counter-clockwise step
# Rotary Encoder States
# These values are the row indices of the transition tables below.
_R_START = const(0x0)
_R_CW_1 = const(0x1)
_R_CW_2 = const(0x2)
_R_CW_3 = const(0x3)
_R_CCW_1 = const(0x4)
_R_CCW_2 = const(0x5)
_R_CCW_3 = const(0x6)
_R_ILLEGAL = const(0x7)
# Full-step transition table.
# Row    = current state (state value masked with _STATE_MASK).
# Column = pin reading (CLK << 1) | DT, i.e. CLK/DT = 00, 01, 10, 11.
# Entry  = next state, optionally OR-ed with _DIR_CW / _DIR_CCW.
_R_transition_table = [
# |------------- NEXT STATE -------------| |CURRENT STATE|
# CLK/DT CLK/DT CLK/DT CLK/DT
# 00 01 10 11
[_R_START, _R_CCW_1, _R_CW_1, _R_START], # _R_START
[_R_CW_2, _R_START, _R_CW_1, _R_START], # _R_CW_1
[_R_CW_2, _R_CW_3, _R_CW_1, _R_START], # _R_CW_2
[_R_CW_2, _R_CW_3, _R_START, _R_START | _DIR_CW], # _R_CW_3
[_R_CCW_2, _R_CCW_1, _R_START, _R_START], # _R_CCW_1
[_R_CCW_2, _R_CCW_1, _R_CCW_3, _R_START], # _R_CCW_2
[_R_CCW_2, _R_START, _R_CCW_3, _R_START | _DIR_CCW], # _R_CCW_3
[_R_START, _R_START, _R_START, _R_START]] # _R_ILLEGAL
# Half-step variant of the table, laid out the same way (8 rows x 4 columns).
# NOTE(review): no code visible in this file references this table, and its
# rows reuse the full-step state names -- presumably only for their numeric
# values. Confirm the intended state meanings before using it.
_R_transition_table_half_step = [
[_R_CW_3, _R_CW_2, _R_CW_1, _R_START],
[_R_CW_3 | _DIR_CCW, _R_START, _R_CW_1, _R_START],
[_R_CW_3 | _DIR_CW, _R_CW_2, _R_START, _R_START],
[_R_CW_3, _R_CCW_2, _R_CCW_1, _R_START],
[_R_CW_3, _R_CW_2, _R_CCW_1, _R_START | _DIR_CW],
[_R_CW_3, _R_CCW_2, _R_CW_3, _R_START | _DIR_CCW],
[_R_START, _R_START, _R_START, _R_START],
[_R_START, _R_START, _R_START, _R_START]]
# Masks that split a table entry into its state index (low three bits) and
# its direction flags (_DIR_CW | _DIR_CCW).
_STATE_MASK = const(0x07)
_DIR_MASK = const(0x30)
# Type of a generator object; launch() uses it to recognise coroutines.
type_gen = type((lambda: (yield))())


def launch(func, tup_args):
    """Run ``func`` with the unpacked ``tup_args``.

    A plain callback is simply executed. If the call returns a generator
    object (a coroutine that was passed by name, not yet called), that object
    is handed to the event loop as a new task. Nothing is returned in either
    case.
    """
    outcome = func(*tup_args)
    if not isinstance(outcome, type_gen):
        return
    asyncio.get_event_loop().create_task(outcome)
class ky040:
    """KY-040 rotary encoder with push button for MicroPython / uasyncio.

    Rotation is decoded in the CLK/DT pin interrupt handler using the
    full-step transition table. The push button is polled by the
    ``buttoncheck`` task, which ``__init__`` adds to the event loop.

    Callbacks registered through the ``*_func`` methods are queued with
    ``micropython.schedule(func, args)``. Each callback is therefore invoked
    with the ``args`` tuple as its single positional argument.

    Class attributes (milliseconds), read once when ``buttoncheck`` starts:
        debounce_ms: pause between two polls of the button.
        long_press_ms: hold time after which the long-press callback runs.
        double_click_ms: time window used for double-click detection.
    """

    debounce_ms = 50
    long_press_ms = 1000
    double_click_ms = 400

    def __init__(self, btn_num, clk_num, dt_num, rotary_reverse = False, suppress=True):
        """Set up the pins, enable the encoder interrupts and start polling.

        Args:
            btn_num: pin id of the push button (input with pull-up).
            clk_num: pin id of the encoder CLK line.
            dt_num: pin id of the encoder DT line.
            rotary_reverse: if true, the sign of every decoded step is flipped.
            suppress: if true, the release callback is held back after a
                long press or double click.
        """
        self.pin = Pin(btn_num, Pin.IN, Pin.PULL_UP)
        self._supp = suppress  # don't call release func after long press
        self._if = None  # increment function
        self._ef = None  # decrement function
        self._tf = None  # pressed function
        self._ff = None  # released function
        self._df = None  # double pressed function
        self._lf = None  # long pressed function
        # Argument tuples exist from the start, not only after a setter ran.
        self._ia = self._ea = self._ta = ()
        self._fa = self._da = self._la = ()
        self._value = 0  # net number of decoded steps
        self._state = _R_START
        self._listener = []
        self._reverse = -1 if rotary_reverse else 1
        # Bound once here so the interrupt handler only has to load it.
        self._notify = self._notify_listeners
        self._pin_clk = Pin(clk_num, Pin.IN)
        self._pin_dt = Pin(dt_num, Pin.IN)
        self._hal_enable_irq()
        # The level read now is taken as the "released" level of the button.
        self.sense = self.pin.value()
        self.btn_state = self.rawstate()  # Initial state
        loop = asyncio.get_event_loop()
        loop.create_task(self.buttoncheck())  # Task runs forever

    def value(self):
        """Return the net step count: +1 per increment step, -1 per decrement."""
        return self._value

    def add_listener(self, l):
        """Register a no-argument callable to be run after every decoded step."""
        self._listener.append(l)

    def remove_listener(self, l):
        """Unregister a listener.

        Raises:
            ValueError: if ``l`` was not registered.
        """
        if l not in self._listener:
            raise ValueError('{} is not an installed listener'.format(l))
        self._listener.remove(l)

    def _notify_listeners(self, _arg):
        """Run every registered listener (target of ``micropython.schedule``)."""
        for listener in self._listener:
            listener()

    def increment_func(self, func, args=()):
        """Set the callback for a step in the positive direction."""
        self._if = func
        self._ia = args

    def decrement_func(self, func, args=()):
        """Set the callback for a step in the negative direction."""
        self._ef = func
        self._ea = args

    def press_func(self, func, args=()):
        """Set the callback for a button press."""
        self._tf = func
        self._ta = args

    def release_func(self, func, args=()):
        """Set the callback for a button release."""
        self._ff = func
        self._fa = args

    def double_func(self, func, args=()):
        """Set the callback for a double click."""
        self._df = func
        self._da = args

    def long_func(self, func, args=()):
        """Set the callback for a long press."""
        self._lf = func
        self._la = args

    def _enable_clk_irq(self, callback=None):
        self._pin_clk.irq(trigger=Pin.IRQ_RISING | Pin.IRQ_FALLING, handler=callback)

    def _enable_dt_irq(self, callback=None):
        self._pin_dt.irq(trigger=Pin.IRQ_RISING | Pin.IRQ_FALLING, handler=callback)

    def _disable_clk_irq(self):
        self._pin_clk.irq(handler=None)

    def _disable_dt_irq(self):
        self._pin_dt.irq(handler=None)

    def _hal_get_clk_value(self):
        return self._pin_clk.value()

    def _hal_get_dt_value(self):
        return self._pin_dt.value()

    def _hal_enable_irq(self):
        """Route both edges of CLK and DT to ``_process_rotary_pins``."""
        self._enable_clk_irq(self._process_rotary_pins)
        self._enable_dt_irq(self._process_rotary_pins)

    def _hal_disable_irq(self):
        self._disable_clk_irq()
        self._disable_dt_irq()

    def _hal_close(self):
        self._hal_disable_irq()

    def _process_rotary_pins(self, pin):
        """Pin interrupt handler: advance the state machine by one reading.

        When a transition completes a step, the step counter is updated, the
        matching increment/decrement callback is scheduled and, if listeners
        are registered, their notification is scheduled as well.
        """
        clk_dt_pins = (self._hal_get_clk_value() << 1) | self._hal_get_dt_value()
        # Determine next state
        self._state = _R_transition_table[self._state & _STATE_MASK][clk_dt_pins]
        direction = self._state & _DIR_MASK
        incr = 0
        if direction == _DIR_CW:
            incr = 1
        elif direction == _DIR_CCW:
            incr = -1
        incr *= self._reverse
        if incr == 0:
            return
        self._value += incr
        if incr > 0 and self._if:
            micropython.schedule(self._if, self._ia)
        elif incr < 0 and self._ef:
            micropython.schedule(self._ef, self._ea)
        if self._listener:
            try:
                micropython.schedule(self._notify, None)
            except RuntimeError:
                # Schedule queue full: this notification is dropped, but the
                # step is already counted and shows up in the next value().
                pass

    # Current non-debounced logical button state: True == pressed
    def rawstate(self):
        return bool(self.pin.value() ^ self.sense)

    # Current debounced state of button (True == pressed)
    def __call__(self):
        return self.btn_state

    async def buttoncheck(self):
        """Poll the button forever and schedule the registered callbacks.

        The button is sampled every ``debounce_ms`` milliseconds; press,
        release, double-click and long-press callbacks are scheduled from the
        observed state changes and the time since the last change.
        """
        supp = False
        clicks = 0
        lpr = False  # long press ran
        # local names for performance improvements
        deb = self.debounce_ms
        dcms = self.double_click_ms
        lpms = self.long_press_ms
        raw = self.rawstate
        ticks_diff = time.ticks_diff
        ticks_ms = time.ticks_ms
        # Start from a real tick value: ticks_diff() must only be given values
        # obtained from the ticks_*() functions, never None.
        t_change = ticks_ms()
        while True:
            btn_state = raw()
            if btn_state is False and self.btn_state is False and self._supp and \
                    ticks_diff(ticks_ms(), t_change) > dcms and clicks > 0 and self._ff:
                # A single click was held back while waiting for a possible
                # second one; the window has passed, so report the release.
                clicks = 0
                micropython.schedule(self._ff, self._fa)
            elif btn_state is True and self.btn_state is True:
                if clicks > 0 and ticks_diff(ticks_ms(), t_change) > dcms:
                    # double click timeout
                    clicks = 0
                if self._lf and lpr is False:  # check long press
                    if ticks_diff(ticks_ms(), t_change) >= lpms:
                        lpr = True
                        clicks = 0
                        if self._supp is True:
                            supp = True
                        micropython.schedule(self._lf, self._la)
            elif btn_state != self.btn_state:  # state changed
                lpr = False
                self.btn_state = btn_state
                if btn_state is True:  # Button pressed: schedule pressed func
                    if ticks_diff(ticks_ms(), t_change) > dcms:
                        clicks = 0
                    if self._df:
                        clicks += 1
                    if clicks == 2:  # double click
                        clicks = 0
                        if self._supp is True:
                            supp = True
                        micropython.schedule(self._df, self._da)
                    elif self._tf:
                        micropython.schedule(self._tf, self._ta)
                else:  # Button released: schedule release func
                    if supp is True:
                        supp = False
                    elif clicks and self._supp > 0:
                        pass
                    elif self._ff:  # not after a long press with suppress
                        micropython.schedule(self._ff, self._fa)
                t_change = ticks_ms()
            # Ignore state changes until switch has settled
            await asyncio.sleep_ms(deb)
===== Example =====
import sys
import time
from machine import Pin, PWM
import uasyncio as asyncio
from libs.ky040 import ky040
# Block for three seconds before anything else is set up.
# NOTE(review): presumably a grace period to interrupt the script or let the
# hardware settle after reset -- confirm the reason.
time.sleep(3)
def map(x, in_min=0, in_max=180, out_min=25, out_max=115):
    """Rescale ``x`` linearly from the input range to the output range.

    With the defaults, 0..180 is mapped onto 25..115. The result is truncated
    to an int; ``x`` is not clamped to the input range.
    """
    scaled = (x - in_min) * (out_max - out_min)
    in_span = in_max - in_min
    return int(scaled / in_span + out_min)
# example of a class that uses one rotary encoder
class Application1:
    """Example consumer that reacts to encoder changes in an asyncio task.

    Args:
        r1: encoder object; it must provide ``add_listener(callable)`` and
            ``value()``.
        servo: optional object with a ``duty(value)`` method that is driven
            from the encoder value. When omitted, a module-level ``servo`` is
            used if one exists; otherwise the value is only printed.
    """

    def __init__(self, r1, servo=None):
        self.r1 = r1
        self.servo = servo
        self.myevent = asyncio.Event()
        asyncio.create_task(self.action())
        r1.add_listener(self.callback)

    def callback(self):
        """Listener registered with the encoder: wake up ``action``."""
        self.myevent.set()

    async def action(self):
        """Wait for encoder notifications and act on the current value."""
        while True:
            await self.myevent.wait()
            # Clear before doing the work, so a notification that arrives
            # while it is being handled re-arms the event and is not lost.
            self.myevent.clear()
            position = self.r1.value()
            print('App 1: rotary = {}'. format(position))
            # do something with the encoder result ...
            servo = self.servo
            if servo is None:
                # Keep working for scripts that define a global ``servo``.
                servo = globals().get('servo')
            if servo is not None:
                servo.duty(map(position * 10))
# Encoder on pins 13 (CLK) and 12 (DT), push button on pin 17, direction reversed.
rot = ky040(btn_num = 17, clk_num=13, dt_num=12, rotary_reverse=True)
print("Starting rotary encoder")


async def main():
    """Register print callbacks on the encoder, start the demo app, then idle."""
    # The registration methods return nothing, so their results are not kept.
    registrations = (
        (rot.release_func, "SHORT"),
        (rot.double_func, "DOUBLE"),
        (rot.long_func, "LONG"),
        (rot.increment_func, "INCR"),
        (rot.decrement_func, "DECR"),
    )
    for register, label in registrations:
        register(print, (label,))
    # create tasks that use the rotary encoders
    app1 = Application1(rot)
    # Keep the event loop alive; all work happens in tasks and callbacks.
    while True:
        await asyncio.sleep(1)


asyncio.run(main())