在贴代码之前先介绍一下DataFrame与DataSet,以下介绍内容来自以下博客:
https://www.cnblogs.com/seaspring/p/5831677.html
DataFrame
DataFrame是一个分布式集合,其中数据逻辑存储结构为有名字的列。它概念上等价于关系数据库中的表,一个列名对应很多列值,但底层做了更多的优化。DataFrame可以从很多数据源构建,比如:已经存在的RDD、结构化文件(json文件、csv文件等)、外部数据库、Hive表。
DataFrame的前身是SchemaRDD,从Spark 1.3.0开始SchemaRDD更名为DataFrame。与SchemaRDD的主要区别是:DataFrame不再直接继承自RDD,而是自己实现了RDD的绝大多数功能。你仍旧可以在DataFrame上调用.rdd方法将其转换为一个RDD。RDD可看作是分布式的对象的集合,Spark并不知道对象的详细模式信息,DataFrame可看作是分布式的Row对象的集合,其提供了由列组成的详细模式信息,使得Spark SQL可以进行某些形式的执行优化。DataFrame和普通的RDD的逻辑框架区别如下所示:
DataFrame不仅比RDD有更加丰富的算子,更重要的是它可以进行执行计划优化(得益于Catalyst SQL解析器),另外Tungsten项目给DataFrame的执行效率带来了很大提升(不过Tungsten优化也可能在后续开发中加入到RDD API中)。
但是在有些情况下RDD可以表达的逻辑用DataFrame无法表达,所以后续提出了Dataset API,Dataset结合了RDD和DataFrame的好处。
Dataset
Dataset是Spark 1.6新添加的一个实验性接口,其目的是想结合RDD的好处(强类型(这意味着可以在编译时进行类型安全检查)、可以使用强大的lambda函数)和Spark SQL的优化执行引擎的好处。可以从JVM对象构造出Dataset,然后使用类似于RDD的函数式转换算子(map/flatMap/filter等)对其进行操作。
Dataset通过Encoder实现了自定义的序列化格式,使得某些操作可以在无需解序列化的情况下直接进行。另外Dataset还进行了包括Tungsten优化在内的很多性能方面的优化。
实际上Dataset是包含了DataFrame的功能的,这样二者就出现了很大的冗余,故Spark 2.0将二者统一:保留Dataset API,把DataFrame表示为Dataset[Row],即Dataset的子集。
API进化
Spark在迅速的发展,从原始的RDD API,再到DataFrame API,再到Dataset的出现,速度可谓惊人,执行性能上也有了很大提升。
我们在使用API时,应该优先选择DataFrame & Dataset,因为它的性能很好,而且以后的优化它都可以享受到,但是为了兼容早期版本的程序,RDD API也会一直保留着。后续Spark上层的库将全部会用 DataFrame & Dataset,比如MLlib、Streaming、Graphx等。
①使用DataSet的foreach方法进行遍历
DataFrame是DataSet的一个子集,以下遍历的代码也是基于DataSet的API进行遍历的
// 先定义循环遍历里面的处理函数
// 输入参数类型为Row
def circularProcess(line : Row) : Unit = {
// 打印行里面的值,此处是获取第一列的值
println(line.get(0))
val df : DataFrame = (这里是获取DataFrame内容代码,获取方式有多种,见以上提到的内容)
// 调用DataSet的(foreach(f: T => Unit): Unit)方法进行遍历
// 将上面定义的处理函数作为参数传入foreach方法
df.foreach(circularProcess : Row => Unit)
②利用DataSet的collect方法将DataFrame变成数组再用for循环进行遍历
第①种方法有个不太方便的点,就是遍历处理函数不能传其他参数,只能有Row类型的一个输入参数。而转换成数组再for循环则可以直接使用同一个函数里面的参数,不用再传递
正常DataFrame调用show方法是以表格的形式展示数据
DataFrame调用collect方法后是以[ [..,..,..] , [..,..,..] , [..,..,..] ...... ]的形式存储数据,外层数组是每一行数据(Row),里层数组是一行中每一列(Column)的数据,因此调用collect方法生成的数据其实就是二维数组,即数组里面的数据元素也是数组,因此遍历方式就是使用二维数据遍历方式
var array = df.collect
for(i <- 0 to array.length-1){
for(j <- 0 to array(i).length-1){
println(array(i)(j))
代码运行结果如下所示
1. 多表多行多列的情况
foreach (DataTable dt in YourDataset.Tables) //遍历所有的datatable
foreach (DataRow dr in dt.Rows) //遍历所有的行
import org.apache.spark.SparkConf;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
public class TestSparkSql {
publ...
说起dataframe,大家一般会首先想起pandas.dataframe。随着数据科学越来越火热,大部分同学都使用过python去进行一些数据科学的实践,也应该会对dataframe的简单易用颇有好感。
然而pandas只能用于处理单机问题,面对工业级的海量数据处理和计算,就显得无能为力。
spark作为分布式计算框架,在工业界占据了比较主流的地位。spark同样也提供了dataframe...
你可以使用 for 循环来遍历 dataframe 中的每一行。在每次循环中,你可以使用 iterrows() 方法来获取 dataframe 的每一行。示例代码如下:
forindex, row in df.iterrows():
print(index, row)
在这段代码中,index 是每一行的索引,row 是一个包含每一列数据的 Series 对象。你可以使用 row['co...
我之前写的代码大体功能如下:
val map= new mutable.HashMap[String, String]
val df: DataFrame = DbDataApi.requestColMetaInfo(dataId)
df.foreach(row =>{
map.put(row.getAs[String](fieldName = "colName"),row.getAs[String](fieldName = "col1"))
运行后发现
1. 首先spark读取hive表,得到DataFrame。
如果直接对spark的dataframe进行遍历的话,需要进行collect操作,这对性能消耗是非常大的,一般不建议直接对dataframe进行collect操作。
2. 将DataFrame转为RD...
// |-- _1: long (nullable = false)
// |-- _2: string (nullable = true)
// |-- _3: string (nullable = true)
// |-- _4: double (nullable = false)
最简单的办法toDF方法
val schemas= Seq("id", "x1.
本博客周五更新
本文记录spark 分布式数据类型DataSet的基本原理和使用方法。
DataSet是Spark1.6添加的分布式数据集合,Spark2.0合并DataSet和DataFrame数据集合API,DataFrame变成DataSet的子集。
DataSet继承RDD优点,并使用Spark SQL优化的执行引擎。支持JVM对象构建,支持函数式转换(map/flatmap/filter)等多种操作
IntelliJ IDEA使用debug模式启动应用报错: java.net.BindException "Address already in use: JVM_Bind"
w1047667241:
Spark/Scala/SparkSQL问题记录:使用Scala语言遍历DateFrame/DataSet数据集里的每一行、每一列
阿政557: