/*
 * #%L
 * Netarchivesuite - common
 * %%
 * Copyright (C) 2005 - 2014 The Royal Danish Library, the Danish State and University Library,
 *             the National Library of France and the Austrian National Library.
 * %%
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License as
 * published by the Free Software Foundation, either version 2.1 of the
 * License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Lesser Public License for more details.
 *
 * You should have received a copy of the GNU General Lesser Public
 * License along with this program.  If not, see
 * <http://www.gnu.org/licenses/lgpl-2.1.html>.
 * #L%
 */
package dk.netarkivet.common.utils.cdx;

import java.io.IOException;
import java.io.OutputStream;
import java.util.HashMap;
import java.util.Map;

import org.jwat.common.ByteCountingPushBackInputStream;
import org.jwat.common.ContentType;
import org.jwat.common.HttpHeader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import dk.netarkivet.common.Constants;
import dk.netarkivet.common.exceptions.IOFailure;
import dk.netarkivet.common.utils.ChecksumCalculator;
import dk.netarkivet.common.utils.archive.ArchiveBatchJob;
import dk.netarkivet.common.utils.archive.ArchiveHeaderBase;
import dk.netarkivet.common.utils.archive.ArchiveRecordBase;
import dk.netarkivet.common.utils.batch.ArchiveBatchFilter;

/**
 * Batch job that extracts information to create a CDX file.
 * <p>
 * A CDX file contains sorted lines of metadata from ARC/WARC files; each line also gives the archive file and offset
 * at which the record was found, and optionally a checksum. The timeout of this job is 7 days. See
 * http://www.archive.org/web/researcher/cdx_file_format.php
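 * <p>
 * With the default field set each line holds, in order: URL, IP address, archive date, MIME type, record length,
 * archive file name and offset, followed by an MD5 checksum when requested. An illustrative line (made-up values)
 * might look like: {@code http://example.org/ 192.0.2.1 20140101120000 text/html 1234 file-1.arc 5678}.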
 */
@SuppressWarnings({"serial", "unused"})
public class ArchiveExtractCDXJob extends ArchiveBatchJob {

    /** Logger for this class. */
    private static final Logger log = LoggerFactory.getLogger(ArchiveExtractCDXJob.class);

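    /*
     * Field letters as they are populated in processRecord(): A = URL, e = IP address, b = archive date,
     * m = MIME type, n = record length, g = archive file name, v = offset, c = MD5 checksum.
     */
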
    /** An encoding for the standard included metadata fields without checksum. */
    private static final String[] STD_FIELDS_EXCL_CHECKSUM = {"A", "e", "b", "m", "n", "g", "v"};

    /** An encoding for the standard included metadata fields with checksum. */
    private static final String[] STD_FIELDS_INCL_CHECKSUM = {"A", "e", "b", "m", "n", "g", "v", "c"};

    /** The fields to be included in CDX output. */
    private String[] fields;

    /** True if we put an MD5 in each CDX line as well. */
    private boolean includeChecksum;

    /**
     * Constructs a new job for extracting CDX indexes.
     *
     * @param includeChecksum If true, an MD5 checksum is also written for each record. If false, it is not.
     */
    public ArchiveExtractCDXJob(boolean includeChecksum) {
        this.fields = includeChecksum ? STD_FIELDS_INCL_CHECKSUM : STD_FIELDS_EXCL_CHECKSUM;
        this.includeChecksum = includeChecksum;
        batchJobTimeout = 7 * Constants.ONE_DAY_IN_MILLIES;
    }

    /**
     * Equivalent to ArchiveExtractCDXJob(true).
     */
    public ArchiveExtractCDXJob() {
        this(true);
    }

    /**
     * Filters out the NON-RESPONSE records.
     *
     * @return The filter that defines what ARC/WARC records are wanted in the output CDX file.
     * @see dk.netarkivet.common.utils.archive.ArchiveBatchJob#getFilter()
     */
    @Override
    public ArchiveBatchFilter getFilter() {
        return ArchiveBatchFilter.EXCLUDE_NON_RESPONSE_RECORDS;
    }

    /**
     * Initialize any data needed (none).
     *
     * @see dk.netarkivet.common.utils.archive.ArchiveBatchJob#initialize(OutputStream)
     */
    @Override
    public void initialize(OutputStream os) {
    }

    /**
     * Process this entry, reading metadata into the output stream.
     *
     * @throws IOFailure on trouble reading the archive record data
     * @see dk.netarkivet.common.utils.archive.ArchiveBatchJob#processRecord(ArchiveRecordBase, OutputStream)
     */
    @Override
    public void processRecord(ArchiveRecordBase record, OutputStream os) {
        log.trace("Processing Archive Record with offset: {}", record.getHeader().getOffset());
        /*
         * Fields are stored in a map so that it's easy to pull them out when looking at the field array.
         */
        ArchiveHeaderBase header = record.getHeader();
        Map<String, String> fieldsread = new HashMap<String, String>();
        fieldsread.put("A", header.getUrl());
        fieldsread.put("e", header.getIp());
        fieldsread.put("b", header.getArcDateStr());
        fieldsread.put("n", Long.toString(header.getLength()));
        fieldsread.put("g", record.getHeader().getArchiveFile().getName());
        fieldsread.put("v", Long.toString(record.getHeader().getOffset()));

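        /*
         * WARC wraps HTTP messages as "application/http" records carrying a "msgtype" parameter; detect whether this
         * record is an HTTP response (or request) so the payload's real content type can be read from the embedded
         * HTTP header further down.
         */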
        String mimeType = header.getMimetype();
        String msgType;
        ContentType contentType = ContentType.parseContentType(mimeType);
        boolean bResponse = false;
        boolean bRequest = false;
        if (contentType != null) {
            if ("application".equals(contentType.contentType) && "http".equals(contentType.mediaType)) {
                msgType = contentType.getParameter("msgtype");
                if ("response".equals(msgType)) {
                    bResponse = true;
                } else if ("request".equals(msgType)) {
                    bRequest = true;
                }
            }
            mimeType = contentType.toStringShort();
        }
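        /*
         * Wrap the record payload in a pushback stream so that, for response records, the embedded HTTP header can be
         * parsed (refining the MIME type) while the remaining bytes stay available for the checksum calculation below.
         */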
        ByteCountingPushBackInputStream pbin = new ByteCountingPushBackInputStream(record.getInputStream(), 8192);
        HttpHeader httpResponse = null;
        if (bResponse) {
            try {
                httpResponse = HttpHeader.processPayload(HttpHeader.HT_RESPONSE, pbin, header.getLength(), null);
                if (httpResponse != null && httpResponse.contentType != null) {
                    contentType = ContentType.parseContentType(httpResponse.contentType);
                    if (contentType != null) {
                        mimeType = contentType.toStringShort();
                    }
                }
            } catch (IOException e) {
                throw new IOFailure("Error reading httpresponse header", e);
            }
        }
        fieldsread.put("m", mimeType);

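        /*
         * For response records the HTTP header has already been consumed from the stream above, so the MD5 below is
         * calculated over the remaining payload bytes rather than over the full record.
         */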
        /* Only include checksum if necessary: */
        if (includeChecksum) {
            // InputStream instream = sar; //Note: ARCRecord extends InputStream
            // fieldsread.put("c", MD5.generateMD5(instream));
            fieldsread.put("c", ChecksumCalculator.calculateMd5(pbin));
        }

        if (httpResponse != null) {
            try {
                httpResponse.close();
            } catch (IOException e) {
                throw new IOFailure("Error closing httpresponse header", e);
            }
        }

        printFields(fieldsread, os);
    }

    /**
     * End of the batch job.
     *
     * @see dk.netarkivet.common.utils.archive.ArchiveBatchJob#finish(OutputStream)
     */
    @Override
    public void finish(OutputStream os) {
    }

    /**
     * Print the values found for a set of fields. Prints the '-' character for any null values.
     *
     * @param fieldsread A map of values indexed by field letters
     * @param outstream The output stream to write the values to
     */
    private void printFields(Map<String, String> fieldsread, OutputStream outstream) {
        StringBuffer sb = new StringBuffer();

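        // Emit the configured fields in order, separated by single spaces; '-' marks a missing value.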
        for (int i = 0; i < fields.length; i++) {
            Object o = fieldsread.get(fields[i]);
            sb.append((i > 0) ? " " : "");
            sb.append((o == null) ? "-" : o.toString());
        }
        sb.append("\n");
        try {
            outstream.write(sb.toString().getBytes("UTF-8"));
        } catch (IOException e) {
            throw new IOFailure("Error writing CDX line '" + sb + "' to batch outstream", e);
        }
    }

    /**
     * @return Human-readable description of this instance.
     */
    @Override
    public String toString() {
        return getClass().getName() + ", with Filter: " + getFilter() + ", include checksum = " + includeChecksum;
    }

}