博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Spark Streaming实时流处理项目实战笔记——将统计结果写入到MySQL数据库中
阅读量:3959 次
发布时间:2019-05-24

本文共 2026 字,大约阅读时间需要 6 分钟。

思路

两种方式,一种可优化(foreachRDD后,直接创建连接Mysql),一种在(foreachRDD后通过foreachPartition,通过分区获取)

代码实现

import java.sql.DriverManagerimport Spark.UpdateStateByKey.workdsimport Spark.WordCount.sscimport org.apache.spark.SparkConfimport org.apache.spark.streaming.{Seconds, StreamingContext}object MysqlByKey extends App{  val sparkConf = new SparkConf().setMaster("local[2]").setAppName("WordCount")  val ssc = new StreamingContext(sparkConf,Seconds(10))  // 第一点,如果要使用updateStateByKey算子,就必须设置一个checkpoint目录,开启checkpoint机制  // 这样的话才能把每个key对应的state除了在内存中有,那么是不是也要checkpoint一份  // 因为你要长期保存一份key的state的话,那么spark streaming是要求必须用checkpoint的,以便于在  // 内存数据丢失的时候,可以从checkpoint中恢复数据  // 开启checkpoint机制,很简单,只要调用jssc的checkpoint()方法,设置一个hdfs目录即可  ssc.checkpoint("E:/test")  // 实现基础的wordcount逻辑  val lines = ssc.socketTextStream("hadoop2", 9999)  //val lines = ssc.textFileStream("E:/test")  val words = lines.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_)  //将结果写入MySql  words.foreachRDD(rdd => rdd.foreachPartition(line => {    Class.forName("com.mysql.jdbc.Driver")    //获取mysql连接    val conn = DriverManager.getConnection("jdbc:mysql://192.168.57.101:3306/test", "root", "1234")    //把数据写入mysql    try {      for (row <- line) {        val sql = "insert into wordcount(word,wordcount)values('" + row._1 + "','" + row._2 + "')"        conn.prepareStatement(sql).executeUpdate()      }    } finally {      conn.close()    }  }))   /*方法二words.foreachRDD(rdd=>{     rdd.foreachPartition(partionOfRecords=>{       if(partionOfRecords.size>0){         val connection = createConnection()         partionOfRecords.foreach(record=>{           val sql = "insert into wordcount(word,wordcount) values("+record._1+","+record._2+")"           connection.createStatement().execute(sql)         })         connection.close()       }     })   })  //获取通过jdbc连接数据库  def createConnection()={    Class.forName("com.mysql.jdbc.Driver")    DriverManager.getConnection("jdbc:mysql://hadoop2:3306/test","root","1234")  }*/  words.print()  ssc.start()  ssc.awaitTermination()}

转载地址:http://qtazi.baihongyu.com/

你可能感兴趣的文章
[杂记] 新年物语&关于Mysql引擎性能测试
查看>>
[心得] 近期更新&关于Infobright
查看>>
[杂记] 流量统计 & 短信接口
查看>>
[Java] JRebel + Maven + Jetty 热部署
查看>>
[算法] 从 Memcached 分布式应用看一致性哈希散列函数的选择
查看>>
[中间件] 消息处理利器 ActiveMQ 的介绍 & Stomp 协议的使用
查看>>
[设计] 原型界面设计利器 Balsamiq Mockups 推荐
查看>>
[闲话] 在西方的程序员眼里,东方的程序员是什么样的
查看>>
[管理] 成功之路的探寻 —— “三力” 理论
查看>>
[连载] Socket 深度探索 4 PHP (一)
查看>>
[连载] Socket 深度探究 4 PHP (二)
查看>>
[连载] Socket 深度探究 4 PHP (三)
查看>>
[无线] Android 系统开发学习杂记
查看>>
[无线] 浅析当代 LBS 技术
查看>>
[杂感] 缅怀乔布斯
查看>>
[无线] 让Android支持cmwap上网
查看>>
[教程] Android PHP 最佳实践视频教程
查看>>
[无线] AndroidManifest.xml配置文件详解
查看>>
[无线] 2012 智能手机市场分析
查看>>
[移动] Android推送方案分析(MQTT/XMPP/GCM)
查看>>