Hello,

I am using a TTL to count, and I wonder what the fastest way is to retrieve a list of data from the TTL. I am using print(), but that increases the waiting time for that command and usually gives an underflow error (a delay of <50 ms would still sometimes give an underflow error), while without print() it doesn't need to wait very long until the next command can be used.

I tried to store everything in a list, but it seems like the normal Python functions for handling lists cannot be used in an ARTIQ script (append, add, and so on all give me errors that don't appear in normal Python). What would be, in your opinion, the fastest way to retrieve this data with only a small delay before the next command?

Is there a quick section in the documentation that I can read on why this happens and how to handle data? Thank you.

What do you need: Do you need latency or throughput?
How fast: What "speed" do you need (quantitative)?
Where to: Do you need to retrieve to the host or to the coredevice?
What's the data: Do you need histograms, timestamps, or something else?

Given what you tried, the fastest way would be passing a list to an async RPC. You'll get a few ms.
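
A minimal sketch of that pattern (the device name ttl0, the buffer size, and the gate time are illustrative assumptions, not from this thread):

from artiq.experiment import *

class CountAndShip(EnvExperiment):
    def build(self):
        self.setattr_device("core")
        self.setattr_device("ttl0")
        self.counts = []                  # host-side storage, filled by the RPC

    @rpc(flags={"async"})
    def store(self, counts):
        # runs on the host; the kernel does not wait for it to return
        self.counts.extend(counts)

    @kernel
    def run(self):
        self.core.reset()
        buf = [0] * 100                   # fixed size: kernel lists cannot grow
        for i in range(100):
            buf[i] = self.ttl0.count(self.ttl0.gate_rising(1*ms))
            delay(1*ms)                   # slack before scheduling the next gate
        self.store(buf)                   # one async RPC ships the whole batch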

Thank you very much; a few ms should be great for us, I think. I will try what you suggest.

Our atoms are at a few uK for now and we wish to record the signal before it disappears; the 50 ms mentioned earlier seems too long for that purpose.

Can you guide me a bit on handling lists in an ARTIQ script? Is a list the correct type to use? It seems like the usual operations such as append() or list1 + list2 don't work under artiq_run, although they work when I run the code as normal Python. I also tried a numpy array, but I get a similar error (when I use artiq_run it says something like type list does not have an attribute append; I don't recall the exact error).

Concerning the list handling: as mentioned in the FAQ (https://m-labs.hk/artiq/manual/faq.html), variable-length arrays are not supported in @kernel functions. This means that operations like append and concatenating two lists will not work in @kernel functions. Outside of them (on the host), you can use variable-length lists, though.

The way I take care of it is to preallocate a list with some default value and use only that in @kernel functions, or to pass values via @rpc methods to a separate list on the host, which is managed there and where I can use all of Python's tools if necessary. The same applies to numpy arrays.
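
A minimal sketch of the preallocation variant (names and sizes are made up); ARTIQ writes the kernel's changes to the attribute back to the host object when the kernel finishes:

def prepare(self):
    self.results = [0] * 1000             # fixed length, filled in by the kernel

@kernel
def acquire(self):
    self.core.break_realtime()
    for i in range(len(self.results)):
        self.results[i] = self.ttl0.count(self.ttl0.gate_rising(10*ms))
        delay(1*ms)                       # restore slack between gates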

I also want to briefly mention the method append_to_dataset (https://m-labs.hk/artiq/manual/core_language_reference.html), which is useful if you just want to save some values in a list for external evaluation. Then a whole separate list is sometimes not even needed.
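
In its simplest form that looks like this (the dataset name is just an example):

self.set_dataset("n_photons", [], broadcast=True)     # once, before acquiring
...
self.append_to_dataset("n_photons", photon_count)     # per point; also callable from a kernel as an RPC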

As an example of list handling, here is my experiment for reading out a PMT live through a TTL. It is probably not the optimal way (e.g. I do not use append_to_dataset here, because it is an older experiment and I did not know about this method back then), but maybe it helps anyway:

from artiq.experiment import *   # EnvExperiment, kernel, NumberValue, units, RTIO exceptions

class PMTTimeSeries(EnvExperiment):
    
    def build(self):
        self.setattr_device("core")
        self.setattr_device("ttl0")
        self.setattr_device("scheduler")
        self.setattr_device("ccb")
      
        self.setattr_argument("t_accu", NumberValue(
            min = 0*us, 
            max = 10*s,
            default = 10*ms,
            unit = "ms"), 
        tooltip="Accumulation time")
        
        self.setattr_argument("samples",  NumberValue(
            min = 10, 
            max = 100000), 
        tooltip="Number of displayed samples")
        
    @kernel
    def run(self):
        self.ccb.issue("create_applet", "pmt_time_series", 
            "${artiq_applet}plot_xy n_photons", ["PMT"])
        self.set_dataset("n_photons", [None], broadcast=True)
        
        while True:
            self.core.break_realtime()
            try:
                photon_count = self.ttl0.count(self.ttl0.gate_rising(self.t_accu))
                if self.cb(photon_count):
                    break
            except RTIOOverflow:
                continue
                
    def cb(self, photon_count) -> TBool:
        n_photons = self.get_dataset("n_photons")
        if n_photons[0] is None:
            n_photons[0] = photon_count
        else:
            n_photons.append(photon_count)
        if len(n_photons) > self.samples:
            n_photons.pop(0)
            
        self.set_dataset("n_photons", n_photons, broadcast=True)
        
        try:
            self.scheduler.pause()
        except TerminationRequested:
            return True
        return False
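
Note that cb carries no @kernel decorator, so when the kernel calls self.cb(photon_count) it runs on the host as an ordinary (blocking) RPC; the -> TBool annotation tells the compiler the RPC's return type. That is what lets cb freely use get_dataset, append and pop, which would not compile in kernel code.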
a month later

Hello,

I am writing again to ask how to retrieve data from the TTL counter. I followed the example from the "Getting started with the management system" section with the counter, precisely:

n_points = 1000
meas_time = 10*ms

def run(self):
    self.set_dataset("n_photons", np.full(n_points, np.nan), broadcast=True)
    for i in range(n_points):
        photon_count = self.ttl0.count(self.ttl0.gate_rising(meas_time))
        self.mutate_dataset("n_photons", i, photon_count)
        time.sleep(0.5)

And I keep getting underflow errors again. I tried putting a fairly large delay before and after the count() method, but the underflow still happens, and if I catch the RTIOUnderflow exception with try:/except: the dataset just never gets updated and remains nan.

Can you guide me a bit more on how to retrieve data from the TTL counter? Thank you very much!

    Vanimiaou photon_count = self.ttl0.count(self.ttl0.gate_rising(meas_time))

    This line runs the compiler twice and returns to the interpreter in between, which is probably not what you want. Make a kernel function that wraps it so that the compiler is invoked only once.

    You also need to set the time cursor somewhere with e.g. core.reset() or break_realtime().
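
    A minimal sketch of that suggestion, reusing the names from this thread:

    @kernel
    def get_photon_count(self):
        self.core.break_realtime()   # set the time cursor a little after the current time
        return self.ttl0.count(self.ttl0.gate_rising(meas_time))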

    Hi Sébastien, thank you very much for your advice; it manages to run without an underflow error now. Can you help me a bit with the time cursor? I put one at the beginning of run() as below:

    meas_time = 10*ms
    n_points = 1000

    class Main(EnvExperiment):
        """Data count test"""

        def build(self):
            self.setattr_device("core")
            self.setattr_device("ttl0")
            self.setattr_device("ccb") #for the applet plot

        def run(self):
            self.core.reset()
            self.core.break_realtime()
            self.ccb.issue("create_applet", "photon_counts_plot", 
                "${artiq_applet}plot_xy n_photons", ["Counts plot"])
            #self.ccb.issue("create_applet", "photon_histogram_plot", 
            #    "${artiq_applet}plot_xy_hist n_photons", ["Counts plot"])
            self.set_dataset("n_photons", np.full(n_points, np.nan), broadcast=True)
            delay(1*ms)
            for i in range(n_points):
                #try:
                photon_count = self.get_photon_count() 
                delay(1*ms)
                self.mutate_dataset("n_photons", i, photon_count)
                time.sleep(0.5)
                #except RTIOUnderflow:
                #    continue

        def get_photon_count(self):
            #self.core.reset()
            #self.core.break_realtime()
            edgedetect = self.ttl0.gate_rising(meas_time)
            return self.ttl0.count(edgedetect)

    and got the error:

    NotImplementedError: Attempted to interpret kernel without a time manager
    ERROR:master:artiq.master.scheduler:got worker exception in run stage, deleting RID 50

    I tried putting either core.reset() or break_realtime(), or both at the same time, at the beginning of run() only or both in run() and in the get_photon_count() function, and I still get the error.

    You are again mixing up kernel and interpreter code inappropriately. delay cannot be used in the interpreter, and also your kernels should be better delineated. get_photon_count should be decorated with @kernel.
    break_realtime immediately after reset is never necessary.

    Thank you for your advice. I guess it wasn't compiling get_photon_count earlier. I removed the delay from run() and added @kernel before get_photon_count, and I still get an RTIO underflow error, no matter how big a delay I put inside get_photon_count or how long the time.sleep() between calling get_photon_count and storing the data. Can you help me with it? Thank you very much.

    def run(self):
        self.core.reset()
        self.set_dataset("n_photons", np.full(n_points,np.nan), broadcast=True)
        for i in range(n_points):
            #time.sleep(0.01)
            photon_count = self.get_photon_count() 
            #time.sleep(0.01)
            self.mutate_dataset("n_photons", i, photon_count)
            time.sleep(0.5)
    
    @kernel
    def get_photon_count(self):
        #self.core.reset()
        #self.core.break_realtime()
        delay(1*ms)
        edgedetect=self.ttl0.gate_rising(meas_time)
        delay(1*ms)
        return self.ttl0.count(edgedetect)

    I'm not sure what you are trying to achieve - if it is counting some photons at intervals timed by the slow and jittery time.sleep (+ kernel compilation and loading), then add break_realtime() at the beginning of get_photon_count() and remove the calls to delay() which are not necessary.
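
    Applied to the code above, those changes would look roughly like this (a sketch, not verbatim from the thread):

    def run(self):
        self.core.reset()
        self.set_dataset("n_photons", np.full(n_points, np.nan), broadcast=True)
        for i in range(n_points):
            self.mutate_dataset("n_photons", i, self.get_photon_count())

    @kernel
    def get_photon_count(self):
        self.core.break_realtime()   # re-establish slack after the host round trip
        return self.ttl0.count(self.ttl0.gate_rising(meas_time))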

    Hi Sébastien,

    Thank you very much for your advice. time.sleep() is indeed slow.

    I am just trying to find a quick way to save data. Earlier I saved the data by printing it to the terminal using artiq_run, but that needs a delay of 50 ms between the count() and print() commands to avoid an underflow, and then I saved the data from the terminal.

    I plotted the data earlier by sending a DAC voltage proportional to the count value to an oscilloscope, to see the count variation in real time.

    I am exploring the possibility of storing and plotting the data through the management system, to retrieve it and have it plotted in real time directly on the computer, with the histogram, but it is indeed slower. What would you suggest as the quickest way to retrieve the data? I saw the comment of @rjo about using an async RPC, but I haven't tried it yet because I don't have the knowledge of that and still have to read about it. Do you know what would be a good starting point, and whether there is any example on your GitHub?

    Edit: I just removed the time.sleep() from the example; it still works without any underflow error, but acquiring 1000 points takes 150 s, so actually 150 ms per point (the 10 ms integration time included).
    Actually, I think we can work with this for now: since we do not need the print() command to collect data, we can just send the DAC voltage with a much smaller time delay without causing an underflow error, and watch the atom trap with that. And we can collect data separately through the dashboard. Thanks Sébastien!

      Vanimiaou What would you suggest as the quickest way to retrieve the data? I saw the comment of @rjo about using an async RPC, but I haven't tried it yet because I don't have the knowledge of that and still have to read about it. Do you know what would be a good starting point, and whether there is any example on your GitHub?

      @rpc(flags={"async"})
      def collect_data(self, datapoint):
          self.mutate_dataset("n_photons", self.i, datapoint)
          self.i += 1

      @kernel
      def my_kernel(self):
          ...
          self.collect_data(datapoint)
          ...
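
      Note that self.i is host-side state and needs to be initialized (e.g. self.i = 0 in run()) before the kernel starts. Because the RPC is flagged async, the kernel queues the call and continues immediately rather than waiting for the host to finish the dataset write.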
      3 years later

      Hi all,

      I have a related question. Suppose I want to schedule a bunch of TTL gates back to back, like below. I want to do some math on the counts during each gate and decide when to end based on what I find. What is the minimum recovery time I need to give between gates? I am finding that the required delay to avoid underflowing when scheduling the next gate_rising_mu ("L" in the code below) grows together with binT, so that in the end I can only have the TTL gated for about half the time. So if I try to run with 10 us gates, I need to burn 10 us between each; if 20, then 20; and so on.

      I was really hoping to do something like this, but with a negligible delay between gates, ideally 1*us. Does anyone know what the delay is attributable to? I'm counting 60 kcps, so in a 10 us gate period I average 0.6 photons. Could I get around it by sending my PMT signal to two ARTIQ TTLs and gating them in an interleaved fashion?

      Thanks so much in advance to Sebastien or anyone else who has a chance to weigh in!

      @kernel
      def countSeveralBins(self):

          # gate_rising_mu expects machine units, so convert the gate length
          binT = self.core.seconds_to_mu(10*us)
          L = 10.0   # recovery delay in us; empirically it has to grow with binT (see text)
          tstart = now_mu()
          tdone = self.pmt.gate_rising_mu(binT)

          bin = 0
          Tot = 0

          while True:
              #collect one bin worth of photons.
              #this also forces the timeline up, so after this now_mu() would return tdone.
              thisN = self.pmt.count(tdone)

              #schedule the next bin
              tdone = self.pmt.gate_rising_mu(binT)

              #do math on the current bin's count thisN
              Tot += thisN
              bin += 1
              if Tot > bin*2:
                  break
              elif Tot > bin:
                  Tot += 100
                  break
              elif bin > 10:
                  Tot = 0
                  break

              delay(L*us)

          thisN = self.pmt.count(tdone)   # drain the last scheduled gate
          return Tot

      I think the problem may be that the kernel's ALU (if it has such a thing) is not available for doing math concurrently with gating for TTL input. Can anyone confirm? I'm finding that any math I try to do directly after scheduling a TTL gate doesn't actually get done until after the gate concludes, even if I'm not advancing the timeline cursor.