zookeeper安装澳门美高梅手机网站

单机情势

点击这里下载zookeeper的安装包之后,
解压到合适目录. 进入zookeeper目录下的conf子目录, 创立zoo.cfg:

Bash代码  澳门美高梅手机网站 1

  1. tickTime=2000    
  2. dataDir=/Users/apple/zookeeper/data    
  3. dataLogDir=/Users/apple/zookeeper/logs    
  4. clientPort=4180   

参数表明:

  • tickTime: zookeeper中应用的骨干时间单位, 飞秒值.
  • dataDir: 数据目录. 能够是随机目录.
  • dataLogDir: log目录, 同样能够是随便目录. 假如没有安装该参数,
    将使用和dataDir相同的设置.
  • clientPort: 监听client连接的端口号.

到现在, zookeeper的单机情势已经安插好了. 运行server只需运营脚本:

Bash代码  澳门美高梅手机网站 2

  1. bin/zkServer.sh start  

 Server运维之后, 就能够运营client连接server了, 执行脚本:

Bash代码  澳门美高梅手机网站 3

  1. bin/zkCli.sh -server localhost:4180  

 

从本文初始,不定期分析2个开源项目源代码,起篇从著名的zookeeper初步。
缘何是zk,因为用到zk的场景实在太多了,抢先四分之二熟稔的分布式系统都有zookeeper的影子,比如hbase,storm,dubbo,kafka等等,其它前面提到的QashqaiPC框架原理与落到实处也使用了zookeeper。

伪集群情势

所谓伪集群, 是指在单台机器中运维两个zookeeper进度, 并组成2个集群.
以运维二个zookeeper进度为例.

将zookeeper的目录拷贝2份:

Bash代码  澳门美高梅手机网站 4

  1. |–zookeeper0  
  2. |–zookeeper1  
  3. |–zookeeper2  

 更改zookeeper0/conf/zoo.cfg文件为:

Bash代码  澳门美高梅手机网站 5

  1. tickTime=2000    
  2. initLimit=5    
  3. syncLimit=2    
  4. dataDir=/Users/apple/zookeeper0/data    
  5. dataLogDir=/Users/apple/zookeeper0/logs    
  6. clientPort=4180  
  7. server.0=127.0.0.1:8880:7770    
  8. server.1=127.0.0.1:8881:7771    
  9. server.2=127.0.0.1:8882:7772  

增加产量了多少个参数, 其含义如下:

  • initLimit: zookeeper集群中的包括多台server, 当中一台为leader,
    集群中别的的server为follower. initLimit参数配置起先化连接时,
    follower和leader之间的最长心跳时间. 此时该参数设置为5,
    表达时间范围为5倍tickTime, 即5*2000=10000ms=10s.
  • syncLimit: 该参数配置leader和follower之间发送消息,
    请求和应对的最大日子长度. 此时该参数设置为2,
    表明时间限定为2倍tickTime, 即6000ms.
  • server.X=A:B:C 当中X是1个数字, 表示那是第几号server.
    A是该server所在的IP地址.
    B配置该server和集群中的leader调换音讯所采用的端口.
    C配置选举leader时所使用的端口. 由于配备的是伪集群格局,
    所以种种server的B, C参数必须差异.

参照zookeeper0/conf/zoo.cfg, 配置zookeeper1/conf/zoo.cfg,
和zookeeper2/conf/zoo.cfg文件. 只需更改dataDir, dataLogDir,
clientPort参数即可.

在事先安装的dataDir中新建myid文件, 写入一个数字,
该数字代表那是第几号server.
该数字必须和zoo.cfg文件中的server.X中的X一一对应.
/Users/apple/zookeeper0/data/myid文件中写入0,
/Users/apple/zookeeper1/data/myid文件中写入1,
/Users/apple/zookeeper2/data/myid文件中写入2.

分级进入/Users/apple/zookeeper0/bin, /Users/apple/zookeeper1/bin,
/Users/apple/zookeeper2/bin八个目录, 运营server.
专擅行选购取1个server目录, 运行客户端:

Bash代码  澳门美高梅手机网站 6

  1. bin/zkCli.sh -server localhost:4180  

[toc]

 

1 环境准备

第1,下载zk的新本子,最新的稳定版是3.4.10,由于已下载3.4.9.先一贯利用。

集群形式

