package org.bitrepository.protocol.performancetest;

import java.util.ArrayList;
import java.util.Date;
import java.util.Iterator;
import org.bitrepository.bitrepositorymessages.AlarmMessage;
import org.bitrepository.bitrepositorymessages.Message;
import org.bitrepository.common.settings.Settings;
import org.bitrepository.common.settings.TestSettingsProvider;
import org.bitrepository.protocol.MessageContext;
import org.bitrepository.protocol.activemq.ActiveMQMessageBus;
import org.bitrepository.protocol.bus.LocalActiveMQBroker;
import org.bitrepository.protocol.bus.MessageBusConfigurationFactory;
import org.bitrepository.protocol.message.ExampleMessageFactory;
import org.bitrepository.protocol.messagebus.MessageBus;
import org.bitrepository.protocol.messagebus.MessageListener;
import org.bitrepository.protocol.security.DummySecurityManager;
import org.bitrepository.protocol.security.SecurityManager;
import org.bitrepository.settings.repositorysettings.MessageBusConfiguration;
import org.jaccept.structure.ExtendedTestCase;
import org.testng.Assert;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

/* loaded from: input_file:org/bitrepository/protocol/performancetest/MessageBusNumberOfListenersStressTest.class */
public class MessageBusNumberOfListenersStressTest extends ExtendedTestCase {
    private static final boolean WRITE_RESULTS_TO_FILE = false;
    private static final String OUTPUT_FILE_NAME = "NumberOfListeners-results.test";
    private static AlarmMessage alarmMessage;
    private static MessageBus bus;
    private Settings settings;
    private static String QUEUE = "TEST-LISTENERS";
    private static long DEFAULT_WAIT_TIME = 500;
    private static long TIME_FRAME = 60000;
    private static int NUMBER_OF_LISTENERS = 10;
    private static int idReached = -1;
    private static int messageReceived = 0;
    private static boolean sendMoreMessages = true;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/bitrepository/protocol/performancetest/MessageBusNumberOfListenersStressTest$NotificationMessageListener.class */
    public class NotificationMessageListener implements MessageListener {
        private final MessageBus bus;
        private int count = 0;

        public NotificationMessageListener(Settings settings, SecurityManager securityManager) {
            this.bus = new ActiveMQMessageBus(settings, securityManager);
            this.bus.addListener(MessageBusNumberOfListenersStressTest.QUEUE, this);
        }

        public void stop() {
            this.bus.removeListener(MessageBusNumberOfListenersStressTest.QUEUE, this);
        }

        public int getCount() {
            return this.count;
        }

        public void onMessage(Message message, MessageContext messageContext) {
            this.count++;
            MessageBusNumberOfListenersStressTest.handleMessageDistribution(Integer.parseInt(message.getCorrelationID()));
        }
    }

    @BeforeMethod
    public void initializeSettings() {
        this.settings = TestSettingsProvider.getSettings(getClass().getSimpleName());
    }

    @Test(groups = {"StressTest"})
    public void testManyListenersOnLocalMessageBus() throws Exception {
        addDescription("Tests how many messages can be handled within a given timeframe when a given number of listeners are receiving them.");
        addStep("Define constants", "This should not be possible to fail.");
        QUEUE += "-" + new Date().getTime();
        messageReceived = 0;
        idReached = -1;
        sendMoreMessages = true;
        addStep("Define the message to send.", "Should retrieve the Alarm message from examples and set the To.");
        alarmMessage = (AlarmMessage) ExampleMessageFactory.createMessage(AlarmMessage.class);
        alarmMessage.setDestination(QUEUE);
        addStep("Make configuration for the messagebus.", "Both should be created.");
        this.settings.getRepositorySettings().getProtocolSettings().setMessageBusConfiguration(MessageBusConfigurationFactory.createEmbeddedMessageBusConfiguration());
        DummySecurityManager dummySecurityManager = new DummySecurityManager();
        LocalActiveMQBroker localActiveMQBroker = new LocalActiveMQBroker(this.settings.getMessageBusConfiguration());
        try {
            addStep("Start the broker and initialise the listeners.", "Connections should be established.");
            localActiveMQBroker.start();
            bus = new ActiveMQMessageBus(this.settings, dummySecurityManager);
            testListeners(this.settings.getMessageBusConfiguration(), dummySecurityManager);
            if (localActiveMQBroker != null) {
                localActiveMQBroker.stop();
            }
        } catch (Throwable th) {
            if (localActiveMQBroker != null) {
                localActiveMQBroker.stop();
            }
            throw th;
        }
    }

