001/*
002 * #%L
003 * Netarchivesuite - common
004 * %%
005 * Copyright (C) 2005 - 2014 The Royal Danish Library, the Danish State and University Library,
006 *             the National Library of France and the Austrian National Library.
007 * %%
008 * This program is free software: you can redistribute it and/or modify
009 * it under the terms of the GNU Lesser General Public License as
010 * published by the Free Software Foundation, either version 2.1 of the
011 * License, or (at your option) any later version.
012 * 
013 * This program is distributed in the hope that it will be useful,
014 * but WITHOUT ANY WARRANTY; without even the implied warranty of
015 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
016 * GNU General Lesser Public License for more details.
017 * 
018 * You should have received a copy of the GNU General Lesser Public
019 * License along with this program.  If not, see
020 * <http://www.gnu.org/licenses/lgpl-2.1.html>.
021 * #L%
022 */
023
024package dk.netarkivet.common.utils;
025
026import java.io.BufferedInputStream;
027import java.io.BufferedOutputStream;
028import java.io.ByteArrayOutputStream;
029import java.io.File;
030import java.io.FileNotFoundException;
031import java.io.FileOutputStream;
032import java.io.IOException;
033import java.io.InputStream;
034import java.io.OutputStream;
035import java.util.Arrays;
036import java.util.LinkedList;
037import java.util.Set;
038import java.util.Timer;
039import java.util.TimerTask;
040import java.util.concurrent.atomic.AtomicBoolean;
041
042import org.slf4j.Logger;
043import org.slf4j.LoggerFactory;
044
045import dk.netarkivet.common.Constants;
046import dk.netarkivet.common.exceptions.ArgumentNotValid;
047import dk.netarkivet.common.exceptions.IOFailure;
048
049/**
050 * Various utilities for running processes -- not exactly Java's forte.
051 */
052public class ProcessUtils {
053
054    /** The logger. */
055    private static final Logger log = LoggerFactory.getLogger(ProcessUtils.class);
056
057    /**
058     * Runs an external process that takes no input, discarding its output.
059     *
060     * @param environment An environment to run the process in (may be null)
061     * @param programAndArgs The program and its arguments.
062     * @return The return code of the process.
063     */
064    public static int runProcess(String[] environment, String... programAndArgs) {
065        try {
066            if (log.isDebugEnabled()) {
067                log.debug("Running external program: {} with environment {}", StringUtils.conjoin(" ", programAndArgs),
068                        StringUtils.conjoin(" ", environment));
069            }
070
071            Process p = Runtime.getRuntime().exec(programAndArgs, environment);
072            discardProcessOutput(p.getInputStream());
073            discardProcessOutput(p.getErrorStream());
074            while (true) {
075                try {
076                    return p.waitFor();
077                } catch (InterruptedException e) {
078                    // Ignoring interruptions, we just want to try waiting
079                    // again.
080                }
081            }
082        } catch (IOException e) {
083            throw new IOFailure("Failure while running " + Arrays.toString(programAndArgs), e);
084        }
085    }
086
087    /**
088     * Runs an external process that takes no input, discarding its output. This is a convenience wrapper for
089     * runProcess(environment, programAndArgs)
090     *
091     * @param programAndArgs The program to run and its arguments
092     * @return The return code of the process.
093     */
094    public static int runProcess(String... programAndArgs) {
095        return runProcess(null, programAndArgs);
096    }
097
098    /**
099     * Read the output from a process. Due to oddities in the Process handling, this has to be done char by char. This
100     * method just implements a consumer thread to eat the output of a process and so prevent blocking.
101     *
102     * @param inputStream A stream to read up to end of file. This stream is closed at some point in the future, but not
103     * necessarily before this method returns.
104     */
105    public static void discardProcessOutput(final InputStream inputStream) {
106        makeCollectorThread(inputStream, new DiscardingOutputStream(), -1).start();
107    }
108
109    /**
110     * Collect all output from an inputstream, up to maxCollect bytes, in an output object. This will eventually close
111     * the given InputStream, but not necessarily before the method returns. The thread created is placed in a thread
112     * set, and should be removed once all output has been collected. While only a limited amount may be written to the
113     * output object, the entire output will be read from the inputStream unless the thread or the inputStream is
114     * destroyed first.
115     *
116     * @param inputStream The inputstream to read contents from
117     * @param maxCollect The maximum number of bytes to collect, or -1 for no limit
118     * @param collectionThreads Set of threads that concurrently collect output
119     * @return An object that collects the output. Once the thread returned is finished, the object will no longer be
120     * written to. The collected output can be retrieved with the toString method.
121     */
122    public static Object collectProcessOutput(final InputStream inputStream, final int maxCollect,
123            Set<Thread> collectionThreads) {
124        final OutputStream stream = new ByteArrayOutputStream();
125        Thread t = makeCollectorThread(inputStream, stream, maxCollect);
126        t.start();
127        collectionThreads.add(t);
128        return stream;
129    }
130
131    /**
132     * Collect all output from an inputstream, appending it to a file. This will eventually close the given InputStream,
133     * but not necessarily before the method returns. The thread created is placed in a thread set, and should be
134     * removed once all output has been collected.
135     *
136     * @param inputStream The inputstream to read contents from
137     * @param outputFile The file that output should be appended to.
138     * @param collectionThreads Set of threads that concurrently collect output
139     */
140    public static void writeProcessOutput(final InputStream inputStream, final File outputFile,
141            Set<Thread> collectionThreads) {
142        final OutputStream stream;
143        try {
144            stream = new FileOutputStream(outputFile, true);
145        } catch (FileNotFoundException e) {
146            throw new IOFailure("Cannot create file '" + outputFile + " to write process output to.", e);
147        }
148        Thread t = makeCollectorThread(inputStream, stream, -1);
149        t.start();
150        collectionThreads.add(t);
151    }
152
153    /**
154     * Collect all output from an inputstream, writing it to an output stream, using a separate thread. This will
155     * eventually close the given InputStream and OutputStream, but not necessarily before the method returns. While
156     * only a limited amount may be written to the output object, the entire output will be read fron the inputStream
157     * unless the thread or the inputStream is destroyed first.
158     *
159     * @param inputStream The inputstream to read contents from
160     * @param outputStream An stream to write the output to.
161     * @param maxCollect The maximum number of bytes to collect, or -1 for no limit
162     * @return The thread that will collect the output.
163     */
164    private static Thread makeCollectorThread(final InputStream inputStream, final OutputStream outputStream,
165            final int maxCollect) {
166        return new Thread() {
167            public void run() {
168                try {
169                    InputStream reader = null;
170                    OutputStream writer = null;
171                    try {
172                        reader = new BufferedInputStream(inputStream);
173                        writer = new BufferedOutputStream(outputStream);
174                        copyContents(reader, writer, maxCollect);
175                    } finally {
176                        if (reader != null) {
177                            reader.close();
178                        }
179                        if (writer != null) {
180                            writer.close();
181                        }
182                    }
183                } catch (IOException e) {
184                    // This seems ugly
185                    throw new RuntimeException("Couldn't close streams for " + "process.", e);
186                }
187            }
188        };
189    }
190
191    /**
192     * Reads all contents from a stream, writing some or all to another.
193     *
194     * @param in InputStream to read from
195     * @param out OutputStream to write to
196     * @param maxCollect Maximum number of bytes to write to out
197     * @throws IOFailure If there are problems reading or writing.
198     */
199    private static void copyContents(InputStream in, OutputStream out, int maxCollect) {
200        int bytesRead;
201        byte[] buffer = new byte[Constants.IO_BUFFER_SIZE];
202        int totalBytesRead = 0;
203        try {
204            while ((bytesRead = in.read(buffer, 0, Constants.IO_BUFFER_SIZE)) != -1) {
205                if (maxCollect == -1) {
206                    out.write(buffer, 0, bytesRead);
207                } else if (totalBytesRead < maxCollect) {
208                    out.write(buffer, 0, Math.min(bytesRead, maxCollect - totalBytesRead));
209                }
210                // Close early if applicable
211                if (maxCollect != -1 && totalBytesRead < maxCollect && totalBytesRead + bytesRead > maxCollect) {
212                    out.close();
213                }
214                totalBytesRead += bytesRead;
215            }
216        } catch (IOException e) {
217            throw new IOFailure("Error reading or writing process output", e);
218        }
219    }
220
221    /**
222     * Wait for the end of a process, but only for a limited time. This method takes care of the ways waitFor can get
223     * interrupted.
224     *
225     * @param p Process to wait for
226     * @param maxWait The maximum number of milliseconds to wait for the process to exit.
227     * @return Exit value for process, or null if the process didn't exit within the expected time.
228     */
229    public static Integer waitFor(final Process p, long maxWait) {
230        ArgumentNotValid.checkNotNull(p, "Process p");
231        ArgumentNotValid.checkPositive(maxWait, "long maxWait");
232        long startTime = System.currentTimeMillis();
233        Timer timer = new Timer(true);
234        final Thread waitThread = Thread.currentThread();
235        boolean wakeupScheduled = false;
236        final AtomicBoolean doneWaiting = new AtomicBoolean(false);
237        while (System.currentTimeMillis() < startTime + maxWait) {
238            try {
239                if (!wakeupScheduled) {
240                    // First time in here, we need to start the wakup thread,
241                    // but be sure it doesn't notify us too early or too late.
242                    synchronized (waitThread) {
243                        timer.schedule(new TimerTask() {
244                            public void run() {
245                                synchronized (waitThread) {
246                                    if (!doneWaiting.get()) {
247                                        waitThread.interrupt();
248                                    }
249                                }
250                            }
251                        }, maxWait);
252                        wakeupScheduled = true;
253                    }
254                }
255
256                p.waitFor();
257                break;
258            } catch (InterruptedException e) {
259                // May happen for a number of reasons. We just check if we've
260                // timed out yet when we go through the loop again.
261            }
262        }
263        synchronized (waitThread) {
264            timer.cancel();
265            doneWaiting.set(true);
266            Thread.interrupted(); // In case the timer task interrupted.
267        }
268        try {
269            return p.exitValue();
270        } catch (IllegalThreadStateException e) {
271            log.warn("Process '{}' did not exit within {} milliseconds", p, (System.currentTimeMillis() - startTime));
272            return null;
273        }
274    }
275
276    /**
277     * Runs a system process (Unix sort) to sort a file.
278     *
279     * @param inputFile the input file.
280     * @param outputFile the output file.
281     * @return the process exit code.
282     */
283    public static int runUnixSort(File inputFile, File outputFile) {
284        return runUnixSort(inputFile, outputFile, null, false);
285    }
286
287    /**
288     * Runs a system process (Unix sort) to sort a file.
289     *
290     * @param inputFile the input file.
291     * @param outputFile the output file.
292     * @param tempDir the directory where to store temporary files (null for default system temp).
293     * @param crawllogSorting Should we sort crawllog style ("-k 4b") or not
294     * @return the process exit code.
295     */
296    public static int runUnixSort(File inputFile, File outputFile, File tempDir, boolean crawllogSorting) {
297        String[] environment = new String[] {"LANG=C"};
298        LinkedList<String> cmdAndParams = new LinkedList<String>();
299        cmdAndParams.add("sort");
300        cmdAndParams.add(inputFile.getAbsolutePath());
301        if (crawllogSorting) {
302            // -k 4b means fourth field (from 1) ignoring leading blanks
303            cmdAndParams.add("-k");
304            cmdAndParams.add("4b");
305        }
306        // -o means output to (file)
307        cmdAndParams.add("-o");
308        cmdAndParams.add(outputFile.getAbsolutePath());
309
310        if (tempDir != null) {
311            // -T configures where to store temporary files
312            cmdAndParams.add("-T");
313            cmdAndParams.add(tempDir.getAbsolutePath());
314        }
315
316        return ProcessUtils.runProcess(environment, (String[]) cmdAndParams.toArray(new String[cmdAndParams.size()]));
317    }
318
319}