package org.bitrepository.protocol.activemq;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ThreadFactory;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.util.ByteArrayInputStream;
import org.bitrepository.bitrepositorymessages.MessageRequest;
import org.bitrepository.common.DefaultThreadFactory;
import org.bitrepository.common.JaxbHelper;
import org.bitrepository.common.settings.Settings;
import org.bitrepository.protocol.CoordinationLayerException;
import org.bitrepository.protocol.MessageContext;
import org.bitrepository.protocol.MessageVersionValidator;
import org.bitrepository.protocol.OperationType;
import org.bitrepository.protocol.messagebus.MessageBus;
import org.bitrepository.protocol.messagebus.ReceivedMessageHandler;
import org.bitrepository.protocol.messagebus.logger.AlarmMessageLogger;
import org.bitrepository.protocol.messagebus.logger.DeleteFileMessageLogger;
import org.bitrepository.protocol.messagebus.logger.GetAuditTrailsMessageLogger;
import org.bitrepository.protocol.messagebus.logger.GetChecksumsMessageLogger;
import org.bitrepository.protocol.messagebus.logger.GetFileIDsMessageLogger;
import org.bitrepository.protocol.messagebus.logger.GetFileMessageLogger;
import org.bitrepository.protocol.messagebus.logger.GetStatusMessageLogger;
import org.bitrepository.protocol.messagebus.logger.MessageLoggerProvider;
import org.bitrepository.protocol.messagebus.logger.PutFileMessageLogger;
import org.bitrepository.protocol.security.SecurityManager;
import org.bitrepository.settings.repositorysettings.MessageBusConfiguration;
import org.bouncycastle.cms.SignerId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.xml.sax.SAXException;

/* loaded from: input_file:WEB-INF/lib/bitrepository-core-2.0-SNAPSHOT.jar:org/bitrepository/protocol/activemq/ActiveMQMessageBus.class */
public class ActiveMQMessageBus implements MessageBus {
    /** JMS string property holding the simple class name of the message payload. */
    public static final String MESSAGE_TYPE_KEY = "org.bitrepository.messages.type";
    /** JMS string property holding the collection ID of the message. */
    public static final String COLLECTION_ID_KEY = "org.bitrepository.messages.collectionid";
    /** JMS string property holding the signature produced by the security manager. */
    public static final String MESSAGE_SIGNATURE_KEY = "org.bitrepository.messages.signature";
    /** JMS string property holding the intended recipient of the message; only set when a recipient is given. */
    public static final String MESSAGE_TO_KEY = "org.bitrepository.messages.to";
    /** Transaction mode for the JMS sessions of this bus (non-transacted). */
    public static final boolean TRANSACTED = false;
    /** Separates the destination ID from the listener hash code in consumer keys. */
    private static final String CONSUMER_KEY_SEPARATOR = "#";
    /** Session used for creating and sending outgoing messages. */
    private final Session producerSession;
    /** Session used for creating message consumers. */
    private final Session consumerSession;
    /** Component ID from the settings; used as JMS client ID and as durable subscription name. */
    private final String clientID;
    private final MessageBusConfiguration configuration;
    /** Serializes, parses and schema-validates the message XML. */
    private final JaxbHelper jaxbHelper;
    private final Connection connection;
    /** Signs outgoing messages and authenticates/authorizes incoming ones. */
    private final SecurityManager securityManager;
    /** Producer without a fixed destination; the destination is given per send. */
    private final MessageProducer producer;
    /** Hands validated incoming messages over to the registered listeners. */
    private final ReceivedMessageHandler receivedMessageHandler;
    /** Creates the thread which starts the connection asynchronously. */
    private static final ThreadFactory threadFactory = new DefaultThreadFactory(ActiveMQMessageListener.class.getSimpleName() + "-", 5, false);
    private final Logger log = LoggerFactory.getLogger(getClass());
    /**
     * Consumers keyed by destination ID and listener hash code.
     * The type arguments are spelled out because the diamond operator is not inferred through a method argument
     * on Java 7 compilers.
     */
    private final Map<String, MessageConsumer> consumers = Collections.synchronizedMap(new HashMap<String, MessageConsumer>());
    /** Cache of the destinations already created, keyed by destination ID. */
    private final Map<String, Destination> destinations = new HashMap<>();
    private String schemaLocation = "BitRepositoryMessages.xsd";
    /** Recipient component IDs accepted for identify requests; an empty set accepts all. */
    private final Set<String> componentFilter = new HashSet<>();
    /** Collection IDs accepted for identify requests; an empty set accepts all. */
    private final Set<String> collectionFilter = new HashSet<>();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:WEB-INF/lib/bitrepository-core-2.0-SNAPSHOT.jar:org/bitrepository/protocol/activemq/ActiveMQMessageBus$ActiveMQMessageListener.class */
    /**
     * Adapts a bit repository message listener to the JMS {@link MessageListener} interface.
     * Each incoming JMS message is filtered, schema-validated, unmarshalled, authenticated and authorized
     * before it is delivered to the wrapped listener through the received message handler.
     */
    public class ActiveMQMessageListener implements MessageListener {
        private final Logger log = LoggerFactory.getLogger(getClass());
        /** The listener which receives the unmarshalled messages. */
        private final org.bitrepository.protocol.messagebus.MessageListener messageListener;

