001/*
002 * #%L
003 * Netarchivesuite - harvester
004 * %%
005 * Copyright (C) 2005 - 2014 The Royal Danish Library, the Danish State and University Library,
006 *             the National Library of France and the Austrian National Library.
007 * %%
008 * This program is free software: you can redistribute it and/or modify
009 * it under the terms of the GNU Lesser General Public License as
010 * published by the Free Software Foundation, either version 2.1 of the
011 * License, or (at your option) any later version.
012 * 
013 * This program is distributed in the hope that it will be useful,
014 * but WITHOUT ANY WARRANTY; without even the implied warranty of
015 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
016 * GNU General Lesser Public License for more details.
017 * 
018 * You should have received a copy of the GNU General Lesser Public
019 * License along with this program.  If not, see
020 * <http://www.gnu.org/licenses/lgpl-2.1.html>.
021 * #L%
022 */
023package dk.netarkivet.harvester.harvesting.frontier;
024
025import java.io.BufferedReader;
026import java.io.BufferedWriter;
027import java.io.File;
028import java.io.FileNotFoundException;
029import java.io.FileReader;
030import java.io.FileWriter;
031import java.io.IOException;
032import java.util.Iterator;
033
034import org.slf4j.Logger;
035import org.slf4j.LoggerFactory;
036
037import com.sleepycat.je.DatabaseException;
038import com.sleepycat.je.Environment;
039import com.sleepycat.je.EnvironmentConfig;
040import com.sleepycat.persist.EntityCursor;
041import com.sleepycat.persist.EntityStore;
042import com.sleepycat.persist.PrimaryIndex;
043import com.sleepycat.persist.SecondaryIndex;
044import com.sleepycat.persist.StoreConfig;
045import com.sleepycat.persist.model.Entity;
046import com.sleepycat.persist.model.KeyField;
047import com.sleepycat.persist.model.Persistent;
048import com.sleepycat.persist.model.PrimaryKey;
049import com.sleepycat.persist.model.Relationship;
050import com.sleepycat.persist.model.SecondaryKey;
051
052import dk.netarkivet.common.CommonSettings;
053import dk.netarkivet.common.exceptions.ArgumentNotValid;
054import dk.netarkivet.common.exceptions.IOFailure;
055import dk.netarkivet.common.utils.FileUtils;
056import dk.netarkivet.common.utils.Settings;
057
058/**
059 * Wraps an Heritrix 1 full frontier report. As these reports can be big in size, this implementation relies on Berkeley
060 * DB direct persistence layer to store the report lines, allowing to store the lines partially in memory, and on disk.
061 */
062@SuppressWarnings({"serial"})
063public class FullFrontierReport extends AbstractFrontierReport {
064
065    @Persistent
066    static class PersistentLineKey implements Comparable<PersistentLineKey>, FrontierReportLineOrderKey {
067
068        @KeyField(1)
069        long totalEnqueues;
070
071        @KeyField(2)
072        String domainName;
073
074        // Default empty constructor for BDB.
075        PersistentLineKey() {
076
077        }
078
079        public PersistentLineKey(FrontierReportLine l) {
080            this.domainName = l.getDomainName();
081            this.totalEnqueues = l.getTotalEnqueues();
082        }
083
084        public String getQueueId() {
085            return domainName;
086        }
087
088        public long getQueueSize() {
089            return totalEnqueues;
090        }
091
092        /**
093         * Compares first by decreasing queue size, then by domain name.
094         */
095        @Override
096        public int compareTo(PersistentLineKey k) {
097            return FrontierReportLineNaturalOrder.getInstance().compare(this, k);
098        }
099
100        @Override
101        public String toString() {
102            return totalEnqueues + " " + domainName;
103        }
104
105    }
106
107    @Entity
108    static class PersistentLine extends FrontierReportLine {
109
110        @PrimaryKey
111        private PersistentLineKey primaryKey;
112
113        @SecondaryKey(relate = Relationship.ONE_TO_ONE)
114        private String domainNameKey;
115
116        @SecondaryKey(relate = Relationship.MANY_TO_ONE)
117        private Long totalSpendKey;
118
119        @SecondaryKey(relate = Relationship.MANY_TO_ONE)
120        private Long currentSizeKey;
121
122        // Default empty constructor for BDB.
123        PersistentLine() {
124
125        }
126
127        PersistentLine(FrontierReportLine reportLine) {
128            super(reportLine);
129            this.primaryKey = new PersistentLineKey(reportLine);
130            this.domainNameKey = reportLine.getDomainName();
131            this.currentSizeKey = reportLine.getCurrentSize();
132            this.totalSpendKey = reportLine.getTotalSpend();
133        }
134
135    }
136
137    public class ReportIterator implements Iterator<FrontierReportLine> {
138
139        private final EntityCursor<PersistentLine> cursor;
140        private final Iterator<PersistentLine> iter;
141
142        /**
143         * Returns an iterator on the given sort key.
144         *
145         * @param cursor The cursor (sort key) to iterate on.
146         */
147        ReportIterator(EntityCursor<PersistentLine> cursor) {
148            this.cursor = cursor;
149            iter = cursor.iterator();
150        }
151
152        @Override
153        public boolean hasNext() {
154            return iter.hasNext();
155        }
156
157        @Override
158        public FrontierReportLine next() {
159            return iter.next();
160        }
161
162        @Override
163        public void remove() {
164            throw new ArgumentNotValid("Remove is not supported!");
165        }
166
167        /**
168         * Close method should be called explicitly to free underlying resources!
169         */
170        public void close() {
171            try {
172                cursor.close();
173            } catch (DatabaseException e) {
174                LOG.error("Error closing entity cursor:\n" + e.getLocalizedMessage());
175            }
176        }
177
178    }
179
180    private static final String WORKING_DIR = FullFrontierReport.class.getSimpleName();
181
182    /** The logger for this class. */
183    private static final Logger LOG = LoggerFactory.getLogger(FullFrontierReport.class);
184
185    /**
186     * The Berkeley DB JE environment.
187     */
188    private final Environment dbEnvironment;
189
190    /**
191     * The BDB entity store.
192     */
193    private final EntityStore store;
194
195    /**
196     * Primary index.
197     */
198    private final PrimaryIndex<PersistentLineKey, PersistentLine> linesIndex;
199
200    /**
201     * Secondary index, per domain name.
202     */
203    private final SecondaryIndex<String, PersistentLineKey, PersistentLine> linesByDomain;
204
205    /**
206     * Secondary index, per current size.
207     */
208    private final SecondaryIndex<Long, PersistentLineKey, PersistentLine> linesByCurrentSize;
209
210    /**
211     * Secondary index, per spent budget.
212     */
213    private final SecondaryIndex<Long, PersistentLineKey, PersistentLine> linesBySpentBudget;
214
215    /**
216     * The directory where the BDB is stored.
217     */
218    private final File storageDir;
219
220    /**
221     * Builds an empty frontier report wrapper.
222     *
223     * @param jobName the Heritrix job name
224     */
225    private FullFrontierReport(String jobName) {
226        super(jobName);
227
228        File workingDir = new File(Settings.getFile(CommonSettings.CACHE_DIR), WORKING_DIR);
229
230        this.storageDir = new File(workingDir, jobName);
231        if (!storageDir.mkdirs()) {
232            throw new IOFailure("Failed to create directory " + storageDir.getAbsolutePath());
233        }
234
235        try {
236            EnvironmentConfig envConfig = new EnvironmentConfig();
237            envConfig.setAllowCreate(true);
238            dbEnvironment = new Environment(storageDir, envConfig);
239
240            StoreConfig storeConfig = new StoreConfig();
241            storeConfig.setAllowCreate(true);
242
243            store = new EntityStore(dbEnvironment, FrontierReportLine.class.getSimpleName() + "-" + jobName,
244                    storeConfig);
245
246            linesIndex = store.getPrimaryIndex(PersistentLineKey.class, PersistentLine.class);
247
248            linesByDomain = store.getSecondaryIndex(linesIndex, String.class, "domainNameKey");
249
250            linesByCurrentSize = store.getSecondaryIndex(linesIndex, Long.class, "currentSizeKey");
251
252            linesBySpentBudget = store.getSecondaryIndex(linesIndex, Long.class, "totalSpendKey");
253
254        } catch (DatabaseException e) {
255            throw new IOFailure("Failed to init frontier BDB for job " + jobName, e);
256        }
257
258    }
259
260    /**
261     * Releases all resources once this report is to be discarded. NB this method MUST be explicitly called!
262     */
263    public void dispose() {
264
265        try {
266            store.close();
267            dbEnvironment.cleanLog();
268            dbEnvironment.close();
269        } catch (DatabaseException e) {
270            throw new IOFailure("Failed to close frontier BDB for job " + getJobName(), e);
271        }
272
273        FileUtils.removeRecursively(storageDir);
274    }
275
276    @Override
277    public void addLine(FrontierReportLine line) {
278        try {
279            linesIndex.put(new PersistentLine(line));
280        } catch (DatabaseException e) {
281            throw new IOFailure("Failed to store frontier report line for job " + getJobName(), e);
282        }
283    }
284
285    @Override
286    public FrontierReportLine getLineForDomain(String domainName) {
287        try {
288            return linesByDomain.get(domainName);
289        } catch (DatabaseException e) {
290            LOG.warn("Failed to get queue for domain " + domainName, e);
291            return null;
292        }
293    }
294
295    /**
296     * Returns an iterator where lines are ordered by primary key order: first by decreasing totalEnqueues, then by
297     * domain name natural order.
298     *
299     * @return an iterator on the report lines.
300     */
301    public ReportIterator iterateOnTotalEnqueues() {
302        try {
303            return new ReportIterator(linesIndex.entities());
304        } catch (DatabaseException e) {
305            throw new IOFailure("Failed to read frontier BDB for job " + getJobName(), e);
306        }
307    }
308
309    /**
310     * Returns an iterator where lines are ordered by domain name natural order.
311     *
312     * @return an iterator on the report lines.
313     */
314    public ReportIterator iterateOnDomainName() {
315        try {
316            return new ReportIterator(linesByDomain.entities());
317        } catch (DatabaseException e) {
318            throw new IOFailure("Failed to read frontier BDB for job " + getJobName(), e);
319        }
320    }
321
322    /**
323     * Returns an iterator where lines are ordered by increasing currentSize.
324     *
325     * @return an iterator on the report lines.
326     */
327    public ReportIterator iterateOnCurrentSize() {
328        try {
329            return new ReportIterator(linesByCurrentSize.entities());
330        } catch (DatabaseException e) {
331            throw new IOFailure("Failed to read frontier BDB for job " + getJobName(), e);
332        }
333    }
334
335    /**
336     * Returns an iterator on lines having a given currentSize.
337     *
338     * @param dupValue
339     * @return an iterator on the report lines.
340     */
341    public ReportIterator iterateOnDuplicateCurrentSize(long dupValue) {
342        try {
343            return new ReportIterator(linesByCurrentSize.subIndex(dupValue).entities());
344        } catch (DatabaseException e) {
345            throw new IOFailure("Failed to read frontier BDB for job " + getJobName(), e);
346        }
347    }
348
349    /**
350     * Returns an iterator where lines are ordered by increasing totalSpend.
351     *
352     * @return an iterator on the report lines.
353     */
354    public ReportIterator iterateOnSpentBudget() {
355        try {
356            return new ReportIterator(linesBySpentBudget.entities());
357        } catch (DatabaseException e) {
358            throw new IOFailure("Failed to read frontier BDB for job " + getJobName(), e);
359        }
360    }
361
362    /**
363     * Returns an iterator on lines having a given totalSpend.
364     *
365     * @param dupValue
366     * @return an iterator on the report lines.
367     */
368    public ReportIterator iterateOnDuplicateSpentBudget(long dupValue) {
369        try {
370            return new ReportIterator(linesBySpentBudget.subIndex(dupValue).entities());
371        } catch (DatabaseException e) {
372            throw new IOFailure("Failed to read frontier BDB for job " + getJobName(), e);
373        }
374    }
375
376    /**
377     * Generates an Heritrix frontier report wrapper object by parsing the frontier report returned by the JMX
378     * controller as a string.
379     *
380     * @param jobName the Heritrix job name
381     * @param contentsAsString the text returned by the JMX call
382     * @return the report wrapper object
383     */
384    public static FullFrontierReport parseContentsAsString(String jobName, String contentsAsString) {
385
386        FullFrontierReport report = new FullFrontierReport(jobName);
387
388        // First dump this possibly huge string to a file
389        File tmpDir = Settings.getFile(CommonSettings.CACHE_DIR);
390        File tmpFile = new File(tmpDir, jobName + "-" + System.currentTimeMillis() + ".txt");
391        try {
392            tmpFile.createNewFile();
393            BufferedWriter out = new BufferedWriter(new FileWriter(tmpFile));
394            out.write(contentsAsString);
395            out.close();
396        } catch (IOException e) {
397            LOG.error("Failed to create temporary file", e);
398            return report;
399        }
400
401        BufferedReader br;
402        try {
403            br = new BufferedReader(new FileReader(tmpFile));
404        } catch (FileNotFoundException e) {
405            LOG.error("Failed to read temporary file", e);
406            return report;
407        }
408
409        try {
410            String lineToken = br.readLine(); // Discard header line
411            while ((lineToken = br.readLine()) != null) {
412                report.addLine(new FrontierReportLine(lineToken));
413            }
414
415            br.close();
416        } catch (IOException e) {
417            LOG.warn("Failed to close reader", e);
418        } catch (Throwable t) {
419            LOG.error("",t);
420            t.printStackTrace(System.err);
421        } finally {
422            FileUtils.remove(tmpFile);
423        }
424
425        return report;
426    }
427
428    /**
429     * Return the directory where the BDB is stored.
430     *
431     * @return the storage directory.
432     */
433    File getStorageDir() {
434        return storageDir;
435    }
436
437}