Source code for abm.analysis

# -*- coding: utf-8 -*-
"""
    abm.analysis
    ~~~~~~~~~~~~

    Some functions for training/analyzing networks
"""
import operator
from abm import learners
import random
import networkx as nx
import pandas as pd
import numpy as np
import seaborn as sns


def get_shortest_path_likelihood(env, start, end, paths=None):
    """ Return the probability of following any of the shortest-length paths
        from start to end, together with a path length.

        :param env: environment exposing ``graph``, ``population`` and
            ``_generate_task``
        :param start: label of the node the walk starts from
        :param end: label of the target node
        :param paths: optional pre-computed paths (lists of node labels);
            when missing or empty they are taken from
            ``nx.all_shortest_paths(env.graph, start, end)``
        :return: ``(summed probability over all paths, len() of the last path
            examined)``; the length counts nodes, not hops
        :rtype: tuple
    """
    candidate_paths = paths or nx.all_shortest_paths(env.graph, start, end)
    probabilities = []
    for path in candidate_paths:
        task = env._generate_task(end)
        current = env.population[start]
        # log-probability of every hop along this particular path
        step_log_probs = []
        for next_ix in path[1:]:
            try:
                choice_probs = learners._exp_over_sumexp(
                    task.features, current.w_container
                )
                step_log_probs.append(np.log(choice_probs[next_ix]))
            except AttributeError:
                # presumably a node without learnt weights -- TODO confirm;
                # it is treated as picking uniformly among its adjacencies
                uniform = 1. / len(current.adjacencies)
                step_log_probs.append(np.log(uniform))
            current = env.population[next_ix]
        probabilities.append(np.exp(sum(step_log_probs)))
    # `path` is the last path produced by the loop above
    return sum(probabilities), len(path)
def get_dyad_data(env, dyads):
    """ Create a dictionary of per-dyad information used in performance
        monitoring.

        :param env: environment exposing ``graph`` (and whatever
            :func:`get_attrs` reads)
        :param dyads: iterable of hashable ``(start, end)`` node-label pairs
        :return: ``{dyad: record}`` where each record holds the start node's
            attributes under ``start_*`` keys, the end node's under ``end_*``
            keys, ``shortest_paths`` (every shortest path as a list of nodes)
            and ``shortest_path_length`` (``len()`` of the first such path,
            i.e. a node count)
        :rtype: dict
    """
    per_dyad = {}
    for dyad in dyads:
        record = {}
        for prefix, member in (('start_', dyad[0]), ('end_', dyad[1])):
            for attr, value in get_attrs(env, member).items():
                record[prefix + attr] = value
        routes = list(nx.all_shortest_paths(env.graph, *dyad))
        record['shortest_paths'] = routes
        record['shortest_path_length'] = len(routes[0])
        per_dyad[dyad] = record
    return per_dyad
def get_env_likelihood_samples(env, as_df=True, n_tasks=36000, sample_each=400, n_dyads=1000):
    """ Run tasks on env and periodically sample shortest-path likelihoods
        for a fixed set of dyads.

        :param env: environment to run; its ``debug`` and ``show`` attributes
            are set to False before the tasks start
        :param as_df: when true return one concatenated DataFrame with a
            ``time`` column, otherwise a list with one list of
            :func:`get_shortest_path_likelihood` results per sample
        :param n_tasks: number of ``env.initiate_task()`` calls to make
        :param sample_each: a sample is taken whenever the task counter is a
            multiple of this value (the task for that step runs first)
        :param n_dyads: number of dyads requested from :func:`get_dyads`
    """
    dyads = get_dyads(env, target_len=n_dyads)
    dyad_data = get_dyad_data(env, dyads)
    snapshots = []
    env.debug = False
    env.show = False
    for step in range(n_tasks):
        env.initiate_task()
        if step % sample_each != 0:
            continue
        if as_df:
            frame = path_likelihood_with_dyad_traits(env, dyads, dyad_data)
            frame['time'] = step
            snapshots.append(frame)
        else:
            snapshots.append(
                [get_shortest_path_likelihood(env, *dyad) for dyad in dyads]
            )
    return pd.concat(snapshots) if as_df else snapshots
