package org.bitrepository.protocol.messagebus;

import java.math.BigInteger;
import java.util.Arrays;
import org.bitrepository.bitrepositorymessages.GetStatusRequest;
import org.bitrepository.bitrepositorymessages.Message;
import org.bitrepository.bitrepositorymessages.PutFileRequest;
import org.bitrepository.protocol.MessageContext;
import org.bitrepository.settings.referencesettings.MessageCategory;
import org.bitrepository.settings.referencesettings.MessageThreadPool;
import org.bitrepository.settings.referencesettings.MessageThreadPools;
import org.jaccept.structure.ExtendedTestCase;
import org.mockito.Mockito;
import org.testng.annotations.Test;

/* loaded from: input_file:org/bitrepository/protocol/messagebus/ReceivedMessageHandlerTest.class */
public class ReceivedMessageHandlerTest extends ExtendedTestCase {

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/bitrepository/protocol/messagebus/ReceivedMessageHandlerTest$BlockingMessageListener.class */
    /**
     * Test listener that holds back every received message until {@link #unblock()} has been
     * called (or a safety timeout expires) and only then forwards it to the wrapped listener.
     *
     * <p>The release is remembered in a flag, so an {@code unblock()} that arrives before the
     * delivering thread has started waiting is not lost, and a spurious wakeup does not release
     * the message early.
     */
    public class BlockingMessageListener implements MessageListener {
        /** Maximum time, in milliseconds, a message is held back when nobody calls unblock(). */
        private static final long MAX_BLOCK_MILLIS = 5000L;
        /** Conversion factor used for the monotonic deadline computation. */
        private static final long NANOS_PER_MILLI = 1000000L;

        /** The listener that receives the message once this listener has been released. */
        final MessageListener listener;
        /** Set by unblock(); consumed by the next onMessage() call. Guarded by this monitor. */
        private boolean released = false;

        private BlockingMessageListener(MessageListener messageListener) {
            this.listener = messageListener;
        }

        /**
         * Waits until released (at most {@link #MAX_BLOCK_MILLIS} ms) and then forwards the message.
         * If the waiting thread is interrupted the message is still forwarded, and the thread's
         * interrupt status is restored.
         *
         * @param message the message to forward.
         * @param messageContext the context to forward along with the message.
         */
        public synchronized void onMessage(Message message, MessageContext messageContext) {
            long deadline = System.nanoTime() + MAX_BLOCK_MILLIS * NANOS_PER_MILLI;
            try {
                while (!released) {
                    long remainingMillis = (deadline - System.nanoTime()) / NANOS_PER_MILLI;
                    // wait(0) would block forever, so leave the loop once less than 1 ms remains.
                    if (remainingMillis <= 0) {
                        break;
                    }
                    wait(remainingMillis);
                }
            } catch (InterruptedException e) {
                // Keep the interrupt visible to the owning thread instead of swallowing it.
                Thread.currentThread().interrupt();
            }
            // Consume the release so that a later message blocks again.
            released = false;
            this.listener.onMessage(message, messageContext);
        }

        /** Releases the message currently held back, or the next one if none is waiting yet. */
        public synchronized void unblock() {
            released = true;
            notify();
        }
    }

    /**
     * Delivers one message through a handler created without any pool configuration and
     * checks that the listener receives exactly that message and context.
     */
    @Test(groups = {"regressiontest"})
    public void singleMessageDispatch() {
        addDescription("Tests that a single message is dispatched correctly");

        MessageThreadPools noConfiguration = null;
        ReceivedMessageHandler handler = new ReceivedMessageHandler(noConfiguration);
        MessageListener listener = Mockito.mock(MessageListener.class);
        Message dispatched = new Message();
        MessageContext context = new MessageContext("fingerprint");

        handler.deliver(listener, dispatched, context);

        // Delivery happens on another thread, so give it a short grace period.
        Mockito.verify(listener, Mockito.timeout(100)).onMessage(dispatched, context);
    }

