Interactive pipelines

Cloud Pipelines SDK has an interactive mode feature that allows the users to run components one by one and build pipelines interactively, step by step.

Usually, pipelines are built statically, then compiled and sent for execution. However, for experimentation and pipeline or component development, running components interactively provides better experience.

Interactive mode changes the behavior of the component functions. Normally, when you call a component function, it simply returns a Task object which is used to build pipelines statically. However, in interactive mode, when you call a component function, the component starts executing immediately and the function returns an Execution object instead of a Task object.

Prerequisites

Install the Cloud Pipelines SDK: pip install cloud-pipelines.

The default task launcher uses the local Docker installation to execute component tasks. Make sure that Docker is installed and working. See https://docs.docker.com/get-docker/

Activating the interactive mode

To activate the interactive execution mode, call cloud_pipelines.activate_interactive_mode():

import cloud_pipelines

cloud_pipelines.activate_interactive_mode()

That's it. Now you can start executing components. You can pass the component execution outputs to other components, thus creating pipelines interactively.

Note: To deactivate interactive mode, call cloud_pipelines.deactivate_interactive_mode(). Note: You can use interactive mode as Python context: cloud_pipelines.orchestration.runners.InteractiveMode().

Using interactive mode

Creating or loading a component

Components are usually loaded from remote or local component.yaml files.

Methods to load or create components can be found in the cloud_pipelines.components module:

  • load_component_from_url
  • load_component_from_file
  • load_component_from_text
  • create_component_from_func

Each of these functions return a Python function with signature that corresponds to the component inputs.

Here we will just create a new component from a small Python function.

def add_exclamation(message: str = "Hello, world") -> str:
    result = message + "!"
    print(result)
    return result

from cloud_pipelines import components

add_exclamation_op = components.create_component_from_func(add_exclamation)

The resulting add_exclamation_op is a Python callable/function.

Learn more about creating a component from a Python function.

Running the component interactively

Usually the component function just returns a Task object that is used for building pipelines statically. However in interactive mode, the component starts running right away and the function returns an Execution object.

execution = add_exclamation_op(message="Hi")

You should see the following log:

2023-02-06 07:18:37.984659: [Add exclamation] Starting container task.
2023-02-06 07:18:40.474181: [Add exclamation] Hi!
2023-02-06 07:18:40.998756: [Add exclamation] Container task completed with status: Succeeded

Check the component execution outputs (Artifact objects):

output1 = execution.outputs["Output"]
print(output1.materialize())

You should see the following log:

Hi!

Orchestration

Executions

When you run component task, the Runner immediately creates and returns an Execution object while the execution runs in the background. Execution object has .outputs attribute that holds the dictionary of the component execution's output artifacts. The output artifacts can be passed to components which creates new executions.

The execution.wait_for_completion() method blocks until the execution succeeds or fails.

Artifacts

Each Artifact points to already existing or future data (file or directory).

The artifact.download([path=...]) method downloads artifact to explicit or temporary local location and returns the path.

The artifact.type_spec attribute holds the artifact type.

The artifact.materialize() method can convert the artifact data to a Python object for a small list of known types:

  • String -> str
  • Integer -> int
  • Float -> float
  • Boolean -> bool
  • JsonArray -> list
  • JsonObject -> dict

Extra supported types:

  • ApacheParquet -> pandas.DataFrame
  • TensorflowSavedModel -> TensorFlow or Keras module

The same types are supported when passing Python objects into a component.

Note: The artifact.download() and artifact.materialize() methods wait for the artifacts to be produced if they are not ready yet.

Features

Execution caching and reuse

Successfully completed executions are put in cache. If the same task (same components, same input arguments) is submitted for execution again, the result would be reused from cache instead of doing the work again.

Running execution caching and reuse

Executions that are still running are put in a special cache too. If some task is submitted for execution while an identical task (same components, same input arguments) is being executed, then this running execution will be reused instead of doing the work again.

Running locally and in cloud

Local launchers

Docker

Docker launcher uses the local Docker installation to execute component tasks. Make sure that Docker is installed and working. See https://docs.docker.com/get-docker/

