41 lines
1.4 KiB
Python
41 lines
1.4 KiB
Python
#!/usr/bin/env python3
|
|
import threading
|
|
import time
|
|
import logging
|
|
import argparse
|
|
from m2m.nbiot import factory
|
|
|
|
# Prefix every log record with the name of the thread that emitted it, so
# output from different threads can be told apart.
logging.basicConfig(level=logging.INFO, format='[%(threadName)s] %(message)s')
|
|
|
|
def urc_listener(module, stop_event=None):
    """Poll ``module`` for URCs until asked to stop.

    Intended to be used as the target of a background thread.

    Args:
        module: Object exposing ``poll(timeout=...)``.
        stop_event: Optional ``threading.Event``. Once it is set, the loop
            exits after the ``poll`` call in progress returns. When omitted,
            the loop never terminates (the previous single-argument
            behaviour).
    """
    if stop_event is None:
        # Private event that nobody ever sets: the single-argument call form
        # keeps looping forever, exactly as before.
        stop_event = threading.Event()

    while not stop_event.is_set():
        # poll() uses the internal lock, so it's safe to run alongside the main thread
        module.poll(timeout=1.0)
        # Same 0.1 s pause as a plain sleep, but wakes immediately on stop.
        stop_event.wait(0.1)


def main(port):
    """Run the demo: pump URCs in a background thread while querying the modem.

    Performs ten rounds of signal-quality / CEREG status checks, five seconds
    apart, then stops the background listener before leaving the
    ``with module`` block.

    Args:
        port: Port identifier handed to ``factory`` (e.g. ``/dev/ttyUSB0``).
    """
    module = factory('BG95', port)

    with module:
        module.radio_on()

        # Start background listener
        stop_event = threading.Event()
        listener = threading.Thread(
            target=urc_listener,
            args=(module, stop_event),
            name="URC-Pump",
            daemon=True,
        )
        listener.start()

        try:
            # Main thread performs transactions
            for i in range(10):
                with module.transaction:
                    # Only the first element of the result is reported.
                    rssi, _ber = module.get_signal_quality()
                    status = module.read_cereg_status()
                    logging.info(f"Check {i}: Signal={rssi}, RegStatus={status}")

                # The background thread can still process incoming data here
                time.sleep(5)
        finally:
            # Stop the listener while still inside the ``with module`` block,
            # so it is not left polling after the block has exited. The join
            # timeout covers one full poll (1.0 s) plus the pause.
            stop_event.set()
            listener.join(timeout=2.0)
|
|
|
|
if __name__ == "__main__":
    # Command-line entry point: the serial port is an optional positional
    # argument with a default.
    cli = argparse.ArgumentParser(description="Run async URC handler demo.")
    cli.add_argument(
        "port",
        nargs="?",
        default="/dev/ttyUSB0",
        help="Serial port (e.g. /dev/ttyUSB0 or MOCK)",
    )
    options = cli.parse_args()
    main(options.port)
|