International PHP Conference Munich 2025

La clase parallel\Channel

(0.9.0)

canales no tamponados

Un canal no tamponado bloqueará las llamadas a parallel\Channel::send() hasta que haya un receptor, y bloqueará las llamadas a parallel\Channel::recv() hasta que haya un emisor. Esto significa que un canal no tamponado no es solo un medio para compartir datos entre las tareas sino también un método simple de sincronización.

Un canal no tamponado es la forma más rápida de compartir datos entre las tareas, requiriendo la menor cantidad de copias.

canales tamponados

Un canal tamponado no bloqueará las llamadas a parallel\Channel::send() hasta que se alcance la capacidad; las llamadas a parallel\Channel::recv() se bloquearán hasta que haya datos en el búfer.

Cierres en los canales

Una funcionalidad poderosa de los canales paralelos es que permiten el intercambio de cierres entre las tareas (y los entornos de ejecución).

Cuando un cierre es enviado a través de un canal, el cierre es tamponado. Esto no cambia el búfer del canal que transmite el cierre, pero sí afecta al ámbito estático dentro del cierre: el mismo cierre enviado a diferentes entornos de ejecución, o al mismo entorno de ejecución, no compartirá su ámbito estático.

Esto significa que cada vez que un cierre es ejecutado y ha sido transmitido por un canal, el estado estático será el mismo que cuando el cierre fue tamponado.

canales anónimos

La construcción de canales anónimos permite al desarrollador evitar asignar nombres a cada canal: parallel generará un nombre único para los canales anónimos.

Sinopsis de la Clase

final class parallel\Channel {
/* Constructores anónimos */
public __construct()
public __construct(int $capacity)
/* Acceso */
public make(string $name): Channel
public make(string $name, int $capacity): Channel
public open(string $name): Channel
/* Compartir */
public recv(): mixed
public send(mixed $value): void
/* Cerrar */
public close(): void
/* Constantes para el tamponamiento infinito */
const Infinite;
}

Tabla de contenidos

add a note

User Contributed Notes 5 notes

up
4
hdvianna
5 years ago
This is an example of using a channel to produce data for consumers. In this example, the producer Runtime instance will send the time, in seconds, for which the consumers shall sleep.

<?php

use parallel\{Runtime, Channel};

// Script entry point: hand the command-line arguments to main().
main($argv);

/**
 * Command-line entry point: validates the arguments, runs the tasks and
 * prints the elapsed wall-clock time.
 *
 * Exits with status 1 (after printing a message) when the argument count is
 * wrong or when either argument is not an integer greater than zero.
 *
 * @param array $argv Raw CLI arguments: script name, number of tasks,
 *                    maximum sleep time in seconds.
 */
function main(array $argv)
{
    if (count($argv) !== 3) {
        echo "Type: hello-parallel.php <number-of-tasks> <maximum-time-of-sleep (in seconds)>" . PHP_EOL;
        echo "Example: hello-parallel.php 5 3" . PHP_EOL;
        // Wrong usage is an error, so report it through a non-zero exit status.
        exit(1);
    }

    $numberOfTasks = intval($argv[1]);
    $maximumTimeOfSleep = intval($argv[2]);

    // intval() turns non-numeric input into 0, and a task count or a sleep
    // time below 1 is meaningless, so reject both cases up front.
    if ($numberOfTasks < 1 || $maximumTimeOfSleep < 1) {
        echo "Both <number-of-tasks> and <maximum-time-of-sleep> must be integers greater than zero." . PHP_EOL;
        exit(1);
    }

    $t1 = microtime(true);
    parallelize($numberOfTasks, $maximumTimeOfSleep);
    $endTime = microtime(true) - $t1;
    echo PHP_EOL . "Finished $numberOfTasks task(s) in {$endTime}s" . PHP_EOL;
}

/**
 * Starts one producer runtime and $numberOfTasks consumer runtimes that
 * communicate over a single channel created without a capacity, then
 * collects the results of all of them.
 *
 * The producer sends one random sleep time per task; each consumer receives
 * one value, sleeps for that many seconds and returns it.
 *
 * @param int $numberOfTasks      Number of consumer tasks; nothing is started when below 1.
 * @param int $maximumTimeOfSleep Upper bound in seconds for each sleep; values below 1 are treated as 1.
 */
