Akihito Ikeda

2021年01月24日

posts/2021-01-24diary

日中はずっと家にいてだらだらしていた。夜になんとなく体を動かしたくなって、散歩を兼ねて近所のスーパーまでYと歩いて買い物に行った。 スーパーまで直行せずに遠回りをして、気になっていた建物の正体を突き止めたり、無駄に歩道橋を渡ってみたり、道路標示が塗り直されてるのに気づいたり、見慣れた近所をけっこう新鮮に感じることができて楽しかった。 ほどよい疲労感も得られてとてもよかったので今後もちょくちょくやっていきたい。夜だと車や自転車が少なくなるので歩きやすくていいし、うるさくなくて会話もしやすい。

プログラミングElixir(第2版)』を読み返して、複数のプロセスを起動して並列に何かさせるような基本的なコードを書いた。

状況設定としては、

list = [382253568, 723152896, 37802240, 379425024, 404894720, 471526144]

こういうリストがあって、各要素がそれぞれ何回2で割れるかを並列に計算したい。

SchedulerがSolverプロセスを複数個立ち上げ、それぞれのSolverプロセスに対して「この数字が何回2で割り切れるか計算して結果を返してね」という依頼をする。 Solverプロセスは依頼された計算を実際に行って結果をSchedulerに返す。SchedulerはSolverからの答えを受け取りつつ、まだlist(キュー)に計算対象が残っていれば再度依頼をする。キュー(実際はスタックとして使われてる)が空になったら、結果が返ってきたSolverから順にプロセスを終了させていき、Solverから受け取った複数の結果をまとめて最終的な計算をする、という感じ。

コードの雛形は本に載っていたものそのままで、それを手頃なAtCoderの問題に当てはめて書いてみた。

defmodule Main do
  def main do
    IO.read(:line)
    IO.read(:line)
    |> String.trim()
    |> String.split(" ")
    |> Enum.map(&String.to_integer/1)
    |> solve()
    |> IO.puts()
  end

  def solve(list) do
    Scheduler.run(10, Solver, :div, list)
  end
end

defmodule Solver do
  def div(scheduler) do
    send scheduler, {:ready, self()}
    receive do
      {:div, n, client} ->
        send client, {:answer, div_count(n, 2)}
        div(scheduler)
      {:shutdown} ->
        exit(:normal)
    end
  end

  defp div_count(dividend, divisor), do: _div_count(dividend, divisor, 0)
  defp _div_count(1, _, count), do: count
  defp _div_count(dividend, divisor, count) when rem(dividend, divisor) == 0 do
    _div_count(div(dividend, divisor), divisor, count+1)
  end
  defp _div_count(_, _, count), do: count
end

defmodule Scheduler do
  def run(num_processes, module, func, to_calculate) do
    (1..num_processes)
    |> Enum.map(fn(_) -> spawn(module, func, [self()]) end)
    |> schedule_processes(to_calculate, [])
  end

  defp schedule_processes(processes, queue, results) do
    receive do
      {:ready, pid} when length(queue) > 0 ->
        [next | tail] = queue
        send pid, {:div, next, self()}
        schedule_processes(processes, tail, results)
      {:ready, pid} ->
        send pid, {:shutdown}
        if length(processes) > 1 do
          schedule_processes(List.delete(processes, pid), queue, results)
        else
          Enum.min(results)
        end
      {:answer, result} ->
        schedule_processes(processes, queue, [ result | results ])
    end
  end
end

https://atcoder.jp/contests/abc081/submissions/19672718

以下のコードは上のものと全く同じ計算をするんだけど、各Solverプロセスに依頼する計算タスクの粒度をより細かくして「この数が2で割り切れたらアキュムレータに1足して、割り切れなかったらそのまま返して」という依頼をするよう書いたもの(SolverというよりJob的な単位になった)。つまり、ひとつのプロセスへの一度の依頼で「何回2で割り切れるか」までの計算はしない。

この問題設定ではあんまり意味のある変更じゃないけど、こちらの方がよりプロセスを使ってる感じがするし、 同じカタのコードでありつつも依頼するタスクの粒度を変えたりメッセージのやりとりを変えてうまく強調するように書き換えてみるのはいい訓練になったような気がする。

defmodule Main do
  def main do
    IO.read(:line)
    IO.read(:line)
    |> String.trim()
    |> String.split(" ")
    |> Enum.map(&String.to_integer/1)
    |> solve()
    |> IO.puts()
  end

  def solve(list) do
    Scheduler.run(10, Solver, :div, list)
  end
end

defmodule Solver do
  def div(scheduler) do
    send scheduler, {:ready, self()}
    receive do
      {:div, {n, acc}, client} ->
        send client, {:answer, solve(n, acc)}
        div(scheduler)
      {:shutdown} ->
        exit(:normal)
    end
  end

  defp solve(1, acc), do: {:fail, 1, acc}
  defp solve(n, acc) do
    case rem(n, 2) do
      0 -> {:success, div(n, 2), acc+1}
      _ -> {:fail, n, acc}
    end
  end
end

defmodule Scheduler do
  def run(num_processes, module, func, to_calculate) do
    to_calculate = to_calculate |> Enum.map(fn n -> {n , 0} end)
    (1..num_processes)
    |> Enum.map(fn(_) -> spawn(module, func, [self()]) end)
    |> schedule_processes(to_calculate, [])
  end

  defp schedule_processes(processes, queue, results) do
    receive do
      {:ready, pid} when length(queue) > 0 ->
        [next | tail] = queue
        send pid, {:div, next, self()}
        schedule_processes(processes, tail, results)
      {:ready, pid} ->
        send pid, {:shutdown}
        if length(processes) > 1 do
          schedule_processes(List.delete(processes, pid), queue, results)
        else
          Enum.min(results)
        end
      {:answer, {:success, n, acc}} ->
        schedule_processes(processes, [ {n, acc} | queue ], results)
      {:answer, {:fail, _, acc}} ->
        schedule_processes(processes, queue, [ acc | results ])
    end
  end
end

https://atcoder.jp/contests/abc081/submissions/19673800

いやでもこのschedule_processes()、各プロセスから受け取った結果の最終的なまとめをどこでやってるのかぱっと見じゃわからないよなー…。

© Akihito Ikeda - Last update 22.08.2021 19:15.