Source code for system.hardware.tici.esim

#!/usr/bin/env python3
import os
import math
import time
import binascii
import requests
import serial
import subprocess


[docs] def post(url, payload): print() print("POST to", url) r = requests.post( url, data=payload, verify=False, headers={ "Content-Type": "application/json", "X-Admin-Protocol": "gsma/rsp/v2.2.0", "charset": "utf-8", "User-Agent": "gsma-rsp-lpad", }, ) print("resp", r) print("resp text", repr(r.text)) print() r.raise_for_status() ret = f"HTTP/1.1 {r.status_code}" ret += ''.join(f"{k}: {v}" for k, v in r.headers.items() if k != 'Connection') return ret.encode() + r.content
[docs] class LPA: def __init__(self): self.dev = serial.Serial('/dev/ttyUSB2', baudrate=57600, timeout=1, bytesize=8) self.dev.reset_input_buffer() self.dev.reset_output_buffer() assert "OK" in self.at("AT")
[docs] def at(self, cmd): print(f"==> {cmd}") self.dev.write(cmd.encode() + b'\r\n') r = b"" cnt = 0 while b"OK" not in r and b"ERROR" not in r and cnt < 20: r += self.dev.read(8192).strip() cnt += 1 r = r.decode() print(f"<== {repr(r)}") return r
[docs] def download_ota(self, qr): return self.at(f'AT+QESIM="ota","{qr}"')
[docs] def download(self, qr): smdp = qr.split('$')[1] out = self.at(f'AT+QESIM="download","{qr}"') for _ in range(5): line = out.split("+QESIM: ")[1].split("\r\n\r\nOK")[0] parts = [x.strip().strip('"') for x in line.split(',', maxsplit=4)] print(repr(parts)) trans, ret, url, payloadlen, payload = parts assert trans == "trans" and ret == "0" assert len(payload) == int(payloadlen) r = post(f"https://{smdp}/{url}", payload) to_send = binascii.hexlify(r).decode() chunk_len = 1400 for i in range(math.ceil(len(to_send) / chunk_len)): state = 1 if (i+1)*chunk_len < len(to_send) else 0 data = to_send[i * chunk_len : (i+1)*chunk_len] out = self.at(f'AT+QESIM="trans",{len(to_send)},{state},{i},{len(data)},"{data}"') assert "OK" in out if '+QESIM:"download",1' in out: raise Exception("profile install failed") elif '+QESIM:"download",0' in out: print("done, successfully loaded") break
[docs] def enable(self, iccid): self.at(f'AT+QESIM="enable","{iccid}"')
[docs] def disable(self, iccid): self.at(f'AT+QESIM="disable","{iccid}"')
[docs] def delete(self, iccid): self.at(f'AT+QESIM="delete","{iccid}"')
[docs] def list_profiles(self): out = self.at('AT+QESIM="list"') return out.strip().splitlines()[1:]
if __name__ == "__main__": import sys if "RESTART" in os.environ: subprocess.check_call("sudo systemctl stop ModemManager", shell=True) subprocess.check_call("/usr/comma/lte/lte.sh stop_blocking", shell=True) subprocess.check_call("/usr/comma/lte/lte.sh start", shell=True) while not os.path.exists('/dev/ttyUSB2'): time.sleep(1) time.sleep(3) lpa = LPA() print(lpa.list_profiles()) if len(sys.argv) > 1: lpa.download(sys.argv[1]) print(lpa.list_profiles())