在Django中使用OSS
安装
storage
由于官方的 `django-oss-storage` 库最新的 v1.11 版本依旧存在 `url` 无处设置超时时间的 bug,并且一直不接受 PR,因此对其源码进行抽取并修改,得到 `storage.py` 文件如下:
import os
import six
import shutil
from urllib.parse import urljoin
from datetime import datetime
from tempfile import SpooledTemporaryFile
from django.core.files import File
from django.core.exceptions import ImproperlyConfigured, SuspiciousOperation
from django.core.files.storage import Storage
from django.conf import settings
from django.utils.encoding import force_text
from django.utils.deconstruct import deconstructible
from django.utils.timezone import utc
import oss2.utils
import oss2.exceptions
from oss2 import Auth, Service, Bucket, ObjectIterator
def _get_config(name):
    """
    Look up the configuration value called *name*.

    An environment variable of that name wins; otherwise the attribute of
    the same name on Django ``settings`` is used. String values are
    returned with surrounding whitespace stripped, other values unchanged.

    Raises ImproperlyConfigured when neither source provides a value.
    """
    value = os.environ.get(name, getattr(settings, name, None))
    if value is None:
        raise ImproperlyConfigured("'%s not found in env variables or setting.py" % name)
    if isinstance(value, six.string_types):
        return value.strip()
    return value
def _normalize_endpoint(endpoint):
if not endpoint.startswith('http://') and not endpoint.startswith('https://'):
return 'https://' + endpoint
else:
return endpoint
class OssError(Exception):
    """Error raised by the OSS storage backend; carries its payload in ``value``."""

    def __init__(self, value):
        # The payload is kept on the instance; __str__ renders it below.
        self.value = value

    def __str__(self):
        # Same output as repr(self.value).
        return "%r" % (self.value,)
