When you are trying to put millions or even billions of key-values into HBase from your MR job, you will notice that even TableOutputFormat is not efficient enough.
In such cases you can use HBase's bulk load feature, which is tremendously faster than TableOutputFormat.
The bulk load feature uses a MapReduce job to output table data in HBase's internal data format, and then directly loads the generated StoreFiles into a running cluster.
The process consists of two main steps:
- Preparing data via a MapReduce job
To achieve this, we need to change the OutputFormat class of our MR job to HFileOutputFormat, which writes out data in HBase's internal storage format.
The following are the main changes that you have to make in your MR job:
.....
// The map output must be the row key (ImmutableBytesWritable) and a Put
mapRedJob.setMapOutputKeyClass(ImmutableBytesWritable.class);
mapRedJob.setMapOutputValueClass(Put.class);
mapRedJob.setInputFormatClass(TextInputFormat.class);
mapRedJob.setOutputFormatClass(HFileOutputFormat.class);
.....
// HBase configuration
Configuration hConf = HBaseConfiguration.create(hadoopConf);
hConf.set("hbase.zookeeper.quorum", zookeeper);
hConf.set("hbase.zookeeper.property.clientPort", port);
HTable hTable = new HTable(hConf, tableName);
// configureIncrementalLoad wires in the sort reducer and the total-order
// partitioner so that the generated HFiles match the table's region boundaries
HFileOutputFormat.configureIncrementalLoad(mapRedJob, hTable);
.....
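To make the wiring clearer, here is a minimal driver sketch that puts the above snippets together. Note that the class name BulkLoadDriver, the mapper class BulkLoadMapper (holding the map method shown next), the table name and the args[] paths are only assumptions for illustration; the original fragments above are all the post shows, and this targets the same older HTable/HFileOutputFormat API.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

public class BulkLoadDriver {

    public static void main(String[] args) throws Exception {
        // args[0] = input directory on HDFS, args[1] = output directory for the HFiles
        Configuration hConf = HBaseConfiguration.create(new Configuration());
        hConf.set("hbase.zookeeper.quorum", "zkhost1,zkhost2");   // assumption: your ZooKeeper quorum
        hConf.set("hbase.zookeeper.property.clientPort", "2181"); // assumption: default client port

        Job mapRedJob = new Job(hConf, "hbase-bulk-load");
        mapRedJob.setJarByClass(BulkLoadDriver.class);
        mapRedJob.setMapperClass(BulkLoadMapper.class);           // assumption: a Mapper holding the map method shown below
        mapRedJob.setMapOutputKeyClass(ImmutableBytesWritable.class);
        mapRedJob.setMapOutputValueClass(Put.class);
        mapRedJob.setInputFormatClass(TextInputFormat.class);
        mapRedJob.setOutputFormatClass(HFileOutputFormat.class);

        // Input text files and the directory where the HFiles will be written
        FileInputFormat.addInputPath(mapRedJob, new Path(args[0]));
        HFileOutputFormat.setOutputPath(mapRedJob, new Path(args[1]));

        // Configure the job against the target table's region layout
        HTable hTable = new HTable(hConf, "myTable");             // assumption: table name
        HFileOutputFormat.configureIncrementalLoad(mapRedJob, hTable);

        System.exit(mapRedJob.waitForCompletion(true) ? 0 : 1);
    }
}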
A test map method would look like the following:
.....
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
    // Use the input line as both the row key and the cell value
    Put row = new Put(Bytes.toBytes(value.toString()));
    row.add(Bytes.toBytes("CF"), Bytes.toBytes("C"), Bytes.toBytes(value.toString()));
    // The map output key must be the row key wrapped in an ImmutableBytesWritable
    context.write(new ImmutableBytesWritable(Bytes.toBytes(value.toString())), row);
}
.....
- Loading the Data into the HBase Table
Once the HFiles are generated, they can be imported into the table using the completebulkload tool. The format is as follows:
$ hadoop jar hbase-VERSION.jar completebulkload [-c /path/to/hbase/config/hbase-site.xml] /user/todd/myoutput mytable
You can also load these files from your MR job programmatically by using the following code:
LoadIncrementalHFiles lihf = new LoadIncrementalHFiles(hConf);
lihf.doBulkLoad(new Path(hfileOutPutPath), hTable);
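This does the same import that the completebulkload tool performs; as an assumption (the post does not show the surrounding driver code), the call would typically be made after mapRedJob.waitForCompletion(true) returns successfully, with hfileOutPutPath pointing to the job's output directory.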
Try it and feel the performance improvement.
Hey, it's a nice one :) Can you please help me clear the following error? I am getting this when running a MapReduce job for inserting data into HBase tables from HDFS files using HFileOutputFormat.class. Earlier I ran the same job using MultiTableOutputFormat.class and it was working fine, but it was taking so much time while inserting data into the HBase tables.
job.setMapOutputKeyClass(ImmutableBytesWritable.class);
job.setMapOutputValueClass(Put.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(HFileOutputFormat.class);
job.setMapperClass(Map.class);
HTable hTable = new HTable(conf,"ARK_3000");
HFileOutputFormat.configureIncrementalLoad(job, hTable);
ERROR security.UserGroupInformation: PriviledgedActionException as:reddym (auth:SIMPLE) cause:org.apache.hadoop.mapred.InvalidJobConfException: Output directory not set.
Exception in thread "main" org.apache.hadoop.mapred.InvalidJobConfException: Output directory not set.
Thanks Madhusudhana Reddy
Hi Madhu,
I think you haven't configured the output path for your MapReduce job.
Could you please test the same after adding the following configuration:
FileInputFormat.addInputPath(job, new Path(args[0]));
HFileOutputFormat.setOutputPath(job, new Path(args[1]));
If you have already added these, please check whether any permission issue exists on the specified location.
Thanks & Regards,
Krishnaprasad