Using Spark foreachPartition to insert DataFrame data into MySQL

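The code below joins two MySQL tables (baidusong.gedan and baidusong.gedan_detail), counts songs per author, and writes the top 100 authors back to MySQL. The write goes through foreachPartition so that each partition opens a single JDBC connection and sends one batched insert, rather than paying the connection cost once per row. It assumes a target table along the lines of topSinger(song_author VARCHAR, song_count BIGINT) already exists in the baidusong database.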
package com.waitingfy
import java.sql.{Connection, DriverManager, PreparedStatement}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import scala.collection.mutable.ListBuffer
object foreachPartitionTest {
  case class TopSongAuthor(songAuthor:String, songCount:Long)
  // Open a JDBC connection to the local MySQL instance.
  def getConnection(): Connection = {
    DriverManager.getConnection("jdbc:mysql://localhost:3306/baidusong?user=root&password=root&useUnicode=true&characterEncoding=UTF-8")
  }
  // Close the statement first, then the connection.
  def release(connection: Connection, pstmt: PreparedStatement): Unit = {
    try {
      if (pstmt != null) {
        pstmt.close()
      }
    } catch {
      case e: Exception => e.printStackTrace()
    } finally {
      if (connection != null) {
        connection.close()
      }
    }
  }
  // Insert one partition's worth of rows as a single JDBC batch,
  // committed in one transaction.
  def insertTopSong(list: ListBuffer[TopSongAuthor]): Unit = {
    var connect: Connection = null
    var pstmt: PreparedStatement = null
    try {
      connect = getConnection()
      connect.setAutoCommit(false) // commit the whole batch at once
      val sql = "insert into topSinger(song_author, song_count) values(?,?)"
      pstmt = connect.prepareStatement(sql)
      for (ele <- list) {
        pstmt.setString(1, ele.songAuthor)
        pstmt.setLong(2, ele.songCount)
        pstmt.addBatch()
      }
      pstmt.executeBatch()
      connect.commit()
    } catch {
      case e: Exception => e.printStackTrace()
    } finally {
      release(connect, pstmt)
    }
  }
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .master("local[2]")
      .appName("foreachPartitionTest")
      .getOrCreate()
    // Load both source tables over JDBC.
    val gedanDF = spark.read.format("jdbc")
      .option("url", "jdbc:mysql://localhost:3306")
      .option("dbtable", "baidusong.gedan")
      .option("user", "root")
      .option("password", "root")
      .option("driver", "com.mysql.jdbc.Driver")
      .load()
//    gedanDF.show()
    val detailDF = spark.read.format("jdbc")
      .option("url", "jdbc:mysql://localhost:3306")
      .option("dbtable", "baidusong.gedan_detail")
      .option("user", "root")
      .option("password", "root")
      .option("driver", "com.mysql.jdbc.Driver")
      .load()
    val joinDF = gedanDF.join(detailDF, gedanDF.col("id") === detailDF.col("gedan_id"))
//    joinDF.show()
    import spark.implicits._
    val resultDF = joinDF
      .groupBy("song_author")
      .agg(count("song_name").as("song_count"))
      .orderBy($"song_count".desc)
      .limit(100)
//    resultDF.show()
    // One JDBC connection and one batched insert per partition,
    // instead of one per row.
    resultDF.foreachPartition(partitionOfRecords => {
      val list = new ListBuffer[TopSongAuthor]
      partitionOfRecords.foreach(info => {
        val song_author = info.getAs[String]("song_author")
        val song_count = info.getAs[Long]("song_count")
        list.append(TopSongAuthor(song_author, song_count))
      })
      insertTopSong(list)
    })

    spark.stop()
  }
}
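One note on the batching: with MySQL Connector/J, executeBatch() only becomes a true multi-row insert on the wire if rewriteBatchedStatements=true is appended to the JDBC URL; otherwise the driver still sends one statement per row.

If no custom insert logic is needed, Spark's built-in JDBC writer can perform the same append without hand-rolled connection handling. A minimal sketch, assuming the same local MySQL instance and that resultDF's column names match the topSinger table:

import java.util.Properties
import org.apache.spark.sql.SaveMode

val props = new Properties()
props.put("user", "root")
props.put("password", "root")
props.put("driver", "com.mysql.jdbc.Driver")

// Appends resultDF's rows into baidusong.topSinger; Spark itself
// opens one JDBC connection per partition under the hood.
resultDF.write
  .mode(SaveMode.Append)
  .jdbc("jdbc:mysql://localhost:3306/baidusong", "topSinger", props)

The trade-off is control: foreachPartition lets you manage transactions, batch sizes, and error handling yourself, while write.jdbc only appends to or overwrites whole tables.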