/*
 * #%L
 * Netarchivesuite - common
 * %%
 * Copyright (C) 2005 - 2014 The Royal Danish Library, the Danish State and University Library,
 *             the National Library of France and the Austrian National Library.
 * %%
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License as
 * published by the Free Software Foundation, either version 2.1 of the
 * License, or (at your option) any later version.
 * 
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Lesser Public License for more details.
 * 
 * You should have received a copy of the GNU General Lesser Public
 * License along with this program.  If not, see
 * <http://www.gnu.org/licenses/lgpl-2.1.html>.
 * #L%
 */
023package dk.netarkivet.common.distribute;
024
025import java.util.Arrays;
026
027import javax.jms.Destination;
028import javax.jms.JMSException;
029import javax.jms.QueueConnection;
030import javax.jms.QueueSession;
031import javax.jms.Session;
032
033import org.slf4j.Logger;
034import org.slf4j.LoggerFactory;
035
036import com.sun.messaging.ConnectionConfiguration;
037import com.sun.messaging.Queue;
038import com.sun.messaging.Topic;
039
040import dk.netarkivet.common.exceptions.ArgumentNotValid;
041import dk.netarkivet.common.exceptions.IOFailure;
042import dk.netarkivet.common.utils.Settings;
043
044/**
045 * Handles the communication with a Sun JMS broker.
046 * <p>
047 * Methods are implemented to get a connection, as well as queues and topics. The error handling will try to reconnect
048 * on given error scenarios.
049 * <p>
050 * The warnings and errorcodes reported by Sun Message Queue 4.1 can be found in Appendix A Sun Java System Message
051 * Queue 4.1 Developer's Guide for Java Clients: http://docs.sun.com/app/docs/doc/819-7757/aeqgo?a=view
052 */
053public class JMSConnectionSunMQ extends JMSConnection {
054
055    /** The log. */
056    private static final Logger log = LoggerFactory.getLogger(JMSConnectionSunMQ.class);
057
058    /** The default place in classpath where the settings file can be found. */
059    private static String DEFAULT_SETTINGS_CLASSPATH = "dk/netarkivet/common/distribute/JMSConnectionSunMQSettings.xml";
060
061    /*
062     * The static initialiser is called when the class is loaded. It will add default values for all settings defined in
063     * this class, by loading them from a settings.xml file in classpath.
064     */
065    static {
066        Settings.addDefaultClasspathSettings(DEFAULT_SETTINGS_CLASSPATH);
067    }
068
069    public static final String[] RECONNECT_ERRORCODES = {"C4000", // Packet acknowledgment failed
070            "C4001", // Write packet failed
071            "C4002", // Read packet failed
072            "C4003", // Connection timed out
073            "C4036", // Server error
074            "C4056", // Received goodbye from broker
075            "C4059", // Session is closed
076            "C4062", // Connection is closed
077            "C4063" // Consumer is closed
078    };
079
080    // NOTE: The constants defining setting names below are left non-final on
081    // purpose! Otherwise, the static initialiser that loads default values
082    // will not run.
083
084    /**
085     * <b>settings.common.jms.broker</b>: <br>
086     * The JMS broker host contacted by the JMS connection.
087     */
088    public static String JMS_BROKER_HOST = "settings.common.jms.broker";
089
090    /**
091     * <b>settings.common.jms.port</b>: <br>
092     * The port the JMS connection should use.
093     */
094    public static String JMS_BROKER_PORT = "settings.common.jms.port";
095
096    private QueueConnection qConnection;
097
098    /** Constructor. */
099    private JMSConnectionSunMQ() {
100        super();
101        log.info("Creating instance of {}", getClass().getName());
102        initConnection();
103    }
104
105    /**
106     * Intialises an Open Message Queue JMS connection.
107     *
108     * @return A JMSConnection
109     * @throws IOFailure when connection to JMS broker failed
110     */
111    public static synchronized JMSConnection getInstance() throws IOFailure {
112        if (instance == null) {
113            instance = new JMSConnectionSunMQ();
114        }
115        return instance;
116    }
117
118    /**
119     * Returns a new QueueConnectionFactory. This is an SunMQ implementation of QueueConnectionFactory.
120     * <p>
121     * Notice: The return type is explicitly defined with package prefix to avoid name collision with
122     * javax.jms.QueueConnectionFactory
123     *
124     * @return QueueConnectionFactory
125     * @throws JMSException If unable to create a QueueConnectionfactory with the necessary properties:
126     * imqConsumerflowLimit set to 1, imqBrokerHostname and imqBrokerHostPort set to the values defined in our settings.
127     */
128    protected com.sun.messaging.ConnectionFactory getConnectionFactory() throws JMSException {
129        log.info("Establishing SunMQ JMS Connection to '{}:{}'", Settings.get(JMS_BROKER_HOST),
130                Settings.getInt(JMS_BROKER_PORT));
131        com.sun.messaging.ConnectionFactory cFactory = new com.sun.messaging.ConnectionFactory();
132        cFactory.setProperty(ConnectionConfiguration.imqBrokerHostName, Settings.get(JMS_BROKER_HOST));
133        cFactory.setProperty(ConnectionConfiguration.imqBrokerHostPort, Settings.get(JMS_BROKER_PORT));
134        cFactory.setProperty(ConnectionConfiguration.imqConsumerFlowLimit, "1");
135        return cFactory;
136    }
137
138    /**
139     * Returns an Queue or a Topic. This is an SunMQ implementation of Queue and Topic. The method depends on the JMS
140     * provider being configured to autocreate queues and topics.
141     *
142     * @param channelName the name of the queue or topic.
143     * @return A queue or topic depending on the channel name.
144     * @throws JMSException If unable to create the destination.
145     */
146    protected Destination getDestination(String channelName) throws JMSException {
147        boolean isTopic = Channels.isTopic(channelName);
148        if (isTopic) {
149            return new Topic(channelName);
150        } else {
151            return new Queue(channelName);
152        }
153    }
154
155    /** Reset the singleton and close the connection by calling super(). */
156    public void cleanup() {
157        synchronized (JMSConnectionSunMQ.class) {
158            instance = null;
159            super.cleanup();
160        }
161    }
162
163    /**
164     * Exceptionhandler for the JMSConnection. Will try to reconnect on errors with error codes defined in the constant
165     * RECONNECT_ERRORCODES.
166     *
167     * @param e an JMSException
168     */
169    public void onException(JMSException e) {
170        ArgumentNotValid.checkNotNull(e, "JMSException e");
171        final String errorcode = e.getErrorCode();
172        log.warn("JMSException with errorcode '{}' encountered:", errorcode, e);
173
174        if (Arrays.asList(RECONNECT_ERRORCODES).contains(errorcode)) {
175            reconnect();
176        } else {
177            log.warn("Exception not handled. Don't know how to handle exceptions with errorcode {}", errorcode, e);
178        }
179    }
180
181    @Override
182    public synchronized QueueSession getQueueSession() throws JMSException {
183        if (qConnection == null) {
184            qConnection = getConnectionFactory().createQueueConnection();
185        }
186        boolean transacted = false;
187        return qConnection.createQueueSession(transacted, Session.AUTO_ACKNOWLEDGE);
188    }
189
190}