Run automatic modem diagnostics
This script runs the annotated modem diagnostics automatically and saves the results to a text file.

Download the scripts
Download modem_diagnostics.py and modem_interface.py to the same directory.

Run the script
python3 modem_diagnostics.py --port /dev/ttyUSB2 --baud 115200
Note:
--port and --baud are optional. If --port is omitted, the script lists the available serial ports and prompts you to select one. If --baud is omitted, it defaults to 115200 baud.

ModemManager variant
If you use ModemManager, run the alternative helper script:
python3 mmcli-diagnostics.py
Python scripts
mmcli-diagnostics.py
#!/usr/bin/env python3
###
### 2023-12-06 | Authors: Leonardo Gurria
### The project is the automated process of running modem diagnostics on Linux using ModemManager
### License: hologram.io
###
import subprocess
import logging
import time
import json
# Sanity checks for ModemManager
def check_mmcli(log):
    """Report whether ModemManager is installed and currently running.

    Args:
        log: Logger that receives the progress messages.

    Returns:
        bool: False when ``mmcli`` cannot be found on PATH or when
        ``systemctl status ModemManager`` does not report
        ``active (running)``; True otherwise.
    """
    # Function-scope import: shutil is not part of this script's import block.
    import shutil

    # Check if mmcli is installed (should be installed by default).
    # shutil.which is used instead of shelling out to `which`, because
    # subprocess.getoutput also captures stderr: an error message such as
    # "no mmcli in (...)" is non-empty and would be mistaken for a path.
    log.debug('Checking if mmcli is installed')
    if shutil.which('mmcli') is None:
        return False

    # Check if mmcli is running (should be running on boot)
    log.debug('Checking if mmcli is active and running')
    running = subprocess.getoutput('sudo systemctl status ModemManager')
    return 'active (running)' in running
# Sanity checks for debug mode
def check_debug_mode(log, service_path='/lib/systemd/system/ModemManager.service'):
    """Report whether the ModemManager systemd unit mentions the debug flag.

    The unit file is read directly instead of through ``ls``/``grep``:
    ``subprocess.getoutput`` returns the shell's error text when the file is
    missing, which is non-empty and therefore used to be treated both as
    "file exists" and as "flag present".

    Args:
        log: Logger that receives the progress messages.
        service_path: Unit file to inspect. Defaults to the location the
            script has always checked.

    Returns:
        bool: True when the file can be read and contains the text
        ``debug``; False when it is missing, unreadable, or lacks the flag.
    """
    # Check for service file
    log.debug('Checking if ModemManager.service file exists')
    try:
        with open(service_path, encoding='utf-8', errors='replace') as unit_file:
            unit_text = unit_file.read()
    except FileNotFoundError:
        log.warning('File ' + service_path + ' does not exist')
        return False
    except OSError as err:
        log.warning('Unable to read ' + service_path + ': ' + str(err))
        return False

    # Check for debug flag (same case-sensitive substring match as `grep "debug"`)
    log.debug('Checking if debug flag is present')
    if 'debug' not in unit_text:
        log.debug('Debug flag is not present')
        return False
    return True
# Ask user how to proceed
def user_selection(log):
    """Ask how to proceed when ModemManager is not running in debug mode.

    Option 0 runs the basic diagnostics and logs the resulting report as
    JSON. Option 1 prints the steps for enabling debug mode and exits the
    script with status 0.

    Args:
        log: Logger that receives progress and error messages.
    """
    # Print user menu
    print('\nDebug mode is not enabled on ModemManager')
    print('How would you like to proceed?')
    print(' 0 - Basic Diagnostics')
    print(' 1 - Show me how to enable it')

    # Keep prompting until the answer is 0 or 1. Non-numeric input is
    # treated as an invalid option instead of crashing with ValueError.
    choice = None
    while choice not in (0, 1):
        try:
            choice = int(input('Your selection: '))
        except ValueError:
            choice = None
        if choice not in (0, 1):
            log.error('Please enter a valid option')

    if choice == 0:
        log.info('Proceeding with basic diagnostics! Please note this might take around 5 mins')
        # Do basic diagnostics; the modem path returned alongside is not needed here
        report, _ = basic_diagnostics(log)
        log.info(json.dumps(report, indent=4))
    else:
        log.debug('Showing instructions to enable debug mode')
        # Show instructions
        print('\nStep 1) Add --debug at the end of line: ExecStart=/usr/sbin/ModemManager in /lib/systemd/system/ModemManager.service file')
        print(' Line should look like this: ExecStart=/usr/sbin/ModemManager --debug')
        print('Step 2) Restart your RPi/Linux machine and run this script again!')
        exit(0)
