Kafka设计解析(一) Kafka背景与架构介绍

 

本文就授权InfoQ独家披露,如用转载请注明出处。又多内容请查阅作者个人站点http://www.jasongj.com ,或者订阅微信公众号【大数量架构】

 
 这首开始控制把数以万计文章的名改掉,想了个好名字,反正不是玩单机版的就实施了。

图片 1

   
好了,这首我们看同样种非持久化的缓存服务器memcache,说及缓存本能反映就是是cache,session什么的,是的,可以说这

摘要

  Kafka是出于LinkedIn开发并开源之分布式消息网,因该分布式及高吞吐率而为广大运用,现都跟Cloudera
Hadoop,Apache Storm,Apache
Spark集成。本文介绍了Kafka的创建背景,设计目标,使用信息网的优势及当前流行的信息网相比。并介绍了Kafka的架,Producer消息路由,Consumer
Group以及由该促成之两样消息分发方式,Topic & Partition,最后介绍了Kafka
Consumer为何使用pull模式以及Kafka提供的老三栽delivery guarantee。

把都是基于.net进程的,通俗点也就算做不了差不多机器的共享,典型的一个即便是SSO。

背景介绍

 

Kafka创建背景

  Kafka是一个信网,原本开发自LinkedIn,用作LinkedIn的走流(Activity
Stream)和营业数量处理管道(Pipeline)的根基。现在它们都让多家不同品类的商家 作为多种类型的数管道与信息网运用。
  活动流数据是几拥有站点在针对那个网站采取状态做报表时犹设为此到的数目被极其健康的局部。活动数量包括页面访问量(Page
View)、被翻动内容方面的信息与查找情况相当内容。这种多少一般的处理方式是优先拿各种走以日记的款式写入某种文件,然后周期性地对这些文件进行统计分析。运营数量因的凡服务器的属性数据(CPU、IO使用率、请求时、服务日志等等数据)。运营数据的统计方式种类层出不穷。
  近年来,活动与营业数量处理就改为了网站软件出品特色中一个首要的一些,这虽待平等拟小更复杂的根底设备对那提供支撑。
  

一: 安装

Kafka简介

  Kafka是一律种植分布式的,基于发布/订阅的信网。主要设计目标如下:

  • 因时复杂度为O(1)的法提供信息持久化能力,即使对TB级以上数据吧能担保常数时间复杂度的看性能
  • 愈吞吐率。即使在怪廉价的商用机器及呢会成就单机支持各级秒100K条以上信息之传输
  • 支持Kafka
    Server间的信分区,及分布式消费,同时确保每个Partition内的音信顺序传输
  • 与此同时支持离线数据处理以及实时数据处理
  • Scale out:支持在线水平扩展

   
 memcahce像redis,mongodb一样都急需被他们自己之服务端,我们下载Memcached_1.2.5.zip,然后嵌入C盘,修改文件

怎用信息网

  • 解耦
      以档次启动的新来预测未来项目会遇到什么要求,是极困难的。消息网以处理过程中间插了一个含有的、基于数的接口层,两止的处理过程都使实现这无异于接口。这允许你独自的扩张或改动两度的处理过程,只要确保其遵守同样的接口约束。

  • 冗余
      有些情况下,处理数据的长河会破产。除非数量让持久化,否则用招致丢失。消息队列把多少开展持久化直到其已经深受全处理,通过就同方法逃避了数丢失风险。许多音队列所采取的”插入-获取-删除”范式中,在拿一个消息从队列中去之前,需要而的处理体系明确的指出该消息已经让处理完毕,从而确保您的数码被安康的保存直到你以完毕。

  • 扩展性
      因为消息队列解耦了你的处理过程,所以增大消息入队和拍卖的频率是雅爱之,只要另外多处理过程即可。不需要改变代码、不需要调剂参数。扩展就像调整大电力按钮一样简单。

  • 圆滑 & 峰值处理能力
      在访问量剧增的事态下,应用依然要连续发挥作用,但是如此的突发流量并无常见;如果也因克处理当下类似峰值访问为专业来投入资源随时待命的是伟大的浪费。使用信息队列会使重点零部件顶住突发的走访压力,而不见面以突发的过于的求而净崩溃。

  • 然而恢复性
      系统的同一有些零件失效时,不见面影响到整体系。消息队列降低了经过中的耦合度,所以即使一个处理消息之长河挂掉,加入队列中的音信还可以系恢复后让拍卖。

  • 各个保证
      于多用状况下,数据处理的相继都怪要紧。大部分消息队列本来就排序的,并且能保证数据会遵循一定的各个来拍卖。Kafka保证一个Partition内之信息的有序性。

  • 缓冲
      于另外要的系统被,都见面生要不同之处理时之元素。例如,加载一摆设图片于下过滤器花费还少之工夫。消息队列通过一个缓冲层来救助任务最高效率的行———写副行的处理会尽可能的敏捷。该缓冲有助于控制以及优化数据流经过系统的快。

  • 异步通信
      很多时光,用户不思量吧不需及时处理消息。消息队列提供了异步处理体制,允许用户将一个音放入队列,但并无立即处理它。想向行中放入小消息就加大多少,然后以得的时刻还夺处理它们。

