什么是异步

引文

我们从这样一个故事开始。某天午饭时间,大家聊得正嗨。

小A突然来了一句:“我X,这傻缺网站支付太慢了,等圈圈转了半天。最后告诉我连接银行接口支付失败,你不能做成异步啊。先告诉我成功,有支付结果再通知我,害的我等了半天”。
新来的同事小B弱弱的问了句:“哥,什么是异步啊?”
前端小C:“异步就是回调。”
小A补充说:“异步就是多线程。”
小C似懂非懂地看着我。
我啃了一口鸡腿:“嗯,他俩说的都对,又都不对。如果安卓的小D在,他可能会说异步不就是事件监听嘛。”
瞬间三人同时盯着我,说的好像很高深,你给解释解释啊。

其实要想理解什么是异步,很简单。凡是要用辩证的眼光要看待。我们先来看看什么是同步。

同步

同步是指用户在进行某个操作之后,暂停下一步操作,直到本次操作结束返回,期间持有的资源也一直占用。

举个栗子:

很久以前,还没有手机的时候。老一辈处对象,想聊聊天只能打电话,每次一打就是几个小时。期间只能对着电话你一句,我一句地呱啦呱啦说一通,啥也不能干。
我们聚焦在聊天这件事上,通过电话来聊天就是一个同步事件,聊天的双方,你一言我一语,交替进行。从一方一看,我说一句,必须等对方应答,我才能说下一句。这里电话就是竞争的资源,一个使用,其他人就不能使用。

异步

接下来,异步概念闪亮登场。什么是异步?异步可以理解为同步的对立面。

我们来想一下,有什么样的方式让两人之间可以不用你一言,我一语地等待,同时不依赖于仅有的电话资源呢。
你可能想到了写信。是的,在那个年代的确是通过写信来解决的。以当时的物价来衡量的化,打电话还是很昂贵的。所以自然而然的就可以通过写信来解决。
通过写信的方式 ,两个人不必等待。我可以把我想对你说的话一股脑全部写到信里。然后偷到邮箱,剩下的就是满心欢喜的期待,等待你的回信。
等待的时间里我可以学学三弦,拉拉京胡,京剧走起。既丰富了生活,又陶冶了情操。如果十天半月收不到回信,我就再写一封。貌似生活野蛮惬意的。
和上面的同步相比,这就是异步。

小B觉得更懵了,这不就是说写信比打电话更适合,异步比同步好吗?那我们是不是在倒退啊。
回到我们之前的观点,凡事看问题,都要从两面性出发。同步和异步的使用也是分场景的,不同的业务场景使用不同的技术。
譬如,在实时性要求比较高的场景下,譬如数据库操作,我们需要使用同步。
再举个例子:

现在你和女朋友闹别扭,你俩都要掰了,要黄了,你还在写信,慢慢腾腾的。等她收到信,估计你是没希望了。这时候,你可定不计较金钱、时间,终极目标就是在最短时间内挽回这段美好的爱情。
相比而言,使用写信的方式,更经济,占用的资源少,双方也有更大的自有,不必完全拘泥于这件事,专业术语来讲叫强耦合,强关联,但是等到对方的回应很慢,有时候邮差把新建弄丢了,可能这辈子都收不到这份回信。而打电话,更直接,相对而言花销大,双方只能一直捧着电话叽叽歪歪,但是可以立马获得对方的甜言蜜语。
那是不是异步一定就比同步的慢呢?还是那句话,要看具体的业务场景,以及具体使用的技术而定而定。譬如在上述的异步情况下,我们可以使用微信语音。同样是异步,但是速度却可以接近近实时。
我们简单总结一下,同步需要等待结果返回,会相对占用资源。异步可以先于真实的结果返回,不需要等待,操作完车后释放资源,不占用竞争资源。

# 异步的使用场景:
那在什么情况下我们需要使用异步呢?

1.任务执行可以不按顺序,即没有特定的时序性或顺序性;
2.操作需要耗费资源,譬如I/O操作,网络请求;
3.不存在共享资源的互斥访问;
4.对于实时性要求不是很高;
5.对核心任务影响不大;
5.非原子性操作;
6.非强一致性操作

异步的好处

那么使用异步有什么好处呢?

可以并发或并行执行非顺序性任务,提高执行效率;
节省资源,业务低耦合,获取更好的用户体验;
避免资源的竞争访问,避免阻塞,避免死锁;
延迟返回,提高请求的响应率

异步的实现

好了,最后我们回到最初问题的讨论,我们来看看大家关于问题的讨论。

 小C:“异步就是回调。”
 小A:“异步就是多线程。”

在我们理解了什么是异步之后我们会发现,他俩说的都对,前端的回调,或是多线程,以及事件监听,都是异步编程的提现,都能实现一定程度的异步。
但是异步并不止于此,他更多的是一种思想。上述只是他在不同语言,不同业务场景的落地。
其实他还有更多的实现。

我们从一个大的宏观的角度来看,从服务、系统的层面,每一个服务都是一个异步编程提现,譬如微服务体系中的每一服务;
我们从具体服务的架构上来看也是存在异步的,譬如我们常使用的各种中间件,消息队列MQ,
我们通过MQ将生产者和消费者相互隔离开,生产者只关心生产,消费者只关心消费;
我们再从具体的业务实现来看也是有异步的,譬如使用的业务回调,注册推送等;
再比如实现的各种语言也都有不同的异步实现,譬如C的pthread,java的多线程,python的threading,async/wait,golang的gorutine等等。
还有各种编程框架,netty,Akka,flask,C语言的libUV库等等;
甚至经典的操作系统I/O多路复用模型select,poll,epoll等等。

异步可以说无处不在。
听了我的话,他们三人若有所思的点点头。最后我要说的是,异步更重要的是一种思想,而不是特定的实现。

重新开更

自从博客的阿里云到期后,已经好久没更新了。突然想到了胡适先生的笔记:

《胡适留学日记》卷一(有关“打牌”记录全在这一卷)中有“打牌”记录累计36次(二月1次、三月1次、四月3次、五月3次、六月3次、七月11次、八月10次、九月4次),从1911年2月5日至9月6日(次日虽提及“打牌”但实未打牌,且算作1次)戒牌为止,平均6天打牌一次。在最频繁的七、八、九月,平均不到三天就打牌一次,真可谓三天两头打。

胡适记打牌之事,一般仅“打牌”两个字而已。有时会多些几个字,但似乎有为“打牌”之行为开脱之嫌。如:
2月5日,“刘千里以电话邀打牌。”【友人相邀,面子不可拂却。】
4月29日,“天时骤暖至八十度以上,不能读书,与沈、陈诸君打纸牌,又与刘、侯诸君打中国牌,以为消遣之计。”【天热好打牌。】
5月14日,“夜与刘千里诸人打牌。刘君已毕业,云下星期二将归祖国矣。”【大概打牌也算是为友人“饯行”的一种方式吧】
7月2日,“天热不能作事,打牌消遣。”【天又热了。】
7月3日,“今日天气百一十度。打牌。”【八十度就只能去打牌了,一百一十度就更甭说了。】
7月8日,“无事。打牌。天气稍凉矣。”【今天打牌不是因为天热,纯属无事闹的。】
8月4日,“化学第四小考,极不称意;平生考试,此为最下。打牌。”【因为考试成绩不称意,所以打牌以消遣。可是成绩不好,又是为什么呢?查7月17日“化学试卷竟得百分,直出意料。”同一门课,不到一个月,成绩落差如斯,谁之罪耶?打牌也。】
8月5日,“打牌。”【全天记录仅次两字,显然对昨日化学考试之成绩仍耿耿于怀。】
8月10日,“夜早睡;连日或以读书,或以打牌,恒子夜始寝,今日觉有不适,故以此矫之。”【打牌的代价不仅仅是化学成绩下降那么简单哈。】
9月4日,“今日为劳动节(Labor Day),为休息之日。打牌。”【劳动节,要休息,所以打牌,最冠冕堂皇的理由了。】

大概是觉察到了自己的堕落,胡适于是要洗心革面了。在9月6日的日记里写道,“今日,迁居世界学生会所,初次离群索居,殊觉凄冷。昨日,与金涛君相戒不复打牌。”自此日起,(留学)日记中不复出现“打牌”矣。

这样下去,我不就是先生日记里写的吗。想想买域名,买阿里云服务器花费的,那可都是白花花的银子啊。新学期,新气象,从今天起要继续开更,记录学习过程。

滑动验证码的破解(二)、具体实现

上一节我们分析了滑动验证码的破解的原理以及大概的实现方式。这一节我们将讨论下具体的技术实现细节。
工欲善其事必先利其器,我们需要借助openCV视觉处理库来处理识别,因为简单好用,本身这个图片识别也不复杂,都是固定的几何图形。

        # 载入滑块匹配图片
        src = cv2.imread(path.format('crop-',i,'.jpg'))
        # 载入背景图片
        back = cv2.imread(path.format(i,'','.png'))
        # 裁剪,这里的1/3参数可以根据实际情况调整,获取这一块图片
        start = int((back.shape[1])*10/30)
        crop = back[y:back.shape[0]-y,start:back.shape[1]]
        # 锐化滤波
        kernel = np.array([[0, -1, 0], [-1, 5, -1], [0, -1, 0]], np.float32)
        src = cv2.filter2D(src,-1,kernel=kernel)

        # 灰度化
        src = cv2.cvtColor(src, cv2.COLOR_BGR2GRAY)
        crop = cv2.cvtColor(crop, cv2.COLOR_BGR2GRAY)

        # 获取长度宽度
        h, w = src.shape[:2]
        # 匹配模板,函数自带
        ss = cv2.matchTemplate(crop,src,cv2.TM_CCOEFF_NORMED)
        min_val, max_val, min_loc, max_loc = cv2.minMaxLoc(ss)
        # 通过左上右下定位出图形位置
        # 左上角
        left_top = max_loc
        # 右下角
        right_bottom = (left_top[0] + w, left_top[1]+h)
        # 画出矩形位置
        re = cv2.rectangle(crop, left_top, right_bottom, 255, 2)
        # 拼接查看图片
        cv2.imwrite(path.format('crop-',i,'-des.png'), re, [int(cv2.IMWRITE_JPEG_QUALITY)])

核心的逻辑还是很简单的:

    1.分别载入识别图形和背景阴影图片;
    2.我们分析图片可以看出,具体的阴影位置只能存在于和目标滑块在一条线上;
    3.获取这一块图片,以加快识别,减少不必要的浪费;
    4.对图像锐化滤波,
    5.对图片灰化处理,方便识别
    6.匹配模板,找出阴影位置
    7.通过上一步获取的结果,再返回到原图中裁剪,匹配出最终位置

总的来说,使用自带的匹配模板还是很简答的,避免了我们徒手实现。但是我们也不是盲目使用,我们需要在真正处理前对图片进行一些处理。通过观察分析,我们可以看到,因为这种滑动验证码只能进行水平滑动,那就意味着最终的识别区域必将存在于和左侧滑块处于同一条水平线线上的区域。那么我们离目标又近了一步,我们只需要处理其中一部分图片就可以了。我们需要对图片进行裁剪,即可获取这一块图片。
当然图片还可以进行其他的算法处理用以获得更精确结果,小伙伴可以自行尝试。
我们来看一下识别的结果:


识别率还是可以的,我们粗略地统计了一下,识别成功率大概在90%左右。感兴趣的小伙伴可以优化一下图片处理逻辑,使用更好的处理算法以提高成功率。
再回到上一节,我们获得了图片的具体数据,只要在加上验证码在屏幕中的相对位置,我们就可以计算出绝对数据。最后把这些参数传递给selenium,写出驱动脚本,直接让他去滑动,我们的问题最终也就解决了。

总结

总的来说,滑动验证码识别实现起来,不是很困难。关键在于怎么分析出问题,以及找到处理问题的方法、步骤,然后按顺序去处理。
所谓道高一尺,魔高一丈。今天你破解了一种验证码,明天又会出现新形式的验证码。两者此消彼长,没有孰强孰弱。而两者真正的平衡点,在于正常用户能够理解并能完成验证罢了。
本文只做技术上的交流探讨,所有素材选取均出自互联网,不针对任何第三方。如由此引发的其他相关法律、法规问题,与本网站无关。

滑动验证码的破解(一)、原理介绍

最近有小伙伴和我讨论了验证码破解识别的话题,希望我帮忙实现一个滑动验证码破解识别的原型。今天我们就来聊聊这个话题。
验证码的破解是一个很有趣的话题,破解的而方式也有很多。譬如最机械的使用人工点击,亦或是相关的视觉处理算法,甚至是近年来很火热的机器学习等等。近年来,随着识别的技术提高,相应的验证码制造识别的难度也在提高,表现出越来越智能化。
下面我们直接进入主题,谈一谈今天的主角滑动验证码的破解。首先,我们来看一下,滑动验证码长啥样,我随便上网找了个图片。


再来看一张

我们可以看到大概是这样的,页面上有一张图片,图片右侧有两个随机的几何图形的阴影图片,图片左侧有一个目标几何图形。图片下方是一个滑块。
具体验证逻辑是这样的

    拖动下方滑块;
    上方的几何图形会跟着滑块移动;
    移动上方的几何图形,直到几何图形达到右侧相似阴影;
    识别完成。

有的小伙伴可能觉得我讲这个很啰嗦,这么简单的逻辑谁不知道。我为什么要介绍这个呢?后文的破解识别和这个有很大的关系,俗话说知己知彼,百战不殆。
言归正传,我们究竟应该如何做呢?我们想要破解识别出结果,必须要解决两个问题

    1.找出图中相同几何阴影图片的位置。
    2.计算出目标几何图片滑块和相同几何阴影图片的位置距离。

我们带着问题去寻找答案,似乎就不在那么困难了。
通过上面的分析,我们会发现一些有趣现象。

    1.图中几何阴影的识别不就转换成了图片识别问题了吗?
    2.位置识别我也许不是很好计算,我们可以换一个思路,上面的分析我们知道目标滑块和拖动滑块是同进退的,我们只要在上一步的基础上计算下方出滑块和相同几何阴影图片的位置距离,问题不就迎刃而解了吗?

有了上面的数据,我们就可以通过计算出图片在整个屏幕的相对位置,计算出最终的滑动距离。接下来问题就好办多了,八仙过海,各显神通。
我的小伙伴告诉我,它使用的是Selenium来实现的。记得我以前使用过Appium来实现过APP端的自动化的测试,他和Selenium的原理一样。我们只要写几行代码驱动Appium,那么从PC到移动端,所有的场景都能完美解决,理论上讲所有的技术障碍我们都已经扫清了。
当然这里也有一些问题,比如说Selenium如何模拟人类的滑动操作,模仿出一个变速抖动运动轨迹等等问题,这些本文不做讨论。我们只关心滑动验证码本身问题。回过头来,我们继续聚焦初始问题,实现图片的识别和距离计算。到底需要怎么做呢?下一节,我们将通过撸代码的方式为大家做具体的剖析。

mybatis升级导致的一次异常

这个故事是这样开始的,某部门的小伙伴,突然有一天找我,火急火燎的说线上业务出问题了。之前线上运行的代码没有任何问题,突然某功能的一条 SQL 执行异常。之前是好好的啊,怎么突然就不行了。查询了好多资料,也检查了代码,也不知道错在了哪里。
通过分析,发现了一个很奇怪的现象,SQL 语句写的是没有任何问题的,通过开启 SQL 语句 Debug 打印,其实也显示查询到了记录,但是返回给业务层的时候,始终没有数据,返回 java.lang.NullPointerException 异常。
根据经验判断,应该是 mybatis 代码的问题,经过询问得知,小伙伴使用了一个开源框架,他升级了这个框架,附带地 mybatis 的版本也跟着升级了。
有了这个答案,似乎问题就简单很多了,我们只要找到问题是如何引起的就解决了。我们首先想到应该是mybatis在做 resultMap 映射的时候出了异常。我们进入 ResultSetHandler 的实现类 DefaultResultSetHandler 中寻找答案。

  private void handleRowValuesForSimpleResultMap(ResultSetWrapper rsw, ResultMap resultMap, ResultHandler<?> resultHandler, RowBounds rowBounds, ResultMapping parentMapping)
      throws SQLException {
    DefaultResultContext<Object> resultContext = new DefaultResultContext<>();
    ResultSet resultSet = rsw.getResultSet();
    skipRows(resultSet, rowBounds);
    while (shouldProcessMoreRows(resultContext, rowBounds) && !resultSet.isClosed() && resultSet.next()) {
      ResultMap discriminatedResultMap = resolveDiscriminatedResultMap(resultSet, resultMap, null);
      // 处理单行RowValue
      Object rowValue = getRowValue(rsw, discriminatedResultMap, null);
      storeObject(resultHandler, resultContext, rowValue, parentMapping, resultSet);
    }
  }


  private Object getRowValue(ResultSetWrapper rsw, ResultMap resultMap, String columnPrefix) throws SQLException {
    final ResultLoaderMap lazyLoader = new ResultLoaderMap();
    Object rowValue = createResultObject(rsw, resultMap, lazyLoader, columnPrefix);
    if (rowValue != null && !hasTypeHandlerForResultObject(rsw, resultMap.getType())) {
      final MetaObject metaObject = configuration.newMetaObject(rowValue);
      boolean foundValues = this.useConstructorMappings;
      if (shouldApplyAutomaticMappings(resultMap, false)) {
        foundValues = applyAutomaticMappings(rsw, resultMap, metaObject, columnPrefix) || foundValues;
      }
      foundValues = applyPropertyMappings(rsw, resultMap, metaObject, lazyLoader, columnPrefix) || foundValues;
      foundValues = lazyLoader.size() > 0 || foundValues;
      rowValue = foundValues || configuration.isReturnInstanceForEmptyRow() ? rowValue : null;
    }
    return rowValue;
  }

// 处理自动映射
  private boolean applyAutomaticMappings(ResultSetWrapper rsw, ResultMap resultMap, MetaObject metaObject, String columnPrefix) throws SQLException {
    List<UnMappedColumnAutoMapping> autoMapping = createAutomaticMappings(rsw, resultMap, metaObject, columnPrefix);
    boolean foundValues = false;
    if (!autoMapping.isEmpty()) {
      for (UnMappedColumnAutoMapping mapping : autoMapping) {
        final Object value = mapping.typeHandler.getResult(rsw.getResultSet(), mapping.column);
        if (value != null) {
          foundValues = true;
        }
        if (value != null || (configuration.isCallSettersOnNulls() && !mapping.primitive)) {
          // gcode issue #377, call setter on nulls (value is not 'found')
          metaObject.setValue(mapping.property, value);
        }
      }
    }
    return foundValues;
  }


// 创建自动映射,主要是在这个方法中
    private List<UnMappedColumnAutoMapping> createAutomaticMappings(ResultSetWrapper rsw, ResultMap resultMap, MetaObject metaObject, String columnPrefix) throws SQLException {
    final String mapKey = resultMap.getId() + ":" + columnPrefix;
    // 解析出未映射的列,返回的结果
    List<UnMappedColumnAutoMapping> autoMapping = autoMappingsCache.get(mapKey);
    if (autoMapping == null) {
      autoMapping = new ArrayList<>();
      // 从resultSet中获取未映射的列名
      final List<String> unmappedColumnNames = rsw.getUnmappedColumnNames(resultMap, columnPrefix);
      // 循环分析处理
      for (String columnName : unmappedColumnNames) {
        String propertyName = columnName;
        if (columnPrefix != null && !columnPrefix.isEmpty()) {
          // When columnPrefix is specified,
          // ignore columns without the prefix.
          if (columnName.toUpperCase(Locale.ENGLISH).startsWith(columnPrefix)) {
            propertyName = columnName.substring(columnPrefix.length());
          } else {
            continue;
          }
        }
        // 看下列在最终的对象中是否存在
        final String property = metaObject.findProperty(propertyName, configuration.isMapUnderscoreToCamelCase());
        // 如果存在,就进行自动映射处理
        if (property != null && metaObject.hasSetter(property)) {
            /* 如果resultMap中包含这个列,退出本轮循环。
            问题就出在这里,如果查询返回的列在resultMap中有显示定义,那么就会结束本次处理;
            这时候存在这样一个问题,如果一张表中有5个字段,resultMap中有三个字段,
            且对应于表中三个字段,那么mybatis会认为这三个字段已经映射了,
            导致autoMapping为空,这时候就会出现业务返回空指针,这个应该是一个bug
            */

          if (resultMap.getMappedProperties().contains(property)) {
            continue;
          }
          final Class<?> propertyType = metaObject.getSetterType(property);
          if (typeHandlerRegistry.hasTypeHandler(propertyType, rsw.getJdbcType(columnName))) {
            final TypeHandler<?> typeHandler = rsw.getTypeHandler(propertyType, columnName);
            autoMapping.add(new UnMappedColumnAutoMapping(columnName, property, typeHandler, propertyType.isPrimitive()));
          } else {
            configuration.getAutoMappingUnknownColumnBehavior()
                .doAction(mappedStatement, columnName, property, propertyType);
          }
        } else {
          configuration.getAutoMappingUnknownColumnBehavior()
              .doAction(mappedStatement, columnName, (property != null) ? property : propertyName, null);
        }
      }
      autoMappingsCache.put(mapKey, autoMapping);
    }
    return autoMapping;
  }

通过上面的代码分析,我们找到了问题的根源,mybatis 会把表中返回的字段能够对应 resultMap 属性字段过滤掉。这就导致当返回的字段和 resultMap 全部对应时,返回的 autoMapping 映射 List 为空,自然导致映射失败。
既然我们已经知道了问题存在这一行代码,那怎么修改呢?

     if (resultMap.getMappedProperties().contains(property)) {
            continue;
          }

这里有几种方法:一、是使用 resultType,因为我们问题是单表的查询直接映射,理论上讲应该使用 resultType,这时候不去解析 resultMap 映射,这样就避免了过滤,但是线上有可能还有其他功能存在这样的使用情况,逐一去修改是不可能的。
二、回退框架 ,回退到设计之前的版本,但是小伙伴反馈,以基于新版本开发,线上已经有新版本特性的代码,回退是不可能回退的,这辈子都不可能回退的,其他的也干不了,只能依靠第三种方法。
三、比对了升级的代码,发现 mybatis 从 3.4.3 引入了这行代码,日志显示是为了解决复杂 map 的嵌套 bug,避免无意义的解析。在我们的实际使用场景中,没有使用这样的情况,于是我们决定走第三种方案,修改源码。
但是问题来了,修改了源码将来再次升级 mybatis 怎么办,会不会忘记这个 bugfix?最终我们决定这样做,我们将这个文件单独修改,同时生成一个 bugfix 的 jar 包,基于 JVM 的 classLoader 双亲委托特性,想办法让 JVM 优先加载我们 jar 包里的这个类,这样既避免了修改 mybatis 源码,也可以不用修改线上业务代码,还能解决我们的问题,将来也可以平滑升,一举多得。最终经过验证,成功解决。

总结

-   对于不熟悉的框架,升级需谨慎;
-   开源的框架问题,不要害怕,勇敢的看他的源码;
-   遇到解决不了的问题,不妨换个思路,从其他方向寻找突破。这才是这次问题最宝贵的经验,问题本身不重要,重要的是解决的思路。

高并发异步框架Akka使用(七)、分布式Cluster

在前面的章节我们提到服务的单点以及吞吐量问题,为此我们需要引入分布式 Cluster。
在此之前我们需要做一些配置


actor { provider = "akka.cluster.ClusterActorRefProvider" serializers { kryo = "com.twitter.chill.akka.AkkaSerializer" proto = "akka.remote.serialization.ProtobufSerializer" } serialization-bindings { #"scala.Product" = kryo "com.google.protobuf.Message" = proto "java.io.Serializable" = kryo } } remote { log-remote-lifecycle-events = off netty.tcp { hostname = "127.0.0.1" port = 8000 } } cluster { seed-nodes = [ "akka.tcp://AkkaTaskProcessing@127.0.0.1:8000", "akka.tcp://AkkaTaskProcessing@127.0.0.1:8001"] # Disable legacy metrics in akka-cluster. metrics.enabled=off }

我们启动两个服务,分别开启8000和8001端口。第一个配置8000,另一个服务中的port配置为8001。
我们来看一下代码

public class SimpleClusterListenerActor extends UntypedAbstractActor {
   Cluster cluster = Cluster.get(getContext().system());
    @Override
    public void preStart() {
              cluster.subscribe(getSelf(), ClusterEvent.initialStateAsEvents(), MemberEvent.class, UnreachableMember.class);      
    }

    @Override
    public void postStop() {
        cluster.unsubscribe(getSelf());
    }

    @Override
    public void onReceive(Object message) {
        if (message instanceof MemberUp) {
            MemberUp mUp = (MemberUp) message;
            log.info("上线: {}", mUp.member());

        } else if (message instanceof UnreachableMember) {
            UnreachableMember mUnreachable = (UnreachableMember) message;
            log.info("检测不到: {}", mUnreachable.member());

        } else if (message instanceof MemberRemoved) {
            MemberRemoved mRemoved = (MemberRemoved) message;
            log.info("移除: {}", mRemoved.member());

        } else if (message instanceof MemberEvent) {

        } else {
            unhandled(message);
        }

    }
}

我们还是集成actor,实现onReceive方法。我们在启动时开启集群消息订阅,结束时关闭订阅。
同时在onReceive方法中判断消息类型,不同的消息这里仅给出打印日志,具体日志具体处理。
通过实验,我们会发现启动一台服务后,在启动另一台,会打印出上线信息,关闭其中一台,也会发现打印下线信息。而且还会自动进行负载均衡。

总结

至此,Akka的系列笔记结束了,akka的强大功能其远远不止笔记上说的这些内容,实际应用还是需要多探索,多学习。

高并发异步框架Akka使用(六)、Persistent

这一节我们来聊一聊akka的持久化问题,首先讲一下为什么需要持久化。
假设有这样一个业务,用户在淘宝下订单,淘宝突然崩溃。当淘宝服务重启后,用户在此登录发现自己的订单数据全部没有了,这时候用户也很崩溃。
还是我们的工程例子,突然有一天,台风过境。工程项目被台风摧毁,包工头失联,工人死伤惨重,工地一片狼藉。台风走后,总得重建吧?可是怎么重建呢?所有的图纸,施工方案之前都没有保存。根本没办法做。大家都在想如果当时把这些都存档起来那该多好啊,这时候拿出来,按照之前的流程走一遍,一切搞定。这就是持久化的意义和重要性。
Akka中也有持久化,他包含两个概念

    快照 snapshot
    日志 Journal

两者的区别是快照是某一时刻的记录情况,有系统根据配置定期打印。而日志是我们控制的的顺序日志,我们可以从任意一种回复,具体的需要根据业务的实际情况分析。持久化可以存储到本地磁盘网络存储等等。
我们来看一个例子

public class TaskPersistActor extends AbstractPersistentActor {
    ......
          @Override
    public String persistenceId() {
        return "TaskPersistActor-" + id;
    }

    @Override
    public void preStart() throws Exception {
        mat = ActorMaterializer.create(system);
        queries = PersistenceQuery.get(system).getReadJournalFor(LeveldbReadJournal.class,
                LeveldbReadJournal.Identifier());
        source = queries.eventsByPersistenceId(persistenceId(), 0, Long.MAX_VALUE);
    }

    @Override
    public Receive createReceiveRecover() {
        return receiveBuilder().match(Task.class, evt -> {
             state.add(evt);
        }).match(SnapshotOffer.class, message -> {
            source.runForeach(i -> {
                Task task = (Task) ((EventEnvelope) i).event();
                          }, mat);
            state = (TaskState) message.snapshot();
            if (state != null && getNumEvents() > 0) {
                ConcurrentHashMap<String, Task> events = state.getEvents();
                Iterator<Entry<String, Task>> iter = events.entrySet().iterator();
                while (iter.hasNext()) {
                    Entry<String, Task> entry = (Entry<String, Task>) iter.next();
                    if (((Task) entry.getValue()).getCheckMap().isEmpty()) {
                        events.remove(entry.getKey());
                    } else {
                        handleTaskCmd((Task) entry.getValue(), false);
                    }

                }
            }
        }).match(RecoveryCompleted.class, message -> {
            // log.info("taskpersistActor-" + id + " 恢复完成");
        }).matchAny(message -> log.error("无法恢复事件!" + message.getClass().toString())).build();
    }

    @Override
    public Receive createReceive() {

        return receiveBuilder().match(Task.class, cmd -> {
            final Task task = cmd.clone();
            handleTaskCmd(task, true);
        }).match(CheckMessage.class, message -> {
            try {
                handleCheckMessage(message);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }).match(SnapshotTick.class, message -> {

            // 删除历史快照
            deleteSnapshot(snapshotSequenceNr());
            deleteSnapshot(lastSequenceNr());
            if (getNumEvents() > 0) {
                saveSnapshot(state.copy());
            }
        }).match(SaveSnapshotSuccess.class, message -> {
            // 删除历史快照
        }).match(SaveSnapshotFailure.class, message -> {
            log.error(" 创建快照失败!");
        }).match(DeleteSnapshotSuccess.class, message -> {
            // log.info(" 删除快照完成!");
        }).matchAny(message -> log.error("无法处理命令!" + message.getClass().toString())).build();
    }
    ......
}

我么可以看到,要实现持久化,需要继承持久化基类,这里我们选择 AbstractPersistentActor,接着实现他的三个方法

public String persistenceId() 
public Receive createReceiveRecover()
public Receive createReceive() 

persistenceId用于获取持久化Id,createReceiveRecover用于处理回复持久化的消息,createReceive用于处理普通消息。
使用持久化我们需要在配置文件中开启配置:

    persistence {  
      journal {  
        plugin = "akka.persistence.journal.leveldb"  
        leveldb.dir = "persistence/journal"  
        leveldb.native = false  
      }  
      snapshot-store {  
        plugin = "akka.persistence.snapshot-store.local"  
        local.dir = "persistence/snapshots"  
      }  
    } 

至此,一个基于akka的简单的系统就可以使用了。下一节我们需要来讨论下如何避免单点问题,以及提高吞吐量,我们需要引入Akka Cluster。

高并发异步框架Akka使用(五)、Actor实战

上一节我们介绍了supervisor,包工头的工作模式。这一节我们来学一下具体的工人是如何工作的,前面我们提到过每一个工人都是一个actor,自然都需要继承actor,按照actor方式干活。

public class TaskActor  extends UntypedAbstractActor {
    ......
          @Override
    public void onReceive(Object message) throws Throwable {
       if (message instanceof Task){
           final Task task = ((Task)message).clone();
           handleTaskCmd(task, true);
       }else {
           log.error("unsupport!" + message.getClass().toString());
       }

    }


        private void handleTaskCmd(final Task event, boolean persistFlag) {
        ......
        if (!mcRelVector.isEmpty()) {
            // 是否需要持久化
            if (persistFlag) {
                String uuid = UUID.randomUUID().toString();
                event.setBatchUuid(uuid);
                ......
                // 创建日志记录
                insertCheckLog(event);
            }
            // 处理正常业务逻辑
            handleTaskEvt(event, mcRelVector);
        }
    }
}

    private void handleTaskEvt(final Task event, Set<MonitorsChecksRelated> mcRelVector) {

        ActorRef monitorActor = getContext().actorOf(
                springExtension.props("monitorActor").withDispatcher("thread-pool-dispatcher"),
                "monitorActor" + System.currentTimeMillis() + "-" + System.nanoTime());
        getContext().watch(monitorActor);

        for (MonitorsChecksRelated mcRelated : mcRelVector) {
           ......
            // 循环检查项,如果是无顺序的,则生成多个actor并发执行,否在按顺序在一个actor中执行
                ActorRef monitorActorOnce = getContext().actorOf(
                        springExtension.props("monitorActor").withDispatcher("thread-pool-dispatcher"),
                        "monitorActorOnce" + System.currentTimeMillis() + "-" + System.nanoTime());
                getContext().watch(monitorActorOnce);
                monitorActorOnce.tell(monitorsChecksRelated, getSelf());


        }
        // 发送毒丸,终止顺序actor
            monitorActor.tell(PoisonPill.getInstance(), ActorRef.noSender());

    }
    ```

示例代码实现了onReceive方法,他也只处理task,为了演示actor之间的消息发送方便,我们把taskactor比作二级包工头,他下面也有工人monitoractor,很显然他手下的工人要由他来创建,创建完直接叫他干活,他找都是临时工。毕竟一级一级分包下来,也没什么大钱赚,养工人还是一笔很大的开销。任务分法完成以后,他在给大家发一条短信,任务没有了,你们干结束了就结账走人,下次有活我再找你们。
这份短信就是我们的PoisonPill。而短信的发送就是我们的tell方法。每个actor之间通过tell发送短信交互。我们这里使用的无响应模式,还可以使用有响应模式。比如工人做完活了,需要tell一下二级包工头,那么包工头需要在自己的onReceive方法中新增一种类型,实现响应的处理就好了。
```java 
#工人调用sender获取二级包工头,执行tell方法告诉工头自己干完了。
sender().tell(result, self());

这一节,我们主要介绍了Actor之间是如何进行通信的。主要就是集成actor,实现onReceive方法。下一节我们将介绍一下Akka的持久化问题。

高并发异步框架Akka使用(四)、Supervisor

在Akka中存在着这样一种监管策略,Supervisor 监督管理。
Akka中每个Actor是由其他的Actor来创建,意思是 ,每个Actor都有一个父类,记录在父类的上下文中。每个父类Actor监督管理所有子类Actor。任何一个子类Actor产生异常行为时,父类Actor需要处理。
这里我们在做一个类比,每一个actor都是一个工人,而Supervisor则是一个包工头。项目的工人数目有包工头控制,工人病假、休息等活动有包工头安排,使用不同的策略。

通常对于子类Actor异常处理有不同决策。具体的决策包括以下几种:

Restart:重新启动出现异常的Actor;
Stop:退出出现异常的Actor;
Resume:忽略异常,保留其当前状态,继续执行;
Escalate:自己也不知道如何处理,异常抛出给上层,交给自己的父类处理。
任何一个新Actor,在其基类中都是有默认的Supervision实现的。Akka提供了两种策略实现:OneForOneStrategy和AllForOneStrategy。

    OneForOneStrategy:只有当前抛出异常的Actor响应该策略;
    AllForOneStrategy:所有的Actor都响应该策略。

默认使用OneForOneStrategy。


/** * When supervisorStrategy is not specified for an actor this * `Decider` is used by default in the supervisor strategy. * The child will be stopped when [[akka.actor.ActorInitializationException]], * [[akka.actor.ActorKilledException]], or [[akka.actor.DeathPactException]] is * thrown. It will be restarted for other `Exception` types. * The error is escalated if it's a `Throwable`, i.e. `Error`. */ final val defaultDecider: Decider = { case _: ActorInitializationException ⇒ Stop case _: ActorKilledException ⇒ Stop case _: DeathPactException ⇒ Stop case _: Exception ⇒ Restart } /** * When supervisorStrategy is not specified for an actor this * is used by default. OneForOneStrategy with decider defined in * [[#defaultDecider]]. */ final val defaultStrategy: SupervisorStrategy = { OneForOneStrategy()(defaultDecider) }

当actor发生初始化、结束、以及死信异常时,直接停止。其他异常重启actor。

同样,我们需要实现一个Supervisor,让他来监督我们的work。所以我们需要在系统初始化的时候就建立好supervisor。好比工程开始要联系好开发商,找好包工头。

 ActorSystem   actorSystem = ActorSystem.create();
 private static ActorRef supervisor = actorSystem.actorOf(ext.props("supervisor").withMailbox("akka.priority-mailbox"));

我们使用ActorSystem的actorOf方法创建。

  /**
   * Create new actor as child of this context with the given name, which must
   * not be null, empty or start with “$”. If the given name is already in use,
   * an `InvalidActorNameException` is thrown.
   *
   * See [[akka.actor.Props]] for details on how to obtain a `Props` object.
   *
   * @throws akka.actor.InvalidActorNameException if the given name is
   *   invalid or already in use
   * @throws akka.ConfigurationException if deployment, dispatcher
   *   or mailbox configuration is wrong
   * @throws UnsupportedOperationException if invoked on an ActorSystem that
   *   uses a custom user guardian
   */
  def actorOf(props: Props, name: String): ActorRef

我们持有一个ActorRef对象,而不是直接持有Actor,ActorRef与Actor相比,可以理解成他是一个props类型对象的引用,我们不关心具体的这个对象有Actor system来管理。这样更加灵活,比如这个actor奔溃了,actor system会为我们在生成一个。

好比我们工地上需要一个钢筋工,我不会找张三或者李四,我找告诉包工头,我需要一个钢筋工,至于是谁由包工头安排,比如今天是张三来了,可能明天张三请假了,那就由李四来。这样应该很好理解。
好了,有了包工头我们就可以开始工作了。我们知道,包工头也是这个系统中的一颗螺丝,他也是一个actor,默认他也需要集成actor,他的任务就是给其他人发任务。

public class Supervisor extends UntypedAbstractActor {

    private final LoggingAdapter log = Logging.getLogger(getContext().system(), "Supervisor");

    private Router router;

    @Override
    public void preStart() throws Exception {

        List<Routee> routees = new ArrayList<Routee>();
        for (int i = 0; i < ConstantsType.MAX_WORK_ACTOR_AMOUNT; i++) {
            ActorRef actor = getContext().actorOf(ext.props("taskActor", i), "taskActor" + i);
            // 监控
            getContext().watch(actor);
            routees.add(new ActorRefRoutee(actor));
        }
        /**
         * RoundRobinRoutingLogic: 轮询
         * BroadcastRoutingLogic: 广播
         * RandomRoutingLogic: 随机
         * SmallestMailboxRoutingLogic: 空闲
         */
        router = new Router(new RoundRobinRoutingLogic(), routees);
        super.preStart();
    }

    @Override
    public void onReceive(Object message) throws Exception {

        if (message instanceof Task) {
            // 进行路由转发
            router.route(message, getSender());
        } else {
            log.error("unsupport message {}", message);
        }
    }

    @Override
    public void postStop() throws Exception {
        log.info("stop");
        super.postStop();
    }
}

这里引入了一个Router,路由的概念相当于联系人、号码簿。在preStart中,我们创建了一定数量的actor,通过Routee,关联到Router。
好比包工头要干活,手下要维持一个50人的小团队,每个人就是一个actor,他怎么管这些人呢,他把每个人的手机号码记下来,号码就相当于Routee。号码簿就相当于Router。其中的watch方法就相当于和工人签了合同。之后有任务就可以找工人来干活。
那有任务了到底找谁呢?包工头想了四种方法:

    RoundRobinRoutingLogic: 轮询,按号码簿挨个找
    BroadcastRoutingLogic: 广播,按号码簿群发
    RandomRoutingLogic: 随机,从号码簿中随机选一个
    SmallestMailboxRoutingLogic: 空闲,看谁在家闲的蛋疼找谁

为了公平起见,我们替包工头选一个轮询方法。
一切准备妥当,粮草齐备,就等着大展宏图。我们前面介绍过,每个包工头都是一个actor,自然他也有mailbox,会收到mail。actor系统会自动处理mail到onReceive方法,我们只要实现这个方法就好了。
包工头比较懒,他只接受工程甲方发来的任务信息 ,其他的一律不处理。收到甲方的任务之后,通过发短信通知手下的工人来干活。具体怎么通知,通知谁上面已经介绍过了。
乍一看,还是很简单的,这一节我们简单介绍了supervisor。以及一个actor处理的 基本流程。下一节我们将介绍一下具体的actor如何处理,以及actor之间是如何交流通信的。

高并发异步框架Akka使用(三)、Mailboxes

在Akka中,除了Actor,另一个很重要的概念就是Mailboxes。他是Akka的基石之一,掌握了这两个概念,对于Akka你基本上算是有了一定的理解。
Mailboxes顾名思义,就是邮箱的意思,Akka中的邮箱保存actor的信息。正常情况下,每个actor都有一个自己的邮箱。但对于Balancingpool,所有路由对象将共享一个邮箱实例,这一点需要特别注意。
这里我们再来打一个比方,我们把actor类比为公司老总,那么mailbox就像相当于是老总的助理。各个老总比较忙,他们不会直接交流,而是通过给他们的秘书发邮件,让她的秘书来安排。
譬如google的boss想找Oracle的boss喝酒,Oracle的boss日理万机,没时间应付。所以需要由助理安排行程,那么google的boss先给Oracle的boss助理发一份邮件,告诉他自己想和他的boss喝一杯。接下来就等助理安排行程,助理一看,前面有微软的boss预约了想要一起唱K,Facebook的boss约了一起听京剧,那就按预约的顺序安排吧。
这是时候有两种情况:

    1.微软的boss要求如果去唱K就给他回一封邮件。
    2.Facebook的boss则比较随意,去不去随便,反正我也约了其他人;
    你爱去不去,不用告诉我。我是团购的票,送给你做个顺水人情。

这就是两种不同的处理方式,有响应处理和无响应处理。这两种方式没有好坏、优劣,主要看业务的要求。
Akka中的mailbox主要有两类分别是:unboundedMailbox 和boundedMailbox,他们主要的区别是邮箱是否有容量界限,在这这两种类型之上,他们还可以分为是否阻塞,优选级权限等类型的邮箱。
unboundedMailbox主要包括:

    UnboundedMailbox (default)
    SingleConsumerOnlyUnboundedMailbox 
    NonBlockingBoundedMailbox
    UnboundedControlAwareMailbox
    UnboundedPriorityMailbox
    UnboundedStablePriorityMailbox
    UnboundedPriorityMailbox

boundedMailbox主要包括:

    BoundedMailbox
    BoundedPriorityMailbox
    BoundedStablePriorityMailbox
    BoundedStablePriorityMailbox
    BoundedPriorityMailbox
    BoundedControlAwareMailbox

默认使用的是UnboundedMailbox,无阻塞,无容量上界。这就是mailbox的基本概念。
比如我们实现一个自己的mailbox,对于一些特殊的操作,我们需要优先执行:

public class PriorityMailbox extends UnboundedStablePriorityMailbox {

    public PriorityMailbox(ActorSystem.Settings settings, Config config) {
          super(new PriorityGenerator() {
            @Override
            public int gen(Object message) {
                if (message instanceof Task) {
                    return ((Task) message).getPriority();
                }  else if (message.equals(PoisonPill.getInstance())) {
                    return 1000;
                } else {
                    return 100;
                }
            }
        });

    }
}

我们对PoisonPill赋予高优先级。使用自定义mailbox,我们需要在配置文件中开启配置

  priority-mailbox {
    mailbox-type = "com.tts.akkaflow.mailbox.PriorityMailbox"
  }

接下来我们将结合具体的示例,讲讲如何使用Actor和mailbox,如何将两者串起来。