# -*- coding: utf-8 -*-
"""
abm.xypops
~~~~~~~~~~
Environments not backed by networkx whose x, y traits are used in visualization
"""
from scipy.stats.distributions import norm
from scipy.stats.distributions import uniform
from sklearn.metrics.pairwise import euclidean_distances
from abm.viz import display_network
from abm.pops import Environment
from abm.entities import XyEntity
import numpy as np
from random import choice
Y_DIST = norm(300, 10)
CLUSTER_X_DIST_MAP = {
'A': uniform(0, 50),
'B': uniform(30, 50),
'C': uniform(60, 50)
}
CLUSTER_SIZES = {
'A': 8,
'B': 10,
'C': 8
}
[docs]def make_points(cluster, size, y_dist, x_dist):
"""Creates a set of points using y_dist and x_dist to draw the location."""
ys = y_dist.rvs(size)
xs = x_dist.rvs(size)
return list(zip(xs, ys, [cluster] * size))
[docs]class XyEnvironment(Environment):
"""
A set of connected Entities. Handles message passing and displaying.
Entities are connected randomly.
"""
def __init__(self, y_pos_dist=Y_DIST, cluster_x_dists=CLUSTER_X_DIST_MAP,
cluster_sizes=CLUSTER_SIZES, single_component=True,
entity_class=XyEntity, **kwargs):
super(XyEnvironment, self).__init__(**kwargs)
self.population = []
self.connectivity_matrix = None
self.connected_components = []
self.node_component_map = {}
self.entity_class = entity_class
self._set_entities(y_pos_dist, cluster_x_dists, cluster_sizes)
self._set_connectivity_matrix()
self._set_connections()
if single_component:
self._ensure_single_component()
def _set_entities(self, y_pos_dist, cluster_x_dists, cluster_sizes):
point_args = []
for cluster, size in cluster_sizes.iteritems():
point_args += make_points(cluster, size,
y_pos_dist, cluster_x_dists[cluster])
for ix, (x, y, cluster) in enumerate(point_args):
pt = self.entity_class(environment=self, index=ix, x=x, y=y, cluster=cluster)
self.population.append(pt)
self.size = len(self.population)
def _set_connections(self, track_components=True):
"""Initializes each Entity's adjacency list.
:param track_components: Flag for tracking connected components during graph construction
"""
for index, point in enumerate(self.population):
# make set of connections to indices; np.where returns a tuple
adjacencies = set(np.where(self.connectivity_matrix[index] > 0)[0])
adjacencies.discard(index)
# pass adjacency information down to agent
point.set_adjacencies(adjacencies)
if track_components:
# track connected components as we construct edges
if index in self.node_component_map:
component = self.node_component_map[index]
else:
component = set([index])
self.node_component_map[index] = component
self.connected_components.append(component)
# update the component in place with potential new members
component.update(adjacencies)
# update the node - component map so we can fetch this object
# for adjacencies
self.node_component_map.update(
{a: component for a in adjacencies})
# resolve potential component connections
self._resolve_components(component)
n = float(len(self.population))
k = float(np.sum(self.connectivity_matrix)) / 2
self.edge_density = k / (n * (n - 1) / 2)
def _ensure_single_component(self):
"""
Iterate through disjoint component list, adding connections between sequential components
Update other datastructures to reflect the new connections
"""
for ix, component in enumerate(self.connected_components[:-1]):
start, end = (choice(list(component)), choice(
list(self.connected_components[ix + 1])))
self.population[start].adjacencies.append(end)
self.population[end].adjacencies.append(start)
self.connectivity_matrix[start][end] = True
self.connectivity_matrix[end][start] = True
self.connected_components[ix].add(end)
self.connected_components[ix + 1].add(start)
self._resolve_components(self.connected_components[0])
def _resolve_components(self, component):
"""
Find components thought to be separate that now have intersections
Condense these and set self.connected_components to be a list of disjoint sets
"""
resolved_components = [component]
for other_component in self.connected_components:
if other_component.intersection(component) or other_component is component:
component.update(other_component)
self.node_component_map.update(
{a: component for a in other_component})
else:
resolved_components.append(other_component)
self.connected_components = resolved_components
def _set_connectivity_matrix(self):
"""
Computes the connectivity matrix of this Environment. Each point is
connected to each other within a radius.
"""
if self.connectivity_matrix is not None:
return
# generate a random symmetric matrix
point_count = len(self.population)
matrix = np.random.randint(
0, 2, point_count ** 2).reshape(point_count, point_count)
matrix = (matrix + matrix.T) / 2
for i in range(point_count):
matrix[i][i] = 0
self.connectivity_matrix = matrix
[docs] def display(self, current=None, target=None):
"""
Plots the state of the task. If <show> = False, doesn't plot
anything and the simulation can run faster.
"""
if not self.show:
return
display_network(self.population, self.connectivity_matrix,
current=current, target=target)
[docs]class CappedPreferentialEnvironment(XyEnvironment):
"""
A set of connected Entities. Handles message passing and displaying. Connections are laid
out such that entities of the same cluster are more likely to be tied together,
proportionally to a parameter alpha. The overall density of the network is controlled
by a parameter beta.
"""
def __init__(self, alpha=0.8, beta=0.4, *args, **kwargs):
self.alpha = alpha
self.beta = beta
super(CappedPreferentialEnvironment, self).__init__(*args, **kwargs)
def _set_connectivity_matrix(self):
"""
Computes the connectivity matrix of this Environment. Each point is
connected to each other within a radius.
"""
if self.connectivity_matrix is not None:
return
def decide_connection(point1, point2):
# A point is connected to another point of its same cluster
# with high probability proportional to alpha, and to
# another point of a different clluester with probability
# proportional to 1 - alpha.
# Moreover, the edge density of a network is capped at a value
# beta. That's why we choose a 0 with probability 1-beta,
# and partition beta into alpha and 1-alpha.
alpha = self.alpha
beta = self.beta
if point1.cluster == point2.cluster:
tie = np.random.choice(
[0, 0, 1], p=[1 - beta, beta * (1 - alpha), beta * alpha])
else:
tie = np.random.choice(
[0, 0, 1], p=[1 - beta, beta * alpha, beta * (1 - alpha)])
return tie
matrix = np.array([[0] * len(self.population)
for _ in range(len(self.population))])
# since the graph is undirected, the matrix is symmetric,
# which in turn means we need only compute the lower triangular
# elements and then copy them into the upper triangular elements
for i, point1 in enumerate(self.population):
for j, point2 in enumerate(self.population[:i]):
matrix[i][j] = decide_connection(point1, point2)
matrix[j][i] = matrix[i][j]
self.connectivity_matrix = matrix
[docs]class NearestNeighborsEnvironment(XyEnvironment):
"""
A set of connected Entities. Handles message passing and displaying. Connections laid
out geographically: each point is connected to some of its nearest neighbors.
"""
def _set_connectivity_matrix(self):
"""
Computes the connectivity matrix of this Environment. Each point is
connected to each other within a radius.
"""
if self.connectivity_matrix is not None:
return
points_arr = np.array([[p.x, p.y] for p in self.population])
distance_mat = euclidean_distances(points_arr, points_arr)
# Every point p will be connected to each other point whose distance
# to p is less than a cut-off value. This value is computed as the
# mean of {min_nonzero(dist_mat(p)) | p is a point}, times a factor
def min_nonzero(r):
return min(r[r > 0])
# apply_along_axis(f, axis=1, arr) applies f to each row
min_neighbor_distances = np.apply_along_axis(
min_nonzero, axis=1, arr=distance_mat)
factor = 2.2
neighbor_cutoff = np.mean(min_neighbor_distances) * factor
connectivity_matrix = distance_mat < neighbor_cutoff
self.connectivity_matrix = connectivity_matrix