# Basic diagnostics
def basic_diagnostics(log):
    """Collect modem, SIM and bearer details through mmcli and build the report.

    Exits the script (status 0) when no modem is listed, when the modem
    cannot be queried, or when its SIM cannot be found.

    Args:
        log: Logger that receives progress and error messages.

    Returns:
        The ``(report, modem_path)`` tuple produced by
        ``parse_basic_diagnostics``.
    """
    # Check that modem is plugged in
    log.debug('Checking if modem is plugged in')
    listing = subprocess.getoutput('mmcli -L')
    if 'No modems were found' in listing:
        log.error('No modem is plugged in')
        exit(0)
    modem_list = json.loads(subprocess.getoutput('mmcli -L -J'))['modem-list']
    if not modem_list:
        # An empty list would otherwise fail with IndexError below
        log.error('No modem is plugged in')
        exit(0)
    plugged = modem_list[0]

    # Check for modem info
    log.debug('Checking for modem information in slot ' + plugged)
    modem_text = subprocess.getoutput('mmcli -m ' + plugged)
    if "error: couldn't find modem" in modem_text:
        log.error('Unable to check modem information')
        exit(0)
    log.debug(modem_text)
    # Get information on JSON format
    modem = subprocess.getoutput('mmcli -m ' + plugged + ' -J')
    log.debug('Modem JSON file')
    log.debug(modem)
    generic = json.loads(modem)['modem']['generic']
    sim_path = generic['sim']
    bearer_path = generic['bearers'][0] if generic['bearers'] else None

    # Check that SIM is present
    log.debug('Checking for SIM in slot ' + sim_path)
    sim_text = subprocess.getoutput('mmcli -i ' + sim_path)
    if "error: couldn't find SIM" in sim_text:
        log.error('SIM is missing')
        exit(0)
    log.debug(sim_text)
    # Get information on JSON format
    sim = subprocess.getoutput('mmcli -i ' + sim_path + ' -J')
    log.debug('SIM JSON file')
    log.debug(sim)

    # Check for bearer. 'No bearer' is the sentinel parse_basic_diagnostics
    # expects; mmcli is only queried when the modem actually lists a bearer.
    bearer = 'No bearer'
    if bearer_path is None:
        log.warning('Connection is not in use')
    else:
        log.debug('Checking for connection in slot ' + bearer_path)
        bearer_text = subprocess.getoutput('mmcli -b ' + bearer_path)
        if "error: couldn't find bearer" in bearer_text:
            log.warning('Connection is not in use')
        else:
            log.debug(bearer_text)
            # Get information on JSON format
            bearer = subprocess.getoutput('mmcli -b ' + bearer_path + ' -J')
            log.debug('Bearer JSON file')
            log.debug(bearer)

    # Parse JSON information
    return parse_basic_diagnostics(log, modem, sim, bearer, plugged)