        /**
         * @param messageListener the listener to deliver received messages to.
         */
        public ActiveMQMessageListener(org.bitrepository.protocol.messagebus.MessageListener messageListener) {
            this.messageListener = messageListener;
        }

        /**
         * Handles a single JMS message. Failures are logged and never propagated to the JMS provider.
         *
         * @param message the JMS message received; expected to be a {@link TextMessage} carrying the message XML.
         */
        @Override // javax.jms.MessageListener
        public void onMessage(Message message) {
            // Declared outside the try block so the failure log can report what was read before the error.
            String type = null;
            String text = null;
            try {
                String recipient = message.getStringProperty(MESSAGE_TO_KEY);
                type = message.getStringProperty(MESSAGE_TYPE_KEY);
                // The component and collection filters are only applied to identify requests.
                if (type.startsWith("Identify") && type.endsWith("Request")) {
                    if (!componentFilter.isEmpty() && recipient != null && !componentFilter.contains(recipient)) {
                        this.log.trace("Ignoring {} message to other component {}", type, recipient);
                        return;
                    }
                    String collectionID = message.getStringProperty(COLLECTION_ID_KEY);
                    if (!collectionFilter.isEmpty() && collectionID != null && !collectionFilter.contains(collectionID)) {
                        this.log.trace("Ignoring message to unknown collection {}", collectionID);
                        return;
                    }
                }
                String signature = message.getStringProperty(MESSAGE_SIGNATURE_KEY);
                text = ((TextMessage) message).getText();
                this.log.trace("Received xml message: {}", text);
                jaxbHelper.validate(new ByteArrayInputStream(text.getBytes("UTF-8")));
                // The message type property names the JAXB class to unmarshal into.
                org.bitrepository.bitrepositorymessages.Message content =
                        (org.bitrepository.bitrepositorymessages.Message) jaxbHelper.loadXml(
                                Class.forName("org.bitrepository.bitrepositorymessages." + type),
                                new ByteArrayInputStream(text.getBytes("UTF-8")));
                this.log.trace("Checking signature {}", signature);
                SignerId signer = securityManager.authenticateMessage(text, signature);
                securityManager.authorizeCertificateUse(content.getFrom(), text, signature);
                if (content instanceof MessageRequest) {
                    securityManager.authorizeOperation(content.getClass().getSimpleName(), text, signature,
                            content.getCollectionID());
                }
                MessageVersionValidator.validateMessageVersion(content);
                MessageLoggerProvider.getInstance().logMessageReceived(content);
                String certificateFingerprint = null;
                if (signer != null) {
                    certificateFingerprint = securityManager.getCertificateFingerprint(signer);
                }
                receivedMessageHandler.deliver(this.messageListener, content, new MessageContext(certificateFingerprint));
            } catch (SAXException e) {
                this.log.error("Error validating message " + message, (Throwable) e);
            } catch (Exception e) {
                this.log.error("Error handling message. Received type was '" + type + "'.\n{}", text, e);
            }
        }
    }

