package dk.netarkivet.common.distribute;

import dk.netarkivet.common.CommonSettings;
import dk.netarkivet.common.exceptions.ArgumentNotValid;
import dk.netarkivet.common.exceptions.IOFailure;
import dk.netarkivet.common.exceptions.PermissionDenied;
import dk.netarkivet.common.utils.CleanupHook;
import dk.netarkivet.common.utils.CleanupIF;
import dk.netarkivet.common.utils.Settings;
import dk.netarkivet.common.utils.TimeUtils;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.QueueBrowser;
import javax.jms.QueueSession;
import javax.jms.Session;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:dk/netarkivet/common/distribute/JMSConnection.class */
public abstract class JMSConnection implements ExceptionListener, CleanupIF {
    /** Separator placed between the channel name and the listener when building a consumer key. */
    protected static final String CONSUMER_KEY_SEPARATOR = "##";
    /** The current JMS connection; reset to null whenever the connection is closed. */
    protected Connection connection;
    /** The session created on {@link #connection}; reset to null whenever the connection is closed. */
    protected Session session;
    /** Producers created so far, keyed by channel name. Cleared when the connection is closed. */
    protected final Map<String, MessageProducer> producers =
            Collections.synchronizedMap(new HashMap<String, MessageProducer>());
    /** Consumers created so far, keyed by consumer key (channel name + separator + listener). */
    protected final Map<String, MessageConsumer> consumers =
            Collections.synchronizedMap(new HashMap<String, MessageConsumer>());
    /** Registered listeners, keyed by consumer key; kept across reconnects so they can be re-added. */
    protected final Map<String, MessageListener> listeners =
            Collections.synchronizedMap(new HashMap<String, MessageListener>());
    /**
     * Guards the connection state: (re)connecting and cleanup take the write lock,
     * sending and listener registration take the read lock.
     */
    protected final ReentrantReadWriteLock connectionLock = new ReentrantReadWriteLock();
    /** Shutdown hook registered by initConnection() and removed again by cleanup(). */
    protected Thread closeHook;
    /** Shared instance reference; within this class it is only ever reset to null, by cleanup(). */
    protected static JMSConnection instance;
    private static final Logger log = LoggerFactory.getLogger(JMSConnection.class);
    /** Maximum number of attempts for each retried JMS operation, read from the JMS_BROKER_RETRIES setting. */
    static final int JMS_MAXTRIES = Settings.getInt(CommonSettings.JMS_BROKER_RETRIES);

    /**
     * Supplies the factory from which the broker connection is created.
     * Called by establishConnectionAndSession() every time a connection is (re)established.
     *
     * @return the factory used to create the JMS connection
     * @throws JMSException if the factory cannot be provided; propagated to the connect/reconnect retry loops
     */
    /* renamed from: getConnectionFactory */
    protected abstract ConnectionFactory mo15getConnectionFactory() throws JMSException;

    /**
     * Resolves a channel name to the JMS destination that producers and consumers are created on.
     *
     * @param str the channel name
     * @return the destination for that channel
     * @throws JMSException if the destination cannot be resolved
     */
    protected abstract Destination getDestination(String str) throws JMSException;

    /**
     * Handles a JMS failure. This object is registered as the connection's exception listener
     * (see establishConnectionAndSession()), and the retry loops in this class also call it
     * directly when an operation fails with a JMSException.
     *
     * @param jMSException the failure to handle
     */
    public abstract void onException(JMSException jMSException);

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Establishes the JMS connection and session, retrying up to JMS_MAXTRIES times with a
     * back-off sleep between attempts, and registers a shutdown hook that cleans up this object.
     *
     * <p>The write lock is held for the whole operation and is always released in a
     * {@code finally} block, including when all attempts fail and an IOFailure is thrown.
     *
     * @throws IOFailure if no connection could be established within JMS_MAXTRIES attempts
     */
    public void initConnection() throws IOFailure {
        log.debug("Initializing a JMS connection {}", getClass().getName());
        this.connectionLock.writeLock().lock();
        try {
            JMSException lastException = null;
            boolean connected = false;
            int tries = 0;
            while (!connected && tries < JMS_MAXTRIES) {
                tries++;
                try {
                    establishConnectionAndSession();
                    connected = true;
                } catch (JMSException e) {
                    // Drop any half-initialised connection before the next attempt.
                    closeConnection();
                    log.debug("Connect failed (try {})", Integer.valueOf(tries), e);
                    lastException = e;
                    if (tries < JMS_MAXTRIES) {
                        log.debug("Will sleep a while before trying to connect again");
                        // 12 equals java.util.Calendar.MINUTE; presumably the back-off unit (confirm in TimeUtils).
                        TimeUtils.exponentialBackoffSleep(tries, 12);
                    }
                }
            }
            if (!connected) {
                log.warn("Could not initialize JMS connection {}", getClass(), lastException);
                // cleanup() re-acquires the (reentrant) write lock and releases its own hold.
                cleanup();
                throw new IOFailure("Could not initialize JMS connection " + getClass(), lastException);
            }
            this.closeHook = new CleanupHook(this);
            Runtime.getRuntime().addShutdownHook(this.closeHook);
        } finally {
            this.connectionLock.writeLock().unlock();
        }
    }

