添加链接
link管理
链接快照平台
  • 输入网页链接,自动生成快照
  • 标签化管理网页链接
# Read a CSV file whose first row is the header. Spark needs a non-empty
# separator (an empty string is not a valid delimiter); ',' is its default.
# NOTE(review): for Hive text exports the field delimiter is usually '\x01' --
# set sep to whatever the file actually uses.
data = spark.read.csv(csv_path, sep=',', header=True)
# Alternatively read a Parquet file. .parquet() is explicit, whereas .load()
# picks the format from the spark.sql.sources.default setting.
data = spark.read.parquet(parquet_path)
  • 修改数据的列名
  • columnName_new = []
    data = reduce(lambda data, i: data.withColumnRenamed(data.columns[i], columnName_new[i]), xrange(len(columnName_new)), data)
    
    data.show()                                #数据显示首20行
    data.columns                               #数据的列名
    data.withColumnsRenamed(oldName, newName)  #更换数据列名
    data.filter()                              #数据按条件筛选
    data.groupBy()                             #数据分组统计
    data.orderBy(,ascending = False)           #数据按特征进行排序
    data.collect()                             #返回输出一个list
    
    data[data.id.isin([str(i.id) for i in user1.collect()])].show()    #查询data1中id属于user1中id的数据
    
  • udf的使用
  • ###例子:将hive中的字符串转化为列表,展开并获取所需的内容
    from pyspark.sql.functions import lit, udf
    def getJsonList(d, v):
        """Return the id of the first entry in ``d`` whose node_type equals ``v``.

        ``d`` is a value whose str() is a Python-literal list of dicts, e.g.
        "[{'node_type': 'a', 'id': 1}]". Returns the matching entry's ``id``
        converted to str, or None when ``d`` cannot be parsed, is not iterable,
        or no entry matches (None becomes null in the UDF result).
        """
        import ast
        # literal_eval only parses literals, so a crafted column value cannot
        # execute code the way eval() would.
        try:
            entries = ast.literal_eval(str(d))
        except (ValueError, SyntaxError, TypeError):
            return None
        try:
            candidates = iter(entries)
        except TypeError:
            # A scalar literal (e.g. "5") has no entries to search.
            return None
        for entry in candidates:
            if isinstance(entry, dict) and entry.get('node_type') == v:
                return str(entry['id'])
        return None
    # Register the Python function under the name 'getJsonList' so Spark SQL
    # expressions can call it.
    udf_name = 'getJsonList'
    spark.udf.register(udf_name, getJsonList)
    ###例子:将数据转化为指定格式的array
    from pyspark.sql.types import DoubleType 
    from pyspark.ml.linalg import Vectors, VectorUDT
    def ith_(v, i):
        """Return element ``i`` of the vector/sequence ``v`` as a float.

        Returns None when the element cannot be converted to float, or when
        ``v`` is None (a null cell), so the wrapping UDF yields null instead
        of failing the task.
        """
        try:
            return float(v[i])
        except (ValueError, TypeError):
            return None
    ith = udf(ith_, DoubleType())
    # Pull element 0 of the vector column "scaled" out as a float column named
    # "feature". The result is kept because the last step below selects
    # "feature"; a bare a.select(...) would build the DataFrame and discard it.
    feature_df = a.select(ith("scaled", lit(0)).cast('float').alias('feature'))
    # to_vector1: the input string must be formatted as an array/list, e.g. "[1.0,2.0]".
    # to_vector2: the input must be numeric.
    to_vector1 = udf(lambda text: Vectors.parse(text), VectorUDT())
    to_vector2 = udf(lambda num: Vectors.dense(num), VectorUDT())
    feature_df.select(to_vector2('feature').alias('feat')).show()
    ###例子:将数据标准化
    from pyspark.ml.feature import StandardScaler
    from pyspark.ml.feature import Normalizer
    from pyspark.ml.feature import MinMaxScaler
    # Scale the "features" vector column into "scaled": unit standard
    # deviation, without centering (withMean=False).
    # Other scalers that could be swapped in:
    #   Normalizer(inputCol="features", outputCol="scaledFeatures", p=1.0)
    #   MinMaxScaler(inputCol="features", outputCol="scaledFeatures")
    scaler = StandardScaler(
        inputCol="features",
        outputCol="scaled",
        withStd=True,
        withMean=False,
    )
    scaler_model = scaler.fit(dataFrame)
    scaler_data = scaler_model.transform(dataFrame)
    # Element 4 of the scaled vector, as a float column named progress_diff_scalar.
    progress_col = ith("scaled", lit(4)).cast('float').alias('progress_diff_scalar')
    scaler_data.select(progress_col)
    ###例子:Python连接mongo库
    import commands
    from pymongo import MongoClient
    from ConfigParser import ConfigParser
    def _mongo_uri_from_config(config_path):
        """Build a MongoDB URI from the [mongodb-online] section (user, pw, ip).

        The user name and password are percent-escaped, because characters
        such as '@', ':' or '/' in credentials would otherwise break the URI.
        """
        try:
            from urllib import quote_plus  # Python 2
        except ImportError:
            from urllib.parse import quote_plus  # Python 3
        config = ConfigParser()
        with open(config_path) as config_file:
            # read_file is the Python 3 name; Python 2's ConfigParser only has readfp.
            read_config = getattr(config, 'read_file', None) or config.readfp
            read_config(config_file)
        section = 'mongodb-online'
        return 'mongodb://{un}:{pw}@{ip}/?authSource={auth}'.format(
            un=quote_plus(config.get(section, 'user')),
            pw=quote_plus(config.get(section, 'pw')),
            ip=config.get(section, 'ip'),
            auth="admin")

    def getSubContentType(path):
        """Print, save and return all sub_content_type_id values.

        Connection settings are read from /opt/develop/homework/conf/mongo.cfg.
        Documents come from the venus_sub_content_type collection of the
        vox-question database. Each id is printed and written one per line to
        <path>/sub_content_type_ids.txt. The ids are returned as a tuple of
        strings. A document lacking sub_content_type_id raises KeyError.
        """
        import os
        mongo_conn = _mongo_uri_from_config('/opt/develop/homework/conf/mongo.cfg')
        print('可用的题目类型sub_content_type_id:')
        sub_content_type_ids = []
        with open(os.path.join(path, "sub_content_type_ids.txt"), "w") as output:
            client = MongoClient(mongo_conn)
            try:
                posts = client['vox-question'].venus_sub_content_type
                # Only the id field is needed, so project it to cut transfer.
                for doc in posts.find({}, {'sub_content_type_id': 1}):
                    sub_content_type_id = doc["sub_content_type_id"]
                    print('------ %s' % (sub_content_type_id,))
                    sub_content_type_ids.append(str(sub_content_type_id))
                    output.write(str(sub_content_type_id) + "\n")
            finally:
                client.close()
        return tuple(sub_content_type_ids)
    ###例子:Python连接mysql
    import commands
    import smtplib
    # Shell prefix for the mysql client: host, port, user and password, then -e
    # so that the quoted SQL appended after it gets executed.
    # NOTE(review): credentials are hard-coded and visible on the process
    # command line; consider a MySQL option file instead.
    online_sql_conn = ("mysql -h10.0.1.54 -P5001 -udata.athena.2016 "
                       "-pdata.DATA.17.PWD.ld -e ")
    def get_database_table_line_num(online_sql_conn, select_sql):
        """Run a shell command and return the second line of its stdout.

        ``online_sql_conn`` is a shell command prefix (e.g. "mysql ... -e ").
        ``select_sql`` is the already shell-quoted query appended to it. For a
        single-value query, line 2 is the value under the column header. It
        is returned as a string. Returns -1 when the command exits non-zero
        or prints fewer than two lines.
        """
        import subprocess
        # Log only the query: the connection prefix carries the password.
        print("select_sql:\t" + select_sql)
        # Capture stdout on its own. Merging stderr into the output (as
        # commands.getstatusoutput does) lets a warning line -- e.g. mysql's
        # notice about passwords on the command line -- shift the header
        # onto line 2.
        proc = subprocess.Popen(online_sql_conn + select_sql, shell=True,
                                stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                                universal_newlines=True)
        stdout, _ = proc.communicate()
        lines = stdout.split('\n')
        if proc.returncode != 0 or len(lines) < 2:
            return -1
        return lines[1]
    # Query wrapped in double quotes for the shell (it becomes the mysql -e
    # argument). It counts rows with auth_status = 0, predict = 1 and
    # dt = yesterday. Presumably yesterday is a date string defined
    # elsewhere -- confirm its format.
    sql_head = ("\"select count(1) from HS_Athena_Model.mdl_teacher_auth_score"
                " where auth_status = 0 and predict = 1 and dt = '")
    select_sql = sql_head + yesterday + "';\""
    table_line_num = get_database_table_line_num(online_sql_conn, select_sql)
    ###例子:Python发送邮件
    import smtplib
    import sys
    from email.mime.text import MIMEText
    class SendMail(object):
        """Send notification e-mails through the SMTP server at self.smtpserver.

        NOTE(review): the address literals read "[email protected]", which
        looks like an e-mail-obfuscation artifact from wherever this snippet
        was copied; replace them with the real addresses. self.password is
        hard-coded but unused: no SMTP login is performed.
        """

        def __init__(self):
            self.sender = '[email protected]'
            self.receiver_list = ['[email protected]', '[email protected]']
            self.smtpserver = '10.0.1.101'
            self.username = '[email protected]'
            self.password = 'abc17zuoye'
            self.mail_postfix = '17zuoye.com'

        def _build_message(self, subject, content, subtype='plain'):
            """Return a UTF-8 MIMEText message with Subject/From/To set.

            ``subject`` may be text or UTF-8 encoded bytes. ``subtype`` is
            the MIME text subtype, e.g. 'plain' or 'html'.
            """
            from email.header import Header
            from email.utils import formataddr
            # Decode byte subjects as UTF-8; the default ASCII codec fails on
            # Chinese text.
            if isinstance(subject, bytes):
                subject = subject.decode('utf-8')
            else:
                subject = u'%s' % (subject,)
            # MIMEText encodes the body with the utf-8 charset, so it is not garbled.
            msg = MIMEText(content, subtype, 'utf-8')
            msg['Accept-Language'] = "zh-CN"
            msg["Accept-Charset"] = "ISO-8859-1,utf-8"
            # Header() emits an RFC 2047 encoded-word, so non-ASCII subjects
            # survive transport instead of appearing garbled.
            msg['Subject'] = Header(subject, 'utf-8')
            msg['From'] = formataddr(("Data Management Center", self.username))
            # Address lists in headers are comma-separated (RFC 5322).
            msg['To'] = ", ".join(self.receiver_list)
            return msg

        def send_email(self, subject, content, subtype='plain'):
            """Mail ``content`` with ``subject`` to every address in receiver_list.

            Returns True on success. On any error, returns False after
            printing the error (no exception is raised).
            """
            msg = self._build_message(subject, content, subtype)
            try:
                smtp = smtplib.SMTP(self.smtpserver, 25)
                try:
                    smtp.sendmail(self.sender, self.receiver_list, msg.as_string())
                finally:
                    # Close the connection even if sendmail fails.
                    smtp.quit()
                return True
            except Exception as e:
                print(str(e))
                return False
    # send_email_ins is presumably a SendMail() instance created elsewhere --
    # confirm. send_email returns True/False for success; that result is not
    # checked here.
    send_email_ins.send_email(subject=subject, content=content)