summaryrefslogtreecommitdiff
path: root/thread.c
diff options
context:
space:
mode:
authorLuke Gruber <[email protected]>2025-05-12 18:03:22 -0400
committerAaron Patterson <[email protected]>2025-05-13 13:23:57 -0700
commit1d4822a175a0dfccca8f252b0e757a1991bd54f9 ()
tree442d4f6ede7999b141403b10ece794569e5b5d17 /thread.c
parent2fee379f8f0be08be49c1fccbb37cb2a06834b24 (diff)
Get ractor message passing working with > 1 thread sending/receiving values in same ractor
Rework ractors so that any ractor action (Ractor.receive, Ractor#send, Ractor.yield, Ractor#take, Ractor.select) will operate on the thread that called the action. It will put that thread to sleep if it's a blocking function and it needs to put it to sleep, and the awakening action (Ractor.yield, Ractor#send) will wake up the blocked thread. Before this change every blocking ractor action was associated with the ractor struct and its fields. If a ractor called Ractor.receive, its wait status was wait_receiving, and when another ractor calls r.send on it, it will look for that status in the ractor struct fields and wake it up. The problem was that what if 2 threads call blocking ractor actions in the same ractor. Imagine if 1 thread has called Ractor.receive and another r.take. Then, when a different ractor calls r.send on it, it doesn't know which ruby thread is associated to which ractor action, so what ruby thread should it schedule? This change moves some fields onto the ruby thread itself so that ruby threads are the ones that have ractor blocking statuses, and threads are then specifically scheduled when unblocked. Fixes [#17624] Fixes [#21037]
Notes: Merged: https://.com/ruby/ruby/pull/12633
-rw-r--r--thread.c24
1 files changed, 18 insertions, 6 deletions
@@ -335,7 +335,7 @@ unblock_function_clear(rb_thread_t *th)
}
static void
-threadptr_interrupt_locked(rb_thread_t *th, bool trap)
{
// th->interrupt_lock should be acquired here
@@ -357,26 +357,27 @@ threadptr_interrupt_locked(rb_thread_t *th, bool trap)
}
static void
-threadptr_interrupt(rb_thread_t *th, int trap)
{
rb_native_mutex_lock(&th->interrupt_lock);
{
- threadptr_interrupt_locked(th, trap);
}
rb_native_mutex_unlock(&th->interrupt_lock);
}
void
rb_threadptr_interrupt(rb_thread_t *th)
{
RUBY_DEBUG_LOG("th:%u", rb_th_serial(th));
- threadptr_interrupt(th, false);
}
static void
threadptr_trap_interrupt(rb_thread_t *th)
{
- threadptr_interrupt(th, true);
}
static void
@@ -525,6 +526,9 @@ thread_cleanup_func(void *th_ptr, int atfork)
}
rb_native_mutex_destroy(&th->interrupt_lock);
}
static VALUE rb_threadptr_raise(rb_thread_t *, int, VALUE *);
@@ -2423,6 +2427,7 @@ NORETURN(static void rb_threadptr_to_kill(rb_thread_t *th));
static void
rb_threadptr_to_kill(rb_thread_t *th)
{
rb_threadptr_pending_interrupt_clear(th);
th->status = THREAD_RUNNABLE;
th->to_kill = 1;
@@ -2446,6 +2451,11 @@ threadptr_get_interrupts(rb_thread_t *th)
static void threadptr_interrupt_exec_exec(rb_thread_t *th);
int
rb_threadptr_execute_interrupts(rb_thread_t *th, int blocking_timing)
{
@@ -2453,6 +2463,8 @@ rb_threadptr_execute_interrupts(rb_thread_t *th, int blocking_timing)
int postponed_job_interrupt = 0;
int ret = FALSE;
if (th->ec->raised_flag) return ret;
while ((interrupt = threadptr_get_interrupts(th)) != 0) {
@@ -6033,7 +6045,7 @@ rb_threadptr_interrupt_exec(rb_thread_t *th, rb_interrupt_exec_func_t *func, voi
rb_native_mutex_lock(&th->interrupt_lock);
{
ccan_list_add_tail(&th->interrupt_exec_tasks, &task->node);
- threadptr_interrupt_locked(th, true);
}
rb_native_mutex_unlock(&th->interrupt_lock);
}