/*
 * #%L
 * Netarchivesuite - harvester
 * %%
 * Copyright (C) 2005 - 2018 The Royal Danish Library, 
 *             the National Library of France and the Austrian National Library.
 * %%
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License as
 * published by the Free Software Foundation, either version 2.1 of the
 * License, or (at your option) any later version.
 * 
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Lesser Public License for more details.
 * 
 * You should have received a copy of the GNU General Lesser Public
 * License along with this program.  If not, see
 * <http://www.gnu.org/licenses/lgpl-2.1.html>.
 * #L%
 */
023
024package dk.netarkivet.harvester.scheduler;
025
026import java.util.Date;
027
028import javax.inject.Provider;
029import javax.jms.MessageListener;
030
031import org.slf4j.Logger;
032import org.slf4j.LoggerFactory;
033
034import dk.netarkivet.common.distribute.JMSConnection;
035import dk.netarkivet.common.distribute.NetarkivetMessage;
036import dk.netarkivet.common.exceptions.ArgumentNotValid;
037import dk.netarkivet.common.lifecycle.ComponentLifeCycle;
038import dk.netarkivet.common.utils.NotificationType;
039import dk.netarkivet.common.utils.Notifications;
040import dk.netarkivet.harvester.datamodel.HarvestDefinition;
041import dk.netarkivet.harvester.datamodel.HarvestDefinitionDAO;
042import dk.netarkivet.harvester.datamodel.Job;
043import dk.netarkivet.harvester.datamodel.JobDAO;
044import dk.netarkivet.harvester.datamodel.JobStatus;
045import dk.netarkivet.harvester.distribute.HarvesterChannels;
046import dk.netarkivet.harvester.distribute.HarvesterMessageHandler;
047import dk.netarkivet.harvester.distribute.IndexReadyMessage;
048import dk.netarkivet.harvester.harvesting.distribute.CrawlProgressMessage;
049import dk.netarkivet.harvester.harvesting.distribute.CrawlStatusMessage;
050import dk.netarkivet.harvester.harvesting.distribute.JobEndedMessage;
051
052/**
053 * Submitted harvesting jobs are registered by listening for CrawlStatusMessages on the
054 * THE_SCHED queue and processes completed harvests.
055 */
056public class HarvestSchedulerMonitorServer
057        extends HarvesterMessageHandler
058        implements MessageListener, ComponentLifeCycle {
059    private static final Logger log = LoggerFactory.getLogger(HarvestSchedulerMonitorServer.class);
060
061    private final Provider<JMSConnection> jmsConnectionProvider;
062    private final Provider<JobDAO> jobDAOProvider;
063    private final Provider<HarvestDefinitionDAO> harvestDefinitionDAOProvider;
064    private final Provider<Notifications> notificationsProvider;
065
066    public HarvestSchedulerMonitorServer(
067            Provider<JMSConnection> jmsConnectionProvider,
068            Provider<JobDAO> jobDAOProvider,
069            Provider<HarvestDefinitionDAO> harvestDefinitionDAOProvider,
070            Provider<Notifications> notificationsProvider) {
071        this.jmsConnectionProvider = jmsConnectionProvider;
072        this.jobDAOProvider = jobDAOProvider;
073        this.harvestDefinitionDAOProvider = harvestDefinitionDAOProvider;
074        this.notificationsProvider = notificationsProvider;
075    }
076
077    @Override
078    public void start() {
079        jmsConnectionProvider.get().setListener(HarvesterChannels.getTheSched(), this);
080    }
081
082    /**
083     * Updates the job status with the information in the message and notifies the HarvestMonitor.
084     * If a DomainHarvestReport is included in either a DOne or FAILED message, the DomainHarvestReport is processed.
085     */
086    private void processCrawlStatusMessage(CrawlStatusMessage cmsg) {
087        long jobID = cmsg.getJobID();
088        JobStatus newStatus = cmsg.getStatusCode();
089        Job job = jobDAOProvider.get().read(Long.valueOf(jobID));
090        JobStatus oldStatus = job.getStatus();
091        boolean ignoreDomainHarvestReport = false; // Don't ignore DomainHarvestReport unless job already in DONE or FAILED state  
092        switch (newStatus) {
093        case STARTED:
094            if (oldStatus == JobStatus.NEW) {
095                log.warn("Received new status 'Started' for unsubmitted job {}", jobID);
096            }
097            if (oldStatus == JobStatus.SUBMITTED || oldStatus == JobStatus.NEW) {
098                log.info("Job #{} has been started by the harvester.", job.getJobID());
099
100                job.setStatus(newStatus);
101                jobDAOProvider.get().update(job);
102
103                notifyRunningJobMonitor(new CrawlProgressMessage(job.getOrigHarvestDefinitionID(), jobID));
104            } else {
105                log.warn("Received new status 'Started' for job {} with current status {}, ignoring.",
106                        job.getJobID(), oldStatus);
107            }
108            break;
109        case DONE:
110        case FAILED:
111            if (oldStatus == JobStatus.STARTED ||
112                    oldStatus == JobStatus.SUBMITTED ||
113                    oldStatus == JobStatus.RESUBMITTED ||
114                    oldStatus == JobStatus.NEW) {
115                if (oldStatus != JobStatus.STARTED) {
116                    log.warn("Received unexpected CrawlStatusMessage for job {} with new status {}, current state is {}",
117                            jobID, newStatus ,oldStatus);
118                }
119                if (newStatus == JobStatus.FAILED) {
120                    log.warn("Job {} failed: HarvestErrors = {}\n" + "HarvestErrorDetails = {}\n"
121                                    + "UploadErrors = {}\n" + "UploadErrorDetails = {}", jobID, cmsg.getHarvestErrors(),
122                            cmsg.getHarvestErrorDetails(), cmsg.getUploadErrors(), cmsg.getUploadErrorDetails());
123                } else {
124                    log.info("Job #{} succesfully completed", jobID);
125                }
126                job.setStatus(newStatus);
127                job.appendHarvestErrors(cmsg.getHarvestErrors());
128                job.appendHarvestErrorDetails(cmsg.getHarvestErrorDetails());
129                job.appendUploadErrors(cmsg.getUploadErrors());
130                job.appendUploadErrorDetails(cmsg.getUploadErrorDetails());
131            } else {
132                // Received done or failed on already dead job. Bad!
133                // Marking as FAILED, unless oldStatus is DONE (issue NAS-2612)
134                JobStatus newStatus1 = JobStatus.FAILED;
135                // Ignore domainharvestreport if oldstatus either done or failed
136                if (oldStatus.equals(JobStatus.DONE)) {
137                        newStatus1 = JobStatus.DONE;
138                        ignoreDomainHarvestReport = true;
139                } else if (oldStatus.equals(JobStatus.FAILED)) {
140                    ignoreDomainHarvestReport = true;
141                }
142                
143                String message = "Received unexpected CrawlStatusMessage for job " + jobID + " with new status " + newStatus +
144                        ", current state is " + oldStatus+ ". Marking job as " + newStatus1.name() + ". Reported harvestErrors on job: " +  cmsg.getHarvestErrors();
145                job.setStatus(newStatus1);                
146                job.appendHarvestErrors(cmsg.getHarvestErrors());
147                job.appendHarvestErrors(message);
148                job.appendHarvestErrorDetails(cmsg.getHarvestErrors());
149                job.appendHarvestErrorDetails(message);
150                log.warn(message);
151            }
152
153            jobDAOProvider.get().update(job);
154
155            if (!ignoreDomainHarvestReport && cmsg.getDomainHarvestReport() != null) { 
156                cmsg.getDomainHarvestReport().postProcess(job);
157            }
158
159            notifyRunningJobMonitor(new JobEndedMessage(job.getJobID(), newStatus));
160            break;
161        default:
162            log.warn("CrawlStatusMessage tried to update job status to unsupported status {} for job {}",
163                    newStatus, jobID);
164            break;
165        }
166    }
167
168    private void notifyRunningJobMonitor(NetarkivetMessage message) {
169        jmsConnectionProvider.get().send(message);
170    }
171
172    /**
173     * @param msg a given CrawlStatusMessage
174     * @see dk.netarkivet.harvester.distribute.HarvesterMessageHandler#visit(dk.netarkivet.harvester.harvesting.distribute.CrawlStatusMessage)
175     */
176    public void visit(CrawlStatusMessage msg) {
177        ArgumentNotValid.checkNotNull(msg, "msg");
178        processCrawlStatusMessage(msg);
179    }
180
181    /**
182     * Removes the HarvestSchedulerMonitorServer as listener to the JMS scheduler Channel.
183     */
184    @Override
185    public void shutdown() {
186        // FIXME See NAS-1976
187        jmsConnectionProvider.get().removeListener(HarvesterChannels.getTheSched(), this);
188    }
189
190    @Override
191    public void visit(IndexReadyMessage msg) {
192        ArgumentNotValid.checkNotNull(msg, "msg");
193        processIndexReadyMessage(msg);
194    }
195
196    private void processIndexReadyMessage(IndexReadyMessage msg) {
197        // Set isindexready to true if Indexisready is true
198        Long harvestId = msg.getHarvestId();
199        boolean indexisready = msg.getIndexOK();
200        if (harvestDefinitionDAOProvider.get().isSnapshot(harvestId)) {
201            harvestDefinitionDAOProvider.get().setIndexIsReady(harvestId, indexisready);
202            if (indexisready) {
203                log.info("Got message from the IndexServer, that the index is ready for harvest #{}", harvestId);
204            } else {
205                String errMsg = "Got message from IndexServer, that it failed to generate index for" + " harvest # "
206                        + harvestId + ". Deactivating harvest";
207                log.warn(errMsg);
208                HarvestDefinition hd = harvestDefinitionDAOProvider.get().read(harvestId);
209                hd.setActive(false);
210                StringBuilder commentsBuf = new StringBuilder(hd.getComments());
211                commentsBuf.append("\n" + (new Date())
212                        + ": Deactivated by the system because indexserver failed to generate index");
213                hd.setComments(commentsBuf.toString());
214                harvestDefinitionDAOProvider.get().update(hd);
215                notificationsProvider.get().notify(errMsg, NotificationType.ERROR);
216            }
217        } else {
218            log.debug("Ignoring IndexreadyMesssage sent on behalf on selective harvest w/id {}", harvestId);
219        }
220    }
221}