package dk.netarkivet.harvester.indexserver.distribute;

import dk.netarkivet.common.distribute.Channels;
import dk.netarkivet.common.distribute.JMSConnection;
import dk.netarkivet.common.distribute.JMSConnectionFactory;
import dk.netarkivet.common.distribute.RemoteFileFactory;
import dk.netarkivet.common.distribute.indexserver.RequestType;
import dk.netarkivet.common.exceptions.ArgumentNotValid;
import dk.netarkivet.common.exceptions.IOFailure;
import dk.netarkivet.common.exceptions.IllegalState;
import dk.netarkivet.common.exceptions.UnknownID;
import dk.netarkivet.common.utils.CleanupIF;
import dk.netarkivet.common.utils.FileUtils;
import dk.netarkivet.common.utils.Settings;
import dk.netarkivet.common.utils.StringUtils;
import dk.netarkivet.harvester.HarvesterSettings;
import dk.netarkivet.harvester.distribute.HarvesterMessageHandler;
import dk.netarkivet.harvester.distribute.IndexReadyMessage;
import dk.netarkivet.harvester.indexserver.FileBasedCache;
import dk.netarkivet.harvester.indexserver.IndexRequestServerInterface;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.FileReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.EnumMap;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.io.IOUtils;
import org.apache.commons.io.LineIterator;
import org.archive.url.UsableURIFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:dk/netarkivet/harvester/indexserver/distribute/TestIndexRequestServer.class */
/**
 * Index request server variant that answers every {@link IndexRequestMessage} with an index built from a fixed,
 * preconfigured set of job identifiers instead of the jobs named in the request.
 *
 * <p>The fixed job identifiers are read once, at construction time, from the file named by the
 * {@link #JOBS_FOR_TESTINDEX} setting. Incoming requests are persisted to the request directory before they are
 * processed, and requests found in that directory are resumed by {@link #start()}.
 *
 * <p>Each request is processed on its own thread. When the number of requests in progress reaches the configured
 * maximum, the server stops listening on the index server channel; a periodic {@link ListeningTask} re-enables
 * listening once capacity is available again.
 */
public final class TestIndexRequestServer extends HarvesterMessageHandler implements CleanupIF, IndexRequestServerInterface {
    private static final Logger log = LoggerFactory.getLogger(TestIndexRequestServer.class);

    /** Classpath location of the default settings for this class. */
    private static final String defaultSettingsClasspath = "dk/netarkivet/harvester/indexserver/distribute/TestIndexRequestServerSettings.xml";

    static {
        // Must run before any of the settings keys below are looked up.
        Settings.addDefaultClasspathSettings(defaultSettingsClasspath);
    }

    /** Settings key naming the file that lists the job identifiers (one per line) used for every index. */
    public static String JOBS_FOR_TESTINDEX = "settings.harvester.indexserver.indexrequestserver.fileContainingJobsForTestindex";

    /** Settings key for the flag that forces every IndexReadyMessage to report "index not ready". */
    public static String ALWAYS_SET_ISINDEX_READY_TO_FALSE = "settings.harvester.indexserver.indexrequestserver.alwaysSetIsIndexReadyToFalse";

    /** The singleton instance; reset to null by {@link #cleanup()}. */
    private static TestIndexRequestServer instance;

    /** JMS connection used to add and remove this server as a listener. */
    private static JMSConnection conn;

    /**
     * Requests currently being processed, keyed by message ID. Writers hold the instance monitor.
     * NOTE(review): ListeningTask and start() read the size without holding that monitor.
     */
    private static Map<String, IndexRequestMessage> currentJobs;

    /** Upper bound on the number of requests processed concurrently before listening is suspended. */
    private static long maxConcurrentJobs;

    /** Whether this server is currently registered as listener on the index server channel. */
    private static final AtomicBoolean isListening = new AtomicBoolean();

    /** Period, as passed to {@link Timer#schedule(TimerTask, long, long)}, between listening checks. */
    private static long listeningInterval;

    /** The cache handlers, one per request type. */
    private final Map<RequestType, FileBasedCache<Set<Long>>> handlers;

    /** Timer driving the periodic {@link ListeningTask}. */
    private final Timer checkIflisteningTimer;

    /** File holding the job identifiers for the default index. */
    private final File jobsForDefaultIndex;

