001/*
002 * #%L
003 * Netarchivesuite - harvester
004 * %%
005 * Copyright (C) 2005 - 2014 The Royal Danish Library, the Danish State and University Library,
006 *             the National Library of France and the Austrian National Library.
007 * %%
008 * This program is free software: you can redistribute it and/or modify
009 * it under the terms of the GNU Lesser General Public License as
010 * published by the Free Software Foundation, either version 2.1 of the
011 * License, or (at your option) any later version.
012 * 
013 * This program is distributed in the hope that it will be useful,
014 * but WITHOUT ANY WARRANTY; without even the implied warranty of
015 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
016 * GNU General Lesser Public License for more details.
017 * 
018 * You should have received a copy of the GNU General Lesser Public
019 * License along with this program.  If not, see
020 * <http://www.gnu.org/licenses/lgpl-2.1.html>.
021 * #L%
022 */
023package dk.netarkivet.harvester.harvesting;
024
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.IOException;
import java.net.InetAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.UnknownHostException;
import java.util.Collection;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;

import javax.management.AttributeNotFoundException;
import javax.management.MBeanException;
import javax.management.ReflectionException;
import javax.xml.xpath.XPath;
import javax.xml.xpath.XPathConstants;
import javax.xml.xpath.XPathExpression;
import javax.xml.xpath.XPathExpressionException;
import javax.xml.xpath.XPathFactory;

import org.apache.commons.httpclient.Header;
import org.apache.commons.httpclient.HttpMethodBase;
import org.apache.commons.httpclient.HttpStatus;
import org.apache.commons.lang.StringUtils;
import org.archive.crawler.Heritrix;
import org.archive.crawler.datamodel.CoreAttributeConstants;
import org.archive.crawler.datamodel.CrawlURI;
import org.archive.crawler.datamodel.FetchStatusCodes;
import org.archive.crawler.deciderules.recrawl.IdenticalDigestDecideRule;
import org.archive.crawler.event.CrawlStatusListener;
import org.archive.crawler.extractor.Link;
import org.archive.crawler.framework.WriterPoolProcessor;
import org.archive.crawler.settings.MapType;
import org.archive.crawler.settings.SimpleType;
import org.archive.crawler.settings.Type;
import org.archive.io.ReplayInputStream;
import org.archive.io.WriterPoolMember;
import org.archive.io.WriterPoolSettings;
import org.archive.io.warc.WARCConstants;
import org.archive.io.warc.WARCWriter;
import org.archive.io.warc.WARCWriterPool;
import org.archive.uid.GeneratorFactory;
import org.archive.util.ArchiveUtils;
import org.archive.util.XmlUtils;
import org.archive.util.anvl.ANVLRecord;
import org.w3c.dom.Document;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;

