
Spark Development -- Spark SQL -- Built-in Functions (Part 12)

Documentation: http://spark.apache.org/docs/latest/api/sql/index.html

I. Commonly Used Functions

Most of the built-in functions live in the org.apache.spark.sql.functions class, including aggregate functions, collection functions, date/time functions, string functions, math functions, sorting functions, window functions, and so on, roughly 299 functions in total.
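If you want to browse the available functions at runtime, one quick way (a sketch, assuming the usual spark session of spark-shell, as in the examples below) is to query the catalog or run SHOW FUNCTIONS:

scala> spark.catalog.listFunctions().show(5)   // registered functions as a Dataset
scala> spark.sql("SHOW FUNCTIONS").show(5)     // the same list via SQL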
Test data:

{"EMPNO": 7369,"ENAME": "SMITH","JOB": "CLERK","MGR": 7902,"HIREDATE": "1980-12-17 00:00:00","SAL": 800.00,"COMM": null,"DEPTNO": 20}
{"EMPNO": 7499,"ENAME": "ALLEN","JOB": "SALESMAN","MGR": 7698,"HIREDATE": "1981-02-20 00:00:00","SAL": 1600.00,"COMM": 300.00,"DEPTNO": 30}
{"EMPNO": 7521,"ENAME": "WARD","JOB": "SALESMAN","MGR": 7698,"HIREDATE": "1981-02-22 00:00:00","SAL": 1250.00,"COMM": 500.00,"DEPTNO": 30}
{"EMPNO": 7566,"ENAME": "JONES","JOB": "MANAGER","MGR": 7839,"HIREDATE": "1981-04-02 00:00:00","SAL": 2975.00,"COMM": null,"DEPTNO": 20}
{"EMPNO": 7654,"ENAME": "MARTIN","JOB": "SALESMAN","MGR": 7698,"HIREDATE": "1981-09-28 00:00:00","SAL": 1250.00,"COMM": 1400.00,"DEPTNO": 30}
{"EMPNO": 7698,"ENAME": "BLAKE","JOB": "MANAGER","MGR": 7839,"HIREDATE": "1981-05-01 00:00:00","SAL": 2850.00,"COMM": null,"DEPTNO": 30}
{"EMPNO": 7782,"ENAME": "CLARK","JOB": "MANAGER","MGR": 7839,"HIREDATE": "1981-06-09 00:00:00","SAL": 2450.00,"COMM": null,"DEPTNO": 10}
{"EMPNO": 7788,"ENAME": "SCOTT","JOB": "ANALYST","MGR": 7566,"HIREDATE": "1987-04-19 00:00:00","SAL": 1500.00,"COMM": null,"DEPTNO": 20}
{"EMPNO": 7839,"ENAME": "KING","JOB": "PRESIDENT","MGR": null,"HIREDATE": "1981-11-17 00:00:00","SAL": 5000.00,"COMM": null,"DEPTNO": 10}
{"EMPNO": 7844,"ENAME": "TURNER","JOB": "SALESMAN","MGR": 7698,"HIREDATE": "1981-09-08 00:00:00","SAL": 1500.00,"COMM": 0.00,"DEPTNO": 30}
{"EMPNO": 7876,"ENAME": "ADAMS","JOB": "CLERK","MGR": 7788,"HIREDATE": "1987-05-23 00:00:00","SAL": 1100.00,"COMM": null,"DEPTNO": 20}
{"EMPNO": 7900,"ENAME": "JAMES","JOB": "CLERK","MGR": 7698,"HIREDATE": "1981-12-03 00:00:00","SAL": 950.00,"COMM": null,"DEPTNO": 30}
{"EMPNO": 7902,"ENAME": "FORD","JOB": "ANALYST","MGR": 7566,"HIREDATE": "1981-12-03 00:00:00","SAL": 3000.00,"COMM": null,"DEPTNO": 20}
{"EMPNO": 7934,"ENAME": "MILLER","JOB": "CLERK","MGR": 7782,"HIREDATE": "1982-01-23 00:00:00","SAL": 1300.00,"COMM": null,"DEPTNO": 10}

How to test:
Using the built-in functions programmatically (DataFrame API):

import org.apache.spark.sql.functions._
scala> val df = spark.read.json("/usr/file/json/emp.json")   // load the test data shown above
scala> df.select(lower(col("ename")).as("name"), col("sal")).show()
+------+------+
|  name|   sal|
+------+------+
| smith| 800.0|
| allen|1600.0|
|miller|1300.0|
+------+------+

Using the functions via SQL:

df.createOrReplaceTempView("emp")
scala> spark.sql("select lower(ename) as name,sal from emp").show()
+------+------+
|  name|   sal|
+------+------+
| smith| 800.0|
| allen|1600.0|
| james| 950.0|
|  ford|3000.0|
|miller|1300.0|
+------+------+

1. String Functions

1) concat: concatenate strings

concat(str1, str2, ..., strN) - Returns the concatenation of str1, str2, ..., strN.
scala>  spark.sql("SELECT concat('Spark', 'SQL')").show
+------------------+
|concat(Spark, SQL)|
+------------------+
|          SparkSQL|
+------------------+

2) concat_ws: concatenate strings with a separator

concat_ws(sep, [str | array(str)]+) - Returns the concatenation of the strings separated by sep.
# Add a space between the two words.
scala>  spark.sql("SELECT concat_ws(' ', 'Spark', 'SQL')").show
+------------------------+
|concat_ws( , Spark, SQL)|
+------------------------+
|               Spark SQL|
+------------------------+

3) decode: decode binary data into a string

decode(bin, charset) - Decodes the first argument using the second argument character set.
scala>  spark.sql("SELECT decode(encode('abc', 'utf-8'), 'utf-8')").show
+---------------------------------+
|decode(encode(abc, utf-8), utf-8)|
+---------------------------------+
|                              abc|
+---------------------------------+

4) encode: encode a string using the given charset

encode(str, charset) - Encodes the first argument using the second argument character set.
scala>  spark.sql("SELECT encode('abc', 'utf-8')").show
+------------------+
|encode(abc, utf-8)|
+------------------+
|        [61 62 63]|
+------------------+

5) format_string / printf: format a string

format_string(strfmt, obj, ...) - Returns a formatted string from printf-style format strings.
scala>  spark.sql("SELECT format_string('Hello World %d %s', 100, 'days')").show
+-------------------------------------------+
|format_string(Hello World %d %s, 100, days)|
+-------------------------------------------+
|                       Hello World 100 days|
+-------------------------------------------+

6) initcap: uppercase the first letter of each word and lowercase the rest; lower converts everything to lowercase, upper to uppercase

initcap(str) - Returns str with the first letter of each word in uppercase. All other letters are in lowercase. Words are delimited by white space.
scala>  spark.sql("SELECT initcap('sPark sql')").show
+------------------+
|initcap(sPark sql)|
+------------------+
|         Spark Sql|
+------------------+
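lower and upper, mentioned in the heading above, work the same way; a minimal sketch in the same style:

scala> spark.sql("SELECT lower('sPark sql'), upper('sPark sql')").show
// Expected values: "spark sql" and "SPARK SQL"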

7) length: return the length of a string

scala>  spark.sql("SELECT length('Spark SQL ')").show
+------------------+
|length(Spark SQL )|
+------------------+
|                10|
+------------------+

8) levenshtein: edit distance (the number of single-character edits needed to turn one string into the other)

levenshtein(str1, str2) - Returns the Levenshtein distance between the two given strings.
scala>  spark.sql("SELECT levenshtein('kitten', 'sitting')").show
+----------------------------+
|levenshtein(kitten, sitting)|
+----------------------------+
|                           3|
+----------------------------+

9) lpad: pad a string on the left with a given pad string up to a fixed length; rpad pads on the right

lpad(str, len, pad) - Returns str, left-padded with pad to a length of len. If str is longer than len, the return value is shortened to len characters.
scala>  spark.sql("SELECT lpad('hi', 5, '??')").show
+---------------+
|lpad(hi, 5, ??)|
+---------------+
|          ???hi|
+---------------+
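rpad, mentioned above, pads on the right instead; a minimal sketch:

scala> spark.sql("SELECT rpad('hi', 5, '??')").show
// Expected value: hi???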

10) ltrim: remove leading spaces or leading characters; rtrim trims from the right, trim trims both sides

ltrim(str) - Removes the leading space characters from str.
ltrim(trimStr, str) - Removes from str the leading characters contained in trimStr.
scala>  spark.sql("SELECT ltrim('    SparkSQL   ')").show
+----------------------+
|ltrim(    SparkSQL   )|
+----------------------+
|           SparkSQL   |
+----------------------+
scala>  spark.sql("SELECT ltrim('Sp', 'SSparkSQLS')").show
+---------------------+
|ltrim(SSparkSQLS, Sp)|
+---------------------+
|              arkSQLS|
+---------------------+
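rtrim and trim, mentioned in the heading, behave symmetrically; a sketch with the same input string:

scala> spark.sql("SELECT rtrim('    SparkSQL   '), trim('    SparkSQL   ')").show
// Expected values: "    SparkSQL" (trailing spaces removed) and "SparkSQL" (both sides trimmed)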

11) regexp_extract: extract substrings with a regular expression; regexp_replace replaces regex matches

# A backslash in the pattern must be escaped as \\\\
scala>  spark.sql("SELECT regexp_extract('100-200', '(\\\\d+)-(\\\\d+)', 1) as str").show
+---+
|str|
+---+
|100|
+---+
scala>  spark.sql("SELECT regexp_extract('foothebar', 'foo(.*?)(bar)',1) as str").show
+---+
|str|
+---+
|the|
+---+
spark.sql("SELECT regexp_replace('100-200', '(\\\\d+)', 'num') as str").show

12) repeat: repeat the given string n times

scala>  spark.sql("SELECT repeat('123', 2)").show
+--------------+
|repeat(123, 2)|
+--------------+
|        123123|
+--------------+

13) instr: return the position of a substring; locate is similar

instr(str, substr) - Returns the (1-based) index of the first occurrence of substr in str.
scala>  spark.sql("SELECT instr('SparkSQL', 'SQL')").show
+--------------------+
|instr(SparkSQL, SQL)|
+--------------------+
|                   6|
+--------------------+
scala>  spark.sql(" SELECT locate('bar', 'foobarbar')").show
+-------------------------+
|locate(bar, foobarbar, 1)|
+-------------------------+
|                        4|
+-------------------------+

14) space: produce a string of n spaces (here used to prepend spaces to a string)

space(n) - Returns a string consisting of n spaces.
scala>  spark.sql("SELECT concat(space(2), '1')").show
+-------------------+
|concat(space(2), 1)|
+-------------------+
|                  1|
+-------------------+

15) split: split a string on characters matching a regex

split(str, regex) - Splits str around occurrences that match regex.
scala>  spark.sql("SELECT split('oneAtwoBthreeC', '[ABC]')").show
+----------------------------+
|split(oneAtwoBthreeC, [ABC])|
+----------------------------+
|         [one, two, three, ]|
+----------------------------+

16) substr: extract a substring; see also substring_index

scala>  spark.sql("SELECT substr('Spark SQL', 5)").show
+-----------------------------------+
|substring(Spark SQL, 5, 2147483647)|
+-----------------------------------+
|                              k SQL|
+-----------------------------------+
scala>  spark.sql("SELECT substr('Spark SQL', -3)").show
+------------------------------------+
|substring(Spark SQL, -3, 2147483647)|
+------------------------------------+
|                                 SQL|
+------------------------------------+
scala>  spark.sql("SELECT substr('Spark SQL', 5, 1)").show
+--------------------------+
|substring(Spark SQL, 5, 1)|
+--------------------------+
|                         k|
+--------------------------+
scala>  spark.sql("SELECT substring_index('www.apache.org', '.', 2)").show
+-------------------------------------+
|substring_index(www.apache.org, ., 2)|
+-------------------------------------+
|                           www.apache|
+-------------------------------------+

17) translate: replace characters one-for-one (each character in the 'from' string is replaced by the corresponding character in the 'to' string)

scala>  spark.sql("SELECT translate('AaBbCc', 'abc', '123')").show
+---------------------------+
|translate(AaBbCc, abc, 123)|
+---------------------------+
|                     A1B2C3|
+---------------------------+

18) get_json_object: extract a JSON value by path

get_json_object(json_txt, path) - Extracts a json object from path.
scala>  spark.sql("SELECT get_json_object('{\"a\":\"b\"}', '$.a')").show
+-------------------------------+
|get_json_object({"a":"b"}, $.a)|
+-------------------------------+
|                              b|
+-------------------------------+

19) unhex

unhex(expr) - Converts hexadecimal expr to binary.
scala>  spark.sql("SELECT decode(unhex('537061726B2053514C'), 'UTF-8')").show
+----------------------------------------+
|decode(unhex(537061726B2053514C), UTF-8)|
+----------------------------------------+
|                               Spark SQL|
+----------------------------------------+

20) to_json

to_json(expr[, options]) - Returns a json string with a given struct value
scala>  spark.sql("SELECT to_json(named_struct('a', 1, 'b', 2))").show
+---------------------------------------+
|structstojson(named_struct(a, 1, b, 2))|
+---------------------------------------+
|                          {"a":1,"b":2}|
+---------------------------------------+
scala>  spark.sql("SELECT to_json(named_struct('time', to_timestamp('2015-08-26', 'yyyy-MM-dd')), map('timestampFormat', 'dd/MM/yyyy'))").show(false)
+---------------------------------------------------------------------------+
|structstojson(named_struct(time, to_timestamp('2015-08-26', 'yyyy-MM-dd')))|
+---------------------------------------------------------------------------+
|{"time":"26/08/2015"}                                                      |
+---------------------------------------------------------------------------+
# Array of structs
scala>  spark.sql("SELECT to_json(array(named_struct('a', 1, 'b', 2)))").show
+----------------------------------------------+
|structstojson(array(named_struct(a, 1, b, 2)))|
+----------------------------------------------+
|                               [{"a":1,"b":2}]|
+----------------------------------------------+
# Nested objects
scala>  spark.sql("SELECT to_json(map('a', named_struct('b', 1)))").show
+-----------------------------------------+
|structstojson(map(a, named_struct(b, 1)))|
+-----------------------------------------+
|                            {"a":{"b":1}}|
+-----------------------------------------+
scala>  spark.sql("SELECT to_json(map(named_struct('a', 1),named_struct('b', 2)))").show
+----------------------------------------------------------+
|structstojson(map(named_struct(a, 1), named_struct(b, 2)))|
+----------------------------------------------------------+
|                                           {"[1]":{"b":2}}|
+----------------------------------------------------------+
# Numeric values
scala>  spark.sql("SELECT to_json(map('a', 1))").show
+------------------------+
|structstojson(map(a, 1))|
+------------------------+
|                 {"a":1}|
+------------------------+
scala>  spark.sql("SELECT to_json(array((map('a', 1))))").show
+-------------------------------+
|structstojson(array(map(a, 1)))|
+-------------------------------+
|                      [{"a":1}]|
+-------------------------------+

2. Aggregate Functions

These examples use the test data above.
Create the DataFrame:

// Import the Spark SQL built-in functions
import org.apache.spark.sql.functions._
// val spark = SparkSession.builder().appName("aggregations").master("local[2]").getOrCreate()
val empDF = spark.read.json("/usr/file/json/emp.json")
// Register the DataFrame as a temporary view
empDF.createOrReplaceTempView("emp")
empDF.show()
1) avg and mean

Built-in functions for computing the average.

scala> spark.sql("select id from aaa").select(avg("id")).show
scala> spark.sql("select id from aaa").select(mean("id")).show
empDF.select(avg("sal")).show()
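// For the test data above (14 rows, total salary 27525.0) both should return roughly 1966.07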
2) count
scala> spark.sql("select id from aaa").select(count("id")).show
// Count the number of employees
empDF.select(count("ename")).show()
3) countDistinct
scala> spark.sql("select id from aaa").select(countDistinct("id")).show
// Count the number of distinct departments
empDF.select(countDistinct("deptno")).show()
4) min & max

Minimum & maximum.
Get the minimum or maximum value of a specified column in a DataFrame.

scala> spark.sql("select id from aaa").select(max("id")).show
scala> spark.sql("select id from aaa").select(min("id")).show
empDF.select(min("sal"),max("sal")).show()
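// For the test data this should return min(sal) = 800.0 and max(sal) = 5000.0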

5) sum & sumDistinct

Sum a column, or sum only the distinct values of a column.

scala> spark.sql("select id from aaa").select(sum("id")).show
empDF.select(sum("sal")).show()
empDF.select(sumDistinct("sal")).show()
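// For the test data sum(sal) is 27525.0, while sumDistinct(sal) adds each distinct salary only once
// (1250.0 and 1500.0 each appear twice), giving 24775.0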
6) approx_count_distinct: approximate number of distinct values

When working with large datasets you often only need an approximate count rather than an exact one; approx_count_distinct is designed for this, and its second argument specifies the maximum estimation error allowed.
That argument is the relative standard deviation (rsd): the smaller it is, the more accurate (and more expensive) the estimate. It must lie between 0 and 0.4 (exclusive); the examples below use 0.1.

scala> spark.sql("select id from aaa").select(approx_count_distinct ("id",0.1)).show
empDF.select(approx_count_distinct ("ename",0.1)).show()
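The second argument can be omitted, in which case Spark falls back to its default relative error (0.05 in current versions). A minimal sketch:

empDF.select(approx_count_distinct("ename")).show()   // default rsd of 0.05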
7) first & last

Get the first or the last value of a specified column in a DataFrame.

empDF.select(first("ename"),last("job")).show()
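The same aggregates can be used from SQL against the emp view registered earlier. Note that without an explicit ordering, first and last simply follow the physical row order, so the result is not deterministic in general. A sketch:

scala> spark.sql("SELECT first(ename), last(job) FROM emp").show()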
8) Aggregate data into collections
scala>  empDF.agg(collect_set("job"), collect_list("ename")).show()
+--------------------+--------------------+
|    collect_set(job)| collect_list(ename)|
+--------------------+--------------------+
|[MANAGER, SALESMA...|[SMITH, ALLEN, WA...|
+--------------------+--------------------+

3. Math Functions

Spark SQL also supports a variety of mathematical aggregate functions for common statistical calculations. Here are some frequently used examples:

1) Variance and standard deviation
// Compute the population variance (var_pop), sample variance (var_samp), population standard deviation (stddev_pop), and sample standard deviation (stddev_samp)
empDF.select(var_pop("sal"), var_samp("sal"), stddev_pop("sal"), stddev_samp("sal")).show()
2) Skewness and kurtosis
// Compute skewness and kurtosis
empDF.select(skewness("sal"), kurtosis("sal")).show()
3) Pearson correlation coefficient
// Compute the Pearson correlation coefficient, sample covariance, and population covariance of two columns.
// (This is only a demonstration; the employee-number and salary columns are not actually related.)
empDF.select(corr("empno", "sal"), covar_samp("empno", "sal"),covar_pop("empno", "sal")).show()
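The same statistical aggregates are available in SQL against the emp view; a minimal sketch:

scala> spark.sql("SELECT corr(empno, sal), covar_samp(empno, sal), covar_pop(empno, sal) FROM emp").show()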

4. Grouped Aggregation

1) Simple grouping
empDF.groupBy("deptno", "job").count().show()
// Equivalent SQL
spark.sql("SELECT deptno, job, count(*) FROM emp GROUP BY deptno, job").show()
+------+---------+-----+
|deptno|      job|count|
+------+---------+-----+
|    10|PRESIDENT|    1|
|    30|    CLERK|    1|
|    10|  MANAGER|    1|
|    30|  MANAGER|    1|
|    20|    CLERK|    2|
|    30| SALESMAN|    4|
|    20|  ANALYST|    2|
|    10|    CLERK|    1|
|    20|  MANAGER|    1|
+------+---------+-----+
2) Grouping with aggregations
empDF.groupBy("deptno").agg(count("ename").alias("人数"), sum("sal").alias("总工资")).show()
// Equivalent syntax
empDF.groupBy("deptno").agg("ename"->"count","sal"->"sum").show()
// Equivalent SQL
spark.sql("SELECT deptno, count(ename) ,sum(sal) FROM emp GROUP BY deptno").show()
+------+----+------+
|deptno|人数|总工资|
+------+----+------+
|    10|   3|8750.0|
|    30|   6|9400.0|
|    20|   5|9375.0|
+------+----+------+
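To filter on an aggregated value (SQL's HAVING clause), apply the condition after the aggregation. A sketch; the alias total_sal is just illustrative:

empDF.groupBy("deptno").agg(sum("sal").alias("total_sal")).where(col("total_sal") > 9000).show()
// Equivalent SQL
spark.sql("SELECT deptno, sum(sal) FROM emp GROUP BY deptno HAVING sum(sal) > 9000").show()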
Last edited: 2020-04-03 21:32