File size: 2,905 Bytes
07ec1d3
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
7d1a461
 
07ec1d3
 
7d1a461
07ec1d3
7d1a461
 
 
07ec1d3
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
# -*- coding: utf-8 -*-
import os
import sys
import string
import time
import datetime
import json
from io import BytesIO
import oss2
import random
import requests
import shutil
from myconfigs import *

use_internal_network = False

def get_random_string():
    now = datetime.datetime.now()
    date = now.strftime('%Y%m%d')
    time = now.strftime('%H%M%S')
    microsecond = now.strftime('%f')
    microsecond = microsecond[:6] 

    rand_num = ''.join([str(random.randint(0, 9)) for _ in range(6)])
    random_string = ''.join(random.choices(string.ascii_uppercase, k=6)) # ascii_lowercase
    return date + "-" + time + "-" + microsecond + "-" + random_string

class ossService():
    def __init__(self):
        # print(f'AK: ******{OSSAccessKeyId[-3:]}')
        # print(f'SK: ******{OSSAccessKeySecret[-3:]}')
        self.AccessKeyId = OSSAccessKeyId
        self.AccessKeySecret = OSSAccessKeySecret
        self.Endpoint = OSSEndpoint
        if use_internal_network: 
            self.Endpoint = OSSEndpoint[:-len(".aliyuncs.com")] + "-internal.aliyuncs.com"
        self.BucketName = OSSBucketName # "vigen-invi" # "vigen-video"
        self.ObjectName = OSSObjectName # "video_generation" # "VideoGeneration"
        self.Prefix = "oss://" + self.BucketName

        auth = oss2.Auth(self.AccessKeyId, self.AccessKeySecret)
        self.bucket = oss2.Bucket(auth, self.Endpoint, self.BucketName)

    # oss_url: eg: oss://BucketName/ObjectName/xxx.mp4
    # timeout: eg: 3600*100
    # params: eg: {'x-oss-process': style}
    def sign(self, oss_url, timeout=3600, params=None):
        try:
            oss_path = oss_url[len("oss://" + self.BucketName + "/"):]
            return 1, self.bucket.sign_url('GET', oss_path, timeout, slash_safe=True, params=params)
        except Exception as e:
            print("sign error: {}".format(e))
            return 0, ""
        
    def uploadOssFile(self, oss_full_path, local_full_path):
        try:
            self.bucket.put_object_from_file(oss_full_path, local_full_path)
            return self.sign(self.Prefix+"/"+oss_full_path, timeout=3600)
        except oss2.exceptions.OssError as e:
            print("oss upload error: ", e)
            return 0, ""

    def downloadOssFile(self, oss_full_path, local_full_path):
        status = 1
        try:
            self.bucket.get_object_to_file(oss_full_path, local_full_path)
        except oss2.exceptions.OssError as e:
            print("oss download error: ", e)
            status = 0
        return status

        
    def downloadFile(self, file_full_url, local_full_path):
        status = 1
        response = requests.get(file_full_url)
        if response.status_code == 200:
            with open(local_full_path, "wb") as f:
                f.write(response.content)
        else:
            print("oss download error. ")
            status = 0
        return status