10-07-2019 12:14 PM
Firstly, apologies for posting this here, but I cannot find any Python-related forum. Admins, please feel free to move it wherever it fits best.
I have a USB 6008 and I want to trigger a Python task when one of its digital inputs receives a TTL pulse. Let's say that a microscope is connected to GND and P0.0. I am a total beginner with NI-DAQ devices and I am having difficulty getting the callback to work. This is my Python3 script (heavily based on online examples). It doesn't return any errors, but I never seem to enter the callback.
What am I doing wrong please?
import PyDAQmx
import numpy as np
import ctypes

# Define the python function
def DoneCallback_py(taskHandle, status, callbackData):
    print("In callback_py")
    return 0 # The function should return an integer

# Convert the python function to a C function callback
# The name is defined in DAQmxTypes
DoneCallback = PyDAQmx.DAQmxDoneEventCallbackPtr(DoneCallback_py)

timeout = 1.0

class CallbackTask(PyDAQmx.Task):
    def __init__(self, source):
        PyDAQmx.Task.__init__(self)
        data = np.array([0], dtype=np.uint8)
        self.a = []
        c_samps_read = ctypes.c_int32(0)
        c_bytes_per_samp = ctypes.c_int32(0)
        self.CreateDIChan(source, "", PyDAQmx.DAQmx_Val_ChanPerLine)
        self.ReadDigitalLines(PyDAQmx.DAQmx_Val_Auto, timeout, PyDAQmx.DAQmx_Val_GroupByChannel, data, 1, ctypes.byref(c_samps_read), ctypes.byref(c_bytes_per_samp), None)
        self.RegisterDoneEvent(0, DoneCallback, None)
        print('initialised!')

task=CallbackTask(source = "Dev1/port0/line0")
task.StartTask()
input('Acquiring samples continuously. Press Enter to interrupt\n')
task.StopTask()
task.ClearTask()
10-07-2019 02:02 PM - edited 10-07-2019 02:07 PM
Try to change task to finite (1 sample) and add the start trigger instead of reading from the line.
When task is acquiring data, it does not care what data are there, true or false. Since you are acquiring continuously, it is never done. So your callback is never called.
With trigger it will wait till true (rising edge) happens on trigger line, only then starts reading. When it completes task (1 sample), it generates event "done".
PS (edit) you can try to configure "Change detection event" or "Every N samples acquired into buffer"; that might be faster and allows you to read data, not just trigger something else.
10-07-2019 04:20 PM - edited 10-07-2019 04:21 PM
Alexander, many thanks for your reply. I am sorry, but I can't figure out how to do this. Do you have any pointers to:
I am relatively latency tolerant, so looking into the "Change detection event" or "Every N samples acquired into buffer" for possible speed improvements can be done later on.
Many thanks
10-07-2019 05:41 PM - edited 10-07-2019 05:43 PM
This is how far I got. I am now hitting this exception:
PyDAQmx.DAQmxFunctions.AttributeNotSupportedInTaskContextError: Specified property is not supported by the device or is not applicable to the task. Property: DAQmx_StartTrig_Type Task Name: _unnamedTask<0> Status Code: -200452 in function DAQmxCfgDigEdgeStartTrig
The code generating this exception is
import PyDAQmx
import numpy as np
import ctypes

# Define the python function
def DoneCallback_py(taskHandle, status, callbackData):
    print("In callback_py")
    return 0 # The function should return an integer

# Convert the python function to a C function callback
# The name is defined in DAQmxTypes
DoneCallback = PyDAQmx.DAQmxDoneEventCallbackPtr(DoneCallback_py)

timeout = 1.0

class CallbackTask(PyDAQmx.Task):
    def __init__(self, source):
        PyDAQmx.Task.__init__(self)
        data = np.array([0], dtype=np.uint8)
        self.a = []
        c_samps_read = ctypes.c_int32(0)
        c_bytes_per_samp = ctypes.c_int32(0)
        self.CreateDIChan(source, "", PyDAQmx.DAQmx_Val_ChanPerLine)
        self.ReadDigitalLines(PyDAQmx.DAQmx_Val_Auto, timeout, PyDAQmx.DAQmx_Val_GroupByChannel, data, 1, ctypes.byref(c_samps_read), ctypes.byref(c_bytes_per_samp), None)
        self.RegisterDoneEvent(0, DoneCallback, None)
        self.cfg_digital_start_trigger(source)
        print('initialised!')

    def cfg_digital_start_trigger(self, source):
        self.CfgDigEdgeStartTrig(source, PyDAQmx.DAQmx_Val_Rising)
        print('hello')

task=CallbackTask(source = "Dev1/port0/line0")

If anyone has any ideas I would be grateful.
10-07-2019 06:19 PM - edited 10-07-2019 06:20 PM
I am not very good at Python, so I did not notice that you are not configuring task timing (see the links below); I only noticed "continuous data acquisition".
What board do you have? Triggers and most DAQmx events are supported only for buffered acquisitions, and not all boards support those.
How fast do you need to respond to trigger? 1ms? 10 ms?
How long is the pulse?
Right now your task is doing the following:
[Create. Read once. Register event] (wait for enter) Stop (not used, task is not running) Clear
With finite acquisition you would do:
Create. Set timing (finite, N samples). Set trigger. Register callback for done event. Start (task will wait for trigger, after which starts copying into internal buffer). Done event appears. Clear.
No loop is required, it is waiting for trigger on board level.
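The finite, triggered sequence above could be sketched roughly like this. It is only an outline of the call order, not something tested on hardware: `arm_triggered_task`, the `daq` argument and the default `rate` and `n_samples` are made-up names and values for illustration. The method names are the PyDAQmx.Task wrappers of the DAQmx C functions. The -200452 error earlier in the thread shows that not every device and channel type accepts a start trigger, so whether these calls succeed depends on the board.

```python
def arm_triggered_task(task, daq, trigger_source, done_callback,
                       rate=1000.0, n_samples=2):
    """Set timing, set trigger, register the done event, then start.

    task           -- a PyDAQmx.Task (or compatible object) whose channels
                      have already been created
    daq            -- module providing the DAQmx_Val_* constants, e.g. PyDAQmx
    trigger_source -- terminal name for the trigger (device specific)
    done_callback  -- callback already wrapped with DAQmxDoneEventCallbackPtr
    rate, n_samples are arbitrary illustrative defaults (assumptions).
    """
    # Finite acquisition on the onboard sample clock ("" selects it).
    task.CfgSampClkTiming("", rate, daq.DAQmx_Val_Rising,
                          daq.DAQmx_Val_FiniteSamps, n_samples)
    # The task will wait for a rising edge on trigger_source before sampling.
    task.CfgDigEdgeStartTrig(trigger_source, daq.DAQmx_Val_Rising)
    # The done event must be registered before the task is started.
    task.RegisterDoneEvent(0, done_callback, None)
    # After this call the board waits for the trigger; no Python loop needed.
    task.StartTask()
```

With real hardware, `daq` would be the `PyDAQmx` module itself and `done_callback` the `DoneCallback` object from the original script. Keep a reference to that object for as long as the task runs so it is not garbage collected.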
Alternative way is to read in a loop and check value, when it is true - call what you need:
Create. Start. Loop: (Read, stop if true) Call your function manually. Stop. Clear
Calling Start task before the loop allows much faster reads than just calling read in a loop; normally you can get down to a sub-millisecond rate.
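The polling alternative could look something like the sketch below. To keep it hardware independent, the actual read is passed in as a function: `wait_for_rising_edge`, `read_line`, `poll_interval` and `timeout` are hypothetical names, not part of PyDAQmx. With a real device, `read_line` would wrap the `ReadDigitalLines` call from the original script and return the line state, with `StartTask()` called once before the loop.

```python
import time


def wait_for_rising_edge(read_line, poll_interval=0.001, timeout=None):
    """Poll read_line() until the line goes from low to high.

    read_line     -- zero-argument callable returning the current line state
                     (hypothetical; e.g. a wrapper around ReadDigitalLines)
    poll_interval -- seconds to sleep between reads
    timeout       -- give up after this many seconds (None waits forever)
    Returns True when a rising edge is seen, False on timeout.
    """
    deadline = None if timeout is None else time.monotonic() + timeout
    previous = bool(read_line())
    while deadline is None or time.monotonic() < deadline:
        current = bool(read_line())
        if current and not previous:
            return True  # low -> high transition detected
        previous = current
        time.sleep(poll_interval)
    return False
```

When it returns True, call your own function manually, then stop and clear the task. Note that a pulse shorter than the time between two reads can be missed with this approach.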
Timing: https://nidaqmx-python.readthedocs.io/en/latest/timing.html
Trigger: https://nidaqmx-python.readthedocs.io/en/latest/start_trigger.html
DAQmx Task (Start, events limitation, etc): https://nidaqmx-python.readthedocs.io/en/latest/task.html
10-09-2019 04:40 PM
Thanks,
I ended up doing it the alternative way. It's a bit rough and naive, but I can't figure out how to do the callback.