Skip to content

production

cognite.powerops.resync.config.WatercourseConfig

Bases: Watercourse

Represents the configuration for a Watercourse

Attributes:

Name Type Description
version str

The version of the watercourse configuration.

Source code in cognite/powerops/resync/config/production/watercourse.py
class WatercourseConfig(Watercourse):
    """
    Configuration of a single watercourse, extending the base ``Watercourse`` model.

    Attributes:
        version: The version of the watercourse configuration.

    """

    # No protected namespaces — presumably so the ``model_*`` field names below
    # do not clash with pydantic's default protected "model_" prefix.
    model_config: ClassVar[ConfigDict] = ConfigDict(protected_namespaces=())
    version: str
    market_to_price_area: dict[str, str]
    directory: str
    model_raw: str
    # ------------------------------------------------------------------
    shop_model_template: dict[str, Any]
    yaml_raw_path: Path
    yaml_processed_path: Path  # TODO: not used here
    yaml_mapping_path: str = ""
    model_processed: str  # TODO: not used here
    model_mapping: Optional[str] = None
    write_back_model_file: Optional[bool] = True
    tco_paths: Optional[list[str]] = None  # TODO: not used here - HEV specific
    rrs_ids: Optional[list[str]] = None
    hardcoded_mapping: Optional[TimeSeriesMapping] = None  # TODO: not used here
    hist_flow_timeseries: Optional[dict[str, str]] = None  # TODO: not used here
    # ------------------------------------------------------------------
    production_obligation_ts_ext_ids: list[str] = Field(default_factory=list)
    plant_display_names_and_order: dict[str, tuple[str, int]] = Field(default_factory=dict)
    reservoir_display_names_and_order: dict[str, tuple[str, int]] = Field(default_factory=dict)
    water_value_based_method_time_series_csv_filename: Optional[str] = None
    generators: list[Generator] = Field(default_factory=list)

    @classmethod
    def from_yaml(cls, yaml_path: Path) -> list[WatercourseConfig]:
        """Build one config per entry returned by ``load_yaml(yaml_path)``.

        Each entry is unpacked as keyword arguments into the class constructor.
        """
        configs: list[WatercourseConfig] = []
        for raw_entry in load_yaml(yaml_path):
            configs.append(cls(**raw_entry))
        return configs

    @cached_property
    def valid_shop_objects(self) -> set[tuple[str, str]]:
        """Lower-cased ``(object_type, object_name)`` pairs found under the template's "model" key.

        Raises:
            ValueError: If ``shop_model_template`` has no "model" key, or if an
                object type or one of the objects under it is not a dict.
        """
        template = self.shop_model_template
        if "model" not in template:
            raise ValueError("Invalid SHOP yaml: Missing 'model' key")
        shop_model = cast(dict[str, Any], template["model"])

        pairs: set[tuple[str, str]] = set()
        for type_key, named_objects in shop_model.items():
            # Every object type must map to a dict of named objects ...
            if not isinstance(named_objects, dict):
                raise ValueError("Invalid SHOP yaml")
            for name_key, attrs in named_objects.items():
                # ... and every named object must map to a dict of attributes.
                if not isinstance(attrs, dict):
                    raise ValueError("Invalid SHOP yaml")
                pair = (str(type_key).lower(), str(name_key).lower())
                pairs.add(pair)
        return pairs

cognite.powerops.resync.config.Watercourse

Bases: BaseModel

Source code in cognite/powerops/resync/config/production/watercourse.py
class Watercourse(BaseModel):
    """Base model for a watercourse, holding its name and a SHOP penalty limit."""

    name: str
    # NOTE(review): the meaning/unit of the 42000 default is not visible here — confirm.
    shop_penalty_limit: int = 42000

cognite.powerops.resync.config.Plant

Bases: BaseModel

