/*
 * #%L
 * Netarchivesuite - harvester
 * %%
 * Copyright (C) 2005 - 2014 The Royal Danish Library, the Danish State and University Library,
 *             the National Library of France and the Austrian National Library.
 * %%
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License as
 * published by the Free Software Foundation, either version 2.1 of the
 * License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Lesser Public License for more details.
 *
 * You should have received a copy of the GNU General Lesser Public
 * License along with this program.  If not, see
 * <http://www.gnu.org/licenses/lgpl-2.1.html>.
 * #L%
 */
package dk.netarkivet.harvester.harvesting.frontier;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.util.Iterator;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.sleepycat.je.DatabaseException;
import com.sleepycat.je.Environment;
import com.sleepycat.je.EnvironmentConfig;
import com.sleepycat.persist.EntityCursor;
import com.sleepycat.persist.EntityStore;
import com.sleepycat.persist.PrimaryIndex;
import com.sleepycat.persist.SecondaryIndex;
import com.sleepycat.persist.StoreConfig;
import com.sleepycat.persist.model.Entity;
import com.sleepycat.persist.model.KeyField;
import com.sleepycat.persist.model.Persistent;
import com.sleepycat.persist.model.PrimaryKey;
import com.sleepycat.persist.model.Relationship;
import com.sleepycat.persist.model.SecondaryKey;

import dk.netarkivet.common.CommonSettings;
import dk.netarkivet.common.exceptions.ArgumentNotValid;
import dk.netarkivet.common.exceptions.IOFailure;
import dk.netarkivet.common.utils.FileUtils;
import dk.netarkivet.common.utils.Settings;

/**
 * Wraps an Heritrix 1 full frontier report. As these reports can be big in size, this implementation relies on Berkeley
 * DB direct persistence layer to store the report lines, allowing to store the lines partially in memory, and on disk.
 * <p>
 * Instances hold an open BDB environment and an on-disk storage directory; {@link #dispose()} must be called
 * explicitly to release them.
 */
@SuppressWarnings({"serial"})
public class FullFrontierReport extends AbstractFrontierReport {

    /**
     * Composite primary key of a stored report line: total enqueues first, then domain name.
     */
    @Persistent
    static class PersistentLineKey implements Comparable<PersistentLineKey>, FrontierReportLineOrderKey {

        @KeyField(1)
        long totalEnqueues;

        @KeyField(2)
        String domainName;

        // Default empty constructor for BDB.
        PersistentLineKey() {

        }

        /**
         * Builds the key from the domain name and total enqueues of the given line.
         *
         * @param l the report line to build a key for
         */
        public PersistentLineKey(FrontierReportLine l) {
            this.domainName = l.getDomainName();
            this.totalEnqueues = l.getTotalEnqueues();
        }

        /**
         * @return the domain name part of the key.
         */
        public String getQueueId() {
            return domainName;
        }

        /**
         * @return the total enqueues part of the key.
         */
        public long getQueueSize() {
            return totalEnqueues;
        }

        /**
         * Compares first by decreasing queue size, then by domain name (delegates to
         * {@link FrontierReportLineNaturalOrder}).
         */
        @Override
        public int compareTo(PersistentLineKey k) {
            return FrontierReportLineNaturalOrder.getInstance().compare(this, k);
        }

        @Override
        public String toString() {
            return totalEnqueues + " " + domainName;
        }

    }

    /**
     * A report line as stored in the BDB entity store, with its primary key and the secondary keys backing the
     * per-domain, per-current-size and per-spent-budget indexes.
     */
    @Entity
    static class PersistentLine extends FrontierReportLine {

        @PrimaryKey
        private PersistentLineKey primaryKey;

        @SecondaryKey(relate = Relationship.ONE_TO_ONE)
        private String domainNameKey;

        @SecondaryKey(relate = Relationship.MANY_TO_ONE)
        private Long totalSpendKey;

        @SecondaryKey(relate = Relationship.MANY_TO_ONE)
        private Long currentSizeKey;

        // Default empty constructor for BDB.
        PersistentLine() {

        }