    /** When true, outgoing IndexReadyMessages always carry "index ready = false". */
    private final boolean alwaysReturnFalseMode;

    /**
     * Job identifiers the default index is built from.
     * NOTE(review): reassigned by doGenerateIndex, which runs on worker threads, without synchronization.
     */
    private Set<Long> defaultIDs;

    /** Directory where incoming requests are persisted until they have been handled. */
    private final File requestDir;

    /**
     * Periodic task that re-registers the server as a listener on the index server channel when it is not
     * listening and the number of requests in progress is below the configured maximum.
     */
    private static class ListeningTask extends TimerTask {
        private final TestIndexRequestServer thisIrs;

        /**
         * @param testIndexRequestServer the server to register as listener when capacity is available
         */
        ListeningTask(TestIndexRequestServer testIndexRequestServer) {
            this.thisIrs = testIndexRequestServer;
        }

        @Override
        public void run() {
            log.trace("Checking if we should be listening again");
            final boolean atCapacity = maxConcurrentJobs <= currentJobs.size();
            if (isListening.get() || atCapacity) {
                return;
            }
            log.info("Enabling listening to the indexserver channel '{}'", Channels.getTheIndexServer());
            conn.setListener(Channels.getTheIndexServer(), this.thisIrs);
            isListening.set(true);
        }
    }

    /**
     * Reads the configuration, loads the default job identifiers and obtains the JMS connection.
     *
     * <p>If the file named by {@link #JOBS_FOR_TESTINDEX} does not exist, an error is logged and printed to
     * {@code System.err}, and the JVM is terminated with exit status 1.
     */
    private TestIndexRequestServer() {
        maxConcurrentJobs = Settings.getLong(HarvesterSettings.INDEXSERVER_INDEXING_MAXCLIENTS);
        this.requestDir = Settings.getFile(HarvesterSettings.INDEXSERVER_INDEXING_REQUESTDIR);
        listeningInterval = Settings.getLong(HarvesterSettings.INDEXSERVER_INDEXING_LISTENING_INTERVAL);

        this.alwaysReturnFalseMode = Settings.getBoolean(ALWAYS_SET_ISINDEX_READY_TO_FALSE);
        log.info("alwaysSetIsIndexReadyToFalse is {}", this.alwaysReturnFalseMode);

        this.jobsForDefaultIndex = Settings.getFile(JOBS_FOR_TESTINDEX);
        if (!this.jobsForDefaultIndex.exists()) {
            final String missingFile = this.jobsForDefaultIndex.getAbsolutePath();
            log.error("The file containing job identifiers for default index '{}' does not exist", missingFile);
            System.err.println("The file '" + missingFile + "' does not exist" + ". Exiting program");
            System.exit(1);
        }
        this.defaultIDs = readLongsFromFile(this.jobsForDefaultIndex);

        currentJobs = new HashMap<String, IndexRequestMessage>();
        this.handlers = new EnumMap<RequestType, FileBasedCache<Set<Long>>>(RequestType.class);
        conn = JMSConnectionFactory.getInstance();
        // Created exactly once: every Timer owns a background thread, so a discarded instance would leak one.
        this.checkIflisteningTimer = new Timer();
    }

    /**
     * Reads one long value per line from the given file. Surrounding whitespace is ignored and blank lines are
     * skipped.
     *
     * @param file the file to read
     * @return the values read; if the file cannot be opened, an error is logged and the values read so far
     *         (possibly none) are returned
     * @throws NumberFormatException if a non-blank line is not a valid long
     */
    private Set<Long> readLongsFromFile(File file) {
        final Set<Long> ids = new HashSet<Long>();
        FileReader reader = null;
        try {
            reader = new FileReader(file);
            final LineIterator lines = new LineIterator(reader);
            while (lines.hasNext()) {
                final String line = lines.nextLine().trim();
                if (line.length() > 0) {
                    ids.add(Long.parseLong(line));
                }
            }
        } catch (IOException e) {
            log.error("Unable to read from file '{}'. Returns set of size {}", file.getAbsolutePath(), ids.size(), e);
        } finally {
            // Release the file handle on every path.
            IOUtils.closeQuietly(reader);
        }
        return ids;
    }