# Parse basic diagnostics from JSON format
def parse_basic_diagnostics(log, modem, sim, bearer, plugged):
    """Build the basic diagnostics report from mmcli JSON output.

    Besides parsing the supplied JSON, this runs a network scan
    (``--3gpp-scan --timeout=300``) and an extended signal query that sleeps
    for 11 seconds, so the call can take several minutes.

    Args:
        log: Logger that receives progress and warning messages.
        modem: JSON text from ``mmcli -m <modem> -J``.
        sim: JSON text from ``mmcli -i <sim> -J``.
        bearer: JSON text from ``mmcli -b <bearer> -J``, or the literal
            string ``'No bearer'``.
        plugged: Modem identifier passed to ``mmcli -m``.

    Returns:
        tuple: ``(report, plugged)``. ``report`` is a dict whose AT-command
        fields (``cpin`` ... ``cgdcont``) are left as empty strings.
    """
    log.debug('Basic Diagnostics')
    m = json.loads(modem)
    s = json.loads(sim)
    if bearer != 'No bearer':
        apn = json.loads(bearer)['bearer']['properties']['apn']
    else:
        apn = bearer

    # Networks: fall back to the raw command output whenever it cannot be
    # used as a scan result (an mmcli error, or text that is not the expected JSON).
    networks = subprocess.getoutput('sudo mmcli -m ' + plugged + ' --3gpp-scan --timeout=300 -J')
    scan_parsed = False
    if 'error' not in networks:
        try:
            n = json.loads(networks)['modem']['3gpp']['scan-networks']
            scan_parsed = True
        except (ValueError, KeyError, TypeError):
            scan_parsed = False
    if not scan_parsed:
        n = networks
        log.warning(n)

    # Extended signal
    ext_signal = subprocess.getoutput('mmcli -m ' + plugged + ' --signal-get')
    # The regular output contains the label "error rate threshold"; drop it
    # so the failure check below does not match it.
    ext_signal = ext_signal.replace('error rate threshold', '')
    if 'error' not in ext_signal:
        # Setup extended signal
        subprocess.getoutput('sudo mmcli -m ' + plugged + ' --signal-setup=10')
        # Wait 10 + 1 secs
        time.sleep(11)
        # Get actual extended signal
        ext_signal = subprocess.getoutput('mmcli -m ' + plugged + ' --signal-get -J')
        try:
            all_rats = json.loads(ext_signal)['modem']['signal']
            if all_rats['lte']['rssi'] == '--':
                log.debug('Fetching all RAT extended signal')
                sig = all_rats
            else:
                log.debug('Fetching LTE extended signal')
                sig = all_rats['lte']
        except (ValueError, KeyError, TypeError):
            # Unexpected output: report it verbatim rather than crash
            sig = ext_signal
            log.warning(f'Unable to get extended signal due to: {ext_signal}')
    else:
        sig = ext_signal
        log.warning(f'Unable to get extended signal due to: {ext_signal}')

    generic = m['modem']['generic']
    threegpp = m['modem']['3gpp']
    report = {
        'module': generic['model'],
        'manufacturer': generic['manufacturer'],
        'firmware': generic['revision'],
        'iccid': s['sim']['properties']['iccid'],
        'signal': generic['signal-quality'],
        'extended_signal': sig,
        'registration_status': generic['state'] + ' | ' + threegpp['registration-state'],
        'apn': apn,
        'current_operator': threegpp['operator-code'] + ' | ' + threegpp['operator-name'],
        'networks_in_reach': n,
        'cpin': '',
        'fplmn': '',
        'csq': '',
        'cgact': '',
        'creg': '',
        'cgreg': '',
        'cereg': '',
        'cgdcont': '',
    }
    log.debug(json.dumps(report, indent=4))
    return report, plugged
# Full diagnostics
def _run_at_command(plugged, command):
    """Send one AT command to the modem through mmcli and return its output."""
    return subprocess.getoutput("sudo mmcli -m " + plugged + " --command='" + command + "'")


def _clean_journal_reply(journal_line):
    """Strip the journal prefix and framing from a logged modem reply."""
    marker = journal_line.find('<-- ')
    # Without the marker keep the whole line; slicing from find()'s -1
    # would keep only the final character.
    reply = journal_line[marker:] if marker != -1 else journal_line
    for noise in ('<-- ', "'", '<CR><LF>', 'OK'):
        reply = reply.replace(noise, '')
    return reply


