Flink中怎么自定义Redis的Sink函数
发表于:2024-11-28 作者:千家信息网编辑
千家信息网最后更新 2024年11月28日,这期内容当中小编将会给大家带来有关Flink中怎么自定义Redis的Sink函数,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。1.添加redis对应pom依赖
千家信息网最后更新 2024年11月28日Flink中怎么自定义Redis的Sink函数
这期内容当中小编将会给大家带来有关Flink中怎么自定义Redis的Sink函数,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。
1.添加redis对应pom依赖
org.apache.bahir flink-connector-redis_2.11 1.0
2.主函数代码:
package com.hadoop.ljs.flink110.redis;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
import scala.Tuple2;
/**
* @author: Created By lujisen
* @company ChinaUnicom Software JiNan
* @date: 2020-05-02 10:30
* @version: v1.0
* @description: com.hadoop.ljs.flink110.redis
*/
public class RedisSinkMain {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment senv =StreamExecutionEnvironment.getExecutionEnvironment();
DataStream
source = senv.socketTextStream("localhost", 9000); DataStream
filter = source.filter(new FilterFunction () { @Override
public boolean filter(String value) throws Exception {
if (null == value || value.split(",").length != 2) {
return false;
}
return true;
}
});
DataStream
> keyValue = filter.map(new MapFunction >() { @Override
public Tuple2
map(String value) throws Exception {
String[] split = value.split(",");
return new Tuple2<>(split[0], split[1]);
}
});
//创建redis的配置 单机redis用FlinkJedisPoolConfig,集群redis需要用FlinkJedisClusterConfig
FlinkJedisPoolConfig redisConf = new FlinkJedisPoolConfig.Builder().setHost("worker2.hadoop.ljs").setPort(6379).setPassword("123456a?").build();
keyValue.addSink(new RedisSink
>(redisConf, new RedisMapper >() { @Override
public RedisCommandDescription getCommandDescription() {
return new RedisCommandDescription(RedisCommand.HSET,"table1");
}
@Override
public String getKeyFromData(Tuple2
data) { return data._1;
}
@Override
public String getValueFromData(Tuple2
data) { return data._2;
}
}));
/*启动执行*/
senv.execute();
}
}
3.函数测试
1).window端scoket发送数据
2.redis结果验证
上述就是小编为大家分享的Flink中怎么自定义Redis的Sink函数了,如果刚好有类似的疑惑,不妨参照上述分析进行理解。如果想知道更多相关知识,欢迎关注行业资讯频道。
函数
内容
分析
专业
中小
代码
内容丰富
单机
就是
数据
文章
更多
知识
篇文章
结果
行业
角度
资讯
资讯频道
集群
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
如何登陆liunx数据库
天津手机客户管理软件开发
淮安室内led大屏服务器
数据库是如何建立安全体质的
软件工程与网络安全的区别
服务器4个硬盘能分几个区
数据库开发后要学点什么
网络安全专家有用吗
qq使用代理服务器
rs260服务器管理口密码
mc服务器租用
数据库分库用什么
网站建设需要自己的服务器吗
安装sql数据库怎么收费
服务器正在搬迁
网络技术社团个人介绍
软件开发工程师 竞聘词
怎样修复数据库6
列举生活中网络安全问题
服务器如何关闭安全6
2020年网络安全试点示范项目
genbank数据库名字解释
合肥软件开发工程师培训
一起团互联网科技有限公司
银行 软件开发 招聘
网络安全的黑板报简单图案
应用软件开发行业简介
中美网络安全对话
软件开发受托方的权利
网络安全sass