madtornado

最新版本:0.3.9
Author:SystemLight
Contact:https://github.com/SystemLight

环境要求: python3.5 及以上

平台: windows(select) | Linux(epoll)

Madtornado is a project templates for Tornado framework and quickly generate the Tornado project.
madtornado是一个Tornado项目开发模板,用于快速生成一个Tornado项目服务器。

注解

star 收藏一下

开源协议

MIT:

The MIT License (MIT)

Copyright (c) 2019, 2020 madtornado contributors

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

资源

Installation

pip install madtornado
sea --init_project [project path]

The powerful madtornado

Very easy to check whether is not-null for arguments

args = self.get_argument_for({"a": None, "b": None, "c": None})
check_rule = {
    "a": [check.not_null], "b": [check.not_null], "c": [check.not_null("c type is error")]
}
result = check.some(args, check_rule)
print(result.__dict__)

Used madtornado

workspace

%madtornado_project%\ancient\view\reception.py

start server

python server.py

Create route

file : reception.py

@register.route(use=register.PRT)
class IndexHandler(BaseHandler):
    """

    url: http://127.0.0.1:8095/prt/index

    """

    async def get(self):
        self.write("<h1 style='text-align:center'>Index</h1>")

    async def post(self):
        self.throw(404)

    async def put(self):
        self.throw(404)

    async def delete(self):
        self.throw(404)

Quickly create route

sea --new_recp %madtornado_project%\ancient\view\reception.py

Configure anything

%madtornado_project%\config\tornado.cfg

Advise

  • Nginx ( IIS ) use port 80
  • Tomcat use port 8080
  • Apache2 use port 8088
  • madtornado use port 8095

ancient

后台运作核心包
ancient.custom --- 定制模板引擎
uiMethod
ancient.custom.uiMethod.hello_func(self, content)[源代码]
ancient.custom.uiMethod.hello_method(self)[源代码]
ancient.custom.uiMethod.static_url(self, path)[源代码]

你可能不太了解这个重写的意义,原生tornado已经有了这个方法,如果你不知道这件事,你可以不用看我接下来说的 为了更好的让madtornado控制静态文件,提供更好更遍历的使用,我的服务端入口中根本没有启用原生的staticFileHandler 这样我就可以自己来控制静态文件提供更强大更易用的功能,这样导致就是static_url无法使用,但是这不是问题,你只需要 按需要重写即可,这不是大问题,我已经提供了一小段代码,在默认配置文件中会有/s为前缀的静态管理器配置,这个返回值刚好 契合。

参数:
  • self -- 占位,用于获取tornado对象
  • path -- 传进来的文件位置参数
返回:

合成的文件路径

uiModule
class ancient.custom.uiModule.HelloModule(handler)[源代码]

基类:tornado.web.UIModule

render(*args, **kwargs)[源代码]

Override in subclasses to return this module's output.

ancient.handlers --- 拓展原生模块
baseHandler
class ancient.handlers.baseHandler.BaseHandler(application, request, **kwargs)[源代码]

基类:tornado.web.RequestHandler

通过继承 BaseHandler 来使用它提供的便利方法,任何一条访问都是一个继承了 BaseHandler 的实例化对象 在0.3之后的版本,不建议直接继承BaseHandler,而是通过inheritHandler下的Base间接继承,这样你可以很轻松 的更改路由的父类,如你突然想要给这些子类加入拦截验证功能,你只需要改变inheritHandler/Base的指向就够了。

参数:
  • application -- tornado.web.Application对象
  • request -- HTTPServerRequest对象 (protocol,host,method,uri,version,remote_ip)
  • kwargs -- 其它在application加入到路由表时添加的额外字段
request

The tornado.httputil.HTTPServerRequest object containing additional request parameters including e.g. headers and body data:

request(uri,host,method,header,body,path,query,version,remote_ip,files)
talks_name

token或者session id 返回的cookie名称

json_body

当使用get_json_body方法后用于保存转换成json的body内容

HTTPFile(request.files):

self.request.files.get()方法,获取名称对应的file列表
file文件拥有属性,filename,body,content_type
clear_current_user()[源代码]

清除用户会话cookie

返回:None
data_received(chunk)[源代码]

重写方法 类加上 @stream_request_body 装饰器以后,自动调用的方法

参数:chunk -- 传入body中的流文件
返回:None
static expires_time(e_time)[源代码]

对cookie过期时间进行过滤

参数:e_time -- 过期时间,如3600秒
返回:赋值给expires的真实值
get_argument_for(these=None)[源代码]

获取一组query中或者body中的form-url格式参数

参数:these -- 一个字典对象,包含要获取的参数名称和对应的默认值,如果为None获取所有值
返回:dict 获取完成的字典对象,参数名和值相对应
get_argument_from(these_list)[源代码]

根据传入的参数列表获取参数,如果参数为空值则不返回,从body和query中查找参数

参数:these_list -- 想要获取的参数的列表
返回:dict 根据these_list提供的参数列表所能取到的变量键值对
get_argument_plus(arg_name, arg_default=None)[源代码]

获取任意格式传入的参数,可以参数写在链接中,也可以写在body中用json形式保存,也可以x-www-form-urlencoded形式传入body, 调用该方法将自动识别,不支持formdata类型获取,请使用request.files属性自行获取,或者通过data_received回调方法获取

参数:
  • arg_name -- 参数名称
  • arg_default -- 获取不到返回的默认值
返回:

经过处理获取到的值

get_body2json()[源代码]

获取请求的body,将其转换成json数据,并保存在属性json_body中

返回:dict
get_body_argument_for(these=None)[源代码]

获取一组body中的form-url格式参数

参数:these -- 一个字典对象,包含要获取的参数名称和对应的默认值,如果为None获取所有值
返回:dict 获取完成的字典对象,参数名和值相对应
get_body_argument_from(these_list)[源代码]

根据传入的参数列表获取参数,如果参数为空值则不返回,从body中查找参数

参数:these_list -- 想要获取的参数的列表
返回:dict 根据these_list提供的参数列表所能取到的变量键值对
get_current_user()[源代码]

当使用 tornado.web.authenticated 装饰器时调用的方法,如果返回内容,则验证通过,否则自动调整到登录页面,登录地址 设置application中的'login_url': '',参数

返回:str 当前用户的cookie信息
get_login_url()[源代码]

重写方法 自定义login_url地址,返回url地址,当使用@authenticated装饰器时,未验证的连接将跳转的url

返回:str
get_query_argument_for(these=None)[源代码]

获取一组query中的form-url格式参数

参数:these -- 一个字典对象,包含要获取的参数名称和对应的默认值,如果为None获取所有值
返回:dict 获取完成的字典对象,参数名和值相对应
get_query_argument_from(these_list)[源代码]

根据传入的参数列表获取参数,如果参数为空值则不返回,从query中查找参数

参数:these_list -- 想要获取的参数的列表
返回:dict 根据these_list提供的参数列表所能取到的变量键值对
on_close()[源代码]

当连接被意外断开时调用的方法,继承BaseHandler的类可以直接重写该方法

返回:None
on_connection_close()[源代码]

重写方法 当连接被意外断开时调用的方法,这个为tornado自带方法,内部有一定处理程序,不建议直接继承重写该方法, 正确的做法是继承BaseHandler对象,重写它的on_close方法

返回:None
on_finish()[源代码]

重写方法 当连接完成所有工作后,收尾时调用的方法,可以写入如sql关闭等任务,与prepare方法一样, 不建议直接改写BaseHandler的该方法,会影响到所有继承BaseHandler的类

返回:None
prepare()[源代码]

重写方法 当实例连接进来需要预先处理内容时调用的函数,例如token认证等任务,写在其中, 不建议直接修改BaseHandler的该方法,应当继承BaseHandler后,重写该方法,并且之后拥有相同 行为的Handler,继续继承同一个被改写prepare的Handler的类

返回:None
set_access_headers(domain='*')[源代码]

设置允许跨域访问

参数:domain -- 域范围
返回:None
set_current_user(value, over_time=3600)[源代码]

设置用户会话cookie

参数:
  • value -- 值,可以是token,也可以是session id
  • over_time -- 过期时间
返回:

None

set_default_headers()[源代码]

重写方法 当返回响应的时候,需要设置header信息,调用的默认方法,BaseHandler默认采用text/html方式返回

返回:None
set_nocache_headers()[源代码]

设置不缓存该文件

返回:None
set_stream_headers(name)[源代码]

设置返回为流类型,一般用于文件下载

参数:name -- 文件名称
返回:None
throw(status_code=200, log_message='', **kwargs)[源代码]

send_error()方法并不会直接触发系统默认的错误处理样式,如果想使用默认错误处理返回的内容, 请使用throw()方法,注意该方法会被try捕获到,如果写入到try中的时候请注意

常用状态码:

400 - 请求出现错误,非常通用,不想明确区分的客户端请求出错都可以返回
401 - 没有提供认证信息,未登录或者用户名密码错误
403 - 请求的资源不允许访问,token过期
404 - 请求的内容不存在
405 - 请求的[ 方法 ]不允许使用
406 - 请求格式不正确
408 - 请求超时了
410 - 请求资源曾经存在,但现在不存在了
413 - 请求体过大
414 - 请求的 URI 太长了
415 - 不支持的媒体类型
参数:
  • status_code -- 状态码
  • log_message -- 返回消息
  • kwargs -- 额外参数,如{'status_code': 404, 'reason': None}
返回:

None

write_array(list_data, encode_date=False)[源代码]

返回JSON数据响应,对象数组

参数:
  • list_data -- 列表数据
  • encode_date -- 是否存在datetime类型的字段
返回:

None

write_dict(dict_data, encode_date=False)[源代码]

返回JSON数据响应,该返回为纯粹的数据,错误信息根据状态码进行判定,

参数:
  • dict_data -- dict类型数据
  • encode_date -- 是否存在datetime类型的字段
返回:

None

write_error(status_code, **kwargs)[源代码]

重写方法 当send_error方法被触发时,调用的方法,用来处理错误状态 想要触发原始错误信息,请使用send_error

想要使用json格式错误信息,请使用throw方法

使用send_error会触发tornado系统默认处理,请使用throw触发madtornado来处理错误

参数:
  • status_code -- 状态码
  • kwargs -- 额外参数,如exc_info包含HTTPError的错误信息元组
返回:

None

write_jsonp(key, value)[源代码]

当使用jsonp技术进行请求时,调用此方法,用来直接返回jsonp的内容

参数:
  • key -- jsonp的名称
  • value -- 返回jsonp的内容,多个内容用逗号分隔
返回:

None

write_ok()[源代码]

返回一个正常的状态,这个方法和self.throw(200)返回内容是一样的, 但是throw方法会被捕获异常所捕获到,write_ok并不会被捕获

返回:None
write_resp(status_code, **kwargs)[源代码]

restful风格的自定义错误数据,该错误返回JSON格式错误响应 重写该方法用于自定义错误响应格式,该方法重新时不要抛出错误异常或调用send_error,throw等

参数:
  • status_code -- 响应码
  • kwargs -- 额外参数和log_message
返回:

None

dealHandler
class ancient.handlers.dealHandler.PongHandler(application, request, **kwargs)[源代码]

基类:ancient.handlers.baseHandler.BaseHandler

有时可能需要测试服务器的服务是否启动,请访问/pong,来进行确定

get()[源代码]

有时可能需要测试服务器的服务是否启动,请访问/pong,来进行确定

post()[源代码]

有时可能需要测试服务器的服务是否启动,请访问/pong,来进行确定

class ancient.handlers.dealHandler.ProxyHandler(application, request, **kwargs)[源代码]

基类:ancient.handlers.baseHandler.BaseHandler

默认情况代理模块一定带有proxy前缀,可以在配置文件中更改proxy_prefix属性内容

代理服务采用流式请求远端服务器和流式返回内容的方式,即Transfer-Encoding: chunked

注意:代理请求根路径访问时结尾一定要加"/",如http://你的域名/proxy/bd/,否则不进行代理

在配置文件中,填写proxy_handler用来启用代理模块:

proxy_handler = [["p1","http://www.baidu.com"],["p2","http://www.google.com/"]]

上述配置了两个代理路由,访问URL如下:

  1. 第一个:http://你的域名/proxy/p1/要访问的地址,这将代理到baidu下面
  2. 第二个:http://你的域名/proxy/p2/要访问的地址,这将代理到google下面

proxy_prefix和alias(也就是配置中的p1或p2)可以为空,但是不能包含"/"

代理路径是 http://你的域名/proxy_prefix/alias/要访问的路径

proxy_address

代理地址关键信息,包含{instead, address, host}

delete()[源代码]

DELETE方法请求代理

返回:None
get()[源代码]

GET方法请求代理

返回:None
initialize(**kwargs)[源代码]

__init__之前回调的方法

参数:kwargs -- __proxy_address
返回:None
options()[源代码]

符合下列条件,会跨域预检:

1. 请求方法不是GET/HEAD/POST
2. POST请求的Content-Type并非application/x-www-form-urlencoded, multipart/form-data, 或text/plain
3. 请求设置了自定义的header字段
返回:None
post()[源代码]

POST方法请求代理

返回:None
prepare()[源代码]

预处理回调,在请求开始之前执行的内容,设置允许跨域

返回:None
proxy()[源代码]

访问代理:

http://域名/proxy/自定义后缀/代理的内容路径
返回:None
proxy_received_body(chunk)[源代码]

处理收到的body信息

参数:chunk -- 收到的body块
返回:None
proxy_received_header(chunk)[源代码]

处理收到的头部信息

参数:chunk -- 收到的一行头部信息
返回:None
put()[源代码]

PUT方法请求代理

返回:None
class ancient.handlers.dealHandler.StaticHandler(application, request, **kwargs)[源代码]

基类:tornado.web.StaticFileHandler, ancient.handlers.baseHandler.BaseHandler

继承 StaticFileHandlerBaseHandler ,用于处理静态文件访问控制的类,

get(path, include_body=True)[源代码]

默认等于StaticFileHandler.get()行为,通过写该方法,对文件进行控制,可以直接在这里编写代码

initialize(path: str, default_filename: str = None, spa_page: str = None) → None[源代码]
inheritHandler
class ancient.handlers.inheritHandler.AbstractBaseHandler(application, request, **kwargs)[源代码]

基类:ancient.handlers.baseHandler.BaseHandler, abc.ABC

delete(*params)[源代码]
get(*params)[源代码]
post(*params)[源代码]
put(*params)[源代码]
class ancient.handlers.inheritHandler.CROSBaseHandler(application, request, **kwargs)[源代码]

基类:ancient.handlers.baseHandler.BaseHandler

继承该类,让路由拥有跨域访问的能力

options(*args)[源代码]
prepare()[源代码]

重写方法 当实例连接进来需要预先处理内容时调用的函数,例如token认证等任务,写在其中, 不建议直接修改BaseHandler的该方法,应当继承BaseHandler后,重写该方法,并且之后拥有相同 行为的Handler,继续继承同一个被改写prepare的Handler的类

返回:None
class ancient.handlers.inheritHandler.CustomErrorBaseHandler(application, request, **kwargs)[源代码]

基类:ancient.handlers.baseHandler.BaseHandler

madtornado全面使用restful风格规范,并且错误处理分割已经预定义为JSON格式响应,如果想自定义 JSON的数据内容可以调用时传参self.throw(404,log_message="message content")

如果想自定义一类错误处理,可以如该模块一样重写write_resp,并且做拦截,注意你也许并不是所有请求方法 下都需要拦截并自定义处理错误,请对请求方法和错误码进行判断,之后你可以让拥有相同错误处理方式的类都继承该类

write_resp(status_code, **kwargs)[源代码]

重写write_resp错误响应方法,该方法中切记不要调用任何抛出异常或者send_error,throw方法 否则会一直递归调用,请在结尾处调用父类该方法,保证其它响应异常可以被正确处理

该类是一个自定义响应信息处理类,当GET方法并且响应404时返回自定义的内容,其余交给父类处理用resultful风格 当你使用send_error方法抛出响应状态时,该方法不执行,因为send_error是tornado的方法,错误处理由tornado直接 进行,请使用madtornado的方法throw()抛出响应异常

参数:
  • status_code -- 响应状态码
  • kwargs -- 额外参数和log_message
返回:

None

class ancient.handlers.inheritHandler.InterceptorBaseHandler(application, request, **kwargs)[源代码]

基类:ancient.handlers.baseHandler.BaseHandler, abc.ABC

该类可以作为一个验证拦截器,你可以继承该类之后你需要重写interceptor 在该方法中做权限验证,这个拦截器目前只作用于get,post,put,delete方法 并且你要书写的请求也不是在get等中了,而是派生的i_get,i_post,i_put,i_delete

举例:

async def i_get(self):
    ...

async def i_post(self):
    ...
delete(*params)[源代码]
get(*params)[源代码]
i_delete(*params)[源代码]
i_get(*params)[源代码]
i_post(*params)[源代码]
i_put(*params)[源代码]
interceptor(*args)[源代码]

重写该方法

返回:返回True则继续执行,返回False将不继续执行
post(*params)[源代码]
put(*params)[源代码]
ancient.model --- 存储模型的目录

response
class ancient.model.response.DateEncoder(skipkeys=False, ensure_ascii=True, check_circular=True, allow_nan=True, sort_keys=False, indent=None, separators=None, default=None)[源代码]

基类:json.encoder.JSONEncoder

default(obj)[源代码]

Implement this method in a subclass such that it returns a serializable object for o, or calls the base implementation (to raise a TypeError).

For example, to support arbitrary iterators, you could implement default like this:

def default(self, o):
    try:
        iterable = iter(o)
    except TypeError:
        pass
    else:
        return list(iterable)
    # Let the base class default method raise the TypeError
    return JSONEncoder.default(self, o)
class ancient.model.response.ResponseModel[源代码]

基类:object

响应模型,通过实例化该类并调用该类的get方法获取返回的json格式数据 注意,采用ResponseModel类即意味着你将不遵循restful风格规范,而 采用madtornado风格的接口规范:

