PHPでTokyo Cabinetハッシュデータベースサーバを作ってみた

Tokyo Cabinetのデータベースは書き込みモードで開くと排他ロックがかかるので、Apache2 (prefork)+mod_phpCGIのようなリクエストごとにPHPのプロセスが分かれているWebアプリケーションでTokyo Cabinetを読み書き両用で使うのは苦しいものがあります。(コードの書き方を工夫すればロックの影響を小さくできる場合もあります)
そこで、PHPでHTTPベースのハッシュデータベースサーバを作ってみました。このサーバはPHP 5.2、tokyocabinet拡張モジュール、PECLのhttp拡張モジュールを使います。ソケットの操作にはPHP組み込みのストリーム関数を用い、sockets拡張モジュールは使いません。

<?php
class TCHDBServer
{
    static private $statuses = array(
        100 => 'Continue',
        101 => 'Switching Protocols',
        200 => 'OK',
        201 => 'Created',
        202 => 'Accepted',
        203 => 'Non-Authoritative Information',
        204 => 'No Content',
        205 => 'Reset Content',
        206 => 'Partial Content',
        300 => 'Multiple Choices',
        301 => 'Moved Permanently',
        302 => 'Moved Temporarily',
        303 => 'See Other',
        304 => 'Not Modified',
        305 => 'Use Proxy',
        400 => 'Bad Request',
        401 => 'Unauthorized',
        402 => 'Payment Required',
        403 => 'Forbidden',
        404 => 'Not Found',
        405 => 'Method Not Allowed',
        406 => 'Not Acceptable',
        407 => 'Proxy Authentication Required',
        408 => 'Request Time-out',
        409 => 'Conflict',
        410 => 'Gone',
        411 => 'Length Required',
        412 => 'Precondition Failed',
        413 => 'Request Entity Too Large',
        414 => 'Request-URI Too Large',
        415 => 'Unsupported Media Type',
        500 => 'Internal Server Error',
        501 => 'Not Implemented',
        502 => 'Bad Gateway',
        503 => 'Service Unavailable',
        504 => 'Gateway Time-out',
        505 => 'HTTP Version not supported',
    );

    private $hdb;
    private $sock;
    private $conn;

    public function __construct(TCHDB $hdb, $host = 'localhost', $port = 8080)
    {
        $this->hdb = $hdb;

        if (strpos($host, '::') !== false) {
            $local_host = sprintf('tcp://[%s]:%d', $host, $port);
        } else {
            $local_host = sprintf('tcp://%s:%d', $host, $port);
        }

        $this->sock = stream_socket_server($local_host, $errno, $errstr);
        if (!$this->sock) {
            throw new RuntimeException($errstr, $errno);
        }
    }

    public function run()
    {
        while ($this->conn = stream_socket_accept($this->sock, -1)) {
            try {
                stream_set_timeout($this->conn, 5);

                $headers_string = '';
                while (!feof($this->conn) && ($line = fgets($this->conn)) != "\r\n") {
                    $headers_string .= $line;
                }

                $headers = HttpUtil::parseHeaders($headers_string);

                if (!$headers || !isset($headers['Request Method']) || !isset($headers['Request Url'])) {
                    $this->raiseError(400);
                    fclose($this->conn);
                    continue;
                }

                $key = ltrim($headers['Request Url'], '/');
                if (strlen($key) == 0) {
                    $this->raiseError(400);
                    fclose($this->conn);
                    continue;
                }
                $key = rawurldecode($key);

                switch ($headers['Request Method']) {
                  case 'GET':
                    $this->handleGet($key);
                    break;
                  case 'PUT':
                    $this->handlePut($key, $headers);
                    break;
                  case 'DELETE':
                    $this->handleDelete($key);
                    break;
                  default:
                    $this->raiseError(405);
                }
            } catch (Exception $e) {
                $this->raiseError(500, $e->getMessage());
            }
            fclose($this->conn);
        }
    }

    private function handleGet($key)
    {
        if (($data = $this->hdb->get($key)) === null) {
            $this->raiseError(404, "'$key' not found.");
        } else {
            $this->returnResponse(200, 'application/octet-stream', $data);
        }
    }