This is the default task launcher, so you do not need to select it explicitly. To configure Cloud Pipelines to use the Docker task launcher use the following code:

from cloud_pipelines.orchestration.launchers.local_docker_launcher import DockerContainerLauncher
cloud_pipelines.activate_interactive_mode(
    task_launcher=DockerContainerLauncher(),
    # Optional:
    root_uri="/some/local/path",  # The location where the system can put the artifact data and the DB
)

Google Cloud

There are several task launchers based on Google Cloud services. Launchers are being actively developed and some are still in experimental stage.

Make sure that you have set up access to Google Cloud using gcloud (e.g. gcloud auth login).

Google Cloud services work with data stored in Google Cloud Storage, so you will need Google Cloud Storage bucket.

Set the root_uri to some directory in a Google Cloud Storage bucket when using task launchers based on Google Cloud services.

Google Cloud Batch

Google Cloud Batch is a fully managed batch service to schedule, queue, and execute batch jobs on Google's infrastructure.

To configure Cloud Pipelines to use the Google Cloud Batch task launcher use the following code:

from cloud_pipelines.orchestration.launchers.google_cloud_batch_launcher import GoogleCloudBatchLauncher
cloud_pipelines.activate_interactive_mode(
    task_launcher=GoogleCloudBatchLauncher(),
    root_uri="gs://<bucket>/<root_dir>",
)

Google Cloud Vertex AI CustomJob

Google Cloud Vertex AI helps users build, deploy, and scale machine learning (ML) models faster, with fully managed ML tools for any use case. Google Cloud Vertex AI CustomJob service is the basic way to run containerized code in Vertex AI.

To configure Cloud Pipelines to use the Google Cloud Vertex AI CustomJob task launcher use the following code:

from cloud_pipelines.orchestration.launchers.google_cloud_vertex_custom_job_launcher import GoogleCloudVertexAiCustomJobLauncher

cloud_pipelines.activate_interactive_mode(
    task_launcher=GoogleCloudVertexAiCustomJobLauncher(),
    root_uri="gs://<bucket>/<root_dir>",
)

Examples

Passing and materializing Pandas DataFrame

import cloud_pipelines
from cloud_pipelines.components import create_component_from_func, InputPath, OutputPath

cloud_pipelines.activate_interactive_mode()


# Creating a new component
def select_columns(
    table_path: InputPath("ApacheParquet"),
    output_table_path: OutputPath("ApacheParquet"),
    column_names: list,
):
    import pandas

    df = pandas.read_parquet(table_path)
    print("Input table:")
    df.info()
    df = df[column_names]
    print("Output table:")
    df.info()
    df.to_parquet(output_table_path, index=False)

select_columns_op = create_component_from_func(
    func=select_columns,
    packages_to_install=["pandas==1.3.5", "pyarrow==10.0.1"],
)

# Using the component:

# Preparing the input data
import pandas
input_df = pandas.DataFrame(
    {
        "feature1": [1, 2, 3, 4, 5],
        "feature2": [0.1, 0.2, 0.3, 0.4, 0.5],
        "feature3": ["a", "b", "c", "d", "e"],
    }
)

# Running the component
output_art = select_columns_op(
    table=input_df, column_names=["feature1", "feature2"]
).outputs["output_table"]

output_art2 = select_columns_op(
    table=output_art, column_names=["feature2"]
).outputs["output_table"]

# Checking the output data
output_df = output_art.materialize()
assert output_df.columns.to_list() == ["feature1", "feature2"]

output_df2 = output_art2.materialize()
assert output_df2.columns.to_list() == ["feature2"]

cloud_pipelines.deactivate_interactive_mode()

You should see the following log:

