Source code: AccountingSender.java
|
[ index | 552 lines | javadoc ]
|
package nl.west.aaa;
import java.util.*;
import java.io.*;
/**
* Class to send accounting data to an accounting server.
* Accounting data is buffered until a reply is received.
* Event-driven, event-driven batching, polling and event-driven polling
* models are supported (see constructor).
*/
public class AccountingSender
{
    // MODEL VALUES

    /**
     * Used for event-driven delivery model (no batching).
     * When this value is passed to the constructor, an event-driven
     * message delivery model is used. No batching will occur.
     */
    public static final int EVENT_DRIVEN = 0;

    /**
     * Used for event-driven batching delivery model.
     * When this value is passed to the constructor, an event-driven
     * message delivery model with batching is used.
     */
    public static final int EVENT_DRIVEN_BATCHING = 1;

    /**
     * Used for polling delivery model.
     * When this value is passed to the constructor, a
     * polling message delivery model is assumed.
     */
    public static final int POLLING = 2;

    /**
     * Used for event-driven-polling.
     * When this value is passed to the constructor, an
     * event-driven polling message delivery model is used.
     */
    public static final int EVENT_DRIVEN_POLLING = 3;

    /**
     * Time in milliseconds the inner polling thread sleeps between
     * two consecutive checks of the send and reply buffers.
     */
    static final int MINIMUM_THREAD_WAKEUP = 100;

    // TUNING PARAMETERS
    // These are written by the public setters and read by the polling
    // thread, hence volatile.

    /**
     * Maximum number of buffered messages. If the buffer
     * reaches this size, all buffered messages are sent at the
     * next check. 0 indicates immediate send.
     */
    private volatile int maxBufferedSize=0;

    /**
     * The time in milliseconds after which all buffered accounting
     * data is sent. With a value of zero a non-empty buffer is
     * flushed at the next check (see {@link #checkSendBuffer()}).
     */
    private volatile long maxBufferedTime;

    /**
     * If accounting data is buffered and this is set to
     * true, an indication is sent to the accounting server.
     */
    private volatile boolean sendIndication=false;

    /**
     * The timeout value, in milliseconds, after which a sent
     * request message without a reply is assumed lost and resent.
     */
    private volatile long requestTimeout;

    /**
     * The minimum time between two indication messages.
     * Timeout is in milliseconds.
     */
    private volatile long indicationTimeout;

    // STATE (guarded by the monitor of this object)

    /**
     * The AAAUnit that is used for sending and receiving
     * messages.
     */
    private final AAAUnit unit;

    /**
     * The time at which the last indication message was sent.
     */
    private long lastIndicationSent;

    /**
     * The currently selected receiver from the list
     * of registered receivers, or null if there is none.
     */
    private Identifier currentReceiver;

    /**
     * The current list of registered accounting-servers.
     */
    private final Vector receivers=new Vector();

    /**
     * The time the first data was entered to the buffer.
     * Set to zero when buffer is empty.
     */
    private long firstMessageBuffered=0;

    // BUFFERS (guarded by the monitor of this object)

    /**
     * The set of messages (as MessageContainer objects)
     * waiting for a valid reply.
     */
    private final Set awaitingReply=new HashSet();

    /**
     * The list of messages waiting to be sent.
     */
    private final Vector awaitingSend=new Vector();

