001/* 002 * #%L 003 * Netarchivesuite - harvester 004 * %% 005 * Copyright (C) 2005 - 2018 The Royal Danish Library, 006 * the National Library of France and the Austrian National Library. 007 * %% 008 * This program is free software: you can redistribute it and/or modify 009 * it under the terms of the GNU Lesser General Public License as 010 * published by the Free Software Foundation, either version 2.1 of the 011 * License, or (at your option) any later version. 012 * 013 * This program is distributed in the hope that it will be useful, 014 * but WITHOUT ANY WARRANTY; without even the implied warranty of 015 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 016 * GNU General Lesser Public License for more details. 017 * 018 * You should have received a copy of the GNU General Lesser Public 019 * License along with this program. If not, see 020 * <http://www.gnu.org/licenses/lgpl-2.1.html>. 021 * #L% 022 */ 023package dk.netarkivet.harvester.scheduler; 024 025import java.util.Enumeration; 026 027import javax.jms.JMSException; 028import javax.jms.QueueBrowser; 029 030import org.apache.commons.lang.StringUtils; 031import org.slf4j.Logger; 032import org.slf4j.LoggerFactory; 033 034import dk.netarkivet.common.distribute.ChannelID; 035import dk.netarkivet.common.distribute.JMSConnection; 036import dk.netarkivet.common.exceptions.ArgumentNotValid; 037import dk.netarkivet.common.exceptions.UnknownID; 038import dk.netarkivet.common.lifecycle.ComponentLifeCycle; 039import dk.netarkivet.common.utils.Settings; 040import dk.netarkivet.harvester.HarvesterSettings; 041import dk.netarkivet.harvester.datamodel.HarvestChannel; 042import dk.netarkivet.harvester.datamodel.HarvestChannelDAO; 043import dk.netarkivet.harvester.distribute.HarvesterChannels; 044import dk.netarkivet.harvester.distribute.HarvesterMessageHandler; 045import dk.netarkivet.harvester.harvesting.distribute.HarvesterReadyMessage; 046import 
dk.netarkivet.harvester.harvesting.distribute.HarvesterRegistrationRequest;
import dk.netarkivet.harvester.harvesting.distribute.HarvesterRegistrationResponse;

/**
 * Handles the reception of status messages from the harvesters. Will call the {@link #visit(HarvesterReadyMessage)}
 * method when a Ready message is received, and {@link #visit(HarvesterRegistrationRequest)} when a harvester asks to be
 * registered to a harvest channel.
 */
public class HarvesterStatusReceiver extends HarvesterMessageHandler implements ComponentLifeCycle {

    /** The logger to use. */
    private static final Logger log = LoggerFactory.getLogger(HarvesterStatusReceiver.class);

    /** @see HarvesterStatusReceiver#visit(dk.netarkivet.harvester.harvesting.distribute.HarvesterReadyMessage) */
    private final JobDispatcher jobDispatcher;
    /** Connection to JMS provider. */
    private final JMSConnection jmsConnection;

    /** The DAO handling {@link HarvestChannel}s */
    private final HarvestChannelDAO harvestChannelDao;

    /** Records which harvester instances are registered to which harvest channels. */
    private final HarvestChannelRegistry harvestChannelRegistry;
    /** Is the feature to limit the number of submitted messages in each queue enabled? */
    private final boolean limitSubmittedJobsInQueue;
    /** The maximum number of submitted messages in each queue. Only used if {@link #limitSubmittedJobsInQueue} is true. */
    private final int submittedJobsInQueueThreshold;

