From 9d04c1786fa1ca6f90f72fe796654aee6d826427 Mon Sep 17 00:00:00 2001
From: David Huber
Date: Thu, 23 Mar 2023 13:10:05 -0500
Subject: [PATCH] Add a globus transfer job. #1357

---
 jobs/JGLOBAL_GLOBUS               |  40 ++++++++
 jobs/rocoto/globus.sh             |  20 ++++
 modulefiles/module_base.orion.lua |   3 +
 parm/config/config.base.emc.dyn   |  10 +-
 parm/config/config.globus         |  29 ++++++
 parm/config/config.resources      |   4 +-
 scripts/exgdas_enkf_earc.sh       |  18 ++--
 scripts/exglobal_archive.sh       |  16 ++-
 scripts/exglobal_globus_xfer.sh   | 161 ++++++++++++++++++++++++++++++
 ush/push_inv_hpss.sh              |  28 ++++++
 workflow/applications.py          |  15 ++-
 workflow/rocoto/workflow_tasks.py |  24 ++++-
 12 files changed, 349 insertions(+), 19 deletions(-)
 create mode 100755 jobs/JGLOBAL_GLOBUS
 create mode 100755 jobs/rocoto/globus.sh
 create mode 100644 parm/config/config.globus
 create mode 100755 scripts/exglobal_globus_xfer.sh
 create mode 100644 ush/push_inv_hpss.sh

diff --git a/jobs/JGLOBAL_GLOBUS b/jobs/JGLOBAL_GLOBUS
new file mode 100755
index 00000000000..37cfd19af59
--- /dev/null
+++ b/jobs/JGLOBAL_GLOBUS
@@ -0,0 +1,40 @@
+#! /usr/bin/env bash
+
+source "${HOMEgfs}/ush/preamble.sh"
+source "${HOMEgfs}/ush/jjob_header.sh" -e "arch" -c "base globus"
+
+
+##############################################
+# Set variables used in the script
+##############################################
+export CDATE=${CDATE:-${PDY}${cyc}}
+export CDUMP=${CDUMP:-${RUN:-"gfs"}}
+
+
+###############################################################
+# Run globus transfer script
+###############################################################
+
+${GLOBALGLOBUSXFERSH:-${SCRgfs}/exglobal_globus_xfer.sh}
+status=$?
+[[ ${status} -ne 0 ]] && exit "${status}"
+
+##############################################
+# End JOB SPECIFIC work
+##############################################
+
+##############################################
+# Final processing
+##############################################
+if [[ -e "${pgmout}" ]] ; then
+  cat "${pgmout}"
+fi
+
+
+##########################################
+# Remove the Temporary working directory
+##########################################
+cd "${DATAROOT}" || (echo "${DATAROOT} does not exist. ABORT!"; exit 1)
+[[ ${KEEPDATA} = "NO" ]] && rm -rf "${DATA}"
+
+exit 0
diff --git a/jobs/rocoto/globus.sh b/jobs/rocoto/globus.sh
new file mode 100755
index 00000000000..5759b1df263
--- /dev/null
+++ b/jobs/rocoto/globus.sh
@@ -0,0 +1,20 @@
+#! /usr/bin/env bash
+
+source "${HOMEgfs}/ush/preamble.sh"
+
+###############################################################
+# Source FV3GFS workflow modules
+. "${HOMEgfs}"/ush/load_fv3gfs_modules.sh
+status=$?
+[[ ${status} -ne 0 ]] && exit "${status}"
+
+export job="arch"
+export jobid="${job}.$$"
+
+###############################################################
+# Execute the JJOB
+"${HOMEgfs}"/jobs/JGLOBAL_GLOBUS
+status=$?
+
+
+exit "${status}"
diff --git a/modulefiles/module_base.orion.lua b/modulefiles/module_base.orion.lua
index 19897aaf33d..b96e887b63d 100644
--- a/modulefiles/module_base.orion.lua
+++ b/modulefiles/module_base.orion.lua
@@ -39,4 +39,7 @@ load(pathJoin("ufswm", "1.0.0"))
 load(pathJoin("met", "9.1"))
 load(pathJoin("metplus", "3.1"))
 
+-- Set the path to globus
+append_path("PATH", "/home/dhuber/local/bin")
+
 whatis("Description: GFS run environment")
diff --git a/parm/config/config.base.emc.dyn b/parm/config/config.base.emc.dyn
index 96705ac6bd3..5c756277ec6 100644
--- a/parm/config/config.base.emc.dyn
+++ b/parm/config/config.base.emc.dyn
@@ -47,6 +47,7 @@ export HOMEDIR="@HOMEDIR@"
 export STMP="@STMP@"
 export PTMP="@PTMP@"
 export NOSCRUB="@NOSCRUB@"
+export NIAGARA_USERNAME="${LOGNAME}"
 
 # Base directories for various builds
 export BASE_GIT="@BASE_GIT@"
@@ -87,8 +88,8 @@ export VERBOSE="YES"
 export KEEPDATA="NO"
 export CHGRP_RSTPROD="@CHGRP_RSTPROD@"
 export CHGRP_CMD="@CHGRP_CMD@"
-export NCDUMP="$NETCDF/bin/ncdump"
-export NCLEN="$HOMEgfs/ush/getncdimlen"
+export NCDUMP="${NETCDF}/bin/ncdump"
+export NCLEN="${HOMEgfs}/ush/getncdimlen"
 
 # Machine environment, jobs, and other utility scripts
 export BASE_ENV="${HOMEgfs}/env"
@@ -111,6 +112,7 @@ export DATAROOT="${STMP}/RUNDIRS/${PSLOT}"  # TODO: set via prod_envir in Ops
 export RUNDIR="${DATAROOT}"  # TODO: Should be removed; use DATAROOT instead
 export ARCDIR="${NOSCRUB}/archive/${PSLOT}"
 export ATARDIR="@ATARDIR@"
+export ATARDIRloc="@ATARDIRloc@"
 
 # Commonly defined parameters in JJOBS
 export envir=${envir:-"prod"}
@@ -378,8 +380,10 @@ export binary_diag=".false."
 export DO_METP="YES"  # Run METPLUS jobs - set METPLUS settings in config.metp
 
 # Archiving options
-export HPSSARCH="@HPSSARCH@"        # save data to HPSS archive
+export HPSSARCH="@HPSSARCH@"          # save data to HPSS archive
 export LOCALARCH="@LOCALARCH@"        # save data to local archive
+export DO_GLOBUS="NO"                 # transfer local archives to Niagara for HPSS archive; only valid if LOCALARCH=YES
+export REMOTE_USERNAME=${REMOTE_USERNAME:-${LOGNAME}}
 if [[ ${HPSSARCH} = "YES" ]] && [[ ${LOCALARCH} = "YES" ]]; then
     echo "Both HPSS and local archiving selected. Please choose one or the other."
     exit 2
