001/*
002 * #%L
003 * Netarchivesuite - harvester
004 * %%
005 * Copyright (C) 2005 - 2014 The Royal Danish Library, the Danish State and University Library,
006 *             the National Library of France and the Austrian National Library.
007 * %%
008 * This program is free software: you can redistribute it and/or modify
009 * it under the terms of the GNU Lesser General Public License as
010 * published by the Free Software Foundation, either version 2.1 of the
011 * License, or (at your option) any later version.
012 * 
013 * This program is distributed in the hope that it will be useful,
014 * but WITHOUT ANY WARRANTY; without even the implied warranty of
015 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
016 * GNU General Lesser Public License for more details.
017 * 
018 * You should have received a copy of the GNU General Lesser Public
019 * License along with this program.  If not, see
020 * <http://www.gnu.org/licenses/lgpl-2.1.html>.
021 * #L%
022 */
023package dk.netarkivet.harvester.scheduler.jobgen;
024
025import java.util.Comparator;
026import java.util.Iterator;
027import java.util.Map;
028
029import org.slf4j.Logger;
030import org.slf4j.LoggerFactory;
031
032import dk.netarkivet.common.exceptions.ArgumentNotValid;
033import dk.netarkivet.common.exceptions.UnknownID;
034import dk.netarkivet.common.utils.Settings;
035import dk.netarkivet.harvester.HarvesterSettings;
036import dk.netarkivet.harvester.datamodel.DomainConfiguration;
037import dk.netarkivet.harvester.datamodel.HarvestDefinition;
038import dk.netarkivet.harvester.datamodel.Job;
039import dk.netarkivet.harvester.datamodel.JobDAO;
040import dk.netarkivet.harvester.datamodel.NumberUtils;
041import dk.netarkivet.harvester.datamodel.eav.EAV;
042
043/**
044 * The legacy job generator implementation. Aims at generating jobs that execute in a predictable time by taking
045 * advantage of previous crawls statistics.
046 */
047public class DefaultJobGenerator extends AbstractJobGenerator {
048
049    /** Logger for this class. */
050    private static final Logger log = LoggerFactory.getLogger(DefaultJobGenerator.class);
051
052    private static Long CONFIG_COUNT_SNAPSHOT = null;
053
054    public DefaultJobGenerator() {
055        try {
056            CONFIG_COUNT_SNAPSHOT = Settings.getLong(HarvesterSettings.JOBGEN_FIXED_CONFIG_COUNT_SNAPSHOT);
057            if (CONFIG_COUNT_SNAPSHOT <= 0) {
058                log.info("The parameter {} has the value {} and is therefore ignored during job splitting for "
059                        + "snapshot jobs.", HarvesterSettings.JOBGEN_FIXED_CONFIG_COUNT_SNAPSHOT, CONFIG_COUNT_SNAPSHOT);
060            } else {
061                log.info("Snapshot jobs will be split at an absolute maximum of {} configurations ({}).",
062                        CONFIG_COUNT_SNAPSHOT, HarvesterSettings.JOBGEN_FIXED_CONFIG_COUNT_SNAPSHOT);
063            }
064        } catch (UnknownID u) {
065            log.info("The parameter {} is not set so there is no absolute limit to the number of configurations per "
066                    + "snapshot job.", HarvesterSettings.JOBGEN_FIXED_CONFIG_COUNT_SNAPSHOT);
067        }
068    }
069
070    /**
071     * Compare two configurations using the following order: 1) Harvest template 2) Byte limit 3) expected number of
072     * object a harvest of the configuration will produce. The comparison will put the largest configuration first (with
073     * respect to 2) and 3))
074     */
075    public static class CompareConfigsDesc implements Comparator<DomainConfiguration> {
076
077        private long objectLimit;
078        private long byteLimit;
079
080        public CompareConfigsDesc(long objectLimit, long byteLimit) {
081            this.objectLimit = objectLimit;
082            this.byteLimit = byteLimit;
083        }
084
085        public int compare(DomainConfiguration cfg1, DomainConfiguration cfg2) {
086            log.trace("Comparing " + cfg1 + " " + cfg2);
087            // Compare order xml names
088            int cmp = cfg1.getOrderXmlName().compareTo(cfg2.getOrderXmlName());
089            if (cmp != 0) {
090                return cmp;
091            }
092            log.trace("Comparing EAV attributes now");
093            int result = EAV.compare(cfg1.getAttributesAndTypes(), cfg2.getAttributesAndTypes());
094            log.trace("Comparison of EAV attributes gave result " + result);
095            if (result != 0) {
096                return result;
097            }
098            // Compare byte limits
099            long bytelimit1 = NumberUtils.minInf(cfg1.getMaxBytes(), byteLimit);
100            long bytelimit2 = NumberUtils.minInf(cfg2.getMaxBytes(), byteLimit);
101            cmp = NumberUtils.compareInf(bytelimit2, bytelimit1);
102            if (cmp != 0) {
103                return cmp;
104            }
105            // Compare expected sizes
106            long expectedsize1 = cfg1.getExpectedNumberOfObjects(objectLimit, byteLimit);
107            long expectedsize2 = cfg2.getExpectedNumberOfObjects(objectLimit, byteLimit);
108            long res = expectedsize2 - expectedsize1;
109            if (res != 0L) {
110                return res < 0L ? -1 : 1;
111            }
112            return 0;
113        }
114    }
115
116    /**
117     * Job limits read from settings during construction.
118     */
119    private final long LIM_MAX_REL_SIZE = Long.parseLong(Settings
120            .get(HarvesterSettings.JOBS_MAX_RELATIVE_SIZE_DIFFERENCE));
121    private final long LIM_MIN_ABS_SIZE = Long.parseLong(Settings
122            .get(HarvesterSettings.JOBS_MIN_ABSOLUTE_SIZE_DIFFERENCE));
123    private final long LIM_MAX_TOTAL_SIZE = Long.parseLong(Settings.get(HarvesterSettings.JOBS_MAX_TOTAL_JOBSIZE));
124    /** Constant : exclude {@link DomainConfiguration}s with a budget of zero (bytes or objects). */
125    private final boolean EXCLUDE_ZERO_BUDGET = Settings
126            .getBoolean(HarvesterSettings.JOBGEN_FIXED_CONFIG_COUNT_EXCLUDE_ZERO_BUDGET);
127
128    /** Singleton instance. */
129    private static DefaultJobGenerator instance;
130
131    /**
132     * @return the singleton instance, builds it if necessary.
133     */
134    public static synchronized DefaultJobGenerator getInstance() {
135        if (instance == null) {
136            instance = new DefaultJobGenerator();
137        }
138        return instance;
139    }
140
141    @Override
142    protected Comparator<DomainConfiguration> getDomainConfigurationSubsetComparator(HarvestDefinition harvest) {
143        return new CompareConfigsDesc(harvest.getMaxCountObjects(), harvest.getMaxBytes());
144    }
145
146    /**
147     * Create new jobs from a collection of configurations. All configurations must use the same order.xml file.Jobs
148     *
149     * @param harvest the {@link HarvestDefinition} being processed.
150     * @param domainConfSubset the configurations to use to create the jobs
151     * @return The number of jobs created
152     * @throws ArgumentNotValid if any of the parameters is null or if the cfglist does not contain any configurations
153     */
154    @Override
155    protected int processDomainConfigurationSubset(HarvestDefinition harvest,
156            Iterator<DomainConfiguration> domainConfSubset) {
157        int jobsMade = 0;
158        Job job = null;
159        log.debug("Adding domainconfigs with the same order.xml for harvest #{}", harvest.getOid());
160        JobDAO dao = JobDAO.getInstance();
161        DomainConfiguration previousDomainConf = null;
162        while (domainConfSubset.hasNext()) {
163            DomainConfiguration cfg = domainConfSubset.next();
164            log.trace("Processing " + DomainConfiguration.cfgToString(cfg));
165            if (EXCLUDE_ZERO_BUDGET && (0 == cfg.getMaxBytes() || 0 == cfg.getMaxObjects())) {
166                log.info("Config '{}' for '{}'" + " excluded (0{})", cfg.getName(), cfg.getDomainName(),
167                        (cfg.getMaxBytes() == 0 ? " bytes" : " objects"));
168                continue;
169            }
170            if ((job == null) || (!canAccept(job, cfg, previousDomainConf))) {
171                if (job != null) {
172                    // If we're done with a job, write it out
173                    ++jobsMade;
174                    dao.create(job);
175                }
176                job = getNewJob(harvest, cfg);
177                log.trace("Created new job for harvest #{} to add configuration {} for domain {}", harvest.getOid(),
178                        cfg.getName(), cfg.getDomainName());
179
180            } else {
181                job.addConfiguration(cfg);
182                log.trace("Added job configuration {} for domain {} to current job for harvest #{}", cfg.getName(),
183                        cfg.getDomainName(), harvest.getOid());
184            }
185            previousDomainConf = cfg;
186        }
187        if (job != null) {
188            ++jobsMade;
189            editJobOrderXml(job);
190            dao.create(job);
191            if (log.isTraceEnabled()) {
192                log.trace("Generated job: '{}'", job.toString());
193                StringBuilder logMsg = new StringBuilder("Job configurationsDomain:");
194                for (Map.Entry<String, String> config : job.getDomainConfigurationMap().entrySet()) {
195                    logMsg.append("\n ").append(config.getKey()).append(":").append(config.getValue());
196                }
197                log.trace(logMsg.toString());
198            }
199            log.debug("Created {} jobs for harvest #{}", jobsMade, harvest.getOid());
200        }
201        return jobsMade;
202    }
203
204
205    @Override
206    protected boolean checkSpecificAcceptConditions(Job job, DomainConfiguration cfg) {
207        if (job.isSnapshot()
208                && CONFIG_COUNT_SNAPSHOT != null
209                && CONFIG_COUNT_SNAPSHOT > 0
210                && job.getDomainConfigurationMap().size() >= CONFIG_COUNT_SNAPSHOT
211                ) {
212            return false;
213        }
214
215        // By default byte limit is used as base criterion for splitting a
216        // harvest in config chunks, however the configuration can override
217        // this and instead use object limit.
218        boolean splitByObjectLimit = Settings.getBoolean(HarvesterSettings.SPLIT_BY_OBJECTLIMIT);
219        long forceMaxObjectsPerDomain = job.getForceMaxObjectsPerDomain();
220        long forceMaxBytesPerDomain = job.getForceMaxBytesPerDomain();
221        if (splitByObjectLimit) {
222            if (NumberUtils.compareInf(cfg.getMaxObjects(), forceMaxObjectsPerDomain) < 0
223                    || (job.isConfigurationSetsObjectLimit() && NumberUtils.compareInf(cfg.getMaxObjects(),
224                            forceMaxObjectsPerDomain) != 0)) {
225                return false;
226            }
227        } else {
228            if (NumberUtils.compareInf(cfg.getMaxBytes(), forceMaxBytesPerDomain) < 0
229                    || (job.isConfigurationSetsByteLimit() && NumberUtils.compareInf(cfg.getMaxBytes(),
230                            forceMaxBytesPerDomain) != 0)) {
231                return false;
232            }
233        }
234
235        long maxCountObjects = job.getMaxCountObjects();
236        long minCountObjects = job.getMinCountObjects();
237
238        assert (maxCountObjects >= minCountObjects) : "basic invariant";
239
240        // The expected number of objects retrieved by this job from
241        // the configuration based on historical harvest results.
242        long expectation = cfg.getExpectedNumberOfObjects(forceMaxObjectsPerDomain, forceMaxBytesPerDomain);
243
244        // Check if total count is exceeded
245        long totalCountObjects = job.getTotalCountObjects();
246        if ((totalCountObjects > 0) && ((expectation + totalCountObjects) > LIM_MAX_TOTAL_SIZE)) {
247            return false;
248        }
249
250        // total count OK
251        // Check if size within existing limits
252        if ((expectation <= maxCountObjects) && (expectation >= minCountObjects)) {
253            // total count ok and within current max and min
254            return true;
255        }
256
257        // Outside current range we need to check the relative difference
258        long absDiff;
259        long xmaxCountObjects = maxCountObjects;
260        long yminCountObjects = minCountObjects;
261
262        // New max or new min ?
263        if (expectation > maxCountObjects) {
264            xmaxCountObjects = expectation;
265        } else {
266            assert (expectation < minCountObjects) : "New minimum expected";
267            yminCountObjects = expectation;
268        }
269
270        absDiff = (xmaxCountObjects - yminCountObjects);
271
272        if ((absDiff == 0) || (absDiff <= LIM_MIN_ABS_SIZE)) {
273            return true; // difference too small to matter
274        }
275
276        if (yminCountObjects == 0) {
277            yminCountObjects = 1; // make sure division succeeds
278        }
279
280        float relDiff = (float) xmaxCountObjects / (float) yminCountObjects;
281        if (relDiff > LIM_MAX_REL_SIZE) {
282            return false;
283        }
284
285        // all tests passed
286        return true;
287    }
288
289    /** Only to be used by unittests. */
290    public static void reset() {
291        instance = null;
292    }
293
294}