    /**
     * Delivers one message to a blocking listener and one to a plain listener through a handler
     * configured with a single pool of size 2, and expects the plain listener to be served while
     * the blocking listener is still held back.
     */
    @Test(groups = {"regressiontest"})
    public void parallelMessageDispatch() {
        addDescription("Tests that two messages can be handled in parallel in the default pool configuration.");
        // The fixture text now describes the configuration that is actually built below.
        addFixture("Create a ReceivedMessageHandler with a single pool of size 2.");
        ReceivedMessageHandler receivedMessageHandler =
                new ReceivedMessageHandler(createMessageThreadPools(createMessageThreadPool(2, null, null, null)));

        addStep("Dispatch messages to two listeners, the first blocking.", "The second listener should be notified");
        BlockingMessageListener blockingMessageListener =
                new BlockingMessageListener(Mockito.mock(MessageListener.class));
        MessageListener messageListener = Mockito.mock(MessageListener.class);
        Message message = new Message();
        deliverAsynchronously(receivedMessageHandler, message, blockingMessageListener);
        deliverAsynchronously(receivedMessageHandler, message, messageListener);
        Mockito.verifyNoMoreInteractions(blockingMessageListener.listener);
        // Use a timeout like the sibling tests do, rather than relying solely on the
        // fixed sleep inside deliverAsynchronously.
        Mockito.verify(messageListener, Mockito.timeout(100)).onMessage(message, (MessageContext) null);

        addStep("Unblock the first listener", "The blocking listener should now be notified.");
        blockingMessageListener.unblock();
        Mockito.verify(blockingMessageListener.listener, Mockito.timeout(100))
                .onMessage(message, (MessageContext) null);
        Mockito.verifyNoMoreInteractions(messageListener);
    }

    /**
     * Delivers a message to 49 blocking listeners plus one plain listener (50 in total) through a
     * handler created without any pool configuration. Expects the plain listener to be served
     * while all blocking listeners are held back, and every blocking listener to be served once
     * it has been unblocked.
     */
    @Test(groups = {"regressiontest"})
    public void manyMessageDispatch() {
        final int blockingListenerCount = 49;

        addDescription("Tests that many (50) messages can be handled in parallel in the default pool configuration.");
        addFixture("Create a ReceivedMessageHandler with a null configuration. This should create a ReceivedMessageHandler with a single CachedThreadPool.");
        ReceivedMessageHandler receivedMessageHandler = new ReceivedMessageHandler((MessageThreadPools) null);

        // 49 blocking listeners + 1 plain listener = 50 listeners; the step text previously said 49.
        addStep("Dispatch messages to 50 listeners, where the first 49 are blocking.", "The 50'th listener should be notified");
        BlockingMessageListener[] blockingListeners = createBlockingMessageListeners(blockingListenerCount);
        MessageListener messageListener = Mockito.mock(MessageListener.class);
        Message message = new Message();
        deliverAsynchronously(receivedMessageHandler, message, blockingListeners);
        deliverAsynchronously(receivedMessageHandler, message, messageListener);
        for (BlockingMessageListener blockingListener : blockingListeners) {
            Mockito.verifyNoMoreInteractions(blockingListener.listener);
        }
        Mockito.verify(messageListener, Mockito.timeout(100)).onMessage(message, (MessageContext) null);

        addStep("Unblock the blocked listeners", "The remaining listener should now be notified.");
        for (BlockingMessageListener blockingListener : blockingListeners) {
            blockingListener.unblock();
            Mockito.verify(blockingListener.listener, Mockito.timeout(100))
                    .onMessage(message, (MessageContext) null);
        }
        Mockito.verifyNoMoreInteractions(messageListener);
    }

