各种进程间通信方式详解

1.基本原理

image-20241218145628555

进程地址空间的3g-4g是内核空间,0-3g是进程自己的,但是每个进程的3-4g是内核空间就一份,大家都相同的。

所以通信就是进程在内核空间中的一块缓冲区进行写而另外一个进程读从而完成了通信。这两个进程可以是父子进程,也可以不是。

2.基本概念

“IPC” 全称是 “Inter - Process Communication”,即进程间通信。

1.进程间通信的重要性

  • 在操作系统中,每个进程都有自己独立的地址空间。这使得进程之间的数据是相互隔离的,这种隔离性在保证进程独立性和安全性的同时,也带来了进程间共享数据和协同工作的困难。进程间通信机制就是为了解决这个问题,使得不同的进程能够相互交换信息、同步操作,从而共同完成复杂的任务。

2.IPC 方式分类汇总

1.基于内核的通信

  • 管道(Pipe)
    • 原理:管道是一种半双工的通信机制,它在操作系统内核中创建一个缓冲区,用于连接一个写进程和一个读进程。数据在管道中单向流动,写进程将数据写入管道缓冲区的一端,读进程从另一端读取数据。当管道缓冲区满时,写进程会被阻塞,直到读进程读取数据腾出空间;同理,当管道缓冲区空时,读进程会被阻塞,直到写进程写入数据。
    • 示例场景:在 Unix/Linux 系统中,pipe函数用于创建管道。常见于父子进程之间的通信,例如,一个进程用于生成数据(如执行ls命令列出文件列表),另一个进程用于处理这些数据(如对文件列表进行过滤)。
    • 优点:实现简单,使用方便,在简单的父子进程通信场景中效率较高。
    • 缺点:半双工通信方式限制了数据传输的灵活性;只能用于有亲缘关系(如父子进程)的进程之间通信;管道缓冲区大小有限,数据量较大时可能需要频繁地进行读写操作。
    • 匿名管道(Anonymous Pipe):主要用于父子进程之间的通信,它和管道类似,但是没有名字,在创建时直接返回文件描述符供父子进程使用。
    • 命名管道(Named Pipe):也叫 FIFO(先进先出)管道,它可以在不相关的进程之间进行通信。进程通过打开一个命名管道文件来进行读写操作,就像使用普通管道一样,但是它的名字在文件系统中是可见的,允许不同的进程通过文件名来访问它。
  • 信号(Signal)
    • 原理:信号是一种异步事件通知机制,用于通知进程发生了某个特定的事件。信号由操作系统内核发送给进程,进程可以选择忽略信号、执行默认的信号处理动作或者自定义信号处理函数来响应信号。
    • 示例场景:当用户在终端中按下Ctrl + C组合键时,内核会向当前正在运行的前台进程发送一个SIGINT(中断信号)。进程可以根据需要处理这个信号,比如安全地退出程序或者进行一些清理工作。
    • 优点:可以用于处理各种异步事件,如硬件中断、软件异常等;能够及时通知进程重要的事件发生。
    • 缺点:信号携带的信息量有限,通常只是一个信号编号;信号处理函数的执行时机是不确定的,可能会在进程执行的任意时刻被触发,这可能会影响进程的正常执行流程。
  • 共享映射区(Memory - Mapped Files)
    • 原理:共享映射区是将文件内容映射到进程的地址空间中,使得多个进程可以通过访问这个共享的内存区域来实现通信。进程对映射区域的操作就如同对文件进行操作一样,这些操作会直接反映在文件和其他共享该映射区域的进程中。
    • 示例场景:多个进程需要共同操作一个配置文件,通过将该配置文件映射到共享映射区,进程可以直接在内存中读取和修改配置信息,而不需要频繁地进行文件 I/O 操作。
    • 优点:结合了内存操作的高效性和文件存储的持久性;可以方便地在不相关的进程之间实现通信,只要它们能访问到同一个文件。
    • 缺点:对文件的操作需要注意同步问题,否则可能导致数据不一致;文件大小可能会限制共享映射区的大小。
  • 本地套接字(Unix Domain Socket)
    • 原理:本地套接字提供了一种类似于网络套接字的进程间通信方式,但它只用于本地进程之间的通信。它使用文件系统中的特殊文件(如 Unix 系统中的套接字文件)来标识通信端点,进程通过对这些文件进行读写操作来实现通信。
    • 示例场景:在一个复杂的本地服务系统中,不同的服务进程(如数据库服务进程和应用服务进程)之间通过本地套接字进行通信,以实现数据的请求和响应。
    • 优点:可以在不相关的本地进程之间进行通信;支持双向通信,通信方式灵活;与网络套接字类似,有成熟的编程接口。
    • 缺点:相比其他本地通信方式(如共享内存),可能会有一定的性能损耗,因为它涉及到操作系统内核的更多参与。
  • 共享内存(Shared Memory)
    • 原理:共享内存允许两个或多个进程共享同一块物理内存区域。这些进程将共享内存区域映射到自己的虚拟地址空间中,从而可以直接读写这块内存区域,就像访问自己的内存一样。为了避免多个进程同时访问共享内存导致数据不一致,通常需要配合使用信号量等同步机制。
    • 示例场景:在高性能的多进程计算任务中,如多进程并行处理一个大型数组,通过共享内存可以让每个进程快速地访问和修改数组中的元素。
    • 优点:是所有进程间通信方式中速度最快的,因为进程直接访问内存,不需要进行数据的复制和传输等操作。
    • 缺点:需要解决同步问题,否则容易出现数据竞争和不一致的情况;共享内存的管理(如创建、映射、删除等操作)相对复杂。
  • 信号量(Semaphore)
    • 原理:信号量是一种用于进程同步和互斥的机制,本质上是一个非负整数计数器。它有两个基本操作:P操作(也称为wait操作)和V操作(也称为signal操作)。P操作会将信号量的值减 1,如果信号量的值小于 0,则进程会被阻塞;V操作会将信号量的值加 1,如果信号量的值小于等于 0,则会唤醒一个被阻塞的进程。信号量主要用于控制对共享资源的访问。
    • 示例场景:在多个进程访问一个共享缓冲区时,通过信号量来控制同时访问缓冲区的进程数量,避免数据冲突。
    • 优点:有效地实现了进程之间的同步和互斥,保证了共享资源的合理访问。
    • 缺点:使用不当可能会导致死锁等问题;信号量的操作相对复杂,需要仔细设计。
  • 消息队列(Message Queue)
    • 原理:消息队列是一个由内核维护的消息链表。进程可以向消息队列发送消息,也可以从消息队列接收消息。消息队列有一个消息类型的属性,接收进程可以根据消息类型有选择地接收消息。当消息队列满时,发送消息的进程可能会被阻塞;当消息队列空时,接收消息的进程可能会被阻塞。
    • 示例场景:在一个分布式系统的本地节点中,不同的进程可以通过消息队列传递任务请求和结果,方便进行任务的分发和处理。
    • 优点:可以实现异步通信,发送和接收消息的进程不需要同时运行;消息队列可以存储多个消息,方便进行消息的缓存和排队。
    • 缺点:消息的传递需要在内核的消息队列中进行,相对共享内存等方式速度较慢;消息队列的大小有限,可能会出现消息满的情况。
共享内存和共享映射区的区别
  • 共享内存是直接在内存中开辟一块共享区域供进程访问,主要关注内存的共享访问。而共享映射区是将文件内容映射到内存中,重点在于将文件和内存关联起来,通过内存操作来间接操作文件。虽然它们都涉及内存共享的概念,但共享映射区更侧重于文件与内存的交互,共享内存则更侧重于进程间对内存的直接共享访问。

2.基于文件系统的通信原理

  • 原理:进程可以通过读写文件来进行通信。一个进程将数据写入文件,另一个进程从文件中读取数据。这种方式简单直观,但效率可能相对较低,因为文件系统操作涉及磁盘 I/O,速度比内存操作慢得多。而且,需要注意文件的并发访问问题,例如多个进程同时写一个文件可能会导致数据混乱,通常需要使用文件锁等机制来进行同步。

3.基于网络的通信原理(适用于不同主机上的进程通信)

  • 原理:在不同主机上的进程可以通过网络协议进行通信。最常见的是基于 TCP/IP 协议栈的通信方式。一个进程作为服务器,在指定的端口上监听客户端的连接请求。当客户端进程发起连接请求并建立连接后,双方可以通过套接字(Socket)进行数据的发送和接收。数据在网络中以数据包的形式传输,经过多个网络设备(如路由器、交换机等)的转发,最终到达目标进程。这种通信方式需要考虑网络协议、IP 地址、端口号、网络安全等诸多因素。

3.管道(使用最简单)

1.概述

管道是一种最基本的IPC机制,作用于有血缘关系的进程之间,完成数据传递。调用pipe系统函数即可创建一个管道。有如下特质:

  1. 其本质是一个伪文件(实为内核缓冲区)
  2. 由两个文件描述符引用,一个表示读端,一个表示写端
  3. 规定数据从管道的写端流入管道,从读端流出

管道的原理:

管道实为内核使用环形队列机制,借助内核缓冲区(4k)实现

管道的局限性:

1 数据不能进程自己写,自己读

2.管道中数据不可反复读取。一旦读走,管道中不再存在

3 采用双向半双工通信方式,数据只能在单方向上流动

4 只能在有公共祖先的进程间使用管道

双向半双工例子:

对于管道来说,一旦我进程间通信时,第一次发生数据交换是A进程读B进程写,那第二次就不可以A写B读了,只能是A读B写。

常见通信方式:单工通信、半双工通信、全双工通信。

注意:因为父子进程共享文件描述符所以父进程已经创建并打开的管道子进程也能用

2.使用

创建并打开管道

c
1
2
#include<unistd.h>
int pipe(int fd[2]);

参数:

fd[0]表示读端

fd[1]表示写端

返回值:成功0失败-1

父进程写子进程读:

一开始父子进程都持有读端和写端

image-20241218152941810

父进程关闭写端子进程关闭读端后就有一条明确的数据流通方向

image-20241218153042572

c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
#include <unistd.h>
#include <string.h>
#include <stdlib.h>
#include <stdio.h>
#include <sys/wait.h>

void sys_err(const char *str)
{
perror(str);
exit(1);
}

int main(void)
{
pid_t pid;
char buf[1024];
int fd[2];
char *p = "test for pipe\n";

if (pipe(fd) == -1)
sys_err("pipe");

pid = fork();
if (pid < 0) {
sys_err("fork err");
} else if (pid == 0) {
//子进程关闭写端
close(fd[1]);
int len = read(fd[0], buf, sizeof(buf));
write(STDOUT_FILENO, buf, len);
close(fd[0]);
} else {
//父进程关闭读端
close(fd[0]);
write(fd[1], p, strlen(p));
sleep(1);
wait(NULL);
close(fd[1]);
}

return 0;
}

image-20241218153547049

3.管道的读写行为

1.读管道

​ 1. 管道中有数据,read返回实际读到的字节数

​ 2.管道中无数据:

​ (1)管道写端被全部关闭,没有人会继续往管道写数据了,read返回0(好像读到文件结尾)

​ (2)写端没有全部被关闭,read阻塞等待(不久的将来可能有数据递达,此时会让出cpu)

2.写管道

  1. 管道读端全部被关闭,进程异常终止(也可使用捕捉SIGPIPE信号,使进程不终止)

  2. 管道读端没有全部关闭:

​ (1)管道已满,write阻塞

​ (2)管道未满,write将数据写入,并返回实际写入的字节数

3.练习

1.管道实现 ls|wc-l

使用管道实现父子进程间通信,完成:ls|wc-l。假定父进程实现ls,子进程实现wc

Is命令正常会将结果集写出到stdout,但现在会写入管道的写端;wc-l 正常应该从stdin读取数据,但
此时会从管道的读端读

1.创建打开管道

2.fork子进程

3.关闭父进程读端,关闭子进程写端

4.父进程调用execlp执行ls命令

5.子进程调用execlp执行wc -l

6.父进程调用dup2把标准输出重定向到管道写端(往管道写数据,原来ls的输出是在标准输出的,现在输出到管道)

7.子进程调用dup2把标准输入重定向到管道读端(从管道拿数据,原来wc从标准输入拿数据,现在从管道拿)

c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
#include <stdio.h>
#include <unistd.h>
#include <sys/wait.h>

int main(void)
{
pid_t pid;
int fd[2];

pipe(fd);
pid = fork();

if (pid == 0) { //child
close(fd[1]); //子进程从管道中读数据,关闭写端
dup2(fd[0], STDIN_FILENO); //让wc从管道中读取数据
execlp("wc", "wc", "-l", NULL); //wc命令默认从标准读入取数据

} else {

close(fd[0]); //父进程向管道中写数据,关闭读端
dup2(fd[1], STDOUT_FILENO); //将ls的结果写入管道中
execlp("ls", "ls", NULL); //ls输出结果默认对应屏幕
}

return 0;
}
  • 程序不时的会出现先打印$提示符,再出程序运行结果的现象。
  • 这是因为:父进程执行ls命令,将输出结果给通过管道传递给子进程去执行wc命令,这时父进程若先于子进程打印wc运行结果之前被shell使用wait函数成功回收,shell就会先于子进程打印wc运行结果之前打印$提示符。
  • 在这之中子进程一定得等父进程写完数据以后才会执行自己的代码,所以一定是父进程先执行完毕。
  • 所以解决方法:让子进程执行ls,父进程执行wc命令。或者在兄弟进程间完成。

2.管道实现兄弟进程通信

c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
#include <stdio.h>
#include <unistd.h>
#include <sys/wait.h>

int main(void)
{
pid_t pid;
int fd[2], i;

pipe(fd);

for (i = 0; i < 2; i++) {
if((pid = fork()) == 0) {
break;
}
}

if (i == 0) { //兄
close(fd[0]); //写,关闭读端
dup2(fd[1], STDOUT_FILENO);
execlp("ls", "ls", NULL);
} else if (i == 1) { //弟
close(fd[1]); //读,关闭写端
dup2(fd[0], STDIN_FILENO);
execlp("wc", "wc", "-l", NULL);
} else {
close(fd[0]);
close(fd[1]);
for(i = 0; i < 2; i++) //两个儿子wait两次
wait(NULL);
}

return 0;
}

image-20241218200410336

注意要关闭父进程持有的读端和写端,不然形不成数据的单向流动

3.测试管道是否允许一个pipe有一个写端多个读端?有一个读端多个写端?

是允许的,但是一般都是写成一个读端一个写端

  1. 管道允许一个读端多个写端
    • 管道是可以有一个读端和多个写端的。这在很多场景下是非常有用的,例如在日志系统中,多个不同的进程可以作为写端向一个管道写入日志信息,而一个专门的日志收集进程作为读端从管道中读取这些日志信息进行处理。多个写端可以同时向管道写入数据,不过需要注意数据的同步问题,因为如果多个写端同时写入可能会导致数据混乱,通常需要配合信号量等同步机制来保证数据的有序写入。
  2. 管道也允许一个写端多个读端
    • 管道同样允许一个写端多个读端。当数据被写入管道后,所有的读端都可以读取到这些数据。数据从管道中被读取后,对于管道中的其他读端来说,数据仍然存在(只要没有被其他读端全部读取完)。
    • 例如,在一个数据分发系统中,一个进程作为写端向管道写入数据,多个其他进程作为读端可以从管道中读取相同的数据进行不同的处理,如一个读端用于数据显示,另一个读端用于数据存储等。管道中的数据是可以被多个读端共享读取的,并不是一个读端读取后数据就消失了。管道内部维护了一个缓冲区,数据存储在这个缓冲区中,读端从缓冲区读取数据,只要缓冲区中的数据没有被全部读取,其他读端仍然可以读取剩余的数据。

