001/*
002 * #%L
003 * Netarchivesuite - harvester
004 * %%
005 * Copyright (C) 2005 - 2018 The Royal Danish Library, 
006 *             the National Library of France and the Austrian National Library.
007 * %%
008 * This program is free software: you can redistribute it and/or modify
009 * it under the terms of the GNU Lesser General Public License as
010 * published by the Free Software Foundation, either version 2.1 of the
011 * License, or (at your option) any later version.
012 * 
013 * This program is distributed in the hope that it will be useful,
014 * but WITHOUT ANY WARRANTY; without even the implied warranty of
015 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
016 * GNU General Lesser Public License for more details.
017 * 
018 * You should have received a copy of the GNU General Lesser Public
019 * License along with this program.  If not, see
020 * <http://www.gnu.org/licenses/lgpl-2.1.html>.
021 * #L%
022 */
023
024package dk.netarkivet.harvester.scheduler;
025
026import java.util.Collections;
027import java.util.Date;
028import java.util.HashMap;
029import java.util.HashSet;
030import java.util.Map;
031import java.util.Set;
032
033import org.slf4j.Logger;
034import org.slf4j.LoggerFactory;
035
036import dk.netarkivet.common.lifecycle.ComponentLifeCycle;
037import dk.netarkivet.common.lifecycle.PeriodicTaskExecutor;
038import dk.netarkivet.common.utils.NotificationType;
039import dk.netarkivet.common.utils.NotificationsFactory;
040import dk.netarkivet.common.utils.Settings;
041import dk.netarkivet.harvester.HarvesterSettings;
042import dk.netarkivet.harvester.datamodel.HarvestChannel;
043import dk.netarkivet.harvester.datamodel.HarvestChannelDAO;
044import dk.netarkivet.harvester.datamodel.HarvestDefinition;
045import dk.netarkivet.harvester.datamodel.HarvestDefinitionDAO;
046import dk.netarkivet.harvester.scheduler.jobgen.JobGenerator;
047import dk.netarkivet.harvester.scheduler.jobgen.JobGeneratorFactory;
048
049/**
050 * Handles the generation of new jobs based on the harvest definitions in persistent storage. The
051 * <code>HarvestJobGenerator</code> continuously scans the harvest definition database for harvest which should be run
052 * now. If a HD defines a harvest which should be run, a Harvest Job is created in the harvest job database.
053 */
054public class HarvestJobGenerator implements ComponentLifeCycle {
055
056    /** The class logger. */
057    private static final Logger log = LoggerFactory.getLogger(HarvestJobGenerator.class);
058
059    /**
060     * The set of HDs (or rather their OIDs) that are currently being scheduled in a separate thread. This set is a
061     * SynchronizedSet
062     */
063    protected static Set<Long> harvestDefinitionsBeingScheduled = Collections.synchronizedSet(new HashSet<Long>());
064
065    /**
066     * A map giving access to the thread generating jobs for a given harvest definition.
067     */
068    protected static Map<Long, JobGeneratorTask.JobGeneratorThread> threadMap = Collections.synchronizedMap(new HashMap<>());
069
070    /**
071     * Used the store the currenttimeMillis when the scheduling of a particular harvestdefinition # started or when last
072     * a warning was issued.
073     */
074    protected static Map<Long, Long> schedulingStartedMap = Collections.synchronizedMap(new HashMap<Long, Long>());
075
076    /** The executor used to schedule the generator jobs. */
077    private PeriodicTaskExecutor genExec;
078
079    /** @see HarvesterSettings#JOBGEN_POSTPONE_UNREGISTERED_HARVEST_CHANNEL */
080    private static final boolean postponeUnregisteredChannel = Settings
081            .getBoolean(HarvesterSettings.JOBGEN_POSTPONE_UNREGISTERED_HARVEST_CHANNEL);
082
083    /** The HarvestDefinitionDAO used by the HarvestJobGenerator. */
084    private static final HarvestDefinitionDAO haDefinitionDAO = HarvestDefinitionDAO.getInstance();
085
086    private final HarvestChannelRegistry harvestChannelRegistry;
087
088    public HarvestJobGenerator(final HarvestChannelRegistry harvestChannelRegistry) {
089        this.harvestChannelRegistry = harvestChannelRegistry;
090    }
091
092    /**
093     * Starts the job generation scheduler.
094     */
095    @Override
096    public void start() {
097        int generateJobsPeriod = Settings.getInt(HarvesterSettings.GENERATE_JOBS_PERIOD);
098        genExec = new PeriodicTaskExecutor("JobGeneratorTask", new JobGeneratorTask(harvestChannelRegistry), 0, generateJobsPeriod);
099        log.info("JobGeneratorTask set to run every {} seconds", generateJobsPeriod);
100    }
101
102    @Override
103    public void shutdown() {
104        if (genExec != null) {
105            genExec.shutdown();
106        }
107    }
108
109    /**
110     * Contains the functionality for the individual JobGenerations.
111     */
112    static class JobGeneratorTask implements Runnable {
113
114        private final HarvestChannelRegistry harvestChannelRegistry;
115        
116        public JobGeneratorTask(HarvestChannelRegistry harvestChannelRegistry) {
117            this.harvestChannelRegistry = harvestChannelRegistry;
118        }
119
120        @Override
121        public void run() {
122            try {
123                generateJobs(new Date());
124            } catch (Exception e) {
125                log.info("Exception caught at fault barrier while generating jobs.", e);
126            }
127        }
128
129        /**
130         * Check if jobs should be generated for any ready harvest definitions for the specified time.
131         *
132         * @param timeToGenerateJobsFor Jobs will be generated which should be run at this time. Note: In a production
133         * system the provided time will normally be current time, but during testing we need to simulated other
134         * points-in-time
135         */
136        void generateJobs(Date timeToGenerateJobsFor) {
137            final Iterable<Long> readyHarvestDefinitions = haDefinitionDAO
138                    .getReadyHarvestDefinitions(timeToGenerateJobsFor);
139            log.trace("Generating jobs for harvests that should run at time '{}'", timeToGenerateJobsFor);
140            HarvestChannelDAO hChanDao = HarvestChannelDAO.getInstance();
141            
142            for (final Long id : readyHarvestDefinitions) {
143                // Make every HD run in its own thread, but at most once.
144                synchronized(harvestDefinitionsBeingScheduled) {
145                    if (harvestDefinitionsBeingScheduled.contains(id)) {
146                        if (takesSuspiciouslyLongToSchedule(id)) {
147                            String harvestName = haDefinitionDAO.getHarvestName(id);
148                            String errMsg = "Possible problem creating jobs for harvestdefinition #" + id + " (" + harvestName + ")"
149                                    + " as the previous scheduling is still running. Trying to recover.";
150                            if (haDefinitionDAO.isSnapshot(id)) {
151                                // Log only at level debug if the ID represents
152                                // is a snapshot harvestdefinition, which are only run
153                                // once anyway
154                                log.debug(errMsg);
155                                continue;
156                            } else { // Log at level WARN, and send a notification, if it is time
157                                log.warn(errMsg);
158                                threadMap.get(id).killScheduling();
159                                NotificationsFactory.getInstance().notify(errMsg, NotificationType.WARNING);
160                            }
161                        } else {
162                            log.debug("We'll skip HD #{}. Jobgeneration of it has been running since {}", id, 
163                                    new Date(schedulingStartedMap.get(id)) );
164                            continue;
165                        }
166                    } else {
167                        harvestDefinitionsBeingScheduled.add(id); // mark the harvest as being scheduled right now
168                    }
169                }
170
171                final HarvestDefinition harvestDefinition = haDefinitionDAO.read(id);
172
173                if (!harvestDefinition.isSnapShot()) {
174                    Long chanId = harvestDefinition.getChannelId();
175
176                    HarvestChannel chan = (chanId == null ? hChanDao.getDefaultChannel(false) : hChanDao
177                            .getById(chanId));
178
179                    String channelName = chan.getName();
180                    if (postponeUnregisteredChannel && !harvestChannelRegistry.isRegistered(channelName)) {
181                        log.info("Harvest channel '{}' has not yet been registered by any harvester, hence harvest "
182                                + "definition '{}' ({}) cannot be processed by the job generator for now.",
183                                channelName, harvestDefinition.getName(), id);
184                        harvestDefinitionsBeingScheduled.remove(id);
185                        continue;
186                    }
187                }
188                
189                schedulingStartedMap.put(id, System.currentTimeMillis());
190
191                if (!harvestDefinition.runNow(timeToGenerateJobsFor)) {
192                    log.trace("The harvestdefinition #{}'{}' should not run now.", id, harvestDefinition.getName());
193                    log.trace("numEvents: {}", harvestDefinition.getNumEvents());
194                    continue;
195                }
196
197                log.info("Starting to create jobs for harvest definition #{}({})", id, harvestDefinition.getName());
198
199                final JobGeneratorThread jobGeneratorThread = new JobGeneratorThread(id, harvestDefinition);
200                threadMap.put(id, jobGeneratorThread);
201                jobGeneratorThread.start();
202            }
203        }
204
205        /**
206         * Find out if a scheduling takes more than is acceptable currently 5 minutes.
207         *
208         * @param harvestId A given harvestId
209         * @return true, if a scheduling of the given harvestId has taken more than 5 minutes, or false, if not or no
210         * scheduling for this harvestId is underway
211         */
212        private static boolean takesSuspiciouslyLongToSchedule(Long harvestId) {
213            // acceptable delay before issuing warning is currently hard-wired to
214            // 5 minutes (5 * 60 * 1000 milliseconds)
215            final long acceptableDelay = 5 * 60 * 1000;
216            Long timewhenscheduled = schedulingStartedMap.get(harvestId);
217            if (timewhenscheduled == null) {
218                return false;
219            } else {
220                long now = System.currentTimeMillis();
221                if (timewhenscheduled + acceptableDelay < now) {
222                    // updates the schedulingStartedMap with currenttime for
223                    // the given harvestID when returning true
224                    schedulingStartedMap.put(harvestId, now);
225                    return true;
226                } else {
227                    return false;
228                }
229            }
230        }
231
232        private static class JobGeneratorThread extends Thread {
233            private final Long id;
234            private final HarvestDefinition harvestDefinition;
235
236            public JobGeneratorThread(Long id, HarvestDefinition harvestDefinition) {
237                super("JobGeneratorTask-" + id);
238                this.id = id;
239                this.harvestDefinition = harvestDefinition;
240            }
241
242            public void run() {
243                try {
244                    JobGenerator jobGen = JobGeneratorFactory.getInstance();
245                    int jobsMade = jobGen.generateJobs(harvestDefinition);
246                    if (jobsMade > 0) {
247                        log.info("Created {} jobs for harvest definition ({})", jobsMade,
248                                harvestDefinition.getName());
249                    } else {
250                        String msg = "No jobs created for harvest definition '"
251                                + harvestDefinition.getName()
252                                + "'. Probable cause: harvest tries to continue harvest that is already finished ";
253                        log.warn(msg);
254                        NotificationsFactory.getInstance().notify(msg, NotificationType.WARNING);
255                    }
256                    haDefinitionDAO.update(harvestDefinition);
257                } catch (Throwable e) {
258                    try {
259                        HarvestDefinition hd = haDefinitionDAO.read(harvestDefinition.getOid());
260                        hd.setActive(false);
261                        haDefinitionDAO.update(hd);
262                        String errMsg = "Exception while scheduling harvestdefinition #" + id + "("
263                                + harvestDefinition.getName() + "). The harvestdefinition has been "
264                                + "deactivated!";
265                        log.warn(errMsg, e);
266                        NotificationsFactory.getInstance().notify(errMsg, NotificationType.ERROR, e);
267                    } catch (Exception e1) {
268                        String errMsg = "Exception while scheduling harvestdefinition #" + id + "("
269                                + harvestDefinition.getName() + "). The harvestdefinition couldn't be "
270                                + "deactivated!";
271                        log.warn(errMsg, e);
272                        log.warn("Unable to deactivate", e1);
273                        NotificationsFactory.getInstance().notify(errMsg, NotificationType.ERROR, e);
274                    }
275                } finally {
276                    killScheduling();
277                }
278            }
279
280            public void killScheduling() {
281                harvestDefinitionsBeingScheduled.remove(id);
282                schedulingStartedMap.remove(id);
283                threadMap.remove(id);
284                log.debug("Removed HD #{}({}) from list of harvestdefinitions to be scheduled. "
285                                + "Harvestdefinitions still to be scheduled: {}", id, harvestDefinition.getName(),
286                        harvestDefinitionsBeingScheduled);
287            }
288        }
289    }
290
291}