A clever approach to updating the UI from a background thread

b. While the background task is running, it is best to disable the relevant UI controls; otherwise the user may click the button again, or do other things while the background task is still running, and the two may interfere with each other.

1.2.3 Producer

  • The producer writes messages to Kafka
  • When writing, the topic and partition must be specified
  • How messages are distributed across partitions is determined by an algorithm specified by the producer

3. The background thread must not update the UI too often. First, if the display refreshes too frequently it looks chaotic, and users may feel the program has gone out of control. Second, overly frequent UI updates also slow the overall operation down, because repainting the UI costs CPU as well. We know that film runs at 24 frames per second; in other words, refreshing the screen 24 times per second already looks perfectly smooth to the eye, so refreshing more than 24 times per second is unnecessary.

2.2.2 Commands

Start: bin/kafka-server-start.sh config/server.properties . In a production environment it is best to run it as a daemon, e.g. nohup
 … &

Stop: bin/kafka-server-stop.sh 

In desktop (single-machine) application design, operations that need to run for a relatively long time are generally carried out on a background thread. If you run them directly on the UI thread (inside a click handler), the UI gets no chance to repaint for a long time, and the program appears to hang (commonly described as "not responding").
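As an illustration (the original article targets a desktop GUI framework; this is an analogous, hypothetical Java Swing sketch, not the author's code), doing the long-running work directly in the button's event handler blocks the UI thread and freezes the window until the loop finishes:

import javax.swing.*;

public class FrozenUiDemo {
    public static void main(String[] args) {
        SwingUtilities.invokeLater(() -> {
            JFrame frame = new JFrame("Frozen UI demo");
            JButton button = new JButton("Copy files");
            // Anti-pattern: the long-running work runs on the event dispatch thread,
            // so the window cannot repaint or respond until the loop completes.
            button.addActionListener(e -> {
                for (int i = 0; i < 1000; i++) {
                    copyOneFile(i); // stands in for one slow operation
                }
            });
            frame.add(button);
            frame.setSize(300, 120);
            frame.setDefaultCloseOperation(JFrame.EXIT_ON_CLOSE);
            frame.setVisible(true);
        });
    }

    private static void copyOneFile(int i) {
        try { Thread.sleep(10); } catch (InterruptedException ignored) { }
    }
}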

2.3 Usage

Kafka itself is tied to ZooKeeper, and the state of producers and consumers is also persisted through ZooKeeper. The various operations on Kafka are carried out through the ZooKeeper ensemble it is connected to.

To update the user interface, use a timer, named UpdateUiTimer here. While the background thread runs, it writes its running state (percentage, detailed status string, main step string) into global variables, and UpdateUiTimer reads those global variables and displays them. The timer interval can be set to 2–5 ticks per second (my empirical value). There is then no need to call invoke-style functions at all. This solves the problem.
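A minimal sketch of this pattern, written here in Java Swing for illustration (the class and field names mirror the GlobalVars fragments quoted elsewhere in this article, but are otherwise hypothetical):

import javax.swing.*;

public class ProgressDemo {
    // Global state written by the worker thread and read by the UI timer.
    // volatile is sufficient here: the timer tolerates slightly stale values.
    static class GlobalVars {
        static volatile int runningPercent;
        static volatile String runningMainStepStatus = "";
        static volatile String runningDetailStepStatus = "";
    }

    public static void main(String[] args) {
        SwingUtilities.invokeLater(() -> {
            JFrame frame = new JFrame("UpdateUiTimer demo");
            JLabel label = new JLabel("idle");
            frame.add(label);
            frame.setSize(400, 120);
            frame.setDefaultCloseOperation(JFrame.EXIT_ON_CLOSE);
            frame.setVisible(true);

            // Background worker: only updates the global variables, never touches the UI.
            new Thread(() -> {
                for (int i = 0; i <= 100; i++) {
                    GlobalVars.runningPercent = i;
                    GlobalVars.runningMainStepStatus = "copying files";
                    GlobalVars.runningDetailStepStatus = "file " + i + " of 100";
                    try { Thread.sleep(100); } catch (InterruptedException ignored) { return; }
                }
            }).start();

            // UpdateUiTimer: fires on the UI thread a few times per second and only
            // reads the globals, so no invoke-style marshalling is needed.
            new Timer(250, e -> label.setText(
                    GlobalVars.runningPercent + "% - "
                    + GlobalVars.runningMainStepStatus + " - "
                    + GlobalVars.runningDetailStepStatus)).start();
        });
    }
}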

4 Kafka monitoring tool (KafkaOffsetMonitor)

KafkaOffsetMonitor can be used to graphically display information such as Kafka broker nodes, topics, consumers and offsets.

Taking KafkaOffsetMonitor-assembly-0.2.0.jar as an example, after downloading it, run:


#!/bin/bash
java -Xms512M -Xmx512M -Xss1024K -XX:PermSize=256m -XX:MaxPermSize=512m -cp KafkaOffsetMonitor-assembly-0.2.0.jar \
     com.quantifind.kafka.offsetapp.OffsetGetterWeb \
     --zk  192.168.5.131:2181,192.168.6.132:2181,192.168.6.133:2181 \
     --port 8087 \
     --refresh 10.seconds \
     --retain 1.days 1>./zsm-logs/stdout.log 2>./zsm-logs/stderr.log &


Here, zk is written in the format host1:port1,host2:port2,…; port is the port on which the web interface is served; refresh is the refresh interval; retain is the data retention period (units: seconds, minutes, hours, days).

 

 

1.2.4 Consumer

  • The consumer reads messages and processes them
  • Consumer group

    • This concept was introduced to support two scenarios: deliver each message to exactly one consumer, or broadcast each message to all consumers of a consumer group
    • When several consumer groups subscribe to a topic, the topic's messages are broadcast to all consumers within each group
    • Once a message has been sent to a consumer group, it can be received and consumed by only one consumer of that group
    • Having each consumer in a group correspond to one partition brings the following benefits
      • Processing can be parallelized according to the number of partitions
      • Each partition is read by exactly one consumer, which guarantees that messages are processed in the order they are stored in that partition; note that this order is affected by the producer's partitioning algorithm
  •  A consumer can use several threads to consume; the thread count should be no greater than the topic's partition count, because for a consumer group containing one or more consuming threads a partition can be assigned to only one of those threads, and as many threads as possible are given a partition (which threads actually end up consuming is decided by the thread pool's scheduling). If there are more threads than partitions, the surplus threads are never assigned a partition and simply idle, wasting resources (a sketch of this multi-threaded consumption appears after the bullets below)

  • If a consumer reads from multiple partitions, no ordering is guaranteed across partitions; Kafka only guarantees that data within a single partition is ordered, but across several partitions the order you read in can differ
  • Adding or removing consumers, brokers or partitions triggers a rebalance, so after a rebalance the partitions assigned to a consumer may change
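A minimal sketch of the multi-threaded consumption described above, using the same old high-level consumer API as the JConsumer example later in this article (the ZooKeeper address, group id, topic name and thread count are illustrative, and the thread count is assumed to be no larger than the topic's partition count):

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;

public class MultiThreadConsumerDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("zookeeper.connect", "192.168.6.131:2181"); // illustrative address
        props.put("group.id", "demo-group");
        ConsumerConnector consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));

        int threadNum = 4; // should be <= the topic's partition count
        String topic = "test";
        Map<String, List<KafkaStream<byte[], byte[]>>> streams =
                consumer.createMessageStreams(Collections.singletonMap(topic, threadNum));

        // One thread per stream; each stream is fed by one or more partitions,
        // and each partition feeds exactly one stream.
        ExecutorService pool = Executors.newFixedThreadPool(threadNum);
        for (KafkaStream<byte[], byte[]> stream : streams.get(topic)) {
            pool.submit(() -> {
                for (MessageAndMetadata<byte[], byte[]> m : stream) {
                    System.out.println("partition=" + m.partition()
                            + " offset=" + m.offset()
                            + " msg=" + new String(m.message()));
                }
            });
        }
    }
}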

After the program is delivered to users, if a user's machine is much faster or much slower than mine, the UI behaviour is still poor: it either updates too often or too rarely.

3.1 Usage

Run ./kafka-run-class.sh kafka.tools.MirrorMaker --help to see the usage notes, as shown below:



Option                                  Description
------                                  -----------
--blacklist <Java regex (String)>       Blacklist of topics to mirror.
--consumer.config <config file>         Consumer config to consume from a
                                          source cluster. You may specify
                                          multiple of these.
--help                                  Print this message.
--num.producers <Integer: Number of     Number of producer instances (default:
  producers>                              1)
--num.streams <Integer: Number of       Number of consumption streams.
  threads>                                (default: 1)
--producer.config <config file>         Embedded producer config.
--queue.size <Integer: Queue size in    Number of messages that are buffered
  terms of number of messages>            between the consumer and producer
                                          (default: 10000)
--whitelist <Java regex (String)>       Whitelist of topics to mirror.


There are a few things to note here:

1.2 Message queue

If problem 1 were the only one, it would still be fairly easy to handle. There are plenty of write-ups online about invoke and similar functions; although tedious, it is still manageable.

2.1 Starting ZooKeeper

For testing you can use the ZooKeeper instance bundled with Kafka:

Start: ./bin/zookeeper-server-start.sh config/zookeeper.properties & ; config/zookeeper.properties is ZooKeeper's configuration file.

Stop: ./bin/zookeeper-server-stop.sh 

However, it is better to set up a separate ZooKeeper cluster yourself, to improve availability and reliability. For details see: Zookeeper的安装及使用——MarchOn

Problems 2 and 3 are harder to handle. For example, my program copies files in batches; tested on my development machine, updating the UI once every 10 files copied looked about right, so that is how the program was written.

5.1 Installation

You are supposed to download the source from GitHub and install the sbt build tool to compile it into an installation package, but the build takes a very long time and kept failing for reasons unknown, so here we use a package that someone else has already compiled (backup link).

The package is kafka-manager-1.0-SNAPSHOT.zip

>Extract:

 unzip kafka-manager-1.0-SNAPSHOT.zip 

>Configure kafka-manager.zkhosts in conf/application.conf:

 kafka-manager.zkhosts="192.168.6.131:2181,192.168.6.132:2181,192.168.6.133:2181" 

>Start:

 ./bin/kafka-manager -Dconfig.file=conf/application.conf (after startup a kafka-manager node appears under the ZooKeeper root)

The default port is 9000; to use a different port, specify http.port on the command line. kafka-manager.zkhosts can also be given on the command line, for example:

 ./bin/kafka-manager -Dhttp.port=9001 -Dkafka-manager.zkhosts="192.168.6.131:2181,192.168.6.132:2181,192.168.6.133:2181" 

The solution is:

2.2 Starting the Kafka server

    public static string RunningDetailStepStatus;

5.2 Usage

Open the web page, go to Cluster -> Add Cluster, and enter the ZooKeeper connection string of the Kafka cluster you want to monitor.

The global-variable class can be designed in a form like this:

2 Installation and usage

We use kafka_2.11-0.10.0.0 as the example.

After downloading and extracting, enter kafka_2.11-0.10.0.0/

 

2.2.3 Kafka's storage layout in ZooKeeper

If the zookeeper.connect value above contains no path, the root path is used. Start ZooKeeper and Kafka, connect to ZooKeeper from the command line, and the ls / command shows the znodes consumers, config, controller, admin, brokers, zookeeper and controller_epoch.

The structure is as follows (for details see: apache kafka系列之在zookeeper中存储结构):


 

 

Kafka is a high-throughput, distributed publish-subscribe messaging system. It was originally developed at LinkedIn as the foundation of LinkedIn's activity stream and operational data processing pipeline. It is now used by many companies of different kinds as various types of data pipelines and messaging systems.

    public static int RunningPercent;

5 Kafka cluster management tool (Kafka Manager)

kafka-manager is a project open-sourced by Yahoo, a commercial-grade tool written in Scala.

This management tool makes it very easy to spot topics that are unevenly distributed across the cluster, or partitions that are unevenly spread over the whole cluster. It supports managing multiple clusters, selecting replicas, reassigning replicas and creating topics. It is also a very handy tool for quickly browsing a cluster.

The tool runs as a clustered application and requires ZooKeeper.

Reference: http://hengyunabc.github.io/kafka-manager-install/

    public static string RunningMainStepStatus;

2.3.1 Command-line client

Create a topic: bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test 

List all topics: bin/kafka-topics.sh --list --zookeeper localhost:2181 

View topic information (partitions, replicas, etc.): kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic ; this lists the partition count, replica count, the replica leader node, the replica nodes and the alive (in-sync) replica nodes

Produce messages to a topic: bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test 

Consume messages from a topic: bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning (by default one thread consumes the data of all partitions of the specified topic)

Delete a Kafka group id: connect to ZooKeeper and use the rmr command; for example, to delete the consumer group named JSI: rmr /consumers/JSI 
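The same cleanup can also be scripted from Java with the ZkClient library that the old consumer uses; a hypothetical sketch (the ZooKeeper address and group name are illustrative):

import org.I0Itec.zkclient.ZkClient;

public class DeleteConsumerGroup {
    public static void main(String[] args) {
        // Connects to ZooKeeper and recursively removes the group's offset/ownership nodes,
        // equivalent to running "rmr /consumers/JSI" in the ZooKeeper CLI.
        ZkClient zkClient = new ZkClient("192.168.6.131:2181", 30000, 30000);
        try {
            zkClient.deleteRecursive("/consumers/JSI");
        } finally {
            zkClient.close();
        }
    }
}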

View consumption progress:

./bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group test-mirror-consumer-zsm --zkconnect ec2-12345.cn-north-1.compute.amazonaws.com.cn:2181/kafka/blink/0822 --topic GPS2
    Parameters:
    --group is the group.id that MirrorMaker used when consuming the source cluster
    --zkconnect is the source cluster's ZooKeeper address
    --topic is the topic to inspect; if omitted, the consumption status of all topics is returned

a. When multiple threads read and write the same variable without special synchronization, some caching and delay is involved. That is, a change made by one thread will not necessarily be seen by another thread right away. For example, the worker thread updates the progress to 5% and then to 10%; when the timer reads the progress it may still read 5% and only see 10% on the next tick. For our display logic, however, this makes little difference.

1 Introduction to the Kafka message queue

 

2.2.1 Configuration file

Configure config/server.properties. The following fields usually need to be set; the rest can be left at their defaults:


broker.id:          the unique identifier of each broker in the cluster; must be a positive integer
listeners (replaces host.name and port of earlier versions): be sure to bind host.name, otherwise strange errors can occur, e.g. the consumer cannot find the broker. This host.name is the machine name of the Kafka server and is registered in ZooKeeper
log.dirs:           where Kafka stores its data; multiple paths are separated by commas; spreading the directories over different disks can improve read/write performance
log.retention.hours:    how long data files are kept; data stored longer than this is removed according to the log.cleanup.policy setting
zookeeper.connect:     the ZooKeeper connect string, in hostname:port form; multiple entries are separated by commas, e.g. hostname1:port1,hostname2:port2,hostname3:port3; a path may be appended, e.g. hostname1:port1,hostname2:port2,hostname3:port3/kafka, but note that the /kafka node must be created in ZooKeeper beforehand, otherwise you get the error: java.lang.IllegalArgumentException: Path length must be > 0


The meaning and configuration of all parameters can be found at: http://orchome.com/12 and http://blog.csdn.net/lizhitao/article/details/25667831

 An example configuration is shown below:



# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# see kafka.server.KafkaConfig for additional details and defaults

############################# Server Basics #############################

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=1

############################# Socket Server Settings #############################

# The address the socket server listens on. It will get the value returned from 
# java.net.InetAddress.getCanonicalHostName() if not configured.
#   FORMAT:
#     listeners = security_protocol://host_name:port
#   EXAMPLE:
#     listeners = PLAINTEXT://your.host.name:9092
listeners=PLAINTEXT://192.168.6.128:9092

# Hostname and port the broker will advertise to producers and consumers. If not set, 
# it uses the value for "listeners" if configured.  Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
#advertised.listeners=PLAINTEXT://your.host.name:9092

# The number of threads handling network requests
num.network.threads=3

# The number of threads doing disk I/O
num.io.threads=8

# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400

# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400

# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600


############################# Log Basics #############################

# A comma seperated list of directories under which to store log files
log.dirs=/usr/local/kafka/kafka_2.11-0.10.0.0/kfk_data/

# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=2
auto.create.topics.enable=false

# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1

############################# Log Flush Policy #############################

# Messages are immediately written to the filesystem but by default we only fsync() to sync
# the OS cache lazily. The following configurations control the flush of data to disk.
# There are a few important trade-offs here:
#    1. Durability: Unflushed data may be lost if you are not using replication.
#    2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
#    3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to exceessive seeks.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.

# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000

# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000

############################# Log Retention Policy #############################

# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.

# The minimum age of a log file to be eligible for deletion
log.retention.hours=4

# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
# segments don't drop below log.retention.bytes.
#log.retention.bytes=1073741824

# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824

# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000

############################# Zookeeper #############################

# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=192.168.6.131:2181,192.168.6.132:2181,192.168.6.133:2181

# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000


Note the auto.create.topics.enable field: if it is true, a producer writing to a topic that does not exist will cause that topic to be created automatically; if it is false, the topic must be created first, otherwise the producer reports an error: failed after 3 retries.

public class GlobalVars
{

6 Advanced topics

  • In the current Kafka implementation, all operations on ZooKeeper are performed by the Kafka
    controller (serially)
  • Offset management: Kafka records offsets in ZooKeeper. However, frequent writes to ZooKeeper through the
    zk client API are an inefficient operation. Kafka 0.8.2 introduced native offset
    storage, which moves offset management out of ZooKeeper and can scale horizontally. The principle is to use Kafka's compacted
    topics: offsets are submitted directly to a compacted topic, with the combination of consumer
    group, topic and partition as the key. Kafka also maintains a view of the latest offsets in memory, so a consumer asking for the latest offset can usually be served straight from memory. Kafka additionally allows the latest offset information to be checkpointed to disk quickly.
  • How to determine the partition count: it depends heavily on the hardware, the software and the load, so it has to be judged case by case, but a simple procedure helps estimate it: create a topic with only one partition, then measure that topic's producer throughput and consumer throughput. Say the values are Tp and Tc respectively, in MB/s, and the overall target throughput is Tt; then the partition count
    is roughly max(Tt/Tp, Tt/Tc) (a worked example follows this list)
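For instance (an illustrative calculation, not taken from the original article): if a single partition sustains Tp = 20 MB/s on the producer side and Tc = 50 MB/s on the consumer side, and the target throughput Tt is 200 MB/s, the estimate is max(200/20, 200/50) = max(10, 4) = 10 partitions.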

 

Source: https://www.cnblogs.com/z-sm/p/5691760.html

 

1.2.2 Message format

  1. A topic corresponds to one kind of message format, so messages are categorized by topic
  2. The messages of a topic consist of one or more partition(s)
  3. Within a partition
    • A partition is stored on one or more servers
      • If there is only one server, there is no redundant backup; it is a single machine rather than a cluster
      • If there are multiple servers
        • One server is the leader and the other servers are followers; the leader handles read and write requests, while the followers serve only as redundant backups; if the leader fails, a follower is automatically elected as the new leader so that service is not interrupted; each server may act as leader for some partitions and as follower for others, so the cluster as a whole achieves load balancing
    • Messages are stored in order, and that order is immutable
    • Messages can only be appended, never inserted
    • Every message has an offset serving as the message ID, unique within a partition
    • Offsets are kept and managed by the consumer, so the read order is actually entirely up to the consumer and is not necessarily linear
    • Messages have an expiry time and are deleted once expired
  1. A background thread generally cannot manipulate UI controls directly; you have to call invoke-style functions;

  2. The background thread must not update the UI too rarely either; if updates are too infrequent, users will again suspect that the program has "died";

3.2 Startup

./bin/kafka-run-class.sh kafka.tools.MirrorMaker --consumer.config  zsmSourceClusterConsumer.config  --num.streams 2 --producer.config zsmTargetClusterProducer.config --whitelist="ds*"
    --consumer.config: the specified file needs at least the zookeeper.connect and group.id fields
    --producer.config: needs at least the metadata.broker.list field, which lists the target cluster's brokers
    --whitelist: the topics to mirror

You can use the consumption-progress check described in 2.3.1 to see how far the source cluster has been mirrored (i.e. consumed).

 

————-Please credit the source when reposting: http://www.cnblogs.com/jacklondon

3 MirrorMaker

Kafka's own MirrorMaker tool replicates data from one cluster to another; its principle is simply to consume from the source cluster and produce to the target cluster.

When running it, you specify either the source cluster's ZooKeeper address (pull mode) or the target cluster's broker list (push mode).

————-You are welcome to download and try out the Zheguisoft single sign-on system, http://zheguisoft.com

2.3.2 Java client

1. Topic operations:



import kafka.admin.DeleteTopicCommand;
import kafka.admin.TopicCommand;

/**
 * @author zsm
 * @date 2016-09-27 10:26:42
 * @version 1.0
 * @parameter
 * @since
 * @return
 */
public class JTopic {
    public static void createTopic(String zkAddr, String topicName, int partition, int replication) {
        String[] options = new String[] { "--create", "--zookeeper", zkAddr, "--topic", topicName, "--partitions",
                partition + "", "--replication-factor", replication + "" };
        TopicCommand.main(options);
    }

    public static void listTopic(String zkAddr) {
        String[] options = new String[] { "--list", "--zookeeper", zkAddr };
        TopicCommand.main(options);
    }

    public static void describeTopic(String zkAddr, String topicName) {
        String[] options = new String[] { "--describe", "--zookeeper", zkAddr, "--topic", topicName, };
        TopicCommand.main(options);
    }

    public static void alterTopic(String zkAddr, String topicName) {
        String[] options = new String[] { "--alter", "--zookeeper", zkAddr, "--topic", topicName, "--partitions", "5" };
        TopicCommand.main(options);
    }

    // Deletes the topic by removing the corresponding path in ZooKeeper; only the ZooKeeper
    // metadata is removed, the actual data on the Kafka brokers is not deleted
    public static void deleteTopic(String zkAddr, String topicName) {
        String[] options = new String[] { "--zookeeper", zkAddr, "--topic", topicName };
        DeleteTopicCommand.main(options);
    }

    public static void main(String[] args) {
        String myTestTopic = "ZsmTestTopic";
        int myPartition = 4;
        int myreplication = 1;

        //createTopic(ConfigureAPI.KafkaProperties.ZK, myTestTopic, myPartition, myreplication);
        // listTopic(ConfigureAPI.KafkaProperties.ZK);
        describeTopic(ConfigureAPI.KafkaProperties.ZK, myTestTopic);
        // alterTopic(ConfigureAPI.KafkaProperties.ZK, myTestTopic);
        // deleteTopic(ConfigureAPI.KafkaProperties.ZK, myTestTopic);
    }

}


2. Writing: (when writing you can specify a key so that Kafka can use it to decide which partition the data goes into; if no key is given, Kafka essentially picks a random partition to send the keyless message to and then caches that partition number for subsequent use — Kafka itself also clears this cache, by default every 10 minutes or whenever topic metadata is requested)



package com.zsm.kfkdemo;

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

import com.zsm.kfkdemo.ConfigureAPI.KafkaProperties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

/**
 * Rules (a key plus a partition function) can be specified so that messages are written to a particular partition:
 * <p>
 * 1. If the message is sent without a key, Kafka picks a partition at random
 * </p>
 * <p>
 * 2. Otherwise, if a partition function is specified (via partitioner.class), that function takes the key as its argument and decides the partition
 * </p>
 * <p>
 * 3. Otherwise, Kafka decides the partition as hash(key) % partitionNum
 * </p>
 * 
 * @author zsm
 * @date 2016-09-27 10:26:42
 * @version 1.0
 * @parameter
 * @since
 * @return
 */
public class JProducer extends Thread {
    private Producer<String, String> producer;
    private String topic;
    private final int SLEEP = 10;
    private final int msgNum = 1000;

    public JProducer(String topic) {
        Properties props = new Properties();
        props.put("metadata.broker.list", KafkaProperties.BROKER_LIST);// e.g. 192.168.6.127:9092,192.168.6.128:9092
        // request.required.acks
        // 0, which means that the producer never waits for an acknowledgement from the broker (the same behavior as 0.7). This option provides the lowest latency but the weakest durability guarantees
        // (some data will be lost when a server fails).
        // 1, which means that the producer gets an acknowledgement after the leader replica has received the data. This option provides better durability as the client waits until the server
        // acknowledges the request as successful (only messages that were written to the now-dead leader but not yet replicated will be lost).
        // -1, which means that the producer gets an acknowledgement after all in-sync replicas have received the data. This option provides the best durability, we guarantee that no messages will be
        // lost as long as at least one in sync replica remains.
        props.put("request.required.acks", "-1");
        // serializer class for the message value
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        // serializer class for the message key
        props.put("key.serializer.class", "kafka.serializer.StringEncoder");
        // custom partition function that routes messages to partitions; if not set, Kafka uses hash(messageKey) % partitionNum
        props.put("partitioner.class", "com.zsm.kfkdemo.MyPartitioner");
        producer = new Producer<String, String>(new ProducerConfig(props));
        this.topic = topic;
    }

    @Override
    public void run() {
        boolean isBatchWriteMode = true;
        System.out.println("isBatchWriteMode: " + isBatchWriteMode);
        if (isBatchWriteMode) {
            // batched sending
            int batchSize = 100;
            List<KeyedMessage<String, String>> msgList = new ArrayList<KeyedMessage<String, String>>(batchSize);
            for (int i = 0; i < msgNum; i++) {
                String msg = "Message_" + i;
                msgList.add(new KeyedMessage<String, String>(topic, i + "", msg));
                // msgList.add(new KeyedMessage<String, String>(topic, msg));// no key specified, Kafka picks a partition automatically
                if (i % batchSize == 0) {
                    producer.send(msgList);
                    System.out.println("Send->[" + msgList + "]");
                    msgList.clear();
                    try {
                        sleep(SLEEP);
                    } catch (Exception ex) {
                        ex.printStackTrace();
                    }
                }
            }
            producer.send(msgList);
        } else {
            // sending one message at a time
            for (int i = 0; i < msgNum; i++) {
                KeyedMessage<String, String> msg = new KeyedMessage<String, String>(topic, i + "", "Message_" + i);
                // KeyedMessage<String, String> msg = new KeyedMessage<String, String>(topic, "Message_" + i);// no key specified, Kafka picks a partition automatically
                producer.send(msg);
                System.out.println("Send->[" + msg + "]");
                try {
                    sleep(SLEEP);
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
        }

        System.out.println("send done");
    }

    public static void main(String[] args) {
        JProducer pro = new JProducer(KafkaProperties.TOPIC);
        pro.start();
    }
}

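The partitioner.class referenced above, com.zsm.kfkdemo.MyPartitioner, is not shown in the original article. A hypothetical sketch of what such a class could look like with the old 0.8 producer API (my illustration, not the author's code) is:

package com.zsm.kfkdemo;

import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;

public class MyPartitioner implements Partitioner {

    // The old producer API requires a constructor that accepts VerifiableProperties.
    public MyPartitioner(VerifiableProperties props) {
    }

    // Route a message to a partition based on its key; here the numeric key
    // (JProducer uses i + "" as the key) is simply taken modulo the partition count.
    @Override
    public int partition(Object key, int numPartitions) {
        try {
            return Math.abs(Integer.parseInt(key.toString())) % numPartitions;
        } catch (NumberFormatException e) {
            return Math.abs(key.hashCode()) % numPartitions;
        }
    }
}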

3. Reading: (for the Consumer, pay attention to the auto.commit.enable and auto.offset.reset fields)



package com.zsm.kfkdemo;

import java.text.MessageFormat;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import com.zsm.kfkdemo.ConfigureAPI.KafkaProperties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;

/**
 * Multi-threaded consumption within the same consumer group can be implemented in two ways:
 * <p>
 * 1. Implement a single-threaded client and start several of them
 * </p>
 * <p>
 * 2. In the client's createMessageStreams, specify a thread count greater than 1 for the topic, then start one thread to process each stream
 * </p>
 * 
 * @author zsm
 * @date 2016-09-27 10:26:42
 * @version 1.0
 * @parameter
 * @since
 * @return
 */
public class JConsumer extends Thread {

    private ConsumerConnector consumer;
    private String topic;
    private final int SLEEP = 20;

    public JConsumer(String topic) {
        consumer = Consumer.createJavaConsumerConnector(this.consumerConfig());
        this.topic = topic;
    }

    private ConsumerConfig consumerConfig() {
        Properties props = new Properties();
        props.put("zookeeper.connect", KafkaProperties.ZK);
        props.put("group.id", KafkaProperties.GROUP_ID);
        props.put("auto.commit.enable", "true");// defaults to true: the consumer commits offsets periodically and ZooKeeper persists them; otherwise offsets stay in memory only, and after a failure consumption resumes from the last saved offset
        props.put("auto.commit.interval.ms", KafkaProperties.INTERVAL + "");// commit the offset once every INTERVAL milliseconds
        props.put("auto.offset.reset", "largest");// what to do when there is no initial offset in ZooKeeper or if an offset is out of range
        props.put("zookeeper.session.timeout.ms", KafkaProperties.TIMEOUT + "");
        props.put("zookeeper.sync.time.ms", "200");
        return new ConsumerConfig(props);
    }

    @Override
    public void run() {
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, new Integer(1));// thread count
        Map<String, List<KafkaStream<byte[], byte[]>>> streams = consumer.createMessageStreams(topicCountMap);
        KafkaStream<byte[], byte[]> stream = streams.get(topic).get(0);// if more than one thread was requested above, each stream needs its own thread doing the processing below

        ConsumerIterator<byte[], byte[]> it = stream.iterator();
        MessageAndMetadata<byte[], byte[]> messageAndMetaData = null;
        while (it.hasNext()) {
            messageAndMetaData = it.next();
            System.out.println(MessageFormat.format("Receive->[ message:{0} , key:{1} , partition:{2} , offset:{3} ]",
                    new String(messageAndMetaData.message()), new String(messageAndMetaData.key()),
                    messageAndMetaData.partition() + "", messageAndMetaData.offset() + ""));
            try {
                sleep(SLEEP);
            } catch (Exception ex) {
                ex.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {
        JConsumer con = new JConsumer(KafkaProperties.TOPIC);
        con.start();
    }
}


The Kafka-related Maven dependency:



        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.9.2</artifactId>
            <version>0.8.1.1</version>
            <exclusions>
                <exclusion>
                    <groupId>com.sun.jmx</groupId>
                    <artifactId>jmxri</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>com.sun.jdmk</groupId>
                    <artifactId>jmxtools</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>javax.jms</groupId>
                    <artifactId>jms</artifactId>
                </exclusion>
            </exclusions>
        </dependency>


 

}

1.1 Basic terminology

  • Broker

    A Kafka cluster contains one or more servers, which are called brokers

  • Topic

    Every message published to a Kafka cluster has a category, called its Topic. (Physically, messages of different Topics are stored separately; logically, although the messages of one Topic are stored on one or more brokers, users only need to specify a message's Topic to produce or consume data, without caring where the data is stored)

  • Partition

    A Partition is a physical concept; every Topic contains one or more Partitions. (Commonly set to the total number of CPU cores across the Kafka nodes)

  • Producer

    Responsible for publishing messages to the Kafka broker

  • Consumer

    The message consumer, i.e. the client that reads messages from the Kafka broker.

  • Consumer Group

    Each Consumer belongs to a specific Consumer Group (a group name can be specified for each Consumer; if no group name is specified, the Consumer belongs to the default group).

Updating the UI from a background thread comes with a few caveats:

1.2.1 Basic characteristics

  1. Scalable

    • Can be scaled out without taking the system offline
    • Data streams are partitioned and stored across multiple machines
  2. High performance

    • A single broker can serve thousands of clients
    • A single broker can read/write at several hundred megabytes per second
    • A cluster of multiple brokers reaches very high throughput
    • Performance is stable regardless of how much data there is
    • At the lower level Kafka abandons the Java heap cache in favour of the operating system's page cache, and turns random writes into sequential writes; combined with the zero-copy feature, this greatly improves I/O performance
  3. Durable storage

    • Stored on disk
    • Redundantly replicated to other servers to prevent loss
