仓库源文站点原文


title: Rust中使用libp2p toc: true cover: 'https://img.paulzzh.com/touhou/random?99' date: 2023-12-27 22:12:07 categories: Rust tags: [Rust, P2P, libp2p]

description: 在个人数据日益被侵犯的现在,P2P技术变得越来越重要;本文通过编写一个实例,讲解了P2P中的一些概念,以及如何使用libp2p来进行点对点应用的开发!

在个人数据日益被侵犯的现在,P2P技术变得越来越重要;

本文通过编写一个实例,讲解了P2P中的一些概念,以及如何使用libp2p来进行点对点应用的开发!

源代码:

<br/>

<!--more-->

Rust中使用libp2p

P2P 概述

P2P 简介

<br/>

P2P 的特点

<br/>

P2P 的复杂性

<br/>

P2P 的要求

传输

Peer 身份

安全

Peer 路由

消息

流多路复用

<font color="#f00">**注意:多路复用在后端服务开发中很常见,其中客户端可以与服务器建立底层网络连接,然后通过底层网络连接多路复用不同的流(每个流具有唯一的端口号)**</font>

<br/>

Libp2p概述

libp2p 是一个由协议、规范和库组成的模块化系统,它支持 P2P 应用程序的开发;

它目前支持三种语言:JS、Go、Rust、JVM(Kotlin编写),未来将支持 Haskell、Python等;

它被许多流行的项目使用,例如:IPFS、Filecoin 和 Polkadot 等;

<font color="#f00">**由于目前 rust-libp2p 每次更新版本时, API 的变化都比较大!**</font>

<font color="#f00">**所以在使用时需要参考 CHANGELOG 来进行代码迁移!**</font>

例如,Swarm 参考:

<br/>

Libp2p 主要模块

<br/>

P2P 节点的身份

P2P Node

PeerId: 12d3k.....

<br/>

公钥和私钥

公钥和私钥的例子 - 访问传统的服务器

<br/>

多地址(Multiaddresses)

<br/>

Swarm概述

Swarm 是 libp2p 中给定 P2P 节点内的网络管理器模块;

它维护从给定节点到远程节点的所有活动和挂起连接,并管理已打开的所有子流的状态;

Swarm 的结构和上下文环境

Swarm 代表了一个低级接口,并提供了对 libp2p 网络的细粒度控制。Swarm 是使用传输、网络行为和节点 peer id 的组合构建的。

传输(Transport)会指明如何在网络上发送字节,而网络行为(Behaviour)会指明发送什么字节,发送给谁。

多个网络行为可以与单个运行节点相关联。

需要注意的是:同一套客户端和服务端代码在 libp2p 网络的所有节点上运行,即每个Node即是客户端又是服务端;

这与客户端和服务器具有不同代码库的 Client-Server 模型不同;

<br/>

发现 peer

mDNS 是由 RFC 6762(datatracker.ietf.org/doc/html/rfc6762)定义的协议,它将主机名解析为 IP 地址。

在 libp2p 中,mDNS 用于发现网络上的其他节点。

在 libp2p 中实现的网络行为 mDNS 将自动发现本地网络上的其他 libp2p 节点。

详见:

<br/>

rust-libp2p实践

上面介绍了关于 P2P 和 libp2p 中的一些概念;

最主要的是:Peer、非对称加密、Transport、Behaviour 以及 Swarm 的概念;

下面我们将使用 rust-libp2p 来创建一个点对点的应用;

注意:

<font color="#f00">**由于 [rust-libp2p](https://github.com/libp2p/rust-libp2p/) 的 API 变化较大,这里我们采用的是目前最新的版本:`0.52`;**</font>

<font color="#f00">**该版本要求最低的 Rust 版本为 `1.71.0`,可能需要使用 `rustup update stable` 更新**!</font>

接下来我们会创建一个简单的关于菜谱的应用:

用户可以:

<br/>

创建项目

使用 cargo 创建一个项目,增加依赖:

[dependencies]
libp2p = { version = "0.52", features = ["tokio", "floodsub", "noise", "tcp", "yamux", "mdns", "macros", "identify"] }
tokio = { version = "1", features = ["io-util", "io-std", "macros", "rt", "rt-multi-thread", "fs", "time", "sync"] }
serde = { version = "1", features = ["derive"] }
serde_json = "1"
once_cell = "1.5"
log = "0.4"
pretty_env_logger = "0.4"
anyhow = "1.0.77"

<br/>

全局变量

前面提到,p2p 中使用非对称加密来确定一个 PeerId,这里我们使用全局变量来定义一个全局唯一的 PeerId:

src/consts.rs

pub const STORAGE_FILE_PATH: &str = "./recipes.json";

/// Key pair enables us to communicate securely with the rest of the network, making sure no one can impersonate
pub static KEYS: Lazy<identity::Keypair> = Lazy::new(identity::Keypair::generate_ed25519);

/// A unique identifier for a specific peer within the whole peer to peer network
///
/// Derive from a key pair to ensure its uniqueness
pub static PEER_ID: Lazy<PeerId> = Lazy::new(|| PeerId::from(KEYS.public()));

/// A Topic is a concept from Floodsub, which is an implementation of libp2p’s pub/sub interface
pub static TOPIC: Lazy<Topic> = Lazy::new(|| Topic::new("recipes"));

说明:

<br/>

模型定义

接下来定义几个结构体类型;

存储菜谱信息 Recipe:

src/models.rs

/// The recipe data for cook
#[derive(Debug, Serialize, Deserialize)]
pub struct Recipe {
    pub id: usize,
    pub name: String,
    pub ingredients: String,
    pub instructions: String,
    pub shared: bool,
}

列出菜谱的模式(全部、根据 PeerId):

src/models.rs

/// Fetch data mode
#[derive(Debug, Serialize, Deserialize)]
pub enum ListMode {
    /// Fetch from all peers
    All,

    /// Fetch from one specific peer
    One(String),
}

列出菜谱的请求、响应:

src/models.rs

#[derive(Debug, Serialize, Deserialize)]
pub struct ListRequest {
    pub mode: ListMode,
}

#[derive(Debug, Serialize, Deserialize)]
pub struct ListResponse {
    pub mode: ListMode,
    pub data: Vec<Recipe>,
    pub receiver: String,
}

Tokio select 事件枚举(后面会用到):

src/models.rs

pub enum EventType {
    Response(ListResponse),
    Input(String),
}

<br/>

创建并启动一个 Swarm

前面介绍了 Swarm 的概念,Swarm 是一个管理器,管理所有节点的活动、挂起、连接等等;

在创建一个 Swarm 时大概要配置以下几个内容:

下面就创建了一个 Swarm:

src/main.rs

let mut swarm = libp2p::SwarmBuilder::with_existing_identity(KEYS.clone())
  .with_tokio()
  .with_tcp(
    tcp::Config::default(),
    noise::Config::new,
    yamux::Config::default,
  )?
  .with_behaviour(|_key| RecipeBehaviour {
    flood_sub: Floodsub::new(*PEER_ID),
    mdns: mdns::tokio::Behaviour::new(mdns::Config::default(), KEYS.public().to_peer_id())
    .expect("can create mdns"),
  })?
  .with_swarm_config(|cfg| cfg.with_idle_connection_timeout(Duration::from_secs(5)))
  .build();

创建 Swarm 的部分:

通过 listen_on 即可启动监听:

src/main.rs

Swarm::listen_on(
        &mut swarm,
        "/ip4/0.0.0.0/tcp/0"
            .parse()
            .expect("can get a local socket"),
    )
    .expect("swarm can be started");

监听 IPV4 下任意 IP 来源(0.0.0.0)的连接,并且监听端口由系统分配(端口号为0);

此外,这里我们还启动了 Swarm 中定义的 FloodSub 对我们 Topic 的监听:

src/main.rs

swarm.behaviour_mut().flood_sub.subscribe(TOPIC.clone());

<br/>

定义网络行为 NetworkBehaviour

在上一小节创建 Swarm 时,我们使用了 RecipeBehaviour 我们自定义的行为;

下面我们来看是如何定义的:

src/behaviour.rs

#[derive(NetworkBehaviour)]
#[behaviour(to_swarm = "RecipeBehaviourEvent")]
pub struct RecipeBehaviour {
    pub(crate) flood_sub: Floodsub,
    pub(crate) mdns: mdns::tokio::Behaviour,
}

#[derive(Debug)]
pub enum RecipeBehaviourEvent {
    Floodsub(FloodsubEvent),
    Mdns(mdns::Event),
}

impl From<FloodsubEvent> for RecipeBehaviourEvent {
    fn from(event: FloodsubEvent) -> RecipeBehaviourEvent {
        RecipeBehaviourEvent::Floodsub(event)
    }
}

impl From<mdns::Event> for RecipeBehaviourEvent {
    fn from(event: mdns::Event) -> RecipeBehaviourEvent {
        RecipeBehaviourEvent::Mdns(event)
    }
}

首先是 RecipeBehaviour 结构体,其中包括了 FloodSub 和 mDNS 两个 Behaviour;

分别处理了:消息广播和节点自动发现;

<font color="#f00">**需要注意的是:RecipeBehaviour 结构体使用 `#[derive(NetworkBehaviour)]` 标识;**</font>

<font color="#f00">**此宏会自动实现部分逻辑,同时要求被标注的结构体只能含有实现了 NetworkBehaviour 的属性!**</font>

<font color="#f00">**注意:之前可以在 NetworkBehaviour 结构体中使用 `#[behaviour(ignore)]` 的方式被废弃!**</font>

见:

<font color="#f00">**同时,BehaviourEvent 的定义被抽取到了枚举中,通过 `#[behaviour(to_swarm = "RecipeBehaviourEvent")]` 来声明;**</font>

<font color="#f00">**后面可以看到,我们通过将 RecipeBehaviour 中定义的行为产生的事件统一使用 From Trait 转换为RecipeBehaviourEvent 来统一处理;**</font>

<font color="#f00">**这也是最新 rust-libp2p 官方推荐的方式!**</font>

<br/>

处理输入请求

在前面我们定义了我们应用允许用户操作的一些行为,用户可以:

下面我们来简单实现:

src/main.rs

let (response_sender, mut response_rcv) = mpsc::unbounded_channel();
let mut stdin = tokio::io::BufReader::new(tokio::io::stdin()).lines();
loop {
  let evt = {
    tokio::select! {
      line = stdin.next_line() => Some(EventType::Input(line.expect("can get line").expect("can read line from stdin"))),
      response = response_rcv.recv() => Some(EventType::Response(response.expect("response exists"))),
      _ = handle_swarm_event(response_sender.clone(), &mut swarm) => None,
    }
  };

  if let Some(event) = evt {
    match event {
      EventType::Response(resp) => {
        let json = serde_json::to_string(&resp).expect("can jsonify response");
        swarm
        .behaviour_mut()
        .flood_sub
        .publish(TOPIC.clone(), json.as_bytes());
      }
      EventType::Input(line) => match line.as_str() {
        "ls p" => handle_list_peers(&mut swarm).await,
        cmd if cmd.starts_with("create r") => handle_create_recipe(cmd).await,
        cmd if cmd.starts_with("publish r") => handle_publish_recipe(cmd).await,
        cmd if cmd.starts_with("ls r") => handle_list_recipes(cmd, &mut swarm).await,
        _ => error!("unknown command: {:?}", line),
      },
    }
  }
}

为了简单实现,我们使用 StdIn 作为输入;

在 loop 无限循环中,我们使用 tokio::select! 来等待准备完成的事件:

上面的 select 最终会返回我们之前定义的 EventType 类型:

pub enum EventType {
  Input(String),
  Response(ListResponse),
}

对于 Input 的处理比较简单,直接对应我们之前对于用户的定义处理即可:

EventType::Input(line) => match line.as_str() {
  "ls p" => handle_list_peers(&mut swarm).await,
  cmd if cmd.starts_with("create r") => handle_create_recipe(cmd).await,
  cmd if cmd.starts_with("publish r") => handle_publish_recipe(cmd).await,
  cmd if cmd.starts_with("ls r") => handle_list_recipes(cmd, &mut swarm).await,
  _ => error!("unknown command: {:?}", line),
},

对于 Response 则是将结果广播给其他所有 Node:

EventType::Response(resp) => {
  let json = serde_json::to_string(&resp).expect("can jsonify response");
  swarm
  .behaviour_mut()
  .flood_sub
  .publish(TOPIC.clone(), json.as_bytes());
}

下面来看处理用户输入的逻辑;

<br/>

列出全部对等点

使用 ls p 列出所有目前已连接的对等点:

src/handlers.rs

pub async fn handle_list_peers(swarm: &mut Swarm<RecipeBehaviour>) {
    info!("Discovered Peers:");
    let nodes = swarm.behaviour().mdns.discovered_nodes();

    let mut unique_peers = HashSet::new();
    for peer in nodes {
        unique_peers.insert(peer);
    }
    unique_peers.iter().for_each(|p| info!("{}", p));
}

通过 swarm 可以获取到 mdns 中自动发现的节点;

逻辑非常简单,这里不再赘述;

<br/>

创建Recipe

使用 create r recipe_name|recipe_ingredients|recipe_instruction 创建一个新的 Recipe;

创建Recipe 只涉及到本地数据,不涉及 p2p 部分,逻辑比较简单:

src/handlers.rs

pub async fn handle_create_recipe(cmd: &str) {
  if let Some(rest) = cmd.strip_prefix("create r") {
    let elements: Vec<&str> = rest.split('|').collect();
    if elements.len() < 3 {
      info!("too few arguments - Format: name|ingredients|instructions");
    } else {
      let name = elements.first().expect("name is there");
      let ingredients = elements.get(1).expect("ingredients is there");
      let instructions = elements.get(2).expect("instructions is there");
      if let Err(e) = create_new_recipe(name, ingredients, instructions).await {
        error!("error creating recipe: {}", e);
      };
    }
  }
}

async fn create_new_recipe(name: &str, ingredients: &str, instructions: &str) -> Result<()> {
  let mut local_recipes = read_local_recipes().await?;
  let new_id = match local_recipes.iter().max_by_key(|r| r.id) {
    Some(v) => v.id + 1,
    None => 0,
  };
  local_recipes.push(Recipe {
    id: new_id,
    name: name.to_owned(),
    ingredients: ingredients.to_owned(),
    instructions: instructions.to_owned(),
    shared: false,
  });
  write_local_recipes(&local_recipes).await?;

  info!("Created recipe:");
  info!("Name: {}", name);
  info!("Ingredients: {}", ingredients);
  info!("Instructions:: {}", instructions);

  Ok(())
}

async fn write_local_recipes(recipes: &Vec<Recipe>) -> Result<()> {
  let json = serde_json::to_string(&recipes)?;
  fs::write(STORAGE_FILE_PATH, &json).await?;
  Ok(())
}

async fn read_local_recipes() -> Result<Vec<Recipe>> {
  let content = fs::read(STORAGE_FILE_PATH).await?;
  let result = serde_json::from_slice(&content)?;
  Ok(result)
}

逻辑比较简单,就是解析命令行传入的参数,创建结构体,先加载数据文件的数据,然后添加一条再写回文件;

频繁读写文件的效率非常的低,这里只是作为例子演示简单实现!

<br/>

发布Recipe

发布菜谱来对他人分享:publish r recipe_id

发布 Recipe 的逻辑也非常简单:

src/handlers.rs

pub async fn handle_publish_recipe(cmd: &str) {
  if let Some(rest) = cmd.strip_prefix("publish r") {
    match rest.trim().parse::<usize>() {
      Ok(id) => {
        if let Err(e) = publish_recipe(id).await {
          info!("error publishing recipe with id {}, {}", id, e)
        } else {
          info!("Published Recipe with id: {}", id);
        }
      }
      Err(e) => error!("invalid id: {}, {}", rest.trim(), e),
    };
  }
}

async fn publish_recipe(id: usize) -> Result<()> {
  let mut local_recipes = read_local_recipes().await?;
  local_recipes
  .iter_mut()
  .filter(|r| r.id == id)
  .for_each(|r| r.shared = true);
  write_local_recipes(&local_recipes).await?;
  Ok(())
}

首先解析命令行传入的Recipe Id,然后修改文件中对应 Id 的 shared 字段,改为 true;

和创建Recipe类似,这里也会频繁读写文件,效率较低,仅作为展示例子;

<br/>

查询菜谱(发送消息)

查询菜谱比较复杂,可以通过下面三种形式:

下面来看实现:

src/handlers.rs

pub async fn handle_list_recipes(cmd: &str, swarm: &mut Swarm<RecipeBehaviour>) {
  let rest = cmd.strip_prefix("ls r ");
  match rest {
    Some("all") => {
      let req = ListRequest {
        mode: ListMode::All,
      };
      let json = serde_json::to_string(&req).expect("can jsonify request");
      swarm
      .behaviour_mut()
      .flood_sub
      .publish(TOPIC.clone(), json.as_bytes());
    }
    Some(recipes_peer_id) => {
      let req = ListRequest {
        mode: ListMode::One(recipes_peer_id.to_owned()),
      };
      let json = serde_json::to_string(&req).expect("can jsonify request");
      swarm
      .behaviour_mut()
      .flood_sub
      .publish(TOPIC.clone(), json.as_bytes());
    }
    None => {
      match read_local_recipes().await {
        Ok(v) => {
          info!("Local Recipes ({})", v.len());
          v.iter().for_each(|r| info!("{:?}", r));
        }
        Err(e) => error!("error fetching local recipes: {}", e),
      };
    }
  };
}

首先,解析命令行参数:

<br/>

响应消息

前面我们通过 FloodSub 向对等网中的其他节点发送了请求,那么如何响应消息呢?

这就是 Swarm 自定义逻辑中的核心部分了!

注意到,在前面的 处理输入请求 一小节,我们在 tokio::select! 中定义了:

tokio::select! {
  ...
  _ = handle_swarm_event(response_sender.clone(), &mut swarm) => None,
}

我们正是在这个函数中处理的 BehaviourEvent 逻辑,下面来看:

src/handlers.rs

pub async fn handle_swarm_event(
  response_sender: mpsc::UnboundedSender<ListResponse>,
  swarm: &mut Swarm<RecipeBehaviour>,
) {
  let event = swarm.select_next_some().await;
  info!("Income swarm Event: {:?}", event);

  match event {
    SwarmEvent::Behaviour(recipe_behaviours) => match recipe_behaviours {
      RecipeBehaviourEvent::Floodsub(flood_sub_event) => match flood_sub_event {
        FloodsubEvent::Message(msg) => {
          if let Ok(resp) = serde_json::from_slice::<ListResponse>(&msg.data) {
            if resp.receiver == PEER_ID.to_string() {
              info!("Response from {}:", msg.source);
              resp.data.iter().for_each(|r| info!("{:?}", r));
            }
          } else if let Ok(req) = serde_json::from_slice::<ListRequest>(&msg.data) {
            match req.mode {
              ListMode::All => {
                info!("Received ALL req: {:?} from {:?}", req, msg.source);
                respond_with_public_recipes(
                  response_sender.clone(),
                  msg.source.to_string(),
                );
              }
              ListMode::One(ref peer_id) => {
                if peer_id == &PEER_ID.to_string() {
                  info!("Received req: {:?} from {:?}", req, msg.source);
                  respond_with_public_recipes(
                    response_sender.clone(),
                    msg.source.to_string(),
                  );
                }
              }
            }
          }
        }
        FloodsubEvent::Subscribed { .. } => {}
        FloodsubEvent::Unsubscribed { .. } => {}
      },
      RecipeBehaviourEvent::Mdns(mdns_event) => match mdns_event {
        Event::Discovered(discovered_list) => {
          let behavior_mut = swarm.behaviour_mut();
          for (peer, _addr) in discovered_list {
            behavior_mut.flood_sub.add_node_to_partial_view(peer);
          }
        }
        Event::Expired(expired_list) => {
          let behavior_mut = swarm.behaviour_mut();
          for (peer, _addr) in expired_list {
            if !behavior_mut.mdns.has_node(&peer) {
              behavior_mut.flood_sub.remove_node_from_partial_view(&peer);
            }
          }
        }
      },
    },
    SwarmEvent::ConnectionEstablished {
      peer_id,
      connection_id,
      endpoint,
      num_established,
      ..
    } => {
      debug!("[Connection established] peer_id: {}, connection_id: {}, endpoint: {:?}, num_established: {:?}", peer_id, connection_id, endpoint, num_established);
    }
    SwarmEvent::ConnectionClosed {
      peer_id,
      connection_id,
      endpoint,
      num_established,
      ..
    } => {
      debug!("[Connection closed] peer_id: {}, connection_id: {}, endpoint: {:?}, num_established: {:?}", peer_id, connection_id, endpoint, num_established);
    }
    SwarmEvent::IncomingConnection { .. } => {}
    SwarmEvent::IncomingConnectionError { .. } => {}
    SwarmEvent::OutgoingConnectionError { .. } => {}
    SwarmEvent::NewListenAddr { .. } => {}
    SwarmEvent::ExpiredListenAddr { .. } => {}
    SwarmEvent::ListenerClosed { .. } => {}
    SwarmEvent::ListenerError { .. } => {}
    SwarmEvent::Dialing { .. } => {}
  };
}

可以看到,我们正是通过 let event = swarm.select_next_some().await 来获取的 Swarm 中所有的 Event 事件!

随后我们即可枚举全部的 Event 事件:

可以看到,得益于 Rust 强大的枚举类型,我们可以非常清晰的处理各种事件!

<br/>

处理FloodSubEvent

处理 FloodSubEvent 的逻辑如下:

RecipeBehaviourEvent::Floodsub(flood_sub_event) => match flood_sub_event {
  FloodsubEvent::Message(msg) => {
    if let Ok(resp) = serde_json::from_slice::<ListResponse>(&msg.data) {
      if resp.receiver == PEER_ID.to_string() {
        info!("Response from {}:", msg.source);
        resp.data.iter().for_each(|r| info!("{:?}", r));
      }
    } else if let Ok(req) = serde_json::from_slice::<ListRequest>(&msg.data) {
      match req.mode {
        ListMode::All => {
          info!("Received ALL req: {:?} from {:?}", req, msg.source);
          respond_with_public_recipes(
            response_sender.clone(),
            msg.source.to_string(),
          );
        }
        ListMode::One(ref peer_id) => {
          if peer_id == &PEER_ID.to_string() {
            info!("Received req: {:?} from {:?}", req, msg.source);
            respond_with_public_recipes(
              response_sender.clone(),
              msg.source.to_string(),
            );
          }
        }
      }
    }
  }
  FloodsubEvent::Subscribed { .. } => {}
  FloodsubEvent::Unsubscribed { .. } => {}
},

首先我们判断:

respond_with_public_recipes 的逻辑如下:

fn respond_with_public_recipes(sender: mpsc::UnboundedSender<ListResponse>, receiver: String) {
  tokio::spawn(async move {
    match read_local_recipes().await {
      Ok(recipes) => {
        let resp = ListResponse {
          mode: ListMode::All,
          receiver,
          data: recipes.into_iter().filter(|r| r.shared).collect(),
        };
        if let Err(e) = sender.send(resp) {
          error!("error sending response via channel, {}", e);
        }
      }
      Err(e) => error!("error fetching local recipes to answer ALL request, {}", e),
    }
  });
}

读取本地 Recipe 数据,并通过 sender 发送即可!

tokio::select! 中会接收响应,并广播出去:

tokio::select! {
  response = response_rcv.recv() => Some(EventType::Response(response.expect("response exists"))),
}

EventType::Response(resp) => {
  let json = serde_json::to_string(&resp).expect("can jsonify response");
  swarm
  .behaviour_mut()
  .flood_sub
  .publish(TOPIC.clone(), json.as_bytes());
}

<br/>

处理mDNSEvent

mDNS 包括了两个事件:

#[derive(Debug, Clone)]
pub enum Event {
  /// Discovered nodes through mDNS.
  Discovered(Vec<(PeerId, Multiaddr)>),

  /// The given combinations of `PeerId` and `Multiaddr` have expired.
  ///
  /// Each discovered record has a time-to-live. When this TTL expires and the address hasn't
  /// been refreshed, we remove it from the list and emit it as an `Expired` event.
  Expired(Vec<(PeerId, Multiaddr)>),
}

分别为:

处理逻辑如下:

src/handlers.rs

RecipeBehaviourEvent::Mdns(mdns_event) => match mdns_event {
  Event::Discovered(discovered_list) => {
    let behavior_mut = swarm.behaviour_mut();
    for (peer, _addr) in discovered_list {
      behavior_mut.flood_sub.add_node_to_partial_view(peer);
    }
  }
  Event::Expired(expired_list) => {
    let behavior_mut = swarm.behaviour_mut();
    for (peer, _addr) in expired_list {
      if !behavior_mut.mdns.has_node(&peer) {
        behavior_mut.flood_sub.remove_node_from_partial_view(&peer);
      }
    }
  }
},

即:

逻辑非常简单;

至此,我们的应用开发完毕!

<br/>

应用测试

我们可以通过 cargo run 在本机创建多个节点;

cargo run

INFO  rust_learn > Peer Id: 12D3KooWA7xhiEmFxikn9aiWcffkhDACDhz1rRPXxkC4yxgnzJCT
INFO  libp2p_mdns::behaviour::iface > creating instance on iface 192.168.31.22
INFO  rust_learn::handlers          > Income swarm Event: NewListenAddr { listener_id: ListenerId(1), address: "/ip4/127.0.0.1/tcp/65248" }
INFO  rust_learn::handlers          > Income swarm Event: NewListenAddr { listener_id: ListenerId(1), address: "/ip4/192.168.31.22/tcp/65248" }
INFO  libp2p_mdns::behaviour        > discovered: 12D3KooWGEGJQhFaR4ZzJ15CUvMVVu1wcaGd3i7yzvHHYFexbfT7 /ip4/192.168.31.22/tcp/65247
INFO  rust_learn::handlers          > Income swarm Event: Behaviour(Mdns(Discovered([(PeerId("12D3KooWGEGJQhFaR4ZzJ15CUvMVVu1wcaGd3i7yzvHHYFexbfT7"), "/ip4/192.168.31.22/tcp/65247")])))
INFO  rust_learn::handlers          > Income swarm Event: Dialing { peer_id: Some(PeerId("12D3KooWGEGJQhFaR4ZzJ15CUvMVVu1wcaGd3i7yzvHHYFexbfT7")), connection_id: ConnectionId(1) }
INFO  rust_learn::handlers          > Income swarm Event: ConnectionEstablished { peer_id: PeerId("12D3KooWGEGJQhFaR4ZzJ15CUvMVVu1wcaGd3i7yzvHHYFexbfT7"), connection_id: ConnectionId(1), endpoint: Dialer { address: "/ip4/192.168.31.22/tcp/65247/p2p/12D3KooWGEGJQhFaR4ZzJ15CUvMVVu1wcaGd3i7yzvHHYFexbfT7", role_override: Dialer }, num_established: 1, concurrent_dial_errors: Some([]), established_in: 7.355625ms }
INFO  rust_learn::handlers          > Income swarm Event: Behaviour(Floodsub(Subscribed { peer_id: PeerId("12D3KooWGEGJQhFaR4ZzJ15CUvMVVu1wcaGd3i7yzvHHYFexbfT7"), topic: Topic("recipes") }))
INFO  libp2p_mdns::behaviour        > discovered: 12D3KooWCWesVZsAoDaFs7UZYXV6gTNd56UPMoiWfxWFgvLCJZhz /ip4/192.168.31.22/tcp/65250
INFO  rust_learn::handlers          > Income swarm Event: Behaviour(Mdns(Discovered([(PeerId("12D3KooWCWesVZsAoDaFs7UZYXV6gTNd56UPMoiWfxWFgvLCJZhz"), "/ip4/192.168.31.22/tcp/65250")])))
INFO  rust_learn::handlers          > Income swarm Event: Dialing { peer_id: Some(PeerId("12D3KooWCWesVZsAoDaFs7UZYXV6gTNd56UPMoiWfxWFgvLCJZhz")), connection_id: ConnectionId(2) }
INFO  rust_learn::handlers          > Income swarm Event: IncomingConnection { connection_id: ConnectionId(3), local_addr: "/ip4/192.168.31.22/tcp/65248", send_back_addr: "/ip4/192.168.31.22/tcp/65253" }
INFO  rust_learn::handlers          > Income swarm Event: ConnectionEstablished { peer_id: PeerId("12D3KooWCWesVZsAoDaFs7UZYXV6gTNd56UPMoiWfxWFgvLCJZhz"), connection_id: ConnectionId(2), endpoint: Dialer { address: "/ip4/192.168.31.22/tcp/65250/p2p/12D3KooWCWesVZsAoDaFs7UZYXV6gTNd56UPMoiWfxWFgvLCJZhz", role_override: Dialer }, num_established: 1, concurrent_dial_errors: Some([]), established_in: 5.762334ms }
INFO  rust_learn::handlers          > Income swarm Event: ConnectionEstablished { peer_id: PeerId("12D3KooWCWesVZsAoDaFs7UZYXV6gTNd56UPMoiWfxWFgvLCJZhz"), connection_id: ConnectionId(3), endpoint: Listener { local_addr: "/ip4/192.168.31.22/tcp/65248", send_back_addr: "/ip4/192.168.31.22/tcp/65253" }, num_established: 2, concurrent_dial_errors: None, established_in: 5.212125ms }
INFO  rust_learn::handlers          > Income swarm Event: Behaviour(Floodsub(Subscribed { peer_id: PeerId("12D3KooWCWesVZsAoDaFs7UZYXV6gTNd56UPMoiWfxWFgvLCJZhz"), topic: Topic("recipes") }))

查看所有对等节点:

ls p

 INFO  rust_learn::handlers          > Discovered Peers:
 INFO  rust_learn::handlers          > 12D3KooWGEGJQhFaR4ZzJ15CUvMVVu1wcaGd3i7yzvHHYFexbfT7
 INFO  rust_learn::handlers          > 12D3KooWCWesVZsAoDaFs7UZYXV6gTNd56UPMoiWfxWFgvLCJZhz

创建 Recipe:

create r name|recipe_ingredients|recipe_instruction

 INFO  rust_learn::handlers          > Created recipe:
 INFO  rust_learn::handlers          > Name:  name
 INFO  rust_learn::handlers          > Ingredients: recipe_ingredients
 INFO  rust_learn::handlers          > Instructions:: recipe_instruction

列出本地 Recipe:

ls r

 INFO  rust_learn::handlers          > Local Recipes (6)
 INFO  rust_learn::handlers          > Recipe { id: 0, name: " Coffee", ingredients: "Coffee", instructions: "Make Coffee", shared: true }
 INFO  rust_learn::handlers          > Recipe { id: 1, name: " Tea", ingredients: "Tea, Water", instructions: "Boil Water, add tea", shared: true }
 INFO  rust_learn::handlers          > Recipe { id: 2, name: " Carrot Cake", ingredients: "Carrots, Cake", instructions: "Make Carrot Cake", shared: true }
 INFO  rust_learn::handlers          > Recipe { id: 3, name: " Name", ingredients: "Ingredients", instructions: "Instructions", shared: true }
 INFO  rust_learn::handlers          > Recipe { id: 4, name: " name", ingredients: "recipeIngredients", instructions: "instruction", shared: true }
 INFO  rust_learn::handlers          > Recipe { id: 5, name: " name", ingredients: "recipe_ingredients", instructions: "recipe_instruction", shared: false }

列出所有对等点 Recipe:

ls r all

 INFO  rust_learn::handlers          > Income swarm Event: Behaviour(Floodsub(Message(FloodsubMessage { source: PeerId("12D3KooWGEGJQhFaR4ZzJ15CUvMVVu1wcaGd3i7yzvHHYFexbfT7"), data: [123, 34, 109, 111,...
 INFO  rust_learn::handlers          > Response from 12D3KooWGEGJQhFaR4ZzJ15CUvMVVu1wcaGd3i7yzvHHYFexbfT7:
 INFO  rust_learn::handlers          > Recipe { id: 0, name: " Coffee", ingredients: "Coffee", instructions: "Make Coffee", shared: true }
 INFO  rust_learn::handlers          > Recipe { id: 1, name: " Tea", ingredients: "Tea, Water", instructions: "Boil Water, add tea", shared: true }
 INFO  rust_learn::handlers          > Recipe { id: 2, name: " Carrot Cake", ingredients: "Carrots, Cake", instructions: "Make Carrot Cake", shared: true }
 INFO  rust_learn::handlers          > Recipe { id: 3, name: " Name", ingredients: "Ingredients", instructions: "Instructions", shared: true }
 INFO  rust_learn::handlers          > Recipe { id: 4, name: " name", ingredients: "recipeIngredients", instructions: "instruction", shared: true }
 INFO  rust_learn::handlers          > Income swarm Event: Behaviour(Floodsub(Message(FloodsubMessage { source: PeerId("12D3KooWCWesVZsAoDaFs7UZYXV6gTNd56UPMoiWfxWFgvLCJZhz"), data: [123, 34, 109....
 INFO  rust_learn::handlers          > Response from 12D3KooWCWesVZsAoDaFs7UZYXV6gTNd56UPMoiWfxWFgvLCJZhz:
 INFO  rust_learn::handlers          > Recipe { id: 0, name: " Coffee", ingredients: "Coffee", instructions: "Make Coffee", shared: true }
 INFO  rust_learn::handlers          > Recipe { id: 1, name: " Tea", ingredients: "Tea, Water", instructions: "Boil Water, add tea", shared: true }
 INFO  rust_learn::handlers          > Recipe { id: 2, name: " Carrot Cake", ingredients: "Carrots, Cake", instructions: "Make Carrot Cake", shared: true }
 INFO  rust_learn::handlers          > Recipe { id: 3, name: " Name", ingredients: "Ingredients", instructions: "Instructions", shared: true }
 INFO  rust_learn::handlers          > Recipe { id: 4, name: " name", ingredients: "recipeIngredients", instructions: "instruction", shared: true }

列出某个对等点 Recipe:

ls r 12D3KooWGEGJQhFaR4ZzJ15CUvMVVu1wcaGd3i7yzvHHYFexbfT7

 INFO  rust_learn::handlers          > Income swarm Event: Behaviour(Floodsub(Message(FloodsubMessage { source: PeerId("12D3KooWGEGJQhFaR4ZzJ15CUvMVVu1wcaGd3i7yzvHHYFexbfT7"), data: [123, 34, 109, 111, 100, 101, 34, ...
 INFO  rust_learn::handlers          > Response from 12D3KooWGEGJQhFaR4ZzJ15CUvMVVu1wcaGd3i7yzvHHYFexbfT7:
 INFO  rust_learn::handlers          > Recipe { id: 0, name: " Coffee", ingredients: "Coffee", instructions: "Make Coffee", shared: true }
 INFO  rust_learn::handlers          > Recipe { id: 1, name: " Tea", ingredients: "Tea, Water", instructions: "Boil Water, add tea", shared: true }
 INFO  rust_learn::handlers          > Recipe { id: 2, name: " Carrot Cake", ingredients: "Carrots, Cake", instructions: "Make Carrot Cake", shared: true }
 INFO  rust_learn::handlers          > Recipe { id: 3, name: " Name", ingredients: "Ingredients", instructions: "Instructions", shared: true }
 INFO  rust_learn::handlers          > Recipe { id: 4, name: " name", ingredients: "recipeIngredients", instructions: "instruction", shared: true }

发布 Recipe:

publish r 5

 INFO  rust_learn::handlers          > Published Recipe with id: 5

再次列出远程节点中的 Recipe:

ls r 12D3KooWGEGJQhFaR4ZzJ15CUvMVVu1wcaGd3i7yzvHHYFexbfT7

 INFO  rust_learn::handlers          > Income swarm Event: Behaviour(Floodsub(Message(FloodsubMessage { source: PeerId("12D3KooWGEGJQhFaR4ZzJ15CUvMVVu1wcaGd3i7yzvHHYFexbfT7"), data: [123, 34, 109, ...
 INFO  rust_learn::handlers          > Response from 12D3KooWGEGJQhFaR4ZzJ15CUvMVVu1wcaGd3i7yzvHHYFexbfT7:
 INFO  rust_learn::handlers          > Recipe { id: 0, name: " Coffee", ingredients: "Coffee", instructions: "Make Coffee", shared: true }
 INFO  rust_learn::handlers          > Recipe { id: 1, name: " Tea", ingredients: "Tea, Water", instructions: "Boil Water, add tea", shared: true }
 INFO  rust_learn::handlers          > Recipe { id: 2, name: " Carrot Cake", ingredients: "Carrots, Cake", instructions: "Make Carrot Cake", shared: true }
 INFO  rust_learn::handlers          > Recipe { id: 3, name: " Name", ingredients: "Ingredients", instructions: "Instructions", shared: true }
 INFO  rust_learn::handlers          > Recipe { id: 4, name: " name", ingredients: "recipeIngredients", instructions: "instruction", shared: true }
 INFO  rust_learn::handlers          > Recipe { id: 5, name: " name", ingredients: "recipe_ingredients", instructions: "recipe_instruction", shared: true }

由于多个节点共享的是同一份数据文件,刚刚我们发布了一个新的 Recipe,所以刚刚发布的 Recipe 也在这里显示了!

<br/>

小结

相信通过上面的讲解,你已经了解了 libp2p 的基本使用;

P2P 技术目前广泛应用在区块链等领域,相信 libp2p 也一定会被越来越多的人使用!

<br/>

附录

源代码:

参考文章:

<br/>