Flink1.8中如何进行流处理SocketWordCount
发表于:2024-11-19 作者:千家信息网编辑
千家信息网最后更新 2024年11月19日,本篇文章给大家分享的是有关Flink1.8中如何进行流处理SocketWordCount,小编觉得挺实用的,因此分享给大家学习,希望大家阅读完这篇文章后可以有所收获,话不多说,跟着小编一起来看看吧。概
千家信息网最后更新 2024年11月19日Flink1.8中如何进行流处理SocketWordCount
本篇文章给大家分享的是有关Flink1.8中如何进行流处理SocketWordCount,小编觉得挺实用的,因此分享给大家学习,希望大家阅读完这篇文章后可以有所收获,话不多说,跟着小编一起来看看吧。
概述:
这里主要演示flink源码实例中"WordCount"程序的流窗口版本。
此程序连接到socket服务器并从socket读取字符串。最简单的尝试方法是打开一个文本服务器(在端口9999),使用netcat工具
我这里也贴一下:
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.hadoop.ljs.flink.streaming;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
/**
* Implements a streaming windowed version of the "WordCount" program.
*
*
This program connects to a server socket and reads strings from the socket.
* The easiest way to try this out is to open a text server (at port 12345)
* using the netcat tool via
*
* nc -l 12345 on Linux or nc -l -p 12345 on Windows
*
* and run this example with the hostname and the port as arguments.
*/
@SuppressWarnings("serial")
public class SocketWordCount {
public static void main(String[] args) throws Exception {
// the host and the port to connect to
final String hostname;
final int port;
try {
final ParameterTool params = ParameterTool.fromArgs(args);
hostname = params.has("hostname") ? params.get("hostname") : "localhost";
port = params.getInt("port");
/*hostname = "10.124.165.98";
port = 9999;*/
} catch (Exception e) {
System.err.println("No port specified. Please run 'SocketWordCount " +
"--hostname --port ', where hostname (localhost by default) " +
"and port is the address of the text server");
System.err.println("To start a simple text server, run 'netcat -l ' and " +
"type the input text into the command line");
return;
}
// get the execution environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// get input data by connecting to the socket
DataStream text = env.socketTextStream(hostname, port, "\n");
// parse the data, group it, window it, and aggregate the counts
DataStream windowCounts = text
.flatMap(new FlatMapFunction() {
@Override
public void flatMap(String value, Collector out) {
for (String word : value.split("\\s")) {
out.collect(new WordWithCount(word, 1L));
}
}
})
.keyBy("word")
.timeWindow(Time.seconds(5))
.reduce(new ReduceFunction() {
@Override
public WordWithCount reduce(WordWithCount a, WordWithCount b) {
return new WordWithCount(a.word, a.count + b.count);
}
});
// print the results with a single thread, rather than in parallel
windowCounts.print().setParallelism(1);
env.execute("Socket Window WordCount");
}
// ------------------------------------------------------------------------
/**
* Data type for words with count.
*/
public static class WordWithCount {
public String word;
public long count;
public WordWithCount() {}
public WordWithCount(String word, long count) {
this.word = word;
this.count = count;
}
@Override
public String toString() {
return word + " : " + count;
}
}
}
通过maven package打出jar包:flink191-1.0-SNAPSHOT-jar-with-dependencies
直接提交到flink在yarn中已启动的一个session中,从flink界面上传jar:
上传后,选中jar前面的复选框,可直接填写相关参数:
参数格式:--参数名 参数值 --参数名2 参数值2
参数获取是通过上面代码第49行的工具类获取(固定格式):
ParameterTool params = ParameterTool.fromArgs(args);
最后点击"Submit"按钮,提交任务运行即可。
界面也可查看日志和输出:
以上就是Flink1.8中如何进行流处理SocketWordCount,小编相信有部分知识点可能是我们日常工作会见到或用到的。希望你能通过这篇文章学到更多知识。更多详情敬请关注行业资讯频道。
参数
处理
工具
更多
服务器
格式
界面
知识
程序
篇文章
服务
实用
代码
任务
字符
字符串
实例
就是
工作会
按钮
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
其他软件开发属于什么行业分类
艾尼斯软件开发
浙江云软件开发安全
保山互联网科技在哪里
齐齐哈尔合韵软件开发
网络安全网络舆情事件及点评
有关网络安全手抄报图片全彩
宁波融翼集装箱网络技术有限公司
外卖派送系统数据库
数据库变可疑
网络安全路由服务器
计算机网络安全数据的传输方式
在线国际矿物数据库
呼和浩特网络安全周活动
网络安全300113
如何用服务器做接口
流媒体服务器和云存储对应关系
数据库安全与生活论文
金凤区政务软件开发公司贵吗
深圳市迅雷网络技术消费
文登软件开发公司电话
网络技术生态
企业网络安全参数
软件开发的都掉头发么
集中数据库类型
武汉软件开发驻场如何收费
网络安全官英文
网络安全维护难度大怎么解决
淮安无线网络技术口碑推荐
郑州西瓜软件开发