Spark Development -- Spark SQL -- Built-in Functions (Part 12)
Documentation: http://spark.apache.org/docs/latest/api/sql/index.html
I. Common Functions
Most of the built-in functions live in the org.apache.spark.sql.functions class: aggregate functions, collection functions, date/time functions, string functions, math functions, sorting functions, window functions, and so on (roughly 299 functions in total).
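A quick way to browse everything registered in the current session is the catalog API or the SHOW FUNCTIONS statement (a minimal sketch from a spark-shell session; output abbreviated):
// List the functions registered in the current SparkSession
spark.catalog.listFunctions().show(5)
// The same information via SQL
spark.sql("SHOW FUNCTIONS").show(5)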
Test data:
{"EMPNO": 7369,"ENAME": "SMITH","JOB": "CLERK","MGR": 7902,"HIREDATE": "1980-12-17 00:00:00","SAL": 800.00,"COMM": null,"DEPTNO": 20}
{"EMPNO": 7499,"ENAME": "ALLEN","JOB": "SALESMAN","MGR": 7698,"HIREDATE": "1981-02-20 00:00:00","SAL": 1600.00,"COMM": 300.00,"DEPTNO": 30}
{"EMPNO": 7521,"ENAME": "WARD","JOB": "SALESMAN","MGR": 7698,"HIREDATE": "1981-02-22 00:00:00","SAL": 1250.00,"COMM": 500.00,"DEPTNO": 30}
{"EMPNO": 7566,"ENAME": "JONES","JOB": "MANAGER","MGR": 7839,"HIREDATE": "1981-04-02 00:00:00","SAL": 2975.00,"COMM": null,"DEPTNO": 20}
{"EMPNO": 7654,"ENAME": "MARTIN","JOB": "SALESMAN","MGR": 7698,"HIREDATE": "1981-09-28 00:00:00","SAL": 1250.00,"COMM": 1400.00,"DEPTNO": 30}
{"EMPNO": 7698,"ENAME": "BLAKE","JOB": "MANAGER","MGR": 7839,"HIREDATE": "1981-05-01 00:00:00","SAL": 2850.00,"COMM": null,"DEPTNO": 30}
{"EMPNO": 7782,"ENAME": "CLARK","JOB": "MANAGER","MGR": 7839,"HIREDATE": "1981-06-09 00:00:00","SAL": 2450.00,"COMM": null,"DEPTNO": 10}
{"EMPNO": 7788,"ENAME": "SCOTT","JOB": "ANALYST","MGR": 7566,"HIREDATE": "1987-04-19 00:00:00","SAL": 1500.00,"COMM": null,"DEPTNO": 20}
{"EMPNO": 7839,"ENAME": "KING","JOB": "PRESIDENT","MGR": null,"HIREDATE": "1981-11-17 00:00:00","SAL": 5000.00,"COMM": null,"DEPTNO": 10}
{"EMPNO": 7844,"ENAME": "TURNER","JOB": "SALESMAN","MGR": 7698,"HIREDATE": "1981-09-08 00:00:00","SAL": 1500.00,"COMM": 0.00,"DEPTNO": 30}
{"EMPNO": 7876,"ENAME": "ADAMS","JOB": "CLERK","MGR": 7788,"HIREDATE": "1987-05-23 00:00:00","SAL": 1100.00,"COMM": null,"DEPTNO": 20}
{"EMPNO": 7900,"ENAME": "JAMES","JOB": "CLERK","MGR": 7698,"HIREDATE": "1981-12-03 00:00:00","SAL": 950.00,"COMM": null,"DEPTNO": 30}
{"EMPNO": 7902,"ENAME": "FORD","JOB": "ANALYST","MGR": 7566,"HIREDATE": "1981-12-03 00:00:00","SAL": 3000.00,"COMM": null,"DEPTNO": 20}
{"EMPNO": 7934,"ENAME": "MILLER","JOB": "CLERK","MGR": 7782,"HIREDATE": "1982-01-23 00:00:00","SAL": 1300.00,"COMM": null,"DEPTNO": 10}
Testing approach:
Using the built-in functions programmatically (only a few of the output rows are reproduced below):
import org.apache.spark.sql.functions._
scala> df.select(lower(col("ename")).as("name"), col("sal")).show()
+------+------+
| name| sal|
+------+------+
| smith| 800.0|
| allen|1600.0|
|miller|1300.0|
+------+------+
Using them via SQL:
df.createOrReplaceTempView("emp")
scala> spark.sql("select lower(ename) as name,sal from emp").show()
+------+------+
| name| sal|
+------+------+
| smith| 800.0|
| allen|1600.0|
| james| 950.0|
| ford|3000.0|
|miller|1300.0|
+------+------+
1. String Functions
1) concat: concatenates strings
concat(str1, str2, ..., strN) - Returns the concatenation of str1, str2, ..., strN.
scala> spark.sql("SELECT concat('Spark', 'SQL')").show
+------------------+
|concat(Spark, SQL)|
+------------------+
| SparkSQL|
+------------------+
2) concat_ws: concatenates strings with a separator between them
concat_ws(sep, [str | array(str)]+) - Returns the concatenation of the strings separated by sep.
# Insert a space between the words.
scala> spark.sql("SELECT concat_ws(' ', 'Spark', 'SQL')").show
+------------------------+
|concat_ws( , Spark, SQL)|
+------------------------+
| Spark SQL|
+------------------------+
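The same functions can also be applied to DataFrame columns through the functions API; a small sketch against the test data (assuming the df created above and the functions._ import):
# Concatenate ENAME and JOB with a separator
scala> df.select(concat_ws("-", col("ename"), col("job")).as("name_job")).show(3)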
3) decode: decodes binary data back into a string
decode(bin, charset) - Decodes the first argument using the second argument character set.
scala> spark.sql("SELECT decode(encode('abc', 'utf-8'), 'utf-8')").show
+---------------------------------+
|decode(encode(abc, utf-8), utf-8)|
+---------------------------------+
| abc|
+---------------------------------+
4) encode: encodes a string using the given character set
encode(str, charset) - Encodes the first argument using the second argument character set.
scala> spark.sql("SELECT encode('abc', 'utf-8')").show
+------------------+
|encode(abc, utf-8)|
+------------------+
| [61 62 63]|
+------------------+
5) format_string / printf: formats a string
format_string(strfmt, obj, ...) - Returns a formatted string from printf-style format strings.
scala> spark.sql("SELECT format_string('Hello World %d %s', 100, 'days')").show
+-------------------------------------------+
|format_string(Hello World %d %s, 100, days)|
+-------------------------------------------+
| Hello World 100 days|
+-------------------------------------------+
6) initcap: uppercases the first letter of each word and lowercases the rest; lower converts everything to lowercase, upper to uppercase
initcap(str) - Returns str with the first letter of each word in uppercase. All other letters are in lowercase. Words are delimited by white space.
scala> spark.sql("SELECT initcap('sPark sql')").show
+------------------+
|initcap(sPark sql)|
+------------------+
| Spark Sql|
+------------------+
7) length: returns the length of a string
scala> spark.sql("SELECT length('Spark SQL ')").show
+------------------+
|length(Spark SQL )|
+------------------+
| 10|
+------------------+
8) levenshtein: edit distance (the number of single-character edits needed to turn one string into the other)
levenshtein(str1, str2) - Returns the Levenshtein distance between the two given strings.
scala> spark.sql("SELECT levenshtein('kitten', 'sitting')").show
+----------------------------+
|levenshtein(kitten, sitting)|
+----------------------------+
| 3|
+----------------------------+
9) lpad: pads a string on the left to a fixed length with the given pad characters; rpad pads on the right
lpad(str, len, pad) - Returns str, left-padded with pad to a length of len. If str is longer than len, the return value is shortened to len characters.
scala> spark.sql("SELECT lpad('hi', 5, '??')").show
+---------------+
|lpad(hi, 5, ??)|
+---------------+
| ???hi|
+---------------+
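rpad works the same way but pads on the right; a one-line sketch (the result should be 'hi???'):
scala> spark.sql("SELECT rpad('hi', 5, '?')").show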
10) ltrim: removes leading spaces, or removes the given characters from the start of a string; rtrim trims from the right, trim trims both ends
ltrim(str) - Removes the leading space characters from str.
ltrim(trimStr, str) - Removes from str any leading characters that appear in trimStr.
scala> spark.sql("SELECT ltrim(' SparkSQL ')").show
+----------------------+
|ltrim( SparkSQL )|
+----------------------+
| SparkSQL |
+----------------------+
scala> spark.sql("SELECT ltrim('Sp', 'SSparkSQLS')").show
+---------------------+
|ltrim(SSparkSQLS, Sp)|
+---------------------+
| arkSQLS|
+---------------------+
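trim and rtrim follow the same pattern; a sketch that strips the surrounding spaces on both sides and on the right, respectively:
scala> spark.sql("SELECT trim(' SparkSQL '), rtrim(' SparkSQL ')").show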
11) regexp_extract: extracts parts of a string with a regular expression; regexp_replace performs regex replacement
# A backslash in the regex must be escaped as \\\\
scala> spark.sql("SELECT regexp_extract('100-200', '(\\\\d+)-(\\\\d+)', 1) as str").show
+---+
|str|
+---+
|100|
+---+
scala> spark.sql("SELECT regexp_extract('foothebar', 'foo(.*?)(bar)',1) as str").show
+---+
|str|
+---+
|the|
+---+
spark.sql("SELECT regexp_replace('100-200', '(\\\\d+)', 'num') as str").show
12) repeat: repeats the given string n times
scala> spark.sql("SELECT repeat('123', 2)").show
+--------------+
|repeat(123, 2)|
+--------------+
| 123123|
+--------------+
13) instr: returns the position of a substring within a string; locate is similar
instr(str, substr) - Returns the (1-based) index of the first occurrence of substr in str.
scala> spark.sql("SELECT instr('SparkSQL', 'SQL')").show
+--------------------+
|instr(SparkSQL, SQL)|
+--------------------+
| 6|
+--------------------+
scala> spark.sql(" SELECT locate('bar', 'foobarbar')").show
+-------------------------+
|locate(bar, foobarbar, 1)|
+-------------------------+
| 4|
+-------------------------+
14) space: returns a string consisting of n spaces (the example below uses it to prefix a string with two spaces)
space(n) - Returns a string consisting of n spaces.
scala> spark.sql("SELECT concat(space(2), '1')").show
+-------------------+
|concat(space(2), 1)|
+-------------------+
| 1|
+-------------------+
15) split: splits a string around matches of the given pattern
split(str, regex) - Splits str around occurrences that match regex.
scala> spark.sql("SELECT split('oneAtwoBthreeC', '[ABC]')").show
+----------------------------+
|split(oneAtwoBthreeC, [ABC])|
+----------------------------+
| [one, two, three, ]|
+----------------------------+
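split is also handy on columns; for example, keeping just the date part of HIREDATE from the test data (a sketch assuming the df created above):
# Split HIREDATE on the space and keep the first element
scala> df.select(split(col("hiredate"), " ").getItem(0).as("hire_date")).show(3)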
16) substr: extracts a substring; substring_index returns the part of the string before the n-th occurrence of a delimiter
scala> spark.sql("SELECT substr('Spark SQL', 5)").show
+-----------------------------------+
|substring(Spark SQL, 5, 2147483647)|
+-----------------------------------+
| k SQL|
+-----------------------------------+
scala> spark.sql("SELECT substr('Spark SQL', -3)").show
+------------------------------------+
|substring(Spark SQL, -3, 2147483647)|
+------------------------------------+
| SQL|
+------------------------------------+
scala> spark.sql("SELECT substr('Spark SQL', 5, 1)").show
+--------------------------+
|substring(Spark SQL, 5, 1)|
+--------------------------+
| k|
+--------------------------+
scala> spark.sql("SELECT substring_index('www.apache.org', '.', 2)").show
+-------------------------------------+
|substring_index(www.apache.org, ., 2)|
+-------------------------------------+
| www.apache|
+-------------------------------------+
17) translate: replaces characters; each character of the input that appears in the from-string is replaced by the character at the same position in the to-string
scala> spark.sql("SELECT translate('AaBbCc', 'abc', '123')").show
+---------------------------+
|translate(AaBbCc, abc, 123)|
+---------------------------+
| A1B2C3|
+---------------------------+
18) get_json_object
get_json_object(json_txt, path) - Extracts a json object from path.
scala> spark.sql("SELECT get_json_object('{\"a\":\"b\"}', '$.a')").show
+-------------------------------+
|get_json_object({"a":"b"}, $.a)|
+-------------------------------+
| b|
+-------------------------------+
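The path argument also supports nested fields; for example, '$.a.b' digs one level down and should return 1 (a sketch, not run here):
scala> spark.sql("SELECT get_json_object('{\"a\":{\"b\":1}}', '$.a.b')").show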
19) unhex
unhex(expr) - Converts hexadecimal expr to binary.
scala> spark.sql("SELECT decode(unhex('537061726B2053514C'), 'UTF-8')").show
+----------------------------------------+
|decode(unhex(537061726B2053514C), UTF-8)|
+----------------------------------------+
| Spark SQL|
+----------------------------------------+
20) to_json
to_json(expr[, options]) - Returns a json string with a given struct value
scala> spark.sql("SELECT to_json(named_struct('a', 1, 'b', 2))").show
+---------------------------------------+
|structstojson(named_struct(a, 1, b, 2))|
+---------------------------------------+
| {"a":1,"b":2}|
+---------------------------------------+
scala> spark.sql("SELECT to_json(named_struct('time', to_timestamp('2015-08-26', 'yyyy-MM-dd')), map('timestampFormat', 'dd/MM/yyyy'))").show(false)
+---------------------------------------------------------------------------+
|structstojson(named_struct(time, to_timestamp('2015-08-26', 'yyyy-MM-dd')))|
+---------------------------------------------------------------------------+
|{"time":"26/08/2015"} |
+---------------------------------------------------------------------------+
# Array of structs
scala> spark.sql("SELECT to_json(array(named_struct('a', 1, 'b', 2)))").show
+----------------------------------------------+
|structstojson(array(named_struct(a, 1, b, 2)))|
+----------------------------------------------+
| [{"a":1,"b":2}]|
+----------------------------------------------+
# Nested objects
scala> spark.sql("SELECT to_json(map('a', named_struct('b', 1)))").show
+-----------------------------------------+
|structstojson(map(a, named_struct(b, 1)))|
+-----------------------------------------+
| {"a":{"b":1}}|
+-----------------------------------------+
scala> spark.sql("SELECT to_json(map(named_struct('a', 1),named_struct('b', 2)))").show
+----------------------------------------------------------+
|structstojson(map(named_struct(a, 1), named_struct(b, 2)))|
+----------------------------------------------------------+
| {"[1]":{"b":2}}|
+----------------------------------------------------------+
# Numeric values
scala> spark.sql("SELECT to_json(map('a', 1))").show
+------------------------+
|structstojson(map(a, 1))|
+------------------------+
| {"a":1}|
+------------------------+
scala> spark.sql("SELECT to_json(array((map('a', 1))))").show
+-------------------------------+
|structstojson(array(map(a, 1)))|
+-------------------------------+
| [{"a":1}]|
+-------------------------------+
2. Aggregate Functions
These examples use the test data above.
Create the DataFrame:
// Import the Spark SQL built-in functions
import org.apache.spark.sql.functions._
// val spark = SparkSession.builder().appName("aggregations").master("local[2]").getOrCreate()
val empDF = spark.read.json("/usr/file/json/emp.json")
// Register as a temporary view
empDF.createOrReplaceTempView("emp")
empDF.show()
1) avg and mean
Built-in functions for computing the average. (In the single-line examples below, aaa is just a placeholder table with an id column.)
scala> spark.sql("select id from aaa").select(avg("id")).show
scala> spark.sql("select id from aaa").select(mean("id")).show
empDF.select(avg("sal")).show()
2) count
scala> spark.sql("select id from aaa").select(count("id")).show
// Count the number of employees
empDF.select(count("ename")).show()
3) countDistinct
scala> spark.sql("select id from aaa").select(countDistinct("id")).show
// Count the number of distinct department numbers
empDF.select(countDistinct("deptno")).show()
4) min & max
Minimum & maximum
Gets the smallest or largest value of the specified DataFrame column.
scala> spark.sql("select id from aaa").select(max("id")).show
scala> spark.sql("select id from aaa").select(min("id")).show
empDF.select(min("sal"),max("sal")).show()
5) sum & sumDistinct
Sums all values of the specified column, or sums only its distinct values.
scala> spark.sql("select id from aaa").select(sum("id")).show
empDF.select(sum("sal")).show()
empDF.select(sumDistinct("sal")).show()
6) approx_count_distinct: approximate number of distinct values
With large datasets you often only care about an approximate count rather than an exact one; approx_count_distinct returns such an estimate. The optional second argument specifies the maximum estimation error allowed (the relative standard deviation): the smaller the value, the more accurate (and more expensive) the estimate. It must lie between 0 and 0.4 (exclusive).
scala> spark.sql("select id from aaa").select(approx_count_distinct ("id",0.1)).show
empDF.select(approx_count_distinct ("ename",0.1)).show()
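To see the trade-off, the exact and approximate counts can be compared side by side (a sketch against the test data):
// Exact distinct count vs. approximate count with a 10% maximum error
empDF.select(countDistinct("ename"), approx_count_distinct("ename", 0.1)).show()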
7) first & last
Gets the first or last value of the specified column in the DataFrame.
empDF.select(first("ename"),last("job")).show()
8) Aggregating values into collections
scala> empDF.agg(collect_set("job"), collect_list("ename")).show()
+--------------------+--------------------+
| collect_set(job)| collect_list(ename)|
+--------------------+--------------------+
|[MANAGER, SALESMA...|[SMITH, ALLEN, WA...|
+--------------------+--------------------+
3. Math Functions
Spark SQL also provides a number of statistical aggregate functions for common calculations. Some frequently used examples:
1) Variance and standard deviation
// Population variance var_pop, sample variance var_samp, population standard deviation stddev_pop, sample standard deviation stddev_samp
empDF.select(var_pop("sal"), var_samp("sal"), stddev_pop("sal"), stddev_samp("sal")).show()
2) Skewness and kurtosis
// Compute skewness and kurtosis
empDF.select(skewness("sal"), kurtosis("sal")).show()
3) Pearson correlation coefficient
// Pearson correlation, sample covariance, and population covariance of two columns
(This is only a demonstration; there is no real relationship between employee number and salary.)
empDF.select(corr("empno", "sal"), covar_samp("empno", "sal"),covar_pop("empno", "sal")).show()
4. Grouping and Aggregation
1) Simple grouping
empDF.groupBy("deptno", "job").count().show()
// Equivalent SQL
spark.sql("SELECT deptno, job, count(*) FROM emp GROUP BY deptno, job").show()
+------+---------+-----+
|deptno| job|count|
+------+---------+-----+
| 10|PRESIDENT| 1|
| 30| CLERK| 1|
| 10| MANAGER| 1|
| 30| MANAGER| 1|
| 20| CLERK| 2|
| 30| SALESMAN| 4|
| 20| ANALYST| 2|
| 10| CLERK| 1|
| 20| MANAGER| 1|
+------+---------+-----+
2) Grouped aggregation
empDF.groupBy("deptno").agg(count("ename").alias("人数"), sum("sal").alias("总工资")).show()
// Equivalent syntax
empDF.groupBy("deptno").agg("ename"->"count","sal"->"sum").show()
// Equivalent SQL
spark.sql("SELECT deptno, count(ename) ,sum(sal) FROM emp GROUP BY deptno").show()
+------+----+------+
|deptno|人数|总工资|
+------+----+------+
| 10| 3|8750.0|
| 30| 6|9400.0|
| 20| 5|9375.0|
+------+----+------+
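Grouped results combine with the other DataFrame operators as usual; for example, sorting departments by total salary (a sketch; the alias total_sal is just an illustrative name):
// Sort departments by their total salary in descending order
empDF.groupBy("deptno").agg(sum("sal").as("total_sal")).orderBy(desc("total_sal")).show()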