linux生产者消费者,kafka生产者和消费者

大家好,今天小编来为大家解答以下的问题,关于linux生产者消费者,kafka生产者和消费者这个很多人还不知道,现在让我们一起来看看吧!

进程的同步与互斥实验报告Linux

相交进程之间的关系主要有两种,同步与互斥。所谓互斥,是指散步在不同进程之间的若干程序片断,当某个进程运行其中一个程序片段时,其它进程就不能运行它们之中的任一程序片段,只能等到该进程运行完这个程序片段后才可以运行。所谓同步,是指散步在不同进程之间的若干程序片断,它们的运行必须严格按照规定的某种先后次序来运行,这种先后次序依赖于要完成的特定的任务。

显然,同步是一种更为复杂的互斥,而互斥是一种特殊的同步。

也就是说互斥是两个线程之间不可以同时运行,他们会相互排斥,必须等待一个线程运行完毕,另一个才能运行,而同步也是不能同时运行,但他是必须要安照某种次序来运行相应的线程(也是一种互斥)!

总结:互斥:是指某一资源同时只允许一个访问者对其进行访问,具有唯一性和排它性。但互斥无法限制访问者对资源的访问顺序,即访问是无序的。

同步:是指在互斥的基础上(大多数情况),通过其它机制实现访问者对资源的有序访问。在大多数情况下,同步已经实现了互斥,特别是所有写入资源的情况必定是互斥的。少数情况是指可以允许多个访问者同时访问资源

kfifo(linux kernel 无锁队列)

队列作为常见数据结构,其主要特点是先进先出(FIFO)。FIFO用于缓冲通信速度不匹配场景,例如生产者快速生成数据,消费者则难以及时处理。在通信接口驱动中,数据被暂存于队列中,驱动程序可以立即返回接收新数据或等待,而解析程序仅需从队列中获取数据。FIFO也可解决“多生产者-单消费者”问题。

kfifo是Linux内核中实现队列功能的一种无锁环形队列。无锁意味着在单生产者单消费者场景下无需加锁操作。它通过使用in和out两个变量作为入队和出队索引来实现无锁操作,避免了在多个生产者或消费者场景下的加锁需求。通过将in/out与fifo的空间大小减一进行“与”操作,kfifo实现了比取余操作更快的队列循环。这意味着当in和out超过队列大小时,它们会继续向前加,而不是简单地减去队列大小后重新开始。

kfifo实现“无锁”特性仅针对“单生产者-单消费者”场景。对于“多生产者”或“多消费者”情况,需对入队或出队操作进行加锁以避免数据竞争。

使用kfifo有两种方式:动态申请和静态定义。动态申请包括包含头文件、定义结构体、申请内存、执行入队和出队操作,最后释放内存。静态定义则在定义fifo变量时使用宏,操作函数更加简洁,无需内存管理步骤。

kfifo结构体包含用于管理队列的变量,如in、out、mask等。内存申请过程确保了用mask大小掩码与in/out索引进行“与”操作,实现队列循环,避免了取余运算的开销。使用kfifo_alloc动态申请内存时,最终分配的内存空间是向上取2的次方,以支持mask大小掩码操作。这可能导致使用者在不了解规则的情况下踩坑,例如,申请100字节内存时实际上分配了128字节,可能导致数据错位。

入队操作涉及确定内存地址、拷贝数据并确保内存屏障以避免乱序异常。整个过程高效且简洁。对于更深入的了解,可查阅kfifo的源码。

生产者消费者问题--进程

#i nclude<stdio.h>

#i nclude< iostream.h>

#i nclude< windows.h>

#define BufferSize 15

char Buffer[BufferSize];

int head,tail=0;//Buffer数组下标

int count;//被使用的缓冲区数量

HANDLE hMutex;

HANDLE hNotFullEvent, hNotEmptyEvent;//用来同步生产者和消费者线程

////////缓冲区存储情况

display(char a[15])

{

int i;

cout<<"缓冲区存储情况为:"<<endl;

for(i=14;i>=0;i--){

cout<<"\t|----"<<a<<"----|"<<endl;

}

}

//p1

void p1_Producer()

