xuxiaohui9216 发表于 2015-9-1 10:54:25

代码中实际运用memcached——java

  
  以下文章取自:http://jameswxx.iteye.com/blog/1168711
  
  memcached的java客户端有好几种,http://code.google.com/p/memcached/wiki/Clients 罗列了以下几种



Html代码
[*]spymemcached
[*]
[*]    * http://www.couchbase.org/code/couchbase/java
[*]          o An improved Java API maintained by Matt Ingenthron and others at Couchbase.
[*]          o Aggressively optimised, ability to run async, supports binary protocol, support Membase and Couchbase features, etc. See site for details.
[*]
[*]Java memcached client
[*]
[*]    * http://www.whalin.com/memcached
[*]          o A Java API is maintained by Greg Whalin from Meetup.com.
[*]
[*]More Java memcached clients
[*]
[*]    * http://code.google.com/p/javamemcachedclient
[*]    * http://code.google.com/p/memcache-client-forjava
[*]    * http://code.google.com/p/xmemcached
[*]
[*]Integrations
[*]
[*]    * http://code.google.com/p/simple-spring-memcached
[*]    * http://code.google.com/p/memcached-session-manager
  
  
  
我看的是第二个:Java memcached client源码,代码很简洁,一共只有9个类,最主要的有以下三个
MemcachedClient.java   客户端,负责提供外出程序接口,如get/set方法等等
  SockIOPool.java          一个自平衡的连接池
NativeHandler.java       负责部分数据类型的序列化
  
  它包含以下几个部分
1:key的服务端分布
2:数据序列化和压缩
3:连接池(连接方式和池的动态自动平衡)
4:failover和failback机制
5:和memcached服务器的通讯协议
关于这几个点,我从key的set/get说起,会贯穿上面列举的4个部分。这个文章写下来,本来是作为一个笔记,思维比较跳跃,可能不是很连贯,如有疑问,欢迎站内交流。这个client的代码
  很简洁明了,我也没有加过多注释,只是理了一个脉络。
  
  
  
从客户端自带的测试代码开始



Java代码
[*]package com.meetup.memcached.test;
[*]import com.meetup.memcached.*;
[*]import org.apache.log4j.*;
[*]
[*]public class TestMemcached{   
[*]    public static void main(String[] args) {
[*]      BasicConfigurator.configure();
[*]      String[] servers = { "127.0.0.1:12000"};
[*]      SockIOPool pool = SockIOPool.getInstance();
[*]      pool.setServers( servers );
[*]      pool.setFailover( true );//故障转移
[*]   pool.setInitConn( 10 ); //初始化连接为10
[*]      pool.setMinConn( 5 );//最小连接为5
[*]      pool.setMaxConn( 250 );//最大连接为250
[*]      pool.setMaintSleep( 30 );//平衡线程休眠时间为30ms
[*]      pool.setNagle( false );//Nagle标志为false
[*]      pool.setSocketTO( 3000 );//响应超时时间为3000ms
[*]      pool.setAliveCheck( true );//需要可用状态检查
[*]   //初始化连接池,默认名称为"default"
[*]      pool.initialize();
[*]      //新建一个memcached客户端,如果没有给名字
[*]   MemcachedClient mcc = new MemcachedClient();
[*]
[*]      // turn off most memcached client logging:
[*]      com.meetup.memcached.Logger.getLogger( MemcachedClient.class.getName() ).setLevel( com.meetup.memcached.Logger.LEVEL_WARN );
[*]
[*]      for ( int i = 0; i < 10; i++ ) {
[*]            boolean success = mcc.set( "" + i, "Hello!" );
[*]            String result = (String)mcc.get( "" + i );
[*]            System.out.println( String.format( "set( %d ): %s", i, success ) );
[*]            System.out.println( String.format( "get( %d ): %s", i, result ) );
[*]      }
[*]
[*]      System.out.println( "\n\t -- sleeping --\n" );
[*]      try { Thread.sleep( 10000 ); } catch ( Exception ex ) { }
[*]
[*]      for ( int i = 0; i < 10; i++ ) {
[*]            boolean success = mcc.set( "" + i, "Hello!" );
[*]            String result = (String)mcc.get( "" + i );
[*]            System.out.println( String.format( "set( %d ): %s", i, success ) );
[*]            System.out.println( String.format( "get( %d ): %s", i, result ) );
[*]      }
[*]    }
[*]}
  
  

以上代码大概做了这几件事情:
初始化一个连接池
新建一个memcached客户端
set一个key/value
get一个key,并且打印出value
这是我们实际应用中很常见的场景。
  
连接池的创建和初始化
      连接池SockIOPool是非常重要的部分,它的好坏直接决定了客户端的性能。SockIOPool用一个HashMap持有多个连接池对象,连接池以名称作为标识,默认为"default"。看看
  SockIOPool的getInstance方法就知道了。



Java代码
[*]public static SockIOPool getInstance() {
[*]      return getInstance("default");
[*]}
[*]
[*]public static synchronized SockIOPool getInstance(String poolName) {
[*]      if (pools.containsKey(poolName)) return pools.get(poolName);
[*]
[*]      SockIOPool pool = new SockIOPool();
[*]      pools.put(poolName, pool);
[*]
[*]      return pool;
[*]}
  
  
  
