死磕以太坊源码分析之Fetcher同步
Fetcher 功能概述
区块数据同步分为被动同步和主动同步:
被动同步是指本地节点收到其他节点的一些广播的消息,然后请求区块信息。
主动同步是指节点主动向其他节点请求区块数据,比如geth刚启动时的syning,以及运行时定时和相邻节点同步
Fetcher
负责被动同步,主要做以下事情:
- 收到完整的block广播消息(NewBlockMsg)
- 收到blockhash广播消息(NewBlockHashesMsg)
这两个消息又是分别由 peer.AsyncSendNewBlockHash
和 peer.AsyncSendNewBlock
两个方法发出的,这两个方法只有在矿工挖到新的区块时才会被调用:
1 | // 订阅本地挖到新的区块的消息 |
1 | func (pm *ProtocolManager) BroadcastBlock(block *types.Block, propagate bool) { |
所以,当某个矿工产生了新的区块、并将这个新区块广播给其它节点,而其它远程节点收到广播的消息时,才会用到 fetcher
模块去同步这些区块。
fetcher的状态字段
在 Fetcher
内部对区块进行同步时,会被分成如下几个阶段,并且每个阶段都有一个状态字段与之对应,用来记录这个阶段的数据:
Fetcher.announced
:此阶段代表节点宣称产生了新的区块(这个新产生的区块不一定是自己产生的,也可能是同步了其它节点新产生的区块),Fetcher
对象将相关信息放到Fetcher.announced
中,等待下载。Fetcher.fetching
:此阶段代表之前「announced」的区块正在被下载。Fetcher.fetched
:代表区块的header
已下载成功,现在等待下载body
。Fetcher.completing
:代表body
已经发起了下载,正在等待 body 下载成功。Fetcher.queued
:代表body
已经下载成功。因此一个区块的数据:header
和 body 都已下载完成,此区块正在等待写入本地数据库。
Fetcher 同步区块哈希
而新产生区块时,会使用消息 NewBlockHashesMsg
和 NewBlockMsg
对其进行传播。因此 Fetcher
对象也是从这两个消息处发现新的区块信息的。先来看同步区块哈希的过程。
1 | case msg.Code == NewBlockHashesMsg: |
先将接收的哈希标记在远程节点上,然后去本地检索是否有这个哈希,如果本地数据库不存在的话,就放到unknown
里面,然后通知本地的fetcher
模块再去远程节点上请求此区块的header
和body
。 接下来进入到fetcher.Notify
方法中。
1 | func (f *Fetcher) Notify(peer string, hash common.Hash, number uint64, time time.Time, |
它构造了一个 announce
结构,并将其发送给了 Fetcher.notify
这个 channel。注意 announce
这个结构里带着下载 header 和 body 的方法: fetchHeader
和 fetchBodies
。这两个方法在下面的过程中会讲到。 接下来我们进入到fetcher.go
的loop函数中,找到notify
,分以下几个内容:
①:校验防止Dos攻击(限制为256个)
1 | count := f.announces[notification.origin] + 1 |
②:新来的块号必须满足 $chainHeight - blockno < 7$ 或者 $blockno - chainHeight < 32$
1 | if notification.number > 0 { |
③:准备下载header
的fetching
中存在此哈希则跳过
1 | if _, ok := f.fetching[notification.hash]; ok { |
④:准备下载body
的completing
中存在此哈希也跳过
1 | if _, ok := f.completing[notification.hash]; ok { |
⑤:当确定fetching
和completing
不存在此区块哈希时,则把此区块哈希放入到announced
中,准备拉取header
和body
。
1 | f.announced[notification.hash] = append(f.announced[notification.hash], notification) |
⑥:如果 Fetcher.announced
中只有刚才新加入的这一个区块哈希,那么调用 Fetcher.rescheduleFetch
重新设置变量 fetchTimer
的周期
1 | if len(f.announced) == 1 { |
拉取header
接下来就是到fetchTimer.C
函数中:进行拉取header的操作了,具体步骤如下:
①:选择要下载的区块,从 announced
转移到 fetching
中
1 | for hash, announces := range f.announced { |
②:发送下载 header
的请求
1 | //发送所有的header请求 |
现在我们再回到f.notify
函数中,找到p.RequestOneHeader
,发送GetBlockHeadersMsg
给远程节点,然后远程节点再通过case msg.Code == GetBlockHeadersMsg
进行处理,本地区块链会返回headers,然后再发送回去。
1 | origin = pm.blockchain.GetHeaderByHash(query.Origin.Hash) |
这时候我们请求的headers
被远程节点给发送回来了,又是通过新的消息BlockHeadersMsg
来传递的,当请求的 header
到来时,会通过两种方式来过滤header :
Fetcher.FilterHeaders
通知Fetcher
对象
1 | case msg.Code == BlockHeadersMsg: |
2.downloader.DeliverHeaders
通知downloader
对象
1 | if len(headers) > 0 || !filter { |
downloader
相关的放在接下的文章探讨。继续看FilterHeaders
:
1 | filter := make(chan *headerFilterTask) |
主要分为3个步骤:
- 先发一个通信用的
channel
给headerFilter
- 将要过滤的
headerFilterTask
发送给filter
- 检索过滤后剩余的标题
主要的处理步骤还是在loop
函数中的filter := <-f.headerFilter
,在探讨处理前,先了解三个参数的含义:
unknown:
未知的headerincomplete:
header拉取完成,但是body还没有拉取complete:
header和body都拉取完成,一个完整的块,可导入到数据库
接下来正式进入到for _, header := range task.headers {}
循环中: 这是第一段重要的循环
①:判断是否是在fetching
中的header,并且不是其他同步算法的header
1 | if announce := f.fetching[hash]; announce != nil && announce.origin == task.peer && f.fetched[hash] == nil && f.completing[hash] == nil && f.queued[hash] == nil { |
②:如果传递的header
与承诺的number
不匹配,删除peer
1 | if header.Number.Uint64() != announce.number { |
③:判断此区块在本地是否已存在,如果不存在且只有header
(空块),直接放入complete
以及f.completing
中,否则就放入到incomplete
中等待同步body
。
1 | if f.getBlock(hash) == nil { |
④:如果f.fetching
中不存在此哈希,就放入到unkown
中
1 | else { |
⑤:之后再把Unknown
的header
再通知fetcher继续过滤
1 | select { |
接着就是进入到第二个循环,要准备拿出incomplete里的哈希,进行同步body的同步
1 | for _, announce := range incomplete { |
如果f.completing
中存在,就表明已经在开始同步body
了,直接跳过,否则把这个哈希放入到f.fetched
,表示header
同步完毕,准备body
同步,由f.rescheduleComplete(completeTimer)
完成。最后是安排只有header
的区块进行导入操作.
1 | for _, block := range complete { |
重点分析completeTimer.C
,同步body
的操作,这步完成就是要准备区块导入到数据库流程了。
拉取body
进入completeTimer.C
,从f.fetched获取哈希,如果本地区块链查不到的话就把这个哈希放入到f.completing
中,再循环进行fetchBodies
,整个流程就结束了,代码大致如下:
1 | case <-completeTimer.C: |
关键的拉取body
函数: p.RequestBodies
,发送GetBlockBodiesMsg
消息同步body
。回到handler
里面去查看对应的消息:
1 | case msg.Code == GetBlockBodiesMsg: |
softResponseLimit
返回的body
大小最大为$2 * 1024 * 1024$,MaxBlockFetch
表示每个请求最多128个body
。
之后直接通过GetBodyRLP
返回数据通过SendBlockBodiesRLP
发回给节点。
节点将会接收到新消息:BlockBodiesMsg
,进入查看:
1 | // 过滤掉filter请求的body 同步,其他的都交给downloader |
过滤掉filter
请求的body
同步,其他的都交给downloader
,downloader
部分之后的篇章讲。进入到FilterBodies
:
1 | filter := make(chan *bodyFilterTask) |
主要分为3个步骤:
- 先发一个通信用的
channel
给bodyFilter
- 将要过滤的
bodyFilterTask
发送给filter
- 检索过滤后剩余的
body
现在进入到case filter := <-f.bodyFilter
里面,大致做了以下几件事:
①:首先从f.completing中获取要同步body的哈希
1 | for i := 0; i < len(task.transactions) && i < len(task.uncles); i++ { |
②:然后从f.queued去查这个哈希是不是已经获取了body,如果没有并满足条件就创建一个完整block
1 | if f.queued[hash] == nil { |
③:最后对完整的块进行导入
1 | for _, block := range blocks { |
最后用一张粗略的图来大概的描述一下整个同步区块哈希的流程:
同步区块哈希的最终会走到f.enqueue
里面,这个也是同步区块最重要的要做的一件事,下文就会讲到。
Fetcher 同步区块
分析完上面比较复杂的同步区块哈希过程,接下来就要分析比较简单的同步区块过程。从NewBlockMsg
开始:
主要做两件事:
①:fetcher
模块导入远程节点发过来的区块
1 | pm.fetcher.Enqueue(p.id, request.Block) |
②:主动同步远程节点
1 | if _, td := p.Head(); trueTD.Cmp(td) > 0 { |
主动同步由Downloader
去处理,我们这篇只讨论fetcher
相关。
区块入队列
1 | pm.fetcher.Enqueue(p.id, request.Block) |
1 | case op := <-f.inject: |
正式进入将区块送进queue
中,主要做了以下几件事:
①: 确保新加peer
没有导致DOS
攻击
1 | count := f.queues[peer] + 1 |
②:丢弃掉过去的和比较老的区块
1 | if dist := int64(block.NumberU64()) - int64(f.chainHeight()); dist < -maxUncleDist || dist > maxQueueDist { |
③:安排区块导入
1 | if _, ok := f.queued[hash]; !ok { |
到此为止,已经将区块送入到queue
中,接下来就是要回到loop
函数中去处理queue
中的区块。
区块入库
loop函数在处理队列中的区块主要做了以下事情:
- 判断队列是否为空
- 取出区块哈希,并且和本地链进行比较,如果太高的话,就暂时不导入
- 最后通过f.insert将区块插入到数据库。
代码如下:
1 | height := f.chainHeight() |
进入到f.insert
中,主要做了以下几件事:
①:判断区块的父块是否存在,不存在则中断插入
1 | parent := f.getBlock(block.ParentHash()) |
②: 快速验证header,并在传递时广播该块
1 | switch err := f.verifyHeader(block.Header()); err { |
③:运行真正的插入逻辑
1 | if _, err := f.insertChain(types.Blocks{block}); err != nil { |
④:导入成功广播此块
1 | go f.broadcastBlock(block, false) |
真正做区块入库的是f.insertChain,这里会调用blockchain模块去操作,具体细节会后续文章讲述,到此为止Fether模块的同步就到此结束了,下面是同步区块的流程图:
参考
https://mindcarver.cn ☆☆☆☆☆