LabVIEW

Help with callback

First, apologies for posting this here, but I could not find a Python-related forum. Admins, please feel free to move this wherever it fits best.

 

I have a USB-6008 and I want to trigger a Python task when one of its digital inputs receives a TTL pulse. For example, let's say a microscope is connected to GND and P0.0. I am a total beginner with NI-DAQ devices and I am having difficulty setting up the callback. This is my Python 3 script (heavily based on online examples); it doesn't raise any errors, but execution never seems to enter the callback.

 

What am I doing wrong please?

 

import PyDAQmx
import numpy as np
import ctypes


# Define the python function
def DoneCallback_py(taskHandle, status, callbackData):
    print("In callback_py")
    return 0 # The function should return an integer

# Convert the python function to a C function callback
# The name is defined in DAQmxTypes
DoneCallback = PyDAQmx.DAQmxDoneEventCallbackPtr(DoneCallback_py)

timeout = 1.0
class CallbackTask(PyDAQmx.Task):
    def __init__(self, source):
        PyDAQmx.Task.__init__(self)
        data = np.array([0], dtype=np.uint8)
        self.a = []
        c_samps_read = ctypes.c_int32(0)
        c_bytes_per_samp = ctypes.c_int32(0)

        self.CreateDIChan(source,
                          "",
                          PyDAQmx.DAQmx_Val_ChanPerLine)

        self.ReadDigitalLines(PyDAQmx.DAQmx_Val_Auto,
                              timeout,
                              PyDAQmx.DAQmx_Val_GroupByChannel,
                              data,
                              1,
                              ctypes.byref(c_samps_read),
                              ctypes.byref(c_bytes_per_samp),
                              None)

        self.RegisterDoneEvent(0, DoneCallback, None)
        print('initialised!')



task=CallbackTask(source = "Dev1/port0/line0")
task.StartTask()

input('Acquiring samples continuously. Press Enter to interrupt\n')

task.StopTask()
task.ClearTask()
Message 1 of 6

Try changing the task to a finite acquisition (1 sample) and adding a start trigger, instead of reading from the line.

While a task is acquiring data, it does not care whether the values are true or false. Since you are acquiring continuously, the task is never done, so your Done callback is never called.

With a start trigger, the task waits until a rising edge occurs on the trigger line and only then starts reading. When the task completes (1 sample), it generates the "Done" event.

PS (edit): you could also try configuring a "Change detection event" or an "Every N samples acquired into buffer" event. That might be faster, and it lets you read the data rather than just trigger something else.

Message 2 of 6

Alexander, many thanks for your reply. I am sorry, but I can't figure out how to do this. Do you have any pointers on how to:

  • change to a finite task
  • set up the trigger

I am relatively latency tolerant, so looking into the "Change detection event" or "Every N samples acquired into buffer" for possible speed improvements can wait until later.

 

Many thanks

Message 3 of 6

This is how far I got. I am now hitting this exception:

PyDAQmx.DAQmxFunctions.AttributeNotSupportedInTaskContextError: Specified property is not supported by the device or is not applicable to the task.
Property: DAQmx_StartTrig_Type

Task Name: _unnamedTask<0>

Status Code: -200452
 in function DAQmxCfgDigEdgeStartTrig

The code that generates this exception is:

import PyDAQmx
import numpy as np
import ctypes

# Define the python function
def DoneCallback_py(taskHandle, status, callbackData):
    print("In callback_py")
    return 0 # The function should return an integer

# Convert the python function to a C function callback
# The name is defined in DAQmxTypes
DoneCallback = PyDAQmx.DAQmxDoneEventCallbackPtr(DoneCallback_py)

timeout = 1.0
class CallbackTask(PyDAQmx.Task):
    def __init__(self, source):
        PyDAQmx.Task.__init__(self)
        data = np.array([0], dtype=np.uint8)
        self.a = []
        c_samps_read = ctypes.c_int32(0)
        c_bytes_per_samp = ctypes.c_int32(0)

        self.CreateDIChan(source,
                          "",
                          PyDAQmx.DAQmx_Val_ChanPerLine)

        self.ReadDigitalLines(PyDAQmx.DAQmx_Val_Auto,
                              timeout,
                              PyDAQmx.DAQmx_Val_GroupByChannel,
                              data,
                              1,
                              ctypes.byref(c_samps_read),
                              ctypes.byref(c_bytes_per_samp),
                              None)

        self.RegisterDoneEvent(0, DoneCallback, None)

        self.cfg_digital_start_trigger(source)
        print('initialised!')


    def cfg_digital_start_trigger(self, source):
        self.CfgDigEdgeStartTrig(source,  PyDAQmx.DAQmx_Val_Rising)
        print('hello')

task=CallbackTask(source = "Dev1/port0/line0")

If anyone has any ideas, I would be grateful.

Message 4 of 6

I am not very good at Python, so I did not notice that you are not configuring task timing (see the links below); I only noticed "continuous data acquisition".

What board do you have? Triggers and most DAQmx events are supported only for buffered acquisitions, and not all boards support them.

 

How fast do you need to respond to the trigger? 1 ms? 10 ms?

How long is the trigger pulse?

 

Right now your task is doing the following:

[Create. Read once. Register event] (wait for enter)  Stop (not used, task is not running) Clear

 

With a finite acquisition you would do:

Create. Set timing (finite, N samples). Set trigger. Register a callback for the Done event. Start (the task waits for the trigger, then starts copying samples into its internal buffer). The Done event fires. Clear.

No loop is required; the task waits for the trigger at board level.
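
The sequence above can be sketched in PyDAQmx roughly as follows. This is an untested sketch under several assumptions that are not from the thread: the -200452 error suggests the digital input task on this device does not accept a start trigger, so the sketch uses an analog input channel (`Dev1/ai0`) as the buffered task and `/Dev1/PFI0` as the trigger terminal, which means the TTL signal would have to be wired to PFI0 instead of P0.0. Whether a given board supports the start trigger and the Done event for such a task should be checked against its specifications. The helper names `make_done_callback` and `wait_for_trigger` are hypothetical.

```python
import threading


def make_done_callback(done):
    """Build a Done-event handler that sets `done` and returns 0 (hypothetical helper)."""
    def _on_done(task_handle, status, callback_data):
        done.set()
        return 0  # DAQmx callbacks should return an integer
    return _on_done


def wait_for_trigger(ai_channel="Dev1/ai0", trigger_source="/Dev1/PFI0", timeout=None):
    """Create, set timing, set trigger, register, start, wait for Done, clear.

    Needs the NI-DAQmx driver and real hardware. The channel and terminal
    names are assumptions, not values confirmed in the thread.
    """
    import PyDAQmx  # imported lazily so the helper above works without the driver

    done = threading.Event()
    # Keep a reference to the C callback for as long as the task is alive.
    c_callback = PyDAQmx.DAQmxDoneEventCallbackPtr(make_done_callback(done))

    task = PyDAQmx.Task()
    try:
        task.CreateAIVoltageChan(ai_channel, "", PyDAQmx.DAQmx_Val_Cfg_Default,
                                 -10.0, 10.0, PyDAQmx.DAQmx_Val_Volts, None)
        # Finite acquisition: 2 samples at 1 kHz on the default sample clock.
        task.CfgSampClkTiming("", 1000.0, PyDAQmx.DAQmx_Val_Rising,
                              PyDAQmx.DAQmx_Val_FiniteSamps, 2)
        # The task starts acquiring only after a rising edge on the trigger terminal.
        task.CfgDigEdgeStartTrig(trigger_source, PyDAQmx.DAQmx_Val_Rising)
        task.RegisterDoneEvent(0, c_callback, None)
        task.StartTask()
        # Block until the Done event fires; returns False if the timeout expires.
        return done.wait(timeout)
    finally:
        task.ClearTask()
```

Calling `wait_for_trigger(timeout=30.0)` would block until the Done event fires or 30 seconds pass, and return `True` only in the first case. The C callback object is held in a local variable so that it is not garbage-collected while the task is running.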

 

An alternative is to read in a loop and check the value; when it is true, call whatever you need:

Create. Start. Loop: (Read, stop if true). Call your function manually. Stop. Clear.

 

Calling Start before the loop lets you read much faster than calling Read alone in a loop; normally you can get down to a sub-millisecond rate.
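
The polling alternative could look like the minimal sketch below. The reader is passed in as a plain function so the loop itself does not depend on the driver; `wait_for_high` and `make_pydaqmx_reader` are hypothetical names, and the `ReadDigitalLines` arguments mirror the call in the original script, with one sample per read. It assumes the task has been created with `CreateDIChan` and started before polling begins.

```python
import time


def wait_for_high(read_line, on_trigger, poll_interval=0.001, timeout=None):
    """Poll read_line() until it returns a non-zero value, then call on_trigger().

    Returns True if the trigger fired, False if the timeout expired first.
    """
    deadline = None if timeout is None else time.monotonic() + timeout
    while deadline is None or time.monotonic() < deadline:
        if read_line():
            on_trigger()
            return True
        time.sleep(poll_interval)
    return False


def make_pydaqmx_reader(task):
    """Return a zero-argument reader for a started single-line DI task (hypothetical helper)."""
    import ctypes
    import numpy as np
    import PyDAQmx  # imported lazily; requires the NI-DAQmx driver

    data = np.zeros(1, dtype=np.uint8)
    samps_read = ctypes.c_int32(0)
    bytes_per_samp = ctypes.c_int32(0)

    def read_line():
        # Read one sample from the line, as in the original script.
        task.ReadDigitalLines(1, 1.0, PyDAQmx.DAQmx_Val_GroupByChannel,
                              data, 1,
                              ctypes.byref(samps_read),
                              ctypes.byref(bytes_per_samp),
                              None)
        return int(data[0])

    return read_line


# Intended use with hardware (not executed here):
#   task = PyDAQmx.Task()
#   task.CreateDIChan("Dev1/port0/line0", "", PyDAQmx.DAQmx_Val_ChanPerLine)
#   task.StartTask()
#   wait_for_high(make_pydaqmx_reader(task), my_function)
#   task.StopTask()
#   task.ClearTask()
```

Note that this checks the level of the line, not an edge, and a pulse shorter than the time between two reads can be missed.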

 

Timing: https://nidaqmx-python.readthedocs.io/en/latest/timing.html

Trigger: https://nidaqmx-python.readthedocs.io/en/latest/start_trigger.html

DAQmx Task (Start, events limitation, etc): https://nidaqmx-python.readthedocs.io/en/latest/task.html

Message 5 of 6

Thanks,

I ended up going the alternative way. It's a bit rough and naive, but I can't figure out how to do the callback.

Message 6 of 6