This post follows on from my previous post about working with embedded systems using Python. I highly recommend starting off entering blocks of code (as in the previous post) into a REPL while you debug things, but once you’ve got that set up, it’s worth making the code reusable between projects for you and others to benefit. I’ll cover how to do that and a few other possible improvements

Refactor the code into a class

You might already see how you could put the connection and setup code into an OptoForce.connect() method, and the reading code into a OptoForce.read() method. We’ll start off with the code below, copied from the previous post, and organized into a class. There’s no need to carefully read it now - it’s just there so you know where we’re starting off

import serial, struct
from serial.tools.list_ports import comports

SENSOR_PARAMS = {
    'baudrate': 1000_000,
    'stopbits': serial.STOPBITS_ONE,
    'parity': serial.PARITY_NONE,
    'bytesize': serial.EIGHTBITS,
}

class OptoForce:
    def __init__(self):
        # find the optoforce
        devices = [dev for dev in comports() if dev.description == 'OptoForce DAQ']
        if len(devices) == 0:
            raise RuntimeError(f"Couldn't find an OptoForce")
        elif len(devices) == 1:
            self.dev = devices[0]
        else:
            raise RuntimeError(f'Found more than one OptoForce: {devices}')
    
    def connect(self):
        self.opt_ser = serial.Serial(self.dev.device, **SENSOR_PARAMS)
        # write sensor setup code
        header = (170, 0, 50, 3)
        speed = 1  # 1 = 1000 Hz, 10 = 100 Hz, ...
        filt = 0   # don't pre-filter data
        zero = 255
        checksum = sum(header) + speed + filt + zero
        payload = (*header, speed, filt, zero, *divmod(checksum, 256))
        self.opt_ser.write(bytes(payload))

    def read(self):
        expected_header = bytes((170, 7, 8, 10))
        self.opt_ser.read_until(expected_header)

        count, status, fx, fy, fz, checksum = (
            struct.unpack('>HHhhhH', self.opt_ser.read(12))
        )
        return fx, fy, fz

    def close(self):
        self.opt_ser.close()

Let’s get on to the fun stuff now

Context handlers to make sure connections get closed

Remember how we used a with block to make sure the connection gets closed afterwards? Let’s make sure that the same happens with this sensor class. To do so, we just define .__enter__() and .__exit__() methods (you can read this, this or this for explanations). We can also define a .__del__() method, which will get run when the class is deleted (either manually or via garbage collection). The code might look like this (with the full code listing at the end of this article):

class OptoForce:
    # ... define __init__, connect(), read() and close() ...

    def __enter__(self):
        self.connect()
        return self

    def __exit__(self, *args):
        self.close()
    
    def __del__(self):
        self.close()

Now we can use our code in a very Python-y way:

with OptoForce() as force_sensor:
    fx, fy, fz = force_sensor.read()
    this_might_crash()

and know that the connection will be closed regardless of what happens in the program. The alternative would be something like:

force_sensor = OptoForce()

try:
    force_sensor.connect()
    fx, fy, fz = force_sensor.read()
    this_might_crash()
finally:  # the `finally` block of a try/catch statement always runs
    force_sensor.close()

Setting up logging

It’s tempting to add print statements everywhere in our code, so that when things go wrong, we can more easily debug it. However, constant printing from every module you use can is noisy, so there should be some way to turn logging on only when you need it. Enter: the logging module. Again, I’ll refer you to the documentation and basic logging tutorial. We might do something like:

# in your optoforce.py file:
import logging

# by making a logger just for this file, we ensure that the logging
# for this file specifically can be turned off
logger = logging.getLogger(__name__)  # __name__ = 'optoforce'

class OptoForce:
    # ... __init__ code ...

    def connect(self):
        # ... code ...
        # note: logger.info, NOT logging.info
        logger.info(f'connected at port: {port}')
        # ... code ...
        logger.info(f'sent configuration bytes: {payload}')

    def read(self):
        # ... code ...
        logger.debug(f'read data: {fx}, {fy}, {fz}')
        return fx, fy, fz
    
    def close(self):
        # ... code ...
        logger.info('closed connection')

