07-11-2017 07:51 AM
I'm using the Python bindings for DAQmx Base, which are a simplified wrapper around the C functions.
I think some of you know this Python binding PyDAQmx.
Currently I'm using the same sample rate for two different Tasks. One Task writes the sample to the analog output once, and the device itself then repeats it internally. The other Task reads two analog inputs in a while loop. Is there an easy way to use a C function to synchronize the blocking call ReadAnalogF64 with the repeated samples of the other Task, which outputs the analog signal?
Here is the code. I cleaned it up a little, but it is still not perfect and it's bleeding edge. It needs Python 3.6.
#!/usr/bin/env python3.6
"""Replay a triangle wave on one analog output and publish analog-input data.

The script writes one period of a triangle signal to an analog output
channel, then continuously reads the analog input channels and publishes
every block that was read on a ZeroMQ PUB socket.

Usage: ``<script> samples``
"""
import logging
import sys

import zmq
import numpy as np
import PyDAQmx
from PyDAQmx.DAQmxFunctions import *
from PyDAQmx.DAQmxConstants import *

# Defined at module level (not only under ``__main__``) so that the classes
# below can log even when this file is imported instead of executed.
log = logging.getLogger(__name__)


def get_device_name(physicalChannel):
    """Return the device part of a channel name, e.g. 'Dev1' for 'Dev1/ai0'.

    physicalChannel may be a single channel name (str) or a list of channel
    names; for a list, the device of the first entry is returned.

    Raises ValueError for an empty list and TypeError (a subclass of the
    previously raised Exception) for any other type.
    """
    if isinstance(physicalChannel, list):
        if not physicalChannel:
            raise ValueError('physicalChannel must not be empty')
        return physicalChannel[0].split('/')[0]
    if isinstance(physicalChannel, str):
        return physicalChannel.split('/')[0]
    raise TypeError('Wrong type of physicalChannel')


class ContextManager:
    """Mixin that calls ``self.stop()`` when the ``with`` block is left."""

    def __enter__(self):
        return self

    def __exit__(self, *args):
        self.stop()


