进程间通信方式—消息队列(System V IPC)

消息队列

1.消息队列进程间通信原理

System V IPC 消息队列是一种进程间通信(IPC)机制,它允许不同进程通过发送和接收消息来进行通信。消息队列就像是一个邮箱系统,进程可以将消息(信件)发送到队列(邮箱)中,其他进程可以从这个队列中接收消息。

消息队列是在两个进程间传递二进制数据块的方式,每个数据块都有一个特定类型,接收方可以根据类型来有选择地接收数据,而不一定像管道和命名管道那样必须以先进先出的方式接收数据。

原理:多个进程通过共享消息队列的标识符(msqid)来访问同一个消息队列。发送进程将消息放入消息队列后,消息队列会按照一定的规则(如先进先出)存储这些消息。接收进程可以根据消息类型等条件从消息队列中取出消息。这样,不同进程之间就可以通过消息队列进行数据传输和通信,实现进程间的同步和信息共享。例如,在生产者 - 消费者模型中,生产者进程将生产的数据作为消息发送到消息队列,消费者进程从消息队列中接收消息并进行消费,通过消息类型等机制可以确保消息的正确发送和接收,从而实现生产者和消费者之间的协调工作。

Linux消息队列的API都定义在sys/msg.h头文件中,包括4个系统调用:msggetmsgsndmsgrcvmsgctl

2.msgget 系统调用

msgget系统调用创建一个消息队列,或获取一个已有的消息队列:

1
2
#include <sys/msg.h>
int msgget(key_t key, int msgflg);
  • 参数:
    • key_t key
      • 这是一个键值,用于标识一个全局唯一的消息队列。可以通过ftok函数生成一个唯一的key值,或者使用IPC_PRIVATE来创建一个私有消息队列(通常用于具有亲缘关系的进程,如父子进程)。
    • int msgflg
      • 用于控制消息队列的创建和访问权限,它由以下几种标志组成:
        • IPC_CREAT:如果消息队列不存在,则创建它。如果和IPC_EXCL一起使用(IPC_CREAT | IPC_EXCL),则只有在消息队列不存在时才创建,若已存在则msgget函数返回 - 1 并设置errnoEEXIST
        • 权限标志:如0666等,用于指定消息队列的访问权限,格式与文件权限相同(用户、组、其他用户的读、写、执行权限)。
  • 返回值:
    • 成功时,返回一个非负整数,即消息队列的标识符(msqid)。
    • 失败时,函数返回 - 1,并设置errno变量来指示错误原因,例如EEXIST(当IPC_CREAT | IPC_EXCL且队列已存在时)、ENOENT(当没有IPC_CREAT且队列不存在时)等。

如果它用于创建消息队列的话,与之相关的内核数据结构msqid_ds将被创建并初始化。

1
2
3
4
5
6
7
8
9
10
11
12
struct msqid_ds
{
struct ipc_perm msg_perm; // 消息队列的操作权限
time_t msg_stime; // 最后一次调用msgsnd的时间
time_t msg_rtime; // 最后一次调用msgrcv的时间
time_t msg_ctime; // 最后一次被修改的时间
unsigned long msg_cbytes; // 消息队列中已有的字节数
msgqnum_t msg_qnum; // 消息队列中已有的消息数
msglen_t msg_qbytes; // 消息队列允许的最大字节数
pid_t msg_lspid; // 最后执行msgsnd的进程的PID
pid_t msg_lrpid; // 最后执行msgrcv的进程的PID
};

3.msgsnd 系统调用

msgsnd系统调用将一条消息添加到消息队列中:

1
2
3
4
5
6
7
#include <sys/msg.h>
int msgsnd(int msqid, const void* msg_ptr, size_t msg_sz, int msgflg);

struct msgbuf{
long mtype;//消息类型
char mtext[512];//消息数据
};

参数:

int msqid

  • 消息队列的标识符,由msgget函数返回。

  • const void* msg_ptr

    • 指向要发送消息的指针。消息的结构必须以一个长整型成员变量开始,这个长整型变量用于存放消息类型,后面可以跟随消息的实际数据。msg_ptr参数指向一个准备发送的消息,消息被定义为如下类型:

      1
      2
      3
      4
      struct msgbuf{
      long mtype; /* 消息类型 */
      char mtext[512]; /* 消息数据 */
      };
  • size_t msg_sz

    • 这是消息数据部分的大小,不包括消息类型的长整型变量所占的字节数。
  • int msgflg

    • 控制消息发送的行为,和semget的flag一样的,常用的标志有:
      • 0:表示阻塞发送,如果消息队列已满,则发送进程会阻塞,直到有空间可以发送消息。
      • IPC_NOWAIT:表示非阻塞发送,如果消息队列已满,则msgsnd函数立即返回 - 1,并设置errnoEAGAIN

返回值:

  • 成功时,返回0
  • 失败时,返回 - 1,并设置errno来指示错误原因,如EAGAIN(非阻塞发送时队列已满)、EINVAL(参数无效)、EIDRM(消息队列已被删除)等。

处于阻塞状态的msgsnd调用可能被如下两种异常情况所中断:

  • 消息队列被移除。此时msgsnd调用将立即返回并设置errno为EIDRM。

  • 程序接收到信号。此时msgsnd调用将立即返回并设置errno为EINTR。

msgsnd成功时将修改内核数据结构msqid_ds的部分字段,如下所示:

  • 将msg_qnum加1。

  • 将msg_lspid设置为调用进程的PID。

  • 将msg_stime设置为当前的时间。

4.msgrcv 系统调用

msgrcv系统调用从消息队列中获取消息:

1
2
#include <sys/msg.h>
int msgrcv(int msqid, void* msg_ptr, size_t msg_sz, long int msgtype, int msgflg);

参数:

  • int msqid
    • 消息队列的标识符。
  • void* msg_ptr
    • 一个指向接收消息缓冲区的指针。与msgsnd类似,缓冲区的结构应以一个长整型开始用于存放接收到的消息类型,后面是存放消息数据的空间。
  • size_t msg_sz
    • 这是接收消息缓冲区中数据部分的大小。
  • long int msgtype
    • 指定要接收的消息类型,可以有以下几种取值:
      • 0:接收(读取)消息队列中的第一条消息,不考虑消息类型。
      • > 0:接收第一个消息类型等于msgtype的消息。(除非指定了标志MSG_EXCEPT,见后文)
      • < 0:接收(读取)消息队列中第一个类型值比msgtype的绝对值小的消息。
  • int msgflg
    • 控制消息接收的行为,常用标志有:
      • 0:表示阻塞接收,如果消息队列中没有符合条件的消息,则接收进程会阻塞,直到有符合条件的消息到达。
      • IPC_NOWAIT:表示非阻塞接收,如果消息队列中没有符合条件的消息,则msgrcv函数立即返回 - 1,并设置errnoENOMSG
      • MSG_EXCEPT。如果msgtype大于0,则接收消息队列中第一个非msgtype类型的消息。
      • MSG_NOERROR。如果消息数据部分的长度超过了msg_sz,就将它截断。

返回值:

  • 成功时,返回接收到的消息数据部分的字节数。
  • 失败时,返回 - 1,并设置errno来指示错误原因,如ENOMSG(非阻塞接收时没有符合条件的消息)、EINVAL(参数无效)、EIDRM(消息队列已被删除)等。

处于阻塞状态的msgrcv调用还可能被如下两种异常情况所中断:

  • 消息队列被移除。此时msgrcv调用将立即返回并设置errno为EIDRM。

  • 程序接收到信号。此时msgrcv调用将立即返回并设置errno为EINTR。

msgrcv成功时将修改内核数据结构msqid_ds的部分字段,如下所示:

  • 将msg_qnum减1。

  • 将msg_lrpid设置为调用进程的PID。

  • 将msg_rtime设置为当前的时间。

5.msgctl 系统调用

msgctl系统调用,用于对消息队列进行控制操作(控制消息队列某些属性),如获取消息队列的状态信息、设置消息队列的属性、删除消息队列等。

1
2
#incldue <sys/msg.h>
int msgctl(int msqid, int command, struct msqid_ds* buf);

参数:

  • int msqid
    • 消息队列的标识符。
  • int command
    • 这是一个控制命令,用于指定对消息队列进行何种操作(见下表),常见的命令有:
      • IPC_STAT:获取消息队列的状态信息,并将其存储到buf所指向的struct msqid_ds结构体中。这个结构体包含了消息队列的各种属性,如操作权限、当前消息数量等。
      • IPC_SET:根据buf所指向的struct msqid_ds结构体中的信息来设置消息队列的属性。例如,可以修改消息队列的操作权限等。
      • IPC_RMID:删除由msqid标识的消息队列。这是一个非常重要且具有危险性的操作,一旦执行,消息队列及其所包含的消息将被永久删除。
  • struct msqid_ds* buf
    • 这是一个指向struct msqid_ds结构体的指针,其作用取决于command参数的值:
      • commandIPC_STAT时,buf用于存储获取到的消息队列的状态信息。
      • commandIPC_SET时,buf指向的结构体中的信息将被用于设置消息队列的属性。
      • 如果command不涉及IPC_STATIPC_SET操作(如IPC_RMID),buf通常可以设置为NULL

返回值:

  • 成功时,取决于command(见下表)。
  • 失败时,返回 - 1,并设置errno来指示错误原因,如EINVALmsqid无效,或者command参数无效,或者buf指向的结构体无效(在IPC_STATIPC_SET操作时))、EPERM(调用进程没有足够的权限来执行请求的操作)等。

