Service-enabling Thrift with zookeeper, connection pooling, and Failover/LoadBalance
Published: 2019-06-22


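Turning Thrift into a managed service is mostly client-side work, and it can be approached from the following angles:

1. Service registration on the server and automatic discovery on the client, so that no configuration has to be edited by hand. We use zookeeper for this; because the client that ships with zookeeper is fairly cumbersome to use, registration and discovery are handled with the curator-recipes library.

2. The client manages service calls through a connection pool to improve performance. We use commons-pool from the Apache Commons project, which greatly reduces code complexity.

3. Failover/LoadBalance: thanks to zookeeper watchers, the client is notified promptly when a server becomes unavailable and removes the dead service node. There are many LoadBalance algorithms; we use weighted random selection, a common choice. For an introduction to the other algorithms, see: .

4. Make registration and discovery of thrift services configurable through spring, which is a considerable convenience.

5. Other changes, such as:

1) Using dynamic proxies to hide the details of the interaction between client and server, so that users only access the service through the interface published by the provider.

2) Thrift offers two ways to call a service: Client and Iface.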

 

[java]
// *) Call through the Client API
((EchoService.Client) client).echo("hello lilei");  // ---(1)
// *) Call through the Service interface
((EchoService.Iface) service).echo("hello lilei");  // ---(2)

The Client API style is not recommended; we recommend the Service-interface style (service-oriented).
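To make the Service-interface style concrete, here is a minimal sketch of how a JDK dynamic proxy can stand in front of a client so that callers only ever see the interface. `EchoIface`, `EchoClient`, and `proxyFor` are hypothetical names invented for this illustration; they are stand-ins for Thrift-generated types, not the classes used later in this article.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;
import java.util.function.Supplier;

class ServiceProxySketch {

    /** Hypothetical stand-in for a Thrift-generated EchoService.Iface. */
    public interface EchoIface {
        String echo(String message);
    }

    /** Hypothetical stand-in for a generated client bound to one connection. */
    public static class EchoClient implements EchoIface {
        @Override
        public String echo(String message) {
            return "echo:" + message;
        }
    }

    /**
     * Returns an object implementing iface; every call obtains a client from
     * borrowClient and forwards the method call to it.
     */
    @SuppressWarnings("unchecked")
    static <T> T proxyFor(Class<T> iface, Supplier<? extends T> borrowClient) {
        InvocationHandler handler = (proxy, method, args) -> {
            // Assumption: in a pooled setup this would borrow a client from the pool.
            T client = borrowClient.get();
            try {
                return method.invoke(client, args);
            } catch (InvocationTargetException e) {
                // Unwrap so callers see the exception thrown by the service method.
                throw e.getCause();
            }
        };
        return (T) Proxy.newProxyInstance(
                iface.getClassLoader(), new Class<?>[] { iface }, handler);
    }
}
```

A caller would write `EchoIface service = ServiceProxySketch.proxyFor(EchoIface.class, EchoClient::new);` and then `service.echo("hello lilei")`, without touching a client object directly.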

 

Let's implement these one by one:

I. Add the dependency jars in pom.xml

 

[html]
<dependency>
    <groupId>org.apache.thrift</groupId>
    <artifactId>libthrift</artifactId>
    <version>0.9.2</version>
</dependency>
<dependency>
    <groupId>commons-pool</groupId>
    <artifactId>commons-pool</artifactId>
    <version>1.6</version>
</dependency>
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-context</artifactId>
    <version>4.0.9.RELEASE</version>
</dependency>

<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.4.6</version>
</dependency>
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>2.7.1</version>
</dependency>

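II. Managing service node configuration with zookeeper

As RPC services evolve toward a platform, more of the service details (the IP addresses of the cluster, cluster scale-out and migration) are hidden and only the service interface is exposed. This evolution fully decouples the server from the client; the two interact through a ConfigServer (MetaServer) acting as the intermediary.

Note: the diagram comes from the official dubbo site.

Here Zookeeper plays that role: the server acts as the publisher and the client as the subscriber.

Zookeeper is a coordination service for distributed applications. It implements the ZAB atomic broadcast protocol, a Paxos-like consensus protocol, and plays an important role in naming, configuration push, data synchronization, and master/slave switchover. Its data is organized like the directory tree of a file system:

Each node is called a znode. Depending on their characteristics, znodes fall into the following types:

  1). PERSISTENT: persistent node
  2). EPHEMERAL: ephemeral node; it disappears when the session ends (client disconnect)
  3). PERSISTENT_SEQUENTIAL: persistent node whose name carries a monotonically increasing sequence number
  4). EPHEMERAL_SEQUENTIAL: ephemeral node whose name carries a monotonically increasing sequence number
  Note: an ephemeral node cannot be a parent node
  Watcher mode: a client can register event callbacks for changes to a node's state or content. Two attributes of the Event deserve attention:
  1). KeeperState: Disconnected, SyncConnected, Expired
  2). EventType: None, NodeCreated, NodeDeleted, NodeDataChanged, NodeChildrenChanged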
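The interplay of ephemeral nodes and watchers is what drives failover later on, so a small in-memory sketch may help. It is not the ZooKeeper API: `EphemeralRegistrySketch` and all of its methods are hypothetical names, and its callbacks stay registered, whereas a raw ZooKeeper watch fires once and has to be set again.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Consumer;

/** In-memory stand-in for one parent znode with ephemeral children. */
class EphemeralRegistrySketch {

    // child name -> id of the session that created it
    private final Map<String, String> children = new TreeMap<>();
    private final List<Consumer<List<String>>> watchers = new ArrayList<>();

    /** Registers a callback that receives the child list after every change. */
    synchronized void watchChildren(Consumer<List<String>> watcher) {
        watchers.add(watcher);
    }

    /** Creates an ephemeral child owned by the given session. */
    synchronized void createEphemeral(String sessionId, String child) {
        children.put(child, sessionId);
        notifyWatchers();
    }

    /** Simulates session expiry: every child owned by that session disappears. */
    synchronized void expireSession(String sessionId) {
        boolean removed = children.values().removeIf(sessionId::equals);
        if (removed) {
            notifyWatchers();
        }
    }

    synchronized List<String> currentChildren() {
        return new ArrayList<>(children.keySet());
    }

    private void notifyWatchers() {
        List<String> snapshot = new ArrayList<>(children.keySet());
        for (Consumer<List<String>> watcher : watchers) {
            watcher.accept(snapshot);
        }
    }
}
```

When a server's session expires, its child vanishes and every watcher receives the shrunken list, which is the moment a client would drop that address.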
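RPC server:
  As the publisher of a concrete business RPC service, its service description consists of the following elements.
  1). namespace: the namespace, used to tell applications apart
  2). service: the service interface, identified by the publisher's fully qualified class name
  3). version: the version number
  This borrows from Maven's GAV coordinates; a three-dimensional coordinate system fits a platform-style service environment better.
  *) Data model design
  The registration path of a concrete RPC service is /rpc/{namespace}/{service}/{version}; every node on this path is persistent.
  The registration path of a node in the RPC service cluster is /rpc/{namespace}/{service}/{version}/{ip:port:weight}; the last node is ephemeral.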
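The two path templates can be sketched as plain string assembly. `RpcPathSketch` and its method names are hypothetical helpers for illustration only; the fallback to version 1.0.0 mirrors the default used by the registration code in this article.

```java
class RpcPathSketch {

    private static final String ROOT = "/rpc";
    private static final String DEFAULT_VERSION = "1.0.0";

    /** Builds /rpc/{namespace}/{service}/{version}; the namespace may be empty. */
    static String servicePath(String namespace, String service, String version) {
        StringBuilder path = new StringBuilder(ROOT);
        if (namespace != null && !namespace.isEmpty()) {
            path.append('/').append(namespace);
        }
        String effectiveVersion =
                (version == null || version.isEmpty()) ? DEFAULT_VERSION : version;
        return path.append('/').append(service)
                   .append('/').append(effectiveVersion)
                   .toString();
    }

    /** Appends the ephemeral leaf {ip:port:weight} to the service path. */
    static String nodePath(String namespace, String service, String version,
                           String ip, int port, int weight) {
        return servicePath(namespace, service, version)
                + "/" + ip + ":" + port + ":" + weight;
    }
}
```

For example, `nodePath("demo", "EchoService", "1.0.0", "192.168.1.10", 9000, 2)` yields `/rpc/demo/EchoService/1.0.0/192.168.1.10:9000:2`.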

1. Managing the Zookeeper client

ZookeeperFactory.java

 

[java]
package cn.slimsmart.thrift.rpc.zookeeper;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.springframework.beans.factory.FactoryBean;
import org.springframework.util.StringUtils;

/**
 * Provides the zookeeper client connection.
 */
public class ZookeeperFactory implements FactoryBean<CuratorFramework> {

    private String zkHosts;
    // session timeout (ms)
    private int sessionTimeout = 30000;
    // connection timeout (ms)
    private int connectionTimeout = 30000;

    // share a single zk connection
    private boolean singleton = true;

    // global path prefix, typically used to tell applications apart
    private String namespace;

    private final static String ROOT = "rpc";

    private CuratorFramework zkClient;

    public void setZkHosts(String zkHosts) {
        this.zkHosts = zkHosts;
    }

    public void setSessionTimeout(int sessionTimeout) {
        this.sessionTimeout = sessionTimeout;
    }

    public void setConnectionTimeout(int connectionTimeout) {
        this.connectionTimeout = connectionTimeout;
    }

    public void setSingleton(boolean singleton) {
        this.singleton = singleton;
    }

    public void setNamespace(String namespace) {
        this.namespace = namespace;
    }

    public void setZkClient(CuratorFramework zkClient) {
        this.zkClient = zkClient;
    }

    @Override
    public CuratorFramework getObject() throws Exception {
        if (singleton) {
            if (zkClient == null) {
                zkClient = create();
                zkClient.start();
            }
            return zkClient;
        }
        // non-singleton clients are returned unstarted; callers start them on first use
        return create();
    }

    @Override
    public Class<?> getObjectType() {
        return CuratorFramework.class;
    }

    @Override
    public boolean isSingleton() {
        return singleton;
    }

    public CuratorFramework create() throws Exception {
        // build the prefix in a local variable so repeated calls do not keep prepending ROOT to the field
        String fullNamespace = StringUtils.isEmpty(namespace) ? ROOT : ROOT + "/" + namespace;
        return create(zkHosts, sessionTimeout, connectionTimeout, fullNamespace);
    }

    public static CuratorFramework create(String connectString, int sessionTimeout, int connectionTimeout, String namespace) {
        CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder();
        // honor the connectionTimeout argument rather than a fixed value
        return builder.connectString(connectString).sessionTimeoutMs(sessionTimeout).connectionTimeoutMs(connectionTimeout)
                .canBeReadOnly(true).namespace(namespace).retryPolicy(new ExponentialBackoffRetry(1000, Integer.MAX_VALUE))
                .defaultData(null).build();
    }

    public void close() {
        if (zkClient != null) {
            zkClient.close();
        }
    }
}

 

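2. Registering services on the server

Because the server configuration needs the IP address of the local machine, we define an interface for obtaining the IP.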

ThriftServerIpResolve.java

 

[java]
package cn.slimsmart.thrift.rpc.zookeeper;

/**
 * Resolves the IP address of the thrift server, used when registering the service.
 * 1) It can be parsed from a special file on a physical or virtual machine
 * 2) It can be taken from the network interface with a given index
 * 3) Other approaches
 */
public interface ThriftServerIpResolve {

    String getServerIp() throws Exception;

    void reset();

    // invoked with the new IP when the IP changes
    static interface IpRestCalllBack {
        public void rest(String newIp);
    }
}

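This interface can have different implementations. Below we obtain the IP address from the network interfaces; it can also be set explicitly through the serverIp property.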

ThriftServerIpLocalNetworkResolve.java

 

 

[java]
package cn.slimsmart.thrift.rpc.zookeeper;

import java.net.Inet6Address;
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.net.SocketException;
import java.util.Enumeration;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Resolves the IP from the network interfaces.
 */
public class ThriftServerIpLocalNetworkResolve implements ThriftServerIpResolve {

    private Logger logger = LoggerFactory.getLogger(getClass());

    // cached value
    private String serverIp;

    public void setServerIp(String serverIp) {
        this.serverIp = serverIp;
    }

    @Override
    public String getServerIp() {
        if (serverIp != null) {
            return serverIp;
        }
        // a host can have several network interfaces
        try {
            Enumeration<NetworkInterface> netInterfaces = NetworkInterface.getNetworkInterfaces();
            while (netInterfaces.hasMoreElements()) {
                NetworkInterface netInterface = netInterfaces.nextElement();
                // each interface can carry several addresses (loopback, site-local, IPv4 or IPv6, ...)
                Enumeration<InetAddress> addresses = netInterface.getInetAddresses();
                while (addresses.hasMoreElements()) {
                    InetAddress address = addresses.nextElement();
                    // skip IPv6 addresses
                    if (address instanceof Inet6Address) {
                        continue;
                    }
                    // scanning continues after a match, so the last site-local IPv4 address found wins
                    if (address.isSiteLocalAddress() && !address.isLoopbackAddress()) {
                        serverIp = address.getHostAddress();
                        logger.info("resolve server ip: {}", serverIp);
                    }
                }
            }
        } catch (SocketException e) {
            logger.error("failed to enumerate network interfaces", e);
        }
        return serverIp;
    }

    @Override
    public void reset() {
        serverIp = null;
    }
}
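The filtering rule in the resolver (IPv4 only, site-local, not loopback) can be isolated into a small function that is easy to exercise without real network interfaces. This is a sketch under assumptions: `IpPickSketch`, `firstSiteLocalIpv4`, and `ipv4` are hypothetical names, and unlike the resolver above, which keeps scanning and ends up with the last match, this variant stops at the first match.

```java
import java.net.Inet4Address;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.List;
import java.util.Optional;

class IpPickSketch {

    /** Returns the first site-local, non-loopback IPv4 address, if there is one. */
    static Optional<String> firstSiteLocalIpv4(List<InetAddress> candidates) {
        for (InetAddress address : candidates) {
            if (address instanceof Inet4Address
                    && address.isSiteLocalAddress()
                    && !address.isLoopbackAddress()) {
                return Optional.of(address.getHostAddress());
            }
        }
        return Optional.empty();
    }

    /** Builds an IPv4 address from four octets without any DNS lookup. */
    static InetAddress ipv4(int a, int b, int c, int d) {
        try {
            return InetAddress.getByAddress(
                    new byte[] { (byte) a, (byte) b, (byte) c, (byte) d });
        } catch (UnknownHostException e) {
            // getByAddress only rejects arrays of illegal length, which cannot happen here
            throw new IllegalStateException(e);
        }
    }
}
```

Given the candidates 127.0.0.1, 8.8.8.8, and 192.168.1.10 in that order, the function returns 192.168.1.10, because the first is loopback and the second is not site-local.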

Next we define the service publishing interface and implement publishing the service information (service interface, version, IP, port, weight) to zookeeper.

ThriftServerAddressRegister.java

 

 

[java]
package cn.slimsmart.thrift.rpc.zookeeper;

/**
 * Publishes the service address and port to the service registry, which here is the zookeeper server.
 */
public interface ThriftServerAddressRegister {
    /**
     * Publishes a service interface.
     * @param service name of the service interface; must be unique within a product
     * @param version version of the service interface, 1.0.0 by default
     * @param address address and port the service is published on
     */
    void register(String service, String version, String address);
}

Implementation: ThriftServerAddressRegisterZookeeper.java

 

 

[java]
package cn.slimsmart.thrift.rpc.zookeeper;

import java.io.UnsupportedEncodingException;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.imps.CuratorFrameworkState;
import org.apache.zookeeper.CreateMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.StringUtils;

import cn.slimsmart.thrift.rpc.ThriftException;

/**
 * Registers the service list in Zookeeper.
 */
public class ThriftServerAddressRegisterZookeeper implements ThriftServerAddressRegister {

    private Logger logger = LoggerFactory.getLogger(getClass());

    private CuratorFramework zkClient;

    public ThriftServerAddressRegisterZookeeper() {}

    public ThriftServerAddressRegisterZookeeper(CuratorFramework zkClient) {
        this.zkClient = zkClient;
    }

    public void setZkClient(CuratorFramework zkClient) {
        this.zkClient = zkClient;
    }

    @Override
    public void register(String service, String version, String address) {
        // start the client if it has not been started yet
        if (zkClient.getState() == CuratorFrameworkState.LATENT) {
            zkClient.start();
        }
        if (StringUtils.isEmpty(version)) {
            version = "1.0.0";
        }
        // the address leaf is an ephemeral node; its parents are created as persistent nodes
        try {
            zkClient.create()
                .creatingParentsIfNeeded()
                .withMode(CreateMode.EPHEMERAL)
                .forPath("/" + service + "/" + version + "/" + address);
        } catch (UnsupportedEncodingException e) {
            // pass the exception as the last argument so SLF4J logs the stack trace
            logger.error("register service address to zookeeper exception", e);
            throw new ThriftException("register service address to zookeeper exception: address UnsupportedEncodingException", e);
        } catch (Exception e) {
            logger.error("register service address to zookeeper exception", e);
            throw new ThriftException("register service address to zookeeper exception", e);
        }
    }

    public void close() {
        zkClient.close();
    }
}

 

3. Discovering services on the client

Define the interface for obtaining service addresses

ThriftServerAddressProvider.java

 

[java]
package cn.slimsmart.thrift.rpc.zookeeper;

import java.net.InetSocketAddress;
import java.util.List;

/**
 * Provider of thrift server-service addresses, used to build the client connection pool.
 */
public interface ThriftServerAddressProvider {

    // returns the service name
    String getService();

    /**
     * Returns all server addresses.
     * @return
     */
    List<InetSocketAddress> findServerAddressList();

    /**
     * Picks a suitable address, for example at random.
     * The implementation is free to use any appropriate algorithm.
     * @return
     */
    InetSocketAddress selector();

    void close();
}

Automatic discovery of service addresses based on zookeeper: ThriftServerAddressProviderZookeeper.java

[java]
package cn.slimsmart.thrift.rpc.zookeeper;

import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.Set;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.imps.CuratorFrameworkState;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCache.StartMode;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;

/**
 * Uses zookeeper as the "config" center, with the apache-curator library to simplify zookeeper development.
 */
public class ThriftServerAddressProviderZookeeper implements ThriftServerAddressProvider, InitializingBean {

    private Logger logger = LoggerFactory.getLogger(getClass());

    // registered service
    private String service;
    // service version
    private String version = "1.0.0";

    private PathChildrenCache cachedPath;

    private CuratorFramework zkClient;

    // Records every address this provider has ever seen.
    // If the zookeeper cluster fails, the addresses in trace serve as a "backup".
    private Set<String> trace = new HashSet<String>();

    private final List<InetSocketAddress> container = new ArrayList<InetSocketAddress>();

    private Queue<InetSocketAddress> inner = new LinkedList<InetSocketAddress>();

    // guards trace, container and inner
    private final Object lock = new Object();

    // default weight
    private static final Integer DEFAULT_WEIGHT = 1;

    public void setService(String service) {
        this.service = service;
    }

    public void setVersion(String version) {
        this.version = version;
    }

    public ThriftServerAddressProviderZookeeper() {
    }

    public ThriftServerAddressProviderZookeeper(CuratorFramework zkClient) {
        this.zkClient = zkClient;
    }

    public void setZkClient(CuratorFramework zkClient) {
        this.zkClient = zkClient;
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        // start zk if it has not been started yet
        if (zkClient.getState() == CuratorFrameworkState.LATENT) {
            zkClient.start();
        }
        buildPathChildrenCache(zkClient, getServicePath(), true);
        cachedPath.start(StartMode.POST_INITIALIZED_EVENT);
    }

    private String getServicePath() {
        return "/" + service + "/" + version;
    }

    private void buildPathChildrenCache(final CuratorFramework client, String path, Boolean cacheData) throws Exception {
        cachedPath = new PathChildrenCache(client, path, cacheData);
        cachedPath.getListenable().addListener(new PathChildrenCacheListener() {
            @Override
            public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
                PathChildrenCacheEvent.Type eventType = event.getType();
                switch (eventType) {
                case CONNECTION_RECONNECTED:
                    logger.info("Connection is reconnected.");
                    break;
                case CONNECTION_SUSPENDED:
                    logger.info("Connection is suspended.");
                    break;
                case CONNECTION_LOST:
                    logger.warn("Connection error, waiting...");
                    return;
                default:
                    // all other events fall through to the rebuild below
                }
                // Any change to any child node triggers a full rebuild; this is the "simple" approach.
                cachedPath.rebuild();
                rebuild();
            }

            protected void rebuild() throws Exception {
                List<ChildData> children = cachedPath.getCurrentData();
                if (children == null || children.isEmpty()) {
                    // All thrift servers may have lost their connection to zookeeper
                    // while the network between thrift client and thrift server is still fine,
                    // so whether container should be cleared here needs careful consideration.
                    synchronized (lock) {
                        container.clear();
                    }
                    logger.error("thrift server-cluster error....");
                    return;
                }
                List<InetSocketAddress> current = new ArrayList<InetSocketAddress>();
                List<String> seen = new ArrayList<String>();
                for (ChildData data : children) {
                    String path = data.getPath();
                    logger.debug("get path:" + path);
                    // strip the service path prefix; what remains is ip:port[:weight]
                    String address = path.substring(getServicePath().length() + 1);
                    logger.debug("get serviceAddress:" + address);
                    current.addAll(transfer(address));
                    seen.add(address);
                }
                Collections.shuffle(current);
                synchronized (lock) {
                    trace.addAll(seen);
                    container.clear();
                    container.addAll(current);
                    inner.clear();
                    inner.addAll(current);
                }
            }
        });
    }

    private List<InetSocketAddress> transfer(String address) {
        String[] hostname = address.split(":");
        Integer weight = DEFAULT_WEIGHT;
        if (hostname.length == 3) {
            weight = Integer.valueOf(hostname[2]);
        }
        String ip = hostname[0];
        Integer port = Integer.valueOf(hostname[1]);
        List<InetSocketAddress> result = new ArrayList<InetSocketAddress>();
        // add ip:port to the address set once per unit of weight; picking from the shuffled set then balances the load
        for (int i = 0; i < weight; i++) {
            result.add(new InetSocketAddress(ip, port));
        }
        return result;
    }

    @Override
    public List<InetSocketAddress> findServerAddressList() {
        return Collections.unmodifiableList(container);
    }

    @Override
    public InetSocketAddress selector() {
        // use the same lock as rebuild() so the queue is never refilled and cleared concurrently
        synchronized (lock) {
            if (inner.isEmpty()) {
                if (!container.isEmpty()) {
                    inner.addAll(container);
                } else if (!trace.isEmpty()) {
                    // zookeeper reports no servers: fall back to every address seen so far
                    for (String hostname : trace) {
                        container.addAll(transfer(hostname));
                    }
                    Collections.shuffle(container);
                    inner.addAll(container);
                }
            }
            return inner.poll();
        }
    }

    @Override
    public void close() {
        try {
            cachedPath.close();
            zkClient.close();
        } catch (Exception e) {
            logger.warn("failed to close zookeeper resources", e);
        }
    }

    @Override
    public String getService() {
        return service;
    }

}

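This interface also has a second implementation that reads the server addresses from configuration instead of ZooKeeper; see the attachment FixedAddressProvider.java.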
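The attachment itself is not reproduced here. As a rough sketch of what a configuration-based provider has to do, the following parses an address string in the `ip:port:weight` format used in this article and repeats each address `weight` times, mirroring the `transfer` method of the ZooKeeper provider. The class `FixedAddressSketch` and its `parse` method are hypothetical names for illustration, not the contents of FixedAddressProvider.java.

```java
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration; not the real FixedAddressProvider.
final class FixedAddressSketch {
    private static final int DEFAULT_WEIGHT = 1;

    // Parses "ip:port[:weight],ip:port[:weight],..." into a weighted list:
    // an address with weight n appears n times in the result.
    static List<InetSocketAddress> parse(String serverAddress) {
        List<InetSocketAddress> result = new ArrayList<>();
        for (String entry : serverAddress.split(",")) {
            String[] parts = entry.trim().split(":");
            if (parts.length < 2) {
                throw new IllegalArgumentException("expected ip:port[:weight] but got: " + entry);
            }
            String ip = parts[0];
            int port = Integer.parseInt(parts[1]);
            int weight = parts.length == 3 ? Integer.parseInt(parts[2]) : DEFAULT_WEIGHT;
            for (int i = 0; i < weight; i++) {
                // createUnresolved avoids a DNS lookup while parsing.
                result.add(InetSocketAddress.createUnresolved(ip, port));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<InetSocketAddress> list = parse("192.168.36.215:9001:1,192.168.36.215:9002:2");
        System.out.println(list.size() + " weighted entries: " + list);
    }
}
```

Picking a random element from such a list (or shuffling it and polling, as the ZooKeeper provider does) selects each server with a probability proportional to its weight.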

 

III. Server-side service registration

ThriftServiceServerFactory.java

[java]
package cn.slimsmart.thrift.rpc;

import java.lang.instrument.IllegalClassFormatException;
import java.lang.reflect.Constructor;

import org.apache.thrift.TProcessor;
import org.apache.thrift.TProcessorFactory;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.server.TServer;
import org.apache.thrift.server.TThreadedSelectorServer;
import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.TNonblockingServerSocket;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.util.StringUtils;

import cn.slimsmart.thrift.rpc.zookeeper.ThriftServerAddressRegister;
import cn.slimsmart.thrift.rpc.zookeeper.ThriftServerIpLocalNetworkResolve;
import cn.slimsmart.thrift.rpc.zookeeper.ThriftServerIpResolve;

/**
 * Server-side factory: starts a Thrift server for a service and registers its address.
 */
public class ThriftServiceServerFactory implements InitializingBean {
    // Local port the service listens on and registers
    private Integer port = 8299;
    // Weight (priority) used by the client-side load balancer, default 1
    private Integer weight = 1;
    // Service implementation; must implement the generated Iface
    private Object service;
    // Service version
    private String version;
    // Resolves the IP address of this host
    private ThriftServerIpResolve thriftServerIpResolve;
    // Registers the service address
    private ThriftServerAddressRegister thriftServerAddressRegister;

    private ServerThread serverThread;

    public void setPort(Integer port) {
        this.port = port;
    }

    public void setWeight(Integer weight) {
        this.weight = weight;
    }

    public void setService(Object service) {
        this.service = service;
    }

    public void setVersion(String version) {
        this.version = version;
    }

    public void setThriftServerIpResolve(ThriftServerIpResolve thriftServerIpResolve) {
        this.thriftServerIpResolve = thriftServerIpResolve;
    }

    public void setThriftServerAddressRegister(ThriftServerAddressRegister thriftServerAddressRegister) {
        this.thriftServerAddressRegister = thriftServerAddressRegister;
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        if (thriftServerIpResolve == null) {
            thriftServerIpResolve = new ThriftServerIpLocalNetworkResolve();
        }
        String serverIP = thriftServerIpResolve.getServerIp();
        if (StringUtils.isEmpty(serverIP)) {
            throw new ThriftException("can't find server ip...");
        }

        // Registered address format: ip:port:weight
        String hostname = serverIP + ":" + port + ":" + weight;
        Class<?> serviceClass = service.getClass();
        // Interfaces implemented by the service class
        Class<?>[] interfaces = serviceClass.getInterfaces();
        if (interfaces.length == 0) {
            throw new IllegalClassFormatException("service-class should implement Iface");
        }
        // Use reflection to load the generated "Processor" class
        TProcessor processor = null;
        String serviceName = null;
        for (Class<?> clazz : interfaces) {
            String cname = clazz.getSimpleName();
            if (!cname.equals("Iface")) {
                continue;
            }
            serviceName = clazz.getEnclosingClass().getName();
            String pname = serviceName + "$Processor";
            try {
                ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
                Class<?> pclass = classLoader.loadClass(pname);
                if (!TProcessor.class.isAssignableFrom(pclass)) {
                    continue;
                }
                Constructor<?> constructor = pclass.getConstructor(clazz);
                processor = (TProcessor) constructor.newInstance(service);
                break;
            } catch (Exception e) {
                // Ignore and try the next interface; a missing processor is reported below
            }
        }
        if (processor == null) {
            throw new IllegalClassFormatException("service-class should implement Iface");
        }
        // serve() blocks, so the server needs its own thread
        serverThread = new ServerThread(processor, port);
        serverThread.start();
        // Register the service
        if (thriftServerAddressRegister != null) {
            thriftServerAddressRegister.register(serviceName, version, hostname);
        }

    }

    class ServerThread extends Thread {
        private TServer server;

        ServerThread(TProcessor processor, int port) throws Exception {
            TNonblockingServerSocket serverTransport = new TNonblockingServerSocket(port);
            TThreadedSelectorServer.Args tArgs = new TThreadedSelectorServer.Args(serverTransport);
            TProcessorFactory processorFactory = new TProcessorFactory(processor);
            tArgs.processorFactory(processorFactory);
            tArgs.transportFactory(new TFramedTransport.Factory());
            tArgs.protocolFactory(new TBinaryProtocol.Factory(true, true));
            server = new TThreadedSelectorServer(tArgs);
        }

        @Override
        public void run() {
            try {
                // Start serving (blocks until the server is stopped)
                server.serve();
            } catch (Exception e) {
                // Errors are swallowed here, as in the original code; consider logging them
            }
        }

        public void stopServer() {
            server.stop();
        }
    }

    public void close() {
        // serverThread is null if afterPropertiesSet() failed before the server was created
        if (serverThread != null) {
            serverThread.stopServer();
        }
    }
}
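The reflection step in `afterPropertiesSet` relies on the naming convention of Thrift-generated Java code: the service class contains nested `Iface` and `Processor` types. The sketch below reproduces that lookup with plain JDK classes so it can be run without libthrift. `DemoService`, its nested types, `DemoImpl` and `ProcessorLookupSketch` are hypothetical stand-ins, not generated code.

```java
import java.lang.reflect.Constructor;

// Hypothetical stand-in for a Thrift-generated service class.
class DemoService {
    interface Iface {
        String echo(String msg);
    }

    static class Processor {
        final Iface handler;

        public Processor(Iface handler) {
            this.handler = handler;
        }
    }
}

// Hypothetical service implementation, as a user would write it.
class DemoImpl implements DemoService.Iface {
    @Override
    public String echo(String msg) {
        return msg;
    }
}

final class ProcessorLookupSketch {
    // Finds the nested "Iface" implemented by the service, then loads the sibling
    // "<Service>$Processor" class and instantiates it with the service as handler.
    static Object createProcessor(Object service) {
        for (Class<?> iface : service.getClass().getInterfaces()) {
            Class<?> outer = iface.getEnclosingClass();
            if (outer == null || !"Iface".equals(iface.getSimpleName())) {
                continue;
            }
            try {
                ClassLoader loader = service.getClass().getClassLoader();
                Class<?> processorClass = Class.forName(outer.getName() + "$Processor", true, loader);
                Constructor<?> ctor = processorClass.getConstructor(iface);
                return ctor.newInstance(service);
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException("cannot create processor for " + outer.getName(), e);
            }
        }
        throw new IllegalArgumentException("service class should implement a generated Iface");
    }

    public static void main(String[] args) {
        Object processor = createProcessor(new DemoImpl());
        System.out.println(processor.getClass().getName());
    }
}
```

Unlike the factory above, this sketch reports a reflection failure immediately instead of swallowing it, which makes a misnamed or missing processor class easier to diagnose.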

 

IV. Client-side service proxy and connection pool

Client connection pool: ThriftClientPoolFactory.java

[java]
package cn.slimsmart.thrift.rpc;

import java.net.InetSocketAddress;

import org.apache.commons.pool.BasePoolableObjectFactory;
import org.apache.thrift.TServiceClient;
import org.apache.thrift.TServiceClientFactory;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;

import cn.slimsmart.thrift.rpc.zookeeper.ThriftServerAddressProvider;

/**
 * Connection pool factory: creates, validates and destroys Thrift clients (for use with Spring).
 */
public class ThriftClientPoolFactory extends BasePoolableObjectFactory<TServiceClient> {

    private final ThriftServerAddressProvider serverAddressProvider;
    private final TServiceClientFactory<TServiceClient> clientFactory;
    private PoolOperationCallBack callback;

    protected ThriftClientPoolFactory(ThriftServerAddressProvider addressProvider, TServiceClientFactory<TServiceClient> clientFactory) throws Exception {
        this.serverAddressProvider = addressProvider;
        this.clientFactory = clientFactory;
    }

    protected ThriftClientPoolFactory(ThriftServerAddressProvider addressProvider, TServiceClientFactory<TServiceClient> clientFactory,
            PoolOperationCallBack callback) throws Exception {
        this.serverAddressProvider = addressProvider;
        this.clientFactory = clientFactory;
        this.callback = callback;
    }

    static interface PoolOperationCallBack {
        // Called before a client is destroyed
        void destroy(TServiceClient client);

        // Called after a client has been created successfully
        void make(TServiceClient client);
    }

    @Override
    public void destroyObject(TServiceClient client) throws Exception {
        if (callback != null) {
            try {
                callback.destroy(client);
            } catch (Exception e) {
                // A failing callback must not prevent the transport from being closed
            }
        }
        TTransport pin = client.getInputProtocol().getTransport();
        pin.close();
    }

    @Override
    public boolean validateObject(TServiceClient client) {
        TTransport pin = client.getInputProtocol().getTransport();
        return pin.isOpen();
    }

    @Override
    public TServiceClient makeObject() throws Exception {
        InetSocketAddress address = serverAddressProvider.selector();
        if (address == null) {
            // selector() returns null when no server address is currently known
            throw new ThriftException("No provider available for remote service");
        }
        TSocket tsocket = new TSocket(address.getHostName(), address.getPort());
        TTransport transport = new TFramedTransport(tsocket);
        TProtocol protocol = new TBinaryProtocol(transport);
        TServiceClient client = this.clientFactory.getClient(protocol);
        transport.open();
        if (callback != null) {
            try {
                callback.make(client);
            } catch (Exception e) {
                // A failing callback must not prevent the client from being used
            }
        }
        return client;
    }

}
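To make the contract between a pool and this factory concrete, here is a minimal JDK-only sketch of the three callbacks (`makeObject`, `validateObject`, `destroyObject`) driven by a toy pool. `TinyPool`, `ObjectFactory`, `FakeConnection` and `PoolLifecycleDemo` are hypothetical names; commons-pool's `GenericObjectPool` does much more (size limits, eviction, blocking) and is deliberately not used here.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical stand-in for a Thrift client bound to an open transport.
final class FakeConnection {
    private boolean open = true;

    boolean isOpen() { return open; }

    void close() { open = false; }
}

// The three callbacks a pool needs, named after the commons-pool methods.
interface ObjectFactory<T> {
    T makeObject();

    boolean validateObject(T obj);

    void destroyObject(T obj);
}

// Toy pool: reuses idle objects and validates each one before handing it out.
final class TinyPool<T> {
    private final Deque<T> idle = new ArrayDeque<>();
    private final ObjectFactory<T> factory;

    TinyPool(ObjectFactory<T> factory) {
        this.factory = factory;
    }

    synchronized T borrowObject() {
        T obj;
        while ((obj = idle.poll()) != null) {
            if (factory.validateObject(obj)) {
                return obj;
            }
            factory.destroyObject(obj); // stale object: drop it and try the next one
        }
        return factory.makeObject(); // nothing reusable, create a fresh object
    }

    synchronized void returnObject(T obj) {
        idle.push(obj);
    }

    synchronized int idleCount() {
        return idle.size();
    }
}

final class PoolLifecycleDemo {
    static TinyPool<FakeConnection> newPool() {
        return new TinyPool<>(new ObjectFactory<FakeConnection>() {
            @Override public FakeConnection makeObject() { return new FakeConnection(); }
            @Override public boolean validateObject(FakeConnection c) { return c.isOpen(); }
            @Override public void destroyObject(FakeConnection c) { c.close(); }
        });
    }

    public static void main(String[] args) {
        TinyPool<FakeConnection> pool = newPool();
        FakeConnection first = pool.borrowObject();
        pool.returnObject(first);
        System.out.println("reused: " + (pool.borrowObject() == first));
    }
}
```

One difference worth knowing: `GenericObjectPool` in commons-pool 1.x calls `validateObject` only when one of its `testOnBorrow`, `testOnReturn` or `testWhileIdle` options is enabled, and all three are off by default.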

Client service proxy factory: ThriftServiceClientProxyFactory.java

[java]
package cn.slimsmart.thrift.rpc;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

import org.apache.commons.pool.impl.GenericObjectPool;
import org.apache.thrift.TServiceClient;
import org.apache.thrift.TServiceClientFactory;
import org.springframework.beans.factory.FactoryBean;
import org.springframework.beans.factory.InitializingBean;

import cn.slimsmart.thrift.rpc.ThriftClientPoolFactory.PoolOperationCallBack;
import cn.slimsmart.thrift.rpc.zookeeper.ThriftServerAddressProvider;

/**
 * Client-side proxy factory.
 */
@SuppressWarnings({ "unchecked", "rawtypes" })
public class ThriftServiceClientProxyFactory implements FactoryBean, InitializingBean {

    // Maximum number of active connections
    private Integer maxActive = 32;

    // Connection idle time in ms, default 3 min;
    // -1 disables idle detection
    private Integer idleTime = 180000;
    private ThriftServerAddressProvider serverAddressProvider;

    private Object proxyClient;
    private Class<?> objectClass;

    private GenericObjectPool<TServiceClient> pool;

    private PoolOperationCallBack callback = new PoolOperationCallBack() {
        @Override
        public void make(TServiceClient client) {
            System.out.println("create");
        }

        @Override
        public void destroy(TServiceClient client) {
            System.out.println("destroy");
        }
    };

    public void setMaxActive(Integer maxActive) {
        this.maxActive = maxActive;
    }

    public void setIdleTime(Integer idleTime) {
        this.idleTime = idleTime;
    }

    public void setServerAddressProvider(ThriftServerAddressProvider serverAddressProvider) {
        this.serverAddressProvider = serverAddressProvider;
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
        // Load the generated Iface interface
        objectClass = classLoader.loadClass(serverAddressProvider.getService() + "$Iface");
        // Load the generated Client.Factory class
        Class<TServiceClientFactory<TServiceClient>> fi = (Class<TServiceClientFactory<TServiceClient>>) classLoader.loadClass(serverAddressProvider.getService() + "$Client$Factory");
        TServiceClientFactory<TServiceClient> clientFactory = fi.newInstance();
        ThriftClientPoolFactory clientPool = new ThriftClientPoolFactory(serverAddressProvider, clientFactory, callback);
        GenericObjectPool.Config poolConfig = new GenericObjectPool.Config();
        poolConfig.maxActive = maxActive;
        poolConfig.minIdle = 0;
        poolConfig.minEvictableIdleTimeMillis = idleTime;
        poolConfig.timeBetweenEvictionRunsMillis = idleTime / 2L;
        pool = new GenericObjectPool<TServiceClient>(clientPool, poolConfig);
        proxyClient = Proxy.newProxyInstance(classLoader, new Class[] { objectClass }, new InvocationHandler() {
            @Override
            public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                TServiceClient client = pool.borrowObject();
                boolean failed = true;
                try {
                    Object result = method.invoke(client, args);
                    failed = false;
                    return result;
                } catch (InvocationTargetException e) {
                    // Rethrow the exception raised by the Thrift call itself
                    throw e.getCause();
                } finally {
                    if (failed) {
                        // The connection may be broken, so do not put it back into the pool
                        pool.invalidateObject(client);
                    } else {
                        pool.returnObject(client);
                    }
                }
            }
        });
    }

    @Override
    public Object getObject() throws Exception {
        return proxyClient;
    }

    @Override
    public Class<?> getObjectType() {
        return objectClass;
    }

    @Override
    public boolean isSingleton() {
        return true;
    }

    public void close() {
        if (pool != null) {
            try {
                // Close pooled connections as well
                pool.close();
            } catch (Exception e) {
                // Ignore: the factory is shutting down anyway
            }
        }
        if (serverAddressProvider != null) {
            serverAddressProvider.close();
        }
    }
}
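The proxy step can be tried without Thrift. The sketch below uses `java.lang.reflect.Proxy` to route every call on an interface through a borrowed delegate and to hand the delegate back in `finally`, which is the same shape as the `InvocationHandler` above. `Echo`, `EchoClient` and `PooledProxySketch` are hypothetical names, and the `BlockingQueue` is only a stand-in for `GenericObjectPool`.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical service interface (plays the role of EchoService.Iface).
interface Echo {
    String echo(String msg);
}

// Hypothetical client (plays the role of the generated Client class).
final class EchoClient implements Echo {
    @Override
    public String echo(String msg) {
        if (msg == null) {
            throw new IllegalArgumentException("msg must not be null");
        }
        return "echo:" + msg;
    }
}

final class PooledProxySketch {
    // Returns a proxy that borrows a client for each call and always gives it back.
    static Echo create(BlockingQueue<Echo> pool) {
        InvocationHandler handler = (proxy, method, args) -> {
            Echo client = pool.take();
            try {
                return method.invoke(client, args);
            } catch (InvocationTargetException e) {
                throw e.getCause(); // rethrow what the client itself threw
            } finally {
                pool.put(client);
            }
        };
        return (Echo) Proxy.newProxyInstance(
                Echo.class.getClassLoader(), new Class<?>[] { Echo.class }, handler);
    }

    public static void main(String[] args) {
        BlockingQueue<Echo> pool = new LinkedBlockingQueue<>();
        pool.add(new EchoClient());
        Echo service = create(pool);
        System.out.println(service.echo("hello lilei"));
    }
}
```

Callers only see the `Echo` interface, so pooling stays an implementation detail. With real network clients, a delegate whose transport failed should be discarded rather than returned to the pool.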

Next, let's look at the Spring configuration for the server and the client.

Server: spring-context-thrift-server.xml

[html]
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
    xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
                http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsd
                http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.0.xsd
                http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.0.xsd"
    default-lazy-init="false">

    <!-- zookeeper -->
    <bean id="thriftZookeeper" class="cn.slimsmart.thrift.rpc.zookeeper.ZookeeperFactory"
        destroy-method="close">
        <property name="zkHosts"
            value="192.168.36.54:2181,192.168.36.99:2181,192.168.36.189:2181" />
        <property name="namespace" value="cn.slimsmart.thrift.rpc.demo" />
        <property name="connectionTimeout" value="3000" />
        <property name="sessionTimeout" value="3000" />
        <property name="singleton" value="true" />
    </bean>
    <bean id="sericeAddressRegister"
        class="cn.slimsmart.thrift.rpc.zookeeper.ThriftServerAddressRegisterZookeeper"
        destroy-method="close">
        <property name="zkClient" ref="thriftZookeeper" />
    </bean>
    <bean id="echoSerivceImpl" class="cn.slimsmart.thrift.rpc.demo.EchoSerivceImpl" />

    <bean id="echoSerivce" class="cn.slimsmart.thrift.rpc.ThriftServiceServerFactory"
        destroy-method="close">
        <property name="service" ref="echoSerivceImpl" />
        <property name="port" value="9000" />
        <property name="version" value="1.0.0" />
        <property name="weight" value="1" />
        <property name="thriftServerAddressRegister" ref="sericeAddressRegister" />
    </bean>

    <bean id="echoSerivce1" class="cn.slimsmart.thrift.rpc.ThriftServiceServerFactory"
        destroy-method="close">
        <property name="service" ref="echoSerivceImpl" />
        <property name="port" value="9001" />
        <property name="version" value="1.0.0" />
        <property name="weight" value="1" />
        <property name="thriftServerAddressRegister" ref="sericeAddressRegister" />
    </bean>

    <bean id="echoSerivce2" class="cn.slimsmart.thrift.rpc.ThriftServiceServerFactory"
        destroy-method="close">
        <property name="service" ref="echoSerivceImpl" />
        <property name="port" value="9002" />
        <property name="version" value="1.0.0" />
        <property name="weight" value="1" />
        <property name="thriftServerAddressRegister" ref="sericeAddressRegister" />
    </bean>
</beans>

Client: spring-context-thrift-client.xml

[html]
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
    xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
                http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsd
                http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.0.xsd
                http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.0.xsd"
    default-lazy-init="false">

    <!-- fixedAddress -->
    <!--
    <bean id="fixedAddressProvider" class="cn.slimsmart.thrift.rpc.zookeeper.FixedAddressProvider">
         <property name="service" value="cn.slimsmart.thrift.rpc.demo.EchoSerivce" />
         <property name="serverAddress" value="192.168.36.215:9001:1,192.168.36.215:9002:2,192.168.36.215:9003:3" />
    </bean>
    <bean id="echoSerivce" class="cn.slimsmart.thrift.rpc.ThriftServiceClientProxyFactory">
        <property name="maxActive" value="5" />
        <property name="idleTime" value="10000" />
        <property name="serverAddressProvider" ref="fixedAddressProvider" />
    </bean>
   -->
    <!-- zookeeper   -->
    <bean id="thriftZookeeper" class="cn.slimsmart.thrift.rpc.zookeeper.ZookeeperFactory"
        destroy-method="close">
        <property name="zkHosts"
            value="192.168.36.54:2181,192.168.36.99:2181,192.168.36.189:2181" />
        <property name="namespace" value="cn.slimsmart.thrift.rpc.demo" />
        <property name="connectionTimeout" value="3000" />
        <property name="sessionTimeout" value="3000" />
        <property name="singleton" value="true" />
    </bean>
    <bean id="echoSerivce" class="cn.slimsmart.thrift.rpc.ThriftServiceClientProxyFactory" destroy-method="close">
        <property name="maxActive" value="5" />
        <property name="idleTime" value="1800000" />
        <property name="serverAddressProvider">
            <bean class="cn.slimsmart.thrift.rpc.zookeeper.ThriftServerAddressProviderZookeeper">
                <property name="service" value="cn.slimsmart.thrift.rpc.demo.EchoSerivce" />
                <property name="version" value="1.0.0" />
                <property name="zkClient" ref="thriftZookeeper" />
            </bean>
        </property>
    </bean>
</beans>

After starting the server, you can see that several service addresses have been registered in ZooKeeper.
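Judging from `getServicePath()` in the provider and the `ip:port:weight` string built by the server factory, each instance should show up as a child node under `/<service>/<version>` inside the configured namespace. The helper below, with the hypothetical name `ServicePathSketch`, builds such a path and recovers the address the same way the provider does with `substring`. The exact layout written by `ThriftServerAddressRegisterZookeeper` is an assumption here, inferred from what the provider reads.

```java
// Hypothetical helper illustrating the assumed znode layout; not part of the project.
final class ServicePathSketch {
    // Parent node for all instances of one service version, e.g. "/com.example.Echo/1.0.0".
    static String servicePath(String service, String version) {
        return "/" + service + "/" + version;
    }

    // Full child node path for one instance; address is "ip:port:weight".
    static String instancePath(String service, String version, String address) {
        return servicePath(service, version) + "/" + address;
    }

    // Strips the parent path and the separating slash, leaving "ip:port:weight".
    static String addressOf(String service, String version, String childPath) {
        return childPath.substring(servicePath(service, version).length() + 1);
    }

    public static void main(String[] args) {
        String service = "cn.slimsmart.thrift.rpc.demo.EchoSerivce";
        String path = instancePath(service, "1.0.0", "192.168.36.215:9000:1");
        System.out.println(path);
        System.out.println(addressOf(service, "1.0.0", path));
    }
}
```

With the server configuration above, three such child nodes would be expected, one each for ports 9000, 9001 and 9002.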

 

The full example is not walked through here; see the sample code:

Thrift design and optimization notes:
