高并发异步框架Akka使用(二)、Actor模型

提到Akka就不能不提Actor模型,他是Akka绕不过的话题,也是Akka的核心所在。
那么究竟什么是Actor模型呢?
我们可以先建立这样一个概念,一个Actor就对应于Java中的一个Java Bean,是一个边界实体。但是他不完全等同于Java Bean,或者说这是一个错误的等价。之所以这么说,就是想给大家有个经验上的概念。
一个Actor从内部看,可以做很多事,但从外部来看,他的职能又是单一的。这一概念可能 不太好理解,我们暂且放下,后面会慢慢介绍。
Actor和Actor之间不能直接调用或交互,他们通过邮件来交互,每个Actor都有一个自己的信箱,里面存放着其他Actor发来的邮件。同时每个Actor也可以给别人发邮件。
参见下面这张图,有四个actor,actor之间通过mail交互通信,他们共同组成了一个完整的系统,就是Actor System。

Actor可以处理不同类型的邮件,每一个类型的邮件有一个专门的处理方法。
好了,我们现在来做一个通俗的类比,让大家有一个更直观的理解。
Actor System就好比我们这个世界,我们每一个人就是一个Actor,我们每个人有不同的技能,有的人会织衣,有的人会捕鱼,有的人会打猎。
现在有这样一个需求,有一个人A,需要一批貂皮大衣和鱼皮马甲,他自己啥也不会,怎么办呢?他决定找其他人帮忙:

    1.A给猎人发了一封信,让猎人帮他打一只貂,
    2.猎人打好了貂,通过顺丰发给了他,于是A有了貂皮;
    3.A给渔夫发了一封信,让农夫帮他打一条鱼,
    4.渔夫打好了鱼,通过圆通发给了他,于是A有了鱼皮;
    4.A又给织衣人发一封信,同时把貂皮、鱼皮通过顺丰寄给了他,让织衣人帮他做貂皮大衣和鱼皮马甲。
    5.织衣人做好了貂皮大衣和鱼皮马甲用顺丰发给了他,最终他拥有了貂皮大衣和鱼皮马甲。

每个人都有自己的特殊技能,比别人做的又快又好。这样就构成了一个完整的世界,一个人人有衣穿,人人有田种的世界。每个人各司其职,其乐融融。
以上就是actor和actor system的简单类比。
可是大家会有这样的疑问:为什么A自己不打猎、捕鱼、织衣,交给这么多人做,不是更麻烦吗?万一其中一件快递丢了呢?那不是满盘皆输。
仔细想一想,确实是这样的,生活中掌握多项技能的人大有人在。但是当你回头想想,你会发现这样的模型存在问题:

    如果只交给一个人,他去世了怎么办?
    现在又有人提出要做貂皮马甲怎么办?
    如果很多人需要大衣和马甲,都来找他,他做不完怎么办?
    同样存在的问题,他们也会存在快递丢失;

回到我们的软件系统中,大家不难发现,这不就是一个传统的All in one的系统吗?所有的功能都整合在这个大系统中;一旦需求有变动,需要修改很多代码;请求一大,负载狂飙;因为一些其他问题,大致请求失败,业务被迫中断。
因此我们需要一个解决的办法,这也就是微服务产生的根源。这时候,你可能会发现,Akka还真有点像那么回事。

    akka中有不同的actor,处理不同的任务
    通过actor模型拆分后,我们可以构建异构的系统
    对于来往的邮件,Akka支持持久化,我们可以做持久化,在出错时可以重做。
    Akka还支持集群,同一类型任务可以交给集群处理,避免单点故障。

通过这样的改进,我们可以做很多事情。譬如现在织衣人有钱了,他自己不用亲自织衣服了,他买了机器,通过织衣服的机器来做。对于整个系统来说,变化不大,大家还是给他发邮件,寄材料。对于别人来说,他的变化是无感知的。同样的,越来越多的人加入织衣行列,大家也不必关心,只管发邮件,寄材料,所有的这些由织衣人自己来解决。这样就不会出现一个织衣人病了,大家就不能收到衣服的情况。
这时候还有可能有另外一种情况,现在是冬天,大雪纷飞,那么织衣人就要先织造貂皮大衣,鱼皮马甲可能要暂停,待到冬去春来再去织马甲,或者根本就不去织马甲。
传统的系统,遇到流量高峰,想要去实现这样的需求还是很难的,首先剥离织貂皮大衣和马甲就是一件很麻烦的事,或者说根本没办法实现,更不要说后面再去重新织马甲。
而Akka框架却很容易做到,只要新增加一个Actors就好了,将原先织马甲的逻辑剪切到新的Actor中,新的Actor就拥有了这项技能,当流量高峰来临时,我们保证主要、核心业务正常,其他的暂时关停,持久化相关数据,等到系统正常时候再补做。总体来说还是很方便的。
Akka的有点远不止此,当然这些都是和微服务息息相关的,结合起来还是很好理解的。
在Akka框架中actor是其中最小的执行粒度,是Akka中的基础模型。类似于Java中的线程,但是他比线程轻量很多,更像是golang中的协程的概念。

