/*
 * #%L
 * Netarchivesuite - harvester
 * %%
 * Copyright (C) 2005 - 2014 The Royal Danish Library, the Danish State and University Library,
 * the National Library of France and the Austrian National Library.
 * %%
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License as
 * published by the Free Software Foundation, either version 2.1 of the
 * License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Lesser Public License for more details.
 *
 * You should have received a copy of the GNU General Lesser Public
 * License along with this program. If not, see
 * <http://www.gnu.org/licenses/lgpl-2.1.html>.
 * #L%
 */
package dk.netarkivet.harvester.harvesting.frontier;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.util.Iterator;

import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;

import com.sleepycat.je.DatabaseException;
import com.sleepycat.je.Environment;
import com.sleepycat.je.EnvironmentConfig;
import com.sleepycat.persist.EntityCursor;
import com.sleepycat.persist.EntityStore;
import com.sleepycat.persist.PrimaryIndex;
import com.sleepycat.persist.SecondaryIndex;
import com.sleepycat.persist.StoreConfig;
import com.sleepycat.persist.model.Entity;
import com.sleepycat.persist.model.KeyField;
import com.sleepycat.persist.model.Persistent;
import com.sleepycat.persist.model.PrimaryKey;
import com.sleepycat.persist.model.Relationship;
import com.sleepycat.persist.model.SecondaryKey;

import dk.netarkivet.common.CommonSettings;
import dk.netarkivet.common.exceptions.ArgumentNotValid;
import dk.netarkivet.common.exceptions.IOFailure;
import dk.netarkivet.common.utils.FileUtils;
import dk.netarkivet.common.utils.Settings;

/**
 * Wraps a Heritrix full frontier report. As these reports can be big in size, this implementation relies on the
 * Berkeley DB direct persistence layer to store the report lines, allowing the lines to be kept partially in memory
 * and partially on disk.
 * <p>
 * Instances are obtained through {@link #parseContentsAsString(String, String)} or
 * {@link #parseContentsAsXML(String, byte[], String)}, and MUST be released with {@link #dispose()}.
 */
@SuppressWarnings({"serial"})
public class FullFrontierReport extends AbstractFrontierReport {

    /**
     * Composite primary key of a stored report line: total enqueues first, then domain name.
     */
    @Persistent
    static class PersistentLineKey implements Comparable<PersistentLineKey>, FrontierReportLineOrderKey {

        @KeyField(1)
        long totalEnqueues;

        @KeyField(2)
        String domainName;

        // Default empty constructor for BDB.
        PersistentLineKey() {

        }

        /**
         * Builds the key of the given report line.
         *
         * @param l the report line to extract the domain name and total enqueues from.
         */
        public PersistentLineKey(FrontierReportLine l) {
            this.domainName = l.getDomainName();
            this.totalEnqueues = l.getTotalEnqueues();
        }

        /**
         * @return the domain name, used as queue id.
         */
        public String getQueueId() {
            return domainName;
        }

        /**
         * @return the total enqueues, used as queue size.
         */
        public long getQueueSize() {
            return totalEnqueues;
        }

        /**
         * Compares first by decreasing queue size, then by domain name.
         */
        @Override
        public int compareTo(PersistentLineKey k) {
            return FrontierReportLineNaturalOrder.getInstance().compare(this, k);
        }

        @Override
        public String toString() {
            return totalEnqueues + " " + domainName;
        }

    }

    /**
     * A report line as stored in the BDB entity store, together with the keys of its secondary indexes.
     */
    @Entity
    static class PersistentLine extends FrontierReportLine {

        @PrimaryKey
        private PersistentLineKey primaryKey;

        @SecondaryKey(relate = Relationship.ONE_TO_ONE)
        private String domainNameKey;

        @SecondaryKey(relate = Relationship.MANY_TO_ONE)
        private Long totalSpendKey;

        @SecondaryKey(relate = Relationship.MANY_TO_ONE)
        private Long currentSizeKey;

        // Default empty constructor for BDB.
        PersistentLine() {

        }

        /**
         * Copies the given report line and derives its primary and secondary keys.
         *
         * @param reportLine the line to persist.
         */
        PersistentLine(FrontierReportLine reportLine) {
            super(reportLine);
            this.primaryKey = new PersistentLineKey(reportLine);
            this.domainNameKey = reportLine.getDomainName();
            this.currentSizeKey = reportLine.getCurrentSize();
            this.totalSpendKey = reportLine.getTotalSpend();
        }

    }

    /**
     * Read-only iterator over stored report lines, backed by a BDB entity cursor. {@link #close()} must be called
     * once iteration is over.
     */
    public class ReportIterator implements Iterator<FrontierReportLine> {

        private final EntityCursor<PersistentLine> cursor;
        private final Iterator<PersistentLine> iter;

        /**
         * Returns an iterator on the given sort key.
         *
         * @param cursor The cursor (sort key) to iterate on.
         */
        ReportIterator(EntityCursor<PersistentLine> cursor) {
            this.cursor = cursor;
            iter = cursor.iterator();
        }

        @Override
        public boolean hasNext() {
            return iter.hasNext();
        }

        @Override
        public FrontierReportLine next() {
            return iter.next();
        }

        /**
         * Not supported.
         *
         * @throws ArgumentNotValid always.
         */
        @Override
        public void remove() {
            throw new ArgumentNotValid("Remove is not supported!");
        }

        /**
         * Close method should be called explicitly to free underlying resources! A failure to close the cursor is
         * logged and not propagated.
         */
        public void close() {
            try {
                cursor.close();
            } catch (DatabaseException e) {
                // Pass the exception itself so that the cause and stack trace end up in the log.
                LOG.error("Error closing entity cursor", e);
            }
        }

    }

    private static final String WORKING_DIR = FullFrontierReport.class.getSimpleName();

    /** The logger for this class. */
    private static final Logger LOG = LoggerFactory.getLogger(FullFrontierReport.class);

    /**
     * The Berkeley DB JE environment.
     */
    private final Environment dbEnvironment;

    /**
     * The BDB entity store.
     */
    private final EntityStore store;

    /**
     * Primary index.
     */
    private final PrimaryIndex<PersistentLineKey, PersistentLine> linesIndex;

    /**
     * Secondary index, per domain name.
     */
    private final SecondaryIndex<String, PersistentLineKey, PersistentLine> linesByDomain;

    /**
     * Secondary index, per current size.
     */
    private final SecondaryIndex<Long, PersistentLineKey, PersistentLine> linesByCurrentSize;

    /**
     * Secondary index, per spent budget.
     */
    private final SecondaryIndex<Long, PersistentLineKey, PersistentLine> linesBySpentBudget;

    /**
     * The directory where the BDB is stored.
     */
    private final File storageDir;

    /**
     * Builds an empty frontier report wrapper, creating a fresh storage directory named after the job under the
     * cache directory and opening a BDB environment and entity store in it.
     *
     * @param jobName the Heritrix job name
     * @throws IOFailure if the storage directory cannot be created (this includes the case where it already
     * exists), or if the BDB environment, store or indexes cannot be initialized.
     */
    private FullFrontierReport(String jobName) {
        super(jobName);

        File workingDir = new File(Settings.getFile(CommonSettings.CACHE_DIR), WORKING_DIR);

        this.storageDir = new File(workingDir, jobName);
        if (!storageDir.mkdirs()) {
            throw new IOFailure("Failed to create directory " + storageDir.getAbsolutePath());
        }

        // Handles are kept in locals until initialization has fully succeeded, so that they can be
        // released if a later step fails.
        Environment env = null;
        EntityStore entityStore = null;
        try {
            EnvironmentConfig envConfig = new EnvironmentConfig();
            envConfig.setAllowCreate(true);
            env = new Environment(storageDir, envConfig);

            StoreConfig storeConfig = new StoreConfig();
            storeConfig.setAllowCreate(true);

            entityStore = new EntityStore(env, FrontierReportLine.class.getSimpleName() + "-" + jobName,
                    storeConfig);

            linesIndex = entityStore.getPrimaryIndex(PersistentLineKey.class, PersistentLine.class);

            linesByDomain = entityStore.getSecondaryIndex(linesIndex, String.class, "domainNameKey");

            linesByCurrentSize = entityStore.getSecondaryIndex(linesIndex, Long.class, "currentSizeKey");

            linesBySpentBudget = entityStore.getSecondaryIndex(linesIndex, Long.class, "totalSpendKey");

        } catch (DatabaseException e) {
            // Do not leak the handles opened so far, and remove the directory created above: a left-over
            // directory would make mkdirs() return false, hence fail every later attempt for this job.
            closeQuietly(entityStore, env);
            FileUtils.removeRecursively(storageDir);
            throw new IOFailure("Failed to init frontier BDB for job " + jobName, e);
        }

        this.dbEnvironment = env;
        this.store = entityStore;
    }

