チューニングや導入の苦労も想像せずに,SparkがあればMapReduce1は過去のものだと思っている人もいそうですが...
特にSpark SQLは実装がまだ甘いこともあって,非常に単純なクエリだとHiveにあっさり負けます.理由のうちの一つは,Hive SQL on Sparkということで,SparkがHive側に寄ったこともあるのですが...
単純なクエリ.Word Countよりも単純な,タダのCOUNT(*).
Hiveのデータベースには,400Gバイトほどのデータがあるとします.
>hdfs dfs -du -h /user/hive/warehouse/metameta.db/ 405.3 G /user/hive/warehouse/metameta.db/user_log
ただし,Hiveの常套手段として,日別にパーティションが切られていて,これからクエリを投げる対象は60Gバイトくらいな感じです.
> hdfs dfs -du /user/hive/warehouse/metameta.db/user_log|grep 2014-02 | awk '{sum+=$1}END{print sum/1024/1024/1024" G"}'
60.8193 GあるIntのカラムの総和を計算してみます.Hiveで.
> time hive --database=metameta -e "select count(*) from user_log where (dt >= '2014-02-01' and dt <= '2014-02-28') and tcnt>10" MapReduce Total cumulative CPU time: 0 days 3 hours 4 minutes 15 seconds 540 msec Ended Job = job_1401264313901_26799 MapReduce Jobs Launched: Job 0: Map: 1116 Reduce: 1 Cumulative CPU: 11055.54 sec HDFS Read: 65304557940 HDFS Write: 7 SUCCESS Total MapReduce CPU Time Spent: 0 days 3 hours 4 minutes 15 seconds 540 msec OK _c0 991398 Time taken: 178.672 seconds real 3m4.654s user 0m33.209s sys 0m0.831s
3分位で一ヶ月の集計が終わりです.
同じことをSparkで,Hive on Sparkで.
> ./bin/spark-shell --master yarn-client --driver-class-path /usr/local/hive/lib/mysql-connector-java.jar --executor-cores 8 --num-executors 12
scala> val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
hiveContext: org.apache.spark.sql.hive.HiveContext = org.apache.spark.sql.hive.HiveContext@158f8cfe
scala> import hiveContext._
import hiveContext._
scala> val res = hql("select count(*) from metameta.user_log where (dt >= '2014-02-01' and dt <= '2014-02-28') and tcnt>10")
res: org.apache.spark.sql.SchemaRDD =
SchemaRDD[0] at RDD at SchemaRDD.scala:98
== Query Plan ==
Aggregate false, [], [SUM(PartialCount#17L) AS c_0#0L]
Exchange SinglePartition
Aggregate true, [], [COUNT(1) AS PartialCount#17L]
Project []
Filter (tt#12:0 > 10)
HiveTableScan [tsumcnt#12], (MetastoreRelation metameta, user_log, None), Some(((dt#2 >= 2014-02-01) && (dt#2 <= 2014-02-28)))
scala> res.map(t=>t(0)).collect().foreach(println)
14/07/10 16:36:30 INFO spark.SparkContext: Job finished: collect at <console>:20, took 1274.304651956 s
991398集計結果は等しく991398ですが,Hiveだと3分,Hive on Sparkだと20分以上かかりました.
Sparkの方が秒にして一桁遅い.約180秒のHiveと約1300秒のSpark.
もちろんこれは意地悪な例で,例えば,結果が複数出るクエリを投げる.これもシンプル過ぎる例ですが.
scala> val res = hql("select sum(tcnt),sum(addtcnt) from metameta.user_log where (dt >= '2014-02-01' and dt <= '2014-02-28') and tcnt>10")
(中略)
14/07/10 21:11:19 INFO spark.SparkContext: Job finished: collect at <console>:20, took 1189.674418821 s
count: 13693172,60772また20分近くかかってしまいましたが...
そしてこの結果から,さらに別な統計量を求める.
14/07/10 21:11:19 INFO spark.SparkContext: Job finished: collect at <console>:20, took 1189.674418821 s count: 13693172,60772 scala> res.map(t=>"average: " + ((t(0).asInstanceOf[Long] + t(1).asInstanceOf[Long])/2)).collect().foreach(println) 14/07/10 21:17:33 INFO spark.SparkContext: Job finished: collect at <console>:20, took 1.613828991 s average: 6876972
統計量といっても,2量の相加平均でしたが...結果がキャッシュされているので,1.6秒で出ます.
この場合は2数の値が出ているのだからSparkじゃなくても自分で計算すれば良いとかいろいろあるかと思いますが,ここは例えばScala shellを使うのだったら,Scalaのプログラムを書けばどんな計算でもできるので,例が悪いということでお許し下さい.
だいたい,Hiveで長い時間かけて計算させて,「クエリ間違った!」とか「一つ値求めるの忘れた!」とか,何度あったことか...Hiveなら最初からやり直しですが,Hive on Sparkなら,終わってからガッカリせずに,その後1秒で忘れたものを取り出すことが可能かもしれません(上記の例は,実体験に基づく...).
何より,既存のHadoopなりYARNなりHDFSなりHiveなりと完全共存できるのが素晴らしいです.なので導入が手軽です.
Spark SQL with Hadoop Hiveにハマる - なぜか数学者にはワイン好きが多い
(↑まぁ,手軽とはいえ悩んだのですが...)