响应码:200[使用madtornado风格的接口规范,需保证任何状态http码都为200,通过返回数据的status进行状态标记]
返回类型示例:{"status": "fail", "description": "not found", "message": ""}
static dumps(data, encode_date=False)[源代码]

json.dumps的别名,用于方便调用

参数:
  • data -- 需要解析的数据对象
  • encode_date -- 对象中是否存在date或datetime对象
返回:

str

get(encode_date=False)[源代码]

通过改写ResponseModel实例的get方法返回值,实现全局的默认返回格式

参数:encode_date --
返回:str
ancient.module --- 拓展的IO模块
asyncMysql
class ancient.module.asyncMysql.Component[源代码]

基类:object

该模块参数均来源与madtornado配置文件,并且不支持同时连接多个不同的数据库

异步mysql快速请求模块(基于tormysql),示例:

db = asyncMysql().Component()
await db.on()
result = await db.select_tcw("mt")  # 读取mt表下的所有内容并返回
self.write(result) # 返回给前台页面显示

安全建议:

永远不要相信用户输入内容,sql模块虽然对于value进行了预检测
但是对于key部分还是无能为力,所以一定不要让用户对key的部分有
任何操作,只基于用户提供value值的权限,这样做是安全的。
delete_tw(table: str, where: str = None, where_arg: List[SQL_CONTENT] = None) → ancient.rig.genSQL.InputResult[源代码]

简化版的删除数据

参数:
  • table -- 数据表名称
  • where -- 如果该项不填,则删除整个表格,一个字符串,代表where条件筛选部分,如id=1 and name=a
  • where_arg -- 如果where传入的字符串包括占位符,那么传入参数列表,让sql可以预查询
返回:

genSQL.InputResult结果信息对象

input_sql(sql, arg_list=None, many=False) → ancient.rig.genSQL.InputResult[源代码]

执行插入,更新,删除等输入系列的sql语句

参数:
  • sql -- 输入系列的sql语句
  • arg_list -- 预处理参数列表,可以是二维数组代表多行插入前提需要设置many参数
  • many -- 是否启用多行输入
返回:

genSQL.InputResult 输入语句查询信息

insert_tc(table, content, many=False) → ancient.rig.genSQL.InputResult[源代码]

简化版的插入数据,只能简单的插入数据

示例内容:

insert_tc("table", [1, 2, 3, 4, 5])
转换内容 : ('insert into table values(%s,%s,%s,%s,%s)', [1, 2, 3, 4, 5])

insert_tc("table", [[1, 2, 3, 4, 5], [6, 7, 8, 9, 10]], many=True, ph="?")
转换内容 : ('insert into table values(?,?,?,?,?)', [[1, 2, 3, 4, 5], [6, 7, 8, 9, 10]])

insert_tc("table", {"id": 12, "name": "SystemLight"}, many=False, ph="%s")
转换内容 : ('insert into table(name,id) values(%s,%s)', ['SystemLight', 12])

insert_tc("table", {"key": ["id", "name"], "value": [["1", "lisys"], ["2", "sl"]]}, many=True, ph="%s")
转换内容 : ('insert into table(id,name) values(%s,%s)', [['1', 'lisys'], ['2', 'sl']])
参数:
  • table -- 表格的名称
  • content -- 支持四中格式传参,参考示例内容
  • many -- 是否启用多行插入
返回:

genSQL.InputResult结果信息对象

insert_update_tc(table, content, many=False) → ancient.rig.genSQL.InputResult[源代码]

简化版的插入即更新,这是mysql的特性方法,存在则更新数据,不存在则插入一条新数据

参数:
  • table -- 数据表名称
  • content -- 支持dict多行模式{"key":[],"value":[[]]},和dict单行模式传参
  • many -- 是否启用多行模式
返回:

genSQL.InputResult结果信息对象

off() → None[源代码]

关闭一个数据库连接实例

返回:None
on() → None[源代码]

开启一个数据库实例从sql_pool当中

举例:

db=Component()
db.on()
result = await db.select_tw("table")
db.off()

# 与上述使用方法一致
async with Component() as com:
    result = await com.select_tcw("table")

快捷使用方法:

with Component() as db:
    db.select_tw("table")
返回:None
output_sql()[源代码]

输出查询内容,直接传入select的sql语句,用于查询

参数:
  • sql -- select开头的sql语句
  • arg_list -- 预处理参数,如果需要预处理查询,需要在sql语句中使用%s作为占位符
返回:

查询到的结果内容

select_tcw()[源代码]

简化版的查询,用于快捷查询表格内容,不支持连表等高端操作

参数:
  • table -- 查询的表格名称
  • field -- 一个元组,代表要查询的字段名称
  • where -- 一个字符串,代表where条件筛选部分,如id=1 and name=a,可以在其中使用占位符,同时要提供where_arg
  • where_arg -- 如果where传入的字符串包括占位符,那么传入参数列表,让sql可以预查询
返回:

Tuple 查询到的结果内容

set_return_dict(is_return_dict: bool = True) → None[源代码]

设置是否返回字典格式而非数组格式的数据对象

参数:is_return_dict -- 是否返回字典格式数据
返回:None
sql_pool = <tormysql.pool.ConnectionPool object>
truncate_t(table: str) → ancient.rig.genSQL.InputResult[源代码]

快速清空表格数据

参数:table -- 数据表名称
返回:genSQL.InputResult结果信息对象
update_tcw(table: str, content: Dict[str, Any], where: str = None, where_arg: List[SQL_CONTENT] = None) → ancient.rig.genSQL.InputResult[源代码]

简化版的更新数据

参数:
  • table -- 数据表名称
  • content -- 字典对象,键值对相对应
  • where -- 一个字符串,代表where条件筛选部分,如id=1 and name=a,可以在其中使用占位符,同时要提供where_arg
  • where_arg -- 如果where传入的字符串包括占位符,那么传入参数列表,让sql可以预查询
返回:

genSQL.InputResult结果信息对象

syncFile
class ancient.module.syncFile.Component(path='', top_path=None)[源代码]

基类:object

文件上下文处理模块

参数:
  • path -- 相对于root_path进行偏移的子文件,该类中通过get_safe_path来获取一个安全偏移的路径,防止输入超出文件路径的地址
  • top_path -- 默认情况从配置文件中获取,核心操作目录,有时需要把一些文件写入静态资源管理的文件夹中,可以修改它
static ajoin(*args) → str[源代码]

合并路径并转换成绝对路径

参数:args -- 多个路径参数
返回:合并后且标准处理的路径
static clear_fragment(path: str) → None[源代码]

清空指定文件夹,用于清理md5碎片文件夹

参数:path -- 碎片路径
返回:None
ensure_dir(path: str = None) → bool[源代码]

确保文件夹存在,不存在则创建它

参数:path -- 文件夹路径
返回:返回判断之前是否存在该文件
get_safe_path(*args) → str[源代码]

获取相对于self.root_path路径进行偏移子路径,如果合成后的子路径超出self.root_path路径 会抛出AssertionError异常,有时希望通过用户传来的参数来构成一个路径,需要 用到该方法,避免用户传递的参数中包含..等内容,最终导致路径位置偏移出限定目录中,接收多个路径合成参数

参数:args -- 多个路径参数
返回:在限定路径下的子路径字符串
static is_exist(path: str) → bool[源代码]

判断文件是否存在

参数:path -- 路径
返回:布尔值,是否存在
is_exist_md5(md5: str, suffix: str) → bool[源代码]

判断是否存在md5文件,用于分片上传的方法,需要[捕获异常]处理

参数:
  • md5 -- md5值
  • suffix -- 文件后缀名称
返回:

布尔值,是否存在

is_safe_exist(file_name: str) → bool[源代码]

判断文件是否存在,需要[捕获异常]处理

参数:file_name -- 文件名称
返回:布尔值,是否存在
static join(*args) → str[源代码]

合并的路径并经过标准化处理,去除..或.等符号

参数:args -- 多个路径参数
返回:合并后且标准处理的路径
md5_to_file(fragment_path: str, md5: str, suffix: str) → Iterable[源代码]

该功能为分片上传功能,用于合并分片上传产生的碎片块,需要[捕获异常]处理

使用案例:

file_context = syncFile.Component()
for i in file_context.md5_to_file(fragment_path , md5, suffix):
    if not i:
        return self.write_json(message="读取文件块产生错误,合并失败", status=1)
    # 提高并发,抛出当前handler控制主线程的权利
    await gen.sleep(0)
self.write_json(message="合并成功")
参数:
  • fragment_path -- 碎片文件夹路径
  • md5 -- md5值
  • suffix -- 文件后缀拓展名
返回:

None

read(path: str, auto_close: bool = True) → Iterable[源代码]

读取文件数据返回生成器

参数:
  • path -- 读取文件的路径
  • auto_close -- 是否及时关闭文件流,如果设置False可以获取read_fp属性手动关闭
返回:

generator

receive_file(files, arg_name='image', is_only=True, file_name=None) → Optional[Tuple[str, str, str]][源代码]