名为memcached。

常用Message Queue对比

  • RabbitMQ
      RabbitMQ是用Erlang编写的一个开源之消息队列,本身支持多底商事:AMQP,XMPP,
    SMTP,
    STOMP,也正因如此,它好重量级,更称给企业级的开。同时实现了Broker构架,这象征消息于发送给客户端时先在着力队列排队。对路由,负载均衡或者数持久化都起十分好之支撑。

  • Redis
      Redis是一个冲Key-Value对的NoSQL数据库,开发保护非常活泼。虽然它是一个Key-Value数据库存储系统,但其本身支持MQ功能,所以完全可以当一个轻量级的序列服务来使用。对于RabbitMQ和Redis的入队暨出队操作,各尽100万破,每10万破记录同一破实行时。测试数据分为128Bytes、512Bytes、1K和10K季单例外尺寸的多少。实验表明:入队时,当数比较小时Redis的性能要高于RabbitMQ,而而数据大小超过了10K,Redis则慢的无法忍受;出队时,无论数额大小,Redis都表现出大好的特性,而RabbitMQ的出队性能则远小于Redis。

  • ZeroMQ
      ZeroMQ号称太抢之音信队列系统,尤其针对大吞吐量的需求状况。ZMQ能够落实RabbitMQ不擅的高等/复杂的队列,但是开发人员需要自己组合又术框架,技术上的复杂度是指向这MQ能够使用成之挑战。ZeroMQ具有一个奇异的非中间件的模式,你不待设置和运转一个消息服务器或中等件,因为你的应用程序将装这个服务器角色。你就待简单的援ZeroMQ程序库,可以使用NuGet安装,然后你就是足以开心的于应用程序之间发送信息了。但是ZeroMQ仅提供非持久性的班,也就是说要宕机,数据以见面丢掉。其中,Twitter的Storm
    0.9.0先的本子中默认使用ZeroMQ作为数据流的导(Storm从0.9本开始以支持ZeroMQ和Netty作为传输模块)。

  • ActiveMQ
      ActiveMQ是Apache下的一个子项目。
    类似于ZeroMQ,它亦可以代办与接触对碰的技巧实现队列。同时类似于RabbitMQ,它少量代码就可很快地落实高级应用场景。

  • Kafka/Jafka
      Kafka是Apache下的一个子项目,是一个强性能跨语言分布式发布/订阅消息队列系统,而Jafka是于Kafka之上孵化而来的,即Kafka的一个升级版。具有以下特征:快速持久化,可以于O(1)的系出下开展信息持久化;高吞吐,在一如既往尊一般的服务器上既可达标10W/s的吞吐速率;完全的分布式系统,Broker、Producer、Consumer都原生自动支持分布式,自动实现负载均衡;支持Hadoop数据交互加载,对于像Hadoop的同等的日志数据和离线分析体系,但以要求实时处理的范围,这是一个行之解决方案。Kafka通过Hadoop的互动加载机制统一了在线与离线的音处理。Apache
    Kafka相对于ActiveMQ是一个好轻量级的音信网,除了性能大好以外,还是一个办事出色的分布式系统。

1:install