diff --git a/parm/config/config.globus b/parm/config/config.globus
new file mode 100644
index 00000000000..c0401f9f7b5
--- /dev/null
+++ b/parm/config/config.globus
@@ -0,0 +1,29 @@
+#! /usr/bin/env bash
+
+########## config.globus ##########
+# Globus transfer specific
+
+echo "BEGIN: config.globus"
+
+# Get task specific resources
+. ${EXPDIR}/config.resources globus
+
+export GLOBUS_XFR=${GLOBUS_XFR:-"globus transfer --notify failed --preserve-mtime --sync-level mtime"}
+export GLOBUS_RM=${GLOBUS_RM:-"globus rm --notify failed -f"}
+export GLOBUS_WAIT=${GLOBUS_WAIT:-"globus task wait"}
+export GLOBUS_LS=${GLOBUS_LS:-"globus ls"}
+
+export TARGET_DIR=${TARGET_DIR:-"/collab1/data/${NIAGARA_USERNAME:-$LOGNAME}/${PSLOT}/${CDATE}"}
+
+if [[ ${machine} == "HERA" ]]; then
+  export LOCAL_GLOBUS_ADDR="82109590-c090-11ea-bef9-0e716405a293"
+elif [[ ${machine} == "ORION" ]]; then
+  export LOCAL_GLOBUS_ADDR="84bad22e-cb80-11ea-9a44-0255d23c44ef"
+elif [[ ${machine} == "JET" ]]; then
+  export LOCAL_GLOBUS_ADDR="34ea8506-1882-11eb-81b5-0e2f230cc907"
+fi
+
+# Default is Niagara
+export REMOTE_GLOBUS_ADDR=${REMOTE_GLOBUS_ADDR:-"21467dd0-afd6-11ea-8f12-0a21f750d19b"}
+
+echo "END: config.globus"
diff --git a/parm/config/config.resources b/parm/config/config.resources
index c6418c44235..4f5fbd33277 100644
--- a/parm/config/config.resources
+++ b/parm/config/config.resources
@@ -13,7 +13,7 @@ if [ $# -ne 1 ]; then
     echo "atmensanalprep atmensanalrun atmensanalpost"
     echo "aeroanlinit aeroanlrun aeroanlfinal"
     echo "anal sfcanl analcalc analdiag gldas fcst post vrfy metp arch echgres"
-    echo "eobs ediag eomg eupd ecen esfc efcs epos earc"
+    echo "eobs ediag eomg eupd ecen esfc efcs epos earc globus"
     echo "init_chem mom6ic ocnpost"
    echo "waveinit waveprep wavepostsbs wavepostbndpnt wavepostbndpntbll wavepostpnt"
    echo "wavegempak waveawipsbulls waveawipsgridded"
@@ -651,7 +651,7 @@ elif [ ${step} = "mom6ic" ]; then
     export npe_node_mom6ic=24
     export is_exclusive=True
 
-elif [[ ${step} = "arch" || ${step} = "earc" || ${step} = "getic" ]]; then
+elif [[ ${step} = "arch" || ${step} = "earc" || ${step} = "getic" || ${step} = "globus" ]]; then
 
     eval "export wtime_${step}='06:00:00'"
     eval "export npe_${step}=1"
diff --git a/scripts/exgdas_enkf_earc.sh b/scripts/exgdas_enkf_earc.sh
index 456178fe9bf..8d09fcbea2e 100755
--- a/scripts/exgdas_enkf_earc.sh
+++ b/scripts/exgdas_enkf_earc.sh
@@ -9,6 +9,12 @@ export n=$((10#${ENSGRP}))
 export CDUMP_ENKF=$(echo "${EUPD_CYC:-"gdas"}" | tr a-z A-Z)
 export ARCH_LIST="${ROTDIR}/${RUN}.${PDY}/${cyc}/earc${ENSGRP}"
 
+if [[ ${LOCALARCH} = "YES" ]]; then
+  tar_dir=${tar_dir:-${ATARDIRloc}}
+elif [[ ${HPSSARCH} = "YES" ]]; then
+  tar_dir=${tar_dir:-${ATARDIR}}
+fi
+
 # ICS are restarts and always lag INC by $assim_freq hours.
 EARCINC_CYC=${ARCH_CYC}
 EARCICS_CYC=$((ARCH_CYC-assim_freq))
@@ -40,7 +46,7 @@ if (( 10#${ENSGRP} > 0 )) && [[ ${HPSSARCH} = "YES" || ${LOCALARCH} = "YES" ]];
     TARCMD="htar"
     if [[ ${LOCALARCH} = "YES" ]]; then
         TARCMD="tar"
-        [ ! -d "${ATARDIR}"/"${CDATE}" ] && mkdir -p "${ATARDIR}"/"${CDATE}"
+        [ ! -d "${tar_dir}"/"${CDATE}" ] && mkdir -p "${tar_dir}"/"${CDATE}"
     fi
 
     #--determine when to save ICs for warm start
@@ -65,7 +71,7 @@ if (( 10#${ENSGRP} > 0 )) && [[ ${HPSSARCH} = "YES" || ${LOCALARCH} = "YES" ]];
 
     if [ "${CDATE}" -gt "${SDATE}" ]; then # Don't run for first half cycle
 
-        ${TARCMD} -P -cvf "${ATARDIR}/${CDATE}/${RUN}_grp${ENSGRP}.tar" $(cat "${ARCH_LIST}/${RUN}_grp${n}.txt")
+        ${TARCMD} -P -cvf "${tar_dir}/${CDATE}/${RUN}_grp${ENSGRP}.tar" $(cat "${ARCH_LIST}/${RUN}_grp${n}.txt")
         status=$?
if [ "${status}" -ne 0 ] && [ "${CDATE}" -ge "${firstday}" ]; then echo "$(echo "${TARCMD}" | tr 'a-z' 'A-Z') ${CDATE} ${RUN}_grp${ENSGRP}.tar failed" @@ -73,7 +79,7 @@ if (( 10#${ENSGRP} > 0 )) && [[ ${HPSSARCH} = "YES" || ${LOCALARCH} = "YES" ]]; fi if [ "${SAVEWARMICA}" = "YES" ] && [ "${cyc}" -eq "${EARCINC_CYC}" ]; then - ${TARCMD} -P -cvf "${ATARDIR}/${CDATE}/${RUN}_restarta_grp${ENSGRP}.tar" $(cat "${ARCH_LIST}/${RUN}_restarta_grp${n}.txt") + ${TARCMD} -P -cvf "${tar_dir}/${CDATE}/${RUN}_restarta_grp${ENSGRP}.tar" $(cat "${ARCH_LIST}/${RUN}_restarta_grp${n}.txt") status=$? if [ "${status}" -ne 0 ]; then echo "$(echo "${TARCMD}" | tr 'a-z' 'A-Z') ${CDATE} ${RUN}_restarta_grp${ENSGRP}.tar failed" @@ -82,7 +88,7 @@ if (( 10#${ENSGRP} > 0 )) && [[ ${HPSSARCH} = "YES" || ${LOCALARCH} = "YES" ]]; fi if [ "${SAVEWARMICB}" = "YES" ] && [ "${cyc}" -eq "${EARCICS_CYC}" ]; then - ${TARCMD} -P -cvf "${ATARDIR}/${CDATE}/${RUN}_restartb_grp${ENSGRP}.tar" $(cat "${ARCH_LIST}/${RUN}_restartb_grp{n}.txt") + ${TARCMD} -P -cvf "${tar_dir}/${CDATE}/${RUN}_restartb_grp${ENSGRP}.tar" $(cat "${ARCH_LIST}/${RUN}_restartb_grp{n}.txt") status=$? if [ "${status}" -ne 0 ]; then echo "$(echo "${TARCMD}" | tr 'a-z' 'A-Z') ${CDATE} ${RUN}_restartb_grp${ENSGRP}.tar failed" @@ -105,11 +111,11 @@ if [ "${ENSGRP}" -eq 0 ]; then TARCMD="htar" if [[ ${LOCALARCH} = "YES" ]]; then TARCMD="tar" - [ ! -d "${ATARDIR}"/"${CDATE}" ] && mkdir -p "${ATARDIR}"/"${CDATE}" + [ ! -d "${tar_dir}"/"${CDATE}" ] && mkdir -p "${tar_dir}"/"${CDATE}" fi set +e - ${TARCMD} -P -cvf "${ATARDIR}/${CDATE}/${RUN}.tar" $(cat "${ARCH_LIST}/${RUN}.txt") + ${TARCMD} -P -cvf "${tar_dir}/${CDATE}/${RUN}.tar" $(cat "${ARCH_LIST}/${RUN}.txt") status=$? if [ "${status}" -ne 0 ] && [ "${CDATE}" -ge "${firstday}" ]; then echo "$(echo "${TARCMD}" | tr 'a-z' 'A-Z') ${CDATE} ${RUN}.tar failed" diff --git a/scripts/exglobal_archive.sh b/scripts/exglobal_archive.sh index 5bc76d896fc..5adbf6f95b0 100755 --- a/scripts/exglobal_archive.sh +++ b/scripts/exglobal_archive.sh @@ -13,6 +13,12 @@ if [ "${ARCHICS_CYC}" -lt 0 ]; then ARCHICS_CYC=$((ARCHICS_CYC+24)) fi +if [[ ${LOCALARCH} = "YES" ]]; then + tar_dir=${tar_dir:-${ATARDIRloc}} +elif [[ ${HPSSARCH} = "YES" ]]; then + tar_dir=${tar_dir:-${ATARDIR}} +fi + # CURRENT CYCLE APREFIX="${CDUMP}.t${cyc}z." @@ -111,8 +117,8 @@ if [[ ${HPSSARCH} = "YES" || ${LOCALARCH} = "YES" ]]; then TARCMD="htar" if [[ ${LOCALARCH} = "YES" ]]; then TARCMD="tar" - [ ! -d "${ATARDIR}"/"${CDATE}" ] && mkdir -p "${ATARDIR}"/"${CDATE}" - [ ! -d "${ATARDIR}"/"${CDATE_MOS}" ] && [ -d "${ROTDIR}"/gfsmos."${PDY_MOS}" ] && [ "${cyc}" -eq 18 ] && mkdir -p "${ATARDIR}"/"${CDATE_MOS}" + [ ! -d "${tar_dir}"/"${CDATE}" ] && mkdir -p "${tar_dir}"/"${CDATE}" + [ ! -d "${tar_dir}"/"${CDATE_MOS}" ] && [ -d "${ROTDIR}"/gfsmos."${PDY_MOS}" ] && [ "${cyc}" -eq 18 ] && mkdir -p "${tar_dir}"/"${CDATE_MOS}" fi #--determine when to save ICs for warm start and forecast-only runs @@ -181,7 +187,7 @@ if [ "${CDUMP}" = "gfs" ]; then # Aerosols if [ "${DO_AERO}" = "YES" ]; then for targrp in chem; do - ${TARCMD} -P -cvf "${ATARDIR}"/"${CDATE}"/"${targrp}".tar $(cat "${ARCH_LIST}"/"${targrp}".txt) + ${TARCMD} -P -cvf "${tar_dir}"/"${CDATE}"/"${targrp}".tar $(cat "${ARCH_LIST}"/"${targrp}".txt) status=$? 
if [ "${status}" -ne 0 ] && [ "${CDATE}" -ge "${firstday}" ]; then echo "HTAR ${CDATE} ${targrp}.tar failed" @@ -203,7 +209,7 @@ if [ "${CDUMP}" = "gfs" ]; then #--save mdl gfsmos output from all cycles in the 18Z archive directory if [ -d gfsmos."${PDY_MOS}" ] && [ "${cyc}" -eq 18 ]; then set +e - ${TARCMD} -P -cvf "${ATARDIR}"/"${CDATE_MOS}"/gfsmos.tar ./gfsmos."${PDY_MOS}" + ${TARCMD} -P -cvf "${tar_dir}"/"${CDATE_MOS}"/gfsmos.tar ./gfsmos."${PDY_MOS}" status=$? if [ "${status}" -ne 0 ] && [ "${CDATE}" -ge "${firstday}" ]; then echo "$(echo "${TARCMD}" | tr 'a-z' 'A-Z') ${CDATE} gfsmos.tar failed" @@ -237,7 +243,7 @@ fi shopt -s extglob for targrp in ${targrp_list}; do set +e - ${TARCMD} -P -cvf "${ATARDIR}"/"${CDATE}"/"${targrp}".tar $(cat "${ARCH_LIST}"/"${targrp}".txt) + ${TARCMD} -P -cvf "${tar_dir}"/"${CDATE}"/"${targrp}".tar $(cat "${ARCH_LIST}"/"${targrp}".txt) status=$? if [ "${status}" -ne 0 ] && [ "${CDATE}" -ge "${firstday}" ]; then echo "$(echo "${TARCMD}" | tr 'a-z' 'A-Z') ${CDATE} ${targrp}.tar failed" diff --git a/scripts/exglobal_globus_xfer.sh b/scripts/exglobal_globus_xfer.sh new file mode 100755 index 00000000000..14a5e3d5df6 --- /dev/null +++ b/scripts/exglobal_globus_xfer.sh @@ -0,0 +1,161 @@ +#! /usr/bin/env bash + +source "${HOMEgfs}/ush/preamble.sh" + +############################################## +# Begin JOB SPECIFIC work +############################################## + +############################################################### +# Check that data was archived locally +if [[ ${LOCALARCH} != "YES" ]]; then + echo "Local archival disabled, no data to send via globus!" + status=1 + exit "${status}" +fi + +############################################################### + +# Send the files to be archived via globus and retain the task ID +local_target="${LOCAL_GLOBUS_ADDR}:${ATARDIRloc}/${CDATE}" +remote_target_dir="/collab1/data/${REMOTE_USERNAME}/${PSLOT}/${CDATE}" +globus_target="${REMOTE_GLOBUS_ADDR}:${remote_target_dir}" + +rm -f send_list +for file in $(find ${ATARDIRloc}/${CDATE} -name "*${RUN}*"); do + echo "${file} $(basename ${file})" >> send_list +done + +${GLOBUS_XFR} --batch "send_list" ${local_target} ${globus_target} > globus_output + +status=$? +if [ $status -ne 0 ]; then + echo "Globus data transfer failed. Double check Globus IDs and make sure you're logged in and endpoints are activated." + exit $status +fi + +#Get the globus task number and wait until complete +task_id=$(cat globus_output | grep "Task ID" | sed "s/Task ID: \(.*\)/\1/") + +${GLOBUS_WAIT} "${task_id}" + +status=$? +if [ ${status} -ne 0 ]; then + echo "Globus data transfer failed after initialization." + exit ${status} +fi + +#Remove previous copy of the hpss log file used to notify that the job is complete, if it exists +remote_inv_target_dir="/collab1/data/${REMOTE_USERNAME}/inventory" +hpss_log_base=hpss.${PSLOT}.${CDATE}.log +hpss_log="${remote_inv_target_dir}/${hpss_log_base}" +tmp_log="${remote_target_dir}/tmp.${PSLOT}.${CDATE}.log" +globus_target="${REMOTE_GLOBUS_ADDR}:${remote_target_dir}" +${GLOBUS_RM} ${REMOTE_GLOBUS_ADDR}:${hpss_log} > rm_hpsslog_output + +${GLOBUS_WAIT} "${task_id}" + +status=$? +if [ ${status} -ne 0 ]; then + echo "Globus data transfer failed after initialization." 
+  exit ${status}
+fi
+
+######Generate a script to push the archives that were just sent to the remote server (Niagara) onward to HPSS
+inv_fname="inventory.${PSLOT}.${CDATE}"
+loc_inv="${ATARDIRloc}/${inv_fname}"
+targ_inv="${remote_inv_target_dir}/${inv_fname}"
+file_list=$(find ${ATARDIRloc}/${CDATE} -type f)
+
+rm -f ${loc_inv}
+touch ${loc_inv}
+echo "#!/usr/bin/bash" >> ${loc_inv}
+echo "source /etc/bashrc" >> ${loc_inv}
+echo "machine=${machine}" >> ${loc_inv}
+echo "module load hpss" >> ${loc_inv}
+echo "hsi \"mkdir -p ${ATARDIR}/${CDATE}\" >> ${tmp_log} 2>&1" >> ${loc_inv}
+
+for file in ${file_list}; do
+  sent_fname=$(basename ${file})
+  echo "hsi put ${remote_target_dir}/${sent_fname} : ${ATARDIR}/${CDATE}/${sent_fname} >> ${tmp_log} 2>&1" >> ${loc_inv}
+  echo "if [[ \$? != 0 ]]; then" >> ${loc_inv}
+  echo "  echo 'Failed to send ${sent_fname} to HPSS, aborting' >> ${tmp_log} 2>&1" >> ${loc_inv}
+  echo "  exit 33" >> ${loc_inv}
+  echo "fi" >> ${loc_inv}
+done
+echo "mv ${tmp_log} ${hpss_log}" >> ${loc_inv}
+
+#Send the inventory HPSS script and retain the task number
+local_target="${LOCAL_GLOBUS_ADDR}:${loc_inv}"
+globus_target="${REMOTE_GLOBUS_ADDR}:${targ_inv}"
+${GLOBUS_XFR} ${local_target} ${globus_target} > globus_inv_output
+
+status=$?
+if [ ${status} -ne 0 ]; then
+  echo "Globus HPSS script transfer failed."
+  exit ${status}
+fi
+
+#Send the HPSS push script and retain the task number
+local_target="${LOCAL_GLOBUS_ADDR}:${USHgfs}/push_inv_hpss.sh"
+globus_target="${REMOTE_GLOBUS_ADDR}:${remote_inv_target_dir}/push_inv_hpss.sh"
+${GLOBUS_XFR} ${local_target} ${globus_target} > globus_push_output
+
+#Get the globus task numbers and wait until complete
+task_id_inv=$(cat globus_inv_output | grep "Task ID" | sed "s/Task ID: \(.*\)/\1/")
+task_id_push=$(cat globus_push_output | grep "Task ID" | sed "s/Task ID: \(.*\)/\1/")
+
+${GLOBUS_WAIT} "${task_id_inv}"
+${GLOBUS_WAIT} "${task_id_push}"
+
+status=$?
+if [ ${status} -ne 0 ]; then
+  echo "Globus HPSS script transfer failed after creation."
+  exit ${status}
+fi
+
+#Look for the notification file on the remote system indicating the HPSS transfer(s) are complete
+globus_target="${REMOTE_GLOBUS_ADDR}:${remote_inv_target_dir}"
+transfer_complete=0
+set +e
+count=0
+check_inv=1
+while [[ ${transfer_complete} = 0 ]]; do
+  sleep 30s
+  globus_glob=$(${GLOBUS_LS} ${globus_target})
+  for file in ${globus_glob}; do
+    found=$( echo ${file} | grep ${hpss_log_base} | wc -l)
+    if [[ ${found} = 1 ]]; then
+      transfer_complete=1
+    elif [[ ${found} -gt 1 ]]; then
+      echo "Too many matching HPSS logs in remote directory"
+      exit 2
+    fi
+  done
+
+  [[ ${check_inv} -eq 0 ]] && continue
+
+  count=$((count+1))
+  #If we get over 10 minutes and the inventory files are still present, there is likely a problem with cron
+  if [[ $count -gt 20 ]]; then
+    inv_list=$(${GLOBUS_LS} ${globus_target})
+    for file in ${inv_list}; do
+      inv_count=$(echo "${file}" | grep ${inv_fname} | wc -l)
+      if [[ ${inv_count} -ne 0 ]]; then
+        echo "Remote script file has not been touched, cron likely not activated on remote server"
+        echo "If the remote is Niagara, enter a crontab entry like the following and try again"
+        echo "*/5 * * * * [[ -e /collab1/data/$LOGNAME/inventory/push_inv_hpss.sh ]] && bash /collab1/data/$LOGNAME/inventory/push_inv_hpss.sh"
+        exit 3
+      fi
+    done
+    #If it has moved, then the HPSS transfer is ongoing; skip this if loop hereafter.
+    check_inv=0
+  fi
+
+done
+set_strict
+
+#Clean up
+rm -f globus_output
+
+exit 0
diff --git a/ush/push_inv_hpss.sh b/ush/push_inv_hpss.sh
new file mode 100644
index 00000000000..50b8172f198
--- /dev/null
+++ b/ush/push_inv_hpss.sh
@@ -0,0 +1,28 @@
+#!/usr/bin/bash
+# push_inv_hpss.sh
+# Purpose: Executes inventory scripts sent to Niagara via globus jobs
+#          to facilitate pushing archives to HPSS from systems that do
+#          not have HPSS connections.
+
+set -eu
+
+#Move inventory scripts to a temporary directory and then execute them
+#so they are not executed by subsequent cron calls
+work_dir=$(mktemp -d)
+
+count=0
+for script in /collab1/data/${LOGNAME}/inventory/inventory*; do
+  if [[ -e ${script} ]]; then
+    mv ${script} ${work_dir}
+    count=$((count+1))
+  fi
+done
+
+if [[ $count -gt 0 ]]; then
+  #Execute the scripts
+  for script in ${work_dir}/*; do
+    [[ ${script} ]] && bash ${script}
+  done
+fi
+
+exit 0
diff --git a/workflow/applications.py b/workflow/applications.py
index 717940a1bda..886e660a5f1 100644
--- a/workflow/applications.py
+++ b/workflow/applications.py
@@ -104,6 +104,7 @@ def __init__(self, conf: Configuration) -> None:
         self.do_gempak = _base.get('DO_GEMPAK', False)
         self.do_awips = _base.get('DO_AWIPS', False)
         self.do_wafs = _base.get('WAFSF', False)
+        self.do_globus = _base.get('DO_GLOBUS', False)
         self.do_vrfy = _base.get('DO_VRFY', True)
         self.do_metp = _base.get('DO_METP', False)
         self.do_jedivar = _base.get('DO_JEDIVAR', False)
@@ -188,6 +189,9 @@ def _cycled_configs(self):
 
         configs += ['sfcanl', 'analcalc', 'fcst', 'post', 'vrfy', 'arch']
 
+        if self.do_globus:
+            configs += ['globus']
+
         if self.do_gldas:
             configs += ['gldas']
 
@@ -240,6 +244,9 @@ def _forecast_only_configs(self):
 
         configs += ['arch']
 
+        if self.do_globus:
+            configs += ['globus']
+
         if self.model_app in ['S2S', 'S2SW', 'S2SWA', 'NG-GODAS']:
             configs += ['coupled_ic']
         else:
@@ -353,6 +360,9 @@ def _get_cycled_task_names(self):
 
         gdas_gfs_common_cleanup_tasks = ['arch']
 
+        if self.do_globus:
+            gdas_gfs_common_cleanup_tasks += ['globus']
+
         if self.do_jedivar:
             gdas_gfs_common_tasks_before_fcst += ['atmanalprep', 'atmanalrun', 'atmanalpost']
         else:
@@ -515,6 +525,9 @@ def _get_forecast_only_task_names(self):
         if self.do_wafs:
             tasks += ['wafs', 'wafsgcip', 'wafsgrib2', 'wafsgrib20p25', 'wafsblending', 'wafsblending0p25']
 
-        tasks += ['arch']  # arch **must** be the last task
+        tasks += ['arch']  # arch **must** be the last task unless globus is used
+
+        if self.do_globus:
+            tasks += ['globus']
 
         return {f"{self._base['CDUMP']}": tasks}
diff --git a/workflow/rocoto/workflow_tasks.py b/workflow/rocoto/workflow_tasks.py
index 228dfb15aad..2ebb79c70ff 100644
--- a/workflow/rocoto/workflow_tasks.py
+++ b/workflow/rocoto/workflow_tasks.py
@@ -9,7 +9,7 @@ class Tasks:
-    SERVICE_TASKS = ['arch', 'earc', 'getic']
+    SERVICE_TASKS = ['arch', 'earc', 'getic', 'globus']
     VALID_TASKS = ['aerosol_init', 'coupled_ic', 'getic', 'init',
                    'prep', 'anal', 'sfcanl', 'analcalc', 'analdiag', 'gldas', 'arch',
                    'atmanalprep', 'atmanalrun', 'atmanalpost',
@@ -19,7 +19,7 @@ class Tasks:
                    'atmensanalprep', 'atmensanalrun', 'atmensanalpost',
                    'aeroanlinit', 'aeroanlrun', 'aeroanlfinal',
                    'fcst', 'post', 'ocnpost', 'vrfy', 'metp',
-                   'postsnd', 'awips', 'gempak',
+                   'postsnd', 'awips', 'gempak', 'globus',
                    'wafs', 'wafsblending', 'wafsblending0p25', 'wafsgcip', 'wafsgrib2', 'wafsgrib20p25',
                    'waveawipsbulls', 'waveawipsgridded', 'wavegempak', 'waveinit',
@@ -1045,6 +1045,26 @@ def arch(self):
         return task
 
+    def globus(self):
+        deps = []
+        # Build dependencies on archive jobs.
+        dep_dict = {'type': 'task', 'name': f'{self.cdump}arch'}
+        deps.append(rocoto.add_dependency(dep_dict))
+        # Add earc* dependency
+        if self.app_config.do_hybvar and self.cdump in ['gdas']:
+            dep_dict = {'type': 'metatask', 'name': 'gdaseamn'}
+            deps.append(rocoto.add_dependency(dep_dict))
+
+        dependencies = rocoto.create_dependency(dep_condition='and', dep=deps)
+
+        cycledef = 'gdas_half,gdas' if self.cdump in ['gdas'] else self.cdump
+
+        resources = self.get_resource('globus')
+        task = create_wf_task('globus', resources, cdump=self.cdump, envar=self.envars, dependency=dependencies,
+                              cycledef=cycledef)
+
+        return task
+
     # Start of ensemble tasks
     def eobs(self):
         deps = []