001/* 002 * #%L 003 * Netarchivesuite - harvester 004 * %% 005 * Copyright (C) 2005 - 2014 The Royal Danish Library, the Danish State and University Library, 006 * the National Library of France and the Austrian National Library. 007 * %% 008 * This program is free software: you can redistribute it and/or modify 009 * it under the terms of the GNU Lesser General Public License as 010 * published by the Free Software Foundation, either version 2.1 of the 011 * License, or (at your option) any later version. 012 * 013 * This program is distributed in the hope that it will be useful, 014 * but WITHOUT ANY WARRANTY; without even the implied warranty of 015 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 016 * GNU General Lesser Public License for more details. 017 * 018 * You should have received a copy of the GNU General Lesser Public 019 * License along with this program. If not, see 020 * <http://www.gnu.org/licenses/lgpl-2.1.html>. 021 * #L% 022 */ 023package dk.netarkivet.harvester.harvesting.metadata; 024 025import java.io.ByteArrayInputStream; 026import java.io.File; 027import java.io.FileInputStream; 028import java.io.FileNotFoundException; 029import java.io.IOException; 030import java.io.InputStream; 031import java.net.URISyntaxException; 032import java.util.Date; 033import java.util.UUID; 034 035import org.apache.commons.io.IOUtils; 036import org.jwat.common.ANVLRecord; 037import org.jwat.common.ContentType; 038import org.jwat.common.Uri; 039import org.jwat.warc.WarcConstants; 040import org.jwat.warc.WarcFileNaming; 041import org.jwat.warc.WarcFileNamingSingleFile; 042import org.jwat.warc.WarcFileWriter; 043import org.jwat.warc.WarcFileWriterConfig; 044import org.jwat.warc.WarcHeader; 045import org.jwat.warc.WarcRecord; 046import org.slf4j.Logger; 047import org.slf4j.LoggerFactory; 048 049import dk.netarkivet.common.exceptions.IOFailure; 050import dk.netarkivet.common.exceptions.IllegalState; 051import dk.netarkivet.common.utils.ChecksumCalculator; 052import dk.netarkivet.common.utils.SystemUtils; 053 054/** 055 * MetadataFileWriter that writes to WARC files. 056 */ 057public class MetadataFileWriterWarc extends MetadataFileWriter { 058 059 private static final Logger log = LoggerFactory.getLogger(MetadataFileWriterWarc.class); 060 061 /** Writer to this jobs metadatafile. This is closed when the metadata is marked as ready. */ 062 private WarcFileWriter writer = null; 063 064 /** The ID of the Warcinfo record. Set when calling the insertInfoRecord method. */ 065 private Uri warcInfoUID = null; 066 067 /** 068 * Create a <code>MetadataFileWriter</code> for WARC output. 069 * 070 * @param metadataWarcFile The WARC output file 071 * @return <code>MetadataFileWriter</code> for writing metadata files in WARC 072 */ 073 public static MetadataFileWriter createWriter(File metadataWarcFile) { 074 MetadataFileWriterWarc mtfw = new MetadataFileWriterWarc(); 075 WarcFileNaming naming = new WarcFileNamingSingleFile(metadataWarcFile); 076 WarcFileWriterConfig config = new WarcFileWriterConfig(metadataWarcFile.getParentFile(), false, Long.MAX_VALUE, true); 077 mtfw.writer = WarcFileWriter.getWarcWriterInstance(naming, config); 078 mtfw.open(); 079 return mtfw; 080 } 081 082 protected void open() { 083 try { 084 writer.open(); 085 } catch (IOException e) { 086 throw new IOFailure("Error opening MetadataFileWriterWarc", e); 087 } 088 } 089 090 @Override 091 public void close() { 092 if (writer != null) { 093 try { 094 writer.close(); 095 } catch (IOException e) { 096 throw new IOFailure("Error closing MetadataFileWriterWarc", e); 097 } 098 } 099 writer = null; 100 } 101 102 @Override 103 public File getFile() { 104 return writer.getFile(); 105 } 106 107 /** 108 * Insert a warcInfoRecord in the WARC-file, if it doesn't already exists. saves the recordID of the written 109 * info-record for future reference to be used for later in the 110 * 111 * @param payloadToInfoRecord the given payload for this record. 112 */ 113 public void insertInfoRecord(ANVLRecord payloadToInfoRecord) { 114 if (warcInfoUID != null) { 115 throw new IllegalState("An WarcInfo record has already been inserted"); 116 } 117 String filename = writer.getFile().getName(); 118 if (filename.endsWith(WarcFileWriter.ACTIVE_SUFFIX)) { 119 filename = filename.substring(0, filename.length() - WarcFileWriter.ACTIVE_SUFFIX.length()); 120 } 121 Uri recordId; 122 try { 123 recordId = new Uri("urn:uuid:" + UUID.randomUUID().toString()); 124 } catch (URISyntaxException e) { 125 throw new IllegalState("Epic fail creating URI from UUID!"); 126 } 127 warcInfoUID = recordId; 128 try { 129 byte[] payloadAsBytes = payloadToInfoRecord.getUTF8Bytes(); 130 String blockDigest = ChecksumCalculator.calculateSha1(new ByteArrayInputStream(payloadAsBytes)); 131 WarcRecord record = WarcRecord.createRecord(writer.writer); 132 WarcHeader header = record.header; 133 header.warcTypeIdx = WarcConstants.RT_IDX_WARCINFO; 134 header.addHeader(WarcConstants.FN_WARC_RECORD_ID, recordId, null); 135 header.addHeader(WarcConstants.FN_WARC_DATE, new Date(), null); 136 header.addHeader(WarcConstants.FN_WARC_FILENAME, filename); 137 header.addHeader(WarcConstants.FN_CONTENT_TYPE, ContentType.parseContentType(WarcConstants.CT_APP_WARC_FIELDS), null); 138 header.addHeader(WarcConstants.FN_CONTENT_LENGTH, new Long(payloadAsBytes.length), null); 139 header.addHeader(WarcConstants.FN_WARC_BLOCK_DIGEST, blockDigest); 140 writer.writer.writeHeader(record); 141 ByteArrayInputStream bin = new ByteArrayInputStream(payloadAsBytes); 142 writer.writer.streamPayload(bin); 143 writer.writer.closeRecord(); 144 } catch (IOException e) { 145 throw new IllegalState("Error inserting warcinfo record", e); 146 } 147 } 148 149 @Override 150 public void writeFileTo(File file, String uri, String mime) { 151 writeTo(file, uri, mime); 152 } 153 154 @Override 155 public boolean writeTo(File fileToArchive, String URL, String mimetype) { 156 if (!fileToArchive.isFile()) { 157 throw new IOFailure("Not a file: " + fileToArchive.getPath()); 158 } 159 if (warcInfoUID == null) { 160 throw new IllegalState("An WarcInfo record has not been inserted yet"); 161 } 162 log.info("{} {}", fileToArchive, fileToArchive.length()); 163 String blockDigest = ChecksumCalculator.calculateSha1(fileToArchive); 164 Uri recordId; 165 try { 166 recordId = new Uri("urn:uuid:" + UUID.randomUUID().toString()); 167 } catch (URISyntaxException e) { 168 throw new IllegalState("Epic fail creating URI from UUID!"); 169 } 170 InputStream in = null; 171 try { 172 WarcRecord record = WarcRecord.createRecord(writer.writer); 173 WarcHeader header = record.header; 174 header.warcTypeIdx = WarcConstants.RT_IDX_RESOURCE; 175 header.addHeader(WarcConstants.FN_WARC_RECORD_ID, recordId, null); 176 header.addHeader(WarcConstants.FN_WARC_DATE, new Date(), null); 177 header.addHeader(WarcConstants.FN_WARC_WARCINFO_ID, warcInfoUID, null); 178 header.addHeader(WarcConstants.FN_WARC_IP_ADDRESS, SystemUtils.getLocalIP()); 179 header.addHeader(WarcConstants.FN_WARC_TARGET_URI, URL); 180 header.addHeader(WarcConstants.FN_WARC_BLOCK_DIGEST, blockDigest); 181 header.addHeader(WarcConstants.FN_CONTENT_TYPE, ContentType.parseContentType(mimetype), null); 182 header.addHeader(WarcConstants.FN_CONTENT_LENGTH, new Long(fileToArchive.length()), null); 183 writer.writer.writeHeader(record); 184 in = new FileInputStream(fileToArchive); 185 writer.writer.streamPayload(in); 186 writer.writer.closeRecord(); 187 } catch (FileNotFoundException e) { 188 throw new IOFailure("Unable to open file: " + fileToArchive.getPath(), e); 189 } catch (IOException e) { 190 throw new IOFailure("Epic IO fail while writing to WARC file: " + fileToArchive.getPath(), e); 191 } finally { 192 IOUtils.closeQuietly(in); 193 } 194 return true; 195 } 196 197 @Override 198 public void write(String uri, String contentType, String hostIP, long fetchBeginTimeStamp, byte[] payload) 199 throws java.io.IOException { 200 ByteArrayInputStream in = new ByteArrayInputStream(payload); 201 String blockDigest = ChecksumCalculator.calculateSha1(in); 202 Uri recordId; 203 try { 204 recordId = new Uri("urn:uuid:" + UUID.randomUUID().toString()); 205 } catch (URISyntaxException e) { 206 throw new IllegalState("Epic fail creating URI from UUID!"); 207 } 208 WarcRecord record = WarcRecord.createRecord(writer.writer); 209 WarcHeader header = record.header; 210 header.warcTypeIdx = WarcConstants.RT_IDX_RESOURCE; 211 header.addHeader(WarcConstants.FN_WARC_RECORD_ID, recordId, null); 212 header.addHeader(WarcConstants.FN_WARC_DATE, new Date(fetchBeginTimeStamp), null); 213 header.addHeader(WarcConstants.FN_WARC_WARCINFO_ID, warcInfoUID, null); 214 header.addHeader(WarcConstants.FN_WARC_IP_ADDRESS, hostIP); 215 header.addHeader(WarcConstants.FN_WARC_TARGET_URI, uri); 216 header.addHeader(WarcConstants.FN_WARC_BLOCK_DIGEST, blockDigest); 217 header.addHeader(WarcConstants.FN_CONTENT_TYPE, ContentType.parseContentType(contentType), null); 218 header.addHeader(WarcConstants.FN_CONTENT_LENGTH, new Long(payload.length), null); 219 writer.writer.writeHeader(record); 220 in = new ByteArrayInputStream(payload); // A re-read is necessary here! 221 writer.writer.streamPayload(in); 222 writer.writer.closeRecord(); 223 } 224 225}