# Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages
#
# Repository URL to install this package:
#
# Details
# networkx / algorithms / connectivity / tests / test_edge_kcomponents.py
# Size: Mime:
# -*- coding: utf-8 -*-
import networkx as nx
import itertools as it
from nose.tools import (assert_equal, assert_not_equal, assert_greater_equal,
                        assert_raises, assert_in)
from networkx.utils import pairwise
from networkx.algorithms.connectivity import (
    bridge_components,
    EdgeComponentAuxGraph,
)
from networkx.algorithms.connectivity.edge_kcomponents import (
    general_k_edge_subgraphs,
)


# ----------------
# Helper functions
# ----------------

def fset(list_of_sets):
    """Return a set of frozensets built from an iterable of collections.

    Freezing every member makes it hashable, so two groupings of nodes can
    be compared with ``==`` without caring about the order in which the
    groups (or the nodes inside a group) were produced.
    """
    return {frozenset(members) for members in list_of_sets}


def _assert_subgraph_edge_connectivity(G, ccs_subgraph, k):
    """
    Assert the defining property of k-edge-connected subgraphs.

    Every node set in ``ccs_subgraph`` with more than one node must induce
    a subgraph of ``G`` whose edge connectivity is at least ``k``.
    Single-node sets are exempt from the check.
    """
    for nodes in ccs_subgraph:
        if len(nodes) <= 1:
            # A lone node has no connectivity requirement to satisfy.
            continue
        induced = G.subgraph(nodes)
        assert_greater_equal(nx.edge_connectivity(induced), k)


def _memo_connectivity(G, u, v, memo):
    """
    Return the local edge connectivity from ``u`` to ``v``, cached in ``memo``.

    ``memo`` maps node pairs ``(u, v)`` to previously computed values.  For
    undirected graphs a value stored under the reversed pair ``(v, u)`` is
    reused as well.  On a cache miss the value is computed with
    ``nx.edge_connectivity`` and stored under ``(u, v)``.
    """
    key = (u, v)
    if key in memo:
        return memo[key]

    # Only undirected graphs may reuse the result stored for (v, u).
    if not G.is_directed() and (v, u) in memo:
        return memo[(v, u)]

    result = nx.edge_connectivity(G, u, v)
    memo[key] = result
    return result


def _all_pairs_connectivity(G, cc, k, memo):
    """
    Brute-force check that every pair of nodes in ``cc`` is k-edge-connected.

    Local connectivities are looked up through ``_memo_connectivity`` so
    repeated pairs are not recomputed.  For directed graphs both directions
    are evaluated and the smaller value is the one compared against ``k``.
    """
    for src, dst in it.combinations(cc, 2):
        forward = _memo_connectivity(G, src, dst, memo)
        if not G.is_directed():
            weakest = forward
        else:
            weakest = min(forward, _memo_connectivity(G, dst, src, memo))
        assert_greater_equal(weakest, k)


def _assert_local_cc_edge_connectivity(G, ccs_local, k, memo):
    """
    Assert the defining property of k-edge-connected components.

    Within the original graph ``G``, the local edge connectivity between
    each pair of nodes of a component must be at least ``k``, unless the
    component is a single node.
    """
    for nodes in ccs_local:
        if len(nodes) <= 1:
            continue

        # Fast path: if the induced subgraph is already k-edge-connected,
        # the nodes are necessarily locally k-edge-connected in G.
        if nx.edge_connectivity(G.subgraph(nodes)) >= k:
            continue

        # Slow path: brute-force (memoized) check of every pair inside G.
        _all_pairs_connectivity(G, nodes, k, memo)


