Skip to content

resync

What is resync?

resync is a tool for syncing configuration files with CDF.

Why resync?

resync gives you confidence in the resources in CDF by making the configuration files used to create them traceable and reproducible. The user interface is inspired by Terraform, with init, validate, plan, apply, and destroy commands. The plan command compares CDF with the local configuration files and outputs the changes that would be applied if the apply command were run. The apply command applies those changes to CDF. resync is intended to be run in a CI/CD pipeline, but can also be run locally.

The alternative to using resync is to manually create the resources in CDF, and/or have custom scripts residing on different developers' local computers. This is error-prone and does not provide traceability and reproducibility.

How does resync work?

flowchart LR
    A(.yaml-files) -->|Validation| B(Pydantic Config Models)
    B -->|Combine| C(Resync Models)
    C --> E(Compare)
    D(CDF) --> F(Resync Models)
    F --> E
    E -->|Upload Diff| G(CDF)

Main resync functions

cognite.powerops.resync.init(client, is_dev=False, dry_run=False, verbose=False)

This function will create the data models in CDF that are required for resync to work. It will not overwrite existing models.

Parameters:

Name Type Description Default
client PowerOpsClient | None

The PowerOpsClient to use. If not provided, a new client will be created.

required
is_dev bool

Whether the deployment is for a development environment. If true, changed views and data models will be deleted and recreated.

False
dry_run bool

Whether to run the command as a dry run, meaning no resources will be created.

