001/* 002 * #%L 003 * Netarchivesuite - common 004 * %% 005 * Copyright (C) 2005 - 2018 The Royal Danish Library, 006 * the National Library of France and the Austrian National Library. 007 * %% 008 * This program is free software: you can redistribute it and/or modify 009 * it under the terms of the GNU Lesser General Public License as 010 * published by the Free Software Foundation, either version 2.1 of the 011 * License, or (at your option) any later version. 012 * 013 * This program is distributed in the hope that it will be useful, 014 * but WITHOUT ANY WARRANTY; without even the implied warranty of 015 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 016 * GNU General Lesser Public License for more details. 017 * 018 * You should have received a copy of the GNU General Lesser Public 019 * License along with this program. If not, see 020 * <http://www.gnu.org/licenses/lgpl-2.1.html>. 021 * #L% 022 */ 023package dk.netarkivet.common.distribute; 024 025import java.util.Arrays; 026 027import javax.jms.Destination; 028import javax.jms.JMSException; 029import javax.jms.QueueConnection; 030import javax.jms.QueueSession; 031import javax.jms.Session; 032 033import org.slf4j.Logger; 034import org.slf4j.LoggerFactory; 035 036import com.sun.messaging.ConnectionConfiguration; 037import com.sun.messaging.Queue; 038import com.sun.messaging.Topic; 039 040import dk.netarkivet.common.exceptions.ArgumentNotValid; 041import dk.netarkivet.common.exceptions.IOFailure; 042import dk.netarkivet.common.utils.Settings; 043 044/** 045 * Handles the communication with a Sun JMS broker. 046 * <p> 047 * Methods are implemented to get a connection, as well as queues and topics. The error handling will try to reconnect 048 * on given error scenarios. 049 * <p> 050 * The warnings and errorcodes reported by Sun Message Queue 4.1 can be found in Appendix A Sun Java System Message 051 * Queue 4.1 Developer's Guide for Java Clients: http://docs.sun.com/app/docs/doc/819-7757/aeqgo?a=view 052 */ 053public class JMSConnectionSunMQ extends JMSConnection { 054 055 /** The log. */ 056 private static final Logger log = LoggerFactory.getLogger(JMSConnectionSunMQ.class); 057 058 /** The default place in classpath where the settings file can be found. */ 059 private static String DEFAULT_SETTINGS_CLASSPATH = "dk/netarkivet/common/distribute/JMSConnectionSunMQSettings.xml"; 060 061 /* 062 * The static initialiser is called when the class is loaded. It will add default values for all settings defined in 063 * this class, by loading them from a settings.xml file in classpath. 064 */ 065 static { 066 Settings.addDefaultClasspathSettings(DEFAULT_SETTINGS_CLASSPATH); 067 } 068 069 public static final String[] RECONNECT_ERRORCODES = {"C4000", // Packet acknowledgment failed 070 "C4001", // Write packet failed 071 "C4002", // Read packet failed 072 "C4003", // Connection timed out 073 "C4036", // Server error 074 "C4056", // Received goodbye from broker 075 "C4059", // Session is closed 076 "C4062", // Connection is closed 077 "C4063" // Consumer is closed 078 }; 079 080 // NOTE: The constants defining setting names below are left non-final on 081 // purpose! Otherwise, the static initialiser that loads default values 082 // will not run. 083 084 /** 085 * <b>settings.common.jms.broker</b>: <br> 086 * The JMS broker host contacted by the JMS connection. 087 */ 088 public static String JMS_BROKER_HOST = "settings.common.jms.broker"; 089 090 /** 091 * <b>settings.common.jms.port</b>: <br> 092 * The port the JMS connection should use. 093 */ 094 public static String JMS_BROKER_PORT = "settings.common.jms.port"; 095 096 private QueueConnection qConnection; 097 098 /** Constructor. */ 099 private JMSConnectionSunMQ() { 100 super(); 101 log.info("Creating instance of {}", getClass().getName()); 102 initConnection(); 103 } 104 105 /** 106 * Intialises an Open Message Queue JMS connection. 107 * 108 * @return A JMSConnection 109 * @throws IOFailure when connection to JMS broker failed 110 */ 111 public static synchronized JMSConnection getInstance() throws IOFailure { 112 if (instance == null) { 113 instance = new JMSConnectionSunMQ(); 114 } 115 return instance; 116 } 117 118 /** 119 * Returns a new QueueConnectionFactory. This is an SunMQ implementation of QueueConnectionFactory. 120 * <p> 121 * Notice: The return type is explicitly defined with package prefix to avoid name collision with 122 * javax.jms.QueueConnectionFactory 123 * 124 * @return QueueConnectionFactory 125 * @throws JMSException If unable to create a QueueConnectionfactory with the necessary properties: 126 * imqConsumerflowLimit set to 1, imqBrokerHostname and imqBrokerHostPort set to the values defined in our settings. 127 */ 128 protected com.sun.messaging.ConnectionFactory getConnectionFactory() throws JMSException { 129 log.info("Establishing SunMQ JMS Connection to '{}:{}'", Settings.get(JMS_BROKER_HOST), 130 Settings.getInt(JMS_BROKER_PORT)); 131 com.sun.messaging.ConnectionFactory cFactory = new com.sun.messaging.ConnectionFactory(); 132 cFactory.setProperty(ConnectionConfiguration.imqBrokerHostName, Settings.get(JMS_BROKER_HOST)); 133 cFactory.setProperty(ConnectionConfiguration.imqBrokerHostPort, Settings.get(JMS_BROKER_PORT)); 134 cFactory.setProperty(ConnectionConfiguration.imqConsumerFlowLimit, "1"); 135 return cFactory; 136 } 137 138 /** 139 * Returns an Queue or a Topic. This is an SunMQ implementation of Queue and Topic. The method depends on the JMS 140 * provider being configured to autocreate queues and topics. 141 * 142 * @param channelName the name of the queue or topic. 143 * @return A queue or topic depending on the channel name. 144 * @throws JMSException If unable to create the destination. 145 */ 146 protected Destination getDestination(String channelName) throws JMSException { 147 boolean isTopic = Channels.isTopic(channelName); 148 if (isTopic) { 149 return new Topic(channelName); 150 } else { 151 return new Queue(channelName); 152 } 153 } 154 155 /** Reset the singleton and close the connection by calling super(). */ 156 public void cleanup() { 157 synchronized (JMSConnectionSunMQ.class) { 158 instance = null; 159 super.cleanup(); 160 } 161 } 162 163 /** 164 * Exceptionhandler for the JMSConnection. Will try to reconnect on errors with error codes defined in the constant 165 * RECONNECT_ERRORCODES. 166 * 167 * @param e an JMSException 168 */ 169 public void onException(JMSException e) { 170 ArgumentNotValid.checkNotNull(e, "JMSException e"); 171 final String errorcode = e.getErrorCode(); 172 log.warn("JMSException with errorcode '{}' encountered:", errorcode, e); 173 174 if (Arrays.asList(RECONNECT_ERRORCODES).contains(errorcode)) { 175 reconnect(); 176 } else { 177 log.warn("Exception not handled. Don't know how to handle exceptions with errorcode {}", errorcode, e); 178 } 179 } 180 181 @Override 182 public synchronized QueueSession getQueueSession() throws JMSException { 183 if (qConnection == null) { 184 qConnection = getConnectionFactory().createQueueConnection(); 185 } 186 boolean transacted = false; 187 return qConnection.createQueueSession(transacted, Session.AUTO_ACKNOWLEDGE); 188 } 189 190}