Hi All,

I am trying to document and understand the behavior of the Sampler and Zotino DAC functions. The idea is to be able to take data for long periods of time without errors (think of a clock operating for 1000 hours). We need to be able to measure, do something, measure again, and so on. The simplest version of this is: change a voltage, measure with the Sampler, and repeat. For now, sampling is turned off, because just changing a voltage a few hundred times causes what appear to me to be random slack errors.

Relevant functions/classes, simplified for readability:

`

from artiq.experiment import *
import numpy as np
from function_library2 import Sample_Set_Voltage
class Activation_Control(EnvExperiment):
    """Minimal reproduction: call ``take_data`` 400 times from one kernel."""

    def build(self):
        """Request the core device and create the measurement helper."""
        self.setattr_device("core")
        # ``run`` calls ``self.measure.take_data``; the simplified snippet
        # never created ``self.measure``.
        self.measure = Sample_Set_Voltage(self)

    @kernel
    def run(self):
        """Repeat the set-voltage/measure step 400 times, then print 'done'."""
        for i in range(400):
            # ``actual_samples`` and ``V`` were undefined in the simplified
            # snippet; the values below are the ones in the call quoted in
            # the traceback (samples=1, channel=0, voltage=0).
            self.measure.take_data(1, 0, voltage=0)
            # Tried at this point without any effect on the error:
            #   delay(25*us)
            #   self.core.wait_until_mu(now_mu())

        print('done')

`

In another file (function_library2.py, imported above):

`

 class Sample_Set_Voltage(HasEnvironment):
     """Sample 4 channels and set voltages.

     This is the version that raises the RTIO underflow quoted in the
     traceback; the order of the calls is kept as posted so that it still
     matches that traceback.
     """

     def build(self):
         """Request the core, Sampler, Zotino and DMA devices."""
         self.delay = 100*us
         self.setattr_device("core")
         self.setattr_device("sampler0")
         self.setattr_device("zotino0")
         self.setattr_device("core_dma")

     @kernel
     def take_data(self, samples, channel, voltage):
         """Write ``voltage`` to DAC ``channel`` ``samples`` times and log it.

         :param samples: number of loop iterations and length of every
             dataset created here.
         :param channel: Zotino channel passed to ``write_dac``.
         :param voltage: value passed to ``write_dac``.
         """
         # Both devices are re-initialised on every call.
         self.core.reset()
         self.zotino0.init()
         self.sampler0.init()
         self.core.break_realtime()
         delay(self.delay)

         # Prep stuff
         n_channels = 4
         smp = [0.0]*n_channels
         channel_names = ["sampler0_chan5", "sampler0_chan6",
                          "sampler0_chan7", "sampler0_chan8"]
         timestamps_mu = 'Time (Machine Units)'

         for i in range(n_channels):
             chan_data = np.full(samples, np.nan)
             self.set_dataset(channel_names[i], chan_data, broadcast=True)
             # NOTE(review): the traceback fails on this call. Presumably the
             # set_dataset call just before it is a round trip to the host
             # that lets wall-clock time overtake the timeline cursor, and
             # nothing restores the slack before this RTIO write -- confirm.
             self.sampler0.set_gain_mu(7-i, 0)

         tstamp_data_mu = np.full(samples, self.core.get_rtio_counter_mu())
         self.set_dataset(timestamps_mu, tstamp_data_mu, broadcast=True)

         # Take data
         # NOTE(review): the handle is never used below -- confirm it is needed.
         pulses_handle = self.core_dma.get_handle("ramp")
         self.core.break_realtime()
         for n in range(samples):
             # NOTE(review): no ``load()`` follows this write here, unlike
             # ``Zotino_Voltage.write_voltage`` -- confirm the output updates.
             self.zotino0.write_dac(channel, voltage)
             # Sampling is switched off for this test:
             #   self.sampler0.sample(smp)
             current_time = self.core.get_rtio_counter_mu()

             # With sampling off, ``smp`` still holds its initial zeros.
             for i in range(n_channels):
                 self.mutate_dataset(channel_names[i], n, smp[i])
             self.mutate_dataset(timestamps_mu, n, current_time)
             delay(self.delay)

`

