001/* 002 * #%L 003 * Netarchivesuite - harvester 004 * %% 005 * Copyright (C) 2005 - 2014 The Royal Danish Library, the Danish State and University Library, 006 * the National Library of France and the Austrian National Library. 007 * %% 008 * This program is free software: you can redistribute it and/or modify 009 * it under the terms of the GNU Lesser General Public License as 010 * published by the Free Software Foundation, either version 2.1 of the 011 * License, or (at your option) any later version. 012 * 013 * This program is distributed in the hope that it will be useful, 014 * but WITHOUT ANY WARRANTY; without even the implied warranty of 015 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 016 * GNU General Lesser Public License for more details. 017 * 018 * You should have received a copy of the GNU General Lesser Public 019 * License along with this program. If not, see 020 * <http://www.gnu.org/licenses/lgpl-2.1.html>. 021 * #L% 022 */ 023 024package dk.netarkivet.harvester.scheduler; 025 026import java.util.Date; 027 028import javax.inject.Provider; 029import javax.jms.MessageListener; 030 031import org.slf4j.Logger; 032import org.slf4j.LoggerFactory; 033 034import dk.netarkivet.common.distribute.JMSConnection; 035import dk.netarkivet.common.distribute.NetarkivetMessage; 036import dk.netarkivet.common.exceptions.ArgumentNotValid; 037import dk.netarkivet.common.lifecycle.ComponentLifeCycle; 038import dk.netarkivet.common.utils.NotificationType; 039import dk.netarkivet.common.utils.Notifications; 040import dk.netarkivet.harvester.datamodel.HarvestDefinition; 041import dk.netarkivet.harvester.datamodel.HarvestDefinitionDAO; 042import dk.netarkivet.harvester.datamodel.Job; 043import dk.netarkivet.harvester.datamodel.JobDAO; 044import dk.netarkivet.harvester.datamodel.JobStatus; 045import dk.netarkivet.harvester.distribute.HarvesterChannels; 046import dk.netarkivet.harvester.distribute.HarvesterMessageHandler; 047import dk.netarkivet.harvester.distribute.IndexReadyMessage; 048import dk.netarkivet.harvester.harvesting.distribute.CrawlProgressMessage; 049import dk.netarkivet.harvester.harvesting.distribute.CrawlStatusMessage; 050import dk.netarkivet.harvester.harvesting.distribute.JobEndedMessage; 051 052/** 053 * Submitted harvesting jobs are registered by listening for CrawlStatusMessages on the 054 * THE_SCHED queue and processes completed harvests. 055 */ 056public class HarvestSchedulerMonitorServer 057 extends HarvesterMessageHandler 058 implements MessageListener, ComponentLifeCycle { 059 private static final Logger log = LoggerFactory.getLogger(HarvestSchedulerMonitorServer.class); 060 061 private final Provider<JMSConnection> jmsConnectionProvider; 062 private final Provider<JobDAO> jobDAOProvider; 063 private final Provider<HarvestDefinitionDAO> harvestDefinitionDAOProvider; 064 private final Provider<Notifications> notificationsProvider; 065 066 public HarvestSchedulerMonitorServer( 067 Provider<JMSConnection> jmsConnectionProvider, 068 Provider<JobDAO> jobDAOProvider, 069 Provider<HarvestDefinitionDAO> harvestDefinitionDAOProvider, 070 Provider<Notifications> notificationsProvider) { 071 this.jmsConnectionProvider = jmsConnectionProvider; 072 this.jobDAOProvider = jobDAOProvider; 073 this.harvestDefinitionDAOProvider = harvestDefinitionDAOProvider; 074 this.notificationsProvider = notificationsProvider; 075 } 076 077 @Override 078 public void start() { 079 jmsConnectionProvider.get().setListener(HarvesterChannels.getTheSched(), this); 080 } 081 082 /** 083 * Updates the job status with the information in the message and notifies the HarvestMonitor. 084 * If a DomainHarvestReport is included in either a DOne or FAILED message, the DomainHarvestReport is processed. 085 */ 086 private void processCrawlStatusMessage(CrawlStatusMessage cmsg) { 087 long jobID = cmsg.getJobID(); 088 JobStatus newStatus = cmsg.getStatusCode(); 089 Job job = jobDAOProvider.get().read(Long.valueOf(jobID)); 090 JobStatus oldStatus = job.getStatus(); 091 092 switch (newStatus) { 093 case STARTED: 094 if (oldStatus == JobStatus.NEW) { 095 log.warn("Received new status 'Started' for unsubmitted job {}", jobID); 096 } 097 if (oldStatus == JobStatus.SUBMITTED || oldStatus == JobStatus.NEW) { 098 log.info("Job #{} has been started by the harvester.", job.getJobID()); 099 100 job.setStatus(newStatus); 101 jobDAOProvider.get().update(job); 102 103 notifyRunningJobMonitor(new CrawlProgressMessage(job.getOrigHarvestDefinitionID(), jobID)); 104 } else { 105 log.warn("Received new status 'Started' for job {} with current status {}, ignoring.", 106 job.getJobID(), oldStatus); 107 } 108 break; 109 case DONE: 110 case FAILED: 111 if (oldStatus == JobStatus.STARTED || 112 oldStatus == JobStatus.SUBMITTED || 113 oldStatus == JobStatus.RESUBMITTED || 114 oldStatus == JobStatus.NEW) { 115 if (oldStatus != JobStatus.STARTED) { 116 log.warn("Received CrawlStatusMessage for job {} with new status {}, current state is {}", 117 jobID, newStatus ,oldStatus); 118 } 119 if (newStatus == JobStatus.FAILED) { 120 log.warn("Job {} failed: HarvestErrors = {}\n" + "HarvestErrorDetails = {}\n" 121 + "UploadErrors = {}\n" + "UploadErrorDetails = {}", jobID, cmsg.getHarvestErrors(), 122 cmsg.getHarvestErrorDetails(), cmsg.getUploadErrors(), cmsg.getUploadErrorDetails()); 123 } else { 124 log.info("Job #{} succesfully completed", jobID); 125 } 126 job.setStatus(newStatus); 127 job.appendHarvestErrors(cmsg.getHarvestErrors()); 128 job.appendHarvestErrorDetails(cmsg.getHarvestErrorDetails()); 129 job.appendUploadErrors(cmsg.getUploadErrors()); 130 job.appendUploadErrorDetails(cmsg.getUploadErrorDetails()); 131 } else { 132 // Received done or failed on already dead job. Bad! 133 String message = "Received CrawlStatusMessage for job " + jobID + " with new status " + newStatus + 134 ", current state is " + oldStatus+ ". Marking job as FAILED"; 135 log.warn(message); 136 job.setStatus(JobStatus.FAILED); 137 job.appendHarvestErrors(cmsg.getHarvestErrors()); 138 job.appendHarvestErrors(message); 139 job.appendHarvestErrorDetails(cmsg.getHarvestErrors()); 140 job.appendHarvestErrorDetails(message); 141 log.warn("Job {} failed: {}", jobID, job.getHarvestErrorDetails()); 142 } 143 144 jobDAOProvider.get().update(job); 145 146 if (cmsg.getDomainHarvestReport() != null) { 147 cmsg.getDomainHarvestReport().postProcess(job); 148 } 149 150 notifyRunningJobMonitor(new JobEndedMessage(job.getJobID(), newStatus)); 151 break; 152 default: 153 log.warn("CrawlStatusMessage tried to update job status to unsupported status {} for job {}", 154 newStatus, jobID); 155 break; 156 } 157 } 158 159 private void notifyRunningJobMonitor(NetarkivetMessage message) { 160 jmsConnectionProvider.get().send(message); 161 } 162 163 /** 164 * @param msg a given CrawlStatusMessage 165 * @see dk.netarkivet.harvester.distribute.HarvesterMessageHandler#visit(dk.netarkivet.harvester.harvesting.distribute.CrawlStatusMessage) 166 */ 167 public void visit(CrawlStatusMessage msg) { 168 ArgumentNotValid.checkNotNull(msg, "msg"); 169 processCrawlStatusMessage(msg); 170 } 171 172 /** 173 * Removes the HarvestSchedulerMonitorServer as listener to the JMS scheduler Channel. 174 */ 175 @Override 176 public void shutdown() { 177 // FIXME See NAS-1976 178 jmsConnectionProvider.get().removeListener(HarvesterChannels.getTheSched(), this); 179 } 180 181 @Override 182 public void visit(IndexReadyMessage msg) { 183 ArgumentNotValid.checkNotNull(msg, "msg"); 184 processIndexReadyMessage(msg); 185 } 186 187 private void processIndexReadyMessage(IndexReadyMessage msg) { 188 // Set isindexready to true if Indexisready is true 189 Long harvestId = msg.getHarvestId(); 190 boolean indexisready = msg.getIndexOK(); 191 if (harvestDefinitionDAOProvider.get().isSnapshot(harvestId)) { 192 harvestDefinitionDAOProvider.get().setIndexIsReady(harvestId, indexisready); 193 if (indexisready) { 194 log.info("Got message from the IndexServer, that the index is ready for harvest #{}", harvestId); 195 } else { 196 String errMsg = "Got message from IndexServer, that it failed to generate index for" + " harvest # " 197 + harvestId + ". Deactivating harvest"; 198 log.warn(errMsg); 199 HarvestDefinition hd = harvestDefinitionDAOProvider.get().read(harvestId); 200 hd.setActive(false); 201 StringBuilder commentsBuf = new StringBuilder(hd.getComments()); 202 commentsBuf.append("\n" + (new Date()) 203 + ": Deactivated by the system because indexserver failed to generate index"); 204 hd.setComments(commentsBuf.toString()); 205 harvestDefinitionDAOProvider.get().update(hd); 206 notificationsProvider.get().notify(errMsg, NotificationType.ERROR); 207 } 208 } else { 209 log.debug("Ignoring IndexreadyMesssage sent on behalf on selective harvest w/id {}", harvestId); 210 } 211 } 212}