/*
 * #%L
 * Netarchivesuite - harvester
 * %%
 * Copyright (C) 2005 - 2014 The Royal Danish Library, the Danish State and University Library,
 *             the National Library of France and the Austrian National Library.
 * %%
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License as
 * published by the Free Software Foundation, either version 2.1 of the
 * License, or (at your option) any later version.
 * 
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Lesser Public License for more details.
 * 
 * You should have received a copy of the GNU General Lesser Public
 * License along with this program.  If not, see
 * <http://www.gnu.org/licenses/lgpl-2.1.html>.
 * #L%
 */
package dk.netarkivet.harvester.indexserver.distribute;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumMap;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.commons.io.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import dk.netarkivet.common.distribute.Channels;
import dk.netarkivet.common.distribute.JMSConnection;
import dk.netarkivet.common.distribute.JMSConnectionFactory;
import dk.netarkivet.common.distribute.RemoteFile;
import dk.netarkivet.common.distribute.RemoteFileFactory;
import dk.netarkivet.common.distribute.RemoteFileSettings;
import dk.netarkivet.common.distribute.indexserver.RequestType;
import dk.netarkivet.common.exceptions.ArgumentNotValid;
import dk.netarkivet.common.exceptions.IOFailure;
import dk.netarkivet.common.exceptions.IllegalState;
import dk.netarkivet.common.exceptions.UnknownID;
import dk.netarkivet.common.utils.ChecksumCalculator;
import dk.netarkivet.common.utils.CleanupIF;
import dk.netarkivet.common.utils.FileUtils;
import dk.netarkivet.common.utils.Settings;
import dk.netarkivet.common.utils.StringUtils;
import dk.netarkivet.harvester.HarvesterSettings;
import dk.netarkivet.harvester.distribute.HarvesterMessageHandler;
import dk.netarkivet.harvester.distribute.IndexReadyMessage;
import dk.netarkivet.harvester.indexserver.FileBasedCache;
import dk.netarkivet.harvester.indexserver.IndexRequestServerInterface;

/**
 * Index request server singleton.
 * <p>
 * This class contains a singleton that handles requesting an index over JMS.
 * <p>
 * It will ALWAYS reply to such messages, either with the index, with a message telling which subset of the requested
 * jobs is available, or with an error message.
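 * <p>
 * A minimal usage sketch (the cache instances below are placeholders for concrete {@code FileBasedCache}
 * implementations; they are not provided by this class):
 *
 * <pre>{@code
 * IndexRequestServer server = IndexRequestServer.getInstance();
 * server.setHandler(RequestType.CDX, cdxCache);
 * server.setHandler(RequestType.DEDUP_CRAWL_LOG, dedupCache);
 * server.start(); // restores stored requests and starts listening on the index channel
 * }</pre>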
 */
public final class IndexRequestServer extends HarvesterMessageHandler implements CleanupIF, IndexRequestServerInterface {

    /** The class logger. */
    private static final Logger log = LoggerFactory.getLogger(IndexRequestServer.class);

    /** The unique instance. */
    private static IndexRequestServer instance;
    /** The handlers for index request types. */
    private Map<RequestType, FileBasedCache<Set<Long>>> handlers;

    /** The connection to the JMSBroker. */
    private static JMSConnection conn;
    /** A map of the current indexing jobs in progress, keyed by message ID. */
    private static Map<String, IndexRequestMessage> currentJobs;

    /** The max number of concurrent jobs. */
    private static long maxConcurrentJobs;
    /** Whether we are currently listening for index requests. */
    private static AtomicBoolean isListening = new AtomicBoolean();

    /** Interval in milliseconds between listening checks. */
    private static long listeningInterval;
    /** The timer that runs the ListeningTask. */
    private Timer checkIflisteningTimer = new Timer();

    /** The satisfactory threshold percentage as an integer. */
    private int satisfactoryThresholdPercentage;
    /**
     * The directory used to store backup copies of the currentJobs map, in case the indexserver crashes.
     */
    private File requestDir;

    /**
     * Initialise the index request server with no handlers. Listening on the index JMS channel is first enabled by
     * {@link #start()}.
     */
    private IndexRequestServer() {
        maxConcurrentJobs = Settings.getLong(HarvesterSettings.INDEXSERVER_INDEXING_MAXCLIENTS);
        requestDir = Settings.getFile(HarvesterSettings.INDEXSERVER_INDEXING_REQUESTDIR);
        listeningInterval = Settings.getLong(HarvesterSettings.INDEXSERVER_INDEXING_LISTENING_INTERVAL);
        satisfactoryThresholdPercentage = Settings
                .getInt(HarvesterSettings.INDEXSERVER_INDEXING_SATISFACTORYTHRESHOLD_PERCENTAGE);

        currentJobs = new HashMap<String, IndexRequestMessage>();
        handlers = new EnumMap<RequestType, FileBasedCache<Set<Long>>>(RequestType.class);
        conn = JMSConnectionFactory.getInstance();
        checkIflisteningTimer = new Timer();
    }

    /**
     * Restore old requests from requestDir.
     */
    private void restoreRequestsfromRequestDir() {
        if (!requestDir.exists()) {
            log.info("requestdir not found: creating request dir");
            if (!requestDir.mkdirs()) {
                throw new IOFailure("Unable to create requestdir '" + requestDir.getAbsolutePath() + "'");
            } else {
                return; // requestdir was just created, so nothing to do
            }
        }

        File[] requests = requestDir.listFiles();
        // Fill up the currentJobs
        for (File request : requests) {
            if (request.isFile()) {
                final IndexRequestMessage msg = restoreMessage(request);
                synchronized (currentJobs) {
                    if (!currentJobs.containsKey(msg.getID())) {
                        currentJobs.put(msg.getID(), msg);
                    } else {
                        log.debug("Skipped message w/id='{}'. Already among current jobs", msg.getID());
                        continue;
                    }

                }
                // Start a new thread to handle the actual request.
                new Thread() {
                    public void run() {
                        doProcessIndexRequestMessage(msg);
                    }
                }.start();
                log.info("Restarting indexjob w/ ID={}", msg.getID());
            } else {
                log.debug("Ignoring directory in requestdir: {}", request.getAbsolutePath());
            }
        }
    }

    /**
     * Get the unique index request server instance.
     *
     * @return The index request server.
     */
    public static synchronized IndexRequestServer getInstance() {
        if (instance == null) {
            instance = new IndexRequestServer();
        }

        return instance;
    }

    /**
     * Set the handler for a certain type of index request. If called more than once, the new handler overwrites the
     * old one.
     *
     * @param t The type of index requested
     * @param handler The handler that should handle this request.
     */
    public void setHandler(RequestType t, FileBasedCache<Set<Long>> handler) {
        ArgumentNotValid.checkNotNull(t, "RequestType t");
        ArgumentNotValid.checkNotNull(handler, "FileBasedCache<Set<Long>> handler");
        log.info("Setting handler for RequestType: {}", t);
        handlers.put(t, handler);
    }

    /**
     * Given a request for an index over a set of job IDs, use a cache to try to create the index, then reply with the
     * result.
     * <p>
     * If for any reason not all requested jobs can be indexed, return the subset. The client can then retry with this
     * subset, in order to get an index of that subset.
     * <p>
     * Values read from the message in order to handle this:
     * <ul>
     * <li>Type of index requested - the index cache of this type will be used</li>
     * <li>Set of job IDs - the jobs to generate an index for</li>
     * </ul>
     * Values written to the message before replying:
     * <ul>
     * <li>The subset indexed - may be the entire set. ALWAYS set unless the reply is !OK</li>
     * <li>File with the index - ONLY if the subset is the entire set requested</li>
     * </ul>
     * This method should ALWAYS reply. It may reply with a not-OK message if:
     * <ul>
     * <li>the message received was not OK</li>
     * <li>the request type in the message is null or unknown</li>
     * <li>the set of job IDs in the message is null</li>
     * <li>cache generation throws an exception</li>
     * </ul>
     *
     * @param irMsg A message requesting an index.
     * @throws ArgumentNotValid on null parameter
     */
    public synchronized void visit(final IndexRequestMessage irMsg) throws ArgumentNotValid {
        ArgumentNotValid.checkNotNull(irMsg, "IndexRequestMessage irMsg");
        // save new msg to requestDir
        try {
            saveMsg(irMsg);
            synchronized (currentJobs) {
                if (!currentJobs.containsKey(irMsg.getID())) {
                    currentJobs.put(irMsg.getID(), irMsg);
                } else {
                    final String errMsg = "Should not happen. Skipping msg w/ id= '" + irMsg.getID()
                            + "' because already among current jobs. "
                            + "Unable to initiate indexing. Sending failed message back to sender";
                    log.warn(errMsg);
                    irMsg.setNotOk(errMsg);
                    JMSConnectionFactory.getInstance().reply(irMsg);
                    return;
                }
            }
            // Limit the number of concurrently indexing jobs
            if (currentJobs.size() >= maxConcurrentJobs) {
                if (isListening.get()) {
                    conn.removeListener(Channels.getTheIndexServer(), this);
                    isListening.set(false);
                }
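                // Listening is re-enabled by the ListeningTask once the number of current
                // jobs drops below maxConcurrentJobs again.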
            }

            // Start a new thread to handle the actual request.
            new Thread() {
                public void run() {
                    doProcessIndexRequestMessage(irMsg);
                }
            }.start();
            log.debug("Now {} indexing jobs in progress", currentJobs.size());
        } catch (IOException e) {
            final String errMsg = "Unable to initiate indexing. Send failed message back to sender: " + e;
            log.warn(errMsg, e);
            irMsg.setNotOk(errMsg);
            JMSConnectionFactory.getInstance().reply(irMsg);
        }
    }

    /**
     * Save an IndexRequestMessage to disk.
     *
     * @param irMsg A message to store to disk
     * @throws IOException If the message could not be saved to disk
     */
    private void saveMsg(IndexRequestMessage irMsg) throws IOException {
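        // The file is named after the JMS message ID, so that deleteStoredMessage() can locate it again later.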
        File dest = new File(requestDir, irMsg.getID());
        log.debug("Storing message to {}", dest.getAbsolutePath());
        // Writing message to file
        ObjectOutputStream oos = null;
        try {
            FileOutputStream fos = new FileOutputStream(dest);
            oos = new ObjectOutputStream(fos);
            oos.writeObject(irMsg);
        } finally {
            IOUtils.closeQuietly(oos);
        }

    }

    /**
     * Restore message from serialized state.
     *
     * @param serializedObject the object stored as a file.
     * @return the restored message.
     */
    private IndexRequestMessage restoreMessage(File serializedObject) {
        Object obj = null;
        ObjectInputStream ois = null;
        try {
            // Read the message from disk.
            FileInputStream fis = new FileInputStream(serializedObject);
            ois = new ObjectInputStream(fis);

            obj = ois.readObject();
        } catch (ClassNotFoundException e) {
            throw new IllegalState("Not possible to read the stored message from file '"
                    + serializedObject.getAbsolutePath() + "':", e);
        } catch (IOException e) {
            throw new IOFailure("Not possible to read the stored message from file '"
                    + serializedObject.getAbsolutePath() + "':", e);
        } finally {
            IOUtils.closeQuietly(ois);
        }

        if (obj instanceof IndexRequestMessage) {
            return (IndexRequestMessage) obj;
        } else {
            throw new IllegalState("The serialized message is not a " + IndexRequestMessage.class.getName() + " but a "
                    + obj.getClass().getName());
        }
    }

    /**
     * Method that handles the processing of an IndexRequestMessage. Returns the requested index immediately, if
     * already available, otherwise proceeds with the index generation of the requested index. Must be run in its own
     * thread, because it blocks while the index is generated.
     *
     * @param irMsg A message requesting an index
     * @see #visit(IndexRequestMessage)
     */
    private void doProcessIndexRequestMessage(final IndexRequestMessage irMsg) {
        final boolean mustReturnIndex = irMsg.mustReturnIndex();
        try {
            checkMessage(irMsg);
            RequestType type = irMsg.getRequestType();
            Set<Long> jobIDs = irMsg.getRequestedJobs();

            if (log.isInfoEnabled()) {
                log.info("Request received for an index of type '{}' for the {} jobs [{}]", type, jobIDs.size(),
                        StringUtils.conjoin(",", jobIDs));
            }
            FileBasedCache<Set<Long>> handler = handlers.get(type);

            // Here we need to make sure that we don't accidentally process more than
            // one message at a time before the whole process is over.
            // The job IDs are sorted so that the same set of jobs always yields the same checksum.
            List<Long> sortedList = new ArrayList<Long>(jobIDs);
            Collections.sort(sortedList);
            String allIDsString = StringUtils.conjoin("-", sortedList);
            String checksum = ChecksumCalculator.calculateMd5(allIDsString.getBytes());
            log.debug("Waiting to enter the synchronization zone for the indexing job of size {} with checksum '{}'",
                    jobIDs.size(), checksum);
            // Begin synchronization
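            // The interned checksum string acts as a shared lock object: requests for the same set of
            // job IDs resolve to the same String instance, so only one of them generates the index at a time.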
            synchronized (checksum.intern()) {
                log.debug("The indexing job of size {} with checksum '{}' is now in the synchronization zone",
                        jobIDs.size(), checksum);
                Set<Long> foundIDs = handler.cache(jobIDs);
                irMsg.setFoundJobs(foundIDs);
                if (foundIDs.equals(jobIDs)) {
                    if (log.isInfoEnabled()) {
                        log.info("Successfully retrieved index of type '{}' for the {} jobs [{}]", type, jobIDs.size(),
                                StringUtils.conjoin(",", jobIDs));
                    }
                    File cacheFile = handler.getCacheFile(jobIDs);
                    if (mustReturnIndex) { // return index now!
                        packageResultFiles(irMsg, cacheFile);
                    }
                } else if (satisfactoryThresholdReached(foundIDs, jobIDs)) {
                    log.info("Data for full index w/ {} jobs not available. Only found data for {} jobs - "
                            + "but satisfactory threshold reached, so assuming presence of all data", jobIDs.size(),
                            foundIDs.size());
                    // Make sure that the index of the data available is generated
                    Set<Long> theFoundIDs = handler.cache(foundIDs);
                    // theFoundIDs should be identical to foundIDs; warn if any data has disappeared in the meantime.
                    Set<Long> diffSet = new HashSet<Long>(foundIDs);
                    diffSet.removeAll(theFoundIDs);
                    if (!diffSet.isEmpty()) {
                        log.warn("Data for jobs [{}] disappeared while regenerating the index",
                                StringUtils.conjoin(",", diffSet));
                    }

                    // Make a copy of the index available, and give it the name of
                    // the index cache file wanted.
                    File cacheFileWanted = handler.getCacheFile(jobIDs);
                    File cacheFileCreated = handler.getCacheFile(foundIDs);

                    log.info("Satisfactory threshold reached - copying index {} '{}' to full index: {}",
                            (cacheFileCreated.isDirectory() ? "dir" : "file"), cacheFileCreated.getAbsolutePath(),
                            cacheFileWanted.getAbsolutePath());
                    if (cacheFileCreated.isDirectory()) {
                        // create destination cacheFileWanted, and
                        // copy all files in cacheFileCreated to cacheFileWanted.
                        cacheFileWanted.mkdirs();
                        FileUtils.copyDirectory(cacheFileCreated, cacheFileWanted);
                    } else {
                        FileUtils.copyFile(cacheFileCreated, cacheFileWanted);
                    }

                    // TODO This delete-operation commented out, because it is deemed too dangerous,
                    // as the cachedir represented by cacheFileCreated may still be used

                    // log.info("Deleting the temporary index "
                    // + cacheFileCreated.getAbsolutePath());
                    // FileUtils.removeRecursively(cacheFileCreated);
                    log.info("We keep the index '{}', as we don't know if anybody is using it",
                            cacheFileCreated.getAbsolutePath());

                    // Information needed by recipient to store index in local cache
                    irMsg.setFoundJobs(jobIDs);
                    if (mustReturnIndex) { // return index now.
                        packageResultFiles(irMsg, cacheFileWanted);
                    }
                } else {
                    Set<Long> missingJobIds = new HashSet<Long>(jobIDs);
                    missingJobIds.removeAll(foundIDs);
                    log.warn("Failed generating index of type '{}' for the jobs [{}]. Missing data for jobs [{}].",
                            type, StringUtils.conjoin(",", jobIDs), StringUtils.conjoin(",", missingJobIds));
                }

            } // End of synchronization block
        } catch (Throwable t) {
            log.warn("Unable to generate index for jobs [" + StringUtils.conjoin(",", irMsg.getRequestedJobs()) + "]",
                    t);
            irMsg.setNotOk(t);
        } finally {
            // Remove the job from the currentJobs map
            synchronized (currentJobs) {
                currentJobs.remove(irMsg.getID());
            }
            // delete stored message
            deleteStoredMessage(irMsg);
            String state = "failed";
            if (irMsg.isOk()) {
                state = "successful";
            }
            if (mustReturnIndex) {
                log.info("Sending {} reply for IndexRequestMessage back to sender '{}'.", state, irMsg.getReplyTo());
                JMSConnectionFactory.getInstance().reply(irMsg);
            } else {
                log.info("Sending {} IndexReadyMessage to Scheduler for harvest {}", state, irMsg.getHarvestId());
                boolean isindexready = true;
                if (state.equalsIgnoreCase("failed")) {
                    isindexready = false;
                }
                IndexReadyMessage irm = new IndexReadyMessage(irMsg.getHarvestId(), isindexready, irMsg.getReplyTo(),
                        Channels.getTheIndexServer());
                JMSConnectionFactory.getInstance().send(irm);
            }
        }
    }

    /**
     * Package the result files with the message reply.
     *
     * @param irMsg the message being answered
     * @param cacheFile The location of the result on disk.
     */
    private void packageResultFiles(IndexRequestMessage irMsg, File cacheFile) {
        RemoteFileSettings connectionParams = irMsg.getRemoteFileSettings();

        if (connectionParams != null) {
            log.debug("Trying to use client supplied RemoteFileServer: {}", connectionParams.getServerName());
        }
        if (cacheFile.isDirectory()) {
            // This cache uses multiple files stored in a directory,
            // so transfer them all.
            File[] cacheFiles = cacheFile.listFiles();
            List<RemoteFile> resultFiles = new ArrayList<RemoteFile>(cacheFiles.length);
            for (File f : cacheFiles) {
                resultFiles.add(RemoteFileFactory.getCopyfileInstance(f, irMsg.getRemoteFileSettings()));
            }
            irMsg.setResultFiles(resultFiles);
        } else {
            irMsg.setResultFile(RemoteFileFactory.getCopyfileInstance(cacheFile, irMsg.getRemoteFileSettings()));
        }
    }

    /**
     * Decides whether the created index contains enough data to be considered a satisfactory index. Uses the
     * {@link IndexRequestServer#satisfactoryThresholdPercentage}.
     *
     * @param foundIDs The set of IDs contained in the index
     * @param requestedIDs The set of IDs requested for the index.
     * @return true, if the percentage of foundIDs relative to requestedIDs is strictly above the
     * {@link IndexRequestServer#satisfactoryThresholdPercentage}.
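     * <p>
     * For example, with a threshold of 80, finding 9 of 10 requested jobs (90%) is satisfactory, whereas finding
     * exactly 8 of 10 (80%) is not, since the comparison is strictly greater-than.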
     */
    private boolean satisfactoryThresholdReached(Set<Long> foundIDs, Set<Long> requestedIDs) {
        int jobsRequested = requestedIDs.size();
        int jobsFound = foundIDs.size();
        int percentage = (jobsFound * 100) / jobsRequested;
        return percentage > satisfactoryThresholdPercentage;
    }

    /**
     * Delete the stored file for the given message.
     *
     * @param irMsg a given IndexRequestMessage
     */
    private void deleteStoredMessage(IndexRequestMessage irMsg) {
        File expectedSerializedFile = new File(requestDir, irMsg.getID());
        log.debug("Trying to delete stored serialized message: {}", expectedSerializedFile.getAbsolutePath());
        if (!expectedSerializedFile.exists()) {
            log.warn("The file does not exist any more.");
            return;
        }
        boolean deleted = FileUtils.remove(expectedSerializedFile);
        if (!deleted) {
            log.debug("The file '{}' was not deleted", expectedSerializedFile);
        }
    }

    /**
     * Helper method to check message properties. Will throw exceptions on any trouble.
     *
     * @param irMsg The message to check.
     * @throws ArgumentNotValid If message is not OK, or if the list of jobs or the index request type is null.
     * @throws UnknownID If the index request type is of a form that is unknown to the server.
     */
    private void checkMessage(final IndexRequestMessage irMsg) throws UnknownID, ArgumentNotValid {
        ArgumentNotValid.checkTrue(irMsg.isOk(), "Message was not OK");
        ArgumentNotValid.checkNotNull(irMsg.getRequestType(), "RequestType type");
        ArgumentNotValid.checkNotNull(irMsg.getRequestedJobs(), "Set<Long> jobIDs");
        if (handlers.get(irMsg.getRequestType()) == null) {
            throw new UnknownID("No handler known for requesttype " + irMsg.getRequestType());
        }
    }

    /** Releases the JMS-connection and resets the singleton. */
    public void close() {
        cleanup();
    }

    /** Releases the JMS-connection and resets the singleton. */
    public void cleanup() {
        // shutdown listening timer.
        checkIflisteningTimer.cancel();
        conn.removeListener(Channels.getTheIndexServer(), this);
        handlers.clear();

        instance = null;
    }

    /**
     * Look for stored messages to be preprocessed, and start processing them. Also start the separate timer task that
     * decides whether we should listen for index requests.
     */
    public void start() {
        restoreRequestsfromRequestDir();
        log.info("{} indexing jobs in progress that were stored in requestdir: {}", currentJobs.size(),
                requestDir.getAbsolutePath());

        // Define and start thread to observe current jobs:
        // Only job is to look at the isListening atomicBoolean.
        // If not listening, check if we are ready to listen again.
        TimerTask checkIfListening = new ListeningTask(this);
        isListening.set(false);
        checkIflisteningTimer.schedule(checkIfListening, 0L, listeningInterval);
    }

    /**
     * Defines the task that repeatedly checks the listening status, and begins listening again if we are ready for
     * more tasks.
     */
    private static class ListeningTask extends TimerTask {
        /** The IndexRequestServer this task is associated with. */
        private IndexRequestServer thisIrs;

        /**
         * Constructor for the ListeningTask.
         *
         * @param irs The IndexRequestServer this task should be associated with
         */
        ListeningTask(IndexRequestServer irs) {
            thisIrs = irs;
        }

        @Override
        public void run() {
            log.trace("Checking if we should be listening again");
            if (!isListening.get()) {
                if (maxConcurrentJobs > currentJobs.size()) {
                    log.info("Enabling listening to the indexserver channel '{}'", Channels.getTheIndexServer());
                    conn.setListener(Channels.getTheIndexServer(), thisIrs);
                    isListening.set(true);
                }
            }
        }

    }

}