    /**
     * Resumes the requests stored in the request directory. If the directory does not exist it is created and
     * nothing is resumed. Each stored request that is not already among the current jobs is registered and
     * processed on a new thread; subdirectories are ignored.
     *
     * @throws IOFailure if the request directory cannot be created or its contents cannot be listed
     */
    private void restoreRequestsfromRequestDir() {
        if (!this.requestDir.exists()) {
            log.info("requestdir not found: creating request dir");
            if (!this.requestDir.mkdirs()) {
                throw new IOFailure("Unable to create requestdir '" + this.requestDir.getAbsolutePath() + "'");
            }
            return;
        }

        final File[] storedRequests = this.requestDir.listFiles();
        if (storedRequests == null) {
            // listFiles() returns null when the path is not a directory or an I/O error occurs.
            throw new IOFailure("Unable to list the contents of requestdir '" + this.requestDir.getAbsolutePath() + "'");
        }

        for (File storedRequest : storedRequests) {
            if (!storedRequest.isFile()) {
                log.debug("Ignoring directory in requestdir: {}", storedRequest.getAbsolutePath());
                continue;
            }
            final IndexRequestMessage restored = restoreMessage(storedRequest);
            synchronized (this) {
                if (currentJobs.containsKey(restored.getID())) {
                    log.debug("Skipped message w/id='{}'. Already among current jobs", restored.getID());
                    continue;
                }
                currentJobs.put(restored.getID(), restored);
                new Thread() {
                    @Override
                    public void run() {
                        doGenerateIndex(restored);
                    }
                }.start();
                log.info("Restarting indexjob w/ ID={}", restored.getID());
            }
        }
    }

    /**
     * Returns the singleton instance, creating it on first use.
     *
     * @return the singleton instance
     */
    public static synchronized TestIndexRequestServer getInstance() {
        if (instance == null) {
            instance = new TestIndexRequestServer();
        }
        return instance;
    }

    /**
     * Registers the cache handler to use for a request type, replacing any handler previously registered for it.
     *
     * @param requestType the request type; must not be null
     * @param fileBasedCache the handler for that type; must not be null
     */
    @Override
    public void setHandler(RequestType requestType, FileBasedCache<Set<Long>> fileBasedCache) {
        ArgumentNotValid.checkNotNull(requestType, "RequestType t");
        ArgumentNotValid.checkNotNull(fileBasedCache, "FileBasedCache<Set<Long>> handler");
        log.info("Setting handler for RequestType: {}", requestType);
        this.handlers.put(requestType, fileBasedCache);
    }

    /**
     * Accepts an index request: persists it to the request directory, registers it among the current jobs and
     * starts a thread that generates the index. If the number of current jobs has reached the maximum, the
     * server stops listening on the index server channel.
     *
     * <p>If the request cannot be persisted, or a request with the same ID is already in progress, the message
     * is marked not-OK and sent back as a reply.
     *
     * @param indexRequestMessage the request; must not be null
     * @throws ArgumentNotValid if the message is null
     */
    @Override
    public synchronized void visit(final IndexRequestMessage indexRequestMessage) throws ArgumentNotValid {
        ArgumentNotValid.checkNotNull(indexRequestMessage, "IndexRequestMessage irMsg");

        try {
            saveMsg(indexRequestMessage);
        } catch (IOException e) {
            final String failure = "Unable to initiate indexing. Send failed message back to sender: " + e;
            log.warn(failure, e);
            indexRequestMessage.setNotOk(failure);
            JMSConnectionFactory.getInstance().reply(indexRequestMessage);
            return;
        }

        // The method itself is synchronized, so the job table is already guarded here.
        final String messageId = indexRequestMessage.getID();
        if (currentJobs.containsKey(messageId)) {
            final String duplicate = "Should not happen. Skipping msg w/ id= '" + messageId + "' because already among current jobs. Unable to initiate indexing. Sending failed message back to sender";
            log.warn(duplicate);
            indexRequestMessage.setNotOk(duplicate);
            JMSConnectionFactory.getInstance().reply(indexRequestMessage);
            return;
        }

        currentJobs.put(messageId, indexRequestMessage);
        if (currentJobs.size() >= maxConcurrentJobs && isListening.get()) {
            // At capacity: stop receiving requests until ListeningTask re-enables listening.
            conn.removeListener(Channels.getTheIndexServer(), this);
            isListening.set(false);
        }
        new Thread() {
            @Override
            public void run() {
                doGenerateIndex(indexRequestMessage);
            }
        }.start();
        log.debug("Now {} indexing jobs in progress", currentJobs.size());
    }