帮助tornado进行文件接收,该方法需要捕捉异常,只接收第一个文件 接收路径会保证文件夹存在,不存在则创建,并且会查看是否为安全路径,不是 安全路径抛出异常,需要[捕获异常]处理

参数:
  • files -- self.requests.files
  • arg_name -- 参数名称
  • is_only -- 是否生成唯一ID文件名
  • file_name -- 当is_only=False时,提供的自定义文件名称,如果不提供则来自上传的文件名,拓展名以上传文件为准无需附加
返回:

(文件名称[不携带拓展名],文件拓展名,文件相对服务器保存路径)

static rollback_receive_file(path: str) → None[源代码]

有时接收文件后将接收的文件名传入到数据库中,如果这时数据库错误需要回退操作,那么你可以使用该方法删除接收的文件,实现回退

参数:path -- 文件路径
返回:None
s_read(path: str, auto_close: bool = True) → bytes[源代码]

不迭代数据,直接读取文件内容

参数:
  • path -- 读取文件的路径
  • auto_close -- 否及时关闭文件流,如果设置False可以获取read_fp属性手动关闭
返回:

bytes 读取到到的文件流

static touch(path: str) → None[源代码]

快速创建一个空文件

参数:path -- 文件路径
返回:None
write(path: str, data: bytes, auto_close: bool = True) → None[源代码]

向文件中写入数据,该方法编码为utf-8

参数:
  • path -- 文件路径
  • data -- 写入数据
  • auto_close -- 是否及时关闭文件流,如果设置False可以获取write_fp属性手动关闭
返回:

None

ancient.module.syncFile.insert2fp(file_path, offset, content, per_size=2048)[源代码]

允许你在文件指定位置进行内容插入

参数:
  • file_path -- 文件路径
  • offset -- 文件偏移位置
  • content -- 插入的内容
  • per_size -- 每片读取大小限制
返回:

None

syncJwt
class ancient.module.syncJwt.Component[源代码]

基类:object

decode(payload: str) → Dict[源代码]

解析jwt加密的token,如果无法解析抛出异常

参数:payload -- 需要解析的负载
返回:dict
encode(payload: dict, exp: Optional[int] = None) → bytes[源代码]

编码dict对象成为jwt二进制对象,当做token使用

参数:
  • payload -- 需要加密的dict对象
  • exp -- 有效时间
返回:

str

syncMemcached
class ancient.module.syncMemcached.Component[源代码]

基类:object

off() → None[源代码]
on() → None[源代码]
spe_set(key: str, value: str, over_time: int) → int[源代码]
syncMysql
class ancient.module.syncMysql.Component[源代码]

基类:object

该模块参数均来源与madtornado配置文件,并且不支持同时连接多个不同的数据库

安全建议:

永远不要相信用户输入内容,sql模块虽然对于value进行了预检测
但是对于key部分还是无能为力,所以一定不要让用户对key的部分有
任何操作,只基于用户提供value值的权限,这样做是安全的。
delete_tw(table: str, where: str = None, where_arg: List[SQL_CONTENT] = None) → ancient.rig.genSQL.InputResult[源代码]

简化版的删除数据

参数:
  • table -- 数据表名称
  • where -- 如果该项不填,则删除整个表格,一个字符串,代表where条件筛选部分,如id=1 and name=a
  • where_arg -- 如果where传入的字符串包括占位符,那么传入参数列表,让sql可以预查询
返回:

genSQL.InputResult结果信息对象

input_sql(sql, arg_list=None, many=False) → ancient.rig.genSQL.InputResult[源代码]

执行插入,更新,删除等输入系列的sql语句

参数:
  • sql -- 输入系列的sql语句
  • arg_list -- 预处理参数列表,可以是二维数组代表多行插入前提需要设置many参数
  • many -- 是否启用多行输入
返回:

genSQL.InputResult 输入语句查询信息

insert_tc(table, content, many=False) → ancient.rig.genSQL.InputResult[源代码]

简化版的插入数据,只能简单的插入数据

示例内容:

insert_tc("table", [1, 2, 3, 4, 5])
转换内容 : ('insert into table values(%s,%s,%s,%s,%s)', [1, 2, 3, 4, 5])

insert_tc("table", [[1, 2, 3, 4, 5], [6, 7, 8, 9, 10]], many=True, ph="?")
转换内容 : ('insert into table values(?,?,?,?,?)', [[1, 2, 3, 4, 5], [6, 7, 8, 9, 10]])

insert_tc("table", {"id": 12, "name": "SystemLight"}, many=False, ph="%s")
转换内容 : ('insert into table(name,id) values(%s,%s)', ['SystemLight', 12])

insert_tc("table", {"key": ["id", "name"], "value": [["1", "lisys"], ["2", "sl"]]}, many=True, ph="%s")
转换内容 : ('insert into table(id,name) values(%s,%s)', [['1', 'lisys'], ['2', 'sl']])
参数:
  • table -- 表格的名称
  • content -- 支持四中格式传参,参考示例内容
  • many -- 是否启用多行插入
返回:

genSQL.InputResult结果信息对象

insert_update_tc(table, content, many=False) → ancient.rig.genSQL.InputResult[源代码]

简化版的插入即更新,这是mysql的特性方法,存在则更新数据,不存在则插入一条新数据

参数:
  • table -- 数据表名称
  • content -- 支持dict多行模式{"key":[],"value":[[]]},和dict当行模式传参
  • many -- 是否启用多行模式
返回:

genSQL.InputResult结果信息对象

off() → None[源代码]

关闭一个数据库连接实例

返回:None
on() → None[源代码]

开启一个数据库实例从sql_pool当中

举例:

db=Component()
db.on()
db.select_tw("table")
db.off()

快捷使用方法:

with Component() as db:
    db.select_tw("table")
返回:None
output_sql()[源代码]

输出查询内容,直接传入select的sql语句,用于查询

参数:
  • sql -- select开头的sql语句
  • arg_list -- 预处理参数,如果需要预处理查询,需要在sql语句中使用%s作为占位符
返回:

查询到的结果内容

select_tcw()[源代码]

简化版的查询,用于快捷查询表格内容,不支持连表等高端操作

参数:
  • table -- 查询的表格名称
  • field -- 一个元组,代表要查询的字段名称
  • where -- 一个字符串,代表where条件筛选部分,如id=1 and name=a,可以在其中使用占位符,同时要提供where_arg
  • where_arg -- 如果where传入的字符串包括占位符,那么传入参数列表,让sql可以预查询
返回:

Tuple 查询到的结果内容

set_cursor_dict(is_return_dict: bool = True) → None[源代码]

设置是否返回字典格式而非数组格式的数据对象

参数:is_return_dict -- 是否返回字典格式数据
返回:None
truncate_t(table: str) → ancient.rig.genSQL.InputResult[源代码]

快速清空表格数据

参数:table -- 数据表名称
返回:genSQL.InputResult结果信息对象
update_tcw(table: str, content: Dict[str, Any], where: str = None, where_arg: List[SQL_CONTENT] = None) → ancient.rig.genSQL.InputResult[源代码]

简化版的更新数据

参数:
  • table -- 数据表名称
  • content -- 字典对象,键值对相对应
  • where -- 一个字符串,代表where条件筛选部分,如id=1 and name=a,可以在其中使用占位符,同时要提供where_arg
  • where_arg -- 如果where传入的字符串包括占位符,那么传入参数列表,让sql可以预查询
返回:

genSQL.InputResult结果信息对象

syncSqlite
class ancient.module.syncSqlite.Component[源代码]

基类:object

该模块参数均来源与madtornado配置文件,并且不支持同时连接多个不同的数据库

安全建议:

永远不要相信用户输入内容,sql模块虽然对于value进行了预检测
但是对于key部分还是无能为力,所以一定不要让用户对key的部分有
任何操作,只基于用户提供value值的权限,这样做是安全的。
delete_tw(table: str, where: str = None, where_arg: List[SQL_CONTENT] = None) → ancient.rig.genSQL.InputResult[源代码]

简化版的删除数据

参数:
  • table -- 数据表名称
  • where -- 如果该项不填,则删除整个表格,一个字符串,代表where条件筛选部分,如id=1 and name=a
  • where_arg -- 如果where传入的字符串包括占位符,那么传入参数列表,让sql可以预查询
返回:

genSQL.InputResult结果信息对象

input_sql(sql, arg_list=None, many=False) → ancient.rig.genSQL.InputResult[源代码]

执行插入,更新,删除等输入系列的sql语句

参数:
  • sql -- 输入系列的sql语句
  • arg_list -- 预处理参数列表,可以是二维数组代表多行插入前提需要设置many参数
  • many -- 是否启用多行输入
返回:

genSQL.InputResult 输入语句查询信息

insert_tc(table, content, many=False)[源代码]

简化版的插入数据,只能简单的插入数据

示例内容:

insert_tc("table", [1, 2, 3, 4, 5])
转换内容 : ('insert into table values(%s,%s,%s,%s,%s)', [1, 2, 3, 4, 5])

insert_tc("table", [[1, 2, 3, 4, 5], [6, 7, 8, 9, 10]], many=True, ph="?")
转换内容 : ('insert into table values(?,?,?,?,?)', [[1, 2, 3, 4, 5], [6, 7, 8, 9, 10]])

