Spark-Streaming如何处理数据到mysql中
发表于:2025-01-23 作者:千家信息网编辑
千家信息网最后更新 2025年01月23日,本篇内容主要讲解"Spark-Streaming如何处理数据到mysql中",感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习"Spark-Streaming如何
千家信息网最后更新 2025年01月23日Spark-Streaming如何处理数据到mysql中
本篇内容主要讲解"Spark-Streaming如何处理数据到mysql中",感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习"Spark-Streaming如何处理数据到mysql中"吧!
1.说明
数据表如下:
create database test;use test;DROP TABLE IF EXISTS car_gps;CREATE TABLE IF NOT EXISTS car_gps(deployNum VARCHAR(30) COMMENT '调度编号',plateNum VARCHAR(10) COMMENT '车牌号',timeStr VARCHAR(20) COMMENT '时间戳',lng VARCHAR(20) COMMENT '经度',lat VARCHAR(20) COMMENT '纬度',dbtime TIMESTAMP DEFAULT CURRENT_TIMESTAMP COMMENT '数据入库时间',PRIMARY KEY(deployNum, plateNum, timeStr))
2.编写程序
首先引入mysql的驱动
mysql mysql-connector-java 5.1.44
2.1 jdbc写入mysql
package com.hoult.Streaming.workimport java.sql.{Connection, DriverManager, PreparedStatement}import java.util.Propertiesimport com.hoult.structed.bean.BusInfoimport org.apache.spark.sql.ForeachWriterclass JdbcHelper extends ForeachWriter[BusInfo] { var conn: Connection = _ var statement: PreparedStatement = _ override def open(partitionId: Long, epochId: Long): Boolean = { if (conn == null) { conn = JdbcHelper.openConnection } true } override def process(value: BusInfo): Unit = { //把数据写入mysql表中 val arr: Array[String] = value.lglat.split("_") val sql = "insert into car_gps(deployNum,plateNum,timeStr,lng,lat) values(?,?,?,?,?)" statement = conn.prepareStatement(sql) statement.setString(1, value.deployNum) statement.setString(2, value.plateNum) statement.setString(3, value.timeStr) statement.setString(4, arr(0)) statement.setString(5, arr(1)) statement.executeUpdate() } override def close(errorOrNull: Throwable): Unit = { if (null != conn) conn.close() if (null != statement) statement.close() }}object JdbcHelper { var conn: Connection = _ val url = "jdbc:mysql://hadoop1:3306/test?useUnicode=true&characterEncoding=utf8" val username = "root" val password = "123456" def openConnection: Connection = { if (null == conn || conn.isClosed) { val p = new Properties Class.forName("com.mysql.jdbc.Driver") conn = DriverManager.getConnection(url, username, password) } conn }}
2.2 通过foreach来写入mysql
package com.hoult.Streaming.workimport com.hoult.structed.bean.BusInfoimport org.apache.spark.sql.{Column, DataFrame, Dataset, SparkSession}object KafkaToJdbc { def main(args: Array[String]): Unit = { System.setProperty("HADOOP_USER_NAME", "root") //1 获取sparksession val spark: SparkSession = SparkSession.builder() .master("local[*]") .appName(KafkaToJdbc.getClass.getName) .getOrCreate() spark.sparkContext.setLogLevel("WARN") import spark.implicits._ //2 定义读取kafka数据源 val kafkaDf: DataFrame = spark.readStream .format("kafka") .option("kafka.bootstrap.servers", "linux121:9092") .option("subscribe", "test_bus_info") .load() //3 处理数据 val kafkaValDf: DataFrame = kafkaDf.selectExpr("CAST(value AS STRING)") //转为ds val kafkaDs: Dataset[String] = kafkaValDf.as[String] //解析出经纬度数据,写入redis //封装为一个case class方便后续获取指定字段的数据 val busInfoDs: Dataset[BusInfo] = kafkaDs.map(BusInfo(_)).filter(_ != null) //将数据写入MySQL表 busInfoDs.writeStream .foreach(new JdbcHelper) .outputMode("append") .start() .awaitTermination() }}
2.4 创建topic和从消费者端写入数据
kafka-topics.sh --zookeeper linux121:2181/myKafka --create --topic test_bus_info --partitions 2 --replication-factor 1kafka-console-producer.sh --broker-list linux121:9092 --topic test_bus_info
到此,相信大家对"Spark-Streaming如何处理数据到mysql中"有了更深的了解,不妨来实际操作一番吧!这里是网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!
数据
内容
时间
学习
实用
更深
兴趣
字段
实用性
实际
操作简单
数据源
数据表
方法
更多
朋友
消费者
程序
纬度
经度
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
计算机网络技术学的是什么东西
网络技术操作说明
网络安全班会感想作文
初二日记软件开发
铁路最近网络安全
crs网络安全
南京教培软件开发
信息网络安全战略大数据
关于网络安全意识的提问问题
数据库网络安全技术与应用
我的世界怎么创造服务器
网络安全行为审计系统视频
江苏信创网络安全
郑州安卓应用软件开发费用多少
ftp服务器配置要求高不
服务器要求用户名和密码
本地https服务器搭建
运城网络安全
杭州网络安全大专班
基因组数据库高中生物
网络安全手抄报二年级简单
网络技术服务的范畴
数据库转库
数据库 市场份额
有关网络安全的政治小论文
yml存进数据库
内外网打印机共享网络安全
php写加密数据库
腾讯云如何打开服务器管理
茂名软件开发公司企业