文章
问答
冒泡
boto3连接华为OBS(Python3 SDK)
import os
from boto3.session import Session
from botocore.exceptions import ClientError
from environs import Env

env = Env()
env.read_env()


class Boto3Client:
    def __init__(self):
        """
        access_key = "xxx"
        secret_key = "xxx"
        url = "https://s3.ap-east-1.amazonaws.com"  # 也可以是自己节点的地址
        """
        self.access_key = os.environ.get('HWC_ACCESS_KEY_ID')
        self.secret_key = os.environ.get('HWC_SECRET_ACCESS_KEY')
        self.endpoint_url = os.environ.get('HWC_OBS_ENDPOINT_URL')
        self.region_name = os.environ.get('REGION_NAME')
        session = Session(self.access_key, self.secret_key, region_name=self.region_name)
        self.s3_client = session.client('s3', endpoint_url=self.endpoint_url)
        self.s3 = session.resource('s3', endpoint_url=self.endpoint_url)
        # self.urlR = url

    def make_bucket(self, bucket):
        self.s3_client.create_bucket(Bucket=bucket)
        return bucket

    def existed_bucket(self, bucket):
        try:
            self.s3_client.meta.client.head_bucket(Bucket=bucket)
        except ClientError:
            return False
        return True

    def remove_bucket(self, bucket):
        return self.s3_client.delete_bucket(Bucket=bucket)

    def list_buckets(self):
        res = self.s3_client.list_buckets()
        return [i['Name'] for i in res['Buckets']]

    def put_object(self, bucket, file_name, objectName=None):
        if objectName is None:
            objectName = file_name.split('/')[-1]
        with open(file_name, 'rb') as f:
            self.s3_client.put_object(
                Body=f,
                Bucket=bucket,
                Key=objectName
            )
            return objectName

    def list_objects(self, bucket):
        res = self.s3_client.list_objects(Bucket=bucket)
        if 'Contents' not in res:
            return []
        else:
            return [i['Key'] for i in res['Contents']]

    def list_object_v2(self, bucket):
        res = self.s3_client.list_objects_v2(Bucket=bucket)
        return [i['Key'] for i in res['Contents']]

    def get_object(self, bucket, object_name, file_name):
        self.s3_client.download_file(bucket, object_name, file_name)
        return os.path.abspath(file_name)

    def delete_obj(self, bucket, object_name):
        res = self.s3_client.delete_object(
            Bucket=bucket,
            Key=object_name,
        )
        return res

    def get_bucket_location(self, bucket):
        location_info = self.s3_client.get_bucket_location(Bucket=bucket)
        return location_info['LocationConstraint']

    def head_bucket(self, bucket):
        resp = self.s3_client.head_bucket(Bucket=bucket)
        resp['ResponseMetadata']['HTTPHeaders']['x-amz-request-id'] = 0
        resp['ResponseMetadata']['HTTPHeaders']['date'] = 0
        resp['ResponseMetadata']['RequestId'] = 0
        return resp

    def head_object(self, bucket, object_name):
        return self.s3_client.head_object(
            Bucket=bucket,
            Key=object_name,
        )

    def get_bucket_acl(self, bucket):
        resp = self.s3_client.get_bucket_acl(Bucket=bucket)
        resp['ResponseMetadata']['HTTPHeaders']['x-amz-request-id'] = 0
        resp['ResponseMetadata']['RequestId'] = 0
        return resp

    def get_object_acl(self, bucket, object_name):
        resp = self.s3_client.get_object_acl(Bucket=bucket, Key=object_name)
        resp['ResponseMetadata']['HTTPHeaders']['x-amz-request-id'] = 0
        resp['ResponseMetadata']['RequestId'] = 0
        return resp

    def put_object_Acl(self, bucket, object_name, Permission='FULL_CONTROL'):
        """Permission:'FULL_CONTROL'|'WRITE'|'WRITE_ACP'|'READ'|'READ_ACP'"""
        return self.s3_client.put_object_acl(
            ACL='private',
            AccessControlPolicy={
                'Grants': [
                    {
                        'Grantee': {
                            'DisplayName': 'string',
                            'EmailAddress': 'string',
                            'ID': 'string',
                            'Type': 'CanonicalUser',
                            'URI': 'string'
                        },
                        'Permission': Permission
                    },
                ],
                'Owner': {
                    'DisplayName': 'string',
                    'ID': 'string'
                }
            },
            Bucket=bucket,
            GrantFullControl='string',
            GrantRead='string',
            GrantReadACP='string',
            GrantWrite='string',
            GrantWriteACP='string',
            Key=object_name,
            RequestPayer='requester',
            VersionId='string',
            ExpectedBucketOwner='string'
        )

    def create_multipart_upload(self, bucket, object_name):
        return self.s3_client.create_multipart_upload(Bucket=bucket, Key=object_name)['UploadId']

    def complete_multipart_upload(self, bucket, object_name, upload_id, part: list):
        # part : 传 Parts结构
        """[
            {
                'ETag': file_md5,
                'PartNumber': part_nums
            },
            ...
            ]"""
        return self.s3_client.complete_multipart_upload(
            Bucket=bucket,
            Key=object_name,
            UploadId=upload_id,
            MultipartUpload={
                'Parts': part
            },
        )

    def upload_part(self, fileIO, bucket, object_name, part_num: int, upload_id):
        return self.s3_client.upload_part(
            Body=fileIO,
            Bucket=bucket,
            Key=object_name,
            PartNumber=part_num,
            UploadId=upload_id
        )


if __name__ == '__main__':

    dir_path = 'media/business_license/b77f6a38-9779-40a0-bec6-b29d307a3088/'
    file_name = '1645084555.909076.jpeg'

    bucketName = os.environ.get('HWC_OBS_BUCKET_NAME')
    try:
        s3client = Boto3Client()
        # r = s3client.put_object(bucketName, './'+file_name, dir_path+file_name)
        r = s3client.get_object(bucketName, dir_path+file_name, file_name)
        print(r)
    except Exception as e:
        print(str(e))
python

关于作者

小乙哥
学海无涯,回头是岸
获得点赞
文章被阅读