Akka(10): 分布式运算:集群-Cluster【澳门美高梅手机网站】

 
 Akka-Cluster得以在一部物理机或一组网络连接的服务器上搭建计划。用Akka开发同一版本的分布式程序可以在其它硬件环境中运行,那样大家就可以规定以Akka分布式程序作为规范的编程格局了。

品牌机一般预装了正版Windows(单买需求¥888),甚至还有正版Office(单买需求¥749),价值不菲。但也预装了一些“流氓”软件,导致开机后半天无法动,所以自己重装很有需要。或者换固态硬盘了、中毒了,也亟需重装。那时候怎么样保险预装的正版Win10和Office2016不丢啊?

 
 在上头两篇切磋里大家介绍了Akka-Remoting。Akka-Remoting其实是一种ActorSystem之间Actor对Actor点对点的联系协商。通过Akka-Remoting来贯彻一个ActorSystem中的一个Actor与另一个Actorsystem中的另一个Actor之间的牵连。在Remoting成效之后,Akka又升高了集群Cluster功用。Akka-Cluster是按照Akka-Remoting之上的新一代分布式运算环境,所以Remoting已经变为了Akka-Cluster的内部帮忙成效,在生育条件中的分布式运算应该尽量使用Akka-Cluster。当然,人们仍是可以在念书和测试环境中利用Akka-Remoting来询问Akka的分布式运算机制和原理。Remoting和Cluster的明明分别之一就是的确完结了Actor的地方透明化。让编程职员可以更自在自然的贯彻分布式编程。当然,更首要的是对峙Akka-Remoting而言,Akka-Cluster提供了一整套更安全、更连忙的分布式运算环境。

答案就是:在重装从前,查出预装的正版种类号,然后下载到相同版本的连串和软件,重装,输入系列号即可(半数以上情况不用输入)。

概括的话Akka-Cluster将三个JVM连接整合起来,完毕新闻地址的透明化和统一化使用管理,集成一体化的消息使得系统。最后目的是可以把一个重型程序分割成多少个子程序,然后安排到很多JVM上去完毕程序的分布式并行运算。更要紧的是:Cluster的构建进程与Actor编程没有牵涉,当Cluster把多少个ActorSystem集合成一个联合系统后,我们得以用在单一ActorSystem里编程的习惯格局编写分布式运算程序。由于在单一机器上就足以配备七个节点形成一个集群,我们开发的分布式程序可以在单机或多机群上运行,差别的只是什么陈设和配置集群环境。

查看系统版本

右键点击初始菜单——系统,能看出系统版本。可以观望此电脑预装的是64位的“Windows
10 家庭汉语版”,那也是绝一大半台式机预装的本子。

2016-11-13.png

俺们第一来认识一些有关Akka-Cluster的底蕴概念:

查看Win10序列号

本机的Win10的种类号很简单查出来,按“Win”+
“R”,运行powershell,然后实施以下命令:

(Get-WmiObject -query 'select * from SoftwareLicensingService').OA3xOriginalProductKey

win10 命令查看体系号

把下边查到的队列号记下来(用手机拍下来即可),比如“QWERT-ASDFG-ZXCVB-QWERT-ASDFG”。

Node:集群节点,也得以说是意味一个单独的ActorSystem,用hostname:port来代表。一部物理机械上可以构建多少个集群节点Node,那时它们有着同样的hostname和不一致的port,在差别机器上的Node则足以行使分化的hostname和千篇一律的port。

查看Office版本

打开控制面板——程序和功力,能来看预装的是Office
365,而统计机上的贴纸明明写着是“Office
2016家园与和学生版”,没提到,激活将来就变了。

控制面板-程序和效益 里面能来看预装的office

Cluster:由八个节点Node作为集群成员通过一种集群社团协议形成集群的一个完全。

联网激活Office

在桌面右键新建一个Word文档,打开相会到如下界面:

激活office

激活office-向某个帐户添加此密钥

把这些密钥记下来以防万一。然后继续“联机兑换”。

联机兑换-1、登录