@deconstructible
class OssStorage(Storage):
    """
    Aliyun OSS Storage

    Django storage backend that keeps files in an Aliyun OSS bucket.
    Credentials, endpoint and bucket name come from the constructor
    arguments and fall back to the ``OSS_*`` values resolved by
    ``_get_config`` (environment variable first, then Django settings).
    """

    # Key prefix inside the bucket. Subclasses set it per instance; the
    # empty default lets the base class be used directly at bucket root.
    location = ''

    # Objects up to this size are buffered in memory when opened; larger
    # ones spill over to a temporary file on disk.
    _SPOOL_MAX_SIZE = 10 * 1024 * 1024  # 10MB

    # Lifetime (seconds) of signed URLs when settings.OSS_EXPIRE is unset.
    _DEFAULT_URL_EXPIRE = 3600

    def __init__(self, access_key_id=None, access_key_secret=None, end_point=None, bucket_name=None):
        """
        Build the oss2 auth/service/bucket handles.

        Raises SuspiciousOperation if the bucket does not exist.
        """
        self.access_key_id = access_key_id or _get_config('OSS_ACCESS_KEY_ID')
        self.access_key_secret = access_key_secret or _get_config('OSS_ACCESS_KEY_SECRET')
        self.end_point = _normalize_endpoint(end_point or _get_config('OSS_ENDPOINT'))
        self.bucket_name = bucket_name or _get_config('OSS_BUCKET_NAME')

        self.auth = Auth(self.access_key_id, self.access_key_secret)
        self.service = Service(self.auth, self.end_point)
        self.bucket = Bucket(self.auth, self.end_point, self.bucket_name)

        # try to get bucket acl to check bucket exist or not
        try:
            self.bucket.get_bucket_acl()
        except oss2.exceptions.NoSuchBucket:
            raise SuspiciousOperation("Bucket '%s' does not exist." % self.bucket_name)

    def _get_key_name(self, name):
        """
        Get the object key name in OSS, e.g.,
        location: /media/
        input   : test.txt
        output  : media/test.txt

        A trailing slash on the input is preserved so directory-style
        names stay distinguishable from plain object names.
        """
        # urljoin() throws the base away when the second part is absolute,
        # which would silently drop ``location``.
        name = name.lstrip('/')

        base_path = force_text(self.location)
        final_path = urljoin(base_path + "/", name)
        key = os.path.normpath(final_path.lstrip('/'))

        if key == '.':
            # normpath('') yields '.'; the bucket root is the empty key.
            key = ''
        elif final_path.endswith('/') and not key.endswith('/'):
            # normpath() strips the trailing slash; put it back.
            key += '/'

        # OSS keys always use forward slashes, even when running on Windows.
        key = key.replace('\\', '/')

        if six.PY2:
            key = key.encode('utf-8')
        return key

    def _open(self, name, mode='rb'):
        """
        Download *name* into a spooled temporary file and wrap it in OssFile.

        Raises ValueError for any mode other than ``"rb"`` and OssError when
        the object is missing or the download fails.
        """
        if mode != "rb":
            raise ValueError("OSS files can only be opened in read-only mode")

        target_name = self._get_key_name(name)
        # Load the key into a temporary file
        tmpf = SpooledTemporaryFile(max_size=self._SPOOL_MAX_SIZE)
        try:
            obj = self.bucket.get_object(target_name)
            if obj.content_length is None:
                shutil.copyfileobj(obj, tmpf)
            else:
                oss2.utils.copyfileobj_and_verify(obj, tmpf, obj.content_length, request_id=obj.request_id)
            tmpf.seek(0)
            return OssFile(tmpf, target_name, self)
        except oss2.exceptions.NoSuchKey:
            tmpf.close()
            raise OssError("%s does not exist" % name)
        except Exception:
            # Deliberately not a bare ``except``: KeyboardInterrupt and
            # SystemExit must propagate instead of becoming an OssError.
            tmpf.close()
            raise OssError("Failed to open %s" % name)

    def _save(self, name, content):
        """Upload *content* under *name* and return the normalized name."""
        target_name = self._get_key_name(name)
        self.bucket.put_object(target_name, content)
        return os.path.normpath(name)

    def create_dir(self, dirname):
        """Create an empty object whose key ends in '/' to act as a directory marker."""
        target_name = self._get_key_name(dirname)
        if not target_name.endswith('/'):
            target_name += '/'
        self.bucket.put_object(target_name, '')

    def exists(self, name):
        """Return True if *name* is an existing object or a non-empty key prefix."""
        target_name = self._get_key_name(name)
        if not target_name:
            # Empty key means the bucket root, which is treated as existing.
            return True

        if name.endswith("/"):
            # This looks like a directory, but OSS has no concept of directories
            # need to check whether the key starts with this prefix
            result = self.bucket.list_objects(prefix=target_name, delimiter='', marker='', max_keys=1)
            return bool(result.object_list)

        if self.bucket.object_exists(target_name):
            return True
        # It's not a file, but it might be a directory. Check again that it's not a directory.
        return self.exists(name + "/")

    def get_file_meta(self, name):
        """Return the oss2 object-meta result for *name*."""
        return self.bucket.get_object_meta(self._get_key_name(name))

    def size(self, name):
        """Return the content length reported by the object meta for *name*."""
        return self.get_file_meta(name).content_length

    def modified_time(self, name):
        """Return the last-modified timestamp of *name* as a naive local datetime."""
        file_meta = self.get_file_meta(name)
        return datetime.fromtimestamp(file_meta.last_modified)

    # OSS only exposes a last-modified timestamp; reuse it for the others.
    created_time = accessed_time = modified_time

    def get_modified_time(self, name):
        """
        Return the last-modified time of *name*.

        The result is an aware UTC datetime when ``settings.USE_TZ`` is on,
        otherwise a naive local datetime.
        """
        file_meta = self.get_file_meta(name)
        if settings.USE_TZ:
            return datetime.utcfromtimestamp(file_meta.last_modified).replace(tzinfo=utc)
        return datetime.fromtimestamp(file_meta.last_modified)

    get_created_time = get_accessed_time = get_modified_time

    def content_type(self, name):
        """Return the content type reported by a HEAD request for *name*."""
        file_info = self.bucket.head_object(self._get_key_name(name))
        return file_info.content_type

    def listdir(self, name):
        """
        List the entries directly below *name*.

        Returns ``(dirs, files)``; both lists hold the keys as yielded by
        the bucket iterator (not names relative to *name*).
        """
        if name == ".":
            name = ""

        prefix = self._get_key_name(name)
        # An empty prefix lists the bucket root; adding '/' would break that.
        if prefix and not prefix.endswith('/'):
            prefix += "/"

        files = []
        dirs = []
        for obj in ObjectIterator(self.bucket, prefix=prefix, delimiter='/'):
            if obj.is_prefix():
                dirs.append(obj.key)
            else:
                files.append(obj.key)
        return dirs, files

    def url(self, name):
        """
        Return a signed GET URL for *name*.

        The URL lifetime is ``settings.OSS_EXPIRE`` seconds, falling back to
        ``_DEFAULT_URL_EXPIRE`` when that setting is not defined.
        """
        key = self._get_key_name(name)
        expire = getattr(settings, 'OSS_EXPIRE', self._DEFAULT_URL_EXPIRE)
        return self.bucket.sign_url('GET', key, expire)

    def delete(self, name):
        """Delete the object stored under *name*."""
        self.bucket.delete_object(self._get_key_name(name))

    def delete_with_slash(self, dirname):
        """Delete the directory marker object for *dirname* (key ending in '/')."""
        name = self._get_key_name(dirname)
        if not name.endswith('/'):
            name += '/'
        self.bucket.delete_object(name)
class OssMediaStorage(OssStorage):
    """OSS storage whose key prefix is taken from ``settings.MEDIA_URL``."""

    def __init__(self):
        # location has to be in place before the parent constructor runs.
        self.location = settings.MEDIA_URL
        super().__init__()

    def save(self, name, content, max_length=None):
        """
        Upload *content* under *name* by calling the parent ``_save`` directly.

        ``max_length`` is accepted for signature compatibility and ignored.
        """
        return super()._save(name, content)
class OssStaticStorage(OssStorage):
    """OSS storage whose key prefix is taken from ``settings.STATIC_URL``."""

    def __init__(self):
        # location has to be in place before the parent constructor runs.
        self.location = settings.STATIC_URL
        super().__init__()

    def save(self, name, content, max_length=None):
        """
        Upload *content* under *name* by calling the parent ``_save`` directly.

        ``max_length`` is accepted for signature compatibility and ignored.
        """
        return super()._save(name, content)
class OssFile(File):
    """
    A file returned from AliCloud OSS

    ``name`` is the object key it was created with; ``OssStorage._open``
    passes the full key, location prefix included.
    """

    def __init__(self, content, name, storage):
        super(OssFile, self).__init__(content, name)
        self._storage = storage

    def open(self, mode="rb"):
        """
        Reopen the file, downloading the object again if it has been closed.

        Raises ValueError for any mode other than ``"rb"`` and OssError when
        the object is missing or the download fails.
        """
        if self.closed:
            if mode != "rb":
                raise ValueError("OSS files can only be opened in read-only mode")
            # self.name is already the full object key. Going back through
            # storage.open() would apply the location prefix a second time
            # (e.g. "media/media/x"), so fetch by key directly.
            self.file = self._download_object()
        return super(OssFile, self).open(mode)

    def _download_object(self):
        """Download the object stored under ``self.name`` into a spooled temp file."""
        tmpf = SpooledTemporaryFile(max_size=10 * 1024 * 1024)  # 10MB
        try:
            obj = self._storage.bucket.get_object(self.name)
            if obj.content_length is None:
                shutil.copyfileobj(obj, tmpf)
            else:
                oss2.utils.copyfileobj_and_verify(obj, tmpf, obj.content_length, request_id=obj.request_id)
        except oss2.exceptions.NoSuchKey:
            tmpf.close()
            raise OssError("%s does not exist" % self.name)
        except Exception:
            tmpf.close()
            raise OssError("Failed to open %s" % self.name)
        tmpf.seek(0)
        return tmpf
需要特别注意的是,当你的 Bucket 的 ACL 类型为私有的时候,在 `storage.py` 的 line 179 的 `url` 方法应为:
def url(self, name):
    """
    Return a signed GET URL for *name* (needed for private buckets).

    The URL lifetime is ``settings.OSS_EXPIRE`` seconds; one hour is used
    when that setting is not defined.
    """
    key = self._get_key_name(name)
    # Reading settings.OSS_EXPIRE directly raises AttributeError when the
    # setting is left out, so fall back to a default instead.
    expire = getattr(settings, 'OSS_EXPIRE', 3600)
    return self.bucket.sign_url('GET', key, expire)
当 ACL 类型为公开读、公开读写时,可修改为:
def url(self, name):
    """Return the URL of *name* built by the bucket's URL maker, without signing."""
    object_key = self._get_key_name(name)
    # NOTE(review): `_make_url` is an underscore-prefixed attribute of the
    # bucket object - confirm it is available in the oss2 version in use.
    make_url = self.bucket._make_url
    return make_url(self.bucket_name, object_key)
区别是是否对链接进行签名,以授权前端获取到该文件。
settings.py
# OSS configuration
OSS_ACCESS_KEY_ID = ""
OSS_ACCESS_KEY_SECRET = ""
OSS_ENDPOINT = "oss-cn-hangzhou.aliyuncs.com"
OSS_BUCKET_NAME = "demo"
# Lifetime of signed (private) links, in seconds. The signing url() method
# in storage.py reads this setting, so it has to be defined.
OSS_EXPIRE = 3600

# Only switch the default file storage to OSS outside DEBUG.
USE_OSS = not DEBUG
if USE_OSS:
    DEFAULT_FILE_STORAGE = 'common.storage.OssMediaStorage'
在这里设置 `USE_OSS` 判断是为了将线下测试环境的 media 数据与线上生产环境的 media 数据隔离开来,不需要隔离则直接删除 `USE_OSS` 判断即可。
图片处理(缩略图)
阿里云提供了 2 种 OSS 缩略图生成策略:
一种是预设处理规则,之后通过规则名称去调用处理规则,处理规则可以在阿里云的 OSS 控制台中的图片处理选项中管理。默认的调用规则为 `域名/sample.jpg?x-oss-process=style/stylename`。
另外一种是通过 url 参数来控制图片的缩放、裁剪和水印等,这种方法会更加的灵活,但相应的 url 将变得不再简洁美观。详细的使用方法可见阿里云 OSS 文档中的图片处理指南部分。