diff options
-rw-r--r-- | ractor.rb | 751 |
1 files changed, 295 insertions, 456 deletions
@@ -4,7 +4,7 @@ # # # The simplest ractor # r = Ractor.new {puts "I am in Ractor!"} -# r.take # wait for it to finish # # Here, "I am in Ractor!" is printed # # Ractors do not share all objects with each other. There are two main benefits to this: across ractors, thread-safety @@ -36,53 +36,11 @@ # puts "I am in Ractor! a=#{a_in_ractor}" # end # r.send(a) # pass it -# r.take # # Here, "I am in Ractor! a=1" is printed # -# There are two pairs of methods for sending/receiving messages: -# -# * Ractor#send and Ractor.receive for when the _sender_ knows the receiver (push); -# * Ractor.yield and Ractor#take for when the _receiver_ knows the sender (pull); -# # In addition to that, any arguments passed to Ractor.new are passed to the block and available there -# as if received by Ractor.receive, and the last block value is sent outside of the -# ractor as if sent by Ractor.yield. -# -# A little demonstration of a classic ping-pong: -# -# server = Ractor.new(name: "server") do -# puts "Server starts: #{self.inspect}" -# puts "Server sends: ping" -# Ractor.yield 'ping' # The server doesn't know the receiver and sends to whoever interested -# received = Ractor.receive # The server doesn't know the sender and receives from whoever sent -# puts "Server received: #{received}" -# end -# -# client = Ractor.new(server) do |srv| # The server is sent to the client, and available as srv -# puts "Client starts: #{self.inspect}" -# received = srv.take # The client takes a message from the server -# puts "Client received from " \ -# "#{srv.inspect}: #{received}" -# puts "Client sends to " \ -# "#{srv.inspect}: pong" -# srv.send 'pong' # The client sends a message to the server -# end -# -# [client, server].each(&:take) # Wait until they both finish -# -# This will output something like: -# -# Server starts: #<Ractor:#2 server test.rb:1 running> -# Server sends: ping -# Client starts: #<Ractor:#3 test.rb:8 running> -# Client received from #<Ractor:#2 server test.rb:1 blocking>: ping -# Client sends to #<Ractor:#2 server test.rb:1 blocking>: pong -# Server received: pong -# -# Ractors receive their messages via the <em>incoming port</em>, and send them -# to the <em>outgoing port</em>. Either one can be disabled with Ractor#close_incoming and -# Ractor#close_outgoing, respectively. When a ractor terminates, its ports are closed -# automatically. # # == Shareable and unshareable objects # @@ -307,130 +265,52 @@ class Ractor # # call-seq: - # Ractor.select(*ractors, [yield_value:, move: false]) -> [ractor or symbol, obj] - # - # Wait for any ractor to have something in its outgoing port, read from this ractor, and - # then return that ractor and the object received. - # - # r1 = Ractor.new {Ractor.yield 'from 1'} - # r2 = Ractor.new {Ractor.yield 'from 2'} - # - # r, obj = Ractor.select(r1, r2) - # - # puts "received #{obj.inspect} from #{r.inspect}" - # # Prints: received "from 1" from #<Ractor:#2 test.rb:1 running> - # # But could just as well print "from r2" here, either prints could be first. - # - # If one of the given ractors is the current ractor, and it is selected, +r+ will contain - # the +:receive+ symbol instead of the ractor object. - # - # r1 = Ractor.new(Ractor.current) do |main| - # main.send 'to main' - # Ractor.yield 'from 1' - # end - # r2 = Ractor.new do - # Ractor.yield 'from 2' - # end - # - # r, obj = Ractor.select(r1, r2, Ractor.current) - # puts "received #{obj.inspect} from #{r.inspect}" - # # Could print: received "to main" from :receive - # - # If +yield_value+ is provided, that value may be yielded if another ractor is calling #take. - # In this case, the pair <tt>[:yield, nil]</tt> is returned: - # - # r1 = Ractor.new(Ractor.current) do |main| - # puts "Received from main: #{main.take}" - # end - # - # puts "Trying to select" - # r, obj = Ractor.select(r1, Ractor.current, yield_value: 123) - # wait - # puts "Received #{obj.inspect} from #{r.inspect}" - # - # This will print: - # - # Trying to select - # Received from main: 123 - # Received nil from :yield - # - # +move+ boolean flag defines whether yielded value will be copied (default) or moved. - def self.select(*ractors, yield_value: yield_unspecified = true, move: false) - raise ArgumentError, 'specify at least one ractor or `yield_value`' if yield_unspecified && ractors.empty? - - if ractors.delete Ractor.current - do_receive = true - else - do_receive = false end - __builtin_ractor_select_internal ractors, do_receive, !yield_unspecified, yield_value, move end # # call-seq: - # Ractor.receive -> msg - # - # Receive a message from the incoming port of the current ractor (which was - # sent there by #send from another ractor). - # - # r = Ractor.new do - # v1 = Ractor.receive - # puts "Received: #{v1}" - # end - # r.send('message1') - # r.take - # # Here will be printed: "Received: message1" - # - # Alternatively, the private instance method +receive+ may be used: - # - # r = Ractor.new do - # v1 = receive - # puts "Received: #{v1}" - # end - # r.send('message1') - # r.take - # # This prints: "Received: message1" - # - # The method blocks if the queue is empty. - # - # r = Ractor.new do - # puts "Before first receive" - # v1 = Ractor.receive - # puts "Received: #{v1}" - # v2 = Ractor.receive - # puts "Received: #{v2}" - # end - # wait - # puts "Still not received" - # r.send('message1') - # wait - # puts "Still received only one" - # r.send('message2') - # r.take - # - # Output: - # - # Before first receive - # Still not received - # Received: message1 - # Still received only one - # Received: message2 - # - # If close_incoming was called on the ractor, the method raises Ractor::ClosedError - # if there are no more messages in the incoming queue: - # - # Ractor.new do - # close_incoming - # receive - # end - # wait - # # in `receive': The incoming port is already closed => #<Ractor:#2 test.rb:1 running> (Ractor::ClosedError) # def self.receive - __builtin_cexpr! %q{ - ractor_receive(ec, rb_ec_ractor_ptr(ec)) - } end class << self @@ -439,280 +319,21 @@ class Ractor # same as Ractor.receive private def receive - __builtin_cexpr! %q{ - ractor_receive(ec, rb_ec_ractor_ptr(ec)) - } end alias recv receive # # call-seq: - # Ractor.receive_if {|msg| block } -> msg - # - # Receive only a specific message. - # - # Instead of Ractor.receive, Ractor.receive_if can be given a pattern (or any - # filter) in a block and you can choose the messages to accept that are available in - # your ractor's incoming queue. - # - # r = Ractor.new do - # p Ractor.receive_if{|msg| msg.match?(/foo/)} #=> "foo3" - # p Ractor.receive_if{|msg| msg.match?(/bar/)} #=> "bar1" - # p Ractor.receive_if{|msg| msg.match?(/baz/)} #=> "baz2" - # end - # r << "bar1" - # r << "baz2" - # r << "foo3" - # r.take - # - # This will output: - # - # foo3 - # bar1 - # baz2 - # - # If the block returns a truthy value, the message is removed from the incoming queue - # and returned. - # Otherwise, the message remains in the incoming queue and the next messages are checked - # by the given block. - # - # If there are no messages left in the incoming queue, the method will - # block until new messages arrive. - # - # If the block is escaped by break/return/exception/throw, the message is removed from - # the incoming queue as if a truthy value had been returned. - # - # r = Ractor.new do - # val = Ractor.receive_if{|msg| msg.is_a?(Array)} - # puts "Received successfully: #{val}" - # end - # - # r.send(1) - # r.send('test') - # wait - # puts "2 non-matching sent, nothing received" - # r.send([1, 2, 3]) - # wait - # - # Prints: - # - # 2 non-matching sent, nothing received - # Received successfully: [1, 2, 3] - # - # Note that you can not call receive/receive_if in the given block recursively. - # You should not do any tasks in the block other than message filtration. - # - # Ractor.current << true - # Ractor.receive_if{|msg| Ractor.receive} - # #=> `receive': can not call receive/receive_if recursively (Ractor::Error) - # - def self.receive_if &b - Primitive.ractor_receive_if b - end - - # same as Ractor.receive_if - private def receive_if &b - Primitive.ractor_receive_if b - end - - # - # call-seq: - # ractor.send(msg, move: false) -> self - # - # Send a message to a Ractor's incoming queue to be accepted by Ractor.receive. - # - # r = Ractor.new do - # value = Ractor.receive - # puts "Received #{value}" - # end - # r.send 'message' - # # Prints: "Received: message" - # - # The method is non-blocking (will return immediately even if the ractor is not ready - # to receive anything): - # - # r = Ractor.new {sleep(5)} - # r.send('test') - # puts "Sent successfully" - # # Prints: "Sent successfully" immediately - # - # An attempt to send to a ractor which already finished its execution will raise Ractor::ClosedError. - # - # r = Ractor.new {} - # r.take - # p r - # # "#<Ractor:#6 (irb):23 terminated>" - # r.send('test') - # # Ractor::ClosedError (The incoming-port is already closed) - # - # If close_incoming was called on the ractor, the method also raises Ractor::ClosedError. - # - # r = Ractor.new do - # sleep(500) - # receive - # end - # r.close_incoming - # r.send('test') - # # Ractor::ClosedError (The incoming-port is already closed) - # # The error is raised immediately, not when the ractor tries to receive - # - # If the +obj+ is unshareable, by default it will be copied into the receiving ractor by deep cloning. - # If <tt>move: true</tt> is passed, the object is _moved_ into the receiving ractor and becomes - # inaccessible to the sender. - # - # r = Ractor.new {puts "Received: #{receive}"} - # msg = 'message' - # r.send(msg, move: true) - # r.take - # p msg - # - # This prints: - # - # Received: message - # in `p': undefined method `inspect' for #<Ractor::MovedObject:0x000055c99b9b69b8> - # - # All references to the object and its parts will become invalid to the sender. - # - # r = Ractor.new {puts "Received: #{receive}"} - # s = 'message' - # ary = [s] - # copy = ary.dup - # r.send(ary, move: true) - # - # s.inspect - # # Ractor::MovedError (can not send any methods to a moved object) - # ary.class - # # Ractor::MovedError (can not send any methods to a moved object) - # copy.class - # # => Array, it is different object - # copy[0].inspect - # # Ractor::MovedError (can not send any methods to a moved object) - # # ...but its item was still a reference to `s`, which was moved - # - # If the object is shareable, <tt>move: true</tt> has no effect on it: - # - # r = Ractor.new {puts "Received: #{receive}"} - # s = 'message'.freeze - # r.send(s, move: true) - # s.inspect #=> "message", still available # - def send(obj, move: false) - __builtin_cexpr! %q{ - ractor_send(ec, RACTOR_PTR(self), obj, move) - } end alias << send - # - # call-seq: - # Ractor.yield(msg, move: false) -> nil - # - # Send a message to the current ractor's outgoing port to be accepted by #take. - # - # r = Ractor.new {Ractor.yield 'Hello from ractor'} - # puts r.take - # # Prints: "Hello from ractor" - # - # This method is blocking, and will return only when somebody consumes the - # sent message. - # - # r = Ractor.new do - # Ractor.yield 'Hello from ractor' - # puts "Ractor: after yield" - # end - # wait - # puts "Still not taken" - # puts r.take - # - # This will print: - # - # Still not taken - # Hello from ractor - # Ractor: after yield - # - # If the outgoing port was closed with #close_outgoing, the method will raise: - # - # r = Ractor.new do - # close_outgoing - # Ractor.yield 'Hello from ractor' - # end - # wait - # # `yield': The outgoing-port is already closed (Ractor::ClosedError) - # - # The meaning of the +move+ argument is the same as for #send. - def self.yield(obj, move: false) - __builtin_cexpr! %q{ - ractor_yield(ec, rb_ec_ractor_ptr(ec), obj, move) - } - end - - # - # call-seq: - # ractor.take -> msg - # - # Get a message from the ractor's outgoing port, which was put there by Ractor.yield or at ractor's - # termination. - # - # r = Ractor.new do - # Ractor.yield 'explicit yield' - # 'last value' - # end - # puts r.take #=> 'explicit yield' - # puts r.take #=> 'last value' - # puts r.take # Ractor::ClosedError (The outgoing-port is already closed) - # - # The fact that the last value is also sent to the outgoing port means that +take+ can be used - # as an analog of Thread#join ("just wait until ractor finishes"). However, it will raise if - # somebody has already consumed that message. - # - # If the outgoing port was closed with #close_outgoing, the method will raise Ractor::ClosedError. - # - # r = Ractor.new do - # sleep(500) - # Ractor.yield 'Hello from ractor' - # end - # r.close_outgoing - # r.take - # # Ractor::ClosedError (The outgoing-port is already closed) - # # The error would be raised immediately, not when ractor will try to receive - # - # If an uncaught exception is raised in the Ractor, it is propagated by take as a - # Ractor::RemoteError. - # - # r = Ractor.new {raise "Something weird happened"} - # - # begin - # r.take - # rescue => e - # p e # => #<Ractor::RemoteError: thrown by remote Ractor.> - # p e.ractor == r # => true - # p e.cause # => #<RuntimeError: Something weird happened> - # end - # - # Ractor::ClosedError is a descendant of StopIteration, so the termination of the ractor will break - # out of any loops that receive this message without propagating the error: - # - # r = Ractor.new do - # 3.times {|i| Ractor.yield "message #{i}"} - # "finishing" - # end - # - # loop {puts "Received: " + r.take} - # puts "Continue successfully" - # - # This will print: - # - # Received: message 0 - # Received: message 1 - # Received: message 2 - # Received: finishing - # Continue successfully - def take - __builtin_cexpr! %q{ - ractor_take(ec, RACTOR_PTR(self)) - } - end - def inspect loc = __builtin_cexpr! %q{ RACTOR_PTR(self)->loc } name = __builtin_cexpr! %q{ RACTOR_PTR(self)->name } @@ -737,38 +358,13 @@ class Ractor # # call-seq: - # ractor.close_incoming -> true | false # - # Closes the incoming port and returns whether it was already closed. All further attempts - # to Ractor.receive in the ractor, and #send to the ractor will fail with Ractor::ClosedError. # - # r = Ractor.new {sleep(500)} - # r.close_incoming #=> false - # r.close_incoming #=> true - # r.send('test') - # # Ractor::ClosedError (The incoming-port is already closed) - def close_incoming - __builtin_cexpr! %q{ - ractor_close_incoming(ec, RACTOR_PTR(self)); - } - end - - # - # call-seq: - # ractor.close_outgoing -> true | false - # - # Closes the outgoing port and returns whether it was already closed. All further attempts - # to Ractor.yield in the ractor, and #take from the ractor will fail with Ractor::ClosedError. - # - # r = Ractor.new {sleep(500)} - # r.close_outgoing #=> false - # r.close_outgoing #=> true - # r.take - # # Ractor::ClosedError (The outgoing-port is already closed) - def close_outgoing - __builtin_cexpr! %q{ - ractor_close_outgoing(ec, RACTOR_PTR(self)); - } end # @@ -922,4 +518,247 @@ class Ractor } end end end |