    /**
     * Constructor of the <code>HarvesterStatusReceiver</code>.
     * This constructor also reads from settings whether we're limiting the number of submitted messages in each queue,
     * and what that limit is. If the setting 'settings.harvester.scheduler.limitSubmittedJobsInQueue' is false, no limit
     * is enforced, otherwise the limit is defined by setting 'settings.harvester.scheduler.submittedJobsInQueueLimit'.
     *
     * @param jobDispatcher The <code>JobDispatcher</code> to delegate the dispatching of new jobs to, when a 'Ready for
     * job' event is received.
     * @param jmsConnection The JMS connection by which {@link HarvesterReadyMessage} is received.
     * @param harvestChannelDao The specific HarvestChannelDAO instance to use
     * @param harvestChannelRegistry The specific HarvestChannelRegistry instance to use
     */
    public HarvesterStatusReceiver(JobDispatcher jobDispatcher, JMSConnection jmsConnection,
            HarvestChannelDAO harvestChannelDao, HarvestChannelRegistry harvestChannelRegistry) {
        ArgumentNotValid.checkNotNull(jobDispatcher, "jobDispatcher");
        ArgumentNotValid.checkNotNull(jmsConnection, "jmsConnection");
        ArgumentNotValid.checkNotNull(harvestChannelDao, "harvestChannelDao");
        this.jobDispatcher = jobDispatcher;
        this.jmsConnection = jmsConnection;
        this.harvestChannelDao = harvestChannelDao;
        this.harvestChannelRegistry = harvestChannelRegistry;
        this.limitSubmittedJobsInQueue = Settings.getBoolean(HarvesterSettings.SCHEDULER_LIMIT_SUBMITTED_JOBS_IN_QUEUE);
        this.submittedJobsInQueueThreshold = Settings.getInt(HarvesterSettings.SCHEDULER_SUBMITTED_JOBS_IN_QUEUE_LIMIT);
    }

    /**
     * Starts listening on the harvester status channel and the harvester registration request channel, and logs the
     * queue-limit configuration.
     */
    @Override
    public void start() {
        jmsConnection.setListener(HarvesterChannels.getHarvesterStatusChannel(), this);
        jmsConnection.setListener(HarvesterChannels.getHarvesterRegistrationRequestChannel(), this);
        log.info("limitSubmittedJobsInQueue: {}", limitSubmittedJobsInQueue);
        if (limitSubmittedJobsInQueue) {
            log.info("submittedJobsInQueueThreshold: {}", submittedJobsInQueueThreshold);
        }
    }

    /**
     * Stops listening on both channels that {@link #start()} registered this receiver on.
     */
    @Override
    public void shutdown() {
        jmsConnection.removeListener(HarvesterChannels.getHarvesterStatusChannel(), this);
        // start() also listens on the registration request channel; remove that listener too so it does not linger.
        jmsConnection.removeListener(HarvesterChannels.getHarvesterRegistrationRequestChannel(), this);
    }

    /**
     * Makes sure the sending harvester is registered to its harvest channel, then tells the dispatcher that it may
     * dispatch a new job on that channel. If {@link #limitSubmittedJobsInQueue} is enabled, no job is submitted while
     * the channel's job queue already holds {@link #submittedJobsInQueueThreshold} or more messages.
     *
     * @param message The message containing the relevant harvester information.
     */
    @Override
    public void visit(HarvesterReadyMessage message) {
        ArgumentNotValid.checkNotNull(message, "message");
        log.trace("Received ready message from {} on host {}", message.getApplicationInstanceId(), message.getHostName());
        // The channel is looked up before touching the registry; if the lookup throws, nothing is (re)registered.
        HarvestChannel channel = harvestChannelDao.getByName(message.getHarvestChannelName());
        if (!harvestChannelRegistry.isRegistered(message.getHarvestChannelName())) {
            log.info("Reregistering the harvester '{}' to channel '{}'", message.getApplicationInstanceId(),
                    message.getHarvestChannelName());
            harvestChannelRegistry.register(message.getHarvestChannelName(), message.getApplicationInstanceId());
        } else if (!harvestChannelRegistry.isRegisteredToChannel(message.getApplicationInstanceId(),
                message.getHarvestChannelName())) {
            harvestChannelRegistry.register(message.getHarvestChannelName(), message.getApplicationInstanceId());
        }

        if (limitSubmittedJobsInQueue) {
            // Only submit another job if the JMS queue for this channel is below the configured threshold.
            ChannelID relevantChannelId = HarvesterChannels.getHarvestJobChannelId(channel);
            int currentCount = getCount(relevantChannelId);
            if (currentCount < submittedJobsInQueueThreshold) {
                jobDispatcher.submitNextNewJob(channel);
            } else {
                log.debug("No jobs submitted to channel {} after receiving ready message from {}. "
                        + "Already {} jobs submitted to channel ", relevantChannelId, message.getApplicationInstanceId(),
                        currentCount);
            }
        } else {
            // If no limit, always submit new job, if a job in status NEW exists scheduled for this channel
            jobDispatcher.submitNextNewJob(channel);
        }
    }

