Wednesday, 23 May 2012

Throttling Algorithm

A simple Throttling algorithm using DelayQueue, which enable you to maintain the network throttle even with multiple network channels (connectors).

I have multiple connectors with different throttle, so the algorithm should maintain the throttle of each connector and should send the message with a maximum and optimized throughput.

Algorithm,
Put each and every active connectors into a DelayQueue on application startup with a calculated 'delay' (eg: delay = 1000/throttle, for millisecond TimeUnit). When you receive a packet,  take a connector from the queue (take is a blocking call) which will keep the fixed delay and send using that connector, also put the connector into the queue with an updated insert time for further use. The queue will maintain the delay for each connector separately and hence the throttle.
Implementation,
I'm using 2 classes for the algorithm,
  1. AbstractDelayedWrapper (make a sub class of this for your connector).
  2. DelayedPool (contains a DelayQueue of elements T extends AbstractDelayedWrapper).

 --------------------------------------------------------------------------------------------------------------------------------

AbstractDelayedWrapper,

package xxxx.delay;

import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

/**
 *
 * @author krishnaprasad
 *
 */
public abstract class AbstractDelayedWrapper implements Delayed {

    private long queueInsertTime;

    protected long delay;

    private TimeUnit timeUnit;

    public AbstractDelayedWrapper(TimeUnit timeUnit) {
        super();
        this.timeUnit = timeUnit;
        this.queueInsertTime = getCurrentTime();
    }

    /**
     * Should return in the specified TimeUnit
     *
     * @return
     */
    public abstract long getCurrentTime();

    public void updateTime() {
        this.queueInsertTime = getCurrentTime();
    }

    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert((queueInsertTime - getCurrentTime()) + delay, timeUnit);
    }

    @Override
    public int compareTo(Delayed obj) {
        int val = 0;
        Delayed newObj = obj;
        long newDelay = newObj.getDelay(timeUnit);
        long thisDelay = this.getDelay(timeUnit);
        if (thisDelay < newDelay) {
            val = -1;
        } else if (thisDelay > newDelay) {
            val = 1;
        }
        return val;
    }
}

--------------------------------------------------------------------------------------------------------------------------------
Connector,
You can create your own Connectors by extending the AbstractDelayedWrapper class, here you need to set the TimeUnit as well as the delay (as calculated above). Then implement the getCurrentTime() method with respect to your TimeUnit(return System.currentTimeMillis() for Millisecond TimeUnit, should use Nanoseconds for large throttles).

package xxxx.delay;

import java.util.concurrent.TimeUnit;

/**
 * Network writer
 *
 * @author krishnaprasad
 *
 */
public class Connector extends AbstractDelayedWrapper {

    public Connector(int throttle) {
        super(TimeUnit.MILLISECONDS);// use nanoseconds for more throughput
        delay = 1000 / throttle;// this can be improved for decimal places
    }

    @Override
    public long getCurrentTime() {
        return System.currentTimeMillis();
    }

    public boolean send(Object obj) {
        // To-do implement the packet writing logic
        System.out.println("Packet send...");
        return true;
    }
}


--------------------------------------------------------------------------------------------------------------------------------
DelayedPool,

package xxxx.delay;

import java.util.concurrent.DelayQueue;

/**
 * Delay Q
 *
 * @author krishnaprasad
 *
 */
public class Pool<T extends AbstractDelayedWrapper> {

    public final DelayQueue<T> pool = new DelayQueue<T>();

    /**
     * Retrieves an available element of type T, waiting if necessary
     *
     * @return
     * @throws InterruptedException
     */
    public T get() throws InterruptedException {
        T element = null;
        element = pool.take();
        put(element);
        return element;
    }

    public void put(T element) {
        element.updateTime();
        if (pool.contains(element)) {
            pool.remove(element);
        }
        pool.put(element);
    }

    public boolean remove(T element) {
        return pool.remove(element);
    }

    public int size() {
        return pool.size();
    }

    @Override
    public String toString() {
        return "DelayedPool [pool=" + pool + "]";
    }

}

--------------------------------------------------------------------------------------------------------------------------------
Main,
In the main class I have created one Connector with throttle 100, Then put the same into the DelayQueue. Now we can use the pool.get() method for getting the  Connector to send packets, which in turn will handle the throttle. In the same way we can use multiple connectors with different throttles.

package xxxx.delay;

import xxx.delay.Pool;

public class Main {

    private static Pool<Connector> pool = new Pool<Connector>();

    public static void main(String[] args) throws InterruptedException {
        int throttle = 100;
        Connector c = new Connector(throttle);
        pool.put(c);
        Connector c1;
        long time = System.currentTimeMillis();
        for (int i = 0; i < 10; i++) {
            c1 = pool.get();
            c1.send(null);
            System.out.println("Throttle:" + (System.currentTimeMillis() - time));
            time = System.currentTimeMillis();
        }
    }
}