3 Ways to Create a DataFrame from Text in Spark
Spark SQL's core abstraction, the DataFrame, offers SQL query capabilities, which makes querying and aggregating data much easier. Running SQL against a DataFrame requires schema information, yet text files usually carry none. This article demonstrates three ways to create a DataFrame from a text file and set its schema.
The sample file
The file used throughout this article is a simplified mobile-client event log, delimited by "|", with the following format:
client ip|event time|event id|machine code|operating system|device model
In code we usually prefer English names; translated, the fields are:
ip|date|event|machine|os|device
The event log is named events.log and contains:
49.77.134.137|1536544453|touxiang_1267|868385038260432|android 7.1.1 R11s_11_A.25_180706|OPPO R11s
223.104.145.59|1536544452|icon_frame_314|865902037967669|android 7.1.1 NMF26F|MI MAX 2
222.93.202.10|1536544344|btn_speak_30|867701035125231|android 7.1.1 NMF26X release-keys|vivo X20Plus A
222.93.202.10|1536544346|img_sound_293|867701035125231|android 7.1.1 NMF26X release-keys|vivo X20Plus A
222.93.202.10|1536544437|btn_hui_fu_1997|867701035125231|android 7.1.1 NMF26X release-keys|vivo X20Plus A
117.91.166.96|1536544419|btn_speak_30|864088038729653|android 6.0.1 R9sPlus_11_A.11_180515|OPPO R9s Plus
117.91.166.96|1536544426|btn_speak_30|864088038729653|android 6.0.1 R9sPlus_11_A.11_180515|OPPO R9s Plus
117.91.166.96|1536544433|btn_speak_30|864088038729653|android 6.0.1 R9sPlus_11_A.11_180515|OPPO R9s Plus
First, upload the file from the local directory (~/downloads/events.log) to the /test folder on HDFS for the tests that follow:
# hdfs dfs -put ~/downloads/events.log /test
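Optionally, a quick sanity check that the file landed where expected (plain HDFS CLI, nothing specific to this article):

# hdfs dfs -ls /test
# hdfs dfs -cat /test/events.log | head -n 2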
Method 1: Create from an RDD
The first approach creates an RDD, converts each of its lines into a Row object, and infers the schema from the Rows. Alternatively, convert the lines into tuples and specify the schema explicitly.
# coding=utf-8
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql import types as T

spark = SparkSession.builder\
    .master("local[*]")\
    .appName("test.dataframe")\
    .getOrCreate()
sc = spark.sparkContext

linesRDD = sc.textFile("/test/events.log")
partsRDD = linesRDD.map(lambda x: x.split("|"))

# Option 1: create Row objects and let Spark infer the schema (field types included) from them
rowsRDD = partsRDD.map(lambda x: Row(ip=x[0], date=int(x[1]), event=x[2],
                                     machine=x[3], os=x[4], device=x[5]))
df = spark.createDataFrame(rowsRDD)

# Option 2: create from tuples plus a schema string
# rowsRDD = partsRDD.map(lambda x: (x[0], int(x[1]), x[2], x[3], x[4], x[5]))
# schema = "ip: string, date: long, event: string, machine: string, os: string, device: string"
# df = spark.createDataFrame(rowsRDD, schema)

# Option 3: create from tuples plus a StructType object
# rowsRDD = partsRDD.map(lambda x: (x[0], int(x[1]), x[2], x[3], x[4], x[5]))
# schema = T.StructType([
#     T.StructField("ip", T.StringType(), True),
#     T.StructField("date", T.LongType(), False),
#     T.StructField("event", T.StringType()),
#     T.StructField("machine", T.StringType(), True),
#     T.StructField("os", T.StringType(), True),
#     T.StructField("device", T.StringType(), True),
# ])
# df = spark.createDataFrame(rowsRDD, schema)

df.printSchema()
df.createOrReplaceTempView("events")
df = spark.sql("select * from events where device like '%OPPO%'")
df.show()
print("success")
Three ways of assigning a schema to the RDD are shown here. The third is the most complex (and the most verbose), but it is the only one that lets you declare whether a field is nullable.
The result is as follows (the three options produce much the same output):
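To illustrate what that nullable flag buys you, here is a minimal, hedged sketch (standalone, with made-up inline rows rather than the log file): with the default verifySchema=True, createDataFrame refuses a None in a field declared non-nullable.

# coding=utf-8
from pyspark.sql import SparkSession
from pyspark.sql import types as T

spark = SparkSession.builder\
    .master("local[*]")\
    .appName("test.nullable")\
    .getOrCreate()

schema = T.StructType([
    T.StructField("ip", T.StringType(), True),
    T.StructField("date", T.LongType(), False),  # non-nullable
])

# a valid row passes verification
spark.createDataFrame([("49.77.134.137", 1536544453)], schema).show()

# a None in the non-nullable field should raise a ValueError
try:
    spark.createDataFrame([("49.77.134.137", None)], schema)
except ValueError as e:
    print("rejected: %s" % e)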
# $SPARK_HOME/bin/spark-submit /data/pyjobs/sample/dataframe-rdd.py

root
 |-- ip: string (nullable = true)
 |-- date: long (nullable = true)
 |-- event: string (nullable = true)
 |-- machine: string (nullable = true)
 |-- os: string (nullable = true)
 |-- device: string (nullable = true)

+-------------+----------+-------------+---------------+--------------------+-------------+
|           ip|      date|        event|        machine|                  os|       device|
+-------------+----------+-------------+---------------+--------------------+-------------+
|49.77.134.137|1536544453|touxiang_1267|868385038260432|android 7.1.1 R11...|    OPPO R11s|
|117.91.166.96|1536544419| btn_speak_30|864088038729653|android 6.0.1 R9s...|OPPO R9s Plus|
|117.91.166.96|1536544426| btn_speak_30|864088038729653|android 6.0.1 R9s...|OPPO R9s Plus|
|117.91.166.96|1536544433| btn_speak_30|864088038729653|android 6.0.1 R9s...|OPPO R9s Plus|
+-------------+----------+-------------+---------------+--------------------+-------------+

success
Method 2: Convert from a DataFrame
Converting from a DataFrame means reading the file directly into a DataFrame with SparkSession's read method. At that point the schema consists of a single string field, named value by default, and we then transform this DataFrame into the shape we want.
Note: pyspark.sql.functions.split takes a regular expression, and "|" is a special character in regular expressions, so it must be escaped with "\".
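Either way of writing the pattern works; a character class sidesteps the backslash entirely. A small sketch, assuming df is the single-column DataFrame produced by read.text in the script below:

from pyspark.sql import functions as F

# both patterns split on a literal "|"
df.select(F.split(df["value"], r'\|').alias("cols"))   # escaped pipe
df.select(F.split(df["value"], '[|]').alias("cols"))   # character class, no escape needed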
# coding=utf-8
from pyspark.sql import SparkSession
from pyspark.sql import types as T
from pyspark.sql import functions as F

spark = SparkSession.builder\
    .master("local[*]")\
    .appName("test.dataframe")\
    .getOrCreate()

# step 1: the raw data, one string column named "value"
df = spark.read.text("/test/events.log")
df.printSchema()
df.show()

# step 2: split each line into an array column
df = df.select(F.split(df["value"], r'\|').alias("cols"))
df.printSchema()
df.show()

# step 3: turn the array elements into named columns
cols = df["cols"]
df = df.withColumn("ip", cols.getItem(0)) \
    .withColumn("date", cols.getItem(1).cast(T.LongType())) \
    .withColumn("event", cols.getItem(2)) \
    .withColumn("machine", cols.getItem(3)) \
    .withColumn("os", cols.getItem(4)) \
    .withColumn("device", cols.getItem(5)) \
    .drop("cols")
df.printSchema()

df.createOrReplaceTempView("events")
df = spark.sql("select * from events where device like '%OPPO%'")
df.show()
print("success")
The output is as follows:
# $SPARK_HOME/bin/spark-submit /data/pyjobs/sample/dataframe-df.py

root
 |-- value: string (nullable = true)

+--------------------+
|               value|
+--------------------+
|49.77.134.137|153...|
|223.104.145.59|15...|
|222.93.202.10|153...|
|222.93.202.10|153...|
|222.93.202.10|153...|
|117.91.166.96|153...|
|117.91.166.96|153...|
|117.91.166.96|153...|
+--------------------+

root
 |-- cols: array (nullable = true)
 |    |-- element: string (containsNull = true)

+--------------------+
|                cols|
+--------------------+
|[49.77.134.137, 1...|
|[223.104.145.59, ...|
|[222.93.202.10, 1...|
|[222.93.202.10, 1...|
|[222.93.202.10, 1...|
|[117.91.166.96, 1...|
|[117.91.166.96, 1...|
|[117.91.166.96, 1...|
+--------------------+

root
 |-- ip: string (nullable = true)
 |-- date: long (nullable = true)
 |-- event: string (nullable = true)
 |-- machine: string (nullable = true)
 |-- os: string (nullable = true)
 |-- device: string (nullable = true)

+-------------+----------+-------------+---------------+--------------------+-------------+
|           ip|      date|        event|        machine|                  os|       device|
+-------------+----------+-------------+---------------+--------------------+-------------+
|49.77.134.137|1536544453|touxiang_1267|868385038260432|android 7.1.1 R11...|    OPPO R11s|
|117.91.166.96|1536544419| btn_speak_30|864088038729653|android 6.0.1 R9s...|OPPO R9s Plus|
|117.91.166.96|1536544426| btn_speak_30|864088038729653|android 6.0.1 R9s...|OPPO R9s Plus|
|117.91.166.96|1536544433| btn_speak_30|864088038729653|android 6.0.1 R9s...|OPPO R9s Plus|
+-------------+----------+-------------+---------------+--------------------+-------------+

success
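As a design note, step 3 can also be collapsed into a single select instead of six chained withColumn calls. A hedged sketch of the equivalent projection, assuming df still holds the cols array column from step 2:

from pyspark.sql import functions as F
from pyspark.sql import types as T

names = ["ip", "date", "event", "machine", "os", "device"]
exprs = [F.col("cols").getItem(i).alias(n) for i, n in enumerate(names)]
df = df.select(*exprs).withColumn("date", F.col("date").cast(T.LongType()))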
Method 3: Use the csv method
The last approach uses the csv method, which lets you specify the separator and the schema directly.
Note: the schema string format here differs slightly from the one accepted by SparkSession.createDataFrame; the ":" between field name and type is dropped.
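Side by side, the two string formats look like this (illustrative only; both describe the same six columns):

# accepted by spark.createDataFrame: "name: type" pairs
schema_for_create = "ip: string, date: long, event: string, machine: string, os: string, device: string"
# DDL style accepted by spark.read.csv: "name type" pairs, no colon
schema_for_csv = "ip string, date long, event string, machine string, os string, device string"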
# coding=utf-8
from pyspark.sql import SparkSession

spark = SparkSession.builder\
    .master("local[*]")\
    .appName("test.dataframe")\
    .getOrCreate()

# DDL-style schema string: column name, a space, then the type
schema = "ip string, date long, event string, machine string, os string, device string"
df = spark.read.csv("/test/events.log", sep="|", schema=schema)
df.printSchema()

df.createOrReplaceTempView("events")
df = spark.sql("select * from events where device like '%OPPO%'")
df.show()
print("success")
Its output is much the same as Method 1's, so it is not shown again.
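If you would rather not spell out the schema at all, read.csv can also infer it. A hedged variant (inference costs an extra pass over the data, and the default column names _c0.._c5 are renamed afterwards with toDF):

# coding=utf-8
from pyspark.sql import SparkSession

spark = SparkSession.builder\
    .master("local[*]")\
    .appName("test.dataframe")\
    .getOrCreate()

# infer column types, then rename the default _c0.._c5 columns
df = spark.read.csv("/test/events.log", sep="|", inferSchema=True)\
    .toDF("ip", "date", "event", "machine", "os", "device")
df.printSchema()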
Summary
A file is not a CSV file just because its extension is .csv. CSV originally means comma-separated values; by extension, any file whose fields are separated by some character can be treated as a CSV file. The event log above therefore qualifies, which makes Method 3 clearly the most convenient. I started out with Method 1 myself and only stumbled onto Method 3 after some detours; evidently I had not read the documentation carefully enough. On the whole, Spark's documentation is among the best of the major open-source big-data projects, and browsing it from time to time often turns up something new.
Thanks for reading, and I hope this article helps you!