生产者 消费者 linux(生产者消费者进程)

大家好,今天小编来为大家解答以下的问题,关于生产者 消费者 linux,生产者消费者进程这个很多人还不知道,现在让我们一起来看看吧!

kafka消费者如何在linux命令行后台执行

Kafka是一个开源流处理平台,用于实时数据处理,由Apache软件基金会开发,使用Scala和Java编写。它提供了一个统一、高吞吐、低延迟的处理实时数据的平台。Kafka的持久化层基于分布式事务日志架构,作为大规模发布/订阅消息队列,使其成为处理流失数据的企业级基础设施。本文将指导如何在Linux命令行后台通过Docker容器部署Kafka。

部署Kafka的过程分为以下几个步骤:

1.**基础环境准备**:

在Ubuntu 22.04.3 LTS虚拟机上安装Docker。检查Docker是否已安装,若未安装,使用命令安装Docker。确保Linux发行版支持Kafka部署。

2.**安装Zookeeper**:

Zookeeper是Kafka依赖的服务,为Kafka提供分布式协调服务。通过Docker拉取并安装Zookeeper集群。执行命令自动拉取Zookeeper镜像。

3.**安装Kafka**:

在成功安装Zookeeper后,使用Docker拉取并安装Kafka组件。根据服务器实际IP地址和自定义的Topic名称调整命令参数。

4.**进入容器并启动生产者和消费者**:

完成Kafka安装后,进入容器内部,启动生产者和消费者脚本。通过命令验证Kafka功能是否正常。在容器中执行生产者脚本,配置Topic名称;在新终端中执行消费者脚本,使用已建立的Topic名称。

5.**生产者与消费者测试**:

在生产者窗口连续输入信息,切换至消费者窗口查看接收情况。正常情况下,生产者发送的信息应能被消费者接收。

6.**故障排查**:

若在部署过程中遇到错误或问题,通过Docker日志进行故障排查。检查容器日志获取问题信息,定位问题所在。

部署Kafka的总体步骤如下:

-首先检查Docker是否正常安装。

-安装Kafka依赖服务Zookeeper。

-安装Kafka组件。

-在容器内启动生产者和消费者脚本。

-在部署过程中,通过Docker日志进行问题排查。

遵循以上步骤,可以顺利部署Kafka并在Linux命令行后台进行实时数据处理。

生产者消费者问题--进程

#i nclude<stdio.h>

#i nclude< iostream.h>

#i nclude< windows.h>

#define BufferSize 15

char Buffer[BufferSize];

int head,tail=0;//Buffer数组下标

int count;//被使用的缓冲区数量

HANDLE hMutex;

HANDLE hNotFullEvent, hNotEmptyEvent;//用来同步生产者和消费者线程

////////缓冲区存储情况

display(char a[15])

{

int i;

cout<<"缓冲区存储情况为:"<<endl;

for(i=14;i>=0;i--){

cout<<"\t|----"<<a<<"----|"<<endl;

}

}

//p1

void p1_Producer()

{

int i;

char ch;

char p1[]={'a','A','b','B','c','C','D','d','E','e'};

if(tail<15){

for(i=0;i<10;i++){

while(1){

WaitForSingleObject(hMutex,INFINITE);

if(count==BufferSize){//缓冲区满

ReleaseMutex(hMutex);

//等待直到缓冲区非满

WaitForSingleObject(hNotFullEvent,INFINITE);

continue;

}

//得到互斥锁且缓冲区非满,跳出while循环

break;

}

if(tail>14){

cout<<"缓冲区已满,不能再存入数据!"<<endl;

ReleaseMutex(hMutex);//结束临界区

PulseEvent(hNotEmptyEvent);//唤醒消费者线程

}

else{

//得到互斥锁且缓冲区非满,开始产生新数据

cout<<"Producer p1:\t"<<p1<<endl;

Buffer[tail]=p1;

//tail=(tail+1)%BufferSize;///存放于缓冲区的位置

display(Buffer);

tail++;

count++;

cout<<"按ENTER继续...."<<endl;

ch=getchar();

ReleaseMutex(hMutex);//结束临界区

PulseEvent(hNotEmptyEvent);//唤醒消费者线程

}

}

}

}

