Multifunction DAQ

cancel
Showing results for 
Search instead for 
Did you mean: 

NI USB 6211 - PyDAQmx - Sync Analog Inputs with Analog Outputs

I'm using the Python bindings for DAQmx Base, which is simplified a wrapper around the C functions.

I think some of you know this Python binding PyDAQmx.

 

Currently I'm using the same sample rate for two different Tasks. One Task writes one time the sample to the AnalogOutput and then it's repeated internally in the device itself. The other Task reads two Analog Inputs in a while loop. Is there an easy way to use a C function to synchronize the blocking call ReadAnalogF64 with the repeated samples of the other Task, which outputs the analog signal?

 

Here the code. Cleaned a little bit up, but still not perfect and it's bleeding edge. Needs Python 3.6.

 

#!/usr/bin/env python3.6

import logging
import sys


import zmq
import numpy as np
import PyDAQmx
from PyDAQmx.DAQmxFunctions import *
from PyDAQmx.DAQmxConstants import *


def get_device_name(physicalChannel):
    if isinstance(physicalChannel, list):
        return physicalChannel[0].split('/')[0]
    if isinstance(physicalChannel, str):
        return physicalChannel.split('/')[0]
    else:
        raise Exception('Wrong type of physicalChannel')


class ContextManager:

    def __enter__(self):
        return self
        
    def __exit__(self, *args):
        self.stop()