# Helper function
def _check_edge_connectivity(G):
    """
    Exhaustively validate the k-edge-connectivity routines on ``G``.

    For k = 1, 2, 3, ... the k-edge-components and k-edge-subgraphs are
    generated from a single ``EdgeComponentAuxGraph``.  Each result is
    checked for the connectivity it promises and is compared with the
    alternative ways of computing the same answer:

    * k == 1: connected components (strongly connected when directed),
    * k == 2 on undirected graphs: ``bridge_components``,
    * every k: ``general_k_edge_subgraphs``.

    Iteration stops at the first k > 2 for which every k-edge-component is
    a single node.
    """
    # One auxiliary graph serves every value of k.
    aux_graph = EdgeComponentAuxGraph.construct(G)
    directed = G.is_directed()

    # Cache of local connectivities, shared across all values of k.
    pair_memo = {}

    for k in it.count(1):
        local_ccs = fset(aux_graph.k_edge_components(k))
        subgraph_ccs = fset(aux_graph.k_edge_subgraphs(k))

        # Connectivity properties guaranteed by the algorithms.
        _assert_local_cc_edge_connectivity(G, local_ccs, k, pair_memo)
        _assert_subgraph_edge_connectivity(G, subgraph_ccs, k)

        if k == 1 or (k == 2 and not directed):
            assert_equal(local_ccs, subgraph_ccs,
                         'Subgraphs and components should be the same '
                         'when k == 1 or (k == 2 and not G.directed())')

        # Special-case methods must agree with the auxiliary graph.
        if k == 1:
            if directed:
                reference = fset(nx.strongly_connected_components(G))
            else:
                reference = fset(nx.connected_components(G))
            assert_equal(reference, local_ccs, 'k=1 failed alt')
            assert_equal(reference, subgraph_ccs, 'k=1 failed alt')
        elif k == 2 and not directed:
            reference = fset(bridge_components(G))
            assert_equal(reference, local_ccs, 'k=2 failed alt')
            assert_equal(reference, subgraph_ccs, 'k=2 failed alt')
        # If new methods for k == 3 or k == 4 are implemented, add them here.

        # The general subgraph method must also work on its own.
        general_node_sets = [set(H.nodes())
                             for H in general_k_edge_subgraphs(G, k=k)]
        assert_equal(fset(general_node_sets), subgraph_ccs,
                     'alt subgraph method failed')

        # Stop once k is past every special-case method and the components
        # cannot be broken down any further.
        if k > 2 and all(len(cc) == 1 for cc in local_ccs):
            break


# ----------------
# Misc tests
# ----------------

def test_zero_k_exception():
    """Every entry point must reject ``k=0`` with a ``ValueError``."""
    empty = nx.Graph()

    # Functions that return generators raise as soon as they are called.
    for eager in (nx.k_edge_components, nx.k_edge_subgraphs):
        assert_raises(ValueError, eager, empty, k=0)

    # Actual generators only raise when the first item is requested, so
    # they are consumed with ``list``.
    aux_graph = EdgeComponentAuxGraph.construct(empty)
    for method in (aux_graph.k_edge_components, aux_graph.k_edge_subgraphs):
        assert_raises(ValueError, list, method(k=0))

    assert_raises(ValueError, list, general_k_edge_subgraphs(empty, k=0))


def test_empty_input():
    """Empty undirected and directed graphs yield no components at all."""
    for graph_type in (nx.Graph, nx.DiGraph):
        G = graph_type()
        assert_equal([], list(nx.k_edge_components(G, k=5)))
        assert_equal([], list(nx.k_edge_subgraphs(G, k=5)))


def test_not_implemented():
    """Unsupported graph types must raise ``NetworkXNotImplemented``."""
    unsupported = nx.NetworkXNotImplemented
    multigraph = nx.MultiGraph()

    assert_raises(unsupported, EdgeComponentAuxGraph.construct, multigraph)
    for func in (nx.k_edge_components, nx.k_edge_subgraphs):
        assert_raises(unsupported, func, multigraph, k=2)

    # bridge_components rejects multigraphs and directed graphs alike.
    for graph in (multigraph, nx.DiGraph()):
        assert_raises(unsupported, bridge_components, graph)


def test_general_k_edge_subgraph_quick_return():
    """Exercise the quick-return optimization on graphs of isolated nodes.

    After adding the n-th isolated node, ``general_k_edge_subgraphs`` with
    ``k=1`` must yield exactly n subgraphs, each containing a single node.
    """
    G = nx.Graph()
    for expected_count, new_node in enumerate((0, 1), start=1):
        G.add_node(new_node)
        parts = list(general_k_edge_subgraphs(G, k=1))
        assert_equal(len(parts), expected_count)
        for part in parts:
            assert_equal(part.number_of_nodes(), 1)


# ----------------
# Undirected tests
# ----------------

