将超参数调优升级到 SDK v2

在 SDK v2 中,优化超参数已合并到作业中。

作业有不同类型。 大多数作业都是运行 command 的命令作业,例如 python main.py。 作业中运行的内容与任何编程语言无关,因此你可以运行 bash 脚本、调用 python 解释器、运行一组 curl 命令或其他任何内容。

扫描作业是另一种类型的作业,它定义扫描设置,你可以通过调用命令的扫描方法来启动它。

若要升级,需要更改用于定义超参数优化试验并将其提交到 SDK v2 的代码。 作业运行的内容不需要升级到 SDK v2。 但是,建议从模型训练脚本中删除任何特定于 Azure 机器学习的代码。 这种分离便于更轻松地在本地和云之间进行转换,并且被认为是成熟 MLOps 的最佳做法。 实际上,这意味着删除 azureml.* 代码行。 模型日志记录和跟踪代码应替换为 MLflow。 有关详细信息,请参阅如何在 v2 中使用 MLflow

本文比较 SDK v1 和 SDK v2 中的方案。

在试验中运行超参数优化

  • SDK v1

    from azureml.core import ScriptRunConfig, Experiment, Workspace
    from azureml.train.hyperdrive import RandomParameterSampling, BanditPolicy, HyperDriveConfig, PrimaryMetricGoal
    from azureml.train.hyperdrive import choice, loguniform
    
    dataset = Dataset.get_by_name(ws, 'mnist-dataset')
    
    # list the files referenced by mnist dataset
    dataset.to_path()
    
    #define the search space for your hyperparameters
    param_sampling = RandomParameterSampling(
        {
            '--batch-size': choice(25, 50, 100),
            '--first-layer-neurons': choice(10, 50, 200, 300, 500),
            '--second-layer-neurons': choice(10, 50, 200, 500),
            '--learning-rate': loguniform(-6, -1)
        }
    )
    
    args = ['--data-folder', dataset.as_named_input('mnist').as_mount()]
    
    #Set up your script run
    src = ScriptRunConfig(source_directory=script_folder,
                          script='keras_mnist.py',
                          arguments=args,
                          compute_target=compute_target,
                          environment=keras_env)
    
    # Set early stopping on this one
    early_termination_policy = BanditPolicy(evaluation_interval=2, slack_factor=0.1)
    
    # Define the configurations for your hyperparameter tuning experiment
    hyperdrive_config = HyperDriveConfig(run_config=src,
                                         hyperparameter_sampling=param_sampling,
                                         policy=early_termination_policy,
                                         primary_metric_name='Accuracy',
                                         primary_metric_goal=PrimaryMetricGoal.MAXIMIZE,
                                         max_total_runs=20,
                                         max_concurrent_runs=4)
    # Specify your experiment details                                     
    experiment = Experiment(workspace, experiment_name)
    
    hyperdrive_run = experiment.submit(hyperdrive_config)
    
    #Find the best model
    best_run = hyperdrive_run.get_best_run_by_primary_metric()
    
  • SDK v2

    from azure.ai.ml import MLClient
    from azure.ai.ml import command, Input
    from azure.ai.ml.sweep import Choice, Uniform, MedianStoppingPolicy
    from azure.identity import DefaultAzureCredential
    
    # Create your command
    command_job_for_sweep = command(
        code="./src",
        command="python main.py --iris-csv ${{inputs.iris_csv}} --learning-rate ${{inputs.learning_rate}} --boosting ${{inputs.boosting}}",
        environment="AzureML-lightgbm-3.2-ubuntu18.04-py37-cpu@latest",
        inputs={
            "iris_csv": Input(
                type="uri_file",
                path="https://azuremlexamples.blob.core.chinacloudapi.cn/datasets/iris.csv",
            ),
            #define the search space for your hyperparameters
            "learning_rate": Uniform(min_value=0.01, max_value=0.9),
            "boosting": Choice(values=["gbdt", "dart"]),
        },
        compute="cpu-cluster",
    )
    
    # Call sweep() on your command job to sweep over your parameter expressions
    sweep_job = command_job_for_sweep.sweep(
        compute="cpu-cluster", 
        sampling_algorithm="random",
        primary_metric="test-multi_logloss",
        goal="Minimize",
    )
    
    # Define the limits for this sweep
    sweep_job.set_limits(max_total_trials=20, max_concurrent_trials=10, timeout=7200)
    
    # Set early stopping on this one
    sweep_job.early_termination = MedianStoppingPolicy(delay_evaluation=5, evaluation_interval=2)
    
    # Specify your experiment details
    sweep_job.display_name = "lightgbm-iris-sweep-example"
    sweep_job.experiment_name = "lightgbm-iris-sweep-example"
    sweep_job.description = "Run a hyperparameter sweep job for LightGBM on Iris dataset."
    
    # submit the sweep
    returned_sweep_job = ml_client.create_or_update(sweep_job)
    
    # get a URL for the status of the job
    returned_sweep_job.services["Studio"].endpoint
    
    # Download best trial model output
    ml_client.jobs.download(returned_sweep_job.name, output_name="model")
    

