Skip to content

Commit

Permalink
Added a test checking that the input and output directories are prope…
Browse files Browse the repository at this point in the history
…rly configured when running archive containers.

This is part of #751.
  • Loading branch information
Richard Liang committed Mar 1, 2019
1 parent 39e2360 commit 42b66e6
Show file tree
Hide file tree
Showing 2 changed files with 158 additions and 15 deletions.
140 changes: 125 additions & 15 deletions kive/container/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
from container.forms import ContainerForm
from kive.tests import BaseTestCases, install_fixture_files, capture_log_stream
from librarian.models import Dataset, ExternalFileDirectory, get_upload_path
from file_access_utils import use_field_file


def create_tar_content(container=None, content=None):
Expand Down Expand Up @@ -1731,6 +1732,16 @@ def setUp(self):
self.call_stdout = ''
self.call_stderr = ''
self.call_return_code = 0
self.source_path = os.path.abspath(
os.path.join(
__file__,
'..',
'..',
'..',
'samplecode',
'singularity'
)
)

def dummy_call(self, command, stdout, stderr):
self.called_command = command
Expand Down Expand Up @@ -2021,12 +2032,6 @@ def _test_run_multistep_archive_helper(self):
1,2
2,3
"""
source_path = os.path.abspath(os.path.join(__file__,
'..',
'..',
'..',
'samplecode',
'singularity'))
content = dict(pipeline=dict(
inputs=[dict(dataset_name="pairs_csv")],
steps=[dict(driver="sums_and_products.py",
Expand All @@ -2045,7 +2050,7 @@ def _test_run_multistep_archive_helper(self):
tar_data = BytesIO()
with TarFile(fileobj=tar_data, mode='w') as t:
for script_name in ('sums_and_products.py', 'sum_summary.py'):
with open(os.path.join(source_path, script_name), 'rb') as f:
with open(os.path.join(self.source_path, script_name), 'rb') as f:
script_text = f.read()
script_text = b'#!/usr/bin/env python\n' + script_text
tar_info = TarInfo(script_name)
Expand Down Expand Up @@ -2156,12 +2161,6 @@ def test_multiple_inputs_and_outputs_directories(self):
bye
what up
"""
source_path = os.path.abspath(os.path.join(__file__,
'..',
'..',
'..',
'samplecode',
'singularity'))
content = dict(pipeline=dict(
inputs=[dict(dataset_name="names_csv"),
dict(dataset_name="raw_salutations_csv")],
Expand Down Expand Up @@ -2198,7 +2197,7 @@ def test_multiple_inputs_and_outputs_directories(self):
tar_data = BytesIO()
with TarFile(fileobj=tar_data, mode='w') as t:
for script_name in ('hello_goodbye_converter.py', 'salutations.py'):
with open(os.path.join(source_path, script_name), 'rb') as f:
with open(os.path.join(self.source_path, script_name), 'rb') as f:
script_text = f.read()
tar_info = TarInfo(script_name)
tar_info.size = len(script_text)
Expand All @@ -2208,7 +2207,7 @@ def test_multiple_inputs_and_outputs_directories(self):
cf = ContainerFamily.objects.get(name="fixture family")
parent = cf.containers.get(tag="vFixture")

# Make a new archive container for this archive container.
# Make a new archive container.
container = Container.objects.create(
parent=parent,
family=cf,
Expand Down Expand Up @@ -2331,6 +2330,117 @@ def test_multiple_inputs_and_outputs_directories(self):
self.assertTrue(os.path.isfile(final_opposite_formatted_salutations))
self.assertTrue(cmp(step3_output_opposite_formatted_salutations_csv, final_opposite_formatted_salutations))

def test_mount_directories(self):
    """Test that the correct directories are mounted in the right places.

    Builds a two-step archive container in which both steps run
    scanner.py, a sample script that counts the lines of its input and
    reports the contents of /mnt/input and /mnt/output.  Step 1 scans the
    run's input text; step 2 scans step 1's JSON summary.  Both summaries
    are exposed as run outputs and checked below.
    """
    pipeline = {
        "inputs": [
            {"dataset_name": "input_text"}
        ],
        "steps": [
            {
                "driver": "scanner.py",
                "inputs": [
                    {
                        "dataset_name": "step1_input_text",
                        "source_step": 0,
                        "source_dataset_name": "input_text"
                    }
                ],
                "outputs": ["summary_json"]
            },
            {
                "driver": "scanner.py",
                "inputs": [
                    {
                        "dataset_name": "step2_input_text",
                        "source_step": 1,
                        "source_dataset_name": "summary_json"
                    }
                ],
                "outputs": ["summary_json"]
            }
        ],
        "outputs": [
            {
                "dataset_name": "step1_summary_json",
                "source_step": 1,
                "source_dataset_name": "summary_json"
            },
            {
                "dataset_name": "step2_summary_json",
                "source_step": 2,
                "source_dataset_name": "summary_json"
            }
        ]
    }
    content = {"pipeline": pipeline}

    # Pack the driver script into an in-memory tar archive.
    tar_data = BytesIO()
    with TarFile(fileobj=tar_data, mode='w') as t:
        with open(os.path.join(self.source_path, "scanner.py"), 'rb') as f:
            script_text = f.read()
        tar_info = TarInfo("scanner.py")
        tar_info.size = len(script_text)
        t.addfile(tar_info, BytesIO(script_text))
    tar_data.seek(0)

    cf = ContainerFamily.objects.get(name="fixture family")
    parent = cf.containers.get(tag="vFixture")

    # Make a new archive container.
    container = Container.objects.create(
        parent=parent,
        family=cf,
        user=parent.user,
        tag='multistep_multiinput_multioutput',
        file_type=Container.TAR)
    container.file.save('test_multi.tar', ContentFile(tar_data.getvalue()))
    container.write_archive_content(content)
    container.save()

    archive_app = container.apps.create(memory=200, threads=1)
    archive_app.write_inputs('input_text')
    archive_app.write_outputs('step1_summary_json step2_summary_json')
    input_text = """\
Line 1
Line 2
Line 3
"""
    input_text_dataset = Dataset.create_dataset(
        file_path=None,
        user=container.user,
        file_handle=ContentFile(input_text.encode("utf-8"), name="input_text")
    )

    run = archive_app.runs.create(
        name="CheckMountPoints",
        user=container.user
    )
    run.datasets.create(
        argument=archive_app.arguments.get(type=ContainerArgument.INPUT, position=1),
        dataset=input_text_dataset
    )
    run.save(schedule=False)

    # Run it!
    call_command('runcontainer', str(run.id))
    run.refresh_from_db()

    # Now inspect the results.  The first step's input is the three-line
    # text above; the second step's input is the first step's JSON summary,
    # which scanner.py writes as a single line.
    # Each entry: (output position, expected line count,
    #              expected input file name, expected output name prefix).
    expectations = (
        (1, 3, "step1_input_text", "step1_summary_json"),
        (2, 1, "step2_input_text", "step2_summary_json"),
    )
    for position, line_count, input_name, output_name in expectations:
        summary_cds = run.datasets.get(
            argument__type=ContainerArgument.OUTPUT,
            argument__position=position
        )
        with use_field_file(summary_cds.dataset.dataset_file) as f:
            summary = json.loads(f.read().decode("utf-8"))
        self.assertEqual(summary["lines"], line_count)
        self.assertEqual(summary["mnt_input_contents"], [input_name])
        self.assertEqual(
            summary["mnt_output_contents"],
            ["{}_{}".format(output_name, run.pk)]
        )

def test_already_started(self):
""" Pretend that another instance of the command already started. """
run = ContainerRun.objects.get(name='fixture run')
Expand Down
33 changes: 33 additions & 0 deletions samplecode/singularity/scanner.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
#! /usr/bin/env python

import os
import json
from argparse import ArgumentParser, FileType


def parse_args(argv=None):
    """Parse the scanner's command-line arguments.

    Args:
        argv: optional list of argument strings to parse.  When None (the
            default), argparse reads the process's command line, which is
            the original behaviour.

    Returns:
        An argparse namespace with two open file objects: ``input_text``
        (opened for reading in text mode) and ``summary_json`` (opened for
        writing).
    """
    parser = ArgumentParser(
        description="Counts the number of lines in the input file, "
                    "reports the contents of /mnt/input and /mnt/output."
    )
    parser.add_argument(
        'input_text',
        type=FileType("rt"),
        help='A text file, anything goes'
    )
    parser.add_argument(
        'summary_json',
        type=FileType('w'),
        help='File to write the JSON summary to'
    )

    return parser.parse_args(argv)


def main():
    """Write a JSON summary of the input file and the mount directories.

    The summary records the number of lines in the input file together
    with the directory listings of /mnt/input and /mnt/output, and is
    written to the ``summary_json`` file given on the command line.
    """
    args = parse_args()
    # argparse's FileType opened both handles; close them explicitly so the
    # summary is flushed to disk as soon as it has been written.
    with args.input_text as input_text, args.summary_json as summary_json:
        summary = {
            "lines": sum(1 for _ in input_text),
            # os.listdir returns entries in arbitrary order; sort them so
            # the summary is deterministic.
            "mnt_input_contents": sorted(os.listdir("/mnt/input")),
            "mnt_output_contents": sorted(os.listdir("/mnt/output"))
        }
        summary_json.write(json.dumps(summary))


if __name__ == "__main__":
main()

0 comments on commit 42b66e6

Please sign in to comment.