I found the bug, figured out error handling, and will now post a solution for long-scans for others (simplified). It turns out
self.sampler0.set_gain_mu(7-i, 0) almost always throws overflow errors on long scans. Much better to pass a counter to it. It also turns out that (at least for four channels on the sampler running continuously/arbitrary number of samples that adding in a samples/5 * 30us seemed to be about right. This is tested up for 30000 samples taken at 10kHz running 500 times all within a kernel.
num = samples/5 +1
for n in range(samples):
delay(30usnum) #seems about right from testing to
`
from artiq.experiment import *
import numpy as np
from basic_func1 import Basic_Task
import time
from function_library2 import Sampler_Multiple_Samples
from function_library2 import Zotino_Voltage
from function_library2 import DDSControl
from function_library2 import Sample_Set_Voltage
class Controls(EnvExperiment):
"""Controls """
def build(self):
self.setattr_device("core")
self.delay = 100*us
self.setattr_argument("integration", NumberValue(default=1*ms,
unit = "ms", step=.5))
self.sampler = Sampler_Multiple_Samples(self)
self.sampler.build()
self.dac = Zotino_Voltage(self)
self.dac.build()
self.measure = Sample_Set_Voltage(self)
self.measure.build()
# self.setattr_argument("Laser_V_Scan",
# Scannable(RangeScan(10, 20, 6, randomize=False),unit="V"))
self.setattr_argument("Laser_V_Scan",
Scannable(default=[ RangeScan(.5, 2.5, 51, randomize=False),
CenterScan(1, 1.5,.005, randomize=False),
NoScan(15,1),
],unit="V"))
def process_data(self,chs,channel_names,counter):
'Take mean and standard deviation of data'
#Takes the sampled data and processes it
sz = len(chs)
# print('sz is ' + str(sz))
# mean_data = np.full(sz, np.nan)
# std_data = np.full(sz, np.nan)
l=0
j=0
for ch in chs:
# print(ch)
dat = np.array(self.measure.get_dataset(ch))
# print(dat)
mean= np.mean(dat)
# mean_data[j] = mean
sigma = np.std(dat)
# std_data[j] = sigma
self.mutate_dataset(channel_names[l], counter, mean)
self.mutate_dataset(channel_names[l+1], counter, sigma)
j=j+1
l=l+2
@kernel
def run(self):
# l=1
self.core.reset()
self.core.break_realtime()
desired_samples = int( self.integration / self.sampler.delay)+1
print('desired samples')
print(desired_samples)
self.set_dataset("Laser_V_Scan", self.Laser_V_Scan.sequence,
broadcast=True, persist=False, archive=True)
self.set_dataset("integration_time", self.integration, persist=True)
timestamps_mu = 'Time (Machine Units)'
channel_names = ["Temperature_R",'Temperature_R_Sig',
"Laser_Power_V", "Laser_Power_V_Sig",
"Ref_Spectrum_V", "Spectrum_V",
"Ref_Spectrum_Sig_V", "Spectrum_Sig_V"]
channel_names_in = ["sampler0_chan5", "sampler0_chan6",
"sampler0_chan7", "sampler0_chan8"]
chan_data = np.full(self.Laser_V_Scan.npoints, np.nan)
for ch in channel_names:
self.set_dataset(ch, chan_data, broadcast=True,persist=True)
old_time = self.core.get_rtio_counter_mu()
counter = 0
for V in self.Laser_V_Scan.sequence:
print(V)
self.measure.take_data(samples=desired_samples
,channel=0, voltage=V
,counter=counter)
self.process_data(channel_names_in,channel_names,counter)
counter = counter + 1
current_time = self.core.get_rtio_counter_mu()
time_elapsed = self.core.mu_to_seconds(current_time-old_time)
print('done')
print('time elapsed is')
print(time_elapsed)
`
`
"""helper functions in function_library2"""
class Zotino_Voltage(HasEnvironment):
"""Zotino: Ramp Generator"""
def build(self):
self.setattr_device("core")
self.setattr_device("zotino0")
self.delay= 100*us # on the scope it looks like it takes 8us to switch
@kernel
def write_voltage(self,channel,voltage):
# self.core.reset()
self.core.break_realtime()
self.zotino0.init()
delay(self.delay)
while True:
try:
self.zotino0.write_dac(channel, voltage)
self.zotino0.load()
delay(self.delay)
break
except RTIOUnderflow:
delay(self.delay)
class Sample_Set_Voltage(HasEnvironment):
"""Sample 4 channels and set voltages"""
def build(self):
self.delay= 100*us
self.setattr_device("core")
# self.setattr_device("sampler0")
# self.setattr_device("zotino0")
self.dac = Zotino_Voltage(self)
self.dac.build()
self.adc = Sampler_Multiple_Samples(self)
self.adc.build()
# self.zotino0.init()
# self.sampler0.init()
@kernel
def take_data(self,samples,channel,voltage,counter):
self.core.break_realtime()
# self.sampler0.init()
delay(self.delay)
#%% Take Data
self.core.break_realtime()
self.dac.write_voltage(channel,voltage)
delay(self.delay)
self.adc.take_data(samples,counter)
class Sampler_Multiple_Samples(HasEnvironment):
"""Sampler V3: all channels with timestamps"""
def build(self):
self.delay= 100*us
self.setattr_device("core")
self.setattr_device("sampler0")
@kernel
def take_data(self,samples,counter):
self.sampler0.init()
#%% Prep Stuff
n_channels = 4
smp = [0.0]*n_channels
channel_names = channel_names = ["sampler0_chan5", "sampler0_chan6",
"sampler0_chan7", "sampler0_chan8"]
timestamps_mu = 'Time (Machine Units)'
for i in range(n_channels):
chan_data = np.full(samples, np.nan)
self.set_dataset(channel_names[i], chan_data, broadcast=True,
persist=False,unit = 'V')
# delay(self.delay)
if counter ==0:
self.sampler0.set_gain_mu(7-i, 0)
tstamp_data_mu = np.full(samples,self.core.get_rtio_counter_mu())
self.set_dataset(timestamps_mu, tstamp_data_mu, broadcast=True,
persist=False)
#%% Data Taking
num = samples/5 +1
for n in range(samples):
delay(30*us*num) #seems about right from testing to
#avoid RTIO errors on the sampler
while True:
try:
current_time = self.core.get_rtio_counter_mu()
self.sampler0.sample(smp) #runs sampler and saves to list
# current_time = self.core.get_rtio_counter_mu()
for i in range(n_channels):
self.mutate_dataset(channel_names[i], n, smp[i])
self.mutate_dataset(timestamps_mu, n, current_time)
# delay(self.delay)
break
except RTIOUnderflow:
delay(self.delay)
except RTIOOverflow:
delay(self.delay)
`