System V 消息队列—复用消息

  消息队列中的消息结构可以由我们自由定义,具备较强的灵活性。通过消息结构可以共享一个队列,进行消息复用。通常定义一个类似如下的消息结构:

#define MSGMAXDAT     1024
struct mymsg
{
    long msg_len;   //消息长度
    long msg_type; //消息类型
    long msg_data[MSGMAXDATA]; //消息内容
};

 消息结构相关联的类型字段(msg_type)提供了两个特性:

(1)标识消息,使得多个进程在单个队列上复用消息。

(2)用作优先级字段,允许接收者以不同于先进先出的某个顺序读出各个消息。

例子1:每个应用一个队列,可以在多个客户和单个服务器之间复用消息。使用一个消息队列进行通信,由消息类型标识消息是从客户到服务器,还是服务器到客户。通信模型如下:

按照通信模型编写程序如下:

公共头文件svmsg.h

 1 #ifndef  SVMSG_H
 2 #define  SVMSG_H
 3 #include <stdio.h>
 4 #include <stdlib.h>
 5 #include <string.h>
 6 #include <unistd.h>
 7 #include <sys/types.h>
 8 #include <sys/ipc.h>
 9 #include <sys/msg.h>
10 #include <errno.h>
11
12 #define MSG_R 0400 /* read permission */
13 #define MSG_W 0200 /* write permission */
14 #define SVMSG_MODE (MSG_R | MSG_W | MSG_R >>3 | MSG_R >>6)
15 #define MQ_KEY  1234L
16 #define MSGMAX  1024
17 //消息结构
18 struct mymesg
19 {
20     long mesg_len;
21     long mesg_type;
22     char mesg_data[MSGMAX];
23 };
24 #endif

客户端程序sysv_client.c

 1 #include "svmsg.h"
 2 void client(int ,int);
 3
 4 int main(int argc,char *argv[])
 5 {
 6     int     msqid;
 7     if((msqid = msgget(MQ_KEY,0)) == -1)
 8     {
 9         perror("megget()");
10         exit(-1);
11     }
12     client(msqid,msqid);
13     exit(0);
14 }
15
16 void client(int readfd,int writefd)
17 {
18     size_t len;
19     ssize_t n;
20     char *ptr;
21     struct mymesg mesg;
22     printf("Send request to server.\n");
23     //set pid to message
24     snprintf(mesg.mesg_data,MSGMAX,"%ld",(long)getpid());
25     len = strlen(mesg.mesg_data);
26     mesg.mesg_data[len] = ' '; //blank
27     ptr = mesg.mesg_data+len+1;
28     printf("Enter filename: ");
29     fgets(ptr,MSGMAX-len,stdin);
30     len = strlen(mesg.mesg_data);
31     if(mesg.mesg_data[len-1] == '\n')
32         len--;
33     mesg.mesg_len = len;
34     mesg.mesg_type = 1;
35     printf("mesg_data is :%s len=%ld\n",mesg.mesg_data, mesg.mesg_len);
36     if(msgsnd(writefd,&(mesg.mesg_type),mesg.mesg_len,0) == -1)
37     {
38         perror("msgsnd() error");
39         exit(-1);
40     }
41     //read from IPC,write to standard output
42     mesg.mesg_type = getpid();
43     while( (n = msgrcv(readfd,&(mesg.mesg_type),MSGMAX,mesg.mesg_type,0))>0)
44     {
45         write(STDOUT_FILENO,mesg.mesg_data,n);
46         putchar('\n');
47     }
48     if(n == 0 )
49     {
50         printf("Read file from server is completed.\n");
51     }
52     if(n == -1)
53     {
54         perror("msgrcv() error");
55         exit(-1);
56     }
57 }

