In Unix, select waits until one or more file descriptors become ready, that is, until an I/O operation on them would no longer block. More generally, we can define select as a mechanism for waiting on an arbitrary number of input or output sources for communication. We can refer to these input and output sources more simply as channels.
You can blame Andrew Francis for getting me into what follows. Thanks, Andrew!
A few months ago Andrew and I looked into implementing a version of select for Stackless Python based roughly on Rob Pike's implementation in Plan 9, which is examined in more detail in his paper "The Implementation of Newsqueak"[1]. I ended up writing a Python prototype and modifying the Stackless C sources to support it natively as well. Rather than just throwing a ~1000 line svn diff into the wild, I thought I'd write a bit about it in a blog article.
A naive implementation of select in Stackless Python would either involve a series of if/elif statements to check channel balances or an overly complicated system using delegate tasklets and channels whose sole purpose is to inform the working tasklets which channels are ready (i.e. not blocked).
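To make the if/elif variant concrete, here is a rough sketch of what it tends to look like. It does not use the stackless module at all: `FakeChannel` and `naive_select` are hypothetical stand-ins invented for this illustration so the snippet runs on any interpreter. The only thing borrowed from real Stackless channels is the meaning of `balance`, where a positive value means senders are waiting.

```python
from collections import deque


class FakeChannel(object):
    """Hypothetical stand-in for a stackless channel; it only models queued senders."""

    def __init__(self, name):
        self.name = name
        self._queue = deque()

    @property
    def balance(self):
        # As with stackless channels, a positive balance means senders are waiting.
        return len(self._queue)

    def send(self, value):
        # Toy behaviour: queue the value instead of blocking the sender.
        self._queue.append(value)

    def receive(self):
        return self._queue.popleft()


def naive_select(ch1, ch2, ch3):
    """Check each channel by hand; return (name, value), or None if nothing is ready."""
    if ch1.balance > 0:
        return ch1.name, ch1.receive()
    elif ch2.balance > 0:
        return ch2.name, ch2.receive()
    elif ch3.balance > 0:
        return ch3.name, ch3.receive()
    return None  # a real tasklet would have to yield here and poll again later


a, b, c = FakeChannel("a"), FakeChannel("b"), FakeChannel("c")
b.send("hello")
result = naive_select(a, b, c)
```

The chain grows by one branch per channel, always favours the channels listed first, and has to poll when nothing is ready, which is exactly the awkwardness a built-in select avoids.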
Select can be implemented more elegantly if built into the stackless module itself. Python has no built-in case or switch statement, but as we will see, there are ways to work around this that work almost as nicely, and for some cases perhaps even better.
Prototype
To build a working prototype I wrote a small module (source on GitHub). It wraps the channel and tasklet objects and provides a new tasklet operation called select. It also defines a new alt class of objects (the name follows Pike's terminology) that represent channel operations. Alt objects are opaque, and the user of the module does not create them with a normal constructor. Instead, two new methods are added to channel objects: sends and receives, both of which return alt objects that can be passed in a list to select.
import select
ch1 = select.channel()
ch2 = select.channel()
ch3 = select.channel()
# more code, tasklets, etc...
def onReceive(ch, op, v):  # receive callback
    print "Received", v, "from channel", ch
task = select.current()
task.select([ch1.sends("foo"),
             ch2.receives(onReceive),
             ch3.receives(onReceive)])
One nice feature of this approach is that in the likely case of a dispatcher or server that does little but wait for requests and ask other tasklets to take care of them, the list of alt objects can be reused and even modified from iteration to iteration without reconstruction.
Another thing demonstrated here is that alt objects can be associated with a callback. If a callback is used, it is fired before the select method returns, and the result of the callback is used as the return value of the select method. Otherwise, select returns a tuple of (channel,operation[,received_value]). Unfortunately, callbacks move the result handling away from the select call itself, but since Python lacks any built-in switch/case statement, this is one of the few ways we can handle it.
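As a rough model of those two return conventions (this is not the module's actual code), the sketch below shows what happens once an alt has been chosen. The names `Alt`, `complete`, `SEND` and `RECEIVE` are hypothetical, channels are represented by plain strings, and I am assuming that a callback receives the same fields that the tuple would contain.

```python
RECEIVE, SEND = "receive", "send"  # hypothetical operation tags


class Alt(object):
    """Toy alt: one pending channel operation, optionally with a callback."""

    def __init__(self, channel, op, value=None, callback=None):
        self.channel = channel
        self.op = op
        self.value = value
        self.callback = callback


def complete(alt, received=None):
    """Build what select would hand back once `alt` has been chosen."""
    if alt.op == SEND:
        result = (alt.channel, alt.op)
    else:
        result = (alt.channel, alt.op, received)
    if alt.callback is not None:
        # Assumption: the callback gets the tuple's fields as arguments,
        # and its return value replaces the tuple.
        return alt.callback(*result)
    return result


def on_receive(ch, op, v):
    return "got %r on %s" % (v, ch)


plain = complete(Alt("ch2", RECEIVE), received=42)
with_cb = complete(Alt("ch3", RECEIVE, callback=on_receive), received=42)
sent = complete(Alt("ch1", SEND, value="foo"))
```

Without a callback the caller has to inspect the tuple to find out which case fired; with one, the handler is chosen by whichever alt completed.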
The other point to note is that send and receive have been overridden - they are now implemented as single-case selects.
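To illustrate the idea of send and receive as single-case selects, here is a deliberately simplified toy. Everything in it is hypothetical: alts are plain tuples, the `select` function never blocks, and sends are treated as always ready by buffering the value, none of which matches real channel semantics. It only shows the shape of the delegation.

```python
from collections import deque


class Channel(object):
    """Toy channel whose send/receive are single-case selects."""

    def __init__(self):
        self.pending = deque()  # values offered by imaginary blocked senders

    def receives(self):
        return ("receive", self)

    def sends(self, value):
        return ("send", self, value)

    def receive(self):
        # One-element select; the received value is the third tuple field.
        return select([self.receives()])[2]

    def send(self, value):
        select([self.sends(value)])


def select(alts):
    """Toy select: complete the first alt that is ready."""
    for alt in alts:
        op, ch = alt[0], alt[1]
        if op == "send":
            ch.pending.append(alt[2])  # toy: a send is always ready
            return (ch, op)
        if ch.pending:
            return (ch, op, ch.pending.popleft())
    raise RuntimeError("toy select: nothing ready, a real select would block")


ch = Channel()
ch.send("x")
value = ch.receive()
```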
A Python to C Compiler (i.e. Yours Truly)
This is all very nice, but it's not transparently integrated into the stackless module itself. To do that in standard Stackless, we need to dive into some C code.
To that end, the Python prototype serves nicely as a basis for the C implementation. This presented me with the following tasks:
- Implement a new alt type.
- Move generic_channel_action logic into select (now PyTasklet_Select).
- Hope that things don't blow up in my face.
The diff from a 2.6.5 svn checkout of Stackless is available on GitHub. It works, but there are certainly bugs in the implementation and some debugging and introspection features aren't available (set_channel_callback, tasklet._channel). Some of the unit tests currently fail, and there seem to be some reference counting issues that crop up in a few tests, but at this point I have no motivation to perfect it further.
The most difficult part, as someone approaching both the Stackless and Python C source code with little knowledge of either, was figuring out where to copy values between tasklets and fire the callbacks. Because callbacks need to be fired in the selecting tasklet, it took me a while to find where blocked tasklets are resumed. This is especially important when tasklets are running in different threads, and that cross-thread case does not currently work correctly.
Here's the example from the prototype, re-written using the integrated implementation.
import stackless
ch1 = stackless.channel()
ch2 = stackless.channel()
ch3 = stackless.channel()
# more code, tasklets, etc...
def onReceive(ch, op, v):  # receive callback
    print "Received", v, "from channel", ch
stackless.select([ch1.sends("foo"),
                  ch2.receives(onReceive),
                  ch3.receives(onReceive)])
There is one limitation to the current implementation: a select cannot wait for both sends and receives on a single channel. The next example will not work.
stackless.select([ch.sends(x), ch.receives()])
I believe that this is a rare enough use case that it need not be handled.
One Important Caveat
Alt objects must be created by the tasklet that intends to use them, because they are automatically associated with that tasklet. Passing them between tasklets will result in bad things (currently, a segmentation fault). There should probably be a runtime check to ensure this, but this is currently not implemented.
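The missing runtime check could look roughly like the following. This is a plain-Python sketch of the idea, not the C code: `Tasklet`, `Alt` and `checked_select` are hypothetical stand-ins, and the selection logic itself is replaced by a placeholder.

```python
class Tasklet(object):
    """Hypothetical stand-in for a tasklet; just a named identity here."""

    def __init__(self, name):
        self.name = name


class Alt(object):
    """Toy alt that remembers which tasklet created it."""

    def __init__(self, owner, description):
        self.owner = owner
        self.description = description


def checked_select(current, alts):
    """Refuse alts that belong to another tasklet before doing anything else."""
    for alt in alts:
        if alt.owner is not current:
            raise RuntimeError("alt %r belongs to tasklet %r, not %r"
                               % (alt.description, alt.owner.name, current.name))
    return alts[0]  # placeholder for the real selection logic


t1, t2 = Tasklet("t1"), Tasklet("t2")
mine = Alt(t1, "ch1.receives()")
chosen = checked_select(t1, [mine])  # fine: t1 created the alt
```

Calling `checked_select(t2, [mine])` raises a RuntimeError, which is a lot friendlier than a segmentation fault.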
What I Don't Like
- Copying values to the blocked tasklet is completed by the running tasklet instead of the newly unblocked tasklet. This means that any callbacks for tasklets are run by the wrong tasklet (i.e. they should be run right before select/send/receive return).
- Creating an alt object for each operation. I don't know if there's a feasible way around this, and in any case the fact that they are reusable makes this mostly irrelevant.
- Creating an alt object for each and every send() and receive(). This could be worked around by associating an alt with each tasklet that can be reused.
- Using a static variable for random ready channel selection. This doesn't need to be extremely random anyway so it's probably not that big of a deal.
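On that last point, at the Python level the random choice among ready alts can avoid shared state entirely by taking the generator as a parameter. The helper below is a hypothetical sketch (`pick_ready` is not part of the prototype or the diff), and it says nothing about how the C side should do it.

```python
import random


def pick_ready(ready_alts, rng=random):
    """Choose one ready alt uniformly at random.

    `rng` defaults to the random module; pass a seeded random.Random
    instance for repeatable tests.
    """
    if not ready_alts:
        return None
    return rng.choice(ready_alts)


seeded = random.Random(0)
picks = set(pick_ready(["a", "b", "c"], seeded) for _ in range(200))
```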
Possible Optimizations
There is one common case that we may be able to optimize in order to avoid any tear-down and reinitialization of select.
def dispatch(req):
    # handle req...
    pass
def handler(ch, op, v):
    dispatch(build_request(v))
cases = [ch.receives(handler) for ch in channels]
flag = True
## This is a common case for dispatching
# while flag:
#     stackless.select(cases)
# To have more control over the select we may want a new
# operation that looks something like this instead:
stackless.select_while(cases, lambda: flag)
My gut feeling is that in order for this to work there is an important restriction that we must be able to guarantee: every channel that the select and its callbacks use must have preference set in favour of the select. This allows us to ensure that select will always be ready for the next operation. It may not be necessary, however, if all channel balances are properly restored between calls. The precise level of tear-down and reinitialization that cannot be avoided isn't yet clear to me.
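Semantically, I would expect select_while to behave like the commented-out loop. The sketch below pins that down in plain Python. It is only a hypothetical reference behaviour: `select_while` here takes the select function as an argument, and `fake_select` is a stand-in that pulls values from a queue instead of waiting on channels. A native version would presumably keep the tasklet registered on its channels between iterations, which is where the savings would come from.

```python
from collections import deque


def select_while(select, cases, predicate):
    """Hypothetical Python-level equivalent: re-run select while predicate() holds."""
    results = []
    while predicate():
        results.append(select(cases))
    return results


# Stand-ins so the sketch runs without stackless.
incoming = deque(["req1", "req2", "stop"])
state = {"running": True}
handled = []


def fake_select(cases):
    """Pretend one case fired and hand back the next queued value."""
    value = incoming.popleft()
    if value == "stop":
        state["running"] = False  # plays the role of clearing `flag`
    else:
        handled.append(value)
    return value


results = select_while(fake_select, ["case"], lambda: state["running"])
```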
The Future
I'm handing this off to Andrew for now. He'll be responsible for ensuring all of the unit tests pass. Oh, and if anybody cares, I've moved to Vancouver and am looking for work.