m2m-python/examples/06_aws_iot_secure_lifecycle.py
2026-02-19 08:14:53 +01:00

50 lines
1.7 KiB
Python

#!/usr/bin/env python3
import logging
import argparse
from m2m.nbiot import factory
def setup_aws_iot(port):
module = factory('BG95', port)
# Configuration
AWS_ENDPOINT = "a3xxxxxxxxxxxx-ats.iot.us-east-1.amazonaws.com"
CLIENT_ID = "Sensor_Node_01"
with module:
module.radio_on()
module.activate_pdp_context(1)
# 1. Provision Certificates to Module RAM
print("Uploading Security Assets...")
module.upload_file("aws_ca.crt", b"CERT_DATA_HERE...")
module.upload_file("client.crt", b"CERT_DATA_HERE...")
module.upload_file("private.key", b"KEY_DATA_HERE...")
# 2. Map Assets to SSL Context 1
print("Configuring SSL Engine...")
module.ssl_configure(
ssl_ctx_id=1,
cacert="aws_ca.crt",
clientcert="client.crt",
clientkey="private.key"
)
# 3. Configure MQTT for SSL
module.set_qcfg("mqtt/ssl", 1)
module.mqtt_configure(client_idx=0, keepalive=60)
# 4. Connect and Publish
if module.mqtt_open(0, AWS_ENDPOINT, 8883) == 0:
if module.mqtt_connect(0, CLIENT_ID) == 0:
print("AWS IoT Connected!")
module.mqtt_publish(0, 1, 1, 0, "dt/sensor/data", '{"status": "ok", "uptime": 1200}')
# Stay connected to receive shadows/commands
module.mqtt_subscribe(0, 2, "cmd/sensor/01", 1)
module.poll(30)
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Run AWS IoT lifecycle demo.")
parser.add_argument("port", nargs="?", default="/dev/ttyUSB0", help="Serial port (e.g. /dev/ttyUSB0 or MOCK)")
args = parser.parse_args()
setup_aws_iot(args.port)