    /**
     * Closes the given BDB handles, store first, logging failures instead of propagating them. Used on the
     * constructor failure path only, where the original failure is the one that must reach the caller.
     *
     * @param entityStore the store to close, may be null.
     * @param env the environment to close, may be null.
     */
    private static void closeQuietly(EntityStore entityStore, Environment env) {
        if (entityStore != null) {
            try {
                entityStore.close();
            } catch (Exception e) {
                LOG.warn("Failed to close frontier BDB entity store after failed initialization", e);
            }
        }
        if (env != null) {
            try {
                env.close();
            } catch (Exception e) {
                LOG.warn("Failed to close frontier BDB environment after failed initialization", e);
            }
        }
    }

    /**
     * Releases all resources once this report is to be discarded: closes the store and the environment, then
     * deletes the storage directory. NB this method MUST be explicitly called!
     *
     * @throws IOFailure if the store or the environment cannot be closed; the storage directory is then left in
     * place.
     */
    public void dispose() {

        try {
            store.close();
            dbEnvironment.cleanLog();
            dbEnvironment.close();
        } catch (DatabaseException e) {
            throw new IOFailure("Failed to close frontier BDB for job " + getJobName(), e);
        }

        FileUtils.removeRecursively(storageDir);
    }

    /**
     * Stores the given line in the report.
     *
     * @param line the line to store.
     * @throws IOFailure if the line cannot be written to the BDB.
     */
    @Override
    public void addLine(FrontierReportLine line) {
        try {
            linesIndex.put(new PersistentLine(line));
        } catch (DatabaseException e) {
            throw new IOFailure("Failed to store frontier report line for job " + getJobName(), e);
        }
    }

    /**
     * Looks up the line of the given domain through the domain name index.
     *
     * @param domainName the domain name to look up.
     * @return the line found by the index lookup, or null if the lookup failed with a database error (the error
     * is logged).
     */
    @Override
    public FrontierReportLine getLineForDomain(String domainName) {
        try {
            return linesByDomain.get(domainName);
        } catch (DatabaseException e) {
            LOG.warn("Failed to get queue for domain " + domainName, e);
            return null;
        }
    }

    /**
     * Returns an iterator where lines are ordered by primary key order: first by decreasing totalEnqueues, then by
     * domain name natural order.
     *
     * @return an iterator on the report lines.
     * @throws IOFailure if the BDB cannot be read.
     */
    public ReportIterator iterateOnTotalEnqueues() {
        try {
            return new ReportIterator(linesIndex.entities());
        } catch (DatabaseException e) {
            throw new IOFailure("Failed to read frontier BDB for job " + getJobName(), e);
        }
    }

    /**
     * Returns an iterator where lines are ordered by domain name natural order.
     *
     * @return an iterator on the report lines.
     * @throws IOFailure if the BDB cannot be read.
     */
    public ReportIterator iterateOnDomainName() {
        try {
            return new ReportIterator(linesByDomain.entities());
        } catch (DatabaseException e) {
            throw new IOFailure("Failed to read frontier BDB for job " + getJobName(), e);
        }
    }

    /**
     * Returns an iterator where lines are ordered by increasing currentSize.
     *
     * @return an iterator on the report lines.
     * @throws IOFailure if the BDB cannot be read.
     */
    public ReportIterator iterateOnCurrentSize() {
        try {
            return new ReportIterator(linesByCurrentSize.entities());
        } catch (DatabaseException e) {
            throw new IOFailure("Failed to read frontier BDB for job " + getJobName(), e);
        }
    }

    /**
     * Returns an iterator on lines having a given currentSize.
     *
     * @param dupValue the currentSize value to select lines on.
     * @return an iterator on the report lines.
     * @throws IOFailure if the BDB cannot be read.
     */
    public ReportIterator iterateOnDuplicateCurrentSize(long dupValue) {
        try {
            return new ReportIterator(linesByCurrentSize.subIndex(dupValue).entities());
        } catch (DatabaseException e) {
            throw new IOFailure("Failed to read frontier BDB for job " + getJobName(), e);
        }
    }

    /**
     * Returns an iterator where lines are ordered by increasing totalSpend.
     *
     * @return an iterator on the report lines.
     * @throws IOFailure if the BDB cannot be read.
     */
    public ReportIterator iterateOnSpentBudget() {
        try {
            return new ReportIterator(linesBySpentBudget.entities());
        } catch (DatabaseException e) {
            throw new IOFailure("Failed to read frontier BDB for job " + getJobName(), e);
        }
    }

    /**
     * Returns an iterator on lines having a given totalSpend.
     *
     * @param dupValue the totalSpend value to select lines on.
     * @return an iterator on the report lines.
     * @throws IOFailure if the BDB cannot be read.
     */
    public ReportIterator iterateOnDuplicateSpentBudget(long dupValue) {
        try {
            return new ReportIterator(linesBySpentBudget.subIndex(dupValue).entities());
        } catch (DatabaseException e) {
            throw new IOFailure("Failed to read frontier BDB for job " + getJobName(), e);
        }
    }

