001/*
002 * #%L
003 * Netarchivesuite - common
004 * %%
005 * Copyright (C) 2005 - 2018 The Royal Danish Library, 
006 *             the National Library of France and the Austrian National Library.
007 * %%
008 * This program is free software: you can redistribute it and/or modify
009 * it under the terms of the GNU Lesser General Public License as
010 * published by the Free Software Foundation, either version 2.1 of the
011 * License, or (at your option) any later version.
012 * 
013 * This program is distributed in the hope that it will be useful,
014 * but WITHOUT ANY WARRANTY; without even the implied warranty of
015 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
016 * GNU General Lesser Public License for more details.
017 * 
018 * You should have received a copy of the GNU General Lesser Public
019 * License along with this program.  If not, see
020 * <http://www.gnu.org/licenses/lgpl-2.1.html>.
021 * #L%
022 */
023package dk.netarkivet.common.distribute;
024
025import java.util.Calendar;
026import java.util.Collections;
027import java.util.HashMap;
028import java.util.Map;
029import java.util.concurrent.locks.ReentrantReadWriteLock;
030
031import javax.jms.Connection;
032import javax.jms.ConnectionFactory;
033import javax.jms.Destination;
034import javax.jms.ExceptionListener;
035import javax.jms.JMSException;
036import javax.jms.Message;
037import javax.jms.MessageConsumer;
038import javax.jms.MessageListener;
039import javax.jms.MessageProducer;
040import javax.jms.ObjectMessage;
041import javax.jms.Queue;
042import javax.jms.QueueBrowser;
043import javax.jms.QueueSession;
044import javax.jms.Session;
045
046import org.slf4j.Logger;
047import org.slf4j.LoggerFactory;
048
049import dk.netarkivet.common.CommonSettings;
050import dk.netarkivet.common.exceptions.ArgumentNotValid;
051import dk.netarkivet.common.exceptions.IOFailure;
052import dk.netarkivet.common.exceptions.PermissionDenied;
053import dk.netarkivet.common.utils.CleanupHook;
054import dk.netarkivet.common.utils.CleanupIF;
055import dk.netarkivet.common.utils.Settings;
056import dk.netarkivet.common.utils.TimeUtils;
057
058/**
059 * Handles the communication with a JMS broker. Note on Thread-safety: the methods and fields of JMSConnection are not
060 * accessed by multiple threads (though JMSConnection itself creates threads). Thus no synchronization is needed on
061 * methods and fields of JMSConnection. A shutdown hook is also added, which closes the connection. Class JMSConnection
062 * is now also a exceptionhandler for the JMS Connections
063 */
064public abstract class JMSConnection implements ExceptionListener, CleanupIF {
065
066    /** The log. */
067    private static final Logger log = LoggerFactory.getLogger(JMSConnection.class);
068
069    /** Separator used in the consumer key. Separates the ChannelName from the MessageListener.toString(). */
070    protected static final String CONSUMER_KEY_SEPARATOR = "##";
071
072    /** The number to times to (re)try whenever a JMSException is thrown. */
073    static final int JMS_MAXTRIES = Settings.getInt(CommonSettings.JMS_BROKER_RETRIES);
074
075    /** The JMS Connection. */
076    protected Connection connection;
077
078    /**
079     * The Session handling messages sent to / received from the NetarchiveSuite queues and topics.
080     */
081    protected Session session;
082
083    /** Map for caching message producers. */
084    protected final Map<String, MessageProducer> producers = Collections
085            .synchronizedMap(new HashMap<String, MessageProducer>());
086
087    /** Map for caching message consumers (topic-subscribers and queue-receivers). */
088    protected final Map<String, MessageConsumer> consumers = Collections
089            .synchronizedMap(new HashMap<String, MessageConsumer>());
090
091    /** Map for caching message listeners (topic-subscribers and queue-receivers). */
092    protected final Map<String, MessageListener> listeners = Collections
093            .synchronizedMap(new HashMap<String, MessageListener>());
094
095    /**
096     * Lock for the connection. Locked for read on adding/removing listeners and sending messages. Locked for write when
097     * connection, releasing and reconnecting.
098     */
099    protected final ReentrantReadWriteLock connectionLock = new ReentrantReadWriteLock();
100
101    /** Shutdown hook that closes the JMS connection. */
102    protected Thread closeHook;
103
104    /** Singleton pattern is be used for this class. This is the one and only instance. */
105    protected static JMSConnection instance;
106
107    /**
108     * Should be implemented according to a specific JMS broker.
109     *
110     * @return QueueConnectionFactory
111     * @throws JMSException If unable to get QueueConnectionFactory
112     */
113    protected abstract ConnectionFactory getConnectionFactory() throws JMSException;
114
115    /**
116     * Should be implemented according to a specific JMS broker.
117     *
118     * @param destinationName the name of the wanted Queue
119     * @return The destination. Note that the implementation should make sure that this is a Queue or a Topic, as
120     * required by the NetarchiveSuite design. {@link Channels#isTopic(String)}
121     * @throws JMSException If unable to get a destination.
122     */
123    protected abstract Destination getDestination(String destinationName) throws JMSException;
124
125    /**
126     * Exceptionhandler for the JMSConnection. Implemented according to a specific JMS broker. Should try to reconnect
127     * if at all possible.
128     *
129     * @param e a JMSException
130     */
131    public abstract void onException(JMSException e);
132
133    /** Class constructor. */
134    protected JMSConnection() {
135    }
136
137    /**
138     * Initializes the JMS connection. Creates and starts connection and session. Adds a shutdown hook that closes down
139     * JMSConnection. Adds this object as ExceptionListener for the connection.
140     *
141     * @throws IOFailure if initialization fails.
142     */
143    protected void initConnection() throws IOFailure {
144        log.debug("Initializing a JMS connection {}", getClass().getName());
145
146        connectionLock.writeLock().lock();
147        try {
148            int tries = 0;
149            JMSException lastException = null;
150            boolean operationSuccessful = false;
151            while (!operationSuccessful && tries < JMS_MAXTRIES) {
152                ++tries;
153                try {
154                    establishConnectionAndSession();
155                    operationSuccessful = true;
156                } catch (JMSException e) {
157                    closeConnection();
158                    log.debug("Connect failed (try {})", tries, e);
159                    lastException = e;
160                    if (tries < JMS_MAXTRIES) {
161                        log.debug("Will sleep a while before trying to connect again");
162                        TimeUtils.exponentialBackoffSleep(tries, Calendar.MINUTE);
163                    }
164                }
165            }
166            if (!operationSuccessful) {
167                log.warn("Could not initialize JMS connection {}", getClass(), lastException);
168                cleanup();
169                throw new IOFailure("Could not initialize JMS connection " + getClass(), lastException);
170            }
171            closeHook = new CleanupHook(this);
172            Runtime.getRuntime().addShutdownHook(closeHook);
173        } finally {
174            connectionLock.writeLock().unlock();
175        }
176    }
177
178    /**
179     * Submit an object to the destination queue. This method shouldn't be overridden. Override the method sendMessage
180     * to change functionality.
181     *
182     * @param msg The NetarkivetMessage to send to the destination queue (null not allowed)
183     * @throws ArgumentNotValid if nMsg is null.
184     * @throws IOFailure if the operation failed.
185     */
186    public void send(NetarkivetMessage msg) {
187        ArgumentNotValid.checkNotNull(msg, "msg");
188        log.trace("Sending message ({}) to {}", msg.toString(), msg.getTo());
189        sendMessage(msg, msg.getTo());
190    }
191
192    /**
193     * Sends a message msg to the channel defined by the parameter to - NOT the channel defined in the message.
194     *
195     * @param msg Message to be sent
196     * @param to The destination channel
197     */
198    public final void resend(NetarkivetMessage msg, ChannelID to) {
199        ArgumentNotValid.checkNotNull(msg, "msg");
200        ArgumentNotValid.checkNotNull(to, "to");
201        log.trace("Resending message ({}) to {}", msg.toString(), to.getName());
202        sendMessage(msg, to);
203    }
204
205    /**
206     * Submit an object to the reply queue.
207     *
208     * @param msg The NetarkivetMessage to send to the reply queue (null not allowed)
209     * @throws ArgumentNotValid if nMsg is null.
210     * @throws PermissionDenied if message nMsg has not been sent yet.
211     * @throws IOFailure if unable to reply.
212     */
213    public final void reply(NetarkivetMessage msg) {
214        ArgumentNotValid.checkNotNull(msg, "msg");
215        log.trace("Reply on message ({}) to {}", msg.toString(), msg.getReplyTo().getName());
216        if (!msg.hasBeenSent()) {
217            throw new PermissionDenied("Message has not been sent yet");
218        }
219        sendMessage(msg, msg.getReplyTo());
220    }
221
222    /**
223     * Method adds a listener to the given queue or topic.
224     *
225     * @param mq the messagequeue to listen to
226     * @param ml the messagelistener
227     * @throws IOFailure if the operation failed.
228     */
229    public void setListener(ChannelID mq, MessageListener ml) throws IOFailure {
230        ArgumentNotValid.checkNotNull(mq, "ChannelID mq");
231        ArgumentNotValid.checkNotNull(ml, "MessageListener ml");
232        setListener(mq.getName(), ml);
233    }
234
235    /**
236     * Removes the specified MessageListener from the given queue or topic.
237     *
238     * @param mq the given queue or topic
239     * @param ml a messagelistener
240     * @throws IOFailure On network trouble
241     */
242    public void removeListener(ChannelID mq, MessageListener ml) throws IOFailure {
243        ArgumentNotValid.checkNotNull(mq, "ChannelID mq");
244        ArgumentNotValid.checkNotNull(ml, "MessageListener ml");
245        removeListener(ml, mq.getName());
246    }
247
248    /**
249     * Creates a QueueBrowser object to peek at the messages on the specified queue.
250     *
251     * @param queueID The ChannelID for a specified queue.
252     * @return A new QueueBrowser instance with access to the specified queue
253     * @throws JMSException If unable to create the specified queue browser
254     */
255    public QueueBrowser createQueueBrowser(ChannelID queueID) throws JMSException {
256        ArgumentNotValid.checkNotNull(queueID, "ChannelID queueID");
257        Queue queue = getQueueSession().createQueue(queueID.getName());
258        return getQueueSession().createBrowser(queue);
259    }
260
261    /**
262     * Provides a QueueSession instance. Functionality for retrieving a <code>QueueSession</code> object isen't
263     * available on the generic <code>JMSConnectionFactory</code>
264     *
265     * @return A <code>QueueSession</code> object connected to the current JMS broker
266     * @throws JMSException Failure to retrieve the <code>QueueBrowser</code> JMS Browser
267     */
268    public abstract QueueSession getQueueSession() throws JMSException;
269
270    /**
271     * Clean up. Remove close connection, remove shutdown hook and null the instance.
272     */
273    public void cleanup() {
274        connectionLock.writeLock().lock();
275        try {
276            // Remove shutdown hook
277            log.info("Starting cleanup");
278            try {
279                if (closeHook != null) {
280                    Runtime.getRuntime().removeShutdownHook(closeHook);
281                }
282            } catch (IllegalStateException e) {
283                // Okay, it just means we are already shutting down.
284            }
285            closeHook = null;
286            // Close session
287            closeConnection();
288            // Clear list of listeners
289            listeners.clear();
290            instance = null;
291            log.info("Cleanup finished");
292        } finally {
293            connectionLock.writeLock().unlock();
294        }
295    }
296
297    /**
298     * Close connection, session and listeners. Will ignore trouble, and simply log it.
299     */
300    private void closeConnection() {
301        // Close terminates all pending message received on the
302        // connection's session's consumers.
303        if (connection != null) { // close connection
304            try {
305                connection.close();
306            } catch (JMSException e) {
307                // Just ignore it
308                log.warn("Error closing JMS Connection.", e);
309            }
310        }
311        connection = null;
312        session = null;
313        consumers.clear();
314        producers.clear();
315    }
316
317    /**
318     * Unwraps a NetarkivetMessage from an ObjectMessage.
319     *
320     * @param msg a javax.jms.ObjectMessage
321     * @return a NetarkivetMessage
322     * @throws ArgumentNotValid when msg in valid or format of JMS Object message is invalid
323     */
324    public static NetarkivetMessage unpack(Message msg) throws ArgumentNotValid {
325        ArgumentNotValid.checkNotNull(msg, "msg");
326
327        ObjectMessage objMsg;
328        try {
329            objMsg = (ObjectMessage) msg;
330        } catch (ClassCastException e) {
331            log.warn("Invalid message type: {}", msg.getClass());
332            throw new ArgumentNotValid("Invalid message type: " + msg.getClass());
333        }
334
335        NetarkivetMessage netMsg;
336        String classname = "Unknown class"; // for error reporting purposes
337        try {
338            classname = objMsg.getObject().getClass().getName();
339            netMsg = (NetarkivetMessage) objMsg.getObject();
340            // Note: Id is only updated if the message does not already have an
341            // id. On unpack, this means the first time the message is received.
342
343            // FIXME Fix for NAS-2043 doesn't seem to work
344            // String randomID = UUID.randomUUID().toString();
345            // netMsg.updateId(randomID);
346
347            netMsg.updateId(msg.getJMSMessageID());
348        } catch (ClassCastException e) {
349            log.warn("Invalid message type: {}", classname, e);
350            throw new ArgumentNotValid("Invalid message type: " + classname, e);
351        } catch (Exception e) {
352            String message = "Message invalid. Unable to unpack message: " + classname;
353            log.warn(message, e);
354            throw new ArgumentNotValid(message, e);
355        }
356        log.trace("Unpacked message '{}'", netMsg);
357        return netMsg;
358    }
359
360    /**
361     * Submit an ObjectMessage to the destination channel.
362     *
363     * @param nMsg the NetarkivetMessage to be wrapped and send as an ObjectMessage
364     * @param to the destination channel
365     * @throws IOFailure if message failed to be sent.
366     */
367    protected void sendMessage(NetarkivetMessage nMsg, ChannelID to) throws IOFailure {
368        Exception lastException = null;
369        boolean operationSuccessful = false;
370        int tries = 0;
371
372        while (!operationSuccessful && tries < JMS_MAXTRIES) {
373            ++tries;
374            try {
375                doSend(nMsg, to);
376                operationSuccessful = true;
377            } catch (JMSException e) {
378                log.debug("Send failed (try {})", tries, e);
379                lastException = e;
380                if (tries < JMS_MAXTRIES) {
381                    onException(e);
382                    log.debug("Will sleep a while before trying to send again");
383                    TimeUtils.exponentialBackoffSleep(tries, Calendar.MINUTE);
384                }
385            } catch (Exception e) {
386                log.debug("Send failed (try {})", tries, e);
387                lastException = e;
388                if (tries < JMS_MAXTRIES) {
389                    reconnect();
390                    log.debug("Will sleep a while before trying to send again");
391                    TimeUtils.exponentialBackoffSleep(tries, Calendar.MINUTE);
392                }
393            }
394        }
395        if (!operationSuccessful) {
396            log.warn("Send failed", lastException);
397            throw new IOFailure("Send failed.", lastException);
398        }
399    }
400
401    /**
402     * Do a reconnect to the JMSbroker. Does absolutely nothing, if already in the process of reconnecting.
403     */
404    protected void reconnect() {
405        if (!connectionLock.writeLock().tryLock()) {
406            log.debug("Reconnection already in progress. Do nothing");
407            return;
408        }
409        try {
410            log.info("Trying to reconnect to jmsbroker");
411
412            boolean operationSuccessful = false;
413            Exception lastException = null;
414            int tries = 0;
415            while (!operationSuccessful && tries < JMS_MAXTRIES) {
416                ++tries;
417                try {
418                    doReconnect();
419                    operationSuccessful = true;
420                } catch (Exception e) {
421                    lastException = e;
422                    log.debug("Reconnect failed (try {})", tries, e);
423                    if (tries < JMS_MAXTRIES) {
424                        log.debug("Will sleep a while before trying to reconnect again");
425                        TimeUtils.exponentialBackoffSleep(tries, Calendar.MINUTE);
426                    }
427                }
428            }
429            if (!operationSuccessful) {
430                log.warn("Reconnect to JMS broker failed", lastException);
431                closeConnection();
432            }
433        } finally {
434            // Tell everybody, that we are not trying to reconnect any longer
435            connectionLock.writeLock().unlock();
436        }
437    }
438
439    /**
440     * Helper method for getting the right producer for a queue or topic.
441     *
442     * @param queueName The name of the channel
443     * @return The producer for that channel. A new one is created, if none exists.
444     * @throws JMSException If a new producer cannot be created.
445     */
446    private MessageProducer getProducer(String queueName) throws JMSException {
447        // Check if producer is in cache
448        // If it is not, it is created and stored in cache:
449        MessageProducer producer = producers.get(queueName);
450        if (producer == null) {
451            producer = getSession().createProducer(getDestination(queueName));
452            producers.put(queueName, producer);
453        }
454        return producer;
455    }
456
457    /**
458     * Get the session. Will try reconnecting if session is null.
459     *
460     * @return The session.
461     * @throws IOFailure if no session is available, and reconnect does not help.
462     */
463    private Session getSession() {
464        if (session == null) {
465            reconnect();
466        }
467        if (session == null) {
468            throw new IOFailure("Session not available");
469        }
470        return session;
471    }
472
473    /**
474     * Helper method for getting the right consumer for a queue or topic, and message listener.
475     *
476     * @param channelName The name of the channel
477     * @param ml The message listener to add as listener to the channel
478     * @return The producer for that channel. A new one is created, if none exists.
479     * @throws JMSException If a new producer cannot be created.
480     */
481    private MessageConsumer getConsumer(String channelName, MessageListener ml) throws JMSException {
482        String key = getConsumerKey(channelName, ml);
483        MessageConsumer consumer = consumers.get(key);
484        if (consumer == null) {
485            consumer = getSession().createConsumer(getDestination(channelName));
486            consumers.put(key, consumer);
487            listeners.put(key, ml);
488        }
489        return consumer;
490    }
491
492    /**
493     * Generate a consumerkey based on the given channel name and messageListener.
494     *
495     * @param channel Channel name
496     * @param messageListener a messageListener
497     * @return the generated consumerkey.
498     */
499    protected static String getConsumerKey(String channel, MessageListener messageListener) {
500        return channel + CONSUMER_KEY_SEPARATOR + messageListener;
501    }
502
503    /**
504     * Get the channelName embedded in a consumerKey.
505     *
506     * @param consumerKey a consumerKey
507     * @return name of channel embedded in a consumerKey
508     */
509    private static String getChannelName(String consumerKey) {
510        // assumes argument consumerKey was created using metod getConsumerKey()
511        return consumerKey.split(CONSUMER_KEY_SEPARATOR)[0];
512    }
513
514    /**
515     * Helper method to establish one Connection and associated Session.
516     *
517     * @throws JMSException If some JMS error occurred during the creation of the required JMS connection and session
518     */
519    private void establishConnectionAndSession() throws JMSException {
520        // Establish a queue connection and a session
521        connection = getConnectionFactory().createConnection();
522        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
523        connection.setExceptionListener(this);
524        connection.start();
525    }
526
527    /**
528     * Sends an ObjectMessage on a queue destination.
529     *
530     * @param msg the NetarkivetMessage to be wrapped and send as an ObjectMessage.
531     * @param to the destination topic.
532     * @throws JMSException if message failed to be sent.
533     */
534    private void doSend(NetarkivetMessage msg, ChannelID to) throws JMSException {
535        connectionLock.readLock().lock();
536        try {
537            ObjectMessage message = getSession().createObjectMessage(msg);
538            synchronized (msg) {
539                getProducer(to.getName()).send(message);
540                // Note: Id is only updated if the message does not already have
541                // an id. This ensures that resent messages keep the same ID
542                // TODO Is it always OK for resent messages to keep the same ID
543
544                // FIXME Solution for NAS-2043 doesn't work; rolled back
545                // String randomID = UUID.randomUUID().toString();
546                // msg.updateId(randomID);
547                msg.updateId(message.getJMSMessageID());
548
549            }
550        } finally {
551            connectionLock.readLock().unlock();
552        }
553        log.trace("Sent message '{}'", msg.toString());
554    }
555
556    /**
557     * Method adds a listener to the given queue or topic.
558     *
559     * @param channelName the messagequeue to listen to
560     * @param ml the messagelistener
561     * @throws IOFailure if the operation failed.
562     */
563    private void setListener(String channelName, MessageListener ml) {
564        log.debug("Adding {} as listener to {}", ml.toString(), channelName);
565        String errMsg = "JMS-error - could not add Listener to queue/topic: " + channelName;
566
567        int tries = 0;
568        boolean operationSuccessful = false;
569        Exception lastException = null;
570        while (!operationSuccessful && tries < JMS_MAXTRIES) {
571            ++tries;
572            try {
573                connectionLock.readLock().lock();
574                try {
575                    getConsumer(channelName, ml).setMessageListener(ml);
576                } finally {
577                    connectionLock.readLock().unlock();
578                }
579                operationSuccessful = true;
580            } catch (JMSException e) {
581                lastException = e;
582                log.debug("Set listener failed (try {})", tries, e);
583                if (tries < JMS_MAXTRIES) {
584                    onException(e);
585                    log.debug("Will sleep a while before trying to set listener again");
586                    TimeUtils.exponentialBackoffSleep(tries, Calendar.MINUTE);
587                }
588            } catch (Exception e) {
589                lastException = e;
590                log.debug("Set listener failed (try {})", tries, e);
591                if (tries < JMS_MAXTRIES) {
592                    reconnect();
593                    log.debug("Will sleep a while before trying to set listener again");
594                    TimeUtils.exponentialBackoffSleep(tries, Calendar.MINUTE);
595                }
596            }
597        }
598
599        if (!operationSuccessful) {
600            log.warn(errMsg, lastException);
601            throw new IOFailure(errMsg, lastException);
602        }
603    }
604
605    /**
606     * Remove a messagelistener from a channel (a queue or a topic).
607     *
608     * @param ml A specific MessageListener
609     * @param channelName a channelname
610     */
611    private void removeListener(MessageListener ml, String channelName) {
612        String errMsg = "JMS-error - could not remove Listener from queue/topic: " + channelName;
613        int tries = 0;
614        Exception lastException = null;
615        boolean operationSuccessful = false;
616
617        log.info("Removing listener from channel '{}'", channelName);
618        while (!operationSuccessful && tries < JMS_MAXTRIES) {
619            try {
620                ++tries;
621                connectionLock.readLock().lock();
622                try {
623                    MessageConsumer messageConsumer = getConsumer(channelName, ml);
624                    messageConsumer.close();
625                    consumers.remove(getConsumerKey(channelName, ml));
626                    listeners.remove(getConsumerKey(channelName, ml));
627                } finally {
628                    connectionLock.readLock().unlock();
629                }
630                operationSuccessful = true;
631            } catch (JMSException e) {
632                lastException = e;
633                log.debug("Remove listener failed (try {})", tries, e);
634                onException(e);
635                log.debug("Will and sleep a while before trying to remove listener again");
636                TimeUtils.exponentialBackoffSleep(tries, Calendar.MINUTE);
637            } catch (Exception e) {
638                lastException = e;
639                log.debug("Remove  listener failed (try {})", tries, e);
640                reconnect();
641                log.debug("Will and sleep a while before trying to remove listener again");
642                TimeUtils.exponentialBackoffSleep(tries, Calendar.MINUTE);
643            }
644        }
645        if (!operationSuccessful) {
646            log.warn(errMsg, lastException);
647            throw new IOFailure(errMsg, lastException);
648        }
649    }
650
651    /**
652     * Reconnect to JMSBroker and reestablish session. Resets senders and publishers.
653     *
654     * @throws JMSException If unable to reconnect to JMSBroker and/or reestablish sessions
655     */
656    private void doReconnect() throws JMSException {
657        closeConnection();
658        establishConnectionAndSession();
659        // Add listeners already stored in the consumers map
660        log.debug("Re-add listeners");
661        for (Map.Entry<String, MessageListener> listener : listeners.entrySet()) {
662            setListener(getChannelName(listener.getKey()), listener.getValue());
663        }
664        log.info("Reconnect successful");
665    }
666
667}