/*
 * #%L
 * Netarchivesuite - harvester
 * %%
 * Copyright (C) 2005 - 2014 The Royal Danish Library, the Danish State and University Library,
 *             the National Library of France and the Austrian National Library.
 * %%
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License as
 * published by the Free Software Foundation, either version 2.1 of the
 * License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Lesser Public License for more details.
 *
 * You should have received a copy of the GNU General Lesser Public
 * License along with this program.  If not, see
 * <http://www.gnu.org/licenses/lgpl-2.1.html>.
 * #L%
 */
package dk.netarkivet.harvester.scheduler.jobgen;

import java.util.Comparator;
import java.util.Iterator;
import java.util.Map;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import dk.netarkivet.common.exceptions.ArgumentNotValid;
import dk.netarkivet.common.exceptions.UnknownID;
import dk.netarkivet.common.utils.Settings;
import dk.netarkivet.harvester.HarvesterSettings;
import dk.netarkivet.harvester.datamodel.DomainConfiguration;
import dk.netarkivet.harvester.datamodel.HarvestDefinition;
import dk.netarkivet.harvester.datamodel.Job;
import dk.netarkivet.harvester.datamodel.JobDAO;
import dk.netarkivet.harvester.datamodel.NumberUtils;
import dk.netarkivet.harvester.datamodel.eav.EAV;

/**
 * The legacy job generator implementation. Aims at generating jobs that execute in a predictable time by taking
 * advantage of previous crawls statistics.
 */
public class DefaultJobGenerator extends AbstractJobGenerator {

    /** Logger for this class. */
    private static final Logger log = LoggerFactory.getLogger(DefaultJobGenerator.class);

    /**
     * Absolute maximum number of configurations in a snapshot job, as read from
     * {@link HarvesterSettings#JOBGEN_FIXED_CONFIG_COUNT_SNAPSHOT} by the constructor. {@code null} when the setting
     * is not defined; values less than or equal to zero are ignored when splitting.
     */
    private static Long CONFIG_COUNT_SNAPSHOT = null;

    /** Singleton instance. */
    private static DefaultJobGenerator instance;

    /**
     * Job limits read from settings during construction.
     */
    private final long LIM_MAX_REL_SIZE = Long.parseLong(Settings
            .get(HarvesterSettings.JOBS_MAX_RELATIVE_SIZE_DIFFERENCE));
    private final long LIM_MIN_ABS_SIZE = Long.parseLong(Settings
            .get(HarvesterSettings.JOBS_MIN_ABSOLUTE_SIZE_DIFFERENCE));
    private final long LIM_MAX_TOTAL_SIZE = Long.parseLong(Settings.get(HarvesterSettings.JOBS_MAX_TOTAL_JOBSIZE));
    /** Constant : exclude {@link DomainConfiguration}s with a budget of zero (bytes or objects). */
    private final boolean EXCLUDE_ZERO_BUDGET = Settings
            .getBoolean(HarvesterSettings.JOBGEN_FIXED_CONFIG_COUNT_EXCLUDE_ZERO_BUDGET);

    /**
     * Builds a generator and (re)reads the snapshot configuration-count limit from the settings. The outcome (limit
     * active, limit ignored, or limit not set) is logged at info level.
     */
    public DefaultJobGenerator() {
        try {
            long limit = Settings.getLong(HarvesterSettings.JOBGEN_FIXED_CONFIG_COUNT_SNAPSHOT);
            CONFIG_COUNT_SNAPSHOT = limit;
            if (limit <= 0) {
                log.info("The parameter {} has the value {} and is therefore ignored during job splitting for "
                        + "snapshot jobs.", HarvesterSettings.JOBGEN_FIXED_CONFIG_COUNT_SNAPSHOT, limit);
            } else {
                log.info("Snapshot jobs will be split at an absolute maximum of {} configurations ({}).",
                        limit, HarvesterSettings.JOBGEN_FIXED_CONFIG_COUNT_SNAPSHOT);
            }
        } catch (UnknownID u) {
            // The field is static: drop any value left behind by a previously constructed instance so that the
            // behaviour really matches the message logged below.
            CONFIG_COUNT_SNAPSHOT = null;
            log.info("The parameter {} is not set so there is no absolute limit to the number of configurations per "
                    + "snapshot job.", HarvesterSettings.JOBGEN_FIXED_CONFIG_COUNT_SNAPSHOT);
        }
    }

    /**
     * Compare two configurations using the following order: 1) Harvest template (order.xml name) 2) EAV attributes
     * 3) Byte limit 4) expected number of objects a harvest of the configuration will produce. The comparison will
     * put the largest configuration first (with respect to 3) and 4)).
     */
    public static class CompareConfigsDesc implements Comparator<DomainConfiguration> {

        /** Harvest-wide object limit passed on when computing the expected number of objects. */
        private final long objectLimit;
        /** Harvest-wide byte limit used to cap each configuration's own byte limit. */
        private final long byteLimit;

