Skip to content

Commit

Permalink
Download from remote nodes directly + announce external swarm address (
Browse files Browse the repository at this point in the history
  • Loading branch information
justprosh authored Jul 16, 2021
1 parent 2e64ed5 commit 65db814
Show file tree
Hide file tree
Showing 6 changed files with 98 additions and 43 deletions.
6 changes: 4 additions & 2 deletions aqua/ipfs.aqua
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
data IpfsGetFromResult:
data IpfsGetResult:
success: bool
error: string
path: string
Expand All @@ -18,9 +18,11 @@ data IpfsResult:
error: string

service Ipfs("ipfs-adapter"):
connect(multiaddr: string) -> IpfsResult
get(hash: string) -> IpfsGetResult
get_external_api_multiaddr() -> IpfsMultiaddrResult
get_external_swarm_multiaddr() -> IpfsMultiaddrResult
get_from(hash: string, swarm_multiaddr: string) -> IpfsGetFromResult
get_from(hash: string, external_multiaddr: string) -> IpfsGetResult
get_local_api_multiaddr() -> IpfsMultiaddrResult
put(file_path: string) -> IpfsPutResult
set_external_api_multiaddr(multiaddr: string) -> IpfsResult
Expand Down
26 changes: 13 additions & 13 deletions service/effector/src/effector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,32 +43,32 @@ fn unwrap_mounted_binary_result(result: MountedBinaryResult) -> Result<String> {
#[inline]
fn get_timeout_string(timeout: u64) -> String { format!("{}s", timeout) }

fn make_cmd_args(args: Vec<String>, local_multiaddr: String, timeout_sec: u64) -> Vec<String> {
fn make_cmd_args(args: Vec<String>, api_multiaddr: String, timeout_sec: u64) -> Vec<String> {
args.into_iter().chain(
vec![
String::from("--timeout"),
get_timeout_string(timeout_sec),
String::from("--api"),
local_multiaddr
api_multiaddr
]).collect()
}

#[marine]
pub fn connect(multiaddr: String, local_multiaddr: String, timeout_sec: u64) -> IpfsResult {
pub fn connect(multiaddr: String, api_multiaddr: String, timeout_sec: u64) -> IpfsResult {
log::info!("connect called with multiaddr {}", multiaddr);

let args = vec![
String::from("swarm"),
String::from("connect"),
multiaddr];
let cmd = make_cmd_args(args, local_multiaddr, timeout_sec);
let cmd = make_cmd_args(args, api_multiaddr, timeout_sec);

unwrap_mounted_binary_result(ipfs(cmd)).map(|_| ()).into()
}

/// Put file from specified path to IPFS and return its hash.
#[marine]
pub fn put(file_path: String, local_multiaddr: String, timeout_sec: u64) -> IpfsPutResult {
pub fn put(file_path: String, api_multiaddr: String, timeout_sec: u64) -> IpfsPutResult {
log::info!("put called with file path {}", file_path);

if !std::path::Path::new(&file_path).exists() {
Expand All @@ -80,7 +80,7 @@ pub fn put(file_path: String, local_multiaddr: String, timeout_sec: u64) -> Ipfs
String::from("-Q"),
inject_vault_host_path(file_path)
];
let cmd = make_cmd_args(args, local_multiaddr, timeout_sec);
let cmd = make_cmd_args(args, api_multiaddr, timeout_sec);

log::info!("ipfs put args {:?}", cmd);

Expand All @@ -89,7 +89,7 @@ pub fn put(file_path: String, local_multiaddr: String, timeout_sec: u64) -> Ipfs

/// Get file by provided hash from IPFS, saves it to a temporary file and returns a path to it.
#[marine]
pub fn get(hash: String, file_path: String, local_multiaddr: String, timeout_sec: u64) -> IpfsResult {
pub fn get(hash: String, file_path: String, api_multiaddr: String, timeout_sec: u64) -> IpfsResult {
log::info!("get called with hash {}", hash);

let args = vec![
Expand All @@ -98,7 +98,7 @@ pub fn get(hash: String, file_path: String, local_multiaddr: String, timeout_sec
inject_vault_host_path(file_path),
hash,
];
let cmd = make_cmd_args(args, local_multiaddr, timeout_sec);
let cmd = make_cmd_args(args, api_multiaddr, timeout_sec);

log::info!("ipfs get args {:?}", cmd);

Expand All @@ -108,9 +108,9 @@ pub fn get(hash: String, file_path: String, local_multiaddr: String, timeout_sec
}

#[marine]
pub fn get_peer_id(local_multiaddr: String, timeout_sec: u64) -> IpfsGetPeerIdResult {
pub fn get_peer_id(api_multiaddr: String, timeout_sec: u64) -> IpfsGetPeerIdResult {
let result: Result<String> = try {
let cmd = make_cmd_args(vec![String::from("id")], local_multiaddr, timeout_sec);
let cmd = make_cmd_args(vec![String::from("id")], api_multiaddr, timeout_sec);

let result = unwrap_mounted_binary_result(ipfs(cmd))?;
let result: serde_json::Value = serde_json::from_str(&result).wrap_err("ipfs response parsing failed")?;
Expand All @@ -121,16 +121,16 @@ pub fn get_peer_id(local_multiaddr: String, timeout_sec: u64) -> IpfsGetPeerIdRe
}

#[marine]
pub fn set_external_api_multiaddr(multiaddr: String, local_multiaddr: String, timeout_sec: u64) -> IpfsResult {
pub fn set_external_swarm_multiaddr(swarm_multiaddr: String, api_multiaddr: String, timeout_sec: u64) -> IpfsResult {
let result: Result<()> = try {
let multiaddr = Multiaddr::from_str(&multiaddr).wrap_err(format!("invalid multiaddr {}", multiaddr))?;
let multiaddr = Multiaddr::from_str(&swarm_multiaddr).wrap_err(format!("invalid multiaddr {}", swarm_multiaddr))?;
let args = vec![
String::from("config"),
String::from("Addresses.Announce"),
format!(r#"["{}"]"#, multiaddr.to_string()),
String::from("--json"),
];
let cmd = make_cmd_args(args, local_multiaddr, timeout_sec);
let cmd = make_cmd_args(args, api_multiaddr, timeout_sec);

unwrap_mounted_binary_result(ipfs(cmd)).map(|_| ())?
};
Expand Down
90 changes: 65 additions & 25 deletions service/pure/src/pure.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

#![allow(improper_ctypes)]

use types::{IpfsResult, IpfsGetFromResult, IpfsPutResult, IpfsGetPeerIdResult, IpfsMultiaddrResult};
use types::{IpfsResult, IpfsGetResult, IpfsPutResult, IpfsGetPeerIdResult, IpfsMultiaddrResult};

use marine_rs_sdk::marine;
use marine_rs_sdk::module_manifest;
Expand Down Expand Up @@ -80,6 +80,28 @@ pub(crate) fn create_config() {
}
}

pub fn get_peer_id(api_multiaddr: String, timeout: u64) -> eyre::Result<Protocol<'static>> {
let peer_id_result = ipfs_get_peer_id(api_multiaddr, timeout);
if !peer_id_result.success {
Err(eyre::eyre!(peer_id_result.error.clone()))?;
}

Ok(Protocol::P2p(Multihash::from_bytes(&bs58::decode(peer_id_result.peer_id.clone()).into_vec()?).wrap_err(format!("error parsing peer_id from 'ipfs id': {}", peer_id_result.peer_id))?))
}

#[marine]
pub fn connect(multiaddr: String) -> IpfsResult {
if Multiaddr::from_str(&multiaddr).is_err() {
return Err(eyre::eyre!("invalid multiaddr: {}", multiaddr)).into();
}

let config = load_config();
let timeout = config.timeout;
let local_maddr = config.local_api_multiaddr.to_string();

ipfs_connect(multiaddr, local_maddr, timeout)
}

#[marine]
pub fn put(file_path: String) -> IpfsPutResult {
log::info!("put called with {:?}", file_path);
Expand All @@ -88,22 +110,25 @@ pub fn put(file_path: String) -> IpfsPutResult {
}

#[marine]
pub fn get_from(hash: String, swarm_multiaddr: String) -> IpfsGetFromResult {
log::info!("get called with hash: {}", hash);
pub fn get(hash: String) -> IpfsGetResult {
let local_maddr = load_config().local_api_multiaddr.to_string();
get_from(hash, local_maddr)
}

#[marine]
pub fn get_from(hash: String, external_multiaddr: String) -> IpfsGetResult {
log::info!("get from called with hash: {}", hash);
let config = load_config();
let timeout = config.timeout;
let local_maddr = config.local_api_multiaddr.to_string();

let particle_id = marine_rs_sdk::get_call_parameters().particle_id;
let connect_result = ipfs_connect(swarm_multiaddr, local_maddr.clone(), timeout);

if !connect_result.success {
return Err(eyre::eyre!(connect_result.error)).into();
if Multiaddr::from_str(&external_multiaddr).is_err() {
return Err(eyre::eyre!("invalid multiaddr: {}", external_multiaddr)).into();
}

let particle_vault_path = format!("/tmp/vault/{}", particle_id);
let path = format!("{}/{}", particle_vault_path, hash);
let get_result = ipfs_get(hash, path.clone(), local_maddr, timeout);
let get_result = ipfs_get(hash, path.clone(), external_multiaddr, timeout);

if get_result.success {
Ok(path).into()
Expand Down Expand Up @@ -143,16 +168,7 @@ pub fn set_external_api_multiaddr(multiaddr: String) -> IpfsResult {
n => Err(eyre::eyre!("multiaddr should contain 2 or 3 components, {} given", n))?,
}

let set_result = ipfs_set_external_api_multiaddr(multiaddr.to_string(), local_maddr.clone(), timeout);
if !set_result.success {
return set_result;
}
let peer_id_result = ipfs_get_peer_id(local_maddr, timeout);
if !peer_id_result.success {
Err(eyre::eyre!(peer_id_result.error.clone()))?;
}

let peer_id = Protocol::P2p(Multihash::from_bytes(&bs58::decode(peer_id_result.peer_id.clone()).into_vec()?).wrap_err(format!("peer_id parsing failed: {}", peer_id_result.peer_id))?);
let peer_id = get_peer_id(local_maddr, timeout)?;
if passed_peer_id.is_some() && passed_peer_id != Some(peer_id.clone()) {
Err(eyre::eyre!("given peer id is different from node peer_id: given {}, actual {}", passed_peer_id.unwrap().to_string(), peer_id.to_string()))?;
}
Expand Down Expand Up @@ -205,7 +221,31 @@ pub fn set_external_swarm_multiaddr(multiaddr: String) -> IpfsResult {

let result: eyre::Result<()> = try {
let mut config = load_config();
config.external_swarm_multiaddr = Some(Multiaddr::from_str(&multiaddr).wrap_err(format!("invalid multiaddr: {}", multiaddr))?);

let mut multiaddr = Multiaddr::from_str(&multiaddr).wrap_err(format!("invalid multiaddr: {}", multiaddr))?;

let mut passed_peer_id = None;
match multiaddr.iter().count() {
3 => {
passed_peer_id = multiaddr.pop();
}
2 => {}
n => Err(eyre::eyre!("multiaddr should contain 2 or 3 components, {} given", n))?,
}

let peer_id = get_peer_id(config.local_api_multiaddr.to_string(), config.timeout)?;
if passed_peer_id.is_some() && passed_peer_id != Some(peer_id.clone()) {
Err(eyre::eyre!("given peer id is different from node peer_id: given {}, actual {}", passed_peer_id.unwrap().to_string(), peer_id.to_string()))?;
}

multiaddr.push(peer_id);

let set_result = ipfs_set_external_swarm_multiaddr(multiaddr.to_string(), config.local_api_multiaddr.to_string(), config.timeout);
if !set_result.success {
return set_result;
}

config.external_swarm_multiaddr = Some(multiaddr);
write_config(config);
()
};
Expand All @@ -225,19 +265,19 @@ pub fn set_timeout(timeout_sec: u64) {
#[link(wasm_import_module = "ipfs_effector")]
extern "C" {
#[link_name = "connect"]
pub fn ipfs_connect(external_multiaddr: String, local_multiaddr: String, timeout_sec: u64) -> IpfsResult;
pub fn ipfs_connect(external_multiaddr: String, api_multiaddr: String, timeout_sec: u64) -> IpfsResult;

/// Put provided file to ipfs, return ipfs hash of the file.
#[link_name = "put"]
pub fn ipfs_put(file_path: String, local_multiaddr: String, timeout_sec: u64) -> IpfsPutResult;
pub fn ipfs_put(file_path: String, api_multiaddr: String, timeout_sec: u64) -> IpfsPutResult;

/// Get file from ipfs by hash.
#[link_name = "get"]
pub fn ipfs_get(hash: String, file_path: String, local_multiaddr: String, timeout_sec: u64) -> IpfsResult;
pub fn ipfs_get(hash: String, file_path: String, api_multiaddr: String, timeout_sec: u64) -> IpfsResult;

#[link_name = "get_peer_id"]
pub fn ipfs_get_peer_id(local_multiaddr: String, timeout_sec: u64) -> IpfsGetPeerIdResult;

#[link_name = "set_external_api_multiaddr"]
pub fn ipfs_set_external_api_multiaddr(external_multiaddr: String, local_multiaddr: String, timeout_sec: u64) -> IpfsResult;
#[link_name = "set_external_swarm_multiaddr"]
pub fn ipfs_set_external_swarm_multiaddr(swarm_multiaddr: String, api_multiaddr: String, timeout_sec: u64) -> IpfsResult;
}
13 changes: 13 additions & 0 deletions service/pure/tests/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,4 +38,17 @@ mod tests {
assert!(result.success);
assert_eq!(format!("{}/p2p/{}", multiaddr, peer_id), result.multiaddr);
}

#[marine_test(config_path = "Config.toml", modules_dir = "../../artifacts")]
fn set_get_external_swarm_multiaddr() {
let multiaddr = "/ip4/127.0.0.1/tcp/9992";
let result = ipfs_pure.set_external_swarm_multiaddr(multiaddr.to_string());
assert!(result.success);

let peer_id = ipfs_effector.get_peer_id("/ip4/127.0.0.1/tcp/5001".to_string(), 0).peer_id;

let result = ipfs_pure.get_external_swarm_multiaddr();
assert!(result.success);
assert_eq!(format!("{}/p2p/{}", multiaddr, peer_id), result.multiaddr);
}
}
2 changes: 1 addition & 1 deletion service/types/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
mod results;

pub use results::{IpfsGetFromResult, IpfsResult, IpfsPutResult, IpfsGetPeerIdResult, IpfsMultiaddrResult};
pub use results::{IpfsGetResult, IpfsResult, IpfsPutResult, IpfsGetPeerIdResult, IpfsMultiaddrResult};
4 changes: 2 additions & 2 deletions service/types/src/results.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,13 @@ impl From<Result<()>> for IpfsResult {
}

#[marine]
pub struct IpfsGetFromResult {
pub struct IpfsGetResult {
pub success: bool,
pub error: String,
pub path: String,
}

impl From<Result<String>> for IpfsGetFromResult {
impl From<Result<String>> for IpfsGetResult {
fn from(result: Result<String>) -> Self {
match result {
Ok(path) => Self { success: true, error: "".to_string(), path },
Expand Down

0 comments on commit 65db814

Please sign in to comment.