Spark Streaming 实现数据实时统计案例
发表于:2025-01-19 作者:千家信息网编辑
千家信息网最后更新 2025年01月19日,Spark 是一个基于内存式的分布式计算框架。具有高性能,高效可扩展,容错等优点。今天讲解一下spark的流计算,其实它也不完全是实时的流计算,算是一种准实时的流计算。上图讲解运行环境:需要linux
千家信息网最后更新 2025年01月19日Spark Streaming 实现数据实时统计案例
Spark 是一个基于内存式的分布式计算框架。具有高性能,高效可扩展,容错等优点。
今天讲解一下spark的流计算,其实它也不完全是实时的流计算,算是一种准实时的流计算。
上图讲解
运行环境:需要linux环境下的spark环境
本例用的centOS 6.5x64 因为需要使用TCP协议传输数据,所以需要安装一个nc插件。
安装方式: yum install ncxxx 或者挂载光盘安装
安装后启动nc -lk 9999 端口可以随便指定,最好是1024以上的就可以。
下面贴出代码
java版本的
import java.util.Arrays;import java.util.List;import org.apache.spark.SparkConf;import org.apache.spark.api.java.function.FlatMapFunction;import org.apache.spark.api.java.function.Function2;import org.apache.spark.api.java.function.PairFunction;import org.apache.spark.streaming.Durations;import org.apache.spark.streaming.api.java.JavaDStream;import org.apache.spark.streaming.api.java.JavaPairDStream;import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;import org.apache.spark.streaming.api.java.JavaStreamingContext;import com.google.common.base.Optional;import scala.Tuple2;public class SparkDemo { public static void main(String[] args) { SparkConf conf=new SparkConf().setAppName("sparkDemo2").setMaster("local[3]"); JavaStreamingContext jsc=new JavaStreamingContext(conf,Durations.seconds(5)); //使用带状态的算子,需要checkpoint做容错处理 jsc.checkpoint("D://chkspark"); JavaReceiverInputDStreamsocketTextStream=jsc.socketTextStream("10.115.27.234", 1000); JavaDStream wordsDstream=socketTextStream.flatMap(new FlatMapFunction () { private static final long serialVersionUID=1L; public Iterable call(String line) throws Exception { return Arrays.asList(line.split(" ")); } }); JavaPairDStream wordsToPairDstream=wordsDstream.mapToPair(new PairFunction () { private static final long SerialVersionUID=1L; public Tuple2 call(String word) throws Exception { return new Tuple2 (word, 1); } }); /** * 一个batch对应一个RDD。 * */ JavaPairDStream resultDstream=wordsToPairDstream.updateStateByKey(new Function2 , Optional
, Optional >() { private static final long serialVersionUID=1L; public Optional call(List values, Optional state) throws Exception { Integer oldValue=0; //默认旧value是0 if (state.isPresent()) { oldValue=state.get(); } for (Integer value:values) { oldValue+=value; } return Optional.of(oldValue); } }); //打印结果 resultDstream.print(); jsc.start(); jsc.awaitTermination(); }}
程序测试: 从linux端的nc 下输入任意字符串,spark streaming会实时对输入的数据做出统计。类似于wordcount. 除非手动kill这个进程,否则会一直运行下去。因为它的原理就是和自来水的水流一样,是一连串的数据流。
运行结果展示:
也可以用scala写出同样的程序,代码量更少。
需要深入理解spark streaming的架构原理。
实时
数据
环境
运行
代码
原理
程序
结果
容错
输入
统计
一连串
上图
优点
光盘
内存
分布式
字符
字符串
就是
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
消费软件安装数据库
软件开发可维护性准则
服务器不锈钢外壳供应
微盛网络技术支持
广州御途网络技术
数据库查询同专业的学生
数据库中的year
ftp服务器干嘛的
帆软内置数据库怎么安装
ipfs算力服务器租凭
长沙慧享网络技术有限公司
银川灵武软件开发
什么时候实施网络安全等级
数据库使用时很卡是什么原因
北京环境监测软件开发服务
外包软件开发优势
数据库管理系统班长
全国全月网络安全竞赛
计算机网络安全管理目录
网络安全投诉举报
在旅行网络安全监督
手机应用软件开发区海边
嵌入式软件开发区鲸鱼纹身
数据库表中名字修改
服务器的管理方法有
用excel找的数据库
修改数据库服务时间
网络技术在教学中的应用文献
软件开发入门
计算机服务器操作系统分类