{

int i;

char ch;

char p1[]={'a','A','b','B','c','C','D','d','E','e'};

if(tail<15){

for(i=0;i<10;i++){

while(1){

WaitForSingleObject(hMutex,INFINITE);

if(count==BufferSize){//缓冲区满

ReleaseMutex(hMutex);

//等待直到缓冲区非满

WaitForSingleObject(hNotFullEvent,INFINITE);

continue;

}

//得到互斥锁且缓冲区非满,跳出while循环

break;

}

if(tail>14){

cout<<"缓冲区已满,不能再存入数据!"<<endl;

ReleaseMutex(hMutex);//结束临界区

PulseEvent(hNotEmptyEvent);//唤醒消费者线程

}

else{

//得到互斥锁且缓冲区非满,开始产生新数据

cout<<"Producer p1:\t"<<p1<<endl;

Buffer[tail]=p1;

//tail=(tail+1)%BufferSize;///存放于缓冲区的位置

display(Buffer);

tail++;

count++;

cout<<"按ENTER继续...."<<endl;

ch=getchar();

ReleaseMutex(hMutex);//结束临界区

PulseEvent(hNotEmptyEvent);//唤醒消费者线程

}

}

}

}

//////////////////////////////////////////////////////////////////

//p2

void p2_Producer()

{

int i;

char ch;

char p2[]={'0','1','2','3','4','5','6','7','8','9'};

if(tail<15){

for(i=0;i<10;i++){

while(1){

ch=getchar();

WaitForSingleObject(hMutex,INFINITE);

if(count==BufferSize){//缓冲区满

ReleaseMutex(hMutex);

//等待直到缓冲区非满

WaitForSingleObject(hNotFullEvent,INFINITE);

continue;

}

//得到互斥锁且缓冲区非满,跳出while循环

break;

}

if(tail>14){

cout<<"缓冲区已满,不能再存入数据!程序结束!"<<endl;

ReleaseMutex(hMutex);//结束临界区

PulseEvent(hNotEmptyEvent);//唤醒消费者线程

}

else{

//得到互斥锁且缓冲区非满,开始产生新数据

cout<<"Producer p2:\t"<<p2<<endl;

Buffer[tail]=p2;

//tail=(tail+1)%BufferSize;

display(Buffer);

tail++;

count++;

cout<<"按ENTER继续...."<<endl;

ch=getchar();

ReleaseMutex(hMutex);//结束临界区

PulseEvent(hNotEmptyEvent);//唤醒消费者线程

}

}

}

}

//////////////////////////////////////////////////////////////////

//p3

void p3_Producer()

{

int i;

char ch;

char p3[]={'!','#','$','%','&','*','+','-','.','/'};

if(tail<15){

for(i=0;i<10;i++){

while(1){

ch=getchar();

WaitForSingleObject(hMutex,INFINITE);

if(count==BufferSize){//缓冲区满

ReleaseMutex(hMutex);

//等待直到缓冲区非满

WaitForSingleObject(hNotFullEvent,INFINITE);

continue;

}

//得到互斥锁且缓冲区非满,跳出while循环

break;

}

if(tail>14){

cout<<"缓冲区已满,不能再存入数据!程序结束!"<<endl;

ReleaseMutex(hMutex);//结束临界区

PulseEvent(hNotEmptyEvent);//唤醒消费者线程

}

else{

//得到互斥锁且缓冲区非满,开始产生新数据

cout<<"Producer p3:\t"<<p3<<endl;

Buffer[tail]=p3;

//tail=(tail+1)%BufferSize;

display(Buffer);

tail++;

count++;

cout<<"按ENTER继续...."<<endl;

ch=getchar();

ReleaseMutex(hMutex);//结束临界区

PulseEvent(hNotEmptyEvent);//唤醒消费者线程

}

}

}

}

//////////////////////////////////////////////////////////////////

//c1

void c1_Consumer()

