织梦网站上传新闻,外包公司怎么赚钱,江苏省建筑网监督信息平台,WordPress代码实现标签页面文章目录 day05_Spark SQL课程笔记一、今日课程内容二、Spark SQL 基本介绍#xff08;了解#xff09;1、什么是Spark SQL**为什么 Spark SQL 是“SQL与大数据之间的桥梁”#xff1f;****实际意义**为什么要学习Spark SQL呢?**为什么 Spark SQL 像“瑞士军刀”#xff1… 文章目录 day05_Spark SQL课程笔记一、今日课程内容二、Spark SQL 基本介绍了解1、什么是Spark SQL**为什么 Spark SQL 是“SQL与大数据之间的桥梁”****实际意义**为什么要学习Spark SQL呢?**为什么 Spark SQL 像“瑞士军刀”**2、Spark SQL 与 HIVE异同3、Spark SQL的数据结构对比 三、Spark SQL的入门案例掌握四、DataFrame详解熟悉1.DataFrame基本介绍2.DataFrame的构建方式2.1 createDataFrame()创建2.1.1 基于列表2.1.2 基于RDD普通方式2.1.3 基于RDD反射方式 2.2 toDF()创建2.3 read读取外部文件2.3.1 Text方式读取2.3.2 CSV方式读取2.3.3 JSON方式读取 3.DataFrame的相关API3.1 SQL相关的API3.2 DSL相关的API 4.Spark SQL词频统计 01_sparkSession和sparkContext区别联系.py结果 02_[掌握]spark_sql词频统计.py结果 03_createDataFrame方式基于列表方式创建df.py结果 04_createDataFrame方式基于RDD创建df.py结果 05_createDataFrame方式基于RDD反射创建df.py结果 06_toDF方式把RDD转换为df.py结果 07_read传统api方式读取text_csv_json.py结果 08_read简写api方式读取text_csv_json.py结果 09_spark_sql词频统计_多种方式.py结果 day05_Spark SQL课程笔记
一、今日课程内容
1- Spark SQL的基本介绍了解2- Spark SQL的入门案例了解3- DataFrame详解掌握4- Spark SQL的综合案例熟悉 今日目的掌握DataFrame详解
二、Spark SQL 基本介绍了解 https://spark.apache.org/sql 1、什么是Spark SQL
Spark SQL是Spark多种组件中其中一个主要是用于处理大规模的**【结构化数据】**
什么是结构化数据: 一份数据, 每一行都有固定的列, 每一列的类型都是一致的 我们将这样的数据称为结构化的数据
例如: mysql的表数据1 张三 202 李四 153 王五 184 赵六 12简单来说Spark SQL是Spark中用于处理结构化数据的模块就像是“SQL与大数据之间的桥梁”让用户能够用熟悉的SQL语句查询和分析大规模数据。 具体而言 核心功能 DataFrame API提供类似于Pandas DataFrame的API支持结构化数据处理。SQL查询用户可以直接使用标准SQL语句查询数据无需编写复杂的分布式计算代码。数据源集成支持从多种数据源如Hive、Parquet、JSON、JDBC读取数据并将结果写回到这些数据源。优化引擎内置Catalyst优化器能够自动优化查询计划提升执行效率。 应用场景 在数据仓库中使用Spark SQL查询和分析海量数据生成报表和洞察。在ETL任务中使用Spark SQL清洗和转换数据提升数据处理效率。在实时分析中结合Structured Streaming使用Spark SQL处理实时数据流。 实际生产场景 在电商平台中使用Spark SQL分析用户行为数据生成个性化推荐。在金融领域使用Spark SQL处理交易数据进行风险分析和预测。 总之Spark SQL结合了SQL的易用性和Spark的分布式计算能力为结构化数据处理提供了高效、灵活的工具广泛应用于数据分析和处理任务。 为什么 Spark SQL 是“SQL与大数据之间的桥梁” 连接 SQL 与分布式计算 SQL传统SQL用于查询关系型数据库适合小规模数据处理。大数据分布式计算框架如Spark用于处理海量数据但需要编写复杂代码。Spark SQL让用户能够用熟悉的SQL语句直接查询大规模数据无需编写复杂的分布式计算代码从而在SQL的易用性和大数据的处理能力之间架起桥梁。 统一数据访问 SQL通常只能访问关系型数据库。Spark SQL支持多种数据源如Hive、Parquet、JSON、JDBC等将不同数据源的数据统一为结构化数据方便用SQL查询。 高性能优化 SQL传统SQL引擎在处理大数据时性能有限。Spark SQL通过Catalyst优化器和Tungsten引擎自动优化查询计划利用分布式计算和内存加速提升大数据查询性能。 降低大数据处理门槛 SQL数据分析师和开发者熟悉SQL但可能不熟悉分布式计算。Spark SQL让这些用户无需学习复杂的分布式编程直接用SQL处理大数据降低了大数据处理的门槛。 支持复杂场景 SQL适合简单的查询和分析。Spark SQL不仅支持标准SQL还提供DataFrame API支持复杂的数据处理逻辑如ETL、机器学习数据准备并可与Structured Streaming结合处理实时数据流。 实际意义
Spark SQL通过将SQL的易用性与大数据的分布式计算能力结合让用户能够轻松处理海量数据同时保持高性能和灵活性真正成为“SQL与大数据之间的桥梁”。
为什么要学习Spark SQL呢?
1- 会 SQL的人, 一定比会大数据的人多
2- Spark SQL 既可以编写SQL语句, 也可以编写代码, 甚至可以混合使用
3- Spark SQL 可以 和 HIVE进行集成, 集成后, 可以替换掉HIVE原有MR的执行引擎, 提升效率简单来说学习Spark SQL就像是“掌握了一把万能钥匙”能够轻松处理和分析大规模结构化数据为数据驱动的业务决策提供强大支持。 具体而言 高效处理大数据Spark SQL基于Spark引擎能够分布式处理TB甚至PB级数据远超传统数据库的性能。SQL的易用性使用标准SQL语句查询数据降低学习成本尤其适合熟悉SQL的数据分析师和开发人员。多数据源支持支持从Hive、Parquet、JSON、JDBC等多种数据源读取数据满足复杂的数据集成需求。与Spark生态无缝集成Spark SQL可以与Spark的其他模块如MLlib、GraphX、Structured Streaming无缝集成支持从数据清洗到机器学习、实时分析的完整流程。优化性能内置Catalyst优化器和Tungsten引擎能够自动优化查询计划提升执行效率。 实际生产场景 在数据仓库中使用Spark SQL快速查询和分析海量数据生成业务报表。在实时分析中结合Structured Streaming使用Spark SQL处理实时数据流支持实时决策。在机器学习中使用Spark SQL清洗和准备数据为模型训练提供高质量的数据输入。 总之学习Spark SQL能够让你在大数据时代游刃有余无论是数据分析、实时处理还是机器学习都能找到用武之地为职业发展和技术能力提升带来巨大价值。 Spark SQL特点:
1- 融合性: 既可以使用标准SQL语言, 也可以编写代码, 同时支持混合使用2- 统一的数据访问: 可以通过统一的API来对接不同的数据源3- HIVE的兼容性: Spark SQL可以和HIVE进行整合, 整合后替换执行引擎为Spark, 核心: 基于HIVE的metastore来处理4- 标准化连接: Spark SQL也是支持 JDBC/ODBC的连接方式简单来说Spark SQL的特点就像是“瑞士军刀”集成了SQL的易用性、Spark的分布式计算能力和强大的优化引擎为结构化数据处理提供了高效、灵活的工具。 具体而言 统一的数据访问支持从多种数据源如Hive、Parquet、JSON、JDBC读取数据并将结果写回到这些数据源。SQL与DataFrame API既支持标准SQL查询又提供DataFrame API适合不同开发习惯的用户。高性能优化内置Catalyst优化器和Tungsten引擎能够自动优化查询计划提升执行效率。与Spark生态无缝集成可以与Spark的其他模块如MLlib、GraphX、Structured Streaming无缝集成支持从数据清洗到机器学习、实时分析的完整流程。Hive兼容性完全兼容Hive支持HiveQL查询和Hive元数据访问方便从Hive迁移到Spark SQL。结构化数据处理专门为结构化数据设计支持复杂的数据类型如嵌套结构、数组、Map等。实时数据处理结合Structured Streaming支持实时数据流的SQL查询和分析。 实际生产场景 在数据仓库中使用Spark SQL快速查询和分析海量数据生成业务报表。在实时分析中结合Structured Streaming使用Spark SQL处理实时数据流支持实时决策。在机器学习中使用Spark SQL清洗和准备数据为模型训练提供高质量的数据输入。 总之Spark SQL凭借其强大的功能和灵活的接口成为大数据处理领域的利器无论是数据分析、实时处理还是机器学习都能发挥重要作用。 为什么 Spark SQL 像“瑞士军刀” 多功能性 瑞士军刀集多种工具于一身应对不同任务。Spark SQL集成SQL查询、DataFrame API、流处理等功能适应多种数据处理场景。 灵活性 瑞士军刀工具切换灵活适应不同需求。Spark SQL支持多种数据源如Hive、Parquet、JSON等和开发方式SQL或API满足不同开发习惯。 高效性 瑞士军刀设计精巧使用效率高。Spark SQL通过Catalyst优化器和Tungsten引擎自动优化查询计划利用列式存储和内存计算加速处理。 广泛适用性 瑞士军刀适用于多种场景。Spark SQL覆盖批处理、实时流处理、机器学习等适用于数据仓库、实时分析、数据湖等多种场景。 易用性 瑞士军刀操作简单易于使用。Spark SQL兼容Hive支持标准SQL降低大数据处理门槛。
SparkSQL发展历史:
● 2014年 1.0正式发布
● 2015年 1.3 发布DataFrame数据结构, 沿用至今
● 2016年 1.6 发布Dataset数据结构(带泛型的DataFrame), 适用于支持泛型的语言(Java\Scala)
● 2016年 2.0 统一了Dataset 和 DataFrame, 以后只有Dataset了, Python用的DataFrame就是 没有泛型的Dataset
注意: 2019年 3.0 发布 性能大幅度提升SparkSQL变化不大2、Spark SQL 与 HIVE异同
相同点:
1- 都是分布式SQL计算引擎
2- 都可以处理大规模的结构化数据
3- 都可以建立Yarn集群之上运行不同点:
1- Spark SQL是基于内存计算, 而HIVE SQL是基于磁盘进行计算的
2- Spark SQL没有元数据管理服务(自己维护), 而HIVE SQL是有metastore的元数据管理服务的
3- Spark SQL底层执行Spark RDD程序, 而HIVE SQL底层执行是MapReduce
4- Spark SQL可以编写SQL也可以编写代码但是HIVE SQL仅能编写SQL语句简单来说Spark SQL和Hive都是用于处理大规模结构化数据的工具但Spark SQL更像是“高性能跑车”而Hive则是“可靠的卡车”两者各有优劣适合不同的场景。 具体而言 相同点 SQL支持两者都支持标准SQL查询降低了使用门槛。大数据处理都适用于处理大规模结构化数据支持TB甚至PB级数据。Hive兼容性Spark SQL完全兼容Hive支持HiveQL查询和Hive元数据访问。 不同点 执行引擎 Spark SQL基于Spark引擎采用内存计算适合迭代计算和实时处理。Hive基于MapReduce引擎采用磁盘计算适合离线批处理。 性能 Spark SQL的性能通常优于Hive尤其是在复杂查询和迭代计算场景中。Hive在处理超大规模数据时稳定性更高但速度较慢。 实时性 Spark SQL支持实时数据处理通过Structured Streaming。Hive主要用于离线批处理实时性较差。 易用性 Spark SQL提供DataFrame API支持多种编程语言如Python、Scala、Java开发更灵活。Hive主要依赖SQL扩展性较弱。 实际生产场景 在需要快速迭代和实时分析的场景中如用户行为分析Spark SQL更为适合。在超大规模离线批处理场景中如历史数据归档Hive更为稳定可靠。 总之Spark SQL和Hive各有优势选择时需根据业务需求、数据规模和性能要求综合考虑。两者也可以结合使用发挥各自的优势。 3、Spark SQL的数据结构对比 说明:pandas的DataFrame: 二维表 处理单机结构数据SparkCore的RDD: 处理任何的数据结构 处理大规模的分布式数据SparkSQL的DataFrame: 二维表 处理大规模的分布式结构数据RDDResilient Distributed Dataset是Spark中最基本的抽象代表了一个不可变、分布式的数据集合。RDD支持并行操作可以在集群中的多个节点上进行处理。RDD具有容错性即使在节点故障时也能够自动恢复。但是RDD只提供了基本的功能对于结构化数据的处理能力有限。DataFrame是Spark SQL中的一个概念它是一种以列为主的分布式数据集合类似于关系型数据库中的表格。DataFrame具有数据结构化的特点每一列都有相应的数据类型而且可以使用SQL语句进行查询和操作。DataFrame也支持大部分RDD的操作但是在处理结构化数据方面更加方便。DataSet是Spark 2.0引入的一种新的API它是DataFrame的一个扩展提供了类型安全的数据操作。DataSet在编译时检查数据类型可以避免一些运行时的错误。与DataFrame相比DataSet更加适用于需要强类型支持的场景但是在灵活性和易用性方面可能略逊于DataFrame。由于Python不支持泛型, 所以无法使用Dataset类型, 客户端仅支持DataFrame类型三、Spark SQL的入门案例掌握 SparkSession 和 SparkContext 是 Apache Spark 中两个重要的组件它们在 Spark 应用程序中扮演着不同的角色。SparkContext:SparkContext 是 Spark 1.x 版本中最重要的入口点在 Spark 2.x 版本中它已经被 SparkSession 取代但在一些旧的代码和文档中仍然可能会看到它的存在。SparkContext 是 Spark 应用程序与 Spark 集群通信的主要入口点。它负责与集群管理器如 YARN、Mesos 或 Spark 自带的 Standalone通信以便分配资源和执行任务。SparkContext 提供了创建 RDD弹性分布式数据集的功能RDD 是 Spark 中基本的数据抽象代表了分布在集群中的不可变的数据集。SparkSession:在 Spark 2.x 中SparkSession 被引入来取代 SparkContext并提供了更多功能和简化的 API。它是 Spark 应用程序中的入口点封装了 SparkContext。SparkSession 提供了一种统一的入口点用于读取数据、执行查询、进行数据处理等各种 Spark 任务。SparkSession 提供了 DataFrame 和 Dataset API这两种 API 提供了更高级别、更易于使用的抽象用于处理结构化数据。与 SparkContext 不同SparkSession 可以与 Hive 集成允许在 Spark 应用程序中执行 SQL 查询并访问 Hive 中的表和数据。总之SparkContext 是 Spark 1.x 版本中的主要入口点负责与集群通信和管理资源而 SparkSession 是 Spark 2.x 中的主要入口点提供了更多的功能和简化的 API用于执行各种 Spark 任务并且可以与 Hive 集成。还可以通过SparkSession对象还是可以得到SparkContext对象。
入门体验
# 导包
import os
from pyspark.sql import SparkSession# 绑定指定的python解释器os.environ[SPARK_HOME] /export/server/spark
os.environ[PYSPARK_PYTHON] /root/anaconda3/bin/python3
os.environ[PYSPARK_DRIVER_PYTHON] /root/anaconda3/bin/python3# 创建main函数
if __name__ __main__:# 1.创建SparkContext对象spark SparkSession.builder.appName(pyspark_demo).master(local[*]).getOrCreate()sc spark.sparkContext# print(spark,type(spark))# print(sc,type(sc))# 2.验证是否能生成rddtextRDD sc.textFile(file:///export/data/spark_project/spark_sql/data/uniqlo.csv)# collect: 搜集数据触发任务展示数据 count:获取数据条数 type:查看类型# print(textRDD.collect())print(textRDD.count())print(type(textRDD)) # class pyspark.rdd.RDD# 验证是否能生成DataFramedf spark.read.csv(file:///export/data/spark_project/spark_sql/data/uniqlo.csv)# show: 展示数据 count:获取数据条数 type:查看类型# print(df.show())print(df.count())print(type(df)) # class pyspark.sql.dataframe.DataFrame# 3.关闭资源sc.stop()spark.stop()四、DataFrame详解熟悉
1.DataFrame基本介绍 DataFrame表示的是一个二维的表。二维表必然存在行、列等表结构描述信息表结构描述信息(元数据Schema): StructType对象
字段: StructField对象可以描述字段名称、字段数据类型、是否可以为空
行: Row对象
列: Column对象包含字段名称和字段值在一个StructType对象下由多个StructField组成构建成一个完整的元数据信息如何构建表结构信息数据
2.DataFrame的构建方式
方式1: 使用SparkSession的createDataFrame(data,schema)函数创建data参数1.基于List列表数据进行创建2.基于RDD弹性分布式数据集进行创建3.基于pandas的DataFrame数据进行创建schema参数1: 字符串格式一 :“字段名1 字段类型,字段名2 字段类型”格式二(推荐):“字段名1:字段类型,字段名2:字段类型”2: List格式: [字段名1,字段名2] 3: DataType推荐用的最多格式一:schemaStructType().add(字段名1,字段类型).add(字段名2,字段类型)格式二:schemaStructType([StructField(字段名1,类型),StructField(字段名1,类型)])方式2: 使用DataFrame的toDF(colNames)函数创建DataFrame的toDF方法是一个在Apache Spark的DataFrame API中用来创建一个新的DataFrame的方法。这个方法可以将一个RDD转换为DataFrame或者将一个已存在的DataFrame转换为另一个DataFrame。在Python中你可以使用toDF方法来指定列的名字。如果你不指定列的名字那么默认的列的名字会是_1, _2等等。 格式: rdd.toDF([列名])方式3: 使用SparkSession的read()函数创建在 Spark 中SparkSession 的 read 是用于读取数据的入口点之一它提供了各种方法来读取不同格式的数据并将其加载到 Spark 中进行处理。统一API格式: spark.read.format(text|csv|json|parquet|orc|...) : 读取外部文件的方式.option(k,v) : 选项 可以设置相关的参数 (可选).schema(StructType | String) : 设置表的结构信息.load(加载数据路径) : 读取外部文件的路径, 支持 HDFS 也支持本地简写API格式:注意: 以上所有的外部读取方式都有简单的写法。spark内置了一些常用的读取方案的简写格式: spark.read.文件读取方式(加载数据路径)注意: parquet:是Spark中常用的一种列式存储文件格式和Hive中的ORC差不多, 他俩都是列存储格式
2.1 createDataFrame()创建 场景一般用在开发和测试中。因为只能处理少量的数据 2.1.1 基于列表
# 导包
import os
from pyspark.sql import SparkSession# 绑定指定的python解释器
os.environ[SPARK_HOME] /export/server/spark
os.environ[PYSPARK_PYTHON] /root/anaconda3/bin/python3
os.environ[PYSPARK_DRIVER_PYTHON] /root/anaconda3/bin/python3# 创建main函数
if __name__ __main__:# 1.创建SparkContext对象spark SparkSession.builder.appName(pyspark_demo).master(local[*]).getOrCreate()# 2.创建DF对象data [(1, 张三, 18), (2, 李四, 28), (3, 王五, 38)]df1 spark.createDataFrame(data,schema[id,name,age])# 展示数据df1.show()# 查看结构信息df1.printSchema()print(---------------------------------------------------------)df2 spark.createDataFrame(data,schemaid int,name string,age int)# 展示数据df2.show()# 查看结构信息df2.printSchema()print(---------------------------------------------------------)df3 spark.createDataFrame(data,schemaid:int,name:string,age:int)# 展示数据df3.show()# 查看结构信息df3.printSchema()# 3.关闭资源spark.stop()2.1.2 基于RDD普通方式 场景RDD可以存储任意结构的数据而DataFrame只能处理二维表数据。在使用Spark处理数据的初期可能输入进来的数据是半结构化或者是非结构化的数据那么我可以先通过RDD对数据进行ETL处理成结构化数据再使用开发效率高的SparkSQL来对后续数据进行处理分析。 Schema选择StructType对象来定义DataFrame的“表结构”转换RDD
# 导包
import os
from pyspark.sql import SparkSession# 绑定指定的python解释器
from pyspark.sql.types import StructType, StringType, StructFieldos.environ[SPARK_HOME] /export/server/spark
os.environ[PYSPARK_PYTHON] /root/anaconda3/bin/python3
os.environ[PYSPARK_DRIVER_PYTHON] /root/anaconda3/bin/python3# 创建main函数
if __name__ __main__:# 1.创建SparkContext对象spark SparkSession.builder.appName(pyspark_demo).master(local[*]).getOrCreate()sc spark.sparkContext# 2.读取生成rddtextRDD sc.textFile(file:///export/data/spark_project/spark_sql/data/data1.txt)print(type(textRDD)) # class pyspark.rdd.RDDetlRDD textRDD.map(lambda line:line.split(,)).map(lambda l:(l[0],l[1]))# 3.定义schema结构信息schema1 StructType().add(name,StringType(),True).add(age,StringType(),True)schema2 StructType([StructField(name,StringType(),True),StructField(age,StringType(),True)])schema3 [name,age]schema4 name string,age stringschema5 name:string,age:string# 4.创建DF对象dfpeople spark.createDataFrame(etlRDD,schema5)# 5.df展示结构信息dfpeople.show()dfpeople.printSchema()# 6.拓展: 创建临时视图,方便sql查询dfpeople.createTempView(peoples)r spark.sql(select * from peoples)r.show()# 7.关闭资源sc.stop()spark.stop()
2.1.3 基于RDD反射方式 Schema使用反射方法来推断Schema模式Spark SQL 可以将 Row 对象的 RDD 转换为 DataFrame从而推断数据类型。 # 导包
import os
from pyspark.sql import SparkSession# 绑定指定的python解释器
from pyspark.sql.types import StructType, StringType, StructField, Rowos.environ[SPARK_HOME] /export/server/spark
os.environ[PYSPARK_PYTHON] /root/anaconda3/bin/python3
os.environ[PYSPARK_DRIVER_PYTHON] /root/anaconda3/bin/python3# 创建main函数
if __name__ __main__:# 1.创建SparkContext对象spark SparkSession.builder.appName(pyspark_demo).master(local[*]).getOrCreate()sc spark.sparkContext# 2.读取生成rdd# 3.定义schema结构信息textRDD sc.textFile(file:///export/data/spark_project/spark_sql/data/data1.txt)etlRDD_schema textRDD.map(lambda line:line.split(,)).map(lambda l:Row(namel[0],agel[1]))# 4.创建DF对象dfpeople spark.createDataFrame(etlRDD_schema)# 5.df展示结构信息dfpeople.show()dfpeople.printSchema()# 6.拓展: 创建临时视图,方便sql查询dfpeople.createTempView(peoples)r spark.sql(select * from peoples)r.show()# 7.关闭资源sc.stop()spark.stop()2.2 toDF()创建 schema模式编码在字符串中,toDF参数用于指定列的名字。如果你不指定列的名字那么默认的列的名字会是_1, _2等等。 # 导包
import os
from pyspark.sql import SparkSession# 绑定指定的python解释器
from pyspark.sql.types import StructType, StringType, StructField, Rowos.environ[SPARK_HOME] /export/server/spark
os.environ[PYSPARK_PYTHON] /root/anaconda3/bin/python3
os.environ[PYSPARK_DRIVER_PYTHON] /root/anaconda3/bin/python3# 创建main函数
if __name__ __main__:# 1.创建SparkContext对象spark SparkSession.builder.appName(pyspark_demo).master(local[*]).getOrCreate()sc spark.sparkContext# 2.读取生成rdd# 3.定义schema结构信息textRDD sc.textFile(file:///export/data/spark_project/spark_sql/data/data1.txt)etlRDD textRDD.map(lambda line:line.split(,))# 4.创建DF对象dfpeople etlRDD.toDF([name,age])# 5.df展示结构信息dfpeople.show()dfpeople.printSchema()# 6.拓展: 创建临时视图,方便sql查询dfpeople.createTempView(peoples)r spark.sql(select * from peoples)r.show()# 7.关闭资源sc.stop()spark.stop()2.3 read读取外部文件
复杂API
统一API格式:
spark.read.format(text|csv|json|parquet|orc|avro|jdbc|.....) # 读取外部文件的方式.option(k,v) # 选项 可以设置相关的参数 (可选).schema(StructType | String) # 设置表的结构信息.load(加载数据路径) # 读取外部文件的路径, 支持 HDFS 也支持本地简写API
请注意: 以上所有的外部读取方式都有简单的写法。spark内置了一些常用的读取方案的简写
格式: spark.read.读取方式()例如: df spark.read.csv(pathfile:///export/data/_03_spark_sql/data/stu.txt,headerTrue,sep ,inferSchemaTrue,encodingutf-8,)2.3.1 Text方式读取
text方式读取文件1- 不管文件中内容是什么样的text会将所有内容全部放到一个列中处理2- 默认生成的列名叫value数据类型string3- 我们只能够在schema中修改字段value的名称其他任何内容不能修改# 导包
import os
from pyspark.sql import SparkSession# 绑定指定的python解释器
from pyspark.sql.types import StructType, StringType, StructField, Rowos.environ[SPARK_HOME] /export/server/spark
os.environ[PYSPARK_PYTHON] /root/anaconda3/bin/python3
os.environ[PYSPARK_DRIVER_PYTHON] /root/anaconda3/bin/python3# 创建main函数
if __name__ __main__:# 1.创建SparkContext对象spark SparkSession.builder.appName(pyspark_demo).master(local[*]).getOrCreate()# 2.读取数据# 注意: 读取text文件默认只有1列,且列名交value,可以通过schema修改df spark.read\.format(text)\.schema(info string)\.load(file:///export/data/spark_project/spark_sql/data/data1.txt)# 5.df展示结构信息df.show()df.printSchema()# 6.拓展: 创建临时视图,方便sql查询df.createTempView(peoples)r spark.sql(select * from peoples)r.show()# 6.关闭资源spark.stop()2.3.2 CSV方式读取
csv格式读取外部文件1- 复杂API和简写API都必须掌握2- 相关参数作用说明2.1- path指定读取的文件路径。支持HDFS和本地文件路径2.2- schema手动指定元数据信息2.3- sep指定字段间的分隔符2.4- encoding指定文件的编码方式2.5- header指定文件中的第一行是否是字段名称2.6- inferSchema根据数据内容自动推断数据类型。但是推断结果可能不精确# 导包
import os
from pyspark.sql import SparkSession# 绑定指定的python解释器
from pyspark.sql.types import StructType, StringType, StructField, Rowos.environ[SPARK_HOME] /export/server/spark
os.environ[PYSPARK_PYTHON] /root/anaconda3/bin/python3
os.environ[PYSPARK_DRIVER_PYTHON] /root/anaconda3/bin/python3# 创建main函数
if __name__ __main__:# 1.创建SparkContext对象spark SparkSession.builder.appName(pyspark_demo).master(local[*]).getOrCreate()# 2.读取数据# 注意: csv文件可以识别多个列,可以使用schema指定列名,类型# 原始方式# df spark.read\# .format(csv)\# .schema(name string,age int)\# .option(sep,,)\# .option(encoding,utf8)\# .option(header,False)\# .load(file:///export/data/spark_project/spark_sql/data/data1.txt)# 简化方式df spark.read.csv(schemaname string,age int,sep,,encodingutf8,headerFalse,pathfile:///export/data/spark_project/spark_sql/data/data1.txt)# 5.df展示结构信息df.show()df.printSchema()# 6.拓展: 创建临时视图,方便sql查询df.createTempView(peoples)r spark.sql(select * from peoples)r.show()# 7.关闭资源spark.stop()
2.3.3 JSON方式读取
json读取数据
1- 需要手动指定schema信息。如果手动指定的时候字段名称与json中的key名称不一致会解析不成功以null值填充
2- csv/json中schema的结构如果是字符串类型那么字段名称和字段数据类型间只能以空格分隔json的数据内容
{id: 1,name: 张三,age: 20}
{id: 2,name: 李四,age: 23,address: 北京}
{id: 3,name: 王五,age: 25}
{id: 4,name: 赵六,age: 29}代码实现
# 导包
import os
from pyspark.sql import SparkSession# 绑定指定的python解释器
from pyspark.sql.types import StructType, StringType, StructField, Rowos.environ[SPARK_HOME] /export/server/spark
os.environ[PYSPARK_PYTHON] /root/anaconda3/bin/python3
os.environ[PYSPARK_DRIVER_PYTHON] /root/anaconda3/bin/python3# 创建main函数
if __name__ __main__:# 1.创建SparkContext对象spark SparkSession.builder.appName(pyspark_demo).master(local[*]).getOrCreate()# 2.读取数据# 注意: json的key和schema指定的字段名不一致,会用null补充,如果没有数据也是用null补充# 简化方式df spark.read.json(schemaid int,name string,age int,address string,encodingutf8,pathfile:///export/data/spark_project/spark_sql/data/data2.txt)# 5.df展示结构信息df.show()df.printSchema()# 6.拓展: 创建临时视图,方便sql查询df.createTempView(peoples)r spark.sql(select * from peoples)r.show()# 关闭资源spark.stop()3.DataFrame的相关API
操作DataFrame一般有二种操作方案一种为【DSL方式】另一种为【SQL方式】
SQL方式: 通过编写SQL语句完成统计分析操作
DSL方式: 特定领域语言使用DataFrame特有的API完成计算操作也就是代码形式从使用角度来说: SQL可能更加的方便一些当适应了DSL写法后你会发现DSL要比SQL更好用
从Spark角度来说: 更推荐使用DSL方案此种方案更加利于Spark底层的优化处理3.1 SQL相关的API
创建一个视图/表
df.createTempView(视图名称): 创建一个临时的视图(表名)
df.createOrReplaceTempView(视图名称): 创建一个临时的视图(表名)如果视图存在直接替换
注意: 临时视图仅能在当前这个Spark Session的会话中使用df.createGlobalTempView(视图名称): 创建一个全局视图运行在一个Spark应用中多个spark会话中都可以使用。在使用的时候必须通过 global_temp.视图名称 方式才可以加载到。较少使用执行SQL语句
spark.sql(书写SQL)3.2 DSL相关的API 官网链接: https://spark.apache.org/docs/3.1.2/api/python/reference/pyspark.sql.html#dataframe-apis select()类似于SQL中select, SQL中select后面可以写什么, 这样同样也一样distinct(): 去重后返回一个新的DataFramewithColumn(参数1,参数2)用来产生新列。参数1是新列的名称参数2是新列数据的来源withColumnRenamed(参数1,参数2)给字段重命名操作。参数1是旧字段名参数2是新字段名alias(): 返回设置了别名的新DataFrameagg()执行聚合操作。如果有多个聚合聚合之间使用逗号分隔即可,比较通用where()和filter()用于对数据进行过滤操作, 一般在spark SQL中主要使用wheregroupBy()使用指定的列对DataFrame进行分组方便后期对它们进行聚合orderBy()返回按指定列排序的新DataFramelimit() : 返回指定数目的结果集show()用于展示DF中数据, 默认仅展示前20行 参数1设置默认展示多少行 默认为20参数2是否为阶段列, 默认仅展示前20个字符数据, 如果过长, 不展示(一般不设置) printSchema()用于打印当前这个DF的表结构信息
DSL主要支持以下几种传递的方式: str | Column对象 | 列表str格式: 字段Column对象: DataFrame含有的字段 df[字段]执行过程新产生: F.col(字段)列表: [字段1,字段2...][df[字段1],df[字段2]]为了能够支持在编写Spark SQL的DSL时候在DSL中使用SQL函数专门提供一个SQL的函数库。直接加载使用 链接: https://spark.apache.org/docs/3.1.2/api/sql/index.html 导入这个函数库: import pyspark.sql.functions as F
通过F调用对应的函数即可,常见函数如下:F.explode()F.split()F.count()F.sum()F.avg()F.max()F.min()...4.Spark SQL词频统计
准备一个words.txt的文件,words.txt文件的内容如下:
hadoop hive hadoop sqoop hive
sqoop hadoop zookeeper hive hue
hue sqoop hue zookeeper hive
spark oozie spark hadoop oozie
hive oozie spark hadoop需求分析
1- 扫描文件将每行内容切分得到单个的单词
2- 组织DataFrame的数据结构,分别利用SQL风格和DSL风格完成每个单词个数统计
3- 要求最后结果有两列:一列是单词一列是次数
代码实现
# 导包
import os
from pyspark.sql import SparkSession,functions as F# 绑定指定的python解释器
os.environ[SPARK_HOME] /export/server/spark
os.environ[PYSPARK_PYTHON] /root/anaconda3/bin/python3
os.environ[PYSPARK_DRIVER_PYTHON] /root/anaconda3/bin/python3# 创建main函数
if __name__ __main__:# 1.创建spark对象# appName:应用程序名称 master:提交模式# getOrCreate:在builder构建器中获取一个存在的SparkSession如果不存在则创建一个新的spark SparkSession.builder.appName(sparksql_demo).master(local[*]).getOrCreate()# 2.通过read读取外部文件方式创建DF对象df spark.read\.format(text)\.schema(words string)\.load(file:///export/data/spark_project/spark_sql/data/data3.txt)print(type(df))# 需求: 从data3.txt读取所有单词,然后统计每个单词出现的次数# 3.SQL风格# 方式1: 使用子查询方式# 先创建临时视图,然后通过sql语句查询展示df.createTempView(words_tb)qdf spark.sql(select words,count(1) as cnt from (select explode(split(words, )) as words from words_tb) t group by words)print(type(qdf))qdf.show()# # 方式2: 使用侧视图# qdf spark.sql(# select t.words,count(1) as cnt from words_tb lateral view explode(split(words, )) t as words group by t.words# )print(type(qdf))qdf.show()# 4.DSL风格# 方式1: 分组后直接用count()统计df.select(F.explode(F.split(words, )).alias(words)).groupBy(words).count().show()# 方式1升级版:通过withColumnRenamed修改字段名df.select(F.explode(F.split(words, )).alias(words)).groupBy(words).count().withColumnRenamed(count,cnt).show()# 方式2: 分组后用agg函数df.select(F.explode(F.split(words, )).alias(words)).groupBy(words).agg(F.count(words).alias(cnt)).show()# 方式3: 直接使用withColumdf.withColumn(words,F.explode(F.split(words, ))).groupBy(words).agg(F.count(words).alias(cnt)).show()# 5.释放资源spark.stop()01_sparkSession和sparkContext区别联系.py
# 导包
import os
from pyspark.sql import SparkSession# 解决JAVA_HOME 未设置问题
os.environ[SPARK_HOME] /export/server/spark
os.environ[PYSPARK_PYTHON] /root/anaconda3/bin/python3
os.environ[PYSPARK_DRIVER_PYTHON] /root/anaconda3/bin/python3if __name__ __main__:# 1.先创建spark session对象spark SparkSession.builder.appName(spark_demo).master(local[*]).getOrCreate()# 注意: 如果操作sparksql直接用spark对象即可,如果还有操作rdd的操作,需要获取sc对象sc spark.sparkContext# 此处就可以写python 代码了...# 使用sc对象读取本地文件,测试返回的是不是RDD对象result1 sc.textFile(file:///export/data/spark_project/05_saprk_sql/resources/uniqlo.csv)print(type(result1)) # class pyspark.rdd.RDDprint(result1.count())print(result1.take(10))# 使用spark对象读取本地文件,测试返回的是不是DataFrame对象result2 spark.read.csv(file:///export/data/spark_project/05_saprk_sql/resources/uniqlo.csv)print(type(result2)) # class pyspark.sql.dataframe.DataFrameprint(result2.count())# 注意: show默认是展示20条数据result2.show(10)# 注意: 最后一定释放资源sc.stop()spark.stop()
结果 02_[掌握]spark_sql词频统计.py
# 导包
import os
from pyspark.sql import SparkSession# 解决JAVA_HOME 未设置问题
os.environ[SPARK_HOME] /export/server/spark
os.environ[PYSPARK_PYTHON] /root/anaconda3/bin/python3
os.environ[PYSPARK_DRIVER_PYTHON] /root/anaconda3/bin/python3if __name__ __main__:# 先创建spark session对象spark SparkSession.builder.appName(spark_demo).master(local[*]).getOrCreate()# 用sparksql读取存储了多个单词的文件,最后计算词频df spark.read.text(file:///export/data/spark_project/05_saprk_sql/resources/data3.txt)# TODO: 把df对象转换为临时表df.createTempView(words_table)# 如何使用sparkSQL去用sql语句查询词频spark.sql(with t1 as (select explode(split(value, )) as wordfrom words_table)select word,count(*) as cnt from t1 group by word).show()# 注意: 最后一定释放资源spark.stop()
结果 03_createDataFrame方式基于列表方式创建df.py
# 导包
import os
from pyspark.sql import SparkSession# 解决JAVA_HOME 未设置问题
os.environ[SPARK_HOME] /export/server/spark
os.environ[PYSPARK_PYTHON] /root/anaconda3/bin/python3
os.environ[PYSPARK_DRIVER_PYTHON] /root/anaconda3/bin/python3if __name__ __main__:# 1.先创建spark session对象spark SparkSession.builder.appName(spark_demo).master(local[*]).getOrCreate()# 注意: 如果操作sparksql直接用spark对象即可,如果还有操作rdd的操作,需要获取sc对象sc spark.sparkContext# 演示基于列表创建df# 1.先有列表data [(zhangsan, 18), (lisi, 19), (wangwu, 20)]# 2.1创建df# 注意: schema可以不指定,默认列名是_1,_2...df1 spark.createDataFrame(data)# 3.1验证df数据df1.show()df1.printSchema()print(---------------------------------------------------------------)# 2.2创建df# 注意: schema可以不指定,默认列名是_1,_2...# 指定schema的多种方式,以下任选一个传到createDataFrame方法()即可schema1 [name, age]schema2 name:string,age:intschema3 name string,age intdf2 spark.createDataFrame(data, schema3)# 3.2验证df数据df2.show()df2.printSchema()# 注意: 最后一定释放资源sc.stop()spark.stop()
结果 04_createDataFrame方式基于RDD创建df.py
# 导包
import os
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, StructField# 解决JAVA_HOME 未设置问题
os.environ[SPARK_HOME] /export/server/spark
os.environ[PYSPARK_PYTHON] /root/anaconda3/bin/python3
os.environ[PYSPARK_DRIVER_PYTHON] /root/anaconda3/bin/python3if __name__ __main__:# 1.先创建spark session对象spark SparkSession.builder.appName(spark_demo).master(local[*]).getOrCreate()# 注意: 如果操作sparksql直接用spark对象即可,如果还有操作rdd的操作,需要获取sc对象sc spark.sparkContext# 1.先用sc读取data1.txt文件生成rdd对象rdd sc.textFile(file:///export/data/spark_project/05_saprk_sql/resources/data1.txt)# 注意: rdd需要提前切割数据rdd rdd.map(lambda x: x.split(,))print(type(rdd))print(rdd.collect())# 2.再基于rdd对象创建df# 指定schema的多种方式,以下任选一个传到createDataFrame方法()即可schema1 [name, age]schema2 name:string,age:stringschema3 name string,age stringschema4 StructType().add(name, StringType()).add(age, StringType())schema5 StructType([StructField(name, StringType()), StructField(age, StringType())])df spark.createDataFrame(rdd, schema1)# 3.验证df数据df.show()df.printSchema()# 注意: 最后一定释放资源sc.stop()spark.stop()
结果 05_createDataFrame方式基于RDD反射创建df.py
# 导包
import os
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, StructField, Row# 解决JAVA_HOME 未设置问题
os.environ[SPARK_HOME] /export/server/spark
os.environ[PYSPARK_PYTHON] /root/anaconda3/bin/python3
os.environ[PYSPARK_DRIVER_PYTHON] /root/anaconda3/bin/python3if __name__ __main__:# 1.先创建spark session对象spark SparkSession.builder.appName(spark_demo).master(local[*]).getOrCreate()# 注意: 如果操作sparksql直接用spark对象即可,如果还有操作rdd的操作,需要获取sc对象sc spark.sparkContext# 1.先用sc读取data1.txt文件生成rdd对象rdd sc.textFile(file:///export/data/spark_project/05_saprk_sql/resources/data1.txt)# 注意: rdd需要提前切割数据# 注意: Row()的功能是将数据转换为Row对象指定属性和类型rdd rdd.map(lambda x: x.split(,)).map(lambda x: Row(namex[0], ageint(x[1])))print(type(rdd))print(rdd.collect())# 2.再基于rdd对象创建df# 不指定schema,通过反射默认获取rdd中Row对象的属性作为df的schemadf spark.createDataFrame(rdd)# 3.验证df数据df.show()df.printSchema()# 注意: 最后一定释放资源sc.stop()spark.stop()
结果 06_toDF方式把RDD转换为df.py
# 导包
import os
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, StructField, Row# 解决JAVA_HOME 未设置问题
os.environ[SPARK_HOME] /export/server/spark
os.environ[PYSPARK_PYTHON] /root/anaconda3/bin/python3
os.environ[PYSPARK_DRIVER_PYTHON] /root/anaconda3/bin/python3if __name__ __main__:# 1.先创建spark session对象spark SparkSession.builder.appName(spark_demo).master(local[*]).getOrCreate()# 注意: 如果操作sparksql直接用spark对象即可,如果还有操作rdd的操作,需要获取sc对象sc spark.sparkContext# 1.先用sc读取data1.txt文件生成rdd对象rdd sc.textFile(file:///export/data/spark_project/05_saprk_sql/resources/data1.txt)# 注意: rdd需要提前切割数据rdd rdd.map(lambda x: x.split(,))print(type(rdd))print(rdd.collect())# 2.再把rdd对象直接转换为dfdf rdd.toDF([name, age])# 3.验证df数据df.show()df.printSchema()# 注意: 最后一定释放资源sc.stop()spark.stop()
结果 07_read传统api方式读取text_csv_json.py
# 导包
import os
from pyspark.sql import SparkSession# 解决JAVA_HOME 未设置问题
os.environ[SPARK_HOME] /export/server/spark
os.environ[PYSPARK_PYTHON] /root/anaconda3/bin/python3
os.environ[PYSPARK_DRIVER_PYTHON] /root/anaconda3/bin/python3if __name__ __main__:# 先创建spark session对象spark SparkSession.builder.appName(spark_demo).master(local[*]).getOrCreate()# 注意: 如果操作sparksql直接用spark对象即可,如果还有操作rdd的操作,需要获取sc对象sc spark.sparkContext# 需求: 读取data1.txt# text文件直接读取一行,不分割,默认是列数据,所以需要指定schema一列接收df1 (spark.read.format(text).schema(info string).load(file:///export/data/spark_project/05_saprk_sql/resources/data1.txt))df1.show()df1.printSchema()print(--------------------------------------------------------------------------------------------------)# 注意: csv默认按照,逗号分隔,而数据正好是逗号,,所以即使不指定sep参数也能用schema指定两列接收df2 (spark.read.format(csv).schema(name string,age int).option(sep, ,).option(encoding, utf8).option(header, False).load(file:///export/data/spark_project/05_saprk_sql/resources/data1.txt))df2.show()df2.printSchema()print(--------------------------------------------------------------------------------------------------)# 注意: json文件默认按照{k:v}格式,默认k作为列名,v作为列值,即使要指定schema,列名也要和k一致,否则都是nulldf3 (spark.read.format(json).schema(id int,name string,age int,address string).option(encoding, utf8).load(file:///export/data/spark_project/05_saprk_sql/resources/data2.txt))df3.show()df3.printSchema()# 注意: 最后一定释放资源sc.stop()spark.stop()
结果 08_read简写api方式读取text_csv_json.py
# 导包
import os
from pyspark.sql import SparkSession# 解决JAVA_HOME 未设置问题
os.environ[SPARK_HOME] /export/server/spark
os.environ[PYSPARK_PYTHON] /root/anaconda3/bin/python3
os.environ[PYSPARK_DRIVER_PYTHON] /root/anaconda3/bin/python3if __name__ __main__:# 先创建spark session对象spark SparkSession.builder.appName(spark_demo).master(local[*]).getOrCreate()# 注意: 如果操作sparksql直接用spark对象即可,如果还有操作rdd的操作,需要获取sc对象sc spark.sparkContext# 需求: 读取data1.txt# text简写方式没有schema参数,默认列名是valuedf1 spark.read.text(pathsfile:///export/data/spark_project/05_saprk_sql/resources/data1.txt)df1.show()df1.printSchema()print(--------------------------------------------------------------------------------------------------)# 注意: csv默认按照,逗号分隔,而数据正好是逗号,,所以即使不指定sep参数也能用schema指定两列接收df2 spark.read.csv(pathfile:///export/data/spark_project/05_saprk_sql/resources/data1.txt,schemaname string,age int,sep,,encodingutf8,headerFalse)df2.show()df2.printSchema()print(--------------------------------------------------------------------------------------------------)# 注意: json文件默认按照{k:v}格式,默认k作为列名,v作为列值,即使要指定schema,列名也要和k一致,否则都是nulldf3 spark.read.json(pathfile:///export/data/spark_project/05_saprk_sql/resources/data2.txt,schemaid int,name string,age int,address string,encodingutf8)df3.show()df3.printSchema()# 注意: 最后一定释放资源sc.stop()spark.stop()
结果 09_spark_sql词频统计_多种方式.py
# 导包
import os
from pyspark.sql import SparkSession
from pyspark.sql import functions as F# 解决JAVA_HOME 未设置问题
os.environ[SPARK_HOME] /export/server/spark
os.environ[PYSPARK_PYTHON] /root/anaconda3/bin/python3
os.environ[PYSPARK_DRIVER_PYTHON] /root/anaconda3/bin/python3if __name__ __main__:# 先创建spark session对象spark SparkSession.builder.appName(spark_demo).master(local[*]).getOrCreate()# 1.加载数据默认返回df对象# 细节: text方式默认一列, 默认列名是valuedf spark.read.text(file:///export/data/spark_project/05_saprk_sql/resources/data3.txt)df.show()# 2.把df转为sql临时表df.createOrReplaceTempView(words_table)# 3.1 sql方式# 方式1:子查询方式spark.sql(select word,count(1) as cntfrom(select explode(split(value, )) as wordfrom words_table) tgroup by word).show()print(----------------------------------------------------------------------------)# 方式2:侧视图方式spark.sql(select word,count(1) as cntfrom words_table lateral view explode(split(value, )) cst as wordgroup by word).show()print()# 3.2 dsl方式# 方式1: 分组后直接用count聚合withColumnRenamed重名df.select(F.explode(F.split(value, )).alias(word)).groupby(word).count().withColumnRenamed(count, cnt).show()print(----------------------------------------------------------------------------)# 对方式方式1优化df.select(F.explode(F.split(value, )).alias(word)).groupby(word).agg(F.count(word).alias(cnt)).show()print(----------------------------------------------------------------------------)# 方式2: withcolumn先产生新列.然后再分组聚合df.withColumn(word,F.explode(F.split(value, ))).groupby(word).agg(F.count(word).alias(cnt)).show()# 注意: 最后一定释放资源spark.stop()
结果