添加链接
link管理
链接快照平台
  • 输入网页链接,自动生成快照
  • 标签化管理网页链接

1 概览

(1) SQL

查询结果以Dataset/DataFrame形式返回。

可以通过命令行 command-line JDBC/ODBC 连接

(2) Dataset和DataFrame

1) Dataset

DataSet是分布式数据集合。

可从JVM对象生成。

Python和R暂时只能间接实现类似功能。

2) DataFrame

是使用命名的列组织的Dataset.

等同于关系型数据库中的表和Pathon/R中的data frame.

Scala中表示为Dataset[Row]的别名DataFrame,Java中表示为Dataset\

2 入门

(1) Spark Session

SparkSession是Spark SQL的入口。

在版本2.0中提供了Hive支持。如使用HiveQL查询、访问Hive UDF和从Hive表中读取数据。

1
2
3
4
5
6
7
8
9
10
import org.apache.spark.sql.SparkSession

val spark = SparkSession
.builder()
.appName("Spark SQL basic example")
.config("spark.some.config.option", "some-value")
.getOrCreate()

// For implicit conversions like converting RDDs to DataFrames
import spark.implicits._

(2) 创建DataFrame

从JSON数据中创建

1
2
3
4
5
6
7
8
9
10
11
val df = spark.read.json("examples/src/main/resources/people.json")

// Displays the content of the DataFrame to stdout
df.show()
// +----+-------+
// | age| name|
// +----+-------+
// |null|Michael|
// | 30| Andy|
// | 19| Justin|
// +----+-------+

(3) DataFrame操作

即无类型数据集操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
// This import is needed to use the $-notation
import spark.implicits._
// Print the schema in a tree format
df.printSchema()
// root
// |-- age: long (nullable = true)
// |-- name: string (nullable = true)

// Select only the "name" column
df.select("name").show()
// +-------+
// | name|
// +-------+
// |Michael|
// | Andy|
// | Justin|
// +-------+

// Select everybody, but increment the age by 1
df.select($"name", $"age" + 1).show()
// +-------+---------+
// | name|(age + 1)|
// +-------+---------+
// |Michael| null|
// | Andy| 31|
// | Justin| 20|
// +-------+---------+

// Select people older than 21
df.filter($"age" > 21).show()
// +---+----+
// |age|name|
// +---+----+
// | 30|Andy|
// +---+----+

// Count people by age
df.groupBy("age").count().show()
// +----+-----+
// | age|count|
// +----+-----+
// | 19| 1|
// |null| 1|
// | 30| 1|
// +----+-----+

(4) 编码SQL查询

sql语句作为参数传递。示例如下

1
2
3
4
5
6
7
8
9
10
11
12
// Register the DataFrame as a SQL temporary view
df.createOrReplaceTempView("people")

val sqlDF = spark.sql("SELECT * FROM people")
sqlDF.show()
// +----+-------+
// | age| name|
// +----+-------+
// |null|Michael|
// | 30| Andy|
// | 19| Justin|
// +----+-------+

(5) 全局临时视图

通常,临时视图的生命周期仅限于当前会话。

全局临时视图将视图绑定到系统保存的数据库 global_temp 。使用时必须使用限定的名称引用。

全局临时视图可以在会话间共享。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// Register the DataFrame as a global temporary view
df.createGlobalTempView("people")

// Global temporary view is tied to a system preserved database `global_temp`
spark.sql("SELECT * FROM global_temp.people").show()
// +----+-------+
// | age| name|
// +----+-------+
// |null|Michael|
// | 30| Andy|
// | 19| Justin|
// +----+-------+

// Global temporary view is cross-session
spark.newSession().sql("SELECT * FROM global_temp.people").show()
// +----+-------+
// | age| name|
// +----+-------+
// |null|Michael|
// | 30| Andy|
// | 19| Justin|
// +----+-------+

(6) 创建Dataset

Dataset适用于RDD不同的序列化方式 Encoder ,能够直接过滤、排序或哈希,而不需要反序列化。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
case class Person(name: String, age: Long)