def full_diagnostics(log):
    """Run the basic diagnostics, add the AT-command results, and log the report.

    Commands that fail or return an empty response are logged as warnings
    and leave their report field as the empty string.

    Args:
        log: Logger that receives progress, warnings, and the final JSON report.
    """
    # Run basic diagnostics first
    report, plugged = basic_diagnostics(log)
    log.debug('Full diagnostics')

    # (report key, AT command, command name used in the warning)
    direct_commands = (
        ('cpin', '+cpin?', '+cpin'),
        ('fplmn', '+crsm=176,28539,0,0,12', '+crsm'),
        ('csq', '+csq', '+csq'),
        ('cgact', '+cgact?', '+cgact'),
        ('cgdcont', '+cgdcont?', '+cgdcont'),
    )
    for key, command, label in direct_commands:
        response = _run_at_command(plugged, command)
        if 'error' in response or response == "response: ''":
            log.warning(f'Unable to run {label} command due to: {response}')
        else:
            report[key] = response

    # The registration queries are answered from the ModemManager journal:
    # after sending the command, the latest matching log line is used.
    for key in ('creg', 'cgreg', 'cereg'):
        response = _run_at_command(plugged, '+' + key + '?')
        journal_line = subprocess.getoutput(
            'journalctl -u ModemManager | grep -i "+' + key + ':" | tail -1'
        )
        if 'error' in response or len(journal_line) == 0:
            log.warning(f'Unable to run +{key} command due to: {response}')
        else:
            report[key] = _clean_journal_reply(journal_line)

    log.info(json.dumps(report, indent=4))
# Main
def main():
    """Set up logging, verify ModemManager, and run the matching diagnostics.

    Everything is written to ``mmcli-diagnostics.log`` at DEBUG level, and
    messages at INFO level or above are also echoed through a console handler.
    """
    # Measure execution time
    started = time.time()

    # File log (DEBUG and above) plus a console handler (INFO and above)
    log_format = '%(asctime)s | %(name)s | %(levelname)s | %(message)s'
    log = logging.getLogger('mmcli-diagnostics')
    logging.basicConfig(
        level=logging.DEBUG,
        filename='mmcli-diagnostics.log',
        filemode='w',
        format=log_format,
    )
    console = logging.StreamHandler()
    console.setLevel(logging.INFO)
    console.setFormatter(logging.Formatter(log_format))
    log.addHandler(console)
    log.info('Starting script!')

    # Nothing can be diagnosed unless ModemManager passes its sanity checks
    if check_mmcli(log) is False:
        log.error('Your system does not have ModemManager ready for diagnostics')
        exit(0)

    # Debug mode selects full diagnostics; otherwise let the user decide
    if check_debug_mode(log) is False:
        log.debug('Debug mode is not enabled')
        user_selection(log)
    else:
        log.info('Debug flag is present. Proceeding with full diagnostics! Please note this might take around 5 mins')
        full_diagnostics(log)

    log.info('Script completed!')
    # Measure execution time
    finished = time.time()
    log.info(f'Execution duration: {finished - started} secs')
# Python stuff
# Run the diagnostics only when this file is executed as a script.
if __name__ == "__main__":
    main()
modem_interface.py
#!/usr/bin/env python3
#
# This code provides a robust serial interface to a modem.
# Cory Dixon, Hologram
#
#from tkinter import W
import serial
import time
import sys
import glob
import os
import re
# helper function to perform sort
def atoi(text):
    """Return *text* as an int when it consists only of decimal digits.

    Any other string, including the empty string, is returned unchanged.
    ``str.isdecimal`` is used rather than ``str.isdigit`` because the latter
    also accepts characters such as superscript digits, which ``int()``
    rejects with ValueError.
    """
    return int(text) if text.isdecimal() else text
def natural_keys(text):
    '''
    Sort key for human ("natural") ordering: alist.sort(key=natural_keys)

    The text is split into alternating non-digit and digit runs, and each
    run is passed through atoi so numeric runs compare as integers.
    http://nedbatchelder.com/blog/200712/human_sorting.html
    (See Toothy's implementation in the comments)
    '''
    chunks = re.split(r'(\d+)', text)
    return list(map(atoi, chunks))
