2016年12月9日金曜日

ReaderWriterStateモナドと畳込み

ReaderWriterStateモナドは「Patterns in Types - A look at Reader, Writer and State in Scala」を見てからずっと気になっていたのですが、実務のプログラミングでも汎用的な基盤として使えるのではないかとふたたび自分の中でブームになってきたので、少し試してみました。

例題

レコードを正常なものと異常なものに選別する処理を考えます。異常なレコードは異常と判断した理由つきで記録します。

データ連携処理ではよく出てくる処理だと思います。

この処理を以下の関数として実装することにします。

def fold(
    xs: Vector[Record]
  )(implicit context: ExecutionContext): (Vector[Record], Vector[ErrorRecord])

この関数を以下のバリエーションで実装していきます。

Foldable
通常の畳込み
Monoid
モノイドによる畳込み
State
Stateモナドによる畳込み
Traverse&State
TraverseとStateモナドによる畳込み
Reducer
Reducerを使った畳込み
ReaderWriterState
ReaderWriterStateモナドを使った畳込み

直接の目的はボクが常用しているFoldableによる畳込みとReaderWriterStateモナドによる畳込みを比較することです。

同時に色々なアプローチの比較も行い、それぞれのアプローチの使い所を探っていきます。

準備

まずプログラムが扱うドメインのオブジェクトを定義します。

object Domain {
  type Record = Map[String, Any]
  type Errors = Vector[ErrorRecord]
  type Reason = String
  case class ErrorRecord(record: Record, reason: Reason)
  case class RecordsState(totalCount: Int = 0, errorCount: Int = 0) {
    def success = copy(totalCount = totalCount + 1)
    def error = RecordsState(totalCount + 1, errorCount + 1)
  }
  trait ExecutionContext {
    def verify(r: Record): Option[Reason]
  }
  class DefaultExecutionContext() extends ExecutionContext {
    def verify(r: Record): Option[Reason] =
      if (r.get("id").isEmpty)
        Some("No id")
      else
        None
  }
}

以下の型、case class、クラスを定義しました。

Record
レコード
ErrorRecord
エラーとなったレコードと理由
Reason
エラー理由
Errors
ErrorRecordの集まり
RecordState
処理状況を記録
ExecutionContext
実行コンテキストのインタフェース
ExecutionContextImpl
実行コンテキストの実装

単にエラーコードを選り分けるだけでなく、以下の機能を実現できるようにしています。

  • 実行状況をRecordStateに記録して取得可能にしている。
  • エラーの判定のロジックをExecutionContextとして指定可能にしている。

Foldable

FP(Functional Programming)で一般的な畳込み処理です。

object FoldLeft {
  import Domain._
  case class Z(
    context: ExecutionContext,
    records: Vector[Record] = Vector.empty,
    errors: Vector[ErrorRecord] = Vector.empty
  ) {
    def result = (records, errors)

    def +(rhs: Record): Z = context.verify(rhs) match {
      case Some(reason) => copy(errors = errors :+ ErrorRecord(rhs, reason))
      case None => copy(records = records :+ rhs)
    }
  }

  def fold(
    xs: Vector[Record]
  )(implicit context: ExecutionContext): (Vector[Record], Vector[ErrorRecord]) =
    xs.foldLeft(Z(context))(_+_).result
}

VectorのfoldLeft関数を使って畳込み処理を行います。

case classを使う手法はボクが個人的に使っているもので一般的ではないと思いますが、特に難しくはないと思います。(「foldの小技」)

FoldLeftのfold関数はExecutionContextを暗黙パラメタとして受取りcase class Zのパラメタとして渡しています。case class ZはこのExecutionContextを使用してロジックを実行します。ロジックの可変部分をExecutionContextに分離することでfold関数の処理をチューニング可能な構造になっています。

Monoid

次はMonoidを使った畳込みを考えてみます。

object FoldableMonoid {
  import Domain._
  case class Z(
    records: Vector[Record] = Vector.empty,
    errors: Vector[ErrorRecord] = Vector.empty
  ) {
    val context: ExecutionContext = new ExecutionContextImpl()
    def result = (records, errors)

    def +(rhs: Record): Z = context.verify(rhs) match {
      case Some(reason) => copy(errors = errors :+ ErrorRecord(rhs, reason))
      case None => copy(records = records :+ rhs)
    }

    def +(rhs: Z): Z = copy(
      records = records ++ rhs.records,
      errors = errors ++ rhs.errors
    )
  }

