# Copyright (c) 2012 The Chromium OS Authors.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
#
# Copyright (c) 2003-2005 by Peter Astrand <astrand@lysator.liu.se>
# Licensed to PSF under a Contributor Agreement.
# See http://www.python.org/2.4/license for licensing details.

"""Subprocess execution

This module holds a subclass of subprocess.Popen with our own required
features, mainly that we get access to the subprocess output while it
is running rather than just at the end. This makes it easier to show
progress information and filter output in real time.
"""

import errno
import os
import pty
import select
import subprocess
import sys
import unittest


# Import these here so the caller does not need to import subprocess also.
PIPE = subprocess.PIPE
STDOUT = subprocess.STDOUT
# NOTE(review): -3 is also the value of subprocess.DEVNULL on Python 3, so a
# caller passing subprocess.DEVNULL for stdout/stderr gets a pty instead. The
# value is kept as-is for compatibility with existing callers.
PIPE_PTY = -3     # Pipe output through a pty
stay_alive = True


class Popen(subprocess.Popen):
    """Like subprocess.Popen with ptys and incremental output

    This class deals with running a child process and filtering its output on
    both stdout and stderr while it is running. We do this so we can monitor
    progress, and possibly relay the output to the user if requested.

    The class is similar to subprocess.Popen, the equivalent is something
    like:

        Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE)

    But this class has many fewer features, and two enhancements:

    1. Rather than getting the output data only at the end, this class sends
       it to a provided operation as it arrives.
    2. We use pseudo terminals so that the child will hopefully flush its
       output to us as soon as it is produced, rather than waiting for the
       end of a line.

    Use CommunicateFilter() to handle output from the subprocess.
    """

    def __init__(self, args, stdin=None, stdout=PIPE_PTY, stderr=PIPE_PTY,
                 shell=False, cwd=None, env=None, **kwargs):
        """Cut-down constructor

        Args:
            args: Program and arguments for subprocess to execute.
            stdin: See subprocess.Popen()
            stdout: See subprocess.Popen(), except that we support the
                sentinel value of cros_subprocess.PIPE_PTY.
            stderr: See subprocess.Popen(), except that we support the
                sentinel value of cros_subprocess.PIPE_PTY.
            shell: See subprocess.Popen()
            cwd: Working directory to change to for subprocess, or None if
                none.
            env: Environment to use for this subprocess, or None to inherit
                parent.
            kwargs: No other arguments are supported at the moment. Passing
                other arguments will cause a ValueError to be raised.

        Raises:
            ValueError: if any unsupported keyword argument is passed. This
                is checked before anything is opened or executed.
            OSError: if the pty cannot be opened or the program cannot be
                run (raised by pty.openpty() / subprocess.Popen).
        """
        # Insist that unit tests exist for other arguments we don't support.
        # Check this first so that we never start a child process (or open a
        # pty) that we are about to abandon.
        if kwargs:
            raise ValueError(
                "Unit tests do not test extra args - please add tests")

        stdout_master = None
        stderr_master = None
        slave_fds = []
        try:
            if stdout == PIPE_PTY:
                stdout_master, stdout = pty.openpty()
                slave_fds.append(stdout)
            if stderr == PIPE_PTY:
                stderr_master, stderr = pty.openpty()
                slave_fds.append(stderr)

            super(Popen, self).__init__(args, stdin=stdin, stdout=stdout,
                                        stderr=stderr, shell=shell, cwd=cwd,
                                        env=env)
        except Exception:
            # The child never started (or a pty could not be opened), so
            # nobody will ever read the master ends: don't leak them.
            for fd in (stdout_master, stderr_master):
                if fd is not None:
                    os.close(fd)
            raise
        finally:
            # The child has its own copy of the slave ends. Our copy must be
            # closed, otherwise the master end never sees end-of-file.
            for fd in slave_fds:
                os.close(fd)

        # If we're on a PTY, we passed the slave half of the PTY to the
        # subprocess. We want to use the master half on our end from now on.
        # Setting this here does make some assumptions about the
        # implementation of subprocess, but those assumptions are pretty
        # minor.

        # Note that if stderr is STDOUT, then self.stderr will be set to None
        # by this constructor.
        if stdout_master is not None:
            self.stdout = os.fdopen(stdout_master)
        if stderr_master is not None:
            self.stderr = os.fdopen(stderr_master)

    def ConvertData(self, data):
        """Convert stdout/stderr data to the correct format for output

        Args:
            data: Data to convert (bytes or bytearray), or None for b''

        Returns:
            Converted data, as bytes
        """
        if data is None:
            return b''
        return bytes(data)

    @staticmethod
    def _ReadAvailable(stream):
        """Read the data currently available on an output stream

        Args:
            stream: File object to read from (its file descriptor is used
                directly, bypassing any buffering in the file object)

        Returns:
            Up to 1024 bytes of data, or b'' at end-of-file
        """
        try:
            return os.read(stream.fileno(), 1024)
        except OSError:
            # We will get an error on read if the pty is closed. This is the
            # normal way for a pty to signal end-of-file, so report it as
            # such.
            return b''

    def CommunicateFilter(self, output, input_buf=b''):
        """Interact with process: Read data from stdout and stderr.

        This method runs until end-of-file is reached, then waits for the
        subprocess to terminate.

        The output function is sent all output from the subprocess and must
        be defined like this:

            def Output([self,] stream, data)
            Args:
                stream: the stream the output was received on, which will be
                        sys.stdout or sys.stderr.
                data: the data received, as bytes

            Returns:
                True to terminate the process

        Note: The data read is buffered in memory, so do not use this
        method if the data size is large or unlimited.

        Args:
            output: Function to call with each fragment of output, or None
                to just collect the output.
            input_buf: Bytes to send to the subprocess on its stdin. This is
                only used if the process was created with stdin=PIPE. If it
                is empty, stdin is closed straight away.

        Returns:
            A tuple (stdout, stderr, combined) which is the data received on
            stdout, stderr and the combined data (interleaved stdout and
            stderr), each as bytes.

            Note that the interleaved output will only be sensible if you
            have set both stdout and stderr to PIPE or PIPE_PTY. Even then it
            depends on the timing of the output in the subprocess. If a
            subprocess flips between stdout and stderr quickly in succession,
            by the time we come to read the output from each we may see
            several lines in each, and will read all the stdout lines, then
            all the stderr lines. So the interleaving may not be correct. In
            this case you might want to pass stderr=cros_subprocess.STDOUT to
            the constructor.

            This feature is still useful for subprocesses where stderr is
            rarely used and indicates an error.

            Note also that if you set stderr to STDOUT, then stderr will be
            empty and the combined output will just be the same as stdout.
        """
        write_set = []
        stdout = None    # Return
        stderr = None    # Return

        if self.stdin:
            # Flush stdio buffer.  This might block, if the user has
            # been writing to .stdin in an uncontrolled fashion.
            self.stdin.flush()
            if input_buf:
                write_set.append(self.stdin)
            else:
                self.stdin.close()

        # Each entry is (file object, buffer to collect into, stream tag
        # passed to the output function). stdout always comes first so that
        # it is drained before stderr on each pass.
        streams = []
        if self.stdout:
            stdout = bytearray()
            streams.append((self.stdout, stdout, sys.stdout))
        if self.stderr and self.stderr != self.stdout:
            stderr = bytearray()
            streams.append((self.stderr, stderr, sys.stderr))
        read_set = [entry[0] for entry in streams]
        combined = bytearray()

        stop_now = False
        input_offset = 0
        while read_set or write_set:
            try:
                rlist, wlist, _ = select.select(read_set, write_set, [], 0.2)
            except select.error as e:
                if e.args[0] == errno.EINTR:
                    continue
                raise

            if not stay_alive:
                self.terminate()

            if self.stdin in wlist:
                # When select has indicated that the file is writable,
                # we can write up to PIPE_BUF bytes without risk
                # blocking.  POSIX defines PIPE_BUF >= 512
                chunk = input_buf[input_offset:input_offset + 512]
                try:
                    input_offset += os.write(self.stdin.fileno(), chunk)
                except OSError as e:
                    if e.errno != errno.EPIPE:
                        raise
                    # The child closed its stdin; nothing more can be sent.
                    input_offset = len(input_buf)
                if input_offset >= len(input_buf):
                    self.stdin.close()
                    write_set.remove(self.stdin)

            for stream, collected, tag in streams:
                if stream not in rlist:
                    continue
                data = self._ReadAvailable(stream)
                if not data:
                    stream.close()
                    read_set.remove(stream)
                    continue
                collected += data
                combined += data
                # Keep any earlier stop request: a later call that does not
                # ask to stop must not cancel it.
                if output and output(tag, data):
                    stop_now = True
            if stop_now:
                self.terminate()

        # All data exchanged.  Translate the buffers into bytes.
        stdout = self.ConvertData(stdout)
        stderr = self.ConvertData(stderr)
        combined = self.ConvertData(combined)

        # Translate newlines, if requested.  We cannot let the file
        # object do the translation: It is based on stdio, which is
        # impossible to combine with select (unless forcing no
        # buffering). The data is bytes, so translate it at that level.
        if self.universal_newlines:
            stdout = stdout.replace(b'\r\n', b'\n').replace(b'\r', b'\n')
            stderr = stderr.replace(b'\r\n', b'\n').replace(b'\r', b'\n')

        self.wait()
        return (stdout, stderr, combined)


