dk.netarkivet.common.distribute
Class JMSConnection

java.lang.Object
  extended by dk.netarkivet.common.distribute.JMSConnection
All Implemented Interfaces:
CleanupIF, javax.jms.ExceptionListener
Direct Known Subclasses:
JMSConnectionSunMQ

public abstract class JMSConnection
extends java.lang.Object
implements javax.jms.ExceptionListener, CleanupIF

Handles the communication with a JMS broker. Note on thread safety: the methods and fields of JMSConnection are not accessed by multiple threads (though JMSConnection itself creates threads), so no synchronization is needed on them. A shutdown hook is also added, which closes the connection. JMSConnection also acts as the exception handler for the JMS connections.


Field Summary
(package private) static java.lang.String CONSUMER_KEY_SEPARATOR
          Separator used in the consumerkey.
protected  java.util.Map<java.lang.String,javax.jms.MessageConsumer> consumers
          Map for caching consumers (topic subscribers and queue receivers).
(package private) static int JMS_MAXTRIES
          The number of times to (re)try whenever a JMSException is thrown.
protected static org.apache.commons.logging.Log log
          The log.
protected  javax.jms.QueueConnection myQConn
          The QueueConnection.
protected  javax.jms.QueueConnectionFactory myQConnFactory
          The factory used to create QueueConnections.
protected  javax.jms.QueueSession myQSess
          The Session handling messages sent to / received from the NetarchiveSuite queues.
protected  javax.jms.TopicConnection myTConn
          The TopicConnection.
protected  javax.jms.TopicConnectionFactory myTConnFactory
          The factory used to create TopicConnections.
protected  javax.jms.TopicSession myTSess
          The Session handling messages sent to / received from the NetarchiveSuite topics.
protected  java.util.Map<java.lang.String,javax.jms.TopicPublisher> publishers
          Map for caching Topic publishers.
protected static java.util.concurrent.atomic.AtomicBoolean reconnectInProgress
          Flag indicating whether a reconnect is in progress.
protected  java.util.Map<java.lang.String,javax.jms.QueueSender> senders
          Map for caching Queue senders.
 
Constructor Summary
protected JMSConnection()
          Class constructor.
 
Method Summary
 void cleanup()
          Clean up.
protected  void close()
          Close all connections to the JMS broker.
protected  void establishConnectionAndSessions()
          Helper method to establish one QueueConnection and associated Session, and one TopicConnection and associated Session.
protected static java.lang.String getChannelName(java.lang.String consumerKey)
          Get the channelName embedded in a consumerKey.
 java.lang.String getHost()
          Get the hostname for the JMSBroker.
 java.lang.String getPort()
          Get the port for the JMSBroker.
protected abstract  javax.jms.Queue getQueue(java.lang.String queueName)
          Should be implemented according to a specific JMS broker.
protected abstract  javax.jms.QueueConnectionFactory getQueueConnectionFactory()
          Should be implemented according to a specific JMS broker.
protected abstract  javax.jms.Topic getTopic(java.lang.String topicName)
          Should be implemented according to a specific JMS broker.
protected abstract  javax.jms.TopicConnectionFactory getTopicConnectionFactory()
          Should be implemented according to a specific JMS broker.
protected  void initConnection()
          Initializes the JMS connection.
abstract  void onException(javax.jms.JMSException e)
          Exception handler for the JMSConnection.
 java.util.List<javax.jms.Message> removeAllMessages(ChannelID mq)
          Deprecated. Method does not work; see bugs 422 and 423.
 void removeListener(ChannelID mq, javax.jms.MessageListener ml)
          Removes the specified MessageListener from the given queue or topic.
 void reply(NetarkivetMessage nMsg)
          Submit an object to the reply queue.
 void resend(NetarkivetMessage msg, ChannelID to)
          Sends a message msg to the channel defined by the parameter to - NOT the channel defined in the message.
 void send(NetarkivetMessage nMsg)
          Submit an object to the destination queue.
protected  void sendMessage(NetarkivetMessage nMsg, ChannelID to)
          Submit an ObjectMessage to the destination channel.
 void setListener(ChannelID mq, javax.jms.MessageListener ml)
          Method adds a listener to the given queue or topic.
static NetarkivetMessage unpack(javax.jms.Message msg)
          Unwraps a NetarkivetMessage from an ObjectMessage.
 
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
 

Field Detail

log

protected static final org.apache.commons.logging.Log log
The log.


myQConnFactory

protected javax.jms.QueueConnectionFactory myQConnFactory
The factory used to create QueueConnections.


myQConn

protected javax.jms.QueueConnection myQConn
The QueueConnection.


myQSess

protected javax.jms.QueueSession myQSess
The Session handling messages sent to / received from the NetarchiveSuite queues.


myTConnFactory

protected javax.jms.TopicConnectionFactory myTConnFactory
The factory used to create TopicConnections.


myTConn

protected javax.jms.TopicConnection myTConn
The TopicConnection.


myTSess

protected javax.jms.TopicSession myTSess
The Session handling messages sent to / received from the NetarchiveSuite topics.


reconnectInProgress

protected static java.util.concurrent.atomic.AtomicBoolean reconnectInProgress
Flag indicating whether a reconnect is in progress.
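
One common way to use such an AtomicBoolean is as a guard, so that only one caller performs a reconnect at a time. The following is a minimal sketch of that idea, not the NetarchiveSuite implementation; the class ReconnectGuardSketch and the method tryReconnect are hypothetical names introduced for illustration.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical illustration; not part of JMSConnection.
class ReconnectGuardSketch {
    // Plays the same role as JMSConnection.reconnectInProgress.
    static final AtomicBoolean reconnectInProgress = new AtomicBoolean(false);

    /**
     * Runs the reconnect action unless another reconnect is already running.
     *
     * @return true if this call performed the reconnect, false if it was skipped
     */
    static boolean tryReconnect(Runnable reconnectAction) {
        // compareAndSet flips false -> true atomically, so only one caller wins.
        if (!reconnectInProgress.compareAndSet(false, true)) {
            return false;
        }
        try {
            reconnectAction.run();
            return true;
        } finally {
            // Always clear the flag, even if the reconnect attempt throws.
            reconnectInProgress.set(false);
        }
    }
}
```

A caller that loses the race simply returns and can rely on the reconnect already under way.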


senders

protected java.util.Map<java.lang.String,javax.jms.QueueSender> senders
Map for caching Queue senders.


publishers

protected java.util.Map<java.lang.String,javax.jms.TopicPublisher> publishers
Map for caching Topic publishers.


consumers

protected java.util.Map<java.lang.String,javax.jms.MessageConsumer> consumers
Map for caching consumers (topic subscribers and queue receivers). It exists so that all consumers can be closed when JMSConnection.close() is called.
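
The caching idea can be sketched with JDK types only. In this hypothetical example, AutoCloseable stands in for javax.jms.MessageConsumer, and the class and method names are invented for illustration. How the real close() reports failures may differ; this sketch merely counts them.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration; AutoCloseable stands in for javax.jms.MessageConsumer.
class ConsumerCacheSketch {
    private final Map<String, AutoCloseable> consumers = new LinkedHashMap<>();

    /** Remembers a consumer under its consumer key so it can be closed later. */
    void cache(String consumerKey, AutoCloseable consumer) {
        consumers.put(consumerKey, consumer);
    }

    /** Closes every cached consumer, empties the cache, and returns the number of failures. */
    int closeAll() {
        int failures = 0;
        for (AutoCloseable consumer : consumers.values()) {
            try {
                consumer.close();
            } catch (Exception e) {
                failures++; // keep going so one failure does not leave the rest open
            }
        }
        consumers.clear();
        return failures;
    }

    int size() {
        return consumers.size();
    }
}
```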


CONSUMER_KEY_SEPARATOR

static final java.lang.String CONSUMER_KEY_SEPARATOR
Separator used in the consumerkey. Separates the ChannelName from the MessageListener.toString().

See Also:
Constant Field Values

JMS_MAXTRIES

static final int JMS_MAXTRIES
The number of times to (re)try whenever a JMSException is thrown.

See Also:
Constant Field Values
Constructor Detail

JMSConnection

protected JMSConnection()
Class constructor. Sets the broker address and port number using values taken from settings.

Method Detail

initConnection

protected void initConnection()
Initializes the JMS connection. Creating and starting connections to queues and topics. Adds the JMSConnection to a shutdown hook. Adds this object as ExceptionListener for the queue and topic connections.

Throws:
IOFailure - if initialization fails.
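
Registering a cleanup action as a JVM shutdown hook can be sketched as follows. This is a minimal illustration using only java.lang.Runtime, under the assumption that the hook simply calls cleanup(); the Cleanup interface is a hypothetical stand-in for CleanupIF, and the actual registration mechanism used by initConnection may differ.

```java
// Hypothetical illustration of adding a cleanup action to a shutdown hook.
class ShutdownHookSketch {
    /** Stand-in for CleanupIF (assumed to expose a single cleanup() method). */
    interface Cleanup {
        void cleanup();
    }

    /** Registers a JVM shutdown hook that calls cleanup() and returns the hook thread. */
    static Thread registerCleanupHook(Cleanup target) {
        Thread hook = new Thread(target::cleanup, "cleanup-hook");
        Runtime.getRuntime().addShutdownHook(hook);
        return hook;
    }

    public static void main(String[] args) {
        registerCleanupHook(() -> System.out.println("closing connections"));
        System.out.println("running");
        // The hook runs when the JVM exits normally.
    }
}
```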

establishConnectionAndSessions

protected void establishConnectionAndSessions()
                                       throws javax.jms.JMSException
Helper method to establish one QueueConnection and associated Session, and one TopicConnection and associated Session.

Throws:
javax.jms.JMSException

getQueueConnectionFactory

protected abstract javax.jms.QueueConnectionFactory getQueueConnectionFactory()
                                                                       throws javax.jms.JMSException
Should be implemented according to a specific JMS broker.

Returns:
QueueConnectionFactory
Throws:
javax.jms.JMSException

getTopicConnectionFactory

protected abstract javax.jms.TopicConnectionFactory getTopicConnectionFactory()
                                                                       throws javax.jms.JMSException
Should be implemented according to a specific JMS broker.

Returns:
TopicConnectionFactory
Throws:
javax.jms.JMSException

getQueue

protected abstract javax.jms.Queue getQueue(java.lang.String queueName)
                                     throws javax.jms.JMSException
Should be implemented according to a specific JMS broker.

Parameters:
queueName - the name of the wanted Queue
Returns:
Queue
Throws:
javax.jms.JMSException

getTopic

protected abstract javax.jms.Topic getTopic(java.lang.String topicName)
                                     throws javax.jms.JMSException
Should be implemented according to a specific JMS broker.

Parameters:
topicName - The name of the wanted Topic
Returns:
Topic
Throws:
javax.jms.JMSException

send

public final void send(NetarkivetMessage nMsg)
Submit an object to the destination queue. This method cannot be overridden. Override the method sendMessage to change functionality.

Parameters:
nMsg - The NetarkivetMessage to send to the destination queue (null not allowed)
Throws:
ArgumentNotValid - if nMsg is null.
IOFailure - if the operation failed.
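
The relationship between the final send method and the overridable sendMessage method is an instance of the template-method pattern. The sketch below illustrates it with hypothetical stand-in types: Msg replaces NetarkivetMessage, a plain String replaces ChannelID, and IllegalArgumentException replaces ArgumentNotValid. It assumes the destination is taken from the message itself.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of a final send() delegating to an overridable sendMessage().
class SendTemplateSketch {
    /** Stand-in for NetarkivetMessage: carries its own destination. */
    static class Msg {
        final String to;
        final String body;

        Msg(String to, String body) {
            this.to = to;
            this.body = body;
        }
    }

    static class BaseConnection {
        /** Fixed entry point: validates the argument, then delegates. */
        public final void send(Msg msg) {
            if (msg == null) {
                throw new IllegalArgumentException("msg must not be null");
            }
            sendMessage(msg, msg.to);
        }

        /** Extension point: subclasses override this to change how delivery happens. */
        protected void sendMessage(Msg msg, String to) {
            System.out.println("sending '" + msg.body + "' to " + to);
        }
    }

    /** Changes delivery by overriding only sendMessage(). */
    static class RecordingConnection extends BaseConnection {
        final List<String> sent = new ArrayList<>();

        @Override
        protected void sendMessage(Msg msg, String to) {
            sent.add(to + ":" + msg.body);
        }
    }
}
```

Because send is final, the null check cannot be bypassed by a subclass; only the delivery step varies.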

reply

public void reply(NetarkivetMessage nMsg)
Submit an object to the reply queue. The send is attempted up to JMS_MAXTRIES times before giving up.

Parameters:
nMsg - The NetarkivetMessage to send to the reply queue (null not allowed)
Throws:
ArgumentNotValid - if nMsg is null.
PermissionDenied - if message nMsg has not been sent yet.
IOFailure - if unable to reply.
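
A bounded retry loop of the kind described above might look like the following sketch. The value of MAX_TRIES is an assumption for illustration (the real JMS_MAXTRIES value is not shown here), RuntimeException stands in for JMSException, and IllegalStateException stands in for IOFailure; RetrySketch and withRetries are hypothetical names.

```java
import java.util.function.Supplier;

// Hypothetical illustration of retrying an operation a fixed number of times.
class RetrySketch {
    // Assumed value for illustration only.
    static final int MAX_TRIES = 3;

    /** Calls the action until it succeeds or MAX_TRIES attempts have failed. */
    static <T> T withRetries(Supplier<T> action) {
        RuntimeException last = null;
        for (int attempt = 1; attempt <= MAX_TRIES; attempt++) {
            try {
                return action.get();
            } catch (RuntimeException e) {
                last = e; // remember the failure and try again
            }
        }
        // All attempts failed: report the last cause to the caller.
        throw new IllegalStateException("gave up after " + MAX_TRIES + " attempts", last);
    }
}
```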

sendMessage

protected void sendMessage(NetarkivetMessage nMsg,
                           ChannelID to)
                    throws IOFailure
Submit an ObjectMessage to the destination channel.

Parameters:
nMsg - the NetarkivetMessage to be wrapped and sent as an ObjectMessage
to - the destination channel
Throws:
IOFailure - if message failed to be sent.

resend

public final void resend(NetarkivetMessage msg,
                         ChannelID to)
Sends a message msg to the channel defined by the parameter to - NOT the channel defined in the message.

Parameters:
msg - Message to be sent
to - The destination channel

close

protected void close()
Close all connections to the JMS broker.

Throws:
IOFailure - if closing one of the internal connection objects failed.

cleanup

public void cleanup()
Clean up.

Specified by:
cleanup in interface CleanupIF

unpack

public static NetarkivetMessage unpack(javax.jms.Message msg)
                                throws ArgumentNotValid
Unwraps a NetarkivetMessage from an ObjectMessage.

Parameters:
msg - a javax.jms.ObjectMessage
Returns:
a NetarkivetMessage
Throws:
ArgumentNotValid - if msg is invalid or the reply from the JMS server is invalid

setListener

public void setListener(ChannelID mq,
                        javax.jms.MessageListener ml)
                 throws IOFailure
Method adds a listener to the given queue or topic.

Parameters:
mq - the queue or topic to listen to
ml - the message listener to add
Throws:
IOFailure - if the operation failed.

removeListener

public void removeListener(ChannelID mq,
                           javax.jms.MessageListener ml)
                    throws IOFailure
Removes the specified MessageListener from the given queue or topic.

Parameters:
mq - the given queue or topic
ml - the message listener to remove
Throws:
IOFailure

removeAllMessages

public java.util.List<javax.jms.Message> removeAllMessages(ChannelID mq)
Deprecated. Method does not work; see bugs 422 and 423.

Remove all messages waiting in a queue/topic. When this method ends, there are no messages on the queue/topic (though some can still arrive afterwards). For a queue, this does not ensure that all messages are removed by this call, since other processes could still get in and take some. For a topic, this does not ensure that nobody will receive these messages. FIXME: This method does not work at all. See bugs #422 and #423.

Parameters:
mq - The queue/topic to remove messages from
Returns:
A list of all messages removed from the queue/topic.

getChannelName

protected static java.lang.String getChannelName(java.lang.String consumerKey)
Get the channelName embedded in a consumerKey.

Parameters:
consumerKey - a consumerKey
Returns:
name of channel embedded in a consumerKey
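
Since a consumer key joins the channel name and MessageListener.toString() with CONSUMER_KEY_SEPARATOR, extracting the channel name amounts to taking everything before the separator. The sketch below illustrates this under stated assumptions: the separator value "##" is assumed for illustration (the actual value is listed under Constant Field Values), and makeConsumerKey is a hypothetical helper, not a documented method of this class.

```java
// Hypothetical illustration of building and parsing a consumer key.
class ConsumerKeySketch {
    // Assumed separator value, for illustration only.
    static final String SEPARATOR = "##";

    /** Joins a channel name and a listener's toString() into one key. */
    static String makeConsumerKey(String channelName, Object listener) {
        return channelName + SEPARATOR + listener.toString();
    }

    /** Returns the part of the key before the first separator. */
    static String getChannelName(String consumerKey) {
        int idx = consumerKey.indexOf(SEPARATOR);
        if (idx < 0) {
            throw new IllegalArgumentException("not a consumer key: " + consumerKey);
        }
        return consumerKey.substring(0, idx);
    }
}
```

Note that this only round-trips correctly when the channel name itself does not contain the separator.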

getHost

public java.lang.String getHost()
Get the hostname for the JMSBroker.

Returns:
the hostname for the JMSBroker

getPort

public java.lang.String getPort()
Get the port for the JMSBroker.

Returns:
the port for the JMSBroker

onException

public abstract void onException(javax.jms.JMSException e)
Exception handler for the JMSConnection. Implemented according to a specific JMS broker.

Specified by:
onException in interface javax.jms.ExceptionListener
Parameters:
e - a JMSException