在前面的章节我们提到服务的单点以及吞吐量问题,为此我们需要引入分布式 Cluster。
在此之前我们需要做一些配置
actor {
provider = "akka.cluster.ClusterActorRefProvider"
serializers {
kryo = "com.twitter.chill.akka.AkkaSerializer"
proto = "akka.remote.serialization.ProtobufSerializer"
}
serialization-bindings {
#"scala.Product" = kryo
"com.google.protobuf.Message" = proto
"java.io.Serializable" = kryo
}
}
remote {
log-remote-lifecycle-events = off
netty.tcp {
hostname = "127.0.0.1"
port = 8000
}
}
cluster {
seed-nodes = [
"akka.tcp://AkkaTaskProcessing@127.0.0.1:8000",
"akka.tcp://AkkaTaskProcessing@127.0.0.1:8001"]
# Disable legacy metrics in akka-cluster.
metrics.enabled=off
}
我们启动两个服务,分别开启8000和8001端口。第一个配置8000,另一个服务中的port配置为8001。
我们来看一下代码
public class SimpleClusterListenerActor extends UntypedAbstractActor {
Cluster cluster = Cluster.get(getContext().system());
@Override
public void preStart() {
cluster.subscribe(getSelf(), ClusterEvent.initialStateAsEvents(), MemberEvent.class, UnreachableMember.class);
}
@Override
public void postStop() {
cluster.unsubscribe(getSelf());
}
@Override
public void onReceive(Object message) {
if (message instanceof MemberUp) {
MemberUp mUp = (MemberUp) message;
log.info("上线: {}", mUp.member());
} else if (message instanceof UnreachableMember) {
UnreachableMember mUnreachable = (UnreachableMember) message;
log.info("检测不到: {}", mUnreachable.member());
} else if (message instanceof MemberRemoved) {
MemberRemoved mRemoved = (MemberRemoved) message;
log.info("移除: {}", mRemoved.member());
} else if (message instanceof MemberEvent) {
} else {
unhandled(message);
}
}
}
我们还是集成actor,实现onReceive方法。我们在启动时开启集群消息订阅,结束时关闭订阅。
同时在onReceive方法中判断消息类型,不同的消息这里仅给出打印日志,具体日志具体处理。
通过实验,我们会发现启动一台服务后,在启动另一台,会打印出上线信息,关闭其中一台,也会发现打印下线信息。而且还会自动进行负载均衡。
总结
至此,Akka的系列笔记结束了,akka的强大功能其远远不止笔记上说的这些内容,实际应用还是需要多探索,多学习。