nidaqmx reader and writer in a stream

I am trying to write code with the Python module nidaqmx (PXIe 6365 and PXIe 6739, communicating over Thunderbolt) which does the following:

 

- read one sample with the analog inputs

- then, calculate the output signal with it

- and finally write the result into the analog outputs

- the inputs and outputs have also to be synchronized

 

Here is the code :

 

 

 

# -*- coding: utf-8 -*-

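import nidaqmx
# import the submodules used below explicitly instead of relying on side effects
import nidaqmx.constants
import nidaqmx.stream_readers
import nidaqmx.stream_writers
import nidaqmx.system
import numpy as np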

class PXI():
    def __init__(self):
        system = nidaqmx.system.System.local()
        self.system = system
        self.devices = [dev.name for dev in system.devices]  # range(len(...)-1) skipped the last device
        
        self.task_AI = None
        self.task_AO = None
        self.Fs = None
        self.AIchannels = None
        self.dev_AI = None
        self.AIconfig = None
        self.AI_trig = None
        
        
        
    
    def config_AnalogIn(self,device,Fs,channels,config="AsymRef",trig=False,AI_trig=None,delay=0):
        self.task_AI = nidaqmx.Task()
        if config == "AsymRef": config = nidaqmx.constants.TerminalConfiguration.RSE
        AnIn = np.array([ch.name for ch in self.system.devices[device].ai_physical_chans])
        
        if trig and (AI_trig is not None):
            self.task_AI.ai_channels.add_ai_voltage_chan(AnIn[AI_trig],terminal_config=config)
        for chan in AnIn[channels]:
            if not trig or chan != AnIn[AI_trig]:
                self.task_AI.ai_channels.add_ai_voltage_chan(chan,terminal_config=config)
            
        if trig:
            self.task_AI.triggers.start_trigger.cfg_anlg_edge_start_trig(AnIn[AI_trig],trigger_level = 1)
            self.task_AI.triggers.start_trigger.delay_units = nidaqmx.constants.DigitalWidthUnits.SECONDS
        if delay>0: self.task_AI.triggers.start_trigger.delay = delay
        
        self.Fs = Fs
        self.AIchannels = channels
        self.dev_AI = device
        self.AIconfig = config
        self.AI_trig = AI_trig
        
    def config_AnalogOut(self,device,channels,trig_AI=False):
        self.task_AO = nidaqmx.Task()
        AnOut = np.array([ch.name for ch in self.system.devices[device].ao_physical_chans])
        for chan in AnOut[channels]:
            self.task_AO.ao_channels.add_ao_voltage_chan(chan)
        
        if trig_AI:
            self.task_AO.triggers.start_trigger.cfg_dig_edge_start_trig(self.task_AI.triggers.start_trigger.term)
    
    
    def config_sync(self,dev_AI,dev_AO,channels_In,channels_Out,Fs):
        self.config_AnalogIn(dev_AI,Fs,channels_In)
        self.config_AnalogOut(dev_AO,channels_Out,True)
    
    def config_stream(self,dev_AI,dev_AO,channels_In,channels_Out,Fs):
        
        self.config_sync(dev_AI, dev_AO, channels_In, channels_Out, Fs)
        self.reader = nidaqmx.stream_readers.AnalogMultiChannelReader(self.task_AI.in_stream)
        # a writer must be attached to the task's out_stream, not its in_stream
        self.writer = nidaqmx.stream_writers.AnalogMultiChannelWriter(self.task_AO.out_stream,auto_start=False)
        self.task_AO.out_stream.regen_mode = nidaqmx.constants.RegenerationMode.DONT_ALLOW_REGENERATION
    
    def acquire_stream(self,Nsamples):
        self.task_AO.timing.cfg_samp_clk_timing(self.Fs,samps_per_chan=1)
        self.task_AI.timing.cfg_samp_clk_timing(self.Fs,samps_per_chan=1)
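        data = np.zeros((len(self.AIchannels),Nsamples))
        temp = np.zeros((len(self.AIchannels),1))  # the reader expects shape (channels, samples)
        dV = 1e-7
        n = 0
        
        def writeCallback(task_idx, every_n_samples_event_type, num_of_samples, callback_data):
            # n is rebound here, so it must be nonlocal (it is not a module-level global);
            # data and temp are only modified in place and need no declaration
            nonlocal n
            if n < Nsamples:  # do not index past the end of data
                self.reader.read_many_sample(temp,1)
                data[:,n] = temp[:,0]
                self.writer.write_many_sample(temp+dV)  # write the array
                n += 1
            return 0
        self.task_AO.register_every_n_samples_transferred_from_buffer_event(Nsamples,writeCallback)
        # with regeneration disabled, the output buffer needs data before the task starts
        self.writer.write_many_sample(temp)
        self.task_AO.start()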
        
        return np.arange(Nsamples)/self.Fs,data
 
        
    def close_all(self):
        self.task_AI.close()
        self.task_AO.close()

 

 

And in another file:

 

 

 

# -*- coding: utf-8 -*-

import numpy as np
import matplotlib.pyplot as plt
import pypxi

dt = 0.5
Fs = 500
Nsamples = int(Fs*dt)
channels_In = [16]
channels_Out = [32]

pulse = np.zeros(Nsamples)
pulse[2000:3000] = 2  # note: Nsamples is 250 here, so this slice is empty and pulse stays all zeros

PXI = pypxi.PXI()
PXI.config_stream(0,1,channels_In,channels_Out,Fs)
t,data = PXI.acquire_stream(Nsamples)

plt.close()
plt.plot(t,data)


PXI.close_all()

 

 

 

All the functions work except "acquire_stream", which returns the error:

 

An internal error occurred.
Task Name: _unnamedTask<A3>

Status Code: -88700

 

Not very helpful...

Message 1 of 2

- read one sample with the analog inputs

- then, calculate the output signal with it

- and finally write the result into the analog outputs

- the inputs and outputs have also to be synchronized

 

Based on your requirement, you should consider using NI-DAQmx Hardware-Timed Single-Point Mode.

And since you need to process data and write new signals to the AO as quickly as possible, Python is a poor fit: it executes much more slowly than compiled code and adds latency to every loop iteration.

From C++ vs Python: Full Comparison - History-Computer:

Depending on the complexity of calculations, C++ is anywhere from 10 to 100 times faster than Python. Python programs also tend to use more RAM than applications built with C++. However, many programmers acknowledge that the simple syntax of Python makes it a much faster language for development.
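
Whether that matters depends on the sample period. At the Fs = 500 Hz used in the question, each read-compute-write iteration has 2 ms available. Here is a rough sketch for measuring how long the computation alone takes and counting iterations that overrun the period. The function name check_lateness is made up for this illustration, and it only times the Python computation, not the driver read and write calls, so treat the result as a lower bound.

```python
import time

def check_lateness(compute, rate_hz, n_iter=100):
    """Time compute() n_iter times and compare each run with the sample period.

    Returns (late_count, worst_seconds). An iteration counts as "late" when
    the computation alone takes longer than one sample period (1 / rate_hz).
    """
    period = 1.0 / rate_hz
    late_count = 0
    worst = 0.0
    for _ in range(n_iter):
        t0 = time.perf_counter()
        compute(0.0)  # dummy input; a real loop would pass the sample just read
        elapsed = time.perf_counter() - t0
        worst = max(worst, elapsed)
        if elapsed > period:
            late_count += 1
    return late_count, worst

if __name__ == "__main__":
    # same trivial computation as in the question: add a small offset
    late, worst = check_lateness(lambda x: x + 1e-7, rate_hz=500)
    print(f"late iterations: {late}, worst case: {worst * 1e6:.1f} us")
```

If the worst case is already close to the period with the real computation plugged in, the loop will not keep up once the driver calls are added.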

 

If you have LabVIEW, there is an example available in Application Case 1 of NI-DAQmx Hardware-Timed Single Point Lateness Checking.

If you don't, you can refer to the ANSI C example and modify it to use hardware-timed single point (HWTSP) mode instead of continuous sampling:

C:\Users\Public\Documents\National Instruments\NI-DAQ\Examples\DAQmx ANSI C\Synchronization\Multi-Function\Synch AI-AO
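
If you want to prototype the loop in Python first, the structure would look roughly like the sketch below. This is only an outline under assumptions: the function names single_point_loop and run_on_hardware are made up, it assumes both devices support HWTSP on the channels used, and it leaves out the AI/AO synchronization (shared sample clock or start trigger) that the Synch AI-AO example shows. The hardware part is untested here; the loop itself takes plain callables, so it can be tried without a device.

```python
def single_point_loop(read_sample, write_sample, compute, n_samples):
    """Read one sample, compute the output, write it; repeat n_samples times.

    read_sample and write_sample are plain callables so the loop can be
    exercised without hardware. Returns the list of input samples.
    """
    inputs = []
    for _ in range(n_samples):
        x = read_sample()          # on hardware: one hardware-timed AI sample
        write_sample(compute(x))   # on hardware: one AO sample
        inputs.append(x)
    return inputs

def run_on_hardware(ai_chan, ao_chan, rate_hz, n_samples, compute):
    """Hardware-only part (assumption: both devices support HWTSP).

    Needs nidaqmx and real devices. Synchronization between the two
    tasks is deliberately omitted in this sketch.
    """
    import nidaqmx
    from nidaqmx.constants import AcquisitionType

    with nidaqmx.Task() as ai, nidaqmx.Task() as ao:
        ai.ai_channels.add_ai_voltage_chan(ai_chan)
        ao.ao_channels.add_ao_voltage_chan(ao_chan)
        for task in (ai, ao):
            task.timing.cfg_samp_clk_timing(
                rate_hz, sample_mode=AcquisitionType.HW_TIMED_SINGLE_POINT
            )
        ao.start()
        ai.start()
        # with one channel per task, read() returns one float and
        # write() accepts one float
        return single_point_loop(ai.read, ao.write, compute, n_samples)

if __name__ == "__main__":
    # dry run with stand-ins for the device: a fixed input sequence and a list
    source = iter([0.0, 1.0, 2.0])
    written = []
    seen = single_point_loop(lambda: next(source), written.append,
                             lambda x: x + 0.5, 3)
    print(seen, written)
```

On real hardware the call would be something like run_on_hardware("PXI1Slot2/ai16", "PXI1Slot3/ao32", 500, 250, lambda x: x + 1e-7), where the channel names are placeholders for your own.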

 

Message 2 of 2