RindaのnotifyでマルチスレッドなTupleSpace
前回に続いてRinda。RindaのTupleSpaceを使ってデータをやりとりできるようになったので、今度はほかのプロセスからデータが更新されるようなケースで、その更新を反映させながら動作するようなデーモンっぽいものを作ってみます。
ほかのプロセスなどによる、TupleSpaceへの更新を監視、反映させるスレッドを作ろう、という記事です。
概要
Squidのurl_rewrite_programとして動作するフィルタ、つまりIPアドレスに対するマッチング処理プログラムです。
標準入力からデータを読み込んで、許可IPアドレスリストにあったら要求されたURL文字列を返す、なかったら認証ページのURL文字列を返すような仕組みです。
マッチングのたびにTupleSpaceを見に行くとオーバヘッドで死ぬので、IPアドレスリストはローカルで保持することにします。ただし、ほかのプロセスがIPアドレスリストを更新した場合には、それが直ちに反映されるような仕組みにします。
TupleSpaceの更新通知
Rindaでは、TupleSpaceが更新された場合にそのイベントをフックすることができます。
TupleSpace.notify() で更新通知オブジェクトを取得できます。引数は、通知されたいイベントの種類(write/take/delete/nil=全て)と、そのイベントで対象にしたいタプルの情報です。
あとは、その通知を受け付けるスレッドを生成して、Notify.pop()でイベントを取り出して処理していくだけです。このスレッドが実行されている間、本来行いたいマッチング処理のスレッドが実行されているので、発生したイベントをもとにしてHashを更新する際にMutexによるロックを行っています。もちろん、マッチング処理を行うスレッドでも、IPアドレスが存在するか問い合わせる際にMutexを使っています。
更新通知イベントハンドラスレッド周辺:
# Rinda Notifier notifier = ts.notify(nil, [TS_tuplename,nil]) # Notifier's Mutex notify_mutex = Mutex::new # fire notifier thread notify_thread = Thread.new do while(true) # wait for new event (event, tuple) = notifier.pop tgt_ipaddr = tuple[1] case event when "write" # add new ipaddr notify_mutex.synchronize do authenticated[tgt_ipaddr] = 1 end when "take", "delete" # remove ipaddr notify_mutex.synchronize do authenticated.delete(tgt_ipaddr) end end end end # of notifier thread
マッチング処理の一部:
# check ip address is_authenticated = false notify_mutex.synchronize do is_authenticated = authenticated.has_key?(client_ipaddr) end if is_authenticated # マッチしたときの処理...
これだけで、マッチング処理は何も気にせずHashに問い合わせるループとして記述できます。
ほかのプロセスやほかのサーバ上のプログラムなどからIPアドレスを追加、削除しても、リアルタイムにマッチング処理に反映されます。とっても便利。
プログラム全文
#!/usr/bin/ruby -Ku require 'drb' require 'rinda/tuplespace' require 'thread' require 'cgi' require 'pp' TS_tuplename = "authenticated_ipaddr" REDIR_SP = "host.to/redir/auth_redir.rb" REDIR_PARAM_NAME = "url" drb_host = "127.0.0.1" drb_port = "6456" # hash table of authenticated ipaddrs. key=ipaddr, value=ignored authenticated = Hash.new() # Rinda TupleSpace DRb.start_service ts = DRbObject.new_with_uri("druby://#{drb_host}:#{drb_port}") # read current authenticated ipaddr list ts.read_all([TS_tuplename,nil]).each do |tuple| authenticated[tuple[1]] = 1 end # Rinda Notifier notifier = ts.notify(nil, [TS_tuplename,nil]) # Notifier's Mutex notify_mutex = Mutex::new # fire notifier thread notify_thread = Thread.new do while(true) # wait for new event (event, tuple) = notifier.pop tgt_ipaddr = tuple[1] case event when "write" # add new ipaddr notify_mutex.synchronize do authenticated[tgt_ipaddr] = 1 end when "take", "delete" # remove ipaddr notify_mutex.synchronize do authenticated.delete(tgt_ipaddr) end end end end # of notifier thread # STDIN loop while( (STDIN.closed? == false) ) do break if STDIN.eof? line=STDIN.gets # parse (url,client,user,method,urlgroup) = line.split(" ") if client != nil (client_ipaddr, client_host) = client.split("/") else client_ipaddr = "0.0.0.0" client_host = "-" end # check ip address is_authenticated = false notify_mutex.synchronize do is_authenticated = authenticated.has_key?(client_ipaddr) end if is_authenticated # access is allowed! puts url STDOUT.flush next end # authentication is required! if (method == "CONNECT") redir_to = "https://#{REDIR_SP}" else redir_to = "http://#{REDIR_SP}?#{REDIR_PARAM_NAME}=#{CGI.escape(url)}" end puts redir_to STDOUT.flush end # of while read from stdin