Samhita's picture
add gantry code
3cc543c
raw
history blame contribute delete
No virus
5.17 kB
"""
Utility that uses boto to create buckets.
This work is not our own but is entirely written by https://github.com/full-stack-deep-learning.
"""
import hashlib
import json
import boto3
import botocore
S3_URL_FORMAT = "https://{bucket}.s3.{region}.amazonaws.com/{key}"
S3_URI_FORMAT = "s3://{bucket}/{key}"
s3 = boto3.resource("s3")
def get_or_create_bucket(name):
"""Gets an S3 bucket with boto3 or creates it if it doesn't exist."""
try: # try to create a bucket
name, response = _create_bucket(name)
except botocore.exceptions.ClientError as err:
# error handling from https://github.com/boto/boto3/issues/1195#issuecomment-495842252
status = err.response["ResponseMetadata"][
"HTTPStatusCode"
] # status codes identify particular errors
if status == 409: # if the bucket exists already,
pass # we don't need to make it -- we presume we have the right permissions
else:
raise err
bucket = s3.Bucket(name)
return bucket
def _create_bucket(name):
"""Creates a bucket with the provided name."""
session = boto3.session.Session() # sessions hold on to credentials and config
current_region = session.region_name # so we can pull the default region
bucket_config = {"LocationConstraint": current_region} # and apply it to the bucket
bucket_response = s3.create_bucket(
Bucket=name, CreateBucketConfiguration=bucket_config
)
return name, bucket_response
def make_key(fileobj, filetype=None):
"""Creates a unique key for the fileobj and optionally append the filetype."""
identifier = make_identifier(fileobj)
if filetype is None:
return identifier
else:
return identifier + "." + filetype
def make_unique_bucket_name(prefix, seed):
"""Creates a unique bucket name from a prefix and a seed."""
name = hashlib.sha256(seed.encode("utf-8")).hexdigest()[:10]
return prefix + "-" + name
def get_url_of(bucket, key=None):
"""Returns the url of a bucket and optionally of an object in that bucket."""
if not isinstance(bucket, str):
bucket = bucket.name
region = _get_region(bucket)
key = key or ""
url = _format_url(bucket, region, key)
return url
def get_uri_of(bucket, key=None):
"""Returns the s3:// uri of a bucket and optionally of an object in that bucket."""
if not isinstance(bucket, str):
bucket = bucket.name
key = key or ""
uri = _format_uri(bucket, key)
return uri
def enable_bucket_versioning(bucket):
"""Turns on versioning for bucket contents, which avoids deletion."""
if not isinstance(bucket, str):
bucket = bucket.name
bucket_versioning = s3.BucketVersioning(bucket)
return bucket_versioning.enable()
def add_access_policy(bucket):
"""Adds a policy to our bucket that allows the Gantry app to access data."""
access_policy = json.dumps(_get_policy(bucket.name))
s3.meta.client.put_bucket_policy(Bucket=bucket.name, Policy=access_policy)
def _get_policy(bucket_name):
"""Returns a bucket policy allowing Gantry app access as a JSON-compatible dictionary."""
return {
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {
"AWS": [
"arn:aws:iam::848836713690:root",
"arn:aws:iam::339325199688:root",
"arn:aws:iam::665957668247:root",
]
},
"Action": ["s3:GetObject", "s3:GetObjectVersion"],
"Resource": f"arn:aws:s3:::{bucket_name}/*",
},
{
"Effect": "Allow",
"Principal": {
"AWS": [
"arn:aws:iam::848836713690:root",
"arn:aws:iam::339325199688:root",
"arn:aws:iam::665957668247:root",
]
},
"Action": "s3:ListBucketVersions",
"Resource": f"arn:aws:s3:::{bucket_name}",
},
],
}
def make_identifier(byte_data):
"""Create a unique identifier for a collection of bytes via hashing."""
# feed them to hashing algo -- security is not critical here, so we use SHA-1
hashed_data = hashlib.sha1(byte_data) # noqa: S3
identifier = hashed_data.hexdigest() # turn it into hexdecimal
return identifier
def _get_region(bucket):
"""Determine the region of an s3 bucket."""
if not isinstance(bucket, str):
bucket = bucket.name
s3_client = boto3.client("s3")
bucket_location_response = s3_client.get_bucket_location(Bucket=bucket)
bucket_location = bucket_location_response["LocationConstraint"]
return bucket_location
def _format_url(bucket_name, region, key=None):
key = key or ""
url = S3_URL_FORMAT.format(bucket=bucket_name, region=region, key=key)
return url
def _format_uri(bucket_name, key=None):
key = key or ""
uri = S3_URI_FORMAT.format(bucket=bucket_name, key=key)
return uri