def get_attrs(pop, ix):
    """ Look up the features of node ix in pop.

        :param pop: an Environment (must expose ``population`` and
            ``attributes``)
        :param ix: a node label
        :return: key-value pairs of the node's features
        :rtype: dict

        :Example:

        >>> from abm import analysis, nxpops, io
        >>> cfg = io.ConfigReader('../setup.json').get_config()
        >>> pop = nxpops.SoftmaxNxEnvironment(**cfg)
        >>> analysis.get_attrs(pop, 3)
        {u'color': u'blue', u'region': u'east'}

        .. note:: the keys in the returned dict are read from pop.attributes
        .. seealso:: :func:`get_dyad_data`
        .. warning:: ix must be a valid node index
    """
    features = {}
    for name in pop.attributes:
        features[name] = pop.population[ix][name]
    return features
def path_likelihood_with_dyad_traits(env, dyads, dyad_data):
    """ Build one row of learning statistics per dyad, merged with the dyad's
        pre-computed traits.

        :param env: environment passed on to
            :func:`get_shortest_path_likelihood` and
            :func:`learnt_over_shortest_path_len`
        :param dyads: iterable of ``(start, end)`` node-label pairs
        :param dyad_data: ``{dyad: record}`` as produced by
            :func:`get_dyad_data`; each record must hold ``shortest_paths``
            and ``shortest_path_length``
        :return: DataFrame with columns ``li`` (shortest-path likelihood),
            ``plen`` (path length as returned by
            :func:`get_shortest_path_likelihood`), ``learnt_over_best`` and
            every key of the dyad's record
        :rtype: pandas.DataFrame
    """
    rows = []
    for dyad in dyads:
        traits = dyad_data[dyad]
        li, plen = get_shortest_path_likelihood(
            env, *dyad, paths=traits['shortest_paths']
        )
        # get_dyad_data stores len() of a node list, i.e. hops + 1, whereas
        # learnt_over_shortest_path_len counts hops (its own default is
        # nx.shortest_path_length, an edge count). Convert before dividing so
        # a perfectly learnt route scores 1.0 rather than hops / (hops + 1).
        shortest_hops = traits['shortest_path_length'] - 1
        if shortest_hops > 0:
            learnt_over_best = learnt_over_shortest_path_len(
                env, *dyad, shortest_len=shortest_hops
            )
        else:
            # degenerate dyad (single-node path): there is nothing to walk,
            # keep the 0.0 the ratio evaluated to before this conversion
            learnt_over_best = 0.0
        row = {'li': li, 'plen': plen, 'learnt_over_best': learnt_over_best}
        row.update(traits)
        rows.append(row)
    return pd.DataFrame(rows)
def get_dyads(env, target_len=1000):
    """ Collect start/end pairs that are more than one hop apart.

        Pairs are drawn with ``env._pick_start_end()``; pairs without a
        connecting path, or whose shortest-path length is 1 or less, are
        discarded. Drawing continues until ``target_len`` pairs have been
        kept, so the call does not return if no qualifying pair is ever
        drawn. Duplicate pairs are not filtered out.

        :param env: environment exposing ``graph`` and ``_pick_start_end``
        :param target_len: number of dyads to return
        :return: list of ``(start, end)`` tuples
        :rtype: list
    """
    collected = []
    while len(collected) < target_len:
        endpoints = env._pick_start_end()
        try:
            hops = nx.shortest_path_length(env.graph, *endpoints)
        except nx.NetworkXNoPath:
            continue
        if hops <= 1:
            continue
        collected.append(tuple(endpoints))
    return collected
def learnt_over_shortest_path_len(env, start, end, shortest_len=None, max_steps=40):
    """ Return the ratio of learnt likeliest path to shortest path len.

        Starting at ``start``, repeatedly move to the neighbour with the
        highest softmax score until the node whose ``index`` equals ``end``
        is reached or ``max_steps`` hops have been taken.

        :param env: environment exposing ``graph``, ``population`` and
            ``_generate_task``
        :param start: label of the node the walk starts from
        :param end: label of the target node
        :param shortest_len: shortest path length in hops; when missing (or
            0) it is computed with ``nx.shortest_path_length``
        :param max_steps: upper bound on the number of hops walked, so a
            policy that never reaches ``end`` still terminates
        :return: hops walked divided by ``shortest_len``
        :rtype: float
    """
    shortest_len = shortest_len or nx.shortest_path_length(env.graph, start, end)
    learnt_len = 0
    task = env._generate_task(end)
    node = env.population[start]
    while node.index != end and learnt_len < max_steps:
        learnt_len += 1
        try:
            softmaxes = learners._exp_over_sumexp(task.features, node.w_container)
            # .items() instead of .iteritems(): the latter is missing on
            # Python 3 dicts and pandas >= 2.0 Series, and the resulting
            # AttributeError would be swallowed by the handler below,
            # silently turning the greedy walk into a random one.
            # assumes softmaxes is mapping-like (dict / Series) -- TODO confirm
            argmax = max(softmaxes.items(), key=operator.itemgetter(1))[0]
        except AttributeError:
            # presumably a node without learnt weights -- TODO confirm;
            # it moves to a random adjacent node
            argmax = random.choice(node.adjacencies)
        node = env.population[argmax]
    return learnt_len / float(shortest_len)
