2015年8月24日月曜日

[scalaz]Task - 並列処理

先日の「Reactive System Meetup in 西新宿」で「Scalaz-StreamによるFunctional Reactive Programming」のスライドを作るにあたってscalazのTaskについて調べなおしてみたのですが、Taskの実用性について再確認することができました。

色々と切り口がありますが、その中で並列性能が今回のテーマです。

準備

準備として以下のものを用意します。

implicit val scheduler = new java.util.concurrent.ForkJoinPool(1000)

  def go[T](msg: String)(body: => T): Unit = {
    System.gc
    val ts = System.currentTimeMillis
    val r = body
    println(s"$msg(${System.currentTimeMillis - ts}): $r")
  }

  def fa(i: Int): Task[Int] = Task {
    Thread.sleep(i)
    i
  }

スレッド実行コンテキスト(ExecutorService)にはForkJoinPoolを使用します。

ForkJoinPoolは分割統治(devide and conquer)により並行処理を再帰的(recursive)に構成する処理向けのスレッドスケジュールを行います。ざっくりいうと並列で何らかの計算を行う処理全般に向くスケジュールといえるのではないかと思います。IO処理の場合も復数のIO発行後に同期待ち合わせをするケースでは同様にForkJoinPoolが有効と思われます。

今回の性能検証では1000並列させたいのでパラメタで指定しています。

goメソッドは性能測定用のユーティリティです。

fa関数は性能測定対象の関数です。関数faは指定されたマイクロ秒間ウェイトして指定されたマイクロ秒を返す関数をTask化したものです。

問題

以下は性能検証の課題のプログラムです。fa関数の呼出しを1000回逐次型で行います。

Vector.fill(1000)(fa(1000)).map(_.run).sum

関数faは指定されたマイクロ秒間ウェイトして指定されたマイクロ秒を返す関数をTask化したものです。これを1000回繰り返したものを合計したものを計算します。

パラメタでは1000マイクロ秒=1秒を指定しているので1000回繰り返すと16.7分程かかる処理になります。

性能検証

Taskの並列処理を行う以下の関数について性能測定を行いました。

  • Task/gatherUnordered
  • Task/reduceUnordered
  • Nondeterminism/gather
  • Nondeterminism/gatherUnordered
  • Nondeterminism/reduceUnordered
  • Nondeterminism/aggregate
  • Nondeterminism/aggregateCommutative

以下ではそれぞれの関数について説明します。

gatherUnordered(Task)

TaskのgatherUnordered関数の性能測定対象プログラムは以下になります。

def apply_gather_unordered_task: Int = {
    val xs = Vector.fill(1000)(fa(1000))
    val t = Task.gatherUnordered(xs).map(_.sum)
    t.run
  }

TaskのgatherUnordered関数はNondeterminismのgatherUnordered関数とよく似ていますが、並列実行しているTaskの1つが例外になった時に処理全体を止める機能を持っている点が異なります。デフォルトではfalseになっているので、ここではこの機能は使っていません。

unorderedつまり結果順序は元の順序を維持していない(計算が終わった順の可能性が高い)ことは待ち合わせ処理を最適化できる可能性があるので実行性能的には好材料です。一方、アルゴリズム的には順序が保たれていることが必要な場合があるので、その場合はgatherUnorderedは使用することは難しくなります。

可換モノイド(commutative monoid)は演算の順序が変わっても結果が同じになることが保証されているので、並列処理結果が可換モノイドであり、並列処理結果の結合処理が可換モノイドの演算である場合は、並列処理結果が元の順序を保持している必要はありません。つまりgatherUnorderedを使っても全く問題ないわけです。

Intは「+」演算に対して可換モノイドなので、並列処理結果の総和を計算するという結合処理向けにgatherUnorderedを使うことができます。

reduceUnordered(Task)

TaskのreduceUnordered関数の性能測定対象プログラムは以下になります。

def apply_reduce_unordered_task: Int = {
    val xs = Vector.fill(1000)(fa(1000))
    val t = Task.reduceUnordered(xs)(Reducer.identityReducer)
    t.run
  }

並列処理を行った後で、復数の処理結果をまとめる場合にはreduce機能を使用すると意図が分かりやすいですし、共通処理内での最適化も期待できます。

TaskのreduceUnorderedはscalazのReducerを使って、並列処理結果のreduce処理を行う関数です。NondeterminismのreduceUnordered関数とよく似ていますが、並列実行しているTaskの1つが例外になった時に処理全体を止める機能を持っている点が異なります。デフォルトではfalseになっているので、ここではこの機能は使っていません。

