Spark Streaming 实现数据实时统计案例
发表于:2024-12-02 作者:千家信息网编辑
千家信息网最后更新 2024年12月02日,Spark 是一个基于内存式的分布式计算框架。具有高性能,高效可扩展,容错等优点。今天讲解一下spark的流计算,其实它也不完全是实时的流计算,算是一种准实时的流计算。上图讲解运行环境:需要linux
千家信息网最后更新 2024年12月02日Spark Streaming 实现数据实时统计案例
Spark 是一个基于内存式的分布式计算框架。具有高性能,高效可扩展,容错等优点。
今天讲解一下spark的流计算,其实它也不完全是实时的流计算,算是一种准实时的流计算。
上图讲解
运行环境:需要linux环境下的spark环境
本例用的centOS 6.5x64 因为需要使用TCP协议传输数据,所以需要安装一个nc插件。
安装方式: yum install ncxxx 或者挂载光盘安装
安装后启动nc -lk 9999 端口可以随便指定,最好是1024以上的就可以。
下面贴出代码
java版本的
import java.util.Arrays;import java.util.List;import org.apache.spark.SparkConf;import org.apache.spark.api.java.function.FlatMapFunction;import org.apache.spark.api.java.function.Function2;import org.apache.spark.api.java.function.PairFunction;import org.apache.spark.streaming.Durations;import org.apache.spark.streaming.api.java.JavaDStream;import org.apache.spark.streaming.api.java.JavaPairDStream;import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;import org.apache.spark.streaming.api.java.JavaStreamingContext;import com.google.common.base.Optional;import scala.Tuple2;public class SparkDemo { public static void main(String[] args) { SparkConf conf=new SparkConf().setAppName("sparkDemo2").setMaster("local[3]"); JavaStreamingContext jsc=new JavaStreamingContext(conf,Durations.seconds(5)); //使用带状态的算子,需要checkpoint做容错处理 jsc.checkpoint("D://chkspark"); JavaReceiverInputDStreamsocketTextStream=jsc.socketTextStream("10.115.27.234", 1000); JavaDStream wordsDstream=socketTextStream.flatMap(new FlatMapFunction () { private static final long serialVersionUID=1L; public Iterable call(String line) throws Exception { return Arrays.asList(line.split(" ")); } }); JavaPairDStream wordsToPairDstream=wordsDstream.mapToPair(new PairFunction () { private static final long SerialVersionUID=1L; public Tuple2 call(String word) throws Exception { return new Tuple2 (word, 1); } }); /** * 一个batch对应一个RDD。 * */ JavaPairDStream resultDstream=wordsToPairDstream.updateStateByKey(new Function2 , Optional
, Optional >() { private static final long serialVersionUID=1L; public Optional call(List values, Optional state) throws Exception { Integer oldValue=0; //默认旧value是0 if (state.isPresent()) { oldValue=state.get(); } for (Integer value:values) { oldValue+=value; } return Optional.of(oldValue); } }); //打印结果 resultDstream.print(); jsc.start(); jsc.awaitTermination(); }}
程序测试: 从linux端的nc 下输入任意字符串,spark streaming会实时对输入的数据做出统计。类似于wordcount. 除非手动kill这个进程,否则会一直运行下去。因为它的原理就是和自来水的水流一样,是一连串的数据流。
运行结果展示:
也可以用scala写出同样的程序,代码量更少。
需要深入理解spark streaming的架构原理。
实时
数据
环境
运行
代码
原理
程序
结果
容错
输入
统计
一连串
上图
优点
光盘
内存
分布式
字符
字符串
就是
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
美国软件开发最新项目
泰坦之旅查看数据库
如何查看服务器角色和功能
税务网络安全比武
地铁逃生服务器满了该怎么进去
ip广播服务器功率
iscsi无法连接服务器
本科软件开发怎么提升
软件开发学什么方向好
近十年网络安全事件
在线网络安全测试
网络安全投资主线
网络安全 三高一弱
数据库怎么和java连接
x3450服务器cpu
数据库的服务没有打开
服务器连接后无法上网
领奇互联网科技官网
网络安全讲解小蘑菇
学校网络安全工作案例分析
软件开发部署环境
网络安全就业版百度云
iscsi无法连接服务器
小鼠基因组数据库
能连上网但是不能连接钉钉服务器
网络安全投资主线
简阳中学网络安全宣传周活动
计算机网络安全工程师选择题
国内三大全文数据库分别是
抖音精控软件开发