481 lines
21 KiB
Python
481 lines
21 KiB
Python
#!/usr/bin/env python3
|
|
# -*- coding: utf-8 -*-
|
|
|
|
import html
import logging
import shlex
import sys
import threading
import time
from typing import Optional, List, Dict, Union

from prompt_toolkit import PromptSession
from prompt_toolkit.completion import WordCompleter
from prompt_toolkit.formatted_text import HTML
from prompt_toolkit.patch_stdout import patch_stdout
from prompt_toolkit.shortcuts import print_formatted_text, clear as clear_screen
from prompt_toolkit.styles import Style

from m2m.exceptions import SerialError, NetworkError
from m2m.nbiot import factory
from m2m.utils import fmt_val
|
|
|
|
# Module-level logger registered under the 'm2m.interactive' name.
logger = logging.getLogger('m2m.interactive')
|
|
|
|
class ModemShell:
    """Interactive shell for controlling IoT modules.

    Wraps the module object returned by ``factory(module_type, port)`` and maps
    typed command words onto ``do_*`` handler methods via ``self.commands``.
    Every handler receives the list of tokens that followed the command word.

    NOTE: ``help`` prints the first docstring line of each handler, so those
    summary lines are user-facing text.
    """

    def __init__(self, module_type: str, port: str, apn: str = 'iot.telefonica.de'):
        """Create the module, the command table and the prompt helpers.

        Args:
            module_type: Module identifier handed to ``factory``.
            port: Serial port name handed to ``factory``.
            apn: Default APN used by ``connect`` when none is typed.

        Exits the process with status 1 if ``factory`` raises.
        """
        self.module_type = module_type
        self.port = port
        self.apn = apn
        try:
            self.module = factory(module_type, port)
        except Exception as exc:
            # Escape the dynamic parts: a '<' or '&' in the port name or in the
            # exception text would make the markup unparsable and raise here.
            print_formatted_text(HTML(
                f'<red>Error: Could not initialize module on '
                f'{self._escape(port)}: {self._escape(exc)}</red>'
            ))
            sys.exit(1)

        # Command Dispatch Table
        self.commands = {
            'info': self.do_info,
            'status': self.do_status,
            'nw_info': self.do_nw_info,
            'cells': self.do_cells,
            'forensics': self.do_forensics,
            'scan': self.do_scan,
            'connect': self.do_connect,
            'disconnect': self.do_disconnect,
            'operator': self.do_operator,
            'ping': self.do_ping,
            'dns': self.do_dns,
            'http_get': self.do_http_get,
            'udp_send': self.do_udp_send,
            'udp_listen': self.do_udp_listen,
            'tcp_client': self.do_tcp_client,
            'sockets': self.do_sockets,
            'close': self.do_close,
            'recv': self.do_recv,
            'sms_send': self.do_sms_send,
            'sms_list': self.do_sms_list,
            'radio': self.do_radio,
            'reboot': self.do_reboot,
            'psm': self.do_psm,
            'mode': self.do_mode,
            'edrx': self.do_edrx,
            'bands': self.do_bands,
            'gpio': self.do_gpio,
            'ri': self.do_ri,
            'urc_delay': self.do_urc_delay,
            'at': self.do_at,
            'error': self.do_error,
            'ussd': self.do_ussd,
            'clock': self.do_clock,
            'temp': self.do_temp,
            'ls': self.do_ls,
            'power_off': self.do_power_off,
            'help': self.do_help,
            'clear': self.do_clear,
            'exit': self.do_exit,
            'quit': self.do_exit,
        }

        # Tab completion offers the command words plus common sub-arguments.
        self.completer = WordCompleter(
            list(self.commands.keys()) +
            ['on', 'off', 'loc', 'status', 'disable', 'sync', 'show', 'gsm', 'catm', 'nb', 'nb+emtc', 'all', 'opt'],
            ignore_case=True
        )
        # Style classes referenced by the custom markup tags (<info>, <error>, ...).
        self.style = Style.from_dict({
            'prompt': '#00ff00 bold',
            'command': '#ffffff',
            'error': '#ff0000',
            'info': '#00ffff',
            'success': '#00ff00',
            'header': '#ffff00 bold',
            'alert': '#ff8800 bold',
        })

        self._stop_bg = threading.Event()
        self._bg_thread = None

        # Register callbacks
        if hasattr(self.module.s_port, 'add_callback'):
            self.module.s_port.add_callback(b'+QIURC: "recv",', self._on_data_received)
            self.module.s_port.add_callback(b'+QIURC: "pdpdeact",', self._on_pdp_deact)

    @staticmethod
    def _escape(value) -> str:
        """Return ``str(value)`` with ``&``, ``<`` and ``>`` escaped for HTML markup."""
        return html.escape(str(value), quote=False)

    def _say(self, markup: str) -> None:
        """Print prompt_toolkit HTML markup using the shell's style sheet.

        The custom tags only pick up their colours when the style is passed to
        ``print_formatted_text``. Callers must run dynamic text through
        ``_escape`` before embedding it in ``markup``.
        """
        print_formatted_text(HTML(markup), style=self.style)

    def _on_data_received(self, payload: bytes):
        """Callback for the '+QIURC: "recv",' URC: announce pending socket data.

        NOTE(review): presumably ``payload`` is the text following the URC
        prefix, i.e. the connection id - confirm against ``add_callback``.
        """
        try:
            conn_id = self._escape(payload.decode(errors='replace').strip())
            self._say(f'\n<info>[DATA] Socket {conn_id} received data.</info> Type "recv {conn_id}"')
        except Exception:
            # Best effort: a notification must never break the caller that
            # dispatched the URC, but the failure should stay discoverable.
            logger.debug('Could not report received-data URC', exc_info=True)

    def _on_pdp_deact(self, payload: bytes):
        """Callback for the '+QIURC: "pdpdeact",' URC: warn about the lost context."""
        self._say('\n<alert>[NETWORK] PDP Context Deactivated by network.</alert>')

    def _bg_worker(self):
        """Polls for URCs when idle without blocking the main lock indefinitely."""
        while not self._stop_bg.is_set():
            try:
                s_port = self.module.s_port
                # Non-blocking acquire: if a command currently owns the port,
                # skip this round instead of delaying the command.
                if s_port._serial.in_waiting > 0 and s_port.lock.acquire(blocking=False):
                    try:
                        s_port.read_any(timeout=0.1)
                    finally:
                        s_port.lock.release()
            except Exception:
                # Keep polling (the port may be closed or busy), but do not
                # swallow SystemExit/KeyboardInterrupt like a bare except would.
                logger.debug('Background URC poll failed', exc_info=True)
            self._stop_bg.wait(0.5)

    def _dispatch(self, text: str) -> None:
        """Tokenise one input line and run the matching command handler.

        Blank lines are ignored. Tokenising errors, unknown commands and
        exceptions raised by a handler are reported to the user, not raised.
        ``SystemExit`` from ``do_exit`` is deliberately not caught.
        """
        stripped = text.strip()
        if not stripped:
            return

        try:
            parts = shlex.split(stripped)
        except ValueError as exc:
            self._say(f'<error>Parse error: {self._escape(exc)}</error>')
            return

        cmd, args = parts[0].lower(), parts[1:]
        handler = self.commands.get(cmd)
        if handler is None:
            self._say(f'<error>Unknown command: {self._escape(cmd)}</error>')
            return

        try:
            handler(args)
        except Exception as exc:
            self._say(f'<error>Error: {self._escape(exc)}</error>')

    def run(self):
        """Run the read-dispatch loop until exit, Ctrl-C or Ctrl-D.

        The background URC poller runs only while the module context is
        entered. It is stopped and joined before that context exits, including
        when ``do_exit`` raises ``SystemExit``.
        """
        self._say(
            f'<b>Interactive Modem Shell</b> '
            f'({self._escape(self.module_type)} on {self._escape(self.port)})'
        )
        self._say('Type <u>help</u> for available commands, <u>exit</u> to quit.\n')

        session = PromptSession(completer=self.completer, style=self.style)
        self._bg_thread = threading.Thread(target=self._bg_worker, daemon=True)

        with patch_stdout():
            with self.module:
                self._bg_thread.start()
                try:
                    while True:
                        try:
                            text = session.prompt(HTML('<prompt>m2m></prompt> '))
                        except (KeyboardInterrupt, EOFError):
                            break
                        self._dispatch(text)
                finally:
                    self._stop_bg.set()
                    # The worker wakes from its 0.5 s wait as soon as the event
                    # is set, so a short timeout is enough.
                    self._bg_thread.join(timeout=1.0)

    def do_help(self, args):
        """Show available commands."""
        self._say('<header>Available Commands:</header>')
        for name in sorted(self.commands):
            doc_lines = (self.commands[name].__doc__ or '').strip().splitlines()
            summary = doc_lines[0] if doc_lines else "No description."
            # Usage strings contain '<arg>' placeholders, which must be escaped
            # or the markup parser treats them as (unbalanced) tags.
            self._say(f' <info>{name:<12}</info> {self._escape(summary)}')

    def do_clear(self, _):
        """Clear the terminal screen."""
        clear_screen()

    def do_info(self, _):
        """Read detailed module identification and configuration."""
        self._say('<info>Reading identification...</info>')
        print(f" IMEI: {self.module.read_imei() or 'N/A'}")
        print(f" IMSI: {self.module.read_imsi() or 'N/A'}")
        print(f" ICCID: {self.module.read_iccid() or 'N/A'}")
        print(f" FW: {self.module.read_firmware_version() or 'N/A'}")

        # The next two readings are optional module capabilities.
        if hasattr(self.module, 'get_nw_scan_mode'):
            mode_map = {0: "Automatic", 1: "GSM Only", 2: "LTE Only", 3: "GSM + LTE"}
            print(f" ScanMode: {mode_map.get(self.module.get_nw_scan_mode(), 'Unknown')}")

        if hasattr(self.module, 'get_iot_op_mode_val'):
            op_map = {0: "eMTC", 1: "NB-IoT", 2: "eMTC + NB-IoT"}
            print(f" IoTMode: {op_map.get(self.module.get_iot_op_mode_val(), 'Unknown')}")

    def do_status(self, _):
        """Show registration and signal status."""
        self._say('<info>Checking status...</info>')
        rssi, ber = self.module.get_signal_quality()
        print(f" Signal: RSSI={rssi}, BER={ber}")
        print(f" Registered: {'Yes' if self.module.is_registered() else 'No'}")
        print(f" Operator: {self.module.get_current_operator() or 'None'}")
        print(f" IP Address: {self.module.get_ip_address(1) or 'None'}")

    def do_nw_info(self, _):
        """Show detailed network and serving cell info."""
        if hasattr(self.module, 'get_network_info'):
            print(f" Network: {self.module.get_network_info()}")
        if hasattr(self.module, 'get_serving_cell_info'):
            print(f" Cell: {self.module.get_serving_cell_info()}")

    def do_cells(self, _):
        """Show serving and neighbor cell information."""
        if hasattr(self.module, 'get_serving_cell_info'):
            print(f" Serving: {self.module.get_serving_cell_info()}")
        if hasattr(self.module, 'get_neighbor_cells'):
            for neighbor in self.module.get_neighbor_cells():
                print(f" - {neighbor}")

    def do_forensics(self, _):
        """Full SIM card forensics report."""
        self._say('<header>SIM CARD NETWORK FORENSICS</header>')
        print(f"{'ICCID:':<25} {fmt_val(self.module.read_iccid())}")
        print(f"{'IMSI:':<25} {fmt_val(self.module.read_imsi())}")
        print(f"{'Home PLMN:':<25} {fmt_val(self.module.get_home_plmn())}")
        print(f"{'Registered PLMN:':<25} {fmt_val(self.module.get_registered_plmn())}")
        print(f"{'Forbidden PLMNs:':<25} {fmt_val(self.module.read_forbidden_plmns())}")
        print(f"{'Equivalent Home:':<25} {fmt_val(self.module.read_ehplmns())}")

    def do_scan(self, _):
        """Scan for available network operators."""
        self._say('<info>Scanning for operators...</info>')
        ops = self.module.search_operators(timeout=180)
        if not ops:
            print(" No operators found.")
            return
        # Status codes as reported per operator entry; unknown codes print raw.
        s_map = {'0': 'Unknown', '1': 'Available', '2': 'Current', '3': 'Forbidden'}
        for op in ops:
            print(f" - {op['long']} ({op['mccmnc']}) [{s_map.get(op['status'], op['status'])}, Tech: {op['act']}]")

    def do_connect(self, args):
        """Connect to network. Usage: connect [apn] [--force]"""
        force = '--force' in args
        clean_args = [a for a in args if a != '--force']
        apn = clean_args[0] if clean_args else self.apn
        self._say(f'<info>Connecting with APN: {self._escape(apn)}...</info>')
        if self.module.connect_network(apn=apn, force=force):
            self._say('<success>Connected!</success>')
        else:
            self._say('<error>Failed.</error>')

    def do_disconnect(self, _):
        """Deactivate PDP context."""
        if hasattr(self.module, 'deactivate_pdp_context') and self.module.deactivate_pdp_context(1):
            self._say('<success>Disconnected.</success>')
        else:
            self._say('<error>Failed or not supported.</error>')

    def do_operator(self, args):
        """Select operator manually. Usage: operator <mccmnc> [act]"""
        if not args:
            print("Usage: operator <mccmnc> [act]")
            return
        plmn = args[0]
        act = int(args[1]) if len(args) > 1 else 8
        if self.module.set_operator(plmn, act=act):
            self._say('<success>Done.</success>')
        else:
            self._say('<error>Failed.</error>')

    def do_ping(self, args):
        """Ping a host. Usage: ping <host> [num]"""
        if not args:
            print("Usage: ping <host> [num]")
            return
        host = args[0]
        num = int(args[1]) if len(args) > 1 else 4
        res = self.module.ping(host, num=num)
        for line in res:
            print(f" {line}")
        if (summary := self.module.parse_ping_summary(res)):
            self._say(f'<success>RTT avg: {self._escape(summary["avg"])}ms</success>')

    def do_dns(self, args):
        """Resolve a hostname. Usage: dns <host>"""
        if not args:
            print("Usage: dns <host>")
            return
        if (ips := self.module.dns_query(args[0])):
            print(f" IPs: {', '.join(ips)}")
        else:
            self._say('<error>Failed.</error>')

    def do_http_get(self, args):
        """Perform HTTP GET. Usage: http_get <url>"""
        if not args:
            print("Usage: http_get <url>")
            return
        # A bare host name is treated as a plain-HTTP URL.
        url = args[0] if args[0].startswith('http') else f'http://{args[0]}'
        if self.module.http_set_url(url) and self.module.http_get():
            print("-" * 40 + f"\n{self.module.http_read_response()}\n" + "-" * 40)
        else:
            self._say('<error>Failed.</error>')

    def do_udp_send(self, args):
        """Send a UDP packet. Usage: udp_send <host> <port> <data>"""
        if len(args) < 3:
            print("Usage: udp_send <host> <port> <data>")
            return
        host, port, data = args[0], int(args[1]), " ".join(args[2:])

        # Reuse an already open UDP socket to the same remote, if the module
        # can list its sockets; otherwise open a new one.
        sock_id = -1
        if hasattr(self.module, 'get_socket_state'):
            for s in self.module.get_socket_state():
                if s['type'] == 'UDP' and s['remote'] == f"{host}:{port}":
                    sock_id = int(s['id'])
                    break
        if sock_id == -1:
            sock_id = self.module.open_socket(service_type="UDP", remote_ip=host, remote_port=port)

        if sock_id >= 0 and self.module.send_data_hex(sock_id, data.encode()):
            self._say('<success>Sent!</success>')
        else:
            self._say('<error>Failed.</error>')

    def do_udp_listen(self, args):
        """Open a local UDP listener. Usage: udp_listen <port>"""
        if not args:
            print("Usage: udp_listen <port>")
            return
        sid = self.module.open_socket(service_type="UDP SERVICE", remote_ip="127.0.0.1", local_port=int(args[0]))
        if sid >= 0:
            self._say(f'<success>Listening on socket {self._escape(sid)}.</success>')
        else:
            self._say('<error>Failed.</error>')

    def do_tcp_client(self, args):
        """Connect to a TCP server. Usage: tcp_client <host> <port>"""
        if len(args) < 2:
            print("Usage: tcp_client <host> <port>")
            return
        sid = self.module.open_socket(service_type="TCP", remote_ip=args[0], remote_port=int(args[1]))
        if sid >= 0:
            self._say(f'<success>Socket {self._escape(sid)} connected.</success>')
        else:
            self._say('<error>Failed.</error>')

    def do_sockets(self, _):
        """List active sockets."""
        if hasattr(self.module, 'get_socket_state'):
            for s in self.module.get_socket_state():
                print(f" ID {s['id']}: {s['type']:<8} | {s['remote']:<20} | {s['state']}")
        else:
            print("Not supported.")

    def do_close(self, args):
        """Close a socket. Usage: close <id>"""
        if not args:
            print("Usage: close <id>")
            return
        if self.module.close_socket(int(args[0])):
            self._say('<success>Closed.</success>')
        else:
            self._say('<error>Failed.</error>')

    def do_recv(self, args):
        """Read socket data. Usage: recv <id> [len]"""
        if not args:
            print("Usage: recv <id> [len]")
            return
        length = int(args[1]) if len(args) > 1 else 1500
        data = self.module.receive_data(int(args[0]), length)
        if data:
            print("-" * 20 + f"\n{data.decode(errors='replace')}\n" + "-" * 20)
        else:
            print(" No data.")

    def do_sms_send(self, args):
        """Send an SMS. Usage: sms_send <number> <message>"""
        if len(args) < 2:
            print("Usage: sms_send <number> <message>")
            return

        number, message = args[0], " ".join(args[1:])
        self._say(f'<info>Sending SMS to {self._escape(number)}...</info>')
        if self.module.send_sms(number, message):
            self._say('<success>Sent!</success>')
        else:
            self._say('<error>Failed to send SMS.</error>')

    def do_sms_list(self, args):
        """List SMS. Usage: sms_list [ALL|REC UNREAD]"""
        for msg in self.module.list_sms(args[0] if args else "ALL"):
            print(msg)

    def do_radio(self, args):
        """Toggle radio. Usage: radio <on|off>"""
        if not args:
            print(f"Radio is {'ON' if self.module.is_radio_on() else 'OFF'}")
            return
        choice = args[0].lower()
        if choice == 'on':
            self.module.radio_on()
        elif choice == 'off':
            self.module.radio_off()
        else:
            # A typo must not be interpreted as "off" and drop the radio.
            print("Usage: radio <on|off>")

    def do_reboot(self, _):
        """Reboot the module."""
        self.module.reboot()

    def do_mode(self, args):
        """Set connectivity preference. Usage: mode <gsm|emtc|nb|nb+emtc|all>"""
        if not args:
            print("Usage: mode <gsm|emtc|nb|nb+emtc|all>")
            return

        mode = args[0].lower()
        self._say(f'<info>Setting connectivity mode to {self._escape(mode)}...</info>')
        if not hasattr(self.module, 'set_connectivity_mode'):
            print("Mode configuration not supported on this module.")
        elif self.module.set_connectivity_mode(mode):
            self._say('<success>Mode set successfully.</success>')
        else:
            self._say('<error>Failed to set mode.</error>')

    def do_psm(self, args):
        """Control PSM. Usage: psm [status|off|opt] | psm on <n1> <n2>"""
        # NOTE(review): the two integers after 'on' are passed straight to
        # configure_psm_auto(); their meaning/units are not visible here.
        sub = args[0].lower() if args else 'status'
        if sub == 'status':
            print(f" PSM: {self.module.read_psm_status() or 'Unknown'}")
        elif sub == 'off':
            self.module.disable_psm()
        elif sub == 'opt':
            self.module.configure_psm_optimization()
        elif sub == 'on' and len(args) >= 3:
            self.module.configure_psm_auto(int(args[1]), int(args[2]))
        else:
            print("Usage: psm [status|off|opt] | psm on <n1> <n2>")

    def do_edrx(self, args):
        """Configure eDRX. Usage: edrx <mode> <act> <bits>"""
        if len(args) < 3:
            print("Usage: edrx <mode> <act> <bits>")
            return
        if self.module.set_edrx(int(args[0]), int(args[1]), args[2]):
            self._say('<success>Done.</success>')
        else:
            self._say('<error>Failed.</error>')

    def do_bands(self, args):
        """Show or lock frequency bands. Usage: bands [gsm|catm|nb] [add|remove] <band_num>"""
        from m2m.utils import hex_mask_to_bands

        if not hasattr(self.module, 'get_locked_bands'):
            print("Band control not supported.")
            return

        current = self.module.get_locked_bands()
        # Map to actual lists of integers
        band_lists = {k: hex_mask_to_bands(v) for k, v in current.items()}

        if not args:
            self._say('<info>Currently Active Bands:</info>')
            for rat, blist in band_lists.items():
                print(f" {rat.upper():<5}: {blist}")
            return

        if len(args) < 3:
            print("Usage: bands <gsm|catm|nb> <add|remove> <band_number>")
            return

        rat, action, bnum = args[0].lower(), args[1].lower(), int(args[2])
        if rat not in band_lists:
            print(f"Invalid RAT: {rat}. Use gsm, catm, or nb.")
            return

        if action == 'add':
            if bnum not in band_lists[rat]:
                band_lists[rat].append(bnum)
        elif action == 'remove':
            if bnum in band_lists[rat]:
                band_lists[rat].remove(bnum)
        else:
            print(f"Invalid action: {action}. Use add or remove.")
            return

        # Apply changes: all three lists are sent, the untouched ones unchanged.
        self._say(f'<info>Updating {self._escape(rat.upper())} bands...</info>')
        if self.module.lock_bands(
            gsm_bands=band_lists.get('gsm', []),
            catm_bands=band_lists.get('catm', []),
            nb_bands=band_lists.get('nb', [])
        ):
            self._say('<success>Bands updated successfully.</success>')
        else:
            self._say('<error>Failed to update bands.</error>')

    def do_gpio(self, args):
        """Control GPIO. Usage: gpio set <pin> <val> | get <pin>"""
        sub = args[0].lower() if args else ''
        if sub == 'set' and len(args) >= 3:
            # NOTE(review): the leading 3 is a fixed first argument of
            # set_gpio(); its meaning is not visible here - confirm.
            self.module.set_gpio(3, int(args[1]), int(args[2]))
        elif sub == 'get' and len(args) >= 2:
            print(f"GPIO {args[1]}: {self.module.get_gpio(int(args[1]))}")
        else:
            print("Usage: gpio set <pin> <val> | get <pin>")

    def do_ri(self, args):
        """Configure Ring Indicator. Usage: ri <type> [duration]"""
        if not args:
            print("Usage: ri <type> [duration]")
            return
        dur = int(args[1]) if len(args) > 1 else 120
        if hasattr(self.module, 'configure_ri_pin') and self.module.configure_ri_pin(args[0], dur):
            self._say('<success>Done.</success>')
        else:
            self._say('<error>Failed or not supported.</error>')

    def do_urc_delay(self, args):
        """Configure URC delay. Usage: urc_delay <on|off>"""
        if not args:
            print("Usage: urc_delay <on|off>")
            return
        if hasattr(self.module, 'configure_urc_delay') and self.module.configure_urc_delay(args[0].lower() == 'on'):
            self._say('<success>Done.</success>')
        else:
            self._say('<error>Failed or not supported.</error>')

    def do_at(self, args):
        """Send raw AT command."""
        if not args:
            print("Usage: at <command>")
            return
        cmd = " ".join(args)
        # Allow the 'AT' prefix to be omitted, e.g. "at +CSQ".
        if not cmd.upper().startswith('AT'):
            cmd = f'AT{cmd}'
        print(self.module.send_at_command(cmd).decode(errors='ignore'))

    def do_error(self, _):
        """Show extended error report (AT+CEER)."""
        print(f" Report: {self.module.get_extended_error_report()}")

    def do_ussd(self, args):
        """Send USSD code. Usage: ussd <code>"""
        if not args:
            print("Usage: ussd <code>")
            return
        print(f" Response: {self.module.send_ussd(args[0])}")

    def do_clock(self, args):
        """Show or sync clock. Usage: clock [show|sync]"""
        sub = args[0].lower() if args else 'show'
        if sub == 'show':
            print(f" Clock: {self.module.get_clock()}")
        elif sub == 'sync':
            if self.module.ntp_sync():
                print(f" New Clock: {self.module.get_clock()}")
            else:
                self._say('<error>Sync failed.</error>')
        else:
            print("Usage: clock [show|sync]")

    def do_temp(self, _):
        """Read module temperature."""
        if hasattr(self.module, 'get_temperature'):
            print(f" Temp: {self.module.get_temperature()} C")
        else:
            print("Not supported.")

    def do_ls(self, args):
        """List files. Usage: ls [pattern]"""
        files = self.module.list_files(args[0] if args else "*")
        print(f"{'Filename':<30} | {'Size':>10}")
        for entry in files:
            print(f"{entry['name']:<30} | {entry['size']:>10,}")

    def do_power_off(self, _):
        """Power off."""
        self.module.power_off()

    def do_exit(self, _):
        """Exit shell."""
        # Raises SystemExit, which passes through the command error handler.
        sys.exit(0)
|