2023-02-06 07:20:59.575396: [Select columns] Starting container task.
2023-02-06 07:21:18.629752: [Select columns] Input table:
2023-02-06 07:21:18.638000: [Select columns] <class 'pandas.core.frame.DataFrame'>
2023-02-06 07:21:18.639234: [Select columns] RangeIndex: 5 entries, 0 to 4
2023-02-06 07:21:18.640105: [Select columns] Data columns (total 3 columns):
2023-02-06 07:21:18.640904: [Select columns]  #   Column    Non-Null Count  Dtype
2023-02-06 07:21:18.641821: [Select columns] ---  ------    --------------  -----
2023-02-06 07:21:18.642746: [Select columns]  0   feature1  5 non-null      int64
2023-02-06 07:21:18.643615: [Select columns]  1   feature2  5 non-null      float64
2023-02-06 07:21:18.644713: [Select columns]  2   feature3  5 non-null      object
2023-02-06 07:21:18.645470: [Select columns] dtypes: float64(1), int64(1), object(1)
2023-02-06 07:21:18.646253: [Select columns] memory usage: 248.0+ bytes
2023-02-06 07:21:18.647397: [Select columns] Output table:
2023-02-06 07:21:18.648472: [Select columns] <class 'pandas.core.frame.DataFrame'>
2023-02-06 07:21:18.649203: [Select columns] RangeIndex: 5 entries, 0 to 4
2023-02-06 07:21:18.650094: [Select columns] Data columns (total 2 columns):
2023-02-06 07:21:18.650899: [Select columns]  #   Column    Non-Null Count  Dtype
2023-02-06 07:21:18.651725: [Select columns] ---  ------    --------------  -----
2023-02-06 07:21:18.652832: [Select columns]  0   feature1  5 non-null      int64
2023-02-06 07:21:18.653717: [Select columns]  1   feature2  5 non-null      float64
2023-02-06 07:21:18.654761: [Select columns] dtypes: float64(1), int64(1)
2023-02-06 07:21:18.655600: [Select columns] memory usage: 208.0 bytes
2023-02-06 07:21:21.214023: [Select columns] Container task completed with status: Succeeded
2023-02-06 07:21:21.219940: [Select columns 2] Starting container task.
2023-02-06 07:21:37.051625: [Select columns 2] Input table:
2023-02-06 07:21:37.059715: [Select columns 2] <class 'pandas.core.frame.DataFrame'>
2023-02-06 07:21:37.061031: [Select columns 2] RangeIndex: 5 entries, 0 to 4
2023-02-06 07:21:37.062099: [Select columns 2] Data columns (total 2 columns):
2023-02-06 07:21:37.063600: [Select columns 2]  #   Column    Non-Null Count  Dtype
2023-02-06 07:21:37.064602: [Select columns 2] ---  ------    --------------  -----
2023-02-06 07:21:37.065859: [Select columns 2]  0   feature1  5 non-null      int64
2023-02-06 07:21:37.066878: [Select columns 2]  1   feature2  5 non-null      float64
2023-02-06 07:21:37.068062: [Select columns 2] dtypes: float64(1), int64(1)
2023-02-06 07:21:37.069451: [Select columns 2] memory usage: 208.0 bytes
2023-02-06 07:21:37.070592: [Select columns 2] Output table:
2023-02-06 07:21:37.071812: [Select columns 2] <class 'pandas.core.frame.DataFrame'>
2023-02-06 07:21:37.072785: [Select columns 2] RangeIndex: 5 entries, 0 to 4
2023-02-06 07:21:37.073734: [Select columns 2] Data columns (total 1 columns):
2023-02-06 07:21:37.074751: [Select columns 2]  #   Column    Non-Null Count  Dtype
2023-02-06 07:21:37.075664: [Select columns 2] ---  ------    --------------  -----
2023-02-06 07:21:37.077513: [Select columns 2]  0   feature2  5 non-null      float64
2023-02-06 07:21:37.078540: [Select columns 2] dtypes: float64(1)
2023-02-06 07:21:37.079618: [Select columns 2] memory usage: 168.0 bytes
2023-02-06 07:21:39.645323: [Select columns 2] Container task completed with status: Succeeded

Passing and materializing TensorFlow model

import cloud_pipelines
from cloud_pipelines.components import create_component_from_func, InputPath, OutputPath

cloud_pipelines.activate_interactive_mode()


