2015年3月30日月曜日

Scala的状態機械/FP編

Scalaにおける状態機械の実装戦略について検討しています。

Scala的状態機械/OOP編」で状態+状態遷移を表現するトレイトであるParseStateを作成しました。ParseStateの具象クラスはcase classまたはcase objectとして実現しています。これらのトレイト、case class、case objectがワンセットでFPで使用できる代数的データ構造となっています。

「OOP編」ではこのParseStateを使用して、オブジェクト版の状態機械とアクター版の状態機械を作成しました。代数的データ構造でもあるParseStateがOOP的に問題なく使用できることが確認できました。

「FP編」では、ParseStateの代数的データ構造の性質を活かしてMonadic Programming(以下MP)版の状態機械を考えてみます。

Stateモナド版状態機械

MPで状態を扱いたい場合には、状態を扱うモナドであるStateモナドが有力な選択肢です。

代数的データ型であるParseStateはそのまま利用し、これをStateモナドでくるむことで状態遷移を実現したものが以下のParserStateMonadです。

package sample

import scalaz._, Scalaz._

object ParserStateMonad {
  def action(event: Char) = State((s: ParseState) => {
    (s.event(event), event)
  })

  def parse(events: Seq[Char]): Seq[String] = {
    val s = events.toVector.traverseS(action)
    val r = s.run(InitState)
    r._1.endEvent.row
  }
}
action関数

モナドを使った共通機能を作る場合には、共通機能としていわゆるモナディック関数を提供するのが一つの形になっています。

モナディック関数とは「A→M[B]」(Mはモナド)の形をしている関数です。モナドMのflatMapコンビネータの引数になります。

Stateモナドを使用する場合には、A→State[B]の形の関数を用意することになります。

def action(event: Char) = State((s: ParseState) => {
    (s.event(event), event)
  })

今回作成したStateモナド用のモナディック関数であるaction関数は「Char→State[ParseState→(ParseState, Char)]」の形をしています。

A→M[B]の形とは以下の対応になります。

  • A : Char
  • M : State
  • B : ParseState→(ParseState, Char)

Stateモナドに設定している関数は「ParseState→(ParseState, Char)」の形をしていますが、action関数全体ではaction関数の引数もパラメタとして利用しているので、結果として「Char→ParseState→(Parse, Char)」の動きをする関数になっています。

action関数が返すStateモナドはParseStateによって表現された状態を、受信したイベントとの組合せ状態遷移する関数が設定されています。

状態+状態遷移を表現するオブジェクト兼代数的データ型であるParseStateがきちんと定義されていれば、Stateモナド用のモナディック関数を定義するのは容易なことが分かります。

parse関数

CharのシーケンスからCSVをパースするparse関数は以下になります。

def parse(events: Seq[Char]): Seq[String] = {
    val s = events.toVector.traverseS(action)
    val r = s.run(InitState)
    r._1.endEvent.row
  }

parse関数は、Stateモナド用モナディック関数actionと型クラスTraverseのtraverseSコンビネータの組合せで実現しています。

traverseSコンビネータはStateモナド用のtraverseコンビネータです。Scalaの型推論が若干弱いためStateモナド専用のコンビネータを用意していますが、動きは通常のtraverseコンビネータと同じです。

状態遷移のロジックそのものはParseStateオブジェクトにカプセル化したものをaction関数から返されるStateモナド経由で使用します。

traverseSコンビネータとStateモナドを組み合わせると、Traverseで走査対象となるコレクションをイベント列と見立てて、各イベントの発生に対応した状態機械をStateモナドで実現することができます。この最終状態を取得することで、イベント列を消化した最終結果を得ることができます。

OOPオブジェクトであり代数的データ構造でもあるParseStateは、このようにしてStateモナドに包むことで、FP的な状態機械としてもそのまま使用することができるわけです。

使い方

プログラムを実行するためのSpecは以下になります。

package sample

import org.junit.runner.RunWith
import org.scalatest.junit.JUnitRunner
import org.scalatest._
import org.scalatest.prop.GeneratorDrivenPropertyChecks

@RunWith(classOf[JUnitRunner])
class ParserStateMonadSpec extends WordSpec with Matchers with GivenWhenThen with GeneratorDrivenPropertyChecks {
  "ParserStateMonad" should {
    "parse" in {
      val events = "abc,def,xyz".toVector
      val r = ParserStateMonad.parse(events)
      println(s"ParserStateMonadSpec: $r")
      r should be (Vector("abc", "def", "xyz"))
    }
  }
}
実行