        /**
         * Copies the given line and derives all index keys from it.
         *
         * @param reportLine the line to persist
         */
        PersistentLine(FrontierReportLine reportLine) {
            super(reportLine);
            this.primaryKey = new PersistentLineKey(reportLine);
            this.domainNameKey = reportLine.getDomainName();
            this.currentSizeKey = reportLine.getCurrentSize();
            this.totalSpendKey = reportLine.getTotalSpend();
        }

    }

    /**
     * Read-only iterator over report lines, backed by a BDB entity cursor. {@link #close()} must be called once
     * iteration is finished.
     */
    public class ReportIterator implements Iterator<FrontierReportLine> {

        private final EntityCursor<PersistentLine> cursor;
        private final Iterator<PersistentLine> iter;

        /**
         * Returns an iterator on the given sort key.
         *
         * @param cursor The cursor (sort key) to iterate on.
         */
        ReportIterator(EntityCursor<PersistentLine> cursor) {
            this.cursor = cursor;
            iter = cursor.iterator();
        }

        @Override
        public boolean hasNext() {
            return iter.hasNext();
        }

        @Override
        public FrontierReportLine next() {
            return iter.next();
        }

        /**
         * Not supported.
         *
         * @throws ArgumentNotValid always
         */
        @Override
        public void remove() {
            throw new ArgumentNotValid("Remove is not supported!");
        }

        /**
         * Close method should be called explicitly to free underlying resources! Failures are logged, not rethrown.
         */
        public void close() {
            try {
                cursor.close();
            } catch (DatabaseException e) {
                // Pass the exception along so the stack trace is not lost.
                LOG.error("Error closing entity cursor:\n" + e.getLocalizedMessage(), e);
            }
        }

    }

    private static final String WORKING_DIR = FullFrontierReport.class.getSimpleName();

    /** The logger for this class. */
    private static final Logger LOG = LoggerFactory.getLogger(FullFrontierReport.class);

    /**
     * The Berkeley DB JE environment.
     */
    private final Environment dbEnvironment;

    /**
     * The BDB entity store.
     */
    private final EntityStore store;

    /**
     * Primary index.
     */
    private final PrimaryIndex<PersistentLineKey, PersistentLine> linesIndex;

    /**
     * Secondary index, per domain name.
     */
    private final SecondaryIndex<String, PersistentLineKey, PersistentLine> linesByDomain;

    /**
     * Secondary index, per current size.
     */
    private final SecondaryIndex<Long, PersistentLineKey, PersistentLine> linesByCurrentSize;

    /**
     * Secondary index, per spent budget.
     */
    private final SecondaryIndex<Long, PersistentLineKey, PersistentLine> linesBySpentBudget;

    /**
     * The directory where the BDB is stored.
     */
    private final File storageDir;

    /**
     * Builds an empty frontier report wrapper, creating a dedicated storage directory
     * (cache dir / class simple name / job name) and opening a BDB environment and entity store in it.
     *
     * @param jobName the Heritrix job name
     * @throws IOFailure if the storage directory could not be created (note that {@link File#mkdirs()} also
     * returns false when the directory already exists), or if the BDB could not be initialised.
     */
    private FullFrontierReport(String jobName) {
        super(jobName);

        File workingDir = new File(Settings.getFile(CommonSettings.CACHE_DIR), WORKING_DIR);

        this.storageDir = new File(workingDir, jobName);
        if (!storageDir.mkdirs()) {
            throw new IOFailure("Failed to create directory " + storageDir.getAbsolutePath());
        }

        // Work on locals first so that whatever was opened can be closed again if a later step fails.
        Environment env = null;
        EntityStore entityStore = null;
        try {
            EnvironmentConfig envConfig = new EnvironmentConfig();
            envConfig.setAllowCreate(true);
            env = new Environment(storageDir, envConfig);

            StoreConfig storeConfig = new StoreConfig();
            storeConfig.setAllowCreate(true);

            entityStore = new EntityStore(env, FrontierReportLine.class.getSimpleName() + "-" + jobName,
                    storeConfig);

            PrimaryIndex<PersistentLineKey, PersistentLine> primary = entityStore.getPrimaryIndex(
                    PersistentLineKey.class, PersistentLine.class);
            SecondaryIndex<String, PersistentLineKey, PersistentLine> byDomain = entityStore.getSecondaryIndex(
                    primary, String.class, "domainNameKey");
            SecondaryIndex<Long, PersistentLineKey, PersistentLine> byCurrentSize = entityStore
                    .getSecondaryIndex(primary, Long.class, "currentSizeKey");
            SecondaryIndex<Long, PersistentLineKey, PersistentLine> bySpentBudget = entityStore
                    .getSecondaryIndex(primary, Long.class, "totalSpendKey");

            dbEnvironment = env;
            store = entityStore;
            linesIndex = primary;
            linesByDomain = byDomain;
            linesByCurrentSize = byCurrentSize;
            linesBySpentBudget = bySpentBudget;
        } catch (DatabaseException e) {
            closeBdbQuietly(entityStore, env);
            throw new IOFailure("Failed to init frontier BDB for job " + jobName, e);
        }

    }

