Problems streaming data from Datalogger in MIM for 24hrs

I’m using my Moku:Pro to stream data to my computer while also getting periodic spectra from the Spectrum Analyzer via a Python script. To do this, I set up the two instruments in Multi-Instrument Mode, configure the inputs and the instruments, and start the Datalogger with
logger.start_streaming(86400, 500e3), reading the data with logger.get_chunk().

In another thread, I’m periodically calling
spectrum.get_data(units='Vrms', psdUnits=True, wait_reacquire=False, wait_complete=False)
in order to get the spectra.

While testing, I noticed two problems:

  1. The script aborts sporadically because one of the queries to the Moku raises a “502 - Bad Gateway” exception. It seems I can fix that by guarding all communication with the Moku with a threading.Lock() (see the sketch after this list).
  2. After running for a little over 7 hours, the script stopped, probably because get_chunk() returned an empty packet. The result of get_stream_status() shows “Error” as status.
    Connecting to the Moku with the Desktop app right after the script finished showed “Error, an overflow error occurred” in the Datalogger instrument (translated from German).
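
To illustrate what I mean by the lock logic, here is a minimal sketch (the 10 s interval and process_spectrum are placeholders, not my exact code):

from threading import Lock
from time import sleep

moku_lock = Lock()  # one lock shared by the streaming thread and the spectrum thread

def poll_spectra(spectrum, interval=10.0):
    # Every request to the Moku is guarded by the same lock as the streaming calls
    while True:
        with moku_lock:
            frame = spectrum.get_data(units='Vrms', psdUnits=True,
                                      wait_reacquire=False, wait_complete=False)
        process_spectrum(frame)  # placeholder for my own processing
        sleep(interval)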

Do you have any explanation for this behaviour? How can I stream data for 24 hours (or more)?

In the meantime, I succeeded in doing a 24-hour measurement. It seems that streams started by earlier runs, where the Python program crashed before closing down the stream, kept running and filled the Moku’s buffer.

After successfully performing multiple streams of a few hours each, the streaming stopped working again. The MIM setup works as intended, but only one chunk (containing the header data) is transmitted. The second call to get_chunk() returns an empty packet.
When I connect to the Moku with the Desktop App directly afterwards, the Datalogger gives the same “Overflow” error as in the screenshot in the first message.

Is there any way to clear the internal buffer from python? Can I do anything to stop the buffer from overflowing? Is there any other explanation for this behaviour?

Regards,
Heiko

Hello Heiko,

To stream for longer than 24 hours, there are a few factors you will need to consider. The first and most important is that you should use an Ethernet connection for your Moku:Pro. The buffer fills up if chunks aren’t retrieved at least as fast as samples are written. Connecting to your Pro over WiFi can be very slow, and you will likely run into this issue. Using Ethernet is much faster and will reduce the likelihood of overflow.

Other factors that can fill the buffer quickly include high sample rates, Precision acquisition mode, and enabling multiple channels; all of these increase the amount of data that has to be transferred per second.
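
As a rough back-of-envelope check (assuming 4 bytes per sample, which may differ for your configuration), you can estimate the sustained data rate your connection and your processing loop have to keep up with:

sample_rate = 500e3          # Sa/s per channel
bytes_per_sample = 4         # assumption: 4 bytes per sample, adjust for your format
channels = 1
throughput = sample_rate * bytes_per_sample * channels          # bytes per second
print(f'{throughput / 1e6:.1f} MB/s sustained')                 # 2.0 MB/s
print(f'{throughput * 24 * 3600 / 1e9:.0f} GB over 24 hours')   # ~173 GB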

What is the configuration of your Pro for this test? What’s the sample rate and acquisition mode, and how many channels are enabled? Would you mind sharing your script?

Thank you,
Nadia

Hi Nadia,

Yes, the Moku is connected via Ethernet, and the script is fast enough to get all the data - it did work for several measurements, streaming for something like 40 hours in total. After that, the stream always aborted right away after starting - until I power-cycled the Moku. Now the streaming is working fine again.

I’m using the Datalogger in Precision mode at 500 kSa/s - the longest stream I did with this configuration was 24 hours.

Here are the relevant parts of the code:

from time import sleep
from threading import Thread, Lock
from datetime import datetime, timezone
import moku.exceptions
from moku.instruments import Datalogger, MultiInstrument, SpectrumAnalyzer

def readLogger(moku_logger: Datalogger,
               duration: float,
               fs: float,
               lock: Lock) -> None:
    print('Start streaming...')
    
    count = 0
    prev_samples_overflow = 0
    prev_samples_before = 0
    lock.acquire()
    try:
        moku_logger.start_streaming(duration, fs)
    except moku.exceptions.MokuException as exc:
        print(f'{datetime.now(timezone.utc):%Y-%m-%d, %H-%M-%S}\tMoku \'start_streaming\' error: {exc}')
    finally:
        lock.release()
    
    while True:
        chunk_samples = 0
        previous_samples = 0
        # Defaults so the finally block and the status checks below don't fail
        # if get_chunk() or get_stream_status() raises before assigning them
        raw_data: bytes = b''
        r = {'error': '', 'cumulative_size': 0, 'status': '', 'no_of_chunks': 0}
        lock.acquire()
        try:
            raw_data = moku_logger.get_chunk()
            r = moku_logger.get_stream_status()
        except moku.exceptions.MokuException as exc:
            print(f'{datetime.now(timezone.utc):%Y-%m-%d, %H-%M-%S}\tMoku streamer error: {exc}')
        except Exception as exc:
            print(exc)
            break
        finally:
            lock.release()
            if len(raw_data) == 0:
                print(f'{datetime.now(timezone.utc):%Y-%m-%d, %H-%M-%S}\tMoku streamer: empty chunk')
                break
                
        if r['error'] != '':
            print(f'{datetime.now(timezone.utc):%Y-%m-%d, %H-%M-%S}\tMoku streamer error: {r["error"]}')
        
        samples = r['cumulative_size']
        status = r['status']
        chunks = r['no_of_chunks']
        count += 1
        if count == 1:
            # The first chunk contains the stream header; extract the channel scale factor
            scale_bytes = int.from_bytes(raw_data[45:47], byteorder='little')
            scale = float(raw_data[48: 47 + scale_bytes])
        else:
            # Subsequent chunks: 16-byte header (channel id, payload size in bytes,
            # running sample counter) followed by the payload, 4 bytes per sample
            channel = int.from_bytes(raw_data[0:4], byteorder='little')
            if channel != 0:
                continue
            
            chunk_samples = int.from_bytes(raw_data[4:8], byteorder='little') // 4
            
            previous_samples = int.from_bytes(raw_data[8:12], byteorder='little') // 4
            # The running sample counter is 32 bit and wraps around; count the wraps
            prev_samples_overflow += 1 if previous_samples < prev_samples_before else 0
            prev_samples_before = previous_samples
            
            if len(raw_data) != chunk_samples * 4 + 16:
                print('Malformed packet')
            
            write_to_disk(raw_data[16:])
        
        print(f'\rChunk samples: {chunk_samples: >8_}, '
              f'Total samples: {chunk_samples + previous_samples + prev_samples_overflow * (2**32):>14_}, '
              f'Samples overflowed: {prev_samples_overflow:>2}, '
              f'Remote size: {samples:>10_}, '
              f'Status: {status:>16}, '
              f'Remote chunks: {chunks:>5}',
              end='')
        
        sleep(0.01)
    
    print()
    lock.acquire()
    try:
        moku_logger.stop_streaming()
        print('Stop streaming...')
    finally:
        lock.release()
    

if __name__ == '__main__':
    rlu_moku = MultiInstrument(moku_ip, platform_id=4, force_connect=True)
    moku_sa: SpectrumAnalyzer = rlu_moku.set_instrument(1, SpectrumAnalyzer)
    moku_logger: Datalogger = rlu_moku.set_instrument(2, Datalogger)
    rlu_moku.set_connections([{'source': 'Input1', 'destination': 'Slot1InA'},
                              {'source': 'Input1', 'destination': 'Slot2InA'},])
    rlu_moku.set_frontend(1, '1MOhm', 'AC', '0dB')
    moku_logger.enable_input(2, False)
    moku_logger.set_acquisition_mode('Precision')
    moku_sa.set_span(0, 250000)
    moku_sa.set_window('Hann')
    moku_sa.set_rbw('Manual', 500)
    moku_sa.set_averaging(1.0)
    
    try:
        moku_logger.stop_logging()
        moku_logger.stop_streaming()
    except Exception:
        print('No logging or streaming session in progress...')

    lock = Lock()  # shared lock guarding all communication with the Moku
    logger = Thread(target=readLogger,
                    kwargs={'moku_logger': moku_logger,
                            'duration': meas_duration,
                            'fs': 500000,
                            'lock': lock})
    logger.start()

Hello @hschill,

Thank you for your patience! I’ve been looking into this and have been able to reproduce this issue. Here is a quick way to clear the buffer between test runs:

# Drain any chunks left over from an earlier run that was never stopped cleanly
status = dl.get_stream_status()
if status['cumulative_size'] != 0:
    dl.start_streaming(1)           # start a short stream so the old buffer can be read out
    while dl.get_chunk() != b'':    # keep reading until an empty chunk comes back
        pass

If the remote size is increasing, the Python code may not finish writing one chunk to disk before the next chunk needs to be read. In that case it may be worth pushing the chunks onto a queue and processing the data in a separate thread, for example like the sketch below.
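
A minimal sketch of that pattern, reusing the names from your script (not tested against it):

import queue
from threading import Thread

chunk_queue = queue.Queue()

def writer_worker():
    # Take payloads off the queue and write them to disk, off the reader thread
    while True:
        payload = chunk_queue.get()
        if payload is None:          # sentinel: streaming has finished
            break
        write_to_disk(payload)       # your existing writer, just called from here
        chunk_queue.task_done()

writer = Thread(target=writer_worker, daemon=True)
writer.start()

# In the streaming loop, hand the payload off instead of writing inline:
# chunk_queue.put(raw_data[16:])
# ...and once the stream has stopped:
# chunk_queue.put(None)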

If this still does not help, it may be due to network speed or the data transfer not being fast enough.

  1. You could try using stream_to_file; streaming to HDF5 is the most efficient format, with the fastest transfer and the smallest file size.
  2. Or you could try using get_stream_data, which converts each chunk into a readable dictionary so you don’t have to parse it yourself (a rough sketch follows below).
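
A rough sketch of the second option; the exact return format (here assumed to be a dictionary with a 'ch1' key) may differ depending on your moku package version:

# Sketch only: the key name 'ch1' and the empty-return behaviour are assumptions
dl.start_streaming(86400, 500e3)
while True:
    data = dl.get_stream_data()      # next chunk, already converted to a dictionary
    if not data:
        break
    process(data['ch1'])             # placeholder for your own processing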

Hi Nadia,
thanks for your help. I will implement the code in my script and test it.

Regarding your two suggestions, unfortunately neither of them is a viable option.
I can’t use stream_to_file() since I want to process the received data on the fly.
I tried get_stream_data() right at the beginning, but unfortunately it was so slow for me (probably the calculation of the timestamps) that the buffer overflowed after a few minutes of streaming.

Kind regards
Heiko

Hi @hschill,

Apologies for the delay in my response! I have a few questions for you;
How are you processing the data from get_chunk? And are you able to time it? What is the content of the function write_to_disk(raw_data[16:])?
Did you consider writing the chunks to a queue and processing the received data in another thread?
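
For example, a minimal way to time the per-chunk processing, reusing the names from your script:

from time import perf_counter

t0 = perf_counter()
write_to_disk(raw_data[16:])         # the call under suspicion
elapsed = perf_counter() - t0
print(f'write_to_disk took {elapsed * 1e3:.1f} ms for {len(raw_data) - 16} bytes')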

Converting the data through mokucli is CPU limited, which is why get_stream_data may be too slow for high sample rates and long stream durations.

Cheers,
Nadia