def _group_sample_by_time(samples, key='li'):
    """ Arrange one column of a samples frame as a (time, observation) array.

        :param samples: DataFrame with a ``time`` column
        :param key: name of the column to extract
        :return: array built from one list of ``key`` values per distinct
            ``time`` value, in the order groupby yields the groups
        :rtype: numpy.ndarray
    """
    # one list of values per time group, then a single ndarray out of them
    per_time = [chunk[key].tolist() for _, chunk in samples.groupby('time')]
    return np.array(per_time)
def segment_learning_df(df, prefixes=('start_', 'end_')):
    """ Separates dyad learning stats df based on whether dyad members have
        matching / nonmatching / partially matching attributes.

        An attribute is any column suffix that appears under every prefix
        (e.g. ``start_color`` / ``end_color`` -> ``color``). For each row an
        attribute "matches" when the columns of all prefixes hold equal
        values. The three segments are disjoint and together cover every row,
        for any number of shared attributes.

        :param df: dyad learning stats DataFrame
        :param prefixes: specify the column prefixes for dyad attributes
        :return: ``{segment_name: dataframe slice}`` with keys
            ``full_mismatch`` (no attribute matches), ``full_match`` (every
            attribute matches) and ``some_mismatch`` (anything in between)
        :rtype: dict
        :raises ValueError: if no attribute is shared by all prefixes
    """
    prefixes = list(prefixes)

    # attribute names available under every one of the prefixes
    shared_attrs = None
    for prefix in prefixes:
        names = set(c[len(prefix):] for c in df.columns if c.startswith(prefix))
        shared_attrs = names if shared_attrs is None else shared_attrs & names
    if not shared_attrs:
        raise ValueError(
            'no attribute columns shared by prefixes %r' % (prefixes,)
        )

    # Accumulate "all attributes match" / "at least one matches" row masks.
    # Plain ndarrays are used so that a repeated index on df (e.g. after
    # pd.concat of several samples) cannot interfere with alignment.
    n_rows = len(df)
    all_match = np.ones(n_rows, dtype=bool)
    any_match = np.zeros(n_rows, dtype=bool)
    reference = prefixes[0]
    for attr in sorted(shared_attrs):
        matched = np.ones(n_rows, dtype=bool)
        for other in prefixes[1:]:
            matched &= np.asarray(
                df[reference + attr] == df[other + attr], dtype=bool
            )
        all_match &= matched
        any_match |= matched

    return dict(
        full_mismatch=df[~any_match],
        full_match=df[all_match],
        some_mismatch=df[any_match & ~all_match],
    )
def plot_segment_stats(segments, segment_keys=['full_mismatch', 'full_match', 'some_mismatch'], colors=['red', 'blue', 'green'], measure_key='li'):
    """ Draw one ``sns.tsplot`` series per requested segment.

        :param segments: ``{segment_name: DataFrame}``; each frame needs a
            ``time`` column and the ``measure_key`` column
        :param segment_keys: names of the segments to draw
        :param colors: colours paired with ``segment_keys`` by position;
            pairing stops at the shorter of the two sequences
        :param measure_key: column whose values are plotted
    """
    for name, line_color in zip(segment_keys, colors):
        segment_frame = segments[name]
        by_time = _group_sample_by_time(segment_frame, key=measure_key)
        # transpose the grouped array before handing it to tsplot
        sns.tsplot(data=by_time.T, color=line_color)
def plot_learning_df(df, key='li', **kwargs):
    """ Segment a dyad learning stats frame and plot the selected segments.

        :param df: frame accepted by :func:`segment_learning_df`
        :param key: column to plot, forwarded as ``measure_key``
        :param kwargs: segment names mapped to flags; only names with a
            truthy flag are plotted, so calling without keyword arguments
            plots nothing. Colours are assigned by
            :func:`plot_segment_stats` according to position in the
            resulting list, not according to segment name.
        :return: the dict of segments from :func:`segment_learning_df`
        :rtype: dict
    """
    segments = segment_learning_df(df)
    enabled = [name for name in kwargs if kwargs[name]]
    plot_segment_stats(segments, measure_key=key, segment_keys=enabled)
    return segments