import os
from boto3.session import Session
from botocore.exceptions import ClientError
from environs import Env
env = Env()
env.read_env()
class Boto3Client:
def __init__(self):
"""
access_key = "xxx"
secret_key = "xxx"
url = "https://s3.ap-east-1.amazonaws.com" # 也可以是自己节点的地址
"""
self.access_key = os.environ.get('HWC_ACCESS_KEY_ID')
self.secret_key = os.environ.get('HWC_SECRET_ACCESS_KEY')
self.endpoint_url = os.environ.get('HWC_OBS_ENDPOINT_URL')
self.region_name = os.environ.get('REGION_NAME')
session = Session(self.access_key, self.secret_key, region_name=self.region_name)
self.s3_client = session.client('s3', endpoint_url=self.endpoint_url)
self.s3 = session.resource('s3', endpoint_url=self.endpoint_url)
# self.urlR = url
def make_bucket(self, bucket):
self.s3_client.create_bucket(Bucket=bucket)
return bucket
def existed_bucket(self, bucket):
try:
self.s3_client.meta.client.head_bucket(Bucket=bucket)
except ClientError:
return False
return True
def remove_bucket(self, bucket):
return self.s3_client.delete_bucket(Bucket=bucket)
def list_buckets(self):
res = self.s3_client.list_buckets()
return [i['Name'] for i in res['Buckets']]
def put_object(self, bucket, file_name, objectName=None):
if objectName is None:
objectName = file_name.split('/')[-1]
with open(file_name, 'rb') as f:
self.s3_client.put_object(
Body=f,
Bucket=bucket,
Key=objectName
)
return objectName
def list_objects(self, bucket):
res = self.s3_client.list_objects(Bucket=bucket)
if 'Contents' not in res:
return []
else:
return [i['Key'] for i in res['Contents']]
def list_object_v2(self, bucket):
res = self.s3_client.list_objects_v2(Bucket=bucket)
return [i['Key'] for i in res['Contents']]
def get_object(self, bucket, object_name, file_name):
self.s3_client.download_file(bucket, object_name, file_name)
return os.path.abspath(file_name)
def delete_obj(self, bucket, object_name):
res = self.s3_client.delete_object(
Bucket=bucket,
Key=object_name,
)
return res
def get_bucket_location(self, bucket):
location_info = self.s3_client.get_bucket_location(Bucket=bucket)
return location_info['LocationConstraint']
def head_bucket(self, bucket):
resp = self.s3_client.head_bucket(Bucket=bucket)
resp['ResponseMetadata']['HTTPHeaders']['x-amz-request-id'] = 0
resp['ResponseMetadata']['HTTPHeaders']['date'] = 0
resp['ResponseMetadata']['RequestId'] = 0
return resp
def head_object(self, bucket, object_name):
return self.s3_client.head_object(
Bucket=bucket,
Key=object_name,
)
def get_bucket_acl(self, bucket):
resp = self.s3_client.get_bucket_acl(Bucket=bucket)
resp['ResponseMetadata']['HTTPHeaders']['x-amz-request-id'] = 0
resp['ResponseMetadata']['RequestId'] = 0
return resp
def get_object_acl(self, bucket, object_name):
resp = self.s3_client.get_object_acl(Bucket=bucket, Key=object_name)
resp['ResponseMetadata']['HTTPHeaders']['x-amz-request-id'] = 0
resp['ResponseMetadata']['RequestId'] = 0
return resp
def put_object_Acl(self, bucket, object_name, Permission='FULL_CONTROL'):
"""Permission:'FULL_CONTROL'|'WRITE'|'WRITE_ACP'|'READ'|'READ_ACP'"""
return self.s3_client.put_object_acl(
ACL='private',
AccessControlPolicy={
'Grants': [
{
'Grantee': {
'DisplayName': 'string',
'EmailAddress': 'string',
'ID': 'string',
'Type': 'CanonicalUser',
'URI': 'string'
},
'Permission': Permission
},
],
'Owner': {
'DisplayName': 'string',
'ID': 'string'
}
},
Bucket=bucket,
GrantFullControl='string',
GrantRead='string',
GrantReadACP='string',
GrantWrite='string',
GrantWriteACP='string',
Key=object_name,
RequestPayer='requester',
VersionId='string',
ExpectedBucketOwner='string'
)
def create_multipart_upload(self, bucket, object_name):
return self.s3_client.create_multipart_upload(Bucket=bucket, Key=object_name)['UploadId']
def complete_multipart_upload(self, bucket, object_name, upload_id, part: list):
# part : 传 Parts结构
"""[
{
'ETag': file_md5,
'PartNumber': part_nums
},
...
]"""
return self.s3_client.complete_multipart_upload(
Bucket=bucket,
Key=object_name,
UploadId=upload_id,
MultipartUpload={
'Parts': part
},
)
def upload_part(self, fileIO, bucket, object_name, part_num: int, upload_id):
return self.s3_client.upload_part(
Body=fileIO,
Bucket=bucket,
Key=object_name,
PartNumber=part_num,
UploadId=upload_id
)
if __name__ == '__main__':
dir_path = 'media/business_license/b77f6a38-9779-40a0-bec6-b29d307a3088/'
file_name = '1645084555.909076.jpeg'
bucketName = os.environ.get('HWC_OBS_BUCKET_NAME')
try:
s3client = Boto3Client()
# r = s3client.put_object(bucketName, './'+file_name, dir_path+file_name)
r = s3client.get_object(bucketName, dir_path+file_name, file_name)
print(r)
except Exception as e:
print(str(e))