ファイルからの実行
前回はspark-shellをとりあえず使ってみたので今回はそれをまとめてjarファイルから実行したいと思います
上記参考に一連の流れをまとめてプロジェクトにして実行してみます
レスポンスコードのカウント
- sample.scala
package sample
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
object Sample {
def main(args: Array[String]) {
val logFile = "/var/log/nginx/access.log-20150627"
val conf = new SparkConf().setAppName("sample")
val sc = new SparkContext(conf)
val logData = sc.textFile(logFile, 2).cache()
val numAs = logData.filter( line => line.contains(" 200 ")).count()
val numBs = logData.filter( line => line.contains(" 304 ")).count()
val total = logData.count()
println("total lines: %s".format(logData.count()))
println("Lines with 200: %s, Lines with 304: %s".format(numAs, numBs))
}
}
- sample.sbt
name := "sampleProject" version := "1.0" scalaVersion := "2.10.4" libraryDependencies += "org.apache.spark" %% "spark-core" % "1.4.0"
ファイル構成は以下
[spark-sample]$ find . . ./sample.sbt ./src ./src/main ./src/main/scala ./src/main/scala/sample.scala [spark-sample]$ sbt package
target以下にjarファイルが生成されるのでspark-submitで実行してみる
[spark-sample]$ SPARK_HOME_DIR/bin/spark-submit target/scala-2.10/sampleproject_2.10-1.0.jar ..... ..... total lines: 729 Lines with 200: 42, Lines with 304: 687 ..... .....
ログが沢山流れるがprintした箇所に関しても出力されていることが確認できた
アクセスログの特定パラメータをカウントしてみる
では、もう少し実用よりの事をやってみます
fluentdで取得したアクセスログに関してURLの特定のパラメータの値がどの程度あるのかカウントしてみる
ディレクトリ構成は一度目と同様
ログの書式は以下(公開にあたり文字列を改変している箇所があります)
2015-06-28 00:06:33 app.balancer.ip-10-0-2-1.access_log {"host":"111.11.111.111","user":"-","method":"GET","path":"/z?app=1.9.1","code":200,"size":5,"referer":"-","agent":"piggy/1.9.1 CFNetwork/711.1.16 Darwin/14.0.0","request_time":0.012,"forwarded_for":"111.11.111.111","cookie":"-"}
2015-06-28 00:06:33 app.balancer.ip-10-0-2-1.access_log {"host":"22.22.22.222","user":"-","method":"GET","path":"/i?app=1.1.6&identifier=B4C97925-FB29-4F0E-ADA9-713C4722B9BF&m_id=88&system=8.3","code":200,"size":20770,"referer":"-","agent":"Mozilla/5.0 (iPhone; CPU iPhone OS 8_3 like Mac OS X) AppleWebKit/600.1.4 (KHTML, like Gecko) Mobile/12F70","request_time":0.034,"forwarded_for":"22.22.22.222","cookie":"-"}
- sample.sbt
name := "sampleProject" version := "1.0" scalaVersion := "2.10.4" libraryDependencies += "org.apache.spark" %% "spark-core" % "1.4.0"
- samole.scala
package sample
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import scala.util.parsing.combinator.RegexParsers
import scala.util.parsing.json.JSON
object Sample {
val urlPattern = "identifier=(.*?)&".r
def parseParam(path: String): String = {
val m = urlPattern.findFirstMatchIn(path)
if ( m.isDefined ) return m.get.group(1)
return "-"
}
def main(args: Array[String]) {
val logFile = "/home/vagrant/spark/xaaaa"
//sparkの設定
val conf = new SparkConf().setAppName("sample").set("spark.executer.memory", "2g").set("spark.executer.extraClassPath", "/home/vagrant/spark/lib_managed/jars")
val sc = new SparkContext(conf)
val logData = sc.textFile(logFile, 2).cache()
// 日付 時間\t tag名\tjsondata
// jsonデータの部分だけ抜き出す
val jsonDatas = logData.map(line => line.split("\t") ).map( row => row.splitAt(2)._2 ).map( line => line.mkString(" ") )
// jsonへの変換
// path部分だけ抜き出し
val dummyMap:Map[String,String] = Map( "notJson" -> "-" )
val paths = for {
jsonraw <- jsonDatas
jsonobj <- JSON.parseFull(jsonraw).getOrElse( dummyMap ).asInstanceOf[Map[String,String]]
if jsonobj._1 == "path"
} yield jsonobj
val countByIdentifier = paths.map( path => parseParam(path._2)).countByValue().toSeq.sortBy(_._2)
countByIdentifier.foreach( println )
}
}
結果
..... ..... ..... ..... (1323A2FB-9624-4E5D-A334-15F06A29C6B3,98) (D038639F-6F67-42C2-B524-8DB5593CA2E5,109) (18907EA6-57F1-4985-ADC2-89A654FB46C3,194) (85231974-08D8-44F5-8F95-BB176B0693D1,279) (,363) (-,73841)
一応成功!
elasticsearchに突っ込んでみるところまでやる予定だったけど、コンパイルまでできるが実行時に出る「java.lang.NoClassDefFoundError」が解決できず。。。
次はエラー含めウィンドウ集計、elasticsearchとの連携、apache zeppelin、クラスタ構成、あたりまでできたらと思います