こちらの続き。
準備
サンプルデータは iris 。今回は HDFS に csv を置き、そこから読み取って DataFrame を作成する。
# HDFS にディレクトリを作成しファイルを置く $ hadoop fs -mkdir /data/ $ hadoop fs -put iris.csv /data/ $ hadoop fs -ls / Found 1 items drwxr-xr-x - ec2-user supergroup 0 2015-04-28 20:01 /data # Spark のパスに移動 $ echo $SPARK_HOME /usr/local/spark $ cd $SPARK_HOME $ pwd /usr/local/spark $ bin/pyspark
補足 前回同様に pandas から直接 PySpark の DataFrame を作成した場合、groupBy 時に java.lang.OutOfMemoryError: Java heap space エラーが発生してシェルごと落ちる。
CSV ファイルの読み込み
pandas では前回同様 read_csv。
import numpy as np import pandas as pd # 表示する行数を設定 pd.options.display.max_rows=10 names = ['SepalLength', 'SepalWidth', 'PetalLength', 'PetalWidth', 'Species'] # pandas pdf = pd.read_csv('~/iris.csv', header=None, names=names) pdf # 略
PySpark は標準では csv から直接 DataFrame を作成できないため、一度 Row のリストを作成して DataFrame に変換する。
from pyspark.sql import Row lines = sc.textFile("hdfs://127.0.0.1:9000/data/iris.csv") cells = lines.map(lambda l: l.split(",")) rows = cells.map(lambda x: Row(SepalLength=float(x[0]), SepalWidth=float(x[1]), PetalLength=float(x[2]), PetalWidth=float(x[3]), Species=x[4])) sdf = sqlContext.createDataFrame(rows) sdf.show() # 略
グルーピング/集約
ある列の値ごとに集計
pandas, PySpark で多少 文法は異なる。
列の値でグループ分けし、一列の合計を取得する場合:
# pandas pdf.groupby('Species')['SepalLength'].sum() # Species # setosa 250.3 # versicolor 296.8 # virginica 329.4 # Name: SepalLength, dtype: float64 # PySpark sdf.groupBy('Species').sum('SepalLength').show() # Species SUM(SepalLength) # virginica 329.3999999999999 # versicolor 296.8 # setosa 250.29999999999998
指定した複数列の合計を取得する場合:
# pandas pdf.groupby('Species')[['PetalWidth', 'PetalLength']].sum() # PetalWidth PetalLength # Species # setosa 12.2 73.2 # versicolor 66.3 213.0 # virginica 101.3 277.6 # PySpark sdf.groupBy('Species').sum('PetalWidth', 'PetalLength').show() # Species SUM(PetalWidth) SUM(PetalLength) # virginica 101.29999999999998 277.59999999999997 # versicolor 66.30000000000001 213.0 # setosa 12.199999999999996 73.2
全列の合計を取得する場合:
# pandas pdf.groupby('Species').sum() # SepalLength SepalWidth PetalLength PetalWidth # Species # setosa 250.3 170.9 73.2 12.2 # versicolor 296.8 138.5 213.0 66.3 # virginica 329.4 148.7 277.6 101.3 # PySpark sdf.groupBy('Species').sum().show() # Species SUM(PetalLength) SUM(PetalWidth) SUM(SepalLength) SUM(SepalWidth) # virginica 277.59999999999997 101.29999999999998 329.3999999999999 148.7 # versicolor 213.0 66.30000000000001 296.8 138.5 # setosa 73.2 12.199999999999996 250.29999999999998 170.90000000000003
補足 pandas では グループ化したデータも DataFrame と同じようにスライシングできたりする。
一方、PySpark の GroupedData は集約系のAPI しか持っていない。
# pandas pdf.groupby('Species')['PetalWidth'] # <pandas.core.groupby.SeriesGroupBy object at 0x7f62f4218d50> # PySpark (NG!) sdf.groupBy('Species')[['Species']] # TypeError: 'GroupedData' object has no attribute '__getitem__' sdf.groupBy('Species').select('PetalWidth') # AttributeError: 'GroupedData' object has no attribute 'select'
また、pandas では apply で自作の集約関数 (UDAF) を利用することができるが、PySpark 1.3.1 時点 では非対応らしい。PySpark の udf を利用して定義した自作関数を集約時に使うと以下のエラーになる。
# pandas pdf.groupby('Species')[['PetalWidth', 'PetalLength']].apply(np.sum) # PetalWidth PetalLength # Species # setosa 12.2 73.2 # versicolor 66.3 213.0 # virginica 101.3 277.6 # PySpark (NG!) import pyspark.sql.functions np_sum = pyspark.sql.functions.udf(np.sum, pyspark.sql.types.FloatType()) sdf.groupBy('Species').agg(np_sum(sdf.PetalWidth)) # py4j.protocol.Py4JJavaError: An error occurred while calling o334.agg. # : org.apache.spark.sql.AnalysisException: expression 'pythonUDF' is neither present in the group by, nor is it an aggregate function. Add to group by or wrap in first() if you don't care which value you get.;
行持ち / 列持ち変換
複数列持ちの値を行持ちに展開 (unpivot / melt)
pandas では pd.melt。 DataFrame.melt ではないので注意。
# pandas pmelted = pd.melt(pdf, id_vars=['Species'], var_name='variable', value_name='value') pmelted # Species variable value # 0 setosa SepalLength 5.1 # 1 setosa SepalLength 4.9 # 2 setosa SepalLength 4.7 # 3 setosa SepalLength 4.6 # 4 setosa SepalLength 5.0 # .. ... ... ... # 595 virginica PetalWidth 2.3 # 596 virginica PetalWidth 1.9 # 597 virginica PetalWidth 2.0 # 598 virginica PetalWidth 2.3 # 599 virginica PetalWidth 1.8 # # [600 rows x 3 columns]
同様の処理を PySpark でやるには、DataFrame.flatMap。1行の入力に対して複数行 (この例では4行) のデータを返すことができる。fratMap の返り値は RDD インスタンスになるため、必要なら再度 DataFrame 化する。
# PySpark def mapper(row): return [Row(Species=row[4], variable='PetalLength', value=row[0]), Row(Species=row[4], variable='PetalWidth', value=row[1]), Row(Species=row[4], variable='SepalLength', value=row[2]), Row(Species=row[4], variable='SepalWidth', value=row[3])] smelted = sqlContext.createDataFrame(sdf.flatMap(mapper)) smelted.show() # Species value variable # setosa 1.4 PetalLength # setosa 0.2 PetalWidth # setosa 5.1 SepalLength # setosa 3.5 SepalWidth # ... .. ... # setosa 1.4 PetalLength # setosa 0.2 PetalWidth # setosa 5.0 SepalLength # setosa 3.6 SepalWidth smelted.count() # 600L
複数行持ちの値を列持ちに変換 (pivot)
pandas では DataFrame.pivot。pivotするデータは列にする値 (以下では Species ) と行にする値 (以下では variable ) の組がユニークになっている必要がある。そのため、まず pivot 用データを作成 -> その後 pivot する。
# pandas # pivot 用データを作成 punpivot = pmelted.groupby(['Species', 'variable']).sum() punpivot = punpivot.reset_index() punpivot # Species variable value # 0 setosa PetalLength 73.2 # 1 setosa PetalWidth 12.2 # 2 setosa SepalLength 250.3 # 3 setosa SepalWidth 170.9 # 4 versicolor PetalLength 213.0 # .. ... ... ... # 7 versicolor SepalWidth 138.5 # 8 virginica PetalLength 277.6 # 9 virginica PetalWidth 101.3 # 10 virginica SepalLength 329.4 # 11 virginica SepalWidth 148.7 # # [12 rows x 3 columns] # pivot punpivot.pivot(index='variable', columns='Species', values='value') # Species setosa versicolor virginica # variable # PetalLength 73.2 213.0 277.6 # PetalWidth 12.2 66.3 101.3 # SepalLength 250.3 296.8 329.4 # SepalWidth 170.9 138.5 148.7
PySpark の DataFrame のままでは同じ処理はできないようなので、一度 RDD に変換してから、 groupBy -> map
# PySpark # pivot 用データを作成 sunpivot = smelted.groupBy('Species', 'variable').sum() sunpivot.show() # Species variable SUM(value) # versicolor SepalWidth 138.5 # versicolor SepalLength 296.8 # setosa PetalLength 73.2 # virginica PetalWidth 101.29999999999998 # versicolor PetalWidth 66.30000000000001 # setosa SepalWidth 170.90000000000003 # virginica PetalLength 277.59999999999997 # setosa SepalLength 250.29999999999998 # versicolor PetalLength 213.0 # setosa PetalWidth 12.199999999999996 # virginica SepalWidth 148.7 # virginica SepalLength 329.3999999999999 def reducer(obj): # variable : value の辞書を作成 result = {o[1]:o[2] for o in obj[1]} return Row(Species=obj[0], **result) # pivot spivot = sunpivot.rdd.groupBy(lambda x: x[0]).map(reducer) spivot.collect() # [Row(PetalLength=277.59999999999997, PetalWidth=101.29999999999998, SepalLength=329.3999999999999, SepalWidth=148.7, Species=u'virginica'), # Row(PetalLength=73.2, PetalWidth=12.199999999999996, SepalLength=250.29999999999998, SepalWidth=170.90000000000003, Species=u'setosa'), # Row(PetalLength=213.0, PetalWidth=66.30000000000001, SepalLength=296.8, SepalWidth=138.5, Species=u'versicolor')] sqlContext.createDataFrame(spivot).show() # PetalLength PetalWidth SepalLength SepalWidth Species # 277.59999999999997 101.29999999999998 329.3999999999999 148.7 virginica # 73.2 12.199999999999996 250.29999999999998 170.90000000000003 setosa # 213.0 66.30000000000001 296.8 138.5 versicolor
列の分割 / 結合
列の値を複数列に分割
ある列の値を適当に文字列処理して、新しい列を作成したい。pandas には 文字列処理用のアクセサがあるため、 assign と組み合わせて以下のように書ける。
# pandas psplitted = pmelted.assign(Parts=pmelted['variable'].str.slice(0, 5), Scale=pmelted['variable'].str.slice(5)) psplitted # Species variable value Parts Scale # 0 setosa SepalLength 5.1 Sepal Length # 1 setosa SepalLength 4.9 Sepal Length # 2 setosa SepalLength 4.7 Sepal Length # 3 setosa SepalLength 4.6 Sepal Length # 4 setosa SepalLength 5.0 Sepal Length # .. ... ... ... ... ... # 595 virginica PetalWidth 2.3 Petal Width # 596 virginica PetalWidth 1.9 Petal Width # 597 virginica PetalWidth 2.0 Petal Width # 598 virginica PetalWidth 2.3 Petal Width # 599 virginica PetalWidth 1.8 Petal Width # # [600 rows x 5 columns]
PySparkには上記のようなメソッドはないので map で処理する。
# PySpark def splitter(row): parts = row[2][:5] scale = row[2][5:] return Row(Species=row[0], value=row[1], Parts=parts, Scale=scale) ssplitted = sqlContext.createDataFrame(smelted.map(splitter)) ssplitted.show() # Parts Scale Species value # Petal Length setosa 1.4 # Petal Width setosa 0.2 # Sepal Length setosa 5.1 # Sepal Width setosa 3.5 # Petal Length setosa 1.4 # .. .. ... .. # Petal Length setosa 1.4 # Petal Width setosa 0.2 # Sepal Length setosa 5.0 # Sepal Width setosa 3.6
複数列の値を一列に結合
pandas では普通に文字列結合すればよい。
# pandas psplitted['variable2'] = psplitted['Parts'] + psplitted['Scale'] psplitted # Species variable value Parts Scale variable2 # 0 setosa SepalLength 5.1 Sepal Length SepalLength # 1 setosa SepalLength 4.9 Sepal Length SepalLength # 2 setosa SepalLength 4.7 Sepal Length SepalLength # 3 setosa SepalLength 4.6 Sepal Length SepalLength # 4 setosa SepalLength 5.0 Sepal Length SepalLength # .. ... ... ... ... ... ... # 595 virginica PetalWidth 2.3 Petal Width PetalWidth # 596 virginica PetalWidth 1.9 Petal Width PetalWidth # 597 virginica PetalWidth 2.0 Petal Width PetalWidth # 598 virginica PetalWidth 2.3 Petal Width PetalWidth # 599 virginica PetalWidth 1.8 Petal Width PetalWidth # # [600 rows x 6 columns]
PySpark では map。
# PySpark def unite(row): return Row(Species=row[2], value=row[3], variable=row[0] + row[1]) sqlContext.createDataFrame(splitted.map(unite)).show() # Species value variable # setosa 1.4 PetalLength # setosa 0.2 PetalWidth # setosa 5.1 SepalLength # setosa 3.5 SepalWidth # .. .. .. # setosa 1.4 PetalLength # setosa 0.2 PetalWidth # setosa 5.0 SepalLength # setosa 3.6 SepalWidth
補足 withColumn の場合、オペレータは 数値の演算として扱われてしまうようなのでここでは使えない。
# PySpark (NG!) ssplitted.withColumn('variable', splitted.Parts + splitted.Scale).show() # Parts Scale Species value variable # Petal Length setosa 1.4 null # Petal Width setosa 0.2 null # .. .. .. .. ..
まとめ
PySpark と pandas のデータ集約/変形処理を整理した。
データ分析用途で利用したい場合、(ごく当たり前だが) データ量が少なく手元でさっといろいろ試したい場合は pandas、データ量が比較的多く 単純な処理を全体にかけたい場合は Spark がよい。
Spark は map 系の API が充実するとさらに使いやすくなりそうだ。が、小回りの効く文法/機能が充実していくことは考えにくいので 完全に Spark だけでデータ分析をする、、という状態には将来もならないのではないかと思う。小さいデータは pandas 使いましょう。
- 作者: Holden Karau,Andy Konwinski,Patrick Wendell,Matei Zaharia
- 出版社/メーカー: O'Reilly Media
- 発売日: 2015/01/28
- メディア: Kindle版
- この商品を含むブログを見る