java/scala如何实现WordCount程序
发表于:2024-10-15 作者:千家信息网编辑
千家信息网最后更新 2024年10月15日,本篇内容介绍了"java/scala如何实现WordCount程序"的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能
千家信息网最后更新 2024年10月15日java/scala如何实现WordCount程序
本篇内容介绍了"java/scala如何实现WordCount程序"的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!
程序从windows一个socket端的9999端口接收以换行符分隔的多行文本,每两秒一个时间窗口,打印字数统计。
Socket数据发送命令
window发送命令 nc -l -p 9999
linux 发送命令 nc -lk 9999
Java版本:
package com.unicom.ljs.spark220.study.streaming;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.*;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.Time;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import scala.Tuple2;
import java.util.Arrays;
import java.util.Iterator;
/**
* @author: Created By lujisen
* @company ChinaUnicom Software JiNan
* @date: 2020-01-30 22:21
* @version: v1.0
* @description: com.unicom.ljs.spark220.study.streaming
*/
public class StreamingWordCount {
public static void main(String[] args) throws InterruptedException {
SparkConf sparkConf = new SparkConf().setMaster("local[*]").setAppName("StreamingWordCount");
/*这里JavaStreamingContext类似sparkCore的SparkContext
* 带有两个参数
* 第一个参数:SparkConf 配置
* 第二个参数: 每次收取的数据流的时间间隔 作为一个批次进行处理
*/
JavaStreamingContext jsc=new JavaStreamingContext(sparkConf, Durations.seconds(2));
/*指定从socket数据源接收数据
* 指定两个参数 1:主机名 2:端口
* window发送命令 nc -l -p 9999
* linux 发送命令 nc -lk 9999*/
JavaReceiverInputDStream
sourceDStream = jsc.socketTextStream("localhost", 9999);
/*接下来就是对每个批次就行处理 这里是每2秒钟一个批次 这样一行行的数据流都被拆分为一个个的单词流*/
JavaDStream
wordDStream = sourceDStream.flatMap(new FlatMapFunction () { @Override
public Iterator
call(String line) throws Exception { return Arrays.asList(line.split(" ")).iterator();
}
});
/*转换成 hello 1
* world 1
* a 1
* b 1 格式*/
JavaPairDStream
wordPairDStream = wordDStream.mapToPair(new PairFunction () { @Override
public Tuple2
call(String word) throws Exception { return new Tuple2<>(word, 1);
}
});
JavaPairDStream
wordCountResult = wordPairDStream.reduceByKey(new Function2 () { @Override
public Integer call(Integer v1, Integer v2) throws Exception {
return v1+v2;
}
});
/*打印结果*/
wordCountResult.print();
/*jsc这里必须要调用start()函数application才会启动执行,接收数据*/
jsc.start();
jsc.awaitTermination();
/*停止*/
jsc.stop();
}
}
Scala版本:
package com.unicom.ljs.study.streaming
import org.apache.spark.SparkConf
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Seconds
/**
* @author: Created By lujisen
* @company ChinaUnicom Software JiNan
* @date: 2020-01-31 08:59
* @version: v1.0
* @description: com.unicom.ljs.study.streaming
*/
object StreamingWordCount {
def main(args: Array[String]): Unit = {
/*构建SparkConf配置*/
val sparkConf =new SparkConf().setMaster("local[*]").setAppName("StreamingWordCountScala")
val ssc =new StreamingContext(sparkConf,Seconds(2))
/*指定socket数据源*/
val sourceDStream=ssc.socketTextStream("localhost",9999)
val wordDStream=sourceDStream.flatMap(x=>x.split(" "))
val wordPairDStream=wordDStream.map(x=>(x,1))
val wordCountResult=wordPairDStream.reduceByKey(_+_)
/*打印结果*/
wordCountResult.print()
/*启动*/
ssc.start()
ssc.awaitTermination()
/*停止*/
ssc.stop()
}
}
"java/scala如何实现WordCount程序"的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注网站,小编将为大家输出更多高质量的实用文章!
数据
命令
参数
程序
批次
接下来
两个
内容
数据流
数据源
时间
更多
版本
知识
端口
结果
处理
配置
实用
学有所成
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
软件开发中的事件总线
网络安全竖版小报
数据库怎么写查密码语句
山东金客网络技术有限公司
应用软件和网络技术
数据库可视化工具
乐宁软件开发
软件开发企业服务费分录
网络安全公司 排行榜
数据库添加列sql
企业网络安全安装
阿里的网络安全谁负责
华为软件开发红线标准
网络安全线上讲座图片大全
如何使用爬虫导出数据库
黑龙江智慧城管软件开发系统
大同的软件开发为什么少
软件开发处理器
对日软件开发的现状
机器人恢复数据库
网络技术敲代码
宁夏护网2019网络安全
华为通用软件开发工程师面试
数据库my结构
四川电脑软件开发哪家好
怎么把自己的电脑改为远程服务器
网易mc进入服务器
贵州专门做软件开发的公司
农产品残留基础数据库
服务器加速solidworks