11. 多进程编程

  • 复制进程映像的fork系统调用和替换进程映像的exec系列系统调用。
  • 僵尸进程以及如何避免僵尸进程。
  • 进程间通信(Inter Process Communication,IPC)最简单的方式:管道。
  • 三种System V进程间通信方式:信号量、消息队列、共享内存。它们是由AT&T System V2版本的UNIX引入的,所以统称为System V IPC。
  • 在进程间传递文件描述符的通用方法:通过UNIX本地域socket传递特殊的辅助数据。(关于辅助数据,参考《Linux 高性能服务器编程》P85)

0.前置知识

1.PCB

PCB进程控制块本质是一个task_struct结构体

  • 进程id,每个进程唯一一个

  • 进程切换时需要保存和恢复的一些CPU寄存器

  • 描述虚拟地址空间的信息

  • 进程状态: 初始态、就绪态、运行态、挂起态、终止态。

  • 进程工作目录位置

  • umask掩码 指定文件权限

  • 文件描述符表,包含很多指向flie的结构体指针

  • 和信号相关的信息

  • 用户id和组id

  • 会话和进程组

  • 进程可以使用的资源上限

  • 描述控制终端的信息

2.环境变量

image-20241218101500357

image-20241218101621870

image-20241218101644318

image-20241218101923136

1
2
3
$+环境变量   查看环境变量内容
echo $+环境变量 查看环境变量内容
env 查看所有的环境变量

image-20241218102203318

3.进程地址空间

image-20241218105718888

  1. 进程地址空间概述
    • 进程地址空间是一种抽象的概念,它为进程提供了一种独立于物理内存的虚拟内存视图。对于 32 位的操作系统,进程地址空间的大小通常是 4GB(2^32 字节)。这个地址空间被划分为不同的区域,用于存放不同类型的数据。
  2. 4GB 进程地址空间的划分
    • 用户空间(0 - 3GB)
      • 代码段(text segment):通常位于低地址部分,从 0x00400000 左右开始(在不同的操作系统和编译器下可能会有所差异)。它存放的是程序的可执行机器指令。这部分是只读的,因为程序在运行过程中指令通常是不允许被修改的。例如,一个简单的 C 程序编译后的可执行文件中的机器码就存放在代码段。当进程运行时,CPU 会从这里读取指令来执行程序的逻辑。
      • 数据段(data segment):在代码段之后,包含已初始化的全局变量和静态变量。这些变量在程序启动时就被分配了内存空间并且被赋予初始值。比如,在 C 语言中定义的全局变量int global_variable = 10;,这个变量就存放在数据段。数据段是可读可写的,因为程序在运行过程中可能会对这些全局和静态变量进行修改。
      • BSS 段(Block Started by Symbol):紧挨着数据段,存放未初始化的全局变量和静态变量。在程序加载时,系统会为 BSS 段中的变量分配内存空间,初始值被默认设置为 0。例如,int uninitialized_global_variable;这个变量就存放在 BSS 段。它也是可读可写的,因为在程序运行后可以对这些变量进行赋值操作。
      • 堆(heap):堆是从低地址向高地址增长的一块内存区域,用于动态分配内存。在 C 语言中,通过malloc()等函数分配的内存就在堆中。例如,int *ptr = (int *)malloc(sizeof(int));ptr所指向的内存空间就在堆中。堆的大小在程序运行时可以动态变化,它向上增长,直到遇到内存分配失败或者达到堆的最大限制。
      • 栈(stack):位于用户空间的高地址部分,是一种后进先出(LIFO)的数据结构。栈用于存储局部变量、函数参数和函数调用的返回地址等信息。当一个函数被调用时,函数的局部变量和参数会被压入栈中。例如,在一个函数void function(int param) { int local_variable; }中,paramlocal_variable都存储在栈中。栈是从高地址向低地址增长的,当栈溢出时(如函数递归调用过深),可能会导致程序崩溃。
    • 内核空间(3GB - 4GB)
      • 这部分地址空间是操作系统内核代码和数据所在的区域。包括内核代码(用于管理系统资源、调度进程等操作的指令)、内核数据结构(如进程控制块 PCB 等用于管理进程的信息)。当进程执行系统调用或者发生中断等情况时,CPU 会切换到内核态,从这个 3GB - 4GB 的内核空间执行相应的内核代码来处理系统相关的事务。例如,当进程进行文件读写操作时,会通过系统调用进入内核空间,由内核代码来完成实际的磁盘 I/O 操作。这样划分可以保证操作系统内核的安全性和稳定性,防止用户进程随意访问和修改内核数据和代码。

1.fork 系统调用

Linux下创建新进程的系统调用是fork

1
2
3
#include <sys/types.h>
#include <unistd.h>
pid_t fork(void);

返回值:该函数的每次调用都返回两次,在父进程中返回的是子进程的PID,在子进程中则返回0,该返回值是后续代码判断当前进程是父进程还是子进程的依据。失败时返回-1,并设置errno。

fork函数复制当前进程,在内核进程表中创建一个新的进程表项,新的进程表项中很多属性和原进程相同,如堆指针、栈指针、标志寄存器的值,但也有很多属性被赋予了新值,如子进程的PPID被设置成原进程的PID,信号位图被清除(原进程设置的信号处理函数不再对新进程起作用)。

子进程的代码和父进程完全相同,同时它还会复制父进程的数据(堆数据、栈数据、静态数据),数据的复制采用的是写时复制(copy on write),即只有在任一进程(父进程或子进程)对数据执行了写操作时,复制才会发生(先是缺页中断,然后操作系统给子进程分配内存并复制父进程的数据),即便如此,如果我们在程序中分配了大量内存,那么使用fork函数时也应当谨慎,尽量避免没必要的内存分配和数据复制。

此外,创建子进程后,父进程中打开的文件描述符默认在子进程中也是打开的,且文件描述符的引用计数加1。父进程的用户根目录、当前工作目录等变量的引用计数也会加 1。

练习:循环创建n个子进程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
#include<stdio.h>
#include<stdlib.h>
#include<string.h>
#include<unistd.h>
#include<pthread.h>

int main()
{
pid_t pid;
int i;
for(i=0;i<5;i++)
if(fork()==0)//如果是子进程就不让它继续往下fork了,不然产生的就不是5个子进程而是2的5次方-1个进程了
break;
if(5==i)
{
sleep(5);
printf("I'm parent\n");
}
else
{
sleep(i);
printf("I'm %dth child\n",i+1);
}
return 0;
}

image-20241218103733857

父子进程gdb调试

image-20241218105030617

这两条命令都是在gdb中使用

2.getpid&&getppid

1
2
3
4
5
#include <sys/types.h>
#include <unistd.h>

pid_t getpid(void);//获取自己进程的pid
pid_t getppid(void);//获取当前进程的父进程的pid

3.父子进程共享内容

父子相同处:

全局变量、.data、.text、、堆、环境变量、用户ID、宿主目录、进程工作目录、信号处理方式 …

父子不同处:

1.进程ID

2.fork返回值

3.父进程ID

4.进程运行时间

5.闹钟(定时器)

6.未决信号集

读时共享写时复制

似乎,子进程复制了父进程0-3G用户空间内容,以及父进程的PCB,但pid不同。真的每fork一个子进程都要将父进程的0-3G地址空间完全拷贝一份,然后在映射至物理内存吗?

当然不是!**父子进程间遵循读时共享写时复制的原则。**这样设计,无论子进程执行父进程的逻辑还是执行自己的逻辑都能节省内存开销。

重点注意!躲避父子进程共享全局变量的知识误区!

更好的理解读时共享写时复制:

假如我有一个全局变量 a=100

父进程和子进程在读取a的时候就直接读取a=100

而在修改a的时候

父进程会先复制一份,比如 b=a,然后去修改b,不会修改原来的a,子进程修改的时候同样也是如此

父进程a=150就相当于b=a,b=150,在此之后,子进程读取a读出来还是100

重点:父子进程共享:1. 文件描述符(打开文件的结构体)2.mmap建立的映射区

4.exec 系列系统调用

有时我们需要在子进程中执行其他程序,即替换当前进程映像。

fork 创建子进程后执行的是和父进程相同的程序(但有可能执行不同的代码分支),子进程往往要调用一种exec函数

以执行另一个程序。当进程调用一种exec函数时,该进程的用户空间代码和数据完全被新程序替换,从新程序的启动

例程开始执行。调用exec并不创建新进程,所以调用exec前后该进程的id并未改变。

将当前进程的.text、.data替换为所要加载的程序的.text、.data,然后让进程从新的.text第一条指令开始执行,但进程ID不变,换核不换壳。

exec系列函数:

1
2
3
4
5
6
7
8
9
#incldue <unistd.h>
extern char** environ;

int execl(const char* path, const char* arg, ...);
int execlp(const char* file, const char* arg, ...);
int execle(const char* path, const char* arg, ..., char* const envp[]);
int execv(const char* path, char* const argv[]);
int execvp(const char* file, char* const argv[]);
int execve(const char* path, char* const arg[], char* const envp[]);

参数

path:指定可执行文件的完整路径;

file:文件名,该文件的具体位置在环境变量PATH中搜寻。

arg:接受可变参数,

argv:则接受参数数组,它和 arg 都会被传递给新程序(pathfile参数指定的程序)的main函数。变参要以NULL结尾

envp:用于设置新程序的环境变量,如果未设置它,则新程序将使用由全局变量environ指定的环境变量。

名字后面带p的就是要借助PATH环境变量的

返回值

一般,exec函数是不返回的,除非出错,此时它返回-1,并设置errno。如果没出错,则原进程中exec调用之后的代码都不会执行,因为此时原程序已经被exec的参数指定的程序完全替换(包括代码和数据)。

exec函数不会关闭原进程打开的文件描述符,除非该文件描述符被设置了类似SOCK_CLOEXEC的属性。

SOCK_CLOEXEC属性用于在子进程中关闭 socket,见《Linux 高性能服务器编程》P75

1.重点掌握 execl和execlp

execlp一般用于执行系统的,比如ls

execl一般用于执行咱们自己编写的,比如a.out

execlp:加载一个进程,借助PATH 环境变量

1
int execlp(const char *file, const char *arg, ... );

参数1:要加载的程序的名字。该函数需要配合PATH环境变量来使用,当PATH中所有目录搜索后没有参数1则出错返回。

该函数通常用来调用系统程序。如:ls、date、cp、cat等命令。

成功:无返回;失败 :- 1

execl

加载一个进程,通过 路径+程序名 来加载。

1
int execl(const char *path, const char *arg, ... );

对比execlp,如加载”Is”命令带有-l,-F参数

使用程序名在PATH中搜索。

使用参数1给出的绝对路径搜索。

成功:无返回;失败 :- 1

例子

1
2
3
4
execlp("'ls", "'ls", "-l", "-F", NULL);
//argv[0] argv[1]
execl("/bin/ls", "ls", "-l", "-F", NULL);
execl("./a.out", "./a.out", NULL);

因为argv[0]是要运行的可执行文件的名字,所以第二个参数还得传ls才能表示 ls -l -F不然就只能表示-l -F,没有前面的可执行文件了

第二个a.out要加./但是第二个ls不用加bin是因为第二个参数传入到命令行的时候

可执行文件执行就是要./,而ls直接输入就行

简单演示execvp

同样执行ls命令

1
2
char *argv[]={"ls","-l","-h",NULL};
execvp("ls",argv);

就是参数形式不一样剩下的都一样

2.练习

将当前系统中的进程信息打印到文件中

1
2
3
4
5
显示进程信息:
ps aux
进程信息写到process_information这个文件里面
ps aux > process_information

exec_ps.c

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
#include <unistd.h>
#include <fcntl.h>
#include <stdio.h>
#include <stdlib.h>

int main(void)
{
int fd;

fd = open("process_information", O_WRONLY|O_CREAT|O_TRUNC, 0644);
if(fd < 0){
perror("open process_information error");
exit(1);
}
//标准输出重定向到fd,本来写到标准输出的写到fd中
dup2(fd, STDOUT_FILENO);
//获取进程信息
execlp("ps", "ps", "ax", NULL);
perror("execlp error");

//close(fd);这个写不写都行

return 0;
}

3.exec族一般规律

exec函数一旦调用成功即执行新的程序,不返回。只有失败才返回,错误值-1。所以通常我们直接在exec函数调用后直接调用perror()和exit(),无需if判断。

I (list) 命令行参数列表

p (path) 搜素file时使用path变量

v (vector) 使用命令行参数数组

e (environment) 使用环境变量数组,不使用进程原有的环境变量,设置新加载程序运行的环境变量

事实上,只有execve是真正的系统调用,其它五个函数(都是库函数)最终都调用execve,所以execve在man手册第2节,其它函数在man手册第3节。这些函数之间的关系如下图所示。

image-20241218113520624

5.处理僵尸进程

对多进程程序而言,父进程一般需要跟踪子进程的退出状态,因此,当子进程结束运行时,内核不会立即释放该进程的进程表表项,以满足父进程后续对该子进程退出信息的查询(如果父进程还在运行)。

子进程进入僵尸态的两种情况:

  • 在子进程结束运行后,父进程读取其退出状态前,我们称该子进程处于僵尸态。
  • 父进程结束或者异常终止,而子进程继续运行时,此时子进程的PPID将被操作系统设置为1,即init进程,init进程接管了该子进程,并等待它结束。在父进程退出之后,子进程退出之前,该子进程也处于僵尸态。(也可以叫孤儿进程)

如果父进程没有正确处理子进程的返回信息,子进程将停留在僵尸态,并占据着内核资源,这是不能容许的,因为内核资源有限,以下函数在父进程中调用,以等待子进程的结束,并获取子进程的返回信息,从而避免了僵尸进程的产生,或使子进程从僵尸态结束。

不管是wait还是waitpid,一次调用只能回收一个子进程

1.wait

1
2
3
#include <sys/types.h>
#include <sys/wait.h>
pid_t wait(int* stat_loc);

