001/* 002 * #%L 003 * Netarchivesuite - common 004 * %% 005 * Copyright (C) 2005 - 2018 The Royal Danish Library, 006 * the National Library of France and the Austrian National Library. 007 * %% 008 * This program is free software: you can redistribute it and/or modify 009 * it under the terms of the GNU Lesser General Public License as 010 * published by the Free Software Foundation, either version 2.1 of the 011 * License, or (at your option) any later version. 012 * 013 * This program is distributed in the hope that it will be useful, 014 * but WITHOUT ANY WARRANTY; without even the implied warranty of 015 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 016 * GNU General Lesser Public License for more details. 017 * 018 * You should have received a copy of the GNU General Lesser Public 019 * License along with this program. If not, see 020 * <http://www.gnu.org/licenses/lgpl-2.1.html>. 021 * #L% 022 */ 023package dk.netarkivet.common.distribute; 024 025import java.util.Calendar; 026import java.util.Collections; 027import java.util.HashMap; 028import java.util.Map; 029import java.util.concurrent.locks.ReentrantReadWriteLock; 030 031import javax.jms.Connection; 032import javax.jms.ConnectionFactory; 033import javax.jms.Destination; 034import javax.jms.ExceptionListener; 035import javax.jms.JMSException; 036import javax.jms.Message; 037import javax.jms.MessageConsumer; 038import javax.jms.MessageListener; 039import javax.jms.MessageProducer; 040import javax.jms.ObjectMessage; 041import javax.jms.Queue; 042import javax.jms.QueueBrowser; 043import javax.jms.QueueSession; 044import javax.jms.Session; 045 046import org.slf4j.Logger; 047import org.slf4j.LoggerFactory; 048 049import dk.netarkivet.common.CommonSettings; 050import dk.netarkivet.common.exceptions.ArgumentNotValid; 051import dk.netarkivet.common.exceptions.IOFailure; 052import dk.netarkivet.common.exceptions.PermissionDenied; 053import dk.netarkivet.common.utils.CleanupHook; 054import dk.netarkivet.common.utils.CleanupIF; 055import dk.netarkivet.common.utils.Settings; 056import dk.netarkivet.common.utils.TimeUtils; 057 058/** 059 * Handles the communication with a JMS broker. Note on Thread-safety: the methods and fields of JMSConnection are not 060 * accessed by multiple threads (though JMSConnection itself creates threads). Thus no synchronization is needed on 061 * methods and fields of JMSConnection. A shutdown hook is also added, which closes the connection. Class JMSConnection 062 * is now also a exceptionhandler for the JMS Connections 063 */ 064public abstract class JMSConnection implements ExceptionListener, CleanupIF { 065 066 /** The log. */ 067 private static final Logger log = LoggerFactory.getLogger(JMSConnection.class); 068 069 /** Separator used in the consumer key. Separates the ChannelName from the MessageListener.toString(). */ 070 protected static final String CONSUMER_KEY_SEPARATOR = "##"; 071 072 /** The number to times to (re)try whenever a JMSException is thrown. */ 073 static final int JMS_MAXTRIES = Settings.getInt(CommonSettings.JMS_BROKER_RETRIES); 074 075 /** The JMS Connection. */ 076 protected Connection connection; 077 078 /** 079 * The Session handling messages sent to / received from the NetarchiveSuite queues and topics. 080 */ 081 protected Session session; 082 083 /** Map for caching message producers. */ 084 protected final Map<String, MessageProducer> producers = Collections 085 .synchronizedMap(new HashMap<String, MessageProducer>()); 086 087 /** Map for caching message consumers (topic-subscribers and queue-receivers). */ 088 protected final Map<String, MessageConsumer> consumers = Collections 089 .synchronizedMap(new HashMap<String, MessageConsumer>()); 090 091 /** Map for caching message listeners (topic-subscribers and queue-receivers). */ 092 protected final Map<String, MessageListener> listeners = Collections 093 .synchronizedMap(new HashMap<String, MessageListener>()); 094 095 /** 096 * Lock for the connection. Locked for read on adding/removing listeners and sending messages. Locked for write when 097 * connection, releasing and reconnecting. 098 */ 099 protected final ReentrantReadWriteLock connectionLock = new ReentrantReadWriteLock(); 100 101 /** Shutdown hook that closes the JMS connection. */ 102 protected Thread closeHook; 103 104 /** Singleton pattern is be used for this class. This is the one and only instance. */ 105 protected static JMSConnection instance; 106 107 /** 108 * Should be implemented according to a specific JMS broker. 109 * 110 * @return QueueConnectionFactory 111 * @throws JMSException If unable to get QueueConnectionFactory 112 */ 113 protected abstract ConnectionFactory getConnectionFactory() throws JMSException; 114 115 /** 116 * Should be implemented according to a specific JMS broker. 117 * 118 * @param destinationName the name of the wanted Queue 119 * @return The destination. Note that the implementation should make sure that this is a Queue or a Topic, as 120 * required by the NetarchiveSuite design. {@link Channels#isTopic(String)} 121 * @throws JMSException If unable to get a destination. 122 */ 123 protected abstract Destination getDestination(String destinationName) throws JMSException; 124 125 /** 126 * Exceptionhandler for the JMSConnection. Implemented according to a specific JMS broker. Should try to reconnect 127 * if at all possible. 128 * 129 * @param e a JMSException 130 */ 131 public abstract void onException(JMSException e); 132 133 /** Class constructor. */ 134 protected JMSConnection() { 135 } 136 137 /** 138 * Initializes the JMS connection. Creates and starts connection and session. Adds a shutdown hook that closes down 139 * JMSConnection. Adds this object as ExceptionListener for the connection. 140 * 141 * @throws IOFailure if initialization fails. 142 */ 143 protected void initConnection() throws IOFailure { 144 log.debug("Initializing a JMS connection {}", getClass().getName()); 145 146 connectionLock.writeLock().lock(); 147 try { 148 int tries = 0; 149 JMSException lastException = null; 150 boolean operationSuccessful = false; 151 while (!operationSuccessful && tries < JMS_MAXTRIES) { 152 ++tries; 153 try { 154 establishConnectionAndSession(); 155 operationSuccessful = true; 156 } catch (JMSException e) { 157 closeConnection(); 158 log.debug("Connect failed (try {})", tries, e); 159 lastException = e; 160 if (tries < JMS_MAXTRIES) { 161 log.debug("Will sleep a while before trying to connect again"); 162 TimeUtils.exponentialBackoffSleep(tries, Calendar.MINUTE); 163 } 164 } 165 } 166 if (!operationSuccessful) { 167 log.warn("Could not initialize JMS connection {}", getClass(), lastException); 168 cleanup(); 169 throw new IOFailure("Could not initialize JMS connection " + getClass(), lastException); 170 } 171 closeHook = new CleanupHook(this); 172 Runtime.getRuntime().addShutdownHook(closeHook); 173 } finally { 174 connectionLock.writeLock().unlock(); 175 } 176 } 177 178 /** 179 * Submit an object to the destination queue. This method shouldn't be overridden. Override the method sendMessage 180 * to change functionality. 181 * 182 * @param msg The NetarkivetMessage to send to the destination queue (null not allowed) 183 * @throws ArgumentNotValid if nMsg is null. 184 * @throws IOFailure if the operation failed. 185 */ 186 public void send(NetarkivetMessage msg) { 187 ArgumentNotValid.checkNotNull(msg, "msg"); 188 log.trace("Sending message ({}) to {}", msg.toString(), msg.getTo()); 189 sendMessage(msg, msg.getTo()); 190 } 191 192 /** 193 * Sends a message msg to the channel defined by the parameter to - NOT the channel defined in the message. 194 * 195 * @param msg Message to be sent 196 * @param to The destination channel 197 */ 198 public final void resend(NetarkivetMessage msg, ChannelID to) { 199 ArgumentNotValid.checkNotNull(msg, "msg"); 200 ArgumentNotValid.checkNotNull(to, "to"); 201 log.trace("Resending message ({}) to {}", msg.toString(), to.getName()); 202 sendMessage(msg, to); 203 } 204 205 /** 206 * Submit an object to the reply queue. 207 * 208 * @param msg The NetarkivetMessage to send to the reply queue (null not allowed) 209 * @throws ArgumentNotValid if nMsg is null. 210 * @throws PermissionDenied if message nMsg has not been sent yet. 211 * @throws IOFailure if unable to reply. 212 */ 213 public final void reply(NetarkivetMessage msg) { 214 ArgumentNotValid.checkNotNull(msg, "msg"); 215 log.trace("Reply on message ({}) to {}", msg.toString(), msg.getReplyTo().getName()); 216 if (!msg.hasBeenSent()) { 217 throw new PermissionDenied("Message has not been sent yet"); 218 } 219 sendMessage(msg, msg.getReplyTo()); 220 } 221 222 /** 223 * Method adds a listener to the given queue or topic. 224 * 225 * @param mq the messagequeue to listen to 226 * @param ml the messagelistener 227 * @throws IOFailure if the operation failed. 228 */ 229 public void setListener(ChannelID mq, MessageListener ml) throws IOFailure { 230 ArgumentNotValid.checkNotNull(mq, "ChannelID mq"); 231 ArgumentNotValid.checkNotNull(ml, "MessageListener ml"); 232 setListener(mq.getName(), ml); 233 } 234 235 /** 236 * Removes the specified MessageListener from the given queue or topic. 237 * 238 * @param mq the given queue or topic 239 * @param ml a messagelistener 240 * @throws IOFailure On network trouble 241 */ 242 public void removeListener(ChannelID mq, MessageListener ml) throws IOFailure { 243 ArgumentNotValid.checkNotNull(mq, "ChannelID mq"); 244 ArgumentNotValid.checkNotNull(ml, "MessageListener ml"); 245 removeListener(ml, mq.getName()); 246 } 247 248 /** 249 * Creates a QueueBrowser object to peek at the messages on the specified queue. 250 * 251 * @param queueID The ChannelID for a specified queue. 252 * @return A new QueueBrowser instance with access to the specified queue 253 * @throws JMSException If unable to create the specified queue browser 254 */ 255 public QueueBrowser createQueueBrowser(ChannelID queueID) throws JMSException { 256 ArgumentNotValid.checkNotNull(queueID, "ChannelID queueID"); 257 Queue queue = getQueueSession().createQueue(queueID.getName()); 258 return getQueueSession().createBrowser(queue); 259 } 260 261 /** 262 * Provides a QueueSession instance. Functionality for retrieving a <code>QueueSession</code> object isen't 263 * available on the generic <code>JMSConnectionFactory</code> 264 * 265 * @return A <code>QueueSession</code> object connected to the current JMS broker 266 * @throws JMSException Failure to retrieve the <code>QueueBrowser</code> JMS Browser 267 */ 268 public abstract QueueSession getQueueSession() throws JMSException; 269 270 /** 271 * Clean up. Remove close connection, remove shutdown hook and null the instance. 272 */ 273 public void cleanup() { 274 connectionLock.writeLock().lock(); 275 try { 276 // Remove shutdown hook 277 log.info("Starting cleanup"); 278 try { 279 if (closeHook != null) { 280 Runtime.getRuntime().removeShutdownHook(closeHook); 281 } 282 } catch (IllegalStateException e) { 283 // Okay, it just means we are already shutting down. 284 } 285 closeHook = null; 286 // Close session 287 closeConnection(); 288 // Clear list of listeners 289 listeners.clear(); 290 instance = null; 291 log.info("Cleanup finished"); 292 } finally { 293 connectionLock.writeLock().unlock(); 294 } 295 } 296 297 /** 298 * Close connection, session and listeners. Will ignore trouble, and simply log it. 299 */ 300 private void closeConnection() { 301 // Close terminates all pending message received on the 302 // connection's session's consumers. 303 if (connection != null) { // close connection 304 try { 305 connection.close(); 306 } catch (JMSException e) { 307 // Just ignore it 308 log.warn("Error closing JMS Connection.", e); 309 } 310 } 311 connection = null; 312 session = null; 313 consumers.clear(); 314 producers.clear(); 315 } 316 317 /** 318 * Unwraps a NetarkivetMessage from an ObjectMessage. 319 * 320 * @param msg a javax.jms.ObjectMessage 321 * @return a NetarkivetMessage 322 * @throws ArgumentNotValid when msg in valid or format of JMS Object message is invalid 323 */ 324 public static NetarkivetMessage unpack(Message msg) throws ArgumentNotValid { 325 ArgumentNotValid.checkNotNull(msg, "msg"); 326 327 ObjectMessage objMsg; 328 try { 329 objMsg = (ObjectMessage) msg; 330 } catch (ClassCastException e) { 331 log.warn("Invalid message type: {}", msg.getClass()); 332 throw new ArgumentNotValid("Invalid message type: " + msg.getClass()); 333 } 334 335 NetarkivetMessage netMsg; 336 String classname = "Unknown class"; // for error reporting purposes 337 try { 338 classname = objMsg.getObject().getClass().getName(); 339 netMsg = (NetarkivetMessage) objMsg.getObject(); 340 // Note: Id is only updated if the message does not already have an 341 // id. On unpack, this means the first time the message is received. 342 343 // FIXME Fix for NAS-2043 doesn't seem to work 344 // String randomID = UUID.randomUUID().toString(); 345 // netMsg.updateId(randomID); 346 347 netMsg.updateId(msg.getJMSMessageID()); 348 } catch (ClassCastException e) { 349 log.warn("Invalid message type: {}", classname, e); 350 throw new ArgumentNotValid("Invalid message type: " + classname, e); 351 } catch (Exception e) { 352 String message = "Message invalid. Unable to unpack message: " + classname; 353 log.warn(message, e); 354 throw new ArgumentNotValid(message, e); 355 } 356 log.trace("Unpacked message '{}'", netMsg); 357 return netMsg; 358 } 359 360 /** 361 * Submit an ObjectMessage to the destination channel. 362 * 363 * @param nMsg the NetarkivetMessage to be wrapped and send as an ObjectMessage 364 * @param to the destination channel 365 * @throws IOFailure if message failed to be sent. 366 */ 367 protected void sendMessage(NetarkivetMessage nMsg, ChannelID to) throws IOFailure { 368 Exception lastException = null; 369 boolean operationSuccessful = false; 370 int tries = 0; 371 372 while (!operationSuccessful && tries < JMS_MAXTRIES) { 373 ++tries; 374 try { 375 doSend(nMsg, to); 376 operationSuccessful = true; 377 } catch (JMSException e) { 378 log.debug("Send failed (try {})", tries, e); 379 lastException = e; 380 if (tries < JMS_MAXTRIES) { 381 onException(e); 382 log.debug("Will sleep a while before trying to send again"); 383 TimeUtils.exponentialBackoffSleep(tries, Calendar.MINUTE); 384 } 385 } catch (Exception e) { 386 log.debug("Send failed (try {})", tries, e); 387 lastException = e; 388 if (tries < JMS_MAXTRIES) { 389 reconnect(); 390 log.debug("Will sleep a while before trying to send again"); 391 TimeUtils.exponentialBackoffSleep(tries, Calendar.MINUTE); 392 } 393 } 394 } 395 if (!operationSuccessful) { 396 log.warn("Send failed", lastException); 397 throw new IOFailure("Send failed.", lastException); 398 } 399 } 400 401 /** 402 * Do a reconnect to the JMSbroker. Does absolutely nothing, if already in the process of reconnecting. 403 */ 404 protected void reconnect() { 405 if (!connectionLock.writeLock().tryLock()) { 406 log.debug("Reconnection already in progress. Do nothing"); 407 return; 408 } 409 try { 410 log.info("Trying to reconnect to jmsbroker"); 411 412 boolean operationSuccessful = false; 413 Exception lastException = null; 414 int tries = 0; 415 while (!operationSuccessful && tries < JMS_MAXTRIES) { 416 ++tries; 417 try { 418 doReconnect(); 419 operationSuccessful = true; 420 } catch (Exception e) { 421 lastException = e; 422 log.debug("Reconnect failed (try {})", tries, e); 423 if (tries < JMS_MAXTRIES) { 424 log.debug("Will sleep a while before trying to reconnect again"); 425 TimeUtils.exponentialBackoffSleep(tries, Calendar.MINUTE); 426 } 427 } 428 } 429 if (!operationSuccessful) { 430 log.warn("Reconnect to JMS broker failed", lastException); 431 closeConnection(); 432 } 433 } finally { 434 // Tell everybody, that we are not trying to reconnect any longer 435 connectionLock.writeLock().unlock(); 436 } 437 } 438 439 /** 440 * Helper method for getting the right producer for a queue or topic. 441 * 442 * @param queueName The name of the channel 443 * @return The producer for that channel. A new one is created, if none exists. 444 * @throws JMSException If a new producer cannot be created. 445 */ 446 private MessageProducer getProducer(String queueName) throws JMSException { 447 // Check if producer is in cache 448 // If it is not, it is created and stored in cache: 449 MessageProducer producer = producers.get(queueName); 450 if (producer == null) { 451 producer = getSession().createProducer(getDestination(queueName)); 452 producers.put(queueName, producer); 453 } 454 return producer; 455 } 456 457 /** 458 * Get the session. Will try reconnecting if session is null. 459 * 460 * @return The session. 461 * @throws IOFailure if no session is available, and reconnect does not help. 462 */ 463 private Session getSession() { 464 if (session == null) { 465 reconnect(); 466 } 467 if (session == null) { 468 throw new IOFailure("Session not available"); 469 } 470 return session; 471 } 472 473 /** 474 * Helper method for getting the right consumer for a queue or topic, and message listener. 475 * 476 * @param channelName The name of the channel 477 * @param ml The message listener to add as listener to the channel 478 * @return The producer for that channel. A new one is created, if none exists. 479 * @throws JMSException If a new producer cannot be created. 480 */ 481 private MessageConsumer getConsumer(String channelName, MessageListener ml) throws JMSException { 482 String key = getConsumerKey(channelName, ml); 483 MessageConsumer consumer = consumers.get(key); 484 if (consumer == null) { 485 consumer = getSession().createConsumer(getDestination(channelName)); 486 consumers.put(key, consumer); 487 listeners.put(key, ml); 488 } 489 return consumer; 490 } 491 492 /** 493 * Generate a consumerkey based on the given channel name and messageListener. 494 * 495 * @param channel Channel name 496 * @param messageListener a messageListener 497 * @return the generated consumerkey. 498 */ 499 protected static String getConsumerKey(String channel, MessageListener messageListener) { 500 return channel + CONSUMER_KEY_SEPARATOR + messageListener; 501 } 502 503 /** 504 * Get the channelName embedded in a consumerKey. 505 * 506 * @param consumerKey a consumerKey 507 * @return name of channel embedded in a consumerKey 508 */ 509 private static String getChannelName(String consumerKey) { 510 // assumes argument consumerKey was created using metod getConsumerKey() 511 return consumerKey.split(CONSUMER_KEY_SEPARATOR)[0]; 512 } 513 514 /** 515 * Helper method to establish one Connection and associated Session. 516 * 517 * @throws JMSException If some JMS error occurred during the creation of the required JMS connection and session 518 */ 519 private void establishConnectionAndSession() throws JMSException { 520 // Establish a queue connection and a session 521 connection = getConnectionFactory().createConnection(); 522 session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 523 connection.setExceptionListener(this); 524 connection.start(); 525 } 526 527 /** 528 * Sends an ObjectMessage on a queue destination. 529 * 530 * @param msg the NetarkivetMessage to be wrapped and send as an ObjectMessage. 531 * @param to the destination topic. 532 * @throws JMSException if message failed to be sent. 533 */ 534 private void doSend(NetarkivetMessage msg, ChannelID to) throws JMSException { 535 connectionLock.readLock().lock(); 536 try { 537 ObjectMessage message = getSession().createObjectMessage(msg); 538 synchronized (msg) { 539 getProducer(to.getName()).send(message); 540 // Note: Id is only updated if the message does not already have 541 // an id. This ensures that resent messages keep the same ID 542 // TODO Is it always OK for resent messages to keep the same ID 543 544 // FIXME Solution for NAS-2043 doesn't work; rolled back 545 // String randomID = UUID.randomUUID().toString(); 546 // msg.updateId(randomID); 547 msg.updateId(message.getJMSMessageID()); 548 549 } 550 } finally { 551 connectionLock.readLock().unlock(); 552 } 553 log.trace("Sent message '{}'", msg.toString()); 554 } 555 556 /** 557 * Method adds a listener to the given queue or topic. 558 * 559 * @param channelName the messagequeue to listen to 560 * @param ml the messagelistener 561 * @throws IOFailure if the operation failed. 562 */ 563 private void setListener(String channelName, MessageListener ml) { 564 log.debug("Adding {} as listener to {}", ml.toString(), channelName); 565 String errMsg = "JMS-error - could not add Listener to queue/topic: " + channelName; 566 567 int tries = 0; 568 boolean operationSuccessful = false; 569 Exception lastException = null; 570 while (!operationSuccessful && tries < JMS_MAXTRIES) { 571 ++tries; 572 try { 573 connectionLock.readLock().lock(); 574 try { 575 getConsumer(channelName, ml).setMessageListener(ml); 576 } finally { 577 connectionLock.readLock().unlock(); 578 } 579 operationSuccessful = true; 580 } catch (JMSException e) { 581 lastException = e; 582 log.debug("Set listener failed (try {})", tries, e); 583 if (tries < JMS_MAXTRIES) { 584 onException(e); 585 log.debug("Will sleep a while before trying to set listener again"); 586 TimeUtils.exponentialBackoffSleep(tries, Calendar.MINUTE); 587 } 588 } catch (Exception e) { 589 lastException = e; 590 log.debug("Set listener failed (try {})", tries, e); 591 if (tries < JMS_MAXTRIES) { 592 reconnect(); 593 log.debug("Will sleep a while before trying to set listener again"); 594 TimeUtils.exponentialBackoffSleep(tries, Calendar.MINUTE); 595 } 596 } 597 } 598 599 if (!operationSuccessful) { 600 log.warn(errMsg, lastException); 601 throw new IOFailure(errMsg, lastException); 602 } 603 } 604 605 /** 606 * Remove a messagelistener from a channel (a queue or a topic). 607 * 608 * @param ml A specific MessageListener 609 * @param channelName a channelname 610 */ 611 private void removeListener(MessageListener ml, String channelName) { 612 String errMsg = "JMS-error - could not remove Listener from queue/topic: " + channelName; 613 int tries = 0; 614 Exception lastException = null; 615 boolean operationSuccessful = false; 616 617 log.info("Removing listener from channel '{}'", channelName); 618 while (!operationSuccessful && tries < JMS_MAXTRIES) { 619 try { 620 ++tries; 621 connectionLock.readLock().lock(); 622 try { 623 MessageConsumer messageConsumer = getConsumer(channelName, ml); 624 messageConsumer.close(); 625 consumers.remove(getConsumerKey(channelName, ml)); 626 listeners.remove(getConsumerKey(channelName, ml)); 627 } finally { 628 connectionLock.readLock().unlock(); 629 } 630 operationSuccessful = true; 631 } catch (JMSException e) { 632 lastException = e; 633 log.debug("Remove listener failed (try {})", tries, e); 634 onException(e); 635 log.debug("Will and sleep a while before trying to remove listener again"); 636 TimeUtils.exponentialBackoffSleep(tries, Calendar.MINUTE); 637 } catch (Exception e) { 638 lastException = e; 639 log.debug("Remove listener failed (try {})", tries, e); 640 reconnect(); 641 log.debug("Will and sleep a while before trying to remove listener again"); 642 TimeUtils.exponentialBackoffSleep(tries, Calendar.MINUTE); 643 } 644 } 645 if (!operationSuccessful) { 646 log.warn(errMsg, lastException); 647 throw new IOFailure(errMsg, lastException); 648 } 649 } 650 651 /** 652 * Reconnect to JMSBroker and reestablish session. Resets senders and publishers. 653 * 654 * @throws JMSException If unable to reconnect to JMSBroker and/or reestablish sessions 655 */ 656 private void doReconnect() throws JMSException { 657 closeConnection(); 658 establishConnectionAndSession(); 659 // Add listeners already stored in the consumers map 660 log.debug("Re-add listeners"); 661 for (Map.Entry<String, MessageListener> listener : listeners.entrySet()) { 662 setListener(getChannelName(listener.getKey()), listener.getValue()); 663 } 664 log.info("Reconnect successful"); 665 } 666 667}