Skip to content

shared

cognite.powerops.resync.config.Auction

Bases: str, Enum

Source code in cognite/powerops/resync/config/_shared.py
class Auction(str, Enum):
    """String-valued enumeration with the members ``week`` and ``weekend``.

    Because the class also derives from ``str``, each member compares equal to
    its plain string value (``"week"`` / ``"weekend"``).
    """

    week = "week"
    weekend = "weekend"

cognite.powerops.resync.config.RetrievalType

Bases: Enum

Source code in cognite/powerops/resync/config/_shared.py
class RetrievalType(Enum):
    """Enumeration used for the ``retrieve`` field of ``TimeSeriesMappingEntry``.

    Member values come from ``auto()`` and therefore follow declaration order;
    the code in this module looks members up and serialises them by *name*.
    """

    # NOTE(review): presumably "a range of values", "value at the start" and
    # "value at the end" of a period -- confirm against the consumer.
    RANGE = auto()
    START = auto()
    END = auto()

cognite.powerops.resync.config.AggregationMethod

Bases: Enum

Source code in cognite/powerops/resync/config/_shared.py
class AggregationMethod(Enum):
    """Enumeration used for the ``aggregation`` field of ``TimeSeriesMappingEntry``.

    Member values come from ``auto()`` and therefore follow declaration order.
    ``TimeSeriesMappingEntry.to_enum`` resolves string input with
    ``AggregationMethod[value]``, so strings must match these lowercase member
    names exactly.
    """

    sum = auto()
    mean = auto()
    std = auto()
    sem = auto()
    max = auto()
    min = auto()
    median = auto()
    first = auto()
    last = auto()

cognite.powerops.resync.config.TransformationType

Bases: Enum

Source code in cognite/powerops/resync/config/_shared.py
class TransformationType(Enum):
    """Enumeration of the transformation kinds accepted by ``Transformation``.

    Member values come from ``auto()`` and therefore follow declaration order.
    ``Transformation.to_type`` resolves string input by upper-casing it and
    looking it up by member name; ``Transformation.to_dict`` writes the member
    name back out.
    """

    GATE_SCHEDULE_FLAG_VALUE_MAPPING = auto()
    GENERATOR_PRODUCTION_SCHEDULE_FLAG_VALUE_MAPPING = auto()
    PLANT_PRODUCTION_SCHEDULE_FLAG_VALUE_MAPPING = auto()
    GATE_OPENING_METER_TO_PERCENT = auto()
    TO_BOOL = auto()
    TO_INT = auto()
    ZERO_IF_NOT_ONE = auto()
    MULTIPLY = auto()
    MULTIPLY_FROM_OFFSET = auto()
    ADD = auto()
    ADD_FROM_OFFSET = auto()
    DYNAMIC_ADD_FROM_OFFSET = auto()
    RESERVOIR_LEVEL_TO_VOLUME = auto()
    STATIC = auto()
    DYNAMIC_STATIC = auto()
    ONE_IF_TWO = auto()
    ADD_WATER_IN_TRANSIT = auto()

cognite.powerops.resync.config.Transformation

Bases: BaseModel

Source code in cognite/powerops/resync/config/_shared.py
class Transformation(BaseModel):  # type: ignore[no-redef]  # mypy says kwargs is redefined on this line ¯\_(ツ)_/¯
    """A transformation kind together with its optional keyword arguments."""

    transformation: TransformationType
    kwargs: Optional[dict] = None

    @field_validator("transformation", mode="before")
    @classmethod
    def to_type(cls, value):
        """Resolve string input to a ``TransformationType`` member by upper-cased name.

        Non-string input is handed on unchanged for regular validation.
        """
        if not isinstance(value, str):
            return value
        return TransformationType[value.upper()]

    def to_dict(self) -> dict:
        """Return a plain dict holding the transformation's member name and its kwargs."""
        return dict(transformation=self.transformation.name, kwargs=self.kwargs)

cognite.powerops.resync.config.TimeSeriesMappingEntry

Bases: BaseModel

