
Integration with FastAPI and APScheduler [with ray]

https://www.cnblogs.com/lightsong/p/15054120.html

The previous post described how to add APScheduler to an API server, so that scheduled jobs can later run in the background.

But there is a problem: if a scheduled job is time-consuming, it occupies a large share of compute resources in the main process (the API server), so the server becomes slow to respond to new connections.

Here we introduce the Ray framework specifically to solve this problem.

https://github.com/ray-project/ray

Essentially, Ray is a framework for distributed computing; it also ships with support for reinforcement learning and hyperparameter tuning.

An open source framework that provides a simple, universal API for building distributed applications. Ray is packaged with RLlib, a scalable reinforcement learning library, and Tune, a scalable hyperparameter tuning library.

Ray supports three deployment modes:

In-process, inside the host process

Standalone process

Cluster

Sample code: calling code like the following from the main process pushes the function annotated with @ray.remote to a Ray worker process, which may run on any host.

https://zhuanlan.zhihu.com/p/111340572

The Ray paper describes a typical remote-call flow:

Celery is also a distributed computing framework.

However, when deploying Celery worker processes, you must specify the script that contains the tasks, so the worker's deployment environment is tightly coupled to the script it will execute.

Ray, by contrast, is closer to Jenkins's master/worker model: the script to be executed is pushed to a worker node and run there.

This decouples application deployment; Ray acts as a distributed runtime to which any script can be submitted for execution.

It is similar to the Spark platform in this respect.

https://github.com/fanqingsong/distributed_computing_on_celery/blob/master/tasks.py

https://github.com/fanqingsong/distributed_computing_on_celery/blob/master/taskscaller.py

Run

https://docs.ray.io/en/master/cluster/index.html

One of Ray’s strengths is the ability to leverage multiple machines in the same program. Ray can, of course, be run on a single machine (and is done so often), but the real power is using Ray on a cluster of machines. A Ray cluster consists of a head node and a set of worker nodes. The head node needs to be started first, and the worker nodes are given the address of the head node to form the cluster:
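Following that description, starting a cluster by hand looks roughly like this (the port is a common default and the address is a placeholder):

```shell
# on the head machine
ray start --head --port=6379

# on each worker machine, using the address printed by the head node
ray start --address='<head-node-ip>:6379'
```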

https://docs.ray.io/en/master/configure.html#cluster-resources

Code reference:

See also:

https://docs.ray.io/en/releases-0.8.5/using-ray-on-a-cluster.html#deploying-ray-on-a-cluster

https://medium.com/distributed-computing-with-ray/autoscaling-clusters-with-ray-36bad4da6b9c

https://docs.ray.io/en/master/ray-dashboard.html#ray-dashboard

Ray provides a complete set of background diagnostic tools:

(1) cluster metrics

(2) errors and exceptions, easy to locate

(3) viewing logs on each machine

...

Ray’s built-in dashboard provides metrics, charts, and other features that help Ray users to understand Ray clusters and libraries. The dashboard lets you:

View cluster metrics.

See errors and exceptions at a glance.

View logs across many machines in a single pane.

Understand Ray memory utilization and debug memory errors.

See per-actor resource usage, executed tasks, logs, and more.

Kill actors and profile your Ray jobs.

See Tune jobs and trial information.

Detect cluster anomalies and debug them.

https://docs.ray.io/en/master/ray-logging.html#id1

By default, Ray logs are stored in a /tmp/ray/session_*/logs directory. worker-[worker_id]-[job_id]-[pid].[out|err]: Python/Java part of Ray drivers and workers. All of stdout and stderr from tasks/actors are streamed here. Note that job_id is an id of the driver.
(pid=47411) task_id: TaskID(a67dc375e60ddd1affffffffffffffffffffffff01000000)
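A quick way to check that layout from Python (the session directory only exists on a machine where Ray has actually run):

```python
import glob

# worker logs follow the pattern worker-[worker_id]-[job_id]-[pid].[out|err]
log_files = glob.glob("/tmp/ray/session_*/logs/worker-*")
print(log_files)  # empty list on a machine where Ray has never run
```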

https://docs.ray.io/en/master/package-ref.html#runtime-context-apis

ray.runtime_context.get_runtime_context() [source] — Get the runtime context of the current driver/worker.

PublicAPI (beta): This API is in beta and may change before becoming stable.

The node_id and task_id can also be queried:

property job_id — Get the current job ID for this worker or driver. Job ID is the ID of the Ray driver that creates tasks or actors. Returns: if called by a driver, the job ID; if called in a task, the job ID of the associated driver.

property node_id — Get the current node ID for this worker or driver. Node ID is the ID of the node on which your driver, task, or actor runs. Returns: a node ID for this worker or driver.

property task_id — Get the current task ID for this worker or driver. Task ID is the ID of a Ray task. This shouldn’t be used in a driver process.

https://docs.ray.io/en/master/auto_examples/tips-for-first-time.html#tip-4-pipeline-data-processing

https://apscheduler.readthedocs.io/en/stable/modules/schedulers/asyncio.html

AsyncIOScheduler was meant to be used with the AsyncIO event loop. By default, it will run jobs in the event loop’s thread pool. If you have an application that runs on an AsyncIO event loop, you will want to use this scheduler.

It is an asynchronous scheduler.

https://stackoverflow.com/questions/63001954/python-apscheduler-how-does-asyncioscheduler-work

https://github.com/fanqingsong/fastapi_apscheduler

With the help of FastAPI and APScheduler, this project implements an API to get the CPU rate and to set/delete a periodic CPU-scan job. Reference: https://ahaw021.medium.com/scheduled-jobs-with-fastapi-and-apscheduler-5a4c50580b0e

The workload is separated from the FastAPI server, to prevent the server from becoming too busy: APScheduler is selected as the time-policy manager, and Ray as the logic node that executes the workload. Calls from FastAPI or APScheduler to the Ray cluster are asynchronous, so all communication is reactive and no blocking state exists. The project demonstrates how to use FastAPI and APScheduler together.

Requirements: provide an API to get the CPU rate, and to get it periodically:

(1) get_cpu_rate -- get the current CPU rate with this call

(2) set_cpu_scanner_job -- set a scheduled job to scan the CPU rate periodically

(3) del_cpu_scanner_job -- delete the scheduled job

Source: http://www.cnblogs.com/lightsong/

The copyright of this article is shared by the author and cnblogs. Reposting is welcome, but unless the author agrees otherwise this notice must be retained and a link to the original article given in a prominent position on the page.