    /**
     * Delivers one message to a blocking listener and then to a plain listener through a handler
     * configured with a single pool of size 1. Expects neither wrapped listener to be called until
     * the blocking listener is released, after which both must be called.
     */
    @Test(groups = {"regressiontest"})
    public void singleThreadMessageDispatch() {
        addDescription("Tests that two messages will be handled in sequence if a singleThreaded pool is configured.");
        addFixture("Create a ReceivedMessageHandler with a single pool of size 1.");
        MessageThreadPool singleThreadPool = createMessageThreadPool(1, null, null, null);
        ReceivedMessageHandler handler = new ReceivedMessageHandler(createMessageThreadPools(singleThreadPool));

        addStep("Dispatch messages to two listeners, the first blocking.", "The second listener should be not be notified");
        MessageListener blockedDelegate = Mockito.mock(MessageListener.class);
        BlockingMessageListener blocker = new BlockingMessageListener(blockedDelegate);
        MessageListener queuedListener = Mockito.mock(MessageListener.class);
        Message dispatched = new Message();
        deliverAsynchronously(handler, dispatched, blocker, queuedListener);
        Mockito.verifyNoMoreInteractions(blocker.listener, queuedListener);

        addStep("Unblock the first listener", "Both listeners should now be notified.");
        blocker.unblock();
        Mockito.verify(blocker.listener, Mockito.timeout(100)).onMessage(dispatched, (MessageContext) null);
        Mockito.verify(queuedListener, Mockito.timeout(100)).onMessage(dispatched, (MessageContext) null);
    }

    /**
     * Configures one pool of size 1 for PutFileRequest messages and one pool without a size for
     * GetStatusRequest messages. Expects a status request to be served while two put requests are
     * held back behind a blocking listener, and both put requests to be served after the release.
     */
    @Test(groups = {"regressiontest"})
    public void specificMessagePools() {
        addDescription("Tests that different message types can be handled by different executors.");
        addFixture("Create a ReceivedMessageHandler with a two pools, one for status requests and one for put requests. The put file pool should be limited to 1 thread.");
        MessageThreadPool putPool =
                createMessageThreadPool(1, null, null, new String[]{PutFileRequest.class.getSimpleName()});
        MessageThreadPool statusPool =
                createMessageThreadPool(null, null, null, new String[]{GetStatusRequest.class.getSimpleName()});
        ReceivedMessageHandler handler = new ReceivedMessageHandler(createMessageThreadPools(putPool, statusPool));

        addStep("Dispatch two put messages, blocking on the processing of the first message.", "None of the messages should be processed.");
        BlockingMessageListener blocker = new BlockingMessageListener(Mockito.mock(MessageListener.class));
        MessageListener queuedPutListener = Mockito.mock(MessageListener.class);
        MessageListener statusListener = Mockito.mock(MessageListener.class);
        Message statusRequest = new GetStatusRequest();
        Message putRequest = new PutFileRequest();
        deliverAsynchronously(handler, putRequest, blocker, queuedPutListener);
        Mockito.verifyNoMoreInteractions(blocker.listener, queuedPutListener);

        addStep("Dispatch a status request message.", "This should be processed.");
        deliverAsynchronously(handler, statusRequest, statusListener);
        Mockito.verify(statusListener, Mockito.timeout(100)).onMessage(statusRequest, (MessageContext) null);

        addStep("Unblock the blocked put listener", "Both put messages should now be processed.");
        blocker.unblock();
        Mockito.verify(blocker.listener, Mockito.timeout(100)).onMessage(putRequest, (MessageContext) null);
        Mockito.verify(queuedPutListener, Mockito.timeout(100)).onMessage(putRequest, (MessageContext) null);
        Mockito.verifyNoMoreInteractions(statusListener);
    }

