001/* 002 * #%L 003 * Netarchivesuite - harvester 004 * %% 005 * Copyright (C) 2005 - 2014 The Royal Danish Library, the Danish State and University Library, 006 * the National Library of France and the Austrian National Library. 007 * %% 008 * This program is free software: you can redistribute it and/or modify 009 * it under the terms of the GNU Lesser General Public License as 010 * published by the Free Software Foundation, either version 2.1 of the 011 * License, or (at your option) any later version. 012 * 013 * This program is distributed in the hope that it will be useful, 014 * but WITHOUT ANY WARRANTY; without even the implied warranty of 015 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 016 * GNU General Lesser Public License for more details. 017 * 018 * You should have received a copy of the GNU General Lesser Public 019 * License along with this program. If not, see 020 * <http://www.gnu.org/licenses/lgpl-2.1.html>. 021 * #L% 022 */ 023package dk.netarkivet.harvester.scheduler; 024 025import org.apache.commons.lang.StringUtils; 026import org.slf4j.Logger; 027import org.slf4j.LoggerFactory; 028 029import dk.netarkivet.common.distribute.JMSConnection; 030import dk.netarkivet.common.exceptions.ArgumentNotValid; 031import dk.netarkivet.common.exceptions.UnknownID; 032import dk.netarkivet.common.lifecycle.ComponentLifeCycle; 033import dk.netarkivet.harvester.datamodel.HarvestChannel; 034import dk.netarkivet.harvester.datamodel.HarvestChannelDAO; 035import dk.netarkivet.harvester.distribute.HarvesterChannels; 036import dk.netarkivet.harvester.distribute.HarvesterMessageHandler; 037import dk.netarkivet.harvester.harvesting.distribute.HarvesterReadyMessage; 038import dk.netarkivet.harvester.harvesting.distribute.HarvesterRegistrationRequest; 039import dk.netarkivet.harvester.harvesting.distribute.HarvesterRegistrationResponse; 040 041/** 042 * Handles the reception of status messages from the harvesters. Will call the {@link #visit(HarvesterReadyMessage)} 043 * method when a Ready message is received. 044 */ 045public class HarvesterStatusReceiver extends HarvesterMessageHandler implements ComponentLifeCycle { 046 047 /** The logger to use. */ 048 private static final Logger log = LoggerFactory.getLogger(HarvesterStatusReceiver.class); 049 050 /** @see HarvesterStatusReceiver#visit(dk.netarkivet.harvester.harvesting.distribute.HarvesterReadyMessage) */ 051 private final JobDispatcher jobDispatcher; 052 /** Connection to JMS provider. */ 053 private final JMSConnection jmsConnection; 054 055 /** The DAO handling {@link HarvestChannel}s */ 056 private final HarvestChannelDAO harvestChannelDao; 057 058 private final HarvestChannelRegistry harvestChannelRegistry; 059 060 /** 061 * @param jobDispatcher The <code>JobDispatcher</code> to delegate the dispatching of new jobs to, when a 'Ready for 062 * job' event is received. 063 * @param jmsConnection The JMS connection by which {@link HarvesterReadyMessage} is received. 064 */ 065 public HarvesterStatusReceiver(JobDispatcher jobDispatcher, JMSConnection jmsConnection, 066 HarvestChannelDAO harvestChannelDao, HarvestChannelRegistry harvestChannelRegistry) { 067 ArgumentNotValid.checkNotNull(jobDispatcher, "jobDispatcher"); 068 ArgumentNotValid.checkNotNull(jmsConnection, "jmsConnection"); 069 ArgumentNotValid.checkNotNull(harvestChannelDao, "harvestChannelDao"); 070 this.jobDispatcher = jobDispatcher; 071 this.jmsConnection = jmsConnection; 072 this.harvestChannelDao = harvestChannelDao; 073 this.harvestChannelRegistry = harvestChannelRegistry; 074 } 075 076 @Override 077 public void start() { 078 jmsConnection.setListener(HarvesterChannels.getHarvesterStatusChannel(), this); 079 jmsConnection.setListener(HarvesterChannels.getHarvesterRegistrationRequestChannel(), this); 080 } 081 082 @Override 083 public void shutdown() { 084 jmsConnection.removeListener(HarvesterChannels.getHarvesterStatusChannel(), this); 085 } 086 087 /** 088 * Tells the dispatcher that it may dispatch a new job. 089 * 090 * @param message The message containing the relevant harvester information. 091 */ 092 @Override 093 public void visit(HarvesterReadyMessage message) { 094 ArgumentNotValid.checkNotNull(message, "message"); 095 log.trace("Received ready message from {} on host {}", message.getApplicationInstanceId(), message.getHostName() ); 096 HarvestChannel channel = harvestChannelDao.getByName(message.getHarvestChannelName()); 097 if (!harvestChannelRegistry.isRegistered(message.getHarvestChannelName())) { 098 log.info("Reregistering the harvester '{}' to channel '{}'", message.getApplicationInstanceId(),message.getHarvestChannelName()); 099 harvestChannelRegistry.register(message.getHarvestChannelName(), message.getApplicationInstanceId()); 100 } else if (!harvestChannelRegistry.isRegisteredToChannel(message.getApplicationInstanceId(), message.getHarvestChannelName())) { 101 harvestChannelRegistry.register(message.getHarvestChannelName(), message.getApplicationInstanceId()); 102 }; 103 jobDispatcher.submitNextNewJob(channel); 104 } 105 106 @Override 107 public void visit(HarvesterRegistrationRequest msg) { 108 ArgumentNotValid.checkNotNull(msg, "msg"); 109 110 String harvesterInstanceId = msg.getInstanceId(); 111 String channelName = msg.getHarvestChannelName(); 112 113 boolean isSnapshot = true; 114 boolean isValid = true; 115 try { 116 HarvestChannel chan = harvestChannelDao.getByName(channelName); 117 isSnapshot = chan.isSnapshot(); 118 } catch (UnknownID e) { 119 log.warn("The channel '{}' is unknown by the channels table, wherefore the HarvesterRegistrationRequest is denied. The known channels are ", channelName, 120 StringUtils.join(harvestChannelDao.getAll(true), ",")); 121 isValid = false; 122 } 123 124 if (isValid) { 125 harvestChannelRegistry.register(channelName, harvesterInstanceId); 126 } 127 128 // Send the reply 129 jmsConnection.send(new HarvesterRegistrationResponse(channelName, isValid, isSnapshot)); 130 log.info("Sent a message to host {} to notify that harvest channel '{}' is {}", msg.getHostname(), channelName, (isValid ? "valid." 131 : "invalid.")); 132 } 133}