001/* 002 * #%L 003 * Netarchivesuite - common 004 * %% 005 * Copyright (C) 2005 - 2018 The Royal Danish Library, 006 * the National Library of France and the Austrian National Library. 007 * %% 008 * This program is free software: you can redistribute it and/or modify 009 * it under the terms of the GNU Lesser General Public License as 010 * published by the Free Software Foundation, either version 2.1 of the 011 * License, or (at your option) any later version. 012 * 013 * This program is distributed in the hope that it will be useful, 014 * but WITHOUT ANY WARRANTY; without even the implied warranty of 015 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 016 * GNU General Lesser Public License for more details. 017 * 018 * You should have received a copy of the GNU General Lesser Public 019 * License along with this program. If not, see 020 * <http://www.gnu.org/licenses/lgpl-2.1.html>. 021 * #L% 022 */ 023package dk.netarkivet.common.distribute; 024 025import java.util.Calendar; 026import java.util.Collections; 027import java.util.HashMap; 028import java.util.Map; 029import java.util.concurrent.locks.ReentrantReadWriteLock; 030 031import javax.jms.Connection; 032import javax.jms.ConnectionFactory; 033import javax.jms.Destination; 034import javax.jms.ExceptionListener; 035import javax.jms.JMSException; 036import javax.jms.Message; 037import javax.jms.MessageConsumer; 038import javax.jms.MessageListener; 039import javax.jms.MessageProducer; 040import javax.jms.ObjectMessage; 041import javax.jms.Queue; 042import javax.jms.QueueBrowser; 043import javax.jms.QueueSession; 044import javax.jms.Session; 045 046import org.slf4j.Logger; 047import org.slf4j.LoggerFactory; 048 049import dk.netarkivet.common.CommonSettings; 050import dk.netarkivet.common.exceptions.ArgumentNotValid; 051import dk.netarkivet.common.exceptions.IOFailure; 052import dk.netarkivet.common.exceptions.PermissionDenied; 053import dk.netarkivet.common.utils.CleanupHook; 054import dk.netarkivet.common.utils.CleanupIF; 055import dk.netarkivet.common.utils.Settings; 056import dk.netarkivet.common.utils.TimeUtils; 057 058/** 059 * Handles the communication with a JMS broker. Note on Thread-safety: the methods and fields of JMSConnection are not 060 * accessed by multiple threads (though JMSConnection itself creates threads). Thus no synchronization is needed on 061 * methods and fields of JMSConnection. A shutdown hook is also added, which closes the connection. Class JMSConnection 062 * is now also a exceptionhandler for the JMS Connections 063 */ 064public abstract class JMSConnection implements ExceptionListener, CleanupIF { 065 066 /** The log. */ 067 private static final Logger log = LoggerFactory.getLogger(JMSConnection.class); 068 069 /** Separator used in the consumer key. Separates the ChannelName from the MessageListener.toString(). */ 070 protected static final String CONSUMER_KEY_SEPARATOR = "##"; 071 072 /** The number to times to (re)try whenever a JMSException is thrown. */ 073 static final int JMS_MAXTRIES = Settings.getInt(CommonSettings.JMS_BROKER_RETRIES); 074 075 /** The JMS Connection. */ 076 protected Connection connection; 077 078 /** 079 * The Session handling messages sent to / received from the NetarchiveSuite queues and topics. 080 */ 081 protected Session session; 082 083 /** Map for caching message producers. */ 084 protected final Map<String, MessageProducer> producers = Collections 085 .synchronizedMap(new HashMap<String, MessageProducer>()); 086 087 /** Map for caching message consumers (topic-subscribers and queue-receivers). */ 088 protected final Map<String, MessageConsumer> consumers = Collections 089 .synchronizedMap(new HashMap<String, MessageConsumer>()); 090 091 /** Map for caching message listeners (topic-subscribers and queue-receivers). */ 092 protected final Map<String, MessageListener> listeners = Collections 093 .synchronizedMap(new HashMap<String, MessageListener>()); 094 095 /** 096 * Lock for the connection. Locked for read on adding/removing listeners and sending messages. Locked for write when 097 * connection, releasing and reconnecting. 098 */ 099 protected final ReentrantReadWriteLock connectionLock = new ReentrantReadWriteLock(); 100 101 /** Shutdown hook that closes the JMS connection. */ 102 protected Thread closeHook; 103 104 /** Singleton pattern is be used for this class. This is the one and only instance. */ 105 protected static JMSConnection instance; 106 107 /** 108 * Should be implemented according to a specific JMS broker. 109 * 110 * @return QueueConnectionFactory 111 * @throws JMSException If unable to get QueueConnectionFactory 112 */ 113 protected abstract ConnectionFactory getConnectionFactory() throws JMSException; 114 115 /** 116 * Should be implemented according to a specific JMS broker. 117 * 118 * @param destinationName the name of the wanted Queue 119 * @return The destination. Note that the implementation should make sure that this is a Queue or a Topic, as 120 * required by the NetarchiveSuite design. {@link Channels#isTopic(String)} 121 * @throws JMSException If unable to get a destination. 122 */ 123 protected abstract Destination getDestination(String destinationName) throws JMSException; 124 125 /** 126 * Exceptionhandler for the JMSConnection. Implemented according to a specific JMS broker. Should try to reconnect 127 * if at all possible. 128 * 129 * @param e a JMSException 130 */ 131 public abstract void onException(JMSException e); 132 133 /** Class constructor. */ 134 protected JMSConnection() { 135 } 136 137 /** 138 * Initializes the JMS connection. Creates and starts connection and session. Adds a shutdown hook that closes down 139 * JMSConnection. Adds this object as ExceptionListener for the connection. 140 * 141 * @throws IOFailure if initialization fails. 142 */ 143 protected void initConnection() throws IOFailure { 144 log.debug("Initializing a JMS connection {}", getClass().getName()); 145 146 connectionLock.writeLock().lock(); 147 try { 148 int tries = 0; 149 JMSException lastException = null; 150 boolean operationSuccessful = false; 151 while (!operationSuccessful && tries < JMS_MAXTRIES) { 152 ++tries; 153 try { 154 establishConnectionAndSession(); 155 operationSuccessful = true; 156 } catch (JMSException e) { 157 closeConnection(); 158 log.debug("Connect failed (try {})", tries, e); 159 lastException = e; 160 if (tries < JMS_MAXTRIES) { 161 log.debug("Will sleep a while before trying to connect again"); 162 TimeUtils.exponentialBackoffSleep(tries, Calendar.MINUTE); 163 } 164 } 165 } 166 if (!operationSuccessful) { 167 log.warn("Could not initialize JMS connection {}", getClass(), lastException); 168 cleanup(); 169 throw new IOFailure("Could not initialize JMS connection " + getClass(), lastException); 170 } 171 closeHook = new CleanupHook(this); 172 Runtime.getRuntime().addShutdownHook(closeHook); 173 } finally { 174 connectionLock.writeLock().unlock(); 175 } 176 } 177 178 /** 179 * Submit an object to the destination queue. This method shouldn't be overridden. Override the method sendMessage 180 * to change functionality. 181 * 182 * @param msg The NetarkivetMessage to send to the destination queue (null not allowed) 183 * @throws ArgumentNotValid if nMsg is null. 184 * @throws IOFailure if the operation failed. 185 */ 186 public void send(NetarkivetMessage msg) { 187 ArgumentNotValid.checkNotNull(msg, "msg"); 188 log.trace("Sending message ({}) to {}", msg.toString(), msg.getTo()); 189 sendMessage(msg, msg.getTo()); 190 } 191 192 /** 193 * Sends a message msg to the channel defined by the parameter to - NOT the channel defined in the message. 194 * 195 * @param msg Message to be sent 196 * @param to The destination channel 197 */ 198 public final void resend(NetarkivetMessage msg, ChannelID to) { 199 ArgumentNotValid.checkNotNull(msg, "msg"); 200 ArgumentNotValid.checkNotNull(to, "to"); 201 log.trace("Resending message ({}) to {}", msg.toString(), to.getName()); 202 sendMessage(msg, to); 203 } 204 205 /** 206 * Submit an object to the reply queue. 207 * 208 * @param msg The NetarkivetMessage to send to the reply queue (null not allowed) 209 * @throws ArgumentNotValid if nMsg is null. 210 * @throws PermissionDenied if message nMsg has not been sent yet. 211 * @throws IOFailure if unable to reply. 212 */ 213 public final void reply(NetarkivetMessage msg) { 214 ArgumentNotValid.checkNotNull(msg, "msg"); 215 log.trace("Reply on message ({}) to {}", msg.toString(), msg.getReplyTo().getName()); 216 if (!msg.hasBeenSent()) { 217 throw new PermissionDenied("Message has not been sent yet"); 218 } 219 sendMessage(msg, msg.getReplyTo()); 220 } 221 222 /** 223 * Method adds a listener to the given queue or topic. 224 * 225 * @param mq the messagequeue to listen to 226 * @param ml the messagelistener 227 * @throws IOFailure if the operation failed. 228 */ 229 public void setListener(ChannelID mq, MessageListener ml) throws IOFailure { 230 ArgumentNotValid.checkNotNull(mq, "ChannelID mq"); 231 ArgumentNotValid.checkNotNull(ml, "MessageListener ml"); 232 setListener(mq.getName(), ml); 233 } 234 235 /** 236 * Removes the specified MessageListener from the given queue or topic. 237 * 238 * @param mq the given queue or topic 239 * @param ml a messagelistener 240 * @throws IOFailure On network trouble 241 */ 242 public void removeListener(ChannelID mq, MessageListener ml) throws IOFailure { 243 ArgumentNotValid.checkNotNull(mq, "ChannelID mq"); 244 ArgumentNotValid.checkNotNull(ml, "MessageListener ml"); 245 removeListener(ml, mq.getName()); 246 } 247 248 /** 249 * Creates a QueueBrowser object to peek at the messages on the specified queue. 250 * 251 * @param queueID The ChannelID for a specified queue. 252 * @return A new QueueBrowser instance with access to the specified queue 253 * @throws JMSException If unable to create the specified queue browser 254 */ 255 public QueueBrowser createQueueBrowser(ChannelID queueID) throws JMSException { 256 ArgumentNotValid.checkNotNull(queueID, "ChannelID queueID"); 257 Queue queue = getQueueSession().createQueue(queueID.getName()); 258 return getQueueSession().createBrowser(queue); 259 } 260 261 /** 262 * Creates a QueueBrowser object to peek at the messages on the specified queue. 263 * 264 * @param queueID The ChannelID for a specified queue. 265 * @param QSession The QueueSession to use. 266 * @return A new QueueBrowser instance with access to the specified queue 267 * @throws JMSException If unable to create the specified queue browser 268 */ 269 public QueueBrowser createQueueBrowser(ChannelID queueID, QueueSession QSession) throws JMSException { 270 ArgumentNotValid.checkNotNull(queueID, "ChannelID queueID"); 271 ArgumentNotValid.checkNotNull(QSession, "QueueSession QSession"); 272 Queue queue = QSession.createQueue(queueID.getName()); 273 return QSession.createBrowser(queue); 274 } 275 276 /** 277 * Provides a QueueSession instance. Functionality for retrieving a <code>QueueSession</code> object isen't 278 * available on the generic <code>JMSConnectionFactory</code> 279 * 280 * @return A <code>QueueSession</code> object connected to the current JMS broker 281 * @throws JMSException Failure to retrieve the <code>QueueBrowser</code> JMS Browser 282 */ 283 public abstract QueueSession getQueueSession() throws JMSException; 284 285 /** 286 * Clean up. Remove close connection, remove shutdown hook and null the instance. 287 */ 288 public void cleanup() { 289 connectionLock.writeLock().lock(); 290 try { 291 // Remove shutdown hook 292 log.info("Starting cleanup"); 293 try { 294 if (closeHook != null) { 295 Runtime.getRuntime().removeShutdownHook(closeHook); 296 } 297 } catch (IllegalStateException e) { 298 // Okay, it just means we are already shutting down. 299 } 300 closeHook = null; 301 // Close session 302 closeConnection(); 303 // Clear list of listeners 304 listeners.clear(); 305 instance = null; 306 log.info("Cleanup finished"); 307 } finally { 308 connectionLock.writeLock().unlock(); 309 } 310 } 311 312 /** 313 * Close connection, session and listeners. Will ignore trouble, and simply log it. 314 */ 315 private void closeConnection() { 316 // Close terminates all pending message received on the 317 // connection's session's consumers. 318 if (connection != null) { // close connection 319 try { 320 connection.close(); 321 } catch (JMSException e) { 322 // Just ignore it 323 log.warn("Error closing JMS Connection.", e); 324 } 325 } 326 connection = null; 327 session = null; 328 consumers.clear(); 329 producers.clear(); 330 } 331 332 /** 333 * Unwraps a NetarkivetMessage from an ObjectMessage. 334 * 335 * @param msg a javax.jms.ObjectMessage 336 * @return a NetarkivetMessage 337 * @throws ArgumentNotValid when msg in valid or format of JMS Object message is invalid 338 */ 339 public static NetarkivetMessage unpack(Message msg) throws ArgumentNotValid { 340 ArgumentNotValid.checkNotNull(msg, "msg"); 341 342 ObjectMessage objMsg; 343 try { 344 objMsg = (ObjectMessage) msg; 345 } catch (ClassCastException e) { 346 log.warn("Invalid message type: {}", msg.getClass()); 347 throw new ArgumentNotValid("Invalid message type: " + msg.getClass()); 348 } 349 350 NetarkivetMessage netMsg; 351 String classname = "Unknown class"; // for error reporting purposes 352 try { 353 classname = objMsg.getObject().getClass().getName(); 354 netMsg = (NetarkivetMessage) objMsg.getObject(); 355 // Note: Id is only updated if the message does not already have an 356 // id. On unpack, this means the first time the message is received. 357 358 // FIXME Fix for NAS-2043 doesn't seem to work 359 // String randomID = UUID.randomUUID().toString(); 360 // netMsg.updateId(randomID); 361 362 netMsg.updateId(msg.getJMSMessageID()); 363 } catch (ClassCastException e) { 364 log.warn("Invalid message type: {}", classname, e); 365 throw new ArgumentNotValid("Invalid message type: " + classname, e); 366 } catch (Exception e) { 367 String message = "Message invalid. Unable to unpack message: " + classname; 368 log.warn(message, e); 369 throw new ArgumentNotValid(message, e); 370 } 371 log.trace("Unpacked message '{}'", netMsg); 372 return netMsg; 373 } 374 375 /** 376 * Submit an ObjectMessage to the destination channel. 377 * 378 * @param nMsg the NetarkivetMessage to be wrapped and send as an ObjectMessage 379 * @param to the destination channel 380 * @throws IOFailure if message failed to be sent. 381 */ 382 protected void sendMessage(NetarkivetMessage nMsg, ChannelID to) throws IOFailure { 383 Exception lastException = null; 384 boolean operationSuccessful = false; 385 int tries = 0; 386 387 while (!operationSuccessful && tries < JMS_MAXTRIES) { 388 ++tries; 389 try { 390 doSend(nMsg, to); 391 operationSuccessful = true; 392 } catch (JMSException e) { 393 log.debug("Send failed (try {})", tries, e); 394 lastException = e; 395 if (tries < JMS_MAXTRIES) { 396 onException(e); 397 log.debug("Will sleep a while before trying to send again"); 398 TimeUtils.exponentialBackoffSleep(tries, Calendar.MINUTE); 399 } 400 } catch (Exception e) { 401 log.debug("Send failed (try {})", tries, e); 402 lastException = e; 403 if (tries < JMS_MAXTRIES) { 404 reconnect(); 405 log.debug("Will sleep a while before trying to send again"); 406 TimeUtils.exponentialBackoffSleep(tries, Calendar.MINUTE); 407 } 408 } 409 } 410 if (!operationSuccessful) { 411 log.warn("Send failed", lastException); 412 throw new IOFailure("Send failed.", lastException); 413 } 414 } 415 416 /** 417 * Do a reconnect to the JMSbroker. Does absolutely nothing, if already in the process of reconnecting. 418 */ 419 protected void reconnect() { 420 if (!connectionLock.writeLock().tryLock()) { 421 log.debug("Reconnection already in progress. Do nothing"); 422 return; 423 } 424 try { 425 log.info("Trying to reconnect to jmsbroker"); 426 427 boolean operationSuccessful = false; 428 Exception lastException = null; 429 int tries = 0; 430 while (!operationSuccessful && tries < JMS_MAXTRIES) { 431 ++tries; 432 try { 433 doReconnect(); 434 operationSuccessful = true; 435 } catch (Exception e) { 436 lastException = e; 437 log.debug("Reconnect failed (try {})", tries, e); 438 if (tries < JMS_MAXTRIES) { 439 log.debug("Will sleep a while before trying to reconnect again"); 440 TimeUtils.exponentialBackoffSleep(tries, Calendar.MINUTE); 441 } 442 } 443 } 444 if (!operationSuccessful) { 445 log.warn("Reconnect to JMS broker failed", lastException); 446 closeConnection(); 447 } 448 } finally { 449 // Tell everybody, that we are not trying to reconnect any longer 450 connectionLock.writeLock().unlock(); 451 } 452 } 453 454 /** 455 * Helper method for getting the right producer for a queue or topic. 456 * 457 * @param queueName The name of the channel 458 * @return The producer for that channel. A new one is created, if none exists. 459 * @throws JMSException If a new producer cannot be created. 460 */ 461 private MessageProducer getProducer(String queueName) throws JMSException { 462 // Check if producer is in cache 463 // If it is not, it is created and stored in cache: 464 MessageProducer producer = producers.get(queueName); 465 if (producer == null) { 466 producer = getSession().createProducer(getDestination(queueName)); 467 producers.put(queueName, producer); 468 } 469 return producer; 470 } 471 472 /** 473 * Get the session. Will try reconnecting if session is null. 474 * 475 * @return The session. 476 * @throws IOFailure if no session is available, and reconnect does not help. 477 */ 478 private Session getSession() { 479 if (session == null) { 480 reconnect(); 481 } 482 if (session == null) { 483 throw new IOFailure("Session not available"); 484 } 485 return session; 486 } 487 488 /** 489 * Helper method for getting the right consumer for a queue or topic, and message listener. 490 * 491 * @param channelName The name of the channel 492 * @param ml The message listener to add as listener to the channel 493 * @return The producer for that channel. A new one is created, if none exists. 494 * @throws JMSException If a new producer cannot be created. 495 */ 496 private MessageConsumer getConsumer(String channelName, MessageListener ml) throws JMSException { 497 String key = getConsumerKey(channelName, ml); 498 MessageConsumer consumer = consumers.get(key); 499 if (consumer == null) { 500 consumer = getSession().createConsumer(getDestination(channelName)); 501 consumers.put(key, consumer); 502 listeners.put(key, ml); 503 } 504 return consumer; 505 } 506 507 /** 508 * Generate a consumerkey based on the given channel name and messageListener. 509 * 510 * @param channel Channel name 511 * @param messageListener a messageListener 512 * @return the generated consumerkey. 513 */ 514 protected static String getConsumerKey(String channel, MessageListener messageListener) { 515 return channel + CONSUMER_KEY_SEPARATOR + messageListener; 516 } 517 518 /** 519 * Get the channelName embedded in a consumerKey. 520 * 521 * @param consumerKey a consumerKey 522 * @return name of channel embedded in a consumerKey 523 */ 524 private static String getChannelName(String consumerKey) { 525 // assumes argument consumerKey was created using metod getConsumerKey() 526 return consumerKey.split(CONSUMER_KEY_SEPARATOR)[0]; 527 } 528 529 /** 530 * Helper method to establish one Connection and associated Session. 531 * 532 * @throws JMSException If some JMS error occurred during the creation of the required JMS connection and session 533 */ 534 private void establishConnectionAndSession() throws JMSException { 535 // Establish a queue connection and a session 536 connection = getConnectionFactory().createConnection(); 537 session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 538 connection.setExceptionListener(this); 539 connection.start(); 540 } 541 542 /** 543 * Sends an ObjectMessage on a queue destination. 544 * 545 * @param msg the NetarkivetMessage to be wrapped and send as an ObjectMessage. 546 * @param to the destination topic. 547 * @throws JMSException if message failed to be sent. 548 */ 549 private void doSend(NetarkivetMessage msg, ChannelID to) throws JMSException { 550 connectionLock.readLock().lock(); 551 try { 552 ObjectMessage message = getSession().createObjectMessage(msg); 553 synchronized (msg) { 554 getProducer(to.getName()).send(message); 555 // Note: Id is only updated if the message does not already have 556 // an id. This ensures that resent messages keep the same ID 557 // TODO Is it always OK for resent messages to keep the same ID 558 559 // FIXME Solution for NAS-2043 doesn't work; rolled back 560 // String randomID = UUID.randomUUID().toString(); 561 // msg.updateId(randomID); 562 msg.updateId(message.getJMSMessageID()); 563 564 } 565 } finally { 566 connectionLock.readLock().unlock(); 567 } 568 log.trace("Sent message '{}'", msg.toString()); 569 } 570 571 /** 572 * Method adds a listener to the given queue or topic. 573 * 574 * @param channelName the messagequeue to listen to 575 * @param ml the messagelistener 576 * @throws IOFailure if the operation failed. 577 */ 578 private void setListener(String channelName, MessageListener ml) { 579 log.debug("Adding {} as listener to {}", ml.toString(), channelName); 580 String errMsg = "JMS-error - could not add Listener to queue/topic: " + channelName; 581 582 int tries = 0; 583 boolean operationSuccessful = false; 584 Exception lastException = null; 585 while (!operationSuccessful && tries < JMS_MAXTRIES) { 586 ++tries; 587 try { 588 connectionLock.readLock().lock(); 589 try { 590 getConsumer(channelName, ml).setMessageListener(ml); 591 } finally { 592 connectionLock.readLock().unlock(); 593 } 594 operationSuccessful = true; 595 } catch (JMSException e) { 596 lastException = e; 597 log.debug("Set listener failed (try {})", tries, e); 598 if (tries < JMS_MAXTRIES) { 599 onException(e); 600 log.debug("Will sleep a while before trying to set listener again"); 601 TimeUtils.exponentialBackoffSleep(tries, Calendar.MINUTE); 602 } 603 } catch (Exception e) { 604 lastException = e; 605 log.debug("Set listener failed (try {})", tries, e); 606 if (tries < JMS_MAXTRIES) { 607 reconnect(); 608 log.debug("Will sleep a while before trying to set listener again"); 609 TimeUtils.exponentialBackoffSleep(tries, Calendar.MINUTE); 610 } 611 } 612 } 613 614 if (!operationSuccessful) { 615 log.warn(errMsg, lastException); 616 throw new IOFailure(errMsg, lastException); 617 } 618 } 619 620 /** 621 * Remove a messagelistener from a channel (a queue or a topic). 622 * 623 * @param ml A specific MessageListener 624 * @param channelName a channelname 625 */ 626 private void removeListener(MessageListener ml, String channelName) { 627 String errMsg = "JMS-error - could not remove Listener from queue/topic: " + channelName; 628 int tries = 0; 629 Exception lastException = null; 630 boolean operationSuccessful = false; 631 632 log.info("Removing listener from channel '{}'", channelName); 633 while (!operationSuccessful && tries < JMS_MAXTRIES) { 634 try { 635 ++tries; 636 connectionLock.readLock().lock(); 637 try { 638 MessageConsumer messageConsumer = getConsumer(channelName, ml); 639 messageConsumer.close(); 640 consumers.remove(getConsumerKey(channelName, ml)); 641 listeners.remove(getConsumerKey(channelName, ml)); 642 } finally { 643 connectionLock.readLock().unlock(); 644 } 645 operationSuccessful = true; 646 } catch (JMSException e) { 647 lastException = e; 648 log.debug("Remove listener failed (try {})", tries, e); 649 onException(e); 650 log.debug("Will and sleep a while before trying to remove listener again"); 651 TimeUtils.exponentialBackoffSleep(tries, Calendar.MINUTE); 652 } catch (Exception e) { 653 lastException = e; 654 log.debug("Remove listener failed (try {})", tries, e); 655 reconnect(); 656 log.debug("Will and sleep a while before trying to remove listener again"); 657 TimeUtils.exponentialBackoffSleep(tries, Calendar.MINUTE); 658 } 659 } 660 if (!operationSuccessful) { 661 log.warn(errMsg, lastException); 662 throw new IOFailure(errMsg, lastException); 663 } 664 } 665 666 /** 667 * Reconnect to JMSBroker and reestablish session. Resets senders and publishers. 668 * 669 * @throws JMSException If unable to reconnect to JMSBroker and/or reestablish sessions 670 */ 671 private void doReconnect() throws JMSException { 672 closeConnection(); 673 establishConnectionAndSession(); 674 // Add listeners already stored in the consumers map 675 log.debug("Re-add listeners"); 676 for (Map.Entry<String, MessageListener> listener : listeners.entrySet()) { 677 setListener(getChannelName(listener.getKey()), listener.getValue()); 678 } 679 log.info("Reconnect successful"); 680 } 681 682}