报到你的微软帐户,如果没有的话,一定注册一个(打不开Gmail的景色下,微软信箱是个科学的选择)。

一头兑换-2、确认帐户设置

同步兑换-3、获取office

同台兑换-3、office已未雨绸缪妥当

在网站上兑换完结,回到Office软件达成激活,输入刚才的微软帐户,然后要等待很久,等Office自动安装落成即可。

形成激活

激活office

分选你的产品

帐户已履新

关于word-激活后

此刻再刷新“程序和成效”,就会发现“Office 365”消失了,而产出了“Office
2016家庭和学生版”。通过这么些进程可以窥见,Office连串号已经绑定到村办的微软账号里,所以重装系统也不会丢了,刚才记的行列号只是以防万一,用不上了。

警告:
若是新买的微机却一筹莫展激活Office,表明或者是别人退货的,或者集团把这一个Office系列号抄了拿去卖了,请尽早联系店家。买电脑一定要在:京东自营、苏宁自营、亚马逊(亚马逊)自营、Tmall品牌旗舰店、京东品牌旗舰店、苏宁品牌旗舰店,不要在电脑城或京东第三方/天猫/天猫商城小店,否则出现那种软件纠纷很难维权。

Leader:集群中的某个成员节点Node。由Akka自动在集群成员中选定,负责集群成员生命周期状态的现实转换操作。

下载Win10创造启动U盘,重装系统

既然如此知道了预装的是64位Win10家中普通话版,到网上下载微软官方ISO镜像即可。微软官网(microsoft.com)免费提供Win10下载:

微软官网免费提供win10下载

明天电脑CPU都是64位的了,提出系统也用64位的

Win10_1607_China_GGK_Chinese(Simplified)_x64.iso

系统ISO下载完毕,再下载rufus,然后插上U盘,用rufus做成启动U盘。PS:没有U盘的话,花几十块买一个就行了,当心名牌的低劣产品,参考:《最慢的U盘排名榜,请勿购买》

rufus

rufus选择ISO镜像

注意:挑选了ISO镜像之后,上边的选项会变,须要重新选成“用于UEFI统计机的GPT分区方案”。

1.jpg

U盘做好未来,重启电脑,使用急忙键或者修改bios,从U盘启动,一步步重装即可。

开机按bios快捷键,选择从U盘启动

Win10装置界面

Win10装置界面,能够看出驱动器0是新装的固态硬盘,而使得器1里面是出厂带的系统

Win10设置界面,把老系统的分区全体删减,把系统装在固态硬盘上

Win10设置快已毕了

Win10装置到位,进入了桌面

出于主板里集成了正版音讯,一大半场馆下不会冒出输入连串号的界面,重装完毕就会发现早已激活了。刚才记下的Win10连串号用不上了,只是以防万一。

Seed-Node:由一或多个集群中的节点组成。一个节点在加盟集群在此以前先向所有用户指定的Seed-Node发出联系新闻,然后向第一答复的Seed-Node发出插足集群请求。Seed-Node的首要意义是为申请插手集群的节点提供切实的联系地址,毕竟申请参与的节点要求一个现实的地址来发送申请参与音讯,从那个地点来说:Seed-Node可以是集群中其余已知地址的节点。

过来正版Office

重装完系统,打开Office的设置网页setup.office.com,登录你的微软帐号,别点“下一步”,而是点“从Office.com/MyAccount安装”。

setup.office.com 点“安装”

我的Office帐户

默许是32位的,倘诺想装64位的,就点“语言和安装选项——其他装置选项——Office的其余版本”。那是联网下载安装,网速快的话不到1个时辰就好了,借使嫌慢或者想留着下次重装时用,能够下载脱机安装程序(地点在:语言和装置选项——其余设置选项)。安装好,登录你的微软账号,即可激活。

语言和安装选项——点“其余设置选项”

其它设置选项——点“下载脱机安装程序”

微软正是够了,装软件还必要牢记网址……还须求点很多隐身的链接……