测试代码:

c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
#include <stdio.h>
#include <unistd.h>
#include <sys/wait.h>
#include <string.h>
#include <stdlib.h>

int main(void)
{
pid_t pid;
int fd[2], i, n;
char buf[1024];

int ret = pipe(fd);
if(ret == -1){
perror("pipe error");
exit(1);
}

for(i = 0; i < 2; i++){
if((pid = fork()) == 0)
break;
else if(pid == -1){
perror("pipe error");
exit(1);
}
}

if (i == 0) {
close(fd[0]);
write(fd[1], "1.hello\n", strlen("1.hello\n"));
} else if(i == 1) {
close(fd[0]);
write(fd[1], "2.world\n", strlen("2.world\n"));
} else {
close(fd[1]); //父进程关闭写端,留读端读取数据
sleep(1); //sleep的原因是为了让两个写端都把数据写上,而不是就只有其中一个写上之后父进程就读了
n = read(fd[0], buf, 1024); //从管道中读数据
write(STDOUT_FILENO, buf, n);

for(i = 0; i < 2; i++) //两个儿子wait两次
wait(NULL);
}

return 0;
}

  • 当两个子进程快速地向管道写入数据时,管道缓冲区可能在第一个子进程写入1.hello\n后,父进程就开始读取数据。由于管道缓冲区的数据可能没有被第二个子进程的2.world\n完全覆盖或者父进程读取操作已经完成,就可能导致父进程只读取到1.hello\n而没有读取到2.world\n

  • 管道的读取操作在缓冲区有数据时就会开始读取,而不会等待所有子进程都写入数据。sleep函数在这里起到了让父进程暂停一下的作用,给两个子进程足够的时间将数据都写入管道缓冲区,从而保证父进程能够读取到两个子进程写入的完整数据。

4.管道大小

默认4KB

image-20241218201417711

5.管道优劣

**优点:**简单,相比信号,套接字实现进程间通信,简单很多

缺点:

  • 1.只能单向通信,双向通信需建立两个管道
  • 2.只能用于父子、兄弟进程(有共同祖先)间通信。该问题后来使用fifo有名管道解决

6.有名管道FIFO

FIFO常被称为命名管道,以区分管道(pipe)。管道(pipe)只能用于“有血缘关系”的进程间。但通过FIFO,不相
关的进程也能交换数据。

FIFO是Linux基础文件类型中的一种。但,FIFO文件在磁盘上没有数据块,仅仅用来标识内核中一条通道。备
进程可以打开这个文件进行read/write,实际上是在读写内核通道,这样就实现了进程间通信。

创建方式:

1.命令:mkfifo 管道名

plaintext
1
2
mkfifo 管道名
mkfifo myfifo

2.库函数:

c
1
2
3
#include<sys/types.h>
#include<sys/stat.h>
int mkfifo(const char *pathname,mode_t mode);

参数:

pathname:文件名

mode:8进制的权限,比如0644这种的

**返回值:**成功:0;失败 :- 1

一旦使用mkfifo创建了一个FIFO,就可以使用open打开它,常见的文件I/O函数都可用于fifo。如:close、read、
write、unlink等。

例子:

c
1
2
3
int ret = mkfifo("mytestfifo", 0664);
if (ret == -1)
sys_err("mkfifo error");

实现没有血缘关系的进程通信

读端

c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
#include <stdio.h>
#include <unistd.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <fcntl.h>
#include <stdlib.h>
#include <string.h>

void sys_err(char *str)
{
perror(str);
exit(1);
}

int main(int argc, char *argv[])
{
int fd, len;
char buf[4096];

if (argc < 2) {
printf("./a.out fifoname\n");
return -1;
}
//不提前创建fifo,直接写fifo
//int fd = mkfifo("testfifo", 644);
//open(fd, ...);
fd = open(argv[1], O_RDONLY); // 打开管道文件
if (fd < 0)
sys_err("open");
while (1) {
len = read(fd, buf, sizeof(buf)); // 从管道的读端获取数据
write(STDOUT_FILENO, buf, len);
sleep(3); //多個读端时应增加睡眠秒数,放大效果.
}
close(fd);

return 0;
}

写端

c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
#include <stdio.h>
#include <unistd.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <fcntl.h>
#include <stdlib.h>
#include <string.h>

void sys_err(char *str)
{
perror(str);
exit(-1);
}

int main(int argc, char *argv[])
{
int fd, i;
char buf[4096];

if (argc < 2) {
printf("Enter like this: ./a.out fifoname\n");
return -1;
}
fd = open(argv[1], O_WRONLY); //打开管道文件
if (fd < 0)
sys_err("open");

i = 0;
while (1) {
sprintf(buf, "hello itcast %d\n", i++);

write(fd, buf, strlen(buf)); // 向管道写数据
sleep(1);
}
close(fd);

return 0;
}

4.文件实现进程间通信

已经过时的东西,过一遍有个印象即可

父子进程

这个理所当然可以通信,因为父子进程文件描述符表都是一样的

c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
/* 
*父子进程共享打开的文件描述符------使用文件完成进程间通信.
*/
#include <stdio.h>
#include <unistd.h>
#include <string.h>
#include <stdlib.h>
#include <fcntl.h>
#include <sys/wait.h>


int main(void)
{
int fd1, fd2; pid_t pid;
char buf[1024];
char *str = "---------test for shared fd in parent child process-----\n";

pid = fork();
if (pid < 0) {
perror("fork error");
exit(1);
} else if (pid == 0) {
fd1 = open("test.txt", O_RDWR);
if (fd1 < 0) {
perror("open error");
exit(1);
}
sleep(3);
write(fd1, str, strlen(str));
printf("child wrote over...\n");

} else {
fd2 = open("test.txt", O_RDWR);
if (fd2 < 0) {
perror("open error");
exit(1);
}
// sleep(1); //保证子进程写入数据

int len = read(fd2, buf, sizeof(buf));
printf("------parent read len = %d\n", len);
len = write(STDOUT_FILENO, buf, len);
printf("------parent write len = %d\n", len);

wait(NULL);
}

return 0;
}

无血缘关系

c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
/*
* 先执行,将数据写入文件test.txt
*/
#include <stdio.h>
#include <unistd.h>
#include <fcntl.h>
#include <stdlib.h>
#include <string.h>

#define N 5

int main(void)
{
char buf[1024];
char *str = "--------------secesuss-------------\n";
int ret;

int fd = open("test.txt", O_RDWR|O_TRUNC|O_CREAT, 0664);

//直接打开文件写入数据
write(fd, str, strlen(str));
printf("test1 write into test.txt finish\n");

sleep(N);

lseek(fd, 0, SEEK_SET);
ret = read(fd, buf, sizeof(buf));
ret = write(STDOUT_FILENO, buf, ret);

if (ret == -1) {
perror("write second error");
exit(1);
}

close(fd);

return 0;
}

c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
/*
* 后执行,尝试读取另外一个进程写入文件的内容
*/
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <fcntl.h>
#include <string.h>

int main(void)
{
char buf[1024];
char *str = "----------test2 write secesuss--------\n";
int ret;

sleep(2); //睡眠2秒,保证test1将数据写入test.txt文件

int fd = open("test.txt", O_RDWR);

//尝试读取test.txt文件中test1写入的数据
ret = read(fd, buf, sizeof(buf));

//将读到的数据打印至屏幕
write(STDOUT_FILENO, buf, ret);

//写入数据到文件test.txt中, 未修改读写位置
write(fd, str, strlen(str));

printf("test2 read/write finish\n");

close(fd);

return 0;
}

5.共享映射区(无血缘关系用的)

1.概述

  • 原理:共享映射区是将文件内容映射到进程的地址空间中,使得多个进程可以通过访问这个共享的内存区域来实现通信。进程对映射区域的操作就如同对文件进行操作一样,这些操作会直接反映在文件和其他共享该映射区域的进程中。

  • 示例场景:多个进程需要共同操作一个配置文件,通过将该配置文件映射到共享映射区,进程可以直接在内存中读取和修改配置信息,而不需要频繁地进行文件 I/O 操作。

  • 优点:结合了内存操作的高效性和文件存储的持久性;可以方便地在不相关的进程之间实现通信,只要它们能访问到同一个文件。

  • 缺点:对文件的操作需要注意同步问题,否则可能导致数据不一致;文件大小可能会限制共享映射区的大小。

**存储映射I/O(Memory-mapped l/O)使一个磁盘文件与内存存储空间中的一个缓冲区相映射。**于是当从缓冲区中取数据,就相当于读文件中的相应字节。于此类似,将数据存入缓冲区,则相应的字节就自动写入文件。这样,就可在不适用read和write函数的情况下,使用地址(指针)完成I/O操作。

使用这种方法,首先应通知内核,将一个指定文件映射到存储区域中。这个映射工作可以通过mmap函数来实
现。

2.mmap&&munmap函数

创建共享内存映射

c
1
2
include<sys/mman.h>
void *mmap(void *addr, size_t length, int prot, int flags, int fd, off_t offset);

参数:

  • addr: 指定映射区的首地址。通常传NULL,表示让系统自动分配

  • length:共享内存映射区的大小。(<= 文件的实际大小)

  • prot: 共享内存映射区的读写属性。PROT_READ、PROT_WRITE、PROT_READ|PROT_WRITE

  • flags: 标注共享内存的共享属性,共享就是对文件的修改会写回磁盘,私有就不会写回磁盘。MAP_SHARED、MAP_PRIVATE

  • fd: 用于创建共享内存映射区的那个文件的 文件描述符,就是要映射到内存的文件

  • offset:默认0,表示映射文件全部。偏移位置,从哪里开始映射。需是 4k 的整数倍

返回值:

成功:映射区的首地址

失败:MAP_FAILED (void*(-1)), errno—-就是把-1强转为void *了

释放共享内存映射

c
1
int munmap(void *addr, size_t length);

参数

addr:mmap 的返回值,共享内存映射首地址

length:大小

返回值

成功0,失败-1

函数使用

c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/mman.h>
#include <fcntl.h>
#include <unistd.h>
#include <errno.h>
#include <pthread.h>

void sys_err(const char *str)
{
perror(str);
exit(1);
}

int main(int argc, char *argv[])
{
char *p = NULL;
int fd;

fd = open("testmap", O_RDWR|O_CREAT|O_TRUNC, 0644); // 创建文件用于创建映射区
if (fd == -1)
sys_err("open error");
/*
lseek(fd, 10, SEEK_END); // 两个函数等价于 ftruncate()函数
write(fd, "\0", 1);
*/
ftruncate(fd, 20); // 需要借助写权限,才能够对文件进行拓展
int len = lseek(fd, 0, SEEK_END);

p = mmap(NULL, len, PROT_READ|PROT_WRITE, MAP_SHARED, fd, 0);
if (p == MAP_FAILED) {
sys_err("mmap error");
}

// 使用 p 对文件进行读写操作.
strcpy(p, "hello mmap"); // 写操作

printf("----%s\n", p); // 读操作

int ret = munmap(p, len); // 释放映射区
if (ret == -1) {
sys_err("munmap error");
}

return 0;
}

3.mmap注意事项

思考 :

  1. 可以open的时候O_CREAT一个新文件来创建映射区吗 ?

  2. 如果open时O_RDONLY,mmap时PROT参数指定PROT_READ|PROT_WRITE会怎样 ?

  3. 文件描述符先关闭,对mmap映射有没有影响 ?

  4. 如果文件偏移量为1000会怎样 ?

  5. 对mem越界操作会怎样?

  6. 如果mem++,munmap可否成功 ?

  7. mmap什么情况下会调用失败 ?

很多参数都会导致失败

  1. 如果不检测mmap的返回值,会怎样?

    会死得很惨

使用注意事项:

  1. 用于创建映射区的文件大小为 0,却指定非0大小创建映射区,出 “总线错误”。
  2. 用于创建映射区的文件大小为 0,也指定0大小创建映射区, 出 “无效参数”。
  3. 用于创建映射区的文件读写属性为,只读,映射区属性为 读、写。 出 “无效参数”; 文件和映射区都是只读的是可以的;文件只有写权限,映射区只有写权限也会报错。(2答案)
  4. 创建映射区,需要read权限。当访问权限指定为 “共享”MAP_SHARED是, mmap的读写权限应该 <=文件的open权限。 映射区只写不行。
  5. 文件描述符fd,在mmap创建映射区完成即可关闭。后续访问文件,用 地址访问。(3答案)
  6. offset 必须是 4096的整数倍。(MMU 映射的最小单位 4k )(4答案)
  7. 对申请的映射区内存,不能越界访问。 (5答案)
  8. 读写都没问题,但是munmap会失败,munmap用于释放的 地址,必须是mmap申请返回的地址。(6答案)
  9. 映射区访问权限为 “私有”MAP_PRIVATE, 对内存所做的所有修改,只在内存有效,不会反应到物理磁盘上。
  10. 映射区访问权限为 “私有”MAP_PRIVATE, 只需要open文件时,文件有读权限,用于创建映射区即可。

image-20241218222315367

mmap函数的保险调用方式:

c
1
2
1. fd = open("文件名", O_RDWR);
2. mmap(NULL, 有效文件大小, PROT_READ|PROT_WRITE, MAP_SHARED, fd, 0);

4.mmap实现进程通信

父子进程

父子等有血缘关系的进程之间也可以通过mmap建立的映射区来完成数据通信。

但相应的要在创建映射区的时候指定对应的标志位参数flags:

MAP_PRIVATE:(私有映射)父子进程各自独占映射区;

MAP_SHARED:(共享映射) 父子进程共享映射区;

结论:

父子进程共享:1. 打开的文件 2.mmap建立的映射区(但必须要使用MAP_SHARED)

流程:

父子进程使用 mmap 进程间通信:

1.父进程 先 创建映射区。 open( O_RDWR) mmap( MAP_SHARED );

2.指定 MAP_SHARED 权限

3.fork() 创建子进程。

4.一个进程读, 另外一个进程写。

练习

练习:父进程创建映射区,然后fork子进程,子进程修改映射区内容,而后,父进程读取映射区内容,查验是
否共享

c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <fcntl.h>
#include <sys/mman.h>
#include <sys/wait.h>

int var = 100;

int main(void)
{
int *p;
pid_t pid;

int fd = open("temp", O_RDWR);

//p = (int *)mmap(NULL, 40, PROT_READ|PROT_WRITE, MAP_SHARED|MAP_ANONYMOUS, -1, 0);
p = (int *)mmap(NULL, 490, PROT_READ|PROT_WRITE, MAP_SHARED, fd, 0);
//p = (int *)mmap(NULL, 4, PROT_READ|PROT_WRITE, MAP_PRIVATE, fd, 0); 私有的不行
if(p == MAP_FAILED){ //注意:不是p == NULL
perror("mmap error");
exit(1);
}
close(fd);

pid = fork(); //创建子进程
if(pid == 0){
*p = 7000; // 写共享内存
var = 1000;
printf("child, *p = %d, var = %d\n", *p, var);
} else {
sleep(1);
printf("parent, *p = %d, var = %d\n", *p, var); // 读共享内存
wait(NULL);

int ret = munmap(p, 4); //释放映射区
if (ret == -1) {
perror("munmap error");
exit(1);
}
}

return 0;
}

