# Load a delimited text file; header=True takes the column names from the first row.
# NOTE(review): sep is an empty string here -- confirm the intended delimiter.
data = spark.read.options(sep='', header=True).csv(csv_path)
# Load a dataset through the generic loader (the original note uses this for parquet files).
data = spark.read.load(parquet_path)
### 数据修改列名 (rename DataFrame columns)
# New column names, applied by position to the first len(columnName_new) columns.
columnName_new = []
# Rename column idx to columnName_new[idx]; data.columns is re-read after
# every rename, so each step sees the names produced by the previous one.
for idx, new_name in enumerate(columnName_new):
    data = data.withColumnRenamed(data.columns[idx], new_name)
data.show()  # display the first 20 rows
data.columns  # column names of the DataFrame
data.withColumnRenamed(oldName, newName)  # rename a single column
data.filter()  # filter rows by a condition (supply the condition as the argument)
data.groupBy()  # group rows for aggregation
# Sort in descending order; 'id' is a placeholder -- substitute the column(s) to sort by.
data.orderBy('id', ascending=False)
data.collect()  # returns the rows as a list
# Show the rows of data whose id appears among the ids of user1.
data[data.id.isin([str(i.id) for i in user1.collect()])].show()
### udf的使用 (using UDFs)
###例子:将hive中的字符串转化为列表,展开并获取所需的内容
from pyspark.sql.functions import lit, udf
def getJsonList(d, v):
    """Return the ``id`` of the entry in ``d`` whose ``node_type`` equals ``v``.

    ``d`` is converted with ``str`` and evaluated as a Python literal that is
    expected to yield an iterable of dict-like entries.  When several entries
    match, the last one wins.

    Args:
        d: Value whose string form is a Python expression for a list of
            entries, each carrying ``'node_type'`` and ``'id'`` keys.
        v: The ``node_type`` value to look for.

    Returns:
        ``str`` of the matching ``id``, or the string ``'nan'`` when ``d``
        cannot be evaluated or no entry matches.
    """
    # Fallback used both for unparsable input and for "no matching entry";
    # without it a miss would leave ff unbound.
    ff = float('nan')
    try:
        # SECURITY: eval executes arbitrary code. Only feed this trusted data;
        # consider ast.literal_eval or json.loads if the input format allows.
        dd = eval(str(d))
    except Exception:
        return str(ff)
    for di in dd:
        if di['node_type'] == v:
            ff = di['id']
    return str(ff)
# Register getJsonList under the name 'getJsonList' with the session's UDF registry.
spark.udf.register(name='getJsonList', f=getJsonList)
###例子:将数据转化为指定格式的array
from pyspark.sql.types import DoubleType
from pyspark.ml.linalg import Vectors, VectorUDT
def ith_(v, i):
    """Return element ``i`` of ``v`` as a float, or ``None`` if unavailable.

    Args:
        v: Indexable container (for example a vector or list).
        i: Index of the element to extract.

    Returns:
        ``float(v[i])``, or ``None`` when the element cannot be converted,
        the index is out of range, or ``v`` is not indexable (e.g. ``None``).
    """
    try:
        return float(v[i])
    except (ValueError, TypeError, IndexError):
        # Return None rather than raising so one bad value does not abort the caller.
        return None
# Wrap ith_ as a UDF whose declared return type is double.
ith = udf(ith_, returnType=DoubleType())
# Take element 0 of the "scaled" column, cast it to float and name it "feature".
a.select(
    ith("scaled", lit(0)).cast('float').alias('feature')
)
# The string passed to to_vector1 must be formatted as an array/list;
# the value passed to to_vector2 must be numeric.
to_vector1 = udf(lambda text: Vectors.parse(text), returnType=VectorUDT())
to_vector2 = udf(lambda values: Vectors.dense(values), returnType=VectorUDT())
a.select(to_vector2('feature').alias('feat')).show()
###例子:将数据标准化
from pyspark.ml.feature import StandardScaler
from pyspark.ml.feature import Normalizer
from pyspark.ml.feature import MinMaxScaler
# Standardise the "features" column into "scaled": scale by the standard
# deviation (withStd=True) without subtracting the mean (withMean=False).
scaler = StandardScaler(
    withMean=False,
    withStd=True,
    inputCol="features",
    outputCol="scaled",
)
# Alternative scalers that could be used in place of the one above:
#scaler = Normalizer(inputCol="features", outputCol="scaledFeatures", p=1.0)
#scaler = MinMaxScaler(inputCol="features", outputCol="scaledFeatures")
# Fit on dataFrame, then apply the fitted model to the same dataFrame.
scaler_data = scaler.fit(dataFrame).transform(dataFrame)
# Extract element 4 of the scaled vector as a float column.
scaler_data.select(
    ith("scaled", lit(4)).cast('float').alias('progress_diff_scalar')
)
###例子:Python连接mongo库
import commands
from pymongo import MongoClient
from ConfigParser import ConfigParser
def getSubContentType(path):
    """Dump every ``sub_content_type_id`` from MongoDB to a file and return them.

    Reads connection settings from the ``mongodb-online`` section of
    ``/opt/develop/homework/conf/mongo.cfg``, iterates over the
    ``venus_sub_content_type`` collection of the ``vox-question`` database,
    prints each id and writes it (one per line) to
    ``<path>/sub_content_type_ids.txt``.

    Args:
        path: Directory in which ``sub_content_type_ids.txt`` is written.

    Returns:
        Tuple of the ids, each converted with ``str``, in iteration order.
    """
    # Mongo connection settings.
    config_path = '/opt/develop/homework/conf/mongo.cfg'
    config = ConfigParser()
    # Close the config file as soon as it has been parsed.
    with open(config_path) as config_file:
        config.readfp(config_file)
    user_name = config.get('mongodb-online', 'user')
    password = config.get('mongodb-online', 'pw')
    ip = config.get('mongodb-online', 'ip')
    # NOTE(review): user name and password are inserted into the URI as-is;
    # confirm they contain no characters that need percent-escaping.
    mongo_conn = 'mongodb://{un}:{pw}@{ip}/?authSource={auth}'.format(
        un=user_name, pw=password, ip=ip, auth="admin")

    # Connect to the collection and export the ids.
    print('可用的题目类型sub_content_type_id:')
    collected = []
    # The with-block guarantees the output file is flushed and closed.
    with open(path + "/sub_content_type_ids.txt", "w") as output:
        client = MongoClient(mongo_conn)
        try:
            posts = client['vox-question'].venus_sub_content_type
            for line in posts.find():
                id_text = str(line["sub_content_type_id"])
                print('------' + ' ' + id_text)
                collected.append(id_text)
                output.write(id_text + "\n")
        finally:
            # Release the connection even if iteration fails.
            client.close()
    return tuple(collected)
