Flink1.8中如何进行流处理SocketWordCount
发表于:2025-02-19 作者:千家信息网编辑
千家信息网最后更新 2025年02月19日,本篇文章给大家分享的是有关Flink1.8中如何进行流处理SocketWordCount,小编觉得挺实用的,因此分享给大家学习,希望大家阅读完这篇文章后可以有所收获,话不多说,跟着小编一起来看看吧。概
千家信息网最后更新 2025年02月19日Flink1.8中如何进行流处理SocketWordCount
本篇文章给大家分享的是有关Flink1.8中如何进行流处理SocketWordCount,小编觉得挺实用的,因此分享给大家学习,希望大家阅读完这篇文章后可以有所收获,话不多说,跟着小编一起来看看吧。
概述:
这里主要演示flink源码实例中"WordCount"程序的流窗口版本。
此程序连接到socket服务器并从socket读取字符串。最简单的尝试方法是打开一个文本服务器(在端口9999),使用netcat工具
我这里也贴一下:
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.hadoop.ljs.flink.streaming;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
/**
* Implements a streaming windowed version of the "WordCount" program.
*
*
This program connects to a server socket and reads strings from the socket.
* The easiest way to try this out is to open a text server (at port 12345)
* using the netcat tool via
*
* nc -l 12345 on Linux or nc -l -p 12345 on Windows
*
* and run this example with the hostname and the port as arguments.
*/
@SuppressWarnings("serial")
public class SocketWordCount {
public static void main(String[] args) throws Exception {
// the host and the port to connect to
final String hostname;
final int port;
try {
final ParameterTool params = ParameterTool.fromArgs(args);
hostname = params.has("hostname") ? params.get("hostname") : "localhost";
port = params.getInt("port");
/*hostname = "10.124.165.98";
port = 9999;*/
} catch (Exception e) {
System.err.println("No port specified. Please run 'SocketWordCount " +
"--hostname --port ', where hostname (localhost by default) " +
"and port is the address of the text server");
System.err.println("To start a simple text server, run 'netcat -l ' and " +
"type the input text into the command line");
return;
}
// get the execution environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// get input data by connecting to the socket
DataStream text = env.socketTextStream(hostname, port, "\n");
// parse the data, group it, window it, and aggregate the counts
DataStream windowCounts = text
.flatMap(new FlatMapFunction() {
@Override
public void flatMap(String value, Collector out) {
for (String word : value.split("\\s")) {
out.collect(new WordWithCount(word, 1L));
}
}
})
.keyBy("word")
.timeWindow(Time.seconds(5))
.reduce(new ReduceFunction() {
@Override
public WordWithCount reduce(WordWithCount a, WordWithCount b) {
return new WordWithCount(a.word, a.count + b.count);
}
});
// print the results with a single thread, rather than in parallel
windowCounts.print().setParallelism(1);
env.execute("Socket Window WordCount");
}
// ------------------------------------------------------------------------
/**
* Data type for words with count.
*/
public static class WordWithCount {
public String word;
public long count;
public WordWithCount() {}
public WordWithCount(String word, long count) {
this.word = word;
this.count = count;
}
@Override
public String toString() {
return word + " : " + count;
}
}
}
通过maven package打出jar包:flink191-1.0-SNAPSHOT-jar-with-dependencies
直接提交到flink在yarn中已启动的一个session中,从flink界面上传jar:
上传后,选中jar前面的复选框,可直接填写相关参数:
参数格式:--参数名 参数值 --参数名2 参数值2
参数获取是通过上面代码第49行的工具类获取(固定格式):
ParameterTool params = ParameterTool.fromArgs(args);
最后点击"Submit"按钮,提交任务运行即可。
界面也可查看日志和输出:
以上就是Flink1.8中如何进行流处理SocketWordCount,小编相信有部分知识点可能是我们日常工作会见到或用到的。希望你能通过这篇文章学到更多知识。更多详情敬请关注行业资讯频道。
参数
处理
工具
更多
服务器
格式
界面
知识
程序
篇文章
服务
实用
代码
任务
字符
字符串
实例
就是
工作会
按钮
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
数据库意向锁的概念
网络安全钥密不对
软件开发培训班百度云
阿里云服务器国际版
360安全云盘服务器升级
海康磁盘阵列服务器连接录像机
精致网络安全手抄报简单字又少
常州互联网软件开发来电咨询
大型网站服务器配置
tanium网络安全
服务器管理地址查看
多个数据库怎么控制事务
云云服务器申请移动域名
公安内部网络安全汇报
嘉乐软件开发工作室游戏攻略
福建工业软件开发零售价
密云区进口软件开发差异
网络安全是几月几日
门头沟区网络技术咨询概况
思修网络安全对生活的影响
北京编程软件开发机构
服务器开机没电源
广州安卓软件开发价格表
夏门凌耀网络技术有限公司
软件开发各阶段输出物
中国当先的网络技术
最好的网络技术论坛
深圳联想服务器维修服务
拜登市政府网络安全
湖北同力辰网络技术有限公司