from __future__ import absolute_import, division, print_function, unicode_literals
from collections import defaultdict
import csv
from datetime import datetime
from fnmatch import fnmatchcase
import hashlib
import os
import re
import time
import uuid
from io import open
import requests
from requests.exceptions import ChunkedEncodingError, ConnectionError, ReadTimeout
from hca.util import USING_PYTHON2
from hca.util.compat import glob_escape
from ..util import SwaggerClient
from ..util.exceptions import SwaggerAPIException
from .. import logger
from .upload_to_cloud import upload_to_cloud
[docs]class DSSClient(SwaggerClient):
"""
Client for the Data Storage Service API.
"""
UPLOAD_BACKOFF_FACTOR = 1.618
def __init__(self, *args, **kwargs):
super(DSSClient, self).__init__(*args, **kwargs)
self.commands += [self.download, self.download_manifest, self.upload]
[docs] def download(self, bundle_uuid, replica, version="", dest_name="",
metadata_files=('*',), data_files=('*',),
initial_retries_left=10, min_delay_seconds=0.25):
"""
Download a bundle and save it to the local filesystem as a directory.
`metadata_files` (`--metadata-files` on the CLI) are one or more shell patterns against which all metadata
files in the bundle will be matched case-sensitively. A file is considered a metadata file if the `indexed`
property in the manifest is set. If and only if a metadata file matches any of the patterns in
`metadata_files` will it be downloaded.
`data_files` (`--data-files` on the CLI) are one or more shell patterns against which all data files in the
bundle will be matched case-sensitively. A file is considered a data file if the `indexed` property in the
manifest is not set. If and only if a data file matches any of the patterns in `data_files` will it be
downloaded.
By default, all data and metadata files are downloaded. To disable the downloading of data files,
use `--data-files ''` if using the CLI (or `data_files=()` if invoking `download` programmatically). Likewise
for metadata files.
"""
if not dest_name:
dest_name = bundle_uuid
bundle = self.get_bundle(uuid=bundle_uuid, replica=replica, version=version if version else None)["bundle"]
files = {}
for file_ in bundle["files"]:
# The file name collision check is case-insensitive even if the local file system we're running on is
# case-sensitive. We do this in order to get consistent download behavior on all operating systems and
# file systems. The case of file names downloaded to a case-sensitive system will still match exactly
# what's specified in the bundle manifest. We just don't want a bundle with files 'Foo' and 'foo' to
# create two files on one system and one file on the other. Allowing this to happen would, in the best
# case, overwrite Foo with foo locally. A resumed download could produce a local file called foo that
# contains a mix of data from Foo and foo.
filename = file_.get("name", file_["uuid"]).lower()
if files.setdefault(filename, file_) is not file_:
raise ValueError("Bundle {bundle_uuid} version {bundle_version} contains multiple files named "
"'{filename}' or a case derivation thereof".format(filename=filename, **bundle))
for file_ in files.values():
file_uuid = file_["uuid"]
file_version = file_["version"]
filename = file_.get("name", file_uuid)
globs = metadata_files if file_['indexed'] else data_files
if not any(fnmatchcase(filename, glob) for glob in globs):
continue
if not os.path.isdir(dest_name):
os.makedirs(dest_name)
logger.info("File %s: Retrieving...", filename)
file_path = os.path.join(dest_name, filename)
# Attempt to download the data. If a retryable exception occurs, we wait a bit and retry again. The delay
# increases each time we fail and decreases each time we successfully read a block. We set a quota for the
# number of failures that goes up with every successful block read and down with each failure.
#
# If we can, we will attempt HTTP resume. However, we verify that the server supports HTTP resume. If the
# ranged get doesn't yield the correct header, then we start over.
delay = min_delay_seconds
retries_left = initial_retries_left
hasher = hashlib.sha256()
with open(file_path, "wb") as fh:
while True:
try:
response = self.get_file._request(
dict(uuid=file_uuid, version=file_version, replica=replica),
stream=True,
headers={
'Range': "bytes={}-".format(fh.tell())
},
)
try:
if not response.ok:
logger.error("%s", "File {}: GET FAILED.".format(filename))
logger.error("%s", "Response: {}".format(response.text))
break
consume_bytes = int(fh.tell())
server_start = 0
content_range_header = response.headers.get('Content-Range', None)
if content_range_header is not None:
cre = re.compile("bytes (\d+)-(\d+)")
mo = cre.search(content_range_header)
if mo is not None:
server_start = int(mo.group(1))
consume_bytes -= server_start
assert consume_bytes >= 0
if server_start > 0 and consume_bytes == 0:
logger.info("%s", "File {}: Resuming at {}.".format(
filename, server_start))
elif consume_bytes > 0:
logger.info("%s", "File {}: Resuming at {}. Dropping {} bytes to match".format(
filename, server_start, consume_bytes))
while consume_bytes > 0:
bytes_to_read = min(consume_bytes, 1024*1024)
content = response.iter_content(chunk_size=bytes_to_read)
chunk = next(content)
if chunk:
consume_bytes -= len(chunk)
for chunk in response.iter_content(chunk_size=1024*1024):
if chunk:
fh.write(chunk)
hasher.update(chunk)
retries_left = min(retries_left + 1, initial_retries_left)
delay = max(delay / 2, min_delay_seconds)
break
finally:
response.close()
except (ChunkedEncodingError, ConnectionError, ReadTimeout):
if retries_left > 0:
# resume!!
logger.info("%s", "File {}: GET FAILED. Attempting to resume.".format(
filename, file_path))
time.sleep(delay)
delay *= 2
retries_left -= 1
continue
raise
if hasher.hexdigest().lower() != file_["sha256"].lower():
os.remove(file_path)
logger.error("%s", "File {}: GET FAILED. Checksum mismatch.".format(filename))
raise ValueError("Expected sha256 {} Received sha256 {}".format(
file_["sha256"].lower(), hasher.hexdigest().lower()))
else:
logger.info("%s", "File {}: GET SUCCEEDED. Stored at {}.".format(filename, file_path))
[docs] def download_manifest(self, manifest, replica, initial_retries_left=10, min_delay_seconds=0.25):
"""
Process the given manifest file in TSV (tab-separated values) format and download the files referenced by it.
Each row in the manifest represents one file in DSS. The manifest must have a header row. The header row must
declare the following columns:
`bundle_uuid` - the UUID of the bundle containing the file in DSS
`bundle_version` - the version of the bundle containing the file in DSS
`file_name` - the name of the file as specified in the bundle
The TSV may have additional columns. Those columns will be ignored. The ordering of the columns is
insignificant because the TSV is required to have a header row.
"""
with open(manifest) as f:
bundles = defaultdict(set)
# unicode_literals is on so all strings are unicode. CSV wants a str so we need to jump through a hoop.
delimiter = '\t'.encode('ascii') if USING_PYTHON2 else '\t'
reader = csv.DictReader(f, delimiter=delimiter, quoting=csv.QUOTE_NONE)
for row in reader:
bundles[(row['bundle_uuid'], row['bundle_version'])].add(row['file_name'])
errors = 0
for (bundle_uuid, bundle_version), data_files in bundles.items():
data_globs = tuple(glob_escape(file_name) for file_name in data_files if file_name)
logger.info('Downloading bundle %s version %s ...', bundle_uuid, bundle_version)
try:
self.download(bundle_uuid,
replica,
version=bundle_version,
data_files=data_globs,
initial_retries_left=initial_retries_left,
min_delay_seconds=min_delay_seconds)
except Exception as e:
errors += 1
logger.warning('Failed to download bundle %s version %s from replica %s',
bundle_uuid, bundle_version, replica, exc_info=e)
if errors:
raise RuntimeError('{} bundle(s) failed to download'.format(errors))
else:
return {}
[docs] def upload(self, src_dir, replica, staging_bucket, timeout_seconds=1200):
"""
Upload a directory of files from the local filesystem and create a bundle containing the uploaded files.
This method requires the use of a client-controlled object storage bucket to stage the data for upload.
"""
bundle_uuid = str(uuid.uuid4())
version = datetime.utcnow().strftime("%Y-%m-%dT%H%M%S.%fZ")
files_to_upload, files_uploaded = [], []
for filename in os.listdir(src_dir):
full_file_name = os.path.join(src_dir, filename)
files_to_upload.append(open(full_file_name, "rb"))
logger.info("Uploading %i files from %s to %s", len(files_to_upload), src_dir, staging_bucket)
file_uuids, uploaded_keys = upload_to_cloud(files_to_upload, staging_bucket=staging_bucket, replica=replica,
from_cloud=False)
for file_handle in files_to_upload:
file_handle.close()
filenames = list(map(os.path.basename, uploaded_keys))
filename_key_list = list(zip(filenames, file_uuids, uploaded_keys))
for filename, file_uuid, key in filename_key_list:
logger.info("File %s: registering...", filename)
# Generating file data
creator_uid = self.config.get("creator_uid", 0)
source_url = "s3://{}/{}".format(staging_bucket, key)
logger.info("File %s: registering from %s -> uuid %s", filename, source_url, file_uuid)
response = self.put_file._request(dict(
uuid=file_uuid,
bundle_uuid=bundle_uuid,
version=version,
creator_uid=creator_uid,
source_url=source_url
))
files_uploaded.append(dict(name=filename, version=version, uuid=file_uuid, creator_uid=creator_uid))
if response.status_code in (requests.codes.ok, requests.codes.created):
logger.info("File %s: Sync copy -> %s", filename, version)
else:
assert response.status_code == requests.codes.accepted
logger.info("File %s: Async copy -> %s", filename, version)
timeout = time.time() + timeout_seconds
wait = 1.0
while time.time() < timeout:
try:
self.head_file(uuid=file_uuid, replica="aws", version=version)
break
except SwaggerAPIException as e:
if e.code != requests.codes.not_found:
msg = "File {}: Unexpected server response during registration"
raise RuntimeError(msg.format(filename))
time.sleep(wait)
wait = min(60.0, wait * self.UPLOAD_BACKOFF_FACTOR)
else:
# timed out. :(
raise RuntimeError("File {}: registration FAILED".format(filename))
logger.debug("Successfully uploaded file")
file_args = [{'indexed': file_["name"].endswith(".json"),
'name': file_['name'],
'version': file_['version'],
'uuid': file_['uuid']} for file_ in files_uploaded]
logger.info("%s", "Bundle {}: Registering...".format(bundle_uuid))
response = self.put_bundle(uuid=bundle_uuid,
version=version,
replica=replica,
creator_uid=creator_uid,
files=file_args)
logger.info("%s", "Bundle {}: Registered successfully".format(bundle_uuid))
return {
"bundle_uuid": bundle_uuid,
"creator_uid": creator_uid,
"replica": replica,
"version": response["version"],
"files": files_uploaded
}