当您有耐心看完这么长的作品,推测也发现了Windows这种单机操作系统实在太不易用了,而手机上的iOS/Android互联网操作系统就很粗略。个人提出:考虑给父母长辈买电脑时,假若需求是看视频电视剧、聊微信QQ、玩斗地主等小游戏,则不用买电脑了,买iPad即可(苹果官网9.7英寸最新款不到3000元),简单易用,永不中毒。

Node-Lifecycle-State:一个节点的生命周期里蕴涵以下多少个情景转换:

Joining->Up,Leaving->Exiting,Exiting->Removed,Unreachable->Up,Unreachable->Down,Down->Removed

除此以外,Akka-Cluster通过沟通心跳信号(heart-beat
signal)情势可以监测任何节点是或不是处在不能联络Unreachable状态。

Membership:集群成员协会是透过Gossip沟通协商把多个节点组织起来形成的一个集群全体。

Membership-State:
集群状态,是一个集群内存有节点共享的数据结构,用于存放群内所有节点状态。集群状态是一种CRDT数据结构,提供安全便利的数额统一操作,方便逐步累加型数据统一更新。

Gossip-Protocal:是Node之间的调换协议。集群内的节点分邻里相互通过Gossip沟通更新集群状态数据,稳步扩散交换覆盖全体集群拥有节点并形成完整的集合集群状态数据。

Gossip-Convergence:集群统一情状。当Gossip互换覆盖了集群中有所节点,即具备节点都获得统一的集群状态,就直达集群统一景况Convergence。

Failure-Detector
fd:所有节点都具备心跳信号交流成效。集群中某个节点可能被五个节点用heartbeat检测在线是不是Reachable/Unreachable。假使集群中任何一个节点处于Unreachable状态则整个集群不可以达至Convergence状态。

Leader-Actions:当集群达到Convergence后系统活动选定一个Leader节点举行以上描述的节点状态转换操作。倘使集群内有节点处于Unreachable状态,无法达标集群Convergence,则不能满意任何节点状态转换请求。

在Akka-Cluster中一个节点参加集群是机关的,只要在配置文件里安装一个Seed-Node清单,否则就非得在Actor程序里用Cluster.join或Cluster.joinSeedNodes方法加人:

 /**
   * Try to join this cluster node with the node specified by 'address'.
   * A 'Join(selfAddress)' command is sent to the node to join.
   *
   * An actor system can only join a cluster once. Additional attempts will be ignored.
   * When it has successfully joined it must be restarted to be able to join another
   * cluster or to join the same cluster again.
   *
   * The name of the [[akka.actor.ActorSystem]] must be the same for all members of a
   * cluster.
   */
  def join(address: Address): Unit =
    clusterCore ! ClusterUserAction.JoinTo(fillLocal(address))

  /**
   * Join the specified seed nodes without defining them in config.
   * Especially useful from tests when Addresses are unknown before startup time.
   *
   * An actor system can only join a cluster once. Additional attempts will be ignored.
   * When it has successfully joined it must be restarted to be able to join another
   * cluster or to join the same cluster again.
   */
  def joinSeedNodes(seedNodes: immutable.Seq[Address]): Unit =
    clusterCore ! InternalClusterAction.JoinSeedNodes(seedNodes.toVector.map(fillLocal))

集群节点Leave和Down完毕方式如下: 

/**
    * Send command to issue state transition to LEAVING for the node specified by 'address'.
   * The member will go through the status changes [[MemberStatus]] `Leaving` (not published to
   * subscribers) followed by [[MemberStatus]] `Exiting` and finally [[MemberStatus]] `Removed`.
   *
   * Note that this command can be issued to any member in the cluster, not necessarily the
   * one that is leaving. The cluster extension, but not the actor system or JVM, of the
   * leaving member will be shutdown after the leader has changed status of the member to
   * Exiting. Thereafter the member will be removed from the cluster. Normally this is
   * handled automatically, but in case of network failures during this process it might
   * still be necessary to set the node’s status to Down in order to complete the removal.
   */
  def leave(address: Address): Unit =
    clusterCore ! ClusterUserAction.Leave(fillLocal(address))

  /**
   * Send command to DOWN the node specified by 'address'.
   *
   * When a member is considered by the failure detector to be unreachable the leader is not
   * allowed to perform its duties, such as changing status of new joining members to 'Up'.
   * The status of the unreachable member must be changed to 'Down', which can be done with
   * this method.
   */
  def down(address: Address): Unit =
    clusterCore ! ClusterUserAction.Down(fillLocal(address))

