Package dk.netarkivet.common.distribute
Class JMSConnection
- java.lang.Object
- dk.netarkivet.common.distribute.JMSConnection
- All Implemented Interfaces:
CleanupIF, javax.jms.ExceptionListener
- Direct Known Subclasses:
JMSConnectionMockupMQ, JMSConnectionSunMQ
public abstract class JMSConnection extends Object implements javax.jms.ExceptionListener, CleanupIF
Handles the communication with a JMS broker. Note on thread-safety: the methods and fields of JMSConnection are not accessed by multiple threads (though JMSConnection itself creates threads), so no synchronization is needed on them. A shutdown hook is also added, which closes the connection. JMSConnection also acts as the exception handler for the JMS connection.
-
-
Field Summary
Fields (Modifier and Type, Field, Description)
protected Thread closeHook - Shutdown hook that closes the JMS connection.
protected javax.jms.Connection connection - The JMS Connection.
protected ReentrantReadWriteLock connectionLock - Lock for the connection.
protected static String CONSUMER_KEY_SEPARATOR - Separator used in the consumer key.
protected Map<String,javax.jms.MessageConsumer> consumers - Map for caching message consumers (topic-subscribers and queue-receivers).
protected static JMSConnection instance - The singleton pattern is used for this class.
protected Map<String,javax.jms.MessageListener> listeners - Map for caching message listeners (topic-subscribers and queue-receivers).
protected Map<String,javax.jms.MessageProducer> producers - Map for caching message producers.
protected javax.jms.Session session - The Session handling messages sent to / received from the NetarchiveSuite queues and topics.
-
Constructor Summary
Constructors (Modifier, Constructor, Description)
protected JMSConnection() - Class constructor.
-
Method Summary
Methods (Modifier and Type, Method, Description)
void cleanup() - Clean up.
javax.jms.QueueBrowser createQueueBrowser(ChannelID queueID) - Creates a QueueBrowser object to peek at the messages on the specified queue.
javax.jms.QueueBrowser createQueueBrowser(ChannelID queueID, javax.jms.QueueSession QSession) - Creates a QueueBrowser object to peek at the messages on the specified queue.
protected abstract javax.jms.ConnectionFactory getConnectionFactory() - Should be implemented according to a specific JMS broker.
protected static String getConsumerKey(String channel, javax.jms.MessageListener messageListener) - Generates a consumer key based on the given channel name and messageListener.
protected abstract javax.jms.Destination getDestination(String destinationName) - Should be implemented according to a specific JMS broker.
abstract javax.jms.QueueSession getQueueSession() - Provides a QueueSession instance.
protected void initConnection() - Initializes the JMS connection.
abstract void onException(javax.jms.JMSException e) - Exception handler for the JMSConnection.
protected void reconnect() - Reconnects to the JMS broker.
void removeListener(ChannelID mq, javax.jms.MessageListener ml) - Removes the specified MessageListener from the given queue or topic.
void reply(NetarkivetMessage msg) - Submits an object to the reply queue.
void resend(NetarkivetMessage msg, ChannelID to) - Sends a message msg to the channel defined by the parameter to, NOT the channel defined in the message.
void send(NetarkivetMessage msg) - Submits an object to the destination queue.
protected void sendMessage(NetarkivetMessage nMsg, ChannelID to) - Submits an ObjectMessage to the destination channel.
void setListener(ChannelID mq, javax.jms.MessageListener ml) - Adds a listener to the given queue or topic.
static NetarkivetMessage unpack(javax.jms.Message msg) - Unwraps a NetarkivetMessage from an ObjectMessage.
-
-
-
Field Detail
-
CONSUMER_KEY_SEPARATOR
protected static final String CONSUMER_KEY_SEPARATOR
Separator used in the consumer key. Separates the channel name from the MessageListener.toString().
- See Also:
- Constant Field Values
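To illustrate how a key of this shape could be composed, the following is a minimal sketch rather than the NetarchiveSuite implementation. The class name ConsumerKeySketch and the separator value "##" are assumptions made for the example; the real value is the one listed under Constant Field Values. The listener is typed as Object so that the sketch compiles without the JMS API on the classpath.

```java
// Hypothetical sketch; not the NetarchiveSuite implementation.
final class ConsumerKeySketch {
    // Assumed value, for illustration only.
    static final String CONSUMER_KEY_SEPARATOR = "##";

    private ConsumerKeySketch() {
    }

    /** Joins the channel name and the listener's toString() with the separator. */
    static String getConsumerKey(String channel, Object messageListener) {
        return channel + CONSUMER_KEY_SEPARATOR + messageListener;
    }
}
```

Because the key depends on toString(), two listeners whose string forms are equal would map to the same key on the same channel in this sketch.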
-
connection
protected javax.jms.Connection connection
The JMS Connection.
-
session
protected javax.jms.Session session
The Session handling messages sent to / received from the NetarchiveSuite queues and topics.
-
producers
protected final Map<String,javax.jms.MessageProducer> producers
Map for caching message producers.
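A cache of this kind is typically used in a get-or-create fashion. The sketch below shows that pattern under stated assumptions: the class ProducerCacheSketch, the use of the channel name as map key, and the factory function are all hypothetical, and the generic type P stands in for javax.jms.MessageProducer.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical get-or-create cache; P stands in for javax.jms.MessageProducer.
class ProducerCacheSketch<P> {
    private final Map<String, P> producers = new HashMap<>();
    private final Function<String, P> factory;

    ProducerCacheSketch(Function<String, P> factory) {
        this.factory = factory;
    }

    /** Returns the cached producer for the channel, creating it on first use. */
    P getProducer(String channelName) {
        return producers.computeIfAbsent(channelName, factory);
    }

    int size() {
        return producers.size();
    }
}
```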
-
consumers
protected final Map<String,javax.jms.MessageConsumer> consumers
Map for caching message consumers (topic-subscribers and queue-receivers).
-
listeners
protected final Map<String,javax.jms.MessageListener> listeners
Map for caching message listeners (topic-subscribers and queue-receivers).
-
connectionLock
protected final ReentrantReadWriteLock connectionLock
Lock for the connection. Locked for read when adding/removing listeners and sending messages. Locked for write when connecting, releasing and reconnecting.
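The read/write split described here can be sketched with java.util.concurrent.locks.ReentrantReadWriteLock. The class ConnectionLockSketch and its methods are hypothetical stand-ins: sending is represented by appending to a list and reconnecting by incrementing a counter, so this shows only the locking discipline, not the real JMS calls.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical stand-in class: it only demonstrates the locking discipline.
class ConnectionLockSketch {
    private final ReentrantReadWriteLock connectionLock = new ReentrantReadWriteLock();
    private final List<String> sent = new ArrayList<>();
    private int connectCount = 0;

    /** Sending takes the read lock, so several senders may run concurrently. */
    void send(String message) {
        connectionLock.readLock().lock();
        try {
            synchronized (sent) { // read-lock holders can overlap, so guard the list itself
                sent.add(message);
            }
        } finally {
            connectionLock.readLock().unlock();
        }
    }

    /** Reconnecting takes the write lock, which excludes all senders. */
    void reconnect() {
        connectionLock.writeLock().lock();
        try {
            connectCount++; // placeholder for closing and re-creating the connection
        } finally {
            connectionLock.writeLock().unlock();
        }
    }

    int sentCount() {
        synchronized (sent) {
            return sent.size();
        }
    }

    int getConnectCount() {
        connectionLock.readLock().lock();
        try {
            return connectCount;
        } finally {
            connectionLock.readLock().unlock();
        }
    }
}
```

With this arrangement a reconnect waits until in-flight sends have released the read lock, and new sends wait until the reconnect has finished.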
-
closeHook
protected Thread closeHook
Shutdown hook that closes the JMS connection.
-
instance
protected static JMSConnection instance
The singleton pattern is used for this class. This is the one and only instance.
-
-
Method Detail
-
getConnectionFactory
protected abstract javax.jms.ConnectionFactory getConnectionFactory() throws javax.jms.JMSException
Should be implemented according to a specific JMS broker.
- Returns:
- QueueConnectionFactory
- Throws:
javax.jms.JMSException
- If unable to get QueueConnectionFactory
-
getDestination
protected abstract javax.jms.Destination getDestination(String destinationName) throws javax.jms.JMSException
Should be implemented according to a specific JMS broker.
- Parameters:
destinationName
- the name of the wanted Queue
- Returns:
- The destination. Note that the implementation should make sure that this is a Queue or a Topic, as required by the NetarchiveSuite design (see Channels.isTopic(String)).
- Throws:
javax.jms.JMSException
- If unable to get a destination.
-
onException
public abstract void onException(javax.jms.JMSException e)
Exception handler for the JMSConnection. Implemented according to a specific JMS broker. Should try to reconnect if at all possible.
- Specified by:
onException in interface javax.jms.ExceptionListener
- Parameters:
e
- a JMSException
-
initConnection
protected void initConnection() throws IOFailure
Initializes the JMS connection. Creates and starts connection and session. Adds a shutdown hook that closes down JMSConnection. Adds this object as ExceptionListener for the connection.
- Throws:
IOFailure
- if initialization fails.
-
send
public void send(NetarkivetMessage msg)
Submits an object to the destination queue. This method shouldn't be overridden. Override the method sendMessage to change functionality.
- Parameters:
msg
- The NetarkivetMessage to send to the destination queue (null not allowed)
- Throws:
ArgumentNotValid
- if msg is null.
IOFailure
- if the operation failed.
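The division of labour between send and sendMessage resembles the template-method pattern: a fixed public entry point validates and delegates, while subclasses customise the protected step. The sketch below illustrates that idea only. MessageSketch and SenderSketch are hypothetical stand-ins for NetarkivetMessage and JMSConnection, IllegalArgumentException stands in for ArgumentNotValid, and marking send as final is a choice made for the sketch (the signature documented above is not final).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for NetarkivetMessage: carries only a target channel name.
class MessageSketch {
    final String to;

    MessageSketch(String to) {
        this.to = to;
    }
}

// Hypothetical stand-in for JMSConnection.
class SenderSketch {
    final List<String> log = new ArrayList<>();

    /** Fixed entry point: validates the argument, then delegates to sendMessage. */
    final void send(MessageSketch msg) {
        if (msg == null) {
            throw new IllegalArgumentException("msg must not be null");
        }
        sendMessage(msg, msg.to);
    }

    /** Extension point: subclasses override this to change how messages go out. */
    protected void sendMessage(MessageSketch msg, String to) {
        log.add("sent to " + to);
    }
}
```

A test double, for example, could override sendMessage to record messages in memory while callers keep using send unchanged.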
-
resend
public final void resend(NetarkivetMessage msg, ChannelID to)
Sends a message msg to the channel defined by the parameter to, NOT the channel defined in the message.
- Parameters:
msg
- Message to be sent
to
- The destination channel
-
reply
public final void reply(NetarkivetMessage msg)
Submits an object to the reply queue.
- Parameters:
msg
- The NetarkivetMessage to send to the reply queue (null not allowed)
- Throws:
ArgumentNotValid
- if msg is null.
PermissionDenied
- if message msg has not been sent yet.
IOFailure
- if unable to reply.
-
setListener
public void setListener(ChannelID mq, javax.jms.MessageListener ml) throws IOFailure
Adds a listener to the given queue or topic.
- Parameters:
mq
- the message queue to listen to
ml
- the message listener
- Throws:
IOFailure
- if the operation failed.
-
removeListener
public void removeListener(ChannelID mq, javax.jms.MessageListener ml) throws IOFailure
Removes the specified MessageListener from the given queue or topic.
- Parameters:
mq
- the given queue or topic
ml
- a message listener
- Throws:
IOFailure
- On network trouble
-
createQueueBrowser
public javax.jms.QueueBrowser createQueueBrowser(ChannelID queueID) throws javax.jms.JMSException
Creates a QueueBrowser object to peek at the messages on the specified queue.
- Parameters:
queueID
- The ChannelID for a specified queue.
- Returns:
- A new QueueBrowser instance with access to the specified queue
- Throws:
javax.jms.JMSException
- If unable to create the specified queue browser
-
createQueueBrowser
public javax.jms.QueueBrowser createQueueBrowser(ChannelID queueID, javax.jms.QueueSession QSession) throws javax.jms.JMSException
Creates a QueueBrowser object to peek at the messages on the specified queue.
- Parameters:
queueID
- The ChannelID for a specified queue.
QSession
- The QueueSession to use.
- Returns:
- A new QueueBrowser instance with access to the specified queue
- Throws:
javax.jms.JMSException
- If unable to create the specified queue browser
-
getQueueSession
public abstract javax.jms.QueueSession getQueueSession() throws javax.jms.JMSException
Provides a QueueSession instance. Functionality for retrieving a QueueSession object isn't available on the generic JMS ConnectionFactory.
- Returns:
- A QueueSession object connected to the current JMS broker
- Throws:
javax.jms.JMSException
- Failure to retrieve the QueueSession
-
cleanup
public void cleanup()
Clean up. Closes the connection, removes the shutdown hook and nulls the instance.
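The interplay between the singleton instance, the shutdown hook and cleanup can be sketched with the JDK's Runtime.addShutdownHook and Runtime.removeShutdownHook. This is a minimal illustration under stated assumptions, not the actual implementation: CleanupSketch, getInstance and the boolean open flag are hypothetical, and the flag stands in for a real javax.jms.Connection.

```java
// Hypothetical stand-in; the boolean flag replaces a real JMS connection.
class CleanupSketch {
    static CleanupSketch instance;

    private Thread closeHook;
    private boolean open;

    /** Lazily creates the single instance and initializes its "connection". */
    static synchronized CleanupSketch getInstance() {
        if (instance == null) {
            instance = new CleanupSketch();
            instance.initConnection();
        }
        return instance;
    }

    /** Opens the "connection" and registers a hook that closes it on JVM exit. */
    void initConnection() {
        open = true;
        closeHook = new Thread(this::close, "close-hook");
        Runtime.getRuntime().addShutdownHook(closeHook);
    }

    private void close() {
        open = false;
    }

    /** Closes the "connection", removes the shutdown hook and nulls the instance. */
    void cleanup() {
        close();
        if (closeHook != null) {
            try {
                Runtime.getRuntime().removeShutdownHook(closeHook);
            } catch (IllegalStateException e) {
                // The JVM is already shutting down; the hook can no longer be removed.
            }
            closeHook = null;
        }
        synchronized (CleanupSketch.class) {
            instance = null;
        }
    }

    boolean isOpen() {
        return open;
    }
}
```

Removing the hook in cleanup keeps the JVM from holding a reference to a connection that has already been closed, and nulling the instance lets the next getInstance call build a fresh one.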
-
unpack
public static NetarkivetMessage unpack(javax.jms.Message msg) throws ArgumentNotValid
Unwraps a NetarkivetMessage from an ObjectMessage.
- Parameters:
msg
- a javax.jms.ObjectMessage
- Returns:
- a NetarkivetMessage
- Throws:
ArgumentNotValid
- when msg is invalid or the format of the JMS ObjectMessage is invalid
-
sendMessage
protected void sendMessage(NetarkivetMessage nMsg, ChannelID to) throws IOFailure
Submits an ObjectMessage to the destination channel.
- Parameters:
nMsg
- the NetarkivetMessage to be wrapped and sent as an ObjectMessage
to
- the destination channel
- Throws:
IOFailure
- if message failed to be sent.
-
reconnect
protected void reconnect()
Reconnects to the JMS broker. Does nothing if a reconnect is already in progress.
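One common way to make a reconnect a no-op while another is in progress is an atomic flag. The sketch below assumes that approach; whether JMSConnection uses exactly this mechanism is not stated here. ReconnectSketch, doReconnect and getAttempts are hypothetical names introduced for the example.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch of a reconnect that ignores overlapping calls.
class ReconnectSketch {
    private final AtomicBoolean reconnecting = new AtomicBoolean(false);
    private int attempts = 0;

    /** Runs one reconnect; returns immediately if one is already running. */
    void reconnect() {
        if (!reconnecting.compareAndSet(false, true)) {
            return; // another reconnect is in progress
        }
        try {
            attempts++;
            doReconnect();
        } finally {
            reconnecting.set(false);
        }
    }

    /** Placeholder for closing the old connection and opening a new one. */
    protected void doReconnect() {
    }

    int getAttempts() {
        return attempts;
    }
}
```

The guard matters because an exception listener may fire several times for the same broker outage; only the first call does the work.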
-
-