();
    boolean cleanSeqId = false;
    IOException e = null;
    try {
      InternalScanner scanner = null;
      try {
        /* Include deletes, unless we are doing a compaction of all files */
        // Determine the ScanType:
        // if the compaction request is a major or all-files compaction, use COMPACT_DROP_DELETES;
        // if it is a minor compaction, use COMPACT_RETAIN_DELETES.
        ScanType scanType =
            request.isAllFiles() ? ScanType.COMPACT_DROP_DELETES : ScanType.COMPACT_RETAIN_DELETES;
        // If coprocessors are present, invoke the preCreateCoprocScanner() hook
        scanner = preCreateCoprocScanner(request, scanType, fd.earliestPutTs, scanners);
        if (scanner == null) {
          // The coprocessors did not create a scanner, so create one via createScanner()
          scanner = createScanner(store, scanners, scanType, smallestReadPoint, fd.earliestPutTs);
        }
        // If coprocessors are present, invoke their preCompact() hook via postCreateCoprocScanner()
        scanner = postCreateCoprocScanner(request, scanType, scanner);
        if (scanner == null) {
          // NULL scanner returned from coprocessor hooks means skip normal processing.
          return newFiles;
        }
        // Create the writer even if no kv(Empty store file is also ok),
        // because we need record the max seq id for the store file, see HBASE-6059
        // Determine the smallest read point smallestReadPoint
        if (fd.minSeqIdToKeep > 0) {
          smallestReadPoint = Math.min(fd.minSeqIdToKeep, smallestReadPoint);
          cleanSeqId = true;
        }
        // When all MVCC readpoints are 0, don't write them.
        // See HBASE-8166, HBASE-12600, and HBASE-13389.
        // Obtain the writer via HStore's createWriterInTmp() method
        writer = store.createWriterInTmp(fd.maxKeyCount, this.compactionCompression, true,
            fd.maxMVCCReadpoint > 0, fd.maxTagsLength > 0);
        // Perform the compaction via performCompaction()
        boolean finished = performCompaction(scanner, writer, smallestReadPoint, cleanSeqId);
        // If the compaction did not finish
        if (!finished) {
          // Close the writer
          writer.close();
          // Delete the writer's temporary file
          store.getFileSystem().delete(writer.getPath(), false);
          writer = null;
          // Throw an exception
          throw new InterruptedIOException("Aborting compaction of store " + store
              + " in region " + store.getRegionInfo().getRegionNameAsString()
              + " because it was interrupted.");
        }
      } finally {
        // Close the scanner
        if (scanner != null) {
          scanner.close();
        }
      }
    } catch (IOException ioe) {
      e = ioe;
      // Throw the exception
      throw ioe;
    } finally {
      try {
        if (writer != null) {
          if (e != null) {
            // An exception occurred: just close the writer
            writer.close();
          } else {
            // No exception: append metadata, close the writer, and add its path to newFiles
            writer.appendMetadata(fd.maxSeqId, request.isAllFiles());
            writer.close();
            newFiles.add(writer.getPath());
          }
        }
      } finally {
        // Close the reader of each StoreFile in readersToClose
        for (StoreFile f : readersToClose) {
          try {
            f.closeReader(true);
          } catch (IOException ioe) {
            LOG.warn("Exception closing " + f, ioe);
          }
        }
      }
    }
    // Return newFiles
    return newFiles;
  }

To summarize, DefaultCompactor's compact() method roughly proceeds as follows:

1. Obtain the file details fd from the request via the parent class Compactor's getFileDetails() method. fd is of type FileDetails and mainly carries the following information:
(1) the total KeyValue count after compaction: maxKeyCount;
(2) the earliest Put timestamp, if this is a major compaction: earliestPutTs;
(3) the largest sequence id among the files being compacted: maxSeqId;
(4) the latest MemStore read point found in any of the involved files: maxMVCCReadpoint;
(5) the maximum tags length: maxTagsLength;
(6) the minimum sequence id to keep during a major compaction: minSeqIdToKeep.
2. Construct the compaction progress tracker CompactionProgress, used to track the progress of the compaction;
3. Find the smallest read point across all scanners via the parent class Compactor's getSmallestReadPoint() method, i.e. the smallest point smallestReadPoint at which data can still be read;
4. Decide, based on the parameter hbase.regionserver.compaction.private.readers (default false), whether to use private readers:
4.1. If private readers are to be used (the parameter is true), clone all the StoreFiles so that the compaction runs against an independent copy of the store files, HFiles and their readers;
4.1.1. Create a StoreFile list, readersToClose, sized to the number of files to compact in the request;
4.1.2. Copy each file to compact from the request into the readersToClose list;
4.1.3. Create the file scanners fileScanners from readersToClose, i.e. from the copies of the files to compact;
4.2. If private readers are not to be used (the parameter is false), use the file list actually carried by the request;
4.2.1. Create an empty readersToClose list;
4.2.2. Create the file scanners fileScanners from the request's actual list of files to compact;
5. Determine the scan type scanType from the compaction request type:
if the request is a major or all-files compaction, scanType is COMPACT_DROP_DELETES;
if the request is a minor compaction, scanType is COMPACT_RETAIN_DELETES.
6. If coprocessors are present, invoke the preCreateCoprocScanner() hook to obtain a scanner; if the coprocessors did not create one, create it via createScanner();
7. If coprocessors are present, invoke their preCompact() hook (a sketch of such a hook is shown right after this list);
8. Determine the final smallest read point smallestReadPoint from the previously obtained smallestReadPoint and the minSeqIdToKeep in the file details fd, and set the cleanSeqId flag;
9. Obtain the writer via HStore's createWriterInTmp() method;
10. Call the parent class Compactor's performCompaction() method to perform the compaction using the scanner, writer, smallestReadPoint and cleanSeqId:
essentially, the scanner reads data from the old files and the writer writes it into the new file.
11. If the compaction did not finish: close the writer, delete the writer's temporary file, and throw an exception;
12. Close the scanner;
13. If an exception occurred, simply close the writer; if there was no exception, append metadata, close the writer, and add the written file's path to newFiles;
14. Close the reader of each StoreFile in readersToClose, one by one;
15. Return newFiles.
That is the overall flow. Let's now dig into some of the details one by one.
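Before the detailed walkthrough, a brief aside on steps 6 and 7 above: the coprocessor hooks referenced there ultimately call into a RegionObserver. Below is a minimal sketch of such an observer against the 0.98/1.x coprocessor API; the class and its pass-through behavior are purely illustrative and are not part of the code analyzed in this article.

import java.io.IOException;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.Store;

/** Illustrative observer that simply passes the compaction scanner through unchanged. */
public class PassThroughCompactionObserver extends BaseRegionObserver {
  @Override
  public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> e,
      Store store, InternalScanner scanner, ScanType scanType) throws IOException {
    // Returning the scanner unchanged keeps the normal compaction behavior.
    // Returning a wrapping scanner would let a coprocessor filter or transform cells;
    // note that compact() above treats a null scanner coming back from the coprocessor
    // hooks as "skip normal processing".
    return scanner;
  }
}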
First, the file details FileDetails, which are obtained via the getFileDetails() method. The FileDetails class is defined as follows:
  /** The sole reason this class exists is that java has no ref/out/pointer parameters. */
  protected static class FileDetails {
    /** Maximum key count after compaction (for blooms) */
    public long maxKeyCount = 0;
    /** Earliest put timestamp if major compaction */
    public long earliestPutTs = HConstants.LATEST_TIMESTAMP;
    /** The last key in the files we're compacting. */
    // i.e. the largest sequence id among the files being compacted
    public long maxSeqId = 0;
    /** Latest memstore read point found in any of the involved files */
    public long maxMVCCReadpoint = 0;
    /** Max tags length **/
    public int maxTagsLength = 0;
    /** Min SeqId to keep during a major compaction **/
    public long minSeqIdToKeep = 0;
  }

It is populated by the following method:
  /**
   * Extracts some details about the files to compact that are commonly needed by compactors.
   * @param filesToCompact Files.
   * @param allFiles Whether all files are included for compaction
   * @return The result.
   */
  protected FileDetails getFileDetails(
      Collection<StoreFile> filesToCompact, boolean allFiles) throws IOException {
    // Construct a FileDetails instance fd
    FileDetails fd = new FileDetails();
    // Compute the newest HFile timestamp for which MVCC values must still be kept:
    // current time - 24 hours * keepSeqIdPeriod, where keepSeqIdPeriod is the configured
    // number of days MVCC values are preserved across major compactions
    long oldestHFileTimeStampToKeepMVCC = System.currentTimeMillis() -
      (1000L * 60 * 60 * 24 * this.keepSeqIdPeriod);

    // Iterate over the files to compact
    for (StoreFile file : filesToCompact) {
      // If allFiles is true (all files are being compacted) and the file's modification time
      // is older than the timestamp computed above
      if (allFiles && (file.getModificationTimeStamp() < oldestHFileTimeStampToKeepMVCC)) {
        // when isAllFiles is true, all files are compacted so we can calculate the smallest
        // MVCC value to keep
        if (fd.minSeqIdToKeep < file.getMaxMemstoreTS()) {
          // Record the file's max memstore timestamp as the minimum sequence id to keep
          fd.minSeqIdToKeep = file.getMaxMemstoreTS();
        }
      }
      // Track the largest sequence id across the files being compacted in fd.maxSeqId
      long seqNum = file.getMaxSequenceId();
      fd.maxSeqId = Math.max(fd.maxSeqId, seqNum);
      // Get the file's reader
      StoreFile.Reader r = file.getReader();
      if (r == null) {
        LOG.warn("Null reader for " + file.getPath());
        continue;
      }
      // NOTE: use getEntries when compacting instead of getFilterEntries, otherwise under-sized
      // blooms can cause progress to be miscalculated or if the user switches bloom
      // type (e.g. from ROW to ROWCOL)
      // The number of KeyValues in the file, i.e. the number of stored cells: HBase stores every
      // column as a KeyValue whose key carries rowkey, column family, qualifier, timestamp etc.,
      // and whose value is the cell value
      long keyCount = r.getEntries();
      // Accumulate into maxKeyCount
      fd.maxKeyCount += keyCount;
      // calculate the latest MVCC readpoint in any of the involved store files
      // First load the file info
      Map<byte[], byte[]> fileInfo = r.loadFileInfo();
      byte tmp[] = null;
      // Get and set the real MVCCReadpoint for bulk loaded files, which is the
      // SeqId number.
      if (r.isBulkLoaded()) {
        // For bulk-loaded files, maxMVCCReadpoint is the larger of fd.maxMVCCReadpoint
        // and the file's sequence id
        fd.maxMVCCReadpoint = Math.max(fd.maxMVCCReadpoint, r.getSequenceID());
      } else {
        // Otherwise, read MAX_MEMSTORE_TS_KEY from the file info
        tmp = fileInfo.get(HFileWriterV2.MAX_MEMSTORE_TS_KEY);
        if (tmp != null) {
          // maxMVCCReadpoint is the larger of fd.maxMVCCReadpoint and the file's MAX_MEMSTORE_TS_KEY
          fd.maxMVCCReadpoint = Math.max(fd.maxMVCCReadpoint, Bytes.toLong(tmp));
        }
      }
      // Update the maximum tags length maxTagsLength
      tmp = fileInfo.get(FileInfo.MAX_TAGS_LEN);
      if (tmp != null) {
        fd.maxTagsLength = Math.max(fd.maxTagsLength, Bytes.toInt(tmp));
      }
      // If required, calculate the earliest put timestamp of all involved storefiles.
      // This is used to remove family delete marker during compaction.
      long earliestPutTs = 0;
      // Obtain the earliest Put timestamp earliestPutTs
      if (allFiles) {
        tmp = fileInfo.get(StoreFile.EARLIEST_PUT_TS);
        if (tmp == null) {
          // There's a file with no information, must be an old one
          // assume we have very old puts
          fd.earliestPutTs = earliestPutTs = HConstants.OLDEST_TIMESTAMP;
        } else {
          earliestPutTs = Bytes.toLong(tmp);
          fd.earliestPutTs = Math.min(fd.earliestPutTs, earliestPutTs);
        }
      }
      if (LOG.isDebugEnabled()) {
        LOG.debug("Compacting " + file +
          ", keycount=" + keyCount +
          ", bloomtype=" + r.getBloomFilterType().toString() +
          ", size=" + StringUtils.humanReadableInt(r.length()) +
          ", encoding=" + r.getHFileReader().getDataBlockEncoding() +
          ", seqNum=" + seqNum +
          (allFiles ? ", earliestPutTs=" + earliestPutTs : ""));
      }
    }
    // Return the file details fd
    return fd;
  }

Next, finding the smallest read point among the scanners, i.e. the smallest point at which data can still be read. This is done by the parent class Compactor's getSmallestReadPoint() method:

  protected long getSmallestReadPoint() {
    // Delegates to HStore's smallestReadPoint
    return store.getSmallestReadPoint();
  }

As you can see, the parent class simply delegates to HStore's getSmallestReadPoint() method:

  @Override
  public long getSmallestReadPoint() {
    // Delegates to the region's smallestReadPoint: the MVCC read point is tracked per region
    return this.region.getSmallestReadPoint();
  }

So HStore ultimately takes the smallestReadPoint from the HRegion: the MVCC read point is maintained at region level rather than per store, which is consistent with HBase's well-known transactional guarantee of no more than a single row. How the smallestReadPoint itself is computed will be covered later when we look at the multi-version concurrency control (MVCC) mechanism.

Next, let's see how the file scanners fileScanners are created. They are built by the parent class Compactor's createFileScanners() method:
  /**
   * Creates file scanners for compaction.
   * @param filesToCompact Files.
   * @return Scanners.
   */
  protected List<StoreFileScanner> createFileScanners(
      final Collection<StoreFile> filesToCompact, long smallestReadPoint) throws IOException {
    return StoreFileScanner.getScannersForStoreFiles(filesToCompact, false, false, true,
      smallestReadPoint);
  }

This method creates scanners dedicated to compaction, distinct from the scanners used on the client side. Let's continue into StoreFileScanner's getScannersForStoreFiles() method:

  /**
   * Return an array of scanners corresponding to the given set of store files,
   * and set the ScanQueryMatcher for each store file scanner for further
   * optimization
   */
  public static List<StoreFileScanner> getScannersForStoreFiles(
      Collection<StoreFile> files, boolean cacheBlocks, boolean usePread,
      boolean isCompaction, ScanQueryMatcher matcher, long readPt) throws IOException {
    List<StoreFileScanner> scanners = new ArrayList<StoreFileScanner>(files.size());
    // Iterate over the StoreFiles
    for (StoreFile file : files) {
      // Obtain each file's reader
      StoreFile.Reader r = file.createReader();
      // Obtain a StoreFileScanner from the reader; this scanner is dedicated to reading StoreFiles
      StoreFileScanner scanner = r.getStoreFileScanner(cacheBlocks, usePread,
        isCompaction, readPt);
      scanner.setScanQueryMatcher(matcher);
      // Add it to the scanner list
      scanners.add(scanner);
    }
    // Return the scanner list
    return scanners;
  }

This is simple enough that it needs no further explanation; interested readers can follow the source code themselves. Moving on, let's see how an internal scanner of type InternalScanner is obtained, via createScanner():
  /**
   * Creates a scanner for compaction.
   *
   * @param store store
   * @param scanners Store file scanners.
   * @param scanType Scan type.
   * @param smallestReadPoint Smallest MVCC read point.
   * @param earliestPutTs Earliest put across all files.
   * @return A compaction scanner.
   */
  protected InternalScanner createScanner(Store store, List<StoreFileScanner> scanners,
      ScanType scanType, long smallestReadPoint, long earliestPutTs) throws IOException {
    // Construct a Scan instance
    Scan scan = new Scan();
    // Set the max versions to the column family's configured max versions
    // (a hint that compaction also performs data cleanup)
    scan.setMaxVersions(store.getFamily().getMaxVersions());
    // Return a StoreScanner instance
    return new StoreScanner(store, store.getScanInfo(), scan, scanners,
        scanType, smallestReadPoint, earliestPutTs);
  }

The scanner here is actually a StoreScanner instance, an internal scanner over the Store. One important detail: when the Scan is created, its max versions is set to the column family's configured maximum number of versions. Doesn't that suggest that compaction also performs data cleanup? The answer is yes. When data is modified, HBase does not delete the old value in place; it simply adds a new version, and stale or excess versions are filtered out later, during compaction, through the scanner's max-versions setting. This is an efficient way of handling updates and reflects HBase's low-latency design.

Now that we have a scanner to read data, let's look at the writer for writing it. After all, merging old files into a new one requires both reading and writing, and the writer is created by HStore's createWriterInTmp() method:
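To make the version-cleanup behavior concrete, here is a small client-side sketch against the HBase 0.98/1.x client API. The table name "t_versions" and family "f" are made up for illustration, and note that flush() and majorCompact() only request the operations asynchronously, so the physical cleanup is not immediately visible.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;

public class VersionCleanupSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    byte[] FAM = Bytes.toBytes("f");
    byte[] ROW = Bytes.toBytes("r1");
    byte[] QUAL = Bytes.toBytes("q");

    // Hypothetical table with MaxVersions = 1 on its only column family.
    HBaseAdmin admin = new HBaseAdmin(conf);
    HTableDescriptor desc = new HTableDescriptor(TableName.valueOf("t_versions"));
    desc.addFamily(new HColumnDescriptor(FAM).setMaxVersions(1));
    admin.createTable(desc);

    HTable table = new HTable(conf, "t_versions");
    // Write two versions of the same cell into two different store files:
    // the older version is not deleted in place, it stays in the first HFile.
    table.put(new Put(ROW).add(FAM, QUAL, Bytes.toBytes("v1")));
    admin.flush("t_versions");
    table.put(new Put(ROW).add(FAM, QUAL, Bytes.toBytes("v2")));
    admin.flush("t_versions");

    // A read only ever returns as many versions as the family allows (1 here),
    // even though both versions still exist on disk.
    Result r = table.get(new Get(ROW).setMaxVersions());
    System.out.println("versions visible to a read: " + r.getColumnCells(FAM, QUAL).size());

    // Request a major compaction: the StoreScanner created by createScanner() is limited
    // to the family's max versions, so the stale version is dropped when the store files
    // are rewritten. This call is asynchronous.
    admin.majorCompact("t_versions");

    table.close();
    admin.close();
  }
}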
  /*
   * @param maxKeyCount
   * @param compression Compression algorithm to use
   * @param isCompaction whether we are creating a new file in a compaction
   * @param includesMVCCReadPoint - whether to include MVCC or not
   * @param includesTag - includesTag or not
   * @return Writer for a new StoreFile in the tmp dir.
   */
  @Override
  public StoreFile.Writer createWriterInTmp(long maxKeyCount, Compression.Algorithm compression,
      boolean isCompaction, boolean includeMVCCReadpoint, boolean includesTag)
  throws IOException {
    final CacheConfig writerCacheConf;
    // Is this a compaction?
    if (isCompaction) {
      // If so, don't cache data on write on compactions.
      writerCacheConf = new CacheConfig(cacheConf);
      writerCacheConf.setCacheDataOnWrite(false);
    } else {
      writerCacheConf = cacheConf;
    }
    InetSocketAddress[] favoredNodes = null;
    // Obtain the favored nodes
    if (region.getRegionServerServices() != null) {
      favoredNodes = region.getRegionServerServices().getFavoredNodesForRegion(
          region.getRegionInfo().getEncodedName());
    }
    // Create the HFile context
    HFileContext hFileContext = createFileContext(compression, includeMVCCReadpoint, includesTag,
      cryptoContext);
    // Build the StoreFile writer from the information above: file system, file path,
    // comparator, bloom filter type, maximum KeyValue count, favored nodes, HFile context
    StoreFile.Writer w = new StoreFile.WriterBuilder(conf, writerCacheConf,
        this.getFileSystem())                            // file system
            .withFilePath(fs.createTempName())           // file path (under the tmp dir)
            .withComparator(comparator)                  // comparator
            .withBloomType(family.getBloomFilterType())  // bloom filter type
            .withMaxKeyCount(maxKeyCount)                // maximum KeyValue count
            .withFavoredNodes(favoredNodes)              // favored nodes
            .withFileContext(hFileContext)               // HFile context
            .build();
    return w;
  }

This writer is essentially a StoreFile.Writer, a writer for store files, built from several key pieces of information: the file system, the target file path (a temporary name under the store's tmp directory), the comparator, the bloom filter type, the expected maximum KeyValue count, the favored nodes, and the HFile context.

With a scanner to read data and a writer to write it, the compaction proper can begin: read from the old files, write into the new one. Let's look at Compactor's performCompaction() method:
  /**
   * Performs the compaction.
   *
   * @param scanner Where to read from.
   * @param writer Where to write to.
   * @param smallestReadPoint Smallest read point.
   * @param cleanSeqId When true, remove seqId (used to be mvcc) value which is <= smallestReadPoint
   * @return Whether compaction ended; false if it was interrupted for some reason.
   */
  protected boolean performCompaction(InternalScanner scanner,
      CellSink writer, long smallestReadPoint, boolean cleanSeqId) throws IOException {
    // Bytes written so far (for the periodic close check)
    long bytesWritten = 0;
    // Bytes written since the last progress log
    long bytesWrittenProgress = 0;
    // Since scanner.next() can return 'false' but still be delivering data,
    // we have to use a do/while loop.
    // Cell list
    List<Cell> cells = new ArrayList<Cell>();
    // Periodic-check threshold: how much compacted data to process between checks,
    // taken from hbase.hstore.close.check.interval, 10 MB by default
    long closeCheckInterval = HStore.getCloseCheckInterval();
    long lastMillis = 0;
    if (LOG.isDebugEnabled()) {
      lastMillis = EnvironmentEdgeManager.currentTime();
    }
    long now = 0;
    // Enter a do...while loop that keeps going as long as hasMore is true,
    // i.e. the scanner still has data
    boolean hasMore;
    do {
      // Pull the next batch of cells from the scanner into the cell list
      hasMore = scanner.next(cells, compactionKVMax);
      if (LOG.isDebugEnabled()) {
        now = EnvironmentEdgeManager.currentTime();
      }
      // output to writer:
      // Iterate over the cells and append them to the writer
      for (Cell c : cells) {
        if (cleanSeqId && c.getSequenceId() <= smallestReadPoint) {
          CellUtil.setSequenceId(c, 0);
        }
        // Append to the writer
        writer.append(c);
        // KeyValue size
        int len = KeyValueUtil.length(c);
        // Update counters: cumulative KV count and cumulative size
        ++progress.currentCompactedKVs;
        progress.totalCompactedSize += len;
        if (LOG.isDebugEnabled()) {
          bytesWrittenProgress += len;
        }
        // check periodically to see if a system stop is requested
        if (closeCheckInterval > 0) {
          // Accumulate the bytes written
          bytesWritten += len;
          // Once bytesWritten exceeds closeCheckInterval
          if (bytesWritten > closeCheckInterval) {
            // Reset bytesWritten
            bytesWritten = 0;
            // Check whether the HStore is still writable; if not, a system stop has been
            // requested, so cancel the compaction through progress
            if (!store.areWritesEnabled()) {
              progress.cancel();
              return false;
            }
          }
        }
      }
      // Log the progress of long running compactions every minute if
      // logging at DEBUG level
      if (LOG.isDebugEnabled()) {
        if ((now - lastMillis) >= 60 * 1000) {
          LOG.debug("Compaction progress: " + progress + String.format(", rate=%.2f kB/sec",
            (bytesWrittenProgress / 1024.0) / ((now - lastMillis) / 1000.0)));
          lastMillis = now;
          bytesWrittenProgress = 0;
        }
      }
      // Clear the cell list
      cells.clear();
    } while (hasMore);
    // Mark the compaction progress as complete
    progress.complete();
    return true;
  }

The compaction loop itself is fairly simple: a do...while loop keeps reading batches of cells from the scanner into a cell list, then iterates over the cells, appending each one to the writer and accumulating the KV count and size, until the scanner is exhausted. In this way data is continuously read from the old files and written into the new file, and finally the CompactionProgress tracker is marked complete. That is the whole flow.

One point deserves special mention: while merging data, the loop must also periodically check whether a system stop has been requested from outside and, if so, cancel the compaction. This "period" is not measured in time but in the amount of data already compacted, governed by the threshold closeCheckInterval, taken from the parameter hbase.hstore.close.check.interval (10 MB by default). During compaction the written byte count bytesWritten keeps accumulating; once it exceeds closeCheckInterval it is reset to zero and the HStore's writable state is checked to determine whether a stop has been requested. If so, the compaction is cancelled through progress; otherwise the loop enters the next accumulate-until-threshold cycle.

Back in compact(), the result finished determines the follow-up steps: if the compaction did not finish, the writer is closed, its temporary file is deleted, and an exception is thrown. Finally, if an exception e occurred, the writer is simply closed; if there was no exception, metadata is appended, the writer is closed, and its path is added to newFiles, which is returned as the list of compacted files. Either way, the readers of the StoreFiles in readersToClose are closed one by one at the very end.

With that, the compact flow inside an HRegion, down to the individual HStore, has been analyzed. Due to space constraints some details were only skimmed over or left out; they will have to wait for later posts.
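For reference, the parameters touched on in this walkthrough can be tuned in hbase-site.xml or programmatically. The sketch below uses the defaults described in this article for the first two keys; the batch-size key hbase.hstore.compaction.kv.max (backing compactionKVMax in the loop above) is not discussed in the article and is included here as an assumption.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

public class CompactionConfigSketch {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();

    // Abort-check threshold used by performCompaction(): writability is re-checked
    // every time this many bytes have been written (10 MB by default).
    conf.setLong("hbase.hstore.close.check.interval", 10 * 1024 * 1024L);

    // Whether compact() clones the store files and works on private readers
    // (readersToClose); false by default, as described in step 4 above.
    conf.setBoolean("hbase.regionserver.compaction.private.readers", false);

    // Assumed key: upper bound on how many cells scanner.next() returns per batch
    // inside the performCompaction() do/while loop.
    conf.setInt("hbase.hstore.compaction.kv.max", 10);
  }
}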