Spark大数据分析实战 第5章 Spark SQL结构化数据处理引擎
所属分类 spark
浏览量 956
什么是Spark SQL
DataFrame和Dataset
Spark SQL基本使用
Spark SQL数据源
Spark SQL内置函数
什么是Spark SQL
Spark SQL 结构化数据处理
Schema信息
json parquet avro csv
与基础的Spark RDD API不同,Spark SQL提供了对结构化数据的查询和计算接口
Spark SQL的主要特点
1 将SQL查询与Spark应用程序无缝结合
使用SQL在Spark程序中查询结构化数据
与Hive区别
Hive将SQL翻译成MapReduce作业
Spark SQL底层使用 Spark RDD
results = spark.sql( "SELECT * FROM people")
2 以相同的方式连接到多种数据源
Spark SQL提供了访问各种数据源的通用方法
数据源包括 Hive Avro Parquet ORC JSON JDBC等
//读取JSON文件
val userScoreDF = spark.read.json("hdfs://centos01:9000/people.json")
//创建临时视图user_score
userScoreDF.createTempView("user_score")
//根据name关联查询
val resDF=spark.sql(
"SELECT i.age,i.name,c.score FROM user_info i JOIN user_score c ON i.name=c.name"
)
3 在现有的数据仓库上运行SQL或HiveQL查询
Spark SQL支持HiveQL语法以及Hive SerDes和UDF(用户自定义函数)
允许访问现有的Hive仓库
DataFrame和Dataset
DataFrame是Spark SQL提供的一个编程抽象
与RDD类似,也是一个分布式的数据集合
DataFrame的数据都被组织到有名字的列中,就像关系型数据库中的表一样
多种数据都可以转化为DataFrame
例如:Spark计算过程中生成的RDD、结构化数据文件、Hive中的表、外部数据库等
DataFrame在RDD的基础上添加了数据描述信息(Schema)
因此看起来更像是一张数据库表
使用DataFrame API结合SQL处理结构化数据比RDD更加的容易
Spark优化器会自动对其优化
Dataset也是一个分布式数据集,是Spark1.6中添加的一个新的API
相对于RDD,Dataset提供了强类型支持
使用Dataset API同样会经过Spark SQL优化器的优化,提高执行效率
DataFrame 是 Dataset[Row] 的一个别名
Spark SQL基本使用
Spark Shell 内置变量
sc SparkContext
spark SparkSession
SparkSession是在SparkContext基础上的封装,应用程序的入口仍然是SparkContext
SparkSession 可调用DataFrame和Dataset API
支持从不同的数据源加载数据,并把数据转换成DataFrame,然后使用SQL语句来操作DataFrame
HDFS文件 /input/person.txt
1,zhangsan,25
2,lisi,22
3,wangwu,30
SparkSession read.textFile()
scala> val d1=spark.read.textFile("hdfs://centos01:9000/input/person.txt")
d1: org.apache.spark.sql.Dataset[String] = [value: string]
csv() jdbc() json()等方法读取csv文件 jdbc数据源 json文件等数据
scala> d1.show()
+-------------+
| value|
+-------------+
|1,zhangsan,25|
|2,lisi,22|
|3,wangwu,30|
+-------------+
列名默认为 value
给Dataset添加元数据信息
定义一个样例类Person,用于存放数据描述信息(Schema)
scala> case class Person(id:Int,name:String,age:Int)
导入SparkSession的隐式转换,以便后续使用Dataset的算子
scala> import spark.implicits._
调用Dataset的map() 转换成 Person
scala> val personDataset=d1.map(line=>{
val fields = line.split(",")
val id = fields(0).toInt
val name = fields(1)
val age = fields(2).toInt
Person(id, name, age)
})
scala> personDataset.show()
+---+--------+---+
| id| name|age|
+---+--------+---+
| 1|zhangsan| 25|
| 2| lisi| 22|
| 3| wangwu| 30|
+---+--------+---+
将Dataset转为DataFrame
scala> val pdf = personDataset.toDF()
pdf: org.apache.spark.sql.DataFrame = [id: int, name: string ... 1 more field]
执行SQL查询
scala> pdf.createTempView("v_person")
scala> val result = spark.sql("select * from v_person order by age desc")
result: org.apache.spark.sql.DataFrame = [id: int, name: string ... 1 more field]
scala> result.show()
+---+--------+---+
| id| name|age|
+---+--------+---+
| 3| wangwu| 30|
| 1|zhangsan| 25|
| 2| lisi | 22|
+---+--------+---+
Spark SQL数据源
通过DataFrame 对各种数据源进行操作
使用相关转换算子进行操作,也可以创建临时视图 然后使用SQL查询
数据加载和写入
load() 和 save()
load() 加载外部数据源为一个DataFrame
save() 将一个DataFrame写入到指定的数据源
默认 Parquet格式
val spark = SparkSession.builder()
.appName("SparkSQLDataSource")
.master("local[*]")
.getOrCreate()
// 加载parquet格式的文件,返回一个DataFrame集合
val usersDF = spark.read.load("hdfs://centos01:9000/users.parquet")
usersDF.show()
//查询DataFrame中的id列和name列 ,并写入HDFS
usersDF.select("id","name").write.save("hdfs://centos01:9000/result")
//创建临时视图
usersDF.createTempView("t_user")
// 执行SQL查询,并将结果写入 HDFS
spark.sql("SELECT id,name FROM t_user")
.write.save("hdfs://centos01:9000/result")
手动指定数据源
使用format()方法 手动指定数据源
数据源需要使用完全限定名(例如org.apache.spark.sql.parquet
对于Spark SQL的内置数据源,可以使用缩写名(json parquet jdbc orc libsvm csv text)
val peopleDFCsv=spark.read.format("csv").load("hdfs://centos01:9000/people.csv")
可以使用option()方法向指定的数据源传递所需参数
例如JDBC数据源 账号、密码等参数
val jdbcDF = spark.read.format("jdbc")
.option("url", "jdbc:mysql://192.168.1.69:3306/spark_db")
.option("driver","com.mysql.jdbc.Driver")
.option("dbtable", "student")
.option("user", "root")
.option("password", "123456")
.load()
数据写入模式
mode() 指定如何处理已经存在的数据,枚举类SaveMode
(1)SaveMode.ErrorIfExists 默认值 如果数据已经存在,则抛出异常
(2)SaveMode.Append
(3)SaveMode.Overwrite 覆盖(包括数据或表的Schema)
(4)SaveMode.Ignore 如果数据或表已经存在,则不会写入
HDFS文件 JSON格式 /people.json
{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}
val peopleDF = spark.read.format("json").load("hdfs://centos01:9000/people.json")
peopleDF.select("name")
.write.mode(SaveMode.Overwrite).format("json")
.save("hdfs://centos01:9000/result")
分区自动推断
表分区是Hive等系统中常用的优化查询效率的方法(
在分区表中,数据通常存储在不同的分区目录中
分区目录命名 分区列名=值
people 表 gender和country作为分区列
存储数据的目录结构
path
└── to
└── people
├── gender=male
│ ├── ...
│ │
│ ├── country=US
│ │ └── data.parquet
│ ├── country=CN
│ │ └── data.parquet
│ └── ...
└── gender=female
├── ...
│
├── country=US
│ └── data.parquet
├── country=CN
│ └── data.parquet
└── ...
对于所有内置的数据源(包括Text/CSV/JSON/ORC/Parquet)
Spark SQL都能够根据目录名自动发现和推断分区信息
(1) 在本地(或HDFS)新建以下三个目录及文件
其中的目录people代表表名,gender和country代表分区列
people.json存储实际人口数据:
D:\people\gender=male\country=CN\people.json
D:\people\gender=male\country=US\people.json
D:\people\gender=female\country=CN\people.json
三个people.json文件的数据分别如下:
{"name":"zhangsan","age":32}
{"name":"lisi", "age":30}
{"name":"wangwu", "age":19}
{"name":"Michael"}
{"name":"Jack", "age":20}
{"name":"Justin", "age":18}
{"name":"xiaohong","age":17}
{"name":"xiaohua", "age":22}
{"name":"huanhuan", "age":16}
val usersDF = spark.read.format("json").load("D:\\people")
usersDF.printSchema()
usersDF.show()
Schema信息
root
|-- age: long (nullable = true)
|-- name: string (nullable = true)
|-- gender: string (nullable = true)
|-- country: string (nullable = true)
表数据
+----+--------+------+-------+
| age| name|gender|country|
+----+--------+------+-------+
| 17|xiaohong|female| CN|
| 22| xiaohua|female| CN|
| 16|huanhuan|female| CN|
| 32|zhangsan| male| CN|
| 30| lisi| male| CN|
| 19| wangwu| male| CN|
|null| Michael| male| US|
| 20| Jack| male| US|
| 18| Justin| male| US|
+----+--------+------+-------+
读取数据时,自动推断出了两个分区列gender和country,将该两列的值添加到了DataFrame中
Parquet文件
列式存储格式
Spark SQL支持对Parquet文件的读写,并且可以自动保存源数据的Schema
写入Parquet文件时,为了提高兼容性,所有列都会自动转换为 可为空 状态
加载和写入Parquet文件时,可以使用load()方法和save()
还可以直接使用Spark SQL内置的parquet()方法
//读取Parquet文件为一个DataFrame
val usersDF = spark.read.parquet("hdfs://centos01:9000/users.parquet")
//将DataFrame相关数据保存为Parquet文件,包括Schema信息
usersDF.select("id","name")
.write.parquet("hdfs://centos01:9000/result")
JSON数据集
JSON文件的每一行必须包含一个独立有效的JSON对象
一行一个json对象
{"name":"zhangsan","age":32}
{"name":"lisi", "age":30}
{"name":"wangwu", "age":19}
val spark = SparkSession.builder()
.appName("SparkSQLDataSource")
.config("spark.sql.parquet.mergeSchema",true)
.master("local[*]")
.getOrCreate()
// 1. 创建用户基本信息表
import spark.implicits._
//创建用户信息Dataset集合
val arr=Array(
"{'name':'zhangsan','age':20}",
"{'name':'lisi','age':18}"
)
val userInfo: Dataset[String] = spark.createDataset(arr)
//将Dataset[String]转为DataFrame
val userInfoDF = spark.read.json(userInfo)
//创建临时视图user_info
userInfoDF.createTempView("user_info")
//显示数据
userInfoDF.show()
// +---+--------+
// |age| name|
// +---+--------+
// | 20|zhangsan|
// | 18| lisi|
// +---+--------+
// 2. 创建用户成绩表
//读取JSON文件
val userScoreDF = spark.read.json("D:\\people\\score.json")
//创建临时视图user_score
userScoreDF.createTempView("user_score")
userScoreDF.show()
// +--------+-----+
// | name|score|
// +--------+-----+
// |zhangsan| 98|
// | lisi| 88|
// | wangwu| 95|
// +--------+-----+
// 3. 根据name字段关联查询
val resDF=spark.sql("SELECT i.age,i.name,c.score FROM user_info i " +
"JOIN user_score c ON i.name=c.name")
resDF.show()
// +---+--------+-----+
// |age| name|score|
// +---+--------+-----+
// | 20|zhangsan| 98|
// | 18| lisi| 88|
// +---+--------+-----+
Hive表
Hive有大量依赖项,这些依赖项不包括在默认的Spark发行版中
如果在classpath上配置了这些Hive依赖项,Spark将自动加载它们
Hive依赖项必须出现在所有Worker节点上,因为需要访问Hive序列化和反序列化库(SerDes)
val spark = SparkSession
.builder()
.appName("Spark Hive Demo")
.enableHiveSupport()//开启Hive支持
.getOrCreate()
创建Hive表students,并指定字段分隔符为制表符
spark.sql("CREATE TABLE IF NOT EXISTS students (name STRING, age INT) " +
"ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'")
导入本地数据到Hive表
本地文件/home/hadoop/students.txt
zhangsan 20
lisi 25
wangwu 19
spark.sql("LOAD DATA LOCAL INPATH '/home/hadoop/students.txt' INTO TABLE students")
查询表数据
spark.sql("SELECT * FROM students").show()
+--------+---+
| name|age|
+--------+---+
|zhangsan| 20|
| lisi| 25|
| wangwu| 19|
+--------+---+
创建表的同时指定存储格式
创建一个Hive表hive_records,数据存储格式为Parquet(默认为普通文本格式)
spark.sql("CREATE TABLE hive_records(key STRING, value INT) STORED AS PARQUET")
将DataFrame写入Hive表
使用saveAsTable() 将一个DataFrame写入到指定的Hive表中
//加载students表的数据为DataFrame
val studentsDF = spark.table("students")
//将DataFrame写入表hive_records中
studentsDF.write.mode(SaveMode.Overwrite).saveAsTable("hive_records")
//查询hive_records表数据并显示到控制
spark.sql("SELECT * FROM hive_records").show()
Hive数据源,提交之前需要做好Hive数据仓库、元数据库等的配置
JDBC
url 连接的 JDBC URL
driver JDBC 驱动的类名
user 数据库用户名
password 数据库密码
dbtable
数据库表名或能代表一张数据库表的子查询
只使用数据库表名,将查询整张表的数据
查询部分数据或多表关联查询,可以使用SQL查询
注意,不允许同时指定 dbtable 和 query 属性
query 指定查询的SQL语句
不允许同时指定 query 和 partitionColumn 属性
当需要指定 partitionColumn 属性时
可以使用 dbtable 属性指定子查询 并使用子查询的别名对分区列进行限定
partitionColumn,lowerBound,upperBound
若有一个被指定则必须全部指定,且必须指定numPartitions属性
如何在从多个Worker中并行读取数据时对表进行分区
partitionColumn必须是表中的数字、日期或时间戳列
lowerBound和upperBound只是用来决定分区跨度
而不是用于过滤表中的行 ,因此表中的所有行都将被分区并返回
numPartitions
对表进行并行读写数据时的最大分区数
决定了并发JDBC连接的最大数量
如果要写入数据的分区数量超过了此限制的值
在写入之前可以调用coalesce(numpartition)将分区数量减少到此限制的值
使用JDBC API对MySQL表student和表score进行关联查询
val jdbcDF = spark.read.format("jdbc")
.option("url", "jdbc:mysql://192.168.1.69:3306/spark_db")
.option("driver","com.mysql.jdbc.Driver")
.option("dbtable", "(select st.name,sc.score from student st,score sc " +
"where st.id=sc.id) t")
.option("user", "root")
.option("password", "123456")
.load()
dbtable 是一个子查询,相当于SQL查询中的FROM关键字后的一部分
使用query属性编写完整SQL语句进行查询也能达到同样的效果
val jdbcDF = spark.read.format("jdbc")
.option("url", "jdbc:mysql://192.168.1.234:3306/spark_db")
.option("driver","com.mysql.jdbc.Driver")
.option("query", "select st.name,sc.score from student st,score sc " +
"where st.id=sc.id")
.option("user", "root")
.option("password", "123456")
.load()
Spark SQL内置函数
org.apache.spark.sql.functions
函数主要分为10类
UDF函数 聚合函数 日期函数 排序函数
非聚合函数 数学函数 混杂函数 窗口函数 字符串函数 集合函数
大部分函数与Hive相同
内置函数两种使用方式
编程方式
SQL语句中使用
import org.apache.spark.sql.functions._
df.select(lower(col("name")).as("name")).show()
UDF User Defined Functions
常用的聚合函数 count countDistinct avg max min
自定义聚合函数 UDAF User Defined Aggregate Functions
UDF主要针对单个输入,返回单个输出
UDAF 可以针对多个输入进行聚合计算返回单个输出,功能更加强大
UDAF 继承抽象类UserDefinedAggregateFunction
开窗函数
row_number()
使用该函数可以在查询结果中对每个分组的数据,按照其排序的顺序添加一列行号(从1开始)
根据行号可以方便的对每一组数据取前N行(分组取TOPN)
row_number() over (partition by 列名 order by 列名 desc) 行号列别名
partition by 按照某一列进行分组。
order by 分组后按照某一列进行组内排序
desc 降序,默认升序
上一篇
下一篇
Spark Standalone 的两种提交方式
Spark大数据分析实战 第3章 RDD弹性分布式数据集
Spark大数据分析实战 第4章 Spark内核源码分析
Spark大数据分析实战 第6章 Kafka分布式消息系统
Spark大数据分析实战 第7章 Spark Streaming实时流处理引擎
Clickhouse 监控运维常用SQL