添加链接
link管理
链接快照平台
  • 输入网页链接,自动生成快照
  • 标签化管理网页链接

前言

Pandas一直是非常受欢迎的数据分析利器,它基于Numpy,专为解决数据分析任务。因其基于Python,只能单节点单核心运行,所以在大数据分析场景下,瓶颈很明显。PySpark是基于Spark JavaClient的上层接口,可以结合Python语言以及Spark分布式运行的特点,来解决Pandas在大数据下的瓶颈。本篇文章主要对比Pandas API与PySparkAPI,总结一些Pandas应用场景下使用PySpark提高效率的方案。
本篇主要是对比Pandas和PySpark的API使用,但不能对它们众多API做一一对比介绍,所以对于PySpark的更多API使用请参考:
pyspark.sql官方使用文档

Pandas PySpark 多节点内存和磁盘 大数据支持 数据处理方式 懒加载+优化无用操作 DataFrame
  • 需要对大量数据进行分析的场景下,在大数据处理的源头必须使用PySpark
  • 数据经过一系列操作、聚合后数据量减少,且 迫不得已 用Pandas的情况下再使用Pandas( 用Pandas处理的数据尽量更少 )
  • 如果可以,尽量全程使用PySpark进行分析操作
  • 需要对计算复杂且耗时的Sparkdataframe进行cache避免重算提高效率
  • 尽可能将一段处理逻辑写到一段SQL中,而非得到多个Dataframe然后进行join
  • 数据创建

    文中所有Spark Dataframe对象简称 df ,Pandas的Dataframe对象简称 pd_df

  • Pandas

    pd_df = pd.read_csv('/datas/root/csv_data/csv_file.csv')   # 1.读本地csv数据源
    pd_df = spark.sql("select col1,col2 from table").to_pandas # 2.读Hive数据源
    pd_df = spark.sql("select * from table").to_pandas # 3.读Hive整个表
    # 4.读MySQL表数据
    pd_df = pd.read_sql('select * from table', con=pymysql.connect(host="localhost",user=username,passwd=password,db=database_name,charset="utf8"))
    # 5.从list,set,dict创建dataftame
    pd_df = pd.DataFrame({"id":[1,2,3,4,5],"name":['qjj','zxw','zzz','abc',np.nan]})
    # 6.读json
    pd_df = pd.read_json('/datas/root/csv_data/json_file')
    # zeros创建指定shape的带0的ndarray
    pd_df = np.zeros((5,3), dtype='int64')  #5 行 3 列
  • PySpark

    df = spark.read.option('inferSchema',"true").option("header", "true").csv('/data/data_test/csv_file.csv')   # 1.读HDFS上csv数据源
    df = spark.read.csv("file:///a.csv") # 读本地csv 路径/a.csv 
    df = spark.sql("select col1,col2 from table") # 2.读Hive数据源
    df = spark.table('table') # 3.读Hive整个表
    # 4.读MySQL表数据 
    conf = {
      "driver": "com.mysql.jdbc.Driver",
      "url": "jdbc:mysql://cdh101:3306/",
      "dbtable": 'test.a',
      "user": 'root',
      "password": '123456',
    df = spark.read.format("jdbc").options(**conf).load()
    # 5.从list,set,dict创建dataftame
    df = spark.createDataFrame(pd.DataFrame({"id":[1,2,3,4,5],"name":['qjj','zxw','zzz','abc',None]}))  或
    df = spark.createDataFrame([(1,'qjj'),(2,'zxw'),(3,'zzz'),(4,'abc')], ['id', 'name'])
    # 6.读json文件
    df = spark.read.json('/datas/root/csv_data/json_file')
    # 7.从Parquet创建数据
    df = spark.read.parquet("...")
    df = spark.read.format('parquet').load('parquet_file'),opt...)
    # 8.从ORC创建数据
    df = spark.read.orc('...')
    # 9.从text创建数据
    df = spark.read.text('...')
    # 10.创建指定shape的带0的dataframe
    df = spark.createDataFrame([[0 for i in range(3)] for i in range(5)])  #5 行 3 列
    # 创建数据并指定字段名(Schema)
    from pyspark.sql.types import *
    schema = StructType().add('col1', StringType(), True).add('col2', IntegerType())  # True是否可以为空
    df = spark.createDataFrame([('aaa', 1),('bbb', 2)], schema=schema)
  • Pandas
    index索引:自动创建
    行结构:Series结构,属于Pandas DataFrame
    列结构:Column结构,属于Pandas DataFrame

    pd_df['col'] = 0  # 列添加
    pd_df['col'] = 1  # 列修改
    pd_df.rename(columns={'col':'new_col','xx':'xxx'})  # 重命名列名
    pd_df.columns=['col1','col2','col3']  # 重命名列名
    pd_df.dtypes  # 查看字段和类型
    pd_df.drop(columns=['col', 'name'])  # 删除字段col
  • PySpark
    index索引:无
    行结构:Row对象,属于Spark DataFrame
    列结构:Column对象,属于Spark DataFrame

    from pyspark.sql.functions import lit
    df = df.withColumn("col", lit(0))  # 列添加
    df = df.withColumn("col", lit(1))  # 列修改
    df = df.withColumnRenamed('col', 'new_col').withColumnRenamed('col1', 'new_col1')  # 重命名列名
    df.dtypes  # 查看字段和类型
    df.printSchema() # 打印字段和类型-树形
    df.drop('col', 'name')  # 删除字段col
  • Pandas

    pd.set_option('max_rows',1024)  # 最多显示1024行不隐藏
    pd.set_option('max_columns',1024)  # 最多显示1024列不隐藏
    pd_df或print(pd_df)
  • PySpark

    df.show()  # 打印前20行且每个字段打印不超过20字符
    df.show(30)  # 打印前30行且每个字段打印不超过20字符
    df.show(100,False)  # 打印前100行且每个字段打印字符数不限(不隐藏)
  • Pandas

    pd_df.sort_index(by='score', ascending=False) # 按轴(字段score)进行倒序排序
    pd_df.sort_index(by='score', ascending=False).reset_index() # 按轴(字段score)进行倒序排序,排序后index会乱序,重设index为顺序
    pd_df.sort_values(by='score') # 在列中按值进行排序
  • PySpark

    df.sort('score', ascending=False) # 按列(score字段)倒序排序
    df.orderBy('score') # 按列(score字段)顺序排序

    交集并集差集

  •