Testing Writes with the Official Spark SQL External Data Sources JDBC Implementation
Published: 2019-06-12


This post uses the Spark SQL External Data Sources JDBC implementation to write the data in an RDD into a MySQL database.

Key APIs in jdbc.scala:

```scala
/**
 * Save this RDD to a JDBC database at `url` under the table name `table`.
 * This will run a `CREATE TABLE` and a bunch of `INSERT INTO` statements.
 * If you pass `true` for `allowExisting`, it will drop any table with the
 * given name; if you pass `false`, it will throw if the table already
 * exists.
 */
def createJDBCTable(url: String, table: String, allowExisting: Boolean)

/**
 * Save this RDD to a JDBC database at `url` under the table name `table`.
 * Assumes the table already exists and has a compatible schema.  If you
 * pass `true` for `overwrite`, it will `TRUNCATE` the table before
 * performing the `INSERT`s.
 *
 * The table must already exist on the database.  It must have a schema
 * that is compatible with the schema of this RDD; inserting the rows of
 * the RDD in order via the simple statement
 * `INSERT INTO table VALUES (?, ?, ..., ?)` should not fail.
 */
def insertIntoJDBC(url: String, table: String, overwrite: Boolean)
```
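The two doc comments describe which SQL statements get issued. As a rough, Spark-free sketch of that SQL text, the hypothetical helpers below build the statements for a two-column schema like `schema2` in the test code. The type names (`TEXT`, `INTEGER`), the `DROP TABLE IF EXISTS` form, and the `ColType`/`Column`/`JdbcSqlSketch` names are all assumptions for illustration, not the actual jdbc.scala code.

```scala
// Hypothetical, Spark-free sketch of the SQL text a JDBC writer could issue.
// ColType/Column are simplified stand-ins for Spark's StructType/StructField.
sealed trait ColType
case object StringCol extends ColType
case object IntCol extends ColType

final case class Column(name: String, tpe: ColType)

object JdbcSqlSketch {
  // Assumed mapping from column type to a generic SQL type name.
  private def sqlType(t: ColType): String = t match {
    case StringCol => "TEXT"
    case IntCol    => "INTEGER"
  }

  // createJDBCTable: CREATE TABLE with one column definition per field.
  def createTableSql(table: String, cols: Seq[Column]): String =
    cols.map(c => s"${c.name} ${sqlType(c.tpe)}")
      .mkString(s"CREATE TABLE $table (", ", ", ")")

  // createJDBCTable with allowExisting = true: drop any same-named table first.
  def dropTableSql(table: String): String = s"DROP TABLE IF EXISTS $table"

  // insertIntoJDBC with overwrite = true: empty the table first.
  def truncateSql(table: String): String = s"TRUNCATE TABLE $table"

  // Parameterized INSERT with one '?' per column, meant to be bound once per row.
  def insertSql(table: String, cols: Seq[Column]): String =
    cols.map(_ => "?").mkString(s"INSERT INTO $table VALUES (", ", ", ")")

  def main(args: Array[String]): Unit = {
    val schema2 = Seq(Column("name", StringCol), Column("id", IntCol))
    println(dropTableSql("person"))
    println(createTableSql("person", schema2))
    println(insertSql("person", schema2))
  }
}
```

In a real writer the `?` placeholders would typically be bound row by row through a `java.sql.PreparedStatement`, which is why every inserted row must supply exactly as many values as the table has columns.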

 

```scala
// Run in spark-shell: `sc` is predefined and the REPL allows redefining `val srdd`.
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

val sqlContext = new SQLContext(sc)
import sqlContext._

// Data preparation
val url = "jdbc:mysql://hadoop000:3306/test?user=root&password=root"
val arr2x2 = Array[Row](Row.apply("dave", 42), Row.apply("mary", 222))
val arr1x2 = Array[Row](Row.apply("fred", 3))
val schema2 = StructType(StructField("name", StringType) :: StructField("id", IntegerType) :: Nil)
val arr2x3 = Array[Row](Row.apply("dave", 42, 1), Row.apply("mary", 222, 2))
val schema3 = StructType(StructField("name", StringType) :: StructField("id", IntegerType) :: StructField("seq", IntegerType) :: Nil)

import org.apache.spark.sql.jdbc._

// ============================== CREATE ==============================
val srdd = sqlContext.applySchema(sc.parallelize(arr2x2), schema2)
srdd.createJDBCTable(url, "person", false)
sqlContext.jdbcRDD(url, "person").collect.foreach(println)
// [dave,42]
// [mary,222]

// ======================= CREATE with overwrite =======================
val srdd = sqlContext.applySchema(sc.parallelize(arr2x3), schema3)
srdd.createJDBCTable(url, "person2", false)
sqlContext.jdbcRDD(url, "person2").collect.foreach(println)
// [mary,222,2]
// [dave,42,1]

// allowExisting = true: drop the existing person2 and recreate it
val srdd2 = sqlContext.applySchema(sc.parallelize(arr1x2), schema2)
srdd2.createJDBCTable(url, "person2", true)
sqlContext.jdbcRDD(url, "person2").collect.foreach(println)
// [fred,3]

// =================== CREATE then INSERT to append ===================
val srdd = sqlContext.applySchema(sc.parallelize(arr2x2), schema2)
val srdd2 = sqlContext.applySchema(sc.parallelize(arr1x2), schema2)
srdd.createJDBCTable(url, "person3", false)
sqlContext.jdbcRDD(url, "person3").collect.foreach(println)
// [mary,222]
// [dave,42]

srdd2.insertIntoJDBC(url, "person3", false)
sqlContext.jdbcRDD(url, "person3").collect.foreach(println)
// [mary,222]
// [dave,42]
// [fred,3]

// ================== CREATE then INSERT to truncate ==================
val srdd = sqlContext.applySchema(sc.parallelize(arr2x2), schema2)
val srdd2 = sqlContext.applySchema(sc.parallelize(arr1x2), schema2)
srdd.createJDBCTable(url, "person4", false)
sqlContext.jdbcRDD(url, "person4").collect.foreach(println)
// [dave,42]
// [mary,222]

srdd2.insertIntoJDBC(url, "person4", true)
sqlContext.jdbcRDD(url, "person4").collect.foreach(println)
// [fred,3]

// =================== Incompatible INSERT to append ===================
val srdd = sqlContext.applySchema(sc.parallelize(arr2x2), schema2)
val srdd2 = sqlContext.applySchema(sc.parallelize(arr2x3), schema3)
srdd.createJDBCTable(url, "person5", false)
srdd2.insertIntoJDBC(url, "person5", true)
// java.sql.SQLException: Column count doesn't match value count at row 1
```
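The five scenarios come down to two flags. The toy model below, an in-memory map standing in for MySQL (the `FakeDb` class and its methods are hypothetical, not part of Spark), is a minimal sketch of the documented semantics: `allowExisting = true` drops and recreates the table, `overwrite = true` empties the table before inserting, and a row whose width differs from the table's is rejected, much like the `Column count doesn't match value count` error.

```scala
import scala.collection.mutable

// Hypothetical toy model of the createJDBCTable / insertIntoJDBC semantics.
// An in-memory map stands in for the database: table -> (column count, rows).
final class FakeDb {
  private val tables =
    mutable.Map.empty[String, (Int, mutable.ArrayBuffer[Seq[Any]])]

  // Like createJDBCTable: drop an existing table if allowExisting,
  // otherwise fail; then create the table and insert the rows.
  def createTable(table: String, width: Int, rows: Seq[Seq[Any]],
                  allowExisting: Boolean): Unit = {
    if (tables.contains(table)) {
      if (allowExisting) tables.remove(table)
      else throw new IllegalStateException(s"Table $table already exists")
    }
    tables(table) = (width, mutable.ArrayBuffer.empty[Seq[Any]])
    insert(table, rows, overwrite = false)
  }

  // Like insertIntoJDBC: the table must exist; empty it first if overwrite,
  // then append rows one by one, rejecting rows of the wrong width.
  def insert(table: String, rows: Seq[Seq[Any]], overwrite: Boolean): Unit = {
    val (width, data) = tables.getOrElse(
      table, throw new NoSuchElementException(s"Table $table does not exist"))
    if (overwrite) data.clear()
    rows.foreach { row =>
      if (row.length != width)
        throw new IllegalArgumentException("Column count doesn't match value count")
      data += row
    }
  }

  // Current contents of a table, in insertion order.
  def select(table: String): List[Seq[Any]] = tables(table)._2.toList
}

object FakeDbDemo {
  def main(args: Array[String]): Unit = {
    val db = new FakeDb
    db.createTable("person3", 2, Seq(Seq("dave", 42), Seq("mary", 222)), allowExisting = false)
    db.insert("person3", Seq(Seq("fred", 3)), overwrite = false) // append
    db.select("person3").foreach(println)
    db.insert("person3", Seq(Seq("fred", 3)), overwrite = true) // empty, then insert
    db.select("person3").foreach(println)
  }
}
```

Unlike MySQL, the toy always returns rows in insertion order. In the Spark runs above, some queries printed `[mary,222]` before `[dave,42]`, because a SQL query without `ORDER BY` has no guaranteed row order.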

 

Reposted from: https://www.cnblogs.com/luogankun/p/4275213.html