False
Source code in cognite/powerops/resync/core/main.py
def init(client: PowerOpsClient | None, is_dev: bool = False, dry_run: bool = False, verbose: bool = False) -> None:
    """
    This function will create the data models in CDF that are required for resync to work. It will not overwrite
    existing models.

    The toolkit build is written to a ``build`` folder in the current working directory and then deployed.
    A run without ``dry_run`` is only allowed after a dry run of the exact same build: the dry run stores a hash
    of the build folder in the system temp directory, and the real run compares its own hash against it.

    Args:
        client: The PowerOpsClient to use. If not provided, a new client will be created.
        is_dev: Whether the deployment is for development environment. If true, changed views and data models
                (and the views/data models depending on them) will be deleted before the deploy recreates them.
        dry_run: Whether to run the command as a dry run, meaning no resources will be created.
        verbose: Forwarded to the toolkit commands and to the changed-view detection. When true, each changed
                 view is also logged.

    Raises:
        ValueError: If the cluster name cannot be extracted from the base URL of the CDF client.
        SystemExit: With code 1 if called without ``dry_run`` and no dry run of the same build has been done.
            A ``SystemExit`` raised by the toolkit deploy is re-raised unless the view-first retry applies.
    """
    client = client or PowerOpsClient.from_settings()
    cdf = client.cdf
    # The dots are escaped so only a literal "<cluster>.cognitedata.com" host is accepted.
    cluster_match = re.match(r"^https://([\w-]+)\.cognitedata\.com", cdf.config.base_url)
    if cluster_match is None:
        raise ValueError(f"Invalid base_url {cdf.config.base_url}")
    cluster = cluster_match.group(1)

    # CDFToolConfig is constructed while these variables are set; the secret is a placeholder.
    with environment_variables(
        {
            "CDF_CLUSTER": cluster,
            "CDF_URL": cdf.config.base_url,
            "IDP_CLIENT_ID": cdf.config.credentials.client_id,  # type: ignore[attr-defined]
            "IDP_CLIENT_SECRET": "dummy",
            "IDP_TOKEN_URL": (
                cdf.config.credentials.authority_url
                if hasattr(cdf.config.credentials, "authority_url")
                else cdf.config.credentials.token_url  # type: ignore[attr-defined]
            ),
            "CDF_PROJECT": cdf.config.project,
        }
    ):
        tool_config = CDFToolConfig()
        # Swap in the client we already hold.
        # NOTE(review): presumably this is why the dummy secret above is never used - confirm.
        tool_config._client = cdf

    powerops_folder = Path(__file__).parent.parent.parent
    with environment_variables({"SENTRY_ENABLED": "false"}):
        ctx = Context(Command("build"))
        ctx.obj = Common(
            override_env=False,
            verbose=verbose,
            cluster=cluster,
            project=cdf.config.project,
            mockToolGlobals=tool_config,
        )
        build(ctx=ctx, source_dir=str(powerops_folder), build_dir="build", build_env="dev", clean=True)

    # Must point at the same relative "build" directory that was passed to the toolkit build above.
    build_folder = Path.cwd() / "build"

    loader = DataModelLoader(build_folder)
    schema = loader.load()
    logger.info("Loaded all powerops data models")
    # TODO: fix validation
    DataModelLoader.validate(schema)
    logger.info("Validated all powerops data models")

    # Nodes should not be deployed, so we remove them from the build folder
    for node_file in build_folder.glob("data_models/*.powerops_nodes.yaml"):
        node_file.unlink()

    # Guard: a real run requires a previous dry run of an identical build folder.
    build_hash = calculate_directory_hash(build_folder)
    filepath = Path(tempfile.gettempdir()) / "powerops_init_command.txt"
    if dry_run:
        filepath.write_text(build_hash)
    elif filepath.exists() and filepath.read_text() == build_hash:
        ...
    else:
        print(
            Panel(
                "[bold red]Error: [/] `powerops init` has not been run with --dry-run before running without it. "
                "Please run `powerops init` with --dry-run first to verify the changes.",
                title="No dry-run",
            )
        )
        # `exit` is injected by the `site` module and may be missing; raising SystemExit is equivalent.
        raise SystemExit(1)

    if is_dev:
        changed = loader.changed_views(cdf, schema, verbose)
        if changed:
            print(Panel(f"Detected {len(changed)} changed views"))
            if verbose:
                for view in changed:
                    logger.info(f"Changed view: {view}")
            # `changed` maps each changed view to the views depending on it.
            dependencies = {view_id for dependencies in changed.values() for view_id in dependencies}
            print(f"Detected {len(dependencies)} dependent views")
            prefix = "Would delete" if dry_run else "Deleting"
            to_delete = list(dependencies | set(changed))
            print(f"{prefix} changed and dependent {len(to_delete)} views")

            if not dry_run:
                deleted = cdf.data_modeling.views.delete(to_delete)
                print(f"Deleted {len(deleted)} views")

            data_model_ids_to_delete = set(loader.dependent_data_models(schema, set(to_delete)))
        else:
            data_model_ids_to_delete = set()
            print(Panel("No changes detected in any views"))

        # In addition to data models depending on deleted views, delete those that differ from the local version.
        existing = cdf.data_modeling.data_models.retrieve(schema.data_models.as_ids())
        existing_by_id = {model.as_id(): model for model in existing}
        toolkit_loader = ToolkitDataModelLoader.create_loader(tool_config)
        for local_data_model in schema.data_models:
            if local_data_model.as_id() not in existing_by_id:
                continue
            existing_data_model = existing_by_id[local_data_model.as_id()]

            if not toolkit_loader._is_equal_custom(  # type: ignore[attr-defined]
                local_data_model, existing_data_model
            ):
                data_model_ids_to_delete.add(local_data_model.as_id())

        print(Panel(f"Detected {len(data_model_ids_to_delete)} dependent and changed data models"))
        prefix = "Would delete" if dry_run else "Deleting"
        print(f"{prefix} dependent {len(data_model_ids_to_delete)} data models")
        if not dry_run:
            deleted_models = cdf.data_modeling.data_models.delete(list(data_model_ids_to_delete))
            print(f"Deleted {len(deleted_models)} data models")

    with environment_variables({"SENTRY_ENABLED": "false"}):
        ctx = Context(Command("deploy"))
        ctx.obj = Common(
            override_env=False,
            verbose=verbose,
            cluster=cluster,
            project=cdf.config.project,
            mockToolGlobals=tool_config,
        )
        try:
            deploy(
                ctx,
                build_dir="build",
                build_env="dev",
                interactive=False,
                drop=False,
                drop_data=False,
                dry_run=dry_run,
                include=None,
            )
        except SystemExit as e:
            if e.code != 0 and not dry_run:
                # Toolkit currently calls apply once for each view, which typically fails due to dependencies.
                # We try to deploy the views first, and then retry the deploy.
                print(Panel("Trying deploying views first", title="Deploy failed"))
                view_loader = ViewLoader.create_loader(tool_config)
                view_files = view_loader.find_files(build_folder / "data_models")
                views = [
                    view_loader.load_resource(view_file, tool_config, skip_validation=False) for view_file in view_files
                ]
                created = cdf.data_modeling.views.apply(views)
                print(f"Created {len(created)} views. Retrying deploy...")
                deploy(
                    ctx,
                    build_dir="build",
                    build_env="dev",
                    interactive=False,
                    drop=False,
                    drop_data=False,
                    dry_run=dry_run,
                    include=None,
                )
            else:
                raise