    /**
     * Sends a message to the channel returned by its own {@code getTo()}.
     *
     * @param netarkivetMessage the message to send; must not be null
     */
    public void send(NetarkivetMessage netarkivetMessage) {
        ArgumentNotValid.checkNotNull(netarkivetMessage, "msg");
        final String description = netarkivetMessage.toString();
        log.trace("Sending message ({}) to {}", description, netarkivetMessage.getTo());
        sendMessage(netarkivetMessage, netarkivetMessage.getTo());
    }

    /**
     * Sends a message to an explicitly given channel instead of the one stored in the message.
     *
     * @param netarkivetMessage the message to send; must not be null
     * @param channelID the channel to send it to; must not be null
     */
    public final void resend(NetarkivetMessage netarkivetMessage, ChannelID channelID) {
        ArgumentNotValid.checkNotNull(netarkivetMessage, "msg");
        ArgumentNotValid.checkNotNull(channelID, "to");
        final String description = netarkivetMessage.toString();
        final String target = channelID.getName();
        log.trace("Resending message ({}) to {}", description, target);
        sendMessage(netarkivetMessage, channelID);
    }

    /**
     * Sends a message to the channel returned by its {@code getReplyTo()}.
     *
     * @param netarkivetMessage the message to send as a reply; must not be null
     * @throws PermissionDenied if the message reports that it has not been sent before
     */
    public final void reply(NetarkivetMessage netarkivetMessage) {
        ArgumentNotValid.checkNotNull(netarkivetMessage, "msg");
        log.trace("Reply on message ({}) to {}", netarkivetMessage.toString(),
                netarkivetMessage.getReplyTo().getName());
        if (netarkivetMessage.hasBeenSent()) {
            sendMessage(netarkivetMessage, netarkivetMessage.getReplyTo());
            return;
        }
        throw new PermissionDenied("Message has not been sent yet");
    }

    /**
     * Registers a listener on the channel with the given id.
     *
     * @param channelID the channel to listen on; must not be null
     * @param messageListener the listener to register; must not be null
     * @throws IOFailure if the listener could not be registered
     */
    public void setListener(ChannelID channelID, MessageListener messageListener) throws IOFailure {
        ArgumentNotValid.checkNotNull(channelID, "ChannelID mq");
        ArgumentNotValid.checkNotNull(messageListener, "MessageListener ml");
        final String channelName = channelID.getName();
        setListener(channelName, messageListener);
    }

    /**
     * Removes a listener from the channel with the given id.
     *
     * @param channelID the channel the listener is registered on; must not be null
     * @param messageListener the listener to remove; must not be null
     * @throws IOFailure if the listener could not be removed
     */
    public void removeListener(ChannelID channelID, MessageListener messageListener) throws IOFailure {
        ArgumentNotValid.checkNotNull(channelID, "ChannelID mq");
        ArgumentNotValid.checkNotNull(messageListener, "MessageListener ml");
        final String channelName = channelID.getName();
        removeListener(messageListener, channelName);
    }

    /**
     * Creates a browser for the queue with the given id, using the session from getQueueSession().
     *
     * <p>getQueueSession() is abstract and may create a new session on each call, so it is
     * invoked exactly once and the same session is used for both the queue and the browser.
     *
     * @param channelID the id of the queue to browse; must not be null
     * @return a browser on the named queue
     * @throws JMSException if the session, queue or browser cannot be created
     */
    public QueueBrowser createQueueBrowser(ChannelID channelID) throws JMSException {
        ArgumentNotValid.checkNotNull(channelID, "ChannelID queueID");
        QueueSession queueSession = getQueueSession();
        return queueSession.createBrowser(queueSession.createQueue(channelID.getName()));
    }

