Skip to content

time series mapping

cognite.powerops.resync.time_series_mapping

This subpackage contains helpers that end users can use to generate the TimeSeriesMapping required as input to the CogSHOP configuration.

cognite.powerops.resync.time_series_mapping.mapping.filter_time_series_mappings(mapping, client)

Filters out mapping entries where the expected CDF time series is not found.

Source code in cognite/powerops/resync/time_series_mapping/mapping.py
def filter_time_series_mappings(mapping: TimeSeriesMapping, client: CogniteClient) -> TimeSeriesMapping:
    """Filters out mapping entries where the expected CDF time series is not found.

    Entries whose ``time_series_external_id`` is ``None`` are always kept, since
    there is nothing to look up for them. All other entries are kept only if
    ``client.time_series.retrieve_multiple`` returns a time series with the same
    external id.

    Args:
        mapping: The mapping to filter. If it has no rows it is returned as-is
            (same object) after a warning is printed.
        client: Client used to look up the referenced time series in CDF.

    Returns:
        A new ``TimeSeriesMapping`` holding the surviving entries in their
        original order, or ``mapping`` itself when it has no rows.
    """
    if not mapping.rows:
        print_warning("TimeSeriesMapping is empty! No filtering possible!")
        return mapping

    # The loop variable is deliberately not called `mapping`, so it cannot be
    # confused with (or shadow) the parameter being iterated over.
    external_ids_in_mapping = {
        entry.time_series_external_id for entry in mapping if entry.time_series_external_id is not None
    }

    if external_ids_in_mapping:
        external_ids_in_cdf = {
            time_series.external_id
            for time_series in client.time_series.retrieve_multiple(
                external_ids=list(external_ids_in_mapping), ignore_unknown_ids=True
            )
        }
    else:
        # No entry references a time series: skip the CDF request entirely
        # rather than sending an empty id list.
        external_ids_in_cdf = set()

    if missing_from_cdf := external_ids_in_mapping - external_ids_in_cdf:
        print_warning(f"Time series found in mapping but missing from CDF: {missing_from_cdf}")

    return TimeSeriesMapping(
        rows=[
            entry
            for entry in mapping
            if entry.time_series_external_id is None or entry.time_series_external_id in external_ids_in_cdf
        ]
    )

cognite.powerops.resync.time_series_mapping.mapping.merge_and_keep_last_mapping_if_overlap(*mappings)

Within each TimeSeriesMapping keep the last TimeSeriesMappingEntry in cases where they have the same shop_model_path. Across mappings keep the last TimeSeriesMappingEntry from the last TimeSeriesMapping.

Source code in cognite/powerops/resync/time_series_mapping/mapping.py
def merge_and_keep_last_mapping_if_overlap(*mappings: TimeSeriesMapping) -> TimeSeriesMapping:
    """Merge several mappings, letting later entries win on ``shop_model_path`` clashes.

    The mappings are walked in the order given, and each mapping's entries in
    iteration order. Whenever two entries share a ``shop_model_path``, the one
    seen last replaces the earlier one -- both within a single mapping and
    across mappings.

    The resulting rows are ordered by the first occurrence of each
    ``shop_model_path`` (dict insertion order), even when the entry itself was
    later replaced.
    """
    latest_by_path = {}
    for single_mapping in mappings:
        for entry in single_mapping:
            # Re-assigning an existing key keeps its position but swaps the value.
            latest_by_path[entry.shop_model_path] = entry

    merged_rows = list(latest_by_path.values())
    return TimeSeriesMapping(rows=merged_rows)

cognite.powerops.resync.time_series_mapping.mapping.special_case_handle_gate_number(name)

Must handle any gate numbers above 1 if found in other sources than YAML

