001/*
002 * #%L
003 * Netarchivesuite - harvester
004 * %%
005 * Copyright (C) 2005 - 2014 The Royal Danish Library, the Danish State and University Library,
006 *             the National Library of France and the Austrian National Library.
007 * %%
008 * This program is free software: you can redistribute it and/or modify
009 * it under the terms of the GNU Lesser General Public License as
010 * published by the Free Software Foundation, either version 2.1 of the
011 * License, or (at your option) any later version.
012 * 
013 * This program is distributed in the hope that it will be useful,
014 * but WITHOUT ANY WARRANTY; without even the implied warranty of
015 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
016 * GNU General Lesser Public License for more details.
017 * 
018 * You should have received a copy of the GNU General Lesser Public
019 * License along with this program.  If not, see
020 * <http://www.gnu.org/licenses/lgpl-2.1.html>.
021 * #L%
022 */
023package dk.netarkivet.harvester.harvesting.frontier;
024
025import java.io.BufferedReader;
026import java.io.BufferedWriter;
027import java.io.ByteArrayInputStream;
028import java.io.File;
029import java.io.FileNotFoundException;
030import java.io.FileReader;
031import java.io.FileWriter;
032import java.io.IOException;
033import java.util.Iterator;
034
035import javax.xml.parsers.DocumentBuilder;
036import javax.xml.parsers.DocumentBuilderFactory;
037
038import org.slf4j.Logger;
039import org.slf4j.LoggerFactory;
040import org.w3c.dom.Document;
041import org.w3c.dom.Element;
042import org.w3c.dom.Node;
043import org.w3c.dom.NodeList;
044
045import com.sleepycat.je.DatabaseException;
046import com.sleepycat.je.Environment;
047import com.sleepycat.je.EnvironmentConfig;
048import com.sleepycat.persist.EntityCursor;
049import com.sleepycat.persist.EntityStore;
050import com.sleepycat.persist.PrimaryIndex;
051import com.sleepycat.persist.SecondaryIndex;
052import com.sleepycat.persist.StoreConfig;
053import com.sleepycat.persist.model.Entity;
054import com.sleepycat.persist.model.KeyField;
055import com.sleepycat.persist.model.Persistent;
056import com.sleepycat.persist.model.PrimaryKey;
057import com.sleepycat.persist.model.Relationship;
058import com.sleepycat.persist.model.SecondaryKey;
059
060import dk.netarkivet.common.CommonSettings;
061import dk.netarkivet.common.exceptions.ArgumentNotValid;
062import dk.netarkivet.common.exceptions.IOFailure;
063import dk.netarkivet.common.utils.FileUtils;
064import dk.netarkivet.common.utils.Settings;
065
066/**
067 * Wraps an Heritrix 1 full frontier report. As these reports can be big in size, this implementation relies on Berkeley
068 * DB direct persistence layer to store the report lines, allowing to store the lines partially in memory, and on disk.
069 */
070@SuppressWarnings({"serial"})
071public class FullFrontierReport extends AbstractFrontierReport {
072
073    @Persistent
074    static class PersistentLineKey implements Comparable<PersistentLineKey>, FrontierReportLineOrderKey {
075
076        @KeyField(1)
077        long totalEnqueues;
078
079        @KeyField(2)
080        String domainName;
081
082        // Default empty constructor for BDB.
083        PersistentLineKey() {
084
085        }
086
087        public PersistentLineKey(FrontierReportLine l) {
088            this.domainName = l.getDomainName();
089            this.totalEnqueues = l.getTotalEnqueues();
090        }
091
092        public String getQueueId() {
093            return domainName;
094        }
095
096        public long getQueueSize() {
097            return totalEnqueues;
098        }
099
100        /**
101         * Compares first by decreasing queue size, then by domain name.
102         */
103        @Override
104        public int compareTo(PersistentLineKey k) {
105            return FrontierReportLineNaturalOrder.getInstance().compare(this, k);
106        }
107
108        @Override
109        public String toString() {
110            return totalEnqueues + " " + domainName;
111        }
112
113    }
114
115    @Entity
116    static class PersistentLine extends FrontierReportLine {
117
118        @PrimaryKey
119        private PersistentLineKey primaryKey;
120
121        @SecondaryKey(relate = Relationship.ONE_TO_ONE)
122        private String domainNameKey;
123
124        @SecondaryKey(relate = Relationship.MANY_TO_ONE)
125        private Long totalSpendKey;
126
127        @SecondaryKey(relate = Relationship.MANY_TO_ONE)
128        private Long currentSizeKey;
129
130        // Default empty constructor for BDB.
131        PersistentLine() {
132
133        }
134
135        PersistentLine(FrontierReportLine reportLine) {
136            super(reportLine);
137            this.primaryKey = new PersistentLineKey(reportLine);
138            this.domainNameKey = reportLine.getDomainName();
139            this.currentSizeKey = reportLine.getCurrentSize();
140            this.totalSpendKey = reportLine.getTotalSpend();
141        }
142
143    }
144
145    public class ReportIterator implements Iterator<FrontierReportLine> {
146
147        private final EntityCursor<PersistentLine> cursor;
148        private final Iterator<PersistentLine> iter;
149
150        /**
151         * Returns an iterator on the given sort key.
152         *
153         * @param cursor The cursor (sort key) to iterate on.
154         */
155        ReportIterator(EntityCursor<PersistentLine> cursor) {
156            this.cursor = cursor;
157            iter = cursor.iterator();
158        }
159
160        @Override
161        public boolean hasNext() {
162            return iter.hasNext();
163        }
164
165        @Override
166        public FrontierReportLine next() {
167            return iter.next();
168        }
169
170        @Override
171        public void remove() {
172            throw new ArgumentNotValid("Remove is not supported!");
173        }
174
175        /**
176         * Close method should be called explicitly to free underlying resources!
177         */
178        public void close() {
179            try {
180                cursor.close();
181            } catch (DatabaseException e) {
182                LOG.error("Error closing entity cursor:\n" + e.getLocalizedMessage());
183            }
184        }
185
186    }
187
188    private static final String WORKING_DIR = FullFrontierReport.class.getSimpleName();
189
190    /** The logger for this class. */
191    private static final Logger LOG = LoggerFactory.getLogger(FullFrontierReport.class);
192
193    /**
194     * The Berkeley DB JE environment.
195     */
196    private final Environment dbEnvironment;
197
198    /**
199     * The BDB entity store.
200     */
201    private final EntityStore store;
202
203    /**
204     * Primary index.
205     */
206    private final PrimaryIndex<PersistentLineKey, PersistentLine> linesIndex;
207
208    /**
209     * Secondary index, per domain name.
210     */
211    private final SecondaryIndex<String, PersistentLineKey, PersistentLine> linesByDomain;
212
213    /**
214     * Secondary index, per current size.
215     */
216    private final SecondaryIndex<Long, PersistentLineKey, PersistentLine> linesByCurrentSize;
217
218    /**
219     * Secondary index, per spent budget.
220     */
221    private final SecondaryIndex<Long, PersistentLineKey, PersistentLine> linesBySpentBudget;
222
223    /**
224     * The directory where the BDB is stored.
225     */
226    private final File storageDir;
227
228    /**
229     * Builds an empty frontier report wrapper.
230     *
231     * @param jobName the Heritrix job name
232     */
233    private FullFrontierReport(String jobName) {
234        super(jobName);
235
236        File workingDir = new File(Settings.getFile(CommonSettings.CACHE_DIR), WORKING_DIR);
237
238        this.storageDir = new File(workingDir, jobName);
239        if (!storageDir.mkdirs()) {
240            throw new IOFailure("Failed to create directory " + storageDir.getAbsolutePath());
241        }
242
243        try {
244            EnvironmentConfig envConfig = new EnvironmentConfig();
245            envConfig.setAllowCreate(true);
246            dbEnvironment = new Environment(storageDir, envConfig);
247
248            StoreConfig storeConfig = new StoreConfig();
249            storeConfig.setAllowCreate(true);
250
251            store = new EntityStore(dbEnvironment, FrontierReportLine.class.getSimpleName() + "-" + jobName,
252                    storeConfig);
253
254            linesIndex = store.getPrimaryIndex(PersistentLineKey.class, PersistentLine.class);
255
256            linesByDomain = store.getSecondaryIndex(linesIndex, String.class, "domainNameKey");
257
258            linesByCurrentSize = store.getSecondaryIndex(linesIndex, Long.class, "currentSizeKey");
259
260            linesBySpentBudget = store.getSecondaryIndex(linesIndex, Long.class, "totalSpendKey");
261
262        } catch (DatabaseException e) {
263            throw new IOFailure("Failed to init frontier BDB for job " + jobName, e);
264        }
265
266    }
267
268    /**
269     * Releases all resources once this report is to be discarded. NB this method MUST be explicitly called!
270     */
271    public void dispose() {
272
273        try {
274            store.close();
275            dbEnvironment.cleanLog();
276            dbEnvironment.close();
277        } catch (DatabaseException e) {
278            throw new IOFailure("Failed to close frontier BDB for job " + getJobName(), e);
279        }
280
281        FileUtils.removeRecursively(storageDir);
282    }
283
284    @Override
285    public void addLine(FrontierReportLine line) {
286        try {
287            linesIndex.put(new PersistentLine(line));
288        } catch (DatabaseException e) {
289            throw new IOFailure("Failed to store frontier report line for job " + getJobName(), e);
290        }
291    }
292
293    @Override
294    public FrontierReportLine getLineForDomain(String domainName) {
295        try {
296            return linesByDomain.get(domainName);
297        } catch (DatabaseException e) {
298            LOG.warn("Failed to get queue for domain " + domainName, e);
299            return null;
300        }
301    }
302
303    /**
304     * Returns an iterator where lines are ordered by primary key order: first by decreasing totalEnqueues, then by
305     * domain name natural order.
306     *
307     * @return an iterator on the report lines.
308     */
309    public ReportIterator iterateOnTotalEnqueues() {
310        try {
311            return new ReportIterator(linesIndex.entities());
312        } catch (DatabaseException e) {
313            throw new IOFailure("Failed to read frontier BDB for job " + getJobName(), e);
314        }
315    }
316
317    /**
318     * Returns an iterator where lines are ordered by domain name natural order.
319     *
320     * @return an iterator on the report lines.
321     */
322    public ReportIterator iterateOnDomainName() {
323        try {
324            return new ReportIterator(linesByDomain.entities());
325        } catch (DatabaseException e) {
326            throw new IOFailure("Failed to read frontier BDB for job " + getJobName(), e);
327        }
328    }
329
330    /**
331     * Returns an iterator where lines are ordered by increasing currentSize.
332     *
333     * @return an iterator on the report lines.
334     */
335    public ReportIterator iterateOnCurrentSize() {
336        try {
337            return new ReportIterator(linesByCurrentSize.entities());
338        } catch (DatabaseException e) {
339            throw new IOFailure("Failed to read frontier BDB for job " + getJobName(), e);
340        }
341    }
342
343    /**
344     * Returns an iterator on lines having a given currentSize.
345     *
346     * @param dupValue
347     * @return an iterator on the report lines.
348     */
349    public ReportIterator iterateOnDuplicateCurrentSize(long dupValue) {
350        try {
351            return new ReportIterator(linesByCurrentSize.subIndex(dupValue).entities());
352        } catch (DatabaseException e) {
353            throw new IOFailure("Failed to read frontier BDB for job " + getJobName(), e);
354        }
355    }
356
357    /**
358     * Returns an iterator where lines are ordered by increasing totalSpend.
359     *
360     * @return an iterator on the report lines.
361     */
362    public ReportIterator iterateOnSpentBudget() {
363        try {
364            return new ReportIterator(linesBySpentBudget.entities());
365        } catch (DatabaseException e) {
366            throw new IOFailure("Failed to read frontier BDB for job " + getJobName(), e);
367        }
368    }
369
370    /**
371     * Returns an iterator on lines having a given totalSpend.
372     *
373     * @param dupValue
374     * @return an iterator on the report lines.
375     */
376    public ReportIterator iterateOnDuplicateSpentBudget(long dupValue) {
377        try {
378            return new ReportIterator(linesBySpentBudget.subIndex(dupValue).entities());
379        } catch (DatabaseException e) {
380            throw new IOFailure("Failed to read frontier BDB for job " + getJobName(), e);
381        }
382    }
383    
384    /**
385     * Generates an Heritrix frontier report wrapper object by parsing the frontier report returned by the REST API
386     * controller as XML
387     *
388     * @param jobName the Heritrix job name
389     * @param contentsAsString the text returned by the http REST call
390     * @return the report wrapper object
391     */
392    public static FullFrontierReport parseContentsAsXML(String jobName, byte[] contentsAsXML, String tagName) {
393        //FIXME : instanciate an unique dBuilder
394        DocumentBuilderFactory dbFactory = DocumentBuilderFactory.newInstance();
395        DocumentBuilder dBuilder;
396                try {
397                        dBuilder = dbFactory.newDocumentBuilder();
398                        Document doc = dBuilder.parse((new ByteArrayInputStream(contentsAsXML)));
399                        
400                        Element e = doc.getDocumentElement();
401                NodeList nList = e.getElementsByTagName(tagName);
402                //get first (and normally unique) item
403                Node nNode = nList.item(0);
404                String contentAsString = nNode.getTextContent();
405                return FullFrontierReport.parseContentsAsString(jobName, contentAsString);
406                } catch (Exception e) {
407                        LOG.error("Failed to parse XML content", e);
408                        return new FullFrontierReport(jobName);
409                }
410    }
411
412    /**
413     * Generates an Heritrix frontier report wrapper object by parsing the frontier report returned by the JMX
414     * controller as a string.
415     *
416     * @param jobName the Heritrix job name
417     * @param contentsAsString the text returned by the JMX call
418     * @return the report wrapper object
419     */
420    public static FullFrontierReport parseContentsAsString(String jobName, String contentsAsString) {
421
422        FullFrontierReport report = new FullFrontierReport(jobName);
423
424        // First dump this possibly huge string to a file
425        File tmpDir = Settings.getFile(CommonSettings.CACHE_DIR);
426        File tmpFile = new File(tmpDir, jobName + "-" + System.currentTimeMillis() + ".txt");
427        try {
428            tmpFile.createNewFile();
429            BufferedWriter out = new BufferedWriter(new FileWriter(tmpFile));
430            out.write(contentsAsString);
431            out.close();
432        } catch (IOException e) {
433            LOG.error("Failed to create temporary file", e);
434            return report;
435        }
436
437        BufferedReader br;
438        try {
439            br = new BufferedReader(new FileReader(tmpFile));
440        } catch (FileNotFoundException e) {
441            LOG.error("Failed to read temporary file", e);
442            return report;
443        }
444
445        try {
446            String lineToken = br.readLine(); // Discard header line
447            while ((lineToken = br.readLine()) != null) {
448                report.addLine(new FrontierReportLine(lineToken));
449            }
450
451            br.close();
452        } catch (IOException e) {
453            LOG.warn("Failed to close reader", e);
454        } catch (Throwable t) {
455            LOG.error("",t);
456            t.printStackTrace(System.err);
457        } finally {
458            FileUtils.remove(tmpFile);
459        }
460
461        return report;
462    }
463
464    /**
465     * Return the directory where the BDB is stored.
466     *
467     * @return the storage directory.
468     */
469    File getStorageDir() {
470        return storageDir;
471    }
472
473}