  object Z {
    def empty = Z()
    def point(rec: Record) = empty + rec
  }

  implicit object ZMonoid extends Monoid[Z] {
    def zero = Z.empty
    def append(lhs: Z, rhs: => Z) = lhs + rhs
  }

  def fold(xs: Vector[Record]): (Vector[Record], Vector[ErrorRecord]) = {
    xs.foldMap(Z.point).result
  }
}

モノイドの場合、実行コンテキストの意図のExecutionContextを外部から与えることは筋悪そうなので固定のものを使うことにしています。

評価

コーディング的には、Monoid計算に適合するように各種関数を用意したり、型クラスMoonoidの型クラスインスタンを定義したりという手間がかかります。

何回も使用するロジックの場合はよいですが、その場限りのロジックの場合はコーディングのオーバーヘッドの方が大きくなるのでFoldable方式の方がよさそうです。

またExecutionContextを外付けで与えることができないのはかなり大きな問題です。

Monoidについては、すでにMonoidがある場合はそれを利用するのがよいですが、畳込みのために、わざわざMonoidのメカニズムを積極的に使うというほどではないようです。

State

次はStateモナドを使った畳み込みです。

object StateWithTraverse {
  import Domain._
  case class Z(
    context: ExecutionContext,
    records: Vector[Record] = Vector.empty,
    errors: Vector[ErrorRecord] = Vector.empty
  ) {
    def result = (records, errors)

    def +(rhs: Record): Z = context.verify(rhs) match {
      case Some(reason) => copy(errors = errors :+ ErrorRecord(rhs, reason))
      case None => copy(records = records :+ rhs)
    }
  }

  def fold(
    xs: Vector[Record]
  )(implicit context: ExecutionContext): (Vector[Record], Vector[ErrorRecord]) = {
    val ts = xs.traverseS(x => State[Z, Unit] {
      case s => (s + x, ())
    })
    ts.exec(Z(context)).result
  }
}

scalazの型クラスTraverseにはStateモナドを使って走査する機能があります。Stateモナドで畳み込み動作をするようにしておけば、Traverseでの走査の過程で畳み込みを行うことができます。

ここではFoldableで使用したcase class Zと同じものをStateモナドでの状態として使用する実装を行っています。

実行コンテキストであるExecutionContextは状態の一部として受渡しています。

評価

Stateモナドの使い方に慣れていれば、Foldableとほぼ同じような手間でプログラミングすることができます。

ただ、Stateモナド実行のオーバヘッドなどを考えるとFoldableで間に合っているものをわざわざStateモナド化する必然性はなさそうです。

再利用可能なStateモナド部品を作った時に、このロジックで畳み込みに利用することも可能という選択肢として考えておくとよいと思います。

Stateモナドは畳込みの汎用ロジック向けではなく、以下の記事にまとめたように状態遷移/状態機械を作る時のキーパーツとして考えていくのがよさそうに思いました。

Traverse&State

Stateモナドは、1つの処理ごとに型パラメータAで示す処理結果を出力する機能があり、for式などで組み合わせる時にパラメタとして受け渡しすることで、全体として複雑な処理を記述できる機能を持っているのですが、traverseSによる畳込みの場合はここの部分で、走査結果を蓄積する形になります。

このためTraverseとStateを組み合わせる場合、Traverseの機能を活用してStateの実行結果をTraverse側に蓄積させることができるので、その点を活かした実装に改良してみました。

object TraverseState {
  import Domain._
  case class Z(
    context: ExecutionContext,
    records: Vector[Record] = Vector.empty,
    errors: Vector[ErrorRecord] = Vector.empty
  ) {
    def +(rhs: Record): Z = context.verify(rhs) match {
      case Some(reason) => copy(errors = errors :+ ErrorRecord(rhs, reason))
      case None => copy(records = records :+ rhs)
    }

    def apply(rhs: Record): (Z, Option[Record]) = context.verify(rhs) match {
      case Some(reason) => (copy(errors = errors :+ ErrorRecord(rhs, reason)), None)
      case None => (copy(records = records :+ rhs), Some(rhs))
    }
  }

