/*
 * #%L
 * Netarchivesuite - harvester
 * %%
 * Copyright (C) 2005 - 2018 The Royal Danish Library, 
 *             the National Library of France and the Austrian National Library.
 * %%
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License as
 * published by the Free Software Foundation, either version 2.1 of the
 * License, or (at your option) any later version.
 * 
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Lesser Public License for more details.
 * 
 * You should have received a copy of the GNU General Lesser Public
 * License along with this program.  If not, see
 * <http://www.gnu.org/licenses/lgpl-2.1.html>.
 * #L%
 */
package dk.netarkivet.harvester.harvesting.metadata;

import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.net.URISyntaxException;
import java.util.Date;
import java.util.UUID;

import org.apache.commons.io.IOUtils;
import org.archive.util.Base32;
import org.jwat.common.ANVLRecord;
import org.jwat.common.ContentType;
import org.jwat.common.Uri;
import org.jwat.warc.WarcConstants;
import org.jwat.warc.WarcDigest;
import org.jwat.warc.WarcFileNaming;
import org.jwat.warc.WarcFileNamingSingleFile;
import org.jwat.warc.WarcFileWriter;
import org.jwat.warc.WarcFileWriterConfig;
import org.jwat.warc.WarcHeader;
import org.jwat.warc.WarcRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import dk.netarkivet.common.exceptions.IOFailure;
import dk.netarkivet.common.exceptions.IllegalState;
import dk.netarkivet.common.utils.ChecksumCalculator;
import dk.netarkivet.common.utils.SystemUtils;

/**
 * MetadataFileWriter that writes to WARC files.
 * <p>
 * A warcinfo record must be inserted with {@link #insertInfoRecord(ANVLRecord)} before
 * {@link #writeTo(File, String, String)} is used; the resource records written afterwards
 * refer back to that warcinfo record.
 */
public class MetadataFileWriterWarc extends MetadataFileWriter {

    private static final Logger log = LoggerFactory.getLogger(MetadataFileWriterWarc.class);

    /** Writer to this job's metadata file. It is closed, and this reference cleared, by {@link #close()}. */
    private WarcFileWriter writer = null;

    /** The ID of the warcinfo record. Set by a successful call to {@link #insertInfoRecord(ANVLRecord)}. */
    private Uri warcInfoUID = null;

    /**
     * Create a <code>MetadataFileWriter</code> for WARC output. The returned writer has already been opened.
     *
     * @param metadataWarcFile The WARC output file
     * @return <code>MetadataFileWriter</code> for writing metadata files in WARC
     * @throws IOFailure if the underlying WARC writer cannot be opened
     */
    public static MetadataFileWriter createWriter(File metadataWarcFile) {
        MetadataFileWriterWarc mtfw = new MetadataFileWriterWarc();
        WarcFileNaming naming = new WarcFileNamingSingleFile(metadataWarcFile);
        WarcFileWriterConfig config = new WarcFileWriterConfig(metadataWarcFile.getParentFile(), compressRecords(),
                Long.MAX_VALUE, true);
        mtfw.writer = WarcFileWriter.getWarcWriterInstance(naming, config);
        mtfw.open();
        return mtfw;
    }

    /**
     * Open the underlying WARC file writer.
     *
     * @throws IOFailure if the writer reports an I/O error while opening
     */
    protected void open() {
        try {
            writer.open();
        } catch (IOException e) {
            throw new IOFailure("Error opening MetadataFileWriterWarc", e);
        }
    }

    /**
     * Close the underlying WARC file writer, if there is one, and drop the reference to it. Calling this method on
     * an already closed instance does nothing.
     *
     * @throws IOFailure if the writer reports an I/O error while closing; in that case the writer reference is kept
     */
    @Override
    public void close() {
        if (writer != null) {
            try {
                writer.close();
            } catch (IOException e) {
                throw new IOFailure("Error closing MetadataFileWriterWarc", e);
            }
        }
        writer = null;
    }

    /**
     * Get the file reported by the underlying WARC file writer. Must not be called after {@link #close()}, since the
     * writer reference has been cleared at that point.
     *
     * @return the file the underlying writer is writing to
     */
    @Override
    public File getFile() {
        return writer.getFile();
    }

