
Downloadclient

FileDownloadState

The state a file can be in before/while/after downloading.

BaseExtractionTool(program_name, useability_check_args, extract_args, logger=logging.log)

Initialises an extraction tool object.

Parameters:

Name Type Description Default
program_name str

the name of the archive extraction program, e.g., unzip

required
useability_check_args str

the arguments passed to the extraction program to test whether it is installed, e.g., --version

required
extract_args str

the arguments that will be passed to the program for extraction

required
logger LoggerFunction

optional decorated logging.log object that can be passed from the calling daemon or client.

log

is_useable()

Checks if the extraction tool is installed and usable

Returns:

Type Description
bool

True if it is usable otherwise False

try_extraction(archive_file_path, file_to_extract, dest_dir_path)

Calls the extraction program to extract a file from an archive

Parameters:

Name Type Description Default
archive_file_path str

path to the archive

required
file_to_extract str

file name to extract from the archive

required
dest_dir_path str

destination directory where the extracted file will be stored

required

Returns:

Type Description
bool

True on success otherwise False
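
As a rough illustration of the check-then-extract flow described above, the sketch below probes a program by running it with its check arguments and then builds an extraction command line. The function names and the %-style template format used for extract_args are assumptions made for this example, not the confirmed internals of BaseExtractionTool.

```python
import shlex
import subprocess


def is_useable(program_name: str, useability_check_args: str) -> bool:
    """Return True if running the program with its check arguments exits with 0."""
    cmd = [program_name, *shlex.split(useability_check_args)]
    try:
        result = subprocess.run(
            cmd,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
            timeout=30,
        )
    except (OSError, subprocess.TimeoutExpired):
        # Missing binary, permission problem, or a hanging program.
        return False
    return result.returncode == 0


def build_extract_command(program_name: str, extract_args: str,
                          archive_file_path: str, file_to_extract: str,
                          dest_dir_path: str) -> list[str]:
    """Fill the argument template and return an argv list for subprocess."""
    # Assumption: extract_args uses %(name)s placeholders for the three paths.
    filled = extract_args % {
        "archive_file_path": shlex.quote(archive_file_path),
        "file_to_extract": shlex.quote(file_to_extract),
        "dest_dir_path": shlex.quote(dest_dir_path),
    }
    return [program_name, *shlex.split(filled)]
```

Returning False instead of raising when the program is missing lets a caller try several tools in turn and pick the first usable one.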

DownloadClient(client=None, logger=None, tracing=True, check_admin=False, check_pcache=False)

Initialises the basic settings for a DownloadClient object.

Parameters:

Name Type Description Default
client Optional[Client]

Optional: rucio.client.client.Client object. If None, a new object will be created.

None
external_traces

Optional: reference to a list where traces can be added

required
logger Optional[LoggerFunction]

Optional: logging.Logger object. If None, default logger will be used.

None

download_pfns(items, num_threads=2, trace_custom_fields=None, traces_copy_out=None, deactivate_file_download_exceptions=False)

Download items with a given PFN. This function can only download files, no datasets.

Parameters:

Name Type Description Default
items list[dict[str, Any]]

List of dictionaries, each describing a file to download. Keys:

  • pfn - PFN string of this file
  • did - DID string of this file (e.g. 'scope:file.name'). Wildcards are not allowed
  • rse - RSE name (e.g. 'CERN-PROD_DATADISK'). RSE expressions are not allowed
  • base_dir - Optional: base directory where the downloaded files will be stored. (Default: '.')
  • no_subdir - Optional: if true, files are written directly into base_dir. (Default: False)
  • adler32 - Optional: the adler32 checksum to compare the downloaded file's adler32 checksum with
  • md5 - Optional: the md5 checksum to compare the downloaded file's md5 checksum with
  • transfer_timeout - Optional: timeout for the download protocols. (Default: None)
  • check_local_with_filesize_only - Optional: if true, already downloaded files will not be validated by checksum.

required
num_threads int

Suggestion of number of threads to use for the download. It will be lowered if it's too high.

2
trace_custom_fields Optional[dict[str, Any]]

Custom key value pairs to send with the traces

None
traces_copy_out Optional[list[dict[str, Any]]]

reference to an external list, where the traces should be uploaded

None
deactivate_file_download_exceptions bool

If True, file download exceptions are not raised

False

Returns:

Type Description
list[dict[str, Any]]

a list of dictionaries with an entry for each file, containing the input options, the did, and the clientState. clientState can be one of the following: ALREADY_DONE, DONE, FILE_NOT_FOUND, FAIL_VALIDATE, FAILED

Raises:

Type Description
InputValidationError

if one of the input items is in the wrong format

NoFilesDownloaded

if no files could be downloaded

NotAllFilesDownloaded

if not all files could be downloaded

RucioException

if something unexpected went wrong during the download
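
A minimal sketch of a download_pfns call, using only the item keys listed above. The helper name download_single_pfn is hypothetical, and download_client is assumed to be an already constructed DownloadClient instance; it is passed in rather than created here so that the snippet does not depend on a configured Rucio environment.

```python
from typing import Any


def download_single_pfn(download_client: Any, pfn: str, did: str, rse: str,
                        base_dir: str = ".") -> list[dict[str, Any]]:
    """Download one file by PFN and return the list of result dictionaries."""
    item = {
        "pfn": pfn,
        "did": did,            # 'scope:name', wildcards are not allowed
        "rse": rse,            # plain RSE name, not an RSE expression
        "base_dir": base_dir,
        "no_subdir": True,     # write directly into base_dir
    }
    # A single file needs only a single thread.
    return download_client.download_pfns([item], num_threads=1)
```

Each returned dictionary carries a clientState entry, so a caller can check for DONE or ALREADY_DONE before using the file.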

download_dids(items, num_threads=2, trace_custom_fields=None, traces_copy_out=None, deactivate_file_download_exceptions=False, sort=None)

Download items with given DIDs. This function can also download datasets and wildcarded DIDs.

Parameters:

Name Type Description Default
items list[dict[str, Any]]

List of dictionaries, each describing an item to download. Keys:

  • did - DID string of this file (e.g. 'scope:file.name')
  • filters - Filter to select DIDs for download. Optional if DID is given
  • rse - Optional: RSE name (e.g. 'CERN-PROD_DATADISK') or RSE expression from where to download
  • impl - Optional: name of the protocol implementation to be used to download this item.
  • no_resolve_archives - Optional: bool indicating whether archives should not be considered for download (Default: False)
  • resolve_archives - Deprecated: use no_resolve_archives instead
  • force_scheme - Optional: force a specific scheme to download this item. (Default: None)
  • base_dir - Optional: base directory where the downloaded files will be stored. (Default: '.')
  • no_subdir - Optional: if true, files are written directly into base_dir. (Default: False)
  • nrandom - Optional: if the DID addresses a dataset, nrandom files will be randomly chosen for download from the dataset
  • ignore_checksum - Optional: if true, skips the checksum validation between the downloaded file and the Rucio catalogue. (Default: False)
  • transfer_timeout - Optional: timeout for the download protocols. (Default: None)
  • transfer_speed_timeout - Optional: minimum allowed transfer speed (in KBps). Ignored if transfer_timeout is set. Otherwise, used to compute the default timeout (Default: 500)
  • check_local_with_filesize_only - Optional: if true, already downloaded files will not be validated by checksum.

required
num_threads int

Suggestion of number of threads to use for the download. It will be lowered if it's too high.

2
trace_custom_fields Optional[dict[str, Any]]

Custom key value pairs to send with the traces.

None
traces_copy_out Optional[list[dict[str, Any]]]

reference to an external list, where the traces should be uploaded

None
deactivate_file_download_exceptions bool

If True, file download exceptions are not raised

False
sort Optional[SORTING_ALGORITHMS_LITERAL]

Select best replica by replica sorting algorithm. Available algorithms: geoip - based on src/dst IP topographical distance

None

Returns:

Type Description
list[dict[str, Any]]

a list of dictionaries with an entry for each file, containing the input options, the did, and the clientState

Raises:

Type Description
InputValidationError

if one of the input items is in the wrong format

NoFilesDownloaded

if no files could be downloaded

NotAllFilesDownloaded

if not all files could be downloaded

RucioException

if something unexpected went wrong during the download
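
The following sketch shows one way download_dids might be called for a dataset, with exceptions deactivated so that the per-file clientState values can be inspected afterwards. The helper name download_random_sample is hypothetical, and download_client is assumed to be an existing DownloadClient instance.

```python
from collections import Counter
from typing import Any


def download_random_sample(download_client: Any, dataset_did: str,
                           n_files: int, base_dir: str = ".") -> Counter:
    """Download n_files random files of a dataset and tally the clientState values."""
    item = {"did": dataset_did, "nrandom": n_files, "base_dir": base_dir}
    results = download_client.download_dids(
        [item],
        num_threads=2,
        # Return per-file states instead of raising on failed files.
        deactivate_file_download_exceptions=True,
    )
    # Treat a missing clientState as a failure in the tally.
    return Counter(result.get("clientState", "FAILED") for result in results)
```

The returned Counter maps each state to the number of files in that state, which is convenient for logging a short summary.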

Download items using a given metalink file.

Parameters:

Name Type Description Default
item dict[str, Any]

dictionary describing an item to download. Keys:

  • base_dir - Optional: base directory where the downloaded files will be stored. (Default: '.')
  • no_subdir - Optional: if true, files are written directly into base_dir. (Default: False)
  • ignore_checksum - Optional: if true, skips the checksum validation between the downloaded file and the Rucio catalogue. (Default: False)
  • transfer_timeout - Optional: timeout for the download protocols. (Default: None)
  • check_local_with_filesize_only - Optional: if true, already downloaded files will not be validated by checksum.

required
num_threads int

Suggestion of number of threads to use for the download. It will be lowered if it's too high.

2
trace_custom_fields Optional[dict[str, Any]]

Custom key value pairs to send with the traces.

None
traces_copy_out Optional[list[dict[str, Any]]]

reference to an external list, where the traces should be uploaded

None
deactivate_file_download_exceptions bool

If True, file download exceptions are not raised

False

Returns:

Type Description
list[dict[str, Any]]

a list of dictionaries with an entry for each file, containing the input options, the did, and the clientState

Raises:

Type Description
InputValidationError

if one of the input items is in the wrong format

NoFilesDownloaded

if no files could be downloaded

NotAllFilesDownloaded

if not all files could be downloaded

RucioException

if something unexpected went wrong during the download

_download_multithreaded(input_items, num_threads, trace_custom_fields=None, traces_copy_out=None)

Starts an appropriate number of threads to download items from the input list. (This function is meant to be used as class internal only)

Parameters:

Name Type Description Default
input_items list[dict[str, Any]]

list containing the input items to download

required
num_threads int

suggestion of how many threads should be started

required
trace_custom_fields Optional[dict[str, Any]]

Custom key value pairs to send with the traces

None
traces_copy_out Optional[list[dict[str, Any]]]

reference to an external list, where the traces should be uploaded

None

Returns:

Type Description
list[dict[str, Any]]

list with output items as dictionaries

_download_worker(input_queue, output_queue, trace_custom_fields, traces_copy_out, log_prefix)

This function runs as long as there are items in the input queue, downloads them and stores the output in the output queue. (This function is meant to be used as class internal only)

Parameters:

Name Type Description Default
input_queue Queue

queue containing the input items to download

required
output_queue Queue

queue where the output items will be stored

required
trace_custom_fields dict[str, Any]

Custom key value pairs to send with the traces

required
traces_copy_out Optional[list[dict[str, Any]]]

reference to an external list, where the traces should be uploaded

required
log_prefix str

string that will be put at the beginning of every log message

required
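
The queue-based pattern behind _download_multithreaded and _download_worker can be sketched with the standard library alone. This is an illustration under assumptions, not the class's actual code: download_one is a hypothetical callable standing in for the per-item download, and tracing and logging are left out.

```python
import queue
import threading
from typing import Any, Callable


def download_multithreaded(
    input_items: list[dict[str, Any]],
    num_threads: int,
    download_one: Callable[[dict[str, Any]], dict[str, Any]],
) -> list[dict[str, Any]]:
    """Process all items with a pool of worker threads and collect the outputs."""
    input_queue: queue.Queue = queue.Queue()
    output_queue: queue.Queue = queue.Queue()
    for item in input_items:
        input_queue.put(item)

    # num_threads is only a suggestion: never start more threads than items.
    num_threads = max(1, min(num_threads, len(input_items)))

    def worker() -> None:
        while True:
            try:
                item = input_queue.get_nowait()
            except queue.Empty:
                return  # input queue drained, worker is done
            output_queue.put(download_one(item))

    threads = [threading.Thread(target=worker) for _ in range(num_threads)]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()

    # All workers have finished, so the output queue can be drained safely.
    results = []
    while not output_queue.empty():
        results.append(output_queue.get())
    return results
```

Output order depends on thread scheduling, so callers should match results to inputs by DID rather than by position.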

_compute_actual_transfer_timeout(item) staticmethod

Merge the two options related to timeout into the value which will be used for protocol download.

Parameters:

Name Type Description Default
item dict[str, Any]

dictionary that describes the item to download

required

Returns:

Type Description
int

timeout in seconds
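
To make the merge of transfer_timeout and transfer_speed_timeout concrete, here is a sketch of how the two options could be combined. The item key bytes for the file size, the 360 second fallback, and the 60 second fixed overhead are assumptions chosen for this example; only the precedence of transfer_timeout and the KBps unit come from the option descriptions.

```python
from typing import Any

DEFAULT_TRANSFER_TIMEOUT = 360  # seconds; assumed fallback for this sketch
STATIC_INCREMENT = 60           # seconds; assumed fixed overhead per transfer


def compute_transfer_timeout(item: dict[str, Any]) -> int:
    """Return the timeout in seconds to use for downloading this item."""
    transfer_timeout = item.get("transfer_timeout")
    if transfer_timeout is not None:
        # An explicit timeout always wins over the speed-based estimate.
        return int(transfer_timeout)

    speed_kbps = item.get("transfer_speed_timeout")
    size_bytes = item.get("bytes")
    if not size_bytes or not speed_kbps or speed_kbps <= 0:
        return DEFAULT_TRANSFER_TIMEOUT

    # Convert KBps to bytes per second, then add the fixed overhead.
    bytes_per_second = speed_kbps * 1000
    return int(size_bytes // bytes_per_second) + STATIC_INCREMENT
```

With these assumed constants, a 50 MB file at a minimum speed of 500 KBps gets 100 seconds of transfer time plus the 60 second overhead.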

_download_item(item, trace, traces_copy_out, log_prefix='')

Downloads the given item and sends traces for success/failure. (This function is meant to be used as class internal only)

Parameters:

Name Type Description Default
item dict[str, Any]

dictionary that describes the item to download

required
trace dict[str, Any]

dictionary representing a pattern of trace that will be sent

required
traces_copy_out Optional[list[dict[str, Any]]]

reference to an external list, where the traces should be uploaded

required
log_prefix str

string that will be put at the beginning of every log message

''

Returns:

Type Description
dict[str, Any]

dictionary with all attributes from the input item and a clientState attribute

download_aria2c(items, trace_custom_fields=None, filters=None, deactivate_file_download_exceptions=False, sort=None)

Uses aria2c to download the items with given DIDs. This function can also download datasets and wildcarded DIDs. It can only download files that are available via https/davs. Aria2c needs to be installed and X509_USER_PROXY needs to be set!

Parameters:

Name Type Description Default
items list[dict[str, Any]]

List of dictionaries, each describing an item to download. Keys:

  • did - DID string of this file (e.g. 'scope:file.name'). Wildcards are not allowed
  • rse - Optional: RSE name (e.g. 'CERN-PROD_DATADISK') or RSE expression from where to download
  • base_dir - Optional: base directory where the downloaded files will be stored. (Default: '.')
  • no_subdir - Optional: if true, files are written directly into base_dir. (Default: False)
  • nrandom - Optional: if the DID addresses a dataset, nrandom files will be randomly chosen for download from the dataset
  • ignore_checksum - Optional: if true, skips the checksum validation between the downloaded file and the Rucio catalogue. (Default: False)
  • check_local_with_filesize_only - Optional: if true, already downloaded files will not be validated by checksum.

required
trace_custom_fields Optional[dict[str, Any]]

Custom key value pairs to send with the traces

None
filters Optional[dict[str, Any]]

dictionary containing filter options

None
deactivate_file_download_exceptions bool

If True, file download exceptions are not raised

False
sort Optional[SORTING_ALGORITHMS_LITERAL]

Select best replica by replica sorting algorithm. Available algorithms: geoip - based on src/dst IP topographical distance

None

Returns:

Type Description
list[dict[str, Any]]

a list of dictionaries with an entry for each file, containing the input options, the did, and the clientState

Raises:

Type Description
InputValidationError

if one of the input items is in the wrong format

NoFilesDownloaded

if no files could be downloaded

NotAllFilesDownloaded

if not all files could be downloaded

RucioException

if something went wrong during the download (e.g. aria2c could not be started)
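
Because download_aria2c depends on an installed aria2c binary and a set X509_USER_PROXY, a caller may want to verify both before invoking it. The helper below is a hypothetical pre-flight check written for this purpose; it is not part of the client.

```python
import os
import shutil
from typing import Mapping, Optional


def missing_aria2c_prerequisites(
    environ: Optional[Mapping[str, str]] = None,
) -> list[str]:
    """Return a list of human-readable problems; empty if nothing is missing."""
    environ = os.environ if environ is None else environ
    problems = []
    if shutil.which("aria2c") is None:
        problems.append("aria2c executable not found on PATH")
    if not environ.get("X509_USER_PROXY"):
        problems.append("X509_USER_PROXY is not set")
    return problems
```

An empty list means both prerequisites look satisfied; the check does not validate that the proxy file exists or is still valid.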

_start_aria2c_rpc(rpc_secret)

Starts aria2c in RPC mode as a subprocess. Also creates the RPC proxy instance. (This function is meant to be used as class internal only)

Parameters:

Name Type Description Default
rpc_secret str

the secret for the RPC proxy

required

Returns:

Type Description
tuple[Popen, ServerProxy]

a tuple with the process and the rpc proxy objects

Raises:

Type Description
RucioException

if the process or the proxy could not be created
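
A sketch of what starting aria2c in RPC mode and creating a proxy might look like, using aria2c's --enable-rpc, --rpc-secret and --rpc-listen-port options and Python's xmlrpc.client. The function names, the fixed port 6800, and the absence of start-up checks and error handling are simplifications assumed for this example.

```python
import subprocess
from xmlrpc.client import ServerProxy


def build_aria2c_command(rpc_secret: str, port: int = 6800) -> list[str]:
    """Return the argv list that starts aria2c with its RPC interface enabled."""
    return [
        "aria2c",
        "--enable-rpc",
        f"--rpc-secret={rpc_secret}",
        f"--rpc-listen-port={port}",
    ]


def rpc_auth_token(rpc_secret: str) -> str:
    """aria2 expects the secret as a 'token:<secret>' first RPC argument."""
    return f"token:{rpc_secret}"


def start_aria2c_rpc(rpc_secret: str, port: int = 6800) -> tuple[subprocess.Popen, ServerProxy]:
    """Start aria2c as a background process and return (process, proxy)."""
    process = subprocess.Popen(
        build_aria2c_command(rpc_secret, port),
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
    )
    # Creating the proxy does not open a connection; the first call does.
    proxy = ServerProxy(f"http://localhost:{port}/rpc")
    return process, proxy
```

A caller would typically generate the secret with secrets.token_hex, poll the proxy until aria2c answers, and terminate the process once all downloads have finished.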

_download_items_aria2c(items, aria_rpc, rpc_auth, trace_custom_fields=None)

Uses aria2c to download the given items. Aria2c needs to be started as an RPC background process first and an RPC proxy is needed. (This function is meant to be used as class internal only)

Parameters:

Name Type Description Default
items list[dict[str, Any]]

list of dictionaries containing one dict for each file to download

required
aria_rpc

RPCProxy to the aria2c process

required
rpc_auth str

the rpc authentication token

required
trace_custom_fields Optional[dict[str, Any]]

Custom key value pairs to send with the traces

None

Returns:

Type Description
list[dict[str, Any]]

a list of dictionaries with an entry for each file, containing the input options, the did, and the clientState

_resolve_one_item_dids(item)

Resolves scopes or wildcard DIDs to lists of full DID names.

Parameters:

Name Type Description Default
item dict[str, Any]

One input item

required

_resolve_and_merge_input_items(input_items, sort=None)

This function takes the input items given to download_dids etc. and resolves the sources.

  • It first performs a list_dids call to dereference any wildcards and retrieve DID stats (size, length, type).
  • Next, input items are grouped together by common list_replicas options. For each group, a single list_replicas call is performed.
  • The resolved File DIDs with sources are finally mapped back to initial input items to be able to correctly retrieve download options (timeout, destination directories, etc)

Parameters:

Name Type Description Default
input_items list[dict[str, Any]]

List of dictionaries. Each dictionary describing an input item

required

Returns:

Type Description
tuple[dict[str, Any], list[dict[str, Any]]]

a tuple of:

  • a dictionary that maps the dereferenced (without wildcards) input DIDs to a list of input items
  • a list with a dictionary for each file DID which has to be downloaded

Raises:

Type Description
InputValidationError

if one of the input items is in the wrong format
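
The grouping step, in which items that share the same replica lookup options are handled by a single list_replicas call, can be illustrated as follows. Which item keys actually form the grouping key is an assumption of this sketch; REPLICA_OPTION_KEYS and the function name are hypothetical.

```python
from collections import defaultdict
from typing import Any

# Assumption: these item keys influence the replica lookup.
REPLICA_OPTION_KEYS = ("rse", "force_scheme", "impl", "no_resolve_archives")


def group_by_replica_options(
    input_items: list[dict[str, Any]],
) -> dict[tuple, list[dict[str, Any]]]:
    """Group input items so that each group needs only one replica lookup."""
    groups: dict[tuple, list[dict[str, Any]]] = defaultdict(list)
    for item in input_items:
        # Items with identical option values end up under the same key.
        key = tuple(item.get(name) for name in REPLICA_OPTION_KEYS)
        groups[key].append(item)
    return dict(groups)
```

One lookup per group instead of one per item keeps the number of server calls small when many items share the same options.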

_options_from_input_items(input_items)

Best-effort generation of download options from multiple input items which resolve to the same file DID. This is done to download each file DID only once, even if it is requested multiple times via overlapping datasets and/or wildcard resolutions in distinct input items.

Some options can be easily merged. For example, multiple base_dir values are all appended to a list, so the file is downloaded once and copied to all desired destinations. Other options are not necessarily compatible, for example two items requesting different values for the download timeout. In such cases the options are merged on a best-effort basis.
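
A small sketch of such a best-effort merge is shown here. Collecting every base_dir follows the description above; resolving conflicting timeouts by taking the largest value is an assumption of this example, as are the function name and the shape of the returned dictionary.

```python
from typing import Any


def merge_options(input_items: list[dict[str, Any]]) -> dict[str, Any]:
    """Merge the options of several input items that resolve to the same file."""
    merged: dict[str, Any] = {"base_dirs": [], "transfer_timeout": None}
    for item in input_items:
        # Compatible option: keep every distinct destination directory.
        base_dir = item.get("base_dir", ".")
        if base_dir not in merged["base_dirs"]:
            merged["base_dirs"].append(base_dir)

        # Conflicting option: assumed rule is to keep the most permissive timeout.
        timeout = item.get("transfer_timeout")
        if timeout is not None:
            current = merged["transfer_timeout"]
            merged["transfer_timeout"] = timeout if current is None else max(current, timeout)
    return merged
```

Taking the maximum timeout errs on the side of letting the download finish, at the cost of waiting longer on a stalled transfer.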

_prepare_items_for_download(did_to_input_items, file_items)

Optimises the number of files to download. (This function is meant to be used as class internal only)

Parameters:

Name Type Description Default
did_to_input_items dict[str, Any]

dictionary that maps resolved input DIDs to input items

required
file_items list[dict[str, Any]]

list of dictionaries. Each dictionary describes a File DID to download

required

Returns:

Type Description
list[dict[str, Any]]

list of dictionaries. Each dictionary describes an element to download

Raises:

Type Description
InputValidationError

if the given input is not valid or incomplete

_split_did_str(did_str)

Splits a given DID string (e.g. 'scope1:name.file') into its scope and name part (This function is meant to be used as class internal only)

Parameters:

Name Type Description Default
did_str str

the DID string that will be split

required

Returns:

Type Description
tuple[str, str]

the scope and name parts of the given DID

Raises:

Type Description
InputValidationError

if the given DID string is not valid
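
A minimal sketch of splitting a 'scope:name' string. It handles only the single-colon form and raises ValueError as a stand-in for InputValidationError; any additional DID forms the real method may accept are not covered here.

```python
def split_did_str(did_str: str) -> tuple[str, str]:
    """Split 'scope:name' into (scope, name); raise ValueError if malformed."""
    scope, separator, name = did_str.partition(":")
    # Require exactly one colon with non-empty text on both sides.
    if not separator or not scope or not name or ":" in name:
        raise ValueError(f"invalid DID string: {did_str!r}")
    return scope, name
```

For the example from the description, split_did_str('scope1:name.file') yields the scope 'scope1' and the name 'name.file'.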

_prepare_dest_dir(base_dir, dest_dir_name, no_subdir)

Builds the final destination path for a file and creates the destination directory if it does not exist. (This function is meant to be used as class internal only)

Parameters:

Name Type Description Default
base_dir str

base directory part

required
dest_dir_name str

name of the destination directory

required
no_subdir Optional[bool]

if no subdirectory should be created

required

Returns:

Type Description
str

the absolute path of the destination directory
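
The behaviour described for no_subdir can be sketched with os.path and os.makedirs. This is an illustration of the documented contract under the assumption that the subdirectory is simply appended to base_dir; the function name mirrors the method but the body is not taken from the client.

```python
import os


def prepare_dest_dir(base_dir: str, dest_dir_name: str, no_subdir: bool) -> str:
    """Return the absolute destination directory, creating it if necessary."""
    # With no_subdir, files go straight into base_dir.
    dest_dir = base_dir if no_subdir else os.path.join(base_dir, dest_dir_name)
    dest_dir = os.path.abspath(dest_dir)
    # exist_ok avoids an error when the directory is already there.
    os.makedirs(dest_dir, exist_ok=True)
    return dest_dir
```

Using exist_ok=True also makes the call safe when several download threads prepare the same directory at once.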

_check_output(output_items, deactivate_file_download_exceptions=False)

Checks if all files were successfully downloaded (This function is meant to be used as class internal only)

Parameters:

Name Type Description Default
output_items list[dict[str, Any]]

list of dictionaries describing the downloaded files

required
deactivate_file_download_exceptions bool

If True, file download exceptions are not raised

False

Returns:

Type Description
list[dict[str, Any]]

output_items list

Raises:

Type Description
NoFilesDownloaded
NotAllFilesDownloaded
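
The check can be pictured as counting successful states and raising only when exceptions are active. In this sketch the two exception classes are local stand-ins for the ones named above, and treating exactly ALREADY_DONE and DONE as success is an assumption based on the clientState values listed for download_pfns.

```python
from typing import Any


class NoFilesDownloaded(Exception):
    """Local stand-in for the exception of the same name."""


class NotAllFilesDownloaded(Exception):
    """Local stand-in for the exception of the same name."""


# Assumption: these two states count as a successful download.
SUCCESS_STATES = {"ALREADY_DONE", "DONE"}


def check_output(output_items: list[dict[str, Any]],
                 deactivate_file_download_exceptions: bool = False) -> list[dict[str, Any]]:
    """Raise if downloads failed (unless deactivated), else return the items."""
    successful = sum(
        1 for item in output_items if item.get("clientState") in SUCCESS_STATES
    )
    failed = len(output_items) - successful
    if not deactivate_file_download_exceptions:
        if successful == 0:
            raise NoFilesDownloaded()
        if failed > 0:
            raise NotAllFilesDownloaded()
    return output_items
```

With exceptions deactivated the items are returned unchanged, leaving it to the caller to inspect each clientState.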

_send_trace(trace)

Checks whether sending traces is allowed and, if so, sends the trace.

Parameters:

Name Type Description Default
trace dict[str, Any]

the trace

required

preferred_impl(sources)

Finds the optimal protocol implementation preferred by the client and supported by the remote RSE.

Parameters:

Name Type Description Default
sources list[dict[str, Any]]

List of sources for a given DID

required

Raises:

Type Description
RucioException(msg)

general exception with msg for more details.