""" Lists serial port names
:raises EnvironmentError:
On unsupported or unknown platforms
:returns:
A list of the serial ports available on the system
"""
def serial_ports():
    """Return the serial ports on this system that can currently be opened.

    :raises EnvironmentError:
        On unsupported or unknown platforms
    :returns:
        A naturally sorted list of port names that opened successfully
    """
    # Candidate port names per platform
    if sys.platform.startswith('win'):
        ports = ['COM%s' % (i + 1) for i in range(256)]
    elif sys.platform.startswith(('linux', 'cygwin')):
        # this excludes your current terminal "/dev/tty"
        ports = glob.glob('/dev/tty[A-Za-z]*')
    elif sys.platform.startswith('darwin'):
        ports = glob.glob('/dev/tty.*')
    else:
        raise EnvironmentError('Unsupported platform')

    # Mainly for MacOS: ports whose name contains the user name are skipped.
    # USER may be unset or empty; in that case no name-based filtering is
    # done (an empty string is a substring of every port name and would
    # otherwise exclude all of them).
    user = (os.environ.get('USER') or '').lower()

    # Keep only the ports that can actually be opened
    result = []
    for port in ports:
        lowered = port.lower()
        if (user and user in lowered) or 'bluetooth' in lowered:
            continue
        try:
            probe = serial.Serial(port)
            probe.close()
        except (OSError, serial.SerialException):
            continue
        result.append(port)

    result.sort(key=natural_keys)
    return result