集群情势的安顿和伪集群基本一致.
是因为集群形式下, 各server布置在差异的机器上,
由此各server的conf/zoo.cfg文件能够完全一样.
上边是三个演示:

Bash代码  澳门美高梅手机网站 7

  1. tickTime=2000    
  2. initLimit=5    
  3. syncLimit=2    
  4. dataDir=/home/zookeeper/data    
  5. dataLogDir=/home/zookeeper/logs    
  6. clientPort=4180  
  7. server.43=10.1.39.43:2888:3888  
  8. server.47=10.1.39.47:2888:3888    
  9. server.48=10.1.39.48:2888:3888  

演示中安顿了3台zookeeper server, 分别布署在10.1.39.43, 10.1.39.47,
10.1.39.48上. 必要留意的是,
各server的dataDir目录下的myid文件中的数字必须不相同.

10.1.39.43 server的myid为43, 10.1.39.47 server的myid为47, 10.1.39.48
server的myid为48.

1.1 导入代码

IDEA直接打开zk目录:
澳门美高梅手机网站 8

类型安装为jdk1.7
然后,将src/java上边包车型大巴main和generated设置为源码目录,同时将lib目录添加为liabary。

1.2 设置配置文件

在conf目录,新建zoo.cfg,拷贝sample.cfg即可

澳门美高梅手机网站 9

1.3 调节和测试配置

查看bin/zkServer

set ZOOMAIN=org.apache.zookeeper.server.quorum.QuorumPeerMain
....
endlocal

调用的是org.apache.zookeeper.server.quorum.QuorumPeerMain,因而QuorumPeerMain,配置调节和测试程序,arguments设置conf/zoo.cfg

澳门美高梅手机网站 10

那般,就足以心潮澎湃的Debug代码了-:)

2 运维分析

2.1 QuorumPeerMain

QuorumPeerMain的main里,调用initializeAndRun

    protected void initializeAndRun(String[] args)
        throws ConfigException, IOException
    {
        QuorumPeerConfig config = new QuorumPeerConfig();
        if (args.length == 1) {
            config.parse(args[0]);
        }

        // Start and schedule the the purge task 清理任务
        DatadirCleanupManager purgeMgr = new DatadirCleanupManager(config
                .getDataDir(), config.getDataLogDir(), config
                .getSnapRetainCount(), config.getPurgeInterval());
        purgeMgr.start();

        // 集群模式
        if (args.length == 1 && config.servers.size() > 0) {
            runFromConfig(config);
        } else {
            LOG.warn("Either no config or no quorum defined in config, running "
                    + " in standalone mode");
            // there is only server in the quorum -- run as standalone
            // 单机模式
            ZooKeeperServerMain.main(args);
        }
    }

首要实施了:

  • 加载解析配置文件到QuorumPeerConfig
  • 实施清理职分
  • 看清是集群方式依旧单机形式,大家的布局文件未配备server,所以是单机情势,执行
    ZooKeeperServerMain.main

正文重点解析单机方式下的zk,集群形式临时不解读

2.2 ZooKeeperServerMain

ZooKeeperServerMain.main调用initializeAndRun

“` java
protected void initializeAndRun(String[] args)
throws ConfigException, IOException
{
try {
ManagedUtil.registerLog4jMBeans();
} catch (JMException e) {
LOG.warn(“Unable to register log4j JMX control”, e);
}

    ServerConfig config = new ServerConfig();
    if (args.length == 1) {
        config.parse(args[0]);
    } else {
        config.parse(args);
    }

    runFromConfig(config);
}```

读取配置,然后runFromConfig:

 public void runFromConfig(ServerConfig config) throws IOException {
        LOG.info("Starting server");
        FileTxnSnapLog txnLog = null;
        try {
            // Note that this thread isn't going to be doing anything else,
            // so rather than spawning another thread, we will just call
            // run() in this thread.
            // create a file logger url from the command line args
            final ZooKeeperServer zkServer = new ZooKeeperServer();
            // Registers shutdown handler which will be used to know the
            // server error or shutdown state changes.
            final CountDownLatch shutdownLatch = new CountDownLatch(1);
            zkServer.registerServerShutdownHandler(
                    new ZooKeeperServerShutdownHandler(shutdownLatch));

            // 快照
            txnLog = new FileTxnSnapLog(new File(config.dataLogDir), new File(
                    config.dataDir));
            zkServer.setTxnLogFactory(txnLog);
            zkServer.setTickTime(config.tickTime);
            zkServer.setMinSessionTimeout(config.minSessionTimeout);
            zkServer.setMaxSessionTimeout(config.maxSessionTimeout);
            // socket工厂
            cnxnFactory = ServerCnxnFactory.createFactory();
            cnxnFactory.configure(config.getClientPortAddress(),
                    config.getMaxClientCnxns());
            cnxnFactory.startup(zkServer);

            // Watch status of ZooKeeper server. It will do a graceful shutdown
            // if the server is not running or hits an internal error.
            shutdownLatch.await();
            shutdown();

            cnxnFactory.join();
            if (zkServer.canShutdown()) {
                zkServer.shutdown();
            }
        } catch (InterruptedException e) {
            // warn, but generally this is ok
            LOG.warn("Server interrupted", e);
        } finally {
            if (txnLog != null) {
                txnLog.close();
            }
        }
    }