Akka-Cluster的集群节点状态转换可以作为事件在Akka的伊夫ntBus上揭穿:

  /**
   * Marker interface for membership events.
   * Published when the state change is first seen on a node.
   * The state change was performed by the leader when there was
   * convergence on the leader node, i.e. all members had seen previous
   * state.
   */
  sealed trait MemberEvent extends ClusterDomainEvent {
    def member: Member
  }

  /**
   * Member status changed to Joining.
   */
  final case class MemberJoined(member: Member) extends MemberEvent {
    if (member.status != Joining) throw new IllegalArgumentException("Expected Joining status, got: " + member)
  }

  /**
   * Member status changed to WeaklyUp.
   * A joining member can be moved to `WeaklyUp` if convergence
   * cannot be reached, i.e. there are unreachable nodes.
   * It will be moved to `Up` when convergence is reached.
   */
  final case class MemberWeaklyUp(member: Member) extends MemberEvent {
    if (member.status != WeaklyUp) throw new IllegalArgumentException("Expected WeaklyUp status, got: " + member)
  }

  /**
   * Member status changed to Up.
   */
  final case class MemberUp(member: Member) extends MemberEvent {
    if (member.status != Up) throw new IllegalArgumentException("Expected Up status, got: " + member)
  }

  /**
   * Member status changed to Leaving.
   */
  final case class MemberLeft(member: Member) extends MemberEvent {
    if (member.status != Leaving) throw new IllegalArgumentException("Expected Leaving status, got: " + member)
  }

  /**
   * Member status changed to `MemberStatus.Exiting` and will be removed
   * when all members have seen the `Exiting` status.
   */
  final case class MemberExited(member: Member) extends MemberEvent {
    if (member.status != Exiting) throw new IllegalArgumentException("Expected Exiting status, got: " + member)
  }

  /**
   * Member completely removed from the cluster.
   * When `previousStatus` is `MemberStatus.Down` the node was removed
   * after being detected as unreachable and downed.
   * When `previousStatus` is `MemberStatus.Exiting` the node was removed
   * after graceful leaving and exiting.
   */
  final case class MemberRemoved(member: Member, previousStatus: MemberStatus) extends MemberEvent {
    if (member.status != Removed) throw new IllegalArgumentException("Expected Removed status, got: " + member)
  }
  /**
   * Marker interface to facilitate subscription of
   * both [[UnreachableMember]] and [[ReachableMember]].
   */
  sealed trait ReachabilityEvent extends ClusterDomainEvent {
    def member: Member
  }

  /**
   * A member is considered as unreachable by the failure detector.
   */
  final case class UnreachableMember(member: Member) extends ReachabilityEvent

  /**
   * A member is considered as reachable by the failure detector
   * after having been unreachable.
   * @see [[UnreachableMember]]
   */
  final case class ReachableMember(member: Member) extends ReachabilityEvent

集群的眼前情况值是存放在底下CurrentClusterState结构里的: 

/**
   * Current snapshot state of the cluster. Sent to new subscriber.
   */
  final case class CurrentClusterState(
    members:       immutable.SortedSet[Member]  = immutable.SortedSet.empty,
    unreachable:   Set[Member]                  = Set.empty,
    seenBy:        Set[Address]                 = Set.empty,
    leader:        Option[Address]              = None,
    roleLeaderMap: Map[String, Option[Address]] = Map.empty) {

    /**
     * Java API: get current member list.
     */
    def getMembers: java.lang.Iterable[Member] = {
      import scala.collection.JavaConverters._
      members.asJava
    }

    /**
     * Java API: get current unreachable set.
     */
    def getUnreachable: java.util.Set[Member] =
      scala.collection.JavaConverters.setAsJavaSetConverter(unreachable).asJava

    /**
     * Java API: get current “seen-by” set.
     */
    def getSeenBy: java.util.Set[Address] =
      scala.collection.JavaConverters.setAsJavaSetConverter(seenBy).asJava

    /**
     * Java API: get address of current leader, or null if none
     */
    def getLeader: Address = leader orNull

    /**
     * All node roles in the cluster
     */
    def allRoles: Set[String] = roleLeaderMap.keySet

    /**
     * Java API: All node roles in the cluster
     */
    def getAllRoles: java.util.Set[String] =
      scala.collection.JavaConverters.setAsJavaSetConverter(allRoles).asJava

    /**
     * get address of current leader, if any, within the role set
     */
    def roleLeader(role: String): Option[Address] = roleLeaderMap.getOrElse(role, None)

    /**
     * Java API: get address of current leader within the role set,
     * or null if no node with that role
     */
    def getRoleLeader(role: String): Address = roleLeaderMap.get(role).flatten.orNull
  }

用户可以监听那么些事件的爆发:

cluster.subscribe(self, initialStateMode = InitialStateAsEvents,
  classOf[MemberEvent], classOf[UnreachableMember])

其它,大家仍能用callback形式在情景转换前后调用一些运算来展开准备处理和后来拍卖:

 /**
   * The supplied thunk will be run, once, when current cluster member is `Up`.
   * Typically used together with configuration option `akka.cluster.min-nr-of-members`
   * to defer some action, such as starting actors, until the cluster has reached
   * a certain size.
   */
  def registerOnMemberUp[T](code: ⇒ T): Unit =
    registerOnMemberUp(new Runnable { def run() = code })

  /**
   * Java API: The supplied callback will be run, once, when current cluster member is `Up`.
   * Typically used together with configuration option `akka.cluster.min-nr-of-members`
   * to defer some action, such as starting actors, until the cluster has reached
   * a certain size.
   */
  def registerOnMemberUp(callback: Runnable): Unit =
    clusterDaemons ! InternalClusterAction.AddOnMemberUpListener(callback)

  /**
   * The supplied thunk will be run, once, when current cluster member is `Removed`.
   * If the cluster has already been shutdown the thunk will run on the caller thread immediately.
   * Typically used together `cluster.leave(cluster.selfAddress)` and then `system.terminate()`.
   */
  def registerOnMemberRemoved[T](code: ⇒ T): Unit =
    registerOnMemberRemoved(new Runnable { override def run(): Unit = code })

  /**
   * Java API: The supplied thunk will be run, once, when current cluster member is `Removed`.
   * If the cluster has already been shutdown the thunk will run on the caller thread immediately.
   * Typically used together `cluster.leave(cluster.selfAddress)` and then `system.terminate()`.
   */
  def registerOnMemberRemoved(callback: Runnable): Unit = {
    if (_isTerminated.get())
      callback.run()
    else
      clusterDaemons ! InternalClusterAction.AddOnMemberRemovedListener(callback)
  }

下边大家就用个例证来示范Akka-Cluster的周转进度:

率先要求Akka-Cluster的dependency:build.sbt

name := "cluster-states-demo"

version := "1.0"

scalaVersion := "2.11.8"

libraryDependencies ++= {
  val akkaVersion = "2.5.3"
  Seq(
    "com.typesafe.akka"       %%  "akka-actor"   % akkaVersion,
    "com.typesafe.akka"       %%  "akka-cluster"   % akkaVersion
  )
}

下一场是骨干的配置:cluster.conf

akka {
  actor {
    provider = "cluster"
  }
  remote {
    log-remote-lifecycle-events = off
    netty.tcp {
      hostname = "127.0.0.1"
      port = 2551
    }
  }
  cluster {
    seed-nodes = [
      "akka.tcp://clusterSystem@127.0.0.1:2551"]
  }
}

上边是一个集群状态转换事件的监听Actor:

import akka.actor._
import akka.cluster.ClusterEvent._
import akka.cluster._
import com.typesafe.config.ConfigFactory

class EventLisener extends Actor with ActorLogging {
  val cluster = Cluster(context.system)
  override def preStart(): Unit = {
    cluster.subscribe(self,initialStateMode = InitialStateAsEvents
    ,classOf[MemberEvent],classOf[UnreachableMember])  //订阅集群状态转换信息
    super.preStart()
  }

  override def postStop(): Unit = {
    cluster.unsubscribe(self)    //取消订阅
    super.postStop()
  }

  override def receive: Receive = {
    case MemberJoined(member) =>
      log.info("Member is Joining: {}", member.address)
    case MemberUp(member) =>
      log.info("Member is Up: {}", member.address)
    case MemberLeft(member) =>
      log.info("Member is Leaving: {}", member.address)
    case MemberExited(member) =>
      log.info("Member is Exiting: {}", member.address)
    case MemberRemoved(member, previousStatus) =>
      log.info(
        "Member is Removed: {} after {}",
        member.address, previousStatus)
    case UnreachableMember(member) =>
      log.info("Member detected as unreachable: {}", member)
      cluster.down(member.address)      //手工驱除,不用auto-down
    case _: MemberEvent => // ignore
  }

} 

下边是伊夫(Eve)ntListener使用测试代码,扩充了Node加人集群后或者举行的中期设置及退出集群后的之后清理:

object ClusterEventsDemo {
  def main(args: Array[String]): Unit = {
    //重设port,seed-node-address
    val port =
      if (args.isEmpty) "0"
      else args(0)

    val addr =
      if (args.length < 2) "2551"
      else args(1)


    val seednodeSetting = "akka.cluster.seed-nodes = ["+
    "\"akka.tcp://clusterSystem@127.0.0.1:"+
      s"${addr}"+"\"]"


    val config = ConfigFactory.parseString(s"akka.remote.netty.tcp.port = ${port}")
      .withFallback(ConfigFactory.parseString(seednodeSetting))
      .withFallback(ConfigFactory.load("cluster.conf"))

    val clusterSystem = ActorSystem(name="clusterSystem",config=config)
    val eventListener = clusterSystem.actorOf(Props[EventLisener],"eventListener")

    val cluster = Cluster(clusterSystem)
    cluster.registerOnMemberRemoved(println("Leaving cluster. I should cleanup... "))
    cluster.registerOnMemberUp(println("Hookup to cluster. Do some setups ..."))
    println("actor system started!")
    scala.io.StdIn.readLine()

    clusterSystem.terminate()


  }
}

我们在多少个terminal上用sbt来测试运行:

1、run “2551” “2551”  
//这是个seed-node

[INFO] [06/26/2017 21:25:46.743] [clusterSystem-akka.actor.default-dispatcher-3] [akka.cluster.Cluster(akka://clusterSystem)] Cluster Node [akka.tcp://clusterSystem@127.0.0.1:2551] - Node [akka.tcp://clusterSystem@127.0.0.1:2551] is JOINING, roles []
[INFO] [06/26/2017 21:25:46.751] [clusterSystem-akka.actor.default-dispatcher-3] [akka.cluster.Cluster(akka://clusterSystem)] Cluster Node [akka.tcp://clusterSystem@127.0.0.1:2551] - Leader is moving node [akka.tcp://clusterSystem@127.0.0.1:2551] to [Up]
[INFO] [06/26/2017 21:25:46.755] [clusterSystem-akka.actor.default-dispatcher-16] [akka.tcp://clusterSystem@127.0.0.1:2551/user/eventListener] Member is Up: akka.tcp://clusterSystem@127.0.0.1:2551

2、run “0” “2551”    
 //port=0代表由系统活动选取端口

[INFO] [06/26/2017 21:26:57.467] [run-main-1e] [akka.cluster.Cluster(akka://clusterSystem)] Cluster Node [akka.tcp://clusterSystem@127.0.0.1:52459] - Started up successfully
actor system started!
[INFO] [06/26/2017 21:26:57.735] [clusterSystem-akka.actor.default-dispatcher-4] [akka.cluster.Cluster(akka://clusterSystem)] Cluster Node [akka.tcp://clusterSystem@127.0.0.1:52459] - Welcome from [akka.tcp://clusterSystem@127.0.0.1:2551]
[INFO] [06/26/2017 21:26:57.751] [clusterSystem-akka.actor.default-dispatcher-3] [akka.tcp://clusterSystem@127.0.0.1:52459/user/eventListener] Member is Up: akka.tcp://clusterSystem@127.0.0.1:2551
[INFO] [06/26/2017 21:26:57.752] [clusterSystem-akka.actor.default-dispatcher-3] [akka.tcp://clusterSystem@127.0.0.1:52459/user/eventListener] Member is Joining: akka.tcp://clusterSystem@127.0.0.1:52459
[INFO] [06/26/2017 21:26:57.809] [clusterSystem-akka.actor.default-dispatcher-16] [akka.tcp://clusterSystem@127.0.0.1:52459/user/eventListener] Member is Up: akka.tcp://clusterSystem@127.0.0.1:52459

3、run “0” “2551”    
 //port=0代表由系统活动选用端口

[INFO] [06/26/2017 21:28:22.577] [run-main-1] [akka.cluster.Cluster(akka://clusterSystem)] Cluster Node [akka.tcp://clusterSystem@127.0.0.1:52467] - Started up successfully
actor system started!
[INFO] [06/26/2017 21:28:22.736] [clusterSystem-akka.actor.default-dispatcher-2] [akka.cluster.Cluster(akka://clusterSystem)] Cluster Node [akka.tcp://clusterSystem@127.0.0.1:52467] - Welcome from [akka.tcp://clusterSystem@127.0.0.1:2551]
[INFO] [06/26/2017 21:28:22.747] [clusterSystem-akka.actor.default-dispatcher-16] [akka.tcp://clusterSystem@127.0.0.1:52467/user/eventListener] Member is Up: akka.tcp://clusterSystem@127.0.0.1:2551
[INFO] [06/26/2017 21:28:22.749] [clusterSystem-akka.actor.default-dispatcher-16] [akka.tcp://clusterSystem@127.0.0.1:52467/user/eventListener] Member is Up: akka.tcp://clusterSystem@127.0.0.1:52459
[INFO] [06/26/2017 21:28:22.749] [clusterSystem-akka.actor.default-dispatcher-16] [akka.tcp://clusterSystem@127.0.0.1:52467/user/eventListener] Member is Joining: akka.tcp://clusterSystem@127.0.0.1:52467
[INFO] [06/26/2017 21:28:24.611] [clusterSystem-akka.actor.default-dispatcher-22] [akka.tcp://clusterSystem@127.0.0.1:52467/user/eventListener] Member is Up: akka.tcp://clusterSystem@127.0.0.1:52467

在terminal2运算cluster.leave(cluster.selfAddress):

[INFO] [06/26/2017 22:40:47.614] [clusterSystem-akka.actor.default-dispatcher-4] [akka.tcp://clusterSystem@127.0.0.1:2551/user/eventListener] Member is Leaving: akka.tcp://clusterSystem@127.0.0.1:53986
[INFO] [06/26/2017 22:40:48.032] [clusterSystem-akka.actor.default-dispatcher-15] [akka.cluster.Cluster(akka://clusterSystem)] Cluster Node [akka.tcp://clusterSystem@127.0.0.1:2551] - Leader is moving node [akka.tcp://clusterSystem@127.0.0.1:53986] to [Exiting]
[INFO] [06/26/2017 22:40:48.032] [clusterSystem-akka.actor.default-dispatcher-21] [akka.tcp://clusterSystem@127.0.0.1:2551/user/eventListener] Member is Exiting: akka.tcp://clusterSystem@127.0.0.1:53986
[INFO] [06/26/2017 22:40:48.047] [clusterSystem-akka.actor.default-dispatcher-21] [akka.cluster.Cluster(akka://clusterSystem)] Cluster Node [akka.tcp://clusterSystem@127.0.0.1:2551] - Exiting confirmed [akka.tcp://clusterSystem@127.0.0.1:53986]
[INFO] [06/26/2017 22:40:49.033] [clusterSystem-akka.actor.default-dispatcher-15] [akka.cluster.Cluster(akka://clusterSystem)] Cluster Node [akka.tcp://clusterSystem@127.0.0.1:2551] - Leader is removing confirmed Exiting node [akka.tcp://clusterSystem@127.0.0.1:53986]
[INFO] [06/26/2017 22:40:49.033] [clusterSystem-akka.actor.default-dispatcher-21] [akka.tcp://clusterSystem@127.0.0.1:2551/user/eventListener] Member is Removed: akka.tcp://clusterSystem@127.0.0.1:53986 after Exiting

下边就是此次示范的源代码:

resources/cluster.conf

akka {
  actor {
    provider = "akka.cluster.ClusterActorRefProvider"
  }
  remote {
    log-remote-lifecycle-events = off
    netty.tcp {
      hostname = "127.0.0.1"
      port = 0
    }
  }
  cluster {
    seed-nodes = [
      "akka.tcp://clusterSystem@127.0.0.1:2551"]
  }
}

ClusterEventsDemo.scala

 

import akka.actor._
import akka.cluster.ClusterEvent._
import akka.cluster._
import com.typesafe.config.ConfigFactory

class EventLisener extends Actor with ActorLogging {
  val cluster = Cluster(context.system)
  override def preStart(): Unit = {
    cluster.subscribe(self,initialStateMode = InitialStateAsEvents
    ,classOf[MemberEvent],classOf[UnreachableMember])  //订阅集群状态转换信息
    super.preStart()
  }

  override def postStop(): Unit = {
    cluster.unsubscribe(self)    //取消订阅
    super.postStop()
  }

  override def receive: Receive = {
    case MemberJoined(member) =>
      log.info("Member is Joining: {}", member.address)
    case MemberUp(member) =>
      log.info("Member is Up: {}", member.address)
    case MemberLeft(member) =>
      log.info("Member is Leaving: {}", member.address)
    case MemberExited(member) =>
      log.info("Member is Exiting: {}", member.address)
    case MemberRemoved(member, previousStatus) =>
      log.info(
        "Member is Removed: {} after {}",
        member.address, previousStatus)
    case UnreachableMember(member) =>
      log.info("Member detected as unreachable: {}", member)
      cluster.down(member.address)      //手工驱除,不用auto-down
    case _: MemberEvent => // ignore
  }

}

object ClusterEventsDemo {
  def main(args: Array[String]): Unit = {
    //重设port,seed-node-address
    val port =
      if (args.isEmpty) "0"
      else args(0)

    val addr =
      if (args.length < 2) "2551"
      else args(1)


    val seednodeSetting = "akka.cluster.seed-nodes = ["+
    "\"akka.tcp://clusterSystem@127.0.0.1:"+
      s"${addr}"+"\"]"


    val config = ConfigFactory.parseString(s"akka.remote.netty.tcp.port = ${port}")
      .withFallback(ConfigFactory.parseString(seednodeSetting))
      .withFallback(ConfigFactory.load("cluster.conf"))

    val clusterSystem = ActorSystem(name="clusterSystem",config=config)
    val eventListener = clusterSystem.actorOf(Props[EventLisener],"eventListener")

    val cluster = Cluster(clusterSystem)
    cluster.registerOnMemberRemoved(println("Leaving cluster. I should cleanup... "))
    cluster.registerOnMemberUp(println("Hookup to cluster. Do some setups ..."))
    println("actor system started!")
    scala.io.StdIn.readLine()

    clusterSystem.terminate()


  }
}

 

 

 

 

 

 

 

 

 

 

发表评论

电子邮件地址不会被公开。 必填项已用*标注