""" Modem AT Command Manager Class """
class ATManager:
    """Modem AT command manager built on a pyserial port.

    One command is queued with ``set_cmd`` and sent by ``update`` (normally
    driven through ``wait_for_rx``), which also collects the reply lines
    until a final ``OK``/``ERROR``/``ABORTED`` line or a timeout.
    """

    def __init__(self, serial_timeout=5, msg_timeout=1, print_debug=False):
        """Create a manager with no port open.

        Args:
            serial_timeout: Read timeout handed to the serial port on open.
            msg_timeout: Seconds to wait for a command's final reply.
            print_debug: When true, print TX/RX/UX traffic to stdout.
        """
        self.print_debug = print_debug
        self.ser = None
        self.init_time = time.time()
        self.sent_cmd = None          # time the in-flight command was written, or None
        self.last_rx = time.time()
        self.serial_timeout = serial_timeout
        self.msg_timeout = msg_timeout
        self.msg_timeout_def = msg_timeout
        self.send_at = None           # command queued by set_cmd, not yet written
        self.last_sms = []  # message from last urc code
        self.have_sms = False
        self.ping_count = 0
        self.have_ping = False
        self.last_ping = 0

    # The default message timeout is 1 second. However, some commands like the AT+COPS=?
    # can take much longer. You should call set_msg_timeout(big_number) for those commands
    # and then it is advised to set the timeout back to 1 with set_msg_timeout(1)
    def set_msg_timeout(self, val):
        """Set the current and the default reply timeout; ``None`` is ignored."""
        if val is not None:
            self.msg_timeout = val
            self.msg_timeout_def = val

    # Assign an AT command to the command manager
    def set_cmd(self, val):
        """Queue *val* to be written by a later ``update`` call."""
        self.send_at = val

    # Open serial port
    def open(self, port, baud):
        """Open *port* at *baud* and discard any buffered data."""
        self.ser = serial.Serial(port, baudrate=baud, timeout=self.serial_timeout)
        self.ser.reset_input_buffer()
        self.ser.reset_output_buffer()
        self.ser.flush()
        self.ser.read_all()

    # Close serial port
    def close(self):
        """Discard buffered data and close the port, if one is open."""
        if self.ser:
            self.ser.reset_input_buffer()
            self.ser.reset_output_buffer()
            self.ser.flush()
            self.ser.read_all()
            self.ser.close()
        self.sent_cmd = None
        self.ser = None

    # DO NOT CALL THIS DIRECTLY. Use set_cmd
    def __write(self, val):
        """Write *val* to the port, prefixing ``AT`` when it is missing."""
        prefix = '' if val.startswith(('AT', 'at')) else 'AT'
        dt = time.time() - self.init_time
        if self.print_debug:
            print("TX [{:.2f}]: {}".format(dt, prefix + val))
        self.ser.write((prefix + val + '\r\n').encode())
        self.ser.flush()
        self.sent_cmd = time.time()

    # Check if serial port is open
    def isOpen(self):
        """Return True when a port has been opened and is still open."""
        # self.ser is None before open() and after close()
        return self.ser is not None and self.ser.isOpen()

    # Wait for modem to respond to AT command sent
    def wait_for_rx(self, wait_time=None, log_file=None, exit_flag=False):
        """Drive ``update`` until the queued/in-flight command is finished.

        Args:
            wait_time: Optional reply timeout for this call only.
            log_file: Optional writable file that receives the traffic.
            exit_flag: When true, a queued command that has not been written
                yet is not sent; an in-flight command is still waited for.

        Returns:
            list: The non-empty lines returned by ``update`` (may include
            ``'TIMEOUT'``). Empty when the port is not open.
        """
        if wait_time is not None:
            self.msg_timeout = wait_time
        full_rep = []
        try:
            # NOTE(review): parentheses spell out how the original
            # `a or b and c` was evaluated. If exit_flag was meant to stop
            # the whole loop, the grouping should be (a or b) and not c.
            while self.sent_cmd or (self.send_at and not exit_flag):
                rep = self.update(log_file)
                if rep is None:
                    return full_rep
                if rep == '':
                    time.sleep(0.01)
                else:
                    full_rep.append(rep)
            return full_rep
        finally:
            # Restore the default timeout on every exit path
            self.msg_timeout = self.msg_timeout_def

    # Update output/log_file
    def update(self, log_file=None):
        """Perform one poll step: read a line, detect a timeout, or send.

        Returns:
            ``None`` when the port is not open; the received line when data
            was waiting; ``'TIMEOUT'`` when the in-flight command exceeded
            ``msg_timeout``; otherwise ``''``.
        """
        if self.ser is None or not self.ser.isOpen():
            if self.print_debug:
                print("Serial port is not open")
            return None

        n = self.ser.inWaiting()
        dt = time.time() - self.init_time
        if n > 0:
            self.last_rx = time.time()
            try:
                reply = self.ser.readline().decode().strip()
            except Exception:
                # Best effort: skip an unreadable or undecodable line.
                # `Exception` (not a bare except) lets SystemExit and
                # KeyboardInterrupt through.
                return ''
            # URC detection
            self.parse_urc(reply, log_file)
            if self.sent_cmd:
                if self.print_debug:
                    print("RX [{:.2f}]: {}".format(dt, reply))
                # A final result line completes the in-flight command
                if reply.startswith('OK') or 'ERROR' in reply or 'ABORTED' in reply:
                    self.sent_cmd = None
            elif self.print_debug:
                # Unsolicited line: nothing was waiting for a reply
                print("UX [{:.2f}]: {}".format(dt, reply))
            if log_file:
                log_file.write('[{:.2f}] '.format(dt))
                log_file.write(reply)
                log_file.write('\r\n')
            return reply

        if self.sent_cmd:
            if (time.time() - self.sent_cmd) >= self.msg_timeout:
                if self.print_debug:
                    print("ERROR: timed out waiting for response")
                self.sent_cmd = None
                return 'TIMEOUT'
        # Don't send a new command faster than 200 ms after the last receive
        elif self.send_at and (time.time() - self.last_rx) >= 0.2:
            if log_file:
                log_file.write('[{:.2f}] '.format(dt))
                log_file.write(self.send_at)
                log_file.write('\r\n')
            self.__write(self.send_at)
            self.send_at = None
        return ''

    # Parse URC values we may care about
    def parse_urc(self, rsp, log_file=None):
        """React to the unsolicited result codes this class tracks.

        ``+UUSORD`` immediately writes an ``AT+USORD`` request, ``+CMT``
        stores the line as the latest SMS, and ``+QPING`` counts ping
        replies and records the last field of the summary line.
        """
        # +UUSORD: 0,17
        if '+UUSORD' in rsp:
            val = rsp.rsplit(':', 1)[-1]
            self.__write('AT+USORD=' + val)
        elif '+CMT' in rsp:
            self.have_sms = True
            self.last_sms = rsp
        elif '+QPING' in rsp:
            # Ping ack
            # +QPING: 0,"8.8.8.8",32,295,255
            # +QPING: 0,1,1,0,295,295,295
            val = rsp.rsplit(':', 1)[-1]
            ms = val.split(',')
            self.ping_count += 1
            # Only the summary line has a numeric second field
            if len(ms) > 2 and ms[1].isdigit():
                count = int(ms[1])
                if count + 1 == self.ping_count:
                    self.have_ping = True
                    self.last_ping = int(ms[-1])
                    self.sent_cmd = None

    # get sms
    def get_sms(self):
        """Return the stored SMS line once, or ``None`` when there is none."""
        if self.have_sms:
            self.have_sms = False
            sms = self.last_sms
            self.last_sms = []
            return sms
        return None

    # get ping
    def get_ping(self):
        """Return the stored ping value once, or ``None`` when there is none."""
        if self.have_ping:
            self.have_ping = False
            self.ping_count = 0
            ping = self.last_ping
            # Reset to the same int default that __init__ uses
            self.last_ping = 0
            return ping
        return None

    # Get the current carrier
    def get_carrier(self, log_file=None):
        """Query ``AT+COPS?`` and return the third comma-separated field, or ``''``."""
        self.set_cmd('AT+COPS?')
        reply = self.wait_for_rx(log_file=log_file)
        if reply is not None and len(reply) == 2 and 'OK' in reply[-1]:
            vals = reply[0].split(',')
            if len(vals) >= 3:
                return vals[2]
        return ''

    # See if the modem will respond to AT command
    def connected_state(self, log_file=None):
        """Query ``AT+CEREG?`` and return the second number in the reply.

        Returns 4 when the reply is not the expected two lines ending in OK
        or carries fewer than two numbers.
        """
        self.set_cmd('AT+CEREG?')
        reply = self.wait_for_rx(log_file=log_file)
        if reply is not None and len(reply) == 2 and 'OK' in reply[-1]:
            vals = re.findall(r'[0-9]+', reply[0])
            if len(vals) >= 2:
                try:
                    return int(vals[1])
                except ValueError:
                    return 4
        return 4

    # Ping a server
    def ping(self, server="8.8.8.8", num=4, log_file=None):
        """Send ``AT+QPING`` and return the recorded ping value, or ``None``.

        ``None`` is returned when the command is not acknowledged with
        ``OK`` or when no summary line arrives within about two seconds.
        """
        timeout = 1
        self.have_ping = False
        self.ping_count = 0
        self.set_cmd(f'AT+QPING=1,"{server}",{timeout},{num}')
        reply = self.wait_for_rx(wait_time=5, log_file=log_file)
        # `not reply` also covers the empty list returned for a closed port
        if not reply or reply[-1] != 'OK':
            return None
        for _ in range(20):
            self.update()
            if self.have_ping:
                return self.get_ping()
            time.sleep(.1)
        return None
