http://www.7klian.com

HPB 快速同步之 boby 同步与数据处理惩罚源码理会(三)

到这里,header,body 就有了,并且统一存储在 resultCache 中。之后就是如何对 resultCache 的处理惩罚。将在下文先容。

fetchParts 的内部处理惩罚与 headers 挪用的是一样的,这里不贴代码了,只是参数纷歧样。bodyWakeCh 分支触发了 update 事件,举办 body 的同步。ReserveBodies 用于获取同步 body 的任务,焦点仍然在 reserveHeaders。

[code]

HPB 芯链首创人,巴比特专栏作家。十余年金融大数据、区块链技能开拓履历,曾参加建设银联大数据。主创区块链解说视频节目《明说》30 多期,编写了《以太坊官网文档中文版》,并作为主要作者编写了《区块链开拓指南》,在中国区块链社区以 ID“蓝莲花”知名。

进口函数是 fetchBodies,留意函数内界说的几个函数:

case packet := <-deliveryCh:if peer := this.syncer.peers.Peer(packet.PeerId()); peer != nil {// Deliver the received chunk of data and check chain validityaccepted, err := deliver(packet)if err == errInvalidChain {return err}if err != errStaleDelivery {setIdle(peer, accepted)}}// Blocks assembled, try to update the progressselect {case update <- struct{}{}:default:}

本文首发于汪晓明博客

3. 收到 bodies 后如那里理惩罚

作者:感激 HPB Wallet 开拓团队整理供稿

func (this scheduler) ReserveBodies(p *peerConnection, count int) (fetchRequest, bool, error) {isNoop := func(header *types.Header) bool {return header.TxHash == types.EmptyRootHash && header.UncleHash == types.EmptyUncleHash}this.lock.Lock()defer this.lock.Unlock()return this.reserveHeaders(p, count, this.blockTaskPool, this.blockTaskQueue, this.blockPendPool, this.blockDonePool, isNoop)}

[code]

*

func (this scheduler) Schedule(headers []types.Header, from uint64) []types.Header {this.lock.Lock()defer this.lock.Unlock()// Insert all the headers prioritised by the contained block numberinserts := make([]types.Header, 0, len(headers))for_, header := range headers {// Make sure chain order is honoured and preserved throughouthash := header.Hash()if header.Number == nil || header.Number.Uint64() != from {log.Warn("Header broke chain ordering", "number", header.Number, "hash", hash, "expected", from)break}if this.headerHead != (common.Hash{}) && this.headerHead != header.ParentHash {log.Warn("Header broke chain ancestry", "number", header.Number, "hash", hash)break}// Make sure no duplicate requests are executedif_, ok := this.blockTaskPool[hash]; ok {log.Warn("Header already scheduled for block fetch", "number", header.Number, "hash", hash)continue}if_, ok := this.receiptTaskPool[hash]; ok {log.Warn("Header already scheduled for receipt fetch", "number", header.Number, "hash", hash)continue}// Queue the header for content retrievalthis.blockTaskPool[hash] = headerthis.blockTaskQueue.Push(header, -float32(header.Number.Uint64()))if this.mode == config.FastSync && header.Number.Uint64() <= this.fastSyncPivot {// Fast phase of the fast sync, retrieve receipts toothis.receiptTaskPool[hash] = headerthis.receiptTaskQueue.Push(header, -float32(header.Number.Uint64()))}inserts = append(inserts, header)this.headerHead = hashfrom++}return inserts}
func (this scheduler) deliver(id string, taskPool map[common.Hash]types.Header, taskQueue prque.Prque,pendPool map[string]fetchRequest, donePool map[common.Hash]struct{}, reqTimer metrics.Timer,results int, reconstruct func(header *types.Header, index int, result *fetchResult) error) (int, error) {request := pendPool[id]for i, header := range request.Headers {// Short circuit assembly if no more fetch results are foundif i >= results {break}// Reconstruct the next result if contents match upindex := int(header.Number.Int64() - int64(this.resultOffset))if index >= len(this.resultCache) || index < 0 || this.resultCache[index] == nil {failure = errInvalidChainlog.Error("invalid hash chain(deliver)", "index", index, "len(resultCache)", len(this.resultCache))break}if err := reconstruct(header, i, this.resultCache[index]); err != nil {failure = errbreak}donePool[header.Hash()] = struct{}{}this.resultCache[index].Pending--useful = trueaccepted++// Clean up a successful fetchrequest.Headers[i] = nildelete(taskPool, header.Hash())}// Return all failed or missing fetches to the queuefor_, header := range request.Headers {if header != nil {taskQueue.Push(header, -float32(header.Number.Uint64()))}}// Wake up WaitResultsif accepted > 0 {this.active.Signal()}// If none of the data was good, it's a stale deliveryswitch {case failure == nil || failure == errInvalidChain:return accepted, failurecase useful:return accepted, fmt.Errorf("partial failure: %v", failure)default:return accepted, errStaleDelivery}}
func HandleBlockBodiesMsg(p p2p.Peer, msg p2p.Msg) error {// A batch of block bodies arrived to one of our previous requestsvar request blockBodiesDataif err := msg.Decode(&request;); err != nil {return p2p.ErrResp(p2p.ErrDecode, "msg %v: %v", msg, err)}// Deliver them all to the downloader for queuingtrasactions := make([][]types.Transaction, len(request))uncles := make([][]*types.Header, len(request))for i, body := range request {trasactions[i] = body.Transactionsuncles[i] = body.Uncles}// Filter out any explicitly requested bodies, deliver the rest to the downloaderfilter := len(trasactions) > 0 || len(uncles) > 0if filter {trasactions, uncles = InstanceSynCtrl().puller.FilterBodies(p.GetID(), trasactions, uncles, time.Now())}if len(trasactions) > 0 || len(uncles) > 0 || !filter {err := InstanceSynCtrl().syner.DeliverBodies(p.GetID(), trasactions, uncles)if err != nil {log.Debug("Failed to deliver bodies", "err", err)}}return nil}

先从 pendpool 中取出 request,这里的 request 是请求 bodies 时生成的。遍历 request,然后通过 reconstruct 将 body 填充到 resultCache 中,这样一来 resultcache 中既有 header,又有 body,就可以构成一个完整的块了。

留意 :挪用 reserveHeaders 的参数,blockTaskPool,blockTashQueue 都是在 processheader 后,通知 bodywakech 之前通过 Schedule 函数写入的。

[code]
[/code]

HPB 芯链官网

郑重声明:本文版权归原作者所有,转载文章仅为传播更多信息之目的,如作者信息标记有误,请第一时间联系我们修改或删除,多谢。