//////////////////////////////////////////////////////////////////

//p2

void p2_Producer()

{

int i;

char ch;

char p2[]={'0','1','2','3','4','5','6','7','8','9'};

if(tail<15){

for(i=0;i<10;i++){

while(1){

ch=getchar();

WaitForSingleObject(hMutex,INFINITE);

if(count==BufferSize){//缓冲区满

ReleaseMutex(hMutex);

//等待直到缓冲区非满

WaitForSingleObject(hNotFullEvent,INFINITE);

continue;

}

//得到互斥锁且缓冲区非满,跳出while循环

break;

}

if(tail>14){

cout<<"缓冲区已满,不能再存入数据!程序结束!"<<endl;

ReleaseMutex(hMutex);//结束临界区

PulseEvent(hNotEmptyEvent);//唤醒消费者线程

}

else{

//得到互斥锁且缓冲区非满,开始产生新数据

cout<<"Producer p2:\t"<<p2<<endl;

Buffer[tail]=p2;

//tail=(tail+1)%BufferSize;

display(Buffer);

tail++;

count++;

cout<<"按ENTER继续...."<<endl;

ch=getchar();

ReleaseMutex(hMutex);//结束临界区

PulseEvent(hNotEmptyEvent);//唤醒消费者线程

}

}

}

}

//////////////////////////////////////////////////////////////////

//p3

void p3_Producer()

{

int i;

char ch;

char p3[]={'!','#','$','%','&','*','+','-','.','/'};

if(tail<15){

for(i=0;i<10;i++){

while(1){

ch=getchar();

WaitForSingleObject(hMutex,INFINITE);

if(count==BufferSize){//缓冲区满

ReleaseMutex(hMutex);

//等待直到缓冲区非满

WaitForSingleObject(hNotFullEvent,INFINITE);

continue;

}

//得到互斥锁且缓冲区非满,跳出while循环

break;

}

if(tail>14){

cout<<"缓冲区已满,不能再存入数据!程序结束!"<<endl;

ReleaseMutex(hMutex);//结束临界区

PulseEvent(hNotEmptyEvent);//唤醒消费者线程

}

else{

//得到互斥锁且缓冲区非满,开始产生新数据

cout<<"Producer p3:\t"<<p3<<endl;

Buffer[tail]=p3;

//tail=(tail+1)%BufferSize;

display(Buffer);

tail++;

count++;

cout<<"按ENTER继续...."<<endl;

ch=getchar();

ReleaseMutex(hMutex);//结束临界区

PulseEvent(hNotEmptyEvent);//唤醒消费者线程

}

}

}

}

//////////////////////////////////////////////////////////////////

//c1

void c1_Consumer()

{

int i,j,k;

char result,ch;

while(1){

ch=getchar();

WaitForSingleObject(hMutex,INFINITE);

if(count==0){//没有可以处理的数据

ReleaseMutex(hMutex);//释放互斥锁且等待

//等待直到缓冲区非空

WaitForSingleObject(hNotEmptyEvent,INFINITE);

}

else{if(Buffer[head]==0){

cout<<"Consumer 0:缓冲区的数据已全消费过一次,消费完毕!"<<endl;

ReleaseMutex(hMutex);//结束临界区

ExitThread(0);

}

else{//获得互斥锁且缓冲区有数据,开始处理

result=Buffer[head];

if(result>64&&result<70){

result=result+32;

cout<<"Consumer c1:(大写->小写)\t"<<result<<endl;

Buffer[head]='^';//'^'表示数据已被消费

cout<<"'^'表示数据已被消费"<<endl;

display(Buffer);

}

if(result>96&&result<102){

result=result-32;

cout<<"Consumer c1:(小写->大写)\t"<<result<<endl;

Buffer[head]='^';

cout<<"'^'表示数据已被消费"<<endl;

display(Buffer);

}

if(result>47&&result<58){

cout<<"Consumer c1:(显示字符)\t"<<result<<endl;

Buffer[head]='^';

cout<<"'^'表示数据已被消费"<<endl;

display(Buffer);}

if(result>32&&result<48){

cout<<"Consumer c1:(用符号打印出菱形)"<<endl;

for(i=1;i<=(9+1)/2;i++)

{

for(j=1;j<=40-i;j++)

cout<<"";

for(k=1;k<=2*i-1;k++)

cout<<result;

cout<<endl;

}

for(i=1;i<=9/2;i++)

{

for(j=1;j<=40-(9+1)/2+i;j++)

cout<<"";

for(k=1;k<=9-2*i;k++)

cout<<result;

cout<<endl;

}

Buffer[head]='^';

cout<<"'^'表示数据已被消费"<<endl;

display(Buffer);

}

head=(head+1)%BufferSize;

count--;

cout<<"按ENTER继续...."<<endl;

ch=getchar();

ReleaseMutex(hMutex);//结束临界区

PulseEvent(hNotFullEvent);//唤醒生产者线程

}

}

}

}