{

int i,j,k;

char result,ch;

while(1){

ch=getchar();

WaitForSingleObject(hMutex,INFINITE);

if(count==0){//没有可以处理的数据

ReleaseMutex(hMutex);//释放互斥锁且等待

//等待直到缓冲区非空

WaitForSingleObject(hNotEmptyEvent,INFINITE);

}

else{if(Buffer[head]==0){

cout<<"Consumer 0:缓冲区的数据已全消费过一次,消费完毕!"<<endl;

ReleaseMutex(hMutex);//结束临界区

ExitThread(0);

}

else{//获得互斥锁且缓冲区有数据,开始处理

result=Buffer[head];

if(result>64&&result<70){

result=result+32;

cout<<"Consumer c1:(大写->小写)\t"<<result<<endl;

Buffer[head]='^';//'^'表示数据已被消费

cout<<"'^'表示数据已被消费"<<endl;

display(Buffer);

}

if(result>96&&result<102){

result=result-32;

cout<<"Consumer c1:(小写->大写)\t"<<result<<endl;

Buffer[head]='^';

cout<<"'^'表示数据已被消费"<<endl;

display(Buffer);

}

if(result>47&&result<58){

cout<<"Consumer c1:(显示字符)\t"<<result<<endl;

Buffer[head]='^';

cout<<"'^'表示数据已被消费"<<endl;

display(Buffer);}

if(result>32&&result<48){

cout<<"Consumer c1:(用符号打印出菱形)"<<endl;

for(i=1;i<=(9+1)/2;i++)

{

for(j=1;j<=40-i;j++)

cout<<"";

for(k=1;k<=2*i-1;k++)

cout<<result;

cout<<endl;

}

for(i=1;i<=9/2;i++)

{

for(j=1;j<=40-(9+1)/2+i;j++)

cout<<"";

for(k=1;k<=9-2*i;k++)

cout<<result;

cout<<endl;

}

Buffer[head]='^';

cout<<"'^'表示数据已被消费"<<endl;

display(Buffer);

}

head=(head+1)%BufferSize;

count--;

cout<<"按ENTER继续...."<<endl;

ch=getchar();

ReleaseMutex(hMutex);//结束临界区

PulseEvent(hNotFullEvent);//唤醒生产者线程

}

}

}

}

//////////////////////////////////////////////////////////////////

//c2

void c2_Consumer()

{

int i,j,k;

char result,ch;

while(1){

WaitForSingleObject(hMutex,INFINITE);

if(count==0){//没有可以处理的数据

ReleaseMutex(hMutex);//释放互斥锁且等待

//等待直到缓冲区非空

WaitForSingleObject(hNotEmptyEvent,INFINITE);

}

else{if(Buffer[head]==0){

cout<<"Consumer 0:缓冲区的数据已全消费过一次,消费完毕!"<<endl;

ReleaseMutex(hMutex);//结束临界区

ExitThread(0);

}

else{//获得互斥锁且缓冲区有数据,开始处理

result=Buffer[head];

if(result>64&&result<90){

result=result+32;

cout<<"Consumer c2:(大写->小写)\t"<<result<<endl;

Buffer[head]='^';

cout<<"'^'表示数据已被消费"<<endl;

display(Buffer);

}

if(result>96&&result<102){

result=result-32;

cout<<"Consumer c2:(小写->大写)\t"<<result<<endl;

Buffer[head]='^';

cout<<"'^'表示数据已被消费"<<endl;

display(Buffer);}

if(result>47&&result<58){

cout<<"Consumed c2:(显示字符)\t"<<result<<endl;

Buffer[head]='^';

cout<<"'^'表示数据已被消费"<<endl;

display(Buffer);}

if(result>32&&result<48){

cout<<"Consumer c2:(用符号打印出菱形)"<<endl;

for(i=1;i<=(9+1)/2;i++)

{

for(j=1;j<=40-i;j++)

cout<<"";

for(k=1;k<=2*i-1;k++)

cout<<result;

cout<<endl;

}

for(i=1;i<=9/2;i++)

{

for(j=1;j<=40-(9+1)/2+i;j++)

cout<<"";

for(k=1;k<=9-2*i;k++)

cout<<result;

cout<<endl;

}

Buffer[head]='^';

cout<<"'^'表示数据已被消费"<<endl;

display(Buffer);

}

head=(head+1)%BufferSize;

count--;

cout<<"按ENTER继续...."<<endl;

ch=getchar();

ReleaseMutex(hMutex);//结束临界区

PulseEvent(hNotFullEvent);//唤醒生产者线程

}

}

}

}

