澳门美高梅手机网站读书笔记TF061:分布式TensorFlow,分布式原理、最佳实践

分布式TensorFlow由高质量gRPC库底层技术协理。马丁 Abadi、Ashish
Agarwal、Paul Barham随想《TensorFlow:Large-Scale Machine Learning on
Heterogeneous Distributed Systems》。

趁着互连网+时代的到来,京东金融业务规模不断增加,业务场景也不断创新。不过,业务转移之快当先想像,相应的
SOA
及微服务架构日趋深切,服务多少不断膨胀,线上环境日益复杂,服务看重关系每一天都在变化。

分布式原理。分布式集群
由多个服务器进度、客户端进度组成。布置情势,单机多卡、分布式(多机多卡)。多机多卡TensorFlow分布式。

如何实时看清系统的容量水位,为容量评估和系统扩容提供客观按照?

单机多卡,单台服务器多块GPU。训练进程:在单机单GPU陶冶,数据一个批次(batch)一个批次锻练。单机多GPU,三遍拍卖多个批次数据,每个GPU处理一个批次数据统计。变量参数保存在CPU,数据由CPU分发给多少个GPU,GPU统计每个批次更新梯度。CPU收集完几个GPU更新梯度,总括平均梯度,更新参数。继续总括更新梯度。处理速度取决最慢GPU速度。

当故障暴发时,怎么样准确判断影响范围?

分布式,锻练在七个工作节点(worker)。工作节点,完结统计单元。总结服务器单卡,指服务器。统计服务器多卡,四个GPU划分四个干活节点。数据量大,当先一台机械处理能力,须用分布式。

怎么着规定每三回交易进度中,每个系统处理耗时个别是不怎么?

分布式TensorFlow底层通讯,gRPC(google remote procedure
call)。gRPC,谷歌(谷歌)开源高质量、跨语言RPC框架。RPC共商,远程进度调用协议,互连网从远程计算机程度请求服务。

每个系统在处理一笔交易时,分别在数据库、NoSQL、缓存、日志、RPC、业务逻辑上耗时多少?

分布式陈设格局。分布式运行,多少个总括单元(工作节点),后端服务器陈设单工作节点、多做事节点。

如何快速确定系统的着实瓶颈点?

单工作节点计划。每台服务器运行一个工作节点,服务器三个GPU,一个做事节点可以访问多块GPU卡。代码tf.device()指定运行操作设备。优势,单机多GPU间通讯,作用高。逆风局,手动代码指定设备。

面对上述难点,本文将从智能容量评估与智能告警切入,为大家大快朵颐京东金融的运维实践。

多干活儿节点安顿。一台服务器运行四个干活节点。

智能容量评

设置CUDA_VISIBLE_DEVICES环境变量,限制各样工作节点只可见一个GPU,启动进度添加环境变量。用tf.device()指定特定GPU。多干活节点布署优势,代码简单,提升GPU使用率。逆风局,工作节点通讯,需布置七个干活节点。https://github.com/tobegit3hub/tensorflow\_examples/tree/master/distributed\_tensorflow

动用的容量评估是一个老大难难点,近年来也从没一种简易而卓有成效的形式,首若是通过压测手段直接得到应用单机最高
QPS 的连锁数据。

CUDA_VISIBLE_DEVICES=” python ./distributed_supervisor.py
–ps_hosts=127.0.0.1:2222,127.0.0.1:2223
–worker_hosts=127.0.0.1:2224,127.0.0.1:2225 –job_name=ps
–task_index=0
CUDA_VISIBLE_DEVICES=” python ./distributed_supervisor.py
–ps_hosts=127.0.0.1:2222,127.0.0.1:2223
–worker_hosts=127.0.0.1:2224,127.0.0.1:2225 –job_name=ps
–task_index=1
CUDA_VISIBLE_DEVICES=’0′ python ./distributed_supervisor.py
–ps_hosts=127.0.0.1:2222,127.0.0.1:2223
–worker_hosts=127.0.0.1:2224,127.0.0.1:2225 –job_name=worker
–task_index=0
CUDA_VISIBLE_DEVICES=’1′ python ./distributed_supervisor.py
–ps_hosts=127.0.0.1:2222,127.0.0.1:2223
–worker_hosts=127.0.0.1:2224,127.0.0.1:2225 –job_name=worker
–task_index=1

线下压测

分布式架构。https://www.tensorflow.org/extend/architecture
。客户端(client)、服务端(server),服务端包含主节点(master)、工作节点(worker)组成。

为了测试数据的对峙真实性,在容量评估的线下压测中貌似经过 tcpcopy
等工具,将线上的流量直接复制到测试服务器,在测试服务器出现瓶颈时得到利用最高的
QPS,再通过线上线下的折算周到推算出线上的选取能承载的容量。