    /**
     * Serializes the message to a file in the request directory named after the message ID.
     *
     * @param indexRequestMessage the message to store
     * @throws IOException if the file cannot be opened or written
     */
    private void saveMsg(IndexRequestMessage indexRequestMessage) throws IOException {
        final File target = new File(this.requestDir, indexRequestMessage.getID());
        log.debug("Storing message to {}", target.getAbsolutePath());
        ObjectOutputStream out = null;
        try {
            out = new ObjectOutputStream(new FileOutputStream(target));
            out.writeObject(indexRequestMessage);
        } finally {
            IOUtils.closeQuietly(out);
        }
    }

    /**
     * Deserializes a stored request from the given file.
     *
     * @param file the file holding a serialized {@link IndexRequestMessage}
     * @return the restored message
     * @throws IllegalState if the stored object is not an IndexRequestMessage or its class cannot be found
     * @throws IOFailure if the file cannot be read
     */
    private IndexRequestMessage restoreMessage(File file) {
        final Object stored;
        ObjectInputStream in = null;
        try {
            in = new ObjectInputStream(new FileInputStream(file));
            stored = in.readObject();
        } catch (ClassNotFoundException e) {
            throw new IllegalState("Not possible to read the stored message from file '" + file.getAbsolutePath() + "':", e);
        } catch (IOException e) {
            throw new IOFailure("Not possible to read the stored message from file '" + file.getAbsolutePath() + "':", e);
        } finally {
            IOUtils.closeQuietly(in);
        }

        if (stored instanceof IndexRequestMessage) {
            return (IndexRequestMessage) stored;
        }
        // A serialized null is reported as such rather than failing with a NullPointerException.
        final String actualType = (stored == null) ? "null" : stored.getClass().getName();
        throw new IllegalState("The serialized message is not a " + IndexRequestMessage.class.getName() + " but a " + actualType);
    }

    /**
     * Handles one index request. The index is always generated for the default job identifiers, not for the
     * jobs named in the request; the request's found-jobs are nevertheless set to its requested jobs.
     *
     * <p>Any failure is logged and recorded on the message via {@code setNotOk}. In every case the request is
     * then removed from the current jobs, its stored copy is deleted, and either a reply (when the request asks
     * for the index to be returned) or an {@link IndexReadyMessage} is sent.
     *
     * @param indexRequestMessage the request to handle
     */
    public void doGenerateIndex(IndexRequestMessage indexRequestMessage) {
        final boolean mustReturnIndex = indexRequestMessage.mustReturnIndex();
        try {
            checkMessage(indexRequestMessage);
            final RequestType requestType = indexRequestMessage.getRequestType();
            final Set<Long> requestedJobs = indexRequestMessage.getRequestedJobs();
            log.info("Generating an index of type '{}' for the jobs [{}]", requestType, StringUtils.conjoin(",", this.defaultIDs));

            final FileBasedCache<Set<Long>> handler = this.handlers.get(requestType);
            final Set<Long> cachedIDs = handler.cache(this.defaultIDs);
            if (!cachedIDs.containsAll(this.defaultIDs)) {
                // The handler did not cache every default job: continue with the set it reported.
                this.defaultIDs = cachedIDs;
            }
            indexRequestMessage.setFoundJobs(requestedJobs);

            log.info("Returning default index");
            final File cacheFile = handler.getCacheFile(this.defaultIDs);
            if (mustReturnIndex) {
                if (cacheFile.isDirectory()) {
                    final File[] indexFiles = cacheFile.listFiles();
                    // NOTE(review): raw list kept from the original; parameterise it with the type returned by
                    // RemoteFileFactory.getCopyfileInstance once that type is imported in this file.
                    final ArrayList resultFiles = new ArrayList(indexFiles.length);
                    for (File indexFile : indexFiles) {
                        resultFiles.add(RemoteFileFactory.getCopyfileInstance(indexFile));
                    }
                    indexRequestMessage.setResultFiles(resultFiles);
                } else {
                    indexRequestMessage.setResultFile(RemoteFileFactory.getCopyfileInstance(cacheFile));
                }
            }
        } catch (Throwable t) {
            log.warn("Unable to generate index for jobs [{}]", StringUtils.conjoin(",", indexRequestMessage.getRequestedJobs()), t);
            indexRequestMessage.setNotOk(t);
        } finally {
            synchronized (this) {
                currentJobs.remove(indexRequestMessage.getID());
            }
            deleteStoredMessage(indexRequestMessage);
            sendResult(indexRequestMessage, mustReturnIndex);
        }
    }

