001/* 002 * #%L 003 * Netarchivesuite - harvester 004 * %% 005 * Copyright (C) 2005 - 2014 The Royal Danish Library, the Danish State and University Library, 006 * the National Library of France and the Austrian National Library. 007 * %% 008 * This program is free software: you can redistribute it and/or modify 009 * it under the terms of the GNU Lesser General Public License as 010 * published by the Free Software Foundation, either version 2.1 of the 011 * License, or (at your option) any later version. 012 * 013 * This program is distributed in the hope that it will be useful, 014 * but WITHOUT ANY WARRANTY; without even the implied warranty of 015 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 016 * GNU General Lesser Public License for more details. 017 * 018 * You should have received a copy of the GNU General Lesser Public 019 * License along with this program. If not, see 020 * <http://www.gnu.org/licenses/lgpl-2.1.html>. 021 * #L% 022 */ 023 024package dk.netarkivet.harvester.scheduler; 025 026import java.util.Collections; 027import java.util.Date; 028import java.util.HashMap; 029import java.util.HashSet; 030import java.util.Map; 031import java.util.Set; 032 033import org.slf4j.Logger; 034import org.slf4j.LoggerFactory; 035 036import dk.netarkivet.common.lifecycle.ComponentLifeCycle; 037import dk.netarkivet.common.lifecycle.PeriodicTaskExecutor; 038import dk.netarkivet.common.utils.NotificationType; 039import dk.netarkivet.common.utils.NotificationsFactory; 040import dk.netarkivet.common.utils.Settings; 041import dk.netarkivet.harvester.HarvesterSettings; 042import dk.netarkivet.harvester.datamodel.HarvestChannel; 043import dk.netarkivet.harvester.datamodel.HarvestChannelDAO; 044import dk.netarkivet.harvester.datamodel.HarvestDefinition; 045import dk.netarkivet.harvester.datamodel.HarvestDefinitionDAO; 046import dk.netarkivet.harvester.scheduler.jobgen.JobGenerator; 047import dk.netarkivet.harvester.scheduler.jobgen.JobGeneratorFactory; 048 049/** 050 * Handles the generation of new jobs based on the harvest definitions in persistent storage. The 051 * <code>HarvestJobGenerator</code> continuously scans the harvest definition database for harvest which should be run 052 * now. If a HD defines a harvest which should be run, a Harvest Job is created in the harvest job database. 053 */ 054public class HarvestJobGenerator implements ComponentLifeCycle { 055 056 /** The class logger. */ 057 private static final Logger log = LoggerFactory.getLogger(HarvestJobGenerator.class); 058 059 /** 060 * The set of HDs (or rather their OIDs) that are currently being scheduled in a separate thread. This set is a 061 * SynchronizedSet 062 */ 063 protected static Set<Long> harvestDefinitionsBeingScheduled = Collections.synchronizedSet(new HashSet<Long>()); 064 065 /** 066 * Used the store the currenttimeMillis when the scheduling of a particular harvestdefinition # started or when last 067 * a warning was issued. 068 */ 069 protected static Map<Long, Long> schedulingStartedMap = Collections.synchronizedMap(new HashMap<Long, Long>()); 070 071 /** The executor used to schedule the generator jobs. */ 072 private PeriodicTaskExecutor genExec; 073 074 /** @see HarvesterSettings#JOBGEN_POSTPONE_UNREGISTERED_HARVEST_CHANNEL */ 075 private static final boolean postponeUnregisteredChannel = Settings 076 .getBoolean(HarvesterSettings.JOBGEN_POSTPONE_UNREGISTERED_HARVEST_CHANNEL); 077 078 /** The HarvestDefinitionDAO used by the HarvestJobGenerator. */ 079 private static final HarvestDefinitionDAO haDefinitionDAO = HarvestDefinitionDAO.getInstance(); 080 081 private final HarvestChannelRegistry harvestChannelRegistry; 082 083 public HarvestJobGenerator(final HarvestChannelRegistry harvestChannelRegistry) { 084 this.harvestChannelRegistry = harvestChannelRegistry; 085 } 086 087 /** 088 * Starts the job generation scheduler. 089 */ 090 @Override 091 public void start() { 092 genExec = new PeriodicTaskExecutor("JobGeneratorTask", new JobGeneratorTask(harvestChannelRegistry), 0, 093 Settings.getInt(HarvesterSettings.GENERATE_JOBS_PERIOD)); 094 } 095 096 @Override 097 public void shutdown() { 098 if (genExec != null) { 099 genExec.shutdown(); 100 } 101 } 102 103 /** 104 * Contains the functionality for the individual JobGenerations. 105 */ 106 static class JobGeneratorTask implements Runnable { 107 108 private final HarvestChannelRegistry harvestChannelRegistry; 109 110 public JobGeneratorTask(HarvestChannelRegistry harvestChannelRegistry) { 111 this.harvestChannelRegistry = harvestChannelRegistry; 112 } 113 114 @Override 115 public synchronized void run() { 116 try { 117 generateJobs(new Date()); 118 } catch (Exception e) { 119 log.info("Exception caught at fault barrier while generating jobs.", e); 120 } 121 } 122 123 /** 124 * Check if jobs should be generated for any ready harvest definitions for the specified time. 125 * 126 * @param timeToGenerateJobsFor Jobs will be generated which should be run at this time. Note: In a production 127 * system the provided time will normally be current time, but during testing we need to simulated other 128 * points-in-time 129 */ 130 void generateJobs(Date timeToGenerateJobsFor) { 131 final Iterable<Long> readyHarvestDefinitions = haDefinitionDAO 132 .getReadyHarvestDefinitions(timeToGenerateJobsFor); 133 134 HarvestChannelDAO hChanDao = HarvestChannelDAO.getInstance(); 135 136 for (final Long id : readyHarvestDefinitions) { 137 // Make every HD run in its own thread, but at most once. 138 if (harvestDefinitionsBeingScheduled.contains(id)) { 139 if (takesSuspiciouslyLongToSchedule(id)) { 140 String harvestName = haDefinitionDAO.getHarvestName(id); 141 String errMsg = "Not creating jobs for harvestdefinition #" + id + " (" + harvestName + ")" 142 + " as the previous scheduling is still running"; 143 if (haDefinitionDAO.isSnapshot(id)) { 144 // Log only at level debug if the ID represents 145 // is a snapshot harvestdefinition, which are only run 146 // once anyway 147 log.debug(errMsg); 148 } else { // Log at level WARN, and send a notification, if it is time 149 log.warn(errMsg); 150 NotificationsFactory.getInstance().notify(errMsg, NotificationType.WARNING); 151 } 152 } 153 continue; 154 } 155 156 final HarvestDefinition harvestDefinition = haDefinitionDAO.read(id); 157 158 if (!harvestDefinition.isSnapShot()) { 159 Long chanId = harvestDefinition.getChannelId(); 160 161 HarvestChannel chan = (chanId == null ? hChanDao.getDefaultChannel(false) : hChanDao 162 .getById(chanId)); 163 164 String channelName = chan.getName(); 165 if (postponeUnregisteredChannel && !harvestChannelRegistry.isRegistered(channelName)) { 166 log.info("Harvest channel '{}' has not yet been registered by any harvester, hence harvest " 167 + "definition '{}' ({}) cannot be processed by the job generator for now.", 168 channelName, harvestDefinition.getName(), id); 169 continue; 170 } 171 } 172 173 harvestDefinitionsBeingScheduled.add(id); 174 schedulingStartedMap.put(id, System.currentTimeMillis()); 175 176 if (!harvestDefinition.runNow(timeToGenerateJobsFor)) { 177 log.trace("The harvestdefinition #{}'{}' should not run now.", id, harvestDefinition.getName()); 178 log.trace("numEvents: {}", harvestDefinition.getNumEvents()); 179 continue; 180 } 181 182 log.info("Starting to create jobs for harvest definition #{}({})", id, harvestDefinition.getName()); 183 184 new Thread("JobGeneratorTask-" + id) { 185 public void run() { 186 try { 187 JobGenerator jobGen = JobGeneratorFactory.getInstance(); 188 int jobsMade = jobGen.generateJobs(harvestDefinition); 189 if (jobsMade > 0) { 190 log.info("Created {} jobs for harvest definition ({})", jobsMade, 191 harvestDefinition.getName()); 192 } else { 193 String msg = "No jobs created for harvest definition '" 194 + harvestDefinition.getName() 195 + "'. Probable cause: harvest tries to continue harvest that is already finished "; 196 log.warn(msg); 197 NotificationsFactory.getInstance().notify(msg, NotificationType.WARNING); 198 } 199 haDefinitionDAO.update(harvestDefinition); 200 } catch (Throwable e) { 201 try { 202 HarvestDefinition hd = haDefinitionDAO.read(harvestDefinition.getOid()); 203 hd.setActive(false); 204 haDefinitionDAO.update(hd); 205 String errMsg = "Exception while scheduling harvestdefinition #" + id + "(" 206 + harvestDefinition.getName() + "). The harvestdefinition has been " 207 + "deactivated!"; 208 log.warn(errMsg, e); 209 NotificationsFactory.getInstance().notify(errMsg, NotificationType.ERROR, e); 210 } catch (Exception e1) { 211 String errMsg = "Exception while scheduling harvestdefinition #" + id + "(" 212 + harvestDefinition.getName() + "). The harvestdefinition couldn't be " 213 + "deactivated!"; 214 log.warn(errMsg, e); 215 log.warn("Unable to deactivate", e1); 216 NotificationsFactory.getInstance().notify(errMsg, NotificationType.ERROR, e); 217 } 218 } finally { 219 harvestDefinitionsBeingScheduled.remove(id); 220 schedulingStartedMap.remove(id); 221 log.debug("Removed HD #{}({}) from list of harvestdefinitions to be scheduled. " 222 + "Harvestdefinitions still to be scheduled: {}", id, harvestDefinition.getName(), 223 harvestDefinitionsBeingScheduled); 224 } 225 } 226 }.start(); 227 } 228 } 229 230 /** 231 * Find out if a scheduling takes more than is acceptable currently 5 minutes. 232 * 233 * @param harvestId A given harvestId 234 * @return true, if a scheduling of the given harvestId has taken more than 5 minutes, or false, if not or no 235 * scheduling for this harvestId is underway 236 */ 237 private static boolean takesSuspiciouslyLongToSchedule(Long harvestId) { 238 // acceptable delay before issuing warning is currently hard-wired to 239 // 5 minutes (5 * 60 * 1000 milliseconds) 240 final long acceptableDelay = 5 * 60 * 1000; 241 Long timewhenscheduled = schedulingStartedMap.get(harvestId); 242 if (timewhenscheduled == null) { 243 return false; 244 } else { 245 long now = System.currentTimeMillis(); 246 if (timewhenscheduled + acceptableDelay < now) { 247 // updates the schedulingStartedMap with currenttime for 248 // the given harvestID when returning true 249 schedulingStartedMap.put(harvestId, now); 250 return true; 251 } else { 252 return false; 253 } 254 } 255 } 256 } 257 258}