====== ESP32 LoRa SX1276 (433MHz) ======
The main chip using ESP32, Tensilica LX6 dual-core processor, clocked at 240MHz, computing power up to 600DMIPS, chip built-in 520 KB SRAM, 802.11 b / g / N HT40 Wi-Fi transceiver, baseband, protocol stack and LWIP, integrated dual-mode Bluetooth (traditional Bluetooth and BLE low power Bluetooth).
This product is based on the WIFI 32 added SX1278 chip, that is, LoRa ™ remote modem, 433MHz frequency, about -148dBm high sensitivity, +20 dBm power output, high reliability.
Onboard 32MBit Flash, Wi-Fi antenna, lithium battery charging circuit and interface, CP2102 USB to serial chip, the perfect support for Arduino development environment, can be very simple and fast for program verification and product development.
{{:components:esp32-lora.png?nolink&200|}}
**Specification**
* Operating voltage: 3.3V to 7V
* Operating temperature range: -40 ° C to + 90 ° C
* Supports Sniffer, Station, softAP and Wi-Fi Direct modes
* Data rate: 150 Mbps @ 11n HT40,72 Mbps @ 11n HT20,54 Mbps @ 11g, 11 Mbps @ 11b
* Transmit power: 19.5 dBm @ 11b, 16.5 dBm @ 11g, 15.5 dBm @ 11n
* Receiver sensitivity up to - 98 dBm
* UDP continues to throughput by 135 Mbps
====== Pinout ======
{{:components:esp32-lora-pinout.png?800|}}
====== Code ======
===== Samples =====
from machine import Pin, SPI
from Lib.sx127x import SX127x
# Board specific Lora configuration
lora_default = {
'frequency': 433e6,
'frequency_offset':0,
'tx_power_level': 14,
'signal_bandwidth': 125e3,
'spreading_factor': 9,
'coding_rate': 5,
'preamble_length': 8,
'implicitHeader': False,
'sync_word': 0x12,
'enable_CRC': True,
'invert_IQ': False,
'debug': False,
}
lora_pins = {
'dio_0':26,
'ss':18,
'reset':16,
'sck':5,
'miso':19,
'mosi':27,
}
lora_spi = SPI(
baudrate=10000000, polarity=0, phase=0,
bits=8, firstbit=SPI.MSB,
sck=Pin(lora_pins['sck'], Pin.OUT, Pin.PULL_DOWN),
mosi=Pin(lora_pins['mosi'], Pin.OUT, Pin.PULL_UP),
miso=Pin(lora_pins['miso'], Pin.IN, Pin.PULL_UP),
)
lora = SX127x(lora_spi, pins=lora_pins, parameters=lora_default)
print("LoRa Started");
while True:
if lora.receivedPacket():
try:
payload = lora.readPayload().decode()
rssi = lora.packetRssi()
snr = lora.packetSnr()
print("RX: {} | RSSI: {} SNR: {}".format(payload, rssi, snr))
from machine import Pin, SPI
from Lib.sx127x import SX127x
# Board specific Lora configuration
lora_default = {
'frequency': 433e6,
'frequency_offset':0,
'tx_power_level': 14,
'signal_bandwidth': 125e3,
'spreading_factor': 9,
'coding_rate': 5,
'preamble_length': 8,
'implicitHeader': False,
'sync_word': 0x12,
'enable_CRC': True,
'invert_IQ': False,
'debug': False,
}
lora_pins = {
'dio_0':26,
'ss':18,
'reset':16,
'sck':5,
'miso':19,
'mosi':27,
}
lora_spi = SPI(
baudrate=10000000, polarity=0, phase=0,
bits=8, firstbit=SPI.MSB,
sck=Pin(lora_pins['sck'], Pin.OUT, Pin.PULL_DOWN),
mosi=Pin(lora_pins['mosi'], Pin.OUT, Pin.PULL_UP),
miso=Pin(lora_pins['miso'], Pin.IN, Pin.PULL_UP),
)
lora = SX127x(lora_spi, pins=lora_pins, parameters=lora_default)
print("LoRa Started");
lora.println("Sending first lora message")
===== Library =====
from time import sleep, ticks_ms
from machine import SPI, Pin
from micropython import const
import gc
PA_OUTPUT_RFO_PIN = const(0)
PA_OUTPUT_PA_BOOST_PIN = const(1)
# registers
REG_FIFO = const(0x00)
REG_OP_MODE = const(0x01)
REG_FRF_MSB = const(0x06)
REG_FRF_MID = const(0x07)
REG_FRF_LSB = const(0x08)
REG_PA_CONFIG = const(0x09)
REG_LNA = const(0x0C)
REG_FIFO_ADDR_PTR = const(0x0D)
REG_FIFO_TX_BASE_ADDR = const(0x0E)
FifoTxBaseAddr = const(0x00)
REG_FIFO_RX_BASE_ADDR = const(0x0F)
FifoRxBaseAddr = const(0x00)
REG_FIFO_RX_CURRENT_ADDR = const(0x10)
REG_IRQ_FLAGS_MASK = const(0x11)
REG_IRQ_FLAGS = const(0x12)
REG_RX_NB_BYTES = const(0x13)
REG_PKT_RSSI_VALUE = const(0x1A)
REG_PKT_SNR_VALUE = const(0x19)
REG_MODEM_CONFIG_1 = const(0x1D)
REG_MODEM_CONFIG_2 = const(0x1E)
REG_PREAMBLE_MSB = const(0x20)
REG_PREAMBLE_LSB = const(0x21)
REG_PAYLOAD_LENGTH = const(0x22)
REG_FIFO_RX_BYTE_ADDR = const(0x25)
REG_MODEM_CONFIG_3 = const(0x26)
REG_RSSI_WIDEBAND = const(0x2C)
REG_DETECTION_OPTIMIZE = const(0x31)
REG_DETECTION_THRESHOLD = const(0x37)
REG_SYNC_WORD = const(0x39)
REG_DIO_MAPPING_1 = const(0x40)
REG_VERSION = const(0x42)
# invert IQ
REG_INVERTIQ = const(0x33)
RFLR_INVERTIQ_RX_MASK = const(0xBF)
RFLR_INVERTIQ_RX_OFF = const(0x00)
RFLR_INVERTIQ_RX_ON = const(0x40)
RFLR_INVERTIQ_TX_MASK = const(0xFE)
RFLR_INVERTIQ_TX_OFF = const(0x01)
RFLR_INVERTIQ_TX_ON = const(0x00)
REG_INVERTIQ2 = const(0x3B)
RFLR_INVERTIQ2_ON = const(0x19)
RFLR_INVERTIQ2_OFF = const(0x1D)
# modes
# bit 7: 1 => LoRa mode
MODE_LONG_RANGE_MODE = const(0x80)
MODE_SLEEP = const(0x00)
MODE_STDBY = const(0x01)
MODE_TX = const(0x03)
MODE_RX_CONTINUOUS = const(0x05)
MODE_RX_SINGLE = const(0x06)
# PA config
PA_BOOST = const(0x80)
# IRQ masks
IRQ_TX_DONE_MASK = const(0x08)
IRQ_PAYLOAD_CRC_ERROR_MASK = const(0x20)
IRQ_RX_DONE_MASK = const(0x40)
IRQ_RX_TIME_OUT_MASK = const(0x80)
# Buffer size
MAX_PKT_LENGTH = const(255)
class SX127x:
default_parameters = {
"frequency": 869525000,
"frequency_offset": 0,
"tx_power_level": 14,
"signal_bandwidth": 125e3,
"spreading_factor": 9,
"coding_rate": 5,
"preamble_length": 8,
"implicitHeader": False,
"sync_word": 0x12,
"enable_CRC": True,
"invert_IQ": False,
}
def __init__(self, spi, pins, parameters={}):
self.spi = spi
self.pins = pins
self.parameters = parameters
self.pin_ss = Pin(self.pins["ss"], Pin.OUT)
self.lock = False
self.implicit_header_mode = None
self.parameters = SX127x.default_parameters
if parameters:
self.parameters.update(parameters)
# check version
version = None
for i in range(5):
version = self.readRegister(REG_VERSION)
if version:
break
# debug output
print("SX version: {}".format(version))
# put in LoRa and sleep mode
self.sleep()
# config
self.setFrequency(self.parameters["frequency"])
self.setSignalBandwidth(self.parameters["signal_bandwidth"])
# set LNA boost
self.writeRegister(REG_LNA, self.readRegister(REG_LNA) | 0x03)
# set auto AGC
self.writeRegister(REG_MODEM_CONFIG_3, 0x04)
self.setTxPower(self.parameters["tx_power_level"])
self.implicitHeaderMode(self.parameters["implicitHeader"])
self.setSpreadingFactor(self.parameters["spreading_factor"])
self.setCodingRate(self.parameters["coding_rate"])
self.setPreambleLength(self.parameters["preamble_length"])
self.setSyncWord(self.parameters["sync_word"])
self.enableCRC(self.parameters["enable_CRC"])
self.invertIQ(self.parameters["invert_IQ"])
# set LowDataRateOptimize flag if symbol time > 16ms (default disable on reset)
# self.writeRegister(REG_MODEM_CONFIG_3, self.readRegister(REG_MODEM_CONFIG_3) & 0xF7) # default disable on reset
bw = self.parameters["signal_bandwidth"]
sf = self.parameters["spreading_factor"]
if 1000 / bw / 2 ** sf > 16:
self.writeRegister(
REG_MODEM_CONFIG_3, self.readRegister(REG_MODEM_CONFIG_3) | 0x08
)
# set base addresses
self.writeRegister(REG_FIFO_TX_BASE_ADDR, FifoTxBaseAddr)
self.writeRegister(REG_FIFO_RX_BASE_ADDR, FifoRxBaseAddr)
self.standby()
def beginPacket(self, implicitHeaderMode=False):
self.standby()
self.implicitHeaderMode(implicitHeaderMode)
# reset FIFO address and payload length
self.writeRegister(REG_FIFO_ADDR_PTR, FifoTxBaseAddr)
self.writeRegister(REG_PAYLOAD_LENGTH, 0)
def endPacket(self):
# put in TX mode
self.writeRegister(REG_OP_MODE, MODE_LONG_RANGE_MODE | MODE_TX)
# wait for TX done, standby automatically on TX_DONE
while (self.readRegister(REG_IRQ_FLAGS) & IRQ_TX_DONE_MASK) == 0:
pass
# clear IRQ's
self.writeRegister(REG_IRQ_FLAGS, IRQ_TX_DONE_MASK)
def write(self, buffer):
currentLength = self.readRegister(REG_PAYLOAD_LENGTH)
size = len(buffer)
# check size
size = min(size, (MAX_PKT_LENGTH - FifoTxBaseAddr - currentLength))
# write data
for i in range(size):
self.writeRegister(REG_FIFO, buffer[i])
# update length
self.writeRegister(REG_PAYLOAD_LENGTH, currentLength + size)
return size
def aquirelock(self, lock=False):
# self.lock = False
self.lock = lock
def println(self, message, implicitHeader=False, repeat=1):
# wait until RX_Done, lock and begin writing
self.aquirelock(True)
if isinstance(message, str):
message = message.encode()
self.beginPacket(implicitHeader)
self.write(message)
for i in range(repeat):
self.endPacket()
# unlock when done writing
self.aquirelock(False)
self.collectGarbage()
def getIrqFlags(self):
irqFlags = self.readRegister(REG_IRQ_FLAGS)
self.writeRegister(REG_IRQ_FLAGS, irqFlags)
return irqFlags
def packetRssi(self, rfi="hf"):
packet_rssi = self.readRegister(REG_PKT_RSSI_VALUE)
return packet_rssi - (157 if rfi == "hf" else 164)
def packetSnr(self):
return (self.readRegister(REG_PKT_SNR_VALUE)) * 0.25
def standby(self):
self.writeRegister(REG_OP_MODE, MODE_LONG_RANGE_MODE | MODE_STDBY)
def sleep(self):
self.writeRegister(REG_OP_MODE, MODE_LONG_RANGE_MODE | MODE_SLEEP)
def setTxPower(self, level, outputPin=PA_OUTPUT_PA_BOOST_PIN):
self.parameters["tx_power_level"] = level
if outputPin == PA_OUTPUT_RFO_PIN:
# RFO
level = min(max(level, 0), 14)
self.writeRegister(REG_PA_CONFIG, 0x70 | level)
else:
# PA BOOST
level = min(max(level, 2), 17)
self.writeRegister(REG_PA_CONFIG, PA_BOOST | (level - 2))
def setFrequency(self, frequency):
# TODO min max limit
frequency = int(frequency)
self.parameters["frequency"] = frequency
frequency += self.parameters["frequency_offset"]
frf = (frequency << 19) // 32000000
self.writeRegister(REG_FRF_MSB, (frf >> 16) & 0xFF)
self.writeRegister(REG_FRF_MID, (frf >> 8) & 0xFF)
self.writeRegister(REG_FRF_LSB, (frf >> 0) & 0xFF)
def setSpreadingFactor(self, sf):
sf = min(max(sf, 6), 12)
self.writeRegister(REG_DETECTION_OPTIMIZE, 0xC5 if sf == 6 else 0xC3)
self.writeRegister(REG_DETECTION_THRESHOLD, 0x0C if sf == 6 else 0x0A)
self.writeRegister(
REG_MODEM_CONFIG_2,
(self.readRegister(REG_MODEM_CONFIG_2) & 0x0F) | ((sf << 4) & 0xF0),
)
def setSignalBandwidth(self, sbw):
bins = (
7.8e3,
10.4e3,
15.6e3,
20.8e3,
31.25e3,
41.7e3,
62.5e3,
125e3,
250e3,
)
bw = 9
if sbw < 10:
bw = sbw
else:
for i in range(len(bins)):
if sbw <= bins[i]:
bw = i
break
self.writeRegister(
REG_MODEM_CONFIG_1,
(self.readRegister(REG_MODEM_CONFIG_1) & 0x0F) | (bw << 4),
)
def setCodingRate(self, denominator):
denominator = min(max(denominator, 5), 8)
cr = denominator - 4
self.writeRegister(
REG_MODEM_CONFIG_1,
(self.readRegister(REG_MODEM_CONFIG_1) & 0xF1) | (cr << 1),
)
def setPreambleLength(self, length):
self.writeRegister(REG_PREAMBLE_MSB, (length >> 8) & 0xFF)
self.writeRegister(REG_PREAMBLE_LSB, (length >> 0) & 0xFF)
def enableCRC(self, enable_CRC=False):
modem_config_2 = self.readRegister(REG_MODEM_CONFIG_2)
config = modem_config_2 | 0x04 if enable_CRC else modem_config_2 & 0xFB
self.writeRegister(REG_MODEM_CONFIG_2, config)
def invertIQ(self, invertIQ):
self.parameters["invertIQ"] = invertIQ
if invertIQ:
self.writeRegister(
REG_INVERTIQ,
(
(
self.readRegister(REG_INVERTIQ)
& RFLR_INVERTIQ_TX_MASK
& RFLR_INVERTIQ_RX_MASK
)
| RFLR_INVERTIQ_RX_ON
| RFLR_INVERTIQ_TX_ON
),
)
self.writeRegister(REG_INVERTIQ2, RFLR_INVERTIQ2_ON)
else:
self.writeRegister(
REG_INVERTIQ,
(
(
self.readRegister(REG_INVERTIQ)
& RFLR_INVERTIQ_TX_MASK
& RFLR_INVERTIQ_RX_MASK
)
| RFLR_INVERTIQ_RX_OFF
| RFLR_INVERTIQ_TX_OFF
),
)
self.writeRegister(REG_INVERTIQ2, RFLR_INVERTIQ2_OFF)
def setSyncWord(self, sw):
self.writeRegister(REG_SYNC_WORD, sw)
def setChannel(self, parameters):
self.standby()
for key in parameters:
if key == "frequency":
self.setFrequency(parameters[key])
continue
if key == "invert_IQ":
self.invertIQ(parameters[key])
continue
if key == "tx_power_level":
self.setTxPower(parameters[key])
continue
def dumpRegisters(self):
# TODO end=''
for i in range(128):
print("0x{:02X}: {:02X}".format(i, self.readRegister(i)), end="")
if (i + 1) % 4 == 0:
print()
else:
print(" | ", end="")
def implicitHeaderMode(self, implicitHeaderMode=False):
if (
self.implicit_header_mode != implicitHeaderMode
): # set value only if different.
self.implicit_header_mode = implicitHeaderMode
modem_config_1 = self.readRegister(REG_MODEM_CONFIG_1)
config = (
modem_config_1 | 0x01
if implicitHeaderMode
else modem_config_1 & 0xFE
)
self.writeRegister(REG_MODEM_CONFIG_1, config)
def receive(self, size=0):
self.implicitHeaderMode(size > 0)
if size > 0:
self.writeRegister(REG_PAYLOAD_LENGTH, size & 0xFF)
# The last packet always starts at FIFO_RX_CURRENT_ADDR
# no need to reset FIFO_ADDR_PTR
self.writeRegister(
REG_OP_MODE, MODE_LONG_RANGE_MODE | MODE_RX_CONTINUOUS
)
def listen(self, time=1000):
time = min(max(time, 0), 10000)
self.receive()
start = ticks_ms()
while True:
if self.receivedPacket():
return self.readPayload()
if ticks_ms() - start > time:
return None
def onReceive(self, callback):
self.onReceive = callback
if "dio_0" in self.pins:
self.pin_rx_done = Pin(self.pins["dio_0"], Pin.IN)
if self.pin_rx_done:
if callback:
self.writeRegister(REG_DIO_MAPPING_1, 0x00)
self.pin_rx_done.irq(
trigger=Pin.IRQ_RISING, handler=self.handleOnReceive
)
else:
pass
# TODO detach irq
def handleOnReceive(self, event_source):
# lock until TX_Done
self.aquirelock(True)
irqFlags = self.getIrqFlags()
# RX_DONE only, irqFlags should be 0x40
if irqFlags & IRQ_RX_DONE_MASK == IRQ_RX_DONE_MASK:
# automatically standby when RX_DONE
if self.onReceive:
payload = self.readPayload()
self.onReceive(self, payload)
elif self.readRegister(REG_OP_MODE) != (
MODE_LONG_RANGE_MODE | MODE_RX_SINGLE
):
# no packet received.
# reset FIFO address / # enter single RX mode
self.writeRegister(REG_FIFO_ADDR_PTR, FifoRxBaseAddr)
self.writeRegister(
REG_OP_MODE, MODE_LONG_RANGE_MODE | MODE_RX_SINGLE
)
self.aquirelock(False) # unlock in any case.
self.collectGarbage()
return True
def receivedPacket(self, size=0):
irqFlags = self.getIrqFlags()
self.implicitHeaderMode(size > 0)
if size > 0:
self.writeRegister(REG_PAYLOAD_LENGTH, size & 0xFF)
# if (irqFlags & IRQ_RX_DONE_MASK) and \
# (irqFlags & IRQ_RX_TIME_OUT_MASK == 0) and \
# (irqFlags & IRQ_PAYLOAD_CRC_ERROR_MASK == 0):
if (
irqFlags == IRQ_RX_DONE_MASK
): # RX_DONE only, irqFlags should be 0x40
# automatically standby when RX_DONE
return True
elif self.readRegister(REG_OP_MODE) != (
MODE_LONG_RANGE_MODE | MODE_RX_SINGLE
):
# no packet received.
# reset FIFO address / # enter single RX mode
self.writeRegister(REG_FIFO_ADDR_PTR, FifoRxBaseAddr)
self.writeRegister(
REG_OP_MODE, MODE_LONG_RANGE_MODE | MODE_RX_SINGLE
)
def readPayload(self):
# set FIFO address to current RX address
# fifo_rx_current_addr = self.readRegister(REG_FIFO_RX_CURRENT_ADDR)
self.writeRegister(
REG_FIFO_ADDR_PTR, self.readRegister(REG_FIFO_RX_CURRENT_ADDR)
)
# read packet length
packet_length = 0
if self.implicit_header_mode:
packet_length = self.readRegister(REG_PAYLOAD_LENGTH)
else:
packet_length = self.readRegister(REG_RX_NB_BYTES)
payload = bytearray()
for i in range(packet_length):
payload.append(self.readRegister(REG_FIFO))
self.collectGarbage()
return bytes(payload)
def readRegister(self, address, byteorder="big", signed=False):
response = self.transfer(address & 0x7F)
return int.from_bytes(response, byteorder)
def writeRegister(self, address, value):
self.transfer(address | 0x80, value)
def transfer(self, address, value=0x00):
response = bytearray(1)
self.pin_ss.value(0)
self.spi.write(bytes([address]))
self.spi.write_readinto(bytes([value]), response)
self.pin_ss.value(1)
return response
def collectGarbage(self):
gc.collect()
# print('[Mem aft - free: {} allocated: {}]'.format(gc.mem_free(), gc.mem_alloc()))