高并发异步框架Akka使用(一)、介绍

曾几何时你是否迷茫,苦于寻找一个能使简单实现出高并发的异步编程框架,自己动手写又不现实,到底哪里能找到这样一个好用的框架呢?或是你想寻找一个能够实现微服务的框架,亦或是你想实现一下DDD的编程。
如果你有这个困惑,是时候了解一下Akka了,他可以满足你的愿望。
我在一次了解Scala语言的情况下,接触到了Akka。官网介绍Akka是为Java和Scala构建高度并发、分布式和弹性消息驱动应用程序的工具包。使用Akka可以更轻松地构建强大的反应式、并发和分布式应用程序。

  • 更简单的并发和分布式系统,actors和streams允许您构建可扩展的系统,更有效地使用服务器的资源。
  • 弹性设计,以响应式原则为基础,Akka能够让系统自我修复,失败时能及时响应。
  • 高性能,在一台机器上,最高可达5000万msg/sec。内存占用很小;每GB堆大约250万个Actors。
  • 弹性的、分布式的,无单点故障的分布式系统。负载平衡和跨节点的自适应路由。使用CRDT实现分布式数据的最终一致性。
  • 响应式的流数据,具有背压式的异步非阻塞流处理。为构建微服务提供了一个很好的平台使用完全异步和流式http服务器和客户端。

Akka包含几个大部分:

    Akka Actors Actor系统

    Akka Streams 流

    Akka HTTP 

    Akka Cluster 集群、分布式

    Cluster Sharding  集群副本管理

    Distributed Data 分布式数据

    Akka Persistence 数据持久化

    Alpakka

    Alpakka Kafka 结合kafka应用

    Akka gRPC gRPC协议

    Akka Enhancements 增强包

    Akka Management

其中核心是Actos模型,接下来我会用几篇文章来介绍一下。我们的一切都是围绕Actos展开的。我们将写一个小例子,通过理论和代码相结合,从而能更好地理解AKKA,乃至Actor模型,领域驱动设计(DDD)等。

波动拳式代码优化问题(二)、混合型

上一篇我们介绍了简单的波动拳式代码的优化,在实际业务开发的过程中,我们往往碰到的问题比上面的还要复杂。在各个条件中穿插着循环、判断等等。接下来我们来谈谈这一类代码的优化,请看下面的例子,我们有这样三类对象:

    老师————>班级————>学生
        1:n 1:n

一个老师任教多个年级的多个班级,一个班级有多名学生。来看下代码:

class Teacher {

    private int teachId;
    /**
     * 编号
     * 
     */
    private String classNo;
    /**
     * 名称
     * 
     */
    private String teachName;
    /**
     * 科目
     * 
     */
    private String subject;
    /**
     * 生日
     * 
     */
    private String birthDay;
    /**
     * 任教班级
     */
    private List<Classes> classList;
    ......
}
class Classes {

    private int classId;
    /**
     * 编号
     * 
     */
    private String classNo;
    /**
     * 名称
     * 
     */
    private String className;
    /**
     * 年级
     */
    private int grade;
    /**
     * 学生
     */
    private List<Student> studentList;
    ......
}
class Student {

    private int id;
    /**
     * 编号
     * 
     */
    private String studentNo;
    /**
     * 姓名
     * 
     */
    private String studentName;
    /**
     * 性别
     * 
     */
    private String gender;

    /**
     * 生日
     * 
     */
    private String birthDay;
    ......
}

我们现在有这样一个需求:我们想找出任教八年级数学的老师以及和这些老师具有相同生日的男性学生。一般的思路是这样的:

    先找出任教数学的老师;
    再找出这些任教九年级的班级
    再找出这些班级的男同学;
    最后比较老师和学生的生日
private void filter() {
        Map<Integer,Integer> relateMap = new HashMap<>(16);
        List<Teacher> teacherList = new ArrayList<Teacher>();
        for (Teacher teacher : teacherList) {
            if (teacher.getSubject().equals("math")) {
                List<Classes> classesList = teacher.getClassList();
                for (Classes classes : classesList) {
                    if (classes.getGrade() == 8) {
                        List<Student> studentList = classes.getStudentList();
                        for (Student student : studentList) {
                            if (student.getGender().equals("male")) {
                                if (student.getBirthDay().equals(teacher.getBirthDay())) {
                                    relateMap.put(student.getId(),teacher.getTeachId());
                                } 
                            }
                        }
                    }
                }
            }
        }
    }

我们可以看到,这是典型的波动拳式代码,谜一样的缩进。那怎么样优化呢?这时候,传统的语法是不能解决的,我们需要借助一些特殊的语法糖。譬如,我们使用Java的流式api。

        teacherList.stream().filter(t -> t.getSubject().equals("math")).forEach(teacher -> {
            teacher.getClassList().stream().filter(c -> c.getGrade() == 8).forEach(classes -> {
                classes.getStudentList().stream().filter(stu -> stu.getGender().equals("male")).forEach(student -> {
                    if (student.getBirthDay().equals(teacher.getBirthDay())) {
                        relateMap.put(student.getId(), teacher.getTeachId());
                    }
                });
            });
        });

看一下,是不是简洁了很多。有的小伙伴也许会说,这样写好像和之前差不多。其实不然,通过这样的改进,我们的逻辑更加清晰,更易于维护,将来扩展起来也更方便。
再比如出现的代码在混合型的基础上,每个条件分支又嵌套多层,这种情况在基于上面的优化前提下就比较好优化了。结合上一篇的介绍,我们可以按照模板套出来。

总结

在实际开发过程中,希望小伙伴们不能只为快速了实现业务功能,而乱了章法。这都是欠下的债,越往后债越背越多。工业生产强调的是团队协作,代码写出来是给人看的,要易于阅读,易于维护。总之,仁者见仁智者见智,这些还是需要大家自己去思考思考。

波动拳式代码优化问题(一)、简单型

何为波动拳式代码?玩过街霸的或者看过龙珠的小伙伴都知道,就是代码像是被龟波气功打过一样,呈现”>”形式。

这种代码主要表现为有3个以上判断语句,加上业务代码后,逻辑十分复杂。常见的波动拳式有三种,下面我们来具体分析一下。

第一种

    private int HadokenStyle1() {
        boolean flag1 = true;
        boolean flag2 = true;
        boolean flag3 = true;
        boolean flag4 = true;
        boolean flag5 = true;
        boolean flag6 = true;
        int res = 0;

        if (flag1) {
            if (flag2) {
                if (flag3) {
                    if (flag4) {
                        if (flag5) {
                            if (flag6) {
                                res = 1;
                            }
                        }
                    }
                }
            }
        }
        return res;
    }

这种样式的代码变现为需要业务同时满足多个标记,那么如何来优化呢?


private int HadokenStyle1Mod() { boolean flag1 = true; boolean flag2 = true; boolean flag3 = true; boolean flag4 = true; boolean flag5 = true; boolean flag6 = true; int res = 0; if (flag1 && flag2 && flag3 && flag4 && flag5 && flag6) { res = 1; } return res; }

这种情况只要放入一行,同时满足就好了,因为是&,只要有一个不满足,程序就退出判读语句。

第二种

这种形式的代码表现为满足条件时,继续向下,不满足是获得一个返回结果

    private int HadokenStyle2() {
        boolean flag1 = true;
        boolean flag2 = true;
        boolean flag3 = true;
        boolean flag4 = true;
        boolean flag5 = true;
        boolean flag6 = true;
        int res = 0;

        if (flag1) {
            if (flag2) {
                if (flag3) {
                    if (flag4) {
                        if (flag5) {
                            if (flag6) {
                                res = 7;
                            } else {
                                res = 6;
                            }
                        } else {
                            res = 5;
                        }
                    } else {
                        res = 4;
                    }
                } else {
                    res = 3;
                }
            } else {
                res = 2;
            }
        }
        return res;
    }

