Flink中怎么自定义Redis的Sink函数
发表于:2025-01-30 作者:千家信息网编辑
千家信息网最后更新 2025年01月30日,这期内容当中小编将会给大家带来有关Flink中怎么自定义Redis的Sink函数,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。1.添加redis对应pom依赖
千家信息网最后更新 2025年01月30日Flink中怎么自定义Redis的Sink函数
这期内容当中小编将会给大家带来有关Flink中怎么自定义Redis的Sink函数,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。
1.添加redis对应pom依赖
org.apache.bahir flink-connector-redis_2.11 1.0
2.主函数代码:
package com.hadoop.ljs.flink110.redis;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
import scala.Tuple2;
/**
* @author: Created By lujisen
* @company ChinaUnicom Software JiNan
* @date: 2020-05-02 10:30
* @version: v1.0
* @description: com.hadoop.ljs.flink110.redis
*/
public class RedisSinkMain {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment senv =StreamExecutionEnvironment.getExecutionEnvironment();
DataStream
source = senv.socketTextStream("localhost", 9000); DataStream
filter = source.filter(new FilterFunction () { @Override
public boolean filter(String value) throws Exception {
if (null == value || value.split(",").length != 2) {
return false;
}
return true;
}
});
DataStream
> keyValue = filter.map(new MapFunction >() { @Override
public Tuple2
map(String value) throws Exception {
String[] split = value.split(",");
return new Tuple2<>(split[0], split[1]);
}
});
//创建redis的配置 单机redis用FlinkJedisPoolConfig,集群redis需要用FlinkJedisClusterConfig
FlinkJedisPoolConfig redisConf = new FlinkJedisPoolConfig.Builder().setHost("worker2.hadoop.ljs").setPort(6379).setPassword("123456a?").build();
keyValue.addSink(new RedisSink
>(redisConf, new RedisMapper >() { @Override
public RedisCommandDescription getCommandDescription() {
return new RedisCommandDescription(RedisCommand.HSET,"table1");
}
@Override
public String getKeyFromData(Tuple2
data) { return data._1;
}
@Override
public String getValueFromData(Tuple2
data) { return data._2;
}
}));
/*启动执行*/
senv.execute();
}
}
3.函数测试
1).window端scoket发送数据
2.redis结果验证
上述就是小编为大家分享的Flink中怎么自定义Redis的Sink函数了,如果刚好有类似的疑惑,不妨参照上述分析进行理解。如果想知道更多相关知识,欢迎关注行业资讯频道。
函数
内容
分析
专业
中小
代码
内容丰富
单机
就是
数据
文章
更多
知识
篇文章
结果
行业
角度
资讯
资讯频道
集群
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
新乡安卓软件开发
服务器返回状态500什么意思
内存数据库的关键技术
网络安全攻防两天可以学会不
闵行区软件开发预算
公众号互联网科技风
数据库怎样设置当前系统时间
k30s没有上小白测评数据库
和平精英充值服务器失败怎么办
网络安全超级简单又漂亮的手抄报
有关时间分配的调查数据库
春考计算机网络技术技能测试
网络安全小语录
如何一次性百度出所有数据库
携程旅游数据库
魔兽世界数据库表
曙光服务器a840r-g
静安区创新数据库有哪些
软件开发人员应聘问题
软件开发叫设计吗
中心站服务器是什么意思
数据库关系模式是什么
vs生成数据库连接串
服务器网卡有单模和多模区分吗
广州物流软件开发要多少钱
网络安全攻防两天可以学会不
玉溪文山互联网科技
莆田购呗网络技术有限公司
27了想干软件开发
net软件开发人员