采用redis实现分布式锁的一样栽思路

事先整理了千篇一律篇文章《zookeeper
分布式锁服务》,本文介绍的
Zookeeper 是坐 3.4.5 这个平静版本也底蕴,最新的版本可以通过公网
http://hadoop.apache.org/zookeeper/来获得,Zookeeper
的设置非常简单,下面将从今单机模式以及集群模式简单单方面介绍 Zookeeper
的Windows安装与配置.

在一如既往差工作吃,出现了一个稍微意外,一个在redis写及读之点子吃查获的结果连与预期的无一致,后面仔细考虑了后,发现凡是为集群中相继节点都应用共享的缓存、队列这些,有些场景被各个节点内可能会见发出资源竞争,可能会见发作各个节点内的“线程不安全问题”,单机中,可以动用锁来解决,在分布式环境下,就要用到分布式锁了,当时网上查了查看,发现了森分外神给闹之思绪及代码,自己试试着实现了转,在及时把思路分享给大家。

第一需要装JdK,从Oracle的Java网站下载,安装好简单,就不再详述。

Redis可以开分布式锁之规格

因为redis是单线程的,并行访问的一声令下以他里头会是串行执行的,所以redis可以当实现分布式锁的技艺。

单机模式

所用底通令

在头里几篇稿子中,已经介绍了Redis的主干型和那个对应的操作,有免顶了解的朋友可以错过《Redis常因此色知多少?》失探视。在实现分布式锁遭到,主要用了redis的字符串类型数据结构,以及以下的操作:

单机安装非常简单,只要获得到 Zookeeper
的压缩包并解压到某某目录如:C:\zookeeper-3.4.5\生,Zookeeper
的开行脚论以 bin 目录下,Windows 下的启动脚论是 zkServer.cmd。

set key value [ex 秒数] [px 毫秒数] [nx/xx] : 保存key-value

ex、px指的是key的有效期。
nx:表示当key不设有时时,创建key并保存value
xx:表示当key存在时,保存value

localhost:0>set hello world
OK
localhost:0>set hello world ex 100
OK
localhost:0>set hello world px 1000
OK
这里主要看看 nx与xx的效果
localhost:0>set hello worldnx nx 
NULL
localhost:0>set hello worldxx xx 
OK
因为hello这个key已经存在,所以带上参数nx后,返回了NULL,没有操作成功。
带上xx这个参数后,现在hello这个key的value已经被替换成了worldxx

实现着因故到了set指令的nx参数,作用是保留一个key-value,如果key不设有时时,创建key并保留value,如果key存在是,不举行操作返回NULL,在java代码中,这个令于打包成了Jedis的一个主意,我要好以代码中并且做了一个装进:

    /**
     * 如果key不存在则建立返回1
     * 如果key存则不作操作,返回0
     * @param key
     * @param value
     * @return
     */
    public Long setNx(final String key,final String value){
        return this.execute(new Function<Long, Jedis>() {
            @Override
            public Long callback(Jedis jedis) {
                //set命令带nx参数在Jedis中被封装成了setnx方法
                return jedis.setnx(key, value);
            }
        });
    }

重大为此来得到锁,setnx返回1,证明获取到了锁,返回0证明锁已经是。

在你行启动脚本之前,还有几独着力的安排起用安排一下,Zookeeper
的部署文件在 conf 目录下,这个目录下产生 zoo_sample.cfg 和
log4j.properties,你需要做的就算是将 zoo_sample.cfg 改名为 zoo.cfg,因为
Zookeeper
在开行时见面寻找这个文件作为默认配置文件。下面详细介绍一下,这个布局文件被逐一配置起之义。

expire key 整数值:设置key的生命周期以秒为单位

要用于装锁的逾期时,贴出java中封装的方法expire:

   /**
     * 设置key的过期时间
     * @param key
     * @param exp
     * @return
     */
    public Long expire(final String key,final int exp){
        return this.execute(new Function<Long, Jedis>() {
            @Override
            public Long callback(Jedis jedis) {
                return jedis.expire(key, exp);
            }
        });
    }
# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial 
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between 
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just 
# example sakes.
dataDir=C:\\zookeeper-3.4.5\\data
dataLogDir=C:\\zookeeper-3.4.5\\log
# the port at which the clients will connect
clientPort=2181
#
# Be sure to read the maintenance section of the 
# administrator guide before turning on autopurge.
#
# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir
#autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
#autopurge.purgeInterval=1
get key : 取出key对应的value

获得出存进去的锁值,贴出java中封装的点子get:

   /**
     * 获取String类型的值
     *
     * @param key
     * @return
     */
    public String get(final String key) {
        return this.execute(new Function<String, Jedis>() {
            @Override
            public String callback(Jedis jedis) {
                return jedis.get(key);
            }
        });
    }
  • tickTime:这个时间是作为 Zookeeper
    服务器之间或客户端和服务器之间维持心跳的年华间隔,也便是每个
    tickTime 时间就是会发送一个心跳。
  • dataDir:顾名思义就是是 Zookeeper
    保存数据的目,默认情况下,Zookeeper
    将写多少的日记文件为保留于斯目录里。
  • dataLogDir:顾名思义就是是 Zookeeper 保存日志文件之目
  • clientPort:这个端口就是客户端连接 Zookeeper 服务器的端口,Zookeeper
    会监听是端口,接受客户端的造访请求。
getset key newvalue:返回key的旧value,把新值newvalue存进去
localhost:0>getset hello wolrd
wogldxx
localhost:0>get hello
wolrd

用以获取到锁以后把新的锁值存进,贴出java中封装的方getset:

   /**
     * 获取并返回旧值,在设置新值
     * @param key
     * @param value
     * @return
     */
    public String getSet(final String key, final String value){
        return this.execute(new Function<String, Jedis>() {
            @Override
            public String callback(Jedis jedis) {
                return jedis.getSet(key, value);
            }
        });
    }

当这些配置起配置好后,你现在即足以启动 Zookeeper 了,启动后使反省
Zookeeper 是否已在服务,可以透过 netstat – ano 命令查看是否发若安排的
clientPort 端口号在监听服务。

兑现之思绪

1.先利用目前之日子穿与锁之过期时相加后得出锁将要过期的时光,并将这日子作value,锁作为key调用setnx方法,如果回去1,证明原来这个key,也尽管是沿不存在,获取锁成功,则调用expire方法设置锁之逾期时间,返回获取锁成功。
2.假如setnx返回0,证明原来锁有,没有赢得到锁,然后死等锁之拿走。(我本来好的思路就是考虑到第2步就是终止了,后面看了大神的贯彻思路,发现自之想法欠妥,下面说一下大神的第3步)。
3.只要setnx返回0,证明原来锁有,没有博得到锁,然后调用get方法,获取第1步存进去的value,就是沿将过期的时刻,与眼前之时刻穿比较,如果如手上底时日穿大于锁将过期的工夫,证明锁已经晚点,调用getset方法,把新的光阴存上,返回获取锁成功。这样做主要是承诺对expire执行破产,或者服务器再开的场面下出现的沿无法自由的气象。
脚将准大神思路实现的代码贴出,实现了堵截机制的沿,也落实了一个免去它锁:

import com.eduapi.common.component.RedisComponent;
import com.eduapi.common.util.BeanUtils;
import org.apache.commons.lang3.StringUtils;

/**
 * @Description: 利用redis实现分布式锁.
 * @Author: ZhaoWeiNan .
 * @CreatedTime: 2017/3/20 .
 * @Version: 1.0 .
 */
public class RedisLock {

    private RedisComponent redisComponent;

    private static final int DEFAULT_ACQUIRY_RESOLUTION_MILLIS = 100;

    private String lockKey;

    /**
     * 锁超时时间,防止线程在入锁以后,无限的执行等待
     */
    private int expireMillisCond = 60 * 1000;

    /**
     * 锁等待时间,防止线程饥饿
     */
    private int timeoutMillisCond = 10 * 1000;

