# Source code for gerrychain.constraints.contiguity

```
from heapq import heappop, heappush
from itertools import count
import networkx as nx
from ..random import random
from .bounds import SelfConfiguringLowerBound
def are_reachable(G, source, avoid, targets):
"""Check that source can reach targets while avoiding given edges.
This function is a modified form of NetworkX's ``_dijkstra_multisource()``.
:param G: NetworkX graph.
:param source: Starting node.
:param weight: Function with (u, v, data) input that returns that edges weight.
:param targets: Nodes required to find.
:return:
A mapping from node to shortest distance to that node from one
of the source nodes.
:rtype: dict
"""
G_succ = G._succ if G.is_directed() else G._adj
push = heappush
pop = heappop
dist = {} # dictionary of final distances
seen = {}
# fringe is heapq with 3-tuples (distance,c,node)
# use the count c to avoid comparing nodes (may not be able to)
c = count()
fringe = []
seen[source] = 0
push(fringe, (0, next(c), source))
while not all(t in seen for t in targets) and fringe:
(d, _, v) = pop(fringe)
if v in dist:
continue # already searched this node.
dist[v] = d
for u, e in G_succ[v].items():
if avoid(v, u, e):
continue
vu_dist = dist[v] + 1
if u not in seen or vu_dist < seen[u]:
seen[u] = vu_dist
push(fringe, (vu_dist, next(c), u))
return all(t in seen for t in targets)
[docs]def single_flip_contiguous(partition):
"""Check if swapping the given node from its old assignment disconnects the
old assignment class.
:param partition: The proposed next :class:`~gerrychain.partition.Partition`
:return: whether the partition is contiguous
:rtype: bool
We assume that `removed_node` belonged to an assignment class that formed a
connected subgraph. To see if its removal left the subgraph connected, we
check that the neighbors of the removed node are still connected through
the changed graph.
"""
parent = partition.parent
flips = partition.flips
if not flips or not parent:
return contiguous(partition)
graph = partition.graph
assignment = partition.assignment
def partition_edge_avoid(start_node, end_node, edge_attrs):
"""Compute the district edge weight, which is 1 if the nodes have the same
assignment, and infinity otherwise.
"""
if assignment[start_node] != assignment[end_node]:
# Fun fact: networkx actually refuses to take edges with None
# weight.
return True
return False
for changed_node in flips:
old_assignment = partition.parent.assignment[changed_node]
old_neighbors = [
node
for node in graph.neighbors(changed_node)
if assignment[node] == old_assignment
]
if not old_neighbors:
# Under our assumptions, if there are no old neighbors, then the
# old_assignment district has vanished. It is trivially connected.
# However, we actually don't want any districts to disappear because
# it ends up breaking a lot of our other updaters. So we consider
# the empty district to be disconnected.
return False
start_neighbor = random.choice(old_neighbors)
connected = are_reachable(
graph, start_neighbor, partition_edge_avoid, old_neighbors
)
if not connected:
return False
# All neighbors of all changed nodes are connected, so the new graph is
# connected.
return True
def affected_parts(partition):
"""Returns the set of IDs of all parts that gained or lost a node during the last
flip.
"""
flips = partition.flips
parent = partition.parent
if flips is None:
return partition.parts
affected = set()
for node, part in flips.items():
affected.add(part)
affected.add(parent.assignment[node])
return affected
[docs]def contiguous(partition):
"""Check if the parts of a partition are connected using :func:`networkx.is_connected`.
:param partition: The proposed next :class:`~gerrychain.partition.Partition`
:return: whether the partition is contiguous
:rtype: bool
"""
return all(
nx.is_connected(partition.subgraphs[part]) for part in affected_parts(partition)
)
[docs]def contiguous_bfs(partition):
"""Checks that a given partition's parts are connected as graphs using a simple
breadth-first search.
:param partition: Instance of Partition
:return: Whether the parts of this partition are connected
:rtype: bool
"""
parts_to_check = affected_parts(partition)
# Generates a subgraph for each district and perform a BFS on it
# to check connectedness.
for part in parts_to_check:
adj = nx.to_dict_of_lists(partition.subgraphs[part])
if _bfs(adj) is False:
return False
return True
def number_of_contiguous_parts(partition):
"""Return the number of non-connected assignment subgraphs.
:param partition: Instance of Partition; contains connected components.
:return: number of contiguous districts
:rtype: int
"""
parts = partition.assignment.parts
return sum(1 for part in parts if nx.is_connected(partition.subgraphs[part]))
def contiguous_components(partition):
"""Return then connected components of each of the subgraphs of the parts
of the partition.
:param partition: Instance of Partition; contains connected components.
:return: dictionary mapping each part ID to a list holding the connected
subgraphs of that part of the partition
:rtype: dict
"""
return {
part: [subgraph.subgraph(nodes) for nodes in nx.connected_components(subgraph)]
for part, subgraph in partition.subgraphs.items()
}
no_more_discontiguous = SelfConfiguringLowerBound(number_of_contiguous_parts)
def _bfs(graph):
"""Performs a breadth-first search on the provided graph and returns true or
false depending on whether the graph is connected.
:param graph: Dict-of-lists; an adjacency matrix.
:return: is this graph connected?
:rtype: bool
"""
q = [next(iter(graph))]
visited = set()
total_vertices = len(graph)
# Check if the district has a single vertex. If it does, then simply return
# `True`, as it's trivially connected.
if total_vertices <= 1:
return True
# bfs!
while len(q) > 0:
current = q.pop(0)
neighbors = graph[current]
for neighbor in neighbors:
if neighbor not in visited:
visited.add(neighbor)
q += [neighbor]
return total_vertices == len(visited)
```