April 04, 2007

Non-blocking I/O With PHP-MIO

A couple of weeks ago I was thinking about non-blocking I/O in PHP, specifically about how clunky PHP's select implementation is. I say clunky because it's not bad, it's just not as easy to use as it could be. It's not as easy, for example, as the implementation in Java's NIO package, which is beautifully simple to use. The main issue I have with PHP's implementation is that I am responsible for keeping track of everything. I have to remember which streams I'm interested in writing to, which streams I'm interested in reading from and, when I get to accepting connections, which streams are server sockets that I'm interested in accepting connections on. I'm lazy; I don't want to have to do that, I want a library to handle it all for me. So I decided to implement something similar to Java's non-blocking I/O in PHP 5. It is now finished and up on SourceForge (under the name phpmio). In this article I hope to give you enough information to get up and running with the package.
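
To make the bookkeeping complaint concrete, here is a minimal sketch of a single call to PHP's stream_select. The socket pair is only a stand-in for real network peers (an assumption for the demo, and STREAM_PF_UNIX assumes a Unix-like system); the point is that the caller builds the read and write arrays by hand on every call.

```php
<?php
// A connected pair of local sockets stands in for two network peers
list( $a, $b ) = stream_socket_pair( STREAM_PF_UNIX, STREAM_SOCK_STREAM, STREAM_IPPROTO_IP );
stream_set_blocking( $a, 0 );
stream_set_blocking( $b, 0 );

fwrite( $a, 'hello' );

// We have to remember which streams we want to read from and
// which we want to write to, and rebuild these arrays each time
$read   = array( $b );
$write  = array( $a );
$except = null;

// stream_select() rewrites the arrays in place so that only
// the streams which are ready remain in them
$ready = stream_select( $read, $write, $except, 1 );

$received = '';
foreach( $read as $stream ) {
    $received .= fread( $stream, 1024 );
}

echo "Ready streams: $ready\n";
echo "Received: $received\n";
```

With only two streams this is manageable, but every extra stream, and every change in what we want from it, is more state to carry around between calls.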


But What Is Multiplexed I/O?

Before I go any further I suppose I should explain exactly what multiplexed (or non-blocking) I/O actually is. When reading from or writing to a stream, PHP usually blocks until the operation is complete. However, a stream's blocking mode can be set so that operations on it don't block and instead return immediately with whatever could be done. Multiplexing means watching several such streams at once and working on whichever one is ready. Used correctly this technique can vastly improve performance in networked applications, because the program no longer sits idle waiting on one slow peer. This comes at the price of increased complexity and, some would argue, a more confusing program flow. For this reason I wouldn't suggest it for trivial applications. Let's take a look at this in action. In the example below we open a stream to Amazon and try to read some data from it, then display how long each operation took and how much data was read. If this is run with the stream's blocking mode set to 0 (non-blocking) you will notice that the read takes very little time and not all of the bytes are read. If, on the other hand, the stream's blocking mode is set to 1 (blocking, as in the listing) you will notice that the read takes much longer and far more data comes back (up to the full 2048 bytes).


<?php
$start = microtime( true );

$fp = fopen( 'http://www.amazon.co.uk/', 'r' );
$open = microtime( true );

// 1 = blocking, 0 = non-blocking; change this value to compare the two modes
stream_set_blocking( $fp, 1 );
$block = microtime( true );

$data = fread( $fp, 2048 );
$read = microtime( true );

// the time taken to open the url
echo "Open: " . ($open-$start) . "\n";
// the time taken to set the stream to blocking
echo "Block: " . ($block-$open) . "\n";
// the time taken to read from the stream
echo "Read: " . ($read-$block) . "\n";
// the amount of data read
echo "Data Read: " . strlen( $data ) . "\n";
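
The same effect can be seen without touching the network. The sketch below is an illustration of my own rather than part of the package: it uses a local socket pair (assuming a Unix-like system) and reads from a non-blocking stream before anything has been written to it.

```php
<?php
list( $a, $b ) = stream_socket_pair( STREAM_PF_UNIX, STREAM_SOCK_STREAM, STREAM_IPPROTO_IP );

// Put the reading end into non-blocking mode
stream_set_blocking( $b, 0 );

// Nothing has been written yet, so a blocking read would hang here.
// In non-blocking mode the call returns straight away with no data.
$start   = microtime( true );
$early   = (string) fread( $b, 2048 );
$elapsed = microtime( true ) - $start;

// Once the other end has written something there is data to read
fwrite( $a, 'ping' );
$late = (string) fread( $b, 2048 );

echo "Early read: " . strlen( $early ) . " bytes in " . $elapsed . " seconds\n";
echo "Late read: " . strlen( $late ) . " bytes\n";
```

If stream_set_blocking is given 1 instead, the first fread never returns in this script because nobody ever writes before it; that is exactly the waiting that non-blocking mode avoids.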

OK, So What Is PHP MIO?

So, how does multiplexed I/O work with the PHP MIO package? Within the package there are three key classes: MioStream, MioSelector and MioSelectionKey. There is also a factory class, MioStreamFactory, which provides a convenient way of creating different types of streams, and a few Exception classes. Before diving into the core of the package let us take a quick look at MioStreamFactory to get it out of the way. Below are three examples of how it can be used. Each method creates an instance of the MioStream class which wraps a PHP stream. The first method, createSocketStream, creates (as you might expect) a socket stream (as would be created with fsockopen); the second a server socket (stream_socket_server); and the third a file stream (fopen). One thing which should be noted is that creating an MioStream with MioStreamFactory implicitly sets its blocking flag to 0.


<?php
$factory = new MioStreamFactory();

// Create a client socket stream
$socket = $factory->createSocketStream( '127.0.0.1', 8888 );

// Create a server socket stream
$server = $factory->createServerStream( '127.0.0.1:8888' );

// Create a local file stream
$file = $factory->createFileStream( '/etc/hosts' );
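
For comparison, here is roughly what those three calls correspond to in plain PHP, going by the functions named above. This is a sketch and not the phpmio source: the ephemeral port (port 0) and the temporary file are assumptions made so the snippet runs on its own.

```php
<?php
// Server socket: port 0 asks the OS for any free port (demo assumption)
$server = stream_socket_server( 'tcp://127.0.0.1:0', $errno, $errstr );
stream_set_blocking( $server, 0 );

// Find out which port we were given so the client can connect to it
$address = stream_socket_get_name( $server, false );
list( $host, $port ) = explode( ':', $address );

// Client socket, with a 5 second connection timeout
$socket = fsockopen( $host, (int) $port, $errno, $errstr, 5 );
stream_set_blocking( $socket, 0 );

// Local file stream (a throwaway temporary file for the demo)
$path = tempnam( sys_get_temp_dir(), 'mio' );
$file = fopen( $path, 'r' );
stream_set_blocking( $file, 0 );

echo "Server listening on $address\n";

fclose( $file );
unlink( $path );
```

The factory saves us the repeated stream_set_blocking calls and hands back objects with a common interface instead of three raw resources.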

An MioStream object allows us to do the basic things we would want to do with a stream, such as reading and writing. Although MioStream only handles basic functions itself, it does give you access to its underlying stream resource in case you need to do anything more advanced. In the code below we write some data to the socket stream we made in the last example and then read some data from it. We then attempt to accept a new stream on the server socket stream (note that the accept method will return an MioStream object). On this new stream we check that it's open, close it and then check that it's no longer open. Finally, we get hold of the socket stream's internal resource and append a stream filter to it.


<?php
$socket->write( 'Put some data' );

$socket->read( 1024 );

$stream = $server->accept();

if( !$stream->isOpen() ) {
    trigger_error( "The new stream should be open", E_USER_ERROR );
}

$stream->close();

if( $stream->isOpen() ) {
    trigger_error( "The new stream should now be closed", E_USER_ERROR );
}

stream_filter_append( $socket->getStream(), 'string.toupper' );
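
To see what a stream filter such as string.toupper does once it is attached to a raw resource, here is a small standalone sketch. It uses an in-memory stream instead of a socket (an assumption for the demo) and attaches the filter to the write side only.

```php
<?php
// An in-memory stream stands in for the socket's underlying resource
$fp = fopen( 'php://memory', 'w+' );

// Everything written through the stream is now upper-cased
stream_filter_append( $fp, 'string.toupper', STREAM_FILTER_WRITE );

fwrite( $fp, 'Put some data' );

// Read back what actually ended up in the stream
rewind( $fp );
$contents = stream_get_contents( $fp );
fclose( $fp );

echo $contents . "\n"; // prints "PUT SOME DATA"
```

Because getStream hands back an ordinary PHP stream resource, anything from the streams API can be applied to it in the same way.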

Downloading The AMP Stack

Now that we know how to create MioStream objects and interact with them, let's take a look at how they are used with the MioSelector. A selector is an object used for managing and selecting streams which are available for different types of work (reading, writing or accepting connections). This is done by registering MioStream objects with the selector. The relationship between the selector and each stream is encapsulated in an MioSelectionKey object. When we register a stream with a selector we also say what we're interested in for this stream (reading, writing or accepting connections) and optionally provide an object which we want associated with the stream (so we know what to do with it). Once we have registered our streams with the selector we can call the select method to get all registered streams which are ready for any of the operations we are interested in. To get an idea of how this works let's take a look at a simple example, downloading three files.

In this example we open a stream to each remote file we want to download and one for each local file we want to write to. We then register each remote stream with the selector, attaching the stream's respective local stream for writing to. Once we have registered them we loop over the selector's select method. The select method returns the number of ready streams (streams which are available for one of the actions we have registered an interest in) or false if there are no streams registered with the selector. An important note to make here is that streams are automatically unregistered from the selector when they are closed, so in this case we don't have to explicitly unregister them. Now we can loop over all the streams which have been selected and perform our action on them. In this case we read from the remote stream and write the data to its associated local stream.


<?php
$selector = new MioSelector();
$factory = new MioStreamFactory();

// Create and register streams to download the PHP 5.2.1 source
$reader = $factory->createFileStream( 'http://uk.php.net/get/php-5.2.1.tar.bz2/from/this/mirror', 'r' );
$writer = $factory->createFileStream( 'php-5.2.1.tar.bz2', 'w+' );
$selector->register( $reader, MioSelectionKey::OP_READ, $writer );

// Create and register streams to download the MySQL 5.11.15 binary
$reader = $factory->createFileStream( 'http://dev.mysql.com/get/Downloads/MySQL-5.1/mysql-5.11.15-beta-linux-i686-glibc23.tar.gz/from/http://mirrors.dedipower.com/www.mysql.com/', 'r' );
$writer = $factory->createFileStream( 'mysql-5.11.15-beta-linux-i686-glibc23.tar.gz', 'w+' );
$selector->register( $reader, MioSelectionKey::OP_READ, $writer );

// Create and register streams to download the Apache 2.2.4 source
$reader = $factory->createFileStream( 'http://www.mirrorservice.org/sites/ftp.apache.org/httpd/httpd-2.2.4.tar.bz2', 'r' );
$writer = $factory->createFileStream( 'httpd-2.2.4.tar.bz2', 'w+' );
$selector->register( $reader, MioSelectionKey::OP_READ, $writer );

while( true ) {
    // Loop over select until we have some streams to act on
    while( !$count = $selector->select() ) {
        // false means there are no streams left registered,
        // so all the downloads have finished
        if( $count === false ) {
            $selector->close();
            break 2;
        }
    }

    // Loop over all streams which are available for
    // something we're interested in
    foreach( $selector->selected_keys as $key ) {
        if( $key->isReadable() ) {
            $key->attachment->write(
                $key->stream->read( 16384 )
            );
        }
    }
}
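
For anyone curious about what a selector has to do under the hood, here is a stripped-down sketch built directly on stream_select. TinySelector and everything in it are hypothetical names of my own for illustration; this is not how phpmio is implemented, just the general idea of registering streams with an interest and an attachment.

```php
<?php
// Hypothetical, minimal selector for illustration only
class TinySelector
{
    const OP_READ  = 1;
    const OP_WRITE = 2;

    private $keys = array();

    public function register( $stream, $ops, $attachment = null )
    {
        // The resource id makes a convenient array key
        $this->keys[(int) $stream] = array(
            'stream'     => $stream,
            'ops'        => $ops,
            'attachment' => $attachment,
        );
    }

    // Returns false if nothing is registered, otherwise an array of
    // the ready entries, each with a 'ready' bitmask added
    public function select( $timeout = 1 )
    {
        $read = $write = array();
        foreach( $this->keys as $id => $key ) {
            if( !is_resource( $key['stream'] ) ) {
                unset( $this->keys[$id] ); // forget closed streams
                continue;
            }
            if( $key['ops'] & self::OP_READ )  { $read[]  = $key['stream']; }
            if( $key['ops'] & self::OP_WRITE ) { $write[] = $key['stream']; }
        }
        if( !$read && !$write ) {
            return false;
        }

        $except = null;
        if( !stream_select( $read, $write, $except, $timeout ) ) {
            return array(); // timed out or failed, nothing ready
        }

        $ready = array();
        foreach( array( self::OP_READ => $read, self::OP_WRITE => $write ) as $op => $streams ) {
            foreach( $streams as $stream ) {
                $id = (int) $stream;
                if( !isset( $ready[$id] ) ) {
                    $ready[$id] = $this->keys[$id] + array( 'ready' => 0 );
                }
                $ready[$id]['ready'] |= $op;
            }
        }
        return $ready;
    }
}

// Usage: a local socket pair (Unix-like systems) stands in for a remote peer
list( $a, $b ) = stream_socket_pair( STREAM_PF_UNIX, STREAM_SOCK_STREAM, STREAM_IPPROTO_IP );
stream_set_blocking( $b, 0 );

$selector = new TinySelector();
$selector->register( $b, TinySelector::OP_READ, 'data from a' );

fwrite( $a, 'some bytes' );

$ready = $selector->select();
foreach( $ready as $entry ) {
    echo $entry['attachment'] . ': ' . fread( $entry['stream'], 1024 ) . "\n";
}
```

The attachment is what removes the bookkeeping burden: whatever we need to know about a stream travels with it and comes back out of select.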

Serving Up Echoes

I think we have a good understanding of how PHP MIO works now, so let's take a look at a server example. To keep it simple I'm going to write an echo server. This example will accept connections on port 7, read data in and then send it straight back. Bear in mind that binding to a port below 1024 usually requires root privileges on Unix-like systems. First off, we're going to need a class to encapsulate the echoing.


<?php
/**
 * A class to echo data.
 * This is essentially just a FIFO queue. Data can be
 * added onto the end of the buffer and at a later date
 * it can be read (and implicitly removed) from the
 * beginning of the buffer.
 */
class Echoer
{
    /**
     * Holds the data until it needs to
     * be echoed back
     */
    private $buffer = '';

    /**
     * Add some data to the buffer
     *
     * @param string $data
     * @return void
     */
    public function put( $data )
    {
        $this->buffer .= $data;
    }

    /**
     * Read and remove a chunk of data from
     * the start of the buffer
     *
     * @param int $size The amount of data to read
     * @return string
     */
    public function get( $size = 4096 )
    {
        // cast to string because substr() can return false rather
        // than '' on older PHP versions when nothing is left
        $data = (string) substr( $this->buffer, 0, $size );
        $this->buffer = (string) substr( $this->buffer, $size );
        return $data;
    }
}
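
A quick usage sketch shows the FIFO behaviour. The class is repeated here in compact form, with casts added so that an empty buffer always yields an empty string, purely so the snippet runs on its own.

```php
<?php
// Compact copy of the Echoer class so this snippet is self-contained
class Echoer
{
    private $buffer = '';

    public function put( $data )
    {
        $this->buffer .= $data;
    }

    public function get( $size = 4096 )
    {
        $data = (string) substr( $this->buffer, 0, $size );
        $this->buffer = (string) substr( $this->buffer, $size );
        return $data;
    }
}

$echoer = new Echoer();

// Data arrives in two separate reads...
$echoer->put( 'hello ' );
$echoer->put( 'world' );

// ...and leaves in whatever chunk sizes the writer can manage
$first  = $echoer->get( 4 ); // 'hell'
$second = $echoer->get( 4 ); // 'o wo'
$rest   = $echoer->get();    // 'rld'
$empty  = $echoer->get();    // '' once the buffer is drained

echo $first . $second . $rest . "\n"; // prints "hello world"
```

Decoupling the size of what was read from the size of what gets written matters with non-blocking streams, where neither side is guaranteed to move a whole message at once.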

Now we need to set up our server and get it working. What we're going to do is accept connections and register each one with the selector with an interest in reading. These streams will then appear in later selects, where we can read a chunk of data off, put it in the echoer and reset the selection key's interest to writing so that we can echo the data back down the line.


<?php
// Create our base objects
$selector = new MioSelector();
$factory = new MioStreamFactory();

// Register a server stream with the selector
$selector->register(
// the server stream is listening on 127.0.0.1 port 7
$factory->createServerStream( '127.0.0.1:7' ),
// we are interested in accepting connections
MioSelectionKey::OP_ACCEPT
);

// loop forever, this is going to be a server
while( true ) {
    // keep selecting until there's something to do
    while( !$count = $selector->select() ) { }

    // when there's something to do loop over the ready set
    foreach( $selector->selected_keys as $key ) {
        // do different actions for different ready ops
        if( $key->isAcceptable() ) {
            // if the stream has connections ready to
            // accept then accept them until there are no more
            while( $stream = $key->stream->accept() ) {
                // register the newly accepted connection with the
                // selector so that it is handled in subsequent operations
                $selector->register(
                    $stream,
                    // we are interested in reading from the stream
                    MioSelectionKey::OP_READ,
                    // attach an instance of the echoer to manage echoing
                    new Echoer()
                );
            }
        } elseif( $key->isReadable() ) {
            // if the stream is ready for reading then
            // read a chunk of data off it and add it to
            // the echoer
            $key->attachment->put(
                $key->stream->read( 4096 )
            );
            // now we're interested in writing back down the pipe
            $key->setInterestOps( MioSelectionKey::OP_WRITE );
        } elseif( $key->isWritable() ) {
            // if the stream is ready for writing then
            // get some data from the echoer
            $data = $key->attachment->get();
            if( $data ) {
                // if there's data there then send it back
                $key->stream->write(
                    $data
                );
            } else {
                // if there's none then remove the key
                $selector->removeKey( $key );
            }
        }
    }
}
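
To show the same accept, read and write cycle without the library, here is a self-contained sketch in plain PHP that runs a one-shot echo server and its client in a single process. The ephemeral port, the five second deadline and all variable names are assumptions for the demo. Note that a listening socket shows up as readable in stream_select when a connection is waiting to be accepted.

```php
<?php
// Listen on any free local port (port 0 lets the OS choose)
$server = stream_socket_server( 'tcp://127.0.0.1:0', $errno, $errstr );
if( !$server ) {
    die( "Could not listen: $errstr\n" );
}
stream_set_blocking( $server, 0 );

// Connect a client to it and send a line to be echoed
$client = stream_socket_client(
    'tcp://' . stream_socket_get_name( $server, false ), $errno, $errstr, 5
);
stream_set_blocking( $client, 0 );
fwrite( $client, "hello echo\n" );

$conn     = null; // server side of the connection, once accepted
$pending  = '';   // data read by the server but not yet echoed
$reply    = '';   // what the client gets back
$deadline = microtime( true ) + 5;

while( $reply === '' && microtime( true ) < $deadline ) {
    // Rebuild the interest sets on every pass
    $read   = array( $server, $client );
    $write  = array();
    $except = null;
    if( $conn ) {
        if( $pending === '' ) { $read[] = $conn; } else { $write[] = $conn; }
    }
    if( !stream_select( $read, $write, $except, 1 ) ) {
        continue;
    }

    foreach( $read as $stream ) {
        if( $stream === $server ) {
            // a readable server socket has a connection to accept
            $accepted = stream_socket_accept( $server, 0 );
            if( $accepted ) {
                stream_set_blocking( $accepted, 0 );
                $conn = $accepted;
            }
        } elseif( $stream === $conn ) {
            $pending .= fread( $conn, 4096 );
        } else {
            $reply .= fread( $client, 4096 );
        }
    }

    foreach( $write as $stream ) {
        // only drop what was actually written from the buffer
        $written = (int) fwrite( $stream, $pending );
        $pending = (string) substr( $pending, $written );
    }
}

echo "Client received: " . trim( $reply ) . "\n";
```

Compare the amount of state juggled here for a single connection with the MIO version above, where the selection key carries the stream, its interest and its Echoer for us.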

So, now we've built a multiplexed downloader and a multiplexed server, each handling several streams in a single process without blocking on any one of them. PHP may not be the first choice for writing high performance networking applications, but for knocking up, in a matter of minutes, something which performs pretty damned well, I think this could do the trick.

5 comments:

Unknown said...

Great class.

Interested in making a port to a big (meaning successful) french meta search engine ?
(paid job or in exchange of another service)

Contact me if you want malaiac / gmail . com

Adam Zivner said...

Cool class.

The only problem is that establishing connection using fsockopen() is blocking and you can't do anything about it.

But you can at least add some timeout so your program won't hang when trying to connect to unresponsive server.

Unknown said...

this class does not work :/

Rob Young said...

Hi exn e, I can assure you it does. What problems are you having with it? Maybe I can help.

Unknown said...

Rob,

Found your class while cruising the web looking for PHP code to do non-blocking IO.

Looks interesting, but I'll admit, I'm struggling to grok your classes.

Any chance you can ping me at bradh (at) konamoxt (dot) com

Thanks in advance

-Brad