在管道中运行超参数优化

  • SDK v1

    
    tf_env = Environment.get(ws, name='AzureML-TensorFlow-2.0-GPU')
    data_folder = dataset.as_mount()
    src = ScriptRunConfig(source_directory=script_folder,
                          script='tf_mnist.py',
                          arguments=['--data-folder', data_folder],
                          compute_target=compute_target,
                          environment=tf_env)
    
    #Define HyperDrive configs
    ps = RandomParameterSampling(
        {
            '--batch-size': choice(25, 50, 100),
            '--first-layer-neurons': choice(10, 50, 200, 300, 500),
            '--second-layer-neurons': choice(10, 50, 200, 500),
            '--learning-rate': loguniform(-6, -1)
        }
    )
    
    early_termination_policy = BanditPolicy(evaluation_interval=2, slack_factor=0.1)
    
    hd_config = HyperDriveConfig(run_config=src, 
                                 hyperparameter_sampling=ps,
                                 policy=early_termination_policy,
                                 primary_metric_name='validation_acc', 
                                 primary_metric_goal=PrimaryMetricGoal.MAXIMIZE, 
                                 max_total_runs=4,
                                 max_concurrent_runs=4)
    
    metrics_output_name = 'metrics_output'
    metrics_data = PipelineData(name='metrics_data',
                                datastore=datastore,
                                pipeline_output_name=metrics_output_name,
                                training_output=TrainingOutput("Metrics"))
    
    model_output_name = 'model_output'
    saved_model = PipelineData(name='saved_model',
                                datastore=datastore,
                                pipeline_output_name=model_output_name,
                                training_output=TrainingOutput("Model",
                                                               model_file="outputs/model/saved_model.pb"))
    #Create HyperDriveStep
    hd_step_name='hd_step01'
    hd_step = HyperDriveStep(
        name=hd_step_name,
        hyperdrive_config=hd_config,
        inputs=[data_folder],
        outputs=[metrics_data, saved_model])                             
    
    #Find and register best model
    conda_dep = CondaDependencies()
    conda_dep.add_pip_package("azureml-sdk")
    
    rcfg = RunConfiguration(conda_dependencies=conda_dep)
    
    register_model_step = PythonScriptStep(script_name='register_model.py',
                                           name="register_model_step01",
                                           inputs=[saved_model],
                                           compute_target=cpu_cluster,
                                           arguments=["--saved-model", saved_model],
                                           allow_reuse=True,
                                           runconfig=rcfg)
    
    register_model_step.run_after(hd_step)
    
    #Run the pipeline
    pipeline = Pipeline(workspace=ws, steps=[hd_step, register_model_step])
    pipeline_run = exp.submit(pipeline)
    
    
  • SDK v2

    train_component_func = load_component(path="./train.yml")
    score_component_func = load_component(path="./predict.yml")
    
    # define a pipeline
    @pipeline()
    def pipeline_with_hyperparameter_sweep():
        """Tune hyperparameters using sample components."""
        train_model = train_component_func(
            data=Input(
                type="uri_file",
                path="wasbs://datasets@azuremlexamples.blob.core.chinacloudapi.cn/iris.csv",
            ),
            c_value=Uniform(min_value=0.5, max_value=0.9),
            kernel=Choice(["rbf", "linear", "poly"]),
            coef0=Uniform(min_value=0.1, max_value=1),
            degree=3,
            gamma="scale",
            shrinking=False,
            probability=False,
            tol=0.001,
            cache_size=1024,
            verbose=False,
            max_iter=-1,
            decision_function_shape="ovr",
            break_ties=False,
            random_state=42,
        )
        sweep_step = train_model.sweep(
            primary_metric="training_f1_score",
            goal="minimize",
            sampling_algorithm="random",
            compute="cpu-cluster",
        )
        sweep_step.set_limits(max_total_trials=20, max_concurrent_trials=10, timeout=7200)
    
        score_data = score_component_func(
            model=sweep_step.outputs.model_output, test_data=sweep_step.outputs.test_data
        )
    
    
    pipeline_job = pipeline_with_hyperparameter_sweep()
    
    # set pipeline level compute
    pipeline_job.settings.default_compute = "cpu-cluster"
    
    # submit job to workspace
    pipeline_job = ml_client.jobs.create_or_update(
        pipeline_job, experiment_name="pipeline_samples"
    )
    pipeline_job
    

SDK v1 和 SDK v2 中关键功能的映射

SDK v1 中的功能 SDK v2 中的粗略映射
HyperDriveRunConfig() SweepJob()
hyperdrive 包 sweep 包

后续步骤

有关详细信息,请参阅: