spark = SparkSession.builder \
.master("spark://localhost:7077") \
.appName("pyspark sql demo") \
.getOrCreate()
employeeDF = spark.createDataFrame([
("刘宏明", 31),
("赵薇", 33),
("黄海波", 33),
("杨幂", 34),
("楼一萱", 34),
("龙梅子", None)
], ["name","dept_no"])
deptDF = spark.createDataFrame([
(31, "销售部"),
(33, "工程部"),
(34, "财务部"),
(35, "市场营销部")
], ["id","name"])
employeeDF.join(deptDF, col("dept_no") == col("id"), "inner").show()
# 不需要指定该join类型,因为"inner"是默认的
# employeeDF.join(deptDF, col("dept_no") == col("id")).show()
# 使用SQL
# spark.sql("select * from employees JOIN departments on dept_no == id").show()
连接表达式可以在join()转换中指定,也可以使用where()变换。如果列名是唯一的,则可以使用简写引用join表达式中的列。如果没有,则需要通过使用col()函数指定特定列来自哪个DataFrame,代码如下:
# join表达式的简写版本
employeeDF.join(deptDF, col("dept_no") == col("id")).show()
# 指定特定列来自哪个DataFrame
employeeDF.join(deptDF, employeeDF.dept_no == deptDF.id).show()
# 使用where transformation指定join表达式
employeeDF.join(deptDF).where('dept_no == id').show()
这个join类型的连接后的数据集包括来自内连接的所有行加上来自左边数据集的连接表达式的计算结果为False的所有行。对于那些不匹配的行,它将为右边的数据集的列填充NULL值。例如,对employeeDF和deptDF执行左外连接,代码如下:
# 连接类型既可以是"left_outer",也可以是"leftouter"
employeeDF \
.join(deptDF, col("dept_no") == col("id"), "left_outer") \
.show()
# 或者使用SQL
spark.sql("""
select *
from employees
LEFT OUTER JOIN departments
on dept_no == id
""").show()
+--------+-------+----+------+
|emp_name|dept_no| id| name|
+--------+-------+----+------+
| 刘宏明| 31| 31| 销售部|
| 赵薇| 33| 33| 工程部|
| 黄海波| 33| 33| 工程部|
| 杨幂| 34| 34| 财务部|
| 楼一萱| 34| 34| 财务部|
| 龙梅子| 0|null| null|
+--------+-------+----+------+
这种join类型的行为类似于左外连接类型的行为,除了将相同的处理应用于右边的数据集之外。换句话说,连接后的数据集包括来自内连接的所有行加上来自右边数据集的连接表达式的计算结果为False的所有行。对于那些不匹配的行,它将为左边数据集的列填充NULL值。例如,对employeeDF和deptDF执行右外连接,代码如下:
# 连接类型既可以是"right_outer",也可以是"rightouter"
employeeDF \
.join(deptDF, col("dept_no") == col("id"), "right_outer") \
.show()
# 或者使用SQL
spark.sql("""
select *
from employees
RIGHT OUTER JOIN departments
on dept_no == id
""").show()
+--------+-------+---+----------+
|emp_name|dept_no| id| name|
+--------+-------+---+----------+
| 刘宏明| 31| 31| 销售部|
| 黄海波| 33| 33| 工程部|
| 赵薇| 33| 33| 工程部|
| 楼一萱| 34| 34| 财务部|
| 杨幂| 34| 34| 财务部|
| null| null| 35| 市场营销部|
+--------+-------+---+----------+
+--------+-------+----+----------+
|emp_name|dept_no| id| name|
+--------+-------+----+----------+
| 龙梅子| 0| null| null|
| 杨幂| 34| 34| 财务部|
| 楼一萱| 34| 34| 财务部|
| 刘宏明| 31| 31| 销售部|
| 赵薇| 33| 33| 工程部|
| 黄海波| 33| 33| 工程部|
| null| null| 35| 市场营销部|
+--------+-------+----+----------+
这种join类型能够发现来自左边数据集的哪些行在右边的数据集上没有任何匹配的行,而连接后的数据集只包含来自左边数据集的列。例如,对employeeDF和deptDF执行左反连接,代码如下:
# 使用join转换
employeeDF.join(deptDF, col("dept_no") == col("id"), "left_anti").show()
# 或者使用SQL
spark.sql("""
select *
from employees
LEFT ANTI JOIN departments
on dept_no == id
""").show()
这种join类型的行为类似于内连接类型,除了连接后的数据集不包括来自右边数据集的列。可以将这种join类型看作与左反连接类型相反,在这里,连接后的数据集只包含匹配的行。例如,对employeeDF和deptDF执行左半连接,代码如下:
# 使用join转换
employeeDF.join(deptDF, col("dept_no") == col("id"), "left_semi").show()
# 使用SQL
spark.sql("""
select *
from employees
LEFT SEMI JOIN departments
on dept_no == id
""").show()
# 使用crossJoin transformation 并显示该count
print(employeeDF.crossJoin(deptDF).count()) # 24
# 使用SQL,并显示前30行以观察连接后的数据集中所有的行
spark.sql("select * from employees CROSS JOIN departments").show(30)
+--------+-------+---+----------+
|emp_name|dept_no| id| name|
+--------+-------+---+----------+
| 刘宏明| 31| 31| 销售部|
| 刘宏明| 31| 33| 工程部|
| 刘宏明| 31| 34| 财务部|
| 刘宏明| 31| 35| 市场营销部|
| 赵薇| 33| 31| 销售部|
| 赵薇| 33| 33| 工程部|
| 赵薇| 33| 34| 财务部|
| 赵薇| 33| 35| 市场营销部|
| 黄海波| 33| 31| 销售部|
| 黄海波| 33| 33| 工程部|
| 黄海波| 33| 34| 财务部|
| 黄海波| 33| 35| 市场营销部|
| 杨幂| 34| 31| 销售部|
| 杨幂| 34| 33| 工程部|
| 杨幂| 34| 34| 财务部|
| 杨幂| 34| 35| 市场营销部|
| 楼一萱| 34| 31| 销售部|
| 楼一萱| 34| 33| 工程部|
| 楼一萱| 34| 34| 财务部|
| 楼一萱| 34| 35| 市场营销部|
| 龙梅子| 0| 31| 销售部|
| 龙梅子| 0| 33| 工程部|
| 龙梅子| 0| 34| 财务部|
| 龙梅子| 0| 35| 市场营销部|
+--------+-------+---+----------+
|-- emp_name: string (nullable = true)
|-- dept_no: long (nullable = false)
|-- id: long (nullable = false)
|-- name: string (nullable = true)
|-- dept_no: long (nullable = false)
注意,dupNameDF现在有两个名称相同的列,都叫dept_no。当试图在dupNameDF中投影dept_no列时,PySpark会抛出一个错误。例如,选择dept_no列,代码如下:
dupNameDF.select("dept_no")
# 解决方法二:join之前重命名列,使用withColumnRenamed转换
deptDF3 = deptDF2.withColumnRenamed("dept_no","dept_id")
deptDF3.printSchema()
dupNameDF3 = employeeDF.join(deptDF3, col('dept_no') == col('dept_id'))
dupNameDF3.printSchema()
dupNameDF3.select("dept_no").show()
|-- dept_no: long (nullable = false)
|-- emp_name: string (nullable = true)
|-- id: long (nullable = false)
|-- name: string (nullable = true)
+-------+--------+---+------+
|dept_no|emp_name| id| name|
+-------+--------+---+------+
| 31| 刘宏明| 31| 销售部|
| 33| 赵薇| 33| 工程部|
| 33| 黄海波| 33| 工程部|
| 34| 杨幂| 34| 财务部|
| 34| 楼一萱| 34| 财务部|
+-------+--------+---+------+