    private volatile boolean isLocked = false;

    public RedisLock(RedisComponent redisComponent, String lockKey) {
        this.redisComponent = redisComponent;
        this.lockKey = lockKey;
    }

    public RedisLock(RedisComponent redisComponent, String lockKey, int timeoutMillisCond) {
        this(redisComponent, lockKey);
        this.timeoutMillisCond = timeoutMillisCond;
    }

    public RedisLock(RedisComponent redisComponent, String lockKey, int timeoutMillisCond, int expireMillisCond) {
        this(redisComponent, lockKey, timeoutMillisCond);
        this.expireMillisCond = expireMillisCond;
    }

    public RedisLock(RedisComponent redisComponent, int expireMillisCond, String lockKey) {
        this(redisComponent, lockKey);
        this.expireMillisCond = expireMillisCond;
    }

    public String getLockKey() {
        return lockKey;
    }

    /**
     * 获得 lock. (把大神的思路粘过来了)
     * 实现思路: 主要是使用了redis 的setnx命令,缓存了锁.
     * reids缓存的key是锁的key,所有的共享, value是锁的到期时间(注意:这里把过期时间放在value了,没有时间上设置其超时时间)
     * 执行过程:
     * 1.通过setnx尝试设置某个key的值,成功(当前没有这个锁)则返回,成功获得锁
     * 2.锁已经存在则获取锁的到期时间,和当前时间比较,超时的话,则设置新的值
     *
     * @return true if lock is acquired, false acquire timeouted
     * @throws InterruptedException in case of thread interruption
     */
    public synchronized boolean lock() throws InterruptedException {
        int timeout = timeoutMillisCond;

        boolean flag = false;

        while (timeout > 0){
            //设置所得到期时间
            Long expires = System.currentTimeMillis() + expireMillisCond;
            String expiresStr = BeanUtils.convertObject2String(expires);

            //原来redis里面没有锁,获取锁成功
            if (this.redisComponent.setNx(lockKey,expiresStr) > 0){
                //设置锁的过期时间
                this.redisComponent.expire(lockKey,expireMillisCond);
                isLocked = true;
                return true;
            }

            flag = compareLock(expiresStr);

            if (flag){
                return flag;
            }

            timeout -= DEFAULT_ACQUIRY_RESOLUTION_MILLIS;

            /*
                延迟100 毫秒,  这里使用随机时间可能会好一点,可以防止饥饿进程的出现,即,当同时到达多个进程,
                只会有一个进程获得锁,其他的都用同样的频率进行尝试,后面有来了一些进程,也以同样的频率申请锁,这将可能导致前面来的锁得不到满足.
                使用随机的等待时间可以一定程度上保证公平性
             */
            Thread.sleep(DEFAULT_ACQUIRY_RESOLUTION_MILLIS);
        }
        return false;
    }

    /**
     * 排他锁。作用相当于 synchronized 同步快
     * @return
     * @throws InterruptedException
     */
    public synchronized boolean excludeLock() {

        //设置所得到期时间
        long expires = System.currentTimeMillis() + expireMillisCond;
        String expiresStr = BeanUtils.convertObject2String(expires);

        //原来redis里面没有锁,获取锁成功
        if (this.redisComponent.setNx(lockKey,expiresStr) > 0){
            //设置锁的过期时间
            this.redisComponent.expire(lockKey,expireMillisCond);
            isLocked = true;
            return true;
        }

        return compareLock(expiresStr);
    }