服务器程序sysv_server.c

 1 #include "svmsg.h"
 2 void server(int ,int);
 3 int main(int argc,char *argv[])
 4 {
 5     int     msqid;
 6     if((msqid = msgget(MQ_KEY,SVMSG_MODE | IPC_CREAT)) == -1)
 7     {
 8         perror("megget()");
 9         exit(-1);
10     }
11     server(msqid,msqid);
12     exit(0);
13 }
14
15 void server(int readfd,int writefd)
16 {
17     FILE            *fp;
18     char            *ptr;
19     pid_t           pid;
20     ssize_t         n;
21     ssize_t         len;
22     struct mymesg   mesg;
23     printf("Waiting for client......\n");
24     for(; ;)
25     {
26         mesg.mesg_type = 1;
27         if((n = msgrcv(readfd,&(mesg.mesg_type),MSGMAX,mesg.mesg_type,0)) == 0)
28         {
29             printf("pathname missing.\n");
30             continue;
31         }
32         mesg.mesg_data[n] = '\0';
33         printf("Received message from client is: %s\n",mesg.mesg_data);
34         if ((ptr = strchr(mesg.mesg_data,' ')) == NULL)
35         {
36             printf("bogus request: %s\n",mesg.mesg_data);
37             continue;
38         }
39         *ptr++ = 0;
40         pid = atoi(mesg.mesg_data);
41         mesg.mesg_type = pid;
42         //open fiel and read data
43         if((fp = fopen(ptr,"r")) == NULL)
44         {
45             printf("open file failed.sent msg to client\n");
46             snprintf(mesg.mesg_data+n,sizeof(mesg.mesg_data)-n,": can't open,%s\n",strerror(errno));
47             mesg.mesg_len = strlen(ptr);
48             memmove(mesg.mesg_data,ptr,mesg.mesg_len);
49             if(msgsnd(writefd,&(mesg.mesg_type),mesg.mesg_len,0) == -1)
50             {
51                 perror("msgsnd() error");
52                 exit(-1);
53             }
54         }
55         else
56         {
57             printf("open file successed.sent file to client\n");
58             while(fgets(mesg.mesg_data,MSGMAX,fp) != NULL)
59             {
60                 mesg.mesg_len = strlen(mesg.mesg_data);
61                 if(msgsnd(writefd,&(mesg.mesg_type),mesg.mesg_len,0) == -1)
62                 {
63                     perror("msgsnd() error");
64                     exit(-1);
65                 }
66             }
67             fclose(fp);
68         }
69         printf("send compelted.\n");
70         mesg.mesg_len = 0;
71         if(msgsnd(writefd,&(mesg.mesg_type),mesg.mesg_len,0) == -1)
72         {
73             perror("msgsnd() error");
74             exit(-1);
75         }
76     }
77 }

程序测试结果如下所示:

 例子2:每个客户一个队列,将例子1改成所有用户用一个共同的消息队列向服务器发送消息,给每个客户分配一个消息队列,使得服务器对每个客户进行应答。通信模型如下:

以并发服务器模型编写这个程序,服务器给每个客户fork一个子进程进行处理。程序如下:

公共头文件svmsg.h和svmsg.c:

 1 //svmsg.h file
 2 #ifndef  SVMSG_H
 3 #define  SVMSG_H
 4 #include <stdio.h>
 5 #include <stdlib.h>
 6 #include <string.h>
 7 #include <unistd.h>
 8 #include <signal.h>
 9 #include <sys/types.h>
10 #include <sys/ipc.h>
11 #include <sys/msg.h>
12 #include <errno.h>
13
14 #define MSG_R 0400 /* read permission */
15 #define MSG_W 0200 /* write permission */
16 #define SVMSG_MODE (MSG_R | MSG_W | MSG_R >>3 | MSG_R >>6)
17 #define MQ_KEY  1234L
18 #define MSGMAX  1024
19 //message structure
20 struct mymesg
21 {
22     long mesg_len;
23     long mesg_type;
24     char mesg_data[MSGMAX];
25 };
26
27 ssize_t mesg_send(int id,struct mymesg *mptr);
28 ssize_t mesg_recv(int id,struct mymesg *mptr);
29
30 void    Mesg_send(int id,struct mymesg *mptr);
31 ssize_t Mesg_recv(int id,struct mymesg *mptr);
32 #endif

 1 //svmsg.c file
 2 #include "svmsg.h"
 3
 4 ssize_t mesg_send(int id,struct mymesg *mptr)
 5 {
 6     return (msgsnd(id,&(mptr->mesg_type),mptr->mesg_len,0));
 7 }
 8
 9 ssize_t mesg_recv(int id,struct mymesg *mptr)
10 {
11     ssize_t n;
12     n = msgrcv(id,&(mptr->mesg_type),MSGMAX,mptr->mesg_type,0);
13     mptr->mesg_len = n;
14     return n;
15 }
16
17 void Mesg_send(int id,struct mymesg *mptr)
18 {
19     if(mesg_send(id,mptr) == -1)
20     {
21         perror("mesg_send() error");
22         exit(-1);
23     }
24 }
25 ssize_t Mesg_recv(int id,struct mymesg *mptr)
26 {
27     ssize_t n;
28     do
29     {
30         n = mesg_recv(id,mptr);
31     }while(n==-1 && errno == EINTR);
32     if(n == -1)
33     {
34         perror("mesg_recv() error");
35         exit(-1);
36     }
37     return n;
38 }

