001/*
002 * #%L
003 * Netarchivesuite - common
004 * %%
005 * Copyright (C) 2005 - 2014 The Royal Danish Library, the Danish State and University Library,
006 *             the National Library of France and the Austrian National Library.
007 * %%
008 * This program is free software: you can redistribute it and/or modify
009 * it under the terms of the GNU Lesser General Public License as
010 * published by the Free Software Foundation, either version 2.1 of the
011 * License, or (at your option) any later version.
012 * 
013 * This program is distributed in the hope that it will be useful,
014 * but WITHOUT ANY WARRANTY; without even the implied warranty of
015 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
016 * GNU General Lesser Public License for more details.
017 * 
018 * You should have received a copy of the GNU General Lesser Public
019 * License along with this program.  If not, see
020 * <http://www.gnu.org/licenses/lgpl-2.1.html>.
021 * #L%
022 */
023package dk.netarkivet.common.utils.warc;
024
025import java.io.File;
026import java.io.FileOutputStream;
027import java.io.IOException;
028import java.io.InputStream;
029import java.io.PrintStream;
030import java.net.URI;
031import java.net.URISyntaxException;
032import java.util.Date;
033import java.util.HashMap;
034import java.util.HashSet;
035import java.util.Iterator;
036import java.util.Map;
037import java.util.Map.Entry;
038import java.util.Set;
039import java.util.concurrent.atomic.AtomicInteger;
040
041import org.archive.format.warc.WARCConstants;
042import org.archive.format.warc.WARCConstants.WARCRecordType;
043import org.archive.io.ArchiveRecord;
044import org.archive.io.ArchiveRecordHeader;
045import org.archive.io.warc.WARCReader;
046import org.archive.io.warc.WARCReaderFactory;
047import org.archive.io.warc.WARCRecord;
048import org.archive.io.warc.WARCRecordInfo;
049import org.archive.io.warc.WARCWriter;
050import org.archive.io.warc.WARCWriterPoolSettings;
051import org.archive.io.warc.WARCWriterPoolSettingsData;
052import org.archive.uid.UUIDGenerator;
053import org.archive.util.anvl.ANVLRecord;
054import org.slf4j.Logger;
055import org.slf4j.LoggerFactory;
056
057import dk.netarkivet.common.Constants;
058import dk.netarkivet.common.exceptions.ArgumentNotValid;
059import dk.netarkivet.common.exceptions.IOFailure;
060import dk.netarkivet.common.exceptions.IllegalState;
061import dk.netarkivet.common.utils.archive.ArchiveDateConverter;
062import dk.netarkivet.common.utils.archive.HeritrixArchiveHeaderWrapper;
063
064/**
065 * Various utilities on WARC-records. We have borrowed code from wayback. See
066 * org.archive.wayback.resourcestore.indexer.WARCRecordToSearchResultAdapter
067 */
068public class WARCUtils {
069
070    /** Logging output place. */
071    protected static final Logger log = LoggerFactory.getLogger(WARCUtils.class);
072
073    /**
074     * Create new WARCWriter, writing to warcfile newFile.
075     *
076     * @param newFile the WARCfile, that the WARCWriter writes to.
077     * @return new WARCWriter, writing to warcfile newFile.
078     */
079    public static WARCWriter createWARCWriter(File newFile) {
080        WARCWriter writer;
081        PrintStream ps = null;
082        try {
083            ps = new PrintStream(new FileOutputStream(newFile));
084            /*
085            writer = new WARCWriter(new AtomicInteger(), ps,
086            // This name is used for the first (file metadata) record
087                    newFile, false, // Don't compress
088                    // Use current time
089                    ArchiveDateConverter.getWarcDateFormat().format(new Date()), null // No particular file metadata to
090                                                                                      // add
091            );
092            */
093            WARCWriterPoolSettings settings = new WARCWriterPoolSettingsData(
094                        WARCConstants.WARC_FILE_EXTENSION, null, WARCConstants.DEFAULT_MAX_WARC_FILE_SIZE, false,
095                        null, null, new UUIDGenerator());
096            writer = new WARCWriter(new AtomicInteger(), ps, newFile, settings);
097        } catch (IOException e) {
098            if (ps != null) {
099                ps.close();
100            }
101            String message = "Could not create WARCWriter to file '" + newFile + "'.\n";
102            log.warn(message);
103            throw new IOFailure(message, e);
104        }
105        return writer;
106    }
107
108    /**
109     * Insert the contents of a WARC file into another WARCFile.
110     *
111     * @param warcFile An WARC file to read.
112     * @param writer A place to write the arc records
113     * @throws IOFailure if there are problems reading the file.
114     */
115    public static void insertWARCFile(File warcFile, WARCWriter writer) {
116        ArgumentNotValid.checkNotNull(writer, "WARCWriter aw");
117        ArgumentNotValid.checkNotNull(warcFile, "File warcFile");
118        WARCReader r;
119
120        try {
121            r = WARCReaderFactory.get(warcFile);
122        } catch (IOException e) {
123            String message = "Error while copying WARC records from " + warcFile;
124            log.warn(message, e);
125            throw new IOFailure(message, e);
126        }
127        Iterator<ArchiveRecord> it = r.iterator();
128        WARCRecord record;
129        while (it.hasNext()) {
130            record = (WARCRecord) it.next();
131            copySingleRecord(writer, record);
132        }
133    }
134
135    private static final Set<String> ignoreHeadersMap = new HashSet<String>();
136
137    private static final Map<String, String> headerNamesCaseMap = new HashMap<String, String>();
138
139    static {
140        ignoreHeadersMap.add("content-type");
141        ignoreHeadersMap.add("reader-identifier");
142        ignoreHeadersMap.add("absolute-offset");
143        ignoreHeadersMap.add("content-length");
144        //ignoreHeadersMap.add("warc-date");
145        ignoreHeadersMap.add("warc-record-id");
146        ignoreHeadersMap.add("warc-type");
147        ignoreHeadersMap.add("warc-target-uri");
148        String[] headerNames = {"WARC-Type", "WARC-Record-ID", "WARC-Date", "Content-Length", "Content-Type",
149                "WARC-Concurrent-To", "WARC-Block-Digest", "WARC-Payload-Digest", "WARC-IP-Address", "WARC-Refers-To",
150                "WARC-Target-URI", "WARC-Truncated", "WARC-Warcinfo-ID", "WARC-Filename", "WARC-Profile",
151                "WARC-Identified-Payload-Type", "WARC-Segment-Origin-ID", "WARC-Segment-Number",
152                "WARC-Segment-Total-Length"};
153        for (int i = 0; i < headerNames.length; ++i) {
154            headerNamesCaseMap.put(headerNames[i].toLowerCase(), headerNames[i]);
155        }
156    }
157
158    /**
159     * Writes the given WARCRecord on the given WARCWriter.
160     * <p>
161     * Creates a new unique UUID for the copied record.
162     *
163     * @param aw The WARCWriter to output the record on.
164     * @param record The record to output
165     */
166    private static void copySingleRecord(WARCWriter aw, WARCRecord record) {
167        try {
168            // Prepare metadata...
169            HeritrixArchiveHeaderWrapper header = HeritrixArchiveHeaderWrapper.wrapArchiveHeader(null, record);
170            String warcType = header.getHeaderStringValue("WARC-Type");
171
172            String url = header.getUrl();
173            Date date = header.getDate();
174            String dateStr = ArchiveDateConverter.getWarcDateFormat().format(date);
175            String mimetype = header.getMimetype();
176            String recordIdStr;
177            URI recordId;
178            try {
179                recordIdStr = header.getHeaderStringValue("warc-record-id");
180                if (recordIdStr.startsWith("<") && recordIdStr.endsWith(">")) {
181                    recordIdStr = recordIdStr.substring(1, recordIdStr.length() - 1);
182                }
183                recordId = new URI(recordIdStr);
184            } catch (URISyntaxException e) {
185                throw new IllegalState("Epic fail creating URI from UUID!");
186            }
187
188            //ANVLRecord namedFields = new ANVLRecord();
189
190            // Copy to headers from the original WARC record to the new one.
191            // Since we store the headers lowercase, we recase them.
192            // Non WARC header header are lowercase and loose their case.
193            /*
194            Iterator<Entry<String, Object>> headerIter = header.getHeaderFields().entrySet().iterator();
195            Entry<String, Object> headerEntry;
196            String headerName;
197            String headerNameCased;
198            while (headerIter.hasNext()) {
199                headerEntry = headerIter.next();
200                if (!ignoreHeadersMap.contains(headerEntry.getKey())) {
201                    headerName = headerEntry.getKey();
202                    headerNameCased = headerNamesCaseMap.get(headerName);
203                    if (headerNameCased != null) {
204                        headerName = headerNameCased;
205                    }
206                    namedFields.addLabelValue(headerName, headerEntry.getValue().toString());
207                }
208            }
209            */
210
211            InputStream in = record;
212            // getContentBegin only works for WARC and in H1.44.x!
213            Long payloadLength = header.getLength() - record.getHeader().getContentBegin();
214
215            WARCRecordType type = WARCRecordType.valueOf(warcType);
216            WARCRecordInfo newRecord = new WARCRecordInfo();
217            Iterator<Entry<String, Object>> headerIter = header.getHeaderFields().entrySet().iterator();
218            Entry<String, Object> headerEntry;
219            String headerName;
220            String headerNameCased;
221            while (headerIter.hasNext()) {
222                headerEntry = headerIter.next();
223                if (!ignoreHeadersMap.contains(headerEntry.getKey())) {
224                    headerName = headerEntry.getKey();
225                    headerNameCased = headerNamesCaseMap.get(headerName);
226                    if (headerNameCased != null) {
227                        headerName = headerNameCased;
228                    }
229                    newRecord.addExtraHeader(headerName, headerEntry.getValue().toString());
230                }
231            }
232            newRecord.setType(type);
233            newRecord.setUrl(url);
234            newRecord.setMimetype(mimetype);
235            newRecord.setRecordId(recordId);
236            newRecord.setContentStream(in);
237            newRecord.setContentLength(payloadLength);
238                aw.writeRecord(newRecord);
239
240            // Write WARC record with type=warcType
241                /*
242            if ("metadata".equals(warcType)) {
243                aw.writeMetadataRecord(url, dateStr, mimetype, recordId, namedFields, in, payloadLength);
244            } else if ("request".equals(warcType)) {
245                aw.writeRequestRecord(url, dateStr, mimetype, recordId, namedFields, in, payloadLength);
246            } else if ("resource".equals(warcType)) {
247                aw.writeResourceRecord(url, dateStr, mimetype, recordId, namedFields, in, payloadLength);
248            } else if ("response".equals(warcType)) {
249                aw.writeResponseRecord(url, dateStr, mimetype, recordId, namedFields, in, payloadLength);
250            } else if ("revisit".equals(warcType)) {
251                aw.writeRevisitRecord(url, dateStr, mimetype, recordId, namedFields, in, payloadLength);
252            } else if ("warcinfo".equals(warcType)) {
253                aw.writeWarcinfoRecord(dateStr, mimetype, recordId, namedFields, in, payloadLength);
254            } else {
255                throw new IOFailure("Unknown WARC-Type!");
256            }
257            */
258        } catch (Exception e) {
259            throw new IOFailure("Error occurred while writing an WARC record" + record, e);
260        }
261    }
262
263    /**
264     * Read the contents (payload) of an WARC record into a byte array.
265     *
266     * @param record An WARC record to read from. After reading, the WARC Record will no longer have its own data
267     * available for reading.
268     * @return A byte array containing the payload of the WARC record. Note that the size of the payload is calculated
269     * by subtracting the contentBegin value from the length of the record (both values included in the record header).
270     * @throws IOFailure If there is an error reading the data, or if the record is longer than Integer.MAX_VALUE (since
271     * we can't make bigger arrays).
272     */
273    public static byte[] readWARCRecord(WARCRecord record) throws IOFailure {
274        ArgumentNotValid.checkNotNull(record, "WARCRecord record");
275        if (record.getHeader().getLength() > Integer.MAX_VALUE) {
276            throw new IOFailure("WARC Record too long to fit in array: " + record.getHeader().getLength() + " > "
277                    + Integer.MAX_VALUE);
278        }
279        // Calculate the length of the payload.
280        // the size of the payload is calculated by subtracting
281        // the contentBegin value from the length of the record.
282
283        ArchiveRecordHeader header = record.getHeader();
284        long length = header.getLength();
285
286        int payloadLength = (int) (length - header.getContentBegin());
287
288        // read from stream
289        byte[] tmpbuffer = new byte[payloadLength];
290        byte[] buffer = new byte[Constants.IO_BUFFER_SIZE];
291        int bytesRead;
292        int totalBytes = 0;
293        try {
294            for (; (totalBytes < payloadLength) && ((bytesRead = record.read(buffer)) != -1); totalBytes += bytesRead) {
295                System.arraycopy(buffer, 0, tmpbuffer, totalBytes, bytesRead);
296            }
297        } catch (IOException e) {
298            throw new IOFailure("Failure when reading the WARC-record", e);
299        }
300
301        // Check if the number of bytes read (= totalbytes) matches the
302        // size of the buffer.
303        if (tmpbuffer.length != totalBytes) {
304            // make sure we only return an array with bytes we actually read
305            byte[] truncateBuffer = new byte[totalBytes];
306            System.arraycopy(tmpbuffer, 0, truncateBuffer, 0, totalBytes);
307            log.debug("Storing {} bytes. Expected to store: {}", totalBytes, tmpbuffer.length);
308            return truncateBuffer;
309        } else {
310            return tmpbuffer;
311        }
312
313    }
314
315    /**
316     * Find out what type of WARC-record this is.
317     *
318     * @param record a given WARCRecord
319     * @return the type of WARCRecord as a String.
320     */
321    public static String getRecordType(WARCRecord record) {
322        ArgumentNotValid.checkNotNull(record, "record");
323        ArchiveRecordHeader header = record.getHeader();
324        return (String) header.getHeaderValue(WARCConstants.HEADER_KEY_TYPE);
325    }
326
327    /**
328     * Check if the given filename represents a WARC file.
329     *
330     * @param filename A given filename
331     * @return true, if the filename ends with .warc or .warc.gz
332     */
333    public static boolean isWarc(String filename) {
334        ArgumentNotValid.checkNotNullOrEmpty(filename, "filename");
335        String lowercaseFilename = filename.toLowerCase();
336        return (lowercaseFilename.endsWith(".warc") || lowercaseFilename.endsWith(".warc.gz"));
337    }
338
339}