Kafka架构

   
 install可以说凡是万力所能及通用命令,首先我们反至memcached目录,然后 memcached.exe
-d install 即可。

Terminology

  • Broker
      Kafka集群包含一个要多只服务器,这种服务器被誉为broker
  • Topic
      每条发表暨Kafka集群的音信还生一个类,这个路吃称Topic。(物理及不同Topic的消息分开储存,逻辑上一个Topic的信息虽然保存于一个要么多个broker上但用户只有待点名消息的Topic即可生育或者花费数量要不必关心数据存于何处)
  • Partition
      Parition是情理及之概念,每个Topic包含一个要多单Partition.
  • Producer
      负责发布信息及Kafka broker
  • Consumer
      信消费者,向Kafka broker读取信息之客户端。
  • Consumer Group
      每个Consumer属于一个一定的Consumer
    Group(可也每个Consumer指定group name,若无点名group
    name则属于默认的group)。

        图片 2

Kafka拓扑结构

图片 3
  如达到图所示,一个名列前茅的Kafka集众多被蕴含几Producer(可以是web前端有的Page
View,或者是服务器日志,系统CPU、Memory等),若干broker(Kafka支持水平扩展,一般broker数量越多,集群吞吐率越强),若干Consumer
Group,以及一个Zookeeper集群。Kafka通过Zookeeper管理集群配置,选举leader,以及在Consumer
Group发生变化时开展rebalance。Producer使用push模式将消息发布到broker,Consumer用pull模式于broker订阅并花信息。
  

2:start

Topic & Partition

  Topic在逻辑上足为看是一个queue,每条消费都不能不指定它的Topic,可以简单了解吧必须指明把及时条信息放上谁queue里。为了让Kafka的吞吐率可以线性提高,物理及把Topic分成一个或者多单Partition,每个Partition在大体及相应一个文本夹,该文件夹下存储这个Partition的享有信息及目录文件。若创建topic1和topic2鲜单topic,且分别发生13只和19独分区,则整个集群达会相应会变卦共32个文件夹(本文所用集群共8只节点,此处topic1和topic2
replication-factor均为1),如下图所出示。
  图片 4
  
  每个日志文件还是一个log entrie序列,每个log entrie寓一个4字节整型数值(值为N+5),1单字节的”magic
value”,4只字节的CRC校验码,其后跟N个字节的消息体。每条消息还起一个手上Partition下唯一的64字节的offset,它指明了就漫长信息之发端位置。磁盘上囤积的消息格式如下:
  message length : 4 bytes (value: 1+4+n)
  “magic” value : 1 byte
  crc : 4 bytes
  payload : n bytes
  这个log entries毫无由一个文本构成,而是分成多独segment,每个segment以该segment第一条信息之offset命名并为“.kafka”为后缀。另外会时有发生一个目录文件,它表明了每个segment下富含的log entry的offset范围,如下图所展示。
  图片 5
  
  因为各条信息还给append到该Partition中,属于顺序写磁盘,因此效率很大(经验证,顺序写磁盘效率比较自由写内存还要后来居上,这是Kafka高吞吐率的一个分外关键之担保)。
  图片 6
  
  对于风俗习惯的message
queue而言,一般会去已经为消费的消息,而Kafka集群会保留所有的信,无论其让消费也。当然,因为磁盘限制,不容许永远保存有数据(实际上也从未必要),因此Kafka提供简单栽政策删除旧数据。一凡是冲时间,二凡是冲Partition文件大小。例如可以通过配备$KAFKA_HOME/config/server.properties,让Kafka删除一两全前之数码,也可每当Partition文件超过1GB时去除旧数据,配置如下所示。

  
# The minimum age of a log file to be eligible for deletion
log.retention.hours=168
# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824
# The interval at which log segments are checked to see if they can be deleted according to the retention policies
log.retention.check.interval.ms=300000
# If log.cleaner.enable=true is set the cleaner will be enabled and individual logs can then be marked for log compaction.
log.cleaner.enable=false

  
  这里要注意,因为Kafka读博特定消息之岁月复杂度为O(1),即与文件大小无关,所以这边去过期文件与加强Kafka性能无关。选择哪的去策略才及磁盘以及具体的急需有关。另外,Kafka会为各个一个Consumer
