博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
简单的Spark+Mysql整合开发
阅读量:6411 次
发布时间:2019-06-23

本文共 4150 字,大约阅读时间需要 13 分钟。

hot3.png

    今天简单说下Spark和Mysql的整合开发,首先要知道:在Spark中提供了一个JdbcRDD类,该RDD就是读取JDBC中的数据并转换成RDD,之后我们就可以对该RDD进行各种的操作,该类的构造函数如下:

JdbcRDD[T: ClassTag](    sc: SparkContext,    getConnection: () => Connection,    sql: String,    lowerBound: Long,    upperBound: Long,    numPartitions: Int,    mapRow: (ResultSet) => T = JdbcRDD.resultSetToObjectArray _)

    参数:

    (1)getConnection 返回一个已经打开的结构化数据库连接,JdbcRDD会自动维护关闭。

 (2)sql 是查询语句,此查询语句必须包含两处占位符?来作为分割数据库ResulSet的参数,例如:"select title, author from books where ? < = id and id <= ?"
 (3)lowerBound, upperBound, numPartitions 分别为第一、第二占位符,partition的个数。例如,给出lowebound 1,upperbound 20, numpartitions 2,则查询分别为(1, 10)与(11, 20)
 (4)mapRow 是转换函数,将返回的ResultSet转成RDD需用的单行数据,此处可以选择Array或其他,也可以是自定义的case class。默认的是将ResultSet 转换成一个Object数组。

    下面是动手实践,我的开发环境是:

    虚拟机CentOs7系统,IDEA,JDK8,Scala 2.11,Spark 2.0.1,一些基本环境问题这里就不再叙述了。

    本人使用的是maven,创建maven项目,初始化并添加依赖,下面是pom.xml:

4.0.0
JdbcRdd
Demo
1.0-SNAPSHOT
2018
2.11.8
scala-tools.org
Scala-Tools Maven2 Repository
http://scala-tools.org/repo-releases
scala-tools.org
Scala-Tools Maven2 Repository
http://scala-tools.org/repo-releases
org.scala-lang
scala-library
${scala.version}
org.apache.spark
spark-core_2.11
2.0.1
mysql
mysql-connector-java
5.1.25
src/main/scala
src/test/scala
org.scala-tools
maven-scala-plugin
compile
testCompile
${scala.version}
-target:jvm-1.5
org.apache.maven.plugins
maven-eclipse-plugin
true
ch.epfl.lamp.sdt.core.scalabuilder
ch.epfl.lamp.sdt.core.scalanature
org.eclipse.jdt.launching.JRE_CONTAINER
ch.epfl.lamp.sdt.launching.SCALA_CONTAINER
org.scala-tools
maven-scala-plugin
${scala.version}

    新建scala的Object类,如下:

package JdbcRddimport java.sql.DriverManagerimport org.apache.spark.{SparkConf, SparkContext}import org.apache.spark.rdd.JdbcRDDobject SparkToJdbc {  def main(args: Array[String]): Unit = {    val conf = new SparkConf().setAppName("mysql").setMaster("local")    val sc = new SparkContext(conf)    val rdd = new JdbcRDD(      sc,()=>{        Class.forName("com.mysql.jdbc.Driver").newInstance()        DriverManager.getConnection("jdbc:mysql://连接的IP:3306/连接的数据库名", "用户名", "密码")      },      "SELECT CATEGORY FROM nyw_knowledges WHERE COMPANY_CODE >= ? AND COMPANY_CODE <= ?",      1000, 1200, 3,      r => r.getString(1)).cache()    val rd = rdd.filter(_.contains("咨询")).count()    println(rd)    sc.stop()  }}

    这里基本的代码就这些,连接数据库后对表进行操作。

    注意:这里可能会出现几个问题,需要慎重处理:

    (1)内存问题:如果内存不够,则需要重新设置,本人使用的是运行时配置:

181501_eEct_3747963.png

    也可以用另一种方式,在代码中配置,

    可参考:。

    (2)数据库访问限制问题

    报错:java.sql.SQLException: null, message from server: “Host ‘xxx’ is not allowed to connect,该问题是由于本机的访问权限未开放,需要进行设置。

181825_bJEA_3747963.png

    可参考:。

    (3)mysql Driver依赖未添加报错

mysql
mysql-connector-java
5.1.25

    结果:

182102_94OZ_3747963.png

    通过访问Spark web UI的地址:localhost:4040能够清楚的查看具体的spark参数,大功告成。

转载于:https://my.oschina.net/u/3747963/blog/1630593

你可能感兴趣的文章
scope
查看>>
一起谈.NET技术,晚绑定场景下对象属性赋值和取值可以不需要PropertyInfo
查看>>
一起谈.NET技术,.Net Framework源代码中的模式之Prototype(原型模式)
查看>>
[shell 命令] find 查找文件
查看>>
windows下启动mysql服务的命令行启动和手动启动方法
查看>>
VTK三维点集轮廓凸包提取
查看>>
【概率论与数理统计】小结9-3 - 区间估计
查看>>
Golang性能调优入门
查看>>
sqlloader外部表
查看>>
golang笔记——数组与切片
查看>>
屏蔽可忽略的js脚本错误
查看>>
【Vue】vue.js常用指令
查看>>
NFS学习
查看>>
MySql常用命令总结
查看>>
又一年...
查看>>
文件上传框的美化+预览+ajax
查看>>
Linux VFS
查看>>
ext不能选中复制属性_如何实现Extjs的grid单元格只让选择(即可以复制单元格内容)但是不让修改?...
查看>>
python中print的作用*8、不能+8_在 Python 3.x 中语句 print(*[1,2,3]) 不能正确执行。 (1.0分)_学小易找答案...
查看>>
python 生成html代码_使用Python Markdown 生成 html
查看>>