Spark 从Hive中读取数据
在默认情况下,Hive使用MapReduce来对数据进行操作和运算,即将HQL语句翻译成MapReduce作业执行。而MapReduce的执行速度是比较慢的,一种改进方案就是使用Spark来进行数据的查找和运算。Hive和Spark的结合使用有两种方式,一种称为Hive on Spark:即将Hive底层的运算引擎由MapReduce切换为Spark,官方文档在这里:Hive on Spark: Getting Started。还有一种方式,可以称之为Spark on Hive:即使用Hive作为Spark的数据源,用Spark来读取HIVE的表数据(数据仍存储在HDFS上)。
因为Spark是一个更为通用的计算引擎,以后还会有更深度的使用(比如使用Spark streaming来进行实时运算),因此,我选用了Spark on Hive这种解决方案,将Hive仅作为管理结构化数据的工具。本文是Spark的配置过程。
配置spark
拷贝hive-site.xml至$SPARK_HOME下,然后再其中添加下面的语句:
<property> <name>hive.metastore.uris</name> <value>thrift://192.168.1.56:9083</value> </property>
这里192.168.1.56是Hive的元数据服务的地址,9083是默认的端口号。通过这里的配置,让Spark与Hive的元数据库建立起联系,Spark就可以获得Hive中有哪些库、表、分区、字段等信息。
配置Hive的元数据,可以参考 配置Hive使用MySql记录元数据。
确认Hive元数据服务已经运行
Hive的元数据服务是单独启动的,可以通过下面两种方式验证其是否启动:
# ps aux | grep hive-metastore root 10516 3.0 5.7 2040832 223484 pts/4 Sl+ 14:52 0:11 /opt/jdk/jdk1.8.0_171/jre/bin/java -Xmx256m -Djava.library.path=/opt/hadoop/hadoop-2.9.1/lib/native -Djava.net.preferIPv4Stack=true -Dhadoop.log.dir=/opt/hadoop/hadoop-2.9.1/logs ...
也可以使用下面的语句,以验证端口的方式来确认服务是否启动:
# lsof -i:9083 COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME java 10516 root 509u IPv4 6889656 0t0 TCP *:emc-pp-mgmtsvc (LISTEN)
如果metastore服务没有启动,可以使用下面的命令启动之:
# hive --service metastore 2018-07-25 14:52:27: Starting Hive Metastore Server
编写python脚本,访问Hive仓库
配置完成后,就可以编写python脚本来对数据进行查询和运算了:
from pyspark.sql import SparkSession from pyspark.sql import HiveContext spark = SparkSession.builder.master("local")\ .appName("SparkOnHive")\ .config("spark.sql.warehouse.dir", "/user/hive/warehouse")\ .enableHiveSupport()\ .getOrCreate() hiveCtx = HiveContext(spark) df = hiveCtx.sql("select * from tglog_aw_2018.golds_log limit 10") rows = df.collect() for row in rows: print " ".join([str(row[0]), row[1].encode('utf-8'), row[2].encode('utf-8'), str(row[3]), str(row[4])])
本人是使用PyCharm这个IDE进行开发的,上面引用了pyspark这个包,如何进行python的包管理可以自行百度。
将上面的代码保存至文件 golds_read.py,然后上传至已安装好spark的服务器的~/python 文件夹下。
上面的查询语句中,tglog_aw_2018是数据库名,golds_log是表名。配置HIVE并写入数据,可以参考这两篇文章:
1. linux上安装和配置Hive
2. 写入数据到Hive表(命令行)
接下来像spark提交作业,可以获得执行结果:
# spark-submit ~/python/golds_read.py 3645356 wds7654321(4171752) 妞妞拼十翻牌 1700 1526027152 2016869 dqyx123456789(2376699) 妞妞拼十翻牌 1140 1526027152 3630468 dke3776611(4156064) 妞妞拼十翻牌 1200 1526027152 3642022 黑娃123456(4168266) 妞妞拼十翻牌 500 1526027152
这个例子主要只是演示一下如何使用spark结合hive使用。spark默认支持java、scala和python三种语言编写的作业。可以看出,大部分的逻辑都是要通过python/java/scala编程来实现的。本人选择的是比较轻量的python,操作spark主要是要学习pySpark这个类库,它的官方地址位于:https://spark.apache.org/docs/latest/api/python/index.html
感谢阅读,希望这篇文章能给你带来帮助!