001/*
002 * #%L
003 * Netarchivesuite - heritrix 3 monitor
004 * %%
005 * Copyright (C) 2005 - 2018 The Royal Danish Library, 
006 *             the National Library of France and the Austrian National Library.
007 * %%
008 * This program is free software: you can redistribute it and/or modify
009 * it under the terms of the GNU Lesser General Public License as
010 * published by the Free Software Foundation, either version 2.1 of the
011 * License, or (at your option) any later version.
012 * 
013 * This program is distributed in the hope that it will be useful,
014 * but WITHOUT ANY WARRANTY; without even the implied warranty of
015 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
016 * GNU General Lesser Public License for more details.
017 * 
018 * You should have received a copy of the GNU General Lesser Public
019 * License along with this program.  If not, see
020 * <http://www.gnu.org/licenses/lgpl-2.1.html>.
021 * #L%
022 */
023
024package dk.netarkivet.heritrix3.monitor;
025
026import java.io.File;
027import java.io.FilenameFilter;
028import java.io.IOException;
029import java.util.ArrayList;
030import java.util.Collections;
031import java.util.HashMap;
032import java.util.HashSet;
033import java.util.Iterator;
034import java.util.LinkedList;
035import java.util.List;
036import java.util.Map;
037import java.util.Set;
038import java.util.TreeMap;
039import java.util.TreeSet;
040
041import org.netarchivesuite.heritrix3wrapper.Heritrix3Wrapper;
042import org.slf4j.Logger;
043import org.slf4j.LoggerFactory;
044
045import dk.netarkivet.harvester.datamodel.HarvestChannelDAO;
046import dk.netarkivet.harvester.datamodel.JobDAO;
047import dk.netarkivet.harvester.datamodel.RunningJobsInfoDAO;
048
049public class Heritrix3JobMonitorThread implements Runnable {
050
051    /** The logger for this class. */
052    private static final Logger LOG = LoggerFactory.getLogger(Heritrix3JobMonitorThread.class);
053
054    /** Environment used for all servlets. */
055    private NASEnvironment environment;
056
057    /** <code>HarvestMonitor</code> instance. */
058    //public static HarvestMonitor harvestMonitor;
059
060    /** <code>JobDAO</code> instance. */
061    public static JobDAO jobDAO;
062
063    /** <code>RunningJobsInfoDAO</code> instance. */
064    public static RunningJobsInfoDAO runningJobsInfoDAO;
065
066    /** <code>HarvestChannelDAO</code> instance. */
067    public static HarvestChannelDAO harvestChannelDAO;
068
069    /** Current thread. */
070    public Thread thread;
071
072    /** If caught, the throwable that stopped the monitor thread. */
073    public Throwable throwable;
074
075    /** Boolean switch to close the thread. */
076    public boolean bExit = false;
077
078    /** A map from harvest job number to the running H3 job monitor for the given job */
079    public Map<Long, Heritrix3JobMonitor> runningJobMonitorMap = new TreeMap<Long, Heritrix3JobMonitor>();
080
081    private final Object runningJobMonitorMapSynchronizer = new Object();
082
083    public Map<Long, Heritrix3JobMonitor> filterJobMonitorMap = new TreeMap<Long, Heritrix3JobMonitor>();
084
085    public Set<String> h3HostPortSet = new HashSet<String>();
086
087    /** List of hosts with monitoring enabled. */
088    public List<String> h3HostnamePortEnabledList = new ArrayList<String>();
089
090    /** List of hosts with monitoring disabled. */
091    public List<String> h3HostnamePortDisabledList = new ArrayList<String>();
092
093    public Heritrix3JobMonitorThread(NASEnvironment environment) {
094        this.environment = environment;
095    }
096
097    public synchronized void init() throws Exception {
098        /*
099        if (harvestMonitor == null) {
100            harvestMonitor = HarvestMonitor.getInstance();
101        }
102        */
103        if (jobDAO == null) {
104            jobDAO = JobDAO.getInstance();
105        }
106        if (runningJobsInfoDAO == null) {
107            runningJobsInfoDAO = RunningJobsInfoDAO.getInstance();
108        }
109        if (harvestChannelDAO == null) {
110            harvestChannelDAO = HarvestChannelDAO.getInstance();
111        }
112    }
113
114    public synchronized void start() {
115        if (thread == null || !thread.isAlive()) {
116            thread = new Thread(this, "Heritrix3 Job Monitor Thread");
117            thread.start();
118        }
119    }
120
121    @Override
122    public void run() {
123        Map<Long, Heritrix3JobMonitor> tmpJobMonitorMap;
124        Iterator<Heritrix3JobMonitor> jobmonitorIter;
125        byte[] tmpBuf = new byte[1024 * 1024];
126        try {
127            LOG.info("Heritrix3 Job Monitor Thread started.");
128
129            //File tmpFolder = new File("/tmp/");
130            File tmpFolder = environment.tempPath;
131            File[] oldFiles = tmpFolder.listFiles(new FilenameFilter() {
132                @Override
133                public boolean accept(File dir, String name) {
134                    if (name.startsWith("crawllog-")) {
135                        if (name.endsWith(".log") || name.endsWith(".idx")) {
136                            return true;
137                        }
138                    }
139                    return false;
140                }
141            });
142
143            Map<String, File> oldFilesMap = new HashMap<String, File>();
144            File tmpFile;
145            for (int i=0; i<oldFiles.length; ++i) {
146                tmpFile = oldFiles[i];
147                oldFilesMap.put(tmpFile.getName(), tmpFile);
148            }
149            List<File> oldFilesList = new ArrayList<File>();
150
151            while (!bExit) {
152                Set<Long> runningJobs = getRunningJobs();
153                if (runningJobs != null) {
154                        /*
155                         * Identify running and stopped jobs.
156                         */
157                    Heritrix3JobMonitor jobmonitor;
158
159                    Iterator<Long> jobidIter = runningJobs.iterator();
160                    synchronized (runningJobMonitorMapSynchronizer) {
161                        filterJobMonitorMap.clear();
162
163                        // For all running jobs..
164                        while (jobidIter.hasNext()) {
165                            Long jobId = jobidIter.next();
166
167                            if (jobId != null) {
168                                jobmonitor = runningJobMonitorMap.remove(jobId);
169                                if (jobmonitor == null) {
170                                    // Either jobId was not in runningJobMonitorMap, or the jobmonitor for
171                                    // key jobId was itself null. Either way, the jobmonitor for jobId
172                                    // could not be found.
173                                    try {
174                                        // New H3 job.
175                                        jobmonitor = Heritrix3WrapperManager.getJobMonitor(jobId,
176                                                environment);
177                                    } catch (IOException e) {
178                                        LOG.debug("IOException assigning to job monitor");
179                                        // Ignored because exceptions can occur communicating with H3 in this call.
180                                    }
181                                }
182                                filterJobMonitorMap.put(jobId, jobmonitor);
183                            }
184                        }
185
186                        // Swap filterJobMonitorMap and runningJobMonitorMap
187                        tmpJobMonitorMap = filterJobMonitorMap;
188                        filterJobMonitorMap = runningJobMonitorMap;
189                        runningJobMonitorMap = tmpJobMonitorMap;
190                    }
191                    /*
192                     * Add the cached files for stopped jobs to the list of old files.
193                     */
194                    jobmonitorIter = filterJobMonitorMap.values().iterator();
195                    while (jobmonitorIter.hasNext()) {
196                        jobmonitor = jobmonitorIter.next();
197                        jobmonitor.cleanup(oldFilesList);
198                    }
199                    /*
200                     * Remove cached files for running jobs in the list of old files.
201                     * On thread start all cached files are added to old files even though they might still be running.
202                     */
203                    jobmonitorIter = runningJobMonitorMap.values().iterator();
204                    while (jobmonitorIter.hasNext()) {
205                        jobmonitor = jobmonitorIter.next();
206                        if (oldFilesMap != null) {
207                            oldFilesMap.remove(jobmonitor.logFile.getName());
208                            oldFilesMap.remove(jobmonitor.idxFile.getName());
209                        }
210                        if (!jobmonitor.bInitialized) {
211                            jobmonitor.init();
212                        }
213                        checkH3HostnamePort(jobmonitor);
214                        isH3HostnamePortEnabled(jobmonitor);
215                        if (jobmonitor.bPull) {
216                            jobmonitor.updateCrawlLog(tmpBuf);
217                        }
218                    }
219                    if (oldFilesMap != null) {
220                        oldFilesList.addAll(oldFilesMap.values());
221                        oldFilesMap = null;
222                    }
223                    int idx = 0;
224                    while (idx < oldFilesList.size()) {
225                        if (oldFilesList.get(idx).delete()) {
226                            idx++;
227                        } else {
228                            oldFilesList.remove(idx);
229                        }
230                    }
231                }
232                try {
233                    Thread.sleep(60 * 1000);
234                } catch (InterruptedException e) {
235                }
236            }
237            LOG.info("Heritrix3 Job Monitor Thread stopped.");
238        } catch (Throwable t) {
239            // Save throwable so we can show it in the restart GUI.
240            throwable = t;
241            LOG.error("Heritrix3 Job Monitor Thread stopped unexpectedly!.", t);
242        }
243    }
244
245    /**
246     * Encapsulate call to get the set of running jobs and make a copy of it inside a throwable
247     * since concurrency is an issues.
248     * @return a copy of the running jobs set
249     */
250    public Set<Long> getRunningJobs() {
251        try {
252            //@SuppressWarnings("unchecked")
253            //Set<Long> orgJobs = harvestMonitor.getRunningJobs();
254            Set<Long> orgJobs = RunningJobsInfoDAO.getInstance().getHistoryRecordIds();
255            Set<Long> jobs = new TreeSet<Long>(orgJobs);
256            return jobs;
257        } catch (Throwable t) {
258            LOG.debug("Heritrix3 Job Monitor Thread cloning of running jobs failed with an exception!", t);
259            return null;
260        }
261    }
262
263    public Heritrix3JobMonitor getRunningH3Job(long jobId) {
264        Heritrix3JobMonitor h3Job;
265        synchronized (runningJobMonitorMapSynchronizer) {
266            h3Job = runningJobMonitorMap.get(jobId);
267        }
268        return h3Job;
269    }
270
271    public List<Heritrix3JobMonitor> getRunningH3Jobs() {
272        List<Heritrix3JobMonitor> h3JobsList = new LinkedList<Heritrix3JobMonitor>();
273        synchronized (runningJobMonitorMapSynchronizer) {
274            h3JobsList.addAll(runningJobMonitorMap.values());
275        }
276        return h3JobsList;
277    }
278
279    public void checkH3HostnamePort(Heritrix3JobMonitor jobmonitor) {
280        Heritrix3Wrapper h3wrapper = jobmonitor.h3wrapper; 
281        if (jobmonitor.h3HostnamePort == null && h3wrapper != null) {
282            synchronized (h3HostPortSet) {
283                jobmonitor.h3HostnamePort = h3wrapper.hostname + ":" + h3wrapper.port;
284                if (!h3HostPortSet.contains(jobmonitor.h3HostnamePort)) {
285                    h3HostPortSet.add(jobmonitor.h3HostnamePort);
286                    updateH3HostnamePortFilter();
287                }
288            }
289        }
290    }
291
292    public boolean isH3HostnamePortEnabled(Heritrix3JobMonitor jobmonitor) {
293        synchronized (h3HostnamePortEnabledList) {
294            // TODO Not ideal to do contains on a list. But its fairly short (i.e. max number of running
295            // H3 instances, presumably 80 or so, said Nicholas)
296            jobmonitor.bPull = h3HostnamePortEnabledList.contains(jobmonitor.h3HostnamePort);
297        }
298        return jobmonitor.bPull;
299    }
300
301    public void updateH3HostnamePortFilter() {
302        String h3HostnamePort;
303        List<String> enabledList = new LinkedList<String>();
304        List<String> disabledList = new LinkedList<String>();
305        synchronized (h3HostPortSet) {
306            Iterator<String> iter = h3HostPortSet.iterator();
307            while (iter.hasNext()) {
308                h3HostnamePort = iter.next();
309                if (environment.isH3HostnamePortEnabled(h3HostnamePort)) {
310                    enabledList.add(h3HostnamePort);
311                } else {
312                    disabledList.add(h3HostnamePort);
313                }
314            }
315        }
316        synchronized (h3HostnamePortEnabledList) {
317            h3HostnamePortEnabledList.clear();
318            h3HostnamePortEnabledList.addAll(enabledList);
319            Collections.sort(h3HostnamePortEnabledList);
320        }
321        synchronized (h3HostnamePortDisabledList) {
322            h3HostnamePortDisabledList.clear();
323            h3HostnamePortDisabledList.addAll(disabledList);
324            Collections.sort(h3HostnamePortDisabledList);
325        }
326    }
327
328}