Source code for hca.dss

from __future__ import absolute_import, division, print_function, unicode_literals

import hashlib
import os
import re
import time
import uuid
from io import open

import requests
from requests.exceptions import ChunkedEncodingError, ConnectionError, ReadTimeout

from ..util import SwaggerClient
from ..util.exceptions import SwaggerAPIException
from .. import logger
from .upload_to_cloud import upload_to_cloud


class DSSClient(SwaggerClient):
    """
    Client for the Data Storage Service API.
    """
    UPLOAD_BACKOFF_FACTOR = 1.618

    def __init__(self, *args, **kwargs):
        super(DSSClient, self).__init__(*args, **kwargs)
        self.commands += [self.download, self.upload]
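    # A minimal instantiation sketch (illustrative, not part of the original module). DSSClient
    # inherits its configuration handling from SwaggerClient, so assuming the `hca` package is
    # installed and configured, construction typically takes no arguments:
    #
    #     from hca.dss import DSSClient
    #     dss = DSSClient()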
    def download(self, bundle_uuid, replica, version="", dest_name="",
                 initial_retries_left=10, min_delay_seconds=0.25):
        """
        Download a bundle and save it to the local filesystem as a directory.
        """
        if not dest_name:
            dest_name = bundle_uuid

        bundle = self.get_bundle(uuid=bundle_uuid, replica=replica,
                                 version=version if version else None)["bundle"]

        if not os.path.isdir(dest_name):
            os.makedirs(dest_name)

        for file_ in bundle["files"]:
            file_uuid = file_["uuid"]
            file_version = file_["version"]
            filename = file_.get("name", file_uuid)

            logger.info("File %s: Retrieving...", filename)
            file_path = os.path.join(dest_name, filename)

            # Attempt to download the data. If a retryable exception occurs, we wait a bit and
            # retry. The delay increases each time we fail and decreases each time we successfully
            # read a block. We set a quota for the number of failures that goes up with every
            # successful block read and down with each failure.
            #
            # If we can, we will attempt HTTP resume. However, we verify that the server supports
            # HTTP resume. If the ranged GET doesn't yield the correct header, then we start over.
            delay = min_delay_seconds
            retries_left = initial_retries_left
            hasher = hashlib.sha256()
            with open(file_path, "wb") as fh:
                while True:
                    try:
                        response = self.get_file._request(
                            dict(uuid=file_uuid, version=file_version, replica=replica),
                            stream=True,
                            headers={'Range': "bytes={}-".format(fh.tell())},
                        )
                        try:
                            if not response.ok:
                                logger.error("%s", "File {}: GET FAILED.".format(filename))
                                logger.error("%s", "Response: {}".format(response.text))
                                break

                            consume_bytes = int(fh.tell())
                            server_start = 0
                            content_range_header = response.headers.get('Content-Range', None)
                            if content_range_header is not None:
                                cre = re.compile(r"bytes (\d+)-(\d+)")
                                mo = cre.search(content_range_header)
                                if mo is not None:
                                    server_start = int(mo.group(1))

                            consume_bytes -= server_start
                            assert consume_bytes >= 0

                            if server_start > 0 and consume_bytes == 0:
                                logger.info("%s", "File {}: Resuming at {}.".format(
                                    filename, server_start))
                            elif consume_bytes > 0:
                                logger.info("%s", "File {}: Resuming at {}. Dropping {} bytes to match.".format(
                                    filename, server_start, consume_bytes))

                            # Discard any bytes the server re-sent that we already have on disk.
                            while consume_bytes > 0:
                                bytes_to_read = min(consume_bytes, 1024*1024)
                                content = response.iter_content(chunk_size=bytes_to_read)
                                chunk = next(content)
                                if chunk:
                                    consume_bytes -= len(chunk)

                            for chunk in response.iter_content(chunk_size=1024*1024):
                                if chunk:
                                    fh.write(chunk)
                                    hasher.update(chunk)
                                    # Each successful read restores retry budget and shrinks the delay.
                                    retries_left = min(retries_left + 1, initial_retries_left)
                                    delay = max(delay / 2, min_delay_seconds)
                            break
                        finally:
                            response.close()
                    except (ChunkedEncodingError, ConnectionError, ReadTimeout):
                        if retries_left > 0:
                            # Retryable failure: back off, then resume from the current offset.
                            logger.info("%s", "File {}: GET FAILED. Attempting to resume.".format(filename))
                            time.sleep(delay)
                            delay *= 2
                            retries_left -= 1
                            continue
                        raise

            # Verify the download against the manifest checksum before accepting the file.
            if hasher.hexdigest().lower() != file_["sha256"].lower():
                os.remove(file_path)
                logger.error("%s", "File {}: GET FAILED. Checksum mismatch.".format(filename))
                raise ValueError("Expected sha256 {} Received sha256 {}".format(
                    file_["sha256"].lower(), hasher.hexdigest().lower()))
            else:
                logger.info("%s", "File {}: GET SUCCEEDED. Stored at {}.".format(filename, file_path))
        return {}
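    # A hedged usage sketch for download (the bundle UUID and destination below are placeholders).
    # Transient network errors are retried with exponential backoff, and interrupted transfers are
    # resumed via HTTP Range requests, so a single call is normally sufficient:
    #
    #     dss = DSSClient()
    #     dss.download("<bundle-uuid>", replica="aws", dest_name="my_bundle")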
    def upload(self, src_dir, replica, staging_bucket, timeout_seconds=1200):
        """
        Upload a directory of files from the local filesystem and create a bundle containing the
        uploaded files. This method requires the use of a client-controlled object storage bucket
        to stage the data for upload.
        """
        bundle_uuid = str(uuid.uuid4())
        files_to_upload, files_uploaded = [], []
        for filename in os.listdir(src_dir):
            full_file_name = os.path.join(src_dir, filename)
            files_to_upload.append(open(full_file_name, "rb"))

        logger.info("Uploading %i files from %s to %s", len(files_to_upload), src_dir, staging_bucket)
        file_uuids, uploaded_keys = upload_to_cloud(files_to_upload, staging_bucket=staging_bucket,
                                                    replica=replica, from_cloud=False)
        for file_handle in files_to_upload:
            file_handle.close()
        filenames = list(map(os.path.basename, uploaded_keys))
        filename_key_list = list(zip(filenames, file_uuids, uploaded_keys))

        # Hoisted out of the loop so an empty source directory cannot leave it undefined when
        # put_bundle is called below.
        creator_uid = self.config.get("creator_uid", 0)

        for filename, file_uuid, key in filename_key_list:
            logger.info("File %s: registering...", filename)

            # Generating file data
            source_url = "s3://{}/{}".format(staging_bucket, key)
            logger.info("File %s: registering from %s -> uuid %s", filename, source_url, file_uuid)

            response = self.put_file._request(dict(
                uuid=file_uuid,
                bundle_uuid=bundle_uuid,
                creator_uid=creator_uid,
                source_url=source_url
            ))
            version = response.json().get('version', "blank")
            files_uploaded.append(dict(name=filename, version=version, uuid=file_uuid,
                                       creator_uid=creator_uid))

            if response.status_code in (requests.codes.ok, requests.codes.created):
                logger.info("File %s: Sync copy -> %s", filename, version)
            else:
                assert response.status_code == requests.codes.accepted
                logger.info("File %s: Async copy -> %s", filename, version)

                # Poll until the asynchronous copy lands, backing off between attempts.
                timeout = time.time() + timeout_seconds
                wait = 1.0
                while time.time() < timeout:
                    try:
                        self.head_file(uuid=file_uuid, replica="aws", version=version)
                        break
                    except SwaggerAPIException as e:
                        if e.code != requests.codes.not_found:
                            msg = "File {}: Unexpected server response during registration"
                            raise RuntimeError(msg.format(filename))
                        time.sleep(wait)
                        wait = min(60.0, wait * self.UPLOAD_BACKOFF_FACTOR)
                else:
                    # timed out. :(
                    raise RuntimeError("File {}: registration FAILED".format(filename))
                logger.debug("Successfully uploaded file")

        file_args = [{'indexed': file_["name"].endswith(".json"),
                      'name': file_['name'],
                      'version': file_['version'],
                      'uuid': file_['uuid']} for file_ in files_uploaded]

        logger.info("%s", "Bundle {}: Registering...".format(bundle_uuid))

        response = self.put_bundle(uuid=bundle_uuid,
                                   replica=replica,
                                   creator_uid=creator_uid,
                                   files=file_args)
        logger.info("%s", "Bundle {}: Registered successfully".format(bundle_uuid))

        return {
            "bundle_uuid": bundle_uuid,
            "creator_uid": creator_uid,
            "replica": replica,
            "version": response["version"],
            "files": files_uploaded
        }
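# A hedged usage sketch for upload (the directory and bucket names are placeholders). The caller
# must control a staging bucket that the DSS can read from; the returned manifest includes the new
# bundle's UUID and version:
#
#     dss = DSSClient()
#     manifest = dss.upload(src_dir="./my_bundle", replica="aws",
#                           staging_bucket="my-staging-bucket")
#     print(manifest["bundle_uuid"], manifest["version"])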