linux下安装kafka?kali安装docker
linux shell脚本监控kafka,如果挂了自动重启
实现Kafka服务监控与自动重启的Linux shell脚本如下所示。此脚本适用于定时检查Kafka服务状态,并在服务未运行时自动启动服务。
脚本主要分为以下部分:
1.**定义变量**:脚本中定义了Kafka服务的安装路径(KAFKA_HOME)、日志文件路径(KAFKA_LOG)以及进程ID文件路径(KAFKA_PID_FILE)。
2.**检查Kafka服务**:`check_kafka`函数用于验证进程ID文件是否存在且进程是否仍在运行。如果服务未运行,则执行启动操作。
3.**启动Kafka服务**:`start_kafka`函数使用`nohup`命令在后台启动Kafka,同时将输出重定向至日志文件。启动成功后,脚本会记录进程ID。
4.**循环检查**:在主循环中,脚本每隔10秒执行一次`check_kafka`函数,确保Kafka服务始终运行。
注意:此脚本作为示例,根据具体环境可能需进行适当调整。确保在安全的环境下运行,避免执行可能造成数据丢失或其他不可逆操作的命令。
通过上述脚本,可实现Kafka服务的自动化监控与故障恢复,确保服务连续性,提升系统稳定性。
kafka消费者如何在linux命令行后台执行
Kafka是一个开源流处理平台,用于实时数据处理,由Apache软件基金会开发,使用Scala和Java编写。它提供了一个统一、高吞吐、低延迟的处理实时数据的平台。Kafka的持久化层基于分布式事务日志架构,作为大规模发布/订阅消息队列,使其成为处理流失数据的企业级基础设施。本文将指导如何在Linux命令行后台通过Docker容器部署Kafka。
部署Kafka的过程分为以下几个步骤:
1.**基础环境准备**:
在Ubuntu 22.04.3 LTS虚拟机上安装Docker。检查Docker是否已安装,若未安装,使用命令安装Docker。确保Linux发行版支持Kafka部署。
2.**安装Zookeeper**:
Zookeeper是Kafka依赖的服务,为Kafka提供分布式协调服务。通过Docker拉取并安装Zookeeper集群。执行命令自动拉取Zookeeper镜像。
3.**安装Kafka**:
在成功安装Zookeeper后,使用Docker拉取并安装Kafka组件。根据服务器实际IP地址和自定义的Topic名称调整命令参数。
4.**进入容器并启动生产者和消费者**:
完成Kafka安装后,进入容器内部,启动生产者和消费者脚本。通过命令验证Kafka功能是否正常。在容器中执行生产者脚本,配置Topic名称;在新终端中执行消费者脚本,使用已建立的Topic名称。
5.**生产者与消费者测试**:
在生产者窗口连续输入信息,切换至消费者窗口查看接收情况。正常情况下,生产者发送的信息应能被消费者接收。
6.**故障排查**:
若在部署过程中遇到错误或问题,通过Docker日志进行故障排查。检查容器日志获取问题信息,定位问题所在。
部署Kafka的总体步骤如下:
-首先检查Docker是否正常安装。
-安装Kafka依赖服务Zookeeper。
-安装Kafka组件。
-在容器内启动生产者和消费者脚本。
-在部署过程中,通过Docker日志进行问题排查。
遵循以上步骤,可以顺利部署Kafka并在Linux命令行后台进行实时数据处理。
kafka消息的管理
kafka producer将消息发送给broker后,消息日志会被存储在broker的磁盘上,采用顺序写入的方式。顺序写可以加快磁盘访问速度,并且可以将将多个小型的逻辑写合并成一次大型的物理磁盘写入,官方数据显示顺序写比随机写入快6000倍以上。另外,操作系统使用内存对磁盘进行缓存即pagecache,pagecache完全由操作系统管理,这也使得写数据变得即简洁也快速。
配置中可以调整过期时间,超过改时间的消息日志将移除,默认值为7天;也可配置文件大小阈值,达到该阈值后,从最旧消息开始删除。配置项为:
从文件到套接字的常见数据传输路径有4步:
1).操作系统从磁盘读取数据到内核空间的 pagecache
2).应用程序读取内核空间的数据到用户空间的缓冲区
3).应用程序将数据(用户空间的缓冲区)写回内核空间到套接字缓冲区(内核空间)
4).操作系统将数据从套接字缓冲区(内核空间)复制到通过网络发送的 NIC缓冲区
kafka使用 producer,broker和 consumer都共享的标准化的二进制消息格式,这样数据块不用修改就能在他们之间传递。kafka采用Linux中系统调用sendfile的方式,直接将数据从 pagecache转移到 socket网络连接中。这种零拷贝方式使得kafka数据传输更加高效。
以前面文章中安装的kafka为例: Mac安装kafka
kafka本地文件存储目录可以在配置文件server.properties中设置,参数及默认值为:
进入该目录,可以看到kafka保存的cosumer offset和topic消息:
其中__consumer_offsets开头的为消费的offset信息,test1开头的即为之前创建的topic“test1”,该topic有三个分区,分区编号从0开始,分别是test1-0、test1-1、test1-2。
进入test1-0,查看包含文件如下:
可以看到kafka消息按partition存储的,每个partition一个目录。partition下消息分段(segment)存储,默认每段最大1G,通过参数log.segment.bytes可配置。segment包含索引文件index、消息文件log,分别存储消息的索引和内容,以.index和.log结尾,文件命名为当前segment第一个消息offset。index文件在log每隔一定数据量之间建立索引,可以通过参数index.interval.bytes配置。
通过kafka命令查看00000000000000000000.index内容如下:
00000000000000000000.log内容如下:
其中索引文件中包含两个字段:(offset,position),分别表示消息offset和该消息在log文件的偏移量。如上图中offset=0的消息对应的position=0;对应的就是00000000000000000000.log中的第一条消息:
其中payload为具体的消息内容。
另外里面还有一个以".timeindex"结尾的文件,查看其内容:
该日志文件是kafka0.10.1.1加入的,其中保存的为:(消息时间戳,offset)。时间戳是该segment最后一个消息对应的时间戳(与log文件中最后一条记录时间戳一致),kafka也支持根据时间来读取消息。
由上可知消息是按partition来存储的,partition可以配置n个副本followers。多个partition和其follower在broker上是怎么分配的呢?
partition和broker都进行了排序,下标从0开始;
假设有k个broker,第i个partition被分配到到 i%k个broker上;
第i%k个broker即为partition i的leader,负责这个partition的读写;
partition的followers也进行排序,从leader的后续broker开始分配,第i个partition的第j个副本broker为(j+ i%k)%k。
一个有3个broker的kafka集群,包含3个partition,每个partition副本数为1的topic如下图:
总结:
kafka将消息日志采用顺序写入的方式存放在broker磁盘中;数据传输通过系统调用sendfile零拷贝方式;消息日志分段存放,可配置清除时间或大小阈值;每段包含消息索引、消息内容两个文件,通过索引实现快速查找;按照/topic/partition的目录结构分开存储,且均匀分布到集群各broker上。
参考: