001package dk.netarkivet.harvester.scheduler; 002 003import java.util.Enumeration; 004 005import javax.jms.JMSException; 006import javax.jms.QueueBrowser; 007import javax.jms.QueueSession; 008 009import org.slf4j.Logger; 010import org.slf4j.LoggerFactory; 011 012import dk.netarkivet.common.distribute.ChannelID; 013import dk.netarkivet.common.distribute.JMSConnection; 014import dk.netarkivet.common.distribute.JMSConnectionFactory; 015 016/** 017 * Helper class to test the status of the number of submitted jobs on our JMS Queues. 018 * Uses the same QueueSession for all calls to getCount() to avoid memory-leak caused by accumulation 019 * of imqConsumerReader threads. 020 */ 021public class QueueController { 022 023 /** The logger to use. */ 024 private static final Logger log = LoggerFactory.getLogger(QueueController.class); 025 026 /** Connection to JMS provider. */ 027 private JMSConnection jmsConnection; 028 /** The current qSession. */ 029 QueueSession qSession = null; 030 031 public QueueController() { 032 this.jmsConnection = JMSConnectionFactory.getInstance(); 033 } 034 035 /** 036 * Retrieve the number of current messages defined by the given queueID. 037 * @param queueID a given QueueID 038 * @return the number of current messages defined by the given queueID 039 */ 040 synchronized int getCount(ChannelID queueID) { 041 QueueBrowser qBrowser = null; 042 int submittedCounter = 0; 043 try { 044 if (jmsConnection == null) { 045 jmsConnection = JMSConnectionFactory.getInstance(); 046 log.info("Fetched a new JMSConnection from the factory"); 047 } 048 if (qSession == null) { 049 qSession = jmsConnection.getQueueSession(); 050 log.info("Created a new QueueSession"); 051 } 052 qBrowser = jmsConnection.createQueueBrowser(queueID, qSession); 053 Enumeration msgs = qBrowser.getEnumeration(); 054 055 if ( !msgs.hasMoreElements() ) { 056 return 0; 057 } else { 058 while (msgs.hasMoreElements()) { 059 msgs.nextElement(); 060 submittedCounter++; 061 } 062 } 063 } catch (JMSException e) { 064 log.warn("JMSException thrown: ", e); 065 jmsConnection = null; 066 qSession = null; 067 } catch (Exception e1) { 068 log.warn("Unexpected exception of type {} thrown: ", e1.getClass().getName(), e1); 069 } finally { 070 if (qBrowser != null) { 071 try { 072 qBrowser.close(); 073 } catch (JMSException e) { 074 log.debug("JMSException while trying to close the qBrowser.", e); 075 } 076 qBrowser = null; 077 } 078 } 079 080 return submittedCounter; 081 } 082}