千家信息网

实战:Streaming data into HBase using Flume

发表于:2025-01-29 作者:千家信息网编辑
千家信息网最后更新 2025年01月29日,1. create Idea project for AsyncHbaseEventSerializer添加dependency 到pom.xmlorg.apache.flume.flume-ng-s
千家信息网最后更新 2025年01月29日实战:Streaming data into HBase using Flume

1. Create an IntelliJ IDEA project for the AsyncHbaseEventSerializer

添加dependency 到pom.xml

<dependency>
    <groupId>org.apache.flume.flume-ng-sinks</groupId>
    <artifactId>flume-ng-hbase-sink</artifactId>
    <version>1.6.0</version>
</dependency>

Implement AsyncHbaseEventSerializer according to your business requirements.

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.conf.ComponentConfiguration;
import org.apache.flume.sink.hbase.AsyncHbaseEventSerializer;
import org.hbase.async.AtomicIncrementRequest;
import org.hbase.async.PutRequest;

/**

* Created by root on 12/5/17.

*/

public class SplittingSerializer implements AsyncHbaseEventSerializer {

private byte[] table;

private byte[] colFam;

private Event currentEvent;

private byte[][]rentRowKey;

private final byte[] eventCountCol = "eventCount".getBytes();

columnNames;

private final List puts = new ArrayList();

private final List incs = new ArrayList();

private byte[] cur

public void initialize(byte[] table, byte[] cf) {

this.table = table;

this.colFam = cf;

//Can not get the columns from context in configure method. Had to hard coded here.

columnNames = new byte[3][];

columnNames[0] = "name".getBytes();

columnNames[1] = "id".getBytes();

columnNames[2] = "phone".getBytes();

}

public void setEvent(Event event) {

// Set the event and verify that the rowKey is not present

this.currentEvent = event;

/*

//Don't know how to set the key of event header.

String rowKeyStr = currentEvent.getHeaders().get("rowKey");

if (rowKeyStr == null) {

throw new FlumeException("No row key found in headers!");

}

currentRowKey = rowKeyStr.getBytes();*/

}

public List getActions() {

// Split the event body and get the values for the columns

String eventStr = new String(currentEvent.getBody());

String[] cols = eventStr.split(",");

Long currTime = System.currentTimeMillis();

long revTs = Long.MAX_VALUE - currTime;

currentRowKey = (Long.toString(revTs) + cols[0]).getBytes();

puts.clear();

for (int i = 0; i < cols.length; i++) {

//Generate a PutRequest for each column.

PutRequest req = new PutRequest(table, currentRowKey, colFam,

columnNames[i], cols[i].getBytes());

puts.add(req);

}

return puts;

}

public List getIncrements() {

incs.clear();

//Increment the number of events received

incs.add(new AtomicIncrementRequest(table, "totalEvents".getBytes(), colFam, eventCountCol));

return incs;

}

public void cleanUp() {

table = null;

colFam = null;

currentEvent = null;

columnNames = null;

currentRowKey = null;

}

public void configure(Context context) {

//Get the column names from the configuration

//Did not work. Don't know how to use it.

String cols = new String(context.getString("columns"));

String[] names = cols.split(",");

byte[][] columnNames = new byte[names.length][];

int i = 0;

System.out.println("getting columnNames");

for(String name : names) {

columnNames[i++] = name.getBytes();

}

}

public void configure(ComponentConfiguration componentConfiguration) {

}

}

build and deploy the jar file

build --> build artifacts

copy to the lib directory of flume. Here I use scp to upload to the flume of another host.

2. configure flume

a1.sources = r1

a1.channels = c1 c2

a1.sinks = k1 sink2

a1.sources.r1.selector.type = replicating

#NetCat TCP source

a1.sources.r1.type = netcat

a1.sources.r1.bind = 0.0.0.0

a1.sources.r1.port = 6666

a1.sources.r1.channels = c1 c2

#channel

a1.channels.c2.type = memory

a1.channels.c2.capacity = 10000

a1.channels.c2.transactionCapacity = 1000

#HBase sink

a1.sinks.sink2.type = org.apache.flume.sink.hbase.AsyncHBaseSink

a1.sinks.sink2.channel = c2

a1.sinks.sink2.table = law

a1.sinks.sink2.columnFamily = lawfile

a1.sinks.sink2.batchSize = 5000

#The serializer to use

a1.sinks.sink2.serializer = ifre.flume.hbase.SplittingSerializer

#List of columns each event writes to.

a1.sinks.sink2.serializer.columns = name,id,phone

3. create hbase table

# hbase shell

create 'law', 'lawfile'

4. run flume agent

[root@ifrebigsearch2 apache-flume-1.6.0-bin]# bin/flume-ng agent --conf conf --conf-file conf/crawler-hdfs-conf.properties --name a1 -Dflume.root.logger=INFO,console

5. run nc

[root@ifrebigsearch0 dkh]# nc ifrebigsearch2 6666

zhangsan,10110198806054561,13812345678

OK

6. result

hbase(main):002:0> scan 'law'

ROW COLUMN+CELL

9223370524386395508z column=lawfile:id, timestamp=1512468380362, value=10110198

hangsan 806054561

9223370524386395508z column=lawfile:name, timestamp=1512468380361, value=zhangs

hangsan an

9223370524386395508z column=lawfile:phone, timestamp=1512468380363, value=13812

hangsan 345678


0