91 lines
3.4 KiB
Python
91 lines
3.4 KiB
Python
#!/usr/bin/env python3
|
|
# -*- coding: utf-8 -*-
|
|
|
|
import logging
|
|
from typing import Dict, List, Callable, Optional, Union, Any
|
|
|
|
logger = logging.getLogger('m2m.urc')
|
|
|
|
class UrcHandler:
|
|
"""
|
|
Manages Unsolicited Result Codes (URC).
|
|
Registered URCs are tracked in an internal registry with optional callbacks.
|
|
"""
|
|
|
|
def __init__(self):
|
|
logger.debug("Initialized URC registry")
|
|
# Registry structure: { prefix: {'values': [], 'callbacks': [], 'store': bool} }
|
|
self.__urc_registry: Dict[bytes, Dict[str, Any]] = {}
|
|
|
|
def register_urc(self, urc: Union[bytes, str], store_values: bool = True,
|
|
callbacks: Optional[Union[Callable[[bytes], None], List[Callable[[bytes], None]]]] = None) -> None:
|
|
"""Registers a URC prefix to the handler."""
|
|
if isinstance(urc, str):
|
|
urc = urc.encode('ascii')
|
|
|
|
if callbacks is None:
|
|
cb_list = []
|
|
elif isinstance(callbacks, list):
|
|
cb_list = callbacks
|
|
else:
|
|
cb_list = [callbacks]
|
|
|
|
if urc in self.__urc_registry:
|
|
logger.error(f"Duplicate URC: {urc!r}")
|
|
raise KeyError(f"URC {urc!r} already registered.")
|
|
|
|
self.__urc_registry[urc] = {
|
|
'values': [],
|
|
'callbacks': cb_list,
|
|
'store': store_values
|
|
}
|
|
logger.debug(f"Registered URC: {urc!r}")
|
|
|
|
def unregister_urc(self, urc: bytes) -> None:
|
|
"""Removes the URC from the registry."""
|
|
if urc in self.__urc_registry:
|
|
del self.__urc_registry[urc]
|
|
else:
|
|
logger.warning(f"Attempted to unregister unknown URC: {urc!r}")
|
|
|
|
def pop_urc_value(self, urc: bytes) -> Optional[bytes]:
|
|
"""Retrieves and removes the oldest stored value for a URC."""
|
|
if item := self.__urc_registry.get(urc):
|
|
values = item['values']
|
|
return values.pop(0) if values else None
|
|
return None
|
|
|
|
def get_urc_values(self, urc: bytes) -> List[bytes]:
|
|
"""Get all stored values for a URC."""
|
|
return self.__urc_registry.get(urc, {}).get('values', [])
|
|
|
|
def clear_urc_values(self, urc: bytes) -> None:
|
|
"""Clears stored values for a URC."""
|
|
if item := self.__urc_registry.get(urc):
|
|
item['values'].clear()
|
|
|
|
def check_urc(self, msg: bytes) -> bool:
|
|
"""Checks if msg starts with any registered URC and fires callbacks."""
|
|
# We still iterate because prefixes can be varying lengths
|
|
# and we don't have a fixed separator for all possible URCs.
|
|
for urc, entry in self.__urc_registry.items():
|
|
if msg.startswith(urc):
|
|
payload = msg[len(urc):].strip()
|
|
logger.debug(f"URC Match: {urc!r} -> {payload!r}")
|
|
|
|
if entry['store']:
|
|
entry['values'].append(payload)
|
|
|
|
for cb in entry['callbacks']:
|
|
try: cb(payload)
|
|
except Exception as e:
|
|
logger.error(f"URC Callback Error ({urc!r}): {e}")
|
|
return True
|
|
return False
|
|
|
|
def add_callback(self, urc: bytes, callback: Callable[[bytes], None]) -> None:
|
|
"""Adds a callback to an existing URC registration."""
|
|
if item := self.__urc_registry.get(urc):
|
|
item['callbacks'].append(callback)
|
|
else:
|
|
raise KeyError(f"URC {urc!r} not registered.")
|