Skip to content
This repository has been archived by the owner on Sep 21, 2024. It is now read-only.

Commit

Permalink
fix: some fixes for downstream cloud hosting work:
Browse files Browse the repository at this point in the history
* Cyclic dependency issues downstream:
  * Remove vestigial getrandom dep from noosphere-core (add notes to the usage in noosphere-core after further investigation)
  * Remove deprecated (due to cyclic dep issues) `serde-serialize` feature from wasm-bindgen
* Make gateway routing traits public
* Provide blanket implementations of `JobClient` for `Arc<JobClient>`
* Scope `GatewayManager::ucan_store()` by gateway sphere identity, as well as other GatewayManager methods (in lieu of counterpart)
  • Loading branch information
cdata authored and jsantell committed Mar 28, 2024
1 parent f0f116c commit 19eaba2
Show file tree
Hide file tree
Showing 16 changed files with 59 additions and 38 deletions.
3 changes: 0 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -94,4 +94,3 @@ web-sys = { version = "0.3" }
[profile.release]
opt-level = 'z'
lto = true

2 changes: 0 additions & 2 deletions rust/noosphere-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,6 @@ tokio = { workspace = true, features = ["full"] }
tracing-subscriber = { workspace = true }

[target.'cfg(target_arch = "wasm32")'.dependencies]
# NOTE: This is needed so that rand can be included in WASM builds
getrandom = { workspace = true, features = ["js"] }
gloo-net = { workspace = true }
wasm-streams = { workspace = true }
wasm-bindgen = { workspace = true }
Expand Down
1 change: 1 addition & 0 deletions rust/noosphere-core/src/api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,4 @@ pub mod v0alpha2;

pub use client::*;
pub use data::*;
pub use route::*;
4 changes: 2 additions & 2 deletions rust/noosphere-gateway/src/extractors/authority.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ where
let capability = generate_capability(counterpart_str, required_ability);
let db = self
.manager
.ucan_store()
.ucan_store(&gateway_scope.gateway)
.await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;

