PHPの開発で並列処理はほとんど使われていません。確かにシンプルにシングルスレッドで同期したプログラムを書きたくなるものの、並列処理を取り入れることでパフォーマンスを大きく改善できる可能性があります。
この記事ではpthreadsという拡張モジュールを使ってPHPでマルチスレッド処理をする方法を紹介します。必要な環境はZTS(Zend Thread Safety)バージョンのPHP 7.xとpthreads v3です。執筆時点では、PHP 7.1ユーザーはpthreadsリポジトリーのマスターブランチからインストールする必要があります。詳しくは『How to Install PHP Extensions from Source(サードパーティー拡張モジュールをソースからビルドする方法)』の「Installing a third party extension」を読んでください。
注釈:pthreads v2はPHP 5.x向けで、サポートが終了しています。pthreads v3はPHP 7.x向けで、精力的に開発が続けられています。
pthreadsを使うべきでないとき
本題に入る前に、pthreads拡張モジュールを使うべきでない(または使えない)場合について説明します。
pthreads v2では、Webサーバー環境(たとえばFastCGIプロセス)で使わないことが推奨されていました。pthreads v3では推奨から強制に変わり、Webサーバー環境では使えなくなりました。これには次の2つの理由があります。
- . Webサーバーでマルチスレッドを使うのは安全でない(特にI/Oの問題)
- スケールしない。たとえば、新しいスレッドを作成してタスクを処理するPHPスクリプトがあり、このスクリプトがリクエストごとに実行される、リクエストを受けるたびに新たなスレッドを1つ作成する1対1スレッドモデルとする。仮に毎秒1000のリクエストがれば、毎秒1000個のスレッドが作成されてしまう。これだけの数のスレッドを1つのマシンで実行するとたちまち処理が追いつかなくなり、リクエスト数が増えれば、さらにパフォーマンスは悪化する
以上がマルチスレッドがWeb環境に適さない理由です。もしI/Oをブロックするタスク(たとえば、HTTP request)をマルチスレッドで処理しようと考えているなら、非同期プログラミングをAmpのようなフレームワークでの実装を検討すべきです。SitePointにはこの分野のすぐれた記事があります。たとえば、『Writing Async Libraries – Let’s Convert HTML to PDF(非同期ライブラリーを書く)』や『Modding Minecraft with PHP – Buildings from Code!(PHPでMinecraft Modding)』です。
避けるべきことが分かったので、ここから本題に入ります。
単発タスクを処理する
単発のタスク(たとえば、I/Oバウンドなタスク)をマルチスレッドで処理したいときにはThreadクラスを使って新しいスレッドを作成し、作成した別スレッドでタスクを実行します。例を示します。
$task = new class extends Thread {
private $response;
public function run()
{
$content = file_get_contents("http://google.com");
preg_match("~<title>(.+)</title>~", $content, $matches);
$this->response = $matches[1];
}
};
$task->start() && $task->join();
var_dump($task->response); // string(6) "Google"
例では、新たに作成したスレッドでrunメソッドを実行しています。Thread::startを呼び出すと、新しいスレッドが作られてrunメソッドが実行されます。そしてThread::joinにより、追加したスレッドの処理が終わるのを待ってメインスレッドへ統合します。タスクの終了後に$task->responseに保存されている結果を出力できます。
スレッドに関係するロジック(例:runメソッドの定義)をクラスに押し込むのは好ましくありません。代わりにThreadedクラスを継承することで、クラスを分離して別のスレッドで実行できます。
class Task extends Threaded
{
public $response;
public function someWork()
{
$content = file_get_contents('http://google.com');
preg_match('~<title>(.+)</title>~', $content, $matches);
$this->response = $matches[1];
}
}
$task = new Task;
$thread = new class($task) extends Thread {
private $task;
public function __construct(Threaded $task)
{
$this->task = $task;
}
public function run()
{
$this->task->someWork();
}
};
$thread->start() && $thread->join();
var_dump($task->response);
別スレッドで実行するクラスは、なんらかの方法でThreadedクラスを継承する必要があります。別スレッドで安全に実行できることに加えて、便利なインターフェイス(リソース同期など)を使えるようになるからです。
それではpthreadsでアクセスできるクラスの構造を示します。
Threaded (implements Traversable, Collectable)
Thread
Worker
Volatile
Pool
ThreadとThreadedクラスの概要は説明したので、残りの3つ(Worker、Volatile、Pool)について取り上げます。
スレッドの再利用
並列化したいタスクの分だけ新しいスレッドを生み出すのは高コストです。pthreadsはシェアード・ナッシング・アーキテクチャに基づいてPHPでマルチスレッドを実装しているためです。すなわち、実行中のPHPのインタープリター(すべてのクラス、インターフェイス、トレイト、関数など)のインスタンスすべてがスレッドごとにコピーされます。これはパフォーマンスを少なからず低下させるので、スレッドの再利用を試みるべきです。スレッドを再利用する方法にはWorkerとPoolの2つがあります。
Workerクラスは別スレッドで一連のタスクを同期しながら実行するときに使用します。最初に新しいWorkerインスタンスを作成すると、このWorkerインスタンスが新しいスレッドを作成します。次にWorker::stackで別のスレッドにタスクをスタックします。
簡単な例を示します。
class Task extends Threaded
{
private $value;
public function __construct(int $i)
{
$this->value = $i;
}
public function run()
{
usleep(250000);
echo "Task: {$this->value}\n";
}
}
$worker = new Worker();
$worker->start();
for ($i = 0; $i < 15; ++$i) {
$worker->stack(new Task($i));
}
while ($worker->collect());
$worker->shutdown();
結果は次のようになります。
ここではWorker::stackを使って15個のタスクを新たに作成した$workerオブジェクトにスタックし、その順に処理しています。そしてタスク実行後にWorker::collectメソッドでクリーンナップしています。collectメソッドをwhileループ内に置いているので、スタックしたタスクの実行とクリーンナップが終わってからメインスレッドに戻りWorker::shutdownが呼び出されます。workerのshutdownが早すぎても(例:タスクの実行中)、タスクすべてが終わってからメインスレッドに戻りますが、タスクはガベージコレクトされないのでメモリーリークを引き起こします。
Workerクラスにはタスクスタックに関するメソッドがほかにもあります。たとえば、Worker::unstackは一番古いアイテムをスタックから削除し、Worker::getStackedはスタックに積まれているアイテム数を返します。Workerクラスのスタックには実行予定のタスクだけが積まれます。スタックに積んだタスクが実行されるとそのタスクは削除され、別の内部スタックに移動した後にWorker::collectによりガーベッジコレクトされます。
多数のタスクを実行するときに、Poolクラスのスレッドプールを使ってもスレッドを再利用できます。スレッドプールは複数のWorkerを使ってタスクを同時に実行します。同時実行数(スレッドプールを実行するスレッドの数)はスレッドプールの作成時に指定します。
先ほどの例をスレッドプールを使って実装します。
class Task extends Threaded
{
private $value;
public function __construct(int $i)
{
$this->value = $i;
}
public function run()
{
usleep(250000);
echo "Task: {$this->value}\n";
}
}
$pool = new Pool(4);
for ($i = 0; $i < 15; ++$i) {
$pool->submit(new Task($i));
}
while ($pool->collect());
$pool->shutdown();
結果は次のようになります。
workerの代わりにpoolを使うには、いくつか修正が必要です。まずpoolは、わざわざ開始の指示をしなくても実行可能な状態になればタスクを実行し始めます。次にタスクはスタックに積むのではなく、poolにsubmitします。さらに、PoolクラスはThreadedクラスを継承する必要はなく、Workerのようにほかのスレッドに渡されることはありません。
完了したタスクをcollectしたあとで手動でshutdownするのはworkerとpoolの両方に共通する作法です。Threadクラスが作成したスレッドは、メインスレッドに統合します。
pthreadsとイミュータブル
取り上げる最後のクラスはVolatileで、pthreads v3で追加されました。イミュータブルはpthreadsでパフォーマンスを大幅に低下させないための大切なコンセプトです。Threadedクラスのプロパティは、自身がThreadedオブジェクトでありデフォルトでイミュータブルなので、最初に設定した値から変更できません。これらのプロパティをミュータブルにするには新しいVolatileクラスを使って明示的に指定することが好まれます。
次の例を使いながら、新しいイミュータブルの制約を説明します。
class Task extends Threaded // a Threaded class
{
public function __construct()
{
$this->data = new Threaded();
// $this->data is not overwritable, since it is a Threaded property of a Threaded class
}
}
$task = new class(new Task()) extends Thread { // a Threaded class, since Thread extends Threaded
public function __construct($tm)
{
$this->threadedMember = $tm;
var_dump($this->threadedMember->data); // object(Threaded)#3 (0) {}
$this->threadedMember = new StdClass(); // invalid, since the property is a Threaded member of a Threaded class
}
};
一方、VolatileクラスのThreadedプロパティはミュータブルです。
class Task extends Volatile
{
public function __construct()
{
$this->data = new Threaded();
$this->data = new StdClass(); // valid, since we are in a volatile class
}
}
$task = new class(new Task()) extends Thread {
public function __construct($vm)
{
$this->volatileMember = $vm;
var_dump($this->volatileMember->data); // object(stdClass)#4 (0) {}
// still invalid, since Volatile extends Threaded, so the property is still a Threaded member of a Threaded class
$this->volatileMember = new StdClass();
}
};
ここでは、親クラスのThreadedから受け継いだイミュータブルの制約をVolatileクラスがオーバーライドして、Threadedプロパティを再代入可能にしています(unset()も同様)。
ミュータブルとVolatileクラスに関して重要なことがあと1つあります。それは配列で、pthreadsにおいてはThreadedクラスのプロパティに代入されたときに自動的にVolatileオブジェクトに変換されます。PHPでは複数のコンテキストから配列を操作することは安全ではないためです。
それでは次の例を使って説明します。
$array = [1,2,3];
$task = new class($array) extends Thread {
private $data;
public function __construct(array $array)
{
$this->data = $array;
}
public function run()
{
$this->data[3] = 4;
$this->data[] = 5;
print_r($this->data);
}
};
$task->start() && $task->join();
/* Output:
Volatile Object
(
[0] => 1
[1] => 2
[2] => 3
[3] => 4
[4] => 5
)
*/
例のように、Volatileオブジェクトにはsubsetオペレーター([])による配列の機能があるので、配列と同じように取り扱えます。しかしVolatileクラスはarray_popやarray_shiftなどの標準的な配列の関数をサポートしていません。代わりにThreadedクラスにはそのようなメソッドが組み込まれています。
例を示します。
$data = new class extends Volatile {
public $a = 1;
public $b = 2;
public $c = 3;
};
var_dump($data);
var_dump($data->pop());
var_dump($data->shift());
var_dump($data);
/* Output:
object(class@anonymous)#1 (3) {
["a"]=> int(1)
["b"]=> int(2)
["c"]=> int(3)
}
int(3)
int(1)
object(class@anonymous)#1 (1) {
["b"]=> int(2)
}
*/
ほかにもThreaded::chunkやThreaded::mergeなどのメソッドがサポートされています。
同期
最後にpthreadsの同期について説明します。同期とは共有しているリソースへのアクセスをコントロールする手法です。
例として、同期をとらずにカウンターを実装します。
$counter = new class extends Thread {
public $i = 0;
public function run()
{
for ($i = 0; $i < 10; ++$i) {
++$this->i;
}
}
};
$counter->start();
for ($i = 0; $i < 10; ++$i) {
++$counter->i;
}
$counter->join();
var_dump($counter->i); // outputs a number from between 10 and 20
同期をとっていないので、実行するたびに結果が変わります。複数のスレッドからのアクセスをコントロールしないまま1つの変数を書き換えているので、更新した内容が失われているためです。
同期をとることで、正しいアウトプットである20を得られるようにします。
$counter = new class extends Thread {
public $i = 0;
public function run()
{
$this->synchronized(function () {
for ($i = 0; $i < 10; ++$i) {
++$this->i;
}
});
}
};
$counter->start();
$counter->synchronized(function ($counter) {
for ($i = 0; $i < 10; ++$i) {
++$counter->i;
}
}, $counter);
$counter->join();
var_dump($counter->i); // int(20)
同期をとっているコードのブロック同士は、Threaded::waitやThreaded::notify(ほかにもThreaded::notifyAll)を使って連携します。
同期中の2つのforループが交互に加算する例を示します。
$counter = new class extends Thread {
public $cond = 1;
public function run()
{
$this->synchronized(function () {
for ($i = 0; $i < 10; ++$i) {
var_dump($i);
$this->notify();
if ($this->cond === 1) {
$this->cond = 2;
$this->wait();
}
}
});
}
};
$counter->start();
$counter->synchronized(function ($counter) {
if ($counter->cond !== 2) {
$counter->wait(); // wait for the other to start first
}
for ($i = 10; $i < 20; ++$i) {
var_dump($i);
$counter->notify();
if ($counter->cond === 2) {
$counter->cond = 1;
$counter->wait();
}
}
}, $counter);
$counter->join();
/* Output:
int(0)
int(10)
int(1)
int(11)
int(2)
int(12)
int(3)
int(13)
int(4)
int(14)
int(5)
int(15)
int(6)
int(16)
int(7)
int(17)
int(8)
int(18)
int(9)
int(19)
*/
Threaded::waitを呼び出すところに条件を追加しました。この条件があるので、通知を受領し、かつ条件がtrueのときに同期をとってコールバックを再開できます。Threaded::notify以外の場所から通知が送られる可能性があるため、この条件は欠かせません。指定した条件を伴わずにThreaded::waitを呼び出すと、スプリアスウェイクアップ問題を起こして予測できない動きをする可能性があります。
最後に
この記事ではpthreadsの5つのクラス(Threaded、Thread、Worker、Volatile、Pool)と使い方を説明しました。またpthreadsの新しいイミュータブルのコンセプトとサポートされている同期の機能についても取り上げました。基本的なことを理解すれば実際のケースにpthreadsを利用できます。
※この記事を校閲し、助言をしてくれたJoe Watkins(pthreads拡張モジュールの開発者)に感謝します!
※本記事はChristopher Pittが査読を担当しています。最高のコンテンツに仕上げるために尽力してくれたSitePointの査読担当者のみなさんに感謝します。
(原文:Parallel Programming with Pthreads in PHP – the Fundamentals)
[翻訳:内藤夏樹/編集:Livit]