scala を使ってみたいなということでscalaで書けるsparkを見てみたいと思います
javaが入っていることが前提
今回はopenjdk1.8をインストールしてます
scalaのインストール
$ rpm -ivh http://downloads.typesafe.com/scala/2.11.6/scala-2.11.6.rpm $ scala -version Scala code runner version 2.11.6 -- Copyright 2002-2013, LAMP/EPFL
インストール
- clone
git clone git://github.com/apache/spark.git
- build
$ cd spark $ build/sbt $ build/sbt assembly
spark-shell
- scalaのREPLでsparkの処理を実行できる
試しにいくつか実行してみる
$ ./bin/spark-shell
log4j:WARN No appenders could be found for logger (org.apache.hadoop.metrics2.lib.MutableMetricsFactory).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Using Spark's repl log4j profile: org/apache/spark/log4j-defaults-repl.properties
To adjust logging level use sc.setLogLevel("INFO")
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 1.5.0-SNAPSHOT
/_/
Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_79)
Type in expressions to have them evaluated.
Type :help for more information.
15/06/22 16:08:22 WARN Utils: Your hostname, localhost.localdomain resolves to a loopback address: 127.0.0.1; using 10.0.2.15 instead (on interface eth0)
15/06/22 16:08:22 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Spark context available as sc.
SQL context available as sqlContext.
こんな感じでログがたくさん出てくる
ちなみに「sc」はSparkContextクラスのインスタンス
sample.txtの中の単語をカウントしてみます
- sample.txt
[spark]$ cat sample.txt aaa bbb ccc ddd eee fff aaa ddd ccc aaa aaa aaa bbb ddd aaa ccc
- spark-shell
scala> val textFile = sc.textFile("/home/vagrant/spark/sample.txt")
textFile: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[1] at textFile at <console>:21
# 行数を取得
scala> textFile.count()
res0: Long = 4
# 単語カウントを取得
scala> val wordCount = textFile.flatMap( line => line.split(" ") ).map(word => ( word , 1 )).reduceByKey((a,b) => a + b)
wordCount: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[4] at reduceByKey at <console>:23
# 出力
scala> wordCount.foreach( println )
(bbb,2)
(ddd,3)
(fff,1)
(eee,1)
(ccc,3)
(aaa,6)
Scalaで書くコードとほぼ変わらない感じで書けるとのことで、結構四苦八苦しながら書いてみました
.cacheでキャッシュに乗せることができて繰り返し処理する場合に早くなるよう
下記で1s -> 31msになったのを確認した
# 実ファイル
scala> val file = sc.textFile("/home/vagrant/spark-sample-access/xaa")
file: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[6] at textFile at <console>:21
scala> file.count()
res2: Long = 200000
scala> val filtered = file.filter( line => line.contains("identifier"))
filtered: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[7] at filter at <console>:23
scala> filtered.cache()
res4: filtered.type = MapPartitionsRDD[7] at filter at <console>:23
scala> filtered.count()
res5: Long = 84087
scala> filtered.count()
res6: Long = 84087
spark-ui
spark-shell起動中は4040番ポートでjobの進捗具合などが見れる

どんな処理をしてるか、どのくらいの実行時間だったかなども見れる模様
上記でcacheする前と後で結果が変わってるのを確認できた
今度はjarファイルを実行してみます