/*
 * #%L
 * Netarchivesuite - common
 * %%
 * Copyright (C) 2005 - 2014 The Royal Danish Library, the Danish State and University Library,
 *             the National Library of France and the Austrian National Library.
 * %%
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License as
 * published by the Free Software Foundation, either version 2.1 of the
 * License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Lesser Public License for more details.
 *
 * You should have received a copy of the GNU General Lesser Public
 * License along with this program.  If not, see
 * <http://www.gnu.org/licenses/lgpl-2.1.html>.
 * #L%
 */

package dk.netarkivet.common.utils;

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.atomic.AtomicBoolean;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import dk.netarkivet.common.Constants;
import dk.netarkivet.common.exceptions.ArgumentNotValid;
import dk.netarkivet.common.exceptions.IOFailure;

/**
 * Various utilities for running processes -- not exactly Java's forte.
 */
public class ProcessUtils {

    /** The logger. */
    private static final Logger log = LoggerFactory.getLogger(ProcessUtils.class);

    /**
     * Runs an external process that takes no input, discarding its output.
     *
     * @param environment An environment to run the process in (may be null)
     * @param programAndArgs The program and its arguments.
     * @return The return code of the process.
     * @throws IOFailure If the process could not be started.
     */
    public static int runProcess(String[] environment, String... programAndArgs) {
        try {
            if (log.isDebugEnabled()) {
                log.debug("Running external program: {} with environment {}", StringUtils.conjoin(" ", programAndArgs),
                        StringUtils.conjoin(" ", environment));
            }

            Process p = Runtime.getRuntime().exec(programAndArgs, environment);
            // Both streams must be drained, or the child may block once the pipe buffer fills up.
            discardProcessOutput(p.getInputStream());
            discardProcessOutput(p.getErrorStream());
            while (true) {
                try {
                    return p.waitFor();
                } catch (InterruptedException e) {
                    // Ignoring interruptions, we just want to try waiting
                    // again.
                }
            }
        } catch (IOException e) {
            throw new IOFailure("Failure while running " + Arrays.toString(programAndArgs), e);
        }
    }

    /**
     * Runs an external process that takes no input, discarding its output. This is a convenience wrapper for
     * runProcess(environment, programAndArgs)
     *
     * @param programAndArgs The program to run and its arguments
     * @return The return code of the process.
     */
    public static int runProcess(String... programAndArgs) {
        return runProcess(null, programAndArgs);
    }

    /**
     * Read and throw away the output from a process. This method just starts a consumer thread that eats the output
     * of a process and so prevents the process from blocking on a full output pipe.
     *
     * @param inputStream A stream to read up to end of file. This stream is closed at some point in the future, but not
     * necessarily before this method returns.
     */
    public static void discardProcessOutput(final InputStream inputStream) {
        makeCollectorThread(inputStream, new DiscardingOutputStream(), -1).start();
    }

    /**
     * Collect all output from an inputstream, up to maxCollect bytes, in an output object. This will eventually close
     * the given InputStream, but not necessarily before the method returns. The thread created is placed in a thread
     * set, and should be removed once all output has been collected. While only a limited amount may be written to the
     * output object, the entire output will be read from the inputStream unless the thread or the inputStream is
     * destroyed first.
     *
     * @param inputStream The inputstream to read contents from
     * @param maxCollect The maximum number of bytes to collect, or -1 for no limit
     * @param collectionThreads Set of threads that concurrently collect output
     * @return An object that collects the output. Once the thread returned is finished, the object will no longer be
     * written to. The collected output can be retrieved with the toString method.
     */
    public static Object collectProcessOutput(final InputStream inputStream, final int maxCollect,
            Set<Thread> collectionThreads) {
        final OutputStream stream = new ByteArrayOutputStream();
        Thread t = makeCollectorThread(inputStream, stream, maxCollect);
        t.start();
        collectionThreads.add(t);
        return stream;
    }

    /**
     * Collect all output from an inputstream, appending it to a file. This will eventually close the given InputStream,
     * but not necessarily before the method returns. The thread created is placed in a thread set, and should be
     * removed once all output has been collected.
     *
     * @param inputStream The inputstream to read contents from
     * @param outputFile The file that output should be appended to.
     * @param collectionThreads Set of threads that concurrently collect output
     * @throws IOFailure If the output file cannot be opened for appending.
     */
    public static void writeProcessOutput(final InputStream inputStream, final File outputFile,
            Set<Thread> collectionThreads) {
        final OutputStream stream;
        try {
            // Second argument 'true' opens the file in append mode.
            stream = new FileOutputStream(outputFile, true);
        } catch (FileNotFoundException e) {
            throw new IOFailure("Cannot create file '" + outputFile + "' to write process output to.", e);
        }
        Thread t = makeCollectorThread(inputStream, stream, -1);
        t.start();
        collectionThreads.add(t);
    }

    /**
     * Collect all output from an inputstream, writing it to an output stream, using a separate thread. This will
     * eventually close the given InputStream and OutputStream, but not necessarily before the method returns. While
     * only a limited amount may be written to the output object, the entire output will be read from the inputStream
     * unless the thread or the inputStream is destroyed first.
     *
     * @param inputStream The inputstream to read contents from
     * @param outputStream A stream to write the output to.
     * @param maxCollect The maximum number of bytes to collect, or -1 for no limit
     * @return The thread that will collect the output. The thread is not started by this method.
     */
    private static Thread makeCollectorThread(final InputStream inputStream, final OutputStream outputStream,
            final int maxCollect) {
        return new Thread() {
            @Override
            public void run() {
                try {
                    InputStream reader = null;
                    OutputStream writer = null;
                    try {
                        reader = new BufferedInputStream(inputStream);
                        writer = new BufferedOutputStream(outputStream);
                        copyContents(reader, writer, maxCollect);
                    } finally {
                        // Nested try/finally: the writer must be flushed and closed even if
                        // closing the reader fails, otherwise the output stream leaks.
                        try {
                            if (reader != null) {
                                reader.close();
                            }
                        } finally {
                            if (writer != null) {
                                writer.close();
                            }
                        }
                    }
                } catch (IOException e) {
                    // This seems ugly
                    throw new RuntimeException("Couldn't close streams for process.", e);
                }
            }
        };
    }