//////////////////////////////////////////////////////////////////

//c3

void c3_Consumer()

{

int i,j,k;

char result,ch;

while(1){

WaitForSingleObject(hMutex,INFINITE);

if(count==0){//没有可以处理的数据

ReleaseMutex(hMutex);//释放互斥锁且等待

//等待直到缓冲区非空

WaitForSingleObject(hNotEmptyEvent,INFINITE);

}

else{if(Buffer[head]==0){

cout<<"Consumer 0:缓冲区的数据已全消费过一次,消费完毕!"<<endl;

ReleaseMutex(hMutex);//结束临界区

ExitThread(0);

}

else{//获得互斥锁且缓冲区有数据,开始处理

result=Buffer[head];

if(result>64&&result<70){

result=result+32;

cout<<"Consumer c3:(大写->小写)\t"<<result<<endl;

Buffer[head]='^';

cout<<"'^'表示数据已被消费"<<endl;

display(Buffer);}

if(result>96&&result<102){

result=result-32;

cout<<"Consumer c3:(小写->大写)\t"<<result<<endl;

Buffer[head]='^';

cout<<"'^'表示数据已被消费"<<endl;

display(Buffer);}

if(result>47&&result<58){

cout<<"Consumer c1:(显示字符)\t"<<result<<endl;

Buffer[head]='^';

cout<<"'^'表示数据已被消费"<<endl;

display(Buffer);

}

if(result>32&&result<48){

cout<<"Consumer c3:(用符号打印出菱形)"<<endl;

for(i=1;i<=(7+1)/2;i++)

{

for(j=1;j<=40-i;j++)

cout<<"";

for(k=1;k<=2*i-1;k++)

cout<<result;

cout<<endl;

}

for(i=1;i<=7/2;i++)

{

for(j=1;j<=40-(7+1)/2+i;j++)

cout<<"";

for(k=1;k<=7-2*i;k++)

cout<<result;

cout<<endl;

}

Buffer[head]='^';

cout<<"'^'表示数据已被消费"<<endl;

display(Buffer);

}

head=(head+1)%BufferSize;

count--;

cout<<"按ENTER继续...."<<endl;

ch=getchar();

ReleaseMutex(hMutex);//结束临界区

PulseEvent(hNotFullEvent);//唤醒生产者线程

}

}

}

}

//////////////////////////////////////////////////////////////////

//主函数

void main()

{

HANDLE hThreadVector[6];

DWORD ThreadID;

count= 0;

head= 0;

tail= 0;

hMutex=CreateMutex(NULL,FALSE,NULL);

hNotFullEvent=CreateEvent(NULL,TRUE,FALSE,NULL);

hNotEmptyEvent=CreateEvent(NULL,TRUE,FALSE,NULL);

hThreadVector[0]=CreateThread(NULL, 0,(LPTHREAD_START_ROUTINE) p1_Producer,NULL, 0,(LPDWORD)&ThreadID);

hThreadVector[1]=CreateThread(NULL, 0,(LPTHREAD_START_ROUTINE) c1_Consumer,NULL, 0,(LPDWORD)&ThreadID);

hThreadVector[3]=CreateThread(NULL, 0,(LPTHREAD_START_ROUTINE) p2_Producer,NULL, 0,(LPDWORD)&ThreadID);

hThreadVector[4]=CreateThread(NULL, 0,(LPTHREAD_START_ROUTINE) c2_Consumer,NULL, 0,(LPDWORD)&ThreadID);

hThreadVector[5]=CreateThread(NULL, 0,(LPTHREAD_START_ROUTINE) p3_Producer,NULL, 0,(LPDWORD)&ThreadID);

hThreadVector[5]=CreateThread(NULL, 0,(LPTHREAD_START_ROUTINE) c3_Consumer,NULL, 0,(LPDWORD)&ThreadID);

WaitForMultipleObjects(2,hThreadVector,TRUE,INFINITE);

//cout<<"**********************Finish*************************"<<endl;

}

我最近也在学操作系统,PV好麻烦的

阅读剩余
THE END