无血缘关系

流程: 【要求会写】

1.两个进程 打开同一个文件,创建映射区。

2.指定flags 为 MAP_SHARED。

3.一个进程写入,另外一个进程读出。

【注意】:无血缘关系进程间通信。

​ mmap:数据可以重复读取。

​ fifo:数据只能一次读取。

读端

c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <fcntl.h>
#include <sys/mman.h>
#include <unistd.h>
#include <errno.h>

struct student {
int id;
char name[256];
int age;
};

void sys_err(const char *str)
{
perror(str);
exit(1);
}

int main(int argc, char *argv[])
{
struct student stu;
struct student *p;
int fd;

fd = open("test_map", O_RDONLY);
if (fd == -1)
sys_err("open error");

p = mmap(NULL, sizeof(stu), PROT_READ, MAP_SHARED, fd, 0);
if (p == MAP_FAILED)
sys_err("mmap error");

close(fd);

while (1) {
printf("id= %d, name=%s, age=%d\n", p->id, p->name, p->age);
usleep(10000);
}

munmap(p, sizeof(stu));

return 0;
}

写端

c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <fcntl.h>
#include <sys/mman.h>
#include <unistd.h>
#include <errno.h>

struct student {
int id;
char name[256];
int age;
};

void sys_err(const char *str)
{
perror(str);
exit(1);
}

int main(int argc, char *argv[])
{
struct student stu = {1, "xiaoming", 18};
struct student *p;
int fd;

// fd = open("test_map", O_RDWR|O_CREAT|O_TRUNC, 0664);
fd = open("test_map", O_RDWR);
if (fd == -1)
sys_err("open error");

ftruncate(fd, sizeof(stu));

p = mmap(NULL, sizeof(stu), PROT_READ|PROT_WRITE, MAP_SHARED, fd, 0);
if (p == MAP_FAILED)
sys_err("mmap error");

close(fd);

while (1) {
memcpy(p, &stu, sizeof(stu));
stu.id++;
sleep(2);
}

munmap(p, sizeof(stu));

return 0;
}

5.mmap匿名映射区

匿名映射:只能用于 血缘关系(父子)进程间通信。

c
1
p = (int *)mmap(NULL, 40, PROT_READ|PROT_WRITE, MAP_SHARED|MAP_ANONYMOUS, -1, 0);

映射区大小想要多少写多少

权限想要啥写啥

文件描述符的地方传-1

flags要 | 下面提到的两个宏

image-20241218224059685

/dev/zero 从这个文件里面拿数据可以随便拿,想要多大拿多大的数据,只不过读出来都是文件空洞

6.本地套接字(最稳定)

进程间通信(IPC): pipe、fifo、mmap、信号、本地套接字(domain)— CS模型

1.本地套接字和网络套接字对比

对比网络编程 TCP C/S模型, 注意以下几点:

1.参数

cpp
1
2
3
4
int socket(int domain, int type, int protocol); 
domain:AF_INET --> AF_UNIX/AF_LOCAL

type: SOCK_STREAM/SOCK_DGRAM 都可以。

2.地址结构:

image-20241219112409410

image-20241219113047440

以下程序将UNIX Domain socket绑定到一个地址。

c
1
2
3
4
5
6
size =offsetof(struct sockaddr_un, sun_path) + strlen(un.sun_path);
//len=2+strlen("srv.socket");这么写也行但并不规范 和下面这行代码等价
//len = offsetof(struct sockaddr_un, sun_path) + strlen("srv.socket");
//其实就是offsetof(struct sockaddr_un, sun_path)=2,因为是16位,所以是两个字节
//偏移2个字节就是路径名了
#define offsetof(type, member) ((int) &((type*)0)->MEMBER)
cpp
1
2
3
4
5
6
7
8
9
10
11
sockaddr_in --> sockaddr_un

struct sockaddr_in srv_addr; --> struct sockaddr_un srv_adrr;

srv_addr.sin_family = AF_INET; --> srv_addr.sun_family = AF_UNIX;

srv_addr.sin_port = htons(8888); --> strcpy(srv_addr.sun_path, "srv.socket")

srv_addr.sin_addr.s_addr = htonl(INADDR_ANY); /*这是本地套接字bind的参数*/len = offsetof(struct sockaddr_un, sun_path) + strlen("srv.socket");

bind(fd, (struct sockaddr *)&srv_addr, sizeof(srv_addr)); --> bind(fd, (struct sockaddr *)&srv_addr, len);

不用ip和port,改成用socket文件名了

  1. bind()函数调用成功,会创建一个 socket。因此为保证bind成功,通常我们在 bind之前, 可以使用 unlink(“srv.socket”);

  2. 客户端不能依赖 “隐式绑定”。并且应该在通信建立过程中,创建且初始化2个地址结构:

    1.client_addr --> bind()
    

    ​ 2.server_addr –> connect();

2.本地套接字实现进程间通信

实现server和client这两个进程间的通信,其实就是各自建立一个socket进行通信

image-20241219113454378

serve.c

c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
#include <stdio.h>
#include <unistd.h>
#include <sys/socket.h>
#include <strings.h>
#include <string.h>
#include <ctype.h>
#include <arpa/inet.h>
#include <sys/un.h>
#include <stddef.h>

#include "wrap.h"

#define SERV_ADDR "serv.socket"

int main(void)
{
int lfd, cfd, len, size, i;
struct sockaddr_un servaddr, cliaddr;
char buf[4096];

lfd = Socket(AF_UNIX, SOCK_STREAM, 0);

bzero(&servaddr, sizeof(servaddr));
servaddr.sun_family = AF_UNIX;
strcpy(servaddr.sun_path, SERV_ADDR);

len = offsetof(struct sockaddr_un, sun_path) + strlen(servaddr.sun_path); /* servaddr total len */

unlink(SERV_ADDR); /* 确保bind之前serv.sock文件不存在,bind会创建该文件,不然有重名的bind就会失败 */
Bind(lfd, (struct sockaddr *)&servaddr, len); /* 参3不能是sizeof(servaddr) */

Listen(lfd, 20);

printf("Accept ...\n");
while (1) {
len = sizeof(cliaddr); //AF_UNIX大小+108B

////len=2+客户端socket文件名的字节数
cfd = Accept(lfd, (struct sockaddr *)&cliaddr, (socklen_t *)&len);

//把2减去就是剩下了客户端socket文件名的字节数
len -= offsetof(struct sockaddr_un, sun_path); /* 得到文件名的长度 */
cliaddr.sun_path[len] = '\0'; /* 确保打印时,没有乱码出现 */

printf("client bind filename %s\n", cliaddr.sun_path);

while ((size = read(cfd, buf, sizeof(buf))) > 0) {
for (i = 0; i < size; i++)
buf[i] = toupper(buf[i]);
write(cfd, buf, size);
}
close(cfd);
}
close(lfd);

return 0;
}

client.c

初始化两个地址结构

1.客户端自己的socket的地址结构

2.服务器端的socket地址结构(用于connect,连接客户端和这个服务器)

c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
#include <stdio.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <strings.h>
#include <string.h>
#include <ctype.h>
#include <arpa/inet.h>
#include <sys/un.h>
#include <stddef.h>

#include "wrap.h"

#define SERV_ADDR "serv.socket"
#define CLIE_ADDR "clie.socket"

int main(void)
{
int cfd, len;
struct sockaddr_un servaddr, cliaddr;
char buf[4096];

cfd = Socket(AF_UNIX, SOCK_STREAM, 0);

bzero(&cliaddr, sizeof(cliaddr));
cliaddr.sun_family = AF_UNIX;
strcpy(cliaddr.sun_path,CLIE_ADDR);

len = offsetof(struct sockaddr_un, sun_path) + strlen(cliaddr.sun_path); /* 计算客户端地址结构有效长度 */

unlink(CLIE_ADDR);//确保bind之前clie.socket不存在,bind会创建该文件。不然有重名的bind就会失败
Bind(cfd, (struct sockaddr *)&cliaddr, len); /* 客户端也需要bind, 不能依赖自动绑定*/

/////////////////////////////////////////////////
//在此之前的东西都是和服务器端一样的


//初始化服务器端地址结构,用于连接
bzero(&servaddr, sizeof(servaddr)); /* 构造server 地址 */
servaddr.sun_family = AF_UNIX;
strcpy(servaddr.sun_path, SERV_ADDR);

len = offsetof(struct sockaddr_un, sun_path) + strlen(servaddr.sun_path); /* 计算服务器端地址结构有效长度 */

Connect(cfd, (struct sockaddr *)&servaddr, len);

//小写转大写
while (fgets(buf, sizeof(buf), stdin) != NULL) {
write(cfd, buf, strlen(buf));
len = read(cfd, buf, sizeof(buf));
write(STDOUT_FILENO, buf, len);
}

close(cfd);

return 0;
}

warp.c

c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
#include <stdlib.h>
#include <stdio.h>
#include <unistd.h>
#include <errno.h>
#include <sys/socket.h>

void perr_exit(const char *s)
{
perror(s);
exit(-1);
}

int Accept(int fd, struct sockaddr *sa, socklen_t *salenptr)
{
int n;

again:
if ((n = accept(fd, sa, salenptr)) < 0) {
if ((errno == ECONNABORTED) || (errno == EINTR))
goto again;
else
perr_exit("accept error");
}
return n;
}

int Bind(int fd, const struct sockaddr *sa, socklen_t salen)
{
int n;

if ((n = bind(fd, sa, salen)) < 0)
perr_exit("bind error");

return n;
}

int Connect(int fd, const struct sockaddr *sa, socklen_t salen)
{
int n;

if ((n = connect(fd, sa, salen)) < 0)
perr_exit("connect error");

return n;
}

int Listen(int fd, int backlog)
{
int n;

if ((n = listen(fd, backlog)) < 0)
perr_exit("listen error");

return n;
}

int Socket(int family, int type, int protocol)
{
int n;

if ((n = socket(family, type, protocol)) < 0)
perr_exit("socket error");

return n;
}

ssize_t Read(int fd, void *ptr, size_t nbytes)
{
ssize_t n;

again:
if ( (n = read(fd, ptr, nbytes)) == -1) {
if (errno == EINTR)
goto again;
else
return -1;
}
return n;
}

ssize_t Write(int fd, const void *ptr, size_t nbytes)
{
ssize_t n;

again:
if ( (n = write(fd, ptr, nbytes)) == -1) {
if (errno == EINTR)
goto again;
else
return -1;
}
return n;
}

int Close(int fd)
{
int n;
if ((n = close(fd)) == -1)
perr_exit("close error");

return n;
}

/*参三: 应该读取的字节数*/
ssize_t Readn(int fd, void *vptr, size_t n)
{
size_t nleft; //usigned int 剩余未读取的字节数
ssize_t nread; //int 实际读到的字节数
char *ptr;

ptr = vptr;
nleft = n;

while (nleft > 0) {
if ((nread = read(fd, ptr, nleft)) < 0) {
if (errno == EINTR)
nread = 0;
else
return -1;
} else if (nread == 0)
break;

nleft -= nread;
ptr += nread;
}
return n - nleft;
}

ssize_t Writen(int fd, const void *vptr, size_t n)
{
size_t nleft;
ssize_t nwritten;
const char *ptr;

ptr = vptr;
nleft = n;
while (nleft > 0) {
if ( (nwritten = write(fd, ptr, nleft)) <= 0) {
if (nwritten < 0 && errno == EINTR)
nwritten = 0;
else
return -1;
}

nleft -= nwritten;
ptr += nwritten;
}
return n;
}

static ssize_t my_read(int fd, char *ptr)
{
static int read_cnt;
static char *read_ptr;
static char read_buf[100];

if (read_cnt <= 0) {
again:
if ( (read_cnt = read(fd, read_buf, sizeof(read_buf))) < 0) {
if (errno == EINTR)
goto again;
return -1;
} else if (read_cnt == 0)
return 0;
read_ptr = read_buf;
}
read_cnt--;
*ptr = *read_ptr++;

return 1;
}

ssize_t Readline(int fd, void *vptr, size_t maxlen)
{
ssize_t n, rc;
char c, *ptr;

ptr = vptr;
for (n = 1; n < maxlen; n++) {
if ( (rc = my_read(fd, &c)) == 1) {
*ptr++ = c;
if (c == '\n')
break;
} else if (rc == 0) {
*ptr = 0;
return n - 1;
} else
return -1;
}
*ptr = 0;

return n;
}


warp.h

c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
#ifndef __WRAP_H_
#define __WRAP_H_

void perr_exit(const char *s);
int Accept(int fd, struct sockaddr *sa, socklen_t *salenptr);
int Bind(int fd, const struct sockaddr *sa, socklen_t salen);
int Connect(int fd, const struct sockaddr *sa, socklen_t salen);
int Listen(int fd, int backlog);
int Socket(int family, int type, int protocol);
ssize_t Read(int fd, void *ptr, size_t nbytes);
ssize_t Write(int fd, const void *ptr, size_t nbytes);
int Close(int fd);
ssize_t Readn(int fd, void *vptr, size_t n);
ssize_t Writen(int fd, const void *vptr, size_t n);
ssize_t my_read(int fd, char *ptr);
ssize_t Readline(int fd, void *vptr, size_t maxlen);

#endif

3.本地套接字和网络套接字实现通信对比

image-20241219115642257

7.信号量

1.信号量原语

多个进程同时访问系统上某个资源时,如同时写一个数据库的某条记录,或同时修改某个文件,就需要考虑进程同步问题,以确保任一时刻只有一个进程可以拥有对资源的独占式访问。通常,进程对共享资源的访问的代码只是很短的一段**,但这段代码引发了进程之间的竞态条件,我们称这段代码为关键代码区,或临界区,对进程同步,就是确保任一时刻只有一个进程能进入关键代码段。**

Dekker算法和Peterson算法试图从语言本身(不需要内核支持)解决进程同步问题,但它们依赖于忙等待,即进程要持续不断地等待某个内存位置状态的改变,这种方式的CPU利用率太低,不可取。

Dijkstra提出的信号量(Semaphore)是一种特殊的变量,它只能取自然数值且只支持两种操作:等待(wait)和信号(signal)。但在Linux/UNIX中,等待和信号都已经具有特殊含义,所以对信号量的这两种操作更常用的称呼是P、V操作,这两个字母来自荷兰语单词passeren(传递,就好像进入临界区)和vrijgeven(释放,就好像退出临界区)。

假设有信号量SV,对它的P、V操作含义如下:

  • P(SV),如果SV的值大于0,就将它减1,如果SV的值为0,则挂起进程的执行。
  • V(SV),如果有其他进程因为等待SV而挂起,则唤醒之,如果没有,则将SV加1。

信号量的取值可以是任何自然数,但最常用的、最简单的信号量是二进制信号量,它只能取0或1两个值,我们仅讨论二进制信号量。使用二进制信号量同步两个进程,以确保关键代码段的独占式访问的例子:

img

上图中,当关键代码段可用时,二进制信号量SV的值为1,进程A和B都有机会进入关键代码段,如果此时进程A执行了P(SV)操作将SV减1,则进程B再执行P(SV)操作就会被挂起,直到进程A离开关键代码段,并执行V(SV)操作将SV加1,关键代码段才重新变得可用。