実行結果は以下になります。

$ sbt test-only sample.ParserStateMonadSpec
ParserStateMonadSpec: List(abc, def, xyz)
[info] ParserStateMonadSpec:
[info] ParserStateMonad
[info] - should parse
[info] ScalaTest
[info] 36mRun completed in 400 milliseconds.0m
[info] 36mTotal number of tests run: 10m
[info] 36mSuites: completed 1, aborted 00m
[info] 36mTests: succeeded 1, failed 0, canceled 0, ignored 0, pending 00m
[info] 32mAll tests passed.0m
[info] Passed: Total 1, Failed 0, Errors 0, Passed 1
[success] Total time: 1 s, completed 2015/03/21 17:49:39

scalaz stream版状態機械

Stateモナド版状態機械ではStateモナドと型クラスTraverseのtraverseSコンビネータを使用して、状態機械をMPで実現しました。

この実現方法は、メモリ上に展開したデータに対して一括処理をするのには適していますが、大規模データ処理やストリーム処理への適用は不向きです。

そこで、大規模データ処理やストリーム処理をFunctional Reactive Programming(以下FRP)の枠組みで行うために、scalaz streamを使用して状態機械の実装してみました。

package sample

import scalaz._, Scalaz._
import scalaz.stream._
import scalaz.stream.Process.Process0Syntax

object ParserProcessMonad {
  def fsm(state: ParseState): Process1[Char, ParseState] = {
    Process.receive1 { c: Char =>
      val s = state.event(c)
      Process.emit(s) fby fsm(s)
    }
  }

  def parse(events: Seq[Char]): Seq[String] = {
    val source: Process0[Char] = Process.emitAll(events)
    val pipeline: Process0[ParseState] = source.pipe(fsm(InitState))
    val result = pipeline.toList.last
    result.endEvent.row
  }

  import scalaz.concurrent.Task

  def parseTask(events: Seq[Char]): Task[Seq[String]] = {
    val source: Process0[Char] = Process.emitAll(events)
    val pipeline: Process[Task, ParseState] = source.pipe(fsm(InitState)).toSource
    for {
      lastoption <- pipeline.runLast
      last = lastoption.get
    } yield last.endEvent.row
  }
}
fsm関数

Processモナドは一種の状態機械なので、この性質を利用してPraseStateによる状態遷移をProcessモナド化します。この処理を行うのがfsm関数です。

fsm関数は状態であるParseStateを引数に、Charを受け取るとParseStateとCharの組合せで計算された新しいParseStateによる状態に遷移しつつ処理結果としてParseStateを返すProcessモナドを返します。

def fsm(state: ParseState): Process1[Char, ParseState] = {
    Process.receive1 { c: Char =>
      val s = state.event(c)
      Process.emit(s) fby fsm(s)
    }
  }
parse関数

parse関数はscalaz streamをメモリ上の小規模データに適用する際の典型的な使い方です。

def parse(events: Seq[Char]): Seq[String] = {
    val source: Process0[Char] = Process.emitAll(events)
    val pipeline: Process0[ParseState] = source.pipe(fsm(InitState))
    val result = pipeline.toList.last
    result.endEvent.row
  }

parse関数のおおまかな流れは以下になります。

  1. パイプライン(Processモナド)を生成
  2. パイプラインの処理を記述
  3. パイプラインで加工後のデータを取得
パイプラインの生成

まず引数のCharシーケンスからソースとなるProcessモナドを生成します。

val source: Process0[Char] = Process.emitAll(events)

ProcessのemitAll関数は引数に指定されたシーケンスによるストリームを表現するProcessモナドを生成します。

ただし、内部的に非同期実行する本格的なシーケンスではなく、通常のシーケンスに対してストリームのインタフェースでアクセスできるという意味合いのProcessモナドになります。(内部的には実行制御に後述のTaskモナドではなく、Idモナドを使用しています。)

パイプラインの処理を記述

pipeコンビネータでfsm関数にInitStateを適用して得られるProcessモナドをパイプライン本体のProcessモナドに設定しています。

val pipeline: Process0[ParseState] = source.pipe(fsm(InitState))

これが処理の本体です。ここではpipeコンビネータを始めとする各種コンビネータを使ってパイプラインを構築します。

パイプラインで加工後のデータを取得

最後にtoListメソッドで、パイプラインの処理結果をListとして取り出し、ここからパース結果を取り出す処理を行っています。

val result = pipeline.toList.last
    result.endEvent.row
parseTask関数

