/*
 * #%L
 * Netarchivesuite - common
 * %%
 * Copyright (C) 2005 - 2018 The Royal Danish Library,
 *             the National Library of France and the Austrian National Library.
 * %%
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License as
 * published by the Free Software Foundation, either version 2.1 of the
 * License, or (at your option) any later version.
 * 
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU Lesser General Public License for more details.
 * 
 * You should have received a copy of the GNU Lesser General Public
 * License along with this program.  If not, see
 * <http://www.gnu.org/licenses/lgpl-2.1.html>.
 * #L%
 */
package dk.netarkivet.common.utils.cdx;

import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

import org.jwat.common.ByteCountingPushBackInputStream;
import org.jwat.common.ContentType;
import org.jwat.common.HttpHeader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import dk.netarkivet.common.Constants;
import dk.netarkivet.common.exceptions.IOFailure;
import dk.netarkivet.common.utils.ChecksumCalculator;
import dk.netarkivet.common.utils.archive.ArchiveBatchJob;
import dk.netarkivet.common.utils.archive.ArchiveHeaderBase;
import dk.netarkivet.common.utils.archive.ArchiveRecordBase;
import dk.netarkivet.common.utils.batch.ArchiveBatchFilter;

/**
 * Batch job that extracts information to create a CDX file.
 * <p>
 * A CDX file contains sorted lines of metadata from ARC/WARC files; each line records, among other fields, the file
 * and offset at which the record was found, and optionally a checksum. The timeout of this job is 7 days. See
 * http://www.archive.org/web/researcher/cdx_file_format.php
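 * <p>
 * With the standard fields and no checksum ({@code A e b m n g v}), a produced line could look like this (the
 * values are illustrative):
 *
 * <pre>
 * http://www.example.com/ 192.0.2.10 20180101120000 text/html 2593 example-00000.warc.gz 12345
 * </pre>
 *
 * A minimal usage sketch, assuming a local batch runner such as
 * {@link dk.netarkivet.common.utils.batch.BatchLocalFiles} and an existing WARC file:
 *
 * <pre>{@code
 * ArchiveExtractCDXJob job = new ArchiveExtractCDXJob(true);
 * try (OutputStream out = new FileOutputStream("index.cdx")) {
 *     new BatchLocalFiles(new File[] {new File("example-00000.warc.gz")}).run(job, out);
 * }
 * // The output lines must still be sorted to form a proper CDX index.
 * }</pre>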
 */
@SuppressWarnings({"serial", "unused"})
public class ArchiveExtractCDXJob extends ArchiveBatchJob {

    /** Logger for this class. */
    private static final Logger log = LoggerFactory.getLogger(ArchiveExtractCDXJob.class);

    /** An encoding for the standard included metadata fields without checksum. */
    private static final String[] STD_FIELDS_EXCL_CHECKSUM = {"A", "e", "b", "m", "n", "g", "v"};

    /** An encoding for the standard included metadata fields with checksum. */
    private static final String[] STD_FIELDS_INCL_CHECKSUM = {"A", "e", "b", "m", "n", "g", "v", "c"};
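
    /*
     * Legend for the field letters, matching the values stored by processRecord(): A = URL, e = IP address,
     * b = archive date, m = mimetype, n = record length, g = archive file name, v = offset in the archive file,
     * c = MD5 checksum.
     */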

    /** Buffer size used to read the HTTP header. */
    private static final int HTTP_HEADER_BUFFER_SIZE = 1024 * 1024;

    /** The fields to be included in CDX output. */
    private String[] fields;

    /** True if we put an MD5 in each CDX line as well. */
    private boolean includeChecksum;

    /**
     * Constructs a new job for extracting CDX indexes.
     *
     * @param includeChecksum If true, an MD5 checksum is also written for each record. If false, it is not.
     */
    public ArchiveExtractCDXJob(boolean includeChecksum) {
        this.fields = includeChecksum ? STD_FIELDS_INCL_CHECKSUM : STD_FIELDS_EXCL_CHECKSUM;
        this.includeChecksum = includeChecksum;
        batchJobTimeout = 7 * Constants.ONE_DAY_IN_MILLIES;
    }

    /**
     * Equivalent to {@code ArchiveExtractCDXJob(true)}.
     */
    public ArchiveExtractCDXJob() {
        this(true);
    }

    /**
     * Filters out the NON-RESPONSE records.
     *
     * @return The filter that defines what ARC/WARC records are wanted in the output CDX file.
     * @see dk.netarkivet.common.utils.archive.ArchiveBatchJob#getFilter()
     */
    @Override
    public ArchiveBatchFilter getFilter() {
        return ArchiveBatchFilter.EXCLUDE_NON_RESPONSE_RECORDS;
    }

    /**
     * Initialize any data needed (none).
     *
     * @see dk.netarkivet.common.utils.archive.ArchiveBatchJob#initialize(OutputStream)
     */
    @Override
    public void initialize(OutputStream os) {
    }

    /**
     * Process this entry, reading metadata into the output stream.
     *
     * @throws IOFailure on trouble reading the archive record data
     * @see dk.netarkivet.common.utils.archive.ArchiveBatchJob#processRecord(ArchiveRecordBase, OutputStream)
     */
    @Override
    public void processRecord(ArchiveRecordBase record, OutputStream os) {
        log.trace("Processing Archive Record with offset: {}", record.getHeader().getOffset());
        /*
         * Fields are stored in a map so that it's easy to pull them out when looking at the field array.
         */
        ArchiveHeaderBase header = record.getHeader();
        Map<String, String> fieldsread = new HashMap<>();
        fieldsread.put("A", header.getUrl());
        fieldsread.put("e", header.getIp());
        fieldsread.put("b", header.getArcDateStr());
        fieldsread.put("n", Long.toString(header.getLength()));
        fieldsread.put("g", header.getArchiveFile().getName());
        fieldsread.put("v", Long.toString(header.getOffset()));

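        /*
         * WARC stores HTTP request and response records with a record mimetype of "application/http" plus a
         * "msgtype" parameter; for those records the payload mimetype must be read from the embedded HTTP header
         * below. ARC records carry the payload mimetype directly in the record header.
         */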
        String mimeType = header.getMimetype();
        String msgType;
        ContentType contentType = ContentType.parseContentType(mimeType);
        boolean bResponse = false;
        boolean bRequest = false;
        if (contentType != null) {
            if ("application".equals(contentType.contentType) && "http".equals(contentType.mediaType)) {
                msgType = contentType.getParameter("msgtype");
                if ("response".equals(msgType)) {
                    bResponse = true;
                } else if ("request".equals(msgType)) {
                    bRequest = true;
                }
            }
            mimeType = contentType.toStringShort();
        }
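        // Wrap the record content in a pushback stream so the HTTP header parser below can un-read any bytes it
        // buffers past the end of the header.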
        ByteCountingPushBackInputStream pbin = new ByteCountingPushBackInputStream(record.getInputStream(), HTTP_HEADER_BUFFER_SIZE);
        HttpHeader httpResponse = null;
        if (bResponse) {
            try {
                httpResponse = HttpHeader.processPayload(HttpHeader.HT_RESPONSE, pbin, header.getLength(), null);
                if (httpResponse != null && httpResponse.contentType != null) {
                    contentType = ContentType.parseContentType(httpResponse.contentType);
                    if (contentType != null) {
                        mimeType = contentType.toStringShort();
                    }
                }
            } catch (IOException e) {
                throw new IOFailure("Error reading HTTP response header", e);
            }
        }
        fieldsread.put("m", mimeType);

        /*
         * Only calculate the checksum if requested. The MD5 is calculated over the remaining bytes of the stream:
         * for HTTP response records the HTTP header has already been consumed above, so the checksum covers the
         * payload only.
         */
        if (includeChecksum) {
            fieldsread.put("c", ChecksumCalculator.calculateMd5(pbin));
        }

        if (httpResponse != null) {
            try {
                httpResponse.close();
            } catch (IOException e) {
                throw new IOFailure("Error closing HTTP response header", e);
            }
        }

        printFields(fieldsread, os);
    }

    /**
     * End of the batch job.
     *
     * @see dk.netarkivet.common.utils.archive.ArchiveBatchJob#finish(OutputStream)
     */
    @Override
    public void finish(OutputStream os) {
    }

    /**
     * Print the values found for a set of fields. Prints the '-' character for any null values.
     *
     * @param fieldsread A map of values indexed by field letters
     * @param outstream The output stream to write the values to
     */
    private void printFields(Map<String, String> fieldsread, OutputStream outstream) {
        StringBuilder sb = new StringBuilder();

        for (int i = 0; i < fields.length; i++) {
            String value = fieldsread.get(fields[i]);
            sb.append((i > 0) ? " " : "");
            sb.append((value == null) ? "-" : value);
        }
        sb.append("\n");
        try {
            outstream.write(sb.toString().getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new IOFailure("Error writing CDX line '" + sb + "' to batch outstream", e);
        }
    }

    /**
     * @return Human-readable description of this instance.
     */
    @Override
    public String toString() {
        return getClass().getName() + ", with Filter: " + getFilter() + ", include checksum = " + includeChecksum;
    }

}