linux生产者消费者,kafka生产者和消费者
大家好,今天小编来为大家解答以下的问题,关于linux生产者消费者,kafka生产者和消费者这个很多人还不知道,现在让我们一起来看看吧!
进程的同步与互斥实验报告Linux
相交进程之间的关系主要有两种,同步与互斥。所谓互斥,是指散步在不同进程之间的若干程序片断,当某个进程运行其中一个程序片段时,其它进程就不能运行它们之中的任一程序片段,只能等到该进程运行完这个程序片段后才可以运行。所谓同步,是指散步在不同进程之间的若干程序片断,它们的运行必须严格按照规定的某种先后次序来运行,这种先后次序依赖于要完成的特定的任务。
显然,同步是一种更为复杂的互斥,而互斥是一种特殊的同步。
也就是说互斥是两个线程之间不可以同时运行,他们会相互排斥,必须等待一个线程运行完毕,另一个才能运行,而同步也是不能同时运行,但他是必须要安照某种次序来运行相应的线程(也是一种互斥)!
总结:互斥:是指某一资源同时只允许一个访问者对其进行访问,具有唯一性和排它性。但互斥无法限制访问者对资源的访问顺序,即访问是无序的。
同步:是指在互斥的基础上(大多数情况),通过其它机制实现访问者对资源的有序访问。在大多数情况下,同步已经实现了互斥,特别是所有写入资源的情况必定是互斥的。少数情况是指可以允许多个访问者同时访问资源
kfifo(linux kernel 无锁队列)
队列作为常见数据结构,其主要特点是先进先出(FIFO)。FIFO用于缓冲通信速度不匹配场景,例如生产者快速生成数据,消费者则难以及时处理。在通信接口驱动中,数据被暂存于队列中,驱动程序可以立即返回接收新数据或等待,而解析程序仅需从队列中获取数据。FIFO也可解决“多生产者-单消费者”问题。
kfifo是Linux内核中实现队列功能的一种无锁环形队列。无锁意味着在单生产者单消费者场景下无需加锁操作。它通过使用in和out两个变量作为入队和出队索引来实现无锁操作,避免了在多个生产者或消费者场景下的加锁需求。通过将in/out与fifo的空间大小减一进行“与”操作,kfifo实现了比取余操作更快的队列循环。这意味着当in和out超过队列大小时,它们会继续向前加,而不是简单地减去队列大小后重新开始。
kfifo实现“无锁”特性仅针对“单生产者-单消费者”场景。对于“多生产者”或“多消费者”情况,需对入队或出队操作进行加锁以避免数据竞争。
使用kfifo有两种方式:动态申请和静态定义。动态申请包括包含头文件、定义结构体、申请内存、执行入队和出队操作,最后释放内存。静态定义则在定义fifo变量时使用宏,操作函数更加简洁,无需内存管理步骤。
kfifo结构体包含用于管理队列的变量,如in、out、mask等。内存申请过程确保了用mask大小掩码与in/out索引进行“与”操作,实现队列循环,避免了取余运算的开销。使用kfifo_alloc动态申请内存时,最终分配的内存空间是向上取2的次方,以支持mask大小掩码操作。这可能导致使用者在不了解规则的情况下踩坑,例如,申请100字节内存时实际上分配了128字节,可能导致数据错位。
入队操作涉及确定内存地址、拷贝数据并确保内存屏障以避免乱序异常。整个过程高效且简洁。对于更深入的了解,可查阅kfifo的源码。
生产者消费者问题--进程
#i nclude<stdio.h>
#i nclude< iostream.h>
#i nclude< windows.h>
#define BufferSize 15
char Buffer[BufferSize];
int head,tail=0;//Buffer数组下标
int count;//被使用的缓冲区数量
HANDLE hMutex;
HANDLE hNotFullEvent, hNotEmptyEvent;//用来同步生产者和消费者线程
////////缓冲区存储情况
display(char a[15])
{
int i;
cout<<"缓冲区存储情况为:"<<endl;
for(i=14;i>=0;i--){
cout<<"\t|----"<<a<<"----|"<<endl;
}
}
//p1
void p1_Producer()
{
int i;
char ch;
char p1[]={'a','A','b','B','c','C','D','d','E','e'};
if(tail<15){
for(i=0;i<10;i++){
while(1){
WaitForSingleObject(hMutex,INFINITE);
if(count==BufferSize){//缓冲区满
ReleaseMutex(hMutex);
//等待直到缓冲区非满
WaitForSingleObject(hNotFullEvent,INFINITE);
continue;
}
//得到互斥锁且缓冲区非满,跳出while循环
break;
}
if(tail>14){
cout<<"缓冲区已满,不能再存入数据!"<<endl;
ReleaseMutex(hMutex);//结束临界区
PulseEvent(hNotEmptyEvent);//唤醒消费者线程
}
else{
//得到互斥锁且缓冲区非满,开始产生新数据
cout<<"Producer p1:\t"<<p1<<endl;
Buffer[tail]=p1;
//tail=(tail+1)%BufferSize;///存放于缓冲区的位置
display(Buffer);
tail++;
count++;
cout<<"按ENTER继续...."<<endl;
ch=getchar();
ReleaseMutex(hMutex);//结束临界区
PulseEvent(hNotEmptyEvent);//唤醒消费者线程
}
}
}
}
//////////////////////////////////////////////////////////////////
//p2
void p2_Producer()
{
int i;
char ch;
char p2[]={'0','1','2','3','4','5','6','7','8','9'};
if(tail<15){
for(i=0;i<10;i++){
while(1){
ch=getchar();
WaitForSingleObject(hMutex,INFINITE);
if(count==BufferSize){//缓冲区满
ReleaseMutex(hMutex);
//等待直到缓冲区非满
WaitForSingleObject(hNotFullEvent,INFINITE);
continue;
}
//得到互斥锁且缓冲区非满,跳出while循环
break;
}
if(tail>14){
cout<<"缓冲区已满,不能再存入数据!程序结束!"<<endl;
ReleaseMutex(hMutex);//结束临界区
PulseEvent(hNotEmptyEvent);//唤醒消费者线程
}
else{
//得到互斥锁且缓冲区非满,开始产生新数据
cout<<"Producer p2:\t"<<p2<<endl;
Buffer[tail]=p2;
//tail=(tail+1)%BufferSize;
display(Buffer);
tail++;
count++;
cout<<"按ENTER继续...."<<endl;
ch=getchar();
ReleaseMutex(hMutex);//结束临界区
PulseEvent(hNotEmptyEvent);//唤醒消费者线程
}
}
}
}
//////////////////////////////////////////////////////////////////
//p3
void p3_Producer()
{
int i;
char ch;
char p3[]={'!','#','$','%','&','*','+','-','.','/'};
if(tail<15){
for(i=0;i<10;i++){
while(1){
ch=getchar();
WaitForSingleObject(hMutex,INFINITE);
if(count==BufferSize){//缓冲区满
ReleaseMutex(hMutex);
//等待直到缓冲区非满
WaitForSingleObject(hNotFullEvent,INFINITE);
continue;
}
//得到互斥锁且缓冲区非满,跳出while循环
break;
}
if(tail>14){
cout<<"缓冲区已满,不能再存入数据!程序结束!"<<endl;
ReleaseMutex(hMutex);//结束临界区
PulseEvent(hNotEmptyEvent);//唤醒消费者线程
}
else{
//得到互斥锁且缓冲区非满,开始产生新数据
cout<<"Producer p3:\t"<<p3<<endl;
Buffer[tail]=p3;
//tail=(tail+1)%BufferSize;
display(Buffer);
tail++;
count++;
cout<<"按ENTER继续...."<<endl;
ch=getchar();
ReleaseMutex(hMutex);//结束临界区
PulseEvent(hNotEmptyEvent);//唤醒消费者线程
}
}
}
}
//////////////////////////////////////////////////////////////////
//c1
void c1_Consumer()
{
int i,j,k;
char result,ch;
while(1){
ch=getchar();
WaitForSingleObject(hMutex,INFINITE);
if(count==0){//没有可以处理的数据
ReleaseMutex(hMutex);//释放互斥锁且等待
//等待直到缓冲区非空
WaitForSingleObject(hNotEmptyEvent,INFINITE);
}
else{if(Buffer[head]==0){
cout<<"Consumer 0:缓冲区的数据已全消费过一次,消费完毕!"<<endl;
ReleaseMutex(hMutex);//结束临界区
ExitThread(0);
}
else{//获得互斥锁且缓冲区有数据,开始处理
result=Buffer[head];
if(result>64&&result<70){
result=result+32;
cout<<"Consumer c1:(大写->小写)\t"<<result<<endl;
Buffer[head]='^';//'^'表示数据已被消费
cout<<"'^'表示数据已被消费"<<endl;
display(Buffer);
}
if(result>96&&result<102){
result=result-32;
cout<<"Consumer c1:(小写->大写)\t"<<result<<endl;
Buffer[head]='^';
cout<<"'^'表示数据已被消费"<<endl;
display(Buffer);
}
if(result>47&&result<58){
cout<<"Consumer c1:(显示字符)\t"<<result<<endl;
Buffer[head]='^';
cout<<"'^'表示数据已被消费"<<endl;
display(Buffer);}
if(result>32&&result<48){
cout<<"Consumer c1:(用符号打印出菱形)"<<endl;
for(i=1;i<=(9+1)/2;i++)
{
for(j=1;j<=40-i;j++)
cout<<"";
for(k=1;k<=2*i-1;k++)
cout<<result;
cout<<endl;
}
for(i=1;i<=9/2;i++)
{
for(j=1;j<=40-(9+1)/2+i;j++)
cout<<"";
for(k=1;k<=9-2*i;k++)
cout<<result;
cout<<endl;
}
Buffer[head]='^';
cout<<"'^'表示数据已被消费"<<endl;
display(Buffer);
}
head=(head+1)%BufferSize;
count--;
cout<<"按ENTER继续...."<<endl;
ch=getchar();
ReleaseMutex(hMutex);//结束临界区
PulseEvent(hNotFullEvent);//唤醒生产者线程
}
}
}
}
//////////////////////////////////////////////////////////////////
//c2
void c2_Consumer()
{
int i,j,k;
char result,ch;
while(1){
WaitForSingleObject(hMutex,INFINITE);
if(count==0){//没有可以处理的数据
ReleaseMutex(hMutex);//释放互斥锁且等待
//等待直到缓冲区非空
WaitForSingleObject(hNotEmptyEvent,INFINITE);
}
else{if(Buffer[head]==0){
cout<<"Consumer 0:缓冲区的数据已全消费过一次,消费完毕!"<<endl;
ReleaseMutex(hMutex);//结束临界区
ExitThread(0);
}
else{//获得互斥锁且缓冲区有数据,开始处理
result=Buffer[head];
if(result>64&&result<90){
result=result+32;
cout<<"Consumer c2:(大写->小写)\t"<<result<<endl;
Buffer[head]='^';
cout<<"'^'表示数据已被消费"<<endl;
display(Buffer);
}
if(result>96&&result<102){
result=result-32;
cout<<"Consumer c2:(小写->大写)\t"<<result<<endl;
Buffer[head]='^';
cout<<"'^'表示数据已被消费"<<endl;
display(Buffer);}
if(result>47&&result<58){
cout<<"Consumed c2:(显示字符)\t"<<result<<endl;
Buffer[head]='^';
cout<<"'^'表示数据已被消费"<<endl;
display(Buffer);}
if(result>32&&result<48){
cout<<"Consumer c2:(用符号打印出菱形)"<<endl;
for(i=1;i<=(9+1)/2;i++)
{
for(j=1;j<=40-i;j++)
cout<<"";
for(k=1;k<=2*i-1;k++)
cout<<result;
cout<<endl;
}
for(i=1;i<=9/2;i++)
{
for(j=1;j<=40-(9+1)/2+i;j++)
cout<<"";
for(k=1;k<=9-2*i;k++)
cout<<result;
cout<<endl;
}
Buffer[head]='^';
cout<<"'^'表示数据已被消费"<<endl;
display(Buffer);
}
head=(head+1)%BufferSize;
count--;
cout<<"按ENTER继续...."<<endl;
ch=getchar();
ReleaseMutex(hMutex);//结束临界区
PulseEvent(hNotFullEvent);//唤醒生产者线程
}
}
}
}
//////////////////////////////////////////////////////////////////
//c3
void c3_Consumer()
{
int i,j,k;
char result,ch;
while(1){
WaitForSingleObject(hMutex,INFINITE);
if(count==0){//没有可以处理的数据
ReleaseMutex(hMutex);//释放互斥锁且等待
//等待直到缓冲区非空
WaitForSingleObject(hNotEmptyEvent,INFINITE);
}
else{if(Buffer[head]==0){
cout<<"Consumer 0:缓冲区的数据已全消费过一次,消费完毕!"<<endl;
ReleaseMutex(hMutex);//结束临界区
ExitThread(0);
}
else{//获得互斥锁且缓冲区有数据,开始处理
result=Buffer[head];
if(result>64&&result<70){
result=result+32;
cout<<"Consumer c3:(大写->小写)\t"<<result<<endl;
Buffer[head]='^';
cout<<"'^'表示数据已被消费"<<endl;
display(Buffer);}
if(result>96&&result<102){
result=result-32;
cout<<"Consumer c3:(小写->大写)\t"<<result<<endl;
Buffer[head]='^';
cout<<"'^'表示数据已被消费"<<endl;
display(Buffer);}
if(result>47&&result<58){
cout<<"Consumer c1:(显示字符)\t"<<result<<endl;
Buffer[head]='^';
cout<<"'^'表示数据已被消费"<<endl;
display(Buffer);
}
if(result>32&&result<48){
cout<<"Consumer c3:(用符号打印出菱形)"<<endl;
for(i=1;i<=(7+1)/2;i++)
{
for(j=1;j<=40-i;j++)
cout<<"";
for(k=1;k<=2*i-1;k++)
cout<<result;
cout<<endl;
}
for(i=1;i<=7/2;i++)
{
for(j=1;j<=40-(7+1)/2+i;j++)
cout<<"";
for(k=1;k<=7-2*i;k++)
cout<<result;
cout<<endl;
}
Buffer[head]='^';
cout<<"'^'表示数据已被消费"<<endl;
display(Buffer);
}
head=(head+1)%BufferSize;
count--;
cout<<"按ENTER继续...."<<endl;
ch=getchar();
ReleaseMutex(hMutex);//结束临界区
PulseEvent(hNotFullEvent);//唤醒生产者线程
}
}
}
}
//////////////////////////////////////////////////////////////////
//主函数
void main()
{
HANDLE hThreadVector[6];
DWORD ThreadID;
count= 0;
head= 0;
tail= 0;
hMutex=CreateMutex(NULL,FALSE,NULL);
hNotFullEvent=CreateEvent(NULL,TRUE,FALSE,NULL);
hNotEmptyEvent=CreateEvent(NULL,TRUE,FALSE,NULL);
hThreadVector[0]=CreateThread(NULL, 0,(LPTHREAD_START_ROUTINE) p1_Producer,NULL, 0,(LPDWORD)&ThreadID);
hThreadVector[1]=CreateThread(NULL, 0,(LPTHREAD_START_ROUTINE) c1_Consumer,NULL, 0,(LPDWORD)&ThreadID);
hThreadVector[3]=CreateThread(NULL, 0,(LPTHREAD_START_ROUTINE) p2_Producer,NULL, 0,(LPDWORD)&ThreadID);
hThreadVector[4]=CreateThread(NULL, 0,(LPTHREAD_START_ROUTINE) c2_Consumer,NULL, 0,(LPDWORD)&ThreadID);
hThreadVector[5]=CreateThread(NULL, 0,(LPTHREAD_START_ROUTINE) p3_Producer,NULL, 0,(LPDWORD)&ThreadID);
hThreadVector[5]=CreateThread(NULL, 0,(LPTHREAD_START_ROUTINE) c3_Consumer,NULL, 0,(LPDWORD)&ThreadID);
WaitForMultipleObjects(2,hThreadVector,TRUE,INFINITE);
//cout<<"**********************Finish*************************"<<endl;
}
我最近也在学操作系统,PV好麻烦的