image-20241220165757768

image-20241220165805087

6.函数使用案例

以下是使用 System V IPC 消息队列相关函数(msggetmsgsndmsgrcvmsgctl)的一个简单 C 语言案例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <string.h>

// 消息结构体
struct msgbuf {
long mtype;
char mtext[100];
};

int main() {
// 创建消息队列
int msqid = msgget(IPC_PRIVATE, 0666 | IPC_CREAT);
if (msqid == -1) {
perror("msgget");
return 1;
}

// 发送消息
struct msgbuf send_msg;
send_msg.mtype = 1;
strcpy(send_msg.mtext, "Hello, World!");
if (msgsnd(msqid, &send_msg, sizeof(send_msg.mtext), 0) == -1) {
perror("msgsnd");
return 1;
}

// 接收消息
struct msgbuf recv_msg;
if (msgrcv(msqid, &recv_msg, sizeof(recv_msg.mtext), 1, 0) == -1) {
perror("msgrcv");
return 1;
}
printf("Received message: %s\n", recv_msg.mtext);

// 获取消息队列状态 buf存储消息队列的信息
struct msqid_ds buf;
if (msgctl(msqid, IPC_STAT, &buf) == -1) {
perror("msgctl - IPC_STAT");
return 1;
}
printf("Messages in queue: %ld\n", buf.msg_qnum);

// 删除消息队列
if (msgctl(msqid, IPC_RMID, NULL) == -1) {
perror("msgctl - IPC_RMID");
return 1;
}

return 0;
}

代码解释:

消息结构体定义

  • 定义了struct msgbuf结构体,它包含一个长整型mtype(用于表示消息类型)和一个字符数组mtext(用于存储消息内容)。