Group保留有metadata信息——当前花之消息的position,也就算offset。这个offset由Consumer控制。正常状态下Consumer会在花完一长条信息后递增该offset。当然,Consumer也不过拿offset设成一个于小之价,重新消费有信。因为offet由Consumer控制,所以Kafka
broker是无论状态的,它不欲标记哪些消息于怎样消费了,也未待通过broker去保证和一个Consumer
Group只发一个Consumer能消费有平等长条信息,因此呢便未待锁机制,这为为Kafka的大吞吐率提供了有力保持。
  

   
现在咱们设开动start即可,要专注的就是是memecache默认的端口是11211,当然我啊未思量更指定端口了。

Producer消息路由

  Producer发送消息及broker时,会基于Paritition机制选以其储存到哪一个Partition。如果Partition机制设置合理,所有消息可以均匀分布至不同之Partition里,这样就算贯彻了负荷均衡。如果一个Topic对许一个文书,那这文件所在的机器I/O将会见变成是Topic的性瓶颈,而发生了Partition后,不同之音讯可以并行写副不同broker的不同Partition里,极大的加强了吞吐率。可以于$KAFKA_HOME/config/server.properties惨遭经过配备起num.partitions来指定新建Topic的默认Partition数量,也可是于开立Topic时经参数指定,同时为得以Topic创建之后通过Kafka提供的家伙修改。
  
  于发送一条消息不时,可以指定这漫漫信息之key,Producer根据这key和Partition机制来判断该拿及时长达信息发送至哪个Parition。Paritition机制好通过点名Producer的paritition. class立马同样参数来指定,该class必须贯彻kafka.producer.Partitioner接口。本例中设key可以给解析为整数则用相应的整数与Partition总数取余,该信息会受发送至该数对应之Partition。(每个Parition都见面发只序号,序号从0开始)

import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;

public class JasonPartitioner<T> implements Partitioner {

    public JasonPartitioner(VerifiableProperties verifiableProperties) {}

    @Override
    public int partition(Object key, int numPartitions) {
        try {
            int partitionNum = Integer.parseInt((String) key);
            return Math.abs(Integer.parseInt((String) key) % numPartitions);
        } catch (Exception e) {
            return Math.abs(key.hashCode() % numPartitions);
        }
    }
}

 
  如果拿高达例被之好像作为partition.class,并透过如下代码发送20条信息(key分别吗0,1,2,3)至topic3(包含4个Partition)。
  

public void sendMessage() throws InterruptedException{
  for(int i = 1; i <= 5; i++){
        List messageList = new ArrayList<KeyedMessage<String, String>>();
        for(int j = 0; j < 4; j++){
            messageList.add(new KeyedMessage<String, String>("topic2", j+"", "The " + i + " message for key " + j));
        }
        producer.send(messageList);
    }
  producer.close();
}

  则key相同的音信会让发送并蕴藏到跟一个partition里,而且key的序号正好跟Partition序号相同。(Partition序号从0开始,本例中的key也从0开始)。下图所示是透过Java程序调用Consumer后打印出的音信列表。
  图片 7  

        图片 8

Consumer Group

  (本节有描述都是依据Consumer hight level API而非low level
API)。
  使用Consumer high level
API时,同一Topic的同样漫长信息只能给与一个Consumer
Group内的一个Consumer消费,但基本上独Consumer Group可同时花费就同样消息。
  图片 9
  这是Kafka用来兑现一个Topic消息的广播(发给具有的Consumer)和单播(发给某一个Consumer)的一手。一个Topic可以针对许多单Consumer
Group。如果要贯彻播放,只要每个Consumer有一个独自的Group就得了。要落实单播只要抱有的Consumer在同一个Group里。用Consumer
Group还得将Consumer进行任意的分组而休需反复发送信息及不同的Topic。
  实际上,Kafka的统筹理念之一就是是还要提供离线处理和实时处理。根据当时同样特点,可以应用Storm这种实时流处理系统针对信息进行实时在线处理,同时采用Hadoop这种批判处理系统进行离线处理,还可同时将数据实时备份到其它一个多少主导,只需要确保这三个操作所用的Consumer属于不同的Consumer
