首页  

Spark大数据分析实战 第5章 Spark SQL结构化数据处理引擎     所属分类 spark 浏览量 766
什么是Spark SQL
DataFrame和Dataset
Spark SQL基本使用
Spark SQL数据源
Spark SQL内置函数


什么是Spark SQL Spark SQL 结构化数据处理 Schema信息 json parquet avro csv 与基础的Spark RDD API不同,Spark SQL提供了对结构化数据的查询和计算接口 Spark SQL的主要特点 1 将SQL查询与Spark应用程序无缝结合 使用SQL在Spark程序中查询结构化数据 与Hive区别 Hive将SQL翻译成MapReduce作业 Spark SQL底层使用 Spark RDD results = spark.sql( "SELECT * FROM people") 2 以相同的方式连接到多种数据源 Spark SQL提供了访问各种数据源的通用方法 数据源包括 Hive Avro Parquet ORC JSON JDBC等 //读取JSON文件 val userScoreDF = spark.read.json("hdfs://centos01:9000/people.json") //创建临时视图user_score userScoreDF.createTempView("user_score") //根据name关联查询 val resDF=spark.sql( "SELECT i.age,i.name,c.score FROM user_info i JOIN user_score c ON i.name=c.name" ) 3 在现有的数据仓库上运行SQL或HiveQL查询 Spark SQL支持HiveQL语法以及Hive SerDes和UDF(用户自定义函数) 允许访问现有的Hive仓库
DataFrame和Dataset DataFrame是Spark SQL提供的一个编程抽象 与RDD类似,也是一个分布式的数据集合 DataFrame的数据都被组织到有名字的列中,就像关系型数据库中的表一样 多种数据都可以转化为DataFrame 例如:Spark计算过程中生成的RDD、结构化数据文件、Hive中的表、外部数据库等 DataFrame在RDD的基础上添加了数据描述信息(Schema) 因此看起来更像是一张数据库表 使用DataFrame API结合SQL处理结构化数据比RDD更加的容易 Spark优化器会自动对其优化 Dataset也是一个分布式数据集,是Spark1.6中添加的一个新的API 相对于RDD,Dataset提供了强类型支持 使用Dataset API同样会经过Spark SQL优化器的优化,提高执行效率 DataFrame 是 Dataset[Row] 的一个别名
Spark SQL基本使用 Spark Shell 内置变量 sc SparkContext spark SparkSession SparkSession是在SparkContext基础上的封装,应用程序的入口仍然是SparkContext SparkSession 可调用DataFrame和Dataset API 支持从不同的数据源加载数据,并把数据转换成DataFrame,然后使用SQL语句来操作DataFrame HDFS文件 /input/person.txt 1,zhangsan,25 2,lisi,22 3,wangwu,30 SparkSession read.textFile() scala> val d1=spark.read.textFile("hdfs://centos01:9000/input/person.txt") d1: org.apache.spark.sql.Dataset[String] = [value: string] csv() jdbc() json()等方法读取csv文件 jdbc数据源 json文件等数据 scala> d1.show() +-------------+ | value| +-------------+ |1,zhangsan,25| |2,lisi,22| |3,wangwu,30| +-------------+ 列名默认为 value 给Dataset添加元数据信息 定义一个样例类Person,用于存放数据描述信息(Schema) scala> case class Person(id:Int,name:String,age:Int) 导入SparkSession的隐式转换,以便后续使用Dataset的算子 scala> import spark.implicits._ 调用Dataset的map() 转换成 Person scala> val personDataset=d1.map(line=>{ val fields = line.split(",") val id = fields(0).toInt val name = fields(1) val age = fields(2).toInt Person(id, name, age) }) scala> personDataset.show() +---+--------+---+ | id| name|age| +---+--------+---+ | 1|zhangsan| 25| | 2| lisi| 22| | 3| wangwu| 30| +---+--------+---+ 将Dataset转为DataFrame scala> val pdf = personDataset.toDF() pdf: org.apache.spark.sql.DataFrame = [id: int, name: string ... 1 more field] 执行SQL查询 scala> pdf.createTempView("v_person") scala> val result = spark.sql("select * from v_person order by age desc") result: org.apache.spark.sql.DataFrame = [id: int, name: string ... 1 more field] scala> result.show() +---+--------+---+ | id| name|age| +---+--------+---+ | 3| wangwu| 30| | 1|zhangsan| 25| | 2| lisi | 22| +---+--------+---+
Spark SQL数据源 通过DataFrame 对各种数据源进行操作 使用相关转换算子进行操作,也可以创建临时视图 然后使用SQL查询 数据加载和写入 load() 和 save() load() 加载外部数据源为一个DataFrame save() 将一个DataFrame写入到指定的数据源 默认 Parquet格式 val spark = SparkSession.builder() .appName("SparkSQLDataSource") .master("local[*]") .getOrCreate() // 加载parquet格式的文件,返回一个DataFrame集合 val usersDF = spark.read.load("hdfs://centos01:9000/users.parquet") usersDF.show() //查询DataFrame中的id列和name列 ,并写入HDFS usersDF.select("id","name").write.save("hdfs://centos01:9000/result") //创建临时视图 usersDF.createTempView("t_user") // 执行SQL查询,并将结果写入 HDFS spark.sql("SELECT id,name FROM t_user") .write.save("hdfs://centos01:9000/result") 手动指定数据源 使用format()方法 手动指定数据源 数据源需要使用完全限定名(例如org.apache.spark.sql.parquet 对于Spark SQL的内置数据源,可以使用缩写名(json parquet jdbc orc libsvm csv text) val peopleDFCsv=spark.read.format("csv").load("hdfs://centos01:9000/people.csv") 可以使用option()方法向指定的数据源传递所需参数 例如JDBC数据源 账号、密码等参数 val jdbcDF = spark.read.format("jdbc") .option("url", "jdbc:mysql://192.168.1.69:3306/spark_db") .option("driver","com.mysql.jdbc.Driver") .option("dbtable", "student") .option("user", "root") .option("password", "123456") .load() 数据写入模式 mode() 指定如何处理已经存在的数据,枚举类SaveMode (1)SaveMode.ErrorIfExists 默认值 如果数据已经存在,则抛出异常 (2)SaveMode.Append (3)SaveMode.Overwrite 覆盖(包括数据或表的Schema) (4)SaveMode.Ignore 如果数据或表已经存在,则不会写入 HDFS文件 JSON格式 /people.json {"name":"Michael"} {"name":"Andy", "age":30} {"name":"Justin", "age":19} val peopleDF = spark.read.format("json").load("hdfs://centos01:9000/people.json") peopleDF.select("name") .write.mode(SaveMode.Overwrite).format("json") .save("hdfs://centos01:9000/result") 分区自动推断 表分区是Hive等系统中常用的优化查询效率的方法( 在分区表中,数据通常存储在不同的分区目录中 分区目录命名 分区列名=值 people 表 gender和country作为分区列 存储数据的目录结构 path └── to └── people ├── gender=male │ ├── ... │ │ │ ├── country=US │ │ └── data.parquet │ ├── country=CN │ │ └── data.parquet │ └── ... └── gender=female ├── ... │ ├── country=US │ └── data.parquet ├── country=CN │ └── data.parquet └── ... 对于所有内置的数据源(包括Text/CSV/JSON/ORC/Parquet) Spark SQL都能够根据目录名自动发现和推断分区信息 (1) 在本地(或HDFS)新建以下三个目录及文件 其中的目录people代表表名,gender和country代表分区列 people.json存储实际人口数据: D:\people\gender=male\country=CN\people.json D:\people\gender=male\country=US\people.json D:\people\gender=female\country=CN\people.json 三个people.json文件的数据分别如下: {"name":"zhangsan","age":32} {"name":"lisi", "age":30} {"name":"wangwu", "age":19} {"name":"Michael"} {"name":"Jack", "age":20} {"name":"Justin", "age":18} {"name":"xiaohong","age":17} {"name":"xiaohua", "age":22} {"name":"huanhuan", "age":16} val usersDF = spark.read.format("json").load("D:\\people") usersDF.printSchema() usersDF.show() Schema信息 root |-- age: long (nullable = true) |-- name: string (nullable = true) |-- gender: string (nullable = true) |-- country: string (nullable = true) 表数据 +----+--------+------+-------+ | age| name|gender|country| +----+--------+------+-------+ | 17|xiaohong|female| CN| | 22| xiaohua|female| CN| | 16|huanhuan|female| CN| | 32|zhangsan| male| CN| | 30| lisi| male| CN| | 19| wangwu| male| CN| |null| Michael| male| US| | 20| Jack| male| US| | 18| Justin| male| US| +----+--------+------+-------+ 读取数据时,自动推断出了两个分区列gender和country,将该两列的值添加到了DataFrame中 Parquet文件 列式存储格式 Spark SQL支持对Parquet文件的读写,并且可以自动保存源数据的Schema 写入Parquet文件时,为了提高兼容性,所有列都会自动转换为 可为空 状态 加载和写入Parquet文件时,可以使用load()方法和save() 还可以直接使用Spark SQL内置的parquet()方法 //读取Parquet文件为一个DataFrame val usersDF = spark.read.parquet("hdfs://centos01:9000/users.parquet") //将DataFrame相关数据保存为Parquet文件,包括Schema信息 usersDF.select("id","name") .write.parquet("hdfs://centos01:9000/result") JSON数据集 JSON文件的每一行必须包含一个独立有效的JSON对象 一行一个json对象 {"name":"zhangsan","age":32} {"name":"lisi", "age":30} {"name":"wangwu", "age":19} val spark = SparkSession.builder() .appName("SparkSQLDataSource") .config("spark.sql.parquet.mergeSchema",true) .master("local[*]") .getOrCreate()   // 1. 创建用户基本信息表 import spark.implicits._ //创建用户信息Dataset集合 val arr=Array( "{'name':'zhangsan','age':20}", "{'name':'lisi','age':18}" ) val userInfo: Dataset[String] = spark.createDataset(arr) //将Dataset[String]转为DataFrame val userInfoDF = spark.read.json(userInfo) //创建临时视图user_info userInfoDF.createTempView("user_info") //显示数据 userInfoDF.show() // +---+--------+ // |age| name| // +---+--------+ // | 20|zhangsan| // | 18| lisi| // +---+--------+ // 2. 创建用户成绩表 //读取JSON文件 val userScoreDF = spark.read.json("D:\\people\\score.json") //创建临时视图user_score userScoreDF.createTempView("user_score") userScoreDF.show() // +--------+-----+ // | name|score| // +--------+-----+ // |zhangsan| 98| // | lisi| 88| // | wangwu| 95| // +--------+-----+ // 3. 根据name字段关联查询 val resDF=spark.sql("SELECT i.age,i.name,c.score FROM user_info i " + "JOIN user_score c ON i.name=c.name") resDF.show() // +---+--------+-----+ // |age| name|score| // +---+--------+-----+ // | 20|zhangsan| 98| // | 18| lisi| 88| // +---+--------+-----+ Hive表 Hive有大量依赖项,这些依赖项不包括在默认的Spark发行版中 如果在classpath上配置了这些Hive依赖项,Spark将自动加载它们 Hive依赖项必须出现在所有Worker节点上,因为需要访问Hive序列化和反序列化库(SerDes) val spark = SparkSession .builder() .appName("Spark Hive Demo") .enableHiveSupport()//开启Hive支持 .getOrCreate() 创建Hive表students,并指定字段分隔符为制表符 spark.sql("CREATE TABLE IF NOT EXISTS students (name STRING, age INT) " + "ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'") 导入本地数据到Hive表 本地文件/home/hadoop/students.txt zhangsan 20 lisi 25 wangwu 19 spark.sql("LOAD DATA LOCAL INPATH '/home/hadoop/students.txt' INTO TABLE students") 查询表数据 spark.sql("SELECT * FROM students").show() +--------+---+ | name|age| +--------+---+ |zhangsan| 20| | lisi| 25| | wangwu| 19| +--------+---+ 创建表的同时指定存储格式 创建一个Hive表hive_records,数据存储格式为Parquet(默认为普通文本格式) spark.sql("CREATE TABLE hive_records(key STRING, value INT) STORED AS PARQUET") 将DataFrame写入Hive表 使用saveAsTable() 将一个DataFrame写入到指定的Hive表中 //加载students表的数据为DataFrame val studentsDF = spark.table("students") //将DataFrame写入表hive_records中 studentsDF.write.mode(SaveMode.Overwrite).saveAsTable("hive_records") //查询hive_records表数据并显示到控制 spark.sql("SELECT * FROM hive_records").show() Hive数据源,提交之前需要做好Hive数据仓库、元数据库等的配置 JDBC url 连接的 JDBC URL driver JDBC 驱动的类名 user 数据库用户名 password 数据库密码 dbtable 数据库表名或能代表一张数据库表的子查询 只使用数据库表名,将查询整张表的数据 查询部分数据或多表关联查询,可以使用SQL查询 注意,不允许同时指定 dbtable 和 query 属性 query 指定查询的SQL语句 不允许同时指定 query 和 partitionColumn 属性 当需要指定 partitionColumn 属性时 可以使用 dbtable 属性指定子查询 并使用子查询的别名对分区列进行限定 partitionColumn,lowerBound,upperBound 若有一个被指定则必须全部指定,且必须指定numPartitions属性 如何在从多个Worker中并行读取数据时对表进行分区 partitionColumn必须是表中的数字、日期或时间戳列 lowerBound和upperBound只是用来决定分区跨度 而不是用于过滤表中的行 ,因此表中的所有行都将被分区并返回 numPartitions 对表进行并行读写数据时的最大分区数 决定了并发JDBC连接的最大数量 如果要写入数据的分区数量超过了此限制的值 在写入之前可以调用coalesce(numpartition)将分区数量减少到此限制的值 使用JDBC API对MySQL表student和表score进行关联查询 val jdbcDF = spark.read.format("jdbc") .option("url", "jdbc:mysql://192.168.1.69:3306/spark_db") .option("driver","com.mysql.jdbc.Driver") .option("dbtable", "(select st.name,sc.score from student st,score sc " + "where st.id=sc.id) t") .option("user", "root") .option("password", "123456") .load() dbtable 是一个子查询,相当于SQL查询中的FROM关键字后的一部分 使用query属性编写完整SQL语句进行查询也能达到同样的效果 val jdbcDF = spark.read.format("jdbc") .option("url", "jdbc:mysql://192.168.1.234:3306/spark_db") .option("driver","com.mysql.jdbc.Driver") .option("query", "select st.name,sc.score from student st,score sc " + "where st.id=sc.id") .option("user", "root") .option("password", "123456") .load()
Spark SQL内置函数 org.apache.spark.sql.functions 函数主要分为10类 UDF函数 聚合函数 日期函数 排序函数 非聚合函数 数学函数 混杂函数 窗口函数 字符串函数 集合函数 大部分函数与Hive相同 内置函数两种使用方式 编程方式 SQL语句中使用 import org.apache.spark.sql.functions._ df.select(lower(col("name")).as("name")).show() UDF User Defined Functions 常用的聚合函数 count countDistinct avg max min 自定义聚合函数 UDAF User Defined Aggregate Functions UDF主要针对单个输入,返回单个输出 UDAF 可以针对多个输入进行聚合计算返回单个输出,功能更加强大 UDAF 继承抽象类UserDefinedAggregateFunction 开窗函数 row_number() 使用该函数可以在查询结果中对每个分组的数据,按照其排序的顺序添加一列行号(从1开始) 根据行号可以方便的对每一组数据取前N行(分组取TOPN) row_number() over (partition by 列名 order by 列名 desc) 行号列别名 partition by 按照某一列进行分组。 order by 分组后按照某一列进行组内排序 desc 降序,默认升序

上一篇     下一篇
Spark Standalone 的两种提交方式

Spark大数据分析实战 第3章 RDD弹性分布式数据集

Spark大数据分析实战 第4章 Spark内核源码分析

Spark大数据分析实战 第6章 Kafka分布式消息系统

Spark大数据分析实战 第7章 Spark Streaming实时流处理引擎

Clickhouse 监控运维常用SQL