Source code in cognite/powerops/resync/config/_shared.py
class TimeSeriesMappingEntry(BaseModel):
    """One mapping row: an object attribute, an optional time series id and how to read it.

    The field order matters: ``set_default`` reads ``object_type`` and
    ``attribute_name`` from the already-validated values when it runs for
    ``aggregation``.
    """

    object_type: str
    object_name: str
    attribute_name: str
    time_series_external_id: Optional[str] = None
    transformations: Optional[list[Transformation]] = None
    retrieve: Optional[RetrievalType] = None
    aggregation: Optional[AggregationMethod] = None

    @field_validator("aggregation", mode="before")
    @classmethod
    def to_enum(cls, value):
        """Resolve string input to an ``AggregationMethod`` member by exact name."""
        if not isinstance(value, str):
            return value
        return AggregationMethod[value]

    @validator("aggregation", always=True)  # TODO: update to use pydantic v2 field_validator
    @classmethod
    def set_default(cls, value, values):
        """Fill in a missing aggregation from ``ATTRIBUTE_DEFAULT_AGGREGATION``.

        The lookup key is ``"<object_type>.<attribute_name>"``; when the key is
        absent the result stays ``None``.
        """
        if value is None:
            # TODO: do we want default `None` here or raise error?
            object_type = values.get("object_type")
            attribute_name = values.get("attribute_name")
            return ATTRIBUTE_DEFAULT_AGGREGATION.get(f"{object_type}.{attribute_name}")
        return value

    @field_validator("retrieve", mode="before")
    @classmethod
    def to_retrieval_enum(cls, value):
        """Resolve string input to a ``RetrievalType`` member by exact name."""
        if not isinstance(value, str):
            return value
        return RetrievalType[value]

    @property
    def shop_model_path(self) -> str:
        """Dotted path ``object_type.object_name.attribute_name``."""
        return "{}.{}.{}".format(self.object_type, self.object_name, self.attribute_name)

    def _transformations_to_strings(self, max_cols: int) -> list[str]:
        """Serialise the transformations as JSON and cut the text into ``max_cols`` pieces.

        Each piece holds at most 255 characters. Pieces that start past the end
        of the text are empty strings, and any text beyond
        ``max_cols * 255`` characters is not included in the result.
        """
        payload = [entry.to_dict() for entry in self.transformations] if self.transformations else []
        # ensure_ascii=False to allow æøåÆØÅ
        serialised = json.dumps(payload, ensure_ascii=False)

        chunk_size = 255
        offsets = range(0, max_cols * chunk_size, chunk_size)
        return [serialised[offset : offset + chunk_size] for offset in offsets]

    def to_sequence_row(self, max_transformation_cols: int) -> list[str | float]:
        """Build the row values: path, time series id, transformation pieces, retrieve, aggregation.

        Missing (falsy) values are represented as ``float("nan")``.
        """
        if self.retrieve:
            retrieve_cell = self.retrieve.name
        else:
            retrieve_cell = float("nan")

        if self.aggregation:
            aggregation_cell = self.aggregation.name
        else:
            aggregation_cell = float("nan")

        row: list[str | float] = [self.shop_model_path, self.time_series_external_id or float("nan")]
        row.extend(self._transformations_to_strings(max_cols=max_transformation_cols))
        row.append(retrieve_cell)
        row.append(aggregation_cell)
        return row

cognite.powerops.resync.config.TimeSeriesMapping

Bases: BaseModel

Source code in cognite/powerops/resync/config/_shared.py
class TimeSeriesMapping(BaseModel):
    """An ordered collection of ``TimeSeriesMappingEntry`` rows.

    Supports iteration, ``len()``, ``+``, ``append`` and ``extend``, and can be
    rendered either as a pandas DataFrame (``to_dataframe``) or as a plain
    dict (``dumps``).
    """

    rows: list[TimeSeriesMappingEntry] = []
    # Column names, in the same order as the values produced by
    # ``TimeSeriesMappingEntry.to_sequence_row``.
    columns: ClassVar[list[str]] = [
        "shop_model_path",
        "time_series_external_id",
        "transformations",
        "transformations1",
        "transformations2",
        "transformations3",
        "retrieve",
        "aggregation",
    ]

    @property
    def transformations_cols(self) -> list[str]:
        """Names of the columns whose name starts with ``"transformations"``."""
        return [col for col in self.columns if col.startswith("transformations")]

    def __iter__(self) -> Iterator[TimeSeriesMappingEntry]:
        """Iterate over the rows (rather than over the model's fields)."""
        yield from self.rows

    def __len__(self) -> int:
        """Number of rows."""
        return len(self.rows)

    def __add__(self, other: TimeSeriesMapping) -> TimeSeriesMapping:
        """Return a new mapping holding this mapping's rows followed by ``other``'s."""
        return TimeSeriesMapping(rows=self.rows + other.rows)

    def append(self, element: TimeSeriesMappingEntry) -> None:
        """Add one entry to the end of ``rows`` in place."""
        self.rows.append(element)

    def extend(self, other: TimeSeriesMapping) -> None:
        """Add all rows of ``other`` to the end of ``rows`` in place."""
        self.rows.extend(other.rows)

    @property
    def column_definitions(self) -> list[dict]:
        """One ``{"valueType": "STRING", "externalId": <column>}`` dict per column."""
        return [{"valueType": "STRING", "externalId": col} for col in self.columns]

    def to_dataframe(self) -> pd.DataFrame:
        """Return a DataFrame with one line per row and ``columns`` as the column labels."""
        # The number of transformation pieces per row must match the number of
        # transformation columns so every row has exactly len(columns) values.
        n_transformation_cols = len(self.transformations_cols)
        rows = [row.to_sequence_row(max_transformation_cols=n_transformation_cols) for row in self.rows]
        return pd.DataFrame(data=rows, columns=self.columns)

    def dumps(self) -> dict[str, Any]:
        """Return ``{"rows": [...]}`` with enums written as their member names.

        ``aggregation``, ``retrieve`` and ``transformations`` are only included
        for a row when they are set (truthy).
        """
        rows = []
        for row in self.rows:
            # ``model_dump`` is the pydantic v2 replacement for the deprecated ``dict``.
            row_raw = row.model_dump(exclude={"aggregation", "retrieve", "transformations"})
            if row.aggregation:
                row_raw["aggregation"] = row.aggregation.name
            if row.retrieve:
                row_raw["retrieve"] = row.retrieve.name
            if row.transformations:
                row_raw["transformations"] = [
                    {
                        **transformation.model_dump(exclude={"transformation"}),
                        "transformation": transformation.transformation.name,
                    }
                    for transformation in row.transformations
                ]
            rows.append(row_raw)
        return {"rows": rows}