IPFS(四) 源码解读之-p2p

IPFS(四) 源码解读之-p2p,第1张

package p2p

import (

"context"

"errors"

"time"

net "gx/ipfs/QmPjvxTpVH8qJyQDnxnsxF9kv9jezKD1kozz1hs3fCGsNh/go-libp2p-net"

manet "gx/ipfs/QmV6FjemM1K8oXjrvuq3wuVWWoU2TLDPmNnKrxHzY3v6Ai/go-multiaddr-net"

ma "gx/ipfs/QmYmsdtJ3HsodkePE3eU3TsCaP2YvPZJ4LoXnNkDE5Tpt7/go-multiaddr"

pro "gx/ipfs/QmZNkThpqfVXs9GNbexPrfBbXSLNYeKrE7jwFM2oqHbyqN/go-libp2p-protocol"

pstore "gx/ipfs/QmZR2XWVVBCtbgBWnQhWk2xcQfaR3W8faQPriAiaaj7rsr/go-libp2p-peerstore"

p2phost "gx/ipfs/Qmb8T6YBBsjYsVGfrihQLfCJveczZnneSBqBKkYEBWDjge/go-libp2p-host"

peer "gx/ipfs/QmdVrMn1LhB4ybb8hMVaMLXnA8XRSewMnK6YqXKXoTcRvN/go-libp2p-peer"

)

//P2P结构保存当前正在运行的流/监听器的信息

// P2P structure holds information on currently running streams/listeners

type P2P struct {

//监听器

Listeners ListenerRegistry

//数据流

Streams StreamRegistry

//节点ID

identity peer.ID

//节点地址

peerHost p2phost.Host

//一个线程安全的对等节点存储

peerstore pstore.Peerstore

}

//创建一个新的p2p结构

// NewP2P creates new P2P struct

//这个新的p2p结构不包含p2p结构中的监听器和数据流

func NewP2P(identity peer.ID, peerHost p2phost.Host, peerstore pstore.Peerstore) *P2P {

return &P2P{

identity: identity,

peerHost: peerHost,

peerstore: peerstore,

}

}

//新建一个数据流 工具方法 构建一个有节点id,内容和协议的流

func (p2p P2P) newStreamTo(ctx2 context.Context, p peer.ID, protocol string) (net.Stream, error) {

//30s 后会自动timeout

ctx, cancel := context.WithTimeout(ctx2, time.Second 30) //TODO: configurable?

defer cancel()

err := p2p.peerHost.Connect(ctx, pstore.PeerInfo{ID: p})

if err != nil {

return nil, err

}

return p2p.peerHost.NewStream(ctx2, p, pro.ID(protocol))

}

//对话为远程监听器创建新的P2P流

//创建一个新的p2p流实现对对话的监听

// Dial creates new P2P stream to a remote listener

//Multiaddr是一种跨协议、跨平台的表示格式的互联网地址。它强调明确性和自我描述。

//对内接收

func (p2p P2P) Dial(ctx context.Context, addr ma.Multiaddr, peer peer.ID, proto string, bindAddr ma.Multiaddr) ( ListenerInfo, error) {

//获取一些节点信息 network, host, nil

lnet, _, err := manet.DialArgs(bindAddr)

if err != nil {

return nil, err

}

//监听信息

listenerInfo := ListenerInfo{

//节点身份

Identity: p2p.identity,

////应用程序协议标识符。

Protocol: proto,

}

//调用newStreamTo 通过ctx(内容) peer(节点id) proto(协议标识符) 参数获取一个新的数据流

remote, err := p2p.newStreamTo(ctx, peer, proto)

if err != nil {

return nil, err

}

//network协议标识

switch lnet {

//network为"tcp", "tcp4", "tcp6"

case "tcp", "tcp4", "tcp6":

//从监听器获取新的信息 nla.Listener, nil

listener, err := manet.Listen(bindAddr)

if err != nil {

if err2 := remote.Reset()err2 != nil {

return nil, err2

}

return nil, err

}

//将获取的新信息保存到listenerInfo

listenerInfo.Address = listener.Multiaddr()

listenerInfo.Closer = listener

listenerInfo.Running = true

//开启接受

go p2p.doAccept(&listenerInfo, remote, listener)

default:

return nil, errors.New("unsupported protocol: " + lnet)

}

return &listenerInfo, nil

}

//

func (p2p *P2P) doAccept(listenerInfo *ListenerInfo, remote net.Stream, listener manet.Listener) {

//关闭侦听器并删除流处理程序

defer listener.Close()

//Returns a Multiaddr friendly Conn

//一个有好的 Multiaddr 连接

local, err := listener.Accept()

if err != nil {

return

}

stream := StreamInfo{

//连接协议

Protocol: listenerInfo.Protocol,

//定位节点

LocalPeer: listenerInfo.Identity,

//定位节点地址

LocalAddr: listenerInfo.Address,

//远程节点

RemotePeer: remote.Conn().RemotePeer(),

//远程节点地址

RemoteAddr: remote.Conn().RemoteMultiaddr(),

//定位

Local: local,

//远程

Remote: remote,

//注册码

Registry: &p2p.Streams,

}

//注册连接信息

p2p.Streams.Register(&stream)

//开启节点广播

stream.startStreaming()

}