def test_random_gnp():
    """Run the full consistency check on seeded G(n, p) random graphs."""
    # Full seed list; only the last two are currently exercised:
    # seeds = [1550709854, 1309423156, 4208992358, 2785630813, 1915069929]
    for seed in (2785630813, 1915069929):
        random_graph = nx.gnp_random_graph(20, 0.2, seed=seed)
        _check_edge_connectivity(random_graph)


def test_configuration():
    """Run the consistency check on seeded configuration-model graphs."""
    for seed in (2718183590, 2470619828, 1694705158, 3001036531, 2401251497):
        degrees = nx.random_powerlaw_tree_sequence(20, seed=seed, tries=5000)
        model = nx.configuration_model(degrees, seed=seed)
        # Convert to a plain Graph and drop self-loops before checking.
        G = nx.Graph(model)
        G.remove_edges_from(nx.selfloop_edges(G))
        _check_edge_connectivity(G)


def test_shell():
    """Run the consistency check on a seeded random shell graph."""
    # Full seed list; only the first is currently exercised:
    # seeds = [2057382236, 3331169846, 1840105863, 476020778, 2247498425]
    shells = [(12, 70, 0.8), (15, 40, 0.6)]
    for seed in (2057382236,):
        _check_edge_connectivity(nx.random_shell_graph(shells, seed=seed))


def test_karate():
    """Run the consistency check on the karate club graph."""
    _check_edge_connectivity(nx.karate_club_graph())


def test_tarjan_bridge():
    """Run the consistency check on the graph from Tarjan's bridge paper.

    RE Tarjan - "A note on finding the bridges of a graph"
    Information Processing Letters, 1974 - Elsevier
    doi:10.1016/0020-0190(74)90003-9.
    """
    # Node paths describing the 2-connected components ...
    two_connected = [(1, 2, 4, 3, 1, 4), (5, 6, 7, 5), (8, 9, 10, 8),
                     (17, 18, 16, 15, 17), (11, 12, 14, 13, 11, 14)]
    # ... and the bridges that join them.
    bridges = [(4, 8), (3, 5), (3, 17)]

    edges = it.chain.from_iterable(
        pairwise(path) for path in two_connected + bridges)
    _check_edge_connectivity(nx.Graph(edges))


def test_bridge_cc():
    """``bridge_components`` must split a hand-built graph at its bridges."""
    # Node paths describing the 2-connected components and the bridges.
    two_connected = [(1, 2, 4, 3, 1, 4), (8, 9, 10, 8), (11, 12, 13, 11)]
    bridges = [(4, 8), (3, 5), (20, 21), (22, 23, 24)]
    edges = it.chain.from_iterable(
        pairwise(path) for path in two_connected + bridges)
    G = nx.Graph(edges)

    expected = fset([
        {1, 2, 3, 4}, {5}, {8, 9, 10}, {11, 12, 13},
        {20}, {21}, {22}, {23}, {24},
    ])
    found = fset(bridge_components(G))
    assert_equal(found, expected)

    _check_edge_connectivity(G)


def test_undirected_aux_graph():
    """Check k-edge-subgraphs of a small undirected graph for k = 1..4.

    The graph is similar to the one in
    http://journals.plos.org/plosone/article?id=10.1371/journal.pone.0136264
    """
    a, b, c, d, e, f, g, h, i = 'abcdefghi'
    paths = [
        (a, d, b, f, c),
        (a, e, b),
        (a, e, b, c, g, b, a),
        (c, b),
        (f, g, f),
        (h, i)
    ]
    edges = it.chain.from_iterable(pairwise(path) for path in paths)
    G = nx.Graph(edges)
    aux_graph = EdgeComponentAuxGraph.construct(G)

    # Expected node groupings, keyed by k.
    expected_by_k = [
        (1, [{a, b, c, d, e, f, g}, {h, i}]),
        (2, [{a, b, c, d, e, f, g}, {h}, {i}]),
        (3, [{a}, {b, c, f, g}, {d}, {e}, {h}, {i}]),
        (4, [{a}, {b}, {c}, {d}, {e}, {f}, {g}, {h}, {i}]),
    ]
    for k, groups in expected_by_k:
        found = fset(aux_graph.k_edge_subgraphs(k=k))
        assert_equal(fset(groups), found)

        # For k=1 (connected components) and k=2 (bridge components) the
        # top-level function must agree with the auxiliary graph.
        if k in (1, 2):
            assert_equal(fset(nx.k_edge_subgraphs(G, k=k)), found)

    _check_edge_connectivity(G)


def test_local_subgraph_difference():
    """k-edge-components and k-edge-subgraphs can differ for k=3.

    Two 4-cliques are joined node-for-node through intermediate nodes
    101..104.
    """
    paths = [
        (11, 12, 13, 14, 11, 13, 14, 12),  # first 4-clique
        (21, 22, 23, 24, 21, 23, 24, 22),  # second 4-clique
        # paths connecting each node of the 4 cliques
        (11, 101, 21),
        (12, 102, 22),
        (13, 103, 23),
        (14, 104, 24),
    ]
    edges = it.chain.from_iterable(pairwise(path) for path in paths)
    G = nx.Graph(edges)
    aux_graph = EdgeComponentAuxGraph.construct(G)

    # As k-edge-subgraphs, each clique is returned separately ...
    found_subgraphs = fset(aux_graph.k_edge_subgraphs(3))
    expected_subgraphs = fset([{101}, {102}, {103}, {104},
                               {21, 22, 23, 24}, {11, 12, 13, 14}])
    assert_equal(found_subgraphs, expected_subgraphs)

    # ... but as k-edge-components they are returned together, because
    # they are locally 3-edge-connected.
    found_local = fset(aux_graph.k_edge_components(3))
    expected_local = fset([{101}, {102}, {103}, {104},
                           {11, 12, 13, 14, 21, 22, 23, 24}])
    assert_equal(found_local, expected_local)


def test_local_subgraph_difference_directed():
    """In a digraph, components and subgraphs can already differ at k=2."""
    dipaths = [
        (1, 2, 3, 4, 1),
        (1, 3, 1),
    ]
    edges = it.chain.from_iterable(pairwise(path) for path in dipaths)
    G = nx.DiGraph(edges)

    # Unlike undirected graphs, when k=2, for directed graphs there is a
    # case where the k-edge-ccs are not the same as the k-edge-subgraphs.
    # (In undirected graphs ccs and subgraphs are the same when k=2.)
    comparisons = (
        (1, assert_equal),
        (2, assert_not_equal),
        (3, assert_equal),
    )
    for k, compare in comparisons:
        compare(
            fset(nx.k_edge_components(G, k=k)),
            fset(nx.k_edge_subgraphs(G, k=k))
        )

    _check_edge_connectivity(G)


def test_triangles():
    """Two triangles joined by one edge: ccs and subgraphs always agree."""
    paths = [
        (11, 12, 13, 11),  # first 3-clique
        (21, 22, 23, 21),  # second 3-clique
        (11, 21),  # connected by an edge
    ]
    edges = it.chain.from_iterable(pairwise(path) for path in paths)
    G = nx.Graph(edges)

    # Subgraphs and ccs are the same for every k tested here.
    for k in (1, 2, 3):
        assert_equal(
            fset(nx.k_edge_components(G, k=k)),
            fset(nx.k_edge_subgraphs(G, k=k))
        )

    _check_edge_connectivity(G)


def test_four_clique():
    """Two 4-cliques that are 3-connected in G but not as subgraphs."""
    paths = [
        (11, 12, 13, 14, 11, 13, 14, 12),  # first 4-clique
        (21, 22, 23, 24, 21, 23, 24, 22),  # second 4-clique
        # paths connecting the 4 cliques such that they are
        # 3-connected in G, but not in the subgraph.
        # Case where the nodes bridging them do not have degree less than 3.
        (100, 13),
        (12, 100, 22),
        (13, 200, 23),
        (14, 300, 24),
    ]
    edges = it.chain.from_iterable(pairwise(path) for path in paths)
    G = nx.Graph(edges)

    # For k=3 the components and the subgraphs differ.
    components = fset(nx.k_edge_components(G, k=3))
    subgraphs = fset(nx.k_edge_subgraphs(G, k=3))
    assert_not_equal(components, subgraphs)

    # Both cliques (plus node 100) share a single component ...
    first_clique = frozenset(paths[0])
    second_clique = frozenset(paths[1])
    assert_in(first_clique | second_clique | {100}, components)

    # ... yet each clique is a subgraph of its own.
    assert_in(first_clique, subgraphs)
    assert_in(second_clique, subgraphs)

    assert_equal(G.degree(100), 3)

    _check_edge_connectivity(G)


def test_five_clique():
    """Two 5-cliques joined through auxiliary nodes 100 and 200.

    The graph can be disconnected by removing fewer than 4 edges, although
    no node has degree less than 4.
    """
    G = nx.disjoint_union(nx.complete_graph(5), nx.complete_graph(5))
    aux_paths = [
        # add aux-connections
        (1, 100, 6), (2, 100, 7), (3, 200, 8), (4, 200, 100),
    ]
    G.add_edges_from(
        it.chain.from_iterable(pairwise(path) for path in aux_paths))
    assert_equal(min(dict(nx.degree(G)).values()), 4)

    comparisons = (
        # For k=3 they are the same.
        (3, assert_equal),
        # For k=4 they are different: the aux nodes are in the same CC as
        # clique 1 but not in the same subgraph.
        (4, assert_not_equal),
        # For k=5 they are not the same.
        (5, assert_not_equal),
        # For k=6 they are the same.
        (6, assert_equal),
    )
    for k, compare in comparisons:
        compare(
            fset(nx.k_edge_components(G, k=k)),
            fset(nx.k_edge_subgraphs(G, k=k))
        )

    _check_edge_connectivity(G)


# ----------------
# Directed tests
# ----------------

def test_directed_aux_graph():
    """Check k-edge-subgraphs of a small directed graph for k = 1..3.

    The graph is similar to the one in
    http://journals.plos.org/plosone/article?id=10.1371/journal.pone.0136264
    """
    a, b, c, d, e, f, g, h, i = 'abcdefghi'
    dipaths = [
        (a, d, b, f, c),
        (a, e, b),
        (a, e, b, c, g, b, a),
        (c, b),
        (f, g, f),
        (h, i)
    ]
    edges = it.chain.from_iterable(pairwise(path) for path in dipaths)
    G = nx.DiGraph(edges)
    aux_graph = EdgeComponentAuxGraph.construct(G)

    # Expected node groupings, keyed by k.
    expected_by_k = [
        (1, [{a, b, c, d, e, f, g}, {h}, {i}]),
        (2, [{i}, {e}, {d}, {b, c, f, g}, {h}, {a}]),
        (3, [{a}, {b}, {c}, {d}, {e}, {f}, {g}, {h}, {i}]),
    ]
    for k, groups in expected_by_k:
        found = fset(aux_graph.k_edge_subgraphs(k=k))
        assert_equal(fset(groups), found)

        # In the directed case k=1 must agree with the SCCs.
        if k == 1:
            assert_equal(fset(nx.strongly_connected_components(G)), found)


def test_random_gnp_directed():
    """Run the consistency check on a seeded directed G(n, p) graph."""
    # Full seed list; only one is currently exercised:
    # seeds = [3894723670, 500186844, 267231174, 2181982262, 1116750056]
    for seed in (2181982262,):
        digraph = nx.gnp_random_graph(20, 0.2, directed=True, seed=seed)
        _check_edge_connectivity(digraph)


def test_configuration_directed():
    """Run the consistency check on a directed configuration-model graph."""
    # Full seed list; only one is currently exercised:
    # seeds = [671221681, 2403749451, 124433910, 672335939, 1193127215]
    for seed in (672335939,):
        degrees = nx.random_powerlaw_tree_sequence(20, seed=seed, tries=5000)
        model = nx.configuration_model(degrees, seed=seed)
        # Convert to a plain DiGraph and drop self-loops before checking.
        G = nx.DiGraph(model)
        G.remove_edges_from(nx.selfloop_edges(G))
        _check_edge_connectivity(G)


def test_shell_directed():
    """Run the consistency check on a random shell graph made directed."""
    # Full seed list; only the first is currently exercised:
    # seeds = [3134027055, 4079264063, 1350769518, 1405643020, 530038094]
    shells = [(12, 70, 0.8), (15, 40, 0.6)]
    for seed in (3134027055,):
        undirected = nx.random_shell_graph(shells, seed=seed)
        _check_edge_connectivity(undirected.to_directed())


def test_karate_directed():
    """Run the consistency check on the karate club graph made directed."""
    directed_karate = nx.karate_club_graph().to_directed()
    _check_edge_connectivity(directed_karate)