222 lines
8.3 KiB
Python
222 lines
8.3 KiB
Python
#!/usr/bin/env python3
|
|
# -*- coding: utf-8 -*-
|
|
|
|
"""
|
|
Mock serial port for testing and development without hardware.
|
|
"""
|
|
|
|
import logging
|
|
import time
|
|
from typing import Dict, Optional, Callable, List
|
|
|
|
logger = logging.getLogger('mock_serial')
|
|
|
|
class MockSerial:
|
|
"""
|
|
Simulates a serial port behavior with internal state management.
|
|
"""
|
|
|
|
def __init__(self, port: str = "MOCK", baudrate: int = 9600, timeout: float = 1.0, **kwargs):
|
|
self.port = port
|
|
self.baudrate = baudrate
|
|
self.timeout = timeout
|
|
self.is_open = True
|
|
self._rx_buffer = b""
|
|
self._tx_buffer = b""
|
|
self._raw_mode = False
|
|
|
|
# Internal State
|
|
self.state = {
|
|
"radio_on": False,
|
|
"sim_status": "READY",
|
|
"attached": False,
|
|
"registered": False,
|
|
"pdp_active": False,
|
|
"sockets": {},
|
|
"files": {},
|
|
"mqtt_session": False
|
|
}
|
|
|
|
# Default responses for query/read commands
|
|
self._query_responses: Dict[bytes, bytes] = {
|
|
b'AT': b'OK\r\n',
|
|
b'ATE0': b'OK\r\n',
|
|
b'ATE1': b'OK\r\n',
|
|
b'AT+CGMR': b'Mock_Modem_v2.0\r\nOK\r\n',
|
|
b'AT+CGSN': b'864215030000000\r\nOK\r\n',
|
|
b'AT+CIMI': b'460001234567890\r\nOK\r\n',
|
|
b'AT+CCID': b'+CCID: 89860012345678901234\r\nOK\r\n',
|
|
b'AT+CSQ': b'+CSQ: 25,99\r\nOK\r\n',
|
|
b'AT+QNWINFO': b'+QNWINFO: "FDD LTE","46000","LTE BAND 3",1300\r\nOK\r\n',
|
|
b'AT+CEER': b'+CEER: No Error\r\nOK\r\n',
|
|
b'AT+CBC': b'+CBC: 0,85,3900\r\nOK\r\n',
|
|
b'AT+CCLK?': b'+CCLK: "23/01/01,12:00:00+32"\r\nOK\r\n',
|
|
b'AT+QTEMP': b'+QTEMP: 35\r\nOK\r\n',
|
|
}
|
|
|
|
# Command Handlers
|
|
self._handlers: Dict[bytes, Callable[[bytes], bytes]] = {}
|
|
self._register_default_handlers()
|
|
|
|
def _register_default_handlers(self):
|
|
self.add_handler(b'AT+CFUN=', self._handle_cfun)
|
|
self.add_handler(b'AT+CFUN?', self._handle_cfun_query)
|
|
self.add_handler(b'AT+CPIN?', self._handle_cpin_query)
|
|
self.add_handler(b'AT+CPIN=', self._handle_cpin_set)
|
|
self.add_handler(b'AT+CRSM=', self._handle_crsm)
|
|
self.add_handler(b'AT+CREG?', self._handle_creg)
|
|
self.add_handler(b'AT+CEREG?', self._handle_cereg)
|
|
self.add_handler(b'AT+CGATT=', self._handle_cgatt)
|
|
self.add_handler(b'AT+CGATT?', self._handle_cgatt_query)
|
|
self.add_handler(b'AT+CGPADDR', self._handle_cgpaddr)
|
|
self.add_handler(b'AT+QENG="servingcell"', self._handle_qeng)
|
|
self.add_handler(b'AT+QIACT=', self._handle_qiact)
|
|
self.add_handler(b'AT+QIOPEN=', self._handle_qiopen)
|
|
self.add_handler(b'AT+QICLOSE=', self._handle_qiclose)
|
|
self.add_handler(b'AT+QISENDEX=', self._handle_qisendex)
|
|
self.add_handler(b'AT+QIRD=', self._handle_qird)
|
|
self.add_handler(b'AT+QFUPL=', self._handle_qfupl)
|
|
self.add_handler(b'AT+QFDEL=', self._handle_qfdel)
|
|
self.add_handler(b'AT+QICSGP=', self._handle_qicsgp)
|
|
|
|
def _handle_cfun(self, cmd: bytes) -> bytes:
|
|
val = cmd.split(b'=')[1].strip()
|
|
if val == b'1':
|
|
self.state["radio_on"] = True
|
|
self.state["registered"] = True
|
|
elif val == b'0':
|
|
self.state["radio_on"] = False
|
|
self.state["registered"] = False
|
|
self.state["attached"] = False
|
|
return b'OK\r\n'
|
|
|
|
def _handle_cfun_query(self, cmd: bytes) -> bytes:
|
|
val = 1 if self.state["radio_on"] else 0
|
|
return f'+CFUN: {val}\r\nOK\r\n'.encode()
|
|
|
|
def _handle_cpin_query(self, cmd: bytes) -> bytes:
|
|
return f'+CPIN: {self.state["sim_status"]}\r\nOK\r\n'.encode()
|
|
|
|
def _handle_cpin_set(self, cmd: bytes) -> bytes:
|
|
self.state["sim_status"] = "READY"
|
|
return b'OK\r\n'
|
|
|
|
def _handle_creg(self, cmd: bytes) -> bytes:
|
|
stat = 1 if self.state["registered"] else 0
|
|
return f'+CREG: 0,{stat}\r\nOK\r\n'.encode()
|
|
|
|
def _handle_cereg(self, cmd: bytes) -> bytes:
|
|
stat = 1 if self.state["registered"] else 0
|
|
return f'+CEREG: 0,{stat}\r\nOK\r\n'.encode()
|
|
|
|
def _handle_cgatt(self, cmd: bytes) -> bytes:
|
|
val = cmd.split(b'=')[1].strip()
|
|
self.state["attached"] = (val == b'1')
|
|
return b'OK\r\n'
|
|
|
|
def _handle_cgatt_query(self, cmd: bytes) -> bytes:
|
|
val = 1 if self.state["attached"] else 0
|
|
return f'+CGATT: {val}\r\nOK\r\n'.encode()
|
|
|
|
def _handle_cgpaddr(self, cmd: bytes) -> bytes:
|
|
if self.state["pdp_active"]:
|
|
return b'+CGPADDR: 1,"10.0.0.5"\r\nOK\r\n'
|
|
return b'+CGPADDR: 1,""\r\nOK\r\n'
|
|
|
|
def _handle_qeng(self, cmd: bytes) -> bytes:
|
|
# Realistic LTE response with enough fields for index 13
|
|
# +QENG: "servingcell",<state>,"LTE",<is_tdd>,<mcc>,<mnc>,<cellid>,<pcid>,<earfcn>,<freq_band_ind>,<ul_bandwidth>,<dl_bandwidth>,<tac>,<rsrp>,<rsrq>,...
|
|
return b'+QENG: "servingcell","NOCONN","LTE","FDD",460,00,1A2B3C,123,2500,3,"5M","5M",1234,-105,-15,-70,10,20\r\nOK\r\n'
|
|
|
|
def _handle_qiact(self, cmd: bytes) -> bytes:
|
|
self.state["pdp_active"] = True
|
|
return b'OK\r\n'
|
|
|
|
def _handle_qiopen(self, cmd: bytes) -> bytes:
|
|
parts = cmd.split(b',')
|
|
connect_id = int(parts[1])
|
|
self.state["sockets"][connect_id] = "OPEN"
|
|
self._rx_buffer += f'+QIOPEN: {connect_id},0\r\n'.encode()
|
|
return b'OK\r\n'
|
|
|
|
def _handle_qiclose(self, cmd: bytes) -> bytes:
|
|
return b'OK\r\n'
|
|
|
|
def _handle_qisendex(self, cmd: bytes) -> bytes:
|
|
return b'SEND OK\r\n'
|
|
|
|
def _handle_qird(self, cmd: bytes) -> bytes:
|
|
data = b"Hello from Mock!"
|
|
return f'+QIRD: {len(data)}\r\n'.encode() + data + b'\r\nOK\r\n'
|
|
|
|
def _handle_crsm(self, cmd: bytes) -> bytes:
|
|
if b'28539' in cmd:
|
|
return b'+CRSM: 144,0,"64F01064F020FFFFFFFFFFFF"\r\nOK\r\n'
|
|
return b'+CRSM: 144,0,""\r\nOK\r\n'
|
|
|
|
def _handle_qfupl(self, cmd: bytes) -> bytes:
|
|
self._raw_mode = True
|
|
return b'CONNECT\r\n'
|
|
|
|
def _handle_qfdel(self, cmd: bytes) -> bytes:
|
|
return b'OK\r\n'
|
|
|
|
def _handle_qicsgp(self, cmd: bytes) -> bytes:
|
|
return b'OK\r\n'
|
|
|
|
def add_handler(self, command_prefix: bytes, handler: Callable[[bytes], bytes]):
|
|
self._handlers[command_prefix] = handler
|
|
|
|
def close(self):
|
|
self.is_open = False
|
|
|
|
def write(self, data: bytes):
|
|
if not self.is_open: raise Exception("Port is closed")
|
|
if self._raw_mode:
|
|
self._raw_mode = False
|
|
self._rx_buffer += b'OK\r\n'
|
|
return
|
|
self._tx_buffer += data
|
|
while b'\r' in self._tx_buffer:
|
|
cmd_full, rest = self._tx_buffer.split(b'\r', 1)
|
|
self._tx_buffer = rest
|
|
cmd = cmd_full.strip()
|
|
if not cmd: continue
|
|
response = self._process_command(cmd)
|
|
if response:
|
|
self._rx_buffer += response
|
|
|
|
def _process_command(self, cmd: bytes) -> bytes:
|
|
if cmd in self._query_responses:
|
|
return self._query_responses[cmd]
|
|
for prefix, handler in self._handlers.items():
|
|
if cmd.startswith(prefix):
|
|
return handler(cmd)
|
|
return b'ERROR\r\n'
|
|
|
|
def read(self, size: int = 1) -> bytes:
|
|
if not self.is_open: return b""
|
|
if size > len(self._rx_buffer):
|
|
data = self._rx_buffer
|
|
self._rx_buffer = b""
|
|
return data
|
|
data = self._rx_buffer[:size]
|
|
self._rx_buffer = self._rx_buffer[size:]
|
|
return data
|
|
|
|
def readline(self) -> bytes:
|
|
if not self.is_open: return b""
|
|
if b'\n' in self._rx_buffer:
|
|
line, rest = self._rx_buffer.split(b'\n', 1)
|
|
self._rx_buffer = rest
|
|
return line + b'\n'
|
|
|
|
# If we have data but no newline, returns empty to simulate partial read?
|
|
# Standard readline blocks. Here we just return empty if incomplete.
|
|
if not self._rx_buffer:
|
|
time.sleep(0.001) # Reduced sleep for faster tests
|
|
return b""
|
|
|
|
@property
|
|
def in_waiting(self):
|
|
return len(self._rx_buffer)
|