千家信息网

Flink入门wordCount

发表于:2025-01-24 作者:千家信息网编辑
千家信息网最后更新 2025年01月24日,Flink的编程模型1、获取Flink上下文;ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();2、加
千家信息网最后更新 2025年01月24日Flink入门wordCount

Flink的编程模型
1、获取Flink上下文;
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
2、加载、创建数据;
DataSet
3、数据转换;
Transformation
4、数据结果存放;
5、触发执行。
env.execution

下面为flink输出wordcount数据:

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class FlinkMain {

@SuppressWarnings("serial")public static class LineSplit implements FlatMapFunction>{    @SuppressWarnings("rawtypes")    @Override    /**     * @param value 原数据     * @param out 输出的数据     */    public void flatMap(String value, Collector> out) throws Exception {        String[] tokens = value.split(" ");        for (String token : tokens) {            if(token!=null && token.length()>0){                Tuple2 t = new Tuple2(token,1);                out.collect(t);            }        }    }}public static void main(String[] args) throws Exception {    //创建flink上下文    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();    //创建数据集    DataSet text = env.fromElements("to be","or no to be","is question");    //对数据集转换    DataSet> count = text.flatMap(new LineSplit());    //输出转换后的数据集(print中包含了env.execute执行)    count.print();    System.out.println("-----------------------");    //对数据集分组统计转换,0,1是下标,对应Tuple2类中的参数    count = count.groupBy(0).sum(1);    //控制台输出数据集    count.print();    System.out.println("-----------------------");}

}

Flink使用sql方式转换数据
import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.BatchTableEnvironment;

public class FlinkMain2 {

@SuppressWarnings({ "unchecked", "rawtypes" })public static void main(String[] args) throws Exception {    //创建flink上下文    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();    BatchTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);    List list = new ArrayList();    String workStr = "to be or no to be is question";    String[] tokens = workStr.split(" ");    for (String token : tokens) {        if(token!=null && token.length()>0){            list.add( new WordCount(token,1));        }    }    //创建数据集    DataSet input = env.fromCollection(list);    //注册为数据表wordCount为数据库表,word,frequency为wordCount表字段    tEnv.registerDataSet("wordCount", input, "word, frequency");    Table table = tEnv.sqlQuery(" SELECT word, SUM(frequency) as frequency FROM wordCount GROUP BY word" );    DataSet res = tEnv.toDataSet(table, WordCount.class);    //控制台输出    res.print();}public static class WordCount    {    public String word;    public long frequency;    public WordCount(){}    public WordCount(String word, long frequency) {        this.word = word;        this.frequency = frequency;    }    @Override    public String toString() {        return "词语:" + word + ",词频:" + frequency;    }}

}

数据 输出 上下 上下文 控制台 控制 下标 参数 字段 数据库 数据表 方式 模型 结果 词语 词频 中包 分组 统计 编程 数据库的安全要保护哪些东西 数据库安全各自的含义是什么 生产安全数据库录入 数据库的安全性及管理 数据库安全策略包含哪些 海淀数据库安全审计系统 建立农村房屋安全信息数据库 易用的数据库客户端支持安全管理 连接数据库失败ssl安全错误 数据库的锁怎样保障安全 服务器维护与管理是什么 沧州租房软件开发 戴尔t30服务器E3 腾讯云 新建数据库无权限 阿拉伯语输入法软件开发 苹果8无法连接激活服务器怎么办 非关系模型数据库 我的世界宝可梦服务器模组下载 公司内网的svn服务器怎么加速 学习网络技术有哪些方面 花溪租房软件开发 斗罗大陆手游服务器什么时候开服 服务器安全检测工具有吗 服务器开机4短响 成都软件开发工资多少钱 打印服务器能管理几台打印机 计算器网络技术工资 数据库系统如何转移 监控服务器价格 最适合面向对象的软件开发 三菱传输软件无法连接服务器 崇明区智能软件开发参考价格 天下3人物数据库 网络安全的标语11条 技术支持类网络安全关键岗位 某app功能对应数据库的概念 中国网络安全与管理专业就业 北京网络技术有限公司在哪 公司通讯录怎么建立数据库 数据库开发社区
0