首页 热点资讯 义务教育 高等教育 出国留学 考研考公
您的当前位置:首页正文

你读过jedis源码吗?

2024-12-20 来源:化拓教育网

本文将对redis的java客户端jedis进行源码剖析,目的是让读者对jedis客户端有清楚的认识;代码结构如下:

代码结构

jedis

jedis 使用

jedis的直接使用很简单,新建一个客户端便可直接使用

public static void main(String[] args) {
        Jedis jedis = new Jedis("127.0.0.1", 6666);
        jedis.set("hello", "world");
        String value = jedis.get("hello");
        System.out.println("value:" + value);
        jedis.close();
}

jedis源码梳理

jedis结构

Jedis继承了 BinaryJedis ,并实现了诸多commands接口

BinaryJedis

BinaryJedis声明了三个成员变量分别是Client(客户端),Transaction(redis事务),Pipeline(redis批量执行)。BinaryJedis的构造函数创建了Client实例,以下为其中一种示例

public BinaryJedis(final String host, final int port) {
    client = new Client(host, port);
  }

我们再看具体的set命令实现

public String set(final byte[] key, final byte[] value) {
    checkIsInMultiOrPipeline();
    client.set(key, value);
    return client.getStatusCodeReply();
}

由client去调用set方法,并返回client.getStatusCodeReply()结果;那么client是如何实现的呢,我们来看看client的类图


client

我们发现Client的父类也是继承了Connection类;我们从顶层向下梳理

Connection

原来是由Connection类实现了发送命令,建立连接,关闭连接;参照以下代码片段;

/**建立连接**/
public void connect() {
    if (!isConnected()) {
      try {
        socket = new Socket();
        socket.setReuseAddress(true);
        socket.setKeepAlive(true);
        socket.setTcpNoDelay(true);
        socket.setSoLinger(true, 0); 
        socket.connect(new InetSocketAddress(host, port), connectionTimeout);
        socket.setSoTimeout(soTimeout);
        if (ssl) {
          if (null == sslSocketFactory) {
            sslSocketFactory = (SSLSocketFactory)SSLSocketFactory.getDefault();
          }
          socket = (SSLSocket) sslSocketFactory.createSocket(socket, host, port, true);
          if (null != sslParameters) {
            ((SSLSocket) socket).setSSLParameters(sslParameters);
          }
          if ((null != hostnameVerifier) &&
              (!hostnameVerifier.verify(host, ((SSLSocket) socket).getSession()))) {
            String message = String.format(
                "The connection to '%s' failed ssl/tls hostname verification.", host);
            throw new JedisConnectionException(message);
          }
        }
        outputStream = new RedisOutputStream(socket.getOutputStream());
        inputStream = new RedisInputStream(socket.getInputStream());
      } catch (IOException ex) {
        broken = true;
        throw new JedisConnectionException(ex);
      }
    }
  }

/**发送命令**/
protected Connection sendCommand(final Command cmd, final byte[]... args) {
    try {
      connect();
     /**以协议的格式发送命令**/
      Protocol.sendCommand(outputStream, cmd, args);
      pipelinedCommands++;
      return this;
    } catch (JedisConnectionException ex) {
      try {
        String errorMessage = Protocol.readErrorLineIfPossible(inputStream);
        if (errorMessage != null && errorMessage.length() > 0) {
          ex = new JedisConnectionException(errorMessage, ex.getCause());
        }
      } catch (Exception e) {
      }
      broken = true;
      throw ex;
    }
  }

/**获取响应 *Reply方法都是获取不同格式的响应 **/
public String getStatusCodeReply() {
    flush();
    pipelinedCommands--;
    final byte[] resp = (byte[]) readProtocolWithCheckingBroken();
    if (null == resp) {
      return null;
    } else {
      return SafeEncoder.encode(resp);
    }
  }

通过阅读Connection的源码,我们明确了它的职责是与Redis服务端建立连接,还可以向Redis服务端发送命令,以及获取Redis服务端的响应;我们再来看Connection的子类BinaryClient。

/**重写父类的Connect方法,并加入auth与select的过程**/
@Override
  public void connect() {
    if (!isConnected()) {
      super.connect();
      if (password != null) {
        auth(password);
        getStatusCodeReply();
      }
      if (db > 0) {
        select(Long.valueOf(db).intValue());
        getStatusCodeReply();
      }
    }
  }

/**通过定义入参为byte数组的API方法,调用父类的sendCommand来发送命令**/
public void set(final byte[] key, final byte[] value){
    sendCommand(Command.SET, key, value);
}
....
public void get(final byte[] key){
    sendCommand(Command.GET, key);
}

BinaryClient重写了connect方法,并实现了auth和select的过程,并声明了调用Redis的相关方法;最后是我们的目标类Client,它实现了定义了Redis常用API的Commands接口,而Client具体的实现方法则是调用BinaryClient之前声明的方法;

@Override
  public void set(final String key, final String value) {
   /**调用BinaryClient的方法,将参数转换为byte[]**/
    set(SafeEncoder.encode(key), SafeEncoder.encode(value));
  }
