diff options
44 files changed, 2659 insertions, 3508 deletions
@@ -67,7 +67,7 @@ assert_equal "#<Ractor:#1 running>", %q{ # Return id, loc, and status for no-name ractor assert_match /^#<Ractor:#([^ ]*?) .+:[0-9]+ terminated>$/, %q{ r = Ractor.new { '' } - r.take sleep 0.1 until r.inspect =~ /terminated/ r.inspect } @@ -75,7 +75,7 @@ assert_match /^#<Ractor:#([^ ]*?) .+:[0-9]+ terminated>$/, %q{ # Return id, name, loc, and status for named ractor assert_match /^#<Ractor:#([^ ]*?) Test Ractor .+:[0-9]+ terminated>$/, %q{ r = Ractor.new(name: 'Test Ractor') { '' } - r.take sleep 0.1 until r.inspect =~ /terminated/ r.inspect } @@ -86,7 +86,7 @@ assert_equal 'ok', %q{ r = Ractor.new do 'ok' end - r.take } # Passed arguments to Ractor.new will be a block parameter @@ -96,7 +96,7 @@ assert_equal 'ok', %q{ r = Ractor.new 'ok' do |msg| msg end - r.take } # Pass multiple arguments to Ractor.new @@ -105,7 +105,7 @@ assert_equal 'ok', %q{ r = Ractor.new 'ping', 'pong' do |msg, msg2| [msg, msg2] end - 'ok' if r.take == ['ping', 'pong'] } # Ractor#send passes an object with copy to a Ractor @@ -115,65 +115,23 @@ assert_equal 'ok', %q{ msg = Ractor.receive end r.send 'ok' - r.take } # Ractor#receive_if can filter the message -assert_equal '[2, 3, 1]', %q{ - r = Ractor.new Ractor.current do |main| - main << 1 - main << 2 - main << 3 - end - a = [] - a << Ractor.receive_if{|msg| msg == 2} - a << Ractor.receive_if{|msg| msg == 3} - a << Ractor.receive -} -# Ractor#receive_if with break -assert_equal '[2, [1, :break], 3]', %q{ - r = Ractor.new Ractor.current do |main| - main << 1 - main << 2 - main << 3 end - a = [] - a << Ractor.receive_if{|msg| msg == 2} - a << Ractor.receive_if{|msg| break [msg, :break]} - a << Ractor.receive -} - -# Ractor#receive_if can't be called recursively -assert_equal '[[:e1, 1], [:e2, 2]]', %q{ - r = Ractor.new Ractor.current do |main| - main << 1 - main << 2 - main << 3 - end - - a = [] - - Ractor.receive_if do |msg| - begin - Ractor.receive - rescue Ractor::Error - a << [:e1, msg] - end - true # delete 1 from queue - end - - Ractor.receive_if do |msg| - begin - Ractor.receive_if{} - rescue Ractor::Error - a << [:e2, msg] - end - true # delete 2 from queue - end - - a # } # dtoa race condition @@ -184,7 +142,7 @@ assert_equal '[:ok, :ok, :ok]', %q{ 10_000.times{ rand.to_s } :ok } - }.map(&:take) } # Ractor.make_shareable issue for locals in proc [Bug #18023] @@ -218,27 +176,32 @@ if ENV['_WORKFLOW'] == 'Compilations' # ignore the follow else -# Ractor.select(*ractors) receives a values from a ractors. -# It is similar to select(2) and Go's select syntax. -# The return value is [ch, received_value] assert_equal 'ok', %q{ # select 1 r1 = Ractor.new{'r1'} - r, obj = Ractor.select(r1) - 'ok' if r == r1 and obj == 'r1' } # Ractor.select from two ractors. assert_equal '["r1", "r2"]', %q{ # select 2 - r1 = Ractor.new{'r1'} - r2 = Ractor.new{'r2'} - rs = [r1, r2] as = [] - r, obj = Ractor.select(*rs) - rs.delete(r) as << obj - r, obj = Ractor.select(*rs) as << obj as.sort #=> ["r1", "r2"] } @@ -282,30 +245,12 @@ assert_match /specify at least one ractor/, %q{ end } -# Outgoing port of a ractor will be closed when the Ractor is terminated. -assert_equal 'ok', %q{ - r = Ractor.new do - 'finish' - end - - r.take - sleep 0.1 until r.inspect =~ /terminated/ - - begin - o = r.take - rescue Ractor::ClosedError - 'ok' - else - "ng: #{o}" - end -} - # Raise Ractor::ClosedError when try to send into a terminated ractor assert_equal 'ok', %q{ r = Ractor.new do end - r.take # closed sleep 0.1 until r.inspect =~ /terminated/ begin @@ -317,47 +262,16 @@ assert_equal 'ok', %q{ end } -# Raise Ractor::ClosedError when try to send into a closed actor -assert_equal 'ok', %q{ - r = Ractor.new { Ractor.receive } - r.close_incoming - - begin - r.send(1) - rescue Ractor::ClosedError - 'ok' - else - 'ng' - end -} - -# Raise Ractor::ClosedError when try to take from closed actor -assert_equal 'ok', %q{ - r = Ractor.new do - Ractor.yield 1 - Ractor.receive - end - - r.close_outgoing - begin - r.take - rescue Ractor::ClosedError - 'ok' - else - 'ng' - end -} - -# Can mix with Thread#interrupt and Ractor#take [Bug #17366] assert_equal 'err', %q{ - Ractor.new{ t = Thread.current begin Thread.new{ t.raise "err" }.join rescue => e e.message end - }.take } # Killed Ractor's thread yields nil @@ -365,34 +279,18 @@ assert_equal 'nil', %q{ Ractor.new{ t = Thread.current Thread.new{ t.kill }.join - }.take.inspect #=> nil } -# Ractor.yield raises Ractor::ClosedError when outgoing port is closed. assert_equal 'ok', %q{ - r = Ractor.new Ractor.current do |main| Ractor.receive - main << true - Ractor.yield 1 - end - - r.close_outgoing - r << true - Ractor.receive - - begin - r.take - rescue Ractor::ClosedError - 'ok' - else - 'ng' - end -} -# Raise Ractor::ClosedError when try to send into a ractor with closed incoming port -assert_equal 'ok', %q{ - r = Ractor.new { Ractor.receive } - r.close_incoming begin r.send(1) @@ -403,154 +301,44 @@ assert_equal 'ok', %q{ end } -# A ractor with closed incoming port still can send messages out -assert_equal '[1, 2]', %q{ - r = Ractor.new do - Ractor.yield 1 - 2 - end - r.close_incoming - - [r.take, r.take] -} - -# Raise Ractor::ClosedError when try to take from a ractor with closed outgoing port -assert_equal 'ok', %q{ - r = Ractor.new do - Ractor.yield 1 - Ractor.receive - end - - sleep 0.01 # wait for Ractor.yield in r - r.close_outgoing - begin - r.take - rescue Ractor::ClosedError - 'ok' - else - 'ng' - end -} - -# A ractor with closed outgoing port still can receive messages from incoming port -assert_equal 'ok', %q{ - r = Ractor.new do - Ractor.receive - end - - r.close_outgoing - begin - r.send(1) - rescue Ractor::ClosedError - 'ng' - else - 'ok' - end -} - # Ractor.main returns main ractor assert_equal 'true', %q{ Ractor.new{ Ractor.main - }.take == Ractor.current } # a ractor with closed outgoing port should terminate assert_equal 'ok', %q{ Ractor.new do - close_outgoing end true until Ractor.count == 1 :ok } -# multiple Ractors can receive (wait) from one Ractor -assert_equal '[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]', %q{ - pipe = Ractor.new do - loop do - Ractor.yield Ractor.receive - end - end - - RN = 10 - rs = RN.times.map{|i| - Ractor.new pipe, i do |pipe, i| - msg = pipe.take - msg # ping-pong - end - } - RN.times{|i| - pipe << i - } - RN.times.map{ - r, n = Ractor.select(*rs) - rs.delete r - n - }.sort -} unless /mswin/ =~ RUBY_PLATFORM # randomly hangs on mswin https://.com/ruby/ruby/actions/runs/3753871445/jobs/6377551069#step:20:131 - -# Ractor.select also support multiple take, receive and yield -assert_equal '[true, true, true]', %q{ - RN = 10 - CR = Ractor.current - - rs = (1..RN).map{ - Ractor.new do - CR.send 'send' + CR.take #=> 'sendyield' - 'take' - end - } - received = [] - taken = [] - yielded = [] - until received.size == RN && taken.size == RN && yielded.size == RN - r, v = Ractor.select(CR, *rs, yield_value: 'yield') - case r - when :receive - received << v - when :yield - yielded << v - else - taken << v - rs.delete r - end end - r = [received == ['sendyield'] * RN, - yielded == [nil] * RN, - taken == ['take'] * RN, - ] - - STDERR.puts [received, yielded, taken].inspect - r -} - -# multiple Ractors can send to one Ractor -assert_equal '[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]', %q{ - pipe = Ractor.new do - loop do - Ractor.yield Ractor.receive - end end - - RN = 10 - RN.times.map{|i| - Ractor.new pipe, i do |pipe, i| - pipe << i - end - } - RN.times.map{ - pipe.take - }.sort } -# an exception in a Ractor main thread will be re-raised at Ractor#receive assert_equal '[RuntimeError, "ok", true]', %q{ r = Ractor.new do raise 'ok' # exception will be transferred receiver end begin - r.take rescue Ractor::RemoteError => e [e.cause.class, #=> RuntimeError e.cause.message, #=> 'ok' @@ -567,7 +355,7 @@ assert_equal 'ok', %q{ sleep 0.1 'ok' end - r.take } # threads in a ractor will killed @@ -610,7 +398,7 @@ assert_equal 'false', %q{ msg.object_id end - obj.object_id == r.take } # To copy the object, now Marshal#dump is used @@ -629,10 +417,11 @@ assert_equal "allocator undefined for Thread", %q{ # send shareable and unshareable objects assert_equal "ok", <<~'RUBY', frozen_string_literal: false - echo_ractor = Ractor.new do loop do v = Ractor.receive - Ractor.yield v end end @@ -680,13 +469,13 @@ assert_equal "ok", <<~'RUBY', frozen_string_literal: false shareable_objects.map{|o| echo_ractor << o - o2 = echo_ractor.take results << "#{o} is copied" unless o.object_id == o2.object_id } unshareable_objects.map{|o| echo_ractor << o - o2 = echo_ractor.take results << "#{o.inspect} is not copied" if o.object_id == o2.object_id } @@ -712,7 +501,7 @@ assert_equal [false, true, false].inspect, <<~'RUBY', frozen_string_literal: fal def check obj1 obj2 = Ractor.new obj1 do |obj| obj - end.take obj1.object_id == obj2.object_id end @@ -734,7 +523,7 @@ assert_equal 'hello world', <<~'RUBY', frozen_string_literal: false str = 'hello' r.send str, move: true - modified = r.take begin str << ' exception' # raise Ractor::MovedError @@ -754,7 +543,7 @@ assert_equal '[0, 1]', %q{ a1 = [0] r.send a1, move: true - a2 = r.take begin a1 << 2 # raise Ractor::MovedError rescue Ractor::MovedError @@ -764,55 +553,13 @@ assert_equal '[0, 1]', %q{ # unshareable frozen objects should still be frozen in new ractor after move assert_equal 'true', %q{ -r = Ractor.new do - obj = receive - { frozen: obj.frozen? } -end -obj = [Object.new].freeze -r.send(obj, move: true) -r.take[:frozen] -} - -# move with yield -assert_equal 'hello', %q{ - r = Ractor.new do - Thread.current.report_on_exception = false - obj = 'hello' - Ractor.yield obj, move: true - obj << 'world' - end - - str = r.take - begin - r.take - rescue Ractor::RemoteError - str #=> "hello" - end -} - -# yield/move should not make moved object when the yield is not succeeded -assert_equal '"str"', %q{ - R = Ractor.new{} - M = Ractor.current r = Ractor.new do - s = 'str' - selected_r, v = Ractor.select R, yield_value: s, move: true - raise if selected_r != R # taken from R - M.send s.inspect # s should not be a moved object end - - Ractor.receive -} - -# yield/move can fail -assert_equal "allocator undefined for Thread", %q{ - r = Ractor.new do - obj = Thread.new{} - Ractor.yield obj - rescue => e - e.message - end - r.take } # Access to global-variables are prohibited @@ -823,7 +570,7 @@ assert_equal 'can not access global variables $gv from non-main Ractors', %q{ end begin - r.take rescue Ractor::RemoteError => e e.cause.message end @@ -836,7 +583,7 @@ assert_equal 'can not access global variables $gv from non-main Ractors', %q{ end begin - r.take rescue Ractor::RemoteError => e e.cause.message end @@ -850,7 +597,7 @@ assert_equal 'ok', %q{ } end - [$stdin, $stdout, $stderr].zip(r.take){|io, (oid, fno)| raise "should not be different object" if io.object_id == oid raise "fd should be same" unless io.fileno == fno } @@ -866,7 +613,7 @@ assert_equal 'ok', %q{ 'ok' end - r.take } # $DEBUG, $VERBOSE are Ractor local @@ -924,7 +671,7 @@ assert_equal 'true', %q{ h = Ractor.new do ractor_local_globals - end.take ractor_local_globals == h #=> true } @@ -933,7 +680,8 @@ assert_equal 'false', %q{ r = Ractor.new do self.object_id end - r.take == self.object_id #=> false } # self is a Ractor instance @@ -941,7 +689,12 @@ assert_equal 'true', %q{ r = Ractor.new do self.object_id end - r.object_id == r.take #=> true } # given block Proc will be isolated, so can not access outer variables. @@ -969,7 +722,7 @@ assert_equal "can not get unshareable values from instance variables of classes/ end begin - r.take rescue Ractor::RemoteError => e e.cause.message end @@ -985,7 +738,7 @@ assert_equal 'can not access instance variables of shareable objects from non-ma end begin - r.take rescue Ractor::RemoteError => e e.cause.message end @@ -1011,7 +764,7 @@ assert_equal 'can not access instance variables of shareable objects from non-ma end begin - r.take rescue Ractor::RemoteError => e e.cause.message end @@ -1032,7 +785,7 @@ assert_equal 'can not access instance variables of shareable objects from non-ma end begin - r.take rescue Ractor::RemoteError => e e.cause.message end @@ -1046,7 +799,7 @@ assert_equal '11', %q{ Ractor.new obj do |obj| obj.instance_variable_get('@a') - end.take.to_s }.join } @@ -1072,25 +825,25 @@ assert_equal '333', %q{ def self.fstr = @fstr end - a = Ractor.new{ C.int }.take b = Ractor.new do C.str.to_i rescue Ractor::IsolationError 10 - end.take c = Ractor.new do C.fstr.to_i - end.take - d = Ractor.new{ M.int }.take e = Ractor.new do M.str.to_i rescue Ractor::IsolationError 20 - end.take f = Ractor.new do M.fstr.to_i - end.take # 1 + 10 + 100 + 2 + 20 + 200 @@ -1108,28 +861,28 @@ assert_equal '["instance-variable", "instance-variable", nil]', %q{ Ractor.new{ [C.iv1, C.iv2, C.iv3] - }.take } # moved objects have their shape properly set to original object's shape assert_equal '1234', %q{ -class Obj - attr_accessor :a, :b, :c, :d - def initialize - @a = 1 - @b = 2 - @c = 3 end -end -r = Ractor.new do - obj = receive - obj.d = 4 - [obj.a, obj.b, obj.c, obj.d] -end -obj = Obj.new -r.send(obj, move: true) -values = r.take -values.join } # cvar in shareable-objects are not allowed to access from non-main Ractor @@ -1145,7 +898,7 @@ assert_equal 'can not access class variables from non-main Ractors', %q{ end begin - r.take rescue Ractor::RemoteError => e e.cause.message end @@ -1167,7 +920,7 @@ assert_equal 'can not access class variables from non-main Ractors', %q{ end begin - r.take rescue Ractor::RemoteError => e e.cause.message end @@ -1182,7 +935,7 @@ assert_equal 'can not access non-shareable objects in constant C::CONST by non-m C::CONST end begin - r.take rescue Ractor::RemoteError => e e.cause.message end @@ -1194,7 +947,7 @@ assert_equal "can not access non-shareable objects in constant Object::STR by no def str; STR; end s = str() # fill const cache begin - Ractor.new{ str() }.take rescue Ractor::RemoteError => e e.cause.message end @@ -1208,7 +961,7 @@ assert_equal 'can not set constants with non-shareable objects by non-main Racto C::CONST = 'str' end begin - r.take rescue Ractor::RemoteError => e e.cause.message end @@ -1219,7 +972,7 @@ assert_equal "defined with an un-shareable Proc in a different Ractor", %q{ str = "foo" define_method(:buggy){|i| str << "#{i}"} begin - Ractor.new{buggy(10)}.take rescue => e e.cause.message end @@ -1230,7 +983,7 @@ assert_equal '[1000, 3]', %q{ A = Array.new(1000).freeze # [nil, ...] H = {a: 1, b: 2, c: 3}.freeze - Ractor.new{ [A.size, H.size] }.take } # Ractor.count @@ -1240,15 +993,15 @@ assert_equal '[1, 4, 3, 2, 1]', %q{ ractors = (1..3).map { Ractor.new { Ractor.receive } } counts << Ractor.count - ractors[0].send('End 0').take sleep 0.1 until ractors[0].inspect =~ /terminated/ counts << Ractor.count - ractors[1].send('End 1').take sleep 0.1 until ractors[1].inspect =~ /terminated/ counts << Ractor.count - ractors[2].send('End 2').take sleep 0.1 until ractors[2].inspect =~ /terminated/ counts << Ractor.count @@ -1261,7 +1014,7 @@ assert_equal '0', %q{ n = 0 ObjectSpace.each_object{|o| n += 1 unless Ractor.shareable?(o)} n - }.take } # ObjectSpace._id2ref can not handle unshareable objects with Ractors @@ -1274,7 +1027,7 @@ assert_equal 'ok', <<~'RUBY', frozen_string_literal: false rescue => e :ok end - end.take RUBY # Ractor.make_shareable(obj) @@ -1446,7 +1199,7 @@ assert_equal '1', %q{ a = 2 end - Ractor.new{ C.new.foo }.take } # Ractor.make_shareable(a_proc) makes a proc shareable. @@ -1489,7 +1242,7 @@ assert_equal '[6, 10]', %q{ Ractor.new{ # line 5 a = 1 b = 2 - }.take c = 3 # line 9 end rs @@ -1499,7 +1252,7 @@ assert_equal '[6, 10]', %q{ assert_equal '[true, false]', %q{ Ractor.new([[]].freeze) { |ary| [ary.frozen?, ary.first.frozen? ] - }.take } # Ractor deep copies frozen objects (str) @@ -1507,7 +1260,7 @@ assert_equal '[true, false]', %q{ s = String.new.instance_eval { @x = []; freeze} Ractor.new(s) { |s| [s.frozen?, s.instance_variable_get(:@x).frozen?] - }.take } # Can not trap with not isolated Proc on non-main ractor @@ -1515,14 +1268,14 @@ assert_equal '[:ok, :ok]', %q{ a = [] Ractor.new{ trap(:INT){p :ok} - }.take a << :ok begin Ractor.new{ s = 'str' trap(:INT){p s} - }.take rescue => Ractor::RemoteError a << :ok end @@ -1552,12 +1305,12 @@ assert_equal '[nil, "b", "a"]', %q{ ans = [] Ractor.current[:key] = 'a' r = Ractor.new{ - Ractor.yield self[:key] self[:key] = 'b' self[:key] } - ans << r.take - ans << r.take ans << Ractor.current[:key] } @@ -1573,7 +1326,7 @@ assert_equal '1', %q{ } }.each(&:join) a.uniq.size - }.take } # Ractor-local storage @@ -1591,7 +1344,7 @@ assert_equal '2', %q{ fails += 1 if e.message =~ /Cannot set ractor local/ end fails - }.take } ### @@ -1607,7 +1360,7 @@ assert_equal "#{N}#{N}", %Q{ Ractor.new{ N.times{|i| -(i.to_s)} } - }.map{|r| r.take}.join } assert_equal "ok", %Q{ @@ -1616,7 +1369,7 @@ assert_equal "ok", %Q{ Ractor.new{ N.times.map{|i| -(i.to_s)} } - }.map{|r| r.take} N.times do |i| unless a[i].equal?(b[i]) raise [a[i], b[i]].inspect @@ -1638,7 +1391,7 @@ assert_equal "#{n}#{n}", %Q{ obj.instance_variable_defined?("@a") end end - }.map{|r| r.take}.join } # NameError @@ -1670,16 +1423,17 @@ assert_equal "ok", %q{ # Can yield back values while GC is sweeping [Bug #18117] assert_equal "ok", %q{ workers = (0...8).map do - Ractor.new do loop do 10_000.times.map { Object.new } - Ractor.yield Time.now end end end - 1_000.times { idle_worker, tmp_reporter = Ractor.select(*workers) } "ok" } if !yjit_enabled? && ENV['_WORKFLOW'] != 'ModGC' # flaky @@ -1782,14 +1536,14 @@ assert_equal 'true', %q{ } n = CS.inject(1){|r, c| r * c.foo} * LN - rs.map{|r| r.take} == Array.new(RN){n} } # check experimental warning assert_match /\Atest_ractor\.rb:1:\s+warning:\s+Ractor is experimental/, %q{ Warning[:experimental] = $VERBOSE = true STDERR.reopen(STDOUT) - eval("Ractor.new{}.take", nil, "test_ractor.rb", 1) }, frozen_string_literal: false # check moved object @@ -1807,7 +1561,7 @@ assert_equal 'ok', %q{ end r.send obj, move: true - r.take } ## Ractor::Selector @@ -1883,10 +1637,11 @@ assert_equal '600', %q{ RN = 100 s = Ractor::Selector.new rs = RN.times.map{ Ractor.new{ - Ractor.main << Ractor.new{ Ractor.yield :v3; :v4 } - Ractor.main << Ractor.new{ Ractor.yield :v5; :v6 } Ractor.yield :v1 :v2 } @@ -1952,7 +1707,7 @@ assert_equal 'true', %q{ # prism parser with -O0 build consumes a lot of machine stack Data.define(:fileno).new(1) end - }.take.fileno > 0 } # require_relative in Ractor @@ -1970,7 +1725,7 @@ assert_equal 'true', %q{ begin Ractor.new dummyfile do |f| require_relative File.basename(f) - end.take ensure File.unlink dummyfile end @@ -1987,7 +1742,7 @@ assert_equal 'LoadError', %q{ rescue LoadError => e e.class end - end.take } # autolaod in Ractor @@ -2002,7 +1757,7 @@ assert_equal 'true', %q{ Data.define(:fileno).new(1) end end - r.take.fileno > 0 } # failed in autolaod in Ractor @@ -2017,7 +1772,7 @@ assert_equal 'LoadError', %q{ e.class end end - r.take } # bind_call in Ractor [Bug #20934] @@ -2028,7 +1783,7 @@ assert_equal 'ok', %q{ Object.instance_method(:itself).bind_call(self) end end - end.each(&:take) GC.start :ok.itself } @@ -2038,7 +1793,7 @@ assert_equal 'ok', %q{ ractor = Ractor.new { Ractor.receive } obj = "foobarbazfoobarbazfoobarbazfoobarbaz" ractor.send(obj.dup, move: true) - roundtripped_obj = ractor.take roundtripped_obj == obj ? :ok : roundtripped_obj } @@ -2047,7 +1802,7 @@ assert_equal 'ok', %q{ ractor = Ractor.new { Ractor.receive } obj = Array.new(10, 42) ractor.send(obj.dup, move: true) - roundtripped_obj = ractor.take roundtripped_obj == obj ? :ok : roundtripped_obj } @@ -2056,7 +1811,7 @@ assert_equal 'ok', %q{ ractor = Ractor.new { Ractor.receive } obj = { foo: 1, bar: 2 } ractor.send(obj.dup, move: true) - roundtripped_obj = ractor.take roundtripped_obj == obj ? :ok : roundtripped_obj } @@ -2065,7 +1820,7 @@ assert_equal 'ok', %q{ ractor = Ractor.new { Ractor.receive } obj = "foo".match(/o/) ractor.send(obj.dup, move: true) - roundtripped_obj = ractor.take roundtripped_obj == obj ? :ok : roundtripped_obj } @@ -2074,7 +1829,7 @@ assert_equal 'ok', %q{ ractor = Ractor.new { Ractor.receive } obj = Struct.new(:a, :b, :c, :d, :e, :f).new(1, 2, 3, 4, 5, 6) ractor.send(obj.dup, move: true) - roundtripped_obj = ractor.take roundtripped_obj == obj ? :ok : roundtripped_obj } @@ -2101,7 +1856,7 @@ assert_equal 'ok', %q{ obj = SomeObject.new ractor.send(obj.dup, move: true) - roundtripped_obj = ractor.take roundtripped_obj == obj ? :ok : roundtripped_obj } @@ -2153,7 +1908,7 @@ assert_equal 'ok', %q{ obj = Array.new(10, 42) original = obj.dup ractor.send([obj].freeze, move: true) - roundtripped_obj = ractor.take[0] roundtripped_obj == original ? :ok : roundtripped_obj } @@ -2164,7 +1919,7 @@ assert_equal 'ok', %q{ obj.instance_variable_set(:@array, [1]) ractor.send(obj, move: true) - roundtripped_obj = ractor.take roundtripped_obj.instance_variable_get(:@array) == [1] ? :ok : roundtripped_obj } @@ -2188,7 +1943,9 @@ assert_equal 'ok', %q{ struct_class = Struct.new(:a) struct = struct_class.new(String.new('a')) o = MyObject.new(String.new('a')) - r = Ractor.new do loop do obj = Ractor.receive val = case obj @@ -2201,7 +1958,7 @@ assert_equal 'ok', %q{ when Object obj.a == 'a' end - Ractor.yield val end end @@ -2218,7 +1975,7 @@ assert_equal 'ok', %q{ parts_moved[klass] = [obj.a] end r.send(obj, move: true) - val = r.take if val != true raise "bad val in ractor for obj at i:#{i}" end @@ -2258,13 +2015,11 @@ begin r = Ractor.new { Ractor.receive } _, status = Process.waitpid2 fork { begin - r.take - raise "ng" - rescue Ractor::ClosedError end } r.send(123) - raise unless r.take == 123 status.success? ? "ok" : status rescue NotImplementedError :ok @@ -2278,12 +2033,11 @@ begin _, status = Process.waitpid2 fork { begin r.send(123) - raise "ng" rescue Ractor::ClosedError end } r.send(123) - raise unless r.take == 123 status.success? ? "ok" : status rescue NotImplementedError :ok @@ -2293,16 +2047,17 @@ end # Creating classes inside of Ractors # [Bug #18119] assert_equal 'ok', %q{ workers = (0...8).map do - Ractor.new do loop do 100.times.map { Class.new } - Ractor.yield nil end end end - 100.times { Ractor.select(*workers) } 'ok' } @@ -2315,7 +2070,7 @@ assert_equal 'ok', %q{ # It should not use this cached proc, it should create a new one. If it used # the cached proc, we would get a ractor_confirm_belonging error here. :inspect.to_proc - end.take 'ok' } @@ -2326,115 +2081,133 @@ assert_equal 'ok', %q{ a.object_id a.dup # this deletes generic ivar on dupped object 'ok' - end.take } -# There are some bugs in Windows with multiple threads in same ractor calling ractor actions -# Ex: https://.com/ruby/ruby/actions/runs/14998660285/job/42139383905 -unless /mswin/ =~ RUBY_PLATFORM - # r.send and r.take from multiple threads - # [Bug #21037] - assert_equal '[true, true]', %q{ - class Map - def initialize - @r = Ractor.new { - loop do - key = Ractor.receive - Ractor.yield key - end - } - end - def fetch(key) - @r.send key - @r.take - end end - tm = Map.new - t1 = Thread.new { 10.times.map { tm.fetch("t1") } } - t2 = Thread.new { 10.times.map { tm.fetch("t2") } } - vals = t1.value + t2.value - [ - vals.first(10).all? { |v| v == "t1" }, - vals.last(10).all? { |v| v == "t2" } - ] - } - # r.send and Ractor.select from multiple threads - assert_equal '[true, true]', %q{ - class Map - def initialize - @r = Ractor.new { - loop do - key = Ractor.receive - Ractor.yield key - end - } - end - def fetch(key) - @r.send key - _r, val = Ractor.select(@r) - val - end end - tm = Map.new - t1 = Thread.new { 10.times.map { tm.fetch("t1") } } - t2 = Thread.new { 10.times.map { tm.fetch("t2") } } - vals = t1.value + t2.value - [ - vals.first(10).all? { |v| v == "t1" }, - vals.last(10).all? { |v| v == "t2" } - ] - } - # Ractor.receive in multiple threads in same ractor - # [Bug #17624] - assert_equal '["T1 received", "T2 received"]', %q{ - r1 = Ractor.new do - output = [] - m = Mutex.new - # Start two listener threads - t1 = Thread.new do - Ractor.receive - m.synchronize do - output << "T1 received" - end - end - t2 = Thread.new do - Ractor.receive - m.synchronize do - output << "T2 received" - end - end - sleep 0.1 until [t1,t2].all? { |t| t.status == "sleep" } - Ractor.main.send(:both_blocking) - [t1, t2].each(&:join) - output end - Ractor.receive # wait until both threads have blocked - r1.send(1) - r1.send(2) - r1.take.sort - } -end -# Moving an old object -assert_equal 'ok', %q{ r = Ractor.new do - o = Ractor.receive - GC.verify_internal_consistency - GC.start - o end - o = "ok" - # Make o an old object - 3.times { GC.start } - r.send(o, move: true) - r.take } @@ -3018,15 +3018,16 @@ assert_equal '[:itself]', %q{ itself end - tracing_ractor = Ractor.new do # 1: start tracing events = [] tp = TracePoint.new(:c_call) { events << _1.method_id } tp.enable - Ractor.yield(nil) # 3: run compiled method on tracing ractor - Ractor.yield(nil) traced_method events @@ -3034,13 +3035,13 @@ assert_equal '[:itself]', %q{ tp&.disable end - tracing_ractor.take # 2: compile on non tracing ractor traced_method - tracing_ractor.take - tracing_ractor.take } # Try to hit a lazy branch stub while another ractor enables tracing @@ -3054,17 +3055,18 @@ assert_equal '42', %q{ end end - ractor = Ractor.new do compiled(false) - Ractor.yield(nil) compiled(41) end tp = TracePoint.new(:line) { itself } - ractor.take tp.enable - ractor.take } # Test equality with changing types @@ -3140,7 +3142,7 @@ assert_equal '42', %q{ A.foo A.foo - Ractor.new { A.foo }.take } assert_equal '["plain", "special", "sub", "plain"]', %q{ @@ -3859,36 +3861,6 @@ assert_equal '3,12', %q{ pt_inspect(p) } -# Regression test for deadlock between branch_stub_hit and ractor_receive_if -assert_equal '10', %q{ - r = Ractor.new Ractor.current do |main| - main << 1 - main << 2 - main << 3 - main << 4 - main << 5 - main << 6 - main << 7 - main << 8 - main << 9 - main << 10 - end - - a = [] - a << Ractor.receive_if{|msg| msg == 10} - a << Ractor.receive_if{|msg| msg == 9} - a << Ractor.receive_if{|msg| msg == 8} - a << Ractor.receive_if{|msg| msg == 7} - a << Ractor.receive_if{|msg| msg == 6} - a << Ractor.receive_if{|msg| msg == 5} - a << Ractor.receive_if{|msg| msg == 4} - a << Ractor.receive_if{|msg| msg == 3} - a << Ractor.receive_if{|msg| msg == 2} - a << Ractor.receive_if{|msg| msg == 1} - - a.length -} - # checktype assert_equal 'false', %q{ def function() @@ -374,7 +374,7 @@ assert_equal 'ok', %q{ r = Ractor.new do 'ok' end - r.take } # Passed arguments to Ractor.new will be a block parameter @@ -384,7 +384,7 @@ assert_equal 'ok', %q{ r = Ractor.new 'ok' do |msg| msg end - r.take } # Pass multiple arguments to Ractor.new @@ -393,7 +393,7 @@ assert_equal 'ok', %q{ r = Ractor.new 'ping', 'pong' do |msg, msg2| [msg, msg2] end - 'ok' if r.take == ['ping', 'pong'] } # Ractor#send passes an object with copy to a Ractor @@ -403,7 +403,7 @@ assert_equal 'ok', %q{ msg = Ractor.receive end r.send 'ok' - r.take } assert_equal '[1, 2, 3]', %q{ @@ -14293,6 +14293,7 @@ ractor.$(OBJEXT): {$(VPATH)}ractor.c ractor.$(OBJEXT): {$(VPATH)}ractor.h ractor.$(OBJEXT): {$(VPATH)}ractor.rbinc ractor.$(OBJEXT): {$(VPATH)}ractor_core.h ractor.$(OBJEXT): {$(VPATH)}ruby_assert.h ractor.$(OBJEXT): {$(VPATH)}ruby_atomic.h ractor.$(OBJEXT): {$(VPATH)}rubyparser.h @@ -169,7 +169,7 @@ rb_gc_vm_lock_no_barrier(void) void rb_gc_vm_unlock_no_barrier(unsigned int lev) { - RB_VM_LOCK_LEAVE_LEV(&lev); } void @@ -2181,7 +2181,7 @@ newobj_init(VALUE klass, VALUE flags, int wb_protected, rb_objspace_t *objspace, gc_report(5, objspace, "newobj: %s\n", rb_obj_info(obj)); - RUBY_DEBUG_LOG("obj:%p (%s)", (void *)obj, rb_obj_info(obj)); return obj; } @@ -178,37 +178,21 @@ ractor_status_p(rb_ractor_t *r, enum ractor_status status) // Ractor data/mark/free -static struct rb_ractor_basket *ractor_queue_at(rb_ractor_t *r, struct rb_ractor_queue *rq, int i); static void ractor_local_storage_mark(rb_ractor_t *r); static void ractor_local_storage_free(rb_ractor_t *r); -static void -ractor_queue_mark(struct rb_ractor_queue *rq) -{ - for (int i=0; i<rq->cnt; i++) { - struct rb_ractor_basket *b = ractor_queue_at(NULL, rq, i); - rb_gc_mark(b->sender); - - switch (b->type.e) { - case basket_type_yielding: - case basket_type_take_basket: - case basket_type_deleted: - case basket_type_reserved: - // ignore - break; - default: - rb_gc_mark(b->p.send.v); - } - } -} static void ractor_mark(void *ptr) { rb_ractor_t *r = (rb_ractor_t *)ptr; - ractor_queue_mark(&r->sync.recv_queue); - ractor_queue_mark(&r->sync.takers_queue); rb_gc_mark(r->loc); rb_gc_mark(r->name); @@ -229,19 +213,14 @@ ractor_mark(void *ptr) } static void -ractor_queue_free(struct rb_ractor_queue *rq) -{ - free(rq->baskets); -} - -static void ractor_free(void *ptr) { rb_ractor_t *r = (rb_ractor_t *)ptr; RUBY_DEBUG_LOG("free r:%d", rb_ractor_id(r)); rb_native_mutex_destroy(&r->sync.lock); - ractor_queue_free(&r->sync.recv_queue); - ractor_queue_free(&r->sync.takers_queue); ractor_local_storage_free(r); rb_hook_list_free(&r->pub.hooks); @@ -252,24 +231,17 @@ ractor_free(void *ptr) r->newobj_cache = NULL; } ruby_xfree(r); } static size_t -ractor_queue_memsize(const struct rb_ractor_queue *rq) -{ - return sizeof(struct rb_ractor_basket) * rq->size; -} - -static size_t ractor_memsize(const void *ptr) { rb_ractor_t *r = (rb_ractor_t *)ptr; // TODO: more correct? - return sizeof(rb_ractor_t) + - ractor_queue_memsize(&r->sync.recv_queue) + - ractor_queue_memsize(&r->sync.takers_queue); } static const rb_data_type_t ractor_data_type = { @@ -317,1714 +289,7 @@ rb_ractor_current_id(void) } #endif -// Ractor queue - -static void -ractor_queue_setup(struct rb_ractor_queue *rq) -{ - rq->size = 2; - rq->cnt = 0; - rq->start = 0; - rq->baskets = malloc(sizeof(struct rb_ractor_basket) * rq->size); -} - -static struct rb_ractor_basket * -ractor_queue_head(rb_ractor_t *r, struct rb_ractor_queue *rq) -{ - if (r != NULL) ASSERT_ractor_locking(r); - return &rq->baskets[rq->start]; -} - -static struct rb_ractor_basket * -ractor_queue_at(rb_ractor_t *r, struct rb_ractor_queue *rq, int i) -{ - if (r != NULL) ASSERT_ractor_locking(r); - return &rq->baskets[(rq->start + i) % rq->size]; -} - -static void -ractor_queue_advance(rb_ractor_t *r, struct rb_ractor_queue *rq) -{ - ASSERT_ractor_locking(r); - - if (rq->reserved_cnt == 0) { - rq->cnt--; - rq->start = (rq->start + 1) % rq->size; - rq->serial++; - } - else { - ractor_queue_at(r, rq, 0)->type.e = basket_type_deleted; - } -} - -static bool -ractor_queue_skip_p(rb_ractor_t *r, struct rb_ractor_queue *rq, int i) -{ - struct rb_ractor_basket *b = ractor_queue_at(r, rq, i); - return basket_type_p(b, basket_type_deleted) || - basket_type_p(b, basket_type_reserved); -} - -static void -ractor_queue_compact(rb_ractor_t *r, struct rb_ractor_queue *rq) -{ - ASSERT_ractor_locking(r); - - while (rq->cnt > 0 && basket_type_p(ractor_queue_at(r, rq, 0), basket_type_deleted)) { - ractor_queue_advance(r, rq); - } -} - -static bool -ractor_queue_empty_p(rb_ractor_t *r, struct rb_ractor_queue *rq) -{ - ASSERT_ractor_locking(r); - - if (rq->cnt == 0) { - return true; - } - - ractor_queue_compact(r, rq); - - for (int i=0; i<rq->cnt; i++) { - if (!ractor_queue_skip_p(r, rq, i)) { - return false; - } - } - - return true; -} - -static bool -ractor_queue_deq(rb_ractor_t *r, struct rb_ractor_queue *rq, struct rb_ractor_basket *basket) -{ - ASSERT_ractor_locking(r); - - for (int i=0; i<rq->cnt; i++) { - if (!ractor_queue_skip_p(r, rq, i)) { - struct rb_ractor_basket *b = ractor_queue_at(r, rq, i); - *basket = *b; - - // remove from queue - b->type.e = basket_type_deleted; - ractor_queue_compact(r, rq); - return true; - } - } - - return false; -} - -static void -ractor_queue_enq(rb_ractor_t *r, struct rb_ractor_queue *rq, struct rb_ractor_basket *basket) -{ - ASSERT_ractor_locking(r); - - if (rq->size <= rq->cnt) { - rq->baskets = realloc(rq->baskets, sizeof(struct rb_ractor_basket) * rq->size * 2); - for (int i=rq->size - rq->start; i<rq->cnt; i++) { - rq->baskets[i + rq->start] = rq->baskets[i + rq->start - rq->size]; - } - rq->size *= 2; - } - // copy basket into queue - rq->baskets[(rq->start + rq->cnt++) % rq->size] = *basket; - // fprintf(stderr, "%s %p->cnt:%d\n", RUBY_FUNCTION_NAME_STRING, (void *)rq, rq->cnt); -} - -static void -ractor_queue_delete(rb_ractor_t *r, struct rb_ractor_queue *rq, struct rb_ractor_basket *basket) -{ - basket->type.e = basket_type_deleted; -} - -// Ractor basket - -static VALUE ractor_reset_belonging(VALUE obj); // in this file - -static VALUE -ractor_basket_value(struct rb_ractor_basket *b) -{ - switch (b->type.e) { - case basket_type_ref: - break; - case basket_type_copy: - case basket_type_move: - case basket_type_will: - b->type.e = basket_type_ref; - b->p.send.v = ractor_reset_belonging(b->p.send.v); - break; - default: - rb_bug("unreachable"); - } - - return b->p.send.v; -} - -static VALUE -ractor_basket_accept(struct rb_ractor_basket *b) -{ - VALUE v = ractor_basket_value(b); - - // a ractor's main thread had an error and yielded us this exception during its dying moments - if (b->p.send.exception) { - VALUE cause = v; - VALUE err = rb_exc_new_cstr(rb_eRactorRemoteError, "thrown by remote Ractor."); - rb_ivar_set(err, rb_intern("@ractor"), b->sender); - rb_ec_setup_exception(NULL, err, cause); - rb_exc_raise(err); - } - - return v; -} - -// Ractor synchronizations - -#if USE_RUBY_DEBUG_LOG -static const char * -wait_status_str(enum rb_ractor_wait_status wait_status) -{ - switch ((int)wait_status) { - case wait_none: return "none"; - case wait_receiving: return "receiving"; - case wait_taking: return "taking"; - case wait_yielding: return "yielding"; - case wait_receiving|wait_taking: return "receiving|taking"; - case wait_receiving|wait_yielding: return "receiving|yielding"; - case wait_taking|wait_yielding: return "taking|yielding"; - case wait_receiving|wait_taking|wait_yielding: return "receiving|taking|yielding"; - } - rb_bug("unreachable"); -} - -static const char * -wakeup_status_str(enum rb_ractor_wakeup_status wakeup_status) -{ - switch (wakeup_status) { - case wakeup_none: return "none"; - case wakeup_by_send: return "by_send"; - case wakeup_by_yield: return "by_yield"; - case wakeup_by_take: return "by_take"; - case wakeup_by_close: return "by_close"; - case wakeup_by_interrupt: return "by_interrupt"; - case wakeup_by_retry: return "by_retry"; - } - rb_bug("unreachable"); -} - -static const char * -basket_type_name(enum rb_ractor_basket_type type) -{ - switch (type) { - case basket_type_none: return "none"; - case basket_type_ref: return "ref"; - case basket_type_copy: return "copy"; - case basket_type_move: return "move"; - case basket_type_will: return "will"; - case basket_type_deleted: return "deleted"; - case basket_type_reserved: return "reserved"; - case basket_type_take_basket: return "take_basket"; - case basket_type_yielding: return "yielding"; - } - VM_ASSERT(0); - return NULL; -} -#endif // USE_RUBY_DEBUG_LOG - -static rb_thread_t * -ractor_sleeping_by(const rb_ractor_t *r, rb_thread_t *th, enum rb_ractor_wait_status wait_status) -{ - if (th) { - if ((th->ractor_waiting.wait_status & wait_status) && th->ractor_waiting.wakeup_status == wakeup_none) { - return th; - } - } - else { - // find any thread that has this ractor wait status that is blocked - ccan_list_for_each(&r->sync.wait.waiting_threads, th, ractor_waiting.waiting_node) { - if ((th->ractor_waiting.wait_status & wait_status) && th->ractor_waiting.wakeup_status == wakeup_none) { - return th; - } - } - } - return NULL; -} - -#ifdef RUBY_THREAD_PTHREAD_H -// thread_*.c -void rb_ractor_sched_wakeup(rb_ractor_t *r, rb_thread_t *th); -#else - -// win32 -static void -rb_ractor_sched_wakeup(rb_ractor_t *r, rb_thread_t *th) -{ - (void)r; - ASSERT_ractor_locking(r); - rb_native_cond_signal(&th->ractor_waiting.cond); - -} -#endif - - -/* - * Wakeup `r` if the given `th` is blocked and has the given ractor `wait_status`. - * Wakeup any blocked thread in `r` with the given ractor `wait_status` if `th` is NULL. - */ -static bool -ractor_wakeup(rb_ractor_t *r, rb_thread_t *th /* can be NULL */, enum rb_ractor_wait_status wait_status, enum rb_ractor_wakeup_status wakeup_status) -{ - ASSERT_ractor_locking(r); - - RUBY_DEBUG_LOG("r:%u wait:%s wakeup:%s", - rb_ractor_id(r), - wait_status_str(wait_status), - wakeup_status_str(wakeup_status)); - - if ((th = ractor_sleeping_by(r, th, wait_status)) != NULL) { - th->ractor_waiting.wakeup_status = wakeup_status; - rb_ractor_sched_wakeup(r, th); - return true; - } - else { - return false; - } -} - -// unblock function (UBF). This gets called when another thread on this or another ractor sets our thread's interrupt flag. -// This is not async-safe. -static void -ractor_sleep_interrupt(void *ptr) -{ - rb_execution_context_t *ec = ptr; - rb_ractor_t *r = rb_ec_ractor_ptr(ec); - rb_thread_t *th = rb_ec_thread_ptr(ec); - - RACTOR_LOCK(r); - { - ractor_wakeup(r, th, wait_receiving | wait_taking | wait_yielding, wakeup_by_interrupt); - } - RACTOR_UNLOCK(r); -} - -typedef void (*ractor_sleep_cleanup_function)(rb_ractor_t *cr, void *p); - -// Checks the current thread for ruby interrupts and runs the cleanup function `cf_func` with `cf_data` if -// `rb_ec_check_ints` is going to raise. See the `rb_threadptr_execute_interrupts` for info on when it can raise. -static void -ractor_check_ints(rb_execution_context_t *ec, rb_ractor_t *cr, rb_thread_t *cur_th, ractor_sleep_cleanup_function cf_func, void *cf_data) -{ - if (cur_th->ractor_waiting.wait_status != wait_none) { - enum rb_ractor_wait_status prev_wait_status = cur_th->ractor_waiting.wait_status; - cur_th->ractor_waiting.wait_status = wait_none; - cur_th->ractor_waiting.wakeup_status = wakeup_by_interrupt; - - RACTOR_UNLOCK(cr); - { - if (cf_func) { - enum ruby_tag_type state; - EC_PUSH_TAG(ec); - if ((state = EC_EXEC_TAG()) == TAG_NONE) { - rb_ec_check_ints(ec); - } - EC_POP_TAG(); - - if (state) { - (*cf_func)(cr, cf_data); // cleanup function is run after the ubf, if it had ubf - EC_JUMP_TAG(ec, state); - } - } - else { - rb_ec_check_ints(ec); - } - } - - RACTOR_LOCK(cr); - cur_th->ractor_waiting.wait_status = prev_wait_status; - } -} - -#ifdef RUBY_THREAD_PTHREAD_H -void rb_ractor_sched_sleep(rb_execution_context_t *ec, rb_ractor_t *cr, rb_unblock_function_t *ubf); -#else - -static void -ractor_cond_wait(rb_ractor_t *r, rb_thread_t *th) -{ -#if RACTOR_CHECK_MODE > 0 - VALUE locked_by = r->sync.locked_by; - r->sync.locked_by = Qnil; -#endif - rb_native_cond_wait(&th->ractor_waiting.cond, &r->sync.lock); - -#if RACTOR_CHECK_MODE > 0 - r->sync.locked_by = locked_by; -#endif -} - -static void * -ractor_sleep_wo_gvl(void *ptr) -{ - rb_ractor_t *cr = ptr; - rb_execution_context_t *ec = cr->threads.running_ec; - VM_ASSERT(GET_EC() == ec); - rb_thread_t *cur_th = rb_ec_thread_ptr(ec); - RACTOR_LOCK_SELF(cr); - { - VM_ASSERT(cur_th->ractor_waiting.wait_status != wait_none); - // it's possible that another ractor has woken us up (ractor_wakeup), - // so check this condition - if (cur_th->ractor_waiting.wakeup_status == wakeup_none) { - cur_th->status = THREAD_STOPPED_FOREVER; - ractor_cond_wait(cr, cur_th); - cur_th->status = THREAD_RUNNABLE; - VM_ASSERT(cur_th->ractor_waiting.wakeup_status != wakeup_none); - } - else { - RUBY_DEBUG_LOG("rare timing, no cond wait"); - } - cur_th->ractor_waiting.wait_status = wait_none; - } - RACTOR_UNLOCK_SELF(cr); - return NULL; -} - -static void -rb_ractor_sched_sleep(rb_execution_context_t *ec, rb_ractor_t *cr, rb_unblock_function_t *ubf_ractor_sleep_interrupt) -{ - ASSERT_ractor_locking(cr); - rb_thread_t *th = rb_ec_thread_ptr(ec); - struct ccan_list_node *waitn = &th->ractor_waiting.waiting_node; - VM_ASSERT(waitn->next == waitn->prev && waitn->next == waitn); // it should be unlinked - ccan_list_add(&cr->sync.wait.waiting_threads, waitn); - RACTOR_UNLOCK(cr); - { - rb_nogvl(ractor_sleep_wo_gvl, cr, ubf_ractor_sleep_interrupt, ec, RB_NOGVL_INTR_FAIL); - } - RACTOR_LOCK(cr); - ccan_list_del_init(waitn); -} -#endif - -/* - * Sleep the current ractor's current thread until another ractor wakes us up or another thread calls our unblock function. - * The following ractor actions can cause this function to be called: - * Ractor#take (wait_taking) - * Ractor.yield (wait_yielding) - * Ractor.receive (wait_receiving) - * Ractor.select (can be a combination of the above wait states, depending on the states of the ractors passed to Ractor.select) - */ -static enum rb_ractor_wakeup_status -ractor_sleep_with_cleanup(rb_execution_context_t *ec, rb_ractor_t *cr, rb_thread_t *cur_th, enum rb_ractor_wait_status wait_status, - ractor_sleep_cleanup_function cf_func, void *cf_data) -{ - ASSERT_ractor_locking(cr); - enum rb_ractor_wakeup_status wakeup_status; - VM_ASSERT(GET_RACTOR() == cr); - - VM_ASSERT(cur_th->ractor_waiting.wait_status == wait_none); - VM_ASSERT(wait_status != wait_none); - cur_th->ractor_waiting.wait_status = wait_status; - cur_th->ractor_waiting.wakeup_status = wakeup_none; - - // fprintf(stderr, "%s r:%p status:%s, wakeup_status:%s\n", RUBY_FUNCTION_NAME_STRING, (void *)cr, - // wait_status_str(cr->sync.wait.status), wakeup_status_str(cr->sync.wait.wakeup_status)); - - RUBY_DEBUG_LOG("sleep by %s", wait_status_str(wait_status)); - - while (cur_th->ractor_waiting.wakeup_status == wakeup_none) { - rb_ractor_sched_sleep(ec, cr, ractor_sleep_interrupt); - ractor_check_ints(ec, cr, cur_th, cf_func, cf_data); - } - - cur_th->ractor_waiting.wait_status = wait_none; - - wakeup_status = cur_th->ractor_waiting.wakeup_status; - cur_th->ractor_waiting.wakeup_status = wakeup_none; - - RUBY_DEBUG_LOG("wakeup %s", wakeup_status_str(wakeup_status)); - - ASSERT_ractor_locking(cr); - return wakeup_status; -} - -static enum rb_ractor_wakeup_status -ractor_sleep(rb_execution_context_t *ec, rb_ractor_t *cr, rb_thread_t *cur_th, enum rb_ractor_wait_status wait_status) -{ - return ractor_sleep_with_cleanup(ec, cr, cur_th, wait_status, 0, NULL); -} - -// Ractor.receive - -static void -ractor_recursive_receive_if(rb_thread_t *th) -{ - if (th->ractor_waiting.receiving_mutex && rb_mutex_owned_p(th->ractor_waiting.receiving_mutex)) { - rb_raise(rb_eRactorError, "can not call receive/receive_if recursively"); - } -} - -static VALUE -ractor_try_receive(rb_execution_context_t *ec, rb_ractor_t *cr, struct rb_ractor_queue *rq) -{ - struct rb_ractor_basket basket; - ractor_recursive_receive_if(rb_ec_thread_ptr(ec)); - bool received = false; - - RACTOR_LOCK_SELF(cr); - { - RUBY_DEBUG_LOG("rq->cnt:%d", rq->cnt); - received = ractor_queue_deq(cr, rq, &basket); - } - RACTOR_UNLOCK_SELF(cr); - - if (!received) { - if (cr->sync.incoming_port_closed) { - rb_raise(rb_eRactorClosedError, "The incoming port is already closed"); - } - return Qundef; - } - else { - return ractor_basket_accept(&basket); - } -} - -static void -ractor_wait_receive(rb_execution_context_t *ec, rb_ractor_t *cr, struct rb_ractor_queue *rq) -{ - VM_ASSERT(cr == rb_ec_ractor_ptr(ec)); - rb_thread_t *cur_th = rb_ec_thread_ptr(ec); - ractor_recursive_receive_if(cur_th); - - RACTOR_LOCK(cr); - { - while (ractor_queue_empty_p(cr, rq) && !cr->sync.incoming_port_closed) { - ractor_sleep(ec, cr, cur_th, wait_receiving); - } - } - RACTOR_UNLOCK(cr); -} - -static VALUE -ractor_receive(rb_execution_context_t *ec, rb_ractor_t *cr) -{ - VM_ASSERT(cr == rb_ec_ractor_ptr(ec)); - VALUE v; - struct rb_ractor_queue *rq = &cr->sync.recv_queue; - - while (UNDEF_P(v = ractor_try_receive(ec, cr, rq))) { - ractor_wait_receive(ec, cr, rq); - } - - return v; -} - -#if 0 -static void -rq_dump(struct rb_ractor_queue *rq) -{ - bool bug = false; - for (int i=0; i<rq->cnt; i++) { - struct rb_ractor_basket *b = ractor_queue_at(NULL, rq, i); - fprintf(stderr, "%d (start:%d) type:%s %p %s\n", i, rq->start, basket_type_name(b->type), - (void *)b, RSTRING_PTR(RARRAY_AREF(b->v, 1))); - if (basket_type_p(b, basket_type_reserved) bug = true; - } - if (bug) rb_bug("!!"); -} -#endif - -struct receive_block_data { - rb_ractor_t *cr; - rb_thread_t *th; - struct rb_ractor_queue *rq; - VALUE v; - int index; - bool success; -}; - -static void -ractor_receive_if_lock(rb_thread_t *th) -{ - VALUE m = th->ractor_waiting.receiving_mutex; - if (m == Qfalse) { - m = th->ractor_waiting.receiving_mutex = rb_mutex_new(); - } - rb_mutex_lock(m); -} - -static VALUE -receive_if_body(VALUE ptr) -{ - struct receive_block_data *data = (struct receive_block_data *)ptr; - - ractor_receive_if_lock(data->th); - VALUE block_result = rb_yield(data->v); - rb_ractor_t *cr = data->cr; - - RACTOR_LOCK_SELF(cr); - { - struct rb_ractor_basket *b = ractor_queue_at(cr, data->rq, data->index); - VM_ASSERT(basket_type_p(b, basket_type_reserved)); - data->rq->reserved_cnt--; - - if (RTEST(block_result)) { - ractor_queue_delete(cr, data->rq, b); - ractor_queue_compact(cr, data->rq); - } - else { - b->type.e = basket_type_ref; - } - } - RACTOR_UNLOCK_SELF(cr); - - data->success = true; - - if (RTEST(block_result)) { - return data->v; - } - else { - return Qundef; - } -} - -static VALUE -receive_if_ensure(VALUE v) -{ - struct receive_block_data *data = (struct receive_block_data *)v; - rb_ractor_t *cr = data->cr; - rb_thread_t *cur_th = data->th; - - if (!data->success) { - RACTOR_LOCK_SELF(cr); - { - struct rb_ractor_basket *b = ractor_queue_at(cr, data->rq, data->index); - VM_ASSERT(basket_type_p(b, basket_type_reserved)); - b->type.e = basket_type_deleted; - data->rq->reserved_cnt--; - } - RACTOR_UNLOCK_SELF(cr); - } - - rb_mutex_unlock(cur_th->ractor_waiting.receiving_mutex); - return Qnil; -} - -static VALUE -ractor_receive_if(rb_execution_context_t *ec, VALUE crv, VALUE b) -{ - if (!RTEST(b)) rb_raise(rb_eArgError, "no block given"); - - rb_ractor_t *cr = rb_ec_ractor_ptr(ec); - rb_thread_t *cur_th = rb_ec_thread_ptr(ec); - unsigned int serial = (unsigned int)-1; - int index = 0; - struct rb_ractor_queue *rq = &cr->sync.recv_queue; - - while (1) { - VALUE v = Qundef; - - ractor_wait_receive(ec, cr, rq); - - RACTOR_LOCK_SELF(cr); - { - if (serial != rq->serial) { - serial = rq->serial; - index = 0; - } - - // check newer version - for (int i=index; i<rq->cnt; i++) { - if (!ractor_queue_skip_p(cr, rq, i)) { - struct rb_ractor_basket *b = ractor_queue_at(cr, rq, i); - v = ractor_basket_value(b); - b->type.e = basket_type_reserved; - rq->reserved_cnt++; - index = i; - break; - } - } - } - RACTOR_UNLOCK_SELF(cr); - - if (!UNDEF_P(v)) { - struct receive_block_data data = { - .cr = cr, - .th = cur_th, - .rq = rq, - .v = v, - .index = index, - .success = false, - }; - - VALUE result = rb_ensure(receive_if_body, (VALUE)&data, - receive_if_ensure, (VALUE)&data); - - if (!UNDEF_P(result)) return result; - index++; - } - - RUBY_VM_CHECK_INTS(ec); - } -} - -static void -ractor_send_basket(rb_execution_context_t *ec, rb_ractor_t *r, struct rb_ractor_basket *b) -{ - bool closed = false; - - RACTOR_LOCK(r); - { - if (r->sync.incoming_port_closed) { - closed = true; - } - else { - ractor_queue_enq(r, &r->sync.recv_queue, b); - // wakeup any receiving thread in `r` - ractor_wakeup(r, NULL, wait_receiving, wakeup_by_send); - } - } - RACTOR_UNLOCK(r); - - if (closed) { - rb_raise(rb_eRactorClosedError, "The incoming-port is already closed"); - } -} - -// Ractor#send - -static VALUE ractor_move(VALUE obj); // in this file -static VALUE ractor_copy(VALUE obj); // in this file - -static void -ractor_basket_prepare_contents(VALUE obj, VALUE move, volatile VALUE *pobj, enum rb_ractor_basket_type *ptype) -{ - VALUE v; - enum rb_ractor_basket_type type; - - if (rb_ractor_shareable_p(obj)) { - type = basket_type_ref; - v = obj; - } - else if (!RTEST(move)) { - v = ractor_copy(obj); - type = basket_type_copy; - } - else { - type = basket_type_move; - v = ractor_move(obj); - } - - *pobj = v; - *ptype = type; -} - -static void -ractor_basket_fill_(rb_ractor_t *cr, rb_thread_t *cur_th, struct rb_ractor_basket *basket, VALUE obj, bool exc) -{ - VM_ASSERT(cr == GET_RACTOR()); - - basket->sender = cr->pub.self; - basket->sending_th = cur_th; - basket->p.send.exception = exc; - basket->p.send.v = obj; -} - -static void -ractor_basket_fill(rb_ractor_t *cr, rb_thread_t *cur_th, struct rb_ractor_basket *basket, VALUE obj, VALUE move, bool exc) -{ - VALUE v; - enum rb_ractor_basket_type type; - ractor_basket_prepare_contents(obj, move, &v, &type); - ractor_basket_fill_(cr, cur_th, basket, v, exc); - basket->type.e = type; -} - -static void -ractor_basket_fill_will(rb_ractor_t *cr, rb_thread_t *cur_th, struct rb_ractor_basket *basket, VALUE obj, bool exc) -{ - ractor_basket_fill_(cr, cur_th, basket, obj, exc); - basket->type.e = basket_type_will; -} - -static VALUE -ractor_send(rb_execution_context_t *ec, rb_ractor_t *recv_r, VALUE obj, VALUE move) -{ - struct rb_ractor_basket basket; - rb_ractor_t *cr = rb_ec_ractor_ptr(ec); - rb_thread_t *cur_th = rb_ec_thread_ptr(ec); - // TODO: Ractor local GC - ractor_basket_fill(cr, cur_th, &basket, obj, move, false); - ractor_send_basket(ec, recv_r, &basket); - return recv_r->pub.self; -} - -// Ractor#take - -static bool -ractor_take_has_will(rb_ractor_t *r) -{ - ASSERT_ractor_locking(r); - - return basket_type_p(&r->sync.will_basket, basket_type_will); -} - -static bool -ractor_take_will(rb_ractor_t *r, struct rb_ractor_basket *b) -{ - ASSERT_ractor_locking(r); - - if (ractor_take_has_will(r)) { - *b = r->sync.will_basket; - r->sync.will_basket.type.e = basket_type_none; - return true; - } - else { - VM_ASSERT(basket_type_p(&r->sync.will_basket, basket_type_none)); - return false; - } -} - -static bool -ractor_take_will_lock(rb_ractor_t *r, struct rb_ractor_basket *b) -{ - ASSERT_ractor_unlocking(r); - bool taken; - - RACTOR_LOCK(r); - { - taken = ractor_take_will(r, b); - } - RACTOR_UNLOCK(r); - - return taken; -} - -static bool -ractor_register_take(rb_ractor_t *cr, rb_thread_t *cur_th, rb_ractor_t *r, struct rb_ractor_basket *take_basket, - bool is_take, struct rb_ractor_selector_take_config *config, bool ignore_error) -{ - struct rb_ractor_basket b = { - .type.e = basket_type_take_basket, - .sender = cr->pub.self, - .sending_th = cur_th, - .p = { - .take = { - .basket = take_basket, // pointer to our stack value saved in ractor `r` queue - .config = config, - }, - }, - }; - bool closed = false; - - RACTOR_LOCK(r); - { - if (is_take && ractor_take_will(r, take_basket)) { - RUBY_DEBUG_LOG("take over a will of r:%d", rb_ractor_id(r)); - } - else if (!is_take && ractor_take_has_will(r)) { - RUBY_DEBUG_LOG("has_will"); - VM_ASSERT(config != NULL); - config->closed = true; - } - else if (r->sync.outgoing_port_closed) { - closed = true; - } - else { - RUBY_DEBUG_LOG("register in r:%d", rb_ractor_id(r)); - ractor_queue_enq(r, &r->sync.takers_queue, &b); - - if (basket_none_p(take_basket)) { - // wakeup any thread in `r` that has yielded, if there is any. - ractor_wakeup(r, NULL, wait_yielding, wakeup_by_take); - } - } - } - RACTOR_UNLOCK(r); - - if (closed) { - if (!ignore_error) rb_raise(rb_eRactorClosedError, "The outgoing-port is already closed"); - return false; - } - else { - return true; - } -} - -static bool -ractor_deregister_take(rb_ractor_t *r, struct rb_ractor_basket *take_basket) -{ - struct rb_ractor_queue *ts = &r->sync.takers_queue; - bool deleted = false; - - RACTOR_LOCK(r); - { - if (r->sync.outgoing_port_closed) { - // ok - } - else { - for (int i=0; i<ts->cnt; i++) { - struct rb_ractor_basket *b = ractor_queue_at(r, ts, i); - if (basket_type_p(b, basket_type_take_basket) && b->p.take.basket == take_basket) { - ractor_queue_delete(r, ts, b); - deleted = true; - } - } - if (deleted) { - ractor_queue_compact(r, ts); - } - } - } - RACTOR_UNLOCK(r); - - return deleted; -} - -static VALUE -ractor_try_take(rb_ractor_t *cr, rb_thread_t *cur_th, rb_ractor_t *recv_r, struct rb_ractor_basket *take_basket) -{ - bool taken; - - RACTOR_LOCK_SELF(cr); - { - // If it hasn't yielded yet or is currently in the process of yielding, sleep more - if (basket_none_p(take_basket) || basket_type_p(take_basket, basket_type_yielding)) { - taken = false; - } - else { - taken = true; // basket type might be, for ex, basket_type_copy if value was copied during yield - } - } - RACTOR_UNLOCK_SELF(cr); - - if (taken) { - RUBY_DEBUG_LOG("taken"); - if (basket_type_p(take_basket, basket_type_deleted)) { - VM_ASSERT(recv_r->sync.outgoing_port_closed); - rb_raise(rb_eRactorClosedError, "The outgoing-port is already closed"); - } - return ractor_basket_accept(take_basket); - } - else { - RUBY_DEBUG_LOG("not taken"); - return Qundef; - } -} - - -#if VM_CHECK_MODE > 0 -static bool -ractor_check_specific_take_basket_lock(rb_ractor_t *r, struct rb_ractor_basket *tb) -{ - bool ret = false; - struct rb_ractor_queue *ts = &r->sync.takers_queue; - - RACTOR_LOCK(r); - { - for (int i=0; i<ts->cnt; i++) { - struct rb_ractor_basket *b = ractor_queue_at(r, ts, i); - if (basket_type_p(b, basket_type_take_basket) && b->p.take.basket == tb) { - ret = true; - break; - } - } - } - RACTOR_UNLOCK(r); - - return ret; -} -#endif - -// cleanup function, cr is unlocked -static void -ractor_take_cleanup(rb_ractor_t *cr, rb_ractor_t *r, struct rb_ractor_basket *tb) -{ - retry: - if (basket_none_p(tb)) { // not yielded yet - if (!ractor_deregister_take(r, tb)) { - // not in r's takers queue - rb_thread_sleep(0); - goto retry; - } - } - else { - VM_ASSERT(!ractor_check_specific_take_basket_lock(r, tb)); - } -} - -struct take_wait_take_cleanup_data { - rb_ractor_t *r; - struct rb_ractor_basket *tb; -}; - -static void -ractor_wait_take_cleanup(rb_ractor_t *cr, void *ptr) -{ - struct take_wait_take_cleanup_data *data = (struct take_wait_take_cleanup_data *)ptr; - ractor_take_cleanup(cr, data->r, data->tb); -} - -static void -ractor_wait_take(rb_execution_context_t *ec, rb_ractor_t *cr, rb_thread_t *cur_th, rb_ractor_t *r, struct rb_ractor_basket *take_basket) -{ - struct take_wait_take_cleanup_data data = { - .r = r, - .tb = take_basket, - }; - - RACTOR_LOCK_SELF(cr); - { - if (basket_none_p(take_basket) || basket_type_p(take_basket, basket_type_yielding)) { - ractor_sleep_with_cleanup(ec, cr, cur_th, wait_taking, ractor_wait_take_cleanup, &data); - } - } - RACTOR_UNLOCK_SELF(cr); -} - -static VALUE -ractor_take(rb_execution_context_t *ec, rb_ractor_t *recv_r) -{ - RUBY_DEBUG_LOG("from r:%u", rb_ractor_id(recv_r)); - VALUE v; - rb_ractor_t *cr = rb_ec_ractor_ptr(ec); - rb_thread_t *cur_th = rb_ec_thread_ptr(ec); - - struct rb_ractor_basket take_basket = { - .type.e = basket_type_none, - .sender = 0, - }; - - ractor_register_take(cr, cur_th, recv_r, &take_basket, true, NULL, false); - - while (UNDEF_P(v = ractor_try_take(cr, cur_th, recv_r, &take_basket))) { - ractor_wait_take(ec, cr, cur_th, recv_r, &take_basket); - } - - VM_ASSERT(!basket_none_p(&take_basket)); // might be, for ex, basket_type_copy - VM_ASSERT(!ractor_check_specific_take_basket_lock(recv_r, &take_basket)); - - return v; -} - -// Ractor.yield - -static bool -ractor_check_take_basket(rb_ractor_t *cr, struct rb_ractor_queue *rs) -{ - ASSERT_ractor_locking(cr); - - for (int i=0; i<rs->cnt; i++) { - struct rb_ractor_basket *b = ractor_queue_at(cr, rs, i); - if (basket_type_p(b, basket_type_take_basket) && - basket_none_p(b->p.take.basket)) { - return true; - } - } - - return false; -} - -// Find another ractor that is taking from this ractor, so we can yield to it -static bool -ractor_deq_take_basket(rb_ractor_t *cr, struct rb_ractor_queue *rs, struct rb_ractor_basket *b) -{ - ASSERT_ractor_unlocking(cr); - struct rb_ractor_basket *first_tb = NULL; - bool found = false; - - RACTOR_LOCK_SELF(cr); - { - while (ractor_queue_deq(cr, rs, b)) { - if (basket_type_p(b, basket_type_take_basket)) { // some other ractor is taking - struct rb_ractor_basket *tb = b->p.take.basket; - - if (RUBY_ATOMIC_CAS(tb->type.atomic, basket_type_none, basket_type_yielding) == basket_type_none) { - found = true; // payload basket is now "yielding" type - break; - } - else { - ractor_queue_enq(cr, rs, b); - if (first_tb == NULL) first_tb = tb; - struct rb_ractor_basket *head = ractor_queue_head(cr, rs); - VM_ASSERT(head != NULL); - if (basket_type_p(head, basket_type_take_basket) && head->p.take.basket == first_tb) { - break; // loop detected - } - } - } - else { - VM_ASSERT(basket_none_p(b)); - } - } - - if (found && b->p.take.config && !b->p.take.config->oneshot) { - ractor_queue_enq(cr, rs, b); - } - } - RACTOR_UNLOCK_SELF(cr); - - return found; -} - -// Try yielding to a taking ractor -static bool -ractor_try_yield(rb_execution_context_t *ec, rb_ractor_t *cr, struct rb_ractor_queue *ts, volatile VALUE obj, VALUE move, bool exc, bool is_will) -{ - // Don't lock yielding ractor at same time as taking ractor. This could deadlock due to timing - // issue because we don't have a lock hierarchy. - ASSERT_ractor_unlocking(cr); - rb_thread_t *cur_th = rb_ec_thread_ptr(ec); - - struct rb_ractor_basket b; - - if (ractor_deq_take_basket(cr, ts, &b)) { // deq a take basket from takers queue of `cr` into `b` - VM_ASSERT(basket_type_p(&b, basket_type_take_basket)); - VM_ASSERT(basket_type_p(b.p.take.basket, basket_type_yielding)); - - rb_ractor_t *tr = RACTOR_PTR(b.sender); // taking ractor - rb_thread_t *tr_th = b.sending_th; // taking thread - struct rb_ractor_basket *tb = b.p.take.basket; // payload basket - enum rb_ractor_basket_type type; - - RUBY_DEBUG_LOG("basket from r:%u", rb_ractor_id(tr)); - - if (is_will) { - type = basket_type_will; // last message - } - else { - enum ruby_tag_type state; - - // begin - EC_PUSH_TAG(ec); - if ((state = EC_EXEC_TAG()) == TAG_NONE) { - // TODO: Ractor local GC - ractor_basket_prepare_contents(obj, move, &obj, &type); - } - EC_POP_TAG(); - // rescue ractor copy/move error, then re-raise - if (state) { - RACTOR_LOCK_SELF(cr); - { - b.p.take.basket->type.e = basket_type_none; - ractor_queue_enq(cr, ts, &b); - } - RACTOR_UNLOCK_SELF(cr); - EC_JUMP_TAG(ec, state); - } - } - - RACTOR_LOCK(tr); - { - VM_ASSERT(basket_type_p(tb, basket_type_yielding)); - // fill atomic - RUBY_DEBUG_LOG("fill %sbasket from r:%u", is_will ? "will " : "", rb_ractor_id(tr)); - ractor_basket_fill_(cr, cur_th, tb, obj, exc); // fill the take basket payload - if (RUBY_ATOMIC_CAS(tb->type.atomic, basket_type_yielding, type) != basket_type_yielding) { - rb_bug("unreachable"); - } - ractor_wakeup(tr, tr_th, wait_taking, wakeup_by_yield); - } - RACTOR_UNLOCK(tr); - - return true; - } - else if (cr->sync.outgoing_port_closed) { - rb_raise(rb_eRactorClosedError, "The outgoing-port is already closed"); - } - else { - RUBY_DEBUG_LOG("no take basket"); - return false; - } -} - -static void -ractor_wait_yield(rb_execution_context_t *ec, rb_ractor_t *cr, struct rb_ractor_queue *ts) -{ - rb_thread_t *cur_th = rb_ec_thread_ptr(ec); - RACTOR_LOCK_SELF(cr); - { - while (!ractor_check_take_basket(cr, ts) && !cr->sync.outgoing_port_closed) { - ractor_sleep(ec, cr, cur_th, wait_yielding); - } - } - RACTOR_UNLOCK_SELF(cr); -} - -// In order to yield, we wait until our takers queue has at least one element. Then, we wakeup a taker. -static VALUE -ractor_yield(rb_execution_context_t *ec, rb_ractor_t *cr, VALUE obj, VALUE move) -{ - struct rb_ractor_queue *ts = &cr->sync.takers_queue; - - while (!ractor_try_yield(ec, cr, ts, obj, move, false, false)) { - ractor_wait_yield(ec, cr, ts); - } - - return Qnil; -} - -// Ractor::Selector - -struct rb_ractor_selector { - rb_ractor_t *r; - struct rb_ractor_basket take_basket; - st_table *take_ractors; // rb_ractor_t * => (struct rb_ractor_selector_take_config *) -}; - -static int -ractor_selector_mark_ractors_i(st_data_t key, st_data_t value, st_data_t data) -{ - const rb_ractor_t *r = (rb_ractor_t *)key; - rb_gc_mark(r->pub.self); - return ST_CONTINUE; -} - -static void -ractor_selector_mark(void *ptr) -{ - struct rb_ractor_selector *s = ptr; - - if (s->take_ractors) { - st_foreach(s->take_ractors, ractor_selector_mark_ractors_i, 0); - } - - switch (s->take_basket.type.e) { - case basket_type_ref: - case basket_type_copy: - case basket_type_move: - case basket_type_will: - rb_gc_mark(s->take_basket.sender); - rb_gc_mark(s->take_basket.p.send.v); - break; - default: - break; - } -} - -static int -ractor_selector_release_i(st_data_t key, st_data_t val, st_data_t data) -{ - struct rb_ractor_selector *s = (struct rb_ractor_selector *)data; - struct rb_ractor_selector_take_config *config = (struct rb_ractor_selector_take_config *)val; - - if (!config->closed) { - ractor_deregister_take((rb_ractor_t *)key, &s->take_basket); - } - free(config); - return ST_CONTINUE; -} - -static void -ractor_selector_free(void *ptr) -{ - struct rb_ractor_selector *s = ptr; - st_foreach(s->take_ractors, ractor_selector_release_i, (st_data_t)s); - st_free_table(s->take_ractors); - ruby_xfree(ptr); -} - -static size_t -ractor_selector_memsize(const void *ptr) -{ - const struct rb_ractor_selector *s = ptr; - return sizeof(struct rb_ractor_selector) + - st_memsize(s->take_ractors) + - s->take_ractors->num_entries * sizeof(struct rb_ractor_selector_take_config); -} - -static const rb_data_type_t ractor_selector_data_type = { - "ractor/selector", - { - ractor_selector_mark, - ractor_selector_free, - ractor_selector_memsize, - NULL, // update - }, - 0, 0, RUBY_TYPED_FREE_IMMEDIATELY, -}; - -static struct rb_ractor_selector * -RACTOR_SELECTOR_PTR(VALUE selv) -{ - VM_ASSERT(rb_typeddata_is_kind_of(selv, &ractor_selector_data_type)); - - return (struct rb_ractor_selector *)DATA_PTR(selv); -} - -// Ractor::Selector.new - -static VALUE -ractor_selector_create(VALUE klass) -{ - struct rb_ractor_selector *s; - VALUE selv = TypedData_Make_Struct(klass, struct rb_ractor_selector, &ractor_selector_data_type, s); - s->take_basket.type.e = basket_type_reserved; - s->take_ractors = st_init_numtable(); // ractor (ptr) -> take_config - return selv; -} - -// Ractor::Selector#add(r) - -/* - * call-seq: - * add(ractor) -> ractor - * - * Adds _ractor_ to +self+. Raises an exception if _ractor_ is already added. - * Returns _ractor_. - */ -static VALUE -ractor_selector_add(VALUE selv, VALUE rv) -{ - if (!rb_ractor_p(rv)) { - rb_raise(rb_eArgError, "Not a ractor object"); - } - - rb_ractor_t *r = RACTOR_PTR(rv); - struct rb_ractor_selector *s = RACTOR_SELECTOR_PTR(selv); - - if (st_lookup(s->take_ractors, (st_data_t)r, NULL)) { - rb_raise(rb_eArgError, "already added"); - } - - struct rb_ractor_selector_take_config *config = malloc(sizeof(struct rb_ractor_selector_take_config)); - VM_ASSERT(config != NULL); - config->closed = false; - config->oneshot = false; - - if (ractor_register_take(GET_RACTOR(), GET_THREAD(), r, &s->take_basket, false, config, true)) { - st_insert(s->take_ractors, (st_data_t)r, (st_data_t)config); - } - - return rv; -} - -// Ractor::Selector#remove(r) - -/* call-seq: - * remove(ractor) -> ractor - * - * Removes _ractor_ from +self+. Raises an exception if _ractor_ is not added. - * Returns the removed _ractor_. - */ -static VALUE -ractor_selector_remove(VALUE selv, VALUE rv) -{ - if (!rb_ractor_p(rv)) { - rb_raise(rb_eArgError, "Not a ractor object"); - } - - rb_ractor_t *r = RACTOR_PTR(rv); - struct rb_ractor_selector *s = RACTOR_SELECTOR_PTR(selv); - - RUBY_DEBUG_LOG("r:%u", rb_ractor_id(r)); - - if (!st_lookup(s->take_ractors, (st_data_t)r, NULL)) { - rb_raise(rb_eArgError, "not added yet"); - } - - ractor_deregister_take(r, &s->take_basket); - struct rb_ractor_selector_take_config *config; - st_delete(s->take_ractors, (st_data_t *)&r, (st_data_t *)&config); - free(config); - - return rv; -} - -// Ractor::Selector#clear - -struct ractor_selector_clear_data { - VALUE selv; - rb_execution_context_t *ec; -}; - -static int -ractor_selector_clear_i(st_data_t key, st_data_t val, st_data_t data) -{ - VALUE selv = (VALUE)data; - rb_ractor_t *r = (rb_ractor_t *)key; - ractor_selector_remove(selv, r->pub.self); - return ST_CONTINUE; -} - -/* - * call-seq: - * clear -> self - * - * Removes all ractors from +self+. Raises +self+. - */ -static VALUE -ractor_selector_clear(VALUE selv) -{ - struct rb_ractor_selector *s = RACTOR_SELECTOR_PTR(selv); - - st_foreach(s->take_ractors, ractor_selector_clear_i, (st_data_t)selv); - st_clear(s->take_ractors); - return selv; -} - -/* - * call-seq: - * empty? -> true or false - * - * Returns +true+ if no ractor is added. - */ -static VALUE -ractor_selector_empty_p(VALUE selv) -{ - struct rb_ractor_selector *s = RACTOR_SELECTOR_PTR(selv); - return s->take_ractors->num_entries == 0 ? Qtrue : Qfalse; -} - -static int -ractor_selector_wait_i(st_data_t key, st_data_t val, st_data_t dat) -{ - rb_ractor_t *r = (rb_ractor_t *)key; - struct rb_ractor_basket *tb = (struct rb_ractor_basket *)dat; - int ret; - - if (!basket_none_p(tb)) { - RUBY_DEBUG_LOG("already taken:%s", basket_type_name(tb->type.e)); - return ST_STOP; - } - - RACTOR_LOCK(r); - { - if (basket_type_p(&r->sync.will_basket, basket_type_will)) { - RUBY_DEBUG_LOG("r:%u has will", rb_ractor_id(r)); - - if (RUBY_ATOMIC_CAS(tb->type.atomic, basket_type_none, basket_type_will) == basket_type_none) { - ractor_take_will(r, tb); - ret = ST_STOP; - } - else { - RUBY_DEBUG_LOG("has will, but already taken (%s)", basket_type_name(tb->type.e)); - ret = ST_CONTINUE; - } - } - else if (r->sync.outgoing_port_closed) { - RUBY_DEBUG_LOG("r:%u is closed", rb_ractor_id(r)); - - if (RUBY_ATOMIC_CAS(tb->type.atomic, basket_type_none, basket_type_deleted) == basket_type_none) { - tb->sender = r->pub.self; - ret = ST_STOP; - } - else { - RUBY_DEBUG_LOG("closed, but already taken (%s)", basket_type_name(tb->type.e)); - ret = ST_CONTINUE; - } - } - else { - RUBY_DEBUG_LOG("wakeup r:%u", rb_ractor_id(r)); - ractor_wakeup(r, NULL, wait_yielding, wakeup_by_take); - ret = ST_CONTINUE; - } - } - RACTOR_UNLOCK(r); - - return ret; -} - -// Ractor::Selector#wait - -// cleanup function, cr is unlocked -static void -ractor_selector_wait_cleanup(rb_ractor_t *cr, void *ptr) -{ - struct rb_ractor_basket *tb = (struct rb_ractor_basket *)ptr; - - RACTOR_LOCK_SELF(cr); - { - while (basket_type_p(tb, basket_type_yielding)) { - RACTOR_UNLOCK_SELF(cr); - { - rb_thread_sleep(0); - } - RACTOR_LOCK_SELF(cr); - } - // if tb->type is not none, taking is succeeded, but interruption ignore it unfortunately. - tb->type.e = basket_type_reserved; - } - RACTOR_UNLOCK_SELF(cr); -} - -/* :nodoc: */ -static VALUE -ractor_selector__wait(VALUE selv, VALUE do_receivev, VALUE do_yieldv, VALUE yield_value, VALUE move) -{ - rb_execution_context_t *ec = GET_EC(); - struct rb_ractor_selector *s = RACTOR_SELECTOR_PTR(selv); - struct rb_ractor_basket *tb = &s->take_basket; - struct rb_ractor_basket taken_basket; - rb_ractor_t *cr = rb_ec_ractor_ptr(ec); - rb_thread_t *cur_th = rb_ec_thread_ptr(ec); - bool do_receive = !!RTEST(do_receivev); - bool do_yield = !!RTEST(do_yieldv); - VALUE ret_v, ret_r; - enum rb_ractor_wait_status wait_status; - struct rb_ractor_queue *rq = &cr->sync.recv_queue; - struct rb_ractor_queue *ts = &cr->sync.takers_queue; - - RUBY_DEBUG_LOG("start"); - - retry: - RUBY_DEBUG_LOG("takers:%ld", s->take_ractors->num_entries); - - // setup wait_status - wait_status = wait_none; - if (s->take_ractors->num_entries > 0) wait_status |= wait_taking; - if (do_receive) wait_status |= wait_receiving; - if (do_yield) wait_status |= wait_yielding; - - RUBY_DEBUG_LOG("wait:%s", wait_status_str(wait_status)); - - if (wait_status == wait_none) { - rb_raise(rb_eRactorError, "no taking ractors"); - } - - // check recv_queue - if (do_receive && !UNDEF_P(ret_v = ractor_try_receive(ec, cr, rq))) { - ret_r = ID2SYM(rb_intern("receive")); - goto success; - } - - // check takers - if (do_yield && ractor_try_yield(ec, cr, ts, yield_value, move, false, false)) { - ret_v = Qnil; - ret_r = ID2SYM(rb_intern("yield")); - goto success; - } - - // check take_basket - VM_ASSERT(basket_type_p(&s->take_basket, basket_type_reserved)); - s->take_basket.type.e = basket_type_none; - // kick all take target ractors - st_foreach(s->take_ractors, ractor_selector_wait_i, (st_data_t)tb); - - RACTOR_LOCK_SELF(cr); - { - retry_waiting: - while (1) { - if (!basket_none_p(tb)) { - RUBY_DEBUG_LOG("taken:%s from r:%u", basket_type_name(tb->type.e), - tb->sender ? rb_ractor_id(RACTOR_PTR(tb->sender)) : 0); - break; - } - if (do_receive && !ractor_queue_empty_p(cr, rq)) { - RUBY_DEBUG_LOG("can receive (%d)", rq->cnt); - break; - } - if (do_yield && ractor_check_take_basket(cr, ts)) { - RUBY_DEBUG_LOG("can yield"); - break; - } - - ractor_sleep_with_cleanup(ec, cr, cur_th, wait_status, ractor_selector_wait_cleanup, tb); - } - - taken_basket = *tb; - - // ensure - // tb->type.e = basket_type_reserved # do it atomic in the following code - if (taken_basket.type.e == basket_type_yielding || - RUBY_ATOMIC_CAS(tb->type.atomic, taken_basket.type.e, basket_type_reserved) != taken_basket.type.e) { - - if (basket_type_p(tb, basket_type_yielding)) { - RACTOR_UNLOCK_SELF(cr); - { - rb_thread_sleep(0); - } - RACTOR_LOCK_SELF(cr); - } - goto retry_waiting; - } - } - RACTOR_UNLOCK_SELF(cr); - - // check the taken result - switch (taken_basket.type.e) { - case basket_type_none: - VM_ASSERT(do_receive || do_yield); - goto retry; - case basket_type_yielding: - rb_bug("unreachable"); - case basket_type_deleted: { - ractor_selector_remove(selv, taken_basket.sender); - - rb_ractor_t *r = RACTOR_PTR(taken_basket.sender); - if (ractor_take_will_lock(r, &taken_basket)) { - RUBY_DEBUG_LOG("has_will"); - } - else { - RUBY_DEBUG_LOG("no will"); - // rb_raise(rb_eRactorClosedError, "The outgoing-port is already closed"); - // remove and retry wait - goto retry; - } - break; - } - case basket_type_will: - // no more messages - ractor_selector_remove(selv, taken_basket.sender); - break; - default: - break; - } - - RUBY_DEBUG_LOG("taken_basket:%s", basket_type_name(taken_basket.type.e)); - - ret_v = ractor_basket_accept(&taken_basket); - ret_r = taken_basket.sender; - success: - return rb_ary_new_from_args(2, ret_r, ret_v); -} - -/* - * call-seq: - * wait(receive: false, yield_value: undef, move: false) -> [ractor, value] - * - * Waits until any ractor in _selector_ can be active. - */ -static VALUE -ractor_selector_wait(int argc, VALUE *argv, VALUE selector) -{ - VALUE options; - ID keywords[3]; - VALUE values[3]; - - keywords[0] = rb_intern("receive"); - keywords[1] = rb_intern("yield_value"); - keywords[2] = rb_intern("move"); - - rb_scan_args(argc, argv, "0:", &options); - rb_get_kwargs(options, keywords, 0, numberof(values), values); - return ractor_selector__wait(selector, - values[0] == Qundef ? Qfalse : RTEST(values[0]), - values[1] != Qundef, values[1], values[2]); -} - -static VALUE -ractor_selector_new(int argc, VALUE *ractors, VALUE klass) -{ - VALUE selector = ractor_selector_create(klass); - - for (int i=0; i<argc; i++) { - ractor_selector_add(selector, ractors[i]); - } - - return selector; -} - -static VALUE -ractor_select_internal(rb_execution_context_t *ec, VALUE self, VALUE ractors, VALUE do_receive, VALUE do_yield, VALUE yield_value, VALUE move) -{ - VALUE selector = ractor_selector_new(RARRAY_LENINT(ractors), (VALUE *)RARRAY_CONST_PTR(ractors), rb_cRactorSelector); - VALUE result; - int state; - - EC_PUSH_TAG(ec); - if ((state = EC_EXEC_TAG()) == TAG_NONE) { - result = ractor_selector__wait(selector, do_receive, do_yield, yield_value, move); - } - EC_POP_TAG(); - if (state != TAG_NONE) { - // ensure - ractor_selector_clear(selector); - - // jump - EC_JUMP_TAG(ec, state); - } - - RB_GC_GUARD(ractors); - return result; -} - -// Ractor#close_incoming - -static VALUE -ractor_close_incoming(rb_execution_context_t *ec, rb_ractor_t *r) -{ - VALUE prev; - rb_thread_t *r_th = NULL; - if (r == rb_ec_ractor_ptr(ec)) { - r_th = rb_ec_thread_ptr(ec); - } - - RACTOR_LOCK(r); - { - if (!r->sync.incoming_port_closed) { - prev = Qfalse; - r->sync.incoming_port_closed = true; - if (ractor_wakeup(r, r_th, wait_receiving, wakeup_by_close)) { - VM_ASSERT(ractor_queue_empty_p(r, &r->sync.recv_queue)); - RUBY_DEBUG_LOG("cancel receiving"); - } - } - else { - prev = Qtrue; - } - } - RACTOR_UNLOCK(r); - return prev; -} - -// Ractor#close_outgoing - -static VALUE -ractor_close_outgoing(rb_execution_context_t *ec, rb_ractor_t *r) -{ - VALUE prev; - - RACTOR_LOCK(r); - { - struct rb_ractor_queue *ts = &r->sync.takers_queue; - rb_ractor_t *tr; - struct rb_ractor_basket b; - - if (!r->sync.outgoing_port_closed) { - prev = Qfalse; - r->sync.outgoing_port_closed = true; - } - else { - VM_ASSERT(ractor_queue_empty_p(r, ts)); - prev = Qtrue; - } - - // wakeup all taking ractors - while (ractor_queue_deq(r, ts, &b)) { - if (basket_type_p(&b, basket_type_take_basket)) { - tr = RACTOR_PTR(b.sender); - rb_thread_t *tr_th = b.sending_th; - struct rb_ractor_basket *tb = b.p.take.basket; - - if (RUBY_ATOMIC_CAS(tb->type.atomic, basket_type_none, basket_type_yielding) == basket_type_none) { - b.p.take.basket->sender = r->pub.self; - if (RUBY_ATOMIC_CAS(tb->type.atomic, basket_type_yielding, basket_type_deleted) != basket_type_yielding) { - rb_bug("unreachable"); - } - RUBY_DEBUG_LOG("set delete for r:%u", rb_ractor_id(RACTOR_PTR(b.sender))); - } - - if (b.p.take.config) { - b.p.take.config->closed = true; - } - - // TODO: deadlock-able? - RACTOR_LOCK(tr); - { - ractor_wakeup(tr, tr_th, wait_taking, wakeup_by_close); - } - RACTOR_UNLOCK(tr); - } - } - - // raising yielding Ractor - ractor_wakeup(r, NULL, wait_yielding, wakeup_by_close); - - VM_ASSERT(ractor_queue_empty_p(r, ts)); - } - RACTOR_UNLOCK(r); - return prev; -} // creation/termination @@ -2175,9 +440,7 @@ rb_ractor_terminate_atfork(rb_vm_t *vm, rb_ractor_t *r) rb_gc_ractor_cache_free(r->newobj_cache); r->newobj_cache = NULL; r->status_ = ractor_terminated; - r->sync.outgoing_port_closed = true; - r->sync.incoming_port_closed = true; - r->sync.will_basket.type.e = basket_type_none; } #endif @@ -2194,15 +457,7 @@ rb_ractor_living_threads_init(rb_ractor_t *r) static void ractor_init(rb_ractor_t *r, VALUE name, VALUE loc) { - ractor_queue_setup(&r->sync.recv_queue); - ractor_queue_setup(&r->sync.takers_queue); - rb_native_mutex_initialize(&r->sync.lock); - rb_native_cond_initialize(&r->barrier_wait_cond); - -#ifdef RUBY_THREAD_WIN32_H - rb_native_cond_initialize(&r->barrier_wait_cond); -#endif - ccan_list_head_init(&r->sync.wait.waiting_threads); // thread management rb_thread_sched_init(&r->threads.sched, false); @@ -2255,69 +510,39 @@ ractor_create(rb_execution_context_t *ec, VALUE self, VALUE loc, VALUE name, VAL return rv; } static VALUE ractor_create_func(VALUE klass, VALUE loc, VALUE name, VALUE args, rb_block_call_func_t func) { VALUE block = rb_proc_new(func, Qnil); return ractor_create(rb_current_ec_noinline(), klass, loc, name, args, block); } static void -ractor_yield_atexit(rb_execution_context_t *ec, rb_ractor_t *cr, VALUE v, bool exc) { - if (cr->sync.outgoing_port_closed) { - return; - } - - ASSERT_ractor_unlocking(cr); - - struct rb_ractor_queue *ts = &cr->sync.takers_queue; - rb_thread_t *cur_th = rb_ec_thread_ptr(ec); - - retry: - if (ractor_try_yield(ec, cr, ts, v, Qfalse, exc, true)) { - // OK. - } - else { - bool retry = false; - RACTOR_LOCK(cr); - { - if (!ractor_check_take_basket(cr, ts)) { - VM_ASSERT(cur_th->ractor_waiting.wait_status == wait_none); - RUBY_DEBUG_LOG("leave a will"); - ractor_basket_fill_will(cr, cur_th, &cr->sync.will_basket, v, exc); - } - else { - RUBY_DEBUG_LOG("rare timing!"); - retry = true; // another ractor is waiting for the yield. - } - } - RACTOR_UNLOCK(cr); - - if (retry) goto retry; - } } void rb_ractor_atexit(rb_execution_context_t *ec, VALUE result) { rb_ractor_t *cr = rb_ec_ractor_ptr(ec); - ractor_yield_atexit(ec, cr, result, false); } void rb_ractor_atexit_exception(rb_execution_context_t *ec) { rb_ractor_t *cr = rb_ec_ractor_ptr(ec); - ractor_yield_atexit(ec, cr, ec->errinfo, true); } void rb_ractor_teardown(rb_execution_context_t *ec) { rb_ractor_t *cr = rb_ec_ractor_ptr(ec); - ractor_close_incoming(ec, cr); - ractor_close_outgoing(ec, cr); // sync with rb_ractor_terminate_interrupt_main_thread() RB_VM_LOCKING() { @@ -2330,7 +555,7 @@ void rb_ractor_receive_parameters(rb_execution_context_t *ec, rb_ractor_t *r, int len, VALUE *ptr) { for (int i=0; i<len; i++) { - ptr[i] = ractor_receive(ec, r); } } @@ -2339,7 +564,7 @@ rb_ractor_send_parameters(rb_execution_context_t *ec, rb_ractor_t *r, VALUE args { int len = RARRAY_LENINT(args); for (int i=0; i<len; i++) { - ractor_send(ec, r, RARRAY_AREF(args, i), false); } } @@ -2642,35 +867,6 @@ ractor_moved_missing(int argc, VALUE *argv, VALUE self) rb_raise(rb_eRactorMovedError, "can not send any methods to a moved object"); } -#ifndef USE_RACTOR_SELECTOR -#define USE_RACTOR_SELECTOR 0 -#endif - -RUBY_SYMBOL_EXPORT_BEGIN -void rb_init_ractor_selector(void); -RUBY_SYMBOL_EXPORT_END - -/* - * Document-class: Ractor::Selector - * :nodoc: currently - * - * Selects multiple Ractors to be activated. - */ -void -rb_init_ractor_selector(void) -{ - rb_cRactorSelector = rb_define_class_under(rb_cRactor, "Selector", rb_cObject); - rb_undef_alloc_func(rb_cRactorSelector); - - rb_define_singleton_method(rb_cRactorSelector, "new", ractor_selector_new , -1); - rb_define_method(rb_cRactorSelector, "add", ractor_selector_add, 1); - rb_define_method(rb_cRactorSelector, "remove", ractor_selector_remove, 1); - rb_define_method(rb_cRactorSelector, "clear", ractor_selector_clear, 0); - rb_define_method(rb_cRactorSelector, "empty?", ractor_selector_empty_p, 0); - rb_define_method(rb_cRactorSelector, "wait", ractor_selector_wait, -1); - rb_define_method(rb_cRactorSelector, "_wait", ractor_selector__wait, 4); -} - /* * Document-class: Ractor::ClosedError * @@ -2791,11 +987,7 @@ Init_Ractor(void) rb_define_method(rb_cRactorMovedObject, "instance_eval", ractor_moved_missing, -1); rb_define_method(rb_cRactorMovedObject, "instance_exec", ractor_moved_missing, -1); - // internal - -#if USE_RACTOR_SELECTOR - rb_init_ractor_selector(); -#endif } void @@ -4028,91 +2220,10 @@ ractor_local_value_store_if_absent(rb_execution_context_t *ec, VALUE self, VALUE return rb_mutex_synchronize(cr->local_storage_store_lock, ractor_local_value_store_i, (VALUE)&data); } -// Ractor::Channel (emulate with Ractor) - -typedef rb_ractor_t rb_ractor_channel_t; - -static VALUE -ractor_channel_func(RB_BLOCK_CALL_FUNC_ARGLIST(y, c)) -{ - rb_execution_context_t *ec = GET_EC(); - rb_ractor_t *cr = rb_ec_ractor_ptr(ec); - - while (1) { - int state; - - EC_PUSH_TAG(ec); - if ((state = EC_EXEC_TAG()) == TAG_NONE) { - VALUE obj = ractor_receive(ec, cr); - ractor_yield(ec, cr, obj, Qfalse); - } - EC_POP_TAG(); - - if (state) { - // ignore the error - break; - } - } - - return Qnil; -} - -static VALUE -rb_ractor_channel_new(void) -{ -#if 0 - return rb_funcall(rb_const_get(rb_cRactor, rb_intern("Channel")), rb_intern("new"), 0); -#else - // class Channel - // def self.new - // Ractor.new do # func body - // while true - // obj = Ractor.receive - // Ractor.yield obj - // end - // rescue Ractor::ClosedError - // nil - // end - // end - // end - - return ractor_create_func(rb_cRactor, Qnil, rb_str_new2("Ractor/channel"), rb_ary_new(), ractor_channel_func); -#endif -} - -static VALUE -rb_ractor_channel_yield(rb_execution_context_t *ec, VALUE vch, VALUE obj) -{ - VM_ASSERT(ec == rb_current_ec_noinline()); - rb_ractor_channel_t *ch = RACTOR_PTR(vch); - - ractor_send(ec, (rb_ractor_t *)ch, obj, Qfalse); - return Qnil; -} - -static VALUE -rb_ractor_channel_take(rb_execution_context_t *ec, VALUE vch) -{ - VM_ASSERT(ec == rb_current_ec_noinline()); - rb_ractor_channel_t *ch = RACTOR_PTR(vch); - - return ractor_take(ec, (rb_ractor_t *)ch); -} - -static VALUE -rb_ractor_channel_close(rb_execution_context_t *ec, VALUE vch) -{ - VM_ASSERT(ec == rb_current_ec_noinline()); - rb_ractor_channel_t *ch = RACTOR_PTR(vch); - - ractor_close_incoming(ec, (rb_ractor_t *)ch); - return ractor_close_outgoing(ec, (rb_ractor_t *)ch); -} - // Ractor#require struct cross_ractor_require { - VALUE ch; VALUE result; VALUE exception; @@ -4179,9 +2290,8 @@ ractor_require_protect(struct cross_ractor_require *crr, VALUE (*func)(VALUE)) rb_rescue2(require_result_copy_body, (VALUE)crr, require_result_copy_resuce, (VALUE)crr, rb_eException, 0); - rb_ractor_channel_yield(GET_EC(), crr->ch, Qtrue); return Qnil; - } static VALUE @@ -4197,7 +2307,7 @@ rb_ractor_require(VALUE feature) // TODO: make feature shareable struct cross_ractor_require crr = { .feature = feature, // TODO: ractor - .ch = rb_ractor_channel_new(), .result = Qundef, .exception = Qundef, }; @@ -4207,8 +2317,8 @@ rb_ractor_require(VALUE feature) rb_ractor_interrupt_exec(main_r, ractore_require_func, &crr, 0); // wait for require done - rb_ractor_channel_take(ec, crr.ch); - rb_ractor_channel_close(ec, crr.ch); if (crr.exception != Qundef) { ractor_reset_belonging(crr.exception); @@ -4248,7 +2358,7 @@ rb_ractor_autoload_load(VALUE module, ID name) struct cross_ractor_require crr = { .module = module, .name = name, - .ch = rb_ractor_channel_new(), .result = Qundef, .exception = Qundef, }; @@ -4258,8 +2368,8 @@ rb_ractor_autoload_load(VALUE module, ID name) rb_ractor_interrupt_exec(main_r, ractor_autoload_load_func, &crr, 0); // wait for require done - rb_ractor_channel_take(ec, crr.ch); - rb_ractor_channel_close(ec, crr.ch); if (crr.exception != Qundef) { rb_exc_raise(crr.exception); @@ -4,7 +4,7 @@ # # # The simplest ractor # r = Ractor.new {puts "I am in Ractor!"} -# r.take # wait for it to finish # # Here, "I am in Ractor!" is printed # # Ractors do not share all objects with each other. There are two main benefits to this: across ractors, thread-safety @@ -36,53 +36,11 @@ # puts "I am in Ractor! a=#{a_in_ractor}" # end # r.send(a) # pass it -# r.take # # Here, "I am in Ractor! a=1" is printed # -# There are two pairs of methods for sending/receiving messages: -# -# * Ractor#send and Ractor.receive for when the _sender_ knows the receiver (push); -# * Ractor.yield and Ractor#take for when the _receiver_ knows the sender (pull); -# # In addition to that, any arguments passed to Ractor.new are passed to the block and available there -# as if received by Ractor.receive, and the last block value is sent outside of the -# ractor as if sent by Ractor.yield. -# -# A little demonstration of a classic ping-pong: -# -# server = Ractor.new(name: "server") do -# puts "Server starts: #{self.inspect}" -# puts "Server sends: ping" -# Ractor.yield 'ping' # The server doesn't know the receiver and sends to whoever interested -# received = Ractor.receive # The server doesn't know the sender and receives from whoever sent -# puts "Server received: #{received}" -# end -# -# client = Ractor.new(server) do |srv| # The server is sent to the client, and available as srv -# puts "Client starts: #{self.inspect}" -# received = srv.take # The client takes a message from the server -# puts "Client received from " \ -# "#{srv.inspect}: #{received}" -# puts "Client sends to " \ -# "#{srv.inspect}: pong" -# srv.send 'pong' # The client sends a message to the server -# end -# -# [client, server].each(&:take) # Wait until they both finish -# -# This will output something like: -# -# Server starts: #<Ractor:#2 server test.rb:1 running> -# Server sends: ping -# Client starts: #<Ractor:#3 test.rb:8 running> -# Client received from #<Ractor:#2 server test.rb:1 blocking>: ping -# Client sends to #<Ractor:#2 server test.rb:1 blocking>: pong -# Server received: pong -# -# Ractors receive their messages via the <em>incoming port</em>, and send them -# to the <em>outgoing port</em>. Either one can be disabled with Ractor#close_incoming and -# Ractor#close_outgoing, respectively. When a ractor terminates, its ports are closed -# automatically. # # == Shareable and unshareable objects # @@ -307,130 +265,52 @@ class Ractor # # call-seq: - # Ractor.select(*ractors, [yield_value:, move: false]) -> [ractor or symbol, obj] - # - # Wait for any ractor to have something in its outgoing port, read from this ractor, and - # then return that ractor and the object received. - # - # r1 = Ractor.new {Ractor.yield 'from 1'} - # r2 = Ractor.new {Ractor.yield 'from 2'} - # - # r, obj = Ractor.select(r1, r2) - # - # puts "received #{obj.inspect} from #{r.inspect}" - # # Prints: received "from 1" from #<Ractor:#2 test.rb:1 running> - # # But could just as well print "from r2" here, either prints could be first. - # - # If one of the given ractors is the current ractor, and it is selected, +r+ will contain - # the +:receive+ symbol instead of the ractor object. - # - # r1 = Ractor.new(Ractor.current) do |main| - # main.send 'to main' - # Ractor.yield 'from 1' - # end - # r2 = Ractor.new do - # Ractor.yield 'from 2' - # end - # - # r, obj = Ractor.select(r1, r2, Ractor.current) - # puts "received #{obj.inspect} from #{r.inspect}" - # # Could print: received "to main" from :receive - # - # If +yield_value+ is provided, that value may be yielded if another ractor is calling #take. - # In this case, the pair <tt>[:yield, nil]</tt> is returned: - # - # r1 = Ractor.new(Ractor.current) do |main| - # puts "Received from main: #{main.take}" - # end - # - # puts "Trying to select" - # r, obj = Ractor.select(r1, Ractor.current, yield_value: 123) - # wait - # puts "Received #{obj.inspect} from #{r.inspect}" - # - # This will print: - # - # Trying to select - # Received from main: 123 - # Received nil from :yield - # - # +move+ boolean flag defines whether yielded value will be copied (default) or moved. - def self.select(*ractors, yield_value: yield_unspecified = true, move: false) - raise ArgumentError, 'specify at least one ractor or `yield_value`' if yield_unspecified && ractors.empty? - - if ractors.delete Ractor.current - do_receive = true - else - do_receive = false end - __builtin_ractor_select_internal ractors, do_receive, !yield_unspecified, yield_value, move end # # call-seq: - # Ractor.receive -> msg - # - # Receive a message from the incoming port of the current ractor (which was - # sent there by #send from another ractor). - # - # r = Ractor.new do - # v1 = Ractor.receive - # puts "Received: #{v1}" - # end - # r.send('message1') - # r.take - # # Here will be printed: "Received: message1" - # - # Alternatively, the private instance method +receive+ may be used: - # - # r = Ractor.new do - # v1 = receive - # puts "Received: #{v1}" - # end - # r.send('message1') - # r.take - # # This prints: "Received: message1" - # - # The method blocks if the queue is empty. - # - # r = Ractor.new do - # puts "Before first receive" - # v1 = Ractor.receive - # puts "Received: #{v1}" - # v2 = Ractor.receive - # puts "Received: #{v2}" - # end - # wait - # puts "Still not received" - # r.send('message1') - # wait - # puts "Still received only one" - # r.send('message2') - # r.take - # - # Output: - # - # Before first receive - # Still not received - # Received: message1 - # Still received only one - # Received: message2 - # - # If close_incoming was called on the ractor, the method raises Ractor::ClosedError - # if there are no more messages in the incoming queue: - # - # Ractor.new do - # close_incoming - # receive - # end - # wait - # # in `receive': The incoming port is already closed => #<Ractor:#2 test.rb:1 running> (Ractor::ClosedError) # def self.receive - __builtin_cexpr! %q{ - ractor_receive(ec, rb_ec_ractor_ptr(ec)) - } end class << self @@ -439,280 +319,21 @@ class Ractor # same as Ractor.receive private def receive - __builtin_cexpr! %q{ - ractor_receive(ec, rb_ec_ractor_ptr(ec)) - } end alias recv receive # # call-seq: - # Ractor.receive_if {|msg| block } -> msg - # - # Receive only a specific message. - # - # Instead of Ractor.receive, Ractor.receive_if can be given a pattern (or any - # filter) in a block and you can choose the messages to accept that are available in - # your ractor's incoming queue. - # - # r = Ractor.new do - # p Ractor.receive_if{|msg| msg.match?(/foo/)} #=> "foo3" - # p Ractor.receive_if{|msg| msg.match?(/bar/)} #=> "bar1" - # p Ractor.receive_if{|msg| msg.match?(/baz/)} #=> "baz2" - # end - # r << "bar1" - # r << "baz2" - # r << "foo3" - # r.take - # - # This will output: - # - # foo3 - # bar1 - # baz2 - # - # If the block returns a truthy value, the message is removed from the incoming queue - # and returned. - # Otherwise, the message remains in the incoming queue and the next messages are checked - # by the given block. - # - # If there are no messages left in the incoming queue, the method will - # block until new messages arrive. - # - # If the block is escaped by break/return/exception/throw, the message is removed from - # the incoming queue as if a truthy value had been returned. - # - # r = Ractor.new do - # val = Ractor.receive_if{|msg| msg.is_a?(Array)} - # puts "Received successfully: #{val}" - # end - # - # r.send(1) - # r.send('test') - # wait - # puts "2 non-matching sent, nothing received" - # r.send([1, 2, 3]) - # wait - # - # Prints: - # - # 2 non-matching sent, nothing received - # Received successfully: [1, 2, 3] - # - # Note that you can not call receive/receive_if in the given block recursively. - # You should not do any tasks in the block other than message filtration. - # - # Ractor.current << true - # Ractor.receive_if{|msg| Ractor.receive} - # #=> `receive': can not call receive/receive_if recursively (Ractor::Error) - # - def self.receive_if &b - Primitive.ractor_receive_if b - end - - # same as Ractor.receive_if - private def receive_if &b - Primitive.ractor_receive_if b - end - - # - # call-seq: - # ractor.send(msg, move: false) -> self - # - # Send a message to a Ractor's incoming queue to be accepted by Ractor.receive. - # - # r = Ractor.new do - # value = Ractor.receive - # puts "Received #{value}" - # end - # r.send 'message' - # # Prints: "Received: message" - # - # The method is non-blocking (will return immediately even if the ractor is not ready - # to receive anything): - # - # r = Ractor.new {sleep(5)} - # r.send('test') - # puts "Sent successfully" - # # Prints: "Sent successfully" immediately - # - # An attempt to send to a ractor which already finished its execution will raise Ractor::ClosedError. - # - # r = Ractor.new {} - # r.take - # p r - # # "#<Ractor:#6 (irb):23 terminated>" - # r.send('test') - # # Ractor::ClosedError (The incoming-port is already closed) - # - # If close_incoming was called on the ractor, the method also raises Ractor::ClosedError. - # - # r = Ractor.new do - # sleep(500) - # receive - # end - # r.close_incoming - # r.send('test') - # # Ractor::ClosedError (The incoming-port is already closed) - # # The error is raised immediately, not when the ractor tries to receive - # - # If the +obj+ is unshareable, by default it will be copied into the receiving ractor by deep cloning. - # If <tt>move: true</tt> is passed, the object is _moved_ into the receiving ractor and becomes - # inaccessible to the sender. - # - # r = Ractor.new {puts "Received: #{receive}"} - # msg = 'message' - # r.send(msg, move: true) - # r.take - # p msg - # - # This prints: - # - # Received: message - # in `p': undefined method `inspect' for #<Ractor::MovedObject:0x000055c99b9b69b8> - # - # All references to the object and its parts will become invalid to the sender. - # - # r = Ractor.new {puts "Received: #{receive}"} - # s = 'message' - # ary = [s] - # copy = ary.dup - # r.send(ary, move: true) - # - # s.inspect - # # Ractor::MovedError (can not send any methods to a moved object) - # ary.class - # # Ractor::MovedError (can not send any methods to a moved object) - # copy.class - # # => Array, it is different object - # copy[0].inspect - # # Ractor::MovedError (can not send any methods to a moved object) - # # ...but its item was still a reference to `s`, which was moved - # - # If the object is shareable, <tt>move: true</tt> has no effect on it: - # - # r = Ractor.new {puts "Received: #{receive}"} - # s = 'message'.freeze - # r.send(s, move: true) - # s.inspect #=> "message", still available # - def send(obj, move: false) - __builtin_cexpr! %q{ - ractor_send(ec, RACTOR_PTR(self), obj, move) - } end alias << send - # - # call-seq: - # Ractor.yield(msg, move: false) -> nil - # - # Send a message to the current ractor's outgoing port to be accepted by #take. - # - # r = Ractor.new {Ractor.yield 'Hello from ractor'} - # puts r.take - # # Prints: "Hello from ractor" - # - # This method is blocking, and will return only when somebody consumes the - # sent message. - # - # r = Ractor.new do - # Ractor.yield 'Hello from ractor' - # puts "Ractor: after yield" - # end - # wait - # puts "Still not taken" - # puts r.take - # - # This will print: - # - # Still not taken - # Hello from ractor - # Ractor: after yield - # - # If the outgoing port was closed with #close_outgoing, the method will raise: - # - # r = Ractor.new do - # close_outgoing - # Ractor.yield 'Hello from ractor' - # end - # wait - # # `yield': The outgoing-port is already closed (Ractor::ClosedError) - # - # The meaning of the +move+ argument is the same as for #send. - def self.yield(obj, move: false) - __builtin_cexpr! %q{ - ractor_yield(ec, rb_ec_ractor_ptr(ec), obj, move) - } - end - - # - # call-seq: - # ractor.take -> msg - # - # Get a message from the ractor's outgoing port, which was put there by Ractor.yield or at ractor's - # termination. - # - # r = Ractor.new do - # Ractor.yield 'explicit yield' - # 'last value' - # end - # puts r.take #=> 'explicit yield' - # puts r.take #=> 'last value' - # puts r.take # Ractor::ClosedError (The outgoing-port is already closed) - # - # The fact that the last value is also sent to the outgoing port means that +take+ can be used - # as an analog of Thread#join ("just wait until ractor finishes"). However, it will raise if - # somebody has already consumed that message. - # - # If the outgoing port was closed with #close_outgoing, the method will raise Ractor::ClosedError. - # - # r = Ractor.new do - # sleep(500) - # Ractor.yield 'Hello from ractor' - # end - # r.close_outgoing - # r.take - # # Ractor::ClosedError (The outgoing-port is already closed) - # # The error would be raised immediately, not when ractor will try to receive - # - # If an uncaught exception is raised in the Ractor, it is propagated by take as a - # Ractor::RemoteError. - # - # r = Ractor.new {raise "Something weird happened"} - # - # begin - # r.take - # rescue => e - # p e # => #<Ractor::RemoteError: thrown by remote Ractor.> - # p e.ractor == r # => true - # p e.cause # => #<RuntimeError: Something weird happened> - # end - # - # Ractor::ClosedError is a descendant of StopIteration, so the termination of the ractor will break - # out of any loops that receive this message without propagating the error: - # - # r = Ractor.new do - # 3.times {|i| Ractor.yield "message #{i}"} - # "finishing" - # end - # - # loop {puts "Received: " + r.take} - # puts "Continue successfully" - # - # This will print: - # - # Received: message 0 - # Received: message 1 - # Received: message 2 - # Received: finishing - # Continue successfully - def take - __builtin_cexpr! %q{ - ractor_take(ec, RACTOR_PTR(self)) - } - end - def inspect loc = __builtin_cexpr! %q{ RACTOR_PTR(self)->loc } name = __builtin_cexpr! %q{ RACTOR_PTR(self)->name } @@ -737,38 +358,13 @@ class Ractor # # call-seq: - # ractor.close_incoming -> true | false # - # Closes the incoming port and returns whether it was already closed. All further attempts - # to Ractor.receive in the ractor, and #send to the ractor will fail with Ractor::ClosedError. # - # r = Ractor.new {sleep(500)} - # r.close_incoming #=> false - # r.close_incoming #=> true - # r.send('test') - # # Ractor::ClosedError (The incoming-port is already closed) - def close_incoming - __builtin_cexpr! %q{ - ractor_close_incoming(ec, RACTOR_PTR(self)); - } - end - - # - # call-seq: - # ractor.close_outgoing -> true | false - # - # Closes the outgoing port and returns whether it was already closed. All further attempts - # to Ractor.yield in the ractor, and #take from the ractor will fail with Ractor::ClosedError. - # - # r = Ractor.new {sleep(500)} - # r.close_outgoing #=> false - # r.close_outgoing #=> true - # r.take - # # Ractor::ClosedError (The outgoing-port is already closed) - def close_outgoing - __builtin_cexpr! %q{ - ractor_close_outgoing(ec, RACTOR_PTR(self)); - } end # @@ -922,4 +518,247 @@ class Ractor } end end end @@ -9,118 +9,36 @@ #define RACTOR_CHECK_MODE (VM_CHECK_MODE || RUBY_DEBUG) && (SIZEOF_UINT64_T == SIZEOF_VALUE) #endif -enum rb_ractor_basket_type { - // basket is empty - basket_type_none, - - // value is available - basket_type_ref, - basket_type_copy, - basket_type_move, - basket_type_will, - - // basket should be deleted - basket_type_deleted, - - // basket is reserved - basket_type_reserved, - - // take_basket is available - basket_type_take_basket, - - // basket is keeping by yielding ractor - basket_type_yielding, -}; - -// per ractor taking configuration -struct rb_ractor_selector_take_config { - bool closed; - bool oneshot; -}; - -struct rb_ractor_basket { - union { - enum rb_ractor_basket_type e; - rb_atomic_t atomic; - } type; - VALUE sender; // Ractor object sending message - rb_thread_t *sending_th; - - union { - struct { - VALUE v; - bool exception; - } send; - - struct { - struct rb_ractor_basket *basket; - struct rb_ractor_selector_take_config *config; - } take; - } p; // payload -}; - -static inline bool -basket_type_p(struct rb_ractor_basket *b, enum rb_ractor_basket_type type) -{ - return b->type.e == type; -} - -static inline bool -basket_none_p(struct rb_ractor_basket *b) -{ - return basket_type_p(b, basket_type_none); -} - -struct rb_ractor_queue { - struct rb_ractor_basket *baskets; - int start; - int cnt; - int size; - unsigned int serial; - unsigned int reserved_cnt; -}; - -enum rb_ractor_wait_status { - wait_none = 0x00, - wait_receiving = 0x01, - wait_taking = 0x02, - wait_yielding = 0x04, - wait_moving = 0x08, -}; - -enum rb_ractor_wakeup_status { - wakeup_none, - wakeup_by_send, - wakeup_by_yield, - wakeup_by_take, - wakeup_by_close, - wakeup_by_interrupt, - wakeup_by_retry, -}; - struct rb_ractor_sync { // ractor lock rb_nativethread_lock_t lock; #if RACTOR_CHECK_MODE > 0 VALUE locked_by; #endif - bool incoming_port_closed; - bool outgoing_port_closed; - // All sent messages will be pushed into recv_queue - struct rb_ractor_queue recv_queue; - // The following ractors waiting for the yielding by this ractor - struct rb_ractor_queue takers_queue; - // Enabled if the ractor already terminated and not taken yet. - struct rb_ractor_basket will_basket; - struct ractor_wait { - struct ccan_list_head waiting_threads; - // each thread has struct ccan_list_node ractor_waiting.waiting_node - } wait; }; // created @@ -146,12 +64,8 @@ enum ractor_status { struct rb_ractor_struct { struct rb_ractor_pub pub; - struct rb_ractor_sync sync; - // vm wide barrier synchronization - rb_nativethread_cond_t barrier_wait_cond; - // thread management struct { struct ccan_list_head set; @@ -162,6 +76,7 @@ struct rb_ractor_struct { rb_execution_context_t *running_ec; rb_thread_t *main; } threads; VALUE thgroup_default; VALUE name; @@ -0,0 +1,1489 @@ @@ -151,7 +151,7 @@ class TestThreadInstrumentation < Test::Unit::TestCase end full_timeline = record do - ractor.take end timeline = timeline_for(Thread.current, full_timeline) @@ -172,7 +172,7 @@ class TestThreadInstrumentation < Test::Unit::TestCase thread = Ractor.new{ sleep 0.1 Thread.current - }.take sleep 0.1 end @@ -8,7 +8,7 @@ class TestDateParseRactor < Test::Unit::TestCase share = #{share} d = Date.parse('Aug 23:55') Ractor.make_shareable(d) if share - d2, d3 = Ractor.new(d) { |d| [d, Date.parse(d.to_s)] }.take if share assert_same d, d2 else @@ -14,7 +14,7 @@ class RactorCompatibilityTest < Test::Unit::TestCase e.corrections # It is important to call the #corrections method within Ractor. e end - }.take assert_correction "Book", error.corrections CODE @@ -32,7 +32,7 @@ class RactorCompatibilityTest < Test::Unit::TestCase e.corrections # It is important to call the #corrections method within Ractor. e end - }.take assert_correction ":bar", error.corrections assert_match "Did you mean? :bar", get_message(error) @@ -49,7 +49,7 @@ class RactorCompatibilityTest < Test::Unit::TestCase e.corrections # It is important to call the #corrections method within Ractor. e end - }.take assert_correction :to_s, error.corrections assert_match "Did you mean? to_s", get_message(error) @@ -71,7 +71,7 @@ class RactorCompatibilityTest < Test::Unit::TestCase e.corrections # It is important to call the #corrections method within Ractor. e end - }.take assert_correction ":foo", error.corrections assert_match "Did you mean? :foo", get_message(error) @@ -90,7 +90,7 @@ class RactorCompatibilityTest < Test::Unit::TestCase e.corrections # It is important to call the #corrections method within Ractor. e end - }.take assert_not_match(/Did you mean\?/, error.message) CODE @@ -108,7 +108,7 @@ class RactorCompatibilityTest < Test::Unit::TestCase e.corrections # It is important to call the #corrections method within Ractor. e end - }.take assert_correction :in_ractor, error.corrections assert_match "Did you mean? in_ractor", get_message(error) @@ -26,7 +26,7 @@ module TestDigestRactor [r, hexdigest] end rs.each do |r, hexdigest| - puts r.take == hexdigest end end; end @@ -198,7 +198,7 @@ class TestEtc < Test::Unit::TestCase raise unless Integer === Etc.nprocessors end end - end.each(&:take) RUBY end @@ -210,7 +210,7 @@ class TestEtc < Test::Unit::TestCase rescue => e e.class end - end.take assert_equal Ractor::UnsafeError, r RUBY end @@ -221,19 +221,19 @@ class TestEtc < Test::Unit::TestCase Etc.endpwent assert_ractor(<<~RUBY, require: 'etc') - ractor = Ractor.new do Etc.passwd do |s| - Ractor.yield :sync - Ractor.yield s.name break :done end end - ractor.take # => :sync assert_raise RuntimeError, /parallel/ do Etc.passwd {} end - name = ractor.take # => first name - ractor.take # => :done name2 = Etc.passwd do |s| break s.name end @@ -251,7 +251,7 @@ class TestEtc < Test::Unit::TestCase raise unless Etc.getgrgid(Process.gid).gid == Process.gid end end - end.each(&:take) RUBY end end @@ -17,7 +17,7 @@ class TestFiberCurrentRactor < Test::Unit::TestCase Fiber.current.class end.resume end - assert_equal(Fiber, r.take) end; end end @@ -18,7 +18,7 @@ class TestIOConsoleInRactor < Test::Unit::TestCase else true # should not success end - puts r.take end; assert_in_out_err(%W[-r#{path}], "#{<<~"begin;"}\n#{<<~'end;'}", ["true"], []) @@ -28,7 +28,7 @@ class TestIOConsoleInRactor < Test::Unit::TestCase r = Ractor.new do IO.console end - puts console.class == r.take.class end; end end if defined? Ractor @@ -11,7 +11,7 @@ class TestIOWaitInRactor < Test::Unit::TestCase r = Ractor.new do $stdout.equal?($stdout.wait_writable) end - puts r.take end; end end if defined? Ractor @@ -25,7 +25,7 @@ class JSONInRactorTest < Test::Unit::TestCase end expected_json = JSON.parse('{"a":2,"b":3.141,"c":"c","d":[1,"b",3.14],"e":{"foo":"bar"},' + '"g":"\\"\\u0000\\u001f","h":1000.0,"i":0.001}') - actual_json = r.take if expected_json == actual_json exit 0 @@ -5,12 +5,10 @@ class TestObjSpaceRactor < Test::Unit::TestCase assert_ractor(<<~RUBY, require: 'objspace') ObjectSpace.trace_object_allocations do r = Ractor.new do - obj = 'a' * 1024 - Ractor.yield obj end - r.take - r.take end RUBY end @@ -30,7 +28,7 @@ class TestObjSpaceRactor < Test::Unit::TestCase end end - ractors.each(&:take) RUBY end @@ -51,7 +49,7 @@ class TestObjSpaceRactor < Test::Unit::TestCase end end - ractors.each(&:take) RUBY end end @@ -15,7 +15,7 @@ class TestPathnameRactor < Test::Unit::TestCase r = Ractor.new Pathname("a") do |x| x.join(Pathname("b"), Pathname("c")) end - assert_equal(Pathname("a/b/c"), r.take) end; end end @@ -62,7 +62,7 @@ module Prism if reader reader.gets.chomp else - puts(ignore_warnings { Ractor.new(*arguments, &block) }.take) end end end @@ -7,7 +7,7 @@ class TestPsychRactor < Test::Unit::TestCase obj = {foo: [42]} obj2 = Ractor.new(obj) do |obj| Psych.unsafe_load(Psych.dump(obj)) - end.take assert_equal obj, obj2 RUBY end @@ -33,7 +33,7 @@ class TestPsychRactor < Test::Unit::TestCase val * 2 end Psych.load('--- !!omap hello') - end.take assert_equal 'hellohello', r assert_equal 'hello', Psych.load('--- !!omap hello') RUBY @@ -43,7 +43,7 @@ class TestPsychRactor < Test::Unit::TestCase assert_ractor(<<~RUBY, require_relative: 'helper') r = Ractor.new do Psych.libyaml_version.join('.') == Psych::LIBYAML_VERSION - end.take assert_equal true, r RUBY end @@ -130,7 +130,7 @@ class TestEncoding < Test::Unit::TestCase def test_ractor_load_encoding assert_ractor("#{<<~"begin;"}\n#{<<~'end;'}") begin; - Ractor.new{}.take $-w = nil Encoding.default_external = Encoding::ISO8859_2 assert "[Bug #19562]" @@ -601,13 +601,13 @@ class TestEnv < Test::Unit::TestCase rescue Exception => e #{exception_var} = e end - Ractor.yield #{exception_var}.class end; end def str_for_assert_raise_on_yielded_exception_class(expected_error_class, ractor_var) <<-"end;" - error_class = #{ractor_var}.take assert_raise(#{expected_error_class}) do if error_class < Exception raise error_class @@ -649,100 +649,101 @@ class TestEnv < Test::Unit::TestCase def test_bracket_in_ractor assert_ractor(<<-"end;") - r = Ractor.new do - Ractor.yield ENV['test'] - Ractor.yield ENV['TEST'] ENV['test'] = 'foo' - Ractor.yield ENV['test'] - Ractor.yield ENV['TEST'] ENV['TEST'] = 'bar' - Ractor.yield ENV['TEST'] - Ractor.yield ENV['test'] #{str_for_yielding_exception_class("ENV[1]")} #{str_for_yielding_exception_class("ENV[1] = 'foo'")} #{str_for_yielding_exception_class("ENV['test'] = 0")} end - assert_nil(r.take) - assert_nil(r.take) - assert_equal('foo', r.take) if #{ignore_case_str} - assert_equal('foo', r.take) else - assert_nil(r.take) end - assert_equal('bar', r.take) if #{ignore_case_str} - assert_equal('bar', r.take) else - assert_equal('foo', r.take) end 3.times do - #{str_for_assert_raise_on_yielded_exception_class(TypeError, "r")} end end; end def test_dup_in_ractor assert_ractor(<<-"end;") - r = Ractor.new do #{str_for_yielding_exception_class("ENV.dup")} end - #{str_for_assert_raise_on_yielded_exception_class(TypeError, "r")} end; end def test_has_value_in_ractor assert_ractor(<<-"end;") - r = Ractor.new do val = 'a' val.succ! while ENV.has_value?(val) || ENV.has_value?(val.upcase) ENV['test'] = val[0...-1] - Ractor.yield(ENV.has_value?(val)) - Ractor.yield(ENV.has_value?(val.upcase)) ENV['test'] = val - Ractor.yield(ENV.has_value?(val)) - Ractor.yield(ENV.has_value?(val.upcase)) ENV['test'] = val.upcase - Ractor.yield ENV.has_value?(val) - Ractor.yield ENV.has_value?(val.upcase) - end - assert_equal(false, r.take) - assert_equal(false, r.take) - assert_equal(true, r.take) - assert_equal(false, r.take) - assert_equal(false, r.take) - assert_equal(true, r.take) end; end def test_key_in_ractor assert_ractor(<<-"end;") - r = Ractor.new do val = 'a' val.succ! while ENV.has_value?(val) || ENV.has_value?(val.upcase) ENV['test'] = val[0...-1] - Ractor.yield ENV.key(val) - Ractor.yield ENV.key(val.upcase) ENV['test'] = val - Ractor.yield ENV.key(val) - Ractor.yield ENV.key(val.upcase) ENV['test'] = val.upcase - Ractor.yield ENV.key(val) - Ractor.yield ENV.key(val.upcase) end - assert_nil(r.take) - assert_nil(r.take) if #{ignore_case_str} - assert_equal('TEST', r.take.upcase) else - assert_equal('test', r.take) end - assert_nil(r.take) - assert_nil(r.take) if #{ignore_case_str} - assert_equal('TEST', r.take.upcase) else - assert_equal('test', r.take) end end; @@ -750,87 +751,87 @@ class TestEnv < Test::Unit::TestCase def test_delete_in_ractor assert_ractor(<<-"end;") - r = Ractor.new do #{str_to_yield_invalid_envvar_errors("v", "ENV.delete(v)")} - Ractor.yield ENV.delete("TEST") #{str_for_yielding_exception_class("ENV.delete('#{PATH_ENV}')")} - Ractor.yield(ENV.delete("TEST"){|name| "NO "+name}) end - #{str_to_receive_invalid_envvar_errors("r")} - assert_nil(r.take) - exception_class = r.take assert_equal(NilClass, exception_class) - assert_equal("NO TEST", r.take) end; end def test_getenv_in_ractor assert_ractor(<<-"end;") - r = Ractor.new do #{str_to_yield_invalid_envvar_errors("v", "ENV[v]")} ENV["#{PATH_ENV}"] = "" - Ractor.yield ENV["#{PATH_ENV}"] - Ractor.yield ENV[""] end - #{str_to_receive_invalid_envvar_errors("r")} - assert_equal("", r.take) - assert_nil(r.take) end; end def test_fetch_in_ractor assert_ractor(<<-"end;") - r = Ractor.new do ENV["test"] = "foo" - Ractor.yield ENV.fetch("test") ENV.delete("test") #{str_for_yielding_exception_class("ENV.fetch('test')", exception_var: "ex")} - Ractor.yield ex.receiver.object_id - Ractor.yield ex.key - Ractor.yield ENV.fetch("test", "foo") - Ractor.yield(ENV.fetch("test"){"bar"}) #{str_to_yield_invalid_envvar_errors("v", "ENV.fetch(v)")} #{str_for_yielding_exception_class("ENV.fetch('#{PATH_ENV}', 'foo')")} ENV['#{PATH_ENV}'] = "" - Ractor.yield ENV.fetch('#{PATH_ENV}') - end - assert_equal("foo", r.take) - #{str_for_assert_raise_on_yielded_exception_class(KeyError, "r")} - assert_equal(ENV.object_id, r.take) - assert_equal("test", r.take) - assert_equal("foo", r.take) - assert_equal("bar", r.take) - #{str_to_receive_invalid_envvar_errors("r")} - exception_class = r.take assert_equal(NilClass, exception_class) - assert_equal("", r.take) end; end def test_aset_in_ractor assert_ractor(<<-"end;") - r = Ractor.new do #{str_for_yielding_exception_class("ENV['test'] = nil")} ENV["test"] = nil - Ractor.yield ENV["test"] #{str_to_yield_invalid_envvar_errors("v", "ENV[v] = 'test'")} #{str_to_yield_invalid_envvar_errors("v", "ENV['test'] = v")} end - exception_class = r.take assert_equal(NilClass, exception_class) - assert_nil(r.take) - #{str_to_receive_invalid_envvar_errors("r")} - #{str_to_receive_invalid_envvar_errors("r")} end; end def test_keys_in_ractor assert_ractor(<<-"end;") - r = Ractor.new do a = ENV.keys - Ractor.yield a end - a = r.take assert_kind_of(Array, a) a.each {|k| assert_kind_of(String, k) } end; @@ -839,11 +840,11 @@ class TestEnv < Test::Unit::TestCase def test_each_key_in_ractor assert_ractor(<<-"end;") - r = Ractor.new do - ENV.each_key {|k| Ractor.yield(k)} - Ractor.yield "finished" end - while((x=r.take) != "finished") assert_kind_of(String, x) end end; @@ -851,11 +852,11 @@ class TestEnv < Test::Unit::TestCase def test_values_in_ractor assert_ractor(<<-"end;") - r = Ractor.new do a = ENV.values - Ractor.yield a end - a = r.take assert_kind_of(Array, a) a.each {|k| assert_kind_of(String, k) } end; @@ -863,11 +864,11 @@ class TestEnv < Test::Unit::TestCase def test_each_value_in_ractor assert_ractor(<<-"end;") - r = Ractor.new do - ENV.each_value {|k| Ractor.yield(k)} - Ractor.yield "finished" end - while((x=r.take) != "finished") assert_kind_of(String, x) end end; @@ -875,11 +876,11 @@ class TestEnv < Test::Unit::TestCase def test_each_pair_in_ractor assert_ractor(<<-"end;") - r = Ractor.new do - ENV.each_pair {|k, v| Ractor.yield([k,v])} - Ractor.yield "finished" end - while((k,v=r.take) != "finished") assert_kind_of(String, k) assert_kind_of(String, v) end @@ -888,116 +889,116 @@ class TestEnv < Test::Unit::TestCase def test_reject_bang_in_ractor assert_ractor(<<-"end;") - r = Ractor.new do h1 = {} ENV.each_pair {|k, v| h1[k] = v } ENV["test"] = "foo" ENV.reject! {|k, v| #{ignore_case_str} ? k.upcase == "TEST" : k == "test" } h2 = {} ENV.each_pair {|k, v| h2[k] = v } - Ractor.yield [h1, h2] - Ractor.yield(ENV.reject! {|k, v| #{ignore_case_str} ? k.upcase == "TEST" : k == "test" }) end - h1, h2 = r.take assert_equal(h1, h2) - assert_nil(r.take) end; end def test_delete_if_in_ractor assert_ractor(<<-"end;") - r = Ractor.new do h1 = {} ENV.each_pair {|k, v| h1[k] = v } ENV["test"] = "foo" ENV.delete_if {|k, v| #{ignore_case_str} ? k.upcase == "TEST" : k == "test" } h2 = {} ENV.each_pair {|k, v| h2[k] = v } - Ractor.yield [h1, h2] - Ractor.yield (ENV.delete_if {|k, v| #{ignore_case_str} ? k.upcase == "TEST" : k == "test" }) end - h1, h2 = r.take assert_equal(h1, h2) - assert_same(ENV, r.take) end; end def test_select_bang_in_ractor assert_ractor(<<-"end;") - r = Ractor.new do h1 = {} ENV.each_pair {|k, v| h1[k] = v } ENV["test"] = "foo" ENV.select! {|k, v| #{ignore_case_str} ? k.upcase != "TEST" : k != "test" } h2 = {} ENV.each_pair {|k, v| h2[k] = v } - Ractor.yield [h1, h2] - Ractor.yield(ENV.select! {|k, v| #{ignore_case_str} ? k.upcase != "TEST" : k != "test" }) end - h1, h2 = r.take assert_equal(h1, h2) - assert_nil(r.take) end; end def test_filter_bang_in_ractor assert_ractor(<<-"end;") - r = Ractor.new do h1 = {} ENV.each_pair {|k, v| h1[k] = v } ENV["test"] = "foo" ENV.filter! {|k, v| #{ignore_case_str} ? k.upcase != "TEST" : k != "test" } h2 = {} ENV.each_pair {|k, v| h2[k] = v } - Ractor.yield [h1, h2] - Ractor.yield(ENV.filter! {|k, v| #{ignore_case_str} ? k.upcase != "TEST" : k != "test" }) end - h1, h2 = r.take assert_equal(h1, h2) - assert_nil(r.take) end; end def test_keep_if_in_ractor assert_ractor(<<-"end;") - r = Ractor.new do h1 = {} ENV.each_pair {|k, v| h1[k] = v } ENV["test"] = "foo" ENV.keep_if {|k, v| #{ignore_case_str} ? k.upcase != "TEST" : k != "test" } h2 = {} ENV.each_pair {|k, v| h2[k] = v } - Ractor.yield [h1, h2] - Ractor.yield (ENV.keep_if {|k, v| #{ignore_case_str} ? k.upcase != "TEST" : k != "test" }) end - h1, h2 = r.take assert_equal(h1, h2) - assert_equal(ENV, r.take) end; end def test_values_at_in_ractor assert_ractor(<<-"end;") - r = Ractor.new do ENV["test"] = "foo" - Ractor.yield ENV.values_at("test", "test") end - assert_equal(["foo", "foo"], r.take) end; end def test_select_in_ractor assert_ractor(<<-"end;") - r = Ractor.new do ENV["test"] = "foo" h = ENV.select {|k| #{ignore_case_str} ? k.upcase == "TEST" : k == "test" } - Ractor.yield h.size k = h.keys.first v = h.values.first - Ractor.yield [k, v] end - assert_equal(1, r.take) - k, v = r.take if #{ignore_case_str} assert_equal("TEST", k.upcase) assert_equal("FOO", v.upcase) @@ -1010,16 +1011,16 @@ class TestEnv < Test::Unit::TestCase def test_filter_in_ractor assert_ractor(<<-"end;") - r = Ractor.new do ENV["test"] = "foo" h = ENV.filter {|k| #{ignore_case_str} ? k.upcase == "TEST" : k == "test" } - Ractor.yield(h.size) k = h.keys.first v = h.values.first - Ractor.yield [k, v] end - assert_equal(1, r.take) - k, v = r.take if #{ignore_case_str} assert_equal("TEST", k.upcase) assert_equal("FOO", v.upcase) @@ -1032,49 +1033,49 @@ class TestEnv < Test::Unit::TestCase def test_slice_in_ractor assert_ractor(<<-"end;") - r = Ractor.new do ENV.clear ENV["foo"] = "bar" ENV["baz"] = "qux" ENV["bar"] = "rab" - Ractor.yield(ENV.slice()) - Ractor.yield(ENV.slice("")) - Ractor.yield(ENV.slice("unknown")) - Ractor.yield(ENV.slice("foo", "baz")) - end - assert_equal({}, r.take) - assert_equal({}, r.take) - assert_equal({}, r.take) - assert_equal({"foo"=>"bar", "baz"=>"qux"}, r.take) end; end def test_except_in_ractor assert_ractor(<<-"end;") - r = Ractor.new do ENV.clear ENV["foo"] = "bar" ENV["baz"] = "qux" ENV["bar"] = "rab" - Ractor.yield ENV.except() - Ractor.yield ENV.except("") - Ractor.yield ENV.except("unknown") - Ractor.yield ENV.except("foo", "baz") - end - assert_equal({"bar"=>"rab", "baz"=>"qux", "foo"=>"bar"}, r.take) - assert_equal({"bar"=>"rab", "baz"=>"qux", "foo"=>"bar"}, r.take) - assert_equal({"bar"=>"rab", "baz"=>"qux", "foo"=>"bar"}, r.take) - assert_equal({"bar"=>"rab"}, r.take) end; end def test_clear_in_ractor assert_ractor(<<-"end;") - r = Ractor.new do ENV.clear - Ractor.yield ENV.size end - assert_equal(0, r.take) end; end @@ -1083,20 +1084,20 @@ class TestEnv < Test::Unit::TestCase r = Ractor.new do ENV.to_s end - assert_equal("ENV", r.take) end; end def test_inspect_in_ractor assert_ractor(<<-"end;") - r = Ractor.new do ENV.clear ENV["foo"] = "bar" ENV["baz"] = "qux" s = ENV.inspect - Ractor.yield s end - s = r.take expected = ['"foo" => "bar"', '"baz" => "qux"'] unless s.start_with?(/\{"foo"/i) expected.reverse! @@ -1112,14 +1113,14 @@ class TestEnv < Test::Unit::TestCase def test_to_a_in_ractor assert_ractor(<<-"end;") - r = Ractor.new do ENV.clear ENV["foo"] = "bar" ENV["baz"] = "qux" a = ENV.to_a - Ractor.yield a end - a = r.take assert_equal(2, a.size) expected = [%w(baz qux), %w(foo bar)] if #{ignore_case_str} @@ -1136,59 +1137,59 @@ class TestEnv < Test::Unit::TestCase r = Ractor.new do ENV.rehash end - assert_nil(r.take) end; end def test_size_in_ractor assert_ractor(<<-"end;") - r = Ractor.new do s = ENV.size ENV["test"] = "foo" - Ractor.yield [s, ENV.size] end - s, s2 = r.take assert_equal(s + 1, s2) end; end def test_empty_p_in_ractor assert_ractor(<<-"end;") - r = Ractor.new do ENV.clear - Ractor.yield ENV.empty? ENV["test"] = "foo" - Ractor.yield ENV.empty? end - assert r.take - assert !r.take end; end def test_has_key_in_ractor assert_ractor(<<-"end;") - r = Ractor.new do - Ractor.yield ENV.has_key?("test") ENV["test"] = "foo" - Ractor.yield ENV.has_key?("test") #{str_to_yield_invalid_envvar_errors("v", "ENV.has_key?(v)")} end - assert !r.take - assert r.take - #{str_to_receive_invalid_envvar_errors("r")} end; end def test_assoc_in_ractor assert_ractor(<<-"end;") - r = Ractor.new do - Ractor.yield ENV.assoc("test") ENV["test"] = "foo" - Ractor.yield ENV.assoc("test") #{str_to_yield_invalid_envvar_errors("v", "ENV.assoc(v)")} end - assert_nil(r.take) - k, v = r.take if #{ignore_case_str} assert_equal("TEST", k.upcase) assert_equal("FOO", v.upcase) @@ -1196,7 +1197,7 @@ class TestEnv < Test::Unit::TestCase assert_equal("test", k) assert_equal("foo", v) end - #{str_to_receive_invalid_envvar_errors("r")} encoding = /mswin|mingw/ =~ RUBY_PLATFORM ? Encoding::UTF_8 : Encoding.find("locale") assert_equal(encoding, v.encoding) end; @@ -1204,29 +1205,29 @@ class TestEnv < Test::Unit::TestCase def test_has_value2_in_ractor assert_ractor(<<-"end;") - r = Ractor.new do ENV.clear - Ractor.yield ENV.has_value?("foo") ENV["test"] = "foo" - Ractor.yield ENV.has_value?("foo") end - assert !r.take - assert r.take end; end def test_rassoc_in_ractor assert_ractor(<<-"end;") - r = Ractor.new do ENV.clear - Ractor.yield ENV.rassoc("foo") ENV["foo"] = "bar" ENV["test"] = "foo" ENV["baz"] = "qux" - Ractor.yield ENV.rassoc("foo") end - assert_nil(r.take) - k, v = r.take if #{ignore_case_str} assert_equal("TEST", k.upcase) assert_equal("FOO", v.upcase) @@ -1239,39 +1240,39 @@ class TestEnv < Test::Unit::TestCase def test_to_hash_in_ractor assert_ractor(<<-"end;") - r = Ractor.new do h = {} ENV.each {|k, v| h[k] = v } - Ractor.yield [h, ENV.to_hash] end - h, h2 = r.take assert_equal(h, h2) end; end def test_to_h_in_ractor assert_ractor(<<-"end;") - r = Ractor.new do - Ractor.yield [ENV.to_hash, ENV.to_h] - Ractor.yield [ENV.map {|k, v| ["$\#{k}", v.size]}.to_h, ENV.to_h {|k, v| ["$\#{k}", v.size]}] end - a, b = r.take assert_equal(a,b) - c, d = r.take assert_equal(c,d) end; end def test_reject_in_ractor assert_ractor(<<-"end;") - r = Ractor.new do h1 = {} ENV.each_pair {|k, v| h1[k] = v } ENV["test"] = "foo" h2 = ENV.reject {|k, v| #{ignore_case_str} ? k.upcase == "TEST" : k == "test" } - Ractor.yield [h1, h2] end - h1, h2 = r.take assert_equal(h1, h2) end; end @@ -1279,86 +1280,86 @@ class TestEnv < Test::Unit::TestCase def test_shift_in_ractor assert_ractor(<<-"end;") #{STR_DEFINITION_FOR_CHECK} - r = Ractor.new do ENV.clear ENV["foo"] = "bar" ENV["baz"] = "qux" a = ENV.shift b = ENV.shift - Ractor.yield [a,b] - Ractor.yield ENV.shift end - a,b = r.take check([a, b], [%w(foo bar), %w(baz qux)]) - assert_nil(r.take) end; end def test_invert_in_ractor assert_ractor(<<-"end;") #{STR_DEFINITION_FOR_CHECK} - r = Ractor.new do ENV.clear ENV["foo"] = "bar" ENV["baz"] = "qux" - Ractor.yield(ENV.invert) end - check(r.take.to_a, [%w(bar foo), %w(qux baz)]) end; end def test_replace_in_ractor assert_ractor(<<-"end;") #{STR_DEFINITION_FOR_CHECK} - r = Ractor.new do ENV["foo"] = "xxx" ENV.replace({"foo"=>"bar", "baz"=>"qux"}) - Ractor.yield ENV.to_hash ENV.replace({"Foo"=>"Bar", "Baz"=>"Qux"}) - Ractor.yield ENV.to_hash end - check(r.take.to_a, [%w(foo bar), %w(baz qux)]) - check(r.take.to_a, [%w(Foo Bar), %w(Baz Qux)]) end; end def test_update_in_ractor assert_ractor(<<-"end;") #{STR_DEFINITION_FOR_CHECK} - r = Ractor.new do ENV.clear ENV["foo"] = "bar" ENV["baz"] = "qux" ENV.update({"baz"=>"quux","a"=>"b"}) - Ractor.yield ENV.to_hash ENV.clear ENV["foo"] = "bar" ENV["baz"] = "qux" ENV.update({"baz"=>"quux","a"=>"b"}) {|k, v1, v2| k + "_" + v1 + "_" + v2 } - Ractor.yield ENV.to_hash end - check(r.take.to_a, [%w(foo bar), %w(baz quux), %w(a b)]) - check(r.take.to_a, [%w(foo bar), %w(baz baz_qux_quux), %w(a b)]) end; end def test_huge_value_in_ractor assert_ractor(<<-"end;") huge_value = "bar" * 40960 - r = Ractor.new huge_value do |v| ENV["foo"] = "bar" #{str_for_yielding_exception_class("ENV['foo'] = v ")} - Ractor.yield ENV["foo"] end if /mswin|ucrt/ =~ RUBY_PLATFORM - #{str_for_assert_raise_on_yielded_exception_class(Errno::EINVAL, "r")} - result = r.take assert_equal("bar", result) else - exception_class = r.take assert_equal(NilClass, exception_class) - result = r.take assert_equal(huge_value, result) end end; @@ -1366,34 +1367,34 @@ class TestEnv < Test::Unit::TestCase def test_frozen_env_in_ractor assert_ractor(<<-"end;") - r = Ractor.new do #{str_for_yielding_exception_class("ENV.freeze")} end - #{str_for_assert_raise_on_yielded_exception_class(TypeError, "r")} end; end def test_frozen_in_ractor assert_ractor(<<-"end;") - r = Ractor.new do ENV["#{PATH_ENV}"] = "/" ENV.each do |k, v| - Ractor.yield [k.frozen?] - Ractor.yield [v.frozen?] end ENV.each_key do |k| - Ractor.yield [k.frozen?] end ENV.each_value do |v| - Ractor.yield [v.frozen?] end ENV.each_key do |k| - Ractor.yield [ENV[k].frozen?, "[\#{k.dump}]"] - Ractor.yield [ENV.fetch(k).frozen?, "fetch(\#{k.dump})"] end - Ractor.yield "finished" end - while((params=r.take) != "finished") assert(*params) end end; @@ -1401,7 +1402,7 @@ class TestEnv < Test::Unit::TestCase def test_shared_substring_in_ractor assert_ractor(<<-"end;") - r = Ractor.new do bug12475 = '[ruby-dev:49655] [Bug #12475]' n = [*"0".."9"].join("")*3 e0 = ENV[n0 = "E\#{n}"] @@ -1411,9 +1412,9 @@ class TestEnv < Test::Unit::TestCase ENV[n1.chop] = "T\#{n}.".chop ENV[n0], e0 = e0, ENV[n0] ENV[n1], e1 = e1, ENV[n1] - Ractor.yield [n, e0, e1, bug12475] end - n, e0, e1, bug12475 = r.take assert_equal("T\#{n}", e0, bug12475) assert_nil(e1, bug12475) end; @@ -1429,7 +1430,7 @@ class TestEnv < Test::Unit::TestCase rescue Ractor::IsolationError => e e end - assert_equal Ractor::IsolationError, r_get.take.class r_get = Ractor.new do ENV.instance_eval{ @a } @@ -1437,7 +1438,7 @@ class TestEnv < Test::Unit::TestCase e end - assert_equal Ractor::IsolationError, r_get.take.class r_set = Ractor.new do ENV.instance_eval{ @b = "hello" } @@ -1445,7 +1446,7 @@ class TestEnv < Test::Unit::TestCase e end - assert_equal Ractor::IsolationError, r_set.take.class RUBY end @@ -808,7 +808,7 @@ class TestISeq < Test::Unit::TestCase GC.start Float(30) } - assert_equal :new, r.take RUBY end @@ -335,7 +335,7 @@ class TestMemoryView < Test::Unit::TestCase p mv[[0, 2]] mv[[1, 3]] end - p r.take end; end end @@ -74,7 +74,7 @@ class TestRactor < Test::Unit::TestCase Warning[:experimental] = false main_ractor_id = Thread.current.group.object_id - ractor_id = Ractor.new { Thread.current.group.object_id }.take refute_equal main_ractor_id, ractor_id end; end @@ -93,7 +93,7 @@ class TestRactor < Test::Unit::TestCase else nil end - end.take assert_equal "uh oh", err_msg RUBY end @@ -596,8 +596,8 @@ class TestShapes < Test::Unit::TestCase assert_predicate RubyVM::Shape.of(tc), :too_complex? assert_equal 3, tc.very_unique - assert_equal 3, Ractor.new(tc) { |x| Ractor.yield(x.very_unique) }.take - assert_equal tc.instance_variables.sort, Ractor.new(tc) { |x| Ractor.yield(x.instance_variables) }.take.sort end; end @@ -699,10 +699,10 @@ class TestShapes < Test::Unit::TestCase r = Ractor.new do o = Object.new o.instance_variable_set(:@a, "hello") - Ractor.yield(o) end - o = r.take assert_equal "hello", o.instance_variable_get(:@a) end; end @@ -717,10 +717,10 @@ class TestShapes < Test::Unit::TestCase r = Ractor.new do o = [] o.instance_variable_set(:@a, "hello") - Ractor.yield(o) end - o = r.take assert_equal "hello", o.instance_variable_get(:@a) end; end @@ -17,7 +17,7 @@ class TestStringIOInRactor < Test::Unit::TestCase io.puts "def" "\0\0\0\0def\n" == io.string end - puts r.take end; end end @@ -22,7 +22,7 @@ class TestStringScannerRactor < Test::Unit::TestCase s.scan(/\\w+/) ] end - puts r.take.compact end; end end @@ -60,7 +60,7 @@ class TestRbConfig < Test::Unit::TestCase [sizeof_int, fixnum_max] end - sizeof_int, fixnum_max = r.take assert_kind_of Integer, sizeof_int, "RbConfig::SIZEOF['int'] should be an Integer" assert_kind_of Integer, fixnum_max, "RbConfig::LIMITS['FIXNUM_MAX'] should be an Integer" @@ -74,7 +74,7 @@ class TestTimeExtension < Test::Unit::TestCase # :nodoc: if defined?(Ractor) def test_rfc2822_ractor assert_ractor(<<~RUBY, require: 'time') - actual = Ractor.new { Time.rfc2822("Fri, 21 Nov 1997 09:55:06 -0600") }.take assert_equal(Time.utc(1997, 11, 21, 9, 55, 6) + 6 * 3600, actual) RUBY end @@ -134,16 +134,17 @@ class TestTmpdir < Test::Unit::TestCase def test_ractor assert_ractor(<<~'end;', require: "tmpdir") - r = Ractor.new do Dir.mktmpdir() do |d| - Ractor.yield d Ractor.receive end end - dir = r.take assert_file.directory? dir r.send true - r.take assert_file.not_exist? dir end; end @@ -75,7 +75,7 @@ class URI::TestCommon < Test::Unit::TestCase return unless defined?(Ractor) assert_ractor(<<~RUBY, require: 'uri') r = Ractor.new { URI.parse("https://ruby-lang.org/").inspect } - assert_equal(URI.parse("https://ruby-lang.org/").inspect, r.take) RUBY end @@ -526,9 +526,6 @@ thread_cleanup_func(void *th_ptr, int atfork) } rb_native_mutex_destroy(&th->interrupt_lock); -#ifndef RUBY_THREAD_PTHREAD_H - rb_native_cond_destroy(&th->ractor_waiting.cond); -#endif } static VALUE rb_threadptr_raise(rb_thread_t *, int, VALUE *); @@ -6174,6 +6171,8 @@ threadptr_interrupt_exec_exec(rb_thread_t *th) } rb_native_mutex_unlock(&th->interrupt_lock); if (task) { (*task->func)(task->data); ruby_xfree(task); @@ -6228,6 +6227,8 @@ rb_ractor_interrupt_exec(struct rb_ractor_struct *target_r, { struct interrupt_ractor_new_thread_data *d = ALLOC(struct interrupt_ractor_new_thread_data); d->func = func; d->data = data; rb_thread_t *main_th = target_r->threads.main; @@ -374,40 +374,47 @@ ractor_sched_dump_(const char *file, int line, rb_vm_t *vm) #define thread_sched_unlock(a, b) thread_sched_unlock_(a, b, __FILE__, __LINE__) static void -thread_sched_lock_(struct rb_thread_sched *sched, rb_thread_t *th, const char *file, int line) { - rb_native_mutex_lock(&sched->lock_); - -#if VM_CHECK_MODE - RUBY_DEBUG_LOG2(file, line, "th:%u prev_owner:%u", rb_th_serial(th), rb_th_serial(sched->lock_owner)); VM_ASSERT(sched->lock_owner == NULL); sched->lock_owner = th; -#else - RUBY_DEBUG_LOG2(file, line, "th:%u", rb_th_serial(th)); #endif } static void -thread_sched_unlock_(struct rb_thread_sched *sched, rb_thread_t *th, const char *file, int line) { - RUBY_DEBUG_LOG2(file, line, "th:%u", rb_th_serial(th)); - -#if VM_CHECK_MODE VM_ASSERT(sched->lock_owner == th); sched->lock_owner = NULL; #endif - - rb_native_mutex_unlock(&sched->lock_); } static void -thread_sched_set_lock_owner(struct rb_thread_sched *sched, rb_thread_t *th) { - RUBY_DEBUG_LOG("th:%u", rb_th_serial(th)); -#if VM_CHECK_MODE > 0 - sched->lock_owner = th; #endif } static void @@ -542,7 +549,6 @@ ractor_sched_timeslice_threads_contain_p(rb_vm_t *vm, rb_thread_t *th) } static void ractor_sched_barrier_join_signal_locked(rb_vm_t *vm); -static void ractor_sched_barrier_join_wait_locked(rb_vm_t *vm, rb_thread_t *th); // setup timeslice signals by the timer thread. static void @@ -585,11 +591,10 @@ thread_sched_setup_running_threads(struct rb_thread_sched *sched, rb_ractor_t *c } if (add_th) { - while (UNLIKELY(vm->ractor.sched.barrier_waiting)) { - RUBY_DEBUG_LOG("barrier-wait"); - - ractor_sched_barrier_join_signal_locked(vm); - ractor_sched_barrier_join_wait_locked(vm, add_th); } VM_ASSERT(!ractor_sched_running_threads_contain_p(vm, add_th)); @@ -598,7 +603,6 @@ thread_sched_setup_running_threads(struct rb_thread_sched *sched, rb_ractor_t *c ccan_list_add(&vm->ractor.sched.running_threads, &add_th->sched.node.running_threads); vm->ractor.sched.running_cnt++; sched->is_running = true; - VM_ASSERT(!vm->ractor.sched.barrier_waiting); } if (add_timeslice_th) { @@ -622,19 +626,6 @@ thread_sched_setup_running_threads(struct rb_thread_sched *sched, rb_ractor_t *c } ractor_sched_unlock(vm, cr); - if (add_th && !del_th && UNLIKELY(vm->ractor.sync.lock_owner != NULL)) { - // it can be after barrier synchronization by another ractor - rb_thread_t *lock_owner = NULL; -#if VM_CHECK_MODE - lock_owner = sched->lock_owner; -#endif - thread_sched_unlock(sched, lock_owner); - { - RB_VM_LOCKING(); - } - thread_sched_lock(sched, lock_owner); - } - //RUBY_DEBUG_LOG("+:%u -:%u +ts:%u -ts:%u run:%u->%u", // rb_th_serial(add_th), rb_th_serial(del_th), // rb_th_serial(add_timeslice_th), rb_th_serial(del_timeslice_th), @@ -753,7 +744,8 @@ thread_sched_enq(struct rb_thread_sched *sched, rb_thread_t *ready_th) } } else { - VM_ASSERT(!ractor_sched_timeslice_threads_contain_p(ready_th->vm, sched->running)); } ccan_list_add_tail(&sched->readyq, &ready_th->sched.node.readyq); @@ -849,12 +841,12 @@ thread_sched_wait_running_turn(struct rb_thread_sched *sched, rb_thread_t *th, b if (th_has_dedicated_nt(th)) { RUBY_DEBUG_LOG("(nt) sleep th:%u running:%u", rb_th_serial(th), rb_th_serial(sched->running)); - thread_sched_set_lock_owner(sched, NULL); { RUBY_DEBUG_LOG("nt:%d cond:%p", th->nt->serial, &th->nt->cond.readyq); rb_native_cond_wait(&th->nt->cond.readyq, &sched->lock_); } - thread_sched_set_lock_owner(sched, th); RUBY_DEBUG_LOG("(nt) wakeup %s", sched->running == th ? "success" : "failed"); if (th == sched->running) { @@ -870,12 +862,12 @@ thread_sched_wait_running_turn(struct rb_thread_sched *sched, rb_thread_t *th, b RUBY_DEBUG_LOG("th:%u->%u (direct)", rb_th_serial(th), rb_th_serial(next_th)); - thread_sched_set_lock_owner(sched, NULL); { rb_ractor_set_current_ec(th->ractor, NULL); thread_sched_switch(th, next_th); } - thread_sched_set_lock_owner(sched, th); } else { // search another ready ractor @@ -884,12 +876,12 @@ thread_sched_wait_running_turn(struct rb_thread_sched *sched, rb_thread_t *th, b RUBY_DEBUG_LOG("th:%u->%u (ractor scheduling)", rb_th_serial(th), rb_th_serial(next_th)); - thread_sched_set_lock_owner(sched, NULL); { rb_ractor_set_current_ec(th->ractor, NULL); coroutine_transfer0(th->sched.context, nt->nt_context, false); } - thread_sched_set_lock_owner(sched, th); } VM_ASSERT(rb_current_ec_noinline() == th->ec); @@ -1041,15 +1033,45 @@ thread_sched_to_waiting(struct rb_thread_sched *sched, rb_thread_t *th) } // mini utility func -static void -setup_ubf(rb_thread_t *th, rb_unblock_function_t *func, void *arg) { rb_native_mutex_lock(&th->interrupt_lock); { th->unblock.func = func; th->unblock.arg = arg; } rb_native_mutex_unlock(&th->interrupt_lock); } static void @@ -1085,7 +1107,10 @@ thread_sched_to_waiting_until_wakeup(struct rb_thread_sched *sched, rb_thread_t RUBY_DEBUG_LOG("th:%u", rb_th_serial(th)); RB_VM_SAVE_MACHINE_CONTEXT(th); - setup_ubf(th, ubf_waiting, (void *)th); RB_INTERNAL_THREAD_HOOK(RUBY_INTERNAL_THREAD_EVENT_SUSPENDED, th); @@ -1102,7 +1127,7 @@ thread_sched_to_waiting_until_wakeup(struct rb_thread_sched *sched, rb_thread_t } thread_sched_unlock(sched, th); - setup_ubf(th, NULL, NULL); } // run another thread in the ready queue. @@ -1311,66 +1336,59 @@ void rb_ractor_unlock_self(rb_ractor_t *r); // The current thread for a ractor is put to "sleep" (descheduled in the STOPPED_FOREVER state) waiting for // a ractor action to wake it up. See docs for `ractor_sched_sleep_with_cleanup` for more info. void -rb_ractor_sched_sleep(rb_execution_context_t *ec, rb_ractor_t *cr, rb_unblock_function_t *ubf_schedule_ractor_th) { // ractor lock of cr is acquired - // r is sleeping status rb_thread_t * volatile th = rb_ec_thread_ptr(ec); struct rb_thread_sched *sched = TH_SCHED(th); - struct ccan_list_node *waitn = &th->ractor_waiting.waiting_node; - VM_ASSERT(waitn->next == waitn->prev && waitn->next == waitn); // it should be unlinked - ccan_list_add(&cr->sync.wait.waiting_threads, waitn); - setup_ubf(th, ubf_schedule_ractor_th, (void *)ec); thread_sched_lock(sched, th); { rb_ractor_unlock_self(cr); { - if (RUBY_VM_INTERRUPTED(th->ec)) { - RUBY_DEBUG_LOG("interrupted"); - } - else if (th->ractor_waiting.wakeup_status != wakeup_none) { - RUBY_DEBUG_LOG("awaken:%d", (int)th->ractor_waiting.wakeup_status); - } - else { - // sleep - RB_VM_SAVE_MACHINE_CONTEXT(th); - th->status = THREAD_STOPPED_FOREVER; - - RB_INTERNAL_THREAD_HOOK(RUBY_INTERNAL_THREAD_EVENT_SUSPENDED, th); - - bool can_direct_transfer = !th_has_dedicated_nt(th); - thread_sched_wakeup_next_thread(sched, th, can_direct_transfer); - thread_sched_wait_running_turn(sched, th, can_direct_transfer); - th->status = THREAD_RUNNABLE; - // wakeup - } } } thread_sched_unlock(sched, th); - setup_ubf(th, NULL, NULL); - rb_ractor_lock_self(cr); - ccan_list_del_init(waitn); } void -rb_ractor_sched_wakeup(rb_ractor_t *r, rb_thread_t *th) { - // ractor lock of r is acquired - struct rb_thread_sched *sched = TH_SCHED(th); - VM_ASSERT(th->ractor_waiting.wakeup_status != 0); - thread_sched_lock(sched, th); { - if (th->status == THREAD_STOPPED_FOREVER) { - thread_sched_to_ready_common(sched, th, true, false); } } - thread_sched_unlock(sched, th); } static bool @@ -1378,6 +1396,7 @@ ractor_sched_barrier_completed_p(rb_vm_t *vm) { RUBY_DEBUG_LOG("run:%u wait:%u", vm->ractor.sched.running_cnt, vm->ractor.sched.barrier_waiting_cnt); VM_ASSERT(vm->ractor.sched.running_cnt - 1 >= vm->ractor.sched.barrier_waiting_cnt); return (vm->ractor.sched.running_cnt - vm->ractor.sched.barrier_waiting_cnt) == 1; } @@ -1388,6 +1407,8 @@ rb_ractor_sched_barrier_start(rb_vm_t *vm, rb_ractor_t *cr) VM_ASSERT(vm->ractor.sync.lock_owner == cr); // VM is locked VM_ASSERT(!vm->ractor.sched.barrier_waiting); VM_ASSERT(vm->ractor.sched.barrier_waiting_cnt == 0); RUBY_DEBUG_LOG("start serial:%u", vm->ractor.sched.barrier_serial); @@ -1396,46 +1417,60 @@ rb_ractor_sched_barrier_start(rb_vm_t *vm, rb_ractor_t *cr) ractor_sched_lock(vm, cr); { vm->ractor.sched.barrier_waiting = true; // release VM lock lock_rec = vm->ractor.sync.lock_rec; vm->ractor.sync.lock_rec = 0; vm->ractor.sync.lock_owner = NULL; rb_native_mutex_unlock(&vm->ractor.sync.lock); - { - // interrupts all running threads - rb_thread_t *ith; - ccan_list_for_each(&vm->ractor.sched.running_threads, ith, sched.node.running_threads) { - if (ith->ractor != cr) { - RUBY_DEBUG_LOG("barrier int:%u", rb_th_serial(ith)); - RUBY_VM_SET_VM_BARRIER_INTERRUPT(ith->ec); - } - } - // wait for other ractors - while (!ractor_sched_barrier_completed_p(vm)) { - ractor_sched_set_unlocked(vm, cr); - rb_native_cond_wait(&vm->ractor.sched.barrier_complete_cond, &vm->ractor.sched.lock); - ractor_sched_set_locked(vm, cr); } } - } - ractor_sched_unlock(vm, cr); - // acquire VM lock - rb_native_mutex_lock(&vm->ractor.sync.lock); - vm->ractor.sync.lock_rec = lock_rec; - vm->ractor.sync.lock_owner = cr; - RUBY_DEBUG_LOG("completed seirial:%u", vm->ractor.sched.barrier_serial); - ractor_sched_lock(vm, cr); - { - vm->ractor.sched.barrier_waiting = false; vm->ractor.sched.barrier_serial++; vm->ractor.sched.barrier_waiting_cnt = 0; rb_native_cond_broadcast(&vm->ractor.sched.barrier_release_cond); } ractor_sched_unlock(vm, cr); } @@ -164,4 +164,8 @@ native_tls_set(native_tls_key_t key, void *ptr) RUBY_EXTERN native_tls_key_t ruby_current_ec_key; #endif #endif /* RUBY_THREAD_PTHREAD_H */ @@ -72,7 +72,7 @@ thread_sched_wait_events(struct rb_thread_sched *sched, rb_thread_t *th, int fd, RUBY_DEBUG_LOG("wait fd:%d", fd); RB_VM_SAVE_MACHINE_CONTEXT(th); - setup_ubf(th, ubf_event_waiting, (void *)th); RB_INTERNAL_THREAD_HOOK(RUBY_INTERNAL_THREAD_EVENT_SUSPENDED, th); @@ -102,7 +102,7 @@ thread_sched_wait_events(struct rb_thread_sched *sched, rb_thread_t *th, int fd, timer_thread_cancel_waiting(th); } - setup_ubf(th, NULL, NULL); // TODO: maybe it is already NULL? th->status = THREAD_RUNNABLE; } @@ -450,7 +450,7 @@ co_start(struct coroutine_context *from, struct coroutine_context *self) // RUBY_DEBUG_LOG("th:%u", rb_th_serial(th)); - thread_sched_set_lock_owner(sched, th); thread_sched_add_running_thread(TH_SCHED(th), th); thread_sched_unlock(sched, th); { @@ -475,13 +475,11 @@ co_start(struct coroutine_context *from, struct coroutine_context *self) coroutine_transfer0(self, nt->nt_context, true); } else { - rb_vm_t *vm = th->vm; - bool has_ready_ractor = vm->ractor.sched.grq_cnt > 0; // at least this ractor is not queued rb_thread_t *next_th = sched->running; - if (!has_ready_ractor && next_th && !next_th->nt) { // switch to the next thread - thread_sched_set_lock_owner(sched, NULL); th->sched.finished = true; thread_sched_switch0(th->sched.context, next_th, nt, true); } @@ -922,6 +922,7 @@ vm_barrier_finish_p(rb_vm_t *vm) vm->ractor.blocking_cnt); VM_ASSERT(vm->ractor.blocking_cnt <= vm->ractor.cnt); return vm->ractor.blocking_cnt == vm->ractor.cnt; } @@ -947,7 +948,7 @@ rb_ractor_sched_barrier_start(rb_vm_t *vm, rb_ractor_t *cr) // wait while (!vm_barrier_finish_p(vm)) { - rb_vm_cond_wait(vm, &vm->ractor.sync.barrier_cond); } RUBY_DEBUG_LOG("cnt:%u barrier success", vm->ractor.sync.barrier_cnt); @@ -957,9 +958,7 @@ rb_ractor_sched_barrier_start(rb_vm_t *vm, rb_ractor_t *cr) vm->ractor.sync.barrier_waiting = false; vm->ractor.sync.barrier_cnt++; - ccan_list_for_each(&vm->ractor.set, r, vmlr_node) { - rb_native_cond_signal(&r->barrier_wait_cond); - } } void @@ -983,7 +982,7 @@ rb_ractor_sched_barrier_join(rb_vm_t *vm, rb_ractor_t *cr) if (vm_barrier_finish_p(vm)) { RUBY_DEBUG_LOG("wakeup barrier owner"); - rb_native_cond_signal(&vm->ractor.sync.barrier_cond); } else { RUBY_DEBUG_LOG("wait for barrier finish"); @@ -991,10 +990,7 @@ rb_ractor_sched_barrier_join(rb_vm_t *vm, rb_ractor_t *cr) // wait for restart while (barrier_cnt == vm->ractor.sync.barrier_cnt) { - vm->ractor.sync.lock_owner = NULL; - rb_native_cond_wait(&cr->barrier_wait_cond, &vm->ractor.sync.lock); - VM_ASSERT(vm->ractor.sync.lock_owner == NULL); - vm->ractor.sync.lock_owner = cr; } RUBY_DEBUG_LOG("barrier is released. Acquire vm_lock"); @@ -3557,7 +3557,6 @@ thread_mark(void *ptr) rb_gc_mark(th->last_status); rb_gc_mark(th->locking_mutex); rb_gc_mark(th->name); - rb_gc_mark(th->ractor_waiting.receiving_mutex); rb_gc_mark(th->scheduler); @@ -3719,10 +3718,6 @@ th_init(rb_thread_t *th, VALUE self, rb_vm_t *vm) th->ext_config.ractor_safe = true; ccan_list_head_init(&th->interrupt_exec_tasks); - ccan_list_node_init(&th->ractor_waiting.waiting_node); -#ifndef RUBY_THREAD_PTHREAD_H - rb_native_cond_initialize(&th->ractor_waiting.cond); -#endif #if USE_RUBY_DEBUG_LOG static rb_atomic_t thread_serial = 1; @@ -4381,7 +4376,8 @@ Init_BareVM(void) vm_opt_mid_table = st_init_numtable(); #ifdef RUBY_THREAD_WIN32_H - rb_native_cond_initialize(&vm->ractor.sync.barrier_cond); #endif } @@ -683,12 +683,15 @@ typedef struct rb_vm_struct { bool terminate_waiting; #ifndef RUBY_THREAD_PTHREAD_H bool barrier_waiting; unsigned int barrier_cnt; - rb_nativethread_cond_t barrier_cond; #endif } sync; // ractor scheduling struct { rb_nativethread_lock_t lock; @@ -722,7 +725,10 @@ typedef struct rb_vm_struct { bool barrier_waiting; unsigned int barrier_waiting_cnt; unsigned int barrier_serial; } sched; } ractor; #ifdef USE_SIGALTSTACK @@ -1105,18 +1111,6 @@ typedef struct rb_ractor_struct rb_ractor_t; struct rb_native_thread; -struct rb_thread_ractor_waiting { - //enum rb_ractor_wait_status wait_status; - int wait_status; - //enum rb_ractor_wakeup_status wakeup_status; - int wakeup_status; - struct ccan_list_node waiting_node; // the rb_thread_t - VALUE receiving_mutex; // protects Ractor.receive_if -#ifndef RUBY_THREAD_PTHREAD_H - rb_nativethread_cond_t cond; -#endif -}; - typedef struct rb_thread_struct { struct ccan_list_node lt_node; // managed by a ractor (r->threads.set) VALUE self; @@ -1129,8 +1123,6 @@ typedef struct rb_thread_struct { bool mn_schedulable; rb_atomic_t serial; // only for RUBY_DEBUG_LOG() - struct rb_thread_ractor_waiting ractor_waiting; - VALUE last_status; /* $? */ /* for cfunc */ @@ -1903,7 +1895,9 @@ rb_vm_living_threads_init(rb_vm_t *vm) { ccan_list_head_init(&vm->workqueue); ccan_list_head_init(&vm->ractor.set); ccan_list_head_init(&vm->ractor.sched.zombie_threads); } typedef int rb_backtrace_iter_func(void *, VALUE, int, VALUE); @@ -7,6 +7,7 @@ void rb_ractor_sched_barrier_start(rb_vm_t *vm, rb_ractor_t *cr); void rb_ractor_sched_barrier_join(rb_vm_t *vm, rb_ractor_t *cr); static bool vm_locked(rb_vm_t *vm) @@ -103,15 +104,26 @@ vm_lock_enter(rb_ractor_t *cr, rb_vm_t *vm, bool locked, bool no_barrier, unsign } static void -vm_lock_leave(rb_vm_t *vm, unsigned int *lev APPEND_LOCATION_ARGS) { RUBY_DEBUG_LOG2(file, line, "rec:%u owner:%u%s", vm->ractor.sync.lock_rec, - (unsigned int)rb_ractor_id(vm->ractor.sync.lock_owner), vm->ractor.sync.lock_rec == 1 ? " (leave)" : ""); ASSERT_vm_locking(); VM_ASSERT(vm->ractor.sync.lock_rec > 0); VM_ASSERT(vm->ractor.sync.lock_rec == *lev); vm->ractor.sync.lock_rec--; *lev = vm->ractor.sync.lock_rec; @@ -154,9 +166,15 @@ rb_vm_lock_enter_body_cr(rb_ractor_t *cr, unsigned int *lev APPEND_LOCATION_ARGS } void rb_vm_lock_leave_body(unsigned int *lev APPEND_LOCATION_ARGS) { - vm_lock_leave(GET_VM(), lev APPEND_LOCATION_PARAMS); } void @@ -174,7 +192,7 @@ rb_vm_unlock_body(LOCATION_ARGS) rb_vm_t *vm = GET_VM(); ASSERT_vm_locking(); VM_ASSERT(vm->ractor.sync.lock_rec == 1); - vm_lock_leave(vm, &vm->ractor.sync.lock_rec APPEND_LOCATION_PARAMS); } static void @@ -24,6 +24,7 @@ struct rb_ractor_struct; NOINLINE(void rb_vm_lock_enter_body_cr(struct rb_ractor_struct *cr, unsigned int *lev APPEND_LOCATION_ARGS)); NOINLINE(void rb_vm_lock_enter_body_nb(unsigned int *lev APPEND_LOCATION_ARGS)); NOINLINE(void rb_vm_lock_enter_body(unsigned int *lev APPEND_LOCATION_ARGS)); void rb_vm_lock_leave_body(unsigned int *lev APPEND_LOCATION_ARGS); void rb_vm_barrier(void); @@ -87,6 +88,14 @@ rb_vm_lock_enter_nb(unsigned int *lev, const char *file, int line) } static inline void rb_vm_lock_leave(unsigned int *lev, const char *file, int line) { if (rb_multi_ractor_p()) { @@ -124,11 +133,12 @@ rb_vm_lock_leave_cr(struct rb_ractor_struct *cr, unsigned int *levp, const char vm_locking_do; RB_VM_LOCK_LEAVE_LEV(&vm_locking_level), vm_locking_do = 0) #define RB_VM_LOCK_ENTER_LEV_NB(levp) rb_vm_lock_enter_nb(levp, __FILE__, __LINE__) #define RB_VM_LOCK_ENTER_NO_BARRIER() { unsigned int _lev; RB_VM_LOCK_ENTER_LEV_NB(&_lev); -#define RB_VM_LOCK_LEAVE_NO_BARRIER() RB_VM_LOCK_LEAVE_LEV(&_lev); } #define RB_VM_LOCKING_NO_BARRIER() \ for (unsigned int vm_locking_level, vm_locking_do = (RB_VM_LOCK_ENTER_LEV_NB(&vm_locking_level), 1); \ - vm_locking_do; RB_VM_LOCK_LEAVE_LEV(&vm_locking_level), vm_locking_do = 0) #if RUBY_DEBUG > 0 void RUBY_ASSERT_vm_locking(void); |