wait函数将阻塞进程,直到该进程的某个子进程结束运行,它返回结束运行的子进程的PID,并将该子进程的退出状态信息存储于stat_loc参数指向的内存中,sys/wait.h文件中定义了以下宏函数来帮助解释子进程的退出状态信息。可分为以下三组宏函数

image-20241218121051430

参数:stat_loc是传出参数

返回值:成功返回回收的子进程pid失败返回-1

wait的三个功能:

1.阻塞等待子进程退出

2.回收子进程残留资源

3.获取子进程结束状态(退出原因)

具体使用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/wait.h>

int main(void)
{
pid_t pid, wpid;
int status;

pid = fork();
if (pid == 0) {
printf("---child, my id= %d, going to sleep 10s\n", getpid());
sleep(10);
printf("-------------child die--------------\n");
return 73;
} else if (pid > 0) {
//wpid = wait(NULL); // 不关心子进程结束原因
wpid = wait(&status); // 如果子进程未终止,父进程阻塞在这个函数上
if (wpid == -1) {
perror("wait error");
exit(1);
}
if (WIFEXITED(status)) { //为真,说明子进程正常终止.
printf("child exit with %d\n", WEXITSTATUS(status));

}
if (WIFSIGNALED(status)) { //为真,说明子进程是被信号终止.

printf("child kill with signal %d\n", WTERMSIG(status));
}

printf("------------parent wait finish: %d\n", wpid);
} else {
perror("fork");
return 1;
}

return 0;
}

2.waitpid

wait函数的阻塞特性不是服务器程序期望的,而waitpid函数解决了这个问题。

1
pid_t waitpid(pid_t pid, int *stat_loc, int option);

参数

  • pid:回收指定pid的子进程。

    • 若参数取值为-1,那么它就和wait函数相同,即等待任意一个子进程结束。
    • 0 回收和当前调用waitpid一个组的所有子进程
    • <-1 回收制定进程组内的任意子进程
  • stat_loc:和wait函数的stat_loc参数的相同。

  • options:可以控制waitpid函数的行为,该参数最常用的取值为WNOHANG

返回值

>0 返回子进程的PID

=0 参数三制定了WNOHANG,此时waitpid函数是非阻塞的,并且pid指定的目标子进程尚未终止。

-1 失败时返回-1并设置errno。

**要在事件已经发生的情况下执行非阻塞调用才能提高程序的效率。**对于 waitpid 函数,我们最好在某个子进程退出之后再调用它。

当一个进程结束时,它将给父进程发送一个SIGCHLD信号,我们可以在父进程中捕获SIGCHLD信号,并在信号处理函数中调用waitpid函数以“彻底结束”一个子进程:

1
2
3
4
5
6
7
static void handle_child(int sig) {
pid_t pid;
int stat;
while ((pid = waitpid(-1, &stat, WNOHANG)) > 0) {
// 对结束的子进程进行善后处理
}
}

3.waitpid指定回收一个特定pid

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
#include<stdio.h>
#include<stdlib.h>
#include<string.h>
#include<unistd.h>
#include<pthread.h>
#include<wait.h>

int main(int argc, char *argv[])
{
int n = 5, i; //默认创建5个子进程
pid_t pid,wpid,tmpid;

if(argc == 2){
n = atoi(argv[1]);
}

for(i = 0; i < n; i++) {//出口1,父进程专用出口
pid = fork();
if(pid == 0) {
break; //出口2,子进程出口,i不自增
}
if (i == 2){
tmpid= pid;
printf("这次要被回收的进程pid=%d\n",tmpid);
}
}

if(n == i){
sleep(n);
printf("I am parent, 将要waitpid pid = %d\n", tmpid);
wpid=waitpid(tmpid,NULL,WNOHANG);
/*没有sleep的版本
printf("I am parent, 将要waitpid pid = %d\n", tmpid);
wpid=waitpid(tmpid,NULL,0);
*/
if(wpid==-1)
{
perror("waitpid error");
exit(1);
}
printf("I am parent, 已经waitpid pid = %d\n", wpid);
} else {
sleep(i);
printf("I'm %dth child, pid = %d\n",i+1, getpid());
}

return 0;
}

1.如果waitpid是wohang的话就要加sleep(n),因为这个代表非阻塞,父进程执行太快导致子进程还没完就调用了waitpid,那当然回收不到子进程了

2.不加sleep(n),那就得把waitpid设置为阻塞的,等待指定的子进程结束后进行回收

运行结果:

image-20241218141639461

4.回收多个子进程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
#include<stdio.h>
#include<stdlib.h>
#include<string.h>
#include<unistd.h>
#include<pthread.h>
#include<wait.h>

int main(int argc, char *argv[])
{
int n = 5, i; //默认创建5个子进程
pid_t pid,wpid,tmpid;

if(argc == 2){
n = atoi(argv[1]);
}

for(i = 0; i < n; i++) {//出口1,父进程专用出口
pid = fork();
if(pid == 0) {
break; //出口2,子进程出口,i不自增
}
}

if(n == i){
//使用阻塞方式回收子进程
while((wpid=waitpid(-1,NULL,0)))
{
if(wpid==-1)
continue;
printf("waitpid pid = %d\n", wpid);
}
//非阻塞
/*while((wpid=waitpid(-1,NULL,WNOHANG))!=-1)
{
if(wpid>0)
printf("waitpid pid = %d\n", wpid);
else if(wpid==0)
{
sleep(1);
continue;
}
}*/
} else {
sleep(i);
printf("I'm %dth child, pid = %d\n",i+1, getpid());
}

return 0;
}

阻塞:

image-20241218143556952

非阻塞:

image-20241218143358874

6.管道

管道可以实现进程内部的通信。

管道也是父进程和子进程间通信的常用手段。

管道能在父、子进程间传递数据,利用的是调用fork后两个管道文件描述符都保持打开,一对这样的文件描述符能保证父子进程间一个方向的数据传输,父进程和子进程必须有一个关闭fd[0],另一个关闭fd[1]

如果要实现父子进程之间的双向数据传输,可以使用两个管道。socket编程接口提供了一个创建全双工管道的系统调用socketpair

管道只能用于有关联的两个进程(如父、子进程)间的通信,而以下要讨论的三种System V IPC 能用于无关联的多个进程之间的通信,因为它们都使用一个全局唯一的键值来标识一条信道。有一种特殊的管道称为FIFO(First In First Out,先进先出),也叫命名管道,它也能用于无关联进程之间的通信,但FIFO管道在网络编程中用得不多,所以我们不讨论它。

1.概述

管道是一种最基本的IPC机制,作用于有血缘关系的进程之间,完成数据传递。调用pipe系统函数即可创建一个管道。有如下特质:

  1. 其本质是一个伪文件(实为内核缓冲区)
  2. 由两个文件描述符引用,一个表示读端,一个表示写端
  3. 规定数据从管道的写端流入管道,从读端流出

管道的原理:

管道实为内核使用环形队列机制,借助内核缓冲区(4k)实现

管道的局限性:

1 数据不能进程自己写,自己读

2.管道中数据不可反复读取。一旦读走,管道中不再存在

3 采用双向半双工通信方式,数据只能在单方向上流动

4 只能在有公共祖先的进程间使用管道

双向半双工例子:

对于管道来说,一旦我进程间通信时,第一次发生数据交换是A进程读B进程写,那第二次就不可以A写B读了,只能是A读B写。

常见通信方式:单工通信、半双工通信、全双工通信。

注意:因为父子进程共享文件描述符所以父进程已经创建并打开的管道子进程也能用

2.使用

创建并打开管道

1
2
#include<unistd.h>
int pipe(int fd[2]);

参数:

fd[0]表示读端

fd[1]表示写端

返回值:成功0失败-1

父进程写子进程读:

一开始父子进程都持有读端和写端

image-20241218152941810

父进程关闭写端子进程关闭读端后就有一条明确的数据流通方向

image-20241218153042572

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
#include <unistd.h>
#include <string.h>
#include <stdlib.h>
#include <stdio.h>
#include <sys/wait.h>

void sys_err(const char *str)
{
perror(str);
exit(1);
}

int main(void)
{
pid_t pid;
char buf[1024];
int fd[2];
char *p = "test for pipe\n";

if (pipe(fd) == -1)
sys_err("pipe");

pid = fork();
if (pid < 0) {
sys_err("fork err");
} else if (pid == 0) {
//子进程关闭写端
close(fd[1]);
int len = read(fd[0], buf, sizeof(buf));
write(STDOUT_FILENO, buf, len);
close(fd[0]);
} else {
//父进程关闭读端
close(fd[0]);
write(fd[1], p, strlen(p));
sleep(1);
wait(NULL);
close(fd[1]);
}

return 0;
}

image-20241218153547049

3.管道的读写行为

1.读管道

​ 1. 管道中有数据,read返回实际读到的字节数

​ 2.管道中无数据:

​ (1)管道写端被全部关闭,没有人会继续往管道写数据了,read返回0(好像读到文件结尾)

​ (2)写端没有全部被关闭,read阻塞等待(不久的将来可能有数据递达,此时会让出cpu)

2.写管道

  1. 管道读端全部被关闭,进程异常终止(也可使用捕捉SIGPIPE信号,使进程不终止)

  2. 管道读端没有全部关闭:

​ (1)管道已满,write阻塞

​ (2)管道未满,write将数据写入,并返回实际写入的字节数

3.练习

1.管道实现 ls|wc-l

使用管道实现父子进程间通信,完成:ls|wc-l。假定父进程实现ls,子进程实现wc

Is命令正常会将结果集写出到stdout,但现在会写入管道的写端;wc-l 正常应该从stdin读取数据,但
此时会从管道的读端读

1.创建打开管道

2.fork子进程

3.关闭父进程读端,关闭子进程写端

4.父进程调用execlp执行ls命令

5.子进程调用execlp执行wc -l

6.父进程调用dup2把标准输出重定向到管道写端(往管道写数据,原来ls的输出是在标准输出的,现在输出到管道)

7.子进程调用dup2把标准输入重定向到管道读端(从管道拿数据,原来wc从标准输入拿数据,现在从管道拿)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
#include <stdio.h>
#include <unistd.h>
#include <sys/wait.h>

int main(void)
{
pid_t pid;
int fd[2];

pipe(fd);
pid = fork();

if (pid == 0) { //child
close(fd[1]); //子进程从管道中读数据,关闭写端
dup2(fd[0], STDIN_FILENO); //让wc从管道中读取数据
execlp("wc", "wc", "-l", NULL); //wc命令默认从标准读入取数据

} else {

close(fd[0]); //父进程向管道中写数据,关闭读端
dup2(fd[1], STDOUT_FILENO); //将ls的结果写入管道中
execlp("ls", "ls", NULL); //ls输出结果默认对应屏幕
}

return 0;
}
  • 程序不时的会出现先打印$提示符,再出程序运行结果的现象。
  • 这是因为:父进程执行ls命令,将输出结果给通过管道传递给子进程去执行wc命令,这时父进程若先于子进程打印wc运行结果之前被shell使用wait函数成功回收,shell就会先于子进程打印wc运行结果之前打印$提示符。
  • 在这之中子进程一定得等父进程写完数据以后才会执行自己的代码,所以一定是父进程先执行完毕。
  • 所以解决方法:让子进程执行ls,父进程执行wc命令。或者在兄弟进程间完成。

2.管道实现兄弟进程通信

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
#include <stdio.h>
#include <unistd.h>
#include <sys/wait.h>

int main(void)
{
pid_t pid;
int fd[2], i;

pipe(fd);

for (i = 0; i < 2; i++) {
if((pid = fork()) == 0) {
break;
}
}

if (i == 0) { //兄
close(fd[0]); //写,关闭读端
dup2(fd[1], STDOUT_FILENO);
execlp("ls", "ls", NULL);
} else if (i == 1) { //弟
close(fd[1]); //读,关闭写端
dup2(fd[0], STDIN_FILENO);
execlp("wc", "wc", "-l", NULL);
} else {
close(fd[0]);
close(fd[1]);
for(i = 0; i < 2; i++) //两个儿子wait两次
wait(NULL);
}

return 0;
}

image-20241218200410336

注意要关闭父进程持有的读端和写端,不然形不成数据的单向流动

3.测试管道是否允许一个pipe有一个写端多个读端?有一个读端多个写端?

是允许的,但是一般都是写成一个读端一个写端

  1. 管道允许一个读端多个写端
    • 管道是可以有一个读端和多个写端的。这在很多场景下是非常有用的,例如在日志系统中,多个不同的进程可以作为写端向一个管道写入日志信息,而一个专门的日志收集进程作为读端从管道中读取这些日志信息进行处理。多个写端可以同时向管道写入数据,不过需要注意数据的同步问题,因为如果多个写端同时写入可能会导致数据混乱,通常需要配合信号量等同步机制来保证数据的有序写入。
  2. 管道也允许一个写端多个读端
    • 管道同样允许一个写端多个读端。当数据被写入管道后,所有的读端都可以读取到这些数据。数据从管道中被读取后,对于管道中的其他读端来说,数据仍然存在(只要没有被其他读端全部读取完)。
    • 例如,在一个数据分发系统中,一个进程作为写端向管道写入数据,多个其他进程作为读端可以从管道中读取相同的数据进行不同的处理,如一个读端用于数据显示,另一个读端用于数据存储等。管道中的数据是可以被多个读端共享读取的,并不是一个读端读取后数据就消失了。管道内部维护了一个缓冲区,数据存储在这个缓冲区中,读端从缓冲区读取数据,只要缓冲区中的数据没有被全部读取,其他读端仍然可以读取剩余的数据。

测试代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
#include <stdio.h>
#include <unistd.h>
#include <sys/wait.h>
#include <string.h>
#include <stdlib.h>

