001/*
002 * #%L
003 * Netarchivesuite - common
004 * %%
005 * Copyright (C) 2005 - 2018 The Royal Danish Library, 
006 *             the National Library of France and the Austrian National Library.
007 * %%
008 * This program is free software: you can redistribute it and/or modify
009 * it under the terms of the GNU Lesser General Public License as
010 * published by the Free Software Foundation, either version 2.1 of the
011 * License, or (at your option) any later version.
012 * 
013 * This program is distributed in the hope that it will be useful,
014 * but WITHOUT ANY WARRANTY; without even the implied warranty of
015 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
016 * GNU General Lesser Public License for more details.
017 * 
018 * You should have received a copy of the GNU General Lesser Public
019 * License along with this program.  If not, see
020 * <http://www.gnu.org/licenses/lgpl-2.1.html>.
021 * #L%
022 */
023package dk.netarkivet.common.distribute;
024
025import java.util.Calendar;
026import java.util.Collections;
027import java.util.HashMap;
028import java.util.Map;
029import java.util.concurrent.locks.ReentrantReadWriteLock;
030
031import javax.jms.Connection;
032import javax.jms.ConnectionFactory;
033import javax.jms.Destination;
034import javax.jms.ExceptionListener;
035import javax.jms.JMSException;
036import javax.jms.Message;
037import javax.jms.MessageConsumer;
038import javax.jms.MessageListener;
039import javax.jms.MessageProducer;
040import javax.jms.ObjectMessage;
041import javax.jms.Queue;
042import javax.jms.QueueBrowser;
043import javax.jms.QueueSession;
044import javax.jms.Session;
045
046import org.slf4j.Logger;
047import org.slf4j.LoggerFactory;
048
049import dk.netarkivet.common.CommonSettings;
050import dk.netarkivet.common.exceptions.ArgumentNotValid;
051import dk.netarkivet.common.exceptions.IOFailure;
052import dk.netarkivet.common.exceptions.PermissionDenied;
053import dk.netarkivet.common.utils.CleanupHook;
054import dk.netarkivet.common.utils.CleanupIF;
055import dk.netarkivet.common.utils.Settings;
056import dk.netarkivet.common.utils.TimeUtils;
057
058/**
059 * Handles the communication with a JMS broker. Note on Thread-safety: the methods and fields of JMSConnection are not
060 * accessed by multiple threads (though JMSConnection itself creates threads). Thus no synchronization is needed on
061 * methods and fields of JMSConnection. A shutdown hook is also added, which closes the connection. Class JMSConnection
062 * is now also a exceptionhandler for the JMS Connections
063 */
064public abstract class JMSConnection implements ExceptionListener, CleanupIF {
065
066    /** The log. */
067    private static final Logger log = LoggerFactory.getLogger(JMSConnection.class);
068
069    /** Separator used in the consumer key. Separates the ChannelName from the MessageListener.toString(). */
070    protected static final String CONSUMER_KEY_SEPARATOR = "##";
071
072    /** The number to times to (re)try whenever a JMSException is thrown. */
073    static final int JMS_MAXTRIES = Settings.getInt(CommonSettings.JMS_BROKER_RETRIES);
074
075    /** The JMS Connection. */
076    protected Connection connection;
077
078    /**
079     * The Session handling messages sent to / received from the NetarchiveSuite queues and topics.
080     */
081    protected Session session;
082
083    /** Map for caching message producers. */
084    protected final Map<String, MessageProducer> producers = Collections
085            .synchronizedMap(new HashMap<String, MessageProducer>());
086
087    /** Map for caching message consumers (topic-subscribers and queue-receivers). */
088    protected final Map<String, MessageConsumer> consumers = Collections
089            .synchronizedMap(new HashMap<String, MessageConsumer>());
090
091    /** Map for caching message listeners (topic-subscribers and queue-receivers). */
092    protected final Map<String, MessageListener> listeners = Collections
093            .synchronizedMap(new HashMap<String, MessageListener>());
094
095    /**
096     * Lock for the connection. Locked for read on adding/removing listeners and sending messages. Locked for write when
097     * connection, releasing and reconnecting.
098     */
099    protected final ReentrantReadWriteLock connectionLock = new ReentrantReadWriteLock();
100
101    /** Shutdown hook that closes the JMS connection. */
102    protected Thread closeHook;
103
104    /** Singleton pattern is be used for this class. This is the one and only instance. */
105    protected static JMSConnection instance;
106
107    /**
108     * Should be implemented according to a specific JMS broker.
109     *
110     * @return QueueConnectionFactory
111     * @throws JMSException If unable to get QueueConnectionFactory
112     */
113    protected abstract ConnectionFactory getConnectionFactory() throws JMSException;
114
115    /**
116     * Should be implemented according to a specific JMS broker.
117     *
118     * @param destinationName the name of the wanted Queue
119     * @return The destination. Note that the implementation should make sure that this is a Queue or a Topic, as
120     * required by the NetarchiveSuite design. {@link Channels#isTopic(String)}
121     * @throws JMSException If unable to get a destination.
122     */
123    protected abstract Destination getDestination(String destinationName) throws JMSException;
124
125    /**
126     * Exceptionhandler for the JMSConnection. Implemented according to a specific JMS broker. Should try to reconnect
127     * if at all possible.
128     *
129     * @param e a JMSException
130     */
131    public abstract void onException(JMSException e);
132
133    /** Class constructor. */
134    protected JMSConnection() {
135    }
136
137    /**
138     * Initializes the JMS connection. Creates and starts connection and session. Adds a shutdown hook that closes down
139     * JMSConnection. Adds this object as ExceptionListener for the connection.
140     *
141     * @throws IOFailure if initialization fails.
142     */
143    protected void initConnection() throws IOFailure {
144        log.debug("Initializing a JMS connection {}", getClass().getName());
145
146        connectionLock.writeLock().lock();
147        try {
148            int tries = 0;
149            JMSException lastException = null;
150            boolean operationSuccessful = false;
151            while (!operationSuccessful && tries < JMS_MAXTRIES) {
152                ++tries;
153                try {
154                    establishConnectionAndSession();
155                    operationSuccessful = true;
156                } catch (JMSException e) {
157                    closeConnection();
158                    log.debug("Connect failed (try {})", tries, e);
159                    lastException = e;
160                    if (tries < JMS_MAXTRIES) {
161                        log.debug("Will sleep a while before trying to connect again");
162                        TimeUtils.exponentialBackoffSleep(tries, Calendar.MINUTE);
163                    }
164                }
165            }
166            if (!operationSuccessful) {
167                log.warn("Could not initialize JMS connection {}", getClass(), lastException);
168                cleanup();
169                throw new IOFailure("Could not initialize JMS connection " + getClass(), lastException);
170            }
171            closeHook = new CleanupHook(this);
172            Runtime.getRuntime().addShutdownHook(closeHook);
173        } finally {
174            connectionLock.writeLock().unlock();
175        }
176    }
177
178    /**
179     * Submit an object to the destination queue. This method shouldn't be overridden. Override the method sendMessage
180     * to change functionality.
181     *
182     * @param msg The NetarkivetMessage to send to the destination queue (null not allowed)
183     * @throws ArgumentNotValid if nMsg is null.
184     * @throws IOFailure if the operation failed.
185     */
186    public void send(NetarkivetMessage msg) {
187        ArgumentNotValid.checkNotNull(msg, "msg");
188        log.trace("Sending message ({}) to {}", msg.toString(), msg.getTo());
189        sendMessage(msg, msg.getTo());
190    }
191
192    /**
193     * Sends a message msg to the channel defined by the parameter to - NOT the channel defined in the message.
194     *
195     * @param msg Message to be sent
196     * @param to The destination channel
197     */
198    public final void resend(NetarkivetMessage msg, ChannelID to) {
199        ArgumentNotValid.checkNotNull(msg, "msg");
200        ArgumentNotValid.checkNotNull(to, "to");
201        log.trace("Resending message ({}) to {}", msg.toString(), to.getName());
202        sendMessage(msg, to);
203    }
204
205    /**
206     * Submit an object to the reply queue.
207     *
208     * @param msg The NetarkivetMessage to send to the reply queue (null not allowed)
209     * @throws ArgumentNotValid if nMsg is null.
210     * @throws PermissionDenied if message nMsg has not been sent yet.
211     * @throws IOFailure if unable to reply.
212     */
213    public final void reply(NetarkivetMessage msg) {
214        ArgumentNotValid.checkNotNull(msg, "msg");
215        log.trace("Reply on message ({}) to {}", msg.toString(), msg.getReplyTo().getName());
216        if (!msg.hasBeenSent()) {
217            throw new PermissionDenied("Message has not been sent yet");
218        }
219        sendMessage(msg, msg.getReplyTo());
220    }
221
222    /**
223     * Method adds a listener to the given queue or topic.
224     *
225     * @param mq the messagequeue to listen to
226     * @param ml the messagelistener
227     * @throws IOFailure if the operation failed.
228     */
229    public void setListener(ChannelID mq, MessageListener ml) throws IOFailure {
230        ArgumentNotValid.checkNotNull(mq, "ChannelID mq");
231        ArgumentNotValid.checkNotNull(ml, "MessageListener ml");
232        setListener(mq.getName(), ml);
233    }
234
235    /**
236     * Removes the specified MessageListener from the given queue or topic.
237     *
238     * @param mq the given queue or topic
239     * @param ml a messagelistener
240     * @throws IOFailure On network trouble
241     */
242    public void removeListener(ChannelID mq, MessageListener ml) throws IOFailure {
243        ArgumentNotValid.checkNotNull(mq, "ChannelID mq");
244        ArgumentNotValid.checkNotNull(ml, "MessageListener ml");
245        removeListener(ml, mq.getName());
246    }
247
248    /**
249     * Creates a QueueBrowser object to peek at the messages on the specified queue.
250     *
251     * @param queueID The ChannelID for a specified queue.
252     * @return A new QueueBrowser instance with access to the specified queue
253     * @throws JMSException If unable to create the specified queue browser
254     */
255    public QueueBrowser createQueueBrowser(ChannelID queueID) throws JMSException {
256        ArgumentNotValid.checkNotNull(queueID, "ChannelID queueID");
257        Queue queue = getQueueSession().createQueue(queueID.getName());
258        return getQueueSession().createBrowser(queue);
259    }
260    
261    /**
262     * Creates a QueueBrowser object to peek at the messages on the specified queue.
263     *
264     * @param queueID The ChannelID for a specified queue.
265     * @param QSession The QueueSession to use.
266     * @return A new QueueBrowser instance with access to the specified queue
267     * @throws JMSException If unable to create the specified queue browser
268     */
269    public QueueBrowser createQueueBrowser(ChannelID queueID, QueueSession QSession) throws JMSException {
270        ArgumentNotValid.checkNotNull(queueID, "ChannelID queueID");
271        ArgumentNotValid.checkNotNull(QSession, "QueueSession QSession");
272        Queue queue = QSession.createQueue(queueID.getName());
273        return QSession.createBrowser(queue);
274    }
275
276    /**
277     * Provides a QueueSession instance. Functionality for retrieving a <code>QueueSession</code> object isen't
278     * available on the generic <code>JMSConnectionFactory</code>
279     *
280     * @return A <code>QueueSession</code> object connected to the current JMS broker
281     * @throws JMSException Failure to retrieve the <code>QueueBrowser</code> JMS Browser
282     */
283    public abstract QueueSession getQueueSession() throws JMSException;
284
285    /**
286     * Clean up. Remove close connection, remove shutdown hook and null the instance.
287     */
288    public void cleanup() {
289        connectionLock.writeLock().lock();
290        try {
291            // Remove shutdown hook
292            log.info("Starting cleanup");
293            try {
294                if (closeHook != null) {
295                    Runtime.getRuntime().removeShutdownHook(closeHook);
296                }
297            } catch (IllegalStateException e) {
298                // Okay, it just means we are already shutting down.
299            }
300            closeHook = null;
301            // Close session
302            closeConnection();
303            // Clear list of listeners
304            listeners.clear();
305            instance = null;
306            log.info("Cleanup finished");
307        } finally {
308            connectionLock.writeLock().unlock();
309        }
310    }
311
312    /**
313     * Close connection, session and listeners. Will ignore trouble, and simply log it.
314     */
315    private void closeConnection() {
316        // Close terminates all pending message received on the
317        // connection's session's consumers.
318        if (connection != null) { // close connection
319            try {
320                connection.close();
321            } catch (JMSException e) {
322                // Just ignore it
323                log.warn("Error closing JMS Connection.", e);
324            }
325        }
326        connection = null;
327        session = null;
328        consumers.clear();
329        producers.clear();
330    }
331
332    /**
333     * Unwraps a NetarkivetMessage from an ObjectMessage.
334     *
335     * @param msg a javax.jms.ObjectMessage
336     * @return a NetarkivetMessage
337     * @throws ArgumentNotValid when msg in valid or format of JMS Object message is invalid
338     */
339    public static NetarkivetMessage unpack(Message msg) throws ArgumentNotValid {
340        ArgumentNotValid.checkNotNull(msg, "msg");
341
342        ObjectMessage objMsg;
343        try {
344            objMsg = (ObjectMessage) msg;
345        } catch (ClassCastException e) {
346            log.warn("Invalid message type: {}", msg.getClass());
347            throw new ArgumentNotValid("Invalid message type: " + msg.getClass());
348        }
349
350        NetarkivetMessage netMsg;
351        String classname = "Unknown class"; // for error reporting purposes
352        try {
353            classname = objMsg.getObject().getClass().getName();
354            netMsg = (NetarkivetMessage) objMsg.getObject();
355            // Note: Id is only updated if the message does not already have an
356            // id. On unpack, this means the first time the message is received.
357
358            // FIXME Fix for NAS-2043 doesn't seem to work
359            // String randomID = UUID.randomUUID().toString();
360            // netMsg.updateId(randomID);
361
362            netMsg.updateId(msg.getJMSMessageID());
363        } catch (ClassCastException e) {
364            log.warn("Invalid message type: {}", classname, e);
365            throw new ArgumentNotValid("Invalid message type: " + classname, e);
366        } catch (Exception e) {
367            String message = "Message invalid. Unable to unpack message: " + classname;
368            log.warn(message, e);
369            throw new ArgumentNotValid(message, e);
370        }
371        log.trace("Unpacked message '{}'", netMsg);
372        return netMsg;
373    }
374
375    /**
376     * Submit an ObjectMessage to the destination channel.
377     *
378     * @param nMsg the NetarkivetMessage to be wrapped and send as an ObjectMessage
379     * @param to the destination channel
380     * @throws IOFailure if message failed to be sent.
381     */
382    protected void sendMessage(NetarkivetMessage nMsg, ChannelID to) throws IOFailure {
383        Exception lastException = null;
384        boolean operationSuccessful = false;
385        int tries = 0;
386
387        while (!operationSuccessful && tries < JMS_MAXTRIES) {
388            ++tries;
389            try {
390                doSend(nMsg, to);
391                operationSuccessful = true;
392            } catch (JMSException e) {
393                log.debug("Send failed (try {})", tries, e);
394                lastException = e;
395                if (tries < JMS_MAXTRIES) {
396                    onException(e);
397                    log.debug("Will sleep a while before trying to send again");
398                    TimeUtils.exponentialBackoffSleep(tries, Calendar.MINUTE);
399                }
400            } catch (Exception e) {
401                log.debug("Send failed (try {})", tries, e);
402                lastException = e;
403                if (tries < JMS_MAXTRIES) {
404                    reconnect();
405                    log.debug("Will sleep a while before trying to send again");
406                    TimeUtils.exponentialBackoffSleep(tries, Calendar.MINUTE);
407                }
408            }
409        }
410        if (!operationSuccessful) {
411            log.warn("Send failed", lastException);
412            throw new IOFailure("Send failed.", lastException);
413        }
414    }
415
416    /**
417     * Do a reconnect to the JMSbroker. Does absolutely nothing, if already in the process of reconnecting.
418     */
419    protected void reconnect() {
420        if (!connectionLock.writeLock().tryLock()) {
421            log.debug("Reconnection already in progress. Do nothing");
422            return;
423        }
424        try {
425            log.info("Trying to reconnect to jmsbroker");
426
427            boolean operationSuccessful = false;
428            Exception lastException = null;
429            int tries = 0;
430            while (!operationSuccessful && tries < JMS_MAXTRIES) {
431                ++tries;
432                try {
433                    doReconnect();
434                    operationSuccessful = true;
435                } catch (Exception e) {
436                    lastException = e;
437                    log.debug("Reconnect failed (try {})", tries, e);
438                    if (tries < JMS_MAXTRIES) {
439                        log.debug("Will sleep a while before trying to reconnect again");
440                        TimeUtils.exponentialBackoffSleep(tries, Calendar.MINUTE);
441                    }
442                }
443            }
444            if (!operationSuccessful) {
445                log.warn("Reconnect to JMS broker failed", lastException);
446                closeConnection();
447            }
448        } finally {
449            // Tell everybody, that we are not trying to reconnect any longer
450            connectionLock.writeLock().unlock();
451        }
452    }
453
454    /**
455     * Helper method for getting the right producer for a queue or topic.
456     *
457     * @param queueName The name of the channel
458     * @return The producer for that channel. A new one is created, if none exists.
459     * @throws JMSException If a new producer cannot be created.
460     */
461    private MessageProducer getProducer(String queueName) throws JMSException {
462        // Check if producer is in cache
463        // If it is not, it is created and stored in cache:
464        MessageProducer producer = producers.get(queueName);
465        if (producer == null) {
466            producer = getSession().createProducer(getDestination(queueName));
467            producers.put(queueName, producer);
468        }
469        return producer;
470    }
471
472    /**
473     * Get the session. Will try reconnecting if session is null.
474     *
475     * @return The session.
476     * @throws IOFailure if no session is available, and reconnect does not help.
477     */
478    private Session getSession() {
479        if (session == null) {
480            reconnect();
481        }
482        if (session == null) {
483            throw new IOFailure("Session not available");
484        }
485        return session;
486    }
487
488    /**
489     * Helper method for getting the right consumer for a queue or topic, and message listener.
490     *
491     * @param channelName The name of the channel
492     * @param ml The message listener to add as listener to the channel
493     * @return The producer for that channel. A new one is created, if none exists.
494     * @throws JMSException If a new producer cannot be created.
495     */
496    private MessageConsumer getConsumer(String channelName, MessageListener ml) throws JMSException {
497        String key = getConsumerKey(channelName, ml);
498        MessageConsumer consumer = consumers.get(key);
499        if (consumer == null) {
500            consumer = getSession().createConsumer(getDestination(channelName));
501            consumers.put(key, consumer);
502            listeners.put(key, ml);
503        }
504        return consumer;
505    }
506
507    /**
508     * Generate a consumerkey based on the given channel name and messageListener.
509     *
510     * @param channel Channel name
511     * @param messageListener a messageListener
512     * @return the generated consumerkey.
513     */
514    protected static String getConsumerKey(String channel, MessageListener messageListener) {
515        return channel + CONSUMER_KEY_SEPARATOR + messageListener;
516    }
517
518    /**
519     * Get the channelName embedded in a consumerKey.
520     *
521     * @param consumerKey a consumerKey
522     * @return name of channel embedded in a consumerKey
523     */
524    private static String getChannelName(String consumerKey) {
525        // assumes argument consumerKey was created using metod getConsumerKey()
526        return consumerKey.split(CONSUMER_KEY_SEPARATOR)[0];
527    }
528
529    /**
530     * Helper method to establish one Connection and associated Session.
531     *
532     * @throws JMSException If some JMS error occurred during the creation of the required JMS connection and session
533     */
534    private void establishConnectionAndSession() throws JMSException {
535        // Establish a queue connection and a session
536        connection = getConnectionFactory().createConnection();
537        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
538        connection.setExceptionListener(this);
539        connection.start();
540    }
541
542    /**
543     * Sends an ObjectMessage on a queue destination.
544     *
545     * @param msg the NetarkivetMessage to be wrapped and send as an ObjectMessage.
546     * @param to the destination topic.
547     * @throws JMSException if message failed to be sent.
548     */
549    private void doSend(NetarkivetMessage msg, ChannelID to) throws JMSException {
550        connectionLock.readLock().lock();
551        try {
552            ObjectMessage message = getSession().createObjectMessage(msg);
553            synchronized (msg) {
554                getProducer(to.getName()).send(message);
555                // Note: Id is only updated if the message does not already have
556                // an id. This ensures that resent messages keep the same ID
557                // TODO Is it always OK for resent messages to keep the same ID
558
559                // FIXME Solution for NAS-2043 doesn't work; rolled back
560                // String randomID = UUID.randomUUID().toString();
561                // msg.updateId(randomID);
562                msg.updateId(message.getJMSMessageID());
563
564            }
565        } finally {
566            connectionLock.readLock().unlock();
567        }
568        log.trace("Sent message '{}'", msg.toString());
569    }
570
571    /**
572     * Method adds a listener to the given queue or topic.
573     *
574     * @param channelName the messagequeue to listen to
575     * @param ml the messagelistener
576     * @throws IOFailure if the operation failed.
577     */
578    private void setListener(String channelName, MessageListener ml) {
579        log.debug("Adding {} as listener to {}", ml.toString(), channelName);
580        String errMsg = "JMS-error - could not add Listener to queue/topic: " + channelName;
581
582        int tries = 0;
583        boolean operationSuccessful = false;
584        Exception lastException = null;
585        while (!operationSuccessful && tries < JMS_MAXTRIES) {
586            ++tries;
587            try {
588                connectionLock.readLock().lock();
589                try {
590                    getConsumer(channelName, ml).setMessageListener(ml);
591                } finally {
592                    connectionLock.readLock().unlock();
593                }
594                operationSuccessful = true;
595            } catch (JMSException e) {
596                lastException = e;
597                log.debug("Set listener failed (try {})", tries, e);
598                if (tries < JMS_MAXTRIES) {
599                    onException(e);
600                    log.debug("Will sleep a while before trying to set listener again");
601                    TimeUtils.exponentialBackoffSleep(tries, Calendar.MINUTE);
602                }
603            } catch (Exception e) {
604                lastException = e;
605                log.debug("Set listener failed (try {})", tries, e);
606                if (tries < JMS_MAXTRIES) {
607                    reconnect();
608                    log.debug("Will sleep a while before trying to set listener again");
609                    TimeUtils.exponentialBackoffSleep(tries, Calendar.MINUTE);
610                }
611            }
612        }
613
614        if (!operationSuccessful) {
615            log.warn(errMsg, lastException);
616            throw new IOFailure(errMsg, lastException);
617        }
618    }
619
620    /**
621     * Remove a messagelistener from a channel (a queue or a topic).
622     *
623     * @param ml A specific MessageListener
624     * @param channelName a channelname
625     */
626    private void removeListener(MessageListener ml, String channelName) {
627        String errMsg = "JMS-error - could not remove Listener from queue/topic: " + channelName;
628        int tries = 0;
629        Exception lastException = null;
630        boolean operationSuccessful = false;
631
632        log.info("Removing listener from channel '{}'", channelName);
633        while (!operationSuccessful && tries < JMS_MAXTRIES) {
634            try {
635                ++tries;
636                connectionLock.readLock().lock();
637                try {
638                    MessageConsumer messageConsumer = getConsumer(channelName, ml);
639                    messageConsumer.close();
640                    consumers.remove(getConsumerKey(channelName, ml));
641                    listeners.remove(getConsumerKey(channelName, ml));
642                } finally {
643                    connectionLock.readLock().unlock();
644                }
645                operationSuccessful = true;
646            } catch (JMSException e) {
647                lastException = e;
648                log.debug("Remove listener failed (try {})", tries, e);
649                onException(e);
650                log.debug("Will and sleep a while before trying to remove listener again");
651                TimeUtils.exponentialBackoffSleep(tries, Calendar.MINUTE);
652            } catch (Exception e) {
653                lastException = e;
654                log.debug("Remove  listener failed (try {})", tries, e);
655                reconnect();
656                log.debug("Will and sleep a while before trying to remove listener again");
657                TimeUtils.exponentialBackoffSleep(tries, Calendar.MINUTE);
658            }
659        }
660        if (!operationSuccessful) {
661            log.warn(errMsg, lastException);
662            throw new IOFailure(errMsg, lastException);
663        }
664    }
665
666    /**
667     * Reconnect to JMSBroker and reestablish session. Resets senders and publishers.
668     *
669     * @throws JMSException If unable to reconnect to JMSBroker and/or reestablish sessions
670     */
671    private void doReconnect() throws JMSException {
672        closeConnection();
673        establishConnectionAndSession();
674        // Add listeners already stored in the consumers map
675        log.debug("Re-add listeners");
676        for (Map.Entry<String, MessageListener> listener : listeners.entrySet()) {
677            setListener(getChannelName(listener.getKey()), listener.getValue());
678        }
679        log.info("Reconnect successful");
680    }
681
682}