insert_tc("table", {"id": 12, "name": "SystemLight"}, many=False, ph="%s")
转换内容 : ('insert into table(name,id) values(%s,%s)', ['SystemLight', 12])

insert_tc("table", {"key": ["id", "name"], "value": [["1", "lisys"], ["2", "sl"]]}, many=True, ph="%s")
转换内容 : ('insert into table(id,name) values(%s,%s)', [['1', 'lisys'], ['2', 'sl']])
参数:
  • table -- 表格的名称
  • content -- 支持四中格式传参,参考示例内容
  • many -- 是否启用多行插入
返回:

genSQL.InputResult结果信息对象

off() → None[源代码]

关闭一个数据库连接实例

返回:None
on() → None[源代码]

开启一个数据库实例从sql_pool当中

举例:

db=Component()
db.on()
db.select_tw("table")
db.off()

快捷使用方法:

with Component() as db:
    db.select_tw("table")
返回:None
output_sql()[源代码]

输出查询内容,直接传入select的sql语句,用于查询

参数:
  • sql -- select开头的sql语句
  • arg_list -- 预处理参数,如果需要预处理查询,需要在sql语句中使用%s作为占位符
返回:

查询到的结果内容

select_tcw()[源代码]

简化版的查询,用于快捷查询表格内容,不支持连表等高端操作

参数:
  • table -- 查询的表格名称
  • field -- 一个元组,代表要查询的字段名称
  • where -- 一个字符串,代表where条件筛选部分,如id=1 and name=a,可以在其中使用占位符,同时要提供where_arg
  • where_arg -- 如果where传入的字符串包括占位符,那么传入参数列表,让sql可以预查询
返回:

Tuple 查询到的结果内容

set_cursor_dict(is_return_dict: bool = True) → None[源代码]

设置是否返回字典格式而非数组格式的数据对象

参数:is_return_dict -- 是否返回字典格式数据
返回:None
show_table()[源代码]

查询表格列表

返回:表格列表数据
update_tcw(table: str, content: Dict[str, Any], where: str = None, where_arg: List[SQL_CONTENT] = None) → ancient.rig.genSQL.InputResult[源代码]

简化版的更新数据

参数:
  • table -- 数据表名称
  • content -- 字典对象,键值对相对应
  • where -- 一个字符串,代表where条件筛选部分,如id=1 and name=a,可以在其中使用占位符,同时要提供where_arg
  • where_arg -- 如果where传入的字符串包括占位符,那么传入参数列表,让sql可以预查询
返回:

genSQL.InputResult结果信息对象

ancient.module.syncSqlite.dict_factory(cursor, row)[源代码]
ancient.rig --- 拓展的工具方法
check
class ancient.rig.check.BaseMessage(key=None, status=True, msg=None)[源代码]

基类:object

自定义规则函数返回的对象,可以继承该类,自定义返回Message对象

该类是包含校验信息的消息类,当继承该类后,可以为该类添加方法

用于得到该类时直接调用处理

参数:
  • key -- 系统调用,触发message的key值
  • status -- 状态
  • msg -- 自定义rule函数返回的自定义内容
ancient.rig.check.every(content, config)[源代码]

检验dict对象,当全部key值校验完所有规则函数返回message对象

校验字典:

print(every({
    "name": "lisys",
    "age": None
}, {
    "name": not_null,
    "age": [not_null()]
}).__dict__)
参数:
  • content -- 检验dict对象
  • config -- 配置dict对象
返回:

Message

ancient.rig.check.not_null(*args, **kwargs)[源代码]
ancient.rig.check.rule(fn)[源代码]

装饰器,用于装饰自定义规则rule函数

参数:fn -- 被装饰的普通函数
返回:None
ancient.rig.check.some(content, config)[源代码]

检验dict对象,当一个key值触发错误就返回message对象

校验字典:

print(some({
    "name": "lisys",
    "age": None
}, {
    "name": not_null(msg="自定义传参"),
    "age": [not_null]
}).__dict__)
参数:
  • content -- 检验dict对象
  • config -- 配置dict对象
返回:

dict

ancient.rig.check.verify(param, preset, strict=False)[源代码]

校验传入内容

strict为true,并且preset是一个rule列表时,verify会校验所有rule

并且返回一个主Message对象,该Message的msg是一个列表,包含所有的规则错误校验信息

如果有一个规则校验失败,那么主Message对象的status值将为False

使用预设即rule的列表进行校验:

# 检验value的值是否符合规则,not_null为非假的规则函数,verify函数返回BaseMessage对象
value = "hello SystemLight"
print(verify(value, [not_null]).__dict__)

value = "hello SystemLight"
print(verify(value, [not_null(msg="自定义传参")]).__dict__)

直接传入rule函数:

value = None
print(verify(value, not_null(msg="自定义传参")).__dict__)

value = None
print(verify(value, not_null).__dict__)
参数:
  • param -- 检验内容
  • preset -- 预设preset,rule函数列表,也可以直接传递rule函数
  • strict -- 是否为严格模式,即需要校验全部的rule函数才做错误返回
返回:

Message

genSQL
class ancient.rig.genSQL.InputResult[源代码]

基类:object

该类是当数据库module执行输入语句系列时,出现错误会默认返回的错误对象

status : 标识返回状态是否正确,如果处理sql语句时报错且回滚了数据,status标识为False

err_info : 错误信息

affect : sql语句影响到的行数

last_rowid : 返回自增ID的号码

ancient.rig.genSQL.delete_tw(table, where=None)[源代码]

示例内容:

delete_tw("table", where="id=1")
转换sql: delete from table where id=1
参数:
  • table -- 需要删除的表的名称
  • where -- 用于筛选,如id=2
返回:

删除sql

ancient.rig.genSQL.insert_tc(table, content, many=False, ph='%s')[源代码]

示例内容:

insert_tc("table", [1, 2, 3, 4, 5])
转换内容 : ('insert into table values(%s,%s,%s,%s,%s)', [1, 2, 3, 4, 5])

insert_tc("table", [[1, 2, 3, 4, 5], [6, 7, 8, 9, 10]], many=True, ph="?")
转换内容 : ('insert into table values(?,?,?,?,?)', [[1, 2, 3, 4, 5], [6, 7, 8, 9, 10]])

insert_tc("table", {"id": 12, "name": "SystemLight"}, many=False, ph="%s")
转换内容 : ('insert into table(name,id) values(%s,%s)', ['SystemLight', 12])

insert_tc("table", {"key": ["id", "name"], "value": [["1", "lisys"], ["2", "sl"]]}, many=True, ph="%s")
转换内容 : ('insert into table(id,name) values(%s,%s)', [['1', 'lisys'], ['2', 'sl']])
参数:
  • table -- 插入内容的表名称
  • content -- 需要插入的内容,有多种类型方式供选择
  • many -- 是否进行多行插入,默认值:False
  • ph -- 预查询模板占位符,默认值:%s
返回:

元祖(插入预查询模板,预查询参数)

ancient.rig.genSQL.insert_update_tc(table, content, many=False, ph='%s')[源代码]

插入即更新,这条sql语句在mysql中是有效的,不同数据系统可能有所不同

示例内容:

insert_update_tc("table", {"id": 12, "name": "SystemLight"}, many=False, ph="%s")
转换内容 : ('insert into table(id,name) values(%s,%s) on duplicate key update
id = values(id),name = values(name)', [12, 'SystemLight'])

insert_update_tc("table", {"key": ["id", "name"], "value": [["1", "lisys"], ["2", "sl"]]}, many=True, ph="%s")
转换内容 : ('insert into table(id,name) values(%s,%s) on duplicate key update
id = values(id),name = values(name)', [['1', 'lisys'], ['2', 'sl']])
参数:
  • table -- 插入即更新的table名称
  • content -- 需要插入即更新的内容,有两种类型方式供选择
  • many -- 是否进行多行插入,默认值:False
  • ph -- 预查询模板占位符,默认值:%s
返回:

元祖(插入预查询模板,预查询参数)

ancient.rig.genSQL.limit(sql, start, total)[源代码]

生成限制返回数量的sql语句

参数:
  • sql -- 现有sql语句
  • start -- 开始位置
  • total -- 总计条数
返回:

附件limit的sql语句

ancient.rig.genSQL.select_tcw(table, field=('*', ), where=None)[源代码]

示例内容:

select_tcw("table", ("id", "name"), where="id='2' and name='3'")
转换sql: select id,name from table where id='2' and name='3'
参数:
  • table -- 查询的表名称
  • field -- 需要查询的字段,放入元祖中,默认值("*",)
  • where -- 筛选的内容,如 id='2' and name='3',注意'用来声明字符串
返回:

查询sql语句

ancient.rig.genSQL.truncate_t(table)[源代码]

生成清空表sql语句

参数:table -- 需要清空的表的名称
返回:['set foreign_key_checks=0', 'truncate table tabble', 'set foreign_key_checks=1']
ancient.rig.genSQL.update_tcw(table, content, where=None, where_arg=None, ph='%s')[源代码]

生成更新sql语句

示例内容:

update_tcw("table", {"id": 12, "name": "SystemLight"}, ph="%s")
转换内容 : ('update table set name=%s,id=%s', ['SystemLight', 12])
参数:
  • table -- 更新的table名称
  • content -- 需要修改的值,字典类型
  • where -- 用于筛选,如id=2
  • where_arg -- 预查询参数,列表类型
  • ph -- 预查询模板占位符
返回:

元祖

password
ancient.rig.password.encrypt(password: str, salt: bytes = None) → bytes[源代码]

密码加密

参数:
  • password -- 密码值
  • salt -- salt值,默认不用填写
返回:

加密后的密码,无法逆向解析

ancient.rig.password.iso7064mod11_2(source: str) → str[源代码]

iso7064mod11-2校验算法

参数:source -- 需要添加校验的字符串
返回:校验位
ancient.rig.password.iso7064mod37_2(source: str) → str[源代码]

iso7064mod37_2校验算法

参数:source -- 需要添加校验的字符串
返回:校验位
ancient.rig.password.iso7064mod37_hybrid_36(source: str) → str[源代码]

iso7064mod37_HYBRID_36校验算法

参数:source -- 需要添加校验的字符串
返回:校验位
ancient.rig.password.validate(hashed: str, input_password: str) → bool[源代码]

密码验证

参数:
  • hashed -- 加密的密码
  • input_password -- 需要比对的密码
返回:

bool

register
class ancient.rig.register.RESTful(init_entity=None)[源代码]

基类:object

RESTful风格路由生成器,用于快速生成restful风格路由, 生成的路由可以被注册,不建议过长的路由匹配,尽量使用querystring 进行参数传递

示例一:

rf = register.rf

# 以对象的方式描写RESTful风格路由,相当于
# /zoos/{动物园ID}/animals               # 动物园所有动物
# /zoos/{动物园ID}/animals/{动物ID}       # 动物园指定ID的动物
@register.route(url=rf.e("zoos").e("animals").url)

示例二:

# 与上述注册方式效果一致
@register.route(url=RESTful(["zoos", "animals"]).u())
@register.route(url=RESTful([("zoos","(正则的类型,斜杠)w"), "animals"]).url)

示例三:

# 压缩实体去掉前缀路径,使用s方法注册的实体不包含前缀,即/{动物园ID}
@register.route(url=rf.s("zoos").url)

# 控制实例ID标识项是否可以省略,默认是可省略的标识
@register.route(url=rf.e("zoos").u(rf.LOOSE))

# 严格模式下必须包含指定的动物园ID,即无法匹配 /zoos
@register.route(url=rf.e("zoos").u(rf.STRICT))

示例四:

有时你可能需要匹配的参数是可选的,你可以这样注册
url=rf.e("fruit", **rf.union("apple", "banana", "orange")).url

上面的注册只会对三条路径进行匹配,使用union方法创造可选匹配串,通过**的方式传递给实体方法
/fruit/apple,/fruit/banana,/fruit/orange
IGNORE = 2
LOOSE = 0
STRICT = 1
clear()[源代码]

清空注册进来的内容

返回:RESTful实例对象
e(name, shape='[^\\\\/]', union=False)[源代码]

URL中增加一个实体,实体是有前缀的,前缀名等于name

参数:
  • name -- 实体前缀名称
  • shape -- 匹配类型
  • union -- 类型是否为可选匹配
返回:

RESTful实例对象

s(name, shape='[^\\\\/]', union=False)[源代码]

URL中增加一个简易实体,实体没有前缀

参数:
  • name -- 实体前缀名称,仅用于标记无实际意义
  • shape -- 匹配类型
  • union -- 类型是否为可选匹配
返回:

RESTful实例对象

u(need_eof=0)[源代码]

获取url

参数:need_eof -- 结尾字符的种类
返回:生成的URL匹配串
static union(*args)[源代码]

返回一个用于shape参数的匹配字符串,代表该内容类型为可选内容,可选范围为传入的内容

参数:args -- 可选项的列表
返回:用于shape参数的匹配字符串
url

属性方法,获取url

返回:生成的URL匹配串
class ancient.rig.register.Router(version='', prefix='')[源代码]

基类:object

如果不需要分组可以直接在Handler上面增加装饰器,如:

@register.route(version="v3")
class RecpHandler(BaseHandler):
    ...

访问地址:http://127.0.0.1:8095/v3/recp

分组路由,如:

实例化router对象
router=register.Router(prefix="/prefix",version="v3")

@router.route()
class RecpHandler(BaseHandler):
    ...

@router.route(url="/custom")
class NewRecpHandler(BaseHandler):
    ...

访问地址:http://127.0.0.1:8095/v3/prefix/recp
访问地址:http://127.0.0.1:8095/v3/perfix/custom
参数:
  • version -- restful风格版本号
  • prefix -- 统一路由前缀
static get_pe8(content: str)[源代码]

将大写字母分割转换成下划线分割的形式,如AbcDef-->abc_def

参数:content -- 转换的类名,如果带有Handler结尾将自动去除
返回:转换后的内容
route(url: str = None, prefix: str = '', urls: list = None, end=False)[源代码]

装饰器 添加该装饰器的请求实例,路由路径会被自动注册到路由表中 如果你并不想使用装饰器,你可以直接获取register.register_route的 路由表进行添加内容,通过装饰器注册的类是不区分顺序的,完全类导入顺序 路由具体生成顺序可以在运行程序后查看log下面的webMap.log文件

是否需要路由结尾有/和没有/匹配的地址一样,例如index/和index匹配是一样的 你可以设置url参数如/index/abc/?,这样两个匹配效果是一样的

如果你在配置文件中配置了url_prefix = [] 这是一个列表,会生成一组带前缀的静态文件路由匹配项 而我们的end=True的路由会放到这些带前缀的静态文件管理路由后面

参数:
  • url -- 路由路径,不填写默认会根据类名生成路径
  • prefix -- 如果提供该内容将在路由中添加一个前缀,这个前缀是在统一路由前缀后面的
  • urls -- 如果设置该参数,传入一个列表对象,url将不起效果,同时为该类增加多个可匹配的路由
  • end -- 声明该路由是否注册到默认路由后面,系统默认路由参考server.py中的default_route
返回:

None

template
class ancient.rig.template.GenerateCodeEngine(template_root_path='')[源代码]

基类:object

生成代码引擎类

使用方法:

gec = GenerateCodeEngine()
gec.catch_write("index.html", "template.html", {
    "anthor": "systemlight"
})
catch_user_code(path, match=None, code_count=1)[源代码]

捕获目标路径文件中的用户代码

参数:
  • path -- 目标文件路径
  • match -- 匹配用户代码规则
  • code_count -- 用户代码数量
返回:

匹配结果列表

catch_write(path, template_path, kwargs=None)[源代码]

捕获用户代码写入方法,执行写入之前会先匹配用户代码

参数:
  • path -- 目标文件路径
  • template_path -- 模板文件路径
  • kwargs -- 其它额外参数,参考catch_user_code方法,包含写入模板中的变量数据和函数等
返回:

None

end_match_tag
register_glob_content(name, value)[源代码]

注册全局方法或者变量,每个模板渲染时都将附带该内容

参数:
  • name -- 名称
  • value -- 内容
返回:

None

render(template_path, kwargs=None)[源代码]

根据模板渲染并生成字符串返回

参数:
  • template_path -- 模板文件路径
  • kwargs -- 包含写入模板中的变量数据和函数等
返回:

渲染后的内容

start_match_tag
write(path, template_path, kwargs=None)[源代码]

将渲染内容希尔到文件当中

参数:
  • path -- 目标文件路径
  • template_path -- 模板文件路径
  • kwargs -- 包含写入模板中的变量数据和函数等
返回:

None

utils
class ancient.rig.utils.Rectangular(x, y, w, h)[源代码]

基类:object

collision(r2)[源代码]

判断两个矩形是否产生碰撞关系

r1.x0 < r2.x1 r1.y0 < r2.y1 r1.x1 > r2.x0 r1.y1 > r2.y0

参数:r2 -- Rectangular
返回:布尔
contain(r2)[源代码]

判断矩形中是否包含另外一个矩形r2,注意包含也是矩形碰撞所以collision方法会返回True

r1.x0 < r2.x0 r1.x1 > r2.x1 r1.y0 < r2.y0 r1.y1 > r2.y1

参数:r2 -- Rectangular
返回:布尔
class ancient.rig.utils.TreeOperate(key=None)[源代码]

基类:object

TreeOperate允许你操作一颗树型结构数据,支持数据导入和导出,数据必须含有key唯一标识, 子元素必须存储在children键值下 示例内容:

_data = {
    "key": "1",
    "title": "root",
    "children": [
        {"key": "2", "title": "2", "children": [
            {"key": "4", "title": "4"},
            {"key": "5", "title": "5"}
        ]},
        {"key": "3", "title": "3", "children": [
            {"key": "6", "title": "6"},
            {"key": "7", "title": "7"}
        ]}
    ]
}
tree_root = TreeOperate.from_dict(_data)
tree_root.find("2").append(TreeOperate.from_dict({"key": "8", "title": "8"}))
print(tree_root.find("8"))
tree_root.find("8").remove()
print(tree_root.find("8"))
append(sub_tree: ancient.rig.utils.TreeOperate)[源代码]

为当前节点添加子节点,节点类型必须是TreeOperate类型 :param sub_tree: 子类型节点 :return: None

children
count()[源代码]

统计树型结构节点数量

返回:节点数量
find(key: str)[源代码]

根据key值查找节点 :param key: key值 :return: TreeOperate

static from_dict(data)[源代码]

从dict对象中返回TreeOperate对象 :param data: dict :return: TreeOperate

static from_file(path)[源代码]

从json文件中读取数据

参数:path -- json文件路径
返回:TreeOperate
parse(callback: Callable[[TreeOperate, List[T], int], T], deep=0)[源代码]

遍历定制解析规则,返回解析内容

参数:
  • callback -- Callable[["TreeOperate", List[T], int], T] 解析回调函数返回解析结果
  • deep -- 当前解析深度,默认不需要填写,用于回调函数接收判断所在层级
返回:

解析结果

remove(key=None)[源代码]

删除节点,如果传递key值,将删除当前节点下匹配的子孙节点, 如果不传递key值将当前节点从父节点中删除 :param key: [可选] key值 :return:

to_dict(flat=False)[源代码]

输出dict类型数据,用于json化 :param flat: 是否将data参数内容直接映射到对象 :return: dict

class ancient.rig.utils.UpdateList(*args, **kwargs)[源代码]

基类:list

主要方法update(),该方法是对list类型拓展, 当update的数据对象存在时对其更新,注意请保证UpdateList 的子项是dict类型而不要使用值类型,值类型对于UpdateList毫无意义

on_update hook函数,接收old_val(旧数据), p_object(新数据),需要返回更新数据 on_append hook函数,接收p_object(添加数据),需要返回添加数据 on_fetch_key hook函数,当key属性定义为函数时需要同时定义如何捕获key值

key 支持字符串,字符串指定子元素中的更新参考值
支持函数,接收val(当前数据),key(参考key值)该key值由on_fetch_key返回,函数返回bool值True为更新,False为添加

on_fetch_key作用:

复杂场景下我们可能需要up[("home2", True)]这样来找到响应的item,这样显示传递key值没有什么问题,key函数可以获取到
相应的key数据以供我们处理,但是当我们调用update时,update需要判断该内容是更新还是添加,这时我们传入的内容是数据,显然
update无法知晓如何获取我们想要的类型key值,如("home2", True),所以我们要定义on_fetch_key来告知update如何捕获我们
想要的类型的key值,on_fetch_key只有当key属性定义为函数时才有意义。
find(callback)[源代码]

返回满足回调函数的内容

参数:callback -- 回调函数,返回布尔类型用于判断是否满足要求
返回:(索引,值)
update(p_object)[源代码]

类似于append方法,不同的是当内容存在时会对内容进行更新,更新逻辑遵从update_callback 而当内容不存在时与append方法一致进行末尾加入内容

参数:p_object -- 内容对象
返回:None
ancient.rig.utils.assign(*args)[源代码]

与js中Object.assign表现形式一样

将所有可枚举属性的值从一个或多个源对象复制到目标对象

与dict.update(dict)效果一致

参数:args -- 复数的dict对象
返回:合并后的dict对象
ancient.rig.utils.deque(iterable=(), maxlen=None)[源代码]

deque返回deque对象,该对象包含的元素恒定,当你添加一个元素时最上层的元素会被销毁

参数:
  • iterable --
  • maxlen --
返回:

ancient.rig.utils.find(iterable, func)[源代码]

查找可迭代对象的指定项,匹配第一个子项并返回,无匹配项时返回(-1,None)

参数:
  • func -- 匹配函数
  • iterable -- 可迭代对象
返回:

索引,子对象

ancient.rig.utils.grunt(cup, water)[源代码]

grunt将cup中的dict对象作为最终产物,用water中的数据进行替换, 如果water未提供cup中相对应key值的字段,将使用cup中提供的默认内容

参数:
  • cup -- 需要的数据dict
  • water -- 源数据dict
返回:

最终产生的dict

ancient.rig.utils.inin(content, pool)[源代码]

查找指定内容是否存在于列表的字符串中,这种情况content一定要比列表中字符串短

举例:

inin("a",["asdf","fsfsdf"]) 将返回 "asdf"
参数:
  • content -- 内容
  • pool -- 列表
返回:

匹配内容

ancient.rig.utils.kill_form_port(port)[源代码]

传入端口号,杀死进程

参数:port -- 端口号,int类型
返回:None
ancient.rig.utils.loop(loop_iter)[源代码]

导入一个可迭代对象,loop将循环遍历它,通过next()取出下一个内容, 有时你可能需要循环变换一个状态例如从0-1再从0-1你可能需要loop, 这个函数类似轮播图,播放到结尾再从头播放

与itertools.cycle函数效果一致

参数:loop_iter -- 可迭代对象
返回:generator
ancient.rig.utils.plunder(obj: dict, which: list) → dict[源代码]

从字典中获取which中指定的内容,如果不存在或值不为真不获取该值

参数:
  • obj -- 源字典
  • which -- 需要获取哪些值
返回:

捞取后的字典

ancient.rig.utils.rang_split(content: str, index: int) → list[源代码]

根据提供的索引位置分割字符串

举例:

abcdefg  index=2
return ["ab","cdefg"]
参数:
  • content -- 字符串
  • index -- 索引位置
返回:

截取后的列表

ancient.rig.utils.read(path, encoding='utf-8')[源代码]

快捷读取文件函数

参数:
  • path -- 文件路径
  • encoding --
返回:

读取的文件内容

ancient.rig.utils.rectangular_factor(x0, y0, x1, y1)[源代码]

根据左上角和右下角坐标返回的Rectangular对象

返回:Rectangular
ancient.rig.utils.require(path, encoding='utf-8')[源代码]

有时你可能只是需要从文件中读取到json数据,这是require函数将根据 获取到的path,返回dict对象,相当方便,该函数同样类似于json.load

参数:path --
返回:dict
ancient.rig.utils.retry(freq=3, retry_callback=None)[源代码]

装饰器,为函数添加此装饰器当函数抛出异常时会对函数重新调用,重新调用次数取决于freq指定的参数

参数:
  • freq -- 重试次数
  • retry_callback -- 重试时回调执行的函数
返回:

原函数返回值

ancient.rig.utils.rinin(content, pool)[源代码]

查找指定内容是否存在于列表的字符串中,这种情况content一定要比列表中字符串长

举例:

inin("asdf",["a","fsfsdf"]) 将返回 "a"
参数:
  • content -- 内容
  • pool -- 列表
返回:

匹配内容

ancient.rig.utils.scoop(obj: dict, which: list) → dict[源代码]

从字典中获取which中指定的内容,如果不存在则捞取的值为None

参数:
  • obj -- 源字典
  • which -- 需要获取哪些值
返回:

捞取后的字典

ancient.rig.utils.write(path, data, encoding='utf-8')[源代码]

快捷写入文件函数

参数:
  • path -- 文件路径
  • data -- 写入数据
  • encoding --
返回:

None

ancient.view --- 视图模块
example
class ancient.view.example.AdminHandler(application, request, **kwargs)[源代码]

基类:ancient.handlers.baseHandler.BaseHandler

用render注意static_url函数无法使用被madtornado禁用了,由于tornado静态的处理不是很优美, 所以内部重新实现了静态处理,如果实在需求static_url可以在custom/uiMethod.py中自行实现

碎碎念:

想要做到高并发,一定不要写成同步代码,单线程是由程序中的await进行上下文切换的,
每一个路由都是一个协程,而这些协程中仍然可以通过gen.WaitIterator([]),gen.multi([])
等方法进行并发,其中WaitIterator返回时无序的,multi返回是有序的,传递给multi和WaitIterator对象时,
需要使用gen.convert_yielded将coroutine转换成Future对象再进行参数传递,或者直接传递一个包含协程的列表做参数。
如果你想从当前协程中主动弹出可以使用gen.sleep(0),如果程序中存在CPU密集型操作,请使用
await IOLoop.current().run_in_executor(None, blocking_func, args)或者给阻塞函数添加@run_on_executor装饰器

干掉阻塞代码(由于GIL关系,并不会提升太大性能,但是会减少阻塞):

from tornado.concurrent import run_on_executor

import time
import threading
from concurrent.futures import ThreadPoolExecutor

RecpHandler...部分
executor = ThreadPoolExecutor(20)

@run_on_executor
def block_code(self):
    print(threading.current_thread())
    time.sleep(10)
delete()[源代码]
get()[源代码]
post()[源代码]
put()[源代码]
class ancient.view.example.AnimalsHandler(application, request, **kwargs)[源代码]

基类:ancient.handlers.baseHandler.BaseHandler

RESTful风格动物园模型距离,动物实体

# 以对象的方式描写RESTful风格路由,相当于 # /zoos/{动物园ID}/animals # 动物园所有动物 # /zoos/{动物园ID}/animals/{动物ID} # 动物园指定ID的动物

get(zoos, animals)[源代码]
class ancient.view.example.IndexHandler(application, request, **kwargs)[源代码]

基类:ancient.handlers.baseHandler.BaseHandler

CustomErrorHandler是一个举例说明,演示了如何正确的创建自己的自定义错误处理 公共类

delete()[源代码]
get()[源代码]
post()[源代码]
put()[源代码]
class ancient.view.example.ZoosHandler(application, request, **kwargs)[源代码]

基类:ancient.handlers.baseHandler.BaseHandler

RESTful风格动物园模型距离,动物园实体

# 以对象的方式描写RESTful风格路由,相当于 # /zoos # 所有的动物园 # /zoos/{动物园ID} # 指定的动物园

get(zoos)[源代码]
ancient.wood --- 预设视图模块
freedom
class ancient.wood.freedom.FreedomHandler(application, request, **kwargs)[源代码]

基类:ancient.handlers.inheritHandler.AbstractBaseHandler

delete(path)[源代码]

删除一个json文件

调用地址:

http://127.0.0.1:8095/freedom/*path.json

delete: *path代表具体路径与本地磁盘空间进行映射如foo路径下的bar.json http://127.0.0.1:8095/freedom/foo/bar.json
参数:path -- 查找路径
返回:None
get(path)[源代码]

返回指定路径下json文件内容,接受auto_create参数,是否当 文件不存在时自动创建相应json文件并返回

调用地址:

http://127.0.0.1:8095/freedom/*path.json

get: *path代表具体路径与本地磁盘空间进行映射如foo路径下的bar.json http://127.0.0.1:8095/freedom/foo/bar.json

可选参数:auto_create(当文件不存在时,是否自动创建空文件并返回)
参数:path -- 查找路径
返回:None
post(path)[源代码]

增加一个JSON文件,数据传入body并且以json格式传入

调用地址:

http://127.0.0.1:8095/freedom/*path.json

post: *path代表具体路径与本地磁盘空间进行映射如foo路径下的bar.json http://127.0.0.1:8095/freedom/foo/bar.json
参数:path -- 查找路径
返回:None
put(path)[源代码]

更新一个json文件

调用地址:

http://127.0.0.1:8095/freedom/*path.json

put: *path代表具体路径与本地磁盘空间进行映射如foo路径下的bar.json http://127.0.0.1:8095/freedom/foo/bar.json

body: 用JSON格式派发数据
参数:path -- 查找路径
返回:None
class ancient.wood.freedom.FreedomPathHandler(application, request, **kwargs)[源代码]

基类:ancient.handlers.inheritHandler.AbstractBaseHandler

delete(path)[源代码]

删除一个路径及其路径下的所有内容,危险操作

调用地址:

http://127.0.0.1:8095/freedom/*path

delete: *path代表具体路径与本地磁盘空间进行映射如foo路径下的bar http://127.0.0.1:8095/freedom/foo/bar
参数:path -- 查找路径
返回:None
get(path)[源代码]

获取路径下所有JSON文件列表

调用地址:

http://127.0.0.1:8095/freedom/*path

get: *path代表具体路径与本地磁盘空间进行映射如foo路径下的bar http://127.0.0.1:8095/freedom/foo/bar

可选参数:need_data(是否需要数据一起返回,*代表所有文件需要数据,如果只想取指定文件用逗号分隔)
参数:path -- 查找路径
返回:None
post(path)[源代码]

新增一个路径,递归添加

调用地址:

http://127.0.0.1:8095/freedom/*path

post: *path代表具体路径与本地磁盘空间进行映射如foo路径下的bar http://127.0.0.1:8095/freedom/foo/bar
参数:path -- 查找路径
返回:None
put(path)[源代码]

无内容

参数:path --
返回:
ancient.wood.freedom.create_json(path)[源代码]

创建JSON文件,如果路径不存在会递归创建路径

参数:path -- json文件创建路径
返回:json文件创建路径
ancient.wood.freedom.get_safe_path(self: ancient.handlers.inheritHandler.AbstractBaseHandler, *path)[源代码]

获取相对freedom的偏移路径,会判断请求路径是否安全,不会在 之外的路径产生返回

参数:
  • self -- AbstractBaseHandler
  • path -- 路径
返回:

正确的路径,否则触发400错误

upload
class ancient.wood.upload.ExistFileHandler(application, request, **kwargs)[源代码]

基类:ancient.handlers.baseHandler.BaseHandler

获取md5文件是否存在

文件分块上传接口:

http://127.0.0.1:8095/file/exist

get:访问

需要参数:md5,suffix

suffix标识后缀名,如png mp4 jpg等
get()[源代码]
class ancient.wood.upload.MergeFileHandler(application, request, **kwargs)[源代码]

基类:ancient.handlers.baseHandler.BaseHandler

文件分块上传合并文件接口:

http://127.0.0.1:8095/file/merge

post:访问

需要参数:md5,suffix

suffix标识后缀名,如png mp4 jpg等
post()[源代码]
class ancient.wood.upload.UploadFileHandler(application, request, **kwargs)[源代码]

基类:ancient.handlers.baseHandler.BaseHandler

该模块为内置路由解决方案,提供文件分块上传参考 如果需要可以取消掉路由注释,该路由模块将会自动注入到网站地图中

文件分块上传接口:

http://127.0.0.1:8095/file/upload

post:访问

需要参数:md5,file,block
post()[源代码]

文件上传接口方法实现,参数处理比较复杂,新版本可以更加简洁, 但是为了不破坏其它内容,并未修改逻辑

webhook
class ancient.wood.webhook.GogsWebHookHandler(application, request, **kwargs)[源代码]

基类:ancient.handlers.baseHandler.BaseHandler

允许你通过配置文件注册webhook以运行指定脚本程序,该指定脚本程序一般用于自动部署其它应用程序

访问地址:

/webhook/gogs/:name(注册到配置文件的名称)

配置举例:

[tornado-webhook]
gogs=gogs webhook 配置文件路径

{
  "注册name名称": {
    "shell": "C:\netcoreapps\update.bat", //执行脚本
    "analysis_execution": true, // 是否解析,当注册为windows服务时必须解析执行
    "app_port": 5000,  // 应用程序端口,用于自动部署该程序时自动重启该程序,可不填写,将不会重启应用
    "branch": [  // 当指定分支被推送时进行自动部署
      "master"
    ]
  }
}
get(name)[源代码]
ancient.wood.webhook.exec_hook(name_conf)[源代码]
ancient.boot --- 启动钩子脚本
ancient.boot.boot()[源代码]

这是入口程序,请在这里编写需要启动运行的程序内容

返回:None
ancient.boot.exec_hook_on_boot(names)[源代码]

执行webhook脚本文件

参数:names -- 部署任务名称,和注册文件中保持一致
返回:None
ancient.boot.start_page(port)[源代码]

自启动浏览器功能

参数:port -- 访问地址的端口
返回:None

config

配置文件
tornado.cfg --- 服务配置

配置文件说明:

[tornado]
release = false
frame = madtornado
frame_version = 0.3.1
project = madtornado
project_version = 0.1.0

[tornado-server]
domain =
listen = 0.0.0.0
port = 8095

[tornado-secret]
cookie_secret = madtornado
xsrf_cookies = false

[tornado-static]
; url_prefix拥有多种形态,
; 1. 增加多个可访问前缀 ["\s","\v"],这样在URL中/s和/r都会映射到静态目录中,默认静态目录是path参数配置
; 2. 如果前缀映射的目录不同,可以这样配置[["\s","other/statics/"],"\v"]
; 3. 当然有时默认文件也不同,可以这样配置[["\s","other/statics/","main.html"],"\v"]
; 4. madtornadoV0.3.8版本加入了SPA配置,如果开启前端路由模式,必须指定一个spa页面,所有路由交给这个页面
; 5. 如果spa模式默认是不开启的,如果只想给某个映射静态路由添加spa_pag,这样配置
; [["\s","other/statics/","main.html","spa_page.html"],"\v"]
; 6. madtornado接收到请求时如果没找到任何内容会去请求静态页面,如果没有请求到静态页面且没有开启spa模式将返回404
; 7. 404返回内容可以重写,重写方法参考inheritHandler/CustomErrorBaseHandler并且交给staticHandler继承
url_prefix = []
path = statics/
default_filename = index.html
spa_page =

[tornado-template]
template_path = templates

[tornado-proxy]
xheaders = false
proxy_prefix = proxy
proxy_handler = []

[tornado-debug]
debug = true
autoreload = true
compiled_template_cache = true
static_hash_cache = true
serve_traceback = true
open_log = false

[file]
path = resource/

[token]
secret = madtornado
algorithm = HS256
over_time = 3600

[cache]
server_list = ["192.168.1.2:11211"]
over_time = 3600

[db]
max_connections = 1024
idle_seconds = 3600
wait_connection_timeout = 3
charset = utf8
host = 127.0.0.1
port = 3306
user = root
password = 111111
db = madtornado

用户手册

忠告
  • 千万不要在GBK环境工作,会很惨
  • 相对目录中最好不要加./ 可读效果不好
  • 这是用户指南,真正详细的帮助你使用madtornado