生产者 消费者 linux(生产者消费者进程)
大家好,今天小编来为大家解答以下的问题,关于生产者 消费者 linux,生产者消费者进程这个很多人还不知道,现在让我们一起来看看吧!
kafka消费者如何在linux命令行后台执行
Kafka是一个开源流处理平台,用于实时数据处理,由Apache软件基金会开发,使用Scala和Java编写。它提供了一个统一、高吞吐、低延迟的处理实时数据的平台。Kafka的持久化层基于分布式事务日志架构,作为大规模发布/订阅消息队列,使其成为处理流失数据的企业级基础设施。本文将指导如何在Linux命令行后台通过Docker容器部署Kafka。
部署Kafka的过程分为以下几个步骤:
1.**基础环境准备**:
在Ubuntu 22.04.3 LTS虚拟机上安装Docker。检查Docker是否已安装,若未安装,使用命令安装Docker。确保Linux发行版支持Kafka部署。
2.**安装Zookeeper**:
Zookeeper是Kafka依赖的服务,为Kafka提供分布式协调服务。通过Docker拉取并安装Zookeeper集群。执行命令自动拉取Zookeeper镜像。
3.**安装Kafka**:
在成功安装Zookeeper后,使用Docker拉取并安装Kafka组件。根据服务器实际IP地址和自定义的Topic名称调整命令参数。
4.**进入容器并启动生产者和消费者**:
完成Kafka安装后,进入容器内部,启动生产者和消费者脚本。通过命令验证Kafka功能是否正常。在容器中执行生产者脚本,配置Topic名称;在新终端中执行消费者脚本,使用已建立的Topic名称。
5.**生产者与消费者测试**:
在生产者窗口连续输入信息,切换至消费者窗口查看接收情况。正常情况下,生产者发送的信息应能被消费者接收。
6.**故障排查**:
若在部署过程中遇到错误或问题,通过Docker日志进行故障排查。检查容器日志获取问题信息,定位问题所在。
部署Kafka的总体步骤如下:
-首先检查Docker是否正常安装。
-安装Kafka依赖服务Zookeeper。
-安装Kafka组件。
-在容器内启动生产者和消费者脚本。
-在部署过程中,通过Docker日志进行问题排查。
遵循以上步骤,可以顺利部署Kafka并在Linux命令行后台进行实时数据处理。
生产者消费者问题--进程
#i nclude<stdio.h>
#i nclude< iostream.h>
#i nclude< windows.h>
#define BufferSize 15
char Buffer[BufferSize];
int head,tail=0;//Buffer数组下标
int count;//被使用的缓冲区数量
HANDLE hMutex;
HANDLE hNotFullEvent, hNotEmptyEvent;//用来同步生产者和消费者线程
////////缓冲区存储情况
display(char a[15])
{
int i;
cout<<"缓冲区存储情况为:"<<endl;
for(i=14;i>=0;i--){
cout<<"\t|----"<<a<<"----|"<<endl;
}
}
//p1
void p1_Producer()
{
int i;
char ch;
char p1[]={'a','A','b','B','c','C','D','d','E','e'};
if(tail<15){
for(i=0;i<10;i++){
while(1){
WaitForSingleObject(hMutex,INFINITE);
if(count==BufferSize){//缓冲区满
ReleaseMutex(hMutex);
//等待直到缓冲区非满
WaitForSingleObject(hNotFullEvent,INFINITE);
continue;
}
//得到互斥锁且缓冲区非满,跳出while循环
break;
}
if(tail>14){
cout<<"缓冲区已满,不能再存入数据!"<<endl;
ReleaseMutex(hMutex);//结束临界区
PulseEvent(hNotEmptyEvent);//唤醒消费者线程
}
else{
//得到互斥锁且缓冲区非满,开始产生新数据
cout<<"Producer p1:\t"<<p1<<endl;
Buffer[tail]=p1;
//tail=(tail+1)%BufferSize;///存放于缓冲区的位置
display(Buffer);
tail++;
count++;
cout<<"按ENTER继续...."<<endl;
ch=getchar();
ReleaseMutex(hMutex);//结束临界区
PulseEvent(hNotEmptyEvent);//唤醒消费者线程
}
}
}
}
//////////////////////////////////////////////////////////////////
//p2
void p2_Producer()
{
int i;
char ch;
char p2[]={'0','1','2','3','4','5','6','7','8','9'};
if(tail<15){
for(i=0;i<10;i++){
while(1){
ch=getchar();
WaitForSingleObject(hMutex,INFINITE);
if(count==BufferSize){//缓冲区满
ReleaseMutex(hMutex);
//等待直到缓冲区非满
WaitForSingleObject(hNotFullEvent,INFINITE);
continue;
}
//得到互斥锁且缓冲区非满,跳出while循环
break;
}
if(tail>14){
cout<<"缓冲区已满,不能再存入数据!程序结束!"<<endl;
ReleaseMutex(hMutex);//结束临界区
PulseEvent(hNotEmptyEvent);//唤醒消费者线程
}
else{
//得到互斥锁且缓冲区非满,开始产生新数据
cout<<"Producer p2:\t"<<p2<<endl;
Buffer[tail]=p2;
//tail=(tail+1)%BufferSize;
display(Buffer);
tail++;
count++;
cout<<"按ENTER继续...."<<endl;
ch=getchar();
ReleaseMutex(hMutex);//结束临界区
PulseEvent(hNotEmptyEvent);//唤醒消费者线程
}
}
}
}
//////////////////////////////////////////////////////////////////
//p3
void p3_Producer()
{
int i;
char ch;
char p3[]={'!','#','$','%','&','*','+','-','.','/'};
if(tail<15){
for(i=0;i<10;i++){
while(1){
ch=getchar();
WaitForSingleObject(hMutex,INFINITE);
if(count==BufferSize){//缓冲区满
ReleaseMutex(hMutex);
//等待直到缓冲区非满
WaitForSingleObject(hNotFullEvent,INFINITE);
continue;
}
//得到互斥锁且缓冲区非满,跳出while循环
break;
}
if(tail>14){
cout<<"缓冲区已满,不能再存入数据!程序结束!"<<endl;
ReleaseMutex(hMutex);//结束临界区
PulseEvent(hNotEmptyEvent);//唤醒消费者线程
}
else{
//得到互斥锁且缓冲区非满,开始产生新数据
cout<<"Producer p3:\t"<<p3<<endl;
Buffer[tail]=p3;
//tail=(tail+1)%BufferSize;
display(Buffer);
tail++;
count++;
cout<<"按ENTER继续...."<<endl;
ch=getchar();
ReleaseMutex(hMutex);//结束临界区
PulseEvent(hNotEmptyEvent);//唤醒消费者线程
}
}
}
}
//////////////////////////////////////////////////////////////////
//c1
void c1_Consumer()
{
int i,j,k;
char result,ch;
while(1){
ch=getchar();
WaitForSingleObject(hMutex,INFINITE);
if(count==0){//没有可以处理的数据
ReleaseMutex(hMutex);//释放互斥锁且等待
//等待直到缓冲区非空
WaitForSingleObject(hNotEmptyEvent,INFINITE);
}
else{if(Buffer[head]==0){
cout<<"Consumer 0:缓冲区的数据已全消费过一次,消费完毕!"<<endl;
ReleaseMutex(hMutex);//结束临界区
ExitThread(0);
}
else{//获得互斥锁且缓冲区有数据,开始处理
result=Buffer[head];
if(result>64&&result<70){
result=result+32;
cout<<"Consumer c1:(大写->小写)\t"<<result<<endl;
Buffer[head]='^';//'^'表示数据已被消费
cout<<"'^'表示数据已被消费"<<endl;
display(Buffer);
}
if(result>96&&result<102){
result=result-32;
cout<<"Consumer c1:(小写->大写)\t"<<result<<endl;
Buffer[head]='^';
cout<<"'^'表示数据已被消费"<<endl;
display(Buffer);
}
if(result>47&&result<58){
cout<<"Consumer c1:(显示字符)\t"<<result<<endl;
Buffer[head]='^';
cout<<"'^'表示数据已被消费"<<endl;
display(Buffer);}
if(result>32&&result<48){
cout<<"Consumer c1:(用符号打印出菱形)"<<endl;
for(i=1;i<=(9+1)/2;i++)
{
for(j=1;j<=40-i;j++)
cout<<"";
for(k=1;k<=2*i-1;k++)
cout<<result;
cout<<endl;
}
for(i=1;i<=9/2;i++)
{
for(j=1;j<=40-(9+1)/2+i;j++)
cout<<"";
for(k=1;k<=9-2*i;k++)
cout<<result;
cout<<endl;
}
Buffer[head]='^';
cout<<"'^'表示数据已被消费"<<endl;
display(Buffer);
}
head=(head+1)%BufferSize;
count--;
cout<<"按ENTER继续...."<<endl;
ch=getchar();
ReleaseMutex(hMutex);//结束临界区
PulseEvent(hNotFullEvent);//唤醒生产者线程
}
}
}
}
//////////////////////////////////////////////////////////////////
//c2
void c2_Consumer()
{
int i,j,k;
char result,ch;
while(1){
WaitForSingleObject(hMutex,INFINITE);
if(count==0){//没有可以处理的数据
ReleaseMutex(hMutex);//释放互斥锁且等待
//等待直到缓冲区非空
WaitForSingleObject(hNotEmptyEvent,INFINITE);
}
else{if(Buffer[head]==0){
cout<<"Consumer 0:缓冲区的数据已全消费过一次,消费完毕!"<<endl;
ReleaseMutex(hMutex);//结束临界区
ExitThread(0);
}
else{//获得互斥锁且缓冲区有数据,开始处理
result=Buffer[head];
if(result>64&&result<90){
result=result+32;
cout<<"Consumer c2:(大写->小写)\t"<<result<<endl;
Buffer[head]='^';
cout<<"'^'表示数据已被消费"<<endl;
display(Buffer);
}
if(result>96&&result<102){
result=result-32;
cout<<"Consumer c2:(小写->大写)\t"<<result<<endl;
Buffer[head]='^';
cout<<"'^'表示数据已被消费"<<endl;
display(Buffer);}
if(result>47&&result<58){
cout<<"Consumed c2:(显示字符)\t"<<result<<endl;
Buffer[head]='^';
cout<<"'^'表示数据已被消费"<<endl;
display(Buffer);}
if(result>32&&result<48){
cout<<"Consumer c2:(用符号打印出菱形)"<<endl;
for(i=1;i<=(9+1)/2;i++)
{
for(j=1;j<=40-i;j++)
cout<<"";
for(k=1;k<=2*i-1;k++)
cout<<result;
cout<<endl;
}
for(i=1;i<=9/2;i++)
{
for(j=1;j<=40-(9+1)/2+i;j++)
cout<<"";
for(k=1;k<=9-2*i;k++)
cout<<result;
cout<<endl;
}
Buffer[head]='^';
cout<<"'^'表示数据已被消费"<<endl;
display(Buffer);
}
head=(head+1)%BufferSize;
count--;
cout<<"按ENTER继续...."<<endl;
ch=getchar();
ReleaseMutex(hMutex);//结束临界区
PulseEvent(hNotFullEvent);//唤醒生产者线程
}
}
}
}
//////////////////////////////////////////////////////////////////
//c3
void c3_Consumer()
{
int i,j,k;
char result,ch;
while(1){
WaitForSingleObject(hMutex,INFINITE);
if(count==0){//没有可以处理的数据
ReleaseMutex(hMutex);//释放互斥锁且等待
//等待直到缓冲区非空
WaitForSingleObject(hNotEmptyEvent,INFINITE);
}
else{if(Buffer[head]==0){
cout<<"Consumer 0:缓冲区的数据已全消费过一次,消费完毕!"<<endl;
ReleaseMutex(hMutex);//结束临界区
ExitThread(0);
}
else{//获得互斥锁且缓冲区有数据,开始处理
result=Buffer[head];
if(result>64&&result<70){
result=result+32;
cout<<"Consumer c3:(大写->小写)\t"<<result<<endl;
Buffer[head]='^';
cout<<"'^'表示数据已被消费"<<endl;
display(Buffer);}
if(result>96&&result<102){
result=result-32;
cout<<"Consumer c3:(小写->大写)\t"<<result<<endl;
Buffer[head]='^';
cout<<"'^'表示数据已被消费"<<endl;
display(Buffer);}
if(result>47&&result<58){
cout<<"Consumer c1:(显示字符)\t"<<result<<endl;
Buffer[head]='^';
cout<<"'^'表示数据已被消费"<<endl;
display(Buffer);
}
if(result>32&&result<48){
cout<<"Consumer c3:(用符号打印出菱形)"<<endl;
for(i=1;i<=(7+1)/2;i++)
{
for(j=1;j<=40-i;j++)
cout<<"";
for(k=1;k<=2*i-1;k++)
cout<<result;
cout<<endl;
}
for(i=1;i<=7/2;i++)
{
for(j=1;j<=40-(7+1)/2+i;j++)
cout<<"";
for(k=1;k<=7-2*i;k++)
cout<<result;
cout<<endl;
}
Buffer[head]='^';
cout<<"'^'表示数据已被消费"<<endl;
display(Buffer);
}
head=(head+1)%BufferSize;
count--;
cout<<"按ENTER继续...."<<endl;
ch=getchar();
ReleaseMutex(hMutex);//结束临界区
PulseEvent(hNotFullEvent);//唤醒生产者线程
}
}
}
}
//////////////////////////////////////////////////////////////////
//主函数
void main()
{
HANDLE hThreadVector[6];
DWORD ThreadID;
count= 0;
head= 0;
tail= 0;
hMutex=CreateMutex(NULL,FALSE,NULL);
hNotFullEvent=CreateEvent(NULL,TRUE,FALSE,NULL);
hNotEmptyEvent=CreateEvent(NULL,TRUE,FALSE,NULL);
hThreadVector[0]=CreateThread(NULL, 0,(LPTHREAD_START_ROUTINE) p1_Producer,NULL, 0,(LPDWORD)&ThreadID);
hThreadVector[1]=CreateThread(NULL, 0,(LPTHREAD_START_ROUTINE) c1_Consumer,NULL, 0,(LPDWORD)&ThreadID);
hThreadVector[3]=CreateThread(NULL, 0,(LPTHREAD_START_ROUTINE) p2_Producer,NULL, 0,(LPDWORD)&ThreadID);
hThreadVector[4]=CreateThread(NULL, 0,(LPTHREAD_START_ROUTINE) c2_Consumer,NULL, 0,(LPDWORD)&ThreadID);
hThreadVector[5]=CreateThread(NULL, 0,(LPTHREAD_START_ROUTINE) p3_Producer,NULL, 0,(LPDWORD)&ThreadID);
hThreadVector[5]=CreateThread(NULL, 0,(LPTHREAD_START_ROUTINE) c3_Consumer,NULL, 0,(LPDWORD)&ThreadID);
WaitForMultipleObjects(2,hThreadVector,TRUE,INFINITE);
//cout<<"**********************Finish*************************"<<endl;
}
我最近也在学操作系统,PV好麻烦的
进程的同步与互斥实验报告Linux
相交进程之间的关系主要有两种,同步与互斥。所谓互斥,是指散步在不同进程之间的若干程序片断,当某个进程运行其中一个程序片段时,其它进程就不能运行它们之中的任一程序片段,只能等到该进程运行完这个程序片段后才可以运行。所谓同步,是指散步在不同进程之间的若干程序片断,它们的运行必须严格按照规定的某种先后次序来运行,这种先后次序依赖于要完成的特定的任务。
显然,同步是一种更为复杂的互斥,而互斥是一种特殊的同步。
也就是说互斥是两个线程之间不可以同时运行,他们会相互排斥,必须等待一个线程运行完毕,另一个才能运行,而同步也是不能同时运行,但他是必须要安照某种次序来运行相应的线程(也是一种互斥)!
总结:互斥:是指某一资源同时只允许一个访问者对其进行访问,具有唯一性和排它性。但互斥无法限制访问者对资源的访问顺序,即访问是无序的。
同步:是指在互斥的基础上(大多数情况),通过其它机制实现访问者对资源的有序访问。在大多数情况下,同步已经实现了互斥,特别是所有写入资源的情况必定是互斥的。少数情况是指可以允许多个访问者同时访问资源