int main(void)
{
pid_t pid;
int fd[2], i, n;
char buf[1024];

int ret = pipe(fd);
if(ret == -1){
perror("pipe error");
exit(1);
}

for(i = 0; i < 2; i++){
if((pid = fork()) == 0)
break;
else if(pid == -1){
perror("pipe error");
exit(1);
}
}

if (i == 0) {
close(fd[0]);
write(fd[1], "1.hello\n", strlen("1.hello\n"));
} else if(i == 1) {
close(fd[0]);
write(fd[1], "2.world\n", strlen("2.world\n"));
} else {
close(fd[1]); //父进程关闭写端,留读端读取数据
sleep(1); //sleep的原因是为了让两个写端都把数据写上,而不是就只有其中一个写上之后父进程就读了
n = read(fd[0], buf, 1024); //从管道中读数据
write(STDOUT_FILENO, buf, n);

for(i = 0; i < 2; i++) //两个儿子wait两次
wait(NULL);
}

return 0;
}

  • 当两个子进程快速地向管道写入数据时,管道缓冲区可能在第一个子进程写入1.hello\n后,父进程就开始读取数据。由于管道缓冲区的数据可能没有被第二个子进程的2.world\n完全覆盖或者父进程读取操作已经完成,就可能导致父进程只读取到1.hello\n而没有读取到2.world\n

  • 管道的读取操作在缓冲区有数据时就会开始读取,而不会等待所有子进程都写入数据。sleep函数在这里起到了让父进程暂停一下的作用,给两个子进程足够的时间将数据都写入管道缓冲区,从而保证父进程能够读取到两个子进程写入的完整数据。

4.管道大小

默认4KB

image-20241218201417711

5.管道优劣

**优点:**简单,相比信号,套接字实现进程间通信,简单很多

缺点:

  • 1.只能单向通信,双向通信需建立两个管道
  • 2.只能用于父子、兄弟进程(有共同祖先)间通信。该问题后来使用fifo有名管道解决

6.有名管道FIFO

FIFO常被称为命名管道,以区分管道(pipe)。管道(pipe)只能用于“有血缘关系”的进程间。但通过FIFO,不相
关的进程也能交换数据。

FIFO是Linux基础文件类型中的一种。但,FIFO文件在磁盘上没有数据块,仅仅用来标识内核中一条通道。备
进程可以打开这个文件进行read/write,实际上是在读写内核通道,这样就实现了进程间通信。

创建方式:

1.命令:mkfifo 管道名

1
2
mkfifo 管道名
mkfifo myfifo

2.库函数:

1
2
3
#include<sys/types.h>
#include<sys/stat.h>
int mkfifo(const char *pathname,mode_t mode);

参数:

pathname:文件名

mode:8进制的权限,比如0644这种的

**返回值:**成功:0;失败 :- 1

一旦使用mkfifo创建了一个FIFO,就可以使用open打开它,常见的文件I/O函数都可用于fifo。如:close、read、
write、unlink等。

例子:

1
2
3
int ret = mkfifo("mytestfifo", 0664);
if (ret == -1)
sys_err("mkfifo error");

实现没有血缘关系的进程通信

读端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
#include <stdio.h>
#include <unistd.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <fcntl.h>
#include <stdlib.h>
#include <string.h>

void sys_err(char *str)
{
perror(str);
exit(1);
}

int main(int argc, char *argv[])
{
int fd, len;
char buf[4096];

if (argc < 2) {
printf("./a.out fifoname\n");
return -1;
}
//不提前创建fifo,直接写fifo
//int fd = mkfifo("testfifo", 644);
//open(fd, ...);
fd = open(argv[1], O_RDONLY); // 打开管道文件
if (fd < 0)
sys_err("open");
while (1) {
len = read(fd, buf, sizeof(buf)); // 从管道的读端获取数据
write(STDOUT_FILENO, buf, len);
sleep(3); //多個读端时应增加睡眠秒数,放大效果.
}
close(fd);

return 0;
}

写端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
#include <stdio.h>
#include <unistd.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <fcntl.h>
#include <stdlib.h>
#include <string.h>

void sys_err(char *str)
{
perror(str);
exit(-1);
}

int main(int argc, char *argv[])
{
int fd, i;
char buf[4096];

if (argc < 2) {
printf("Enter like this: ./a.out fifoname\n");
return -1;
}
fd = open(argv[1], O_WRONLY); //打开管道文件
if (fd < 0)
sys_err("open");

i = 0;
while (1) {
sprintf(buf, "hello itcast %d\n", i++);

write(fd, buf, strlen(buf)); // 向管道写数据
sleep(1);
}
close(fd);

return 0;
}

更多进程间通信请查看该博客:

7.信号量

1.信号量原语

多个进程同时访问系统上某个资源时,如同时写一个数据库的某条记录,或同时修改某个文件,就需要考虑进程同步问题,以确保任一时刻只有一个进程可以拥有对资源的独占式访问。通常,进程对共享资源的访问的代码只是很短的一段**,但这段代码引发了进程之间的竞态条件,我们称这段代码为关键代码区,或临界区,对进程同步,就是确保任一时刻只有一个进程能进入关键代码段。**

Dekker算法和Peterson算法试图从语言本身(不需要内核支持)解决进程同步问题,但它们依赖于忙等待,即进程要持续不断地等待某个内存位置状态的改变,这种方式的CPU利用率太低,不可取。

Dijkstra提出的信号量(Semaphore)是一种特殊的变量,它只能取自然数值且只支持两种操作:等待(wait)和信号(signal)。但在Linux/UNIX中,等待和信号都已经具有特殊含义,所以对信号量的这两种操作更常用的称呼是P、V操作,这两个字母来自荷兰语单词passeren(传递,就好像进入临界区)和vrijgeven(释放,就好像退出临界区)。

假设有信号量SV,对它的P、V操作含义如下:

  • P(SV),如果SV的值大于0,就将它减1,如果SV的值为0,则挂起进程的执行。
  • V(SV),如果有其他进程因为等待SV而挂起,则唤醒之,如果没有,则将SV加1。

信号量的取值可以是任何自然数,但最常用的、最简单的信号量是二进制信号量,它只能取0或1两个值,我们仅讨论二进制信号量。使用二进制信号量同步两个进程,以确保关键代码段的独占式访问的例子:

img

上图中,当关键代码段可用时,二进制信号量SV的值为1,进程A和B都有机会进入关键代码段,如果此时进程A执行了P(SV)操作将SV减1,则进程B再执行P(SV)操作就会被挂起,直到进程A离开关键代码段,并执行V(SV)操作将SV加1,关键代码段才重新变得可用。

不能使用普通变量来模拟二进制信号量,因为所有高级语言都没有一个原子操作可以同时完成以下两步操作:检测变量是否为true/false,如果是则将它设置为false/true。

Linux信号量的API定义在sys/sem.h头文件中,主要包括3个系统调用:semgetsemopsemctl。它们被设计为操作一组信号量,即信号量集,而不是单个信号量。

2.semget 系统调用

semget系统调用创建一个新的信号量集,或获取一个已经存在的信号量集。

1
2
#include <sys/sem.h>
int semget(key_t key, int num_sems, int sem_flags);
参数

key:键值,用来标志全局唯一的信号量集,要通过信号量通信的进程需要使用相同的键值来创建/获取该信号量。

num_sems:指定要创建/获取的信号量集中信号量的数目,如果是创建信号量,该值必须指定,如果是获取已经存在的信号量,该值可以设置为0。

sem_flags:指定一组标志,低端的9个bite是信号量的权限,格式和含义与openmode参数一致。此外,它可以和IPC_CREAT标志做按位或运算以创建新的信号量集。还可以联合使用IPC_CREATIPC_EXCL标志确保创建新的、唯一的信号量集,如果这时候该信号量集已经存在,semget返回错误并设置errno为EEXIST

返回值

semget成功返回一个正整数,也就是信号量集的标识符,失败返回-1并设置errno。

如果用semget创建一个新的信号量集,与之相关的内核数据结构体semid_ds将被创建并初始化。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
struct semid_ds {
struct ipc_perm sem_perm; /* 信号量操作权限 */
unsigned long int sem_nsems; /* 该信号量集中的信号量数目 */
time_t sem_otime; /* 最后一次调用 semop 的时间 */
time_t sem_ctime; /* 最后一次调用 semctl 的时间 */
/* 省略其他填充字段 */
}:

struct ipc_perm{
key_t key; /* 键值 */
uid_t uid; /* 所有者的用户id */
gid_t gid; /* 所有者的组id */
uid_t cuid; /* 创建者的用户id */
git_t cgid; /* 创建者的组id */
mode_t mode; /* 访问权限 */
/* 省略其他填充字段 */
};

img

3.semop 系统调用

semop系统调用改变信号量的值,即执行P、V操作,在讨论semop函数前,先介绍与每个信号量关联的一些重要的内核变量:

1
2
3
4
unsigned short semval; 			/* 信号量的值 */
unsigned short semzcnt; /* 等待信号量变为0的进程数量 */
unsigned short semncnt; /* 等待信号量值增加的进程数量 */
pid_t sempid; /* 最后一次执行 semop 操作的进程ID */

semop函数对信号量的操作实际就是改变上述内核变量的操作,该函数定义如下:

1
2
#include <sys/sem.h>
int semop(int sem_id, struct sembuf* sem_ops, size_t num_sem_ops);
参数

sem_idsemget调用返回的信号量集标识符,指定被操作的目标信号量集。

sem_ops:指向一个 sembuf 类型结构体的数组:

1
2
3
4
5
6
struct sembuf
{
unsigned short int sem_num;
short int sem_op;
short int sem_flg;
};
  • sem_num:信号量集中信号量的编号,0代表信号量集的第一个信号量,以此类推。

  • sem_op:指定操作类型,可选值:正整数,0、负整数,同时受到 sem_flg的影响。op>0执行V操作,op小于0执行P操作

    • 通常P操作值为 - 1,V操作值为 1
  • sem_flg:可选值为IPC_NOWAIT,SEM_UNDO,0

    • IPC_NOWAIT代表非阻塞操作SEM_UNDO代表撤销操作0代表阻塞操作
    • sem_flg设置为IPC_NOWAIT时,如果信号量操作(如sem_op中的减法操作使得信号量的值小于 0)不能立即执行,操作不会阻塞等待信号量状态改变,而是立即返回一个错误,错误码通常为EAGAIN。这种方式适用于不希望进程在信号量操作上长时间阻塞的场景,例如在一些对实时性要求较高的应用中,当获取不到信号量时可以先去执行其他任务。
    • sem_flg设置为SEM_UNDO时,系统会记录信号量操作,以便在进程异常终止时自动撤销(调整)信号量的值,以避免信号量状态被错误地锁定或者资源无法释放的情况。例如,如果一个进程对信号量进行了P操作(减操作)获取资源后异常终止,没有来得及进行V操作(加操作)释放资源,设置了SEM_UNDO的信号量系统会自动进行适当的调整,保证信号量状态的正确性。
    • sem_flg为 0 时,信号量操作会按照正常的阻塞方式执行。对于P操作(sem_op为负数),如果信号量的值不够减,进程会被阻塞,直到信号量的值满足操作要求(例如其他进程进行了V操作增加了信号量的值)。这种方式在需要确保资源按照顺序被访问和操作,且允许进程等待资源可用的场景下非常有用,比如在经典的生产者 - 消费者模型中,消费者进程等待生产者生产出产品(通过信号量控制),此时使用阻塞式操作可以保证消费者在没有产品时等待,直到生产者生产出产品后再继续执行。

num_sem_ops:指定要执行的操作个数,即sem_ops数组中元素的个数。semop函数对sem_ops数组参数中的每个成员按数组顺序依次执行操作,且该过程是原子操作,以避免别的进程在同一时刻按不同顺序对该信号集中的信号量执行semop函数导致的竞态条件。

返回值

semop成功返回0,失败返回-1并设置errno。

sem_op值的不同操作规则:

  • sem_op 大于 0 时,表示进程要增加信号量的值。操作要求调用进程对被操作信号量集拥有写权限。若设置了 SEM_UNDO 标志,系统将更新进程的 semadj 变量。
  • sem_op 等于 0 时,表示这是一个 “等待 0” 操作。操作要求调用进程对被操作信号量集拥有读权限。如果信号量的值为 0,调用立即成功;如果不是 0,则操作失败或阻塞进程直到信号量变为 0。在这种情况下,当 IPC_NOWAIT 标志被指定时,操作立即返回一个错误,并设置 errnoEAGAIN。若未指定 IPC_NOWAIT 标志,信号量的 semncnt 值加 1,进程将被投入睡眠直到满足特定条件。
  • sem_op 小于 0 时,表示对信号量值进行减操作,即期望获得信号量。操作要求调用进程对被操作信号量集拥有写权限。如果信号量的值 semval 大于或等于 sem_op 的绝对值,操作成功,调用进程立即获得信号量,并且系统将该信号量的 semval 值减去 sem_op 的绝对值。若设置了 SEM_UNDO 标志,则系统将更新进程的 semadj 变量。

4.semctl 系统调用

semctl系统调用允许调用者对信号量进行直接控制:

1
2
#include <sys/sem.h>
int semctl(int sem_id, int sem_num, int command, ...);

参数:

  • sem_id参数是由semget调用返回的信号量集标识符,用于指定被操作的信号量集。
  • sem_num参数指定被操作的信号量在信号量集中的编号。
  • command参数指定要执行的命令,有些命令需要调用者传递第 4 个参数。

第四个参数可以自定义,但是系统给出了推荐的定义格式:

1
2
3
4
5
6
7
union semun
{
int val;// 用于SETVAL命令
struct semid_ds *buf;// 用于IPC_STAT和IPC_SET命令
unsigned short *array;// 用于GETALL和SETALL命令
struct seminfo *__buf;// 用于IPC_INFO命令
};
1
2
3
4
5
6
7
8
struct seminfo
{
int semmap;// Linux内核没有使用
int semmni;// 系统最多可以拥有的信号量集数目
int semmns;// 系统最多可以拥有的信号量数目
int semmnu;// Linux内核没有使用
int semmsl;// 一个信号量集最多允许包含的信号量数目
};

