Hello Artiq users,

I am trying to develop a simple photon counter to measure a time trace of photons from my PMT. Below I attach the code I wrote.

Issues:

  1. I get an RTIOOverflow for high number of data points (> 10k)
  2. TerminationRequested does not work properly in Dashboard

Do you know any better methods for this simple application? Ideally, I want to get rid of the delays in-between the data point acquisition.

from artiq.experiment import *
from artiq.coredevice.edge_counter import CounterOverflow
import time

import numpy as np

class PhotonCounterCurrent2(EnvExperiment): 
    def build(self):
        self.setattr_device("core")
        self.setattr_device("ttl2")
        self.setattr_device("ttl_counter2")

        self.setattr_argument("n_points", NumberValue(1000,ndecimals=0, step=1))  #steps number
        self.setattr_argument("det_time", NumberValue(0.01*ms,ndecimals=3, unit="ms", step=0.001))  #time of detection 
        

    def run(self):
        tc = time.time()     
        try:
            self.run_kernel()
        except TerminationRequested:
            print('Terminated.') 
            
        print("Time elapsed: {:.2f} seconds" .format(time.time() - tc))        
            

    @kernel
    def run_kernel(self):
        self.core.reset()
                                 
        i = 0 ;

        self.set_dataset("Photon_Counts", np.full(self.n_points, np.nan), broadcast=True)
        
        # self.core.break_realtime()
        delay(10*ms)
        

        for _ in range(self.n_points):        
            num_rising_edges = self.counting_style3()
            delay(100*us)
            self.mutate_dataset("Photon_Counts", i, num_rising_edges)
            i += 1


    @kernel
    def counting_style4(self):        
        self.ttl_counter2.gate_rising(self.det_time) #needs to be initialised in the programme
        
        return self.ttl_counter2.fetch_count()
    
    @kernel
    def counting_style3(self):
        gate_end_mu = self.ttl2.gate_rising(self.det_time)
     
        return self.ttl2.count(now_mu())

    Hi MikoT42 ,

    I had a similar question a while ago. My goal was to get the time-of-arrival for each photon measured on a pmt. Here's the link to the thread.

    Ultimately, I ended up with the following code (reduced to essential lines) shown below. The pmt events are collected and then during a 500us delay, the CPU extracts the timestamps of each event. It's a little confusing because the line of code that extracts the timestamps is BEFORE the delay line. If you have more questions, let me know. I'm happy to email if you'd like to see the whole experiment file.

    def prepare(self):
        self.timestamps_down = [np.int64(0) for _ in range(self.n_events * self.n_reps)]
        # Save the abosulte time in machine units of the pmt start time
        self.timestamps_down_starts = [np.int64(0) for _ in range(self.n_reps)]
    
    @kernel
    def run(self):
        # Initialize core timeline
        self.core.break_realtime()
        delay(self.DELAY * us)
        self.core.reset()
        delay(self.DELAY * us)
    
        self.init_aoms()
        
        # Measure fluorescence of down state
        for i in range(self.n_reps):
            # 0. Doppler Cool (pulse doppler and protection beams)
            self.doppler_cool()
            # 1. Optical Pumping
            self.optical_pumping.sw.pulse(self.optical_pumping_time * us)
            # 2. State Detection (turn detection beam on and count photons at pmt)
            self.timestamps_down_starts[i] = now_mu()  # save absolute time value of beginning of detection
            gate_end = self.detect_cool()
            # Extract all pmt event timestamps
            for j in range(self.n_events):
                self.timestamps_down[i*self.n_events + j] = self.ttl0.timestamp_mu(gate_end)
                if self.timestamps_down[i*self.n_events + j] < 0:
                    break
            # Delay to allow for timestamp computation on CPU
            delay(500 * us)

    MikoT42 I get an RTIOOverflow for high number of data points (> 10k)

    If you're only counting you can use the gateware counter, which would not overflow.

    4 days later

    MikoT42 TerminationRequested does not work properly in Dashboard

    We have a similar experiment (related section shown below) which might help with the termination problem. You might want to check https://m-labs.hk/artiq/manual/management_system.html?highlight=check_pause#pauses.

        def run(self):
            self.core.reset()
            self.start_time = time()
            self.set_dataset_attribute("pmt.start_time", self.start_time)
            self.set_dataset("pmt.times", [], broadcast=True)
            self.set_dataset("pmt.counts", [], broadcast=True)
            self.set_dataset("pmt.last_counts", 0., broadcast=True)
    
            while True:
                status = self.scheduler.get_status()
                same_pipelines_higher_priority = 0
    
                for exp in status:
                    if (status[exp]["pipeline"] == self.scheduler.pipeline_name
                        and status[exp]["priority"] > self.scheduler.priority):
                        same_pipelines_higher_priority += 1
                if not self.scheduler.check_pause():
                    self.run_pmt()
                elif same_pipelines_higher_priority > 0: # higher priority experiment scheduled
                    self.core.close() # close the connection so other experiment can use it
                    self.scheduler.pause() # yield control to the scheduler
                else: # user has stopped the experiment
                    break
    
        @kernel(flags={"fast-math"})
        def run_pmt(self):
            self.core.break_realtime()
            while True:
                detect_time_mu = self.get_pmt_detect_time_mu()  # RPC function, not shown here.
                self.core.break_realtime()
                t_count = self.pmt.gate_rising_mu(detect_time_mu)
                count = self.pmt.fetch_count()
    
                self.record_counts(count)  # RPC function, not shown here.
                if self.scheduler.check_pause():
                    break