modem_diagnostics.py
#!/usr/bin/env python3
import sys
import signal
import argparse
# Fails on Windows unless X-server is used
# If this line is commented, though, the script runs fine + W is not used anywhere
# from tkinter import W
from modem_interface import *
import re
# setup handler for control-c to exit cleanly
# Resources opened by main() that the Ctrl-C handler has to release.
# main() fills this in before installing the handler; the handler cannot
# see main()'s local variables.
_active_session = {'atm': None, 'log_file': None}


def ctrl_handler(signum, frame):
    """SIGINT handler: close the modem and the log file, then exit with 0."""
    print("Ctrl-c was pressed.")
    atm = _active_session['atm']
    if atm is not None:
        atm.close()
    log_file = _active_session['log_file']
    if log_file is not None and not log_file.closed:
        log_file.write("\nCtrl-c was pressed.\n")
        log_file.close()
    sys.exit(0)


def _choose_port(ports):
    """Show *ports* and prompt until a valid index is entered; return that port."""
    if not ports:
        print('No serial ports found. Connect the modem or pass --port.')
        sys.exit(1)
    print('Select port:')
    for i, s in enumerate(ports):
        print('{} {}'.format(i, s))
    while True:
        try:
            ind = int(input('> '))
        except ValueError:
            ind = -1
        if 0 <= ind < len(ports):
            return ports[ind]
        print('Please enter a number between 0 and {}'.format(len(ports) - 1))


