#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from typing import Any, Dict, List, Optional, Union, cast, TYPE_CHECKING

import pandas as pd

from pyspark import keyword_only, since
from pyspark.ml.connect.base import Estimator, Model, Transformer
from pyspark.ml.connect.io_utils import (
    ParamsReadWrite,
    MetaAlgorithmReadWrite,
)
from pyspark.ml.param import Param, Params
from pyspark.ml.common import inherit_doc
from pyspark.sql.dataframe import DataFrame

if TYPE_CHECKING:
    from pyspark.ml._typing import ParamMap


class _PipelineReadWrite(MetaAlgorithmReadWrite):
    def _get_child_stages(self) -> List[Any]:
        if isinstance(self, Pipeline):
            return list(self.getStages())
        elif isinstance(self, PipelineModel):
            return list(self.stages)
        else:
            raise ValueError(f"Unknown type {self.__class__}")

    def _get_skip_saving_params(self) -> List[str]:
        """
        Returns params to be skipped when saving metadata.
        """
        return ["stages"]

    def _save_meta_algorithm(self, root_path: str, node_path: List[str]) -> Dict[str, Any]:
        metadata = self._get_metadata_to_save()
        metadata["stages"] = []

        if isinstance(self, Pipeline):
            stages = self.getStages()
        elif isinstance(self, PipelineModel):
            stages = self.stages
        else:
            raise ValueError(f"Unknown type {self.__class__}")

        for stage_index, stage in enumerate(stages):
            stage_node_path = node_path + [f"pipeline_stage_{stage_index}"]
            stage_metadata = stage._save_to_node_path(  # type: ignore[attr-defined]
                root_path, stage_node_path
            )
            metadata["stages"].append(stage_metadata)
        return metadata

    def _load_meta_algorithm(self, root_path: str, node_metadata: Dict[str, Any]) -> None:
        stages = []
        for stage_meta in node_metadata["stages"]:
            stage = ParamsReadWrite._load_instance_from_metadata(stage_meta, root_path)
            stages.append(stage)

        if isinstance(self, Pipeline):
            self.setStages(stages)
        elif isinstance(self, PipelineModel):
            self.stages = stages
        else:
            raise ValueError(f"Unknown type {self.__class__}")
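# A rough sketch of how the helpers above lay out a saved pipeline on disk
# (illustrative only: the exact file names come from ParamsReadWrite /
# MetaAlgorithmReadWrite in pyspark.ml.connect.io_utils, and "/tmp/pipeline"
# is just an assumed example path):
#
#   /tmp/pipeline/
#       metadata.json          # pipeline metadata; its "stages" entry lists per-stage metadata
#       pipeline_stage_0/...   # saved params/metadata of the first stage
#       pipeline_stage_1/...   # saved params/metadata of the second stage
#
# On load, _load_meta_algorithm walks the saved "stages" list, rebuilds each stage
# with ParamsReadWrite._load_instance_from_metadata, and reattaches the stages to
# the Pipeline or PipelineModel instance.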
@inherit_doc
class Pipeline(Estimator["PipelineModel"], _PipelineReadWrite):
    """
    A simple pipeline, which acts as an estimator. A Pipeline consists of a sequence of stages,
    each of which is either an :py:class:`Estimator` or a :py:class:`Transformer`. When
    :py:meth:`Pipeline.fit` is called, the stages are executed in order. If a stage is an
    :py:class:`Estimator`, its :py:meth:`Estimator.fit` method will be called on the input
    dataset to fit a model. Then the model, which is a transformer, will be used to transform
    the dataset as the input to the next stage. If a stage is a :py:class:`Transformer`, its
    :py:meth:`Transformer.transform` method will be called to produce the dataset for the next
    stage. The fitted model from a :py:class:`Pipeline` is a :py:class:`PipelineModel`, which
    consists of fitted models and transformers, corresponding to the pipeline stages. If stages
    is an empty list, the pipeline acts as an identity transformer.

    .. versionadded:: 3.5.0

    Examples
    --------
    >>> from pyspark.ml.connect import Pipeline
    >>> from pyspark.ml.connect.classification import LogisticRegression
    >>> from pyspark.ml.connect.feature import StandardScaler
    >>> scaler = StandardScaler(inputCol='features', outputCol='scaled_features')
    >>> lor = LogisticRegression(maxIter=20, learningRate=0.01)
    >>> pipeline = Pipeline(stages=[scaler, lor])
    >>> dataset = spark.createDataFrame([
    ...     ([1.0, 2.0], 1),
    ...     ([2.0, -1.0], 1),
    ...     ([-3.0, -2.0], 0),
    ...     ([-1.0, -2.0], 0),
    ... ], schema=['features', 'label'])
    >>> pipeline_model = pipeline.fit(dataset)
    >>> transformed_dataset = pipeline_model.transform(dataset)
    >>> transformed_dataset.show()
    +------------+-----+--------------------+----------+--------------------+
    |    features|label|     scaled_features|prediction|         probability|
    +------------+-----+--------------------+----------+--------------------+
    |  [1.0, 2.0]|    1|[0.56373452100212...|         1|[0.02423273026943...|
    | [2.0, -1.0]|    1|[1.01472213780381...|         1|[0.09334788471460...|
    |[-3.0, -2.0]|    0|[-1.2402159462046...|         0|[0.99808156490325...|
    |[-1.0, -2.0]|    0|[-0.3382407126012...|         0|[0.96210002899169...|
    +------------+-----+--------------------+----------+--------------------+
    >>> pipeline_model.saveToLocal("/tmp/pipeline")
    >>> loaded_pipeline_model = PipelineModel.loadFromLocal("/tmp/pipeline")
    """

    stages: Param[List[Params]] = Param(
        Params._dummy(), "stages", "a list of pipeline stages"
    )  # type: ignore[assignment]

    _input_kwargs: Dict[str, Any]

    @keyword_only
    def __init__(self, *, stages: Optional[List[Params]] = None):
        """
        __init__(self, \\*, stages=None)
        """
        super(Pipeline, self).__init__()
        kwargs = self._input_kwargs
        self.setParams(**kwargs)
    def setStages(self, value: List[Params]) -> "Pipeline":
        """
        Set pipeline stages.

        .. versionadded:: 3.5.0

        Parameters
        ----------
        value : list of :py:class:`pyspark.ml.connect.Transformer`
            or :py:class:`pyspark.ml.connect.Estimator`

        Returns
        -------
        :py:class:`Pipeline`
            the pipeline instance
        """
        return self._set(stages=value)
[docs]@since("3.5.0")defgetStages(self)->List[Params]:""" Get pipeline stages. """returnself.getOrDefault(self.stages)
    @keyword_only
    @since("3.5.0")
    def setParams(self, *, stages: Optional[List[Params]] = None) -> "Pipeline":
        """
        setParams(self, \\*, stages=None)
        Sets params for Pipeline.
        """
        kwargs = self._input_kwargs
        return self._set(**kwargs)
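    # A small usage sketch for the setters/getters above (assuming `scaler` and
    # `lor` are stage instances as in the class docstring):
    #
    #   pipeline = Pipeline()
    #   pipeline.setParams(stages=[scaler, lor])   # or pipeline.setStages([scaler, lor])
    #   pipeline.getStages()                       # -> [scaler, lor]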
    def _fit(self, dataset: Union[DataFrame, pd.DataFrame]) -> "PipelineModel":
        stages = self.getStages()
        for stage in stages:
            if not (isinstance(stage, Estimator) or isinstance(stage, Transformer)):
                raise TypeError("Cannot recognize a pipeline stage of type %s." % type(stage))

        # Find the last estimator; only stages up to (and including) it need to
        # transform the training data while fitting.
        indexOfLastEstimator = -1
        for i, stage in enumerate(stages):
            if isinstance(stage, Estimator):
                indexOfLastEstimator = i

        transformers: List[Transformer] = []
        for i, stage in enumerate(stages):
            if i <= indexOfLastEstimator:
                if isinstance(stage, Transformer):
                    # Pass the transformed data on to the next stage.
                    transformers.append(stage)
                    dataset = stage.transform(dataset)
                else:  # must be an Estimator
                    model = stage.fit(dataset)  # type: ignore[attr-defined]
                    transformers.append(model)
                    if i < indexOfLastEstimator:
                        dataset = model.transform(dataset)
            else:
                # Stages after the last estimator are plain transformers; keep them as-is.
                transformers.append(cast(Transformer, stage))
        pipeline_model = PipelineModel(transformers)  # type: ignore[arg-type]
        pipeline_model._resetUid(self.uid)
        return pipeline_model
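    # Worked example of _fit (a sketch; the stage names are hypothetical): with
    # stages = [scaler, lor, postproc] where scaler/postproc are Transformers and
    # lor is an Estimator, indexOfLastEstimator == 1, so fit() will
    #   1. apply scaler.transform to the input and keep scaler,
    #   2. fit lor on the scaled data and keep the fitted model (the data is not
    #      transformed again, because lor is the last estimator),
    #   3. keep postproc unchanged,
    # producing PipelineModel([scaler, lor_model, postproc]).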
    def copy(self, extra: Optional["ParamMap"] = None) -> "Pipeline":
        """
        Creates a copy of this instance.

        .. versionadded:: 3.5.0

        Parameters
        ----------
        extra : dict, optional
            extra parameters

        Returns
        -------
        :py:class:`Pipeline`
            new instance
        """
        if extra is None:
            extra = dict()
        that = Params.copy(self, extra)
        stages = [stage.copy(extra) for stage in that.getStages()]
        return that.setStages(stages)
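    # Copy semantics (a sketch): copy() clones the pipeline and each of its stages,
    # so the copy can be re-configured without touching the original:
    #
    #   p2 = pipeline.copy()
    #   p2.getStages()[0] is pipeline.getStages()[0]   # False: stages are copied too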
@inherit_doc
class PipelineModel(Model, _PipelineReadWrite):
    """
    Represents a compiled pipeline with transformers and fitted models.

    .. versionadded:: 3.5.0
    """

    def __init__(self, stages: Optional[List[Params]] = None):
        super(PipelineModel, self).__init__()
        self.stages = stages  # type: ignore[assignment]

    def _transform(
        self, dataset: Union[DataFrame, pd.DataFrame]
    ) -> Union[DataFrame, pd.DataFrame]:
        for t in self.stages:
            dataset = t.transform(dataset)
        return dataset
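    # Transform chaining (a sketch, reusing the fitted model from the Pipeline
    # docstring example): for a two-stage model, pipeline_model.transform(dataset)
    # is equivalent to applying each fitted stage in order, i.e. roughly
    #
    #   scaled = pipeline_model.stages[0].transform(dataset)
    #   predictions = pipeline_model.stages[1].transform(scaled)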
    def copy(self, extra: Optional["ParamMap"] = None) -> "PipelineModel":
        """
        Creates a copy of this instance.

        .. versionadded:: 3.5.0

        :param extra: extra parameters
        :returns: new instance
        """
        if extra is None:
            extra = dict()
        stages = [stage.copy(extra) for stage in self.stages]
        return PipelineModel(stages)