高并发异步框架Akka使用(六)、Persistent

这一节我们来聊一聊akka的持久化问题,首先讲一下为什么需要持久化。
假设有这样一个业务,用户在淘宝下订单,淘宝突然崩溃。当淘宝服务重启后,用户在此登录发现自己的订单数据全部没有了,这时候用户也很崩溃。
还是我们的工程例子,突然有一天,台风过境。工程项目被台风摧毁,包工头失联,工人死伤惨重,工地一片狼藉。台风走后,总得重建吧?可是怎么重建呢?所有的图纸,施工方案之前都没有保存。根本没办法做。大家都在想如果当时把这些都存档起来那该多好啊,这时候拿出来,按照之前的流程走一遍,一切搞定。这就是持久化的意义和重要性。
Akka中也有持久化,他包含两个概念

    快照 snapshot
    日志 Journal

两者的区别是快照是某一时刻的记录情况,有系统根据配置定期打印。而日志是我们控制的的顺序日志,我们可以从任意一种回复,具体的需要根据业务的实际情况分析。持久化可以存储到本地磁盘网络存储等等。
我们来看一个例子

public class TaskPersistActor extends AbstractPersistentActor {
    ......
          @Override
    public String persistenceId() {
        return "TaskPersistActor-" + id;
    }

    @Override
    public void preStart() throws Exception {
        mat = ActorMaterializer.create(system);
        queries = PersistenceQuery.get(system).getReadJournalFor(LeveldbReadJournal.class,
                LeveldbReadJournal.Identifier());
        source = queries.eventsByPersistenceId(persistenceId(), 0, Long.MAX_VALUE);
    }

    @Override
    public Receive createReceiveRecover() {
        return receiveBuilder().match(Task.class, evt -> {
             state.add(evt);
        }).match(SnapshotOffer.class, message -> {
            source.runForeach(i -> {
                Task task = (Task) ((EventEnvelope) i).event();
                          }, mat);
            state = (TaskState) message.snapshot();
            if (state != null && getNumEvents() > 0) {
                ConcurrentHashMap<String, Task> events = state.getEvents();
                Iterator<Entry<String, Task>> iter = events.entrySet().iterator();
                while (iter.hasNext()) {
                    Entry<String, Task> entry = (Entry<String, Task>) iter.next();
                    if (((Task) entry.getValue()).getCheckMap().isEmpty()) {
                        events.remove(entry.getKey());
                    } else {
                        handleTaskCmd((Task) entry.getValue(), false);
                    }

                }
            }
        }).match(RecoveryCompleted.class, message -> {
            // log.info("taskpersistActor-" + id + " 恢复完成");
        }).matchAny(message -> log.error("无法恢复事件!" + message.getClass().toString())).build();
    }

    @Override
    public Receive createReceive() {

        return receiveBuilder().match(Task.class, cmd -> {
            final Task task = cmd.clone();
            handleTaskCmd(task, true);
        }).match(CheckMessage.class, message -> {
            try {
                handleCheckMessage(message);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }).match(SnapshotTick.class, message -> {

            // 删除历史快照
            deleteSnapshot(snapshotSequenceNr());
            deleteSnapshot(lastSequenceNr());
            if (getNumEvents() > 0) {
                saveSnapshot(state.copy());
            }
        }).match(SaveSnapshotSuccess.class, message -> {
            // 删除历史快照
        }).match(SaveSnapshotFailure.class, message -> {
            log.error(" 创建快照失败!");
        }).match(DeleteSnapshotSuccess.class, message -> {
            // log.info(" 删除快照完成!");
        }).matchAny(message -> log.error("无法处理命令!" + message.getClass().toString())).build();
    }
    ......
}

我么可以看到,要实现持久化,需要继承持久化基类,这里我们选择 AbstractPersistentActor,接着实现他的三个方法

public String persistenceId() 
public Receive createReceiveRecover()
public Receive createReceive() 

persistenceId用于获取持久化Id,createReceiveRecover用于处理回复持久化的消息,createReceive用于处理普通消息。
使用持久化我们需要在配置文件中开启配置:

    persistence {  
      journal {  
        plugin = "akka.persistence.journal.leveldb"  
        leveldb.dir = "persistence/journal"  
        leveldb.native = false  
      }  
      snapshot-store {  
        plugin = "akka.persistence.snapshot-store.local"  
        local.dir = "persistence/snapshots"  
      }  
    } 

至此,一个基于akka的简单的系统就可以使用了。下一节我们需要来讨论下如何避免单点问题,以及提高吞吐量,我们需要引入Akka Cluster。

高并发异步框架Akka使用(五)、Actor实战

上一节我们介绍了supervisor,包工头的工作模式。这一节我们来学一下具体的工人是如何工作的,前面我们提到过每一个工人都是一个actor,自然都需要继承actor,按照actor方式干活。

public class TaskActor  extends UntypedAbstractActor {
    ......
          @Override
    public void onReceive(Object message) throws Throwable {
       if (message instanceof Task){
           final Task task = ((Task)message).clone();
           handleTaskCmd(task, true);
       }else {
           log.error("unsupport!" + message.getClass().toString());
       }

    }


        private void handleTaskCmd(final Task event, boolean persistFlag) {
        ......
        if (!mcRelVector.isEmpty()) {
            // 是否需要持久化
            if (persistFlag) {
                String uuid = UUID.randomUUID().toString();
                event.setBatchUuid(uuid);
                ......
                // 创建日志记录
                insertCheckLog(event);
            }
            // 处理正常业务逻辑
            handleTaskEvt(event, mcRelVector);
        }
    }
}

    private void handleTaskEvt(final Task event, Set<MonitorsChecksRelated> mcRelVector) {

        ActorRef monitorActor = getContext().actorOf(
                springExtension.props("monitorActor").withDispatcher("thread-pool-dispatcher"),
                "monitorActor" + System.currentTimeMillis() + "-" + System.nanoTime());
        getContext().watch(monitorActor);

        for (MonitorsChecksRelated mcRelated : mcRelVector) {
           ......
            // 循环检查项,如果是无顺序的,则生成多个actor并发执行,否在按顺序在一个actor中执行
                ActorRef monitorActorOnce = getContext().actorOf(
                        springExtension.props("monitorActor").withDispatcher("thread-pool-dispatcher"),
                        "monitorActorOnce" + System.currentTimeMillis() + "-" + System.nanoTime());
                getContext().watch(monitorActorOnce);
                monitorActorOnce.tell(monitorsChecksRelated, getSelf());


        }
        // 发送毒丸,终止顺序actor
            monitorActor.tell(PoisonPill.getInstance(), ActorRef.noSender());

    }
    ```

示例代码实现了onReceive方法,他也只处理task,为了演示actor之间的消息发送方便,我们把taskactor比作二级包工头,他下面也有工人monitoractor,很显然他手下的工人要由他来创建,创建完直接叫他干活,他找都是临时工。毕竟一级一级分包下来,也没什么大钱赚,养工人还是一笔很大的开销。任务分法完成以后,他在给大家发一条短信,任务没有了,你们干结束了就结账走人,下次有活我再找你们。
这份短信就是我们的PoisonPill。而短信的发送就是我们的tell方法。每个actor之间通过tell发送短信交互。我们这里使用的无响应模式,还可以使用有响应模式。比如工人做完活了,需要tell一下二级包工头,那么包工头需要在自己的onReceive方法中新增一种类型,实现响应的处理就好了。
```java 
#工人调用sender获取二级包工头,执行tell方法告诉工头自己干完了。
sender().tell(result, self());

这一节,我们主要介绍了Actor之间是如何进行通信的。主要就是集成actor,实现onReceive方法。下一节我们将介绍一下Akka的持久化问题。

高并发异步框架Akka使用(四)、Supervisor

在Akka中存在着这样一种监管策略,Supervisor 监督管理。
Akka中每个Actor是由其他的Actor来创建,意思是 ,每个Actor都有一个父类,记录在父类的上下文中。每个父类Actor监督管理所有子类Actor。任何一个子类Actor产生异常行为时,父类Actor需要处理。
这里我们在做一个类比,每一个actor都是一个工人,而Supervisor则是一个包工头。项目的工人数目有包工头控制,工人病假、休息等活动有包工头安排,使用不同的策略。

通常对于子类Actor异常处理有不同决策。具体的决策包括以下几种:

Restart:重新启动出现异常的Actor;
Stop:退出出现异常的Actor;
Resume:忽略异常,保留其当前状态,继续执行;
Escalate:自己也不知道如何处理,异常抛出给上层,交给自己的父类处理。
任何一个新Actor,在其基类中都是有默认的Supervision实现的。Akka提供了两种策略实现:OneForOneStrategy和AllForOneStrategy。

