Flink1.8中如何进行流处理SocketWordCount
发表于:2025-02-19 作者:千家信息网编辑
千家信息网最后更新 2025年02月19日,本篇文章给大家分享的是有关Flink1.8中如何进行流处理SocketWordCount,小编觉得挺实用的,因此分享给大家学习,希望大家阅读完这篇文章后可以有所收获,话不多说,跟着小编一起来看看吧。概
千家信息网最后更新 2025年02月19日Flink1.8中如何进行流处理SocketWordCount
本篇文章给大家分享的是有关Flink1.8中如何进行流处理SocketWordCount,小编觉得挺实用的,因此分享给大家学习,希望大家阅读完这篇文章后可以有所收获,话不多说,跟着小编一起来看看吧。
概述:
这里主要演示flink源码实例中"WordCount"程序的流窗口版本。
此程序连接到socket服务器并从socket读取字符串。最简单的尝试方法是打开一个文本服务器(在端口9999),使用netcat工具
我这里也贴一下:
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.hadoop.ljs.flink.streaming;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
/**
* Implements a streaming windowed version of the "WordCount" program.
*
*
This program connects to a server socket and reads strings from the socket.
* The easiest way to try this out is to open a text server (at port 12345)
* using the netcat tool via
*
* nc -l 12345 on Linux or nc -l -p 12345 on Windows
*
* and run this example with the hostname and the port as arguments.
*/
@SuppressWarnings("serial")
public class SocketWordCount {
public static void main(String[] args) throws Exception {
// the host and the port to connect to
final String hostname;
final int port;
try {
final ParameterTool params = ParameterTool.fromArgs(args);
hostname = params.has("hostname") ? params.get("hostname") : "localhost";
port = params.getInt("port");
/*hostname = "10.124.165.98";
port = 9999;*/
} catch (Exception e) {
System.err.println("No port specified. Please run 'SocketWordCount " +
"--hostname --port ', where hostname (localhost by default) " +
"and port is the address of the text server");
System.err.println("To start a simple text server, run 'netcat -l ' and " +
"type the input text into the command line");
return;
}
// get the execution environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// get input data by connecting to the socket
DataStream text = env.socketTextStream(hostname, port, "\n");
// parse the data, group it, window it, and aggregate the counts
DataStream windowCounts = text
.flatMap(new FlatMapFunction() {
@Override
public void flatMap(String value, Collector out) {
for (String word : value.split("\\s")) {
out.collect(new WordWithCount(word, 1L));
}
}
})
.keyBy("word")
.timeWindow(Time.seconds(5))
.reduce(new ReduceFunction() {
@Override
public WordWithCount reduce(WordWithCount a, WordWithCount b) {
return new WordWithCount(a.word, a.count + b.count);
}
});
// print the results with a single thread, rather than in parallel
windowCounts.print().setParallelism(1);
env.execute("Socket Window WordCount");
}
// ------------------------------------------------------------------------
/**
* Data type for words with count.
*/
public static class WordWithCount {
public String word;
public long count;
public WordWithCount() {}
public WordWithCount(String word, long count) {
this.word = word;
this.count = count;
}
@Override
public String toString() {
return word + " : " + count;
}
}
}
通过maven package打出jar包:flink191-1.0-SNAPSHOT-jar-with-dependencies
直接提交到flink在yarn中已启动的一个session中,从flink界面上传jar:
上传后,选中jar前面的复选框,可直接填写相关参数:
参数格式:--参数名 参数值 --参数名2 参数值2
参数获取是通过上面代码第49行的工具类获取(固定格式):
ParameterTool params = ParameterTool.fromArgs(args);
最后点击"Submit"按钮,提交任务运行即可。
界面也可查看日志和输出:
以上就是Flink1.8中如何进行流处理SocketWordCount,小编相信有部分知识点可能是我们日常工作会见到或用到的。希望你能通过这篇文章学到更多知识。更多详情敬请关注行业资讯频道。
参数
处理
工具
更多
服务器
格式
界面
知识
程序
篇文章
服务
实用
代码
任务
字符
字符串
实例
就是
工作会
按钮
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
简答题 网络安全
模特图写真台湾服务器
风豹互联网科技跟既有
怎么管理服务器集群
网络安全整治手抄报
客户端编程软件开发
天幕网络安全攻防平台
浪潮c 软件开发面试题
集群服务数据库
吉林品质软件开发服务优化
软件开发创业做什么
金蝶数据库文件在哪
宜兴现代软件开发设计规范
芜湖苹果软件开发公司排名
桓台纺织软件开发报价
服务器 病毒防护
叮丁网络技术有限公司
江苏企业软件开发服务优化
传播正能量 网络安全
迅雷会员离线下载服务器
网页随机读取数据库
云服务器稳定
物流软件开发零基础实习生
网吧服务器硬盘有什么文件
黑龙江云端服务器租用云主机
信息网络技术实践报告
上海常用软件开发服务价格
电脑建立图书馆数据库
天涯明月刀手游端游服务器
安徽省网络安全等级保护测评