package org.bitrepository.protocol.performancetest;

import java.util.Date;
import org.bitrepository.bitrepositorymessages.AlarmMessage;
import org.bitrepository.protocol.LocalActiveMQBroker;
import org.bitrepository.protocol.activemq.ActiveMQMessageBus;
import org.bitrepository.protocol.bus.MessageBusConfigurationFactory;
import org.bitrepository.protocol.message.ExampleMessageFactory;
import org.bitrepository.protocol.messagebus.AbstractMessageListener;
import org.bitrepository.protocol.messagebus.MessageBus;
import org.bitrepository.protocol.security.DummySecurityManager;
import org.bitrepository.protocol.security.SecurityManager;
import org.bitrepository.settings.collectionsettings.MessageBusConfiguration;
import org.jaccept.structure.ExtendedTestCase;
import org.testng.Assert;
import org.testng.annotations.Test;

/* loaded from: input_file:org/bitrepository/protocol/performancetest/MessageBusTimeToSendMessagesStressTest.class */
public class MessageBusTimeToSendMessagesStressTest extends ExtendedTestCase {
    static final int TIME_FOR_MESSAGE_TRANSFER_WAIT = 500;
    private static String QUEUE = "TEST-QUEUE";
    private static int NUMBER_OF_MESSAGES = 1000;
    private static int NUMBER_OF_SENDERS = 10;
    private static Date startSending;

    /* loaded from: input_file:org/bitrepository/protocol/performancetest/MessageBusTimeToSendMessagesStressTest$CountMessagesListener.class */
    private class CountMessagesListener extends AbstractMessageListener {
        private final MessageBus bus;
        private Date stopSending;
        private boolean awaitingMore = true;
        private int count = 0;

        public CountMessagesListener(MessageBusConfiguration messageBusConfiguration, SecurityManager securityManager) {
            this.bus = new ActiveMQMessageBus(messageBusConfiguration, securityManager);
            this.bus.addListener(MessageBusTimeToSendMessagesStressTest.QUEUE, this);
        }

        public void stop() {
            this.bus.removeListener(MessageBusTimeToSendMessagesStressTest.QUEUE, this);
        }

        public int getCount() {
            return this.count;
        }

        public void onMessage(AlarmMessage alarmMessage) {
            this.count++;
            if (this.count >= MessageBusTimeToSendMessagesStressTest.NUMBER_OF_MESSAGES) {
                this.stopSending = new Date();
                this.awaitingMore = false;
            }
        }

        public Date getStopSending() {
            return this.stopSending;
        }

