最近在做一个小型的mongodb运维工具,在开发的过程中想要实现mongodb数据库备份的功能:前端页面提供一个按钮,用户点击之后向后端post想要备份哪个数据库,后端接到请求之后将这个任务发给异步任务队列,在数据库完成备份之后,后端推送一个包含此次备份详情的消息给前端页面,并且提供一个能够下载此备份文件的链接。
我的实现方法可以用下边这个图来表达:
task-flow.png

实现过程和遇到的问题

首先,介绍一下我在这里用到的一些技术组成:前端直接使用vue,后端使用flask提供restful服务,使用flask-jwt来扩展用户认证的功能、使用flask-socketio扩展给前后端之间增加websocket的通讯方式。Celery当作异步任务队列,执行长耗时的任务。使用Fabric来执行一些流程化的任务(比如在服务器本地执行数据库备份)。Redis在这里充当消息队列,负责flask和celery之间的消息通信(多亏了flask-socketio这个库)。

jwt权限认证

现在的web开发,都逐渐偏向前后端分离的模式,前后端之间的通讯大都是通过数据接口的形式来进行。那么前端发来一个数据请求,后端该如何知道是谁在请求数据、数据请求者有没有权限访问这些数据资源呢?我这次尝试使用JWT(JSON Web Tokens)来进行用户认证。
由于后端使用的是flask+flask-restful来提供rest数据服务,所以我直接找到一个flask的jwt扩展:Flask-JWT。在使用的过程中也十分方便,你需要做的有以下几点:

  1. 创建User:
    创建User的目的是在之后的认证过程中,通过User的username和passowrd来生成token。这里的User必须的三个属性:username、password、id。
  2. 完成jwt的基础配置:

    1
    2
    3
    4
    5
    6
    # jwt相关
    jwt_config = {
    'JWT_SECRET_KEY': 'jwt_secret',
    'JWT_AUTH_URL_RULE': '/api/v1/auth',
    'JWT_EXPIRATION_DELTA': timedelta(days=1)
    }

    这里需要注意的是:如果flask自身的配置没有SECRET_KEY这一项,jwt必须自己配置JWT_SECRET_KEY,否则flask-jwt会在使用的过程中报错:

    1
    TypeError: Expecting a string- or bytes-formatted key.

    另外一些其他的相关配置,都可以在flask-jwt的文档中找到。

  3. 实现jwt认证过程方法

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    def authenticate(username, password):
    # 用户身份认证
    user = User(username, password)
    flag, _ = user.check_password()
    if flag:
    return user
    else:
    return None

    def identity(payload):
    print(payload)
    if payload:
    user_id = payload['identity']
    # 返回值为之后的current_identity
    return User.gen_user_by_id(user_id)
    else:
    return None
  4. 完成jwt配置

    1
    2
    3
    app = Flask(__name__)
    app.config.update(jwt_config)
    jwt = JWT(app, authenticate, identity)

    通过上述的几个步骤,就可以简单的构建一个由jwt来守护的数据服务了,在初次认证获取token的时候,可以通过向JWT_AUTH_URL_RULE所指向的url post自己的用户名和密码即可。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    → http POST http://localhost:11111/api/v1/auth username=aaa password=bbb
    HTTP/1.1 200 OK
    Connection: keep-alive
    Content-Length: 226
    Content-Type: application/json
    Date: Wed, 27 Sep 2017 05:12:54 GMT

    {
    "access_token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpZGVudGl0eSI6IjU5YjVlZDM3NzY0NTBiZjBhYjc1ODE3OCIsImlhdCI6MTUwNjQ4OTE3NCwibmJmIjoxNTA2NDg5MTc0LCJleHAiOjE1MDY1NzU1NzR9.1mZhsu7JU_mAV88TtPmG70iV9Z78QBf0bKKiAGqP6ZE"
    }
  5. @jwt_required和current_identity
    加入jwt是想让它来保护我的数据服务,这个实现过程十分简单,只需要在需要保护的地方添加@jwt_required这个装饰器即可,而从flask_jwt中引入的current_identity更是能够获得当前请求数据的用户是谁,就像之前通过flask_login中通过current_user获取当前用户一样。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    from flask_jwt import jwt_required, current_app, current_identity

    @api_mongo.resource('/<string:db>/status')
    class DatabaseStatus(Resource):
    """docstring for DatabaseStatus"""
    @jwt_required()
    def get(self, db):
    try:
    db = mongo.get_database(db)
    data = db.command("dbstats")
    except Exception as e:
    print(str(e))
    abort(404)
    else:
    return make_response(jsonify(data))

