张子阳的博客

首页 读书 技术 店铺 关于
张子阳的博客 首页 读书 技术 关于

Spark 从文本创建DataFrame的3种方式

2018-10-19 张子阳 分类: 大数据处理

Spark Sql的核心对象DataFrame提供了Sql查询的能力,极大的方便了数据的查询和统计。如果要对DataFrame进行Sql查询,需要指定Scheme信息。而文本文件往往缺少Scheme信息,这篇文章将演示3种从文本文件创建DataFrame,并设置其Scheme的方式。

范例文件介绍

本文用来做示范的文件,是一个简化的手机端的事件日志,以“|”分隔,其数据格式为:

客户端ip|事件时间|事件标识|机器码|操作系统|设备型号

一般我们在代码里习惯用英文,翻译一下就是:

ip|date|event|machine|os|device

事件日志的名称为 events.log,内容如下:

49.77.134.137|1536544453|touxiang_1267|868385038260432|android 7.1.1 R11s_11_A.25_180706|OPPO R11s
223.104.145.59|1536544452|icon_frame_314|865902037967669|android 7.1.1 NMF26F|MI MAX 2
222.93.202.10|1536544344|btn_speak_30|867701035125231|android 7.1.1 NMF26X release-keys|vivo X20Plus A
222.93.202.10|1536544346|img_sound_293|867701035125231|android 7.1.1 NMF26X release-keys|vivo X20Plus A
222.93.202.10|1536544437|btn_hui_fu_1997|867701035125231|android 7.1.1 NMF26X release-keys|vivo X20Plus A
117.91.166.96|1536544419|btn_speak_30|864088038729653|android 6.0.1 R9sPlus_11_A.11_180515|OPPO R9s Plus
117.91.166.96|1536544426|btn_speak_30|864088038729653|android 6.0.1 R9sPlus_11_A.11_180515|OPPO R9s Plus
117.91.166.96|1536544433|btn_speak_30|864088038729653|android 6.0.1 R9sPlus_11_A.11_180515|OPPO R9s Plus

先将这个文件从当前目录(~/downloads/events.log)上传到hdfs的/test文件夹下,方便后续测试:

# hdfs dfs -put ~/downloads/events.log /test

方式1:从RDD创建

第一种方式先创建RDD,然后将RDD的行转换为Row对象,再根据Row去推断Schema。也可以转换为元组,再明确指定Schema。

# coding=utf-8
from pyspark.sql import SparkSession
from pyspark import Row
from pyspark.sql import types as T

spark = SparkSession.builder\
    .master("local[*]")\
    .appName("test.dataframe")\
    .getOrCreate()

sc = spark.sparkContext

linesRDD = sc.textFile("/test/events.log")
partsRDD = linesRDD.map(lambda x: x.split("|"))

# 方法1:创建ROW对象,根据ROW来创建Schema,自动推断字段类型
rowsRDD = partsRDD.map(lambda x:Row(ip=x[0],date=int(x[1]),event=x[2], machine=x[3], os=x[4], device=x[5]))
df = spark.createDataFrame(rowsRDD)

# 方法2:通过Schema字符串创建
# rowsRDD = partsRDD.map(lambda x:(x[0], int(x[1]), x[2], x[3], x[4], x[5]))
# schema = "ip: string, date: long, event: string, machine: string, os: string, device: string"
# df = spark.createDataFrame(rowsRDD, schema)

# 方法3:通过StructType对象创建
# rowsRDD = partsRDD.map(lambda x:(x[0], int(x[1]), x[2], x[3], x[4], x[5]))
# schema = T.StructType([
#     T.StructField("ip", T.StringType(), True),
#     T.StructField("date", T.LongType(), False),
#     T.StructField("event", T.StringType()),
#     T.StructField("machine", T.StringType(), True),
#     T.StructField("os", T.StringType(), True),
#     T.StructField("device", T.StringType(), True),
# ])
# df = spark.createDataFrame(rowsRDD, schema)

df.printSchema()
df.createOrReplaceTempView("events")
df = spark.sql("select * from events where device like '%OPPO%'")
df.show()

print "success"

这里采用了三种给RDD赋予Scheme的方法。最复杂(繁琐)的是第3种,但是第3种可以指定Field是否可以为Null。

结果如下(3种方法的结果大同小异):

# $SPARK_HOME/bin/spark-submit /data/pyjobs/sample/dataframe-rdd.py
root
 |-- ip: string (nullable = true)
 |-- date: long (nullable = true)
 |-- event: string (nullable = true)
 |-- machine: string (nullable = true)
 |-- os: string (nullable = true)
 |-- device: string (nullable = true)

+-------------+----------+-------------+---------------+--------------------+-------------+
|           ip|      date|        event|        machine|                  os|       device|
+-------------+----------+-------------+---------------+--------------------+-------------+
|49.77.134.137|1536544453|touxiang_1267|868385038260432|android 7.1.1 R11...|    OPPO R11s|
|117.91.166.96|1536544419| btn_speak_30|864088038729653|android 6.0.1 R9s...|OPPO R9s Plus|
|117.91.166.96|1536544426| btn_speak_30|864088038729653|android 6.0.1 R9s...|OPPO R9s Plus|
|117.91.166.96|1536544433| btn_speak_30|864088038729653|android 6.0.1 R9s...|OPPO R9s Plus|
+-------------+----------+-------------+---------------+--------------------+-------------+

