Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
ray / purelib / ray / tune / tests / test_experiment_analysis.py
Size: Mime:
import unittest
import shutil
import tempfile
import random
import os
import pickle
import pandas as pd
from numpy import nan

import ray
from ray import tune
from ray.tune import ExperimentAnalysis
import ray.tune.registry
from ray.tune.utils.mock_trainable import MyTrainableClass
from ray.tune.utils.util import is_nan


class ExperimentAnalysisSuite(unittest.TestCase):
    @classmethod
    def setUpClass(cls):
        ray.init(num_cpus=4, num_gpus=0, local_mode=True, include_dashboard=False)

    @classmethod
    def tearDownClass(cls):
        ray.shutdown()

    def setUp(self):
        self.test_dir = tempfile.mkdtemp()
        self.test_name = "analysis_exp"
        self.num_samples = 10
        self.metric = "episode_reward_mean"
        self.test_path = os.path.join(self.test_dir, self.test_name)
        self.run_test_exp()

    def tearDown(self):
        shutil.rmtree(self.test_dir, ignore_errors=True)

    def run_test_exp(self):
        self.ea = tune.run(
            MyTrainableClass,
            name=self.test_name,
            local_dir=self.test_dir,
            stop={"training_iteration": 1},
            checkpoint_freq=1,
            num_samples=self.num_samples,
            config={
                "width": tune.sample_from(lambda spec: 10 + int(90 * random.random())),
                "height": tune.sample_from(lambda spec: int(100 * random.random())),
            },
        )

    def nan_test_exp(self):
        nan_ea = tune.run(
            lambda x: nan,
            name="testing_nan",
            local_dir=self.test_dir,
            stop={"training_iteration": 1},
            checkpoint_freq=1,
            num_samples=self.num_samples,
            config={
                "width": tune.sample_from(lambda spec: 10 + int(90 * random.random())),
                "height": tune.sample_from(lambda spec: int(100 * random.random())),
            },
        )
        return nan_ea

    def testDataframe(self):
        df = self.ea.dataframe(self.metric, mode="max")

        self.assertTrue(isinstance(df, pd.DataFrame))
        self.assertEqual(df.shape[0], self.num_samples)

    def testLoadJson(self):
        all_dataframes_via_csv = self.ea.fetch_trial_dataframes()

        self.ea.set_filetype("json")
        all_dataframes_via_json = self.ea.fetch_trial_dataframes()

        assert set(all_dataframes_via_csv) == set(all_dataframes_via_json)

        with self.assertRaises(ValueError):
            self.ea.set_filetype("bad")

        self.ea.set_filetype("csv")
        all_dataframes_via_csv2 = self.ea.fetch_trial_dataframes()
        assert set(all_dataframes_via_csv) == set(all_dataframes_via_csv2)

    def testStats(self):
        assert self.ea.stats()
        assert self.ea.runner_data()

    def testTrialDataframe(self):
        checkpoints = self.ea._checkpoints_and_paths
        idx = random.randint(0, len(checkpoints) - 1)
        logdir_from_checkpoint = str(
            checkpoints[idx][1].joinpath(checkpoints[idx][0]["relative_logdir"])
        )
        logdir_from_trial = self.ea.trials[idx].logdir

        self.assertEqual(logdir_from_checkpoint, logdir_from_trial)

        trial_df = self.ea.trial_dataframes[logdir_from_checkpoint]

        self.assertTrue(isinstance(trial_df, pd.DataFrame))
        self.assertEqual(trial_df.shape[0], 1)

    def testBestConfig(self):
        best_config = self.ea.get_best_config(self.metric, mode="max")
        self.assertTrue(isinstance(best_config, dict))
        self.assertTrue("width" in best_config)
        self.assertTrue("height" in best_config)

    def testBestConfigNan(self):
        nan_ea = self.nan_test_exp()
        best_config = nan_ea.get_best_config(self.metric, mode="max")
        self.assertIsNone(best_config)

    def testBestLogdir(self):
        logdir = self.ea.get_best_logdir(self.metric, mode="max")
        self.assertTrue(logdir.startswith(self.test_path))
        logdir2 = self.ea.get_best_logdir(self.metric, mode="min")
        self.assertTrue(logdir2.startswith(self.test_path))
        self.assertNotEqual(logdir, logdir2)

    def testBestLogdirNan(self):
        nan_ea = self.nan_test_exp()
        logdir = nan_ea.get_best_logdir(self.metric, mode="max")
        self.assertIsNone(logdir)

    def testGetTrialCheckpointsPathsByTrial(self):
        best_trial = self.ea.get_best_trial(self.metric, mode="max")
        checkpoints_metrics = self.ea.get_trial_checkpoints_paths(best_trial)
        logdir = self.ea.get_best_logdir(self.metric, mode="max")
        expected_path = os.path.join(logdir, "checkpoint_000001")
        assert os.path.normpath(checkpoints_metrics[0][0]) == expected_path
        assert checkpoints_metrics[0][1] == 1

    def testGetTrialCheckpointsPathsByPath(self):
        logdir = self.ea.get_best_logdir(self.metric, mode="max")
        checkpoints_metrics = self.ea.get_trial_checkpoints_paths(logdir)
        expected_path = os.path.join(logdir, "checkpoint_000001")
        assert os.path.normpath(checkpoints_metrics[0][0]) == expected_path
        assert checkpoints_metrics[0][1] == 1

    def testGetTrialCheckpointsPathsWithMetricByTrial(self):
        best_trial = self.ea.get_best_trial(self.metric, mode="max")
        paths = self.ea.get_trial_checkpoints_paths(best_trial, self.metric)
        logdir = self.ea.get_best_logdir(self.metric, mode="max")
        expected_path = os.path.join(logdir, "checkpoint_000001")
        assert os.path.normpath(paths[0][0]) == expected_path
        assert paths[0][1] == best_trial.metric_analysis[self.metric]["last"]

    def testGetTrialCheckpointsPathsWithMetricByPath(self):
        best_trial = self.ea.get_best_trial(self.metric, mode="max")
        logdir = self.ea.get_best_logdir(self.metric, mode="max")
        paths = self.ea.get_trial_checkpoints_paths(best_trial, self.metric)
        expected_path = os.path.join(logdir, "checkpoint_000001")
        assert os.path.normpath(paths[0][0]) == expected_path
        assert paths[0][1] == best_trial.metric_analysis[self.metric]["last"]

    def testGetBestCheckpoint(self):
        best_trial = self.ea.get_best_trial(self.metric, mode="max")
        checkpoints_metrics = self.ea.get_trial_checkpoints_paths(
            best_trial, metric=self.metric
        )
        expected_path = max(checkpoints_metrics, key=lambda x: x[1])[0]
        best_checkpoint = self.ea.get_best_checkpoint(
            best_trial, self.metric, mode="max"
        )
        assert expected_path == best_checkpoint._local_path

    def testGetBestCheckpointNan(self):
        """Tests if nan values are excluded from best checkpoint."""
        metric = "loss"

        def train(config):
            for i in range(config["steps"]):
                if i == 0:
                    value = float("nan")
                else:
                    value = i
                result = {metric: value}
                with tune.checkpoint_dir(step=i):
                    pass
                tune.report(**result)

        ea = tune.run(train, local_dir=self.test_dir, config={"steps": 3})
        best_trial = ea.get_best_trial(metric, mode="min")
        best_checkpoint = ea.get_best_checkpoint(best_trial, metric, mode="min")
        checkpoints_metrics = ea.get_trial_checkpoints_paths(best_trial, metric=metric)
        expected_checkpoint_no_nan = min(
            [
                checkpoint_metric
                for checkpoint_metric in checkpoints_metrics
                if not is_nan(checkpoint_metric[1])
            ],
            key=lambda x: x[1],
        )[0]
        assert best_checkpoint._local_path == expected_checkpoint_no_nan

    def testGetLastCheckpoint(self):
        # one more experiment with 2 iterations
        new_ea = tune.run(
            MyTrainableClass,
            name=self.test_name,
            local_dir=self.test_dir,
            stop={"training_iteration": 2},
            checkpoint_freq=1,
            config={
                "width": tune.sample_from(lambda spec: 10 + int(90 * random.random())),
                "height": tune.sample_from(lambda spec: int(100 * random.random())),
            },
        )

        # check if it's loaded correctly
        last_checkpoint = new_ea.get_last_checkpoint()._local_path
        assert self.test_path in last_checkpoint
        assert "checkpoint_000002" in last_checkpoint

        # test restoring the checkpoint and running for another iteration
        tune.run(
            MyTrainableClass,
            name=self.test_name,
            local_dir=self.test_dir,
            restore=last_checkpoint,
            stop={"training_iteration": 3},
            checkpoint_freq=1,
            config={
                "width": tune.sample_from(lambda spec: 10 + int(90 * random.random())),
                "height": tune.sample_from(lambda spec: int(100 * random.random())),
            },
        )

    def testAllDataframes(self):
        dataframes = self.ea.trial_dataframes
        self.assertTrue(len(dataframes) == self.num_samples)

        self.assertTrue(isinstance(dataframes, dict))
        for df in dataframes.values():
            self.assertEqual(df.training_iteration.max(), 1)

    def testIgnoreOtherExperiment(self):
        analysis = tune.run(
            MyTrainableClass,
            name="test_example",
            local_dir=self.test_dir,
            stop={"training_iteration": 1},
            num_samples=1,
            config={
                "width": tune.sample_from(lambda spec: 10 + int(90 * random.random())),
                "height": tune.sample_from(lambda spec: int(100 * random.random())),
            },
        )
        df = analysis.dataframe(self.metric, mode="max")
        self.assertEqual(df.shape[0], 1)

    def testGetTrialCheckpointsPathsByPathWithSpecialCharacters(self):
        analysis = tune.run(
            MyTrainableClass,
            name="test_example",
            local_dir=self.test_dir,
            stop={"training_iteration": 1},
            num_samples=1,
            config={"test": tune.grid_search([[1, 2], [3, 4]])},
            checkpoint_at_end=True,
        )
        logdir = analysis.get_best_logdir(self.metric, mode="max")
        checkpoints_metrics = analysis.get_trial_checkpoints_paths(logdir)
        expected_path = os.path.join(logdir, "checkpoint_000001")
        assert os.path.normpath(checkpoints_metrics[0][0]) == expected_path
        assert checkpoints_metrics[0][1] == 1

    def testGetTrialCheckpointsPathsWithTemporaryCheckpoints(self):
        analysis = tune.run(
            MyTrainableClass,
            name="test_example",
            local_dir=self.test_dir,
            stop={"training_iteration": 2},
            num_samples=1,
            config={"test": tune.grid_search([[1, 2], [3, 4]])},
            checkpoint_at_end=True,
        )
        logdir = analysis.get_best_logdir(self.metric, mode="max")

        shutil.copytree(
            os.path.join(logdir, "checkpoint_000002"),
            os.path.join(logdir, "checkpoint_tmpxxx"),
        )

        checkpoints_metrics = analysis.get_trial_checkpoints_paths(logdir)
        expected_path = os.path.join(logdir, "checkpoint_000002")

        assert len(checkpoints_metrics) == 1

        assert os.path.normpath(checkpoints_metrics[0][0]) == expected_path
        assert checkpoints_metrics[0][1] == 2