    /* loaded from: input_file:WEB-INF/lib/bitrepository-core-2.0-SNAPSHOT.jar:org/bitrepository/protocol/activemq/ActiveMQMessageBus$MessageBusExceptionListener.class */
    /** Receives asynchronous connection failures from the JMS provider and writes them to the bus log. */
    private class MessageBusExceptionListener implements ExceptionListener {
        private MessageBusExceptionListener() {
        }

        /**
         * Logs the reported exception at error level.
         *
         * @param jMSException the exception reported for the connection.
         */
        @Override // javax.jms.ExceptionListener
        public void onException(JMSException jMSException) {
            Logger busLog = ActiveMQMessageBus.this.log;
            busLog.error("JMSException caught: ", jMSException);
        }
    }

    /**
     * Creates the message bus, connects to the broker given in the settings and starts the connection
     * asynchronously.
     *
     * @param settings        provides the message bus configuration, the component ID (used as JMS client ID)
     *                        and the optional message thread pool settings.
     * @param securityManager used for signing outgoing and verifying incoming messages.
     * @throws CoordinationLayerException if the connection, sessions or producer cannot be created.
     */
    public ActiveMQMessageBus(Settings settings, SecurityManager securityManager) {
        this.configuration = settings.getMessageBusConfiguration();
        this.log.info("Initializing ActiveMQMessageBus:'" + this.configuration + "'.");
        this.securityManager = securityManager;
        this.clientID = settings.getComponentID();
        this.jaxbHelper = new JaxbHelper("xsd/", this.schemaLocation);
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(this.configuration.getURL());
        registerCustomMessageLoggers();
        // Kept in a local so a half-initialised connection can be released if a later step fails.
        Connection newConnection = null;
        try {
            newConnection = connectionFactory.createConnection();
            newConnection.setClientID(this.clientID);
            newConnection.setExceptionListener(new MessageBusExceptionListener());
            this.connection = newConnection;
            this.producerSession = newConnection.createSession(TRANSACTED, Session.AUTO_ACKNOWLEDGE);
            this.consumerSession = newConnection.createSession(TRANSACTED, Session.AUTO_ACKNOWLEDGE);
            // A null destination gives an unidentified producer; the destination is supplied on each send.
            this.producer = this.producerSession.createProducer(null);
            this.producer.setDeliveryMode(javax.jms.DeliveryMode.NON_PERSISTENT);
            startListeningForMessages();
            this.log.debug("ActiveMQConnection initialized for '" + this.configuration + "'.");
            this.receivedMessageHandler = new ReceivedMessageHandler(settings.getReferenceSettings().getGeneralSettings() != null ? settings.getReferenceSettings().getGeneralSettings().getMessageThreadPools() : null);
        } catch (JMSException e) {
            if (newConnection != null) {
                try {
                    // Closing the connection also releases any sessions and producers created from it.
                    newConnection.close();
                } catch (JMSException closeFailure) {
                    this.log.warn("Unable to close the partially initialised message bus connection", closeFailure);
                }
            }
            throw new CoordinationLayerException("Unable to initialise connection to message bus", e);
        }
    }

    /**
     * Starts the JMS connection on a separate thread obtained from the thread factory, so the caller
     * does not wait for the connection start to complete.
     */
    private void startListeningForMessages() {
        Runnable connectionStarter = new Runnable() {
            @Override // java.lang.Runnable
            public void run() {
                try {
                    connection.start();
                } catch (Exception e) {
                    throw new RuntimeException("Unable to start listening on the message bus", e);
                }
            }
        };
        Thread starterThread = threadFactory.newThread(connectionStarter);
        starterThread.start();
    }

    /**
     * Registers a non-durable listener on the given destination.
     *
     * @param str             the ID of the destination to listen on.
     * @param messageListener the listener to receive the messages.
     */
    @Override // org.bitrepository.protocol.messagebus.MessageBus
    public synchronized void addListener(String str, org.bitrepository.protocol.messagebus.MessageListener messageListener) {
        final boolean durable = false;
        addListener(str, messageListener, durable);
    }

