
業務でPyHiveを使用したのでメモ。(忘れる自信しかない😂 )
環境によっては認証周りが変わりそうです。
⬇️ クリーンアーキテクチャの事が分かりやすく書かれていて、お勧めです🐱
- 作者:Robert C.Martin,角 征典,高木 正弘
- 発売日: 2018/08/01
- メディア: Kindle版
環境
Pythonのバージョンは3.7。
$ pip3 install PyHive==0.6.3 sasl==0.2.1 thrift==0.13.0 thrift-sasl==0.4.2
SASL
Hive
Hiveの高速化はこちらが参考になります。
実装
最近、with ステートメントがスマートでお気に入りです。
configrationはconfに切り出して管理すれば、環境毎に設定を分けたりしやすくなります。
# -*- encoding: utf-8 -*-
import os
import sasl
import thrift
import thrift-sasl
from thrift import transport
from pyhive import hive
class HiveConnector:
def __init__(self, host, port, db, service='hive', auth='GSSAPI', queue='unfunded'):
self.host = host
self.port = int(port)
self.db = db
self.service = service
self.auth = auth
self.queue = queue
self.max_buf_size = 256 * 1024 * 1024
socket = thrift.transport.TSocket.TSocket(self.host, self.port)
thrift_transport = thrift_sasl.TSaslClientTransport(
self._sasl_factory, self.auth, socket)
self.connection = hive.Connection(
thrift_transport=thrift_transport,
database=self.db,
configuration={ // オプションはここで指定、別ファイルに切り出しても良さそう
'tez.queue.name': self.queue,
'hive.exec.scratchdir': '/user/{ユーザー名}/tmp',
'hive.exec.dynamic.partition': 'true',
'hive.exec.dynamic.partition.mode': 'nonstrict',
'hive.optimize.sort.dynamic.partition': 'false',
'hive.vectorized.execution.enabled': 'true',
'hive.vectorized.execution.reduce.enabled': 'true',
'hive.cbo.enabled': 'true',
'hive.compute.query.using.stats': 'true',
'hive.stats.fetch.column.stats': 'true',
'hive.stats.fetch.partition.stats': 'true'
}
)
def _sasl_factory(self):
sasl_client = sasl.Client()
sasl_client.setAttr('host', self.host)
sasl_client.setAttr('service', self.service)
sasl_client.setAttr('maxbufsize', self.max_buf_size)
sasl_client.init()
return sasl_client
def __enter__(self):
return self
def __exit__(self, exc_type, exc_value, traceback):
self.close()
// 更新系
def execute(self, query, param=None):
with self.connection.cursor() as cur:
cur.execute(query, parameters=param)
self.connection.commit()
// 1件取得
def fetchone(self, query, param=None):
with self.connection.cursor() as cur:
cur.execute(query, parameters=param)
return cur.fetchone()
// 全件取得
def fetchall(self, query, param=None):
with self.connection.cursor() as cur:
cur.execute(query, parameters=param)
return cur.fetchall()
// 逐次取得
def fetchmany(self, query, param=None, size=1000):
with self.connection.cursor() as cur:
cur.execute(query, parameters=param)
with True:
rows = cur.fetchmany()
if not rows:
break
for row in rows:
yield row
def close(self):
self.connection.close()
if __name__ == '__main__':
host = 'https://xxxx.xxx'
with HiveConnector(host, 10000, 'test_db') as hc:
query = 'SELECT * FROM access_logs WHERE log_date=%s'
result = hc.fetchall(query, ('20210101',))
print(result)