01-13-2020 11:58 AM
Hello everyone,
I'm new to the nidaqmx Python API, which I'm using with a USB-6361 DAQ, and I'm running into an issue when trying to use an analog input and an analog output simultaneously. My setup is a simple control of an actuator and a sensor.
The idea is to start the reading task to record the output of my sensor while I emit a signal with the actuator. The timings look like this (input and output could be started simultaneously):
Here is my current minimum working example:
    import numpy as np
    import nidaqmx
    from nidaqmx import stream_readers
    from nidaqmx import stream_writers

    out_chan = 'Dev1/ao0'
    in_chan = 'Dev1/ai0'
    fs = 1 * 10 ** 6  # Sampling at 1MHz
    f0 = 100 * 10 ** 3  # Signal at 100kHz
    nb_pts = (fs // f0) // 2
    signal = np.append(np.ones(nb_pts), 0)
    result = np.zeros(fs)  # Recording during 1s

    with nidaqmx.Task() as out_task, nidaqmx.Task() as in_task:
        out_task.ao_channels.add_ao_voltage_chan(out_chan)
        out_task.timing.cfg_samp_clk_timing(
            rate=fs,
            sample_mode=nidaqmx.constants.AcquisitionType.FINITE,
            samps_per_chan=len(signal))
        writer = stream_writers.AnalogSingleChannelWriter(out_task.out_stream, auto_start=True)

        in_task.ai_channels.add_ai_voltage_chan(in_chan)
        in_task.timing.cfg_samp_clk_timing(
            rate=fs,
            sample_mode=nidaqmx.constants.AcquisitionType.FINITE,
            samps_per_chan=len(result))
        reader = stream_readers.AnalogSingleChannelReader(in_task.in_stream)

        reader.read_many_sample(result)
        writer.write_many_sample(signal)
        out_task.wait_until_done()
        in_task.wait_until_done()
My issue is that when I start the input task, it blocks my script for the full reading duration, and the output task only runs after that.
Playing with the output part of the code, I noticed that the writer stream is non-blocking, because I can switch it to CONTINUOUS and let it run while doing other things. So I tried starting the output before recording, but at these high frequencies the reader misses some data at the beginning.
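To put rough numbers on why the start order matters at this sample rate, here is a small arithmetic sketch using the values from the example above. The 1 ms software delay is an assumed figure for illustration only, not a measurement, and `samples_elapsed` is a hypothetical helper name.

```python
def samples_elapsed(delay_s: float, fs: int) -> int:
    """Number of sample periods that pass during a delay of delay_s seconds."""
    return round(delay_s * fs)


fs = 1_000_000  # sample rate from the example (1 MHz)
f0 = 100_000    # signal frequency from the example (100 kHz)

# The output is 5 samples at 1 V (half a period of f0) plus one trailing 0 V sample.
pulse_len = (fs // f0) // 2 + 1
pulse_duration_s = pulse_len / fs

# ASSUMPTION: 1 ms elapses between starting the output and starting the read.
assumed_delay_s = 1e-3
missed = samples_elapsed(assumed_delay_s, fs)

print(f"pulse: {pulse_len} samples ({pulse_duration_s * 1e6:.0f} us)")
print(f"samples elapsed during the assumed delay: {missed}")
```

Under that assumption the whole pulse is over long before the reader starts, which is why the input has to be running before the output begins.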
I know that triggers exist, but I did not understand how to use them, as the Python documentation says very little about them.
01-13-2020 01:42 PM
I'm a LabVIEW guy and can't really help with detailed syntax for the python API. Here are some general tips:
1. I would tend to avoid "auto-start" behavior. You'll have more control over timing & sequencing when you choose to start your tasks explicitly. (Note: the input task will likely auto-start when you try to read from it before you've explicitly started it.)
2. After you've done most of the config, I would change things at the tail end of your posted code something like this:
    writer.write_many_sample(signal)
    # start the input task (but don't read yet!)
    in_task.start()
    # start the output task
    out_task.start()
    reader.read_many_sample(result)
    # the read call will wait to collect all samples, so the
    # next wait_until_done() is likely redundant
    in_task.wait_until_done()
    out_task.wait_until_done()
3. There are also ways to sync tasks in hardware if you need to. For now, this makes sure you start acquisition *just before* starting your signal generation. They aren't sync'ed, but you also won't be missing stuff.
-Kevin P
01-14-2020 08:12 AM
Thank you very much Kevin!
Separating the in_task "start" from the read command did the trick. For those interested, here is the corrected code:
    import numpy as np
    import nidaqmx
    from nidaqmx import stream_readers
    from nidaqmx import stream_writers

    out_chan = 'Dev1/ao0'
    in_chan = 'Dev1/ai0'
    fs = 1 * 10 ** 6  # Sampling at 1MHz
    f0 = 100 * 10 ** 3  # Signal at 100kHz
    nb_pts = (fs // f0) // 2
    signal = np.append(np.ones(nb_pts), 0)
    result = np.zeros(fs)  # Recording during 1s

    with nidaqmx.Task() as out_task, nidaqmx.Task() as in_task:
        out_task.ao_channels.add_ao_voltage_chan(out_chan)
        out_task.timing.cfg_samp_clk_timing(
            rate=fs,
            sample_mode=nidaqmx.constants.AcquisitionType.FINITE,
            samps_per_chan=len(signal))
        writer = stream_writers.AnalogSingleChannelWriter(
            task_out_stream=out_task.out_stream,
            auto_start=False)
        writer.write_many_sample(signal)

        in_task.ai_channels.add_ai_voltage_chan(in_chan)
        in_task.timing.cfg_samp_clk_timing(
            rate=fs,
            sample_mode=nidaqmx.constants.AcquisitionType.FINITE,
            samps_per_chan=len(result))
        reader = stream_readers.AnalogSingleChannelReader(in_task.in_stream)

        in_task.start()
        out_task.start()
        reader.read_many_sample(result)
        # Do something with <result>
I think this will be sufficient for now. If I need more precise synchronisation, I will look into hardware triggers.
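For anyone who does need tighter synchronisation, one possible hardware-trigger approach is to make the output task wait on the input task's start trigger, so both begin on the same edge. The following is an untested sketch under that assumption, not a verified solution: `ai_start_trigger_terminal` and `run_synchronized` are hypothetical helper names, and the terminal name assumes the usual DAQmx naming for a device called `Dev1`.

```python
def ai_start_trigger_terminal(device: str) -> str:
    """Terminal name of the device's internal AI start trigger (DAQmx naming)."""
    return f"/{device}/ai/StartTrigger"


def run_synchronized(device="Dev1", fs=1_000_000, f0=100_000, duration_s=1.0):
    """Record ai0 while emitting a short pulse on ao0, both started by one trigger."""
    # Imported here so the helper above can be used on a machine without the NI driver.
    import numpy as np
    import nidaqmx
    from nidaqmx import stream_readers, stream_writers
    from nidaqmx.constants import AcquisitionType

    signal = np.append(np.ones((fs // f0) // 2), 0.0)
    result = np.zeros(int(fs * duration_s))

    with nidaqmx.Task() as out_task, nidaqmx.Task() as in_task:
        in_task.ai_channels.add_ai_voltage_chan(f"{device}/ai0")
        in_task.timing.cfg_samp_clk_timing(
            rate=fs,
            sample_mode=AcquisitionType.FINITE,
            samps_per_chan=len(result))

        out_task.ao_channels.add_ao_voltage_chan(f"{device}/ao0")
        out_task.timing.cfg_samp_clk_timing(
            rate=fs,
            sample_mode=AcquisitionType.FINITE,
            samps_per_chan=len(signal))
        # The output will not begin until the input task's start trigger fires.
        out_task.triggers.start_trigger.cfg_dig_edge_start_trig(
            ai_start_trigger_terminal(device))

        writer = stream_writers.AnalogSingleChannelWriter(
            out_task.out_stream, auto_start=False)
        writer.write_many_sample(signal)
        reader = stream_readers.AnalogSingleChannelReader(in_task.in_stream)

        out_task.start()  # armed: waits for the trigger
        in_task.start()   # fires the trigger, so both tasks begin together
        reader.read_many_sample(
            result,
            number_of_samples_per_channel=len(result),
            timeout=duration_s + 5.0)
        out_task.wait_until_done()
    return result
```

Note that the start order is reversed compared with the software-timed version: the output task is started first so that it is already armed when the input task starts and generates the trigger.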