Greetings,
I am working on a project to teach myself more about nidaqmx. The goal is to write and read analog data at a synchronized, fixed interval and send the data to a web page for (almost) live visual feedback.
The issue I am running into is that, in finite sampling mode, the program either exits immediately or too early, so the every-N-samples buffer callback is not called the expected number of times.
A workaround I found is to use the done callback together with a global flag (shown in the full program below), but I have the feeling there should be a cleaner way to do this.
My initial assumption was that wait_until_done would take care of this, but when I use it instead of the global flag, the buffer callback is not called at all.
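To make that concrete, here is the wait_until_done variant pared down to a minimal script (same simulated device and wiring as the full program below; I would expect three callback prints, one per 1000 samples, but none appear):

import nidaqmx
import numpy as np
from nidaqmx.constants import AcquisitionType

with nidaqmx.Task() as ao_task, nidaqmx.Task() as ai_task:
    rate = 1000.0
    samples = 3000

    # AO generates the sample clock; AI is slaved to it so both run in lockstep
    ao_task.ao_channels.add_ao_voltage_chan("Dev1/ao0")
    ao_task.timing.cfg_samp_clk_timing(rate, sample_mode=AcquisitionType.FINITE,
                                       samps_per_chan=samples)
    ai_task.ai_channels.add_ai_voltage_chan("Dev1/ai0")
    ai_task.timing.cfg_samp_clk_timing(rate, source='/Dev1/ao/SampleClock',
                                       sample_mode=AcquisitionType.FINITE,
                                       samps_per_chan=samples)

    ao_task.write(np.sin(np.linspace(0, 2 * np.pi * 30, samples)), auto_start=False)

    def callback(task_handle, event_type, sample_count, callback_data):
        print(ai_task.read(number_of_samples_per_channel=sample_count))
        return 0

    ai_task.register_every_n_samples_acquired_into_buffer_event(1000, callback)

    ai_task.start()
    ao_task.start()

    # Blocks until the tasks finish, but callback() never fires in the meantime
    ai_task.wait_until_done(timeout=10)
    ao_task.wait_until_done(timeout=10)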
I am using nidaqmx-python with a simulated USB-6343 DAQ; the full program currently looks like this:
import nidaqmx
from nidaqmx.constants import AcquisitionType, Edge
import numpy as np
import matplotlib.pyplot as plt
import time

finished = False


def generate_sine_wave(frequency, amplitude, sampling_rate, duration):
    t = np.linspace(0, duration, int(sampling_rate * duration), endpoint=False)
    return amplitude * np.sin(2 * np.pi * frequency * t)


# REF: https://www.ni.com/en/support/documentation/supplemental/06/timing-and-synchronization-features-of-ni-daqmx.html
def main():
    with nidaqmx.Task() as ao_task, nidaqmx.Task() as ai_task:
        sampling_rate = 1000.0  # Samples per second
        duration = 3.0  # in seconds
        number_of_samples = int(sampling_rate * duration)

        # Generate a sine wave
        data_out = generate_sine_wave(frequency=10.0, amplitude=1.0,
                                      sampling_rate=sampling_rate, duration=duration)

        # Configure the analog output channel
        ao_task.ao_channels.add_ao_voltage_chan("Dev1/ao0")
        ao_task.timing.cfg_samp_clk_timing(sampling_rate, sample_mode=AcquisitionType.FINITE,
                                           samps_per_chan=number_of_samples)

        # Configure the analog input channel (clocked from the AO sample clock)
        ai_task.ai_channels.add_ai_voltage_chan("Dev1/ai0")
        ai_task.timing.cfg_samp_clk_timing(sampling_rate, source='/Dev1/ao/SampleClock',
                                           sample_mode=AcquisitionType.FINITE,
                                           samps_per_chan=number_of_samples)

        # Write data to the AO task
        ao_task.write(data_out, auto_start=False)

        def callback(task_handle, every_n_samples_event_type, sample_count, callback_data):
            print(ai_task.read(number_of_samples_per_channel=sample_count))
            return 0

        def test(task_handle, status, callback_data):
            global finished
            finished = True
            return 0

        ai_task.register_every_n_samples_acquired_into_buffer_event(1000, callback)
        ai_task.register_done_event(test)

        # Start the tasks (AI first, since it is clocked by AO)
        ai_task.start()
        ao_task.start()

        # Wait for the tasks to complete (does not print any samples at all)
        # ai_task.wait_until_done(timeout=10)
        # ao_task.wait_until_done(timeout=10)

        # Works as expected
        global finished
        while not finished:
            pass

        # Does not print the last sample
        # while not ai_task.is_task_done():
        #     pass

        # This does not work if we read the samples beforehand (because the buffer is empty)
        # Read data from the AI task
        # data_in = ai_task.read(number_of_samples_per_channel=number_of_samples)

        # Plot the results
        # plt.plot(data_out, label='Output')
        # plt.plot(data_in, label='Input')
        # plt.legend()
        # plt.show()


if __name__ == "__main__":
    main()
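For reference, the cleanest alternative I could come up with myself replaces the global flag and the busy-wait loop with a threading.Event. This is only a sketch of that part (it assumes the same ai_task/ao_task setup as above; on_done and acquisition_done are names I made up):

import threading

acquisition_done = threading.Event()

# Done callback: signal the event instead of setting a global flag
def on_done(task_handle, status, callback_data):
    acquisition_done.set()
    return 0

ai_task.register_done_event(on_done)
ai_task.start()
ao_task.start()

# Blocks without spinning the CPU; wait() returns False if the timeout elapses first
if not acquisition_done.wait(timeout=10):
    raise TimeoutError("Acquisition did not finish within 10 s")

This avoids the busy-wait, but it still feels like I am re-implementing what wait_until_done should already do.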
What would be a better way of approaching this? Of course, other suggestions for improving my program are also welcome.
Thanks a lot in advance.