本文将对redis的java客户端jedis进行源码剖析,目的是让读者对jedis客户端有清楚的认识;代码结构如下:
代码结构jedis
jedis 使用
jedis的直接使用很简单,新建一个客户端便可直接使用
public static void main(String[] args) {
Jedis jedis = new Jedis("127.0.0.1", 6666);
jedis.set("hello", "world");
String value = jedis.get("hello");
System.out.println("value:" + value);
jedis.close();
}
jedis源码梳理
jedis结构Jedis继承了 BinaryJedis ,并实现了诸多commands接口
BinaryJedisBinaryJedis声明了三个成员变量分别是Client(客户端),Transaction(redis事务),Pipeline(redis批量执行)。BinaryJedis的构造函数创建了Client实例,以下为其中一种示例
public BinaryJedis(final String host, final int port) {
client = new Client(host, port);
}
我们再看具体的set命令实现
public String set(final byte[] key, final byte[] value) {
checkIsInMultiOrPipeline();
client.set(key, value);
return client.getStatusCodeReply();
}
由client去调用set方法,并返回client.getStatusCodeReply()结果;那么client是如何实现的呢,我们来看看client的类图
client
我们发现Client的父类也是继承了Connection类;我们从顶层向下梳理
Connection原来是由Connection类实现了发送命令,建立连接,关闭连接;参照以下代码片段;
/**建立连接**/
public void connect() {
if (!isConnected()) {
try {
socket = new Socket();
socket.setReuseAddress(true);
socket.setKeepAlive(true);
socket.setTcpNoDelay(true);
socket.setSoLinger(true, 0);
socket.connect(new InetSocketAddress(host, port), connectionTimeout);
socket.setSoTimeout(soTimeout);
if (ssl) {
if (null == sslSocketFactory) {
sslSocketFactory = (SSLSocketFactory)SSLSocketFactory.getDefault();
}
socket = (SSLSocket) sslSocketFactory.createSocket(socket, host, port, true);
if (null != sslParameters) {
((SSLSocket) socket).setSSLParameters(sslParameters);
}
if ((null != hostnameVerifier) &&
(!hostnameVerifier.verify(host, ((SSLSocket) socket).getSession()))) {
String message = String.format(
"The connection to '%s' failed ssl/tls hostname verification.", host);
throw new JedisConnectionException(message);
}
}
outputStream = new RedisOutputStream(socket.getOutputStream());
inputStream = new RedisInputStream(socket.getInputStream());
} catch (IOException ex) {
broken = true;
throw new JedisConnectionException(ex);
}
}
}
/**发送命令**/
protected Connection sendCommand(final Command cmd, final byte[]... args) {
try {
connect();
/**以协议的格式发送命令**/
Protocol.sendCommand(outputStream, cmd, args);
pipelinedCommands++;
return this;
} catch (JedisConnectionException ex) {
try {
String errorMessage = Protocol.readErrorLineIfPossible(inputStream);
if (errorMessage != null && errorMessage.length() > 0) {
ex = new JedisConnectionException(errorMessage, ex.getCause());
}
} catch (Exception e) {
}
broken = true;
throw ex;
}
}
/**获取响应 *Reply方法都是获取不同格式的响应 **/
public String getStatusCodeReply() {
flush();
pipelinedCommands--;
final byte[] resp = (byte[]) readProtocolWithCheckingBroken();
if (null == resp) {
return null;
} else {
return SafeEncoder.encode(resp);
}
}
通过阅读Connection的源码,我们明确了它的职责是与Redis服务端建立连接,还可以向Redis服务端发送命令,以及获取Redis服务端的响应;我们再来看Connection的子类BinaryClient。
/**重写父类的Connect方法,并加入auth与select的过程**/
@Override
public void connect() {
if (!isConnected()) {
super.connect();
if (password != null) {
auth(password);
getStatusCodeReply();
}
if (db > 0) {
select(Long.valueOf(db).intValue());
getStatusCodeReply();
}
}
}
/**通过定义入参为byte数组的API方法,调用父类的sendCommand来发送命令**/
public void set(final byte[] key, final byte[] value){
sendCommand(Command.SET, key, value);
}
....
public void get(final byte[] key){
sendCommand(Command.GET, key);
}
BinaryClient重写了connect方法,并实现了auth和select的过程,并声明了调用Redis的相关方法;最后是我们的目标类Client,它实现了定义了Redis常用API的Commands接口,而Client具体的实现方法则是调用BinaryClient之前声明的方法;
@Override
public void set(final String key, final String value) {
/**调用BinaryClient的方法,将参数转换为byte[]**/
set(SafeEncoder.encode(key), SafeEncoder.encode(value));
}
....
public void get(final String key) {
get(SafeEncoder.encode(key));
}
至此我们明白Client是一个调用Redis相应API功能的客户端。
我们来梳理一下,Connection是负责与Redis服务端的通讯的连接,Client是负责调用通讯的客户端,Jedis是给开发人员使用的客户端。我们知道了底层通讯是通过socket来实现的, 为了避免频繁的创建连接销毁连接,常用的办法是采用连接池技术,那么接下来我们一起来看看JedisPool相关的实现。
JedisPool
JedisPool使用
JedisPool使用是先创建Pool实例,然后获取Jedis资源,使用结束后使用jedis.close()归还资源。
public static void main(String[] args) {
JedisPool jedisPool = new JedisPool("127.0.0.1", 6666);
Jedis jedis = jedisPool.getResource();
jedis.set("test", "jedis");
String value = jedis.get("test");
System.out.println("value:" + value);
jedis.close();
}
JedisPool源码分析
分析JedisPool源码,我们还是先看JedisPool的类图,了解其继承实现结构。
JedisPool
我们还是从上往下分析查看,先一起看看Pool的实现,以下是Pool的核心代码
public abstract class Pool<T> implements Closeable {
protected GenericObjectPool<T> internalPool;
public Pool() {
}
/**Pool构造方法**/
public Pool(final GenericObjectPoolConfig poolConfig, PooledObjectFactory<T> factory) {
initPool(poolConfig, factory);
}
/**根据poolConfig初始化池**/
public void initPool(final GenericObjectPoolConfig poolConfig, PooledObjectFactory<T> factory) {
if (this.internalPool != null) {
try {
closeInternalPool();
} catch (Exception e) {
}
}
this.internalPool = new GenericObjectPool<T>(factory, poolConfig);
}
/**获取资源**/
public T getResource(){
try {
return internalPool.borrowObject();
} catch (NoSuchElementException nse) {
if (null == nse.getCause()) { pool
throw new JedisExhaustedPoolException(
"Could not get a resource since the pool is exhausted", nse);
}
throw new JedisException("Could not get a resource from the pool", nse);
} catch (Exception e) {
throw new JedisConnectionException("Could not get a resource from the pool", e);
}
}
/**归还资源**/
protected void returnResourceObject(final T resource) {
if (resource == null) {
return;
}
try {
internalPool.returnObject(resource);
} catch (Exception e) {
throw new JedisException("Could not return the resource to the pool", e);
}
}
/**销毁资源**/
public void destroy() {..省略代码..}
}
public GenericObjectPool(PooledObjectFactory<T> factory, GenericObjectPoolConfig<T> config) {
super(config, config.getJmxNamePrefix());
this.factoryType = null;
this.maxIdle = 8;
this.minIdle = 0;
this.allObjects = new ConcurrentHashMap();
this.createCount = new AtomicLong(0L);
this.makeObjectCount = 0L;
this.makeObjectCountLock = new Object();
this.abandonedConfig = null;
if (factory == null) {
this.jmxUnregister();
throw new IllegalArgumentException("factory may not be null");
} else {
this.factory = factory;
this.idleObjects = new LinkedBlockingDeque(config.getFairness());
this.setConfig(config);
}
}
构造函数中有两个参数,一个是池对象工厂PooledObjectFactory factory,一个是对象池配置GenericObjectPoolConfig config;config用于配置最大连接数,最大空闲数,最小空闲数;factory则是用于创建,销毁,验证池对象,其实现我们后续查看,先了解其作用即可。
public interface PooledObjectFactory<T> {
/**创建池对象**/
PooledObject<T> makeObject() throws Exception;
/**销毁对象**/
void destroyObject(PooledObject<T> var1) throws Exception;
/**验证池对象**/
boolean validateObject(PooledObject<T> var1);
/**激活对象**/
void activateObject(PooledObject<T> var1) throws Exception;
/**冻结对象**/
void passivateObject(PooledObject<T> var1) throws Exception;
}
我们接着了解从GenericObjectPool的borrowObject过程,我们解释核心部分;
public T borrowObject(long borrowMaxWaitMillis) throws Exception {
this.assertOpen();//对象池是否打开的断言
//....省略部分代码...
PooledObject<T> p = null;//声明池对象
while(true) {
boolean create;
do {
do {
do {
//创建成功后返回对象
if (p != null) {
this.updateStatsBorrow(p, System.currentTimeMillis() - waitTime);
return p.getObject();
}
create = false;
p = (PooledObject)this.idleObjects.pollFirst();
if (p == null) {
//创建池对象过程
p = this.create();
if (p != null) {
create = true;
}
}
if (blockWhenExhausted) {
if (p == null) {
if (borrowMaxWaitMillis < 0L) {
p = (PooledObject)this.idleObjects.takeFirst();
} else {
p = (PooledObject)this.idleObjects.pollFirst(borrowMaxWaitMillis, TimeUnit.MILLISECONDS);
}
}
if (p == null) {
throw new NoSuchElementException("Timeout waiting for idle object");
}
} else if (p == null) {
throw new NoSuchElementException("Pool exhausted");
}
if (!p.allocate()) {
p = null;
}
} while(p == null);
try {
//激活池对象
this.factory.activateObject(p);
} catch (Exception var13) {
try {
this.destroy(p);
} catch (Exception var12) {
}
p = null;
if (create) {
NoSuchElementException nsee = new NoSuchElementException("Unable to activate object");
nsee.initCause(var13);
throw nsee;
}
}
} while(p == null);
} while(!this.getTestOnBorrow() && (!create || !this.getTestOnCreate()));
boolean validate = false;
Throwable validationThrowable = null;
try {
//验证池对象
validate = this.factory.validateObject(p);
} catch (Throwable var15) {
PoolUtils.checkRethrow(var15);
validationThrowable = var15;
}
if (!validate) {
try {
this.destroy(p);
this.destroyedByBorrowValidationCount.incrementAndGet();
} catch (Exception var14) {
}
p = null;
if (create) {
NoSuchElementException nsee = new NoSuchElementException("Unable to validate object");
nsee.initCause(validationThrowable);
throw nsee;
}
}
}
}
我们来梳理一下流程,首先声明对象,然后调用create,之后会在create方法中调用this.factory.makeObject()创建对象,激活对象,验证对象,对象不为空则返回对象。
接着是JedisPoolAbstract,JedisPoolAbstract的代码比较简单,指定了泛型为Jedis,限制了池中资源是Jedis
public class JedisPoolAbstract extends Pool<Jedis> {
public JedisPoolAbstract() {
super();
}
public JedisPoolAbstract(GenericObjectPoolConfig poolConfig, PooledObjectFactory<Jedis> factory) {
super(poolConfig, factory);
}
@Override
protected void returnBrokenResource(Jedis resource) {
super.returnBrokenResource(resource);
}
@Override
protected void returnResource(Jedis resource) {
super.returnResource(resource);
}
}
最后是JedisPool的代码
public class JedisPool extends JedisPoolAbstract {
/**丰富的构造方法-省略**/
/**核心构造方法**/
public JedisPool(final GenericObjectPoolConfig poolConfig, final URI uri,
final int connectionTimeout, final int soTimeout) {
//JedisFactory 是PooledObjectFactory实现类
super(poolConfig, new JedisFactory(uri, connectionTimeout, soTimeout, null));
}
/**重写获取资源方法**/
@Override
public Jedis getResource() {
Jedis jedis = super.getResource();
jedis.setDataSource(this);
return jedis;
}
/**重写归还异常资源方法,对象池会使其不可用并销毁**/
@Override
protected void returnBrokenResource(final Jedis resource) {
if (resource != null) {
returnBrokenResourceObject(resource);
}
}
/**重写归还资源方法**/
@Override
protected void returnResource(final Jedis resource) {
if (resource != null) {
try {
resource.resetState();
returnResourceObject(resource);
} catch (Exception e) {
returnBrokenResource(resource);
throw new JedisException("Resource is returned to the pool as broken", e);
}
}
}
}
JedisPool也是重写了父类获取资源,归还资源的方法;其中我们需要注意的是JedisPool的构造方法调用了父类的构造方法,JedisFactory是PooledObjectFactory的实现类。
@Override
public PooledObject<Jedis> makeObject() throws Exception {
final HostAndPort hostAndPort = this.hostAndPort.get();
final Jedis jedis = new Jedis(hostAndPort.getHost(), hostAndPort.getPort(), connectionTimeout,
soTimeout, ssl, sslSocketFactory, sslParameters, hostnameVerifier);
try {
jedis.connect();
if (password != null) {
jedis.auth(password);
}
if (database != 0) {
jedis.select(database);
}
if (clientName != null) {
jedis.clientSetname(clientName);
}
} catch (JedisException je) {
jedis.close();
throw je;
}
return new DefaultPooledObject<Jedis>(jedis);
}
我们梳理一下整个JedisPool的使用流程;JedisPool利用GenericObjectPool实现了Jedis资源池化,其构造函数中的JedisFactory实现了PooledObjectFactory的接口,GenericObjectPool实例对象 internalPool 是 JedisPool 的父类 Pool的成员变量,初始化JedisPool 时会调用父类构造方法,初始化internalPool,在需要申请资源时JedisPool 实例调用 getResource方法,getResource调用父类实现,父类实现是调用internalPool 的borrowObject()完成资源的获取;归还资源的流程类似。