连接池实例化完成后,还需要初始化,看看pool.initialize()做了什么:



Java代码
[*]
[*]
[*]public void initialize() {
[*]   //这里以自身作为同步锁,防止被多次初始化
[*]   synchronized (this) {
[*]   // 如果已经被初始化了则终止初始化过程
[*]   if (initialized && (buckets != null || consistentBuckets != null) && (availPool != null)&& (busyPool != null)) {
[*]      log.error("++++ trying to initialize an already initialized pool");
[*]      return;
[*]   }
[*]    <span style="color: #ff0000;">// 可用连接集合</span>
[*]      availPool = new HashMap<String, Map<SockIO, Long>>(servers.length * initConn);
[*]   //工作连接集合
[*]   busyPool = new HashMap<String, Map<SockIO, Long>>(servers.length * initConn);   
[*]   // 不可用连接集合         
[*]   deadPool = new IdentityHashMap<SockIO, Integer>();
[*]    hostDeadDur = new HashMap<String, Long>();
[*]   hostDead = new HashMap<String, Date>();
[*]   maxCreate = (poolMultiplier > minConn) ? minConn : minConn / poolMultiplier;
[*]   if (log.isDebugEnabled()) {
[*]      log.debug("++++ initializing pool with following settings:");
[*]      log.debug("++++ initial size: " + initConn);
[*]      log.debug("++++ min spare   : " + minConn);
[*]      log.debug("++++ max spare   : " + maxConn);
[*]   }
[*]   if (servers == null || servers.length <= 0) {
[*]      log.error("++++ trying to initialize with no servers");
[*]      throw new IllegalStateException("++++ trying to initialize with no servers");
[*]   }
[*]   // initalize our internal hashing structures
[*]   if (this.hashingAlg == CONSISTENT_HASH) populateConsistentBuckets();
[*]   else populateBuckets();
[*]   // mark pool as initialized
[*]   this.initialized = true;
[*]   // start maint thread
[*]   if (this.maintSleep > 0) this.startMaintThread();
[*]}
[*]}
  

  
  
  连接池的关闭
  很简单,只是重置清空相关参数而已



Java代码
[*]public void shutDown() {
[*]      synchronized (this) {
[*]            if (log.isDebugEnabled()) log.debug("++++ SockIOPool shutting down...");
[*]
[*]            if (maintThread != null && maintThread.isRunning()) {
[*]                // stop the main thread
[*]                stopMaintThread();
[*]
[*]                // wait for the thread to finish
[*]                while (maintThread.isRunning()) {
[*]                  if (log.isDebugEnabled()) log.debug("++++ waiting for main thread to finish run +++");
[*]                  try {
[*]                        Thread.sleep(500);
[*]                  } catch (Exception ex) {
[*]                  }
[*]                }
[*]            }
[*]
[*]            if (log.isDebugEnabled()) log.debug("++++ closing all internal pools.");
[*]            closePool(availPool);
[*]            closePool(busyPool);
[*]            availPool = null;
[*]            busyPool = null;
[*]            buckets = null;
[*]            consistentBuckets = null;
[*]            hostDeadDur = null;
[*]            hostDead = null;
[*]            maintThread = null;
[*]            initialized = false;
[*]            if (log.isDebugEnabled()) log.debug("++++ SockIOPool finished shutting down.");
[*]      }
[*]    }
  
  
  
  
  连接池的自动平衡
SockIOPool的initialize()方法最后有这么一行代码
  // start maint thread
if (this.maintSleep > 0) this.startMaintThread();

  这是在初始化完成后,启动线程池平衡线程
  



Java代码
[*]protected void startMaintThread() {
[*]      if (maintThread != null) {
[*]          if (maintThread.isRunning()) {
[*]            log.error("main thread already running");
[*]          } else {
[*]            maintThread.start();
[*]          }
[*]      } else {
[*]          maintThread = new MaintThread(this);
[*]          maintThread.setInterval(this.maintSleep);
[*]          maintThread.start();
[*]      }
[*]}
  
  
  MaintThread的run方法



