001/* 002 * #%L 003 * Netarchivesuite - harvester 004 * %% 005 * Copyright (C) 2005 - 2018 The Royal Danish Library, 006 * the National Library of France and the Austrian National Library. 007 * %% 008 * This program is free software: you can redistribute it and/or modify 009 * it under the terms of the GNU Lesser General Public License as 010 * published by the Free Software Foundation, either version 2.1 of the 011 * License, or (at your option) any later version. 012 * 013 * This program is distributed in the hope that it will be useful, 014 * but WITHOUT ANY WARRANTY; without even the implied warranty of 015 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 016 * GNU General Lesser Public License for more details. 017 * 018 * You should have received a copy of the GNU General Lesser Public 019 * License along with this program. If not, see 020 * <http://www.gnu.org/licenses/lgpl-2.1.html>. 021 * #L% 022 */ 023 024package dk.netarkivet.harvester.scheduler; 025 026import java.util.Date; 027 028import javax.inject.Provider; 029import javax.jms.MessageListener; 030 031import org.slf4j.Logger; 032import org.slf4j.LoggerFactory; 033 034import dk.netarkivet.common.distribute.JMSConnection; 035import dk.netarkivet.common.distribute.NetarkivetMessage; 036import dk.netarkivet.common.exceptions.ArgumentNotValid; 037import dk.netarkivet.common.lifecycle.ComponentLifeCycle; 038import dk.netarkivet.common.utils.NotificationType; 039import dk.netarkivet.common.utils.Notifications; 040import dk.netarkivet.harvester.datamodel.HarvestDefinition; 041import dk.netarkivet.harvester.datamodel.HarvestDefinitionDAO; 042import dk.netarkivet.harvester.datamodel.Job; 043import dk.netarkivet.harvester.datamodel.JobDAO; 044import dk.netarkivet.harvester.datamodel.JobStatus; 045import dk.netarkivet.harvester.distribute.HarvesterChannels; 046import dk.netarkivet.harvester.distribute.HarvesterMessageHandler; 047import dk.netarkivet.harvester.distribute.IndexReadyMessage; 048import dk.netarkivet.harvester.harvesting.distribute.CrawlProgressMessage; 049import dk.netarkivet.harvester.harvesting.distribute.CrawlStatusMessage; 050import dk.netarkivet.harvester.harvesting.distribute.JobEndedMessage; 051 052/** 053 * Submitted harvesting jobs are registered by listening for CrawlStatusMessages on the 054 * THE_SCHED queue and processes completed harvests. 055 */ 056public class HarvestSchedulerMonitorServer 057 extends HarvesterMessageHandler 058 implements MessageListener, ComponentLifeCycle { 059 private static final Logger log = LoggerFactory.getLogger(HarvestSchedulerMonitorServer.class); 060 061 private final Provider<JMSConnection> jmsConnectionProvider; 062 private final Provider<JobDAO> jobDAOProvider; 063 private final Provider<HarvestDefinitionDAO> harvestDefinitionDAOProvider; 064 private final Provider<Notifications> notificationsProvider; 065 066 public HarvestSchedulerMonitorServer( 067 Provider<JMSConnection> jmsConnectionProvider, 068 Provider<JobDAO> jobDAOProvider, 069 Provider<HarvestDefinitionDAO> harvestDefinitionDAOProvider, 070 Provider<Notifications> notificationsProvider) { 071 this.jmsConnectionProvider = jmsConnectionProvider; 072 this.jobDAOProvider = jobDAOProvider; 073 this.harvestDefinitionDAOProvider = harvestDefinitionDAOProvider; 074 this.notificationsProvider = notificationsProvider; 075 } 076 077 @Override 078 public void start() { 079 jmsConnectionProvider.get().setListener(HarvesterChannels.getTheSched(), this); 080 } 081 082 /** 083 * Updates the job status with the information in the message and notifies the HarvestMonitor. 084 * If a DomainHarvestReport is included in either a DOne or FAILED message, the DomainHarvestReport is processed. 085 */ 086 private void processCrawlStatusMessage(CrawlStatusMessage cmsg) { 087 long jobID = cmsg.getJobID(); 088 JobStatus newStatus = cmsg.getStatusCode(); 089 Job job = jobDAOProvider.get().read(Long.valueOf(jobID)); 090 JobStatus oldStatus = job.getStatus(); 091 boolean ignoreDomainHarvestReport = false; // Don't ignore DomainHarvestReport unless job already in DONE or FAILED state 092 switch (newStatus) { 093 case STARTED: 094 if (oldStatus == JobStatus.NEW) { 095 log.warn("Received new status 'Started' for unsubmitted job {}", jobID); 096 } 097 if (oldStatus == JobStatus.SUBMITTED || oldStatus == JobStatus.NEW) { 098 log.info("Job #{} has been started by the harvester.", job.getJobID()); 099 100 job.setStatus(newStatus); 101 jobDAOProvider.get().update(job); 102 103 notifyRunningJobMonitor(new CrawlProgressMessage(job.getOrigHarvestDefinitionID(), jobID)); 104 } else { 105 log.warn("Received new status 'Started' for job {} with current status {}, ignoring.", 106 job.getJobID(), oldStatus); 107 } 108 break; 109 case DONE: 110 case FAILED: 111 if (oldStatus == JobStatus.STARTED || 112 oldStatus == JobStatus.SUBMITTED || 113 oldStatus == JobStatus.RESUBMITTED || 114 oldStatus == JobStatus.NEW) { 115 if (oldStatus != JobStatus.STARTED) { 116 log.warn("Received unexpected CrawlStatusMessage for job {} with new status {}, current state is {}", 117 jobID, newStatus ,oldStatus); 118 } 119 if (newStatus == JobStatus.FAILED) { 120 log.warn("Job {} failed: HarvestErrors = {}\n" + "HarvestErrorDetails = {}\n" 121 + "UploadErrors = {}\n" + "UploadErrorDetails = {}", jobID, cmsg.getHarvestErrors(), 122 cmsg.getHarvestErrorDetails(), cmsg.getUploadErrors(), cmsg.getUploadErrorDetails()); 123 } else { 124 log.info("Job #{} succesfully completed", jobID); 125 } 126 job.setStatus(newStatus); 127 job.appendHarvestErrors(cmsg.getHarvestErrors()); 128 job.appendHarvestErrorDetails(cmsg.getHarvestErrorDetails()); 129 job.appendUploadErrors(cmsg.getUploadErrors()); 130 job.appendUploadErrorDetails(cmsg.getUploadErrorDetails()); 131 } else { 132 // Received done or failed on already dead job. Bad! 133 // Marking as FAILED, unless oldStatus is DONE (issue NAS-2612) 134 JobStatus newStatus1 = JobStatus.FAILED; 135 // Ignore domainharvestreport if oldstatus either done or failed 136 if (oldStatus.equals(JobStatus.DONE)) { 137 newStatus1 = JobStatus.DONE; 138 ignoreDomainHarvestReport = true; 139 } else if (oldStatus.equals(JobStatus.FAILED)) { 140 ignoreDomainHarvestReport = true; 141 } 142 143 String message = "Received unexpected CrawlStatusMessage for job " + jobID + " with new status " + newStatus + 144 ", current state is " + oldStatus+ ". Marking job as " + newStatus1.name() + ". Reported harvestErrors on job: " + cmsg.getHarvestErrors(); 145 job.setStatus(newStatus1); 146 job.appendHarvestErrors(cmsg.getHarvestErrors()); 147 job.appendHarvestErrors(message); 148 job.appendHarvestErrorDetails(cmsg.getHarvestErrors()); 149 job.appendHarvestErrorDetails(message); 150 log.warn(message); 151 } 152 153 jobDAOProvider.get().update(job); 154 155 if (!ignoreDomainHarvestReport && cmsg.getDomainHarvestReport() != null) { 156 cmsg.getDomainHarvestReport().postProcess(job); 157 } 158 159 notifyRunningJobMonitor(new JobEndedMessage(job.getJobID(), newStatus)); 160 break; 161 default: 162 log.warn("CrawlStatusMessage tried to update job status to unsupported status {} for job {}", 163 newStatus, jobID); 164 break; 165 } 166 } 167 168 private void notifyRunningJobMonitor(NetarkivetMessage message) { 169 jmsConnectionProvider.get().send(message); 170 } 171 172 /** 173 * @param msg a given CrawlStatusMessage 174 * @see dk.netarkivet.harvester.distribute.HarvesterMessageHandler#visit(dk.netarkivet.harvester.harvesting.distribute.CrawlStatusMessage) 175 */ 176 public void visit(CrawlStatusMessage msg) { 177 ArgumentNotValid.checkNotNull(msg, "msg"); 178 processCrawlStatusMessage(msg); 179 } 180 181 /** 182 * Removes the HarvestSchedulerMonitorServer as listener to the JMS scheduler Channel. 183 */ 184 @Override 185 public void shutdown() { 186 // FIXME See NAS-1976 187 jmsConnectionProvider.get().removeListener(HarvesterChannels.getTheSched(), this); 188 } 189 190 @Override 191 public void visit(IndexReadyMessage msg) { 192 ArgumentNotValid.checkNotNull(msg, "msg"); 193 processIndexReadyMessage(msg); 194 } 195 196 private void processIndexReadyMessage(IndexReadyMessage msg) { 197 // Set isindexready to true if Indexisready is true 198 Long harvestId = msg.getHarvestId(); 199 boolean indexisready = msg.getIndexOK(); 200 if (harvestDefinitionDAOProvider.get().isSnapshot(harvestId)) { 201 harvestDefinitionDAOProvider.get().setIndexIsReady(harvestId, indexisready); 202 if (indexisready) { 203 log.info("Got message from the IndexServer, that the index is ready for harvest #{}", harvestId); 204 } else { 205 String errMsg = "Got message from IndexServer, that it failed to generate index for" + " harvest # " 206 + harvestId + ". Deactivating harvest"; 207 log.warn(errMsg); 208 HarvestDefinition hd = harvestDefinitionDAOProvider.get().read(harvestId); 209 hd.setActive(false); 210 StringBuilder commentsBuf = new StringBuilder(hd.getComments()); 211 commentsBuf.append("\n" + (new Date()) 212 + ": Deactivated by the system because indexserver failed to generate index"); 213 hd.setComments(commentsBuf.toString()); 214 harvestDefinitionDAOProvider.get().update(hd); 215 notificationsProvider.get().notify(errMsg, NotificationType.ERROR); 216 } 217 } else { 218 log.debug("Ignoring IndexreadyMesssage sent on behalf on selective harvest w/id {}", harvestId); 219 } 220 } 221}