cognite.powerops.resync.validate(config_dir, market)

Source code in cognite/powerops/resync/core/main.py
def validate(config_dir: str | Path, market: str) -> Any:
    """
    Placeholder for the validate command; it always raises.

    Args:
        config_dir: Local path to the configuration files (currently unused).
        market: The market to validate the configuration for (currently unused).

    Raises:
        NotImplementedError: Always, as validation has not been implemented.
    """
    message = "validate is not implemented"
    raise NotImplementedError(message)

cognite.powerops.resync.plan(config_dir, market, client, model_names=None, dump_folder=None)

Loads the local configuration files, transforms them into Resync models, and compares them to the Resync models downloaded from CDF.

Parameters:

Name Type Description Default
config_dir Path

Local path to the configuration files. Needs to follow a specific structure. See below.

required
market str

The market to load the configuration for.

required
client PowerOpsClient | None

The PowerOpsClient to use. If not provided, a new client will be created.

required
model_names str | list[str] | None

The models to run the plan for.

None
dump_folder Optional[Path]

If present, the local and CDF changes will be dumped to this directory. This is done so that you can use local tools (for example, PyCharm or VS Code) to compare the detailed changes between the local and CDF configuration.

None

Returns:

Type Description
ModelDifferences

A ModelDifferences object containing the differences between the local and CDF configuration.

Configuration file structure:

📦config_dir
 ┣ 📂cogshop - The CogSHOP configuration
 ┣ 📂market - The Market configuration for DayAhead, RKOM, and benchmarking.
 ┣ 📂production - The physical assets configuration, Watercourse, PriceArea, Generator, Plant  (SHOP centered)
 ┗ 📜settings.yaml - Settings for resync.

Source code in cognite/powerops/resync/core/main.py
def plan(
    config_dir: Path,
    market: str,
    client: PowerOpsClient | None,
    model_names: str | list[str] | None = None,
    dump_folder: Optional[Path] = None,
) -> ModelDifferences:
    """
    Compare the local configuration with what is currently stored in CDF.

    The local configuration files are loaded and transformed into Resync models. For each of them the
    corresponding model is retrieved from CDF, and the difference between the two is computed.

    Args:
        config_dir: Local path to the configuration files. Needs to follow a specific structure. See below.
        market: The market to load the configuration for.
        client: The PowerOpsClient to use. If not provided, a new client will be created.
        model_names: The models to run the plan for.
        dump_folder: If present, the local and CDF versions of each model are written to this directory as
                    ``<model name>_local.yaml`` and ``<model name>_cdf.yaml``, so that local tools (for
                    example, PyCharm or VS Code) can be used to compare them in detail.

    Returns:
        A ModelDifferences object containing the differences between the local and CDF configuration.

    Raises:
        ValueError: If the client has no read dataset configured.

    Configuration file structure:
    ```
    📦config_dir
     ┣ 📂cogshop - The CogSHOP configuration
     ┣ 📂market - The Market configuration for DayAhead, RKOM, and benchmarking.
     ┣ 📂production - The physical assets configuration, Watercourse, PriceArea, Generator, Plant  (SHOP centered)
     ┗ 📜settings.yaml - Settings for resync.
    ```
    """
    if not client:
        client = PowerOpsClient.from_settings()

    loaded_models = _load_transform(market, config_dir, client.cdf.config.project, model_names)

    loaded_names = ", ".join([type(m).__name__ for m in loaded_models])
    logger.info(f"Load transform completed, models {loaded_names} loaded")

    read_dataset = client.datasets.read_dataset
    if read_dataset is None:
        raise ValueError("No read_dataset configured in settings")

    collected = []
    for local_model in loaded_models:
        logger.info(f"Retrieving {local_model.model_name} from CDF")
        remote_model = type(local_model).from_cdf(client, data_set_external_id=read_dataset)

        # Only asset models have static resources to look up in CDF.
        static_resources = local_model.static_resources_from_cdf(client) if isinstance(local_model, AssetModel) else {}

        model_diff = diff.model_difference(remote_model, local_model, static_resources)
        _clean_relationships(client.cdf, model_diff, local_model)

        if dump_folder:
            dump_folder.mkdir(parents=True, exist_ok=True)
            # Standardize models for easy comparison
            local_model.standardize()
            remote_model.standardize()

            for suffix, model in (("local", local_model), ("cdf", remote_model)):
                target = dump_folder / f"{local_model.model_name}_{suffix}.yaml"
                target.write_text(safe_dump(model.dump_as_cdf_resource()))

        collected.append(model_diff)

    return ModelDifferences(collected)