客户端、主节点、工作节点关系。TensorFlow,客户端会话联系主节点,实际工作由工作节点落到实处,每个工作节点占一台设备(TensorFlow具体测算硬件抽象,CPU或GPU)。单机格局,客户端、主节点、工作节点在同一台服务器。分布形式,可不一致服务器。客户端->主节点->工作节点/job:worker/task:0->/job:ps/task:0。
客户端。建立TensorFlow总计图,建立与集群交互会话层。代码包括Session()。一个客户端可同时与四个服务端相连,一具服务端也可与多少个客户端相连。
服务端。运行tf.train.Server实例进程,TensroFlow执行任务集群(cluster)一局地。有主节点服务(Master
service)和劳作节点服务(Worker
service)。运行中,一个主节点进度和数个工作节点进度,主节点进度和做事接点进程经过接口通信。单机多卡和分布式结构同样,只须求转移通信接口达成切换。
主节点服务。完成tensorflow::Session接口。通过RPC服务程序连接工作节点,与做事节点服务进度工作职责通讯。TensorFlow服务端,task_index为0作业(job)。
做事节点服务。完成worker_service.proto接口,本地设备统计部分图。TensorFlow服务端,所有工作节点包罗工作节点服务逻辑。每个工作节点负责管理一个或三个设备。工作节点能够是本地不一致端口差别进度,或多台服务三个经过。运行TensorFlow分布式执行职务集,一个或多个作业(job)。每个作业,一个或多少个一样目标任务(task)。每个任务,一个干活进度执行。作业是职务集合,集群是学业集合。
分布式机器学习框架,作业分参数作业(parameter job)和劳作节点作业(worker
job)。参数作业运行服务器为参数服务器(parameter
server,PS),管理参数存储、更新。工作节点作业,管理无状态主要从事计算任务。模型越大,参数越来越多,模型参数更新超越一台机械品质,须求把参数分开到不一样机器存储更新。参数服务,多台机械组成集群,类似分布式存储架构,涉及多少同步、一致性,参数存储为键值对(key-value)。分布式键值内存数据库,加参数更新操作。李沐《Parameter
Server for Distributed Machine
Learning》http://www.cs.cmu.edu/~muli/file/ps.pdf
参数存储更新在参数作业进行,模型测算在做事节点作业进展。TensorFlow分布式落成作业间数据传输,参数作业到办事节点作业前向传来,工作节点作业到参数作业反向传播。
义务。特定TensorFlow服务器独立进度,在学业中负有对应序号。一个职责对应一个工作节点。集群->作业->任务->工作节点。

注:本图片转自tcpcopy官网

客户端、主节点、工作节点交互进度。单机多卡交互,客户端->会话运行->主节点->执行子图->工作节点->GPU0、GPU1。分布式交互,客户端->会话运行->主节点进度->执行子图1->工作节点进度1->GPU0、GPU1。《TensorFlow:Large-Scale
Machine Learning on Heterogeneous distributed
Systems》https://arxiv.org/abs/1603.04467v1

线上压测

分布式模式。

经过线下压测的措施展开容量评估的独到之处是压测进程对线上的条件大致向来不影响,可是经过比较麻烦,耗时也较长。所以以短平快为重大特征的网络公司更热衷通过线上的压测来进展容量评估。

数量交互。https://www.tensorflow.org/tutorials/deep\_cnn
。CPU负责梯度平均、参数更新,不一致GPU练习模型副本(model
replica)。基于陶冶样例子集训练,模型有独立性。
手续:差距GPU分别定义模型互联网布局。单个GPU从数据管道读取差距数据块,前向传来,总括损失,计算当前变量梯度。所有GPU输出梯度数据转移到CPU,梯度求平均操作,模型变量更新。重复,直到模型变量收敛。
数据交互,进步SGD效用。SGD
mini-batch样本,切成多份,模型复制多份,在八个模型上同时统计。三个模型测算速度不均等,CPU更新变量有共同、异步多个方案。

什么举办线上的压测?

一起更新、异步更新。分布式随机梯度下跌法,模型参数分布式存储在差距参数服务上,工作节点并行锻炼多少,和参数服务器通讯获取模型参数。
同台随机梯度下跌法(Sync-SGD,同步更新、同步练习),操练时,每个节点上干活义务读入共享参数,执行并行梯度总括,同步需要等待所有工作节点把一些梯度处好,将享有共享参数合并、累加,再几回性更新到模型参数,下一批次,所有工作节点用模子更新后参数练习。优势,每个操练批次考虑所有工作节点磨练情部,损失下跌稳定。逆风局,品质瓶颈在最慢工作节点。异楹设备,工作节点质量不一样,逆风局分明。
异步随机梯度下跌法(Async-SGD,异步更新、异步锻练),每个工作节点任务独立统计局部梯度,异步更新到模型参数,不需实践协调、等待操作。优势,质量不存在瓶颈。逆风局,每个工作节点总结梯度值发磅回参数服务器有参数更新争持,影响算法收剑速度,损失下跌进程抖动较大。
一块更新、异步更新完结不一致于立异参数服务器参数策略。数据量小,各节点计算能力较均匀,用一道模型。数据量大,各机器总括质量长短不一,用异步方式。
带备份的Sync-SGD(Sync-SDG with backup)。Jianmin Chen、Xinghao Pan、Rajat
Monga、Aamy Bengio、Rafal Jozefowicz论文《Revisiting Distributed
Synchronous SGD》https://arxiv.org/abs/1604.00981澳门美高梅手机网站,
。扩充工作节点,解决一部分工作节点总括慢难题。工作节点总数n+n*5%,n为集群工作节点数。异步更新设定接受到n个工作节点参数直接更新参数服务器模型参数,进入下一批次模型练习。总计较慢节点训练参数直接丢掉。
一起更新、异步更新有图内情势(in-graph pattern)和图间情势(between-graph
pattern),独立于图内(in-graph)、图间(between-graph)概念。
图内复制(in-grasph
replication),所有操作(operation)在同一个图中,用一个客户端来生成图,把拥有操作分配到集群拥有参数服务器和办事节点上。国内复制和单机多卡类似,增加到多机多卡,数据分发仍旧在客户端一个节点上。优势,总括节点只需求调用join()函数等待职责,客户端随时提交数据就可以陶冶。逆风局,训练多少分发在一个节点上,要分发给分化工作节点,严重影响并发磨练速度。
图间复制(between-graph
replication),每一个行事节点创设一个图,操练参数保存在参数服务器,数据不散发,各类工作节点独立计算,总计完毕把要更新参数告诉参数服务器,参数服务器更新参数。优势,不需求多少分发,各种工作节点都成立图和读取数据操练。逆风局,工作节点既是图创制者又是计量职责执行者,某个工作节点宕机影响集群工作。大数额相关深度学习推荐应用图间情势。