    OneForOneStrategy:只有当前抛出异常的Actor响应该策略;
    AllForOneStrategy:所有的Actor都响应该策略。

默认使用OneForOneStrategy。


/** * When supervisorStrategy is not specified for an actor this * `Decider` is used by default in the supervisor strategy. * The child will be stopped when [[akka.actor.ActorInitializationException]], * [[akka.actor.ActorKilledException]], or [[akka.actor.DeathPactException]] is * thrown. It will be restarted for other `Exception` types. * The error is escalated if it's a `Throwable`, i.e. `Error`. */ final val defaultDecider: Decider = { case _: ActorInitializationException ⇒ Stop case _: ActorKilledException ⇒ Stop case _: DeathPactException ⇒ Stop case _: Exception ⇒ Restart } /** * When supervisorStrategy is not specified for an actor this * is used by default. OneForOneStrategy with decider defined in * [[#defaultDecider]]. */ final val defaultStrategy: SupervisorStrategy = { OneForOneStrategy()(defaultDecider) }

当actor发生初始化、结束、以及死信异常时,直接停止。其他异常重启actor。

同样,我们需要实现一个Supervisor,让他来监督我们的work。所以我们需要在系统初始化的时候就建立好supervisor。好比工程开始要联系好开发商,找好包工头。

 ActorSystem   actorSystem = ActorSystem.create();
 private static ActorRef supervisor = actorSystem.actorOf(ext.props("supervisor").withMailbox("akka.priority-mailbox"));

我们使用ActorSystem的actorOf方法创建。

  /**
   * Create new actor as child of this context with the given name, which must
   * not be null, empty or start with “$”. If the given name is already in use,
   * an `InvalidActorNameException` is thrown.
   *
   * See [[akka.actor.Props]] for details on how to obtain a `Props` object.
   *
   * @throws akka.actor.InvalidActorNameException if the given name is
   *   invalid or already in use
   * @throws akka.ConfigurationException if deployment, dispatcher
   *   or mailbox configuration is wrong
   * @throws UnsupportedOperationException if invoked on an ActorSystem that
   *   uses a custom user guardian
   */
  def actorOf(props: Props, name: String): ActorRef

我们持有一个ActorRef对象,而不是直接持有Actor,ActorRef与Actor相比,可以理解成他是一个props类型对象的引用,我们不关心具体的这个对象有Actor system来管理。这样更加灵活,比如这个actor奔溃了,actor system会为我们在生成一个。

好比我们工地上需要一个钢筋工,我不会找张三或者李四,我找告诉包工头,我需要一个钢筋工,至于是谁由包工头安排,比如今天是张三来了,可能明天张三请假了,那就由李四来。这样应该很好理解。
好了,有了包工头我们就可以开始工作了。我们知道,包工头也是这个系统中的一颗螺丝,他也是一个actor,默认他也需要集成actor,他的任务就是给其他人发任务。

public class Supervisor extends UntypedAbstractActor {

    private final LoggingAdapter log = Logging.getLogger(getContext().system(), "Supervisor");

    private Router router;

    @Override
    public void preStart() throws Exception {

        List<Routee> routees = new ArrayList<Routee>();
        for (int i = 0; i < ConstantsType.MAX_WORK_ACTOR_AMOUNT; i++) {
            ActorRef actor = getContext().actorOf(ext.props("taskActor", i), "taskActor" + i);
            // 监控
            getContext().watch(actor);
            routees.add(new ActorRefRoutee(actor));
        }
        /**
         * RoundRobinRoutingLogic: 轮询
         * BroadcastRoutingLogic: 广播
         * RandomRoutingLogic: 随机
         * SmallestMailboxRoutingLogic: 空闲
         */
        router = new Router(new RoundRobinRoutingLogic(), routees);
        super.preStart();
    }

    @Override
    public void onReceive(Object message) throws Exception {

        if (message instanceof Task) {
            // 进行路由转发
            router.route(message, getSender());
        } else {
            log.error("unsupport message {}", message);
        }
    }

    @Override
    public void postStop() throws Exception {
        log.info("stop");
        super.postStop();
    }
}

这里引入了一个Router,路由的概念相当于联系人、号码簿。在preStart中,我们创建了一定数量的actor,通过Routee,关联到Router。
好比包工头要干活,手下要维持一个50人的小团队,每个人就是一个actor,他怎么管这些人呢,他把每个人的手机号码记下来,号码就相当于Routee。号码簿就相当于Router。其中的watch方法就相当于和工人签了合同。之后有任务就可以找工人来干活。
那有任务了到底找谁呢?包工头想了四种方法:

