LeCroy WavePro 735 Zi-A automation script using PyVisa

Dear All,

Thanks in advance for your help. For the last few weeks I have been trying to use a LeCroy WavePro 735 Zi-A to acquire traces. The idea is the following:
The scope should wait until a signal crosses its trigger level and then acquire the waveform. This should repeat for a fixed amount of time (e.g. 40 seconds). At the end, all the waveforms should be plotted in one final graph, mostly overlapping each other. The scope and the additional devices are connected via TCP/IP and GPIB, using PyVISA, to my computer, which runs Windows 10 (in case that matters). The script is written in Python and sends VBS and SCPI commands. See my script here:

import pyvisa
import time
import csv
import os
from datetime import datetime
import numpy as np
import matplotlib.pyplot as plt

# -------------------- Parameters --------------------
duration = 40  # seconds
poll_interval = 0.001
waveform_channel = "C1"
trigger_level = 0.012  # [V]
bias_current = 0.00000  # [A]
compliance_voltage = 20  # [V]
switch_threshold = 20.0
cooldown_time = 60
vdiv = 0.01    # in volts/div
tdiv = 0.0001   # in seconds/div

# -------------------- Paths --------------------
save_path = r""
os.makedirs(save_path, exist_ok=True)
logfile_path = os.path.join(save_path, "trigger_log.csv")
switch_log_path = os.path.join(save_path, "switch_events.csv")

# -------------------- VISA Setup --------------------
rm = pyvisa.ResourceManager()
scope = rm.open_resource("")
keithley = rm.open_resource("GPIB0::26::INSTR")
scope.timeout = 3000
keithley.timeout = 1000

# -------------------- Keithley Init --------------------
keithley.write("*RST")
keithley.write("smua.source.func = smua.OUTPUT_DCAMPS")
keithley.write(f"smua.source.leveli = {bias_current}")
keithley.write(f"smua.source.limitv = {compliance_voltage}")
keithley.write("smua.source.output = smua.OUTPUT_ON")
print(f"Keithley is sourcing {bias_current} A")

# -------------------- Scope Init --------------------
#Initial Reset
scope.write("*RST")
scope.write("*CLS")

# Channel Setup
scope.write(f"{waveform_channel}:TRACE ON")
scope.write("C2:TRACE OFF")
scope.write("C3:TRACE OFF")
scope.write("C4:TRACE OFF")
scope.write("GRID SINGLE")
scope.write(f"{waveform_channel}:CPL D1M")
scope.write(f"TIME_DIV {tdiv}")
scope.write(f"{waveform_channel}:VDIV {vdiv} V")

# Sampling Rate

scope.write("VBS 'app.acquisition.horizontal.maximize = \"FixedSampleRate\"'")
resp = scope.query("VBS? 'Return = app.Acquisition.Horizontal.Maximize'")
print("Fixed‑rate mode is:", resp.strip())
record_length = int(20* (1/tdiv))
scope.write(f"HORIZONTAL:RECORD_LENGTH {record_length}")

# Trigger Settings — **only once before the loop**
scope.write("TRIG_MODE NORM")
scope.write(f"TRIG_SELECT EDGE, SR, {waveform_channel}")
scope.write(f"{waveform_channel}:TRIG_LEVEL {trigger_level}V")
scope.write(f"{waveform_channel}:TRIG_SLOPE POS")

# Data formatting
scope.write("COMM_FORMAT DEF9,WORD,BIN")
scope.write(f"WFSU SP,0,NP,{record_length},FP,0,SN,0")
scope.write("COMM_ORDER HI")
scope.write("CHDR OFF")
scope.write("DISP ON")

# Print setup finished
# ------------ Check initial parameters -------------


print("Coupling:", scope.query(f"{waveform_channel}:CPL?"))
print("Trigger Level:", scope.query(f"{waveform_channel}:TRIG_LEVEL?"))
print("Waveform setup:", scope.query("WFSU?"))
print("Comm header:", scope.query("CHDR?"))
print("Time division:", scope.query("TIME_DIV?"))  # timebase is global, so no channel prefix
print("Voltage division:", scope.query(f"{waveform_channel}:VDIV?"))
print("Comm order:", scope.query("COMM_ORDER?"))
print("Vertical offset:", scope.query(f"{waveform_channel}:OFFSET?"))  # offset is a per-channel setting
print("Vertical scale:", scope.query(f"VBS? 'Return = app.Acquisition.{waveform_channel}.VerScale'"))

# -------------------- Logging Setup --------------------
trigger_times = []
switch_times = []

with open(logfile_path, mode='w', newline='') as log_file, \
     open(switch_log_path, mode='w', newline='') as switch_file:

    trigger_writer = csv.writer(log_file)
    switch_writer = csv.writer(switch_file)

    trigger_writer.writerow(["Trigger Index", "Time Since Start (s)", "Timestamp", "Waveform File"])
    switch_writer.writerow(["Switch Time (s)", "Timestamp", "Voltage (V)"])

    print(f"Monitoring for {duration} seconds...")
    start_time = time.time()
    event_index = 0
    switch_logged = False

    # ------------- Acquisition Loop -------------
    while time.time() - start_time < duration:
        t_now = time.time() - start_time
        timestamp_str = datetime.now().strftime("%Y-%m-%d_%H-%M-%S-%f")[:-3]

        # --- Keithley voltage check (unchanged) ---
        try:
            voltage = float(keithley.query("print(smua.measure.v())").strip())
            if voltage > switch_threshold and not switch_logged:
                print(f"Sample switched at {t_now:.3f}s, V = {voltage:.2f} V")
                switch_writer.writerow([f"{t_now:.6f}", timestamp_str, f"{voltage:.3f}"])
                switch_times.append(t_now)
                switch_logged = True
                keithley.write("smua.source.output = smua.OUTPUT_OFF")
                print(f"Keithley OFF for cooldown ({cooldown_time}s)")
                time.sleep(cooldown_time)
                keithley.write("smua.source.output = smua.OUTPUT_ON")
                print("Keithley back ON")
            else:
                switch_logged = False
        except Exception as e:
            print(f"Voltage check error: {e}")

        # --- Trigger Detection & Waveform Acquisition ---
        scope.write("*CLS")
        scope.write("ARM")                 # arm for one shot
        triggered = False
        t_arm = time.time()

        # poll INR? for bit 0 until a trigger arrives or the total run time is over
        while time.time() - start_time < duration:
            inr = int(scope.query("INR?").strip())
            if inr & 0x1:                  # bit 0 => trigger
                triggered = True
                break
            time.sleep(poll_interval)

        if not triggered:
            # no trigger in this cycle → loop back and re‑arm
            continue

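        # got a trigger → download waveform
        # t_now from the top of the loop predates the wait for the trigger, so refresh both timestamps
        t_now = time.time() - start_time
        timestamp_str = datetime.now().strftime("%Y-%m-%d_%H-%M-%S-%f")[:-3]
        print(f"Trigger #{event_index + 1} at {t_now:.3f}s")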

        scope.write(f"{waveform_channel}:WF? DESC")
        desc_raw = scope.read_raw()
        hash_index = desc_raw.find(b"#")
        if hash_index == -1:
            print("Malformed DESC header")
            continue
        num_digits = int(desc_raw[hash_index + 1:hash_index + 2])
        num_bytes  = int(desc_raw[hash_index + 2:hash_index + 2 + num_digits])
        dstart     = hash_index + 2 + num_digits
        desc_clean = desc_raw[dstart:dstart + num_bytes]
        if len(desc_clean) < 346:
            print("Descriptor too short, skipping.")
            continue

        def get_float_from(buf, off, ln=4): return np.frombuffer(buf[off:off+ln], dtype=">f4")[0]
        def get_double_from(buf, off):    return np.frombuffer(buf[off:off+8], dtype=">f8")[0]
        def get_int_from(buf, off, ln):   return int.from_bytes(buf[off:off+ln], byteorder="big", signed=True)

        vert_gain      = get_float_from(desc_clean, 156)
        vert_offset    = get_float_from(desc_clean, 160)
        horiz_interval = get_float_from(desc_clean, 176)
        horiz_offset   = get_double_from(desc_clean, 180)
        first_valid    = get_int_from(desc_clean, 124, 4)
        last_valid     = get_int_from(desc_clean, 128, 4)
        num_points     = get_int_from(desc_clean, 116, 4)

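        scope.write(f"{waveform_channel}:WF? DAT1")
        data_raw = scope.read_raw()
        # Locate the '#<n><length>' block header instead of assuming a fixed 16-byte prefix
        dhash = data_raw.find(b"#")
        if dhash == -1:
            print("Malformed DAT1 header")
            continue
        dlen_digits = int(data_raw[dhash + 1:dhash + 2])
        dlen = int(data_raw[dhash + 2:dhash + 2 + dlen_digits])
        pstart = dhash + 2 + dlen_digits
        payload = data_raw[pstart:pstart + dlen]
        arr = np.frombuffer(payload, dtype=">i2")
        data_trimmed = arr[first_valid:last_valid+1]
        volts = data_trimmed * vert_gain - vert_offset
        # Sample i sits at i * horiz_interval + horiz_offset, counted from the start of the array
        times = (np.arange(len(volts)) + first_valid) * horiz_interval + horiz_offset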

        wf_filename = f"waveform_{timestamp_str}.csv"
        np.savetxt(
          os.path.join(save_path, wf_filename),
          np.column_stack((times, volts)),
          delimiter=",",
          header="Time [s], Voltage [V]",
          comments=''
        )

        trigger_writer.writerow([event_index + 1, f"{t_now:.6f}", timestamp_str, wf_filename])
        trigger_times.append(t_now)
        event_index += 1

        time.sleep(poll_interval)  # small pause before re‑arming

# -------------------- Cleanup --------------------
keithley.write("smua.source.leveli = 0")
keithley.write("smua.source.output = smua.OUTPUT_OFF")
print(scope.query("TRIG_MODE?").strip())
scope.write("DISP ON")
keithley.close()
scope.close()
print("Keithley source turned off safely.")
print(f"Finished. {event_index} triggers and {len(switch_times)} switching events logged.")

# -------------------- Plotting Trigger Events --------------------
x = [i * poll_interval for i in range(int(duration / poll_interval))]
y = [0] * len(x)
for t in trigger_times:
    idx = int(t / poll_interval)
    if 0 <= idx < len(y):
        y[idx] = 1

plt.figure(figsize=(10, 4))
plt.step(x, y, where='post', label='Trigger Event')
plt.xlabel("Time since start [s]")
plt.ylabel("Signal (0 or 1)")
plt.title("Trigger Events Over Time")
plt.ylim(-0.2, 1.2)
plt.grid(True)
plt.tight_layout()
plot_filename = os.path.join(save_path, "trigger_plot.png")
plt.savefig(plot_filename, dpi=300)
print(f"Trigger plot saved to: {plot_filename}")

# -------------------- Plotting All Captured Waveforms --------------------
waveform_files = sorted(
    [f for f in os.listdir(save_path) if f.startswith("waveform_") and f.endswith(".csv")]
)
if waveform_files:
    plt.figure(figsize=(10, 4))
    for fname in waveform_files:
        data = np.loadtxt(os.path.join(save_path, fname), delimiter=",", skiprows=1)
        plt.plot(data[:,0]*1e6, data[:,1], alpha=0.5)

    plt.xlabel("Time [µs]")
    plt.ylabel("Voltage [V]")
    plt.title("Overlay of Triggered Waveforms")
    plt.grid(True)
    plt.tight_layout()
    overlay_path = os.path.join(save_path, "waveform_overlay.png")
    plt.savefig(overlay_path, dpi=300)
    print(f"Overlay plot saved to: {overlay_path}")
else:
    print("No waveform CSV files found to plot.")
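
Both WF? reads in the script return an IEEE 488.2 definite-length block of the form #<n><length><payload>. A minimal sketch of one helper that could serve both reads is shown here; `parse_block` is a hypothetical name, and the example bytes are synthetic (built by hand, not captured from the scope), with the "DAT1," prefix being an assumption about what may precede the block.

```python
import numpy as np

def parse_block(raw: bytes) -> bytes:
    """Return the payload of a definite-length block: '#', digit count, length, data."""
    start = raw.find(b"#")
    if start == -1:
        raise ValueError("no '#' block header found")
    n_digits = int(raw[start + 1:start + 2])              # how many digits the length field has
    length = int(raw[start + 2:start + 2 + n_digits])     # payload size in bytes
    data_start = start + 2 + n_digits
    payload = raw[data_start:data_start + length]
    if len(payload) != length:
        raise ValueError("block is shorter than its header announces")
    return payload

# Synthetic response: assumed prefix, '#9' header, four big-endian 16-bit samples, terminator
samples = np.array([-2, 0, 2, 4], dtype=">i2")
raw = b"DAT1,#9" + b"%09d" % samples.nbytes + samples.tobytes() + b"\n"
decoded = np.frombuffer(parse_block(raw), dtype=">i2")
print(decoded.tolist())
```

Because the helper searches for "#" and trusts the announced length, it does not depend on how many header characters precede the block or on whether a terminator follows it.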

I removed some lines for safety reasons, but the connection works and the scope responds to my commands. To test my setup, I connected a wave generator to the scope, sending a 25 mV pulse every 10 seconds. So in theory, if I set my trigger level to 20 mV and the duration to 40 seconds, I should get 4 signals (give or take 1, depending on when I start the script). But the script only gives me 1 or at most 2 signals, and sometimes none at all. When I do get signals, they look as expected.
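
The expected count in this test can be written down as a small calculation. This is a minimal sketch, assuming strictly periodic pulses and a half-open time window [0, duration); `expected_pulses` is a hypothetical helper, and `phase` stands for the unknown delay between the start of the script and the first pulse.

```python
def expected_pulses(duration: float, period: float, phase: float) -> int:
    """Count pulses at phase, phase + period, ... that fall inside [0, duration)."""
    count = 0
    t = phase
    while t < duration:
        count += 1
        t += period
    return count

# Values from the test setup: 40 s run, one pulse every 10 s
for phase in (0.0, 2.5, 9.9):
    print(phase, expected_pulses(40, 10, phase))
```

Under these assumptions every phase below 10 s gives 4 pulses, so a result of 1 or 2 means pulses arrive while the script is not waiting for them or are not reported to it.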

Could someone help me locate the error in my logic and, if possible, tell me how to fix it? I have been working on this code for several weeks with the help of several people, but it still does not work.
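
One way to narrow this down without the hardware is to run the polling logic against a stand-in for the scope. The sketch below is an assumption-laden illustration, not the script's code: `wait_for_trigger` is a hypothetical helper, the lambda replaces `scope.query("INR?")`, and the fake responses (0, 8192, 8193) are made up so that bit 0 is set on the third poll.

```python
import time

def wait_for_trigger(query_inr, deadline, poll_interval=0.001):
    """Poll until bit 0 of the INR value is set or time.monotonic() passes `deadline`."""
    while time.monotonic() < deadline:
        if int(query_inr()) & 0x1:      # bit 0 set: a new acquisition is available
            return True
        time.sleep(poll_interval)
    return False

# Stand-in for the scope: bit 0 appears on the third poll
responses = iter(["0", "8192", "8193"])
fired = wait_for_trigger(lambda: next(responses), deadline=time.monotonic() + 1.0)

# Stand-in that never triggers: the helper should give up at the deadline
timed_out = wait_for_trigger(lambda: "0", deadline=time.monotonic() + 0.01)
print(fired, timed_out)
```

With the real instrument the first argument would be something like `lambda: scope.query("INR?")`, and the deadline would be the end of the whole run expressed on the same clock (`time.monotonic()` here, whereas the script uses `time.time()`).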

Any help is appreciated.

Greetings from Zürich, Switzerland.
