001/*
002 * #%L
003 * Netarchivesuite - harvester
004 * %%
005 * Copyright (C) 2005 - 2018 The Royal Danish Library, 
006 *             the National Library of France and the Austrian National Library.
007 * %%
008 * This program is free software: you can redistribute it and/or modify
009 * it under the terms of the GNU Lesser General Public License as
010 * published by the Free Software Foundation, either version 2.1 of the
011 * License, or (at your option) any later version.
012 * 
013 * This program is distributed in the hope that it will be useful,
014 * but WITHOUT ANY WARRANTY; without even the implied warranty of
015 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
016 * GNU General Lesser Public License for more details.
017 * 
018 * You should have received a copy of the GNU General Lesser Public
019 * License along with this program.  If not, see
020 * <http://www.gnu.org/licenses/lgpl-2.1.html>.
021 * #L%
022 */
023package dk.netarkivet.harvester.harvesting.metadata;
024
025import java.io.ByteArrayInputStream;
026import java.io.File;
027import java.io.FileInputStream;
028import java.io.FileNotFoundException;
029import java.io.IOException;
030import java.io.InputStream;
031import java.net.URISyntaxException;
032import java.util.Date;
033import java.util.UUID;
034
035import org.apache.commons.io.IOUtils;
036import org.archive.util.Base32;
037import org.jwat.common.ANVLRecord;
038import org.jwat.common.ContentType;
039import org.jwat.common.Uri;
040import org.jwat.warc.WarcConstants;
041import org.jwat.warc.WarcDigest;
042import org.jwat.warc.WarcFileNaming;
043import org.jwat.warc.WarcFileNamingSingleFile;
044import org.jwat.warc.WarcFileWriter;
045import org.jwat.warc.WarcFileWriterConfig;
046import org.jwat.warc.WarcHeader;
047import org.jwat.warc.WarcRecord;
048import org.slf4j.Logger;
049import org.slf4j.LoggerFactory;
050
051import dk.netarkivet.common.exceptions.IOFailure;
052import dk.netarkivet.common.exceptions.IllegalState;
053import dk.netarkivet.common.utils.ChecksumCalculator;
054import dk.netarkivet.common.utils.SystemUtils;
055
056/**
057 * MetadataFileWriter that writes to WARC files.
058 */
059public class MetadataFileWriterWarc extends MetadataFileWriter {
060
061    private static final Logger log = LoggerFactory.getLogger(MetadataFileWriterWarc.class);
062
063    /** Writer to this jobs metadatafile. This is closed when the metadata is marked as ready. */
064    private WarcFileWriter writer = null;
065
066    /** The ID of the Warcinfo record. Set when calling the insertInfoRecord method. */
067    private Uri warcInfoUID = null;
068
069    /**
070     * Create a <code>MetadataFileWriter</code> for WARC output.
071     *
072     * @param metadataWarcFile The WARC output file
073     * @return <code>MetadataFileWriter</code> for writing metadata files in WARC
074     */
075    public static MetadataFileWriter createWriter(File metadataWarcFile) {
076        MetadataFileWriterWarc mtfw = new MetadataFileWriterWarc();
077        WarcFileNaming naming = new WarcFileNamingSingleFile(metadataWarcFile);
078        WarcFileWriterConfig config = new WarcFileWriterConfig(metadataWarcFile.getParentFile(), compressRecords(), Long.MAX_VALUE, true);
079        mtfw.writer = WarcFileWriter.getWarcWriterInstance(naming, config);
080        mtfw.open();
081        return mtfw;
082    }
083
084    protected void open() {
085        try {
086            writer.open();
087        } catch (IOException e) {
088            throw new IOFailure("Error opening MetadataFileWriterWarc", e);
089        }
090    }
091
092    @Override
093    public void close() {
094        if (writer != null) {
095            try {
096                writer.close();
097            } catch (IOException e) {
098                throw new IOFailure("Error closing MetadataFileWriterWarc", e);
099            }
100        }
101        writer = null;
102    }
103
104    @Override
105    public File getFile() {
106        return writer.getFile();
107    }
108
109    /**
110     * Insert a warcInfoRecord in the WARC-file, if it doesn't already exists. saves the recordID of the written
111     * info-record for future reference to be used for later in the
112     *
113     * @param payloadToInfoRecord the given payload for this record.
114     */
115    public void insertInfoRecord(ANVLRecord payloadToInfoRecord) {
116        if (warcInfoUID != null) {
117            throw new IllegalState("An WarcInfo record has already been inserted");
118        }
119        String filename = writer.getFile().getName();
120        if (filename.endsWith(WarcFileWriter.ACTIVE_SUFFIX)) {
121                filename = filename.substring(0, filename.length() - WarcFileWriter.ACTIVE_SUFFIX.length());
122        }
123        Uri recordId;
124        try {
125            recordId = new Uri("urn:uuid:" + UUID.randomUUID().toString());
126        } catch (URISyntaxException e) {
127            throw new IllegalState("Epic fail creating URI from UUID!", e);
128        }
129        warcInfoUID = recordId;
130        try {
131            byte[] payloadAsBytes = payloadToInfoRecord.getUTF8Bytes();
132            byte[] blockDigestBytes = ChecksumCalculator.digestInputStream(new ByteArrayInputStream(payloadAsBytes), "SHA1");
133            WarcDigest blockDigest = WarcDigest.createWarcDigest("SHA1", blockDigestBytes, "base32", Base32.encode(blockDigestBytes));
134            WarcRecord record = WarcRecord.createRecord(writer.writer);
135            WarcHeader header = record.header;
136            header.warcTypeIdx = WarcConstants.RT_IDX_WARCINFO;
137            header.addHeader(WarcConstants.FN_WARC_RECORD_ID, recordId, null);
138            header.addHeader(WarcConstants.FN_WARC_DATE, new Date(), null);
139            header.addHeader(WarcConstants.FN_WARC_FILENAME, filename);
140            header.addHeader(WarcConstants.FN_CONTENT_TYPE, ContentType.parseContentType(WarcConstants.CT_APP_WARC_FIELDS), null);
141            header.addHeader(WarcConstants.FN_CONTENT_LENGTH, new Long(payloadAsBytes.length), null);
142            header.addHeader(WarcConstants.FN_WARC_BLOCK_DIGEST, blockDigest, null);
143            writer.writer.writeHeader(record);
144            ByteArrayInputStream bin = new ByteArrayInputStream(payloadAsBytes);
145            writer.writer.streamPayload(bin);
146            writer.writer.closeRecord();
147        } catch (IOException e) {
148            throw new IllegalState("Error inserting warcinfo record", e);
149        }
150    }
151
152    @Override
153    public void writeFileTo(File file, String uri, String mime) {
154        writeTo(file, uri, mime);
155    }
156
157    @Override
158    public boolean writeTo(File fileToArchive, String URL, String mimetype) {
159        if (!fileToArchive.isFile()) {
160            throw new IOFailure("Not a file: " + fileToArchive.getPath());
161        }
162        if (warcInfoUID == null) {
163            throw new IllegalState("An WarcInfo record has not been inserted yet");
164        }
165        log.info("{} {}", fileToArchive, fileToArchive.length());
166        byte[] blockDigestBytes = ChecksumCalculator.digestFile(fileToArchive, "SHA1");
167        WarcDigest blockDigest = WarcDigest.createWarcDigest("SHA1", blockDigestBytes, "base32", Base32.encode(blockDigestBytes));
168        Uri recordId;
169        try {
170            recordId = new Uri("urn:uuid:" + UUID.randomUUID().toString());
171        } catch (URISyntaxException e) {
172            throw new IllegalState("Epic fail creating URI from UUID!", e);
173        }
174        InputStream in = null;
175        try {
176            WarcRecord record = WarcRecord.createRecord(writer.writer);
177            WarcHeader header = record.header;
178            header.warcTypeIdx = WarcConstants.RT_IDX_RESOURCE;
179            header.addHeader(WarcConstants.FN_WARC_RECORD_ID, recordId, null);
180            header.addHeader(WarcConstants.FN_WARC_DATE, new Date(), null);
181            header.addHeader(WarcConstants.FN_WARC_WARCINFO_ID, warcInfoUID, null);
182            header.addHeader(WarcConstants.FN_WARC_IP_ADDRESS, SystemUtils.getLocalIP());
183            header.addHeader(WarcConstants.FN_WARC_TARGET_URI, URL);
184            header.addHeader(WarcConstants.FN_WARC_BLOCK_DIGEST, blockDigest, null);
185            header.addHeader(WarcConstants.FN_CONTENT_TYPE, ContentType.parseContentType(mimetype), null);
186            header.addHeader(WarcConstants.FN_CONTENT_LENGTH, new Long(fileToArchive.length()), null);
187            writer.writer.writeHeader(record);
188            in = new FileInputStream(fileToArchive);
189            writer.writer.streamPayload(in);
190            writer.writer.closeRecord();
191        } catch (FileNotFoundException e) {
192            throw new IOFailure("Unable to open file: " + fileToArchive.getPath(), e);
193        } catch (IOException e) {
194            throw new IOFailure("Epic IO fail while writing to WARC file: " + fileToArchive.getPath(), e);
195        } finally {
196            IOUtils.closeQuietly(in);
197        }
198        return true;
199    }
200
201    @Override
202    public void write(String uri, String contentType, String hostIP, long fetchBeginTimeStamp, byte[] payload)
203            throws java.io.IOException {        
204        ByteArrayInputStream in = new ByteArrayInputStream(payload);
205        byte[] blockDigestBytes = ChecksumCalculator.digestInputStream(in, "SHA1");
206        WarcDigest blockDigest = WarcDigest.createWarcDigest("SHA1", blockDigestBytes, "base32", Base32.encode(blockDigestBytes));
207        Uri recordId;
208        try {
209            recordId = new Uri("urn:uuid:" + UUID.randomUUID().toString());
210        } catch (URISyntaxException e) {
211            throw new IllegalState("Epic fail creating URI from UUID!", e);
212        }
213        WarcRecord record = WarcRecord.createRecord(writer.writer);
214        WarcHeader header = record.header;
215        header.warcTypeIdx = WarcConstants.RT_IDX_RESOURCE;
216        header.addHeader(WarcConstants.FN_WARC_RECORD_ID, recordId, null);
217        header.addHeader(WarcConstants.FN_WARC_DATE, new Date(fetchBeginTimeStamp), null);
218        header.addHeader(WarcConstants.FN_WARC_WARCINFO_ID, warcInfoUID, null);
219        header.addHeader(WarcConstants.FN_WARC_IP_ADDRESS, hostIP);
220        header.addHeader(WarcConstants.FN_WARC_TARGET_URI, uri);
221        header.addHeader(WarcConstants.FN_WARC_BLOCK_DIGEST, blockDigest, null);
222        header.addHeader(WarcConstants.FN_CONTENT_TYPE, ContentType.parseContentType(contentType), null);
223        header.addHeader(WarcConstants.FN_CONTENT_LENGTH, new Long(payload.length), null);
224        writer.writer.writeHeader(record);
225        in = new ByteArrayInputStream(payload); // A re-read is necessary here!
226        writer.writer.streamPayload(in);
227        writer.writer.closeRecord();
228    }
229
230}