001/*
002 * #%L
003 * Netarchivesuite - harvester
004 * %%
005 * Copyright (C) 2005 - 2018 The Royal Danish Library, 
006 *             the National Library of France and the Austrian National Library.
007 * %%
008 * This program is free software: you can redistribute it and/or modify
009 * it under the terms of the GNU Lesser General Public License as
010 * published by the Free Software Foundation, either version 2.1 of the
011 * License, or (at your option) any later version.
012 * 
013 * This program is distributed in the hope that it will be useful,
014 * but WITHOUT ANY WARRANTY; without even the implied warranty of
015 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
016 * GNU General Lesser Public License for more details.
017 * 
018 * You should have received a copy of the GNU General Lesser Public
019 * License along with this program.  If not, see
020 * <http://www.gnu.org/licenses/lgpl-2.1.html>.
021 * #L%
022 */
023package dk.netarkivet.harvester.scheduler;
024
025import java.util.Enumeration;
026
027import javax.jms.JMSException;
028import javax.jms.QueueBrowser;
029
030import org.apache.commons.lang.StringUtils;
031import org.slf4j.Logger;
032import org.slf4j.LoggerFactory;
033
034import dk.netarkivet.common.distribute.ChannelID;
035import dk.netarkivet.common.distribute.JMSConnection;
036import dk.netarkivet.common.exceptions.ArgumentNotValid;
037import dk.netarkivet.common.exceptions.UnknownID;
038import dk.netarkivet.common.lifecycle.ComponentLifeCycle;
039import dk.netarkivet.common.utils.Settings;
040import dk.netarkivet.harvester.HarvesterSettings;
041import dk.netarkivet.harvester.datamodel.HarvestChannel;
042import dk.netarkivet.harvester.datamodel.HarvestChannelDAO;
043import dk.netarkivet.harvester.distribute.HarvesterChannels;
044import dk.netarkivet.harvester.distribute.HarvesterMessageHandler;
045import dk.netarkivet.harvester.harvesting.distribute.HarvesterReadyMessage;
046import dk.netarkivet.harvester.harvesting.distribute.HarvesterRegistrationRequest;
047import dk.netarkivet.harvester.harvesting.distribute.HarvesterRegistrationResponse;
048
049/**
050 * Handles the reception of status messages from the harvesters. Will call the {@link #visit(HarvesterReadyMessage)}
051 * method when a Ready message is received.
052 */
053public class HarvesterStatusReceiver extends HarvesterMessageHandler implements ComponentLifeCycle {
054
055    /** The logger to use. */
056    private static final Logger log = LoggerFactory.getLogger(HarvesterStatusReceiver.class);
057
058    /** @see HarvesterStatusReceiver#visit(dk.netarkivet.harvester.harvesting.distribute.HarvesterReadyMessage) */
059    private final JobDispatcher jobDispatcher;
060    /** Connection to JMS provider. */
061    private final JMSConnection jmsConnection;
062
063    /** The DAO handling {@link HarvestChannel}s */
064    private final HarvestChannelDAO harvestChannelDao;
065
066    private final HarvestChannelRegistry harvestChannelRegistry;
067    /** Is the feature to limit the number of submitted messages in each queue enabled? */
068        private final Boolean limitSubmittedJobsInQueue;
069        /** The number of submitted messages in each queue. Only used, if the above is true */
070        private final int submittedJobsInQueueThreshold;
071
072    /**
073     * Constructor of the <code>HarvesterStatusReceiver</code>.
074     * This constructs also reads from settings, if we're limiting the number of submitted messages in each queue, and its limit.
075     * If the setting 'settings.harvester.scheduler.limitSubmittedJobsInQueue' is false, no limit is enforced, otherwise the limit is
076     * defined by setting 'settings.harvester.scheduler.submittedJobsInQueueLimit'.
077     * @param jobDispatcher The <code>JobDispatcher</code> to delegate the dispatching of new jobs to, when a 'Ready for
078     * job' event is received. 
079     * @param jmsConnection The JMS connection by which {@link HarvesterReadyMessage} is received.
080     * @param harvestChannelDao The specific HarvestChannelDAO instance to use 
081     * @param harvestChannelRegistry The specific HarvestChannelRegistry instance to use
082     */
083    public HarvesterStatusReceiver(JobDispatcher jobDispatcher, JMSConnection jmsConnection,
084            HarvestChannelDAO harvestChannelDao, HarvestChannelRegistry harvestChannelRegistry) {
085        ArgumentNotValid.checkNotNull(jobDispatcher, "jobDispatcher");
086        ArgumentNotValid.checkNotNull(jmsConnection, "jmsConnection");
087        ArgumentNotValid.checkNotNull(harvestChannelDao, "harvestChannelDao");
088        this.jobDispatcher = jobDispatcher;
089        this.jmsConnection = jmsConnection;
090        this.harvestChannelDao = harvestChannelDao;
091        this.harvestChannelRegistry = harvestChannelRegistry;
092        this.limitSubmittedJobsInQueue = Settings.getBoolean(HarvesterSettings.SCHEDULER_LIMIT_SUBMITTED_JOBS_IN_QUEUE);
093        this.submittedJobsInQueueThreshold = Settings.getInt(HarvesterSettings.SCHEDULER_SUBMITTED_JOBS_IN_QUEUE_LIMIT);
094    }
095
096    @Override
097    public void start() {
098        jmsConnection.setListener(HarvesterChannels.getHarvesterStatusChannel(), this);
099        jmsConnection.setListener(HarvesterChannels.getHarvesterRegistrationRequestChannel(), this);
100        log.info("limitSubmittedJobsInQueue: {}", limitSubmittedJobsInQueue);
101        if (limitSubmittedJobsInQueue) {
102                log.info("submittedJobsInQueueThreshold: {}", submittedJobsInQueueThreshold);
103        }
104    }
105
106    @Override
107    public void shutdown() {
108        jmsConnection.removeListener(HarvesterChannels.getHarvesterStatusChannel(), this);
109    }
110
111    /**
112     * Tells the dispatcher that it may dispatch a new job.
113     *
114     * @param message The message containing the relevant harvester information.
115     */
116    @Override
117    public void visit(HarvesterReadyMessage message) {
118        ArgumentNotValid.checkNotNull(message, "message");
119        log.trace("Received ready message from {} on host {}", message.getApplicationInstanceId(), message.getHostName() );
120        HarvestChannel channel = harvestChannelDao.getByName(message.getHarvestChannelName());
121        if (!harvestChannelRegistry.isRegistered(message.getHarvestChannelName())) {
122                log.info("Reregistering the harvester '{}' to channel '{}'", message.getApplicationInstanceId(),message.getHarvestChannelName()); 
123                harvestChannelRegistry.register(message.getHarvestChannelName(), message.getApplicationInstanceId());
124        } else if (!harvestChannelRegistry.isRegisteredToChannel(message.getApplicationInstanceId(), message.getHarvestChannelName())) {
125                harvestChannelRegistry.register(message.getHarvestChannelName(), message.getApplicationInstanceId());
126        };
127        if (limitSubmittedJobsInQueue) {
128                // Check If already a Message in the JMS queue for this channel
129                ChannelID relevantChannelId = HarvesterChannels.getHarvestJobChannelId(channel);
130                int currentCount = getCount(relevantChannelId);
131                if (currentCount < submittedJobsInQueueThreshold) {
132                        jobDispatcher.submitNextNewJob(channel);
133                } else {
134                        log.debug("No jobs submitted to channel {} after receiving ready message from {}. "
135                                        + "Already {} jobs submitted to channel ", relevantChannelId, message.getApplicationInstanceId(),
136                                        currentCount);
137                }
138        } else { // If no limit, always submit new job, if a job in status NEW exists scheduled for this channel
139                jobDispatcher.submitNextNewJob(channel);
140        }
141    }
142
143    @Override
144    public void visit(HarvesterRegistrationRequest msg) {
145        ArgumentNotValid.checkNotNull(msg, "msg");
146
147        String harvesterInstanceId = msg.getInstanceId();
148        String channelName = msg.getHarvestChannelName();
149
150        boolean isSnapshot = true;
151        boolean isValid = true;
152        try {
153            HarvestChannel chan = harvestChannelDao.getByName(channelName);
154            isSnapshot = chan.isSnapshot();
155        } catch (UnknownID e) {
156                log.warn("The channel '{}' is unknown by the channels table, wherefore the HarvesterRegistrationRequest is denied. The known channels are ", channelName, 
157                                StringUtils.join(harvestChannelDao.getAll(true), ","));
158            isValid = false;
159        }
160
161        if (isValid) {
162            harvestChannelRegistry.register(channelName, harvesterInstanceId);
163        }
164
165        // Send the reply
166        jmsConnection.send(new HarvesterRegistrationResponse(channelName, isValid, isSnapshot));
167        log.info("Sent a message to host {} to notify that harvest channel '{}' is {}", msg.getHostname(), channelName, (isValid ? "valid."
168                : "invalid."));
169    }
170    
171    /**
172     * Retrieve the number of current messages defined by the given queueID. 
173     * @param queueID a given QueueID
174     * @return the number of current messages defined by the given queueID
175     */
176    private int getCount(ChannelID queueID) {
177        QueueBrowser qBrowser;
178        int count=0;
179        try {
180                qBrowser = jmsConnection.createQueueBrowser(queueID);
181                Enumeration msgs = qBrowser.getEnumeration();
182
183                if ( !msgs.hasMoreElements() ) {
184                        return 0;
185                } else { 
186                        while (msgs.hasMoreElements()) { 
187                                msgs.nextElement();
188                                count++;
189                        }
190                }
191                qBrowser.close();
192        } catch (JMSException e) {
193                log.warn("JMSException thrown: ", e);
194        } catch (Throwable e1) {
195                log.warn("Unexpected exception of type {} thrown: ", e1.getClass().getName(), e1);
196        }
197
198        return count;
199    }
200    
201}