客户端程序如下:

 1 #include "svmsg.h"
 2
 3 void client(int ,int);
 4
 5 int main(int argc,char *argv[])
 6 {
 7     int    readid,writeid;
 8     if((writeid = msgget(MQ_KEY,0)) == -1)
 9     {
10         perror("megget()");
11         exit(-1);
12     }
13     if((readid = msgget(IPC_PRIVATE,SVMSG_MODE | IPC_CREAT)) == -1)
14     {
15         perror("megget()");
16         exit(-1);
17     }
18     client(readid,writeid);
19     msgctl(readid,IPC_RMID,NULL);
20     exit(0);
21 }
22
23 void client(int readid,int writeid)
24 {
25     size_t len;
26     ssize_t n;
27     char *ptr;
28     struct mymesg mesg;
29     printf("Send request to server.\n");
30     //set pid to message
31     snprintf(mesg.mesg_data,MSGMAX,"%d",readid);
32     len = strlen(mesg.mesg_data);
33     mesg.mesg_data[len] = ' '; //blank
34     ptr = mesg.mesg_data+len+1;
35     printf("Enter filename: ");
36     fgets(ptr,MSGMAX-len,stdin);
37     len = strlen(mesg.mesg_data);
38     if(mesg.mesg_data[len-1] == '\n')
39         len--;
40     mesg.mesg_len = len;
41     mesg.mesg_type = 1;
42     printf("mesg_data is :%s\n",mesg.mesg_data);
43     Mesg_send(writeid,&mesg);
44     printf("Send messge to server successed.\n");
45     //read from IPC,write to standard output
46     while( (n = Mesg_recv(readid,&mesg))>0)
47     {
48         write(STDOUT_FILENO,mesg.mesg_data,n);
49         putchar('\n');
50     }
51     if(n == 0 )
52     {
53         printf("Read file from server is completed.\n");
54     }
55 }

服务器程序如下:

 1 #include "svmsg.h"
 2
 3 void server(int ,int);
 4 void sig_child(int signo);
 5
 6 int main(int argc,char *argv[])
 7 {
 8     int     msqid;
 9     if((msqid = msgget(MQ_KEY,SVMSG_MODE | IPC_CREAT)) == -1)
10     {
11         perror("megget()");
12         exit(-1);
13     }
14     server(msqid,msqid);
15     exit(0);
16 }
17
18 void server(int readid,int writeid)
19 {
20     FILE            *fp;
21     char            *ptr;
22     pid_t           pid;
23     ssize_t         n;
24     ssize_t         len;
25     struct mymesg   mesg;
26     signal(SIGCHLD,sig_child);
27     printf("Waiting for client......\n");
28     for(; ;)
29     {
30         mesg.mesg_type = 1;
31         if((n = Mesg_recv(readid,&mesg)) == 0)
32         {
33             printf("pathname missing.\n");
34             continue;
35         }
36         mesg.mesg_data[n] = '\0';
37         printf("Received message from client is: %s\n",mesg.mesg_data);
38         if ((ptr = strchr(mesg.mesg_data,' ')) == NULL)
39         {
40             printf("bogus request: %s\n",mesg.mesg_data);
41             continue;
42         }
43         *ptr++ = 0;
44         writeid = atoi(mesg.mesg_data);
45         if(fork() == 0)
46         {
47             //open fiel and read data
48             if((fp = fopen(ptr,"r")) == NULL)
49             {
50                 printf("open file failed.sent msg to client\n");
51                 snprintf(mesg.mesg_data+n,sizeof(mesg.mesg_data)-n,": can't open,%s\n",strerror(errno));
52                 mesg.mesg_len = strlen(ptr);
53                 memmove(mesg.mesg_data,ptr,mesg.mesg_len);
54                 Mesg_send(writeid,&mesg);
55             }
56             else
57             {
58                 printf("open file successed.sent file to client\n");
59                 while(fgets(mesg.mesg_data,MSGMAX,fp) != NULL)
60                 {
61                     mesg.mesg_len = strlen(mesg.mesg_data);
62                     Mesg_send(writeid,&mesg);
63                 }
64                 fclose(fp);
65             }
66             printf("send compelted.\n");
67             mesg.mesg_len = 0;
68             Mesg_send(writeid,&mesg);
69         }
70     }
71 }
72
73 void sig_child(int signo)
74 {
75     pid_t   pid;
76     int     stat;
77     while ((pid = waitpid(-1,&stat,WNOHANG)) > 0);
78     return ;
79 }

