Hello all,
I have created an experiment that runs constantly whenever the system is otherwise idle. It needs to read and control various devices at different time intervals. When I run it, I get a syntax error after a short while. Looking into it further, I also found that responses to RPC calls are sometimes swapped; it appears that the issue is caused by two RPC calls writing to the same buffer at the same time.
The following is a reduced example of what I tried to implement:
class Monitoring(EnvExperiment):
    """Idle-time monitoring experiment driven by host-side producer threads.

    Two host threads periodically enqueue kernel actions (a TTL pulse and a
    Sampler read) into ``action_queue``; ``kernel_runner`` drains that queue
    until the scheduler requests a pause.
    """

    def build(self):
        """Request the devices and create the state shared with the threads."""
        self.setattr_device("core")
        self.setattr_device("scheduler")
        self.setattr_device("ttl8")
        self.setattr_device("sampler0")
        # Cleared in run() to make both producer threads leave their loops.
        self.running = True
        # Actions produced by the host threads and consumed by kernel_runner.
        self.action_queue = Queue()
        # Destination buffer for sampler0.sample(); one slot per channel.
        self.sampler_data = [0.0] * 8

    @kernel
    def sampler_action(self):
        """Read one set of samples into ``sampler_data``."""
        self.sampler0.sample(self.sampler_data)
        delay(100 * us)

    def sampler_task(self):
        """Producer thread body: enqueue a Sampler read every 0.3 s."""
        while self.running:
            # Use the queue created in build(); the previous name
            # ``task_queue`` was never defined and raised AttributeError.
            self.action_queue.put(self.sampler_action)
            time.sleep(0.3)

    @kernel
    def ttl_action(self):
        """Emit a 5 us pulse on ttl8."""
        self.ttl8.pulse(5 * us)

    def ttl_task(self):
        """Producer thread body: enqueue a TTL pulse and publish the samples
        every 0.1 s."""
        while self.running:
            self.action_queue.put(self.ttl_action)
            time.sleep(0.1)
            # NOTE(review): this call is made from the producer thread while
            # the main thread may be inside kernel_runner() issuing its own
            # scheduler.check_pause() requests. Presumably both share one
            # worker<->master channel that is not safe for concurrent use --
            # confirm, and move this call to the main thread if so.
            self.set_dataset("sampler", self.sampler_data, persist=True)

    @kernel
    def kernel_runner(self):
        """Execute queued actions until the scheduler asks for a pause."""
        self.core.reset()
        while not self.scheduler.check_pause():
            self.action_queue.get()()

    def run(self):
        """Start the producer threads and alternate between running the
        kernel and yielding to the scheduler until termination is requested.
        """
        t1 = threading.Thread(target=self.ttl_task)
        t2 = threading.Thread(target=self.sampler_task)
        t1.start()
        t2.start()
        try:
            while self.running:
                self.kernel_runner()
                # Close the core connection before yielding to the scheduler.
                self.core.comm.close()
                self.scheduler.pause()
        except TerminationRequested:
            # Graceful stop requested by the scheduler; cleanup is in finally.
            pass
        finally:
            # Always stop the producers before joining. Previously the flag
            # was only cleared on TerminationRequested, so any other
            # exception left both threads looping and join() never returned.
            self.running = False
            t1.join()
            t2.join()
The result is usually an error message like the following, where the evaluation of the corrupted string fails.
INFO:worker(3029,threadingtest.py):print:Exception in thread Thread-1:
INFO:worker(3029,threadingtest.py):print:Traceback (most recent call last):
INFO:worker(3029,threadingtest.py):print: File "C:\Users\Merlin\miniconda3\envs\env\lib\threading.py", line 932, in _bootstrap_inner
INFO:worker(3029,threadingtest.py):print: self.run()
INFO:worker(3029,threadingtest.py):print: File "C:\Users\Merlin\miniconda3\envs\env\lib\threading.py", line 870, in run
INFO:worker(3029,threadingtest.py):print: self._target(*self._args, **self._kwargs)
INFO:worker(3029,threadingtest.py):print: File "C:\Users\Merlin\env\lab_control\repository\threadingtest.py", line 138, in ttl_task
INFO:worker(3029,threadingtest.py):print: self.set_dataset("sampler", self.sampler_data, persist=True)
INFO:worker(3029,threadingtest.py):print: File "C:\Users\Merlin\miniconda3\envs\env\lib\site-packages\artiq\language\environment.py", line 352, in set_dataset
INFO:worker(3029,threadingtest.py):print: self.__dataset_mgr.set(key, value, broadcast, persist, archive)
INFO:worker(3029,threadingtest.py):print: File "C:\Users\Merlin\miniconda3\envs\env\lib\site-packages\artiq\master\worker_db.py", line 128, in set
INFO:worker(3029,threadingtest.py):print: self._broadcaster[key] = persist, value
INFO:worker(3029,threadingtest.py):print: File "C:\Users\Merlin\miniconda3\envs\env\lib\site-packages\sipyco\sync_struct.py", line 241, in __setitem__
INFO:worker(3029,threadingtest.py):print: self.root.publish({"action": ModAction.setitem.value,
INFO:worker(3029,threadingtest.py):print: File "C:\Users\Merlin\miniconda3\envs\env\lib\site-packages\artiq\master\worker_impl.py", line 53, in parent_action
INFO:worker(3029,threadingtest.py):print: reply = get_object()
INFO:worker(3029,threadingtest.py):print: File "C:\Users\Merlin\miniconda3\envs\env\lib\site-packages\artiq\master\worker_impl.py", line 41, in get_object
INFO:worker(3029,threadingtest.py):print: return pyon.decode(line)
INFO:worker(3029,threadingtest.py):print: File "C:\Users\Merlin\miniconda3\envs\env\lib\site-packages\sipyco\pyon.py", line 215, in decode
INFO:worker(3029,threadingtest.py):print: return eval(s, _eval_dict, {})
INFO:worker(3029,threadingtest.py):print: File "<string>", line 1
INFO:worker(3029,threadingtest.py):print: }{"status": "ok", "data": false}
INFO:worker(3029,threadingtest.py):print: ^
INFO:worker(3029,threadingtest.py):print:SyntaxError: unmatched '}'
Replacing the threads with async functions resulted in the same errors.
Is there a recommended way to implement such a thing?
I also tried to isolate the problem and created a minimal non-working example using only Sipyco.
import threading
from sipyco.pc_rpc import Client
# Single RPC client instance shared by both loops below.
# NOTE(review): "Meep" is presumably the target name exposed by the server
# listening on ::1 port 6789 -- confirm against the server side.
c = Client("::1", 6789, "Meep")


def loop_a():
    """Call ``return_a`` on the shared client forever and check each reply."""
    while True:
        a = c.return_a()
        assert a == "a", f"expected a got {a}"


def loop_b():
    """Call ``return_b`` on the shared client forever and check each reply."""
    while True:
        b = c.return_b()
        assert b == "b", f"expected b got {b}"


# loop_a runs in a background thread while loop_b runs in the main thread,
# so both issue calls through the same client object ``c`` concurrently.
threading.Thread(target=loop_a).start()
loop_b()