几件工作:

  • 开创zkServer,对ZooKeeperServer设置某个配置参数,如tickTime、minSessionTimeout、maxSession提姆eout
  • 制造CountDownLatch,注释里写了,用来watch
    zk的意况,当zk关闭恐怕出现在这之中错误的时候优雅的倒闭服务
  • 基于计划参数dataLogDir和dataDir创制FileTxnSnapLog,用来储存zk数据和日志快速照相
  • 创制cnxnFactory,zk的
    socket工厂,负责处理网络请求,zk里有netty和NIO三种实现
  • cnxnFactory.startup(zkServer),启动zk服务器

2.3 ServerCnxnFactory

cnxnFactory负责zk的互连网请求,createFactory中,从系统安排中读取ZOOKEEPE卡宴_SERVER_CNXN_FACTOHighlanderY,默许是从未有过这一个布局的,由此暗许是使用NIOServerCnxnFactory,基于java的NIO完成,

    static public ServerCnxnFactory createFactory() throws IOException {
        String serverCnxnFactoryName =
            System.getProperty(ZOOKEEPER_SERVER_CNXN_FACTORY);
        if (serverCnxnFactoryName == null) {
            serverCnxnFactoryName = NIOServerCnxnFactory.class.getName();
        }
        try {
            return (ServerCnxnFactory) Class.forName(serverCnxnFactoryName)
                                                .newInstance();
        } catch (Exception e) {
            IOException ioe = new IOException("Couldn't instantiate "
                    + serverCnxnFactoryName);
            ioe.initCause(e);
            throw ioe;
        }
    }

当然,大家能够很简单察觉:
澳门美高梅手机网站 11

ServerCnxnFactory还有个NettyServerCnxnFactory完毕,基于Netty完成NIO。ServerCnxnFactory里实际负责什么,前面再来看。

2.4 ZooKeeperServer

最近,主演登场,大家来看ZooKeeperServer内部有怎么着玄妙。
澳门美高梅手机网站 12

ZooKeeperServer是单机方式应用的类,在集群方式下利用的是它的子类。
咱俩先来看ZooKeeperServer包涵哪些内容:

    public static final int DEFAULT_TICK_TIME = 3000;
    protected int tickTime = DEFAULT_TICK_TIME;
    /** value of -1 indicates unset, use default */
    protected int minSessionTimeout = -1;
    /** value of -1 indicates unset, use default */
    protected int maxSessionTimeout = -1;
    protected SessionTracker sessionTracker; //创建和管理session
    private FileTxnSnapLog txnLogFactory = null; //文件快照
    private ZKDatabase zkDb; // ZooKeeper树形数据的模型
    private final AtomicLong hzxid = new AtomicLong(0); //原子增长Long,用于分配事务编号
    public final static Exception ok = new Exception("No prob");
    protected RequestProcessor firstProcessor; // ZooKeeperServer请求处理器链中的第一个处理器
    protected volatile State state = State.INITIAL;

    protected enum State {
        INITIAL, RUNNING, SHUTDOWN, ERROR;
    }

    /**
     * This is the secret that we use to generate passwords, for the moment it
     * is more of a sanity check.
     */
    static final private long superSecret = 0XB3415C00L;

    private final AtomicInteger requestsInProcess = new AtomicInteger(0);
    final List<ChangeRecord> outstandingChanges = new ArrayList<ChangeRecord>();
    // this data structure must be accessed under the outstandingChanges lock
    final HashMap<String, ChangeRecord> outstandingChangesForPath =
        new HashMap<String, ChangeRecord>();

    private ServerCnxnFactory serverCnxnFactory; //ServerSocket工厂,接受客户端的socket连接

    private final ServerStats serverStats; //server的运行状态统计
    private final ZooKeeperServerListener listener; // ZK运行状态监听
    private ZooKeeperServerShutdownHandler zkShutdownHandler;