class MultiChannelAnalogInput(ContextManager):
    """Continuous voltage acquisition on one or more analog input channels."""

    def __init__(self, physicalChannel, min_limit=-10.0, max_limit=10.0,
                 reset=False, rate=4096, samples=512):
        """Store the settings; the task itself is created by configure().

        physicalChannel: list of channel names (a single str is accepted too).
        min_limit, max_limit: limits passed to CreateAIVoltageChan.
        reset: reset the device before use.
        rate: total sample rate, divided evenly between the channels.
        samples: total number of samples per read, over all channels.
        """
        # A bare string would otherwise be counted and iterated per character.
        if isinstance(physicalChannel, str):
            physicalChannel = [physicalChannel]
        self.device = get_device_name(physicalChannel)
        self.physicalChannel = physicalChannel
        self.numberOfChannel = len(physicalChannel)
        self.min_limit = min_limit
        self.max_limit = max_limit
        self.samples = samples
        self.sampling_rate = rate
        # One buffer for all channels; it is reused for every read.
        self.data = np.zeros(samples, dtype=np.float64)
        # Stays None until configure() ran, so stop() is always safe to call.
        self.task = None
        if reset:
            DAQmxResetDevice(self.device)

    def configure(self):
        """Create the task, add every channel and set up continuous timing."""
        task = PyDAQmx.Task()
        for name in self.physicalChannel:
            log.info(f'Adding chan {name}')
            task.CreateAIVoltageChan(name, '', DAQmx_Val_RSE,
                                     self.min_limit, self.max_limit,
                                     DAQmx_Val_Volts, None)
        task.CfgSampClkTiming(None,
                              self.sampling_rate // self.numberOfChannel,
                              DAQmx_Val_Rising, DAQmx_Val_ContSamps,
                              self.samples // self.numberOfChannel)
        self.task = task

    def read(self):
        """Start the task and yield the filled sample buffer forever.

        The SAME numpy array is yielded on every iteration and overwritten by
        the next read, so a consumer must use or copy it before advancing.
        """
        task = self.task
        task.StartTask()
        dlen = self.samples
        data = self.data
        # The first argument of ReadAnalogF64 is the number of samples PER
        # CHANNEL. Requesting the total here would ask for more values than
        # the buffer (dlen values over all channels) can hold.
        dlen_per_channel = dlen // self.numberOfChannel
        read = int32()
        log.info('Starting read loop')
        while True:
            task.ReadAnalogF64(dlen_per_channel, 10.0,
                               DAQmx_Val_GroupByChannel, data, dlen,
                               byref(read), None)
            yield data

    def stop(self):
        """Stop the input task; a no-op if configure() never ran."""
        log.info('Stopping analog input task')
        if self.task is not None:
            self.task.StopTask()


class ChannelAnalogOutput(ContextManager):
    """Continuous voltage generation on a single analog output channel."""

    def __init__(self, physicalChannel, min_limit=0.0, max_limit=10.0,
                 reset=False, rate=10240, samples=512):
        """Store the settings; the task itself is created by configure().

        physicalChannel: name of the output channel, e.g. 'Dev1/ao0'.
        min_limit, max_limit: limits passed to CreateAOVoltageChan.
        reset: reset the device before use.
        rate: output sample rate.
        samples: number of samples in the output signal.
        """
        self.device = get_device_name(physicalChannel)
        self.physicalChannel = physicalChannel
        self.min_limit = min_limit
        self.max_limit = max_limit
        self.samples = samples
        self.sampling_rate = rate
        # Both stay None until configure() ran, so stop() is always safe.
        self.taskHandle = None
        self.task = None
        if reset:
            DAQmxResetDevice(self.device)

    def configure(self):
        """Create the task, add the channel and set up continuous timing."""
        task_handle = PyDAQmx.Task()
        log.info(f'Adding chan {self.physicalChannel}')
        task_handle.CreateAOVoltageChan(self.physicalChannel, '',
                                        self.min_limit, self.max_limit,
                                        DAQmx_Val_Volts, None)
        task_handle.CfgSampClkTiming(None, self.sampling_rate,
                                     DAQmx_Val_Rising, DAQmx_Val_ContSamps,
                                     self.samples)
        self.taskHandle = task_handle
        # Registered here already: write() passes autoStart=True, so the task
        # must be stoppable even if a later call in write() fails.
        self.task = task_handle

    def write(self, signal_array):
        """Write signal_array to the output channel and start the task.

        Raises ValueError if signal_array holds fewer than ``samples`` values,
        because the driver is told to consume exactly that many.
        """
        data = signal_array
        dlen = self.samples
        if len(data) < dlen:
            raise ValueError(
                f'signal_array has {len(data)} values, {dlen} are required')
        write = int32()
        taskHandle = self.taskHandle
        taskHandle.WriteAnalogF64(dlen, True, 10.0, DAQmx_Val_GroupByChannel,
                                  data, byref(write), None)
        log.info(f'Starting output to channel {self.physicalChannel}')
        taskHandle.StartTask()
        self.task = taskHandle

    def stop(self):
        """Stop the output task; a no-op if configure() never ran."""
        log.info('Stopping analog output task')
        if self.task is not None:
            self.task.StopTask()


class ZMQ_Publisher:
    """ZeroMQ PUB socket that sends pickled Python objects."""

    def __init__(self, bind_to):
        """Create a PUB socket and bind it to the endpoint bind_to."""
        context = zmq.Context()
        socket = context.socket(zmq.PUB)
        socket.bind(bind_to)
        self.socket = socket

    def send(self, np_array):
        """Publish np_array via send_pyobj."""
        self.socket.send_pyobj(np_array)

    def __enter__(self):
        return self

    def __exit__(self, *args):
        log.info('Closing socket')
        self.socket.close()


def triangle(samples):
    """Return one triangle period (1 -> 10 -> 1) with exactly `samples` values.

    For an odd `samples` the falling edge gets the extra value, so the result
    never comes out one value short of what the output task expects.
    """
    half_samples = samples // 2
    rising = np.linspace(1, 10, num=half_samples, dtype=np.float64)
    falling = np.linspace(10, 1, num=samples - half_samples,
                          dtype=np.float64)
    return np.concatenate((rising, falling))


def get_samples():
    """Return the sample count from the command line; exit(1) on bad input."""
    if len(sys.argv) != 2:
        print(f'{sys.argv[0]} samples')
        sys.exit(1)
    try:
        samples = int(sys.argv[1])
    except ValueError:
        print('samples must be an integer')
        sys.exit(1)
    # Zero or negative counts would produce a zero/negative sample rate.
    if samples <= 0:
        print('samples must be a positive integer')
        sys.exit(1)
    return samples


def main():
    """Configure both tasks and publish input data until interrupted."""
    logging.basicConfig()
    log.setLevel(logging.DEBUG)
    output_channel = 'Dev1/ao0'
    input_channels = ['Dev1/ai0', 'Dev1/ai1']
    samples = get_samples()
    rate = samples * 10
    log.info(f'Samplerate: {rate}')
    log.info(f'Samples of output: {samples}')
    log.info(f'Samples per analog input channel: {samples // len(input_channels)}')
    output_settings = dict(physicalChannel=output_channel, reset=True,
                           rate=rate, samples=samples)
    input_settings = dict(physicalChannel=input_channels, reset=False,
                          rate=rate, samples=samples)
    log.info('Setting up server')
    # using a context manager to ensure that everything is closed correctly
    with ZMQ_Publisher('tcp://*:5556') as server,\
            ChannelAnalogOutput(**output_settings) as ao,\
            MultiChannelAnalogInput(**input_settings) as multipleAI:
        # prepare output
        ao.configure()
        # generate output signal
        log.info('Generate triangle sample')
        output_sample = triangle(samples)
        # write the sample
        log.info('Writing sample to output')
        ao.write(output_sample)
        # prepare analog inputs
        multipleAI.configure()
        read_generator = multipleAI.read()
        try:
            for data in read_generator:
                server.send(data)
        except KeyboardInterrupt:
            log.info('Closing now')


if __name__ == '__main__':
    main()
A Radar receiver is connected to AO0, AI0 and AI1.
To show that there is no synchronization between the two tasks, I recorded a video.
The code is running inside a virtual machine with Scientific Linux 6.
The visualization is running outside the virtual machine. Currently I do not split the two signals.
07-13-2017 08:52 AM
I found a hint with example code here: http://www.ni.com/example/28596/en/
It's very short and easy to understand.
Now I'm trying to implement this in Python with PyDAQmx.
When I'm done I'll share my code with you.
07-14-2017 05:02 AM
It seems to be a monologue with myself.
I have seen that triggerSource in DAQmxCfgDigEdgeStartTrig must start with a leading slash. Instead of "ai/StartTrigger" it must contain the device specifier with a leading slash: "/Dev1/ai/StartTrigger". Currently with my test code I can read the analog inputs, but the analog output is not triggered, so I still have a Doppler radar.
If it helps: I'm using SL6 x86 with nidaqmxbase-15.0.0.
What am I doing wrong?
08-25-2017 07:58 AM
Finally I switched from Linux to Windows.
Every time I tried to find another solution, I read about what I could not do with NI DAQmx Base. It's horrifying. NI is announcing Linux support, but in fact everything is limited.
If you try to do a similar task in Python, you should use PyDAQmx, which is more Pythonic than the thin C wrappers for DAQmx [Base] ANSI C.
What I like on this package is for example the support for a context manager. It guarantees that a Task will be stopped, whatever happens.
My code, which is actually working:
def read_loop(self):
    """Generate blocks of analog-input samples with the output clocked by AI.

    Two tasks are opened as context managers, so both are released when the
    generator finishes. Every channel in ``self.ai_channel`` is added to the
    input task as an RSE voltage channel, and ``self.ao_channel`` is added
    to the output task. Both tasks run continuously at ``self.sample_rate``;
    the output task takes its sample clock from the device's
    ``ai/SampleClock`` terminal, which ties its timing to the input task.

    The output task is started explicitly after ``self.signal.output`` has
    been written. The input task is never started explicitly here.
    NOTE(review): presumably the first ``read()`` starts it implicitly --
    confirm against the task library's documentation.

    Yields the result of ``ai_task.read(self.samples)`` until a
    KeyboardInterrupt is raised inside the loop.
    """
    ai_clock_terminal = f'/{self.device}/ai/SampleClock'
    with Task('AITask') as reader, Task('AOTask') as writer:
        # Input side: one RSE voltage channel per configured input.
        for ai_name in self.ai_channel:
            reader.ai_channels.add_ai_voltage_chan(
                ai_name,
                min_val=self.sensor['output_min'],
                max_val=self.sensor['output_max'],
                terminal_config=RSE,
            )
        reader.timing.cfg_samp_clk_timing(
            self.sample_rate,
            sample_mode=CONTINUOUS,
            samps_per_chan=self.samples,
        )

        # Output side: same rate, but clocked from the AI sample clock.
        writer.ao_channels.add_ao_voltage_chan(
            self.ao_channel,
            min_val=self.sensor['vco_min'],
            max_val=self.sensor['vco_max'],
        )
        writer.timing.cfg_samp_clk_timing(
            self.sample_rate,
            source=ai_clock_terminal,
            sample_mode=CONTINUOUS,
            samps_per_chan=self.samples,
        )
        writer.write(self.signal.output)
        writer.start()

        # Ctrl-C ends the generator; leaving the ``with`` closes both tasks.
        try:
            while True:
                yield reader.read(self.samples)
        except KeyboardInterrupt:
            return
I'm using ai/SampleClock for the ai_task and ao_task. Then I start the ao_task, which is waiting for the ai_task. This synchronizes the analog output with the analog inputs.
The method is a generator. Outside, I iterate over the generator, which yields the values from the analog inputs.