本文作者:龙利民
企业介绍:
ofo 小黄车是一个无桩共享单车出行平台,缔造了“无桩单车共享”模式,致力于解决城市出行问题。用户只需在微信服务号或app输入车牌号,即可获得密码解锁用车,随取随用,随时随地,也可以共享自己的单车到 ofo 共享平台,获得所有 ofo 小黄车的终身免费使用权,以1换n。
我们在使用maxcompute的时候,我们其实非常期望知道当前有多少任务在跑,哪些任务耗时长,哪些任务已经完成,并且能通过任务的logview来分析任务耗时长的原因。
maxcompute的任务状态分running和terminated, 其中running是包含:正在运行和等待运行的两种状态,terminated包含:完成、失败、cancel的任务三个状态。阿里云提供了获取上述2种状态的sdk函数,odps.list_instances(status=running|terminated, start_time=开始时间,结束时间)。为了实现秒级别更新任务状态我们可以用以下思路来实现。
1、对于已经running的任务,我们需要快速更新它的状态,有可能已经完成了;
2、不断获取新的任务状态。
我们用mysql来记录任务的状态表设计如下:
下面的页面可以查看当前的任务耗时,开始时间,对超过1小时的任务颜色使用红色标注,并且能查看logview,还能对任务进行取消,非常方便。

我们来看看代码的实现:
import time
import threading
import traceback
import datetime
from odps import odps
from dataflow import config
from libs.myconn import cursor
from config import dbinfo_bi_master
from libs import logger as _logger
g_table_name = "bi_maxcompute_task"
def save_task(instanceid, odps, mysqlconn):
class maxcomputetask(threading.thread):
class maxcomputetaskrunning(threading.thread):
if name == "__main__":
maxcompute可以在命令行下运行,也可以用sdk,阿里云的集成环境跑任务等。很多时候我们面临的任务是非常多的,如何做一个多任务的代码执行器,也是经常遇到的问题。任务执行是一个典型的生产者和消费者的关系,生产者获取任务,消费者执行任务。这么做有2个好处。
1)任务执行的数量是需要可控的,如果同时运行的任务不可控势必对服务器资源造成冲击, 2)多机运行服务,避免单点故障,maxcompute的任务是运行在云端的,可以通过instanceid获取到结果,此结果是保留7天的。
我大致贴一些我们在实际场景种的一些代码,生产者和消费者的代码:
class producter(threading.thread):
def main():
def signal_runflag(sig, frame):
maxcompute实际执行时的代码:
获取结果时的代码:
总结:
阿里云的maxcompute是非常好用的云计算服务,它的更新和迭代速度都非常快,使用阿里云解放工程师的搭建基础服务的时间,让我们更多的专注业务,站在巨人的肩膀上聪明的干活。