返回值:

  • semctl成功时的返回值取决于command参数,失败时返回 - 1,并设置errno

image-20241220111936954

注意事项

GETNCNTGETPIDGETVALGETZCNTSETVAL操作中,操作的是单个信号量,此时sem_num参数指定单个信号量在信号量集中的编号。而其他操作针对的是整个信号量集,此时sem_num参数被忽略。

5.特殊键值 IPC_PRIVATE

semget的调用者可以给其key参数传递一个特殊键值IPC_PRIVATE(其值为0),这样无论该信号量是否已存在,semget函数都将创建一个新信号量,使用该键值创建的信号量并非像它的名字声称的那样是进程私有的,其他进程,尤其是子进程,也有方法来访问这个信号量,所以semget函数的man手册的BUGS部分上说,使用名字IPC_PRIVATE有些误导(历史原因),应称为IPC_NEW

6.信号量实现进程间通信

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/sem.h>
#include <string.h>
#include <unistd.h>

// 定义链表节点结构体
typedef struct ListNode {
int data;
struct ListNode *next;
} ListNode;

// 定义信号量操作结构体
union semun {
int val;
struct semid_ds *buf;
unsigned short int *array;
struct seminfo *__buf;
};

// 声明全局的链表头指针
ListNode *head = NULL;

// 信号量操作函数
void semaphore_op(int semid, int sem_num, int op) {
struct sembuf sem_b;
sem_b.sem_num = sem_num;
sem_b.sem_op = op;
sem_b.sem_flg = 0;
if (semop(semid, &sem_b, 1) == -1) {
perror("semop");
exit(1);
}
}

// 生产者函数
void producer(int semid) {
ListNode *head = NULL; // 链表头指针
int item = 0;
while (1) {
// 申请节点内存
ListNode *new_node = (ListNode *)malloc(sizeof(ListNode));
if (new_node == NULL) {
perror("malloc");
exit(1);
}
new_node->data = item++;

//P操作 获取空闲缓冲区信号量(这里表示链表插入的空位,第二参数表示产品还剩下0)
semaphore_op(semid, 0, -1);

// 将新节点插入链表头部(简单实现,可按需改为其他插入方式)
new_node->next = head;
head = new_node;
printf("生产了一个产品\n");
// V操作 增加产品信号量(表示链表中有新数据可供消费,第二参数表示生产出来了一个产品)
semaphore_op(semid, 1, 1);

sleep(1);
}
}

// 消费者函数
void consumer(int semid) {
while (1) {
semaphore_op(semid, 1, -1); // P操作 获取产品信号量(链表中有数据才可消费,第二个参数1代表有数据)

// 取出链表头节点进行消费
ListNode *node_to_consume = NULL;
if (head!= NULL) {
node_to_consume = head;
head = head->next;
printf("Consumer: %d\n", node_to_consume->data);
free(node_to_consume); // 释放消费完的节点内存
}
printf("消费了一个产品\n");
semaphore_op(semid, 0, 1); // V操作 增加空闲缓冲区信号量(链表腾出空位,第二个参数代表已经消费完了)

sleep(2);
}
}

int main() {
// 创建信号量集 IPC_PRIVATE的值为0,表示不管该信号量创建了没有都会创建一个
int semid = semget(IPC_PRIVATE, 2, 0666 | IPC_CREAT);
if (semid == -1) {
perror("semget");
return 1;
}
union semun arg;
arg.val = 1;
// 初始化空闲缓冲区信号量(初始有1个空位可插入链表节点)
if (semctl(semid, 0, SETVAL, arg) == -1) {
perror("semctl");
return 1;
}
arg.val = 0;
// 初始化产品信号量(初始链表无数据可供消费)
if (semctl(semid, 1, SETVAL, arg) == -1) {
perror("semctl");
return 1;
}

pid_t pid = fork();
if (pid < 0) {
perror("fork");
return 1;
} else if (pid == 0) {
// 子进程为消费者
consumer(semid);
} else {
// 父进程为生产者
producer(semid);
}

// 删除信号量集
if (semctl(semid, 0, IPC_RMID) == -1) {
perror("semctl");
return 1;
}

return 0;
}

总体功能:

生产者生产一个,消费者拿一个,然后再生产,然后再消费

1. 数据结构定义

定义了ListNode结构体来表示链表的节点,包含一个int类型的数据成员用于存放生产者生产的数据,以及一个指向下一个节点的指针成员。

2. 信号量操作相关部分

  • union semun结构体:用于给semctl函数传递参数,根据不同的命令可以传递不同类型的值,在这里主要用于初始化信号量的值。
  • semaphore_op函数:封装了semop函数,用于对信号量进行操作。它接受信号量集标识符、信号量编号以及操作值作为参数,构造struct sembuf结构体并调用semop函数来执行信号量操作,操作失败时会输出错误信息并终止程序。

3. 生产者逻辑

  • producer函数中,首先定义了链表头指针head,并在循环中不断生产数据。每次生产时,先通过malloc函数申请一个新的链表节点内存空间,将数据存入节点。
  • 然后通过semaphore_op函数获取空闲缓冲区信号量(这里代表链表中可插入新节点的空位),接着将新节点插入到链表头部(简单实现了链表插入操作,实际可根据需求调整插入逻辑),再通过semaphore_op函数增加产品信号量,表示链表中有新的数据可供消费者消费,最后通过sleep函数模拟生产过程的时间间隔。

4. 消费者逻辑

  • consumer函数中,通过循环不断尝试消费数据。首先通过semaphore_op函数获取产品信号量,只有当链表中有数据(信号量值大于等于 1)时才能继续执行。
  • 接着取出链表头节点,将其数据打印出来模拟消费过程,然后释放该节点占用的内存空间,最后通过semaphore_op函数增加空闲缓冲区信号量,表示链表腾出了一个空位可供生产者插入新节点,同样通过sleep函数模拟消费过程的时间间隔。

5. 主函数部分

  • main函数中,通过semget函数创建包含两个信号量的信号量集,分别用于控制空闲缓冲区(链表插入空位)和产品(链表中可消费的数据)。
  • 使用semctl函数结合union semun结构体来初始化这两个信号量的初始值。
  • 通过fork函数创建子进程,子进程执行consumer函数作为消费者,父进程执行producer函数作为生产者。
  • 最后在程序结束时,通过semctl函数删除信号量集,释放相关系统资源。

这样就通过 System V IPC 信号量实现了一个基于链表作为共享数据结构的生产者 - 消费者模型,确保了生产者和消费者对链表的并发访问是安全有序的。

运行结果:

image-20241220115519889

