要实现的功能很简单,就是一个集群注册服务.对于某一个服务,都可以有对应的多个服务地址,当某个服务机器开始提供服务的时候,就 把自己的IP地址注册上去,而对应客户端来说,就是获取对应的服务机器IP列表.而zk会知道每个服务机器的服务状态.
本代码没有经过线上验证..仅供参考
对应的接口很简单.
package zhenghui.lsf.configserver.service; /** * User: zhenghui * Date: 13-12-22 * Time: 下午4:57 * 集群注册服务. */ public interface AddressService { /** * 设置目标服务的地址 * serviceUniqueName 对应接口的标识符 * */ public void setServiceAddresses(String serviceUniqueName, String address); /** * 获取目标服务的地址 * * @param serviceUniqueName * @return String 当没有可用的服务地址的时候,将会返回null */ public String getServiceAddress(String serviceUniqueName); }
对应的实现类如下
AddressComponent.java
package zhenghui.lsf.configserver.impl; import org.springframework.beans.factory.InitializingBean; import zhenghui.lsf.configserver.service.AddressService; import java.util.List; import java.util.Random; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; /** * User: zhenghui * Date: 13-12-22 * Time: 下午4:58 * 基于zk实现.根据简单原则,不做高级路由处理.直接用随机来做路由. */ public class AddressComponent extends ZookeeperWatcher implements AddressService, InitializingBean { private AtomicBoolean inited = new AtomicBoolean(false); private static final int DEFAULT_TIME_OUT = 30000; /** * 服务地址cache */ private ConcurrentHashMap<String, Future<List<String>>> serviceAddressCache = new ConcurrentHashMap<String, Future<List<String>>>(); /** * zk服务器的地址. */ private String zkAdrress = "10.125.195.174:2181"; @Override public void setServiceAddresses(String serviceUniqueName, String address) { String path = DEFAULT_SERVER_PATH + separator + serviceUniqueName; createPath(path, address); } private void init() throws Exception { // 避免被初始化多次 if (!inited.compareAndSet(false, true)) { return; } createConnection(zkAdrress, DEFAULT_TIME_OUT); } @Override public String getServiceAddress(String serviceUniqueName) throws ExecutionException, InterruptedException { final String path = DEFAULT_SERVER_PATH + separator + serviceUniqueName; List<String> addressList; Future<List<String>> future = serviceAddressCache.get(path); if(future == null){ FutureTask<List<String>> futureTask = new FutureTask(new Callable<List<String>>() { public List<String> call() { return getChildren(path, true); } }); Future<List<String>> old = serviceAddressCache.putIfAbsent(path, futureTask); if (old == null) { futureTask.run(); addressList = futureTask.get(); } else { addressList = old.get(); } } else { addressList = future.get(); } return addressList.get(new Random().nextInt(addressList.size())); } @Override public void afterPropertiesSet() throws Exception { init(); } public void setZkAdrress(String zkAdrress) { this.zkAdrress = zkAdrress; } @Override protected void addressChangeHolder(String path) { serviceAddressCache.remove(path); } }
ZookeeperWatcher.java
package zhenghui.lsf.configserver.impl; import org.apache.zookeeper.*; import org.apache.zookeeper.data.Stat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import zhenghui.lsf.exception.LSFException; import java.util.ArrayList; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicInteger; /** * User: zhenghui * Date: 13-12-25 * Time: 下午4:13 * 一些zk的封装 * 这里注意,别忘记初始化zk的path.比如创建节点的时候,path是 "/zhenghui/lsf/address/interfacename:1.0.0" 那么请保证 "/zhenghui/lsf/address"节点是存在的,否则会报错. */ public abstract class ZookeeperWatcher implements Watcher { private Logger logger = LoggerFactory.getLogger(ZookeeperWatcher.class); private CountDownLatch connectedSemaphore = new CountDownLatch(1); private ZooKeeper zk; protected static final String DEFAULT_SERVER_PATH = "/zhenghui/lsf/address"; /** * 节点path的后缀 */ private static final String DEFAULT_PATH_SUFFIX = "zhenghui"; protected static final String separator = "/"; private static final String charset_utf8 = "utf-8"; private Stat stat = new Stat(); /** * 用来记录watch被调用次数 */ AtomicInteger seq = new AtomicInteger(); /** * 地址变更,需要做对应的处理.比如缓存清理等 */ abstract protected void addressChangeHolder(String path); /** * 创建zk连接 * */ protected void createConnection(String connectString, int sessionTimeout) throws LSFException { //先关闭连接 releaseConnection(); try { zk = new ZooKeeper(connectString, sessionTimeout, this); logger.info(connectString + "开始连接ZK服务器"); connectedSemaphore.await(); } catch (Exception e) { logger.error("zhenghui.lsf.configserver.impl.AddressComponent.createConnection error"); throw new LSFException("zhenghui.lsf.configserver.impl.ZookeeperWatcher.createConnection error", e); } } /** * 关闭ZK连接 */ protected void releaseConnection() { if (zk != null) { try { this.zk.close(); } catch (Exception e) { logger.error("zhenghui.lsf.configserver.impl.AddressComponent.releaseConnection error"); } } } /** * 创建对应的节点. */ protected boolean createPath(String path, String data) { try { //先判断path是否存在 Stat stat = exists(path, true); //如果不存在,则创建 if(stat == null){ this.zk.create(path,"zhenghui".getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); logger.info("父节点创建成功.path= " + path); } String childPath = path + separator + DEFAULT_PATH_SUFFIX; this.zk.create(childPath,data.getBytes(charset_utf8),ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); logger.info("子节点创建成功.path= " + childPath); return true; } catch (Exception e) { logger.error("zhenghui.lsf.configserver.impl.ZookeeperWatcher.createPath",e); return false; } } protected Stat exists(String path, boolean needWatch) { try { return this.zk.exists(path, needWatch); } catch (Exception e) { return null; } } /** * 获取子节点 * * @param path 节点path */ protected List<String> getChildren(String path, boolean needWatch) { try { List<String> newServerList = new ArrayList<String>(); List<String> subList = this.zk.getChildren(path, needWatch); if(subList != null && !subList.isEmpty()){ for (String subNode : subList) { // 获取每个子节点下关联的server地址 byte[] data = zk.getData(path + separator + subNode, false, stat); newServerList.add(new String(data, charset_utf8)); } } return newServerList; } catch (Exception e) { logger.error("zhenghui.lsf.configserver.impl.ZookeeperWatcher.getChildren", e); return null; } } @Override public void process(WatchedEvent event){ // try { // Thread.sleep(300); // } catch (Exception e) {} if (event == null) return; String logPrefix = "Watch-" + seq.incrementAndGet() + ":"; logger.info(logPrefix + event.toString()); // 连接状态 Watcher.Event.KeeperState keeperState = event.getState(); // 事件类型 Watcher.Event.EventType eventType = event.getType(); // 受影响的path String path = event.getPath(); if (Watcher.Event.KeeperState.SyncConnected == keeperState) { // 成功连接上ZK服务器 if (Watcher.Event.EventType.None == eventType) { logger.info(logPrefix + "成功连接上ZK服务器"); connectedSemaphore.countDown(); } else if (Watcher.Event.EventType.NodeChildrenChanged == eventType) { logger.info(logPrefix + "子节点变更"); //如果是 DEFAULT_SERVER_PATH下面的接口变动,则说明是新增接口,不需要触发holder if(!path.equals(DEFAULT_SERVER_PATH)){ addressChangeHolder(path); } } } //下面可以做一些重连的工作. else if (Watcher.Event.KeeperState.Disconnected == keeperState) { logger.error(logPrefix + "与ZK服务器断开连接"); } else if (Watcher.Event.KeeperState.AuthFailed == keeperState) { logger.error(logPrefix + "权限检查失败"); } else if (Watcher.Event.KeeperState.Expired == keeperState) { logger.error(logPrefix + "会话失效"); } } }
相关推荐
LSF集群使用命令说明 每个命令都有详细说明 英文版
spectrum-lsf-10.1.0-documentation.pdf
TTP229-LSF TonTouchTM IC是一款使用电容感应式原理设计的触摸芯片。此芯片内建稳压电路供触摸传感器使用,稳定的触摸效果可以应用在各种不同应用上,人体触摸面板可以通过非导电性绝缘材料连接,主要应用是以取代...
针对目前单机编译环境中编译资源局限、编译作业执行时间过长等问题,通过对网格集群技术的研究,提出了一种基于集群技术的网格并行编译服务模型。该模型中首先对编译作业进行分解,并依据作业调度算法,把分解后的元...
lsf操作手册 openlava亦可以使用之 详细的操作手册 lsf是IBM的一款集群调度软件 openlava是一款兼容lsf操作的集群调度软件
LSF使用方法、提交作业命令、LSF队列状况、查看作业状态和删除作业等
它们可以与各种版本的LSF一起使用,并且由LSF开发来维护,尽管我们从开源社区中获得了贡献。 如果您计划或希望为该库做出贡献,则必须遵循此存储库根目录中随附的中的DCO流程。 从本质上讲,它要求您在拉取请求的...
我们已经回顾了Romanets等人的统一耦合通道模型中使用的重归一化程序。 (Phys Rev D 85:114032,2012),及其对C = 1,S = -2和I = 0的影响,最近LHCb协作观察到了五个c(ˆ)状态 。 在模型中使用的介子-重子相互...
LSF-260型砂水分离器技术说明.pdf
#lsf-stats lsf-stats使用带有实时散点图和历史图的 IBM 平台负载共享设施 (LSF) 显示集群的资源利用率统计信息。 这些实时图表可在网络浏览器中查看。 此应用程序是使用 、 、 和 api 构建的。 安装 make 打开...
在共享的多租户环境中为不同的使用者和工作负载提供改进的服务水平 优化了通用图形处理单元(GPGPU)等昂贵资源的使用,以帮助确保分配给他们最重要的工作 部署选项 LSF作为Kubernetes的调度程序 想要为Kubernetes...
lsf-perl-api:LSF Perl模块用来操纵所有LSF的位置
建立React材料UI Redux 草稿JS ChartJS 棱镜JS React降价React完整日历快速开始安装依赖项: npm install或yarn 启动服务器: npm run start或yarn start 视图处于打开状态: localhost:3000 推荐最新的...LSF-MSF-v1.0
超级计算cluster资源调度软件LSF管理员配置手册。
matlab灰色处理代码LSF和MTF数码相机的测量 介绍 概括 在这项工作中,我尝试找到智能手机的线路扩展功能(LSF)和调制传递函数(MTF)。 对象是在Mathematica 12.1.1程序中设计的。 对于LSF,我设计了具有不同宽度的...
介绍LSF 许可证调度的安装、配置和管理
LPC与LSF系数转换的FPGA实现.pdf
官网下载奇慢,需要的下载吧
LSF v10管理员手册
语言:English Quick Chimp是一个扩展,允许您在剪裁后发布到LiveStreamFail的ProdredDit。(更新为新的UI。) Quick Chimp是一个扩展,允许您在剪裁后将发布到LiveStreamFail的Prodreddit。