Source code: AccountingSender.java

index |  552 lines | javadoc ]

package nl.west.aaa;

import java.util.*;
import java.io.*;

/**
 * Class to send accounting data to an accounting server.
 * Accounting data is buffered until a reply is received.
 * Event-driven, polling and event-driven polling
 * models are supported (see constructor).
 */ 
public class AccountingSender
{


    // MODEL VALUES
    

    /**
     * Used for event-driven delivery model (no batching).
     * When this value is passed to the constructor, an event-driven
     * message delivery model is used. No batching will occur.
     */
    public static final int EVENT_DRIVEN          = 0;
    
    /**
     * Used for event-driven batching delivery model.
     * When this value is passed to the constructor, an event-driven
     * message delivery model with batching is used.
     */
    public static final int EVENT_DRIVEN_BATCHING = 1;
    
    /**
     * Used for polling delivery model.
     * When this value is passed to the contructor, a
     * polling message delivery model is asumed.
     */
    public static final int POLLING               = 2;
    
    /**
     * Used for event-driven-polling.
     * When this value is passed to the constructor, an
     * event-driven polling message delivery model is used.
     */
    public static final int EVENT_DRIVEN_POLLING  = 3;
    
    /**
     * The inner thread wakes up al least every second
     * to check if the parameters have changed.
     */
    static final int MINIMUM_THREAD_WAKEUP = 100;


    // TUNING PARAMETERS


    /**
     * Maximum number of buffered messages. If the buffer
     * overflows, all buffered messages are sent.
     * 0 indicates immediate send.
     */
    private int maxBufferedSize=0;
    
    /**
     * The timeout value after wich all buffered accounting data
     * is sent. Timeout is in milliseconds. If this is zero
     * no timeout will occur.
     */
    private long maxBufferedTime;

    /**
     * If an incoming message is received and this is set to
     * true and the buffer is not full an indication to the accounting
     * server is sent.
     */
    private boolean sendIndication=false;
    
    /**
     * The timeout value that is used when assuming sent
     * request messages are lost.
     */
    private long requestTimeout;
    
    /**
     * The timeout value that is used when resending indication
     * requests. Timeout is in milliseconds.
     */
    private long indicationTimeout;
    
    
    // STATE
    
    
    /**
     * The AAAUnit that is used for sending and receiving
     * messages.
     */
    private AAAUnit unit;

    /**
     * The time at wich the last indication message was sent.
     */
    private long lastIndicationSent;

    /**
     * The currently selected Receiver from the list
     * of registered receivers.
     */
    private Identifier currentReceiver;
    
    /**
     * The current list of registered accounting-servers.
     */
    private Vector receivers;
    
    /**
     * The time the first data was entered to the buffer.
     * Set to zero when buffer is empty.
     */
    private long firstMessageBuffered=0;


    // BUFFERS


    /**
     * The set of messages waiting for a valid reply.
     */
    private Set awaitingReply=new HashSet();
    
    /**
     * The set of messages waiting to be sent.
     */
    private Vector awaitingSend=new Vector();
    
    /**
     * Create a new AccountingSender and connect
     * it to the given AAAUnit. The model can be one
     * of <code>EVENT_DRIVEN</code>, 
     * <code>EVENT_DRIVEN_BATCHING</code>
     * <code>POLLING</code>
     * or <code>EVENT_DRIVEN_POLLING</code>.
     * @see AccountingSender#EVENT_DRIVEN
     * @see AccountingSender#EVENT_DRIVEN_BATCHING
     * @see AccountingSender#POLLING
     * @see AccountingSender#EVENT_DRIVEN_POLLING
     */
    public AccountingSender(int model,AAAUnit unit)
    {
        this.unit=unit;
        switch(model)
        {
        case EVENT_DRIVEN:
            maxBufferedSize=0;       // no buffering
            maxBufferedTime=0;       // no buffering
            sendIndication=false;
            break;
        case EVENT_DRIVEN_BATCHING:
            maxBufferedSize=100;     // buffer max 100 messages
            maxBufferedTime=1000;    // buffer for 1 second max
            sendIndication=false;
            break;
        case POLLING:
            maxBufferedSize=1000000; // buffer infinitly
            maxBufferedTime=1000000; // buffer infinitly
            sendIndication=false;
            break;
        case EVENT_DRIVEN_POLLING:
            maxBufferedSize=1000000; // buffer infinitly
            maxBufferedTime=1000000; // buffer infinitly
            sendIndication=true;
            break;
        default:
            throw new IllegalArgumentException("Illegal mode");
        }
        requestTimeout=5000;       // 5 sec
        indicationTimeout=3000;    // wait 3 secs. before indication retry
        receivers=new Vector();
        unit.addMessageHandler(new Handler());
        new Poller().start();
    }
    