Source code in cognite/powerops/resync/config/production/plant.py
class Plant(BaseModel):
    """Configuration of a plant, with optional references to related CDF resources.

    Instances can be created directly or from a CDF Asset plus its
    Relationships via :meth:`from_cdf_resources`.
    """

    name: PlantName
    external_id: ExternalId
    outlet_level: float  # meters above sea level
    p_min: float = p_min_fallback
    p_max: float = p_max_fallback  # arbitrary max power output if not specified
    head_loss_factor: float = 0.0
    penstock_head_loss_factors: Optional[dict[str, float]] = None
    connections_losses: Optional[float] = None
    display_name: Optional[str] = None
    ordering_key: Optional[float] = None

    generator_ext_ids: list[ExternalId] = []  # external IDs of generator assets
    inlet_reservoir_ext_id: Optional[ExternalId] = None  # external ID of reservoir asset

    inlet_level_time_series: Optional[ExternalId] = None  # external ID of time series with values in m.a.s.l.
    outlet_level_time_series: Optional[ExternalId] = None  # external ID of time series with values in m.a.s.l.
    water_value_time_series: Optional[ExternalId] = None  # external ID of time series with values in €/MWh
    feeding_fee_time_series: Optional[ExternalId] = None  # external ID of time series with values in percent
    p_min_time_series: Optional[ExternalId] = None  # external ID of time series with values in MW
    p_max_time_series: Optional[ExternalId] = None  # external ID of time series with values in MW
    head_direct_time_series: Optional[ExternalId] = None  # external ID of time series with values in m

    @field_validator("penstock_head_loss_factors", mode="before")
    @classmethod
    def parse_dict(cls, value):
        """Decode ``penstock_head_loss_factors`` from JSON when it is given as a string."""
        if isinstance(value, str):
            value = json.loads(value)
        return value

    @classmethod
    def from_cdf_resources(cls, asset: Asset, relationships: list[Relationship], **kwargs) -> Plant:
        """Initialise a Plant from CDF Asset and Relationships

        Args:
            asset: The plant Asset
            relationships: Relationships to related resources (will be mapped to attributes based on labels)
            kwargs: Any other attributes that are not part of the Asset

        Returns:
            The Plant built from the asset, with time series, generator and
            inlet reservoir external IDs filled in from the relationships.

        Raises:
            KeyError: If the asset metadata lacks "outlet_level", "p_min",
                "p_max" or "head_loss_factor".

        """
        # Initialise plant from Asset
        plant = cls(
            name=asset.name,
            external_id=asset.external_id,
            outlet_level=float(asset.metadata["outlet_level"]),
            p_min=float(asset.metadata["p_min"]),
            p_max=float(asset.metadata["p_max"]),
            head_loss_factor=float(asset.metadata["head_loss_factor"]),
            # The model field is `connections_losses`; the previous keyword
            # `connection_losses` did not match any field, so the metadata value
            # never reached the model.
            # NOTE(review): the metadata key is kept as "connection_losses" —
            # confirm it matches what is written to the Asset.
            connections_losses=asset.metadata.get("connection_losses") or None,
            penstock_head_loss_factors=json.loads(asset.metadata.get("penstock_head_loss_factors") or "{}"),
            **kwargs,  # kwargs to set any other attributes that are not part of the Asset
        )

        # Find time series based on relationships
        time_series_ext_ids_and_labels = [
            (rel.target_external_id, [label.external_id for label in (rel.labels or [])])
            for rel in relationships
            if rel.target_type == "TIMESERIES" and rel.source_external_id == plant.external_id
        ]
        # Each relationship sets at most one attribute: the first matching label
        # in the chain below wins.
        for ts_ext_id, labels in time_series_ext_ids_and_labels:
            if RelationshipLabel.INLET_LEVEL_TIME_SERIES in labels:
                plant.inlet_level_time_series = ts_ext_id
            elif RelationshipLabel.OUTLET_LEVEL_TIME_SERIES in labels:
                plant.outlet_level_time_series = ts_ext_id
            elif RelationshipLabel.WATER_VALUE_TIME_SERIES in labels:
                plant.water_value_time_series = ts_ext_id
            elif RelationshipLabel.FEEDING_FEE_TIME_SERIES in labels:
                plant.feeding_fee_time_series = ts_ext_id
            elif RelationshipLabel.P_MIN_TIME_SERIES in labels:
                plant.p_min_time_series = ts_ext_id
            elif RelationshipLabel.P_MAX_TIME_SERIES in labels:
                plant.p_max_time_series = ts_ext_id
            elif RelationshipLabel.HEAD_DIRECT_TIME_SERIES in labels:
                plant.head_direct_time_series = ts_ext_id

        # Find generators based on relationships
        # NOTE(review): unlike the time series and inlet reservoir lookups, this
        # does not check rel.source_external_id against the plant — confirm that
        # callers only pass relationships belonging to this plant.
        plant.generator_ext_ids = [
            rel.target_external_id
            for rel in relationships
            if label_in_labels(RelationshipLabel.GENERATOR, rel.labels or [])
        ]

        # Find inlet reservoir based on relationships (first match only)
        for rel in relationships:
            if (
                rel.target_type == "ASSET"
                and label_in_labels(RelationshipLabel.INLET_RESERVOIR, rel.labels or [])
                and rel.source_external_id == plant.external_id
            ):
                plant.inlet_reservoir_ext_id = rel.target_external_id
                break

        return plant

