001/*
002 * #%L
003 * Netarchivesuite - common
004 * %%
005 * Copyright (C) 2005 - 2014 The Royal Danish Library, the Danish State and University Library,
006 *             the National Library of France and the Austrian National Library.
007 * %%
008 * This program is free software: you can redistribute it and/or modify
009 * it under the terms of the GNU Lesser General Public License as
010 * published by the Free Software Foundation, either version 2.1 of the
011 * License, or (at your option) any later version.
012 * 
013 * This program is distributed in the hope that it will be useful,
014 * but WITHOUT ANY WARRANTY; without even the implied warranty of
015 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
016 * GNU General Lesser Public License for more details.
017 * 
018 * You should have received a copy of the GNU General Lesser Public
019 * License along with this program.  If not, see
020 * <http://www.gnu.org/licenses/lgpl-2.1.html>.
021 * #L%
022 */
023package dk.netarkivet.common.distribute;
024
025import java.util.Hashtable;
026
027import javax.jms.Message;
028import javax.jms.MessageListener;
029
030import org.slf4j.Logger;
031import org.slf4j.LoggerFactory;
032
033import dk.netarkivet.common.exceptions.ArgumentNotValid;
034import dk.netarkivet.common.exceptions.IOFailure;
035
036/**
037 * Converts an asynchronous call to a synchronous call. The method sendAndWaitForOneReply() is a blocking call which
038 * responds when a reply is received or returns null on timeout.
039 */
040public class Synchronizer implements MessageListener {
041
042    private static final Logger log = LoggerFactory.getLogger(Synchronizer.class);
043
044    /** Collection containing messages on which a reply is awaited. */
045    private Hashtable<String, NetarkivetMessage> requests;
046
047    /** Collection containing reply messages which have not yet been returned to the caller. */
048    private Hashtable<String, NetarkivetMessage> replies;
049
050    /**
051     * Initialise maps containing requests and replies.
052     */
053    public Synchronizer() {
054        requests = new Hashtable<String, NetarkivetMessage>();
055        replies = new Hashtable<String, NetarkivetMessage>();
056    }
057
058    /**
059     * Receives replies from a message queue and triggers the blocked call in sendAndWaitForOneReply().
060     *
061     * @param msg an ObjectMessage containing a NetarkivetMessage.
062     */
063    public void onMessage(Message msg) {
064        ArgumentNotValid.checkNotNull(msg, "msg");
065        NetarkivetMessage naMsg = JMSConnection.unpack(msg);
066        NetarkivetMessage requestMsg;
067        synchronized (requests) {
068            requestMsg = requests.get(naMsg.getReplyOfId());
069        }
070        if (requestMsg != null) {
071            synchronized (requestMsg) {
072                replies.put(naMsg.getReplyOfId(), naMsg);
073                requestMsg.notifyAll();
074            }
075        } else {
076            log.warn("Received unexpected reply for unknown message '{}' of type '{}'. Ignored!!: {}",
077                    naMsg.getReplyOfId(), naMsg.getClass().getName(), naMsg.toString());
078        }
079    }
080
081    /**
082     * Sends a message to a message queue and blocks the method invocation until a reply arrives. If it times out a null
083     * is returned. If a spurious wakeup is received and a timeout is set, the method will carry on waiting for the
084     * reply until the total timeout time has been used up. If a spurious wakeup is received and no timeout is set the
085     * method will just go back to waiting
086     *
087     * @param msg the request message
088     * @param timeout the timeout in milliseconds (or zero for no timeout)
089     * @return a reply message from the receiver of the request or null if timed out.
090     */
091    public NetarkivetMessage sendAndWaitForOneReply(NetarkivetMessage msg, long timeout) {
092        ArgumentNotValid.checkNotNull(msg, "msg");
093        boolean noTimeout = (timeout == 0);
094        JMSConnection con = JMSConnectionFactory.getInstance();
095        synchronized (msg) {
096            synchronized (requests) {
097                con.send(msg);
098                requests.put(msg.getID(), msg);
099            }
100            try {
101                while (!replies.containsKey(msg.getID())) {
102                    long timeBeforeWait = System.currentTimeMillis();
103                    msg.wait(timeout);
104                    synchronized (requests) {
105                        if (!replies.containsKey(msg.getID())) {
106                            // At this point we either got an unexpected wakeup
107                            // or timed out
108                            long timeAfterWait = System.currentTimeMillis();
109                            // the new timeout value
110                            timeout -= timeAfterWait - timeBeforeWait;
111                            if (noTimeout || timeout > 0) { // Unexpected wakeup
112                                log.debug("Unexpected wakeup for {}", msg.toString());
113                            } else {
114                                // timed out
115                                // NB! if timeout is exactly zero here then this
116                                // counts as a timeout. Otherwise we would call
117                                // wait(0) on the next loop with disastrous
118                                // results
119                                requests.remove(msg.getID());
120                                log.debug("Timed out waiting for reply to {}", msg.toString());
121                                return null;
122                            }
123                        }
124                    }
125                }
126            } catch (InterruptedException e) {
127                throw new IOFailure("Interrupted while waiting for reply to " + msg, e);
128            }
129        }
130        // If we get here, we must have received the expected reply
131        synchronized (requests) {
132            requests.remove(msg.getID());
133            log.debug("Received reply for message: {}", msg.toString());
134            return replies.remove(msg.getID());
135        }
136    }
137
138}