001/* 
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 *  contributor license agreements.  See the NOTICE file distributed with
004 *  this work for additional information regarding copyright ownership.
005 *  The ASF licenses this file to You under the Apache License, Version 2.0
006 *  (the "License"); you may not use this file except in compliance with
007 *  the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 *  Unless required by applicable law or agreed to in writing, software
012 *  distributed under the License is distributed on an "AS IS" BASIS,
013 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 *  See the License for the specific language governing permissions and
015 *  limitations under the License.
016 *
017 */
018
019package org.apache.commons.exec;
020
021import java.io.IOException;
022import java.io.InputStream;
023import java.io.OutputStream;
024import java.io.PipedOutputStream;
025
026import org.apache.commons.exec.util.DebugUtils;
027
028/**
029 * Copies standard output and error of sub-processes to standard output and error
030 * of the parent process. If output or error stream are set to null, any feedback
031 * from that stream will be lost.
032 *
033 * @version $Id: PumpStreamHandler.java 1557263 2014-01-10 21:18:09Z ggregory $
034 */
035public class PumpStreamHandler implements ExecuteStreamHandler {
036
037    private static final long STOP_TIMEOUT_ADDITION = 2000L;
038
039    private Thread outputThread;
040
041    private Thread errorThread;
042
043    private Thread inputThread;
044
045    private final OutputStream out;
046
047    private final OutputStream err;
048
049    private final InputStream input;
050
051    private InputStreamPumper inputStreamPumper;
052
053    /** the timeout in ms the implementation waits when stopping the pumper threads */
054    private long stopTimeout;
055
056    /** the last exception being caught */
057    private IOException caught = null;
058
059    /**
060     * Construct a new <CODE>PumpStreamHandler</CODE>.
061     */
062    public PumpStreamHandler() {
063        this(System.out, System.err);
064    }
065
066    /**
067     * Construct a new <CODE>PumpStreamHandler</CODE>.
068     *
069     * @param outAndErr the output/error <CODE>OutputStream</CODE>.
070     */
071    public PumpStreamHandler(final OutputStream outAndErr) {
072        this(outAndErr, outAndErr);
073    }
074
075    /**
076     * Construct a new <CODE>PumpStreamHandler</CODE>.
077     *
078     * @param out the output <CODE>OutputStream</CODE>.
079     * @param err the error <CODE>OutputStream</CODE>.
080     */
081    public PumpStreamHandler(final OutputStream out, final OutputStream err) {
082        this(out, err, null);
083    }
084
085    /**
086     * Construct a new <CODE>PumpStreamHandler</CODE>.
087     *
088     * @param out   the output <CODE>OutputStream</CODE>.
089     * @param err   the error <CODE>OutputStream</CODE>.
090     * @param input the input <CODE>InputStream</CODE>.
091     */
092    public PumpStreamHandler(final OutputStream out, final OutputStream err, final InputStream input) {
093        this.out = out;
094        this.err = err;
095        this.input = input;
096    }
097
098    /**
099     * Set maximum time to wait until output streams are exchausted
100     * when {@link #stop()} was called.
101     *
102     * @param timeout timeout in milliseconds or zero to wait forever (default)
103     */
104    public void setStopTimeout(final long timeout) {
105        this.stopTimeout = timeout;
106    }
107
108    /**
109     * Set the <CODE>InputStream</CODE> from which to read the standard output
110     * of the process.
111     *
112     * @param is the <CODE>InputStream</CODE>.
113     */
114    public void setProcessOutputStream(final InputStream is) {
115        if (out != null) {
116            createProcessOutputPump(is, out);
117        }
118    }
119
120    /**
121     * Set the <CODE>InputStream</CODE> from which to read the standard error
122     * of the process.
123     *
124     * @param is the <CODE>InputStream</CODE>.
125     */
126    public void setProcessErrorStream(final InputStream is) {
127        if (err != null) {
128            createProcessErrorPump(is, err);
129        }
130    }
131
132    /**
133     * Set the <CODE>OutputStream</CODE> by means of which input can be sent
134     * to the process.
135     *
136     * @param os the <CODE>OutputStream</CODE>.
137     */
138    public void setProcessInputStream(final OutputStream os) {
139        if (input != null) {
140            if (input == System.in) {
141                inputThread = createSystemInPump(input, os);
142            } else {
143                inputThread = createPump(input, os, true);
144            }
145        } else {
146            try {
147                os.close();
148            } catch (final IOException e) {
149                final String msg = "Got exception while closing output stream";
150                DebugUtils.handleException(msg, e);
151            }
152        }
153    }
154
155    /**
156     * Start the <CODE>Thread</CODE>s.
157     */
158    public void start() {
159        if (outputThread != null) {
160            outputThread.start();
161        }
162        if (errorThread != null) {
163            errorThread.start();
164        }
165        if (inputThread != null) {
166            inputThread.start();
167        }
168    }
169
170    /**
171     * Stop pumping the streams. When a timeout is specified it it is not guaranteed that the
172     * pumper threads are cleanly terminated.
173     */
174    public void stop() throws IOException {
175
176        if (inputStreamPumper != null) {
177            inputStreamPumper.stopProcessing();
178        }
179
180        stopThread(outputThread, stopTimeout);
181        stopThread(errorThread, stopTimeout);
182        stopThread(inputThread, stopTimeout);
183
184        if (err != null && err != out) {
185            try {
186                err.flush();
187            } catch (final IOException e) {
188                final String msg = "Got exception while flushing the error stream : " + e.getMessage();
189                DebugUtils.handleException(msg, e);
190            }
191        }
192
193        if (out != null) {
194            try {
195                out.flush();
196            } catch (final IOException e) {
197                final String msg = "Got exception while flushing the output stream";
198                DebugUtils.handleException(msg, e);
199            }
200        }
201
202        if (caught != null) {
203            throw caught;
204        }
205    }
206
207    /**
208     * Get the error stream.
209     *
210     * @return <CODE>OutputStream</CODE>.
211     */
212    protected OutputStream getErr() {
213        return err;
214    }
215
216    /**
217     * Get the output stream.
218     *
219     * @return <CODE>OutputStream</CODE>.
220     */
221    protected OutputStream getOut() {
222        return out;
223    }
224
225    /**
226     * Create the pump to handle process output.
227     *
228     * @param is the <CODE>InputStream</CODE>.
229     * @param os the <CODE>OutputStream</CODE>.
230     */
231    protected void createProcessOutputPump(final InputStream is, final OutputStream os) {
232        outputThread = createPump(is, os);
233    }
234
235    /**
236     * Create the pump to handle error output.
237     *
238     * @param is the <CODE>InputStream</CODE>.
239     * @param os the <CODE>OutputStream</CODE>.
240     */
241    protected void createProcessErrorPump(final InputStream is, final OutputStream os) {
242        errorThread = createPump(is, os);
243    }
244
245    /**
246     * Creates a stream pumper to copy the given input stream to the given
247     * output stream. When the 'os' is an PipedOutputStream we are closing
248     * 'os' afterwards to avoid an IOException ("Write end dead").
249     *
250     * @param is the input stream to copy from
251     * @param os the output stream to copy into
252     * @return the stream pumper thread
253     */
254    protected Thread createPump(final InputStream is, final OutputStream os) {
255        final boolean closeWhenExhausted = os instanceof PipedOutputStream ? true : false;
256        return createPump(is, os, closeWhenExhausted);
257    }
258
259    /**
260     * Creates a stream pumper to copy the given input stream to the given
261     * output stream.
262     *
263     * @param is                 the input stream to copy from
264     * @param os                 the output stream to copy into
265     * @param closeWhenExhausted close the output stream when the input stream is exhausted
266     * @return the stream pumper thread
267     */
268    protected Thread createPump(final InputStream is, final OutputStream os, final boolean closeWhenExhausted) {
269        final Thread result = new Thread(new StreamPumper(is, os, closeWhenExhausted), "Exec Stream Pumper");
270        result.setDaemon(true);
271        return result;
272    }
273
274    /**
275     * Stopping a pumper thread. The implementation actually waits
276     * longer than specified in 'timeout' to detect if the timeout
277     * was indeed exceeded. If the timeout was exceeded an IOException
278     * is created to be thrown to the caller.
279     *
280     * @param thread  the thread to be stopped
281     * @param timeout the time in ms to wait to join
282     */
283    protected void stopThread(final Thread thread, final long timeout) {
284
285        if (thread != null) {
286            try {
287                if (timeout == 0) {
288                    thread.join();
289                } else {
290                    final long timeToWait = timeout + STOP_TIMEOUT_ADDITION;
291                    final long startTime = System.currentTimeMillis();
292                    thread.join(timeToWait);
293                    if (!(System.currentTimeMillis() < startTime + timeToWait)) {
294                        final String msg = "The stop timeout of " + timeout + " ms was exceeded";
295                        caught = new ExecuteException(msg, Executor.INVALID_EXITVALUE);
296                    }
297                }
298            } catch (final InterruptedException e) {
299                thread.interrupt();
300            }
301        }
302    }
303
304    /**
305     * Creates a stream pumper to copy the given input stream to the given
306     * output stream.
307     *
308     * @param is the System.in input stream to copy from
309     * @param os the output stream to copy into
310     * @return the stream pumper thread
311     */
312    private Thread createSystemInPump(final InputStream is, final OutputStream os) {
313        inputStreamPumper = new InputStreamPumper(is, os);
314        final Thread result = new Thread(inputStreamPumper, "Exec Input Stream Pumper");
315        result.setDaemon(true);
316        return result;
317    }
318}