相似的话,不管是经过汇总的负载设备(如 F5、Radware
等)或是四七层的软负载(LVS、Nginx、HAProxy 等)亦可能开源的劳动框架(如
Spring Cloud、DUBBO 等)都辅助加权轮询算法(Weighted Round
罗布in)。一言以蔽之就是在负载轮询的时候,分裂的服务器可以指定区其余权重。

模型并行。切分模型,模型分化部分举办在差距装备上,一个批次样书可以在差别装备同时施行。TensorFlow尽量让附近统计在一如既往台装备上完结节省互联网支付。MartinAbadi、Ashish Agarwal、Paul Barham杂谈《TensorFlow:Large-Scale Machine
Learning on Heterogeneous Distributed
Systems》https://arxiv.org/abs/1603.04467v1

线上压测的规律就是逐渐加大某一台服务器的权重,使那台服务器的流量远大于别的服务器,直至该服务器出现品质瓶颈。这几个瓶颈可能是CPU、LOAD、内存、带宽等物理瓶颈,也恐怕是
RT、失利率、QPS 波动等软瓶颈。

模型并行、数据交互,TensorFlow中,总计可以分别,参数可以分离。能够在各类设备上分红总计节点,让对应参数也在该装置上,总计参数放一块。

当单机品质出现质量瓶颈时,工程师记下此时的行使 QPS
就是单机容量,然后依照集群服务器数量很不难取得集群的容量。

分布式API。https://www.tensorflow.org/deploy/distributed
创制集群,每个职责(task)启动一个劳动(工作节点服务或主节点服务)。义务能够分布差距机器,可以等效台机器开动三个义务,用不一致GPU运行。每个职分完结工作:创立一个tf.train.ClusterSpec,对集群拥有任务展开描述,描述内容对富有义务一样。创设一个tf.train.Server,创设一个劳动,运行相应作业统计义务。
TensorFlow分布式开发API。tf.train.ClusterSpec({“ps”:ps_hosts,”worker”:worke_hosts})。创建TensorFlow集群描述新闻,ps、worker为作业名称,ps_phsts、worker_hosts为作业义务所在节点地址音讯。tf.train.ClusterSpec传入参数,作业和任务间关系映射,映射关系职务通过IP地址、端口号表示。

正如 Nginx 的部署,使得劳动器 192.168.0.2 的流量是其余服务器的 5
倍,假若此时服务器 192.168.0.2 出现瓶颈,QPS 为 1000,那么集群容量为
3000。(即使负载没有瓶颈)

结构
tf.train.ClusterSpec({“local”:[“localhost:2222″,”localhost:2223”]})
可用职责 /job:local/task:0、/job:local/task:1。
结构
tf.train.ClusterSpec({“worker”:[“worker0.example.com:2222″,”worker1.example.com:2222″,”worker2.example.com:2222″],”ps”:[“ps0.example.com:2222″,”ps1.example.com:2222”]})
可用职分 /job:worker/task:0、 /job:worker/task:1、 /job:worker/task:2、
/job:ps/task:0、 /job:ps/task:1
tf.train.Server(cluster,job_name,task_index)。创设服务(主节点服务或办事节点服务),运行作业总括义务,运行义务在task_index指定机器开动。

http {    upstream cluster {    server 192.168.0.2 weight= 5;    server 192.168.0.3 weight= 1;    server 192.168.0.4 weight= 1;       }    }

#任务0
cluster =
tr.train.ClusterSpec({“local”:[“localhost:2222″,”localhost:2223”]})
server = tr.train.Server(cluster,job_name=”local”,task_index=0)
#任务1
cluster =
tr.train.ClusterSpec({“local”:[“localhost:2222″,”localhost:2223”]})
server = tr.train.Server(cluster,job_name=”local”,task_index=1)。
自动化管理节点、监控节点工具。集群管理工具Kubernetes。
tf.device(device_name_or_function)。设定指定设备举办张量运算,批定代码运行CPU、GPU。

容量总计

#指定在task0所在机器执行Tensor操作运算
with tf.device(“/job:ps/task:0”):
weights_1 = tf.Variable(…)
biases_1 = tf.Variable(…)

无论是是线上依然线下的压测,反映的都是压测时的行使容量。在互连网快捷发展的明天,程序版本迭代的进程惊人,针对每一趟版本的迭代、环境的生爱丁堡进展一回线上的压测是不具体的,也是不享有可操作性的。