class MultiChannelAnalogInput(ContextManager):

    def __init__(self, physicalChannel, min_limit=-10.0, max_limit=10.0, reset=False, rate=4096, samples=512):
        self.device = get_device_name(physicalChannel)
        self.physicalChannel = physicalChannel
        self.numberOfChannel = len(physicalChannel)
        self.min_limit = min_limit
        self.max_limit = max_limit
        self.samples = samples
        self.sampling_rate = rate
        self.data = np.zeros(samples, dtype=np.float64)
        if reset:
            DAQmxResetDevice(self.device)
            

    def configure(self):
        task = PyDAQmx.Task()
        for name in self.physicalChannel:
            log.info(f'Adding chan {name}')
            task.CreateAIVoltageChan(name, '', DAQmx_Val_RSE, self.min_limit, self.max_limit, DAQmx_Val_Volts, None)
        task.CfgSampClkTiming(None, self.sampling_rate // self.numberOfChannel, DAQmx_Val_Rising, DAQmx_Val_ContSamps, self.samples // self.numberOfChannel)
        self.task = task

    def read(self):
        task = self.task  
        task.StartTask()
        dlen = self.samples
        numberOfChannel = self.numberOfChannel
        dlen_per_channel = dlen // numberOfChannel
        data = self.data
        read = int32()
        log.info('Starting read loop')
        while True:
            task.ReadAnalogF64(dlen, 10.0, DAQmx_Val_GroupByChannel, data, dlen, byref(read), None)
            yield data
            
    def stop(self):
        log.info('Stopping analog input task')
        self.task.StopTask()


class ChannelAnalogOutput(ContextManager):

    def __init__(self, physicalChannel, min_limit=0.0, max_limit=10.0, reset=False, rate=10240, samples=512):
        self.device = get_device_name(physicalChannel)
        self.physicalChannel = physicalChannel
        self.min_limit = min_limit
        self.max_limit = max_limit
        self.samples = samples
        self.sampling_rate = rate
        if reset:
            DAQmxResetDevice(self.device)

                
    def configure(self):
        task_handle = PyDAQmx.Task()
        log.info(f'Adding chan {self.physicalChannel}')
        task_handle.CreateAOVoltageChan(self.physicalChannel, '', self.min_limit, self.max_limit, DAQmx_Val_Volts, None)
        task_handle.CfgSampClkTiming(None, self.sampling_rate, DAQmx_Val_Rising, DAQmx_Val_ContSamps, self.samples)
        self.taskHandle = task_handle

    def write(self, signal_array):
        data = signal_array   
        dlen = self.samples
        write = int32()
        taskHandle = self.taskHandle
        taskHandle.WriteAnalogF64(dlen, True, 10.0, DAQmx_Val_GroupByChannel, data, byref(write), None)
        log.info(f'Starting output to channel {self.physicalChannel}')
        taskHandle.StartTask()
        self.task = taskHandle
    
    def stop(self):
        log.info('Stopping analog output task')
        self.task.StopTask()


class ZMQ_Publisher:
    def __init__(self, bind_to):
        context = zmq.Context()
        socket = context.socket(zmq.PUB)
        socket.bind(bind_to)
        self.socket = socket
    
    def send(self, np_array):
        self.socket.send_pyobj(np_array)
    
    def __enter__(self):
        return self

    def __exit__(self, *args):
        log.info('Closing socket')
        self.socket.close()


def triangle(samples):
    half_samples = samples // 2
    rising = np.linspace(1, 10, num=half_samples, dtype=np.float64)
    falling = np.linspace(10, 1, num=half_samples, dtype=np.float64)
    return np.concatenate((rising, falling))


def get_samples():
    if len(sys.argv) != 2:
        print(f'{sys.argv[0]} samples')
        sys.exit(1)
    try:
        samples = int(sys.argv[1])
    except ValueError:
        print('samples must be an integer')
        sys.exit(1)
    return samples


if __name__ == '__main__':
    logging.basicConfig()
    log = logging.getLogger(__name__)
    log.setLevel(logging.DEBUG)
    
    output_channel = 'Dev1/ao0'
    input_channels = ['Dev1/ai0', 'Dev1/ai1']
    
    samples = get_samples()
    rate = samples * 10
    log.info(f'Samplerate: {rate}')
    log.info(f'Samples of output: {samples}')
    log.info(f'Samples per analog input channel: {samples // len(input_channels)}')
    
    output_settings = dict(physicalChannel=output_channel, reset=True, rate=rate, samples=samples)
    input_settings = dict(physicalChannel=input_channels, reset=False, rate=rate, samples=samples)
    
    
    log.info('Setting up server')
    # using a contextmanager to ensure that everythig is closed correctly
    with ZMQ_Publisher('tcp://*:5556') as server,\
        ChannelAnalogOutput(**output_settings) as ao,\
        MultiChannelAnalogInput(**input_settings) as multipleAI:
        
        # prepare output
        ao.configure()

        # generate output signal
        log.info('Generate triangle sample')
        output_sample = triangle(samples)

        # write the sample
        log.info('Writing sample to output')
        ao.write(output_sample)

        # prepare analog inputs
        multipleAI.configure()
        read_generator = multipleAI.read()
        try:
            for data in read_generator:
                server.send(data)
        except KeyboardInterrupt:
            log.info('Closing now')

 

A Radar receiver is connected to AO0, AI0 and AI1.

To show that there is no synchronization between the two tasks, i recorded a Video.

The code is running inside a virtual machine with Scientific Linux 6.

The visualization is running outside the virtual machine. Currently I do not split the both signals.

 

0 Kudos
Message 1 of 4
(4,244 Views)

I found a hint with example code here: http://www.ni.com/example/28596/en/

It's very short and easy to understand.

Now trying this to implement this in Python with PyDAQmx.
When I'm done I'll share my code with you.

0 Kudos
Message 2 of 4
(4,190 Views)

It seems to be a monologue with my self.

I have seen, that triggerSource in DAQmxCfgDigEdgeStartTrig must start with a leading slash. Instead of "ai/StartTrigger" it must contain the Device specifier with a leading slash: "/Dev1/ai/StartTrigger". Currently with my test code I can read the analog inputs, but the analog output is not triggered, so I have still a doppler radar.

 

If it helps: I'm using SL6 x86 with nidaqmxbase-15.0.0.

What I'm doing wrong?

0 Kudos
Message 3 of 4
(4,171 Views)

Finally I switched from Linux to Windows.

 

Every time when I was trying to find another solution, I read what I could not do with Ni DAQmx Base. It's horrifying. Ni is announcing Linux Support, but in fact everything is limited.

 

If you try to do a similar task in Python, you should use PyDAQmx which is more Pythonic as the thin C-Wrappers for DAQmx [Base] ANSI C.

What I like on this package is for example the support for a context manager. It guarantees that a Task will be stopped, whatever happens.

 

My code, which is actually working:

 

    def read_loop(self):
        sample_clock = f'/{self.device}/ai/SampleClock'
        with Task('AITask') as ai_task, Task('AOTask') as ao_task:
            for channel in self.ai_channel:
                ai_task.ai_channels.add_ai_voltage_chan(channel, min_val=self.sensor['output_min'], max_val=self.sensor['output_max'], terminal_config=RSE)
            ai_task.timing.cfg_samp_clk_timing(self.sample_rate, sample_mode=CONTINUOUS, samps_per_chan=self.samples)
            ao_task.ao_channels.add_ao_voltage_chan(self.ao_channel, min_val=self.sensor['vco_min'], max_val=self.sensor['vco_max'])
            ao_task.timing.cfg_samp_clk_timing(self.sample_rate, source=sample_clock, sample_mode=CONTINUOUS, samps_per_chan=self.samples)
            ao_task.write(self.signal.output)
            ao_task.start()
            while True:
                try:
                    yield ai_task.read(self.samples)
                except KeyboardInterrupt:
                    break

I'm using ai/SampleClock for the ai_task and ao_task. Then I start the ao_task, which is waiting for the ai_task. This synchronizes  the analog output with the analog inputs.

 

The method is generator. Outside I'm iterating over the generator, which yields the values from the analog inputs.

Message 4 of 4
(4,030 Views)