When you try to put millions or even billions of key-values into HBase from your MR job, you will notice that even TableOutputFormat is not efficient enough.
In such cases you can use HBase's bulk load feature, which is tremendously faster than TableOutputFormat.
The bulk load feature uses a MapReduce job to output table data in HBase's internal data format, and then directly loads the generated StoreFiles into a running cluster.
The process consists of two main steps:
- Preparing data via a MapReduce job
To achieve this we need to change the OutputFormat class of our MR job to HFileOutputFormat, which writes out data in HBase's internal storage format.
The following are the main changes that you have to make in your MR job:
.....
// The mapper emits the row key and a Put for each input record
mapRedJob.setMapOutputKeyClass(ImmutableBytesWritable.class);
mapRedJob.setMapOutputValueClass(Put.class);
mapRedJob.setInputFormatClass(TextInputFormat.class);
// Write HFiles instead of sending Puts through the normal HBase write path
mapRedJob.setOutputFormatClass(HFileOutputFormat.class);
.....
// HBase configuration
Configuration hConf = HBaseConfiguration.create(hadoopConf);
hConf.set("hbase.zookeeper.quorum", zookeeper);
hConf.set("hbase.zookeeper.property.clientPort", port);
HTable hTable = new HTable(hConf, tableName);
// Configures the reducer, partitioner and compression to match the target table's regions
HFileOutputFormat.configureIncrementalLoad(mapRedJob, hTable);
.....
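The elided parts above are the usual MapReduce driver wiring. Here is a minimal sketch of how the pieces could fit together; the names BulkLoadDriver, BulkLoadMapper, inputPath and hfileOutPutPath are just placeholders for this example, not part of any API:
.....
Job mapRedJob = new Job(hadoopConf, "hbase-bulk-load");
mapRedJob.setJarByClass(BulkLoadDriver.class);
mapRedJob.setMapperClass(BulkLoadMapper.class);
// ... key/value classes and input/output formats as shown above ...
// Text files to read, and the HDFS directory where the HFiles will be written
FileInputFormat.addInputPath(mapRedJob, new Path(inputPath));
FileOutputFormat.setOutputPath(mapRedJob, new Path(hfileOutPutPath));
.....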
A test map method would look like the following:
.....
@Override
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
    // Use the input line both as the row key and as the cell value
    byte[] rowKey = Bytes.toBytes(value.toString());
    Put row = new Put(rowKey);
    row.add(Bytes.toBytes("CF"), Bytes.toBytes("C"), Bytes.toBytes(value.toString()));
    context.write(new ImmutableBytesWritable(rowKey), row);
}
.....
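For completeness, this map method sits inside a Mapper whose type parameters match the map output key/value classes set on the job. A minimal sketch (the class name BulkLoadMapper is only an example):
.....
public static class BulkLoadMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {
    // the map(...) method shown above goes here
}
.....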
- Loading the Data into the HBase Table
Once the HFiles are generated, you can load them into the table with HBase's completebulkload tool. The command format is as follows:
$ hadoop jar hbase-VERSION.jar completebulkload [-c /path/to/hbase/config/hbase-site.xml] /user/todd/myoutput mytable
You can also load these files from your MR job programmatically by using the following code:
LoadIncrementalHFiles lihf = new LoadIncrementalHFiles(hConf);
// Moves the HFiles under hfileOutPutPath into the regions of the target table
lihf.doBulkLoad(new Path(hfileOutPutPath), hTable);
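If you drive both steps from the same program, the ordering could look like the sketch below, reusing the mapRedJob, hConf, hTable and hfileOutPutPath variables from the snippets above; the bulk load only makes sense once the job has actually written the HFiles:
.....
if (mapRedJob.waitForCompletion(true)) {
    // Only load the HFiles if the MR job finished successfully
    LoadIncrementalHFiles lihf = new LoadIncrementalHFiles(hConf);
    lihf.doBulkLoad(new Path(hfileOutPutPath), hTable);
}
hTable.close();
.....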
Try it and feel the performance improvement.