    private function handlePut($key, $headers)
    {
        $length = -1;

        foreach ($headers as $name => $value) {
            if (!strcasecmp('Content-Length', $name)) {
                if (is_array($value)) {
                    $length = (int)array_shift($value);
                } else {
                    $length = (int)$value;
                }
                break;
            }
        }

        if ($length < 0) {
            $this->raiseError(411);
            return;
        }

        $data = stream_get_contents($this->conn, $length);

        $info = stream_get_meta_data($this->conn);
        if ($data === false || $info['timed_out']) {
            $this->raiseError(408);
            return;
        }

        $this->hdb->put($key, $data);

        $this->returnResponse(200, 'text/plain', 'OK');
    }

    private function handleDelete($key)
    {
        $this->hdb->out($key);

        $this->returnResponse(200, 'text/plain', 'OK');
    }

    private function raiseError($code, $body = null)
    {
        $msg = new HttpMessage();
        $msg->setType(HttpMessage::TYPE_RESPONSE);
        $msg->setResponseCode($code);
        if (array_key_exists($code, self::$statuses)) {
            $msg->setResponseStatus(self::$statuses[$code]);
            if ($body === null) {
                $body = self::$statuses[$code];
            }
        }
        $headers = array('Content-Type' => 'text/plain');
        if ($body !== null) {
            $headers['Content-Length'] = (string)strlen($body);
            $msg->setHeaders($headers);
            $msg->setBody($body);
        } else {
            $headers['Content-Length'] = (string)strlen('ERROR');
            $msg->setHeaders($headers);
            $msg->setBody('ERROR');
        }
        fwrite($this->conn, $msg->toString());
    }

    private function returnResponse($code, $mimetype = 'text/plain', $body = null)
    {
        $msg = new HttpMessage();
        $msg->setType(HttpMessage::TYPE_RESPONSE);
        $msg->setResponseCode(200);
        $msg->setResponseStatus('OK');
        $msg->setHeaders(array('Content-Type' => (string)$mimetype));
        if ($body !== null) {
            $msg->addHeaders(array('Content-Length' => (string)strlen($body)));
            $msg->setBody($body);
        }
        fwrite($this->conn, $msg->toString());
    }
}

if (PHP_SAPI == 'cli' && realpath($_SERVER['SCRIPT_FILENAME']) == __FILE__) {
    $hdb = new TCHDB();
    $hdb->open('/tmp/server-test.hdb', TCHDB::OWRITER | TCHDB::OCREAT);
    $server = new TCHDBServer($hdb);
    $server->run();
}

コマンドラインで以下のような内容のPHPスクリプトを走らせれば準備完了です。

<?php
require_once 'TCHDBServer.php';

// ハッシュデータベースにライタとして接続する
$hdb = new TCHDB();
$hdb->open('casket.hdb', TCHDB::OWRITER | TCHDB::OCREAT);

// サーバのインスタンスを生成する (デフォルトではlocalhostのTCPポート8080番をlistenする)
$server = new TCHDBServer($hdb);
// サーバを開始する
$server->run();

クライアントからリクエストされたURLがキーとなり、GETメソッドで取得、PUTメソッドで挿入/更新、DELETEメソッドで削除します。
pecl_httpを使った例ではこうなります。

<?php
$base_url = 'http://localhost:8080/';
$key = 'hoge';
$url = $base_url . $key;
$data = 'Tokyo Cabinet';

$req = new HttpRequest($url, HttpRequest::METH_GET);
$res = $req->send();
var_dump($res->getResponseCode(), $res->getBody());

$req = new HttpRequest($url, HttpRequest::METH_PUT);
$req->setPutData($data);
$res = $req->send();
var_dump($res->getResponseCode(), $res->getBody());

$req = new HttpRequest($url, HttpRequest::METH_GET);
$res = $req->send();
var_dump($res->getResponseCode(), $res->getBody());

$req = new HttpRequest($url, HttpRequest::METH_DELETE);
$res = $req->send();
var_dump($res->getResponseCode(), $res->getBody());

$req = new HttpRequest($url, HttpRequest::METH_GET);
$res = $req->send();
var_dump($res->getResponseCode(), $res->getBody());

結果:

int(404)
string(17) "'hoge' not found."
int(200)
string(2) "OK"
int(200)
string(13) "Tokyo Cabinet"
int(200)
string(2) "OK"
int(404)
string(17) "'hoge' not found."

HTTPベースなのでTokyo Cabinetがインストールされていないクライアントからでも簡単にアクセスできます。
・・・が、こちらの環境 (Mac OS X 10.4.11 (intel), PHP 5.2.5) では、handlePutメソッドの$data = stream_get_contents($this->conn, $length);で1-2秒ほど、妙に時間がかかっていて実用には厳しい感じです。他のOSではどうなんでしょうか?