2.5 服务运维

前边有点跑偏,继续回归运维进度:

            cnxnFactory = ServerCnxnFactory.createFactory();
            cnxnFactory.configure(config.getClientPortAddress(),
                    config.getMaxClientCnxns());
            cnxnFactory.startup(zkServer);

2.5.1 配置cnxnFactory

进入configure:

    @Override
    public void configure(InetSocketAddress addr, int maxcc) throws IOException {
        configureSaslLogin();

        // ZK网络请求主线程
        thread = new ZooKeeperThread(this, "NIOServerCxn.Factory:" + addr);
        thread.setDaemon(true);

        maxClientCnxns = maxcc;
        this.ss = ServerSocketChannel.open();
        ss.socket().setReuseAddress(true);
        LOG.info("binding to port " + addr);
        ss.socket().bind(addr);
        ss.configureBlocking(false);
        ss.register(selector, SelectionKey.OP_ACCEPT);
    }

几件业务:

  • configureSaslLogin,具体不细看,应该是拍卖鉴权
  • 初阶化ZooKeeperThread,那些ZooKeeperThread的作用是负担处理未处理分外:

    public class ZooKeeperThread extends Thread {

    private static final Logger LOG = LoggerFactory
            .getLogger(ZooKeeperThread.class);
    
    private UncaughtExceptionHandler uncaughtExceptionalHandler = new UncaughtExceptionHandler() {
    
        @Override
        public void uncaughtException(Thread t, Throwable e) {
            handleException(t.getName(), e);
        }
    };
    
    public ZooKeeperThread(Runnable thread, String threadName) {
        super(thread, threadName);
        setUncaughtExceptionHandler(uncaughtExceptionalHandler);
    }
    
    protected void handleException(String thName, Throwable e) {
        LOG.warn("Exception occured from thread {}", thName, e);
    }
    

    }

  • 开发银行ServerSocketChannel,并绑定配置的addr,并且注册selector(能够搜索NIO驾驭细节)

2.5.2 启动cnxnFactory

持续分析,进入cnxnFactory.startup(zkServer)

    @Override
    public void startup(ZooKeeperServer zks) throws IOException,
            InterruptedException {
        start();
        setZooKeeperServer(zks);
        zks.startdata();
        zks.startup();
    }

率先,start,判断线程状态,要是未运维则运转线程,注意只会运转1次。

    @Override
    public void start() {
        // ensure thread is started once and only once
        if (thread.getState() == Thread.State.NEW) {
            thread.start();
        }
    }
socket处理线程

开发银行后,就会举行cnxnFactory.run

    public void run() {
        while (!ss.socket().isClosed()) {
            try {
                selector.select(1000);
                Set<SelectionKey> selected;
                synchronized (this) {
                    selected = selector.selectedKeys();
                }
                ArrayList<SelectionKey> selectedList = new ArrayList<SelectionKey>(
                        selected);
                Collections.shuffle(selectedList);
                for (SelectionKey k : selectedList) {
                    if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) {
                        SocketChannel sc = ((ServerSocketChannel) k
                                .channel()).accept();
                        InetAddress ia = sc.socket().getInetAddress();
                        int cnxncount = getClientCnxnCount(ia);
                        if (maxClientCnxns > 0 && cnxncount >= maxClientCnxns){
                            LOG.warn("Too many connections from " + ia
                                     + " - max is " + maxClientCnxns );
                            sc.close();
                        } else {
                            LOG.info("Accepted socket connection from "
                                     + sc.socket().getRemoteSocketAddress());
                            sc.configureBlocking(false);
                            SelectionKey sk = sc.register(selector,
                                    SelectionKey.OP_READ);
                            NIOServerCnxn cnxn = createConnection(sc, sk);
                            sk.attach(cnxn);
                            addCnxn(cnxn);
                        }
                    } else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
                        NIOServerCnxn c = (NIOServerCnxn) k.attachment();
                        c.doIO(k);
                    } else {
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("Unexpected ops in select "
                                      + k.readyOps());
                        }
                    }
                }
                selected.clear();
            } catch (RuntimeException e) {
                LOG.warn("Ignoring unexpected runtime exception", e);
            } catch (Exception e) {
                LOG.warn("Ignoring exception", e);
            }
        }
        closeAll();
        LOG.info("NIOServerCnxn factory exited run method");
    }

