001/* 002 * #%L 003 * Netarchivesuite - harvester 004 * %% 005 * Copyright (C) 2005 - 2014 The Royal Danish Library, the Danish State and University Library, 006 * the National Library of France and the Austrian National Library. 007 * %% 008 * This program is free software: you can redistribute it and/or modify 009 * it under the terms of the GNU Lesser General Public License as 010 * published by the Free Software Foundation, either version 2.1 of the 011 * License, or (at your option) any later version. 012 * 013 * This program is distributed in the hope that it will be useful, 014 * but WITHOUT ANY WARRANTY; without even the implied warranty of 015 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 016 * GNU General Lesser Public License for more details. 017 * 018 * You should have received a copy of the GNU General Lesser Public 019 * License along with this program. If not, see 020 * <http://www.gnu.org/licenses/lgpl-2.1.html>. 021 * #L% 022 */ 023package dk.netarkivet.harvester.scheduler; 024 025import java.util.Date; 026import java.util.Iterator; 027import java.util.Timer; 028import java.util.TimerTask; 029 030import javax.inject.Provider; 031 032import org.slf4j.Logger; 033import org.slf4j.LoggerFactory; 034 035import dk.netarkivet.common.lifecycle.ComponentLifeCycle; 036import dk.netarkivet.common.utils.Settings; 037import dk.netarkivet.common.utils.TimeUtils; 038import dk.netarkivet.harvester.HarvesterSettings; 039import dk.netarkivet.harvester.datamodel.Job; 040import dk.netarkivet.harvester.datamodel.JobDAO; 041import dk.netarkivet.harvester.datamodel.JobStatus; 042 043/** 044 * Responsible for cleaning obsolete jobs, see {@link #start()} for details. 045 */ 046public class JobSupervisor implements ComponentLifeCycle { 047 048 /** The logger to use. */ 049 private static final Logger log = LoggerFactory.getLogger(JobSupervisor.class); 050 051 /** For scheduling tasks */ 052 private final Timer timer = new Timer(); 053 054 private final Provider<JobDAO> jobDaoProvider; 055 private final Long jobTimeoutTime; 056 057 /** 058 * @param jobDaoProvider Used for accessing the jobdao. 059 * @param jobTimeoutTime timeout in seconds. 060 */ 061 public JobSupervisor(Provider<JobDAO> jobDaoProvider, Long jobTimeoutTime) { 062 this.jobDaoProvider = jobDaoProvider; 063 this.jobTimeoutTime = jobTimeoutTime; 064 } 065 066 /** 067 * <ol> 068 * <li>Starts the rescheduling of left over jobs (in a separate thread). 069 * <li>Starts the timer for cleaning old jobs. eg. jobs that have been run longer than 070 * {@link HarvesterSettings#JOB_TIMEOUT_TIME}. 071 * </ol> 072 */ 073 @Override 074 public void start() { 075 Thread thread = new Thread(new Runnable() { 076 public void run() { 077 rescheduleLeftOverJobs(); 078 } 079 }); 080 thread.start(); 081 082 timer.schedule(new TimerTask() { 083 public void run() { 084 cleanOldJobs(); 085 } 086 }, Settings.getInt(HarvesterSettings.JOB_TIMEOUT_TIME)); 087 } 088 089 @Override 090 public void shutdown() { 091 timer.cancel(); 092 } 093 094 /** 095 * Reschedule all jobs with JobStatus SUBMITTED. Runs in a separate thread to avoid blocking. 096 * <p> 097 * Package protected to allow unit testing. 098 */ 099 void rescheduleLeftOverJobs() { 100 final Iterator<Long> jobs = jobDaoProvider.get().getAllJobIds(JobStatus.SUBMITTED); 101 int resubmitcount = 0; 102 while (jobs.hasNext()) { 103 long oldID = jobs.next(); 104 long newID = jobDaoProvider.get().rescheduleJob(oldID); 105 log.info("Resubmitting old job {} as {}", oldID, newID); 106 ++resubmitcount; 107 } 108 log.info("{} jobs has been resubmitted.", resubmitcount); 109 } 110 111 /** 112 * Stops any job that has been in status STARTED a very long time defined by the 113 * {@link HarvesterSettings#JOB_TIMEOUT_TIME} setting. 114 * <p> 115 * Package protected to allow unit testing. 116 */ 117 void cleanOldJobs() { 118 try { 119 final Iterator<Long> startedJobs = jobDaoProvider.get().getAllJobIds(JobStatus.STARTED); 120 int stoppedJobs = 0; 121 while (startedJobs.hasNext()) { 122 long id = startedJobs.next(); 123 Job job = jobDaoProvider.get().read(id); 124 125 long timeDiff = jobTimeoutTime * TimeUtils.SECOND_IN_MILLIS; 126 Date endTime = new Date(); 127 endTime.setTime(job.getActualStart().getTime() + timeDiff); 128 if (new Date().after(endTime)) { 129 final String msg = " Job " + id + " has exceeded its timeout of " 130 + (jobTimeoutTime / TimeUtils.HOUR_IN_MINUTES) + " minutes." + " Changing status to " 131 + "FAILED."; 132 log.warn(msg); 133 job.setStatus(JobStatus.FAILED); 134 job.appendHarvestErrors(msg); 135 jobDaoProvider.get().update(job); 136 ++stoppedJobs; 137 } 138 } 139 if (stoppedJobs > 0) { 140 log.warn("Changed {} jobs from STARTED to FAILED", stoppedJobs); 141 } 142 } catch (Throwable t) { 143 log.error("Unable to stop obsolete jobs", t); 144 } 145 } 146 147}