通过Spark SQL External Data Sources JDBC实现将RDD的数据写入到MySQL数据库中。
jdbc.scala重要API介绍:
/**
 * Save this RDD to a JDBC database at `url` under the table name `table`.
 * This will run a `CREATE TABLE` and a bunch of `INSERT INTO` statements.
 * If you pass `true` for `allowExisting`, it will drop any table with the
 * given name; if you pass `false`, it will throw if the table already
 * exists.
 *
 * @param url           JDBC URL of the target database
 * @param table         name of the table to create
 * @param allowExisting `true` to drop any table with the given name first;
 *                      `false` to throw if the table already exists
 */
def createJDBCTable(url: String, table: String, allowExisting: Boolean)

/**
 * Save this RDD to a JDBC database at `url` under the table name `table`.
 * Assumes the table already exists and has a compatible schema. If you
 * pass `true` for `overwrite`, it will `TRUNCATE` the table before
 * performing the `INSERT`s.
 *
 * The table must already exist on the database. It must have a schema
 * that is compatible with the schema of this RDD; inserting the rows of
 * the RDD in order via the simple statement
 * `INSERT INTO table VALUES (?, ?, ..., ?)` should not fail.
 *
 * @param url       JDBC URL of the target database
 * @param table     name of the existing table to insert into
 * @param overwrite `true` to `TRUNCATE` the table before inserting
 */
def insertIntoJDBC(url: String, table: String, overwrite: Boolean)
// Walkthrough of writing RDD data to MySQL through the Spark SQL JDBC data source.
// NOTE(review): `sc` is not defined here, and `srdd`/`srdd2` are re-declared in
// every section, so this presumably targets an interactive spark-shell session
// (where `sc` is predefined and vals may be redefined) -- confirm before compiling
// it as a standalone program.
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

val sqlContext = new SQLContext(sc)
import sqlContext._

// ---- Data preparation ----
// NOTE(review): credentials are embedded in the URL for demo purposes only.
val url = "jdbc:mysql://hadoop000:3306/test?user=root&password=root"

val arr2x2 = Array[Row](Row.apply("dave", 42), Row.apply("mary", 222))
val arr1x2 = Array[Row](Row.apply("fred", 3))
val schema2 = StructType(
  StructField("name", StringType) ::
  StructField("id", IntegerType) :: Nil)

val arr2x3 = Array[Row](Row.apply("dave", 42, 1), Row.apply("mary", 222, 2))
val schema3 = StructType(
  StructField("name", StringType) ::
  StructField("id", IntegerType) ::
  StructField("seq", IntegerType) :: Nil)

// Brings createJDBCTable / insertIntoJDBC into scope.
import org.apache.spark.sql.jdbc._

// ================================ CREATE ================================
val srdd = sqlContext.applySchema(sc.parallelize(arr2x2), schema2)
srdd.createJDBCTable(url, "person", allowExisting = false)
sqlContext.jdbcRDD(url, "person").collect.foreach(println)
// Output:
// [dave,42]
// [mary,222]

// ========================= CREATE with overwrite =========================
val srdd = sqlContext.applySchema(sc.parallelize(arr2x3), schema3)
srdd.createJDBCTable(url, "person2", allowExisting = false)
sqlContext.jdbcRDD(url, "person2").collect.foreach(println)
// Output:
// [mary,222,2]
// [dave,42,1]

// allowExisting = true drops the existing "person2" table and recreates it
// with the two-column schema.
val srdd2 = sqlContext.applySchema(sc.parallelize(arr1x2), schema2)
srdd2.createJDBCTable(url, "person2", allowExisting = true)
sqlContext.jdbcRDD(url, "person2").collect.foreach(println)
// Output:
// [fred,3]

// ===================== CREATE then INSERT to append =====================
val srdd = sqlContext.applySchema(sc.parallelize(arr2x2), schema2)
val srdd2 = sqlContext.applySchema(sc.parallelize(arr1x2), schema2)
srdd.createJDBCTable(url, "person3", allowExisting = false)
sqlContext.jdbcRDD(url, "person3").collect.foreach(println)
// Output:
// [mary,222]
// [dave,42]

// overwrite = false appends to the existing rows.
srdd2.insertIntoJDBC(url, "person3", overwrite = false)
sqlContext.jdbcRDD(url, "person3").collect.foreach(println)
// Output:
// [mary,222]
// [dave,42]
// [fred,3]

// ==================== CREATE then INSERT to truncate ====================
val srdd = sqlContext.applySchema(sc.parallelize(arr2x2), schema2)
val srdd2 = sqlContext.applySchema(sc.parallelize(arr1x2), schema2)
srdd.createJDBCTable(url, "person4", allowExisting = false)
sqlContext.jdbcRDD(url, "person4").collect.foreach(println)
// Output:
// [dave,42]
// [mary,222]

// overwrite = true truncates the table before inserting.
srdd2.insertIntoJDBC(url, "person4", overwrite = true)
// Read the table back so the result of the truncating insert is actually shown.
sqlContext.jdbcRDD(url, "person4").collect.foreach(println)
// Output:
// [fred,3]

// ===================== Incompatible INSERT to append =====================
val srdd = sqlContext.applySchema(sc.parallelize(arr2x2), schema2)
val srdd2 = sqlContext.applySchema(sc.parallelize(arr2x3), schema3)
srdd.createJDBCTable(url, "person5", allowExisting = false)
// Inserting three-column rows into the two-column table fails. Because
// overwrite = true is passed, the table is truncated before the INSERTs run.
srdd2.insertIntoJDBC(url, "person5", overwrite = true)
// Fails with:
// java.sql.SQLException: Column count doesn't match value count at row 1