    /**
     * Registers a listener on the given destination, creating the underlying consumer if none exists yet
     * for this destination/listener pair.
     *
     * @param str             the ID of the destination to listen on.
     * @param messageListener the listener to receive the messages.
     * @param z               whether a durable subscription should be used; only possible for topics.
     * @throws CoordinationLayerException if the listener cannot be attached to the consumer.
     */
    @Override // org.bitrepository.protocol.messagebus.MessageBus
    public synchronized void addListener(String str, org.bitrepository.protocol.messagebus.MessageListener messageListener, boolean z) {
        String durability = z ? "durable " : "";
        this.log.debug("Adding " + durability + "listener '{}' to destination: '{}' on message-bus '{}'.", messageListener, str, this.configuration.getName());
        try {
            MessageConsumer consumer = getMessageConsumer(str, messageListener, z);
            consumer.setMessageListener(new ActiveMQMessageListener(messageListener));
        } catch (JMSException e) {
            // Only mention "durable" when a durable listener was actually requested.
            throw new CoordinationLayerException("Unable to add " + durability + "listener '" + messageListener + "' to destinationID '" + str + "'", e);
        }
    }

    /**
     * Detaches the listener from the given destination and closes its consumer.
     * Removing a listener which was never added is a no-op.
     *
     * @param str             the ID of the destination the listener was added to.
     * @param messageListener the listener to remove.
     * @throws CoordinationLayerException if the consumer cannot be detached or closed.
     */
    @Override // org.bitrepository.protocol.messagebus.MessageBus
    public synchronized void removeListener(String str, org.bitrepository.protocol.messagebus.MessageListener messageListener) {
        this.log.debug("Removing listener '" + messageListener + "' from destination: '" + str + "' on message-bus '" + this.configuration + "'.");
        // Look the consumer up directly: going through getMessageConsumer would create a new consumer
        // for an unknown listener just to close it again. Removing it from the map up front makes sure
        // a consumer whose close fails is not handed out again later.
        MessageConsumer consumer = this.consumers.remove(getConsumerHash(str, messageListener));
        if (consumer == null) {
            this.log.debug("No consumer found for listener '{}' on destination '{}', nothing to remove.", messageListener, str);
            return;
        }
        try {
            try {
                consumer.setMessageListener(null);
            } finally {
                consumer.close();
            }
        } catch (JMSException e) {
            throw new CoordinationLayerException("Unable to remove listener '" + messageListener + "' from destinationID '" + str + "'", e);
        }
    }

    /**
     * Shuts down the received message handler, both sessions and finally the connection.
     * The connection is closed even if closing one of the sessions fails.
     *
     * @throws JMSException if a session or the connection cannot be closed.
     */
    @Override // org.bitrepository.protocol.messagebus.MessageBus, java.lang.AutoCloseable
    public void close() throws JMSException {
        this.receivedMessageHandler.close();
        this.log.info("Closing message bus: " + this.configuration);
        try {
            this.producerSession.close();
            this.log.debug("Producer session closed.");
            this.consumerSession.close();
            this.log.debug("Consumer session closed.");
        } finally {
            // Closing the connection releases whatever sessions are still open, so it must always run.
            this.connection.close();
            this.log.debug("Connection closed.");
        }
    }

    /**
     * Sends the message to the destination named in the message itself and reports it to the message logger.
     *
     * @param message the message to send; destination, reply-to, recipient, collection ID and
     *                correlation ID are all read from it.
     */
    @Override // org.bitrepository.protocol.messagebus.MessageSender
    public void sendMessage(org.bitrepository.bitrepositorymessages.Message message) {
        String destinationID = message.getDestination();
        String replyTo = message.getReplyTo();
        String recipient = message.getTo();
        String collectionID = message.getCollectionID();
        String correlationID = message.getCorrelationID();
        sendMessage(destinationID, replyTo, recipient, collectionID, correlationID, message);

        MessageLoggerProvider loggers = MessageLoggerProvider.getInstance();
        loggers.logMessageSent(message);
    }