此间一定于一个独门线程来拍卖互连网连接,通过selector.select(一千)来取得互连网请求,一旦有连日就绪,则先河拍卖:

  • 率先打乱 Collections.shuffle(selectedList);
  • for循环处理
    • 如果SelectionKey.OP_ACCEPT,代表3个新连接请求,创设SocketChannel,成立NIOServerCnxn,然后addCnxn
    • 若果可读写,则 NIOServerCnxn.doIO(k),执行IO操作
socket互联网请求处理

此地大约解析下doIO,摘录部分代码:

void doIO(SelectionKey k) throws InterruptedException {
        try {
            if (isSocketOpen() == false) {
                LOG.warn("trying to do i/o on a null socket for session:0x"
                         + Long.toHexString(sessionId));

                return;
            }
            if (k.isReadable()) {
                // 读取4个字节
                int rc = sock.read(incomingBuffer);
                if (rc < 0) {
                    throw new EndOfStreamException(
                            "Unable to read additional data from client sessionid 0x"
                            + Long.toHexString(sessionId)
                            + ", likely client has closed socket");
                }
                // 读满了
                if (incomingBuffer.remaining() == 0) {
                    boolean isPayload;
                    if (incomingBuffer == lenBuffer) { // start of next request
                        incomingBuffer.flip(); // 复位
                        isPayload = readLength(k); // 读取载荷长度
                        incomingBuffer.clear();
                    } else {
                        // continuation
                        isPayload = true;
                    }
                    if (isPayload) { // not the case for 4letterword
                        readPayload();
                    }
                    else {
                        // four letter words take care
                        // need not do anything else
                        return;
                    }
                }
            }

读取6个字节,获取到数码长度,然后读取载荷,也正是伸手

    private void readPayload() throws IOException, InterruptedException {
        if (incomingBuffer.remaining() != 0) { // have we read length bytes?
            int rc = sock.read(incomingBuffer); // sock is non-blocking, so ok
            if (rc < 0) {
                throw new EndOfStreamException(
                        "Unable to read additional data from client sessionid 0x"
                        + Long.toHexString(sessionId)
                        + ", likely client has closed socket");
            }
        }

        if (incomingBuffer.remaining() == 0) { // have we read length bytes?
            packetReceived();
            incomingBuffer.flip(); // 复位
            if (!initialized) {
                readConnectRequest(); // 读取连接请求
            } else {
                readRequest();
            }
            lenBuffer.clear();
            incomingBuffer = lenBuffer;
        }
    }

第2读取数据,然后再读取请求,那里关注readConnectRequest

读取连接请求
    private void readConnectRequest() throws IOException, InterruptedException {
        if (zkServer == null) {
            throw new IOException("ZooKeeperServer not running");
        }
        zkServer.processConnectRequest(this, incomingBuffer);
        initialized = true;
    }

继承,下边是处理连接请求:

     public void processConnectRequest(ServerCnxn cnxn, ByteBuffer incomingBuffer) throws IOException {
        BinaryInputArchive bia = BinaryInputArchive.getArchive(new ByteBufferInputStream(incomingBuffer));
        ConnectRequest connReq = new ConnectRequest();
        connReq.deserialize(bia, "connect"); // 反序列化请求
        ....
        // 客户端设置的超时时间
        int sessionTimeout = connReq.getTimeOut();
        byte passwd[] = connReq.getPasswd();
        int minSessionTimeout = getMinSessionTimeout();
        if (sessionTimeout < minSessionTimeout) {
            sessionTimeout = minSessionTimeout;
        }
        // 服务端设置的最大超时时间
        int maxSessionTimeout = getMaxSessionTimeout();
        if (sessionTimeout > maxSessionTimeout) {
            sessionTimeout = maxSessionTimeout;
        }
        cnxn.setSessionTimeout(sessionTimeout);
        // We don't want to receive any packets until we are sure that the
        // session is setup
        cnxn.disableRecv();
        // 请求是否带上sessionid
        long sessionId = connReq.getSessionId();
        if (sessionId != 0) {
            // 请求带了sessionid
            long clientSessionId = connReq.getSessionId();
            LOG.info("Client attempting to renew session 0x"
                    + Long.toHexString(clientSessionId)
                    + " at " + cnxn.getRemoteSocketAddress());
            // 关闭请求
            serverCnxnFactory.closeSession(sessionId);
            cnxn.setSessionId(sessionId);
            // 重新打开请求
            reopenSession(cnxn, sessionId, passwd, sessionTimeout);
        } else {
            LOG.info("Client attempting to establish new session at "
                    + cnxn.getRemoteSocketAddress());
            // 创建新sesssion
            createSession(cnxn, passwd, sessionTimeout);
        }
    }

以上完毕:

  • 将读取出来的incomingBuffer反类别化为ConnectRequest对象
  • 下一场设置超时时间,ServerCnxn接收到该申请后,依照客户端传递过来的sessionTimeout时间以及ZooKeeperServer本身的minSessionTimeout、maxSessionTimeout参数,鲜明最后的sessionTimeout时间
  • 认清客户端的央求是不是早已蕴涵sessionId
    • 假使带有,则执行sessionId的是还是不是过期、密码是不是正确等检查
    • 假诺没有sessionId,则创建三个session
创建session

从而,大家要求再看一下怎么成立session:

    long createSession(ServerCnxn cnxn, byte passwd[], int timeout) {
        long sessionId = sessionTracker.createSession(timeout);
        Random r = new Random(sessionId ^ superSecret);
        r.nextBytes(passwd);
        ByteBuffer to = ByteBuffer.allocate(4);
        to.putInt(timeout);
        cnxn.setSessionId(sessionId);
        submitRequest(cnxn, sessionId, OpCode.createSession, 0, to, null);
        return sessionId;
    }
  • 应用sessionTracker生成一个sessionId
  • submitRequest营造3个Request请求,请求的门类为OpCode.createSession

    private void submitRequest(ServerCnxn cnxn, long sessionId, int type,
            int xid, ByteBuffer bb, List<Id> authInfo) {
        Request si = new Request(cnxn, sessionId, xid, type, bb, authInfo);
        submitRequest(si);
    }
    
    public void submitRequest(Request si) {
        if (firstProcessor == null) {
            synchronized (this) {
                try {
                    // Since all requests are passed to the request
                    // processor it should wait for setting up the request
                    // processor chain. The state will be updated to RUNNING
                    // after the setup.
                    while (state == State.INITIAL) {
                        wait(1000);
                    }
                } catch (InterruptedException e) {
                    LOG.warn("Unexpected interruption", e);
                }
                if (firstProcessor == null || state != State.RUNNING) {
                    throw new RuntimeException("Not started");
                }
            }
        }
        try {
            touch(si.cnxn);
            boolean validpacket = Request.isValid(si.type);
            if (validpacket) {
                firstProcessor.processRequest(si);
                if (si.cnxn != null) {
                    incInProcess();
                }
            } else {
                LOG.warn("Received packet at server of unknown type " + si.type);
                new UnimplementedRequestProcessor().processRequest(si);
            }
        } catch (MissingSessionException e) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Dropping request: " + e.getMessage());
            }
        } catch (RequestProcessorException e) {
            LOG.error("Unable to process request:" + e.getMessage(), e);
        }
    }
    

