高并发异步框架Akka使用(七)、分布式Cluster

在前面的章节我们提到服务的单点以及吞吐量问题,为此我们需要引入分布式 Cluster。
在此之前我们需要做一些配置


actor { provider = "akka.cluster.ClusterActorRefProvider" serializers { kryo = "com.twitter.chill.akka.AkkaSerializer" proto = "akka.remote.serialization.ProtobufSerializer" } serialization-bindings { #"scala.Product" = kryo "com.google.protobuf.Message" = proto "java.io.Serializable" = kryo } } remote { log-remote-lifecycle-events = off netty.tcp { hostname = "127.0.0.1" port = 8000 } } cluster { seed-nodes = [ "akka.tcp://AkkaTaskProcessing@127.0.0.1:8000", "akka.tcp://AkkaTaskProcessing@127.0.0.1:8001"] # Disable legacy metrics in akka-cluster. metrics.enabled=off }

我们启动两个服务,分别开启8000和8001端口。第一个配置8000,另一个服务中的port配置为8001。
我们来看一下代码

public class SimpleClusterListenerActor extends UntypedAbstractActor {
   Cluster cluster = Cluster.get(getContext().system());
    @Override
    public void preStart() {
              cluster.subscribe(getSelf(), ClusterEvent.initialStateAsEvents(), MemberEvent.class, UnreachableMember.class);      
    }

    @Override
    public void postStop() {
        cluster.unsubscribe(getSelf());
    }

    @Override
    public void onReceive(Object message) {
        if (message instanceof MemberUp) {
            MemberUp mUp = (MemberUp) message;
            log.info("上线: {}", mUp.member());

        } else if (message instanceof UnreachableMember) {
            UnreachableMember mUnreachable = (UnreachableMember) message;
            log.info("检测不到: {}", mUnreachable.member());

        } else if (message instanceof MemberRemoved) {
            MemberRemoved mRemoved = (MemberRemoved) message;
            log.info("移除: {}", mRemoved.member());

        } else if (message instanceof MemberEvent) {

        } else {
            unhandled(message);
        }

    }
}

我们还是集成actor,实现onReceive方法。我们在启动时开启集群消息订阅,结束时关闭订阅。
同时在onReceive方法中判断消息类型,不同的消息这里仅给出打印日志,具体日志具体处理。
通过实验,我们会发现启动一台服务后,在启动另一台,会打印出上线信息,关闭其中一台,也会发现打印下线信息。而且还会自动进行负载均衡。

总结

至此,Akka的系列笔记结束了,akka的强大功能其远远不止笔记上说的这些内容,实际应用还是需要多探索,多学习。