Source code for abm.pops
# -*- coding: utf-8 -*-
"""
abm.pops
~~~~~~~~
Environments of interconnected agents
"""
from abm.entities import Task
from cached_property import cached_property
import numpy as np
[docs]class Environment(object):
def __init__(self, debug=True, path_cutoff=20):
self.path = []
self.show = True
self.debug = debug
self.path_cutoff = path_cutoff
[docs] def log(self, msg):
"""Prints a message to stdout, only if self.debug=True."""
if self.debug:
print(msg)
def _distribute_awards(self, task):
awarded = set()
# the last entry in the path is the stopping point, which made no decisions
for point in self.path[:-1]:
if point in awarded:
continue
awarded.add(point)
amount = self._calculate_award(task, self.path, point)
self.population[point].award(amount)
if hasattr(self, 'flush_updates'):
self.flush_updates()
def _calculate_award(self, task, path, entity):
"""
Returns the amount awarded to <entity> after <task> has been
routed through <path>.
"""
if len(self.path) >= self.path_cutoff:
# detect failure
return -1. / self.path_cutoff
# in a one traversal case, the one traversal gets all the credit
k = float(len(path) - 1)
return (task.value / k)
[docs] def initiate_task(self, fixed_pair=None):
"""Initializes a task, and calls pass_message()."""
start, end = [-1, -1] if not fixed_pair else fixed_pair
if fixed_pair and start == end:
self.log("changing fixed pair: they're the same!")
while start == end:
[start, end] = self._pick_start_end()
self.log('new task created')
self.log('starting at %s and aiming for %s' % (start, end))
task = self._generate_task(end)
self.path = []
self._run_task(start, task)
def _generate_task(self, target):
return Task(target)
def _run_task(self, start, task):
self.path.append(start)
# take first step
recipient = self.population[start].next(task, sender=None)
sender = start
self.path.append(recipient)
while not (recipient == task.target or len(self.path) >= self.path_cutoff):
next_recipient = self.population[recipient].next(task, sender=sender)
sender = recipient
recipient = next_recipient
self.path.append(recipient)
self._distribute_awards(task)
def _pick_start_end(self):
return np.random.randint(self.size, size=2).tolist()
[docs] def clear(self):
"""
Clear every Entity's history. Called when the package is stuck
in a cycle.
"""
point_iterator = (self.population
if isinstance(self.population, list)
else self.population.values())
for point in point_iterator:
point.sent = []
[docs]class TaskFeatureMixin(object):
"""Overrides _generate_task with a feature-defining version"""
bias = False
node_index_indicator = False
@cached_property
def _attribute_categories(self):
return [(k, v.keys()) for k, v in self.attributes.items()]
def _generate_task(self, target):
"""
Convert categorical data about the target into an indicator vector
Return a task with this vector accessible as task.features
"""
target_node = self.population[target]
feature_vec_components = [np.ones(1)] if self.bias else []
for attribute, categories in self._attribute_categories:
component = np.zeros(len(categories))
target_val = getattr(target_node, attribute)
component[categories.index(target_val)] = 1
feature_vec_components.append(component)
if self.node_index_indicator:
component = np.zeros(len(self.population))
component[target] = 1
feature_vec_components.append(component)
feature_vec = np.hstack(feature_vec_components)
task = Task(target, features=feature_vec)
return task