from_cdf_resources(asset, relationships, **kwargs) classmethod

Initialise a Plant from CDF Asset and Relationships

Parameters:

Name Type Description Default
asset Asset

The plant Asset

required
relationships list[Relationship]

Relationships to related resources (will be mapped to attributes based on labels)

required
kwargs

Any other attributes that are not part of the Asset

{}
Source code in cognite/powerops/resync/config/production/plant.py
@classmethod
def from_cdf_resources(cls, asset: Asset, relationships: list[Relationship], **kwargs) -> Plant:
    """Initialise a Plant from CDF Asset and Relationships

    Args:
        asset: The plant Asset
        relationships: Relationships to related resources (will be mapped to attributes based on labels)
        kwargs: Any other attributes that are not part of the Asset

    Returns:
        The Plant built from the asset, with time series, generator and
        inlet reservoir external IDs filled in from the relationships.

    Raises:
        KeyError: If the asset metadata lacks "outlet_level", "p_min",
            "p_max" or "head_loss_factor".

    """
    # Initialise plant from Asset
    plant = cls(
        name=asset.name,
        external_id=asset.external_id,
        outlet_level=float(asset.metadata["outlet_level"]),
        p_min=float(asset.metadata["p_min"]),
        p_max=float(asset.metadata["p_max"]),
        head_loss_factor=float(asset.metadata["head_loss_factor"]),
        # The Plant field is `connections_losses`; the previous keyword
        # `connection_losses` did not match any field, so the metadata value
        # never reached the model.
        # NOTE(review): the metadata key is kept as "connection_losses" —
        # confirm it matches what is written to the Asset.
        connections_losses=asset.metadata.get("connection_losses") or None,
        penstock_head_loss_factors=json.loads(asset.metadata.get("penstock_head_loss_factors") or "{}"),
        **kwargs,  # kwargs to set any other attributes that are not part of the Asset
    )

    # Find time series based on relationships
    time_series_ext_ids_and_labels = [
        (rel.target_external_id, [label.external_id for label in (rel.labels or [])])
        for rel in relationships
        if rel.target_type == "TIMESERIES" and rel.source_external_id == plant.external_id
    ]
    # Each relationship sets at most one attribute: the first matching label
    # in the chain below wins.
    for ts_ext_id, labels in time_series_ext_ids_and_labels:
        if RelationshipLabel.INLET_LEVEL_TIME_SERIES in labels:
            plant.inlet_level_time_series = ts_ext_id
        elif RelationshipLabel.OUTLET_LEVEL_TIME_SERIES in labels:
            plant.outlet_level_time_series = ts_ext_id
        elif RelationshipLabel.WATER_VALUE_TIME_SERIES in labels:
            plant.water_value_time_series = ts_ext_id
        elif RelationshipLabel.FEEDING_FEE_TIME_SERIES in labels:
            plant.feeding_fee_time_series = ts_ext_id
        elif RelationshipLabel.P_MIN_TIME_SERIES in labels:
            plant.p_min_time_series = ts_ext_id
        elif RelationshipLabel.P_MAX_TIME_SERIES in labels:
            plant.p_max_time_series = ts_ext_id
        elif RelationshipLabel.HEAD_DIRECT_TIME_SERIES in labels:
            plant.head_direct_time_series = ts_ext_id

    # Find generators based on relationships
    # NOTE(review): unlike the time series and inlet reservoir lookups, this
    # does not check rel.source_external_id against the plant — confirm that
    # callers only pass relationships belonging to this plant.
    plant.generator_ext_ids = [
        rel.target_external_id
        for rel in relationships
        if label_in_labels(RelationshipLabel.GENERATOR, rel.labels or [])
    ]

    # Find inlet reservoir based on relationships (first match only)
    for rel in relationships:
        if (
            rel.target_type == "ASSET"
            and label_in_labels(RelationshipLabel.INLET_RESERVOIR, rel.labels or [])
            and rel.source_external_id == plant.external_id
        ):
            plant.inlet_reservoir_ext_id = rel.target_external_id
            break

    return plant