程序测试结果如下:

时间: 2024-11-15 18:15:08

System V 消息队列—复用消息的相关文章

Handler详解系列(三)——在子线程中给主线程的消息队列发送消息

MainActivity如下: package cc.c; import android.os.Bundle; import android.os.Handler; import android.os.Looper; import android.os.Message; import android.view.View; import android.view.View.OnClickListener; import android.widget.Button; import android.w

Handler详解系列(二)——主线程向自身消息队列发消息

MainActivity如下: package cc.c; import android.os.Bundle; import android.os.Handler; import android.os.Looper; import android.os.Message; import android.view.View; import android.view.View.OnClickListener; import android.widget.Button; import android.w

linux 消息队列-Linux消息队列使用时,msgsnd()一直不能通过,大家帮忙看看!

问题描述 Linux消息队列使用时,msgsnd()一直不能通过,大家帮忙看看! #include #include #include #include #include #include #include #include #define QUEUE1 100 #define QUEUE2 101 /* redefine struct msg_buf */ struct msgmbuf{ //msg type long mtype; //the field used to store URL

System V 消息队列

1.概述 消息队列可以认为是一个消息链表,System V 消息队列使用消息队列标识符标识.具有足够特权的任何进程都可以往一个队列放置一个消息,具有足够特权的任何进程都可以从一个给定队列读出一个消息.在某个进程往一个队列写入消息之前,并不需要另外某个进程在该队列上等待消息的到达.System V 消息队列是随内核持续的,只有在内核重起或者显示删除一个消息队列时,该消息队列才会真正被删除.可以将内核中的某个特定的消息队列画为一个消息链表,如下图所示: 对于系统中没个消息队列,内核维护一个msqid

UNIX环境高级编程:system V消息队列

unix早期通信机制中的信号能够传送的信息量有限,管道则只能传送无格式字节流,这远远是不够的. 消息队列(也叫报文队列)客服了这些缺点: 消息队列就是一个消息的链表. 可以把消息看作一个记录,具有特定的格式. 进程可以按照一定的规则向消息队列中添加新消息:另一些进程可以从消息队列中读走消息. 消息队列是随内核持续的,只有内核重启或人工删除时,该消息队列才会被删除. system V消息队列使用消息队列标识符标识.具有足够特权的任何进程都可以往一个给定队列放置一个消息,具有足够特权的任何进程都可以

linux System V消息队列实现回射客户/服务器和msgsnd、msgrcv函数

一.msgsnd 和 msgrcv 函数 #include <sys/types.h>  #include <sys/ipc.h>  #include <sys/msg.h> 功能:把一条消息添加到消息队列中 原型 int msgsnd(int msqid, const void *msgp, size_t msgsz, int msgflg); 参数 msgid: 由msgget函数返回的消息队列标识码 msgp:是一个指针, 指针指向准备发送的消息结构体 msgsz

POSIX和SYSTEM的消息队列应该注意的问题

  首先看看POSIX的代码: 1.posix_mq_server.c #include <mqueue.h>#include <sys/stat.h>#include <string.h>#include <stdio.h>#define MQ_FILE "/mq_test"#define BUF_LEN 128 int main(){     mqd_t mqd;    char buf[BUF_LEN];    int  por =

Linux进程间通信学习:如何使用消息队列

下面来说说如何用不用消息队列来进行进程间的通信,消息队列与命名管道有很多相似之处.有关命名管道的更多内容可以参阅我的另一篇文章:Linux进程间通信--使用命名管道 一.什么是消息队列 消息队列提供了一种从一个进程向另一个进程发送一个数据块的方法.  每个数据块都被认为含有一个类型,接收进程可以独立地接收含有不同类型的数据结构.我们可以通过发送消息来避免命名管道的同步和阻塞问题.但是消息队列与命名管道一样,每个数据块都有一个最大长度的限制. Linux用宏MSGMAX和MSGMNB来限制一条消息

linux网络编程之POSIX消息队列和系列函数

一.在前面介绍了system v 消息队列的相关知识,现在来稍微看看posix 消息队列. 其实消息队列就是一个可 以让进程间交换数据的场所,而两个标准的消息队列最大的不同可能只是api 函数的不同,如system v 的系列函数是 msgxxx,而posix 是mq_xxx.posix 消息队列也有一些对消息长度等的限制,man 7 mq_overview: simba@ubuntu:~/Documents/code/linux_programming/UNP/posix$ cat /proc