千家信息网

java/scala如何实现WordCount程序

发表于:2024-10-15 作者:千家信息网编辑
千家信息网最后更新 2024年10月15日,本篇内容介绍了"java/scala如何实现WordCount程序"的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能
千家信息网最后更新 2024年10月15日java/scala如何实现WordCount程序

本篇内容介绍了"java/scala如何实现WordCount程序"的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!

程序从windows一个socket端的9999端口接收以换行符分隔的多行文本,每两秒一个时间窗口,打印字数统计。

Socket数据发送命令

window发送命令 nc -l  -p  9999
linux 发送命令 nc -lk 9999

Java版本:

package com.unicom.ljs.spark220.study.streaming;
import org.apache.spark.SparkConf;import org.apache.spark.api.java.JavaPairRDD;import org.apache.spark.api.java.function.*;import org.apache.spark.streaming.Durations;import org.apache.spark.streaming.Time;import org.apache.spark.streaming.api.java.JavaDStream;import org.apache.spark.streaming.api.java.JavaPairDStream;import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;import org.apache.spark.streaming.api.java.JavaStreamingContext;import scala.Tuple2;
import java.util.Arrays;import java.util.Iterator;
/** * @author: Created By lujisen * @company ChinaUnicom Software JiNan * @date: 2020-01-30 22:21 * @version: v1.0 * @description: com.unicom.ljs.spark220.study.streaming */public class StreamingWordCount { public static void main(String[] args) throws InterruptedException {
SparkConf sparkConf = new SparkConf().setMaster("local[*]").setAppName("StreamingWordCount"); /*这里JavaStreamingContext类似sparkCore的SparkContext * 带有两个参数 * 第一个参数:SparkConf 配置 * 第二个参数: 每次收取的数据流的时间间隔 作为一个批次进行处理 */ JavaStreamingContext jsc=new JavaStreamingContext(sparkConf, Durations.seconds(2)); /*指定从socket数据源接收数据 * 指定两个参数 1:主机名 2:端口 * window发送命令 nc -l -p 9999 * linux 发送命令 nc -lk 9999*/
JavaReceiverInputDStream sourceDStream = jsc.socketTextStream("localhost", 9999);
/*接下来就是对每个批次就行处理 这里是每2秒钟一个批次 这样一行行的数据流都被拆分为一个个的单词流*/ JavaDStream wordDStream = sourceDStream.flatMap(new FlatMapFunction() { @Override public Iterator call(String line) throws Exception { return Arrays.asList(line.split(" ")).iterator(); } }); /*转换成 hello 1 * world 1 * a 1 * b 1 格式*/ JavaPairDStream wordPairDStream = wordDStream.mapToPair(new PairFunction() { @Override public Tuple2 call(String word) throws Exception { return new Tuple2<>(word, 1); } }); JavaPairDStream wordCountResult = wordPairDStream.reduceByKey(new Function2() { @Override public Integer call(Integer v1, Integer v2) throws Exception { return v1+v2; } });
/*打印结果*/ wordCountResult.print();
/*jsc这里必须要调用start()函数application才会启动执行,接收数据*/ jsc.start(); jsc.awaitTermination(); /*停止*/ jsc.stop(); }}

Scala版本:

package com.unicom.ljs.study.streaming
import org.apache.spark.SparkConfimport org.apache.spark.streaming.StreamingContextimport org.apache.spark.streaming.Seconds
/** * @author: Created By lujisen * @company ChinaUnicom Software JiNan * @date: 2020-01-31 08:59 * @version: v1.0 * @description: com.unicom.ljs.study.streaming */object StreamingWordCount { def main(args: Array[String]): Unit = {
/*构建SparkConf配置*/ val sparkConf =new SparkConf().setMaster("local[*]").setAppName("StreamingWordCountScala") val ssc =new StreamingContext(sparkConf,Seconds(2))
/*指定socket数据源*/ val sourceDStream=ssc.socketTextStream("localhost",9999)
val wordDStream=sourceDStream.flatMap(x=>x.split(" "))
val wordPairDStream=wordDStream.map(x=>(x,1)) val wordCountResult=wordPairDStream.reduceByKey(_+_)
/*打印结果*/ wordCountResult.print() /*启动*/ ssc.start() ssc.awaitTermination() /*停止*/ ssc.stop() }}

"java/scala如何实现WordCount程序"的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注网站,小编将为大家输出更多高质量的实用文章!

0