Separate folders are now created in the sandbox for each step, as part of #751.
Richard Liang committed Feb 27, 2019
1 parent d3e42b8 commit 19f58c9
Showing 4 changed files with 393 additions and 24 deletions.
31 changes: 16 additions & 15 deletions kive/container/management/commands/runcontainer.py
@@ -233,15 +233,12 @@ def run_pipeline(self,
         """
         # The instructions take the form of a Python representation of a pipeline JSON file.
         # We keep track of what files were produced by what steps in file_map, which is a list of dictionaries.
-        # Each dictionary maps dataset_name -|-> (internal path, external path), and the step index is their
+        # Each dictionary maps dataset_name -|-> external path, and the step index is their
         # position in the list.
         inputs_map = {}
         for input_dict in instructions["inputs"]:
             # This dictionary has a field called "dataset_name".
-            inputs_map[input_dict["dataset_name"]] = (
-                os.path.join(internal_inputs_dir, input_dict["dataset_name"]),
-                os.path.join(external_inputs_dir, input_dict["dataset_name"])
-            )  # the second isn't needed but we keep it for consistency
+            inputs_map[input_dict["dataset_name"]] = os.path.join(external_inputs_dir, input_dict["dataset_name"])
         file_map = [inputs_map]

         final_return_code = 0
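The comment above describes the new bookkeeping: file_map is a list of dictionaries, indexed by step, and each entry now maps a dataset name to a single external path instead of an (internal path, external path) pair. A minimal sketch, with hypothetical dataset names and paths:

```python
# Hypothetical file_map after step 1 has registered its outputs.
# Index 0 holds the pipeline's own inputs; index i holds step i's outputs.
file_map = [
    {"reads": "/sandbox/input/reads"},                        # pipeline inputs
    {"trimmed": "/sandbox/step1/output/run1_step1_trimmed"},  # step 1 outputs
]

# A downstream step resolves an upstream file with a single lookup:
external_path = file_map[1]["trimmed"]
```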
@@ -251,6 +248,11 @@ def run_pipeline(self,
                 idx,
                 step["driver"])

+            external_step_input_dir = os.path.join(run.full_sandbox_path, "step{}".format(idx), "input")
+            external_step_output_dir = os.path.join(run.full_sandbox_path, "step{}".format(idx), "output")
+            os.makedirs(external_step_input_dir)
+            os.makedirs(external_step_output_dir)
+
             # Each step is a dictionary with fields:
             # - driver (the executable)
             # - inputs (a list of (step_num, dataset_name) pairs)
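The field list in the comment is truncated in this view, but the code below reads "driver", "inputs" and "outputs" from each step. A hypothetical step entry, inferred from those accesses (note the code represents each input as a dictionary rather than a bare (step_num, dataset_name) pair):

```python
# Field names inferred from the accesses below; values are made up.
step = {
    "driver": "trim_reads.py",                 # hypothetical executable
    "inputs": [{"source_step": 0,              # 0 = the pipeline's own inputs
                "source_dataset_name": "reads",
                "dataset_name": "reads"}],
    "outputs": ["trimmed"],
}
```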
@@ -264,20 +266,19 @@ def run_pipeline(self,
                 source_step = input_dict["source_step"]
                 source_dataset_name = input_dict["source_dataset_name"]
                 step_outputs = file_map[source_step]
-                internal_path, external_path = step_outputs[source_dataset_name]
-                input_paths.append(internal_path)
+                external_path = step_outputs[source_dataset_name]
+                os.link(external_path, os.path.join(external_step_input_dir, input_dict["dataset_name"]))
+                input_paths.append(os.path.join(internal_inputs_dir, input_dict["dataset_name"]))
             outputs_map = {}
+            output_paths = []
             for dataset_name in step["outputs"]:
                 file_name = self.build_dataset_name(
                     run,
                     "step{}_{}".format(idx, dataset_name))
-                outputs_map[dataset_name] = (
-                    os.path.join(internal_outputs_dir, file_name),
-                    os.path.join(external_outputs_dir, file_name)
-                )
+                output_paths.append(os.path.join(internal_outputs_dir, file_name))
+                outputs_map[dataset_name] = os.path.join(external_step_output_dir, file_name)
             file_map.append(outputs_map)

-            output_paths = [outputs_map[x][0] for x in step["outputs"]]
             execution_args = [
                 "singularity",
                 "exec",
@@ -286,9 +287,9 @@ def run_pipeline(self,
                 "-B",
                 extracted_archive_dir + ':' + internal_binary_dir,
                 "-B",
-                external_inputs_dir + ':' + internal_inputs_dir,
+                external_step_input_dir + ':' + internal_inputs_dir,
                 "-B",
-                external_outputs_dir + ':' + internal_outputs_dir,
+                external_step_output_dir + ':' + internal_outputs_dir,
                 "--pwd",
                 internal_working_dir,
                 run.app.container.parent.file.path,
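Binding each step's external folders over the same fixed internal paths means every driver sees identical input/output locations inside the container, whichever step it belongs to. A sketch of the two new bind arguments, with hypothetical values (the real internal mount points are defined elsewhere in runcontainer.py):

```python
external_step_input_dir = "/data/sandboxes/run7/step2/input"    # hypothetical
external_step_output_dir = "/data/sandboxes/run7/step2/output"  # hypothetical
internal_inputs_dir = "/mnt/input"                              # assumed value
internal_outputs_dir = "/mnt/output"                            # assumed value

bind_args = [
    "-B", external_step_input_dir + ":" + internal_inputs_dir,
    "-B", external_step_output_dir + ":" + internal_outputs_dir,
]
```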
@@ -326,6 +327,6 @@ def run_pipeline(self,
             final_output_path = os.path.join(external_outputs_dir, output["dataset_name"])
             source_step = output["source_step"]
             source_dataset_name = output["source_dataset_name"]
-            os.link(file_map[source_step][source_dataset_name][1], final_output_path)
+            os.link(file_map[source_step][source_dataset_name], final_output_path)

         return final_return_code
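Since file_map values are now plain paths, publishing a pipeline output is a single lookup plus a hard link into the run's external outputs folder. A sketch with hypothetical names (the output fields are inferred from the code above):

```python
output = {"dataset_name": "final_report",     # hypothetical values
          "source_step": 1,
          "source_dataset_name": "trimmed"}
final_output_path = os.path.join(external_outputs_dir, output["dataset_name"])
os.link(file_map[output["source_step"]][output["source_dataset_name"]],
        final_output_path)
```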