/*
 * #%L
 * Netarchivesuite - common
 * %%
 * Copyright (C) 2005 - 2014 The Royal Danish Library, the Danish State and University Library,
 *             the National Library of France and the Austrian National Library.
 * %%
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License as
 * published by the Free Software Foundation, either version 2.1 of the
 * License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Lesser Public License for more details.
 *
 * You should have received a copy of the GNU General Lesser Public
 * License along with this program.  If not, see
 * <http://www.gnu.org/licenses/lgpl-2.1.html>.
 * #L%
 */
package dk.netarkivet.common.distribute;

import java.util.Hashtable;

import javax.jms.Message;
import javax.jms.MessageListener;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import dk.netarkivet.common.exceptions.ArgumentNotValid;
import dk.netarkivet.common.exceptions.IOFailure;

/**
 * Converts an asynchronous call to a synchronous call. The method sendAndWaitForOneReply() is a blocking call which
 * returns the reply when one is received, or returns null on timeout.
 * <p>
 * Locking discipline used by this class: the monitor of the request message is always taken before the monitor of
 * {@code requests}. A reply is only stored in {@code replies} while the request's monitor is held and the request is
 * still registered, and a request is only deregistered by the waiting thread while it holds that same monitor.
 */
public class Synchronizer implements MessageListener {

    private static final Logger log = LoggerFactory.getLogger(Synchronizer.class);

    /** Number of nanoseconds in one millisecond; used to convert System.nanoTime() differences. */
    private static final long NANOS_PER_MILLI = 1000000L;

    /** Collection containing messages on which a reply is awaited. Keyed by the ID of the request. */
    private final Hashtable<String, NetarkivetMessage> requests;

    /**
     * Collection containing reply messages which have not yet been returned to the caller. Keyed by the ID of the
     * request the reply belongs to.
     */
    private final Hashtable<String, NetarkivetMessage> replies;

    /**
     * Initialise maps containing requests and replies.
     */
    public Synchronizer() {
        requests = new Hashtable<String, NetarkivetMessage>();
        replies = new Hashtable<String, NetarkivetMessage>();
    }

    /**
     * Receives replies from a message queue and triggers the blocked call in sendAndWaitForOneReply().
     * <p>
     * A reply is only stored if its request is still registered at the moment the request's monitor is held. A reply
     * that arrives after the waiter has timed out (or was interrupted) is logged and dropped instead of being left in
     * the reply map forever.
     *
     * @param msg an ObjectMessage containing a NetarkivetMessage.
     */
    @Override
    public void onMessage(Message msg) {
        ArgumentNotValid.checkNotNull(msg, "msg");
        NetarkivetMessage naMsg = JMSConnection.unpack(msg);
        String replyOfId = naMsg.getReplyOfId();

        NetarkivetMessage requestMsg = null;
        // Hashtable rejects null keys with a NullPointerException, so a message without a reply-of id is
        // treated like any other reply we do not know about.
        if (replyOfId != null) {
            synchronized (requests) {
                requestMsg = requests.get(replyOfId);
            }
        }

        boolean delivered = false;
        if (requestMsg != null) {
            synchronized (requestMsg) {
                // Re-check under the request's monitor: the waiter may have given up (and deregistered the
                // request) between the lookup above and the acquisition of this monitor.
                if (requests.get(replyOfId) == requestMsg) {
                    replies.put(replyOfId, naMsg);
                    requestMsg.notifyAll();
                    delivered = true;
                }
            }
        }

        if (!delivered) {
            log.warn("Received unexpected reply for unknown message '{}' of type '{}'. Ignored!!: {}",
                    replyOfId, naMsg.getClass().getName(), naMsg.toString());
        }
    }

    /**
     * Sends a message to a message queue and blocks the method invocation until a reply arrives. If it times out a null
     * is returned. If a spurious wakeup is received and a timeout is set, the method will carry on waiting for the
     * reply until the total timeout time has been used up. If a spurious wakeup is received and no timeout is set the
     * method will just go back to waiting.
     * <p>
     * The request is always deregistered before this method returns or throws, so no entry is left behind on timeout,
     * on interruption, or if waiting fails with a runtime exception (for instance a negative timeout, which
     * {@link Object#wait(long)} rejects with an IllegalArgumentException).
     *
     * @param msg the request message
     * @param timeout the timeout in milliseconds (or zero for no timeout)
     * @return a reply message from the receiver of the request or null if timed out.
     * @throws IOFailure if the calling thread is interrupted while waiting. The thread's interrupt status is restored
     * before the exception is thrown.
     */
    public NetarkivetMessage sendAndWaitForOneReply(NetarkivetMessage msg, long timeout) {
        ArgumentNotValid.checkNotNull(msg, "msg");
        JMSConnection con = JMSConnectionFactory.getInstance();
        synchronized (msg) {
            final String id;
            synchronized (requests) {
                // Send before registering: the ID is read from the message only after it has been sent.
                // Holding the requests monitor keeps onMessage() from looking the request up before it is
                // registered.
                con.send(msg);
                id = msg.getID();
                requests.put(id, msg);
            }
            try {
                if (!awaitReply(msg, id, timeout)) {
                    log.debug("Timed out waiting for reply to {}", msg.toString());
                    return null;
                }
                log.debug("Received reply for message: {}", msg.toString());
                return replies.remove(id);
            } catch (InterruptedException e) {
                // Restore the interrupt status so code further up the stack can still observe it.
                Thread.currentThread().interrupt();
                throw new IOFailure("Interrupted while waiting for reply to " + msg, e);
            } finally {
                // Deregister on every exit path while still holding the request's monitor, so that
                // onMessage() can never store a reply for a request nobody is waiting on.
                synchronized (requests) {
                    requests.remove(id);
                    replies.remove(id);
                }
            }
        }
    }

    /**
     * Waits on the request's monitor until a reply for it has been stored or the timeout has elapsed. The caller
     * must hold the monitor of {@code msg}.
     *
     * @param msg the request message whose monitor is waited on
     * @param id the ID under which the request is registered
     * @param timeout the total timeout in milliseconds (or zero for no timeout)
     * @return true if a reply is available, false if the timeout elapsed first
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    private boolean awaitReply(NetarkivetMessage msg, String id, long timeout) throws InterruptedException {
        final boolean noTimeout = (timeout == 0);
        // Elapsed time is measured from a single start point with the monotonic clock, so it is not
        // affected by wall-clock adjustments and does not accumulate rounding error across wakeups.
        final long startNanos = System.nanoTime();
        long remaining = timeout;
        while (!replies.containsKey(id)) {
            msg.wait(remaining);
            if (replies.containsKey(id)) {
                return true;
            }
            // At this point we either got an unexpected wakeup or timed out.
            if (!noTimeout) {
                remaining = timeout - (System.nanoTime() - startNanos) / NANOS_PER_MILLI;
                // NB! if the remaining time is exactly zero here then this counts as a timeout. Otherwise we
                // would call wait(0) on the next loop, which waits without any time limit.
                if (remaining <= 0) {
                    return false;
                }
            }
            // With no timeout, 'remaining' stays at zero so the next wait(0) blocks until notified.
            log.debug("Unexpected wakeup for {}", msg.toString());
        }
        return true;
    }

}