    /**
     * Create a new AccountingSender and connect
     * it to the given AAAUnit. The model can be one
     * of <code>EVENT_DRIVEN</code>,
     * <code>EVENT_DRIVEN_BATCHING</code>,
     * <code>POLLING</code>
     * or <code>EVENT_DRIVEN_POLLING</code>.
     * A message handler is registered with the unit and the
     * internal polling thread is started.
     *
     * @param model one of the model constants of this class
     * @param unit the AAAUnit used for sending and receiving messages
     * @throws IllegalArgumentException if the model is not one of
     *         the model constants
     * @see AccountingSender#EVENT_DRIVEN
     * @see AccountingSender#EVENT_DRIVEN_BATCHING
     * @see AccountingSender#POLLING
     * @see AccountingSender#EVENT_DRIVEN_POLLING
     */
    public AccountingSender(int model,AAAUnit unit)
    {
        this.unit=unit;
        switch(model)
        {
            case EVENT_DRIVEN:
                maxBufferedSize=0; // no buffering
                maxBufferedTime=0; // no buffering
                sendIndication=false;
                break;
            case EVENT_DRIVEN_BATCHING:
                maxBufferedSize=100; // buffer max 100 messages
                maxBufferedTime=1000; // buffer for 1 second max
                sendIndication=false;
                break;
            case POLLING:
                // Large limits rather than truly unbounded ones: the
                // buffer is still flushed once either limit is reached.
                maxBufferedSize=1000000;
                maxBufferedTime=1000000;
                sendIndication=false;
                break;
            case EVENT_DRIVEN_POLLING:
                // Same large limits as POLLING, plus indications.
                maxBufferedSize=1000000;
                maxBufferedTime=1000000;
                sendIndication=true;
                break;
            default:
                throw new IllegalArgumentException("Illegal mode");
        }
        requestTimeout=5000; // 5 sec
        indicationTimeout=3000; // wait 3 secs. before indication retry
        unit.addMessageHandler(new Handler());
        new Poller().start();
    }

    /**
     * Send the given message to the currently selected
     * accounting-server. An accounting request is added to the
     * set of messages awaiting a reply before it is transmitted, so
     * a request whose transmission fails is still resent later by
     * {@link #checkReplyBuffer()}. For any other message type the
     * time of the last indication is updated. If the unit reports
     * an I/O error the next registered server is selected.
     */
    private synchronized void sendMessage(Message msg)
    {
        long now=System.currentTimeMillis();
        try
        {
            if(msg.getMessageType()==Message.ACCOUNTING_REQUEST)
            {
                awaitingReply.add(new MessageContainer(msg,now));
            }
            else
            {
                lastIndicationSent=now;
            }
            unit.sendMessage(msg,currentReceiver);
        }
        catch (IOException e)
        {
            // fail over to the next registered server
            nextReceiver();
        }
    }

    /**
     * Send an already registered message again to the currently
     * selected accounting-server. The time of the last transmission
     * is only updated when the unit accepted the message; on an
     * I/O error the next registered server is selected instead.
     */
    private synchronized void resendMessage(MessageContainer mc)
    {
        long now=System.currentTimeMillis();
        try
        {
            unit.sendMessage(mc.msg,currentReceiver);
            mc.lastTimeSent=now;
        }
        catch (IOException e)
        {
            // fail over to the next registered server
            nextReceiver();
        }
    }

    /**
     * Make another server the default server.
     * Selects the next accounting-server from
     * the list of registered servers, wrapping around to the
     * first one after the last. If no servers are registered the
     * current server becomes null.
     */
    private synchronized void nextReceiver()
    {
        int count=receivers.size();
        if(count==0)
        {
            currentReceiver=null;
            return;
        }
        // indexOf yields -1 for an unregistered current server, which
        // makes the first registered server the next one.
        int next=0;
        if(currentReceiver!=null)
        {
            next=(receivers.indexOf(currentReceiver)+1)%count;
        }
        currentReceiver=(Identifier)receivers.elementAt(next);
    }

    /**
     * Add the given message to the buffer and remember the time
     * the first message was buffered. The buffer is not flushed
     * here; that is left to the polling thread. Only called
     * with the monitor of this object held (from account).
     */
    private void bufferMessage(Message msg)
    {
        awaitingSend.add(msg);
        if(firstMessageBuffered==0)
        {
            firstMessageBuffered=System.currentTimeMillis();
        }
    }

    /**
     * Flush all the buffered messages: send each of them
     * and empty the buffer.
     */
    private synchronized void flushBuffer()
    {
        Enumeration pending=awaitingSend.elements();
        while(pending.hasMoreElements())
        {
            sendMessage((Message)pending.nextElement());
        }
        awaitingSend.clear();
        firstMessageBuffered=0;
    }

    /**
     * Check if the buffer reached its maximum size or if the
     * oldest buffered message reached the maximum buffer time, and
     * flush the buffer if so. Does nothing with an empty buffer.
     */
    synchronized void checkSendBuffer()
    {
        int buffered=awaitingSend.size();
        if(buffered<=0)
        {
            return; // do nothing with empty buffer
        }
        // Compare elapsed time instead of adding the timeout to a
        // timestamp, so a very large timeout cannot overflow.
        long age=System.currentTimeMillis()-firstMessageBuffered;
        if(buffered>=maxBufferedSize || age>=maxBufferedTime)
        {
            flushBuffer();
        }
    }

    /**
     * Class that is used for embedding Messages in a list
     * while waiting for a reply.
     */
    class MessageContainer
    {
        /**
         * The embedded Message object.
         */
        Message msg;

        /**
         * The last time this Message was sent.
         */
        long lastTimeSent;

        /**
         * Construct a new MessageContainer Object.
         */
        MessageContainer(Message msg,long timeSent)
        {
            this.msg=msg;
            this.lastTimeSent=timeSent;
        }
    }

    /**
     * Resends the messages that have not been replied to
     * within requestTimeout time.
     */
    synchronized void checkReplyBuffer()
    {
        long limit=System.currentTimeMillis()-requestTimeout;
        Iterator waiting=awaitingReply.iterator();
        while(waiting.hasNext())
        {
            MessageContainer mc=(MessageContainer)waiting.next();
            if(mc.lastTimeSent<limit)
            {
                resendMessage(mc);
            }
        }
    }

    /**
     * An indication message is sent to the accounting
     * server if it has not been sent within
     * the specified indicationTimeout.
     * If the indication has been sent recently
     * (within indicationTimeout) it will not be sent.
     * @see #indicationTimeout
     */
    private synchronized void sendIndication()
    {
        long now=System.currentTimeMillis();
        // only send if needed; elapsed-time form avoids overflow
        if(now-lastIndicationSent>indicationTimeout)
        {
            sendMessage(new Message(
                Message.ACCOUNTING_INDICATION,
                new Hashtable()));
            lastIndicationSent=now;
        }
    }

    /**
     * Send accounting data to the registered server given a model
     * for delivering accounting messages. With a maximum buffer
     * size of zero the data is sent immediately; otherwise it is
     * buffered and, if indications are enabled, an indication is
     * sent to the server.
     *
     * @param attrs the attributes of the accounting request
     */
    public synchronized void account(Hashtable attrs)
    {
        Message msg=new Message(Message.ACCOUNTING_REQUEST,attrs);
        if(maxBufferedSize==0)
        {
            // just do event-driven model
            sendMessage(msg);
            return;
        }
        bufferMessage(msg);
        // an indication should only be sent while data is still buffered
        if(sendIndication && awaitingSend.size()>0)
        {
            sendIndication();
        }
    }

    /**
     * Adds the given Identifier to the list of possible
     * accounting servers.
     * All servers are tried in a round-robin fashion
     * if the AAAUnit signals an error.
     *
     * @param server the accounting-server to register
     */
    public synchronized void addReceiver(Identifier server)
    {
        receivers.add(server);
        // if this is the first, current is null
        if(currentReceiver==null)
        {
            currentReceiver=server;
        }
    }

    /**
     * Removes the given accounting-server from
     * the list of registered accounting-servers.
     * If it was the currently selected server, the next
     * registered server (if any) becomes the current one.
     *
     * @param server the accounting-server to remove
     */
    public synchronized void removeReceiver(Identifier server)
    {
        // equals() is used to agree with Vector.indexOf/remove
        boolean wasCurrent=server!=null && server.equals(currentReceiver);
        if(wasCurrent)
        {
            // advance before removing so the round-robin position is kept
            nextReceiver();
        }
        receivers.remove(server);
        if(wasCurrent && server.equals(currentReceiver))
        {
            // The selection wrapped around to the removed server: it was
            // the only one, or it was registered more than once.
            if(receivers.isEmpty())
            {
                currentReceiver=null;
            }
            else
            {
                currentReceiver=(Identifier)receivers.elementAt(0);
            }
        }
    }

