java/scala如何实现WordCount程序
发表于:2025-02-03 作者:千家信息网编辑
千家信息网最后更新 2025年02月03日,本篇内容介绍了"java/scala如何实现WordCount程序"的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能
千家信息网最后更新 2025年02月03日java/scala如何实现WordCount程序
本篇内容介绍了"java/scala如何实现WordCount程序"的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!
程序从windows一个socket端的9999端口接收以换行符分隔的多行文本,每两秒一个时间窗口,打印字数统计。
Socket数据发送命令
window发送命令 nc -l -p 9999
linux 发送命令 nc -lk 9999
Java版本:
package com.unicom.ljs.spark220.study.streaming;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.*;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.Time;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import scala.Tuple2;
import java.util.Arrays;
import java.util.Iterator;
/**
* @author: Created By lujisen
* @company ChinaUnicom Software JiNan
* @date: 2020-01-30 22:21
* @version: v1.0
* @description: com.unicom.ljs.spark220.study.streaming
*/
public class StreamingWordCount {
public static void main(String[] args) throws InterruptedException {
SparkConf sparkConf = new SparkConf().setMaster("local[*]").setAppName("StreamingWordCount");
/*这里JavaStreamingContext类似sparkCore的SparkContext
* 带有两个参数
* 第一个参数:SparkConf 配置
* 第二个参数: 每次收取的数据流的时间间隔 作为一个批次进行处理
*/
JavaStreamingContext jsc=new JavaStreamingContext(sparkConf, Durations.seconds(2));
/*指定从socket数据源接收数据
* 指定两个参数 1:主机名 2:端口
* window发送命令 nc -l -p 9999
* linux 发送命令 nc -lk 9999*/
JavaReceiverInputDStream
sourceDStream = jsc.socketTextStream("localhost", 9999);
/*接下来就是对每个批次就行处理 这里是每2秒钟一个批次 这样一行行的数据流都被拆分为一个个的单词流*/
JavaDStream
wordDStream = sourceDStream.flatMap(new FlatMapFunction () { @Override
public Iterator
call(String line) throws Exception { return Arrays.asList(line.split(" ")).iterator();
}
});
/*转换成 hello 1
* world 1
* a 1
* b 1 格式*/
JavaPairDStream
wordPairDStream = wordDStream.mapToPair(new PairFunction () { @Override
public Tuple2
call(String word) throws Exception { return new Tuple2<>(word, 1);
}
});
JavaPairDStream
wordCountResult = wordPairDStream.reduceByKey(new Function2 () { @Override
public Integer call(Integer v1, Integer v2) throws Exception {
return v1+v2;
}
});
/*打印结果*/
wordCountResult.print();
/*jsc这里必须要调用start()函数application才会启动执行,接收数据*/
jsc.start();
jsc.awaitTermination();
/*停止*/
jsc.stop();
}
}
Scala版本:
package com.unicom.ljs.study.streaming
import org.apache.spark.SparkConf
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Seconds
/**
* @author: Created By lujisen
* @company ChinaUnicom Software JiNan
* @date: 2020-01-31 08:59
* @version: v1.0
* @description: com.unicom.ljs.study.streaming
*/
object StreamingWordCount {
def main(args: Array[String]): Unit = {
/*构建SparkConf配置*/
val sparkConf =new SparkConf().setMaster("local[*]").setAppName("StreamingWordCountScala")
val ssc =new StreamingContext(sparkConf,Seconds(2))
/*指定socket数据源*/
val sourceDStream=ssc.socketTextStream("localhost",9999)
val wordDStream=sourceDStream.flatMap(x=>x.split(" "))
val wordPairDStream=wordDStream.map(x=>(x,1))
val wordCountResult=wordPairDStream.reduceByKey(_+_)
/*打印结果*/
wordCountResult.print()
/*启动*/
ssc.start()
ssc.awaitTermination()
/*停止*/
ssc.stop()
}
}
"java/scala如何实现WordCount程序"的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注网站,小编将为大家输出更多高质量的实用文章!
数据
命令
参数
程序
批次
接下来
两个
内容
数据流
数据源
时间
更多
版本
知识
端口
结果
处理
配置
实用
学有所成
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
睿儿网络技术公社
正规的浪潮存储服务器零售
酒店软件开发票怎么开
互联网科技论坛网
世界服务器游戏
三维数据库管理员
网银转账说数据库表插入错误
浙江计算机软件开发项目管理
html借助软件开发
3g网络技术就业前景
崇明区什么是网络安全服务至上
江苏特种网络技术分类服务标准
集中式数据库 局域網
长城汽车网络安全规划
ACM数据库怎么引用文献
华为云服务器学生礼包
长江云同上一堂网络安全课
asp 创建数据库表
吴忠科技型网站服务器
方舟手游高科技服务器
河北廊坊棋牌游戏软件开发公司
正规的浪潮存储服务器零售
互联网科技公司怎么运营
国家网络安全宣传周的目的
质量好的存储服务器批发
网络安全检查图片
选择要连接的服务器
计算机网络技术有没有未来
活动直播软件开发
pdf图像导出数据库