###例子:Python连接mysql
import commands
import smtplib
# Prefix of the mysql command line; the SQL statement to run is appended after "-e ".
# NOTE(review): host, user and password are hard-coded here -- consider
# moving them to a protected configuration file.
online_sql_conn = (
    "mysql -h10.0.1.54 -P5001 "
    "-udata.athena.2016 -pdata.DATA.17.PWD.ld "
    "-e "
)
def get_database_table_line_num(online_sql_conn, select_sql):
    """Run a command built from the two arguments and return its second output line.

    Args:
        online_sql_conn: Command prefix (the mysql invocation ending in ``-e``).
        select_sql: Quoted SQL statement appended to the prefix.

    Returns:
        The second line of the command output as a string, or ``-1`` when the
        command exits with a non-zero status or prints fewer than two lines.
    """
    command = online_sql_conn + select_sql
    # NOTE(review): this echoes the whole command, including any password
    # embedded in online_sql_conn -- confirm that is acceptable for the logs.
    print("online_sql_conn + select_sql:\t" + command)
    # SECURITY: the command is handed to a shell as one string; only pass
    # trusted SQL text.
    status, output = commands.getstatusoutput(command)
    if status != 0:
        return -1
    lines = output.split('\n')
    # Presumably line 0 is the column header and line 1 the value -- verify
    # against the real mysql output. Treat shorter output as a failure
    # instead of raising IndexError.
    if len(lines) < 2:
        return -1
    return lines[1]
# Count rows of mdl_teacher_auth_score with auth_status = 0 and predict = 1
# for the dt value held in `yesterday`; the statement is wrapped in double
# quotes so it can be appended to the mysql "-e" prefix.
select_sql = "".join((
    "\"select count(1) from HS_Athena_Model.mdl_teacher_auth_score where auth_status = 0 and predict = 1 and dt = '",
    yesterday,
    "';\"",
))
table_line_num = get_database_table_line_num(online_sql_conn, select_sql)
###例子:Python发送邮件
import smtplib
import sys
from email.mime.text import MIMEText
class SendMail():
    """Send plain-text notification mails through a fixed SMTP server."""

    def __init__(self):
        # NOTE(review): addresses, server and password are hard-coded;
        # consider loading them from configuration.
        self.sender = '[email protected]'
        self.receiver_list = ['[email protected]', '[email protected]']
        self.smtpserver = '10.0.1.101'
        self.username = '[email protected]'
        # username/password are stored but send_email never logs in with them.
        self.password = 'abc17zuoye'
        self.mail_postfix = '17zuoye.com'

    def _build_message(self, subject, content):
        """Build the MIME message for ``subject`` and ``content``."""
        from email.header import Header

        # Normalise the subject to text so non-ASCII subjects are not garbled:
        # byte strings are decoded as UTF-8, other objects are stringified.
        text_type = type(u'')
        if isinstance(subject, bytes):
            subject = subject.decode('utf-8')
        elif not isinstance(subject, text_type):
            subject = text_type(subject)
        me = "Data Management Center" + "<" + self.username + ">"
        # Declare the body as UTF-8 plain text. The original passed the
        # builtin ``format`` as the subtype, which is not a valid MIME subtype.
        msg = MIMEText(content, 'plain', 'utf-8')
        msg['Accept-Language'] = "zh-CN"
        msg["Accept-Charset"] = "ISO-8859-1,utf-8"
        # Header() encodes a non-ASCII subject for transport.
        msg['Subject'] = Header(subject, 'utf-8')
        # The header name was misspelled 'Frome', so no From header was set.
        msg['From'] = me
        # Address lists in mail headers are comma-separated.
        msg['To'] = ", ".join(self.receiver_list)
        return msg

    def send_email(self, subject, content):
        """Send ``content`` with ``subject`` to every address in ``receiver_list``.

        Args:
            subject: Mail subject (text, UTF-8 bytes, or any object convertible to text).
            content: Mail body.

        Returns:
            ``True`` when the mail was handed to the SMTP server, ``False``
            when any error occurred (the error text is printed).
        """
        try:
            msg = self._build_message(subject, content)
            smtp = smtplib.SMTP()
            try:
                smtp.connect(self.smtpserver, 25)
                smtp.sendmail(self.sender, self.receiver_list, msg.as_string())
                smtp.quit()
            finally:
                # Always release the socket, also when connect/sendmail fails.
                smtp.close()
            return True
        except Exception as e:
            print(str(e))
            return False
# Example usage: create the mailer, then send one message.
# subject and content are expected to be defined by the caller.
send_email_ins = SendMail()
send_email_ins.send_email(subject, content)