並列処理の結果得られるデータはIntで、Intは可換モノイドですから、Monoidの性質を使ってreduce処理を行うことができます。そこで、ReducerとしてidentityReducer(処理結果のMonoidをそのまま使ってreduce処理を行う)を指定しています。

gather(Nondeterminism)

Nondeterminismのgather関数の性能測定対象プログラムは以下になります。

def apply_gather: Int = {
    val xs = Vector.fill(1000)(fa(1000))
    val t = Nondeterminism[Task].gather(xs).map(_.sum)
    t.run
  }

「Nondeterminism[Task]」は型クラスNondeterminismのTask向け型インスタンスの意味です。つまりTaskはNondeterminismでもあるので、Nondeterminismのgather関数を実行することができます。

gather関数はNondeterminismデータシーケンスに対してそれぞれの要素を並列処理し、シーケンスの順序を維持した結果を計算します。

上記ではその結果得られたIntシーケンスをsum関数で合算しています。

gatherUnordered(Nondeterminism)

NondeterminismのgatherUnordered関数の性能測定対象プログラムは以下になります。

def apply_gather_unordered: Int = {
    val xs = Vector.fill(1000)(fa(1000))
    val t = Nondeterminism[Task].gatherUnordered(xs).map(_.sum)
    t.run
  }

TaskのgatherUnordered関数と同様に指定された並列処理の順序を保持せず、処理結果を順不同でシーケンスとして返します。

結果としてIntシーケンスが返ってきますが、Intは可換モノイドの性質を持つため順不同で返ってきてもsum関数で合算して問題ありません。

reduceUnordered(Nondeterminism)

NondeterminismのreduceUnordered関数の性能測定対象プログラムは以下になります。

def apply_reduce_unordered: Int = {
    val xs = Vector.fill(1000)(fa(1000))
    val t = Nondeterminism[Task].reduceUnordered(xs)(Reducer.identityReducer)
    t.run
  }

型クラスNondeterminismのreduceUnordered関数を使って並列実行と実行結果のreduce処理を行います。

TaskのreduceUnorderedの場合と同じくReducerとしてidentityReducerを指定しています。

aggregate(Nondeterminism)

Nondeterminismのaggregate関数の性能測定対象プログラムは以下になります。

def apply_aggregate: Int = {
    val xs = Vector.fill(1000)(fa(1000))
    val t = Nondeterminism[Task].aggregate(xs)
    t.run
  }

aggregate関数はreduceUnordered関数と並列実行後にreduce処理を行う点では同じ系統の計算を行いますが、以下の点が異なります。

  • aggregate関数はMonoidを前提としておりMonoidの性質を利用してreduce処理を行う。それに対してreduceUnordered関数はreduce処理を行うアルゴリズムをReducerとして指定する。
  • Monoidは可換モノイドとは限らないので並列計算の順序が保存されている必要がある。このためaggregate関数は並列実行順序を保存する処理を行っている。それに対してreduceUnordered関数は並列実行順序を保存する処理を行っていない。

aggregateは(可換モノイドでないかもしれない)Monoidを処理対象にしているため、並列計算の順序を保存する処理が必要になるので、その分性能的には不利になります。

この問題に対する改良策が次のaggregateCommutative関数です。

aggregateCommutative(Nondeterminism)

NondeterminismのaggregateCommutative関数の性能測定対象プログラムは以下になります。

def apply_aggregate_commutative: Int = {
    val xs = Vector.fill(1000)(fa(1000))
    val t = Nondeterminism[Task].aggregateCommutative(xs)
    t.run
  }

aggregateCommutative関数はaggreagte関数と同様にMonoidを処理対象としていますが、指定されたMonoidが可換モノイドであるという前提で計算を行います。

可換モノイドであるということは、演算の評価順序が異なっても同じ結果になるということなので、指定された並列計算シーケンスの実行順序を保存する処理は不要です。並列計算シーケンスの実行順序を保存する処理が不要になると実行性能的に有利です。

可換モノイドとモノイドの違い(つまり可換性)は並列処理で重要ですが、現在のところScalazには可換モノイドを表現する型クラスは用意されていないので、型を使ってエラーチェックを行ったり(例:可換性前提の関数で可換性なしのデータを使用できないようにチェック)、最適化(例:可換性の有無で実行順序の保存処理の有無を切り替える)を行うようなことはできません。

aggregateCommutative関数で行っているように、使用者側が違いを意識して使う形になります。

性能

クラスメソッド性能(ms)順序保存集約機能キャンセル機能
TaskgatherUnordered2139--
TaskreduceUnordered1869-
Nondeterminismgather1082--
NondeterminismgatherUnordered1048 ---
NondeterminismreduceUnordered1718--
Nondeterminismaggregate1049-
NondeterminismaggregateCommutative1040--

