001/*
002 * #%L
003 * Netarchivesuite - harvester
004 * %%
005 * Copyright (C) 2005 - 2014 The Royal Danish Library, the Danish State and University Library,
006 *             the National Library of France and the Austrian National Library.
007 * %%
008 * This program is free software: you can redistribute it and/or modify
009 * it under the terms of the GNU Lesser General Public License as
010 * published by the Free Software Foundation, either version 2.1 of the
011 * License, or (at your option) any later version.
012 * 
013 * This program is distributed in the hope that it will be useful,
014 * but WITHOUT ANY WARRANTY; without even the implied warranty of
015 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
016 * GNU General Lesser Public License for more details.
017 * 
018 * You should have received a copy of the GNU General Lesser Public
019 * License along with this program.  If not, see
020 * <http://www.gnu.org/licenses/lgpl-2.1.html>.
021 * #L%
022 */
023package dk.netarkivet.harvester.scheduler;
024
025import org.apache.commons.lang.StringUtils;
026import org.slf4j.Logger;
027import org.slf4j.LoggerFactory;
028
029import dk.netarkivet.common.distribute.JMSConnection;
030import dk.netarkivet.common.exceptions.ArgumentNotValid;
031import dk.netarkivet.common.exceptions.UnknownID;
032import dk.netarkivet.common.lifecycle.ComponentLifeCycle;
033import dk.netarkivet.harvester.datamodel.HarvestChannel;
034import dk.netarkivet.harvester.datamodel.HarvestChannelDAO;
035import dk.netarkivet.harvester.distribute.HarvesterChannels;
036import dk.netarkivet.harvester.distribute.HarvesterMessageHandler;
037import dk.netarkivet.harvester.harvesting.distribute.HarvesterReadyMessage;
038import dk.netarkivet.harvester.harvesting.distribute.HarvesterRegistrationRequest;
039import dk.netarkivet.harvester.harvesting.distribute.HarvesterRegistrationResponse;
040
041/**
042 * Handles the reception of status messages from the harvesters. Will call the {@link #visit(HarvesterReadyMessage)}
043 * method when a Ready message is received.
044 */
045public class HarvesterStatusReceiver extends HarvesterMessageHandler implements ComponentLifeCycle {
046
047    /** The logger to use. */
048    private static final Logger log = LoggerFactory.getLogger(HarvesterStatusReceiver.class);
049
050    /** @see HarvesterStatusReceiver#visit(dk.netarkivet.harvester.harvesting.distribute.HarvesterReadyMessage) */
051    private final JobDispatcher jobDispatcher;
052    /** Connection to JMS provider. */
053    private final JMSConnection jmsConnection;
054
055    /** The DAO handling {@link HarvestChannel}s */
056    private final HarvestChannelDAO harvestChannelDao;
057
058    private final HarvestChannelRegistry harvestChannelRegistry;
059
060    /**
061     * @param jobDispatcher The <code>JobDispatcher</code> to delegate the dispatching of new jobs to, when a 'Ready for
062     * job' event is received.
063     * @param jmsConnection The JMS connection by which {@link HarvesterReadyMessage} is received.
064     */
065    public HarvesterStatusReceiver(JobDispatcher jobDispatcher, JMSConnection jmsConnection,
066            HarvestChannelDAO harvestChannelDao, HarvestChannelRegistry harvestChannelRegistry) {
067        ArgumentNotValid.checkNotNull(jobDispatcher, "jobDispatcher");
068        ArgumentNotValid.checkNotNull(jmsConnection, "jmsConnection");
069        ArgumentNotValid.checkNotNull(harvestChannelDao, "harvestChannelDao");
070        this.jobDispatcher = jobDispatcher;
071        this.jmsConnection = jmsConnection;
072        this.harvestChannelDao = harvestChannelDao;
073        this.harvestChannelRegistry = harvestChannelRegistry;
074    }
075
076    @Override
077    public void start() {
078        jmsConnection.setListener(HarvesterChannels.getHarvesterStatusChannel(), this);
079        jmsConnection.setListener(HarvesterChannels.getHarvesterRegistrationRequestChannel(), this);
080    }
081
082    @Override
083    public void shutdown() {
084        jmsConnection.removeListener(HarvesterChannels.getHarvesterStatusChannel(), this);
085    }
086
087    /**
088     * Tells the dispatcher that it may dispatch a new job.
089     *
090     * @param message The message containing the relevant harvester information.
091     */
092    @Override
093    public void visit(HarvesterReadyMessage message) {
094        ArgumentNotValid.checkNotNull(message, "message");
095        log.trace("Received ready message from {} on host {}", message.getApplicationInstanceId(), message.getHostName() );
096        HarvestChannel channel = harvestChannelDao.getByName(message.getHarvestChannelName());
097        if (!harvestChannelRegistry.isRegistered(message.getHarvestChannelName())) {
098                log.info("Reregistering the harvester '{}' to channel '{}'", message.getApplicationInstanceId(),message.getHarvestChannelName()); 
099                harvestChannelRegistry.register(message.getHarvestChannelName(), message.getApplicationInstanceId());
100        } else if (!harvestChannelRegistry.isRegisteredToChannel(message.getApplicationInstanceId(), message.getHarvestChannelName())) {
101                harvestChannelRegistry.register(message.getHarvestChannelName(), message.getApplicationInstanceId());
102        };
103        jobDispatcher.submitNextNewJob(channel);
104    }
105
106    @Override
107    public void visit(HarvesterRegistrationRequest msg) {
108        ArgumentNotValid.checkNotNull(msg, "msg");
109
110        String harvesterInstanceId = msg.getInstanceId();
111        String channelName = msg.getHarvestChannelName();
112
113        boolean isSnapshot = true;
114        boolean isValid = true;
115        try {
116            HarvestChannel chan = harvestChannelDao.getByName(channelName);
117            isSnapshot = chan.isSnapshot();
118        } catch (UnknownID e) {
119                log.warn("The channel '{}' is unknown by the channels table, wherefore the HarvesterRegistrationRequest is denied. The known channels are ", channelName, 
120                                StringUtils.join(harvestChannelDao.getAll(true), ","));
121            isValid = false;
122        }
123
124        if (isValid) {
125            harvestChannelRegistry.register(channelName, harvesterInstanceId);
126        }
127
128        // Send the reply
129        jmsConnection.send(new HarvesterRegistrationResponse(channelName, isValid, isSnapshot));
130        log.info("Sent a message to host {} to notify that harvest channel '{}' is {}", msg.getHostname(), channelName, (isValid ? "valid."
131                : "invalid."));
132    }
133}