        /**
         * @param objectLimit the object limit of the harvest the configurations belong to
         * @param byteLimit the byte limit of the harvest the configurations belong to
         */
        public CompareConfigsDesc(long objectLimit, long byteLimit) {
            this.objectLimit = objectLimit;
            this.byteLimit = byteLimit;
        }

        /**
         * @param cfg1 the first configuration
         * @param cfg2 the second configuration
         * @return a negative number, zero or a positive number as cfg1 sorts before, equal to or after cfg2
         */
        @Override
        public int compare(DomainConfiguration cfg1, DomainConfiguration cfg2) {
            log.trace("Comparing {} {}", cfg1, cfg2);
            // Compare order xml names
            int cmp = cfg1.getOrderXmlName().compareTo(cfg2.getOrderXmlName());
            if (cmp != 0) {
                return cmp;
            }
            log.trace("Comparing EAV attributes now");
            cmp = EAV.compare(cfg1.getAttributesAndTypes(), cfg2.getAttributesAndTypes());
            log.trace("Comparison of EAV attributes gave result {}", cmp);
            if (cmp != 0) {
                return cmp;
            }
            // Compare byte limits; operands are swapped so that the larger limit sorts first.
            // NOTE(review): minInf/compareInf presumably treat a special value as "infinite" - confirm in NumberUtils.
            long bytelimit1 = NumberUtils.minInf(cfg1.getMaxBytes(), byteLimit);
            long bytelimit2 = NumberUtils.minInf(cfg2.getMaxBytes(), byteLimit);
            cmp = NumberUtils.compareInf(bytelimit2, bytelimit1);
            if (cmp != 0) {
                return cmp;
            }
            // Compare expected sizes, again largest first. Long.compare avoids the overflow that a plain
            // subtraction of two longs could produce.
            long expectedsize1 = cfg1.getExpectedNumberOfObjects(objectLimit, byteLimit);
            long expectedsize2 = cfg2.getExpectedNumberOfObjects(objectLimit, byteLimit);
            return Long.compare(expectedsize2, expectedsize1);
        }
    }

    /**
     * @return the singleton instance, builds it if necessary.
     */
    public static synchronized DefaultJobGenerator getInstance() {
        if (instance == null) {
            instance = new DefaultJobGenerator();
        }
        return instance;
    }

    /**
     * @param harvest the {@link HarvestDefinition} being processed.
     * @return a {@link CompareConfigsDesc} built from the object and byte limits of the harvest.
     */
    @Override
    protected Comparator<DomainConfiguration> getDomainConfigurationSubsetComparator(HarvestDefinition harvest) {
        return new CompareConfigsDesc(harvest.getMaxCountObjects(), harvest.getMaxBytes());
    }

    /**
     * Create new jobs from a collection of configurations. All configurations must use the same order.xml file.
     * <p>
     * Configurations are added to the current job for as long as {@code canAccept} allows it; otherwise the current
     * job is written out and a new one is started. Configurations with a zero byte or object budget are skipped
     * when the corresponding setting is enabled.
     * <p>
     * This method does no explicit argument validation: no {@link ArgumentNotValid} is raised here, and a null
     * argument surfaces as a {@link NullPointerException}. An empty iterator simply yields zero jobs.
     *
     * @param harvest the {@link HarvestDefinition} being processed.
     * @param domainConfSubset the configurations to use to create the jobs
     * @return The number of jobs created
     */
    @Override
    protected int processDomainConfigurationSubset(HarvestDefinition harvest,
            Iterator<DomainConfiguration> domainConfSubset) {
        int jobsMade = 0;
        Job job = null;
        log.debug("Adding domainconfigs with the same order.xml for harvest #{}", harvest.getOid());
        JobDAO dao = JobDAO.getInstance();
        DomainConfiguration previousDomainConf = null;
        while (domainConfSubset.hasNext()) {
            DomainConfiguration cfg = domainConfSubset.next();
            if (log.isTraceEnabled()) {
                // Guarded: building the string representation is wasted work when trace is off.
                log.trace("Processing {}", DomainConfiguration.cfgToString(cfg));
            }
            if (EXCLUDE_ZERO_BUDGET && (0 == cfg.getMaxBytes() || 0 == cfg.getMaxObjects())) {
                log.info("Config '{}' for '{}' excluded (0{})", cfg.getName(), cfg.getDomainName(),
                        (cfg.getMaxBytes() == 0 ? " bytes" : " objects"));
                continue;
            }
            if ((job == null) || (!canAccept(job, cfg, previousDomainConf))) {
                if (job != null) {
                    // If we're done with a job, write it out
                    ++jobsMade;
                    persistJob(dao, job);
                }
                job = getNewJob(harvest, cfg);
                log.trace("Created new job for harvest #{} to add configuration {} for domain {}", harvest.getOid(),
                        cfg.getName(), cfg.getDomainName());
            } else {
                job.addConfiguration(cfg);
                log.trace("Added job configuration {} for domain {} to current job for harvest #{}", cfg.getName(),
                        cfg.getDomainName(), harvest.getOid());
            }
            previousDomainConf = cfg;
        }
        if (job != null) {
            // Write out the last job, which the loop above never gets to close.
            ++jobsMade;
            persistJob(dao, job);
            if (log.isTraceEnabled()) {
                log.trace("Generated job: '{}'", job);
                StringBuilder logMsg = new StringBuilder("Job configurationsDomain:");
                for (Map.Entry<String, String> config : job.getDomainConfigurationMap().entrySet()) {
                    logMsg.append("\n ").append(config.getKey()).append(":").append(config.getValue());
                }
                log.trace(logMsg.toString());
            }
            log.debug("Created {} jobs for harvest #{}", jobsMade, harvest.getOid());
        }
        return jobsMade;
    }