//侦听器将流处理程序包装到侦听器中

// Listener wraps stream handler into a listener

type Listener interface {

Accept() (net.Stream, error)

Close() error

}

//P2PListener保存关于侦听器的信息

// P2PListener holds information on a listener

type P2PListener struct {

peerHost p2phost.Host

conChchan net.Stream

protopro.ID

ctx context.Context

cancel func()

}

//等待侦听器的连接

// Accept waits for a connection from the listener

func (il *P2PListener) Accept() (net.Stream, error) {

select {

case c := <-il.conCh:

return c, nil

case <-il.ctx.Done():

return nil, il.ctx.Err()

}

}

//关闭侦听器并删除流处理程序

// Close closes the listener and removes stream handler

func (il *P2PListener) Close() error {

il.cancel()

il.peerHost.RemoveStreamHandler(il.proto)

return nil

}

// Listen创建新的P2PListener

// Listen creates new P2PListener

func (p2p P2P) registerStreamHandler(ctx2 context.Context, protocol string) ( P2PListener, error) {

ctx, cancel := context.WithCancel(ctx2)

list := &P2PListener{

peerHost: p2p.peerHost,

proto:pro.ID(protocol),

conCh:make(chan net.Stream),

ctx: ctx,

cancel: cancel,

}

p2p.peerHost.SetStreamHandler(list.proto, func(s net.Stream) {

select {

case list.conCh <- s:

case <-ctx.Done():

s.Reset()

}

})

return list, nil

}

// NewListener创建新的p2p侦听器

// NewListener creates new p2p listener

//对外广播

func (p2p P2P) NewListener(ctx context.Context, proto string, addr ma.Multiaddr) ( ListenerInfo, error) {

//调用registerStreamHandler 构造一个新的listener

listener, err := p2p.registerStreamHandler(ctx, proto)

if err != nil {

return nil, err

}

//构造新的listenerInfo

listenerInfo := ListenerInfo{

Identity: p2p.identity,

Protocol: proto,

Address: addr,

Closer: listener,

Running: true,

Registry: &p2p.Listeners,

}

go p2p.acceptStreams(&listenerInfo, listener)

//注册连接信息

p2p.Listeners.Register(&listenerInfo)

return &listenerInfo, nil

}

//接受流

func (p2p *P2P) acceptStreams(listenerInfo *ListenerInfo, listener Listener) {

for listenerInfo.Running {

//一个有好的 远程 连接

remote, err := listener.Accept()

if err != nil {

listener.Close()

break

}

}

//取消注册表中的p2p侦听器

p2p.Listeners.Deregister(listenerInfo.Protocol)

}

// CheckProtoExists检查是否注册了协议处理程序

// mux处理程序

// CheckProtoExists checks whether a protocol handler is registered to

// mux handler

func (p2p *P2P) CheckProtoExists(proto string) bool {

protos := p2p.peerHost.Mux().Protocols()

for _, p := range protos {

if p != proto {

continue

}

return true

}

return false

}

网贷,又称为p2p网络借贷。P2P是英文peer to peer的缩写,意即“个人对个人”。网络信贷起源于英国,随后发展到美国、德国和其他国家,其典型的模式为:网络信贷公司提供平台,由借贷双方自由竞价,撮合成交。资金借出人获取利息收益,并承担风险;资金借入人到期偿还本金,网络信贷公司收取中介服务费。p2p网贷平台源码基本上是不可能给公开下载的,网上有部分p2p网贷源码泄露了,那源码也是被修改过很多次的了,会隐藏也多bug的。下载安装后期维护成本很高,还不利于二次开发!你可以去看看迪蒙的网贷系统,中国网贷系统第一品牌,安全可靠,功能也很齐全,完全能满足企业定制开发需求。

节点发现功能主要涉及 Server \ Table \ udp 这几个数据结构,它们有独自的事件响应循环,节点发现功能便是它们互相协作完成的。其中,每个以太坊客户端启动后都会在本地运行一个 Server ,并将网络拓扑中相邻的节点视为 Node ,而 Table Node 的容器, udp 则是负责维持底层的连接。下面重点描述它们中重要的字段和事件循环处理的关键部分。

PrivateKey - 本节点的私钥,用于与其他节点建立时的握手协商

Protocols - 支持的所有上层协议

StaticNodes - 预设的静态 Peer ,节点启动时会首先去向它们发起连接,建立邻居关系

newTransport - 下层传输层实现,定义握手过程中的数据加密解密方式,默认的传输层实现是用 newRLPX() 创建的 rlpx ,这不是本文的重点

ntab - 典型实现是 Table ,所有 peer Node 的形式存放在 Table

ourHandshake - 与其他节点建立连接时的握手信息,包含本地节点的版本号以及支持的上层协议

addpeer - 连接握手完成后,连接过程通过这个通道通知 Server

Server 的监听循环,启动底层监听socket,当收到连接请求时,Accept后调用 setupConn() 开始连接建立过程

Server的主要事件处理和功能实现循环

Node 唯一表示网络上的一个节点

IP - IP地址

UDP/TCP - 连接使用的UDP/TCP端口号

ID - 以太坊网络中唯一标识一个节点,本质上是一个椭圆曲线公钥(PublicKey),与 Server 的 PrivateKey 对应。一个节点的IP地址不一定是固定的,但ID是唯一的。

sha - 用于节点间的距离计算

Table 主要用来管理与本节点与其他节点的连接的建立\更新\删除

bucket - 所有 peer 按与本节点的距离远近放在不同的桶(bucket)中,详见之后的 节点维护

refreshReq - 更新 Table 请求通道

Table 的主要事件循环,主要负责控制 refresh revalidate 过程。

refresh.C - 定时(30s)启动Peer刷新过程的定时器

refreshReq - 接收其他线程投递到 Table 的 刷新Peer连接 的通知,当收到该通知时启动更新,详见之后的 更新邻居关系

revalidate.C - 定时重新检查以连接节点的有效性的定时器,详见之后的 探活检测

udp 负责节点间通信的底层消息控制,是 Table 运行的 Kademlia 协议的底层组件

conn - 底层监听端口的连接

addpending - udp 用来接收 pending 的channel。使用场景为:当我们向其他节点发送数据包后(packet)后可能会期待收到它的回复,pending用来记录一次这种还没有到来的回复。举个例子,当我们发送ping包时,总是期待对方回复pong包。这时就可以将构造一个pending结构,其中包含期待接收的pong包的信息以及对应的callback函数,将这个pengding投递到udp的这个channel。 udp 在收到匹配的pong后,执行预设的callback。

gotreply - udp 用来接收其他节点回复的通道,配合上面的addpending,收到回复后,遍历已有的pending链表,看是否有匹配的pending。

Table - 和 Server 中的ntab是同一个 Table

udp 的处理循环,负责控制消息的向上递交和收发控制

udp 的底层接受数据包循环,负责接收其他节点的 packet

以太坊使用 Kademlia 分布式路由存储协议来进行网络拓扑维护,了解该协议建议先阅读 易懂分布式 。更权威的资料可以查看 wiki 。总的来说该协议:

源码中由 Table 结构保存所有 bucket bucket 结构如下

节点可以在 entries replacements 互相转化,一个 entries 节点如果 Validate 失败,那么它会被原本将一个原本在 replacements 数组的节点替换。

有效性检测就是利用 ping 消息进行探活操作。 Table.loop() 启动了一个定时器(0~10s),定期随机选择一个bucket,向其 entries 中末尾的节点发送 ping 消息,如果对方回应了 pong ,则探活成功。

Table.loop() 会定期(定时器超时)或不定期(收到refreshReq)地进行更新邻居关系(发现新邻居),两者都调用 doRefresh() 方法,该方法对在网络上查找离自身和三个随机节点最近的若干个节点。

Table 的 lookup() 方法用来实现节点查找目标节点,它的实现就是 Kademlia 协议,通过节点间的接力,一步一步接近目标。

当一个节点启动后,它会首先向配置的静态节点发起连接,发起连接的过程称为 Dial ,源码中通过创建 dialTask 跟踪这个过程

dialTask表示一次向其他节点主动发起连接的任务

在 Server 启动时,会调用 newDialState() 根据预配置的 StaticNodes 初始化一批 dialTask , 并在 Server.run() 方法中,启动这些这些任务。

Dial 过程需要知道目标节点( dest )的IP地址,如果不知道的话,就要先使用 recolve() 解析出目标的IP地址,怎么解析?就是先要用借助 Kademlia 协议在网络中查找目标节点。

当得到目标节点的IP后,下一步便是建立连接,这是通过 dialTask.dial() 建立连接

连接建立的握手过程分为两个阶段,在在 SetupConn() 中实现

第一阶段为 ECDH密钥建立 :

第二阶段为协议握手,互相交换支持的上层协议

如果两次握手都通过,dialTask将向 Server 的 addpeer 通道发送 peer 的信息


欢迎分享,转载请注明来源:夏雨云

原文地址:https://www.xiayuyun.com/zonghe/455080.html

(0)
打赏 微信扫一扫微信扫一扫 支付宝扫一扫支付宝扫一扫
上一篇 2023-06-02
下一篇2023-06-02

发表评论

登录后才能评论

评论列表(0条)

    保存