OSSコードリーディング Batch-loader
Batch-loaderというRubyでバッチローディングをするGemを読んで、そのきれいな実装に感心したのでまとめた。
実装自体はシンプルだけど、特定のGemに依存しているわけではないので使い方に融通が効くし、困った時にトラブルシューティングしやすい。
この記事は社内で一度発表したものだけど、上司に確認したらブログに載せても問題ないとのことだったので載せる。
README、Gem作成者のスライドの2つを読むのが一番という説がある。
Batch-loaderがやりたいこと
そもそもこのGem何がしたいの?という人はこの記事を読むといい。
Rails: N+1クエリを「バッチング」で解決するBatchLoader gem(翻訳)
DBやHTTPリクエストのN + 1問題をbatch load(一塊りでロードする)で解消するというもの。リクエストされた引数をまとめておき、値とマッピングだけコレクションしておき、実際の値の取得は最後に解決しようとする。
def load_user(post) BatchLoader.for(post.user_id).batch do |user_ids, loader| User.where(id: user_ids).each { |user| loader.call(user.id, user) } end end posts = Post.where(id: [1, 2, 3]) # SELECT * FROM posts WHERE id IN (1, 2, 3) users = posts.map do |post| load_user(post) end puts users # SELECT * FROM users WHERE id IN (1, 2, 3)
load_user
の中身がBatch-loaderの基本的な使い方。
この記事では大きく分けて
- 実際に複数回呼び出されるpost.user_idをどうやってbatch(一群)にしているのか
- batchしたもの(今回の例だとuser_id)をどうやって本当のオブジェクトにしているのか
の2つに分けて書いていく。
図でまとめたやつ
最初に図でザックリまとめたやつをのっけておく。
def load_user(post) BatchLoader.for(post.user_id).batch do |user_ids, loader| User.where(id: user_ids).each { |user| loader.call(user.id, user) } end end
実際に複数回呼び出されるpost.user_idをどうやってbatch(一群)にしているのか
最初に例に上げたload_user
メソッド内では
BatchLoader.for(post.user_id).batch(&block)
という形の処理を書いていた。
forはnewのエイリアス。BatchLoaderのインスタンスは引数を@item
として持つ。
batchの中身は以下。
def batch(default_value: nil, cache: true, replace_methods: nil, key: nil, &batch_block) @default_value = default_value @cache = cache @replace_methods = replace_methods.nil? ? cache : replace_methods @key = key @batch_block = batch_block __executor_proxy.add(item: @item) __singleton_class.class_eval { undef_method(:batch) } self end
@key = key
以上の行はオプション処理。渡したブロックは@batch_block
変数に入る。
ここでは大きく分けて2つ処理がある
__executor_proxy.add(item: @item)
__singleton_class.class_eval { undef_method(:batch) }
__executor_proxy.add(item: @item)
__executor_proxy
の中身はBatchLoader::Executor
というクラスのプロキシオブジェクト。
def __executor_proxy @__executor_proxy ||= begin raise NoBatchError.new("Please provide a batch block first") unless @batch_block BatchLoader::ExecutorProxy.new(@default_value, @key, &@batch_block) end end
class BatchLoader class ExecutorProxy def initialize(default_value, key, &block) @default_value = default_value @block = block @block_hash_key = [block.source_location, key] @global_executor = BatchLoader::Executor.ensure_current end
実際のBatchLoader::Executor
の中身。
class BatchLoader class Executor NAMESPACE = :batch_loader def self.ensure_current Thread.current[NAMESPACE] ||= new end attr_reader :items_by_block, :loaded_values_by_block def initialize @items_by_block = Hash.new { |hash, key| hash[key] = Set.new } @loaded_values_by_block = Hash.new { |hash, key| hash[key] = {} } end end end
要はスレッドごとに@items_by_block
と@loaded_values_by_block
という2つハッシュを持つのがExecutor。
ExecutorProxyの役割はインスタンスごとに@block
を保持し自身を通してブロックごとにBatchLoaderからアクセスするExecutorのハッシュを特定すること。
# in ExecutorProxy def items_to_load global_executor.items_by_block[@block_hash_key] end def loaded global_executor.loaded_values_by_block[@block_hash_key] end
最終的にBatchLoaderのインスタンスが複数作られ、それらは全て同じExecutorの@items_by_block
に対してBatchLoader#batch
に渡されたブロックそのものをkeyとしてpost.user_id
をためていく。
@loaded_values_by_block
は実際の値とuser.id
を紐づけたものが入っていくことになる。
__executor_proxy.add(item: @item)
についてまとめると
__executor_proxy.add(item: post.user_id) # ↓ BatchLoader::ExecutorProxy.new(&@batch_block).items_by_block["#{`batch_blockのソースライン情報`}"] << post.user_id # items_by_block[@block_hash_key]はデフォルトで空のSetのインスタンスが入っている。
BatchLoader.for(post.user_id).batch(&block)
は何度実行しても&block
のソースロケーションが同じであれば同一の
items_by_block[batch_blockのソースライン情報]
というSetにpost.user_id`をつめこんでいくことになる。
__singleton_class.class_eval { undef_method(:batch) }
最後の__singleton_class.class_eval { undef_method(:batch) }
について。
BatchLoaderのインスタンスの特異クラスからbatchメソッドを消している。
これは同じインスタンスに対してbatch
の次の二回目メソッドの呼び出しはbatch
自身も含めて全てmethod_missing
に飛ばしたいという意図があると思われる。
そしてこれがBatchLoaderクラスのmethod_missing
。
def method_missing(method_name, *args, &block) __sync!.public_send(method_name, *args, &block) end
実際にmethod_missingが走る時(本当にUserのインスタンスそのものが必要になった時) については次章で解説する。
ここまででexecutor.items_by_block[#{BatchLoader#batchに渡したブロック}"]
にpost.user_id
が複数積み込まれたことになる。
batchしたものをどうやって本当のオブジェクトにしているのか
ここから実際にオブジェクトをロードする過程に入る。
実際にlazyなオブジェクトが使用される時、ということはなにかしらメソッドが呼び出される時であり、その時にBatchLoaderのインスタンスでは method_missingが呼び出される。
def method_missing(method_name, *args, &block) __sync!.public_send(method_name, *args, &block) end
__sync!
は概ね__sync
のラッパー。
BatchLoaderクラス内では多くのメソッドが__
というprefixが付いているが、実際のオブジェクトのメソッドと極力かぶらないためだろう。
def __sync! loaded_value = __sync if @replace_methods __replace_with!(loaded_value) else loaded_value end end
def __sync return @loaded_value if @synced __ensure_batched @loaded_value = __executor_proxy.loaded_value(item: @item) if @cache @synced = true else __purge_cache end @loaded_value end
__ensure_batched
をみると
def __ensure_batched return if __executor_proxy.value_loaded?(item: @item) items = __executor_proxy.list_items loader = __loader args = {default_value: @default_value, cache: @cache, replace_methods: @replace_methods, key: @key} @batch_block.call(items, loader, args) items.each do |item| next if __executor_proxy.value_loaded?(item: item) loader.call(item, @default_value) en __executor_proxy.delete(items: items) end
最初にloaderを作っている。
def __loader mutex = Mutex.new -> (item, value = (no_value = true; nil), &block) do if no_value && !block raise ArgumentError, "Please pass a value or a block" elsif block && !no_value raise ArgumentError, "Please pass a value or a block, not both" end mutex.synchronize do next_value = block ? block.call(__executor_proxy.loaded_value(item: item)) : value] __executor_proxy.load(item: item, value: next_value) end end end
lamdaオブジェクト。「(item, value xor block)を引数にとって実際にvalueをloadするlamda」がloaderである。
__ensure_batched
では次に@batch_block.call(items, loader, args)
が実行される。
最初のload_user
で書いたbatch
に渡しているブロックが@batch_block。loaderには先程のlamdaオブジェクトが渡される。
def load_user(post) BatchLoader.for(post.user_id).batch do |user_ids, loader| User.where(id: user_ids).each { |user| loader.call(user.id, user) } end end
loaderというlamdaに渡しているのはuser.id
と実際のvalueであるuser
。
この2つの引数がExecutorProxyを通してExecutorの@loaded_values_by_block[batch_blockのソースライン情報]
というハッシュに
{ user.id: user }
という形で追加されていくことになる。
ここで__sync
に戻ると
@loaded_value = __executor_proxy.loaded_value(item: @item)
これはBatchLoaderのインスタンスがもつそれぞれの@item
( = post.user_id)
を使ってExecuterの@loaded_values_by_block[batch_blockのソースライン情報][@item]
としてハッシュから値をとりだしている処理。
post.user_id
と先程loadした{ user.id: user }
のuser.id
は等しいはずなのでここで無事にBatchLoaderのインスタンスごとに正しいuserを引っ張ってくることができる。
以降は別のBatchLoaderインスタンスでは@loaded_values_by_block[batch_blockのソースライン情報]
というハッシュが中身付きでメモリに存在するため、再度User.where(id: user_ids).each { |user| loader.call(user.id, user) }
という行為をする必要がなくなる。これにてBatch-Loading完了となる。