Source code: AccountingHandler.java

[ index | 197 lines | javadoc ]

package nl.west.aaa;

import java.util.*;
import java.io.*;

/**
 * A MessageHandler handling accounting messages.
 * Messages of type <code>Message.ACCOUNTING_REQUEST</code> are passed to
 * {@link #handleRequest(Message,AAAUnit)}; subclasses must implement that
 * method to store the message in a database or to generate session records,
 * and to send the reply. Messages of type
 * <code>Message.ACCOUNTING_INDICATION</code> cause an
 * <code>ACCOUNTING_POLL</code> reply to be queued for sending.
 * <p>
 * Besides reacting to indications, elements can be registered for periodic
 * polling with {@link #addForPolling(Identifier,long)}. Queued polls are
 * sent by a {@link Poller} thread, which has to be created and started by
 * the owner of this handler.
 * <p>
 * Locking: <code>pollingList</code> is guarded by its own monitor (the one
 * <code>Vector</code> uses internally) and <code>pollingQueue</code> by
 * <code>synchronized(pollingQueue)</code>. When both are needed the list
 * lock is always taken first.
 */
public abstract class AccountingHandler
    implements MessageHandler
{
    /**
     * The unit used to send poll messages.
     */
    private AAAUnit unit;

    /**
     * The elements registered for periodic polling
     * (<code>PollingElement</code> instances).
     */
    private Vector pollingList=new Vector();
    
    /**
     * The polls waiting to be sent, in FIFO order. Holds either
     * <code>PollingElement</code> instances (periodic polls) or ready-made
     * poll messages (replies to indications). Every access must hold the
     * monitor of this list.
     */
    private LinkedList pollingQueue=new LinkedList();
    
    /**
     * Not referenced inside this class; presumably a wakeup interval in
     * milliseconds for the polling thread (TODO confirm against users
     * elsewhere in the package).
     */
    static int MINIMUM_THREAD_WAKEUP=100;
    
    /**
     * The minimum time in milliseconds between two polls being sent.
     */
    private long minimumPollingInterval=100;
    
    /**
     * The time at which the last polling message was sent.
     */
    private long lastPollSent=0;

    public AccountingHandler(AAAUnit unit)
    {
        this.unit=unit;
    }

    /**
     * Process the incoming message.
     * An <code>ACCOUNTING_REQUEST</code> is delegated to
     * {@link #handleRequest(Message,AAAUnit)}, an
     * <code>ACCOUNTING_INDICATION</code> schedules a poll.
     *
     * @return the result of the delegate for the two handled message
     *         types, false for every other message type.
     */
    public boolean handleMessage(Message msg,AAAUnit unit)
    {
        switch(msg.getMessageType())
        {
        case Message.ACCOUNTING_REQUEST:
            return handleRequest(msg,unit);
        case Message.ACCOUNTING_INDICATION:
            return handleIndication(msg,unit);
        default:
            return false;
        }
    }
    
    /**
     * Handle the incoming accounting request.
     * An accounting request should be replied to with
     * an accounting reply.
     */
    protected abstract boolean handleRequest(Message msg,AAAUnit unit);
    
    /**
     * Handle the incoming accounting indication message
     * by scheduling the service-element for polling.
     *
     * @return always true.
     */
    private boolean handleIndication(Message msg,AAAUnit unit)
    {
        // Build the reply before taking the lock to keep the critical
        // section as short as possible.
        Object poll=msg.reply(Message.ACCOUNTING_POLL,new Hashtable());
        // The queue is a plain LinkedList shared with the Poller thread,
        // so every access has to hold its monitor.
        synchronized(pollingQueue)
        {
            pollingQueue.addLast(poll);
        }
        return true;
    }
    
    /**
     * Send one poll.
     * For a <code>PollingElement</code> a fresh poll message is created
     * and the element is rescheduled for its next interval; any other
     * object is taken to be a ready-made message which is sent to its
     * receiver. A failure to send is reported on <code>System.err</code>
     * and otherwise ignored; a periodic element is simply polled again
     * after its next interval.
     *
     * @param o a <code>PollingElement</code> or a <code>Message</code>.
     */
    void sendPoll(Object o)
    {
        long now=System.currentTimeMillis();
        Identifier id;
        Message msg;
        if (o instanceof PollingElement)
        {
            PollingElement pe=(PollingElement)o;
            id=pe.ident;
            pe.nextPoll=now+pe.interval;
            pe.isQueued=false;
            msg=new Message(Message.ACCOUNTING_POLL,new Hashtable());
        }
        else
        {
            msg=(Message)o;
            id=msg.receiver;
        }
        // Recorded before sending, so failed attempts are rate limited too.
        lastPollSent=now;
        try
        {
            unit.sendMessage(msg,id);
        }
        catch (IOException ex)
        {
            // Polling is best effort, but a failure must not vanish silently.
            System.err.println("AccountingHandler: failed to send accounting poll to "+id+": "+ex);
        }
    }
    
    
    /**
     * Register an element for periodic polling.
     *
     * @param id the element to poll.
     * @param interval the polling interval in milliseconds.
     */
    public void addForPolling(Identifier id,long interval)
    {
        pollingList.add(new PollingElement(id,interval));
    }
    
    /**
     * Remove the first registration found for the given element.
     * A poll of that element which is still waiting in the queue is
     * dropped as well; a poll that has already been taken from the queue
     * by the polling thread may still be sent.
     *
     * @param id the element that should no longer be polled.
     */
    public void removeForPolling(Identifier id)
    {
        // Hold the list lock for the whole search so that the list cannot
        // change between finding the element and removing it.
        synchronized(pollingList)
        {
            Enumeration e=pollingList.elements();
            while(e.hasMoreElements())
            {
                PollingElement pe=(PollingElement)e.nextElement();
                if(id.equals(pe.ident))
                {
                    pollingList.remove(pe);
                    synchronized(pollingQueue)
                    {
                        // PollingElement does not override equals(), so
                        // this removes exactly this instance (if queued).
                        pollingQueue.remove(pe);
                        pe.isQueued=false;
                    }
                    return;
                }
            }
        }
    }
    
    /**
     * Simple class to hold data for polling
     * of service elements.
     */
    class PollingElement
    {
        /**
         * The element to poll.
         */
        Identifier ident;
        
        /**
         * The interval at which to poll, in milliseconds.
         */
        long interval=100;
        
        /**
         * The next time to poll the element. The initial value 0
         * makes a new element due immediately.
         */
        long nextPoll=0;
        
        /**
         * Flag to indicate it is being queued.
         */
        boolean isQueued=false;
        
        PollingElement(Identifier id,long i)
        {
            this.ident=id;
            this.interval=i;
        }
    }

    /**
     * Thread that queues the registered elements when they are due and
     * sends at most one queued poll per
     * <code>minimumPollingInterval</code>. The thread runs until it
     * is interrupted.
     */
    class Poller
        extends Thread
    {
    
        public void run()
        {
            while(!Thread.currentThread().isInterrupted())
            {
                queueDueElements();
                // check if any element should be polled
                Object next=null;
                long now=System.currentTimeMillis();
                if(now>lastPollSent+minimumPollingInterval)
                {
                    synchronized(pollingQueue)
                    {
                        if(!pollingQueue.isEmpty())
                            next=pollingQueue.removeFirst();
                    }
                }
                // Send outside the lock: sending is I/O and must not block
                // threads that want to queue a new poll.
                if(next!=null)
                    sendPoll(next);
                try 
                {
                    // TODO: sleep smarter, until next pollinginterval or next polling add
                    // or maybe even split to two threads
                    Thread.sleep(10);
                }
                catch (InterruptedException ex)
                {
                    // An interrupt is the request to stop polling: restore
                    // the flag for whoever inspects it and leave the loop.
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }
        
        /**
         * Append every registered element that is due and not yet queued
         * to the polling queue.
         */
        private void queueDueElements()
        {
            long now=System.currentTimeMillis();
            // Vector synchronizes on itself, so holding its monitor keeps
            // add/remove out while the enumeration is in progress; without
            // it a concurrent removal can make nextElement() throw.
            synchronized(pollingList)
            {
                Enumeration e=pollingList.elements();
                while(e.hasMoreElements())
                {
                    PollingElement pe=(PollingElement)e.nextElement();
                    if( !pe.isQueued &&
                        (now>pe.nextPoll) )
                    {
                        synchronized(pollingQueue)
                        {
                            pe.isQueued=true;
                            pollingQueue.addLast(pe);
                        }
                    }
                }
            }
        }
    
    }

}


Arthur <arthur@ch.twi.tudelft.nl> http://ch.twi.tudelft.nl/~arthur/
2002-05-27