しょんぼり技術メモ

まいにちがしょんぼり

RindaのnotifyでマルチスレッドなTupleSpace

前回に続いてRinda。RindaのTupleSpaceを使ってデータをやりとりできるようになったので、今度はほかのプロセスからデータが更新されるようなケースで、その更新を反映させながら動作するようなデーモンっぽいものを作ってみます。

ほかのプロセスなどによる、TupleSpaceへの更新を監視、反映させるスレッドを作ろう、という記事です。

概要

Squidのurl_rewrite_programとして動作するフィルタ、つまりIPアドレスに対するマッチング処理プログラムです。
標準入力からデータを読み込んで、許可IPアドレスリストにあったら要求されたURL文字列を返す、なかったら認証ページのURL文字列を返すような仕組みです。

マッチングのたびにTupleSpaceを見に行くとオーバヘッドで死ぬので、IPアドレスリストはローカルで保持することにします。ただし、ほかのプロセスがIPアドレスリストを更新した場合には、それが直ちに反映されるような仕組みにします。

TupleSpaceの更新通知

Rindaでは、TupleSpaceが更新された場合にそのイベントをフックすることができます。

TupleSpace.notify() で更新通知オブジェクトを取得できます。引数は、通知されたいイベントの種類(write/take/delete/nil=全て)と、そのイベントで対象にしたいタプルの情報です。

あとは、その通知を受け付けるスレッドを生成して、Notify.pop()でイベントを取り出して処理していくだけです。このスレッドが実行されている間、本来行いたいマッチング処理のスレッドが実行されているので、発生したイベントをもとにしてHashを更新する際にMutexによるロックを行っています。もちろん、マッチング処理を行うスレッドでも、IPアドレスが存在するか問い合わせる際にMutexを使っています。

更新通知イベントハンドラスレッド周辺:

# Rinda Notifier
notifier = ts.notify(nil, [TS_tuplename,nil])

# Notifier's Mutex
notify_mutex = Mutex::new

# fire notifier thread
notify_thread = Thread.new do
  while(true)
    # wait for new event
    (event, tuple) = notifier.pop
    tgt_ipaddr = tuple[1]

    case event
      when "write"
      # add new ipaddr
      notify_mutex.synchronize do
        authenticated[tgt_ipaddr] = 1
      end

      when "take", "delete"
      # remove ipaddr
      notify_mutex.synchronize do
        authenticated.delete(tgt_ipaddr)
      end

    end
  end
end # of notifier thread


マッチング処理の一部:

# check ip address
is_authenticated = false
notify_mutex.synchronize do
  is_authenticated = authenticated.has_key?(client_ipaddr)
end

if is_authenticated
  # マッチしたときの処理...

これだけで、マッチング処理は何も気にせずHashに問い合わせるループとして記述できます。
ほかのプロセスやほかのサーバ上のプログラムなどからIPアドレスを追加、削除しても、リアルタイムにマッチング処理に反映されます。とっても便利。

プログラム全文

#!/usr/bin/ruby -Ku
require 'drb'
require 'rinda/tuplespace'
require 'thread'
require 'cgi'
require 'pp'

TS_tuplename = "authenticated_ipaddr"
REDIR_SP = "host.to/redir/auth_redir.rb"
REDIR_PARAM_NAME = "url"

drb_host = "127.0.0.1"
drb_port = "6456"

# hash table of authenticated ipaddrs. key=ipaddr, value=ignored
authenticated = Hash.new()

# Rinda TupleSpace
DRb.start_service
ts = DRbObject.new_with_uri("druby://#{drb_host}:#{drb_port}")

# read current authenticated ipaddr list
ts.read_all([TS_tuplename,nil]).each do |tuple|
  authenticated[tuple[1]] = 1
end

# Rinda Notifier
notifier = ts.notify(nil, [TS_tuplename,nil])

# Notifier's Mutex
notify_mutex = Mutex::new

# fire notifier thread
notify_thread = Thread.new do
  while(true)
    # wait for new event
    (event, tuple) = notifier.pop
    tgt_ipaddr = tuple[1]

    case event
      when "write"
      # add new ipaddr
      notify_mutex.synchronize do
        authenticated[tgt_ipaddr] = 1
      end

      when "take", "delete"
      # remove ipaddr
      notify_mutex.synchronize do
        authenticated.delete(tgt_ipaddr)
      end

    end
  end
end # of notifier thread


# STDIN loop
while( (STDIN.closed? == false) ) do
  break if STDIN.eof?
  line=STDIN.gets

  # parse
  (url,client,user,method,urlgroup) = line.split(" ")
  if client != nil
    (client_ipaddr, client_host) = client.split("/")
  else
    client_ipaddr = "0.0.0.0"
    client_host = "-"
  end

  # check ip address
  is_authenticated = false
  notify_mutex.synchronize do
    is_authenticated = authenticated.has_key?(client_ipaddr)
  end

  if is_authenticated
    # access is allowed!
    puts url
    STDOUT.flush
    next
  end

  # authentication is required!
  if (method == "CONNECT")
    redir_to = "https://#{REDIR_SP}"
  else
    redir_to = "http://#{REDIR_SP}?#{REDIR_PARAM_NAME}=#{CGI.escape(url)}"
  end
  puts redir_to
  STDOUT.flush
end # of while read from stdin