Sunday, October 16, 2022

DynCont#9 Simple Looped AI and AO using PXIe, nidaqmx, Python, Labview, NIMAX

  1. It took 3 days of poking and trial and error to get this Python program working. It can only be run with hardware: a PXIe system with an NI 6356 card, which has 8 differential analog inputs and 2 analog outputs.
  2. No, you cannot use the 8 differential inputs as 16 ground-referenced single-ended channels. Pay more for that to Hungary.
  3. LabVIEW is not essential.
  4. NI MAX drivers are required for configuring connections and for checking the hardware using test panels.
  5. The nidaqmx Python library is used instead of LabVIEW.
  6. LABVIEW is a mind melting mixed veg noodles, that is 2 days old.
  7. This program is a simple looped AI and AO 
  8. It is not yet software timed. To make a software-timed loop, use a while loop inside the outer loop to check whether the control loop update time has elapsed, and only then continue with the next outer loop pass.
  9. In a software-timed loop, if one outer loop pass takes more time than the control update time, increase the control update time.
  10. In a hardware-timed loop, use callbacks from the input task, triggered after samples are read from the buffer to the computer.
  11. The program below samples 4 analog input channels: TCV, BCV, Excitation and Eddy. It writes 2 outputs, TCV and BCV, which are also looped back into the analog inputs.
  12. There is no hardware or software timing, and no processing of the sampled values to calculate the outputs. It only shows how fast a simple loop runs, which is the bare minimum control update possible. It uses stream readers and stream writers, a shared clock between analog in and analog out, and a trigger from analog in to start analog out.
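Before the full program, here is a rough sketch of the software-timed loop described in items 8 and 9. It needs no hardware. The names `run_software_timed_loop`, `step` and `update_period_s` are made up for this illustration and are not part of nidaqmx or of the program below; `step` stands in for one write/read pass.

```python
import time


def run_software_timed_loop(step, update_period_s, n_loops):
    """Call step(i) once per pass and wait out the rest of the update period.

    Returns the time stamp of the end of every pass (seconds from start)
    and the number of passes where step(i) alone exceeded the period.
    """
    timestamps = []
    overruns = 0
    start = time.perf_counter()
    for i in range(n_loops):
        pass_start = time.perf_counter()
        step(i)  # one write/read pass would go here
        if time.perf_counter() - pass_start > update_period_s:
            # The pass took longer than the control update time (item 9).
            overruns += 1
        # Inner while loop: spin until the update time has elapsed (item 8).
        while time.perf_counter() - pass_start < update_period_s:
            pass
        timestamps.append(time.perf_counter() - start)
    return timestamps, overruns


# Dummy pass that does nothing, 1 ms update time, 20 passes.
timestamps, overruns = run_software_timed_loop(lambda i: None, 1e-3, 20)
print("Total time = {:.3f} ms, overruns = {}".format(timestamps[-1] * 1e3, overruns))
```

If `overruns` is not zero for the real write/read pass, the control update time is too short for this computer and should be increased. The busy wait keeps one CPU core fully loaded, which is the price of not sleeping.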
from nidaqmx import stream_readers
from nidaqmx import stream_writers
import nidaqmx
import numpy as np
import matplotlib.pyplot as plt
import time
import pdb
print("="*80);
print("Revised = Balaji Sankar, 15 October 2022: Attempting code using logic from Python_DAQmx_examples-master")
print("Code to interact with PXIe 6356")
print("Check test panel of MAX to get the channel and chassis details.")
print(" ");
print("Always ensure the computer IPv4 setting is in DHCP mode for auto assigning of IP address to CDAQ. Otherwise you get error 201252 that there is no route to device or network adapter not found. I wasted 2 full days googling this issue.")
print(" ");
print("Virtual channel = Physical channel + Terminal connection + Scaling");
print("Task = Virtual channel + Timing + Triggering");
print("="*80);
# Setup
#--------
# Take more samples at a time so i can average it
numberOfInputChannels = 4;
numberOfOutputChannels = 2;
outputData_4streamWriter= np.array([1,1], dtype=np.float64)
inputData_4streamReader = np.array([0,0,0,0], dtype=np.float64)
Fs_control = 5e3;#Hz
Fs_sample = Fs_control;
acquisitionRunTimePerLoop = 1/Fs_sample;
nLoops = 20;
experimentRunTime = acquisitionRunTimePerLoop*nLoops;
data = np.zeros((1+numberOfInputChannels,nLoops), dtype=np.float64)# Declare buffer array, one more than the channels for time.
data[0] = np.linspace(0,experimentRunTime-(1/Fs_sample) ,nLoops) # Time
# Create output tasks
try:
    print('Closing pending tasks')
    inputTask.stop()
    outputTask.stop()
    inputTask.close()
    outputTask.close()
    print('Closed pending tasks')
except NameError:
    print('Tasks are already closed.')
except Exception:
    print('Escaping errors like this should not be done.')
outputTask = nidaqmx.task.Task('balajiOutputTask');
outputTask.ao_channels.add_ao_voltage_chan("PXI1slot5/ao0")
outputTask.ao_channels.add_ao_voltage_chan("PXI1slot5/ao1")
# If rate =1, each sample of the data array will last for 1 second.
# If samps_per_chan is 10 and input data given to DAQ has 2 samples, the input data will be repeated 5 times.
# Check the regeneration mode property for this.
# Then the pulse train will last for 10 seconds.
# If samps_per_chan is 5 and input data given to DAQ has 7 samples, only the first 5 samples of input data will be written.
outputTask.timing.cfg_samp_clk_timing(rate=Fs_control)
#samps_per_chan=samplesPerOutputChannelPerPass,\
#sample_mode=nidaqmx.constants.AcquisitionType.FINITE);
# Create input task
inputTask = nidaqmx.task.Task('balajiInputTask');
# Creating input channel
inputTask.ai_channels.add_ai_voltage_chan("PXI1slot5/ai0",\
name_to_assign_to_channel='TCV', max_val=5, min_val=0,\
terminal_config=nidaqmx.constants.TerminalConfiguration.DIFF)
inputTask.ai_channels.add_ai_voltage_chan("PXI1slot5/ai1",\
name_to_assign_to_channel='BCV', max_val=5, min_val=0,\
terminal_config=nidaqmx.constants.TerminalConfiguration.DIFF)
inputTask.ai_channels.add_ai_voltage_chan("PXI1slot5/ai2",\
name_to_assign_to_channel='EXC', max_val=5, min_val=0,\
terminal_config=nidaqmx.constants.TerminalConfiguration.DIFF)
inputTask.ai_channels.add_ai_voltage_chan("PXI1slot5/ai3",\
name_to_assign_to_channel='EDD', max_val=0, min_val=-10,\
terminal_config=nidaqmx.constants.TerminalConfiguration.DIFF)
# Timing configuration of input , Using the clock of output task in the input.
print('Input task clock is given from output task');
inputTask.timing.cfg_samp_clk_timing(rate=Fs_sample,\
source= '/PXI1Slot5/ao/SampleClock',\
sample_mode=nidaqmx.constants.AcquisitionType.CONTINUOUS);
print('writer.write must start before reader.read. Only then reader.read will get clock signal.')
#If I give FINITE, there is a warning that the task might have stopped before all values are acquired. In CONTINUOUS, whatever is in the buffer is read.
#pdb.set_trace()
print(" ")
print("=======================")
print("Signal details:")
print("Acquisition run time = {} mu s".format(acquisitionRunTimePerLoop*1e6))
print("Experiment run time = {} m s".format(experimentRunTime*1e3))
print("Control frequency GEN = {} kHz".format(Fs_control/1e3))
print("Sample frequency ACQ = {} kHz".format(Fs_sample/1e3))
print(" ")
# Start writing
# pdb.set_trace()
# Each row of data is one channel.
# From https://github.com/fhchl
# Committing both the tasks reduces lag between read and write.
# Before this, there was a 1.3 second lag between the input and output tasks.
# After only committing, even without trigger modifications, the lag is unnoticeable at 10 S/s. It seems to be 5 ms, going by online posts.
# In addition to this, also configure the start trigger of the write task to take the trigger from the read task.
inputTask.control(nidaqmx.constants.TaskMode.TASK_COMMIT)
#After committing the input task, the inputTask.triggers.start_trigger.term property becomes
#available. Use this as the trigger setting for outputTask.
# outputTask.triggers.start_trigger.cfg_dig_edge_start_trig(inputTask.triggers.start_trigger.term);
# Not sure committing output task helps.
outputTask.control(nidaqmx.constants.TaskMode.TASK_COMMIT)
# * Regeneration means the buffer contents play repeatedly (cyclically). It is NOT allowed here, so every output sample has to be freshly written.
# http://zone.ni.com/reference/en-XX/help/370471AE-01/mxcprop/attr1453/
# For more on DAQmx write properties: http://zone.ni.com/reference/en-XX/help/370469AG-01/daqmxprop/daqmxwrite/
# For a discussion on regeneration mode in the context of analog output tasks see:
# https://forums.ni.com/t5/Multifunction-DAQ/Continuous-write-analog-voltage-NI-cDAQ-9178-with-callbacks/td-p/4036271
outputTask.out_stream.regen_mode = nidaqmx.constants.RegenerationMode.DONT_ALLOW_REGENERATION
# Allowing regeneration would hide any error in our output channel, because old samples get replayed when new ones are not written in time.
print('\n BS Start trigger of output task linked to input start trigger.\n Not sure if this helps. As clock of input is from output task.')
outputTask.triggers.start_trigger.cfg_dig_edge_start_trig( '/PXI1Slot5/ai/StartTrigger' )
print(' Note that now the AO task must be started before the AI task in order for the synchronisation to work');
print(' BS Setup complete');
# Get the streams for the tasks.
reader = stream_readers.AnalogMultiChannelReader(inputTask.in_stream)
writer = stream_writers.AnalogMultiChannelWriter(outputTask.out_stream)
# Now you have to start the output task as auto start is set to false. It will start and wait for trigger from input task.
outputTask.start();
inputTask.start();
# outputTask.wait_until_done()
# Do not use wait until done. It pauses execution.
# So the input task starts only after the output task is over. There is no parallel execution.
print('Loop starts..')
# time.perf_counter() is used for time stamps, as time.clock() was removed in Python 3.8.
for i in np.arange(0,nLoops):
    writer.write_one_sample(i/nLoops*outputData_4streamWriter, timeout=1)
    reader.read_one_sample(inputData_4streamReader,timeout=1);
    data[0,i] = time.perf_counter();
    data[1:,i] = inputData_4streamReader;
# Start time from 0
data[0,:] = data[0,:] - np.min(data[0,:])
print('Loop stopped.')
print('Remember the output is triggered after the input is read, by the input trigger link')
print('='*80);
# Close and cleanup
inputTask.stop()
outputTask.stop()
inputTask.close()
outputTask.close()
# Post process.
t = time.localtime();
prepend = 'straight'
# Write data to csv file for post processing.
fileName = './output/'+prepend +'PID_t_{}_{}_{}_{}_{}_{}.csv'.format(t.tm_year,t.tm_mon,t.tm_mday,t.tm_hour,t.tm_min,t.tm_sec)
np.savetxt(fileName,data.transpose(),delimiter=',',)
# Plot the main signals
plt.figure(figsize=(10, 16))
plt.subplot(311)
#plt.plot(data[0],data[1],'-o')
plt.plot(data[1],'-o')
#plt.axis([np.min(data[0]) , np.max(data[0]), -0.1, 5])
plt.ylabel('TCV Signal V'.format())
plt.title('TCV loop back Fs={:4.1f} Hz'.format(Fs_sample))
plt.subplot(312)
plt.plot(data[0],data[2],'-o')
plt.axis([np.min(data[0]) , np.max(data[0]), -0.1, 5])
plt.ylabel('BCV Signal V'.format())
#plt.title('BCV loop back Sample rate={:4.1f} Hz'.format(Fs_sample))
plt.subplot(313)
plt.plot(data[0],data[3],'-o')
plt.axis([np.min(data[0]) , np.max(data[0]), -0.1, 5])
plt.ylabel('EXC Signal V'.format())
#plt.title('Exc Sample rate={:4.1f} Hz'.format(Fs_sample))
fileName = './output/'+prepend +'plot_t_{}_{}_{}_{}_{}_{}.png'.format(t.tm_year,t.tm_mon,t.tm_mday,t.tm_hour,t.tm_min,t.tm_sec)
#plt.savefig(fileName, bbox_inches='tight')
plt.show()
#plt.close();
def query_devices():
    local_system = nidaqmx.system.System.local()
    driver_version = local_system.driver_version
    print('Version information = DAQmx {0}.{1}.{2}'.format(driver_version.major_version, driver_version.minor_version,
                                                          driver_version.update_version))
    for device in local_system.devices:
        print('Device Name: {0}, Product Category: {1}, Product Type: {2}'.format(
            device.name, device.product_category, device.product_type))
if __name__ == "__main__":
    query_devices()
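
Item 10 in the list at the top mentions callbacks from the input task for a hardware-timed loop. Below is a rough, untested-on-hardware sketch of how that could be wired up. It assumes the nidaqmx `Task.register_every_n_samples_acquired_into_buffer_event` method with its four-argument callback, `Task.read` and the stream writer's `write_one_sample`. The names `attach_control_callback`, `control_law` and `samples_per_update` are hypothetical and only for illustration.

```python
import numpy as np


def attach_control_callback(input_task, writer, samples_per_update, control_law):
    """Register a callback that runs one control update per block of samples.

    input_task  : a configured nidaqmx input Task (continuous acquisition assumed)
    writer      : an AnalogMultiChannelWriter on the output task
    control_law : hypothetical function mapping a (channels, samples) array
                  to one output value per output channel
    """
    def on_samples(task_handle, event_type, number_of_samples, callback_data):
        # Read exactly the block that triggered this callback.
        block = np.asarray(
            input_task.read(number_of_samples_per_channel=number_of_samples),
            dtype=np.float64)
        outputs = np.asarray(control_law(block), dtype=np.float64)
        writer.write_one_sample(outputs)
        return 0  # DAQmx expects an integer status from the callback

    input_task.register_every_n_samples_acquired_into_buffer_event(
        samples_per_update, on_samples)
    # Return the callback so the caller can keep a reference to it.
    return on_samples
```

With the tasks from the program above, a call could look like `attach_control_callback(inputTask, writer, 1, my_law)` before `inputTask.start()`, where `my_law` is your own function. The callback is called by the driver and not by the main loop, so the main program only has to stay alive while the tasks run.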
