/*
 * #%L
 * Netarchivesuite - harvester
 * %%
 * Copyright (C) 2005 - 2014 The Royal Danish Library, the Danish State and University Library,
 *             the National Library of France and the Austrian National Library.
 * %%
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License as
 * published by the Free Software Foundation, either version 2.1 of the
 * License, or (at your option) any later version.
 * 
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU Lesser General Public License for more details.
 * 
 * You should have received a copy of the GNU Lesser General Public
 * License along with this program.  If not, see
 * <http://www.gnu.org/licenses/lgpl-2.1.html>.
 * #L%
 */
package dk.netarkivet.harvester.harvesting;

import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.IOException;
import java.net.InetAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.UnknownHostException;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;

import javax.xml.xpath.XPath;
import javax.xml.xpath.XPathConstants;
import javax.xml.xpath.XPathExpression;
import javax.xml.xpath.XPathExpressionException;
import javax.xml.xpath.XPathFactory;

import org.apache.commons.httpclient.Header;
import org.apache.commons.httpclient.HttpMethodBase;
import org.apache.commons.httpclient.HttpStatus;
import org.apache.commons.lang.StringUtils;
import org.archive.crawler.Heritrix;
import org.archive.crawler.datamodel.CoreAttributeConstants;
import org.archive.crawler.datamodel.CrawlURI;
import org.archive.crawler.datamodel.FetchStatusCodes;
import org.archive.crawler.deciderules.recrawl.IdenticalDigestDecideRule;
import org.archive.crawler.event.CrawlStatusListener;
import org.archive.crawler.extractor.Link;
import org.archive.crawler.framework.WriterPoolProcessor;
import org.archive.crawler.settings.MapType;
import org.archive.crawler.settings.SimpleType;
import org.archive.crawler.settings.Type;
import org.archive.io.ReplayInputStream;
import org.archive.io.WriterPoolMember;
import org.archive.io.WriterPoolSettings;
import org.archive.io.warc.WARCConstants;
import org.archive.io.warc.WARCWriter;
import org.archive.io.warc.WARCWriterPool;
import org.archive.uid.GeneratorFactory;
import org.archive.util.ArchiveUtils;
import org.archive.util.XmlUtils;
import org.archive.util.anvl.ANVLRecord;
import org.w3c.dom.Document;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;

import dk.netarkivet.harvester.datamodel.H1HeritrixTemplate;

/**
 * WARCWriterProcessor. Goes against the 0.18 version of the WARC specification (which is functionally identical to
 * 0.17 except in the protocol identifier string). See http://archive-access.sourceforge.net/warc/
 * <p>
 * Based on the WARCWriterProcessor in package org.archive.crawler.writer, with modifications to the warcinfo record.
 *
 * @author stack
 * @author svc
 */
/*
 * Template for adding this metadata to an H1 template:
 *
 *     <map name="metadata-items">
 *         <string name="harvestInfo.version">Vilhelm</string>
 *         <string name="harvestInfo.jobId">Caroline</string>
 *         <string name="harvestInfo.channel">Login</string>
 *         <string name="harvestInfo.harvestNum">ffff</string>
 *         <string name="harvestInfo.origHarvestDefinitionID">ffff</string>
 *         <string name="harvestInfo.maxBytesPerDomain">ffff</string>
 *         <string name="harvestInfo.maxObjectsPerDomain">ffff</string>
 *         <string name="harvestInfo.orderXMLName">Default Orderxml</string>
 *         <string name="harvestInfo.origHarvestDefinitionName">ddddd</string>
 *         <string name="harvestInfo.scheduleName">Every Hour</string>
 *         <string name="harvestInfo.harvestFilenamePrefix">1-1</string>
 *         <string name="harvestInfo.jobSubmitDate">NOW</string>
 *         <string name="harvestInfo.performer">performer</string>
 *         <string name="harvestInfo.audience">audience</string>
 *     </map>
 */
public class WARCWriterProcessor extends WriterPoolProcessor implements CoreAttributeConstants, CrawlStatusListener,
        WriterPoolSettings, FetchStatusCodes, WARCConstants {

    private static final Logger logger = Logger.getLogger(WARCWriterProcessor.class.getName());

    private static final long serialVersionUID = -2006725968882994351L;

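    /** @return the default maximum WARC file size: 1 SI gigabyte. */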
    public long getDefaultMaxFileSize() {
        return 1000000000L; // 1 SI gigabyte (10^9 bytes), per WARC appendix A
    }

    /** Key for whether to write 'request' type records where possible. */
    public static final String ATTR_WRITE_REQUESTS = "write-requests";

    /** Key for whether to write 'metadata' type records where possible. */
    public static final String ATTR_WRITE_METADATA = "write-metadata";

    /** Key for whether to write 'metadata-outlinks' type records where possible. */
    public static final String ATTR_WRITE_METADATA_OUTLINKS = "write-metadata-outlinks";

    /**
     * Key for whether to write 'revisit' type records for consecutive identical digests.
     */
    public static final String ATTR_WRITE_REVISIT_FOR_IDENTICAL_DIGESTS = "write-revisit-for-identical-digests";

    /**
     * Key for whether to write 'revisit' type records for server "304 not modified" responses.
     */
    public static final String ATTR_WRITE_REVISIT_FOR_NOT_MODIFIED = "write-revisit-for-not-modified";

    /**
     * Key for metadata-items to include in the warcinfo.
     */
    public static final String ATTR_METADATA_ITEMS = "metadata-items";

    /** Default path list. */
    private static final String[] DEFAULT_PATH = {"warcs"};

    protected String[] getDefaultPath() {
        return DEFAULT_PATH;
    }

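    /** Metadata items read from the order.xml, to be written into the warcinfo record. */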
    private Map<String, String> metadataMap;

    private static final String HARVESTINFO_VERSION = "harvestInfo.version";
    private static final String HARVESTINFO_JOBID = "harvestInfo.jobId";
    private static final String HARVESTINFO_CHANNEL = "harvestInfo.channel";
    private static final String HARVESTINFO_HARVESTNUM = "harvestInfo.harvestNum";
    private static final String HARVESTINFO_ORIGHARVESTDEFINITIONID = "harvestInfo.origHarvestDefinitionID";
    private static final String HARVESTINFO_MAXBYTESPERDOMAIN = "harvestInfo.maxBytesPerDomain";
    private static final String HARVESTINFO_MAXOBJECTSPERDOMAIN = "harvestInfo.maxObjectsPerDomain";
    private static final String HARVESTINFO_ORDERXMLNAME = "harvestInfo.orderXMLName";
    private static final String HARVESTINFO_ORIGHARVESTDEFINITIONNAME = "harvestInfo.origHarvestDefinitionName";
    private static final String HARVESTINFO_SCHEDULENAME = "harvestInfo.scheduleName";
    private static final String HARVESTINFO_HARVESTFILENAMEPREFIX = "harvestInfo.harvestFilenamePrefix";
    private static final String HARVESTINFO_JOBSUBMITDATE = "harvestInfo.jobSubmitDate";
    private static final String HARVESTINFO_PERFORMER = "harvestInfo.performer";
    private static final String HARVESTINFO_AUDIENCE = "harvestInfo.audience";

    /**
     * @param name Name of this writer.
     */
    public WARCWriterProcessor(final String name) {
        super(name, "Netarchivesuite WARCWriter processor (Version 1.0)");
        Type e = addElementToDefinition(new SimpleType(ATTR_WRITE_REQUESTS,
                "Whether to write 'request' type records. Default is true.", Boolean.TRUE));
        e.setOverrideable(true);
        e.setExpertSetting(true);
        e = addElementToDefinition(new SimpleType(ATTR_WRITE_METADATA,
                "Whether to write 'metadata' type records. Default is true.", Boolean.TRUE));
        e.setOverrideable(true);
        e.setExpertSetting(true);
        e = addElementToDefinition(new SimpleType(ATTR_WRITE_METADATA_OUTLINKS,
                "Whether to write 'metadata-outlinks' type records. Default is true.", Boolean.TRUE));
        e.setOverrideable(true);
        e.setExpertSetting(true);
        e = addElementToDefinition(new SimpleType(ATTR_WRITE_REVISIT_FOR_IDENTICAL_DIGESTS,
                "Whether to write 'revisit' type records when a URI's history indicates the previous fetch had an "
                        + "identical content digest. Default is true.", Boolean.TRUE));
        e.setOverrideable(true);
        e.setExpertSetting(true);
        e = addElementToDefinition(new SimpleType(ATTR_WRITE_REVISIT_FOR_NOT_MODIFIED,
                "Whether to write 'revisit' type records when a 304-Not Modified response is received. "
                        + "Default is true.", Boolean.TRUE));
        e.setOverrideable(true);
        e.setExpertSetting(true);

        // Add map setting to add NAS metadata to warcinfo records.
        e = addElementToDefinition(new MapType(ATTR_METADATA_ITEMS, "Metadata items.", String.class));
        e.setOverrideable(true);
        e.setExpertSetting(true);
    }

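    /**
     * Set up a WARCWriterPool for this processor, using this processor's pool settings.
     *
     * @param serialNo the serial-number generator shared by the pool members.
     */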
    protected void setupPool(final AtomicInteger serialNo) {
        setPool(new WARCWriterPool(serialNo, this, getPoolMaximumActive(), getPoolMaximumWait()));
    }

    /**
     * Writes a CrawlURI and its associated data to a store file.
     * <p>
     * Currently this method understands the following URI types: dns, http, and https.
     *
     * @param curi CrawlURI to process.
     */
    protected void innerProcess(CrawlURI curi) {
        // If failure, or we haven't fetched the resource yet, return.
        if (curi.getFetchStatus() <= 0) {
            return;
        }

        // If no recorded content at all, don't write a record. Except for FTP, which
        // can have empty content, since the "headers" don't count as content.
        String scheme = curi.getUURI().getScheme().toLowerCase();
        long recordLength = curi.getContentSize();
        if (recordLength <= 0 && !scheme.equals("ftp")) {
            // getContentSize() should be > 0 if any material (even just
            // HTTP headers with zero-length body) is available.
            return;
        }

        try {
            if (shouldWrite(curi)) {
                write(scheme, curi);
            } else {
                logger.info("This writer does not write out scheme " + scheme + " content");
            }
        } catch (IOException e) {
            curi.addLocalizedError(this.getName(), e, "WriteRecord: " + curi.toString());
            logger.log(Level.SEVERE, "Failed write of Record: " + curi.toString(), e);
        }
    }

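    /**
     * Write WARC records for the given CrawlURI, dispatching on its URI scheme. Borrows a writer from the pool,
     * rolls over to a new file when the size limit is hit, and invalidates the current file on write errors.
     *
     * @param lowerCaseScheme the URI scheme, lower-cased.
     * @param curi CrawlURI to write records for.
     * @throws IOException if writing fails.
     */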
    protected void write(final String lowerCaseScheme, final CrawlURI curi) throws IOException {
        logger.info("writing warc record for " + curi);
        WriterPoolMember writer = getPool().borrowFile();
        long position = writer.getPosition();
        // See if we need to open a new file because we've exceeded maxBytes.
        // The call to checkSize will open a new file if we're at the maximum
        // for the current file.
        writer.checkSize();
        if (writer.getPosition() != position) {
            // We just closed the file because it was larger than maxBytes.
            // Add to totalBytesWritten the size of the first record
            // in the file, if any.
            setTotalBytesWritten(getTotalBytesWritten() + (writer.getPosition() - position));
            position = writer.getPosition();
        }

        WARCWriter w = (WARCWriter) writer;
        try {
            // Write a request, response, and metadata all in the one
            // 'transaction'.
            final URI baseid = getRecordID();
            final String timestamp = ArchiveUtils.getLog14Date(curi.getLong(A_FETCH_BEGAN_TIME));
            if (lowerCaseScheme.startsWith("http")) {
                writeHttpRecords(w, curi, baseid, timestamp);
            } else if (lowerCaseScheme.equals("dns")) {
                writeDnsRecords(w, curi, baseid, timestamp);
            } else if (lowerCaseScheme.equals("ftp")) {
                writeFtpRecords(w, curi, baseid, timestamp);
            } else {
                logger.warning("No handler for scheme " + lowerCaseScheme);
            }
        } catch (IOException e) {
            // Invalidate this file (it gets a '.invalid' suffix).
            getPool().invalidateFile(writer);
            // Set the writer to null, otherwise the pool's accounting of how
            // many active writers there are gets skewed if we subsequently
            // do a returnFile call on this object in the finally block.
            writer = null;
            throw e;
        } finally {
            if (writer != null) {
                setTotalBytesWritten(getTotalBytesWritten() + (writer.getPosition() - position));
                getPool().returnFile(writer);
            }
        }
        checkBytesWritten();
    }

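    /**
     * Write WARC records for an FTP fetch: the control conversation as a metadata record, the payload as a
     * resource (or revisit) record, and optionally a further metadata record linked via WARC-Concurrent-To.
     */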
    private void writeFtpRecords(WARCWriter w, final CrawlURI curi, final URI baseid, final String timestamp)
            throws IOException {
        ANVLRecord headers = new ANVLRecord(3);
        headers.addLabelValue(HEADER_KEY_IP, getHostAddress(curi));
        String controlConversation = curi.getString(A_FTP_CONTROL_CONVERSATION);
        URI rid = writeFtpControlConversation(w, timestamp, baseid, curi, headers, controlConversation);

        if (curi.getContentDigest() != null) {
            headers.addLabelValue(HEADER_KEY_PAYLOAD_DIGEST, curi.getContentDigestSchemeString());
        }

        if (curi.getHttpRecorder() != null) {
            if (IdenticalDigestDecideRule.hasIdenticalDigest(curi)
                    && ((Boolean) getUncheckedAttribute(curi, ATTR_WRITE_REVISIT_FOR_IDENTICAL_DIGESTS))) {
                rid = writeRevisitDigest(w, timestamp, null, baseid, curi, headers);
            } else {
                headers = new ANVLRecord(3);
                if (curi.isTruncatedFetch()) {
                    String value = curi.isTimeTruncatedFetch() ? NAMED_FIELD_TRUNCATED_VALUE_TIME
                            : curi.isLengthTruncatedFetch() ? NAMED_FIELD_TRUNCATED_VALUE_LENGTH
                            : curi.isHeaderTruncatedFetch() ? NAMED_FIELD_TRUNCATED_VALUE_HEAD
                            // TODO: Add this to spec.
                            : TRUNCATED_VALUE_UNSPECIFIED;
                    headers.addLabelValue(HEADER_KEY_TRUNCATED, value);
                }
                if (curi.getContentDigest() != null) {
                    headers.addLabelValue(HEADER_KEY_PAYLOAD_DIGEST, curi.getContentDigestSchemeString());
                }
                headers.addLabelValue(HEADER_KEY_CONCURRENT_TO, '<' + rid.toString() + '>');
                rid = writeResource(w, timestamp, curi.getContentType(), baseid, curi, headers);
            }
        }
        if (((Boolean) getUncheckedAttribute(curi, ATTR_WRITE_METADATA))) {
            headers = new ANVLRecord(1);
            headers.addLabelValue(HEADER_KEY_CONCURRENT_TO, '<' + rid.toString() + '>');
            writeMetadata(w, timestamp, baseid, curi, headers,
                    ((Boolean) getUncheckedAttribute(curi, ATTR_WRITE_METADATA_OUTLINKS)));
        }
    }

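    /**
     * Write a response record for a DNS lookup, with the resolving server's IP as a header when known.
     */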
    private void writeDnsRecords(WARCWriter w, final CrawlURI curi, final URI baseid, final String timestamp)
            throws IOException {
        ANVLRecord headers = null;
        String ip = curi.getString(A_DNS_SERVER_IP_LABEL);
        if (ip != null && ip.length() > 0) {
            headers = new ANVLRecord(1);
            headers.addLabelValue(HEADER_KEY_IP, ip);
        }
        writeResponse(w, timestamp, curi.getContentType(), baseid, curi, headers);
    }

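    /**
     * Write WARC records for an HTTP(S) fetch: a response (or revisit) record, plus optional request and metadata
     * records linked to it via WARC-Concurrent-To.
     */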
    private void writeHttpRecords(WARCWriter w, final CrawlURI curi, final URI baseid, final String timestamp)
            throws IOException {
        // Add named fields for ip, checksum, and relate the metadata
        // and request to the resource field.
        // TODO: Use other than ANVL (or rename ANVL as NameValue), or
        // use RFC822 (commons-httpclient?).
        ANVLRecord headers = new ANVLRecord(5);
        if (curi.getContentDigest() != null) {
            headers.addLabelValue(HEADER_KEY_PAYLOAD_DIGEST, curi.getContentDigestSchemeString());
        }
        headers.addLabelValue(HEADER_KEY_IP, getHostAddress(curi));
        URI rid;

        if (IdenticalDigestDecideRule.hasIdenticalDigest(curi)
                && ((Boolean) getUncheckedAttribute(curi, ATTR_WRITE_REVISIT_FOR_IDENTICAL_DIGESTS))) {
            rid = writeRevisitDigest(w, timestamp, HTTP_RESPONSE_MIMETYPE, baseid, curi, headers);
        } else if (curi.getFetchStatus() == HttpStatus.SC_NOT_MODIFIED
                && ((Boolean) getUncheckedAttribute(curi, ATTR_WRITE_REVISIT_FOR_NOT_MODIFIED))) {
            rid = writeRevisitNotModified(w, timestamp, baseid, curi, headers);
        } else {
            if (curi.isTruncatedFetch()) {
                String value = curi.isTimeTruncatedFetch() ? NAMED_FIELD_TRUNCATED_VALUE_TIME
                        : curi.isLengthTruncatedFetch() ? NAMED_FIELD_TRUNCATED_VALUE_LENGTH
                        : curi.isHeaderTruncatedFetch() ? NAMED_FIELD_TRUNCATED_VALUE_HEAD
                        // TODO: Add this to spec.
                        : TRUNCATED_VALUE_UNSPECIFIED;
                headers.addLabelValue(HEADER_KEY_TRUNCATED, value);
            }
            rid = writeResponse(w, timestamp, HTTP_RESPONSE_MIMETYPE, baseid, curi, headers);
        }

        headers = new ANVLRecord(1);
        headers.addLabelValue(HEADER_KEY_CONCURRENT_TO, '<' + rid.toString() + '>');

        if (((Boolean) getUncheckedAttribute(curi, ATTR_WRITE_REQUESTS))) {
            writeRequest(w, timestamp, HTTP_REQUEST_MIMETYPE, baseid, curi, headers);
        }
        if (((Boolean) getUncheckedAttribute(curi, ATTR_WRITE_METADATA))) {
            writeMetadata(w, timestamp, baseid, curi, headers,
                    ((Boolean) getUncheckedAttribute(curi, ATTR_WRITE_METADATA_OUTLINKS)));
        }
    }

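    /**
     * Write the FTP control conversation as a metadata record.
     *
     * @return the record id of the written record.
     */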
    protected URI writeFtpControlConversation(WARCWriter w, String timestamp, URI baseid, CrawlURI curi,
            ANVLRecord headers, String controlConversation) throws IOException {
        final URI uid = qualifyRecordID(baseid, TYPE, METADATA);
        byte[] b = controlConversation.getBytes("UTF-8");
        w.writeMetadataRecord(curi.toString(), timestamp, FTP_CONTROL_CONVERSATION_MIMETYPE, uid, headers,
                new ByteArrayInputStream(b), b.length);
        return uid;
    }

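    /**
     * Write a 'request' record from the recorded output of the fetch.
     *
     * @return the record id of the written record.
     */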
    protected URI writeRequest(final WARCWriter w, final String timestamp, final String mimetype, final URI baseid,
            final CrawlURI curi, final ANVLRecord namedFields) throws IOException {
        final URI uid = qualifyRecordID(baseid, TYPE, REQUEST);
        ReplayInputStream ris = curi.getHttpRecorder().getRecordedOutput().getReplayInputStream();
        try {
            w.writeRequestRecord(curi.toString(), timestamp, mimetype, uid, namedFields, ris, curi.getHttpRecorder()
                    .getRecordedOutput().getSize());
        } finally {
            if (ris != null) {
                ris.close();
            }
        }
        return uid;
    }

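    /**
     * Write a 'response' record from the recorded input of the fetch.
     *
     * @return the record id (the base record id) of the written record.
     */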
    protected URI writeResponse(final WARCWriter w, final String timestamp, final String mimetype, final URI baseid,
            final CrawlURI curi, final ANVLRecord namedFields) throws IOException {
        ReplayInputStream ris = curi.getHttpRecorder().getRecordedInput().getReplayInputStream();
        try {
            w.writeResponseRecord(curi.toString(), timestamp, mimetype, baseid, namedFields, ris, curi
                    .getHttpRecorder().getRecordedInput().getSize());
        } finally {
            if (ris != null) {
                ris.close();
            }
        }
        return baseid;
    }

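    /**
     * Write a 'resource' record from the recorded input of the fetch.
     *
     * @return the record id (the base record id) of the written record.
     */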
    protected URI writeResource(final WARCWriter w, final String timestamp, final String mimetype, final URI baseid,
            final CrawlURI curi, final ANVLRecord namedFields) throws IOException {
        ReplayInputStream ris = curi.getHttpRecorder().getRecordedInput().getReplayInputStream();
        try {
            w.writeResourceRecord(curi.toString(), timestamp, mimetype, baseid, namedFields, ris, curi
                    .getHttpRecorder().getRecordedInput().getSize());
        } finally {
            if (ris != null) {
                ris.close();
            }
        }
        return baseid;
    }

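    /**
     * Write a 'revisit' record for a fetch whose content digest matched a previous fetch, truncating the payload
     * to the start of the content (i.e. the headers) when a content-begin offset is available.
     *
     * @return the record id (the base record id) of the written record.
     */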
    protected URI writeRevisitDigest(final WARCWriter w, final String timestamp, final String mimetype,
            final URI baseid, final CrawlURI curi, final ANVLRecord namedFields) throws IOException {
        namedFields.addLabelValue(HEADER_KEY_PROFILE, PROFILE_REVISIT_IDENTICAL_DIGEST);
        namedFields.addLabelValue(HEADER_KEY_TRUNCATED, NAMED_FIELD_TRUNCATED_VALUE_LENGTH);

        ReplayInputStream ris = null;
        long revisedLength = 0;

        // A null mimetype implies no payload.
        if (mimetype != null) {
            ris = curi.getHttpRecorder().getRecordedInput().getReplayInputStream();
            revisedLength = curi.getHttpRecorder().getRecordedInput().getContentBegin();
            revisedLength = revisedLength > 0 ? revisedLength : curi.getHttpRecorder().getRecordedInput().getSize();
        }

        try {
            w.writeRevisitRecord(curi.toString(), timestamp, mimetype, baseid, namedFields, ris, revisedLength);
        } finally {
            if (ris != null) {
                ris.close();
            }
        }
        curi.addAnnotation("warcRevisit:digest");
        return baseid;
    }

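    /**
     * Write a zero-length 'revisit' record for a "304 Not Modified" response, preserving the ETag and
     * Last-Modified headers when present.
     *
     * @return the record id (the base record id) of the written record.
     */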
    protected URI writeRevisitNotModified(final WARCWriter w, final String timestamp, final URI baseid,
            final CrawlURI curi, final ANVLRecord namedFields) throws IOException {
        namedFields.addLabelValue(HEADER_KEY_PROFILE, PROFILE_REVISIT_NOT_MODIFIED);
        // Save just enough context to understand the basis of the not-modified response.
        if (curi.containsKey(A_HTTP_TRANSACTION)) {
            HttpMethodBase method = (HttpMethodBase) curi.getObject(A_HTTP_TRANSACTION);
            saveHeader(A_ETAG_HEADER, method, namedFields, HEADER_KEY_ETAG);
            saveHeader(A_LAST_MODIFIED_HEADER, method, namedFields, HEADER_KEY_LAST_MODIFIED);
        }
        // Truncate to zero length (all necessary info is above).
        namedFields.addLabelValue(HEADER_KEY_TRUNCATED, NAMED_FIELD_TRUNCATED_VALUE_LENGTH);
        ReplayInputStream ris = curi.getHttpRecorder().getRecordedInput().getReplayInputStream();
        try {
            w.writeRevisitRecord(curi.toString(), timestamp, null, baseid, namedFields, ris, 0);
        } finally {
            if (ris != null) {
                ris.close();
            }
        }
        curi.addAnnotation("warcRevisit:notModified");
        return baseid;
    }

    /**
     * Save a header from the given HTTP operation into the provided headers under a new name.
     *
     * @param origName header name to get, if present.
     * @param method http operation containing headers.
     * @param headers ANVL record to save the header into.
     * @param newName label to save the header value under.
     */
    protected void saveHeader(String origName, HttpMethodBase method, ANVLRecord headers, String newName) {
        Header header = method.getResponseHeader(origName);
        if (header != null) {
            headers.addLabelValue(newName, header.getValue());
        }
    }

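    /**
     * Write a 'metadata' record with selected CrawlURI fields (seed status, via, hopsFromSeed, fetch duration, FTP
     * fetch status, and optionally outlinks) as an ANVL body.
     *
     * @return the record id of the written record.
     */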
    protected URI writeMetadata(final WARCWriter w, final String timestamp, final URI baseid, final CrawlURI curi,
            final ANVLRecord namedFields, final boolean writeMetadataOutlinks) throws IOException {
        final URI uid = qualifyRecordID(baseid, TYPE, METADATA);
        // Get some metadata from the curi.
        // TODO: Get all curi metadata.
        // TODO: Use other than ANVL (or rename ANVL as NameValue), or use
        // RFC822 (commons-httpclient?).
        ANVLRecord r = new ANVLRecord();
        if (curi.isSeed()) {
            r.addLabel("seed");
        } else {
            if (curi.forceFetch()) {
                r.addLabel("force-fetch");
            }
            r.addLabelValue("via", curi.flattenVia());
            r.addLabelValue("hopsFromSeed", curi.getPathFromSeed());
            if (curi.containsKey(A_SOURCE_TAG)) {
                r.addLabelValue("sourceTag", curi.getString(A_SOURCE_TAG));
            }
        }
        long duration = curi.getFetchDuration();
        if (duration > -1) {
            r.addLabelValue("fetchTimeMs", Long.toString(duration));
        }

        if (curi.containsKey(A_FTP_FETCH_STATUS)) {
            r.addLabelValue("ftpFetchStatus", curi.getString(A_FTP_FETCH_STATUS));
        }

        // Only add the outlinks if the parameter says so.
        if (writeMetadataOutlinks) {
            // Add outlinks though they are effectively useless without anchor text.
            Collection<Link> links = curi.getOutLinks();
            if (links != null && links.size() > 0) {
                for (Link link : links) {
                    r.addLabelValue("outlink", link.toString());
                }
            }
        }

        // TODO: Other curi fields to write to metadata.
        //
        // Credentials
        //
        // fetch-began-time: 1154569278774
        // fetch-completed-time: 1154569281816
        //
        // Annotations.

        byte[] b = r.getUTF8Bytes();
        w.writeMetadataRecord(curi.toString(), timestamp, ANVLRecord.MIMETYPE, uid, namedFields,
                new ByteArrayInputStream(b), b.length);
        return uid;
    }

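    /**
     * @return a fresh record id from the configured GeneratorFactory.
     * @throws IOException if the generated id is not a valid URI.
     */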
    protected URI getRecordID() throws IOException {
        URI result;
        try {
            result = GeneratorFactory.getFactory().getRecordID();
        } catch (URISyntaxException e) {
            throw new IOException(e.toString());
        }
        return result;
    }

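    /**
     * Qualify a base record id with a single key/value qualifier.
     *
     * @return the qualified record id.
     * @throws IOException if the qualified id is not a valid URI.
     */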
    protected URI qualifyRecordID(final URI base, final String key, final String value) throws IOException {
        URI result;
        Map<String, String> qualifiers = new HashMap<String, String>(1);
        qualifiers.put(key, value);
        try {
            result = GeneratorFactory.getFactory().qualifyRecordID(base, qualifiers);
        } catch (URISyntaxException e) {
            throw new IOException(e.toString());
        }
        return result;
    }

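    /** @return the classpath location of the stylesheet used when producing the first-record (warcinfo) body. */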
    @Override
    protected String getFirstrecordStylesheet() {
        return "/warcinfobody.xsl";
    }

    /**
     * Return relevant values as header-like fields (here ANVLRecord, but spec-defined "application/warc-fields" type
     * when written). Field names from DCMI Terms and the WARC/0.17 specification.
     *
     * @see org.archive.crawler.framework.WriterPoolProcessor#getFirstrecordBody(java.io.File)
     */
    @Override
    protected String getFirstrecordBody(File orderFile) {
        ANVLRecord record = new ANVLRecord(7);
        record.addLabelValue("software", "Heritrix/" + Heritrix.getVersion() + " http://crawler.archive.org");

        try {
            InetAddress host = InetAddress.getLocalHost();
            record.addLabelValue("ip", host.getHostAddress());
            record.addLabelValue("hostname", host.getCanonicalHostName());
        } catch (UnknownHostException e) {
            logger.log(Level.WARNING, "unable to obtain local crawl engine host", e);
        }

        // Conforms to ISO 28500:2009 as of May 2009,
        // as described at http://bibnum.bnf.fr/WARC/
        // (latest draft as of November 2008).
        record.addLabelValue("format", "WARC File Format 1.0");
        record.addLabelValue("conformsTo", "http://bibnum.bnf.fr/WARC/WARC_ISO_28500_version1_latestdraft.pdf");

        // Get other values from order.xml.
        try {
            Document doc = XmlUtils.getDocument(orderFile);
            addIfNotBlank(record, "operator", XmlUtils.xpathOrNull(doc, "//meta/operator"));
            addIfNotBlank(record, "publisher", XmlUtils.xpathOrNull(doc, "//meta/organization"));
            addIfNotBlank(record, "audience", XmlUtils.xpathOrNull(doc, "//meta/audience"));
            addIfNotBlank(record, "isPartOf", XmlUtils.xpathOrNull(doc, "//meta/name"));

            // disabling "created" field per HER-1634
            // though it's theoretically useful as a means of distinguishing
            // one crawl from another, the current usage/specification is too
            // vague... in particular a 'created' field in the 'warcinfo' is
            // reasonable to interpret as applying to the WARC-unit, rather
            // than the crawl-job-unit, so we remove it and see if anyone
            // complains or makes a case for restoring it in a less-ambiguous
            // manner
            // String rawDate = XmlUtils.xpathOrNull(doc,"//meta/date");
            // if(StringUtils.isNotBlank(rawDate)) {
            // Date date;
            // try {
            // date = ArchiveUtils.parse14DigitDate(rawDate);
            // addIfNotBlank(record,"created",ArchiveUtils.getLog14Date(date));
            // } catch (ParseException e) {
            // logger.log(Level.WARNING,"obtaining warc created date",e);
            // }
            // }

            addIfNotBlank(record, "description", XmlUtils.xpathOrNull(doc, "//meta/description"));
            addIfNotBlank(record, "robots",
                    XmlUtils.xpathOrNull(doc, "//newObject[@name='robots-honoring-policy']/string[@name='type']"));
            addIfNotBlank(record, "http-header-user-agent",
                    XmlUtils.xpathOrNull(doc, "//map[@name='http-headers']/string[@name='user-agent']"));
            addIfNotBlank(record, "http-header-from",
                    XmlUtils.xpathOrNull(doc, "//map[@name='http-headers']/string[@name='from']"));
            if (metadataMap == null) {
                // Read the metadata-items map from the order.xml into metadataMap,
                // keyed by the name attribute of each <string> child element.
                XPathFactory factory = XPathFactory.newInstance();
                XPath xpath = factory.newXPath();
                XPathExpression expr = xpath.compile(H1HeritrixTemplate.METADATA_ITEMS_XPATH);
                Node node = (Node) expr.evaluate(doc, XPathConstants.NODE);
                if (node != null) {
                    NodeList nodeList = node.getChildNodes();
                    if (nodeList != null) {
                        metadataMap = new HashMap<String, String>();
                        for (int i = 0; i < nodeList.getLength(); ++i) {
                            node = nodeList.item(i);
                            if (node.getNodeType() == Node.ELEMENT_NODE && "string".equals(node.getNodeName())) {
                                Node attribute = node.getAttributes().getNamedItem("name");
                                if (attribute != null && attribute.getNodeType() == Node.ATTRIBUTE_NODE) {
                                    String key = attribute.getNodeValue();
                                    if (key != null && key.length() > 0) {
                                        metadataMap.put(key, node.getTextContent());
                                    }
                                }
                            }
                        }
                    }
                }
            }
        } catch (IOException e) {
            logger.log(Level.WARNING, "Error obtaining warcinfo", e);
        } catch (XPathExpressionException e) {
            logger.log(Level.WARNING, "Error obtaining metadata items", e);
        }

        // Add fields from harvestInfo.xml version 0.4:
        /*
         * <harvestInfo>
         *   <version>0.4</version>
         *   <jobId>1</jobId>
         *   <priority>HIGHPRIORITY</priority>
         *   <harvestNum>0</harvestNum>
         *   <origHarvestDefinitionID>1</origHarvestDefinitionID>
         *   <maxBytesPerDomain>500000000</maxBytesPerDomain>
         *   <maxObjectsPerDomain>2000</maxObjectsPerDomain>
         *   <orderXMLName>default_orderxml</orderXMLName>
         *   <origHarvestDefinitionName>netarkivet</origHarvestDefinitionName>
         *   <scheduleName>Once_a_week</scheduleName>
         *   <harvestFilenamePrefix>1-1</harvestFilenamePrefix>
         *   <jobSubmitDate>Some date</jobSubmitDate>
         *   <performer>undefined</performer>
         * </harvestInfo>
         */
        String netarchiveSuiteComment = "#added by NetarchiveSuite "
                + dk.netarkivet.common.Constants.getVersionString();
        ANVLRecord recordNAS = new ANVLRecord(7);

        if (metadataMap != null) {
            // Add the data from the metadataMap to the warcinfo record.
            recordNAS.addLabelValue(HARVESTINFO_VERSION, metadataMap.get(HARVESTINFO_VERSION));
            recordNAS.addLabelValue(HARVESTINFO_JOBID, metadataMap.get(HARVESTINFO_JOBID));
            recordNAS.addLabelValue(HARVESTINFO_CHANNEL, metadataMap.get(HARVESTINFO_CHANNEL));
            recordNAS.addLabelValue(HARVESTINFO_HARVESTNUM, metadataMap.get(HARVESTINFO_HARVESTNUM));
            recordNAS.addLabelValue(HARVESTINFO_ORIGHARVESTDEFINITIONID,
                    metadataMap.get(HARVESTINFO_ORIGHARVESTDEFINITIONID));
            recordNAS.addLabelValue(HARVESTINFO_MAXBYTESPERDOMAIN, metadataMap.get(HARVESTINFO_MAXBYTESPERDOMAIN));
            recordNAS.addLabelValue(HARVESTINFO_MAXOBJECTSPERDOMAIN, metadataMap.get(HARVESTINFO_MAXOBJECTSPERDOMAIN));
            recordNAS.addLabelValue(HARVESTINFO_ORDERXMLNAME, metadataMap.get(HARVESTINFO_ORDERXMLNAME));
            recordNAS.addLabelValue(HARVESTINFO_ORIGHARVESTDEFINITIONNAME,
                    metadataMap.get(HARVESTINFO_ORIGHARVESTDEFINITIONNAME));
            if (metadataMap.containsKey(HARVESTINFO_SCHEDULENAME)) {
                recordNAS.addLabelValue(HARVESTINFO_SCHEDULENAME, metadataMap.get(HARVESTINFO_SCHEDULENAME));
            }
            recordNAS.addLabelValue(HARVESTINFO_HARVESTFILENAMEPREFIX,
                    metadataMap.get(HARVESTINFO_HARVESTFILENAMEPREFIX));
            recordNAS.addLabelValue(HARVESTINFO_JOBSUBMITDATE, metadataMap.get(HARVESTINFO_JOBSUBMITDATE));
            if (metadataMap.containsKey(HARVESTINFO_PERFORMER)) {
                recordNAS.addLabelValue(HARVESTINFO_PERFORMER, metadataMap.get(HARVESTINFO_PERFORMER));
            }
            if (metadataMap.containsKey(HARVESTINFO_AUDIENCE)) {
                recordNAS.addLabelValue(HARVESTINFO_AUDIENCE, metadataMap.get(HARVESTINFO_AUDIENCE));
            }
        } else {
            logger.log(Level.SEVERE, "Missing metadata items; harvestInfo fields will be absent from the warcinfo");
        }

        // It is really ugly to return this as a string, when it may just be merged
        // with a couple of other fields at write time, but changing that would
        // require larger refactoring.
        return record.toString() + netarchiveSuiteComment + "\n" + recordNAS.toString();
    }

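    /**
     * Add a label/value pair to the record, but only if the value is non-blank.
     */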
    protected void addIfNotBlank(ANVLRecord record, String label, String value) {
        if (StringUtils.isNotBlank(value)) {
            record.addLabelValue(label, value);
        }
    }

}