    /**
     * Serializes, validates, signs and sends a message.
     *
     * @param str     the ID of the destination to send to.
     * @param str2    the ID of the destination replies should be sent to.
     * @param str3    the recipient component; the recipient property is left unset when this is null.
     * @param str4    the collection ID set as message property.
     * @param str5    the correlation ID set on the JMS message.
     * @param message the message to serialize and send.
     * @throws CoordinationLayerException if the message is invalid according to the schema or cannot be sent.
     */
    private synchronized void sendMessage(String str, String str2, String str3, String str4, String str5, org.bitrepository.bitrepositorymessages.Message message) {
        String xml = null;
        try {
            xml = this.jaxbHelper.serializeToXml(message);
            // Validate the same UTF-8 bytes the receiving side validates, not the platform default encoding.
            this.jaxbHelper.validate(new ByteArrayInputStream(xml.getBytes("UTF-8")));
            this.log.trace("The following message is sent to the destination '" + str + "' on message-bus '" + this.configuration.getName() + "': \n{}", xml);
            TextMessage jmsMessage = this.producerSession.createTextMessage(xml);
            jmsMessage.setStringProperty(MESSAGE_SIGNATURE_KEY, this.securityManager.signMessage(jmsMessage.getText()));
            jmsMessage.setStringProperty(MESSAGE_TYPE_KEY, message.getClass().getSimpleName());
            if (str3 != null) {
                jmsMessage.setStringProperty(MESSAGE_TO_KEY, str3);
            }
            jmsMessage.setStringProperty(COLLECTION_ID_KEY, str4);
            jmsMessage.setJMSCorrelationID(str5);
            jmsMessage.setJMSReplyTo(getDestination(str2, this.producerSession));
            this.producer.send(getDestination(str, this.producerSession), jmsMessage);
        } catch (SAXException e) {
            throw new CoordinationLayerException("Rejecting to send invalid message: " + xml, e);
        } catch (Exception e) {
            throw new CoordinationLayerException("Could not send message", e);
        }
    }

    /**
     * Returns the consumer for the given destination/listener pair, creating and caching it on first use.
     *
     * @param str             the ID of the destination to consume from.
     * @param messageListener the listener the consumer is meant for; only used to build the cache key.
     * @param z               whether a newly created consumer should be a durable subscriber. Ignored when a
     *                        consumer already exists for the key.
     * @return the cached or newly created consumer.
     * @throws IllegalArgumentException   if a durable consumer is requested for a destination which is not a topic.
     * @throws CoordinationLayerException if the consumer cannot be created.
     */
    private MessageConsumer getMessageConsumer(String str, org.bitrepository.protocol.messagebus.MessageListener messageListener, boolean z) {
        String consumerKey = getConsumerHash(str, messageListener);
        this.log.debug("Retrieving message consumer on destination '" + str + "' for listener '" + messageListener + "'. Key: '" + consumerKey + "'.");
        MessageConsumer consumer = this.consumers.get(consumerKey);
        if (consumer == null) {
            this.log.debug("No consumer known. Creating new for key '" + consumerKey + "'.");
            Destination destination = getDestination(str, this.consumerSession);
            try {
                if (!z) {
                    consumer = this.consumerSession.createConsumer(destination);
                } else {
                    if (!(destination instanceof Topic)) {
                        throw new IllegalArgumentException("Can not create durable subscriber on " + str + " as it is not a topic");
                    }
                    // NOTE(review): every durable subscription uses the client ID as its name; durable
                    // listeners on more than one topic may replace each other's subscription - confirm.
                    consumer = this.consumerSession.createDurableSubscriber((Topic) destination, this.clientID);
                }
                this.consumers.put(consumerKey, consumer);
            } catch (JMSException e) {
                throw new CoordinationLayerException("Could not create message consumer for destination '" + destination + "'", e);
            }
        }
        return consumer;
    }

    /**
     * Builds the key under which the consumer for a destination/listener pair is stored.
     *
     * @param str             the destination ID.
     * @param messageListener the listener; contributes its hash code to the key.
     * @return the destination ID, the key separator and the listener hash code, concatenated.
     */
    private String getConsumerHash(String str, org.bitrepository.protocol.messagebus.MessageListener messageListener) {
        StringBuilder key = new StringBuilder();
        key.append(str);
        key.append(CONSUMER_KEY_SEPARATOR);
        key.append(messageListener.hashCode());
        return key.toString();
    }