7.无血缘关系进程通信

  1. ftok函数的定义和功能
    • 函数原型key_t ftok(const char *pathname, int proj_id);
    • 功能ftok函数用于生成一个唯一的key(键值),这个key通常用于 System V IPC(进程间通信)机制中,如创建共享内存、消息队列和信号量集等。它将一个文件路径名(pathname)和一个项目标识符(proj_id)组合起来,生成一个适合作为 System V IPC 资源标识符的key值。
  2. 参数解释
    • const char *pathname
      • 这是一个指向文件路径名的指针。这个文件路径必须是一个已经存在的文件的有效路径,通常使用当前目录(.")或者一个程序相关的配置文件路径等。ftok函数会使用文件的inode(索引节点)信息作为生成key的一部分。
      • 注意,如果文件被删除然后重新创建,即使文件名相同,inode可能会改变,这会导致ftok生成不同的key值。
    • int proj_id
      • 这是一个0 - 255之间的整数,作为项目标识符。它和文件路径的inode信息一起组合生成key。不同的项目可以使用不同的proj_id来区分,这样即使基于同一个文件路径,不同的项目也能生成不同的key值用于各自的 IPC 资源。例如,一个程序中有两个不同的模块需要使用消息队列进行通信,它们可以使用相同的文件路径但不同的proj_id来生成不同的key,以创建两个独立的消息队列。
  3. 返回值
    • 成功时,ftok函数返回一个key_t类型的非负整数,这个整数可以作为shmgetmsggetsemget等 System V IPC 函数的key参数来创建或获取对应的 IPC 资源。
    • 失败时,返回-1,并且会设置errno来指示错误原因。常见的错误原因包括:
      • EACCESS:没有权限访问pathname指定的文件。
      • ENOENTpathname指定的文件不存在。

发送端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/sem.h>
#include <unistd.h>

// 信号量操作函数
void semaphore_op(int semid, int sem_num, int op) {
struct sembuf sem_b;
sem_b.sem_num = sem_num;
sem_b.sem_op = op;
sem_b.sem_flg = 0;
if (semop(semid, &sem_b, 1) == -1) {
perror("semop");
exit(1);
}
}

int main() {
// 通过ftok生成key
key_t key = ftok(".", 'a');
if (key == -1) {
perror("ftok");
return 1;
}

// 创建信号量集(只含一个信号量)
int semid = semget(key, 1, IPC_CREAT | 0666);
if (semid == -1) {
perror("semget");
return 1;
}

// 初始化信号量值为0(表示资源不可用)
union semun {
int val;
struct semid_ds *buf;
unsigned short *array;
struct seminfo *__buf;
} arg;
arg.val = 0;
if (semctl(semid, 0, SETVAL, arg) == -1) {
perror("semctl");
return 1;
}

// 进行一些操作后,释放信号量(表示资源可用了)
semaphore_op(semid, 0, 1);
printf("Sender process released the signal...\n");


sleep(10);
return 0;
}

读入端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/sem.h>
#include <unistd.h>

// 信号量操作函数
void semaphore_op(int semid, int sem_num, int op) {
struct sembuf sem_b;
sem_b.sem_num = sem_num;
sem_b.sem_op = op;
sem_b.sem_flg = 0;
if (semop(semid, &sem_b, 1) == -1) {
perror("semop");
exit(1);
}
}

int main() {
// 通过ftok生成key
key_t key = ftok(".", 'a');
if (key == -1) {
perror("ftok");
return 1;
}

// 获取信号量
int semid = semget(key, 1, 0666);
if (semid == -1) {
perror("semget");
return 1;
}

// 等待信号量,相当于等待另一个进程释放资源
semaphore_op(semid, 0, -1);
printf("Receiver process got the signal and can continue...\n");

// 操作完成后,释放信号量(这里简单示意)
semaphore_op(semid, 0, 1);

// 删除信号量集
if (semctl(semid, 0, IPC_RMID, NULL) == -1) {
perror("semctl for delete");
}
sleep(10);
return 0;
}

运行结果:

image-20241220192820038

8.共享内存

共享内存的基本原理

  • 共享内存是一种进程间通信(IPC)机制。当多个进程需要共享数据时,可以创建一块共享内存区域。通过shmget函数创建共享内存段后,这块内存区域就存在于系统的内存空间中。
  • 然后,各个进程可以通过shmat函数将这块共享内存连接到自己的进程地址空间。连接后,进程就可以像访问自己的本地内存一样访问共享内存中的数据。

共享内存是最高效的IPC机制,因为它不涉及进程之间的任何数据传输,这种高效率带来的问题是,我们必须用其他辅助手段来同步进程对共享内存的访问,否则会产生竞态条件,因此,共享内存通常和其他进程间通信方式一起使用。

Linux共享内存的API都定义在sys/shm.h头文件中,包括4个系统调用shmgetshmatshmdtshmctl

1.shmget 系统调用

shmget 系统调用创建一段新的共享内存,或者获取一段已经存在的共享内存。

1
2
#include <sys/shm.h>
int shmget(key_t key, size_t size, int shmflg);
  1. 参数详解

    • key_t key
      • 这是一个键值,用于标识共享内存段。它可以是由ftok函数生成的值,也可以是IPC_PRIVATE(用于创建私有共享内存段,通常在父子进程间使用)。ftok函数根据一个文件路径和一个项目标识符生成一个唯一的key_t值。
    • size_t size
      • 这个参数指定了要创建或获取的共享内存段的大小,单位是字节。如果是创建新的共享内存段,这个大小必须大于 0。如果shmget函数用于获取已存在的共享内存段,这个参数通常可以被忽略(但仍需提供一个合理的值)。
    • int shmflg
      • 这个参数用于控制共享内存段的创建和访问权限,它由以下几种标志组成:
        • IPC_CREAT:如果共享内存段不存在,则创建它。如果和IPC_EXCL一起使用(IPC_CREAT | IPC_EXCL),则只有在共享内存段不存在时才创建,若已存在则shmget函数返回 - 1 并设置errnoEEXIST
        • IPC_EXCL:与IPC_CREAT一起使用,用于确保创建的是一个新的、唯一的共享内存段。
        • 权限标志:如0666等,用于指定共享内存段的访问权限,格式与文件权限相同(用户、组、其他用户的读、写、执行权限)。
        • SHM_HUGETLB标志:当在shmflg参数中使用SHM_HUGETLB标志时,表示请求使用大页(huge pages)来分配共享内存。大页是一种内存管理技术,它使用比标准内存页更大的页面大小(通常为 2MB 或 1GB,取决于系统配置)。
        • SHM_NORESERVE标志SHM_NORESERVE标志用于在创建共享内存时,不预留交换空间(swap space)。通常情况下,当创建共享内存段时,系统会为其预留相应的交换空间,以确保在内存不足时可以将部分数据交换到磁盘上。使用SHM_NORESERVE可以避免这种交换空间的预留。这样,当物理内存不足时,对该共享内存执行写操作将触发SIGESV信号。
  2. 返回值

    • 成功时,shmget函数返回一个非负整数,即共享内存段的标识符(shmid)。这个标识符可以用于后续的shmat(连接共享内存段到进程地址空间)、shmdt(从进程地址空间分离共享内存段)和shmctl(控制共享内存段的操作,如删除、获取状态等)操作。

    • 失败时,函数返回 - 1,并设置errno

      变量来指示错误原因。常见的错误原因包括:

      • EINVALsize小于SHMMIN(最小共享内存大小)或者key无效。
      • EEXIST:当使用IPC_CREAT | IPC_EXCL标志且共享内存段已经存在时。
      • ENOENT:当key对应的共享内存段不存在且没有使用IPC_CREAT标志时。

如果 shmget 用于创建共享内存,则这段共享内存的所有字节都被初始化为 0,与之关联的内核数据结构 shmid_ds 将被创建并初始化。shmid_ds 结构体的定义如下:

1
2
3
4
5
6
7
8
9
10
11
struct shmid_ds
{
struct ipc_perm shm_perm; // 共享内存的操作权限
size_t shm_segsz; // 共享内存容量大小,单位是字节
time_t shm_atime; // 对这段内存最后一次调用shmat的时间
time_t shm_dtime; // 对这段内存最后一次调用shmdt的时间
time_t shm_ctime; // 对这段内存最后一次调用shmctl的时间
_pid_t shm_cpid; // 创建者的PID
_pid_t shm_lpid; // 最后一次执行shmat或shmdt操作的进程的PID
shmat_t shm_nattach; // 目前关联到此共享内存的进程数量
};

shmget 对 shmid_ds 结构体的初始化包括:

  • 将 shm_perm.cuid 和 shm_perm.uid 设置为调用进程的有效用户 ID。
  • 将 shm_perm.cgid 和 shm_perm.gid 设置为调用进程的有效组 ID。
  • 将 shm_perm.mode 的最低 9 位设置为 shmflg 参数的最低 9 位。
  • 将 shm_segsz 设置为 size。
  • 将 shm_lpid、shm_nattach、shm_atime、shm_dtime 设置为 0。
  • 将 shm_ctime 设置为当前的时间。

2.shmatshmdt 系统调用

共享内存被创建/获取后,我们不能立即访问它,而是需要先将它关联到进程的地址空间中,使用完共享内存后,我们也需要将它从进程地址空间中分离,这两项任务分别由以下两个系统调用实现:

1
2
3
#include <sys/shm.h>
void* shmat(int shm_id, const void* shm_addr, int shmflg);
int shmdt(const void* shm_addr);

1.shamt

1
void* shmat(int shm_id, const void* shm_addr, int shmflg);
  • 功能:

    • 将由shm_id标识的共享内存段连接到调用进程的地址空间。连接成功后,进程可以像访问普通内存一样访问共享内存中的数据。
  • 参数:

    • shm_id
      • 这是共享内存段的标识符,由shmget函数成功创建或获取共享内存段时返回。它唯一标识了一个共享内存段。
    • shm_addr
      • 用于指定连接共享内存段的地址。
      • 如果shm_addrNULL,则由系统选择合适的连接地址。
      • 如果shm_addr不为NULL,并且shmflg中没有设置SHM_RND标志,那么共享内存段将被连接到shm_addr所指定的地址。
      • 如果shm_addr不为NULL,并且shmflg中设置了SHM_RND标志,那么连接的地址将是shm_addr向下舍入到系统页面大小(page - size)的整数倍的地址。
    • shmflg
      • 包含一些控制连接操作的标志。
      • SHM_RND:如上述,用于对连接地址进行舍入操作。
      • SHM_RDONLY:如果设置了这个标志,那么共享内存段将以只读方式连接到进程地址空间。如果不设置此标志,共享内存段将以读写方式连接。
  • 返回值:

    • 成功时,返回指向连接后的共享内存段在进程地址空间中的起始地址的指针。并修改shmid_ds中的字段

      • 将 shm_nattach 加 1。
      • 将 shm_lpid 设置为调用进程的 PID。
      • 将 shm_atime 设置为当前时间。
    • 失败时,返回(void *)-1,并且会设置errno

      来指示错误原因。常见的错误原因包括:

      • EINVALshm_id无效,或者shm_addrshmflg的组合无效。
      • ENOMEM:没有足够的内存来连接共享内存段。

shm_id参数是由shmget函数返回的共享内存标识符。shm_addr参数指定将共享内存关联到进程的哪块地址空间,最终效果还受到shmflg参数的可选标志SHM_RND的影响。

  • 如果 shm_addr 为 NULL,则被关联的地址由操作系统选择。这是推荐的做法,以确保代码的可移植性。

  • 如果 shm_addr 非空,并且 SHM_RND 标志未被设置,则共享内存被关联到 addr 指定的地址处。

  • 如果 shm_addr 非空,并且设置了 SHM_RND 标志,则被关联的地址是 [shm_addr-(shm_addr % SHMLBA)]。SHMLBA 的含义是 “段低端边界地址倍数”(Segment Low Boundary Address Multiple),它必须是内存页面大小 (PAGE_SIZE) 的整数倍。现在的 Linux 内核中,它等于一个内存页大小。SHM_RND 的含义是圆整 (round),即将共享内存被关联的地址向下圆整到离 shm_addr 最近的 SHMLBA 的整数倍地址处。

除了 SHM_RND 标志外,shmflg 参数还支持如下标志:

  • SHM_RDONLY。进程仅能读取共享内存中的内容。若没有指定该标志,则进程可同时对共享内存进行读写操作(当然,这需要在创建共享内存的时候指定其读写权限)。

  • SHM_REMAP。如果 shmaddr 已经被关联到一段共享内存上,则重新关联。

  • SHM_EXEC。它指定对共享内存段的执行权限。对于共享内存的执行权限,执行权限和读权限是一样的。

2. shmdt

1
int shmdt(const void* shm_addr);
  • 功能:

    • 将之前由shmat连接到进程地址空间的共享内存段分离。分离操作并不会删除共享内存段本身,只是将该共享内存段从调用进程的地址空间中移除。

    • shm_addr

      • 这是共享内存段在进程地址空间中的起始地址,即shmat函数成功连接共享内存段时返回的地址。
  • 成功时,返回0。失败时,返回-1,并且会设置errno来指示错误原因。

    成功调用会修改内核数据结构 shmid_ds 的部分字段,如下:

    • 将 shm_nattach 减 1。

    • 将 shm_lpid 设置为调用进程的 PID。

    • 将 shm_dtime 设置为当前时间。

    常见的错误原因包括:

    • EINVALshm_addr不是有效的共享内存段地址。

shmdt函数将关联到shm_addr参数处的共享内存从进程中分离,它成功时返回0,失败则返回-1并设置errno。

调用该函数使得它从进程空间分离的含义

  • 不删除本身:
    • 共享内存段在系统内存中是独立存在的。即使一个进程不再需要使用它,这个共享内存段本身并不会消失。这是因为其他进程可能还在使用这块共享内存进行数据交互。例如,有进程 A、B、C 都连接到了同一块共享内存。如果进程 A 调用shmdt,只是进程 A 不再能访问这块共享内存,但进程 B 和 C 仍然可以正常访问,共享内存段本身依然存在于系统内存中。
  • 从进程地址空间移除:
    • 在进程调用shmdt之前,共享内存段是映射到该进程的地址空间中的。这意味着进程可以直接通过指针访问共享内存中的数据。当调用shmdt后,这个映射关系就被解除了。就好像在进程的 “视野” 中,这块共享内存消失了。虽然共享内存还在系统中,但该进程已经无法再通过之前的指针去访问其中的数据了。这就是所谓的从进程地址空间移除。

3.shmctl 系统调用

shmctl系统调用控制共享内存的某些属性。

1
2
#include <sys/shm.h>
int shmctl(int shm_id, int command, struct shmid_ds* buf);
  1. 函数功能

    • shmctl函数用于对共享内存段进行控制操作,如获取共享内存段的状态信息、设置共享内存段的属性、删除共享内存段等。
  2. 参数详解

    • int shm_id

      • 这是共享内存段的标识符,由shmget函数成功创建或获取共享内存段时返回。它唯一标识了一个共享内存段,shmctl函数将对这个标识符所对应的共享内存段进行操作。
    • int command:(见下表)

      • 这是一个控制命令,用于指定对共享内存段进行何种操作,常见的命令有:
        • IPC_STAT:获取共享内存段的状态信息,并将其存储到buf所指向的struct shmid_ds结构体中。这个结构体包含了共享内存段的各种属性,如操作权限、大小、最后访问时间等。
        • IPC_SET:根据buf所指向的struct shmid_ds结构体中的信息来设置共享内存段的属性。例如,可以修改共享内存段的操作权限等。
        • IPC_RMID:删除由shm_id标识的共享内存段。这是一个非常重要且具有危险性的操作,一旦执行,共享内存段及其所包含的数据将被永久删除,所有关联到该共享内存段的进程将无法再访问它。
      • 除了上述常见命令外,还有一些其他命令,如SHM_LOCK(锁定共享内存段到物理内存,防止被交换到磁盘)和SHM_UNLOCK(解锁共享内存段,允许其被交换到磁盘)等,但这些命令的可用性可能取决于操作系统的支持。
    • struct shmid_ds* buf

      • 这是一个指向struct shmid_ds结构体的指针,其作用取决于command

        参数的值:

        • commandIPC_STAT时,buf用于存储获取到的共享内存段的状态信息。
        • commandIPC_SET时,buf指向的结构体中的信息将被用于设置共享内存段的属性。
        • 如果command不涉及IPC_STATIPC_SET操作(如IPC_RMID),buf通常可以设置为NULL
  3. 返回值(见下表)

    • 成功时:

      • 如果commandIPC_STATIPC_SET,返回0表示操作成功。
      • 如果commandIPC_RMID,返回0表示共享内存段已成功删除。
    • 失败时:

      • 返回-1,并设置errno

        变量来指示错误原因。常见的错误原因包括:

        • EINVALshm_id无效,或者command参数无效,或者buf指向的结构体无效(在IPC_STATIPC_SET操作时)。
        • EPERM:调用进程没有足够的权限来执行请求的操作(例如,没有权限删除共享内存段或修改其属性)。

image-20241220160445378

4.函数接口使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/shm.h>

int main() {
// 创建共享内存段
int shm_id = shmget(IPC_PRIVATE, 1024, IPC_CREAT | 0666);
if (shm_id == -1) {
perror("shmget");
return 1;
}

// 连接共享内存段
void* shm_addr = shmat(shm_id, NULL, 0);
if (shm_addr == (void *)-1) {
perror("shmat");
return 1;
}

// 使用共享内存段,这里简单地将其初始化为0
for (int i = 0; i < 1024; i++) {
((char *)shm_addr)[i] = 0;
}

// 分离共享内存段
if (shmdt(shm_addr) == -1) {
perror("shmdt");
return 1;
}

// 删除共享内存段
if (shmctl(shm_id, IPC_RMID, NULL) == -1) {
perror("shmctl");
return 1;
}

return 0;
}

5.共享内存的 POSIX 方法

mmap函数和munmap函数利用mmap函数的MAP_ANONYMOUS标志可以实现父、子进程间的匿名内存共享。通过打开同一个文件,mmap也可以实现无关进程之间的内存共享。

Linux提供了另一种在无关进程间共享内存的方式,这种方式无须任何文件的支持,但它需要先用shm_open函数来创建或打开一个POSIX共享内存对象。

1.shm_open

1
2
3
4
5
#include <sys/mman.h>
#include <sys/stat.h>
#include <fcntl.h>

int shm_open ( const char* name, int oflag, mode_t mode );

shm_open函数的使用方法与open系统调用完全相同。

  • 功能:
    • 用于创建或打开一个共享内存对象。这个函数是基于 POSIX 标准的,提供了一种可移植的方式来操作共享内存。
  • 参数:
    • const char* name
      • 这是共享内存对象的名字。从可移植性的角度考虑,名字应该遵循特定格式:以/开始,后面跟着多个非/字符,并且以\0结尾,长度通常不超过NAME_MAX(一般为 255)。例如"/my_shared_memory"
    • int oflag
      • 用于指定打开共享内存对象的方式,它是以下标志的按位或组合:
        • O_RDONLY:以只读方式打开共享内存对象。如果只指定了这个标志,那么进程只能从共享内存中读取数据。
        • O_RDWR:以可读可写方式打开共享内存对象,这是最常用的方式之一,允许进程对共享内存进行读写操作。
        • O_CREAT:如果共享内存对象不存在,则创建它。当使用这个标志时,mode参数用于指定新创建的共享内存对象的访问权限(类似文件权限)。
        • O_EXCL:通常与O_CREAT一起使用。如果name指定的共享内存对象已经存在,shm_open调用将返回错误(-1);如果不存在,则创建一个新的共享内存对象。这可以用于确保创建的是一个全新的、未被使用过的共享内存对象。
        • O_TRUNC:如果共享内存对象已经存在,将其截断(长度变为 0)。这个操作会丢失共享内存对象中原先存储的数据。
    • mode_t mode
      • oflag中包含O_CREAT标志时,mode参数用于指定共享内存对象的访问权限。它类似于文件权限,例如0666表示用户、组和其他用户都有读写权限。权限的低 9 位用于设置实际的权限。

返回值:

  • 返回值:
    • 成功时,返回一个非负整数,即共享内存对象的文件描述符。这个文件描述符可以用于后续的操作,如mmap(将共享内存映射到进程地址空间)等操作。
    • 失败时,返回-1,并且会设置errno来指示错误原因。常见的错误原因包括:
      • EEXIST:当O_CREATO_EXCL一起使用且共享内存对象已经存在时。
      • EINVALname参数不符合格式要求,或者oflag参数无效。
      • EACCES:没有足够的权限按照oflagmode指定的方式创建或打开共享内存对象。

和打开的文件最后需要关闭一样,由shm_open函数创建的共享内存对象用完后也需要删除,可通过shm_unlink函数实现。

1
2
3
4
5
#include <sys/mman.h>
#include <sys/stat.h>
#include <fcntl.h>

int shm_unlink ( const char* name );
  • 功能:

    • 用于删除一个共享内存对象。当一个进程调用shm_unlink后,共享内存对象会被标记为删除,但实际的删除操作会在所有进程都关闭了对该共享内存对象的引用(通过shm_open打开得到的文件描述符都被关闭)后才会执行。
  • 参数:

shm_unlink函数将name参数指定的共享内存对象标记为等待删除,当所有使用该共享内存对象的进程都使用munmap函数将它从进程中分离后,系统将销毁这个共享内存对象所占据的资源。

  • 返回值:

    • 成功时,返回0

    • 失败时,返回-1,并且会设置errno来指示错误原因。常见的错误原因包括:

      • EINVALname参数不符合格式要求。
      • ENOENT:指定的共享内存对象不存在。

如果代码中使用了以上POSIX共享内存函数,则编译时需要指定链接选项-lrt

3.使用案例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
#include <stdio.h>
#include <stdlib.h>
#include <fcntl.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <unistd.h>

#define MEMORY_SIZE 4096

int main() {
// 创建或打开共享内存对象
int fd = shm_open("/my_shared_memory", O_CREAT | O_RDWR, 0666);
if (fd == -1) {
perror("shm_open");
return 1;
}

// 设置共享内存对象大小
if (ftruncate(fd, MEMORY_SIZE) == -1) {
perror("ftruncate");
return 1;
}

// 映射共享内存到进程地址空间
void* shared_memory = mmap(NULL, MEMORY_SIZE, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
if (shared_memory == MAP_FAILED) {
perror("mmap");
return 1;
}

// 使用共享内存,这里简单地写入数据
char* message = "Hello, shared memory!";
for (int i = 0; i < sizeof(message); i++) {
((char*)shared_memory)[i] = message[i];
}

// 解除映射
if (munmap(shared_memory, MEMORY_SIZE) == -1) {
perror("munmap");
return 1;
}

// 关闭共享内存对象
if (close(fd) == -1) {
perror("close");
return 1;
}

// 删除共享内存对象
if (shm_unlink("/my_shared_memory") == -1) {
perror("shm_unlink");
return 1;
}

return 0;
}

代码解释

  • 首先,使用shm_open创建或打开一个名为/my_shared_memory的共享内存对象,权限为0666,并以读写方式打开。
  • 然后,使用ftruncate设置共享内存对象的大小为MEMORY_SIZE(这里定义为 4096 字节)。
  • 接着,使用mmap将共享内存映射到进程的地址空间,以便可以像访问普通内存一样访问共享内存。
  • 之后,向共享内存中写入了一个字符串Hello, shared memory!
  • 再然后,使用munmap解除共享内存的映射。
  • 接着,使用close关闭共享内存对象的文件描述符。
  • 最后,使用shm_unlink删除共享内存对象。

4.实现生产者消费者模型

使用共享内存实现生产者 - 消费者模型(基于shm_openshm_unlink函数)

  • 以下是使用shm_openshm_unlink函数实现的生产者 - 消费者模型的 C++ 代码示例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
#include <iostream>
#include <fcntl.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <unistd.h>
#include <string.h>
#include <semaphore.h>
#include <cstdlib>
#include <ctime>
#include <thread>

#define MEMORY_SIZE 4096
#define BUFFER_SIZE 10

// 共享内存结构体
struct SharedMemory {
int buffer[BUFFER_SIZE];
int in;
int out;
};

// 生产者函数
void producer(int shm_fd, sem_t* empty_sem, sem_t* full_sem) {
void* shm_ptr = mmap(NULL, MEMORY_SIZE, PROT_READ | PROT_WRITE, MAP_SHARED, shm_fd, 0);
if (shm_ptr == MAP_FAILED) {
std::cerr << "mmap failed in producer" << std::endl;
return;
}
SharedMemory* shared_mem = (SharedMemory*)shm_ptr;

srand(static_cast<unsigned int>(time(nullptr)));
while (true) {
sem_wait(empty_sem);
int item = rand() % 100;
shared_mem->buffer[shared_mem->in] = item;
shared_mem->in = (shared_mem->in + 1) % BUFFER_SIZE;
std::cout << "Producer produced: " << item << std::endl;
sem_post(full_sem);
// 模拟生产时间
std::this_thread::sleep_for(std::chrono::milliseconds(500));
}
munmap(shm_ptr, MEMORY_SIZE);
}

// 消费者函数
void consumer(int shm_fd, sem_t* empty_sem, sem_t* full_sem) {
void* shm_ptr = mmap(NULL, MEMORY_SIZE, PROT_READ | PROT_WRITE, MAP_SHARED, shm_fd, 0);
if (shm_ptr == MAP_FAILED) {
std::cerr << "mmap failed in consumer" << std::endl;
return;
}
SharedMemory* shared_mem = (SharedMemory*)shm_ptr;

while (true) {
sem_wait(full_sem);
int item = shared_mem->buffer[shared_mem->out];
shared_mem->out = (shared_mem->out + 1) % BUFFER_SIZE;
std::cout << "Consumer consumed: " << item << std::endl;
sem_post(empty_sem);
// 模拟消费时间
std::this_thread::sleep_for(std::chrono::milliseconds(800));
}
munmap(shm_ptr, MEMORY_SIZE);
}

int main() {
// 创建共享内存对象
int shm_fd = shm_open("/my_shared_memory", O_CREAT | O_RDWR, 0666);
if (shm_fd == -1) {
std::cerr << "shm_open failed" << std::endl;
return 1;
}
// 设置共享内存大小
if (ftruncate(shm_fd, MEMORY_SIZE) == -1) {
std::cerr << "ftruncate failed" << std::endl;
return 1;
}

// 创建信号量
sem_t* empty_sem = sem_open("/empty_sem", O_CREAT, 0666, BUFFER_SIZE);
sem_t* full_sem = sem_open("/full_sem", O_CREAT, 0666, 0);
if (empty_sem == SEM_FAILED || full_sem == SEM_FAILED) {
std::cerr << "sem_open failed" << std::endl;
return 1;
}

// 映射共享内存到进程空间
void* shm_ptr = mmap(NULL, MEMORY_SIZE, PROT_READ | PROT_WRITE, MAP_SHARED, shm_fd, 0);
if (shm_ptr == MAP_FAILED) {
std::cerr << "mmap failed" << std::endl;
return 1;
}
SharedMemory* shared_mem = (SharedMemory*)shm_ptr;
shared_mem->in = 0;
shared_mem->out = 0;

// 创建生产者和消费者线程
std::thread producer_thread(producer, shm_fd, empty_sem, full_sem);
std::thread consumer_thread(consumer, shm_fd, empty_sem, full_sem);

producer_thread.join();
consumer_thread.join();

// 关闭并删除信号量
sem_close(empty_sem);
sem_close(full_sem);
sem_unlink("/empty_sem");
sem_unlink("/full_sem");

// 解除映射并删除共享内存
munmap(shm_ptr, MEMORY_SIZE);
close(shm_fd);
shm_unlink("/my_shared_memory");

return 0;
}

整体代码解释:

  • 共享内存结构体定义:
    • 定义了SharedMemory结构体,包含一个整数数组作为缓冲区,以及inout索引,用于生产者和消费者操作。
  • 生产者函数:
    • 通过mmap将共享内存映射到进程地址空间。
    • 使用sem_wait等待empty_sem信号量(表示缓冲区有空闲位置),生产一个随机数放入缓冲区,更新in索引,然后使用sem_post释放full_sem信号量(表示缓冲区有数据可供消费)。
  • 消费者函数:
    • 同样通过mmap映射共享内存。
    • 使用sem_wait等待full_sem信号量,从缓冲区取出数据,更新out索引,然后使用sem_post释放empty_sem信号量。
  • 主函数:
    • 使用shm_open创建共享内存对象,并设置大小。
    • 创建empty_semfull_sem两个信号量,分别用于控制缓冲区的空闲和满状态。
    • 映射共享内存后初始化inout索引。
    • 创建生产者和消费者线程并等待它们结束。
    • 最后关闭并删除信号量,解除共享内存映射,关闭并删除共享内存对象。

5.无血缘关系的进程之间通信

写入端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#include <string.h>

#define SHM_NAME "/my_shared_memory"
#define SHM_SIZE 100

int main() {
int shm_fd;
void *ptr;

// 创建共享内存对象
shm_fd = shm_open(SHM_NAME, O_CREAT | O_RDWR, 0666);
if (shm_fd == -1) {
perror("shm_open");
return 1;
}

// 设置共享内存大小
if (ftruncate(shm_fd, SHM_SIZE) == -1) {
perror("ftruncate");
return 1;
}

// 将共享内存映射到进程地址空间
ptr = mmap(NULL, SHM_SIZE, PROT_READ | PROT_WRITE, MAP_SHARED, shm_fd, 0);
if (ptr == MAP_FAILED) {
perror("mmap");
return 1;
}

// 向共享内存写入数据
strcpy((char *)ptr, "Hello from writer process!");

// 解除映射
if (munmap(ptr, SHM_SIZE) == -1) {
perror("munmap");
return 1;
}

// 关闭共享内存对象描述符
close(shm_fd);

return 0;
}

读出端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>

#define SHM_NAME "/my_shared_memory"
#define SHM_SIZE 100

int main() {
int shm_fd;
void *ptr;

// 打开共享内存对象
shm_fd = shm_open(SHM_NAME, O_RDWR, 0666);
if (shm_fd == -1) {
perror("shm_open");
return 1;
}

// 将共享内存映射到进程地址空间
ptr = mmap(NULL, SHM_SIZE, PROT_READ | PROT_WRITE, MAP_SHARED, shm_fd, 0);
if (ptr == MAP_FAILED) {
perror("mmap");
return 1;
}

// 从共享内存读取数据并打印
printf("Read from shared memory: %s\n", (char *)ptr);

// 解除映射
if (munmap(ptr, SHM_SIZE) == -1) {
perror("munmap");
return 1;
}

// 关闭共享内存对象描述符
close(shm_fd);

// 删除共享内存对象
if (shm_unlink(SHM_NAME) == -1) {
perror("shm_unlink");
return 1;
}

return 0;
}

运行结果:

image-20241220190749596

6.共享内存实例–实现生产者消费者模型

使用 System V IPC 的共享内存实现生产者 - 消费者模型

  • 以下是使用 System V IPC 的shmgetshmatshmdtshmctl函数实现生产者 - 消费者模型的 C 代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/shm.h>
#include <sys/sem.h>
#include <time.h>
#include <unistd.h>

#define BUFFER_SIZE 10

// 共享内存结构体
struct SharedMemory {
int buffer[BUFFER_SIZE];
int in;
int out;
};

// 信号量操作函数
void semaphore_op(int semid, int sem_num, int op) {
struct sembuf sem_b;
sem_b.sem_num = sem_num;
sem_b.sem_op = op;
sem_b.sem_flg = 0;
if (semop(semid, &sem_b, 1) == -1) {
perror("semop");
exit(1);
}
}

// 生产者函数
void producer(int shm_id, int semid) {
struct SharedMemory *shared_mem = (struct SharedMemory *)shmat(shm_id, NULL, 0);
if (shared_mem == (void *)-1) {
perror("shmat producer");
exit(1);
}

srand(time(NULL));
while (1) {
semaphore_op(semid, 0, -1); // 等待有空位(P操作)
int item = rand() % 100;
shared_mem->buffer[shared_mem->in] = item;
shared_mem->in = (shared_mem->in + 1) % BUFFER_SIZE;
printf("Producer produced: %d\n", item);
semaphore_op(semid, 1, 1); // 增加产品数量(V操作)
sleep(1);
}
shmdt(shared_mem);
}

// 消费者函数
void consumer(int shm_id, int semid) {
struct SharedMemory *shared_mem = (struct SharedMemory *)shmat(shm_id, NULL, 0);
if (shared_mem == (void *)-1) {
perror("shmat consumer");
exit(1);
}

while (1) {
semaphore_op(semid, 1, -1); // 等待有产品(P操作)
int item = shared_mem->buffer[shared_mem->out];
shared_mem->out = (shared_mem->out + 1) % BUFFER_SIZE;
printf("Consumer consumed: %d\n", item);
semaphore_op(semid, 0, 1); // 增加空位数量(V操作)
sleep(2);
}
shmdt(shared_mem);
}

int main() {
// 创建共享内存
int shm_id = shmget(IPC_PRIVATE, sizeof(struct SharedMemory), IPC_CREAT | 0666);
if (shm_id == -1) {
perror("shmget");
return 1;
}

// 创建信号量集
int semid = semget(IPC_PRIVATE, 2, IPC_CREAT | 0666);
if (semid == -1) {
perror("semget");
return 1;
}
union semun {
int val;
struct semid_ds *buf;
unsigned short *array;
struct seminfo *__buf;
} arg;
arg.val = BUFFER_SIZE;
// 初始化空闲缓冲区信号量(初始有BUFFER_SIZE个空位可插入数据)
if (semctl(semid, 0, SETVAL, arg) == -1) {
perror("semctl");
return 1;
}
arg.val = 0;
// 初始化产品信号量(初始无产品可供消费)
if (semctl(semid, 1, SETVAL, arg) == -1) {
perror("semctl");
return 1;
}

// 创建生产者和消费者进程(这里简单使用父子进程模拟)
pid_t pid = fork();
if (pid < 0) {
perror("fork");
return 1;
} else if (pid == 0) {
// 子进程为消费者
consumer(shm_id, semid);
} else {
// 父进程为生产者
producer(shm_id, semid);
}

// 等待子进程结束(这里只是简单等待,实际可能需要更完善的机制)
wait(NULL);

// 删除共享内存和信号量集
if (shmctl(shm_id, IPC_RMID, NULL) == -1) {
perror("shmctl");
return 1;
}
if (semctl(semid, 0, IPC_RMID) == -1) {
perror("semctl");
return 1;
}

return 0;
}

整体代码解释:

  • 共享内存结构体定义:
    • 定义了SharedMemory结构体,包含一个整数数组作为缓冲区,以及inout索引,用于生产者和消费者操作。
  • 信号量操作函数:
    • semaphore_op函数用于对信号量进行操作,实现P操作(sem_op为负,获取资源)和V操作(sem_op为正,释放资源)。
  • 生产者函数:
    • 通过shmat将共享内存连接到进程地址空间。
    • 使用semaphore_op等待空闲缓冲区信号量(semid的第 0 个信号量),生产一个随机数放入缓冲区,更新in索引,然后使用semaphore_op释放产品信号量(semid的第 1 个信号量)。
  • 消费者函数:
    • 同样通过shmat连接共享内存。
    • 使用semaphore_op等待产品信号量,从缓冲区取出数据,更新out索引,然后使用semaphore_op释放空闲缓冲区信号量。
  • 主函数:
    • 使用shmget创建共享内存,并使用semget创建信号量集。
    • 初始化信号量,一个表示空闲缓冲区数量,一个表示产品数量。
    • 使用fork创建子进程,子进程作为消费者,父进程作为生产者。
    • 最后等待子进程结束,并使用shmctlsemctl删除共享内存和信号量集。

7.无血缘关系的进程之间通信

  1. ftok函数的定义和功能
    • 函数原型key_t ftok(const char *pathname, int proj_id);
    • 功能ftok函数用于生成一个唯一的key(键值),这个key通常用于 System V IPC(进程间通信)机制中,如创建共享内存、消息队列和信号量集等。它将一个文件路径名(pathname)和一个项目标识符(proj_id)组合起来,生成一个适合作为 System V IPC 资源标识符的key值。
  2. 参数解释
    • const char *pathname
      • 这是一个指向文件路径名的指针。这个文件路径必须是一个已经存在的文件的有效路径,通常使用当前目录(.")或者一个程序相关的配置文件路径等。ftok函数会使用文件的inode(索引节点)信息作为生成key的一部分。
      • 注意,如果文件被删除然后重新创建,即使文件名相同,inode可能会改变,这会导致ftok生成不同的key值。
    • int proj_id
      • 这是一个0 - 255之间的整数,作为项目标识符。它和文件路径的inode信息一起组合生成key。不同的项目可以使用不同的proj_id来区分,这样即使基于同一个文件路径,不同的项目也能生成不同的key值用于各自的 IPC 资源。例如,一个程序中有两个不同的模块需要使用消息队列进行通信,它们可以使用相同的文件路径但不同的proj_id来生成不同的key,以创建两个独立的消息队列。
  3. 返回值
    • 成功时,ftok函数返回一个key_t类型的非负整数,这个整数可以作为shmgetmsggetsemget等 System V IPC 函数的key参数来创建或获取对应的 IPC 资源。
    • 失败时,返回-1,并且会设置errno来指示错误原因。常见的错误原因包括:
      • EACCESS:没有权限访问pathname指定的文件。
      • ENOENTpathname指定的文件不存在。

发送端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/shm.h>
#include <string.h>

#define MEMORY_SIZE 100

int main() {
// 通过ftok生成key
key_t key = ftok(".", 'b');
if (key == -1) {
perror("ftok");
return 1;
}

// 创建共享内存段
int shm_id = shmget(key, MEMORY_SIZE, IPC_CREAT | 0666);
if (shm_id == -1) {
perror("shmget");
return 1;
}

// 映射共享内存到进程地址空间
void *shm_ptr = shmat(shm_id, NULL, 0);
if (shm_ptr == (void *)-1) {
perror("shmat");
return 1;
}

// 向共享内存写入数据(这里简单写入字符串)
strcpy((char *)shm_ptr, "Hello from sender!");

// 分离共享内存
if (shmdt(shm_ptr) == -1) {
perror("shmdt");
return 1;
}
sleep(10);
return 0;
}

接收端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/shm.h>
#include <string.h>

#define MEMORY_SIZE 100

int main() {
sleep(3);
// 通过ftok生成key
key_t key = ftok(".", 'b');
if (key == -1) {
perror("ftok");
return 1;
}

// 获取共享内存段
int shm_id = shmget(key, MEMORY_SIZE, 0666);
if (shm_id == -1) {
perror("shmget");
return 1;
}

// 映射共享内存到进程地址空间
void *shm_ptr = shmat(shm_id, NULL, 0);
if (shm_ptr == (void *)-1) {
perror("shmat");
return 1;
}

// 从共享内存读取数据并打印
printf("Receiver read from shared memory: %s\n", (char *)shm_ptr);

// 分离共享内存
if (shmdt(shm_ptr) == -1) {
perror("shmdt");
return 1;
}

// 删除共享内存段
if (shmctl(shm_id, IPC_RMID, NULL) == -1) {
perror("shmctl for delete");
return 1;
}

return 0;
}

运行结果:

image-20241220190008467

9.消息队列

1.消息队列进程间通信原理

System V IPC 消息队列是一种进程间通信(IPC)机制,它允许不同进程通过发送和接收消息来进行通信。消息队列就像是一个邮箱系统,进程可以将消息(信件)发送到队列(邮箱)中,其他进程可以从这个队列中接收消息。

消息队列是在两个进程间传递二进制数据块的方式,每个数据块都有一个特定类型,接收方可以根据类型来有选择地接收数据,而不一定像管道和命名管道那样必须以先进先出的方式接收数据。

原理:多个进程通过共享消息队列的标识符(msqid)来访问同一个消息队列。发送进程将消息放入消息队列后,消息队列会按照一定的规则(如先进先出)存储这些消息。接收进程可以根据消息类型等条件从消息队列中取出消息。这样,不同进程之间就可以通过消息队列进行数据传输和通信,实现进程间的同步和信息共享。例如,在生产者 - 消费者模型中,生产者进程将生产的数据作为消息发送到消息队列,消费者进程从消息队列中接收消息并进行消费,通过消息类型等机制可以确保消息的正确发送和接收,从而实现生产者和消费者之间的协调工作。

Linux消息队列的API都定义在sys/msg.h头文件中,包括4个系统调用:msggetmsgsndmsgrcvmsgctl

2.msgget 系统调用

msgget系统调用创建一个消息队列,或获取一个已有的消息队列:

1
2
#include <sys/msg.h>
int msgget(key_t key, int msgflg);
  • 参数:
    • key_t key
      • 这是一个键值,用于标识一个全局唯一的消息队列。可以通过ftok函数生成一个唯一的key值,或者使用IPC_PRIVATE来创建一个私有消息队列(通常用于具有亲缘关系的进程,如父子进程)。
    • int msgflg
      • 用于控制消息队列的创建和访问权限,它由以下几种标志组成:
        • IPC_CREAT:如果消息队列不存在,则创建它。如果和IPC_EXCL一起使用(IPC_CREAT | IPC_EXCL),则只有在消息队列不存在时才创建,若已存在则msgget函数返回 - 1 并设置errnoEEXIST
        • 权限标志:如0666等,用于指定消息队列的访问权限,格式与文件权限相同(用户、组、其他用户的读、写、执行权限)。
  • 返回值:
    • 成功时,返回一个非负整数,即消息队列的标识符(msqid)。
    • 失败时,函数返回 - 1,并设置errno变量来指示错误原因,例如EEXIST(当IPC_CREAT | IPC_EXCL且队列已存在时)、ENOENT(当没有IPC_CREAT且队列不存在时)等。

如果它用于创建消息队列的话,与之相关的内核数据结构msqid_ds将被创建并初始化。

1
2
3
4
5
6
7
8
9
10
11
12
struct msqid_ds
{
struct ipc_perm msg_perm; // 消息队列的操作权限
time_t msg_stime; // 最后一次调用msgsnd的时间
time_t msg_rtime; // 最后一次调用msgrcv的时间
time_t msg_ctime; // 最后一次被修改的时间
unsigned long msg_cbytes; // 消息队列中已有的字节数
msgqnum_t msg_qnum; // 消息队列中已有的消息数
msglen_t msg_qbytes; // 消息队列允许的最大字节数
pid_t msg_lspid; // 最后执行msgsnd的进程的PID
pid_t msg_lrpid; // 最后执行msgrcv的进程的PID
};

3.msgsnd 系统调用

msgsnd系统调用将一条消息添加到消息队列中:

1
2
3
4
5
6
7
#include <sys/msg.h>
int msgsnd(int msqid, const void* msg_ptr, size_t msg_sz, int msgflg);

struct msgbuf{
long mtype;//消息类型
char mtext[512];//消息数据
};

参数:

int msqid

  • 消息队列的标识符,由msgget函数返回。

  • const void* msg_ptr

    • 指向要发送消息的指针。消息的结构必须以一个长整型成员变量开始,这个长整型变量用于存放消息类型,后面可以跟随消息的实际数据。msg_ptr参数指向一个准备发送的消息,消息被定义为如下类型:

      1
      2
      3
      4
      struct msgbuf{
      long mtype; /* 消息类型 */
      char mtext[512]; /* 消息数据 */
      };
  • size_t msg_sz

    • 这是消息数据部分的大小,不包括消息类型的长整型变量所占的字节数。
  • int msgflg

    • 控制消息发送的行为,和semget的flag一样的,常用的标志有:
      • 0:表示阻塞发送,如果消息队列已满,则发送进程会阻塞,直到有空间可以发送消息。
      • IPC_NOWAIT:表示非阻塞发送,如果消息队列已满,则msgsnd函数立即返回 - 1,并设置errnoEAGAIN

返回值:

  • 成功时,返回0
  • 失败时,返回 - 1,并设置errno来指示错误原因,如EAGAIN(非阻塞发送时队列已满)、EINVAL(参数无效)、EIDRM(消息队列已被删除)等。

处于阻塞状态的msgsnd调用可能被如下两种异常情况所中断:

  • 消息队列被移除。此时msgsnd调用将立即返回并设置errno为EIDRM。

  • 程序接收到信号。此时msgsnd调用将立即返回并设置errno为EINTR。

msgsnd成功时将修改内核数据结构msqid_ds的部分字段,如下所示:

  • 将msg_qnum加1。

  • 将msg_lspid设置为调用进程的PID。

  • 将msg_stime设置为当前的时间。

4.msgrcv 系统调用

msgrcv系统调用从消息队列中获取消息:

1
2
#include <sys/msg.h>
int msgrcv(int msqid, void* msg_ptr, size_t msg_sz, long int msgtype, int msgflg);

参数:

  • int msqid
    • 消息队列的标识符。
  • void* msg_ptr
    • 一个指向接收消息缓冲区的指针。与msgsnd类似,缓冲区的结构应以一个长整型开始用于存放接收到的消息类型,后面是存放消息数据的空间。
  • size_t msg_sz
    • 这是接收消息缓冲区中数据部分的大小。
  • long int msgtype
    • 指定要接收的消息类型,可以有以下几种取值:
      • 0:接收(读取)消息队列中的第一条消息,不考虑消息类型。
      • > 0:接收第一个消息类型等于msgtype的消息。(除非指定了标志MSG_EXCEPT,见后文)
      • < 0:接收(读取)消息队列中第一个类型值比msgtype的绝对值小的消息。
  • int msgflg
    • 控制消息接收的行为,常用标志有:
      • 0:表示阻塞接收,如果消息队列中没有符合条件的消息,则接收进程会阻塞,直到有符合条件的消息到达。
      • IPC_NOWAIT:表示非阻塞接收,如果消息队列中没有符合条件的消息,则msgrcv函数立即返回 - 1,并设置errnoENOMSG
      • MSG_EXCEPT。如果msgtype大于0,则接收消息队列中第一个非msgtype类型的消息。
      • MSG_NOERROR。如果消息数据部分的长度超过了msg_sz,就将它截断。

返回值:

  • 成功时,返回接收到的消息数据部分的字节数。
  • 失败时,返回 - 1,并设置errno来指示错误原因,如ENOMSG(非阻塞接收时没有符合条件的消息)、EINVAL(参数无效)、EIDRM(消息队列已被删除)等。

处于阻塞状态的msgrcv调用还可能被如下两种异常情况所中断:

  • 消息队列被移除。此时msgrcv调用将立即返回并设置errno为EIDRM。

  • 程序接收到信号。此时msgrcv调用将立即返回并设置errno为EINTR。

msgrcv成功时将修改内核数据结构msqid_ds的部分字段,如下所示:

  • 将msg_qnum减1。

  • 将msg_lrpid设置为调用进程的PID。

  • 将msg_rtime设置为当前的时间。

5.msgctl 系统调用

msgctl系统调用,用于对消息队列进行控制操作(控制消息队列某些属性),如获取消息队列的状态信息、设置消息队列的属性、删除消息队列等。

1
2
#incldue <sys/msg.h>
int msgctl(int msqid, int command, struct msqid_ds* buf);

参数:

  • int msqid
    • 消息队列的标识符。
  • int command
    • 这是一个控制命令,用于指定对消息队列进行何种操作(见下表),常见的命令有:
      • IPC_STAT:获取消息队列的状态信息,并将其存储到buf所指向的struct msqid_ds结构体中。这个结构体包含了消息队列的各种属性,如操作权限、当前消息数量等。
      • IPC_SET:根据buf所指向的struct msqid_ds结构体中的信息来设置消息队列的属性。例如,可以修改消息队列的操作权限等。
      • IPC_RMID:删除由msqid标识的消息队列。这是一个非常重要且具有危险性的操作,一旦执行,消息队列及其所包含的消息将被永久删除。
  • struct msqid_ds* buf
    • 这是一个指向struct msqid_ds结构体的指针,其作用取决于command参数的值:
      • commandIPC_STAT时,buf用于存储获取到的消息队列的状态信息。
      • commandIPC_SET时,buf指向的结构体中的信息将被用于设置消息队列的属性。
      • 如果command不涉及IPC_STATIPC_SET操作(如IPC_RMID),buf通常可以设置为NULL

返回值:

  • 成功时,取决于command(见下表)。
  • 失败时,返回 - 1,并设置errno来指示错误原因,如EINVALmsqid无效,或者command参数无效,或者buf指向的结构体无效(在IPC_STATIPC_SET操作时))、EPERM(调用进程没有足够的权限来执行请求的操作)等。

image-20241220165757768

image-20241220165805087

6.函数使用案例

以下是使用 System V IPC 消息队列相关函数(msggetmsgsndmsgrcvmsgctl)的一个简单 C 语言案例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <string.h>

// 消息结构体
struct msgbuf {
long mtype;
char mtext[100];
};

int main() {
// 创建消息队列
int msqid = msgget(IPC_PRIVATE, 0666 | IPC_CREAT);
if (msqid == -1) {
perror("msgget");
return 1;
}

// 发送消息
struct msgbuf send_msg;
send_msg.mtype = 1;
strcpy(send_msg.mtext, "Hello, World!");
if (msgsnd(msqid, &send_msg, sizeof(send_msg.mtext), 0) == -1) {
perror("msgsnd");
return 1;
}

// 接收消息
struct msgbuf recv_msg;
if (msgrcv(msqid, &recv_msg, sizeof(recv_msg.mtext), 1, 0) == -1) {
perror("msgrcv");
return 1;
}
printf("Received message: %s\n", recv_msg.mtext);

// 获取消息队列状态 buf存储消息队列的信息
struct msqid_ds buf;
if (msgctl(msqid, IPC_STAT, &buf) == -1) {
perror("msgctl - IPC_STAT");
return 1;
}
printf("Messages in queue: %ld\n", buf.msg_qnum);

// 删除消息队列
if (msgctl(msqid, IPC_RMID, NULL) == -1) {
perror("msgctl - IPC_RMID");
return 1;
}

return 0;
}

代码解释:

消息结构体定义

  • 定义了struct msgbuf结构体,它包含一个长整型mtype(用于表示消息类型)和一个字符数组mtext(用于存储消息内容)。

创建消息队列(msgget

  • 使用IPC_PRIVATE作为key来创建一个新的私有消息队列,权限设置为0666(用户、组和其他用户都有读写权限)。如果msgget调用成功,返回消息队列标识符msqid;否则,打印错误信息并返回。

发送消息(msgsnd

  • 初始化send_msg结构体,设置mtype1,并将消息内容设置为"Hello, World!"
  • 使用msgsnd函数将消息发送到消息队列中。msgsnd函数的参数包括消息队列标识符msqid、消息结构体指针&send_msg、消息数据部分大小sizeof(send_msg.mtext)和标志0(表示阻塞发送,如果队列满则等待)。如果发送失败,打印错误信息并返回。

接收消息(msgrcv

  • 初始化recv_msg结构体。
  • 使用msgrcv函数从消息队列中接收消息。msgrcv函数的参数包括消息队列标识符msqid、接收消息结构体指针&recv_msg、接收消息数据部分大小sizeof(recv_msg.mtext)、要接收的消息类型1和标志0(表示阻塞接收,如果没有符合条件的消息则等待)。如果接收失败,打印错误信息并返回。
  • 接收到消息后,打印出消息内容。

获取消息队列状态(msgctl

  • 定义struct msqid_ds类型的变量buf
  • 使用msgctl函数的IPC_STAT命令获取消息队列的状态信息,并将其存储在buf中。如果获取状态失败,打印错误信息并返回。
  • 打印出消息队列中的消息数量(buf.msg_qnum)。

删除消息队列(msgctl

  • 使用msgctl函数的IPC_RMID命令删除消息队列。如果删除失败,打印错误信息并返回。

image-20241220171912147

7.实现生产者消费者模型

**消息队列并不能实现互斥。**以下是使用上述消息队列函数实现的生产者 - 消费者模型的 C 代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <string.h>
#include <time.h>
#include <unistd.h>

#define MAX_MSG_SIZE 100

// 消息结构体
struct msgbuf {
long mtype;
char mtext[MAX_MSG_SIZE];
};

// 生产者函数
void producer(int msqid) {
struct msgbuf msg;
msg.mtype = 1; // 消息类型设为1

srand(time(NULL));
while (1) {
int num = rand() % 100;
sprintf(msg.mtext, "%d", num);
if (msgsnd(msqid, &msg, sizeof(msg.mtext), 0) == -1) {
perror("msgsnd");
exit(1);
}
printf("Producer sent: %s\n", msg.mtext);
sleep(1);
}
}

// 消费者函数
void consumer(int msqid) {
struct msgbuf msg;
while (1) {
if (msgrcv(msqid, &msg, sizeof(msg.mtext), 1, 0) == -1) {
perror("msgrcv");
exit(1);
}
printf("Consumer received: %s\n", msg.mtext);
sleep(2);
}
}

int main() {
// 创建消息队列
int msqid = msgget(IPC_PRIVATE, IPC_CREAT | 0666);
if (msqid == -1) {
perror("msgget");
return 1;
}

pid_t pid = fork();
if (pid < 0) {
perror("fork");
return 1;
} else if (pid == 0) {
// 子进程为消费者
consumer(msqid);
} else {
// 父进程为生产者
producer(msqid);
}

// 等待子进程结束(这里只是简单等待,实际可能需要更完善的机制)
wait(NULL);

// 删除消息队列
if (msgctl(msqid, IPC_RMID, NULL) == -1) {
perror("msgctl");
return 1;
}

return 0;
}

代码解释

消息结构体定义:

  • 定义了struct msgbuf结构体,包含一个长整型的mtype(消息类型)和一个字符数组mtext(用于存放消息数据)。

生产者函数:

  • 生成一个随机数,将其转换为字符串后放入消息结构体的mtext中,然后使用msgsnd将消息发送到消息队列中,发送的消息类型为1,发送操作是阻塞的(msgflg0)。

消费者函数:

  • 使用msgrcv从消息队列中接收消息类型为1的消息,接收操作是阻塞的(msgflg0),接收到消息后打印出消息内容。

主函数:

  • 使用msgget创建一个私有消息队列。
  • 通过fork创建子进程,子进程作为消费者,父进程作为生产者。
  • 最后等待子进程结束,并使用msgctl删除消息队列。

并不能够实现互斥,只是能够通信。

运行结果:

image-20241220173408405

8.无血缘关系进程通信

  1. ftok函数的定义和功能
    • 函数原型key_t ftok(const char *pathname, int proj_id);
    • 功能ftok函数用于生成一个唯一的key(键值),这个key通常用于 System V IPC(进程间通信)机制中,如创建共享内存、消息队列和信号量集等。它将一个文件路径名(pathname)和一个项目标识符(proj_id)组合起来,生成一个适合作为 System V IPC 资源标识符的key值。
  2. 参数解释
    • const char *pathname
      • 这是一个指向文件路径名的指针。这个文件路径必须是一个已经存在的文件的有效路径,通常使用当前目录(.")或者一个程序相关的配置文件路径等。ftok函数会使用文件的inode(索引节点)信息作为生成key的一部分。
      • 注意,如果文件被删除然后重新创建,即使文件名相同,inode可能会改变,这会导致ftok生成不同的key值。
    • int proj_id
      • 这是一个0 - 255之间的整数,作为项目标识符。它和文件路径的inode信息一起组合生成key。不同的项目可以使用不同的proj_id来区分,这样即使基于同一个文件路径,不同的项目也能生成不同的key值用于各自的 IPC 资源。例如,一个程序中有两个不同的模块需要使用消息队列进行通信,它们可以使用相同的文件路径但不同的proj_id来生成不同的key,以创建两个独立的消息队列。
  3. 返回值
    • 成功时,ftok函数返回一个key_t类型的非负整数,这个整数可以作为shmgetmsggetsemget等 System V IPC 函数的key参数来创建或获取对应的 IPC 资源。
    • 失败时,返回-1,并且会设置errno来指示错误原因。常见的错误原因包括:
      • EACCESS:没有权限访问pathname指定的文件。
      • ENOENTpathname指定的文件不存在。

发送端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <string.h>

// 消息结构体
struct msgbuf {
long mtype;
char mtext[100];
};

int main() {
// 通过ftok生成key
key_t key = ftok(".", 'c');
if (key == -1) {
perror("ftok");
return 1;
}

// 创建消息队列
int msqid = msgget(key, IPC_CREAT | 0666);
if (msqid == -1) {
perror("msgget");
return 1;
}

// 准备发送消息的结构体
struct msgbuf send_msg;
send_msg.mtype = 1;
strcpy(send_msg.mtext, "Hello from sender!");

if (msgsnd(msqid, &send_msg, sizeof(send_msg.mtext), 0) == -1) {
perror("msgsnd");
return 1;
}

printf("Sender sent message...\n");

sleep(10);
// 删除消息队列
if (msgctl(msqid, IPC_RMID, NULL) == -1) {
perror("msgctl for delete");
return 1;
}

return 0;
}

接收端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
#include <stdio.h>
#include <stdlib.combined
#include <sys/types.combined
#include <sys/ipc.combined
#include <sys/msg.combined
#include <string.combined

// 消息结构体
struct msgbuf {
long mtype;
char mtext[100];
};

int main() {
// 通过ftok生成key
key_t key = ftok(".", 'c');
if (key == -1) {
perror("ftok");
return 1;
}

// 获取消息队列
int msqid = msgget(key, 0666);
if (msqid == -1) {
perror("msgget")
return 1;
}

// 准备接收消息的结构体
struct msgbuf recv_msg;
if (msgrcv(msqid, &recv_msg, sizeof(recv_msg.mtext), 1, 0) == -1) {
perror("msgrcv")
return 1;
}

printf("Receiver received message: %s\n", recv_msg.mtext);

return 0;
}

运行结果:

image-20241220192302773

10.IPC 命令

以上3种System V IPC进程间通信方式都使用一个全局唯一的键值来描述一个共享资源,当程序调用semgetshmgetmsgget时,就创建了这些共享资源的一个实例。Linux提供ipcs命令来观察当前系统上拥有哪些共享资源实例:

image-20241220171101662

输出结果分段显示了系统拥有的消息队列、共享内存、信号量资源,可见,该系统目前尚未使用任何消息队列和信号量,但分配了一组键值为0的共享内存。这些信号所有者正是apache,它们是由httpd服务器程序创建的。

我们还可用ipcrm命令删除遗留在系统中的共享资源。

11.在进程间传递文件描述符

fork调用后,父进程中打开的文件描述符在子进程中仍然保持打开,所以文件描述符可以很方便地从父进程传递到子进程。注意,传递一个文件描述符并不是传递一个文件描述符的值,而是在接收进程中创建一个新的文件描述符,且新文件描述符和发送进程中被传递的文件描述符指向内核中相同的文件表项。

要想在两个不相干的进程之间传递文件描述符,在Linux下,可利用UNIX域socket在进程间传递特殊的辅助数据,以实现文件描述符的传递,下例代码中,子进程中打开一个文件描述符,然后将它传递给父进程,父进程则通过读取该文件描述符来获得文件内容:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
#include <sys/socket.h>
#include <fcntl.h>
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <assert.h>
#include <string.h>

static const int CONTROL_LEN = CMSG_LEN(sizeof(int));
// 发送文件描述符,fd参数是用来传递信息的UNIX域socket,fd_to_send参数是待发送的文件描述符
void send_fd(int fd, int fd_to_send) {
struct iovec iov[1];
struct msghdr msg;
char buf[0];

iov[0].iov_base = buf;
iov[0].iov_len = 1;
msg.msg_name = NULL;
msg.msg_namelen = 0;
msg.msg_iov = iov;
msg.msg_iovlen = 1;

cmsghdr cm;
cm.cmsg_len = CONTROL_LEN;
cm.cmsg_level = SOL_SOCKET;
cm.cmsg_type = SCM_RIGHTS;
*(int *)CMSG_DATA(&cm) = fd_to_send;
msg.msg_control = &cm; /* 设置辅助数据 */
msg.msg_controllen = CONTROL_LEN;

sendmsg(fd, &msg, 0); /* 通用数据读 */
}

// 接收目标文件描述符
int recv_fd(int fd) {
struct iovec iov[1];
struct msghdr msg;
char buf[0];

iov[0].iov_base = buf;
iov[0].iov_len = 1;
msg.msg_name = NULL;
msg.msg_namelen = 0;
msg.msg_iov = iov;
msg.msg_iovlen = 1;

cmsghdr cm;
msg.msg_control = &cm;
msg.msg_controllen = CONTROL_LEN;

recvmsg(fd, &msg, 0); /* 通用数据写 */

int fd_to_read = *(int *)CMSG_DATA(&cm);
return fd_to_read;
}

int main() {
int pipefd[2];
int fd_to_pass = 0;
/* 创建父、子进程间的管道,文件描述符pipefd[0]和pipefd[1]都是UNIX域socket */
int ret = socketpair(PF_UNIX, SOCK_DGRAM, 0, pipefd);
assert(ret != -1);

pid_t pid = fork(); /* 创建子进程 */
assert(pid >= 0);

if (pid == 0) {
close(pipefd[0]); /* 子进程关闭读 */
fd_to_pass = open("test.txt", O_RDWR, 0666); /* 打开文件 */
send_fd(pipefd[1], (fd_to_pass > 0) ? fd_to_pass : 0); /* 子进程通过管道将文件描述符发送到父进程,如果文件打开失败,则子进程将标准输入发送到父进程 */
close(fd_to_pass);
exit(0);
}

close(pipefd[1]); /* 父进程关闭写 */

fd_to_pass = recv_fd(pipefd[0]); /* 父进程从管道接收目标文件描述符 */
char buf[1024]; /* 存放数据 */
memset(buf, '\0', 1024);
// 读目标文件描述符,验证其有效性
read(fd_to_pass, buf, 1024);
printf("I got fd %d and data %s\n", fd_to_pass, buf);
close(fd_to_pass);
}

结构体 iovec和msghdr见:Linux高性能服务器编程 | 读书笔记 | 4. 高级 I/O 函数-CSDN博客通用数据读写函数部分