Group即可。下图是Kafka在Linkedin的同种植简化部署示意图。
  图片 10
  
  下面这个例子更鲜明地出示了Kafka Consumer
Group的特点。首先创建一个Topic
(名也topic1,包含3只Partition),然后创建一个属于group1的Consumer实例,并创三个属于group2的Consumer实例,最后通过Producer向topic1殡葬key分别吗1,2,3之信。结果发现属于group1的Consumer收到了富有的立刻三修消息,同时group2中之3只Consumer分别收受了key为1,2,3底消息。如下图所示。
  图片 11  

3:stop,uninstall

Push vs. Pull  

  作为一个音网,Kafka遵循了民俗的方法,选择由Producer向broker
push信并出于Consumer从broker pull消息。一些logging-centric
system,比如Facebook的Scribe和Cloudera的Flume,采用push模式。事实上,push模式与pull模式各发高低。
  push模式大不便服消费速率不同的顾客,因为消息发送速率是由于broker决定的。push模式之目标是竭尽为最抢速度传递信息,但是这样十分爱招Consumer来不及处理消息,典型的呈现就是是拒绝服务以及台网不通。而pull模式则好因Consumer的费力量坐相当的速率消费信息。
  对于Kafka而言,pull模式更适于。pull模式可简化broker的计划,Consumer可自主控制消费信息的速率,同时Consumer可以自己说了算消费方式——即可批量花费呢只是依次消费,同时还能挑不同的付方式因此实现不同的传导语义。
  

     这有限单就是不截图了,一个凡是休,一个凡是卸载,反正都是万力所能及通用命令。

Kafka delivery guarantee

  有这么几种植或的delivery guarantee:

  • At most once 消息可能会见废弃,但绝对不见面更传输
  • At least one 消息不要会扔,但也许会见再次传输
  • Exactly once 每条信息一定会吃传相同次等还只有传输相同不成,很多时段马上是用户所想使的。
      
      当Producer向broker发送消息时,一旦这长达消息被commit,因数replication的留存,它便非会见丢弃。但是一旦Producer发送数据给broker后,遇到网络问题如招致通信中断,那Producer就无法判断该条信息是否曾经commit。虽然Kafka无法确定网络故障期间生了哟,但是Producer可以充分成一栽恍若于主键的事物,发生故障时幂等性的重试多次,这样就到位了Exactly once。截止到即(Kafka
    0.8.2本子,2015-03-04),这无异于Feature还未曾实现,有期待当Kafka未来之版中贯彻。(所以时默认情况下一致漫漫消息于Producer到broker是承保了At least once,可经过安装Producer异步发送实现At most once)。
      接下去讨论的凡信息于broker到Consumer的delivery
    guarantee语义。(仅对Kafka consumer high level
    API)。Consumer在起broker读取信息继,可以挑选commit,该操作会在Zookeeper中保存该Consumer在该Partition中读取的信息之offset。该Consumer下同样次等还念该Partition时会见于生一致长条开始读取。如不commit,下同样次于读取的开端位置会与达到等同差commit之后的发端位置相同。当然好将Consumer设置为autocommit,即Consumer一旦读到数量就自动commit。如果单单谈谈这无异于读取消息的进程,那Kafka是承保了Exactly once。但实质上利用被应用程序并非以Consumer读博完数据就是寿终正寝了,而是要拓展进一步处理,而数处理与commit的依次以充分老程度达主宰了消息从broker和consumer的delivery
    guarantee semantic。
  • 读了信息先commit再处理消息。这种模式下,如果Consumer在commit后尚从来不赶趟处理消息就是crash了,下次再开始工作晚即便无法读到刚刚已交给而无处理的音,这就算针对应于At most once
  • 读了信息先处理又commit。这种模式下,如果当处理终结信息随后commit之前Consumer
    crash了,下次再开始工作经常还会见处理刚未commit的音讯,实际上该消息曾为拍卖过了。这就对准应于At least once。在重重行使状况下,消息还发生一个主键,所以消息之处理往往具备幂等性,即多次处理就同一修信息和单纯处理同差是如出一辙的,那即便可以当是Exactly once。(笔者以为这种说法比较牵强,毕竟她不是Kafka本身提供的体制,主键本身吗并无能够全保证操作的幂等性。而且事实上我们说delivery
    guarantee
    语义是讨论给拍卖多少次,而非处理结果怎样,因为处理方式多种多样,我们不该将处理过程的风味——如是否幂等性,当成Kafka本身的Feature)
  • 要一定要是完成Exactly once,就用协调offset和实际操作的输出。精典的做法是引入两号提交。如果能够吃offset和操作输入是与一个地方,会重新简单与通用。这种方法可能重新好,因为许多出口系统或许不支持有限等提交。比如,Consumer以到多少后或者将多少放到HDFS,如果将最新的offset和数码我并写到HDFS,那便可保证数据的出口及offset的更新要么都好,要么都非完,间接实现Exactly once。(目前就算high
    level API而言,offset是存于Zookeeper中的,无法存于HDFS,而low level
    API的offset是由好去维护的,可以拿的存于HDFS中)
      总之,Kafka默认保证At least once,并且同意通过安装Producer异步提交来促成At most once。而Exactly once务求和表面存储系统协作,幸运的是Kafka提供的offset可以充分直接非常容易得使这种艺术。

 

