Getting Started with ActiveMQ

1. Download ActiveMQ

Download it from the official website: http://activemq.apache.org/

This article uses CentOS 6.5 64-bit and redis 3.2.8.

2. Run ActiveMQ

Unzip apache-activemq-5.5.1-bin.zip, then double-click apache-activemq-5.5.1\bin\activemq.bat to start the ActiveMQ program.

After ActiveMQ starts, log in at http://localhost:8161/admin/ and create a Queue named FirstQueue.

I. redis-cluster architecture

Figure 1: redis-cluster architecture diagram

Cluster communication: all redis nodes are connected to each other through a PING-PONG mechanism, and internally they use a binary protocol to optimize transfer speed and bandwidth.

Cluster data storage (hash slots):

(1) redis-cluster maps all physical nodes onto the slots [0-16383]; the cluster maintains the node <-> slot <-> value mapping.
(2) A Redis cluster has 16384 hash slots. When a key-value pair is placed in the cluster, redis first runs the CRC16 algorithm on the key and takes the result modulo 16384, so every key maps to a hash slot numbered 0-16383; redis then distributes the hash slots roughly evenly across the nodes.

An example of deciding which redis server a key is stored on:

Suppose a cluster of three redis servers: redis Server1 (slots 0-5000), redis Server2 (slots 5001-10000), redis Server3 (slots 10001-16383).

Storing data: set key A

Which node does this entry land on?

Compute the key's hash slot; say the result is 98. Slot 98 belongs to redis Server1, so the entry is stored on redis Server1.
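The slot computation described above can be reproduced in a few lines. Below is a sketch of the CRC16 (XModem) checksum that redis-cluster uses for keys, followed by the modulo step; hash tags ({...}) are ignored for simplicity, so this is illustrative rather than a complete reimplementation of CLUSTER KEYSLOT:

```python
def crc16(data: bytes) -> int:
    """CRC16-CCITT (XModem variant), the checksum redis-cluster applies to keys."""
    crc = 0
    for byte in data:
        crc ^= byte << 8
        for _ in range(8):
            if crc & 0x8000:
                crc = ((crc << 1) ^ 0x1021) & 0xFFFF
            else:
                crc = (crc << 1) & 0xFFFF
    return crc

def keyslot(key: str) -> int:
    """Map a key to one of the 16384 hash slots (hash tags not handled)."""
    return crc16(key.encode()) % 16384
```

For example, keyslot("age") evaluates to 741, matching the slot redis reports for that key in the cluster test later in this article.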

How the nodes decide that a node is down: fault tolerance

Figure 2

Deciding whether a node is down: a node (the brown node in the figure) sends a PING command to the red node but gets no PONG back, so it suspects the red node is down and broadcasts that suspicion to the other nodes; those nodes in turn send PING commands to the red node, and if they get no PONG they also consider it down. Once more than half of the master nodes have failed to reach that master within cluster-node-timeout, the master is considered down.

When is the whole cluster unavailable (cluster_state:fail)?

  a: If any master goes down and has no slave, the cluster enters the fail state. Put another way, the cluster fails once the slot mapping [0-16383] is no longer complete, which is why each master node is normally given a slave. ps: redis-3.0.0.rc1 added the cluster-require-full-coverage parameter; setting it to no lets the cluster keep serving the slots that are still covered.

  b: If more than half of the masters go down, the cluster enters the fail state whether or not they have slaves.

  ps: While the cluster is unavailable, every operation against it fails with an ((error) CLUSTERDOWN The cluster is down) error.
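The two rules above can be condensed into a small decision function. This is an illustrative sketch (the names are mine, not a redis API), describing each master as an (alive, has_live_slave) pair and assuming cluster-require-full-coverage is left at its default:

```python
def cluster_state(masters):
    """Return "ok" or "fail" for a list of (alive, has_live_slave) masters."""
    down = [(alive, slave) for alive, slave in masters if not alive]
    # rule b: a majority of masters down -> fail, slaves or not
    if 2 * len(down) > len(masters):
        return "fail"
    # rule a: a down master with no slave leaves part of [0-16383] uncovered
    if any(not slave for _, slave in down):
        return "fail"
    return "ok"
```

For instance, one master down with a slave to promote keeps the cluster up, while one master down without a slave, or two of three masters down, brings it into the fail state.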

3. Create the Eclipse project and run it

Create project ActiveMQ-5.5 and import the jar files needed from apache-activemq-5.5.1\lib; the project structure should look like the figure below:

Figure 3

II. Building a redis cluster

The cluster will have three nodes, each with one master and one slave, which would normally take 6 virtual machines.

Here we build a pseudo-distributed cluster instead, simulating it with 6 redis instances.

1. The cluster tooling needs a ruby environment:

Install ruby:
yum install ruby
yum install rubygems

2. Check that the cluster management tool redis-trib.rb exists

[root@localhost src]# pwd
/tools/redis-3.2.8/src
[root@localhost src]# ll *.rb
-rwxrwxr-x. 1 root root 60852 Feb 12 23:14 redis-trib.rb
[root@localhost src]# 

3. Install the gem that the redis-trib.rb script depends on: redis-3.2.2.gem

   Install the ruby gem: gem install redis-3.2.2.gem

4. Create 6 redis instances

[root@localhost local]# pwd
/usr/local
[root@localhost local]# mkdir redis-cluster
[root@localhost local]# cp -r redis redis-cluster/redis01  // copy the standalone redis instance installed earlier

 // delete the copied dump.rdb file

  [root@localhost bin]# pwd
  /usr/local/redis-cluster/redis01/bin

 [root@localhost bin]# rm -rf dump.rdb

// edit the config file: port and cluster-enabled

# Accept connections on the specified port, default is 6379 (IANA #815344).
# If port 0 is specified Redis will not listen on a TCP socket.
port 7001

################################ REDIS CLUSTER  ###############################
#
# ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
# WARNING EXPERIMENTAL: Redis Cluster is considered to be stable code, however
# in order to mark it as "mature" we need to wait for a non trivial percentage
# of users to deploy it in production.
# ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
#
# Normal Redis instances can't be part of a Redis Cluster; only nodes that are
# started as cluster nodes can. In order to start a Redis instance as a
# cluster node enable the cluster support uncommenting the following:
#
 cluster-enabled yes  // enable cluster mode

Copy redis01 five times:

[root@localhost redis-cluster]# cp -r redis01/ redis02
[root@localhost redis-cluster]# cp -r redis01/ redis03
[root@localhost redis-cluster]# cp -r redis01/ redis04
[root@localhost redis-cluster]# cp -r redis01/ redis05
[root@localhost redis-cluster]# cp -r redis01/ redis06

Change their ports: redis02:7002, redis03:7003, redis04:7004, redis05:7005, redis06:7006.
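Editing six copies of redis.conf by hand is easy to get wrong. Here is a small hypothetical helper (not part of redis) that rewrites the port directive in a config body; it could be applied per instance, redis01 -> 7001 through redis06 -> 7006:

```python
import re

def set_port(conf_text: str, port: int) -> str:
    """Replace the `port` directive in a redis.conf body with the given port."""
    return re.sub(r"(?m)^port \d+$", f"port {port}", conf_text)
```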

5. Copy the cluster's ruby script redis-trib.rb to /usr/local/redis-cluster for convenience

  [root@localhost src]# pwd
  /tools/redis-3.2.8/src
  [root@localhost src]# cp *.rb /usr/local/redis-cluster

6. Start the 6 redis instances with a script

[root@localhost redis-cluster]# vim startRedis.sh

cd redis01/bin
./redis-server ../etc/redis.conf
cd ../../
cd redis02/bin
./redis-server ../etc/redis.conf
cd ../../
cd redis03/bin
./redis-server ../etc/redis.conf
cd ../../
cd redis04/bin
./redis-server ../etc/redis.conf
cd ../../
cd redis05/bin
./redis-server ../etc/redis.conf
cd ../../
cd redis06/bin
./redis-server ../etc/redis.conf
cd ../../

Make the file executable: [root@localhost redis-cluster]# chmod +x startRedis.sh

7. Run the script and check that all 6 redis instances started

[root@localhost redis-cluster]# ./startRedis.sh 
[root@localhost redis-cluster]# ps -ef|grep redis
root      1634     1  0 17:11 ?        00:00:00 ./redis-server 0.0.0.0:7001 [cluster]
root      1637     1  0 17:11 ?        00:00:00 ./redis-server 0.0.0.0:7003 [cluster]
root      1644     1  0 17:11 ?        00:00:00 ./redis-server 0.0.0.0:7005 [cluster]
root      1939     1  0 17:15 ?        00:00:00 ./redis-server 0.0.0.0:7002 [cluster]
root      1943     1  0 17:15 ?        00:00:00 ./redis-server 0.0.0.0:7004 [cluster]
root      1951     1  0 17:15 ?        00:00:00 ./redis-server 0.0.0.0:7006 [cluster]
root      1955  1463  0 17:15 pts/0    00:00:00 grep redis

8. Create the cluster


[root@localhost redis-cluster]# ./redis-trib.rb create --replicas 1 192.168.6.190:7001 192.168.6.190:7002 192.168.6.190:7003 192.168.6.190:7004 192.168.6.190:7005 192.168.6.190:7006
>>> Creating cluster
>>> Performing hash slots allocation on 6 nodes...
Using 3 masters:
192.168.6.190:7001
192.168.6.190:7002
192.168.6.190:7003
Adding replica 192.168.6.190:7004 to 192.168.6.190:7001
Adding replica 192.168.6.190:7005 to 192.168.6.190:7002
Adding replica 192.168.6.190:7006 to 192.168.6.190:7003
M: 6e8c35d55d3699afe0b10317cf9c199e7df1ae50 192.168.6.190:7001
   slots:0-5460 (5461 slots) master     // slot allocation
M: 4cc06e1060369a16f70d4f8097c065bf886162a3 192.168.6.190:7002
   slots:5461-10922 (5462 slots) master
M: 532355022ea3473834726e7512270025b7eddae3 192.168.6.190:7003
   slots:10923-16383 (5461 slots) master
S: de789ed503705e7a5a2e5fc41adb7ed3290de08c 192.168.6.190:7004
   replicates 6e8c35d55d3699afe0b10317cf9c199e7df1ae50
S: c1261800c3a7e74ff3b2fda22b07ef994f7db4a7 192.168.6.190:7005
   replicates 4cc06e1060369a16f70d4f8097c065bf886162a3
S: 07910c0d839e4fcd6139598edd4953fcd32adb29 192.168.6.190:7006
   replicates 532355022ea3473834726e7512270025b7eddae3
Can I set the above configuration? (type 'yes' to accept): yes   // type yes to create the cluster
>>> Nodes configuration updated
>>> Assign a different config epoch to each node
>>> Sending CLUSTER MEET messages to join the cluster
Waiting for the cluster to join....
>>> Performing Cluster Check (using node 192.168.6.190:7001)
M: 6e8c35d55d3699afe0b10317cf9c199e7df1ae50 192.168.6.190:7001   // M: master  S: slave
   slots:0-5460 (5461 slots) master
   1 additional replica(s)
M: 532355022ea3473834726e7512270025b7eddae3 192.168.6.190:7003
   slots:10923-16383 (5461 slots) master
   1 additional replica(s)
S: de789ed503705e7a5a2e5fc41adb7ed3290de08c 192.168.6.190:7004
   slots: (0 slots) slave
   replicates 6e8c35d55d3699afe0b10317cf9c199e7df1ae50
S: c1261800c3a7e74ff3b2fda22b07ef994f7db4a7 192.168.6.190:7005
   slots: (0 slots) slave
   replicates 4cc06e1060369a16f70d4f8097c065bf886162a3
M: 4cc06e1060369a16f70d4f8097c065bf886162a3 192.168.6.190:7002
   slots:5461-10922 (5462 slots) master
   1 additional replica(s)
S: 07910c0d839e4fcd6139598edd4953fcd32adb29 192.168.6.190:7006
   slots: (0 slots) slave
   replicates 532355022ea3473834726e7512270025b7eddae3
[OK] All nodes agree about slots configuration.
>>> Check for open slots...
>>> Check slots coverage...
[OK] All 16384 slots covered.

9. Test the cluster

Connect a client to the cluster through any one of the 6 redis instances:


[root@localhost redis-cluster]# redis02/bin/redis-cli -h 192.168.6.190 -p 7002 -c
192.168.6.190:7002> set age 100
-> Redirected to slot [741] located at 192.168.6.190:7001  
OK

Here we connected to the cluster through the 7002 server. The hash slot of age is 741, so the entry was stored on node 7001, because that node's slot range is (slots:0-5460).
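The redirect can be cross-checked against the slot allocation that redis-trib.rb printed. Below is a sketch that maps a slot number to the master that owns it, using the ranges from the output above:

```python
SLOT_RANGES = {  # taken from the redis-trib.rb output above
    "192.168.6.190:7001": range(0, 5461),       # slots 0-5460
    "192.168.6.190:7002": range(5461, 10923),   # slots 5461-10922
    "192.168.6.190:7003": range(10923, 16384),  # slots 10923-16383
}

def master_for_slot(slot: int) -> str:
    """Return the master node that serves the given hash slot."""
    for node, slots in SLOT_RANGES.items():
        if slot in slots:
            return node
    raise ValueError(f"slot {slot} not covered by any master")
```

Slot 741 falls in range(0, 5461), which is exactly why the client was redirected to 192.168.6.190:7001.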

10. Shut down the cluster

Create a script file redisShutdown.sh:

redis01/bin/redis-cli -p 7001 shutdown
redis02/bin/redis-cli -p 7002 shutdown
redis03/bin/redis-cli -p 7003 shutdown
redis04/bin/redis-cli -p 7004 shutdown
redis05/bin/redis-cli -p 7005 shutdown
redis06/bin/redis-cli -p 7006 shutdown

That completes the cluster setup.
ps: a standalone redis has 16 databases by default; in a cluster there is only a single database, db 0.

 

3.1.Sender.java


package com.xuwei.activemq;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class Sender {
    private static final int SEND_NUMBER = 5;

    public static void main(String[] args) {
        // ConnectionFactory: JMS uses it to create connections
        ConnectionFactory connectionFactory;
        // Connection: the JMS client's connection to the JMS provider
        Connection connection = null;
        // Session: a thread that sends or receives messages
        Session session;
        // Destination: where the message is sent
        Destination destination;
        // MessageProducer: the message sender
        MessageProducer producer;
        // Build the ConnectionFactory with ActiveMQ's implementation jar
        connectionFactory = new ActiveMQConnectionFactory(
                ActiveMQConnection.DEFAULT_USER,
                ActiveMQConnection.DEFAULT_PASSWORD,
                "tcp://localhost:61616");
        try {
            // Get a connection from the factory
            connection = connectionFactory.createConnection();
            // Start it
            connection.start();
            // Open a transacted session
            session = connection.createSession(Boolean.TRUE,
                    Session.AUTO_ACKNOWLEDGE);
            // FirstQueue is the queue created in the ActiveMQ console
            destination = session.createQueue("FirstQueue");
            // Get the message producer (sender)
            producer = session.createProducer(destination);
            // Non-persistent delivery; for learning only, choose per project
            producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
            // Build the messages; hard-coded here, parameterize in a real project
            sendMessage(session, producer);
            session.commit();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                if (null != connection)
                    connection.close();
            } catch (Throwable ignore) {
            }
        }
    }

    public static void sendMessage(Session session, MessageProducer producer)
            throws Exception {
        for (int i = 1; i <= SEND_NUMBER; i++) {
            TextMessage message = session
                    .createTextMessage("ActiveMq message " + i);
            // Send the message to the destination
            System.out.println("Sending: " + "ActiveMq message " + i);
            producer.send(message);
        }
    }
}


3.2.Receiver.java


package com.xuwei.activemq;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class Receiver {
    public static void main(String[] args) {
        // ConnectionFactory: JMS uses it to create connections
        ConnectionFactory connectionFactory;
        // Connection: the JMS client's connection to the JMS provider
        Connection connection = null;
        // Session: a thread that sends or receives messages
        Session session;
        // Destination: where the message is sent
        Destination destination;
        // MessageConsumer: the message receiver
        MessageConsumer consumer;
        connectionFactory = new ActiveMQConnectionFactory(
                ActiveMQConnection.DEFAULT_USER,
                ActiveMQConnection.DEFAULT_PASSWORD,
                "tcp://localhost:61616");
        try {
            // Get a connection from the factory
            connection = connectionFactory.createConnection();
            // Start it
            connection.start();
            // Open a non-transacted session
            session = connection.createSession(Boolean.FALSE,
                    Session.AUTO_ACKNOWLEDGE);
            // FirstQueue is the queue created in the ActiveMQ console
            destination = session.createQueue("FirstQueue");
            consumer = session.createConsumer(destination);
            while (true) {
                // How long the receiver waits for a message; 100s here for testing
                TextMessage message = (TextMessage) consumer.receive(100000);
                if (null != message) {
                    System.out.println("Received: " + message.getText());
                } else {
                    break;
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                if (null != connection)
                    connection.close();
            } catch (Throwable ignore) {
            }
        }
    }
}


4. Notes

  1. For the final test, run the receiver and the sender on different machines.
  2. Take the jars the project references from ActiveMQ's lib directory, so that no version conflicts occur.

5. Testing

Because everything is tested on a single machine, open two instances of eclipse, each with its own workspace. Run Receiver in eclipse1 and Sender in eclipse2.

After Receiver starts in eclipse1, its console shows nothing; after Sender runs in eclipse2, the eclipse2 console prints:

Sending: ActiveMq message 1
Sending: ActiveMq message 2
Sending: ActiveMq message 3
Sending: ActiveMq message 4
Sending: ActiveMq message 5

Back in eclipse1, the console now shows:

Received: ActiveMq message 1
Received: ActiveMq message 2
Received: ActiveMq message 3
Received: ActiveMq message 4
Received: ActiveMq message 5

PS: 2012-2-27

Today I noticed that the test does not need two eclipses after all: one eclipse can run several programs at once, each with its own console. In Receiver.java above, set a longer timeout, for example receive(500000), as in the line below:

TextMessage message = (TextMessage) consumer.receive(500000);

Running Receiver.java then keeps it alive for 500 seconds, and in eclipse you can see:

Figure 8

Click the red square to stop the program manually.

Once Receiver is running, run Sender; when Sender finishes, switch to Receiver's console, as shown below:

Figure 9

 
