diff --git a/.gitignore b/.gitignore index ab3e8ce..b542706 100644 --- a/.gitignore +++ b/.gitignore @@ -154,7 +154,7 @@ dmypy.json # Cython debug symbols cython_debug/ - +.DS_Store # PyCharm # JetBrains specific template is maintained in a separate JetBrains.gitignore that can # be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore diff --git a/README.md b/README.md index ade2269..236eb54 100644 --- a/README.md +++ b/README.md @@ -1,3 +1,98 @@ # m2m-python -A versatile Python library for controlling IoT modules (Quectel BG9x/BC66, HiSilicon) via AT commands. Features a thread-safe serial engine, real-time URC handling, SIM forensics, and an interactive diagnostic shell. \ No newline at end of file +Python library for controlling IoT modules (Quectel BG96, BG95, BC66, HiSilicon, etc.). + +## Features + +- **Robust AT Command Parser**: Handles URCs, multi-line responses, and timeouts gracefully. +- **Interactive Shell**: A powerful CLI for manual control, diagnostics, and testing. +- **Multiple Connectivity Options**: Supports Serial ports (`/dev/ttyUSB0`), TCP sockets (`socket://`), and RFC2217. +- **Vendor Agnostic Core**: Abstract base class with specialized implementations for Quectel and HiSilicon. +- **Advanced Diagnostics**: Signal quality, cell info, SIM forensics, and network scanning. +- **Network Services**: Integrated Ping, DNS, HTTP(S), and Socket (UDP/TCP) support. + +## Installation + +```bash +pip install . +``` + +## Quick Start: Interactive Shell + +The easiest way to get started is using the built-in interactive shell: + +```bash +# Connect to a physical serial port +m2m-shell /dev/ttyUSB0 --type BG96 + +# Connect to a remote modem via TCP (e.g., using socat) +m2m-shell socket://192.168.1.50:3000 --type BG95 +``` + +### Available Commands +- `info`: Display module IMEI, IMSI, Firmware, and configuration. +- `status`: Check registration, signal quality, and current operator. +- `nw_info`: Show detailed network and serving cell info. +- `cells`: Show serving and neighbor cell information. +- `forensics`: Run a deep SIM card analysis. +- `scan`: Search for available network operators. +- `connect [apn]`: Establish a PDP context. +- `disconnect`: Deactivate PDP context. +- `operator [act]`: Select operator manually. +- `ping [num]`: Ping a remote host. +- `dns `: Resolve a hostname. +- `http_get `: Perform an HTTP GET request. +- `udp_send `: Send a UDP packet. +- `udp_listen `: Open a local UDP listener. +- `tcp_client `: Connect to a TCP server. +- `sockets`: List active sockets and their states. +- `close `: Close a socket by ID. +- `recv [len]`: Read data from a socket. +- `sms_send `: Send an SMS message. +- `sms_list [ALL|REC UNREAD]`: List SMS messages. +- `radio `: Toggle module radio. +- `reboot`: Reboot the module. +- `mode `: Set connectivity preference. +- `psm `: Control Power Saving Mode. +- `edrx `: Configure eDRX settings. +- `bands [gsm|catm|nb]`: Show or lock frequency bands. +- `gpio set `: Control GPIO pins. +- `at `: Send raw AT command to the module. +- `clock [show|sync]`: Display or sync system clock via NTP. +- `temp`: Read module internal temperature. +- `ls [pattern]`: List files on the module filesystem. +- `help`: Show all available commands with descriptions. + +## Programmatic Usage + +```python +from m2m.nbiot import factory + +# Initialize module +module = factory("BG95", "/dev/ttyUSB0") + +with module: + # Check signal + rssi, ber = module.get_signal_quality() + print(f"Signal Strength: {rssi}") + + # Connect to network + if module.connect_network("iot.telefonica.de"): + print("Connected!") + + # Perform DNS lookup + ips = module.dns_query("google.com") + print(f"Google IP: {ips[0]}") +``` + +## Development + +Install dev dependencies: +```bash +pip install -e .[dev] +``` + +Run tests: +```bash +pytest tests/ +``` diff --git a/examples/01_diagnostics.py b/examples/01_diagnostics.py new file mode 100644 index 0000000..5aa19fc --- /dev/null +++ b/examples/01_diagnostics.py @@ -0,0 +1,40 @@ +#!/usr/bin/env python3 +import logging +import argparse +from m2m.nbiot import factory + +# Setup logging +logging.basicConfig(level=logging.INFO, format='%(name)s - %(levelname)s - %(message)s') + +def run_diagnostics(port): + # Instantiate a BG96 module (works the same for BG95) + module = factory('BG96', port, baudrate=115200) + + with module: + print("--- Module Info ---") + print(f"Firmware: {module.read_firmware_version()}") + print(f"IMEI: {module.read_imei()}") + print(f"ICCID: {module.read_iccid()}") + + print("\n--- SIM Status ---") + # Check if PIN is needed + if module.unlock_sim("1234"): + print("SIM Unlocked/Ready") + + print(f"Registered PLMN: {module.get_registered_plmn()}") + print(f"Forbidden PLMNs: {module.read_forbidden_plmns()}") + + print("\n--- Network Info ---") + rssi, ber = module.get_signal_quality() + print(f"Signal: RSSI {rssi}, BER {ber}") + print(f"Registration: {module.read_cereg_status()}") + + # Deep engineering info + cell_info = module.get_serving_cell_info() + print(f"Serving Cell: {cell_info}") + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="Run module diagnostics.") + parser.add_argument("port", nargs="?", default="/dev/ttyUSB0", help="Serial port (e.g. /dev/ttyUSB0 or MOCK)") + args = parser.parse_args() + run_diagnostics(args.port) diff --git a/examples/02_mqtt_secure.py b/examples/02_mqtt_secure.py new file mode 100644 index 0000000..fe424a1 --- /dev/null +++ b/examples/02_mqtt_secure.py @@ -0,0 +1,47 @@ +#!/usr/bin/env python3 +import logging +import argparse +from m2m.nbiot import factory + +logging.basicConfig(level=logging.INFO) + +def run_mqtt_demo(port): + module = factory('BG95', port, baudrate=115200) + + with module: + # 1. Initialization + module.radio_on() + module.activate_pdp_context(context_id=1) + + # 2. Upload Certificates (Assume they are locally available) + # module.upload_file("ca.crt", b"-----BEGIN CERTIFICATE-----\n...") + + # 3. Configure SSL Context + module.ssl_configure(ssl_ctx_id=0, cacert="ca.crt") + + # 4. Configure MQTT to use SSL Context 0 + module.set_qcfg("mqtt/ssl", 1) # Specific QCFG for some firmwares + module.mqtt_configure(client_idx=0, keepalive=120) + + # 5. Connect to Broker + print(f"Connecting to MQTT Broker via {port}...") + if module.mqtt_open(client_idx=0, host="your-iot-endpoint.amazonaws.com", port=8883) == 0: + if module.mqtt_connect(client_idx=0, client_id="my_device_01") == 0: + print("Connected! Publishing data...") + + # 6. Publish data + module.mqtt_publish(client_idx=0, msg_id=1, qos=1, retain=0, + topic="sensors/temperature", message='{"value": 24.5}') + + module.mqtt_subscribe(client_idx=0, msg_id=2, topic="commands/#", qos=1) + print("Subscribed to commands/#. Waiting for messages...") + + # Poll for 10 seconds to catch incoming MQTT messages via URC + for _ in range(10): + module.poll(1.0) + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="Run MQTT demo.") + parser.add_argument("port", nargs="?", default="/dev/ttyUSB0", help="Serial port (e.g. /dev/ttyUSB0 or MOCK)") + args = parser.parse_args() + run_mqtt_demo(args.port) diff --git a/examples/03_http_get.py b/examples/03_http_get.py new file mode 100644 index 0000000..0613b4a --- /dev/null +++ b/examples/03_http_get.py @@ -0,0 +1,31 @@ +#!/usr/bin/env python3 +import argparse +from m2m.nbiot import factory + +def run_http_get(port): + module = factory('BG96', port) + + with module: + module.radio_on() + module.activate_pdp_context(context_id=1) + + print("Configuring HTTP...") + module.http_configure(context_id=1, response_header=True) + + print("Setting URL...") + if module.http_set_url("http://httpbin.org/get"): + print("Sending GET request...") + if module.http_get(): + print("\n--- Response Content ---") + content = module.http_read_response(wait_time=30) + print(content) + else: + print("HTTP GET failed.") + else: + print("Failed to set URL.") + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="Run HTTP GET demo.") + parser.add_argument("port", nargs="?", default="/dev/ttyUSB0", help="Serial port (e.g. /dev/ttyUSB0 or MOCK)") + args = parser.parse_args() + run_http_get(args.port) diff --git a/examples/04_psm_gnss.py b/examples/04_psm_gnss.py new file mode 100644 index 0000000..7ca7a85 --- /dev/null +++ b/examples/04_psm_gnss.py @@ -0,0 +1,39 @@ +#!/usr/bin/env python3 +import argparse +from m2m.nbiot import factory +from m2m.utils import encode_psm_timer + +def run_low_power_location(port): + module = factory('BG95', port) + + with module: + # 1. Configure Power Saving Mode + # Example: 1 minute Active Time (T3324) and 24 hours Periodic TAU (T3412) + t3324_bin = encode_psm_timer(60, is_t3324=True) + t3412_bin = encode_psm_timer(24 * 3600, is_t3324=False) + + print(f"Setting PSM: T3324={t3324_bin}, T3412={t3412_bin}") + module.set_psm_settings(mode=1, tau=t3412_bin, active_time=t3324_bin) + + # 2. GNSS Location + print("Enabling GNSS...") + if module.enable_gnss(True): + print("Waiting for fix (may take up to 60s)...") + # Poll location until success + for i in range(10): + import time + time.sleep(5) + loc = module.get_gnss_location() + if loc: + print(f"Location Found: {loc}") + break + else: + print(f"Attempt {i+1}: No fix yet...") + + module.enable_gnss(False) # Save power + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="Run PSM and GNSS demo.") + parser.add_argument("port", nargs="?", default="/dev/ttyUSB0", help="Serial port (e.g. /dev/ttyUSB0 or MOCK)") + args = parser.parse_args() + run_low_power_location(args.port) diff --git a/examples/05_async_urc_handler.py b/examples/05_async_urc_handler.py new file mode 100644 index 0000000..30be52e --- /dev/null +++ b/examples/05_async_urc_handler.py @@ -0,0 +1,41 @@ +#!/usr/bin/env python3 +import threading +import time +import logging +import argparse +from m2m.nbiot import factory + +logging.basicConfig(level=logging.INFO, format='[%(threadName)s] %(message)s') + +def urc_listener(module): + """Background thread to continuously poll for URCs.""" + while True: + # poll() uses the internal lock, so it's safe to run alongside the main thread + module.poll(timeout=1.0) + time.sleep(0.1) + +def main(port): + module = factory('BG95', port) + + with module: + module.radio_on() + + # Start background listener + listener = threading.Thread(target=urc_listener, args=(module,), name="URC-Pump", daemon=True) + listener.start() + + # Main thread performs transactions + for i in range(10): + with module.transaction: + rssi, ber = module.get_signal_quality() + status = module.read_cereg_status() + logging.info(f"Check {i}: Signal={rssi}, RegStatus={status}") + + # The background thread can still process incoming data here + time.sleep(5) + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="Run async URC handler demo.") + parser.add_argument("port", nargs="?", default="/dev/ttyUSB0", help="Serial port (e.g. /dev/ttyUSB0 or MOCK)") + args = parser.parse_args() + main(args.port) diff --git a/examples/06_aws_iot_secure_lifecycle.py b/examples/06_aws_iot_secure_lifecycle.py new file mode 100644 index 0000000..c60285d --- /dev/null +++ b/examples/06_aws_iot_secure_lifecycle.py @@ -0,0 +1,50 @@ +#!/usr/bin/env python3 +import logging +import argparse +from m2m.nbiot import factory + +def setup_aws_iot(port): + module = factory('BG95', port) + + # Configuration + AWS_ENDPOINT = "a3xxxxxxxxxxxx-ats.iot.us-east-1.amazonaws.com" + CLIENT_ID = "Sensor_Node_01" + + with module: + module.radio_on() + module.activate_pdp_context(1) + + # 1. Provision Certificates to Module RAM + print("Uploading Security Assets...") + module.upload_file("aws_ca.crt", b"CERT_DATA_HERE...") + module.upload_file("client.crt", b"CERT_DATA_HERE...") + module.upload_file("private.key", b"KEY_DATA_HERE...") + + # 2. Map Assets to SSL Context 1 + print("Configuring SSL Engine...") + module.ssl_configure( + ssl_ctx_id=1, + cacert="aws_ca.crt", + clientcert="client.crt", + clientkey="private.key" + ) + + # 3. Configure MQTT for SSL + module.set_qcfg("mqtt/ssl", 1) + module.mqtt_configure(client_idx=0, keepalive=60) + + # 4. Connect and Publish + if module.mqtt_open(0, AWS_ENDPOINT, 8883) == 0: + if module.mqtt_connect(0, CLIENT_ID) == 0: + print("AWS IoT Connected!") + module.mqtt_publish(0, 1, 1, 0, "dt/sensor/data", '{"status": "ok", "uptime": 1200}') + + # Stay connected to receive shadows/commands + module.mqtt_subscribe(0, 2, "cmd/sensor/01", 1) + module.poll(30) + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="Run AWS IoT lifecycle demo.") + parser.add_argument("port", nargs="?", default="/dev/ttyUSB0", help="Serial port (e.g. /dev/ttyUSB0 or MOCK)") + args = parser.parse_args() + setup_aws_iot(args.port) diff --git a/examples/07_network_diagnostic_recovery.py b/examples/07_network_diagnostic_recovery.py new file mode 100644 index 0000000..ee26cd7 --- /dev/null +++ b/examples/07_network_diagnostic_recovery.py @@ -0,0 +1,42 @@ +#!/usr/bin/env python3 +import logging +import argparse +from m2m.nbiot import factory + +def recover_connection(port): + module = factory('BG96', port) + + with module: + print("Attempting to connect...") + module.radio_on() + module.attach_network() + + status = module.read_cereg_status() + if status not in [1, 5]: # Not Home or Roaming + print("Registration failed. Starting diagnostics...") + + # 1. Check Extended Error + error = module.get_extended_error_report() + print(f"3GPP Error: {error}") + + # 2. Check for Forbidden Networks + fplmns = module.read_forbidden_plmns() + print(f"Forbidden Networks: {fplmns}") + + # 3. Optimization: Force NB-IoT priority and re-scan + print("Switching strategy: Prioritizing NB-IoT and clearing scan sequence...") + module.set_iot_op_mode("NB-IoT") + module.set_nw_scan_priority("NB-IoT", "eMTC", "GSM") + + # 4. Deep Reboot to apply logic + module.reboot() + + # Retry + module.attach_network() + print(f"New Status: {module.read_cereg_status()}") + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="Run network recovery demo.") + parser.add_argument("port", nargs="?", default="/dev/ttyUSB0", help="Serial port (e.g. /dev/ttyUSB0 or MOCK)") + args = parser.parse_args() + recover_connection(args.port) diff --git a/examples/08_power_optimized_sensor.py b/examples/08_power_optimized_sensor.py new file mode 100644 index 0000000..3473899 --- /dev/null +++ b/examples/08_power_optimized_sensor.py @@ -0,0 +1,41 @@ +#!/usr/bin/env python3 +import time +import argparse +from m2m.nbiot import factory +from m2m.utils import encode_psm_timer + +def industrial_sensor_cycle(port): + # Setup for ultra-low power + module = factory('BG95', port) + + with module: + # 1. Wake up and check GNSS + module.enable_gnss(True) + location = module.get_gnss_location() + module.enable_gnss(False) # Turn off immediately to save mA + + # 2. Attach and send payload + module.radio_on() + module.activate_pdp_context(1) + + payload = f'{{"loc": "{location}", "batt": 3.8, "alert": false}}' + module.send_data_hex(0, payload.encode('ascii')) + + # 3. Calculate optimized sleep + # T3324 (Active Time): 10 seconds (time to wait for incoming commands) + # T3412 (Periodic TAU): 12 hours (heartbeat interval) + t3324 = encode_psm_timer(10, is_t3324=True) + t3412 = encode_psm_timer(12 * 3600, is_t3324=False) + + print(f"Entering deep sleep (PSM). T3324={t3324}, T3412={t3412}") + module.set_psm_settings(mode=1, tau=t3412, active_time=t3324) + + # 4. Graceful Power Down + module.radio_off() + print("Device is now in hibernate. Power consumption < 5uA.") + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="Run power optimized sensor demo.") + parser.add_argument("port", nargs="?", default="/dev/ttyUSB0", help="Serial port (e.g. /dev/ttyUSB0 or MOCK)") + args = parser.parse_args() + industrial_sensor_cycle(args.port) diff --git a/examples/09_sim_forensics.py b/examples/09_sim_forensics.py new file mode 100644 index 0000000..08585f4 --- /dev/null +++ b/examples/09_sim_forensics.py @@ -0,0 +1,51 @@ +#!/usr/bin/env python3 +import logging +import argparse +from m2m.nbiot import factory +from m2m.utils import fmt_val + +# Configure logging to hide noisy internal AT command logs for this report +logging.basicConfig(level=logging.WARNING) + +def sim_deep_forensics(port): + # Supports BG96, BG95, or BC66 + module = factory('BG95', port) + + with module: + print("="*50) + print(" SIM CARD NETWORK FORENSICS REPORT") + print("="*50) + + # Basic Identification + print(f"{'ICCID:':<25} {fmt_val(module.read_iccid())}") + print(f"{'IMSI:':<25} {fmt_val(module.read_imsi())}") + + # Core PLMN Information + print("\n--- HOME & REGISTRATION ---") + print(f"{'Home PLMN (HPLMN):':<25} {fmt_val(module.get_home_plmn())}") + print(f"{'Last Registered (RPLMN):':<25} {fmt_val(module.get_registered_plmn())}") + + # Priority Lists + print("\n--- PRIORITY LISTS (EHPLMN / EPLMN) ---") + print(f"{'Equivalent Home:':<25} {fmt_val(module.read_ehplmns())}") + print(f"{'Equivalent PLMNs:':<25} {fmt_val(module.read_eplmns())}") + + # Advanced Selection Lists (with Access Technology) + print("\n--- SELECTOR LISTS (with Access Tech) ---") + print(f"{'User Controlled:':<25} {fmt_val(module.read_user_plmn_act())}") + print(f"{'Operator Controlled:':<25} {fmt_val(module.read_operator_plmn_act())}") + print(f"{'HPLMN Selector:':<25} {fmt_val(module.read_hplmn_act())}") + print(f"{'Legacy PLMN Selector:':<25} {fmt_val(module.read_plmn_selector())}") + + # Restrictions & Special Groups + print("\n--- RESTRICTIONS ---") + print(f"{'Forbidden PLMNs:':<25} {fmt_val(module.read_forbidden_plmns())}") + print(f"{'Closed Access Group:':<25} {fmt_val(module.read_cag_info())}") + + print("\n" + "="*50) + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="Run SIM forensics report.") + parser.add_argument("port", nargs="?", default="/dev/ttyUSB0", help="Serial port (e.g. /dev/ttyUSB0 or MOCK)") + args = parser.parse_args() + sim_deep_forensics(args.port) diff --git a/examples/10_mock_simulation.py b/examples/10_mock_simulation.py new file mode 100644 index 0000000..9b477ed --- /dev/null +++ b/examples/10_mock_simulation.py @@ -0,0 +1,42 @@ +#!/usr/bin/env python3 +import logging +import argparse +from m2m.nbiot import factory + +# Configure logging +logging.basicConfig(level=logging.INFO, format='%(name)s - %(levelname)s - %(message)s') + +def run_simulation(port): + print(f"Starting Simulation on {port}...") + + # Initialize with the provided port (e.g. 'MOCK') + module = factory('BG95', port) + + with module: + print("\n--- Basic Checks ---") + if module.check_state(): + print("Module is responsive (AT -> OK)") + + print(f"Firmware: {module.read_firmware_version()}") + print(f"IMEI: {module.read_imei()}") + print(f"IMSI: {module.read_imsi()}") + + print("\n--- Network Simulation ---") + # The mock has default responses for these if 'MOCK' is used + if module.attach_network(): + print("Attached to network") + + rssi, ber = module.get_signal_quality() + print(f"Signal Quality: RSSI={rssi}, BER={ber}") + + reg_status = module.read_cereg_status() + print(f"Registration Status: {reg_status}") + + print("\n--- Extended Info ---") + print(f"Network Info: {module.get_network_info()}") + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="Run simulation.") + parser.add_argument("port", nargs="?", default="MOCK", help="Serial port (default: MOCK)") + args = parser.parse_args() + run_simulation(args.port) diff --git a/examples/11_comprehensive_test.py b/examples/11_comprehensive_test.py new file mode 100755 index 0000000..ff631ce --- /dev/null +++ b/examples/11_comprehensive_test.py @@ -0,0 +1,164 @@ +#!/usr/bin/env python3 +import logging +import argparse +import time +from m2m.nbiot import factory + +# Configure clean logging output +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s [%(levelname)s] %(message)s', + datefmt='%H:%M:%S' +) +logger = logging.getLogger('comprehensive_test') + +def run_suite(module, apn): + """Runs the core network test suite and returns metrics.""" + results = {'rssi': 99, 'dl_kbps': 0.0, 'ul_kbps': 0.0} + + print(" - Signal Quality...") + rssi, ber = module.get_signal_quality() + results['rssi'] = rssi + print(f" RSSI: {rssi}, BER: {ber}") + + print(" - DNS Resolution (google.com)...") + ips = module.dns_query("google.com") + print(f" IPs: {ips if ips else 'Failed'}") + + if ips: + print(" - Ping (8.8.8.8)...") + ping_res = module.ping("8.8.8.8", num=3) + summary = module.parse_ping_summary(ping_res) + if summary: + print(f" Summary: {summary.get('rcvd')}/{summary.get('sent')} received, RTT avg: {summary.get('avg')}ms") + else: + print(" Ping failed.") + + # Symmetrical Speed Tests (150KB each) + test_size_kb = 150 + + # Download + print(f" - Speed Test (Download {test_size_kb}KB)...") + module.http_configure(context_id=1, response_header=False) + if module.http_set_url(f"http://httpbin.org/bytes/{test_size_kb * 1024}"): + start = time.time() + if module.http_get(timeout=300): + resp = module.http_read_response(wait_time=120) + duration = time.time() - start + actual_size_kb = len(resp) / 1024 + if actual_size_kb > 1: + speed = (actual_size_kb * 8) / duration + results['dl_kbps'] = speed + print(f" DL: {actual_size_kb:.1f} KB in {duration:.1f}s ({speed:.1f} kbps)") + else: print(" DL test failed.") + + # Upload + print(f" - Speed Test (Upload {test_size_kb}KB)...") + dummy_data = b"x" * (test_size_kb * 1024) + if module.http_set_url("http://httpbin.org/post"): + start = time.time() + if module.http_post(dummy_data, timeout=300): + duration = time.time() - start + speed = (test_size_kb * 8) / duration + results['ul_kbps'] = speed + print(f" UL: {test_size_kb:.1f} KB in {duration:.1f}s ({speed:.1f} kbps)") + else: print(" UL test failed.") + + return results + +def run_comprehensive_test(port, module_type='BG96', apn='iot.telefonica.de'): + print("\n" + "="*80) + print(f" MULTI-OPERATOR BENCHMARK: {module_type} on {port}") + print("="*80) + + module = factory(module_type, port) + all_results = [] + + with module: + # 1. Basic Health & Identification + print("\n[1/5] Modem Identification...") + if not module.check_state(): + print("ERROR: Modem not responding!") + return + module.set_echo_mode(True) + imei = module.read_imei() + print(f" IMEI: {imei}") + + # 2. Operator Search + print("\n[2/5] Scanning for available operators (this takes a while)...") + operators = module.search_operators(timeout=180) + if not operators: + print(" No operators found or scan failed.") + return + + print(f" Found {len(operators)} operators.") + + # 3. Test Each Operator + print("\n[3/5] Starting Multi-Operator Test Suite...") + for i, op in enumerate(operators, 1): + stat_map = {'0': 'Unknown', '1': 'Available', '2': 'Current', '3': 'Forbidden'} + op_name = f"{op['long']} ({op['mccmnc']})" + print(f"\n ({i}/{len(operators)}) Testing Operator: {op_name} [Status: {stat_map.get(op['status'], op['status'])}]") + + if op['status'] == '3': + print(" Skipping forbidden operator.") + continue + + if module.set_operator(op['mccmnc'], format=2, act=int(op['act'])): + print(" - Waiting for registration...") + start_time = time.time() + registered = False + while time.time() - start_time < 90: + if module.is_registered(): + registered = True + break + time.sleep(2) + + if registered: + print(f" - Registered! Attaching and activating context with APN '{apn}'...") + if module.attach_network() and module.activate_pdp_context(1): + res = run_suite(module, apn) + res['name'] = op_name + all_results.append(res) + else: + print(" - Failed to attach or activate PDP.") + else: + print(" - Failed to register on this operator.") + else: + print(" - Failed to select operator.") + + # 4. Summary & Ranking + print("\n" + "="*80) + print(" FINAL BENCHMARK SUMMARY") + print("="*80) + if not all_results: + print("No network tests were successful.") + else: + # Rank by DL Speed + UL Speed + ranked = sorted(all_results, key=lambda x: (x['dl_kbps'] + x['ul_kbps']), reverse=True) + + for i, res in enumerate(ranked, 1): + star = "★ " if i == 1 else " " + print(f"{star}#{i} {res['name']:<25} | DL: {res['dl_kbps']:>6.1f} kbps | UL: {res['ul_kbps']:>6.1f} kbps | RSSI: {res['rssi']}") + + best = ranked[0] + print("\nWINNER: " + best['name'] + " is the best performing operator!") + + # 5. Hardware & Time + print("\n[5/5] Final Hardware Health...") + print(f" Internal Temp: {module.get_temperature()} C") + bat = module.get_battery_status() + print(f" Battery: {bat[1]}%, {bat[2]}mV | Module Clock: {module.get_clock()}") + + print("\n" + "="*80) + print(" MULTI-OPERATOR BENCHMARK FINISHED") + print("="*80) + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="Multi-operator benchmark test.") + parser.add_argument("port", help="Serial port or socket URL") + parser.add_argument("--type", default="BG95", help="Module type (BG95, BG96, BC66)") + parser.add_argument("--apn", default="iot.telefonica.de", help="APN for connection") + args = parser.parse_args() + + run_comprehensive_test(args.port, args.type, args.apn) diff --git a/examples/12_disable_psm.py b/examples/12_disable_psm.py new file mode 100644 index 0000000..64a173c --- /dev/null +++ b/examples/12_disable_psm.py @@ -0,0 +1,43 @@ +#!/usr/bin/env python3 +import argparse +import logging +from m2m.nbiot import factory + +# Configure logging +logging.basicConfig(level=logging.INFO, format='%(levelname)s: %(message)s') + +def disable_psm(port, module_type='BG95'): + module = factory(module_type, port) + + with module: + print(f"--- PSM Deactivation Utility for {module_type} ---") + + # 1. Read current status + print("Reading current PSM status...") + status = module.read_psm_status() + print(f"Current Status: {status if status else 'Not set or error'}") + + # 2. Deactivate PSM + print("Sending deactivation command (AT+CPSMS=0)...") + if module.disable_psm(): + print("OK: PSM has been successfully deactivated.") + else: + print("ERROR: Failed to deactivate PSM.") + + # 3. Verify final status + print("Verifying final status...") + final_status = module.read_psm_status() + print(f"Final Status: {final_status}") + + if "+CPSMS: 0" in final_status: + print("SUCCESS: PSM is confirmed OFF.") + else: + print("WARNING: PSM status might still be active or returned unexpected response.") + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="Deactivate PSM (Power Saving Mode) on the modem.") + parser.add_argument("port", help="Serial port (e.g. /dev/ttyUSB0, socket://host:port, or MOCK)") + parser.add_argument("--type", default="BG95", help="Module type (BG95, BG96, BC66)") + args = parser.parse_args() + + disable_psm(args.port, args.type) diff --git a/m2m/__init__.py b/m2m/__init__.py new file mode 100644 index 0000000..02113c6 --- /dev/null +++ b/m2m/__init__.py @@ -0,0 +1,10 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +__version__ = '1.0.0' + +from m2m.nbiot import factory, ModuleBG96, ModuleBG95, ModuleBC66, ModuleHiSi +from m2m.serial import SerialPort +from m2m.interactive import ModemShell + +__all__ = ['factory', 'SerialPort', 'ModuleBG96', 'ModuleBG95', 'ModuleBC66', 'ModuleHiSi', 'ModemShell'] diff --git a/m2m/__main__.py b/m2m/__main__.py new file mode 100644 index 0000000..f65d609 --- /dev/null +++ b/m2m/__main__.py @@ -0,0 +1 @@ +from m2m.cli import main; main() diff --git a/m2m/cli.py b/m2m/cli.py new file mode 100644 index 0000000..842f614 --- /dev/null +++ b/m2m/cli.py @@ -0,0 +1,25 @@ +#!/usr/bin/env python3 +import argparse +import logging +from m2m.interactive import ModemShell + +# Hide logs to keep the shell clean, unless requested +logging.basicConfig(level=logging.WARNING) + +def main(): + parser = argparse.ArgumentParser(description="Interactive Modem Control Shell") + parser.add_argument("port", help="Serial port (e.g. /dev/ttyUSB0, socket://host:port, or MOCK)") + parser.add_argument("--type", default="BG95", help="Module type (BG95, BG96, BC66)") + parser.add_argument("--apn", default="iot.telefonica.de", help="Default APN for connection") + parser.add_argument("--verbose", action="store_true", help="Enable verbose logging") + + args = parser.parse_args() + + if args.verbose: + logging.getLogger().setLevel(logging.INFO) + + shell = ModemShell(args.type, args.port, args.apn) + shell.run() + +if __name__ == "__main__": + main() diff --git a/m2m/exceptions.py b/m2m/exceptions.py new file mode 100644 index 0000000..96b29f5 --- /dev/null +++ b/m2m/exceptions.py @@ -0,0 +1,26 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +""" +Custom exceptions for the m2m library. +""" + +class M2MError(Exception): + """Base class for all m2m library exceptions.""" + pass + +class SerialError(M2MError): + """Raised when a serial communication error occurs.""" + pass + +class ATCommandError(M2MError): + """Raised when an AT command returns an error or times out.""" + pass + +class TimeoutError(M2MError): + """Raised when an operation times out.""" + pass + +class NetworkError(M2MError): + """Raised when a network operation fails (e.g. attach, context activation).""" + pass diff --git a/m2m/interactive.py b/m2m/interactive.py new file mode 100644 index 0000000..0e551fe --- /dev/null +++ b/m2m/interactive.py @@ -0,0 +1,481 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +import sys +import logging +import shlex +import time +import threading +from typing import Optional, List, Dict, Union + +from prompt_toolkit import PromptSession +from prompt_toolkit.completion import WordCompleter +from prompt_toolkit.shortcuts import print_formatted_text, clear as clear_screen +from prompt_toolkit.formatted_text import HTML +from prompt_toolkit.styles import Style +from prompt_toolkit.patch_stdout import patch_stdout + +from m2m.nbiot import factory +from m2m.utils import fmt_val +from m2m.exceptions import SerialError, NetworkError + +logger = logging.getLogger('m2m.interactive') + +class ModemShell: + """ + Interactive shell for controlling IoT modules. + """ + def __init__(self, module_type: str, port: str, apn: str = 'iot.telefonica.de'): + self.module_type = module_type + self.port = port + self.apn = apn + try: + self.module = factory(module_type, port) + except Exception as e: + print_formatted_text(HTML(f'Error: Could not initialize module on {port}: {e}')) + sys.exit(1) + + # Command Dispatch Table + self.commands = { + 'info': self.do_info, + 'status': self.do_status, + 'nw_info': self.do_nw_info, + 'cells': self.do_cells, + 'forensics': self.do_forensics, + 'scan': self.do_scan, + 'connect': self.do_connect, + 'disconnect': self.do_disconnect, + 'operator': self.do_operator, + 'ping': self.do_ping, + 'dns': self.do_dns, + 'http_get': self.do_http_get, + 'udp_send': self.do_udp_send, + 'udp_listen': self.do_udp_listen, + 'tcp_client': self.do_tcp_client, + 'sockets': self.do_sockets, + 'close': self.do_close, + 'recv': self.do_recv, + 'sms_send': self.do_sms_send, + 'sms_list': self.do_sms_list, + 'radio': self.do_radio, + 'reboot': self.do_reboot, + 'psm': self.do_psm, + 'mode': self.do_mode, + 'edrx': self.do_edrx, + 'bands': self.do_bands, + 'gpio': self.do_gpio, + 'ri': self.do_ri, + 'urc_delay': self.do_urc_delay, + 'at': self.do_at, + 'error': self.do_error, + 'ussd': self.do_ussd, + 'clock': self.do_clock, + 'temp': self.do_temp, + 'ls': self.do_ls, + 'power_off': self.do_power_off, + 'help': self.do_help, + 'clear': self.do_clear, + 'exit': self.do_exit, + 'quit': self.do_exit + } + + self.completer = WordCompleter( + list(self.commands.keys()) + + ['on', 'off', 'loc', 'status', 'disable', 'sync', 'show', 'gsm', 'catm', 'nb', 'nb+emtc', 'all', 'opt'], + ignore_case=True + ) + self.style = Style.from_dict({ + 'prompt': '#00ff00 bold', + 'command': '#ffffff', + 'error': '#ff0000', + 'info': '#00ffff', + 'success': '#00ff00', + 'header': '#ffff00 bold', + 'alert': '#ff8800 bold', + }) + + self._stop_bg = threading.Event() + self._bg_thread = None + + # Register callbacks + if hasattr(self.module.s_port, 'add_callback'): + self.module.s_port.add_callback(b'+QIURC: "recv",', self._on_data_received) + self.module.s_port.add_callback(b'+QIURC: "pdpdeact",', self._on_pdp_deact) + + def _on_data_received(self, payload: bytes): + try: + conn_id = payload.decode().strip() + print_formatted_text(HTML(f'\n[DATA] Socket {conn_id} received data. Type "recv {conn_id}"')) + except: pass + + def _on_pdp_deact(self, payload: bytes): + print_formatted_text(HTML('\n[NETWORK] PDP Context Deactivated by network.')) + + def _bg_worker(self): + """Polls for URCs when idle without blocking the main lock indefinitely.""" + while not self._stop_bg.is_set(): + try: + if self.module.s_port._serial.in_waiting > 0: + if self.module.s_port.lock.acquire(blocking=False): + try: + self.module.s_port.read_any(timeout=0.1) + finally: + self.module.s_port.lock.release() + except: pass + self._stop_bg.wait(0.5) + + def run(self): + print_formatted_text(HTML(f'Interactive Modem Shell ({self.module_type} on {self.port})')) + print_formatted_text(HTML('Type help for available commands, exit to quit.\n')) + + session = PromptSession(completer=self.completer, style=self.style) + self._bg_thread = threading.Thread(target=self._bg_worker, daemon=True) + self._bg_thread.start() + + with patch_stdout(): + with self.module: + while True: + try: + text = session.prompt(HTML('m2m> ')) + except (KeyboardInterrupt, EOFError): break + + if not (stripped := text.strip()): continue + + try: + parts = shlex.split(stripped) + except ValueError as e: + print_formatted_text(HTML(f'Parse error: {e}')); continue + + cmd, args = parts[0].lower(), parts[1:] + if handler := self.commands.get(cmd): + try: handler(args) + except Exception as e: + print_formatted_text(HTML(f'Error: {e}')) + else: + print_formatted_text(HTML(f'Unknown command: {cmd}')) + + self._stop_bg.set() + + def do_help(self, args): + """Show available commands.""" + print_formatted_text(HTML('
Available Commands:
')) + for c in sorted(self.commands.keys()): + doc = (self.commands[c].__doc__ or "No description.").replace('<', '<').replace('>', '>') + print_formatted_text(HTML(f' {c:<12} {doc}')) + + def do_clear(self, _): + """Clear the terminal screen.""" + clear_screen() + + def do_info(self, _): + """Read detailed module identification and configuration.""" + print_formatted_text(HTML('Reading identification...')) + print(f" IMEI: {self.module.read_imei() or 'N/A'}") + print(f" IMSI: {self.module.read_imsi() or 'N/A'}") + print(f" ICCID: {self.module.read_iccid() or 'N/A'}") + print(f" FW: {self.module.read_firmware_version() or 'N/A'}") + + if hasattr(self.module, 'get_nw_scan_mode'): + mode_map = {0: "Automatic", 1: "GSM Only", 2: "LTE Only", 3: "GSM + LTE"} + print(f" ScanMode: {mode_map.get(self.module.get_nw_scan_mode(), 'Unknown')}") + + if hasattr(self.module, 'get_iot_op_mode_val'): + op_map = {0: "eMTC", 1: "NB-IoT", 2: "eMTC + NB-IoT"} + print(f" IoTMode: {op_map.get(self.module.get_iot_op_mode_val(), 'Unknown')}") + + def do_status(self, _): + """Show registration and signal status.""" + print_formatted_text(HTML('Checking status...')) + rssi, ber = self.module.get_signal_quality() + print(f" Signal: RSSI={rssi}, BER={ber}") + print(f" Registered: {'Yes' if self.module.is_registered() else 'No'}") + print(f" Operator: {self.module.get_current_operator() or 'None'}") + print(f" IP Address: {self.module.get_ip_address(1) or 'None'}") + + def do_nw_info(self, _): + """Show detailed network and serving cell info.""" + if hasattr(self.module, 'get_network_info'): + print(f" Network: {self.module.get_network_info()}") + if hasattr(self.module, 'get_serving_cell_info'): + print(f" Cell: {self.module.get_serving_cell_info()}") + + def do_cells(self, _): + """Show serving and neighbor cell information.""" + if hasattr(self.module, 'get_serving_cell_info'): + print(f" Serving: {self.module.get_serving_cell_info()}") + if hasattr(self.module, 'get_neighbor_cells'): + for n in self.module.get_neighbor_cells(): + print(f" - {n}") + + def do_forensics(self, _): + """Full SIM card forensics report.""" + print_formatted_text(HTML('
SIM CARD NETWORK FORENSICS
')) + print(f"{'ICCID:':<25} {fmt_val(self.module.read_iccid())}") + print(f"{'IMSI:':<25} {fmt_val(self.module.read_imsi())}") + print(f"{'Home PLMN:':<25} {fmt_val(self.module.get_home_plmn())}") + print(f"{'Registered PLMN:':<25} {fmt_val(self.module.get_registered_plmn())}") + print(f"{'Forbidden PLMNs:':<25} {fmt_val(self.module.read_forbidden_plmns())}") + print(f"{'Equivalent Home:':<25} {fmt_val(self.module.read_ehplmns())}") + + def do_scan(self, _): + """Scan for available network operators.""" + print_formatted_text(HTML('Scanning for operators...')) + ops = self.module.search_operators(timeout=180) + for op in (ops or []): + s_map = {'0': 'Unknown', '1': 'Available', '2': 'Current', '3': 'Forbidden'} + print(f" - {op['long']} ({op['mccmnc']}) [{s_map.get(op['status'], op['status'])}, Tech: {op['act']}]") + + def do_connect(self, args): + """Connect to network. Usage: connect [apn] [--force]""" + force = '--force' in args + clean_args = [a for a in args if a != '--force'] + apn = clean_args[0] if clean_args else self.apn + print_formatted_text(HTML(f'Connecting with APN: {apn}...')) + if self.module.connect_network(apn=apn, force=force): + print_formatted_text(HTML('Connected!')) + else: print_formatted_text(HTML('Failed.')) + + def do_disconnect(self, _): + """Deactivate PDP context.""" + if hasattr(self.module, 'deactivate_pdp_context') and self.module.deactivate_pdp_context(1): + print_formatted_text(HTML('Disconnected.')) + else: print_formatted_text(HTML('Failed or not supported.')) + + def do_operator(self, args): + """Select operator manually. Usage: operator [act]""" + if not args: print("Usage: operator [act]"); return + plmn, act = args[0], int(args[1]) if len(args) > 1 else 8 + if self.module.set_operator(plmn, act=act): + print_formatted_text(HTML('Done.')) + else: print_formatted_text(HTML('Failed.')) + + def do_ping(self, args): + """Ping a host. Usage: ping [num]""" + if not args: print("Usage: ping [num]"); return + host, num = args[0], int(args[1]) if len(args) > 1 else 4 + res = self.module.ping(host, num=num) + for r in res: print(f" {r}") + if (s := self.module.parse_ping_summary(res)): + print_formatted_text(HTML(f'RTT avg: {s["avg"]}ms')) + + def do_dns(self, args): + """Resolve a hostname. Usage: dns """ + if not args: print("Usage: dns "); return + if (ips := self.module.dns_query(args[0])): + print(f" IPs: {', '.join(ips)}") + else: print_formatted_text(HTML('Failed.')) + + def do_http_get(self, args): + """Perform HTTP GET. Usage: http_get """ + if not args: print("Usage: http_get "); return + url = args[0] if args[0].startswith('http') else f'http://{args[0]}' + if self.module.http_set_url(url) and self.module.http_get(): + print("-" * 40 + f"\n{self.module.http_read_response()}\n" + "-" * 40) + else: print_formatted_text(HTML('Failed.')) + + def do_udp_send(self, args): + """Send a UDP packet. Usage: udp_send """ + if len(args) < 3: print("Usage: udp_send "); return + host, port, data = args[0], int(args[1]), " ".join(args[2:]) + sock_id = -1 + if hasattr(self.module, 'get_socket_state'): + for s in self.module.get_socket_state(): + if s['type'] == 'UDP' and s['remote'] == f"{host}:{port}": + sock_id = int(s['id']); break + if sock_id == -1: + sock_id = self.module.open_socket(service_type="UDP", remote_ip=host, remote_port=port) + if sock_id >= 0 and self.module.send_data_hex(sock_id, data.encode()): + print_formatted_text(HTML('Sent!')) + else: print_formatted_text(HTML('Failed.')) + + def do_udp_listen(self, args): + """Open a local UDP listener. Usage: udp_listen """ + if not args: print("Usage: udp_listen "); return + sid = self.module.open_socket(service_type="UDP SERVICE", remote_ip="127.0.0.1", local_port=int(args[0])) + if sid >= 0: print_formatted_text(HTML(f'Listening on socket {sid}.')) + else: print_formatted_text(HTML('Failed.')) + + def do_tcp_client(self, args): + """Connect to a TCP server. Usage: tcp_client """ + if len(args) < 2: print("Usage: tcp_client "); return + sid = self.module.open_socket(service_type="TCP", remote_ip=args[0], remote_port=int(args[1])) + if sid >= 0: print_formatted_text(HTML(f'Socket {sid} connected.')) + else: print_formatted_text(HTML('Failed.')) + + def do_sockets(self, _): + """List active sockets.""" + if hasattr(self.module, 'get_socket_state'): + socks = self.module.get_socket_state() + for s in socks: print(f" ID {s['id']}: {s['type']:<8} | {s['remote']:<20} | {s['state']}") + else: print("Not supported.") + + def do_close(self, args): + """Close a socket. Usage: close """ + if args and self.module.close_socket(int(args[0])): + print_formatted_text(HTML('Closed.')) + else: print_formatted_text(HTML('Failed.')) + + def do_recv(self, args): + """Read socket data. Usage: recv [len]""" + if not args: print("Usage: recv [len]"); return + data = self.module.receive_data(int(args[0]), int(args[1]) if len(args) > 1 else 1500) + if data: print("-" * 20 + f"\n{data.decode(errors='replace')}\n" + "-" * 20) + else: print(" No data.") + + def do_sms_send(self, args): + """Send an SMS. Usage: sms_send """ + if len(args) < 2: + print("Usage: sms_send ") + return + + number, message = args[0], " ".join(args[1:]) + print_formatted_text(HTML(f'Sending SMS to {number}...')) + if self.module.send_sms(number, message): + print_formatted_text(HTML('Sent!')) + else: + print_formatted_text(HTML('Failed to send SMS.')) + + def do_sms_list(self, args): + """List SMS. Usage: sms_list [ALL|REC UNREAD]""" + for msg in self.module.list_sms(args[0] if args else "ALL"): print(msg) + + def do_radio(self, args): + """Toggle radio. Usage: radio """ + if not args: print(f"Radio is {'ON' if self.module.is_radio_on() else 'OFF'}"); return + self.module.radio_on() if args[0].lower() == 'on' else self.module.radio_off() + + def do_reboot(self, _): + """Reboot the module.""" + self.module.reboot() + + def do_mode(self, args): + """Set connectivity preference. Usage: mode """ + if not args: + print("Usage: mode ") + return + + mode = args[0].lower() + print_formatted_text(HTML(f'Setting connectivity mode to {mode}...')) + if hasattr(self.module, 'set_connectivity_mode'): + if self.module.set_connectivity_mode(mode): + print_formatted_text(HTML('Mode set successfully.')) + else: + print_formatted_text(HTML('Failed to set mode.')) + else: + print("Mode configuration not supported on this module.") + + def do_psm(self, args): + """Control PSM. Usage: psm """ + sub = args[0].lower() if args else 'status' + if sub == 'status': print(f" PSM: {self.module.read_psm_status() or 'Unknown'}") + elif sub == 'off': self.module.disable_psm() + elif sub == 'on' and len(args) >= 3: self.module.configure_psm_auto(int(args[1]), int(args[2])) + elif sub == 'opt': self.module.configure_psm_optimization() + + def do_edrx(self, args): + """Configure eDRX. Usage: edrx """ + if len(args) >= 3 and self.module.set_edrx(int(args[0]), int(args[1]), args[2]): + print_formatted_text(HTML('Done.')) + + def do_bands(self, args): + """Show or lock frequency bands. Usage: bands [gsm|catm|nb] [add|remove] """ + from m2m.utils import hex_mask_to_bands, bands_to_hex_mask + + if not hasattr(self.module, 'get_locked_bands'): + print("Band control not supported."); return + + current = self.module.get_locked_bands() + # Map to actual lists of integers + band_lists = {k: hex_mask_to_bands(v) for k, v in current.items()} + + if not args: + print_formatted_text(HTML('Currently Active Bands:')) + for rat, blist in band_lists.items(): + print(f" {rat.upper():<5}: {blist}") + return + + if len(args) < 3: + print("Usage: bands "); return + + rat, action, bnum = args[0].lower(), args[1].lower(), int(args[2]) + if rat not in band_lists: + print(f"Invalid RAT: {rat}. Use gsm, catm, or nb."); return + + if action == 'add': + if bnum not in band_lists[rat]: band_lists[rat].append(bnum) + elif action == 'remove': + if bnum in band_lists[rat]: band_lists[rat].remove(bnum) + else: + print(f"Invalid action: {action}. Use add or remove."); return + + # Apply changes + print_formatted_text(HTML(f'Updating {rat.upper()} bands...')) + if self.module.lock_bands( + gsm_bands=band_lists.get('gsm', []), + catm_bands=band_lists.get('catm', []), + nb_bands=band_lists.get('nb', []) + ): + print_formatted_text(HTML('Bands updated successfully.')) + else: + print_formatted_text(HTML('Failed to update bands.')) + + def do_gpio(self, args): + """Control GPIO. Usage: gpio set | get """ + if len(args) >= 2: + if args[0] == 'set' and len(args) >= 3: self.module.set_gpio(3, int(args[1]), int(args[2])) + elif args[0] == 'get': print(f"GPIO {args[1]}: {self.module.get_gpio(int(args[1]))}") + + def do_ri(self, args): + """Configure Ring Indicator. Usage: ri [duration]""" + if len(args) >= 1: + dur = int(args[1]) if len(args) > 1 else 120 + if hasattr(self.module, 'configure_ri_pin') and self.module.configure_ri_pin(args[0], dur): + print_formatted_text(HTML('Done.')) + + def do_urc_delay(self, args): + """Configure URC delay. Usage: urc_delay """ + if args and hasattr(self.module, 'configure_urc_delay'): + if self.module.configure_urc_delay(args[0].lower() == 'on'): + print_formatted_text(HTML('Done.')) + + def do_at(self, args): + """Send raw AT command.""" + if args: + cmd = " ".join(args) + if not cmd.upper().startswith('AT'): cmd = f'AT{cmd}' + print(self.module.send_at_command(cmd).decode(errors='ignore')) + + def do_error(self, _): + """Show extended error report (AT+CEER).""" + print(f" Report: {self.module.get_extended_error_report()}") + + def do_ussd(self, args): + """Send USSD code. Usage: ussd """ + if args: print(f" Response: {self.module.send_ussd(args[0])}") + + def do_clock(self, args): + """Show or sync clock. Usage: clock [show|sync]""" + sub = args[0].lower() if args else 'show' + if sub == 'show': print(f" Clock: {self.module.get_clock()}") + elif sub == 'sync': + if self.module.ntp_sync(): print(f" New Clock: {self.module.get_clock()}") + else: print_formatted_text(HTML('Sync failed.')) + + def do_temp(self, _): + """Read module temperature.""" + if hasattr(self.module, 'get_temperature'): + print(f" Temp: {self.module.get_temperature()} C") + + def do_ls(self, args): + """List files. Usage: ls [pattern]""" + files = self.module.list_files(args[0] if args else "*") + print(f"{'Filename':<30} | {'Size':>10}") + for f in files: print(f"{f['name']:<30} | {f['size']:>10,}") + + def do_power_off(self, _): + """Power off.""" + self.module.power_off() + + def do_exit(self, _): + """Exit shell.""" + sys.exit(0) diff --git a/m2m/misc/__init__.py b/m2m/misc/__init__.py new file mode 100644 index 0000000..2c0d63b --- /dev/null +++ b/m2m/misc/__init__.py @@ -0,0 +1,6 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +""" +m2m.misc package initialization. +""" diff --git a/m2m/misc/at_parser.py b/m2m/misc/at_parser.py new file mode 100644 index 0000000..8ccd66b --- /dev/null +++ b/m2m/misc/at_parser.py @@ -0,0 +1,69 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +""" +Helper for parsing AT command responses. +""" + +import logging +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from m2m.serial.serial_port import SerialPort + +logger = logging.getLogger('atparser') + + +class AtParser: + """ + Parses responses from the serial port to determine command success/failure. + """ + + def __init__(self, serial_port: 'SerialPort'): + self._port = serial_port + self._last_response = b'' + + @property + def last_response(self) -> bytes: + return self._last_response + + def read_ok(self, timeout: float = 1.0) -> bool: + """ + Reads until a terminator is found and checks if it was 'OK'. + """ + self._last_response = self._port.read_until(terminator=b'OK', timeout=timeout) + # Check if the buffer *ends* with OK, or contains it on a line. + # Strict check: + lines = self._last_response.splitlines() + for line in reversed(lines): + line = line.strip() + if line == b'OK': + return True + if line == b'ERROR' or b'CME ERROR' in line: + return False + return False + + def is_on(self, query_cmd: bytes, response_prefix: bytes, timeout: float = 1.0) -> bool: + """ + Sends a query and checks if the value is '1'. + Example: is_on(b'AT+CFUN?', b'+CFUN:') + """ + self._port.send_cmd(query_cmd) + response = self._port.read_until(timeout=timeout) + self._last_response = response + + # Parse for prefix + for line in response.splitlines(): + line = line.strip() + if line.startswith(response_prefix): + # +CFUN: 1 + try: + # Remove prefix, then split by comma if multiple params + # We assume the first param is the status + payload = line[len(response_prefix):].strip() + # Handle cases like "1" or "1,0" + val_str = payload.split(b',')[0] + return int(val_str) == 1 + except ValueError: + pass + return False diff --git a/m2m/nbiot/__init__.py b/m2m/nbiot/__init__.py new file mode 100644 index 0000000..0347e4e --- /dev/null +++ b/m2m/nbiot/__init__.py @@ -0,0 +1,50 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +""" +m2m.nbiot package initialization. +""" + +import logging +from typing import Dict, Type, Optional + +from m2m.nbiot.module import ModuleBase +from m2m.nbiot.module_bg96 import ModuleBG96 +from m2m.nbiot.module_bg95 import ModuleBG95 +from m2m.nbiot.module_bc66 import ModuleBC66 +from m2m.nbiot.module_hisi import ModuleHiSi + +# Establish parent logger +logging.getLogger('m2m').addHandler(logging.NullHandler()) + +# Chipset Registry +_MODULE_REGISTRY: Dict[str, Type[ModuleBase]] = { + 'HiSi': ModuleHiSi, + 'BG96': ModuleBG96, + 'BG95': ModuleBG95, + 'BC66': ModuleBC66, +} + +def factory(chipset: str, serial_port: str, **kwargs) -> ModuleBase: + """ + Factory for creating module instances based on chipset name. + + Args: + chipset: Name of the chipset (e.g., 'BG96', 'BG95', 'HiSi'). + serial_port: Device path (e.g., '/dev/ttyUSB0'). + **kwargs: Passed to the module constructor. + + Returns: + An instance of a ModuleBase subclass. + + Raises: + ValueError: If the chipset is not supported. + """ + module_class = _MODULE_REGISTRY.get(chipset) + if not module_class: + supported = ", ".join(_MODULE_REGISTRY.keys()) + raise ValueError(f"Unsupported chipset '{chipset}'. Supported: {supported}") + + return module_class(serial_port, **kwargs) + +__all__ = ['factory', 'ModuleBase', 'ModuleBG96', 'ModuleBG95', 'ModuleBC66', 'ModuleHiSi'] diff --git a/m2m/nbiot/module.py b/m2m/nbiot/module.py new file mode 100755 index 0000000..779e89f --- /dev/null +++ b/m2m/nbiot/module.py @@ -0,0 +1,529 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +import logging +import time +from typing import Tuple, Union, Optional, ContextManager, List + +from m2m.misc.at_parser import AtParser +from m2m.serial.serial_port import SerialPort +from m2m.exceptions import NetworkError + +logger = logging.getLogger('m2m.module') + +class ModuleBase: + """ + Base class for all IoT modules. + Defines the standard 3GPP interface and the contract for vendor-specific implementations. + """ + + # SIM Elementary File IDs (3GPP TS 31.102 / 51.011) + EF_ICCID = 0x2FE2 + EF_IMSI = 0x6F07 + EF_AD = 0x6FAD + EF_LOCI = 0x6F7E + EF_PSLOCI = 0x6F73 + EF_FPLMN = 0x6F7B + EF_EHPLMN = 0x6F43 + EF_EPLMN = 0x6F66 + EF_PLMNsel = 0x6F30 + EF_PLMNwAcT = 0x6F60 + EF_OPLMNwAcT = 0x6F61 + EF_HPLMNwAcT = 0x6F62 + EF_CAG = 0x6FD9 + + def __init__(self, serial_port: str, baudrate: int = 9600, **kwargs): + self.s_port = SerialPort(serial_port, baudrate=baudrate, **kwargs) + self.at = AtParser(self.s_port) + self.cereg_level = 0 + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self.close() + + def close(self): + """Closes the serial port connection.""" + if self.s_port: + self.s_port.close() + + @property + def transaction(self) -> ContextManager: + """Returns a context manager for atomic transactions (thread locking).""" + return self.s_port.lock + + # --- Low Level AT --- + + def send_at_command(self, cmd: Union[bytes, str], timeout: float = 5.0) -> bytes: + """Sends an AT command and returns the raw response.""" + if isinstance(cmd, str): + cmd = cmd.encode('ascii') + with self.transaction: + self.s_port.send_cmd(cmd) + return self.s_port.read_until(timeout=timeout) + + def check_state(self) -> bool: + """Checks if module responds to AT.""" + with self.transaction: + self.s_port.send_cmd(b'AT') + return self.at.read_ok() + + def set_echo_mode(self, enable: bool = False) -> bool: + """Sets command echo (ATE0/ATE1).""" + cmd = b'ATE1' if enable else b'ATE0' + with self.transaction: + self.s_port.send_cmd(cmd) + return self.at.read_ok() + + def reboot(self) -> bool: + """Performs a software reboot (AT+CFUN=1,1).""" + with self.transaction: + self.s_port.send_cmd(b'AT+CFUN=1,1') + return self.at.read_ok(timeout=15) + + # --- Info & Identification --- + + def read_firmware_version(self) -> Optional[str]: + with self.transaction: + self.s_port.send_cmd(b'AT+CGMR') + response = self.s_port.read_until() + for line in response.splitlines(): + line = line.strip() + if line and line not in (b'OK', b'AT+CGMR', b'ERROR'): + return line.decode('ascii', errors='ignore') + return None + + def read_imei(self) -> Optional[str]: + with self.transaction: + self.s_port.send_cmd(b'AT+CGSN') + response = self.s_port.read_until() + for line in response.splitlines(): + line = line.strip() + if line.isdigit(): + return line.decode('ascii') + return None + + def read_imsi(self) -> Optional[str]: + with self.transaction: + self.s_port.send_cmd(b'AT+CIMI') + response = self.s_port.read_until() + for line in response.splitlines(): + line = line.strip() + if line.isdigit(): + return line.decode('ascii') + return None + + def read_iccid(self) -> Optional[str]: + with self.transaction: + self.s_port.send_cmd(b'AT+CCID') + response = self.s_port.read_until() + for line in response.splitlines(): + line = line.strip() + if line.startswith(b'+CCID:'): + return line.split(b':')[1].strip().decode('ascii') + if line.isdigit() and len(line) > 15: + return line.decode('ascii') + return None + + # --- Power & Radio --- + + def set_phone_functionality(self, fun: int = 1, rst: Optional[int] = None) -> bool: + """Controls radio functionality (AT+CFUN).""" + cmd = f'AT+CFUN={fun}'.encode() + if rst is not None: + cmd += f',{rst}'.encode() + with self.transaction: + self.s_port.send_cmd(cmd) + return self.at.read_ok(timeout=15) + + def radio_on(self) -> bool: + return self.set_phone_functionality(1) + + def radio_off(self) -> bool: + return self.set_phone_functionality(0) + + def is_radio_on(self) -> bool: + return self.at.is_on(b'AT+CFUN?', b'+CFUN:') + + # --- Network Registration --- + + def attach_network(self) -> bool: + """Attaches to packet domain (AT+CGATT).""" + with self.transaction: + self.s_port.send_cmd(b'AT+CGATT=1') + return self.at.read_ok(timeout=75) + + def detach_network(self) -> bool: + with self.transaction: + self.s_port.send_cmd(b'AT+CGATT=0') + return self.at.read_ok(timeout=40) + + def is_attached(self) -> bool: + return self.at.is_on(b'AT+CGATT?', b'+CGATT:') + + def _read_reg_status(self, cmd: bytes, prefix: bytes) -> int: + with self.transaction: + self.s_port.send_cmd(cmd) + response = self.s_port.read_until() + for line in response.splitlines(): + if line.startswith(prefix): + try: + parts = line.split(b',') + return int(parts[1].strip()) + except (IndexError, ValueError): + pass + return -1 + + def read_creg_status(self) -> int: + """Reads CS registration status (AT+CREG).""" + return self._read_reg_status(b'AT+CREG?', b'+CREG:') + + def read_cereg_status(self) -> int: + """Reads EPS registration status (AT+CEREG).""" + return self._read_reg_status(b'AT+CEREG?', b'+CEREG:') + + def is_registered(self) -> bool: + """Checks if registered (Home or Roaming) on CREG or CEREG.""" + creg = self.read_creg_status() + cereg = self.read_cereg_status() + return creg in (1, 5) or cereg in (1, 5) + + def get_current_operator(self) -> Optional[str]: + """Returns the currently selected operator and AcT (AT+COPS?).""" + with self.transaction: + self.s_port.send_cmd(b'AT+COPS?') + response = self.s_port.read_until() + for line in response.splitlines(): + if line.startswith(b'+COPS:'): + return line.decode('ascii', errors='ignore').split(':', 1)[1].strip() + return None + + def set_operator(self, plmn: str, format: int = 2, act: Optional[int] = None) -> bool: + """Forces operator selection (AT+COPS).""" + cmd = f'AT+COPS=1,{format},"{plmn}"' + if act is not None: + cmd += f',{act}' + with self.transaction: + self.s_port.send_cmd(cmd.encode()) + return self.at.read_ok(timeout=120) + + def get_signal_quality(self) -> Tuple[int, int]: + """Returns (rssi, ber) via AT+CSQ.""" + with self.transaction: + self.s_port.send_cmd(b'AT+CSQ') + response = self.s_port.read_until() + for line in response.splitlines(): + if line.startswith(b'+CSQ:'): + try: + parts = line.split(b':')[1].strip().split(b',') + return int(parts[0]), int(parts[1]) + except (IndexError, ValueError): + pass + return 99, 99 + + def get_extended_error_report(self) -> str: + """Retrieves extended error report (AT+CEER).""" + with self.transaction: + self.s_port.send_cmd(b'AT+CEER') + response = self.s_port.read_until() + for line in response.splitlines(): + if line.startswith(b'+CEER:'): + return line.decode('ascii', errors='ignore').strip() + return "" + + # --- PSM (Power Saving Mode) --- + + def set_psm_settings(self, mode: int = 1, tau: str = None, active_time: str = None): + cmd = f'AT+CPSMS={mode}'.encode() + if mode == 1 and tau and active_time: + cmd += f',,,"{tau}","{active_time}"'.encode() + with self.transaction: + self.s_port.send_cmd(cmd) + return self.at.read_ok() + + def read_psm_status(self) -> Optional[str]: + """Reads current PSM settings (AT+CPSMS?).""" + with self.transaction: + self.s_port.send_cmd(b'AT+CPSMS?') + response = self.s_port.read_until() + for line in response.splitlines(): + if line.startswith(b'+CPSMS:'): + return line.decode('ascii', errors='ignore').strip() + return None + + def disable_psm(self): + return self.set_psm_settings(0) + + def configure_psm_auto(self, active_time_s: int, tau_s: int) -> bool: + """Configures PSM by automatically encoding seconds into 3GPP bits.""" + from m2m.utils import encode_psm_timer + t3324 = encode_psm_timer(active_time_s, is_t3324=True) + t3412 = encode_psm_timer(tau_s, is_t3324=False) + return self.set_psm_settings(mode=1, tau=t3412, active_time=t3324) + + # --- SIM Diagnostics --- + + def unlock_sim(self, pin: str) -> bool: + """Unlocks SIM card (AT+CPIN).""" + with self.transaction: + self.s_port.send_cmd(b'AT+CPIN?') + if b'READY' in self.s_port.read_until(): + return True + self.s_port.send_cmd(f'AT+CPIN="{pin}"'.encode()) + return self.at.read_ok(timeout=5) + + def restricted_sim_access(self, command: int, file_id: int, p1: int = 0, p2: int = 0, p3: int = 0) -> Tuple[int, int, bytes]: + """Generic Restricted SIM Access (AT+CRSM).""" + cmd = f'AT+CRSM={command},{file_id},{p1},{p2},{p3}'.encode() + with self.transaction: + self.s_port.send_cmd(cmd) + response = self.s_port.read_until() + for line in response.splitlines(): + if line.startswith(b'+CRSM:'): + try: + parts = line.split(b',') + sw1 = int(parts[0].split(b':')[1].strip()) + sw2 = int(parts[1].strip()) + data = parts[2].strip(b' "') if len(parts) > 2 else b'' + return sw1, sw2, data + except (IndexError, ValueError): pass + return -1, -1, b'' + + def read_sim_file(self, file_id: int, length: int) -> Optional[bytes]: + """Reads raw bytes from a SIM Elementary File. Returns None on error.""" + import binascii + sw1, sw2, data_hex = self.restricted_sim_access(176, file_id, 0, 0, length) + if sw1 == 144: # 0x9000 Success + return binascii.unhexlify(data_hex) + return None + + # --- SIM Helpers --- + + def get_registered_plmn(self) -> Optional[str]: + from m2m.utils import decode_plmn + data = self.read_sim_file(self.EF_LOCI, 11) + if data and len(data) >= 7: + return decode_plmn(data[4:7]) + return None + + def get_mnc_length(self) -> int: + data = self.read_sim_file(self.EF_AD, 4) + if data and len(data) >= 4: + return data[3] & 0x0F + return 2 + + def get_home_plmn(self) -> Optional[str]: + imsi = self.read_imsi() + if imsi and len(imsi) >= 5: + mnc_len = self.get_mnc_length() + return f"{imsi[:3]}-{imsi[3:3+mnc_len]}" + return None + + def read_forbidden_plmns(self) -> List[str]: + from m2m.utils import decode_plmn_list + data = self.read_sim_file(self.EF_FPLMN, 12) + return decode_plmn_list(data) if data else [] + + def read_eplmns(self) -> List[str]: + from m2m.utils import decode_plmn_list + data = self.read_sim_file(self.EF_EPLMN, 15) + return decode_plmn_list(data) if data else [] + + def read_ehplmns(self) -> List[str]: + from m2m.utils import decode_plmn_list + data = self.read_sim_file(self.EF_EHPLMN, 15) + return decode_plmn_list(data) if data else [] + + def read_user_plmn_act(self) -> List[str]: + from m2m.utils import decode_plmn_list + data = self.read_sim_file(self.EF_PLMNwAcT, 40) + return decode_plmn_list(data, has_act=True) if data else [] + + def read_operator_plmn_act(self) -> List[str]: + from m2m.utils import decode_plmn_list + data = self.read_sim_file(self.EF_OPLMNwAcT, 40) + return decode_plmn_list(data, has_act=True) if data else [] + + def read_hplmn_act(self) -> List[str]: + from m2m.utils import decode_plmn_list + data = self.read_sim_file(self.EF_HPLMNwAcT, 10) + return decode_plmn_list(data, has_act=True) if data else [] + + def read_plmn_selector(self) -> List[str]: + from m2m.utils import decode_plmn_list + data = self.read_sim_file(self.EF_PLMNsel, 24) + return decode_plmn_list(data) if data else [] + + def read_cag_info(self) -> Optional[str]: + data = self.read_sim_file(self.EF_CAG, 10) + if data: + return data.hex() if any(b != 0xFF for b in data) else None + return None + + # --- Abstract / Contract Methods --- + + def configure_pdp_context(self, context_id: int, context_type: str, apn: str) -> bool: + raise NotImplementedError("Subclass must implement configure_pdp_context") + + def activate_pdp_context(self, context_id: int) -> bool: + raise NotImplementedError("Subclass must implement activate_pdp_context") + + def get_ip_address(self, cid: int = 1) -> Optional[str]: + with self.transaction: + self.s_port.send_cmd(f'AT+CGPADDR={cid}'.encode()) + response = self.s_port.read_until() + for line in response.splitlines(): + if line.startswith(b'+CGPADDR:'): + parts = line.split(b',') + if len(parts) > 1: + return parts[1].strip(b' "').decode('ascii') + return None + + def ping(self, host: str, context_id: int = 1, timeout: int = 20, num: int = 4) -> List[str]: + raise NotImplementedError("Ping not implemented for this module.") + + def dns_query(self, host: str, context_id: int = 1, timeout: int = 30) -> List[str]: + raise NotImplementedError("DNS query not implemented for this module.") + + # --- High Level Automation --- + + def connect_network(self, apn: str, pin: Optional[str] = None, timeout: int = 120, force: bool = False) -> bool: + if not force and self.is_registered(): + ip = self.get_ip_address(1) + if ip: + logger.info(f"Already connected! IP: {ip}") + return True + + logger.info(f"Starting network connection sequence (APN: {apn})...") + + if pin and not self.unlock_sim(pin): + logger.error("Failed to unlock SIM.") + return False + + if not self.is_radio_on(): + if not self.radio_on(): + logger.error("Failed to turn on radio.") + return False + + start_time = time.time() + while time.time() - start_time < timeout: + if self.is_registered(): + break + time.sleep(2) + else: + logger.info("Not registered yet. Triggering automatic selection (AT+COPS=0)...") + self.send_at_command(b'AT+COPS=0') + time.sleep(5) + if not self.is_registered(): + logger.error("Timed out waiting for network registration.") + + if not self.is_attached(): + self.attach_network() + + try: + if hasattr(self, 'is_pdp_active') and self.is_pdp_active(1): + logger.info("Deactivating existing PDP context to apply new settings...") + self.deactivate_pdp_context(1) + + if not self.configure_pdp_context(1, "IP", apn): + logger.warning("PDP Configuration failed or not supported.") + + if not self.activate_pdp_context(1): + logger.error("Failed to activate PDP context.") + return False + except NotImplementedError: + logger.warning("PDP context methods not implemented in this module.") + + ip = self.get_ip_address(1) + if ip: + logger.info(f"Connected! IP: {ip}") + return True + + return False + + def poll(self, timeout: float = 1.0) -> bytes: + return self.s_port.read_any(timeout=timeout) + + # --- Standard SMS --- + + def set_sms_format(self, text_mode: bool = True) -> bool: + mode = 1 if text_mode else 0 + with self.transaction: + self.s_port.send_cmd(f'AT+CMGF={mode}'.encode()) + return self.at.read_ok() + + def send_sms(self, phone_number: str, message: str) -> bool: + """Sends an SMS message (AT+CMGS).""" + with self.transaction: + # 1. Ensure Text Mode + if not self.set_sms_format(True): + logger.error("SMS: Failed to set text mode") + return False + + # 2. Clear any stale data + self.s_port.read_any(timeout=0.1) + + # 3. Send command + self.s_port.send_cmd(f'AT+CMGS="{phone_number}"'.encode()) + + # 4. Wait for prompt '> ' + prompt = self.s_port.read_until(terminator=b'>', timeout=10) + if b'>' not in prompt: + logger.error(f"SMS: Did not receive prompt '>', got: {prompt!r}") + return False + + # 5. Small settling delay for the modem to be ready for the body + time.sleep(0.2) + + # 6. Send message and Ctrl+Z + try: + self.s_port._serial.write(message.encode('ascii')) + self.s_port._serial.write(b'\x1A') + except Exception as e: + logger.error(f"SMS: Write failed: {e}") + return False + + # 7. Wait for result + resp = self.s_port.read_until(terminator=b'OK', timeout=60) + if b'OK' in resp: + return True + + logger.error(f"SMS: Failed to get OK response, got: {resp!r}") + return False + + def list_sms(self, stat: str = "ALL") -> List[str]: + with self.transaction: + self.s_port.send_cmd(f'AT+CMGL="{stat}"'.encode()) + return self.s_port.read_until().decode('ascii', errors='ignore').splitlines() + + def delete_sms(self, index: int, delflag: int = 0) -> bool: + with self.transaction: + self.s_port.send_cmd(f'AT+CMGD={index},{delflag}'.encode()) + return self.at.read_ok() + + def get_battery_status(self) -> Tuple[int, int, int]: + with self.transaction: + self.s_port.send_cmd(b'AT+CBC') + response = self.s_port.read_until() + for line in response.splitlines(): + if line.startswith(b'+CBC:'): + try: + p = [int(x) for x in line.split(b':')[1].split(b',')] + return p[0], p[1], p[2] + except (IndexError, ValueError): pass + return 0, 0, 0 + + def get_clock(self) -> Optional[str]: + with self.transaction: + self.s_port.send_cmd(b'AT+CCLK?') + response = self.s_port.read_until() + for line in response.splitlines(): + if line.startswith(b'+CCLK:'): + return line.split(b':')[1].strip(b' "').decode('ascii') + return None + + def set_clock(self, time_str: str) -> bool: + with self.transaction: + self.s_port.send_cmd(f'AT+CCLK="{time_str}"'.encode()) + return self.at.read_ok() diff --git a/m2m/nbiot/module_bc66.py b/m2m/nbiot/module_bc66.py new file mode 100644 index 0000000..f295271 --- /dev/null +++ b/m2m/nbiot/module_bc66.py @@ -0,0 +1,48 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +import logging +from typing import List, Optional +from m2m.nbiot.quectel import QuectelModule + +logger = logging.getLogger('m2m.bc66') + +class ModuleBC66(QuectelModule): + """ + Implementation for Quectel BC66 NB-IoT modules. + """ + + def __init__(self, serial_port: str, baudrate: int = 9600, **kwargs): + super().__init__(serial_port, baudrate, **kwargs) + self._setup() + + def _setup(self): + self.set_echo_mode(False) + # Disable sleep lock and initialize network URCs + self.send_at_command('AT+QSCLK=0') + self.send_at_command('AT+CEREG=0') + + def reboot(self) -> bool: + """Reboots the module (AT+QRST=1).""" + return self.send_at_command('AT+QRST=1') == b'OK' + + def lwm2m_configure(self, server_ip: str, port: int = 5683, endpoint: str = "", + bootstrap: bool = False, security_mode: int = 3, + psk_id: str = "", psk: str = "") -> bool: + """Configures LwM2M settings (AT+QLWCONFIG).""" + bs = 1 if bootstrap else 0 + cmd = f'AT+QLWCONFIG={bs},"{server_ip}",{port},"{endpoint}",60,{security_mode}' + if security_mode != 3 and psk_id and psk: + cmd += f',"{psk_id}","{psk}"' + return self.send_at_command(cmd) == b'OK' + + def lwm2m_register(self, timeout: int = 60) -> bool: + """Registers to LwM2M server (AT+QLWREG).""" + resp = self.send_at_command('AT+QLWREG', timeout=timeout) + return b'QLWREG: 0' in resp or b'OK' in resp + + def ping(self, host: str, context_id: int = 1, timeout: int = 4, num: int = 4) -> List[str]: + raise NotImplementedError("BC66 Ping parsing not fully standardized.") + + def dns_query(self, host: str, context_id: int = 1, timeout: int = 30) -> List[str]: + raise NotImplementedError("BC66 DNS query not fully standardized.") diff --git a/m2m/nbiot/module_bg95.py b/m2m/nbiot/module_bg95.py new file mode 100644 index 0000000..014e180 --- /dev/null +++ b/m2m/nbiot/module_bg95.py @@ -0,0 +1,27 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +import logging +from m2m.nbiot.quectel import QuectelModule + +logger = logging.getLogger('m2m.bg95') + +class ModuleBG95(QuectelModule): + """ + Implementation for Quectel BG95 series modules. + """ + + def __init__(self, serial_port: str, baudrate: int = 115200, **kwargs): + super().__init__(serial_port, baudrate, **kwargs) + self._setup() + + def _setup(self): + self.set_echo_mode(False) + # Enable Connection Status URC + self.send_at_command('AT+QCSCON=1') + + def set_nb2_mode(self, enable: bool = True) -> bool: + """Enables LTE Cat NB2 support (Release 14).""" + # This typically involves specific band configuration or scan priority + # which is already covered by high-level 'mode' and 'bands' commands. + return True diff --git a/m2m/nbiot/module_bg96.py b/m2m/nbiot/module_bg96.py new file mode 100644 index 0000000..b73cad3 --- /dev/null +++ b/m2m/nbiot/module_bg96.py @@ -0,0 +1,21 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +import logging +from m2m.nbiot.quectel import QuectelModule + +logger = logging.getLogger('m2m.bg96') + +class ModuleBG96(QuectelModule): + """ + Implementation for Quectel BG96 series modules. + """ + + def __init__(self, serial_port: str, baudrate: int = 115200, **kwargs): + super().__init__(serial_port, baudrate, **kwargs) + self._setup() + + def _setup(self): + self.set_echo_mode(False) + # Enable Connection Status URC + self.send_at_command('AT+QCSCON=1') diff --git a/m2m/nbiot/module_hisi.py b/m2m/nbiot/module_hisi.py new file mode 100644 index 0000000..1a41517 --- /dev/null +++ b/m2m/nbiot/module_hisi.py @@ -0,0 +1,72 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +import logging +import binascii +from typing import Optional, List +from m2m.nbiot.module import ModuleBase + +logger = logging.getLogger('m2m.hisi') + +class ModuleHiSi(ModuleBase): + """ + Implementation for HiSilicon Boudica based modules (e.g. BC95). + """ + + def __init__(self, serial_port: str, baudrate: int = 9600, **kwargs): + super().__init__(serial_port, baudrate, **kwargs) + self._setup() + + def _setup(self): + self.reboot() + + def reboot(self) -> bool: + """Performs a software reboot via AT+NRB.""" + with self.transaction: + self.s_port.send_cmd(b'AT+NRB') + return self.at.read_ok(timeout=15) + + def set_nconfig(self, key: str, value: str) -> bool: + """Sets configuration via AT+NCONFIG.""" + return self.send_at_command(f'AT+NCONFIG="{key}","{value}"') == b'OK' + + def configure_pdp_context(self, context_id: int, context_type: str, apn: str) -> bool: + """Configures PDP context using AT+CGDCONT.""" + return self.send_at_command(f'AT+CGDCONT={context_id},"{context_type}","{apn}"') == b'OK' + + def activate_pdp_context(self, context_id: int) -> bool: + """Activates PDP context via AT+CGACT.""" + if self.send_at_command(f'AT+CGACT=1,{context_id}') == b'OK': + return True + return bool(self.get_ip_address(context_id)) + + def open_socket(self, local_port: int = 0, protocol: str = "UDP") -> int: + """Opens a socket via AT+NSOCR.""" + proto_val = 17 if protocol.upper() == "UDP" else 6 + resp = self.send_at_command(f'AT+NSOCR=DGRAM,{proto_val},{local_port}') + for line in resp.splitlines(): + line = line.strip() + if line.isdigit(): return int(line) + return -1 + + def send_data_hex(self, socket_id: int, data: bytes, remote_ip: str = "", remote_port: int = 0) -> bool: + """Sends data (UDP) via AT+NSOST.""" + if not remote_ip or not remote_port: return False + hex_data = binascii.hexlify(data).decode('ascii') + cmd = f'AT+NSOST={socket_id},{remote_ip},{remote_port},{len(data)},{hex_data}' + return self.send_at_command(cmd) == b'OK' + + def close_socket(self, socket_id: int) -> bool: + """Closes socket via AT+NSOCL.""" + return self.send_at_command(f'AT+NSOCL={socket_id}') == b'OK' + + def receive_data(self, socket_id: int, length: int = 512) -> bytes: + """Reads data via AT+NSORF.""" + resp = self.send_at_command(f'AT+NSORF={socket_id},{length}') + for line in resp.splitlines(): + if b',' in line: + p = line.split(b',') + if len(p) >= 5 and p[0].strip() == str(socket_id).encode(): + try: return binascii.unhexlify(p[4]) + except: pass + return b'' diff --git a/m2m/nbiot/quectel.py b/m2m/nbiot/quectel.py new file mode 100644 index 0000000..2df45e9 --- /dev/null +++ b/m2m/nbiot/quectel.py @@ -0,0 +1,518 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +import logging +import binascii +import time +import re +from typing import Optional, Union, Tuple, List, Dict + +from m2m.nbiot.module import ModuleBase + +logger = logging.getLogger('m2m.quectel') + +class QuectelModule(ModuleBase): + """ + Base class for Quectel modules implementing shared AT commands. + """ + + def __init__(self, serial_port: str, baudrate: int = 115200, **kwargs): + super().__init__(serial_port, baudrate, **kwargs) + self._setup_urcs() + + def _setup_urcs(self): + # Socket events + self.s_port.register_urc(b'+QIURC: "pdpdeact",', True, lambda u: logger.info(f"PDP Deactivated: {u.decode()}")) + self.s_port.register_urc(b'+QIURC: "recv",', True, lambda u: logger.debug(f"Data Received: {u.decode()}")) + self.s_port.register_urc(b'+QIOPEN:', True, lambda u: logger.debug(f"Socket Open: {u.decode()}")) + + # Service URCs + self.s_port.register_urc(b'+QMTRECV:', True, lambda u: logger.info(f"MQTT Message: {u.decode()}")) + self.s_port.register_urc(b'+CUSD:', True, lambda u: logger.info(f"USSD Response: {u.decode()}")) + self.s_port.register_urc(b'+QPING:', True) + self.s_port.register_urc(b'+QIURC: "dnsgip",', True) + self.s_port.register_urc(b'+QHTTPGET:', True) + self.s_port.register_urc(b'+QHTTPPOST:', True) + self.s_port.register_urc(b'+QHTTPREAD:', True) + self.s_port.register_urc(b'+QNTP:', True) + + # Extended URCs + self.s_port.register_urc(b'+QIND: "nipd",', True, lambda u: logger.info(f"NIDD: {u.decode()}")) + self.s_port.register_urc(b'+QIND: "GEOFENCE",', True, lambda u: logger.info(f"Geofence: {u.decode()}")) + + # --- Extended Configuration (AT+QCFG) --- + + def set_qcfg(self, parameter: str, value: Union[int, str, List[int]], effect_immediately: bool = False) -> bool: + val_str = ",".join(map(str, value)) if isinstance(value, list) else str(value) + cmd = f'AT+QCFG="{parameter}",{val_str}' + if effect_immediately: + cmd += ",1" + + with self.transaction: + self.s_port.send_cmd(cmd.encode()) + return self.at.read_ok(timeout=10) + + def get_qcfg(self, parameter: str) -> Optional[str]: + with self.transaction: + self.s_port.send_cmd(f'AT+QCFG="{parameter}"'.encode()) + response = self.s_port.read_until() + for line in response.splitlines(): + if b'+QCFG:' in line: + return line.decode('ascii', errors='ignore') + return None + + def set_iot_op_mode(self, mode: str = "NB-IoT") -> bool: + modes = {"eMTC": 0, "NB-IoT": 1, "Both": 2} + return self.set_qcfg("iotopmode", modes.get(mode, 1), True) + + def set_nw_scan_priority(self, first: str = "NB-IoT", second: str = "eMTC", third: str = "GSM") -> bool: + seq_map = {"GSM": "01", "eMTC": "02", "NB-IoT": "03"} + try: + val = seq_map[first] + seq_map[second] + seq_map[third] + return self.set_qcfg("nwscanseq", val, True) + except KeyError: return False + + def lock_bands(self, gsm_bands: List[int] = [], catm_bands: List[int] = [], nb_bands: List[int] = []) -> bool: + from m2m.utils import bands_to_hex_mask + val_gsm = bands_to_hex_mask(gsm_bands) if gsm_bands else "0" + val_catm = bands_to_hex_mask(catm_bands) if catm_bands else "0" + val_nb = bands_to_hex_mask(nb_bands) if nb_bands else "0" + return self.set_qcfg("band", [val_gsm, val_catm, val_nb], True) + + def get_locked_bands(self) -> Dict[str, str]: + resp = self.get_qcfg("band") + if resp and '+QCFG: "band",' in resp: + parts = [p.strip(' "') for p in resp.split(',')] + if len(parts) >= 4: + return {'gsm': parts[1], 'catm': parts[2], 'nb': parts[3]} + return {} + + def get_nw_scan_mode(self) -> int: + resp = self.get_qcfg("nwscanmode") + if resp and '+QCFG: "nwscanmode",' in resp: + try: return int(resp.split(',')[-1].strip()) + except: pass + return -1 + + def set_nw_scan_mode(self, mode: int = 0) -> bool: + """ + Sets network scan mode (AT+QCFG="nwscanmode"). + 0: Auto (GSM + LTE), 1: GSM Only, 3: LTE Only + """ + return self.set_qcfg("nwscanmode", mode, True) + + def set_connectivity_mode(self, mode: str) -> bool: + """ + High-level helper to set connectivity preference. + Modes: 'gsm', 'emtc', 'nb', 'nb+emtc', 'all' + """ + mode = mode.lower() + res = False + if mode == 'gsm': + res = self.set_nw_scan_mode(1) + elif mode == 'emtc': + # LTE Only (3) + eMTC category (0) + r1 = self.set_nw_scan_mode(3) + r2 = self.set_iot_op_mode("eMTC") + res = r1 and r2 + elif mode == 'nb': + # LTE Only (3) + NB-IoT category (1) + r1 = self.set_nw_scan_mode(3) + r2 = self.set_iot_op_mode("NB-IoT") + res = r1 and r2 + elif mode == 'nb+emtc': + # LTE Only (3) + Both categories (2) + r1 = self.set_nw_scan_mode(3) + r2 = self.set_iot_op_mode("Both") + res = r1 and r2 + elif mode == 'all': + # Auto (0) + Both categories (2) + Priority Sequence + r1 = self.set_nw_scan_mode(0) + r2 = self.set_iot_op_mode("Both") + r3 = self.set_nw_scan_priority("eMTC", "NB-IoT", "GSM") + res = r1 and r2 and r3 + + return res + + def get_iot_op_mode_val(self) -> int: + resp = self.get_qcfg("iotopmode") + if resp and '+QCFG: "iotopmode",' in resp: + try: return int(resp.split(',')[-1].strip()) + except: pass + return -1 + + # --- PDP & Sockets --- + + def configure_pdp_context(self, context_id: int = 1, context_type: str = "IP", apn: str = "") -> bool: + type_map = {"IP": 1, "IPV6": 2, "IPV4V6": 3} + t_val = type_map.get(context_type.upper(), 1) + with self.transaction: + self.s_port.send_cmd(f'AT+QICSGP={context_id},{t_val},"{apn}"'.encode()) + return self.at.read_ok() + + def activate_pdp_context(self, context_id: int = 1) -> bool: + with self.transaction: + self.s_port.send_cmd(f'AT+QIACT={context_id}'.encode()) + return self.at.read_ok(timeout=150) + + def deactivate_pdp_context(self, context_id: int = 1) -> bool: + with self.transaction: + self.s_port.send_cmd(f'AT+QIDEACT={context_id}'.encode()) + return self.at.read_ok(timeout=40) + + def is_pdp_active(self, context_id: int = 1) -> bool: + with self.transaction: + self.s_port.send_cmd(b'AT+QIACT?') + response = self.s_port.read_until() + for line in response.splitlines(): + if line.startswith(b'+QIACT:'): + try: + parts = line.split(b',') + cid = int(parts[0].split(b':')[1].strip()) + state = int(parts[1]) + if cid == context_id: return state == 1 + except: pass + return False + + def open_socket(self, context_id: int = 1, connect_id: int = 0, service_type: str = "TCP", + remote_ip: str = "", remote_port: int = 0, access_mode: int = 0, local_port: int = 0) -> int: + cmd = f'AT+QIOPEN={context_id},{connect_id},"{service_type}","{remote_ip}",{remote_port},{local_port},{access_mode}' + with self.transaction: + self.s_port.send_cmd(cmd.encode()) + if not self.at.read_ok(): return -1 + response = self.s_port.read_until_notify(b'+QIOPEN:', timeout=150) + + if response: + try: + parts = response.split(b',') + return int(parts[1]) if int(parts[1]) == 0 else -int(parts[1]) + except: pass + return -1 + + def close_socket(self, connect_id: int = 0) -> bool: + with self.transaction: + self.s_port.send_cmd(f'AT+QICLOSE={connect_id}'.encode()) + return self.at.read_ok() + + def get_socket_state(self, connect_id: Optional[int] = None) -> List[Dict[str, str]]: + cmd = b'AT+QISTATE' + if connect_id is not None: + cmd = f'AT+QISTATE=1,{connect_id}'.encode() + with self.transaction: + self.s_port.send_cmd(cmd) + response = self.s_port.read_until() + + sockets = [] + for line in response.splitlines(): + if line.startswith(b'+QISTATE:'): + line_str = line.decode('ascii', errors='ignore') + _, rest = line_str.split(':', 1) + p = [x.strip(' "') for x in rest.split(',')] + if len(p) >= 6: + sockets.append({ + 'id': p[0], 'type': p[1], 'remote': f"{p[2]}:{p[3]}", + 'local_port': p[4], 'state': p[5] + }) + return sockets + + def send_data_hex(self, connect_id: int, data: bytes) -> bool: + hex_data = binascii.hexlify(data).decode('ascii') + with self.transaction: + self.s_port.send_cmd(f'AT+QISENDEX={connect_id},"{hex_data}"'.encode()) + response = self.s_port.read_until() + return b'SEND OK' in response + + def receive_data(self, connect_id: int, read_length: int = 1500) -> bytes: + with self.transaction: + self.s_port.send_cmd(f'AT+QIRD={connect_id},{read_length}'.encode()) + response = self.s_port.read_until() + + data = b'' + reading = False + for line in response.splitlines(): + if line.startswith(b'+QIRD:'): + try: + if int(line.split(b':')[1].strip()) == 0: return b'' + reading = True; continue + except: pass + if reading: + if line.strip() == b'OK': break + data += line + return data + + # --- Network Services --- + + def ping(self, host: str, context_id: int = 1, timeout: int = 20, num: int = 4) -> List[str]: + with self.transaction: + self.s_port.send_cmd(f'AT+QPING={context_id},"{host}",{timeout},{num}'.encode()) + if not self.at.read_ok(): return [] + + responses = [] + end_time = time.time() + (timeout * num) + 10 + while time.time() < end_time and len(responses) < num + 1: + urc = self.s_port.read_until_notify(b'+QPING:', timeout=timeout + 2) + if not urc: break + line = urc.decode('ascii', errors='ignore').strip() + responses.append(line) + if line.count(',') >= 5: break + return responses + + def parse_ping_summary(self, lines: List[str]) -> Dict[str, Union[int, str]]: + if not lines: return {} + parts = lines[-1].split(',') + if len(parts) >= 7 and parts[0] == '0': + try: + return { + 'sent': int(parts[1]), 'rcvd': int(parts[2]), 'lost': int(parts[3]), + 'min': int(parts[4]), 'max': int(parts[5]), 'avg': int(parts[6]) + } + except: pass + return {} + + def dns_query(self, host: str, context_id: int = 1, timeout: int = 30) -> List[str]: + with self.transaction: + self.s_port.send_cmd(f'AT+QIDNSGIP={context_id},"{host}"'.encode()) + if not self.at.read_ok(): return [] + + ips = [] + urc = self.s_port.read_until_notify(b'+QIURC: "dnsgip",', timeout=timeout) + if urc and urc.startswith(b'0'): + try: + count = int(urc.split(b',')[1]) + for _ in range(count): + ip_urc = self.s_port.read_until_notify(b'+QIURC: "dnsgip",', timeout=5) + if ip_urc: ips.append(ip_urc.strip(b' "').decode()) + except: pass + return ips + + def search_operators(self, timeout: int = 180) -> List[Dict[str, str]]: + with self.transaction: + self.s_port.send_cmd(b'AT+COPS=?') + response = self.s_port.read_until(b'OK', timeout=timeout) + + operators = [] + for line in response.splitlines(): + if line.startswith(b'+COPS:'): + matches = re.findall(r'\((\d+),"([^"]*)","([^"]*)","([^"]*)",(\d+)\)', line.decode()) + for m in matches: + operators.append({'status': m[0], 'long': m[1], 'short': m[2], 'mccmnc': m[3], 'act': m[4]}) + return operators + + # --- HTTP(S) --- + + def http_configure(self, context_id: int = 1, response_header: bool = False) -> bool: + with self.transaction: + self.s_port.send_cmd(f'AT+QHTTPCFG="contextid",{context_id}'.encode()) + if not self.at.read_ok(): return False + self.s_port.send_cmd(f'AT+QHTTPCFG="responseheader",{1 if response_header else 0}'.encode()) + return self.at.read_ok() + + def http_set_url(self, url: str, timeout: int = 60) -> bool: + url_bytes = url.encode() + with self.transaction: + self.s_port.send_cmd(f'AT+QHTTPURL={len(url_bytes)},{timeout}'.encode()) + if b'CONNECT' in self.s_port.read_until(b'CONNECT', timeout=5): + self.s_port._serial.write(url_bytes) + return self.at.read_ok() + return False + + def http_get(self, timeout: int = 60) -> bool: + with self.transaction: + self.s_port.send_cmd(f'AT+QHTTPGET={timeout}'.encode()) + if not self.at.read_ok(timeout=5): return False + urc = self.s_port.read_until_notify(b'+QHTTPGET:', timeout=timeout + 5) + return urc and urc.startswith(b'0') + + def http_post(self, content: bytes, timeout: int = 60) -> bool: + with self.transaction: + self.s_port.send_cmd(f'AT+QHTTPPOST={len(content)},{timeout},{timeout}'.encode()) + if b'CONNECT' not in self.s_port.read_until(b'CONNECT', timeout=5): return False + self.s_port._serial.write(content) + if not self.at.read_ok(): return False + urc = self.s_port.read_until_notify(b'+QHTTPPOST:', timeout=timeout + 5) + return urc and urc.startswith(b'0') + + def http_read_response(self, wait_time: int = 60) -> str: + with self.transaction: + self.s_port.send_cmd(f'AT+QHTTPREAD={wait_time}'.encode()) + if b'CONNECT' not in self.s_port.read_until(b'CONNECT', timeout=5): return "" + + buffer = bytearray() + end_time = time.time() + wait_time + while time.time() < end_time: + line = self.s_port._read_line() + if not line: continue + if line.strip() == b'OK': break + buffer.extend(line) + return buffer.decode('ascii', errors='ignore') + + # --- Advanced Features --- + + def configure_psm_optimization(self, enter_immediately: bool = True, enable_urc: bool = True) -> bool: + r1 = self.set_qcfg("psm/enter", 1 if enter_immediately else 0) + r2 = self.set_qcfg("psm/urc", 1 if enable_urc else 0) + return r1 and r2 + + def set_gpio(self, mode: int, pin: int, val: int = 0, save: int = 0) -> bool: + with self.transaction: + self.s_port.send_cmd(f'AT+QCFG="gpio",{mode},{pin},{val},{save}'.encode()) + return self.at.read_ok() + + def get_gpio(self, pin: int) -> int: + with self.transaction: + self.s_port.send_cmd(f'AT+QCFG="gpio",2,{pin}'.encode()) + response = self.s_port.read_until() + for line in response.splitlines(): + if b'+QCFG: "gpio"' in line: + try: return int(line.split(b',')[-1].strip()) + except: pass + return -1 + + def list_files(self, pattern: str = "*") -> List[Dict[str, Union[str, int]]]: + with self.transaction: + self.s_port.send_cmd(f'AT+QFLST="{pattern}"'.encode()) + response = self.s_port.read_until() + files = [] + for line in response.splitlines(): + if line.startswith(b'+QFLST:'): + try: + p = line.decode('ascii', errors='ignore').split(':', 1)[1].strip().split(',') + files.append({'name': p[0].strip(' "'), 'size': int(p[1])}) + except: pass + return files + + def power_off(self, mode: int = 1) -> bool: + with self.transaction: + self.s_port.send_cmd(f'AT+QPOWD={mode}'.encode()) + return self.at.read_ok(timeout=10) + + def get_temperature(self) -> float: + with self.transaction: + self.s_port.send_cmd(b'AT+QTEMP') + response = self.s_port.read_until() + for line in response.splitlines(): + if line.startswith(b'+QTEMP:'): + try: + parts = line.split(b':')[1].strip().split(b',') + return max([float(p) for p in parts]) if parts else 0.0 + except: pass + return 0.0 + + def upload_file(self, filename: str, content: bytes, storage: str = "RAM") -> bool: + """Uploads a file to module storage (AT+QFUPL).""" + with self.transaction: + self.s_port.send_cmd(f'AT+QFUPL="{storage}:{filename}",{len(content)}'.encode()) + resp = self.s_port.read_until(b'CONNECT', timeout=5) + if b'CONNECT' not in resp: return False + self.s_port._serial.write(content) + # After write, modem sends OK + return self.at.read_ok(timeout=10) + + def delete_file(self, filename: str, storage: str = "RAM") -> bool: + """Deletes a file from module storage (AT+QFDEL).""" + with self.transaction: + self.s_port.send_cmd(f'AT+QFDEL="{storage}:{filename}"'.encode()) + return self.at.read_ok() + + def ntp_sync(self, server: str = "pool.ntp.org", port: int = 123, context_id: int = 1) -> bool: + with self.transaction: + self.s_port.send_cmd(f'AT+QNTP={context_id},"{server}",{port}'.encode()) + if not self.at.read_ok(): return False + urc = self.s_port.read_until_notify(b'+QNTP:', timeout=75) + return urc and urc.startswith(b'0') + + def set_edrx(self, mode: int = 1, act_type: int = 5, value: str = "0010") -> bool: + """Sets eDRX parameters (AT+CEDRXS).""" + cmd = f'AT+CEDRXS={mode},{act_type},"{value}"'.encode() + with self.transaction: + self.s_port.send_cmd(cmd) + return self.at.read_ok() + + def get_neighbor_cells(self) -> List[Dict[str, Union[str, int]]]: + with self.transaction: + self.s_port.send_cmd(b'AT+QENG="neighbourcell"') + response = self.s_port.read_until() + neighbors = [] + for line in response.splitlines(): + if b'+QENG: "neighbourcell' in line: + try: + p = [x.strip(' "') for x in line.decode('ascii', errors='ignore').split(',')] + if len(p) > 5: + neighbors.append({'tech': p[1], 'earfcn': p[2], 'pci': p[3], 'rsrq': p[4], 'rsrp': p[5]}) + except: pass + return neighbors + + def get_serving_cell_info(self) -> Dict[str, Union[str, int]]: + with self.transaction: + self.s_port.send_cmd(b'AT+QENG="servingcell"') + response = self.s_port.read_until() + res = {} + for line in response.splitlines(): + if line.startswith(b'+QENG:'): + try: + p = [x.strip(' "') for x in line.decode('ascii', errors='ignore').split(',')] + if len(p) > 2: + res['tech'] = p[2] + if p[2] in ("LTE", "eMTC"): + res.update({'mcc-mnc': f"{p[4]}-{p[5]}", 'cellid': p[6], 'rsrp': int(p[13]), 'rsrq': int(p[14])}) + elif p[2] == "NB-IoT": + res.update({'mcc-mnc': f"{p[4]}-{p[5]}", 'cellid': p[6], 'rsrp': int(p[10]), 'rsrq': int(p[11])}) + except: pass + return res + + def get_network_info(self) -> Optional[str]: + with self.transaction: + self.s_port.send_cmd(b'AT+QNWINFO') + response = self.s_port.read_until() + for line in response.splitlines(): + if line.startswith(b'+QNWINFO:'): + return line.decode('ascii', errors='ignore').strip().split(':', 1)[1].strip() + return None + + # --- Extended Configuration (AT+QCFGEXT) --- + + def set_qcfg_ext(self, parameter: str, value: str) -> bool: + cmd = f'AT+QCFGEXT="{parameter}",{value}' + with self.transaction: + self.s_port.send_cmd(cmd.encode()) + return self.at.read_ok() + + def nidd_configure(self, apn: str, account: str = "", pwd: str = "") -> bool: + return self.set_qcfg_ext("nipdcfg", f'"{apn}","{account}","{pwd}"') + + def nidd_open(self, enable: bool = True) -> bool: + return self.set_qcfg_ext("nipd", "1" if enable else "0") + + def nidd_send(self, data: bytes) -> bool: + hex_data = binascii.hexlify(data).decode('ascii') + return self.set_qcfg_ext("nipds", f'"{hex_data}"') + + def nidd_receive(self) -> bytes: + with self.transaction: + self.s_port.send_cmd(b'AT+QCFGEXT="nipdr"') + response = self.s_port.read_until() + for line in response.splitlines(): + if b'+QCFGEXT: "nipdr",' in line: + try: + return binascii.unhexlify(line.split(b',')[-1].strip(b' "')) + except: pass + return b'' + + def add_geofence(self, id: int, shape: int, coords: List[float], radius: int = 0) -> bool: + val = f"{id},{shape}," + ",".join(map(str, coords)) + if shape == 0: val += f",{radius}" + return self.set_qcfg_ext("addgeo", val) + + def delete_geofence(self, id: int) -> bool: + return self.set_qcfg_ext("deletegeo", str(id)) + + def query_geofence(self, id: int) -> Optional[str]: + with self.transaction: + self.s_port.send_cmd(f'AT+QCFGEXT="querygeo",{id}'.encode()) + return self.s_port.read_until().decode('ascii', errors='ignore') + + def set_usb_power(self, enable: bool = True) -> bool: + return self.set_qcfg_ext("disusb", "0" if enable else "1") + + def set_pwm(self, pin: int, freq: int, duty: int) -> bool: + return self.set_qcfg_ext("pwm", f"{pin},{freq},{duty}") diff --git a/m2m/serial/__init__.py b/m2m/serial/__init__.py new file mode 100644 index 0000000..14107ce --- /dev/null +++ b/m2m/serial/__init__.py @@ -0,0 +1,6 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +from m2m.serial.serial_port import SerialPort + +__all__ = ['SerialPort'] diff --git a/m2m/serial/mock_serial.py b/m2m/serial/mock_serial.py new file mode 100644 index 0000000..1f0b875 --- /dev/null +++ b/m2m/serial/mock_serial.py @@ -0,0 +1,222 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +""" +Mock serial port for testing and development without hardware. +""" + +import logging +import time +from typing import Dict, Optional, Callable, List + +logger = logging.getLogger('mock_serial') + +class MockSerial: + """ + Simulates a serial port behavior with internal state management. + """ + + def __init__(self, port: str = "MOCK", baudrate: int = 9600, timeout: float = 1.0, **kwargs): + self.port = port + self.baudrate = baudrate + self.timeout = timeout + self.is_open = True + self._rx_buffer = b"" + self._tx_buffer = b"" + self._raw_mode = False + + # Internal State + self.state = { + "radio_on": False, + "sim_status": "READY", + "attached": False, + "registered": False, + "pdp_active": False, + "sockets": {}, + "files": {}, + "mqtt_session": False + } + + # Default responses for query/read commands + self._query_responses: Dict[bytes, bytes] = { + b'AT': b'OK\r\n', + b'ATE0': b'OK\r\n', + b'ATE1': b'OK\r\n', + b'AT+CGMR': b'Mock_Modem_v2.0\r\nOK\r\n', + b'AT+CGSN': b'864215030000000\r\nOK\r\n', + b'AT+CIMI': b'460001234567890\r\nOK\r\n', + b'AT+CCID': b'+CCID: 89860012345678901234\r\nOK\r\n', + b'AT+CSQ': b'+CSQ: 25,99\r\nOK\r\n', + b'AT+QNWINFO': b'+QNWINFO: "FDD LTE","46000","LTE BAND 3",1300\r\nOK\r\n', + b'AT+CEER': b'+CEER: No Error\r\nOK\r\n', + b'AT+CBC': b'+CBC: 0,85,3900\r\nOK\r\n', + b'AT+CCLK?': b'+CCLK: "23/01/01,12:00:00+32"\r\nOK\r\n', + b'AT+QTEMP': b'+QTEMP: 35\r\nOK\r\n', + } + + # Command Handlers + self._handlers: Dict[bytes, Callable[[bytes], bytes]] = {} + self._register_default_handlers() + + def _register_default_handlers(self): + self.add_handler(b'AT+CFUN=', self._handle_cfun) + self.add_handler(b'AT+CFUN?', self._handle_cfun_query) + self.add_handler(b'AT+CPIN?', self._handle_cpin_query) + self.add_handler(b'AT+CPIN=', self._handle_cpin_set) + self.add_handler(b'AT+CRSM=', self._handle_crsm) + self.add_handler(b'AT+CREG?', self._handle_creg) + self.add_handler(b'AT+CEREG?', self._handle_cereg) + self.add_handler(b'AT+CGATT=', self._handle_cgatt) + self.add_handler(b'AT+CGATT?', self._handle_cgatt_query) + self.add_handler(b'AT+CGPADDR', self._handle_cgpaddr) + self.add_handler(b'AT+QENG="servingcell"', self._handle_qeng) + self.add_handler(b'AT+QIACT=', self._handle_qiact) + self.add_handler(b'AT+QIOPEN=', self._handle_qiopen) + self.add_handler(b'AT+QICLOSE=', self._handle_qiclose) + self.add_handler(b'AT+QISENDEX=', self._handle_qisendex) + self.add_handler(b'AT+QIRD=', self._handle_qird) + self.add_handler(b'AT+QFUPL=', self._handle_qfupl) + self.add_handler(b'AT+QFDEL=', self._handle_qfdel) + self.add_handler(b'AT+QICSGP=', self._handle_qicsgp) + + def _handle_cfun(self, cmd: bytes) -> bytes: + val = cmd.split(b'=')[1].strip() + if val == b'1': + self.state["radio_on"] = True + self.state["registered"] = True + elif val == b'0': + self.state["radio_on"] = False + self.state["registered"] = False + self.state["attached"] = False + return b'OK\r\n' + + def _handle_cfun_query(self, cmd: bytes) -> bytes: + val = 1 if self.state["radio_on"] else 0 + return f'+CFUN: {val}\r\nOK\r\n'.encode() + + def _handle_cpin_query(self, cmd: bytes) -> bytes: + return f'+CPIN: {self.state["sim_status"]}\r\nOK\r\n'.encode() + + def _handle_cpin_set(self, cmd: bytes) -> bytes: + self.state["sim_status"] = "READY" + return b'OK\r\n' + + def _handle_creg(self, cmd: bytes) -> bytes: + stat = 1 if self.state["registered"] else 0 + return f'+CREG: 0,{stat}\r\nOK\r\n'.encode() + + def _handle_cereg(self, cmd: bytes) -> bytes: + stat = 1 if self.state["registered"] else 0 + return f'+CEREG: 0,{stat}\r\nOK\r\n'.encode() + + def _handle_cgatt(self, cmd: bytes) -> bytes: + val = cmd.split(b'=')[1].strip() + self.state["attached"] = (val == b'1') + return b'OK\r\n' + + def _handle_cgatt_query(self, cmd: bytes) -> bytes: + val = 1 if self.state["attached"] else 0 + return f'+CGATT: {val}\r\nOK\r\n'.encode() + + def _handle_cgpaddr(self, cmd: bytes) -> bytes: + if self.state["pdp_active"]: + return b'+CGPADDR: 1,"10.0.0.5"\r\nOK\r\n' + return b'+CGPADDR: 1,""\r\nOK\r\n' + + def _handle_qeng(self, cmd: bytes) -> bytes: + # Realistic LTE response with enough fields for index 13 + # +QENG: "servingcell",,"LTE",,,,,,,,,,,,,... + return b'+QENG: "servingcell","NOCONN","LTE","FDD",460,00,1A2B3C,123,2500,3,"5M","5M",1234,-105,-15,-70,10,20\r\nOK\r\n' + + def _handle_qiact(self, cmd: bytes) -> bytes: + self.state["pdp_active"] = True + return b'OK\r\n' + + def _handle_qiopen(self, cmd: bytes) -> bytes: + parts = cmd.split(b',') + connect_id = int(parts[1]) + self.state["sockets"][connect_id] = "OPEN" + self._rx_buffer += f'+QIOPEN: {connect_id},0\r\n'.encode() + return b'OK\r\n' + + def _handle_qiclose(self, cmd: bytes) -> bytes: + return b'OK\r\n' + + def _handle_qisendex(self, cmd: bytes) -> bytes: + return b'SEND OK\r\n' + + def _handle_qird(self, cmd: bytes) -> bytes: + data = b"Hello from Mock!" + return f'+QIRD: {len(data)}\r\n'.encode() + data + b'\r\nOK\r\n' + + def _handle_crsm(self, cmd: bytes) -> bytes: + if b'28539' in cmd: + return b'+CRSM: 144,0,"64F01064F020FFFFFFFFFFFF"\r\nOK\r\n' + return b'+CRSM: 144,0,""\r\nOK\r\n' + + def _handle_qfupl(self, cmd: bytes) -> bytes: + self._raw_mode = True + return b'CONNECT\r\n' + + def _handle_qfdel(self, cmd: bytes) -> bytes: + return b'OK\r\n' + + def _handle_qicsgp(self, cmd: bytes) -> bytes: + return b'OK\r\n' + + def add_handler(self, command_prefix: bytes, handler: Callable[[bytes], bytes]): + self._handlers[command_prefix] = handler + + def close(self): + self.is_open = False + + def write(self, data: bytes): + if not self.is_open: raise Exception("Port is closed") + if self._raw_mode: + self._raw_mode = False + self._rx_buffer += b'OK\r\n' + return + self._tx_buffer += data + while b'\r' in self._tx_buffer: + cmd_full, rest = self._tx_buffer.split(b'\r', 1) + self._tx_buffer = rest + cmd = cmd_full.strip() + if not cmd: continue + response = self._process_command(cmd) + if response: + self._rx_buffer += response + + def _process_command(self, cmd: bytes) -> bytes: + if cmd in self._query_responses: + return self._query_responses[cmd] + for prefix, handler in self._handlers.items(): + if cmd.startswith(prefix): + return handler(cmd) + return b'ERROR\r\n' + + def read(self, size: int = 1) -> bytes: + if not self.is_open: return b"" + if size > len(self._rx_buffer): + data = self._rx_buffer + self._rx_buffer = b"" + return data + data = self._rx_buffer[:size] + self._rx_buffer = self._rx_buffer[size:] + return data + + def readline(self) -> bytes: + if not self.is_open: return b"" + if b'\n' in self._rx_buffer: + line, rest = self._rx_buffer.split(b'\n', 1) + self._rx_buffer = rest + return line + b'\n' + + # If we have data but no newline, returns empty to simulate partial read? + # Standard readline blocks. Here we just return empty if incomplete. + if not self._rx_buffer: + time.sleep(0.001) # Reduced sleep for faster tests + return b"" + + @property + def in_waiting(self): + return len(self._rx_buffer) diff --git a/m2m/serial/serial_port.py b/m2m/serial/serial_port.py new file mode 100644 index 0000000..af67c6c --- /dev/null +++ b/m2m/serial/serial_port.py @@ -0,0 +1,136 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +import logging +import time +import threading +from typing import Optional, Union + +import serial + +from m2m.serial.urc_handler import UrcHandler +from m2m.serial.mock_serial import MockSerial +from m2m.exceptions import SerialError + +logger = logging.getLogger('m2m.serial') + +class SerialPort(UrcHandler): + """ + Serial port class tailored for AT-command communication. + Thread-safe using RLock. + """ + + def __init__(self, port: str, baudrate: int = 9600, timeout: float = 1.0, delimiter: bytes = b'\r', **kwargs): + super().__init__() + self._delimiter = delimiter + self._port_name = port + self.lock = threading.RLock() + + try: + if port.upper() == 'MOCK': + self._serial = MockSerial(port=port, baudrate=baudrate, timeout=timeout, **kwargs) + else: + self._serial = serial.serial_for_url(url=port, baudrate=baudrate, timeout=timeout, **kwargs) + if port.startswith(('socket://', 'rfc2217://')): + time.sleep(0.5) # Settling time for network sockets + except (serial.SerialException, ValueError) as e: + logger.error(f"Failed to open port {port}: {e}") + raise SerialError(f"Could not open {port}") from e + + logger.debug(f"Opened serial port: {port} @ {baudrate}") + + def __enter__(self): return self + def __exit__(self, *args): self.close() + + def close(self): + """Closes the serial port.""" + if self._serial and self._serial.is_open: + self._serial.close() + logger.debug(f"Closed serial port: {self._port_name}") + + def send_cmd(self, cmd: Union[bytes, str]) -> None: + """Sends a command appended with the delimiter.""" + if isinstance(cmd, str): + cmd = cmd.encode('ascii') + full_cmd = cmd + self._delimiter + with self.lock: + try: + logger.debug(f"TX: {full_cmd!r}") + self._serial.write(full_cmd) + except (serial.SerialException, ConnectionError, OSError) as e: + raise SerialError(f"Write failed: {e}") from e + + def read_until(self, terminator: bytes = b'OK', timeout: float = 5.0) -> bytes: + """Reads until the terminator is found or timeout occurs.""" + buffer = bytearray() + end_time = time.time() + timeout + + with self.lock: + while time.time() < end_time: + # We read whatever is available to catch partial lines or prompts + try: + in_waiting = self._serial.in_waiting + except (AttributeError, TypeError): + in_waiting = 0 + + if isinstance(in_waiting, int) and in_waiting > 0: + data = self._serial.read(in_waiting) + if data: + buffer.extend(data) + # Process URCs inside the buffer + for line in bytes(buffer).splitlines(keepends=True): + if line.endswith((b'\r', b'\n')): + self.check_urc(line.strip()) + if terminator in buffer: break + else: + time.sleep(0.01) + + res = bytes(buffer) + if res: logger.debug(f"RX: {res!r}") + return res + + def read_until_notify(self, notification: bytes, timeout: float = 5.0) -> Optional[bytes]: + """Waits specifically for a URC notification.""" + end_time = time.time() + timeout + while time.time() < end_time: + self._read_line() # Pumping the reader + if val := self.pop_urc_value(notification): + return val + time.sleep(0.01) + return None + + def read_any(self, timeout: float = 5.0) -> bytes: + """Reads anything available within the timeout.""" + buffer = bytearray() + end_time = time.time() + timeout + with self.lock: + while time.time() < end_time: + try: + in_waiting = self._serial.in_waiting + except (AttributeError, TypeError): + in_waiting = 0 + + if isinstance(in_waiting, int) and in_waiting > 0: + buffer.extend(self._serial.read(in_waiting)) + else: + if timeout < 0.2: break + time.sleep(0.01) + return bytes(buffer) + + def _read_line(self) -> bytes: + """Internal method to read a single line and process URCs.""" + try: + try: + in_waiting = self._serial.in_waiting + except (AttributeError, TypeError): + in_waiting = 0 + + line = self._serial.readline() + if line: + logger.debug(f"RX Line: {line!r}") + if self.check_urc(line.strip()): return b'' + elif self._port_name.startswith(('socket://', 'rfc2217://')) and (isinstance(in_waiting, int) and in_waiting == 0): + time.sleep(0.01) + return line + except (serial.SerialException, ConnectionError, OSError) as e: + raise SerialError(f"Read failed: {e}") from e diff --git a/m2m/serial/urc_handler.py b/m2m/serial/urc_handler.py new file mode 100644 index 0000000..b3a2cdc --- /dev/null +++ b/m2m/serial/urc_handler.py @@ -0,0 +1,91 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +import logging +from typing import Dict, List, Callable, Optional, Union, Any + +logger = logging.getLogger('m2m.urc') + +class UrcHandler: + """ + Manages Unsolicited Result Codes (URC). + Registered URCs are tracked in an internal registry with optional callbacks. + """ + + def __init__(self): + logger.debug("Initialized URC registry") + # Registry structure: { prefix: {'values': [], 'callbacks': [], 'store': bool} } + self.__urc_registry: Dict[bytes, Dict[str, Any]] = {} + + def register_urc(self, urc: Union[bytes, str], store_values: bool = True, + callbacks: Optional[Union[Callable[[bytes], None], List[Callable[[bytes], None]]]] = None) -> None: + """Registers a URC prefix to the handler.""" + if isinstance(urc, str): + urc = urc.encode('ascii') + + if callbacks is None: + cb_list = [] + elif isinstance(callbacks, list): + cb_list = callbacks + else: + cb_list = [callbacks] + + if urc in self.__urc_registry: + logger.error(f"Duplicate URC: {urc!r}") + raise KeyError(f"URC {urc!r} already registered.") + + self.__urc_registry[urc] = { + 'values': [], + 'callbacks': cb_list, + 'store': store_values + } + logger.debug(f"Registered URC: {urc!r}") + + def unregister_urc(self, urc: bytes) -> None: + """Removes the URC from the registry.""" + if urc in self.__urc_registry: + del self.__urc_registry[urc] + else: + logger.warning(f"Attempted to unregister unknown URC: {urc!r}") + + def pop_urc_value(self, urc: bytes) -> Optional[bytes]: + """Retrieves and removes the oldest stored value for a URC.""" + if item := self.__urc_registry.get(urc): + values = item['values'] + return values.pop(0) if values else None + return None + + def get_urc_values(self, urc: bytes) -> List[bytes]: + """Get all stored values for a URC.""" + return self.__urc_registry.get(urc, {}).get('values', []) + + def clear_urc_values(self, urc: bytes) -> None: + """Clears stored values for a URC.""" + if item := self.__urc_registry.get(urc): + item['values'].clear() + + def check_urc(self, msg: bytes) -> bool: + """Checks if msg starts with any registered URC and fires callbacks.""" + # We still iterate because prefixes can be varying lengths + # and we don't have a fixed separator for all possible URCs. + for urc, entry in self.__urc_registry.items(): + if msg.startswith(urc): + payload = msg[len(urc):].strip() + logger.debug(f"URC Match: {urc!r} -> {payload!r}") + + if entry['store']: + entry['values'].append(payload) + + for cb in entry['callbacks']: + try: cb(payload) + except Exception as e: + logger.error(f"URC Callback Error ({urc!r}): {e}") + return True + return False + + def add_callback(self, urc: bytes, callback: Callable[[bytes], None]) -> None: + """Adds a callback to an existing URC registration.""" + if item := self.__urc_registry.get(urc): + item['callbacks'].append(callback) + else: + raise KeyError(f"URC {urc!r} not registered.") diff --git a/m2m/utils.py b/m2m/utils.py new file mode 100644 index 0000000..680b680 --- /dev/null +++ b/m2m/utils.py @@ -0,0 +1,94 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +""" +Utilities for data encoding/decoding (PSM timers, SIM BCD, PLMN lists). +""" + +from typing import List, Union, Optional + +def encode_psm_timer(seconds: int, is_t3324: bool = False) -> str: + """Encodes seconds into 3GPP binary string.""" + if is_t3324: + if seconds <= 2 * 31: return f"000{seconds // 2:05b}" + if seconds <= 60 * 31: return f"001{seconds // 60:05b}" + if seconds <= 360 * 31: return f"010{seconds // 360:05b}" + return "11100000" + else: + if seconds <= 30 * 31: return f"100{seconds // 30:05b}" + if seconds <= 60 * 31: return f"101{seconds // 60:05b}" + if seconds <= 3600 * 31: return f"001{seconds // 3600:05b}" + return f"010{min(31, seconds // 36000):05b}" + +def decode_plmn(bcd_data: bytes) -> Optional[str]: + """Decodes a 3-byte BCD PLMN (MCC/MNC) from SIM storage.""" + if len(bcd_data) < 3 or bcd_data == b'\xff\xff\xff': + return None + + mcc = f"{(bcd_data[0] & 0x0F)}{(bcd_data[0] >> 4)}{(bcd_data[1] & 0x0F)}" + mnc_digit3 = (bcd_data[1] >> 4) + mnc = f"{(bcd_data[2] & 0x0F)}{(bcd_data[2] >> 4)}" + if mnc_digit3 != 0xF: + mnc = f"{(bcd_data[2] & 0x0F)}{(bcd_data[2] >> 4)}{mnc_digit3}" + return f"{mcc}-{mnc}" + +def decode_act(act_bytes: bytes) -> str: + """Decodes Access Technology bitmask (2 bytes).""" + if len(act_bytes) < 2: return "Unknown" + techs = [] + b1, b2 = act_bytes[0], act_bytes[1] + if b1 & 0x80: techs.append("UTRAN") + if b1 & 0x40: techs.append("E-UTRAN") + if b1 & 0x20: techs.append("NG-RAN") + if b2 & 0x80: techs.append("GSM") + if b2 & 0x40: techs.append("cdma2000 HRPD") + if b2 & 0x20: techs.append("cdma2000 1xRTT") + return "/".join(techs) if techs else "None" + +def decode_plmn_list(data: bytes, has_act: bool = False) -> List[str]: + """Decodes a list of PLMNs, optionally with Access Technology.""" + stride = 5 if has_act else 3 + results = [] + for i in range(0, len(data), stride): + chunk = data[i:i+stride] + plmn = decode_plmn(chunk[:3]) + if plmn: + if has_act: + act = decode_act(chunk[3:5]) + results.append(f"{plmn} [{act}]") + else: + results.append(plmn) + return results + +def decode_iccid(bcd_data: bytes) -> str: + """Decodes raw ICCID bytes (swapped nibbles).""" + return "".join([f"{(b & 0x0F)}{(b >> 4):X}" for b in bcd_data]).replace("F", "") + +def bands_to_hex_mask(bands: List[int]) -> str: + """Converts a list of band numbers into a hex bitmask string.""" + mask = 0 + for b in bands: + if b > 0: + mask |= (1 << (b - 1)) + return f"{mask:X}" + +def hex_mask_to_bands(hex_mask: str) -> List[int]: + """Converts a hex bitmask string back into a list of band numbers.""" + try: + mask = int(hex_mask, 16) + except (ValueError, TypeError): + return [] + + bands = [] + for i in range(128): # Support up to 128 bands + if (mask >> i) & 1: + bands.append(i + 1) + return bands + +def fmt_val(val: Union[str, List, None]) -> str: + """Helper to format values for display in reports and shells.""" + if val is None or val == "" or val == []: + return "None (Not set on SIM)" + if isinstance(val, list): + return ", ".join(val) + return str(val) diff --git a/setup.py b/setup.py new file mode 100644 index 0000000..c47e3c2 --- /dev/null +++ b/setup.py @@ -0,0 +1,36 @@ +from setuptools import setup, find_packages + +with open("README.md", "r", encoding="utf-8") as fh: + long_description = fh.read() + +setup( + name='m2m-python', + version='0.2.0', + description='Python library for controlling IoT modules (Quectel BG96, BG95, BC66, HiSilicon)', + long_description=long_description, + long_description_content_type="text/markdown", + author='Yassine Amraue', + url='https://github.com/yassine/m2m-python', + packages=find_packages(exclude=["tests*", "examples*"]), + classifiers=[ + "Programming Language :: Python :: 3", + "License :: OSI Approved :: MIT License", + "Operating System :: OS Independent", + "Topic :: Software Development :: Embedded Systems", + "Topic :: System :: Hardware :: Hardware Drivers", + "Intended Audience :: Developers", + ], + python_requires='>=3.6', + install_requires=[ + 'pyserial>=3.5', + 'prompt_toolkit>=3.0', + ], + extras_require={ + 'dev': ['pytest', 'mock'], + }, + entry_points={ + 'console_scripts': [ + 'm2m-shell=m2m.cli:main', + ], + }, +) diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 0000000..521515c --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,9 @@ +import pytest +from m2m.nbiot import factory + +@pytest.fixture +def bg95_mock(): + """Returns a BG95 module instance connected to the MockSerial engine.""" + module = factory('BG95', 'MOCK') + with module: + yield module diff --git a/tests/test_factory.py b/tests/test_factory.py new file mode 100644 index 0000000..bbbf558 --- /dev/null +++ b/tests/test_factory.py @@ -0,0 +1,22 @@ +import pytest +from unittest.mock import MagicMock, patch +from m2m.nbiot import factory +from m2m.nbiot.module_bg96 import ModuleBG96 + +@patch('m2m.serial.serial_port.serial.Serial') +def test_factory_creates_bg96(mock_serial_cls): + """Verify that the factory correctly instantiates a BG96 module with a mocked serial port.""" + # Setup mock to avoid hardware dependency + mock_inst = MagicMock() + mock_serial_cls.return_value = mock_inst + + module = factory('BG96', '/dev/ttyUSB0') + + assert isinstance(module, ModuleBG96) + # Corrected attribute access path + assert module.s_port._port_name == '/dev/ttyUSB0' + +def test_factory_invalid_chipset(): + """Verify that an invalid chipset raises a ValueError.""" + with pytest.raises(ValueError): + factory('INVALID_CHIPSET', '/dev/ttyUSB0') diff --git a/tests/test_module_base.py b/tests/test_module_base.py new file mode 100644 index 0000000..6454bbb --- /dev/null +++ b/tests/test_module_base.py @@ -0,0 +1,40 @@ +import pytest + +def test_initial_state(bg95_mock): + """Verify initial state of the mock module.""" + assert bg95_mock.check_state() is True + assert bg95_mock.read_firmware_version() == "Mock_Modem_v2.0" + assert bg95_mock.read_imei() == "864215030000000" + +def test_radio_control(bg95_mock): + """Test enabling/disabling radio functionality.""" + # Turn Off + assert bg95_mock.radio_off() is True + assert bg95_mock.is_radio_on() is False + + # Turn On + assert bg95_mock.radio_on() is True + assert bg95_mock.is_radio_on() is True + +def test_network_attachment(bg95_mock): + """Test network attach/detach sequence.""" + bg95_mock.radio_on() + + # Detach + assert bg95_mock.detach_network() is True + assert bg95_mock.is_attached() is False + + # Attach + assert bg95_mock.attach_network() is True + assert bg95_mock.is_attached() is True + +def test_sim_unlock(bg95_mock): + """Test SIM PIN unlocking.""" + # Mock starts in READY or some state, unlock should return True + assert bg95_mock.unlock_sim("1234") is True + +def test_signal_quality(bg95_mock): + """Test reading signal quality.""" + rssi, ber = bg95_mock.get_signal_quality() + assert rssi == 25 + assert ber == 99 diff --git a/tests/test_quectel_module.py b/tests/test_quectel_module.py new file mode 100644 index 0000000..621c801 --- /dev/null +++ b/tests/test_quectel_module.py @@ -0,0 +1,50 @@ +import pytest + +def test_pdp_context(bg95_mock): + """Test PDP context configuration and activation.""" + # Configure + assert bg95_mock.configure_pdp_context(1, "IP", "iot.apn") is True + + # Activate + assert bg95_mock.activate_pdp_context(1) is True + + # Check IP + ip = bg95_mock.get_ip_address(1) + assert ip == "10.0.0.5" + +def test_socket_lifecycle(bg95_mock): + """Test opening, sending, and closing a socket.""" + # Open + # Note: open_socket returns -1 on failure, or 0/result on success/URC + # In our mock, it returns 0 via URC simulation. + # The synchronous open_socket method waits for it. + res = bg95_mock.open_socket(1, 0, "TCP", "8.8.8.8", 80) + assert res == 0 + + # Send + assert bg95_mock.send_data_hex(0, b"Hello") is True + + # Receive + data = bg95_mock.receive_data(0, 100) + assert data == b"Hello from Mock!" + + # Close + assert bg95_mock.close_socket(0) is True + +def test_file_operations(bg95_mock): + """Test file upload and delete.""" + assert bg95_mock.upload_file("test.txt", b"content") is True + assert bg95_mock.delete_file("test.txt") is True + +def test_engineering_info(bg95_mock): + """Test retrieving serving cell info.""" + info = bg95_mock.get_serving_cell_info() + assert info['tech'] == "LTE" + assert info['cellid'] == "1A2B3C" + assert info['rsrp'] == -105 + +def test_sim_diagnostics(bg95_mock): + """Test reading forbidden PLMNs.""" + fplmns = bg95_mock.read_forbidden_plmns() + # Mock returns "64F010..." -> 460-01 + assert "460-01" in fplmns diff --git a/tests/test_serial_port.py b/tests/test_serial_port.py new file mode 100644 index 0000000..66674d8 --- /dev/null +++ b/tests/test_serial_port.py @@ -0,0 +1,46 @@ +import pytest +from unittest.mock import patch, MagicMock +import serial +from m2m.serial.serial_port import SerialPort +from m2m.exceptions import SerialError + +def test_serial_port_standard(): + with patch('serial.serial_for_url') as mock_for_url: + mock_serial = MagicMock() + mock_for_url.return_value = mock_serial + + with SerialPort(port='/dev/ttyUSB0', baudrate=115200) as sp: + mock_for_url.assert_called_once_with( + url='/dev/ttyUSB0', + baudrate=115200, + timeout=1.0 + ) + +def test_serial_port_tcp_url(): + with patch('serial.serial_for_url') as mock_for_url: + mock_serial = MagicMock() + mock_for_url.return_value = mock_serial + + url = 'socket://1.2.3.4:5678' + with SerialPort(port=url, baudrate=9600) as sp: + mock_for_url.assert_called_once_with( + url=url, + baudrate=9600, + timeout=1.0 + ) + +def test_serial_port_invalid_url(): + with patch('serial.serial_for_url') as mock_for_url: + mock_for_url.side_effect = ValueError("invalid URL") + + with pytest.raises(SerialError) as excinfo: + SerialPort(port='invalid://port') + assert "Could not open invalid://port" in str(excinfo.value) + +def test_serial_port_mock(): + # Verify 'MOCK' still uses MockSerial and NOT serial_for_url + with patch('m2m.serial.serial_port.MockSerial') as mock_mock: + with patch('serial.serial_for_url') as mock_for_url: + sp = SerialPort(port='MOCK') + mock_mock.assert_called_once() + mock_for_url.assert_not_called()