
How to Develop a Flume Client

Published: 2025-02-03. Author: 千家信息网 editor. Last updated: 2025-02-03.

Many newcomers are unsure how to develop a Flume client. This article walks through the process in detail for anyone who needs it.

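In practice, data is produced in many different ways. Flume ships with some built-in mechanisms for collecting data, but users often prefer to connect their applications to Flume directly. Flume therefore allows users to write applications that connect to Flume over IPC or RPC and send data to it.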

1. RPC client interface

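Flume's RpcClient interface encapsulates the RPC mechanism that Flume supports. An application can simply call the Flume Client SDK's append(Event) or appendBatch(List<Event>) method to send data, without worrying about the details of the underlying message exchange. The required event can be supplied by implementing the Event interface directly, by using a convenience implementation such as the SimpleEvent class, or by using the overloaded withBody() static helper methods of EventBuilder.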

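As of Flume 1.4.0, Avro is the default RPC protocol. NettyAvroRpcClient and ThriftRpcClient both implement the RpcClient interface. To create a client instance, the code needs the host and port of the target Flume agent; it then uses the RpcClient to send data to that agent.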

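The official documentation provides an Avro RPC client example, which is used here directly as the test case. The only change is the call client.init("",50000); which points the client at our own host.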

    import org.apache.flume.Event;
    import org.apache.flume.EventDeliveryException;
    import org.apache.flume.api.RpcClient;
    import org.apache.flume.api.RpcClientFactory;
    import org.apache.flume.event.EventBuilder;
    import java.nio.charset.Charset;

    public class MyApp {
      public static void main(String[] args) {
        MyRpcClientFacade client = new MyRpcClientFacade();
        // Initialize client with the remote Flume agent's host and port
        // client.init("host.example.org", 41414);
        client.init("", 50000);

        // Send 10 events to the remote Flume agent. That agent should be
        // configured to listen with an AvroSource.
        String sampleData = "Hello Flume!";
        for (int i = 0; i < 10; i++) {
          client.sendDataToFlume(sampleData);
        }

        client.cleanUp();
      }
    }

    class MyRpcClientFacade {
      private RpcClient client;
      private String hostname;
      private int port;

      public void init(String hostname, int port) {
        // Set up the RPC connection
        this.hostname = hostname;
        this.port = port;
        this.client = RpcClientFactory.getDefaultInstance(hostname, port);
        // Use the following method to create a thrift client (instead of the above line):
        // this.client = RpcClientFactory.getThriftInstance(hostname, port);
      }

      public void sendDataToFlume(String data) {
        // Create a Flume Event object that encapsulates the sample data
        Event event = EventBuilder.withBody(data, Charset.forName("UTF-8"));

        // Send the event
        try {
          client.append(event);
        } catch (EventDeliveryException e) {
          // Clean up and recreate the client
          client.close();
          client = null;
          client = RpcClientFactory.getDefaultInstance(hostname, port);
          // Use the following method to create a thrift client (instead of the above line):
          // this.client = RpcClientFactory.getThriftInstance(hostname, port);
        }
      }

      public void cleanUp() {
        // Close the RPC connection
        client.close();
      }
    }

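The code needs little explanation: it sends "Hello Flume!" to Flume 10 times. Remember to add the jar files under the lib directory of the Flume installation to the project, otherwise the program will not run.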


    # Configuration file: avro_client_case20.conf
    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1

    # Describe/configure the source
    a1.sources.r1.type = avro
    a1.sources.r1.port = 50000
    a1.sources.r1.host =
    a1.sources.r1.channels = c1

    # Describe the sink
    a1.sinks.k1.channel = c1
    a1.sinks.k1.type = logger

    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100

Note that the receiving side needs an Avro Source or a Thrift Source listening on the port. When configuring the agent, a1.sources.r1.type must therefore be set to avro or thrift.
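Because a Flume agent configuration uses Java properties syntax, the source type can be checked from code before a client is pointed at the agent. The following is only a minimal sketch: the class SourceTypeCheck and its methods parse and isRpcSource are hypothetical names, not part of Flume, and the check looks at nothing but the <agent>.sources.<source>.type key.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical helper, not part of the Flume SDK.
class SourceTypeCheck {
    // Parses agent configuration text written in Java properties syntax.
    static Properties parse(String text) {
        Properties conf = new Properties();
        try {
            conf.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return conf;
    }

    // Returns true when the named source is declared as avro or thrift,
    // the two source types an RPC client can send to.
    static boolean isRpcSource(Properties conf, String agent, String source) {
        String key = agent + ".sources." + source + ".type";
        String type = conf.getProperty(key, "").trim();
        return type.equals("avro") || type.equals("thrift");
    }
}
```

For the configuration above, SourceTypeCheck.isRpcSource(conf, "a1", "r1") would be expected to return true once the file content has been parsed.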


flume-ng agent -c conf -f conf/avro_client_case20.conf -n a1 -Dflume.root.logger=INFO,console


Run the Java program in Eclipse. Alternatively, package it and run it on the server.



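Note that the client is not limited to sending one event at a time with client.append(event). It can also send a whole List of events in a single call with client.appendBatch(...), that is, batch sending. This is not demonstrated here.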
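The idea of batch sending can be sketched as follows. To keep the sketch runnable without the Flume jars, BatchSink is a hypothetical stand-in for RpcClient and takes raw byte bodies instead of Event objects; with the real SDK each body would be wrapped with EventBuilder.withBody(...) and the list passed to client.appendBatch(...). The class name BatchSender and the chunking by a caller-chosen batch size are assumptions made for illustration.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for org.apache.flume.api.RpcClient (not the real interface).
interface BatchSink {
    void appendBatch(List<byte[]> bodies);
}

class BatchSender {
    // Splits the payloads into chunks of at most batchSize and hands each
    // chunk to the sink in one appendBatch call. Returns the number of calls.
    static int sendInBatches(BatchSink sink, List<String> payloads, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        int calls = 0;
        for (int start = 0; start < payloads.size(); start += batchSize) {
            int end = Math.min(start + batchSize, payloads.size());
            List<byte[]> batch = new ArrayList<>();
            for (String payload : payloads.subList(start, end)) {
                // Encode each payload as UTF-8, as the examples in this article do
                batch.add(payload.getBytes(StandardCharsets.UTF_8));
            }
            sink.appendBatch(batch);
            calls++;
        }
        return calls;
    }
}
```

Sending 250 payloads with a batch size of 100 would result in three calls carrying 100, 100 and 50 bodies.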

2. Failover Client

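This class wraps the default Avro RPC client to give clients failover handling. It takes a whitespace-separated list of host:port entries representing the Flume agents that make up a failover group. The Failover RPC Client currently does not support thrift. If there is a problem communicating with the currently selected host agent, the failover client automatically fails over to the next host in the group.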


    // Setup properties for the failover
    Properties props = new Properties();
    props.put("client.type", "default_failover");

    // List of hosts (space-separated list of user-chosen host aliases)
    props.put("hosts", "h2 h3 h4");

    // host/port pair for each host alias
    String host1 = "host1.example.org:41414";
    String host2 = "host2.example.org:41414";
    String host3 = "host3.example.org:41414";
    props.put("hosts.h2", host1);
    props.put("hosts.h3", host2);
    props.put("hosts.h4", host3);

    // create the client with failover properties
    RpcClient client = RpcClientFactory.getInstance(props);
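Writing the alias entries by hand becomes repetitive as the group grows. As a minimal sketch, the helper below builds the same kind of Properties object from a list of host:port strings. The class name FailoverProps, the method build, and the generated aliases h1, h2, ... are illustrative choices and not part of the Flume SDK; only java.util is used, so the result would still have to be passed to RpcClientFactory.getInstance(props) to create a client.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical helper, not part of the Flume SDK.
class FailoverProps {
    // Builds failover client properties for the given "host:port" addresses.
    // Aliases h1, h2, ... are generated in the order the addresses are given.
    static Properties build(List<String> addresses) {
        if (addresses.isEmpty()) {
            throw new IllegalArgumentException("at least one address is required");
        }
        Properties props = new Properties();
        props.setProperty("client.type", "default_failover");
        List<String> aliases = new ArrayList<>();
        for (int i = 0; i < addresses.size(); i++) {
            String alias = "h" + (i + 1);
            aliases.add(alias);
            // host/port pair for this alias
            props.setProperty("hosts." + alias, addresses.get(i));
        }
        // Space-separated list of aliases
        props.setProperty("hosts", String.join(" ", aliases));
        return props;
    }
}
```

Changing the "client.type" value to "default_loadbalance" would give the starting point for the load-balancing client described later in this article.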


    import org.apache.flume.Event;
    import org.apache.flume.EventDeliveryException;
    import org.apache.flume.api.RpcClient;
    import org.apache.flume.api.RpcClientFactory;
    import org.apache.flume.event.EventBuilder;
    import java.nio.charset.Charset;
    import java.util.Properties;

    public class Failover_Client {
      public static void main(String[] args) {
        MyRpcClientFacade2 client = new MyRpcClientFacade2();
        // Initialize the client with the failover properties
        client.init();

        // Send 10 events to the remote Flume agent. That agent should be
        // configured to listen with an AvroSource.
        String sampleData = "Hello Flume!";
        for (int i = 0; i < 10; i++) {
          client.sendDataToFlume(sampleData);
        }

        client.cleanUp();
      }
    }

    class MyRpcClientFacade2 {
      private RpcClient client;
      // Kept as a field so the client can be recreated with the same settings
      private Properties props;

      public void init() {
        // Setup properties for the failover
        props = new Properties();
        props.put("client.type", "default_failover");

        // List of hosts (space-separated list of user-chosen host aliases)
        props.put("hosts", "h2 h3 h4");

        // host/port pair for each host alias
        // (in this test: three ports on the same machine)
        String host1 = "";
        String host2 = "";
        String host3 = "";
        props.put("hosts.h2", host1);
        props.put("hosts.h3", host2);
        props.put("hosts.h4", host3);

        // Create the client with failover properties
        client = RpcClientFactory.getInstance(props);
      }

      public void sendDataToFlume(String data) {
        // Create a Flume Event object that encapsulates the sample data
        Event event = EventBuilder.withBody(data, Charset.forName("UTF-8"));

        // Send the event
        try {
          client.append(event);
        } catch (EventDeliveryException e) {
          // Clean up and recreate the failover client from the same properties.
          // The original listing called getDefaultInstance(hostname, port) here,
          // but hostname and port were never set in this class.
          client.close();
          client = null;
          client = RpcClientFactory.getInstance(props);
        }
      }

      public void cleanUp() {
        // Close the RPC connection
        client.close();
      }
    }

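The code configures three hosts for failover. To keep the setup simple, three ports on the same machine stand in for three hosts. The program again sends "Hello Flume!" 10 times, initially to the first Flume agent. When the first agent fails, events go to the second agent, and failover proceeds in that order.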
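The ordered failover described above can be modeled in a few lines. This is a simplified illustration of the behavior, not Flume's failover client: Sender and FailoverModel are hypothetical names, and a failed agent is represented here by any RuntimeException thrown from send.

```java
import java.util.List;

// Hypothetical sender for one agent; throws a RuntimeException when the agent is down.
interface Sender {
    void send(String data);
}

// Simplified model of ordered failover, not Flume's implementation.
class FailoverModel {
    private final List<Sender> agents;
    private int active = 0; // index of the agent currently in use

    FailoverModel(List<Sender> agents) {
        this.agents = agents;
    }

    // Tries the active agent first, then each following agent in order,
    // visiting every agent at most once. Returns the index that accepted the data.
    int send(String data) {
        RuntimeException last = null;
        for (int attempt = 0; attempt < agents.size(); attempt++) {
            int index = (active + attempt) % agents.size();
            try {
                agents.get(index).send(data);
                active = index; // keep using the agent that worked
                return index;
            } catch (RuntimeException e) {
                last = e; // remember the failure and move on to the next agent
            }
        }
        throw new IllegalStateException("all agents failed", last);
    }
}
```

With the first agent down and the second one up, every call in this model returns index 1, which mirrors what the test in this section shows when the agent on the first port is stopped.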


cp avro_client_case20.conf avro_client_case21.conf

cp avro_client_case20.conf avro_client_case22.conf


In the two copies, change the port: a1.sources.r1.port = 50001 in avro_client_case21.conf and a1.sources.r1.port = 50002 in avro_client_case22.conf.


flume-ng agent -c conf -f conf/avro_client_case20.conf -n a1 -Dflume.root.logger=INFO,console

flume-ng agent -c conf -f conf/avro_client_case21.conf -n a1 -Dflume.root.logger=INFO,console

flume-ng agent -c conf -f conf/avro_client_case22.conf -n a1 -Dflume.root.logger=INFO,console


Run the Java program Failover_Client.java in Eclipse. Alternatively, package it and run it on the server.




3. LoadBalancing RPC client

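The Flume Client SDK also supports an RpcClient that load-balances among multiple hosts. This type of client takes a whitespace-separated list of host:port entries, which together form a load-balancing group. The client can be configured with a load-balancing strategy that either picks one of the configured hosts at random or selects hosts in round-robin order. You can also write your own class that implements the LoadBalancingRpcClient$HostSelector interface so that a custom selection order is used. In that case, the custom class must be specified as the value of the host-selector property. The LoadBalancing RPC Client currently does not support thrift.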
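To make the two built-in strategies concrete, here is a simplified model of random and round-robin selection over a host list. It does not implement Flume's LoadBalancingRpcClient$HostSelector interface; HostPicker and its methods are hypothetical names used only for this sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

// Simplified model of host selection, not Flume's HostSelector.
class HostPicker {
    private final List<String> hosts;
    private int next = 0; // position of the next round-robin pick

    HostPicker(List<String> hosts) {
        if (hosts.isEmpty()) {
            throw new IllegalArgumentException("no hosts configured");
        }
        this.hosts = new ArrayList<>(hosts);
    }

    // Round robin: returns hosts in configured order, wrapping around at the end.
    String pickRoundRobin() {
        String host = hosts.get(next);
        next = (next + 1) % hosts.size();
        return host;
    }

    // Random: returns one of the configured hosts chosen by the given generator.
    String pickRandom(Random random) {
        return hosts.get(random.nextInt(hosts.size()));
    }
}
```

With the aliases h2, h3 and h4 used in this article, four consecutive round-robin picks in this model yield h2, h3, h4 and then h2 again, while random picks can repeat a host several times in a row.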




    // Setup properties for the load balancing
    Properties props = new Properties();
    props.put("client.type", "default_loadbalance");

    // List of hosts (space-separated list of user-chosen host aliases)
    props.put("hosts", "h2 h3 h4");

    // host/port pair for each host alias
    String host1 = "host1.example.org:41414";
    String host2 = "host2.example.org:41414";
    String host3 = "host3.example.org:41414";
    props.put("hosts.h2", host1);
    props.put("hosts.h3", host2);
    props.put("hosts.h4", host3);

    props.put("host-selector", "random"); // For random host selection
    // props.put("host-selector", "round_robin"); // For round-robin host
    //                                            // selection
    props.put("backoff", "true"); // Disabled by default.

    props.put("maxBackoff", "10000"); // Defaults 0, which effectively
                                      // becomes 30000 ms

    // Create the client with load balancing properties
    RpcClient client = RpcClientFactory.getInstance(props);


    import java.nio.charset.Charset;
    import org.apache.flume.Event;
    import org.apache.flume.EventDeliveryException;
    import org.apache.flume.api.RpcClient;
    import org.apache.flume.api.RpcClientFactory;
    import org.apache.flume.event.EventBuilder;
    import java.util.Properties;

    public class Load_Client {
      public static void main(String[] args) {
        MyRpcClientFacade3 client = new MyRpcClientFacade3();
        // Initialize the client with the load-balancing properties
        client.init();

        // Send 10 events to the remote Flume agents. Each agent should be
        // configured to listen with an AvroSource.
        String sampleData = "Flume Load_Client";
        for (int i = 0; i < 10; i++) {
          client.sendDataToFlume(sampleData);
        }

        client.cleanUp();
      }
    }

    class MyRpcClientFacade3 {
      private RpcClient client;
      // Kept as a field so the client can be recreated with the same settings
      private Properties props;

      public void init() {
        // Setup properties for the load balancing
        props = new Properties();
        props.put("client.type", "default_loadbalance");

        // List of hosts (space-separated list of user-chosen host aliases)
        props.put("hosts", "h2 h3 h4");

        // host/port pair for each host alias
        String host1 = "";
        String host2 = "";
        String host3 = "";
        props.put("hosts.h2", host1);
        props.put("hosts.h3", host2);
        props.put("hosts.h4", host3);

        props.put("host-selector", "random"); // For random host selection
        // props.put("host-selector", "round_robin"); // For round-robin host
        //                                            // selection
        props.put("backoff", "true"); // Disabled by default.

        props.put("maxBackoff", "10000"); // Defaults 0, which effectively
                                          // becomes 30000 ms

        // Create the client with load balancing properties
        client = RpcClientFactory.getInstance(props);
      }

      public void sendDataToFlume(String data) {
        // Create a Flume Event object that encapsulates the sample data
        Event event = EventBuilder.withBody(data, Charset.forName("UTF-8"));

        // Send the event
        try {
          client.append(event);
        } catch (EventDeliveryException e) {
          // Clean up and recreate the load-balancing client from the same properties.
          // The original listing called getDefaultInstance(hostname, port) here,
          // but hostname and port were never set in this class.
          client.close();
          client = null;
          client = RpcClientFactory.getInstance(props);
        }
      }

      public void cleanUp() {
        // Close the RPC connection
        client.close();
      }
    }

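Random load balancing is used here: props.put("host-selector","random"). The test reuses the three receiving-agent configurations from before, avro_client_case20.conf, avro_client_case21.conf and avro_client_case22.conf, and starts all three agents.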


flume-ng agent -c conf -f conf/avro_client_case20.conf -n a1 -Dflume.root.logger=INFO,console

flume-ng agent -c conf -f conf/avro_client_case21.conf -n a1 -Dflume.root.logger=INFO,console

flume-ng agent -c conf -f conf/avro_client_case22.conf -n a1 -Dflume.root.logger=INFO,console


Run the Java program Load_Client.java in Eclipse. Alternatively, package it and run it on the server.







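To try round-robin selection instead, comment out the random selector and enable round_robin:

    // props.put("host-selector", "random"); // For random host selection
    props.put("host-selector", "round_robin"); // For round-robin host selection

Then run the Java program again.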