那么该如何优化呢?

    private int HadokenStyle2Mod() {
        boolean flag1 = true;
        boolean flag2 = true;
        boolean flag3 = true;
        boolean flag4 = true;
        boolean flag5 = true;
        boolean flag6 = true;

        boolean flagAll = true;
        flagAll = flagAll && flag1;
        if (!flagAll) {
            return 1;
        }
        flagAll = flagAll && flag2;
        if (!flagAll) {
            return 2;
        }
        flagAll = flagAll && flag3;
        if (!flagAll) {
            return 3;
        }
        flagAll = flagAll && flag4;
        if (!flagAll) {
            return 4;
        }
        flagAll = flagAll && flag5;
        if (!flagAll) {
            return 5;
        }
        flagAll = flagAll && flag6;
        if (!flagAll) {
            return 6;
        } else {
            return 7;
        }
    }

这种情况下,我们采取反向思维,直接找到他的反面返回,满足条件继续推进。

第二种

这种属于变种,有大于三个平行分支

 private int HadokenStyle3() {
        int style = 0;
        int res = 0;

        if (style == 1) {
            res = 1;
        } else if (style == 2) {
            res = 2;
        } else if (style == 3) {
            res = 3;
        } else if (style == 4) {
            res = 4;
        } else if (style == 5) {
            res = 5;
        } else if (style == 6) {
            res = 6;
        }

        return res;
    }

这种情况当我们明确知道条件的判断值时,我们一般采用switch case语法来替换,通常来说多分支的情况下swich的效率要高一点,也没高多少。还有一个重要原因switch是的语义更加直接、清晰,我们来看下。

    private int HadokenStyle3Mod() {
        int style = 0;
        int res = 0;

        switch (style) {
            case 1:
                res = 1;
                break;
            case 2:
                res = 2;
                break;
            case 3:
                res = 3;
                break;
            case 4:
                res = 4;
                break;
            case 5:
                res = 5;
                break;
            case 6:
                res = 6;
                break;

            default:
                break;
        }
        return res;
    }

以上三种形式的代码,我们在实际开发过程中,会经常遇到,刚开始可能条件很少,随着业务的演变,今天你加一条,明天我加一条,慢慢的越来越多,最终累积出波动拳式。还有一些其他的变种,比如循环和条件合并型的等。

爬虫Scrapy笔记(五)、middlewares

实战中我们的爬虫很容易被对方ban,如何反ban是一门很深的功课。我们来学学最简单的。
通常情况下,最容易想到的有两个,一个是用户代理(user-agent),还有一个就是ip代理(proxy)。下面我们就要学一个新的组件middlewares。
middlewares组件是一组中间件,主要是处理爬虫系统的输入输出数据,包括request和response数据。
首先需要自定义一个middleware组件,随便起个名字SampleUserAgentMiddleware,用户代理的middleware继承scrapy.contrib.downloadermiddleware.useragent.UserAgentMiddleware,然后实现他的process_request方法。具体实现是这样的,我们定义一个用户代理的列表,也可以动态生成,简单起见,我们写个固定的列表,然后随机选取一个,之后把它塞到request请求的User-Agent参数中即可。

class SampleUserAgentMiddleware(useragent.UserAgentMiddleware):
    #代理
    agents = [
        "Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1; AcooBrowser; .NET CLR 1.1.4322; .NET CLR 2.0.50727)",
        "Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 6.0; Acoo Browser; SLCC1; .NET CLR 2.0.50727; Media Center PC 5.0; \.NET CLR 3.0.04506)",
        "Mozilla/4.0 (compatible; MSIE 7.0; AOL 9.5; AOLBuild 4337.35; Windows NT 5.1; .NET CLR 1.1.4322; .NET CLR 2.0.50727)",
        "Mozilla/5.0 (Windows; U; MSIE 9.0; Windows NT 9.0; en-US)",
        "Mozilla/5.0 (compatible; MSIE 9.0; Windows NT 6.1; Win64; x64; Trident/5.0; .NET CLR 3.5.30729; .NET CLR 3.0.30729; .NET CLR 2.0.50727; Media Center PC 6.0)",
        "Mozilla/5.0 (compatible; MSIE 8.0; Windows NT 6.0; Trident/4.0; WOW64; Trident/4.0; SLCC2; .NET CLR 2.0.50727; .NET CLR 3.5.30729; .NET CLR 3.0.30729; .NET CLR 1.0.3705; .NET CLR 1.1.4322)",
        "Mozilla/4.0 (compatible; MSIE 7.0b; Windows NT 5.2; .NET CLR 1.1.4322; .NET CLR 2.0.50727; InfoPath.2; .NET CLR 3.0.04506.30)",
        "Mozilla/5.0 (Windows; U; Windows NT 5.1; zh-CN) AppleWebKit/523.15 (KHTML, like Gecko, Safari/419.3) Arora/0.3 (Change: 287 c9dfb30)",
        "Mozilla/5.0 (X11; U; Linux; en-US) AppleWebKit/527+ (KHTML, like Gecko, Safari/419.3) Arora/0.6",
        "Mozilla/5.0 (Windows; U; Windows NT 5.1; en-US; rv:1.8.1.2pre) Gecko/20070215 K-Ninja/2.1.1",
        "Mozilla/5.0 (Windows; U; Windows NT 5.1; zh-CN; rv:1.9) Gecko/20080705 Firefox/3.0 Kapiko/3.0",
        "Mozilla/5.0 (X11; Linux i686; U;) Gecko/20070322 Kazehakase/0.4.5",
        "Mozilla/5.0 (X11; U; Linux i686; en-US; rv:1.9.0.8) Gecko Fedora/1.9.0.8-1.fc10 Kazehakase/0.5.6",
        "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/535.11 (KHTML, like Gecko) Chrome/17.0.963.56 Safari/535.11",
        "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_7_3) AppleWebKit/535.20 (KHTML, like Gecko) Chrome/19.0.1036.7 Safari/535.20",
        "Opera/9.80 (Macintosh; Intel Mac OS X 10.6.8; U; fr) Presto/2.9.168 Version/11.52",
        ]

    def __init__(self, user_agent=''):
        self.user_agent = user_agent

    def process_request(self, request, spider):
        #随机选择一个user-agent
        agent = random.choice(self.agents)
        request.headers.setdefault('User-Agent', agent)

接下来我们再来实现一个ip代理,也是很简单的,我们定义一个middleware组件,然后实现process_request方法就可以了,这里我们演示采用fiddler代理。

class ProxyMiddleware(object):
    def process_request(self, request, spider):
        proxy = 'http://localhost:8888'
        request.meta['proxy'] = proxy

要想middleware工作,我们还需要在setting中开启配置,

DOWNLOADER_MIDDLEWARES = {
#    'sample.middlewares.MyCustomDownloaderMiddleware': 543,2019-09-04 13:57:43 星期三
    'sample.middlewares.SampleUserAgentMiddleware': 100,
    'sample.middlewares.ProxyMiddleware': 200,
}

我们来验证下

我们可以看到代理成功获取。

sample示例源码下载

总结

总的来说,通过scrapy来实现爬虫还是很简单的。因其高度封装特性,以及模块化的设计,极大地便利了开发者。但是想要用做好爬虫还有很远的路要走,比如反爬,分布式爬虫的设计等等。

爬虫Scrapy笔记(四)、spider

正主登场,前面我们介绍了爬虫开始前的一些外围工作,现在,我们终于开始讨论爬虫了。

class OschinaSpider(scrapy.Spider):
 ......
    def parse(self, response):
        pass

他也很简单,继承 Spider 类,我们只要实现 parse 接口就好了,parse 接受一个 response 参数。
在我们实现以前先理一下逻辑。我们首先需要从找到所有的文章,他的页面文章大致如下图:

其次,我们需要找到页面上具体元素的信息,如下图

我们只要解析好上面两组信息就好了,解析第一组信息是为了找到具体文章,之后把信息交给 scrapy 的 schedule 组件,通过Downloader去请求文章,之后再将结果返回给爬虫,解析第二组信息,就是具体要持久化的信息,这样一来就好办了。
我们来看下代码:


def parse(self, response): # 通过xpath定位文件,找到文章url链接 news_href = response.xpath( '//div[@class="page"]/div[@class="box vertical news"]/a[@class="news-link primary " or @class="news-link primary visited " ]/@href').extract() # 循环获取到的数据,让scrapy重新请求,过滤下,只请求news信息 for news in news_href: if news.startswith("https://www.oschina.net/news"): # 执行yield生成器,结果回调news_detail yield scrapy.Request(news, callback=self.news_detail) def news_detail(self, response): # 生成一个SampleItem容器 item = SampleItem() # 解析具体的字段 # 标题 item['title'] = response.xpath('//div[@class="article-detail"]/h2[@class="header"]/text()').extract() # 作者 item['author'] = response.xpath( '//div[@class="article-detail"]/div[@class="extra ui horizontal list meta-wrap"]/div[@class="item"]/a[@class="__user"]/span/text()').extract() # 收藏 item['collect'] = response.xpath( '//div[@class="article-detail"]/div[@class="extra ui horizontal list meta-wrap"]/div[@class="item collect-btn "]/span/text()').extract() # 评论 item['comment'] = response.xpath( '//div[@class="article-detail"]/div[@class="extra ui horizontal list meta-wrap"]/div[@class="item comment-count"]/a[@class="normal"]/span/text()').extract() # 执行yield生成器,让pipeline处理 yield item

现在来启动我们的爬虫


执行成功,我们来看下执行的结果

成功生成了数据

这样,我们就完成了一只爬虫的基本任务。之后我们就可以基于这些数据做一些分析,应用到相应的业务中去。总的来说,还是很简单的,最主要的是要动手,看着简单,时间操作中还是会遇到各种各样的问题。
做爬虫始终绕不开一个问题,就是反爬,反爬的技术有很多,需要具体问题具体分析。下一节,我们学学最简单的user-agent和代理ip。

爬虫Scrapy笔记(三)、pipelines

pipelines 主要是用于接收一个 item,然后处理这个 item,通过 pipeline 来决定是否需要继续处理还是存储起来,亦或是丢弃等等。
他也很简单,只要实现一个接口方法就可以

  def process_item(self, item, spider):

下面我们来实现这个方法,在之前我们需要做一些预处理,我们想把这些信心存储起来,简单起见,我们就把 item 存储在 sqlite 中。sqlite 是一个简单的数据库,不做过多介绍。
我们需要把爬虫的启动和关闭和数据库的启动和关闭关联起来,防止出现异常。
scrapy 中本身包含如下信号

#signals.py
engine_started = object()
engine_stopped = object()
spider_opened = object()
spider_idle = object()
spider_closed = object()
spider_error = object()
request_scheduled = object()
request_dropped = object()
response_received = object()
response_downloaded = object()
item_scraped = object()
item_dropped = object()

# for backwards compatibility
stats_spider_opened = spider_opened
stats_spider_closing = spider_closed
stats_spider_closed = spider_closed

item_passed = item_scraped

request_received = request_scheduled

这里使用了 dispatcher.connect 函数

class SamplePipeline(object):
       #文件名
    filename = 'oschina.sqlite'

    def __init__(self):
        self.conn = None
        # 添加信号分发器,控制数据库的连接与关闭
        dispatcher.connect(self.started, signals.engine_started)
        dispatcher.connect(self.stopped, signals.engine_stopped)

    # 初始换
    def started(self):
        # 如果路径存在,表示数据库已创建,连接数据库
        if path.exists(self.filename):
            self.conn = sqlite3.connect(self.filename)
            self.conn.text_factory = str
            # 否则表示数据库不存在,创建数据库,并创建相关表
        else:
            self.conn = self.init_table(self.filename)

    # 清理相关工作,提交事务,关闭连接
    def stopped(self):
        if self.conn is not None:
            self.conn.commit()
            self.conn.close()
            self.conn = None

    # 建表
    def init_table(self, filename):
        conn = sqlite3.connect(filename)
        # 这里的表结构和item相对应
        conn.execute("""create table oschina(
                        id integer primary key autoincrement,
                        title text,
                        author text,
                        collect int,
                        comment int
                        )""")
        conn.commit()
        return conn

    def process_item(self, item, spider):
        self.conn.execute(
            'insert or replace into oschina values((select id from oschina a where a.title=?),?,?,?,?)',
            (item['title'][0], item['title'][0], item['author'][0],  int(item['collect'][0]),
             int(item['comment'][0])))
        return item

这样,我们就完成了简单的数据持久化操作。注意一点的是,需要在setting.py中开启pipeline设置。

ITEM_PIPELINES = {
    当有多个Pipeline,可加在后面,后面数据表示执行顺序
    'sample.pipelines.SamplePipeline': 300,
}

接下里就等着正主登场了。下一章节,我们来看下如何驱动一只爬虫。

爬虫Scrapy笔记(二)、items

items,项目,顾名思义这个组件主要用来转储项目的,是一个转储容器,类似于 Java 里 POJO。