vue-resource获取后端数据

在首次进行用户认证的时候,将从后端获取的token放在localStorage中,然后在每次访问受保护的资源的时候,在请求的的header里边增加Authorization

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// 用户初次认证
this.$http.post('/api/v1/auth', JSON.stringify({username: this.username,password:this.password})).then(response=>{
console.log(response)
localStorage.setItem('jwt_token',response.data['access_token'])
this.$message('登陆成功')
this.$router.push('/')
}, response=>{
this.$message('用户名或密码错误')
})

// 访问受保护资源
this.$http.get('/api/v1/mongo/'+db+'/colstatus',{'headers':{'Authorization':'JWT '+localStorage.getItem('jwt_token')}}).then(response=>{
console.log(response)
this.colData = response.body
}, response=>{
this.$message('获取数据库详情失败')
})

异步执行任务

这次要实现的任务是完成mongodb指定库的备份,由于pymongo中并没有类似mongodb自带工具mongodump那样的方法,所以想了个办法让异步任务通过fabric执行命令完成数据库备份:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
import os
from time import time
from datetime import datetime
from flask_socketio import SocketIO
from pymongo import MongoClient
from gridfs import GridFSBucket
from functionModules.taskQueue.celery_app import app
from fabric.api import local


@app.task
def mongodump(cmd_str, dump_path, dump_root, dump_name, username, db_name, db_col):
dump_start = time()
gfs, col = init_mongo()
tar_path = './dump_tars/{}.tar.gz'.format(dump_name)
tar_filename = '{}.tar.gz'.format(dump_name)
local(cmd_str)
local('cd {}'.format(dump_path))
local('pwd')
local('tar zcvf {} -C {} .'.format(tar_path, dump_path))
print('备份文件压缩完毕')
with open(tar_path, 'rb') as df:
g_upload = gfs.open_upload_stream(
tar_filename, metadata={'contentType': 'application/x-compressed'})
g_upload.write(df.read())
g_upload.close()
print("文件上传完毕")
dump_over = time()
dump_cost = round(dump_over - dump_start, 2)
dump_result = {
'dump_name': tar_filename,
'operator': username,
'complete_time': datetime.now().strftime('%Y-%m-%d %H-%M'),

'file_size': get_FileSize(tar_path),
'time_cost': dump_cost,
'db_name': db_name,
'col_name': db_col
}
col.insert_one(dump_result)
print('备份流程完成')
try:
from eventlet import monkey_patch
monkey_patch()
socketio = SocketIO(message_queue='redis://localhost:6379/10')
socketio.emit('mongo dump', 'mongodump complete')
except Exception as e:
print(str(e))


def get_FileSize(filePath):
filePath = unicode(filePath, 'utf8')
fsize = os.path.getsize(filePath)
fsize = fsize / float(1024 * 1024)
return round(fsize, 2)


def init_mongo():
mongo = MongoClient()
db = mongo['mongodump']
gfs = GridFSBucket(db)
col = db['dump_info']
return gfs, col

整个任务主要是想要实现:

  1. 指定mongodb库的备份
  2. 将mongodb备份文件夹压缩
  3. 将压缩文件上传到mongodb的gridfs中持久存储
  4. 将此次备份详情存入数据库中
  5. 任务完成之后通过websocket通知主程序任务完成

在这个过程,主要碰到了这样几个问题:

  1. 使用tar zcvf压缩文件的时候,发现压缩包在解压的时候,解压出来的文件包含了被压缩文件的绝对路径,解决办法就是:tar zcvf xx.tar.gz -C /path/to/floder .,一定不能忘记最后的那个.
  2. 不得不说,flask-socketio这个库真的是很棒,通过

    1
    2
    socketio = SocketIO(message_queue='redis://localhost:6379/10')
    socketio.emit('mongo dump', 'mongodump complete')

    这样的方式就可以在Celery的进程中触发主进程(flask)websocket事件,这个主要归功于redis的pub/sub(这个等下再说)

和redis结合的flask-socketio

