
[Experience Sharing] Kafka LogManager and the Kafka storage mechanism

  LogManager: the class that manages Kafka's data logs, including clean, flush, and related operations
   Log: one object per topic-partition log (tplog)
      LogSegment: one object per segment file in a tplog directory
          FileMessageSet: the file-channel wrapper for each .log file
          base offset: the segment's absolute starting offset within the topic partition
          OffsetIndex: the memory-mapped index for each .log file, mapping relative offsets to file positions (see the sketch below)
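A minimal sketch of that containment hierarchy, with simplified field names for illustration (the real declarations live in kafka.log and differ in detail):

// Illustrative sketch only; not the exact Kafka class definitions.
class FileMessageSet(val file: java.io.File)   // FileChannel over one .log file
class OffsetIndex(val file: java.io.File)      // mmap of (relative offset, position) pairs
class LogSegment(val log: FileMessageSet,
                 val index: OffsetIndex,
                 val baseOffset: Long)          // first absolute offset in this segment
class Log(val dir: java.io.File) {              // one topic-partition directory
  // segments keyed by base offset, sorted so a lookup can floor to the right segment
  val segments = new java.util.concurrent.ConcurrentSkipListMap[java.lang.Long, LogSegment]()
}
class LogManager(val logDirs: Seq[java.io.File]) { // all logs on this broker
  val logs = scala.collection.mutable.Map[(String, Int), Log]() // (topic, partition) -> Log
}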
 
   A topic is split into partitions, and the partitions are distributed across brokers.
   Each partition holds multiple log files, and each log file has its own index file.
   The log file holds the actual data; the index file records relative offsets of entries in the log file together with their byte positions. The index is sparse: one entry is written per span of data rather than per message, which keeps the index file from growing too large. A lookup sketch follows.
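To make the sparse lookup concrete, here is a minimal sketch (a hypothetical helper, not Kafka's actual OffsetIndex code): binary-search the index for the largest entry at or below the target offset, then scan the log forward from that byte position.

// Illustrative sparse-index lookup.
// entries: (relativeOffset, filePosition) pairs sorted by relativeOffset.
def lookup(entries: Vector[(Int, Int)], baseOffset: Long, target: Long): Int = {
  val rel = (target - baseOffset).toInt
  var lo = 0; var hi = entries.length - 1; var pos = 0
  while (lo <= hi) {
    val mid = (lo + hi) / 2
    if (entries(mid)._1 <= rel) { pos = entries(mid)._2; lo = mid + 1 }
    else hi = mid - 1
  }
  pos // scan the .log file forward from here to find the exact message
}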
 
1. Initialization:

val RecoveryPointCheckpointFile = "recovery-point-offset-checkpoint"
val LockFile = ".lock"
val InitialTaskDelayMs = 30*1000
private val logCreationOrDeletionLock = new Object
private val logs = new Pool[TopicAndPartition, Log]() // all Log objects: one Log per TopicAndPartition
// validate the log directories and acquire a file-channel lock on each
createAndValidateLogDirs(logDirs)
private val dirLocks = lockLogDirs(logDirs)
private val recoveryPointCheckpoints = logDirs.map(dir => (dir, new OffsetCheckpoint(new File(dir, RecoveryPointCheckpointFile)))).toMap
// walk all the logs, build a Log object for each, and run recovery from the checkpointed position
loadLogs()
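lockLogDirs takes an exclusive lock on the .lock file in each data directory so that two broker processes cannot share one directory. A minimal sketch of that pattern using java.nio (illustrative; not the exact kafka.utils.FileLock code):

import java.io.{File, RandomAccessFile}
import java.nio.channels.FileLock

// Try to lock <dir>/.lock exclusively; None means another process holds it.
def lockDir(dir: File): Option[FileLock] = {
  val lockFile = new File(dir, ".lock")
  lockFile.createNewFile() // no-op if the file already exists
  val channel = new RandomAccessFile(lockFile, "rw").getChannel
  Option(channel.tryLock()) // tryLock returns null when another process has it
}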
The main method is loadLogs:

if (cleanShutdownFile.exists) {
  // the previous shutdown was clean, so no recovery is needed on this start
  debug(
    "Found clean shutdown file. " +
    "Skipping recovery for all logs in data directory: " +
    dir.getAbsolutePath)
} else {
  // log recovery itself is being performed by `Log` class during initialization
  brokerState.newState(RecoveringFromUncleanShutdown)
}
// read the recovery points recorded for this log dir
val recoveryPoints = this.recoveryPointCheckpoints(dir).read
val jobsForDir = for {
  dirContent <- Option(dir.listFiles).toList
  logDir <- dirContent if logDir.isDirectory
} yield {
  Utils.runnable {
    debug("Loading log '" + logDir.getName + "'")
    // parse topic and partition from the directory name
    val topicPartition = Log.parseTopicPartitionName(logDir.getName)
    // look up the topic's custom config, falling back to the default config
    val config = topicConfigs.getOrElse(topicPartition.topic, defaultConfig)
    val logRecoveryPoint = recoveryPoints.getOrElse(topicPartition, 0L)
    val current = new Log(logDir, config, logRecoveryPoint, scheduler, time)
    val previous = this.logs.put(topicPartition, current)
    // guard against duplicate topic+partition directories
    if (previous != null) {
      throw new IllegalArgumentException(
        "Duplicate log directories found: %s, %s!".format(
          current.dir.getAbsolutePath, previous.dir.getAbsolutePath))
    }
  }
}
// submit the runnables above for each logDir: each builds a Log and adds it to the log pool
jobs(cleanShutdownFile) = jobsForDir.map(pool.submit).toSeq
  Inside new Log(...), the log files and index files are initialized.
 Its main method is loadSegments, which:
   1. Handles leftover .swap files: a swapped .log is reloaded (renamed back into place), a swapped .index is deleted
   2. Loads the .log and .index files, rebuilding any index that does not exist

private def loadSegments() {
  // create the log directory if it doesn't exist
  dir.mkdirs()
  // first do a pass through the files in the log directory and remove any temporary files
  // and complete any interrupted swap operations
  for(file <- dir.listFiles if file.isFile) {
    if(!file.canRead)
      throw new IOException("Could not read file " + file)
    val filename = file.getName
    if(filename.endsWith(DeletedFileSuffix) || filename.endsWith(CleanedFileSuffix)) {
      // if the file ends in .deleted or .cleaned, delete it
      file.delete()
    } else if(filename.endsWith(SwapFileSuffix)) {
      // we crashed in the middle of a swap operation, to recover:
      // if a log, swap it in and delete the .index file
      // if an index just delete it, it will be rebuilt
      val baseName = new File(Utils.replaceSuffix(file.getPath, SwapFileSuffix, ""))
      if(baseName.getPath.endsWith(IndexFileSuffix)) {
        file.delete()
      } else if(baseName.getPath.endsWith(LogFileSuffix)){
        // delete the existing index
        val index = new File(Utils.replaceSuffix(baseName.getPath, LogFileSuffix, IndexFileSuffix))
        index.delete()
        // complete the swap operation by renaming the .swap log back into place
        val renamed = file.renameTo(baseName)
        if(renamed)
          info("Found log file %s from interrupted swap operation, repairing.".format(file.getPath))
        else
          throw new KafkaException("Failed to rename file %s.".format(file.getPath))
      }
    }
  }
  // now do a second pass and load all the .log and .index files
  for(file <- dir.listFiles if file.isFile) {
    val filename = file.getName
    if(filename.endsWith(IndexFileSuffix)) {
      // if it is an index file, make sure it has a corresponding .log file; if not, delete it
      val logFile = new File(file.getAbsolutePath.replace(IndexFileSuffix, LogFileSuffix))
      if(!logFile.exists) {
        warn("Found an orphaned index file, %s, with no corresponding log file.".format(file.getAbsolutePath))
        file.delete()
      }
    } else if(filename.endsWith(LogFileSuffix)) {
      // if it's a log file, load the corresponding log segment;
      // the file name is the segment's start (base) offset
      val start = filename.substring(0, filename.length - LogFileSuffix.length).toLong
      val hasIndex = Log.indexFilename(dir, start).exists
      // build a LogSegment for this log file: FileMessageSet + OffsetIndex + base offset
      val segment = new LogSegment(dir = dir,
                                   startOffset = start,
                                   indexIntervalBytes = config.indexInterval,
                                   maxIndexSize = config.maxIndexSize,
                                   rollJitterMs = config.randomSegmentJitter,
                                   time = time)
      if(!hasIndex) {
        error("Could not find index file corresponding to log file %s, rebuilding index...".format(segment.log.file.getAbsolutePath))
        // rebuild the index file and the in-memory index (both backed by a memory-mapped channel)
        segment.recover(config.maxMessageSize)
      }
      segments.put(start, segment)
    }
  }
  if(logSegments.size == 0) {
    // no existing segments, create a new mutable segment beginning at offset 0
    segments.put(0L, new LogSegment(dir = dir,
                                    startOffset = 0,
                                    indexIntervalBytes = config.indexInterval,
                                    maxIndexSize = config.maxIndexSize,
                                    rollJitterMs = config.randomSegmentJitter,
                                    time = time))
  } else {
    recoverLog()
    // reset the index size of the currently active log segment to allow more entries
    activeSegment.index.resize(config.maxIndexSize)
  }
  // sanity check the index file of every segment to ensure we don't proceed with a corrupt segment
  for (s <- logSegments)
    s.index.sanityCheck()
}
 --------------------------- Initialization complete ---------------------------
 
The startup method schedules three tasks:
1. cleanupLogs
2. flushDirtyLogs
3. checkpointRecoveryPointOffsets
 
1. cleanupLogs
 Two deletion strategies: one by time (a segment expires when its last-modified time is older than the retention window) and one by size (starting from the oldest segment, delete while the segment still fits within the overshoot diff). A worked example follows the parameter notes below.

  private def cleanupExpiredSegments(log: Log): Int = {
    val startMs = time.milliseconds
    // compare (now - segment's last-modified time) against the configured retention time;
    // a segment whose last modification is older than retention.ms is deleted
    log.deleteOldSegments(startMs - _.lastModified > log.config.retentionMs)
  }
  /**
   * Deletion rule: once the tplog exceeds the size threshold, walk from the oldest
   * segment and delete each segment whose size still fits within the overshoot (diff);
   * stop as soon as the current segment is larger than the remaining diff.
   *  Runs through the log removing segments until the size of the log
   *  is at least logRetentionSize bytes in size
   */
  private def cleanupSegmentsToMaintainSize(log: Log): Int = {
    // nothing to do if size retention is disabled (< 0) or the log is under the limit
    if(log.config.retentionSize < 0 || log.size < log.config.retentionSize)
      return 0
    var diff = log.size - log.config.retentionSize
    def shouldDelete(segment: LogSegment) = {
      if(diff - segment.size >= 0) { // this segment still fits within the bytes we must shed
        diff -= segment.size
        true
      } else {
        false
      }
    }
    log.deleteOldSegments(shouldDelete)
  }
  Relevant parameters:
  // how often the cleanup task runs; expired segments (last modified longer ago than the configured retention) are deleted
  val logCleanupIntervalMs = props.getLongInRange("log.retention.check.interval.ms", 5*60*1000, (1, Long.MaxValue))
  // size-based cleanup: the upper bound on a partition log's total size (-1 disables it)
  val logRetentionBytes = props.getLong("log.retention.bytes", -1)
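A small worked example of the size-based rule, using the same predicate logic as shouldDelete above (the numbers are hypothetical):

// Three segments of 100 bytes each (oldest first), retentionSize = 150,
// so log.size = 300 and diff = 300 - 150 = 150.
val segmentSizes = List(100, 100, 100)
var diff = 150
val toDelete = segmentSizes.takeWhile { size =>
  if (diff - size >= 0) { diff -= size; true } // 1st: 150 - 100 >= 0 -> delete, diff = 50
  else false                                   // 2nd: 50 - 100 < 0  -> stop
}
println(toDelete) // List(100): only the oldest segment is removed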
 

  def deleteOldSegments(predicate: LogSegment => Boolean): Int = {
    // find any segments that match the user-supplied predicate UNLESS it is the final segment
    // and it is empty (since we would just end up re-creating it)
    val lastSegment = activeSegment
    // collect the deletable prefix: segments matching the predicate, excluding an empty active segment
    val deletable = logSegments.takeWhile(s => predicate(s) && (s.baseOffset != lastSegment.baseOffset || s.size > 0))
    val numToDelete = deletable.size
    if(numToDelete > 0) {
      lock synchronized {
        // we must always have at least one segment, so if we are going to delete all the segments, create a new one first
        if(segments.size == numToDelete)
          roll()
        // remove each segment from the segment map, rename its files with a .deleted
        // suffix, and schedule the physical deletion asynchronously
        deletable.foreach(deleteSegment(_))
      }
    }
    numToDelete
  }
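deleteSegment uses the rename-then-delete-later pattern: the segment's files are renamed with a .deleted suffix right away (freeing the original name), and the physical deletion runs on the scheduler after a delay. A minimal sketch of that pattern (illustrative names, not the exact kafka.log code):

import java.io.File
import java.util.concurrent.{Executors, TimeUnit}

val scheduler = Executors.newSingleThreadScheduledExecutor()

// Rename `file` to `file`.deleted now; physically delete it after delayMs.
def asyncDelete(file: File, delayMs: Long): Unit = {
  val renamed = new File(file.getPath + ".deleted")
  if (file.renameTo(renamed))
    scheduler.schedule(new Runnable {
      def run(): Unit = renamed.delete()
    }, delayMs, TimeUnit.MILLISECONDS)
}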
 
 2. flushDirtyLogs

Flushing is governed by a message count and a time interval; the time-based parameter:
/* the maximum time in ms that a message in any topic is kept in memory before flushed to disk */
val logFlushIntervalMs = props.getLong("log.flush.interval.ms", logFlushSchedulerIntervalMs)
/**
 * Flush any log which has exceeded its flush interval and has unwritten messages.
 */
private def flushDirtyLogs() = {
  debug("Checking for dirty logs to flush...")
  for ((topicAndPartition, log) <- logs) {
    try {
      val timeSinceLastFlush = time.milliseconds - log.lastFlushTime
      debug("Checking if flush is needed on " + topicAndPartition.topic + " flush interval  " + log.config.flushMs +
            " last flushed " + log.lastFlushTime + " time since last flush: " + timeSinceLastFlush)
      if(timeSinceLastFlush >= log.config.flushMs)
        log.flush
    } catch {
      case e: Throwable =>
        error("Error flushing topic " + topicAndPartition.topic, e)
    }
  }
}
@threadsafe
def flush() {
  LogFlushStats.logFlushTimer.time {
    log.flush()   // force the segment data file's channel to disk
    index.flush() // force the memory-mapped index to disk
  }
}
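Underneath, the two calls map to the usual NIO durability primitives; a rough sketch of the idea (an assumed simplification of FileMessageSet.flush and OffsetIndex.flush, not the literal code):

import java.nio.MappedByteBuffer
import java.nio.channels.FileChannel

// Log data: force writes buffered in the FileChannel down to disk.
def flushLogFile(channel: FileChannel): Unit =
  channel.force(true) // true = also flush file metadata

// Index data: the index is memory-mapped, so flushing is an msync.
def flushIndex(mmap: MappedByteBuffer): Unit =
  mmap.force()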
 
3. checkpointRecoveryPointOffsets
  checkpointRecoveryPointOffsets records a recovery point for each log dir, so that on startup the broker does not have to recover every log and rebuild every index.
  It iterates per log dir; each log dir contains many topic-partition logs.
 

  /**
   * Make a checkpoint for all logs in provided directory.
   */
  private def checkpointLogsInDir(dir: File): Unit = {
    // all topic-partition logs under this dir: Map[TopicAndPartition, Log]
    val recoveryPoints = this.logsByDir.get(dir.toString)
    if (recoveryPoints.isDefined) {
      // mapValues maps each Log (the _ below) to its recoveryPoint, producing the
      // (TopicAndPartition -> recoveryPoint) entries passed to write();
      // write() writes the offsets to a tmp file, deletes the old checkpoint,
      // and renames the tmp file into place
      this.recoveryPointCheckpoints(dir).write(recoveryPoints.get.mapValues(_.recoveryPoint))
    }
  }
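The tmp-write-then-rename in OffsetCheckpoint.write is the standard recipe for atomically replacing a file; a minimal sketch (illustrative, with an assumed one-entry-per-line format):

import java.io.{File, FileOutputStream, PrintWriter}

// Atomically replace `target`: write a tmp file, sync it, rename it into place.
def atomicWrite(target: File, lines: Seq[String]): Unit = {
  val tmp = new File(target.getPath + ".tmp")
  val out = new FileOutputStream(tmp)
  val writer = new PrintWriter(out)
  lines.foreach(l => writer.println(l))
  writer.flush()
  out.getFD.sync() // ensure the tmp file's bytes are on disk before the rename
  writer.close()
  target.delete()  // rename may not overwrite an existing file on every platform
  if (!tmp.renameTo(target))
    throw new java.io.IOException("Failed to rename " + tmp + " to " + target)
}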
 
 LogManager also hooks in the log compaction feature:

    if(cleanerConfig.enableCleaner)
      cleaner.startup() // start log compaction (the LogCleaner threads)
 
 
 
 
 
