ni.com is currently undergoing scheduled maintenance.

Some services may be unavailable at this time. Please contact us for help or try again later.

Python and NI

cancel
Showing results for 
Search instead for 
Did you mean: 

stream_readers implementation in python

 Hey there!

 

I am looking into implementing the stream readers into my python code. The Docs have this interesting note:

 Consider using the nidaqmx.stream_readers and nidaqmx.stream_writers classes to increase the performance of your application, which accept pre-allocated NumPy arrays.

 

I cannot find an example of the implementation for this. See the code I have below for a 5 minute record. What benefits would I get with the stream reader and how to setup??

 

Using a NI 9234, with a high sensitivity accelerometer.

 

Thanks

 

Example Code:

 

import nidaqmx
from nidaqmx.constants import AcquisitionType
import matplotlib.pyplot as plt

sample_time = 60*5  # units = seconds
s_freq = 2**12
num_samples = sample_time*s_freq
dt = 1/s_freq

with nidaqmx.Task() as task:
    task.ai_channels.add_ai_accel_chan("cDAQ1Mod1/ai0",
                                       sensitivity=9948.0,
                                       max_val=0.5,
                                       min_val=-0.5)
    task.timing.cfg_samp_clk_timing(4096,
                                   sample_mode = AcquisitionType.CONTINUOUS)
    data = task.read(number_of_samples_per_channel=num_samples, timeout = nidaqmx.constants.WAIT_INFINITELY)

 

0 Kudos
Message 1 of 7
(16,904 Views)

 I would also be very interested to learn about stream readers. Currently trying to implement them.

0 Kudos
Message 2 of 7
(16,216 Views)

From what I see the closest we have to an example is https://github.com/ni/nidaqmx-python/blob/master/nidaqmx/tests/test_stream_analog_readers_writers.py

 

Just pull out the writer parts if you are only doing input.

0 Kudos
Message 3 of 7
(16,085 Views)

I am trying to implement the use of a streamreader within a task.register_every_n_samples_acquired_into_buffer_event.  I got the  read function work.  

Here is what I have for the input.  

from nidaqmx import stream_readers as sr

readings=np.ndarray((4,16384))
print(readings)
sr.AnalogMultiChannelReader.read_many_sample(readings,16384)

 

This it the error I get. 

Traceback (most recent call last):
File "_ctypes/callbacks.c", line 234, in 'calling callback function'
File "CodeFileGraphStreamReader.py", line 27, in data_collect
sr.AnalogMultiChannelReader.read_many_sample(readings,16384)
File "C:\Users\chad_finkbeiner\AppData\Local\Programs\Python\Python36-32\lib\site-packages\nidaqmx\stream_readers.py", line 327, in read_many_sample
self._task._calculate_num_samps_per_chan(
AttributeError: 'numpy.ndarray' object has no attribute '_task'

The documentation clearly states to use a numpy array.  

What am I missing? 

0 Kudos
Message 4 of 7
(15,477 Views)

Hi,

 

did you ever figure the stream_reader ADI out? I also would like to use it but the documentation is unclear. Any help appreciated.

 

 

0 Kudos
Message 5 of 7
(14,930 Views)

Hey there,

 

The `read_many_sample` method is not a static method on `AnalogMultiChannelReader`. You need to instantiate an `AnalogMultiChannelReader` object and pass it the `in_stream` object from the `nidaqmx.Task` object.

 

You can find an example of doing this here:

https://github.com/ni/nidaqmx-python/blob/master/nidaqmx/tests/test_stream_analog_readers_writers.py...

 

0 Kudos
Message 6 of 7
(14,894 Views)

I just wanted to provide an updated link for the github code, since the paths have been changed and the links above are broken.

 

https://github.com/ni/nidaqmx-python/tree/master/tests/legacy

0 Kudos
Message 7 of 7
(7,379 Views)