function parallelize(int $numberOfTasks, int $maximumTimeOfSleep)
{
    // range() counts downwards when the end is below the start
    // (range(0, -1) is [0, -1]), so deriving the task list from
    // range(0, $numberOfTasks - 1) would start tasks for a count of zero.
    if ($numberOfTasks < 1) {
        return;
    }

    // Keep rand(1, ...) on a valid, non-inverted interval.
    $maximumTimeOfSleep = max(1, $maximumTimeOfSleep);

    $channel = new Channel();

    // One unique id and one sleep time per task.
    $taskIds = [];
    $timesToSleep = [];
    for ($i = 0; $i < $numberOfTasks; $i++) {
        $taskIds[] = uniqid("task::");
        $timesToSleep[] = rand(1, $maximumTimeOfSleep);
    }

    $producer = new Runtime();
    $producerFuture = $producer->run(function (Channel $channel, array $timesToSleep) {
        foreach ($timesToSleep as $timeToSleep) {
            $channel->send($timeToSleep);
        }
    }, [$channel, $timesToSleep]);

    // Every consumer gets its own runtime and takes exactly one value.
    $consumerFutures = array_map(function (string $id) use ($channel) {
        $runtime = new Runtime();

        return $runtime->run(function (string $id, Channel $channel) {
            $timeToSleep = $channel->recv();
            echo "Hello from $id. I will sleep for $timeToSleep second(s)." . PHP_EOL;
            sleep($timeToSleep);
            echo "$id slept for $timeToSleep second(s)." . PHP_EOL;

            return $timeToSleep;
        }, [$id, $channel]);
    }, $taskIds);

    wait($consumerFutures);
    wait([$producerFuture]);
}

/**
 * Collects the value of every given future by calling value() on each one.
 *
 * @param array $futures Objects exposing a value() method.
 *
 * @return array The values, stored under the same keys as in $futures.
 */
function wait(array $futures)
{
    $values = [];
    foreach ($futures as $key => $future) {
        $values[$key] = $future->value();
    }

    return $values;
}
up
2
rustysun
5 years ago
An example using an unbuffered channel.
<?php

use parallel\{Channel,Runtime};

// Task body: adds up every element of $a and sends the total through $ch.
$sum = function (array $a, Channel $ch) {
    $total = 0;
    foreach ($a as $value) {
        $total = $total + $value;
    }
    $ch->send($total);
};
// Sums the two halves of $a in a runtime, receives both partial sums over a
// channel created without a capacity and prints them with their total.
try {
    $a = [7, 2, 8, 1, 4, 0, 9, 10];
    // Split point between the two halves. It has to be defined here:
    // with an undefined $num both array_slice() calls return the whole array.
    $num = intdiv(count($a), 2);
    //unbuffered channel
    $runtime = new Runtime;
    $ch2 = new Channel;
    $runtime->run($sum, [array_slice($a, 0, $num), $ch2]);
    $runtime->run($sum, [array_slice($a, $num), $ch2]);
    //receive from channel
    $x = $ch2->recv();
    $y = $ch2->recv();
    $ch2->close();
    echo "\nch2:", $x, "\t", $y, "\t", $x + $y, "\n";
} catch (Error $err) {
    echo "\nError:", $err->getMessage();
} catch (Exception $e) {
    echo "\nException:", $e->getMessage();
}

//output
//ch2:18 23 41
up
2
gam6itko
3 years ago
<?php

// a deliberately weird way to calculate a factorial ^_^
// we create one thread per multiplier and synchronize them with a buffered channel
// in fact only one thread is executing at a time

use parallel\{Channel, Future, Runtime};

// Print the factorial of every integer from 0 to 10, one per line.
for ($n = 0; $n < 11; ++$n) {
    echo "!$n = ", factorial($n), PHP_EOL;
}

/**
 * Computes $n! by starting one runtime per multiplier from 2 up to $n.
 *
 * Every task receives the running product from the shared channel,
 * multiplies it by its own factor and sends the result back. When $n is
 * below 2 no task is started and the seed value 1 is returned.
 */
