4.2 采集要求
4.2.1 功能架构
大数据平台的采集功能需要采集抽取结构化和非结构化的数据,具体分为数据抽取、数据规整、数据输出和数据稽核功能,如图4-6所示。
图4-6 功能架构
① 数据抽取
● 接口定义:根据不同数据源,定义相应的接口协议,如FTP、HTTP、JSON等;
● 数据抽取:可以全量抽取和增量抽取方式从源系统抽取数据。
② 数据规整
● 数据解析:按照接口定义的格式从HTTP、JSON、XML等格式的数据源中提取数据,以便后续清洗;
● 数据清洗:按照数据业务规则对无效数据、异常数据进行清洗,以减少网络带宽压力及保证数据的有效性。
③ 数据输出
● 数据入库:将稽核无误后的数据进行入库处理;
● 日志记录:采集记录操作的日志,包括但不限于操作时间、数据范围、采集的数据量、采集的错误信息等,并将日志信息输出至安全管理和核心处理系统。
④ 数据稽核:针对数据抽取、数据规整、数据输出的每个环节进行稽核,确保数据的准确性、完整性、一致性。
4.2.2 技术架构
大数据平台采集系统的技术架构如图4-7所示。
图4-7 技术架构
① 数据抽取
● Flume:用于分布式海量日志采集系统;
● JDBC工具:用于关系数据库数据采集;
● OGG:用于Oracle数据库数据采集;
● FTP:用于文本数据采集;
● Nutch:用于网页数据采集。
② 数据规整
● 解析:按照接口定义的格式从HTTP、JSON、XML等格式的数据源中提取数据;
● 清洗:负责对无效数据、异常数据进行清洗。
③ 数据输出
● 实时消息:使用分布式消息队列,将数据输出至核心处理系统;
● 批量文件:将采集的数据生成文件,批量输出至核心处理系统;
● 日志文件:将稽核日志、安全日志,分别输出给数据管理系统、核心处理系统。
4.2.3 处理技术
1.数据采集框架
数据采集框架采用Flume。
① 架构
Flume数据采集框架包括以下内容,如图4-8所示。
图4-8 数据采集框架图
● Source可以接收外部源发送过来的数据,不同的Source,可以接受不同的数据格式,比如目录池(Spooling Directory)数据源,可以监控指定文件夹中的新文件变化,如果目录中有文件产生,就会立刻读取其内容;
● Channel是一个存储地,接收Source的输出,直到有Sink消费掉Channel中的数据,Channel中的数据直到进入到下一个Channel中或者进入终端才会被删除,当Sink写入失败后,可以自动重启,不会造成数据丢失,因此很可靠;
● Sink会消费Channel中的数据,然后送给外部源或者其他Source,比如数据可以写入到HDFS或者HBase中。
② 可靠性
Flume的核心是把数据从数据源收集过来,再送到目的地。为了确保输送成功,在送到目的地之前,会先缓存数据,待数据真正到达目的地后,再删除自己缓存的数据。
Flume使用事务性的方式保证传送Event整个过程的可靠性。Sink必须在Event被存入Channel后,或者已经被传达到下一站Agent里,又或者已经被存入外部数据目的地之后,才能把Event从Channel中Remove掉。这样数据流里的Event无论是在一个Agent里还是多个Agent之间流转,都能保证可靠,因为以上的事务保证了Event会被成功存储起来。而Channel的多种实现在可恢复性上有不同的保证。也保证了Event不同程度的可靠性。比如,Flume支持在本地保存一份文件Channel作为备份,而Memory Channel将Event存在内存Queue里,虽然速度快,但丢失的话无法恢复。
③ 可扩展性
Flume采用了三层架构,分别为Agent, Collector和Storage,每一层均可以水平扩展。其中,所有Agent和Collector由Master统一管理,这使得系统容易监控和维护,且Master允许有多个(使用ZooKeeper进行管理和负载均衡),这就避免了单点故障问题。
④ 可管理性
所有Agent和Collector由Master统一管理,这使得系统便于维护。多Master情况下,Flume利用ZooKeeper和Gossip,保证动态配置数据的一致性。用户可以在Master上查看各个数据源或者数据流执行情况,且可以对各个数据源配置和动态加载。Flume提供了Web和Shell Script Command两种形式对数据流进行管理。
⑤ 功能可扩展性
用户可以根据需要添加自己的Agent, Collector或者Storage。此外,Flume自带了很多组件。
2.消息队列框架
消息队列框架采用Kafka。
Kafka是一个消息订阅和发布系统,它将消息的发布称作生产者(Producer),将消息的订阅称作消费者(Consumer),将中间的存储阵列称作代理(Broker),三者关系如图4-9所示。
图4-9 生产者—代理—消费者
Kafka有以下特点。
● 同时为发布和订阅提供高吞吐量;
● 可进行持久化操作。将消息持久化到磁盘,因此可用于批量消费,例如ETL,以及实时应用程序。通过将数据持久化到硬盘并复制,可防止数据丢失;
● 分布式系统,易于向外扩展。所有的生产者、代理和消费者都会有多个,均为分布式的,无须停机即可扩展机器;
● 消息被处理的状态是在消费者端维护,而不是由服务器端维护,当失败时能自动平衡;
● 支持在线和离线的场景。
① 架构
Kafka是显式分布式架构,如图4-10所示,Producer、Broker和Consumer都可以有多个。Kafka的作用类似于缓存,即活跃的数据和离线处理系统之间的缓存。Kafka的基本概念包括以下几点。
图4-10 Kafka架构
● Topic:特指Kafka处理的消息源(Feeds of Messages)的不同分类。
● Partition:Topic物理上的分组,一个Topic可以分为多个Partition,每个Partition是一个有序的队列。Partition中的每条消息都会被分配一个有序的ID(Offset)。
● Message:Message是通信的基本单位,每个Producer可以向一个Topic发布一些消息。如果Consumer订阅了这个主题,那么新发布的消息就会广播给这些Consumer。
● Producers:消息和数据生产者,向Kafka的一个Topic发布消息的过程称为Producers。
● Consumers:消息和数据消费者,订阅Topics并处理其发布的消息的过程称为Consumers。
● Broker:缓存代理,Kafka集群中的一台或多台服务器统称为Broker。
Kafka是显式分布式的,多个Producer、Broker和Consumer可以运行在一个大的集群上,作为一个逻辑整体对外提供服务;多个Consumer可以组成一个Group, Message只能传输给某个Group中的某一个Consumer。
② 关键技术点
Kafka采用了以下关键技术。
● Zero-Copy:在Kafka上,有两个原因可能导致低效,太多的网络请求和过多的字节复制,为了提高效率,Kafka把Message分成一组一组的,每次请求会把一组Message发给相应的Consumer。此外,为了减少字节复制,采用了Sendfile系统调用。
● Exactly Once Message Transfer:Kafka中仅保存了每个Consumer已经处理数据的Offset,这样有两个好处,一是保存的数据量少,二是当Consumer出错时,重新启动Consumer处理数据,只需从最近的Offset开始即可。
● Push/Pull:Producer向Kafka中推(Push)数据,Consumer从Kafka中拉(Pull)数据。
● 负载均衡和容错:Producer和Broker之间没有负载均衡机制,Broker和Consumer之间利用ZooKeeper进行负载均衡。所有Broker和Consumer都会在ZooKeeper中进行注册,且ZooKeeper会保存它们的一些元数据信息,如果某个Broker和Consumer发生了变化,所有其他的Broker和Consumer都会得到通知。
3.分布式爬虫框架
分布式爬虫框架采用Nutch。
Nutch是一个基于Lucene、类似Google的完整网络搜索引擎解决方案,基于Hadoop的分布式处理模型保证了系统的性能,插件机制保证了系统的可定制化,而且很容易集成到自己的应用之中。
总体上,Nutch可以分为两部分:抓取部分和搜索部分。抓取程序抓取页面并把抓取回来的数据做成反向索引,搜索程序则对反向索引进行搜索来回答用户的请求。抓取程序和搜索程序的接口是索引,两者都使用索引中的字段。抓取程序和搜索程序可以分别位于不同的机器上。
抓取程序是被Nutch的抓取工具驱动的。这组工具用来建立和维护三种不同的数据结构:Web Database、Segments、Index。
① Web Database(简称WebDB):这是一个特殊的存储数据结构,用来映射被抓取网站数据的结构和属性的集合。WebDB用来存储从抓取开始(包括重新抓取)的所有网站结构数据和属性。WebDB只被抓取程序使用,搜索程序并不使用它。WebDB存储两种实体:页面和链接。页面表示网络上的一个网页,这个网页的URL作为标识被索引,同时建立一个对网页内容的MD5哈希签名。跟网页相关的其他内容也被存储,包括页面中的链接数量(外链接),页面抓取信息(在页面被重复抓取的情况下),还有表示页面级别的分数。因此WebDB可以说是一个网络图,节点是页面,链接是边。
② Segment:这是网页的集合,并且它是被索引的。Segment的Fetchlist是抓取程序使用的URL列表,它是从WebDB中生成的。Fetcher的输出数据是从Fetchlist中抓取的网页。Fetcher的输出数据先被反向索引,然后索引后的结果被存储在Segment中。Segment的生命周期是有限制的,当下一轮抓取开始后它就没有用了。因此删除超过指定时间期限的Segment是可以的。而且也可以节省不少磁盘空间。Segment的命名是日期加时间,反映出相应的存活周期。
③ Index:索引库是反向索引所有系统中被抓取的页面,它并不直接从页面反向索引产生,而是合并很多小的Segment的索引产生的。Nutch使用Lucene来建立索引,因此所有Lucene相关的工具API都用来建立索引库。需要说明的是Lucene的Segment概念和Nutch的Segment概念是完全不同的。Lucene的Segment是Lucene索引库的一部分,而Nutch的Segment是WebDB中被抓取和索引的一部分。
抓取程序的抓取过程如下。
抓取是一个循环的过程,抓取工具从WebDB中生成了一个Fetchlist集合;抽取工具的根据是Fetchlist从网络上下载网页内容;工具程序根据抽取工具发现的新链接更新WebDB,然后再生成新的Fetchlist,周而复始。这个抓取循环在Nutch中经常指generate/fetch/update循环。
一般来说同一域名下的URL链接会被合成到同一个Fetchlist。这样做的考虑是当同时使用多个工具抓取的时候,不会产生重复抓取的现象。Nutch遵循Robots Exclusion Protocol,可以用robots.txt定义保护私有网页数据不被抓取。
上面这个抓取工具的组合是Nutch的最外层的,也可以直接使用底层的工具,自己组合这些底层工具的执行顺序达到同样的结果。这是Nutch的优势。具体工作过程如下。
a)创建一个新的WebDB(admin db-create);
b)把开始抓取的根URL放入WebDB(inject);
c)从WebDB的新Segment中生成Fetchlist(generate);
d)根据Fetchlist列表抓取网页的内容(fetch);
e)根据抓取回来的网页链接URL更新WebDB(updatedb);
f)重复上面c)~e)步骤直到到达指定的抓取层数;
g)用计算出来的网页URL权重Scores更新Segments(updatesegs);
h)对抓取回来的网页建立索引(index);
i)在索引中消除重复的内容和重复的URL(dedup);
j)合并多个索引到一个大索引,为搜索提供索引库(merge)。
4.2.4 场景应用
1.结构化数据采集
(1)文本文件
如图4-11所示,结构化文本入库数据流包括如下几点。
图4-11 结构化文本入库数据流
① 数据抽取:数据源端将TXT文本文件和Check文件传至大数据平台的FTP Server。
② 数据规整:对文本数据按规定格式进行解析,主要是编码格式等;使用Flume、Kafka组件,将数据转为消息队列,并进行数据清洗。
③ 数据输出:解析、清洗后的数据,送至大数据平台;如果源数据无须清洗,则文件数据直接入库。
(2)关系数据库
如图4-12所示,数据库采集数据流包括如下几点。
图4-12 数据库采集数据流
① 数据抽取:利用JDBC工具/OGG,从源数据库中抽取数据。
② 数据规整:对数据按规定格式进行解析,主要是字符集转换等;使用Flume、Kafka组件,将数据转为消息队列,并进行清洗。
③ 数据输出:解析、清洗后的数据,送至大数据平台;如果源数据无须清洗,则文件数据直接入库。
2.非结构化数据采集
(1)DPI分光数据
如图4-13所示,DPI分光数据采集流程包括如下几点。
图4-13 DPI分光数据采集流程
① 数据抽取:按标准统一DPI数据格式,通过FTP方式输出给大数据平台。
② 数据规整:对数据按规定格式进行解析,主要是网络协议等。
③ 数据输出:使用Flume、Kafka组件,将数据转为消息队列,并进行清洗;解析、清洗后的数据,送至大数据平台。
(2)文本文件
如图4-14所示,非结构化文本入库数据流包括如下几点。
图4-14 非结构化文本入库数据流
① 数据抽取:数据源端将TXT文本文件和Check文件传至大数据平台的FTP Server。
② 数据规整:对文本数据按规定格式进行解析,主要是编码格式等;使用Flume、Kafka组件,将数据转为消息队列,并进行数据清洗。
③ 数据输出:解析、清洗后的数据,送至大数据平台;如果源数据无须清洗,则文件数据直接入库。
(3)网页数据
如图4-15所示,网页数据采集数据流包括以下三方面内容。
图4-15 网页数据采集数据流
① 数据抽取:Nutch根据配置的规则,从网页上抓取数据。
② 数据规整:数据解析、清洗、入库,由Nutch集成,统一实现。
③ 数据输出:抓取后的数据,存入大数据平台。
4.2.5 接口协议
如表4-2所示为接口协议要求。
表4-2 接口协议
4.2.6 接口约定
1.接口双方责任
(1)FTP接口
① 源数据提供方的责任
● 保证在指定的时间范围内生成本接口要求规定的数据至与下游约定的目录下面;
● 保证接口文件中的记录各值域在有效的取值范围内,数据中均不能包含0x0A(换行符)和0x05字符,确保数据的有效性和准确性;
● 负责数据的业务逻辑一致性控制,保证不将逻辑错误的数据提供给下游系统,保证提供数据质量,确保数据的准确性、一致性、完整性。
② 数据接收方的责任
● 负责与源数据提供方的信息交互和沟通;
● 负责对源数据提供方提供的数据文件进行及时的读入、稽核、规整及输出;
● 需具备监控校验及传输过程的功能;
● 负责及时根据稽核文件中记录的信息,对接口数据文件进行文件级校验。
(2)数据库接口
① 源数据提供方的责任
● 向数据接收方通报表结构;
● 向数据接收方通报数据库、表的编码方式,如果表中字段有特别编码,应当告知数据接收方;
● 保证在指定的时间范围内生成/更新数据至与下游约定的表;
● 对数据库服务器进行日常监控及维护;
● 保证数据库表中的记录各值域在有效的取值范围内,确保数据的有效性和准确性;
● 负责数据的业务逻辑一致性控制,保证不将逻辑错误的数据提供给下游系统,保证提供数据的质量,确保数据的准确性、一致性、完整性。
② 数据接收方的责任
● 负责与源数据提供方的信息交互和沟通;
● 负责对源数据提供方提供的数据文件及时读入、稽核、规整及输出;
● 需具备监控校验及传输过程的功能;
● 负责及时根据稽核比对抽取的记录数与输出的记录数。
2.接口文件格式及说明
(1)接口文件设计原则
接口文件设计原则如下。
① 文件压缩:文件要求压缩后上传。
② 文件大小:加载到Hadoop的单个文件大小以接近HDFS的Block块大小为最好;例如HDFS的Block大小为128MB,则单个文件压缩前最大不宜超过300MB。
③ 压缩格式:统一采用GZIP格式。
④ 文件编码方式应当统一为UTF-8。
(2)文件命名方式
① 数据文件命名方式
命名规则:文件名前缀.文件生成日期.数据日期.文件批次.批次流水号.重传次数.文件名后缀
● 文件名前缀:某类文件的唯一标识。
● 文件生成日期:这系统日期,这是描述文件生成的精确时间。其格式为“yyyyMMddhh24mmss”。
● 数据日期:数据日期是描述当前抽取周期中,数据的发生日期(如:20160328,则表示抽取的是2016年03月28日的数据快照)。按日抽取的数据文件,其数据日期就是数据的发生日期。按月抽取的数据文件,则数据日期取数据发生的年月(如:201205)。
● 重传次数:表示源数据提供方对某一业务的某一批次的某一数据日期的数据重传次数。取值范围是00~99,00表示初次正常下发,01表示第1次重传,02表示第2次重传,……, N表示第N次重传。
● 文件批次:001~999,表示该业务的数据分批的序号,主要是支持该业务数据文件按批次重传使用。
● 批次流水号:001~999,表示文件批次下的数据按照规定的文件大小拆分后的序列号。
● 文件名后缀:“DAT”字符串(上传过程使用“TMP”字符串)。
② 稽核文件命名方式
该文件命名规则中没有文件批次和批次流水号的概念,均默认成“000”字符串,其他规则跟对应的数据文件命名规则保持一致,文件名后缀字符串为“CHECK”。该文件的内容用于描述该接口在本次操作的动作内所传送的文件列表,并附带了上传的每个数据文件需要校验的信息和数据处理接收方进行登记的信息,字段之间以0x05作为分隔符,文件信息由表4-3中的内容组成。
表4-3 稽核文件信息表
③ 上传过程中文件命名方式
源数据提供方所生成的数据文件和稽核文件,在上传过程中,要以“TMP”字符串结尾。
④ 回执文件格式及命名
该文件由数据处理接收方提供,通过传输通道传送到源数据提供方。
根据稽核文件对数据文件进行校验时,所生成的回执文件名跟对应的稽核文件名一致。
命名规则:Check文件名(包括后缀字符串“CHECK”).文件名后缀
如校验成功:回执文件名的后缀为“RPT”。
如校验失败:回执文件名的后缀为“ERR”。
校验报告文件的字段分隔符为0x05,且分隔符不能省略,文件内容如表4-4所示。
表4-4 校验报告文件
注:如果数据处理接收方在接口所规定的时间内未收到Check文件,那么数据处理接收方需要发出提醒报告,报告文件的校验结果代码为“99”,报告文件命名方式跟Check文件命名规则一致。
4.2.7 性能指标
大数据平台采集系统性能指标(参考值)如下。
① 文本文件数据采集:单台接口机配1块千兆位网卡,按照下行带宽占总带宽利用率的64%测算,每秒处理一个80MB文件。
② 数据库数据采集:大于10000条每秒。
③ DPI分光数据采集:单台采集清洗服务器配1块万兆位网卡,处理性能为153Mbps,采集清洗时间低于3秒,Hadoop入库时间低于5分钟。
④ 网页数据采集:单个IP百兆位带宽在10个并发的情况下一天采集80万个URL。