    /**
     * Returns the JMS destination for the given destination ID, creating and caching it on first use.
     * An ID without a {@code ://} separator is treated as a topic name. Otherwise the part before the
     * separator selects the type: {@code topic}, {@code queue}, {@code temporary-queue} or
     * {@code temporary-topic}.
     *
     * @param str     the destination ID.
     * @param session the session used for creating a destination which is not cached yet.
     * @return the destination, never null.
     * @throws CoordinationLayerException if the ID is malformed, names an unknown type, or the destination
     *                                    cannot be created.
     */
    private Destination getDestination(String str, Session session) {
        Destination destination = this.destinations.get(str);
        if (destination != null) {
            return destination;
        }
        try {
            String[] parts = str.split("://");
            if (parts.length == 1) {
                destination = session.createTopic(str);
            } else if (parts.length == 2) {
                String type = parts[0];
                if ("topic".equals(type)) {
                    destination = session.createTopic(parts[1]);
                } else if ("queue".equals(type)) {
                    destination = session.createQueue(parts[1]);
                } else if ("temporary-queue".equals(type)) {
                    destination = session.createTemporaryQueue();
                } else if ("temporary-topic".equals(type)) {
                    destination = session.createTemporaryTopic();
                } else {
                    throw new CoordinationLayerException("Unable to create destination '" + str + "'. Unknown type.");
                }
            } else {
                // Zero parts or more than two: fail here instead of caching and returning null.
                throw new CoordinationLayerException("Unable to create destination '" + str + "'. Malformed destination ID.");
            }
            this.destinations.put(str, destination);
            return destination;
        } catch (JMSException e) {
            throw new CoordinationLayerException("Could not create destination '" + str + "'", e);
        }
    }

    /** Registers the operation-specific message loggers with the shared message logger provider. */
    private void registerCustomMessageLoggers() {
        final MessageLoggerProvider loggers = MessageLoggerProvider.getInstance();

        // File operations.
        loggers.registerLogger(OperationType.GET_FILE, new GetFileMessageLogger());
        loggers.registerLogger(OperationType.PUT_FILE, new PutFileMessageLogger());
        loggers.registerLogger(OperationType.DELETE_FILE, new DeleteFileMessageLogger());
        // NOTE(review): REPLACE_FILE is given the GetStatus logger - looks like a copy-paste slip; confirm
        // whether a dedicated replace-file logger should be used here.
        loggers.registerLogger(OperationType.REPLACE_FILE, new GetStatusMessageLogger());

        // Information retrieval operations.
        loggers.registerLogger(OperationType.GET_FILE_IDS, new GetFileIDsMessageLogger());
        loggers.registerLogger(OperationType.GET_CHECKSUMS, new GetChecksumsMessageLogger());
        loggers.registerLogger(OperationType.GET_AUDIT_TRAILS, new GetAuditTrailsMessageLogger());
        loggers.registerLogger(OperationType.GET_STATUS, new GetStatusMessageLogger());

        // Alarms are registered by message name instead of by operation type.
        loggers.registerLogger(Arrays.asList("AlarmMessage"), new AlarmMessageLogger());
    }

    /**
     * Replaces the component filter applied to incoming identify requests.
     *
     * @param list the recipient component IDs to accept; an empty list disables the filter.
     */
    @Override // org.bitrepository.protocol.messagebus.MessageBus
    public void setComponentFilter(List<String> list) {
        log.info("Settings component filter to: {}", list);
        final Set<String> filter = componentFilter;
        filter.clear();
        filter.addAll(list);
    }

    /**
     * Replaces the collection filter applied to incoming identify requests.
     *
     * @param list the collection IDs to accept; an empty list disables the filter.
     */
    @Override // org.bitrepository.protocol.messagebus.MessageBus
    public void setCollectionFilter(List<String> list) {
        log.info("Settings collection filter to: {}", list);
        final Set<String> filter = collectionFilter;
        filter.clear();
        filter.addAll(list);
    }
}