    /**
     * Configures only a pool of size 1 for PutFileRequest messages. Expects a status request to be
     * served while two put requests are held back behind a blocking listener, and both put
     * requests to be served after the release.
     */
    @Test(groups = {"regressiontest"})
    public void specificMessageNamePoolAndDefaultPool() {
        addDescription("Tests it is possible to specify a pool for a specific message type, with a default pool for the remainder.");
        addFixture("Create a ReceivedMessageHandler with a one specific pool for put requests. The put file pool should be limited to 1 thread.");
        String[] pooledMessageNames = new String[]{PutFileRequest.class.getSimpleName()};
        MessageThreadPool putPool = createMessageThreadPool(1, null, null, pooledMessageNames);
        ReceivedMessageHandler handler = new ReceivedMessageHandler(createMessageThreadPools(putPool));

        addStep("Dispatch two put messages, blocking on the processing of the first message.", "None of the messages should be processed.");
        BlockingMessageListener blocker = new BlockingMessageListener(Mockito.mock(MessageListener.class));
        MessageListener queuedPutListener = Mockito.mock(MessageListener.class);
        MessageListener statusListener = Mockito.mock(MessageListener.class);
        Message statusRequest = new GetStatusRequest();
        Message putRequest = new PutFileRequest();
        deliverAsynchronously(handler, putRequest, blocker, queuedPutListener);
        Mockito.verifyNoMoreInteractions(blocker.listener, queuedPutListener);

        addStep("Dispatch a status request message.", "This should be processed.");
        deliverAsynchronously(handler, statusRequest, statusListener);
        Mockito.verify(statusListener, Mockito.timeout(100)).onMessage(statusRequest, (MessageContext) null);

        addStep("Unblock the blocked put listener", "Both put messages should now be processed.");
        blocker.unblock();
        Mockito.verify(blocker.listener, Mockito.timeout(100)).onMessage(putRequest, (MessageContext) null);
        Mockito.verify(queuedPutListener, Mockito.timeout(100)).onMessage(putRequest, (MessageContext) null);
        Mockito.verifyNoMoreInteractions(statusListener);
    }

    /**
     * Configures only a pool of size 1 for the SLOW message category. Expects a status request to
     * be served while two put requests are held back behind a blocking listener, and both put
     * requests to be served after the release.
     */
    @Test(groups = {"regressiontest"})
    public void specificMessageCategoryPoolAndDefaultPool() {
        addDescription("Tests it is possible to specify a pool for a specific message category, with a default pool for the remainder.");
        addFixture("Create a ReceivedMessageHandler with a one specific pool for slow requests. The put file pool should be limited to 1 thread.");
        MessageThreadPool slowPool = createMessageThreadPool(1, null, MessageCategory.SLOW, null);
        ReceivedMessageHandler handler = new ReceivedMessageHandler(createMessageThreadPools(slowPool));

        addStep("Dispatch two put messages, blocking on the processing of the first message.", "None of the messages should be processed.");
        BlockingMessageListener blocker = new BlockingMessageListener(Mockito.mock(MessageListener.class));
        MessageListener queuedPutListener = Mockito.mock(MessageListener.class);
        MessageListener statusListener = Mockito.mock(MessageListener.class);
        Message statusRequest = new GetStatusRequest();
        Message putRequest = new PutFileRequest();
        deliverAsynchronously(handler, putRequest, blocker, queuedPutListener);
        Mockito.verifyNoMoreInteractions(blocker.listener, queuedPutListener);

        addStep("Dispatch a status request message.", "This should be processed.");
        deliverAsynchronously(handler, statusRequest, statusListener);
        Mockito.verify(statusListener, Mockito.timeout(100)).onMessage(statusRequest, (MessageContext) null);

        addStep("Unblock the blocked put listener", "Both put messages should now be processed.");
        blocker.unblock();
        Mockito.verify(blocker.listener, Mockito.timeout(100)).onMessage(putRequest, (MessageContext) null);
        Mockito.verify(queuedPutListener, Mockito.timeout(100)).onMessage(putRequest, (MessageContext) null);
        Mockito.verifyNoMoreInteractions(statusListener);
    }

