/*
 * #%L
 * Netarchivesuite - common
 * %%
 * Copyright (C) 2005 - 2018 The Royal Danish Library,
 * the National Library of France and the Austrian National Library.
 * %%
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License as
 * published by the Free Software Foundation, either version 2.1 of the
 * License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with this program. If not, see
 * <http://www.gnu.org/licenses/lgpl-2.1.html>.
 * #L%
 */
package dk.netarkivet.common.utils.cdx;

import java.io.IOException;
import java.io.OutputStream;
import java.util.HashMap;
import java.util.Map;

import org.jwat.common.ByteCountingPushBackInputStream;
import org.jwat.common.ContentType;
import org.jwat.common.HttpHeader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import dk.netarkivet.common.Constants;
import dk.netarkivet.common.exceptions.IOFailure;
import dk.netarkivet.common.utils.ChecksumCalculator;
import dk.netarkivet.common.utils.archive.ArchiveBatchJob;
import dk.netarkivet.common.utils.archive.ArchiveHeaderBase;
import dk.netarkivet.common.utils.archive.ArchiveRecordBase;
import dk.netarkivet.common.utils.batch.ArchiveBatchFilter;

/**
 * Batch job that extracts information to create a CDX file.
 * <p>
 * A CDX file contains sorted lines of metadata from the ARC/WARC files, each line also naming the file and offset at
 * which the record was found, and optionally a checksum. The timeout of this job is 7 days. See
 * http://www.archive.org/web/researcher/cdx_file_format.php
 */
@SuppressWarnings({"serial", "unused"})
public class ArchiveExtractCDXJob extends ArchiveBatchJob {

    /** Logger for this class. */
    private static final Logger log = LoggerFactory.getLogger(ArchiveExtractCDXJob.class);

    /** An encoding of the standard included metadata fields, without checksum. */
    private static final String[] STD_FIELDS_EXCL_CHECKSUM = {"A", "e", "b", "m", "n", "g", "v"};

    /** An encoding of the standard included metadata fields, with checksum. */
    private static final String[] STD_FIELDS_INCL_CHECKSUM = {"A", "e", "b", "m", "n", "g", "v", "c"};
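
    /*
     * Illustrative example (hypothetical values, not from the original source): with the full field set
     * the letters map, in order, to URL, IP, date, MIME type, length, archive file name, offset, and MD5
     * checksum, so one output line might read
     *
     *   http://example.org/ 192.0.2.1 20180101120000 text/html 2048 example.warc 1234 0f343b0931126a20f133d67c2b018a3b
     */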

    /** Buffer size used to read the HTTP header. */
    private static final int HTTP_HEADER_BUFFER_SIZE = 1024 * 1024;

    /** The fields to be included in CDX output. */
    private final String[] fields;

    /** True if we also put an MD5 checksum in each CDX line. */
    private final boolean includeChecksum;

    /**
     * Constructs a new job for extracting CDX indexes.
     *
     * @param includeChecksum If true, an MD5 checksum is also written for each record. If false, it is not.
     */
    public ArchiveExtractCDXJob(boolean includeChecksum) {
        this.fields = includeChecksum ? STD_FIELDS_INCL_CHECKSUM : STD_FIELDS_EXCL_CHECKSUM;
        this.includeChecksum = includeChecksum;
        batchJobTimeout = 7 * Constants.ONE_DAY_IN_MILLIES;
    }

    /**
     * Equivalent to {@code ArchiveExtractCDXJob(true)}.
     */
    public ArchiveExtractCDXJob() {
        this(true);
    }

    /**
     * Filters out the NON-RESPONSE records.
     *
     * @return The filter that defines what ARC/WARC records are wanted in the output CDX file.
     * @see dk.netarkivet.common.utils.archive.ArchiveBatchJob#getFilter()
     */
    @Override
    public ArchiveBatchFilter getFilter() {
        return ArchiveBatchFilter.EXCLUDE_NON_RESPONSE_RECORDS;
    }

    /**
     * Initialize any data needed (none).
     *
     * @see dk.netarkivet.common.utils.archive.ArchiveBatchJob#initialize(OutputStream)
     */
    @Override
    public void initialize(OutputStream os) {
    }

    /**
     * Process this entry, reading metadata into the output stream.
     *
     * @throws IOFailure on trouble reading the archive record data
     * @see dk.netarkivet.common.utils.archive.ArchiveBatchJob#processRecord(ArchiveRecordBase, OutputStream)
     */
    @Override
    public void processRecord(ArchiveRecordBase record, OutputStream os) {
        log.trace("Processing Archive Record with offset: {}", record.getHeader().getOffset());
        /*
         * Fields are stored in a map so that it's easy to pull them out when looking at the field array.
         */
        ArchiveHeaderBase header = record.getHeader();
        Map<String, String> fieldsread = new HashMap<>();
        fieldsread.put("A", header.getUrl());
        fieldsread.put("e", header.getIp());
        fieldsread.put("b", header.getArcDateStr());
        fieldsread.put("n", Long.toString(header.getLength()));
        fieldsread.put("g", record.getHeader().getArchiveFile().getName());
        fieldsread.put("v", Long.toString(record.getHeader().getOffset()));

        String mimeType = header.getMimetype();
        ContentType contentType = ContentType.parseContentType(mimeType);
        boolean bResponse = false;
        boolean bRequest = false;
        if (contentType != null) {
            if ("application".equals(contentType.contentType) && "http".equals(contentType.mediaType)) {
                String msgType = contentType.getParameter("msgtype");
                if ("response".equals(msgType)) {
                    bResponse = true;
                } else if ("request".equals(msgType)) {
                    bRequest = true;
                }
            }
            mimeType = contentType.toStringShort();
        }
        ByteCountingPushBackInputStream pbin =
                new ByteCountingPushBackInputStream(record.getInputStream(), HTTP_HEADER_BUFFER_SIZE);
        HttpHeader httpResponse = null;
        if (bResponse) {
            try {
                httpResponse = HttpHeader.processPayload(HttpHeader.HT_RESPONSE, pbin, header.getLength(), null);
                if (httpResponse != null && httpResponse.contentType != null) {
                    contentType = ContentType.parseContentType(httpResponse.contentType);
                    if (contentType != null) {
                        mimeType = contentType.toStringShort();
                    }
                }
            } catch (IOException e) {
                throw new IOFailure("Error reading HTTP response header", e);
            }
        }
        fieldsread.put("m", mimeType);

        /* Only include the checksum if requested. The MD5 is calculated over what remains of the record
         * stream, i.e. the payload after any HTTP header parsed above. */
        if (includeChecksum) {
            fieldsread.put("c", ChecksumCalculator.calculateMd5(pbin));
        }

        if (httpResponse != null) {
            try {
                httpResponse.close();
            } catch (IOException e) {
                throw new IOFailure("Error closing HTTP response header", e);
            }
        }

        printFields(fieldsread, os);
    }
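
    /*
     * Note on processRecord above (illustrative): a WARC response record typically carries the WARC-level
     * header
     *
     *   Content-Type: application/http; msgtype=response
     *
     * while the MIME type reported in the CDX "m" field should be that of the nested HTTP response, e.g.
     * "text/html". That is why, for msgtype=response records, the method parses the HTTP header out of the
     * payload before reporting the mimetype.
     */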

    /**
     * End of the batch job.
     *
     * @see dk.netarkivet.common.utils.archive.ArchiveBatchJob#finish(OutputStream)
     */
    @Override
    public void finish(OutputStream os) {
    }

    /**
     * Print the values found for a set of fields. Prints the '-' character for any null values.
     *
     * @param fieldsread A map of values indexed by field letters
     * @param outstream The output stream to write the values to
     */
    private void printFields(Map<String, String> fieldsread, OutputStream outstream) {
        StringBuilder sb = new StringBuilder();

        for (int i = 0; i < fields.length; i++) {
            String value = fieldsread.get(fields[i]);
            sb.append((i > 0) ? " " : "");
            sb.append((value == null) ? "-" : value);
        }
        sb.append("\n");
        try {
            outstream.write(sb.toString().getBytes("UTF-8"));
        } catch (IOException e) {
            throw new IOFailure("Error writing CDX line '" + sb + "' to batch outstream", e);
        }
    }

    /**
     * @return Human-readable description of this instance.
     */
    @Override
    public String toString() {
        return getClass().getName() + ", with Filter: " + getFilter() + ", include checksum = " + includeChecksum;
    }

}
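
/*
 * Minimal usage sketch (hypothetical file names; assumes BatchLocalFiles from
 * dk.netarkivet.common.utils.batch, which runs a batch job over local files):
 *
 *   ArchiveExtractCDXJob job = new ArchiveExtractCDXJob(true);
 *   try (OutputStream out = new FileOutputStream("records.cdx")) {
 *       new BatchLocalFiles(new File[] {new File("example.warc")}).run(job, out);
 *   }
 *
 * The job writes one line per response record in the order records are read; producing a proper CDX file,
 * which is sorted, is a separate, later step.
 */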