001package dk.netarkivet.harvester.scheduler;
002
003import java.util.Enumeration;
004
005import javax.jms.JMSException;
006import javax.jms.QueueBrowser;
007import javax.jms.QueueSession;
008
009import org.slf4j.Logger;
010import org.slf4j.LoggerFactory;
011
012import dk.netarkivet.common.distribute.ChannelID;
013import dk.netarkivet.common.distribute.JMSConnection;
014import dk.netarkivet.common.distribute.JMSConnectionFactory;
015
016/**
017 * Helper class to test the status of the number of submitted jobs on our JMS Queues.
018 * Uses the same QueueSession for all calls to getCount() to avoid memory-leak caused by accumulation
019 * of imqConsumerReader threads.
020 */
021public class QueueController {
022
023    /** The logger to use. */
024    private static final Logger log = LoggerFactory.getLogger(QueueController.class);
025    
026    /** Connection to JMS provider. */
027    private JMSConnection jmsConnection; 
028    /** The current qSession. */
029    QueueSession qSession = null;
030    
031    public QueueController() {
032        this.jmsConnection = JMSConnectionFactory.getInstance();
033    }
034    
035    /**
036     * Retrieve the number of current messages defined by the given queueID. 
037     * @param queueID a given QueueID
038     * @return the number of current messages defined by the given queueID
039     */
040    synchronized int getCount(ChannelID queueID) {
041        QueueBrowser qBrowser = null;
042        int submittedCounter = 0;
043        try {
044            if (jmsConnection == null) {
045                jmsConnection = JMSConnectionFactory.getInstance();
046                log.info("Fetched a new JMSConnection from the factory");
047            }
048            if (qSession == null) {
049                qSession = jmsConnection.getQueueSession();
050                log.info("Created a new QueueSession");
051            }
052            qBrowser = jmsConnection.createQueueBrowser(queueID, qSession);
053            Enumeration msgs = qBrowser.getEnumeration();
054
055            if ( !msgs.hasMoreElements() ) {
056                return 0;
057            } else { 
058                while (msgs.hasMoreElements()) { 
059                    msgs.nextElement();
060                    submittedCounter++;
061                }
062            }
063        } catch (JMSException e) {
064            log.warn("JMSException thrown: ", e);
065            jmsConnection = null; 
066            qSession = null;
067        } catch (Exception e1) {
068            log.warn("Unexpected exception of type {} thrown: ", e1.getClass().getName(), e1);
069        } finally {
070            if (qBrowser != null) {
071                try {
072                    qBrowser.close();
073                } catch (JMSException e) {
074                    log.debug("JMSException while trying to close the qBrowser.", e);
075                }
076                qBrowser = null;
077            }
078        }
079
080        return submittedCounter;
081    }
082}