    /**
     * Finalizes the order.xml of a completed job and stores the job. Every job goes through the same two steps, so
     * that jobs closed in the middle of a subset are edited exactly like the last one.
     *
     * @param dao the DAO used to store the job
     * @param job the completed job
     */
    private void persistJob(JobDAO dao, Job job) {
        editJobOrderXml(job);
        dao.create(job);
    }

    /**
     * Decides whether a configuration may be added to an existing job. The checks are, in order:
     * <ol>
     * <li>a snapshot job that already holds the configured maximum number of configurations rejects it;</li>
     * <li>the configuration's object limit (or byte limit, depending on
     * {@link HarvesterSettings#SPLIT_BY_OBJECTLIMIT}) must be compatible with the limit forced by the job;</li>
     * <li>the expected number of objects must not push a non-empty job above the maximum total job size;</li>
     * <li>if the expectation falls outside the job's current min/max range, the resulting range must stay within
     * the minimum absolute or the maximum relative size difference.</li>
     * </ol>
     *
     * @param job the job that may receive the configuration
     * @param cfg the candidate configuration
     * @return true if the configuration can be added to the job, false otherwise
     */
    @Override
    protected boolean checkSpecificAcceptConditions(Job job, DomainConfiguration cfg) {
        if (job.isSnapshot()
                && CONFIG_COUNT_SNAPSHOT != null
                && CONFIG_COUNT_SNAPSHOT > 0
                && job.getDomainConfigurationMap().size() >= CONFIG_COUNT_SNAPSHOT) {
            return false;
        }

        // By default byte limit is used as base criterion for splitting a
        // harvest in config chunks, however the configuration can override
        // this and instead use object limit.
        boolean splitByObjectLimit = Settings.getBoolean(HarvesterSettings.SPLIT_BY_OBJECTLIMIT);
        long forceMaxObjectsPerDomain = job.getForceMaxObjectsPerDomain();
        long forceMaxBytesPerDomain = job.getForceMaxBytesPerDomain();
        if (splitByObjectLimit) {
            if (NumberUtils.compareInf(cfg.getMaxObjects(), forceMaxObjectsPerDomain) < 0
                    || (job.isConfigurationSetsObjectLimit() && NumberUtils.compareInf(cfg.getMaxObjects(),
                            forceMaxObjectsPerDomain) != 0)) {
                return false;
            }
        } else {
            if (NumberUtils.compareInf(cfg.getMaxBytes(), forceMaxBytesPerDomain) < 0
                    || (job.isConfigurationSetsByteLimit() && NumberUtils.compareInf(cfg.getMaxBytes(),
                            forceMaxBytesPerDomain) != 0)) {
                return false;
            }
        }

        long maxCountObjects = job.getMaxCountObjects();
        long minCountObjects = job.getMinCountObjects();

        assert (maxCountObjects >= minCountObjects) : "basic invariant";

        // The expected number of objects retrieved by this job from
        // the configuration based on historical harvest results.
        long expectation = cfg.getExpectedNumberOfObjects(forceMaxObjectsPerDomain, forceMaxBytesPerDomain);

        // Check if total count is exceeded
        long totalCountObjects = job.getTotalCountObjects();
        if ((totalCountObjects > 0) && ((expectation + totalCountObjects) > LIM_MAX_TOTAL_SIZE)) {
            return false;
        }

        // total count OK
        // Check if size within existing limits
        if ((expectation <= maxCountObjects) && (expectation >= minCountObjects)) {
            // total count ok and within current max and min
            return true;
        }

        // Outside current range we need to check the relative difference
        long absDiff;
        long xmaxCountObjects = maxCountObjects;
        long yminCountObjects = minCountObjects;

        // New max or new min ?
        if (expectation > maxCountObjects) {
            xmaxCountObjects = expectation;
        } else {
            assert (expectation < minCountObjects) : "New minimum expected";
            yminCountObjects = expectation;
        }

        absDiff = (xmaxCountObjects - yminCountObjects);

        if ((absDiff == 0) || (absDiff <= LIM_MIN_ABS_SIZE)) {
            return true; // difference too small to matter
        }

        if (yminCountObjects == 0) {
            yminCountObjects = 1; // make sure division succeeds
        }

        float relDiff = (float) xmaxCountObjects / (float) yminCountObjects;
        if (relDiff > LIM_MAX_REL_SIZE) {
            return false;
        }

        // all tests passed
        return true;
    }

    /** Only to be used by unittests. */
    public static void reset() {
        instance = null;
    }

}