Running that code won’t produce any output, since we only log at the INFO severity level or lower, which isn’t logged by default. We can change that when we want to know what’s going on in more detail, though:

# in your main.py file:
import logging
logging.basicConfig(
    format='%(asctime)s | %(levelname)s | %(name)s: %(message)s',
    datefmt='%I:%M:%S',
    level=logging.INFO)

with OptoForce() as of:
    of.read()

which prints the following to your terminal:

12:18:19 | INFO | optoforce: connected at port: /dev/ttyUSB0
12:18:19 | INFO | optoforce: sent configuration bytes: b'\xaa\x002\x03\x01\x00\xff\x01\xdf'
12:18:19 | INFO | optoforce: closed connection

12:18:19 | DEBUG | optoforce: read data: 0.0, 0.0, -9.81 isn’t printed, since we didn’t set the log level to logging.DEBUG

See LogRecord attributes if you want to know the options for what things you can choose to log using logger.basicConfig(format=). If you want to mess around with this stuff in your terminal, I find it’s easier to run the logging.basicConfig() stuff above, then: logger = logging.getLogger('optoforce'); logger.warning('this is a test')

Make the sensor configurable

The sensor can be configured, so we should expose that functionality instead of hard-coding it for one particular application:

Screenshot of OptoForce configuration mapping
Screenshot of OptoForce configuration mapping

I can think of three simple ways to represent the mapping from frequency to integer:

# variables, as is used by pySerial:
SPEED_STOP = 0
SPEED_1000Hz = 1
SPEED_333Hz = 3
SPEED_100Hz = 10
SPEED_30Hz = 33
SPEED_10Hz = 100
# they represent fractions using eg. serial.STOPBITS_ONE_POINT_FIVE

# a dictionary
SPEED_HZ = {
    'stop': 0,
    1000: 1,
    333: 3,
    100: 10,
    30: 33,
    10: 100
}

# a class
class Speed:
    _STOP = 0
    _1000Hz = 1
    _333Hz = 3
    _100Hz = 10
    _30Hz = 33
    _10Hz = 100
# could also use an enum: https://docs.python.org/3/library/enum.html

None of these feel like a great solution to me, and they each have some disadvantages. The variables and class attributes are limited by the names you can give Python variables (eg. can’t start with a number, can’t have points in the name) and the dictionary approach more easily results in runtime errors if you use the wrong key. Perhaps there’s another approach? In this example, we’ll use the dictionary, since representing a frequency of 1.5 Hz is awkward otherwise, but we’ll use Python’s type hinting module to help reduce the chances of an error. This can be abstracted from the user API quite nicely:

from typing import Literal

SPEED_MAPPING = {
    'stop': 0,
    1000: 1,
    333: 3,
    100: 10,
    30: 33,
    10: 100
}
# must be one of these specific values:
SPEEDS = Literal['stop', 1000, 333, 100, 30, 10]

FILTER_MAPPING = {
    'none': 0,
    500: 1,
    150: 2,
    50: 3,
    15: 4,
    5: 5,
    1.5: 6
}
FILTERS = Literal['none', 500, 150, 50, 15, 5, 1.5]

class OptoForce:
    def __init__(self,
                 speed_hz: SPEEDS = 100,
                 filter_hz: FILTERS = 15):
        self.speed = SPEED_MAPPING[speed_hz]
        self.filt = FILTER_MAPPING[filter_hz]
        # ... code ...

which might be used like:

from optoforce import OptoForce
with OptoForce(speed_hz=30, filter_hz='none') as force_sensor:
    # do stuff

Great! Now, let’s think about how things could go wrong

Some considerations when using the class

Dealing with different sampling rates

What happens when you read from the sensor as a lower rate than what it transmits at? For example, it might send data at 1kHz, but you read from it at 100Hz. As the receiving buffer fills up, you’ll start to get delayed data (eg. the sample happens at t = 0.1 seconds, but you use it at t = 0.2 seconds) and potential buffer overflows. Both of those aren’t good

There are three solutions that I know of: the first is to simply reduce the sensor’s sample rate, and the second is to first flush the buffer before reading from it:

class OptoForce:
    # ... code ...

    def read(self, only_latest_data: bool):
        # opt_ser.in_waiting returns the number of bytes in the buffer
        if only_latest_data and self.opt_ser.in_waiting > 16:

            # flush input to make sure we're not reading old data
            self.opt_ser.reset_input_buffer()
        
        # ... code to read from buffer ...

The third solution is to spawn a second process which exclusively reads data from the sensor at the sensor’s sample rate without being slowed down by the other work in your program

Reading from the sensor in another process

This is a potentially large undertaking, and likely not worth it most of the time. I’m just going to mention why you might do it, give an outline of the implementation, and mention some things to watch out for

First off, why?

  1. You could let the sensor run at a high speed and save every bit of data, which is useful for offline analysis
  2. It’s easier to make sure your other code always gets the latest data
  3. The main loop wouldn’t be slowed down by the time required to read and potentially pre-process data from the sensor
  4. You might find it simpler to reason about, since each ‘block’ in your system is an independent process

Next, how? Python has a multiprocessing module for situations like this. I won’t go into detail on it - just show a smallish code snippet highlighting the general approach. Feel free to skip to the next section

import multiprocessing

# Queue's and Event's are shared between processes.
# one process could `put` data on a `Queue`, and the
# other `get`s it, removing it from the queue
sensor_queue = multiprocessing.Queue()

# similarly, one process could `set` or `clear` an
# Event flag, and the other just reads it.
# the flag is initially false
replacing_data = multiprocessing.Event()
shut_down_sensor = multiprocessing.Event()

# the code in this function will run on another process
def run_sensor(queue, replacing, shutdown):
    import optoforce, csv, time

    data_arr = []
    force_sensor = optoforce.OptoForce()
    tstart = time.time()

    # break the loop when told to do so
    while not shutdown.is_set():
        data = force_sensor.read(only_latest_data=False)

        # if the queue is empty, put data inside
        if queue.empty():
            queue.put(data)

        # otherwise, tell the other thread that we're putting
        # data inside, then empty it, then put new data in.
        # if we didn't do this flag stuff, the other process
        # could take the data from the queue, so when this
        # process tries to get it, it would hold, as the queue
        # is empty.
        # I wish there was a `queue.replacewith(data)` function
        else:
            replacing.set()
            queue.get()  # remove data from queue
            queue.put(data)
            replacing.clear()
        
        data_arr.append([time.time() - tstart] + data)

    # save data to disk
    with open('sensorlog.csv', 'w') as csvfile:
        csvfile.writerow('t [seconds],fx [N],fy [N],fz [N]')
        csvfile.writerows(data_arr)
    
    # the OptoForce sensor automatically closes its connection
    # when it gets garbage collected, because we implemented
    # the __del__ method!

try:
    # start the sensor process
    sensor_process = multiprocessing.Process(
        target=run_sensor,
        args=(sensor_queue, replacing_data, shut_down_sensor),
    )

    sensor_process.start()
    while True:
        while replacing.is_set(): pass

        data = sensor_queue.get()

        do_stuff_with_data(data)
finally:
    # set the flag to signal the sensor process to shut down
    shut_down_sensor.set()

Finally, why wouldn’t you do this?

  1. Mainly, because it (clearly) introduces more complexity. You’ll have to manage queues, processes, flags, shut things down correctly, be careful to avoid deadlocks, and so on
  2. The time required to read data from a sensor is usually small compared to other parts of your code
  3. You might be doing something wrong if you have really tight loops in Python, requiring this sort of optimisation. The Python part of a system is usually the high-level command centre which runs at a lower rate, with a microprocessor handling the really tight loops
  4. Interprocess communication introduces overhead, which could nullify any gains

Anyway, it might still be worth it for your particular use case

Package and upload!

You’ve used a lot of freely shared open source code to get to this point, so why not share your code as well? flit makes this ridiculously easy! The website explains how to do this, so I won’t repeat them here, but you can look at the repository for this sensor as an example

Closing thoughts

This sensor is now robust, versatile, configurable, easy to install, use and debug, and so on. The price paid is time and new code complexity, so you might decide that it’s simply not worth it, which is totally fine and likely the right choice most of the time! I hope you at least learned something new