001package dk.netarkivet.harvester.harvesting;
002
003import static org.archive.format.warc.WARCConstants.TYPE;
004import static org.archive.modules.CoreAttributeConstants.A_FTP_FETCH_STATUS;
005import static org.archive.modules.CoreAttributeConstants.A_SOURCE_TAG;
006
007import java.io.ByteArrayInputStream;
008import java.io.IOException;
009import java.net.InetAddress;
010import java.net.URI;
011import java.net.UnknownHostException;
012import java.util.Collection;
013import java.util.Collections;
014import java.util.HashMap;
015import java.util.List;
016import java.util.Map;
017
018import org.apache.commons.lang.StringUtils;
019import org.archive.format.warc.WARCConstants.WARCRecordType;
020import org.archive.io.warc.WARCRecordInfo;
021import org.archive.io.warc.WARCWriter;
022import org.archive.modules.CrawlMetadata;
023import org.archive.modules.CrawlURI;
024import org.archive.modules.writer.WARCWriterProcessor;
025import org.archive.util.ArchiveUtils;
026import org.archive.util.anvl.ANVLRecord;
027import org.slf4j.Logger;
028import org.slf4j.LoggerFactory;
029
/**
 * Custom NAS WARCWriterProcessor adding NetarchiveSuite metadata to the warcinfo records written
 * by Heritrix, by simply extending org.archive.modules.writer.WARCWriterProcessor.
 * This was not possible in H1.
 * @author svc
 */
