/*******************************************************************************
 * Product of NIST/ITL Advanced Networking Technologies Division (ANTD).       *
 *******************************************************************************/
package gov.nist.javax.sip.stack;

import java.net.DatagramSocket;
import java.net.SocketException;
import java.net.DatagramPacket;
import java.io.IOException;
import java.util.LinkedList;
import java.net.InetAddress;
import java.net.UnknownHostException;
import gov.nist.core.*;
import java.lang.reflect.*;

/**
 * Sit in a loop and handle incoming udp datagram messages. For each Datagram
 * packet, a new UDPMessageChannel is created (up to the max thread pool size).
 * Each UDP message is processed in its own thread.
 *
 * @version JAIN-SIP-1.1 $Revision: 1.7 $ $Date: 2006/05/31 07:47:27 $
 *
 * @author M. Ranganathan
 *
 * This code is in the public domain.
 *
 *
 * See the implementation sequence diagram for processing incoming requests.
 *
 *
 *
 * Acknowledgement: Jeff Keyser contributed ideas on starting and stopping the
 * stack that were incorporated into this code. Niklas Uhrberg suggested that
 * thread pooling be added to limit the number of threads and improve
 * performance.
 */
public class UDPMessageProcessor extends MessageProcessor {

    /** High water mark for queue size: at or above this, every packet is dropped. */
    private static final int HIGHWAT = 100;

    /** Low water mark for queue size: at or below this, every packet is accepted. */
    private static final int LOWAT = 50;

    /**
     * The Mapped port (in case STUN support is enabled)
     */
    private int port;

    /**
     * Incoming messages are queued here. Also serves as the monitor on which
     * queue consumers are notified (see run() and stop()).
     */
    protected LinkedList messageQueue;

    /**
     * A list of message channels that we have started.
     */
    protected LinkedList messageChannels;

    /**
     * Max # of udp message channels.
     * NOTE(review): this field is never assigned in this class; run() sizes
     * the pool from sipStack.threadPoolSize instead.
     */
    protected int threadPoolSize;

    /**
     * Max datagram size.
     */
    protected static final int MAX_DATAGRAM_SIZE = 8 * 1024;

    /**
     * Our stack (that created us).
     */
    protected SIPTransactionStack sipStack;

    /**
     * The datagram socket on which incoming packets are received.
     */
    protected DatagramSocket sock;

    /**
     * A flag that is set to false to exit the message processor (suggestion by
     * Jeff Keyser). Declared volatile because stop() clears it from a thread
     * other than the one executing the receive loop in run().
     */
    protected volatile boolean isRunning;

    /**
     * Constructor. Opens the datagram socket and sets its receive buffer size
     * to MAX_DATAGRAM_SIZE.
     *
     * @param ipAddress
     *            local address to bind the socket to.
     * @param sipStack
     *            pointer to the stack.
     * @param port
     *            local port to bind the socket to.
     * @throws IOException
     *             if the socket cannot be created or configured; the
     *             underlying SocketException is attached as the cause.
     */
    protected UDPMessageProcessor(InetAddress ipAddress,
            SIPTransactionStack sipStack, int port) throws IOException {
        super(ipAddress, port, "udp");

        this.sipStack = sipStack;
        this.messageQueue = new LinkedList();
        this.port = port;

        try {
            // Create a new datagram socket.
            this.sock = sipStack.getNetworkLayer().createDatagramSocket(port,
                    ipAddress);
            sock.setReceiveBufferSize(MAX_DATAGRAM_SIZE);
        } catch (SocketException ex) {
            // Do not leak the socket when it was created but could not be
            // configured.
            if (this.sock != null) {
                this.sock.close();
            }
            // Keep the original failure as the cause rather than only its
            // message text.
            IOException wrapped = new IOException(ex.getMessage());
            wrapped.initCause(ex);
            throw wrapped;
        }
    }

    /**
     * Get port on which to listen for incoming stuff.
     *
     * @return port on which I am listening.
     */
    public int getPort() {
        return this.port;
    }

    /**
     * Start our processor thread. The thread is a daemon thread named
     * "UDPMessageProcessorThread" that executes run().
     */
    public void start() throws IOException {
        this.isRunning = true;
        Thread thread = new Thread(this);
        thread.setDaemon(true);
        // Issue #32 on java.net
        thread.setName("UDPMessageProcessorThread");
        thread.start();
    }

    /**
     * Thread main routine. Creates the pooled message channels (unless the
     * thread pool size is -1), then receives datagrams until isRunning is
     * cleared or receiving fails. Each received packet is either dropped by
     * the congestion control check, queued for a pooled channel, or handed to
     * a freshly created UDPMessageChannel.
     */
    public void run() {
        this.messageChannels = new LinkedList();

        // Start all our messageChannels (unless the thread pool size is
        // infinity).
        if (sipStack.threadPoolSize != -1) {
            for (int i = 0; i < sipStack.threadPoolSize; i++) {
                UDPMessageChannel channel = new UDPMessageChannel(sipStack,
                        this);
                this.messageChannels.add(channel);
            }
        }

        // Somebody asked us to exit if isRunning is set to false.
        while (this.isRunning) {
            try {
                int bufsize = sock.getReceiveBufferSize();
                byte message[] = new byte[bufsize];
                DatagramPacket packet = new DatagramPacket(message, bufsize);
                sock.receive(packet);

                // Take one consistent reading of the queue length and let
                // the congestion control decide on it.
                if (shouldDropIncoming(queuedMessageCount())) {
                    continue;
                }

                if (sipStack.threadPoolSize != -1) {
                    // Note: the only condition watched for by threads
                    // synchronizing on the messageQueue member is that it is
                    // not empty. As soon as you introduce some other
                    // condition you will have to call notifyAll instead of
                    // notify below.
                    synchronized (this.messageQueue) {
                        this.messageQueue.addLast(packet);
                        this.messageQueue.notify();
                    }
                } else {
                    new UDPMessageChannel(sipStack, this, packet);
                }
            } catch (SocketException ex) {
                if (sipStack.isLoggingEnabled())
                    getSIPStack().logWriter
                            .logDebug("UDPMessageProcessor: Stopping");
                // The notifyAll should be in a synchronized block.
                // ( bug report by Niklas Uhrberg ).
                haltAndWakeWaiters();
            } catch (IOException ex) {
                // Wake the waiters here too; otherwise threads blocked on
                // the queue monitor are never told that the loop has ended.
                haltAndWakeWaiters();
                ex.printStackTrace();
                if (sipStack.isLoggingEnabled())
                    getSIPStack().logWriter
                            .logDebug("UDPMessageProcessor: Got an IO Exception");
            } catch (Exception ex) {
                if (sipStack.isLoggingEnabled())
                    getSIPStack().logWriter
                            .logDebug("UDPMessageProcessor: Unexpected Exception - quitting");
                // Record the shutdown before delegating to the error handler
                // so the state is consistent whatever the handler does.
                haltAndWakeWaiters();
                InternalErrorHandler.handleException(ex);
                return;
            }
        }
    }

    /**
     * Returns the current number of queued packets, read while holding the
     * messageQueue monitor (the same monitor used when packets are enqueued).
     */
    private int queuedMessageCount() {
        synchronized (this.messageQueue) {
            return this.messageQueue.size();
        }
    }

    /**
     * Simplistic congestion control. Accepts the packet when the queue length
     * is at most LOWAT, drops it when the length is at least HIGHWAT, and in
     * between drops it with a probability that rises linearly from 0 to 1 as
     * the length goes from LOWAT to HIGHWAT.
     *
     * TODO -- penalize spammers by looking at the source port and IP address.
     *
     * @param queueLength
     *            number of packets currently queued.
     * @return true if the packet just received should be discarded.
     */
    private boolean shouldDropIncoming(int queueLength) {
        if (queueLength >= HIGHWAT) {
            if (sipStack.logWriter.isLoggingEnabled()) {
                sipStack.logWriter
                        .logDebug("Dropping message -- queue length exceeded");
            }
            return true;
        }

        if (queueLength > LOWAT) {
            // Drop probability, linear in the range 0 to 1.
            float threshold = ((float) (queueLength - LOWAT))
                    / ((float) (HIGHWAT - LOWAT));
            if (Math.random() > 1.0 - threshold) {
                if (sipStack.logWriter.isLoggingEnabled()) {
                    // Report the probability with which the packet was
                    // actually dropped, i.e. threshold.
                    sipStack.logWriter
                            .logDebug("Dropping message with probability "
                                    + threshold);
                }
                return true;
            }
        }
        return false;
    }

    /**
     * Clears the running flag and wakes every thread waiting on the
     * messageQueue monitor.
     */
    private void haltAndWakeWaiters() {
        synchronized (this.messageQueue) {
            this.isRunning = false;
            this.messageQueue.notifyAll();
        }
    }

    /**
     * Shut down the message processor. Close the socket for receiving incoming
     * messages.
     */
    public void stop() {
        synchronized (this.messageQueue) {
            this.isRunning = false;
            this.messageQueue.notifyAll();
            sock.close();
        }
    }

    /**
     * Return the transport string.
     *
     * @return the transport string
     */
    public String getTransport() {
        return "udp";
    }

    /**
     * Returns the stack.
     *
     * @return my sip stack.
     */
    public SIPTransactionStack getSIPStack() {
        return sipStack;
    }

    /**
     * Create and return new UDPMessageChannel for the given host/port.
     *
     * @param targetHostPort
     *            host and port of the peer.
     * @return a new UDPMessageChannel for that peer.
     */
    public MessageChannel createMessageChannel(HostPort targetHostPort)
            throws UnknownHostException {
        return new UDPMessageChannel(targetHostPort.getInetAddress(),
                targetHostPort.getPort(), sipStack, this);
    }

    /**
     * Create and return new UDPMessageChannel for the given address and port.
     *
     * @param host
     *            address of the peer.
     * @param port
     *            port of the peer.
     * @return a new UDPMessageChannel for that peer.
     */
    public MessageChannel createMessageChannel(InetAddress host, int port)
            throws IOException {
        return new UDPMessageChannel(host, port, sipStack, this);
    }

    /**
     * Default target port for UDP
     */
    public int getDefaultTargetPort() {
        return 5060;
    }

    /**
     * UDP is not a secure protocol.
     */
    public boolean isSecure() {
        return false;
    }

    /**
     * UDP can handle a message as large as the MAX_DATAGRAM_SIZE.
     */
    public int getMaximumMessageSize() {
        return MAX_DATAGRAM_SIZE;
    }

    /**
     * Return true if there are any messages still waiting in the message
     * queue.
     */
    public boolean inUse() {
        synchronized (messageQueue) {
            return messageQueue.size() != 0;
        }
    }

}