    /**
     * Best-effort close of a partially initialised BDB; failures are logged and otherwise ignored.
     *
     * @param entityStore the store to close, may be null
     * @param env the environment to close, may be null
     */
    private static void closeBdbQuietly(EntityStore entityStore, Environment env) {
        if (entityStore != null) {
            try {
                entityStore.close();
            } catch (DatabaseException e) {
                LOG.warn("Failed to close frontier BDB entity store after failed initialisation", e);
            }
        }
        if (env != null) {
            try {
                env.close();
            } catch (DatabaseException e) {
                LOG.warn("Failed to close frontier BDB environment after failed initialisation", e);
            }
        }
    }

    /**
     * Releases all resources once this report is to be discarded: closes the store and the environment, then
     * removes the storage directory. NB this method MUST be explicitly called!
     *
     * @throws IOFailure if the BDB could not be closed; the storage directory is not removed in that case.
     */
    public void dispose() {

        try {
            store.close();
            dbEnvironment.cleanLog();
            dbEnvironment.close();
        } catch (DatabaseException e) {
            throw new IOFailure("Failed to close frontier BDB for job " + getJobName(), e);
        }

        FileUtils.removeRecursively(storageDir);
    }

    /**
     * Stores the given line in the BDB.
     *
     * @param line the line to add
     * @throws IOFailure if the line could not be stored
     */
    @Override
    public void addLine(FrontierReportLine line) {
        try {
            linesIndex.put(new PersistentLine(line));
        } catch (DatabaseException e) {
            throw new IOFailure("Failed to store frontier report line for job " + getJobName(), e);
        }
    }

    /**
     * Looks up the line for the given domain through the per-domain secondary index.
     *
     * @param domainName the domain name to look up
     * @return the index lookup result, or null if the lookup failed with a database error (which is logged).
     */
    @Override
    public FrontierReportLine getLineForDomain(String domainName) {
        try {
            return linesByDomain.get(domainName);
        } catch (DatabaseException e) {
            LOG.warn("Failed to get queue for domain " + domainName, e);
            return null;
        }
    }

    /**
     * Returns an iterator where lines are ordered by primary key order: first by decreasing totalEnqueues, then by
     * domain name natural order.
     *
     * @return an iterator on the report lines.
     * @throws IOFailure if the BDB could not be read
     */
    public ReportIterator iterateOnTotalEnqueues() {
        try {
            return new ReportIterator(linesIndex.entities());
        } catch (DatabaseException e) {
            throw new IOFailure("Failed to read frontier BDB for job " + getJobName(), e);
        }
    }

    /**
     * Returns an iterator where lines are ordered by domain name natural order.
     *
     * @return an iterator on the report lines.
     * @throws IOFailure if the BDB could not be read
     */
    public ReportIterator iterateOnDomainName() {
        try {
            return new ReportIterator(linesByDomain.entities());
        } catch (DatabaseException e) {
            throw new IOFailure("Failed to read frontier BDB for job " + getJobName(), e);
        }
    }

    /**
     * Returns an iterator where lines are ordered by increasing currentSize.
     *
     * @return an iterator on the report lines.
     * @throws IOFailure if the BDB could not be read
     */
    public ReportIterator iterateOnCurrentSize() {
        try {
            return new ReportIterator(linesByCurrentSize.entities());
        } catch (DatabaseException e) {
            throw new IOFailure("Failed to read frontier BDB for job " + getJobName(), e);
        }
    }

