Python实战hive

2022/11/09 Python Hive

Python实战Hive

PyHive 是 Python 语言编写的用于操作 Hive 的简便工具库。

PyHive安装

# Liunx系统
pip install sasl
pip install thrift
pip install thrift-sasl
pip install PyHive

# Windows系统会出现莫名其妙的报错,sasl需要选择对应的版本号
pip install sasl‑0.3.1‑cp310‑cp310‑win_amd64.whl
或者下载到本地手动导入包
pip install D:\sasl-0.3.1-cp310-cp310-win_amd64.whl

优化后导入如下:
pip install sasl-0.2.1-cp36-cp36m-win_amd64.whl
pip install thrift -i https://pypi.tuna.tsinghua.edu.cn/simple
pip install thrift_sasl==0.3.0 -i https://pypi.tuna.tsinghua.edu.cn/simple
pip install pyhive -i https://pypi.tuna.tsinghua.edu.cn/simple

下载一个对应你所使用的Python版本和Windows版本的sasl文件:https://www.lfd.uci.edu/~gohlke/pythonlibs/#sasl。 例如, sasl‑0.2.1‑cp36‑cp36m‑win_amd64.whl,它对应的Python 版本为3.6,对应的Windows系统为64位。 安装执行pip install sasl-0.2.1-cp37-cp37m-win_amd64.whl

简介

pyhive通过与HiveServer2通讯来操作Hive数据。当hiveserver2服务启动后,会开启10000的端口,对外提供服务,此时pyhive客户端通过JDBC连接hiveserver2进行Hive sql操作。

Pyhive Client通过JDBC与hiveserver2建立通信,hiveserver2服务端发送HQL语句到Driver端,Driver端将HQL发送至Compiler组件进行语法树解析,此时需在metastore获取HQL相关的database和table等信息,在对HQL完成解析后,Compiler组件发送执行计划至Driver端等待处理,Driver端发送执行计划至Executor端,再由Executor端发送MapReduce任务至Hadoop集群执行Job,Job完成后最终将HQL查询数据发送Driver端,再由hive server2返回数据至pyhive Client。

访问

PyHive 连接 Hive 一般流程:

  • 创建连接
  • 获取游标
  • 执行SQL语句
  • 获取结果
  • 关闭连接
# 加载包
from pyhive import hive

# 建立连接
conn = hive.connect(host = '10.8.1.2',      # 主机
                    port = 10000,                  # 端口 
                    username = 'hdfs',                  # 用户
                    )
                    
# 查询
cursor = conn.cursor()  # 获取一个游标
cursor.execute('show databases')  # 执行sql语句
# sql = 'show tables'  # 操作语句
for result in cursor.fetchall():  # 输出获取结果的所有行
    print(result)
 
# 关闭连接
cursor.close()
conn.close()

其中,cursor.fetchall() 返回的是一个 list 对象,并且每一个元素都是一个 tuple 对象。 需要对其进行一定的处理,才能转换为建模需要的 DataFrame。

函数封装

# 函数封装
def get_data(params, sql_text, is_all_col=1):
    '''
    is_all_col: 是否选取所有的列 select *
    '''
    # 建立连接
    con = hive.connect(host = params.get('ip'),
                       port = params.get('port'),
                       auth = params.get('auth'),
                       kerberos_service_name = params.get('kerberos_service_name'),
                       database = params.get('database'),
                       password = params.get('password'))
    cursor = con.cursor()
    cursor.execute(sql_text)
    # 列名
    col_tmp = cursor.description
    col = list()
    if is_all_col == 1:
        for i in range(len(col_tmp)):
            col.append(col_tmp[i][0].split('.')[1])
    else:
        for i in range(len(col_tmp)):
            col.append(col_tmp[i][0])
    # 数据
    data = cursor.fetchall()
    result = pd.DataFrame(list(data), columns=col)
    # 关闭连接 释放资源
    cursor.close()
    con.close()
    
    return result

if __name__ == '__main__':

    import pandas as pd
    import numpy as np
    
    params = {
        'ip': 'xxx.xxx.xxx.xxx',
        'port': '1000',
        'auth': 'hider',
        'kerberos_service_name': 'hive',
        'database': 'hive',
        'password': '100',
    }
    
    sql_text1 = 'select * from table limit 5'
    sql_text2 = 'select a, b, c from table limit 5'
    
    # 所有列
    data1 = get_data(params, sql_text1, is_all_col=1)
    # 指定列
    data2 = get_data(params, sql_text2, is_all_col=0)

Hive配置

Hive 有许多必要的参数设置,通过 Connection 类的 configuration 参数可进行配置。

hive_config = {
    'mapreduce.job.queuename': 'my_hive',
    'hive.exec.compress.output': 'false',
    'hive.exec.compress.intermediate': 'true',
    'mapred.min.split.size.per.node': '1',
    'mapred.min.split.size.per.rack': '1',
    'hive.map.aggr': 'true',
    'hive.groupby.skewindata': 'true',
    'hive.exec.dynamic.partition.mode': 'nonstrict'
}

conn = hive.connect(host = '',
                    port = '',
                    ...
                    configuration = hive_config)

列名

Cursor 类中有一个 description 方法,可以获取数据表中的列名、数据类型等信息。

col = cursor.description
col_names = list()
for column in col:
	col_names.append(column[0]) # 提取第一个元素:列名

执行脚本带参数

游标所执行的脚本可以不写死,通过参数的方式进行配置。

month = 202205

# %占位符
sql_text = 'select * from table where month = %d limit 5' % month
print(sql_text)

# format
sql_text = 'select * from table where month = {} limit 5'.format(month)

# f-string
sql_text = f'select * from table where month = {month} limit 5'
print(sql_text)

Search

    微信好友

    博士的沙漏

    Table of Contents