cognite.powerops.resync.apply(config_dir, market, client=None, model_names=None, auto_yes=False)

Loads the local configuration files, transforms them into Resync models, and uploads them to CDF. Any deviations in the existing CDF configuration will be overwritten.

Parameters:

Name Type Description Default
config_dir Path

Local path to the configuration files. Needs to follow a specific structure. See below.

required
market str

The market to load the configuration for.

required
client PowerOpsClient | None

The PowerOpsClient to use. If not provided, a new client will be created.

None
model_names str | list[str] | None

The models to apply.

None
auto_yes bool

If true, all prompts will be auto confirmed.

False

Returns:

Type Description
ModelDifferences

A ModelDifferences object containing the differences between the local and CDF configuration which have been written to CDF.

Configuration file structure:

📦config_dir
 ┣ 📂cogshop - The CogSHOP configuration
 ┣ 📂market - The Market configuration for DayAhead, RKOM, and benchmarking.
 ┣ 📂production - The physical assets configuration, Watercourse, PriceArea, Generator, Plant  (SHOP centered)
 ┗ 📜settings.yaml - Settings for resync.

Source code in cognite/powerops/resync/core/main.py
def apply(
    config_dir: Path,
    market: str,
    client: PowerOpsClient | None = None,
    model_names: str | list[str] | None = None,
    auto_yes: bool = False,
) -> ModelDifferences:
    """
    Upload the local configuration to CDF.

    The local configuration files are loaded and transformed into Resync models. Each model is compared with
    its counterpart in CDF, and the differences are written to CDF. Any deviations in the existing CDF
    configuration will be overwritten.

    Args:
        config_dir: Local path to the configuration files. Needs to follow a specific structure. See below.
        market: The market to load the configuration for.
        client: The PowerOpsClient to use. If not provided, a new client will be created.
        model_names: The models to apply.
        auto_yes: If true, all prompts will be auto confirmed.

    Returns:
        A ModelDifferences object containing the differences between the local and CDF configuration which have been
        written to CDF.

    Configuration file structure:
    ```
    📦config_dir
     ┣ 📂cogshop - The CogSHOP configuration
     ┣ 📂market - The Market configuration for DayAhead, RKOM, and benchmarking.
     ┣ 📂production - The physical assets configuration, Watercourse, PriceArea, Generator, Plant  (SHOP centered)
     ┗ 📜settings.yaml - Settings for resync.
    ```
    """
    if not client:
        client = PowerOpsClient.from_settings()
    loaded_models = _load_transform(market, config_dir, client.cdf.config.project, model_names)

    written_changes = ModelDifferences([])
    for local_model in loaded_models:
        remote_model = type(local_model).from_cdf(client, data_set_external_id=client.datasets.read_dataset)

        # Only asset models have static resources to look up in CDF.
        static_resources = local_model.static_resources_from_cdf(client) if isinstance(local_model, AssetModel) else {}

        model_diff = diff.model_difference(remote_model, local_model, static_resources)

        # Do not create relationships to time series that do not exist.
        _clean_relationships(client.cdf, model_diff, local_model)
        # Remove the domain model as this is not CDF resources
        # TimeSeries are not updated by resync.
        model_diff.filter_out(group="Domain", field_names={"timeseries"})

        # Lookup tables from external id to the local sequence/file, handed to the add/update step.
        sequences_lookup = {sequence.external_id: sequence for sequence in local_model.sequences()}
        files_lookup = {file.external_id: file for file in local_model.files()}

        removed = _remove_resources(model_diff, client.cdf, auto_yes)
        added_updated = _add_update_resources(model_diff, client, auto_yes, sequences_lookup, files_lookup)

        written_changes.append(removed + added_updated)

    return written_changes

cognite.powerops.resync.destroy(client, model_names=None, auto_yes=False, dry_run=False)

Destroys all resync models in CDF. This will also delete all data in the models.

Parameters:

Name Type Description Default
client PowerOpsClient | None

The PowerOpsClient to use. If not provided, a new client will be created.

required
model_names str | list[str] | None

The models to destroy.

None
auto_yes bool

If true, all prompts will be auto confirmed.

False
dry_run bool

