以下の内容はhttps://wyukawa.hatenablog.com/entry/20120403/1333451617より取得しました。


Multitable insertとhive.exec.parallel

またHiveのチューニングネタです。

以下のような単一のテーブルを入力として集計キーごとにそれぞれ個別に集計したいというケースがあります。

カラム1 カラム2 カラム3 ... 集計キー1 集計キー2 集計キー3 ...

ええと具体例が無いとわからないですよね(汗

ここで例としてまたまた郵便番号データを使ってみます。

テーブル定義はこんな感じ。

CREATE TABLE IF NOT EXISTS postdata (
    chihou_code string,
    post_code string,
    choiki_name string,
    choson_name string,
    todoufuken_name string,
    flag1 int,
    flag2 int,
    flag3 int,
    flag4 int,
    flag5 int,
    flag6 int
)

postdataテーブルを入力としてchoiki_name、choson_name、todoufuken_nameのそれぞれで個別集計したいというようなケースです。

こんな感じですね。

select choiki_name, count(1) from postdata group by choiki_name
select choson_name, count(1) from postdata group by choson_name
select todoufuken_name, count(1) from postdata group by todoufuken_name

こういうケースではMultitable insertを使うとテーブルのスキャンが1回で済むので性能があがります。
https://cwiki.apache.org/Hive/gettingstarted.html#GettingStarted-MULTITABLEINSERT

こんな感じに書きます。

from
 (
  select
    choiki_name,
    choson_name,
    todoufuken_name
  from
    postdata
 )tab

insert overwrite table choiki_name_table
select
  choiki_name,
  count(1)
group by choiki_name

insert overwrite table choson_name_table
select
  choson_name,
  count(1)
group by choson_name

insert overwrite table todoufuken_name_table
select
  todoufuken_name,
  count(1)
group by todoufuken_name

この辺は象本にも書いてあります。ただinsert先のテーブル数が多いとOutOfMemoryErrorが出ることもあるようです。

これに加えてhive.exec.parallelをtrueにすると性能アップにつながります。

<property>
  <name>hive.exec.parallel</name>
  <value>false</value>
  <description>Whether to execute jobs in parallel</description>
</property>

これはジョブを並行に実行するというものです。

上記のMultitable insertのHiveQLを実行するとジョブ数は3ですが、hive.exec.parallelをtrueにすると2番目と3番目のジョブを並列に実行します。

スレッドを使ってジョブの終了を待たずに次のジョブを実行しています。実行例は以下のような感じ。Stage-4とStage-5を並列に実行しています。

2012-04-03 19:34:09,346 Stage-4 map = 0%,  reduce = 0%
2012-04-03 19:34:16,576 Stage-5 map = 0%,  reduce = 0%
2012-04-03 19:34:47,630 Stage-4 map = 100%,  reduce = 0%
2012-04-03 19:34:57,716 Stage-5 map = 100%,  reduce = 0%
2012-04-03 19:35:40,147 Stage-4 map = 100%,  reduce = 67%
2012-04-03 19:35:46,430 Stage-4 map = 100%,  reduce = 100%
2012-04-03 19:35:46,443 Stage-5 map = 100%,  reduce = 100%

なお実行計画は下記のようになっています。Stage-4とStage-5はどちらもStage-3に依存していますが、お互いに依存性は無いので並列に実行できるわけですね。

STAGE DEPENDENCIES:
  Stage-3 is a root stage
  Stage-0 depends on stages: Stage-3
  Stage-4 depends on stages: Stage-3
  Stage-1 depends on stages: Stage-4
  Stage-5 depends on stages: Stage-3
  Stage-2 depends on stages: Stage-5

ただスレーブノード数が少ないと下記のようなエラーが出ることがあります。出た場合は並列実行を諦めましょう。

java.io.IOException: java.lang.InterruptedException
        at org.apache.hadoop.ipc.Client.call(Client.java:1083)
        at org.apache.hadoop.ipc.RPC$Invoker.invoke(RPC.java:226)
        at org.apache.hadoop.mapred.$Proxy8.getStagingAreaDir(Unknown Source)
        at org.apache.hadoop.mapred.JobClient.getStagingAreaDir(JobClient.java:1198)
        at org.apache.hadoop.mapreduce.JobSubmissionFiles.getStagingDir(JobSubmissionFiles.java:102)
        at org.apache.hadoop.mapred.JobClient$2.run(JobClient.java:839)
        at org.apache.hadoop.mapred.JobClient$2.run(JobClient.java:833)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:396)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1115)
        at org.apache.hadoop.mapred.JobClient.submitJobInternal(JobClient.java:833)
        at org.apache.hadoop.mapred.JobClient.submitJob(JobClient.java:807)
        at org.apache.hadoop.hive.ql.exec.ExecDriver.execute(ExecDriver.java:657)
        at org.apache.hadoop.hive.ql.exec.MapRedTask.execute(MapRedTask.java:123)
        at org.apache.hadoop.hive.ql.exec.Task.executeTask(Task.java:130)
        at org.apache.hadoop.hive.ql.exec.TaskRunner.runSequential(TaskRunner.java:57)
        at org.apache.hadoop.hive.ql.exec.TaskRunner.run(TaskRunner.java:47)
Caused by: java.lang.InterruptedException
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1215)
        at java.util.concurrent.FutureTask$Sync.innerGet(FutureTask.java:218)
        at java.util.concurrent.FutureTask.get(FutureTask.java:83)
        at org.apache.hadoop.ipc.Client$Connection.sendParam(Client.java:787)
        at org.apache.hadoop.ipc.Client.call(Client.java:1077)
        ... 16 more

ここで使ったクエリ類は下記に置きました。

https://github.com/wyukawa/hive-sandbox/tree/master/parallel

いじょ




以上の内容はhttps://wyukawa.hatenablog.com/entry/20120403/1333451617より取得しました。
このページはhttp://font.textar.tv/のウェブフォントを使用してます

不具合報告/要望等はこちらへお願いします。
モバイルやる夫Viewer Ver0.14