ancient.module.syncFile 源代码

from ..conf import parser

import os
import shutil
import uuid
import hashlib
from typing import Iterable, Tuple, Optional, Union

option = parser.options("file")
print("[syncFile] is imported.")

"""
该模块和upload.py视图有关联,可以增加方法,但是不要修改或删除该组件内容
"""


[文档]class Component: """ 文件上下文处理模块 :param path: 相对于root_path进行偏移的子文件,该类中通过get_safe_path来获取一个安全偏移的路径,防止输入超出文件路径的地址 :param top_path: 默认情况从配置文件中获取,核心操作目录,有时需要把一些文件写入静态资源管理的文件夹中,可以修改它 """ def __init__(self, path="", top_path=None): if not top_path: top_path = option["path"] self.root_path = self.ajoin(top_path, path) self.rel_root_path = self.join(top_path, path) self.write_chunk = self.read_chunk = 65536 self.write_fp = self.read_fp = None self.encode = "utf-8"
[文档] @staticmethod def ajoin(*args) -> str: """ 合并路径并转换成绝对路径 :param args: 多个路径参数 :return: 合并后且标准处理的路径 """ return os.path.abspath(os.path.join(*args))
[文档] @staticmethod def join(*args) -> str: """ 合并的路径并经过标准化处理,去除..或.等符号 :param args: 多个路径参数 :return: 合并后且标准处理的路径 """ return os.path.normpath(os.path.join(*args))
[文档] @staticmethod def touch(path: str) -> None: """ 快速创建一个空文件 :param path: 文件路径 :return: None """ open(path, "wb").close()
[文档] def ensure_dir(self, path: Union[str] = None) -> bool: """ 确保文件夹存在,不存在则创建它 :param path: 文件夹路径 :return: 返回判断之前是否存在该文件 """ if path is None: path = self.root_path if os.path.exists(path): return True else: os.makedirs(path) return False
[文档] def get_safe_path(self, *args) -> str: """ 获取相对于self.root_path路径进行偏移子路径,如果合成后的子路径超出self.root_path路径 会抛出AssertionError异常,有时希望通过用户传来的参数来构成一个路径,需要 用到该方法,避免用户传递的参数中包含..等内容,最终导致路径位置偏移出限定目录中,接收多个路径合成参数 :param args: 多个路径参数 :return: 在限定路径下的子路径字符串 """ path = self.join(self.root_path, *args) assert self.root_path in path, "Path location overflow" return path
[文档] def write(self, path: str, data: bytes, auto_close: bool = True) -> None: """ 向文件中写入数据,该方法编码为utf-8 :param path: 文件路径 :param data: 写入数据 :param auto_close: 是否及时关闭文件流,如果设置False可以获取write_fp属性手动关闭 :return: None """ self.write_fp = open(path, "wb") self.write_fp.write(data) if auto_close: self.write_fp.close() self.write_fp = None
[文档] def s_read(self, path: str, auto_close: bool = True) -> bytes: """ 不迭代数据,直接读取文件内容 :param path: 读取文件的路径 :param auto_close: 否及时关闭文件流,如果设置False可以获取read_fp属性手动关闭 :return: bytes 读取到到的文件流 """ self.read_fp = open(path, "rb") data = self.read_fp.read() if auto_close: self.read_fp.close() return data
[文档] def read(self, path: str, auto_close: bool = True) -> Iterable: """ 读取文件数据返回生成器 :param path: 读取文件的路径 :param auto_close: 是否及时关闭文件流,如果设置False可以获取read_fp属性手动关闭 :return: generator """ self.read_fp = open(path, "rb") while True: chunk = self.read_fp.read(self.read_chunk) if chunk: yield chunk else: break if auto_close: self.read_fp.close() self.read_fp = None return
[文档] @staticmethod def is_exist(path: str) -> bool: """ 判断文件是否存在 :param path: 路径 :return: 布尔值,是否存在 """ return os.path.exists(path)
[文档] def is_safe_exist(self, file_name: str) -> bool: """ 判断文件是否存在,需要[捕获异常]处理 :param file_name: 文件名称 :return: 布尔值,是否存在 """ file_path = self.get_safe_path(file_name) if os.path.exists(file_path): return True return False
[文档] def is_exist_md5(self, md5: str, suffix: str) -> bool: """ 判断是否存在md5文件,用于分片上传的方法,需要[捕获异常]处理 :param md5: md5值 :param suffix: 文件后缀名称 :return: 布尔值,是否存在 """ file_path = self.get_safe_path(md5 + "." + suffix) if os.path.exists(file_path): return True return False
[文档] @staticmethod def clear_fragment(path: str) -> None: """ 清空指定文件夹,用于清理md5碎片文件夹 :param path: 碎片路径 :return: None """ shutil.rmtree(path)
[文档] def md5_to_file(self, fragment_path: str, md5: str, suffix: str) -> Iterable: """ 该功能为分片上传功能,用于合并分片上传产生的碎片块,需要[捕获异常]处理 使用案例:: file_context = syncFile.Component() for i in file_context.md5_to_file(fragment_path , md5, suffix): if not i: return self.write_json(message="读取文件块产生错误,合并失败", status=1) # 提高并发,抛出当前handler控制主线程的权利 await gen.sleep(0) self.write_json(message="合并成功") :param fragment_path: 碎片文件夹路径 :param md5: md5值 :param suffix: 文件后缀拓展名 :return: None """ md5_file = self.get_safe_path(md5 + "." + suffix) hmd5 = hashlib.md5() for i in range(len(os.listdir(fragment_path))): try: with open(os.path.join(fragment_path, str(i)), "rb") as fp: block = fp.read() hmd5.update(block) with open(md5_file, "ab") as fp: fp.write(block) yield True except IOError as e: yield False if hmd5.hexdigest() == md5: self.clear_fragment(fragment_path) return True return False
[文档] def receive_file(self, files, arg_name="image", is_only=True, file_name=None) -> Optional[Tuple[str, str, str]]: """ 帮助tornado进行文件接收,该方法需要捕捉异常,只接收第一个文件 接收路径会保证文件夹存在,不存在则创建,并且会查看是否为安全路径,不是 安全路径抛出异常,需要[捕获异常]处理 :param files: self.requests.files :param arg_name: 参数名称 :param is_only: 是否生成唯一ID文件名 :param file_name: 当is_only=False时,提供的自定义文件名称,如果不提供则来自上传的文件名,拓展名以上传文件为准无需附加 :return: (文件名称[不携带拓展名],文件拓展名,文件相对服务器保存路径) """ file_list = files.get(arg_name, None) if not file_list: return None # filename,body,content_type source_file = file_list[0] # 确保路径存在 self.ensure_dir(self.root_path) source_name, source_ext = os.path.splitext(source_file.filename) source_ext = source_ext.lower() # 判断文件名称 if is_only: name = str(uuid.uuid1()) elif file_name: name = file_name else: name = source_name # 安全保存到本地 save_path = self.get_safe_path(name + source_ext) self.write(save_path, source_file.body) return name, source_ext, self.join(self.rel_root_path, name + source_ext)
[文档] @staticmethod def rollback_receive_file(path: str) -> None: """ 有时接收文件后将接收的文件名传入到数据库中,如果这时数据库错误需要回退操作,那么你可以使用该方法删除接收的文件,实现回退 :param path: 文件路径 :return: None """ try: os.remove(path) except FileNotFoundError: pass
[文档]def insert2fp(file_path, offset, content, per_size=2048): """ 允许你在文件指定位置进行内容插入 :param file_path: 文件路径 :param offset: 文件偏移位置 :param content: 插入的内容 :param per_size: 每片读取大小限制 :return: None """ copies = offset // per_size f_dir, f_name = os.path.split(file_path) temp_path = os.path.join(f_dir, f_name + ".temp") with open(temp_path, "w") as w_fp: with open(file_path, "r") as fp: fp.seek(0) for c in range(1, copies + 1 + int(offset % per_size > 0)): if c * per_size >= offset: result = fp.read(offset - fp.tell()) else: result = fp.read(per_size) w_fp.write(result) w_fp.write(content) for c in fp: w_fp.write(c) os.remove(file_path) os.rename(temp_path, file_path)