Java代码
[*]public void run() {
[*]   this.running = true;
[*]   while (!this.stopThread) {
[*]       try {
[*]         Thread.sleep(interval);
[*]         // if pool is initialized, then
[*]         // run the maintenance method on itself
[*]         if (pool.isInitialized()) pool.selfMaint();
[*]       } catch (Exception e) {
[*]         break;
[*]       }
[*]   }
[*]   this.running = false;
  
其实最终的平衡方法是SockIOPool.selfMaint()
  



Java代码
[*]protected void selfMaint() {
[*]      if (log.isDebugEnabled()) log.debug("++++ Starting self maintenance....");
[*]
[*]      // go through avail sockets and create sockets
[*]      // as needed to maintain pool settings
[*]      Map<String, Integer> needSockets = new HashMap<String, Integer>();
[*]
[*]      synchronized (this) {
[*]            // 先统计每个服务器实例的可用连接是否小于最小可用连接数
[*]      for (Iterator<String> i = availPool.keySet().iterator(); i.hasNext();) {
[*]                String host = i.next();
[*]                Map<SockIO, Long> sockets = availPool.get(host);
[*]
[*]                if (log.isDebugEnabled()) log.debug("++++ Size of avail pool for host (" + host + ") = "
[*]                                                    + sockets.size());
[*]
[*]                // if pool is too small (n < minSpare)
[*]                if (sockets.size() < minConn) {
[*]                  // need to create new sockets
[*]                  int need = minConn - sockets.size();
[*]                  needSockets.put(host, need);
[*]                }
[*]            }
[*]      }
[*]
[*]      // 如果小于最小可用连接数,则要新建增加可用连接
[*]   Map<String, Set<SockIO>> newSockets = new HashMap<String, Set<SockIO>>();
[*]
[*]      for (String host : needSockets.keySet()) {
[*]            Integer need = needSockets.get(host);
[*]
[*]            if (log.isDebugEnabled()) log.debug("++++ Need to create " + need + " new sockets for pool for host: "
[*]                                                + host);
[*]
[*]            Set<SockIO> newSock = new HashSet<SockIO>(need);
[*]            for (int j = 0; j < need; j++) {
[*]                SockIO socket = createSocket(host);
[*]                if (socket == null) break;
[*]                newSock.add(socket);
[*]            }
[*]            newSockets.put(host, newSock);
[*]      }
[*]
[*]      // synchronize to add and remove to/from avail pool
[*]      // as well as clean up the busy pool (no point in releasing
[*]      // lock here as should be quick to pool adjust and no
[*]      // blocking ops here)
[*]      synchronized (this) {
[*]            //将新建的连接添加到可用连接集合里
[*]       for (String host : newSockets.keySet()) {
[*]                Set<SockIO> sockets = newSockets.get(host);
[*]                for (SockIO socket : sockets)
[*]                  addSocketToPool(availPool, host, socket);
[*]            }
[*]
[*]            for (Iterator<String> i = availPool.keySet().iterator(); i.hasNext();) {
[*]                String host = i.next();
[*]                Map<SockIO, Long> sockets = availPool.get(host);
[*]                if (log.isDebugEnabled()) log.debug("++++ Size of avail pool for host (" + host + ") = "
[*]                                                    + sockets.size());
[*]
[*]                //如果可用连接超过了最大连接数,则要关闭一些
[*]          if (sockets.size() > maxConn) {
[*]                  // need to close down some sockets
[*]                  int diff = sockets.size() - maxConn;
[*]                  int needToClose = (diff <= poolMultiplier) ? diff : (diff) / poolMultiplier;
[*]
[*]                  if (log.isDebugEnabled()) log.debug("++++ need to remove " + needToClose
[*]                                                      + " spare sockets for pool for host: " + host);
[*]
[*]                  for (Iterator<SockIO> j = sockets.keySet().iterator(); j.hasNext();) {
[*]                        if (needToClose <= 0) break;
[*]
[*]                        // remove stale entries
[*]                        SockIO socket = j.next();
[*]                        long expire = sockets.get(socket).longValue();
[*]
[*]                        // 这里回收可用连接池的闲置连接,连接设置到可用连接池里时,expire设置为当前时间。如果 (expire + maxIdle) < System.currentTimeMillis()为true,则表
[*]明,该连接在可用连接池呆得太久了,需要回收
[*]               if ((expire + maxIdle) < System.currentTimeMillis()) {
[*]                            if (log.isDebugEnabled()) log.debug("+++ removing stale entry from pool as it is past its idle timeout and pool is over max spare");
[*]
[*]                            // remove from the availPool
[*]                            deadPool.put(socket, ZERO);
[*]                            j.remove();
[*]                            needToClose--;
[*]                        }
[*]                  }
[*]                }
[*]            }
[*]
[*]            //清理正在工作的连接集合
[*]      for (Iterator<String> i = busyPool.keySet().iterator(); i.hasNext();) {
[*]                String host = i.next();
[*]                Map<SockIO, Long> sockets = busyPool.get(host);
[*]                if (log.isDebugEnabled()) log.debug("++++ Size of busy pool for host (" + host + ")= "
[*]                                                    + sockets.size());
[*]                // loop through all connections and check to see if we have any hung connections
[*]                for (Iterator<SockIO> j = sockets.keySet().iterator(); j.hasNext();) {
[*]                  // remove stale entries
[*]                  SockIO socket = j.next();
[*]                  long hungTime = sockets.get(socket).longValue();
[*]                  //如果工作时间超过maxBusyTime,则也要回收掉,超过maxBusyTime,可能是服务器响应时间过长
[*]             if ((hungTime + maxBusyTime) < System.currentTimeMillis()) {
[*]                        log.error("+++ removing potentially hung connection from busy pool ... socket in pool for "
[*]                                  + (System.currentTimeMillis() - hungTime) + "ms");
[*]
[*]                        // remove from the busy pool
[*]                        deadPool.put(socket, ZERO);
[*]                        j.remove();
[*]                  }
[*]                }
[*]            }
[*]      }
[*]
[*]      // 最后清理不可用连接集合
[*]   Set<SockIO> toClose;
[*]      synchronized (deadPool) {
[*]            toClose = deadPool.keySet();
[*]            deadPool = new IdentityHashMap<SockIO, Integer>();
[*]      }
[*]
[*]      for (SockIO socket : toClose) {
[*]            try {
[*]                socket.trueClose(false);
[*]            } catch (Exception ex) {
[*]                log.error("++++ failed to close SockIO obj from deadPool");
[*]                log.error(ex.getMessage(), ex);
[*]            }
[*]
[*]            socket = null;
[*]      }
[*]
[*]      if (log.isDebugEnabled()) log.debug("+++ ending self maintenance.");
[*]    }
  

  
  
  key的服务器端分布
  初始化方法其实就是根据每个服务器的权重,建立一个服务器地址集合,如果选择了一致性哈希,则对服务器地址进行一致性哈希分布,一致性哈希算法比较简单,如果不了解的同学,可以
  自行google一下,initialize() 方法里有这段代码:
  //一致性哈希



Java代码
[*]if (this.hashingAlg == CONSISTENT_HASH){
[*]populateConsistentBuckets();
[*]}else populateBuckets();
  
看看populateConsistentBuckets()方法
  // 用一致性哈希算法将服务器分布在一个2的32次方的环里,服务器的分布位置<=servers.length*40*4



Java代码
[*]private void populateConsistentBuckets() {
[*]    if (log.isDebugEnabled()) log.debug("++++ initializing internal hashing structure for consistent hashing");
[*]
[*]    // store buckets in tree map
[*]    this.consistentBuckets = new TreeMap<Long, String>();
[*]    MessageDigest md5 = MD5.get();
[*]    if (this.totalWeight <= 0 && this.weights != null) {
[*]      for (int i = 0; i < this.weights.length; i++)
[*]            this.totalWeight += (this.weights == null) ? 1 : this.weights;
[*]    } else if (this.weights == null) {
[*]      this.totalWeight = this.servers.length;
[*]    }
[*]
[*]    for (int i = 0; i < servers.length; i++) {
[*]       int thisWeight = 1;
[*]       if (this.weights != null && this.weights != null) thisWeight = this.weights;
[*]
[*]      //这个值永远小于40 * this.servers.length,因为thisWeight/totalWeight永远小于1



Java代码
[*] double factor = Math.floor(((double) (40 * this.servers.length * thisWeight)) / (double) this.totalWeight);
[*]
[*]//服务器的分布位置为factor*4,factor<=40*this.servers.length,所以服务器的分布位置& lt;=40*this.servers.length*4。
[*]for (long j = 0; j < factor; j++) {
[*]      //md5值的二进制数组为16位
[*]   byte[] d = md5.digest((servers + "-" + j).getBytes());
[*]       //16位二进制数组每4位为一组,每组第4个值左移24位,第三个值左移16位,第二个值左移8位,第一个值不移位。进行或运算,得到一个小于2的32 次方的long值。
[*]    for (int h = 0; h < 4; h++) {
[*]            Long k = ((long) (d & 0xFF) << 24) | ((long) (d & 0xFF) << 16)
[*]                         | ((long) (d & 0xFF) << 8) | ((long) (d & 0xFF));
[*]                consistentBuckets.put(k, servers);
[*]                if (log.isDebugEnabled()) log.debug("++++ added " + servers + " to server bucket");
[*]         }
[*]   }
[*]
[*]   // create initial connections
[*]   if (log.isDebugEnabled()) log.debug("+++ creating initial connections (" + initConn + ") for host: "
[*]                                          + servers);
[*]
[*]   //创建连接
[*] for (int j = 0; j < initConn; j++) {
[*]       SockIO socket = createSocket(servers);
[*]       if (socket == null) {
[*]            log.error("++++ failed to create connection to: " + servers + " -- only " + j + " created.");
[*]             break;
[*]       }
[*]
[*]       //添加到可用连接池
[*]   addSocketToPool(availPool, servers, socket);
[*]       if (log.isDebugEnabled()) log.debug("++++ created and added socket: " + socket.toString()
[*]                                                + " for host " + servers);
[*]   }
[*]}
  
  
  如果不是一致性哈希,则只是普通分布,很简单,只是根据权重将服务器地址放入buckets这个List里



Java代码
[*]private void populateBuckets() {
[*]      if (log.isDebugEnabled()) log.debug("++++ initializing internal hashing structure for consistent hashing");
[*]
[*]      // store buckets in tree map
[*]      this.buckets = new ArrayList<String>();
[*]
[*]      for (int i = 0; i < servers.length; i++) {
[*]            if (this.weights != null && this.weights.length > i) {
[*]                for (int k = 0; k < this.weights.intValue(); k++) {
[*]                  this.buckets.add(servers);
[*]                  if (log.isDebugEnabled()) log.debug("++++ added " + servers + " to server bucket");
[*]                }
[*]            } else {
[*]                this.buckets.add(servers);
[*]                if (log.isDebugEnabled()) log.debug("++++ added " + servers + " to server bucket");
[*]            }
[*]
[*]            // create initial connections
[*]            if (log.isDebugEnabled()) log.debug("+++ creating initial connections (" + initConn + ") for host: "
[*]                                                + servers);
[*]
[*]            for (int j = 0; j < initConn; j++) {
[*]                SockIO socket = createSocket(servers);
[*]                if (socket == null) {
[*]                  log.error("++++ failed to create connection to: " + servers + " -- only " + j + " created.");
[*]                  break;
[*]                }
[*]
[*]                //新建连接后,加入到可用连接集合里
[*]         addSocketToPool(availPool, servers, socket);
[*]                if (log.isDebugEnabled()) log.debug("++++ created and added socket: " + socket.toString()
[*]                                                    + " for host " + servers);
[*]            }
[*]      }
[*]    }
  

  
  
  如何创建socket连接
  在上面的private void populateBuckets()方法里,createSocket(servers)是创建到服务器的连接,看看这个方法



Java代码
[*]protected SockIO createSocket(String host) {
[*]SockIO socket = null;
[*]//hostDeadLock是一个可重入锁,它的变量声明为
[*]
[*]
[*]private final ReentrantLock             hostDeadLock    = new ReentrantLock();
[*]hostDeadLock.lock();
[*]try {
[*]//hostDead.containsKey(host)为true表示曾经连接过该服务器,但没有成功。
[*]//hostDead是一个HashMap,key为服务器地址,value为当时连接不成功的时间
[*]//hostDeadDur是一个HashMap,key为服务器地址,value为设置的重试间隔时间
[*]
[*]   if (failover && failback && hostDead.containsKey(host) && hostDeadDur.containsKey(host)) {
[*]       Date store = hostDead.get(host);
[*]       long expire = hostDeadDur.get(host).longValue();
[*]
[*]      if ((store.getTime() + expire) > System.currentTimeMillis()) return null;
[*]   }
[*] } finally {
[*]    hostDeadLock.unlock();
[*]}
[*]
[*]
[*]try {
[*]   socket = new SockIO(this, host, this.socketTO, this.socketConnectTO, this.nagle);
[*]   if (!socket.isConnected()) {
[*]               log.error("++++ failed to get SockIO obj for: " + host + " -- new socket is not connected");
[*]               deadPool.put(socket, ZERO);
[*]               socket = null;
[*]   }
[*] } catch (Exception ex) {
[*]         log.error("++++ failed to get SockIO obj for: " + host);
[*]         log.error(ex.getMessage(), ex);
[*]         socket = null;
[*] }
[*]
[*]// if we failed to get socket, then mark
[*]// host dead for a duration which falls off
[*]hostDeadLock.lock();
[*]try {
[*]         //到了这里,socket仍然为null,说明这个server悲剧了,无法和它创建连接,则要把该server丢到不可用的主机集合里
[*]      if (socket == null) {
[*]               Date now = new Date();
[*]               hostDead.put(host, now);
[*]
[*]               //如果上次就不可用了,到期了仍然不可用,就要这次的不可用时间设为上次的2倍,否则初始时长为1000ms
[*]               long expire = (hostDeadDur.containsKey(host)) ? (((Long) hostDeadDur.get(host)).longValue() * 2) : 1000;
[*]
[*]               if (expire > MAX_RETRY_DELAY) expire = MAX_RETRY_DELAY;
[*]
[*]               hostDeadDur.put(host, new Long(expire));
[*]               if (log.isDebugEnabled()) log.debug("++++ ignoring dead host: " + host + " for " + expire + " ms");
[*]
[*]               // 既然这个host都不可用了,那与它的所有连接当然要从可用连接集合"availPool"里删除掉
[*]         clearHostFromPool(availPool, host);
[*]         } else {
[*]               if (log.isDebugEnabled()) log.debug("++++ created socket (" + socket.toString() + ") for host: " + host);
[*]               //连接创建成功,如果上次不成功,那么这次要把该host从不可用主机集合里删除掉
[*]          if (hostDead.containsKey(host) || hostDeadDur.containsKey(host)) {
[*]                   hostDead.remove(host);
[*]                   hostDeadDur.remove(host);
[*]               }
[*]         }
[*]       } finally {
[*]         hostDeadLock.unlock();
[*]       }
[*]
[*]       return socket;
[*]   }
  
  
  SockIO构造函数



Java代码
[*]public SockIO(SockIOPool pool, String host, int timeout, int connectTimeout, boolean noDelay)
[*]                                                                                             throws IOException,
[*]                                                                                             UnknownHostException {
[*]      this.pool = pool;
[*]      String[] ip = host.split(":");
[*]      // get socket: default is to use non-blocking connect
[*]      sock = getSocket(ip, Integer.parseInt(ip), connectTimeout);
[*]      if (timeout >= 0) this.sock.setSoTimeout(timeout);
[*]      // testing only
[*]      sock.setTcpNoDelay(noDelay);
[*]      // wrap streams
[*]      in = new DataInputStream(new BufferedInputStream(sock.getInputStream()));
[*]      out = new BufferedOutputStream(sock.getOutputStream());
[*]      this.host = host;
[*]}
  

  getSocket方法



Java代码
[*]protected static Socket getSocket(String host, int port, int timeout) throws IOException {
[*]         SocketChannel sock = SocketChannel.open();
[*]         sock.socket().connect(new InetSocketAddress(host, port), timeout);
[*]         return sock.socket();
[*] }
  可以看到,socket连接是用nio方式创建的。
  
  
  
  新建MemcachedClient
MemcachedClient mcc = new MemcachedClient();新建了一个memcached客户端,看看构造函数,没作什么,只是设置参数而已。



Java代码
[*]/**
[*]* Creates a new instance of MemCachedClient.
[*]*/
[*] public MemcachedClient() {
[*]   init();
[*] }
[*]
[*]
[*] private void init() {
[*]   this.sanitizeKeys       = true;
[*]   this.primitiveAsString= false;
[*]   this.compressEnable   = true;
[*]   this.compressThreshold= COMPRESS_THRESH;
[*]   this.defaultEncoding    = "UTF-8";
[*]   this.poolName         = ( this.poolName == null ) ? "default" : this.poolName;
[*]
[*]   // get a pool instance to work with for the life of this instance
[*]   this.pool               = SockIOPool.getInstance( poolName );
[*] }
  
  
  
  
  
  set方法如何工作
  到此memcached客户端初始化工作完成。再回到测试类TestMemcached,看看for循环里的
  boolean success = mcc.set( ""+ i, "Hello!" );
String result = (String)mcc.get( "" + i );
初始化后,就可以set,get了。看看set是怎么工作的。
  



Java代码
[*]/**
[*]   * Stores data on the server; only the key and the value are specified.
[*]   *
[*]   * @param key key to store data under
[*]   * @param value value to store
[*]   * @return true, if the data was successfully stored
[*]   */
[*]    public boolean set( String key, Object value ) {
[*]      return set( "set", key, value, null, null, primitiveAsString );
[*]    }
[*]
[*]
[*]    //这个set方法比较长   
[*]   private boolean set( String cmdname, String key, Object value, Date expiry, Integer hashCode, boolean asString ) {
[*]      if ( cmdname == null || cmdname.trim().equals( "" ) || key == null ) {
[*]            log.error( "key is null or cmd is null/empty for set()" );
[*]            return false;
[*]      }
[*]
[*]      try {
[*]            key = sanitizeKey( key );
[*]      }
[*]      catch ( UnsupportedEncodingException e ) {
[*]            // if we have an errorHandler, use its hook
[*]            if ( errorHandler != null )
[*]                errorHandler.handleErrorOnSet( this, e, key );
[*]            log.error( "failed to sanitize your key!", e );
[*]            return false;
[*]      }
[*]
[*]      if ( value == null ) {
[*]            log.error( "trying to store a null value to cache" );
[*]            return false;
[*]      }
[*]
[*]      // get SockIO obj
[*]      SockIOPool.SockIO sock = pool.getSock( key, hashCode );
[*]
[*]      if ( sock == null ) {
[*]            if ( errorHandler != null )
[*]                errorHandler.handleErrorOnSet( this, new IOException( "no socket to server available" ), key );
[*]            return false;
[*]      }
[*]
[*]      if ( expiry == null )
[*]            expiry = new Date(0);
[*]
[*]      // store flags
[*]      int flags = 0;
[*]
[*]      // byte array to hold data
[*]      byte[] val;
[*]
[*]    //这些类型自己序列化,否则由java序列化处理
[*]   if ( NativeHandler.isHandled( value ) ) {         
[*]            if ( asString ) {
[*]                //如果是字符串,则直接getBytes   
[*]                try {
[*]                  if ( log.isInfoEnabled() )
[*]                        log.info( "++++ storing data as a string for key: " + key + " for class: " + value.getClass().getName() );
[*]                  val = value.toString().getBytes( defaultEncoding );
[*]                }
[*]                catch ( UnsupportedEncodingException ue ) {
[*]                  // if we have an errorHandler, use its hook
[*]                  if ( errorHandler != null )
[*]                        errorHandler.handleErrorOnSet( this, ue, key );
[*]                  log.error( "invalid encoding type used: " + defaultEncoding, ue );
[*]                  sock.close();
[*]                  sock = null;
[*]                  return false;
[*]                }
[*]            }
[*]            else {
[*]                try {
[*]                  if ( log.isInfoEnabled() )
[*]                        log.info( "Storing with native handler..." );
[*]                  flags |= NativeHandler.getMarkerFlag( value );
[*]                  val    = NativeHandler.encode( value );
[*]                }
[*]                catch ( Exception e ) {
[*]                  // if we have an errorHandler, use its hook
[*]                  if ( errorHandler != null )
[*]                        errorHandler.handleErrorOnSet( this, e, key );
[*]                  log.error( "Failed to native handle obj", e );
[*]
[*]                  sock.close();
[*]                  sock = null;
[*]                  return false;
[*]                }
[*]            }
[*]      }
[*]      else {
[*]            // 否则用java的序列化
[*]      try {
[*]                if ( log.isInfoEnabled() )
[*]                  log.info( "++++ serializing for key: " + key + " for class: " + value.getClass().getName() );
[*]                ByteArrayOutputStream bos = new ByteArrayOutputStream();
[*]                (new ObjectOutputStream( bos )).writeObject( value );
[*]                val = bos.toByteArray();
[*]                flags |= F_SERIALIZED;
[*]            }
[*]            catch ( IOException e ) {
[*]                // if we have an errorHandler, use its hook
[*]                if ( errorHandler != null )
[*]                  errorHandler.handleErrorOnSet( this, e, key );
[*]
[*]                // if we fail to serialize, then
[*]                // we bail
[*]                log.error( "failed to serialize obj", e );
[*]                log.error( value.toString() );
[*]
[*]                // return socket to pool and bail
[*]                sock.close();
[*]                sock = null;
[*]                return false;
[*]            }
[*]      }
[*]
[*]      //压缩内容
[*]   if ( compressEnable && val.length > compressThreshold ) {
[*]            try {
[*]                if ( log.isInfoEnabled() ) {
[*]                  log.info( "++++ trying to compress data" );
[*]                  log.info( "++++ size prior to compression: " + val.length );
[*]                }
[*]                ByteArrayOutputStream bos = new ByteArrayOutputStream( val.length );
[*]                GZIPOutputStream gos = new GZIPOutputStream( bos );
[*]                gos.write( val, 0, val.length );
[*]                gos.finish();
[*]                gos.close();
[*]
[*]                // store it and set compression flag
[*]                val = bos.toByteArray();
[*]                flags |= F_COMPRESSED;
[*]
[*]                if ( log.isInfoEnabled() )
[*]                  log.info( "++++ compression succeeded, size after: " + val.length );
[*]            }
[*]            catch ( IOException e ) {
[*]                // if we have an errorHandler, use its hook
[*]                if ( errorHandler != null )
[*]                  errorHandler.handleErrorOnSet( this, e, key );
[*]                log.error( "IOException while compressing stream: " + e.getMessage() );
[*]                log.error( "storing data uncompressed" );
[*]            }
[*]      }
[*]
[*]      // now write the data to the cache server
[*]      try {
[*]             //按照memcached协议组装命令
[*]      String cmd = String.format( "%s %s %d %d %d\r\n", cmdname, key, flags, (expiry.getTime() / 1000), val.length );
[*]            sock.write( cmd.getBytes() );
[*]            sock.write( val );
[*]            sock.write( "\r\n".getBytes() );
[*]            sock.flush();
[*]
[*]            // get result code
[*]            String line = sock.readLine();
[*]            if ( log.isInfoEnabled() )
[*]                log.info( "++++ memcache cmd (result code): " + cmd + " (" + line + ")" );
[*]
[*]            if ( STORED.equals( line ) ) {
[*]                if ( log.isInfoEnabled() )
[*]                  log.info("++++ data successfully stored for key: " + key );
[*]                sock.close();
[*]                sock = null;
[*]                return true;
[*]            }
[*]            else if ( NOTSTORED.equals( line ) ) {
[*]                if ( log.isInfoEnabled() )
[*]                  log.info( "++++ data not stored in cache for key: " + key );
[*]            }
[*]            else {
[*]                log.error( "++++ error storing data in cache for key: " + key + " -- length: " + val.length );
[*]                log.error( "++++ server response: " + line );
[*]            }
[*]      }
[*]      catch ( IOException e ) {
[*]
[*]            // if we have an errorHandler, use its hook
[*]            if ( errorHandler != null )
[*]                errorHandler.handleErrorOnSet( this, e, key );
[*]
[*]            // exception thrown
[*]            log.error( "++++ exception thrown while writing bytes to server on set" );
[*]            log.error( e.getMessage(), e );
[*]
[*]            try {
[*]                sock.trueClose();
[*]            }
[*]            catch ( IOException ioe ) {
[*]                log.error( "++++ failed to close socket : " + sock.toString() );
[*]            }
[*]
[*]            sock = null;
[*]      }
[*]
[*]      //用完了,就要回收哦,sock.close()不是真正的关闭,只是放入到可用连接集合里。   
[*]       if ( sock != null ) {
[*]            sock.close();
[*]            sock = null;
[*]      }
[*]      return false;
[*]    }
  通过set方法向服务器设置key和value,涉及到以下几个点
数据的压缩和序列化 (如果是get方法,则和set方法基本是相反的)
为key分配服务器 对于一些常用类型,采用自定义的序列化,具体要看NativeHander.java,这个类比较简单,有兴趣可以自己看看
  



Java代码
[*]public static boolean isHandled( Object value ) {
[*]       return (
[*]         value instanceof Byte            ||
[*]         value instanceof Boolean         ||
[*]         value instanceof Integer         ||
[*]         value instanceof Long            ||
[*]         value instanceof Character       ||
[*]         value instanceof String          ||
[*]         value instanceof StringBuffer    ||
[*]         value instanceof Float         ||
[*]         value instanceof Short         ||
[*]         value instanceof Double          ||
[*]         value instanceof Date            ||
[*]         value instanceof StringBuilder   ||
[*]         value instanceof byte[]
[*]         )
[*]       ? true
[*]       : false;
[*]   }
  其他类型则用java的默认序列化
  
  
  
  为key选择服务器
SockIOPool.SockIO sock = pool.getSock( key, hashCode );就是为key选择服务器



Java代码
[*]public SockIO getSock(String key, Integer hashCode) {
[*]      if (log.isDebugEnabled()) log.debug("cache socket pick " + key + " " + hashCode);
[*]      if (!this.initialized) {
[*]            log.error("attempting to get SockIO from uninitialized pool!");
[*]            return null;
[*]      }
[*]
[*]      // if no servers return null
[*]      if ((this.hashingAlg == CONSISTENT_HASH && consistentBuckets.size() == 0)
[*]            || (buckets != null && buckets.size() == 0)) return null;
[*]
[*]      // if only one server, return it
[*]      if ((this.hashingAlg == CONSISTENT_HASH && consistentBuckets.size() == 1)
[*]            || (buckets != null && buckets.size() == 1)) {
[*]            SockIO sock = (this.hashingAlg == CONSISTENT_HASH) ? getConnection(consistentBuckets.get(consistentBuckets.firstKey())) : getConnection(buckets.get(0));
[*]            if (sock != null && sock.isConnected()) {
[*]                if (aliveCheck) {//健康状态检查
[*]
[*]                  if (!sock.isAlive()) {
[*]                        sock.close();
[*]                        try {
[*]                            sock.trueClose();//有问题,真的关闭socket
[*]
[*]                        } catch (IOException ioe) {
[*]                            log.error("failed to close dead socket");
[*]                        }
[*]                        sock = null;
[*]                  }
[*]                }
[*]            } else {//连接不正常,放入不可用连接集合里
[*]          if (sock != null) {
[*]                  deadPool.put(sock, ZERO);
[*]                  sock = null;
[*]                }
[*]            }
[*]
[*]            return sock;
[*]      }
[*]
[*]      Set<String> tryServers = new HashSet<String>(Arrays.asList(servers));
[*]      // get initial bucket
[*]      long bucket = getBucket(key, hashCode);
[*]      String server = (this.hashingAlg == CONSISTENT_HASH) ? consistentBuckets.get(bucket) : buckets.get((int) bucket);
[*]
[*]      while (!tryServers.isEmpty()) {
[*]            // try to get socket from bucket
[*]            SockIO sock = getConnection(server);
[*]            if (log.isDebugEnabled()) log.debug("cache choose " + server + " for " + key);
[*]            if (sock != null && sock.isConnected()) {
[*]                if (aliveCheck) {
[*]                  if (sock.isAlive()) {
[*]                        return sock;
[*]                  } else {
[*]                        sock.close();
[*]                        try {
[*]                            sock.trueClose();
[*]                        } catch (IOException ioe) {
[*]                            log.error("failed to close dead socket");
[*]                        }
[*]                        sock = null;
[*]                  }
[*]                } else {
[*]                  return sock;
[*]                }
[*]            } else {
[*]                if (sock != null) {
[*]                  deadPool.put(sock, ZERO);
[*]                  sock = null;
[*]                }
[*]            }
[*]
[*]            // if we do not want to failover, then bail here
[*]            if (!failover) return null;
[*]
[*]            // log that we tried
[*]            tryServers.remove(server);
[*]
[*]            if (tryServers.isEmpty()) break;   
[*]         //注意哦,下面是failover机制
[*]      int rehashTries = 0;
[*]            while (!tryServers.contains(server)) {
[*]                String newKey = String.format("%s%s", rehashTries, key);
[*]                if (log.isDebugEnabled()) log.debug("rehashing with: " + newKey);
[*]
[*]                bucket = getBucket(newKey, null);
[*]                server = (this.hashingAlg == CONSISTENT_HASH) ? consistentBuckets.get(bucket) : buckets.get((int) bucket);
[*]                rehashTries++;
[*]            }
[*]      }
[*]      return null;
[*]    }
  

  
  下面这个方法是真正的从服务器获取连接
  
  
   



Java代码
[*]public SockIO getConnection(String host) {
[*]      if (!this.initialized) {
[*]            log.error("attempting to get SockIO from uninitialized pool!");
[*]            return null;
[*]      }
[*]
[*]      if (host == null) return null;
[*]
[*]      synchronized (this) {
[*]            // if we have items in the pool
[*]            // then we can return it
[*]            if (availPool != null && !availPool.isEmpty()) {
[*]                // take first connected socket
[*]                Map<SockIO, Long> aSockets = availPool.get(host);
[*]                if (aSockets != null && !aSockets.isEmpty()) {
[*]                  for (Iterator<SockIO> i = aSockets.keySet().iterator(); i.hasNext();) {
[*]                        SockIO socket = i.next();
[*]                        if (socket.isConnected()) {
[*]                            if (log.isDebugEnabled()) log.debug("++++ moving socket for host (" + host
[*]                                                                + ") to busy pool ... socket: " + socket);
[*]                            // remove from avail pool
[*]                            i.remove();
[*]                            // add to busy pool
[*]                            addSocketToPool(busyPool, host, socket);
[*]                            // return socket
[*]                            return socket;
[*]                        } else {
[*]                            // add to deadpool for later reaping
[*]                            deadPool.put(socket, ZERO);
[*]                            // remove from avail pool
[*]                            i.remove();
[*]                        }
[*]                  }
[*]                }
[*]            }
[*]      }
[*]
[*]      // create one socket -- let the maint thread take care of creating more
[*]      SockIO socket = createSocket(host);
[*]      if (socket != null) {
[*]            synchronized (this) {
[*]                addSocketToPool(busyPool, host, socket);
[*]            }
[*]      }
[*]      return socket;
[*]    }
[*]
[*]
  
  
  
  failover和failback
  这两者都是发生在获取可用连接这个环节。
  failover,如果为key选择的服务器不可用,则对key重新哈希选择下一个服务器,详见getSock方法的末尾。
  failback,用一个hashmap存储连接失败的服务器和对应的失效持续时间,每次获取连接时,都探测是否到了重试时间。
页: [1]
查看完整版本: 代码中实际运用memcached——java