しょんぼり技術メモ

まいにちがしょんぼり

MessagePack-RPCのFutureについての簡単な実験

MessagePack-RPC for Rubyの非同期RPCで使われるFutureについての簡単な実験。実験環境はRuby 1.9.2。


結論を先に:
Future.getを複数回呼ぶとどうなるの? → 別に問題ない
Future.getはスレッドセーフ? → いいえ。ロックしないと死ぬ。
Future.getを複数回呼ぶとき、毎回サーバを見に行く? → いいえ。一度getしたらサーバが落ちてても問題ない。

実験コード

サーバ側

128MB分の"a"を返すサーバ。

#!/bin/env ruby
require 'msgpack/rpc'

class Server
  def get
    return "a" * (1024*1024*128)
  end
end

svr = MessagePack::RPC::Server.new
svr.listen("0.0.0.0", 18800, Server.new)
svr.run
クライアント側
#!/bin/env ruby
require 'msgpack/rpc'
require 'date'
require 'thread'

def bench_time
  before = Time.now
  ret = yield
  printf("  %8.6f sec\n", Time.now - before)
  return ret
end

client = MessagePack::RPC::Client.new("127.0.0.1", 18800)

puts "*** Synchronous RPC"
bench_time{ client.call(:get) }
puts


puts "*** Asynchronous RPC"
future = bench_time{ client.call_async(:get)}

puts "** Future.get"
bench_time{ future.get }

puts "** Future.get (2)"
bench_time{ future.get }

puts "** Future.get in another thread"
t = Thread.new(future) do |f|
  puts "  [thread] Future.get (3)"
  bench_time{ f.get }
end
t.join

puts
puts

puts "*** Asynchronous RPC, Simultaneously get request test"
threads = Array.new(10)
mutex   = Mutex.new
future  = client.call_async(:get)

10.times do |i|
  threads[i] = Thread.new(future){|f|
    mutex.synchronize{
      ret = f.get
      raise "size mismatch" if ret.length != (1024*1024*128)
    }
  }
end

threads.each{|t| t.join}
puts "yes, Lock is required."

puts
puts

puts "*** Asynchronous RPC, call-many test"
puts "*1 call asynchronously."
bench_time{ future = client.call_async(:get) }

puts "*2 call Future.get."
bench_time{ future.get }

puts "*** Please kill the server process."
gets

puts "*3 re-call Future.get in the Thread. is the value cached?"
t = Thread.new(future) do |f|
  puts "  [thread] Future.get (2)"
  bench_time{ f.get }
end
t.join

puts "*** okay, the value is cached!"
実行結果
*** Synchronous RPC
  1.199702 sec

*** Asynchronous RPC
  0.000061 sec
** Future.get
  1.027940 sec
** Future.get (2)
  0.000009 sec
** Future.get in another thread
  [thread] Future.get (3)
  0.000014 sec


*** Asynchronous RPC, Simultaneously get request test
yes, Lock is required.


*** Asynchronous RPC, call-many test
*1 call asynchronously.
  0.000074 sec
*2 call Future.get.
  1.044357 sec
*** Please kill the server process.

*3 re-call Future.get in the Thread. is the value cached?
  [thread] Future.get (2)
  0.000011 sec
*** okay, the value is cached!