天天看點

MaxCompute的任務狀态和多任務執行!/usr/bin/env python-- coding: utf-8 --author: lemon

本文作者:龍利民

企業介紹:

ofo 小黃車是一個無樁共享單車出行平台,締造了“無樁單車共享”模式,緻力于解決城市出行問題。使用者隻需在微信服務号或app輸入車牌号,即可獲得密碼解鎖用車,随取随用,随時随地,也可以共享自己的單車到 ofo 共享平台,獲得所有 ofo 小黃車的終身免費使用權,以1換n。

我們在使用maxcompute的時候,我們其實非常期望知道目前有多少任務在跑,哪些任務耗時長,哪些任務已經完成,并且能通過任務的logview來分析任務耗時長的原因。

maxcompute的任務狀态分running和terminated, 其中running是包含:正在運作和等待運作的兩種狀态,terminated包含:完成、失敗、cancel的任務三個狀态。阿裡雲提供了擷取上述2種狀态的sdk函數,odps.list_instances(status=running|terminated, start_time=開始時間,結束時間)。為了實作秒級别更新任務狀态我們可以用以下思路來實作。

1、對于已經running的任務,我們需要快速更新它的狀态,有可能已經完成了;

2、不斷擷取新的任務狀态。

我們用mysql來記錄任務的狀态表設計如下:

下面的頁面可以檢視目前的任務耗時,開始時間,對超過1小時的任務顔色使用紅色标注,并且能檢視logview,還能對任務進行取消,非常友善。

MaxCompute的任務狀态和多任務執行!/usr/bin/env python-- coding: utf-8 --author: lemon

我們來看看代碼的實作:

import time

import threading

import traceback

import datetime

from odps import odps

from dataflow import config

from libs.myconn import cursor

from config import dbinfo_bi_master

from libs import logger as _logger

g_table_name = "bi_maxcompute_task"

def save_task(instanceid, odps, mysqlconn):

class maxcomputetask(threading.thread):

class maxcomputetaskrunning(threading.thread):

if name == "__main__":

maxcompute可以在指令行下運作,也可以用sdk,阿裡雲的內建環境跑任務等。很多時候我們面臨的任務是非常多的,如何做一個多任務的代碼執行器,也是經常遇到的問題。任務執行是一個典型的生産者和消費者的關系,生産者擷取任務,消費者執行任務。這麼做有2個好處。

1)任務執行的數量是需要可控的,如果同時運作的任務不可控勢必對伺服器資源造成沖擊, 2)多機運作服務,避免單點故障,maxcompute的任務是運作在雲端的,可以通過instanceid擷取到結果,此結果是保留7天的。

我大緻貼一些我們在實際場景種的一些代碼,生産者和消費者的代碼:

class producter(threading.thread):

def main():

def signal_runflag(sig, frame):

maxcompute實際執行時的代碼:

擷取結果時的代碼:

總結:

阿裡雲的maxcompute是非常好用的雲計算服務,它的更新和疊代速度都非常快,使用阿裡雲解放工程師的搭建基礎服務的時間,讓我們更多的專注業務,站在巨人的肩膀上聰明的幹活。