001package dk.netarkivet.harvester.webinterface.servlet;
002
003import java.io.File;
004import java.io.FilenameFilter;
005import java.io.IOException;
006import java.util.ArrayList;
007import java.util.Collections;
008import java.util.HashMap;
009import java.util.HashSet;
010import java.util.Iterator;
011import java.util.LinkedList;
012import java.util.List;
013import java.util.Map;
014import java.util.Set;
015import java.util.TreeMap;
016
017import org.netarchivesuite.heritrix3wrapper.Heritrix3Wrapper;
018import org.slf4j.Logger;
019import org.slf4j.LoggerFactory;
020
021import dk.netarkivet.harvester.datamodel.HarvestChannelDAO;
022import dk.netarkivet.harvester.datamodel.JobDAO;
023import dk.netarkivet.harvester.datamodel.RunningJobsInfoDAO;
024import dk.netarkivet.harvester.harvesting.monitor.HarvestMonitor;
025
026public class Heritrix3JobMonitorThread implements Runnable {
027
028    /** The logger for this class. */
029    private static final Logger LOG = LoggerFactory.getLogger(Heritrix3JobMonitorThread.class);
030
031    protected NASEnvironment environment;
032
033    protected static HarvestMonitor harvestMonitor;
034
035    protected static JobDAO jobDAO;
036
037    protected static RunningJobsInfoDAO runningJobsInfoDAO;
038
039    protected static HarvestChannelDAO harvestChannelDAO;
040
041    static {
042        harvestMonitor = HarvestMonitor.getInstance();
043        jobDAO = JobDAO.getInstance();
044        runningJobsInfoDAO = RunningJobsInfoDAO.getInstance();
045        harvestChannelDAO = HarvestChannelDAO.getInstance();
046    }
047
048    public Thread thread;
049
050    public boolean bExit = false;
051
052    public Map<Long, Heritrix3JobMonitor> runningJobMonitorMap = new TreeMap<Long, Heritrix3JobMonitor>();
053
054    public Map<Long, Heritrix3JobMonitor> filterJobMonitorMap = new TreeMap<Long, Heritrix3JobMonitor>();
055
056    public Set<Heritrix3Wrapper> h3WrapperSet = new HashSet<Heritrix3Wrapper>();
057
058    public Set<String> h3HostPortSet = new HashSet<String>();
059
060    public List<String> h3HostnamePortEnabledList = new ArrayList<String>();
061
062    public List<String> h3HostnamePortDisabledList = new ArrayList<String>();
063
064    public Heritrix3JobMonitorThread(NASEnvironment environment) {
065        this.environment = environment;
066    }
067
068    public void start() {
069        thread = new Thread(this, "Heritrix3 Job Monitor Thread");
070        thread.start();
071    }
072
073    @Override
074    public void run() {
075        Map<Long, Heritrix3JobMonitor> tmpJobMonitorMap;
076        Iterator<Heritrix3JobMonitor> jobmonitorIter;
077        byte[] tmpBuf = new byte[1024 * 1024];
078        try {
079            LOG.info("CrawlLog Thread started.");
080
081            //File tmpFolder = new File("/tmp/");
082            File tmpFolder = environment.tempPath;;
083            File[] oldFiles = tmpFolder.listFiles(new FilenameFilter() {
084                @Override
085                public boolean accept(File dir, String name) {
086                    if (name.startsWith("crawllog-")) {
087                        if (name.endsWith(".log") || name.endsWith(".idx")) {
088                            return true;
089                        }
090                    }
091                    return false;
092                }
093            });
094
095            Map<String, File> oldFilesMap = new HashMap<String, File>();
096            File tmpFile;
097            for (int i=0; i<oldFiles.length; ++i) {
098                tmpFile = oldFiles[i];
099                oldFilesMap.put(tmpFile.getName(), tmpFile);
100            };
101            List<File> oldFilesList = new ArrayList<File>();
102
103            while (!bExit) {
104                @SuppressWarnings("unchecked")
105                Set<Long> runningJobs = harvestMonitor.getRunningJobs();
106                Iterator<Long> jobidIter = runningJobs.iterator();
107                Heritrix3JobMonitor jobmonitor;
108                synchronized (runningJobMonitorMap) {
109                    filterJobMonitorMap.clear();
110                    while (jobidIter.hasNext()) {
111                        long jobId = jobidIter.next();
112                        jobmonitor = runningJobMonitorMap.remove(jobId);
113                        if (jobmonitor == null) {
114                            try {
115                                // New H3 job.
116                                jobmonitor = Heritrix3WrapperManager.getJobMonitor(jobId, environment);
117                            } catch (IOException e) {
118                            }
119                        }
120                        filterJobMonitorMap.put(jobId, jobmonitor);
121                    }
122                    tmpJobMonitorMap = filterJobMonitorMap;
123                    filterJobMonitorMap = runningJobMonitorMap;
124                    runningJobMonitorMap = tmpJobMonitorMap;
125                }
126                jobmonitorIter = filterJobMonitorMap.values().iterator();
127                while (jobmonitorIter.hasNext()) {
128                    jobmonitor = jobmonitorIter.next();
129                    jobmonitor.cleanup(oldFilesList);
130                }
131                jobmonitorIter = runningJobMonitorMap.values().iterator();
132                while (jobmonitorIter.hasNext()) {
133                    jobmonitor = jobmonitorIter.next();
134                    if (oldFilesMap != null) {
135                        oldFilesMap.remove(jobmonitor.logFile.getName());
136                        oldFilesMap.remove(jobmonitor.idxFile.getName());
137                    }
138                    if (!jobmonitor.bInitialized) {
139                        jobmonitor.init();
140                    }
141                    checkH3HostnamePort(jobmonitor);
142                    isH3HostnamePortEnabled(jobmonitor);
143                    if (jobmonitor.bPull) {
144                        jobmonitor.updateCrawlLog(tmpBuf);
145                    }
146                }
147                if (oldFilesMap != null) {
148                    oldFilesList.addAll(oldFilesMap.values());
149                    oldFilesMap = null;
150                }
151                int idx = 0;
152                while (idx < oldFilesList.size()) {
153                    if (oldFilesList.get(idx).delete()) {
154                        idx++;
155                    } else {
156                        oldFilesList.remove(idx);
157                    }
158                }
159                try {
160                    Thread.sleep(60 * 1000);
161                } catch (InterruptedException e) {
162                }
163            }
164            LOG.info("CrawlLog Thread stopped.");
165        } catch (Throwable t) {
166            LOG.error("CrawlLog Thread stopped unexpectedly!.", t);
167        }
168    }
169
170    public Heritrix3JobMonitor getRunningH3Job(long jobId) {
171        Heritrix3JobMonitor h3Job;
172        synchronized (runningJobMonitorMap) {
173            h3Job = runningJobMonitorMap.get(jobId);
174        }
175        return h3Job;
176    }
177
178    public List<Heritrix3JobMonitor> getRunningH3Jobs() {
179        List<Heritrix3JobMonitor> h3JobsList = new LinkedList<Heritrix3JobMonitor>();
180        synchronized (runningJobMonitorMap) {
181            h3JobsList.addAll(runningJobMonitorMap.values());
182        }
183        return h3JobsList;
184    }
185
186    public void checkH3HostnamePort(Heritrix3JobMonitor jobmonitor) {
187        Heritrix3Wrapper h3wrapper = jobmonitor.h3wrapper; 
188        if (jobmonitor.h3HostnamePort == null && h3wrapper != null) {
189            synchronized (h3HostPortSet) {
190                jobmonitor.h3HostnamePort = h3wrapper.hostname + ":" + h3wrapper.port;
191                if (!h3HostPortSet.contains(jobmonitor.h3HostnamePort)) {
192                    h3HostPortSet.add(jobmonitor.h3HostnamePort);
193                    updateH3HostnamePortFilter();
194                }
195            }
196        }
197    }
198
199    public boolean isH3HostnamePortEnabled(Heritrix3JobMonitor jobmonitor) {
200        synchronized (h3HostnamePortEnabledList) {
201            // TODO Not ideal to do contains on a list. But its fairly short.
202            jobmonitor.bPull = h3HostnamePortEnabledList.contains(jobmonitor.h3HostnamePort);
203        }
204        return jobmonitor.bPull;
205    }
206
207    public void updateH3HostnamePortFilter() {
208        String h3HostnamePort;
209        List<String> enabledList = new LinkedList<String>();
210        List<String> disabledList = new LinkedList<String>();
211        synchronized (h3HostPortSet) {
212            Iterator<String> iter = h3HostPortSet.iterator();
213            while (iter.hasNext()) {
214                h3HostnamePort = iter.next();
215                if (environment.isH3HostnamePortEnabled(h3HostnamePort)) {
216                    enabledList.add(h3HostnamePort);
217                } else {
218                    disabledList.add(h3HostnamePort);
219                }
220            }
221        }
222        synchronized (h3HostnamePortEnabledList) {
223            h3HostnamePortEnabledList.clear();
224            h3HostnamePortEnabledList.addAll(enabledList);
225            Collections.sort(h3HostnamePortEnabledList);
226        }
227        synchronized (h3HostnamePortDisabledList) {
228            h3HostnamePortDisabledList.clear();
229            h3HostnamePortDisabledList.addAll(disabledList);
230            Collections.sort(h3HostnamePortDisabledList);
231        }
232    }
233
234}