欢迎访问 生活随笔!

尊龙游戏旗舰厅官网

当前位置: 尊龙游戏旗舰厅官网 > > 编程问答 >内容正文

编程问答

storm中的localstate 代码解析 -尊龙游戏旗舰厅官网

发布时间:2025/1/21 编程问答 6 豆豆
尊龙游戏旗舰厅官网 收集整理的这篇文章主要介绍了 小编觉得挺不错的,现在分享给大家,帮大家做个参考.

官方的解释这个类为:

/*** a simple, durable, atomic k/v database. *very inefficient*, should only be* used for occasional reads/writes. every read/write hits disk.*/

简单来理解就是这个类每次读写都会将一个map的对象序列化存储到磁盘中,读的时候将其反序列化。

构造函数指定的参数就是你在磁盘中存储的目录,同时也作为versionedstore的构造函数的参数。

这些文件在目录中是以一个long类型的id进行命名

public localstate(string backingdir) throws ioexception {_vs = new versionedstore(backingdir);}

snapshot函数,找到最近的版本,将其反序列化

public synchronized map snapshot() throws ioexception {int attempts = 0;while (true) {string latestpath = _vs.mostrecentversionpath(); //获取最近的版本if (latestpath == null)return new hashmap();try {return (map) utils.deserialize(fileutils.readfiletobytearray(new file(latestpath)));} catch (ioexception e) {attempts ;if (attempts >= 10) {throw e;}}}} public object get(object key) throws ioexception {return snapshot().get(key); }public synchronized void put(object key, object val) throws ioexception {put(key, val, true);}public synchronized void put(object key, object val, boolean cleanup)throws ioexception {map curr = snapshot();curr.put(key, val);persist(curr, cleanup); //persist会将其写入到磁盘中}public synchronized void remove(object key) throws ioexception {remove(key, true); }public synchronized void remove(object key, boolean cleanup)throws ioexception {map curr = snapshot();curr.remove(key);persist(curr, cleanup);}public synchronized void cleanup(int keepversions) throws ioexception {_vs.cleanup(keepversions);}

可以看到,基本暴露的接口都通过synchronized关键字来保证串行化的操作,同时多次调用了以下的persist方法,

private void persist(map val, boolean cleanup)throws ioexception {byte[] towrite = utils.serialize(val);string newpath = _vs.createversion(); //创建一个新的版本号fileutils.writebytearraytofile(new file(newpath), towrite);_vs.succeedversion(newpath); //如果写入成功,那么会生成 id.version 文件来声明该文件写入成功if (cleanup)_vs.cleanup(4); //默认保留4个版本}

接下来看看versionedstore这个类,它是进行实际存储操作的类,提供了接口给localstate

public void succeedversion(string path) throws ioexception {long version = validateandgetversion(path); //验证一下这个文件是否存在// should rewrite this to do a file move createnewfile(tokenpath(version)); //创建对应的 id.version 文件说明写入成功}

path的值是一个long类型的id,表示对应的文件

private long validateandgetversion(string path) {long v = parseversion(path);if (v == null)throw new runtimeexception(path " is not a valid version");return v;}

//解析出版本号,如果以.version结尾的,去掉.version

private long parseversion(string path) {string name = new file(path).getname();if (name.endswith(finished_version_suffix)) {name = name.substring(0,name.length() - finished_version_suffix.length());}try {return long.parselong(name);} catch (numberformatexception e) {return null;}}

 

createnewfile(tokenpath(version)); //创建对应的 id.version 文件说明写入成功

token file就是一种标志文件,用于标志对应的文件已经写入成功,以.version 结尾

private string tokenpath(long version) {return new file(_root, "" version finished_version_suffix).getabsolutepath();}

 

private void createnewfile(string path) throws ioexception {new file(path).createnewfile();}

cleanup函数,保留versionstokeep版本,清除其他的版本

public void cleanup(int versionstokeep) throws ioexception {list versions = getallversions(); //获取所有的版本,这个返回的是以倒序排列的,最新的版本在最前面if (versionstokeep >= 0) {versions = versions.sublist(0,math.min(versions.size(), versionstokeep)); //所以可以用sublist来得到需要的版本}hashset keepers = new hashset(versions); //存在hashset中方便快速存取for (string p : listdir(_root)) {long v = parseversion(p);if (v != null && !keepers.contains(v)) {deleteversion(v); //删除其他的版本}}}

getallversions,注意这里是获取所有以version结尾的文件,也就是说所有写入成功的文件,不包括某些还没写成功的文件

/*** sorted from most recent to oldest*/public list getallversions() throws ioexception {list ret = new arraylist();for (string s : listdir(_root)) { //获取该目录下的所有文件if (s.endswith(finished_version_suffix)) { ret.add(validateandgetversion(s)); //验证该文件是否存在}}collections.sort(ret);collections.reverse(ret); //逆序排列return ret;}

删除对应的version文件和token文件

public void deleteversion(long version) throws ioexception {file versionfile = new file(versionpath(version));file tokenfile = new file(tokenpath(version));if (versionfile.exists()) {fileutils.forcedelete(versionfile);}if (tokenfile.exists()) {fileutils.forcedelete(tokenfile);}}

在最开始的地方,snapshot()函数调用了 mostrecentversionpath() 来获取最近的版本,也就是调用getallversions,然后拿到最新的version

public string mostrecentversionpath() throws ioexception {long v = mostrecentversion();if (v == null)return null;return versionpath(v);} public long mostrecentversion() throws ioexception {list all = getallversions();if (all.size() == 0)return null;return all.get(0);}

如果提供了version号的话,可以看到是取出了比这个version号小的最大的version

public string mostrecentversionpath(long maxversion) throws ioexception {long v = mostrecentversion(maxversion);if (v == null)return null;return versionpath(v);} public long mostrecentversion(long maxversion) throws ioexception {list all = getallversions();for (long v : all) {if (v <= maxversion) //取出比maxversion小的最大versionreturn v;}return null;}

 

转载于:https://www.cnblogs.com/longshaohang/p/3893264.html

总结

以上是尊龙游戏旗舰厅官网为你收集整理的的全部内容,希望文章能够帮你解决所遇到的问题。

如果觉得尊龙游戏旗舰厅官网网站内容还不错,欢迎将尊龙游戏旗舰厅官网推荐给好友。

网站地图