001/*
002 * #%L
003 * Netarchivesuite - harvester
004 * %%
005 * Copyright (C) 2005 - 2014 The Royal Danish Library, the Danish State and University Library,
006 *             the National Library of France and the Austrian National Library.
007 * %%
008 * This program is free software: you can redistribute it and/or modify
009 * it under the terms of the GNU Lesser General Public License as
010 * published by the Free Software Foundation, either version 2.1 of the
011 * License, or (at your option) any later version.
012 * 
013 * This program is distributed in the hope that it will be useful,
014 * but WITHOUT ANY WARRANTY; without even the implied warranty of
015 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
016 * GNU General Lesser Public License for more details.
017 * 
018 * You should have received a copy of the GNU General Lesser Public
019 * License along with this program.  If not, see
020 * <http://www.gnu.org/licenses/lgpl-2.1.html>.
021 * #L%
022 */
023package dk.netarkivet.harvester.harvesting.metadata;
024
025import java.io.ByteArrayInputStream;
026import java.io.File;
027import java.io.FileInputStream;
028import java.io.FileNotFoundException;
029import java.io.IOException;
030import java.io.InputStream;
031import java.net.URISyntaxException;
032import java.util.Date;
033import java.util.UUID;
034
035import org.apache.commons.io.IOUtils;
036import org.jwat.common.ANVLRecord;
037import org.jwat.common.ContentType;
038import org.jwat.common.Uri;
039import org.jwat.warc.WarcConstants;
040import org.jwat.warc.WarcFileNaming;
041import org.jwat.warc.WarcFileNamingSingleFile;
042import org.jwat.warc.WarcFileWriter;
043import org.jwat.warc.WarcFileWriterConfig;
044import org.jwat.warc.WarcHeader;
045import org.jwat.warc.WarcRecord;
046import org.slf4j.Logger;
047import org.slf4j.LoggerFactory;
048
049import dk.netarkivet.common.exceptions.IOFailure;
050import dk.netarkivet.common.exceptions.IllegalState;
051import dk.netarkivet.common.utils.ChecksumCalculator;
052import dk.netarkivet.common.utils.SystemUtils;
053
054/**
055 * MetadataFileWriter that writes to WARC files.
056 */
057public class MetadataFileWriterWarc extends MetadataFileWriter {
058
059    private static final Logger log = LoggerFactory.getLogger(MetadataFileWriterWarc.class);
060
061    /** Writer to this jobs metadatafile. This is closed when the metadata is marked as ready. */
062    private WarcFileWriter writer = null;
063
064    /** The ID of the Warcinfo record. Set when calling the insertInfoRecord method. */
065    private Uri warcInfoUID = null;
066
067    /**
068     * Create a <code>MetadataFileWriter</code> for WARC output.
069     *
070     * @param metadataWarcFile The WARC output file
071     * @return <code>MetadataFileWriter</code> for writing metadata files in WARC
072     */
073    public static MetadataFileWriter createWriter(File metadataWarcFile) {
074        MetadataFileWriterWarc mtfw = new MetadataFileWriterWarc();
075        WarcFileNaming naming = new WarcFileNamingSingleFile(metadataWarcFile);
076        WarcFileWriterConfig config = new WarcFileWriterConfig(metadataWarcFile.getParentFile(), false, Long.MAX_VALUE, true);
077        mtfw.writer = WarcFileWriter.getWarcWriterInstance(naming, config);
078        mtfw.open();
079        return mtfw;
080    }
081
082    protected void open() {
083        try {
084            writer.open();
085        } catch (IOException e) {
086            throw new IOFailure("Error opening MetadataFileWriterWarc", e);
087        }
088    }
089
090    @Override
091    public void close() {
092        if (writer != null) {
093            try {
094                writer.close();
095            } catch (IOException e) {
096                throw new IOFailure("Error closing MetadataFileWriterWarc", e);
097            }
098        }
099        writer = null;
100    }
101
102    @Override
103    public File getFile() {
104        return writer.getFile();
105    }
106
107    /**
108     * Insert a warcInfoRecord in the WARC-file, if it doesn't already exists. saves the recordID of the written
109     * info-record for future reference to be used for later in the
110     *
111     * @param payloadToInfoRecord the given payload for this record.
112     */
113    public void insertInfoRecord(ANVLRecord payloadToInfoRecord) {
114        if (warcInfoUID != null) {
115            throw new IllegalState("An WarcInfo record has already been inserted");
116        }
117        String filename = writer.getFile().getName();
118        if (filename.endsWith(WarcFileWriter.ACTIVE_SUFFIX)) {
119                filename = filename.substring(0, filename.length() - WarcFileWriter.ACTIVE_SUFFIX.length());
120        }
121        Uri recordId;
122        try {
123            recordId = new Uri("urn:uuid:" + UUID.randomUUID().toString());
124        } catch (URISyntaxException e) {
125            throw new IllegalState("Epic fail creating URI from UUID!");
126        }
127        warcInfoUID = recordId;
128        try {
129            byte[] payloadAsBytes = payloadToInfoRecord.getUTF8Bytes();
130            String blockDigest = ChecksumCalculator.calculateSha1(new ByteArrayInputStream(payloadAsBytes));
131            WarcRecord record = WarcRecord.createRecord(writer.writer);
132            WarcHeader header = record.header;
133            header.warcTypeIdx = WarcConstants.RT_IDX_WARCINFO;
134            header.addHeader(WarcConstants.FN_WARC_RECORD_ID, recordId, null);
135            header.addHeader(WarcConstants.FN_WARC_DATE, new Date(), null);
136            header.addHeader(WarcConstants.FN_WARC_FILENAME, filename);
137            header.addHeader(WarcConstants.FN_CONTENT_TYPE, ContentType.parseContentType(WarcConstants.CT_APP_WARC_FIELDS), null);
138            header.addHeader(WarcConstants.FN_CONTENT_LENGTH, new Long(payloadAsBytes.length), null);
139            header.addHeader(WarcConstants.FN_WARC_BLOCK_DIGEST, blockDigest);
140            writer.writer.writeHeader(record);
141            ByteArrayInputStream bin = new ByteArrayInputStream(payloadAsBytes);
142            writer.writer.streamPayload(bin);
143            writer.writer.closeRecord();
144        } catch (IOException e) {
145            throw new IllegalState("Error inserting warcinfo record", e);
146        }
147    }
148
149    @Override
150    public void writeFileTo(File file, String uri, String mime) {
151        writeTo(file, uri, mime);
152    }
153
154    @Override
155    public boolean writeTo(File fileToArchive, String URL, String mimetype) {
156        if (!fileToArchive.isFile()) {
157            throw new IOFailure("Not a file: " + fileToArchive.getPath());
158        }
159        if (warcInfoUID == null) {
160            throw new IllegalState("An WarcInfo record has not been inserted yet");
161        }
162        log.info("{} {}", fileToArchive, fileToArchive.length());
163        String blockDigest = ChecksumCalculator.calculateSha1(fileToArchive);
164        Uri recordId;
165        try {
166            recordId = new Uri("urn:uuid:" + UUID.randomUUID().toString());
167        } catch (URISyntaxException e) {
168            throw new IllegalState("Epic fail creating URI from UUID!");
169        }
170        InputStream in = null;
171        try {
172            WarcRecord record = WarcRecord.createRecord(writer.writer);
173            WarcHeader header = record.header;
174            header.warcTypeIdx = WarcConstants.RT_IDX_RESOURCE;
175            header.addHeader(WarcConstants.FN_WARC_RECORD_ID, recordId, null);
176            header.addHeader(WarcConstants.FN_WARC_DATE, new Date(), null);
177            header.addHeader(WarcConstants.FN_WARC_WARCINFO_ID, warcInfoUID, null);
178            header.addHeader(WarcConstants.FN_WARC_IP_ADDRESS, SystemUtils.getLocalIP());
179            header.addHeader(WarcConstants.FN_WARC_TARGET_URI, URL);
180            header.addHeader(WarcConstants.FN_WARC_BLOCK_DIGEST, blockDigest);
181            header.addHeader(WarcConstants.FN_CONTENT_TYPE, ContentType.parseContentType(mimetype), null);
182            header.addHeader(WarcConstants.FN_CONTENT_LENGTH, new Long(fileToArchive.length()), null);
183            writer.writer.writeHeader(record);
184            in = new FileInputStream(fileToArchive);
185            writer.writer.streamPayload(in);
186            writer.writer.closeRecord();
187        } catch (FileNotFoundException e) {
188            throw new IOFailure("Unable to open file: " + fileToArchive.getPath(), e);
189        } catch (IOException e) {
190            throw new IOFailure("Epic IO fail while writing to WARC file: " + fileToArchive.getPath(), e);
191        } finally {
192            IOUtils.closeQuietly(in);
193        }
194        return true;
195    }
196
197    @Override
198    public void write(String uri, String contentType, String hostIP, long fetchBeginTimeStamp, byte[] payload)
199            throws java.io.IOException {        
200        ByteArrayInputStream in = new ByteArrayInputStream(payload);
201        String blockDigest = ChecksumCalculator.calculateSha1(in);
202        Uri recordId;
203        try {
204            recordId = new Uri("urn:uuid:" + UUID.randomUUID().toString());
205        } catch (URISyntaxException e) {
206            throw new IllegalState("Epic fail creating URI from UUID!");
207        }
208        WarcRecord record = WarcRecord.createRecord(writer.writer);
209        WarcHeader header = record.header;
210        header.warcTypeIdx = WarcConstants.RT_IDX_RESOURCE;
211        header.addHeader(WarcConstants.FN_WARC_RECORD_ID, recordId, null);
212        header.addHeader(WarcConstants.FN_WARC_DATE, new Date(fetchBeginTimeStamp), null);
213        header.addHeader(WarcConstants.FN_WARC_WARCINFO_ID, warcInfoUID, null);
214        header.addHeader(WarcConstants.FN_WARC_IP_ADDRESS, hostIP);
215        header.addHeader(WarcConstants.FN_WARC_TARGET_URI, uri);
216        header.addHeader(WarcConstants.FN_WARC_BLOCK_DIGEST, blockDigest);
217        header.addHeader(WarcConstants.FN_CONTENT_TYPE, ContentType.parseContentType(contentType), null);
218        header.addHeader(WarcConstants.FN_CONTENT_LENGTH, new Long(payload.length), null);
219        writer.writer.writeHeader(record);
220        in = new ByteArrayInputStream(payload); // A re-read is necessary here!
221        writer.writer.streamPayload(in);
222        writer.writer.closeRecord();
223    }
224
225}