Fts3

Fts3TransferStatusReport(external_host, request_id, request=None)

_find_used_source_rse(session, logger)

For multi-source transfers, FTS has a choice between multiple sources. Find which of the possible sources FTS actually used for the transfer.

_dst_file_set_and_file_corrupted(request, dst_file) staticmethod

Returns True if the dst_file dict returned by FTS is populated and its content confirms that the file is corrupted.

_dst_file_set_and_file_correct(request, dst_file) staticmethod

Returns True if the dst_file dict returned by FTS is populated and its content confirms that the file is correct.
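The two checks above can be pictured as comparing what FTS reports about the destination file with what Rucio expects. The following is only a minimal sketch of that idea, not the actual implementation: the helper name `dst_file_state` and every dictionary key (`file_size`, `checksum_value`, `bytes`, `adler32`) are hypothetical stand-ins.

```python
from typing import Any, Optional


def dst_file_state(request: dict[str, Any], dst_file: Optional[dict[str, Any]]) -> str:
    """Classify the destination file as 'unknown', 'correct' or 'corrupted'.

    All dictionary keys used here are hypothetical, chosen for illustration.
    """
    if not dst_file:
        # FTS did not fill in dst_file: nothing can be affirmed either way.
        return "unknown"
    size = dst_file.get("file_size")
    checksum = dst_file.get("checksum_value")
    if size is None or checksum is None:
        return "unknown"
    if size == request["bytes"] and checksum == request["adler32"]:
        return "correct"
    return "corrupted"
```

Note that an empty or missing dst_file yields neither "correct" nor "corrupted", which mirrors the "populated and ..." wording of both descriptions.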

_is_recoverable_fts_overwrite_error(request, reason, file_metadata) classmethod

Checks for the special case in which FTS cannot copy a file because the destination already exists and overwrite is disabled, but the destination file is actually correct.

This can happen when a transient error occurred during a previous submission attempt: FTS executed the transfer correctly, but Rucio does not know about it.

Returns True when the request must be marked as successful even though FTS reported it as failed.
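As a rough illustration of the decision described above, the sketch below treats a failure as recoverable only when the failure reason is the overwrite error and the destination file has been confirmed correct. The function name, the exact error string, and the boolean argument are assumptions made for this example.

```python
# Assumed wording of the FTS failure reason; the real message may differ.
OVERWRITE_ERROR = "Destination file exists and overwrite is not enabled"


def is_recoverable_overwrite_error(reason: str, dst_file_is_correct: bool) -> bool:
    """Return True if a failed transfer can be treated as successful."""
    if not reason or OVERWRITE_ERROR not in reason:
        # Any other failure is a genuine failure.
        return False
    # The copy was refused, but the file already at the destination is good.
    return dst_file_is_correct
```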

FTS3CompletionMessageTransferStatusReport(external_host, request_id, fts_message)

Parses FTS completion messages received via the message queue.

FTS3ApiTransferStatusReport(external_host, request_id, job_response, file_response, request=None)

Parses the FTS API response.

FTS3Transfertool(external_host, oidc_support=False, vo=None, group_bulk=1, group_policy='rule', source_strategy=None, max_time_in_queue=None, bring_online=43200, default_lifetime=172800, archive_timeout_override=None, logger=logging.log)

FTS3 implementation of a Rucio transfertool

Initializes the transfertool

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| external_host | str | The external host where the transfertool API is running | required |

_pick_fts_servers(source_rse, dest_rse) classmethod

Pick the FTS servers to use for submission between the two given RSEs.

submit(transfers, job_params, timeout=None)

Submit transfers to FTS3 via JSON.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| transfers | | List of dictionaries describing the file transfers. | required |
| job_params | dict[str, str] | Dictionary of key/value job parameters applied to all transfers. | required |
| timeout | Optional[int] | Timeout in seconds. | None |

Returns:

| Type | Description |
| --- | --- |
| str | FTS transfer identifier. |
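To make "via JSON" more concrete, here is a minimal sketch of how a bulk submission body could be assembled from a list of transfers and a set of job parameters. The top-level `files`/`params` layout follows the FTS3 bulk submission format; the helper name and the per-transfer keys (`source_url`, `dest_url`, `bytes`) are hypothetical and not taken from the transfertool itself.

```python
import json
from typing import Any


def build_submission_payload(transfers: list[dict[str, Any]], job_params: dict[str, str]) -> str:
    """Serialize transfers and job parameters into a single JSON job body."""
    files = [
        {
            "sources": [t["source_url"]],        # hypothetical input key
            "destinations": [t["dest_url"]],     # hypothetical input key
            "filesize": t["bytes"],              # hypothetical input key
        }
        for t in transfers
    ]
    # job_params apply to every file in the job.
    return json.dumps({"files": files, "params": job_params})
```

No network call is made here; the sketch only shows the shape of the data that would be posted.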

cancel(transfer_ids, timeout=None)

Cancel transfers that have been submitted to FTS3.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| transfer_ids | Sequence[str] | FTS transfer identifiers as a list of strings. | required |
| timeout | Optional[int] | Timeout in seconds. | None |

Returns:

| Type | Description |
| --- | --- |
| dict[str, Any] | True if cancellation was successful. |

update_priority(transfer_id, priority, timeout=None)

Update the priority of a transfer that has been submitted to FTS via JSON.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| transfer_id | str | FTS transfer identifier as a string. | required |
| priority | int | FTS job priority as an integer from 1 to 5. | required |
| timeout | Optional[int] | Timeout in seconds. | None |

Returns:

| Type | Description |
| --- | --- |
| dict[str, Any] | True if update was successful. |
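Since the priority must be an integer from 1 to 5, a caller may want to validate it before any request is sent. The sketch below does that and builds a JSON body; the helper name, the `/jobs/<id>` path, and the body layout are assumptions for illustration and should be checked against the FTS3 REST documentation.

```python
import json


def build_priority_update(transfer_id: str, priority: int) -> tuple[str, str]:
    """Return an (assumed) request path and JSON body for a priority change."""
    if not 1 <= priority <= 5:
        raise ValueError(f"priority must be between 1 and 5, got {priority}")
    path = f"/jobs/{transfer_id}"                          # assumed endpoint
    body = json.dumps({"params": {"priority": priority}})  # assumed layout
    return path, body
```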

query(transfer_ids, details=False, timeout=None)

Query the status of a transfer in FTS3 via JSON.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| transfer_ids | Sequence[str] | FTS transfer identifiers as a list of strings. | required |
| details | bool | Whether detailed information should be listed. | False |
| timeout | Optional[int] | Timeout in seconds. | None |

Returns:

| Type | Description |
| --- | --- |
| Union[Optional[dict[str, Any]], list[dict[str, Any]]] | Transfer status information as a list of dictionaries. |

whoami()

Returns credential information from the FTS3 server.

Returns:

| Type | Description |
| --- | --- |
| dict[str, Any] | Credentials as stored by the FTS3 server as a dictionary. |

version()

Returns FTS3 server information.

Returns:

| Type | Description |
| --- | --- |
| dict[str, Any] | FTS3 server information as a dictionary. |

bulk_query(requests_by_eid, timeout=None)

Query the status of a bulk of transfers in FTS3 via JSON.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| requests_by_eid | dict[str, dict[str, dict[str, Any]]] | Dictionary {external_id1: {request_id1: request1, ...}, ...} of the requests to be queried. | required |

Returns:

| Type | Description |
| --- | --- |
| dict[str, Any] | Transfer status information as a dictionary. |
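The nested shape of requests_by_eid is easier to see with a small example. The sketch below builds such a dictionary and splits its external ids into fixed-size chunks, as a caller might do before querying in batches. The chunking step and the helper name are illustrative assumptions, not a description of what bulk_query does internally.

```python
from typing import Any


def chunk_external_ids(
    requests_by_eid: dict[str, dict[str, dict[str, Any]]], chunk_size: int = 2
) -> list[list[str]]:
    """Split the external (FTS job) ids into lists of at most chunk_size."""
    external_ids = list(requests_by_eid)
    return [external_ids[i:i + chunk_size] for i in range(0, len(external_ids), chunk_size)]


# One FTS job (external id) can carry several Rucio requests.
example = {
    "job-1": {"req-a": {"state": "SUBMITTED"}, "req-b": {"state": "SUBMITTED"}},
    "job-2": {"req-c": {"state": "SUBMITTED"}},
    "job-3": {"req-d": {"state": "SUBMITTED"}},
}
chunks = chunk_external_ids(example)
```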

list_se_status()

Get the list of banned Storage Elements.

Returns:

| Type | Description |
| --- | --- |
| dict[str, Any] | Detailed dictionary of banned Storage Elements. |

get_se_config(storage_element)

Get the JSON response for the configuration of a storage element.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| storage_element | str | The storage element whose configuration is requested. | required |

Returns:

| Type | Description |
| --- | --- |
| dict[str, Any] | A JSON result for the configuration of a storage element. |

set_se_config(storage_element, inbound_max_active=None, outbound_max_active=None, inbound_max_throughput=None, outbound_max_throughput=None, staging=None)

Set the configuration for a storage element. Used for alleviating transfer failures due to timeout.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| storage_element | str | The storage element to be configured. | required |
| inbound_max_active | Optional[int] | Integer value to set as inbound_max_active for the SE. | None |
| outbound_max_active | Optional[int] | Integer value to set as outbound_max_active for the SE. | None |
| inbound_max_throughput | Optional[float] | Float value to set as inbound_max_throughput for the SE. | None |
| outbound_max_throughput | Optional[float] | Float value to set as outbound_max_throughput for the SE. | None |
| staging | Optional[int] | Integer value to set as the staging limit for operations on the SE. | None |

Returns:

| Type | Description |
| --- | --- |
| dict[str, Any] | JSON POST response on success; raises an Exception otherwise. |
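Because every limit defaults to None, it is natural to send only the values that were actually set. The sketch below shows one way to do this; the helper name and the nested `se_info` layout are assumptions for illustration only and may not match what the FTS3 server expects.

```python
from typing import Any, Optional


def build_se_config(
    storage_element: str,
    inbound_max_active: Optional[int] = None,
    outbound_max_active: Optional[int] = None,
    inbound_max_throughput: Optional[float] = None,
    outbound_max_throughput: Optional[float] = None,
) -> dict[str, Any]:
    """Collect only the limits that were explicitly provided."""
    limits = {
        "inbound_max_active": inbound_max_active,
        "outbound_max_active": outbound_max_active,
        "inbound_max_throughput": inbound_max_throughput,
        "outbound_max_throughput": outbound_max_throughput,
    }
    # Drop unset values so existing server-side settings are left untouched.
    se_info = {name: value for name, value in limits.items() if value is not None}
    return {storage_element: {"se_info": se_info}}  # assumed payload layout
```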

set_se_status(storage_element, message, ban=True, timeout=None)

Ban a Storage Element. Used when a site is in downtime. A timeout in seconds can be given, in which case the jobs wait for that long before being canceled. If no timeout is specified, the jobs are canceled immediately.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| storage_element | str | The Storage Element that will be banned. | required |
| message | str | The reason for the ban. | required |
| ban | bool | If True, ban the SE; if False, unban the SE. | True |
| timeout | Optional[int] | If None, the status 'cancel' is sent to FTS; otherwise 'waiting' is sent together with the corresponding timeout. | None |

Returns:

| Type | Description |
| --- | --- |
| int | 0 on success; raises an Exception otherwise. |
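The interaction between ban and timeout can be summarized in a few lines. This is a sketch under stated assumptions: the helper name and the payload keys are hypothetical, and the status strings are simply the ones quoted in the parameter description.

```python
from typing import Any, Optional


def build_se_status_request(
    storage_element: str, message: str, ban: bool = True, timeout: Optional[int] = None
) -> dict[str, Any]:
    """Describe the ban or unban request that would be sent (hypothetical layout)."""
    if not ban:
        return {"action": "unban", "storage": storage_element}
    payload: dict[str, Any] = {"action": "ban", "storage": storage_element, "message": message}
    if timeout is None:
        # No grace period: running jobs are canceled immediately.
        payload["status"] = "cancel"
    else:
        # Jobs are allowed to wait for `timeout` seconds before being canceled.
        payload["status"] = "waiting"
        payload["timeout"] = timeout
    return payload
```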

__get_transfer_baseid_voname()

Get transfer VO name from the external host.

__get_deterministic_id(sid)

Get deterministic FTS job id.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| sid | str | FTS seed id. | required |

Returns:

| Type | Description |
| --- | --- |
| Optional[str] | FTS transfer identifier. |
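A deterministic job id means that the same seed always maps to the same identifier, so a resubmission can be recognized as a duplicate. One common way to get this property is a name-based UUID. The sketch below chains two `uuid.uuid5` calls over a base id, a VO name, and the seed; this derivation and the function name are assumptions for illustration, not a confirmed description of the method above.

```python
import uuid


def deterministic_job_id(base_id: str, vo_name: str, sid: str) -> str:
    """Derive a reproducible UUID string from a base id, a VO name and a seed."""
    root = uuid.UUID(base_id)                  # namespace supplied by the server
    vo_namespace = uuid.uuid5(root, vo_name)   # one namespace per VO
    return str(uuid.uuid5(vo_namespace, sid))  # same seed -> same job id


BASE = "01234567-89ab-cdef-0123-456789abcdef"  # made-up example value
first = deterministic_job_id(BASE, "atlas", "seed-1")
second = deterministic_job_id(BASE, "atlas", "seed-1")
other = deterministic_job_id(BASE, "atlas", "seed-2")
```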

__query_details(transfer_id)

Query the detailed status of a transfer in FTS3 via JSON.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| transfer_id | str | FTS transfer identifier as a string. | required |

Returns:

| Type | Description |
| --- | --- |
| Optional[dict[str, Any]] | Detailed transfer status information as a dictionary. |

_scitags_ids(logger=logging.log)

Re-fetches the scitags ids if needed and returns them.

_configured_source_strategy(activity, logger)

Retrieve from the configuration the source selection strategy for the given activity

_available_checksums(transfer)

Get checksums which can be used for file validation on the source and the destination RSE

_hop_checksum_validation_strategy(transfer, logger)

Compute the checksum validation strategy (none, source, destination or both) for a single-hop transfer, depending on the checksums available at the source and at the destination.
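One plausible reading of the four outcomes is sketched below, using sets of checksum algorithm names for each side. The decision rules, including the choice to prefer the destination when both sides have checksums but share none, are assumptions for illustration; the real method may weigh additional RSE settings.

```python
def hop_checksum_strategy(source_checksums: set[str], dest_checksums: set[str]) -> str:
    """Return 'none', 'source', 'destination' or 'both' for a single hop."""
    if source_checksums & dest_checksums:
        # A common algorithm exists, so both ends can be verified consistently.
        return "both"
    if dest_checksums:
        # Assumed preference: validate what was written when no algorithm is shared.
        return "destination"
    if source_checksums:
        return "source"
    return "none"
```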

_path_checksum_validation_strategy(transfer_path, logger)

Compute the checksum validation strategy for the whole transfer path.

_pick_fts_checksum(transfer, path_strategy)

Pick the checksum to use for validating file integrity on this particular transfer hop. This function only works correctly for values of 'path_strategy' that are valid for the enclosing multi-hop transfer path.

Returns the checksum as a string in the format expected by the FTS bulk submission API.

_use_tokens(transfer_hop)

Whether a transfer can be performed with tokens.

For this to hold, every RSE involved must have tokens explicitly enabled, and the protocol in use must be WebDAV.
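The two conditions combine with a logical AND, as the following sketch shows. The function name, the per-RSE boolean flags, and the set of URL schemes treated as WebDAV are assumptions made for this example.

```python
# Assumed URL schemes that indicate a WebDAV endpoint.
WEBDAV_SCHEMES = {"davs", "https"}


def can_use_tokens(rse_token_flags: list[bool], scheme: str) -> bool:
    """True only if every involved RSE enables tokens and the scheme is WebDAV."""
    if not rse_token_flags:
        # No RSE information: do not assume tokens are allowed.
        return False
    return all(rse_token_flags) and scheme in WEBDAV_SCHEMES
```

A single RSE without the flag is enough to fall back to the non-token path.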

build_job_params(transfer_path, bring_online=None, default_lifetime=None, archive_timeout_override=None, max_time_in_queue=None, logger=logging.log)

Prepare the job parameters which will be passed to the FTS transfertool. Please refer to https://fts3-docs.web.cern.ch/fts3-docs/fts-rest/docs/bulk.html#parameters for the list of parameters.

bulk_group_transfers(transfer_paths, policy='rule', group_bulk=200, source_strategy=None, max_time_in_queue=None, logger=logging.log, archive_timeout_override=None, bring_online=None, default_lifetime=None)

Group transfers in bulk based on certain criteria.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| transfer_paths | Iterable[list[DirectTransfer]] | List of transfer paths to group. Each path is a list of single-hop transfers. | required |
| policy | str | Policy to use to group. | 'rule' |
| group_bulk | int | Bulk size. | 200 |
| source_strategy | Optional[str] | Strategy to group sources. | None |
| max_time_in_queue | Optional[dict] | Maximum time in queue. | None |
| archive_timeout_override | Optional[int] | Override the archive_timeout parameter for any transfers with it set (0 to unset). | None |
| logger | LoggerFunction | Optional decorated logger that can be passed from the calling daemons or servers. | log |

Returns:

| Type | Description |
| --- | --- |
| list[dict[str, Any]] | List of grouped transfers. |
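The general idea of grouping by a criterion and capping each group at group_bulk entries can be sketched as follows. This is a simplified illustration: the helper name and the use of a plain key function are assumptions, and the real method also derives job parameters and handles multi-hop paths.

```python
from collections import defaultdict
from typing import Any, Callable, Hashable


def group_in_bulks(
    transfers: list[dict[str, Any]],
    key: Callable[[dict[str, Any]], Hashable],
    group_bulk: int = 200,
) -> list[list[dict[str, Any]]]:
    """Group transfers sharing the same key, then cap each bulk at group_bulk."""
    groups: dict[Hashable, list[dict[str, Any]]] = defaultdict(list)
    for transfer in transfers:
        groups[key(transfer)].append(transfer)
    bulks = []
    for members in groups.values():
        # Split oversized groups so that no bulk exceeds group_bulk entries.
        for start in range(0, len(members), group_bulk):
            bulks.append(members[start:start + group_bulk])
    return bulks
```

For example, grouping by a hypothetical `rule_id` key with group_bulk=2 turns three transfers of one rule into two bulks of sizes 2 and 1.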