导航菜单

PHP/并发与异步编程
课程进度 78% · 第18/22章 · 标签 1/4
1

多进程

进程创建与管理、进程间通信、进程池。

php
1
<?php
2
// 1. 基本进程创建
3
/**
 * Fork the current process and run $callback in the child.
 *
 * The parent gets the child's PID back immediately and is responsible for
 * reaping it. The child invokes $callback with no arguments and then exits
 * with status 0, so it never returns to the caller's code.
 *
 * If the fork fails, the whole script is terminated via die().
 *
 * @param callable $callback Work to execute inside the child process.
 * @return int PID of the newly created child (parent side only).
 */
function create_process($callback) {
    $pid = pcntl_fork();

    // Fork failed: there is no child, abort the script.
    if ($pid == -1) {
        die("无法创建子进程");
    }

    // A zero PID means we are running inside the child.
    if (!$pid) {
        $callback();
        exit(0);
    }

    // Non-zero PID: we are the parent.
    return $pid;
}
16
 
17
// 2. 进程池示例
18
/**
 * Fixed-size pool of forked worker processes.
 *
 * start() forks $size children, each of which runs the supplied callback
 * with its zero-based index and then exits with status 0. wait() blocks
 * until every child forked by this pool has been accounted for.
 */
class ProcessPool {
    /** Number of children that start() forks. */
    private $size;

    /** PIDs of the children forked by this pool, stored as array keys. */
    private $processes = [];

    /**
     * @param int $size Number of worker processes to fork in start().
     */
    public function __construct($size) {
        $this->size = $size;
    }

    /**
     * Fork the workers.
     *
     * If a fork fails, the script is terminated via die(); children that
     * were already forked keep running.
     *
     * @param callable $callback Invoked in each child as $callback($index).
     */
    public function start($callback) {
        for ($i = 0; $i < $this->size; $i++) {
            $pid = pcntl_fork();
            if ($pid == -1) {
                die("无法创建子进程");
            } elseif ($pid) {
                // Parent: remember the child so wait() can reap it.
                $this->processes[$pid] = true;
            } else {
                // Child: do the work and never fall back into the loop.
                $callback($i);
                exit(0);
            }
        }
    }

    /**
     * Block until every child forked by this pool has terminated.
     *
     * Each tracked PID is waited on individually, so children that do not
     * belong to the pool are left alone. A PID is always removed from the
     * tracking table, even when pcntl_waitpid() reports an error (for
     * example because the child was already reaped elsewhere), so this
     * method cannot spin forever on a stale entry.
     */
    public function wait() {
        foreach (array_keys($this->processes) as $pid) {
            do {
                $result = pcntl_waitpid($pid, $status);
                // Retry only when the wait was interrupted by a signal.
            } while ($result === -1 && pcntl_get_last_error() === PCNTL_EINTR);

            unset($this->processes[$pid]);
        }
    }
}
49
 
50
// 3. 共享内存通信
51
/**
 * Demonstrate parent/child communication through a shmop segment.
 *
 * A 100-byte segment keyed on this file is opened (created if missing),
 * a child is forked to write a greeting at offset 0, and the parent waits
 * for that child, prints the greeting and removes the segment.
 *
 * Any setup failure (key generation, segment creation, fork) terminates
 * the script via die().
 */
function shared_memory_example() {
    $key = ftok(__FILE__, "t");
    if ($key === -1) {
        die("无法生成共享内存键");
    }

    $shm_id = shmop_open($key, "c", 0644, 100);
    if ($shm_id === false) {
        die("无法创建共享内存段");
    }

    $pid = pcntl_fork();
    if ($pid == -1) {
        // No child was created, so nobody will ever write: clean up first.
        shmop_delete($shm_id);
        die("无法创建子进程");
    }

    if ($pid == 0) {
        // Child: publish the message and leave.
        shmop_write($shm_id, "Hello from child", 0);
        exit(0);
    }

    // Parent: wait for this specific child rather than any child.
    pcntl_waitpid($pid, $status);

    // The read returns the full 100 bytes; strip the NUL padding that
    // follows the message so only the text is printed.
    $data = shmop_read($shm_id, 0, 100);
    echo rtrim($data, "\0");

    shmop_delete($shm_id);
    // shmop_close() is deprecated since PHP 8.0; only call it on versions
    // where it is still the way to release the handle.
    if (PHP_VERSION_ID < 80000) {
        shmop_close($shm_id);
    }
}
66
 