  def fold(
    xs: Vector[Record]
  )(implicit context: ExecutionContext): (Vector[Record], Vector[ErrorRecord]) = {
    val ts = xs.traverseS(x => State[Z, Option[Record]] {
      case s => s(x)
    })
    val (s, records) = ts.run(Z(context))
    (records.flatten, s.errors)
  }
}
評価

前節「State」は正常レコードもcase class Z経由で取得することを前提にTraverseの主ルートには「()」を渡していて、事実上封印していました。

ここでは、正常レコードをTraverseの主ルートで受け渡すことができるようにcase class Zにapplyメソッドを追加しました。

case class Zが再利用可能な汎用ロジックを実装できるのであれば、ひと手間かけてapplyメソッドを追加しておくことで、利用範囲が広がります。

ただ、foldLeftよりはやや手間がかかるのは「State」と同じなので、一度限りのロジックに対して普段使いで適用する感じではなさそうです。

Reducer

ちょっと脱線してReducerを使った畳込みを考えてみました。

Reducerは畳み込み処理の中のデータを足し込む処理を汎用化したものです。畳込み対象がMonoidでなく、畳込み結果がMonoidである場合に使用できます。畳み込み処理の走査処理を汎用化(左畳み込み、右畳み込みの最適選択)したGeneratorと組み合わせて使用するのが基本的な使い方のようです。

object Reducer {
  import Domain._
  case class Z(
    records: Vector[Record] = Vector.empty,
    errors: Vector[ErrorRecord] = Vector.empty
  ) {
    val context: ExecutionContext = new ExecutionContextImpl()
    def result = (records, errors)

    def +(rhs: Record): Z = context.verify(rhs) match {
      case Some(reason) => copy(errors = errors :+ ErrorRecord(rhs, reason))
      case None => copy(records = records :+ rhs)
    }

    def +(rhs: Z): Z = copy(
      records = records ++ rhs.records,
      errors = errors ++ rhs.errors
    )
  }

  object Z {
    def empty = Z()
    def point(rec: Record) = empty + rec
  }

  implicit object ZMonoid extends Monoid[Z] {
    def zero = Z()
    def append(lhs: Z, rhs: => Z) = lhs + rhs
  }

  def fold(xs: Vector[Record]): (Vector[Record], Vector[ErrorRecord]) = {
    val reducer = UnitReducer((x: Record) => Z.point(x))
    val G = Generator.FoldlGenerator[Vector]
    G.reduce(reducer, xs).result
  }
}

ReducerはMonoid以外の畳込み対象を一度Monoidに変換してから畳み込むというロジックなので、畳込みがMonoidの機能範囲に限定されます。

今回のケースではExecutionContextを外付けにするのが難しいので、Monoidであるcase class Zが内部で固定で持っています。

評価

ReducerはMonoid以外の要素の列をMonoidに畳み込む時のアダプタ的な機能と考えると分かりやすいと思います。ただ、このための機能としてはFoldableのfoldMapコンビネータという非常に汎用的な機能があるので、Reducerをわざわざ使うシーンはあまりなさそうです。

また、色々と糊コードを書かないといけないのとMonoidの制約が入ってくるので、汎用の畳込み機能として使うのはお得ではなさそうということも確認できました。

ReducerはscalazのreduceUnordered関数で並列実行したTaskの結果の順不同畳込みに使用されています。こういった、特別な用途向けの機能と考えておくとよさそうです。

ReaderWriterState

それでは本命のReaderWriterStateモナドを使ってみます。

object ReaderWriterStateFold {
  import scala.language.higherKinds
  import Domain._

  def run[C[_]: Foldable, X, R, W: Monoid, S, A: Monoid](xs: C[X], rws: X => ReaderWriterState[R, W, S, A], r: R, s: S): (W, A, S) = {
    case class RWSZ(
      writer: W = Monoid[W].zero,
      outcome: A = Monoid[A].zero,
      state: S = s
    ) {
      def result = (writer, outcome, state)
      def apply(r: R, x: X) = {
        val (rw, ra, rs) = rws(x).run(r, state)
        RWSZ(rw, ra, rs)
      }
    }
    xs.foldLeft(RWSZ())(_.apply(r, _)).result
  }