class ExperimentAnalysisPropertySuite(unittest.TestCase):
    def testBestProperties(self):
        def train(config):
            for i in range(10):
                with tune.checkpoint_dir(i):
                    pass
                tune.report(res=config["base"] + i)

        ea = tune.run(
            train,
            config={"base": tune.grid_search([100, 200, 300])},
            metric="res",
            mode="max",
        )

        trials = ea.trials

        self.assertEqual(ea.best_trial, trials[2])
        self.assertEqual(ea.best_config, trials[2].config)
        self.assertEqual(ea.best_logdir, trials[2].logdir)
        self.assertEqual(
            ea.best_checkpoint._local_path, trials[2].checkpoint.dir_or_data
        )
        self.assertTrue(all(ea.best_dataframe["trial_id"] == trials[2].trial_id))
        self.assertEqual(ea.results_df.loc[trials[2].trial_id, "res"], 309)
        self.assertEqual(ea.best_result["res"], 309)
        self.assertEqual(ea.best_result_df.loc[trials[2].trial_id, "res"], 309)

    def testDataframeBestResult(self):
        def train(config):
            if config["var"] == 1:
                tune.report(loss=9)
                tune.report(loss=7)
                tune.report(loss=5)
            else:
                tune.report(loss=10)
                tune.report(loss=4)
                tune.report(loss=10)

        analysis = tune.run(
            train, config={"var": tune.grid_search([1, 2])}, metric="loss", mode="min"
        )

        self.assertEqual(analysis.best_config["var"], 1)

        with self.assertRaises(ValueError):
            # Should raise because we didn't pass a metric
            df = analysis.dataframe(mode="max")

        # If we specify `min`, we expect the lowest ever observed result
        df = analysis.dataframe(metric="loss", mode="min")
        var = df[df.loss == df.loss.min()]["config/var"].values[0]
        self.assertEqual(var, 2)

        # If we don't pass a mode, we just fetch the last result
        df = analysis.dataframe(metric="loss")
        var = df[df.loss == df.loss.min()]["config/var"].values[0]
        self.assertEqual(var, 1)

        df = analysis.dataframe()
        var = df[df.loss == df.loss.min()]["config/var"].values[0]
        self.assertEqual(var, 1)