    /**
     * Send the given message to the currently selected
     * accounting-server. If the message is an accounting request
     * it is added to the buffer of messages awaiting a reply message.
     */
    private synchronized void sendMessage(Message msg)
    {
        long now=System.currentTimeMillis();
        try
        {
            if(msg.getMessageType()==Message.ACCOUNTING_REQUEST)
            {
                awaitingReply.add(new MessageContainer(msg,now));
            }
            else
                lastIndicationSent=now;
            unit.sendMessage(msg,currentReceiver);
        }
        catch (IOException e)
        {
            nextReceiver();
        }
    }
    
    private synchronized void resendMessage(MessageContainer mc)
    {
        long now=System.currentTimeMillis();
        try
        {
            unit.sendMessage(mc.msg,currentReceiver);
            mc.lastTimeSent=now;
        }
        catch (IOException e)
        {
            nextReceiver();
        }
    }
    
    /**
     * Make another server the default server.
     * Selects the next accounting-server from
     * the list of registered servers.
     */
    private synchronized void nextReceiver()
    {
        if(receivers.size()==0)
        {
            currentReceiver=null;
        }
        else if(currentReceiver==null)
        {
            currentReceiver=(Identifier)receivers.elementAt(0);
        }
        else
        {
            int i=receivers.indexOf(currentReceiver)+1;
            if(i>=receivers.size())
                currentReceiver=(Identifier)receivers.elementAt(0);
            else
                currentReceiver=(Identifier)receivers.elementAt(i);
        }
    }

    /**
     * Add the given message to the buffer.
     */    
    private void bufferMessage(Message msg)
    {
        awaitingSend.add(msg);
        if(firstMessageBuffered==0)
            firstMessageBuffered=System.currentTimeMillis();
        //checkSendBuffer(); 
    }
    
    /**
     * Flush all the buffered messages.
     */
    private synchronized void flushBuffer()
    {
        // send buffered messages
        Enumeration e=awaitingSend.elements();
        while(e.hasMoreElements())
        {
            Message msg=(Message)e.nextElement();
            sendMessage(msg);
        }
        awaitingSend.clear();
        firstMessageBuffered=0;
    }
    
    /**
     * Check if the buffer overflowed and
     * flush the buffer if it did.
     */
    synchronized void checkSendBuffer()
    {
        if ( awaitingSend.size() <=0 )
            return; // do nothing with empty buffer
        if ( (awaitingSend.size() >= maxBufferedSize) ||
             (System.currentTimeMillis() >= (firstMessageBuffered+maxBufferedTime) ) )
        {
            flushBuffer();
        }
    }
    
    /**
     * Class that is used for embedding Messages in a list
     * while waiting for a reply.
     */
    class MessageContainer
    {

        /**
         * The embedded Message object.
         */
        Message msg;
        
        /**
         * The last time this Message was sent.
         */
        long lastTimeSent;

        /**
         * Construct a new MessageContainer Object.
         */
        MessageContainer(Message msg,long timeSent)
        {
            this.msg=msg;
            this.lastTimeSent=timeSent;
        }

    }
    
    /**
     * Resends the messages that have not been replied to
     * with requestTimeoud time.
     */
    synchronized void checkReplyBuffer()
    {
        long limit=System.currentTimeMillis()-requestTimeout;
        Iterator e=awaitingReply.iterator();
        while (e.hasNext())
        {
            MessageContainer mc=(MessageContainer)e.next();
            if(mc.lastTimeSent<limit)
            {
                resendMessage(mc);
            }
        }
    }
    
    /**
     * An indication message is sent to the accounting
     * server if it has not been sent within
     * the specified indicationTimeout.
     * If the indication has been sent recently 
     * (within indicationTimeout) it will not be sent.
     * @see #indicationTimeout
     */
    private synchronized void sendIndication()
    {
        long now=System.currentTimeMillis();
        // only send if needed
        if ( now > (lastIndicationSent+indicationTimeout) )
        {
            sendMessage(new Message(
	        Message.ACCOUNTING_INDICATION,
		new Hashtable()));
	    lastIndicationSent=now;
        }
    }
    