評価

表では各関数を以下の性質の有無で分類しました。

  • 順序保存
  • 集約機能
  • キャンセル機能

それぞれの性質毎に性能特性を考えます。

順序保存

評価順序保存をすると順序保存なしより少し遅くなります。

可能であれば順序保存なしを選ぶのがよいでしょう。

順序保存なしを選べるかどうかは、各並列処理の計算結果がreduce処理の演算に対して可換モノイド(commutative monoid)であるかどうかで判定できます。

整数の加算や積算は典型的な可換モノイドなので、最終的な計算結果が合算処理(全要素の加算)の場合は「順序保存なし」を選べるわけです。

集約機能

並列実行関数に集約機能が包含されていると、各並列処理の結果を使って直接集約処理を行うことができるので効率的です。一度、並列実行結果をリスト上に保存して、そのリストに対して集約するより、集約対象のデータ(例:整数値)を各並列処理の完了後に直接更新して行く方がオーバーヘッドは少なくなります。

集約機能を提供しているaggregate関数とaggregateCommutative関数が、それぞれ対応するgather関数、gatherUnorderedと比較して若干速いのはそのためだと思われます。

Monoidは集約対象として優れた性質を持っているので、集約機能の対象として使用するのがFunctional Programming(以下FP)のパターンになっています。aggregate関数とaggregateCommutative関数はこのパターンに則って、集約機能の対象としてMonoidを使用します。

一方Reducerを使った集約は大きなオーバーヘッドがあるようなので、積極的に利用する価値があるという感じではないようです。

NondeterminismのgatherUnordered関数とreduceUnordered関数の比較ではreduceUnordered関数がかなり遅くなっています。この場合、Reducer経由でMonoidの集約を行っているので、Monoidを直接集約するよりオーバーヘッドがあるのが原因と思われます。

一方TaskのgatherUnordered関数とreduceUnordered関数の場合、reduceUnordered関数の方が速いので、こちらの場合はreduceUnordered関数の利用は有力な選択肢です。キャンセル機能が重たいためにReducer機能の遅さが隠れてしまうのかもしれません。

キャンセル機能

キャンセル機能はTaskのgatherUnordered関数、reduceUnordered関数が提供しています。

NondeterminismのgatherUnordered関数、reduceUnordered関数と比較すると相当遅くなっています。キャンセル機能が必要でない場合は使わない方がよいでしょう。

まとめ

性能測定の結果、並列処理結果を可換モノイドで受け取り集約処理を行うaggregateCommutative関数が一番高速であることが分かりました。

並列処理実行後の集約処理までを一体化して処理の最適化ができるのは可換モノイドの効果です。

並列処理を設計する際には、各並列処理の結果を可換モノイドで返すような形に持ち込むことができるのかというのが一つの論点になると思います。

また可換モノイドにできない場合も、モノイドにできれば汎用関数で集約まで行うことができるので、並列処理を記述する上で大きな助けになります。

Scalazではモノイドを記述する型クラスMonoidが用意されています。MonoidはScalazによるFPで中心的な役割を担う型クラスの一つですが、並列処理においても重要な役割を担うことが確認できました。

Scalazでは可換モノイドを記述する型クラスはまだ用意されていないので、Monoidで代用することになります。aggregateCommutative関数のように引数の型としてはMonoidを使い、暗黙的に可換モノイドを前提とするような使い方になると思います。

メニーコアによる並列計算が本格化するのはもう少し先になると思いますが、その際のベースとなる要素技術はすでに実用技術として利用可能になっていることが確認できました。FPが並列計算に向いているという期待の大きな部分は、モノイドや可換モノイドのような数学的な概念をプログラミング言語で直接使用できる点にあります。Scala&Scalazはこれを実用プログラミングで可能にしているのが大きな美点といえるかと思います。

諸元

  • Mac OS 10.7.5 (2.6 GHz Intel Core i7)
  • Java 1.7.0_75
  • Scala 2.11.6

2015年8月17日月曜日

クラウドアプリケーション・モデリング考

8月7日に「匠の夏まつり ~モデリングの彼方に未来を見た~」のイベントが行われましたが、この中でパネルディスカッションに参加させていただきました。パネルディスカッションでご一緒させていただいた萩本さん、平鍋さん、高崎さん、会場の皆さん、どうもありがとうございました。

パネルディスカッションがよいきっかけとなって、クラウドアプリケーション開発におけるモデリングについての方向性について腰を落として考えることができました。このところFunctional Reactive Programmingを追いかけていましたが、ちょうどモデリングとの接続を考えられる材料が揃ってきているタイミングでした。

