m2m-python/m2m/nbiot/module_hisi.py
2026-02-19 08:14:53 +01:00

72 lines
2.8 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import logging
import binascii
from typing import Optional, List
from m2m.nbiot.module import ModuleBase
logger = logging.getLogger('m2m.hisi')
class ModuleHiSi(ModuleBase):
    """Implementation for HiSilicon Boudica based modules (e.g. BC95).

    Every method maps onto a single AT command that is sent through the
    helpers inherited from ``ModuleBase``.
    """

    def __init__(self, serial_port: str, baudrate: int = 9600, **kwargs):
        """Initialise the base module, then run the module-specific setup.

        Args:
            serial_port: Serial port identifier, forwarded to ``ModuleBase``.
            baudrate: Serial baud rate, forwarded to ``ModuleBase``.
            **kwargs: Extra keyword arguments forwarded to ``ModuleBase``.
        """
        super().__init__(serial_port, baudrate, **kwargs)
        self._setup()

    def _setup(self):
        """Run the start-up sequence, which currently only reboots the module."""
        self.reboot()

    def reboot(self) -> bool:
        """Perform a software reboot via AT+NRB.

        Returns:
            The result of ``self.at.read_ok`` after waiting up to 15 seconds.
        """
        # Send and wait inside one transaction so the command and its reply
        # are handled together.
        with self.transaction:
            self.s_port.send_cmd(b'AT+NRB')
            return self.at.read_ok(timeout=15)

    def set_nconfig(self, key: str, value: str) -> bool:
        """Set a configuration entry via AT+NCONFIG.

        Args:
            key: Name of the configuration entry.
            value: Value to assign to the entry.

        Returns:
            True when the command response equals ``b'OK'``.
        """
        return self.send_at_command(f'AT+NCONFIG="{key}","{value}"') == b'OK'

    def configure_pdp_context(self, context_id: int, context_type: str, apn: str) -> bool:
        """Configure a PDP context using AT+CGDCONT.

        Args:
            context_id: PDP context identifier.
            context_type: PDP type string placed in the second field.
            apn: Access point name.

        Returns:
            True when the command response equals ``b'OK'``.
        """
        return self.send_at_command(f'AT+CGDCONT={context_id},"{context_type}","{apn}"') == b'OK'

    def activate_pdp_context(self, context_id: int) -> bool:
        """Activate a PDP context via AT+CGACT.

        Args:
            context_id: PDP context identifier.

        Returns:
            True when the command response equals ``b'OK'``; otherwise the
            truthiness of ``self.get_ip_address(context_id)``.
        """
        if self.send_at_command(f'AT+CGACT=1,{context_id}') == b'OK':
            return True
        # The command did not report OK; fall back to checking whether the
        # context has an IP address.
        return bool(self.get_ip_address(context_id))

    def open_socket(self, local_port: int = 0, protocol: str = "UDP") -> int:
        """Open a socket via AT+NSOCR.

        Args:
            local_port: Local port number placed in the command.
            protocol: ``"UDP"`` (case-insensitive) opens a datagram socket;
                any other value is treated as TCP.

        Returns:
            The socket id parsed from the first all-digit response line, or
            -1 when the response is empty or contains no such line.
        """
        # The socket type must agree with the IP protocol number:
        # DGRAM pairs with 17 (UDP), STREAM pairs with 6 (TCP).
        if protocol.upper() == "UDP":
            sock_type, proto_val = "DGRAM", 17
        else:
            sock_type, proto_val = "STREAM", 6
        resp = self.send_at_command(f'AT+NSOCR={sock_type},{proto_val},{local_port}')
        if not resp:
            return -1
        for line in resp.splitlines():
            line = line.strip()
            if line.isdigit():
                return int(line)
        return -1

    def send_data_hex(self, socket_id: int, data: bytes, remote_ip: str = "", remote_port: int = 0) -> bool:
        """Send data (UDP) via AT+NSOST.

        Args:
            socket_id: Socket id to send on.
            data: Payload; it is hex-encoded before being placed in the command.
            remote_ip: Destination address.
            remote_port: Destination port.

        Returns:
            False without sending when ``remote_ip`` or ``remote_port`` is
            empty/zero; otherwise True when the response equals ``b'OK'``.
        """
        if not remote_ip or not remote_port:
            return False
        hex_data = binascii.hexlify(data).decode('ascii')
        # The length field counts payload bytes, not hex characters.
        cmd = f'AT+NSOST={socket_id},{remote_ip},{remote_port},{len(data)},{hex_data}'
        return self.send_at_command(cmd) == b'OK'

    def close_socket(self, socket_id: int) -> bool:
        """Close a socket via AT+NSOCL.

        Args:
            socket_id: Socket id to close.

        Returns:
            True when the command response equals ``b'OK'``.
        """
        return self.send_at_command(f'AT+NSOCL={socket_id}') == b'OK'

    def receive_data(self, socket_id: int, length: int = 512) -> bytes:
        """Read data via AT+NSORF.

        Args:
            socket_id: Socket id to read from.
            length: Length value placed in the command.

        Returns:
            The decoded payload from the first response line that has at
            least five comma-separated fields, starts with ``socket_id`` and
            carries valid hex in its fifth field; ``b''`` otherwise.
        """
        resp = self.send_at_command(f'AT+NSORF={socket_id},{length}')
        if not resp:
            return b''
        wanted_id = str(socket_id).encode()
        for line in resp.splitlines():
            fields = line.split(b',')
            if len(fields) < 5 or fields[0].strip() != wanted_id:
                continue
            try:
                # Strip so surrounding whitespace does not invalidate the hex.
                return binascii.unhexlify(fields[4].strip())
            except (binascii.Error, ValueError):
                # Best effort: report the bad payload and keep scanning.
                logger.warning(
                    'Ignoring malformed AT+NSORF payload on socket %s: %r',
                    socket_id, fields[4],
                )
        return b''