    /**
     * Send accounting data to the registered server given a model
     * for delivering accounting messages.
     */
    public synchronized void account(Hashtable attrs)
    {
        // build a message
        Message msg=new Message(Message.ACCOUNTING_REQUEST,attrs);
        // check if should be sent or buffered
        if ( maxBufferedSize == 0 )
        {
            // just do event-driven model
            sendMessage(msg);
        }
        else
        {
            // buffer the message
            bufferMessage(msg);
            // an indication should only be sent if the buffer was not yet flushed
            if ( sendIndication && (awaitingSend.size()>0) )
            {
                sendIndication();
            }
        }
    }
    
    /**
     * Adds the given Identifier to the list of possible
     * accounting servers.
     * All servers are tried in a ronud-robin fashion
     * if the AAAUnit signals an error, of if the message
     * was not replied to within a given time.
     */
    public void addReceiver(Identifier server)
    {
        receivers.add(server);
        // if this is the first, current is null
        if(currentReceiver==null)
            currentReceiver=server;
    }

    /**
     * Removes the given accounting-server from
     * the list of registered accounting-servers.
     * No more accounting-messages will be sent to
     * the accounting-server.
     */    
    public void removeReceiver(Identifier server)
    {
        // check if it is current if so do next
        if(server==currentReceiver)
            nextReceiver();
        // if it is still the current, it is the last one
        if(server==currentReceiver)
            currentReceiver=null;
        // TODO: check for multiple same recievers
        // actualy remove is
        receivers.remove(server);
    }
    
    /**
     * Handle an incoming reply message.
     * Check if it is a reply to a sent accounting request.
     */
    boolean handleReply(Message msg,AAAUnit unit)
    {
        Iterator i=awaitingReply.iterator();
        while(i.hasNext())
        {
            MessageContainer mc=(MessageContainer)i.next();
            if(msg.isReplyTo(mc.msg))
            {
                awaitingReply.remove(mc);
                return true; // message handled
            }
        }
        return false; // message not handled
    }
    
    /**
     * Handle incoming accounting poll messages
     * by changing the default accounting server
     * to the sender of this message and sending
     * all buffered accounting messages to that server.
     */
    boolean handlePoll(Message msg,AAAUnit unit)
    {
        Identifier id=msg.sender;
        if(id!=null)
            currentReceiver=id;
        flushBuffer();
        return true;
    }
    
    /**
     * Inner class that filters out incoming messages that
     * are replies to given sent messages and handles
     * accounting polls.
     */
    class Handler
        implements MessageHandler
    {

        /**
         * Handle incoming messages. Find replies
         * to sent messages and mark them as being handles.
         */        
        public boolean handleMessage(Message msg,AAAUnit unit)
        {
            switch(msg.getMessageType())
            {
            case Message.ACCOUNTING_REPLY:
                return handleReply(msg,unit);
            case Message.ACCOUNTING_POLL:
                return handlePoll(msg,unit);
            default:
                return false;
            }
        }
    }
    
    public void setMaxBufferSize(int size)
    {
        maxBufferedSize=size;
    }
    
    public void setMaxBufferTime(long time)
    {
        maxBufferedTime=time;
    }
    
    public void setSendIndication(boolean send)
    {
        sendIndication=send;
    }
    
    public void setRequestTimeout(long timeout)
    {
        requestTimeout=timeout;
    }
    
    public void setIndicationTimeout(long timeout)
    {
        indicationTimeout=timeout;
    }

    public int getMaxBufferSize()
    {
        return maxBufferedSize;
    }
    
    public long getMaxBufferTime()
    {
        return maxBufferedTime;
    }
    
    public boolean getSendIndication()
    {
        return sendIndication;
    }
    
    public long getRequestTimeout()
    {
        return requestTimeout;
    }
    
    public long getIndicationTimeout()
    {
        return indicationTimeout;
    }
        
    class Poller
        extends Thread
    {
    
        public void run()
        {
            while(true)
            {
                // check if buffered messages should be sent
                checkSendBuffer();
                // check if messages are buffered too long
                checkReplyBuffer();
                // TODO:implement
                //  - check for indication messages not replied to (+fail-over etc)
                //  - make the sleep smart so it waits for some maximum time
                try 
                {
                    Thread.sleep(MINIMUM_THREAD_WAKEUP);
                }
                catch (InterruptedException e)
                {}
            }
        }
    
    }
    
}


Arthur <arthur@ch.twi.tudelft.nl> http://ch.twi.tudelft.nl/~arthur/
2002-05-27