Spark SQL 1.3から以下の2つの機能が導入されています。
- DataSourceとしてJDBCが使えるようになった
- DataFrame
この2つの機能追加によってSpark SQLを汎用のバッチ処理基盤にできるのではないかというインスピレーションが湧きました。
この実現目的でSparkバッチをスタンドアロンで実行するためのDockerイメージspark-sql-scala-dockerを作ってみた、というのが今回のお話です。
Spark SQL
Spark SQLは、Sparkの分散計算処理をSQLで記述できるようにしたものです。SQLとSpark本来のmonadicなAPI(e.g. filter, map, flatMap)を併用して計算処理を記述することができます。
このプログラミングモデルは非常に強力で、大枠の絞り込みはSQLで行っておいて、アプリケーションに特化した検索ロジックをScalaで記述したUDF(User Defined Function)で補完するといった処理を、プログラミング言語的に簡潔に記述することができます。
Spark SQLの基本機能に加えて1.3から以下の機能も使えるようになりました。
DataSourceのJDBC対応
DataSourceとしてJDBCが使えるようになったことで、RedShift上にためた分析データなどから直接データを取得できるようになりました。MySQLやPostreSQLなどのデータを一旦S3に変換するといった準備タスクが不要になったので、ジョブ作成の手間が大きく低減すると思います。
小さな機能追加ですが、実運用上のインパクトは大きいのではないかと思います。
DataFrame
大きな機能追加としてはDataFrameが導入されました。
DataFrameは表形式の大規模データを抽象化したAPIで、元々はR/Pythonで実績のある機能のようです。
DataFrameは分析専用のAPIではなく、表形式データ操作の汎用APIとして使用できるのではないかと期待しています。計算結果を外部出力する際の汎用機能としても期待できます。
もちろんR/Pythonなどのデータ分析処理系との連携も期待できそうです。
Spark SQLの用途
Spark SQLの基本機能と上記の2つの機能追加によって、Sparkバッチを大規模(データ量/計算量)向けデータ処理基盤としてだけではなく、汎用のバッチ実行基盤として使えるようになるのではないかとインスピレーションが湧いたわけです。
データ集計用のバッチをSparkバッチとして作成して、データ量、計算量に応じてスタンドアロンジョブとSparkクラスタ上でのジョブのいずれかでジョブ実行するというユースケースです。
そのベースとして、Sparkクラスタを用いないスタンドアロンジョブとして実行するためのDockerイメージを作ってみました。
spark-sql-scala-docker
spark-sql-scala-dockerはSparkアプリケーションをスタンドアロンで実行するためのDockerイメージです。
GitHubにソースコードがありますので、詳細はこちらを参照して下さい。
以下では、実装上のポイントと使い方について説明します。
Dockerfile
spark-sql-scala-dockerのDockerfileは以下になります。
FROM sequenceiq/spark:1.3.0 RUN mkdir -p /opt/spark/lib RUN cd /opt/spark/lib && curl -L 'http://dev.mysql.com/get/Downloads/Connector-J/mysql-connector-java-5.1.30.tar.gz' -o - | tar -xz --strip-components=1 mysql-connector-java-5.1.30/mysql-connector-java-5.1.30-bin.jar RUN curl -L 'http://jdbc.postgresql.org/download/postgresql-9.2-1002.jdbc4.jar' -o /opt/spark/lib/postgresql-9.2-1002.jdbc4.jar ENV SPARK_CLASSPATH /opt/spark/lib/mysql-connector-java-5.1.30-bin.jar:/opt/spark/lib/postgresql-9.2-1002.jdbc4.jar RUN rpm -ivh http://ftp-srv2.kddilabs.jp/Linux/distributions/fedora/epel/6/x86_64/epel-release-6-8.noarch.rpm RUN yum -y install redis --enablerepo=epel COPY spark-defaults.conf /opt/spark-defaults.conf COPY entrypoint.sh /opt/entrypoint.sh ENV COMMAND_JAR_DIR /opt/command.d ENV COMMAND_JAR_NAME command.jar VOLUME [$COMMAND_JAR_DIR"] ENTRYPOINT ["/opt/entrypoint.sh"]
Dockerイメージsequenceiq/spark:1.3.0をベースにしていて以下の調整だけ行っています。
- MySQLとPostgreSQLのJDBCドライバのインストール
- Sparkアプリケーションの登録処理
entrypoint.sh
spark-sql-scala-dockerのentrypoint.shは以下になります。
#! /bin/bash
# WAIT_CONTAINER_TIMER
# WAIT_CONTAINER_FILE
# WAIT_CONTAINER_KEY
# set -x
set -e
echo MySQL host: ${MYSQL_SERVER_HOST:=$MYSQL_PORT_3306_TCP_ADDR}
echo MySQL port: ${MYSQL_SERVER_PORT:=$MYSQL_PORT_3306_TCP_PORT}
echo PostgreSQL host: ${POSTGRESQL_SERVER_HOST:=$POSTGRESQL_PORT_5432_TCP_ADDR}
echo PostgreSQL port: ${POSTGRESQL_SERVER_PORT:=$POSTGRESQL_PORT_5432_TCP_PORT}
echo Redis host: ${REDIS_SERVER_HOST:=$REDIS_PORT_6379_TCP_ADDR}
echo Redis port: ${REDIS_SERVER_PORT:=$REDIS_PORT_6379_TCP_PORT}
export MYSQL_SERVER_HOST
export MYSQL_SERVER_PORT
export POSTGRESQL_SERVER_HOST
export POSTGRESQL_SERVER_PORT
export REDIS_SERVER_HOST
export REDIS_SERVER_PORT
function wait_container {
if [ -n "$REDIS_SERVER_HOST" ]; then
wait_container_redis
elif [ -n "$WAIT_CONTAINER_FILE" ]; then
wait_container_file
fi
}
function wait_container_redis {
result=1
for i in $(seq 1 ${WAIT_CONTAINER_TIMER:-100})
do
sleep 1s
result=0
if [ $(redis-cli -h $REDIS_SERVER_HOST -p $REDIS_SERVER_PORT GET $WAIT_CONTAINER_KEY)'' = "up" ]; then
break
fi
echo spark-sql-scala-docker wait: $REDIS_SERVER_HOST
result=1
done
if [ $result = 1 ]; then
exit 1
fi
}
function wait_container_file {
result=1
for i in $(seq 1 ${WAIT_CONTAINER_TIMER:-100})
do
sleep 1s
result=0
if [ -e $WAIT_CONTAINER_FILE ]; then
break
fi
echo spark-sql-scala-docker wait: $WAIT_CONTAINER_FILE
result=1
done
if [ $result = 1 ]; then
exit 1
fi
}
COMMAND_JAR=$COMMAND_JAR_DIR/$COMMAND_JAR_NAME
wait_container
sed -i "s!hdfs://.*:9000!file:\/\/\/tmp!g" /usr/local/hadoop/etc/hadoop/core-site.xml
spark-submit --properties-file /opt/spark-defaults.conf $COMMAND_JAR基本的にはspark-submitでSparkアプリケーションのジョブをサブミットしているだけですが、以下の2つの調整を行っています。
- Redisを使って他のコンテナの待ち合わせ
- 中間データのローディング先をHDFSではなくローカルファイルに変更する
コンテナの待ち合わせ
Sparkアプリケーションを動作させる前の準備を他のコンテナで進める場合は、コンテナの待ち合わせが必要になります。この待ち合わせをmysql-java-embulk-dockerと同様にRedisを用いて実現しています。
典型的な使用例は、Sparkアプリケーションのテスト実行時でのテストDBの準備です。この実例は後ほどサンプルで説明します。
中間データのローディング先
core-site.xmlの変更処理です。
sed -i "s!hdfs://.*:9000!file:\/\/\/tmp!g" /usr/local/hadoop/etc/hadoop/core-site.xml
ここの設定を変更しないとDocker環境内でスタンドアロンでは動かなかったので設定変更しています。
設定変更の方法としてはHDFSを動くようにするという方式もあるのですが、スタンドアプリケーションなのでここではローカルのファイルを使う方式で対応しています。
Docker Hub
mysql-java-embulk-dockerと同様にspark-sql-scala-dockerもDocker Hubの自動ビルドの設定を行っているので、以下の場所にDockerイメージが自動ビルドされます。
このイメージは「asami/spark-sql-scala-docker」という名前で利用することができます。
サンプル
Dockerイメージ「asami/spark-sql-scala-docker」を利用してテストデータの投入を行うサンプルを作ってみます。
手元の環境上でテスト目的で動作させるためmysql-java-embulk-dockerを併用してテストデータの投入を行っています。
サンプルのコードはGitHubのspark-sql-scala-dockerのsampleディレクトリにあるので、詳細はこちらを参照して下さい。
SimpleApp.scala
サンプルのSparkバッチであるSimpleAppのプログラムは以下になります。
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.sql.{SQLContext, DataFrame}
object SimpleApp extends App {
val batting = SparkSqlUtils.createMysqlDataFrame("Simple Application", "batting")
val count = batting.count()
println(s"count = ${batting.count()}")
}
object SparkSqlUtils {
def createSqlContext(name: String): SQLContext = {
val conf = new SparkConf().setAppName(name)
val sc = new SparkContext(conf)
new SQLContext(sc)
}
def createMysqlDataFrame(name: String, table: String): DataFrame = {
val sqlc = createSqlContext(name)
createMysqlDataFrame(sqlc, table)
}
def createMysqlDataFrame(sqlc: SQLContext, table: String): DataFrame = {
val host = System.getenv("MYSQL_SERVER_HOST")
val port = System.getenv("MYSQL_SERVER_PORT")
val user = System.getenv("MYSQL_SERVER_USER")
val password = System.getenv("MYSQL_SERVER_PASSWORD")
sqlc.load("jdbc", Map(
"url" -> s"jdbc:mysql://$host:$port/baseball?user=$user&password=$password",
"dbtable" -> table
))
}
}SparkSqlUtilsにDataFrame取得処理をまとめています。ここは汎用ライブラリ化できるところです。
この処理を除いた以下の処理がSparkバッチの本体です。
val batting = SparkSqlUtils.createMysqlDataFrame("Simple Application", "batting")
val count = batting.count()
println(s"count = ${batting.count()}")アプリケーションロジック
指定したテーブル"batting"に対応したDataFrameを取得し、countメソッドでレコード総数を取得し、その結果をコンソールに出力しています。とても簡単ですね。
この部分を以下の機能を用いて記述することで高度なバッチ処理を簡単に記述できます。
- DataFrameによる表データ操作
- DataFrameから変換したRDDを用いてSpark計算処理
前述したように「大枠の絞り込みはSQLで行っておいて、アプリケーションに特化した検索ロジックをScalaで記述したUDF(User Defined Function)で補完するといった処理を、プログラミング言語的に簡潔に記述することができます。」
移入・移出
テーブル"batting"をDataFrameとしてローディングしているのは、前述のSpark 1.3の機能追加「DataSourceとしてJDBCが使えるようになった」によるものです。
また、ここでは外部出力をコンソール出力にしていますが、RDDのsaveAsTextFileメソッドやDataFrameを用いることで、S3やデータベースなどに集計結果を簡単に出力することができます。
データベースなどへの外部出力が簡単に行えるのもSpark 1.3の機能追加「DataFrame」の効果です。
ここからも分かるように、Spark SQL 1.3で導入された「DataSourceとしてJDBCが使えるようになった」と「DataFrame」により、Sparkバッチ処理の難題であったデータの移入/移出処理が極めて簡単に記述できるようになったわけです。
SBTの設定
SBTによるScalaプログラムのビルドの設定は以下になります。
Spark本体とSpark SQLを依存ライブラリとして設定している、ごくオーソドックスな設定です。
Sparkバッチ用にすべての依存ライブラリをまとめたJARファイルを作る必要があるので、sbt-assemblyの設定を行ってます。
ポイントとしては、Sparkバッチの実行環境にScalaの基本ライブラリとSpark本体/Spark SQLのライブラリが用意されているので、sbt-assemblyでまとめるJARファイルから排除する設定を行っています。
- Spark本体とSpark SQLの依存ライブラリの設定を"provided"にしてリンク対象から外す
- sbt-assemblyの設定で"includeScala = false"としてScala基本ライブラリをリンク対象から外す
これらの設定はなくても動作しますが、JARファイルが巨大になってしまいます。
name := "simple" version := "1.0" scalaVersion := "2.10.4" libraryDependencies += "org.apache.spark" %% "spark-core" % "1.3.1" % "provided" libraryDependencies += "org.apache.spark" %% "spark-sql" % "1.3.1" % "provided" assemblyOption in assembly := (assemblyOption in assembly).value.copy(includeScala = false)
sbt-assemblyプラグインが必要なのでproject/assembly.sbtに以下の設定をしておきます。
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.13.0")docker-compose.yml
サンプルプログラムのdocker-compose.ymlは以下になります。
spark:
image: asami/spark-sql-scala-docker
links:
- mysql
- redis
volumes:
- target/scala-2.10:/opt/command.d
environment:
COMMAND_JAR_NAME: simple-assembly-1.0.jar
WAIT_CONTAINER_KEY: mysql-java-embulk-docker
MYSQL_SERVER_USER: baseball
MYSQL_SERVER_PASSWORD: baseball
mysql:
image: asami/mysql-java-embulk-docker
links:
- redis
ports:
- ":3306"
volumes:
- setup.d:/opt/setup.d
environment:
MYSQL_USER: baseball
MYSQL_PASSWORD: baseball
MYSQL_ROOT_PASSWORD: baseball
MYSQL_DATABASE: baseball
redis:
image: redis
ports:
- ":6379"自前ではDockerイメージを作らず、以下の3つの汎用Dockerイメージを再利用しています。
- asami/spark-sql-scala-docker
- asami/mysql-java-embulk-docker
- redis
asami/spark-sql-scala-docker
ボリュームと環境変数の記述で、targetscala-2.10simple-assembly-1.0.jarがSparkバッチプログラムとして認識されるようにしています。
simple-assembly-1.0.jarはsbt-assemblyで作成した「全部入り(SparkとScala以外)」のJARファイルです。
それ以外は、mysql-java-embulk-dockerと同期をとるためのおまじないです。
asami/mysql-java-embulk-docker
前回の記事「Docker Composeでデータ投入」と同じ設定です。テスト用のMySQLデータベースにテストデータを投入しています。
Batting.csvは以下のサイトからデータを取得しました。
redis
asami/mysql-java-embulk-dockerによるテストデータ投入の待ち合わせにredisを用いています。
ビルド
Sparkバッチのビルドはsbtで行います。
$ sbt assembly
テスト環境はdocker-composeのbuildコマンドでビルドします。
$ docker-compose build
実行
docker-composeのupコマンドで実行します。
$ docker-compose up
動作過程がコンソールに出力されますが、最後の方で以下のような出力があります。
spark_1 | count = 99846
無事Sparkバッチでデータ集計ができました。
まとめ
Spark SQLを汎用のバッチ処理基盤として運用する目的でSparkバッチをスタンドアロンで実行するためのDockerイメージspark-sql-scala-dockerを作ってみましたが、無事動作しました。
このことによってspark-sql-scala-dockerとmysql-java-embulk-dockerを使って手元で簡単にSparkバッチをテストできるようになりました。
汎用Dockerイメージをdocker-composeで組み合わせるだけなので運用的にも大変、楽だと思います。
今回は試していませんが、spark-sql-scala-dockerを使ってSparkバッチをECS(EC2 Container Service)などのDocker環境上でスタンドアロンバッチとして実行するという運用も可能ではないかと考えています。
もちろん、SparkバッチのJARファイルをspark-submitコマンドによるジョブ投入により直接Sparkクラスタ上で実行することでSpark本来の大規模(データ量/計算量)処理を行うことができます。
いずれの場合も、基本的に開発するのは、Scalaによる通常のSparkバッチプログラムだけです。テストやDocker環境上でのスタンドアロンバッチのいずれも汎用Dockerイメージを活用することで、簡単な設定のみで運用することができそうです。
今回の作業で上記の3つのユースケースを同時に満たせることの目処が立ちました。この成果をベースにSpark SQLを汎用のバッチ処理基盤として利用するためのノウハウの積み上げをしてきたいと思います。
諸元
- Mac OS 10.7.5
- docker 1.6
- docker-compose 1.2.0
- Spark SQL 1.3

0 件のコメント:
コメントを投稿