cognite.powerops.resync.config.Generator

Bases: BaseModel

Source code in cognite/powerops/resync/config/production/generator.py
class Generator(BaseModel):
    """Configuration of a generator, with optional references to related time series."""

    name: GeneratorName
    penstock: str
    startcost: float
    p_min: float = 0.0

    # External ID of the start/stop cost time series.
    # NOTE(review): the previous comment gave the unit as "m", which looks
    # copy-pasted for a cost series — confirm the actual unit.
    start_stop_cost_time_series: Optional[ExternalId] = None
    is_available_time_series: Optional[ExternalId] = None  # external ID of boolean time series

    @property
    def external_id(self) -> ExternalId:
        """External ID derived from the generator name with a ``generator_`` prefix."""
        prefix = "generator_"
        return f"{prefix}{self.name}"

    @field_validator("penstock", mode="before")
    @classmethod
    def to_string(cls, value):
        """Convert the raw ``penstock`` input to ``str`` before validation."""
        as_text = str(value)
        return as_text

    @field_validator("startcost", mode="before")
    @classmethod
    def to_float(cls, value):
        """Reduce a non-empty dict ``startcost`` to the float of its first value.

        Any other input (including an empty dict) is returned unchanged.
        """
        if not isinstance(value, dict) or len(value) < 1:
            return value
        # Timeseries with one value: only the first entry is used
        first_value = next(iter(value.values()))
        return float(first_value)

cognite.powerops.resync.config.TimeSeriesMapping

Bases: BaseModel

