はじめに
こんにちは、じゃがいもの皮はもっぱらキレイにむいて食べるエンジニアの村田です。 前回のエントリ iQONのバックエンドの非同期処理について ではざっくりとした方針とかを書きましたが今回は具体的な実装方法や運用方法などについて紹介したいと思います。
使用技術
iQONではResqueという仕組みを採用して、メール送信やDBの重たい更新処理などを非同期処理しています。 ResqueはRedisにキューを出し入れして遅延処理を実現する仕組みです。シンプルだし導入しやすいと思い採用しました。 このResqueの仕組みをdaemon-spawnというgemでデーモン化して運用しています。 イメージにするとこんな感じです。
導入方法
1.Gemfileに記述
gem 'resque' gem 'daemon-spawn', :require => 'daemon_spawn'
- initializersにRailsの環境ごとのredisサーバのホスト設定を記述
config/initializers/load_redis_config.rb
require 'resque' if Rails.env.to_s == "production" Resque.redis = 'redis.production.iqon.jp:6379' elsif Rails.env.to_s == "test" Resque.redis = 'redis.test.iqon.jp:6379' else Resque.redis = 'redis.dev.iqon.jp:6379' end
3.デーモンの起動処理を記述
script/resque_worker
#!/usr/bin/env ruby require File.expand_path('../../config/application', __FILE__) Rails.application.require_environment! class ResqueWorkerDaemon < DaemonSpawn::Base def start(args) @worker = Resque::Worker.new('default') @worker.verbose = true @worker.work end def stop end end ResqueWorkerDaemon.spawn!({ :working_dir => Rails.root, :pid_file => File.join(Rails.root, 'tmp', 'pids', 'resque_worker.pid'), :log_file => File.join(Rails.root, 'log', 'resque_worker.log'), :sync_log => true, :singleton => true, :signal => 'QUIT' } )
このスクリプトにstart/stop/restartなど渡して実行することでデーモンを操作します。 例えばこんな感じです。
起動 script/resque_worker start
停止 script/resque_worker stop
- キューをredisにエンキューする処理を実装する
エンキューする処理はcontrollerでもmodelの中でも好きなタイミングで問題ないと思います。 大切なのはどのworker(あとでキューを処理する処理、つまり非同期で動く処理)にどんなパラメータを投げるかを指定するところです。
#TestResqueWorkerにparamsを渡す Resque.enqueue(TestResqueWorker, params)
- キューを処理するワーカーを実装する
app/worker/test_resque_worker.rb
class TestResqueWorker @queue = :default def self.perform(params) #DBの更新処理やメール送信などの重たい処理を実装する end end
具体例
ここまでは導入する最低限の概要でしたが、実際のiQONでどのようにしているかを一部紹介したいと思います。 例として、コーディネートモデルからキューを発行して、コーディネートに使われている各アイテムのブランドのコーディネート数を更新していく処理を見てみます。
- キューの発行
コントローラからキューを発行してみます
app/controller/coordinate.rb
class CoordinateController < ApplicationController def update_brand_coordinate_count #params[:brands]は123,235,12345のようにカンマ区切りの文字列が入っている Resque.enqueue(UpdateBrandCoordinateCountWorker, {:brands => params[:brands]}) end end
- キューを処理する
app/worker/update_brand_coordinate_count_worker.rb
class UpdateBrandCoordinateCountWorker @queue = :default def self.perform(params) brand_id_list = params["brands"].split(",") brand_id_list.each do | brand_id | #brandテーブルのコーディネート数を更新していく if Brand.where(:brand_id => brand_id, :delete_flag => 0).exists? Brand.increment_counter(:coordinate_count, brand_id) end end end end
このような形でResque.enqueueで非同期で処理させたい内容(どのworkerクラスに処理させるかを第一引数に、パラメータを第二引数に設定)をキューとしてRedisにエンキューします。 それを受けてUpdateBrandCoordinateCountWorkerがパラメータを受け取り実際の処理をしていくという流れになります。 Redisサーバを用意して、Railsアプリケーションにこれだけ実装すれば非同期処理を実現できるのは手軽で便利ですね。
気をつけること
ここまでで導入と具体例を紹介させて頂きましたが、実際に開発、運用をしていく上で困ったことと対応方法を紹介したいと思います。
- workerが受け取るハッシュのキーはシンボルではなく文字列になる
サンプルコードでも書いてありますが、キューを発行するときにシンボルをキーにしたハッシュを渡していいますが、キューを処理するworkerの中では文字列をキーにしたハッシュとしてパラメータにアクセスしています。 これはResqueがキューを入れるときに内部的にto_jsonをしているからのようです。 開発を始めたときは結構ハマったのでこれから導入を考えている方は気をつけたほうがいいと思います。
- workerがキューを処理中にdaemonをリスタートした時に中断されてしまう問題
この問題も最初はハマりましたが最初に紹介させて頂いたResqueWorkerDaemon.spawnで :signal => 'QUIT'
という指定することで対応できます。 これはworkerが現在処理している内容をすべて処理し終わってからworkerプロセスを終了するというオプションになります。
- 特定のキューだけ優先的に処理したい場合
優先度の低い重たいキューがたまりすぎると優先度の高い軽いキューが詰まってしまう問題ですね。
class FirstPriorityWorker @queue = :first_priority #省略 end class SecondPriorityWorker @queue = :second_priority #省略 end class ResqueWorkerDaemon < DaemonSpawn::Base @worker = Resque::Worker.new(:first_priority, :second_priority) #省略 end
このように設定するとfirst_priorityのキューを優先的に処理することが可能です。 キューに対して優先度を設定したい場合などはこのように設定すればできると思います。
最後に
Resqueはかなり簡単に非同期処理を実現できるので素晴らしいのですが、なんでもかんでも非同期処理にしてしまうと問題になることもあるのでそこは見極めが必要です。 あとは前述した注意点を守れば導入はスムーズに運用していくことができるかと思います。