创建消息队列(msgget

  • 使用IPC_PRIVATE作为key来创建一个新的私有消息队列,权限设置为0666(用户、组和其他用户都有读写权限)。如果msgget调用成功,返回消息队列标识符msqid;否则,打印错误信息并返回。

发送消息(msgsnd

  • 初始化send_msg结构体,设置mtype1,并将消息内容设置为"Hello, World!"
  • 使用msgsnd函数将消息发送到消息队列中。msgsnd函数的参数包括消息队列标识符msqid、消息结构体指针&send_msg、消息数据部分大小sizeof(send_msg.mtext)和标志0(表示阻塞发送,如果队列满则等待)。如果发送失败,打印错误信息并返回。

接收消息(msgrcv

  • 初始化recv_msg结构体。
  • 使用msgrcv函数从消息队列中接收消息。msgrcv函数的参数包括消息队列标识符msqid、接收消息结构体指针&recv_msg、接收消息数据部分大小sizeof(recv_msg.mtext)、要接收的消息类型1和标志0(表示阻塞接收,如果没有符合条件的消息则等待)。如果接收失败,打印错误信息并返回。
  • 接收到消息后,打印出消息内容。

获取消息队列状态(msgctl

  • 定义struct msqid_ds类型的变量buf
  • 使用msgctl函数的IPC_STAT命令获取消息队列的状态信息,并将其存储在buf中。如果获取状态失败,打印错误信息并返回。
  • 打印出消息队列中的消息数量(buf.msg_qnum)。

删除消息队列(msgctl

  • 使用msgctl函数的IPC_RMID命令删除消息队列。如果删除失败,打印错误信息并返回。

image-20241220171912147

7.实现生产者消费者模型

**消息队列并不能实现互斥。**以下是使用上述消息队列函数实现的生产者 - 消费者模型的 C 代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <string.h>
#include <time.h>
#include <unistd.h>

#define MAX_MSG_SIZE 100

// 消息结构体
struct msgbuf {
long mtype;
char mtext[MAX_MSG_SIZE];
};

// 生产者函数
void producer(int msqid) {
struct msgbuf msg;
msg.mtype = 1; // 消息类型设为1

srand(time(NULL));
while (1) {
int num = rand() % 100;
sprintf(msg.mtext, "%d", num);
if (msgsnd(msqid, &msg, sizeof(msg.mtext), 0) == -1) {
perror("msgsnd");
exit(1);
}
printf("Producer sent: %s\n", msg.mtext);
sleep(1);
}
}

// 消费者函数
void consumer(int msqid) {
struct msgbuf msg;
while (1) {
if (msgrcv(msqid, &msg, sizeof(msg.mtext), 1, 0) == -1) {
perror("msgrcv");
exit(1);
}
printf("Consumer received: %s\n", msg.mtext);
sleep(2);
}
}

int main() {
// 创建消息队列
int msqid = msgget(IPC_PRIVATE, IPC_CREAT | 0666);
if (msqid == -1) {
perror("msgget");
return 1;
}

pid_t pid = fork();
if (pid < 0) {
perror("fork");
return 1;
} else if (pid == 0) {
// 子进程为消费者
consumer(msqid);
} else {
// 父进程为生产者
producer(msqid);
}

// 等待子进程结束(这里只是简单等待,实际可能需要更完善的机制)
wait(NULL);

// 删除消息队列
if (msgctl(msqid, IPC_RMID, NULL) == -1) {
perror("msgctl");
return 1;
}

return 0;
}

代码解释

消息结构体定义:

  • 定义了struct msgbuf结构体,包含一个长整型的mtype(消息类型)和一个字符数组mtext(用于存放消息数据)。

生产者函数:

  • 生成一个随机数,将其转换为字符串后放入消息结构体的mtext中,然后使用msgsnd将消息发送到消息队列中,发送的消息类型为1,发送操作是阻塞的(msgflg0)。

消费者函数:

  • 使用msgrcv从消息队列中接收消息类型为1的消息,接收操作是阻塞的(msgflg0),接收到消息后打印出消息内容。

主函数:

  • 使用msgget创建一个私有消息队列。
  • 通过fork创建子进程,子进程作为消费者,父进程作为生产者。
  • 最后等待子进程结束,并使用msgctl删除消息队列。

并不能够实现互斥,只是能够通信。

运行结果:

image-20241220173408405

8.无血缘关系进程通信

  1. ftok函数的定义和功能
    • 函数原型key_t ftok(const char *pathname, int proj_id);
    • 功能ftok函数用于生成一个唯一的key(键值),这个key通常用于 System V IPC(进程间通信)机制中,如创建共享内存、消息队列和信号量集等。它将一个文件路径名(pathname)和一个项目标识符(proj_id)组合起来,生成一个适合作为 System V IPC 资源标识符的key值。
  2. 参数解释
    • const char *pathname
      • 这是一个指向文件路径名的指针。这个文件路径必须是一个已经存在的文件的有效路径,通常使用当前目录(.")或者一个程序相关的配置文件路径等。ftok函数会使用文件的inode(索引节点)信息作为生成key的一部分。
      • 注意,如果文件被删除然后重新创建,即使文件名相同,inode可能会改变,这会导致ftok生成不同的key值。
    • int proj_id
      • 这是一个0 - 255之间的整数,作为项目标识符。它和文件路径的inode信息一起组合生成key。不同的项目可以使用不同的proj_id来区分,这样即使基于同一个文件路径,不同的项目也能生成不同的key值用于各自的 IPC 资源。例如,一个程序中有两个不同的模块需要使用消息队列进行通信,它们可以使用相同的文件路径但不同的proj_id来生成不同的key,以创建两个独立的消息队列。
  3. 返回值
    • 成功时,ftok函数返回一个key_t类型的非负整数,这个整数可以作为shmgetmsggetsemget等 System V IPC 函数的key参数来创建或获取对应的 IPC 资源。
    • 失败时,返回-1,并且会设置errno来指示错误原因。常见的错误原因包括:
      • EACCESS:没有权限访问pathname指定的文件。
      • ENOENTpathname指定的文件不存在。

发送端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <string.h>

// 消息结构体
struct msgbuf {
long mtype;
char mtext[100];
};

int main() {
// 通过ftok生成key
key_t key = ftok(".", 'c');
if (key == -1) {
perror("ftok");
return 1;
}

// 创建消息队列
int msqid = msgget(key, IPC_CREAT | 0666);
if (msqid == -1) {
perror("msgget");
return 1;
}

// 准备发送消息的结构体
struct msgbuf send_msg;
send_msg.mtype = 1;
strcpy(send_msg.mtext, "Hello from sender!");

if (msgsnd(msqid, &send_msg, sizeof(send_msg.mtext), 0) == -1) {
perror("msgsnd");
return 1;
}

printf("Sender sent message...\n");

sleep(10);
// 删除消息队列
if (msgctl(msqid, IPC_RMID, NULL) == -1) {
perror("msgctl for delete");
return 1;
}

return 0;
}

接收端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
#include <stdio.h>
#include <stdlib.combined
#include <sys/types.combined
#include <sys/ipc.combined
#include <sys/msg.combined
#include <string.combined

// 消息结构体
struct msgbuf {
long mtype;
char mtext[100];
};

int main() {
// 通过ftok生成key
key_t key = ftok(".", 'c');
if (key == -1) {
perror("ftok");
return 1;
}

// 获取消息队列
int msqid = msgget(key, 0666);
if (msqid == -1) {
perror("msgget")
return 1;
}

// 准备接收消息的结构体
struct msgbuf recv_msg;
if (msgrcv(msqid, &recv_msg, sizeof(recv_msg.mtext), 1, 0) == -1) {
perror("msgrcv")
return 1;
}

printf("Receiver received message: %s\n", recv_msg.mtext);

return 0;
}

运行结果:

image-20241220192302773