很早就听说celeryfabric这两个神器,最近同时在做两个自己的小项目,更新比较频繁,一遍一遍手动部署太过僵硬,所以才真正用到了这俩好东西。

celery

在用到celery之前,如果有定时任务这样的需求,我一般都是直接写crontab,但是如果单个任务时间消耗较长的话,使用celery就可以让长时间消耗的任务异步执行,避免程序主线程的阻塞。另外一点,celery结合flower这个celery监控工具,能够让你看到自己的任务执行状况。

celery作为一个分布式任务调度模块,它拥有独立的职程(Worker)进程持续监视队列中是否有需要处理的新任务,是否有要处理的任务则取决于中间人(Broker)在客户端和职程间斡旋。Broker从客户端向队列添加消息,之后Broker把消息派送给Worker。
在真正使用的时候,也不会有多少难度。

  1. 安装celery

    1
    pip install celery
  2. 选择一个中间人,我直接用redis,比较方便

  3. 新建celery app,douyu_app.py文件

    1
    2
    3
    4
    5
    6
    7
    8
    9
    from celery import Celery
    import celery_douyu_config # celery配置


    app = Celery('douyuapp', include=['task_douyu']) # task_douyu为celery任务
    app.config_from_object('celery_douyu_config')

    if __name__ == '__main__':
    app.start()
  4. 新建任务task_douyu.py文件

    1
    2
    3
    4
    5
    6
    7
    8
    from douyu_app import app
    from collector.danmu.CDouyu import Douyu


    @app.task
    def SAVE_DOUYU_DATA():
    douyu = Douyu()
    douyu.getRoomInfos()
  5. celery配置celery_douyu_config.py文件

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    from celery.schedules import crontab

    BROKER_URL = 'redis://127.0.0.1:6379/2' # 任务发布队列
    CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/3' # 任务执行结果存储

    CELERY_TIMEZONE = 'Asia/Shanghai' # 时区选择

    CELERYBEAT_SCHEDULE = {
    'SAVE_DOUYU_ROOMINFO': { # 任务名称
    'task': 'task_douyu.SAVE_DOUYU_DATA', # 具体任务 task_douyu.SAVE_DOUYU_DATA 是一个真正要执行的任务
    'schedule': crontab(minute=[40]), # 任务计划时间,每小时40分的时候执行
    }
    }
  6. 运行

    1
    celery -A douyu_app worker -B --loglevel=info

这里的-B主要用于celery定时任务,通过heartbeat来通知Worker是否有任务需要执行。

我这里只是简单的用了一下celery,并且只用到了定时任务,celery作为一款分布式任务调度工具,肯定会有更多更厉害的用法,留待以后用到的时候再研究。

fabric

之所以用到fabric还是因为自己比较懒,前边说到最近在频繁更新两个小项目,这两个项目都有向外提供api数据服务的部分,所以每次修改代码,运行测试,提交代码,ssh到远程主机pull最新代码,杀死原先服务进程,nohup最新服务。这个链路还是比较长的,并且有个项目长期的爬取网络数据,然后存储在mongodb中,自己的低配云主机硬盘有限,需要不定期地将数据迁移出来,所以想到了fabric这个好东西,应该可以满足自己的需求,自动化完成上述整个任务链路。

  1. 安装

    1
    pip install fabric
  2. 在项目根目录创建fabfile.py(默认使用这个名字,如果想要自定义,在运行的时候fab -f xxx.py也是可以的)

  3. 配置远程主机
    fabric是通过ssh的方式登录到远程主机的,可以通过用户名/密码的方式或者是通过密钥的形式,因为也就三台主机,也都是自己的机器,我选择使用密钥登陆。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    # 配置~/.ssh/config
    Host host_name
    HostName ip or domain
    User user
    IdentityFile ~/.ssh/id_rsa

    # 在fabfile中增加相关配置
    env.use_ssh_config = True
    env.hosts = ['al', 'al1'] # 这里的名字就是config中的host_name
    env.roledefs = { # 不同的主机要是有不同的任务,可以通过指定role来进行区分
    'douyu': ['al'],
    'other': ['al1'],
    }
  4. 添加运行测试

    1
    2
    3
    4
    5
    6
    @task
    def run_test():
    with settings(warn_only=True):
    result = local('python tester.py', capture=True)
    if result.failed and not confirm("Tests failed. Continue anyway?"):
    abort("Aborting at user request.")
  5. 提交代码

    1
    2
    3
    4
    5
    @runs_once
    @task
    def pre_deploy():
    local('git add -A && git commit')
    local('git push origin master && git push tx master')
  6. 更新代码,发布新的服务

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    def deploy_code(cmd):
    code_dir = '~/git/Hotroom'
    with settings(warn_only=True):
    if run("ls {}".format(code_dir)).failed:
    run("git clone hhttps://github.com/love3forever/Hotroom.git {}".format(code_dir))
    with cd(code_dir):
    run("git pull")
    run("sudo pip install -r ./requirement.txt")
    with cd('./hotroom'):
    pids = run(
    "ps -ef | grep celery | grep -v grep | awk '{print $2}'")
    if pids:
    pid_list = pids.split('\r\n')
    for i in pid_list:
    with settings(warn_only=True):
    run('kill -9 %s' % i)
    run('pwd')
    run("(nohup celery -A {} worker -B --loglevel=error >& /dev/null < /dev/null &) && sleep 1".format(cmd))
    run('echo deployed')
  7. 备份还原数据库,清空数据库

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    def dump(db):
    stamp = datetime.now().strftime('%y_%m_%d_%H_%M')
    cd_dir = '~/tars/dump/'
    dump_dir = '~/tars/dump/dump_{}'.format(stamp)
    dump_tar = 'dump_{}.tar.gz'.format(stamp)
    local_dir = '/Users/eclipse/Downloads/mongo34/dump/'
    local_tar_dir = '/Users/eclipse/Downloads/mongo34/dump/dump_{}'.format(
    stamp)
    local_dump_dir = '/Users/eclipse/Downloads/mongo34/dump/dump_{}.tar.gz'.format(
    stamp)
    with settings(warn_only=True):
    run('mkdir -p {}'.format(dump_dir))
    run('mongodump -d {} -c Roominfo -o {} --gzip'.format(db, dump_dir))
    with cd(cd_dir):
    run('ls')
    run('tar -zcvf {} {}'.format(dump_tar, 'dump_{}'.format(stamp)))
    with settings(warn_only=True):
    get('~/tars/dump/{}'.format(dump_tar), local_dir)
    run('echo current db:{}'.format(db))
    run('mongo')
    local('mkdir -p {}'.format(local_tar_dir))
    local('tar -zxvf {} -C {}'.format(local_dump_dir, local_dir))
    local('/Users/eclipse/Downloads/mongo34/bin/mongorestore --dir={} \
    --gzip'.format(local_tar_dir))

小结

目前都只用到了这两个库的一点点功能,相信在以后的工作中会让这俩库有更多的作用!