// Encoders are created for case classes
val caseClassDS = Seq(Person("Andy", 32)).toDS()
caseClassDS.show()
// +----+---+
// |name|age|
// +----+---+
// |Andy| 32|
// +----+---+

// Encoders for most common types are automatically provided by importing spark.implicits._
val primitiveDS = Seq(1, 2, 3).toDS()
primitiveDS.map(_ + 1).collect() // Returns: Array(2, 3, 4)

// DataFrames can be converted to a Dataset by providing a class. Mapping will be done by name
val path = "examples/src/main/resources/people.json"
val peopleDS = spark.read.json(path).as[Person]
peopleDS.show()
// +----+-------+
// | age| name|
// +----+-------+
// |null|Michael|
// | 30| Andy|
// | 19| Justin|
// +----+-------+

(7) RDD间操作

RDD转换为DataFrame

适用于运行前已知模式

使用样例类

适用于运行前未知模式

使用StructureType

1) 使用反射推断模式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
// For implicit conversions from RDDs to DataFrames
import spark.implicits._

// Create an RDD of Person objects from a text file, convert it to a Dataframe
val peopleDF = spark.sparkContext
.textFile("examples/src/main/resources/people.txt")
.map(_.split(","))
.map(attributes => Person(attributes(0), attributes(1).trim.toInt))
.toDF()
// Register the DataFrame as a temporary view
peopleDF.createOrReplaceTempView("people")

// SQL statements can be run by using the sql methods provided by Spark
val teenagersDF = spark.sql("SELECT name, age FROM people WHERE age BETWEEN 13 AND 19")

// The columns of a row in the result can be accessed by field index
teenagersDF.map(teenager => "Name: " + teenager(0)).show()
// +------------+
// | value|
// +------------+
// |Name: Justin|
// +------------+

// or by field name
teenagersDF.map(teenager => "Name: " + teenager.getAs[String]("name")).show()
// +------------+
// | value|
// +------------+
// |Name: Justin|
// +------------+

// No pre-defined encoders for Dataset[Map[K,V]], define explicitly
implicit val mapEncoder = org.apache.spark.sql.Encoders.kryo[Map[String, Any]]
// Primitive types and case classes can be also defined as
// implicit val stringIntMapEncoder: Encoder[Map[String, Any]] = ExpressionEncoder()

// row.getValuesMap[T] retrieves multiple columns at once into a Map[String, T]
teenagersDF.map(teenager => teenager.getValuesMap[Any](List("name", "age"))).collect()
// Array(Map("name" -> "Justin", "age" -> 19))

2) 指定模式

