这一节我们来聊一聊akka的持久化问题,首先讲一下为什么需要持久化。
假设有这样一个业务,用户在淘宝下订单,淘宝突然崩溃。当淘宝服务重启后,用户在此登录发现自己的订单数据全部没有了,这时候用户也很崩溃。
还是我们的工程例子,突然有一天,台风过境。工程项目被台风摧毁,包工头失联,工人死伤惨重,工地一片狼藉。台风走后,总得重建吧?可是怎么重建呢?所有的图纸,施工方案之前都没有保存。根本没办法做。大家都在想如果当时把这些都存档起来那该多好啊,这时候拿出来,按照之前的流程走一遍,一切搞定。这就是持久化的意义和重要性。
Akka中也有持久化,他包含两个概念
快照 snapshot
日志 Journal
两者的区别是快照是某一时刻的记录情况,有系统根据配置定期打印。而日志是我们控制的的顺序日志,我们可以从任意一种回复,具体的需要根据业务的实际情况分析。持久化可以存储到本地磁盘网络存储等等。
我们来看一个例子
public class TaskPersistActor extends AbstractPersistentActor {
......
@Override
public String persistenceId() {
return "TaskPersistActor-" + id;
}
@Override
public void preStart() throws Exception {
mat = ActorMaterializer.create(system);
queries = PersistenceQuery.get(system).getReadJournalFor(LeveldbReadJournal.class,
LeveldbReadJournal.Identifier());
source = queries.eventsByPersistenceId(persistenceId(), 0, Long.MAX_VALUE);
}
@Override
public Receive createReceiveRecover() {
return receiveBuilder().match(Task.class, evt -> {
state.add(evt);
}).match(SnapshotOffer.class, message -> {
source.runForeach(i -> {
Task task = (Task) ((EventEnvelope) i).event();
}, mat);
state = (TaskState) message.snapshot();
if (state != null && getNumEvents() > 0) {
ConcurrentHashMap<String, Task> events = state.getEvents();
Iterator<Entry<String, Task>> iter = events.entrySet().iterator();
while (iter.hasNext()) {
Entry<String, Task> entry = (Entry<String, Task>) iter.next();
if (((Task) entry.getValue()).getCheckMap().isEmpty()) {
events.remove(entry.getKey());
} else {
handleTaskCmd((Task) entry.getValue(), false);
}
}
}
}).match(RecoveryCompleted.class, message -> {
// log.info("taskpersistActor-" + id + " 恢复完成");
}).matchAny(message -> log.error("无法恢复事件!" + message.getClass().toString())).build();
}
@Override
public Receive createReceive() {
return receiveBuilder().match(Task.class, cmd -> {
final Task task = cmd.clone();
handleTaskCmd(task, true);
}).match(CheckMessage.class, message -> {
try {
handleCheckMessage(message);
} catch (Exception e) {
e.printStackTrace();
}
}).match(SnapshotTick.class, message -> {
// 删除历史快照
deleteSnapshot(snapshotSequenceNr());
deleteSnapshot(lastSequenceNr());
if (getNumEvents() > 0) {
saveSnapshot(state.copy());
}
}).match(SaveSnapshotSuccess.class, message -> {
// 删除历史快照
}).match(SaveSnapshotFailure.class, message -> {
log.error(" 创建快照失败!");
}).match(DeleteSnapshotSuccess.class, message -> {
// log.info(" 删除快照完成!");
}).matchAny(message -> log.error("无法处理命令!" + message.getClass().toString())).build();
}
......
}
我么可以看到,要实现持久化,需要继承持久化基类,这里我们选择 AbstractPersistentActor,接着实现他的三个方法
public String persistenceId()
public Receive createReceiveRecover()
public Receive createReceive()
persistenceId用于获取持久化Id,createReceiveRecover用于处理回复持久化的消息,createReceive用于处理普通消息。
使用持久化我们需要在配置文件中开启配置:
persistence {
journal {
plugin = "akka.persistence.journal.leveldb"
leveldb.dir = "persistence/journal"
leveldb.native = false
}
snapshot-store {
plugin = "akka.persistence.snapshot-store.local"
local.dir = "persistence/snapshots"
}
}
至此,一个基于akka的简单的系统就可以使用了。下一节我们需要来讨论下如何避免单点问题,以及提高吞吐量,我们需要引入Akka Cluster。