# Creating a new component
def transform_keras_model(
    model_path: InputPath("TensorflowSavedModel"),
    output_model_path: OutputPath("TensorflowSavedModel"),
):
    import tensorflow

    model = tensorflow.keras.models.load_model(filepath=model_path)
    model.summary()
    tensorflow.keras.models.save_model(model=model, filepath=output_model_path)

transform_keras_model_op = create_component_from_func(
    func=transform_keras_model,
    base_image="tensorflow/tensorflow:2.11.0",
)

# Using the component:

# Preparing the input data
import tensorflow as tf
input_model = tf.keras.Sequential([
    tf.keras.layers.Dense(5, input_shape=(3,)),
    tf.keras.layers.Softmax(),
])

# Running the component
output_art = transform_keras_model_op(model=input_model).outputs["output_model"]

# Checking the output data
output_model = output_art.materialize()
predictions = output_model(tf.constant([[0.1, 0.2, 0.3]]))
print(predictions)
assert predictions.shape == (1, 5)

cloud_pipelines.deactivate_interactive_mode()

You should see the following log:

2023-02-06 07:28:23.274859: [Transform keras model] Starting container task.
2023-02-06 07:28:26.564806: [Transform keras model] Model: "sequential_2"
2023-02-06 07:28:26.566027: [Transform keras model] _________________________________________________________________
2023-02-06 07:28:26.567347: [Transform keras model]  Layer (type)                Output Shape              Param #
2023-02-06 07:28:26.568419: [Transform keras model] =================================================================
2023-02-06 07:28:26.569426: [Transform keras model]  dense_2 (Dense)             (None, 5)                 20
2023-02-06 07:28:26.570448: [Transform keras model]
2023-02-06 07:28:26.571461: [Transform keras model]  softmax_2 (Softmax)         (None, 5)                 0
2023-02-06 07:28:26.572642: [Transform keras model]
2023-02-06 07:28:26.575428: [Transform keras model] =================================================================
2023-02-06 07:28:26.576851: [Transform keras model] Total params: 20
2023-02-06 07:28:26.578012: [Transform keras model] Trainable params: 20
2023-02-06 07:28:26.579110: [Transform keras model] Non-trainable params: 0
2023-02-06 07:28:26.580276: [Transform keras model] _________________________________________________________________
2023-02-06 07:28:27.912004: [Transform keras model] Container task completed with status: Succeeded
tf.Tensor([[0.17777765 0.16397926 0.18161307 0.22165933 0.2549707 ]], shape=(1, 5), dtype=float32)

End-to-end model training pipeline

import cloud_pipelines
from cloud_pipelines import components

cloud_pipelines.activate_interactive_mode()


# Loading components:
download_from_gcs_op = components.load_component_from_url("https://raw.githubusercontent.com/Ark-kun/pipeline_components/d8c4cf5e6403bc65bcf8d606e6baf87e2528a3dc/components/google-cloud/storage/download/component.yaml")
select_columns_using_Pandas_on_CSV_data_op = components.load_component_from_url("https://raw.githubusercontent.com/Ark-kun/pipeline_components/8c78aae096806cff3bc331a40566f42f5c3e9d4b/components/pandas/Select_columns/in_CSV_format/component.yaml")
fill_all_missing_values_using_Pandas_on_CSV_data_op = components.load_component_from_url("https://raw.githubusercontent.com/Ark-kun/pipeline_components/23405971f5f16a41b16c343129b893c52e4d1d48/components/pandas/Fill_all_missing_values/in_CSV_format/component.yaml")
split_rows_into_subsets_op = components.load_component_from_url("https://raw.githubusercontent.com/Ark-kun/pipeline_components/daae5a4abaa35e44501818b1534ed7827d7da073/components/dataset_manipulation/Split_rows_into_subsets/in_CSV/component.yaml")
create_fully_connected_tensorflow_network_op = components.load_component_from_url("https://raw.githubusercontent.com/Ark-kun/pipeline_components/9ca0f9eecf5f896f65b8538bbd809747052617d1/components/tensorflow/Create_fully_connected_network/component.yaml")
train_model_using_Keras_on_CSV_op = components.load_component_from_url("https://raw.githubusercontent.com/Ark-kun/pipeline_components/c504a4010348c50eaaf6d4337586ccc008f4dcef/components/tensorflow/Train_model_using_Keras/on_CSV/component.yaml")
predict_with_TensorFlow_model_on_CSV_data_op = components.load_component_from_url("https://raw.githubusercontent.com/Ark-kun/pipeline_components/59c759ce6f543184e30db6817d2a703879bc0f39/components/tensorflow/Predict/on_CSV/component.yaml")