下边包车型大巴代码:

  • 创办1个Request
  • 等候firstProcessor创造达成,然后调用firstProcessor.processRequest

firstProcessor是何等东东,上面再公布

2.5.3 zk服务器运转

双重归来startup, setZooKeeperServer(zks),代码相当的粗略

 final public void setZooKeeperServer(ZooKeeperServer zk) {
        this.zkServer = zk;
        if (zk != null) {
            zk.setServerCnxnFactory(this);
        }
    }

接下来是zk服务器的startdata:

    public void startdata() 
    throws IOException, InterruptedException {
        //check to see if zkDb is not null
        if (zkDb == null) {
            zkDb = new ZKDatabase(this.txnLogFactory);
        }  
        if (!zkDb.isInitialized()) {
            loadData();
        }
    }

初叶化ZKDatabase,从txnLogFactory里读取快照数据。

末段是zk服务器的startup:

    public synchronized void startup() {
        if (sessionTracker == null) {
            createSessionTracker();
        }
        startSessionTracker();
        setupRequestProcessors();

        registerJMX();

        setState(State.RUNNING);
        notifyAll();
    }

几件事情:

  • createSessionTracker创建sessionTracker
  • startSessionTracker启动SessionTracker
  • setupRequestProcessors 创造请求处理器链
  • registerJMX 注册JMX
  • setState(State.RUNNING) 设置情形为运营中
