001/*
002 * #%L
003 * Netarchivesuite - harvester
004 * %%
005 * Copyright (C) 2005 - 2014 The Royal Danish Library, the Danish State and University Library,
006 *             the National Library of France and the Austrian National Library.
007 * %%
008 * This program is free software: you can redistribute it and/or modify
009 * it under the terms of the GNU Lesser General Public License as
010 * published by the Free Software Foundation, either version 2.1 of the
011 * License, or (at your option) any later version.
012 * 
013 * This program is distributed in the hope that it will be useful,
014 * but WITHOUT ANY WARRANTY; without even the implied warranty of
015 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
016 * GNU General Lesser Public License for more details.
017 * 
018 * You should have received a copy of the GNU General Lesser Public
019 * License along with this program.  If not, see
020 * <http://www.gnu.org/licenses/lgpl-2.1.html>.
021 * #L%
022 */
023
024package dk.netarkivet.harvester.scheduler;
025
026import java.util.Date;
027
028import javax.inject.Provider;
029import javax.jms.MessageListener;
030
031import org.slf4j.Logger;
032import org.slf4j.LoggerFactory;
033
034import dk.netarkivet.common.distribute.JMSConnection;
035import dk.netarkivet.common.distribute.NetarkivetMessage;
036import dk.netarkivet.common.exceptions.ArgumentNotValid;
037import dk.netarkivet.common.lifecycle.ComponentLifeCycle;
038import dk.netarkivet.common.utils.NotificationType;
039import dk.netarkivet.common.utils.Notifications;
040import dk.netarkivet.harvester.datamodel.HarvestDefinition;
041import dk.netarkivet.harvester.datamodel.HarvestDefinitionDAO;
042import dk.netarkivet.harvester.datamodel.Job;
043import dk.netarkivet.harvester.datamodel.JobDAO;
044import dk.netarkivet.harvester.datamodel.JobStatus;
045import dk.netarkivet.harvester.distribute.HarvesterChannels;
046import dk.netarkivet.harvester.distribute.HarvesterMessageHandler;
047import dk.netarkivet.harvester.distribute.IndexReadyMessage;
048import dk.netarkivet.harvester.harvesting.distribute.CrawlProgressMessage;
049import dk.netarkivet.harvester.harvesting.distribute.CrawlStatusMessage;
050import dk.netarkivet.harvester.harvesting.distribute.JobEndedMessage;
051
052/**
053 * Submitted harvesting jobs are registered by listening for CrawlStatusMessages on the
054 * THE_SCHED queue and processes completed harvests.
055 */
056public class HarvestSchedulerMonitorServer
057        extends HarvesterMessageHandler
058        implements MessageListener, ComponentLifeCycle {
059    private static final Logger log = LoggerFactory.getLogger(HarvestSchedulerMonitorServer.class);
060
061    private final Provider<JMSConnection> jmsConnectionProvider;
062    private final Provider<JobDAO> jobDAOProvider;
063    private final Provider<HarvestDefinitionDAO> harvestDefinitionDAOProvider;
064    private final Provider<Notifications> notificationsProvider;
065
066    public HarvestSchedulerMonitorServer(
067            Provider<JMSConnection> jmsConnectionProvider,
068            Provider<JobDAO> jobDAOProvider,
069            Provider<HarvestDefinitionDAO> harvestDefinitionDAOProvider,
070            Provider<Notifications> notificationsProvider) {
071        this.jmsConnectionProvider = jmsConnectionProvider;
072        this.jobDAOProvider = jobDAOProvider;
073        this.harvestDefinitionDAOProvider = harvestDefinitionDAOProvider;
074        this.notificationsProvider = notificationsProvider;
075    }
076
077    @Override
078    public void start() {
079        jmsConnectionProvider.get().setListener(HarvesterChannels.getTheSched(), this);
080    }
081
082    /**
083     * Updates the job status with the information in the message and notifies the HarvestMonitor.
084     * If a DomainHarvestReport is included in either a DOne or FAILED message, the DomainHarvestReport is processed.
085     */
086    private void processCrawlStatusMessage(CrawlStatusMessage cmsg) {
087        long jobID = cmsg.getJobID();
088        JobStatus newStatus = cmsg.getStatusCode();
089        Job job = jobDAOProvider.get().read(Long.valueOf(jobID));
090        JobStatus oldStatus = job.getStatus();
091
092        switch (newStatus) {
093        case STARTED:
094            if (oldStatus == JobStatus.NEW) {
095                log.warn("Received new status 'Started' for unsubmitted job {}", jobID);
096            }
097            if (oldStatus == JobStatus.SUBMITTED || oldStatus == JobStatus.NEW) {
098                log.info("Job #{} has been started by the harvester.", job.getJobID());
099
100                job.setStatus(newStatus);
101                jobDAOProvider.get().update(job);
102
103                notifyRunningJobMonitor(new CrawlProgressMessage(job.getOrigHarvestDefinitionID(), jobID));
104            } else {
105                log.warn("Received new status 'Started' for job {} with current status {}, ignoring.",
106                        job.getJobID(), oldStatus);
107            }
108            break;
109        case DONE:
110        case FAILED:
111            if (oldStatus == JobStatus.STARTED ||
112                    oldStatus == JobStatus.SUBMITTED ||
113                    oldStatus == JobStatus.RESUBMITTED ||
114                    oldStatus == JobStatus.NEW) {
115                if (oldStatus != JobStatus.STARTED) {
116                    log.warn("Received CrawlStatusMessage for job {} with new status {}, current state is {}",
117                            jobID, newStatus ,oldStatus);
118                }
119                if (newStatus == JobStatus.FAILED) {
120                    log.warn("Job {} failed: HarvestErrors = {}\n" + "HarvestErrorDetails = {}\n"
121                                    + "UploadErrors = {}\n" + "UploadErrorDetails = {}", jobID, cmsg.getHarvestErrors(),
122                            cmsg.getHarvestErrorDetails(), cmsg.getUploadErrors(), cmsg.getUploadErrorDetails());
123                } else {
124                    log.info("Job #{} succesfully completed", jobID);
125                }
126                job.setStatus(newStatus);
127                job.appendHarvestErrors(cmsg.getHarvestErrors());
128                job.appendHarvestErrorDetails(cmsg.getHarvestErrorDetails());
129                job.appendUploadErrors(cmsg.getUploadErrors());
130                job.appendUploadErrorDetails(cmsg.getUploadErrorDetails());
131            } else {
132                // Received done or failed on already dead job. Bad!
133                String message = "Received CrawlStatusMessage for job " + jobID + " with new status " + newStatus +
134                        ", current state is " + oldStatus+ ". Marking job as FAILED";
135                log.warn(message);
136                job.setStatus(JobStatus.FAILED);
137                job.appendHarvestErrors(cmsg.getHarvestErrors());
138                job.appendHarvestErrors(message);
139                job.appendHarvestErrorDetails(cmsg.getHarvestErrors());
140                job.appendHarvestErrorDetails(message);
141                log.warn("Job {} failed: {}", jobID, job.getHarvestErrorDetails());
142            }
143
144            jobDAOProvider.get().update(job);
145
146            if (cmsg.getDomainHarvestReport() != null) {
147                cmsg.getDomainHarvestReport().postProcess(job);
148            }
149
150            notifyRunningJobMonitor(new JobEndedMessage(job.getJobID(), newStatus));
151            break;
152        default:
153            log.warn("CrawlStatusMessage tried to update job status to unsupported status {} for job {}",
154                    newStatus, jobID);
155            break;
156        }
157    }
158
159    private void notifyRunningJobMonitor(NetarkivetMessage message) {
160        jmsConnectionProvider.get().send(message);
161    }
162
163    /**
164     * @param msg a given CrawlStatusMessage
165     * @see dk.netarkivet.harvester.distribute.HarvesterMessageHandler#visit(dk.netarkivet.harvester.harvesting.distribute.CrawlStatusMessage)
166     */
167    public void visit(CrawlStatusMessage msg) {
168        ArgumentNotValid.checkNotNull(msg, "msg");
169        processCrawlStatusMessage(msg);
170    }
171
172    /**
173     * Removes the HarvestSchedulerMonitorServer as listener to the JMS scheduler Channel.
174     */
175    @Override
176    public void shutdown() {
177        // FIXME See NAS-1976
178        jmsConnectionProvider.get().removeListener(HarvesterChannels.getTheSched(), this);
179    }
180
181    @Override
182    public void visit(IndexReadyMessage msg) {
183        ArgumentNotValid.checkNotNull(msg, "msg");
184        processIndexReadyMessage(msg);
185    }
186
187    private void processIndexReadyMessage(IndexReadyMessage msg) {
188        // Set isindexready to true if Indexisready is true
189        Long harvestId = msg.getHarvestId();
190        boolean indexisready = msg.getIndexOK();
191        if (harvestDefinitionDAOProvider.get().isSnapshot(harvestId)) {
192            harvestDefinitionDAOProvider.get().setIndexIsReady(harvestId, indexisready);
193            if (indexisready) {
194                log.info("Got message from the IndexServer, that the index is ready for harvest #{}", harvestId);
195            } else {
196                String errMsg = "Got message from IndexServer, that it failed to generate index for" + " harvest # "
197                        + harvestId + ". Deactivating harvest";
198                log.warn(errMsg);
199                HarvestDefinition hd = harvestDefinitionDAOProvider.get().read(harvestId);
200                hd.setActive(false);
201                StringBuilder commentsBuf = new StringBuilder(hd.getComments());
202                commentsBuf.append("\n" + (new Date())
203                        + ": Deactivated by the system because indexserver failed to generate index");
204                hd.setComments(commentsBuf.toString());
205                harvestDefinitionDAOProvider.get().update(hd);
206                notificationsProvider.get().notify(errMsg, NotificationType.ERROR);
207            }
208        } else {
209            log.debug("Ignoring IndexreadyMesssage sent on behalf on selective harvest w/id {}", harvestId);
210        }
211    }
212}