001/*
002 * #%L
003 * Netarchivesuite - wayback
004 * %%
005 * Copyright (C) 2005 - 2014 The Royal Danish Library, the Danish State and University Library,
006 *             the National Library of France and the Austrian National Library.
007 * %%
008 * This program is free software: you can redistribute it and/or modify
009 * it under the terms of the GNU Lesser General Public License as
010 * published by the Free Software Foundation, either version 2.1 of the
011 * License, or (at your option) any later version.
012 *
013 * This program is distributed in the hope that it will be useful,
014 * but WITHOUT ANY WARRANTY; without even the implied warranty of
015 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
016 * GNU General Lesser Public License for more details.
017 *
018 * You should have received a copy of the GNU General Lesser Public
019 * License along with this program.  If not, see
020 * <http://www.gnu.org/licenses/lgpl-2.1.html>.
021 * #L%
022 */
023
024package dk.netarkivet.wayback.batch;
025
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;

import org.archive.io.warc.WARCRecord;
import org.archive.wayback.UrlCanonicalizer;
import org.archive.wayback.core.CaptureSearchResult;
import org.archive.wayback.resourceindex.cdx.SearchResultToCDXLineAdapter;
import org.archive.wayback.resourcestore.indexer.WARCRecordToSearchResultAdapter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import dk.netarkivet.common.Constants;
import dk.netarkivet.common.exceptions.IOFailure;
import dk.netarkivet.common.utils.batch.WARCBatchFilter;
import dk.netarkivet.common.utils.warc.WARCBatchJob;
041
042/**
043 * Returns a cdx file using the appropriate format for wayback, including canonicalisation of urls. The returned files
044 * are unsorted.
045 */
046@SuppressWarnings({"serial"})
047public class WaybackCDXExtractionWARCBatchJob extends WARCBatchJob {
048
049    /** Logger for this class. */
050    private static final Logger log = LoggerFactory.getLogger(WaybackCDXExtractionWARCBatchJob.class);
051
052    /** Utility for converting an WArcRecord to a CaptureSearchResult (wayback's representation of a CDX record). */
053    private WARCRecordToSearchResultAdapter aToSAdapter;
054
055    /** Utility for converting a wayback CaptureSearchResult to a String representing a line in a CDX file. */
056    private SearchResultToCDXLineAdapter srToCDXAdapter;
057
058    /**
059     * Constructor which set timeout to one day.
060     */
061    public WaybackCDXExtractionWARCBatchJob() {
062        batchJobTimeout = Constants.ONE_DAY_IN_MILLIES;
063    }
064
065    /**
066     * Set the filter, so only response records are currently processed.
067     */
068    @Override
069    public WARCBatchFilter getFilter() {
070        return WARCBatchFilter.EXCLUDE_NON_RESPONSE_RECORDS;
071    }
072
073    /**
074     * Alternate constructor, where a timeout can be set.
075     *
076     * @param timeout specific timeout period
077     */
078    public WaybackCDXExtractionWARCBatchJob(long timeout) {
079        batchJobTimeout = timeout;
080    }
081
082    /**
083     * Initializes the private fields of this class. Some of these are relatively heavy objects, so it is important that
084     * they are only initialised once.
085     *
086     * @param os unused argument
087     */
088    @Override
089    public void initialize(OutputStream os) {
090        log.info("Starting a {}", this.getClass().getName());
091        aToSAdapter = new WARCRecordToSearchResultAdapter();
092        UrlCanonicalizer uc = UrlCanonicalizerFactory.getDefaultUrlCanonicalizer();
093        aToSAdapter.setCanonicalizer(uc);
094        srToCDXAdapter = new SearchResultToCDXLineAdapter();
095    }
096
097    /**
098     * Does nothing except log the end of the job.
099     *
100     * @param os unused argument.
101     */
102    public void finish(OutputStream os) {
103        log.info("Finishing the {}", this.getClass().getName());
104        // No cleanup required
105    }
106
107    /**
108     * For each response WARCRecord it writes one CDX line (including newline) to the output. If an warcrecord cannot be
109     * converted to a CDX record for any reason then any resulting exception is caught and logged.
110     *
111     * @param record the WARCRecord to be indexed.
112     * @param os the OutputStream to which output is written.
113     */
114    @Override
115    public void processRecord(WARCRecord record, OutputStream os) {
116        CaptureSearchResult csr = null;
117        try {
118            csr = aToSAdapter.adapt(record);
119        } catch (Exception e) {
120            log.error("Exception processing WARC record:", e);
121        }
122        try {
123            if (csr != null) {
124                os.write(srToCDXAdapter.adapt(csr).getBytes());
125                os.write("\n".getBytes());
126            }
127        } catch (IOException e) {
128            throw new IOFailure("Write error in batch job", e);
129        } catch (Exception e) {
130            log.error("Exception processing WARC record:", e);
131        }
132    }
133
134}