![
](https://)

What is interesting here is that I am getting a Sampler underflow error even though `sampler0.sample()` is never called:

`
root:Terminating with exception (RTIOOverflow: RTIO underflow at 1031760933091536 mu, channel 0x000c:spi_sampler0_pgia, slack -18120712 mu)
Core Device Traceback:
Traceback (most recent call first):
File "libksupport/src/rtio_csr.rs", line 71, column 10, in (Rust function)
<unknown>
^
File "/home/grpartiq/ARTIQ/artiq-master/repository/Experiments/Activation_Control_v5.py", line 152, in ?? (RA=+0x160)
self.measure.take_data(samples=1,channel=0, voltage=0)
File "<artiq>/coredevice/spi2.py", line 228, in ... artiq.coredevice.spi2.SPIMaster.write<artiq.coredevice.spi2.SPIMaster>(...) (RA=+0x4c4)
rtio_output((self.channel << 8) | SPI_DATA_ADDR, data)
File "<artiq>/coredevice/sampler.py", line 98, in ... artiq.coredevice.sampler.Sampler.set_gain_mu<artiq.coredevice.sampler.Sampler>(...) (inlined)
self.bus_pgia.write(gains << 16)
File "/home/grpartiq/ARTIQ/artiq-master/repository/Experiments/function_library2.py", line 137, in ... function_library2.Sample_Set_Voltage.take_data<function_library2.Sample_Set_Voltage>(...) (inlined)
self.sampler0.set_gain_mu(7-i, 0)
artiq.coredevice.exceptions.RTIOOverflow(0): RTIO underflow at 1031760933091536 mu, channel 0x000c:spi_sampler0_pgia, slack -18120712 mu

End of Core Device Traceback
Traceback (most recent call last):
File "/nix/store/fcs9ly81kipbm1z012704kw01zbbfpfd-python3-3.12.9-env/lib/python3.12/site-packages/artiq/master/worker_impl.py", line 378, in main
exp_inst.run()
File "/nix/store/fcs9ly81kipbm1z012704kw01zbbfpfd-python3-3.12.9-env/lib/python3.12/site-packages/artiq/language/core.py", line 54, in run_on_core
return getattr(self, arg).run(run_on_core, ((self,) + k_args), k_kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/nix/store/fcs9ly81kipbm1z012704kw01zbbfpfd-python3-3.12.9-env/lib/python3.12/site-packages/artiq/coredevice/core.py", line 177, in run
self._run_compiled(kernel_library, embedding_map, symbolizer, demangler)
File "/nix/store/fcs9ly81kipbm1z012704kw01zbbfpfd-python3-3.12.9-env/lib/python3.12/site-packages/artiq/coredevice/core.py", line 166, in run_compiled
self.comm.serve(embedding_map, symbolizer, demangler)
File "/nix/store/fcs9ly81kipbm1z012704kw01zbbfpfd-python3-3.12.9-env/lib/python3.12/site-packages/artiq/coredevice/comm_kernel.py", line 739, in serve
self.
serve_exception(embedding_map, symbolizer, demangler)
File "/nix/store/fcs9ly81kipbm1z012704kw01zbbfpfd-python3-3.12.9-env/lib/python3.12/site-packages/artiq/coredevice/comm_kernel.py", line 721, in _serve_exception
raise python_exn
artiq.coredevice.exceptions.RTIOOverflow: RTIO underflow at 1031760933091536 mu, channel 0x000c:spi_sampler0_pgia, slack -18120712 mu

`

Note that this error does NOT occur when running `for i in range(100):` or something suitably smaller. How can I extend this range out to millions of points if needed? I tried different time delays and/or `self.core.wait_until_mu(now_mu())`, but these made no difference: the controller just jumps into a bad spot, and there seems to be no way to predict it. The slack was good over many cycles before suddenly failing.

I found the bug, figured out error handling, and will now post a (simplified) solution for long scans for others. It turns out that calling `self.sampler0.set_gain_mu(7-i, 0)` on every pass almost always throws RTIO underflow errors (reported as `RTIOOverflow` in the traceback above) on long scans. It is much better to pass a counter into `take_data` and only set the gains on the first pass (`counter == 0`). It also turns out (at least for four channels on the Sampler running continuously with an arbitrary number of samples) that adding a delay of `(samples/5 + 1) * 30us` before each sample seemed to be about right. This is tested for up to 30000 samples taken at 10 kHz, running 500 times, all within a kernel.

    num = samples/5 + 1
    for n in range(samples):
        delay(30*us*num)  # seems about right from testing to avoid RTIO errors on the sampler

`

from artiq.experiment import *
import numpy as np
from basic_func1 import Basic_Task
import time

 from function_library2 import Sampler_Multiple_Samples
 from function_library2 import Zotino_Voltage
 from function_library2 import DDSControl
 from function_library2 import Sample_Set_Voltage

class Controls(EnvExperiment):
    """Step a Zotino voltage through a scan and log Sampler statistics.

    For every value of ``Laser_V_Scan`` the kernel calls
    ``Sample_Set_Voltage.take_data`` and then stores the mean and standard
    deviation of each raw-sample dataset at that scan index.
    """

    def build(self):
        """Request the core device, the user arguments and the helpers."""
        self.setattr_device("core")
        self.delay = 100*us
        self.setattr_argument("integration", NumberValue(default=1*ms,
                                                         unit="ms", step=.5))

        # The HasEnvironment constructor already runs build() on each helper.
        # Calling .build() again requested every device a second time and,
        # for Sample_Set_Voltage, created a second set of child helpers.
        self.sampler = Sampler_Multiple_Samples(self)
        self.dac = Zotino_Voltage(self)
        self.measure = Sample_Set_Voltage(self)

        self.setattr_argument(
            "Laser_V_Scan",
            Scannable(default=[RangeScan(.5, 2.5, 51, randomize=False),
                               CenterScan(1, 1.5, .005, randomize=False),
                               NoScan(15, 1),
                               ], unit="V"))

    def process_data(self, chs, channel_names, counter):
        """Take mean and standard deviation of the sampled data.

        Not decorated with ``@kernel``; ``run`` calls it once per scan point.

        :param chs: names of the raw-sample datasets, read through
            ``self.measure.get_dataset``.
        :param channel_names: output dataset names laid out as consecutive
            (mean, sigma) pairs, one pair per entry of ``chs``.
        :param counter: index of the current scan point; the element that is
            overwritten in every output dataset.
        """
        for idx, ch in enumerate(chs):
            dat = np.array(self.measure.get_dataset(ch))
            mean = np.mean(dat)
            sigma = np.std(dat)
            self.mutate_dataset(channel_names[2*idx], counter, mean)
            self.mutate_dataset(channel_names[2*idx + 1], counter, sigma)

    @kernel
    def run(self):
        """Run the voltage scan and print the elapsed time."""
        self.core.reset()
        self.core.break_realtime()
        # Samples per scan point: integration time over the sampler helper's
        # ``delay`` attribute, plus one.
        desired_samples = int(self.integration / self.sampler.delay)+1
        print('desired samples')
        print(desired_samples)
        self.set_dataset("Laser_V_Scan", self.Laser_V_Scan.sequence,
                         broadcast=True, persist=False, archive=True)

        self.set_dataset("integration_time", self.integration, persist=True)

        # Output names as (mean, sigma) pairs, in the order process_data
        # writes them. The last four used to be listed mean, mean, sigma,
        # sigma, which put channel 7's sigma into "Spectrum_V" and channel
        # 8's mean into "Ref_Spectrum_Sig_V".
        channel_names = ["Temperature_R", 'Temperature_R_Sig',
                         "Laser_Power_V", "Laser_Power_V_Sig",
                         "Ref_Spectrum_V", "Ref_Spectrum_Sig_V",
                         "Spectrum_V", "Spectrum_Sig_V"]

        channel_names_in = ["sampler0_chan5", "sampler0_chan6",
                            "sampler0_chan7", "sampler0_chan8"]

        # One NaN-filled slot per scan point in every output dataset.
        chan_data = np.full(self.Laser_V_Scan.npoints, np.nan)
        for ch in channel_names:
            self.set_dataset(ch, chan_data, broadcast=True, persist=True)

        old_time = self.core.get_rtio_counter_mu()

        counter = 0
        for V in self.Laser_V_Scan.sequence:
            print(V)

            self.measure.take_data(samples=desired_samples,
                                   channel=0, voltage=V,
                                   counter=counter)

            self.process_data(channel_names_in, channel_names, counter)

            counter = counter + 1

        current_time = self.core.get_rtio_counter_mu()

        time_elapsed = self.core.mu_to_seconds(current_time-old_time)

        print('done')
        print('time elapsed is')
        print(time_elapsed)

`

`
"""helper functions in function_library2"""

class Zotino_Voltage(HasEnvironment):
    """Set a single Zotino channel to a voltage, retrying on RTIO underflow."""

    def build(self):
        """Request the core and Zotino devices and set the step delay."""
        self.setattr_device("core")
        self.setattr_device("zotino0")
        self.delay = 100*us  # on the scope it looks like it takes 8us to switch

    @kernel
    def write_voltage(self, channel, voltage):
        """Write ``voltage`` to ``channel`` and load it.

        :param channel: Zotino channel passed to ``write_dac``.
        :param voltage: value passed to ``write_dac``.
        """
        self.core.break_realtime()
        # The DAC is re-initialised on every call.
        self.zotino0.init()
        delay(self.delay)

        # Keep retrying until both the write and the load go through; each
        # underflow pushes the timeline cursor forward by ``self.delay``.
        while True:
            try:
                self.zotino0.write_dac(channel, voltage)
                self.zotino0.load()
                delay(self.delay)
                break
            except RTIOUnderflow:
                delay(self.delay)


class Sample_Set_Voltage(HasEnvironment):
    """Set a Zotino voltage, then sample 4 Sampler channels."""

    def build(self):
        """Request the core device and create the DAC and ADC helpers."""
        self.delay = 100*us
        self.setattr_device("core")

        # The HasEnvironment constructor already runs build() on the child,
        # so no explicit .build() call is made; a second call would request
        # the child's devices again.
        self.dac = Zotino_Voltage(self)
        self.adc = Sampler_Multiple_Samples(self)

    @kernel
    def take_data(self, samples, channel, voltage, counter):
        """Set the voltage, wait ``self.delay``, then acquire the samples.

        :param samples: number of samples, forwarded to the ADC helper.
        :param channel: Zotino channel, forwarded to the DAC helper.
        :param voltage: value forwarded to the DAC helper.
        :param counter: scan-point index, forwarded to the ADC helper.
        """
        self.core.break_realtime()
        delay(self.delay)

        # Take data
        self.core.break_realtime()

        self.dac.write_voltage(channel, voltage)
        delay(self.delay)
        self.adc.take_data(samples, counter)
     
     
  

class Sampler_Multiple_Samples(HasEnvironment):
    """Sampler V3: record 4 channels into datasets, with timestamps."""

    def build(self):
        """Request the core and Sampler devices and set the retry delay."""
        self.delay = 100*us
        self.setattr_device("core")
        self.setattr_device("sampler0")

    @kernel
    def take_data(self, samples, counter):
        """Acquire ``samples`` readings of 4 channels and store them.

        :param samples: number of readings; also the length of each dataset
            created here.
        :param counter: scan-point index; the Sampler gains are only
            written when it is 0.
        """
        # The Sampler is re-initialised on every call.
        self.sampler0.init()

        # Prep stuff
        n_channels = 4
        smp = [0.0]*n_channels
        channel_names = ["sampler0_chan5", "sampler0_chan6",
                         "sampler0_chan7", "sampler0_chan8"]
        timestamps_mu = 'Time (Machine Units)'

        for i in range(n_channels):
            chan_data = np.full(samples, np.nan)
            self.set_dataset(channel_names[i], chan_data, broadcast=True,
                             persist=False, unit='V')

            # Gains are written on the first scan point only; writing them
            # on every call is what produced the RTIO errors on long scans.
            if counter == 0:
                self.sampler0.set_gain_mu(7-i, 0)

        tstamp_data_mu = np.full(samples, self.core.get_rtio_counter_mu())
        self.set_dataset(timestamps_mu, tstamp_data_mu, broadcast=True,
                         persist=False)

        # Data taking
        # The per-sample spacing does not change inside the loop, so it is
        # computed once. It grows with the number of samples; 30us per
        # (samples/5 + 1) seemed about right from testing to avoid RTIO
        # errors on the sampler.
        num = samples/5 + 1
        spacing = 30*us*num
        for n in range(samples):
            delay(spacing)

            # Retry this sample until it goes through; every RTIO error
            # pushes the timeline cursor forward by ``self.delay``.
            while True:
                try:
                    current_time = self.core.get_rtio_counter_mu()
                    self.sampler0.sample(smp)  # runs sampler and saves to list

                    for i in range(n_channels):
                        self.mutate_dataset(channel_names[i], n, smp[i])
                    self.mutate_dataset(timestamps_mu, n, current_time)
                    break
                except RTIOUnderflow:
                    delay(self.delay)
                except RTIOOverflow:
                    delay(self.delay)

`