Kafka作为一个支持大数据量写入写出的消息队列,由于是基于Scala和Java实现的,而Scala和Java均需要在JVM上运行,所以如果是基于内存的方式,即JVM的堆来进行数据存储则需要开辟很大的堆来支持数据读写,从而会导致GC频繁影响性能。考虑到这些因素,kafka是使用磁盘而不是kafka服务器broker进程内存来进行数据存储,并且基于磁盘顺序读写和MMAP技术来实现高性能。
Kafka一个topic下可以存在很多个分区,不考虑分区副本的情况下。一个分区对应一个日志(Log) 。为了防止Log过大,Kafka又引入了日志分段(LogSegment)的概念,将Log切分为多个LogSegment,相当于一个巨型文件被平均分配为多个相对较小的文件,这样也便于消息的维护和清理。事实上,Log和LogSegment对应于磁盘上的一个日志文件和两个索引文件,以及可能的其他文件(比如:以".txnindex"为后缀的事务索引文件)。如下图所示:
Log对应了一个命名形式为-的文件夹。举个例子,假设有一个名为"itheima"的主题,此主题中具有4个分
区,那么在实际的物理存储上表现为"itheima-0" ,“itheima-1” , “itheima-2” , “itheima-3"这4个文件夹(/tmp/kafka-logs)。
向Log中追加消息时是顺序写入的,**只有最后一个LogSegment才能执行写入操作,在此之前所有的LogSegment都不能写入数据。**为了方便描述,我们将最后一个LogSegment为"activeSegment”,即表示当前活跃的日志分段。每当segment文件达到一定的大小,则会创建一个新的segment文件(具体大小在server.properties中通过配置log.segment.bytes=1073741824:默认为1G)。之后追加的消息将写入新的activeSegment。文件名是以该文件的第一个数据相对于该分区的全局offset命名的。名称固定为20位数字,没有达到的位数则用0填充。比如第一个LogSegment的基准偏移量为0,对应的日志文件的名称为00000000000000000000.log。
存储器的两种输入输出方式:
1、随机读写
2、顺序读写
顺序读写 磁盘顺序读或写的速度400M/s,能够发挥磁盘最大的速度。 随机读写,磁盘速度慢的时候十几到几百K/s。这就看出了差距。
随机读写:存储的数据在磁盘中占据空间,对于一个新磁盘,操作系统会将数据文件依次写入磁盘,当有些数据被删除时,就会空出该数据原来占有的存储空间,时间长了,不断的写入、删除数据,就会产生很多零零散散的存储空间,就会造成一个较大的数据文件放在许多不连续的存贮空间上,读写些这部分数据时,就是随机读写,磁头要不断的调整磁道的位置,以在不同位置上的读写数据,相对于连续空间上的顺序读写,要耗时很多。
如下图所示: 当删除某些数据的时候,就会产生很多不连续的磁盘存储空间(磁盘碎片)
新文件的存储:
**顺序读写:**就是在文件的末尾以追加的方式来写入消息,在Kafka中只能在日志文件的末尾追加新的消息,并且也不允许修改和删除已写入的消息。
如下图所示:
这种方法有一个缺陷—— 没有办法删除数据 ,所以Kafka是不会删除数据的,它会把所有的数据都保留下来,消费者在消费消息的时候是通过"消费位移"(offset)来决定从哪个位置开始进行消息的消费。
顺序写入是Kafka高吞吐量的一个原因,当然即使采用的是磁盘的顺序写入,那么也是没有办法和内存相比的。因为为了再一次提高Kakfa的吞吐量,Kafka采用了Memory Mapped Files(后面简称 mmap)也被翻译成内存映射文件 ,它的工作原理是直接利用操作系统的 page cache 来实现文件到物理内存的直接映射,完成映射之后你对物理内存的操作会被同步到硬盘上(操作系统在适当的时候)。
操作系统本身有一层缓存,叫做page cache,是在内存里的缓存,我们也可以称之为os cache,意思就是操作系统自己管理的缓存。你在写入磁盘文件的时候,可以直接写入这个os cache里,也就是仅仅写入内存中,接下来由操作系统自己决定什么时候把os cache里的数据真的刷入磁盘文件中(每5秒检查一次是否需要将页缓存数据同步到磁盘文件)。
仅仅这一个步骤,就可以将磁盘文件写性能提升很多了,因为其实这里相当于是在写内存,不是在写磁盘,大家看下图:
所以大家就知道了,上面那个图里,Kafka在写数据的时候,一方面基于了os层面的page cache来写数据,所以性能很高,本质就是在写内存罢了。另外一个,他是采用磁盘顺序写的方式,所以即使数据刷入磁盘的时候,性能也是极高的,也跟写内存是差不多的。基于上面两点,kafka就实现了写入数据的超高性能。
Kafka将消息存储在磁盘中,为了控制磁盘占用空间的不断增加需要对消息做一定的清理操作。Kafka中每一个分区副本都对应了一个Log,而Log又可以分为多个日志分段,这样也便于日志的清理操作。Kafka提供了两种日志清理的策略:
1、日志删除(Log Retention):按照一定的保留策略直接删除不符合条件的日志分段。
2、日志压缩(Log Compaction):针对每个消息的Key进行整合,对于有相同key的不同value值,只保留最后一个版本。
我们可以通过broker端参数:log.cleanup.policy来设置日志清理策略,此参数的默认值为"delete"(可以在kafka启动时打印的日志中查看到),即采用日志删除的清理策略。如果要采用日志压缩的清理策略,就需要将log.cleanup.policy设置为"compact",并且还需要将log.cleaner.enable的值设置为true。通过将log.cleanup.policy参数设置为"delete,compact",还可以同时支持日志删除和日志压缩两种策略。最常见的日志清理操作是日志删除。
在Kafka的日志管理器中会有一个专门的日志删除任务来周期性地检测和删除不符合保留条件的日志分段文件,这个周期可以通过broker端参数log.retention.check.interval.ms来配置,默认值为300000,即5分钟。当前日志分段的常见的保留策略有2种:
1、基于时间的保留策略
2、基于日志大小的保留策略
基于时间:
日志删除任务会检查当前日志文件中是否有保留时间超过设定的阈值来寻找可删除的日志分段文件集合。日志分段保留时间的阈值可以通过如下参数进行设置:
log.retention.hours = 168 # 单位为小时,默认的取值为7天 优先级最低
log.retention.minutes=null # 单位为分钟 优先级次之
log.retention.ms=null # 单位为毫秒 优先级最高
删除日志分段文件时,首先会从Log对象中所维护日志分段的跳跃表中移除待删除的日志分段,以保证没有线程对这些日志分段进行读取操作。然后将日志分段所对应的所有文件添加上".delete"的后缀(当然也包含对应的索引文件)。最后交由一个以"delete-file"的命令的延迟任务来删除这些以".delete"为后缀的文件,这个任务的延迟时间可以通过file.delete.delay.ms参数来调配,此
参数的默认值为60000,即1分钟。
基于大小:
日志删除任务会检查当前日志的大小是否超过设定的阈值来查找可删除的日志分段的文件集合,可以通过如下参数来设定日志大小的阈值:
log.retention.bytes = -1 # 默认为-1,表示无穷大
注:该参数配置的是Log中所有日志文件的总大小,而不是单个日志分段的大小(.log日志文件)
说完了写入这块,再来谈谈消费这块。
大家应该都知道,从Kafka里我们经常要消费数据,那么消费的时候实际上就是要从kafka的磁盘文件里读取某条数据然后发送给下游的消费者,如下图所示。
那么这里如果频繁的从磁盘读数据然后发给消费者,性能瓶颈在哪里呢?
假设要是kafka什么优化都不做,就是很简单的从磁盘读数据发送给下游的消费者,那么大概过程如下所示:
1、先看看要读的数据在不在os cache里,如果不在的话就从磁盘文件里读取数据后放入os cache。
2、接着从操作系统的os cache里拷贝数据到应用程序进程的缓存里。
3、再从应用程序进程的缓存里拷贝数据到操作系统层面的Socket缓存里。
4、最后从Socket缓存里提取数据后发送到网卡,最后发送出去给下游消费。
完成上述的操作,进行了两次copy:
1、一次是从操作系统的cache里拷贝到应用进程的缓存里。
2、从应用程序缓存里拷贝回操作系统的Socket缓存里。
而且为了进行这两次拷贝,中间还发生了好几次上下文切换,一会儿是应用程序在执行,一会儿上下文切换到操作系统来执行。所以这种方式来读取数据是比较消耗性能的。
Kafka为了解决这个问题,在读数据的时候是引入零拷贝技术。
也就是说,直接让操作系统的cache中的数据发送到网卡后传输给下游的消费者,中间跳过了两次拷贝数据的步骤,Socket缓存中仅仅会拷贝一个描述符过去,不会拷贝数据到Socket缓存。
大家看下图,体会一下这个精妙的过程:
通过零拷贝技术,就不需要把os cache里的数据拷贝到应用缓存,再从应用缓存拷贝到Socket缓存了,两次拷贝都省略了,所以叫做零拷贝。
对Socket缓存仅仅就是拷贝数据的描述符过去,然后数据就直接从os cache中发送到网卡上去了,这个过程大大的提升了数据消费时读取文件数据的性能。
阅读量:1286
点赞量:0
收藏量:0