Source code: TCPTransport.java
|
[ index | 190 lines | javadoc ]
|
package nl.west.aaa;
import java.io.*;
import java.util.*;
import java.net.*;
//TODO: add an option to timeout the socket and close the streams
//TODO: add listening for connections self opened
//TODO: move listen to constructor?
//TODO: add check if connection is still alive: if not, remove it from
// the list of active connections (this enables restart of the server)
// TODO: make so constructor makes sense
/**
 * Transport that sends and receives raw bytes over TCP.
 * <p>
 * Constructing an instance immediately starts a background {@link Listener}
 * thread that accepts incoming connections on the listen port. Outgoing
 * connections are opened lazily by {@link #sendMessage(byte[], Identifier)}
 * towards the configured send port. Every connection (incoming or outgoing)
 * gets its own {@link InputThread} which hands the connection's input stream
 * to the receiver registered through {@link #startListening(AAAUnit)}.
 * <p>
 * Connections are cached per {@code Identifier} in a {@code Hashtable}; the
 * lookup therefore relies on {@code Identifier} providing suitable
 * {@code equals}/{@code hashCode} (NOTE(review): not visible here - confirm).
 */
public class TCPTransport
    implements TransportProtocol
{
    /**
     * New connections are connected to this port
     * on the other side.
     */
    private int sendPort;

    /**
     * The object receiving the incoming data through an input stream.
     * Declared volatile because it is assigned by the thread calling
     * startListening() and read by the InputThread instances.
     */
    private volatile AAAUnit receiver;

    /**
     * The connections currently active, keyed by the Identifier of the peer.
     */
    private Hashtable connections = new Hashtable();

    /**
     * Initialize for transport over TCP and start accepting connections.
     *
     * @param listenPort the local port that is listened on
     * @param sendPort   the remote port outgoing connections are made to
     */
    public TCPTransport(int listenPort, int sendPort)
    {
        this.sendPort = sendPort;
        new Listener(listenPort).start();
    }

    /**
     * This class represents a connection to a remote host.
     * The streams are created lazily by the code that first needs them.
     */
    class Connection
    {
        Socket sock;
        Identifier id;
        InputStream in = null;
        OutputStream out = null;

        /**
         * Wrap an already accepted socket; the identifier is derived
         * from the address of the remote side.
         */
        Connection(Socket sock)
        {
            this.sock = sock;
            this.id = new Identifier(sock.getInetAddress());
        }

        /**
         * Open a new socket to the host named by the identifier,
         * on the send port of the enclosing transport.
         *
         * @throws IOException if the socket cannot be opened
         */
        Connection(Identifier id)
            throws IOException
        {
            this.id = id;
            this.sock = new Socket(id.getHostname(), sendPort);
        }
    }

    /**
     * Send the given bytes to the host identified by id. An existing
     * connection to that host is reused; otherwise a new one is opened,
     * registered, and an InputThread is started for it.
     * <p>
     * If writing fails, the connection is removed from the table of active
     * connections and its socket is closed, so that a later call opens a
     * fresh connection instead of reusing the broken one.
     *
     * @param data the bytes to write
     * @param id   the peer to send to
     * @throws java.io.IOException if connecting or writing fails
     */
    public synchronized void sendMessage(byte[] data, Identifier id)
        throws java.io.IOException
    {
        // check for already present connections
        Connection con = (Connection) connections.get(id);
        // if no connection found, create
        if (con == null)
        {
            // make new connection
            con = new Connection(id);
            // add to current connections
            connections.put(con.id, con);
            // listen for incoming data
            new InputThread(con).start();
        }
        try
        {
            // if no stream yet, make one
            if (con.out == null)
                con.out = con.sock.getOutputStream();
            con.out.write(data);
        }
        catch (IOException e)
        {
            // do not keep a connection that can no longer be written to
            dropConnection(con);
            throw e;
        }
    }

    /**
     * Remove the connection from the table of active connections (only if
     * the table still maps its id to this very object) and close its socket.
     */
    private void dropConnection(Connection con)
    {
        // Hashtable locks on itself, so this makes check-and-remove atomic
        synchronized (connections)
        {
            if (connections.get(con.id) == con)
                connections.remove(con.id);
        }
        try
        {
            con.sock.close();
        }
        catch (IOException ignored)
        {
            // the connection is being discarded anyway
        }
    }

    /**
     * Thread that listens for incoming connections.
     */
    class Listener
        extends Thread
    {
        /**
         * The port number that is listened on.
         */
        private int listenPort;

        /**
         * Initialize for listening on the given port.
         */
        Listener(int listenPort)
        {
            this.listenPort = listenPort;
        }

        /**
         * Accept connections until the server socket is closed. Each
         * accepted socket is registered as a Connection and gets its own
         * InputThread. Failures are reported on System.err.
         */
        public void run()
        {
            ServerSocket serverSocket = null;
            try
            {
                serverSocket = new ServerSocket(listenPort);
                while (true)
                {
                    try
                    {
                        // wait for incoming connection
                        Socket s = serverSocket.accept();
                        // make new connection object
                        Connection con = new Connection(s);
                        // add to current connections
                        connections.put(con.id, con);
                        // listen for incoming data
                        new InputThread(con).start();
                    }
                    catch (IOException e)
                    {
                        // a closed server socket can never accept again
                        if (serverSocket.isClosed())
                            break;
                        System.err.println("TCPTransport: accept failed on port "
                            + listenPort + ": " + e);
                    }
                }
            }
            catch (IOException e)
            {
                System.err.println("TCPTransport: cannot listen on port "
                    + listenPort + ": " + e);
            }
            finally
            {
                if (serverSocket != null)
                {
                    try
                    {
                        serverSocket.close();
                    }
                    catch (IOException ignored)
                    {
                        // the listener is terminating anyway
                    }
                }
            }
        }
    }

    /**
     * Pass the InputStream of an established connection
     * to the receiver in a separate thread.
     */
    class InputThread
        extends Thread
    {
        Connection con;

        InputThread(Connection con)
        {
            this.con = con;
        }

        /**
         * Create the input stream if needed (wrapped in a
         * BufferedInputStream when the raw stream does not support mark)
         * and hand it to the receiver. When no receiver is registered at
         * that moment, nothing is done and the thread ends. On an
         * IOException the failure is reported on System.err and the
         * connection is dropped.
         */
        public void run()
        {
            try
            {
                // if no stream, create
                if (con.in == null)
                {
                    InputStream raw = con.sock.getInputStream();
                    con.in = raw.markSupported()
                        ? raw
                        : new BufferedInputStream(raw);
                }
                // read the field once so the null check and the call agree
                AAAUnit target = receiver;
                // pass stream to receiver
                if (target != null)
                    target.handleIncoming(con.in, con.id, TCPTransport.this);
            }
            catch (IOException e)
            {
                System.err.println("TCPTransport: input from connection failed: " + e);
                dropConnection(con);
            }
        }
    }

    /**
     * Register the receiver whose handleIncoming() method is called for
     * connections. Only InputThreads that reach their receiver check after
     * this call will use it; the accepting of connections itself is
     * already started by the constructor.
     *
     * @param receiver the unit that handles incoming streams
     */
    public void startListening(AAAUnit receiver)
    {
        // save receiver
        this.receiver = receiver;
    }
}
|