//////////////////////////////////////////////////////////////////

//c2

void c2_Consumer()

{

int i,j,k;

char result,ch;

while(1){

WaitForSingleObject(hMutex,INFINITE);

if(count==0){//没有可以处理的数据

ReleaseMutex(hMutex);//释放互斥锁且等待

//等待直到缓冲区非空

WaitForSingleObject(hNotEmptyEvent,INFINITE);

}

else{if(Buffer[head]==0){

cout<<"Consumer 0:缓冲区的数据已全消费过一次,消费完毕!"<<endl;

ReleaseMutex(hMutex);//结束临界区

ExitThread(0);

}

else{//获得互斥锁且缓冲区有数据,开始处理

result=Buffer[head];

if(result>64&&result<90){

result=result+32;

cout<<"Consumer c2:(大写->小写)\t"<<result<<endl;

Buffer[head]='^';

cout<<"'^'表示数据已被消费"<<endl;

display(Buffer);

}

if(result>96&&result<102){

result=result-32;

cout<<"Consumer c2:(小写->大写)\t"<<result<<endl;

Buffer[head]='^';

cout<<"'^'表示数据已被消费"<<endl;

display(Buffer);}

if(result>47&&result<58){

cout<<"Consumed c2:(显示字符)\t"<<result<<endl;

Buffer[head]='^';

cout<<"'^'表示数据已被消费"<<endl;

display(Buffer);}

if(result>32&&result<48){

cout<<"Consumer c2:(用符号打印出菱形)"<<endl;

for(i=1;i<=(9+1)/2;i++)

{

for(j=1;j<=40-i;j++)

cout<<"";

for(k=1;k<=2*i-1;k++)

cout<<result;

cout<<endl;

}

for(i=1;i<=9/2;i++)

{

for(j=1;j<=40-(9+1)/2+i;j++)

cout<<"";

for(k=1;k<=9-2*i;k++)

cout<<result;

cout<<endl;

}

Buffer[head]='^';

cout<<"'^'表示数据已被消费"<<endl;

display(Buffer);

}

head=(head+1)%BufferSize;

count--;

cout<<"按ENTER继续...."<<endl;

ch=getchar();

ReleaseMutex(hMutex);//结束临界区

PulseEvent(hNotFullEvent);//唤醒生产者线程

}

}

}

}

//////////////////////////////////////////////////////////////////

//c3

void c3_Consumer()