SessionTracker

看SessionTracker的注释:

This is the basic interface that ZooKeeperServer uses to track
sessions.
承担追踪Session的

在zk里的贯彻是SessionTrackerImpl:

    protected void createSessionTracker() {
        sessionTracker = new SessionTrackerImpl(this, zkDb.getSessionWithTimeOuts(),
                tickTime, 1, getZooKeeperServerListener());
    }

    protected void startSessionTracker() {
        ((SessionTrackerImpl)sessionTracker).start();
    }

SessionTrackerImpl后边再详细分析。

2.5.4 ZooKeeperServer请求处理器链介绍

那里是zk的骨干部分之一,zk接收到的乞请最后在那边开始展览处理。

 protected void setupRequestProcessors() {
        RequestProcessor finalProcessor = new FinalRequestProcessor(this);
        RequestProcessor syncProcessor = new SyncRequestProcessor(this,
                finalProcessor);
        ((SyncRequestProcessor)syncProcessor).start();
        firstProcessor = new PrepRequestProcessor(this, syncProcessor);
        ((PrepRequestProcessor)firstProcessor).start();
    }

恳请处理链介绍

  • 首先是PrepRequestProcessor
  • 然后是SyncRequestProcessor
  • 最后是finalProcessor

下边依次解读:

RequestProcessor

RequestProcessors are chained together to process transactions.
RequestProcessors都以链在一道的事务处理链

public interface RequestProcessor {
    @SuppressWarnings("serial")
    public static class RequestProcessorException extends Exception {
        public RequestProcessorException(String msg, Throwable t) {
            super(msg, t);
        }
    }

    void processRequest(Request request) throws RequestProcessorException;

    void shutdown();
}

含有上边那些达成:
澳门美高梅手机网站 13
大家首要来看上面多少个:

PrepRequestProcessor

干什么成为请求处理链,看下PrepRequestProcessor代码就驾驭了:

    RequestProcessor nextProcessor;

    ZooKeeperServer zks;

    public PrepRequestProcessor(ZooKeeperServer zks,
            RequestProcessor nextProcessor) {
        super("ProcessThread(sid:" + zks.getServerId() + " cport:"
                + zks.getClientPort() + "):", zks.getZooKeeperServerListener());
        this.nextProcessor = nextProcessor;
        this.zks = zks;
    }

    protected void pRequest(Request request) throws RequestProcessorException {
        ……
        nextProcessor.processRequest(request);
    }

构造函数里富含nextProcessor,在pRequest完结后,执行nextProcessor.processRequest,相当于链式执行。

紧接着分析,再来看类的概念:

public class PrepRequestProcessor extends ZooKeeperCriticalThread implements
            RequestProcessor {

        LinkedBlockingQueue<Request> submittedRequests = new LinkedBlockingQueue<Request>();

        RequestProcessor nextProcessor;

}

多少个宗旨

  • 继承自ZooKeeperCriticalThread,是一个Thread
  • 驷不如舌性质submittedRequests
    是一个LinkedBlockingQueue,LinkedBlockingQueue完毕是线程安全的,达成了先进先出天性,是用作劳动者消费者的首要选取。

PrepRequestProcessor作为处理链的源头,对外提供processRequest方法收集请求,由于是单线程,所以要求将请求放入submittedRequests队列。

    public void processRequest(Request request) {
        // request.addRQRec(">prep="+zks.outstandingChanges.size());
        submittedRequests.add(request);
    }

放入队列后,PrepRequestProcessor本身正是2个Thread,所以start后执行run,在run方法中又会将用户提交的呼吁取出来实行处理:

    public void run() {
            while (true) {
                // 取出一个请求
                Request request = submittedRequests.take();
                if (Request.requestOfDeath == request) {
                    break;
                }
                // 处理请求
                pRequest(request);
            }
        }

再来看pRequest:
澳门美高梅手机网站 14