    /**
     * Handles a request from a harvester to be registered to a harvest channel. If the channel is known to the
     * {@link HarvestChannelDAO}, the harvester is registered to it. In every case a
     * {@link HarvesterRegistrationResponse} is sent, telling whether the channel is valid and whether it is a snapshot
     * channel (for an unknown channel the snapshot flag is sent as {@code true}).
     *
     * @param msg the registration request
     */
    @Override
    public void visit(HarvesterRegistrationRequest msg) {
        ArgumentNotValid.checkNotNull(msg, "msg");

        String harvesterInstanceId = msg.getInstanceId();
        String channelName = msg.getHarvestChannelName();

        boolean isSnapshot = true;
        boolean isValid = true;
        try {
            HarvestChannel chan = harvestChannelDao.getByName(channelName);
            isSnapshot = chan.isSnapshot();
        } catch (UnknownID e) {
            // The "{}" at the end is required, otherwise SLF4J silently drops the list of known channels.
            log.warn("The channel '{}' is unknown by the channels table, wherefore the HarvesterRegistrationRequest "
                    + "is denied. The known channels are {}", channelName,
                    StringUtils.join(harvestChannelDao.getAll(true), ","));
            isValid = false;
        }

        if (isValid) {
            harvestChannelRegistry.register(channelName, harvesterInstanceId);
        }

        // Send the reply
        jmsConnection.send(new HarvesterRegistrationResponse(channelName, isValid, isSnapshot));
        log.info("Sent a message to host {} to notify that harvest channel '{}' is {}", msg.getHostname(), channelName,
                (isValid ? "valid." : "invalid."));
    }

    /**
     * Retrieve the number of messages currently in the queue identified by the given queueID, by browsing it.
     * Failures are logged, not propagated; in that case the number of messages counted so far (possibly 0) is
     * returned. The queue browser is always closed, also when the queue is empty or browsing fails.
     *
     * @param queueID a given QueueID
     * @return the number of current messages in the queue identified by the given queueID
     */
    private int getCount(ChannelID queueID) {
        QueueBrowser qBrowser = null;
        int count = 0;
        try {
            qBrowser = jmsConnection.createQueueBrowser(queueID);
            Enumeration<?> msgs = qBrowser.getEnumeration();
            while (msgs.hasMoreElements()) {
                msgs.nextElement();
                count++;
            }
        } catch (JMSException e) {
            log.warn("JMSException thrown: ", e);
        } catch (Throwable e1) {
            log.warn("Unexpected exception of type {} thrown: ", e1.getClass().getName(), e1);
        } finally {
            // Previously the browser leaked on the empty-queue early return and on any exception.
            closeQueueBrowser(qBrowser);
        }
        return count;
    }

    /**
     * Close the given queue browser on a best-effort basis: any failure is logged and not propagated.
     *
     * @param qBrowser the browser to close; if null, nothing happens
     */
    private static void closeQueueBrowser(QueueBrowser qBrowser) {
        if (qBrowser == null) {
            return;
        }
        try {
            qBrowser.close();
        } catch (Exception e) {
            log.warn("Failed to close queue browser: ", e);
        }
    }

}