分布式磨炼代码框架。成立TensorFlow服务器集群,在该集群分布式计算数据流图。https://github.com/tensorflow/tensorflow/blob/master/tensorflow/docs\_src/deploy/distributed.md

这就是说换一种思路去思辨,咱们通过压测去评估应用的容量其实是因为我们无能为力领会具体的一个主意的耗时究竟在哪个地方?也就是说被压测的对象对我们是一个黑盒子,假如大家想办法打开了那几个黑盒子,理论上大家就有主意总括应用的容量,而且可以形成实时的采纳容量评估。

 

故此,殷切须求寻求其余一种缓解难点的思绪:QPS
的瓶颈到底是怎么着?若是弄明白了这一个难点,应用的 QPS 就足以因此测算得到。

import argparse
import sys
import tensorflow as tf
FLAGS = None
def main(_):
  # 第1步:命令行参数解析,获取集群信息ps_hosts、worker_hosts
  # 当前节点角色信息job_name、task_index
  ps_hosts = FLAGS.ps_hosts.split(",")
  worker_hosts = FLAGS.worker_hosts.split(",")
  # 第2步:创建当前任务节点服务器
  # Create a cluster from the parameter server and worker hosts.
  cluster = tf.train.ClusterSpec({"ps": ps_hosts, "worker": worker_hosts})
  # Create and start a server for the local task.
  server = tf.train.Server(cluster,
                           job_name=FLAGS.job_name,
                           task_index=FLAGS.task_index)
  # 第3步:如果当前节点是参数服务器,调用server.join()无休止等待;如果是工作节点,执行第4步
  if FLAGS.job_name == "ps":
    server.join()
  # 第4步:构建要训练模型,构建计算图
  elif FLAGS.job_name == "worker":
    # Assigns ops to the local worker by default.
    with tf.device(tf.train.replica_device_setter(
        worker_device="/job:worker/task:%d" % FLAGS.task_index,
        cluster=cluster)):
      # Build model...
      loss = ...
      global_step = tf.contrib.framework.get_or_create_global_step()
      train_op = tf.train.AdagradOptimizer(0.01).minimize(
          loss, global_step=global_step)
    # The StopAtStepHook handles stopping after running given steps.
    # 第5步管理模型训练过程
    hooks=[tf.train.StopAtStepHook(last_step=1000000)]
    # The MonitoredTrainingSession takes care of session initialization,
    # restoring from a checkpoint, saving to a checkpoint, and closing when done
    # or an error occurs.
    with tf.train.MonitoredTrainingSession(master=server.target,
                                           is_chief=(FLAGS.task_index == 0),
                                           checkpoint_dir="/tmp/train_logs",
                                           hooks=hooks) as mon_sess:
      while not mon_sess.should_stop():
        # Run a training step asynchronously.
        # See `tf.train.SyncReplicasOptimizer` for additional details on how to
        # perform *synchronous* training.
        # mon_sess.run handles AbortedError in case of preempted PS.
        # 训练模型
        mon_sess.run(train_op)
if __name__ == "__main__":
  parser = argparse.ArgumentParser()
  parser.register("type", "bool", lambda v: v.lower() == "true")
  # Flags for defining the tf.train.ClusterSpec
  parser.add_argument(
      "--ps_hosts",
      type=str,
      default="",
      help="Comma-separated list of hostname:port pairs"
  )
  parser.add_argument(
      "--worker_hosts",
      type=str,
      default="",
      help="Comma-separated list of hostname:port pairs"
  )
  parser.add_argument(
      "--job_name",
      type=str,
      default="",
      help="One of 'ps', 'worker'"
  )
  # Flags for defining the tf.train.Server
  parser.add_argument(
      "--task_index",
      type=int,
      default=0,
      help="Index of task within the job"
  )
  FLAGS, unparsed = parser.parse_known_args()
  tf.app.run(main=main, argv=[sys.argv[0]] + unparsed)

再组成下图的耗时明细和选择所处的运转环境,大家就可以找到实际的瓶颈点。

 

举一个简易的例证:

分布式最佳实践。https://github.com/tensorflow/tensorflow/blob/master/tensorflow/tools/dist\_test/python/mnist\_replica.py

借使一个方法在必然采样时间内,平均 QPS 为 200,平均耗时为
100ms,耗时明细分析发现平均访问数据库 6 次,每一趟耗时
10ms,也就是数据库总耗时 60ms,其余均为作业逻辑耗时
40ms。怎么样确定应用的容量呢?

MNIST数据集分布式训练。开设3个端口作分布式工作节点陈设,2222端口参数服务器,2223端口工作节点0,2224端口工作节点1。参数服务器执行参数更新职务,工作节点0、工作节点1执行图模型训练总计职分。参数服务器/job:ps/task:0
cocalhost:2222,工作节点/job:worker/task:0
cocalhost:2223,工作节点/job:worker/task:1 cocalhost:2224。
运作代码。

一经数据库连接池的最罗安达接数为 30,执行此格局的线程池最大为
50(简单起见暂时不考虑线程的切换费用),那么理论上数据库的单机最高 QPS 为
30*1000/60=500。

