Hello all,

I have created an experiment that runs constantly whenever the system is otherwise idle. It needs to read and control various devices at different time intervals. When I run it, I receive a syntax error after a short while. Looking further into it, I also found that responses to RPC calls are sometimes swapped, and it appears that the issue is caused by two RPC calls writing to the same buffer at the same time.

The following is a reduced example of what I tried to implement:

class Monitoring(EnvExperiment):
    """Idle-time experiment that interleaves periodic device actions.

    Two host threads enqueue kernel methods at different intervals; a single
    kernel loop fetches them from the queue and runs them until the scheduler
    requests a pause.
    """

    def build(self):
        """Request the devices and create the shared host-side state."""
        self.setattr_device("core")
        self.setattr_device("scheduler")
        self.setattr_device("ttl8")
        self.setattr_device("sampler0")

        # Flag polled by both producer threads and by the loop in run().
        self.running = True
        # Single queue shared by both producers and by kernel_runner().
        self.action_queue = Queue()

        # Buffer filled by sampler_action() and published by ttl_task().
        self.sampler_data = [0.0] * 8

    @kernel
    def sampler_action(self):
        """Take one sampler reading into ``self.sampler_data``."""
        self.sampler0.sample(self.sampler_data)
        delay(100 * us)

    def sampler_task(self):
        """Enqueue ``sampler_action`` every 0.3 s while ``self.running``."""
        while self.running:
            # Use the queue created in build(); the original referenced a
            # non-existent ``task_queue`` attribute here.
            self.action_queue.put(self.sampler_action)
            time.sleep(0.3)

    @kernel
    def ttl_action(self):
        """Emit a 5 us pulse on ``ttl8``."""
        self.ttl8.pulse(5 * us)

    def ttl_task(self):
        """Enqueue ``ttl_action`` and publish the sampler data every 0.1 s."""
        while self.running:
            self.action_queue.put(self.ttl_action)
            time.sleep(0.1)
            # NOTE(review): this runs on a worker thread while the main
            # thread is inside kernel_runner(). The traceback in this report
            # shows set_dataset() failing while decoding a reply, which
            # presumably means replies are being interleaved with another
            # request -- confirm, and serialize such calls with a lock.
            self.set_dataset("sampler", self.sampler_data, persist=True)

    @kernel
    def kernel_runner(self):
        """Run queued actions until the scheduler requests a pause."""
        self.core.reset()
        while not self.scheduler.check_pause():
            # Fetch the next queued kernel method from the host and call it.
            self.action_queue.get()()

    def run(self):
        """Start both producer threads and run the kernel loop until terminated."""
        t1 = threading.Thread(target=self.ttl_task)
        t2 = threading.Thread(target=self.sampler_task)
        t1.start()
        t2.start()

        try:
            while self.running:
                self.kernel_runner()
                # Close the core device connection before pausing.
                self.core.comm.close()
                self.scheduler.pause()
        except TerminationRequested:
            self.running = False
        finally:
            # Always clear the flag so the producers leave their loops; without
            # this, any exception other than TerminationRequested would make
            # the join() calls below block forever.
            self.running = False
            t1.join()
            t2.join()

The result is usually an error message like the following, where the evaluation of the corrupted string fails.

INFO:worker(3029,threadingtest.py):print:Exception in thread Thread-1:
INFO:worker(3029,threadingtest.py):print:Traceback (most recent call last):
INFO:worker(3029,threadingtest.py):print:  File "C:\Users\Merlin\miniconda3\envs\env\lib\threading.py", line 932, in _bootstrap_inner
INFO:worker(3029,threadingtest.py):print:    self.run()
INFO:worker(3029,threadingtest.py):print:  File "C:\Users\Merlin\miniconda3\envs\env\lib\threading.py", line 870, in run
INFO:worker(3029,threadingtest.py):print:    self._target(*self._args, **self._kwargs)
INFO:worker(3029,threadingtest.py):print:  File "C:\Users\Merlin\env\lab_control\repository\threadingtest.py", line 138, in ttl_task
INFO:worker(3029,threadingtest.py):print:    self.set_dataset("sampler", self.sampler_data, persist=True)
INFO:worker(3029,threadingtest.py):print:  File "C:\Users\Merlin\miniconda3\envs\env\lib\site-packages\artiq\language\environment.py", line 352, in set_dataset
INFO:worker(3029,threadingtest.py):print:    self.__dataset_mgr.set(key, value, broadcast, persist, archive)
INFO:worker(3029,threadingtest.py):print:  File "C:\Users\Merlin\miniconda3\envs\env\lib\site-packages\artiq\master\worker_db.py", line 128, in set
INFO:worker(3029,threadingtest.py):print:    self._broadcaster[key] = persist, value
INFO:worker(3029,threadingtest.py):print:  File "C:\Users\Merlin\miniconda3\envs\env\lib\site-packages\sipyco\sync_struct.py", line 241, in __setitem__
INFO:worker(3029,threadingtest.py):print:    self.root.publish({"action": ModAction.setitem.value,
INFO:worker(3029,threadingtest.py):print:  File "C:\Users\Merlin\miniconda3\envs\env\lib\site-packages\artiq\master\worker_impl.py", line 53, in parent_action
INFO:worker(3029,threadingtest.py):print:    reply = get_object()
INFO:worker(3029,threadingtest.py):print:  File "C:\Users\Merlin\miniconda3\envs\env\lib\site-packages\artiq\master\worker_impl.py", line 41, in get_object
INFO:worker(3029,threadingtest.py):print:    return pyon.decode(line)
INFO:worker(3029,threadingtest.py):print:  File "C:\Users\Merlin\miniconda3\envs\env\lib\site-packages\sipyco\pyon.py", line 215, in decode
INFO:worker(3029,threadingtest.py):print:    return eval(s, _eval_dict, {})
INFO:worker(3029,threadingtest.py):print:  File "<string>", line 1
INFO:worker(3029,threadingtest.py):print:    }{"status": "ok", "data": false}
INFO:worker(3029,threadingtest.py):print:    ^
INFO:worker(3029,threadingtest.py):print:SyntaxError: unmatched '}'

Replacing the threads with async functions resulted in the same errors.
Is there a recommended way to implement something like this?

I also tried to isolate the problem and created a minimal non-working example using only Sipyco.

import threading
from sipyco.pc_rpc import Client

# One client object, used by both loop_a() and loop_b().
c = Client("::1", 6789, "Meep")

def loop_a():
    """Call ``c.return_a()`` forever, asserting that every reply is ``"a"``."""
    while True:
        reply = c.return_a()
        assert reply == "a", f"expected a got {reply}"

def loop_b():
    """Call ``c.return_b()`` forever, asserting that every reply is ``"b"``."""
    while True:
        reply = c.return_b()
        assert reply == "b", f"expected b got {reply}"

# Run loop_a() in a background thread while loop_b() runs in the main thread,
# so both call methods on the same client `c` concurrently.
threading.Thread(target=loop_a).start()
loop_b()

(The threading library in Python is somewhat limited because of the GIL; you should probably use asyncio instead if you can. Either way, the solution is the same: use a mutex so that only one RPC call is in flight at a time.)