title: Rust中使用libp2p toc: true cover: 'https://img.paulzzh.com/touhou/random?99' date: 2023-12-27 22:12:07 categories: Rust tags: [Rust, P2P, libp2p]
在个人数据日益被侵犯的现在,P2P技术变得越来越重要;
本文通过编写一个实例,讲解了P2P中的一些概念,以及如何使用libp2p来进行点对点应用的开发!
源代码:
<br/>
<!--more--><br/>
<br/>
<br/>
<font color="#f00">**注意:多路复用在后端服务开发中很常见,其中客户端可以与服务器建立底层网络连接,然后通过底层网络连接多路复用不同的流(每个流具有唯一的端口号)**</font>
<br/>
libp2p 是一个由协议、规范和库组成的模块化系统,它支持 P2P 应用程序的开发;
它目前支持三种语言:JS、Go、Rust、JVM(Kotlin编写),未来将支持 Haskell、Python等;
它被许多流行的项目使用,例如:IPFS、Filecoin 和 Polkadot 等;
<font color="#f00">**由于目前 rust-libp2p 每次更新版本时, API 的变化都比较大!**</font>
<font color="#f00">**所以在使用时需要参考 CHANGELOG 来进行代码迁移!**</font>
例如,Swarm 参考:
<br/>
<br/>
P2P Node
PeerId: 12d3k.....
<br/>
公钥和私钥的例子 - 访问传统的服务器
- 如果想连接到数据中心的远程服务器(使用SSH),用户可以生成密钥对并在远程服务器上配置公钥,从而授予用户访问权限。
- 但远程服务器如何知道哪个用户是该公钥的所有者?
- 为了实现这一点,当连接(通过SSH)到远程服务器时,用户必须指定私钥(与存储在服务器上的公钥关联的)。
- 私钥从不发送到远程服务器,但SSH客户端(在本地服务器上运行)使用用户的私钥向远程SSH服务器进行身份验证。
<br/>
peer 的位置是可以到达对方的网络地址。
当 p2p 网络上的节点共享其联系信息时,它们会发送一个保护网络地址和 peer id 的多地址(multiaddress)。
<br/>
Swarm 是 libp2p 中给定 P2P 节点内的网络管理器模块;
它维护从给定节点到远程节点的所有活动和挂起连接,并管理已打开的所有子流的状态;
Swarm 代表了一个低级接口,并提供了对 libp2p 网络的细粒度控制。Swarm 是使用传输、网络行为和节点 peer id 的组合构建的。
传输(Transport)会指明如何在网络上发送字节,而网络行为(Behaviour)会指明发送什么字节,发送给谁。
多个网络行为可以与单个运行节点相关联。
需要注意的是:同一套客户端和服务端代码在 libp2p 网络的所有节点上运行,即每个Node即是客户端又是服务端;
这与客户端和服务器具有不同代码库的 Client-Server 模型不同;
<br/>
mDNS 是由 RFC 6762(datatracker.ietf.org/doc/html/rfc6762)定义的协议,它将主机名解析为 IP 地址。
在 libp2p 中,mDNS 用于发现网络上的其他节点。
在 libp2p 中实现的网络行为 mDNS 将自动发现本地网络上的其他 libp2p 节点。
详见:
<br/>
上面介绍了关于 P2P 和 libp2p 中的一些概念;
最主要的是:Peer、非对称加密、Transport、Behaviour 以及 Swarm 的概念;
下面我们将使用 rust-libp2p 来创建一个点对点的应用;
注意:
<font color="#f00">**由于 [rust-libp2p](https://github.com/libp2p/rust-libp2p/) 的 API 变化较大,这里我们采用的是目前最新的版本:`0.52`;**</font>
<font color="#f00">**该版本要求最低的 Rust 版本为 `1.71.0`,可能需要使用 `rustup update stable` 更新**!</font>
接下来我们会创建一个简单的关于菜谱的应用:
用户可以:
ls p
create r recipe_name|recipe_ingredients|recipe_instruction
publish r recipe_id
ls r
ls r all
ls r 12D...(PeerId)
<br/>
使用 cargo 创建一个项目,增加依赖:
[dependencies]
libp2p = { version = "0.52", features = ["tokio", "floodsub", "noise", "tcp", "yamux", "mdns", "macros", "identify"] }
tokio = { version = "1", features = ["io-util", "io-std", "macros", "rt", "rt-multi-thread", "fs", "time", "sync"] }
serde = { version = "1", features = ["derive"] }
serde_json = "1"
once_cell = "1.5"
log = "0.4"
pretty_env_logger = "0.4"
anyhow = "1.0.77"
<br/>
前面提到,p2p 中使用非对称加密来确定一个 PeerId,这里我们使用全局变量来定义一个全局唯一的 PeerId:
src/consts.rs
pub const STORAGE_FILE_PATH: &str = "./recipes.json";
/// Key pair enables us to communicate securely with the rest of the network, making sure no one can impersonate
pub static KEYS: Lazy<identity::Keypair> = Lazy::new(identity::Keypair::generate_ed25519);
/// A unique identifier for a specific peer within the whole peer to peer network
///
/// Derive from a key pair to ensure its uniqueness
pub static PEER_ID: Lazy<PeerId> = Lazy::new(|| PeerId::from(KEYS.public()));
/// A Topic is a concept from Floodsub, which is an implementation of libp2p’s pub/sub interface
pub static TOPIC: Lazy<Topic> = Lazy::new(|| Topic::new("recipes"));
说明:
STORAGE_FILE_PATH
:本地菜谱存储路径;KEYS
:非对称加密密钥对;PEER_ID
:通过密钥对生成的 PeerId;TOPIC
:在 Floodsub 模式中,对等节点可以订阅的 Topic,类似于 PubSub 模式;<br/>
接下来定义几个结构体类型;
存储菜谱信息 Recipe:
src/models.rs
/// The recipe data for cook
#[derive(Debug, Serialize, Deserialize)]
pub struct Recipe {
pub id: usize,
pub name: String,
pub ingredients: String,
pub instructions: String,
pub shared: bool,
}
列出菜谱的模式(全部、根据 PeerId):
src/models.rs
/// Fetch data mode
#[derive(Debug, Serialize, Deserialize)]
pub enum ListMode {
/// Fetch from all peers
All,
/// Fetch from one specific peer
One(String),
}
列出菜谱的请求、响应:
src/models.rs
#[derive(Debug, Serialize, Deserialize)]
pub struct ListRequest {
pub mode: ListMode,
}
#[derive(Debug, Serialize, Deserialize)]
pub struct ListResponse {
pub mode: ListMode,
pub data: Vec<Recipe>,
pub receiver: String,
}
Tokio select 事件枚举(后面会用到):
src/models.rs
pub enum EventType {
Response(ListResponse),
Input(String),
}
<br/>
前面介绍了 Swarm 的概念,Swarm 是一个管理器,管理所有节点的活动、挂起、连接等等;
在创建一个 Swarm 时大概要配置以下几个内容:
下面就创建了一个 Swarm:
src/main.rs
let mut swarm = libp2p::SwarmBuilder::with_existing_identity(KEYS.clone())
.with_tokio()
.with_tcp(
tcp::Config::default(),
noise::Config::new,
yamux::Config::default,
)?
.with_behaviour(|_key| RecipeBehaviour {
flood_sub: Floodsub::new(*PEER_ID),
mdns: mdns::tokio::Behaviour::new(mdns::Config::default(), KEYS.public().to_peer_id())
.expect("can create mdns"),
})?
.with_swarm_config(|cfg| cfg.with_idle_connection_timeout(Duration::from_secs(5)))
.build();
创建 Swarm 的部分:
with_existing_identity
:指定了密钥对;with_tokio
:指定了异步运行时为 tokio;with_tcp
:使用 TCP 为 Transport;tcp_config: libp2p_tcp::Config
:TCP 配置,例如连接超时时间等;security_upgrade: SecUpgrade
:TCP 安全连接配置,这里使用 noise 握手,即使用密钥对的方式;multiplexer_upgrade: MuxUpgrade
:连接多路复用配置,这里使用 yamux;with_behaviour
:该 Swarm 网络行为定义,这里为自定义的 RecipeBehaviour,后文会介绍;with_swarm_config:Swarm
自身配置,这里作为实验,配置了闲置超时时间为 5s;通过 listen_on
即可启动监听:
src/main.rs
Swarm::listen_on(
&mut swarm,
"/ip4/0.0.0.0/tcp/0"
.parse()
.expect("can get a local socket"),
)
.expect("swarm can be started");
监听 IPV4 下任意 IP 来源(0.0.0.0
)的连接,并且监听端口由系统分配(端口号为0);
此外,这里我们还启动了 Swarm 中定义的 FloodSub 对我们 Topic 的监听:
src/main.rs
swarm.behaviour_mut().flood_sub.subscribe(TOPIC.clone());
<br/>
在上一小节创建 Swarm 时,我们使用了 RecipeBehaviour 我们自定义的行为;
下面我们来看是如何定义的:
src/behaviour.rs
#[derive(NetworkBehaviour)]
#[behaviour(to_swarm = "RecipeBehaviourEvent")]
pub struct RecipeBehaviour {
pub(crate) flood_sub: Floodsub,
pub(crate) mdns: mdns::tokio::Behaviour,
}
#[derive(Debug)]
pub enum RecipeBehaviourEvent {
Floodsub(FloodsubEvent),
Mdns(mdns::Event),
}
impl From<FloodsubEvent> for RecipeBehaviourEvent {
fn from(event: FloodsubEvent) -> RecipeBehaviourEvent {
RecipeBehaviourEvent::Floodsub(event)
}
}
impl From<mdns::Event> for RecipeBehaviourEvent {
fn from(event: mdns::Event) -> RecipeBehaviourEvent {
RecipeBehaviourEvent::Mdns(event)
}
}
首先是 RecipeBehaviour 结构体,其中包括了 FloodSub 和 mDNS 两个 Behaviour;
分别处理了:消息广播和节点自动发现;
<font color="#f00">**需要注意的是:RecipeBehaviour 结构体使用 `#[derive(NetworkBehaviour)]` 标识;**</font>
<font color="#f00">**此宏会自动实现部分逻辑,同时要求被标注的结构体只能含有实现了 NetworkBehaviour 的属性!**</font>
<font color="#f00">**注意:之前可以在 NetworkBehaviour 结构体中使用 `#[behaviour(ignore)]` 的方式被废弃!**</font>
见:
<font color="#f00">**同时,BehaviourEvent 的定义被抽取到了枚举中,通过 `#[behaviour(to_swarm = "RecipeBehaviourEvent")]` 来声明;**</font>
<font color="#f00">**后面可以看到,我们通过将 RecipeBehaviour 中定义的行为产生的事件统一使用 From Trait 转换为RecipeBehaviourEvent 来统一处理;**</font>
<font color="#f00">**这也是最新 rust-libp2p 官方推荐的方式!**</font>
<br/>
在前面我们定义了我们应用允许用户操作的一些行为,用户可以:
ls p
create r recipe_name|recipe_ingredients|recipe_instruction
publish r recipe_id
ls r
ls r all
ls r 12D...(PeerId)
下面我们来简单实现:
src/main.rs
let (response_sender, mut response_rcv) = mpsc::unbounded_channel();
let mut stdin = tokio::io::BufReader::new(tokio::io::stdin()).lines();
loop {
let evt = {
tokio::select! {
line = stdin.next_line() => Some(EventType::Input(line.expect("can get line").expect("can read line from stdin"))),
response = response_rcv.recv() => Some(EventType::Response(response.expect("response exists"))),
_ = handle_swarm_event(response_sender.clone(), &mut swarm) => None,
}
};
if let Some(event) = evt {
match event {
EventType::Response(resp) => {
let json = serde_json::to_string(&resp).expect("can jsonify response");
swarm
.behaviour_mut()
.flood_sub
.publish(TOPIC.clone(), json.as_bytes());
}
EventType::Input(line) => match line.as_str() {
"ls p" => handle_list_peers(&mut swarm).await,
cmd if cmd.starts_with("create r") => handle_create_recipe(cmd).await,
cmd if cmd.starts_with("publish r") => handle_publish_recipe(cmd).await,
cmd if cmd.starts_with("ls r") => handle_list_recipes(cmd, &mut swarm).await,
_ => error!("unknown command: {:?}", line),
},
}
}
}
为了简单实现,我们使用 StdIn 作为输入;
在 loop 无限循环中,我们使用 tokio::select!
来等待准备完成的事件:
line
:用户完成了一行数据;response
:列表数据请求的响应;上面的 select 最终会返回我们之前定义的 EventType 类型:
pub enum EventType {
Input(String),
Response(ListResponse),
}
对于 Input 的处理比较简单,直接对应我们之前对于用户的定义处理即可:
EventType::Input(line) => match line.as_str() {
"ls p" => handle_list_peers(&mut swarm).await,
cmd if cmd.starts_with("create r") => handle_create_recipe(cmd).await,
cmd if cmd.starts_with("publish r") => handle_publish_recipe(cmd).await,
cmd if cmd.starts_with("ls r") => handle_list_recipes(cmd, &mut swarm).await,
_ => error!("unknown command: {:?}", line),
},
对于 Response 则是将结果广播给其他所有 Node:
EventType::Response(resp) => {
let json = serde_json::to_string(&resp).expect("can jsonify response");
swarm
.behaviour_mut()
.flood_sub
.publish(TOPIC.clone(), json.as_bytes());
}
下面来看处理用户输入的逻辑;
<br/>
使用 ls p
列出所有目前已连接的对等点:
src/handlers.rs
pub async fn handle_list_peers(swarm: &mut Swarm<RecipeBehaviour>) {
info!("Discovered Peers:");
let nodes = swarm.behaviour().mdns.discovered_nodes();
let mut unique_peers = HashSet::new();
for peer in nodes {
unique_peers.insert(peer);
}
unique_peers.iter().for_each(|p| info!("{}", p));
}
通过 swarm 可以获取到 mdns 中自动发现的节点;
逻辑非常简单,这里不再赘述;
<br/>
使用 create r recipe_name|recipe_ingredients|recipe_instruction
创建一个新的 Recipe;
创建Recipe 只涉及到本地数据,不涉及 p2p 部分,逻辑比较简单:
src/handlers.rs
pub async fn handle_create_recipe(cmd: &str) {
if let Some(rest) = cmd.strip_prefix("create r") {
let elements: Vec<&str> = rest.split('|').collect();
if elements.len() < 3 {
info!("too few arguments - Format: name|ingredients|instructions");
} else {
let name = elements.first().expect("name is there");
let ingredients = elements.get(1).expect("ingredients is there");
let instructions = elements.get(2).expect("instructions is there");
if let Err(e) = create_new_recipe(name, ingredients, instructions).await {
error!("error creating recipe: {}", e);
};
}
}
}
async fn create_new_recipe(name: &str, ingredients: &str, instructions: &str) -> Result<()> {
let mut local_recipes = read_local_recipes().await?;
let new_id = match local_recipes.iter().max_by_key(|r| r.id) {
Some(v) => v.id + 1,
None => 0,
};
local_recipes.push(Recipe {
id: new_id,
name: name.to_owned(),
ingredients: ingredients.to_owned(),
instructions: instructions.to_owned(),
shared: false,
});
write_local_recipes(&local_recipes).await?;
info!("Created recipe:");
info!("Name: {}", name);
info!("Ingredients: {}", ingredients);
info!("Instructions:: {}", instructions);
Ok(())
}
async fn write_local_recipes(recipes: &Vec<Recipe>) -> Result<()> {
let json = serde_json::to_string(&recipes)?;
fs::write(STORAGE_FILE_PATH, &json).await?;
Ok(())
}
async fn read_local_recipes() -> Result<Vec<Recipe>> {
let content = fs::read(STORAGE_FILE_PATH).await?;
let result = serde_json::from_slice(&content)?;
Ok(result)
}
逻辑比较简单,就是解析命令行传入的参数,创建结构体,先加载数据文件的数据,然后添加一条再写回文件;
频繁读写文件的效率非常的低,这里只是作为例子演示简单实现!
<br/>
发布菜谱来对他人分享:publish r recipe_id
发布 Recipe 的逻辑也非常简单:
src/handlers.rs
pub async fn handle_publish_recipe(cmd: &str) {
if let Some(rest) = cmd.strip_prefix("publish r") {
match rest.trim().parse::<usize>() {
Ok(id) => {
if let Err(e) = publish_recipe(id).await {
info!("error publishing recipe with id {}, {}", id, e)
} else {
info!("Published Recipe with id: {}", id);
}
}
Err(e) => error!("invalid id: {}, {}", rest.trim(), e),
};
}
}
async fn publish_recipe(id: usize) -> Result<()> {
let mut local_recipes = read_local_recipes().await?;
local_recipes
.iter_mut()
.filter(|r| r.id == id)
.for_each(|r| r.shared = true);
write_local_recipes(&local_recipes).await?;
Ok(())
}
首先解析命令行传入的Recipe Id,然后修改文件中对应 Id 的 shared 字段,改为 true;
和创建Recipe类似,这里也会频繁读写文件,效率较低,仅作为展示例子;
<br/>
查询菜谱比较复杂,可以通过下面三种形式:
ls r
ls r all
ls r 12D...(PeerId)
下面来看实现:
src/handlers.rs
pub async fn handle_list_recipes(cmd: &str, swarm: &mut Swarm<RecipeBehaviour>) {
let rest = cmd.strip_prefix("ls r ");
match rest {
Some("all") => {
let req = ListRequest {
mode: ListMode::All,
};
let json = serde_json::to_string(&req).expect("can jsonify request");
swarm
.behaviour_mut()
.flood_sub
.publish(TOPIC.clone(), json.as_bytes());
}
Some(recipes_peer_id) => {
let req = ListRequest {
mode: ListMode::One(recipes_peer_id.to_owned()),
};
let json = serde_json::to_string(&req).expect("can jsonify request");
swarm
.behaviour_mut()
.flood_sub
.publish(TOPIC.clone(), json.as_bytes());
}
None => {
match read_local_recipes().await {
Ok(v) => {
info!("Local Recipes ({})", v.len());
v.iter().for_each(|r| info!("{:?}", r));
}
Err(e) => error!("error fetching local recipes: {}", e),
};
}
};
}
首先,解析命令行参数:
ListMode::All
请求;recipes_peer_id
:则广播 ListMode::One(recipes_peer_id.to_owned())
请求;<br/>
前面我们通过 FloodSub 向对等网中的其他节点发送了请求,那么如何响应消息呢?
这就是 Swarm 自定义逻辑中的核心部分了!
注意到,在前面的 处理输入请求 一小节,我们在 tokio::select!
中定义了:
tokio::select! {
...
_ = handle_swarm_event(response_sender.clone(), &mut swarm) => None,
}
我们正是在这个函数中处理的 BehaviourEvent 逻辑,下面来看:
src/handlers.rs
pub async fn handle_swarm_event(
response_sender: mpsc::UnboundedSender<ListResponse>,
swarm: &mut Swarm<RecipeBehaviour>,
) {
let event = swarm.select_next_some().await;
info!("Income swarm Event: {:?}", event);
match event {
SwarmEvent::Behaviour(recipe_behaviours) => match recipe_behaviours {
RecipeBehaviourEvent::Floodsub(flood_sub_event) => match flood_sub_event {
FloodsubEvent::Message(msg) => {
if let Ok(resp) = serde_json::from_slice::<ListResponse>(&msg.data) {
if resp.receiver == PEER_ID.to_string() {
info!("Response from {}:", msg.source);
resp.data.iter().for_each(|r| info!("{:?}", r));
}
} else if let Ok(req) = serde_json::from_slice::<ListRequest>(&msg.data) {
match req.mode {
ListMode::All => {
info!("Received ALL req: {:?} from {:?}", req, msg.source);
respond_with_public_recipes(
response_sender.clone(),
msg.source.to_string(),
);
}
ListMode::One(ref peer_id) => {
if peer_id == &PEER_ID.to_string() {
info!("Received req: {:?} from {:?}", req, msg.source);
respond_with_public_recipes(
response_sender.clone(),
msg.source.to_string(),
);
}
}
}
}
}
FloodsubEvent::Subscribed { .. } => {}
FloodsubEvent::Unsubscribed { .. } => {}
},
RecipeBehaviourEvent::Mdns(mdns_event) => match mdns_event {
Event::Discovered(discovered_list) => {
let behavior_mut = swarm.behaviour_mut();
for (peer, _addr) in discovered_list {
behavior_mut.flood_sub.add_node_to_partial_view(peer);
}
}
Event::Expired(expired_list) => {
let behavior_mut = swarm.behaviour_mut();
for (peer, _addr) in expired_list {
if !behavior_mut.mdns.has_node(&peer) {
behavior_mut.flood_sub.remove_node_from_partial_view(&peer);
}
}
}
},
},
SwarmEvent::ConnectionEstablished {
peer_id,
connection_id,
endpoint,
num_established,
..
} => {
debug!("[Connection established] peer_id: {}, connection_id: {}, endpoint: {:?}, num_established: {:?}", peer_id, connection_id, endpoint, num_established);
}
SwarmEvent::ConnectionClosed {
peer_id,
connection_id,
endpoint,
num_established,
..
} => {
debug!("[Connection closed] peer_id: {}, connection_id: {}, endpoint: {:?}, num_established: {:?}", peer_id, connection_id, endpoint, num_established);
}
SwarmEvent::IncomingConnection { .. } => {}
SwarmEvent::IncomingConnectionError { .. } => {}
SwarmEvent::OutgoingConnectionError { .. } => {}
SwarmEvent::NewListenAddr { .. } => {}
SwarmEvent::ExpiredListenAddr { .. } => {}
SwarmEvent::ListenerClosed { .. } => {}
SwarmEvent::ListenerError { .. } => {}
SwarmEvent::Dialing { .. } => {}
};
}
可以看到,我们正是通过 let event = swarm.select_next_some().await
来获取的 Swarm 中所有的 Event 事件!
随后我们即可枚举全部的 Event 事件:
SwarmEvent::Behaviour
:用户自定义的事件,也就是上面我们通过 to_swarm
宏指定的 Enum 类型!SwarmEvent::ConnectionEstablished
:连接建立事件;SwarmEvent::ConnectionClosed
:连接断开事件;可以看到,得益于 Rust 强大的枚举类型,我们可以非常清晰的处理各种事件!
<br/>
处理 FloodSubEvent 的逻辑如下:
RecipeBehaviourEvent::Floodsub(flood_sub_event) => match flood_sub_event {
FloodsubEvent::Message(msg) => {
if let Ok(resp) = serde_json::from_slice::<ListResponse>(&msg.data) {
if resp.receiver == PEER_ID.to_string() {
info!("Response from {}:", msg.source);
resp.data.iter().for_each(|r| info!("{:?}", r));
}
} else if let Ok(req) = serde_json::from_slice::<ListRequest>(&msg.data) {
match req.mode {
ListMode::All => {
info!("Received ALL req: {:?} from {:?}", req, msg.source);
respond_with_public_recipes(
response_sender.clone(),
msg.source.to_string(),
);
}
ListMode::One(ref peer_id) => {
if peer_id == &PEER_ID.to_string() {
info!("Received req: {:?} from {:?}", req, msg.source);
respond_with_public_recipes(
response_sender.clone(),
msg.source.to_string(),
);
}
}
}
}
}
FloodsubEvent::Subscribed { .. } => {}
FloodsubEvent::Unsubscribed { .. } => {}
},
首先我们判断:
ListMode::All
查询模式,则读取本机数据,并返回;ListMode::One(ref peer_id)
,则判断 PeerId 为本机才返回;respond_with_public_recipes
的逻辑如下:
fn respond_with_public_recipes(sender: mpsc::UnboundedSender<ListResponse>, receiver: String) {
tokio::spawn(async move {
match read_local_recipes().await {
Ok(recipes) => {
let resp = ListResponse {
mode: ListMode::All,
receiver,
data: recipes.into_iter().filter(|r| r.shared).collect(),
};
if let Err(e) = sender.send(resp) {
error!("error sending response via channel, {}", e);
}
}
Err(e) => error!("error fetching local recipes to answer ALL request, {}", e),
}
});
}
读取本地 Recipe 数据,并通过 sender 发送即可!
在 tokio::select!
中会接收响应,并广播出去:
tokio::select! {
response = response_rcv.recv() => Some(EventType::Response(response.expect("response exists"))),
}
EventType::Response(resp) => {
let json = serde_json::to_string(&resp).expect("can jsonify response");
swarm
.behaviour_mut()
.flood_sub
.publish(TOPIC.clone(), json.as_bytes());
}
<br/>
mDNS 包括了两个事件:
#[derive(Debug, Clone)]
pub enum Event {
/// Discovered nodes through mDNS.
Discovered(Vec<(PeerId, Multiaddr)>),
/// The given combinations of `PeerId` and `Multiaddr` have expired.
///
/// Each discovered record has a time-to-live. When this TTL expires and the address hasn't
/// been refreshed, we remove it from the list and emit it as an `Expired` event.
Expired(Vec<(PeerId, Multiaddr)>),
}
分别为:
Discovered(Vec<(PeerId, Multiaddr)>)
:已发现的节点;Expired(Vec<(PeerId, Multiaddr)>)
:新节点过期;处理逻辑如下:
src/handlers.rs
RecipeBehaviourEvent::Mdns(mdns_event) => match mdns_event {
Event::Discovered(discovered_list) => {
let behavior_mut = swarm.behaviour_mut();
for (peer, _addr) in discovered_list {
behavior_mut.flood_sub.add_node_to_partial_view(peer);
}
}
Event::Expired(expired_list) => {
let behavior_mut = swarm.behaviour_mut();
for (peer, _addr) in expired_list {
if !behavior_mut.mdns.has_node(&peer) {
behavior_mut.flood_sub.remove_node_from_partial_view(&peer);
}
}
}
},
即:
逻辑非常简单;
至此,我们的应用开发完毕!
<br/>
我们可以通过 cargo run
在本机创建多个节点;
cargo run
INFO rust_learn > Peer Id: 12D3KooWA7xhiEmFxikn9aiWcffkhDACDhz1rRPXxkC4yxgnzJCT
INFO libp2p_mdns::behaviour::iface > creating instance on iface 192.168.31.22
INFO rust_learn::handlers > Income swarm Event: NewListenAddr { listener_id: ListenerId(1), address: "/ip4/127.0.0.1/tcp/65248" }
INFO rust_learn::handlers > Income swarm Event: NewListenAddr { listener_id: ListenerId(1), address: "/ip4/192.168.31.22/tcp/65248" }
INFO libp2p_mdns::behaviour > discovered: 12D3KooWGEGJQhFaR4ZzJ15CUvMVVu1wcaGd3i7yzvHHYFexbfT7 /ip4/192.168.31.22/tcp/65247
INFO rust_learn::handlers > Income swarm Event: Behaviour(Mdns(Discovered([(PeerId("12D3KooWGEGJQhFaR4ZzJ15CUvMVVu1wcaGd3i7yzvHHYFexbfT7"), "/ip4/192.168.31.22/tcp/65247")])))
INFO rust_learn::handlers > Income swarm Event: Dialing { peer_id: Some(PeerId("12D3KooWGEGJQhFaR4ZzJ15CUvMVVu1wcaGd3i7yzvHHYFexbfT7")), connection_id: ConnectionId(1) }
INFO rust_learn::handlers > Income swarm Event: ConnectionEstablished { peer_id: PeerId("12D3KooWGEGJQhFaR4ZzJ15CUvMVVu1wcaGd3i7yzvHHYFexbfT7"), connection_id: ConnectionId(1), endpoint: Dialer { address: "/ip4/192.168.31.22/tcp/65247/p2p/12D3KooWGEGJQhFaR4ZzJ15CUvMVVu1wcaGd3i7yzvHHYFexbfT7", role_override: Dialer }, num_established: 1, concurrent_dial_errors: Some([]), established_in: 7.355625ms }
INFO rust_learn::handlers > Income swarm Event: Behaviour(Floodsub(Subscribed { peer_id: PeerId("12D3KooWGEGJQhFaR4ZzJ15CUvMVVu1wcaGd3i7yzvHHYFexbfT7"), topic: Topic("recipes") }))
INFO libp2p_mdns::behaviour > discovered: 12D3KooWCWesVZsAoDaFs7UZYXV6gTNd56UPMoiWfxWFgvLCJZhz /ip4/192.168.31.22/tcp/65250
INFO rust_learn::handlers > Income swarm Event: Behaviour(Mdns(Discovered([(PeerId("12D3KooWCWesVZsAoDaFs7UZYXV6gTNd56UPMoiWfxWFgvLCJZhz"), "/ip4/192.168.31.22/tcp/65250")])))
INFO rust_learn::handlers > Income swarm Event: Dialing { peer_id: Some(PeerId("12D3KooWCWesVZsAoDaFs7UZYXV6gTNd56UPMoiWfxWFgvLCJZhz")), connection_id: ConnectionId(2) }
INFO rust_learn::handlers > Income swarm Event: IncomingConnection { connection_id: ConnectionId(3), local_addr: "/ip4/192.168.31.22/tcp/65248", send_back_addr: "/ip4/192.168.31.22/tcp/65253" }
INFO rust_learn::handlers > Income swarm Event: ConnectionEstablished { peer_id: PeerId("12D3KooWCWesVZsAoDaFs7UZYXV6gTNd56UPMoiWfxWFgvLCJZhz"), connection_id: ConnectionId(2), endpoint: Dialer { address: "/ip4/192.168.31.22/tcp/65250/p2p/12D3KooWCWesVZsAoDaFs7UZYXV6gTNd56UPMoiWfxWFgvLCJZhz", role_override: Dialer }, num_established: 1, concurrent_dial_errors: Some([]), established_in: 5.762334ms }
INFO rust_learn::handlers > Income swarm Event: ConnectionEstablished { peer_id: PeerId("12D3KooWCWesVZsAoDaFs7UZYXV6gTNd56UPMoiWfxWFgvLCJZhz"), connection_id: ConnectionId(3), endpoint: Listener { local_addr: "/ip4/192.168.31.22/tcp/65248", send_back_addr: "/ip4/192.168.31.22/tcp/65253" }, num_established: 2, concurrent_dial_errors: None, established_in: 5.212125ms }
INFO rust_learn::handlers > Income swarm Event: Behaviour(Floodsub(Subscribed { peer_id: PeerId("12D3KooWCWesVZsAoDaFs7UZYXV6gTNd56UPMoiWfxWFgvLCJZhz"), topic: Topic("recipes") }))
查看所有对等节点:
ls p
INFO rust_learn::handlers > Discovered Peers:
INFO rust_learn::handlers > 12D3KooWGEGJQhFaR4ZzJ15CUvMVVu1wcaGd3i7yzvHHYFexbfT7
INFO rust_learn::handlers > 12D3KooWCWesVZsAoDaFs7UZYXV6gTNd56UPMoiWfxWFgvLCJZhz
创建 Recipe:
create r name|recipe_ingredients|recipe_instruction
INFO rust_learn::handlers > Created recipe:
INFO rust_learn::handlers > Name: name
INFO rust_learn::handlers > Ingredients: recipe_ingredients
INFO rust_learn::handlers > Instructions:: recipe_instruction
列出本地 Recipe:
ls r
INFO rust_learn::handlers > Local Recipes (6)
INFO rust_learn::handlers > Recipe { id: 0, name: " Coffee", ingredients: "Coffee", instructions: "Make Coffee", shared: true }
INFO rust_learn::handlers > Recipe { id: 1, name: " Tea", ingredients: "Tea, Water", instructions: "Boil Water, add tea", shared: true }
INFO rust_learn::handlers > Recipe { id: 2, name: " Carrot Cake", ingredients: "Carrots, Cake", instructions: "Make Carrot Cake", shared: true }
INFO rust_learn::handlers > Recipe { id: 3, name: " Name", ingredients: "Ingredients", instructions: "Instructions", shared: true }
INFO rust_learn::handlers > Recipe { id: 4, name: " name", ingredients: "recipeIngredients", instructions: "instruction", shared: true }
INFO rust_learn::handlers > Recipe { id: 5, name: " name", ingredients: "recipe_ingredients", instructions: "recipe_instruction", shared: false }
列出所有对等点 Recipe:
ls r all
INFO rust_learn::handlers > Income swarm Event: Behaviour(Floodsub(Message(FloodsubMessage { source: PeerId("12D3KooWGEGJQhFaR4ZzJ15CUvMVVu1wcaGd3i7yzvHHYFexbfT7"), data: [123, 34, 109, 111,...
INFO rust_learn::handlers > Response from 12D3KooWGEGJQhFaR4ZzJ15CUvMVVu1wcaGd3i7yzvHHYFexbfT7:
INFO rust_learn::handlers > Recipe { id: 0, name: " Coffee", ingredients: "Coffee", instructions: "Make Coffee", shared: true }
INFO rust_learn::handlers > Recipe { id: 1, name: " Tea", ingredients: "Tea, Water", instructions: "Boil Water, add tea", shared: true }
INFO rust_learn::handlers > Recipe { id: 2, name: " Carrot Cake", ingredients: "Carrots, Cake", instructions: "Make Carrot Cake", shared: true }
INFO rust_learn::handlers > Recipe { id: 3, name: " Name", ingredients: "Ingredients", instructions: "Instructions", shared: true }
INFO rust_learn::handlers > Recipe { id: 4, name: " name", ingredients: "recipeIngredients", instructions: "instruction", shared: true }
INFO rust_learn::handlers > Income swarm Event: Behaviour(Floodsub(Message(FloodsubMessage { source: PeerId("12D3KooWCWesVZsAoDaFs7UZYXV6gTNd56UPMoiWfxWFgvLCJZhz"), data: [123, 34, 109....
INFO rust_learn::handlers > Response from 12D3KooWCWesVZsAoDaFs7UZYXV6gTNd56UPMoiWfxWFgvLCJZhz:
INFO rust_learn::handlers > Recipe { id: 0, name: " Coffee", ingredients: "Coffee", instructions: "Make Coffee", shared: true }
INFO rust_learn::handlers > Recipe { id: 1, name: " Tea", ingredients: "Tea, Water", instructions: "Boil Water, add tea", shared: true }
INFO rust_learn::handlers > Recipe { id: 2, name: " Carrot Cake", ingredients: "Carrots, Cake", instructions: "Make Carrot Cake", shared: true }
INFO rust_learn::handlers > Recipe { id: 3, name: " Name", ingredients: "Ingredients", instructions: "Instructions", shared: true }
INFO rust_learn::handlers > Recipe { id: 4, name: " name", ingredients: "recipeIngredients", instructions: "instruction", shared: true }
列出某个对等点 Recipe:
ls r 12D3KooWGEGJQhFaR4ZzJ15CUvMVVu1wcaGd3i7yzvHHYFexbfT7
INFO rust_learn::handlers > Income swarm Event: Behaviour(Floodsub(Message(FloodsubMessage { source: PeerId("12D3KooWGEGJQhFaR4ZzJ15CUvMVVu1wcaGd3i7yzvHHYFexbfT7"), data: [123, 34, 109, 111, 100, 101, 34, ...
INFO rust_learn::handlers > Response from 12D3KooWGEGJQhFaR4ZzJ15CUvMVVu1wcaGd3i7yzvHHYFexbfT7:
INFO rust_learn::handlers > Recipe { id: 0, name: " Coffee", ingredients: "Coffee", instructions: "Make Coffee", shared: true }
INFO rust_learn::handlers > Recipe { id: 1, name: " Tea", ingredients: "Tea, Water", instructions: "Boil Water, add tea", shared: true }
INFO rust_learn::handlers > Recipe { id: 2, name: " Carrot Cake", ingredients: "Carrots, Cake", instructions: "Make Carrot Cake", shared: true }
INFO rust_learn::handlers > Recipe { id: 3, name: " Name", ingredients: "Ingredients", instructions: "Instructions", shared: true }
INFO rust_learn::handlers > Recipe { id: 4, name: " name", ingredients: "recipeIngredients", instructions: "instruction", shared: true }
发布 Recipe:
publish r 5
INFO rust_learn::handlers > Published Recipe with id: 5
再次列出远程节点中的 Recipe:
ls r 12D3KooWGEGJQhFaR4ZzJ15CUvMVVu1wcaGd3i7yzvHHYFexbfT7
INFO rust_learn::handlers > Income swarm Event: Behaviour(Floodsub(Message(FloodsubMessage { source: PeerId("12D3KooWGEGJQhFaR4ZzJ15CUvMVVu1wcaGd3i7yzvHHYFexbfT7"), data: [123, 34, 109, ...
INFO rust_learn::handlers > Response from 12D3KooWGEGJQhFaR4ZzJ15CUvMVVu1wcaGd3i7yzvHHYFexbfT7:
INFO rust_learn::handlers > Recipe { id: 0, name: " Coffee", ingredients: "Coffee", instructions: "Make Coffee", shared: true }
INFO rust_learn::handlers > Recipe { id: 1, name: " Tea", ingredients: "Tea, Water", instructions: "Boil Water, add tea", shared: true }
INFO rust_learn::handlers > Recipe { id: 2, name: " Carrot Cake", ingredients: "Carrots, Cake", instructions: "Make Carrot Cake", shared: true }
INFO rust_learn::handlers > Recipe { id: 3, name: " Name", ingredients: "Ingredients", instructions: "Instructions", shared: true }
INFO rust_learn::handlers > Recipe { id: 4, name: " name", ingredients: "recipeIngredients", instructions: "instruction", shared: true }
INFO rust_learn::handlers > Recipe { id: 5, name: " name", ingredients: "recipe_ingredients", instructions: "recipe_instruction", shared: true }
由于多个节点共享的是同一份数据文件,刚刚我们发布了一个新的 Recipe,所以刚刚发布的 Recipe 也在这里显示了!
<br/>
相信通过上面的讲解,你已经了解了 libp2p 的基本使用;
P2P 技术目前广泛应用在区块链等领域,相信 libp2p 也一定会被越来越多的人使用!
<br/>
源代码:
参考文章:
<br/>