
kubeflow-8 Data Passing in Pipelines: 1 Small Data, 2 Big Data, 3 Multiple Outputs

Argo Workflows is an open-source, container-native workflow engine for orchestrating jobs on Kubernetes. Argo Workflows is implemented as a Kubernetes CRD.

Features:

(1) Define workflows where each step of the workflow is a container.

(2) Model multi-step workflows as a sequence of tasks, or capture the dependencies between tasks using a directed acyclic graph (DAG).

(3) Argo Workflows on Kubernetes makes it easy to run large numbers of compute-intensive jobs in a short time.

(4) Run CI/CD natively on Kubernetes without configuring complex software development products.

1 Small Data

Small data is data that you would be comfortable passing as a program's command-line argument. Its size should not exceed a few kilobytes.

Some examples of typical types of small data are: numbers, URLs, and small strings (e.g. column names).

Small lists, dictionaries, and JSON structures are fine, but keep an eye on the size and consider switching to file-based data passing methods, which are more suitable for bigger data (more than several kilobytes) or binary data.

All small data outputs will at some point be serialized to strings, and all small data input values will at some point be deserialized from strings (passed as command-line arguments). There are built-in serializers and deserializers for several common types (e.g. str, int, float, bool, list, dict). All other types of data need to be serialized manually before returning the data. Make sure to properly specify type annotations; otherwise there will be no automatic deserialization and the component function will receive strings instead of deserialized objects.

1.1 kfp.components.func_to_container_op

Converts a Python function into a pipeline component.

func_to_container_op(func:Callable, 
output_component_file:Union[str, NoneType]=None, 
base_image:Union[str, NoneType]=None, 
extra_code:Union[str, NoneType]='', 
packages_to_install:List[str]=None, 
modules_to_capture:List[str]=None, 
use_code_pickling:bool=False, 
annotations:Union[Mapping[str, str], NoneType]=None)
Function:
Converts a Python function to a component and returns a task (:class:`kfp.dsl.ContainerOp`) factory.

Function docstring is used as the component description. Argument and return annotations are used as component input/output types.
To declare a function with multiple return values, use the :code:`NamedTuple` return annotation syntax::
from typing import NamedTuple
def add_multiply_two_numbers(a: float, b: float) -> NamedTuple('DummyName', [('sum', float), ('product', float)]):
    """Returns sum and product of two arguments"""
    return (a + b, a * b)
Parameters:
(1) func: Required (commonly used).
The Python function to convert.
(2) base_image: Optional (commonly used).
Specify a custom Docker container image to use in the component.
For lightweight components, the image needs to have Python 3.5+.
Default is tensorflow/tensorflow:1.13.2-py3.

(3) output_component_file: Optional.
Write a component definition to a local file.
Can be used for sharing.
(4) extra_code: Optional.
Extra code to add before the function code.
Can be used as a workaround to define types used in the function signature.
(5) packages_to_install: Optional.
List of [versioned] Python packages to pip install before executing the user function.
(6) modules_to_capture: Optional.
List of module names that will be captured (instead of just referenced) during the dependency scan.
By default the :code:`func.__module__` is captured.
The actual algorithm: starting with the initial function, start traversing dependencies.
If the :code:`dependency.__module__` is in the :code:`modules_to_capture` list then it is captured and its dependencies are traversed.
Otherwise the dependency is only referenced instead of captured, and its dependencies are not traversed.
(7) use_code_pickling: Specifies whether the function code should be captured using pickling as opposed to source code manipulation.
Pickling has better support for capturing dependencies, but is sensitive to version mismatch between Python in the component creation environment and the runtime image.
(8) annotations: Optional.
Allows adding arbitrary key-value data to the component specification.

Returns:
A factory function with a strongly-typed signature taken from the Python function.
Once called with the required arguments, the factory constructs a pipeline task instance (:class:`kfp.dsl.ContainerOp`).
           
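As a hedged sketch of the output_component_file parameter above, the generated component definition can be written to a YAML file and later reloaded with kfp.components.load_component_from_file (the file name is illustrative):

from kfp.components import func_to_container_op, load_component_from_file

def add(a: float, b: float) -> float:
    '''Returns the sum of two numbers'''
    return a + b

# Write the component definition to a shareable YAML file
add_op = func_to_container_op(add, output_component_file='add_component.yaml')

# Elsewhere, reload the saved definition as a task factory
add_op_reloaded = load_component_from_file('add_component.yaml')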

1.2 kfp.Client.create_run_from_pipeline_func

Runs a pipeline on a KFP-enabled Kubernetes cluster.

create_run_from_pipeline_func(self, 
pipeline_func:Callable, 
arguments:Mapping[str, str], 
run_name:Union[str, NoneType]=None, 
experiment_name:Union[str, NoneType]=None, 
pipeline_conf:Union[kfp.dsl._pipeline.PipelineConf, NoneType]=None, 
namespace:Union[str, NoneType]=None)
Function:
Runs a pipeline on a KFP-enabled Kubernetes cluster.
This command compiles the pipeline function, creates or gets an experiment and submits the pipeline for execution.

Parameters:
(1) pipeline_func: Required.
A function that describes a pipeline by calling components and composing them into an execution graph.
(2) arguments: Required.
Arguments to the pipeline function, provided as a dict.

(3) run_name: Optional.
Name of the run to be shown in the UI.
(4) experiment_name: Optional.
Name of the experiment to add the run to.
(5) pipeline_conf: Optional.
Pipeline configuration ops that will be applied to all the ops in the pipeline func.
(6) namespace: Kubernetes namespace where the pipeline runs are created.
  For single-user deployment, leave it as None;
  for multi-user deployment, pass a namespace where the user is authorized.
           
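A minimal hedged sketch of a submission call; the run and experiment names are illustrative:

import kfp
from kfp.components import func_to_container_op

@func_to_container_op
def greet(name: str):
    print('hello {}'.format(name))

def greeting_pipeline(name: str = 'world'):
    greet(name)

if __name__ == '__main__':
    # run_name and experiment_name are optional labels shown in the UI
    kfp.Client().create_run_from_pipeline_func(
        greeting_pipeline,
        arguments={'name': 'kubeflow'},
        run_name='greeting-demo-run',         # illustrative
        experiment_name='data-passing-demo')  # illustrative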

1.3 Consuming Constant Arguments

#pip install kfp

#docker pull tensorflow/tensorflow:1.13.2-py3

(1) Specifying a base image

Print the constant argument directly.

import kfp
from kfp.components import func_to_container_op

def print_fun():
    print("hello world")

if __name__ == '__main__':
    image_name = "tensorflow/tensorflow:1.13.2-py3"
    my_op = func_to_container_op(print_fun, base_image=image_name)
    kfp.Client().create_run_from_pipeline_func(my_op, arguments={})
           

(2) Using the decorator

By default the python:3.7 image is pulled.

import kfp
from kfp.components import func_to_container_op

@func_to_container_op
def print_fun(text:str):
    print(text)
    
def use():
    print_fun("hello lucy")
    
if __name__ == '__main__':
    kfp.Client().create_run_from_pipeline_func(use, arguments={})
           

1.4 Consuming Variable Arguments

(1) Specifying a base image

import kfp
from kfp.components import func_to_container_op

def use(text:str):
    print(text)
    
if __name__ == '__main__':
    image_name = "tensorflow/tensorflow:1.13.2-py3"
    my_op = func_to_container_op(use,base_image=image_name)
    kfp.Client().create_run_from_pipeline_func(my_op, arguments={"text":"hello you"})
           

(2) Using the decorator

By default the python:3.7 image is pulled.

import kfp
from kfp.components import func_to_container_op

@func_to_container_op
def print_fun(text:str):
    print(text)
    
def use(text:str):
    print_fun(text)
    
if __name__ == '__main__':
    kfp.Client().create_run_from_pipeline_func(use, arguments={"text":"hello me"})
           

1.5 Producing Single-Output Data

import kfp
from kfp.components import func_to_container_op

@func_to_container_op
def print_fun(text:str):
    print(text)

@func_to_container_op
def produce_fun() -> str:
    return 'Hello world'

def use():
    produce_task = produce_fun()
    # task.output only works for single-output components
    consume_task1 = print_fun(produce_task.output)
    # task.outputs[...] always works
    consume_task2 = print_fun(produce_task.outputs['output']) 

if __name__ == '__main__':
    kfp.Client().create_run_from_pipeline_func(use, arguments={})
           

Defines the component print_fun, which uses the python:3.7 image by default.

Defines the component produce_fun, which uses the python:3.7 image by default.

The function use defines the pipeline; data is passed from the producer to the consumer.


1.6 Producing Multi-Output Data

import kfp
from kfp.components import func_to_container_op
from typing import NamedTuple

@func_to_container_op
def consume_one_argument(text: str):
    print('name={}'.format(text))

@func_to_container_op
def produce_one_output() -> str:
    return "lucy"


@func_to_container_op
def consume_two_arguments(text: str, number: int):
    print('name={}'.format(text))
    print('age={}'.format(str(number)))

@func_to_container_op
def produce_two_outputs() -> NamedTuple('Outputs', [('name', str), ('age', int)]):
    return ("lily", 24)

def use(text: str = "kerry"):
    produce_task1 = produce_one_output()
    produce_task2 = produce_two_outputs()
    # task.output only works for single-output components
    consume_task1 = consume_one_argument(produce_task1.output)
    # task.outputs[...] always works
    consume_task2 = consume_one_argument(produce_task1.outputs['output']) 
    
    consume_task3 = consume_two_arguments(produce_task2.outputs['name'], produce_task2.outputs['age']) 
    consume_task4 = consume_two_arguments(text, produce_task2.outputs['age']) 
    consume_task5 = consume_two_arguments(produce_task1.outputs['output'], produce_task2.outputs['age']) 
    
if __name__ == '__main__':
    kfp.Client().create_run_from_pipeline_func(use, arguments={})
           

2 Big Data

(1) Bigger data should be read from files and written to files.

(2) The paths for the input and output files are chosen by the system and are passed into the function (as strings).

(3) Use the InputPath parameter annotation to tell the system that the function wants to consume the corresponding input data as a file. The system will download the data, write it to a local file, and then pass the path of that file to the function.

(4) Use the OutputPath parameter annotation to tell the system that the function wants to produce the corresponding output data as a file. The system will prepare and pass the path of a file where the function should write the output data. After the function exits, the system will upload the data to the storage system so that it can be passed to downstream components.

(5) You can specify the type of the consumed/produced data by passing the type argument to InputPath and OutputPath. The type can be a Python type or an arbitrary type name string. OutputPath('TFModel') means that the function states that the data it has written to a file has type 'TFModel'. InputPath('TFModel') means that the function states that it expects the data it reads from a file to have type 'TFModel'. When the pipeline author connects inputs to outputs, the system checks whether the types match.

Note on input/output names: when the function is converted to a component, the input and output names generally follow the parameter names, but the "_path" and "_file" suffixes are stripped from file/path inputs and outputs. E.g. the number_file_path: InputPath(int) parameter becomes the number: int input. This makes the argument passing look more natural: number=42 instead of number_file_path=42.
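A short hedged sketch of this naming rule, using the number example from the note above:

from kfp.components import InputPath, OutputPath, func_to_container_op

@func_to_container_op
def write_number(number_path: OutputPath(int)):
    # Output name becomes 'number' (the '_path' suffix is stripped)
    with open(number_path, 'w') as f:
        f.write('42')

@func_to_container_op
def print_number(number_file_path: InputPath(int)):
    # Input name becomes 'number' (the '_file' and '_path' suffixes are stripped)
    with open(number_file_path, 'r') as f:
        print(f.read())

def number_pipeline():
    write_task = write_number()
    print_number(write_task.outputs['number'])
    print_number(number=42)  # pass a constant as number=42, not number_file_path=42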
           

2.1 kfp.components.InputPath

class InputPath(builtins.object)
When creating a component from a function, :class:`.InputPath` should be used as a function parameter annotation to tell the system to pass the *data file path* to the function instead of passing the actual data.

__init__(self, type=None)
Initialize self. See help(type(self)) for accurate signature.
           

2.2 kfp.components.OutputPath

class OutputPath(builtins.object)
When creating a component from a function, :class:`.OutputPath` should be used as a function parameter annotation to tell the system that the function wants to output data by writing it into a file with the given path instead of returning the data from the function.

__init__(self, type=None)
Initialize self. See help(type(self)) for accurate signature.
           

2.3 Reading and Writing Big Data

from typing import NamedTuple
import kfp
from kfp.components import InputPath, OutputPath
from kfp.components import func_to_container_op

# Writing bigger data
@func_to_container_op
def produce_data(line: str, output_text_path: OutputPath(str), count: int = 10):
    '''Repeat the line specified number of times'''
    with open(output_text_path, 'w') as fw:
        for i in range(count):
            fw.write(line + '\n')


# Reading bigger data
@func_to_container_op
def consume_data(text_path: InputPath()): # The "text" input is untyped so that any data can be printed
    '''Print text'''
    with open(text_path, 'r') as reader:
        for line in reader:
            print(line, end = '')

def print_repeating_lines_pipeline():
    produce_task = produce_data(line='world', count=20)
    consume_task = consume_data(produce_task.output) # Don't forget .output !

   
if __name__ == '__main__':
    kfp.Client().create_run_from_pipeline_func(print_repeating_lines_pipeline, arguments={})
           

3 Multiple Outputs

This section shows how to make a component with multiple outputs using the Pipelines SDK.

3.1 kfp.compiler.build_python_component

build_python_component(component_func:Callable, 
target_image:str, 
base_image:Union[str, NoneType]=None, 
dependency:List[str]=[], 
staging_gcs_path:Union[str, NoneType]=None, 
timeout:int=600, 
namespace:Union[str, NoneType]=None, 
target_component_file:Union[str, NoneType]=None, 
python_version:str='python3', 
is_v2:bool=False)
Automatically builds a container image for the component_func based on the base_image, and pushes it to the target_image.
Parameters:
(1) component_func (python function):
The Python function to build components upon.
(2) base_image (str):
Docker image to use as a base image.
(3) target_image (str):
The target image path.
Full URI to push the target image.
(4) staging_gcs_path (str): GCS blob that can store temporary build files.
(5) timeout (int):
The timeout for the image build (in secs); default is 600 seconds.
(6) namespace (str):
The namespace within which to run the Kubernetes Kaniko job. If the job is running on GKE and the value is None, the underlying functions will use the default namespace from GKE.
(7) dependency (list):
The list of VersionedDependency, which includes the package name and versions; default is empty.
(8) target_component_file (str):
The path to save the generated component YAML spec.
(9) python_version (str): Choose python2 or python3; default is python3.
(10) is_v2 (bool): Whether or not to generate a v2 KFP component; default is False.
           
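A hedged sketch of a call, assuming a cluster where the Kaniko build job can run and a GCS bucket for staging; all image URIs and paths below are illustrative:

import kfp.compiler

def add(a: float, b: float) -> float:
    '''Returns the sum of two numbers'''
    return a + b

# Sketch only: builds an image containing add() on top of base_image
# and pushes it to target_image
add_op = kfp.compiler.build_python_component(
    component_func=add,
    base_image='python:3.7',                            # illustrative
    target_image='gcr.io/my-project/add-component:v1',  # illustrative
    staging_gcs_path='gs://my-bucket/staging')          # illustrative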

3.2 func_to_container_op

(1) Decorator

The python:3.7 image is pulled by default.

import kfp 
from typing import NamedTuple
from kfp.components import func_to_container_op

@func_to_container_op
def product_sum(a: float, b: float) -> NamedTuple('hahah', [('product', float), ('sum', float)]):
    '''Returns the product and sum of two numbers'''
    return (a*b, a+b)


@kfp.dsl.pipeline(
    name='Multiple Outputs Pipeline',
    description='Sample pipeline to showcase multiple outputs'
)
def pipeline(a=2.0, b=2.5, c=3.0):
    prod_sum_task = product_sum(a, b)
    prod_sum_task2 = product_sum(b, c)
    prod_sum_task3 = product_sum(prod_sum_task.outputs['product'],
                                    prod_sum_task2.outputs['sum'])
if __name__ == '__main__':
    arguments = { 'a': 2,'b': 3,'c': 4}
    kfp.Client().create_run_from_pipeline_func(pipeline, arguments=arguments)
           

(2) Specifying an image

The specified image is used.

import kfp 
from typing import NamedTuple
from kfp.components import func_to_container_op


def product_sum(a: float, b: float) -> NamedTuple('hahah', [('product', float), ('sum', float)]):
    '''Returns the product and sum of two numbers'''
    return (a*b, a+b)

image_name = "tensorflow/tensorflow:1.13.2-py3"
my_op = func_to_container_op(product_sum,base_image = image_name)

@kfp.dsl.pipeline(
    name='Multiple Outputs Pipeline',
    description='Sample pipeline to showcase multiple outputs'
)
def my_pipeline(a=2.0, b=2.5, c=3.0):
    task1 = my_op(a, b)
    task2 = my_op(b, c)
    task3 = my_op(task1.outputs['product'],task2.outputs['sum'])
    
if __name__ == '__main__':
    arguments = { 'a': 20,'b': 30,'c': 40}
    kfp.Client().create_run_from_pipeline_func(my_pipeline, arguments=arguments)
           