# Just being a unittest.TestCase gives us 14 public methods.  Unless we
# disable this, we can only have 6 tests in a TestCase.  That's not enough.
#
# pylint: disable=R0904

class TestSubprocess(unittest.TestCase):
    """Our simple unit test for this module"""

    class MyOperation:
        """Provides a operation that we can pass to Popen"""
        def __init__(self, input_to_send=None):
            """Constructor to set up the operation and possible input.

            Args:
                input_to_send: a text string to send when we first get
                    input. We will add \\r\\n to the string.
            """
            # Popen delivers output as bytes, so collect it as bytes
            self.stdout_data = b''
            self.stderr_data = b''
            self.combined_data = b''
            self.stdin_pipe = None
            self._input_to_send = input_to_send
            if input_to_send:
                pipe = os.pipe()
                self.stdin_read_pipe = pipe[0]
                self._stdin_write_pipe = os.fdopen(pipe[1], 'w')

        def Output(self, stream, data):
            """Output handler for Popen. Stores the data for later comparison

            Args:
                stream: sys.stdout or sys.stderr, indicating where the data
                    came from
                data: the data received, as bytes
            """
            if stream == sys.stdout:
                self.stdout_data += data
            if stream == sys.stderr:
                self.stderr_data += data
            self.combined_data += data

            # Output the input string if we have one. This is done only the
            # first time we see any output.
            if self._input_to_send:
                self._stdin_write_pipe.write(self._input_to_send + '\r\n')
                self._stdin_write_pipe.flush()
                self._input_to_send = None

        def Close(self):
            """Close the stdin pipe, if one was created by the constructor"""
            if hasattr(self, '_stdin_write_pipe'):
                self._stdin_write_pipe.close()
                os.close(self.stdin_read_pipe)

    def _BasicCheck(self, plist, oper):
        """Basic checks that the output looks sane.

        Args:
            plist: (stdout, stderr, combined) tuple returned by
                CommunicateFilter()
            oper: MyOperation whose Output() method received the data
        """
        self.assertEqual(plist[0], oper.stdout_data)
        self.assertEqual(plist[1], oper.stderr_data)
        self.assertEqual(plist[2], oper.combined_data)

        # The total length of stdout and stderr should equal the combined
        # length
        self.assertEqual(len(plist[0]) + len(plist[1]), len(plist[2]))

    def test_simple(self):
        """Simple redirection: Get process list"""
        oper = TestSubprocess.MyOperation()
        plist = Popen(['ps']).CommunicateFilter(oper.Output)
        self._BasicCheck(plist, oper)

    def test_stderr(self):
        """Check stdout and stderr"""
        oper = TestSubprocess.MyOperation()
        cmd = 'echo fred >/dev/stderr && false || echo bad'
        plist = Popen([cmd], shell=True).CommunicateFilter(oper.Output)
        self._BasicCheck(plist, oper)
        self.assertEqual(plist[0], b'bad\r\n')
        self.assertEqual(plist[1], b'fred\r\n')

    def test_shell(self):
        """Check with and without shell works"""
        oper = TestSubprocess.MyOperation()
        cmd = 'echo test >/dev/stderr'
        self.assertRaises(OSError, Popen, [cmd], shell=False)
        plist = Popen([cmd], shell=True).CommunicateFilter(oper.Output)
        self._BasicCheck(plist, oper)
        self.assertEqual(len(plist[0]), 0)
        self.assertEqual(plist[1], b'test\r\n')

    def test_list_args(self):
        """Check with and without shell works using list arguments"""
        oper = TestSubprocess.MyOperation()
        cmd = ['echo', 'test', '>/dev/stderr']
        plist = Popen(cmd, shell=False).CommunicateFilter(oper.Output)
        self._BasicCheck(plist, oper)
        expected = (' '.join(cmd[1:]) + '\r\n').encode('utf-8')
        self.assertEqual(plist[0], expected)
        self.assertEqual(len(plist[1]), 0)

        oper = TestSubprocess.MyOperation()

        # this should be interpreted as 'echo' with the other args dropped
        cmd = ['echo', 'test', '>/dev/stderr']
        plist = Popen(cmd, shell=True).CommunicateFilter(oper.Output)
        self._BasicCheck(plist, oper)
        self.assertEqual(plist[0], b'\r\n')

    def test_cwd(self):
        """Check we can change directory"""
        for shell in (False, True):
            oper = TestSubprocess.MyOperation()
            plist = Popen('pwd', shell=shell,
                          cwd='/tmp').CommunicateFilter(oper.Output)
            self._BasicCheck(plist, oper)
            self.assertEqual(plist[0], b'/tmp\r\n')

    def test_env(self):
        """Check we can change environment"""
        for add in (False, True):
            oper = TestSubprocess.MyOperation()
            # Work on a copy so that the environment of the test process
            # (and of the tests that follow) is left untouched
            env = dict(os.environ)
            env.pop('FRED', None)
            if add:
                env['FRED'] = 'fred'
            cmd = 'echo $FRED'
            plist = Popen(cmd, shell=True,
                          env=env).CommunicateFilter(oper.Output)
            self._BasicCheck(plist, oper)
            self.assertEqual(plist[0], b'fred\r\n' if add else b'\r\n')

    def test_extra_args(self):
        """Check we can't add extra arguments"""
        self.assertRaises(ValueError, Popen, 'true', close_fds=False)

    def test_basic_input(self):
        """Check that incremental input works

        We set up a subprocess which will prompt for name. When we see this
        prompt we send the name as input to the process. It should then print
        the name properly to stdout.
        """
        oper = TestSubprocess.MyOperation('Flash')
        self.addCleanup(oper.Close)
        prompt = 'What is your name?: '
        cmd = 'echo -n "%s"; read name; echo Hello $name' % prompt
        plist = Popen([cmd], stdin=oper.stdin_read_pipe,
                      shell=True).CommunicateFilter(oper.Output)
        self._BasicCheck(plist, oper)
        self.assertEqual(len(plist[1]), 0)
        # The name is sent with a trailing \r, which the shell keeps, and the
        # pty then expands the final \n to \r\n
        expected = (prompt + 'Hello Flash\r\r\n').encode('utf-8')
        self.assertEqual(plist[0], expected)

    def test_isatty(self):
        """Check that ptys appear as terminals to the subprocess"""
        oper = TestSubprocess.MyOperation()
        cmd = ('if [ -t %d ]; then echo "terminal %d" >&%d; '
               'else echo "not %d" >&%d; fi;')
        both_cmds = ''
        for fd in (1, 2):
            both_cmds += cmd % (fd, fd, fd, fd, fd)
        plist = Popen(both_cmds, shell=True).CommunicateFilter(oper.Output)
        self._BasicCheck(plist, oper)
        self.assertEqual(plist[0], b'terminal 1\r\n')
        self.assertEqual(plist[1], b'terminal 2\r\n')

        # Now try with PIPE and make sure it is not a terminal
        oper = TestSubprocess.MyOperation()
        plist = Popen(both_cmds, stdout=subprocess.PIPE,
                      stderr=subprocess.PIPE,
                      shell=True).CommunicateFilter(oper.Output)
        self._BasicCheck(plist, oper)
        self.assertEqual(plist[0], b'not 1\n')
        self.assertEqual(plist[1], b'not 2\n')

if __name__ == '__main__':
    unittest.main()