The Pool class

(PECL pthreads >= 2.0.0)

Introduction

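A Pool object is a container for multiple Worker objects and also acts as their controller.

Pooling provides a higher-level abstraction of the Worker functionality, including the management of references in the way required by pthreads.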

Class synopsis

class Pool {
/* Properties */
protected $size;
protected $class;
protected $workers;
protected $ctor;
protected $last;
/* Methods */
public __construct(int $size, string $class = ?, array $ctor = ?)
public collect(Callable $collector = ?): int
public resize(int $size): void
public shutdown(): void
public submit(Threaded $task): int
public submitTo(int $worker, Threaded $task): int
}

Properties

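size
Maximum number of Worker objects this Pool can hold
class
The class of the Worker
workers
References to the Worker objects
ctor
The arguments passed to the constructor of new Worker objects
last
Offset within the pool of the last Worker used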
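
To make the roles of size, workers, and last more concrete, here is a minimal single-threaded sketch in plain PHP. It assumes that tasks are handed to worker slots in round-robin order, which is an assumption for illustration and not something this page states. PoolModel and its queues() method are hypothetical names; this is not the pthreads Pool class and it starts no threads.

```php
<?php
// Simplified, single-threaded model of how a pool might pick a worker slot.
// PoolModel is hypothetical; it only mirrors the size/workers/last properties.
final class PoolModel
{
    private int $size;            // maximum number of worker slots
    private array $workers = [];  // slot index => list of queued task labels
    private int $last = 0;        // offset of the next slot to use

    public function __construct(int $size)
    {
        if ($size < 1) {
            throw new InvalidArgumentException('size must be at least 1');
        }
        $this->size = $size;
    }

    // Queue a task label on the next slot and return that slot's index.
    // Assumption: slots are visited in round-robin order.
    public function submit(string $task): int
    {
        if ($this->last >= $this->size) {
            $this->last = 0; // wrap around to the first slot
        }
        $slot = $this->last++;
        $this->workers[$slot][] = $task; // the slot is created on first use
        return $slot;
    }

    // Expose the queued task labels per slot for inspection.
    public function queues(): array
    {
        return $this->workers;
    }
}

$pool = new PoolModel(3);
$slots = [];
foreach (['a', 'b', 'c', 'd', 'e'] as $task) {
    $slots[] = $pool->submit($task);
}
echo implode(',', $slots), PHP_EOL; // prints 0,1,2,0,1
```

In this model the value returned by submit() is the slot index that received the task, so five tasks on a pool of size 3 wrap around after the third one.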

User Contributed Notes 3 notes

meadowsjared at gmail dot com
10 years ago
When using the collect function, extend the Pool class so that you can keep checking for finished threads until they are all done.

<?php
class TestWork extends Threaded {
    protected $complete;
    // $pData is the data sent to your worker thread to do its job.
    public function __construct($pData){
        // store the input data and the completion flag as properties
        $this->complete = false;
        $this->testData = $pData;
    }
    //This is where all of your work will be done.
    public function run(){
        usleep(2000000); //sleep 2 seconds to simulate a large job
        $this->complete = true;
    }
    public function isGarbage() {
        return $this->complete;
    }
}
class ExamplePool extends Pool
{
    public $data = array();
    public function process()
    {
        // Run this loop as long as we have jobs in the pool.
        // Note: this relies on a $work property, which is not listed
        // in the class synopsis above.
        while (count($this->work)) {
            $this->collect(function (TestWork $task) {
                // If a task was marked as done
                // collect its results
                if ($task->isGarbage()) {
                    $tmpObj = new stdClass();
                    $tmpObj->complete = $task->complete;
                    // this is how you get your completed data back out [returned by $pool->process()]
                    $this->data[] = $tmpObj;
                }
                return $task->isGarbage();
            });
        }
        // All jobs are done
        // we can shutdown the pool
        $this->shutdown();
        return $this->data;
    }
}
$pool = new ExamplePool(3);
$testData = 'asdf';
for($i=0;$i<5;$i++) {
    $pool->submit(new TestWork($testData));
}
$retArr = $pool->process(); //get all of the results
echo '<pre>';
print_r($retArr); // print the array of results (and maybe errors)
echo '</pre>';
?>
meadowsjared at gmail dot com
5 years ago
This example shows how to use a pool to get an array of results, using pthreads v3.2.1 and PHP 7.3.23.

<?php
class TestWork extends Threaded {
    // updated version that works with pthreads v3.2.1 and PHP 7.3.23
    protected $complete;
    // $pData is the data sent to your worker thread to do its job.
    public function __construct($pData) {
        // store the input data and the completion flag as properties
        $this->complete = false;
        $this->testData = $pData;
    }
    //This is where all of your work will be done.
    public function run() {
        usleep(2000000); //sleep 2 seconds to simulate a large job
        $this->complete = true;
    }
    public function isDone() {
        return $this->complete;
    }
}
class ExamplePool extends Pool {
    public $data = array(); // used to return data after we're done
    private $numTasks = 0; // counter used to know when we're done
    /**
     * Override the submit function from the parent
     * to keep track of our jobs.
     */
    public function submit(Threaded $task) {
        $this->numTasks++;
        // pass the parent's return value through to the caller
        return parent::submit($task);
    }
    /**
     * used to wait until all workers are done
     */
    public function process() {
        // Run this loop until a result has been
        // collected for every submitted task
        while (count($this->data) < $this->numTasks) {
            $this->collect(function (TestWork $task) {
                // If a task was marked as done, collect its results
                if ($task->isDone()) {
                    $tmpObj = new stdClass();
                    $tmpObj->complete = $task->complete;
                    // this is how you get your completed data back out [returned by $pool->process()]
                    $this->data[] = $tmpObj;
                }
                return $task->isDone();
            });
        }
        // All jobs are done
        // we can shutdown the pool
        $this->shutdown();
        return $this->data;
    }
}
$pool = new ExamplePool(3);
$testData = 'asdf';
for($i=0;$i<5;$i++) {
    $pool->submit(new TestWork($testData));
}
$retArr = $pool->process(); //get all of the results
echo '<pre>';
print_r($retArr); // print the array of results (and maybe errors)
echo '</pre>';
?>
olavk
10 years ago
A simple example with Collectable (basically a Thread meant for a Pool) and Pool:

<?php

class job extends Collectable {
  public $val;

  public function __construct($val){
    // init some properties
    $this->val = $val;
  }
  public function run(){
    // do some work: append 20 bytes of the response, starting at offset 3
    $this->val = $this->val . file_get_contents('http://www.example.com/', false, null, 3, 20);
    $this->setGarbage();
  }
}

// At most 3 threads will work at once
$p = new Pool(3);

$tasks = array(
  new job('0'),
  new job('1'),
  new job('2'),
  new job('3'),
  new job('4'),
  new job('5'),
  new job('6'),
  new job('7'),
  new job('8'),
  new job('9'),
  new job('10'),
);
// Add tasks to pool queue
foreach ($tasks as $task) {
  $p->submit($task);
}

// shutdown will wait for current queue to be completed
$p->shutdown();
// garbage collection check / read results
$p->collect(function($checkingTask){
  echo $checkingTask->val;
  return $checkingTask->isGarbage();
});

?>