[Python API] BrokenPipeError when streaming successivly

Moku model: Moku:GO
Operating system: Windows 10 and 11
Software version: Moku firmware v601, Moku Python package v3.3.3, Moku CLI v2.3.0

Hi,

I am trying to perform successive measurments with the datalogger using the get_stream_data method. The goal is to perform a measurment, get the data, process it, change the experimental parameters and then repeat. However, the code fails when trying to perform successive measurments, raising a BrokenPipeError, usually after 3 or 4 measurments.

Exmple of code :

from moku.instruments import Datalogger
import time

class Trace :
    
    
    def __init__(self,adress,sampling_rate,impedance = ['1MOhm','1MOhm'],coupling = ['AC','AC'],mrange=['10Vpp','10Vpp']):
        
        self.log = Datalogger(adress,force_connect=True)
        
        self.log.set_frontend(1,impedance[0],coupling[0],mrange[0])
        #self.log.set_frontend(2,impedance[1],coupling[1],mrange[1])
        self.log.enable_input(2,False)
        
        self.log.set_acquisition_mode('Precision')
        self.log.set_samplerate(sampling_rate)
        
        print('Moku setup done')
    
    def stream(self,T):
        
        self.log.start_streaming(T)
        
        complete = False
        while not complete :
            time.sleep(0.1)
            
            progress = self.log.logging_progress()
            complete = progress['complete']
        
        data = self.log.get_stream_data()
        self.log.stop_streaming()
        return(data)

if __name__ == "__main__":
    tracer = Trace('[myIPadress]',1e6)
    for i in range(100):
        test = tracer.stream(1)
        print(i)

Error message :

runfile('C:/Users/h.marot/Documents/CodesPythonHugo/mwe.py', wdir='C:/Users/h.marot/Documents/CodesPythonHugo')
Moku setup done
0
1
2
Exception in thread Thread-101:
Traceback (most recent call last):
  File "C:\tools\Anaconda3\Lib\threading.py", line 1073, in _bootstrap_inner
    self.run()
  File "C:\Users\h.marot\AppData\Roaming\Python\Python312\site-packages\moku\instruments\_stream.py", line 44, in run
    raise StreamException(error)
moku.exceptions.StreamException: Exception in thread mokucli-get-logsink0:
Traceback (most recent call last):
  File "threading.py", line 1038, in _bootstrap_inner
  File "threading.py", line 975, in run
  File "mokucli\converter\targets.py", line 85, in _get_chunk_recursive
BrokenPipeError: [Errno 32] Broken pipe

3
Traceback (most recent call last):

  File C:\tools\Anaconda3\Lib\site-packages\spyder_kernels\py3compat.py:356 in compat_exec
    exec(code, globals, locals)

  File c:\users\h.marot\documents\codespythonhugo\mwe.py:45
    test = tracer.stream(1)

  File c:\users\h.marot\documents\codespythonhugo\mwe.py:38 in stream
    data = self.log.get_stream_data()

  File ~\AppData\Roaming\Python\Python312\site-packages\moku\instruments\_stream.py:163 in get_stream_data
    raise Exception

Exception

This error was observed on two different computers (one on windows 10, and one on windows 11), and was observed when the moku was connected to the computer by an USB cable or an ethernet cable.

Hi Hugo,

Firstly, you are only calling get_stream_data() once, which means you are not getting all of the data from the stream in this one call. The streaming works by filling up a buffer as the data streams in, so you need to access the data as it streams in you need to call get_stream_data() in your while not complete: loop to get all the data from the stream.

Secondly, you do not need the self.log.stop_streaming() line as the stream will stop once you get all the data out of the buffer.

Your code should work with these changes but it will exit the script once the stream is completed (with the exception “End of stream”) and will therefore only complete a single stream. To avoid exiting the script you could implement some loop to check and filter if the exception is an “End of stream” exception or another kind like below:

from moku.instruments import Datalogger
from moku.exceptions import StreamException

sampling_rate = 250
impedance = ['1MOhm','1MOhm']
coupling = ['AC','AC']
mrange=['10Vpp','10Vpp']

log = Datalogger('10.1.XXX.XXX')

log.set_frontend(1,impedance[0],coupling[0],mrange[0])

log.set_acquisition_mode('Precision')
log.set_samplerate(sampling_rate)

print('Moku setup done')

for i in range(100):
    try:
        # this is where you can change the parameters for subsequent streams 
        # e.g. log.set_samplerate(sampling_rate+i)

        print(i)
        log.start_streaming(1)

        complete = False

        while True:
            data = log.get_stream_data()
            if data:
                keys = data.keys()
                for key in keys:
                    # Take measurement here
                    # Line below just prints out the bits of streamed data
                    print(f"'{key}': {str(data[key][:3])[:-1]} ... {data[key][-1]}]")

            progress = log.logging_progress()
            complete = progress['complete']

    except StreamException as e:
        print(e)
    else:
        # Close the connection to the Moku device
        log.relinquish_ownership()

To perform a measurement, get the data, process it, change the experimental parameters and then repeat, I think you could do this in 2 ways.

  1. You could use stream_to_file to save the measurement, load it into your script to process it and the go about changing the experimental parameters and repeating. There is a good example of streaming to file and loading it into the script in our Knowledge Base. It does take a few seconds to download the streamed file, so if timing is critical then this might not be the best option.
  2. You can use the streaming setup you have outlined, with the above mentioned adjustments, to stream the data and read out the measurements into variables during streaming. This example here, in particular the “streaming method” section is another good example of how to analyze the data in real-time.

I hope this helps, let me know if you have any more questions,
Indira