This post follows on from my previous post about working with embedded systems using Python. I highly recommend starting off by entering blocks of code (as in the previous post) into a REPL while you debug things, but once you’ve got that working, it’s worth making the code reusable between projects so that you and others can benefit. I’ll cover how to do that, along with a few other possible improvements
Refactor the code into a class
You might already see how you could put the connection and setup code into an OptoForce.connect() method, and the reading code into an OptoForce.read() method. We’ll start off with the code below, copied from the previous post and organized into a class. There’s no need to read it carefully now - it’s just there so you know where we’re starting off
import serial, struct
from serial.tools.list_ports import comports

SENSOR_PARAMS = {
    'baudrate': 1000_000,
    'stopbits': serial.STOPBITS_ONE,
    'parity': serial.PARITY_NONE,
    'bytesize': serial.EIGHTBITS,
}

class OptoForce:
    def __init__(self):
        # find the optoforce
        devices = [dev for dev in comports() if dev.description == 'OptoForce DAQ']
        if len(devices) == 0:
            raise RuntimeError("Couldn't find an OptoForce")
        elif len(devices) == 1:
            self.dev = devices[0]
        else:
            raise RuntimeError(f'Found more than one OptoForce: {devices}')

    def connect(self):
        self.opt_ser = serial.Serial(self.dev.device, **SENSOR_PARAMS)

        # write sensor setup code
        header = (170, 0, 50, 3)
        speed = 1  # 1 = 1000 Hz, 10 = 100 Hz, ...
        filt = 0   # don't pre-filter data
        zero = 255
        checksum = sum(header) + speed + filt + zero
        payload = (*header, speed, filt, zero, *divmod(checksum, 256))
        self.opt_ser.write(bytes(payload))

    def read(self):
        expected_header = bytes((170, 7, 8, 10))
        self.opt_ser.read_until(expected_header)
        count, status, fx, fy, fz, checksum = (
            struct.unpack('>HHhhhH', self.opt_ser.read(12))
        )
        return fx, fy, fz

    def close(self):
        self.opt_ser.close()
Let’s get on to the fun stuff now
Context managers to make sure connections get closed
Remember how we used a with block to make sure the connection gets closed afterwards? Let’s make sure that the same happens with this sensor class. To do so, we just define .__enter__() and .__exit__() methods (you can read this, this or this for explanations). We can also define a .__del__() method, which will get run when an instance is deleted (either manually or via garbage collection). The code might look like this (with the full code listing at the end of this article):
class OptoForce:
    # ... define __init__, connect(), read() and close() ...

    def __enter__(self):
        self.connect()
        return self

    def __exit__(self, *args):
        self.close()

    def __del__(self):
        # guard against the case where connect() was never called,
        # since __del__ also runs for half-initialised objects
        if hasattr(self, 'opt_ser'):
            self.close()
Now we can use our code in a very Python-y way:
with OptoForce() as force_sensor:
    fx, fy, fz = force_sensor.read()
    this_might_crash()
and know that the connection will be closed regardless of what happens in the program. The alternative would be something like:
force_sensor = OptoForce()
try:
    force_sensor.connect()
    fx, fy, fz = force_sensor.read()
    this_might_crash()
finally:  # the `finally` block of a try statement always runs
    force_sensor.close()
Setting up logging
It’s tempting to add print statements everywhere in our code so that when things go wrong, we can more easily debug them. However, constant printing from every module you use is noisy, so there should be some way to turn logging on only when you need it. Enter: the logging module. Again, I’ll refer you to the documentation and basic logging tutorial. We might do something like:
# in your optoforce.py file:
import logging

# by making a logger just for this file, we ensure that the logging
# for this file specifically can be turned off
logger = logging.getLogger(__name__)  # __name__ = 'optoforce'

class OptoForce:
    # ... __init__ code ...

    def connect(self):
        # ... code ...
        # note: logger.info, NOT logging.info
        logger.info(f'connected at port: {self.dev.device}')
        # ... code ...
        logger.info(f'sent configuration bytes: {payload}')

    def read(self):
        # ... code ...
        logger.debug(f'read data: {fx}, {fy}, {fz}')
        return fx, fy, fz

    def close(self):
        # ... code ...
        logger.info('closed connection')
Running that code won’t produce any output, since we only log at the INFO severity level or lower, and the default level (WARNING) filters those messages out. We can change that when we want to know what’s going on in more detail, though:
# in your main.py file:
import logging
from optoforce import OptoForce

logging.basicConfig(
    format='%(asctime)s | %(levelname)s | %(name)s: %(message)s',
    datefmt='%I:%M:%S',
    level=logging.INFO)

with OptoForce() as of:
    of.read()
which prints the following to your terminal:
12:18:19 | INFO | optoforce: connected at port: /dev/ttyUSB0
12:18:19 | INFO | optoforce: sent configuration bytes: b'\xaa\x002\x03\x01\x00\xff\x01\xdf'
12:18:19 | INFO | optoforce: closed connection
The read data debug message (which would have appeared as 12:18:19 | DEBUG | optoforce: read data: 0.0, 0.0, -9.81) isn’t printed, since we didn’t set the log level to logging.DEBUG
See LogRecord attributes if you want to know the options for what you can include in logging.basicConfig(format=). If you want to mess around with this stuff in your terminal, I find it’s easiest to run the logging.basicConfig() call above, then:
logger = logging.getLogger('optoforce'); logger.warning('this is a test')
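Along the same lines, it’s worth knowing how to adjust the levels (these are standard logging calls, nothing specific to this sensor): raise the global level to see the DEBUG message from read(), or override the level of a single logger to quieten just that module:

logging.basicConfig(level=logging.DEBUG)  # now DEBUG messages (like 'read data: ...') are shown
logging.getLogger('optoforce').setLevel(logging.WARNING)  # or silence everything below WARNING from this module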
Make the sensor configurable
The sensor can be configured, so we should expose that functionality instead of hard-coding it for one particular application.
I can think of three simple ways to represent the mapping from frequency to integer:
# variables, as used by pySerial:
SPEED_STOP = 0
SPEED_1000Hz = 1
SPEED_333Hz = 3
SPEED_100Hz = 10
SPEED_30Hz = 33
SPEED_10Hz = 100
# pySerial represents fractions using e.g. serial.STOPBITS_ONE_POINT_FIVE

# a dictionary
SPEED_HZ = {
    'stop': 0,
    1000: 1,
    333: 3,
    100: 10,
    30: 33,
    10: 100,
}

# a class
class Speed:
    _STOP = 0
    _1000Hz = 1
    _333Hz = 3
    _100Hz = 10
    _30Hz = 33
    _10Hz = 100

# could also use an enum: https://docs.python.org/3/library/enum.html
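The enum mentioned in that last comment might be sketched like this (the member names are my own invention):

from enum import Enum

class Speed(Enum):
    STOP = 0
    HZ_1000 = 1
    HZ_333 = 3
    HZ_100 = 10
    HZ_30 = 33
    HZ_10 = 100

# usage: Speed.HZ_100.value == 10, and a typo like Speed.HZ_101
# raises an AttributeError straight away, while IDEs and type
# checkers can autocomplete and verify the member names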
None of these feels like a great solution to me, and each has some disadvantages. The variables and class attributes are limited by the names you can give Python identifiers (e.g. they can’t start with a number or contain a decimal point), while the dictionary approach more easily results in runtime errors if you use the wrong key. Perhaps there’s another approach? In this example, we’ll use the dictionary, since representing a frequency of 1.5 Hz is awkward otherwise, but we’ll use Python’s typing module to help reduce the chances of an error. This can be abstracted from the user API quite nicely:
from typing import Literal

SPEED_MAPPING = {
    'stop': 0,
    1000: 1,
    333: 3,
    100: 10,
    30: 33,
    10: 100,
}
# must be one of these specific values:
SPEEDS = Literal['stop', 1000, 333, 100, 30, 10]

FILTER_MAPPING = {
    'none': 0,
    500: 1,
    150: 2,
    50: 3,
    15: 4,
    5: 5,
    1.5: 6,
}
FILTERS = Literal['none', 500, 150, 50, 15, 5, 1.5]

class OptoForce:
    def __init__(self,
                 speed_hz: SPEEDS = 100,
                 filter_hz: FILTERS = 15):
        self.speed = SPEED_MAPPING[speed_hz]
        self.filt = FILTER_MAPPING[filter_hz]
        # ... code ...
which might be used like:
from optoforce import OptoForce

with OptoForce(speed_hz=30, filter_hz='none') as force_sensor:
    ...  # do stuff
Great! Now, let’s think about how things could go wrong
Some considerations when using the class
Dealing with different sampling rates
What happens when you read from the sensor at a lower rate than it transmits at? For example, it might send data at 1 kHz, but you read from it at 100 Hz. As the receive buffer fills up, you’ll start to get delayed data (e.g. the sample happens at t = 0.1 seconds, but you only use it at t = 0.2 seconds) and potential buffer overflows. Neither of those is good
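You can watch the backlog build up via pySerial’s in_waiting property, which reports how many unread bytes are sitting in the buffer - a quick sketch, assuming the class from above:

import time

with OptoForce(speed_hz=1000) as force_sensor:
    for _ in range(5):
        time.sleep(0.1)  # pretend the rest of the loop takes 100 ms
        print(force_sensor.opt_ser.in_waiting)  # grows by roughly 1600 bytes (100 packets) each time
        force_sensor.read()  # but each read only consumes one 16-byte packet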
There are three solutions that I know of: the first is to simply reduce the sensor’s sample rate, and the second is to first flush the buffer before reading from it:
class OptoForce:
    # ... code ...

    def read(self, only_latest_data: bool):
        # opt_ser.in_waiting returns the number of bytes in the buffer
        if only_latest_data and self.opt_ser.in_waiting > 16:
            # flush input to make sure we're not reading old data
            self.opt_ser.reset_input_buffer()
        # ... code to read from buffer ...
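In use, the first two options might look something like this (a sketch, reusing the constructor arguments and the modified read() from above):

# option 1: ask the sensor for a rate your loop can keep up with
with OptoForce(speed_hz=100) as force_sensor:
    fx, fy, fz = force_sensor.read(only_latest_data=False)

# option 2: keep the rate high, but discard any backlog on each read
with OptoForce(speed_hz=1000) as force_sensor:
    fx, fy, fz = force_sensor.read(only_latest_data=True)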
The third solution is to spawn a second process which exclusively reads data from the sensor at the sensor’s sample rate without being slowed down by the other work in your program
Reading from the sensor in another process
This is a potentially large undertaking, and likely not worth it most of the time. I’m just going to mention why you might do it, give an outline of the implementation, and mention some things to watch out for
First off, why?
- You could let the sensor run at a high speed and save every bit of data, which is useful for offline analysis
- It’s easier to make sure your other code always gets the latest data
- The main loop wouldn’t be slowed down by the time required to read and potentially pre-process data from the sensor
- You might find it simpler to reason about, since each ‘block’ in your system is an independent process
Next, how? Python has a multiprocessing module for situations like this. I won’t go into detail on it - just show a smallish code snippet highlighting the general approach. Feel free to skip to the next section
import multiprocessing

# Queues and Events are shared between processes.
# one process could `put` data on a `Queue`, and the
# other `get`s it, removing it from the queue
sensor_queue = multiprocessing.Queue()

# similarly, one process could `set` or `clear` an
# Event flag, and the other just reads it.
# the flag is initially false
replacing_data = multiprocessing.Event()
shut_down_sensor = multiprocessing.Event()

# the code in this function will run on another process
def run_sensor(queue, replacing, shutdown):
    import optoforce, csv, time
    data_arr = []
    force_sensor = optoforce.OptoForce()
    force_sensor.connect()
    tstart = time.time()

    # break the loop when told to do so
    while not shutdown.is_set():
        data = force_sensor.read(only_latest_data=False)

        # if the queue is empty, put data inside
        if queue.empty():
            queue.put(data)
        # otherwise, tell the other process that we're replacing the
        # data inside, then empty the queue, then put new data in.
        # if we didn't do this flag stuff, the other process
        # could take the data from the queue, so when this
        # process tries to get it, it would hang, as the queue
        # is empty.
        # I wish there was a `queue.replacewith(data)` function
        else:
            replacing.set()
            queue.get()  # remove data from queue
            queue.put(data)
            replacing.clear()

        data_arr.append([time.time() - tstart, *data])

    # save data to disk
    with open('sensorlog.csv', 'w', newline='') as csvfile:
        writer = csv.writer(csvfile)
        writer.writerow(['t [seconds]', 'fx [N]', 'fy [N]', 'fz [N]'])
        writer.writerows(data_arr)

    # the OptoForce sensor automatically closes its connection
    # when it gets garbage collected, because we implemented
    # the __del__ method!

# (on Windows and macOS, this process-launching code must live under an
# `if __name__ == '__main__':` guard, since child processes re-import the main module)
try:
    # start the sensor process
    sensor_process = multiprocessing.Process(
        target=run_sensor,
        args=(sensor_queue, replacing_data, shut_down_sensor),
    )
    sensor_process.start()

    while True:
        while replacing_data.is_set(): pass
        data = sensor_queue.get()
        do_stuff_with_data(data)

finally:
    # set the flag to signal the sensor process to shut down,
    # then wait for it to save its data and exit
    shut_down_sensor.set()
    sensor_process.join()
Finally, why wouldn’t you do this?
- Mainly, because it (clearly) introduces more complexity. You’ll have to manage queues, processes, flags, shut things down correctly, be careful to avoid deadlocks, and so on
- The time required to read data from a sensor is usually small compared to other parts of your code
- You might be doing something wrong if you have really tight loops in Python, requiring this sort of optimisation. The Python part of a system is usually the high-level command centre which runs at a lower rate, with a microprocessor handling the really tight loops
- Interprocess communication introduces overhead, which could nullify any gains
Anyway, it might still be worth it for your particular use case
Package and upload!
You’ve used a lot of freely shared open source code to get to this point, so why not share your code as well? flit makes this ridiculously easy! The website explains how to do this, so I won’t repeat it here, but you can look at the repository for this sensor as an example
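As a taste of how little is needed: flit can pull the package description and version straight from the module itself (its pyproject.toml just points at the module), so the top of optoforce.py only needs something like this sketch - the docstring and version number here are placeholders:

"""Python interface to an OptoForce force sensor."""  # flit can use this docstring as the package description

__version__ = '0.0.1'  # and this as the package version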
Closing thoughts
This sensor is now robust, versatile, configurable, easy to install, use and debug, and so on. The price paid is time and new code complexity, so you might decide that it’s simply not worth it, which is totally fine and likely the right choice most of the time! I hope you at least learned something new