    /**
     * Creates a browser for the queue with the given id on a caller-supplied session.
     *
     * @param channelID the id of the queue to browse; must not be null
     * @param queueSession the session to create the queue and browser on; must not be null
     * @return a browser on the named queue
     * @throws JMSException if the queue or browser cannot be created
     */
    public QueueBrowser createQueueBrowser(ChannelID channelID, QueueSession queueSession) throws JMSException {
        ArgumentNotValid.checkNotNull(channelID, "ChannelID queueID");
        ArgumentNotValid.checkNotNull(queueSession, "QueueSession QSession");
        final String queueName = channelID.getName();
        return queueSession.createBrowser(queueSession.createQueue(queueName));
    }

    /**
     * Provides the queue session used by createQueueBrowser(ChannelID).
     * NOTE(review): whether this returns a shared session or creates a new one per call is up
     * to the implementation; confirm before calling it repeatedly.
     *
     * @return a queue session
     * @throws JMSException if the session cannot be provided
     */
    public abstract QueueSession getQueueSession() throws JMSException;

    /**
     * Tears down this connection under the write lock: removes the shutdown hook, closes the
     * JMS connection, forgets all registered listeners and resets the static instance reference.
     */
    @Override // dk.netarkivet.common.utils.CleanupIF
    public void cleanup() {
        this.connectionLock.writeLock().lock();
        try {
            log.info("Starting cleanup");
            if (this.closeHook != null) {
                try {
                    Runtime.getRuntime().removeShutdownHook(this.closeHook);
                } catch (IllegalStateException ignored) {
                    // removeShutdownHook throws this when the JVM is already shutting down;
                    // the hook cannot be removed then, and does not need to be.
                }
            }
            this.closeHook = null;
            closeConnection();
            this.listeners.clear();
            instance = null;
            log.info("Cleanup finished");
        } finally {
            this.connectionLock.writeLock().unlock();
        }
    }

    /**
     * Closes the JMS connection if there is one and resets all connection-bound state
     * (connection, session, cached consumers and producers). A JMSException raised while
     * closing is logged and otherwise ignored. Registered listeners are left untouched.
     */
    private void closeConnection() {
        final boolean hasConnection = this.connection != null;
        if (hasConnection) {
            try {
                this.connection.close();
            } catch (JMSException closeFailure) {
                log.warn("Error closing JMS Connection.", closeFailure);
            }
        }
        // Reset state even if close() failed, so the next user triggers a fresh connect.
        this.connection = null;
        this.session = null;
        this.consumers.clear();
        this.producers.clear();
    }

    /**
     * Extracts the NetarkivetMessage carried by a JMS message and updates its id with the
     * JMS message id.
     *
     * <p>The message type is checked with {@code instanceof} rather than by catching a
     * ClassCastException, and the payload is fetched from the message only once.
     *
     * @param message the JMS message to unpack; must not be null
     * @return the NetarkivetMessage contained in the message
     * @throws ArgumentNotValid if the message is null, is not an ObjectMessage, does not carry a
     *         NetarkivetMessage, or cannot be unpacked for any other reason
     */
    public static NetarkivetMessage unpack(Message message) throws ArgumentNotValid {
        ArgumentNotValid.checkNotNull(message, "msg");
        if (!(message instanceof ObjectMessage)) {
            log.warn("Invalid message type: {}", message.getClass());
            throw new ArgumentNotValid("Invalid message type: " + message.getClass());
        }
        final ObjectMessage objectMessage = (ObjectMessage) message;
        // Only used for error reporting; stays at this default if the payload cannot be read.
        String payloadClassName = "Unknown class";
        try {
            final Object payload = objectMessage.getObject();
            // A null payload fails here and is reported by the generic handler below.
            payloadClassName = payload.getClass().getName();
            final NetarkivetMessage unpacked = (NetarkivetMessage) payload;
            unpacked.updateId(message.getJMSMessageID());
            log.trace("Unpacked message '{}'", unpacked);
            return unpacked;
        } catch (ClassCastException e) {
            log.warn("Invalid message type: {}", payloadClassName, e);
            throw new ArgumentNotValid("Invalid message type: " + payloadClassName, e);
        } catch (Exception e) {
            String errorMessage = "Message invalid. Unable to unpack message: " + payloadClassName;
            log.warn(errorMessage, e);
            throw new ArgumentNotValid(errorMessage, e);
        }
    }

