001/* 002 * #%L 003 * Netarchivesuite - heritrix 3 monitor 004 * %% 005 * Copyright (C) 2005 - 2018 The Royal Danish Library, 006 * the National Library of France and the Austrian National Library. 007 * %% 008 * This program is free software: you can redistribute it and/or modify 009 * it under the terms of the GNU Lesser General Public License as 010 * published by the Free Software Foundation, either version 2.1 of the 011 * License, or (at your option) any later version. 012 * 013 * This program is distributed in the hope that it will be useful, 014 * but WITHOUT ANY WARRANTY; without even the implied warranty of 015 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 016 * GNU General Lesser Public License for more details. 017 * 018 * You should have received a copy of the GNU General Lesser Public 019 * License along with this program. If not, see 020 * <http://www.gnu.org/licenses/lgpl-2.1.html>. 021 * #L% 022 */ 023 024package dk.netarkivet.heritrix3.monitor; 025 026import java.io.File; 027import java.io.FilenameFilter; 028import java.io.IOException; 029import java.util.ArrayList; 030import java.util.Collections; 031import java.util.HashMap; 032import java.util.HashSet; 033import java.util.Iterator; 034import java.util.LinkedList; 035import java.util.List; 036import java.util.Map; 037import java.util.Set; 038import java.util.TreeMap; 039import java.util.TreeSet; 040 041import org.netarchivesuite.heritrix3wrapper.Heritrix3Wrapper; 042import org.slf4j.Logger; 043import org.slf4j.LoggerFactory; 044 045import dk.netarkivet.harvester.datamodel.HarvestChannelDAO; 046import dk.netarkivet.harvester.datamodel.JobDAO; 047import dk.netarkivet.harvester.datamodel.RunningJobsInfoDAO; 048 049public class Heritrix3JobMonitorThread implements Runnable { 050 051 /** The logger for this class. */ 052 private static final Logger LOG = LoggerFactory.getLogger(Heritrix3JobMonitorThread.class); 053 054 /** Environment used for all servlets. */ 055 private NASEnvironment environment; 056 057 /** <code>HarvestMonitor</code> instance. */ 058 //public static HarvestMonitor harvestMonitor; 059 060 /** <code>JobDAO</code> instance. */ 061 public static JobDAO jobDAO; 062 063 /** <code>RunningJobsInfoDAO</code> instance. */ 064 public static RunningJobsInfoDAO runningJobsInfoDAO; 065 066 /** <code>HarvestChannelDAO</code> instance. */ 067 public static HarvestChannelDAO harvestChannelDAO; 068 069 /** Current thread. */ 070 public Thread thread; 071 072 /** If caught, the throwable that stopped the monitor thread. */ 073 public Throwable throwable; 074 075 /** Boolean switch to close the thread. */ 076 public boolean bExit = false; 077 078 /** A map from harvest job number to the running H3 job monitor for the given job */ 079 public Map<Long, Heritrix3JobMonitor> runningJobMonitorMap = new TreeMap<Long, Heritrix3JobMonitor>(); 080 081 private final Object runningJobMonitorMapSynchronizer = new Object(); 082 083 public Map<Long, Heritrix3JobMonitor> filterJobMonitorMap = new TreeMap<Long, Heritrix3JobMonitor>(); 084 085 public Set<String> h3HostPortSet = new HashSet<String>(); 086 087 /** List of hosts with monitoring enabled. */ 088 public List<String> h3HostnamePortEnabledList = new ArrayList<String>(); 089 090 /** List of hosts with monitoring disabled. */ 091 public List<String> h3HostnamePortDisabledList = new ArrayList<String>(); 092 093 public Heritrix3JobMonitorThread(NASEnvironment environment) { 094 this.environment = environment; 095 } 096 097 public synchronized void init() throws Exception { 098 /* 099 if (harvestMonitor == null) { 100 harvestMonitor = HarvestMonitor.getInstance(); 101 } 102 */ 103 if (jobDAO == null) { 104 jobDAO = JobDAO.getInstance(); 105 } 106 if (runningJobsInfoDAO == null) { 107 runningJobsInfoDAO = RunningJobsInfoDAO.getInstance(); 108 } 109 if (harvestChannelDAO == null) { 110 harvestChannelDAO = HarvestChannelDAO.getInstance(); 111 } 112 } 113 114 public synchronized void start() { 115 if (thread == null || !thread.isAlive()) { 116 thread = new Thread(this, "Heritrix3 Job Monitor Thread"); 117 thread.start(); 118 } 119 } 120 121 @Override 122 public void run() { 123 Map<Long, Heritrix3JobMonitor> tmpJobMonitorMap; 124 Iterator<Heritrix3JobMonitor> jobmonitorIter; 125 byte[] tmpBuf = new byte[1024 * 1024]; 126 try { 127 LOG.info("Heritrix3 Job Monitor Thread started."); 128 129 //File tmpFolder = new File("/tmp/"); 130 File tmpFolder = environment.tempPath; 131 File[] oldFiles = tmpFolder.listFiles(new FilenameFilter() { 132 @Override 133 public boolean accept(File dir, String name) { 134 if (name.startsWith("crawllog-")) { 135 if (name.endsWith(".log") || name.endsWith(".idx")) { 136 return true; 137 } 138 } 139 return false; 140 } 141 }); 142 143 Map<String, File> oldFilesMap = new HashMap<String, File>(); 144 File tmpFile; 145 for (int i=0; i<oldFiles.length; ++i) { 146 tmpFile = oldFiles[i]; 147 oldFilesMap.put(tmpFile.getName(), tmpFile); 148 } 149 List<File> oldFilesList = new ArrayList<File>(); 150 151 while (!bExit) { 152 Set<Long> runningJobs = getRunningJobs(); 153 if (runningJobs != null) { 154 /* 155 * Identify running and stopped jobs. 156 */ 157 Heritrix3JobMonitor jobmonitor; 158 159 Iterator<Long> jobidIter = runningJobs.iterator(); 160 synchronized (runningJobMonitorMapSynchronizer) { 161 filterJobMonitorMap.clear(); 162 163 // For all running jobs.. 164 while (jobidIter.hasNext()) { 165 Long jobId = jobidIter.next(); 166 167 if (jobId != null) { 168 jobmonitor = runningJobMonitorMap.remove(jobId); 169 if (jobmonitor == null) { 170 // Either jobId was not in runningJobMonitorMap, or the jobmonitor for 171 // key jobId was itself null. Either way, the jobmonitor for jobId 172 // could not be found. 173 try { 174 // New H3 job. 175 jobmonitor = Heritrix3WrapperManager.getJobMonitor(jobId, 176 environment); 177 } catch (IOException e) { 178 LOG.debug("IOException assigning to job monitor"); 179 // Ignored because exceptions can occur communicating with H3 in this call. 180 } 181 } 182 filterJobMonitorMap.put(jobId, jobmonitor); 183 } 184 } 185 186 // Swap filterJobMonitorMap and runningJobMonitorMap 187 tmpJobMonitorMap = filterJobMonitorMap; 188 filterJobMonitorMap = runningJobMonitorMap; 189 runningJobMonitorMap = tmpJobMonitorMap; 190 } 191 /* 192 * Add the cached files for stopped jobs to the list of old files. 193 */ 194 jobmonitorIter = filterJobMonitorMap.values().iterator(); 195 while (jobmonitorIter.hasNext()) { 196 jobmonitor = jobmonitorIter.next(); 197 jobmonitor.cleanup(oldFilesList); 198 } 199 /* 200 * Remove cached files for running jobs in the list of old files. 201 * On thread start all cached files are added to old files even though they might still be running. 202 */ 203 jobmonitorIter = runningJobMonitorMap.values().iterator(); 204 while (jobmonitorIter.hasNext()) { 205 jobmonitor = jobmonitorIter.next(); 206 if (oldFilesMap != null) { 207 oldFilesMap.remove(jobmonitor.logFile.getName()); 208 oldFilesMap.remove(jobmonitor.idxFile.getName()); 209 } 210 if (!jobmonitor.bInitialized) { 211 jobmonitor.init(); 212 } 213 checkH3HostnamePort(jobmonitor); 214 isH3HostnamePortEnabled(jobmonitor); 215 if (jobmonitor.bPull) { 216 jobmonitor.updateCrawlLog(tmpBuf); 217 } 218 } 219 if (oldFilesMap != null) { 220 oldFilesList.addAll(oldFilesMap.values()); 221 oldFilesMap = null; 222 } 223 int idx = 0; 224 while (idx < oldFilesList.size()) { 225 if (oldFilesList.get(idx).delete()) { 226 idx++; 227 } else { 228 oldFilesList.remove(idx); 229 } 230 } 231 } 232 try { 233 Thread.sleep(60 * 1000); 234 } catch (InterruptedException e) { 235 } 236 } 237 LOG.info("Heritrix3 Job Monitor Thread stopped."); 238 } catch (Throwable t) { 239 // Save throwable so we can show it in the restart GUI. 240 throwable = t; 241 LOG.error("Heritrix3 Job Monitor Thread stopped unexpectedly!.", t); 242 } 243 } 244 245 /** 246 * Encapsulate call to get the set of running jobs and make a copy of it inside a throwable 247 * since concurrency is an issues. 248 * @return a copy of the running jobs set 249 */ 250 public Set<Long> getRunningJobs() { 251 try { 252 //@SuppressWarnings("unchecked") 253 //Set<Long> orgJobs = harvestMonitor.getRunningJobs(); 254 Set<Long> orgJobs = RunningJobsInfoDAO.getInstance().getHistoryRecordIds(); 255 Set<Long> jobs = new TreeSet<Long>(orgJobs); 256 return jobs; 257 } catch (Throwable t) { 258 LOG.debug("Heritrix3 Job Monitor Thread cloning of running jobs failed with an exception!", t); 259 return null; 260 } 261 } 262 263 public Heritrix3JobMonitor getRunningH3Job(long jobId) { 264 Heritrix3JobMonitor h3Job; 265 synchronized (runningJobMonitorMapSynchronizer) { 266 h3Job = runningJobMonitorMap.get(jobId); 267 } 268 return h3Job; 269 } 270 271 public List<Heritrix3JobMonitor> getRunningH3Jobs() { 272 List<Heritrix3JobMonitor> h3JobsList = new LinkedList<Heritrix3JobMonitor>(); 273 synchronized (runningJobMonitorMapSynchronizer) { 274 h3JobsList.addAll(runningJobMonitorMap.values()); 275 } 276 return h3JobsList; 277 } 278 279 public void checkH3HostnamePort(Heritrix3JobMonitor jobmonitor) { 280 Heritrix3Wrapper h3wrapper = jobmonitor.h3wrapper; 281 if (jobmonitor.h3HostnamePort == null && h3wrapper != null) { 282 synchronized (h3HostPortSet) { 283 jobmonitor.h3HostnamePort = h3wrapper.hostname + ":" + h3wrapper.port; 284 if (!h3HostPortSet.contains(jobmonitor.h3HostnamePort)) { 285 h3HostPortSet.add(jobmonitor.h3HostnamePort); 286 updateH3HostnamePortFilter(); 287 } 288 } 289 } 290 } 291 292 public boolean isH3HostnamePortEnabled(Heritrix3JobMonitor jobmonitor) { 293 synchronized (h3HostnamePortEnabledList) { 294 // TODO Not ideal to do contains on a list. But its fairly short (i.e. max number of running 295 // H3 instances, presumably 80 or so, said Nicholas) 296 jobmonitor.bPull = h3HostnamePortEnabledList.contains(jobmonitor.h3HostnamePort); 297 } 298 return jobmonitor.bPull; 299 } 300 301 public void updateH3HostnamePortFilter() { 302 String h3HostnamePort; 303 List<String> enabledList = new LinkedList<String>(); 304 List<String> disabledList = new LinkedList<String>(); 305 synchronized (h3HostPortSet) { 306 Iterator<String> iter = h3HostPortSet.iterator(); 307 while (iter.hasNext()) { 308 h3HostnamePort = iter.next(); 309 if (environment.isH3HostnamePortEnabled(h3HostnamePort)) { 310 enabledList.add(h3HostnamePort); 311 } else { 312 disabledList.add(h3HostnamePort); 313 } 314 } 315 } 316 synchronized (h3HostnamePortEnabledList) { 317 h3HostnamePortEnabledList.clear(); 318 h3HostnamePortEnabledList.addAll(enabledList); 319 Collections.sort(h3HostnamePortEnabledList); 320 } 321 synchronized (h3HostnamePortDisabledList) { 322 h3HostnamePortDisabledList.clear(); 323 h3HostnamePortDisabledList.addAll(disabledList); 324 Collections.sort(h3HostnamePortDisabledList); 325 } 326 } 327 328}