{

int i,j,k;

char result,ch;

while(1){

WaitForSingleObject(hMutex,INFINITE);

if(count==0){//没有可以处理的数据

ReleaseMutex(hMutex);//释放互斥锁且等待

//等待直到缓冲区非空

WaitForSingleObject(hNotEmptyEvent,INFINITE);

}

else{if(Buffer[head]==0){

cout<<"Consumer 0:缓冲区的数据已全消费过一次,消费完毕!"<<endl;

ReleaseMutex(hMutex);//结束临界区

ExitThread(0);

}

else{//获得互斥锁且缓冲区有数据,开始处理

result=Buffer[head];

if(result>64&&result<70){

result=result+32;

cout<<"Consumer c3:(大写->小写)\t"<<result<<endl;

Buffer[head]='^';

cout<<"'^'表示数据已被消费"<<endl;

display(Buffer);}

if(result>96&&result<102){

result=result-32;

cout<<"Consumer c3:(小写->大写)\t"<<result<<endl;

Buffer[head]='^';

cout<<"'^'表示数据已被消费"<<endl;

display(Buffer);}

if(result>47&&result<58){

cout<<"Consumer c1:(显示字符)\t"<<result<<endl;

Buffer[head]='^';

cout<<"'^'表示数据已被消费"<<endl;

display(Buffer);

}

if(result>32&&result<48){

cout<<"Consumer c3:(用符号打印出菱形)"<<endl;

for(i=1;i<=(7+1)/2;i++)

{

for(j=1;j<=40-i;j++)

cout<<"";

for(k=1;k<=2*i-1;k++)

cout<<result;

cout<<endl;

}

for(i=1;i<=7/2;i++)

{

for(j=1;j<=40-(7+1)/2+i;j++)

cout<<"";

for(k=1;k<=7-2*i;k++)

cout<<result;

cout<<endl;

}

Buffer[head]='^';

cout<<"'^'表示数据已被消费"<<endl;

display(Buffer);

}

head=(head+1)%BufferSize;

count--;

cout<<"按ENTER继续...."<<endl;

ch=getchar();

ReleaseMutex(hMutex);//结束临界区

PulseEvent(hNotFullEvent);//唤醒生产者线程

}

}

}

}

//////////////////////////////////////////////////////////////////

//主函数

void main()

{

HANDLE hThreadVector[6];

DWORD ThreadID;

count= 0;

head= 0;

tail= 0;

hMutex=CreateMutex(NULL,FALSE,NULL);

hNotFullEvent=CreateEvent(NULL,TRUE,FALSE,NULL);

hNotEmptyEvent=CreateEvent(NULL,TRUE,FALSE,NULL);

hThreadVector[0]=CreateThread(NULL, 0,(LPTHREAD_START_ROUTINE) p1_Producer,NULL, 0,(LPDWORD)&ThreadID);

hThreadVector[1]=CreateThread(NULL, 0,(LPTHREAD_START_ROUTINE) c1_Consumer,NULL, 0,(LPDWORD)&ThreadID);

hThreadVector[3]=CreateThread(NULL, 0,(LPTHREAD_START_ROUTINE) p2_Producer,NULL, 0,(LPDWORD)&ThreadID);

hThreadVector[4]=CreateThread(NULL, 0,(LPTHREAD_START_ROUTINE) c2_Consumer,NULL, 0,(LPDWORD)&ThreadID);

hThreadVector[5]=CreateThread(NULL, 0,(LPTHREAD_START_ROUTINE) p3_Producer,NULL, 0,(LPDWORD)&ThreadID);

hThreadVector[5]=CreateThread(NULL, 0,(LPTHREAD_START_ROUTINE) c3_Consumer,NULL, 0,(LPDWORD)&ThreadID);

WaitForMultipleObjects(2,hThreadVector,TRUE,INFINITE);

//cout<<"**********************Finish*************************"<<endl;

}

我最近也在学操作系统,PV好麻烦的

进程的同步与互斥实验报告Linux

相交进程之间的关系主要有两种,同步与互斥。所谓互斥,是指散步在不同进程之间的若干程序片断,当某个进程运行其中一个程序片段时,其它进程就不能运行它们之中的任一程序片段,只能等到该进程运行完这个程序片段后才可以运行。所谓同步,是指散步在不同进程之间的若干程序片断,它们的运行必须严格按照规定的某种先后次序来运行,这种先后次序依赖于要完成的特定的任务。

显然,同步是一种更为复杂的互斥,而互斥是一种特殊的同步。

也就是说互斥是两个线程之间不可以同时运行,他们会相互排斥,必须等待一个线程运行完毕,另一个才能运行,而同步也是不能同时运行,但他是必须要安照某种次序来运行相应的线程(也是一种互斥)!

总结:互斥:是指某一资源同时只允许一个访问者对其进行访问,具有唯一性和排它性。但互斥无法限制访问者对资源的访问顺序,即访问是无序的。

同步:是指在互斥的基础上(大多数情况),通过其它机制实现访问者对资源的有序访问。在大多数情况下,同步已经实现了互斥,特别是所有写入资源的情况必定是互斥的。少数情况是指可以允许多个访问者同时访问资源

阅读剩余
THE END