Spark提供了三种主要的与数据相关的API:
在实际中如果我们丢失了RDD的部分分区,可以通过对丢失分区关联性的transformation重新计算得到。而不是在众多节点中做数据的复制等操作。这个特性是RDD的最大优点,它节省了大量的数据管理、复制等操作,使得计算速度更快。
所有的transformation都是惰性的,他们并不是立刻计算出结果,而是只是记住了各个transformation对数据集的依赖关系。当driver程序需要一个action结果时才开始执行。
RDD支持两种类型的算子:transformation是指从已经存在的数据集中计算得到新的数据集;action是指通过对通过对数据集的计算得到一个结果返回给driver。
轻松且有效支持各种数据,包括结构化的和非结构化的。
RDD的API支持Scala、Java、Python和R
RDD的限制
当对结构化的数据进行处理时,RDD没有使用Spark的高级优化器,比如catalyst优化器和Tungsten执行引擎。
不像Dataframe或者Dataset,RDD不会主动推测出数据的schema,而是需要用户在代码里指示。
DataFrame
Spark从1.3版本开始引入Dataframe,它克服了RDD的最主要的挑战。
主要描述:Dataframe是一个分布式的数据collection,而且将数据按照列名进行组织。在概念上它与关系型的数据库的表或者R/Python语言中的DataFrame类似。与之一起提供的还有,Spark引入了catalyst优化器,它可以优化查询。
DataFrame的特性
分布式、列名组织的数据、后台优化。
具体到代码里面,Dataframe就是Dataset<Row>
处理支持结构或者非结构化的格式(比如Avro, CSV, elastic search, 以及Cassandra)以及不同的文件系统(HDFS, HIVE tables, MySQL, etc)。它支持非常多的数据源
它对SQL查询以及DataFrame API都提供优化支持。Dataframe使用catalyst树transformation框架有四个步骤:
1、Analyzing a logical plan to resolve references
2、Logical plan optimization
3、Physical planning
4、Code generation to compile parts of the query to Java bytecode.
使用Spark的SQL可以无修改的支持Hive查询在已经存在的Hive warehouses。它重用了Hive的前端、MetaStore并且对已经存在的Hive数据、查询和UDF提供完整的兼容性。
Tungsten提供了一个物理执行后端,管理内存动态产生expression evaluation的字节码
Dataframe API支持Scala、Java、Python和R
DataFrame的限制
不能在编译时刻对安全性做出检查,而且限制了用户对于未知结构的数据进行操作。比如下面代码在编译时没有错误,但是在执行时会出现异常:
case class Person(name : String , age : Int)
val dataframe = sqlContect.read.json("people.json")
dataframe.filter("salary > 10000").show
=> throws Exception : cannot resolve 'salary' given input age , name
不能保留类对象的结构:
一旦把一个类结构的对象转成了Dataframe,就不能转回去了。下面这个栗子就是指出了:
case class Person(name : String , age : Int)
val personRDD = sc.makeRDD(Seq(Person("A",10),Person("B",20)))
val personDF = sqlContect.createDataframe(personRDD)
personDF.rdd // returns RDD[Row] , does not returns RDD[Person]
DataSet
主要描述:Dataset API是对DataFrame的一个扩展,使得可以支持类型安全的检查,并且对类结构的对象支持程序接口。它是强类型的,不可变collection,并映射成一个相关的schema。
Dataset API的核心是一个被称为Encoder的概念。它是负责对JVM的对象以及表格化的表达(tabular representation)之间的相互转化。
表格化的表达在存储时使用了Spark内置的Tungsten二进制形式,允许对序列化数据操作并改进了内存使用。在Spark 1.6版本之后,支持自动化生成Encoder,可以对广泛的primitive类型(比如String,Integer,Long等)、Scala的case class以及Java Bean自动生成对应的Encoder。
DataSet的特性
支持RDD和Dataframe的优点:
包括RDD的类型安全检查,Dataframe的关系型模型,查询优化,Tungsten执行,排序和shuffling。
Encoder:
通过使用Encoder,用户可以轻松转换JVM对象到一个Dataset,允许用户在结构化和非结构化的数据操作。
编程语言:
Scala和Java
类型安全检查:
提供编译阶段的安全类型检查。比如下面这个栗子:
case class Person(name : String , age : Int)
val personRDD = sc.makeRDD(Seq(Person("A",10),Person("B",20)))
val personDF = sqlContect.createDataframe(personRDD)
val ds:Dataset[Person] = personDF.as[Person]
ds.filter(p => p.age > 25)
ds.filter(p => p.salary > 25)
// error : value salary is not a member of person
ds.rdd // returns RDD[Person]
相互转换:
Dataset可以让用户轻松从RDD和Dataframe转换到Dataset不需要额外太多代码。
DataSet的限制
需要把类型转成String:
Querying the data from datasets currently requires us to specify the fields in the class as a string. Once we have queried the data, we are forced to cast column to the required data type. On the other hand, if we use map operation on Datasets, it will not use Catalyst optimizer.
ds.select(col("name").as[String], $"age".as[Int]).collect()
Java API中三种数据格式的相互转换
首先构造一个数据集,是由Person类的结构组成的,然后在此之上看这三个API实例的构造以及相互转换
直接构建出 JavaRDD<Person>
JavaRDD<Person> personJavaRDD = jsc.parallelize(personList);
System.out.println("1. 直接构建出 JavaRDD<Person>");
personJavaRDD.foreach(element -> System.out.println(element.toString()));
Print结果:
直接构建出 JavaRDD<Person>
Person: name = Andy, age = 32
Person: name = Michael, age = 23
Person: name = Justin, age = 19
直接构建出 Dataset<Person>
Encoder<Person> personEncoder = Encoders.bean(Person.class);
Dataset<Person> personDS = spark.createDataset(personList, personEncoder);
System.out.println("2. 直接构建出 Dataset<Person>");
personDS.show();
personDS.printSchema();
Print结果:
直接构建出 Dataset<Person>
+---+-------+
|age| name|
+---+-------+
| 32| Andy|
| 23|Michael|
| 19| Justin|
+---+-------+
|-- age: integer (nullable = false)
|-- name: string (nullable = true)
直接构建出 Dataset<Row>
Dataset<Row> personDF = spark.createDataFrame(personList, Person.class);
System.out.println("3. 直接构建出 Dataset<Row>");
personDF.show();
personDF.printSchema();
Print结果:
直接构建出 Dataset<Row>
+---+-------+
|age| name|
+---+-------+
| 32| Andy|
| 23|Michael|
| 19| Justin|
+---+-------+
|-- age: integer (nullable = false)
|-- name: string (nullable = true)
JavaRDD<Person> -> Dataset<Person>
personDS = spark.createDataset(personJavaRDD.rdd(), personEncoder);
System.out.println("1->2 JavaRDD<Person> -> Dataset<Person>");
personDS.show();
personDS.printSchema();
Print结果:
1->2 JavaRDD<Person> -> Dataset<Person>
+---+-------+
|age| name|
+---+-------+
| 32| Andy|
| 23|Michael|
| 19| Justin|
+---+-------+
|-- age: integer (nullable = true)
|-- name: string (nullable = true)
JavaRDD<Person> -> Dataset<Row>
personDF = spark.createDataFrame(personJavaRDD, Person.class);
System.out.println("1->3 JavaRDD<Person> -> Dataset<Row>");
personDF.show();
personDF.printSchema();
Print结果:
1->3 JavaRDD<Person> -> Dataset<Row>
+---+-------+
|age| name|
+---+-------+
| 32| Andy|
| 23|Michael|
| 19| Justin|
+---+-------+
|-- age: integer (nullable = false)
|-- name: string (nullable = true)
补充从JavaRDD<Row>到Dataset<Row>
JavaRDD<Row> personRowRdd = personJavaRDD.map(person -> RowFactory.create(person.age, person.name));
List<StructField> fieldList = new ArrayList<>();
fieldList.add(DataTypes.createStructField("age", DataTypes.IntegerType, false));
fieldList.add(DataTypes.createStructField("name", DataTypes.StringType, false));
StructType rowAgeNameSchema = DataTypes.createStructType(fieldList);
personDF = spark.createDataFrame(personRowRdd, rowAgeNameSchema);
System.out.println("\n\n\n补充,由JavaRDD<Row> -> Dataset<Row>");
personDF.show();
personDF.printSchema();
主要就是使用RowFactory把Row中的每一项写好后,通过spark的createDataFrame来创建。其中对于Row的解读包含在了自建的StructType中。
Dataset<Person> -> JavaRDD<Person>
personJavaRDD = personDS.toJavaRDD();
System.out.println("2->1 Dataset<Person> -> JavaRDD<Person>");
personJavaRDD.foreach(element -> System.out.println(element.toString()));
Print结果:
2->1 Dataset<Person> -> JavaRDD<Person>
Person: name = Justin, age = 19
Person: name = Andy, age = 32
Person: name = Michael, age = 23
Dataset<Row> -> JavaRDD<Person>
personJavaRDD = personDF.toJavaRDD().map(row -> {
String name = row.getAs("name");
int age = row.getAs("age");
return new Person(name, age);
System.out.println("3->1 Dataset<Row> -> JavaRDD<Person>");
personJavaRDD.foreach(element -> System.out.println(element.toString()));
Print结果:
3->1 Dataset<Row> -> JavaRDD<Person>
Person: name = Justin, age = 19
Person: name = Michael, age = 23
Person: name = Andy, age = 32
Dataset<Person> -> Dataset<Row>
List<StructField> fieldList = new ArrayList<>();
fieldList.add(DataTypes.createStructField("name", DataTypes.StringType, false));
fieldList.add(DataTypes.createStructField("age", DataTypes.IntegerType, false));
StructType rowSchema = DataTypes.createStructType(fieldList);
ExpressionEncoder<Row> rowEncoder = RowEncoder.apply(rowSchema);
Dataset<Row> personDF_fromDS = personDS.map(
(MapFunction<Person, Row>) person -> {
List<Object> objectList = new ArrayList<>();
objectList.add(person.name);
objectList.add(person.age);
return RowFactory.create(objectList.toArray());
rowEncoder
System.out.println("2->3 Dataset<Person> -> Dataset<Row>");
personDF_fromDS.show();
personDF_fromDS.printSchema();
Print结果:
2->3 Dataset<Person> -> Dataset<Row>
+---+-------+
|age| name|
+---+-------+
| 32| Andy|
| 23|Michael
| 19| Justin|
+---+-------+
|-- age: integer (nullable = false)
|-- name: string (nullable = true)
Dataset<Row> -> Dataset<Person>
personDS = personDF.map(new MapFunction<Row, Person>() {
@Override
public Person call(Row value) throws Exception {
return new Person(value.getAs("name"), value.getAs("age"));
}, personEncoder);
System.out.println("3->2 Dataset<Row> -> Dataset<Person>");
personDS.show();
personDS.printSchema();
Print结果:
3->2 Dataset<Row> -> Dataset<Person>
+---+-------+
|age| name|
+---+-------+
| 32| Andy|
| 23|Michael|
| 19| Justin|
+---+-------+
|-- age: integer (nullable = true)
|-- name: string (nullable = true)
其实RDD的Map和Dataset的Map只有一点不同,就是Dataset的Map要指定一个Encoder的参数。