001/*
002 *  This file is part of the Heritrix web crawler (crawler.archive.org).
003 *
004 *  Licensed to the Internet Archive (IA) by one or more individual 
005 *  contributors. 
006 *
007 *  The IA licenses this file to You under the Apache License, Version 2.0
008 *  (the "License"); you may not use this file except in compliance with
009 *  the License.  You may obtain a copy of the License at
010 *
011 *      http://www.apache.org/licenses/LICENSE-2.0
012 *
013 *  Unless required by applicable law or agreed to in writing, software
014 *  distributed under the License is distributed on an "AS IS" BASIS,
015 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 *  See the License for the specific language governing permissions and
017 *  limitations under the License.
018 */
019
020package org.archive.crawler.framework;
021
022import static org.archive.modules.CoreAttributeConstants.A_RUNTIME_EXCEPTION;
023import static org.archive.modules.fetcher.FetchStatusCodes.S_PROCESSING_THREAD_KILLED;
024import static org.archive.modules.fetcher.FetchStatusCodes.S_RUNTIME_EXCEPTION;
025import static org.archive.modules.fetcher.FetchStatusCodes.S_SERIOUS_ERROR;
026
027import java.io.PrintWriter;
028import java.lang.management.ManagementFactory;
029import java.lang.management.ThreadInfo;
030import java.lang.management.ThreadMXBean;
031import java.net.InetAddress;
032import java.util.LinkedHashMap;
033import java.util.Map;
034import java.util.logging.Level;
035import java.util.logging.Logger;
036
037import org.archive.io.SinkHandlerLogThread;
038import org.archive.modules.CrawlURI;
039import org.archive.modules.Processor;
040import org.archive.modules.ProcessorChain.ChainStatusReceiver;
041import org.archive.modules.fetcher.HostResolver;
042import org.archive.spring.KeyedProperties;
043import org.archive.util.ArchiveUtils;
044import org.archive.util.DevUtils;
045import org.archive.util.ProgressStatisticsReporter;
046import org.archive.util.Recorder;
047import org.archive.util.ReportUtils;
048import org.archive.util.Reporter;
049
050import com.sleepycat.util.RuntimeExceptionWrapper;
051
052/**
053 * One "worker thread"; asks for CrawlURIs, processes them,
054 * repeats unless told otherwise.
055 *
056 * @author Gordon Mohr
057 */
058public class ToeThread extends Thread
059implements Reporter, ProgressStatisticsReporter, 
060           HostResolver, SinkHandlerLogThread, ChainStatusReceiver {
061
    /**
     * Coarse stages of a ToeThread's work loop. The current value is
     * recorded through {@link #setStep(Step, String)} and printed in the
     * status reports; a new thread starts out as {@code NASCENT}.
     */
    public enum Step {
        NASCENT, ABOUT_TO_GET_URI, FINISHED, 
        ABOUT_TO_BEGIN_PROCESSOR, HANDLING_RUNTIME_EXCEPTION, 
        ABOUT_TO_RETURN_URI, FINISHING_PROCESS
    }
067
    // Logger shared by all ToeThread instances.
    private static Logger logger =
        Logger.getLogger("org.archive.crawler.framework.ToeThread");

    // Owning controller; set to null at the very end of run().
    private CrawlController controller;
    // Serial number given at construction; also stamped on each CrawlURI.
    private int serialNumber;
    
    /**
     * Each ToeThread has one Recorder instance that is reused for
     * every URI the thread processes.
     * 
     * @see org.archive.util.RecorderMarker
     */
    private Recorder httpRecorder = null;

    // activity monitoring, debugging, and problem detection
    private Step step = Step.NASCENT;
    // System.currentTimeMillis() value taken at the last setStep() call
    private long atStepSince;
    // processor name passed to the last setStep() call; "" when none was given
    private String currentProcessorName = "";
    
    // thread-name prefix ("ToeThread #<sn>: "); the current URI is appended to it
    private String coreName;
    // URI currently held by this thread, or null when there is none
    private CrawlURI currentCuri;
    // wall-clock millis when the latest URI was picked up (0 until the first one)
    private long lastStartTime;
    // wall-clock millis when the latest URI was finished (initialized at construction)
    private long lastFinishTime;

    
    // default priority; may not be meaningful in recent JVMs
    private static final int DEFAULT_PRIORITY = Thread.NORM_PRIORITY-2;
    
    // indicator that a thread is now surplus based on current desired
    // count; it should wrap up cleanly
    private volatile boolean shouldRetire = false;
099    
100    /**
101     * Create a ToeThread
102     * 
103     * @param g ToeThreadGroup
104     * @param sn serial number
105     */
106    public ToeThread(ToePool g, int sn) {
107        // TODO: add crawl name?
108        super(g,"ToeThread #" + sn);
109        coreName="ToeThread #" + sn + ": ";
110        controller = g.getController();
111        serialNumber = sn;
112        setPriority(DEFAULT_PRIORITY);
113        int outBufferSize = controller.getRecorderOutBufferBytes();
114        int inBufferSize = controller.getRecorderInBufferBytes();
115        httpRecorder = new Recorder(controller.getScratchDir().getFile(),
116            "tt" + sn + "http", outBufferSize, inBufferSize);
117        lastFinishTime = System.currentTimeMillis();
118    }
119
120    /** (non-Javadoc)
121     * @see java.lang.Thread#run()
122     */
123    public void run() {
124        String name = controller.getMetadata().getJobName();
125        logger.fine(getName()+" started for order '"+name+"'");
126        Recorder.setHttpRecorder(httpRecorder); 
127        
128        try {
129            while ( true ) {
130                ArchiveUtils.continueCheck();
131                
132                setStep(Step.ABOUT_TO_GET_URI, null);
133
134                CrawlURI curi = controller.getFrontier().next();
135                
136                
137                synchronized(this) {
138                    ArchiveUtils.continueCheck();
139                    setCurrentCuri(curi);
140                    currentCuri.setThreadNumber(this.serialNumber);
141                    lastStartTime = System.currentTimeMillis();
142                    currentCuri.setRecorder(httpRecorder);
143                }
144                
145                try {
146                    KeyedProperties.loadOverridesFrom(curi);
147                    
148                    controller.getFetchChain().process(curi,this);
149                    
150                    controller.getFrontier().beginDisposition(curi);
151                    
152                    controller.getDispositionChain().process(curi,this);
153  
154                } catch (RuntimeExceptionWrapper e) {
155                    // Workaround to get cause from BDB
156                    if(e.getCause() == null) {
157                        e.initCause(e.getCause());
158                    }
159                    recoverableProblem(e);
160                } catch (AssertionError ae) {
161                    // This risks leaving crawl in fatally inconsistent state, 
162                    // but is often reasonable for per-Processor assertion problems 
163                    recoverableProblem(ae);
164                } catch (RuntimeException e) {
165                    recoverableProblem(e);
166                } catch (InterruptedException e) {
167                    if(currentCuri!=null) {
168                        recoverableProblem(e);
169                        Thread.interrupted(); // clear interrupt status
170                    } else {
171                        throw e;
172                    }
173                } catch (StackOverflowError err) {
174                    recoverableProblem(err);
175                } catch (Error err) {
176                    // OutOfMemory and any others
177                    seriousError(err); 
178                } finally {
179                    httpRecorder.endReplays();
180                    KeyedProperties.clearOverridesFrom(curi); 
181                }
182                
183                setStep(Step.ABOUT_TO_RETURN_URI, null);
184                ArchiveUtils.continueCheck();
185
186                synchronized(this) {
187                        // Tentative fix for https://sbforge.org/jira/browse/NAS-2560
188                        try {
189                                // Report the URI (currentCuri) being processed as having finished processing.
190                                controller.getFrontier().finished(currentCuri);
191                        } catch (NullPointerException e) {
192                                // Let's try ignoring this NullPointerException and keep on processing
193                                logger.log(Level.SEVERE, "NPE caught during call to .getFrontier().finished "
194                                                + " with argument currentCuri = '" + currentCuri + '"', e);
195                        }
196                    controller.getFrontier().endDisposition();
197                    setCurrentCuri(null);
198                }
199                curi = null;
200                
201                setStep(Step.FINISHING_PROCESS, null);
202                lastFinishTime = System.currentTimeMillis();
203                if(shouldRetire) {
204                    break; // from while(true)
205                }
206            }
207        } catch (InterruptedException e) {
208            if(currentCuri!=null){
209                logger.log(Level.SEVERE,"Interrupt leaving unfinished CrawlURI "+getName()+" - job may hang",e);
210            }
211            // thread interrupted, ok to end
212            logger.log(Level.FINE,this.getName()+ " ended with Interruption");
213        } catch (Exception e) {
214            // everything else (including interruption)
215            logger.log(Level.SEVERE,"Fatal exception in "+getName(),e);
216        } catch (OutOfMemoryError err) {
217            seriousError(err);
218        } finally {
219            controller.getFrontier().endDisposition();
220
221        }
222
223        setCurrentCuri(null);
224        // Do cleanup so that objects can be GC.
225        this.httpRecorder.closeRecorders();
226        this.httpRecorder = null;
227
228        logger.fine(getName()+" finished for order '"+name+"'");
229        setStep(Step.FINISHED, null);
230        controller = null;
231    }
232
233    /**
234     * Set currentCuri, updating thread name as appropriate
235     * @param curi
236     */
237    private void setCurrentCuri(CrawlURI curi) {
238        if(curi==null) {
239            setName(coreName);
240        } else {
241            setName(coreName+curi);
242        }
243        currentCuri = curi;
244    }
245
246    /**
247     * @param s
248     */
249    public void setStep(Step s, String procName) {
250        step=s;
251        atStepSince = System.currentTimeMillis();
252        currentProcessorName = procName != null ? procName : "";
253    }
254    
255    public void atProcessor(Processor proc) {
256        setStep(Step.ABOUT_TO_BEGIN_PROCESSOR, proc.getBeanName());
257    }
258
259    private void seriousError(Error err) {
260        // try to prevent timeslicing until we have a chance to deal with OOM
261        // Note that modern-day JVM priority indifference with native threads
262        // may make this priority-jumbling pointless
263        setPriority(DEFAULT_PRIORITY+1);  
264        if (controller!=null) {
265            // hold all ToeThreads from proceeding to next processor
266            controller.freeReserveMemory();
267            controller.requestCrawlPause();
268            if (controller.getFrontier().getFrontierJournal() != null) {
269                controller.getFrontier().getFrontierJournal().seriousError(
270                    getName() + err.getMessage());
271            }
272        }
273        
274        // OutOfMemory etc.
275        String extraInfo = DevUtils.extraInfo();
276        System.err.println("<<<");
277        System.err.println(ArchiveUtils.getLog17Date());
278        System.err.println(err);
279        System.err.println(extraInfo);
280        err.printStackTrace(System.err);
281        
282        if (controller!=null) {
283            PrintWriter pw = new PrintWriter(System.err);
284            controller.getToePool().compactReportTo(pw);
285            pw.flush();
286        }
287        System.err.println(">>>");
288//        DevUtils.sigquitSelf();
289        
290        String context = "unknown";
291        if(currentCuri!=null) {
292            // update fetch-status, saving original as annotation
293            currentCuri.getAnnotations().add("err="+err.getClass().getName());
294            currentCuri.getAnnotations().add("os"+currentCuri.getFetchStatus());
295                        currentCuri.setFetchStatus(S_SERIOUS_ERROR);
296            context = currentCuri.shortReportLine() + " in " + currentProcessorName;
297         }
298        String message = "Serious error occured trying " +
299            "to process '" + context + "'\n" + extraInfo;
300        logger.log(Level.SEVERE, message.toString(), err);
301        setPriority(DEFAULT_PRIORITY);
302    }
303
304    /**
305     * Handling for exceptions and errors that are possibly recoverable.
306     * 
307     * @param e
308     */
309    private void recoverableProblem(Throwable e) {
310        Object previousStep = step;
311        setStep(Step.HANDLING_RUNTIME_EXCEPTION, null);
312        //e.printStackTrace(System.err);
313        currentCuri.setFetchStatus(S_RUNTIME_EXCEPTION);
314        // store exception temporarily for logging
315        currentCuri.getAnnotations().add("err="+e.getClass().getName());
316        currentCuri.getData().put(A_RUNTIME_EXCEPTION, e);
317        String message = "Problem " + e + 
318                " occured when trying to process '"
319                + currentCuri.toString()
320                + "' at step " + previousStep 
321                + " in " + currentProcessorName +"\n";
322        logger.log(Level.SEVERE, message.toString(), e);
323    }
324
325
326    /**
327     * @return Return toe thread serial number.
328     */
329    public int getSerialNumber() {
330        return this.serialNumber;
331    }
332    
333    /** Get the CrawlController acossiated with this thread.
334     *
335     * @return Returns the CrawlController.
336     */
337    public CrawlController getController() {
338        return controller;
339    }
340
341    /**
342     * Terminates a thread.
343     *
344     * <p> Calling this method will ensure that the current thread will stop
345     * processing as soon as possible (note: this may be never). Meant to
346     * 'short circuit' hung threads.
347     *
348     * <p> Current crawl uri will have its fetch status set accordingly and
349     * will be immediately returned to the frontier.
350     *
351     * <p> As noted before, this does not ensure that the thread will stop
352     * running (ever). But once evoked it will not try and communicate with
353     * other parts of crawler and will terminate as soon as control is
354     * established.
355     */
356    protected void kill(){
357        this.interrupt();
358        synchronized(this) {
359            if (currentCuri!=null) {
360                currentCuri.setFetchStatus(S_PROCESSING_THREAD_KILLED);
361                controller.getFrontier().finished(currentCuri);
362             }
363        }
364    }
365
366        /**
367         * @return Current step (For debugging/reporting, give abstract step
368     * where this thread is).
369         */
370        public Object getStep() {
371                return step;
372        }
373
374    /**
375     * Is this thread validly processing a URI, not paused, waiting for 
376     * a URI, or interrupted?
377     * @return whether thread is actively processing a URI
378     */
379    public boolean isActive() {
380        // if alive and not waiting in/for frontier.next(), we're 'active'
381        return this.isAlive() && (currentCuri != null) && !isInterrupted();
382    }
383    
384    /**
385     * Request that this thread retire (exit cleanly) at the earliest
386     * opportunity.
387     */
388    public void retire() {
389        shouldRetire = true;
390    }
391
392    /**
393     * Whether this thread should cleanly retire at the earliest 
394     * opportunity. 
395     * 
396     * @return True if should retire.
397     */
398    public boolean shouldRetire() {
399        return shouldRetire;
400    }
401
402    //
403    // Reporter implementation
404    // 
405    
406    /**
407     * Compiles and returns a report on its status.
408     * @param pw Where to print.
409     */
410    @Override
411    public void reportTo(PrintWriter pw) {
412        // name is ignored for now: only one kind of report
413        
414        pw.print("[");
415        pw.println(getName());
416
417        // Make a local copy of the currentCuri reference in case it gets
418        // nulled while we're using it.  We're doing this because
419        // alternative is synchronizing and we don't want to do this --
420        // it causes hang ups as controller waits on a lock for this thread,
421        // something it gets easily enough on old threading model but something
422        // it can wait interminably for on NPTL threading model.
423        // See [ 994946 ] Pause/Terminate ignored on 2.6 kernel 1.5 JVM.
424        CrawlURI c = currentCuri;
425        if(c != null) {
426            pw.print(" ");
427            c.shortReportLineTo(pw);
428            pw.print("    ");
429            pw.print(c.getFetchAttempts());
430            pw.print(" attempts");
431            pw.println();
432            pw.print("    ");
433            pw.print("in processor: ");
434            pw.print(currentProcessorName);
435        } else {
436            pw.print(" -no CrawlURI- ");
437        }
438        pw.println();
439
440        long now = System.currentTimeMillis();
441        long time = 0;
442
443        pw.print("    ");
444        if(lastFinishTime > lastStartTime) {
445            // That means we finished something after we last started something
446            // or in other words we are not working on anything.
447            pw.print("WAITING for ");
448            time = now - lastFinishTime;
449        } else if(lastStartTime > 0) {
450            // We are working on something
451            pw.print("ACTIVE for ");
452            time = now-lastStartTime;
453        }
454        pw.print(ArchiveUtils.formatMillisecondsToConventional(time));
455        pw.println();
456
457        pw.print("    ");
458        pw.print("step: ");
459        pw.print(step);
460        pw.print(" for ");
461        pw.print(ArchiveUtils.formatMillisecondsToConventional(System.currentTimeMillis()-atStepSince));
462        pw.println();
463
464        reportThread(this, pw);
465        pw.print("]");
466        pw.println();
467        
468        pw.flush();
469    }
470
471    /**
472     * @param t Thread
473     * @param pw PrintWriter
474     */
475    static public void reportThread(Thread t, PrintWriter pw) {
476        ThreadMXBean tmxb = ManagementFactory.getThreadMXBean();
477        ThreadInfo info = tmxb.getThreadInfo(t.getId());
478        pw.print("Java Thread State: ");
479        pw.println(info.getThreadState());
480        pw.print("Blocked/Waiting On: ");
481        if (info.getLockOwnerId() >= 0) {
482            pw.print(info.getLockName());
483            pw.print(" which is owned by ");
484            pw.print(info.getLockOwnerName());
485            pw.print("(");
486            pw.print(info.getLockOwnerId());
487            pw.println(")");
488        } else {
489            pw.println("NONE");
490        }
491        
492        StackTraceElement[] ste = t.getStackTrace();
493        for(int i=0;i<ste.length;i++) {
494            pw.print("    ");
495            pw.print(ste[i].toString());
496            pw.println();
497        }
498    }
499
500    @Override
501    public Map<String, Object> shortReportMap() {
502        Map<String,Object> data = new LinkedHashMap<String, Object>();
503        data.put("serialNumber", serialNumber);
504        CrawlURI c = currentCuri;
505        if (c != null) {
506            data.put("currentURI", c.toString());
507            data.put("currentProcessor", currentProcessorName);
508            data.put("fetchAttempts", c.getFetchAttempts());
509        } else {
510            data.put("currentURI", null);
511        }
512
513        long now = System.currentTimeMillis();
514        long time = 0;
515        if (lastFinishTime > lastStartTime) {
516            data.put("status", "WAITING");
517            time = now - lastFinishTime;
518        } else if (lastStartTime > 0) {
519            data.put("status", "ACTIVE");
520            time = now - lastStartTime;
521        }
522        data.put("currentStatusElapsedMilliseconds", time);
523        data.put("currentStatusElapsedPretty", ArchiveUtils.formatMillisecondsToConventional(time));
524        data.put("step", step);
525        return data;
526    }
527
528    /**
529     * @param w PrintWriter to write to.
530     */
531    @Override
532    public void shortReportLineTo(PrintWriter w)
533    {
534        w.print("#");
535        w.print(this.serialNumber);
536
537        // Make a local copy of the currentCuri reference in case it gets
538        // nulled while we're using it.  We're doing this because
539        // alternative is synchronizing and we don't want to do this --
540        // it causes hang ups as controller waits on a lock for this thread,
541        // something it gets easily enough on old threading model but something
542        // it can wait interminably for on NPTL threading model.
543        // See [ 994946 ] Pause/Terminate ignored on 2.6 kernel 1.5 JVM.
544        CrawlURI c = currentCuri;
545        if(c != null) {
546            w.print(" ");
547            w.print(currentProcessorName);
548            w.print(" ");
549            w.print(c.toString());
550            w.print(" (");
551            w.print(c.getFetchAttempts());
552            w.print(") ");
553        } else {
554            w.print(" [no CrawlURI] ");
555        }
556        
557        long now = System.currentTimeMillis();
558        long time = 0;
559
560        if(lastFinishTime > lastStartTime) {
561            // That means we finished something after we last started something
562            // or in other words we are not working on anything.
563            w.print("WAITING for ");
564            time = now - lastFinishTime;
565        } else if(lastStartTime > 0) {
566            // We are working on something
567            w.print("ACTIVE for ");
568            time = now-lastStartTime;
569        }
570        w.print(ArchiveUtils.formatMillisecondsToConventional(time));
571        w.print(" at ");
572        w.print(step);
573        w.print(" for ");
574        w.print(ArchiveUtils.formatMillisecondsToConventional(now-atStepSince));
575        w.print("\n");
576        w.flush();
577    }
578
579    @Override
580    public String shortReportLegend() {
581        return "#serialNumber processorName currentUri (fetchAttempts) threadState threadStep";
582    }
583
584    public String shortReportLine() {
585        return ReportUtils.shortReportLine(this);
586    }
587
588    public void progressStatisticsLine(PrintWriter writer) {
589        writer.print(getController().getStatisticsTracker()
590            .getSnapshot().getProgressStatisticsLine());
591        writer.print("\n");
592    }
593
594    public void progressStatisticsLegend(PrintWriter writer) {
595        writer.print(getController().getStatisticsTracker()
596            .progressStatisticsLegend());
597        writer.print("\n");
598    }
599    
600    public String getCurrentProcessorName() {
601        return currentProcessorName;
602    }
603    
604    
605    public InetAddress resolve(String host) {
606        return controller.getServerCache().getHostFor(host).getIP();
607    }
608}