    /**
     * Configures only a pool of size 1 for collection "Collection1". Expects messages for another
     * collection and messages without a collection ID to be served while two Collection1 messages
     * are held back behind a blocking listener, and both Collection1 messages to be served after
     * the release.
     */
    @Test(groups = {"regressiontest"})
    public void specificCollectionPoolAndDefaultPool() {
        addDescription("Tests it is possible to specify a pool for a specific collection, with a default pool for the remainder.");
        addFixture("Create a ReceivedMessageHandler with a one specific pool for Collection1. The Collection1 pool should be limited to 1 thread.");
        MessageThreadPool collectionPool = createMessageThreadPool(1, new String[]{"Collection1"}, null, null);
        ReceivedMessageHandler handler = new ReceivedMessageHandler(createMessageThreadPools(collectionPool));

        addStep("Dispatch two messages for collection1, blocking on the processing of the first message.", "None of the messages should be processed.");
        BlockingMessageListener blocker = new BlockingMessageListener(Mockito.mock(MessageListener.class));
        MessageListener queuedListener = Mockito.mock(MessageListener.class);
        MessageListener unpooledListener = Mockito.mock(MessageListener.class);
        Message pooledMessage = new Message();
        pooledMessage.setCollectionID("Collection1");
        deliverAsynchronously(handler, pooledMessage, blocker, queuedListener);
        Mockito.verifyNoMoreInteractions(blocker.listener, queuedListener);

        addStep("Dispatch a message for the other collection.", "This should be processed.");
        Message otherCollectionMessage = new Message();
        otherCollectionMessage.setCollectionID("OtherCollection");
        deliverAsynchronously(handler, otherCollectionMessage, unpooledListener);
        Mockito.verify(unpooledListener, Mockito.timeout(100))
                .onMessage(otherCollectionMessage, (MessageContext) null);

        addStep("Dispatch a message with no collectionID.", "This should be processed.");
        Message noCollectionMessage = new Message();
        deliverAsynchronously(handler, noCollectionMessage, unpooledListener);
        Mockito.verify(unpooledListener, Mockito.timeout(100))
                .onMessage(noCollectionMessage, (MessageContext) null);

        addStep("Unblock the blocked collection1 listener", "Both collection1 messages should now be processed.");
        blocker.unblock();
        Mockito.verify(blocker.listener, Mockito.timeout(100)).onMessage(pooledMessage, (MessageContext) null);
        Mockito.verify(queuedListener, Mockito.timeout(100)).onMessage(pooledMessage, (MessageContext) null);
    }

    /**
     * Configures only a pool of size 1 restricted to both collection "Collection1" and
     * PutFileRequest messages. Expects a put request for another collection and a status request
     * for Collection1 to be served while two Collection1 put requests are held back behind a
     * blocking listener, and both held-back requests to be served after the release.
     */
    @Test(groups = {"regressiontest"})
    public void specificCollectionPoolWithSpecificMessageTypePool() {
        addDescription("Tests it is possible to specify a pool for a specific collection for only a specificmessage type.");
        addFixture("Create a ReceivedMessageHandler with a one specific pool for Collection1 and PutFileRequests. The pool should be limited to 1 thread.");
        ReceivedMessageHandler receivedMessageHandler = new ReceivedMessageHandler(createMessageThreadPools(
                createMessageThreadPool(1, new String[]{"Collection1"}, null,
                        new String[]{PutFileRequest.class.getSimpleName()})));

        addStep("Dispatch two putFileRequests for collection1, blocking on the processing of the first message.", "None of the messages should be processed.");
        BlockingMessageListener blockingMessageListener =
                new BlockingMessageListener(Mockito.mock(MessageListener.class));
        MessageListener messageListener = Mockito.mock(MessageListener.class);
        MessageListener messageListener2 = Mockito.mock(MessageListener.class);
        Message putFileRequest = new PutFileRequest();
        putFileRequest.setCollectionID("Collection1");
        deliverAsynchronously(receivedMessageHandler, putFileRequest, blockingMessageListener, messageListener);
        Mockito.verifyNoMoreInteractions(blockingMessageListener.listener, messageListener);

        addStep("Dispatch a putFile request for the other collection.", "This should be processed.");
        Message putFileRequest2 = new PutFileRequest();
        putFileRequest2.setCollectionID("OtherCollection");
        deliverAsynchronously(receivedMessageHandler, putFileRequest2, messageListener2);
        Mockito.verify(messageListener2, Mockito.timeout(100)).onMessage(putFileRequest2, (MessageContext) null);

        addStep("Dispatch a status request for collection1.", "This should be processed.");
        Message getStatusRequest = new GetStatusRequest();
        // The step is about a Collection1 message of a different type; without the collection ID
        // the request would only exercise the "no collection" case.
        getStatusRequest.setCollectionID("Collection1");
        deliverAsynchronously(receivedMessageHandler, getStatusRequest, messageListener2);
        Mockito.verify(messageListener2, Mockito.timeout(100)).onMessage(getStatusRequest, (MessageContext) null);

        addStep("Unblock the blocked collection1 listener", "Both collection1 putFileRequests should now be processed.");
        blockingMessageListener.unblock();
        Mockito.verify(blockingMessageListener.listener, Mockito.timeout(100))
                .onMessage(putFileRequest, (MessageContext) null);
        Mockito.verify(messageListener, Mockito.timeout(100)).onMessage(putFileRequest, (MessageContext) null);
    }

