001/* 002 * #%L 003 * Netarchivesuite - harvester 004 * %% 005 * Copyright (C) 2005 - 2018 The Royal Danish Library, 006 * the National Library of France and the Austrian National Library. 007 * %% 008 * This program is free software: you can redistribute it and/or modify 009 * it under the terms of the GNU Lesser General Public License as 010 * published by the Free Software Foundation, either version 2.1 of the 011 * License, or (at your option) any later version. 012 * 013 * This program is distributed in the hope that it will be useful, 014 * but WITHOUT ANY WARRANTY; without even the implied warranty of 015 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 016 * GNU General Lesser Public License for more details. 017 * 018 * You should have received a copy of the GNU General Lesser Public 019 * License along with this program. If not, see 020 * <http://www.gnu.org/licenses/lgpl-2.1.html>. 021 * #L% 022 */ 023 024package dk.netarkivet.harvester.scheduler; 025 026import java.util.Collections; 027import java.util.Date; 028import java.util.HashMap; 029import java.util.HashSet; 030import java.util.Map; 031import java.util.Set; 032 033import org.slf4j.Logger; 034import org.slf4j.LoggerFactory; 035 036import dk.netarkivet.common.lifecycle.ComponentLifeCycle; 037import dk.netarkivet.common.lifecycle.PeriodicTaskExecutor; 038import dk.netarkivet.common.utils.NotificationType; 039import dk.netarkivet.common.utils.NotificationsFactory; 040import dk.netarkivet.common.utils.Settings; 041import dk.netarkivet.harvester.HarvesterSettings; 042import dk.netarkivet.harvester.datamodel.HarvestChannel; 043import dk.netarkivet.harvester.datamodel.HarvestChannelDAO; 044import dk.netarkivet.harvester.datamodel.HarvestDefinition; 045import dk.netarkivet.harvester.datamodel.HarvestDefinitionDAO; 046import dk.netarkivet.harvester.scheduler.jobgen.JobGenerator; 047import dk.netarkivet.harvester.scheduler.jobgen.JobGeneratorFactory; 048 049/** 050 * Handles the generation of new jobs based on the harvest definitions in persistent storage. The 051 * <code>HarvestJobGenerator</code> continuously scans the harvest definition database for harvest which should be run 052 * now. If a HD defines a harvest which should be run, a Harvest Job is created in the harvest job database. 053 */ 054public class HarvestJobGenerator implements ComponentLifeCycle { 055 056 /** The class logger. */ 057 private static final Logger log = LoggerFactory.getLogger(HarvestJobGenerator.class); 058 059 /** 060 * The set of HDs (or rather their OIDs) that are currently being scheduled in a separate thread. This set is a 061 * SynchronizedSet 062 */ 063 protected static Set<Long> harvestDefinitionsBeingScheduled = Collections.synchronizedSet(new HashSet<Long>()); 064 065 /** 066 * A map giving access to the thread generating jobs for a given harvest definition. 067 */ 068 protected static Map<Long, JobGeneratorTask.JobGeneratorThread> threadMap = Collections.synchronizedMap(new HashMap<>()); 069 070 /** 071 * Used the store the currenttimeMillis when the scheduling of a particular harvestdefinition # started or when last 072 * a warning was issued. 073 */ 074 protected static Map<Long, Long> schedulingStartedMap = Collections.synchronizedMap(new HashMap<Long, Long>()); 075 076 /** The executor used to schedule the generator jobs. */ 077 private PeriodicTaskExecutor genExec; 078 079 /** @see HarvesterSettings#JOBGEN_POSTPONE_UNREGISTERED_HARVEST_CHANNEL */ 080 private static final boolean postponeUnregisteredChannel = Settings 081 .getBoolean(HarvesterSettings.JOBGEN_POSTPONE_UNREGISTERED_HARVEST_CHANNEL); 082 083 /** The HarvestDefinitionDAO used by the HarvestJobGenerator. */ 084 private static final HarvestDefinitionDAO haDefinitionDAO = HarvestDefinitionDAO.getInstance(); 085 086 private final HarvestChannelRegistry harvestChannelRegistry; 087 088 public HarvestJobGenerator(final HarvestChannelRegistry harvestChannelRegistry) { 089 this.harvestChannelRegistry = harvestChannelRegistry; 090 } 091 092 /** 093 * Starts the job generation scheduler. 094 */ 095 @Override 096 public void start() { 097 int generateJobsPeriod = Settings.getInt(HarvesterSettings.GENERATE_JOBS_PERIOD); 098 genExec = new PeriodicTaskExecutor("JobGeneratorTask", new JobGeneratorTask(harvestChannelRegistry), 0, generateJobsPeriod); 099 log.info("JobGeneratorTask set to run every {} seconds", generateJobsPeriod); 100 } 101 102 @Override 103 public void shutdown() { 104 if (genExec != null) { 105 genExec.shutdown(); 106 } 107 } 108 109 /** 110 * Contains the functionality for the individual JobGenerations. 111 */ 112 static class JobGeneratorTask implements Runnable { 113 114 private final HarvestChannelRegistry harvestChannelRegistry; 115 116 public JobGeneratorTask(HarvestChannelRegistry harvestChannelRegistry) { 117 this.harvestChannelRegistry = harvestChannelRegistry; 118 } 119 120 @Override 121 public void run() { 122 try { 123 generateJobs(new Date()); 124 } catch (Exception e) { 125 log.info("Exception caught at fault barrier while generating jobs.", e); 126 } 127 } 128 129 /** 130 * Check if jobs should be generated for any ready harvest definitions for the specified time. 131 * 132 * @param timeToGenerateJobsFor Jobs will be generated which should be run at this time. Note: In a production 133 * system the provided time will normally be current time, but during testing we need to simulated other 134 * points-in-time 135 */ 136 void generateJobs(Date timeToGenerateJobsFor) { 137 final Iterable<Long> readyHarvestDefinitions = haDefinitionDAO 138 .getReadyHarvestDefinitions(timeToGenerateJobsFor); 139 log.trace("Generating jobs for harvests that should run at time '{}'", timeToGenerateJobsFor); 140 HarvestChannelDAO hChanDao = HarvestChannelDAO.getInstance(); 141 142 for (final Long id : readyHarvestDefinitions) { 143 // Make every HD run in its own thread, but at most once. 144 synchronized(harvestDefinitionsBeingScheduled) { 145 if (harvestDefinitionsBeingScheduled.contains(id)) { 146 if (takesSuspiciouslyLongToSchedule(id)) { 147 String harvestName = haDefinitionDAO.getHarvestName(id); 148 String errMsg = "Possible problem creating jobs for harvestdefinition #" + id + " (" + harvestName + ")" 149 + " as the previous scheduling is still running. Trying to recover."; 150 if (haDefinitionDAO.isSnapshot(id)) { 151 // Log only at level debug if the ID represents 152 // is a snapshot harvestdefinition, which are only run 153 // once anyway 154 log.debug(errMsg); 155 continue; 156 } else { // Log at level WARN, and send a notification, if it is time 157 log.warn(errMsg); 158 threadMap.get(id).killScheduling(); 159 NotificationsFactory.getInstance().notify(errMsg, NotificationType.WARNING); 160 } 161 } else { 162 log.debug("We'll skip HD #{}. Jobgeneration of it has been running since {}", id, 163 new Date(schedulingStartedMap.get(id)) ); 164 continue; 165 } 166 } else { 167 harvestDefinitionsBeingScheduled.add(id); // mark the harvest as being scheduled right now 168 } 169 } 170 171 final HarvestDefinition harvestDefinition = haDefinitionDAO.read(id); 172 173 if (!harvestDefinition.isSnapShot()) { 174 Long chanId = harvestDefinition.getChannelId(); 175 176 HarvestChannel chan = (chanId == null ? hChanDao.getDefaultChannel(false) : hChanDao 177 .getById(chanId)); 178 179 String channelName = chan.getName(); 180 if (postponeUnregisteredChannel && !harvestChannelRegistry.isRegistered(channelName)) { 181 log.info("Harvest channel '{}' has not yet been registered by any harvester, hence harvest " 182 + "definition '{}' ({}) cannot be processed by the job generator for now.", 183 channelName, harvestDefinition.getName(), id); 184 harvestDefinitionsBeingScheduled.remove(id); 185 continue; 186 } 187 } 188 189 schedulingStartedMap.put(id, System.currentTimeMillis()); 190 191 if (!harvestDefinition.runNow(timeToGenerateJobsFor)) { 192 log.trace("The harvestdefinition #{}'{}' should not run now.", id, harvestDefinition.getName()); 193 log.trace("numEvents: {}", harvestDefinition.getNumEvents()); 194 continue; 195 } 196 197 log.info("Starting to create jobs for harvest definition #{}({})", id, harvestDefinition.getName()); 198 199 final JobGeneratorThread jobGeneratorThread = new JobGeneratorThread(id, harvestDefinition); 200 threadMap.put(id, jobGeneratorThread); 201 jobGeneratorThread.start(); 202 } 203 } 204 205 /** 206 * Find out if a scheduling takes more than is acceptable currently 5 minutes. 207 * 208 * @param harvestId A given harvestId 209 * @return true, if a scheduling of the given harvestId has taken more than 5 minutes, or false, if not or no 210 * scheduling for this harvestId is underway 211 */ 212 private static boolean takesSuspiciouslyLongToSchedule(Long harvestId) { 213 // acceptable delay before issuing warning is currently hard-wired to 214 // 5 minutes (5 * 60 * 1000 milliseconds) 215 final long acceptableDelay = 5 * 60 * 1000; 216 Long timewhenscheduled = schedulingStartedMap.get(harvestId); 217 if (timewhenscheduled == null) { 218 return false; 219 } else { 220 long now = System.currentTimeMillis(); 221 if (timewhenscheduled + acceptableDelay < now) { 222 // updates the schedulingStartedMap with currenttime for 223 // the given harvestID when returning true 224 schedulingStartedMap.put(harvestId, now); 225 return true; 226 } else { 227 return false; 228 } 229 } 230 } 231 232 private static class JobGeneratorThread extends Thread { 233 private final Long id; 234 private final HarvestDefinition harvestDefinition; 235 236 public JobGeneratorThread(Long id, HarvestDefinition harvestDefinition) { 237 super("JobGeneratorTask-" + id); 238 this.id = id; 239 this.harvestDefinition = harvestDefinition; 240 } 241 242 public void run() { 243 try { 244 JobGenerator jobGen = JobGeneratorFactory.getInstance(); 245 int jobsMade = jobGen.generateJobs(harvestDefinition); 246 if (jobsMade > 0) { 247 log.info("Created {} jobs for harvest definition ({})", jobsMade, 248 harvestDefinition.getName()); 249 } else { 250 String msg = "No jobs created for harvest definition '" 251 + harvestDefinition.getName() 252 + "'. Probable cause: harvest tries to continue harvest that is already finished "; 253 log.warn(msg); 254 NotificationsFactory.getInstance().notify(msg, NotificationType.WARNING); 255 } 256 haDefinitionDAO.update(harvestDefinition); 257 } catch (Throwable e) { 258 try { 259 HarvestDefinition hd = haDefinitionDAO.read(harvestDefinition.getOid()); 260 hd.setActive(false); 261 haDefinitionDAO.update(hd); 262 String errMsg = "Exception while scheduling harvestdefinition #" + id + "(" 263 + harvestDefinition.getName() + "). The harvestdefinition has been " 264 + "deactivated!"; 265 log.warn(errMsg, e); 266 NotificationsFactory.getInstance().notify(errMsg, NotificationType.ERROR, e); 267 } catch (Exception e1) { 268 String errMsg = "Exception while scheduling harvestdefinition #" + id + "(" 269 + harvestDefinition.getName() + "). The harvestdefinition couldn't be " 270 + "deactivated!"; 271 log.warn(errMsg, e); 272 log.warn("Unable to deactivate", e1); 273 NotificationsFactory.getInstance().notify(errMsg, NotificationType.ERROR, e); 274 } 275 } finally { 276 killScheduling(); 277 } 278 } 279 280 public void killScheduling() { 281 harvestDefinitionsBeingScheduled.remove(id); 282 schedulingStartedMap.remove(id); 283 threadMap.remove(id); 284 log.debug("Removed HD #{}({}) from list of harvestdefinitions to be scheduled. " 285 + "Harvestdefinitions still to be scheduled: {}", id, harvestDefinition.getName(), 286 harvestDefinitionsBeingScheduled); 287 } 288 } 289 } 290 291}