Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
tensorflow / purelib / tensorflow / python / tpu / device_assignment.py
Size: Mime:
# Copyright 2017 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ======================================
"""Library of TPU helper functions."""

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import math
import numpy as np
from six.moves import xrange  # pylint: disable=redefined-builtin

from tensorflow.python.platform import tf_logging as logging
from tensorflow.python.tpu.topology import Topology
from tensorflow.python.util.tf_export import tf_export


SINGLE_CORE_ASSIGNMENT = [[[0, 0, 0]]]


def _compute_task_and_cores_to_replicas(core_assignment, topology):
  """Computes a nested dict which maps task and logical core to replicas."""
  task_and_cores_to_replicas = {}
  for replica in xrange(core_assignment.shape[0]):
    for logical_core in xrange(core_assignment.shape[1]):
      coordinates = core_assignment[replica, logical_core, :]
      task_id = topology.task_ordinal_at_coordinates(coordinates)
      if task_id not in task_and_cores_to_replicas:
        task_and_cores_to_replicas[task_id] = {}
      if logical_core not in task_and_cores_to_replicas[task_id]:
        task_and_cores_to_replicas[task_id][logical_core] = set()

      task_and_cores_to_replicas[task_id][logical_core].add(replica)

  task_to_sorted_replica_id = {}

  for task, core_to_replicas in task_and_cores_to_replicas.items():
    core_to_sorted_replicas = {}
    for core, replicas in core_to_replicas.items():
      core_to_sorted_replicas[core] = sorted(replicas)

    task_to_sorted_replica_id[task] = core_to_sorted_replicas
  return task_to_sorted_replica_id


@tf_export("tpu.experimental.DeviceAssignment")
class DeviceAssignment(object):
  """Mapping from logical cores in a computation to the physical TPU topology.

  Prefer to use the `DeviceAssignment.build()` helper to construct a
  `DeviceAssignment`; it is easier if less flexible than constructing a
  `DeviceAssignment` directly.
  """

  def __init__(self, topology, core_assignment):
    """Constructs a `DeviceAssignment` object.

    Args:
      topology: A `Topology` object that describes the physical TPU topology.
      core_assignment: A logical to physical core mapping, represented as a
        rank 3 numpy array. See the description of the `core_assignment`
        property for more details.

    Raises:
      ValueError: If `topology` is not `Topology` object.
      ValueError: If `core_assignment` is not a rank 3 numpy array.
    """
    if not isinstance(topology, Topology):
      raise ValueError("topology must be a Topology object, got {}".format(
          type(topology)))
    core_assignment = np.asarray(core_assignment, dtype=np.int32)

    self._topology = topology

    if core_assignment.ndim != 3:
      raise ValueError("core_assignment must be a rank 3 numpy array, "
                       "got shape {}".format(core_assignment.shape))

    self._num_replicas = core_assignment.shape[0]
    self._num_cores_per_replica = core_assignment.shape[1]

    if core_assignment.shape[-1] != topology.mesh_rank:
      raise ValueError(
          "minor dimension of core_assignment must have size equal to topology "
          "rank ({}), got shape {}".format(topology.mesh_rank,
                                           core_assignment.shape))

    self._core_assignment = core_assignment
    self._task_and_cores_to_replicas = _compute_task_and_cores_to_replicas(
        self._core_assignment, topology)

  @property
  def topology(self):
    """A `Topology` that describes the TPU topology."""
    return self._topology

  @property
  def num_cores_per_replica(self):
    """The number of cores per replica."""
    return self._num_cores_per_replica

  @property
  def num_replicas(self):
    """The number of replicas of the computation."""
    return self._num_replicas

  @property
  def core_assignment(self):
    """The logical to physical core mapping.

    Returns:
      An integer numpy array of rank 3, with shape
      `[num_replicas, num_cores_per_replica, topology_rank]`. Maps
      (replica, logical core) pairs to physical topology coordinates.
    """
    return self._core_assignment

  def coordinates(self, replica, logical_core):
    """Returns the physical topology coordinates of a logical core."""
    return tuple(self.core_assignment[replica, logical_core, :])

  def lookup_replicas(self, task_id, logical_core):
    """Lookup replica ids by task number and logical core.

    Args:
      task_id: TensorFlow task number.
      logical_core: An integer, identifying a logical core.
    Returns:
      A sorted list of the replicas that are attached to that task and
      logical_core.
    Raises:
      ValueError: If no replica exists in the task which contains the logical
      core.
    """
    try:
      return self._task_and_cores_to_replicas[task_id][logical_core]
    except KeyError:
      raise ValueError(
          "Can not find any replica in task: {} contains logical_core: {} ".
          format(task_id, logical_core))

  def tpu_ordinal(self, replica=0, logical_core=0):
    """Returns the ordinal of the TPU device assigned to a logical core."""
    coordinates = self.coordinates(replica, logical_core)
    return self._topology.tpu_device_ordinal_at_coordinates(coordinates)

  def host_device(self, replica=0, logical_core=0, job=None):
    """Returns the CPU device attached to a logical core."""
    coordinates = self.coordinates(replica, logical_core)
    return self._topology.cpu_device_name_at_coordinates(coordinates, job=job)

  def tpu_device(self, replica=0, logical_core=0, job=None):
    """Returns the name of the TPU device assigned to a logical core."""
    coordinates = self.coordinates(replica, logical_core)
    return self._topology.tpu_device_name_at_coordinates(coordinates, job=job)

  @staticmethod
  def build(topology,
            computation_shape=None,
            computation_stride=None,
            num_replicas=1):
    return device_assignment(topology, computation_shape, computation_stride,
                             num_replicas)


