001/*
002 * #%L
003 * Netarchivesuite - common - test
004 * %%
005 * Copyright (C) 2005 - 2014 The Royal Danish Library, the Danish State and University Library,
006 *             the National Library of France and the Austrian National Library.
007 * %%
008 * This program is free software: you can redistribute it and/or modify
009 * it under the terms of the GNU Lesser General Public License as
010 * published by the Free Software Foundation, either version 2.1 of the
011 * License, or (at your option) any later version.
012 * 
013 * This program is distributed in the hope that it will be useful,
014 * but WITHOUT ANY WARRANTY; without even the implied warranty of
015 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
016 * GNU General Lesser Public License for more details.
017 * 
018 * You should have received a copy of the GNU General Lesser Public
019 * License along with this program.  If not, see
020 * <http://www.gnu.org/licenses/lgpl-2.1.html>.
021 * #L%
022 */
023package dk.netarkivet.common.distribute;
024
025import static org.junit.Assert.assertEquals;
026import static org.junit.Assert.assertFalse;
027import static org.junit.Assert.assertNotNull;
028import static org.junit.Assert.assertTrue;
029import static org.junit.Assert.fail;
030
031import java.io.Serializable;
032import java.lang.reflect.Method;
033import java.lang.reflect.Modifier;
034import java.util.Enumeration;
035import java.util.Map;
036
037import javax.jms.Destination;
038import javax.jms.JMSException;
039import javax.jms.MapMessage;
040import javax.jms.Message;
041import javax.jms.MessageConsumer;
042import javax.jms.MessageListener;
043import javax.jms.MessageProducer;
044import javax.jms.ObjectMessage;
045
046import org.junit.After;
047import org.junit.Before;
048import org.junit.Test;
049
050import dk.netarkivet.common.CommonSettings;
051import dk.netarkivet.common.exceptions.ArgumentNotValid;
052import dk.netarkivet.common.exceptions.NotImplementedException;
053import dk.netarkivet.common.exceptions.PermissionDenied;
054import dk.netarkivet.common.utils.Settings;
055import dk.netarkivet.testutils.preconfigured.MockupJMS;
056import dk.netarkivet.testutils.preconfigured.PreventSystemExit;
057import dk.netarkivet.testutils.preconfigured.ReloadSettings;
058
059/**
060 * Tests JMSConnection, the class that handles all JMS operations for Netarkivet.
061 */
062@SuppressWarnings({"unchecked", "rawtypes", "unused", "serial"})
063public class JMSConnectionTester {
064    private SecurityManager originalSecurityManager;
065
066    ReloadSettings rs = new ReloadSettings();
067    PreventSystemExit pse = new PreventSystemExit();
068    MockupJMS mj = new MockupJMS();
069
070    @Before
071    public void setUp() {
072        rs.setUp();
073        pse.setUp();
074        mj.setUp();
075    }
076
077    @After
078    public void tearDown() {
079        mj.tearDown();
080        pse.tearDown();
081        rs.tearDown();
082    }
083
084    /**
085     * Test that asking for a fake JMSConnection actually gets you just that.
086     */
087    @Test
088    public void testFakeJMSConnection() {
089        JMSConnectionMockupMQ.useJMSConnectionMockupMQ();
090
091        assertTrue("Fake JMS connection must be of type JMSConnectionMockupMQ",
092                JMSConnectionFactory.getInstance() instanceof JMSConnectionMockupMQ);
093    }
094
095    /**
096     * Tests for null parameters.
097     */
098    @Test
099    public void testUnpackParameterIsNull() {
100        Settings.set(CommonSettings.JMS_BROKER_CLASS, "dk.netarkivet.common.distribute.JMSConnectionMockupMQ");
101        try {
102            JMSConnection.unpack(null);
103            fail("Should throw an ArgumentNotValidException when given a " + "null parameter");
104        } catch (ArgumentNotValid e) {
105            // Expected
106        }
107    }
108
109    /**
110     * Tests for wrong parameters.
111     */
112    @Test
113    public void testUnpackParameterIsAnObjectMessage() {
114        Settings.set(CommonSettings.JMS_BROKER_CLASS, "dk.netarkivet.common.distribute.JMSConnectionMockupMQ");
115        try {
116            JMSConnection.unpack(new DummyMapMessage());
117            fail("Should throw an ArgumentNotValid exception when given a " + "wrong message type");
118        } catch (ArgumentNotValid e) {
119            // Expected
120        }
121    }
122
123    /**
124     * Tests for correct error handling if ObjectMessage has the wrong payload.
125     */
126    @Test
127    public void testUnpackInvalidPayload() {
128        Settings.set(CommonSettings.JMS_BROKER_CLASS, "dk.netarkivet.common.distribute.JMSConnectionMockupMQ");
129        try {
130            JMSConnection.unpack(new JMSConnectionMockupMQ.TestObjectMessage(new DummySerializableClass()));
131            fail("Should throw an ArgumentNotValidException when given a " + "wrong payload");
132        } catch (ArgumentNotValid e) {
133            // Expected
134        }
135    }
136
137    /**
138     * Tests if correct payload is unwrapped.
139     */
140    @Test
141    public void testUnpackOfCorrectPayload() {
142        Settings.set(CommonSettings.JMS_BROKER_CLASS, "dk.netarkivet.common.distribute.JMSConnectionMockupMQ");
143        String testID = "42";
144        TestMessage testMessage = new TestMessage(Channels.getTheRepos(), Channels.getTheBamon(), testID);
145        JMSConnectionMockupMQ.updateMsgID(testMessage, "ID89");
146        TestMessage msg = (TestMessage) JMSConnection.unpack(new JMSConnectionMockupMQ.TestObjectMessage(testMessage));
147        assertEquals("Unpacking should have given correct ID", msg.testID, testID);
148    }
149
150    /**
151     * Test resend() methods arguments.
152     */
153    @Test
154    public void testResendArgumentsNotNull() {
155        /*
156         * Check it is the correct resend method which is invoked and not an overloaded version in fx.
157         * JMSConnectionMockupMQ. Resend should be declared final.
158         */
159        Class parameterTypes[] = {NetarkivetMessage.class, ChannelID.class};
160        assertMethodIsFinal(JMSConnection.class, "resend", parameterTypes);
161
162        /*
163         * Set up JMSConnection and dummy receive servers.
164         */
165        Settings.set(CommonSettings.JMS_BROKER_CLASS, "dk.netarkivet.common.distribute.JMSConnectionMockupMQ");
166
167        /*
168         * Test if ArgumentNotValid is thrown if null is given as first parameter.
169         */
170        try {
171            JMSConnectionFactory.getInstance().resend(null, Channels.getError());
172            fail("Should throw ArgumentNotValid exception");
173        } catch (ArgumentNotValid e) {
174            // Expected
175        }
176
177        /*
178         * Test if ArgumentNotValid is thrown if null is given as second parameter
179         */
180        try {
181            JMSConnectionFactory.getInstance().resend(new TestMessage(Channels.getError(), Channels.getError(), ""),
182                    null);
183            fail("Should throw ArgumentNotValid exception");
184        } catch (ArgumentNotValid e) {
185            // Expected
186        }
187    }
188
189    /**
190     * Test the resend method. Message shouldn't be sent according to the address specified in the "to" field of the
191     * message. It should be sent to the address given in the "to" parameter of the resend() method.
192     */
193    @Test
194    public void testResendCorrectSendBehaviour() {
195        /**
196         * Check it is the correct resend method which is invoked and not an overloaded version in fx.
197         * JMSConnectionMockupMQ. Resend should be declared final.
198         */
199        Class parameterTypes[] = {NetarkivetMessage.class, ChannelID.class};
200        assertMethodIsFinal(JMSConnection.class, "resend", parameterTypes);
201
202        /**
203         * Set up JMSConnection and dummy receive servers.
204         */
205        Settings.set(CommonSettings.JMS_BROKER_CLASS, "dk.netarkivet.common.distribute.JMSConnectionMockupMQ");
206        JMSConnection con = JMSConnectionFactory.getInstance();
207
208        // Create dummy server and listen on the Error queue.
209        DummyServer serverErrorQueue = new DummyServer();
210        serverErrorQueue.reset();
211        con.setListener(Channels.getError(), serverErrorQueue);
212
213        // Create dummy server and listen on the TheArcrepos queue
214        DummyServer serverTheArcreposQueue = new DummyServer();
215        serverTheArcreposQueue.reset();
216        con.setListener(Channels.getTheRepos(), serverTheArcreposQueue);
217
218        // Create dummy server and listen on the TheArcrepos queue
219        DummyServer serverTheBamonQueue = new DummyServer();
220        serverTheBamonQueue.reset();
221        con.setListener(Channels.getTheBamon(), serverTheBamonQueue);
222
223        /**
224         * The actual test.
225         */
226        assertEquals("Server should not have received any messages", 0, serverErrorQueue.msgReceived);
227        assertEquals("Server should not have received any messages", 0, serverTheBamonQueue.msgReceived);
228        assertEquals("Server should not have received any messages", 0, serverTheArcreposQueue.msgReceived);
229
230        NetarkivetMessage msg = new TestMessage(Channels.getTheRepos(), Channels.getTheBamon(), "testMSG");
231        con.resend(msg, Channels.getError());
232
233        ((JMSConnectionMockupMQ) con).waitForConcurrentTasksToFinish();
234
235        assertEquals("Server should not have received any messages", 0, serverTheArcreposQueue.msgReceived);
236        assertEquals("Server should not have received any messages", 0, serverTheBamonQueue.msgReceived);
237        assertEquals("Server should have received 1 message", 1, serverErrorQueue.msgReceived);
238    }
239
240    /**
241     * Tests that initconnection actually starts a topic connection and a queue connection.
242     *
243     * @throws Exception On failures
244     */
245    @Test
246    public void testInitConnection() throws Exception {
247        /*
248         * Set up JMSConnection and dummy receive servers.
249         */
250        Settings.set(CommonSettings.JMS_BROKER_CLASS, "dk.netarkivet.common.distribute.JMSConnectionMockupMQ");
251        JMSConnection con = JMSConnectionFactory.getInstance();
252        assertTrue("Should have started the queue connection",
253                ((JMSConnectionMockupMQ.TestConnection) con.connection).isStarted);
254    }
255
256    @Test
257    public void testSendToQueue() throws JMSException, NoSuchFieldException, IllegalAccessException {
258        Settings.set(CommonSettings.JMS_BROKER_CLASS, "dk.netarkivet.common.distribute.JMSConnectionMockupMQ");
259        JMSConnection con = JMSConnectionFactory.getInstance();
260        con.initConnection();
261
262        ChannelID sendChannel = Channels.getTheRepos();
263        ChannelID replyChannel = Channels.getTheBamon();
264        NetarkivetMessage msg = new TestMessage(sendChannel, replyChannel, "testMSG");
265
266        con.send(msg);
267
268        String sendName = sendChannel.getName();
269
270        // Find message producer for queue.
271        JMSConnectionMockupMQ.TestMessageProducer queueSender = (JMSConnectionMockupMQ.TestMessageProducer) con.producers
272                .get(sendName);
273
274        assertNotNull("Should have created a sender for " + sendName, queueSender);
275        ObjectMessage sentSerialMsg = queueSender.messages.get(0);
276        assertNotNull("Should have a sent message", sentSerialMsg);
277        assertEquals("Received message should be the same as was sent", sentSerialMsg.getObject(), msg);
278        assertNotNull("Message should now have an id", msg.getID());
279    }
280
281    @Test
282    public void testSendToTopic() throws JMSException, NoSuchFieldException, IllegalAccessException {
283        Settings.set(CommonSettings.JMS_BROKER_CLASS, "dk.netarkivet.common.distribute.JMSConnectionMockupMQ");
284        JMSConnection con = JMSConnectionFactory.getInstance();
285        con.initConnection();
286
287        Map<String, MessageProducer> publishersMap = con.producers;
288
289        ChannelID sendChannel = Channels.getAllBa();
290        ChannelID replyChannel = Channels.getTheBamon();
291        NetarkivetMessage msg = new TestMessage(sendChannel, replyChannel, "testMSG");
292
293        con.send(msg);
294
295        String sendName = sendChannel.getName();
296
297        JMSConnectionMockupMQ.TestMessageProducer topicPublisher = (JMSConnectionMockupMQ.TestMessageProducer) publishersMap
298                .get(sendName);
299
300        assertNotNull("Should have created a publisher for " + sendName, topicPublisher);
301        ObjectMessage sentSerialMsg = topicPublisher.messages.get(0);
302        assertNotNull("Should have a published message", sentSerialMsg);
303        assertEquals("Received message should be the same as was published", sentSerialMsg.getObject(), msg);
304        assertNotNull("Message should now have an id", msg.getID());
305    }
306
307    @Test
308    public void testSetListener() throws JMSException, NoSuchFieldException, IllegalAccessException {
309        Settings.set(CommonSettings.JMS_BROKER_CLASS, "dk.netarkivet.common.distribute.JMSConnectionMockupMQ");
310        JMSConnection con = JMSConnectionFactory.getInstance();
311        con.initConnection();
312
313        MessageListener listener1 = new MessageListener() {
314            public void onMessage(Message message) {
315                throw new NotImplementedException("Not implemented");
316            }
317
318            public String toString() {
319                return "listener1";
320            }
321        };
322        MessageListener listener2 = new MessageListener() {
323            public void onMessage(Message message) {
324                throw new NotImplementedException("Not implemented");
325            }
326
327            public String toString() {
328                return "listener2";
329            }
330        };
331        ChannelID anyBa = Channels.getAnyBa();
332        con.setListener(anyBa, listener1);
333        Map<String, MessageConsumer> consumerMap = con.consumers;
334        MessageConsumer messageConsumer = consumerMap.get(anyBa.getName() + "##listener1");
335        assertEquals("Should have added listener for queue", listener1, messageConsumer.getMessageListener());
336
337        ChannelID allBa = Channels.getAllBa();
338        con.setListener(allBa, listener2);
339        messageConsumer = consumerMap.get(allBa.getName() + "##listener2");
340        assertEquals("Should have added listener for topic", listener2, messageConsumer.getMessageListener());
341
342        con.removeListener(anyBa, listener1);
343        assertFalse("Should have lost the listener", consumerMap.containsKey(anyBa.getName() + "##listener1"));
344        messageConsumer = consumerMap.get(allBa.getName() + "##listener2");
345        assertEquals("Should still have listener for topic", listener2, messageConsumer.getMessageListener());
346        con.setListener(anyBa, listener1);
347        assertEquals("Should have two listeners now", 2, consumerMap.size());
348    }
349
350    @Test
351    public void testGetConsumerKey() throws NoSuchMethodException, IllegalAccessException {
352        Settings.set(CommonSettings.JMS_BROKER_CLASS, "dk.netarkivet.common.distribute.JMSConnectionMockupMQ");
353        MessageListener listener = new MessageListener() {
354            public void onMessage(Message message) {
355                throw new NotImplementedException("Not implemented");
356            }
357
358            public String toString() {
359                return "ourListener";
360            }
361        };
362        assertEquals("Should have expected key for topic", Channels.getAllBa().getName() + "##" + listener.toString(),
363                JMSConnection.getConsumerKey(Channels.getAllBa().getName(), listener));
364        assertEquals("Should have expected key for queue",
365                Channels.getTheBamon().getName() + "##" + listener.toString(),
366                JMSConnection.getConsumerKey(Channels.getTheBamon().getName(), listener));
367    }
368
369    @Test
370    public void testReply() throws JMSException, NoSuchFieldException, IllegalAccessException {
371        Settings.set(CommonSettings.JMS_BROKER_CLASS, "dk.netarkivet.common.distribute.JMSConnectionMockupMQ");
372        JMSConnection con = JMSConnectionFactory.getInstance();
373
374        NetarkivetMessage msg = new TestMessage(Channels.getTheRepos(), Channels.getTheBamon(), "testMSG");
375
376        Map<String, MessageProducer> sendersMap = con.producers;
377
378        con.send(msg);
379        String sendName = Channels.getTheRepos().getName();
380        JMSConnectionMockupMQ.TestMessageProducer queueSender = (JMSConnectionMockupMQ.TestMessageProducer) sendersMap
381                .get(sendName);
382        ObjectMessage sentSerialMsg = (queueSender.messages.get(0));
383        NetarkivetMessage sentMessage = (NetarkivetMessage) sentSerialMsg.getObject();
384        sentMessage.setNotOk("Test error");
385        con.reply(sentMessage);
386
387        String replyName = Channels.getTheBamon().getName();
388        queueSender = (JMSConnectionMockupMQ.TestMessageProducer) sendersMap.get(replyName);
389        assertNotNull("Should have a sender for " + replyName, queueSender);
390
391        ObjectMessage receivedSerialMsg = queueSender.messages.get(0);
392        NetarkivetMessage received = (NetarkivetMessage) receivedSerialMsg.getObject();
393        assertEquals("Should have sent a message on " + queueSender, received, msg);
394        assertFalse("Message should now be notOk", received.isOk());
395
396        msg = new TestMessage(Channels.getTheRepos(), Channels.getTheBamon(), "testMSG");
397        try {
398            con.reply(msg);
399            fail("Shouldn't be able to reply to unsent message.");
400        } catch (PermissionDenied e) {
401            // expected - msg has not been sent.
402        }
403
404        try {
405            con.reply(null);
406            fail("Should not allow null messages");
407        } catch (ArgumentNotValid e) {
408            // expected
409        }
410    }
411
412    private void assertMethodIsFinal(Class aClass, String name, Class[] parameterTypes) {
413        try {
414            Method m = aClass.getMethod(name, parameterTypes);
415            assertTrue(name + "() in JMSConnection is not declared final!", Modifier.isFinal(m.getModifiers()));
416        } catch (Exception e) {
417            fail("Method " + name + " in JMSConnection doesn't exist!");
418        }
419    }
420
421    public static class DummyServer implements MessageListener {
422        public int msgOK = 0;
423        public int msgNotOK = 0;
424        public int msgReceived = 0;
425
426        /*
427         * (non-Javadoc)
428         * 
429         * @see javax.jms.MessageListener#onMessage(javax.jms.Message)
430         */
431        public void onMessage(Message msg) {
432            NetarkivetMessage netMsg = JMSConnection.unpack(msg);
433            msgReceived = (netMsg.isOk() ? ++msgOK : ++msgNotOK);
434        }
435
436        public void reset() {
437            msgOK = 0;
438            msgNotOK = 0;
439            msgReceived = 0;
440        }
441    }
442
443    private static class DummyMapMessage implements MapMessage {
444
445        public boolean getBoolean(String arg0) throws JMSException {
446            return false;
447        }
448
449        public byte getByte(String arg0) throws JMSException {
450            return 0;
451        }
452
453        public short getShort(String arg0) throws JMSException {
454            return 0;
455        }
456
457        public char getChar(String arg0) throws JMSException {
458            return 0;
459        }
460
461        public int getInt(String arg0) throws JMSException {
462            return 0;
463        }
464
465        public long getLong(String arg0) throws JMSException {
466            return 0;
467        }
468
469        public float getFloat(String arg0) throws JMSException {
470            return 0;
471        }
472
473        public double getDouble(String arg0) throws JMSException {
474            return 0;
475        }
476
477        public String getString(String arg0) throws JMSException {
478            return null;
479        }
480
481        public byte[] getBytes(String arg0) throws JMSException {
482            return null;
483        }
484
485        public Object getObject(String arg0) throws JMSException {
486            return null;
487        }
488
489        public Enumeration getMapNames() throws JMSException {
490            return null;
491        }
492
493        public void setBoolean(String arg0, boolean arg1) throws JMSException {
494        }
495
496        public void setByte(String arg0, byte arg1) throws JMSException {
497        }
498
499        public void setShort(String arg0, short arg1) throws JMSException {
500        }
501
502        public void setChar(String arg0, char arg1) throws JMSException {
503        }
504
505        public void setInt(String arg0, int arg1) throws JMSException {
506        }
507
508        public void setLong(String arg0, long arg1) throws JMSException {
509        }
510
511        public void setFloat(String arg0, float arg1) throws JMSException {
512        }
513
514        public void setDouble(String arg0, double arg1) throws JMSException {
515        }
516
517        public void setString(String arg0, String arg1) throws JMSException {
518        }
519
520        public void setBytes(String arg0, byte[] arg1) throws JMSException {
521        }
522
523        public void setBytes(String arg0, byte[] arg1, int arg2, int arg3) throws JMSException {
524        }
525
526        public void setObject(String arg0, Object arg1) throws JMSException {
527        }
528
529        public boolean itemExists(String arg0) throws JMSException {
530            return false;
531        }
532
533        public String getJMSMessageID() throws JMSException {
534            return null;
535        }
536
537        public void setJMSMessageID(String arg0) throws JMSException {
538        }
539
540        public long getJMSTimestamp() throws JMSException {
541            return 0;
542        }
543
544        public void setJMSTimestamp(long arg0) throws JMSException {
545        }
546
547        public byte[] getJMSCorrelationIDAsBytes() throws JMSException {
548            return null;
549        }
550
551        public void setJMSCorrelationIDAsBytes(byte[] arg0) throws JMSException {
552        }
553
554        public void setJMSCorrelationID(String arg0) throws JMSException {
555        }
556
557        public String getJMSCorrelationID() throws JMSException {
558            return null;
559        }
560
561        public Destination getJMSReplyTo() throws JMSException {
562            return null;
563        }
564
565        public void setJMSReplyTo(Destination arg0) throws JMSException {
566        }
567
568        public Destination getJMSDestination() throws JMSException {
569            return null;
570        }
571
572        public void setJMSDestination(Destination arg0) throws JMSException {
573        }
574
575        public int getJMSDeliveryMode() throws JMSException {
576            return 0;
577        }
578
579        public void setJMSDeliveryMode(int arg0) throws JMSException {
580        }
581
582        public boolean getJMSRedelivered() throws JMSException {
583            return false;
584        }
585
586        public void setJMSRedelivered(boolean arg0) throws JMSException {
587        }
588
589        public String getJMSType() throws JMSException {
590            return null;
591        }
592
593        public void setJMSType(String arg0) throws JMSException {
594        }
595
596        public long getJMSExpiration() throws JMSException {
597            return 0;
598        }
599
600        public void setJMSExpiration(long arg0) throws JMSException {
601        }
602
603        public int getJMSPriority() throws JMSException {
604            return 0;
605        }
606
607        public void setJMSPriority(int arg0) throws JMSException {
608        }
609
610        public void clearProperties() throws JMSException {
611        }
612
613        public boolean propertyExists(String arg0) throws JMSException {
614            return false;
615        }
616
617        public boolean getBooleanProperty(String arg0) throws JMSException {
618            return false;
619        }
620
621        public byte getByteProperty(String arg0) throws JMSException {
622            return 0;
623        }
624
625        public short getShortProperty(String arg0) throws JMSException {
626            return 0;
627        }
628
629        public int getIntProperty(String arg0) throws JMSException {
630            return 0;
631        }
632
633        public long getLongProperty(String arg0) throws JMSException {
634            return 0;
635        }
636
637        public float getFloatProperty(String arg0) throws JMSException {
638            return 0;
639        }
640
641        public double getDoubleProperty(String arg0) throws JMSException {
642            return 0;
643        }
644
645        public String getStringProperty(String arg0) throws JMSException {
646            return null;
647        }
648
649        public Object getObjectProperty(String arg0) throws JMSException {
650            return null;
651        }
652
653        public Enumeration getPropertyNames() throws JMSException {
654            return null;
655        }
656
657        public void setBooleanProperty(String arg0, boolean arg1) throws JMSException {
658        }
659
660        public void setByteProperty(String arg0, byte arg1) throws JMSException {
661        }
662
663        public void setShortProperty(String arg0, short arg1) throws JMSException {
664        }
665
666        public void setIntProperty(String arg0, int arg1) throws JMSException {
667        }
668
669        public void setLongProperty(String arg0, long arg1) throws JMSException {
670        }
671
672        public void setFloatProperty(String arg0, float arg1) throws JMSException {
673        }
674
675        public void setDoubleProperty(String arg0, double arg1) throws JMSException {
676        }
677
678        public void setStringProperty(String arg0, String arg1) throws JMSException {
679        }
680
681        public void setObjectProperty(String arg0, Object arg1) throws JMSException {
682        }
683
684        public void acknowledge() throws JMSException {
685        }
686
687        public void clearBody() throws JMSException {
688        }
689
690    }
691
692    private static class DummySerializableClass implements Serializable {
693
694    }
695
696    private static class TestMessage extends NetarkivetMessage {
697        private String testID;
698
699        public TestMessage(ChannelID to, ChannelID replyTo, String testID) {
700            super(to, replyTo);
701            this.testID = testID;
702        }
703
704        public String getTestID() {
705            return testID;
706        }
707    }
708}