success

方式2:从DataFrame转换

从DataFrame转换,就是SparkSession的read方法,直接将文件读取为DataFrame,此时的Schema就一个string类型的字段,默认名为value,接着再对此DataFrame进行转换。

需要注意:pyspark.sql.functions.split函数,其接收的是一个正则表达式,而“|”在正则表达式中是一个特殊字符,因此需要用“\”进行转义。

# coding=utf-8
from pyspark.sql import SparkSession
from pyspark.sql import types as T
from pyspark.sql import functions as F

spark = SparkSession.builder\
    .master("local[*]")\
    .appName("test.dataframe")\
    .getOrCreate()

# step1. 原始数据
df = spark.read.text("/test/events.log")
df.printSchema()
df.show()

# step2. 转成了数组
df = df.select(F.split(df["value"], '\|').alias("cols"))
df.printSchema()
df.show()

# step3. 将数组字段转成了列
cols = df["cols"]
df = df.withColumn("ip", cols.getItem(0)) \
    .withColumn("date", cols.getItem(1).cast(T.LongType())) \
    .withColumn("event", cols.getItem(2)) \
    .withColumn("machine", cols.getItem(3)) \
    .withColumn("os", cols.getItem(4)) \
    .withColumn("device", cols.getItem(5)) \
    .drop("cols")

df.printSchema()
df.createOrReplaceTempView("events")
df = spark.sql("select * from events where device like '%OPPO%'")
df.show()

print "success"

输出的结果如下:

# $SPARK_HOME/bin/spark-submit /data/pyjobs/sample/dataframe-df.py
root
 |-- value: string (nullable = true)

+--------------------+
|               value|
+--------------------+
|49.77.134.137|153...|
|223.104.145.59|15...|
|222.93.202.10|153...|
|222.93.202.10|153...|
|222.93.202.10|153...|
|117.91.166.96|153...|
|117.91.166.96|153...|
|117.91.166.96|153...|
+--------------------+

root
 |-- cols: array (nullable = true)
 |    |-- element: string (containsNull = true)

+--------------------+
|                cols|
+--------------------+
|[49.77.134.137, 1...|
|[223.104.145.59, ...|
|[222.93.202.10, 1...|
|[222.93.202.10, 1...|
|[222.93.202.10, 1...|
|[117.91.166.96, 1...|
|[117.91.166.96, 1...|
|[117.91.166.96, 1...|
+--------------------+

root
 |-- ip: string (nullable = true)
 |-- date: long (nullable = true)
 |-- event: string (nullable = true)
 |-- machine: string (nullable = true)
 |-- os: string (nullable = true)
 |-- device: string (nullable = true)

+-------------+----------+-------------+---------------+--------------------+-------------+
|           ip|      date|        event|        machine|                  os|       device|
+-------------+----------+-------------+---------------+--------------------+-------------+
|49.77.134.137|1536544453|touxiang_1267|868385038260432|android 7.1.1 R11...|    OPPO R11s|
|117.91.166.96|1536544419| btn_speak_30|864088038729653|android 6.0.1 R9s...|OPPO R9s Plus|
|117.91.166.96|1536544426| btn_speak_30|864088038729653|android 6.0.1 R9s...|OPPO R9s Plus|
|117.91.166.96|1536544433| btn_speak_30|864088038729653|android 6.0.1 R9s...|OPPO R9s Plus|
+-------------+----------+-------------+---------------+--------------------+-------------+

success

方式3:使用csv方法

最后一种就是使用csv方法了,可以直接指定分隔符和schema:

需要注意:这里的schema字符串格式,和SparkSession.CreateDataFrame方法中接收的schema字符串格式略有差异,少了“:”。

# coding=utf-8
from pyspark.sql import SparkSession

spark = SparkSession.builder\
    .master("local[*]")\
    .appName("test.dataframe")\
    .getOrCreate()

schema = "ip string, date long, event string, machine string, os string, device string"
df = spark.read.csv("/test/events.log", sep="|", schema=schema)

df.printSchema()
df.createOrReplaceTempView("events")
df = spark.sql("select * from events where device like '%OPPO%'")
df.show()

print "success"

其运行结果和方式1是类似的,就不再演示了。

总结

不是后缀名是csv的文件就是csv文件,csv文件本意是以逗号分隔的文件,扩展一下,就是以某字符进行分隔的文件都可以视为csv文件。因此,对于上面的事件日志文件,也可以视为csv文件,那么显然,使用上面的方式3是最方便快捷的了。我本人最开始用的是方式1,兜兜转转才发现了方式3,文档还是看得不够仔细呀。总的来说,Spark的文档在几个开源大数据项目中,算是最优秀的一个了。有时间就看看说不定就会有新的收获。

感谢阅读,希望这篇文章能给你带来帮助!