不能使用普通变量来模拟二进制信号量,因为所有高级语言都没有一个原子操作可以同时完成以下两步操作:检测变量是否为true/false,如果是则将它设置为false/true。

Linux信号量的API定义在sys/sem.h头文件中,主要包括3个系统调用:semgetsemopsemctl。它们被设计为操作一组信号量,即信号量集,而不是单个信号量。

2.semget 系统调用

semget系统调用创建一个新的信号量集,或获取一个已经存在的信号量集。

cpp
1
2
#include <sys/sem.h>
int semget(key_t key, int num_sems, int sem_flags);
参数

key:键值,用来标志全局唯一的信号量集,要通过信号量通信的进程需要使用相同的键值来创建/获取该信号量。

num_sems:指定要创建/获取的信号量集中信号量的数目,如果是创建信号量,该值必须指定,如果是获取已经存在的信号量,该值可以设置为0。

sem_flags:指定一组标志,低端的9个bite是信号量的权限,格式和含义与openmode参数一致。此外,它可以和IPC_CREAT标志做按位或运算以创建新的信号量集。还可以联合使用IPC_CREATIPC_EXCL标志确保创建新的、唯一的信号量集,如果这时候该信号量集已经存在,semget返回错误并设置errno为EEXIST

返回值

semget成功返回一个正整数,也就是信号量集的标识符,失败返回-1并设置errno。

如果用semget创建一个新的信号量集,与之相关的内核数据结构体semid_ds将被创建并初始化。

plaintext
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
struct semid_ds {
struct ipc_perm sem_perm; /* 信号量操作权限 */
unsigned long int sem_nsems; /* 该信号量集中的信号量数目 */
time_t sem_otime; /* 最后一次调用 semop 的时间 */
time_t sem_ctime; /* 最后一次调用 semctl 的时间 */
/* 省略其他填充字段 */
}:

struct ipc_perm{
key_t key; /* 键值 */
uid_t uid; /* 所有者的用户id */
gid_t gid; /* 所有者的组id */
uid_t cuid; /* 创建者的用户id */
git_t cgid; /* 创建者的组id */
mode_t mode; /* 访问权限 */
/* 省略其他填充字段 */
};

img

3.semop 系统调用

semop系统调用改变信号量的值,即执行P、V操作,在讨论semop函数前,先介绍与每个信号量关联的一些重要的内核变量:

plaintext
1
2
3
4
unsigned short semval; 			/* 信号量的值 */
unsigned short semzcnt; /* 等待信号量变为0的进程数量 */
unsigned short semncnt; /* 等待信号量值增加的进程数量 */
pid_t sempid; /* 最后一次执行 semop 操作的进程ID */

semop函数对信号量的操作实际就是改变上述内核变量的操作,该函数定义如下:

cpp
1
2
#include <sys/sem.h>
int semop(int sem_id, struct sembuf* sem_ops, size_t num_sem_ops);
参数

sem_idsemget调用返回的信号量集标识符,指定被操作的目标信号量集。

sem_ops:指向一个 sembuf 类型结构体的数组:

cpp
1
2
3
4
5
6
struct sembuf
{
unsigned short int sem_num;
short int sem_op;
short int sem_flg;
};
  • sem_num:信号量集中信号量的编号,0代表信号量集的第一个信号量,以此类推。

  • sem_op:指定操作类型,可选值:正整数,0、负整数,同时受到 sem_flg的影响。op>0执行V操作,op小于0执行P操作

    • 通常P操作值为 - 1,V操作值为 1
  • sem_flg:可选值为IPC_NOWAIT,SEM_UNDO,0

    • IPC_NOWAIT代表非阻塞操作SEM_UNDO代表撤销操作0代表阻塞操作
    • sem_flg设置为IPC_NOWAIT时,如果信号量操作(如sem_op中的减法操作使得信号量的值小于 0)不能立即执行,操作不会阻塞等待信号量状态改变,而是立即返回一个错误,错误码通常为EAGAIN。这种方式适用于不希望进程在信号量操作上长时间阻塞的场景,例如在一些对实时性要求较高的应用中,当获取不到信号量时可以先去执行其他任务。
    • sem_flg设置为SEM_UNDO时,系统会记录信号量操作,以便在进程异常终止时自动撤销(调整)信号量的值,以避免信号量状态被错误地锁定或者资源无法释放的情况。例如,如果一个进程对信号量进行了P操作(减操作)获取资源后异常终止,没有来得及进行V操作(加操作)释放资源,设置了SEM_UNDO的信号量系统会自动进行适当的调整,保证信号量状态的正确性。
    • sem_flg为 0 时,信号量操作会按照正常的阻塞方式执行。对于P操作(sem_op为负数),如果信号量的值不够减,进程会被阻塞,直到信号量的值满足操作要求(例如其他进程进行了V操作增加了信号量的值)。这种方式在需要确保资源按照顺序被访问和操作,且允许进程等待资源可用的场景下非常有用,比如在经典的生产者 - 消费者模型中,消费者进程等待生产者生产出产品(通过信号量控制),此时使用阻塞式操作可以保证消费者在没有产品时等待,直到生产者生产出产品后再继续执行。

num_sem_ops:指定要执行的操作个数,即sem_ops数组中元素的个数。semop函数对sem_ops数组参数中的每个成员按数组顺序依次执行操作,且该过程是原子操作,以避免别的进程在同一时刻按不同顺序对该信号集中的信号量执行semop函数导致的竞态条件。

返回值

semop成功返回0,失败返回-1并设置errno。

sem_op值的不同操作规则:

  • sem_op 大于 0 时,表示进程要增加信号量的值。操作要求调用进程对被操作信号量集拥有写权限。若设置了 SEM_UNDO 标志,系统将更新进程的 semadj 变量。
  • sem_op 等于 0 时,表示这是一个 “等待 0” 操作。操作要求调用进程对被操作信号量集拥有读权限。如果信号量的值为 0,调用立即成功;如果不是 0,则操作失败或阻塞进程直到信号量变为 0。在这种情况下,当 IPC_NOWAIT 标志被指定时,操作立即返回一个错误,并设置 errnoEAGAIN。若未指定 IPC_NOWAIT 标志,信号量的 semncnt 值加 1,进程将被投入睡眠直到满足特定条件。
  • sem_op 小于 0 时,表示对信号量值进行减操作,即期望获得信号量。操作要求调用进程对被操作信号量集拥有写权限。如果信号量的值 semval 大于或等于 sem_op 的绝对值,操作成功,调用进程立即获得信号量,并且系统将该信号量的 semval 值减去 sem_op 的绝对值。若设置了 SEM_UNDO 标志,则系统将更新进程的 semadj 变量。

4.semctl 系统调用

semctl系统调用允许调用者对信号量进行直接控制:

cpp
1
2
#include <sys/sem.h>
int semctl(int sem_id, int sem_num, int command, ...);

参数:

  • sem_id参数是由semget调用返回的信号量集标识符,用于指定被操作的信号量集。
  • sem_num参数指定被操作的信号量在信号量集中的编号。
  • command参数指定要执行的命令,有些命令需要调用者传递第 4 个参数。

第四个参数可以自定义,但是系统给出了推荐的定义格式:

c
1
2
3
4
5
6
7
union semun
{
int val;// 用于SETVAL命令
struct semid_ds *buf;// 用于IPC_STAT和IPC_SET命令
unsigned short *array;// 用于GETALL和SETALL命令
struct seminfo *__buf;// 用于IPC_INFO命令
};
c
1
2
3
4
5
6
7
8
struct seminfo
{
int semmap;// Linux内核没有使用
int semmni;// 系统最多可以拥有的信号量集数目
int semmns;// 系统最多可以拥有的信号量数目
int semmnu;// Linux内核没有使用
int semmsl;// 一个信号量集最多允许包含的信号量数目
};

返回值:

  • semctl成功时的返回值取决于command参数,失败时返回 - 1,并设置errno

image-20241220111936954

注意事项

GETNCNTGETPIDGETVALGETZCNTSETVAL操作中,操作的是单个信号量,此时sem_num参数指定单个信号量在信号量集中的编号。而其他操作针对的是整个信号量集,此时sem_num参数被忽略。

5.特殊键值 IPC_PRIVATE

semget的调用者可以给其key参数传递一个特殊键值IPC_PRIVATE(其值为0),这样无论该信号量是否已存在,semget函数都将创建一个新信号量,使用该键值创建的信号量并非像它的名字声称的那样是进程私有的,其他进程,尤其是子进程,也有方法来访问这个信号量,所以semget函数的man手册的BUGS部分上说,使用名字IPC_PRIVATE有些误导(历史原因),应称为IPC_NEW

6.信号量实现进程间通信

c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/sem.h>
#include <string.h>
#include <unistd.h>

// 定义链表节点结构体
typedef struct ListNode {
int data;
struct ListNode *next;
} ListNode;

// 定义信号量操作结构体
union semun {
int val;
struct semid_ds *buf;
unsigned short int *array;
struct seminfo *__buf;
};

// 声明全局的链表头指针
ListNode *head = NULL;

// 信号量操作函数
void semaphore_op(int semid, int sem_num, int op) {
struct sembuf sem_b;
sem_b.sem_num = sem_num;
sem_b.sem_op = op;
sem_b.sem_flg = 0;
if (semop(semid, &sem_b, 1) == -1) {
perror("semop");
exit(1);
}
}

// 生产者函数
void producer(int semid) {
ListNode *head = NULL; // 链表头指针
int item = 0;
while (1) {
// 申请节点内存
ListNode *new_node = (ListNode *)malloc(sizeof(ListNode));
if (new_node == NULL) {
perror("malloc");
exit(1);
}
new_node->data = item++;

//P操作 获取空闲缓冲区信号量(这里表示链表插入的空位,第二参数表示产品还剩下0)
semaphore_op(semid, 0, -1);

// 将新节点插入链表头部(简单实现,可按需改为其他插入方式)
new_node->next = head;
head = new_node;
printf("生产了一个产品\n");
// V操作 增加产品信号量(表示链表中有新数据可供消费,第二参数表示生产出来了一个产品)
semaphore_op(semid, 1, 1);

sleep(1);
}
}

// 消费者函数
void consumer(int semid) {
while (1) {
semaphore_op(semid, 1, -1); // P操作 获取产品信号量(链表中有数据才可消费,第二个参数1代表有数据)

// 取出链表头节点进行消费
ListNode *node_to_consume = NULL;
if (head!= NULL) {
node_to_consume = head;
head = head->next;
printf("Consumer: %d\n", node_to_consume->data);
free(node_to_consume); // 释放消费完的节点内存
}
printf("消费了一个产品\n");
semaphore_op(semid, 0, 1); // V操作 增加空闲缓冲区信号量(链表腾出空位,第二个参数代表已经消费完了)

sleep(2);
}
}

int main() {
// 创建信号量集 IPC_PRIVATE的值为0,表示不管该信号量创建了没有都会创建一个
int semid = semget(IPC_PRIVATE, 2, 0666 | IPC_CREAT);
if (semid == -1) {
perror("semget");
return 1;
}
union semun arg;
arg.val = 1;
// 初始化空闲缓冲区信号量(初始有1个空位可插入链表节点)
if (semctl(semid, 0, SETVAL, arg) == -1) {
perror("semctl");
return 1;
}
arg.val = 0;
// 初始化产品信号量(初始链表无数据可供消费)
if (semctl(semid, 1, SETVAL, arg) == -1) {
perror("semctl");
return 1;
}

pid_t pid = fork();
if (pid < 0) {
perror("fork");
return 1;
} else if (pid == 0) {
// 子进程为消费者
consumer(semid);
} else {
// 父进程为生产者
producer(semid);
}

// 删除信号量集
if (semctl(semid, 0, IPC_RMID) == -1) {
perror("semctl");
return 1;
}

return 0;
}

总体功能:

生产者生产一个,消费者拿一个,然后再生产,然后再消费

1. 数据结构定义

定义了ListNode结构体来表示链表的节点,包含一个int类型的数据成员用于存放生产者生产的数据,以及一个指向下一个节点的指针成员。

2. 信号量操作相关部分

  • union semun结构体:用于给semctl函数传递参数,根据不同的命令可以传递不同类型的值,在这里主要用于初始化信号量的值。
  • semaphore_op函数:封装了semop函数,用于对信号量进行操作。它接受信号量集标识符、信号量编号以及操作值作为参数,构造struct sembuf结构体并调用semop函数来执行信号量操作,操作失败时会输出错误信息并终止程序。

3. 生产者逻辑

  • producer函数中,首先定义了链表头指针head,并在循环中不断生产数据。每次生产时,先通过malloc函数申请一个新的链表节点内存空间,将数据存入节点。
  • 然后通过semaphore_op函数获取空闲缓冲区信号量(这里代表链表中可插入新节点的空位),接着将新节点插入到链表头部(简单实现了链表插入操作,实际可根据需求调整插入逻辑),再通过semaphore_op函数增加产品信号量,表示链表中有新的数据可供消费者消费,最后通过sleep函数模拟生产过程的时间间隔。

4. 消费者逻辑

  • consumer函数中,通过循环不断尝试消费数据。首先通过semaphore_op函数获取产品信号量,只有当链表中有数据(信号量值大于等于 1)时才能继续执行。
  • 接着取出链表头节点,将其数据打印出来模拟消费过程,然后释放该节点占用的内存空间,最后通过semaphore_op函数增加空闲缓冲区信号量,表示链表腾出了一个空位可供生产者插入新节点,同样通过sleep函数模拟消费过程的时间间隔。

5. 主函数部分

  • main函数中,通过semget函数创建包含两个信号量的信号量集,分别用于控制空闲缓冲区(链表插入空位)和产品(链表中可消费的数据)。
  • 使用semctl函数结合union semun结构体来初始化这两个信号量的初始值。
  • 通过fork函数创建子进程,子进程执行consumer函数作为消费者,父进程执行producer函数作为生产者。
  • 最后在程序结束时,通过semctl函数删除信号量集,释放相关系统资源。

这样就通过 System V IPC 信号量实现了一个基于链表作为共享数据结构的生产者 - 消费者模型,确保了生产者和消费者对链表的并发访问是安全有序的。

运行结果:

image-20241220115519889