    /**
     * Insert a warcinfo record in the WARC file. Only one warcinfo record may be inserted per writer. On success the
     * record ID of the written warcinfo record is remembered, so that records written later can refer to it through
     * their WARC-Warcinfo-ID header.
     *
     * @param payloadToInfoRecord the given payload for this record.
     * @throws IllegalState if a warcinfo record has already been inserted, or if writing the record fails
     */
    public void insertInfoRecord(ANVLRecord payloadToInfoRecord) {
        if (warcInfoUID != null) {
            throw new IllegalState("An WarcInfo record has already been inserted");
        }
        // Record the file name without the writer's ACTIVE_SUFFIX, if present.
        String filename = writer.getFile().getName();
        if (filename.endsWith(WarcFileWriter.ACTIVE_SUFFIX)) {
            filename = filename.substring(0, filename.length() - WarcFileWriter.ACTIVE_SUFFIX.length());
        }
        Uri recordId = newRecordId();
        try {
            byte[] payloadAsBytes = payloadToInfoRecord.getUTF8Bytes();
            byte[] blockDigestBytes = ChecksumCalculator.digestInputStream(new ByteArrayInputStream(payloadAsBytes),
                    "SHA1");
            WarcDigest blockDigest = sha1BlockDigest(blockDigestBytes);
            WarcRecord record = WarcRecord.createRecord(writer.writer);
            WarcHeader header = record.header;
            header.warcTypeIdx = WarcConstants.RT_IDX_WARCINFO;
            header.addHeader(WarcConstants.FN_WARC_RECORD_ID, recordId, null);
            header.addHeader(WarcConstants.FN_WARC_DATE, new Date(), null);
            header.addHeader(WarcConstants.FN_WARC_FILENAME, filename);
            header.addHeader(WarcConstants.FN_CONTENT_TYPE,
                    ContentType.parseContentType(WarcConstants.CT_APP_WARC_FIELDS), null);
            header.addHeader(WarcConstants.FN_CONTENT_LENGTH, Long.valueOf(payloadAsBytes.length), null);
            header.addHeader(WarcConstants.FN_WARC_BLOCK_DIGEST, blockDigest, null);
            writer.writer.writeHeader(record);
            ByteArrayInputStream bin = new ByteArrayInputStream(payloadAsBytes);
            writer.writer.streamPayload(bin);
            writer.writer.closeRecord();
        } catch (IOException e) {
            throw new IllegalState("Error inserting warcinfo record", e);
        }
        // Only remember the ID once the record has actually been written; otherwise a failed
        // attempt would leave this writer claiming that a warcinfo record exists.
        warcInfoUID = recordId;
    }

    /**
     * Write the given file as a record in the WARC file. Delegates to {@link #writeTo(File, String, String)} and
     * discards its return value.
     *
     * @param file the file to archive
     * @param uri the target URI to store the file under
     * @param mime the content type of the file
     */
    @Override
    public void writeFileTo(File file, String uri, String mime) {
        writeTo(file, uri, mime);
    }

    /**
     * Write the contents of the given file as a resource record in the WARC file. The record refers to the
     * previously inserted warcinfo record and uses the local IP address as WARC-IP-Address.
     *
     * @param fileToArchive the file whose contents become the record payload
     * @param URL the value of the WARC-Target-URI header
     * @param mimetype the content type of the payload
     * @return always <code>true</code>; failures are reported by exceptions
     * @throws IOFailure if fileToArchive is not a file, cannot be opened, or writing the record fails
     * @throws IllegalState if no warcinfo record has been inserted yet
     */
    @Override
    public boolean writeTo(File fileToArchive, String URL, String mimetype) {
        if (!fileToArchive.isFile()) {
            throw new IOFailure("Not a file: " + fileToArchive.getPath());
        }
        if (warcInfoUID == null) {
            throw new IllegalState("An WarcInfo record has not been inserted yet");
        }
        log.info("{} {}", fileToArchive, fileToArchive.length());
        byte[] blockDigestBytes = ChecksumCalculator.digestFile(fileToArchive, "SHA1");
        WarcDigest blockDigest = sha1BlockDigest(blockDigestBytes);
        Uri recordId = newRecordId();
        InputStream in = null;
        try {
            // Open the payload before emitting the header, so that a file which cannot be
            // opened does not leave a header without payload in the WARC file.
            in = new FileInputStream(fileToArchive);
            WarcRecord record = WarcRecord.createRecord(writer.writer);
            WarcHeader header = record.header;
            header.warcTypeIdx = WarcConstants.RT_IDX_RESOURCE;
            header.addHeader(WarcConstants.FN_WARC_RECORD_ID, recordId, null);
            header.addHeader(WarcConstants.FN_WARC_DATE, new Date(), null);
            header.addHeader(WarcConstants.FN_WARC_WARCINFO_ID, warcInfoUID, null);
            header.addHeader(WarcConstants.FN_WARC_IP_ADDRESS, SystemUtils.getLocalIP());
            header.addHeader(WarcConstants.FN_WARC_TARGET_URI, URL);
            header.addHeader(WarcConstants.FN_WARC_BLOCK_DIGEST, blockDigest, null);
            header.addHeader(WarcConstants.FN_CONTENT_TYPE, ContentType.parseContentType(mimetype), null);
            header.addHeader(WarcConstants.FN_CONTENT_LENGTH, Long.valueOf(fileToArchive.length()), null);
            writer.writer.writeHeader(record);
            writer.writer.streamPayload(in);
            writer.writer.closeRecord();
        } catch (FileNotFoundException e) {
            throw new IOFailure("Unable to open file: " + fileToArchive.getPath(), e);
        } catch (IOException e) {
            throw new IOFailure("Epic IO fail while writing to WARC file: " + fileToArchive.getPath(), e);
        } finally {
            IOUtils.closeQuietly(in);
        }
        return true;
    }

    /**
     * Write the given payload as a resource record in the WARC file.
     * <p>
     * NOTE(review): unlike {@link #writeTo(File, String, String)}, this method does not verify that a warcinfo
     * record has been inserted before it adds the WARC-Warcinfo-ID header - confirm whether that is intentional.
     *
     * @param uri the value of the WARC-Target-URI header
     * @param contentType the content type of the payload
     * @param hostIP the value of the WARC-IP-Address header
     * @param fetchBeginTimeStamp the time, in milliseconds since the epoch, used for the WARC-Date header
     * @param payload the bytes to store as the record payload
     * @throws java.io.IOException if digesting the payload or writing the record fails
     */
    @Override
    public void write(String uri, String contentType, String hostIP, long fetchBeginTimeStamp, byte[] payload)
            throws java.io.IOException {
        byte[] blockDigestBytes = ChecksumCalculator.digestInputStream(new ByteArrayInputStream(payload), "SHA1");
        WarcDigest blockDigest = sha1BlockDigest(blockDigestBytes);
        Uri recordId = newRecordId();
        WarcRecord record = WarcRecord.createRecord(writer.writer);
        WarcHeader header = record.header;
        header.warcTypeIdx = WarcConstants.RT_IDX_RESOURCE;
        header.addHeader(WarcConstants.FN_WARC_RECORD_ID, recordId, null);
        header.addHeader(WarcConstants.FN_WARC_DATE, new Date(fetchBeginTimeStamp), null);
        header.addHeader(WarcConstants.FN_WARC_WARCINFO_ID, warcInfoUID, null);
        header.addHeader(WarcConstants.FN_WARC_IP_ADDRESS, hostIP);
        header.addHeader(WarcConstants.FN_WARC_TARGET_URI, uri);
        header.addHeader(WarcConstants.FN_WARC_BLOCK_DIGEST, blockDigest, null);
        header.addHeader(WarcConstants.FN_CONTENT_TYPE, ContentType.parseContentType(contentType), null);
        header.addHeader(WarcConstants.FN_CONTENT_LENGTH, Long.valueOf(payload.length), null);
        writer.writer.writeHeader(record);
        // The stream used for digesting has been consumed, so the payload is streamed from a fresh one.
        writer.writer.streamPayload(new ByteArrayInputStream(payload));
        writer.writer.closeRecord();
    }

    /**
     * Create a new record ID of the form <code>urn:uuid:&lt;random UUID&gt;</code>.
     *
     * @return the new record ID
     * @throws IllegalState if the generated string is rejected as a URI
     */
    private static Uri newRecordId() {
        try {
            return new Uri("urn:uuid:" + UUID.randomUUID().toString());
        } catch (URISyntaxException e) {
            throw new IllegalState("Epic fail creating URI from UUID!", e);
        }
    }

    /**
     * Wrap already computed SHA1 digest bytes as a base32 encoded WARC block digest.
     *
     * @param digestBytes the SHA1 digest of the record payload
     * @return the digest in the form used for the WARC-Block-Digest header
     */
    private static WarcDigest sha1BlockDigest(byte[] digestBytes) {
        return WarcDigest.createWarcDigest("SHA1", digestBytes, "base32", Base32.encode(digestBytes));
    }

}