Source code for lora

import logging
import threading
import lora_constants
import sys

class LoRa:
    """LoRa class.

    Manages the LoRa operations communicating with LoRa modules. Every
    operation writes an AT command to the serial port and inspects the reply
    line(s) read back from it.

    Attributes:
        lock (threading.Lock): Semaphore to use LoRa object. It is created
            here but never acquired by the methods of this class.
        loraModule (str): LoRa module type.
        serialPort (serial.Serial): Serial object.

    Note:
        The configuration attributes (base_band, lora_class, tx_power, adr,
        auth_mode, repeat, retry) are only assigned by the commented-out
        validation code in the constructor, so they do not exist on instances.
    """

    # Maximum payload size per uplink data rate for the EU434/EU868 base bands.
    _MAX_PAYLOAD_868 = {
        "DR0": 51,
        "DR1": 51,
        "DR2": 51,
        "DR3": 115,
        "DR4": 242,
        "DR5": 242,
        "DR6": 242,
        "DR7": 242,
    }

    # Maximum payload size per uplink data rate for the US915/AU915/AU920
    # base bands.
    _MAX_PAYLOAD_915 = {
        "DR0": 11,
        "DR1": 53,
        "DR2": 126,
        "DR3": 242,
        "DR4": 242,
    }

    # Channels that get disabled when a sub-band is selected (RHF76-052 only).
    _SUB_BAND_DISABLED_CHANNELS = {
        1: (range(8, 72),),
        2: (range(0, 8), range(16, 65), range(66, 72)),
    }

    def __init__(self, _loraModule, _serialPortObj):
        """LoRa class constructor.

        Parameters:
            _loraModule (str): LoRa module type.
            _serialPortObj (serial.Serial): Serial object.

        Returns:
            None
        """
        ## Semaphore to use LoRa object
        self.lock = threading.Lock()
        ## LoRa module type
        self.loraModule = _loraModule
        ## Serial object
        self.serialPort = _serialPortObj

        # Disabled configuration validation, kept for reference. It relies on
        # a ``_loraCfg`` mapping that is not a constructor parameter.
        #
        # # Get and check LoRa base band
        # self.base_band = _loraCfg.get("base_band")
        # if self.base_band not in lora_constants.lora_baseband_ref:
        #     logging.error("Base band parameter in LoRa configuration file is not valid")
        #     sys.exit("\tERROR: Base band parameter in LoRa configuration file is not valid!!!")
        # # Get and check LoRa class
        # self.lora_class = _loraCfg.get("lora_class")
        # if self.lora_class not in lora_constants.lora_class_ref:
        #     logging.error("Class parameter in LoRa configuration file is not valid")
        #     sys.exit("\tERROR: Class parameter in LoRa configuration file is not valid!!!")
        # # Get and check LoRa transmission power
        # self.tx_power = _loraCfg.get("tx_power")
        # if self.base_band == "EU434" or self.base_band == "EU868":
        #     if self.tx_power not in lora_constants.lora_tx_power_868_ref:
        #         logging.error("Transmission power parameter in LoRa configuration file is not valid")
        #         sys.exit("\tERROR: Transmission power parameter in LoRa configuration file is not valid!!!")
        # elif self.base_band == "US915" or self.base_band == "AU920":
        #     if self.tx_power not in lora_constants.lora_tx_power_915_ref:
        #         logging.error("Transmission power parameter in LoRa configuration file is not valid")
        #         sys.exit("\tERROR: Transmission power parameter in LoRa configuration file is not valid!!!")
        # # Get and check ADR (Automatic Data Rate)
        # self.adr = _loraCfg.get("adr")
        # if self.adr not in lora_constants.lora_adr_ref:
        #     logging.error("ADR parameter in LoRa configuration file is not valid")
        #     sys.exit("\tERROR: ADR parameter in LoRa configuration file is not valid!!!")
        # # Get and check Authentication Mode
        # self.auth_mode = _loraCfg.get("auth_mode")
        # if self.auth_mode not in lora_constants.lora_auth_mode_ref:
        #     logging.error("Authentication mode parameter in LoRa configuration file is not valid")
        #     sys.exit("\tERROR: Authentication mode parameter in LoRa configuration file is not valid!!!")
        # self.repeat = _loraCfg.get("repeat")
        # self.retry = _loraCfg.get("retry")

    # ------------------------------------------------------------------
    # Private serial helpers
    # ------------------------------------------------------------------

    def _readLine(self):
        """Read one line from the serial port and return it as text.

        Undecodable bytes are replaced instead of raising, so stray noise on
        the serial line cannot abort an operation with UnicodeDecodeError.

        Returns:
            str: The decoded line (may be empty).
        """
        return self.serialPort.readline().decode("UTF-8", errors="replace")

    def _sendCommand(self, cmd):
        """Write an AT command and return the first reply line.

        Parameters:
            cmd (str): Full command text, including the trailing newline.

        Returns:
            str: The first line read back after the command was written.
        """
        self.serialPort.write(cmd.encode("UTF-8"))
        return self._readLine()

    def _configure(self, cmd, expected, warning, *warning_args):
        """Send a command and log a warning if the reply lacks a marker.

        Parameters:
            cmd (str): Full command text, including the trailing newline.
            expected (str): Text that must appear in the reply line.
            warning (str): Log message (logging %-format) emitted on mismatch.
            *warning_args: Arguments for the warning format string.

        Returns:
            None
        """
        if expected not in self._sendCommand(cmd):
            logging.warning(warning, *warning_args)

    # ------------------------------------------------------------------
    # Module status and limits
    # ------------------------------------------------------------------

    def checkLoRa(self):
        """Check if LoRa module is connected and ready to use.

        Sends "AT" and, when the reply contains "OK", queries the firmware
        version with "AT+VER" and logs it.

        Parameters:
            None

        Returns:
            bool: True - if successfuly / False - if failed
        """
        if "OK" not in self._sendCommand("AT\n"):
            logging.error("LoRa module is not connected")
            return False

        # The version follows the first ": " of the reply. A reply without
        # that separator must not crash the check, since the module already
        # answered "OK".
        _, separator, version = self._sendCommand("AT+VER\n").partition(": ")
        version = version.strip() if separator else "unknown"
        logging.info("Connected to %s LoRa module with firmware version %s",
                     self.loraModule, version)
        return True

    def getLoRaMaxPayload(self, loraBaseBand, loraUPDatarate):
        """Get the maximum payload size based on base band and uplink data rate.

        Parameters:
            loraBaseBand (str): LoRa base band.
            loraUPDatarate (str): Uplink data rate.

        Returns:
            int: Maximum payload size. 0 when the base band is not handled or
            the data rate has no payload entry.

        Raises:
            SystemExit: If the data rate is not listed in the reference table
                of lora_constants for the given base band.
        """
        if loraBaseBand in ("EU434", "EU868"):
            valid_rates = lora_constants.lora_dr_868_ref
            payloads = self._MAX_PAYLOAD_868
        elif loraBaseBand in ("US915", "AU915", "AU920"):
            valid_rates = lora_constants.lora_dr_915_ref
            payloads = self._MAX_PAYLOAD_915
        else:
            return 0

        if loraUPDatarate not in valid_rates:
            logging.error("Uplink data rate parameter in LoRa configuration file is not valid")
            sys.exit("\tERROR: Uplink data rate parameter in LoRa configuration file is not valid!!!")
        return payloads.get(loraUPDatarate, 0)

    # Disabled channel setup, kept for reference:
    #
    # # Setup LoRa channel 0
    # logging.debug("Setting LoRa channel 0 to %s and %s", self.chan0_freq, self.chan0_dr)
    # cmd = "AT+CH=0,"+str(self.chan0_freq)+","+self.chan0_dr+"\n"
    # self.serialPort.write(cmd.encode("UTF-8"))
    # response = self.serialPort.readline().decode("UTF-8")
    # if self.chan0_dr in response:
    #     pass
    # else:
    #     logging.warning("Channel 0 at LoRa module not configured")
    # # Setup LoRa channel 1
    # logging.debug("Setting LoRa channel 1 to %s and %s", self.chan1_freq, self.chan1_dr)
    # cmd = "AT+CH=1,"+str(self.chan1_freq)+","+self.chan1_dr+"\n"
    # self.serialPort.write(cmd.encode("UTF-8"))
    # response = self.serialPort.readline().decode("UTF-8")
    # if self.chan1_dr in response:
    #     pass
    # else:
    #     logging.warning("Channel 1 at LoRa module not configured")

    # ------------------------------------------------------------------
    # Radio configuration
    # ------------------------------------------------------------------

    def setLoRaRX1Win(self):
        """Enable RX window 1.

        Parameters:
            None

        Returns:
            None
        """
        logging.debug("Enabling LoRa receive window 1")
        self._configure("AT+RXWIN1=ON\n", "RXWIN1: ON",
                        "RX window 1 at LoRa module not enabled")

    def setLoRaRX2Win(self, loraRX2Frequency, loraRX2Datarate):
        """Sets the RX window 2 parameters.

        Parameters:
            loraRX2Frequency (int): RX window 2 frequency.
            loraRX2Datarate (str): RX window 2 data rate.

        Returns:
            None
        """
        logging.debug("Setting LoRa receive window 2 to %s and %s",
                      loraRX2Frequency, loraRX2Datarate)
        self._configure(
            "AT+RXWIN2=" + str(loraRX2Frequency) + "," + loraRX2Datarate + "\n",
            loraRX2Datarate,
            "Response window 2 at LoRa module not configured")

    def setLoRaAuthMode(self, loraAuthMode):
        """Sets the authentication mode.

        Parameters:
            loraAuthMode (str): authentication mode

        Returns:
            None
        """
        logging.debug("Setting LoRa authentication mode to %s", loraAuthMode)
        self._configure("AT+MODE=" + loraAuthMode + "\n", loraAuthMode,
                        "Authentication mode at LoRa module not configured")

    def setLoRaADR(self, loraADR):
        """Enable or disable ADR (Automatic Data Rate).

        Parameters:
            loraADR (str): enable (ON) or disable (OFF)

        Returns:
            None
        """
        logging.debug("Setting LoRa ADR to %s", loraADR)
        self._configure("AT+ADR=" + loraADR + "\n", loraADR,
                        "ADR at LoRa module not configured")

    def setLoRaUPDatarate(self, loraUPDatarate):
        """Sets the uplink data rate.

        Parameters:
            loraUPDatarate (str): uplink data rate

        Returns:
            None
        """
        logging.debug("Setting LoRa uplink data rate to %s", loraUPDatarate)
        self._configure("AT+DR=" + loraUPDatarate + "\n", loraUPDatarate,
                        "Uplink data rate at LoRa module not configured")

    def setLoRaTXPower(self, loraTXPower):
        """Sets the transmission power (in dBm).

        Parameters:
            loraTXPower (int): transmission power (in dBm)

        Returns:
            None
        """
        logging.debug("Setting LoRa transmission power to %s dBm", loraTXPower)
        power = str(loraTXPower)
        self._configure("AT+POWER=" + power + "\n", power,
                        "Transmission power at LoRa module not configured")

    def setLoRaClass(self, loraClass):
        """Sets the LoRa class.

        Parameters:
            loraClass (str): LoRa class

        Returns:
            None
        """
        logging.debug("Setting LoRa class to %s", loraClass)
        self._configure("AT+CLASS=" + loraClass + "\n", loraClass,
                        "Class at LoRa module not configured")

    def setLoRaSubBand(self, loraSubBand):
        """Sets the sub-band.

        This will disable all channels not belonging to the specified
        sub-band. Only the "RHF76-052" module type and the sub-bands 1 and 2
        are handled; anything else is a no-op.

        Parameters:
            loraSubBand (int): sub-band

        Returns:
            None
        """
        if self.loraModule != "RHF76-052":
            # elif (self.loraModule == "RHF03M003"):
            return

        for channels in self._SUB_BAND_DISABLED_CHANNELS.get(loraSubBand, ()):
            for ch in channels:
                logging.debug("Disabling LoRa channel %s", ch)
                # The reply is expected to mention the channel number.
                self._configure("AT+CH=" + str(ch) + ", 0\n", str(ch),
                                "Failed disabling LoRa channel %s", ch)

    def setLoRaBaseBand(self, loraBaseBand):
        """Sets the base-band.

        Parameters:
            loraBaseBand (str): base-band

        Returns:
            None
        """
        logging.debug("Setting LoRa base band to %s", loraBaseBand)
        self._configure("AT+DR=" + loraBaseBand + "\n", loraBaseBand,
                        "Base band at LoRa module not configured")

    # ------------------------------------------------------------------
    # Identifiers and keys
    # ------------------------------------------------------------------

    def setDevEUI(self, devEUI):
        """Sets the device EUI.

        Parameters:
            devEUI (str): device EUI

        Returns:
            None
        """
        logging.debug("Setting TTN device EUI to %s", devEUI)
        self._configure("AT+ID=DevEui," + devEUI + "\n", "DevEui,",
                        "Device EUI at LoRa module not configured")

    def setAppEUI(self, appEUI):
        """Sets the application EUI.

        Parameters:
            appEUI (str): application EUI

        Returns:
            None
        """
        logging.debug("Setting TTN application EUI to %s", appEUI)
        self._configure("AT+ID=AppEui," + appEUI + "\n", "AppEui,",
                        "Application EUI at LoRa module not configured")

    def setDevAddr(self, devAddr):
        """Sets the device address.

        Parameters:
            devAddr (str): device address

        Returns:
            None
        """
        logging.debug("Setting TTN device address to %s", devAddr)
        self._configure("AT+ID=DevAddr," + devAddr + "\n", "DevAddr,",
                        "Device address at LoRa module not configured")

    def setNwkSKey(self, nwkSKey):
        """Sets the network session key.

        Parameters:
            nwkSKey (str): network session key

        Returns:
            None
        """
        # NOTE(review): the key itself is written to the debug log.
        logging.debug("Setting TTN network session key to %s", nwkSKey)
        self._configure("AT+KEY=NwkSKey," + nwkSKey + "\n", "NWKSKEY",
                        "Network session key at LoRa module not configured")

    def setAppSKey(self, appSKey):
        """Sets the application session key.

        Parameters:
            appSKey (str): application session key.

        Returns:
            None
        """
        # NOTE(review): the key itself is written to the debug log.
        logging.debug("Setting TTN application session key to %s", appSKey)
        self._configure("AT+KEY=AppSKey," + appSKey + "\n", "APPSKEY",
                        "Application session key at LoRa module not configured")

    def setAppKey(self, appKey):
        """Sets the application key.

        Parameters:
            appKey (str): application key.

        Returns:
            None
        """
        # NOTE(review): the key itself is written to the debug log.
        logging.debug("Setting TTN application key to %s", appKey)
        response = self._sendCommand("AT+KEY=AppKey," + appKey + "\n")
        # Match the key name case-insensitively: the sibling key setters
        # expect the upper-case name in the reply, while this one used to
        # accept only the "AppKey," spelling of the command itself.
        if "APPKEY" not in response.upper():
            logging.warning("Application key at LoRa module not configured")

    # ------------------------------------------------------------------
    # Network operations
    # ------------------------------------------------------------------

    def sendJoinRequest(self):
        """Sends a join request.

        Reads reply lines until one reports the outcome: "failed" means the
        join failed, "DevAddr" or "Joined" mean the network is joined. A
        "Done" line seen before any of those ends the wait as a failure.

        Parameters:
            None

        Returns:
            bool: True - if successful / False - if failed
        """
        logging.debug("Sending Lora JOIN request")
        response = self._sendCommand("AT+Join\n")
        while True:
            logging.debug("LoRa join response: %s", response.strip())
            if "failed" in response:
                logging.warning("Join failed")
                return False
            if "DevAddr" in response:
                logging.info("Network joined")
                return True
            if "Joined" in response:
                logging.info("Joined already")
                return True
            if "Done" in response:
                # The module finished the command without reporting a result;
                # no further lines are expected, so stop waiting.
                logging.warning("Join finished without confirmation")
                return False
            response = self._readLine()

    def _sendMsgHex(self, at_command, log_message, _loraPort, _loraMsg):
        """Select the LoRa port and send a hexadecimal message.

        Blocks until a reply line containing "Done" is read.
        NOTE(review): there is no upper bound on that wait; if the serial
        port keeps returning empty reads, this loops forever.

        Parameters:
            at_command (str): AT command name ("AT+MSGHEX" or "AT+CMSGHEX").
            log_message (str): Info log format string taking the message.
            _loraPort: port
            _loraMsg (str): hexadecimal message

        Returns:
            None
        """
        logging.debug("Setting Lora port to %s", _loraPort)
        self._configure("AT+PORT=" + str(_loraPort) + "\n", "PORT",
                        "LoRa port at LoRa module not configured")

        logging.info(log_message, _loraMsg)
        cmd = at_command + "=\"" + _loraMsg + "\"\n"
        self.serialPort.write(cmd.encode("UTF-8"))
        while "Done" not in self._readLine():
            pass

    def sendNoAckMsgHex(self, _loraPort, _loraMsg):
        """Sends unconfirmed hexadecimal messages.

        Parameters:
            _loraPort: port
            _loraMsg (str): hexadecimal message

        Returns:
            None
        """
        self._sendMsgHex("AT+MSGHEX",
                         "Sending no ack hexadecimal message: %s",
                         _loraPort, _loraMsg)

    def sendAckMsgHex(self, _loraPort, _loraMsg):
        """Sends confirmed hexadecimal messages.

        Parameters:
            _loraPort: port
            _loraMsg (str): hexadecimal message

        Returns:
            None
        """
        self._sendMsgHex("AT+CMSGHEX",
                         "Sending ack hexadecimal message: %s",
                         _loraPort, _loraMsg)