# -*- coding: utf-8 -*-
import os
import sys
import string
import time
import datetime
import json
from io import BytesIO
import random
import shutil

import oss2
import requests

from myconfigs import *

# When True, ossService talks to the "-internal" variant of the endpoint.
use_internal_network = False

_PUBLIC_ENDPOINT_SUFFIX = ".aliyuncs.com"
_INTERNAL_ENDPOINT_SUFFIX = "-internal.aliyuncs.com"


def _to_internal_endpoint(endpoint):
    """Return *endpoint* with ``.aliyuncs.com`` swapped for ``-internal.aliyuncs.com``.

    The endpoint is returned unchanged when it already carries the internal
    suffix or does not end with the public suffix, so an unexpected value is
    never truncated into a malformed host name.
    """
    if endpoint.endswith(_INTERNAL_ENDPOINT_SUFFIX):
        return endpoint
    if not endpoint.endswith(_PUBLIC_ENDPOINT_SUFFIX):
        return endpoint
    return endpoint[:-len(_PUBLIC_ENDPOINT_SUFFIX)] + _INTERNAL_ENDPOINT_SUFFIX


def get_random_string():
    """Return a timestamp-based identifier with a random uppercase suffix.

    The result has the form ``YYYYMMDD-HHMMSS-ffffff-XXXXXX``: the current
    date, time and microseconds from ``datetime.datetime.now()``, followed by
    six random ASCII uppercase letters.
    """
    # Local names deliberately avoid shadowing the imported ``time`` module.
    timestamp = datetime.datetime.now().strftime('%Y%m%d-%H%M%S-%f')
    suffix = ''.join(random.choices(string.ascii_uppercase, k=6))
    return timestamp + "-" + suffix


class ossService():
    """Thin wrapper around an ``oss2.Bucket`` for signing, upload and download.

    Methods report success through an integer status (1 = ok, 0 = failed)
    instead of raising for the service errors they handle.
    """

    def __init__(self):
        """Build the bucket client from the module-level OSS settings.

        NOTE(review): ``OSSAccessKeyId``, ``OSSAccessKeySecret``,
        ``OSSEndpoint``, ``OSSBucketName`` and ``OSSObjectName`` are not
        defined in this file; presumably they come from the ``myconfigs``
        star-import -- confirm.
        """
        self.AccessKeyId = OSSAccessKeyId
        self.AccessKeySecret = OSSAccessKeySecret
        self.Endpoint = OSSEndpoint
        if use_internal_network:
            self.Endpoint = _to_internal_endpoint(OSSEndpoint)
        self.BucketName = OSSBucketName  # "vigen-invi" # "vigen-video"
        self.ObjectName = OSSObjectName  # "video_generation" # "VideoGeneration"
        self.Prefix = "oss://" + self.BucketName
        auth = oss2.Auth(self.AccessKeyId, self.AccessKeySecret)
        self.bucket = oss2.Bucket(auth, self.Endpoint, self.BucketName)

    def sign(self, oss_url, timeout=3600, params=None):
        """Create a signed GET URL for an object addressed by an ``oss://`` URL.

        Args:
            oss_url: e.g. ``oss://BucketName/ObjectName/xxx.mp4``.
            timeout: value passed to ``sign_url`` as the expiry,
                e.g. ``3600*100``.
            params: extra query parameters, e.g. ``{'x-oss-process': style}``.

        Returns:
            ``(1, signed_url)`` on success, ``(0, "")`` if signing raised.
        """
        try:
            # NOTE(review): the prefix is removed by length only, so this
            # assumes oss_url really starts with "oss://<BucketName>/" --
            # verify against callers.
            oss_path = oss_url[len("oss://" + self.BucketName + "/"):]
            return 1, self.bucket.sign_url('GET', oss_path, timeout, slash_safe=True, params=params)
        except Exception as e:
            print("sign error: {}".format(e))
            return 0, ""

    def uploadOssFile(self, oss_full_path, local_full_path):
        """Upload a local file to the bucket and return a signed URL for it.

        Args:
            oss_full_path: object key inside the bucket.
            local_full_path: path of the local file to upload.

        Returns:
            The ``(status, url)`` pair from :meth:`sign` (one-hour expiry) on
            success, or ``(0, "")`` when the upload raises ``OssError``.
        """
        try:
            self.bucket.put_object_from_file(oss_full_path, local_full_path)
            return self.sign(self.Prefix + "/" + oss_full_path, timeout=3600)
        except oss2.exceptions.OssError as e:
            print("oss upload error: ", e)
            return 0, ""

    def downloadOssFile(self, oss_full_path, local_full_path):
        """Download an object from the bucket to a local file.

        Args:
            oss_full_path: object key inside the bucket.
            local_full_path: destination path on the local filesystem.

        Returns:
            1 on success, 0 when the download raises ``OssError``.
        """
        try:
            self.bucket.get_object_to_file(oss_full_path, local_full_path)
        except oss2.exceptions.OssError as e:
            print("oss download error: ", e)
            return 0
        return 1

    def downloadFile(self, file_full_url, local_full_path, timeout=(10, 60)):
        """Download a file over HTTP(S) to a local path.

        Args:
            file_full_url: URL to fetch with a GET request.
            local_full_path: destination path; written only on HTTP 200.
            timeout: ``requests`` timeout, either a single number or a
                ``(connect, read)`` tuple in seconds. Without one the
                request could block indefinitely on a stalled server.

        Returns:
            1 on success, 0 on a non-200 response or a request failure.
        """
        try:
            response = requests.get(file_full_url, timeout=timeout)
        except requests.exceptions.RequestException as e:
            # Report network failures through the status code, matching the
            # other download/upload methods, instead of raising.
            print("oss download error: ", e)
            return 0
        if response.status_code != 200:
            print("oss download error. ")
            return 0
        with open(local_full_path, "wb") as f:
            f.write(response.content)
        return 1