    /**
     * Builds an array of blocking listeners, each wrapping its own mocked MessageListener.
     *
     * @param i the number of blocking listeners to create.
     * @return an array of length {@code i} filled with newly created blocking listeners.
     */
    private BlockingMessageListener[] createBlockingMessageListeners(int i) {
        BlockingMessageListener[] listeners = new BlockingMessageListener[i];
        int next = 0;
        while (next < listeners.length) {
            MessageListener delegate = Mockito.mock(MessageListener.class);
            listeners[next] = new BlockingMessageListener(delegate);
            next++;
        }
        return listeners;
    }

    /**
     * Hands the message to the handler once per listener (with a null message context) and then
     * pauses for 100 ms so the handler gets a chance to process the deliveries before the caller
     * starts verifying.
     *
     * @param receivedMessageHandler the handler under test.
     * @param message the message to deliver to every listener.
     * @param messageListenerArr the listeners to deliver to, in order.
     */
    private void deliverAsynchronously(ReceivedMessageHandler receivedMessageHandler, Message message, MessageListener... messageListenerArr) {
        for (MessageListener messageListener : messageListenerArr) {
            receivedMessageHandler.deliver(messageListener, message, (MessageContext) null);
        }
        try {
            Thread.sleep(100L);
        } catch (InterruptedException e) {
            // Do not swallow the interrupt: restore the flag so the test runner can react to it.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Wraps the given pool definitions in a new MessageThreadPools configuration object.
     *
     * @param messageThreadPoolArr the pool definitions to include, in order.
     * @return a new configuration containing the given pool definitions.
     */
    private MessageThreadPools createMessageThreadPools(MessageThreadPool... messageThreadPoolArr) {
        MessageThreadPools configuration = new MessageThreadPools();
        for (MessageThreadPool pool : messageThreadPoolArr) {
            configuration.getMessageThreadPool().add(pool);
        }
        return configuration;
    }

    /**
     * Creates a pool definition, setting only those properties for which a non-null value is given.
     *
     * @param num the pool size, or null to leave the pool size unset.
     * @param strArr the collection IDs to add to the pool, or null to add none.
     * @param messageCategory the message category of the pool, or null to leave it unset.
     * @param strArr2 the message names to add to the pool, or null to add none.
     * @return the new pool definition.
     */
    private MessageThreadPool createMessageThreadPool(Integer num, String[] strArr, MessageCategory messageCategory, String[] strArr2) {
        MessageThreadPool pool = new MessageThreadPool();
        if (num != null) {
            BigInteger poolSize = BigInteger.valueOf(num.longValue());
            pool.setPoolSize(poolSize);
        }
        if (strArr != null) {
            for (String collectionId : strArr) {
                pool.getCollection().add(collectionId);
            }
        }
        if (messageCategory != null) {
            pool.setMessageCategory(messageCategory);
        }
        if (strArr2 != null) {
            for (String messageName : strArr2) {
                pool.getMessageName().add(messageName);
            }
        }
        return pool;
    }
}
