高并发异步框架Akka使用(七)、分布式Cluster

在前面的章节我们提到服务的单点以及吞吐量问题,为此我们需要引入分布式 Cluster。
在此之前我们需要做一些配置


actor { provider = "akka.cluster.ClusterActorRefProvider" serializers { kryo = "com.twitter.chill.akka.AkkaSerializer" proto = "akka.remote.serialization.ProtobufSerializer" } serialization-bindings { #"scala.Product" = kryo "com.google.protobuf.Message" = proto "java.io.Serializable" = kryo } } remote { log-remote-lifecycle-events = off netty.tcp { hostname = "127.0.0.1" port = 8000 } } cluster { seed-nodes = [ "akka.tcp://AkkaTaskProcessing@127.0.0.1:8000", "akka.tcp://AkkaTaskProcessing@127.0.0.1:8001"] # Disable legacy metrics in akka-cluster. metrics.enabled=off }

我们启动两个服务,分别开启8000和8001端口。第一个配置8000,另一个服务中的port配置为8001。
我们来看一下代码

public class SimpleClusterListenerActor extends UntypedAbstractActor {
   Cluster cluster = Cluster.get(getContext().system());
    @Override
    public void preStart() {
              cluster.subscribe(getSelf(), ClusterEvent.initialStateAsEvents(), MemberEvent.class, UnreachableMember.class);      
    }

    @Override
    public void postStop() {
        cluster.unsubscribe(getSelf());
    }

    @Override
    public void onReceive(Object message) {
        if (message instanceof MemberUp) {
            MemberUp mUp = (MemberUp) message;
            log.info("上线: {}", mUp.member());

        } else if (message instanceof UnreachableMember) {
            UnreachableMember mUnreachable = (UnreachableMember) message;
            log.info("检测不到: {}", mUnreachable.member());

        } else if (message instanceof MemberRemoved) {
            MemberRemoved mRemoved = (MemberRemoved) message;
            log.info("移除: {}", mRemoved.member());

        } else if (message instanceof MemberEvent) {

        } else {
            unhandled(message);
        }

    }
}

我们还是集成actor,实现onReceive方法。我们在启动时开启集群消息订阅,结束时关闭订阅。
同时在onReceive方法中判断消息类型,不同的消息这里仅给出打印日志,具体日志具体处理。
通过实验,我们会发现启动一台服务后,在启动另一台,会打印出上线信息,关闭其中一台,也会发现打印下线信息。而且还会自动进行负载均衡。

总结

至此,Akka的系列笔记结束了,akka的强大功能其远远不止笔记上说的这些内容,实际应用还是需要多探索,多学习。

高并发异步框架Akka使用(六)、Persistent

这一节我们来聊一聊akka的持久化问题,首先讲一下为什么需要持久化。
假设有这样一个业务,用户在淘宝下订单,淘宝突然崩溃。当淘宝服务重启后,用户在此登录发现自己的订单数据全部没有了,这时候用户也很崩溃。
还是我们的工程例子,突然有一天,台风过境。工程项目被台风摧毁,包工头失联,工人死伤惨重,工地一片狼藉。台风走后,总得重建吧?可是怎么重建呢?所有的图纸,施工方案之前都没有保存。根本没办法做。大家都在想如果当时把这些都存档起来那该多好啊,这时候拿出来,按照之前的流程走一遍,一切搞定。这就是持久化的意义和重要性。
Akka中也有持久化,他包含两个概念

    快照 snapshot
    日志 Journal

两者的区别是快照是某一时刻的记录情况,有系统根据配置定期打印。而日志是我们控制的的顺序日志,我们可以从任意一种回复,具体的需要根据业务的实际情况分析。持久化可以存储到本地磁盘网络存储等等。
我们来看一个例子

public class TaskPersistActor extends AbstractPersistentActor {
    ......
          @Override
    public String persistenceId() {
        return "TaskPersistActor-" + id;
    }

    @Override
    public void preStart() throws Exception {
        mat = ActorMaterializer.create(system);
        queries = PersistenceQuery.get(system).getReadJournalFor(LeveldbReadJournal.class,
                LeveldbReadJournal.Identifier());
        source = queries.eventsByPersistenceId(persistenceId(), 0, Long.MAX_VALUE);
    }

    @Override
    public Receive createReceiveRecover() {
        return receiveBuilder().match(Task.class, evt -> {
             state.add(evt);
        }).match(SnapshotOffer.class, message -> {
            source.runForeach(i -> {
                Task task = (Task) ((EventEnvelope) i).event();
                          }, mat);
            state = (TaskState) message.snapshot();
            if (state != null && getNumEvents() > 0) {
                ConcurrentHashMap<String, Task> events = state.getEvents();
                Iterator<Entry<String, Task>> iter = events.entrySet().iterator();
                while (iter.hasNext()) {
                    Entry<String, Task> entry = (Entry<String, Task>) iter.next();
                    if (((Task) entry.getValue()).getCheckMap().isEmpty()) {
                        events.remove(entry.getKey());
                    } else {
                        handleTaskCmd((Task) entry.getValue(), false);
                    }

                }
            }
        }).match(RecoveryCompleted.class, message -> {
            // log.info("taskpersistActor-" + id + " 恢复完成");
        }).matchAny(message -> log.error("无法恢复事件!" + message.getClass().toString())).build();
    }

    @Override
    public Receive createReceive() {

        return receiveBuilder().match(Task.class, cmd -> {
            final Task task = cmd.clone();
            handleTaskCmd(task, true);
        }).match(CheckMessage.class, message -> {
            try {
                handleCheckMessage(message);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }).match(SnapshotTick.class, message -> {

            // 删除历史快照
            deleteSnapshot(snapshotSequenceNr());
            deleteSnapshot(lastSequenceNr());
            if (getNumEvents() > 0) {
                saveSnapshot(state.copy());
            }
        }).match(SaveSnapshotSuccess.class, message -> {
            // 删除历史快照
        }).match(SaveSnapshotFailure.class, message -> {
            log.error(" 创建快照失败!");
        }).match(DeleteSnapshotSuccess.class, message -> {
            // log.info(" 删除快照完成!");
        }).matchAny(message -> log.error("无法处理命令!" + message.getClass().toString())).build();
    }
    ......
}

我么可以看到,要实现持久化,需要继承持久化基类,这里我们选择 AbstractPersistentActor,接着实现他的三个方法

public String persistenceId() 
public Receive createReceiveRecover()
public Receive createReceive() 

persistenceId用于获取持久化Id,createReceiveRecover用于处理回复持久化的消息,createReceive用于处理普通消息。
使用持久化我们需要在配置文件中开启配置:

    persistence {  
      journal {  
        plugin = "akka.persistence.journal.leveldb"  
        leveldb.dir = "persistence/journal"  
        leveldb.native = false  
      }  
      snapshot-store {  
        plugin = "akka.persistence.snapshot-store.local"  
        local.dir = "persistence/snapshots"  
      }  
    } 

至此,一个基于akka的简单的系统就可以使用了。下一节我们需要来讨论下如何避免单点问题,以及提高吞吐量,我们需要引入Akka Cluster。

高并发异步框架Akka使用(五)、Actor实战

上一节我们介绍了supervisor,包工头的工作模式。这一节我们来学一下具体的工人是如何工作的,前面我们提到过每一个工人都是一个actor,自然都需要继承actor,按照actor方式干活。

public class TaskActor  extends UntypedAbstractActor {
    ......
          @Override
    public void onReceive(Object message) throws Throwable {
       if (message instanceof Task){
           final Task task = ((Task)message).clone();
           handleTaskCmd(task, true);
       }else {
           log.error("unsupport!" + message.getClass().toString());
       }

    }


        private void handleTaskCmd(final Task event, boolean persistFlag) {
        ......
        if (!mcRelVector.isEmpty()) {
            // 是否需要持久化
            if (persistFlag) {
                String uuid = UUID.randomUUID().toString();
                event.setBatchUuid(uuid);
                ......
                // 创建日志记录
                insertCheckLog(event);
            }
            // 处理正常业务逻辑
            handleTaskEvt(event, mcRelVector);
        }
    }
}

    private void handleTaskEvt(final Task event, Set<MonitorsChecksRelated> mcRelVector) {

        ActorRef monitorActor = getContext().actorOf(
                springExtension.props("monitorActor").withDispatcher("thread-pool-dispatcher"),
                "monitorActor" + System.currentTimeMillis() + "-" + System.nanoTime());
        getContext().watch(monitorActor);

        for (MonitorsChecksRelated mcRelated : mcRelVector) {
           ......
            // 循环检查项,如果是无顺序的,则生成多个actor并发执行,否在按顺序在一个actor中执行
                ActorRef monitorActorOnce = getContext().actorOf(
                        springExtension.props("monitorActor").withDispatcher("thread-pool-dispatcher"),
                        "monitorActorOnce" + System.currentTimeMillis() + "-" + System.nanoTime());
                getContext().watch(monitorActorOnce);
                monitorActorOnce.tell(monitorsChecksRelated, getSelf());


        }
        // 发送毒丸,终止顺序actor
            monitorActor.tell(PoisonPill.getInstance(), ActorRef.noSender());

    }
    ```

示例代码实现了onReceive方法,他也只处理task,为了演示actor之间的消息发送方便,我们把taskactor比作二级包工头,他下面也有工人monitoractor,很显然他手下的工人要由他来创建,创建完直接叫他干活,他找都是临时工。毕竟一级一级分包下来,也没什么大钱赚,养工人还是一笔很大的开销。任务分法完成以后,他在给大家发一条短信,任务没有了,你们干结束了就结账走人,下次有活我再找你们。
这份短信就是我们的PoisonPill。而短信的发送就是我们的tell方法。每个actor之间通过tell发送短信交互。我们这里使用的无响应模式,还可以使用有响应模式。比如工人做完活了,需要tell一下二级包工头,那么包工头需要在自己的onReceive方法中新增一种类型,实现响应的处理就好了。
```java 
#工人调用sender获取二级包工头,执行tell方法告诉工头自己干完了。
sender().tell(result, self());

这一节,我们主要介绍了Actor之间是如何进行通信的。主要就是集成actor,实现onReceive方法。下一节我们将介绍一下Akka的持久化问题。

高并发异步框架Akka使用(四)、Supervisor

在Akka中存在着这样一种监管策略,Supervisor 监督管理。
Akka中每个Actor是由其他的Actor来创建,意思是 ,每个Actor都有一个父类,记录在父类的上下文中。每个父类Actor监督管理所有子类Actor。任何一个子类Actor产生异常行为时,父类Actor需要处理。
这里我们在做一个类比,每一个actor都是一个工人,而Supervisor则是一个包工头。项目的工人数目有包工头控制,工人病假、休息等活动有包工头安排,使用不同的策略。

通常对于子类Actor异常处理有不同决策。具体的决策包括以下几种:

Restart:重新启动出现异常的Actor;
Stop:退出出现异常的Actor;
Resume:忽略异常,保留其当前状态,继续执行;
Escalate:自己也不知道如何处理,异常抛出给上层,交给自己的父类处理。
任何一个新Actor,在其基类中都是有默认的Supervision实现的。Akka提供了两种策略实现:OneForOneStrategy和AllForOneStrategy。

    OneForOneStrategy:只有当前抛出异常的Actor响应该策略;
    AllForOneStrategy:所有的Actor都响应该策略。

默认使用OneForOneStrategy。


/** * When supervisorStrategy is not specified for an actor this * `Decider` is used by default in the supervisor strategy. * The child will be stopped when [[akka.actor.ActorInitializationException]], * [[akka.actor.ActorKilledException]], or [[akka.actor.DeathPactException]] is * thrown. It will be restarted for other `Exception` types. * The error is escalated if it's a `Throwable`, i.e. `Error`. */ final val defaultDecider: Decider = { case _: ActorInitializationException ⇒ Stop case _: ActorKilledException ⇒ Stop case _: DeathPactException ⇒ Stop case _: Exception ⇒ Restart } /** * When supervisorStrategy is not specified for an actor this * is used by default. OneForOneStrategy with decider defined in * [[#defaultDecider]]. */ final val defaultStrategy: SupervisorStrategy = { OneForOneStrategy()(defaultDecider) }

当actor发生初始化、结束、以及死信异常时,直接停止。其他异常重启actor。

同样,我们需要实现一个Supervisor,让他来监督我们的work。所以我们需要在系统初始化的时候就建立好supervisor。好比工程开始要联系好开发商,找好包工头。

 ActorSystem   actorSystem = ActorSystem.create();
 private static ActorRef supervisor = actorSystem.actorOf(ext.props("supervisor").withMailbox("akka.priority-mailbox"));

我们使用ActorSystem的actorOf方法创建。

  /**
   * Create new actor as child of this context with the given name, which must
   * not be null, empty or start with “$”. If the given name is already in use,
   * an `InvalidActorNameException` is thrown.
   *
   * See [[akka.actor.Props]] for details on how to obtain a `Props` object.
   *
   * @throws akka.actor.InvalidActorNameException if the given name is
   *   invalid or already in use
   * @throws akka.ConfigurationException if deployment, dispatcher
   *   or mailbox configuration is wrong
   * @throws UnsupportedOperationException if invoked on an ActorSystem that
   *   uses a custom user guardian
   */
  def actorOf(props: Props, name: String): ActorRef

我们持有一个ActorRef对象,而不是直接持有Actor,ActorRef与Actor相比,可以理解成他是一个props类型对象的引用,我们不关心具体的这个对象有Actor system来管理。这样更加灵活,比如这个actor奔溃了,actor system会为我们在生成一个。

好比我们工地上需要一个钢筋工,我不会找张三或者李四,我找告诉包工头,我需要一个钢筋工,至于是谁由包工头安排,比如今天是张三来了,可能明天张三请假了,那就由李四来。这样应该很好理解。
好了,有了包工头我们就可以开始工作了。我们知道,包工头也是这个系统中的一颗螺丝,他也是一个actor,默认他也需要集成actor,他的任务就是给其他人发任务。

public class Supervisor extends UntypedAbstractActor {

    private final LoggingAdapter log = Logging.getLogger(getContext().system(), "Supervisor");

    private Router router;

    @Override
    public void preStart() throws Exception {

        List<Routee> routees = new ArrayList<Routee>();
        for (int i = 0; i < ConstantsType.MAX_WORK_ACTOR_AMOUNT; i++) {
            ActorRef actor = getContext().actorOf(ext.props("taskActor", i), "taskActor" + i);
            // 监控
            getContext().watch(actor);
            routees.add(new ActorRefRoutee(actor));
        }
        /**
         * RoundRobinRoutingLogic: 轮询
         * BroadcastRoutingLogic: 广播
         * RandomRoutingLogic: 随机
         * SmallestMailboxRoutingLogic: 空闲
         */
        router = new Router(new RoundRobinRoutingLogic(), routees);
        super.preStart();
    }

    @Override
    public void onReceive(Object message) throws Exception {

        if (message instanceof Task) {
            // 进行路由转发
            router.route(message, getSender());
        } else {
            log.error("unsupport message {}", message);
        }
    }

    @Override
    public void postStop() throws Exception {
        log.info("stop");
        super.postStop();
    }
}

这里引入了一个Router,路由的概念相当于联系人、号码簿。在preStart中,我们创建了一定数量的actor,通过Routee,关联到Router。
好比包工头要干活,手下要维持一个50人的小团队,每个人就是一个actor,他怎么管这些人呢,他把每个人的手机号码记下来,号码就相当于Routee。号码簿就相当于Router。其中的watch方法就相当于和工人签了合同。之后有任务就可以找工人来干活。
那有任务了到底找谁呢?包工头想了四种方法:

    RoundRobinRoutingLogic: 轮询,按号码簿挨个找
    BroadcastRoutingLogic: 广播,按号码簿群发
    RandomRoutingLogic: 随机,从号码簿中随机选一个
    SmallestMailboxRoutingLogic: 空闲,看谁在家闲的蛋疼找谁

为了公平起见,我们替包工头选一个轮询方法。
一切准备妥当,粮草齐备,就等着大展宏图。我们前面介绍过,每个包工头都是一个actor,自然他也有mailbox,会收到mail。actor系统会自动处理mail到onReceive方法,我们只要实现这个方法就好了。
包工头比较懒,他只接受工程甲方发来的任务信息 ,其他的一律不处理。收到甲方的任务之后,通过发短信通知手下的工人来干活。具体怎么通知,通知谁上面已经介绍过了。
乍一看,还是很简单的,这一节我们简单介绍了supervisor。以及一个actor处理的 基本流程。下一节我们将介绍一下具体的actor如何处理,以及actor之间是如何交流通信的。

高并发异步框架Akka使用(三)、Mailboxes

在Akka中,除了Actor,另一个很重要的概念就是Mailboxes。他是Akka的基石之一,掌握了这两个概念,对于Akka你基本上算是有了一定的理解。
Mailboxes顾名思义,就是邮箱的意思,Akka中的邮箱保存actor的信息。正常情况下,每个actor都有一个自己的邮箱。但对于Balancingpool,所有路由对象将共享一个邮箱实例,这一点需要特别注意。
这里我们再来打一个比方,我们把actor类比为公司老总,那么mailbox就像相当于是老总的助理。各个老总比较忙,他们不会直接交流,而是通过给他们的秘书发邮件,让她的秘书来安排。
譬如google的boss想找Oracle的boss喝酒,Oracle的boss日理万机,没时间应付。所以需要由助理安排行程,那么google的boss先给Oracle的boss助理发一份邮件,告诉他自己想和他的boss喝一杯。接下来就等助理安排行程,助理一看,前面有微软的boss预约了想要一起唱K,Facebook的boss约了一起听京剧,那就按预约的顺序安排吧。
这是时候有两种情况:

    1.微软的boss要求如果去唱K就给他回一封邮件。
    2.Facebook的boss则比较随意,去不去随便,反正我也约了其他人;
    你爱去不去,不用告诉我。我是团购的票,送给你做个顺水人情。

这就是两种不同的处理方式,有响应处理和无响应处理。这两种方式没有好坏、优劣,主要看业务的要求。
Akka中的mailbox主要有两类分别是:unboundedMailbox 和boundedMailbox,他们主要的区别是邮箱是否有容量界限,在这这两种类型之上,他们还可以分为是否阻塞,优选级权限等类型的邮箱。
unboundedMailbox主要包括:

    UnboundedMailbox (default)
    SingleConsumerOnlyUnboundedMailbox 
    NonBlockingBoundedMailbox
    UnboundedControlAwareMailbox
    UnboundedPriorityMailbox
    UnboundedStablePriorityMailbox
    UnboundedPriorityMailbox

boundedMailbox主要包括:

    BoundedMailbox
    BoundedPriorityMailbox
    BoundedStablePriorityMailbox
    BoundedStablePriorityMailbox
    BoundedPriorityMailbox
    BoundedControlAwareMailbox

默认使用的是UnboundedMailbox,无阻塞,无容量上界。这就是mailbox的基本概念。
比如我们实现一个自己的mailbox,对于一些特殊的操作,我们需要优先执行:

public class PriorityMailbox extends UnboundedStablePriorityMailbox {

    public PriorityMailbox(ActorSystem.Settings settings, Config config) {
          super(new PriorityGenerator() {
            @Override
            public int gen(Object message) {
                if (message instanceof Task) {
                    return ((Task) message).getPriority();
                }  else if (message.equals(PoisonPill.getInstance())) {
                    return 1000;
                } else {
                    return 100;
                }
            }
        });

    }
}

我们对PoisonPill赋予高优先级。使用自定义mailbox,我们需要在配置文件中开启配置

  priority-mailbox {
    mailbox-type = "com.tts.akkaflow.mailbox.PriorityMailbox"
  }

接下来我们将结合具体的示例,讲讲如何使用Actor和mailbox,如何将两者串起来。

高并发异步框架Akka使用(二)、Actor模型

提到Akka就不能不提Actor模型,他是Akka绕不过的话题,也是Akka的核心所在。
那么究竟什么是Actor模型呢?
我们可以先建立这样一个概念,一个Actor就对应于Java中的一个Java Bean,是一个边界实体。但是他不完全等同于Java Bean,或者说这是一个错误的等价。之所以这么说,就是想给大家有个经验上的概念。
一个Actor从内部看,可以做很多事,但从外部来看,他的职能又是单一的。这一概念可能 不太好理解,我们暂且放下,后面会慢慢介绍。
Actor和Actor之间不能直接调用或交互,他们通过邮件来交互,每个Actor都有一个自己的信箱,里面存放着其他Actor发来的邮件。同时每个Actor也可以给别人发邮件。
参见下面这张图,有四个actor,actor之间通过mail交互通信,他们共同组成了一个完整的系统,就是Actor System。

Actor可以处理不同类型的邮件,每一个类型的邮件有一个专门的处理方法。
好了,我们现在来做一个通俗的类比,让大家有一个更直观的理解。
Actor System就好比我们这个世界,我们每一个人就是一个Actor,我们每个人有不同的技能,有的人会织衣,有的人会捕鱼,有的人会打猎。
现在有这样一个需求,有一个人A,需要一批貂皮大衣和鱼皮马甲,他自己啥也不会,怎么办呢?他决定找其他人帮忙:

    1.A给猎人发了一封信,让猎人帮他打一只貂,
    2.猎人打好了貂,通过顺丰发给了他,于是A有了貂皮;
    3.A给渔夫发了一封信,让农夫帮他打一条鱼,
    4.渔夫打好了鱼,通过圆通发给了他,于是A有了鱼皮;
    4.A又给织衣人发一封信,同时把貂皮、鱼皮通过顺丰寄给了他,让织衣人帮他做貂皮大衣和鱼皮马甲。
    5.织衣人做好了貂皮大衣和鱼皮马甲用顺丰发给了他,最终他拥有了貂皮大衣和鱼皮马甲。

每个人都有自己的特殊技能,比别人做的又快又好。这样就构成了一个完整的世界,一个人人有衣穿,人人有田种的世界。每个人各司其职,其乐融融。
以上就是actor和actor system的简单类比。
可是大家会有这样的疑问:为什么A自己不打猎、捕鱼、织衣,交给这么多人做,不是更麻烦吗?万一其中一件快递丢了呢?那不是满盘皆输。
仔细想一想,确实是这样的,生活中掌握多项技能的人大有人在。但是当你回头想想,你会发现这样的模型存在问题:

    如果只交给一个人,他去世了怎么办?
    现在又有人提出要做貂皮马甲怎么办?
    如果很多人需要大衣和马甲,都来找他,他做不完怎么办?
    同样存在的问题,他们也会存在快递丢失;

回到我们的软件系统中,大家不难发现,这不就是一个传统的All in one的系统吗?所有的功能都整合在这个大系统中;一旦需求有变动,需要修改很多代码;请求一大,负载狂飙;因为一些其他问题,大致请求失败,业务被迫中断。
因此我们需要一个解决的办法,这也就是微服务产生的根源。这时候,你可能会发现,Akka还真有点像那么回事。

    akka中有不同的actor,处理不同的任务
    通过actor模型拆分后,我们可以构建异构的系统
    对于来往的邮件,Akka支持持久化,我们可以做持久化,在出错时可以重做。
    Akka还支持集群,同一类型任务可以交给集群处理,避免单点故障。

通过这样的改进,我们可以做很多事情。譬如现在织衣人有钱了,他自己不用亲自织衣服了,他买了机器,通过织衣服的机器来做。对于整个系统来说,变化不大,大家还是给他发邮件,寄材料。对于别人来说,他的变化是无感知的。同样的,越来越多的人加入织衣行列,大家也不必关心,只管发邮件,寄材料,所有的这些由织衣人自己来解决。这样就不会出现一个织衣人病了,大家就不能收到衣服的情况。
这时候还有可能有另外一种情况,现在是冬天,大雪纷飞,那么织衣人就要先织造貂皮大衣,鱼皮马甲可能要暂停,待到冬去春来再去织马甲,或者根本就不去织马甲。
传统的系统,遇到流量高峰,想要去实现这样的需求还是很难的,首先剥离织貂皮大衣和马甲就是一件很麻烦的事,或者说根本没办法实现,更不要说后面再去重新织马甲。
而Akka框架却很容易做到,只要新增加一个Actors就好了,将原先织马甲的逻辑剪切到新的Actor中,新的Actor就拥有了这项技能,当流量高峰来临时,我们保证主要、核心业务正常,其他的暂时关停,持久化相关数据,等到系统正常时候再补做。总体来说还是很方便的。
Akka的有点远不止此,当然这些都是和微服务息息相关的,结合起来还是很好理解的。
在Akka框架中actor是其中最小的执行粒度,是Akka中的基础模型。类似于Java中的线程,但是他比线程轻量很多,更像是golang中的协程的概念。

高并发异步框架Akka使用(一)、介绍

曾几何时你是否迷茫,苦于寻找一个能使简单实现出高并发的异步编程框架,自己动手写又不现实,到底哪里能找到这样一个好用的框架呢?或是你想寻找一个能够实现微服务的框架,亦或是你想实现一下DDD的编程。
如果你有这个困惑,是时候了解一下Akka了,他可以满足你的愿望。
我在一次了解Scala语言的情况下,接触到了Akka。官网介绍Akka是为Java和Scala构建高度并发、分布式和弹性消息驱动应用程序的工具包。使用Akka可以更轻松地构建强大的反应式、并发和分布式应用程序。

  • 更简单的并发和分布式系统,actors和streams允许您构建可扩展的系统,更有效地使用服务器的资源。
  • 弹性设计,以响应式原则为基础,Akka能够让系统自我修复,失败时能及时响应。
  • 高性能,在一台机器上,最高可达5000万msg/sec。内存占用很小;每GB堆大约250万个Actors。
  • 弹性的、分布式的,无单点故障的分布式系统。负载平衡和跨节点的自适应路由。使用CRDT实现分布式数据的最终一致性。
  • 响应式的流数据,具有背压式的异步非阻塞流处理。为构建微服务提供了一个很好的平台使用完全异步和流式http服务器和客户端。

Akka包含几个大部分:

    Akka Actors Actor系统

    Akka Streams 流

    Akka HTTP 

    Akka Cluster 集群、分布式

    Cluster Sharding  集群副本管理

    Distributed Data 分布式数据

    Akka Persistence 数据持久化

    Alpakka

    Alpakka Kafka 结合kafka应用

    Akka gRPC gRPC协议

    Akka Enhancements 增强包

    Akka Management

其中核心是Actos模型,接下来我会用几篇文章来介绍一下。我们的一切都是围绕Actos展开的。我们将写一个小例子,通过理论和代码相结合,从而能更好地理解AKKA,乃至Actor模型,领域驱动设计(DDD)等。