m2m-python/m2m/serial/urc_handler.py
2026-02-19 08:14:53 +01:00

91 lines
3.4 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import logging
from typing import Dict, List, Callable, Optional, Union, Any
# Module-level logger (named 'm2m.urc') used by the URC handling code below.
logger = logging.getLogger('m2m.urc')
class UrcHandler:
    """
    Manages Unsolicited Result Codes (URC).

    Registered URCs are tracked in an internal registry with optional callbacks.
    Each registered prefix maps to an entry holding the payloads received so
    far (``'values'``), the callbacks fired on a match (``'callbacks'``) and
    whether payloads are retained (``'store'``).

    Prefixes are keyed as ``bytes``. Every method that takes a URC prefix also
    accepts ``str`` (encoded as ASCII), so a URC registered as text can be
    looked up with the same text.
    """

    def __init__(self):
        # Registry structure: { prefix: {'values': [], 'callbacks': [], 'store': bool} }
        self.__urc_registry: Dict[bytes, Dict[str, Any]] = {}
        logger.debug("Initialized URC registry")

    @staticmethod
    def _as_key(urc: Union[bytes, str]) -> bytes:
        """Return *urc* in the ``bytes`` form used as registry key.

        Raises:
            UnicodeEncodeError: if *urc* is a ``str`` with non-ASCII characters.
        """
        return urc.encode('ascii') if isinstance(urc, str) else urc

    def register_urc(self, urc: Union[bytes, str], store_values: bool = True,
                     callbacks: Optional[Union[Callable[[bytes], None], List[Callable[[bytes], None]]]] = None) -> None:
        """Registers a URC prefix to the handler.

        Args:
            urc: Prefix that identifies the URC; ``str`` is encoded as ASCII.
            store_values: When true, payloads of matching messages are kept
                until popped or cleared.
            callbacks: A single callable, or a list/tuple of callables, each
                invoked with the payload of every matching message.

        Raises:
            KeyError: if the prefix is already registered.
        """
        key = self._as_key(urc)
        if key in self.__urc_registry:
            logger.error(f"Duplicate URC: {key!r}")
            raise KeyError(f"URC {key!r} already registered.")

        if callbacks is None:
            cb_list = []
        elif isinstance(callbacks, (list, tuple)):
            # Copy so that add_callback() never mutates the caller's sequence.
            cb_list = list(callbacks)
        else:
            cb_list = [callbacks]

        self.__urc_registry[key] = {
            'values': [],
            'callbacks': cb_list,
            'store': store_values,
        }
        logger.debug(f"Registered URC: {key!r}")

    def unregister_urc(self, urc: Union[bytes, str]) -> None:
        """Removes the URC from the registry.

        Unknown prefixes are not an error; a warning is logged instead.
        """
        key = self._as_key(urc)
        if key in self.__urc_registry:
            del self.__urc_registry[key]
        else:
            logger.warning(f"Attempted to unregister unknown URC: {key!r}")

    def pop_urc_value(self, urc: Union[bytes, str]) -> Optional[bytes]:
        """Retrieves and removes the oldest stored value for a URC.

        Returns:
            The oldest stored payload, or ``None`` when the URC is not
            registered or has no stored values.
        """
        entry = self.__urc_registry.get(self._as_key(urc))
        if entry is None:
            return None
        values = entry['values']
        return values.pop(0) if values else None

    def get_urc_values(self, urc: Union[bytes, str]) -> List[bytes]:
        """Get all stored values for a URC.

        Returns:
            The handler's own storage list for a registered URC (not a copy),
            or a fresh empty list when the URC is not registered.
        """
        entry = self.__urc_registry.get(self._as_key(urc))
        if entry is None:
            return []
        return entry['values']

    def clear_urc_values(self, urc: Union[bytes, str]) -> None:
        """Clears stored values for a URC; does nothing for an unknown URC."""
        entry = self.__urc_registry.get(self._as_key(urc))
        if entry is not None:
            entry['values'].clear()

    def check_urc(self, msg: bytes) -> bool:
        """Checks if msg starts with any registered URC and fires callbacks.

        The payload is the part of *msg* after the prefix with surrounding
        whitespace stripped. It is stored when the entry was registered with
        ``store_values`` true, then passed to each callback in order. An
        exception raised by a callback is logged and does not stop the
        remaining callbacks.

        Returns:
            ``True`` if a registered prefix matched, ``False`` otherwise.
        """
        # We still scan every prefix because prefixes can be varying lengths
        # and we don't have a fixed separator for all possible URCs.
        matches = [prefix for prefix in self.__urc_registry if msg.startswith(prefix)]
        if not matches:
            return False

        # Prefer the most specific (longest) prefix so the result does not
        # depend on registration order when prefixes overlap.
        urc = max(matches, key=len)
        entry = self.__urc_registry[urc]
        payload = msg[len(urc):].strip()
        logger.debug(f"URC Match: {urc!r} -> {payload!r}")

        if entry['store']:
            entry['values'].append(payload)

        # Iterate over a snapshot: a callback may call add_callback() for this
        # same URC, which must not extend the dispatch currently in progress.
        for cb in tuple(entry['callbacks']):
            try:
                cb(payload)
            except Exception as e:
                # Boundary for user-supplied code: log with traceback, keep going.
                logger.exception(f"URC Callback Error ({urc!r}): {e}")
        return True

    def add_callback(self, urc: Union[bytes, str], callback: Callable[[bytes], None]) -> None:
        """Adds a callback to an existing URC registration.

        Raises:
            KeyError: if the URC is not registered.
        """
        key = self._as_key(urc)
        entry = self.__urc_registry.get(key)
        if entry is None:
            raise KeyError(f"URC {key!r} not registered.")
        entry['callbacks'].append(callback)