之前一直苦恼的是,在主进程(flask)调用celery的task之后,我该如何才知道任务的执行情况。当然,像这样:

1
2
3
4
celery_task_result = celery_tasks.mongodump.apply_async(
(dump_cmd_str, '{}{}'.format(dump_root, dump_dir), dump_root, dump_dir, username, dump_db, dump_col))
while not celery_task_result.ready():
time.sleep(1)

的确可以获得异步执行任务的状态,但是这样会使主进程阻塞,不是一个可取的方法。直到在我重新看了好几遍flask-socketio文档之后才发现这些内容,网页链接在此:flask-socketio文档
flask-socketio.png
如果真如文档内容所说,我用redis做为flask-socketio的消息队列,celery在执行过程中也可以随时把它自己执行的任务状态通过这个消息队列触发对应的文件。于是,我赶紧修改代码:

1
2
3
# socketio相关
socketio = SocketIO(app, message_queue='redis://localhost:6379/10',
async_mode='eventlet')

一边改代码一遍yy,这下nb了,谁知道重新执行程序发现这样的报错:

1
RuntimeError:Redis requires a monkey patched socket library to work with evnetlet

查看了源码并且又google之后,在初始化socketio的时候简单的使用eventlet的monkey patch即可:

1
2
3
from eventlet import monkey_patch

monkey_patch()

经过这一番操作,现在socketio的消息都是通过redis来传递,我们甚至可以直观的在redis-cli中看到:

1
2
3
4
5
6
7
8
9
10
11
→ redis-cli       
127.0.0.1:6379> select 10
OK
127.0.0.1:6379[10]> SUBSCRIBE flask-socketio # 使用redis做消息队列,默认channel是flask-socketio
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "flask-socketio"
3) (integer) 1
1) "message"
2) "flask-socketio"
3) "(dp0\nS'skip_sid'\np1\nNsS'room'\np2\nNsS'namespace'\np3\nS'/'\np4\nsS'event'\np5\nS'mongo dump'\np6\nsS'callback'\np7\nNsS'data'\np8\nS'mongodump complete'\np9\nsS'method'\np10\nS'emit'\np11\ns."

这样,只需要在后端写好对应的事件,celery在执行完成后触发事件,前端订阅了该事件的地方作出对应的响应即可。

1
2
3
4
5
6
7
8
9
this.socket = io('http://localhost:11111')
this.socket.on('mongo dump',(dumpmsg)=>{
console.log(`mongo dump: ${dumpmsg}`)
this.$notify({
title: '成功',
message: dumpmsg,
type: 'success'
})
})

socket-message.png

flask文件下载

其实上边讲了一大堆,基本上已经走完了整个异步任务的整个过程。但是文件完成备份,最终还是要提供一个下载链接,主要是为了让实施人员拿着这套备份去给用户还原数据用。
在flask里边,获取mongodb gridfs中文件并提供下载通过这样的方式即可:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
from pymongo import MongoClient
from gridfs import GridFSBucket
mongo = MongoClient()
dump_db = mongo['mongodump']
dump_info_col = dump_db['dump_info']
gfs = GridFSBucket(dump_db)

@bp_mongo_file.route('/dumpdownload/<string:filename>')
def mongofile_download(filename):
if filename:
try:
grid_out = gfs.open_download_stream_by_name(filename)
except Exception:
abort(404)
else:
contents = grid_out.read()
if contents:
response = make_response(contents)
response.headers['Content-Type'] = 'application/x-compressed'
response.headers["Content-Disposition"] = \
"attachment; filename={}".format(filename)
return response

而在vue前端,只需要在对应的位置添加下载链接即可:

1
2
3
4
5
<a :href="getDownloadLink(scope.row.dump_name)">点击下载</a>

getDownloadLink:function(filename){
return 'http://localhost:11111/mongofile/dumpdownload/'+filename
}

小结

这个功能的开发只是个开始,不过在这个过程中我有几点感受:

  1. 必须要重视官方文档的力量
  2. 在遇到问题的时候,除了google,深入下去研究源代码是一个非常不错的方法
  3. 东西要活学活用,现在看来flask-socketio使用redis做为消息队列的方式,其实我在读研的时候用node实现过类似的东西。库可以用现成的,但是思维不能僵化。
  4. 做东西之前要有清晰的逻辑,最好能够画成图