Expand All @@ -70,7 +70,7 @@ where
debug!("Authorized!");
return self
.manager
.sphere_context(&gateway_scope.counterpart)
.sphere_context(&gateway_scope.gateway)
.await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR);
}
Expand Down
9 changes: 6 additions & 3 deletions rust/noosphere-gateway/src/extractors/scope.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ use noosphere_core::context::SphereContext;
pub struct GatewayScope<C, S> {
/// [Did] of the client counterpart sphere.
pub counterpart: Did,
/// [Did] of the managed gateway sphere.
pub gateway: Did,
/// [Did] of the author of the managed gateway sphere.
pub gateway_identity: Did,
sphere_context_marker: PhantomData<C>,
Expand All @@ -31,9 +33,10 @@ pub struct GatewayScope<C, S> {

impl<C, S> GatewayScope<C, S> {
/// Creates a new [GatewayScope].
pub fn new(gateway_identity: Did, counterpart: Did) -> Self {
pub fn new(gateway_identity: Did, gateway: Did, counterpart: Did) -> Self {
Self {
gateway_identity,
gateway,
counterpart,
sphere_context_marker: PhantomData,
storage_marker: PhantomData,
Expand All @@ -54,7 +57,7 @@ where
parts: &mut Parts,
state: &Arc<M>,
) -> Result<Self, Self::Rejection> {
let (gateway_identity, counterpart) = state.gateway_scope(parts).await?;
Ok(GatewayScope::new(gateway_identity, counterpart))
let (gateway_identity, gateway, counterpart) = state.gateway_scope(parts).await?;
Ok(GatewayScope::new(gateway_identity, gateway, counterpart))
}
}
10 changes: 5 additions & 5 deletions rust/noosphere-gateway/src/gateway_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,14 @@ where
/// An optional [Url] to configure CORS layers.
fn cors_origin(&self) -> Option<Url>;

/// A sphere-agnostic block store for authorization.
async fn ucan_store(&self) -> Result<UcanStore<S::BlockStore>>;
/// Retrieve a [UcanStore] for `sphere_identity`.
async fn ucan_store(&self, sphere_identity: &Did) -> Result<UcanStore<S::BlockStore>>;

/// Retrieve a sphere context that maps to `counterpart`.
async fn sphere_context(&self, counterpart: &Did) -> Result<C>;
/// Retrieve a sphere context that maps to `sphere_identity`.
async fn sphere_context(&self, sphere_identity: &Did) -> Result<C>;

/// Extract the specified gateway identity (0) and counterpart (1)
/// from an [axum] request. This function should be deterministic in
/// order to take advantage of caching.
async fn gateway_scope(&self, parts: &mut Parts) -> Result<(Did, Did), StatusCode>;
async fn gateway_scope(&self, parts: &mut Parts) -> Result<(Did, Did, Did), StatusCode>;
}
4 changes: 2 additions & 2 deletions rust/noosphere-gateway/src/handlers/v0alpha1/push.rs
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ where
if let Err(error) = self
.job_runner_client
.submit(GatewayJob::NameSystemResolveSince {
identity: self.gateway_scope.counterpart.to_owned(),
identity: self.gateway_scope.gateway.to_owned(),
since: self.request_body.local_base,
})
{
Expand All @@ -323,7 +323,7 @@ where
// an explicit publish action. Move this to the publish handler when we
// have added it to the gateway.
if let Err(error) = self.job_runner_client.submit(GatewayJob::IpfsSyndication {
identity: self.gateway_scope.counterpart.to_owned(),
identity: self.gateway_scope.gateway.to_owned(),
name_publish_on_success,
}) {
warn!("Failed to queue IPFS syndication job: {}", error);
Expand Down
4 changes: 2 additions & 2 deletions rust/noosphere-gateway/src/handlers/v0alpha2/push.rs
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,7 @@ where
if let Err(error) = self
.job_runner_client
.submit(GatewayJob::NameSystemResolveAll {
identity: self.gateway_scope.counterpart.to_owned(),
identity: self.gateway_scope.gateway.to_owned(),
})
{
warn!("Failed to request name system resolutions: {}", error);
Expand All @@ -354,7 +354,7 @@ where
// an explicit publish action. Move this to the publish handler when we
// have added it to the gateway.
if let Err(error) = self.job_runner_client.submit(GatewayJob::IpfsSyndication {
identity: self.gateway_scope.counterpart.to_owned(),
identity: self.gateway_scope.gateway.to_owned(),
name_publish_on_success,
}) {
warn!("Failed to queue IPFS syndication job: {}", error);
Expand Down
10 changes: 10 additions & 0 deletions rust/noosphere-gateway/src/jobs/client.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,19 @@
use crate::jobs::GatewayJob;
use anyhow::Result;
use std::{ops::Deref, sync::Arc};

/// [JobClient] allows a gateway or other service
/// to submit jobs to be processed.
pub trait JobClient: Send + Sync {
/// Submit a [GatewayJob] to be processed.
fn submit(&self, job: GatewayJob) -> Result<()>;
}

impl<T> JobClient for Arc<T>
where
T: JobClient,
{
fn submit(&self, job: GatewayJob) -> Result<()> {
self.deref().submit(job)
}
}
8 changes: 4 additions & 4 deletions rust/noosphere-gateway/src/jobs/processors/syndication.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ where

// Take a lock on the `SphereContext` and look up the most recent
// syndication checkpoint for this Kubo node
let (sphere_revision, mut syndication_checkpoint, db, counterpart_identity) = {
let (sphere_revision, mut syndication_checkpoint, db, sphere_identity) = {
let db = {
let context = context.sphere_context().await?;
context.db().clone()
Expand All @@ -87,7 +87,7 @@ where
let counterpart_identity = db.require_key::<_, Did>(COUNTERPART).await?;
let sphere = context.to_sphere().await?;
let content = sphere.get_content().await?;

let sphere_identity = context.identity().await?;
let counterpart_revision = *content.require(&counterpart_identity).await?;

let syndication_checkpoint = match context.read(&checkpoint_key).await? {
Expand Down Expand Up @@ -121,7 +121,7 @@ where
counterpart_revision,
syndication_checkpoint,
db,
counterpart_identity,
sphere_identity,
)
};

Expand Down Expand Up @@ -196,7 +196,7 @@ where

Ok(
name_publish_on_success.map(|record| GatewayJob::NameSystemPublish {
identity: counterpart_identity,
identity: sphere_identity,
record,
}),
)
Expand Down
29 changes: 20 additions & 9 deletions rust/noosphere-gateway/src/single_tenant/gateway_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,11 @@ where
name_resolver_api: Url,
cors_origin: Option<Url>,
) -> Result<Self> {
let gateway_identity = context.sphere_context().await?.author().did().await?;
let gateway_scope = GatewayScope::new(gateway_identity, counterpart);
let (gateway, gateway_identity) = {
let ctx = context.sphere_context().await?;
(ctx.identity().to_owned(), ctx.author().did().await?)
};
let gateway_scope = GatewayScope::new(gateway_identity, gateway, counterpart);
let context_resolver =
SingleTenantContextResolver::new(context.clone(), gateway_scope.clone());
let job_client = Arc::new(
Expand Down Expand Up @@ -88,19 +91,27 @@ where
self.cors_origin.to_owned()
}

async fn ucan_store(&self) -> Result<UcanStore<S::BlockStore>> {
let context = self.context.sphere_context().await?;
let db = context.db().to_block_store();
Ok(UcanStore(db))
async fn ucan_store(&self, sphere_identity: &Did) -> Result<UcanStore<S::BlockStore>> {
match &self.gateway_scope.gateway == sphere_identity {
true => {
let context = self.context.sphere_context().await?;
let db = context.db().to_block_store();
Ok(UcanStore(db))
}
false => Err(anyhow::anyhow!(
"No ucan store found with identity: {sphere_identity}."
)),
}
}

async fn sphere_context(&self, counterpart: &Did) -> Result<C> {
self.context_resolver.get_context(counterpart).await
async fn sphere_context(&self, sphere_identity: &Did) -> Result<C> {
self.context_resolver.get_context(sphere_identity).await
}

async fn gateway_scope(&self, _: &mut Parts) -> Result<(Did, Did), StatusCode> {
async fn gateway_scope(&self, _: &mut Parts) -> Result<(Did, Did, Did), StatusCode> {
Ok((
self.gateway_scope.gateway_identity.clone(),
self.gateway_scope.gateway.clone(),
self.gateway_scope.counterpart.clone(),
))
}
Expand Down
4 changes: 2 additions & 2 deletions rust/noosphere-gateway/src/single_tenant/job_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ where
}
}

impl<C, S> JobClient for Arc<SingleTenantJobClient<C, S>>
impl<C, S> JobClient for SingleTenantJobClient<C, S>
where
C: HasMutableSphereContext<S>,
S: Storage + 'static,
Expand Down Expand Up @@ -136,7 +136,7 @@ where
C: HasMutableSphereContext<S> + 'static,
S: Storage + 'static,
{
let identity = scope.counterpart;
let identity = scope.gateway;
let _ = tokio::join!(
schedule(
&queue,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ where
S: Storage + 'static,
{
async fn get_context(&self, did: &Did) -> Result<C> {
match &self.gateway_scope.counterpart == did {
match &self.gateway_scope.gateway == did {
true => Ok(self.context.clone()),
false => Err(anyhow!(
"No sphere context found with gateway identity: {did}."
Expand Down
2 changes: 1 addition & 1 deletion rust/noosphere-storage/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ rocksdb = { version = "0.22.0", optional = true }

[target.'cfg(target_arch = "wasm32")'.dependencies]
tokio = { workspace = true, features = ["sync", "macros"] }
wasm-bindgen = { workspace = true, features = ["serde-serialize"] }
wasm-bindgen = { workspace = true }
wasm-bindgen-futures = { workspace = true }
serde-wasm-bindgen = { workspace = true }
js-sys = { workspace = true }
Expand Down
4 changes: 3 additions & 1 deletion rust/noosphere-ucan/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,9 @@ wasm-bindgen-futures = { workspace = true, optional = true }
js-sys = { workspace = true, optional = true }

[target.'cfg(target_arch = "wasm32")'.dependencies]
# NOTE: This is needed so that rand can be included in WASM builds
# NOTE: This is a transitive dependency used by other
# crypto/random crates that requires us setting the "js"
# in order to provide the functionality in JS environments.
getrandom = { workspace = true, features = ["js"] }

[dev-dependencies]
Expand Down

0 comments on commit 19eaba2

Please sign in to comment.