2012年9月7日金曜日

Scala Tips / Scala 2.10味見(8) - Future(6)

scala.concurrent.Futureのコンパニオン・オブジェクトには以下のメソッドが定義されています。

  • failed
  • successful
  • apply
  • sequence
  • firstCompletedOf
  • find
  • fold
  • reduce
  • traverse

この中で、sequence, firstCompletedOf, find, fold, reduce, traverseは非同期実行した複数のFutureの同期制御を行います。今回はこれらのメソッドの動きを確認します。

なお本ブログでは、「オブジェクトで定義しているユーティリティ・メソッドは関数と呼ぶことも可」という方針にしているので、以下では関数と呼ぶことにします。

準備

例によって以下の関数gを使います。

  1. val g = (x: Int) => {    
  2.   Thread.sleep(x * 100)  
  3.   x    
  4. }  

実行時間計測はgo関数を用います。

  1. def go[T](a: => T): (T, Long) = {    
  2.   val start = System.currentTimeMillis    
  3.   val r = a    
  4.   val end = System.currentTimeMillis    
  5.   (r, end - start)    
  6. }  

sequenceとtraverse

sequence関数とtraverse関数は「Scala 2.10味見(6) - Future(4)」でも取り上げましたが、Monadicプログラミングを象徴するような関数です。

seqeunce関数やtraverse関数を用いると、複数のFutureの処理の完了を一度に待ち合わせることができます。

sequence
  1. def f = {  
  2.   import scala.concurrent._  
  3.   import ExecutionContext.Implicits.global  
  4.   import scala.concurrent.util.Duration  
  5.   
  6.   go {  
  7.     val f1 = future { g(1) }  
  8.     val f2 = future { g(2) }  
  9.     val f3 = future { g(3) }  
  10.     val r = Future.sequence(List(f1, f2, f3))  
  11.     Await.result(r, Duration.Inf)  
  12.   }  
  13. }  
scala> f
res37: (List[Int], Long) = (List(1, 2, 3),302)
traverse
  1. def f = {  
  2.   import scala.concurrent._  
  3.   import ExecutionContext.Implicits.global  
  4.   import scala.concurrent.util.Duration  
  5.   
  6.   go {  
  7.     val l = List(123)  
  8.     def k(x: Int) = future(g(x))  
  9.     val r = Future.traverse(l)(k)  
  10.     Await.result(r, Duration.Inf)  
  11.   }  
  12. }  
scala> f
res38: (List[Int], Long) = (List(1, 2, 3),302)

sequence関数, traverse関数いずれの場合も、3つのFutureが同時に実行され最長の300msが全体の実行時間になっています。

par

今回のような単純な例では、実際のプログラミングではparメソッドによる並列Listを用いたほうが簡明です。

scala> go(List(1, 2, 3).par.map(g))
res32: (scala.collection.parallel.immutable.ParSeq[Int], Long) = (ParVector(1, 2, 3),306)

sequenceやtraverseはFutureモナドを陽に用いてプログラミングを行う際のツールと考えるとよいでしょう。

firstCompletedOf

複数のFutureの中で最初に完了したFutureの結果を使用するという"早い者勝ち"処理にはfirstCompleteOf関数が使用できます。

  1. def f = {  
  2.   import scala.concurrent._  
  3.   import ExecutionContext.Implicits.global  
  4.   import scala.concurrent.util.Duration  
  5.   
  6.   go {  
  7.     val f1 = future { g(1) }  
  8.     val f2 = future { g(2) }  
  9.     val f3 = future { g(3) }  
  10.     val r = Future.firstCompletedOf(List(f1, f2, f3))  
  11.     Await.result(r, Duration.Inf)  
  12.   }  
  13. }  
scala> f
res39: (Int, Long) = (1,102)

g(1)は100msしかウェイトしないので最初に実行が完了しますが、この結果得られる1が全体の演算結果、100msが全体の実行時間になっています。

find