python mnist_replica.py --job_name="ps" --task_index=0
python mnist_replica.py --job_name="worker" --task_index=0
python mnist_replica.py --job_name="worker" --task_index=1

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import math
import sys
import tempfile
import time
import tensorflow as tf
from tensorflow.examples.tutorials.mnist import input_data
# 定义常量,用于创建数据流图
flags = tf.app.flags
flags.DEFINE_string("data_dir", "/tmp/mnist-data",
                    "Directory for storing mnist data")
# 只下载数据,不做其他操作
flags.DEFINE_boolean("download_only", False,
                     "Only perform downloading of data; Do not proceed to "
                     "session preparation, model definition or training")
# task_index从0开始。0代表用来初始化变量的第一个任务
flags.DEFINE_integer("task_index", None,
                     "Worker task index, should be >= 0. task_index=0 is "
                     "the master worker task the performs the variable "
                     "initialization ")
# 每台机器GPU个数,机器没有GPU为0
flags.DEFINE_integer("num_gpus", 1,
                     "Total number of gpus for each machine."
                     "If you don't use GPU, please set it to '0'")
# 同步训练模型下,设置收集工作节点数量。默认工作节点总数
flags.DEFINE_integer("replicas_to_aggregate", None,
                     "Number of replicas to aggregate before parameter update"
                     "is applied (For sync_replicas mode only; default: "
                     "num_workers)")
flags.DEFINE_integer("hidden_units", 100,
                     "Number of units in the hidden layer of the NN")
# 训练次数
flags.DEFINE_integer("train_steps", 200,
                     "Number of (global) training steps to perform")
flags.DEFINE_integer("batch_size", 100, "Training batch size")
flags.DEFINE_float("learning_rate", 0.01, "Learning rate")
# 使用同步训练、异步训练
flags.DEFINE_boolean("sync_replicas", False,
                     "Use the sync_replicas (synchronized replicas) mode, "
                     "wherein the parameter updates from workers are aggregated "
                     "before applied to avoid stale gradients")
# 如果服务器已经存在,采用gRPC协议通信;如果不存在,采用进程间通信
flags.DEFINE_boolean(
    "existing_servers", False, "Whether servers already exists. If True, "
    "will use the worker hosts via their GRPC URLs (one client process "
    "per worker host). Otherwise, will create an in-process TensorFlow "
    "server.")
# 参数服务器主机
flags.DEFINE_string("ps_hosts","localhost:2222",
                    "Comma-separated list of hostname:port pairs")
# 工作节点主机
flags.DEFINE_string("worker_hosts", "localhost:2223,localhost:2224",
                    "Comma-separated list of hostname:port pairs")
