Monday, 17 September 2012

Fastest HBase Write using HBase - Bulk Load

When you need to put millions or even billions of key-values into HBase from an MR job, even TableOutputFormat turns out to be inefficient.
In such cases you can use HBase's bulk load feature, which is far faster than TableOutputFormat.

The bulk load feature uses a MapReduce job to output table data in HBase's internal data format, and then directly loads the generated StoreFiles into a running cluster.
The process consists of 2 main steps.

  • Preparing data via a MapReduce job
Data here refers to the HBase data files (StoreFiles).
To produce them, change the OutputFormat class of the MR job to HFileOutputFormat, which writes data in HBase's internal storage format.

The following are the main changes that you have to make in your MR job:
.....
        // The mapper emits (row key, Put) pairs
        mapRedJob.setMapOutputKeyClass(ImmutableBytesWritable.class);
        mapRedJob.setMapOutputValueClass(Put.class);

        mapRedJob.setInputFormatClass(TextInputFormat.class);
        // Write HFiles instead of sending Puts to the region servers
        mapRedJob.setOutputFormatClass(HFileOutputFormat.class);
.....
        // HBase configuration
        Configuration hConf = HBaseConfiguration.create(hadoopConf);
        hConf.set("hbase.zookeeper.quorum", zookeeper);
        hConf.set("hbase.zookeeper.property.clientPort", port);
        HTable hTable = new HTable(hConf, tableName);
        // Sets up the partitioner and reducer so the output matches the table's current regions
        HFileOutputFormat.configureIncrementalLoad(mapRedJob, hTable);
.....

A test map method would look like the following:
.....
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // Use the input line as both the row key and the cell value
            byte[] rowKey = Bytes.toBytes(value.toString());
            Put row = new Put(rowKey);
            row.add(Bytes.toBytes("CF"), Bytes.toBytes("C"), rowKey);
            // Let InterruptedException propagate instead of swallowing it; the method already declares it
            context.write(new ImmutableBytesWritable(rowKey), row);
        }
.....

  • Loading the Data into the HBase Table
Data can be loaded into the cluster using the command line tool 'completebulkload'.
The format is as follows:

$ hadoop jar hbase-VERSION.jar completebulkload [-c /path/to/hbase/config/hbase-site.xml] /user/todd/myoutput mytable

Here /user/todd/myoutput is the output directory of the MR job and mytable is the target table.

You can also load these files programmatically from your MR driver, once the job has completed, by using the following code:

LoadIncrementalHFiles lihf = new LoadIncrementalHFiles(hConf);
lihf.doBulkLoad(new Path(hfileOutPutPath), hTable);


Try it and feel the performance improvement.

Wednesday, 23 May 2012

Throttling Algorithm

A simple throttling algorithm using DelayQueue, which enables you to maintain a network throttle even with multiple network channels (connectors).

I have multiple connectors with different throttles, so the algorithm should respect the throttle of each connector while sending messages at the maximum possible throughput.

Algorithm,
Put every active connector into a DelayQueue on application startup with a calculated 'delay' (e.g. delay = 1000/throttle for the millisecond TimeUnit, where throttle is the number of messages allowed per second). When you receive a packet, take a connector from the queue (take is a blocking call that returns only once the connector's delay has elapsed), send the packet using that connector, and put the connector back into the queue with an updated insert time for further use. The queue maintains the delay for each connector separately, and hence its throttle.
Implementation,
I'm using 2 classes for the algorithm,
  1. AbstractDelayedWrapper (make a sub class of this for your connector).
  2. DelayedPool (the Pool class in the code below; contains a DelayQueue of elements T extends AbstractDelayedWrapper).

 --------------------------------------------------------------------------------------------------------------------------------

AbstractDelayedWrapper,

package xxxx.delay;

import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

/**
 *
 * @author krishnaprasad
 *
 */
public abstract class AbstractDelayedWrapper implements Delayed {

    private long queueInsertTime;

    protected long delay;

    private TimeUnit timeUnit;

    public AbstractDelayedWrapper(TimeUnit timeUnit) {
        super();
        this.timeUnit = timeUnit;
        this.queueInsertTime = getCurrentTime();
    }

    /**
     * Returns the current time, expressed in the TimeUnit passed to the constructor.
     *
     * @return the current time in this wrapper's TimeUnit
     */
    public abstract long getCurrentTime();

    public void updateTime() {
        this.queueInsertTime = getCurrentTime();
    }

    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert((queueInsertTime - getCurrentTime()) + delay, timeUnit);
    }

    @Override
    public int compareTo(Delayed obj) {
        // Order by remaining delay so the queue head is the element that becomes available first
        return Long.compare(this.getDelay(timeUnit), obj.getDelay(timeUnit));
    }
}
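
The remaining-delay arithmetic in getDelay() can be checked in isolation. The following is a minimal sketch, not part of the original classes: RemainingDelaySketch and its remaining method are hypothetical names, and the clock value is passed in as a parameter instead of being read from getCurrentTime(). A result of zero or less means the element has expired and DelayQueue.take() may return it.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical helper that mirrors the expression used in getDelay().
class RemainingDelaySketch {

    // insertTime, now and delay are expressed in 'source'; the result is converted to 'target'.
    static long remaining(long insertTime, long now, long delay, TimeUnit source, TimeUnit target) {
        return target.convert((insertTime - now) + delay, source);
    }

    public static void main(String[] args) {
        // Inserted at t = 1000 ms with a 10 ms delay (throttle of 100 per second).
        // 4 ms later: 6 ms still to wait.
        System.out.println(remaining(1000, 1004, 10, TimeUnit.MILLISECONDS, TimeUnit.MILLISECONDS));
        // 15 ms later: -5, the element expired 5 ms ago.
        System.out.println(remaining(1000, 1015, 10, TimeUnit.MILLISECONDS, TimeUnit.MILLISECONDS));
        // Same as the first case, but reported in nanoseconds: 6000000.
        System.out.println(remaining(1000, 1004, 10, TimeUnit.MILLISECONDS, TimeUnit.NANOSECONDS));
    }
}
```

Because only the difference between the insert time and the current time is used, any monotonically increasing clock works as long as it is expressed in the wrapper's TimeUnit.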

--------------------------------------------------------------------------------------------------------------------------------
Connector,
You can create your own connectors by extending the AbstractDelayedWrapper class. Set the TimeUnit as well as the delay (calculated as above), then implement the getCurrentTime() method for your TimeUnit (return System.currentTimeMillis() for the millisecond TimeUnit; use nanoseconds for large throttles, since with milliseconds any throttle above 1000 gives a delay of 0).

package xxxx.delay;

import java.util.concurrent.TimeUnit;

/**
 * Network writer
 *
 * @author krishnaprasad
 *
 */
public class Connector extends AbstractDelayedWrapper {

    public Connector(int throttle) {
        super(TimeUnit.MILLISECONDS);// use nanoseconds for more throughput
        delay = 1000 / throttle;// this can be improved for decimal places
    }

    @Override
    public long getCurrentTime() {
        return System.currentTimeMillis();
    }

    public boolean send(Object obj) {
        // TODO: implement the packet writing logic
        System.out.println("Packet sent...");
        return true;
    }
}
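
As the comments in the Connector constructor hint, the millisecond delay loses precision: 1000 / throttle is integer division, so a throttle of 300 gives 3 ms instead of 3.33 ms, and any throttle above 1000 gives 0. A minimal sketch of the nanosecond-based calculation follows. NanoDelaySketch and its method names are hypothetical; a connector using this delay would presumably pass TimeUnit.NANOSECONDS to the superclass and return System.nanoTime() from getCurrentTime().

```java
import java.util.concurrent.TimeUnit;

// Hypothetical comparison of the two delay calculations.
class NanoDelaySketch {

    // Delay as computed in Connector: whole milliseconds, truncated.
    static long delayMillis(int throttle) {
        return 1000 / throttle;
    }

    // Delay in nanoseconds: still integer division, but the truncation error is below 1 ns.
    static long delayNanos(int throttle) {
        return TimeUnit.SECONDS.toNanos(1) / throttle;
    }

    public static void main(String[] args) {
        int[] throttles = {100, 300, 2000};
        for (int throttle : throttles) {
            System.out.println("throttle=" + throttle
                    + " millis=" + delayMillis(throttle)
                    + " nanos=" + delayNanos(throttle));
        }
    }
}
```

For a throttle of 2000 the millisecond version yields 0 (no throttling at all), while the nanosecond version yields 500000 ns.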


--------------------------------------------------------------------------------------------------------------------------------
DelayedPool,

package xxxx.delay;

import java.util.concurrent.DelayQueue;

/**
 * Pool of delayed elements backed by a DelayQueue
 *
 * @author krishnaprasad
 *
 */
public class Pool<T extends AbstractDelayedWrapper> {

    public final DelayQueue<T> pool = new DelayQueue<T>();

    /**
     * Retrieves an available element of type T, waiting if necessary
     *
     * @return
     * @throws InterruptedException
     */
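    public T get() throws InterruptedException {
        // Blocks until the delay of the head element has expired
        T element = pool.take();
        // Re-queue immediately with a fresh insert time so the next get() waits a full delay
        put(element);
        return element;
    }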

    public void put(T element) {
        element.updateTime();
        if (pool.contains(element)) {
            pool.remove(element);
        }
        pool.put(element);
    }

    public boolean remove(T element) {
        return pool.remove(element);
    }

    public int size() {
        return pool.size();
    }

    @Override
    public String toString() {
        return "DelayedPool [pool=" + pool + "]";
    }

}

--------------------------------------------------------------------------------------------------------------------------------
Main,
In the main class I create one Connector with throttle 100 and put it into the pool. We can then use the pool.get() method to obtain the Connector for sending packets, which in turn enforces the throttle. In the same way we can use multiple connectors with different throttles.

package xxxx.delay;

import xxxx.delay.Pool; // optional: Pool lives in the same package

public class Main {

    private static Pool<Connector> pool = new Pool<Connector>();

    public static void main(String[] args) throws InterruptedException {
        int throttle = 100;
        Connector c = new Connector(throttle);
        pool.put(c);
        Connector c1;
        long time = System.currentTimeMillis();
        for (int i = 0; i < 10; i++) {
            c1 = pool.get();
            c1.send(null);
            System.out.println("Throttle:" + (System.currentTimeMillis() - time));
            time = System.currentTimeMillis();
        }
    }
}
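
The multi-connector case mentioned above can be sketched as follows. This is a self-contained illustration under stated assumptions, not the author's code: it does not reuse the classes above, the names MultiConnectorSketch, Conn, touch and run are hypothetical, and it uses System.nanoTime() with nanosecond delays. Two connectors with throttles of 100 and 50 packets per second share one DelayQueue, and each take() returns whichever connector is allowed to send next.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

// Hypothetical, self-contained sketch of several throttled connectors in one queue.
class MultiConnectorSketch {

    // A connector that becomes available again 'delayNanos' after its last use.
    static class Conn implements Delayed {
        final String name;
        final long delayNanos;
        long readyAt;

        Conn(String name, int throttlePerSecond) {
            this.name = name;
            this.delayNanos = TimeUnit.SECONDS.toNanos(1) / throttlePerSecond;
            touch();
        }

        // Restart the delay; only called while the connector is outside the queue.
        void touch() {
            readyAt = System.nanoTime() + delayNanos;
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(readyAt - System.nanoTime(), TimeUnit.NANOSECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
        }
    }

    // Sends up to 'packets' packets and counts how many each connector carried.
    static Map<String, Integer> run(int packets) {
        DelayQueue<Conn> queue = new DelayQueue<>();
        queue.put(new Conn("fast", 100)); // 100 packets per second
        queue.put(new Conn("slow", 50));  // 50 packets per second
        Map<String, Integer> sent = new LinkedHashMap<>();
        for (int i = 0; i < packets; i++) {
            Conn c;
            try {
                c = queue.take(); // blocks until some connector is allowed to send
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt and stop early
                break;
            }
            sent.merge(c.name, 1, Integer::sum); // stands in for the actual send
            c.touch();                           // restart this connector's delay
            queue.put(c);                        // make it available for later packets
        }
        return sent;
    }

    public static void main(String[] args) {
        System.out.println(run(12));
    }
}
```

On an otherwise idle machine the fast connector should carry roughly twice as many packets as the slow one, although the exact split depends on thread scheduling.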