Thursday, September 8, 2022

Python program for controlling NI-cDAQ device without LABView installation

  1.  Requires NI-MAX driver more than 17.0
  2. Lots of dependencies for nidaqmx python wrapper for the C API given by NI
  3. Plotting is using Matplotlib
  4. Use pip download option to download all the required dependencies as whl files on the computer with internet and then transfer the folder with whl files to the DAQ computer that is without internet
  5. Use pip install --find-links --no-index option to install the nidaqmx on the DAQ computer without internet
  6. The python program below generates signals used to excite the power amplifiers.Force from power amplifiers is measured via FX293 load cells via NI9205 input card.
  7. Generation and acquisition tasks must be committed before being used. Other wise there is big lag between start of generation task and start of acquisition tas
  8. See if you can use same clock source for both generation and acquisition to ensure synchronous use
  9. Trigger can be used on output task. Output tasks trigger is given by input task. input task should be committed before taking trigger from it.
  10. If you are using a task in a loop, always close the task for each loop use. Other wise the task will not read new data.
  11. Program below:
import nidaqmx
import numpy as np
import matplotlib.pyplot as plt
import time
import pdb
print("===========================================");
print("Revised = Balaji Sankar, 07 September 2022")
print("Revised = Balaji Sankar, 08 September 2022: trigger AO and AI simulatneously")
print("Original = Balaji Sankar, 18 September 2018")
print("Code to interact with CDAQ Analog input 9205 and output 9264 cards")
print("To generate signal for actuator excitation and to acquire data from load cell")
print("Input card 9205 is in slot 1 of cDAQ9189 chassis")
print("Output card 9264 is in slot 2 of cDAQ9189 chassis")
print("Using cDAQ 9189 chassis with serial number 01C8D546")
print("Host name cDAQ9189-1C8D546")
print("Check test panel of MAX to get the channel and chassis details.")
print(" ");
print("Always ensure computer ipV4 setting is in DHCP mode for auto assigning of IP address to CDAQ. Other wise you get error 201252 that there is no route to device or network adapter not fournd. I wasted 2 days full on googling this issue.")
print(" ");
print(" Virtual channel = Physical channel + Terminal connection + Scaling");
print(" Task = Virtual channel + Timing + Triggering");
print("===========================================");
def query_devices():
local_system = nidaqmx.system.System.local()
driver_version = local_system.driver_version
print('Version information = DAQmx {0}.{1}.{2}'.format(driver_version.major_version, driver_version.minor_version,
driver_version.update_version))
for device in local_system.devices:
print('Device Name: {0}, Product Category: {1}, Product Type: {2}'.format(
device.name, device.product_category, device.product_type))
# Setup
#--------
# Number of steps in the generated staircase given as excitation signal to the bottom amplifier.
bcvSigSeq = np.append(np.linspace(0,5,10),0);
bcvSigSeq = np.linspace(0,5,10);
# second and third row supply the 5 volt input supply for load cell
# DAQ output current nominal = 4mA
# Load cell current flow = 3mA
outputData = np.array([bcvSigSeq,5*np.ones(bcvSigSeq.shape),5*np.ones(bcvSigSeq.shape)]);
outputData[1,-1] = 0;
outputData[2,-1] = 0;
numStepsInGenSignal = outputData.shape[1];
samplingFrequency_GEN = 1;#Hz
samplingFrequency_ACQ = 10#Hz
acquisitionRunTime = numStepsInGenSignal*1/samplingFrequency_GEN;
samplesPerChannel_ACQ = int(samplingFrequency_ACQ*acquisitionRunTime);
#0 = Time
#1 = BCV Excitation signal loop back
#2 = Load cell 1
#3 = Load cell 2
#4 = 5V supply to load cell?
data = np.zeros((4,samplesPerChannel_ACQ))# Declare buffer array
data[0] = np.linspace(0,acquisitionRunTime-(1/samplingFrequency_ACQ) ,samplesPerChannel_ACQ) # Time
# Create output tasks
try:
print('Closing pending tasks')
inputTask.stop()
outputTask.stop()
inputTask.close()
outputTask.close()
print('Closed pending tasks')
except NameError:
print('Tasks are already closed.')
except:
print('Escapting errors like this should not be done.')
outputTask = nidaqmx.task.Task('balajiOutputTask');
outputTask.ao_channels.add_ao_voltage_chan("cDAQ9189-1C8D546Mod2/ao0")
outputTask.ao_channels.add_ao_voltage_chan("cDAQ9189-1C8D546Mod2/ao3")
outputTask.ao_channels.add_ao_voltage_chan("cDAQ9189-1C8D546Mod2/ao4")
# If rate =1, each sample of the data array will last for 1 second.
# If samps_per_chan is 10 and input data given to DAQ has 2 samples, the input data will be repeated 5 times.
# Then the pulse train will last for 10 seconds.
# If samps_per_chan is 5 and input data given to DAQ has 7 samples, only the first 5 samples of input data will be written.
outputTask.timing.cfg_samp_clk_timing(rate=samplingFrequency_GEN,\
samps_per_chan=numStepsInGenSignal,\
sample_mode=nidaqmx.constants.AcquisitionType.FINITE);
# Create input task
inputTask = nidaqmx.task.Task('balajiInputTask');
# Creating input channel
inputTask.ai_channels.add_ai_voltage_chan("cDAQ9189-1C8D546Mod1/ai0",\
name_to_assign_to_channel='Generated signal loop back', max_val=5, min_val=0,\
terminal_config=nidaqmx.constants.TerminalConfiguration.RSE)
inputTask.ai_channels.add_ai_voltage_chan("cDAQ9189-1C8D546Mod1/ai1",\
name_to_assign_to_channel='Load cell 1', max_val=5, min_val=0,\
terminal_config=nidaqmx.constants.TerminalConfiguration.RSE)
inputTask.ai_channels.add_ai_voltage_chan("cDAQ9189-1C8D546Mod1/ai2",\
name_to_assign_to_channel='Load cell 2', max_val=5, min_val=0,\
terminal_config=nidaqmx.constants.TerminalConfiguration.RSE)
#inputTask.ai_channels.add_ai_voltage_chan("cDAQ9189-1C8D546Mod1/ai3",\
#name_to_assign_to_channel='Supply 5V to load cell ', max_val=6, min_val=0,\
#terminal_config=nidaqmx.constants.TerminalConfiguration.RSE)
# Timing configureation
inputTask.timing.cfg_samp_clk_timing(rate=samplingFrequency_ACQ,\
sample_mode=nidaqmx.constants.AcquisitionType.FINITE,\
samps_per_chan=samplesPerChannel_ACQ)
#pdb.set_trace()
print(" ")
print("=======================")
print("Signal details:")
print("BCV start = {} V".format(outputData[0,0]))
print("BCV first value = {} V".format(outputData[0,1]))
print("BCV penultimate = {} V".format(outputData[0,-2]))
print("BCV end = {} V".format(outputData[0,-1]))
print("Acquisition run time = {} s".format(acquisitionRunTime))
print("Acquisition samples = {} nos".format(samplesPerChannel_ACQ))
print("Sample frequency ACQ = {} kHz".format(samplingFrequency_ACQ/1e3))
print(" ")
print("outputData = ")
print(outputData)
print(" ")
# Start writing
#pdb.set_trace()
# Each row of data is one channel.
# From page 51 (55 of 303) in nidaqmx-python.pdf
#outputTask.out_stream.relative_to = nidaqmx.constants.WriteRelativeTo.FIRST_SAMPLE
# From page 51 (55 of 303) in nidaqmx-python.pdf
#nidaqmx.constants.WriteRelativeTo
# From https://github.com/fhchl
# Commiting both the tasks reduces lag between read and write
# Before this, there is 1.3 second lag between input and output TASK.
# After only committing , even without trigger modifications, the lag is unnoticeable at 10s/s. It seems to be 5ms from online posts
# In addition, to this also configure start trigger of write task to take the trigger from the read task.
inputTask.control(nidaqmx.constants.TaskMode.TASK_COMMIT)
#After committing the input task, the inputTask.triggers.start_trigger.term property becomes
#available. Use this as the trigger setting for outputTask.
outputTask.triggers.start_trigger.cfg_dig_edge_start_trig(inputTask.triggers.start_trigger.term);
# Not sure committing output task helps.
outputTask.control(nidaqmx.constants.TaskMode.TASK_COMMIT)
outputTask.write(data=outputData,auto_start=False)
# Now you have to start the output task as auto start is set to false. It will start and wait for trigger from input task.
outputTask.start();
# outputTask.wait_until_done()
# Do not use wait until done. It pauses execution.
# So input task starts only after output tasks gets over. THere is no parallel execution.
# Start acquiring immediately after output task start.
data[1:4] = inputTask.read(number_of_samples_per_channel=nidaqmx.constants.READ_ALL_AVAILABLE, timeout=20.0);
inputTask.wait_until_done()
t = time.localtime();
prepend = 'straight'
# Write data to csv file for post processing.
fileName = './output/'+prepend +'loadCellData_t_{}_{}_{}_{}_{}_{}.csv'.format(t.tm_year,t.tm_mon,t.tm_mday,t.tm_hour,t.tm_min,t.tm_sec)
np.savetxt(fileName,data.transpose(),delimiter=',',)
# Plot the main signals
plt.figure(figsize=(10, 16))
plt.subplot(311)
plt.plot(data[0],data[1],'-o')
plt.axis([0 , np.max(data[0]), -0.1, 6])
plt.ylabel('BCV Signal V'.format())
plt.title('BCV loop back Smpl={:4.1f} Hz'.format(samplingFrequency_ACQ))
plt.subplot(312)
plt.plot(data[0],data[2],'-o')
plt.axis([0 , np.max(data[0]), -0.1, 3])
plt.ylabel('Load cell signal 1'.format())
plt.title('Load cell V 1 Smpl={:4.1f} Hz'.format(samplingFrequency_ACQ))
plt.subplot(313)
plt.plot(data[0],data[3],'-o')
plt.axis([0 , np.max(data[0]), -0.1, 3])
plt.ylabel('Load cell signal 2'.format())
plt.title('Load cell V 2 Smpl={:4.1f} Hz'.format(samplingFrequency_ACQ))
fileName = './output/'+prepend +'plot_t_{}_{}_{}_{}_{}_{}.png'.format(t.tm_year,t.tm_mon,t.tm_mday,t.tm_hour,t.tm_min,t.tm_sec)
plt.savefig(fileName, bbox_inches='tight')
#plt.show()
plt.close();
# Stop the tasks to make the next loop execution start afresh.
# Otherwise, you get the error
#DaqWriteError: Attempted to write a sample beyond the final sample generated. The generation has stopped, therefore the sample specified by the combination of position and offset w
#ill never be available.
#
#Specify a position and offset which selects a sample up to, but not beyond, the final sample generated. The final sample generated can be determined by querying the total samples g
#enerated after a generation has stopped.
#Attempted to Write Sample: 4
#Property: DAQmx_Write_RelativeTo
#Corresponding Value: DAQmx_Val_FirstSample
#Property: DAQmx_Write_Offset
#Corresponding Value: 0
inputTask.stop()
outputTask.stop()
# Close and cleanup
inputTask.close()
outputTask.close()
if __name__ == "__main__":
query_devices()

No comments:

Post a Comment