function factorial(int $n): int
{
    // buffered channel with a single slot: it carries the running product
    $channel = new Channel(1);

    // One multiplication step, executed inside each runtime.
    $multiplyStep = static function (Channel $channel, $multiplier): void {
        $f = $channel->recv();
        $channel->send($f * $multiplier);
    };

    $futureList = [];
    $i = 2;
    while ($i <= $n) {
        $runtime = new Runtime();
        $futureList[] = $runtime->run($multiplyStep, [$channel, $i]);
        $i++;
    }

    // Seed the chain with the neutral element of multiplication.
    $channel->send(1);

    // Folds the "done" flags of all futures into a single boolean.
    $stillAllDone = function (bool $c, Future $future): bool {
        return $c && $future->done();
    };

    // Poll until every task has finished.
    while (!array_reduce($futureList, $stillAllDone, true)) {
        // keep polling
    }

    return $channel->recv();
}

// output:
// !0 = 1
// !1 = 1
// !2 = 2
// !3 = 6
// !4 = 24
// !5 = 120
// !6 = 720
// !7 = 5040
// !8 = 40320
// !9 = 362880
// !10 = 3628800
up
1
rustysun
5 years ago
<?php
use parallel\Channel;

/**
 * Adds up every element of $a and sends the total through $ch.
 *
 * @param array   $a  Values to add.
 * @param Channel $ch Channel that receives the total.
 */
function sum(array $a, Channel $ch)
{
    $total = 0;
    foreach ($a as $value) {
        $total = $total + $value;
    }
    $ch->send($total);
}

// Sums the two halves of $a, passes both partial sums through a named
// channel with capacity 2 and prints them together with their total.
try {
    $a = [7, 2, 8, 1, 4, 0, 9, 10];
    $ch1 = Channel::make('sum', 2);
    // Integer division keeps the split point an int even for odd-sized arrays.
    $num = intdiv(count($a), 2);
    sum(array_slice($a, 0, $num), $ch1);
    sum(array_slice($a, $num), $ch1);

    //receive from channel
    $x = $ch1->recv();
    $y = $ch1->recv();
    $ch1->close();
    echo "\nch1:", $x, "\t", $y, "\t", $x + $y, "\n";
} catch (Error $err) {
    echo "\nError:", $err->getMessage();
} catch (Exception $e) {
    echo "\nException:", $e->getMessage();
}
up
0
thierry at pielo dot net
9 months ago
<?php

/**
* Bzz reloaded!
* Run two simple tasks in parallel and synchronize them with a channel
*
* parallel\Channel(int $capacity): Buffered channel
* Creates a buffered channel for communication between tasks
* @ref https://www.php.net/manual/en/class.parallel-channel.php
*/

echo "zzz... " . PHP_EOL;

// Create new buffered channel
$channel = new \parallel\Channel(2);

// Producer task: sends a random sleep time for the other task on every
// iteration, reports when send() took noticeably long, sleeps itself and
// finally closes the channel.
\parallel\run(
    function ($channel) {
        $snaps_count = rand(8, 12);
        echo "Number of snaps: $snaps_count" . PHP_EOL;

        $snap = 1;
        while ($snap <= $snaps_count) {
            $sleepForOther = rand(3, 5);
            $my_sleep_time = rand(1, 3);

            echo "Send sleep time to buffer" . PHP_EOL;
            $sendStartedAt = microtime(true);
            $channel->send($sleepForOther);
            $waited = microtime(true) - $sendStartedAt;

            // A send that took more than a tenth of a second is reported as a full buffer.
            if ($waited > 0.1) {
                echo "Buffer was full. I waited " . round($waited) . "s" . PHP_EOL;
            }

            echo "I sleep for {$my_sleep_time}s" . PHP_EOL;
            sleep($my_sleep_time);

            $snap++;
        }

        echo "I finished sleeping. Closing channel" . PHP_EOL;
        $channel->close();
    },
    [$channel]
);

// Consumer task: keeps receiving sleep times from the channel and sleeping
// for each of them until the channel reports that it has been closed.
\parallel\run(
    function ($channel) {
        try {
            for (;;) {
                $my_sleep_time = $channel->recv();
                echo "Other sleeps for {$my_sleep_time}s" . PHP_EOL;
                sleep($my_sleep_time);
            }
        } catch (\parallel\Channel\Error\Closed $closed) {
            echo "Channel is closed. Other die.";
            die;
        }
    },
    [$channel]
);
To Top