Downloadclient
FileDownloadState
The state a file can be in before/while/after downloading.
BaseExtractionTool(program_name, useability_check_args, extract_args, logger=logging.log)
Initialises an extraction tool object.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
program_name | str | the name of the archive extraction program, e.g., unzip | required |
useability_check_args | str | the arguments passed to the extraction program to test whether it is installed, e.g., --version | required |
extract_args | str | the arguments that will be passed to the program for extraction | required |
logger | LoggerFunction | optional decorated logging.log object that can be passed from the calling daemon or client. | log |
is_useable()
Checks if the extraction tool is installed and usable.
Returns:
Type | Description |
---|---|
bool | True if it is usable, otherwise False |
try_extraction(archive_file_path, file_to_extract, dest_dir_path)
Calls the extraction program to extract a file from an archive.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
archive_file_path | str | path to the archive | required |
file_to_extract | str | file name to extract from the archive | required |
dest_dir_path | str | destination directory where the extracted file will be stored | required |
Returns:
Type | Description |
---|---|
bool | True on success, otherwise False |
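The following is a minimal sketch of how a wrapper with this interface could behave, not Rucio's implementation. The class name SketchExtractionTool and the {archive}, {file} and {dest} placeholders in extract_args are assumptions made for illustration. Both methods return True only when the external program starts and exits with status 0.

```python
import subprocess
import sys


class SketchExtractionTool:
    """Illustrative stand-in for an extraction tool wrapper (not Rucio's class)."""

    def __init__(self, program_name, useability_check_args, extract_args):
        self.program_name = program_name
        self.useability_check_args = useability_check_args
        self.extract_args = extract_args

    def _run(self, args):
        # Return True only if the program starts and exits with status 0.
        try:
            result = subprocess.run(
                [self.program_name, *args],
                stdout=subprocess.DEVNULL,
                stderr=subprocess.DEVNULL,
                check=False,
            )
        except OSError:
            # Raised when the program is missing or not executable.
            return False
        return result.returncode == 0

    def is_useable(self):
        return self._run(self.useability_check_args.split())

    def try_extraction(self, archive_file_path, file_to_extract, dest_dir_path):
        # Hypothetical placeholders are filled after splitting the template,
        # so paths containing spaces stay in one argument.
        args = [
            part.format(archive=archive_file_path, file=file_to_extract, dest=dest_dir_path)
            for part in self.extract_args.split()
        ]
        return self._run(args)


# Usage: the running Python interpreter stands in for an archive program.
tool = SketchExtractionTool(sys.executable, "--version", "-c pass")
usable = tool.is_useable()
```

Returning a boolean instead of raising lets a caller try several tools in turn and fall back to the next one.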
DownloadClient(client=None, logger=None, tracing=True, check_admin=False, check_pcache=False)
Initialises the basic settings for a DownloadClient object.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
client | Optional[Client] | Optional: rucio.client.client.Client object. If None, a new object will be created. | None |
external_traces | | Optional: reference to a list where traces can be added | required |
logger | Optional[LoggerFunction] | Optional: logging.Logger object. If None, default logger will be used. | None |
download_pfns(items, num_threads=2, trace_custom_fields=None, traces_copy_out=None, deactivate_file_download_exceptions=False)
Download items with a given PFN. This function can only download files, not datasets.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
items | list[dict[str, Any]] | List of dictionaries, each describing a file to download. Keys: pfn - PFN string of this file; did - DID string of this file (e.g. 'scope:file.name'), wildcards are not allowed; rse - RSE name (e.g. 'CERN-PROD_DATADISK'), RSE expressions are not allowed; base_dir - Optional: base directory where the downloaded files will be stored (Default: '.'); no_subdir - Optional: if true, files are written directly into base_dir (Default: False); adler32 - Optional: the adler32 checksum to compare the downloaded file's adler32 checksum with; md5 - Optional: the md5 checksum to compare the downloaded file's md5 checksum with; transfer_timeout - Optional: timeout for the download protocols (Default: None); check_local_with_filesize_only - Optional: if true, already downloaded files will not be validated by checksum. | required |
num_threads | int | Suggested number of threads to use for the download. It will be lowered if it's too high. | 2 |
trace_custom_fields | Optional[dict[str, Any]] | Custom key value pairs to send with the traces | None |
traces_copy_out | Optional[list[dict[str, Any]]] | reference to an external list, where the traces should be uploaded | None |
deactivate_file_download_exceptions | bool | If True, file download exceptions are not raised | False |
Returns:
Type | Description |
---|---|
list[dict[str, Any]] | a list of dictionaries with an entry for each file, containing the input options, the did, and the clientState. clientState can be one of the following: ALREADY_DONE, DONE, FILE_NOT_FOUND, FAIL_VALIDATE, FAILED |
Raises:
Type | Description |
---|---|
InputValidationError | if one of the input items is in the wrong format |
NoFilesDownloaded | if no files could be downloaded |
NotAllFilesDownloaded | if not all files could be downloaded |
RucioException | if something unexpected went wrong during the download |
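A caller usually wants to know which files did not arrive. The sketch below summarises a result list by clientState. The helper name summarise_results and the sample DIDs are made up for illustration, and treating ALREADY_DONE and DONE as the only successful states is an assumption of this sketch.

```python
from collections import Counter

# States listed in the return description above; treating the first two as
# success is an assumption of this sketch.
SUCCESS_STATES = {"ALREADY_DONE", "DONE"}


def summarise_results(results):
    """Count results per clientState and collect the DIDs that did not succeed."""
    counts = Counter(item["clientState"] for item in results)
    failed = [item["did"] for item in results if item["clientState"] not in SUCCESS_STATES]
    return counts, failed


# Hand-written sample in the shape described by the return value above.
results = [
    {"did": "user.jdoe:file_a", "clientState": "DONE"},
    {"did": "user.jdoe:file_b", "clientState": "ALREADY_DONE"},
    {"did": "user.jdoe:file_c", "clientState": "FAIL_VALIDATE"},
]
counts, failed = summarise_results(results)
```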
download_dids(items, num_threads=2, trace_custom_fields=None, traces_copy_out=None, deactivate_file_download_exceptions=False, sort=None)
Download items with given DIDs. This function can also download datasets and wildcarded DIDs.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
items | list[dict[str, Any]] | List of dictionaries, each describing an item to download. Keys: did - DID string of this file (e.g. 'scope:file.name'); filters - filter to select DIDs for download, optional if did is given; rse - Optional: RSE name (e.g. 'CERN-PROD_DATADISK') or RSE expression from where to download; impl - Optional: name of the protocol implementation to be used to download this item; no_resolve_archives - Optional: bool indicating whether archives should not be considered for download (Default: False); resolve_archives - Deprecated: use no_resolve_archives instead; force_scheme - Optional: force a specific scheme to download this item (Default: None); base_dir - Optional: base directory where the downloaded files will be stored (Default: '.'); no_subdir - Optional: if true, files are written directly into base_dir (Default: False); nrandom - Optional: if the DID addresses a dataset, nrandom files will be randomly chosen for download from the dataset; ignore_checksum - Optional: if true, skips the checksum validation between the downloaded file and the Rucio catalogue (Default: False); transfer_timeout - Optional: timeout for the download protocols (Default: None); transfer_speed_timeout - Optional: minimum allowed transfer speed in KBps, ignored if transfer_timeout is set, otherwise used to compute the default timeout (Default: 500); check_local_with_filesize_only - Optional: if true, already downloaded files will not be validated by checksum. | required |
num_threads | int | Suggested number of threads to use for the download. It will be lowered if it's too high. | 2 |
trace_custom_fields | Optional[dict[str, Any]] | Custom key value pairs to send with the traces. | None |
traces_copy_out | Optional[list[dict[str, Any]]] | reference to an external list, where the traces should be uploaded | None |
deactivate_file_download_exceptions | bool | If True, file download exceptions are not raised | False |
sort | Optional[SORTING_ALGORITHMS_LITERAL] | Select best replica by replica sorting algorithm. Available algorithms: | None |
Returns:
Type | Description |
---|---|
list[dict[str, Any]] | a list of dictionaries with an entry for each file, containing the input options, the did, and the clientState |
Raises:
Type | Description |
---|---|
InputValidationError | if one of the input items is in the wrong format |
NoFilesDownloaded | if no files could be downloaded |
NotAllFilesDownloaded | if not all files could be downloaded |
RucioException | if something unexpected went wrong during the download |
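As a hedged usage sketch, the code below builds an items list with the did, base_dir and rse keys documented above and passes it to any object that offers download_dids(items, num_threads=...). The helpers build_did_items and download, the FakeClient stand-in, and the sample DID are all hypothetical. With a real installation, a DownloadClient instance would take the place of FakeClient.

```python
def build_did_items(dids, base_dir=".", rse=None):
    """Build one item dictionary per DID using keys documented above."""
    items = []
    for did in dids:
        item = {"did": did, "base_dir": base_dir}
        if rse is not None:
            item["rse"] = rse
        items.append(item)
    return items


def download(client, dids, **item_options):
    # `client` only needs a download_dids(items, num_threads=...) method,
    # matching the signature documented above.
    items = build_did_items(dids, **item_options)
    return client.download_dids(items, num_threads=2)


class FakeClient:
    """Stand-in so the sketch runs without a Rucio server."""

    def download_dids(self, items, num_threads=2):
        # Pretend every item was downloaded successfully.
        return [dict(item, clientState="DONE") for item in items]


results = download(FakeClient(), ["user.jdoe:file_a"], base_dir="/tmp/data")
```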
download_from_metalink_file(item, metalink_file_path, num_threads=2, trace_custom_fields=None, traces_copy_out=None, deactivate_file_download_exceptions=False)
Download items using a given metalink file.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
item | dict[str, Any] | Dictionary describing an item to download. Keys: base_dir - Optional: base directory where the downloaded files will be stored (Default: '.'); no_subdir - Optional: if true, files are written directly into base_dir (Default: False); ignore_checksum - Optional: if true, skips the checksum validation between the downloaded file and the Rucio catalogue (Default: False); transfer_timeout - Optional: timeout for the download protocols (Default: None); check_local_with_filesize_only - Optional: if true, already downloaded files will not be validated by checksum. | required |
metalink_file_path | | | required |
num_threads | int | Suggested number of threads to use for the download. It will be lowered if it's too high. | 2 |
trace_custom_fields | Optional[dict[str, Any]] | Custom key value pairs to send with the traces. | None |
traces_copy_out | Optional[list[dict[str, Any]]] | reference to an external list, where the traces should be uploaded | None |
deactivate_file_download_exceptions | bool | If True, file download exceptions are not raised | False |
Returns:
Type | Description |
---|---|
list[dict[str, Any]] | a list of dictionaries with an entry for each file, containing the input options, the did, and the clientState |
Raises:
Type | Description |
---|---|
InputValidationError | if one of the input items is in the wrong format |
NoFilesDownloaded | if no files could be downloaded |
NotAllFilesDownloaded | if not all files could be downloaded |
RucioException | if something unexpected went wrong during the download |
_download_multithreaded(input_items, num_threads, trace_custom_fields=None, traces_copy_out=None)
Starts an appropriate number of threads to download items from the input list. (This function is meant to be used as class internal only)
Parameters:
Name | Type | Description | Default |
---|---|---|---|
input_items | list[dict[str, Any]] | list containing the input items to download | required |
num_threads | int | suggestion of how many threads should be started | required |
trace_custom_fields | Optional[dict[str, Any]] | Custom key value pairs to send with the traces | None |
traces_copy_out | Optional[list[dict[str, Any]]] | reference to an external list, where the traces should be uploaded | None |
Returns:
Type | Description |
---|---|
list[dict[str, Any]] | list with output items as dictionaries |
_download_worker(input_queue, output_queue, trace_custom_fields, traces_copy_out, log_prefix)
This function runs as long as there are items in the input queue, downloads them, and stores the output in the output queue. (This function is meant to be used as class internal only)
Parameters:
Name | Type | Description | Default |
---|---|---|---|
input_queue | Queue | queue containing the input items to download | required |
output_queue | Queue | queue where the output items will be stored | required |
trace_custom_fields | dict[str, Any] | Custom key value pairs to send with the traces | required |
traces_copy_out | Optional[list[dict[str, Any]]] | reference to an external list, where the traces should be uploaded | required |
log_prefix | str | string that will be put at the beginning of every log message | required |
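The queue-based pattern described for _download_multithreaded and _download_worker can be sketched with the standard library alone. This is an illustration of the pattern, not Rucio's code: the function names, the download_one callback, and the rule that caps the thread count at the number of items are assumptions.

```python
import queue
import threading


def download_worker(input_queue, output_queue, download_one):
    # Runs until the input queue is empty, then returns.
    while True:
        try:
            item = input_queue.get_nowait()
        except queue.Empty:
            return
        output_queue.put(download_one(item))


def download_multithreaded(input_items, num_threads, download_one):
    input_queue = queue.Queue()
    output_queue = queue.Queue()
    for item in input_items:
        input_queue.put(item)

    # Assumption: never start more threads than there are items.
    num_threads = max(1, min(num_threads, len(input_items)))
    threads = [
        threading.Thread(target=download_worker, args=(input_queue, output_queue, download_one))
        for _ in range(num_threads)
    ]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()

    # All workers have finished, so the output queue can be drained safely.
    results = []
    while not output_queue.empty():
        results.append(output_queue.get())
    return results
```

Because workers finish in no fixed order, the output list is not guaranteed to follow the input order. Callers that need to match results to inputs should do so by DID.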
_compute_actual_transfer_timeout(item)
staticmethod
Merge the two options related to timeout into the value which will be used for protocol download.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
item | dict[str, Any] | dictionary that describes the item to download | required |
Returns:
Type | Description |
---|---|
int | timeout in seconds |
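One way to read the merge, based on the option descriptions for transfer_timeout and transfer_speed_timeout, is sketched below. The exact formula, the bytes key holding the file size, and the fallback of 360 seconds are assumptions of this sketch and may differ from the actual implementation.

```python
import math

# Assumed fallback when neither an explicit timeout nor the file size is known.
FALLBACK_TIMEOUT_SECONDS = 360
# Documented default of transfer_speed_timeout, in KBps.
DEFAULT_SPEED_KBPS = 500


def compute_transfer_timeout(item):
    explicit = item.get("transfer_timeout")
    if explicit is not None:
        # An explicit timeout wins; the speed-based option is ignored.
        return int(explicit)

    size_bytes = item.get("bytes")  # hypothetical key holding the file size
    if not size_bytes:
        return FALLBACK_TIMEOUT_SECONDS

    speed_kbps = item.get("transfer_speed_timeout") or DEFAULT_SPEED_KBPS
    # Seconds needed to move the file at the minimum allowed speed.
    return math.ceil(size_bytes / 1000 / speed_kbps)
```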
_download_item(item, trace, traces_copy_out, log_prefix='')
Downloads the given item and sends traces for success/failure. (This function is meant to be used as class internal only)
Parameters:
Name | Type | Description | Default |
---|---|---|---|
item | dict[str, Any] | dictionary that describes the item to download | required |
trace | dict[str, Any] | dictionary representing a pattern of the trace that will be sent | required |
traces_copy_out | Optional[list[dict[str, Any]]] | reference to an external list, where the traces should be uploaded | required |
log_prefix | str | string that will be put at the beginning of every log message | '' |
Returns:
Type | Description |
---|---|
dict[str, Any] | dictionary with all attributes from the input item and a clientState attribute |
download_aria2c(items, trace_custom_fields=None, filters=None, deactivate_file_download_exceptions=False, sort=None)
Uses aria2c to download the items with given DIDs. This function can also download datasets and wildcarded DIDs. It can only download files that are available via https/davs. Aria2c needs to be installed and X509_USER_PROXY needs to be set!
Parameters:
Name | Type | Description | Default |
---|---|---|---|
items | list[dict[str, Any]] | List of dictionaries, each describing an item to download. Keys: did - DID string of this file (e.g. 'scope:file.name'), wildcards are not allowed; rse - Optional: RSE name (e.g. 'CERN-PROD_DATADISK') or RSE expression from where to download; base_dir - Optional: base directory where the downloaded files will be stored (Default: '.'); no_subdir - Optional: if true, files are written directly into base_dir (Default: False); nrandom - Optional: if the DID addresses a dataset, nrandom files will be randomly chosen for download from the dataset; ignore_checksum - Optional: if true, skips the checksum validation between the downloaded file and the Rucio catalogue (Default: False); check_local_with_filesize_only - Optional: if true, already downloaded files will not be validated by checksum. | required |
trace_custom_fields | Optional[dict[str, Any]] | Custom key value pairs to send with the traces | None |
filters | Optional[dict[str, Any]] | dictionary containing filter options | None |
deactivate_file_download_exceptions | bool | If True, file download exceptions are not raised | False |
sort | Optional[SORTING_ALGORITHMS_LITERAL] | Select best replica by replica sorting algorithm. Available algorithms: | None |
Returns:
Type | Description |
---|---|
list[dict[str, Any]] | a list of dictionaries with an entry for each file, containing the input options, the did, and the clientState |
Raises:
Type | Description |
---|---|
InputValidationError | if one of the input items is in the wrong format |
NoFilesDownloaded | if no files could be downloaded |
NotAllFilesDownloaded | if not all files could be downloaded |
RucioException | if something went wrong during the download (e.g. aria2c could not be started) |
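Since this function depends on an installed aria2c and a set X509_USER_PROXY, a caller may want to check both before invoking it. The sketch below is one hedged way to do that with the standard library. The helper name and the injectable environ and which parameters are hypothetical and exist only to make the check easy to test.

```python
import os
import shutil


def aria2c_prerequisite_problems(environ=None, which=shutil.which):
    """Return a list of human-readable problems; an empty list means ready."""
    environ = os.environ if environ is None else environ
    problems = []
    if which("aria2c") is None:
        problems.append("aria2c executable not found on PATH")
    if not environ.get("X509_USER_PROXY"):
        problems.append("X509_USER_PROXY is not set")
    return problems
```

Calling aria2c_prerequisite_problems() without arguments inspects the real environment and PATH.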
_start_aria2c_rpc(rpc_secret)
Starts aria2c in RPC mode as a subprocess. Also creates the RPC proxy instance. (This function is meant to be used as class internal only)
Parameters:
Name | Type | Description | Default |
---|---|---|---|
rpc_secret | str | the secret for the RPC proxy | required |
Returns:
Type | Description |
---|---|
tuple[Popen, ServerProxy] | a tuple with the process and the RPC proxy objects |
Raises:
Type | Description |
---|---|
RucioException | if the process or the proxy could not be created |
_download_items_aria2c(items, aria_rpc, rpc_auth, trace_custom_fields=None)
Uses aria2c to download the given items. Aria2c needs to be started as an RPC background process first, and an RPC proxy is needed. (This function is meant to be used as class internal only)
Parameters:
Name | Type | Description | Default |
---|---|---|---|
items | list[dict[str, Any]] | list of dictionaries containing one dict for each file to download | required |
aria_rpc | | RPC proxy to the aria2c process | required |
rpc_auth | str | the RPC authentication token | required |
trace_custom_fields | Optional[dict[str, Any]] | Custom key value pairs to send with the traces | None |
Returns:
Type | Description |
---|---|
list[dict[str, Any]] | a list of dictionaries with an entry for each file, containing the input options, the did, and the clientState |
_resolve_one_item_dids(item)
Resolves scopes or wildcard DIDs to lists of full DID names.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
item | dict[str, Any] | One input item | required |
_resolve_and_merge_input_items(input_items, sort=None)
Takes the input items given to download_dids etc. and resolves the sources.
- It first performs a list_dids call to dereference any wildcards and retrieve DID stats (size, length, type).
- Next, input items are grouped together by common list_replicas options. For each group, a single list_replicas call is performed.
- The resolved file DIDs with sources are finally mapped back to the initial input items so that download options (timeout, destination directories, etc.) can be retrieved correctly.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
input_items | list[dict[str, Any]] | List of dictionaries, each describing an input item | required |
Returns:
Type | Description |
---|---|
tuple[dict[str, Any], list[dict[str, Any]]] | a tuple of: a dictionary that maps the dereferenced (wildcard-free) input DIDs to a list of input items, and a list with a dictionary for each file DID which has to be downloaded |
Raises:
Type | Description |
---|---|
InputValidationError | if one of the input items is in the wrong format |
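The grouping step in the list above, where items sharing the same lookup options are served by a single list_replicas call, can be sketched as follows. Which item keys actually influence the lookup is not stated here, so the choice of rse and force_scheme is an assumption, and the function name is hypothetical.

```python
from collections import defaultdict

# Assumption: these two item keys are the ones that influence the replica lookup.
GROUPING_KEYS = ("rse", "force_scheme")


def group_by_lookup_options(input_items):
    """Map each distinct option combination to the DIDs that share it."""
    groups = defaultdict(list)
    for item in input_items:
        key = tuple(item.get(name) for name in GROUPING_KEYS)
        groups[key].append(item["did"])
    return dict(groups)


groups = group_by_lookup_options([
    {"did": "s:a", "rse": "SITE_A"},
    {"did": "s:b", "rse": "SITE_A"},
    {"did": "s:c", "force_scheme": "root"},
])
```

Each key of the resulting dictionary would then correspond to one replica lookup covering all DIDs in its list.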
_options_from_input_items(input_items)
Best-effort generation of download options from multiple input items which resolve to the same file DID. This is done to download each file DID only once, even if it is requested multiple times via overlapping datasets and/or wildcard resolutions in distinct input items.
Some options can be easily merged. For example, multiple base_dir values are all appended to a list. As a result, the file is downloaded once and copied to all desired destinations. Other options are not necessarily compatible, for example two items requesting two different values for the download timeout. We do our best to merge the options in such cases.
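A minimal sketch of such a merge is shown below. Collecting every distinct (base_dir, no_subdir) pair follows the description above. Resolving conflicting timeouts by keeping the largest value is only one possible policy and is an assumption of this sketch, as are the function name and the destinations key.

```python
def merge_options(input_items):
    """Merge the options of several input items that resolve to one file DID."""
    merged = {"destinations": [], "transfer_timeout": None}
    for item in input_items:
        # Every distinct destination is kept so the file can be copied to each.
        dest = (item.get("base_dir", "."), bool(item.get("no_subdir", False)))
        if dest not in merged["destinations"]:
            merged["destinations"].append(dest)

        # Assumed conflict policy: the most permissive (largest) timeout wins.
        timeout = item.get("transfer_timeout")
        if timeout is not None:
            current = merged["transfer_timeout"]
            merged["transfer_timeout"] = timeout if current is None else max(current, timeout)
    return merged


merged = merge_options([
    {"base_dir": "a", "transfer_timeout": 60},
    {"base_dir": "b", "transfer_timeout": 120},
    {"base_dir": "a"},
])
```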
_prepare_items_for_download(did_to_input_items, file_items)
Optimises the number of files to download. (This function is meant to be used as class internal only)
Parameters:
Name | Type | Description | Default |
---|---|---|---|
did_to_input_items | dict[str, Any] | dictionary that maps resolved input DIDs to input items | required |
file_items | list[dict[str, Any]] | list of dictionaries, each describing a file DID to download | required |
Returns:
Type | Description |
---|---|
list[dict[str, Any]] | list of dictionaries, each describing an element to download |
Raises:
Type | Description |
---|---|
InputValidationError | if the given input is not valid or incomplete |
_split_did_str(did_str)
Splits a given DID string (e.g. 'scope1:name.file') into its scope and name part. (This function is meant to be used as class internal only)
Parameters:
Name | Type | Description | Default |
---|---|---|---|
did_str | str | the DID string that will be split | required |
Returns:
Type | Description |
---|---|
tuple[str, str] | the scope and name parts of the given DID |
Raises:
Type | Description |
---|---|
InputValidationError | if the given DID string is not valid |
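For the 'scope:name' form shown above, the split can be sketched as follows. This sketch is stricter than the real method may be: it requires exactly one colon, and it raises ValueError as a stand-in for InputValidationError so that it runs without Rucio installed.

```python
def split_did_str(did_str):
    """Split 'scope:name' into (scope, name); reject anything else."""
    scope, sep, name = did_str.partition(":")
    if not sep or not scope or not name or ":" in name:
        # ValueError stands in for Rucio's InputValidationError in this sketch.
        raise ValueError(f"invalid DID string: {did_str!r}")
    return scope, name


scope, name = split_did_str("scope1:name.file")
```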
_prepare_dest_dir(base_dir, dest_dir_name, no_subdir)
Builds the final destination path for a file and creates the destination directory if it does not exist. (This function is meant to be used as class internal only)
Parameters:
Name | Type | Description | Default |
---|---|---|---|
base_dir | str | base directory part | required |
dest_dir_name | str | name of the destination directory | required |
no_subdir | Optional[bool] | if no subdirectory should be created | required |
Returns:
Type | Description |
---|---|
str | the absolute path of the destination directory |
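The behaviour described above maps onto two standard-library calls, as in this hedged sketch. The function name and the example directory name user.jdoe are illustrative. The sketch assumes that no_subdir simply means the base directory itself is the destination.

```python
import os
import tempfile


def prepare_dest_dir(base_dir, dest_dir_name, no_subdir=False):
    """Return the absolute destination directory, creating it if needed."""
    if no_subdir:
        dest_dir_path = os.path.abspath(base_dir)
    else:
        dest_dir_path = os.path.abspath(os.path.join(base_dir, dest_dir_name))
    # exist_ok avoids an error when the directory is already there.
    os.makedirs(dest_dir_path, exist_ok=True)
    return dest_dir_path


# Usage inside a throwaway directory so nothing is left behind.
with tempfile.TemporaryDirectory() as base:
    path = prepare_dest_dir(base, "user.jdoe")
    created = os.path.isdir(path)
```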
_check_output(output_items, deactivate_file_download_exceptions=False)
Checks if all files were successfully downloaded. (This function is meant to be used as class internal only)
Parameters:
Name | Type | Description | Default |
---|---|---|---|
output_items | list[dict[str, Any]] | list of dictionaries describing the downloaded files | required |
deactivate_file_download_exceptions | bool | If True, file download exceptions are not raised | False |
Returns:
Type | Description |
---|---|
list[dict[str, Any]] | the output_items list |
Raises:
Type | Description |
---|---|
NoFilesDownloaded | if no files could be downloaded |
NotAllFilesDownloaded | if not all files could be downloaded |
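The check can be sketched as below. The two exception classes are local stand-ins for NoFilesDownloaded and NotAllFilesDownloaded so the example runs on its own, and counting only ALREADY_DONE and DONE as success is an assumption of this sketch.

```python
class NoFilesDownloadedSketch(Exception):
    """Local stand-in for Rucio's NoFilesDownloaded."""


class NotAllFilesDownloadedSketch(Exception):
    """Local stand-in for Rucio's NotAllFilesDownloaded."""


# Assumption: only these two client states count as a successful download.
SUCCESS_STATES = {"ALREADY_DONE", "DONE"}


def check_output(output_items, deactivate_file_download_exceptions=False):
    succeeded = sum(1 for item in output_items if item.get("clientState") in SUCCESS_STATES)
    if not deactivate_file_download_exceptions:
        if succeeded == 0:
            raise NoFilesDownloadedSketch()
        if succeeded < len(output_items):
            raise NotAllFilesDownloadedSketch()
    # The list is returned unchanged so callers can inspect each clientState.
    return output_items
```

With deactivate_file_download_exceptions=True the caller receives the full list even when some downloads failed and is expected to check each clientState.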
_send_trace(trace)
Checks if sending traces is allowed and sends the trace.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
trace | dict[str, Any] | the trace | required |
preferred_impl(sources)
Finds the optimum protocol implementation preferred by the client and supported by the remote RSE.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
sources | list[dict[str, Any]] | List of sources for a given DID | required |
Raises:
Type | Description |
---|---|
RucioException(msg) | general exception with msg for more details. |