千家信息网

How to write a custom source and sink in Flume

Published: 2025-02-03   Author: 千家信息网 editor
Last updated: 2025-02-03

This article walks through how to write a custom source and a custom sink for Flume.

Developing a custom source:

A source collects log data and writes it into a channel.

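Flume sources come in two flavors: PollableSource (polling) and EventDrivenSource (event-driven).

With an EventDrivenSource, you typically start one or more extra threads in start() that keep pushing data into the channel. With a PollableSource, Flume calls your process() method over and over; each call fetches whatever new data is available and hands it to the channel, and returning Status.BACKOFF tells Flume to wait a while before the next call.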
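To make the polling contract concrete, here is a small stdlib-only model of the loop that drives a pollable source. It is a sketch, not Flume code: `PollingModel`, `SimplePollableSource`, `runLoop` and `fromQueue` are hypothetical names. As we understand Flume's PollableSourceRunner, after each BACKOFF it sleeps for min(consecutive backoffs × getBackOffSleepIncrement(), getMaxBackOffSleepInterval()) and resets the count on READY; the model records those sleep times instead of actually sleeping.

```java
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical, simplified model of a pollable source's run loop (not Flume's API).
class PollingModel {
  enum Status { READY, BACKOFF }

  interface SimplePollableSource {
    Status process();
    long getBackOffSleepIncrement();
    long getMaxBackOffSleepInterval();
  }

  /**
   * Calls process() `iterations` times and returns the sleep (ms) chosen after each call:
   * 0 after READY, otherwise min(consecutiveBackoffs * increment, maxInterval).
   */
  static List<Long> runLoop(SimplePollableSource source, int iterations) {
    List<Long> sleeps = new ArrayList<>();
    long consecutiveBackoffs = 0;
    for (int i = 0; i < iterations; i++) {
      if (source.process() == Status.READY) {
        consecutiveBackoffs = 0;
        sleeps.add(0L);
      } else {
        consecutiveBackoffs++;
        sleeps.add(Math.min(consecutiveBackoffs * source.getBackOffSleepIncrement(),
                            source.getMaxBackOffSleepInterval()));
      }
    }
    return sleeps;
  }

  /** Toy source: moves queued strings into `channel`, reports BACKOFF when it has nothing. */
  static SimplePollableSource fromQueue(Deque<String> pending, List<String> channel) {
    return new SimplePollableSource() {
      public Status process() {
        String next = pending.poll();
        if (next == null) {
          return Status.BACKOFF;   // nothing to deliver this round
        }
        channel.add(next);         // stand-in for getChannelProcessor().processEvent(...)
        return Status.READY;
      }
      public long getBackOffSleepIncrement() { return 1000L; }
      public long getMaxBackOffSleepInterval() { return 2500L; }
    };
  }
}
```

With two queued items and five iterations, the first two calls return READY (no sleep) and the following BACKOFF calls wait 1000, 2000 and then the 2500 ms cap. Growing the wait only while the source is idle keeps an empty source from spinning the CPU.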

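```java
import java.nio.charset.StandardCharsets;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.PollableSource;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.source.AbstractSource;

public class MySource extends AbstractSource implements Configurable, PollableSource {
  private String myProp;

  @Override
  public void configure(Context context) {
    String myProp = context.getString("myProp", "defaultValue");
    // Process the myProp value (e.g. validation, convert to another type, ...)
    // Store myProp for later retrieval by process() method
    this.myProp = myProp;
  }

  @Override
  public void start() {
    // Initialize the connection to the external client
    super.start();
  }

  @Override
  public void stop() {
    // Disconnect from external client and do any additional cleanup
    // (e.g. releasing resources or nulling-out field values) ..
    super.stop();
  }

  @Override
  public Status process() throws EventDeliveryException {
    Status status;
    try {
      // Receive new data
      Event e = getSomeData();
      if (e == null) {
        return Status.BACKOFF; // nothing new: ask the runner to back off
      }
      // Store the Event into this Source's associated Channel(s).
      // The ChannelProcessor opens and commits the channel transaction itself,
      // so this method has no Transaction of its own to close.
      getChannelProcessor().processEvent(e);
      status = Status.READY;
    } catch (Throwable t) {
      // Log exception, handle individual exceptions as needed
      status = Status.BACKOFF;
      // re-throw all Errors
      if (t instanceof Error) {
        throw (Error) t;
      }
    }
    return status;
  }

  // Required by PollableSource since Flume 1.7; the values here are illustrative.
  @Override
  public long getBackOffSleepIncrement() { return 1000L; }

  @Override
  public long getMaxBackOffSleepInterval() { return 5000L; }

  // Hypothetical placeholder: replace with a real read from the external client.
  // Should return null when there is nothing new to deliver.
  private Event getSomeData() {
    return EventBuilder.withBody(myProp, StandardCharsets.UTF_8);
  }
}
```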
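The `myProp` value read in configure() comes from the agent's properties file, where a custom component is referenced by its fully qualified class name in `type` (e.g. `a1.sources.r1.type = com.example.MySource`, with `com.example` a hypothetical package). The sketch below is a stdlib-only model, not Flume's implementation: the hypothetical `ComponentConfigModel` keeps only the keys under a component prefix such as `a1.sources.r1.`, strips that prefix, and looks values up with a default, the same contract as `Context.getString(key, defaultValue)`.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

// Hypothetical illustration (not Flume code) of how "a1.sources.r1.myProp"
// reaches a component's configure() as the key "myProp".
class ComponentConfigModel {
  /** Parses properties-file text such as an agent configuration. */
  static Properties parse(String text) {
    Properties p = new Properties();
    try {
      p.load(new StringReader(text));
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return p;
  }

  /** Keeps the properties under `prefix` (e.g. "a1.sources.r1.") with the prefix removed. */
  static Map<String, String> contextFor(Properties agentProps, String prefix) {
    Map<String, String> ctx = new HashMap<>();
    for (String name : agentProps.stringPropertyNames()) {
      if (name.startsWith(prefix)) {
        ctx.put(name.substring(prefix.length()), agentProps.getProperty(name));
      }
    }
    return ctx;
  }

  /** Falls back to defaultValue when the key is absent, like Context.getString(key, def). */
  static String getString(Map<String, String> ctx, String key, String defaultValue) {
    return ctx.getOrDefault(key, defaultValue);
  }
}
```

For example, with `a1.sources.r1.myProp = hello` in the file, the source's view contains `myProp -> hello`, while a setting under `a1.sinks.k1.` is invisible to it.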

Alternatively, an event-driven source skeleton:

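```java
import org.apache.flume.Context;
import org.apache.flume.EventDrivenSource;
import org.apache.flume.conf.Configurable;
import org.apache.flume.source.AbstractSource;

public class TailSource extends AbstractSource implements EventDrivenSource, Configurable {
  @Override
  public void configure(Context context) {
    // Read this source's settings from the agent configuration
  }

  @Override
  public synchronized void start() {
    // Start worker thread(s) that call getChannelProcessor().processEvent(...)
    super.start();
  }

  @Override
  public synchronized void stop() {
    // Stop the worker thread(s) and release resources
    super.stop();
  }
}
```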
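The skeleton leaves start() and stop() empty. The following stdlib-only sketch shows one way to fill them in, under stated assumptions: the hypothetical class `EventDrivenModel` uses a `BlockingQueue` as a stand-in for the channel (in a real source you would call `getChannelProcessor().processEvent(event)` instead of `put()`), and a `Supplier<String>` stands in for the external data source, returning null when nothing new has arrived.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Hypothetical model of an event-driven source (not Flume's API).
class EventDrivenModel {
  private final Supplier<String> reader;        // stand-in for the external data source
  private final BlockingQueue<String> channel;  // stand-in for the Flume channel
  private Thread worker;

  EventDrivenModel(Supplier<String> reader, BlockingQueue<String> channel) {
    this.reader = reader;
    this.channel = channel;
  }

  /** Like EventDrivenSource.start(): spawn a thread that keeps pushing data. */
  synchronized void start() {
    worker = new Thread(() -> {
      try {
        while (!Thread.currentThread().isInterrupted()) {
          String line = reader.get();
          if (line == null) {
            TimeUnit.MILLISECONDS.sleep(10); // nothing new yet; wait briefly
          } else {
            channel.put(line);               // blocks if the queue is full
          }
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();  // stop() interrupted us: exit the loop
      }
    }, "event-driven-source");
    worker.setDaemon(true);
    worker.start();
  }

  /** Like EventDrivenSource.stop(): interrupt the worker and wait for it to exit. */
  synchronized void stop() {
    worker.interrupt();
    try {
      worker.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }
}
```

Unlike the pollable case, nothing outside the source schedules work here: the source owns its thread, so stop() must shut that thread down cleanly.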

Developing a custom sink:

A sink takes log data out of a channel and processes it, for example by writing it to an external store.

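Flume calls the sink's process() method repeatedly, so inside process() you only need to take events from the channel, wrapping each take in a channel transaction that you commit on success and roll back on failure.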

```java
import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;

public class MySink extends AbstractSink implements Configurable {
  private String myProp;

  @Override
  public void configure(Context context) {
    String myProp = context.getString("myProp", "defaultValue");
    // Process the myProp value (e.g. validation)
    // Store myProp for later retrieval by process() method
    this.myProp = myProp;
  }

  @Override
  public void start() {
    // Initialize the connection to the external repository (e.g. HDFS) that
    // this Sink will forward Events to ..
    super.start();
  }

  @Override
  public void stop() {
    // Disconnect from the external repository and do any
    // additional cleanup (e.g. releasing resources or nulling-out
    // field values) ..
    super.stop();
  }

  @Override
  public Status process() throws EventDeliveryException {
    Status status;
    // Start transaction
    Channel ch = getChannel();
    Transaction txn = ch.getTransaction();
    txn.begin();
    try {
      // take() returns null when the channel is empty
      Event event = ch.take();
      if (event == null) {
        status = Status.BACKOFF;
      } else {
        // Send the Event to the external repository.
        // storeSomeData(event);
        status = Status.READY;
      }
      txn.commit();
    } catch (Throwable t) {
      txn.rollback();
      // Log exception, handle individual exceptions as needed
      status = Status.BACKOFF;
      // re-throw all Errors
      if (t instanceof Error) {
        throw (Error) t;
      }
    } finally {
      // Always close the transaction, whether it was committed or rolled back
      txn.close();
    }
    return status;
  }
}
```
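Why commit and roll back at all? The hypothetical in-memory model below (not Flume's API; `TxnChannelModel` and its methods are illustrative names) mirrors the structure of MySink.process(): a failed write rolls the transaction back so the event stays in the channel for a later retry, while a successful write commits and the event is removed. The `store` predicate stands in for the commented-out `storeSomeData(event)` call and returns false to simulate a failed write.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Predicate;

// Hypothetical model of channel transaction semantics (not Flume's API).
class TxnChannelModel {
  enum Status { READY, BACKOFF }

  private final Deque<String> queue = new ArrayDeque<>();
  private String taken; // event taken in the open transaction, if any

  void put(String event) { queue.addLast(event); }
  int size() { return queue.size(); }

  void begin() { taken = null; }
  String take() { taken = queue.pollFirst(); return taken; } // null when empty
  void commit() { taken = null; }                            // the taken event is gone for good
  void rollback() {                                          // put the taken event back
    if (taken != null) {
      queue.addFirst(taken);
    }
    taken = null;
  }

  /** Same shape as MySink.process(): commit on success, roll back on failure. */
  static Status process(TxnChannelModel ch, Predicate<String> store) {
    ch.begin();
    try {
      String event = ch.take();
      if (event == null) {
        ch.commit();
        return Status.BACKOFF;   // channel empty: let the runner back off
      }
      if (!store.test(event)) {
        throw new IllegalStateException("write failed: " + event);
      }
      ch.commit();
      return Status.READY;
    } catch (RuntimeException e) {
      ch.rollback();             // keep the event for a later retry
      return Status.BACKOFF;
    }
  }
}
```

Skipping the rollback in the catch branch would lose the event whenever the external write fails, which is exactly what the transaction is there to prevent.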