基于request的type,构造对应的请求,对于增删改等影响多少状态的操作都被认为是事情(txn:transaction)
,供给创立出事情请求头(hdr),调用pRequest2Txn,别的操作则不属于工作操作,需要验证下sessionId是还是不是合法。

 //create/close session don't require request record
            case OpCode.createSession:
            case OpCode.closeSession:
                pRequest2Txn(request.type, zks.getNextZxid(), request, null, true);
                break;

            //All the rest don't need to create a Txn - just verify session
            case OpCode.sync:
            case OpCode.exists:
            case OpCode.getData:
            case OpCode.getACL:
            case OpCode.getChildren:
            case OpCode.getChildren2:
            case OpCode.ping:
            case OpCode.setWatches:
                zks.sessionTracker.checkSession(request.sessionId,
                        request.getOwner());
                break;

来看pRequest2Txn,以create为例

  pRequest2Txn(request.type, zks.getNextZxid(), request, createRequest, true);

   protected void pRequest2Txn(int type, long zxid, Request request, Record record, boolean deserialize)
        throws KeeperException, IOException, RequestProcessorException
    {
        request.hdr = new TxnHeader(request.sessionId, request.cxid, zxid,
                                    zks.getTime(), type);

        switch (type) {
            case OpCode.create:                
                zks.sessionTracker.checkSession(request.sessionId, request.getOwner());
                CreateRequest createRequest = (CreateRequest)record;   
                if(deserialize)
                    ByteBufferInputStream.byteBuffer2Record(request.request, createRequest);
                String path = createRequest.getPath();
                int lastSlash = path.lastIndexOf('/');
                if (lastSlash == -1 || path.indexOf('\0') != -1 || failCreate) {
                    LOG.info("Invalid path " + path + " with session 0x" +
                            Long.toHexString(request.sessionId));
                    throw new KeeperException.BadArgumentsException(path);
                }
                List<ACL> listACL = removeDuplicates(createRequest.getAcl());
                if (!fixupACL(request.authInfo, listACL)) {
                    throw new KeeperException.InvalidACLException(path);
                }
                String parentPath = path.substring(0, lastSlash);
                ChangeRecord parentRecord = getRecordForPath(parentPath);

                checkACL(zks, parentRecord.acl, ZooDefs.Perms.CREATE,
                        request.authInfo);
                int parentCVersion = parentRecord.stat.getCversion();
                CreateMode createMode =
                    CreateMode.fromFlag(createRequest.getFlags());
                if (createMode.isSequential()) {
                    path = path + String.format(Locale.ENGLISH, "%010d", parentCVersion);
                }
                validatePath(path, request.sessionId);
                try {
                    if (getRecordForPath(path) != null) {
                        throw new KeeperException.NodeExistsException(path);
                    }
                } catch (KeeperException.NoNodeException e) {
                    // ignore this one
                }
                boolean ephemeralParent = parentRecord.stat.getEphemeralOwner() != 0;
                if (ephemeralParent) {
                    throw new KeeperException.NoChildrenForEphemeralsException(path);
                }
                int newCversion = parentRecord.stat.getCversion()+1;
                request.txn = new CreateTxn(path, createRequest.getData(),
                        listACL,
                        createMode.isEphemeral(), newCversion);
                StatPersisted s = new StatPersisted();
                if (createMode.isEphemeral()) {
                    s.setEphemeralOwner(request.sessionId);
                }
                parentRecord = parentRecord.duplicate(request.hdr.getZxid());
                parentRecord.childCount++;
                parentRecord.stat.setCversion(newCversion);
                addChangeRecord(parentRecord);
                addChangeRecord(new ChangeRecord(request.hdr.getZxid(), path, s,
                        0, listACL));
                break;
  • 第①是 zks.getNextZxid()创设3个事务id,AtomicLong
    hzxid是自增加id,开头化为0,每一次加一
  • 在pRequest2Txn内部,先给request创造多个TxnHeader,这几个header包括事务id
  • 接下来判断请求类型
  • zks.sessionTracker.checkSession(request.sessionId,
    request.getOwner()) 检查session
  • 反连串化为CreateRequest
SyncRequestProcessor
FinalRequestProcessor

未完待续


作者:Jadepeng
出处:jqpeng的技艺记事本–http://www.cnblogs.com/xiaoqi
你的协理是对博主最大的鼓励,感激您的认真读书。
本文版权归作者全部,欢迎转发,但未经小编同意必须保留此段评释,且在小说页面分明地点给出原作连接,不然保留追究法律责任的义务。

发表评论

电子邮件地址不会被公开。 必填项已用*标注