001/*
002 * #%L
003 * Netarchivesuite - harvester
004 * %%
005 * Copyright (C) 2005 - 2014 The Royal Danish Library, the Danish State and University Library,
006 *             the National Library of France and the Austrian National Library.
007 * %%
008 * This program is free software: you can redistribute it and/or modify
009 * it under the terms of the GNU Lesser General Public License as
010 * published by the Free Software Foundation, either version 2.1 of the
011 * License, or (at your option) any later version.
012 * 
013 * This program is distributed in the hope that it will be useful,
014 * but WITHOUT ANY WARRANTY; without even the implied warranty of
015 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
016 * GNU General Lesser Public License for more details.
017 * 
018 * You should have received a copy of the GNU General Lesser Public
019 * License along with this program.  If not, see
020 * <http://www.gnu.org/licenses/lgpl-2.1.html>.
021 * #L%
022 */
023package dk.netarkivet.harvester.indexserver.distribute;
024
025import java.io.File;
026import java.io.FileInputStream;
027import java.io.FileOutputStream;
028import java.io.FileReader;
029import java.io.IOException;
030import java.io.ObjectInputStream;
031import java.io.ObjectOutputStream;
032import java.util.ArrayList;
033import java.util.EnumMap;
034import java.util.HashMap;
035import java.util.HashSet;
036import java.util.List;
037import java.util.Map;
038import java.util.Set;
039import java.util.Timer;
040import java.util.TimerTask;
041import java.util.concurrent.atomic.AtomicBoolean;
042
043import org.apache.commons.io.IOUtils;
044import org.apache.commons.io.LineIterator;
045import org.slf4j.Logger;
046import org.slf4j.LoggerFactory;
047
048import dk.netarkivet.common.distribute.Channels;
049import dk.netarkivet.common.distribute.JMSConnection;
050import dk.netarkivet.common.distribute.JMSConnectionFactory;
051import dk.netarkivet.common.distribute.RemoteFile;
052import dk.netarkivet.common.distribute.RemoteFileFactory;
053import dk.netarkivet.common.distribute.indexserver.RequestType;
054import dk.netarkivet.common.exceptions.ArgumentNotValid;
055import dk.netarkivet.common.exceptions.IOFailure;
056import dk.netarkivet.common.exceptions.IllegalState;
057import dk.netarkivet.common.exceptions.UnknownID;
058import dk.netarkivet.common.utils.CleanupIF;
059import dk.netarkivet.common.utils.FileUtils;
060import dk.netarkivet.common.utils.Settings;
061import dk.netarkivet.common.utils.StringUtils;
062import dk.netarkivet.harvester.HarvesterSettings;
063import dk.netarkivet.harvester.distribute.HarvesterMessageHandler;
064import dk.netarkivet.harvester.distribute.IndexReadyMessage;
065import dk.netarkivet.harvester.indexserver.FileBasedCache;
066import dk.netarkivet.harvester.indexserver.IndexRequestServerInterface;
067
068/**
069 * Index request server singleton.
070 * <p>
071 * This class contains a singleton that handles requesting an index over JMS.
072 * <p>
 * This has two modes. 1) Given a file with a list of job IDs, it will always return the same Lucene index, based on
 * the list of job identifiers in that file, regardless of what kind of index the client is requesting. 2) If the
 * setting "settings.harvester.indexserver.indexrequestserver.alwaysSetIsIndexReadyToFalse" is true, the
 * IndexReadyMessage sent for requests that do not require the index to be returned always has isindexready set to
 * false.
077 */
078public final class TestIndexRequestServer extends HarvesterMessageHandler implements CleanupIF,
079        IndexRequestServerInterface {
080
    /** The class logger. */
    private static final Logger log = LoggerFactory.getLogger(TestIndexRequestServer.class);

    /** The default place in classpath where the settings file can be found. */
    private static String defaultSettingsClasspath = "dk/netarkivet/harvester/"
            + "indexserver/distribute/TestIndexRequestServerSettings.xml";

    /*
     * The static initialiser is called when the class is loaded. It will add default values for all settings defined in
     * this class, by loading them from a settings.xml file in classpath.
     */
    static {
        Settings.addDefaultClasspathSettings(defaultSettingsClasspath);
    }

    /**
     * <b>settings.harvester.indexserver.indexrequestserver.fileContainingJobsForTestindex</b>: The file containing
     * the list of jobids that the test index uses as data. The default name of the file is "jobids.txt"
     */
    public static String JOBS_FOR_TESTINDEX = "settings.harvester.indexserver.indexrequestserver.fileContainingJobsForTestindex";

    /**
     * <b>settings.harvester.indexserver.indexrequestserver.alwaysSetIsIndexReadyToFalse</b>: The default: false. If
     * set to true, the IndexReadyMessage sent back always has isindexready = false.
     */
    public static String ALWAYS_SET_ISINDEX_READY_TO_FALSE = "settings.harvester.indexserver.indexrequestserver.alwaysSetIsIndexReadyToFalse";

    /** The unique instance. */
    private static TestIndexRequestServer instance;
    /** The handlers for index request types. */
    private Map<RequestType, FileBasedCache<Set<Long>>> handlers;

    /** The connection to the JMSBroker. */
    private static JMSConnection conn;
    /** The indexing jobs currently in progress, keyed by the ID of the request message. */
    private static Map<String, IndexRequestMessage> currentJobs;
    /** The max number of concurrent jobs. */
    private static long maxConcurrentJobs;
    /** Are we listening, now. */
    private static AtomicBoolean isListening = new AtomicBoolean();

    /** Interval in milliseconds between listening checks. */
    private static long listeningInterval;
    /** The timer that initiates the checkIflisteningTask. */
    private Timer checkIflisteningTimer = new Timer();

    /** The File containing the list of jobids, that the default index consists of. */
    private File jobsForDefaultIndex;

    /**
     * Value of the {@link #ALWAYS_SET_ISINDEX_READY_TO_FALSE} setting. When true, every IndexReadyMessage sent by this
     * server reports isindexready = false.
     */
    private boolean alwaysReturnFalseMode = false;

    /** The set of Jobs ids used for the default index. */
    private Set<Long> defaultIDs;

    /**
     * The directory to store backup copies of the currentJobs. In case of the indexserver crashing.
     */
    private File requestDir;
139
140    /**
141     * Initialise index request server with no handlers, listening to the index JMS channel.
142     */
143    private TestIndexRequestServer() {
144        maxConcurrentJobs = Settings.getLong(HarvesterSettings.INDEXSERVER_INDEXING_MAXCLIENTS);
145        requestDir = Settings.getFile(HarvesterSettings.INDEXSERVER_INDEXING_REQUESTDIR);
146        listeningInterval = Settings.getLong(HarvesterSettings.INDEXSERVER_INDEXING_LISTENING_INTERVAL);
147
148        alwaysReturnFalseMode = Settings.getBoolean(ALWAYS_SET_ISINDEX_READY_TO_FALSE);
149        if (alwaysReturnFalseMode) {
150            log.info("alwaysSetIsIndexReadyToFalse is true");
151        } else {
152            log.info("alwaysSetIsIndexReadyToFalse is false");
153        }
154
155        jobsForDefaultIndex = Settings.getFile(JOBS_FOR_TESTINDEX);
156
157        if (!jobsForDefaultIndex.exists()) {
158            final String msg = "The file '" + jobsForDefaultIndex.getAbsolutePath() + "' does not exist";
159            log.error("The file containing job identifiers for default index '{}' does not exist",
160                    jobsForDefaultIndex.getAbsolutePath());
161            System.err.println(msg + ". Exiting program");
162            System.exit(1);
163        }
164        defaultIDs = readLongsFromFile(jobsForDefaultIndex);
165        currentJobs = new HashMap<String, IndexRequestMessage>();
166        handlers = new EnumMap<RequestType, FileBasedCache<Set<Long>>>(RequestType.class);
167        conn = JMSConnectionFactory.getInstance();
168        checkIflisteningTimer = new Timer();
169    }
170
171    private Set<Long> readLongsFromFile(File fileWithLongs) {
172        Set<Long> resultSet = new HashSet<Long>();
173        try {
174            LineIterator lineIterator = new LineIterator(new FileReader(fileWithLongs));
175            while (lineIterator.hasNext()) {
176                String line = lineIterator.next();
177                resultSet.add(Long.parseLong(line));
178            }
179        } catch (IOException e) {
180            log.error("Unable to read from file '{}'. Returns set of size {}", fileWithLongs.getAbsolutePath(),
181                    resultSet.size());
182        }
183
184        return resultSet;
185    }
186
187    /**
188     * Restore old requests from requestDir.
189     */
190    private void restoreRequestsfromRequestDir() {
191        if (!requestDir.exists()) {
192            log.info("requestdir not found: creating request dir");
193            if (!requestDir.mkdirs()) {
194                throw new IOFailure("Unable to create requestdir '" + requestDir.getAbsolutePath() + "'");
195            } else {
196                return; // requestdir was just created, so nothing to do
197            }
198        }
199
200        File[] requests = requestDir.listFiles();
201        // Fill up the currentJobs
202        for (File request : requests) {
203            if (request.isFile()) {
204                final IndexRequestMessage msg = restoreMessage(request);
205                synchronized (currentJobs) {
206                    if (!currentJobs.containsKey(msg.getID())) {
207                        currentJobs.put(msg.getID(), msg);
208                    } else {
209                        log.debug("Skipped message w/id='{}'. Already among current jobs", msg.getID());
210                        continue;
211                    }
212
213                }
214                // Start a new thread to handle the actual request.
215                new Thread() {
216                    public void run() {
217                        doGenerateIndex(msg);
218                    }
219                }.start();
220                log.info("Restarting indexjob w/ ID={}", msg.getID());
221            } else {
222                log.debug("Ignoring directory in requestdir: {}", request.getAbsolutePath());
223            }
224        }
225    }
226
227    /**
228     * Get the unique index request server instance.
229     *
230     * @return The index request server.
231     */
232    public static synchronized TestIndexRequestServer getInstance() {
233        if (instance == null) {
234            instance = new TestIndexRequestServer();
235        }
236
237        return instance;
238    }
239
240    /**
241     * Set handler for certain type of index request. If called more than once, new handler overwrites old one.
242     *
243     * @param t The type of index requested
244     * @param handler The handler that should handle this request.
245     */
246    public void setHandler(RequestType t, FileBasedCache<Set<Long>> handler) {
247        ArgumentNotValid.checkNotNull(t, "RequestType t");
248        ArgumentNotValid.checkNotNull(handler, "FileBasedCache<Set<Long>> handler");
249        log.info("Setting handler for RequestType: " + t);
250        handlers.put(t, handler);
251    }
252
253    /**
254     * Given a request for an index over a set of job ids, use a cache to try to create the index, Then reply result.
255     * <p>
256     * If for any reason not all requested jobs can be indexed, return the subset. The client can then retry with this
257     * subset, in order to get index of that subset.
258     * <p>
259     * Values read from the message in order to handle this: - Type of index requested - will use the index cache of
260     * this type - Set of job IDs - which jobs to generate index for
261     * <p>
262     * Values written to message before replying: - The subset indexed - may be the entire set. ALWAYS set unless reply
263     * !OK - File with index - ONLY if subset is entire set, the index requested.
264     * <p>
265     * This method should ALWAYS reply. May reply with not OK message if: - Message received was not OK - Request type
266     * is null or unknown in message - Set of job ids is null in message - Cache generation throws exception
267     *
268     * @param irMsg A message requesting an index.
269     * @throws ArgumentNotValid on null parameter
270     */
271    public synchronized void visit(final IndexRequestMessage irMsg) throws ArgumentNotValid {
272        ArgumentNotValid.checkNotNull(irMsg, "IndexRequestMessage irMsg");
273        // save new msg to requestDir
274        try {
275            saveMsg(irMsg);
276            synchronized (currentJobs) {
277                if (!currentJobs.containsKey(irMsg.getID())) {
278                    currentJobs.put(irMsg.getID(), irMsg);
279                } else {
280                    final String errMsg = "Should not happen. Skipping msg w/ id= '" + irMsg.getID() + "' "
281                            + "because already among current jobs. "
282                            + "Unable to initiate indexing. Sending failed message back to sender";
283                    log.warn(errMsg);
284                    irMsg.setNotOk(errMsg);
285                    JMSConnectionFactory.getInstance().reply(irMsg);
286                    return;
287                }
288            }
289            // Limit the number of concurrently indexing job
290            if (currentJobs.size() >= maxConcurrentJobs) {
291                if (isListening.get()) {
292                    conn.removeListener(Channels.getTheIndexServer(), this);
293                    isListening.set(false);
294                }
295            }
296
297            // Start a new thread to handle the actual request.
298            new Thread() {
299                public void run() {
300                    doGenerateIndex(irMsg);
301                }
302            }.start();
303            log.debug("Now {} indexing jobs in progress", currentJobs.size());
304        } catch (IOException e) {
305            final String errMsg = "Unable to initiate indexing. Send failed message back to sender: " + e;
306            log.warn(errMsg, e);
307            irMsg.setNotOk(errMsg);
308            JMSConnectionFactory.getInstance().reply(irMsg);
309        }
310    }
311
312    /**
313     * Save a IndexRequestMessage to disk.
314     *
315     * @param irMsg A message to store to disk
316     * @throws IOException Throws IOExecption, if unable to save message
317     */
318    private void saveMsg(IndexRequestMessage irMsg) throws IOException {
319        File dest = new File(requestDir, irMsg.getID());
320        log.debug("Storing message to {}", dest.getAbsolutePath());
321        // Writing message to file
322        ObjectOutputStream oos = null;
323        try {
324            FileOutputStream fos = new FileOutputStream(dest);
325            oos = new ObjectOutputStream(fos);
326            oos.writeObject(irMsg);
327        } finally {
328            IOUtils.closeQuietly(oos);
329        }
330    }
331
332    /**
333     * Restore message from serialized state.
334     *
335     * @param serializedObject the object stored as a file.
336     * @return the restored message.
337     */
338    private IndexRequestMessage restoreMessage(File serializedObject) {
339        Object obj = null;
340        ObjectInputStream ois = null;
341        try {
342            // Read the message from disk.
343            FileInputStream fis = new FileInputStream(serializedObject);
344
345            ois = new ObjectInputStream(fis);
346
347            obj = ois.readObject();
348        } catch (ClassNotFoundException e) {
349            throw new IllegalState("Not possible to read the stored message from file '"
350                    + serializedObject.getAbsolutePath() + "':", e);
351        } catch (IOException e) {
352            throw new IOFailure("Not possible to read the stored message from file '"
353                    + serializedObject.getAbsolutePath() + "':", e);
354        } finally {
355            IOUtils.closeQuietly(ois);
356        }
357
358        if (obj instanceof IndexRequestMessage) {
359            return (IndexRequestMessage) obj;
360        } else {
361            throw new IllegalState("The serialized message is not a " + IndexRequestMessage.class.getName() + " but a "
362                    + obj.getClass().getName());
363        }
364    }
365
366    /**
367     * Method that handles generating an index; supposed to be run in its own thread, because it blocks while the index
368     * is generated.
369     *
370     * @param irMsg A message requesting an index
371     * @see #visit(IndexRequestMessage)
372     */
373    private void doGenerateIndex(final IndexRequestMessage irMsg) {
374        final boolean mustReturnIndex = irMsg.mustReturnIndex();
375        try {
376            checkMessage(irMsg);
377            RequestType type = irMsg.getRequestType();
378            Set<Long> requestedJobIDs = irMsg.getRequestedJobs();
379
380            log.info("Generating an index of type '{}' for the jobs [{}]", type, StringUtils.conjoin(",", defaultIDs));
381
382            FileBasedCache<Set<Long>> handler = handlers.get(type);
383            Set<Long> foundIDs = handler.cache(defaultIDs);
384            if (!foundIDs.containsAll(defaultIDs)) {
385                defaultIDs = foundIDs;
386            }
387            irMsg.setFoundJobs(requestedJobIDs); // Say that everything was found
388
389            log.info("Returning default index");
390
391            File cacheFile = handler.getCacheFile(defaultIDs);
392
393            if (mustReturnIndex) { // return index now! (default behaviour)
394                if (cacheFile.isDirectory()) {
395                    // This cache uses multiple files stored in a directory,
396                    // so transfer them all.
397                    File[] cacheFiles = cacheFile.listFiles();
398                    List<RemoteFile> resultFiles = new ArrayList<RemoteFile>(cacheFiles.length);
399                    for (File f : cacheFiles) {
400                        resultFiles.add(RemoteFileFactory.getCopyfileInstance(f));
401                    }
402                    irMsg.setResultFiles(resultFiles);
403                } else {
404                    irMsg.setResultFile(RemoteFileFactory.getCopyfileInstance(cacheFile));
405                }
406            }
407
408        } catch (Throwable t) {
409            log.warn("Unable to generate index for jobs [{}]", StringUtils.conjoin(",", irMsg.getRequestedJobs()), t);
410            irMsg.setNotOk(t);
411        } finally {
412            // Remove job from currentJobs Set
413            synchronized (currentJobs) {
414                currentJobs.remove(irMsg.getID());
415            }
416            // delete stored message
417            deleteStoredMessage(irMsg);
418            String state = "failed";
419            if (irMsg.isOk()) {
420                state = "successful";
421            }
422            if (mustReturnIndex) {
423                log.info("Sending {} reply for IndexRequestMessage back to sender '{}'.", state, irMsg.getReplyTo());
424                JMSConnectionFactory.getInstance().reply(irMsg);
425            } else {
426                log.info("Sending{} IndexReadyMessage to Scheduler", state);
427                boolean isindexready = true;
428                if (state.equalsIgnoreCase("failed")) {
429                    isindexready = false;
430                }
431                if (alwaysReturnFalseMode) {
432                    log.info("Setting isindexready = false in return message");
433                    isindexready = false;
434                }
435                IndexReadyMessage irm = new IndexReadyMessage(irMsg.getHarvestId(), isindexready, irMsg.getReplyTo(),
436                        Channels.getTheIndexServer());
437                JMSConnectionFactory.getInstance().send(irm);
438            }
439        }
440    }
441
442    /**
443     * Deleted stored file for given message.
444     *
445     * @param irMsg a given IndexRequestMessage
446     */
447    private void deleteStoredMessage(IndexRequestMessage irMsg) {
448        File expectedSerializedFile = new File(requestDir, irMsg.getID());
449        log.debug("Trying to delete stored serialized message: {}", expectedSerializedFile.getAbsolutePath());
450        if (!expectedSerializedFile.exists()) {
451            log.warn("The file does not exist any more.");
452            return;
453        }
454        boolean deleted = FileUtils.remove(expectedSerializedFile);
455        if (!deleted) {
456            log.debug("The file '{}' was not deleted", expectedSerializedFile);
457        }
458    }
459
460    /**
461     * Helper method to check message properties. Will throw exceptions on any trouble.
462     *
463     * @param irMsg The message to check.
464     * @throws ArgumentNotValid If message is not OK, or if the list of jobs or the index request type is null.
465     * @throws UnknownID If the index request type is of a form that is unknown to the server.
466     */
467    private void checkMessage(final IndexRequestMessage irMsg) throws UnknownID, ArgumentNotValid {
468        ArgumentNotValid.checkTrue(irMsg.isOk(), "Message was not OK");
469        ArgumentNotValid.checkNotNull(irMsg.getRequestType(), "RequestType type");
470        ArgumentNotValid.checkNotNull(irMsg.getRequestedJobs(), "Set<Long> jobIDs");
471        if (handlers.get(irMsg.getRequestType()) == null) {
472            throw new UnknownID("No handler known for requesttype " + irMsg.getRequestType());
473        }
474    }
475
476    /** Releases the JMS-connection and resets the singleton. */
477    public void close() {
478        cleanup();
479    }
480
481    /** Releases the JMS-connection and resets the singleton. */
482    public void cleanup() {
483        // shutdown listening timer.
484        checkIflisteningTimer.cancel();
485        conn.removeListener(Channels.getTheIndexServer(), this);
486        handlers.clear();
487
488        if (instance != null) {
489            instance = null;
490        }
491    }
492
493    /**
494     * Look for stored messages to be preprocessed, and start processing those. And start the separate thread that
495     * decides if we should listen for index-requests.
496     */
497    public void start() {
498        restoreRequestsfromRequestDir();
499        log.info("{} indexing jobs in progress that was stored in requestdir: {}", currentJobs.size(),
500                requestDir.getAbsolutePath());
501
502        // Define and start thread to observe current jobs:
503        // Only job is to look at the isListening atomicBoolean.
504        // If not listening, check if we are ready to listen again.
505        TimerTask checkIfListening = new ListeningTask(this);
506        isListening.set(false);
507        checkIflisteningTimer.schedule(checkIfListening, 0L, listeningInterval);
508    }
509
510    /**
511     * Defines the task to repeatedly check the listening status. And begin listening again, if we are ready for more
512     * tasks.
513     */
514    private static class ListeningTask extends TimerTask {
515
516        /** The indexrequestserver this task is associated with. */
517        private TestIndexRequestServer thisIrs;
518
519        /**
520         * Constructor for the ListeningTask.
521         *
522         * @param irs The indexrequestserver this task should be associated with
523         */
524        ListeningTask(TestIndexRequestServer irs) {
525            thisIrs = irs;
526        }
527
528        @Override
529        public void run() {
530            log.trace("Checking if we should be listening again");
531            if (!isListening.get()) {
532                if (maxConcurrentJobs > currentJobs.size()) {
533                    log.info("Enabling listening to the indexserver channel '{}'", Channels.getTheIndexServer());
534                    conn.setListener(Channels.getTheIndexServer(), thisIrs);
535                    isListening.set(true);
536                }
537            }
538        }
539
540    }
541
542}