网站首页 包含标签 watcher 的所有文章

  • ZK的watch机制实现原理

    newZooKeeper(StringconnectString,intsessionTimeout,Watcherwatcher)这个Watcher将作为整个ZooKeeper会话期间的上下文,一直被保存在客户端ZKWatchManager的defaultWatcher 也可以动态添加watcher:getData(),exists,getChildren。 分布式环境下的观察者模式:通过客服端和服务端分别创建有观察者的信息列表。 客户端调用相应接口时,首先将对应的Watch事件放到本地的ZKWatchManager中进行管理。 服务端在接收到客户端的请求后根据请求类型判断是否含有Watch事件,并将对应事件放到WatchManager中进行管理。 在事件触发的时候服务端通过节点的路径信息查询相应的Watch事件通知给客户端,客户端在接收到通知后,首先查询本地的ZKWatchManager获得对应的Watch信息处理回调操作。 这种设计不但实现了一个分布式环境下的观察者模式,而且通过将客户端和服务端各自处理Watch事件所需要的额外信息分别保存在两端,减少彼此通信的内容,大大提升了服务的处理性能 客户端实现过程 标记该会话是一个带有Watch事件的请求 通过DataWatchRegistration类来保存watcher事件和节点的对应关系 客户端向服务器发送请求,将请求封装成一个Packet对象,并添加到一个等待发送队列outgoingQueue中调用负责处理队列outgoingQueue的SendThread线程类中的readResponse方法接收服务端的回调,并在最后执行finishPacket()方法将Watch注册到ZKWatchManager,sendThread通过发送path路径和watcher为true,到server注册watch事件 ZKWatchManager保存了Map<String,Set> dataWatchers、Map<String,Set> existsWatchers、Map<String,Set> childrenWatchers三个集合,客户端会在dataWatchers中会添加一个key为path路径的本地事件 服务端实现过程 解析收到的请求是否带有Watch注册事件,通过FinalRequestProcessor类中的processRequest函数实现的。当getDataRequest.getWatch()值为True时,表明该请求需要进行Watch监控注册。 将对应的Watch事件存储到WatchManager,通过zks.getZKDatabase().getData函数实现, WatchManger该类中有HashMap<String,HashSet> watchTable,key为path,Watcher是一个客户端网络连接封装,当节点变化时会通知对应的连接(连接通过心跳保持) 服务端触发过程 调用WatchManager中的方法触发数据变更事件 封装了一个具有会话状态、事件类型、数据节点3种属性的WatchedEvent对象。之后查询该节点注册的Watch事件,如果为空说明该节点没有注册过Watch事件。如果存在Watch事件则添加到定义的Wathcers集合中,并在WatchManager管理中删除。最后,通过调用process方法向客户端发送通知 客户端回调过程 使用SendThread.readResponse()方法来统一处理服务端的相应 将收到的字节流反序列化转换成WatcherEvent对象。调用eventThread.queueEvent()方法将接收到的事件交给EventThread线程进行处理 从ZKWatchManager中查询注册过的客户端Watch信息。查询到后,会将其从ZKWatchManager的管理中删除。因此客户端的Watcher机制是一次性的,触发后就会被删除 将查询到的Watcher存储到waitingEvents队列中,调用EventThread类中的run方法循环取出在waitingEvents队列中等待的Watcher事件进行处理 ...

    2023-11-02 156

联系我们

在线咨询:点击这里给我发消息

QQ交流群:KirinBlog

工作日:8:00-23:00,节假日休息

扫码关注