diff --git a/cdsodatacli/download.py b/cdsodatacli/download.py
index 0b976e6..5500c3e 100644
--- a/cdsodatacli/download.py
+++ b/cdsodatacli/download.py
@@ -8,6 +8,7 @@
 import shutil
 import random
 import pandas as pd
+from requests.exceptions import ChunkedEncodingError
 from concurrent.futures import ThreadPoolExecutor, as_completed
 import numpy as np
 import traceback
@@ -26,7 +27,8 @@
 from cdsodatacli.utils import conf, test_safe_archive, test_safe_spool
 from collections import defaultdict
 
-chunksize = 4096
+# chunksize = 4096
+chunksize = 8192  # like in the CDSE example
 
 # def CDS_Odata_download_one_product(session, headers, url, output_filepath):
 #     """
@@ -94,23 +96,36 @@ def CDS_Odata_download_one_product_v2(
     # output_filepath_tmp = (
     #     output_filepath.replace(conf["spool"], conf["pre_spool"]) + ".tmp"
     # )
-    output_filepath_tmp = os.path.join(conf["pre_spool"],os.path.basename(output_filepath)+ ".tmp")
+    output_filepath_tmp = os.path.join(
+        conf["pre_spool"], os.path.basename(output_filepath) + ".tmp"
+    )
     safename_base = os.path.basename(output_filepath).replace(".zip", "")
     with open(output_filepath_tmp, "wb") as f:
         logging.debug("Downloading %s" % output_filepath)
         response = session.get(url, headers=headers, stream=True)
         status = response.status_code
         status_meaning = response.reason
+        # Check for 'Transfer-Encoding: chunked'
+        if (
+            "Transfer-Encoding" in response.headers
+            and response.headers["Transfer-Encoding"] == "chunked"
+        ):
+            logging.warning(
+                "Server is using 'Transfer-Encoding: chunked'. Content length may not be accurate."
+            )
         if response.ok:
             total_length = int(
                 int(response.headers.get("content-length")) / 1000 / 1000
             )
             logging.debug("total_length : %s Mo", total_length)
-
-            for chunk in response.iter_content(chunk_size=8192):
-                if chunk:
-                    f.write(chunk)
-    if not response.ok and os.path.exists(output_filepath_tmp):
+            try:
+                for chunk in response.iter_content(chunk_size=chunksize):
+                    if chunk:
+                        f.write(chunk)
+            except ChunkedEncodingError as e:
+                status = -1
+                status_meaning = "ChunkedEncodingError"
+    if (not response.ok or status == -1) and os.path.exists(output_filepath_tmp):
         logging.debug("remove empty file %s", output_filepath_tmp)
         os.remove(output_filepath_tmp)
     elapsed_time = time.time() - t0
@@ -250,7 +265,7 @@ def download_list_product_multithread_v2(
                     login=login,
                     date_generation_access_token=date_generation_access_token,
                 )
-                logging.info('remove session semaphore for %s',login)
+                logging.info("remove session semaphore for %s", login)
                 remove_semaphore_session_file(
                     session_dir=conf["active_session_directory"],
                     safename=safename_base,
@@ -272,7 +287,7 @@ def download_list_product_multithread_v2(
             else:
                 df2.loc[(df2["safe"] == safename_base), "status"] = -1
                 errors_per_account[login] += 1
-                logging.info('error found for %s meaning %s',login,status_meaning)
+                logging.info("error found for %s meaning %s", login, status_meaning)
                 # df2["status"][df2["safe"] == safename_base] = -1  # download in error
                 cpt["status_%s" % status_meaning] += 1
@@ -374,7 +389,7 @@ def download_list_product(
                 access_token,
                 date_generation_access_token,
                 specific_account,
-                path_semphore_token
+                path_semphore_token,
             ) = get_bearer_access_token(specific_account=specific_account)
             headers = {"Authorization": "Bearer %s" % access_token}
             session.headers.update(headers)
@@ -454,7 +469,7 @@ def download_list_product_sequential(
     """
     assert len(list_id) == len(list_safename)
-    logins_group = 'logins'
+    logins_group = "logins"
     cpt = defaultdict(int)
     cpt["total_product_to_download"] = len(list_id)
     df = pd.DataFrame(
@@ -463,7 +478,10 @@ def download_list_product_sequential(
     df2, cpt = filter_product_already_present(cpt, df, outputdir)
     df_products_downloadable = get_sessions_download_available(
-        df2, hideProgressBar=hideProgressBar, blacklist=None,logins_group=logins_group,
+        df2,
+        hideProgressBar=hideProgressBar,
+        blacklist=None,
+        logins_group=logins_group,
     )
     logging.info("product downloadable: %s", len(df_products_downloadable))
     df_products_downloadable["status"] = 0
@@ -506,8 +524,10 @@ def download_list_product_sequential(
                     access_token,
                     date_generation_access_token,
                     login,
-                    path_semaphore_token
-                ) = get_bearer_access_token(specific_account=None,account_group=logins_group)
+                    path_semaphore_token,
+                ) = get_bearer_access_token(
+                    specific_account=None, account_group=logins_group
+                )
                 headers = {"Authorization": "Bearer %s" % access_token}
                 session.headers.update(headers)
             else:
diff --git a/cdsodatacli/fetch_access_token.py b/cdsodatacli/fetch_access_token.py
index b396fd3..11e214f 100644
--- a/cdsodatacli/fetch_access_token.py
+++ b/cdsodatacli/fetch_access_token.py
@@ -33,12 +33,13 @@ def get_bearer_access_token(quiet=True, specific_account=None, account_group="lo
         prefix = "curl -s "
     else:
         prefix = "curl "
+    option_insecure = ' --insecure'  # added because workers have deprecated SSL certificates
     cmd = (
         prefix
         + " --location --request POST "
         + url_identity
-        + " --header 'Content-Type: application/x-www-form-urlencoded' --data-urlencode 'grant_type=password' --data-urlencode 'username=%s' --data-urlencode 'password=%s' --data-urlencode 'client_id=cdse-public'"
-        % (login, passwd)
+        + " --header 'Content-Type: application/x-www-form-urlencoded' --data-urlencode 'grant_type=password' --data-urlencode 'username=%s' --data-urlencode 'password=%s' --data-urlencode 'client_id=cdse-public' %s"
+        % (login, passwd, option_insecure)
     )
 
     logging.debug("cmd: %s", cmd)
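
For context, the core of the download.py change is the streaming-download pattern: write to a temporary file, tolerate a ChunkedEncodingError raised mid-transfer, and never keep a truncated file. Below is a minimal, self-contained sketch of that pattern under stated assumptions; stream_download, url, output_path and CHUNKSIZE are hypothetical illustration names, not cdsodatacli's API, and the sketch is not the package's implementation.

import logging
import os

import requests
from requests.exceptions import ChunkedEncodingError

CHUNKSIZE = 8192  # same value as the CDSE example cited in the diff


def stream_download(session: requests.Session, url: str, output_path: str) -> bool:
    """Download url to output_path; return True only on a complete transfer."""
    tmp_path = output_path + ".tmp"
    complete = False
    with open(tmp_path, "wb") as f:
        response = session.get(url, stream=True)
        if response.headers.get("Transfer-Encoding") == "chunked":
            # With chunked transfer encoding there may be no Content-Length header.
            logging.warning("chunked transfer: reported size may be inaccurate")
        if response.ok:
            try:
                for chunk in response.iter_content(chunk_size=CHUNKSIZE):
                    if chunk:
                        f.write(chunk)
                complete = True
            except ChunkedEncodingError:
                logging.warning("connection dropped mid-transfer for %s", url)
    if complete:
        os.replace(tmp_path, output_path)  # move the finished file into place
    elif os.path.exists(tmp_path):
        os.remove(tmp_path)  # mirror the diff: drop the partial .tmp file
    return complete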