Python asyncio subprocess write stdin and read stdout/stderr continuously
I’m currently working on a task with subprocess in python3 asyncio. My code simply writes to stdin and reads stdout/stderr simultaneously:
import asyncio

async def read_stdout(stdout):
    print('read_stdout')
    while True:
        buf = await stdout.read(10)
        if not buf:
            break
        print(f'stdout: {buf}')

async def read_stderr(stderr):
    print('read_stderr')
    while True:
        buf = await stderr.read()
        if not buf:
            break
        print(f'stderr: {buf}')

async def write_stdin(stdin):
    print('write_stdin')
    for i in range(100):
        buf = f'line: {i}\n'.encode()
        print(f'stdin: {buf}')
        stdin.write(buf)
        await stdin.drain()
        await asyncio.sleep(0.5)

async def run():
    proc = await asyncio.create_subprocess_exec(
        '/usr/bin/tee',
        stdin=asyncio.subprocess.PIPE,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE)
    await asyncio.gather(
        read_stderr(proc.stderr),
        read_stdout(proc.stdout),
        write_stdin(proc.stdin))

asyncio.run(run())
Warning: Use the communicate() method rather than process.stdin.write(), await process.stdout.read() or await process.stderr.read(). This avoids deadlocks due to streams pausing reading or writing and blocking the child process.
Does that mean the above code will fall into deadlock in some scenarios? If so, how can I write stdin and read stdout/stderr continuously in python3 asyncio without deadlock? Thank you very much.
communicate waits for the subprocess to terminate. If you expect to read multiple times (e.g. read something, write a reply to stdin, read again etc) then communicate simply cannot be used. The warning deals only with the simple case of one-shot reads.
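To illustrate that interactive case, here is a minimal sketch of a multi-round exchange: write a line, drain, read the reply, repeat. The echoing child spawned via sys.executable is a made-up stand-in for a real interactive program; communicate() could not do this because it waits for the child to exit.

```python
import asyncio
import sys

async def main():
    # Hypothetical echoing child: reads lines until EOF, echoes each back.
    proc = await asyncio.create_subprocess_exec(
        sys.executable, '-c',
        'import sys\n'
        'while True:\n'
        '    line = sys.stdin.readline()\n'
        '    if not line:\n'
        '        break\n'
        '    sys.stdout.write("echo: " + line)\n'
        '    sys.stdout.flush()\n',
        stdin=asyncio.subprocess.PIPE,
        stdout=asyncio.subprocess.PIPE)
    replies = []
    for i in range(3):
        # one request/response round per iteration
        proc.stdin.write(('line %d\n' % i).encode())
        await proc.stdin.drain()
        replies.append(await proc.stdout.readline())
    proc.stdin.close()
    await proc.wait()
    return replies

replies = asyncio.run(main())
print(replies)
```

Note this pattern is only deadlock-safe as long as each request reliably produces exactly one line of response; for anything less predictable, read and write in parallel tasks as in the question's code.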
1 Answer
The warning was carried over from the regular subprocess module, and warns against naive code that tries to implement simple communication that appears perfectly correct, such as:
# write the request to the subprocess
proc.stdin.write(request)
await proc.stdin.drain()
# read the response
response = await proc.stdout.readline()
This can cause a deadlock if the subprocess starts writing the response before it has read the whole request. If the response is large enough, the subprocess will block, waiting for the parent to read some of it and make room in the pipe buffer. However, the parent cannot do so because it is still writing the request and waiting for the write to complete before it starts reading. So, the child waits for the parent to read (some of) its response, and the parent waits for the child to finish accepting the request. As both are waiting for the other’s current operation to complete, it’s a deadlock.
Your code doesn’t have that issue simply because your reads and writes are executed in parallel. Since the reader never waits for the writer and vice versa, there is no opportunity for (that kind of) deadlock. If you take a look at how communicate is implemented, you will find that, barring some debug logging, it works pretty much like your code.
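As a rough illustration of that internal structure, here is a simplified communicate()-style helper built with asyncio.gather(): the stdin writer and the two readers run concurrently, exactly like the question's code. The name communicate_like and the echoing child are made up for this sketch.

```python
import asyncio
import sys

async def communicate_like(proc, input_data):
    # Simplified version of what asyncio's communicate() does: feed stdin
    # and drain stdout/stderr concurrently, so no stream waits on another.
    async def feed_stdin():
        proc.stdin.write(input_data)
        await proc.stdin.drain()
        proc.stdin.close()

    _, out, err = await asyncio.gather(
        feed_stdin(),
        proc.stdout.read(),   # read until EOF
        proc.stderr.read())
    await proc.wait()
    return out, err

async def main():
    # Hypothetical echoing child standing in for a real program.
    proc = await asyncio.create_subprocess_exec(
        sys.executable, '-c',
        'import sys; sys.stdout.write(sys.stdin.read())',
        stdin=asyncio.subprocess.PIPE,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE)
    return await communicate_like(proc, b'hello\n')

out, err = asyncio.run(main())
print(out)  # b'hello\n'
```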
waylan / subprocess_pipe.md
Here are a few things I tried to write output to a python subprocess pipe.
from subprocess import Popen, PIPE

p = Popen('less', stdin=PIPE)
for x in xrange(100):
    p.communicate('Line number %d.\n' % x)
This seemed like the most obvious solution, but it fails miserably. The first call to communicate also closes the pipe, so the second iteration raises an exception.
from subprocess import Popen, PIPE

p = Popen('less', stdin=PIPE)
for x in xrange(100):
    p.stdin.write('Line number %d.\n' % x)
This is expressly stated to be a bad idea in the docs, but it sort of works. I get some weird behavior, though. There’s no call to p.wait() (which communicate does by default), so anything after the loop runs before the subprocess (less in this case) is closed. Adding a call to wait after the loop causes even weirder behavior.
from subprocess import Popen, PIPE

out = []
p = Popen('less', stdin=PIPE)
for x in xrange(100):
    out.append('Line number %d.' % x)
p.communicate('\n'.join(out))
This works. We only have one call to communicate and that calls wait properly. Unfortunately, we have to create the entire output in memory before writing any of it out. We can do better:
from subprocess import Popen, PIPE

p = Popen('less', stdin=PIPE)
for x in xrange(100):
    p.stdin.write('Line number %d.\n' % x)
p.stdin.close()
p.wait()
The key is to close stdin (flush and send EOF) before calling wait. This is actually what communicate does internally, minus all the stdout and stderr stuff I don’t need. If I wanted to force the buffer to remain empty, I suppose I could call p.stdin.flush() on each loop, but why? Note that there probably should be some error checking on the write (like there is in the source of communicate). Perhaps something like:
import errno

try:
    p.stdin.write(input)
except IOError as e:
    if e.errno != errno.EPIPE and e.errno != errno.EINVAL:
        raise
Note that errno.EPIPE is "Broken pipe" and errno.EINVAL is "Invalid argument".
So the final code looks like this:
from subprocess import Popen, PIPE
import errno

p = Popen('less', stdin=PIPE)
for x in xrange(100):
    line = 'Line number %d.\n' % x
    try:
        p.stdin.write(line)
    except IOError as e:
        if e.errno == errno.EPIPE or e.errno == errno.EINVAL:
            # Stop loop on "Broken pipe" or "Invalid argument".
            # No sense in continuing with broken pipe.
            break
        else:
            # Raise any other error.
            raise
p.stdin.close()
p.wait()

print 'All done!'  # This should always be printed below any output written to less.
Writing a large amount of data to stdin
It seems to hang at p.stdin.write() after I read a large string and write it. I have a large corpus of files (>1k) whose contents are written to stdin sequentially, so I am running a loop:
# this loop is repeated for all the files
for stri in lines:
    p = subprocess.Popen([path], stdout=subprocess.PIPE, stdin=subprocess.PIPE)
    p.stdin.write(stri)
    output = p.stdout.readline()
    # do some processing
It somehow hangs at file no. 400. That file is large, with long strings, and I suspect a blocking issue. This only happens if I iterate from 0 to 1000; if I start from file 400, the error does not happen.
Do you want to avoid it blocking at all, or are you just worried about a deadlock when the process’s output fills up the stdout pipe before you finish writing to the stdin pipe? I think p.communicate will fix the deadlock, but it will still block until all the input has been sent (it just uses threads to buffer in memory whatever comes back at the same time).
What does the program you’re running print to its stdout? You seem to read a single line back for each line you write, but could the program be printing more than that? Similarly, does it read your full input line before starting to write its response line, or does it work on shorter bits of data (e.g. byte by byte)?
@aceminer: I understand; my question was about the program on the other end. Does it sometimes return two or more lines for a single input line, or is it guaranteed to only ever return one for one? Similarly, does it operate on the input as a byte stream, or does it buffer the whole line you’re sending before starting to respond? If the former in either case, you’ll probably need threading or some similar solution (maybe p.communicate with a timeout if you’re using Python 3.3+) on your end to make sure you’re reading the response at the same time you’re writing.
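As a sketch of that last suggestion: since Python 3.3, communicate() accepts a timeout, after which you can kill the child and collect whatever it produced. The upper-casing child below is a hypothetical stand-in for the real program at path.

```python
from subprocess import Popen, PIPE, TimeoutExpired
import sys

# Hypothetical child standing in for the program at `path`:
# it upper-cases whatever it reads on stdin.
p = Popen([sys.executable, '-c',
           'import sys; sys.stdout.write(sys.stdin.read().upper())'],
          stdin=PIPE, stdout=PIPE)
try:
    out, _ = p.communicate(b'some long input\n', timeout=30)
except TimeoutExpired:
    # The child did not finish in time: kill it and collect what it wrote.
    p.kill()
    out, _ = p.communicate()
print(out)
```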
2 Answers
To avoid the deadlock in a portable way, write to the child in a separate thread:
#!/usr/bin/env python
from subprocess import Popen, PIPE
from threading import Thread

def pump_input(pipe, lines):
    with pipe:
        for line in lines:
            pipe.write(line)

p = Popen(path, stdin=PIPE, stdout=PIPE, bufsize=1)
Thread(target=pump_input, args=[p.stdin, lines]).start()
with p.stdout:
    for line in iter(p.stdout.readline, b''):  # read output
        print line,
p.wait()
You may have to use Popen.communicate() .
If you write a large amount of data to stdin and the child process meanwhile generates output on stdout, the child’s stdout pipe buffer may fill up before all of your stdin data has been processed. The child then blocks on a write to stdout (because you are not reading it) while you are blocked writing to its stdin.
Popen.communicate() can be used to write stdin and read stdout/stderr at the same time to avoid the previous problem.
Note: Popen.communicate() is suitable only when the input and output data can fit to your memory (they are not too large).
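To see this in practice, here is a small sketch that pushes 1 MiB (much more than a pipe buffer, roughly 64 KiB on Linux) through an echoing child via communicate(); a naive write-everything-then-read would deadlock on the same data. The child is a made-up stand-in for a real filter program.

```python
from subprocess import Popen, PIPE
import sys

# communicate() feeds stdin and drains stdout concurrently, so data far
# larger than a pipe buffer round-trips without deadlock.
data = b'x' * (1 << 20)  # 1 MiB
p = Popen([sys.executable, '-c',
           'import sys; sys.stdout.buffer.write(sys.stdin.buffer.read())'],
          stdin=PIPE, stdout=PIPE)
out, _ = p.communicate(data)
assert out == data  # all 1 MiB came back intact
```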
Update: If you decide to hack around with threads, here is an example parent and child process implementation that you can tailor to suit your needs:
#!/usr/bin/env python2
import os
import sys
import subprocess
import threading
import Queue

class MyStreamingSubprocess(object):
    def __init__(self, *argv):
        self.process = subprocess.Popen(argv, stdin=subprocess.PIPE,
                                        stdout=subprocess.PIPE)
        self.stdin_queue = Queue.Queue()
        self.stdout_queue = Queue.Queue()
        self.stdin_thread = threading.Thread(target=self._stdin_writer_thread)
        self.stdout_thread = threading.Thread(target=self._stdout_reader_thread)
        self.stdin_thread.start()
        self.stdout_thread.start()

    def process_item(self, item):
        self.stdin_queue.put(item)
        return self.stdout_queue.get()

    def terminate(self):
        self.stdin_queue.put(None)
        self.process.terminate()
        self.stdin_thread.join()
        self.stdout_thread.join()
        return self.process.wait()

    def _stdin_writer_thread(self):
        while 1:
            item = self.stdin_queue.get()
            if item is None:
                # signaling the child process that the end of the
                # input has been reached: some console progs handle
                # the case when reading from stdin returns empty string
                self.process.stdin.close()
                break
            try:
                self.process.stdin.write(item)
            except IOError:
                # making sure that the current self.process_item()
                # call doesn't deadlock
                self.stdout_queue.put(None)
                break

    def _stdout_reader_thread(self):
        while 1:
            try:
                output = self.process.stdout.readline()
            except IOError:
                output = None
            # output is empty string if the process has
            # finished or None if an IOError occurred
            self.stdout_queue.put(output)
            if not output:
                break

if __name__ == '__main__':
    child_script_path = os.path.join(os.path.dirname(__file__), 'child.py')
    process = MyStreamingSubprocess(sys.executable, '-u', child_script_path)
    try:
        while 1:
            item = raw_input('Enter an item to process (leave empty and press ENTER to exit): ')
            if not item:
                break
            result = process.process_item(item + '\n')
            if result:
                print('Result: ' + result)
            else:
                print('Error processing item! Exiting.')
                break
    finally:
        print('Terminating child process.')
        process.terminate()
    print('Finished.')
#!/usr/bin/env python2
import sys

while 1:
    item = sys.stdin.readline()
    if not item:
        # empty string means EOF: the parent closed our stdin
        break
    sys.stdout.write('Processed: ' + item)
Note: IOError is handled on the reader/writer threads to cover the cases where the child process exits, crashes, or is killed.