Skip to main content

OSSコードリーディング Batch-loader

Batch-loaderというRubyでバッチローディングをするGemを読んで、そのきれいな実装に感心したのでまとめた。

github.com

実装自体はシンプルだけど、特定のGemに依存しているわけではないので使い方に融通が効くし、困った時にトラブルシューティングしやすい。
この記事は社内で一度発表したものだけど、上司に確認したらブログに載せても問題ないとのことだったので載せる。

READMEGem作成者のスライドの2つを読むのが一番という説がある。

Batch-loaderがやりたいこと

そもそもこのGem何がしたいの?という人はこの記事を読むといい。
Rails: N+1クエリを「バッチング」で解決するBatchLoader gem(翻訳)
DBやHTTPリクエストのN + 1問題をbatch load(一塊りでロードする)で解消するというもの。リクエストされた引数をまとめておき、値とマッピングだけコレクションしておき、実際の値の取得は最後に解決しようとする。

 def load_user(post)
  BatchLoader.for(post.user_id).batch do |user_ids, loader|
    User.where(id: user_ids).each { |user| loader.call(user.id, user) }
  end
end

posts = Post.where(id: [1, 2, 3]) #  SELECT * FROM posts WHERE id IN (1, 2, 3)

users = posts.map do |post|
  load_user(post)
end

puts users # SELECT * FROM users WHERE id IN (1, 2, 3)
 

load_userの中身がBatch-loaderの基本的な使い方。

この記事では大きく分けて

  • 実際に複数回呼び出されるpost.user_idをどうやってbatch(一群)にしているのか
  • batchしたもの(今回の例だとuser_id)をどうやって本当のオブジェクトにしているのか

の2つに分けて書いていく。

図でまとめたやつ

最初に図でザックリまとめたやつをのっけておく。

 def load_user(post)
  BatchLoader.for(post.user_id).batch do |user_ids, loader|
    User.where(id: user_ids).each { |user| loader.call(user.id, user) }
  end
end
 

f:id:sasa5740:20200429145512p:plain

f:id:sasa5740:20200429160902p:plain

実際に複数回呼び出されるpost.user_idをどうやってbatch(一群)にしているのか

最初に例に上げたload_userメソッド内では

 BatchLoader.for(post.user_id).batch(&block)
 

という形の処理を書いていた。 forはnewのエイリアス。BatchLoaderのインスタンスは引数を@itemとして持つ。 batchの中身は以下。

   def batch(default_value: nil, cache: true, replace_methods: nil, key: nil, &batch_block)
    @default_value = default_value
    @cache = cache
    @replace_methods = replace_methods.nil? ? cache : replace_methods
    @key = key
    @batch_block = batch_block

    __executor_proxy.add(item: @item)

    __singleton_class.class_eval { undef_method(:batch) }

    self
  end
 

@key = key以上の行はオプション処理。渡したブロックは@batch_block変数に入る。

ここでは大きく分けて2つ処理がある

  • __executor_proxy.add(item: @item)
  • __singleton_class.class_eval { undef_method(:batch) }

__executor_proxy.add(item: @item)

__executor_proxyの中身はBatchLoader::Executorというクラスのプロキシオブジェクト。

   def __executor_proxy
    @__executor_proxy ||= begin
      raise NoBatchError.new("Please provide a batch block first") unless @batch_block
      BatchLoader::ExecutorProxy.new(@default_value, @key, &@batch_block)
    end
  end
 
 class BatchLoader
  class ExecutorProxy
    def initialize(default_value, key, &block)
      @default_value = default_value
      @block = block
      @block_hash_key = [block.source_location, key]
      @global_executor = BatchLoader::Executor.ensure_current
    end
 

実際のBatchLoader::Executorの中身。

 class BatchLoader
  class Executor
    NAMESPACE = :batch_loader

    def self.ensure_current
      Thread.current[NAMESPACE] ||= new
    end

    attr_reader :items_by_block, :loaded_values_by_block

    def initialize
      @items_by_block = Hash.new { |hash, key| hash[key] = Set.new }
      @loaded_values_by_block = Hash.new { |hash, key| hash[key] = {} }
    end
  end
end
 

要はスレッドごとに@items_by_block@loaded_values_by_blockという2つハッシュを持つのがExecutor。 ExecutorProxyの役割はインスタンスごとに@blockを保持し自身を通してブロックごとにBatchLoaderからアクセスするExecutorのハッシュを特定すること。

    # in ExecutorProxy
    def items_to_load
      global_executor.items_by_block[@block_hash_key]
    end

    def loaded
      global_executor.loaded_values_by_block[@block_hash_key]
    end
 

最終的にBatchLoaderのインスタンスが複数作られ、それらは全て同じExecutorの@items_by_blockに対してBatchLoader#batchに渡されたブロックそのものをkeyとしてpost.user_idをためていく。 @loaded_values_by_blockは実際の値とuser.idを紐づけたものが入っていくことになる。

