Writing Data from Spark to MySQL


```scala
package TestSpark

import java.sql.{Connection, DriverManager, PreparedStatement}
import org.apache.log4j.{Level, Logger}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object WriteMysql {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.ERROR)
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("test")
    val sc = new SparkContext(conf)

    val url: String = "jdbc:mysql://localhost:3306/腾讯招聘?useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=GMT"
    val user: String = "root"
    val driver = "com.mysql.cj.jdbc.Driver" // newer driver versions use com.mysql.cj.jdbc.Driver
    val passwd = "123456"

    // Write an RDD into the MySQL database
    val rdd: RDD[(Int, String)] = sc.makeRDD(Array((10, "tom"), (20, "Lisa"), (23, "UI")))

    // Class.forName(driver)
    // val conn: Connection = DriverManager.getConnection(url, user, passwd)
    // A connection created here lives on the driver, but the SQL statements run
    // in the executors, so this fails with "object not serializable"
    val sql = "insert into user values(?,?)" // MySQL does not need the TABLE keyword

    // Naive version: one connection per record
    // rdd.foreach {
    //   case (age, name) =>
    //     // Opening a connection inside foreach means the number of MySQL
    //     // connections equals the number of records (far too many connections)
    //     Class.forName(driver)
    //     val conn: Connection = DriverManager.getConnection(url, user, passwd)
    //     // Precompiled SQL (PreparedStatement) is standard for batch work;
    //     // a plain Statement recompiles every single SQL string (inefficient)
    //     val statement: PreparedStatement = conn.prepareStatement(sql)
    //     statement.setInt(1, age)
    //     statement.setString(2, name)
    //     statement.execute()
    //     conn.close()
    // }

    // Optimization 1: use foreachPartition so each partition opens one connection
    // rdd.foreachPartition { x =>
    //   Class.forName(driver)
    //   val conn: Connection = DriverManager.getConnection(url, user, passwd)
    //   val statement: PreparedStatement = conn.prepareStatement(sql)
    //   x.foreach { // iterate over the partition's records and insert them
    //     case (age, name) =>
    //       statement.setInt(1, age)
    //       statement.setString(2, name)
    //       // Add to the batch. If a partition held millions of rows, pushing
    //       // them all to MySQL in a single batch would clearly be unrealistic
    //       statement.addBatch()
    //   }
    //   statement.executeBatch() // execute the whole batch at once
    //   conn.close()
    // }

    // Optimization 2: flush the batch once it reaches a threshold
    val rdd2: RDD[(Int, String)] = sc.makeRDD(1 to 1000000).map(x => (x, "用户" + x.toString))
    rdd2.foreachPartition { x =>
      Class.forName(driver)
      val connection: Connection = DriverManager.getConnection(url, user, passwd)
      val statement: PreparedStatement = connection.prepareStatement(sql)
      var count = 0
      x.foreach {
        case (age, name) =>
          statement.setInt(1, age)
          statement.setString(2, name)
          statement.addBatch()
          count += 1
          if (count % 100 == 0) {
            statement.executeBatch()
            Thread.sleep(3000) // throttle between batches
          }
      }
      // This final flush is required: otherwise the leftover rows when count
      // is not an exact multiple of 100 would never be written
      statement.executeBatch()
      connection.close()
    }

    sc.stop()
  }
}
```
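The inserts above assume a `user` table with an integer column and a string column already exists in the target database; the original post does not show its DDL. Below is a minimal sketch of creating it over the same JDBC connection, where the schema `(age INT, name VARCHAR(50))` is an assumption inferred from the bind order of `insert into user values(?,?)`:

```scala
import java.sql.{Connection, DriverManager}

object CreateUserTable {
  def main(args: Array[String]): Unit = {
    // Same connection settings as the Spark job above
    val url = "jdbc:mysql://localhost:3306/腾讯招聘?useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=GMT"
    val conn: Connection = DriverManager.getConnection(url, "root", "123456")
    try {
      val stmt = conn.createStatement()
      // Assumed schema: the insert binds an Int then a String,
      // so two columns in that order
      stmt.execute("CREATE TABLE IF NOT EXISTS user (age INT, name VARCHAR(50))")
      stmt.close()
    } finally {
      conn.close()
    }
  }
}
```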
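For comparison, the same write can go through Spark SQL's DataFrame JDBC writer, which manages the per-partition connections and batching internally (the batch size is tunable via the `batchsize` option, default 1000). This is a sketch rather than the original author's code; it reuses the connection settings above and the assumed `user(age, name)` schema:

```scala
import java.util.Properties
import org.apache.spark.sql.{SaveMode, SparkSession}

object WriteMysqlDF {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("test-df").getOrCreate()
    import spark.implicits._

    // Same data as rdd2 above, with column names matching the assumed user table
    val df = spark.range(1, 1000001)
      .map(x => (x.toInt, "用户" + x))
      .toDF("age", "name")

    val props = new Properties()
    props.setProperty("user", "root")
    props.setProperty("password", "123456")
    props.setProperty("driver", "com.mysql.cj.jdbc.Driver")
    // Rows per JDBC batch; plays the role of the manual count % 100 flush above
    props.setProperty("batchsize", "1000")

    df.write
      .mode(SaveMode.Append) // append to the table instead of overwriting it
      .jdbc("jdbc:mysql://localhost:3306/腾讯招聘?useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=GMT",
        "user", props)

    spark.stop()
  }
}
```

One further knob worth knowing: appending `rewriteBatchedStatements=true` to the JDBC URL lets MySQL Connector/J rewrite a batch into a single multi-row INSERT, which typically speeds up both the manual `addBatch`/`executeBatch` version and the DataFrame writer.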