    /**
     * Reads all contents from a stream, writing some or all to another.
     *
     * @param in InputStream to read from
     * @param out OutputStream to write to
     * @param maxCollect Maximum number of bytes to write to out, or -1 for no limit
     * @throws IOFailure If there are problems reading or writing.
     */
    private static void copyContents(InputStream in, OutputStream out, int maxCollect) {
        int bytesRead;
        byte[] buffer = new byte[Constants.IO_BUFFER_SIZE];
        // Counted as a long: an int would wrap around after 2 GiB of process output, which would
        // make the "still below the limit" test true again and write past maxCollect.
        long totalBytesRead = 0;
        try {
            while ((bytesRead = in.read(buffer, 0, Constants.IO_BUFFER_SIZE)) != -1) {
                if (maxCollect == -1) {
                    out.write(buffer, 0, bytesRead);
                } else if (totalBytesRead < maxCollect) {
                    // The result is bounded by bytesRead, so narrowing back to int is safe.
                    int bytesToWrite = (int) Math.min((long) bytesRead, maxCollect - totalBytesRead);
                    out.write(buffer, 0, bytesToWrite);
                    // Close early if applicable: this chunk crossed the limit, nothing more will be written.
                    if (totalBytesRead + bytesRead > maxCollect) {
                        out.close();
                    }
                }
                // Keep reading past the limit so the producing process never blocks on a full pipe.
                totalBytesRead += bytesRead;
            }
        } catch (IOException e) {
            throw new IOFailure("Error reading or writing process output", e);
        }
    }

    /**
     * Wait for the end of a process, but only for a limited time. This method takes care of the ways waitFor can get
     * interrupted.
     *
     * @param p Process to wait for
     * @param maxWait The maximum number of milliseconds to wait for the process to exit.
     * @return Exit value for process, or null if the process didn't exit within the expected time.
     */
    public static Integer waitFor(final Process p, long maxWait) {
        ArgumentNotValid.checkNotNull(p, "Process p");
        ArgumentNotValid.checkPositive(maxWait, "long maxWait");
        long startTime = System.currentTimeMillis();
        // Daemon timer, so a pending wakeup never keeps the JVM alive.
        Timer timer = new Timer(true);
        final Thread waitThread = Thread.currentThread();
        boolean wakeupScheduled = false;
        final AtomicBoolean doneWaiting = new AtomicBoolean(false);
        while (System.currentTimeMillis() < startTime + maxWait) {
            try {
                if (!wakeupScheduled) {
                    // First time in here, we need to start the wakeup thread,
                    // but be sure it doesn't notify us too early or too late.
                    synchronized (waitThread) {
                        timer.schedule(new TimerTask() {
                            @Override
                            public void run() {
                                synchronized (waitThread) {
                                    // Only interrupt while the waiting thread is still inside this method.
                                    if (!doneWaiting.get()) {
                                        waitThread.interrupt();
                                    }
                                }
                            }
                        }, maxWait);
                        wakeupScheduled = true;
                    }
                }

                p.waitFor();
                break;
            } catch (InterruptedException e) {
                // May happen for a number of reasons. We just check if we've
                // timed out yet when we go through the loop again.
            }
        }
        synchronized (waitThread) {
            timer.cancel();
            doneWaiting.set(true);
            Thread.interrupted(); // In case the timer task interrupted.
        }
        try {
            return p.exitValue();
        } catch (IllegalThreadStateException e) {
            // exitValue() throws this when the process has not terminated yet.
            log.warn("Process '{}' did not exit within {} milliseconds", p, (System.currentTimeMillis() - startTime));
            return null;
        }
    }

    /**
     * Runs a system process (Unix sort) to sort a file.
     *
     * @param inputFile the input file.
     * @param outputFile the output file.
     * @return the process exit code.
     */
    public static int runUnixSort(File inputFile, File outputFile) {
        return runUnixSort(inputFile, outputFile, null, false);
    }

    /**
     * Runs a system process (Unix sort) to sort a file.
     *
     * @param inputFile the input file.
     * @param outputFile the output file.
     * @param tempDir the directory where to store temporary files (null for default system temp).
     * @param crawllogSorting Should we sort crawllog style ("-k 4b") or not
     * @return the process exit code.
     */
    public static int runUnixSort(File inputFile, File outputFile, File tempDir, boolean crawllogSorting) {
        // The sort process is run with LANG=C as its only environment entry.
        String[] environment = new String[] {"LANG=C"};
        LinkedList<String> cmdAndParams = new LinkedList<String>();
        cmdAndParams.add("sort");
        cmdAndParams.add(inputFile.getAbsolutePath());
        if (crawllogSorting) {
            // -k 4b means fourth field (from 1) ignoring leading blanks
            cmdAndParams.add("-k");
            cmdAndParams.add("4b");
        }
        // -o means output to (file)
        cmdAndParams.add("-o");
        cmdAndParams.add(outputFile.getAbsolutePath());

        if (tempDir != null) {
            // -T configures where to store temporary files
            cmdAndParams.add("-T");
            cmdAndParams.add(tempDir.getAbsolutePath());
        }

        return ProcessUtils.runProcess(environment, cmdAndParams.toArray(new String[cmdAndParams.size()]));
    }

}