class ExperimentAnalysisStubSuite(unittest.TestCase):
    def setUp(self):
        self.test_dir = tempfile.mkdtemp()
        self.test_name = "analysis_exp"
        self.num_samples = 2
        self.metric = "episode_reward_mean"
        self.test_path = os.path.join(self.test_dir, self.test_name)
        self.run_test_exp()

    def tearDown(self):
        shutil.rmtree(self.test_dir, ignore_errors=True)
        ray.shutdown()

    def run_test_exp(self):
        def training_function(config, checkpoint_dir=None):
            tune.report(episode_reward_mean=config["alpha"])

        return tune.run(
            training_function,
            name=self.test_name,
            local_dir=self.test_dir,
            stop={"training_iteration": 1},
            num_samples=self.num_samples,
            config={
                "alpha": tune.sample_from(lambda spec: 10 + int(90 * random.random())),
            },
        )

    def testPickling(self):
        analysis = self.run_test_exp()
        pickle_path = os.path.join(self.test_dir, "analysis.pickle")
        with open(pickle_path, "wb") as f:
            pickle.dump(analysis, f)

        self.assertTrue(analysis.get_best_trial(metric=self.metric, mode="max"))

        ray.shutdown()
        ray.tune.registry._global_registry = ray.tune.registry._Registry(
            prefix="global"
        )

        with open(pickle_path, "rb") as f:
            analysis = pickle.load(f)

        self.assertTrue(analysis.get_best_trial(metric=self.metric, mode="max"))

    def testFromPath(self):
        self.run_test_exp()
        analysis = ExperimentAnalysis(self.test_path)

        self.assertTrue(analysis.get_best_trial(metric=self.metric, mode="max"))

        ray.shutdown()
        ray.tune.registry._global_registry = ray.tune.registry._Registry(
            prefix="global"
        )

        analysis = ExperimentAnalysis(self.test_path)

        # This will be None if validate_trainable during loading fails
        self.assertTrue(analysis.get_best_trial(metric=self.metric, mode="max"))


if __name__ == "__main__":
    import pytest
    import sys

    sys.exit(pytest.main(["-v", __file__]))