    /**
     * Sends a message to a channel, retrying up to JMS_MAXTRIES times.
     *
     * <p>After a failed attempt that is not the last one, the connection is recovered before
     * backing off: a JMSException is handed to onException(), any other exception triggers
     * reconnect(). The JMSException handler must come first, since JMSException is itself an
     * Exception.
     *
     * @param netarkivetMessage the message to send
     * @param channelID the channel to send it to
     * @throws IOFailure if the message could not be sent within JMS_MAXTRIES attempts
     */
    protected void sendMessage(NetarkivetMessage netarkivetMessage, ChannelID channelID) throws IOFailure {
        Exception lastException = null;
        boolean sent = false;
        int tries = 0;
        while (!sent && tries < JMS_MAXTRIES) {
            tries++;
            try {
                doSend(netarkivetMessage, channelID);
                sent = true;
            } catch (JMSException e) {
                log.debug("Send failed (try {})", Integer.valueOf(tries), e);
                lastException = e;
                if (tries < JMS_MAXTRIES) {
                    onException(e);
                    log.debug("Will sleep a while before trying to send again");
                    // 12 equals java.util.Calendar.MINUTE; presumably the back-off unit (confirm in TimeUtils).
                    TimeUtils.exponentialBackoffSleep(tries, 12);
                }
            } catch (Exception e) {
                log.debug("Send failed (try {})", Integer.valueOf(tries), e);
                lastException = e;
                if (tries < JMS_MAXTRIES) {
                    reconnect();
                    log.debug("Will sleep a while before trying to send again");
                    TimeUtils.exponentialBackoffSleep(tries, 12);
                }
            }
        }
        if (!sent) {
            log.warn("Send failed", lastException);
            throw new IOFailure("Send failed.", lastException);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Re-establishes the broker connection, trying up to JMS_MAXTRIES times with a back-off
     * sleep between attempts.
     *
     * <p>Returns immediately without doing anything if the write lock cannot be acquired at
     * once, which is taken to mean that a reconnect is already running. If every attempt fails,
     * the failure is logged and the connection is left closed; no exception is thrown.
     */
    public void reconnect() {
        final boolean lockAcquired = this.connectionLock.writeLock().tryLock();
        if (!lockAcquired) {
            log.debug("Reconnection already in progress. Do nothing");
            return;
        }
        try {
            log.info("Trying to reconnect to jmsbroker");
            Exception lastFailure = null;
            boolean reconnected = false;
            for (int attempt = 1; !reconnected && attempt <= JMS_MAXTRIES; attempt++) {
                try {
                    doReconnect();
                    reconnected = true;
                } catch (Exception e) {
                    lastFailure = e;
                    log.debug("Reconnect failed (try {})", Integer.valueOf(attempt), e);
                    if (attempt < JMS_MAXTRIES) {
                        log.debug("Will sleep a while before trying to reconnect again");
                        // 12 equals java.util.Calendar.MINUTE; presumably the back-off unit (confirm in TimeUtils).
                        TimeUtils.exponentialBackoffSleep(attempt, 12);
                    }
                }
            }
            if (!reconnected) {
                log.warn("Reconnect to JMS broker failed", lastFailure);
                closeConnection();
            }
        } finally {
            this.connectionLock.writeLock().unlock();
        }
    }

    /**
     * Returns the producer for a channel, creating and caching it on first use.
     *
     * @param str the channel name
     * @return the cached or newly created producer for that channel
     * @throws JMSException if the destination or the producer cannot be created
     */
    private MessageProducer getProducer(String str) throws JMSException {
        final MessageProducer cached = this.producers.get(str);
        if (cached != null) {
            return cached;
        }
        final MessageProducer created = getSession().createProducer(getDestination(str));
        this.producers.put(str, created);
        return created;
    }

    /**
     * Returns the current session, attempting a reconnect first if there is none.
     *
     * @return the current session, never null
     * @throws IOFailure if there is still no session after the reconnect attempt
     */
    private Session getSession() {
        if (this.session == null) {
            // reconnect() may return without a session (it gives up silently, or skips the
            // work when the write lock is busy), hence the second check.
            reconnect();
            if (this.session == null) {
                throw new IOFailure("Session not available");
            }
        }
        return this.session;
    }

    /**
     * Returns the consumer for a channel/listener pair, creating it on first use. A newly
     * created consumer is cached and the listener is recorded under the same key.
     *
     * @param str the channel name
     * @param messageListener the listener the consumer belongs to
     * @return the cached or newly created consumer
     * @throws JMSException if the destination or the consumer cannot be created
     */
    private MessageConsumer getConsumer(String str, MessageListener messageListener) throws JMSException {
        final String key = getConsumerKey(str, messageListener);
        final MessageConsumer cached = this.consumers.get(key);
        if (cached != null) {
            return cached;
        }
        final MessageConsumer created = getSession().createConsumer(getDestination(str));
        this.consumers.put(key, created);
        this.listeners.put(key, messageListener);
        return created;
    }

    /**
     * Builds the key under which a consumer and its listener are stored: the channel name,
     * CONSUMER_KEY_SEPARATOR, and the listener's string representation.
     *
     * @param str the channel name
     * @param messageListener the listener
     * @return the consumer key for the pair
     */
    protected static String getConsumerKey(String str, MessageListener messageListener) {
        return str + CONSUMER_KEY_SEPARATOR + messageListener;
    }

    /**
     * Extracts the channel name from a consumer key, i.e. everything before the first
     * CONSUMER_KEY_SEPARATOR. A key without a separator is returned unchanged, and a key
     * that starts with the separator yields the empty string.
     *
     * @param str a consumer key as built by getConsumerKey
     * @return the channel-name part of the key
     */
    private static String getChannelName(String str) {
        // Plain substring search: no regex compilation, and no index error for a key that
        // consists of the separator alone.
        int separatorIndex = str.indexOf(CONSUMER_KEY_SEPARATOR);
        return separatorIndex < 0 ? str : str.substring(0, separatorIndex);
    }

    /**
     * Creates a new connection from the connection factory, opens a non-transacted,
     * auto-acknowledging session on it, registers this object as the connection's exception
     * listener and starts the connection.
     *
     * @throws JMSException if any of these steps fails; {@link #connection} and
     *         {@link #session} may then be partially set, so callers close the connection
     *         before retrying
     */
    private void establishConnectionAndSession() throws JMSException {
        this.connection = mo15getConnectionFactory().createConnection();
        this.session = this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        this.connection.setExceptionListener(this);
        this.connection.start();
    }

    /**
     * Performs a single send attempt while holding the read lock: wraps the message in an
     * ObjectMessage, sends it through the producer for the channel and updates the message id
     * with the JMS message id. Sending and the id update happen while synchronized on the message.
     *
     * @param netarkivetMessage the message to send
     * @param channelID the channel to send it to
     * @throws JMSException if creating or sending the JMS message fails
     */
    private void doSend(NetarkivetMessage netarkivetMessage, ChannelID channelID) throws JMSException {
        final ReentrantReadWriteLock.ReadLock readLock = this.connectionLock.readLock();
        readLock.lock();
        try {
            final ObjectMessage jmsMessage = getSession().createObjectMessage(netarkivetMessage);
            synchronized (netarkivetMessage) {
                final MessageProducer producer = getProducer(channelID.getName());
                producer.send(jmsMessage);
                netarkivetMessage.updateId(jmsMessage.getJMSMessageID());
            }
            log.trace("Sent message '{}'", netarkivetMessage.toString());
        } finally {
            readLock.unlock();
        }
    }

    /**
     * Registers a listener on the named channel, retrying up to JMS_MAXTRIES times.
     *
     * <p>Each attempt runs under the read lock, which is released in a {@code finally} block.
     * After a failed attempt that is not the last one, the connection is recovered before
     * backing off: a JMSException is handed to onException(), any other exception triggers
     * reconnect(). The JMSException handler must come first, since JMSException is itself an
     * Exception.
     *
     * @param str the channel name
     * @param messageListener the listener to register
     * @throws IOFailure if the listener could not be registered within JMS_MAXTRIES attempts
     */
    private void setListener(String str, MessageListener messageListener) {
        log.debug("Adding {} as listener to {}", messageListener.toString(), str);
        String errMsg = "JMS-error - could not add Listener to queue/topic: " + str;
        int tries = 0;
        boolean registered = false;
        Exception lastException = null;
        while (!registered && tries < JMS_MAXTRIES) {
            tries++;
            try {
                this.connectionLock.readLock().lock();
                try {
                    getConsumer(str, messageListener).setMessageListener(messageListener);
                } finally {
                    this.connectionLock.readLock().unlock();
                }
                registered = true;
            } catch (JMSException e) {
                lastException = e;
                log.debug("Set listener failed (try {})", Integer.valueOf(tries), e);
                if (tries < JMS_MAXTRIES) {
                    onException(e);
                    log.debug("Will sleep a while before trying to set listener again");
                    // 12 equals java.util.Calendar.MINUTE; presumably the back-off unit (confirm in TimeUtils).
                    TimeUtils.exponentialBackoffSleep(tries, 12);
                }
            } catch (Exception e) {
                lastException = e;
                log.debug("Set listener failed (try {})", Integer.valueOf(tries), e);
                if (tries < JMS_MAXTRIES) {
                    reconnect();
                    log.debug("Will sleep a while before trying to set listener again");
                    TimeUtils.exponentialBackoffSleep(tries, 12);
                }
            }
        }
        if (!registered) {
            log.warn(errMsg, lastException);
            throw new IOFailure(errMsg, lastException);
        }
    }

    /**
     * Removes a listener from the named channel, retrying up to JMS_MAXTRIES times: closes the
     * consumer for the channel/listener pair and forgets both the consumer and the listener.
     *
     * <p>Each attempt runs under the read lock, which is released in a {@code finally} block.
     * After a failed attempt that is not the last one, the connection is recovered before
     * backing off: a JMSException is handed to onException(), any other exception triggers
     * reconnect(). No recovery or sleep happens after the final attempt, matching the other
     * retry loops in this class.
     *
     * @param messageListener the listener to remove
     * @param str the channel name
     * @throws IOFailure if the listener could not be removed within JMS_MAXTRIES attempts
     */
    private void removeListener(MessageListener messageListener, String str) {
        String errMsg = "JMS-error - could not remove Listener from queue/topic: " + str;
        int tries = 0;
        // Typed as Exception because both JMS and non-JMS failures are recorded here.
        Exception lastException = null;
        boolean removed = false;
        log.info("Removing listener from channel '{}'", str);
        while (!removed && tries < JMS_MAXTRIES) {
            tries++;
            try {
                this.connectionLock.readLock().lock();
                try {
                    getConsumer(str, messageListener).close();
                    this.consumers.remove(getConsumerKey(str, messageListener));
                    this.listeners.remove(getConsumerKey(str, messageListener));
                } finally {
                    this.connectionLock.readLock().unlock();
                }
                removed = true;
            } catch (JMSException e) {
                lastException = e;
                log.debug("Remove listener failed (try {})", Integer.valueOf(tries), e);
                if (tries < JMS_MAXTRIES) {
                    onException(e);
                    log.debug("Will and sleep a while before trying to remove listener again");
                    // 12 equals java.util.Calendar.MINUTE; presumably the back-off unit (confirm in TimeUtils).
                    TimeUtils.exponentialBackoffSleep(tries, 12);
                }
            } catch (Exception e) {
                lastException = e;
                log.debug("Remove  listener failed (try {})", Integer.valueOf(tries), e);
                if (tries < JMS_MAXTRIES) {
                    reconnect();
                    log.debug("Will and sleep a while before trying to remove listener again");
                    TimeUtils.exponentialBackoffSleep(tries, 12);
                }
            }
        }
        if (!removed) {
            log.warn(errMsg, lastException);
            throw new IOFailure(errMsg, lastException);
        }
    }

    /**
     * Performs a single reconnect attempt: closes the current connection, establishes a new
     * connection and session, and re-registers every known listener on its channel.
     * Called by reconnect(), which holds the write lock.
     *
     * @throws JMSException if the new connection or session cannot be established
     */
    private void doReconnect() throws JMSException {
        closeConnection();
        establishConnectionAndSession();
        log.debug("Re-add listeners");
        // Iterate over a snapshot: setListener() writes to the listeners map, and views of a
        // synchronized map must only be traversed while holding the map's monitor.
        final Map<String, MessageListener> registered;
        synchronized (this.listeners) {
            registered = new HashMap<String, MessageListener>(this.listeners);
        }
        for (Map.Entry<String, MessageListener> entry : registered.entrySet()) {
            setListener(getChannelName(entry.getKey()), entry.getValue());
        }
        log.info("Reconnect successful");
    }
}