    /**
     * Handle an incoming reply message.
     * Check if it is a reply to a sent accounting request and, if
     * so, stop waiting for a reply to that request.
     *
     * @return true if the message was a reply to a pending request
     */
    synchronized boolean handleReply(Message msg,AAAUnit unit)
    {
        // Synchronized because the polling thread iterates the same set.
        Iterator waiting=awaitingReply.iterator();
        while(waiting.hasNext())
        {
            MessageContainer mc=(MessageContainer)waiting.next();
            if(msg.isReplyTo(mc.msg))
            {
                waiting.remove();
                return true; // message handled
            }
        }
        return false; // message not handled
    }

    /**
     * Handle incoming accounting poll messages
     * by changing the default accounting server
     * to the sender of this message (if it has one) and sending
     * all buffered accounting messages to that server.
     *
     * @return always true
     */
    synchronized boolean handlePoll(Message msg,AAAUnit unit)
    {
        Identifier id=msg.sender;
        if(id!=null)
        {
            currentReceiver=id;
        }
        flushBuffer();
        return true;
    }

    /**
     * Inner class that filters out incoming messages that
     * are replies to given sent messages and handles
     * accounting polls.
     */
    class Handler
        implements MessageHandler
    {
        /**
         * Handle incoming messages. Dispatches accounting replies
         * and accounting polls; all other messages are not handled.
         */
        public boolean handleMessage(Message msg,AAAUnit unit)
        {
            switch(msg.getMessageType())
            {
                case Message.ACCOUNTING_REPLY:
                    return handleReply(msg,unit);
                case Message.ACCOUNTING_POLL:
                    return handlePoll(msg,unit);
                default:
                    return false;
            }
        }
    }

    /**
     * Set the maximum number of buffered messages.
     * 0 indicates immediate send.
     */
    public void setMaxBufferSize(int size)
    {
        maxBufferedSize=size;
    }

    /**
     * Set the maximum time in milliseconds that accounting
     * data stays buffered.
     */
    public void setMaxBufferTime(long time)
    {
        maxBufferedTime=time;
    }

    /**
     * Set whether an indication is sent when data is buffered.
     */
    public void setSendIndication(boolean send)
    {
        sendIndication=send;
    }

    /**
     * Set the time in milliseconds after which an unanswered
     * request is resent.
     */
    public void setRequestTimeout(long timeout)
    {
        requestTimeout=timeout;
    }

    /**
     * Set the minimum time in milliseconds between two indications.
     */
    public void setIndicationTimeout(long timeout)
    {
        indicationTimeout=timeout;
    }

    /**
     * Get the maximum number of buffered messages.
     */
    public int getMaxBufferSize()
    {
        return maxBufferedSize;
    }

    /**
     * Get the maximum buffer time in milliseconds.
     */
    public long getMaxBufferTime()
    {
        return maxBufferedTime;
    }

    /**
     * Get whether an indication is sent when data is buffered.
     */
    public boolean getSendIndication()
    {
        return sendIndication;
    }

    /**
     * Get the request timeout in milliseconds.
     */
    public long getRequestTimeout()
    {
        return requestTimeout;
    }

    /**
     * Get the indication timeout in milliseconds.
     */
    public long getIndicationTimeout()
    {
        return indicationTimeout;
    }

    /**
     * Inner thread that periodically checks the send buffer and
     * the reply buffer. The thread ends when it is interrupted.
     */
    class Poller
        extends Thread
    {
        public void run()
        {
            while(!Thread.currentThread().isInterrupted())
            {
                // check if buffered messages should be sent
                checkSendBuffer();
                // check if requests are waiting too long for a reply
                checkReplyBuffer();
                // TODO:implement
                // - check for indication messages not replied to (+fail-over etc)
                // - make the sleep smart so it waits for some maximum time
                try
                {
                    Thread.sleep(MINIMUM_THREAD_WAKEUP);
                }
                catch (InterruptedException e)
                {
                    // restore the flag so the loop condition ends the thread
                    Thread.currentThread().interrupt();
                }
            }
        }
    }
}
|