class SampleItem(scrapy.Item):
    # define the fields for your item here like:
    # name = scrapy.Field()
    pass

我们自定的这个 SampleItem,集成了 scrapy 框架中的 Item,来看下他的定义

class DictItem(MutableMapping, BaseItem):

    fields = {}

    def __init__(self, *args, **kwargs):
        self._values = {}
        if args or kwargs:  # avoid creating dict for most common case
            for k, v in six.iteritems(dict(*args, **kwargs)):
                self[k] = v

    def __getitem__(self, key):
        return self._values[key]

    def __setitem__(self, key, value):
        if key in self.fields:
            self._values[key] = value
        else:
            raise KeyError("%s does not support field: %s" %
                (self.__class__.__name__, key))

    def __delitem__(self, key):
        del self._values[key]

    def __getattr__(self, name):
        if name in self.fields:
            raise AttributeError("Use item[%r] to get field value" % name)
        raise AttributeError(name)

    def __setattr__(self, name, value):
        if not name.startswith('_'):
            raise AttributeError("Use item[%r] = %r to set field value" %
                (name, value))
        super(DictItem, self).__setattr__(name, value)

    def __len__(self):
        return len(self._values)

    def __iter__(self):
        return iter(self._values)

    __hash__ = BaseItem.__hash__

    def keys(self):
        return self._values.keys()

    def __repr__(self):
        return pformat(dict(self))

    def copy(self):
        return self.__class__(self)


@six.add_metaclass(ItemMeta)
class Item(DictItem):
    pass

Item 继承 DictItem,我们查看源码,姑且这样理解,它就是一个实现了字典的这样一个容器。
好了,我们也定义一下我们的容器,比如我们想知道每一篇文章的收藏情况以及评论的参与度,大致了解一下这篇文章是否受欢迎。我们定义一下我们需要的信息


class SampleItem(scrapy.Item): # define the fields for your item here like: # 文章标题 title = scrapy.Field() # 作者 author = scrapy.Field() # 发布日期 date = scrapy.Field() # 收藏 collect = scrapy.Field() # 评论 comment = scrapy.Field() pass

看来还是很简单的,接下来我们将创建一个 pipeline,来处理持久化我们的数据。

爬虫Scrapy笔记(一)、安装及初始化

因为业务的需要,最近需要研究爬虫实现相关功能。故此,新开一篇记录学习过程。
从网上查询相关的资料,母目前使用的比较多的开源框架可能要算 Scrapy 了。
百度百科是这样介绍的:

- Scrapy 是 Python 开发的一个快速、高层次的屏幕抓取和 web 抓取框架,用于抓取 web 站点并从页面中提取结构化的数据。Scrapy 用途广泛,可以用于数据挖掘、监测和自动化测试。

- Scrapy 吸引人的地方在于它是一个框架,任何人都可以根据需求方便的修改。它也提供了多种类型爬虫的基类,如 BaseSpider、sitemap 爬虫等,最新版本又提供了 web2.0 爬虫的支持。

Scrapy 架构介绍(摘自百度百科)

- Scrapy Engine(引擎):负责 Spider、ItemPipeline、Downloader、Scheduler 中间的通讯,信号、数据传递等。
- Scheduler(调度器):它负责接受引擎发送过来的 Request 请求,并按照一定的方式进行整理排列,入队,当引擎需要时,交还给引擎。
- Downloader(下载器):负责下载 Scrapy Engine(引擎)发送的所有 Requests 请求,并将其获取到的 Responses 交还给 Scrapy Engine(引擎),由引擎交给 Spider 来处理。
- Spider(爬虫):它负责处理所有 Responses,从中分析提取数据,获取 Item 字段需要的数据,并将需要跟进的 URL 提交给引擎,再次进入 Scheduler(调度器)。
- Item Pipeline(管道):它负责处理 Spider 中获取到的 Item,并进行进行后期处理(详细分析、过滤、存储等)的地方。
- Downloader Middlewares(下载中间件):一个可以自定义扩展下载功能的组件。
- Spider Middlewares(Spider 中间件):一个可以自定扩展和操作引擎和 Spider 中间通信的功能组件。

大概的工作流程如下:

- 爬虫引擎启动之后,爬虫根据初始的url经由Engine交给调度器;
- 调度器按照一定的逻辑排序,在通过Engine依赖于Downloader请求相关资源;
- Downloader将获取到的结果返回给爬虫来解析;
- 爬虫解析完成后,将一部分结果通过pipeline做持久化;
- 另一部需要继续爬取的任务再经由由Engine交给调度器;
- 按照上述逻辑重复执行,直到没有可以爬取的数据;
- 爬取停止,退出;

这样我们就有了一个初步的理解,理论部分结束。我们开始动手吧,实际使用加上阅读源码才能了解的更彻底。
参照相关的教程,大致如下:

安装 Scrapy,我本地因为使用的是 conda 环境,他可以自己解决各种依赖,很方便,直接安装就可以了,这里不再赘述。
安装好了之后,检查下版本 【scrapy version】,出现版本号,表示安装完成

接下来创建一个项目,我们命名为 sample,【scrapy startproject sample】,提示创建完成

我们看下创建的项目文件,和上面的组件基本上对应的,具体的功能不再赘述

这里有两个文件,一个是 settings,主要是做设置的,是项目中程序相关的配置,是一个全局配置。譬如我想配置一个常量,可以写在这个文件里。还有一个 scrapy.cfg 这样也是一个配置信息,主要是项目的相关信息。

当然了你也可以不使用命令行,你也可以手动创建,都是可以的,只不过这样麻烦一点而已。话说回来谁喜欢和自己过不去呢。

好了,接下来我们需要新建一只属于我们自己爬虫,本着开源学习的精神,我们以爬取开源中国 https://www.oschina.net/ 为例:

当然,这一步也可手动创建,其实也很简单,就是创建一个集成scrapy的子类

# -*- coding: utf-8 -*-
import scrapy


class OschinaSpider(scrapy.Spider):
    name = "oschina"
    allowed_domains = ["https://www.oschina.net/"]
    start_urls = ['http://https://www.oschina.net//']

    def parse(self, response):
        pass

爬虫的名字叫oschina,爬虫会爬取 https://www.oschina.net/ 域名下url,起始url是 https://www.oschina.net/ 。
这样一只爬虫就建好了,下一步我们将丰满这只爬虫,让它去爬取我们需要的信息。

Mybatis源码笔记(六)、ResultHandler

ResultHandler 接口比较简单,主要用于处理 ResultContext 对象

public interface ResultHandler<T> {

  void handleResult(ResultContext<? extends T> resultContext);

}

ResultHandler 主要有两个实现,分别是 DefaultResultHandler 和 DefaultMapResultHandler

  @SuppressWarnings("unchecked")
  public DefaultMapResultHandler(String mapKey, ObjectFactory objectFactory, ObjectWrapperFactory objectWrapperFactory, ReflectorFactory reflectorFactory) {
    this.objectFactory = objectFactory;
    this.objectWrapperFactory = objectWrapperFactory;
    this.reflectorFactory = reflectorFactory;
    this.mappedResults = objectFactory.create(Map.class);
    this.mapKey = mapKey;
  }

  @Override
  public void handleResult(ResultContext<? extends V> context) {
    final V value = context.getResultObject();
    final MetaObject mo = MetaObject.forObject(value, objectFactory, objectWrapperFactory, reflectorFactory);
    // TODO is that assignment always true?
    final K key = (K) mo.getValue(mapKey);
    mappedResults.put(key, value);
  }

  public Map<K, V> getMappedResults() {
    return mappedResults;
  }
}

public class DefaultResultHandler implements ResultHandler<Object> {

  private final List<Object> list;

  public DefaultResultHandler() {
    list = new ArrayList<>();
  }

  @SuppressWarnings("unchecked")
  public DefaultResultHandler(ObjectFactory objectFactory) {
    list = objectFactory.create(List.class);
  }

  @Override
  public void handleResult(ResultContext<?> context) {
    list.add(context.getResultObject());
  }

  public List<Object> getResultList() {
    return list;
  }

}

区别的化一目了然,DefaultMapResultHandler 将处理结果存储在一个 Map 对象中,DefaultResultHandler 将结果存储在一个 List 对象中。

总结:

至此,我们的主线流程大致的也就有了一些了解,对应于传统的 JDBC 方式,我们一步一步地分析了源码,他们是如何运作的,基本上已经明了了。实际上mybatis的框架还是很复杂的,其中一些很复杂的逻辑我们并没有实际深入,比如boundSql的解析,自动填充对象的映射,等等。想要有更深的了解还需进一步精度这些代码。