    /**
     * Reports the outcome of a request: replies with the message itself when the index must be returned,
     * otherwise sends an {@link IndexReadyMessage} whose ready flag is the message's OK state, forced to false
     * when {@code alwaysReturnFalseMode} is set.
     *
     * @param indexRequestMessage the handled request
     * @param mustReturnIndex whether the request asked for the index to be returned
     */
    private void sendResult(IndexRequestMessage indexRequestMessage, boolean mustReturnIndex) {
        final boolean ok = indexRequestMessage.isOk();
        final String state = ok ? "successful" : "failed";
        if (mustReturnIndex) {
            log.info("Sending {} reply for IndexRequestMessage back to sender '{}'.", state, indexRequestMessage.getReplyTo());
            JMSConnectionFactory.getInstance().reply(indexRequestMessage);
            return;
        }

        log.info("Sending {} IndexReadyMessage to Scheduler", state);
        boolean indexReady = ok;
        if (this.alwaysReturnFalseMode) {
            log.info("Setting isindexready = false in return message");
            indexReady = false;
        }
        JMSConnectionFactory.getInstance().send(new IndexReadyMessage(indexRequestMessage.getHarvestId(), indexReady, indexRequestMessage.getReplyTo(), Channels.getTheIndexServer()));
    }

    /**
     * Deletes the stored copy of the given message from the request directory. A missing file is logged as a
     * warning; a failed deletion is logged at debug level. Neither case throws.
     *
     * @param indexRequestMessage the message whose stored copy should be removed
     */
    private void deleteStoredMessage(IndexRequestMessage indexRequestMessage) {
        final File stored = new File(this.requestDir, indexRequestMessage.getID());
        log.debug("Trying to delete stored serialized message: {}", stored.getAbsolutePath());
        if (!stored.exists()) {
            log.warn("The file does not exist any more.");
            return;
        }
        if (!FileUtils.remove(stored)) {
            log.debug("The file '{}' was not deleted", stored);
        }
    }

    /**
     * Validates a request before it is processed.
     *
     * @param indexRequestMessage the request to validate
     * @throws ArgumentNotValid if the message is not OK, or its request type or requested jobs are null
     * @throws UnknownID if no handler is registered for the request type
     */
    private void checkMessage(IndexRequestMessage indexRequestMessage) throws UnknownID, ArgumentNotValid {
        ArgumentNotValid.checkTrue(indexRequestMessage.isOk(), "Message was not OK");
        ArgumentNotValid.checkNotNull(indexRequestMessage.getRequestType(), "RequestType type");
        ArgumentNotValid.checkNotNull(indexRequestMessage.getRequestedJobs(), "Set<Long> jobIDs");
        if (this.handlers.get(indexRequestMessage.getRequestType()) == null) {
            throw new UnknownID("No handler known for requesttype " + indexRequestMessage.getRequestType());
        }
    }

    /** Closes the server; delegates to {@link #cleanup()}. */
    @Override
    public void close() {
        cleanup();
    }

    /**
     * Cancels the listening timer, removes this server as listener on the index server channel, clears the
     * registered handlers and resets the singleton reference.
     */
    @Override
    public void cleanup() {
        this.checkIflisteningTimer.cancel();
        conn.removeListener(Channels.getTheIndexServer(), this);
        this.handlers.clear();
        instance = null;
    }

    /**
     * Resumes the requests stored in the request directory and schedules the periodic task that enables
     * listening on the index server channel. The first check runs immediately.
     */
    @Override
    public void start() {
        restoreRequestsfromRequestDir();
        log.info("{} indexing jobs in progress that was stored in requestdir: {}", currentJobs.size(), this.requestDir.getAbsolutePath());
        final ListeningTask listeningTask = new ListeningTask(this);
        isListening.set(false);
        this.checkIflisteningTimer.schedule(listeningTask, 0L, listeningInterval);
    }
}