67
// 4. 信号处理
68
/**
 * Signal handler for SIGTERM and SIGCHLD.
 *
 * SIGTERM: prints a notice and exits the script with status 0.
 * SIGCHLD: prints a notice and reaps every child that has already
 * terminated. Other signal numbers are ignored.
 *
 * @param int $signo Number of the signal being delivered.
 */
function signal_handler($signo) {
    switch ($signo) {
        case SIGTERM:
            echo "收到终止信号\n";
            exit(0);

        case SIGCHLD:
            echo "子进程结束\n";
            // Several child exits can be collapsed into one SIGCHLD, so
            // keep reaping until nothing is left. WNOHANG keeps the
            // handler from blocking when no terminated child is pending.
            while (pcntl_waitpid(-1, $status, WNOHANG) > 0) {
                // Nothing to do: the call itself collects the child.
            }
            break;
    }
}
80
// Installed handlers only run when signal dispatch is enabled. Turn on
// asynchronous delivery (available since PHP 7.1) so signal_handler is
// invoked without manual pcntl_signal_dispatch() calls.
pcntl_async_signals(true);
pcntl_signal(SIGTERM, "signal_handler");
pcntl_signal(SIGCHLD, "signal_handler");
82
?>
2

多线程

线程创建与管理、线程同步、线程安全。

php
1
<?php
2
// 1. 使用pthreads扩展
3
/**
 * Demo worker built on the pthreads extension's Thread base class.
 *
 * When run, it announces that it started, sleeps for one second and
 * announces that it finished, tagging both messages with its id.
 */
class WorkerThread extends Thread {
    /** Identifier printed in this worker's messages. */
    private $id;

    /**
     * @param mixed $id Identifier used to label this worker's output.
     */
    public function __construct($id) {
        $this->id = $id;
    }

    /**
     * Body of the worker: start message, one-second pause, done message.
     */
    public function run() {
        echo "线程 {$this->id} 开始执行\n";

        // Stand-in for real work.
        sleep(1);

        echo "线程 {$this->id} 执行完成\n";
    }
}
16
 
17
// 2. 线程池示例
18
/**
 * Launches a fixed number of WorkerThread instances and joins them.
 */
class ThreadPool {
    /** How many workers start() launches. */
    private $size;

    /** Workers launched so far, in launch order. */
    private $threads = [];

    /**
     * @param int $size Number of workers to launch per start() call.
     */
    public function __construct($size) {
        $this->size = $size;
    }

    /**
     * Create and start the workers, numbering them from 0 upward.
     */
    public function start() {
        $index = 0;
        while ($index < $this->size) {
            $worker = new WorkerThread($index);
            $worker->start();
            $this->threads[] = $worker;
            $index++;
        }
    }

    /**
     * Join every worker that was launched, in launch order.
     */
    public function wait() {
        foreach ($this->threads as $worker) {
            $worker->join();
        }
    }
}
40
 
41
// 3. 线程同步(Mutex)
42
/**
 * Integer counter whose increment is wrapped in a mutex lock/unlock pair.
 *
 * NOTE(review): Mutex is not defined in this file; presumably it is the
 * Mutex class shipped with older pthreads releases. Confirm the targeted
 * extension version still provides it.
 * NOTE(review): this class does not extend Threaded, so whether $value is
 * actually shared between threads should be verified.
 */
class Counter {
    /** Current count; starts at zero. */
    private $value = 0;

    /** Handle returned by Mutex::create(), used to guard increment(). */
    private $mutex;

    /**
     * Allocate the mutex that guards the counter.
     */
    public function __construct() {
        $this->mutex = Mutex::create();
    }

    /**
     * Add one to the counter while holding the mutex.
     */
    public function increment() {
        $handle = $this->mutex;

        Mutex::lock($handle);
        $this->value += 1;
        Mutex::unlock($handle);
    }

    /**
     * Return the current count. The mutex is not taken for this read.
     *
     * @return int
     */
    public function getValue() {
        return $this->value;
    }
}
60
 
61
// 4. 线程安全数据结构
62
/**
 * Threaded collection whose appends run inside synchronized().
 */
class ThreadSafeArray extends Threaded {
    /**
     * Append a value to this collection inside a synchronized block.
     *
     * @param mixed $value Value to append.
     */
    public function add($value) {
        // The closure is bound to this object, so $this[] appends to it.
        $append = function ($item) {
            $this[] = $item;
        };

        $this->synchronized($append, $value);
    }
}
69
?>