    RoundRobinRoutingLogic: 轮询,按号码簿挨个找
    BroadcastRoutingLogic: 广播,按号码簿群发
    RandomRoutingLogic: 随机,从号码簿中随机选一个
    SmallestMailboxRoutingLogic: 空闲,看谁在家闲的蛋疼找谁

为了公平起见,我们替包工头选一个轮询方法。
一切准备妥当,粮草齐备,就等着大展宏图。我们前面介绍过,每个包工头都是一个actor,自然他也有mailbox,会收到mail。actor系统会自动处理mail到onReceive方法,我们只要实现这个方法就好了。
包工头比较懒,他只接受工程甲方发来的任务信息 ,其他的一律不处理。收到甲方的任务之后,通过发短信通知手下的工人来干活。具体怎么通知,通知谁上面已经介绍过了。
乍一看,还是很简单的,这一节我们简单介绍了supervisor。以及一个actor处理的 基本流程。下一节我们将介绍一下具体的actor如何处理,以及actor之间是如何交流通信的。

高并发异步框架Akka使用(三)、Mailboxes

在Akka中,除了Actor,另一个很重要的概念就是Mailboxes。他是Akka的基石之一,掌握了这两个概念,对于Akka你基本上算是有了一定的理解。
Mailboxes顾名思义,就是邮箱的意思,Akka中的邮箱保存actor的信息。正常情况下,每个actor都有一个自己的邮箱。但对于Balancingpool,所有路由对象将共享一个邮箱实例,这一点需要特别注意。
这里我们再来打一个比方,我们把actor类比为公司老总,那么mailbox就像相当于是老总的助理。各个老总比较忙,他们不会直接交流,而是通过给他们的秘书发邮件,让她的秘书来安排。
譬如google的boss想找Oracle的boss喝酒,Oracle的boss日理万机,没时间应付。所以需要由助理安排行程,那么google的boss先给Oracle的boss助理发一份邮件,告诉他自己想和他的boss喝一杯。接下来就等助理安排行程,助理一看,前面有微软的boss预约了想要一起唱K,Facebook的boss约了一起听京剧,那就按预约的顺序安排吧。
这是时候有两种情况:

    1.微软的boss要求如果去唱K就给他回一封邮件。
    2.Facebook的boss则比较随意,去不去随便,反正我也约了其他人;
    你爱去不去,不用告诉我。我是团购的票,送给你做个顺水人情。

这就是两种不同的处理方式,有响应处理和无响应处理。这两种方式没有好坏、优劣,主要看业务的要求。
Akka中的mailbox主要有两类分别是:unboundedMailbox 和boundedMailbox,他们主要的区别是邮箱是否有容量界限,在这这两种类型之上,他们还可以分为是否阻塞,优选级权限等类型的邮箱。
unboundedMailbox主要包括:

    UnboundedMailbox (default)
    SingleConsumerOnlyUnboundedMailbox 
    NonBlockingBoundedMailbox
    UnboundedControlAwareMailbox
    UnboundedPriorityMailbox
    UnboundedStablePriorityMailbox
    UnboundedPriorityMailbox

boundedMailbox主要包括:

    BoundedMailbox
    BoundedPriorityMailbox
    BoundedStablePriorityMailbox
    BoundedStablePriorityMailbox
    BoundedPriorityMailbox
    BoundedControlAwareMailbox

默认使用的是UnboundedMailbox,无阻塞,无容量上界。这就是mailbox的基本概念。
比如我们实现一个自己的mailbox,对于一些特殊的操作,我们需要优先执行:

public class PriorityMailbox extends UnboundedStablePriorityMailbox {

    public PriorityMailbox(ActorSystem.Settings settings, Config config) {
          super(new PriorityGenerator() {
            @Override
            public int gen(Object message) {
                if (message instanceof Task) {
                    return ((Task) message).getPriority();
                }  else if (message.equals(PoisonPill.getInstance())) {
                    return 1000;
                } else {
                    return 100;
                }
            }
        });

    }
}

我们对PoisonPill赋予高优先级。使用自定义mailbox,我们需要在配置文件中开启配置

  priority-mailbox {
    mailbox-type = "com.tts.akkaflow.mailbox.PriorityMailbox"
  }

接下来我们将结合具体的示例,讲讲如何使用Actor和mailbox,如何将两者串起来。