    /**
     * 比较是否可以获取锁
     * 锁超时时 获取
     * @param expiresStr
     * @return
     */
    private boolean compareLock(String expiresStr){
        //假如两个线程走到这里
        //因为redis是单线程的获取到
        // A线程获取  currentValueStr = 1 B线程获取 currentValueStr = 1
        String currentValueStr = this.redisComponent.get(lockKey);

        //锁
        if (StringUtils.isNotEmpty(currentValueStr) && Long.parseLong(currentValueStr) < System.currentTimeMillis()){

            //获取上一个锁到期时间,并设置现在的锁到期时间,
            //只有一个线程才能获取上一个线程的设置时间,因为jedis.getSet是同步的
            //只有A线程 把 2 存进去了。 取出了 1, 对比获得了锁
            //B线程 吧 2存进去了。 获取 2.。对比 没有获得锁,
            String oldValue = this.redisComponent.getSet(lockKey,expiresStr);

            if (StringUtils.isNotEmpty(oldValue) && StringUtils.equals(oldValue,currentValueStr)){

                //防止误删(覆盖,因为key是相同的)了他人的锁——这里达不到效果,这里值会被覆盖,但是因为什么相差了很少的时间,所以可以接受
                //[分布式的情况下]:如过这个时候,多个线程恰好都到了这里,但是只有一个线程的设置值和当前值相同,他才有权利获取锁
                isLocked = true;
                return true;
            }
        }
        return false;
    }

    /**
     * 释放锁
     */
    public synchronized void unlock(){
        if (isLocked){
            this.redisComponent.delete(lockKey);
            isLocked = false;
        }
    }

}

