/*
 *  This file is part of the Heritrix web crawler (crawler.archive.org).
 *
 *  Licensed to the Internet Archive (IA) by one or more individual
 *  contributors.
 *
 *  The IA licenses this file to You under the Apache License, Version 2.0
 *  (the "License"); you may not use this file except in compliance with
 *  the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 */

package org.archive.crawler.framework;

import static org.archive.modules.CoreAttributeConstants.A_RUNTIME_EXCEPTION;
import static org.archive.modules.fetcher.FetchStatusCodes.S_PROCESSING_THREAD_KILLED;
import static org.archive.modules.fetcher.FetchStatusCodes.S_RUNTIME_EXCEPTION;
import static org.archive.modules.fetcher.FetchStatusCodes.S_SERIOUS_ERROR;

import java.io.PrintWriter;
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.net.InetAddress;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;

import org.archive.io.SinkHandlerLogThread;
import org.archive.modules.CrawlURI;
import org.archive.modules.Processor;
import org.archive.modules.ProcessorChain.ChainStatusReceiver;
import org.archive.modules.fetcher.HostResolver;
import org.archive.spring.KeyedProperties;
import org.archive.util.ArchiveUtils;
import org.archive.util.DevUtils;
import org.archive.util.ProgressStatisticsReporter;
import org.archive.util.Recorder;
import org.archive.util.ReportUtils;
import org.archive.util.Reporter;

import com.sleepycat.util.RuntimeExceptionWrapper;

/**
 * One "worker thread"; asks for CrawlURIs, processes them,
 * repeats unless told otherwise.
 *
 * <p>Status fields (current URI, step, timestamps) are read by the reporting
 * methods from other threads without synchronization; see the note in
 * {@link #reportTo(PrintWriter)} for why that is deliberate.
 *
 * @author Gordon Mohr
 */
