001/*
002 * #%L
003 * Netarchivesuite - harvester
004 * %%
005 * Copyright (C) 2005 - 2014 The Royal Danish Library, the Danish State and University Library,
006 *             the National Library of France and the Austrian National Library.
007 * %%
008 * This program is free software: you can redistribute it and/or modify
009 * it under the terms of the GNU Lesser General Public License as
010 * published by the Free Software Foundation, either version 2.1 of the
011 * License, or (at your option) any later version.
012 * 
013 * This program is distributed in the hope that it will be useful,
014 * but WITHOUT ANY WARRANTY; without even the implied warranty of
015 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
016 * GNU General Lesser Public License for more details.
017 * 
018 * You should have received a copy of the GNU General Lesser Public
019 * License along with this program.  If not, see
020 * <http://www.gnu.org/licenses/lgpl-2.1.html>.
021 * #L%
022 */
023
024package dk.netarkivet.harvester.scheduler;
025
026import java.util.Collections;
027import java.util.Date;
028import java.util.HashMap;
029import java.util.HashSet;
030import java.util.Map;
031import java.util.Set;
032
033import org.slf4j.Logger;
034import org.slf4j.LoggerFactory;
035
036import dk.netarkivet.common.lifecycle.ComponentLifeCycle;
037import dk.netarkivet.common.lifecycle.PeriodicTaskExecutor;
038import dk.netarkivet.common.utils.NotificationType;
039import dk.netarkivet.common.utils.NotificationsFactory;
040import dk.netarkivet.common.utils.Settings;
041import dk.netarkivet.harvester.HarvesterSettings;
042import dk.netarkivet.harvester.datamodel.HarvestChannel;
043import dk.netarkivet.harvester.datamodel.HarvestChannelDAO;
044import dk.netarkivet.harvester.datamodel.HarvestDefinition;
045import dk.netarkivet.harvester.datamodel.HarvestDefinitionDAO;
046import dk.netarkivet.harvester.scheduler.jobgen.JobGenerator;
047import dk.netarkivet.harvester.scheduler.jobgen.JobGeneratorFactory;
048
049/**
050 * Handles the generation of new jobs based on the harvest definitions in persistent storage. The
051 * <code>HarvestJobGenerator</code> continuously scans the harvest definition database for harvest which should be run
052 * now. If a HD defines a harvest which should be run, a Harvest Job is created in the harvest job database.
053 */
054public class HarvestJobGenerator implements ComponentLifeCycle {
055
056    /** The class logger. */
057    private static final Logger log = LoggerFactory.getLogger(HarvestJobGenerator.class);
058
059    /**
060     * The set of HDs (or rather their OIDs) that are currently being scheduled in a separate thread. This set is a
061     * SynchronizedSet
062     */
063    protected static Set<Long> harvestDefinitionsBeingScheduled = Collections.synchronizedSet(new HashSet<Long>());
064
065    /**
066     * Used the store the currenttimeMillis when the scheduling of a particular harvestdefinition # started or when last
067     * a warning was issued.
068     */
069    protected static Map<Long, Long> schedulingStartedMap = Collections.synchronizedMap(new HashMap<Long, Long>());
070
071    /** The executor used to schedule the generator jobs. */
072    private PeriodicTaskExecutor genExec;
073
074    /** @see HarvesterSettings#JOBGEN_POSTPONE_UNREGISTERED_HARVEST_CHANNEL */
075    private static final boolean postponeUnregisteredChannel = Settings
076            .getBoolean(HarvesterSettings.JOBGEN_POSTPONE_UNREGISTERED_HARVEST_CHANNEL);
077
078    /** The HarvestDefinitionDAO used by the HarvestJobGenerator. */
079    private static final HarvestDefinitionDAO haDefinitionDAO = HarvestDefinitionDAO.getInstance();
080
081    private final HarvestChannelRegistry harvestChannelRegistry;
082
083    public HarvestJobGenerator(final HarvestChannelRegistry harvestChannelRegistry) {
084        this.harvestChannelRegistry = harvestChannelRegistry;
085    }
086
087    /**
088     * Starts the job generation scheduler.
089     */
090    @Override
091    public void start() {
092        genExec = new PeriodicTaskExecutor("JobGeneratorTask", new JobGeneratorTask(harvestChannelRegistry), 0,
093                Settings.getInt(HarvesterSettings.GENERATE_JOBS_PERIOD));
094    }
095
096    @Override
097    public void shutdown() {
098        if (genExec != null) {
099            genExec.shutdown();
100        }
101    }
102
103    /**
104     * Contains the functionality for the individual JobGenerations.
105     */
106    static class JobGeneratorTask implements Runnable {
107
108        private final HarvestChannelRegistry harvestChannelRegistry;
109
110        public JobGeneratorTask(HarvestChannelRegistry harvestChannelRegistry) {
111            this.harvestChannelRegistry = harvestChannelRegistry;
112        }
113
114        @Override
115        public synchronized void run() {
116            try {
117                generateJobs(new Date());
118            } catch (Exception e) {
119                log.info("Exception caught at fault barrier while generating jobs.", e);
120            }
121        }
122
123        /**
124         * Check if jobs should be generated for any ready harvest definitions for the specified time.
125         *
126         * @param timeToGenerateJobsFor Jobs will be generated which should be run at this time. Note: In a production
127         * system the provided time will normally be current time, but during testing we need to simulated other
128         * points-in-time
129         */
130        void generateJobs(Date timeToGenerateJobsFor) {
131            final Iterable<Long> readyHarvestDefinitions = haDefinitionDAO
132                    .getReadyHarvestDefinitions(timeToGenerateJobsFor);
133
134            HarvestChannelDAO hChanDao = HarvestChannelDAO.getInstance();
135
136            for (final Long id : readyHarvestDefinitions) {
137                // Make every HD run in its own thread, but at most once.
138                if (harvestDefinitionsBeingScheduled.contains(id)) {
139                    if (takesSuspiciouslyLongToSchedule(id)) {
140                        String harvestName = haDefinitionDAO.getHarvestName(id);
141                        String errMsg = "Not creating jobs for harvestdefinition #" + id + " (" + harvestName + ")"
142                                + " as the previous scheduling is still running";
143                        if (haDefinitionDAO.isSnapshot(id)) {
144                            // Log only at level debug if the ID represents
145                            // is a snapshot harvestdefinition, which are only run
146                            // once anyway
147                            log.debug(errMsg);
148                        } else { // Log at level WARN, and send a notification, if it is time
149                            log.warn(errMsg);
150                            NotificationsFactory.getInstance().notify(errMsg, NotificationType.WARNING);
151                        }
152                    }
153                    continue;
154                }
155
156                final HarvestDefinition harvestDefinition = haDefinitionDAO.read(id);
157
158                if (!harvestDefinition.isSnapShot()) {
159                    Long chanId = harvestDefinition.getChannelId();
160
161                    HarvestChannel chan = (chanId == null ? hChanDao.getDefaultChannel(false) : hChanDao
162                            .getById(chanId));
163
164                    String channelName = chan.getName();
165                    if (postponeUnregisteredChannel && !harvestChannelRegistry.isRegistered(channelName)) {
166                        log.info("Harvest channel '{}' has not yet been registered by any harvester, hence harvest "
167                                + "definition '{}' ({}) cannot be processed by the job generator for now.",
168                                channelName, harvestDefinition.getName(), id);
169                        continue;
170                    }
171                }
172
173                harvestDefinitionsBeingScheduled.add(id);
174                schedulingStartedMap.put(id, System.currentTimeMillis());
175
176                if (!harvestDefinition.runNow(timeToGenerateJobsFor)) {
177                    log.trace("The harvestdefinition #{}'{}' should not run now.", id, harvestDefinition.getName());
178                    log.trace("numEvents: {}", harvestDefinition.getNumEvents());
179                    continue;
180                }
181
182                log.info("Starting to create jobs for harvest definition #{}({})", id, harvestDefinition.getName());
183
184                new Thread("JobGeneratorTask-" + id) {
185                    public void run() {
186                        try {
187                            JobGenerator jobGen = JobGeneratorFactory.getInstance();
188                            int jobsMade = jobGen.generateJobs(harvestDefinition);
189                            if (jobsMade > 0) {
190                                log.info("Created {} jobs for harvest definition ({})", jobsMade,
191                                        harvestDefinition.getName());
192                            } else {
193                                String msg = "No jobs created for harvest definition '"
194                                        + harvestDefinition.getName()
195                                        + "'. Probable cause: harvest tries to continue harvest that is already finished ";
196                                log.warn(msg);
197                                NotificationsFactory.getInstance().notify(msg, NotificationType.WARNING);
198                            }
199                            haDefinitionDAO.update(harvestDefinition);
200                        } catch (Throwable e) {
201                            try {
202                                HarvestDefinition hd = haDefinitionDAO.read(harvestDefinition.getOid());
203                                hd.setActive(false);
204                                haDefinitionDAO.update(hd);
205                                String errMsg = "Exception while scheduling harvestdefinition #" + id + "("
206                                        + harvestDefinition.getName() + "). The harvestdefinition has been "
207                                        + "deactivated!";
208                                log.warn(errMsg, e);
209                                NotificationsFactory.getInstance().notify(errMsg, NotificationType.ERROR, e);
210                            } catch (Exception e1) {
211                                String errMsg = "Exception while scheduling harvestdefinition #" + id + "("
212                                        + harvestDefinition.getName() + "). The harvestdefinition couldn't be "
213                                        + "deactivated!";
214                                log.warn(errMsg, e);
215                                log.warn("Unable to deactivate", e1);
216                                NotificationsFactory.getInstance().notify(errMsg, NotificationType.ERROR, e);
217                            }
218                        } finally {
219                            harvestDefinitionsBeingScheduled.remove(id);
220                            schedulingStartedMap.remove(id);
221                            log.debug("Removed HD #{}({}) from list of harvestdefinitions to be scheduled. "
222                                    + "Harvestdefinitions still to be scheduled: {}", id, harvestDefinition.getName(),
223                                    harvestDefinitionsBeingScheduled);
224                        }
225                    }
226                }.start();
227            }
228        }
229
230        /**
231         * Find out if a scheduling takes more than is acceptable currently 5 minutes.
232         *
233         * @param harvestId A given harvestId
234         * @return true, if a scheduling of the given harvestId has taken more than 5 minutes, or false, if not or no
235         * scheduling for this harvestId is underway
236         */
237        private static boolean takesSuspiciouslyLongToSchedule(Long harvestId) {
238            // acceptable delay before issuing warning is currently hard-wired to
239            // 5 minutes (5 * 60 * 1000 milliseconds)
240            final long acceptableDelay = 5 * 60 * 1000;
241            Long timewhenscheduled = schedulingStartedMap.get(harvestId);
242            if (timewhenscheduled == null) {
243                return false;
244            } else {
245                long now = System.currentTimeMillis();
246                if (timewhenscheduled + acceptableDelay < now) {
247                    // updates the schedulingStartedMap with currenttime for
248                    // the given harvestID when returning true
249                    schedulingStartedMap.put(harvestId, now);
250                    return true;
251                } else {
252                    return false;
253                }
254            }
255        }
256    }
257
258}