複数のFutureの中で最初に条件を満たす結果を返したものを使用する処理にはfind関数が使用できます。

  1. def f = {  
  2.   import scala.concurrent._  
  3.   import ExecutionContext.Implicits.global  
  4.   import scala.concurrent.util.Duration  
  5.   
  6.   go {  
  7.     val f1 = future { g(1) }  
  8.     val f2 = future { g(2) }  
  9.     val f3 = future { g(3) }  
  10.     val r = Future.find(List(f1, f2, f3))(_ > 1)  
  11.     Await.result(r, Duration.Inf)  
  12.   }  
  13. }  
scala> f
res40: (Option[Int], Long) = (Some(2),202)

演算結果が1より大きいという条件にあうものはg(2)とg(3)ですが、実行時間はg(2)が200ms、g(3)が300msです。そこでg(2)が演算結果の条件が合うものの中で最初に実行が完了します。

全体の演算結果は、g(2)の演算結果の2、全体の実行時間はg(2)の実行時間の200msになりました。

foldとreduce

fold関数やreduce関数は、複数のFutureが全て終了した後を待ち合わせて畳み込みを行います。

fold
  1. def f = {  
  2.   import scala.concurrent._  
  3.   import ExecutionContext.Implicits.global  
  4.   import scala.concurrent.util.Duration  
  5.   
  6.   go {  
  7.     val f1 = future { g(1) }  
  8.     val f2 = future { g(2) }  
  9.     val f3 = future { g(3) }  
  10.     val r = Future.fold(List(f1, f2, f3))(0)(_ + _)  
  11.     Await.result(r, Duration.Inf)  
  12.   }  
  13. }  
scala> f
res41: (Int, Long) = (6,301)
reduce
  1. def f = {  
  2.   import scala.concurrent._  
  3.   import ExecutionContext.Implicits.global  
  4.   import scala.concurrent.util.Duration  
  5.   
  6.   go {  
  7.     val f1 = future { g(1) }  
  8.     val f2 = future { g(2) }  
  9.     val f3 = future { g(3) }  
  10.     val r = Future.reduce(List(f1, f2, f3))(_ + _)  
  11.     Await.result(r, Duration.Inf)  
  12.   }  
  13. }  
scala> f
res42: (Int, Long) = (6,302)

fold関数, reduce関数いずれの場合も、3つのFutureが同時に実行され最長の300msが全体の実行時間になっています。

ノート

Scala 2.10味見(6) - Future(4)」で触れたように、コンパニオン・オブジェクトFutureの提供する関数は、TraversableOnceとFutureを決め打ちにしているため、汎用性には問題がありますがFuture操作という観点では十分な機能を提供しています。

まずはこの実装を使いこなす事が大事ですが、技術的な流れとしてもう少し先の方向性についてもある程度予測することができます。

まず、モナドの入れ子操作は「Scala 2.10味見(6) - Future(4)」で述べたようにscalazが型クラスTravarseやApplicativeを用いて実装しているような汎用性のある形が求められることになるでしょう。

また、fold系処理ではSemigroup/Monoidを活用して、より高度な部品化を進めたいところです。scalazのfoldMapメソッドやsumrメソッド、foldReduceメソッドといった機能ですね。

現在のfold関数/reduce関数は、Futureの処理が全て完了するのを待って普通の畳込みを行なうナイーブなメカニズムになっていますが、Semigroup/Monoidを導入すれば結合法則(associative law)の性質、さらにCommutativeSemigroupやCommutativeMonoidといった型クラス(これらの型クラスはScalaz7にもまだ導入されていません)を導入すれば可換法則(commutative law)の性質を利用して、並列処理の最適化を行うことができるはずです。

ここまで確認してきたことを総合するとScalaz Promiseの機能はScala 2.10 Futureで置き換えることは可能です。Scalaの進化の方向性からも、Scalazも基本ライブラリのFutureを軸に並列プログラミングの技術体系を構築していくことになると予測します。

Scalaの基本機能でScalaz的なMonadicプログラミングができるようになるのは当面なさそうなので、そういう意味でもScalaz側でFutureを取り込んでMonadicに使えるように各種拡張が行われることを期待します。

諸元

  • Scala 2.10.0-M7
  • Scalaz 7 github 2012-09-03版

0 件のコメント:

コメントを投稿