Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
ray / purelib / ray / tune / automl / genetic_searcher.py
Size: Mime:
import logging
import numpy as np

from ray.tune.automl.search_policy import AutoMLSearcher

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Prefix prepended to every log message emitted by the genetic searcher.
LOGGING_PREFIX = "[GENETIC SEARCH] "


class GeneticSearch(AutoMLSearcher):
    """Implement the genetic search.

    Keep a collection of top-K parameter permutations as base genes,
    then apply selection, crossover, and mutation to them to generate
    new genes (a.k.a. a new generation). Hopefully, the performance of
    the top population would increase generation by generation.

    A gene is a sequence of one-hot encoded fields, one field per
    parameter of the search space.
    """

    def __init__(
        self,
        search_space,
        reward_attr,
        max_generation=2,
        population_size=10,
        population_decay=0.95,
        keep_top_ratio=0.2,
        selection_bound=0.4,
        crossover_bound=0.4,
    ):
        """
        Initialize GeneticSearcher.

        Args:
            search_space: The space to search.
            reward_attr: The attribute name of the reward in the result.
            max_generation: Max iteration number of genetic search.
            population_size: Number of trials of the initial generation.
            population_decay: Decay ratio of population size for the
                next generation.
            keep_top_ratio: Ratio of the top performance population.
            selection_bound: Threshold for performing selection.
            crossover_bound: Threshold for performing crossover.
        """
        super(GeneticSearch, self).__init__(search_space, reward_attr)

        self._cur_generation = 1
        self._max_generation = max_generation
        self._population_size = population_size
        self._population_decay = population_decay
        self._keep_top_ratio = keep_top_ratio
        self._selection_bound = selection_bound
        self._crossover_bound = crossover_bound

        # Seed the first generation with random genes. Encoding and config
        # lists are kept index-aligned: config i is decoded from encoding i.
        self._cur_config_list = []
        self._cur_encoding_list = []
        for _ in range(population_size):
            one_hot = self.search_space.generate_random_one_hot_encoding()
            self._cur_encoding_list.append(one_hot)
            self._cur_config_list.append(
                self.search_space.apply_one_hot_encoding(one_hot)
            )

    def _select(self):
        """Return the configs and encodings of the current generation.

        Returns:
            A tuple ``(config_list, encoding_list)`` for the current
            generation; both lists are index-aligned.
        """
        population_size = len(self._cur_config_list)
        logger.info(
            LOGGING_PREFIX + "Generate the %sth generation, population=%s",
            self._cur_generation,
            population_size,
        )
        return self._cur_config_list, self._cur_encoding_list

    def _feedback(self, trials):
        """Consume the finished trials and prepare the next generation.

        Args:
            trials: Finished trials of the current generation. Each one
                is ranked by ``best_result[reward_attr]`` and provides
                its gene through ``extra_arg``.

        Returns:
            ``AutoMLSearcher.TERMINATE`` once ``max_generation`` has been
            exceeded, otherwise ``AutoMLSearcher.CONTINUE``.
        """
        self._cur_generation += 1
        if self._cur_generation > self._max_generation:
            return AutoMLSearcher.TERMINATE

        # Best trials first, so the head of the list is the elite.
        sorted_trials = sorted(
            trials, key=lambda t: t.best_result[self.reward_attr], reverse=True
        )
        self._cur_encoding_list = self._next_generation(sorted_trials)
        self._cur_config_list = [
            self.search_space.apply_one_hot_encoding(one_hot)
            for one_hot in self._cur_encoding_list
        ]

        return AutoMLSearcher.CONTINUE

    def _next_generation(self, sorted_trials):
        """Generate genes (encodings) for the next generation.

        Use the top K (_keep_top_ratio) trials of the last generation
        as candidates to generate the next generation. The action could
        be selection, crossover or mutation according to the corresponding
        ratio (_selection_bound, _crossover_bound).

        Args:
            sorted_trials: List of finished trials with top
                performance ones first.

        Returns:
            A list of new genes (encodings)

        Raises:
            ValueError: If ``sorted_trials`` is empty, since there is no
                gene to derive a new generation from.
        """
        if not sorted_trials:
            raise ValueError(
                "Cannot generate the next generation without finished trials."
            )

        num_population = self._next_population_size(len(sorted_trials))
        # Keep at least two parents when available, but never index past
        # the trials that actually finished.
        top_num = min(
            int(max(num_population * self._keep_top_ratio, 2)),
            len(sorted_trials),
        )

        # The elite genes are carried over unchanged and also serve as the
        # parent pool for the remaining slots.
        candidate = [trial.extra_arg for trial in sorted_trials[:top_num]]
        next_generation = list(candidate)

        crossover_limit = self._selection_bound + self._crossover_bound
        for _ in range(top_num, num_population):
            flip_coin = np.random.uniform()
            if flip_coin < self._selection_bound:
                next_gen = GeneticSearch._selection(candidate)
            elif flip_coin < crossover_limit:
                next_gen = GeneticSearch._crossover(candidate)
            else:
                next_gen = GeneticSearch._mutation(candidate)
            next_generation.append(next_gen)
        return next_generation

    def _next_population_size(self, last_population_size):
        """Calculate the population size of the next generation.

        Intuitively, the population should decay after each iteration since
        it should converge. It can also decrease the total resource required.

        Args:
            last_population_size: The last population size.

        Returns:
            The new population size (never less than 3).
        """
        # TODO: implement a generic resource allocation algorithm.
        return int(max(last_population_size * self._population_decay, 3))

    @staticmethod
    def _selection(candidate):
        """Perform selection action to candidates.

        For example, new gene = sample_1 + the 5th bit of sample2.

        Args:
            candidate: List of candidate genes (encodings).

        Examples:
            >>> import numpy as np
            >>> from ray.tune.automl.genetic_searcher import GeneticSearch
            >>> genetic_search = GeneticSearch( # doctest: +SKIP
            ...     search_space={}, reward_attr="attr")
            >>> # Genes that represent 3 parameters
            >>> gene1 = [np.array([0, 0, 1]), np.array([0, 1]), np.array([1, 0])]
            >>> gene2 = [np.array([0, 1, 0]), np.array([1, 0]), np.array([0, 1])]
            >>> new_gene = genetic_search._selection([gene1, gene2]) # doctest: +SKIP
            >>> # new_gene could be gene1 overwritten with the
            >>> # 2nd parameter of gene2
            >>> # in which case:
            >>> #   new_gene[0] = gene1[0]
            >>> #   new_gene[1] = gene2[1]
            >>> #   new_gene[2] = gene1[2]

        Returns:
            New gene (encoding)
        """
        sample_index1 = np.random.choice(len(candidate))
        sample_index2 = np.random.choice(len(candidate))
        sample_1 = candidate[sample_index1]
        sample_2 = candidate[sample_index2]
        select_index = np.random.choice(len(sample_1))
        logger.info(
            LOGGING_PREFIX + "Perform selection from %sth to %sth at index=%s",
            sample_index2,
            sample_index1,
            select_index,
        )

        # Compare by value: ``select_index`` is a NumPy integer, so an
        # identity check against the Python int ``i`` would never match.
        return [
            sample_2[i] if i == select_index else sample_1[i]
            for i in range(len(sample_1))
        ]

    @staticmethod
    def _crossover(candidate):
        """Perform crossover action to candidates.

        For example, new gene = 60% sample_1 + 40% sample_2.

        Args:
            candidate: List of candidate genes (encodings).

        Examples:
            >>> import numpy as np
            >>> from ray.tune.automl.genetic_searcher import GeneticSearch
            >>> genetic_search = GeneticSearch( # doctest: +SKIP
            ...     search_space={}, reward_attr="attr")
            >>> # Genes that represent 3 parameters
            >>> gene1 = [np.array([0, 0, 1]), np.array([0, 1]), np.array([1, 0])]
            >>> gene2 = [np.array([0, 1, 0]), np.array([1, 0]), np.array([0, 1])]
            >>> new_gene = genetic_search._crossover([gene1, gene2]) # doctest: +SKIP
            >>> # new_gene could be the first [n=1] parameters of
            >>> # gene1 + the rest of gene2
            >>> # in which case:
            >>> #   new_gene[0] = gene1[0]
            >>> #   new_gene[1] = gene2[1]
            >>> #   new_gene[2] = gene2[2]

        Returns:
            New gene (encoding)
        """
        sample_index1 = np.random.choice(len(candidate))
        sample_index2 = np.random.choice(len(candidate))
        sample_1 = candidate[sample_index1]
        sample_2 = candidate[sample_index2]
        cross_index = int(len(sample_1) * np.random.uniform(low=0.3, high=0.7))
        logger.info(
            LOGGING_PREFIX + "Perform crossover between %sth and %sth at index=%s",
            sample_index1,
            sample_index2,
            cross_index,
        )

        # Fields up to and including ``cross_index`` come from sample_1,
        # everything after it from sample_2.
        return [
            sample_2[i] if i > cross_index else sample_1[i]
            for i in range(len(sample_1))
        ]

    @staticmethod
    def _mutation(candidate, rate=0.1):
        """Perform mutation action to candidates.

        For example, randomly change 10% of original sample. The chosen
        candidate itself is left untouched; a mutated copy is returned.

        Args:
            candidate: List of candidate genes (encodings).
            rate: Percentage of mutation bits

        Examples:
            >>> import numpy as np
            >>> from ray.tune.automl.genetic_searcher import GeneticSearch
            >>> genetic_search = GeneticSearch( # doctest: +SKIP
            ...     search_space={}, reward_attr="attr")
            >>> # Genes that represent 3 parameters
            >>> gene1 = [np.array([0, 0, 1]), np.array([0, 1]), np.array([1, 0])]
            >>> new_gene = genetic_search._mutation([gene1]) # doctest: +SKIP
            >>> # new_gene could be the gene1 with the 3rd parameter changed
            >>> #   new_gene[0] = gene1[0]
            >>> #   new_gene[1] = gene1[1]
            >>> #   new_gene[2] = [0, 1] != gene1[2]
            >>> # (the random bit may also coincide with the old one, in
            >>> # which case the field is unchanged)

        Returns:
            New gene (encoding)
        """
        sample_index = np.random.choice(len(candidate))
        # Copy every field so the mutation cannot leak into the candidate,
        # which is shared with the elite of the next generation.
        sample = [np.copy(field) for field in candidate[sample_index]]
        idx_list = []
        for _ in range(int(max(len(sample) * rate, 1))):
            idx = np.random.choice(len(sample))
            idx_list.append(idx)

            field = sample[idx]  # one-hot encoding
            # Clear the currently active bit, then activate a random one.
            field[np.argmax(field)] = 0
            bit = np.random.choice(field.shape[0])
            field[bit] = 1

        logger.info(
            LOGGING_PREFIX + "Perform mutation on %sth at index=%s",
            sample_index,
            str(idx_list),
        )
        return sample