001/*
002 * #%L
003 * Netarchivesuite - common
004 * %%
005 * Copyright (C) 2005 - 2014 The Royal Danish Library, the Danish State and University Library,
006 *             the National Library of France and the Austrian National Library.
007 * %%
008 * This program is free software: you can redistribute it and/or modify
009 * it under the terms of the GNU Lesser General Public License as
010 * published by the Free Software Foundation, either version 2.1 of the
011 * License, or (at your option) any later version.
012 * 
013 * This program is distributed in the hope that it will be useful,
014 * but WITHOUT ANY WARRANTY; without even the implied warranty of
015 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
016 * GNU General Lesser Public License for more details.
017 * 
018 * You should have received a copy of the GNU General Lesser Public
019 * License along with this program.  If not, see
020 * <http://www.gnu.org/licenses/lgpl-2.1.html>.
021 * #L%
022 */
023package dk.netarkivet.common.utils.cdx;
024
025import java.io.IOException;
026import java.io.OutputStream;
027import java.util.HashMap;
028import java.util.Map;
029
030import org.archive.io.warc.WARCRecord;
031import org.jwat.common.ByteCountingPushBackInputStream;
032import org.jwat.common.ContentType;
033import org.jwat.common.HttpHeader;
034import org.slf4j.Logger;
035import org.slf4j.LoggerFactory;
036
037import dk.netarkivet.common.Constants;
038import dk.netarkivet.common.exceptions.IOFailure;
039import dk.netarkivet.common.utils.ChecksumCalculator;
040import dk.netarkivet.common.utils.archive.ArchiveHeaderBase;
041import dk.netarkivet.common.utils.archive.ArchiveRecordBase;
042import dk.netarkivet.common.utils.archive.HeritrixArchiveRecordWrapper;
043import dk.netarkivet.common.utils.batch.WARCBatchFilter;
044import dk.netarkivet.common.utils.warc.WARCBatchJob;
045
046/**
047 * Batch job that extracts information to create a CDX file.
048 * <p>
049 * A CDX file contains sorted lines of metadata from the WARC files, with each line followed by the file and offset the
050 * record was found at, and optionally a checksum. The timeout of this job is 7 days. See
051 * http://www.archive.org/web/researcher/cdx_file_format.php
052 */
053@SuppressWarnings({"serial"})
054public class WARCExtractCDXJob extends WARCBatchJob {
055
056    /** Logger for this class. */
057    private static final Logger log = LoggerFactory.getLogger(WARCExtractCDXJob.class);
058
059    /** An encoding for the standard included metadata fields without checksum. */
060    private static final String[] STD_FIELDS_EXCL_CHECKSUM = {"A", "e", "b", "m", "n", "g", "v"};
061
062    /** An encoding for the standard included metadata fields with checksum. */
063    private static final String[] STD_FIELDS_INCL_CHECKSUM = {"A", "e", "b", "m", "n", "g", "v", "c"};
064
065    /** The fields to be included in CDX output. */
066    private String[] fields;
067
068    /** True if we put an MD5 in each CDX line as well. */
069    private boolean includeChecksum;
070
071    /**
072     * Constructs a new job for extracting CDX indexes.
073     *
074     * @param includeChecksum If true, an MD5 checksum is also written for each record. If false, it is not.
075     */
076    public WARCExtractCDXJob(boolean includeChecksum) {
077        this.fields = includeChecksum ? STD_FIELDS_INCL_CHECKSUM : STD_FIELDS_EXCL_CHECKSUM;
078        this.includeChecksum = includeChecksum;
079        batchJobTimeout = 7 * Constants.ONE_DAY_IN_MILLIES;
080    }
081
082    /**
083     * Equivalent to WARCExtractCDXJob(true).
084     */
085    public WARCExtractCDXJob() {
086        this(true);
087    }
088
089    /**
090     * Filters out the NON-RESPONSE records.
091     *
092     * @return The filter that defines what WARC records are wanted in the output CDX file.
093     * @see dk.netarkivet.common.utils.warc.WARCBatchJob#getFilter()
094     */
095    @Override
096    public WARCBatchFilter getFilter() {
097        // Per default we want to index all response records.
098        return WARCBatchFilter.EXCLUDE_NON_RESPONSE_RECORDS;
099    }
100
101    /**
102     * Initialize any data needed (none).
103     *
104     * @see dk.netarkivet.common.utils.warc.WARCBatchJob#initialize(OutputStream)
105     */
106    @Override
107    public void initialize(OutputStream os) {
108    }
109
110    /**
111     * Process this entry, reading metadata into the output stream.
112     *
113     * @throws IOFailure on trouble reading WARC record data
114     * @see dk.netarkivet.common.utils.warc.WARCBatchJob#processRecord(WARCRecord, OutputStream)
115     */
116    @Override
117    public void processRecord(WARCRecord sar, OutputStream os) {
118        log.trace("Processing WARCRecord with offset: {}", sar.getHeader().getOffset());
119        /*
120         * Fields are stored in a map so that it's easy to pull them out when looking at the fieldarray.
121         */
122        ArchiveRecordBase record = new HeritrixArchiveRecordWrapper(sar);
123        ArchiveHeaderBase header = record.getHeader();
124        Map<String, String> fieldsread = new HashMap<String, String>();
125        fieldsread.put("A", header.getUrl());
126        fieldsread.put("e", header.getIp());
127        fieldsread.put("b", header.getArcDateStr());
128        fieldsread.put("n", Long.toString(header.getLength()));
129
130        /*
131         * Note about offset: The original dk.netarkivet.ArcUtils.ExtractCDX yields offsets that are consistently 1
132         * lower than this version, which pulls the offset value from the org.archive.io.arc-classes. This difference is
133         * that the former classes count the preceeding newline as part of the ARC header.
134         */
135        fieldsread.put("v", Long.toString(sar.getHeader().getOffset()));
136        fieldsread.put("g", sar.getHeader().getReaderIdentifier());
137
138        String mimeType = header.getMimetype();
139        String msgType;
140        ContentType contentType = ContentType.parseContentType(mimeType);
141        boolean bResponse = false;
142        if (contentType != null) {
143            if ("application".equals(contentType.contentType) && "http".equals(contentType.mediaType)) {
144                msgType = contentType.getParameter("msgtype");
145                if ("response".equals(msgType)) {
146                    bResponse = true;
147                } else if ("request".equals(msgType)) {
148                }
149            }
150            mimeType = contentType.toStringShort();
151        }
152        ByteCountingPushBackInputStream pbin = new ByteCountingPushBackInputStream(sar, 8192);
153        HttpHeader httpResponse = null;
154        if (bResponse) {
155            try {
156                httpResponse = HttpHeader.processPayload(HttpHeader.HT_RESPONSE, pbin, header.getLength(), null);
157                if (httpResponse != null && httpResponse.contentType != null) {
158                    contentType = ContentType.parseContentType(httpResponse.contentType);
159                    if (contentType != null) {
160                        mimeType = contentType.toStringShort();
161                    }
162                }
163            } catch (IOException e) {
164                throw new IOFailure("Error reading WARC httpresponse header", e);
165            }
166        }
167        fieldsread.put("m", mimeType);
168
169        /* Only include checksum if necessary: */
170        if (includeChecksum) {
171            // InputStream instream = sar; //Note: ARCRecord extends InputStream
172            // fieldsread.put("c", MD5.generateMD5(instream));
173            fieldsread.put("c", ChecksumCalculator.calculateMd5(pbin));
174        }
175
176        if (httpResponse != null) {
177            try {
178                httpResponse.close();
179            } catch (IOException e) {
180                throw new IOFailure("Error closing WARC httpresponse header", e);
181            }
182        }
183
184        printFields(fieldsread, os);
185    }
186
187    /**
188     * End of the batch job.
189     *
190     * @see dk.netarkivet.common.utils.warc.WARCBatchJob#finish(OutputStream)
191     */
192    @Override
193    public void finish(OutputStream os) {
194    }
195
196    /**
197     * Print the values found for a set of fields. Prints the '-' character for any null values.
198     *
199     * @param fieldsread A hashtable of values indexed by field letters
200     * @param outstream The outputstream to write the values to
201     */
202    private void printFields(Map<String, String> fieldsread, OutputStream outstream) {
203        StringBuffer sb = new StringBuffer();
204
205        for (int i = 0; i < fields.length; i++) {
206            Object o = fieldsread.get(fields[i]);
207            sb.append((i > 0) ? " " : "");
208            sb.append((o == null) ? "-" : o.toString());
209        }
210        sb.append("\n");
211        try {
212            outstream.write(sb.toString().getBytes("UTF-8"));
213        } catch (IOException e) {
214            throw new IOFailure("Error writing CDX line '" + sb + "' to batch outstream", e);
215        }
216    }
217
218    /**
219     * @return Humanly readable description of this instance.
220     */
221    public String toString() {
222        return getClass().getName() + ", with Filter: " + getFilter() + ", include checksum = " + includeChecksum;
223    }
224
225}