构建模式。

  • 将记录转换为行RDD
  • 应用模式到行RDD
  • 1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    import org.apache.spark.sql.types._

    // Create an RDD
    val peopleRDD = spark.sparkContext.textFile("examples/src/main/resources/people.txt")

    // 构建模式
    // The schema is encoded in a string
    val schemaString = "name age"

    // Generate the schema based on the string of schema
    val fields = schemaString.split(" ")
    .map(fieldName => StructField(fieldName, StringType, nullable = true))
    val schema = StructType(fields)
    // 记录转换为行
    // Convert records of the RDD (people) to Rows
    val rowRDD = peopleRDD
    .map(_.split(","))
    .map(attributes => Row(attributes(0), attributes(1).trim))

    // 应用模式
    // Apply the schema to the RDD
    val peopleDF = spark.createDataFrame(rowRDD, schema)

    // Creates a temporary view using the DataFrame
    peopleDF.createOrReplaceTempView("people")

    // SQL can be run over a temporary view created using DataFrames
    val results = spark.sql("SELECT name FROM people")

    // The results of SQL queries are DataFrames and support all the normal RDD operations
    // The columns of a row in the result can be accessed by field index or by field name
    results.map(attributes => "Name: " + attributes(0)).show()
    // +-------------+
    // | value|
    // +-------------+
    // |Name: Michael|
    // | Name: Andy|
    // | Name: Justin|
    // +-------------+

    (8) 聚合

    1) 无类型用户自定义聚合函数

    通过实现抽象类 UserDefinedAggregateFunction

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    import org.apache.spark.sql.{Row, SparkSession}
    import org.apache.spark.sql.expressions.MutableAggregationBuffer
    import org.apache.spark.sql.expressions.UserDefinedAggregateFunction
    import org.apache.spark.sql.types._

    // 实现UserDefinedAggregateFunction
    object MyAverage extends UserDefinedAggregateFunction {
    // Nil表示空集合,::表示前者(元素)追加到后者(集合)
    // Data types of input arguments of this aggregate function
    def inputSchema: StructType = StructType(StructField("inputColumn", LongType) :: Nil )
    // Data types of values in the aggregation buffer
    def bufferSchema: StructType = {
    StructType(StructField("sum", LongType) :: StructField("count", LongType) :: Nil)
    }
    // The data type of the returned value
    def dataType: DataType = DoubleType
    // Whether this function always returns the same output on the identical input
    def deterministic: Boolean = true
    // Initializes the given aggregation buffer. The buffer itself is a `Row` that in addition to
    // standard methods like retrieving a value at an index (e.g., get(), getBoolean()), provides
    // the opportunity to update its values. Note that arrays and maps inside the buffer are still
    // immutable.
    def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = 0L
    buffer(1) = 0L
    }
    // 使用输入数据更新
    // Updates the given aggregation buffer `buffer` with new input data from `input`
    def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    if (!input.isNullAt(0)) {
    buffer(0) = buffer.getLong(0) + input.getLong(0)
    buffer(1) = buffer.getLong(1) + 1
    }
    }
    // Merges two aggregation buffers and stores the updated buffer values back to `buffer1`
    def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    buffer1(0) = buffer1.getLong(0) + buffer2.getLong(0)
    buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1)
    }
    // Calculates the final result
    def evaluate(buffer: Row): Double = buffer.getLong(0).toDouble / buffer.getLong(1)
    }

    // Register the function to access it
    spark.udf.register("myAverage", MyAverage)

    val df = spark.read.json("examples/src/main/resources/employees.json")
    df.createOrReplaceTempView("employees")
    df.show()
    // +-------+------+
    // | name|salary|
    // +-------+------+
    // |Michael| 3000|
    // | Andy| 4500|
    // | Justin| 3500|
    // | Berta| 4000|
    // +-------+------+

    val result = spark.sql("SELECT myAverage(salary) as average_salary FROM employees")
    result.show()
    // +--------------+
    // |average_salary|
    // +--------------+
    // | 3750.0|
    // +--------------+

    2) 类型安全用户自定义聚合函数

    用于与强类型数据集交互

    通过实现 Aggregator

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    import org.apache.spark.sql.expressions.Aggregator

    case class Employee(name: String, salary: Long)
    case class Average(var sum: Long, var count: Long)

    // 实现Aggregator
    object MyAverage extends Aggregator[Employee, Average, Double] {
    // A zero value for this aggregation. Should satisfy the property that any b + zero = b
    def zero: Average = Average(0L, 0L)
    // Combine two values to produce a new value. For performance, the function may modify `buffer`
    // and return it instead of constructing a new object
    def reduce(buffer: Average, employee: Employee): Average = {
    buffer.sum += employee.salary
    buffer.count += 1
    buffer
    }
    // Merge two intermediate values
    def merge(b1: Average, b2: Average): Average = {
    b1.sum += b2.sum
    b1.count += b2.count
    b1
    }
    // Transform the output of the reduction
    def finish(reduction: Average): Double = reduction.sum.toDouble / reduction.count
    // Specifies the Encoder for the intermediate value type
    def bufferEncoder: Encoder[Average] = Encoders.product
    // Specifies the Encoder for the final output value type
    def outputEncoder: Encoder[Double] = Encoders.scalaDouble
    }

    val ds = spark.read.json("examples/src/main/resources/employees.json").as[Employee]
    ds.show()
    // +-------+------+
    // | name|salary|
    // +-------+------+
    // |Michael| 3000|
    // | Andy| 4500|
    // | Justin| 3500|
    // | Berta| 4000|
    // +-------+------+

    // Convert the function to a `TypedColumn` and give it a name
    val averageSalary = MyAverage.toColumn.name("average_salary")
    val result = ds.select(averageSalary)
    result.show()
    // +--------------+
    // |average_salary|
    // +--------------+
    // | 3750.0|
    // +--------------+

    3 数据源

    将DataFrame注册为临时视图,可以在其上进行SQL查询。

    (1) 通用加载/保存函数

    默认使用parquet作为数据源,可通过spark.sql.sources.default修改。

    1
    2
    val usersDF = spark.read.load("examples/src/main/resources/users.parquet")
    usersDF.select("name", "favorite_color").write.save("namesAndFavColors.parquet")

    1) 人工指定选项

    数据源通过全限定名指定(如org.apache.spark.sql.parquet),内建的数据源可以使用短名称(如json, parquet, jdbc, orc, libsvm, csv, text)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    // JSON
    val peopleDF = spark.read.format("json").load("examples/src/main/resources/people.json")
    peopleDF.select("name", "age").write.format("parquet").save("namesAndAges.parquet")

    // CSV
    val peopleDFCsv = spark.read.format("csv")
    .option("sep", ";")
    .option("inferSchema", "true")
    .option("header", "true")
    .load("examples/src/main/resources/people.csv")

    2) 直接在文件上执行SQL

    不用加载到DataFrame,就可以执行SQL查询

    1
    val sqlDF = spark.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`")

    3) 保存模式

    指定数据存在时的行为。

    以下行为不是原子的,也没有使用锁。

    覆盖时,先删除再写入。

    4) 保存到持久化表中

    持久化表在Spark程序重启后依旧可用,不同于临时视图。

    DataFrame可以使用saveAsTable方法持久化到Hive Metastore中。

    通过SparkSession的table方法按名称调用持久化表。

    不需要单独部署Hive。Spark将创建使用Derby创建本地Hive Metastore。

    基于文件的数据源,可以指定路径。如df.write.option(“path”, “/some/path”).saveAsTable(“t”)。删除表后,路径和文件依旧存在。

    没有指定路径时,Spark将数据写入到仓库目录的默认表路径。删除表后,默认表路径也删除。

    版本>=2.1,持久化表具有存储在Hive Metastore中的分区元数据。可以:

  • 不需要在首次查询时扫描所有分区,因为Metastore可以只返回需要的分区。
  • 可以使用Hive DDL
  • 注意:创建外部数据源表(使用path选项)时,默认不聚集分区信息。可以使用MSCK REPAIR TABLE同步。

    5) 分组(Bucket)、排序和分区

    分组和排序只能用于持久化表。

    分组将数据分布在固定数量的桶中。

    分区对唯一值数量敏感,即对具有高基数的列的适用性有限。

    可以同时使用分组和分区。

    Spark 3.0

    partittionBy创建了分区发现一节中描述的目录结构,因此限制了高基数的列的可用性。

    bucketBy在固定数量桶中分散数据,可以用于唯一值数量没有边界的的场景

    注意:bucketBy使用了列的哈希值分桶,保证具有相同哈希值的记录在同一个桶中,可以避免shuffle。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    peopleDF.write.bucketBy(42, "name").sortBy("age").saveAsTable("people_bucketed")

    usersDF.write.partitionBy("favorite_color").format("parquet").save("namesPartByColor.parquet")

    // 同时使用分组和分区
    usersDF
    .write
    .partitionBy("favorite_color")
    .bucketBy(42, "name")
    .saveAsTable("users_partitioned_bucketed")

    (2) Parquet文件

    Parquet 是列格式的数据。自描述格式,保存有模式信息。

    当写入Parquet文件文件时,为了适配,将列自动转换为可为空。

    1) 数据加载

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    // Encoders for most common types are automatically provided by importing spark.implicits._
    import spark.implicits._

    val peopleDF = spark.read.json("examples/src/main/resources/people.json")

    // DataFrames can be saved as Parquet files, maintaining the schema information
    peopleDF.write.parquet("people.parquet")

    // Read in the parquet file created above
    // Parquet files are self-describing so the schema is preserved
    // The result of loading a Parquet file is also a DataFrame
    val parquetFileDF = spark.read.parquet("people.parquet")

    // Parquet files can also be used to create a temporary view and then used in SQL statements
    parquetFileDF.createOrReplaceTempView("parquetFile")
    val namesDF = spark.sql("SELECT name FROM parquetFile WHERE age BETWEEN 13 AND 19")
    namesDF.map(attributes => "Name: " + attributes(0)).show()
    // +------------+
    // | value|
    // +------------+
    // |Name: Justin|
    // +------------+

    2)分区发现

    Hive之类的系统使用表分区作为一种通用优化手段。

    分区表使用分区列将数据分散到不同的目录中。

    内建文件数据源(Text/CSV/JSON/ORC/Parquet)支持自动发现和推断分区信息。

    如使用gender和county作为分区列,分区表目录如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    path
    └── to
    └── table
    ├── gender=male
    │ ├── ...
    │ │
    │ ├── country=US
    │ │ └── data.parquet
    │ ├── country=CN
    │ │ └── data.parquet
    │ └── ...
    └── gender=female
    ├── ...

    ├── country=US
    │ └── data.parquet
    ├── country=CN
    │ └── data.parquet
    └── ...

    传递参数path/to/table到SparkSession.read.parquet或SparkSession.read.load,可以自动从路径中提取分区和模式信息。提取的模式信息如下:

    1
    2
    3
    4
    5
    root
    |-- name: string (nullable = true)
    |-- age: long (nullable = true)
    |-- gender: string (nullable = true)
    |-- country: string (nullable = true)

    分区列数据类型当前支持数值、日期、时间戳和字符串类型。

    通过spark.sql.sources.partitionColumnTypeInference.enabled关闭自动推断。关闭后,将使用字符串作为分区列类型。

    版本>=1.6,默认只发现传递的目录参数中的分区。子目录也不能发现。可以通过数据源的basePath选项更改。

    3) 模式合并

    自动检测并合并兼容的Parquet数据源。

    版本>=1.5,默认关闭。

    开启方法:

    1 读取时,设置数据源选项mergeSchema为true

    2 设置全局SQL选项spark.sql.parquet.mergeSchema为true

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    // This is used to implicitly convert an RDD to a DataFrame.
    import spark.implicits._

    // Create a simple DataFrame, store into a partition directory
    val squaresDF = spark.sparkContext.makeRDD(1 to 5).map(i => (i, i * i)).toDF("value", "square")
    squaresDF.write.parquet("data/test_table/key=1")

    // Create another DataFrame in a new partition directory,
    // adding a new column and dropping an existing column
    val cubesDF = spark.sparkContext.makeRDD(6 to 10).map(i => (i, i * i * i)).toDF("value", "cube")
    cubesDF.write.parquet("data/test_table/key=2")

    // Read the partitioned table
    val mergedDF = spark.read.option("mergeSchema", "true").parquet("data/test_table")
    mergedDF.printSchema()

    // The final schema consists of all 3 columns in the Parquet files together
    // with the partitioning column appeared in the partition directory paths
    // root
    // |-- value: int (nullable = true)
    // |-- square: int (nullable = true)
    // |-- cube: int (nullable = true)
    // |-- key: int (nullable = true)

    4) Hive元数据存储Parquet表转换

    为了性能,Spark使用自身的Parquet支持替代Hive的SerDe。通过spark.sql.hive.convertMetastoreParquet开关。

    1’ Hive/Parquet模式调和(reconciliation)

    Hive与Parquet模式区别:

  • Hive大小写敏感,而Parquet不是。
  • Hive所有列是可为空的,而Parquet不是。
  • 调和规则:

  • 同名列必须数据类型相同,并且是否为空和Parquet相同。
  • 字段与Hive Metastore保持一致
  •