__executor_proxy.add(item: @item)についてまとめると

 __executor_proxy.add(item: post.user_id)
 # ↓
 BatchLoader::ExecutorProxy.new(&@batch_block).items_by_block["#{`batch_blockのソースライン情報`}"] << post.user_id
 # items_by_block[@block_hash_key]はデフォルトで空のSetのインスタンスが入っている。
 

BatchLoader.for(post.user_id).batch(&block)は何度実行しても&blockのソースロケーションが同じであれば同一の items_by_block[batch_blockのソースライン情報]というSetにpost.user_id`をつめこんでいくことになる。

__singleton_class.class_eval { undef_method(:batch) }

最後の__singleton_class.class_eval { undef_method(:batch) }について。 BatchLoaderのインスタンスの特異クラスからbatchメソッドを消している。 これは同じインスタンスに対してbatchの次の二回目メソッドの呼び出しはbatch自身も含めて全てmethod_missingに飛ばしたいという意図があると思われる。

そしてこれがBatchLoaderクラスのmethod_missing

   def method_missing(method_name, *args, &block)
    __sync!.public_send(method_name, *args, &block)
  end
 

実際にmethod_missingが走る時(本当にUserのインスタンスそのものが必要になった時) については次章で解説する。

ここまででexecutor.items_by_block[#{BatchLoader#batchに渡したブロック}"]post.user_idが複数積み込まれたことになる。

batchしたものをどうやって本当のオブジェクトにしているのか

ここから実際にオブジェクトをロードする過程に入る。

実際にlazyなオブジェクトが使用される時、ということはなにかしらメソッドが呼び出される時であり、その時にBatchLoaderのインスタンスでは method_missingが呼び出される。

   def method_missing(method_name, *args, &block)
    __sync!.public_send(method_name, *args, &block)
  end
 

__sync!は概ね__syncのラッパー。 BatchLoaderクラス内では多くのメソッドが__というprefixが付いているが、実際のオブジェクトのメソッドと極力かぶらないためだろう。

   def __sync!
    loaded_value = __sync

    if @replace_methods
      __replace_with!(loaded_value)
    else
      loaded_value
    end
  end
 
   def __sync
    return @loaded_value if @synced

    __ensure_batched
    @loaded_value = __executor_proxy.loaded_value(item: @item)

    if @cache
      @synced = true
    else
      __purge_cache
    end

    @loaded_value
  end
 

__ensure_batchedをみると

   def __ensure_batched
    return if __executor_proxy.value_loaded?(item: @item)

    items = __executor_proxy.list_items
    loader = __loader
    args = {default_value: @default_value, cache: @cache, replace_methods: @replace_methods, key: @key}
    @batch_block.call(items, loader, args)
    items.each do |item|
      next if __executor_proxy.value_loaded?(item: item)
      loader.call(item, @default_value)
    en
    __executor_proxy.delete(items: items)
  end
 

最初にloaderを作っている。

   def __loader
    mutex = Mutex.new
    -> (item, value = (no_value = true; nil), &block) do
      if no_value && !block
        raise ArgumentError, "Please pass a value or a block"
      elsif block && !no_value
        raise ArgumentError, "Please pass a value or a block, not both"
      end

      mutex.synchronize do
        next_value = block ? block.call(__executor_proxy.loaded_value(item: item)) : value]
        __executor_proxy.load(item: item, value: next_value)
      end
    end
  end
 

lamdaオブジェクト。「(item, value xor block)を引数にとって実際にvalueをloadするlamda」がloaderである。

__ensure_batchedでは次に@batch_block.call(items, loader, args)が実行される。 最初のload_userで書いたbatchに渡しているブロックが@batch_block。loaderには先程のlamdaオブジェクトが渡される。

 def load_user(post)
  BatchLoader.for(post.user_id).batch do |user_ids, loader|
    User.where(id: user_ids).each { |user| loader.call(user.id, user) }
  end
end
 

loaderというlamdaに渡しているのはuser.idと実際のvalueであるuser。 この2つの引数がExecutorProxyを通してExecutorの@loaded_values_by_block[batch_blockのソースライン情報]というハッシュに { user.id: user } という形で追加されていくことになる。
ここで__syncに戻ると
@loaded_value = __executor_proxy.loaded_value(item: @item)
これはBatchLoaderのインスタンスがもつそれぞれの@item( = post.user_id) を使ってExecuterの@loaded_values_by_block[batch_blockのソースライン情報][@item]としてハッシュから値をとりだしている処理。
post.user_idと先程loadした{ user.id: user }user.idは等しいはずなのでここで無事にBatchLoaderのインスタンスごとに正しいuserを引っ張ってくることができる。
以降は別のBatchLoaderインスタンスでは@loaded_values_by_block[batch_blockのソースライン情報]というハッシュが中身付きでメモリに存在するため、再度User.where(id: user_ids).each { |user| loader.call(user.id, user) }という行為をする必要がなくなる。これにてBatch-Loading完了となる。