parse関数はscalaz streamをメモリ上の小規模データに適用する際の使用例ですが、より本格的な応用である大規模データ処理やストリーム処理では少し使い方が変わってきます。

そこで、参考のために実行制御にTaskモナドを使ったバージョンのparseTask関数を作りました。

def parseTask(events: Seq[Char]): Task[Seq[String]] = {
    val source: Process[Task, Char] = Process.emitAll(events).toSource
    val pipeline: Process[Task, ParseState] = source.pipe(fsm(InitState))
    for {
      lastoption <- pipeline.runLast
      last = lastoption.get
    } yield last.endEvent.row
  }

parse関数と同様にparseTask関数のおおまかな流れは以下になります。

  1. パイプライン(Processモナド)を生成
  2. パイプラインの処理を記述
  3. パイプラインで加工後のデータを取得
パイプラインの生成

ProcessモナドのtoSourceメソッドで、Taskモナドを実行制御に使用するProcessモナドに変換されます。

val source: Process[Task, Char] = Process.emitAll(events).toSource
パイプラインの処理を記述

fsm関数から得られたProcessモナドをpipeコンビネータでパイプラインに設定します。

val pipeline: Process[Task, ParseState] = source.pipe(fsm(InitState))

この処理はTask版でないparse関数と全く同じです。

パイプラインで加工後のデータを取得

Taskモナドを実行制御に使用する場合には、for式などを使ってモナディックに実行結果を取得する形になります。

for {
      lastoption <- pipeline.runLast
      last = lastoption.get
    } yield last.endEvent.row
使い方

プログラムを実行するためのSpecは以下になります。

package sample

import org.junit.runner.RunWith
import org.scalatest.junit.JUnitRunner
import org.scalatest._
import org.scalatest.prop.GeneratorDrivenPropertyChecks

@RunWith(classOf[JUnitRunner])
class ParserProcessMonadSpec extends WordSpec with Matchers with GivenWhenThen with GeneratorDrivenPropertyChecks {
  "ParserProcessMonad" should {
    "parse" in {
      val events = "abc,def,xyz".toVector
      val r = ParserProcessMonad.parse(events)
      println(s"ParserProcessMonadSpec: $r")
      r should be (Vector("abc", "def", "xyz"))
    }
  }
}
実行

実行結果は以下になります。

$ sbt test-only sample.ParserProcessMonadSpec
ParserProcessMonadSpec: List(abc, def, xyz)
[info] ParserProcessMonadSpec:
[info] ParserProcessMonad
[info] - should parse
[info] ScalaTest
[info] 36mRun completed in 301 milliseconds.0m
[info] 36mTotal number of tests run: 10m
[info] 36mSuites: completed 1, aborted 00m
[info] 36mTests: succeeded 1, failed 0, canceled 0, ignored 0, pending 00m
[info] 32mAll tests passed.0m
[info] Passed: Total 1, Failed 0, Errors 0, Passed 1
[success] Total time: 0 s, completed 2015/03/21 17:49:12

scalaz stream + Stateモナド版状態機械

「scalaz stream」ではParseStateオブジェクトを直接使用して状態機械を作成しました。通常はこれで十分ですが、Stateモナド用モナディック関数が用意されている場合は、こちらを使用する方法もあります。

この方法では、Stateモナドの使い方だけ理解していればよいので、プログラミングはより簡単かつ汎用的になります。

package sample

import scalaz._, Scalaz._
import scalaz.stream._

object ParserProcessMonadStateMonad {
  def fsm(state: ParseState): Process1[Char, ParseState] = {
    Process.receive1 { c: Char =>
      val s = ParserStateMonad.action(c).exec(state)
      Process.emit(s) fby fsm(s)
    }
  }

  def parse(events: Seq[Char]): Seq[String] = {
    val source: Process0[Char] = Process.emitAll(events)
    val pipeline: Process0[ParseState] = source.pipe(fsm(InitState))
    val result = pipeline.toList.last
    result.endEvent.row
  }

  import scalaz.concurrent.Task

  def parseTask(events: Seq[Char]): Task[Seq[String]] = {
    val source: Process0[Char] = Process.emitAll(events)
    val pipeline: Process[Task, ParseState] = source.pipe(fsm(InitState)).toSource
    for {
      lastoption <- pipeline.runLast
      last = lastoption.get
    } yield last.endEvent.row
  }
}
fsm関数

ParseStateによる状態機械の動作を行うProcessモナドを生成するfsm関数のStateモナド版は以下になります。

def fsm(state: ParseState): Process1[Char, ParseState] = {
    Process.receive1 { c: Char =>
      val s = ParserStateMonad.action(c).exec(state)
      Process.emit(s) fby fsm(s)
    }
  }

ParseStateのeventメソッドを直接使用する代わりに、ParseStateを包んだStateモナドを返すモナディック関数actionを使用します。

この版のfsm関数ではaction関数から取得したStateモナドのexecメソッドを使用して、現状態とイベント(Char)から新状態を計算し、この状態を保持した、新たなProcessを生成しています。

この方法のメリットはParseStateの使い方(この場合はeventメソッド)は知る必要がなく、汎用的なStateモナドの使い方だけ知っていればよい点です。

つまりaction関数だけ作っておけば、Traverseを使った状態機械、Processモナドを使った状態機械のどちらも簡単に作ることができるわけです。

使い方

プログラムを実行するためのSpecは以下になります。

package sample

import org.junit.runner.RunWith
import org.scalatest.junit.JUnitRunner
import org.scalatest._
import org.scalatest.prop.GeneratorDrivenPropertyChecks

@RunWith(classOf[JUnitRunner])
class ParserProcessMonadStateMonadSpec extends WordSpec with Matchers with GivenWhenThen with GeneratorDrivenPropertyChecks {
  "ParserProcessMonad" should {
    "parse" in {
      val events = "abc,def,xyz".toVector
      val r = ParserProcessMonadStateMonad.parse(events)
      println(s"ParserProcessMonadStateMonadSpec: $r")
      r should be (Vector("abc", "def", "xyz"))
    }
  }
}
実行

実行結果は以下になります。

$ sbt test-only sample.ParserProcessMonadStateMonadSpec
ParserProcessMonadStateMonadSpec: List(abc, def, xyz)
[info] ParserProcessMonadStateMonadSpec:
[info] ParserProcessMonad
[info] - should parse
[info] ScalaTest
[info] 36mRun completed in 286 milliseconds.0m
[info] 36mTotal number of tests run: 10m
[info] 36mSuites: completed 1, aborted 00m
[info] 36mTests: succeeded 1, failed 0, canceled 0, ignored 0, pending 00m
[info] 32mAll tests passed.0m
[info] Passed: Total 1, Failed 0, Errors 0, Passed 1
[success] Total time: 0 s, completed 2015/03/21 17:49:26

状態機械実装戦略

オブジェクト・モデルで状態機械が出てきた場合の実装戦略としては、まずベースとして:

  • sealed trait + case classで状態+状態遷移のオブジェクト&代数的データ型(以下、状態遷移case class)

を作成します。

前回に見たようにこの状態遷移case classを使って、OOP版の状態機械を簡単に作成することができます。

次に、FP用に以下の部品を整備しておくのがよいでしょう。

  • Stateモナド用モナディック関数
  • Processモナド用状態機械関数

どちらの関数も状態遷移case classが用意されていれば、ほとんど定型的な記述のみで作成することができます。この部品を使うことでFP版の状態機械を簡単に作成することができます。

以上、状態機械実装戦略について整理しました。

この戦略で重要なのは、状態+状態遷移を表現するために作成した状態遷移case classは1つだけ作ればよく、これをOOP流、FP流にマルチに展開していける点です。

この点が確定すれば、オブジェクト・モデルに状態機械が出てきたら安心して1つの状態遷移case classの実装に集中することができます。

まとめ

OFPの状態機械の実装についてOOP的実装、FP的実装について整理してみました。

いずれの場合もオブジェクト&代数的データ型である「状態遷移case class」が基本で、ここからOOP的状態機械、FP的状態機械へマルチに展開できることが確認できました。

OFPではアプリケーション・ロジックは可能な限りFP側に寄せておくのがよいので、FPでも状態機械を簡単に実現できることが確認できたのは収穫でした。

また、FP的なアプローチが使えないケースが出てきても、簡単にOOP的アプローチに切り替え可能なのも安心材料です。

諸元

  • Scala 2.11.4
  • Scalaz 7.1.0
  • Scalaz-stream 0.6a
  • Scalatest 2.2.4

2 件のコメント:

  1. > traverseSコンビネータはStateモナド用のtraverseコンビネータです。Scalaの型推論が若干弱いためStateモナド専用のコンビネータを用意していますが


    型推論の関係もありますが、普通に実装したものを使うとstack overflowするので、それを避けるためという意味もあります

    返信削除
    返信
    1. お、そうでしたか。
      情報ありがとうございます!

      削除