パネルディスカッションの前後に考えたことをこれまでの活動の振り返りも含めてまとめてみました。

基本アプローチ

2008年頃からクラウドアプリケーション開発の手法について以下の3点を軸に検討を進めています。

  • クラウド・アプリケーションのアーキテクチャ
  • メタ・モデルと実装技術
  • モデル駆動開発

検討結果は以下にあげるスライドとブログ記事としてまとめていますが、基本的な考え方は現在も同じです。

ざっくりいうと:

  • クラウド・アプリケーションのバックエンドのアーキテクチャはメッセージ方式になる。
  • クラウド・アプリケーションのモデリングではOOADの構造モデル、状態機械モデルを踏襲。
  • 協調モデルの主力モデルとしてメッセージフローまたはデータフローを採用。
  • OOADの構造モデル、状態機械モデルはモデル駆動開発による自動生成。
  • メッセージフローまたはデータフローはDSLによる直接実行方式が有効の可能性が高い。

という方針&仮説です。「OOADの構造モデル、状態機械モデルはモデル駆動開発による自動生成」についてはSimpleModeler、「メッセージフローまたはデータフローはDSLによる直接実行方式」についてはg3 frameworkで試作を行っていました。

ここまでが2010年から2012年中盤にかけての状況です。

ブログ

2012年以降のアプローチ

2012年の後半にEverforthに参画してApparel Cloudを始めとするCloud Service Platformの開発に注力しています。

前述の論点の中で以下の3点についてはApparel Cloudの開発に直接取り入れています。

  • クラウド・アプリケーションのバックエンドのアーキテクチャはメッセージ方式になる。
  • クラウド・アプリケーションのモデリングではOOADの構造モデル、状態機械モデルを踏襲。
  • OOADの構造モデル、状態機械モデルはモデル駆動開発による自動生成。

「メッセージフローまたはデータフローはDSLによる直接実行方式が有効の可能性が高い」については当初はg3 frameworkという独自DSLによる実装を考えていたのですが、Object-Functional Programmingの核となる技術であるモナドがパイプライン的なセマンティクスを持ち、データフローの記述にも使用できそうという感触を得られたため、ScalazベースのMonadic Programmingを追求して技術的な接点を探るという方針に変更しました。

2012年以降ブログの話題がScalaz中心になるのはこのためです。

その後、まさにドンピシャの技術であるscalaz-streamが登場したので、scalaz-streamをApparel Cloudの構築技術として採用し、「メッセージフローまたはデータフローはDSLによる直接実行方式が有効の可能性が高い」の可能性を実システム構築に適用しながら探っている状況です。

今後のアプローチ

現在懸案として残っている項目は以下のものになります。

  • 協調モデルの主力モデルとしてメッセージフローまたはデータフローを採用。

前述したようにメッセージングのDSLとしてはscalaz-streamをベースにノウハウを積み重ねている状況なので、この部分との連続性をみながらモデリングでの取り扱いを考えていく予定です。

また、ストリーミング指向のアーキテクチャ&プログラミングモデルとしては以下のような技術が登場しています。

このような新技術の状況をみながら実装技術の選択を行っていく予定です。

参考: スライド

パネルディスカッションでのポジション宣言的なスライドとして以下のものを作成しました。

この中で6ページ目の「Cloud時代のモデリング」が今回パネルディスカッションのテーマに合わせて新規に作成したものです。

このスライドで言いたいことは、伝統的なスクラッチ開発とくらべてクラウドアプリケーションではプログラミング量が大幅に減るので、要件定義やその上流であるビジネスモデリングが重要になる、ということです。

  • アプリケーションの大きな部分はCloud Service Platformが実現
  • モデル駆動開発によってドメインモデル(静的構造)の大部分は自動生成される
  • Scalaで実現されているDSL指向のOFP(Object-Functional Programming)は記述の抽象度が高いので設計レベルのモデリングは不要
  • Scalaの開発効率は高いのでプログラミングの比重は下がる
補足:Featureモデル

後日スライドのキーワードページに入れておくべきキーワードとしてFeatureモデルがあることに気付いたので、上記のスライドには追加しておきました。

スライドの想定する世界では、クラウドアプリケーションはクラウドサービスプラットフォーム上で動作するため、クラウドサービスプラットフォームが提供している機能とクラウドアプリケーションの機能の差分をモデル化し、このモデルを元に実際に開発する所、カスタマイズで済ませる所などを具体化していく必要があります。この目的にはSoftware ProductlineのFeatureモデルが有効ではないかと考えています。