    /**
     * Generates a Heritrix frontier report wrapper object by parsing the frontier report returned by the REST API
     * controller as XML. The text content of the first element named {@code tagName} is handed to
     * {@link #parseContentsAsString(String, String)}.
     * <p>
     * If the XML cannot be parsed, or contains no such element, the problem is logged and an empty report is
     * returned.
     *
     * @param jobName the Heritrix job name
     * @param contentsAsXML the XML document returned by the http REST call
     * @param tagName the name of the element whose text content is the frontier report
     * @return the report wrapper object
     * @throws IOFailure if the report storage cannot be initialized.
     */
    public static FullFrontierReport parseContentsAsXML(String jobName, byte[] contentsAsXML, String tagName) {
        // The try block only covers the XML extraction. Building the report happens outside of it, so that a
        // storage failure is neither reported as an XML problem nor retried on an already created directory.
        String contentAsString = null;
        try {
            // FIXME: instantiate a unique dBuilder
            DocumentBuilderFactory dbFactory = DocumentBuilderFactory.newInstance();
            DocumentBuilder dBuilder = dbFactory.newDocumentBuilder();
            Document doc = dBuilder.parse(new ByteArrayInputStream(contentsAsXML));

            Element root = doc.getDocumentElement();
            NodeList nList = root.getElementsByTagName(tagName);
            // get first (and normally unique) item
            Node nNode = nList.item(0);
            if (nNode == null) {
                LOG.error("Failed to parse XML content: no element named '" + tagName + "' found");
            } else {
                contentAsString = nNode.getTextContent();
            }
        } catch (Exception ex) {
            // Deliberately broad: any failure to extract the report text yields an empty report.
            LOG.error("Failed to parse XML content", ex);
        }

        if (contentAsString == null) {
            return new FullFrontierReport(jobName);
        }
        return FullFrontierReport.parseContentsAsString(jobName, contentAsString);
    }

    /**
     * Generates a Heritrix frontier report wrapper object by parsing the frontier report returned by the JMX
     * controller as a string. The first line is treated as a header and discarded; every following line becomes
     * one report line.
     * <p>
     * Parsing is best effort: if the temporary file cannot be written or read, or if a line cannot be parsed, the
     * problem is logged and the report built so far (possibly empty) is returned.
     *
     * @param jobName the Heritrix job name
     * @param contentsAsString the text returned by the JMX call
     * @return the report wrapper object
     * @throws ArgumentNotValid if contentsAsString is null.
     * @throws IOFailure if the report storage cannot be initialized.
     */
    public static FullFrontierReport parseContentsAsString(String jobName, String contentsAsString) {
        // Reject a null text before any storage is allocated: failing later would leave an open BDB behind.
        if (contentsAsString == null) {
            throw new ArgumentNotValid("The frontier report contents must not be null");
        }

        FullFrontierReport report = new FullFrontierReport(jobName);

        // First dump this possibly huge string to a file
        File tmpDir = Settings.getFile(CommonSettings.CACHE_DIR);
        File tmpFile = new File(tmpDir, jobName + "-" + System.currentTimeMillis() + ".txt");
        try {
            try (BufferedWriter out = new BufferedWriter(new FileWriter(tmpFile))) {
                out.write(contentsAsString);
            } catch (IOException e) {
                LOG.error("Failed to create temporary file", e);
                return report;
            }

            try (BufferedReader br = new BufferedReader(new FileReader(tmpFile))) {
                br.readLine(); // Discard header line
                String lineToken;
                while ((lineToken = br.readLine()) != null) {
                    report.addLine(new FrontierReportLine(lineToken));
                }
            } catch (FileNotFoundException e) {
                LOG.error("Failed to read temporary file", e);
            } catch (IOException e) {
                LOG.warn("Failed to read frontier report lines from temporary file", e);
            } catch (Throwable t) {
                // Deliberately broad: a line that cannot be parsed or stored must not prevent the caller from
                // getting the lines stored so far.
                LOG.error("Failed to parse frontier report for job " + jobName, t);
            }
        } finally {
            // Runs on every exit path, once both streams are closed. The file may not exist if it could not
            // be created in the first place.
            if (tmpFile.exists()) {
                FileUtils.remove(tmpFile);
            }
        }

        return report;
    }

    /**
     * Return the directory where the BDB is stored.
     *
     * @return the storage directory.
     */
    File getStorageDir() {
        return storageDir;
    }

}