        public boolean isFinished() {
            return !this.awaitingMore;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/bitrepository/protocol/performancetest/MessageBusTimeToSendMessagesStressTest$MessageSenderThread.class */
    public class MessageSenderThread extends Thread {
        private final MessageBus bus;
        private final int numberOfMessages;
        private final String id;

        public MessageSenderThread(MessageBusConfiguration messageBusConfiguration, SecurityManager securityManager, int i, String str) {
            this.bus = new ActiveMQMessageBus(messageBusConfiguration, securityManager);
            this.numberOfMessages = i;
            this.id = str;
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            try {
                AlarmMessage alarmMessage = (AlarmMessage) ExampleMessageFactory.createMessage(AlarmMessage.class);
                alarmMessage.setTo(MessageBusTimeToSendMessagesStressTest.QUEUE);
                for (int i = 0; i < this.numberOfMessages; i++) {
                    alarmMessage.setCorrelationID(this.id + ":" + i);
                    this.bus.sendMessage(alarmMessage);
                }
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    }

    public void SendManyMessagesDistributed() throws Exception {
        addDescription("Tests how fast a given number of messages can be handled.");
        addStep("Define constants", "This should not be possible to fail.");
        QUEUE += "-" + new Date().getTime();
        addStep("Make configuration for the messagebus.", "Both should be created.");
        MessageBusConfiguration createDefaultConfiguration = MessageBusConfigurationFactory.createDefaultConfiguration();
        DummySecurityManager dummySecurityManager = new DummySecurityManager();
        CountMessagesListener countMessagesListener = null;
        try {
            addStep("Initialise the messagelistener", "Should be allowed.");
            countMessagesListener = new CountMessagesListener(createDefaultConfiguration, dummySecurityManager);
            startSending = new Date();
            addStep("Start sending at '" + startSending + "'", "Should just be waiting.");
            sendAllTheMessages(createDefaultConfiguration, dummySecurityManager);
            addStep("Sleept untill the listerner has received all the messages.", "Should be sleeping.");
            while (!countMessagesListener.isFinished()) {
                synchronized (this) {
                    try {
                        wait(500L);
                    } catch (InterruptedException e) {
                    }
                }
            }
            Date stopSending = countMessagesListener.getStopSending();
            addStep("Validating the count. Started at '" + startSending + "' and ended at '" + stopSending + "'", "Should not be wrong.");
            System.out.println("Sent '" + countMessagesListener.getCount() + "' messages in '" + ((stopSending.getTime() - startSending.getTime()) / 1000) + "' seconds.");
            if (countMessagesListener != null) {
                countMessagesListener.stop();
            }
        } catch (Throwable th) {
            if (countMessagesListener != null) {
                countMessagesListener.stop();
            }
            throw th;
        }
    }

    @Test(groups = {"StressTest"})
    public void SendManyMessagesLocally() throws Exception {
        addDescription("Tests how many messages can be handled within a given timeframe.");
        addStep("Define constants", "This should not be possible to fail.");
        QUEUE += "-" + new Date().getTime();
        addStep("Make configuration for the messagebus and define the local broker.", "Both should be created.");
        MessageBusConfiguration createEmbeddedMessageBusConfiguration = MessageBusConfigurationFactory.createEmbeddedMessageBusConfiguration();
        Assert.assertNotNull(createEmbeddedMessageBusConfiguration);
        LocalActiveMQBroker localActiveMQBroker = new LocalActiveMQBroker(createEmbeddedMessageBusConfiguration);
        Assert.assertNotNull(localActiveMQBroker);
        CountMessagesListener countMessagesListener = null;
        DummySecurityManager dummySecurityManager = new DummySecurityManager();
        try {
            addStep("Starting the broker.", "Should be allowed");
            localActiveMQBroker.start();
            addStep("Initialise the messagelistener", "Should be allowed.");
            countMessagesListener = new CountMessagesListener(createEmbeddedMessageBusConfiguration, dummySecurityManager);
            startSending = new Date();
            addStep("Start sending at '" + startSending + "'", "Should just be waiting.");
            sendAllTheMessages(createEmbeddedMessageBusConfiguration, dummySecurityManager);
            addStep("Sleep untill the listerner has received all the messages.", "Should be sleeping.");
            long time = new Date().getTime();
            while (!countMessagesListener.isFinished() && new Date().getTime() - time < 60000) {
                synchronized (this) {
                    try {
                        wait(500L);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
            addStep("Validating the count. Started at '" + startSending + "' and ended at '" + countMessagesListener.getStopSending() + "'", "Should not be wrong.");
            System.out.println("Sent '" + countMessagesListener.getCount() + "' messages in '" + ((countMessagesListener.getStopSending().getTime() - startSending.getTime()) / 1000) + "' seconds.");
            if (countMessagesListener != null) {
                countMessagesListener.stop();
            }
            if (localActiveMQBroker != null) {
                localActiveMQBroker.stop();
            }
        } catch (Throwable th) {
            if (countMessagesListener != null) {
                countMessagesListener.stop();
            }
            if (localActiveMQBroker != null) {
                localActiveMQBroker.stop();
            }
            throw th;
        }
    }

    private void sendAllTheMessages(MessageBusConfiguration messageBusConfiguration, SecurityManager securityManager) throws Exception {
        for (int i = 0; i < NUMBER_OF_SENDERS; i++) {
            new MessageSenderThread(messageBusConfiguration, securityManager, NUMBER_OF_MESSAGES / NUMBER_OF_SENDERS, "#" + i).start();
        }
    }
}