# The end-to-end model training pipeline

dataset_gcs_uri = "gs://ml-pipeline-dataset/Chicago_taxi_trips/chicago_taxi_trips_2019-01-01_-_2019-02-01_limit=10000.csv"
feature_columns = ["trip_seconds", "trip_miles", "pickup_community_area", "dropoff_community_area", "fare", "tolls", "extras"]  # Excluded "trip_total"
label_column = "tips"
training_set_fraction = 0.8

all_columns = [label_column] + feature_columns

dataset = download_from_gcs_op(
    gcs_path=dataset_gcs_uri
).outputs["Data"]

dataset = select_columns_using_Pandas_on_CSV_data_op(
    table=dataset,
    column_names=all_columns,
).outputs["transformed_table"]

dataset = fill_all_missing_values_using_Pandas_on_CSV_data_op(
    table=dataset,
    replacement_value="0",
    # # Optional:
    # column_names=None,  # =[...]
).outputs["transformed_table"]

split_task = split_rows_into_subsets_op(
    table=dataset,
    fraction_1=training_set_fraction,
)
training_data = split_task.outputs["split_1"]
testing_data = split_task.outputs["split_2"]

network = create_fully_connected_tensorflow_network_op(
    input_size=len(feature_columns),
    # Optional:
    hidden_layer_sizes=[10],
    activation_name="elu",
    # output_activation_name=None,
    # output_size=1,
).outputs["model"]

model = train_model_using_Keras_on_CSV_op(
    training_data=training_data,
    model=network,
    label_column_name=label_column,
    # Optional:
    # loss_function_name="mean_squared_error",
    number_of_epochs=10,
    # learning_rate=0.1,
    # optimizer_name="Adadelta",
    # optimizer_parameters={},
    # batch_size=32,
    metric_names=["mean_absolute_error"],
    # random_seed=0,
).outputs["trained_model"]

predictions = predict_with_TensorFlow_model_on_CSV_data_op(
    dataset=testing_data,
    model=model,
    # label_column_name needs to be set when doing prediction on a dataset that has labels
    label_column_name=label_column,
    # Optional:
    # batch_size=1000,
).outputs["predictions"]

# Inspecting the trained model (requires TensorFlow to be installed):
tf_model = model.materialize()
predictions = tf_model.predict([[100, 10, 0, 0, 30, 0, 0]])
print(predictions)

You should see the following log:

2023-02-06 07:50:05.939270: [Download from GCS] Starting container task.
2023-02-06 07:50:05.958715: [Create fully connected tensorflow network] Starting container task.
2023-02-06 07:50:27.954952: [Download from GCS] Copying gs://ml-pipeline-dataset/Chicago_taxi_trips/chicago_taxi_trips_2019-01-01_-_2019-02-01_limit=10000.csv...
/ [1/1 files][  4.0 MiB/  4.0 MiB] 100% DoneCS] / [0/1 files][    0.0 B/  4.0 MiB]   0% Done
2023-02-06 07:50:29.098814: [Download from GCS] Container task completed with status: Succeeded
2023-02-06 07:50:29.105666: [Select columns using Pandas on CSV data] Starting container task.
2023-02-06 07:50:31.304809: [Create fully connected tensorflow network] Model: "sequential"
2023-02-06 07:50:31.306510: [Create fully connected tensorflow network] _________________________________________________________________
2023-02-06 07:50:31.308110: [Create fully connected tensorflow network]  Layer (type)                Output Shape              Param #
2023-02-06 07:50:31.309528: [Create fully connected tensorflow network] =================================================================
2023-02-06 07:50:31.310855: [Create fully connected tensorflow network]  dense (Dense)               (None, 10)                80
2023-02-06 07:50:31.312091: [Create fully connected tensorflow network]
2023-02-06 07:50:31.313317: [Create fully connected tensorflow network]  dense_1 (Dense)             (None, 1)                 11
2023-02-06 07:50:31.314453: [Create fully connected tensorflow network]
2023-02-06 07:50:31.315585: [Create fully connected tensorflow network] =================================================================
2023-02-06 07:50:31.316750: [Create fully connected tensorflow network] Total params: 91
2023-02-06 07:50:31.318417: [Create fully connected tensorflow network] Trainable params: 91
2023-02-06 07:50:31.319557: [Create fully connected tensorflow network] Non-trainable params: 0
2023-02-06 07:50:31.320711: [Create fully connected tensorflow network] _________________________________________________________________
2023-02-06 07:50:31.321628: [Create fully connected tensorflow network] None
2023-02-06 07:50:32.675023: [Create fully connected tensorflow network] Container task completed with status: Succeeded
2023-02-06 07:50:48.864645: [Select columns using Pandas on CSV data] Container task completed with status: Succeeded
2023-02-06 07:50:48.871253: [Fill all missing values using Pandas on CSV data] Starting container task.
2023-02-06 07:51:04.675086: [Fill all missing values using Pandas on CSV data] Container task completed with status: Succeeded
2023-02-06 07:51:04.682334: [Split rows into subsets] Starting container task.
2023-02-06 07:51:05.849733: [Split rows into subsets] Container task completed with status: Succeeded
2023-02-06 07:51:05.859774: [Train model using Keras on CSV] Starting container task.
2023-02-06 07:51:30.908289: [Train model using Keras on CSV] Epoch 1/10
250/250 [==============================] - 1s 3ms/step - loss: 466542.4375 - mean_absolute_error: 255.4936
2023-02-06 07:51:32.240018: [Train model using Keras on CSV] Epoch 2/10
250/250 [==============================] - 0s 2ms/step - loss: 438178.6250 - mean_absolute_error: 244.0726
2023-02-06 07:51:32.765705: [Train model using Keras on CSV] Epoch 3/10
250/250 [==============================] - 0s 2ms/step - loss: 420882.3125 - mean_absolute_error: 236.5558
2023-02-06 07:51:33.282513: [Train model using Keras on CSV] Epoch 4/10
250/250 [==============================] - 0s 2ms/step - loss: 400669.0625 - mean_absolute_error: 228.6952
2023-02-06 07:51:33.818343: [Train model using Keras on CSV] Epoch 5/10
250/250 [==============================] - 0s 1ms/step - loss: 389529.3125 - mean_absolute_error: 221.8236
2023-02-06 07:51:34.364360: [Train model using Keras on CSV] Epoch 6/10
250/250 [==============================] - 0s 2ms/step - loss: 369576.1875 - mean_absolute_error: 214.7483
2023-02-06 07:51:34.902718: [Train model using Keras on CSV] Epoch 7/10
250/250 [==============================] - 0s 1ms/step - loss: 359488.5938 - mean_absolute_error: 208.7065
2023-02-06 07:51:35.421358: [Train model using Keras on CSV] Epoch 8/10
250/250 [==============================] - 0s 1ms/step - loss: 344714.1250 - mean_absolute_error: 202.4835
2023-02-06 07:51:36.059533: [Train model using Keras on CSV] Epoch 9/10
250/250 [==============================] - 0s 2ms/step - loss: 332770.8438 - mean_absolute_error: 196.7609
2023-02-06 07:51:36.600948: [Train model using Keras on CSV] Epoch 10/10
250/250 [==============================] - 0s 1ms/step - loss: 320756.7500 - mean_absolute_error: 191.5481
2023-02-06 07:51:38.626267: [Train model using Keras on CSV] Container task completed with status: Succeeded
2023-02-06 07:51:38.635764: [Predict with TensorFlow model on CSV data] Starting container task.
2023-02-06 07:52:05.156470: [Predict with TensorFlow model on CSV data] Container task completed with status: Succeeded

1/1 [==============================] - 2s 2s/step
[[19.48053]]