    /**
     * Returns an iterator on lines having a given currentSize.
     *
     * @param dupValue the currentSize value the returned lines must have
     * @return an iterator on the report lines.
     * @throws IOFailure if the BDB could not be read
     */
    public ReportIterator iterateOnDuplicateCurrentSize(long dupValue) {
        try {
            return new ReportIterator(linesByCurrentSize.subIndex(dupValue).entities());
        } catch (DatabaseException e) {
            throw new IOFailure("Failed to read frontier BDB for job " + getJobName(), e);
        }
    }

    /**
     * Returns an iterator where lines are ordered by increasing totalSpend.
     *
     * @return an iterator on the report lines.
     * @throws IOFailure if the BDB could not be read
     */
    public ReportIterator iterateOnSpentBudget() {
        try {
            return new ReportIterator(linesBySpentBudget.entities());
        } catch (DatabaseException e) {
            throw new IOFailure("Failed to read frontier BDB for job " + getJobName(), e);
        }
    }

    /**
     * Returns an iterator on lines having a given totalSpend.
     *
     * @param dupValue the totalSpend value the returned lines must have
     * @return an iterator on the report lines.
     * @throws IOFailure if the BDB could not be read
     */
    public ReportIterator iterateOnDuplicateSpentBudget(long dupValue) {
        try {
            return new ReportIterator(linesBySpentBudget.subIndex(dupValue).entities());
        } catch (DatabaseException e) {
            throw new IOFailure("Failed to read frontier BDB for job " + getJobName(), e);
        }
    }

    /**
     * Generates an Heritrix frontier report wrapper object by parsing the frontier report returned by the JMX
     * controller as a string.
     * <p>
     * The first line of the contents is discarded as a header; every following line is parsed into a
     * {@link FrontierReportLine} and added to the report. Parsing is best-effort: I/O and parsing failures are
     * logged and the report built so far (possibly empty) is returned. The temporary file used during parsing is
     * removed in all cases.
     *
     * @param jobName the Heritrix job name
     * @param contentsAsString the text returned by the JMX call
     * @return the report wrapper object
     */
    public static FullFrontierReport parseContentsAsString(String jobName, String contentsAsString) {

        FullFrontierReport report = new FullFrontierReport(jobName);

        // First dump this possibly huge string to a file
        File tmpDir = Settings.getFile(CommonSettings.CACHE_DIR);
        File tmpFile = new File(tmpDir, jobName + "-" + System.currentTimeMillis() + ".txt");
        try {
            try {
                dumpToTempFile(tmpFile, contentsAsString);
            } catch (IOException e) {
                LOG.error("Failed to create temporary file", e);
                return report;
            }

            BufferedReader br;
            try {
                br = new BufferedReader(new FileReader(tmpFile));
            } catch (FileNotFoundException e) {
                LOG.error("Failed to read temporary file", e);
                return report;
            }

            try {
                br.readLine(); // Discard header line
                String lineToken;
                while ((lineToken = br.readLine()) != null) {
                    report.addLine(new FrontierReportLine(lineToken));
                }
            } catch (IOException e) {
                LOG.warn("Failed to read temporary file", e);
            } catch (Exception e) {
                // Best effort: keep whatever was parsed before the failing line.
                LOG.error("Failed to parse frontier report for job " + jobName, e);
            } finally {
                try {
                    br.close();
                } catch (IOException e) {
                    LOG.warn("Failed to close reader", e);
                }
            }
        } finally {
            // Runs on every exit path, including the early returns above.
            if (tmpFile.exists()) {
                FileUtils.remove(tmpFile);
            }
        }

        return report;
    }

    /**
     * Writes the given contents to the given file, always closing the writer.
     *
     * @param target the file to write to
     * @param contents the text to write
     * @throws IOException if the file could not be created or written
     */
    private static void dumpToTempFile(File target, String contents) throws IOException {
        target.createNewFile();
        BufferedWriter out = new BufferedWriter(new FileWriter(target));
        try {
            out.write(contents);
        } finally {
            out.close();
        }
    }

    /**
     * Return the directory where the BDB is stored.
     *
     * @return the storage directory.
     */
    File getStorageDir() {
        return storageDir;
    }

}