public class ToeThread extends Thread
implements Reporter, ProgressStatisticsReporter,
           HostResolver, SinkHandlerLogThread, ChainStatusReceiver {

    /** Coarse phases of the worker loop, recorded for reporting/debugging. */
    public enum Step {
        NASCENT, ABOUT_TO_GET_URI, FINISHED,
        ABOUT_TO_BEGIN_PROCESSOR, HANDLING_RUNTIME_EXCEPTION,
        ABOUT_TO_RETURN_URI, FINISHING_PROCESS
    }

    private static final Logger logger =
        Logger.getLogger("org.archive.crawler.framework.ToeThread");

    private CrawlController controller;
    private int serialNumber;

    /**
     * Each ToeThread has an instance of Recorder that gets used
     * over and over by each request.
     *
     * @see org.archive.util.RecorderMarker
     */
    private Recorder httpRecorder = null;

    // activity monitoring, debugging, and problem detection
    private Step step = Step.NASCENT;
    private long atStepSince;
    private String currentProcessorName = "";

    // thread-name prefix; the current URI (if any) is appended to it
    private String coreName;
    private CrawlURI currentCuri;
    private long lastStartTime;
    private long lastFinishTime;


    // default priority; may not be meaningful in recent JVMs
    private static final int DEFAULT_PRIORITY = Thread.NORM_PRIORITY-2;

    // indicator that a thread is now surplus based on current desired
    // count; it should wrap up cleanly
    private volatile boolean shouldRetire = false;

    /**
     * Create a ToeThread
     *
     * @param g ToePool this thread belongs to; also supplies the controller
     * @param sn serial number
     */
    public ToeThread(ToePool g, int sn) {
        // TODO: add crawl name?
        super(g,"ToeThread #" + sn);
        coreName="ToeThread #" + sn + ": ";
        controller = g.getController();
        serialNumber = sn;
        setPriority(DEFAULT_PRIORITY);
        int outBufferSize = controller.getRecorderOutBufferBytes();
        int inBufferSize = controller.getRecorderInBufferBytes();
        httpRecorder = new Recorder(controller.getScratchDir().getFile(),
            "tt" + sn + "http", outBufferSize, inBufferSize);
        lastFinishTime = System.currentTimeMillis();
    }

    /**
     * Main worker loop: repeatedly takes the next CrawlURI from the frontier,
     * runs it through the fetch chain and the disposition chain, then reports
     * it as finished. The loop ends when the thread is asked to retire, is
     * interrupted while holding no URI, or hits an unexpected exception.
     *
     * @see java.lang.Thread#run()
     */
    @Override
    public void run() {
        String name = controller.getMetadata().getJobName();
        logger.fine(getName()+" started for order '"+name+"'");
        Recorder.setHttpRecorder(httpRecorder);

        try {
            while ( true ) {
                ArchiveUtils.continueCheck();

                setStep(Step.ABOUT_TO_GET_URI, null);

                CrawlURI curi = controller.getFrontier().next();


                synchronized(this) {
                    ArchiveUtils.continueCheck();
                    setCurrentCuri(curi);
                    currentCuri.setThreadNumber(this.serialNumber);
                    lastStartTime = System.currentTimeMillis();
                    currentCuri.setRecorder(httpRecorder);
                }

                try {
                    KeyedProperties.loadOverridesFrom(curi);

                    controller.getFetchChain().process(curi,this);

                    controller.getFrontier().beginDisposition(curi);

                    controller.getDispositionChain().process(curi,this);

                } catch (RuntimeExceptionWrapper e) {
                    // BDB wrapper exception. The former "workaround" here
                    // called e.initCause(e.getCause()) only when the cause
                    // was null, which can never supply a cause and may throw
                    // IllegalStateException when the cause was already
                    // initialized by the constructor; so just treat it as
                    // any other recoverable problem.
                    recoverableProblem(e);
                } catch (AssertionError ae) {
                    // This risks leaving crawl in fatally inconsistent state,
                    // but is often reasonable for per-Processor assertion problems
                    recoverableProblem(ae);
                } catch (RuntimeException e) {
                    recoverableProblem(e);
                } catch (InterruptedException e) {
                    if(currentCuri!=null) {
                        recoverableProblem(e);
                        Thread.interrupted(); // clear interrupt status
                    } else {
                        throw e;
                    }
                } catch (StackOverflowError err) {
                    recoverableProblem(err);
                } catch (Error err) {
                    // OutOfMemory and any others
                    seriousError(err);
                } finally {
                    httpRecorder.endReplays();
                    KeyedProperties.clearOverridesFrom(curi);
                }

                setStep(Step.ABOUT_TO_RETURN_URI, null);
                ArchiveUtils.continueCheck();

                synchronized(this) {
                    // Tentative fix for https://sbforge.org/jira/browse/NAS-2560
                    try {
                        // Report the URI (currentCuri) being processed as having finished processing.
                        controller.getFrontier().finished(currentCuri);
                    } catch (NullPointerException e) {
                        // Let's try ignoring this NullPointerException and keep on processing
                        logger.log(Level.SEVERE, "NPE caught during call to .getFrontier().finished "
                                + " with argument currentCuri = '" + currentCuri + "'", e);
                    }
                    controller.getFrontier().endDisposition();
                    setCurrentCuri(null);
                }
                curi = null;

                setStep(Step.FINISHING_PROCESS, null);
                lastFinishTime = System.currentTimeMillis();
                if(shouldRetire) {
                    break; // from while(true)
                }
            }
        } catch (InterruptedException e) {
            if(currentCuri!=null){
                logger.log(Level.SEVERE,"Interrupt leaving unfinished CrawlURI "+getName()+" - job may hang",e);
            }
            // thread interrupted, ok to end
            logger.log(Level.FINE,this.getName()+ " ended with Interruption");
        } catch (Exception e) {
            // everything else (including interruption)
            logger.log(Level.SEVERE,"Fatal exception in "+getName(),e);
        } catch (OutOfMemoryError err) {
            seriousError(err);
        } finally {
            controller.getFrontier().endDisposition();

        }

        setCurrentCuri(null);
        // Do cleanup so that objects can be GC.
        this.httpRecorder.closeRecorders();
        this.httpRecorder = null;

        logger.fine(getName()+" finished for order '"+name+"'");
        setStep(Step.FINISHED, null);
        controller = null;
    }

    /**
     * Set currentCuri, updating thread name as appropriate
     *
     * @param curi URI now being worked on, or null when idle
     */
    private void setCurrentCuri(CrawlURI curi) {
        if(curi==null) {
            setName(coreName);
        } else {
            setName(coreName+curi);
        }
        currentCuri = curi;
    }

    /**
     * Record the step this thread is now at, together with the time the step
     * was entered.
     *
     * @param s step being entered
     * @param procName name of the processor about to run; null is stored as
     * the empty string
     */
    public void setStep(Step s, String procName) {
        step=s;
        atStepSince = System.currentTimeMillis();
        currentProcessorName = procName != null ? procName : "";
    }

    /**
     * Note that the given processor is about to be run by this thread.
     *
     * @param proc processor about to begin
     */
    public void atProcessor(Processor proc) {
        setStep(Step.ABOUT_TO_BEGIN_PROCESSOR, proc.getBeanName());
    }

    /**
     * Handling for errors considered serious (OutOfMemoryError and similar):
     * frees reserve memory, requests a crawl pause, notes the error in the
     * frontier journal if there is one, dumps diagnostics to System.err,
     * marks the current URI (if any) with S_SERIOUS_ERROR and logs at SEVERE.
     *
     * @param err the error encountered
     */
    private void seriousError(Error err) {
        // try to prevent timeslicing until we have a chance to deal with OOM
        // Note that modern-day JVM priority indifference with native threads
        // may make this priority-jumbling pointless
        setPriority(DEFAULT_PRIORITY+1);
        if (controller!=null) {
            // hold all ToeThreads from proceeding to next processor
            controller.freeReserveMemory();
            controller.requestCrawlPause();
            if (controller.getFrontier().getFrontierJournal() != null) {
                controller.getFrontier().getFrontierJournal().seriousError(
                    getName() + err.getMessage());
            }
        }

        // OutOfMemory etc.
        String extraInfo = DevUtils.extraInfo();
        System.err.println("<<<");
        System.err.println(ArchiveUtils.getLog17Date());
        System.err.println(err);
        System.err.println(extraInfo);
        err.printStackTrace(System.err);

        if (controller!=null) {
            PrintWriter pw = new PrintWriter(System.err);
            controller.getToePool().compactReportTo(pw);
            pw.flush();
        }
        System.err.println(">>>");
//        DevUtils.sigquitSelf();

        String context = "unknown";
        if(currentCuri!=null) {
            // update fetch-status, saving original as annotation
            currentCuri.getAnnotations().add("err="+err.getClass().getName());
            currentCuri.getAnnotations().add("os"+currentCuri.getFetchStatus());
            currentCuri.setFetchStatus(S_SERIOUS_ERROR);
            context = currentCuri.shortReportLine() + " in " + currentProcessorName;
        }
        String message = "Serious error occured trying " +
            "to process '" + context + "'\n" + extraInfo;
        logger.log(Level.SEVERE, message, err);
        setPriority(DEFAULT_PRIORITY);
    }

    /**
     * Handling for exceptions and errors that are possibly recoverable:
     * marks the current URI with S_RUNTIME_EXCEPTION, attaches the problem
     * to it, and logs at SEVERE. Callers must only invoke this while a
     * current URI is set.
     *
     * @param e the problem encountered
     */
    private void recoverableProblem(Throwable e) {
        Step previousStep = step;
        setStep(Step.HANDLING_RUNTIME_EXCEPTION, null);
        //e.printStackTrace(System.err);
        currentCuri.setFetchStatus(S_RUNTIME_EXCEPTION);
        // store exception temporarily for logging
        currentCuri.getAnnotations().add("err="+e.getClass().getName());
        currentCuri.getData().put(A_RUNTIME_EXCEPTION, e);
        String message = "Problem " + e +
                " occured when trying to process '"
                + currentCuri.toString()
                + "' at step " + previousStep
                + " in " + currentProcessorName +"\n";
        logger.log(Level.SEVERE, message, e);
    }


    /**
     * @return Return toe thread serial number.
     */
    public int getSerialNumber() {
        return this.serialNumber;
    }

    /**
     * Get the CrawlController associated with this thread.
     *
     * @return Returns the CrawlController; null once {@link #run()} has
     * completed.
     */
    public CrawlController getController() {
        return controller;
    }

    /**
     * Terminates a thread.
     *
     * <p> Calling this method will ensure that the current thread will stop
     * processing as soon as possible (note: this may be never). Meant to
     * 'short circuit' hung threads.
     *
     * <p> Current crawl uri will have its fetch status set accordingly and
     * will be immediately returned to the frontier.
     *
     * <p> As noted before, this does not ensure that the thread will stop
     * running (ever). But once evoked it will not try and communicate with
     * other parts of crawler and will terminate as soon as control is
     * established.
     */
    protected void kill(){
        this.interrupt();
        synchronized(this) {
            if (currentCuri!=null) {
                currentCuri.setFetchStatus(S_PROCESSING_THREAD_KILLED);
                controller.getFrontier().finished(currentCuri);
            }
        }
    }

    /**
     * @return Current step (For debugging/reporting, give abstract step
     * where this thread is).
     */
    public Object getStep() {
        return step;
    }

    /**
     * Is this thread validly processing a URI, not paused, waiting for
     * a URI, or interrupted?
     * @return whether thread is actively processing a URI
     */
    public boolean isActive() {
        // if alive and not waiting in/for frontier.next(), we're 'active'
        return this.isAlive() && (currentCuri != null) && !isInterrupted();
    }

    /**
     * Request that this thread retire (exit cleanly) at the earliest
     * opportunity.
     */
    public void retire() {
        shouldRetire = true;
    }

    /**
     * Whether this thread should cleanly retire at the earliest
     * opportunity.
     *
     * @return True if should retire.
     */
    public boolean shouldRetire() {
        return shouldRetire;
    }

    //
    // Reporter implementation
    //

    /**
     * Compiles and writes a multi-line report on this thread's status:
     * name, current URI and processor, WAITING/ACTIVE duration, current
     * step, and JVM thread state with stack trace.
     *
     * @param pw Where to print.
     */
    @Override
    public void reportTo(PrintWriter pw) {
        pw.print("[");
        pw.println(getName());

        // Make a local copy of the currentCuri reference in case it gets
        // nulled while we're using it.  We're doing this because
        // alternative is synchronizing and we don't want to do this --
        // it causes hang ups as controller waits on a lock for this thread,
        // something it gets easily enough on old threading model but something
        // it can wait interminably for on NPTL threading model.
        // See [ 994946 ] Pause/Terminate ignored on 2.6 kernel 1.5 JVM.
        CrawlURI c = currentCuri;
        if(c != null) {
            pw.print(" ");
            c.shortReportLineTo(pw);
            pw.print("    ");
            pw.print(c.getFetchAttempts());
            pw.print(" attempts");
            pw.println();
            pw.print("    ");
            pw.print("in processor: ");
            pw.print(currentProcessorName);
        } else {
            pw.print(" -no CrawlURI- ");
        }
        pw.println();

        long now = System.currentTimeMillis();
        long time = 0;

        pw.print("    ");
        if(lastFinishTime > lastStartTime) {
            // That means we finished something after we last started something
            // or in other words we are not working on anything.
            pw.print("WAITING for ");
            time = now - lastFinishTime;
        } else if(lastStartTime > 0) {
            // We are working on something
            pw.print("ACTIVE for ");
            time = now-lastStartTime;
        }
        pw.print(ArchiveUtils.formatMillisecondsToConventional(time));
        pw.println();

        pw.print("    ");
        pw.print("step: ");
        pw.print(step);
        pw.print(" for ");
        pw.print(ArchiveUtils.formatMillisecondsToConventional(now-atStepSince));
        pw.println();

        reportThread(this, pw);
        pw.print("]");
        pw.println();

        pw.flush();
    }

    /**
     * Writes the JVM-level state of a thread: its thread state, the lock it
     * is blocked/waiting on (with owner) if any, and its stack trace.
     *
     * <p>{@code ThreadMXBean.getThreadInfo} returns null for a thread that is
     * not alive (not yet started, or already terminated); in that case the
     * state is taken from {@link Thread#getState()} and no lock is reported.
     *
     * @param t Thread
     * @param pw PrintWriter
     */
    static public void reportThread(Thread t, PrintWriter pw) {
        ThreadMXBean tmxb = ManagementFactory.getThreadMXBean();
        ThreadInfo info = tmxb.getThreadInfo(t.getId());
        pw.print("Java Thread State: ");
        if (info == null) {
            // thread is not alive: no ThreadInfo is available for it
            pw.println(t.getState());
            pw.print("Blocked/Waiting On: ");
            pw.println("NONE");
        } else {
            pw.println(info.getThreadState());
            pw.print("Blocked/Waiting On: ");
            if (info.getLockOwnerId() >= 0) {
                pw.print(info.getLockName());
                pw.print(" which is owned by ");
                pw.print(info.getLockOwnerName());
                pw.print("(");
                pw.print(info.getLockOwnerId());
                pw.println(")");
            } else {
                pw.println("NONE");
            }
        }

        StackTraceElement[] ste = t.getStackTrace();
        for(int i=0;i<ste.length;i++) {
            pw.print("    ");
            pw.print(ste[i].toString());
            pw.println();
        }
    }

    /**
     * @return insertion-ordered map of this thread's status: serial number,
     * current URI (null when idle) with processor and fetch attempts,
     * WAITING/ACTIVE status with elapsed time, and current step
     */
    @Override
    public Map<String, Object> shortReportMap() {
        Map<String,Object> data = new LinkedHashMap<String, Object>();
        data.put("serialNumber", serialNumber);
        // local copy: the field may be nulled concurrently by the worker
        CrawlURI c = currentCuri;
        if (c != null) {
            data.put("currentURI", c.toString());
            data.put("currentProcessor", currentProcessorName);
            data.put("fetchAttempts", c.getFetchAttempts());
        } else {
            data.put("currentURI", null);
        }

        long now = System.currentTimeMillis();
        long time = 0;
        if (lastFinishTime > lastStartTime) {
            data.put("status", "WAITING");
            time = now - lastFinishTime;
        } else if (lastStartTime > 0) {
            data.put("status", "ACTIVE");
            time = now - lastStartTime;
        }
        data.put("currentStatusElapsedMilliseconds", time);
        data.put("currentStatusElapsedPretty", ArchiveUtils.formatMillisecondsToConventional(time));
        data.put("step", step);
        return data;
    }

    /**
     * Writes a one-line status summary in the format described by
     * {@link #shortReportLegend()}, terminated by a newline.
     *
     * @param w PrintWriter to write to.
     */
    @Override
    public void shortReportLineTo(PrintWriter w)
    {
        w.print("#");
        w.print(this.serialNumber);

        // Make a local copy of the currentCuri reference in case it gets
        // nulled while we're using it.  We're doing this because
        // alternative is synchronizing and we don't want to do this --
        // it causes hang ups as controller waits on a lock for this thread,
        // something it gets easily enough on old threading model but something
        // it can wait interminably for on NPTL threading model.
        // See [ 994946 ] Pause/Terminate ignored on 2.6 kernel 1.5 JVM.
        CrawlURI c = currentCuri;
        if(c != null) {
            w.print(" ");
            w.print(currentProcessorName);
            w.print(" ");
            w.print(c.toString());
            w.print(" (");
            w.print(c.getFetchAttempts());
            w.print(") ");
        } else {
            w.print(" [no CrawlURI] ");
        }

        long now = System.currentTimeMillis();
        long time = 0;

        if(lastFinishTime > lastStartTime) {
            // That means we finished something after we last started something
            // or in other words we are not working on anything.
            w.print("WAITING for ");
            time = now - lastFinishTime;
        } else if(lastStartTime > 0) {
            // We are working on something
            w.print("ACTIVE for ");
            time = now-lastStartTime;
        }
        w.print(ArchiveUtils.formatMillisecondsToConventional(time));
        w.print(" at ");
        w.print(step);
        w.print(" for ");
        w.print(ArchiveUtils.formatMillisecondsToConventional(now-atStepSince));
        w.print("\n");
        w.flush();
    }

    /**
     * @return legend describing the fields of the short report line
     */
    @Override
    public String shortReportLegend() {
        return "#serialNumber processorName currentUri (fetchAttempts) threadState threadStep";
    }

    /**
     * @return the short report line for this thread, as a String
     */
    public String shortReportLine() {
        return ReportUtils.shortReportLine(this);
    }

    /**
     * Writes the controller's current progress-statistics line, followed by
     * a newline.
     *
     * @param writer where to write
     */
    public void progressStatisticsLine(PrintWriter writer) {
        writer.print(getController().getStatisticsTracker()
            .getSnapshot().getProgressStatisticsLine());
        writer.print("\n");
    }

    /**
     * Writes the legend for the progress-statistics line, followed by a
     * newline.
     *
     * @param writer where to write
     */
    public void progressStatisticsLegend(PrintWriter writer) {
        writer.print(getController().getStatisticsTracker()
            .progressStatisticsLegend());
        writer.print("\n");
    }

    /**
     * @return name of the processor most recently recorded via
     * {@link #setStep(Step, String)}; empty string if none
     */
    public String getCurrentProcessorName() {
        return currentProcessorName;
    }


    /**
     * Looks up the IP recorded for a host in the controller's server cache.
     *
     * @param host host name to look up
     * @return the address held by the server cache entry for the host
     */
    public InetAddress resolve(String host) {
        return controller.getServerCache().getHostFor(host).getIP();
    }
}