flume如何自定义source、sink
发表于:2025-02-03 作者:千家信息网编辑
千家信息网最后更新 2025年02月03日,这篇文章主要为大家展示了"flume如何自定义source、sink",内容简而易懂,条理清晰,希望能够帮助大家解决疑惑,下面让小编带领大家一起研究并学习一下"flume如何自定义source、sin
千家信息网最后更新 2025年02月03日flume如何自定义source、sink
这篇文章主要为大家展示了"flume如何自定义source、sink",内容简而易懂,条理清晰,希望能够帮助大家解决疑惑,下面让小编带领大家一起研究并学习一下"flume如何自定义source、sink"这篇文章吧。
自定义source开发:
source是收集日志存入channel。
Source提供了两种机制:PollableSource(轮训拉取)和EventDrivenSource(事件驱动),
如果使用EventDrivenSource,你可以在start方法中启动额外的线程,不断的往channel中发数据。如果使用PollableSource,你可以在process()实现不断重发。
public class MySource extends AbstractSource implements Configurable, PollableSource { private String myProp; @Override public void configure(Context context) { String myProp = context.getString("myProp", "defaultValue"); // Process the myProp value (e.g. validation, convert to another type, ...) // Store myProp for later retrieval by process() method this.myProp = myProp; } @Override public void start() { // Initialize the connection to the external client } @Override public void stop () { // Disconnect from external client and do any additional cleanup // (e.g. releasing resources or nulling-out field values) .. } @Override public Status process() throws EventDeliveryException { Status status = null; try { // This try clause includes whatever Channel/Event operations you want to do // Receive new data Event e = getSomeData(); // Store the Event into this Source's associated Channel(s) getChannelProcessor().processEvent(e); status = Status.READY; } catch (Throwable t) { // Log exception, handle individual exceptions as needed status = Status.BACKOFF; // re-throw all Errors if (t instanceof Error) { throw (Error)t; } } finally { txn.close(); } return status; }}
或者
package
org.apache.flume;
import
org.apache.flume.conf.Configurable;
import
org.apache.flume.source.AbstractSource;
public
class
TailSource
extends
AbstractSource
implements
EventDrivenSource,
Configurable {
@Override
public
void
configure(Context context) {
}
@Override
public
synchronized
void
start() {
}
@Override
public
synchronized
void
stop() {
}
}
自定义sink:
sink是从channel中拉取日志处理。
process会不断调用,你只需在process中去取channel的数据即可。
public class MySink extends AbstractSink implements Configurable { private String myProp; @Override public void configure(Context context) { String myProp = context.getString("myProp", "defaultValue"); // Process the myProp value (e.g. validation) // Store myProp for later retrieval by process() method this.myProp = myProp; } @Override public void start() { // Initialize the connection to the external repository (e.g. HDFS) that // this Sink will forward Events to .. } @Override public void stop () { // Disconnect from the external respository and do any // additional cleanup (e.g. releasing resources or nulling-out // field values) .. } @Override public Status process() throws EventDeliveryException { Status status = null; // Start transaction Channel ch = getChannel(); Transaction txn = ch.getTransaction(); txn.begin(); try { // This try clause includes whatever Channel operations you want to do Event event = ch.take(); // Send the Event to the external repository. // storeSomeData(e); txn.commit(); status = Status.READY; } catch (Throwable t) { txn.rollback(); // Log exception, handle individual exceptions as needed status = Status.BACKOFF; // re-throw all Errors if (t instanceof Error) { throw (Error)t; } } return status; }}
以上是"flume如何自定义source、sink"这篇文章的所有内容,感谢各位的阅读!相信大家都有了一定的了解,希望分享的内容对大家有所帮助,如果还想学习更多知识,欢迎关注行业资讯频道!
e.g.
不断
内容
篇文章
数据
日志
学习
帮助
事件
只需
方法
易懂
更多
机制
条理
知识
线程
编带
行业
资讯
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
每台linux电脑都是服务器吗
国家网络安全中心黄
社区开展网络安全知识宣讲活动
找到元器件数据库
sql数据库安全性脚本
软件技术和网络技术哪个难
html的服务器字体是什么
杭州戴尔服务器生产厂商
数据库中主键自动递增
数据库中重复键是什么
网络安全硕士学校排名
合集小说软件开发
ios清空流量数据库
中国移动软件开发工程师二面
无线网络技术锐捷实验
数据库.ppt
3g手机软件开发培训
学校网络安全教育讲座报道
小学三年级介绍网络安全
冰点还原服务器快捷方式
新型智慧城市网络安全工作情况
为什么服务器没有重启
找到元器件数据库
珠海移动软件开发哪家好
传奇sf服务器系统
服务器 数据服务
餐饮软件开发
如何备份两个服务器数据
数据库级安全性
苹果六怎么信任软件开发者