037public class NasWARCProcessor extends WARCWriterProcessor {
038
039    /** Logger instance. */
040    private static final Logger logger = LoggerFactory.getLogger(NasWARCProcessor.class);
041
042
043    // Constants for the contents of the WarcInfo record
044        private static final String HARVESTINFO_VERSION = "harvestInfo.version";
045        private static final String HARVESTINFO_JOBID = "harvestInfo.jobId";
046        private static final String HARVESTINFO_CHANNEL = "harvestInfo.channel";
047        private static final String HARVESTINFO_HARVESTNUM = "harvestInfo.harvestNum";
048        private static final String HARVESTINFO_ORIGHARVESTDEFINITIONID = "harvestInfo.origHarvestDefinitionID";
049        private static final String HARVESTINFO_MAXBYTESPERDOMAIN = "harvestInfo.maxBytesPerDomain";
050        private static final String HARVESTINFO_MAXOBJECTSPERDOMAIN = "harvestInfo.maxObjectsPerDomain";
051        private static final String HARVESTINFO_ORDERXMLNAME = "harvestInfo.orderXMLName";
052        private static final String HARVESTINFO_ORIGHARVESTDEFINITIONNAME = "harvestInfo.origHarvestDefinitionName";
053        private static final String HARVESTINFO_SCHEDULENAME = "harvestInfo.scheduleName";
054        private static final String HARVESTINFO_HARVESTFILENAMEPREFIX = "harvestInfo.harvestFilenamePrefix";
055        private static final String HARVESTINFO_JOBSUBMITDATE = "harvestInfo.jobSubmitDate";
056        private static final String HARVESTINFO_PERFORMER = "harvestInfo.performer";
057        private static final String HARVESTINFO_AUDIENCE = "harvestInfo.audience";
058
059        public boolean getWriteMetadataOutlinks() {
060        return (Boolean) kp.get("writeMetadataOutlinks");
061    }
062    public void setWriteMetadataOutlinks(boolean writeMetadataOutlinks) {
063        kp.put("writeMetadataOutlinks",writeMetadataOutlinks);
064    }
065
066        public NasWARCProcessor() {
067                super();
068        }
069        
070        List<String> cachedMetadata;
071        
072         /**
073     * metadata items.
074     * Add to bean WARCProcessor bean as as
075     * <property name="metadataItems"> 
076     * <map>
077     *  <entry key="harvestInfo.version" value="0.5"/>
078         *      <entry key="harvestInfo.jobId" value="23"/>
079         *  <entry key="harvestInfo.channel" value="FOCUSED"/>
080         * ...  
081     * </map>
082
083     */
084    protected Map<String,String> metadataMap = new HashMap<String,String>();
085
086    public Map<String,String> getFormItems() {
087        return this.metadataMap;
088    }
089    public void setMetadataItems(Map<String,String> metadataItems) {
090        this.metadataMap = metadataItems;
091    }
092
093        
094        @Override
095        public List<String> getMetadata() {
096        if (cachedMetadata != null) {
097            return cachedMetadata;
098        }
099        ANVLRecord record = new ANVLRecord();
100        record.addLabelValue("software", "Heritrix/" +
101                ArchiveUtils.VERSION + " http://crawler.archive.org");
102        try {
103            InetAddress host = InetAddress.getLocalHost();
104            record.addLabelValue("ip", host.getHostAddress());
105            record.addLabelValue("hostname", host.getCanonicalHostName());
106        } catch (UnknownHostException e) {
107            //logger.log(Level.WARNING,"unable top obtain local crawl engine host",e);
108        }
109        
110        // conforms to ISO 28500:2009 as of May 2009
111        // as described at http://bibnum.bnf.fr/WARC/ 
112        // latest draft as of November 2008
113        record.addLabelValue("format","WARC File Format 1.0"); 
114        record.addLabelValue("conformsTo","http://bibnum.bnf.fr/WARC/WARC_ISO_28500_version1_latestdraft.pdf");
115        
116        // Get other values from metadata provider
117
118        CrawlMetadata provider = getMetadataProvider();
119
120        addIfNotBlank(record,"operator", provider.getOperator());
121        addIfNotBlank(record,"publisher", provider.getOrganization());
122        addIfNotBlank(record,"audience", provider.getAudience());
123        addIfNotBlank(record,"isPartOf", provider.getJobName());
124        // TODO: make date match 'job creation date' as in Heritrix 1.x
125        // until then, leave out (plenty of dates already in WARC 
126        // records
127//            String rawDate = provider.getBeginDate();
128//            if(StringUtils.isNotBlank(rawDate)) {
129//                Date date;
130//                try {
131//                    date = ArchiveUtils.parse14DigitDate(rawDate);
132//                    addIfNotBlank(record,"created",ArchiveUtils.getLog14Date(date));
133//                } catch (ParseException e) {
134//                    logger.log(Level.WARNING,"obtaining warc created date",e);
135//                }
136//            }
137        addIfNotBlank(record,"description", provider.getDescription());
138        addIfNotBlank(record,"robots", provider.getRobotsPolicyName().toLowerCase());
139
140        addIfNotBlank(record,"http-header-user-agent",
141                provider.getUserAgent());
142        addIfNotBlank(record,"http-header-from",
143                provider.getOperatorFrom());
144        
145        
146        String netarchiveSuiteComment = "#added by NetarchiveSuite "
147                + dk.netarkivet.common.Constants.getVersionString();
148        ANVLRecord recordNAS = new ANVLRecord(); // Previously new ANVLRecord(7); 
149
150        try {
151            // Add the data from the metadataMap to the WarcInfoRecord.
152            recordNAS.addLabelValue(HARVESTINFO_VERSION, (String) metadataMap.get(HARVESTINFO_VERSION));
153            recordNAS.addLabelValue(HARVESTINFO_JOBID, (String) metadataMap.get(HARVESTINFO_JOBID));
154            recordNAS.addLabelValue(HARVESTINFO_CHANNEL, (String) metadataMap.get(HARVESTINFO_CHANNEL));
155            recordNAS.addLabelValue(HARVESTINFO_HARVESTNUM, (String) metadataMap.get(HARVESTINFO_HARVESTNUM));
156            recordNAS.addLabelValue(HARVESTINFO_ORIGHARVESTDEFINITIONID,
157            (String) metadataMap.get(HARVESTINFO_ORIGHARVESTDEFINITIONID));
158            recordNAS.addLabelValue(HARVESTINFO_MAXBYTESPERDOMAIN,
159            (String) metadataMap.get(HARVESTINFO_MAXBYTESPERDOMAIN));
160
161            recordNAS.addLabelValue(HARVESTINFO_MAXOBJECTSPERDOMAIN,
162            (String) metadataMap.get(HARVESTINFO_MAXOBJECTSPERDOMAIN));
163            recordNAS.addLabelValue(HARVESTINFO_ORDERXMLNAME,
164            (String) metadataMap.get(HARVESTINFO_ORDERXMLNAME));
165            recordNAS.addLabelValue(HARVESTINFO_ORIGHARVESTDEFINITIONNAME,
166            (String) metadataMap.get(HARVESTINFO_ORIGHARVESTDEFINITIONNAME));
167
168            if (metadataMap.containsKey(HARVESTINFO_SCHEDULENAME)) {
169                recordNAS.addLabelValue(HARVESTINFO_SCHEDULENAME,
170            (String) metadataMap.get(HARVESTINFO_SCHEDULENAME));
171            }
172            recordNAS.addLabelValue(HARVESTINFO_HARVESTFILENAMEPREFIX,
173            (String) metadataMap.get(HARVESTINFO_HARVESTFILENAMEPREFIX));
174
175            recordNAS.addLabelValue(HARVESTINFO_JOBSUBMITDATE,
176            (String) metadataMap.get(HARVESTINFO_JOBSUBMITDATE));
177
178            if (metadataMap.containsKey(HARVESTINFO_PERFORMER)) {
179            recordNAS.addLabelValue(HARVESTINFO_PERFORMER,
180            (String) metadataMap.get(HARVESTINFO_PERFORMER));
181            }
182
183            if (metadataMap.containsKey(HARVESTINFO_AUDIENCE)) {
184                recordNAS.addLabelValue(HARVESTINFO_AUDIENCE,
185            (String) metadataMap.get(HARVESTINFO_AUDIENCE));
186            }
187        } catch (Exception e) {
188                logger.warn("Error processing harvest info" , e);
189        }
190
191        // really ugly to return as List<String>, but changing would require 
192        // larger refactoring
193        cachedMetadata = Collections.singletonList(record.toString() 
194                        + netarchiveSuiteComment + "\n" + recordNAS.toString());
195        return cachedMetadata;
196    }
197        
198        /**
199         * modify default writeMetadata method to handle the write of outlinks
200         * in metadata or not
201         */
202        @Override
203        protected URI writeMetadata(final WARCWriter w,
204            final String timestamp,
205            final URI baseid, final CrawlURI curi,
206            final ANVLRecord namedFields) 
207    throws IOException {
208            WARCRecordInfo recordInfo = new WARCRecordInfo();
209        recordInfo.setType(WARCRecordType.metadata);
210        recordInfo.setUrl(curi.toString());
211        recordInfo.setCreate14DigitDate(timestamp);
212        recordInfo.setMimetype(ANVLRecord.MIMETYPE);
213        recordInfo.setExtraHeaders(namedFields);
214        recordInfo.setEnforceLength(true);
215            
216        recordInfo.setRecordId(qualifyRecordID(baseid, TYPE, WARCRecordType.metadata.toString()));
217
218        // Get some metadata from the curi.
219        // TODO: Get all curi metadata.
220        // TODO: Use other than ANVL (or rename ANVL as NameValue or use
221        // RFC822 (commons-httpclient?).
222        ANVLRecord r = new ANVLRecord();
223        if (curi.isSeed()) {
224            r.addLabel("seed");
225        } else {
226                if (curi.forceFetch()) {
227                        r.addLabel("force-fetch");
228                }
229            if(StringUtils.isNotBlank(flattenVia(curi))) {
230                r.addLabelValue("via", flattenVia(curi));
231            }
232            if(StringUtils.isNotBlank(curi.getPathFromSeed())) {
233                r.addLabelValue("hopsFromSeed", curi.getPathFromSeed());
234            }
235            if (curi.containsDataKey(A_SOURCE_TAG)) {
236                r.addLabelValue("sourceTag", 
237                        (String)curi.getData().get(A_SOURCE_TAG));
238            }
239        }
240        long duration = curi.getFetchCompletedTime() - curi.getFetchBeginTime();
241        if (duration > -1) {
242            r.addLabelValue("fetchTimeMs", Long.toString(duration));
243        }
244        
245        if (curi.getData().containsKey(A_FTP_FETCH_STATUS)) {
246            r.addLabelValue("ftpFetchStatus", curi.getData().get(A_FTP_FETCH_STATUS).toString());
247        }
248
249        if (curi.getRecorder() != null && curi.getRecorder().getCharset() != null) {
250            r.addLabelValue("charsetForLinkExtraction", curi.getRecorder().getCharset().name());
251        }
252        
253        for (String annotation: curi.getAnnotations()) {
254            if (annotation.startsWith("usingCharsetIn") || annotation.startsWith("inconsistentCharsetIn")) {
255                String[] kv = annotation.split(":", 2);
256                r.addLabelValue(kv[0], kv[1]);
257            }
258        }
259
260        //only if parameter is true, add the outlinks
261        if (getWriteMetadataOutlinks() == true) {
262                // Add outlinks though they are effectively useless without anchor text.
263            Collection<CrawlURI> links = curi.getOutLinks();
264            if (links != null && links.size() > 0) {
265                for (CrawlURI link: links) {
266                    r.addLabelValue("outlink", link.getURI()+" "+link.getLastHop()+" "+link.getViaContext());
267                }
268            }
269        }
270
271        // TODO: Other curi fields to write to metadata.
272        // 
273        // Credentials
274        // 
275        // fetch-began-time: 1154569278774
276        // fetch-completed-time: 1154569281816
277        //
278        // Annotations.
279        
280        byte [] b = r.getUTF8Bytes();
281        recordInfo.setContentStream(new ByteArrayInputStream(b));
282        recordInfo.setContentLength((long) b.length);
283        
284        w.writeRecord(recordInfo);
285        
286        return recordInfo.getRecordId();
287    }
288        
289}