Source code: AccountingHandler.java
|
[ index | 197 lines | javadoc ]
|
package nl.west.aaa;
import java.util.*;
import java.io.*;
/**
* A MessageHandler handling accounting messages.
* All messages of type <code>Message.ACCOUNTING_REQUEST</code>
* are replied to with a <code>Message.ACCOUNTING_REPLY</code>
* message. Subclasses must override the handleRequest() method
* to store the message in a database or to generate session records.
*/
public abstract class AccountingHandler
implements MessageHandler
{
private AAAUnit unit;
private Vector pollingList=new Vector();
private LinkedList pollingQueue=new LinkedList();
static int MINIMUM_THREAD_WAKEUP=100;
private long minimumPollingInterval=100;
/**
* The time at wich the last polling message was sent.
*/
private long lastPollSent=0;
public AccountingHandler(AAAUnit unit)
{
this.unit=unit;
}
/**
* Process the incoming message. Return true if message is an
* <code>ACCOUNTING_REQUEST</code> message, otherwise return false.
* A <code>ACCOUNTING_REPLY</code> message is sent back.
*/
public boolean handleMessage(Message msg,AAAUnit unit)
{
switch(msg.getMessageType())
{
case Message.ACCOUNTING_REQUEST:
return handleRequest(msg,unit);
case Message.ACCOUNTING_INDICATION:
return handleIndication(msg,unit);
default:
return false;
}
}
/**
* Handle the incoming accounting request.
* A accounting request should be replied to with
* an accounting reply.
*/
protected abstract boolean handleRequest(Message msg,AAAUnit unit);
/**
* Handle the incoming accounting indication message
* by scheduling the service-element for polling.
*/
private boolean handleIndication(Message msg,AAAUnit unit)
{
pollingQueue.addLast(msg.reply(Message.ACCOUNTING_POLL,new Hashtable()));
return true;
}
void sendPoll(Object o)
{
long now=System.currentTimeMillis();
Identifier id;
Message msg;
if (o instanceof PollingElement)
{
PollingElement pe=(PollingElement)o;
id=pe.ident;
pe.nextPoll=now+pe.interval;
pe.isQueued=false;
msg=new Message(Message.ACCOUNTING_POLL,new Hashtable());
}
else
{
msg=(Message)o;
id=msg.receiver;
}
lastPollSent=now;
try
{
unit.sendMessage(msg,id);
}
catch (IOException ex)
{
// TODO: handle errors
}
}
public void addForPolling(Identifier id,long interval)
{
pollingList.add(new PollingElement(id,interval));
}
public void removeForPolling(Identifier id)
{
Enumeration e=pollingList.elements();
while(e.hasMoreElements())
{
PollingElement pe=(PollingElement)e.nextElement();
if(id.equals(pe.ident))
{
pollingList.remove(pe);
return;
}
}
}
/**
* Simple class to hold data for polling
* of service elements.
*/
class PollingElement
{
/**
* The element to poll.
*/
Identifier ident;
/**
* The interval atwhich to poll.
*/
long interval=100;
/**
* The next time to poll the element.
*/
long nextPoll=0;
/**
* Flag to indicate it is being queued.
*/
boolean isQueued=false;
PollingElement(Identifier id,long i)
{
this.ident=id;
this.interval=i;
}
}
class Poller
extends Thread
{
public void run()
{
while(true)
{
// check if any elements should be added to the polling list
long now=System.currentTimeMillis();
Enumeration e=pollingList.elements();
while(e.hasMoreElements())
{
PollingElement pe=(PollingElement)e.nextElement();
if( !pe.isQueued &&
(now>pe.nextPoll) )
{
pe.isQueued=true;
pollingQueue.addLast(pe);
}
}
// check if any element should be polled
now=System.currentTimeMillis();
if(now>lastPollSent+minimumPollingInterval)
{
synchronized(pollingQueue)
{
if(!pollingQueue.isEmpty())
sendPoll(pollingQueue.removeFirst());
}
}
try
{
// TODO: sleep smarter, until next pollinginterval or next polling add
// or maybe even split to two threads
Thread.sleep(10);
}
catch (InterruptedException ex)
{}
}
}
}
}
|