Monday, January 3, 2011

How to implement a non-blocking two-way pipe in Python

I was surprised by the lack of a way to read from a stream without blocking in Python, and this is the main reason why I'm writing this post. Some years ago I needed to open a two-way channel of communication between the program I was writing and an external command-line program (actually a GUI). I had solved this problem in C and C++ on Unix in two different ways: with pseudo-terminals in one case, and with duplicated file descriptors in the other. Here we are going to create a class that inherits from subprocess.Popen, a class provided by the subprocess module of the Python standard library.

But let us start by looking at why we can't be sure that a read operation on an open stream will not hang indefinitely in our Python code. Here's our scenario: we want to write a program that interacts with an external command-line application by sending data to it and receiving data from it. Pretty simple, huh? The subprocess module defines a class called Popen that basically creates pipes between our parent process and a new child process, forked from the parent and used to spawn the external application. This is just the standard fork/exec practice, and it turns out to be exactly what we need. But here comes the problem, as soon as we look at the methods of the class Popen: only subprocess.Popen.communicate() allows us to send data to the external application and read its response, and it does so only once: it sends the input, reads the output until end-of-file and waits for the process to terminate. Most of the time this is not what we want. We'd like to continuously send data to and receive data from the external application, so the connection must be kept open instead of being closed after the very first exchange.

Actually, things are not this dramatic, and we have not reached a dead end. Every instance of subprocess.Popen also has attributes, and among them we notice subprocess.Popen.stdin, .stdout and .stderr. As stated in the Python documentation, these three attributes are standard Python file objects, which means that we can use their write() and read() methods to write to and read from the external application in the child process. These file objects are available only if we pass subprocess.PIPE to the corresponding arguments of the subprocess.Popen constructor; I won't give more details here, because everything will become clear in the example code below.
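To make the limitation of communicate() concrete, here is a minimal sketch. It is written for Python 3 (unlike the Python 2.6 session in this post) and assumes a Unix system with cat on the PATH; the helper name one_shot_echo is hypothetical. communicate() writes the input, reads the output until end-of-file and waits for the child to exit, so the exchange can happen only once.

```python
import subprocess

def one_shot_echo(data):
    """Send data to cat once via communicate() and return (output, exit status).

    Assumes a Unix system with the cat utility on the PATH.
    """
    p = subprocess.Popen(["cat"], stdin=subprocess.PIPE, stdout=subprocess.PIPE)
    # communicate() writes data, closes cat's stdin, reads stdout until EOF
    # and waits for cat to exit: the channel cannot be reused afterwards.
    out, _ = p.communicate(data)
    return out, p.returncode

out, status = one_shot_echo(b"Hello")
print(out, status)  # b'Hello' 0
```

A second exchange would require spawning a new process, which is exactly what we want to avoid.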
Here we shall convince ourselves that if the child process has no data available on its stdout stream, calling read() on it will cause our program to hang indefinitely, waiting for some data to show up at the reading end of the pipe. Here is the code to prove it. I'm assuming that you are running a Unix system with the cat utility installed. Open the Python interpreter in a shell and type in the following few lines of code:
Python 2.6.6 (r266:84292, Sep 15 2010, 15:52:39)
[GCC 4.4.5] on linux2
Type "help", "copyright", "credits" or "license" for more information.
>>> import subprocess as subp
>>> p = subp.Popen("cat",stdin=subp.PIPE,stdout=subp.PIPE)
>>> p.stdin.write("Hello")
>>> p.stdout.read(5)
'Hello'
>>> p.stdout.read(1)

^CTraceback (most recent call last):
  File "<stdin>", line 1, in <module>
KeyboardInterrupt
>>>
We open pipes to the external application cat and ask the constructor of subprocess.Popen to attach its stdin and stdout ends to our parent process. We can thus write to and read from them, and as a first step we write a string to the stream p.stdin. cat just echoes our string back to us through its stdout. In this case we are sure that there is data to be read from the stream p.stdout, as shown by the string returned by p.stdout.read(5). If we now try to read even one more byte from the stream, the interpreter hangs indefinitely, waiting for something to read from p.stdout, and refuses to accept additional code. All we can do at this point is send the SIGINT signal with the key combination Ctrl+C. I suggest revisiting this example, replacing the line p.stdout.read(5) with p.stdout.read(n), where n is an integer greater than 5: the read hangs as well, because it waits for all n bytes to arrive.
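The hang can also be reproduced without freezing the interpreter, by doing the blocking read in a helper thread and checking whether it is still waiting after a short delay. This is a sketch for Python 3, assuming a Unix system with cat; the function name second_read_blocks and the half-second wait are arbitrary choices. Note that Python 3 pipes are buffered by default, hence the explicit flush().

```python
import subprocess
import threading

def second_read_blocks(wait_seconds=0.5):
    """Reproduce the hang from the session above without freezing the caller.

    Returns (first_read, still_blocked). Assumes a Unix system with cat.
    """
    p = subprocess.Popen(["cat"], stdin=subprocess.PIPE, stdout=subprocess.PIPE)
    p.stdin.write(b"Hello")
    p.stdin.flush()                 # push the bytes through the Python-side buffer
    first = p.stdout.read(5)        # cat echoed 5 bytes, so this returns

    # Ask for one more byte in a helper thread: cat has nothing more to send.
    reader = threading.Thread(target=p.stdout.read, args=(1,), daemon=True)
    reader.start()
    reader.join(wait_seconds)
    still_blocked = reader.is_alive()

    # Terminating cat closes its end of the pipe, so the pending read sees EOF.
    p.terminate()
    p.wait()
    reader.join()
    p.stdin.close()
    p.stdout.close()
    return first, still_blocked

print(second_read_blocks())  # (b'Hello', True)
```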

So far we have become aware of the potential pitfalls of the subprocess.Popen class. Before working out a solution, I kindly suggest you check this web page out. As you can read there, what we have encountered in the previous example is well known to the team maintaining Python, so the solution we are going to work out might be just a temporary remedy. All we need is one low-level system call, namely poll(), exposed by the select module through the select.poll() function, which returns a polling object. The poll Unix system call waits, for up to a given timeout, until one of a set of file descriptors becomes ready for I/O. We can use this feature to check whether the stream associated with the stdout end of the pipe has data ready to be read before actually reading from it. Only if data is available do we proceed with the read; otherwise we skip this step and avoid blocking our process. Here follows a very simple example of a class, Pipe, that inherits from subprocess.Popen and extends it with a few new methods, the most interesting of which are read(), readlines() and write():
import select
import subprocess as subp

class Pipe(subp.Popen):
    def __init__(self, exe, args = None, timeout = 0):
        # timeout is in milliseconds, as expected by select.poll().poll()
        self.timeout = timeout
        argv = [exe]
        if args is not None:
            argv = argv + args
        subp.Popen.__init__(self, argv, stdin = subp.PIPE, stdout = subp.PIPE, stderr = subp.STDOUT)

    def close(self):
        self.terminate()
        self.wait()

    def write(self, data):
        # Write only if the child's stdin is ready to accept data
        poll = select.poll()
        poll.register(self.stdin.fileno(), select.POLLOUT)
        fd = poll.poll(self.timeout)
        if len(fd):
            f = fd[0]
            if f[1] > 0:
                self.stdin.write(data)

    def read(self, n = 1):
        # Read only if the child's stdout has data; returns None otherwise
        poll = select.poll()
        poll.register(self.stdout.fileno(), select.POLLIN | select.POLLPRI)
        fd = poll.poll(self.timeout)
        if len(fd):
            f = fd[0]
            if f[1] > 0:
                return self.stdout.read(n)

    def readlines(self, n = 1):
        # Read byte by byte until no data is available (None)
        # or the child has closed its stdout (empty string, i.e. EOF)
        c = self.read()
        string = ""
        while c:
            string = string + str(c)
            c = self.read()
        return string

    def set_timeout(self, timeout):
        self.timeout = timeout
Let's have a closer look at read(), since the other two methods are then straightforward to understand. Here it is, singled out from the rest of the code:
def read(self, n = 1):
    poll = select.poll()
    poll.register(self.stdout.fileno(), select.POLLIN | select.POLLPRI)
    fd = poll.poll(self.timeout)
    if len(fd):
        f = fd[0]
        if f[1] > 0:
            return self.stdout.read(n)
The first line creates a new polling object by calling select.poll(). Its register() method registers a file descriptor together with a bitmask of events, i.e. it instructs the poll() method (see below) to watch the file descriptor for the occurrence of the events indicated by the flags. Then the poll() method of the polling object waits for events on the registered file descriptors for at most the number of milliseconds given as its argument (a timeout of 0 makes it return immediately, while None or a negative value makes it wait indefinitely). As soon as at least one of the registered file descriptors has an event to report, the method returns, even if the whole timeout hasn't elapsed, and the return value is a list of (fd, event) 2-tuples, one for each file descriptor that has an event to report. If nothing happens before the timeout expires, it returns an empty list. This is the general behaviour of poll(). In the case we are now analyzing we deal with only one file descriptor, namely the one associated with the stream self.stdout. As you can see from the code, the way to obtain the file descriptor associated with a file object is to call its fileno() method. The first if checks whether poll() has returned any file descriptor (it can only be ours, since we have registered only one). The second if checks that the event bitmask is non-zero, just to be even more sure that everything's going to be fine, and in that case we finally perform the read. Note that the bitmask is also non-zero when the child has closed its end of the pipe, in which case read() returns an empty string.
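The return values of poll() described above can be observed on a plain pipe created with os.pipe(), with no child process involved. A minimal sketch, assuming Python 3 on a Unix system; poll_demo is a hypothetical helper.

```python
import os
import select

def poll_demo():
    """Show what poll() returns for an empty pipe and for a pipe holding data."""
    r, w = os.pipe()
    poller = select.poll()
    poller.register(r, select.POLLIN | select.POLLPRI)
    try:
        before = poller.poll(0)     # timeout in milliseconds; 0 means do not wait
        os.write(w, b"x")
        after = poller.poll(100)    # returns at once: data is already there
        return before, after
    finally:
        os.close(r)
        os.close(w)

before, after = poll_demo()
print(before)  # []
print(after)   # one (fd, event_mask) tuple; the mask includes POLLIN
```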
The method pipe.Pipe.read(), when called with no arguments, reads at most 1 byte. The reason for this default should be clear by now. Recalling the very first example, remember that poll() tells us whether there is data available on the stream, but not how much data is ready to be read. We can be sure that at least one byte is available, but we can by no means be sure that more than one byte is available, and a file object's read(n) waits until all n bytes have arrived. This implies that reading a single byte is the safest thing we can do. This also explains the existence of pipe.Pipe.readlines(): it reads from the stdout end, byte by byte, until no more data is available on the stream, and then returns a string with all the bytes read.
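Reading one byte at a time is not the only option. A different technique, not the one used by the Pipe class, is to call os.read() on the raw file descriptor: os.read(fd, n) returns as soon as some data is available, up to n bytes, instead of waiting for all n of them. The sketch below assumes Python 3 on Unix; the name drain and the 4096-byte chunk size are arbitrary. It is only safe when no buffered file object is reading from the same descriptor, since data already pulled into such a buffer would be invisible to both poll() and os.read().

```python
import os
import select

def drain(fd, timeout_ms=0, chunk_size=4096):
    """Read everything currently available on fd without blocking indefinitely.

    Stops when poll() reports nothing within timeout_ms, or at EOF.
    """
    poller = select.poll()
    poller.register(fd, select.POLLIN | select.POLLPRI)
    parts = []
    while poller.poll(timeout_ms):
        # os.read() returns whatever is available (up to chunk_size bytes)
        # instead of waiting for chunk_size bytes to arrive.
        chunk = os.read(fd, chunk_size)
        if not chunk:   # EOF: the writer closed its end (poll reported POLLHUP)
            break
        parts.append(chunk)
    return b"".join(parts)

r, w = os.pipe()
os.write(w, b"Hello World!")
print(drain(r))  # b'Hello World!'
print(drain(r))  # b'' (nothing left, poll() timed out)
os.close(w)
print(drain(r))  # b'' (EOF)
os.close(r)
```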
As a concluding remark, let us explain why pipe.Pipe.write() looks so similar to read(). In most cases the stdin end of the pipe will be ready for I/O operations, and we don't really need to worry about this. But it can happen that the stream associated with stdin is temporarily unable to receive data from our program, for instance when the pipe's kernel buffer is full because the child process is not reading its input. This would again cause the parent process to hang until the data can be written to the stream. Once more, the poll() system call lets us avoid this, which is why pipe.Pipe.write() and pipe.Pipe.read() look so alike. Keep in mind, though, that POLLOUT only guarantees that some buffer space is available, so a write larger than the free space can still block.
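The write side can be illustrated the same way: a pipe whose reader never consumes data fills up, and from then on poll() no longer reports POLLOUT. In this Python 3 sketch (Unix only; is_writable and fill_pipe are hypothetical helpers) the write end is put in non-blocking mode only so that the demo notices the full buffer instead of hanging; the buffer size itself depends on the operating system.

```python
import os
import select

def is_writable(fd, timeout_ms=0):
    """Return True if poll() reports that fd can accept data (POLLOUT)."""
    poller = select.poll()
    poller.register(fd, select.POLLOUT)
    return any(mask & select.POLLOUT for _, mask in poller.poll(timeout_ms))

def fill_pipe(w, chunk=b"x" * 512):
    """Write to w until the pipe buffer is full; return the number of bytes written.

    Non-blocking mode is used only so that this demo does not hang.
    """
    os.set_blocking(w, False)
    total = 0
    try:
        while True:
            total += os.write(w, chunk)
    except BlockingIOError:
        pass
    return total

r, w = os.pipe()
print(is_writable(w))   # True: the empty pipe has room
written = fill_pipe(w)
print(is_writable(w))   # False: nobody is reading the other end
remaining = written     # drain the pipe, as a child consuming its stdin would
while remaining:
    remaining -= len(os.read(r, remaining))
print(is_writable(w))   # True again
os.close(r)
os.close(w)
```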
Before finally parting from our journey into the realm of pipes, let's write a simple application that uses the class pipe.Pipe. Here is a very basic example, using cat as a scapegoat once again.
#!/usr/bin/env python

import pipe

if __name__ == "__main__":
    # Execute cat; poll() waits at most 100 ms (the timeout is in milliseconds)
    p = pipe.Pipe("cat", timeout = 100)

    # Try to read something. At this stage no data should be ready to be read
    print 'Reading %s' % p.readlines()
    # If the execution did not hang, the following line is executed
    p.write("Hello World!")
    # Now some data should be available
    print 'Reading %s' % p.readlines()
    p.close()
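For readers on Python 3, where the print statement and str-based pipes above no longer work, here is a rough port of the same idea. It is a sketch, not the original code: the class name Pipe3 is hypothetical, it passes bufsize=0 so that stdin and stdout are unbuffered (no data can hide in a Python-side buffer that poll() cannot see), it exchanges bytes rather than str, and it reads with os.read() so that readlines() can collect everything available in a few calls and stop cleanly at EOF.

```python
import os
import select
import subprocess

class Pipe3(subprocess.Popen):
    """Python 3 sketch of the Pipe class above (hypothetical name).

    timeout is in milliseconds, as for select.poll().poll().
    """

    def __init__(self, exe, args=None, timeout=0):
        self.timeout = timeout
        argv = [exe] + list(args or [])
        # bufsize=0: unbuffered pipes, so poll() sees all pending data
        super().__init__(argv, stdin=subprocess.PIPE, stdout=subprocess.PIPE,
                         stderr=subprocess.STDOUT, bufsize=0)

    def _fd_ready(self, fileobj, events):
        # True if poll() reports one of the requested events within the timeout
        poller = select.poll()
        poller.register(fileobj.fileno(), events)
        return any(mask & events for _, mask in poller.poll(self.timeout))

    def write(self, data):
        # Returns the number of bytes written (a raw write may be partial), or 0
        if self._fd_ready(self.stdin, select.POLLOUT):
            return self.stdin.write(data)
        return 0

    def read(self, n=1):
        # Up to n bytes if data is available, b"" at EOF, None on timeout
        if self._fd_ready(self.stdout, select.POLLIN | select.POLLPRI):
            return os.read(self.stdout.fileno(), n)
        return None

    def readlines(self):
        # Collect everything available; stop on timeout (None) or EOF (b"")
        parts = []
        while True:
            chunk = self.read(4096)
            if not chunk:
                break
            parts.append(chunk)
        return b"".join(parts)

    def set_timeout(self, timeout):
        self.timeout = timeout

    def close(self):
        self.terminate()
        self.wait()
        self.stdin.close()
        self.stdout.close()

if __name__ == "__main__":
    p = Pipe3("cat", timeout=100)
    print("Reading %r" % p.readlines())   # nothing to read yet
    p.write(b"Hello World!")
    print("Reading %r" % p.readlines())   # the echo, if cat answers within 100 ms
    p.close()
```

As in the original example, whether the second readlines() sees the echo depends on cat answering within the timeout.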

2 comments:

  1. This was a very helpful example and solved the coding problem I was faced with. However there is one mistake in the coding: The boolean "or" for select.POLLIN with select.POLLPRI should be a bitwise or ("|") since POLLIN and POLLPRI are bit values.

    >>> import select
    >>> select.POLLIN
    1
    >>> select.POLLPRI
    2
    >>> select.POLLIN or select.POLLPRI
    1
    >>> select.POLLIN | select.POLLPRI
    3

    The consequence of using "or" instead of "|" is that the POLLPRI is discarded, which is mostly if not completely harmless. (I'm not sure you'd ever get a priority input from a pipe.) Nevertheless it is technically incorrect.

    1. Ah! That's right! Many thanks for your comment! Indeed that is the correct thing to do! This has gone unnoticed for quite a long time :).