代码中,把大神写的思路站进去了,大神讲的还是很明白的,看看调用的代码:

        //创建锁对象, redisComponent 为redis组件的对象   过期时间  锁的key
        RedisLock redisLock = new RedisLock(redisComponent,1000 * 60,RedisCacheKey.REDIS_LOCK_KEY + now_mm);
        //获取锁
        if (redisLock.excludeLock()){
            try {
                //拿到了锁,读取定时短信有序集合
                set = this.redisComponent.zRangeByScore(RedisCacheKey.MSG_TIME_LIST,0,end);

                if (set != null && set.size() > 0){
                    flag = true;
                }
            } catch (Exception e) {
                LOGGER.error("获取定时短信有序集合异常,异常为{}",e.toString());
            }
...

以redis实现分布式锁之平等种思路就是为大家介绍及此处,欢迎大家来交流,指出文中部分说错的地方,让自己加深认识。
多谢大家!

澳门美高梅手机网站 1

集群模式

Zookeeper 不仅可以单机提供劳务,同时也支撑多机组成集群来提供劳动。实际上
Zookeeper 还支持另外一种伪集群的办法,也便是得在同等贵物理机上运行多只
Zookeeper 实例,下面用介绍集群模式的装以及安排。

Zookeeper
的集群模式之设置与布置为无是十分复杂,所而做的虽是搭几单布局起。集群模式除了上面的老三只布局起还要多下面几乎独布局起:

initLimit=5 
syncLimit=2 
server.1=192.168.211.1:2888:3888 
server.2=192.168.211.2:2888:3888

  • initLimit:这个布局起是为此来安排 Zookeeper
    接受客户端(这里所说的客户端不是用户连接 Zookeeper
    服务器的客户端,而是 Zookeeper 服务器集众多被连至 Leader 的 Follower
    服务器)初始化连接时最为丰富会经受多少只心跳时间间隔数。当曾超过 10
    只心跳的年月(也就算是 tickTime)长度后 Zookeeper
    服务器还从未接客户端的返信息,那么表明是客户端连接失败。总的时长就是
    5*2000=10 秒
  • syncLimit:这个布局起标识 Leader 与 Follower
    之间发送信息,请求与回答时间长短,最丰富不可知超过多少个 tickTime
    的工夫长度,总的日子长就是 2*2000=4 秒
  • server.A=B:C:D:其中 A 是一个数字,表示是是第几如泣如诉服务器;B
    是者服务器的 ip 地址;C 代表的是其一服务器和聚集众多被的 Leader
    服务器交换信息之端口;D 表示的凡一旦聚众多被的 Leader
    服务器挂了,需要一个端口来再进行选举,选出一个初的
    Leader,而这端口就是用来实行选举时服务器相互通信的端口。如果是伪集群的部署方式,由于
    B 都是同,所以不同之 Zookeeper
    实例通信端口号不能够同,所以要是叫她分配不同之捧口号。
  • 除此之外修改 zoo.cfg 配置文件,集群模式下还要配置一个文件
    myid,这个文件于 dataDir 目录下,这个文件之中就来一个数码就是是 A
    的价值,Zookeeper 启动时见面读取这个文件,拿到里头的数码以及 zoo.cfg
    里面的安排信息比从而判断究竟是很 server。

数据模型

Zookeeper
会维护一个存有层次关系之数据结构,它非常相近于一个标准的文件系统,如图 1
所示:

  • 澳门美高梅手机网站 2

Zookeeper 这种数据结构有如下这些特征:

  1. 每个子目录项如 NameService 都深受如当 znode,这个 znode
    是叫其所于的路径唯一标识,如 Server1 这个 znode 的标识为
    /NameService/Server1
  2. znode 可以有子节点目录,并且每个 znode 可以储存数据,注意 EPHEMERAL
    类型的目节点不克发出子节点目录
  3. znode 是发生本的,每个 znode
    中蕴藏的数量可以产生多单版,也就算是一个拜访路径中好储存多份数据
  4. znode 可以是临时节点,一旦创立是 znode
    的客户端与服务器失去联系,这个 znode 也以自动删除,Zookeeper
    的客户端以及服务器通信采用长连方式,每个客户端与服务器通过中心跳来保持连续,这个连续状态称为
    session,如果 znode 是现节点,这个 session 失效,znode 也便去了
  5. znode 的目录名好活动编号,如 App1
    已经有,再创的话,将会晤自动命名吧 App2
  6. znode
    可以叫监控,包括这目录节点受到贮存的数的改动,子节点目录的转变等,一旦变化可通知设置监控的客户端,这个是
    Zookeeper 的中坚特性,Zookeeper
    的多多效果都是基于这个特点实现的,后面在一流的动场景被见面来实例介绍

哪利用

Zookeeper
作为一个分布式的劳务框架,主要用来解决分布式集众多被使用系统的一致性问题,它能提供依据类似于文件系统的目节点树方式的数量存储,但是
Zookeeper
并无是故来特别储存数据的,它的来意主要是因此来保护与督查你存储的多寡的状态变化。通过监督这些多少状态的变化,从而得以达到基于数的集群管理.

通过C#代码用zookeeper

Zookeeper的下要是由此创设其Nuget
ZooKeeperNet包下的Zookeeper实例,并且调用其接口方法进行的,主要的操作就是针对性znode的增删改操作,监听znode的生成与处理。

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using ZooKeeperNet;

namespace ZookeeperDemo
{
    class Watcher : IWatcher
    {
        public void Process(WatchedEvent @event)
        {
            if (@event.Type == EventType.NodeDataChanged)
            {
                Console.WriteLine(@event.Path);
            }
        }
    }
}

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using ZooKeeperNet;

namespace ZookeeperDemo
{
    class Program
    {
        static void Main(string[] args)
        {

            //创建一个Zookeeper实例,第一个参数为目标服务器地址和端口,第二个参数为Session超时时间,第三个为节点变化时的回调方法 
            using (ZooKeeper zk = new ZooKeeper("127.0.0.1:2181", new TimeSpan(0, 0, 0, 50000), new Watcher()))
            {
                var stat = zk.Exists("/root",true);

                ////创建一个节点root,数据是mydata,不进行ACL权限控制,节点为永久性的(即客户端shutdown了也不会消失) 
                //zk.Create("/root", "mydata".GetBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.Persistent);

                //在root下面创建一个childone znode,数据为childone,不进行ACL权限控制,节点为永久性的 
                zk.Create("/root/childone", "childone".GetBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.Persistent);
                //取得/root节点下的子节点名称,返回List<String> 
                zk.GetChildren("/root", true);
                //取得/root/childone节点下的数据,返回byte[] 
                zk.GetData("/root/childone", true, null);

                //修改节点/root/childone下的数据,第三个参数为版本,如果是-1,那会无视被修改的数据版本,直接改掉
                zk.SetData("/root/childone", "childonemodify".GetBytes(), -1);
                //删除/root/childone这个节点,第二个参数为版本,-1的话直接删除,无视版本 
                zk.Delete("/root/childone", -1);
            }

        }
    }
}

 

浅析

创连接:

1.得服务主机列表

2.安过时间

3.登记客户端事件

4.坐线程安全之不二法门创造请求连接(启动客户端请求队列,循环队列基于socket通信、根据请求类型执行不一之请动作)

要流程:

布局请求头、构造request,reponse、构造响应头、构造Packet对象,packet对象准备好后,把所有对象放入一个outgoingQueue
packet被放大入outgoingQueue中,等待SendThread把packet对应的情发送给server。server处理分3步于doio方法中ReadLength
ReadConnectResult
ReadResponse,直到ReadResponse方法中确定packet请求了。

响应流程:

针对心跳的ping请求的resp,针对auth请求的resp,一般接口请求的resp,如果接口请求要求了watcher,当watcher关注的情有变动时之notification

吊相关部分API方法:

开创节点:create

demo:zk.Create(Dir, severname.GetBytes(), Ids.OPEN_ACL_UNSAFE,
CreateMode.Persistent);

其中CreateMode分为4类Persistent、PersistentSequential、Ephemeral、EphemeralSequential

PERSISTENT 创建持久化节点,对诺机器关闭连接后节点/数据不见面收敛

PERSISTENT_SEQUENTIAL
如果PATH是以’/’结尾则盖这个PATH作为父节点,创建一个子节点,其子节点名字是一个准先后顺序排列的数值;否则创建一个名字是‘/’后面字符加上先后顺序排列的数值字符串的节点,同样创持久节点

EPHEMERAL
创建瞬时节点,Zookeeper在感知连接机器宕机后会消除其创建的瞬间节点

EPHEMERAL_SEQUENTIAL
穿件瞬时相继节点,和PERSISTENT_SEQUENTIAL一样,区别在于其是瞬间之

除去节点 delete

demo :zk.Delete(Dir, -1);

前一个参数代表节点名称(一般作为路径),后一个是本子号 -1象征都匹配

翻节点 exists

demo : zk.Exists(Dir, new MyWatch2());

获取数据 getData

demo :zk.GetData(Dir, new MyWatch2(), stat);

获取一个节点的数量,可注入watcher 

设置数据 setData

demo : zk.SetData(Dir, new byte[1], 1);

得下级节点集合 GetChildren

demo :zk.GetChildren(Dir, true);

存储

znodes类似文件和目录。但她不是一个典型的文件系统,zookeeper数据保存在内存中,这意味zookeeper可以兑现大吞吐量和小顺延。

watcher

Zookeeper有少数种植watches,一种植是data watches,另一样种是child
watches。其中,getData()和exists()以及create()等会见补加data
watches,getChildren()会补加child
watches。而delete()涉及到去数据和子节点,会以触发data watches和child
watches。

 

示范代码下载:ZookeeperDemo.zip

连带文章:

ZooKeeper配置
(二)

zookeeper
安装配备(三)

Zookeeper .Net
Client

https://github.com/devhong/Zookeeper.Net

据悉ZooKeeper构建大规模部署体系II
http://xahxy.blog.hexun.com/83250722_d.html

分布式服务框架 Zookeeper —
管理分布式环境受到之数据
http://www.ibm.com/developerworks/cn/opensource/os-cn-zookeeper/

基于ZooKeeper的分布式Session实现
http://blog.csdn.net/jacktan/article/details/6112806

ZooKeeper介绍、分析、理解  http://mazd1002.blog.163.com/blog/static/6657496520111120101753463/

ZooKeeper实现分布式队列Queue http://blog.fens.me/zookeeper-queue/ 

李欣:ZooKeeper在携程的施用和前景
http://v.csdn.hudong.com/open/view/detail/83-SDCC2012-ctrip-ZooKeeper

Storm-源码分析-
Storm中Zookeeper的运用

Zookeeper
的念和使用

 

发表评论

电子邮件地址不会被公开。 必填项已用*标注