下篇预告

  下一致首用深刻教Kafka是怎做Replication和Leader
Election的。在Kafka0.8以前的版本被,如果某broker宕机,或者磁盘出现问题,则该broker上有着partition的数目还见面掉。而Kafka0.8随后进入了Replication机制,可以以每个Partition的数据备份多卖,即使一些broker宕机也能保证系统的可用性和数目的完整性。

老二:驱动程序

 
 memcache的服务器我们即便曾开启好了,由于当商家近日一直还当为此php,算了要用C#使得吧,谁被这是.net

社区呢,下载C#驱动,既然是缓存服务器,只要来中心的CURD,我思念当就大多了。

 1 using System;
 2 using System.Collections.Generic;
 3 
 4 namespace BeIT.MemCached
 5 {
 6     class Example
 7     {
 8         public static void Main(string[] args)
 9         {
10             //通过配置文件初始化memcache实例
11             MemcachedClient cache = MemcachedClient.GetInstance("MyConfigFileCache");
12 
13             //编辑(可以模拟session操作,缓存20分钟)
14             cache.Set("name", "一线码农", DateTime.Now.AddMinutes(20));
15 
16             //获取
17             var result = cache.Get("name");
18 
19             Console.WriteLine("获取name的缓存数据为: " + result);
20 
21             //删除
22             cache.Delete("name");
23 
24             Console.WriteLine("\n成功删除cache中name的数据");
25 
26             result = cache.Get("name");
27 
28             Console.WriteLine("\n再次获取cache中name的数据为:" + (result ?? "null") + "\n");
29 
30             //查看下memecahce的运行情况
31             foreach (KeyValuePair<string, Dictionary<string, string>> host in cache.Status())
32             {
33                 Console.Out.WriteLine("Host: " + host.Key);
34                 foreach (KeyValuePair<string, string> item in host.Value)
35                 {
36                     Console.Out.WriteLine("\t" + item.Key + ": " + item.Value);
37                 }
38                 Console.Out.WriteLine();
39             }
40 
41             Console.Read();
42         }
43     }
44 }

咱们又定义下安排文件,既然memcache可以用于分布式,那便避免不了以cache分摊到几乎台服务器上,可以视,下面的

配备也是非常简单的,当然分配的原理自然是memcache自身的算法决定的,最后别忘了在另一样雅服务器上怒放一个端口就她

就行了。

<?xml version="1.0" encoding="utf-8" ?>
<configuration>
  <configSections>
    <section name="beitmemcached" type="System.Configuration.NameValueSectionHandler" />
  </configSections>
  <appSettings>
  </appSettings>
  <beitmemcached>
    <add key="MyConfigFileCache" value="127.0.0.1:11211" />
    <!--<add key="MyConfigFileCache" value="127.0.0.1:11211,127.0.0.1:8888" />-->
  </beitmemcached>
</configuration>

图片 12

 

脚是包装程序:BeITMemcached ,也得以交codegoogle去下载。

 

发表评论

电子邮件地址不会被公开。 必填项已用*标注