....
public void get(final String key) {
    get(SafeEncoder.encode(key));
  }

至此我们明白Client是一个调用Redis相应API功能的客户端。

我们来梳理一下,Connection是负责与Redis服务端的通讯的连接,Client是负责调用通讯的客户端,Jedis是给开发人员使用的客户端。我们知道了底层通讯是通过socket来实现的, 为了避免频繁的创建连接销毁连接,常用的办法是采用连接池技术,那么接下来我们一起来看看JedisPool相关的实现。

JedisPool

JedisPool使用

JedisPool使用是先创建Pool实例,然后获取Jedis资源,使用结束后使用jedis.close()归还资源。

public static void main(String[] args) {
        JedisPool jedisPool = new JedisPool("127.0.0.1", 6666);
        Jedis jedis = jedisPool.getResource();
        jedis.set("test", "jedis");
        String value = jedis.get("test");
        System.out.println("value:" + value);
        jedis.close();
    }

JedisPool源码分析

分析JedisPool源码,我们还是先看JedisPool的类图,了解其继承实现结构。


JedisPool

我们还是从上往下分析查看,先一起看看Pool的实现,以下是Pool的核心代码

public abstract class Pool<T> implements Closeable {
 
   
 protected GenericObjectPool<T> internalPool;
  
 public Pool() {
  }
 /**Pool构造方法**/
 public Pool(final GenericObjectPoolConfig poolConfig, PooledObjectFactory<T> factory) {
    initPool(poolConfig, factory);
  }

 /**根据poolConfig初始化池**/
 public void initPool(final GenericObjectPoolConfig poolConfig, PooledObjectFactory<T> factory) {
    if (this.internalPool != null) {
      try {
        closeInternalPool();
      } catch (Exception e) {
      }
    }
    this.internalPool = new GenericObjectPool<T>(factory, poolConfig);
  }

 /**获取资源**/
 public T getResource(){
  try {
      return internalPool.borrowObject();
    } catch (NoSuchElementException nse) {
      if (null == nse.getCause()) { pool
        throw new JedisExhaustedPoolException(
            "Could not get a resource since the pool is exhausted", nse);
      }
      throw new JedisException("Could not get a resource from the pool", nse);
    } catch (Exception e) {
      throw new JedisConnectionException("Could not get a resource from the pool", e);
    }
}

 /**归还资源**/
 protected void returnResourceObject(final T resource) {
  if (resource == null) {
      return;
    }
    try {
      internalPool.returnObject(resource);
    } catch (Exception e) {
      throw new JedisException("Could not return the resource to the pool", e);
    }
}

 /**销毁资源**/
 public void destroy() {..省略代码..}

}
public GenericObjectPool(PooledObjectFactory<T> factory, GenericObjectPoolConfig<T> config) {
        super(config,  config.getJmxNamePrefix());
        this.factoryType = null;
        this.maxIdle = 8;
        this.minIdle = 0;
        this.allObjects = new ConcurrentHashMap();
        this.createCount = new AtomicLong(0L);
        this.makeObjectCount = 0L;
        this.makeObjectCountLock = new Object();
        this.abandonedConfig = null;
        if (factory == null) {
            this.jmxUnregister();
            throw new IllegalArgumentException("factory may not be null");
        } else {
            this.factory = factory;
            this.idleObjects = new LinkedBlockingDeque(config.getFairness());
            this.setConfig(config);
        }
    }

构造函数中有两个参数,一个是池对象工厂PooledObjectFactory factory,一个是对象池配置GenericObjectPoolConfig config;config用于配置最大连接数,最大空闲数,最小空闲数;factory则是用于创建,销毁,验证池对象,其实现我们后续查看,先了解其作用即可。

public interface PooledObjectFactory<T> {
    /**创建池对象**/
    PooledObject<T> makeObject() throws Exception;

    /**销毁对象**/
    void destroyObject(PooledObject<T> var1) throws Exception;

    /**验证池对象**/
    boolean validateObject(PooledObject<T> var1);

    /**激活对象**/ 
    void activateObject(PooledObject<T> var1) throws Exception;
  
   /**冻结对象**/ 
    void passivateObject(PooledObject<T> var1) throws Exception;
}

我们接着了解从GenericObjectPool的borrowObject过程,我们解释核心部分;