import dk.netarkivet.harvester.datamodel.H1HeritrixTemplate;
078
079/**
080 * WARCWriterProcessor. Goes against the 0.18 version of the WARC specification (which is functionally identical to 0.17
081 * except in the protocol identifier string). See http://archive-access.sourceforge.net/warc/
082 * <p>
 * Based on the WARCWriterProcessor in package org.archive.crawler.writer, with modifications to the WARC-info record.
084 *
085 * @author stack
086 * @author svc
087 * 
088 * // template for adding this metadata to a H1 template.
089/*
090       <map name="metadata-items">
091            <string name="harvestInfo.version">Vilhelm</string>
092            <string name="harvestInfo.jobId">Caroline</string>
093            <string name="harvestInfo.channel">Login</string>
094                        <string name="harvestInfo.harvestNum">ffff</string>                        
095                        <string name="harvestInfo.origHarvestDefinitionID">ffff</string>
096                        <string name="harvestInfo.maxBytesPerDomain">ffff</string>
097                        <string name="harvestInfo.maxObjectsPerDomain">ffff</string>
098                        
099                        <string name="harvestInfo.orderXMLName">Default Orderxml</string>
100                        <string name="harvestInfo.origHarvestDefinitionName">ddddd</string>
101                        <string name="harvestInfo.scheduleName">Every Hour</string>
102                        <string name="harvestInfo.harvestFilenamePrefix">1-1</string>
103                        <string name="harvestInfo.jobSubmitDate">NOW</string>
104                        <string name="harvestInfo.performer">performer</string>
105                        <string name="harvestInfo.audience">audience</string>
106      </map>
107*/
108public class WARCWriterProcessor extends WriterPoolProcessor implements CoreAttributeConstants, CrawlStatusListener,
109        WriterPoolSettings, FetchStatusCodes, WARCConstants {
110
   /** Logger for this class. */
   private static final Logger logger = Logger.getLogger(WARCWriterProcessor.class.getName());

   private static final long serialVersionUID = -2006725968882994351L;

     /**
      * @return default maximum size, in bytes, of a single WARC file before a new one is opened.
      */
     public long getDefaultMaxFileSize() {
        return 1000000000L; // 1 SI giga-byte (10^9 bytes), per WARC appendix A
    }
118
    /** Key for whether to write 'request' type records where possible. */
    public static final String ATTR_WRITE_REQUESTS = "write-requests";

    /** Key for whether to write 'metadata' type records where possible. */
    public static final String ATTR_WRITE_METADATA = "write-metadata";

    /**
     * Key for whether to write 'revisit' type records when consecutive fetches have an identical content digest.
     */
    public static final String ATTR_WRITE_REVISIT_FOR_IDENTICAL_DIGESTS = "write-revisit-for-identical-digests";

    /**
     * Key for whether to write 'revisit' type records for server "304 not modified" responses.
     */
    public static final String ATTR_WRITE_REVISIT_FOR_NOT_MODIFIED = "write-revisit-for-not-modified";
    /**
     * Key for metadata-items to include in the warcinfo.
     */
    public static final String ATTR_METADATA_ITEMS = "metadata-items";

    /** Default path list. */
    private static final String[] DEFAULT_PATH = {"warcs"};

    /** @return the default output subdirectory path used for WARC files. */
    protected String[] getDefaultPath() {
        return DEFAULT_PATH;
    }
    
    // Lazily populated from the order.xml "metadata-items" map; see getFirstrecordBody(). Raw Map (pre-generics style).
    private Map metadataMap;
147
    /*
     * Keys for the NAS harvestInfo metadata items supplied through the
     * "metadata-items" map of the H1 order template (see class javadoc for an
     * example). Their values end up in the warcinfo record.
     */
    private static final String HARVESTINFO_VERSION = "harvestInfo.version";
    private static final String HARVESTINFO_JOBID = "harvestInfo.jobId";
    private static final String HARVESTINFO_CHANNEL = "harvestInfo.channel";

    private static final String HARVESTINFO_HARVESTNUM = "harvestInfo.harvestNum";

    private static final String HARVESTINFO_ORIGHARVESTDEFINITIONID = "harvestInfo.origHarvestDefinitionID";

    private static final String HARVESTINFO_MAXBYTESPERDOMAIN = "harvestInfo.maxBytesPerDomain";

    private static final String HARVESTINFO_MAXOBJECTSPERDOMAIN = "harvestInfo.maxObjectsPerDomain";

    private static final String HARVESTINFO_ORDERXMLNAME = "harvestInfo.orderXMLName";

    private static final String HARVESTINFO_ORIGHARVESTDEFINITIONNAME = "harvestInfo.origHarvestDefinitionName";

    private static final String HARVESTINFO_SCHEDULENAME = "harvestInfo.scheduleName";

    private static final String HARVESTINFO_HARVESTFILENAMEPREFIX = "harvestInfo.harvestFilenamePrefix";
    private static final String HARVESTINFO_JOBSUBMITDATE = "harvestInfo.jobSubmitDate";

    private static final String HARVESTINFO_PERFORMER = "harvestInfo.performer";

    private static final String HARVESTINFO_AUDIENCE = "harvestInfo.audience";
172
173    /**
174     * @param name Name of this writer.
175     */
176    public WARCWriterProcessor(final String name) {
177        super(name, "Netarchivesuite WARCWriter processor (Version 1.0");
178        Type e = addElementToDefinition(new SimpleType(ATTR_WRITE_REQUESTS,
179                "Whether to write 'request' type records. Default is true.", new Boolean(true)));
180        e.setOverrideable(true);
181        e.setExpertSetting(true);
182        e = addElementToDefinition(new SimpleType(ATTR_WRITE_METADATA,
183                "Whether to write 'metadata' type records. Default is true.", new Boolean(true)));
184        e.setOverrideable(true);
185        e.setExpertSetting(true);
186        e = addElementToDefinition(new SimpleType(ATTR_WRITE_REVISIT_FOR_IDENTICAL_DIGESTS,
187                "Whether to write 'revisit' type records when a URI's "
188                        + "history indicates the previous fetch had an identical " + "content digest. "
189                        + "Default is true.", new Boolean(true)));
190        e.setOverrideable(true);
191        e.setExpertSetting(true);
192        e = addElementToDefinition(new SimpleType(ATTR_WRITE_REVISIT_FOR_NOT_MODIFIED,
193                "Whether to write 'revisit' type records when a "
194                        + "304-Not Modified response is received. Default is true.", new Boolean(true)));
195        e.setOverrideable(true);
196        e.setExpertSetting(true);
197
198        // Add map setting to add NAS metadata to WarcInfo records. 
199
200        e = addElementToDefinition(new MapType(ATTR_METADATA_ITEMS, "Metadata items.", String.class));
201        //e = addElementToDefinition(new StringList(ATTR_METADATA_ITEMS, "Metadata items."));
202        e.setOverrideable(true);
203        e.setExpertSetting(true);
204    }
205
206    protected void setupPool(final AtomicInteger serialNo) {
207        setPool(new WARCWriterPool(serialNo, this, getPoolMaximumActive(), getPoolMaximumWait()));
208    }
209
210    /**
211     * @return Metadata inputs as convenient map.  Returns null if no metadata items.
212     * @throws AttributeNotFoundException
213     * @throws ReflectionException 
214     * @throws MBeanException 
215     */
216    /*
217    public Map<String,Object> getMetadataItems() throws AttributeNotFoundException, MBeanException, ReflectionException {
218        Map<String,Object> result = null;
219        MapType items = (MapType)getAttribute(ATTR_METADATA_ITEMS);
220        if (items != null) {
221            for (Iterator i = items.iterator(null); i.hasNext();) {
222                Attribute a = (Attribute)i.next();
223                if (result == null) {
224                    result = new HashMap<String,Object>();
225                }
226                result.put(a.getName(), a.getValue());
227            }
228        }
229        return result;
230    }
231    */
232
233    @SuppressWarnings("unchecked")
234    /*
235    public List<String> getMetadataItems() {
236        ArrayList<String> results = new ArrayList<String>();
237        Object obj = getAttributeUnchecked(ATTR_METADATA_ITEMS);
238        if (obj != null) {
239            List list = (StringList)obj;
240            for (Iterator i = list.iterator(); i.hasNext();) {
241                String str = (String)i.next();
242                results.add(str);
243            }
244        }
245        return results;
246    }
247    */
248
249    /**
250     * Writes a CrawlURI and its associated data to store file.
251     * <p>
252     * Currently this method understands the following uri types: dns, http, and https.
253     *
254     * @param curi CrawlURI to process.
255     */
256    protected void innerProcess(CrawlURI curi) {
257        // If failure, or we haven't fetched the resource yet, return
258        if (curi.getFetchStatus() <= 0) {
259            return;
260        }
261
262        // If no recorded content at all, don't write record. Except FTP, which
263        // can have empty content, since the "headers" don't count as content.
264        String scheme = curi.getUURI().getScheme().toLowerCase();
265        long recordLength = curi.getContentSize();
266        if (recordLength <= 0 && !scheme.equals("ftp")) {
267            // getContentSize() should be > 0 if any material (even just
268            // HTTP headers with zero-length body) is available.
269            return;
270        }
271
272        try {
273            if (shouldWrite(curi)) {
274                write(scheme, curi);
275            } else {
276                logger.info("This writer does not write out scheme " + scheme + " content");
277            }
278        } catch (IOException e) {
279            curi.addLocalizedError(this.getName(), e, "WriteRecord: " + curi.toString());
280            logger.log(Level.SEVERE, "Failed write of Record: " + curi.toString(), e);
281        }
282    }
283
    /**
     * Borrows a WARC writer from the pool and writes all records for the given
     * URI (dispatched by scheme) in one 'transaction'. On an IOException the
     * current WARC file is invalidated (it gets a '.invalid' suffix) and the
     * exception is rethrown; otherwise the writer is returned to the pool and
     * byte accounting is updated.
     *
     * @param lowerCaseScheme the URI scheme, already lower-cased by innerProcess.
     * @param curi the CrawlURI whose records are to be written.
     * @throws IOException on failure writing to the WARC file.
     */
    protected void write(final String lowerCaseScheme, final CrawlURI curi) throws IOException {
        logger.info("writing warc record for " + curi);
        WriterPoolMember writer = getPool().borrowFile();
        long position = writer.getPosition();
        // See if we need to open a new file because we've exceeed maxBytes.
        // Call to checkFileSize will open new file if we're at maximum for
        // current file.
        writer.checkSize();
        if (writer.getPosition() != position) {
            // We just closed the file because it was larger than maxBytes.
            // Add to the totalBytesWritten the size of the first record
            // in the file, if any.
            setTotalBytesWritten(getTotalBytesWritten() + (writer.getPosition() - position));
            position = writer.getPosition();
        }

        WARCWriter w = (WARCWriter) writer;
        try {
            // Write a request, response, and metadata all in the one
            // 'transaction'.
            final URI baseid = getRecordID();
            final String timestamp = ArchiveUtils.getLog14Date(curi.getLong(A_FETCH_BEGAN_TIME));
            if (lowerCaseScheme.startsWith("http")) {
                writeHttpRecords(w, curi, baseid, timestamp);
            } else if (lowerCaseScheme.equals("dns")) {
                writeDnsRecords(w, curi, baseid, timestamp);
            } else if (lowerCaseScheme.equals("ftp")) {
                writeFtpRecords(w, curi, baseid, timestamp);
            } else {
                logger.warning("No handler for scheme " + lowerCaseScheme);
            }
        } catch (IOException e) {
            // Invalidate this file (It gets a '.invalid' suffix).
            getPool().invalidateFile(writer);
            // Set the writer to null otherwise the pool accounting
            // of how many active writers gets skewed if we subsequently
            // do a returnWriter call on this object in the finally block.
            writer = null;
            throw e;
        } finally {
            if (writer != null) {
                setTotalBytesWritten(getTotalBytesWritten() + (writer.getPosition() - position));
                getPool().returnFile(writer);
            }
        }
        checkBytesWritten();
    }
331
    /**
     * Writes WARC records for an FTP fetch: first the control conversation as a
     * 'metadata' record, then (when a recorder is present) either a 'revisit'
     * record for an identical digest or a 'resource' record for the payload,
     * and finally an optional 'metadata' record, each later record linked to
     * an earlier one via WARC-Concurrent-To.
     *
     * @param w the WARC writer to write to.
     * @param curi the fetched FTP URI.
     * @param baseid base record id for this URI's record group.
     * @param timestamp WARC timestamp string for the fetch.
     * @throws IOException on failure writing records.
     */
    private void writeFtpRecords(WARCWriter w, final CrawlURI curi, final URI baseid, final String timestamp)
            throws IOException {
        ANVLRecord headers = new ANVLRecord(3);
        headers.addLabelValue(HEADER_KEY_IP, getHostAddress(curi));
        String controlConversation = curi.getString(A_FTP_CONTROL_CONVERSATION);
        URI rid = writeFtpControlConversation(w, timestamp, baseid, curi, headers, controlConversation);

        if (curi.getContentDigest() != null) {
            headers.addLabelValue(HEADER_KEY_PAYLOAD_DIGEST, curi.getContentDigestSchemeString());
        }

        if (curi.getHttpRecorder() != null) {
            if (IdenticalDigestDecideRule.hasIdenticalDigest(curi)
                    && ((Boolean) getUncheckedAttribute(curi, ATTR_WRITE_REVISIT_FOR_IDENTICAL_DIGESTS))) {
                rid = writeRevisitDigest(w, timestamp, null, baseid, curi, headers);
            } else {
                // Start fresh headers for the resource record; the concurrent-to
                // link points back at the control-conversation record above.
                headers = new ANVLRecord(3);
                if (curi.isTruncatedFetch()) {
                    String value = curi.isTimeTruncatedFetch() ? NAMED_FIELD_TRUNCATED_VALUE_TIME : curi
                            .isLengthTruncatedFetch() ? NAMED_FIELD_TRUNCATED_VALUE_LENGTH : curi
                            .isHeaderTruncatedFetch() ? NAMED_FIELD_TRUNCATED_VALUE_HEAD :
                    // TODO: Add this to spec.
                            TRUNCATED_VALUE_UNSPECIFIED;
                    headers.addLabelValue(HEADER_KEY_TRUNCATED, value);
                }
                if (curi.getContentDigest() != null) {
                    headers.addLabelValue(HEADER_KEY_PAYLOAD_DIGEST, curi.getContentDigestSchemeString());
                }
                headers.addLabelValue(HEADER_KEY_CONCURRENT_TO, '<' + rid.toString() + '>');
                rid = writeResource(w, timestamp, curi.getContentType(), baseid, curi, headers);
            }
        }
        if (((Boolean) getUncheckedAttribute(curi, ATTR_WRITE_METADATA))) {
            headers = new ANVLRecord(1);
            headers.addLabelValue(HEADER_KEY_CONCURRENT_TO, '<' + rid.toString() + '>');
            writeMetadata(w, timestamp, baseid, curi, headers);
        }
    }
370
371    private void writeDnsRecords(WARCWriter w, final CrawlURI curi, final URI baseid, final String timestamp)
372            throws IOException {
373        ANVLRecord headers = null;
374        String ip = curi.getString(A_DNS_SERVER_IP_LABEL);
375        if (ip != null && ip.length() > 0) {
376            headers = new ANVLRecord(1);
377            headers.addLabelValue(HEADER_KEY_IP, ip);
378        }
379        writeResponse(w, timestamp, curi.getContentType(), baseid, curi, headers);
380    }
381
382    private void writeHttpRecords(WARCWriter w, final CrawlURI curi, final URI baseid, final String timestamp)
383            throws IOException {
384        // Add named fields for ip, checksum, and relate the metadata
385        // and request to the resource field.
386        // TODO: Use other than ANVL (or rename ANVL as NameValue or
387        // use RFC822 (commons-httpclient?).
388        ANVLRecord headers = new ANVLRecord(5);
389        if (curi.getContentDigest() != null) {
390            headers.addLabelValue(HEADER_KEY_PAYLOAD_DIGEST, curi.getContentDigestSchemeString());
391        }
392        headers.addLabelValue(HEADER_KEY_IP, getHostAddress(curi));
393        URI rid;
394
395        if (IdenticalDigestDecideRule.hasIdenticalDigest(curi)
396                && ((Boolean) getUncheckedAttribute(curi, ATTR_WRITE_REVISIT_FOR_IDENTICAL_DIGESTS))) {
397            rid = writeRevisitDigest(w, timestamp, HTTP_RESPONSE_MIMETYPE, baseid, curi, headers);
398        } else if (curi.getFetchStatus() == HttpStatus.SC_NOT_MODIFIED
399                && ((Boolean) getUncheckedAttribute(curi, ATTR_WRITE_REVISIT_FOR_NOT_MODIFIED))) {
400            rid = writeRevisitNotModified(w, timestamp, baseid, curi, headers);
401        } else {
402            if (curi.isTruncatedFetch()) {
403                String value = curi.isTimeTruncatedFetch() ? NAMED_FIELD_TRUNCATED_VALUE_TIME : curi
404                        .isLengthTruncatedFetch() ? NAMED_FIELD_TRUNCATED_VALUE_LENGTH
405                        : curi.isHeaderTruncatedFetch() ? NAMED_FIELD_TRUNCATED_VALUE_HEAD :
406                        // TODO: Add this to spec.
407                                TRUNCATED_VALUE_UNSPECIFIED;
408                headers.addLabelValue(HEADER_KEY_TRUNCATED, value);
409            }
410            rid = writeResponse(w, timestamp, HTTP_RESPONSE_MIMETYPE, baseid, curi, headers);
411        }
412
413        headers = new ANVLRecord(1);
414        headers.addLabelValue(HEADER_KEY_CONCURRENT_TO, '<' + rid.toString() + '>');
415
416        if (((Boolean) getUncheckedAttribute(curi, ATTR_WRITE_REQUESTS))) {
417            writeRequest(w, timestamp, HTTP_REQUEST_MIMETYPE, baseid, curi, headers);
418        }
419        if (((Boolean) getUncheckedAttribute(curi, ATTR_WRITE_METADATA))) {
420            writeMetadata(w, timestamp, baseid, curi, headers);
421        }
422    }
423
424    protected URI writeFtpControlConversation(WARCWriter w, String timestamp, URI baseid, CrawlURI curi,
425            ANVLRecord headers, String controlConversation) throws IOException {
426        final URI uid = qualifyRecordID(baseid, TYPE, METADATA);
427        byte[] b = controlConversation.getBytes("UTF-8");
428        w.writeMetadataRecord(curi.toString(), timestamp, FTP_CONTROL_CONVERSATION_MIMETYPE, uid, headers,
429                new ByteArrayInputStream(b), b.length);
430        return uid;
431    }
432
433    protected URI writeRequest(final WARCWriter w, final String timestamp, final String mimetype, final URI baseid,
434            final CrawlURI curi, final ANVLRecord namedFields) throws IOException {
435        final URI uid = qualifyRecordID(baseid, TYPE, REQUEST);
436        ReplayInputStream ris = curi.getHttpRecorder().getRecordedOutput().getReplayInputStream();
437        try {
438            w.writeRequestRecord(curi.toString(), timestamp, mimetype, uid, namedFields, ris, curi.getHttpRecorder()
439                    .getRecordedOutput().getSize());
440        } finally {
441            if (ris != null) {
442                ris.close();
443            }
444        }
445        return uid;
446    }
447
448    protected URI writeResponse(final WARCWriter w, final String timestamp, final String mimetype, final URI baseid,
449            final CrawlURI curi, final ANVLRecord namedFields) throws IOException {
450        ReplayInputStream ris = curi.getHttpRecorder().getRecordedInput().getReplayInputStream();
451        try {
452            w.writeResponseRecord(curi.toString(), timestamp, mimetype, baseid, namedFields, ris, curi
453                    .getHttpRecorder().getRecordedInput().getSize());
454        } finally {
455            if (ris != null) {
456                ris.close();
457            }
458        }
459        return baseid;
460    }
461
462    protected URI writeResource(final WARCWriter w, final String timestamp, final String mimetype, final URI baseid,
463            final CrawlURI curi, final ANVLRecord namedFields) throws IOException {
464        ReplayInputStream ris = curi.getHttpRecorder().getRecordedInput().getReplayInputStream();
465        try {
466            w.writeResourceRecord(curi.toString(), timestamp, mimetype, baseid, namedFields, ris, curi
467                    .getHttpRecorder().getRecordedInput().getSize());
468        } finally {
469            if (ris != null) {
470                ris.close();
471            }
472        }
473        return baseid;
474    }
475
476    protected URI writeRevisitDigest(final WARCWriter w, final String timestamp, final String mimetype,
477            final URI baseid, final CrawlURI curi, final ANVLRecord namedFields) throws IOException {
478        namedFields.addLabelValue(HEADER_KEY_PROFILE, PROFILE_REVISIT_IDENTICAL_DIGEST);
479        namedFields.addLabelValue(HEADER_KEY_TRUNCATED, NAMED_FIELD_TRUNCATED_VALUE_LENGTH);
480
481        ReplayInputStream ris = null;
482        long revisedLength = 0;
483
484        // null mimetype implies no payload
485        if (mimetype != null) {
486            ris = curi.getHttpRecorder().getRecordedInput().getReplayInputStream();
487            revisedLength = curi.getHttpRecorder().getRecordedInput().getContentBegin();
488            revisedLength = revisedLength > 0 ? revisedLength : curi.getHttpRecorder().getRecordedInput().getSize();
489        }
490
491        try {
492            w.writeRevisitRecord(curi.toString(), timestamp, mimetype, baseid, namedFields, ris, revisedLength);
493        } finally {
494            if (ris != null) {
495                ris.close();
496            }
497        }
498        curi.addAnnotation("warcRevisit:digest");
499        return baseid;
500    }
501
502    protected URI writeRevisitNotModified(final WARCWriter w, final String timestamp, final URI baseid,
503            final CrawlURI curi, final ANVLRecord namedFields) throws IOException {
504        namedFields.addLabelValue(HEADER_KEY_PROFILE, PROFILE_REVISIT_NOT_MODIFIED);
505        // save just enough context to understand basis of not-modified
506        if (curi.containsKey(A_HTTP_TRANSACTION)) {
507            HttpMethodBase method = (HttpMethodBase) curi.getObject(A_HTTP_TRANSACTION);
508            saveHeader(A_ETAG_HEADER, method, namedFields, HEADER_KEY_ETAG);
509            saveHeader(A_LAST_MODIFIED_HEADER, method, namedFields, HEADER_KEY_LAST_MODIFIED);
510        }
511        // truncate to zero-length (all necessary info is above)
512        namedFields.addLabelValue(HEADER_KEY_TRUNCATED, NAMED_FIELD_TRUNCATED_VALUE_LENGTH);
513        ReplayInputStream ris = curi.getHttpRecorder().getRecordedInput().getReplayInputStream();
514        try {
515            w.writeRevisitRecord(curi.toString(), timestamp, null, baseid, namedFields, ris, 0);
516        } finally {
517            if (ris != null) {
518                ris.close();
519            }
520        }
521        curi.addAnnotation("warcRevisit:notModified");
522        return baseid;
523    }
524
525    /**
526     * Save a header from the given HTTP operation into the provider headers under a new name
527     *
528     * @param origName header name to get if present
529     * @param method http operation containing headers
530     */
531    protected void saveHeader(String origName, HttpMethodBase method, ANVLRecord headers, String newName) {
532        Header header = method.getResponseHeader(origName);
533        if (header != null) {
534            headers.addLabelValue(newName, header.getValue());
535        }
536    }
537
    /**
     * Writes a 'metadata' record for the URI, carrying an ANVL body with
     * crawl-context labels: seed/force-fetch flags, via, hopsFromSeed,
     * sourceTag, fetch duration, FTP fetch status and outlinks.
     *
     * @param w the WARC writer to write to.
     * @param timestamp WARC timestamp string for the fetch.
     * @param baseid base record id to qualify for this metadata record.
     * @param curi the CrawlURI whose metadata is written.
     * @param namedFields extra named fields (e.g. WARC-Concurrent-To).
     * @return the qualified id of the metadata record written.
     * @throws IOException on failure writing the record.
     */
    protected URI writeMetadata(final WARCWriter w, final String timestamp, final URI baseid, final CrawlURI curi,
            final ANVLRecord namedFields) throws IOException {
        final URI uid = qualifyRecordID(baseid, TYPE, METADATA);
        // Get some metadata from the curi.
        // TODO: Get all curi metadata.
        // TODO: Use other than ANVL (or rename ANVL as NameValue or use
        // RFC822 (commons-httpclient?).
        ANVLRecord r = new ANVLRecord();
        if (curi.isSeed()) {
            r.addLabel("seed");
        } else {
            if (curi.forceFetch()) {
                r.addLabel("force-fetch");
            }
            r.addLabelValue("via", curi.flattenVia());
            r.addLabelValue("hopsFromSeed", curi.getPathFromSeed());
            if (curi.containsKey(A_SOURCE_TAG)) {
                r.addLabelValue("sourceTag", curi.getString(A_SOURCE_TAG));
            }
        }
        long duration = curi.getFetchDuration();
        if (duration > -1) {
            r.addLabelValue("fetchTimeMs", Long.toString(duration));
        }

        if (curi.containsKey(A_FTP_FETCH_STATUS)) {
            r.addLabelValue("ftpFetchStatus", curi.getString(A_FTP_FETCH_STATUS));
        }

        // Add outlinks though they are effectively useless without anchor text.
        Collection<Link> links = curi.getOutLinks();
        if (links != null && links.size() > 0) {
            for (Link link : links) {
                r.addLabelValue("outlink", link.toString());
            }
        }

        // TODO: Other curi fields to write to metadata.
        //
        // Credentials
        //
        // fetch-began-time: 1154569278774
        // fetch-completed-time: 1154569281816
        //
        // Annotations.

        byte[] b = r.getUTF8Bytes();
        w.writeMetadataRecord(curi.toString(), timestamp, ANVLRecord.MIMETYPE, uid, namedFields,
                new ByteArrayInputStream(b), b.length);
        return uid;
    }
589
590    protected URI getRecordID() throws IOException {
591        URI result;
592        try {
593            result = GeneratorFactory.getFactory().getRecordID();
594        } catch (URISyntaxException e) {
595            throw new IOException(e.toString());
596        }
597        return result;
598    }
599
600    protected URI qualifyRecordID(final URI base, final String key, final String value) throws IOException {
601        URI result;
602        Map<String, String> qualifiers = new HashMap<String, String>(1);
603        qualifiers.put(key, value);
604        try {
605            result = GeneratorFactory.getFactory().qualifyRecordID(base, qualifiers);
606        } catch (URISyntaxException e) {
607            throw new IOException(e.toString());
608        }
609        return result;
610    }
611
612    @Override
613    protected String getFirstrecordStylesheet() {
614        return "/warcinfobody.xsl";
615    }
616
617    /**
618     * Return relevant values as header-like fields (here ANVLRecord, but spec-defined "application/warc-fields" type
619     * when written). Field names from from DCMI Terms and the WARC/0.17 specification.
620     *
621     * @see org.archive.crawler.framework.WriterPoolProcessor#getFirstrecordBody(java.io.File)
622     */
623    @Override
624    protected String getFirstrecordBody(File orderFile) {
625        ANVLRecord record = new ANVLRecord(7);
626        record.addLabelValue("software", "Heritrix/" + Heritrix.getVersion() + " http://crawler.archive.org");
627
628        try {
629            InetAddress host = InetAddress.getLocalHost();
630            record.addLabelValue("ip", host.getHostAddress());
631            record.addLabelValue("hostname", host.getCanonicalHostName());
632        } catch (UnknownHostException e) {
633            logger.log(Level.WARNING, "unable top obtain local crawl engine host", e);
634        }
635
636        // conforms to ISO 28500:2009 as of May 2009
637        // as described at http://bibnum.bnf.fr/WARC/
638        // latest draft as of November 2008
639        record.addLabelValue("format", "WARC File Format 1.0");
640        record.addLabelValue("conformsTo", "http://bibnum.bnf.fr/WARC/WARC_ISO_28500_version1_latestdraft.pdf");
641
642        // Get other values from order.xml
643        try {
644            Document doc = XmlUtils.getDocument(orderFile);
645            addIfNotBlank(record, "operator", XmlUtils.xpathOrNull(doc, "//meta/operator"));
646            addIfNotBlank(record, "publisher", XmlUtils.xpathOrNull(doc, "//meta/organization"));
647            addIfNotBlank(record, "audience", XmlUtils.xpathOrNull(doc, "//meta/audience"));
648            addIfNotBlank(record, "isPartOf", XmlUtils.xpathOrNull(doc, "//meta/name"));
649
650            // disabling "created" field per HER-1634
651            // though it's theoretically useful as a means of distinguishing
652            // one crawl from another, the current usage/specification is too
653            // vague... in particular a 'created' field in the 'warcinfo' is
654            // reasonable to interpret as applying to the WARC-unit, rather
655            // than the crawl-job-unit so we remove it and see if anyone
656            // complains or makes a case for restoring it in a less-ambiguous
657            // manner
658            // String rawDate = XmlUtils.xpathOrNull(doc,"//meta/date");
659            // if(StringUtils.isNotBlank(rawDate)) {
660            // Date date;
661            // try {
662            // date = ArchiveUtils.parse14DigitDate(rawDate);
663            // addIfNotBlank(record,"created",ArchiveUtils.getLog14Date(date));
664            // } catch (ParseException e) {
665            // logger.log(Level.WARNING,"obtaining warc created date",e);
666            // }
667            // }
668
669            addIfNotBlank(record, "description", XmlUtils.xpathOrNull(doc, "//meta/description"));
670            addIfNotBlank(record, "robots",
671                    XmlUtils.xpathOrNull(doc, "//newObject[@name='robots-honoring-policy']/string[@name='type']"));
672            addIfNotBlank(record, "http-header-user-agent",
673                    XmlUtils.xpathOrNull(doc, "//map[@name='http-headers']/string[@name='user-agent']"));
674            addIfNotBlank(record, "http-header-from",
675                    XmlUtils.xpathOrNull(doc, "//map[@name='http-headers']/string[@name='from']"));
676            if (metadataMap == null) {
677                //metadataMap = getMetadataItems();
678                XPathFactory factory = XPathFactory.newInstance();
679                XPath xpath = factory.newXPath();
680                XPathExpression expr = xpath.compile(H1HeritrixTemplate.METADATA_ITEMS_XPATH);
681                Node node = (Node) expr.evaluate(doc, XPathConstants.NODE);
682                //NodeList nodeList = (NodeList) expr.evaluate(doc, XPathConstants.NODESET);
683                //Node node = nodeList.item(0);
684                if (node != null) {
685                    NodeList nodeList = node.getChildNodes();
686                    if (nodeList != null) {
687                        metadataMap = new HashMap();
688                        for (int i=0; i<nodeList.getLength(); ++i) {
689                                node = nodeList.item(i);
690                                if (node.getNodeType() == Node.ELEMENT_NODE) {
691                                        String typeName = node.getNodeName();
692                                        if ("string".equals(typeName)) {
693                                                Node attribute = node.getAttributes().getNamedItem("name");
694                                                if (attribute != null && attribute.getNodeType() == Node.ATTRIBUTE_NODE) {
695                                                        String key = attribute.getNodeValue();
696                                                        if (key != null && key.length() > 0) {
697                                                                String value = node.getTextContent();
698                                                                metadataMap.put(key, value);
699                                                                // debug
700                                                                //System.out.println(key + "=" + value);
701                                                        }
702                                                }
703                                        }
704                                }
705                        }
706                    }
707                }
708            }
709        } catch (IOException e) {
710            logger.log(Level.WARNING, "Error obtaining warcinfo", e);
711        } catch (XPathExpressionException e) {
712            logger.log(Level.WARNING, "Error obtaining metadata items", e);
713        }
714        /* catch (AttributeNotFoundException e) {
715                logger.log(Level.WARNING, "Error obtaining warcinfo", e);
716                } catch (MBeanException e) {
717                        logger.log(Level.WARNING, "Error obtaining warcinfo", e);
718                } catch (ReflectionException e) {
719                        logger.log(Level.WARNING, "Error obtaining warcinfo", e);
720                }
721                */
722
        // add fields from harvestInfo.xml version 0.4
724        /*
725         * <harvestInfo> <version>0.4</version> <jobId>1</jobId> <priority>HIGHPRIORITY</priority>
726         * <harvestNum>0</harvestNum> <origHarvestDefinitionID>1</origHarvestDefinitionID>
727         * <maxBytesPerDomain>500000000</maxBytesPerDomain> <maxObjectsPerDomain>2000</maxObjectsPerDomain>
728         * <orderXMLName>default_orderxml</orderXMLName>
729         * <origHarvestDefinitionName>netarkivet</origHarvestDefinitionName> <scheduleName>Once_a_week</scheduleName>
730         * <harvestFilenamePrefix>1-1</harvestFilenamePrefix> <jobSubmitDate>Some date</jobSubmitDate>
731         * <performer>undefined</performer> </harvestInfo>
732         */
733        String netarchiveSuiteComment = "#added by NetarchiveSuite "
734                + dk.netarkivet.common.Constants.getVersionString();
735        ANVLRecord recordNAS = new ANVLRecord(7);
736
737        if (metadataMap != null) {
738            // Add the data from the metadataMap to the WarcInfoRecord.
739            recordNAS.addLabelValue(HARVESTINFO_VERSION, (String) metadataMap.get(HARVESTINFO_VERSION));
740            recordNAS.addLabelValue(HARVESTINFO_JOBID, (String) metadataMap.get(HARVESTINFO_JOBID));
741            recordNAS.addLabelValue(HARVESTINFO_CHANNEL, (String) metadataMap.get(HARVESTINFO_CHANNEL));
742            recordNAS.addLabelValue(HARVESTINFO_HARVESTNUM, (String) metadataMap.get(HARVESTINFO_HARVESTNUM));
743            recordNAS.addLabelValue(HARVESTINFO_ORIGHARVESTDEFINITIONID,  (String) metadataMap.get(HARVESTINFO_ORIGHARVESTDEFINITIONID));
744            recordNAS.addLabelValue(HARVESTINFO_MAXBYTESPERDOMAIN, (String) metadataMap.get(HARVESTINFO_MAXBYTESPERDOMAIN));
745
746            recordNAS.addLabelValue(HARVESTINFO_MAXOBJECTSPERDOMAIN, (String) metadataMap.get(HARVESTINFO_MAXOBJECTSPERDOMAIN));
747            recordNAS.addLabelValue(HARVESTINFO_ORDERXMLNAME, (String) metadataMap.get(HARVESTINFO_ORDERXMLNAME));
748            recordNAS.addLabelValue(HARVESTINFO_ORIGHARVESTDEFINITIONNAME, (String) metadataMap.get(HARVESTINFO_ORIGHARVESTDEFINITIONNAME));
749
750            if (metadataMap.containsKey((HARVESTINFO_SCHEDULENAME))) {
751                recordNAS.addLabelValue(HARVESTINFO_SCHEDULENAME, (String) metadataMap.get(HARVESTINFO_SCHEDULENAME));
752            }
753            recordNAS.addLabelValue(HARVESTINFO_HARVESTFILENAMEPREFIX, (String) metadataMap.get(HARVESTINFO_HARVESTFILENAMEPREFIX));
754     
755            recordNAS.addLabelValue(HARVESTINFO_JOBSUBMITDATE, (String) metadataMap.get(HARVESTINFO_JOBSUBMITDATE));
756        
757            if (metadataMap.containsKey(HARVESTINFO_PERFORMER)) {
758                    recordNAS.addLabelValue(HARVESTINFO_PERFORMER, (String) metadataMap.get(HARVESTINFO_PERFORMER));
759            }
760
761            if (metadataMap.containsKey(HARVESTINFO_AUDIENCE)) { 
762                recordNAS.addLabelValue(HARVESTINFO_AUDIENCE, (String) metadataMap.get(HARVESTINFO_AUDIENCE));
763            }
764        } else {
765                        logger.log(Level.SEVERE, "Error missing metadata");
766        }
767
768        // really ugly to return as string, when it may just be merged with
769        // a couple other fields at write time, but changing would require
770        // larger refactoring
771        return record.toString() + netarchiveSuiteComment + "\n" + recordNAS.toString();
772    }
773
774    protected void addIfNotBlank(ANVLRecord record, String label, String value) {
775        if (StringUtils.isNotBlank(value)) {
776            record.addLabelValue(label, value);
777        }
778    }
779
780}