1# Copyright (c) 2012 The Chromium OS Authors.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4#
5# Copyright (c) 2003-2005 by Peter Astrand <astrand@lysator.liu.se>
6# Licensed to PSF under a Contributor Agreement.
7# See http://www.python.org/2.4/license for licensing details.
8
9"""Subprocess execution
10
11This module holds a subclass of subprocess.Popen with our own required
12features, mainly that we get access to the subprocess output while it
13is running rather than just at the end. This makes it easier to show
14progress information and filter output in real time.
15"""
16
17import errno
18import os
19import pty
20import select
21import subprocess
22import sys
23import unittest
24
25
26# Import these here so the caller does not need to import subprocess also.
27PIPE = subprocess.PIPE
28STDOUT = subprocess.STDOUT
29PIPE_PTY = -3     # Pipe output through a pty
30stay_alive = True
31
32
33class Popen(subprocess.Popen):
34    """Like subprocess.Popen with ptys and incremental output
35
36    This class deals with running a child process and filtering its output on
37    both stdout and stderr while it is running. We do this so we can monitor
38    progress, and possibly relay the output to the user if requested.
39
40    The class is similar to subprocess.Popen, the equivalent is something like:
41
42        Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
43
44    But this class has many fewer features, and two enhancement:
45
46    1. Rather than getting the output data only at the end, this class sends it
47         to a provided operation as it arrives.
48    2. We use pseudo terminals so that the child will hopefully flush its output
49         to us as soon as it is produced, rather than waiting for the end of a
50         line.
51
52    Use CommunicateFilter() to handle output from the subprocess.
53
54    """
55
56    def __init__(self, args, stdin=None, stdout=PIPE_PTY, stderr=PIPE_PTY,
57                 shell=False, cwd=None, env=None, **kwargs):
58        """Cut-down constructor
59
60        Args:
61            args: Program and arguments for subprocess to execute.
62            stdin: See subprocess.Popen()
63            stdout: See subprocess.Popen(), except that we support the sentinel
64                    value of cros_subprocess.PIPE_PTY.
65            stderr: See subprocess.Popen(), except that we support the sentinel
66                    value of cros_subprocess.PIPE_PTY.
67            shell: See subprocess.Popen()
68            cwd: Working directory to change to for subprocess, or None if none.
69            env: Environment to use for this subprocess, or None to inherit parent.
70            kwargs: No other arguments are supported at the moment.    Passing other
71                    arguments will cause a ValueError to be raised.
72        """
73        stdout_pty = None
74        stderr_pty = None
75
76        if stdout == PIPE_PTY:
77            stdout_pty = pty.openpty()
78            stdout = os.fdopen(stdout_pty[1])
79        if stderr == PIPE_PTY:
80            stderr_pty = pty.openpty()
81            stderr = os.fdopen(stderr_pty[1])
82
83        super(Popen, self).__init__(args, stdin=stdin,
84                stdout=stdout, stderr=stderr, shell=shell, cwd=cwd, env=env,
85                **kwargs)
86
87        # If we're on a PTY, we passed the slave half of the PTY to the subprocess.
88        # We want to use the master half on our end from now on.    Setting this here
89        # does make some assumptions about the implementation of subprocess, but
90        # those assumptions are pretty minor.
91
92        # Note that if stderr is STDOUT, then self.stderr will be set to None by
93        # this constructor.
94        if stdout_pty is not None:
95            self.stdout = os.fdopen(stdout_pty[0])
96        if stderr_pty is not None:
97            self.stderr = os.fdopen(stderr_pty[0])
98
99        # Insist that unit tests exist for other arguments we don't support.
100        if kwargs:
101            raise ValueError("Unit tests do not test extra args - please add tests")
102
103    def ConvertData(self, data):
104        """Convert stdout/stderr data to the correct format for output
105
106        Args:
107            data: Data to convert, or None for ''
108
109        Returns:
110            Converted data, as bytes
111        """
112        if data is None:
113            return b''
114        return data
115
116    def CommunicateFilter(self, output):
117        """Interact with process: Read data from stdout and stderr.
118
119        This method runs until end-of-file is reached, then waits for the
120        subprocess to terminate.
121
122        The output function is sent all output from the subprocess and must be
123        defined like this:
124
125            def Output([self,] stream, data)
126            Args:
127                stream: the stream the output was received on, which will be
128                        sys.stdout or sys.stderr.
129                data: a string containing the data
130
131            Returns:
132                True to terminate the process
133
134        Note: The data read is buffered in memory, so do not use this
135        method if the data size is large or unlimited.
136
137        Args:
138            output: Function to call with each fragment of output.
139
140        Returns:
141            A tuple (stdout, stderr, combined) which is the data received on
142            stdout, stderr and the combined data (interleaved stdout and stderr).
143
144            Note that the interleaved output will only be sensible if you have
145            set both stdout and stderr to PIPE or PIPE_PTY. Even then it depends on
146            the timing of the output in the subprocess. If a subprocess flips
147            between stdout and stderr quickly in succession, by the time we come to
148            read the output from each we may see several lines in each, and will read
149            all the stdout lines, then all the stderr lines. So the interleaving
150            may not be correct. In this case you might want to pass
151            stderr=cros_subprocess.STDOUT to the constructor.
152
153            This feature is still useful for subprocesses where stderr is
154            rarely used and indicates an error.
155
156            Note also that if you set stderr to STDOUT, then stderr will be empty
157            and the combined output will just be the same as stdout.
158        """
159
160        read_set = []
161        write_set = []
162        stdout = None # Return
163        stderr = None # Return
164
165        if self.stdin:
166            # Flush stdio buffer.    This might block, if the user has
167            # been writing to .stdin in an uncontrolled fashion.
168            self.stdin.flush()
169            if input:
170                write_set.append(self.stdin)
171            else:
172                self.stdin.close()
173        if self.stdout:
174            read_set.append(self.stdout)
175            stdout = bytearray()
176        if self.stderr and self.stderr != self.stdout:
177            read_set.append(self.stderr)
178            stderr = bytearray()
179        combined = bytearray()
180
181        stop_now = False
182        input_offset = 0
183        while read_set or write_set:
184            try:
185                rlist, wlist, _ = select.select(read_set, write_set, [], 0.2)
186            except select.error as e:
187                if e.args[0] == errno.EINTR:
188                    continue
189                raise
190
191            if not stay_alive:
192                    self.terminate()
193
194            if self.stdin in wlist:
195                # When select has indicated that the file is writable,
196                # we can write up to PIPE_BUF bytes without risk
197                # blocking.    POSIX defines PIPE_BUF >= 512
198                chunk = input[input_offset : input_offset + 512]
199                bytes_written = os.write(self.stdin.fileno(), chunk)
200                input_offset += bytes_written
201                if input_offset >= len(input):
202                    self.stdin.close()
203                    write_set.remove(self.stdin)
204
205            if self.stdout in rlist:
206                data = b''
207                # We will get an error on read if the pty is closed
208                try:
209                    data = os.read(self.stdout.fileno(), 1024)
210                except OSError:
211                    pass
212                if not len(data):
213                    self.stdout.close()
214                    read_set.remove(self.stdout)
215                else:
216                    stdout += data
217                    combined += data
218                    if output:
219                        stop_now = output(sys.stdout, data)
220            if self.stderr in rlist:
221                data = b''
222                # We will get an error on read if the pty is closed
223                try:
224                    data = os.read(self.stderr.fileno(), 1024)
225                except OSError:
226                    pass
227                if not len(data):
228                    self.stderr.close()
229                    read_set.remove(self.stderr)
230                else:
231                    stderr += data
232                    combined += data
233                    if output:
234                        stop_now = output(sys.stderr, data)
235            if stop_now:
236                self.terminate()
237
238        # All data exchanged.    Translate lists into strings.
239        stdout = self.ConvertData(stdout)
240        stderr = self.ConvertData(stderr)
241        combined = self.ConvertData(combined)
242
243        # Translate newlines, if requested.    We cannot let the file
244        # object do the translation: It is based on stdio, which is
245        # impossible to combine with select (unless forcing no
246        # buffering).
247        if self.universal_newlines and hasattr(file, 'newlines'):
248            if stdout:
249                stdout = self._translate_newlines(stdout)
250            if stderr:
251                stderr = self._translate_newlines(stderr)
252
253        self.wait()
254        return (stdout, stderr, combined)
255
256
257# Just being a unittest.TestCase gives us 14 public methods.    Unless we
258# disable this, we can only have 6 tests in a TestCase.    That's not enough.
259#
260# pylint: disable=R0904
261
262class TestSubprocess(unittest.TestCase):
263    """Our simple unit test for this module"""
264
265    class MyOperation:
266        """Provides a operation that we can pass to Popen"""
267        def __init__(self, input_to_send=None):
268            """Constructor to set up the operation and possible input.
269
270            Args:
271                input_to_send: a text string to send when we first get input. We will
272                    add \r\n to the string.
273            """
274            self.stdout_data = ''
275            self.stderr_data = ''
276            self.combined_data = ''
277            self.stdin_pipe = None
278            self._input_to_send = input_to_send
279            if input_to_send:
280                pipe = os.pipe()
281                self.stdin_read_pipe = pipe[0]
282                self._stdin_write_pipe = os.fdopen(pipe[1], 'w')
283
284        def Output(self, stream, data):
285            """Output handler for Popen. Stores the data for later comparison"""
286            if stream == sys.stdout:
287                self.stdout_data += data
288            if stream == sys.stderr:
289                self.stderr_data += data
290            self.combined_data += data
291
292            # Output the input string if we have one.
293            if self._input_to_send:
294                self._stdin_write_pipe.write(self._input_to_send + '\r\n')
295                self._stdin_write_pipe.flush()
296
297    def _BasicCheck(self, plist, oper):
298        """Basic checks that the output looks sane."""
299        self.assertEqual(plist[0], oper.stdout_data)
300        self.assertEqual(plist[1], oper.stderr_data)
301        self.assertEqual(plist[2], oper.combined_data)
302
303        # The total length of stdout and stderr should equal the combined length
304        self.assertEqual(len(plist[0]) + len(plist[1]), len(plist[2]))
305
306    def test_simple(self):
307        """Simple redirection: Get process list"""
308        oper = TestSubprocess.MyOperation()
309        plist = Popen(['ps']).CommunicateFilter(oper.Output)
310        self._BasicCheck(plist, oper)
311
312    def test_stderr(self):
313        """Check stdout and stderr"""
314        oper = TestSubprocess.MyOperation()
315        cmd = 'echo fred >/dev/stderr && false || echo bad'
316        plist = Popen([cmd], shell=True).CommunicateFilter(oper.Output)
317        self._BasicCheck(plist, oper)
318        self.assertEqual(plist [0], 'bad\r\n')
319        self.assertEqual(plist [1], 'fred\r\n')
320
321    def test_shell(self):
322        """Check with and without shell works"""
323        oper = TestSubprocess.MyOperation()
324        cmd = 'echo test >/dev/stderr'
325        self.assertRaises(OSError, Popen, [cmd], shell=False)
326        plist = Popen([cmd], shell=True).CommunicateFilter(oper.Output)
327        self._BasicCheck(plist, oper)
328        self.assertEqual(len(plist [0]), 0)
329        self.assertEqual(plist [1], 'test\r\n')
330
331    def test_list_args(self):
332        """Check with and without shell works using list arguments"""
333        oper = TestSubprocess.MyOperation()
334        cmd = ['echo', 'test', '>/dev/stderr']
335        plist = Popen(cmd, shell=False).CommunicateFilter(oper.Output)
336        self._BasicCheck(plist, oper)
337        self.assertEqual(plist [0], ' '.join(cmd[1:]) + '\r\n')
338        self.assertEqual(len(plist [1]), 0)
339
340        oper = TestSubprocess.MyOperation()
341
342        # this should be interpreted as 'echo' with the other args dropped
343        cmd = ['echo', 'test', '>/dev/stderr']
344        plist = Popen(cmd, shell=True).CommunicateFilter(oper.Output)
345        self._BasicCheck(plist, oper)
346        self.assertEqual(plist [0], '\r\n')
347
348    def test_cwd(self):
349        """Check we can change directory"""
350        for shell in (False, True):
351            oper = TestSubprocess.MyOperation()
352            plist = Popen('pwd', shell=shell, cwd='/tmp').CommunicateFilter(oper.Output)
353            self._BasicCheck(plist, oper)
354            self.assertEqual(plist [0], '/tmp\r\n')
355
356    def test_env(self):
357        """Check we can change environment"""
358        for add in (False, True):
359            oper = TestSubprocess.MyOperation()
360            env = os.environ
361            if add:
362                env ['FRED'] = 'fred'
363            cmd = 'echo $FRED'
364            plist = Popen(cmd, shell=True, env=env).CommunicateFilter(oper.Output)
365            self._BasicCheck(plist, oper)
366            self.assertEqual(plist [0], add and 'fred\r\n' or '\r\n')
367
368    def test_extra_args(self):
369        """Check we can't add extra arguments"""
370        self.assertRaises(ValueError, Popen, 'true', close_fds=False)
371
372    def test_basic_input(self):
373        """Check that incremental input works
374
375        We set up a subprocess which will prompt for name. When we see this prompt
376        we send the name as input to the process. It should then print the name
377        properly to stdout.
378        """
379        oper = TestSubprocess.MyOperation('Flash')
380        prompt = 'What is your name?: '
381        cmd = 'echo -n "%s"; read name; echo Hello $name' % prompt
382        plist = Popen([cmd], stdin=oper.stdin_read_pipe,
383                shell=True).CommunicateFilter(oper.Output)
384        self._BasicCheck(plist, oper)
385        self.assertEqual(len(plist [1]), 0)
386        self.assertEqual(plist [0], prompt + 'Hello Flash\r\r\n')
387
388    def test_isatty(self):
389        """Check that ptys appear as terminals to the subprocess"""
390        oper = TestSubprocess.MyOperation()
391        cmd = ('if [ -t %d ]; then echo "terminal %d" >&%d; '
392                'else echo "not %d" >&%d; fi;')
393        both_cmds = ''
394        for fd in (1, 2):
395            both_cmds += cmd % (fd, fd, fd, fd, fd)
396        plist = Popen(both_cmds, shell=True).CommunicateFilter(oper.Output)
397        self._BasicCheck(plist, oper)
398        self.assertEqual(plist [0], 'terminal 1\r\n')
399        self.assertEqual(plist [1], 'terminal 2\r\n')
400
401        # Now try with PIPE and make sure it is not a terminal
402        oper = TestSubprocess.MyOperation()
403        plist = Popen(both_cmds, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
404                shell=True).CommunicateFilter(oper.Output)
405        self._BasicCheck(plist, oper)
406        self.assertEqual(plist [0], 'not 1\n')
407        self.assertEqual(plist [1], 'not 2\n')
408
409if __name__ == '__main__':
410    unittest.main()
411