001/*
002 * #%L
003 * Netarchivesuite - harvester
004 * %%
005 * Copyright (C) 2005 - 2014 The Royal Danish Library, the Danish State and University Library,
006 *             the National Library of France and the Austrian National Library.
007 * %%
008 * This program is free software: you can redistribute it and/or modify
009 * it under the terms of the GNU Lesser General Public License as
010 * published by the Free Software Foundation, either version 2.1 of the
011 * License, or (at your option) any later version.
012 * 
013 * This program is distributed in the hope that it will be useful,
014 * but WITHOUT ANY WARRANTY; without even the implied warranty of
015 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
016 * GNU General Lesser Public License for more details.
017 * 
018 * You should have received a copy of the GNU General Lesser Public
019 * License along with this program.  If not, see
020 * <http://www.gnu.org/licenses/lgpl-2.1.html>.
021 * #L%
022 */
023package dk.netarkivet.harvester.scheduler;
024
025import java.util.Date;
026import java.util.Iterator;
027import java.util.Timer;
028import java.util.TimerTask;
029
030import javax.inject.Provider;
031
032import org.slf4j.Logger;
033import org.slf4j.LoggerFactory;
034
035import dk.netarkivet.common.lifecycle.ComponentLifeCycle;
036import dk.netarkivet.common.utils.Settings;
037import dk.netarkivet.common.utils.TimeUtils;
038import dk.netarkivet.harvester.HarvesterSettings;
039import dk.netarkivet.harvester.datamodel.Job;
040import dk.netarkivet.harvester.datamodel.JobDAO;
041import dk.netarkivet.harvester.datamodel.JobStatus;
042
043/**
044 * Responsible for cleaning obsolete jobs, see {@link #start()} for details.
045 */
046public class JobSupervisor implements ComponentLifeCycle {
047
048    /** The logger to use. */
049    private static final Logger log = LoggerFactory.getLogger(JobSupervisor.class);
050
051    /** For scheduling tasks */
052    private final Timer timer = new Timer();
053
054    private final Provider<JobDAO> jobDaoProvider;
055    private final Long jobTimeoutTime;
056
057    /**
058     * @param jobDaoProvider Used for accessing the jobdao.
059     * @param jobTimeoutTime timeout in seconds.
060     */
061    public JobSupervisor(Provider<JobDAO> jobDaoProvider, Long jobTimeoutTime) {
062        this.jobDaoProvider = jobDaoProvider;
063        this.jobTimeoutTime = jobTimeoutTime;
064    }
065
066    /**
067     * <ol>
068     * <li>Starts the rescheduling of left over jobs (in a separate thread).
069     * <li>Starts the timer for cleaning old jobs. eg. jobs that have been run longer than
070     * {@link HarvesterSettings#JOB_TIMEOUT_TIME}.
071     * </ol>
072     */
073    @Override
074    public void start() {
075        Thread thread = new Thread(new Runnable() {
076            public void run() {
077                rescheduleLeftOverJobs();
078            }
079        });
080        thread.start();
081
082        timer.schedule(new TimerTask() {
083            public void run() {
084                cleanOldJobs();
085            }
086        }, Settings.getInt(HarvesterSettings.JOB_TIMEOUT_TIME));
087    }
088
089    @Override
090    public void shutdown() {
091        timer.cancel();
092    }
093
094    /**
095     * Reschedule all jobs with JobStatus SUBMITTED. Runs in a separate thread to avoid blocking.
096     * <p>
097     * Package protected to allow unit testing.
098     */
099    void rescheduleLeftOverJobs() {
100        final Iterator<Long> jobs = jobDaoProvider.get().getAllJobIds(JobStatus.SUBMITTED);
101        int resubmitcount = 0;
102        while (jobs.hasNext()) {
103            long oldID = jobs.next();
104            long newID = jobDaoProvider.get().rescheduleJob(oldID);
105            log.info("Resubmitting old job {} as {}", oldID, newID);
106            ++resubmitcount;
107        }
108        log.info("{} jobs has been resubmitted.", resubmitcount);
109    }
110
111    /**
112     * Stops any job that has been in status STARTED a very long time defined by the
113     * {@link HarvesterSettings#JOB_TIMEOUT_TIME} setting.
114     * <p>
115     * Package protected to allow unit testing.
116     */
117    void cleanOldJobs() {
118        try {
119            final Iterator<Long> startedJobs = jobDaoProvider.get().getAllJobIds(JobStatus.STARTED);
120            int stoppedJobs = 0;
121            while (startedJobs.hasNext()) {
122                long id = startedJobs.next();
123                Job job = jobDaoProvider.get().read(id);
124
125                long timeDiff = jobTimeoutTime * TimeUtils.SECOND_IN_MILLIS;
126                Date endTime = new Date();
127                endTime.setTime(job.getActualStart().getTime() + timeDiff);
128                if (new Date().after(endTime)) {
129                    final String msg = " Job " + id + " has exceeded its timeout of "
130                            + (jobTimeoutTime / TimeUtils.HOUR_IN_MINUTES) + " minutes." + " Changing status to "
131                            + "FAILED.";
132                    log.warn(msg);
133                    job.setStatus(JobStatus.FAILED);
134                    job.appendHarvestErrors(msg);
135                    jobDaoProvider.get().update(job);
136                    ++stoppedJobs;
137                }
138            }
139            if (stoppedJobs > 0) {
140                log.warn("Changed {} jobs from STARTED to FAILED", stoppedJobs);
141            }
142        } catch (Throwable t) {
143            log.error("Unable to stop obsolete jobs", t);
144        }
145    }
146
147}