Source code in cognite/powerops/resync/time_series_mapping/mapping.py
def special_case_handle_gate_number(name: str) -> None:
    """Warn when ``name`` looks like it refers to a gate number above 1.

    A warning is printed if ``name`` contains an ``L`` immediately followed by a
    digit from 2 to 9. Nothing else is done with the name.

    NOTE(review): a name such as ``L10`` does not match the pattern, because the
    character after ``L`` is ``1`` -- confirm whether two-digit gate numbers
    need to be covered.
    """
    # TODO: extend to handle special case if needed
    gate_match = re.search(r"L[2-9]", name)
    if gate_match is None:
        return

    print_warning(f"Potential gate {name} not in YAML!")

cognite.powerops.resync.time_series_mapping.mapping.print_warning(s)

Adds some nice colors to the printed text :)

Source code in cognite/powerops/resync/time_series_mapping/mapping.py
def print_warning(s: str) -> None:
    """Print ``s`` to stdout as a single ``[WARNING]`` line wrapped in ANSI color codes.

    The line starts with the ANSI SGR code 91 (rendered as bright red by most
    terminals) and ends with the reset code 0, so following output is not
    colored.
    """
    colored_line = f"\033[91m[WARNING] {s}\033[0m"
    print(colored_line)

cognite.powerops.resync.time_series_mapping.static_mapping.is_constant_valued_dict(attribute_value)

Check whether the value is a non-empty dictionary whose values are all numeric and all the same.

Source code in cognite/powerops/resync/time_series_mapping/static_mapping.py
def is_constant_valued_dict(attribute_value: Any) -> bool:
    """Return True if ``attribute_value`` is a dict holding one repeated numeric value.

    All of the following must hold:
      * ``attribute_value`` is a ``dict``;
      * every value in it is an ``int`` or ``float``;
      * the values collapse to exactly one distinct element when put in a set.

    An empty dict therefore yields False (zero distinct values), and so does
    anything that is not a dict.
    """
    if not isinstance(attribute_value, dict):
        return False

    values = list(attribute_value.values())
    # Check the types first so non-numeric (possibly unhashable) values never reach set().
    if not all(isinstance(item, (float, int)) for item in values):
        return False

    distinct_values = set(values)
    return len(distinct_values) == 1

cognite.powerops.resync.time_series_mapping.static_mapping.get_static_mapping(shop_model)

Source code in cognite/powerops/resync/time_series_mapping/static_mapping.py
def get_static_mapping(shop_model: dict) -> TimeSeriesMapping:
    """Build a mapping with one STATIC entry per constant-valued attribute in ``shop_model``.

    ``shop_model`` is walked as three nested dict levels: object type -> object
    name -> attribute name -> attribute value. An attribute produces an entry
    only if ``"<object_type>.<attribute_name>"`` is not in ``ignored_attributes``
    and ``is_constant_valued_dict`` accepts its value. The entry carries a
    single ``TransformationType.STATIC`` transformation whose kwargs are
    ``{0: <first value of the attribute dict>}``.

    Returns:
        A ``TimeSeriesMapping`` with the generated entries, in traversal order.
    """
    # sourcery skip: for-append-to-extend, merge-nested-ifs
    result = TimeSeriesMapping()
    for object_type, objects in shop_model.items():
        for object_name, object_attributes in objects.items():
            # e.g. max_vol: 42 or maintenance_flag: {<date>: 1, <other_date>: 0}
            for attribute_name, attribute_value in object_attributes.items():
                qualified_name = f"{object_type}.{attribute_name}"
                if qualified_name in ignored_attributes or not is_constant_valued_dict(attribute_value):
                    continue

                # All values are equal here, so the first one represents the whole dict.
                constant = next(iter(attribute_value.values()))  # TODO: clean up
                static_transformation = Transformation(
                    transformation=TransformationType.STATIC, kwargs={0: constant}
                )
                entry = TimeSeriesMappingEntry(
                    object_type=object_type,
                    object_name=str(object_name),
                    attribute_name=attribute_name,
                    transformations=[static_transformation],
                )
                result.append(entry)

    return result