    @Test(groups = {"StressTest"})
    public void testManyListenersOnDistributedMessageBus() throws Exception {
        addDescription("Tests how many messages can be handled within a given timeframe when a given number of listeners are receiving them.");
        addStep("Define constants", "This should not be possible to fail.");
        QUEUE += "-" + new Date().getTime();
        messageReceived = 0;
        idReached = -1;
        sendMoreMessages = true;
        addStep("Define the message to send.", "Should retrieve the Alarm message from examples and set the To.");
        alarmMessage = (AlarmMessage) ExampleMessageFactory.createMessage(AlarmMessage.class);
        alarmMessage.setDestination(QUEUE);
        addStep("Make configuration for the messagebus.", "Both should be created.");
        MessageBusConfiguration createDefaultConfiguration = MessageBusConfigurationFactory.createDefaultConfiguration();
        DummySecurityManager dummySecurityManager = new DummySecurityManager();
        addStep("Start the broker and initialise the listeners.", "Connections should be established.");
        bus = new ActiveMQMessageBus(this.settings, dummySecurityManager);
        testListeners(createDefaultConfiguration, dummySecurityManager);
    }

    public void testListeners(MessageBusConfiguration messageBusConfiguration, SecurityManager securityManager) throws Exception {
        ArrayList<NotificationMessageListener> arrayList = new ArrayList(NUMBER_OF_LISTENERS);
        try {
            addStep("Initialise the message listeners.", "Should be created and connected to the message bus.");
            for (int i = 0; i < NUMBER_OF_LISTENERS; i++) {
                arrayList.add(new NotificationMessageListener(this.settings, securityManager));
            }
            addStep("Wait for setup", "We wait!");
            synchronized (this) {
                try {
                    wait(DEFAULT_WAIT_TIME);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            addStep("Send the first message", "Message should be send.");
            sendMessageWithId(1);
            addStep("Wait for the timeframe on '" + TIME_FRAME + "' milliseconds.", "We wait!");
            synchronized (this) {
                try {
                    wait(TIME_FRAME);
                } catch (InterruptedException e2) {
                    e2.printStackTrace();
                }
            }
            addStep("Stop sending more messages and await all the messages to be received by all the listeners", "Should be Ok");
            sendMoreMessages = false;
            synchronized (this) {
                try {
                    wait(DEFAULT_WAIT_TIME);
                } catch (InterruptedException e3) {
                    e3.printStackTrace();
                }
            }
            addStep("Verifying the amount of message sent '" + idReached + "' has been received by all '" + NUMBER_OF_LISTENERS + "' listeners", "Should be the same amount for each listener, and the same amount as the correlation ID of the message");
            Assert.assertTrue(idReached * NUMBER_OF_LISTENERS == messageReceived, "Reached message Id " + idReached + " thus each message of the " + NUMBER_OF_LISTENERS + " listener should have received " + idReached + " message, though they have received " + messageReceived + " message all together.");
            for (NotificationMessageListener notificationMessageListener : arrayList) {
                Assert.assertTrue(notificationMessageListener.getCount() == idReached, "Should have received " + idReached + " messages, but has received " + notificationMessageListener.getCount());
            }
            if (arrayList != null) {
                Iterator it = arrayList.iterator();
                while (it.hasNext()) {
                    ((NotificationMessageListener) it.next()).stop();
                }
                arrayList.clear();
            }
            synchronized (this) {
                try {
                    wait(DEFAULT_WAIT_TIME);
                } catch (InterruptedException e4) {
                    e4.printStackTrace();
                }
            }
        } catch (Throwable th) {
            if (arrayList != null) {
                Iterator it2 = arrayList.iterator();
                while (it2.hasNext()) {
                    ((NotificationMessageListener) it2.next()).stop();
                }
                arrayList.clear();
            }
            synchronized (this) {
                try {
                    wait(DEFAULT_WAIT_TIME);
                } catch (InterruptedException e5) {
                    e5.printStackTrace();
                }
                throw th;
            }
        }
    }

    private static void sendMessageWithId(int i) {
        if (sendMoreMessages) {
            alarmMessage.setCorrelationID(i);
            bus.sendMessage(alarmMessage);
        }
    }

    public static synchronized void handleMessageDistribution(int i) {
        if (i > idReached) {
            idReached = i;
            sendMessageWithId(idReached + 1);
        }
        messageReceived++;
    }
}
