Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix: poll DHT in background when worker runs up a workflow + dual-stack webserver #590

Merged
merged 6 commits into from
Feb 29, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .envrc
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use_flake

export RUST_LOG=homestar=debug,homestar_runtime=debug,homestar_wasm=debug,libp2p=info,libp2p_gossipsub::behaviour=debug,tarpc=info,tower_http=debug,jsonrpsee_server=debug,moka=debug
export RUST_BACKTRACE=full
export RUST_BACKTRACE=1
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ repos:
- id: trailing-whitespace
exclude: bindings\.rs$
- id: end-of-file-fixer
exclude: \.(txt|json)$
exclude: \.(txt|json|patch)$
- id: check-yaml
- id: check-json
- id: check-added-large-files
Expand Down
40 changes: 27 additions & 13 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions diesel.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

[print_schema]
file = "homestar-runtime/src/db/schema.rs"
patch_file = "homestar-runtime/src/db/schema.patch"

[migrations_directory]
dir = "homestar-runtime/migrations"
24 changes: 12 additions & 12 deletions flake.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 12 additions & 1 deletion flake.nix
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,23 @@
pkgs = import nixpkgs {inherit system overlays;};
unstable = import nixos-unstable {inherit system overlays;};

rust-toolchain = fenix.packages.${system}.fromToolchainFile {
file-toolchain = fenix.packages.${system}.fromToolchainFile {
file = ./rust-toolchain.toml;
# sha256 = pkgs.lib.fakeSha256;
sha256 = "sha256-e4mlaJehWBymYxJGgnbuCObVlqMlQSilZ8FljG9zPHY=";
};

default-toolchain = fenix.packages.${system}.complete.withComponents [
"cargo"
"clippy"
"llvm-tools-preview"
"rustfmt"
"rust-src"
"rust-std"
];

rust-toolchain = fenix.packages.${system}.combine [file-toolchain default-toolchain];

rustPlatform = pkgs.makeRustPlatform {
cargo = rust-toolchain;
rustc = rust-toolchain;
Expand Down
2 changes: 1 addition & 1 deletion homestar-invocation/src/receipt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ pub struct Receipt<T> {
}

impl<T> Receipt<T> {
///
/// Create a new [Receipt].
pub fn new(
ran: Pointer,
result: task::Result<T>,
Expand Down
11 changes: 9 additions & 2 deletions homestar-invocation/src/task/instruction/input.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,15 @@ where
F: Fn(Cid) -> BoxFuture<'a, Result<task::Result<T>, ResolveError>> + Clone + Send + Sync,
Ipld: From<T>,
{
let inputs = resolve_args(self.0, lookup_fn);
Ok(Args(inputs.await))
let inputs = resolve_args(self.0, lookup_fn).await;
for input in inputs.iter() {
if let Input::Deferred(awaiting) = input {
return Err(ResolveError::UnresolvedCid(
awaiting.instruction_cid().to_string(),
));
}
}
Ok(Args(inputs))
}
}

Expand Down
4 changes: 3 additions & 1 deletion homestar-runtime/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ diesel = { version = "2.1", default-features = false, features = [
"with-deprecated",
"chrono",
] }
diesel-derive-enum = { version = "2.1", features = ["sqlite"] }
diesel_migrations = "2.1"
dotenvy = "0.15"
dyn-clone = "1.0"
Expand All @@ -79,6 +80,7 @@ homestar-workspace-hack = { workspace = true }
http = "0.2"
http-serde = "1.1"
humantime = { workspace = true }
hyper = { version = "0.14", default-features = false }
indexmap = { workspace = true }
ipfs-api = { version = "0.17", optional = true }
ipfs-api-backend-hyper = { version = "0.6", default-features = false, features = [
Expand Down Expand Up @@ -147,7 +149,7 @@ serde_with = { version = "3.5", default-features = false, features = [
] }
stream-cancel = "0.8"
sysinfo = { version = "0.29", default-features = false, optional = true }
tabled = { version = "0.14", default-features = false, features = [
tabled = { version = "0.15", default-features = false, features = [
"derive",
"macros",
] }
Expand Down
4 changes: 3 additions & 1 deletion homestar-runtime/config/defaults.toml
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ mesh_n = 2
mesh_outbound_min = 1

[node.network.libp2p.dht]
enable_resolve_receipts_in_background = true
p2p_receipt_timeout = 500
p2p_workflow_info_timeout = 500
p2p_provider_timeout = 10_000
Expand All @@ -74,7 +75,8 @@ max_connections = 10
server_timeout = 120

[node.network.webserver]
host = "127.0.0.1"
v4_host = "127.0.0.1"
v6_host = "[::1]"
port = 1337
timeout = 120
websocket_capacity = 2048
Expand Down
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
ALTER TABLE workflows DROP COLUMN status;
ALTER TABLE workflows DROP COLUMN retries;
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
ALTER TABLE workflows ADD COLUMN status TEXT CHECK(
status IN ('pending', 'completed', 'running', 'stuck')) NOT NULL DEFAULT
'pending';
ALTER TABLE workflows ADD COLUMN retries INTEGER NOT NULL DEFAULT 0;
15 changes: 15 additions & 0 deletions homestar-runtime/src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use tokio::fs;
use tracing::info;

#[allow(missing_docs, unused_imports)]
#[rustfmt::skip]
pub mod schema;
pub(crate) mod utils;

Expand Down Expand Up @@ -248,6 +249,20 @@ pub trait Database: Send + Sync + Clone {
}
}

/// Update workflow status given a Cid to the workflow.
fn set_workflow_status(
workflow_cid: Cid,
status: workflow::Status,
conn: &mut Connection,
) -> Result<(), diesel::result::Error> {
diesel::update(schema::workflows::dsl::workflows)
.filter(schema::workflows::cid.eq(Pointer::new(workflow_cid)))
.set(schema::workflows::status.eq(status))
.execute(conn)?;

Ok(())
}

/// Store workflow Cid and [Receipt] Cid in the database for inner join.
fn store_workflow_receipt(
workflow_cid: Cid,
Expand Down
9 changes: 9 additions & 0 deletions homestar-runtime/src/db/schema.patch
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
@@ -25,7 +25,7 @@ diesel::table! {
cid -> Text,
name -> Nullable<Text>,
num_tasks -> Integer,
resources -> Binary,
created_at -> Timestamp,
completed_at -> Nullable<Timestamp>,
+ status -> crate::workflow::StatusMapping,
- status -> Text,
8 changes: 7 additions & 1 deletion homestar-runtime/src/db/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ diesel::table! {
resources -> Binary,
created_at -> Timestamp,
completed_at -> Nullable<Timestamp>,
status -> crate::workflow::StatusMapping,
retries -> Integer,
}
}

Expand All @@ -34,4 +36,8 @@ diesel::table! {
diesel::joinable!(workflows_receipts -> receipts (receipt_cid));
diesel::joinable!(workflows_receipts -> workflows (workflow_cid));

diesel::allow_tables_to_appear_in_same_query!(receipts, workflows, workflows_receipts,);
diesel::allow_tables_to_appear_in_same_query!(
receipts,
workflows,
workflows_receipts,
);
2 changes: 0 additions & 2 deletions homestar-runtime/src/event_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ use crate::{
settings,
};
use anyhow::Result;
use async_trait::async_trait;
use fnv::FnvHashMap;
use libp2p::{
core::ConnectedPoint, futures::StreamExt, kad::QueryId, rendezvous::Cookie,
Expand Down Expand Up @@ -48,7 +47,6 @@ struct Bootstrap {
}

/// Handler trait for [EventHandler] events.
#[async_trait]
pub(crate) trait Handler<DB>
where
DB: Database,
Expand Down
2 changes: 0 additions & 2 deletions homestar-runtime/src/event_handler/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ use crate::{
workflow, Db, Receipt,
};
use anyhow::Result;
use async_trait::async_trait;
#[cfg(feature = "websocket-notify")]
use homestar_invocation::Pointer;
use homestar_invocation::Receipt as InvocationReceipt;
Expand Down Expand Up @@ -701,7 +700,6 @@ impl PeerRequest {
}
}

#[async_trait]
impl<DB> Handler<DB> for Event
where
DB: Database,
Expand Down
1 change: 1 addition & 0 deletions homestar-runtime/src/event_handler/notification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ pub(crate) fn emit_receipt(
subject = "notification.receipt",
category = "notification",
cid = receipt_cid.to_string(),
instruction_cid = receipt.instruction().cid().to_string(),
"emitting receipt to WebSocket"
);
if let Some(ipld) = metadata {
Expand Down
Loading
Loading