public T borrowObject(long borrowMaxWaitMillis) throws Exception {
        this.assertOpen();//对象池是否打开的断言
        //....省略部分代码...
        PooledObject<T> p = null;//声明池对象
        while(true) {
            boolean create;
            do {
                do {
                    do {
                        //创建成功后返回对象
                        if (p != null) {
                            this.updateStatsBorrow(p, System.currentTimeMillis() - waitTime);
                            return p.getObject();
                        }

                        create = false;
                        p = (PooledObject)this.idleObjects.pollFirst();
                        if (p == null) {
                           //创建池对象过程
                            p = this.create();
                            if (p != null) {
                                create = true;
                            }
                        }

                        if (blockWhenExhausted) {
                            if (p == null) {
                                if (borrowMaxWaitMillis < 0L) {
                                    p = (PooledObject)this.idleObjects.takeFirst();
                                } else {
                                    p = (PooledObject)this.idleObjects.pollFirst(borrowMaxWaitMillis, TimeUnit.MILLISECONDS);
                                }
                            }

                            if (p == null) {
                                throw new NoSuchElementException("Timeout waiting for idle object");
                            }
                        } else if (p == null) {
                            throw new NoSuchElementException("Pool exhausted");
                        }

                        if (!p.allocate()) {
                            p = null;
                        }
                    } while(p == null);

                    try {
                       //激活池对象
                        this.factory.activateObject(p);
                    } catch (Exception var13) {
                        try {
                            this.destroy(p);
                        } catch (Exception var12) {
                        }

                        p = null;
                        if (create) {
                            NoSuchElementException nsee = new NoSuchElementException("Unable to activate object");
                            nsee.initCause(var13);
                            throw nsee;
                        }
                    }
                } while(p == null);
            } while(!this.getTestOnBorrow() && (!create || !this.getTestOnCreate()));

            boolean validate = false;
            Throwable validationThrowable = null;

            try {
                 //验证池对象
                validate = this.factory.validateObject(p);
            } catch (Throwable var15) {
                PoolUtils.checkRethrow(var15);
                validationThrowable = var15;
            }

            if (!validate) {
                try {
                    this.destroy(p);
                    this.destroyedByBorrowValidationCount.incrementAndGet();
                } catch (Exception var14) {
                }

                p = null;
                if (create) {
                    NoSuchElementException nsee = new NoSuchElementException("Unable to validate object");
                    nsee.initCause(validationThrowable);
                    throw nsee;
                }
            }
        }
    }

我们来梳理一下流程,首先声明对象,然后调用create,之后会在create方法中调用this.factory.makeObject()创建对象,激活对象,验证对象,对象不为空则返回对象。

接着是JedisPoolAbstract,JedisPoolAbstract的代码比较简单,指定了泛型为Jedis,限制了池中资源是Jedis

public class JedisPoolAbstract extends Pool<Jedis> {

  public JedisPoolAbstract() {
    super();
  }

  public JedisPoolAbstract(GenericObjectPoolConfig poolConfig, PooledObjectFactory<Jedis> factory) {
    super(poolConfig, factory);
  }

  @Override
  protected void returnBrokenResource(Jedis resource) {
    super.returnBrokenResource(resource);
  }

  @Override
  protected void returnResource(Jedis resource) {
    super.returnResource(resource);
  }
}

最后是JedisPool的代码

public class JedisPool extends JedisPoolAbstract {
 /**丰富的构造方法-省略**/

 /**核心构造方法**/
public JedisPool(final GenericObjectPoolConfig poolConfig, final URI uri,
      final int connectionTimeout, final int soTimeout) {
    //JedisFactory 是PooledObjectFactory实现类
    super(poolConfig, new JedisFactory(uri, connectionTimeout, soTimeout, null));
  }

 /**重写获取资源方法**/
 @Override
  public Jedis getResource() {
    Jedis jedis = super.getResource();
    jedis.setDataSource(this);
    return jedis;
  }

 /**重写归还异常资源方法,对象池会使其不可用并销毁**/
  @Override
  protected void returnBrokenResource(final Jedis resource) {
    if (resource != null) {
      returnBrokenResourceObject(resource);
    }
  }

 /**重写归还资源方法**/
  @Override
  protected void returnResource(final Jedis resource) {
    if (resource != null) {
      try {
        resource.resetState();
        returnResourceObject(resource);
      } catch (Exception e) {
        returnBrokenResource(resource);
        throw new JedisException("Resource is returned to the pool as broken", e);
      }
    }
  }
}

JedisPool也是重写了父类获取资源,归还资源的方法;其中我们需要注意的是JedisPool的构造方法调用了父类的构造方法,JedisFactory是PooledObjectFactory的实现类。

  @Override
  public PooledObject<Jedis> makeObject() throws Exception {
    final HostAndPort hostAndPort = this.hostAndPort.get();
    final Jedis jedis = new Jedis(hostAndPort.getHost(), hostAndPort.getPort(), connectionTimeout,
        soTimeout, ssl, sslSocketFactory, sslParameters, hostnameVerifier);

    try {
      jedis.connect();
      if (password != null) {
        jedis.auth(password);
      }
      if (database != 0) {
        jedis.select(database);
      }
      if (clientName != null) {
        jedis.clientSetname(clientName);
      }
    } catch (JedisException je) {
      jedis.close();
      throw je;
    }

    return new DefaultPooledObject<Jedis>(jedis);

  }

我们梳理一下整个JedisPool的使用流程;JedisPool利用GenericObjectPool实现了Jedis资源池化,其构造函数中的JedisFactory实现了PooledObjectFactory的接口,GenericObjectPool实例对象 internalPool 是 JedisPool 的父类 Pool的成员变量,初始化JedisPool 时会调用父类构造方法,初始化internalPool,在需要申请资源时JedisPool 实例调用 getResource方法,getResource调用父类实现,父类实现是调用internalPool 的borrowObject()完成资源的获取;归还资源的流程类似。

显示全文