  case class Z(
    records: Vector[Record] = Vector.empty,
    errors: Vector[ErrorRecord] = Vector.empty
  ) {
    def result = (records, errors)

    def apply(context: ExecutionContext, rhs: Record) = {
      val z = context.verify(rhs) match {
        case Some(reason) => copy(errors = errors :+ ErrorRecord(rhs, reason))
        case None => copy(records = records :+ rhs)
      }
      (z.errors, z.records, z)
    }
  }

  def fold(
    xs: Vector[Record]
  )(implicit context: ExecutionContext): (Vector[Record], Vector[ErrorRecord]) = {
    def rws(a: Record) = ReaderWriterState[ExecutionContext, Vector[ErrorRecord], Z, Vector[Record]] {
      case (r, s) => s.apply(r, a)
    }
    val (errors, records, z) = run(xs, rws, context, Z())
    (records, errors)
  }
}

run関数は汎用関数なので、今回の用途向けに作成した部分はcase class Zとfold関数だけなのでそれほど大きくはありません。run関数を再利用することを前提にすると、ほとんどStateやTraverse&Stateと同じ手間で畳み込み処理を書くことができます。

ReaderWriterStateモナドは、Stateモナドの持つStateモナド自身と処理結果の出力に加えて実行コンテキストなどの参照専用データの受け渡し(Reader)とログ的な蓄積データの出力(Writer)の機能を持っています。

run関数では、引数に処理対象のVectorとReaderWriterStateモナド、実行コンテキストのExecutionContextと状態を持つcase class Zの初期値を渡しています。実行コンテキストと状態の初期値を外部から与えることができるので、ReaderWriterStateモナドの振る舞いを実行時にカスタマイズできる構造になっています。

run関数の返却値はStateモナドの実行結果の正常レコードとWriterに蓄積されたエラーコード、そしてState(case class Z)の最終結果です。

評価

run関数を事前に用意しておけば、Traverse&Stateモナドとほとんど変わらない使い勝手で使うことができることが確認できました。

Stateモナドの場合は、実行コンテキスト(ExecutionContext)の指定と蓄積データ(エラーレコード)の取得を状態(case class Z)の中に自分で実装する必要がありました。

一方、ReaderWriterStateモナドでは、実行コンテキストの指定は蓄積データの取得はReaderWriterStateモナドの機能としてもっているので、状態と計算結果に実装上の注意を集中することができます。また、実行コンテキストと蓄積データのインタフェースが決まっているので、部品として組み合わせることも可能になります。

本例でもそうであったように、多くの用途ではReaderWriterStateモナドが提供する機能で要件が満たせる事が多いのではないかと思います。そうであるならば、ReaderWriterStateモナドが提供する汎用機能を使って再利用可能な部品を作ることで、部品の再利用を促進できる可能性が高いと考えられます。

考察

畳み込み処理に関しては、一度限りのロジックであるならばFoldableのfoldLeftを使って普通に書くのが一番開発効率がよさそうです。

一方、StateモナドやReaderWriterStateモナドを使って畳込みを実装するのも、それほどの手間でないことも確認できました。StateモナドやReaderWriterStateモナドにピッタリ合うケースでは、一度限りののロジックでもこれらのモナドを使うのもありそうです。

StateモナドやReaderWriterStateモナドは共通部品向けの汎用インタフェースという意味合いが大きいですが、使い方が難しいと積極的には使いづらいところです。どちらのモナドもわりと簡単に使えることが分かったのが収穫でした。StateモナドやReaderWriterStateモナドをつかって再利用可能な部品を整備していく方向性が実現可能という感触を得ることができました。

また、Stateモナド、ReaderWriterStateモナド、Monoid、Reducerの機能差も改めて確認することができました。

当面は以下のような方針で適用していきたいと考えています。

  • 一度限りのロジックはfoldLeft(普通の畳込み)
  • 再利用可能な処理で実行コンテキストがなくMonoid化できるものはMonoid
  • 共通部品はReaderWriterStateモナド化を考える(実行コンテキスト&蓄積データ&計算結果&状態)
  • 必要に応じてStateモナドやReducerを使う

諸元

  • Scala 2.11.7
  • Scalaz 7.2.0