I want to maintain a queue entirely within the Arm core. From studying the examples, and some experimentation, it looks like I will need to initialize data which is accessed from multiple @kernel functions within the build function, which is invoked by the host. That's fine. But it's not clear to me how the Core will access the data prefixed with "self." Does the Core need to use, effectively, an RPC call to access it? I want to make the kernel operations very efficient. Is there a different way to create the queue that would accomplish my goal of being able to maintain the entire execution (other than invocation and print) in the Core?
The "artiq_compile" command doesn't show any options to produce a human-readable representation of the elf file. Is there a tool available that I can use to disassemble the elf? Thanks for any ideas.
Here is a little program showing what I'm trying to do.
`import numpy as np
from artiq.experiment import *
class BlinkForever(EnvExperiment):
kernel_invariants = {"eq_capacity"}
# Experiment BlinkForever builder
def build(self):
self.setattr_device("core")
self.setattr_device("led0")
# Initialize event queue and counters
self.eq_capacity = 1024
self.eq_size = 0
self.eq_head = 0
self.eq_tail = -1
self.count_this = 0
self.count_that = 0
self.eq_queue = self.return_array(self.eq_capacity)
def return_array(self, count) -> TArray(TInt32):
return np.empty(count, dtype=np.int32)
@kernel
def Enqueue(self, item):
if self.eq_size == self.eq_capacity:
# print("Error, queue capacity exceeded")
return
self.eq_tail = (self.eq_tail + 1) & (self.eq_capacity - 1)
self.eq_queue[self.eq_tail] = item
self.eq_size += 1
@kernel
def Dequeue(self) -> TInt32:
if self.eq_size == 0:
# print("Error, queue underflow")
return 0
item = self.eq_queue[self.eq_head]
self.eq_head = (self.eq_head + 1) & (self.eq_capacity - 1)
self.eq_size -= 1
return item
@kernel
def do_this(self):
# print("do this event")
self.count_this += 1
@kernel
def do_that(self):
# print("do that event")
self.count_that += 1
@kernel
def process_queued_events(self):
while True:
item = self.Dequeue()
if (item == 1):
self.do_this()
elif (item == 2):
self.do_that()
elif (item == 0):
# print("Event queue is empty quit")
return
@kernel
def test_basic(self):
self.process_queued_events()
self.Enqueue(1)
self.Enqueue(2)
self.Enqueue(1)
self.Enqueue(1)
self.process_queued_events()
# How to do timing?
for _ in range(5):
self.Enqueue(1)
self.Enqueue(2)
self.process_queued_events()
@kernel
def run(self):
self.core.reset()
self.test_basic()
try:
for _ in range(10):
self.led0.pulse(300*ms)
delay(300*ms)
except RTIOUnderflow:
print("RTIO underflow occurred")
print("Number of this", self.count_this)
print("Number of that", self.count_that)`