# Main
def main():
    """Run the AT diagnostic commands and save the exchange to ModemDiagnostics.txt.

    The serial port comes from ``--port`` or an interactive prompt; the baud
    rate comes from ``--baud`` (default 115200).
    """
    # Function-scope import so this function does not depend on `time`
    # arriving through the star import of modem_interface.
    import time

    # get inputs
    parser = argparse.ArgumentParser()
    parser.add_argument('--port', default='', type=str, help='Serial port to use')
    parser.add_argument('--baud', default=115200, type=int, help='Baud rate to use')
    args = parser.parse_args()
    baud = args.baud or 115200
    serial_port = args.port if args.port else _choose_port(serial_ports())

    # Diagnostic Commands
    # Append 'AT+COPS=?' to also scan for networks; that command can take several minutes.
    atCommands = ['ATE0','ATI','AT+CGMI','AT+CGMM','AT+GMM','AT+CMEE=2', 'AT+CREG=2', 'AT+CGREG=2', 'AT+CEREG=3', 'AT+CGEREP=2,1', 'AT+CPIN?','AT+CCID','AT+CRSM=176,28539,0,0,12','AT+CFUN?','AT+CSQ','AT+CESQ','AT+CREG?','AT+CGREG?','AT+CEREG?','AT+CGDCONT?','AT+CGACT?','AT+COPS?']

    # Create modem interface
    atm = ATManager(print_debug=True)

    # Save to file
    file_name = 'ModemDiagnostics.txt'
    log_file = open(file_name, 'w')

    # Register the resources before installing the handler that releases them
    _active_session['atm'] = atm
    _active_session['log_file'] = log_file
    signal.signal(signal.SIGINT, ctrl_handler)

    try:
        print('\n---------------\nConnecting to modem ...')
        last_error = None
        while True:
            try:
                atm.open(serial_port, baud)
                break
            except Exception as err:
                # Keep retrying, but report each distinct failure once.
                # `Exception` (not a bare except) lets the SystemExit raised
                # by ctrl_handler end the script.
                if str(err) != last_error:
                    last_error = str(err)
                    print('Unable to open {}: {}'.format(serial_port, err))
                time.sleep(.1)

        # Wait until the modem answers a plain AT with OK
        while atm.isOpen():
            atm.set_cmd('AT')
            reply = atm.wait_for_rx(log_file=log_file)
            if reply is None or len(reply) == 0:
                time.sleep(1)
            elif 'OK' in reply[-1]:
                break

        # the COPS=? command can take several minutes
        atm.set_msg_timeout(300)
        for cmd in atCommands:
            atm.set_cmd(cmd)
            atm.wait_for_rx(log_file=log_file)
            log_file.write('-----\r\n')
            print('-----\r\n')

        # How you can parse out ICCID
        atm.set_cmd('AT+QCCID')
        reply = atm.wait_for_rx(log_file=log_file)
        print('----')
        if len(reply) >= 2 and reply[1] == 'OK':
            x = re.findall(r'[0-9]+', reply[0])
            print(x)
        else:
            print('Invalid reponse!')
    finally:
        # close interfaces on every exit path (both calls are safe to repeat)
        if not log_file.closed:
            log_file.close()
        atm.close()
    print("Good bye!")
# Python stuff
# Start the diagnostics only when this file is executed as a script.
if __name__ == "__main__":
    main()