def _ring_2d(height, width):
  """Ring-order of a height x width mesh.

  For example, in a 4x4 mesh, this returns the following order.
    0 -- 1 -- 2 -- 3
    |    |    |    |
    15-- 6 -- 5 -- 4
    |    |    |    |
    14-- 7 -- 8 -- 9
    |    |    |    |
    13-- 12-- 11-- 10

  Args:
    height: An integer represents the height.
    width: An integer represents the width.

  Returns:
    A list of [y, x] pairs with ring order.
  """
  if height == 1:
    return [(0, i) for i in range(width)]
  if width == 1:
    return [(i, 0) for i in range(height)]
  if height % 2 != 0:
    logging.warning("Odd dimension")
    return [(i % height, i // height) for i in range(width * height)]
  ret = [(0, 0)]
  for i in range(height // 2):
    for j in range(1, width):
      ret.append((2 * i, j))
    for j in range(width - 1, 0, -1):
      ret.append((2 * i + 1, j))
  for i in range(height - 1, 0, -1):
    ret.append((i, 0))
  return ret


def device_assignment(topology,
                      computation_shape=None,
                      computation_stride=None,
                      num_replicas=1):
  """Computes a device_assignment of a computation across a TPU topology.

  Attempts to choose a compact grid of cores for locality.

  Returns a `DeviceAssignment` that describes the cores in the topology assigned
  to each core of each replica.

  `computation_shape` and `computation_stride` values should be powers of 2 for
  optimal packing.

  Args:
    topology: A `Topology` object that describes the TPU cluster topology.
      To obtain a TPU topology, evaluate the `Tensor` returned by
      `initialize_system` using `Session.run`. Either a serialized
      `TopologyProto` or a `Topology` object may be passed. Note: you must
      evaluate the `Tensor` first; you cannot pass an unevaluated `Tensor` here.
    computation_shape: A rank 1 int32 numpy array with size equal to the
      topology rank, describing the shape of the computation's block of cores.
      If None, the `computation_shape` is `[1] * topology_rank`.
    computation_stride: A rank 1 int32 numpy array of size `topology_rank`,
      describing the inter-core spacing of the `computation_shape` cores in the
      TPU topology. If None, the `computation_stride` is `[1] * topology_rank`.
    num_replicas: The number of computation replicas to run. The replicas will
      be packed into the free spaces of the topology.

  Returns:
    A DeviceAssignment object, which describes the mapping between the logical
    cores in each computation replica and the physical cores in the TPU
    topology.

  Raises:
    ValueError: If `topology` is not a valid `Topology` object.
    ValueError: If `computation_shape` or `computation_stride` are not 1D int32
      numpy arrays with shape [3] where all values are positive.
    ValueError: If computation's replicas cannot fit into the TPU topology.
  """
  # Deserialize the Topology proto, if it is a string.
  if isinstance(topology, bytes):
    topology = Topology(serialized=topology)

  if not isinstance(topology, Topology):
    raise ValueError("`topology` is not a Topology object; got {}".format(
        type(topology)))

  topology_rank = len(topology.mesh_shape)
  mesh_shape = topology.mesh_shape
  if computation_shape is None:
    computation_shape = np.array([1] * topology_rank, dtype=np.int32)
  else:
    computation_shape = np.asarray(computation_shape, dtype=np.int32)

  if computation_stride is None:
    computation_stride = np.array([1] * topology_rank, dtype=np.int32)
  else:
    computation_stride = np.asarray(computation_stride, dtype=np.int32)

  if computation_shape.shape != (topology_rank,):
    raise ValueError("computation_shape must have shape [{}]; got {}".format(
        topology_rank, computation_shape.shape))
  if computation_stride.shape != (topology_rank,):
    raise ValueError("computation_stride must have shape [{}]; got {}".format(
        topology_rank, computation_stride.shape))

  if any(computation_shape < 1):
    raise ValueError(
        "computation_shape must be positive; got computation_shape={}".format(
            computation_shape))
  if any(computation_stride < 1):
    raise ValueError(
        "computation_stride must be positive; got computation_stride={}".format(
            computation_stride))

  # Computes the physical size of one computation instance.
  computation_footprint = computation_shape * computation_stride
  if any(computation_footprint > mesh_shape):
    raise ValueError(
        "computation footprint {} does not fit in TPU topology shape {}".format(
            computation_footprint, mesh_shape))

  # Computes how many copies of the computation footprint fit in the mesh.
  block_counts = mesh_shape // computation_footprint

  replica_counts = block_counts * computation_stride
  max_replicas = np.prod(replica_counts)
  if num_replicas > max_replicas:
    raise ValueError(
        "requested {} replicas but only {} replicas with shape {} and "
        "computation_stride {} fit in a TPU mesh of shape {}".format(
            num_replicas, max_replicas, computation_shape, computation_stride,
            mesh_shape))

  def ceil_of_ratio(n, m):
    return (n + m - 1) // m

  replica_shape = [0] * topology_rank
  if num_replicas > 0:
    remaining_replicas = num_replicas
    remaining_dims = topology_rank

    # Choose dimensions as close to an equal cube as possible, in order of
    # increasing dimension size. By visiting dimensions in increasing size, we
    # assign the most constrained dimension first, so we won't make infeasible
    # choices.
    #
    # As a secondary sort order, visit the dimensions in reverse order. This
    # means we try to use both cores on the same chip in preference to two cores
    # on different chips.
    for x, ni in sorted(((x, -i) for (i, x) in enumerate(replica_counts))):
      i = -ni
      target_size = int(math.ceil(remaining_replicas**(1.0 / remaining_dims)))
      replica_shape[i] = min(target_size, x)
      remaining_replicas = ceil_of_ratio(remaining_replicas, replica_shape[i])
      remaining_dims -= 1

    assert remaining_replicas == 1 and remaining_dims == 0

  # Assigns an offset to each replica such that no two replicas overlap.
  replica_offsets = np.full([num_replicas, topology_rank], -1, dtype=np.int32)

  # TODO(ylc): Revisit here when topology_rank > 3.
  enable_2d_tiling = (
      topology_rank == 3 and
      computation_shape[-1] == 2  # Only handle 2D case.
      and np.prod(computation_stride) == 1  # Ensure no stride.
      and num_replicas == max_replicas)  # Full replication.
  logging.info("enable_2d_tiling: {}".format(enable_2d_tiling))
  if enable_2d_tiling:
    assignment = []
    inner_ring = _ring_2d(computation_shape[0], computation_shape[1])
    outer_ring = _ring_2d(replica_shape[0], replica_shape[1])

    for replica in xrange(num_replicas):
      outer_x, outer_y = outer_ring[replica]
      per_replica_assignment = []
      for index in xrange(np.prod(computation_shape)):
        inner_x, inner_y = inner_ring[index // 2]
        px = outer_x * computation_shape[0] + inner_x
        py = outer_y * computation_shape[1] + inner_y
        pz = index % 2
        per_replica_assignment.append([px, py, pz])
      assignment.append(per_replica_assignment)
  else:
    for replica in xrange(num_replicas):
      # Chooses a replica number in each axis.
      t = replica
      pos = []
      for dim in replica_shape[::-1]:
        pos.append(t % dim)
        t //= dim
      replica_pos = np.array(pos[::-1], dtype=np.int32)

      # Determines where that replica starts in each axis.
      outer = replica_pos // computation_stride
      inner = replica_pos % computation_stride
      replica_offsets[replica, :] = outer * computation_footprint + inner

    # Computes a logical core -> physical core mapping for each replica.
    indices = [
        np.arange(0, computation_shape[i] * computation_stride[i],
                  computation_stride[i]) for i in xrange(topology_rank)
    ]
    indices = np.concatenate(
        [i[..., np.newaxis] for i in np.meshgrid(*indices, indexing="ij")],
        axis=-1)
    indices = indices.reshape((-1, topology_rank))
    assignment = indices + replica_offsets[:, np.newaxis, :]

  return DeviceAssignment(topology, core_assignment=assignment)