If true, the models will not be deleted, but the changes will be printed.

False

Returns:

Type Description
ModelDifferences

A ModelDifferences object containing the resources that have been destroyed.

Source code in cognite/powerops/resync/core/main.py
def destroy(
    client: PowerOpsClient | None,
    model_names: str | list[str] | None = None,
    auto_yes: bool = False,
    dry_run: bool = False,
) -> ModelDifferences:
    """
    Destroys all resync models in CDF. This will also delete all data in the models.

    For each selected model the data (and, for GraphQL based data models, the data model with its views and
    containers) is removed. Afterwards the data models loaded from the ``build`` folder in the current working
    directory are destroyed, then the spaces of the selected data models, and finally the asset and
    relationship labels.

    Args:
        client: The PowerOpsClient to use. If not provided, a new client will be created.
        model_names: The models to destroy.
        auto_yes: If true, all prompts will be auto confirmed.
        dry_run: If true, the models will not be deleted, but the changes will be printed.

    Returns:
        A ModelDifferences object containing the resources that have been destroyed.

    Raises:
        NotImplementedError: For data models that are defined through a DMS model or a source model.
        ValueError: If a model type is neither a data model nor an asset model.
    """
    client = client or PowerOpsClient.from_settings()

    model_types = _to_models(model_names)
    destroyed = ModelDifferences([])
    for model_type in model_types:
        if issubclass(model_type, DataModel) and model_type.graph_ql:
            remove_data_model = _get_data_model_view_containers(
                client.cdf, model_type.graph_ql.id_, model_type.__name__
            )
            if not remove_data_model.changes:
                logger.warning(f"Skipping {model_type.__name__}, no data model found")
                continue
            static_resources = {}
        elif issubclass(model_type, DataModel) and model_type.dms_model:
            raise NotImplementedError()
        elif issubclass(model_type, DataModel) and model_type.source_model:
            raise NotImplementedError()
        elif issubclass(model_type, AssetModel):
            # Asset models have no data model to remove, only data.
            remove_data_model = ModelDifference(model_type.__name__, {})
            if issubclass(model_type, models.MarketModel):
                # We only need the root asset to be set.
                production_external_id = cast(str, models.ProductionModel.root_asset.external_id)
                model_type.set_root_asset("", "", "", production_external_id)
            static_resources = model_type.static_resources_from_cdf(client)
        else:
            raise ValueError(f"Unknown model type {model_type}")

        cdf_model = model_type.from_cdf(client, data_set_external_id=client.datasets.read_dataset)

        remove_data = diff.remove_only(cdf_model, static_resources)
        # Time series are excluded from the removal.
        remove_data.filter_out(group="Domain", field_names={"timeseries"})

        if dry_run:
            destroyed.append(remove_data + remove_data_model)
        else:
            removed = _remove_resources(remove_data + remove_data_model, client.cdf, auto_yes)
            destroyed.append(removed)

    # NOTE(review): presumably the build folder is the one produced by `init` - confirm.
    loader = DataModelLoader(Path.cwd() / "build")
    schema = loader.load()
    loader.destroy(client.cdf, schema, dry_run)

    # Spaces are deleted last, as they might contain other resources.
    spaces = {space for d in model_types if issubclass(d, DataModel) for space in d.spaces()}
    if spaces and not dry_run:
        deleted_space: list[SpaceId] = []
        # One at a time, in case there are other resources in the space that will prevent deletion.
        for space in spaces:
            logger.info(f"Deleting space {space}..")
            try:
                # Delete only the current space, so a failure for one space does not affect the others
                # and the success/failure log lines refer to the space that was actually attempted.
                client.cdf.data_modeling.spaces.delete(space)
            except CogniteAPIError as e:
                logger.warning(f"Failed to delete space {space} with error {e}")
            else:
                logger.info(f"... deleted space {space}")
                deleted_space.append(SpaceId(space))
        if deleted_space:
            destroyed.append(
                ModelDifference(
                    model_name="All Models",
                    changes={
                        "spaces": FieldDifference(
                            group="CDF",
                            field_name="spaces",
                            removed=list(deleted_space),
                            added=[],
                            changed=[],
                            unchanged=[],
                        )
                    },
                )
            )
    if not dry_run:
        labels = AssetLabel.as_label_definitions() + RelationshipLabel.as_label_definitions()
        # Label definitions without an external id are skipped.
        client.cdf.labels.delete([label.external_id for label in labels if label.external_id])

    return destroyed