Source code in cognite/powerops/resync/config/_shared.py
class TimeSeriesMapping(BaseModel):
    """An ordered collection of ``TimeSeriesMappingEntry`` rows with a fixed set of column names."""

    rows: list[TimeSeriesMappingEntry] = []
    # Column names used by ``column_definitions`` and ``to_dataframe``.
    columns: ClassVar[list[str]] = [
        "shop_model_path",
        "time_series_external_id",
        "transformations",
        "transformations1",
        "transformations2",
        "transformations3",
        "retrieve",
        "aggregation",
    ]

    @property
    def transformations_cols(self) -> list[str]:
        """The entries of ``columns`` whose name starts with "transformations"."""
        selected: list[str] = []
        for column_name in self.columns:
            if column_name.startswith("transformations"):
                selected.append(column_name)
        return selected

    def __iter__(self) -> Iterator[TimeSeriesMappingEntry]:
        """Iterate over the rows in order."""
        for entry in self.rows:
            yield entry

    def __len__(self) -> int:
        """Number of rows."""
        row_count = len(self.rows)
        return row_count

    def __add__(self, other: TimeSeriesMapping) -> TimeSeriesMapping:
        """Return a new mapping holding this mapping's rows followed by ``other``'s."""
        combined_rows = self.rows + other.rows
        return TimeSeriesMapping(rows=combined_rows)

    def append(self, element: TimeSeriesMappingEntry) -> None:
        """Add a single entry to the end of ``rows`` in place."""
        self.rows.append(element)

    def extend(self, other: TimeSeriesMapping) -> None:
        """Add all rows of ``other`` to the end of ``rows`` in place."""
        self.rows.extend(other.rows)

    @property
    def column_definitions(self) -> list[dict]:
        """One ``{"valueType": "STRING", "externalId": <column>}`` dict per column."""
        definitions: list[dict] = []
        for column_name in self.columns:
            definitions.append({"valueType": "STRING", "externalId": column_name})
        return definitions

    def to_dataframe(self) -> pd.DataFrame:
        """Build a DataFrame with ``columns`` as header and one sequence row per entry."""
        transformation_col_count = len(self.transformations_cols)
        sequence_rows = []
        for entry in self.rows:
            sequence_rows.append(entry.to_sequence_row(max_transformation_cols=transformation_col_count))
        return pd.DataFrame(data=sequence_rows, columns=self.columns)

    def dumps(self) -> dict[str, Any]:
        """Dump the mapping to ``{"rows": [...]}`` of plain dicts.

        ``aggregation``, ``retrieve`` and each transformation's ``transformation``
        are written as their ``.name``; the three row-level keys are only
        included when the corresponding value is truthy.
        """
        # NOTE(review): `.dict()` is deprecated in pydantic v2 in favour of
        # `.model_dump()` — consider switching once the row types are confirmed
        # to be pydantic models.
        dumped_rows = []
        for entry in self.rows:
            raw = entry.dict(exclude={"aggregation", "retrieve", "transformations"})
            if entry.aggregation:
                raw["aggregation"] = entry.aggregation.name
            if entry.retrieve:
                raw["retrieve"] = entry.retrieve.name
            if entry.transformations:
                dumped_transformations = []
                for step in entry.transformations:
                    step_raw = step.dict(exclude={"transformation"})
                    step_raw["transformation"] = step.transformation.name
                    dumped_transformations.append(step_raw)
                raw["transformations"] = dumped_transformations
            dumped_rows.append(raw)
        return {"rows": dumped_rows}

cognite.powerops.resync.config.PlantTimeSeriesMapping

Bases: BaseModel

Source code in cognite/powerops/resync/config/production/plant.py
class PlantTimeSeriesMapping(BaseModel):
    """Time series external IDs associated with a plant, keyed by plant name."""

    plant_name: str
    water_value: Optional[ExternalId] = None
    inlet_reservoir_level: Optional[ExternalId] = None
    outlet_reservoir_level: Optional[ExternalId] = None
    p_min: Optional[ExternalId] = None
    p_max: Optional[ExternalId] = None
    feeding_fee: Optional[ExternalId] = None
    head_direct: Optional[ExternalId] = None

    @field_validator("*", mode="before")
    @classmethod
    def parse_number_to_string(cls, value):
        """Convert ``int``/``float`` inputs to ``str`` for every field; pass anything else through."""
        if isinstance(value, (int, float)):
            return str(value)
        return value

cognite.powerops.resync.config.GeneratorTimeSeriesMapping

Bases: BaseModel

Source code in cognite/powerops/resync/config/production/generator.py
class GeneratorTimeSeriesMapping(BaseModel):
    """Time series external IDs associated with a generator, keyed by generator name."""

    generator_name: GeneratorName
    start_stop_cost: Optional[ExternalId] = None
    is_available: Optional[ExternalId] = None

    @field_validator("start_stop_cost", "is_available", mode="before")
    @classmethod
    def parse_number_to_string(cls, value):
        """Convert ``int``/``float`` inputs to ``str`` for the two time series fields; pass anything else through."""
        if isinstance(value, (int, float)):
            return str(value)
        return value