# 本作业是工作节点还是参数服务器
flags.DEFINE_string("job_name", None,"job name: worker or ps")
FLAGS = flags.FLAGS
IMAGE_PIXELS = 28
def main(unused_argv):
  mnist = input_data.read_data_sets(FLAGS.data_dir, one_hot=True)
  if FLAGS.download_only:
    sys.exit(0)
  if FLAGS.job_name is None or FLAGS.job_name == "":
    raise ValueError("Must specify an explicit `job_name`")
  if FLAGS.task_index is None or FLAGS.task_index =="":
    raise ValueError("Must specify an explicit `task_index`")
  print("job name = %s" % FLAGS.job_name)
  print("task index = %d" % FLAGS.task_index)
  #Construct the cluster and start the server
  # 读取集群描述信息
  ps_spec = FLAGS.ps_hosts.split(",")
  worker_spec = FLAGS.worker_hosts.split(",")
  # Get the number of workers.
  num_workers = len(worker_spec)
  # 创建TensorFlow集群描述对象
  cluster = tf.train.ClusterSpec({
      "ps": ps_spec,
      "worker": worker_spec})
  # 为本地执行任务创建TensorFlow Server对象。
  if not FLAGS.existing_servers:
    # Not using existing servers. Create an in-process server.
    # 创建本地Sever对象,从tf.train.Server这个定义开始,每个节点开始不同
    # 根据执行的命令的参数(作业名字)不同,决定这个任务是哪个任务
    # 如果作业名字是ps,进程就加入这里,作为参数更新的服务,等待其他工作节点给它提交参数更新的数据
    # 如果作业名字是worker,就执行后面的计算任务
    server = tf.train.Server(
        cluster, job_name=FLAGS.job_name, task_index=FLAGS.task_index)
    # 如果是参数服务器,直接启动即可。这里,进程就会阻塞在这里
    # 下面的tf.train.replica_device_setter代码会将参数批定给ps_server保管
    if FLAGS.job_name == "ps":
      server.join()
  # 处理工作节点
  # 找出worker的主节点,即task_index为0的点
  is_chief = (FLAGS.task_index == 0)
  # 如果使用gpu
  if FLAGS.num_gpus > 0:
    # Avoid gpu allocation conflict: now allocate task_num -> #gpu
    # for each worker in the corresponding machine
    gpu = (FLAGS.task_index % FLAGS.num_gpus)
    # 分配worker到指定gpu上运行
    worker_device = "/job:worker/task:%d/gpu:%d" % (FLAGS.task_index, gpu)
  # 如果使用cpu
  elif FLAGS.num_gpus == 0:
    # Just allocate the CPU to worker server
    # 把cpu分配给worker
    cpu = 0
    worker_device = "/job:worker/task:%d/cpu:%d" % (FLAGS.task_index, cpu)
  # The device setter will automatically place Variables ops on separate
  # parameter servers (ps). The non-Variable ops will be placed on the workers.
  # The ps use CPU and workers use corresponding GPU
  # 用tf.train.replica_device_setter将涉及变量操作分配到参数服务器上,使用CPU。将涉及非变量操作分配到工作节点上,使用上一步worker_device值。
  # 在这个with语句之下定义的参数,会自动分配到参数服务器上去定义。如果有多个参数服务器,就轮流循环分配
  with tf.device(
      tf.train.replica_device_setter(
          worker_device=worker_device,
          ps_device="/job:ps/cpu:0",
          cluster=cluster)):

    # 定义全局步长,默认值为0
    global_step = tf.Variable(0, name="global_step", trainable=False)
    # Variables of the hidden layer
    # 定义隐藏层参数变量,这里是全连接神经网络隐藏层
    hid_w = tf.Variable(
        tf.truncated_normal(
            [IMAGE_PIXELS * IMAGE_PIXELS, FLAGS.hidden_units],
            stddev=1.0 / IMAGE_PIXELS),
        name="hid_w")
    hid_b = tf.Variable(tf.zeros([FLAGS.hidden_units]), name="hid_b")
    # Variables of the softmax layer
    # 定义Softmax 回归层参数变量
    sm_w = tf.Variable(
        tf.truncated_normal(
            [FLAGS.hidden_units, 10],
            stddev=1.0 / math.sqrt(FLAGS.hidden_units)),
        name="sm_w")
    sm_b = tf.Variable(tf.zeros([10]), name="sm_b")
    # Ops: located on the worker specified with FLAGS.task_index
    # 定义模型输入数据变量
    x = tf.placeholder(tf.float32, [None, IMAGE_PIXELS * IMAGE_PIXELS])
    y_ = tf.placeholder(tf.float32, [None, 10])
    # 构建隐藏层
    hid_lin = tf.nn.xw_plus_b(x, hid_w, hid_b)
    hid = tf.nn.relu(hid_lin)
    # 构建损失函数和优化器
    y = tf.nn.softmax(tf.nn.xw_plus_b(hid, sm_w, sm_b))
    cross_entropy = -tf.reduce_sum(y_ * tf.log(tf.clip_by_value(y, 1e-10, 1.0)))
    # 异步训练模式:自己计算完成梯度就去更新参数,不同副本之间不会去协调进度
    opt = tf.train.AdamOptimizer(FLAGS.learning_rate)
    # 同步训练模式
    if FLAGS.sync_replicas:
      if FLAGS.replicas_to_aggregate is None:
        replicas_to_aggregate = num_workers
      else:
        replicas_to_aggregate = FLAGS.replicas_to_aggregate
      # 使用SyncReplicasOptimizer作优化器,并且是在图间复制情况下
      # 在图内复制情况下将所有梯度平均
      opt = tf.train.SyncReplicasOptimizer(
          opt,
          replicas_to_aggregate=replicas_to_aggregate,
          total_num_replicas=num_workers,
          name="mnist_sync_replicas")
    train_step = opt.minimize(cross_entropy, global_step=global_step)
    if FLAGS.sync_replicas:
      local_init_op = opt.local_step_init_op
      if is_chief:
        # 所有进行计算工作节点里一个主工作节点(chief)
        # 主节点负责初始化参数、模型保存、概要保存
        local_init_op = opt.chief_init_op
      ready_for_local_init_op = opt.ready_for_local_init_op
      # Initial token and chief queue runners required by the sync_replicas mode
      # 同步训练模式所需初始令牌、主队列
      chief_queue_runner = opt.get_chief_queue_runner()
      sync_init_op = opt.get_init_tokens_op()
    init_op = tf.global_variables_initializer()
    train_dir = tempfile.mkdtemp()
    if FLAGS.sync_replicas:
      # 创建一个监管程序,用于统计训练模型过程中的信息
      # lodger 是保存和加载模型路径
      # 启动就会去这个logdir目录看是否有检查点文件,有的话就自动加载
      # 没有就用init_op指定初始化参数
      # 主工作节点(chief)负责模型参数初始化工作
      # 过程中,其他工作节点等待主节眯完成初始化工作,初始化完成后,一起开始训练数据
      # global_step值是所有计算节点共享的
      # 在执行损失函数最小值时自动加1,通过global_step知道所有计算节点一共计算多少步
      sv = tf.train.Supervisor(
          is_chief=is_chief,
          logdir=train_dir,
          init_op=init_op,
          local_init_op=local_init_op,
          ready_for_local_init_op=ready_for_local_init_op,
          recovery_wait_secs=1,
          global_step=global_step)
    else:
      sv = tf.train.Supervisor(
          is_chief=is_chief,
          logdir=train_dir,
          init_op=init_op,
          recovery_wait_secs=1,
          global_step=global_step)
    # 创建会话,设置属性allow_soft_placement为True
    # 所有操作默认使用被指定设置,如GPU
    # 如果该操作函数没有GPU实现,自动使用CPU设备
    sess_config = tf.ConfigProto(
        allow_soft_placement=True,
        log_device_placement=False,
        device_filters=["/job:ps", "/job:worker/task:%d" % FLAGS.task_index])
    # The chief worker (task_index==0) session will prepare the session,
    # while the remaining workers will wait for the preparation to complete.
    # 主工作节点(chief),task_index为0节点初始化会话
    # 其余工作节点等待会话被初始化后进行计算
    if is_chief:
      print("Worker %d: Initializing session..." % FLAGS.task_index)
    else:
      print("Worker %d: Waiting for session to be initialized..." %
            FLAGS.task_index)
    if FLAGS.existing_servers:
      server_grpc_url = "grpc://" + worker_spec[FLAGS.task_index]
      print("Using existing server at: %s" % server_grpc_url)
      # 创建TensorFlow会话对象,用于执行TensorFlow图计算
      # prepare_or_wait_for_session需要参数初始化完成且主节点准备好后,才开始训练
      sess = sv.prepare_or_wait_for_session(server_grpc_url,
                                            config=sess_config)
    else:
      sess = sv.prepare_or_wait_for_session(server.target, config=sess_config)
    print("Worker %d: Session initialization complete." % FLAGS.task_index)
    if FLAGS.sync_replicas and is_chief:
      # Chief worker will start the chief queue runner and call the init op.
      sess.run(sync_init_op)
      sv.start_queue_runners(sess, [chief_queue_runner])
    # Perform training
    # 执行分布式模型训练
    time_begin = time.time()
    print("Training begins @ %f" % time_begin)
    local_step = 0
    while True:
      # Training feed
      # 读入MNIST训练数据,默认每批次100张图片
      batch_xs, batch_ys = mnist.train.next_batch(FLAGS.batch_size)
      train_feed = {x: batch_xs, y_: batch_ys}
      _, step = sess.run([train_step, global_step], feed_dict=train_feed)
      local_step += 1
      now = time.time()
      print("%f: Worker %d: training step %d done (global step: %d)" %
            (now, FLAGS.task_index, local_step, step))
      if step >= FLAGS.train_steps:
        break
    time_end = time.time()
    print("Training ends @ %f" % time_end)
    training_time = time_end - time_begin
    print("Training elapsed time: %f s" % training_time)
    # Validation feed
    # 读入MNIST验证数据,计算验证的交叉熵
    val_feed = {x: mnist.validation.images, y_: mnist.validation.labels}
    val_xent = sess.run(cross_entropy, feed_dict=val_feed)
    print("After %d training step(s), validation cross entropy = %g" %
          (FLAGS.train_steps, val_xent))
if __name__ == "__main__":
  tf.app.run()

同理业务逻辑的单机最高 QPS 为
50*1000/40=1250,显明那个格局的瓶颈点在数据库上,也就是以此方法的单机最高
QPS 为 500。

 

接下来,针对那么些措施开展优化,数据库每一回访问的耗时降到了
5ms,平均访问次数变成了 4 次,也就是数据库总耗时为
20ms,业务逻辑耗时仍然是 40ms,此时数据库的单机最高 QPS 为
30*1000/20=1500。分明此时的瓶颈点在业务逻辑上,也就是其一艺术的单机最高
QPS 为 1250。

参考资料:

上例为一个艺术的单机最高 QPS
猜想,结合其余情势做同理分析,根据统计出这么些艺术在总体应用中对资源的占用比例就足以推算出整个应用的单机最高
QPS。

《TensorFlow技术解析与实战》

越来越分析,业务逻辑耗时也就是总耗时去除了 IO 的耗时(如 RPC
远程调用、访问数据库、读写磁盘耗时等等),业务逻辑耗时首要分为两大一部分:

欢迎推荐东京(Tokyo)机械学习工作机遇,我的微信:qingxingfengzi

线程运行耗时(RUNNABLE)

线程等待耗时(BLOCKED、WAITING、TIMED_WAITING)

由此对事情逻辑耗时的分类得知,真正消耗 CPU
资源的是线程运行耗时,那么难题就改成了我们怎么得到运行时刻与等待时间的耗时比例了。

CPU 使用率(进度、线程)可以经过 proc
虚拟文件系统得到,此处不是本文重点,不展开研究。分歧环境仍可以经过不相同的特性神速得到那一个多少。以
Java 应用为例,我们得以从 JMX
中得到线程执行的总计情况,大概推算出上述的比例,如下图所示:

接轨分析下边的例子,如若大家通过分析线程的运转状态获知,运行时刻与等待时间为
1:1,此时经过 CPU 的使用率为 20%,那么 CPU 目标能支撑的单机最高 QPS 为
200 * 100% / 20% = 1000,也就是以此法子的单机最高 QPS 为
1000。同理可以估摸互连网带宽等物理资源的瓶颈点。

一般的话,业务逻辑耗时中,对于计算密集型的使用,CPU
计算耗时的比重比较大,而 IO 密集型的应用反之。

经过上述的数据,大家就可以实时评估系统的容量,如下图:

应用实时水位图

智能告警

来源告警分析是基于互联网拓扑,结合调用链,通过时间相关性、权重、机器学习等算法,将报警举行分拣筛选,快捷找到告警根源的一种艺术。它能从大批量的报警中找到难点的源于,因而大大收缩了故障排查及回复时间。

报警处理步骤

报警过滤(将报警中不重大的告警以及重新告警过滤掉)

变更派生告警(根源关联关系转移各个派生告警)

报警关联(同一个时光窗内,不相同品种派生告警是或不是留存涉嫌)

权重总结(根据预先安装的各样报警的权重,计算改为根源告警的可能)

变化根源告警(将权重最大的派生告警标记为来自告警)

根源告警合并(若多类告警计算出的来自告警相同,则将其联合)

根据历史告警处理知识库,找到类似根源告警的处理方案,智能地交给解决方案。

源于告警原理图

举例来说来说:

设若多少个系统通过 RPC 进行劳动调用,调用关系如下:D 系统->C 系统->
B 系统-> A 系统。

当 A 系统查询数据库出现查询超时后,告警会层层往前推进,导致 B、C、D
系统均有 N 个超时告警发生。此时,ROOT
分析可以将报警举行消解,直接解析出根源告警为 A 系统访问数据库分外,导致
A、B、C、D 几个种类极度。

这般,就防止了处理人士和种种系统开发人员交流,支持处理人士很快定位难题根源、进步了平分解决岁月(MTTR)。如下图所示:

起点告警调用链关系

来源告警明细表

根源告警分析紧要分为强关联分析与机具学习两类。

a.强关联数据解析

强涉嫌指的是已知确定的涉嫌关系。如:

行使之间的调用链关系

数据库与应用服务器

网络设施与互连网设施、互联网设施与应用服务器

宿主机与虚拟机关系等

若在同一个日子窗内,有多少个强涉嫌的设施或应用服务器同时报警,则大约率认为告警之间存在关联关系。

在权重算法中,有一个要害的规则,链路上设有连续的报警可能存在关联,越靠后的使用越可能是来源于。现在大家依据例子,分别计算各个根源告警。

接轨应用方面的例证,D 应用->C 应用->B 应用->A
应用->数据库分外的情事。

首先是总计数据库根源告警。依据数据库关联关系,会派生数据库类型的数据库告警、A
应用告警。还会派生一条利用类型的 A 应用数据库卓殊报警。

基于数据库派生告警以及数据库与应用的涉嫌关系及权重,可以得出数据库卓殊导致
A 应用查询超时。

接下去是总括应用根源告警。依照调用关系,大家先总计出一而再五个应用告警的链路。当前
D->C->B->A 三个使用都有派生告警,满意此规则。

接下来,找到最靠后的告警应用,也就是 A 应用。列举时间窗口内具备 A
应用的派生告警(可能存在各个派生告警,依据权重统计根源),将权重最高的派生告警标记为来源告警。

譬如说:A 系统里面有 2 种类型派生告警,分别是数据库告警、GC 告警。

基于权重总结规则,数据库告警为 90,GC 告警
10,也就是说数据库十分报警权重最高。那时由于数据库根源告警和调用链根源告警一致,会将三种档次的报警合并。最终得出结论:数据库万分导致
A、B、C、D 系统报警。

b.机器学习根源分析

强涉嫌数据解析是对已知告警的关系关系,直接举行来源告警分析。然而多少时候,关联关系是未知的。这时就需要通过机器学习算法,找到告警之间的包含联系,再开展来源告警预测。

当下,主要开展了两类机器学习实践。

1、关联规则算法

涉嫌规则算法主要开展了 Apriori 算法和 FPGrowth
两类算法的执行。那两类作用相似,都可以发现反复项集。经过实测,FPGrowth
比 Apriori 更高速一些。

大家按一定的日子距离划分时间窗,总括每个时间窗内,种种告警一起现身的功能,找出各样报警之间的涉及。末了可按分析出的涉嫌关系,生成根源告警

事关规则算法的独到之处在于领会和落到实处起来相比较简单。缺点是功效比较低,灵活度也不够高。

2、神经网络算法

循环神经互连网(简称
RNN)是一个和时间种类有关联的神经互联网,对单张图片而言,像素新闻是不变的,而对于一段话而言,里面的词的组合是有程序的,而且经常意况下,后续的词和眼前的词有种种关联。

这时,卷积神经网络平常很难处理那种时序关联音信,而 RNN
却能立见效能地举办拍卖。

随着时光间隔的叠加,RNN
对于背后时间的节点相比较前边时间节点的感知力将骤降。解决那个题材必要用到
LongShort Term 互连网(简称 LSTM),它通过刻意的规划来防止短时间依赖难题。LSTM
在实践中默认能够记住长时间的音讯,而不要求提交很大代价。

对于某类故障引起的大度报警之间,存在着日子相关性。将历史派生告警作为输入,未来自告警类型作为出口。通过
LSTM
提取派生告警特征,建立告警相关性分析模型。那样就可以实时将符合特征的派生告警,划分到平等类根源告警中,匡助用户疾速定位难题。

亟需验证的是经济本身的作业特点支配了对第三方存在依靠,由此告警本身的随机性较大,客观上导致学习样本的质量不高,须求漫长的积聚和核查才能达到比较好的意义,由此对此来自告警,即使有原则取到强涉嫌关系,提出接纳强涉嫌分析,能达标一语双关的机能。

结语

智能运维是当下运维领域被炒得最火的词汇之一,然则个人认为没有一个智能运维的制品是放之四海而皆准,智能运维需求在真实的环境中不止的磨合,才能已毕大家预料的作用。

随着人工智能在运维领域的缕缕尝试与探索,以后在运维领域中的分外检测与智能报警及自动化容量规划与分配一定取得迅捷的前进,从而成为运维的中坚竞争力。

初稿来自:技巧之家

发表评论

电子邮件地址不会被公开。 必填项已用*标注