7.无血缘关系进程通信

  1. ftok函数的定义和功能
    • 函数原型key_t ftok(const char *pathname, int proj_id);
    • 功能ftok函数用于生成一个唯一的key(键值),这个key通常用于 System V IPC(进程间通信)机制中,如创建共享内存、消息队列和信号量集等。它将一个文件路径名(pathname)和一个项目标识符(proj_id)组合起来,生成一个适合作为 System V IPC 资源标识符的key值。
  2. 参数解释
    • const char *pathname
      • 这是一个指向文件路径名的指针。这个文件路径必须是一个已经存在的文件的有效路径,通常使用当前目录(.")或者一个程序相关的配置文件路径等。ftok函数会使用文件的inode(索引节点)信息作为生成key的一部分。
      • 注意,如果文件被删除然后重新创建,即使文件名相同,inode可能会改变,这会导致ftok生成不同的key值。
    • int proj_id
      • 这是一个0 - 255之间的整数,作为项目标识符。它和文件路径的inode信息一起组合生成key。不同的项目可以使用不同的proj_id来区分,这样即使基于同一个文件路径,不同的项目也能生成不同的key值用于各自的 IPC 资源。例如,一个程序中有两个不同的模块需要使用消息队列进行通信,它们可以使用相同的文件路径但不同的proj_id来生成不同的key,以创建两个独立的消息队列。
  3. 返回值
    • 成功时,ftok函数返回一个key_t类型的非负整数,这个整数可以作为shmgetmsggetsemget等 System V IPC 函数的key参数来创建或获取对应的 IPC 资源。
    • 失败时,返回-1,并且会设置errno来指示错误原因。常见的错误原因包括:
      • EACCESS:没有权限访问pathname指定的文件。
      • ENOENTpathname指定的文件不存在。

发送端

c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/sem.h>
#include <unistd.h>

// 信号量操作函数
void semaphore_op(int semid, int sem_num, int op) {
struct sembuf sem_b;
sem_b.sem_num = sem_num;
sem_b.sem_op = op;
sem_b.sem_flg = 0;
if (semop(semid, &sem_b, 1) == -1) {
perror("semop");
exit(1);
}
}

int main() {
// 通过ftok生成key
key_t key = ftok(".", 'a');
if (key == -1) {
perror("ftok");
return 1;
}

// 创建信号量集(只含一个信号量)
int semid = semget(key, 1, IPC_CREAT | 0666);
if (semid == -1) {
perror("semget");
return 1;
}

// 初始化信号量值为0(表示资源不可用)
union semun {
int val;
struct semid_ds *buf;
unsigned short *array;
struct seminfo *__buf;
} arg;
arg.val = 0;
if (semctl(semid, 0, SETVAL, arg) == -1) {
perror("semctl");
return 1;
}

// 进行一些操作后,释放信号量(表示资源可用了)
semaphore_op(semid, 0, 1);
printf("Sender process released the signal...\n");


sleep(10);
return 0;
}

读入端

c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/sem.h>
#include <unistd.h>

// 信号量操作函数
void semaphore_op(int semid, int sem_num, int op) {
struct sembuf sem_b;
sem_b.sem_num = sem_num;
sem_b.sem_op = op;
sem_b.sem_flg = 0;
if (semop(semid, &sem_b, 1) == -1) {
perror("semop");
exit(1);
}
}

int main() {
// 通过ftok生成key
key_t key = ftok(".", 'a');
if (key == -1) {
perror("ftok");
return 1;
}

// 获取信号量
int semid = semget(key, 1, 0666);
if (semid == -1) {
perror("semget");
return 1;
}

// 等待信号量,相当于等待另一个进程释放资源
semaphore_op(semid, 0, -1);
printf("Receiver process got the signal and can continue...\n");

// 操作完成后,释放信号量(这里简单示意)
semaphore_op(semid, 0, 1);

// 删除信号量集
if (semctl(semid, 0, IPC_RMID, NULL) == -1) {
perror("semctl for delete");
}
sleep(10);
return 0;
}

运行结果:

image-20241220192820038

8.共享内存

共享内存的基本原理

  • 共享内存是一种进程间通信(IPC)机制。当多个进程需要共享数据时,可以创建一块共享内存区域。通过shmget函数创建共享内存段后,这块内存区域就存在于系统的内存空间中。
  • 然后,各个进程可以通过shmat函数将这块共享内存连接到自己的进程地址空间。连接后,进程就可以像访问自己的本地内存一样访问共享内存中的数据。

共享内存是最高效的IPC机制,因为它不涉及进程之间的任何数据传输,这种高效率带来的问题是,我们必须用其他辅助手段来同步进程对共享内存的访问,否则会产生竞态条件,因此,共享内存通常和其他进程间通信方式一起使用。

Linux共享内存的API都定义在sys/shm.h头文件中,包括4个系统调用shmgetshmatshmdtshmctl

1.shmget 系统调用

shmget 系统调用创建一段新的共享内存,或者获取一段已经存在的共享内存。

cpp
1
2
#include <sys/shm.h>
int shmget(key_t key, size_t size, int shmflg);
  1. 参数详解

    • key_t key
      • 这是一个键值,用于标识共享内存段。它可以是由ftok函数生成的值,也可以是IPC_PRIVATE(用于创建私有共享内存段,通常在父子进程间使用)。ftok函数根据一个文件路径和一个项目标识符生成一个唯一的key_t值。
    • size_t size
      • 这个参数指定了要创建或获取的共享内存段的大小,单位是字节。如果是创建新的共享内存段,这个大小必须大于 0。如果shmget函数用于获取已存在的共享内存段,这个参数通常可以被忽略(但仍需提供一个合理的值)。
    • int shmflg
      • 这个参数用于控制共享内存段的创建和访问权限,它由以下几种标志组成:
        • IPC_CREAT:如果共享内存段不存在,则创建它。如果和IPC_EXCL一起使用(IPC_CREAT | IPC_EXCL),则只有在共享内存段不存在时才创建,若已存在则shmget函数返回 - 1 并设置errnoEEXIST
        • IPC_EXCL:与IPC_CREAT一起使用,用于确保创建的是一个新的、唯一的共享内存段。
        • 权限标志:如0666等,用于指定共享内存段的访问权限,格式与文件权限相同(用户、组、其他用户的读、写、执行权限)。
        • SHM_HUGETLB标志:当在shmflg参数中使用SHM_HUGETLB标志时,表示请求使用大页(huge pages)来分配共享内存。大页是一种内存管理技术,它使用比标准内存页更大的页面大小(通常为 2MB 或 1GB,取决于系统配置)。
        • SHM_NORESERVE标志SHM_NORESERVE标志用于在创建共享内存时,不预留交换空间(swap space)。通常情况下,当创建共享内存段时,系统会为其预留相应的交换空间,以确保在内存不足时可以将部分数据交换到磁盘上。使用SHM_NORESERVE可以避免这种交换空间的预留。这样,当物理内存不足时,对该共享内存执行写操作将触发SIGESV信号。
  2. 返回值

    • 成功时,shmget函数返回一个非负整数,即共享内存段的标识符(shmid)。这个标识符可以用于后续的shmat(连接共享内存段到进程地址空间)、shmdt(从进程地址空间分离共享内存段)和shmctl(控制共享内存段的操作,如删除、获取状态等)操作。

    • 失败时,函数返回 - 1,并设置errno

      变量来指示错误原因。常见的错误原因包括:

      • EINVALsize小于SHMMIN(最小共享内存大小)或者key无效。
      • EEXIST:当使用IPC_CREAT | IPC_EXCL标志且共享内存段已经存在时。
      • ENOENT:当key对应的共享内存段不存在且没有使用IPC_CREAT标志时。

如果 shmget 用于创建共享内存,则这段共享内存的所有字节都被初始化为 0,与之关联的内核数据结构 shmid_ds 将被创建并初始化。shmid_ds 结构体的定义如下:

c
1
2
3
4
5
6
7
8
9
10
11
struct shmid_ds
{
struct ipc_perm shm_perm; // 共享内存的操作权限
size_t shm_segsz; // 共享内存容量大小,单位是字节
time_t shm_atime; // 对这段内存最后一次调用shmat的时间
time_t shm_dtime; // 对这段内存最后一次调用shmdt的时间
time_t shm_ctime; // 对这段内存最后一次调用shmctl的时间
_pid_t shm_cpid; // 创建者的PID
_pid_t shm_lpid; // 最后一次执行shmat或shmdt操作的进程的PID
shmat_t shm_nattach; // 目前关联到此共享内存的进程数量
};

shmget 对 shmid_ds 结构体的初始化包括:

  • 将 shm_perm.cuid 和 shm_perm.uid 设置为调用进程的有效用户 ID。
  • 将 shm_perm.cgid 和 shm_perm.gid 设置为调用进程的有效组 ID。
  • 将 shm_perm.mode 的最低 9 位设置为 shmflg 参数的最低 9 位。
  • 将 shm_segsz 设置为 size。
  • 将 shm_lpid、shm_nattach、shm_atime、shm_dtime 设置为 0。
  • 将 shm_ctime 设置为当前的时间。

2.shmatshmdt 系统调用

共享内存被创建/获取后,我们不能立即访问它,而是需要先将它关联到进程的地址空间中,使用完共享内存后,我们也需要将它从进程地址空间中分离,这两项任务分别由以下两个系统调用实现:

cpp
1
2
3
#include <sys/shm.h>
void* shmat(int shm_id, const void* shm_addr, int shmflg);
int shmdt(const void* shm_addr);

1.shamt

c
1
void* shmat(int shm_id, const void* shm_addr, int shmflg);
  • 功能:

    • 将由shm_id标识的共享内存段连接到调用进程的地址空间。连接成功后,进程可以像访问普通内存一样访问共享内存中的数据。
  • 参数:

    • shm_id
      • 这是共享内存段的标识符,由shmget函数成功创建或获取共享内存段时返回。它唯一标识了一个共享内存段。
    • shm_addr
      • 用于指定连接共享内存段的地址。
      • 如果shm_addrNULL,则由系统选择合适的连接地址。
      • 如果shm_addr不为NULL,并且shmflg中没有设置SHM_RND标志,那么共享内存段将被连接到shm_addr所指定的地址。
      • 如果shm_addr不为NULL,并且shmflg中设置了SHM_RND标志,那么连接的地址将是shm_addr向下舍入到系统页面大小(page - size)的整数倍的地址。
    • shmflg
      • 包含一些控制连接操作的标志。
      • SHM_RND:如上述,用于对连接地址进行舍入操作。
      • SHM_RDONLY:如果设置了这个标志,那么共享内存段将以只读方式连接到进程地址空间。如果不设置此标志,共享内存段将以读写方式连接。
  • 返回值:

    • 成功时,返回指向连接后的共享内存段在进程地址空间中的起始地址的指针。并修改shmid_ds中的字段

      • 将 shm_nattach 加 1。
      • 将 shm_lpid 设置为调用进程的 PID。
      • 将 shm_atime 设置为当前时间。
    • 失败时,返回(void *)-1,并且会设置errno

      来指示错误原因。常见的错误原因包括:

      • EINVALshm_id无效,或者shm_addrshmflg的组合无效。
      • ENOMEM:没有足够的内存来连接共享内存段。

shm_id参数是由shmget函数返回的共享内存标识符。shm_addr参数指定将共享内存关联到进程的哪块地址空间,最终效果还受到shmflg参数的可选标志SHM_RND的影响。

  • 如果 shm_addr 为 NULL,则被关联的地址由操作系统选择。这是推荐的做法,以确保代码的可移植性。

  • 如果 shm_addr 非空,并且 SHM_RND 标志未被设置,则共享内存被关联到 addr 指定的地址处。

  • 如果 shm_addr 非空,并且设置了 SHM_RND 标志,则被关联的地址是 [shm_addr-(shm_addr % SHMLBA)]。SHMLBA 的含义是 “段低端边界地址倍数”(Segment Low Boundary Address Multiple),它必须是内存页面大小 (PAGE_SIZE) 的整数倍。现在的 Linux 内核中,它等于一个内存页大小。SHM_RND 的含义是圆整 (round),即将共享内存被关联的地址向下圆整到离 shm_addr 最近的 SHMLBA 的整数倍地址处。

除了 SHM_RND 标志外,shmflg 参数还支持如下标志:

  • SHM_RDONLY。进程仅能读取共享内存中的内容。若没有指定该标志,则进程可同时对共享内存进行读写操作(当然,这需要在创建共享内存的时候指定其读写权限)。

  • SHM_REMAP。如果 shmaddr 已经被关联到一段共享内存上,则重新关联。

  • SHM_EXEC。它指定对共享内存段的执行权限。对于共享内存的执行权限,执行权限和读权限是一样的。

2. shmdt

c
1
int shmdt(const void* shm_addr);
  • 功能:

    • 将之前由shmat连接到进程地址空间的共享内存段分离。分离操作并不会删除共享内存段本身,只是将该共享内存段从调用进程的地址空间中移除。

    • shm_addr

      • 这是共享内存段在进程地址空间中的起始地址,即shmat函数成功连接共享内存段时返回的地址。
  • 成功时,返回0。失败时,返回-1,并且会设置errno来指示错误原因。

    成功调用会修改内核数据结构 shmid_ds 的部分字段,如下:

    • 将 shm_nattach 减 1。

    • 将 shm_lpid 设置为调用进程的 PID。

    • 将 shm_dtime 设置为当前时间。

    常见的错误原因包括:

    • EINVALshm_addr不是有效的共享内存段地址。

shmdt函数将关联到shm_addr参数处的共享内存从进程中分离,它成功时返回0,失败则返回-1并设置errno。

调用该函数使得它从进程空间分离的含义

  • 不删除本身:
    • 共享内存段在系统内存中是独立存在的。即使一个进程不再需要使用它,这个共享内存段本身并不会消失。这是因为其他进程可能还在使用这块共享内存进行数据交互。例如,有进程 A、B、C 都连接到了同一块共享内存。如果进程 A 调用shmdt,只是进程 A 不再能访问这块共享内存,但进程 B 和 C 仍然可以正常访问,共享内存段本身依然存在于系统内存中。
  • 从进程地址空间移除:
    • 在进程调用shmdt之前,共享内存段是映射到该进程的地址空间中的。这意味着进程可以直接通过指针访问共享内存中的数据。当调用shmdt后,这个映射关系就被解除了。就好像在进程的 “视野” 中,这块共享内存消失了。虽然共享内存还在系统中,但该进程已经无法再通过之前的指针去访问其中的数据了。这就是所谓的从进程地址空间移除。

3.shmctl 系统调用

shmctl系统调用控制共享内存的某些属性。

cpp
1
2
#include <sys/shm.h>
int shmctl(int shm_id, int command, struct shmid_ds* buf);
  1. 函数功能

    • shmctl函数用于对共享内存段进行控制操作,如获取共享内存段的状态信息、设置共享内存段的属性、删除共享内存段等。
  2. 参数详解

    • int shm_id

      • 这是共享内存段的标识符,由shmget函数成功创建或获取共享内存段时返回。它唯一标识了一个共享内存段,shmctl函数将对这个标识符所对应的共享内存段进行操作。
    • int command:(见下表)

      • 这是一个控制命令,用于指定对共享内存段进行何种操作,常见的命令有:
        • IPC_STAT:获取共享内存段的状态信息,并将其存储到buf所指向的struct shmid_ds结构体中。这个结构体包含了共享内存段的各种属性,如操作权限、大小、最后访问时间等。
        • IPC_SET:根据buf所指向的struct shmid_ds结构体中的信息来设置共享内存段的属性。例如,可以修改共享内存段的操作权限等。
        • IPC_RMID:删除由shm_id标识的共享内存段。这是一个非常重要且具有危险性的操作,一旦执行,共享内存段及其所包含的数据将被永久删除,所有关联到该共享内存段的进程将无法再访问它。
      • 除了上述常见命令外,还有一些其他命令,如SHM_LOCK(锁定共享内存段到物理内存,防止被交换到磁盘)和SHM_UNLOCK(解锁共享内存段,允许其被交换到磁盘)等,但这些命令的可用性可能取决于操作系统的支持。
    • struct shmid_ds* buf

      • 这是一个指向struct shmid_ds结构体的指针,其作用取决于command

        参数的值:

        • commandIPC_STAT时,buf用于存储获取到的共享内存段的状态信息。
        • commandIPC_SET时,buf指向的结构体中的信息将被用于设置共享内存段的属性。
        • 如果command不涉及IPC_STATIPC_SET操作(如IPC_RMID),buf通常可以设置为NULL
  3. 返回值(见下表)

    • 成功时:

      • 如果commandIPC_STATIPC_SET,返回0表示操作成功。
      • 如果commandIPC_RMID,返回0表示共享内存段已成功删除。
    • 失败时:

      • 返回-1,并设置errno

        变量来指示错误原因。常见的错误原因包括:

        • EINVALshm_id无效,或者command参数无效,或者buf指向的结构体无效(在IPC_STATIPC_SET操作时)。
        • EPERM:调用进程没有足够的权限来执行请求的操作(例如,没有权限删除共享内存段或修改其属性)。

image-20241220160445378

4.函数接口使用

c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/shm.h>

int main() {
// 创建共享内存段
int shm_id = shmget(IPC_PRIVATE, 1024, IPC_CREAT | 0666);
if (shm_id == -1) {
perror("shmget");
return 1;
}

// 连接共享内存段
void* shm_addr = shmat(shm_id, NULL, 0);
if (shm_addr == (void *)-1) {
perror("shmat");
return 1;
}

// 使用共享内存段,这里简单地将其初始化为0
for (int i = 0; i < 1024; i++) {
((char *)shm_addr)[i] = 0;
}

// 分离共享内存段
if (shmdt(shm_addr) == -1) {
perror("shmdt");
return 1;
}

// 删除共享内存段
if (shmctl(shm_id, IPC_RMID, NULL) == -1) {
perror("shmctl");
return 1;
}

return 0;
}

5.共享内存的 POSIX 方法

mmap函数和munmap函数利用mmap函数的MAP_ANONYMOUS标志可以实现父、子进程间的匿名内存共享。通过打开同一个文件,mmap也可以实现无关进程之间的内存共享。

Linux提供了另一种在无关进程间共享内存的方式,这种方式无须任何文件的支持,但它需要先用shm_open函数来创建或打开一个POSIX共享内存对象。

1.shm_open

cpp
1
2
3
4
5
#include <sys/mman.h>
#include <sys/stat.h>
#include <fcntl.h>

int shm_open ( const char* name, int oflag, mode_t mode );

shm_open函数的使用方法与open系统调用完全相同。

  • 功能:
    • 用于创建或打开一个共享内存对象。这个函数是基于 POSIX 标准的,提供了一种可移植的方式来操作共享内存。
  • 参数:
    • const char* name
      • 这是共享内存对象的名字。从可移植性的角度考虑,名字应该遵循特定格式:以/开始,后面跟着多个非/字符,并且以\0结尾,长度通常不超过NAME_MAX(一般为 255)。例如"/my_shared_memory"
    • int oflag
      • 用于指定打开共享内存对象的方式,它是以下标志的按位或组合:
        • O_RDONLY:以只读方式打开共享内存对象。如果只指定了这个标志,那么进程只能从共享内存中读取数据。
        • O_RDWR:以可读可写方式打开共享内存对象,这是最常用的方式之一,允许进程对共享内存进行读写操作。
        • O_CREAT:如果共享内存对象不存在,则创建它。当使用这个标志时,mode参数用于指定新创建的共享内存对象的访问权限(类似文件权限)。
        • O_EXCL:通常与O_CREAT一起使用。如果name指定的共享内存对象已经存在,shm_open调用将返回错误(-1);如果不存在,则创建一个新的共享内存对象。这可以用于确保创建的是一个全新的、未被使用过的共享内存对象。
        • O_TRUNC:如果共享内存对象已经存在,将其截断(长度变为 0)。这个操作会丢失共享内存对象中原先存储的数据。
    • mode_t mode
      • oflag中包含O_CREAT标志时,mode参数用于指定共享内存对象的访问权限。它类似于文件权限,例如0666表示用户、组和其他用户都有读写权限。权限的低 9 位用于设置实际的权限。

返回值:

  • 返回值:
    • 成功时,返回一个非负整数,即共享内存对象的文件描述符。这个文件描述符可以用于后续的操作,如mmap(将共享内存映射到进程地址空间)等操作。
    • 失败时,返回-1,并且会设置errno来指示错误原因。常见的错误原因包括:
      • EEXIST:当O_CREATO_EXCL一起使用且共享内存对象已经存在时。
      • EINVALname参数不符合格式要求,或者oflag参数无效。
      • EACCES:没有足够的权限按照oflagmode指定的方式创建或打开共享内存对象。

和打开的文件最后需要关闭一样,由shm_open函数创建的共享内存对象用完后也需要删除,可通过shm_unlink函数实现。

cpp
1
2
3
4
5
#include <sys/mman.h>
#include <sys/stat.h>
#include <fcntl.h>

int shm_unlink ( const char* name );
  • 功能:

    • 用于删除一个共享内存对象。当一个进程调用shm_unlink后,共享内存对象会被标记为删除,但实际的删除操作会在所有进程都关闭了对该共享内存对象的引用(通过shm_open打开得到的文件描述符都被关闭)后才会执行。
  • 参数:

shm_unlink函数将name参数指定的共享内存对象标记为等待删除,当所有使用该共享内存对象的进程都使用munmap函数将它从进程中分离后,系统将销毁这个共享内存对象所占据的资源。

  • 返回值:

    • 成功时,返回0

    • 失败时,返回-1,并且会设置errno来指示错误原因。常见的错误原因包括:

      • EINVALname参数不符合格式要求。
      • ENOENT:指定的共享内存对象不存在。

如果代码中使用了以上POSIX共享内存函数,则编译时需要指定链接选项-lrt

3.使用案例

c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
#include <stdio.h>
#include <stdlib.h>
#include <fcntl.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <unistd.h>

#define MEMORY_SIZE 4096

int main() {
// 创建或打开共享内存对象
int fd = shm_open("/my_shared_memory", O_CREAT | O_RDWR, 0666);
if (fd == -1) {
perror("shm_open");
return 1;
}

// 设置共享内存对象大小
if (ftruncate(fd, MEMORY_SIZE) == -1) {
perror("ftruncate");
return 1;
}

// 映射共享内存到进程地址空间
void* shared_memory = mmap(NULL, MEMORY_SIZE, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
if (shared_memory == MAP_FAILED) {
perror("mmap");
return 1;
}

// 使用共享内存,这里简单地写入数据
char* message = "Hello, shared memory!";
for (int i = 0; i < sizeof(message); i++) {
((char*)shared_memory)[i] = message[i];
}

// 解除映射
if (munmap(shared_memory, MEMORY_SIZE) == -1) {
perror("munmap");
return 1;
}

// 关闭共享内存对象
if (close(fd) == -1) {
perror("close");
return 1;
}

// 删除共享内存对象
if (shm_unlink("/my_shared_memory") == -1) {
perror("shm_unlink");
return 1;
}

return 0;
}

代码解释

  • 首先,使用shm_open创建或打开一个名为/my_shared_memory的共享内存对象,权限为0666,并以读写方式打开。
  • 然后,使用ftruncate设置共享内存对象的大小为MEMORY_SIZE(这里定义为 4096 字节)。
  • 接着,使用mmap将共享内存映射到进程的地址空间,以便可以像访问普通内存一样访问共享内存。
  • 之后,向共享内存中写入了一个字符串Hello, shared memory!
  • 再然后,使用munmap解除共享内存的映射。
  • 接着,使用close关闭共享内存对象的文件描述符。
  • 最后,使用shm_unlink删除共享内存对象。

4.实现生产者消费者模型

使用共享内存实现生产者 - 消费者模型(基于shm_openshm_unlink函数)

  • 以下是使用shm_openshm_unlink函数实现的生产者 - 消费者模型的 C++ 代码示例:
cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
#include <iostream>
#include <fcntl.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <unistd.h>
#include <string.h>
#include <semaphore.h>
#include <cstdlib>
#include <ctime>
#include <thread>

#define MEMORY_SIZE 4096
#define BUFFER_SIZE 10

// 共享内存结构体
struct SharedMemory {
int buffer[BUFFER_SIZE];
int in;
int out;
};

// 生产者函数
void producer(int shm_fd, sem_t* empty_sem, sem_t* full_sem) {
void* shm_ptr = mmap(NULL, MEMORY_SIZE, PROT_READ | PROT_WRITE, MAP_SHARED, shm_fd, 0);
if (shm_ptr == MAP_FAILED) {
std::cerr << "mmap failed in producer" << std::endl;
return;
}
SharedMemory* shared_mem = (SharedMemory*)shm_ptr;

srand(static_cast<unsigned int>(time(nullptr)));
while (true) {
sem_wait(empty_sem);
int item = rand() % 100;
shared_mem->buffer[shared_mem->in] = item;
shared_mem->in = (shared_mem->in + 1) % BUFFER_SIZE;
std::cout << "Producer produced: " << item << std::endl;
sem_post(full_sem);
// 模拟生产时间
std::this_thread::sleep_for(std::chrono::milliseconds(500));
}
munmap(shm_ptr, MEMORY_SIZE);
}

// 消费者函数
void consumer(int shm_fd, sem_t* empty_sem, sem_t* full_sem) {
void* shm_ptr = mmap(NULL, MEMORY_SIZE, PROT_READ | PROT_WRITE, MAP_SHARED, shm_fd, 0);
if (shm_ptr == MAP_FAILED) {
std::cerr << "mmap failed in consumer" << std::endl;
return;
}
SharedMemory* shared_mem = (SharedMemory*)shm_ptr;

while (true) {
sem_wait(full_sem);
int item = shared_mem->buffer[shared_mem->out];
shared_mem->out = (shared_mem->out + 1) % BUFFER_SIZE;
std::cout << "Consumer consumed: " << item << std::endl;
sem_post(empty_sem);
// 模拟消费时间
std::this_thread::sleep_for(std::chrono::milliseconds(800));
}
munmap(shm_ptr, MEMORY_SIZE);
}

int main() {
// 创建共享内存对象
int shm_fd = shm_open("/my_shared_memory", O_CREAT | O_RDWR, 0666);
if (shm_fd == -1) {
std::cerr << "shm_open failed" << std::endl;
return 1;
}
// 设置共享内存大小
if (ftruncate(shm_fd, MEMORY_SIZE) == -1) {
std::cerr << "ftruncate failed" << std::endl;
return 1;
}

// 创建信号量
sem_t* empty_sem = sem_open("/empty_sem", O_CREAT, 0666, BUFFER_SIZE);
sem_t* full_sem = sem_open("/full_sem", O_CREAT, 0666, 0);
if (empty_sem == SEM_FAILED || full_sem == SEM_FAILED) {
std::cerr << "sem_open failed" << std::endl;
return 1;
}

// 映射共享内存到进程空间
void* shm_ptr = mmap(NULL, MEMORY_SIZE, PROT_READ | PROT_WRITE, MAP_SHARED, shm_fd, 0);
if (shm_ptr == MAP_FAILED) {
std::cerr << "mmap failed" << std::endl;
return 1;
}
SharedMemory* shared_mem = (SharedMemory*)shm_ptr;
shared_mem->in = 0;
shared_mem->out = 0;

// 创建生产者和消费者线程
std::thread producer_thread(producer, shm_fd, empty_sem, full_sem);
std::thread consumer_thread(consumer, shm_fd, empty_sem, full_sem);

producer_thread.join();
consumer_thread.join();

// 关闭并删除信号量
sem_close(empty_sem);
sem_close(full_sem);
sem_unlink("/empty_sem");
sem_unlink("/full_sem");

// 解除映射并删除共享内存
munmap(shm_ptr, MEMORY_SIZE);
close(shm_fd);
shm_unlink("/my_shared_memory");

return 0;
}

整体代码解释:

  • 共享内存结构体定义:
    • 定义了SharedMemory结构体,包含一个整数数组作为缓冲区,以及inout索引,用于生产者和消费者操作。
  • 生产者函数:
    • 通过mmap将共享内存映射到进程地址空间。
    • 使用sem_wait等待empty_sem信号量(表示缓冲区有空闲位置),生产一个随机数放入缓冲区,更新in索引,然后使用sem_post释放full_sem信号量(表示缓冲区有数据可供消费)。
  • 消费者函数:
    • 同样通过mmap映射共享内存。
    • 使用sem_wait等待full_sem信号量,从缓冲区取出数据,更新out索引,然后使用sem_post释放empty_sem信号量。
  • 主函数:
    • 使用shm_open创建共享内存对象,并设置大小。
    • 创建empty_semfull_sem两个信号量,分别用于控制缓冲区的空闲和满状态。
    • 映射共享内存后初始化inout索引。
    • 创建生产者和消费者线程并等待它们结束。
    • 最后关闭并删除信号量,解除共享内存映射,关闭并删除共享内存对象。

5.无血缘关系的进程之间通信

写入端

c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#include <string.h>

#define SHM_NAME "/my_shared_memory"
#define SHM_SIZE 100

int main() {
int shm_fd;
void *ptr;

// 创建共享内存对象
shm_fd = shm_open(SHM_NAME, O_CREAT | O_RDWR, 0666);
if (shm_fd == -1) {
perror("shm_open");
return 1;
}

// 设置共享内存大小
if (ftruncate(shm_fd, SHM_SIZE) == -1) {
perror("ftruncate");
return 1;
}

// 将共享内存映射到进程地址空间
ptr = mmap(NULL, SHM_SIZE, PROT_READ | PROT_WRITE, MAP_SHARED, shm_fd, 0);
if (ptr == MAP_FAILED) {
perror("mmap");
return 1;
}

// 向共享内存写入数据
strcpy((char *)ptr, "Hello from writer process!");

// 解除映射
if (munmap(ptr, SHM_SIZE) == -1) {
perror("munmap");
return 1;
}

// 关闭共享内存对象描述符
close(shm_fd);

return 0;
}

读出端

c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>

#define SHM_NAME "/my_shared_memory"
#define SHM_SIZE 100

int main() {
int shm_fd;
void *ptr;

// 打开共享内存对象
shm_fd = shm_open(SHM_NAME, O_RDWR, 0666);
if (shm_fd == -1) {
perror("shm_open");
return 1;
}

// 将共享内存映射到进程地址空间
ptr = mmap(NULL, SHM_SIZE, PROT_READ | PROT_WRITE, MAP_SHARED, shm_fd, 0);
if (ptr == MAP_FAILED) {
perror("mmap");
return 1;
}

// 从共享内存读取数据并打印
printf("Read from shared memory: %s\n", (char *)ptr);

// 解除映射
if (munmap(ptr, SHM_SIZE) == -1) {
perror("munmap");
return 1;
}

// 关闭共享内存对象描述符
close(shm_fd);

// 删除共享内存对象
if (shm_unlink(SHM_NAME) == -1) {
perror("shm_unlink");
return 1;
}

return 0;
}

运行结果:

image-20241220190749596

6.共享内存实例–实现生产者消费者模型

使用 System V IPC 的共享内存实现生产者 - 消费者模型

  • 以下是使用 System V IPC 的shmgetshmatshmdtshmctl函数实现生产者 - 消费者模型的 C 代码:
c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/shm.h>
#include <sys/sem.h>
#include <time.h>
#include <unistd.h>

#define BUFFER_SIZE 10

// 共享内存结构体
struct SharedMemory {
int buffer[BUFFER_SIZE];
int in;
int out;
};

// 信号量操作函数
void semaphore_op(int semid, int sem_num, int op) {
struct sembuf sem_b;
sem_b.sem_num = sem_num;
sem_b.sem_op = op;
sem_b.sem_flg = 0;
if (semop(semid, &sem_b, 1) == -1) {
perror("semop");
exit(1);
}
}

// 生产者函数
void producer(int shm_id, int semid) {
struct SharedMemory *shared_mem = (struct SharedMemory *)shmat(shm_id, NULL, 0);
if (shared_mem == (void *)-1) {
perror("shmat producer");
exit(1);
}

srand(time(NULL));
while (1) {
semaphore_op(semid, 0, -1); // 等待有空位(P操作)
int item = rand() % 100;
shared_mem->buffer[shared_mem->in] = item;
shared_mem->in = (shared_mem->in + 1) % BUFFER_SIZE;
printf("Producer produced: %d\n", item);
semaphore_op(semid, 1, 1); // 增加产品数量(V操作)
sleep(1);
}
shmdt(shared_mem);
}

// 消费者函数
void consumer(int shm_id, int semid) {
struct SharedMemory *shared_mem = (struct SharedMemory *)shmat(shm_id, NULL, 0);
if (shared_mem == (void *)-1) {
perror("shmat consumer");
exit(1);
}

while (1) {
semaphore_op(semid, 1, -1); // 等待有产品(P操作)
int item = shared_mem->buffer[shared_mem->out];
shared_mem->out = (shared_mem->out + 1) % BUFFER_SIZE;
printf("Consumer consumed: %d\n", item);
semaphore_op(semid, 0, 1); // 增加空位数量(V操作)
sleep(2);
}
shmdt(shared_mem);
}

int main() {
// 创建共享内存
int shm_id = shmget(IPC_PRIVATE, sizeof(struct SharedMemory), IPC_CREAT | 0666);
if (shm_id == -1) {
perror("shmget");
return 1;
}

// 创建信号量集
int semid = semget(IPC_PRIVATE, 2, IPC_CREAT | 0666);
if (semid == -1) {
perror("semget");
return 1;
}
union semun {
int val;
struct semid_ds *buf;
unsigned short *array;
struct seminfo *__buf;
} arg;
arg.val = BUFFER_SIZE;
// 初始化空闲缓冲区信号量(初始有BUFFER_SIZE个空位可插入数据)
if (semctl(semid, 0, SETVAL, arg) == -1) {
perror("semctl");
return 1;
}
arg.val = 0;
// 初始化产品信号量(初始无产品可供消费)
if (semctl(semid, 1, SETVAL, arg) == -1) {
perror("semctl");
return 1;
}

// 创建生产者和消费者进程(这里简单使用父子进程模拟)
pid_t pid = fork();
if (pid < 0) {
perror("fork");
return 1;
} else if (pid == 0) {
// 子进程为消费者
consumer(shm_id, semid);
} else {
// 父进程为生产者
producer(shm_id, semid);
}

// 等待子进程结束(这里只是简单等待,实际可能需要更完善的机制)
wait(NULL);

// 删除共享内存和信号量集
if (shmctl(shm_id, IPC_RMID, NULL) == -1) {
perror("shmctl");
return 1;
}
if (semctl(semid, 0, IPC_RMID) == -1) {
perror("semctl");
return 1;
}

return 0;
}

整体代码解释:

  • 共享内存结构体定义:
    • 定义了SharedMemory结构体,包含一个整数数组作为缓冲区,以及inout索引,用于生产者和消费者操作。
  • 信号量操作函数:
    • semaphore_op函数用于对信号量进行操作,实现P操作(sem_op为负,获取资源)和V操作(sem_op为正,释放资源)。
  • 生产者函数:
    • 通过shmat将共享内存连接到进程地址空间。
    • 使用semaphore_op等待空闲缓冲区信号量(semid的第 0 个信号量),生产一个随机数放入缓冲区,更新in索引,然后使用semaphore_op释放产品信号量(semid的第 1 个信号量)。
  • 消费者函数:
    • 同样通过shmat连接共享内存。
    • 使用semaphore_op等待产品信号量,从缓冲区取出数据,更新out索引,然后使用semaphore_op释放空闲缓冲区信号量。
  • 主函数:
    • 使用shmget创建共享内存,并使用semget创建信号量集。
    • 初始化信号量,一个表示空闲缓冲区数量,一个表示产品数量。
    • 使用fork创建子进程,子进程作为消费者,父进程作为生产者。
    • 最后等待子进程结束,并使用shmctlsemctl删除共享内存和信号量集。

7.无血缘关系的进程之间通信

  1. ftok函数的定义和功能
    • 函数原型key_t ftok(const char *pathname, int proj_id);
    • 功能ftok函数用于生成一个唯一的key(键值),这个key通常用于 System V IPC(进程间通信)机制中,如创建共享内存、消息队列和信号量集等。它将一个文件路径名(pathname)和一个项目标识符(proj_id)组合起来,生成一个适合作为 System V IPC 资源标识符的key值。
  2. 参数解释
    • const char *pathname
      • 这是一个指向文件路径名的指针。这个文件路径必须是一个已经存在的文件的有效路径,通常使用当前目录(.")或者一个程序相关的配置文件路径等。ftok函数会使用文件的inode(索引节点)信息作为生成key的一部分。
      • 注意,如果文件被删除然后重新创建,即使文件名相同,inode可能会改变,这会导致ftok生成不同的key值。
    • int proj_id
      • 这是一个0 - 255之间的整数,作为项目标识符。它和文件路径的inode信息一起组合生成key。不同的项目可以使用不同的proj_id来区分,这样即使基于同一个文件路径,不同的项目也能生成不同的key值用于各自的 IPC 资源。例如,一个程序中有两个不同的模块需要使用消息队列进行通信,它们可以使用相同的文件路径但不同的proj_id来生成不同的key,以创建两个独立的消息队列。
  3. 返回值
    • 成功时,ftok函数返回一个key_t类型的非负整数,这个整数可以作为shmgetmsggetsemget等 System V IPC 函数的key参数来创建或获取对应的 IPC 资源。
    • 失败时,返回-1,并且会设置errno来指示错误原因。常见的错误原因包括:
      • EACCESS:没有权限访问pathname指定的文件。
      • ENOENTpathname指定的文件不存在。

发送端

c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/shm.h>
#include <string.h>

#define MEMORY_SIZE 100

int main() {
// 通过ftok生成key
key_t key = ftok(".", 'b');
if (key == -1) {
perror("ftok");
return 1;
}

// 创建共享内存段
int shm_id = shmget(key, MEMORY_SIZE, IPC_CREAT | 0666);
if (shm_id == -1) {
perror("shmget");
return 1;
}

// 映射共享内存到进程地址空间
void *shm_ptr = shmat(shm_id, NULL, 0);
if (shm_ptr == (void *)-1) {
perror("shmat");
return 1;
}

// 向共享内存写入数据(这里简单写入字符串)
strcpy((char *)shm_ptr, "Hello from sender!");

// 分离共享内存
if (shmdt(shm_ptr) == -1) {
perror("shmdt");
return 1;
}
sleep(10);
return 0;
}

接收端

c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/shm.h>
#include <string.h>

#define MEMORY_SIZE 100

int main() {
sleep(3);
// 通过ftok生成key
key_t key = ftok(".", 'b');
if (key == -1) {
perror("ftok");
return 1;
}

// 获取共享内存段
int shm_id = shmget(key, MEMORY_SIZE, 0666);
if (shm_id == -1) {
perror("shmget");
return 1;
}

// 映射共享内存到进程地址空间
void *shm_ptr = shmat(shm_id, NULL, 0);
if (shm_ptr == (void *)-1) {
perror("shmat");
return 1;
}

// 从共享内存读取数据并打印
printf("Receiver read from shared memory: %s\n", (char *)shm_ptr);

// 分离共享内存
if (shmdt(shm_ptr) == -1) {
perror("shmdt");
return 1;
}

// 删除共享内存段
if (shmctl(shm_id, IPC_RMID, NULL) == -1) {
perror("shmctl for delete");
return 1;
}

return 0;
}

运行结果:

image-20241220190008467

9.消息队列

1.消息队列进程间通信原理

System V IPC 消息队列是一种进程间通信(IPC)机制,它允许不同进程通过发送和接收消息来进行通信。消息队列就像是一个邮箱系统,进程可以将消息(信件)发送到队列(邮箱)中,其他进程可以从这个队列中接收消息。

消息队列是在两个进程间传递二进制数据块的方式,每个数据块都有一个特定类型,接收方可以根据类型来有选择地接收数据,而不一定像管道和命名管道那样必须以先进先出的方式接收数据。

原理:多个进程通过共享消息队列的标识符(msqid)来访问同一个消息队列。发送进程将消息放入消息队列后,消息队列会按照一定的规则(如先进先出)存储这些消息。接收进程可以根据消息类型等条件从消息队列中取出消息。这样,不同进程之间就可以通过消息队列进行数据传输和通信,实现进程间的同步和信息共享。例如,在生产者 - 消费者模型中,生产者进程将生产的数据作为消息发送到消息队列,消费者进程从消息队列中接收消息并进行消费,通过消息类型等机制可以确保消息的正确发送和接收,从而实现生产者和消费者之间的协调工作。

Linux消息队列的API都定义在sys/msg.h头文件中,包括4个系统调用:msggetmsgsndmsgrcvmsgctl

2.msgget 系统调用

msgget系统调用创建一个消息队列,或获取一个已有的消息队列:

cpp
1
2
#include <sys/msg.h>
int msgget(key_t key, int msgflg);
  • 参数:
    • key_t key
      • 这是一个键值,用于标识一个全局唯一的消息队列。可以通过ftok函数生成一个唯一的key值,或者使用IPC_PRIVATE来创建一个私有消息队列(通常用于具有亲缘关系的进程,如父子进程)。
    • int msgflg
      • 用于控制消息队列的创建和访问权限,它由以下几种标志组成:
        • IPC_CREAT:如果消息队列不存在,则创建它。如果和IPC_EXCL一起使用(IPC_CREAT | IPC_EXCL),则只有在消息队列不存在时才创建,若已存在则msgget函数返回 - 1 并设置errnoEEXIST
        • 权限标志:如0666等,用于指定消息队列的访问权限,格式与文件权限相同(用户、组、其他用户的读、写、执行权限)。
  • 返回值:
    • 成功时,返回一个非负整数,即消息队列的标识符(msqid)。
    • 失败时,函数返回 - 1,并设置errno变量来指示错误原因,例如EEXIST(当IPC_CREAT | IPC_EXCL且队列已存在时)、ENOENT(当没有IPC_CREAT且队列不存在时)等。

如果它用于创建消息队列的话,与之相关的内核数据结构msqid_ds将被创建并初始化。

c
1
2
3
4
5
6
7
8
9
10
11
12
struct msqid_ds
{
struct ipc_perm msg_perm; // 消息队列的操作权限
time_t msg_stime; // 最后一次调用msgsnd的时间
time_t msg_rtime; // 最后一次调用msgrcv的时间
time_t msg_ctime; // 最后一次被修改的时间
unsigned long msg_cbytes; // 消息队列中已有的字节数
msgqnum_t msg_qnum; // 消息队列中已有的消息数
msglen_t msg_qbytes; // 消息队列允许的最大字节数
pid_t msg_lspid; // 最后执行msgsnd的进程的PID
pid_t msg_lrpid; // 最后执行msgrcv的进程的PID
};

3.msgsnd 系统调用

msgsnd系统调用将一条消息添加到消息队列中:

cpp
1
2
3
4
5
6
7
#include <sys/msg.h>
int msgsnd(int msqid, const void* msg_ptr, size_t msg_sz, int msgflg);

struct msgbuf{
long mtype;//消息类型
char mtext[512];//消息数据
};

参数:

int msqid

  • 消息队列的标识符,由msgget函数返回。

  • const void* msg_ptr

    • 指向要发送消息的指针。消息的结构必须以一个长整型成员变量开始,这个长整型变量用于存放消息类型,后面可以跟随消息的实际数据。msg_ptr参数指向一个准备发送的消息,消息被定义为如下类型:

      c
      1
      2
      3
      4
      struct msgbuf{
      long mtype; /* 消息类型 */
      char mtext[512]; /* 消息数据 */
      };
  • size_t msg_sz

    • 这是消息数据部分的大小,不包括消息类型的长整型变量所占的字节数。
  • int msgflg

    • 控制消息发送的行为,和semget的flag一样的,常用的标志有:
      • 0:表示阻塞发送,如果消息队列已满,则发送进程会阻塞,直到有空间可以发送消息。
      • IPC_NOWAIT:表示非阻塞发送,如果消息队列已满,则msgsnd函数立即返回 - 1,并设置errnoEAGAIN

返回值:

  • 成功时,返回0
  • 失败时,返回 - 1,并设置errno来指示错误原因,如EAGAIN(非阻塞发送时队列已满)、EINVAL(参数无效)、EIDRM(消息队列已被删除)等。

处于阻塞状态的msgsnd调用可能被如下两种异常情况所中断:

  • 消息队列被移除。此时msgsnd调用将立即返回并设置errno为EIDRM。

  • 程序接收到信号。此时msgsnd调用将立即返回并设置errno为EINTR。

msgsnd成功时将修改内核数据结构msqid_ds的部分字段,如下所示:

  • 将msg_qnum加1。

  • 将msg_lspid设置为调用进程的PID。

  • 将msg_stime设置为当前的时间。

4.msgrcv 系统调用

msgrcv系统调用从消息队列中获取消息:

cpp
1
2
#include <sys/msg.h>
int msgrcv(int msqid, void* msg_ptr, size_t msg_sz, long int msgtype, int msgflg);

参数:

  • int msqid
    • 消息队列的标识符。
  • void* msg_ptr
    • 一个指向接收消息缓冲区的指针。与msgsnd类似,缓冲区的结构应以一个长整型开始用于存放接收到的消息类型,后面是存放消息数据的空间。
  • size_t msg_sz
    • 这是接收消息缓冲区中数据部分的大小。
  • long int msgtype
    • 指定要接收的消息类型,可以有以下几种取值:
      • 0:接收(读取)消息队列中的第一条消息,不考虑消息类型。
      • > 0:接收第一个消息类型等于msgtype的消息。(除非指定了标志MSG_EXCEPT,见后文)
      • < 0:接收(读取)消息队列中第一个类型值比msgtype的绝对值小的消息。
  • int msgflg
    • 控制消息接收的行为,常用标志有:
      • 0:表示阻塞接收,如果消息队列中没有符合条件的消息,则接收进程会阻塞,直到有符合条件的消息到达。
      • IPC_NOWAIT:表示非阻塞接收,如果消息队列中没有符合条件的消息,则msgrcv函数立即返回 - 1,并设置errnoENOMSG
      • MSG_EXCEPT。如果msgtype大于0,则接收消息队列中第一个非msgtype类型的消息。
      • MSG_NOERROR。如果消息数据部分的长度超过了msg_sz,就将它截断。

返回值:

  • 成功时,返回接收到的消息数据部分的字节数。
  • 失败时,返回 - 1,并设置errno来指示错误原因,如ENOMSG(非阻塞接收时没有符合条件的消息)、EINVAL(参数无效)、EIDRM(消息队列已被删除)等。

处于阻塞状态的msgrcv调用还可能被如下两种异常情况所中断:

  • 消息队列被移除。此时msgrcv调用将立即返回并设置errno为EIDRM。

  • 程序接收到信号。此时msgrcv调用将立即返回并设置errno为EINTR。

msgrcv成功时将修改内核数据结构msqid_ds的部分字段,如下所示:

  • 将msg_qnum减1。

  • 将msg_lrpid设置为调用进程的PID。

  • 将msg_rtime设置为当前的时间。

5.msgctl 系统调用

msgctl系统调用,用于对消息队列进行控制操作(控制消息队列某些属性),如获取消息队列的状态信息、设置消息队列的属性、删除消息队列等。

cpp
1
2
#incldue <sys/msg.h>
int msgctl(int msqid, int command, struct msqid_ds* buf);

参数:

  • int msqid
    • 消息队列的标识符。
  • int command
    • 这是一个控制命令,用于指定对消息队列进行何种操作(见下表),常见的命令有:
      • IPC_STAT:获取消息队列的状态信息,并将其存储到buf所指向的struct msqid_ds结构体中。这个结构体包含了消息队列的各种属性,如操作权限、当前消息数量等。
      • IPC_SET:根据buf所指向的struct msqid_ds结构体中的信息来设置消息队列的属性。例如,可以修改消息队列的操作权限等。
      • IPC_RMID:删除由msqid标识的消息队列。这是一个非常重要且具有危险性的操作,一旦执行,消息队列及其所包含的消息将被永久删除。
  • struct msqid_ds* buf
    • 这是一个指向struct msqid_ds结构体的指针,其作用取决于command参数的值:
      • commandIPC_STAT时,buf用于存储获取到的消息队列的状态信息。
      • commandIPC_SET时,buf指向的结构体中的信息将被用于设置消息队列的属性。
      • 如果command不涉及IPC_STATIPC_SET操作(如IPC_RMID),buf通常可以设置为NULL

返回值:

  • 成功时,取决于command(见下表)。
  • 失败时,返回 - 1,并设置errno来指示错误原因,如EINVALmsqid无效,或者command参数无效,或者buf指向的结构体无效(在IPC_STATIPC_SET操作时))、EPERM(调用进程没有足够的权限来执行请求的操作)等。

image-20241220165757768

image-20241220165805087

6.函数使用案例

以下是使用 System V IPC 消息队列相关函数(msggetmsgsndmsgrcvmsgctl)的一个简单 C 语言案例:

c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <string.h>

// 消息结构体
struct msgbuf {
long mtype;
char mtext[100];
};

int main() {
// 创建消息队列
int msqid = msgget(IPC_PRIVATE, 0666 | IPC_CREAT);
if (msqid == -1) {
perror("msgget");
return 1;
}

// 发送消息
struct msgbuf send_msg;
send_msg.mtype = 1;
strcpy(send_msg.mtext, "Hello, World!");
if (msgsnd(msqid, &send_msg, sizeof(send_msg.mtext), 0) == -1) {
perror("msgsnd");
return 1;
}

// 接收消息
struct msgbuf recv_msg;
if (msgrcv(msqid, &recv_msg, sizeof(recv_msg.mtext), 1, 0) == -1) {
perror("msgrcv");
return 1;
}
printf("Received message: %s\n", recv_msg.mtext);

// 获取消息队列状态 buf存储消息队列的信息
struct msqid_ds buf;
if (msgctl(msqid, IPC_STAT, &buf) == -1) {
perror("msgctl - IPC_STAT");
return 1;
}
printf("Messages in queue: %ld\n", buf.msg_qnum);

// 删除消息队列
if (msgctl(msqid, IPC_RMID, NULL) == -1) {
perror("msgctl - IPC_RMID");
return 1;
}

return 0;
}

代码解释:

消息结构体定义

  • 定义了struct msgbuf结构体,它包含一个长整型mtype(用于表示消息类型)和一个字符数组mtext(用于存储消息内容)。

创建消息队列(msgget

  • 使用IPC_PRIVATE作为key来创建一个新的私有消息队列,权限设置为0666(用户、组和其他用户都有读写权限)。如果msgget调用成功,返回消息队列标识符msqid;否则,打印错误信息并返回。

发送消息(msgsnd

  • 初始化send_msg结构体,设置mtype1,并将消息内容设置为"Hello, World!"
  • 使用msgsnd函数将消息发送到消息队列中。msgsnd函数的参数包括消息队列标识符msqid、消息结构体指针&send_msg、消息数据部分大小sizeof(send_msg.mtext)和标志0(表示阻塞发送,如果队列满则等待)。如果发送失败,打印错误信息并返回。

接收消息(msgrcv

  • 初始化recv_msg结构体。
  • 使用msgrcv函数从消息队列中接收消息。msgrcv函数的参数包括消息队列标识符msqid、接收消息结构体指针&recv_msg、接收消息数据部分大小sizeof(recv_msg.mtext)、要接收的消息类型1和标志0(表示阻塞接收,如果没有符合条件的消息则等待)。如果接收失败,打印错误信息并返回。
  • 接收到消息后,打印出消息内容。

获取消息队列状态(msgctl

  • 定义struct msqid_ds类型的变量buf
  • 使用msgctl函数的IPC_STAT命令获取消息队列的状态信息,并将其存储在buf中。如果获取状态失败,打印错误信息并返回。
  • 打印出消息队列中的消息数量(buf.msg_qnum)。

删除消息队列(msgctl

  • 使用msgctl函数的IPC_RMID命令删除消息队列。如果删除失败,打印错误信息并返回。

image-20241220171912147

7.实现生产者消费者模型

**消息队列并不能实现互斥。**以下是使用上述消息队列函数实现的生产者 - 消费者模型的 C 代码:

c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <string.h>
#include <time.h>
#include <unistd.h>

#define MAX_MSG_SIZE 100

// 消息结构体
struct msgbuf {
long mtype;
char mtext[MAX_MSG_SIZE];
};

// 生产者函数
void producer(int msqid) {
struct msgbuf msg;
msg.mtype = 1; // 消息类型设为1

srand(time(NULL));
while (1) {
int num = rand() % 100;
sprintf(msg.mtext, "%d", num);
if (msgsnd(msqid, &msg, sizeof(msg.mtext), 0) == -1) {
perror("msgsnd");
exit(1);
}
printf("Producer sent: %s\n", msg.mtext);
sleep(1);
}
}

// 消费者函数
void consumer(int msqid) {
struct msgbuf msg;
while (1) {
if (msgrcv(msqid, &msg, sizeof(msg.mtext), 1, 0) == -1) {
perror("msgrcv");
exit(1);
}
printf("Consumer received: %s\n", msg.mtext);
sleep(2);
}
}

int main() {
// 创建消息队列
int msqid = msgget(IPC_PRIVATE, IPC_CREAT | 0666);
if (msqid == -1) {
perror("msgget");
return 1;
}

pid_t pid = fork();
if (pid < 0) {
perror("fork");
return 1;
} else if (pid == 0) {
// 子进程为消费者
consumer(msqid);
} else {
// 父进程为生产者
producer(msqid);
}

// 等待子进程结束(这里只是简单等待,实际可能需要更完善的机制)
wait(NULL);

// 删除消息队列
if (msgctl(msqid, IPC_RMID, NULL) == -1) {
perror("msgctl");
return 1;
}

return 0;
}

代码解释

消息结构体定义:

  • 定义了struct msgbuf结构体,包含一个长整型的mtype(消息类型)和一个字符数组mtext(用于存放消息数据)。

生产者函数:

  • 生成一个随机数,将其转换为字符串后放入消息结构体的mtext中,然后使用msgsnd将消息发送到消息队列中,发送的消息类型为1,发送操作是阻塞的(msgflg0)。

消费者函数:

  • 使用msgrcv从消息队列中接收消息类型为1的消息,接收操作是阻塞的(msgflg0),接收到消息后打印出消息内容。

主函数:

  • 使用msgget创建一个私有消息队列。
  • 通过fork创建子进程,子进程作为消费者,父进程作为生产者。
  • 最后等待子进程结束,并使用msgctl删除消息队列。

并不能够实现互斥,只是能够通信。

运行结果:

image-20241220173408405

8.无血缘关系进程通信

  1. ftok函数的定义和功能
    • 函数原型key_t ftok(const char *pathname, int proj_id);
    • 功能ftok函数用于生成一个唯一的key(键值),这个key通常用于 System V IPC(进程间通信)机制中,如创建共享内存、消息队列和信号量集等。它将一个文件路径名(pathname)和一个项目标识符(proj_id)组合起来,生成一个适合作为 System V IPC 资源标识符的key值。
  2. 参数解释
    • const char *pathname
      • 这是一个指向文件路径名的指针。这个文件路径必须是一个已经存在的文件的有效路径,通常使用当前目录(.")或者一个程序相关的配置文件路径等。ftok函数会使用文件的inode(索引节点)信息作为生成key的一部分。
      • 注意,如果文件被删除然后重新创建,即使文件名相同,inode可能会改变,这会导致ftok生成不同的key值。
    • int proj_id
      • 这是一个0 - 255之间的整数,作为项目标识符。它和文件路径的inode信息一起组合生成key。不同的项目可以使用不同的proj_id来区分,这样即使基于同一个文件路径,不同的项目也能生成不同的key值用于各自的 IPC 资源。例如,一个程序中有两个不同的模块需要使用消息队列进行通信,它们可以使用相同的文件路径但不同的proj_id来生成不同的key,以创建两个独立的消息队列。
  3. 返回值
    • 成功时,ftok函数返回一个key_t类型的非负整数,这个整数可以作为shmgetmsggetsemget等 System V IPC 函数的key参数来创建或获取对应的 IPC 资源。
    • 失败时,返回-1,并且会设置errno来指示错误原因。常见的错误原因包括:
      • EACCESS:没有权限访问pathname指定的文件。
      • ENOENTpathname指定的文件不存在。

发送端

c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <string.h>

// 消息结构体
struct msgbuf {
long mtype;
char mtext[100];
};

int main() {
// 通过ftok生成key
key_t key = ftok(".", 'c');
if (key == -1) {
perror("ftok");
return 1;
}

// 创建消息队列
int msqid = msgget(key, IPC_CREAT | 0666);
if (msqid == -1) {
perror("msgget");
return 1;
}

// 准备发送消息的结构体
struct msgbuf send_msg;
send_msg.mtype = 1;
strcpy(send_msg.mtext, "Hello from sender!");

if (msgsnd(msqid, &send_msg, sizeof(send_msg.mtext), 0) == -1) {
perror("msgsnd");
return 1;
}

printf("Sender sent message...\n");

sleep(10);
// 删除消息队列
if (msgctl(msqid, IPC_RMID, NULL) == -1) {
perror("msgctl for delete");
return 1;
}

return 0;
}

接收端

c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
#include <stdio.h>
#include <stdlib.combined
#include <sys/types.combined
#include <sys/ipc.combined
#include <sys/msg.combined
#include <string.combined

// 消息结构体
struct msgbuf {
long mtype;
char mtext[100];
};

int main() {
// 通过ftok生成key
key_t key = ftok(".", 'c');
if (key == -1) {
perror("ftok");
return 1;
}

// 获取消息队列
int msqid = msgget(key, 0666);
if (msqid == -1) {
perror("msgget")
return 1;
}

// 准备接收消息的结构体
struct msgbuf recv_msg;
if (msgrcv(msqid, &recv_msg, sizeof(recv_msg.mtext), 1, 0) == -1) {
perror("msgrcv")
return 1;
}

printf("Receiver received message: %s\n", recv_msg.mtext);

return 0;
}

运行结果:

image-20241220192302773

10.IPC 命令

以上3种System V IPC进程间通信方式都使用一个全局唯一的键值来描述一个共享资源,当程序调用semgetshmgetmsgget时,就创建了这些共享资源的一个实例。Linux提供ipcs命令来观察当前系统上拥有哪些共享资源实例:

image-20241220171101662

输出结果分段显示了系统拥有的消息队列、共享内存、信号量资源,可见,该系统目前尚未使用任何消息队列和信号量,但分配了一组键值为0的共享内存。这些信号所有者正是apache,它们是由httpd服务器程序创建的。

我们还可用ipcrm命令删除遗留在系统中的共享资源。

11.在进程间传递文件描述符

fork调用后,父进程中打开的文件描述符在子进程中仍然保持打开,所以文件描述符可以很方便地从父进程传递到子进程。注意,传递一个文件描述符并不是传递一个文件描述符的值,而是在接收进程中创建一个新的文件描述符,且新文件描述符和发送进程中被传递的文件描述符指向内核中相同的文件表项。

要想在两个不相干的进程之间传递文件描述符,在Linux下,可利用UNIX域socket在进程间传递特殊的辅助数据,以实现文件描述符的传递,下例代码中,子进程中打开一个文件描述符,然后将它传递给父进程,父进程则通过读取该文件描述符来获得文件内容:

c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
#include <sys/socket.h>
#include <fcntl.h>
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <assert.h>
#include <string.h>

static const int CONTROL_LEN = CMSG_LEN(sizeof(int));
// 发送文件描述符,fd参数是用来传递信息的UNIX域socket,fd_to_send参数是待发送的文件描述符
void send_fd(int fd, int fd_to_send) {
struct iovec iov[1];
struct msghdr msg;
char buf[0];

iov[0].iov_base = buf;
iov[0].iov_len = 1;
msg.msg_name = NULL;
msg.msg_namelen = 0;
msg.msg_iov = iov;
msg.msg_iovlen = 1;

cmsghdr cm;
cm.cmsg_len = CONTROL_LEN;
cm.cmsg_level = SOL_SOCKET;
cm.cmsg_type = SCM_RIGHTS;
*(int *)CMSG_DATA(&cm) = fd_to_send;
msg.msg_control = &cm; /* 设置辅助数据 */
msg.msg_controllen = CONTROL_LEN;

sendmsg(fd, &msg, 0); /* 通用数据读 */
}

// 接收目标文件描述符
int recv_fd(int fd) {
struct iovec iov[1];
struct msghdr msg;
char buf[0];

iov[0].iov_base = buf;
iov[0].iov_len = 1;
msg.msg_name = NULL;
msg.msg_namelen = 0;
msg.msg_iov = iov;
msg.msg_iovlen = 1;

cmsghdr cm;
msg.msg_control = &cm;
msg.msg_controllen = CONTROL_LEN;

recvmsg(fd, &msg, 0); /* 通用数据写 */

int fd_to_read = *(int *)CMSG_DATA(&cm);
return fd_to_read;
}

int main() {
int pipefd[2];
int fd_to_pass = 0;
/* 创建父、子进程间的管道,文件描述符pipefd[0]和pipefd[1]都是UNIX域socket */
int ret = socketpair(PF_UNIX, SOCK_DGRAM, 0, pipefd);
assert(ret != -1);

pid_t pid = fork(); /* 创建子进程 */
assert(pid >= 0);

if (pid == 0) {
close(pipefd[0]); /* 子进程关闭读 */
fd_to_pass = open("test.txt", O_RDWR, 0666); /* 打开文件 */
send_fd(pipefd[1], (fd_to_pass > 0) ? fd_to_pass : 0); /* 子进程通过管道将文件描述符发送到父进程,如果文件打开失败,则子进程将标准输入发送到父进程 */
close(fd_to_pass);
exit(0);
}

close(pipefd[1]); /* 父进程关闭写 */

fd_to_pass = recv_fd(pipefd[0]); /* 父进程从管道接收目标文件描述符 */
char buf[1024]; /* 存放数据 */
memset(buf, '\0', 1024);
// 读目标文件描述符,验证其有效性
read(fd_to_pass, buf, 1024);
printf("I got fd %d and data %s\n", fd_to_pass, buf);
close(fd_to_pass);
}

结构体 iovec和msghdr见:Linux高性能服务器编程 | 读书笔记 | 4. 高级 I/O 函数-CSDN博客通用数据读写函数部分