Module ranky.ranking
Expand source code
#######################
### RANKING SYSTEMS ###
#######################
import numpy as np
import pandas as pd
from scipy.stats import rankdata
from scipy.optimize import minimize
from scipy.optimize import differential_evolution
from random import random as _random
from tqdm import tqdm
import itertools as it
import ranky as rk
from .metric import centrality
#####################
##### Functions #####
#####################
# Convert to ranking
def rank(m, axis=0, method='average', ascending=False, reverse=False):
    """ Convert scores to ranks along an axis.

    With the default flags the largest value receives rank 1.
    TODO: save parameters to add values to an already fitted ranking.

    Args:
        m: Score matrix (list, np.ndarray, pd.Series or pd.DataFrame).
        axis: Axis along which `scipy.stats.rankdata` is applied (candidates axis).
        method: Tie-handling method forwarded to `rankdata`:
            'average', 'min', 'max', 'dense' or 'ordinal'.
        ascending: Flips the ordering whenever it differs from `reverse`.
        reverse: Flips the ordering whenever it differs from `ascending`.

    Returns:
        np.ndarray, pd.Series or pd.DataFrame
        Ranks, carrying the pandas labels of `m` when it has any.
    """
    scores = np.array(m) if isinstance(m, list) else m
    higher_is_better = (ascending == reverse)
    if higher_is_better:
        # rankdata ranks in ascending order, so negate to give rank 1 to the largest value
        scores = -scores
    ranks = np.apply_along_axis(rankdata, axis, scores, method=method)
    return process_vote(scores, ranks)
def weigher(r, method='hyperbolic'):
    """ Map a rank to a weight.

    Rank 0 denotes the most important element. The only supported method,
    'hyperbolic', maps rank r to the weight 1/(r+1).

    Args:
        r: Integer value (supposedly a rank) to weight.
        method: Weighing method. Only 'hyperbolic' is implemented.

    Raises:
        Exception: If `method` is not a known weighing method.
    """
    if method != 'hyperbolic':
        raise Exception('Unknown method: {}'.format(method))
    return 1 / (r + 1)
def tie(r, threshold=0.1):
    """ Placeholder for merging close values into ties (not implemented yet).

    TODO: merge close values.

    Args:
        r: Currently unused.
        threshold: Currently unused.

    Returns:
        Always 0.
    """
    return 0
def contains_ties(r):
    """ Return True if r contains tied values.

    The values are compared positionally, so a pd.Series with any index is
    accepted. NaN never ties with NaN (NaN != NaN).

    Args:
        r: 1D array-like of scores or ranks.

    Returns:
        bool: True if at least two entries are equal, False otherwise
        (including for empty or single-element input).
    """
    values = np.asarray(r).ravel()
    if values.size < 2:
        return False
    # After sorting, equal values are adjacent: one O(n log n) pass replaces
    # the all-pairs comparison.
    ordered = np.sort(values)
    return bool(np.any(ordered[1:] == ordered[:-1]))
# remove rows (candidates) or columns (voters)
def bootstrap(m, axis=0, n=None, replace=True, return_holdout=False):
    """ Sample with replacement along an axis (and keep the same shape by default).

    By convention rows represent candidates and columns represent voters.

    Args:
        m: Matrix to resample (anything exposing `.shape` and accepted by `np.take`).
        axis: Axis concerned by bootstrap.
        n: Number of examples to sample. By default it is the size of the matrix along the axis.
        replace: Sample with or without replacement. It is not bootstrap if the sampling is done without replacement.
        return_holdout: If True, returns a tuple (bootstrap, out-of-bag set).

    Returns:
        The resampled matrix, or the tuple (resampled matrix, out-of-bag matrix)
        when `return_holdout` is True. The out-of-bag entries keep their original order.
    """
    size = m.shape[axis]
    if n is None:
        n = size
    idx = np.random.choice(size, n, replace=replace)
    sample = np.take(m, idx, axis=axis)
    if not return_holdout:
        return sample
    # Sorted indices that were never drawn (out-of-bag).
    holdout_idx = np.setdiff1d(np.arange(size), idx)
    holdout = np.take(m, holdout_idx, axis=axis)
    return sample, holdout
def joint_bootstrap(m_list, axis=0, n=None, replace=True):
    """ Resample every matrix of m_list with one shared set of drawn indices.

    The indices are drawn once, from the size of the first matrix along `axis`,
    and then applied to each matrix in turn.
    By convention rows represent candidates and columns represent voters.

    Args:
        m_list: List of matrices to resample jointly.
        axis: Axis concerned by bootstrap.
        n: Number of examples to sample. By default it is the size of the first matrix along the axis.
        replace: Sample with or without replacement. It is not bootstrap if the sampling is done without replacement.

    Returns:
        List of resampled matrices, in the same order as m_list.
    """
    size = m_list[0].shape[axis]
    n_draws = size if n is None else n
    drawn = np.random.choice(size, n_draws, replace=replace)
    resampled = []
    for matrix in m_list:
        resampled.append(np.take(matrix, drawn, axis=axis))
    return resampled
def top_k_method(D, F, k=1, reverse=False):
    """ Pick a winner with the top-k method from development (D) and final (F) scores.

    The k best candidates of D are kept, and the winner is the best of those
    according to F. The returned value is the winner's index label.

    Args:
        D: 1D array-like of scores, representing the development phase (or public leaderboard).
        F: 1D array-like of scores, representing the final phase (or private leaderboard).
        k: number of candidates that access the final phase.
        reverse: if True lower is better (by default higher is better).
    """
    dev_scores = to_series(D)
    final_scores = to_series(F)
    finalists = select_k_best(dev_scores, k=k, reverse=reverse)
    # F is restricted to the finalists by label before the final selection
    return rk.select_best(final_scores[finalists], reverse=reverse)
def select_k_best(m, k=1, reverse=False):
    """ Select k best candidates from the 1D array m.

    Args:
        m: 1D array-like of scores.
        k: number of best candidates to be returned (1 <= k <= len(m)).
        reverse: if True lower is better (by default higher is better).

    Returns:
        pd.Index holding the labels of the k best entries, best first.

    Raises:
        Exception: If k is not in the range 1..len(m).
    """
    m = to_series(m)
    # k <= 0 must be rejected: a negative k would otherwise slice index[:k]
    # and silently return all but the last |k| candidates.
    if k <= 0 or k > len(m):
        raise Exception('Bad value for K')
    return m.sort_values(ascending=reverse).index[:k]
def select_best(m, reverse=False):
    """ Return the index label of the single best candidate of the 1D array m.

    Args:
        m: 1D array-like of scores.
        reverse: if True lower is better (by default higher is better).
    """
    winners = select_k_best(m, k=1, reverse=reverse)
    return winners[0]
# upsampling
# downsampling
def is_series(m):
    """ Tell whether m is a pd.Series. """
    return isinstance(m, pd.Series)
def is_dataframe(m):
    """ Tell whether m is a pd.DataFrame. """
    return isinstance(m, pd.DataFrame)
def to_series(m):
    """ Cast a 1D array-like to pd.Series.

    Args:
        m: list, tuple, np.ndarray, pd.Series or single-column pd.DataFrame.
            "Column arrays" of shape (n, 1) are flattened first.

    Returns:
        pd.Series. A Series input is returned unchanged; a single-column
        DataFrame yields that column (index preserved).
    """
    if isinstance(m, pd.Series):
        return m
    if isinstance(m, pd.DataFrame) and m.shape[1] == 1:
        # DataFrame has no reshape(); take the only column and keep its index
        return m.iloc[:, 0]
    # Lists and tuples have no shape attribute; only array-likes are flattened
    shape = getattr(m, 'shape', None)
    if shape is not None and len(shape) == 2 and shape[1] == 1:  # "column array"
        m = m.reshape(shape[0])
    return pd.Series(m)
def process_vote(m, r, axis=1):
    """ Keep names if using pd.DataFrame or pd.Series.

    Args:
        m: original matrix of scores (pd.DataFrame or pd.Series); any other
            type leaves r untouched.
        r: the ranking (array-like, including plain lists and tuples).
        axis: for a DataFrame m and a 1D r, 0 labels r with m.columns and
            1 labels it with m.index.

    Returns:
        r wrapped in a pd.Series / pd.DataFrame carrying the labels of m,
        or r itself when no labelling applies.
    """
    if isinstance(m, pd.DataFrame):
        # np.ndim also works for lists/tuples, which have no .shape attribute
        ndim = np.ndim(r)
        if ndim == 1:  # Series
            if axis == 0:  # Voting axis
                r = pd.Series(r, m.columns)  # Participants names
            elif axis == 1:
                r = pd.Series(r, m.index)
        elif ndim == 2:  # DataFrame
            r = pd.DataFrame(r, index=m.index, columns=m.columns)
    elif isinstance(m, pd.Series):  # From Series to Series
        r = pd.Series(r, m.index)
    return r
#################################
####### RANKING SYSTEMS #########
#################################
#################################
##### 1. CLASSICAL METHODS #######
#################################
def dictator(m, axis=1):
    """ Random dictator: return the ballot of one judge drawn uniformly at random.

    (Formerly named `random`; renamed because of the random module.)

    Args:
        m: 2D matrix of scores.
        axis: axis of judges.
    """
    n_judges = m.shape[axis]
    chosen = np.random.randint(n_judges)  # position of the selected judge
    ballot = np.take(np.array(m), chosen, axis=axis)
    return process_vote(m, ballot, axis=axis)
def borda(m, axis=1, method='mean', reverse=False):
    """ Borda count.

    Scores are converted to ranks for each judge, then the ranks of every
    candidate are aggregated across judges.

    Args:
        m: 2D matrix of scores.
        axis: axis of judges.
        method: 'mean' or 'median'.
        reverse: reverse the ranking.

    Returns:
        Aggregated rank per candidate (labelled when m is a pd.DataFrame).

    Raises:
        Exception: If `method` is neither 'mean' nor 'median'.
    """
    if method not in ('mean', 'median'):
        raise Exception('Unknown method for borda system: {}'.format(method))
    # rank(m, reverse=True) gives the same ranks as rank(-m) but also accepts
    # lists, and the ranks are computed only once.
    ranking = rank(m, axis=1-axis, reverse=reverse)
    # The numpy functions work for both np.ndarray and pd.DataFrame rankings
    # (np.ndarray has no .median method).
    aggregate = np.mean if method == 'mean' else np.median
    r = aggregate(ranking, axis=axis)
    return process_vote(m, r, axis=axis)
def majority(m, axis=1):
    """ Majority judgement: take the median of the scores along the judges axis.

    Args:
        m: 2D matrix of scores.
        axis: axis of judges.
    """
    medians = np.median(m, axis=axis)
    return process_vote(m, medians, axis=axis)
def score(m, axis=1):
    """ Score/range ranking: take the mean of the scores along the judges axis.

    Args:
        m: 2D matrix of scores.
        axis: axis of judges.
    """
    means = np.mean(m, axis=axis)
    return process_vote(m, means, axis=axis)
def uninominal(m, axis=1, turns=1):
    """ Uninominal vote: count, for each candidate, the ballots ranking it exactly first.

    Args:
        m: 2D matrix of scores.
        axis: axis of judges.
        turns: number of turns (only 1 is supported).

    Raises:
        Exception: If turns > 1.
    """
    if turns > 1:
        raise Exception('Uninominal system is currently implemented only in one turn setting.')
    ballots = rank(m, axis=1-axis)  # convert each judge's scores to ranks
    # TODO: for several turns, loop over turns and compare with the turn number instead of 1
    placed_first = (ballots == 1)
    first_place_votes = placed_first.sum(axis=axis)
    return process_vote(m, first_place_votes, axis=axis)
def pairwise(m, axis=1, wins=None, return_graph=False, score=False, **kwargs):
    """ Pairwise method.

    Every ordered pair of distinct candidates (a, b) is matched once; the
    outcome of the match is whatever `wins(a, b, **kwargs)` returns. A
    candidate's result is the sum of its outcomes against all opponents.

    Args:
        m: 2D matrix of scores (preference matrix).
        axis: Judge axis (the candidates are taken along the other axis).
        wins: Function returning True if a wins against b. `rk.copeland_wins` used by default.
        return_graph: If True, also return the 1-1 matches result matrix.
        score: If True, produce scores between 0 and 1 by dividing the results by (n - 1)
               with n the number of candidates.
        **kwargs: Extra arguments forwarded to `wins`.

    Returns:
        The per-candidate results, or the tuple (results, graph) when
        `return_graph` is True.
    """
    duel = rk.copeland_wins if wins is None else wins
    scores = np.array(m)
    candidates_axis = 1 - axis
    n = scores.shape[candidates_axis]
    graph = np.zeros((n, n))
    for i, j in it.product(range(n), repeat=2):
        if i == j:
            continue  # no comparison with itself: the diagonal stays at 0
        first = np.take(scores, i, candidates_axis)
        second = np.take(scores, j, candidates_axis)
        graph[i, j] = duel(first, second, **kwargs)
    totals = graph.sum(axis=1)  # total result of each candidate against all opponents
    r = process_vote(m, totals, axis=axis)
    if score:
        r = r / (n - 1)
    if return_graph:
        return r, graph
    return r
def copeland(m, axis=1, **kwargs):
    """ Copeland's method.

    Shortcut for `rk.pairwise` with `rk.copeland_wins` as the wins function.

    Args:
        m: 2D matrix of scores.
        axis: axis of judges.
        **kwargs: Arguments to be passed to `rk.pairwise` function.
    """
    duel = rk.copeland_wins
    return pairwise(m, axis=axis, wins=duel, **kwargs)
def kemeny_young(m, axis=1, **kwargs):
    """ Kemeny-Young method.

    Shortcut for `rk.center` with 'kendalltau' as the metric: the Kemeny-Young
    method looks for the ranking that is the closest to all judges according
    to Kendall's distance.

    Args:
        m: 2D matrix of scores.
        axis: axis of judges.
        **kwargs: arguments to be passed to `rk.center` function.
    """
    metric = 'kendalltau'
    return center(m, axis=axis, method=metric, **kwargs)
#################################
### 2. COMPUTATIONAL METHODS ####
#################################
# Based on optimization.
def brute_force(m, axis=0, method='swap'):
    """ Brute force search over all permutations of range(m.shape[axis]).

    Each permutation is scored with `centrality` and the first one reaching
    the highest score is returned.

    TODO:
        - keep all optimal solutions.
        - ties
        - docs

    Args:
        m: matrix of scores exposing `.shape`.
        axis: axis whose length gives the number of items to permute.
        method: method forwarded to `centrality`.

    Returns:
        The best permutation found, as a tuple of integers.
    """
    from math import factorial
    n_items = m.shape[axis]
    # -inf rather than -1: a centrality score may be <= -1 and must still be
    # able to replace the initial value.
    best_score = -np.inf
    best_r = np.take(m, 0, axis=axis)  # fallback kept from the original; only returned if no score compares greater (e.g. all NaN)
    # Iterate lazily instead of materialising the n! permutations; `total`
    # lets tqdm still display the overall progress.
    for r in tqdm(it.permutations(range(n_items)), total=factorial(n_items)):  # all possible rankings
        score = centrality(m, r, axis=axis, method=method)
        if score > best_score:
            best_score, best_r = score, r
    return best_r
def random_swap(r, n=1, tie=0.1):
    """ Swap randomly two values in r.

    Used by evolution_strategy function. The input is left untouched; a
    modified copy is returned.

    Args:
        r: ranking to mutate (list or np.ndarray; must provide `.copy()`).
        n: number of consecutive changes
        tie: probability of tie instead of swap (values >= 1 always tie,
            values <= 0 never tie).

    Returns:
        Mutated copy of r.
    """
    _r = r.copy()
    size = len(_r)
    for _ in range(n):
        i1, i2 = np.random.randint(size), np.random.randint(size)
        # Draw a uniform number so the tie probability is exactly `tie`
        # (randint(1/tie) truncated 1/tie and failed for tie > 1).
        if tie > 0 and np.random.random() < tie:  # generate tie with some probability
            _r[i1] = _r[i2]
        else:  # swap two values
            _r[i1], _r[i2] = _r[i2], _r[i1]
    return _r
def evolution_strategy(m, axis=0, mu=10, l=2, epochs=50, n=1, tie=0.1, method='swap', history=False, verbose=False):
    """ Use evolution strategy to search the best centrality ranking.

    Return the best ranking (and the best score of each generation if needed).

    Args:
        m: matrix of scores exposing `.shape`.
        axis: candidates axis.
        mu: population size.
        l: mu * l = offspring size.
        epochs: number of iterations.
        n: number of swaps performed during a single mutation.
        tie: probability of performing a tie instead of a swap during mutation process.
        method: method used to compute centrality of the ranking.
        history: if True, return a tuple (ranking, history).
        verbose: if True, plot the learning curve.

    Returns:
        The best ranking found, or the tuple (ranking, list of the best score
        of each epoch) when `history` is True.
    """
    r = np.arange(m.shape[axis])
    h = []
    population = [sorted(r, key=lambda k: _random()) for _ in range(mu)]  # mu random ranked ballots
    best_ranking = population[0]  # initialize best_ranking
    for epoch in tqdm(range(epochs)):
        offspring = [random_swap(x, n=n, tie=tie) for x in population*l]  # random swaps to generate new ranked ballots
        offspring.append(best_ranking)  # add the previous best to the offspring to avoid losing it if no children beat it
        scores = [centrality(m, child, axis=axis, method=method) for child in offspring]  # compute fit function
        idx_best = np.argsort(scores)[len(scores)-mu:]
        population = list(np.array(offspring)[idx_best])  # select the mu best ballots
        argmax = idx_best[-1]
        best_ranking = offspring[argmax]
        h.append(scores[argmax])  # collect best score
    # best_ranking may still be a plain list (initial population); convert it
    # so the result always has the same type and exposes a shape.
    r = process_vote(m, np.asarray(best_ranking), axis=1-axis)
    if verbose:
        # NOTE(review): show_learning_curve is neither defined nor imported in
        # this module; assumed to be exposed at package level as
        # rk.show_learning_curve -- confirm.
        rk.show_learning_curve(h)
        if h:  # no score to report when epochs == 0
            print('Best centrality score: {}'.format(h[-1]))
    if history:
        return r, h  # return the best ranking and its score
    return r
def center(m, axis=1, method='euclidean', verbose=True, **kwargs):
    """ Find the geometric median or 1-center.

    Solve the metric facility location problem, i.e. find the ranking
    maximizing the centrality. Also called optimal rank aggregation.

    The search is delegated to `scipy.optimize.differential_evolution`, which
    minimizes `rk.mean_distance`. Differential evolution is stochastic (it does
    not use gradients) and can explore large areas of the candidate space, but
    often needs more function evaluations than gradient-based techniques.
    The algorithm is due to Storn and Price.

    Args:
        m: 2D matrix of scores.
        axis: judges axis.
        method: distance or correlation method used as metric.
        verbose: show optimization termination message.
        **kwargs: arguments for `scipy.differential_evolution` function.
    """
    scores = np.array(m)
    lowest, highest = scores.min(), scores.max()
    n_dims = scores.shape[1-axis]
    # every coordinate of the solution is searched within the range of observed scores
    bounds = [(lowest, highest)] * n_dims
    res = differential_evolution(rk.mean_distance, bounds, args=(scores, 1-axis, method), disp=False, **kwargs)
    if verbose:
        print(res.message)
    return process_vote(m, res.x, axis=axis)
##########################
##### CONSENSUS ##########
##########################
def consensus(m, axis=0):
    """ Strict consensus between ranked ballots.

    After an optional transposition (when axis is 0), every row is compared
    with the first row; a position is in consensus when all rows agree on it.

    Args:
        m: matrix of ranked ballots.
        axis: 0 transposes the matrix before the comparison.

    Returns:
        Boolean result per position (labelled when m is a pd.DataFrame).
    """
    ballots = np.array(m)
    if axis == 0:
        ballots = ballots.T
    reference = np.take(ballots, 0, axis=0)
    agreement = np.all(ballots == reference, axis=0)
    return process_vote(m, agreement, axis=1-axis)
# STATISTICAL TESTS #
# McNemar test
# statistic = (Yes/No - No/Yes)^2 / (Yes/No + No/Yes)
# Friedman test
Functions
def bootstrap(m, axis=0, n=None, replace=True, return_holdout=False)
-
Sample with replacement among an axis (and keep the same shape by default).
By convention rows represent candidates and columns represent voters.
Args
axis
- Axis concerned by bootstrap.
n
- Number of examples to sample. By default it is the size of the matrix among the axis.
replace
- Sample with or without replacement. It is not bootstrap if the sampling is done without replacement.
return_holdout
- If True, returns a tuple (bootstrap, out-of-bag set).
Expand source code
def bootstrap(m, axis=0, n=None, replace=True, return_holdout=False):
    """ Sample with replacement along an axis (and keep the same shape by default).

    By convention rows represent candidates and columns represent voters.

    Args:
        m: Matrix to resample (anything exposing `.shape` and accepted by `np.take`).
        axis: Axis concerned by bootstrap.
        n: Number of examples to sample. By default it is the size of the matrix along the axis.
        replace: Sample with or without replacement. It is not bootstrap if the sampling is done without replacement.
        return_holdout: If True, returns a tuple (bootstrap, out-of-bag set).

    Returns:
        The resampled matrix, or the tuple (resampled matrix, out-of-bag matrix)
        when `return_holdout` is True. The out-of-bag entries keep their original order.
    """
    size = m.shape[axis]
    if n is None:
        n = size
    idx = np.random.choice(size, n, replace=replace)
    sample = np.take(m, idx, axis=axis)
    if not return_holdout:
        return sample
    # Sorted indices that were never drawn (out-of-bag).
    holdout_idx = np.setdiff1d(np.arange(size), idx)
    holdout = np.take(m, holdout_idx, axis=axis)
    return sample, holdout
def borda(m, axis=1, method='mean', reverse=False)
-
Borda count.
Args
m
- 2D matrix of scores.
axis
- axis of judges.
method
- 'mean' or 'median'.
reverse
- reverse the ranking.
Expand source code
def borda(m, axis=1, method='mean', reverse=False): """ Borda count. Args: m: 2D matrix of scores. axis: axis of judges. method: 'mean' or 'median'. reverse: reverse the ranking. """ ranking = rank(m, axis=1-axis) if reverse: ranking = rank(-m, axis=1-axis) if method == 'mean': r = ranking.mean(axis=axis) elif method == 'median': r = ranking.median(axis=axis) else: raise(Exception('Unknown method for borda system: {}'.format(method))) return process_vote(m, r, axis=axis)
def brute_force(m, axis=0, method='swap')
-
Brute force search.
Todo
- keep all optimal solutions.
- ties
- docs
Expand source code
def brute_force(m, axis=0, method='swap'): """ Brute force search. TODO: - keep all optimal solutions. - ties - docs """ best_score = -1 best_r = np.take(m, 0, axis=axis) for r in tqdm(list(it.permutations(range(m.shape[axis])))): # all possible rankings score = centrality(m, r, axis=axis, method=method) if score > best_score: best_score = score best_r = r return best_r
def center(m, axis=1, method='euclidean', verbose=True, **kwargs)
-
Find the geometric median or 1-center.
Solve the metric facility location problem. Find the ranking maximizing the centrality. Also called optimal rank aggregation.
Args
axis
- judges axis.
method
- distance or correlation method used as metric.
verbose
- show optimization termination message.
**kwargs
- arguments for
scipy.differential_evolution
function.
Expand source code
def center(m, axis=1, method='euclidean', verbose=True, **kwargs): """ Find the geometric median or 1-center. Solve the metric facility location problem. Find the ranking maximizing the centrality. Also called optimal rank aggregation. Args: axis: judges axis. method: distance or correlation method used as metric. verbose: show optimization termination message. **kwargs: arguments for `scipy.differential_evolution` function. """ # Finds the global minimum of a multivariate function. # Differential Evolution is stochastic in nature (does not use gradient methods) to find the minimium, and can search large areas of candidate space, # but often requires larger numbers of function evaluations than conventional gradient based techniques. # The algorithm is due to Storn and Price [R150]. m_np = np.array(m) bounds = [(m_np.min(), m_np.max()) for _ in range(m_np.shape[1-axis])] res = differential_evolution(rk.mean_distance, bounds, (m_np, 1-axis, method), disp=False, **kwargs) # from scipy.optimize if verbose: print(res.message) r = res.x return process_vote(m, r, axis=axis)
def consensus(m, axis=0)
-
Strict consensus between ranked ballots.
Expand source code
def consensus(m, axis=0): """ Strict consensus between ranked ballots. """ m_arr = np.array(m) if axis==0: m_arr = m_arr.T r = np.all(m_arr == np.take(m_arr, 0, axis=0), axis=0) return process_vote(m, r, axis=1-axis)
def contains_ties(r)
-
Return True if r contains tied values.
Args
r
- 1D array-like of scores or ranks.
Expand source code
def contains_ties(r):
    """ Return True if r contains tied values.

    The values are compared positionally, so a pd.Series with any index is
    accepted. NaN never ties with NaN (NaN != NaN).

    Args:
        r: 1D array-like of scores or ranks.

    Returns:
        bool: True if at least two entries are equal, False otherwise
        (including for empty or single-element input).
    """
    values = np.asarray(r).ravel()
    if values.size < 2:
        return False
    # After sorting, equal values are adjacent: one O(n log n) pass replaces
    # the all-pairs comparison.
    ordered = np.sort(values)
    return bool(np.any(ordered[1:] == ordered[:-1]))
def copeland(m, axis=1, **kwargs)
-
Copeland's method.
This function is an alias of calling
rk.pairwise
function with rk.copeland_wins
as the wins function.
Args
m
- 2D matrix of scores.
axis
- axis of judges.
**kwargs
- Arguments to be passed to
rk.pairwise
function.
Expand source code
def copeland(m, axis=1, **kwargs): """ Copeland's method. This function is an alias of calling `rk.pairwise` function with `rk.copeland_wins` as the wins function. Args: m: 2D matrix of scores. axis: axis of judges. **kwargs: Arguments to be passed to `rk.pairwise` function. """ return pairwise(m, axis=axis, wins=rk.copeland_wins, **kwargs)
def dictator(m, axis=1)
-
Random dictator.
Args
m
- 2D matrix of scores.
axis
- axis of judges.
Expand source code
def dictator(m, axis=1): #def random(m, axis=1): # renamed because of random module """ Random dictator. Args: m: 2D matrix of scores. axis: axis of judges. """ voter = np.random.randint(m.shape[axis]) # select a column number r = np.take(np.array(m), voter, axis=axis) #m[:, voter] return process_vote(m, r, axis=axis)
def evolution_strategy(m, axis=0, mu=10, l=2, epochs=50, n=1, tie=0.1, method='swap', history=False, verbose=False)
-
Use evolution strategy to search the best centrality ranking.
Return the best ranking (and the best score of each generation if needed).
Args
axis
- candidates axis.
mu
- population size.
l
- mu * l = offspring size.
epochs
- number of iterations.
n
- number of swaps performed during a single mutation.
tie
- probability of performing a tie instead of a swap during mutation process.
method
- method used to compute centrality of the ranking.
history
- if True, return a tuple (ranking, history).
verbose
- if True, plot the learning curve.
Expand source code
def evolution_strategy(m, axis=0, mu=10, l=2, epochs=50, n=1, tie=0.1, method='swap', history=False, verbose=False): """ Use evolution strategy to search the best centrality ranking. Return the best ranking (and the best score of each generation if needed). Args: axis: candidates axis. mu: population size. l: mu * l = offspring size. epochs: number of iterations. n: number of swaps performed during a single mutation. tie: probability of performing a tie instead of a swap during mutation process. method: method used to compute centrality of the ranking. history: if True, return a tuple (ranking, history). verbose: if True, plot the learning curve. """ r = np.arange(m.shape[axis]) h = [] population = [sorted(r, key=lambda k: _random()) for _ in range(mu)] # mu random ranked ballots best_ranking = population[0] # initialize best_ranking for epoch in tqdm(range(epochs)): offspring = [random_swap(x, n=n, tie=tie) for x in population*l] # random swaps to generate new ranked ballots offspring.append(best_ranking) # add the previous best to the offspring to avoid losing it if no children beat it scores = [centrality(m, child, axis=axis, method=method) for child in offspring] # compute fit function idx_best = np.argsort(scores)[len(scores)-mu:] population = list(np.array(offspring)[idx_best]) # select the mu best ballots argmax = idx_best[-1] best_ranking = offspring[argmax] h.append(scores[argmax]) # collect best score r = process_vote(m, best_ranking, axis=1-axis) if verbose: show_learning_curve(h) print('Best centrality score: {}'.format(h[-1])) if history: return r, h # return the best ranking and its score return r
def is_dataframe(m)
-
Expand source code
def is_dataframe(m): return isinstance(m, pd.DataFrame)
def is_series(m)
-
Expand source code
def is_series(m): return isinstance(m, pd.Series)
def joint_bootstrap(m_list, axis=0, n=None, replace=True)
-
Apply the same bootstrap on all matrices on m_list.
Sample with replacement among an axis (and keep the same shape by default). By convention rows represent candidates and columns represent voters.
Args
axis
- Axis concerned by bootstrap.
n
- Number of examples to sample. By default it is the size of the matrix among the axis.
replace
- Sample with or without replacement. It is not bootstrap if the sampling is done without replacement.
Expand source code
def joint_bootstrap(m_list, axis=0, n=None, replace=True): """ Apply the same bootstrap on all matrices on m_list. Sample with replacement among an axis (and keep the same shape by default). By convention rows reprensent candidates and columns represent voters. Args: axis: Axis concerned by bootstrap. n: Number of examples to sample. By default it is the size of the matrix among the axis. replace: Sample with or without replacement. It is not bootstrap if the sampling is done without replacement. """ size = m_list[0].shape[axis] if n is None: n = size idx = np.random.choice(size, n, replace=replace) return [np.take(m, idx, axis=axis) for m in m_list]
def kemeny_young(m, axis=1, **kwargs)
-
Kemeny-Young method.
This function is an alias of calling
rk.center
function with Kendall tau as the metric. Indeed, the Kemeny-Young method consists in computing the ranking that is the closest to all judges, according to Kendall's distance.
Args
m
- 2D matrix of scores.
axis
- axis of judges.
**kwargs
- arguments to be passed to
rk.center
function.
Expand source code
def kemeny_young(m, axis=1, **kwargs): """ Kemeny-Young method. This function is an alias of calling `rk.center` function with Kendall tau as the metric. Indeed, Kemeny-Young method consists in computing the ranking that is the closest to all judges, according to Kendall's distance. Args: m: 2D matrix of scores. axis: axis of judges. **kwargs: arguments to be passed to `rk.center` function. """ return center(m, axis=axis, method='kendalltau', **kwargs)
def majority(m, axis=1)
-
Majority judgement.
Args
m
- 2D matrix of scores.
axis
- axis of judges.
Expand source code
def majority(m, axis=1): """ Majority judgement. Args: m: 2D matrix of scores. axis: axis of judges. """ r = np.median(m, axis=axis) return process_vote(m, r, axis=axis)
def pairwise(m, axis=1, wins=None, return_graph=False, score=False, **kwargs)
-
Pairwise method.
We compute the matrix of scores of all possible pairs of matches between all candidates. The score of one match is defined by the
wins
function.
Args
m
- 2D matrix of scores (preference matrix).
axis
- Judge axis.
wins
- Function returning True if a wins against b. rk.copeland_wins is used by default.
return_graph
- If True, returns the 1-1 matches result matrix.
score
- If True, produce scores between 0 and 1 by dividing the results by (n - 1) with n the number of candidates.
Expand source code
def pairwise(m, axis=1, wins=None, return_graph=False, score=False, **kwargs): """ Pairwise method. We compute the matrix of scores of all possible pairs of matches between all candidates. The score of one match is defined by the `wins` function. Args: m: 2D matrix of scores (preference matrix). axis: Judge axis. /!\ wins: Function returning True if a wins against b. `rk.copeland_wins` used by default. return_graph: If True, returns the 1-1 matches result matrix. score: If True, produce scores between 0 and 1 by dividing the results by (n - 1) with n the number of candidates. """ if wins is None: wins = rk.copeland_wins _m = m m = np.array(m) n = m.shape[1-axis] graph = np.zeros((n, n)) for i in range(n): for j in range(n): if i != j: c1, c2 = np.take(m, i, 1-axis), np.take(m, j, 1-axis) graph[i][j] = wins(c1, c2, **kwargs) else: graph[i][j] = 0 # no comparison with itself r = np.sum(graph, axis=1) # collect candidates average score against all opponents r = process_vote(_m, r, axis=axis) if score: r = r / (n - 1) if return_graph: return r, graph return r
def process_vote(m, r, axis=1)
-
Keep names if using pd.DataFrame.
Args
m
- original matrix of scores (pd.DataFrame or pd.Series)
r
- the ranking (array-like)
Expand source code
def process_vote(m, r, axis=1):
    """ Keep names if using pd.DataFrame or pd.Series.

    Args:
        m: original matrix of scores (pd.DataFrame or pd.Series); any other
            type leaves r untouched.
        r: the ranking (array-like, including plain lists and tuples).
        axis: for a DataFrame m and a 1D r, 0 labels r with m.columns and
            1 labels it with m.index.

    Returns:
        r wrapped in a pd.Series / pd.DataFrame carrying the labels of m,
        or r itself when no labelling applies.
    """
    if isinstance(m, pd.DataFrame):
        # np.ndim also works for lists/tuples, which have no .shape attribute
        ndim = np.ndim(r)
        if ndim == 1:  # Series
            if axis == 0:  # Voting axis
                r = pd.Series(r, m.columns)  # Participants names
            elif axis == 1:
                r = pd.Series(r, m.index)
        elif ndim == 2:  # DataFrame
            r = pd.DataFrame(r, index=m.index, columns=m.columns)
    elif isinstance(m, pd.Series):  # From Series to Series
        r = pd.Series(r, m.index)
    return r
def random_swap(r, n=1, tie=0.1)
-
Swap randomly two values in r.
Used by evolution_strategy function.
Args
n
- number of consecutive changes
tie
- probability of tie instead of swap
Expand source code
def random_swap(r, n=1, tie=0.1):
    """ Swap randomly two values in r.

    Used by evolution_strategy function. The input is left untouched; a
    modified copy is returned.

    Args:
        r: ranking to mutate (list or np.ndarray; must provide `.copy()`).
        n: number of consecutive changes
        tie: probability of tie instead of swap (values >= 1 always tie,
            values <= 0 never tie).

    Returns:
        Mutated copy of r.
    """
    _r = r.copy()
    size = len(_r)
    for _ in range(n):
        i1, i2 = np.random.randint(size), np.random.randint(size)
        # Draw a uniform number so the tie probability is exactly `tie`
        # (randint(1/tie) truncated 1/tie and failed for tie > 1).
        if tie > 0 and np.random.random() < tie:  # generate tie with some probability
            _r[i1] = _r[i2]
        else:  # swap two values
            _r[i1], _r[i2] = _r[i2], _r[i1]
    return _r
def rank(m, axis=0, method='average', ascending=False, reverse=False)
-
Replace values by their rank in the column.
By default, higher is better. TODO: save parameters to add values to an already fitted ranking.
Args
m
- Score matrix.
axis
- Candidates axis.
method
- 'average', 'min', 'max', 'dense', 'ordinal'
ascending
- Ascending or descending order.
reverse
- Reverse order.
Returns
np.ndarray, pd.Series, pd.DataFrame Ranked preference matrix.
Expand source code
def rank(m, axis=0, method='average', ascending=False, reverse=False): """ Replace values by their rank in the column. By default, higher is better. TODO: save parameters to add values to an already fitted ranking. Args: m: Score matrix. axis: Candidates axis. method: 'average', 'min', 'max', 'dense', 'ordinal' ascending: Ascending or descending order. reverse: Reverse order. Returns: np.ndarray, pd.Series, pd.DataFrame Ranked preference matrix. """ if isinstance(m, list): m = np.array(m) if ascending == reverse: # greater is better (descending order) m = -m # take the opposite to inverse rank r = np.apply_along_axis(rankdata, axis, m, method=method) # convert values to ranking in all rows or columns return process_vote(m, r)
def score(m, axis=1)
-
Score/range ranking.
Args
m
- 2D matrix of scores.
axis
- axis of judges.
Expand source code
def score(m, axis=1): """ Score/range ranking. Args: m: 2D matrix of scores. axis: axis of judges. """ r = np.mean(m, axis=axis) return process_vote(m, r, axis=axis)
def select_best(m, reverse=False)
-
Return the best candidate from the 1D array m.
Args
m
- 1D array-like of scores.
reverse
- if True lower is better (by default higher is better).
Expand source code
def select_best(m, reverse=False): """ Return the best candidate from the 1D array m. Args: m: 1D array-like of scores. reverse: if True lower is better (by default higher is better). """ return select_k_best(m, k=1, reverse=reverse)[0]
def select_k_best(m, k=1, reverse=False)
-
Select k best candidates from the 1D array m.
Args
m
- 1D array-like of scores.
k
- number of best candidates to be returned.
reverse
- if True lower is better (by default higher is better).
Expand source code
def select_k_best(m, k=1, reverse=False): """ Select k best candidates from the 1D array m. Args: m: 1D array-like of scores. k: number of best candidates to be returned. reverse: if True lower is better (by default higher is better). """ m = to_series(m) if k == 0 or k > len(m): raise Exception('Bad value for K') return m.sort_values(ascending=reverse).index[:k]
def tie(r, threshold=0.1)
-
TODO: merge close values.
Expand source code
def tie(r, threshold=0.1): """ TODO: merge close values. """ return 0
def to_series(m)
-
Expand source code
def to_series(m):
    """ Cast a 1D array-like to pd.Series.

    Args:
        m: list, tuple, np.ndarray, pd.Series or single-column pd.DataFrame.
            "Column arrays" of shape (n, 1) are flattened first.

    Returns:
        pd.Series. A Series input is returned unchanged; a single-column
        DataFrame yields that column (index preserved).
    """
    if isinstance(m, pd.Series):
        return m
    if isinstance(m, pd.DataFrame) and m.shape[1] == 1:
        # DataFrame has no reshape(); take the only column and keep its index
        return m.iloc[:, 0]
    # Lists and tuples have no shape attribute; only array-likes are flattened
    shape = getattr(m, 'shape', None)
    if shape is not None and len(shape) == 2 and shape[1] == 1:  # "column array"
        m = m.reshape(shape[0])
    return pd.Series(m)
def top_k_method(D, F, k=1, reverse=False)
-
Apply top-k method to select a winner from two rankings D and F (development and final).
Return the index of the winner. The winner is the top of F from the k best candidates of D.
Args
D
- 1D array-like of scores, representing the development phase (or public leaderboard).
F
- 1D array-like of scores, representing the final phase (or private leaderboard).
k
- number of candidates that access the final phase.
reverse
- if True lower is better (by default higher is better).
Expand source code
def top_k_method(D, F, k=1, reverse=False): """ Apply top-k method to select a winner from two rankings D and F (development and final). Return the index of the winner. The winner is the top of F from the k best candidates of D. Args: D: 1D array-like of scores, representing the development phase (or public leaderboard). F: 1D array-like of scores, representing the final phase (or private leaderboard). k: number of candidates that access the final phase. reverse: if True lower is better (by default higher is better). """ D, F = to_series(D), to_series(F) top_k = select_k_best(D, k=k, reverse=reverse) return rk.select_best(F[top_k], reverse=reverse)
def uninominal(m, axis=1, turns=1)
-
Uninominal.
Args
m
- 2D matrix of scores.
axis
- axis of judges.
turns
- number of turns.
Expand source code
def uninominal(m, axis=1, turns=1): """ Uninominal. Args: m: 2D matrix of scores. axis: axis of judges. turns: number of turns. """ if turns > 1: raise(Exception('Uninominal system is currently implemented only in one turn setting.')) ranking = rank(m, axis=1-axis) # convert to rank #for turn in range(turns): # TODO replace 1 by turn in next line r = (ranking == 1).sum(axis=axis) # count number of uninominal vote (first in judgement) return process_vote(m, r, axis=axis)
def weigher(r, method='hyperbolic')
-
The weigher function.
Must map nonnegative integers (zero representing the most important element) to a nonnegative weight. The default method, 'hyperbolic', provides hyperbolic weighing, that is, rank r is mapped to weight 1/(r+1)
Args
r
- Integer value (supposedly a rank) to weight
method
- Weighing method. 'hyperbolic'
Expand source code
def weigher(r, method='hyperbolic'): """ The weigher function. Must map nonnegative integers (zero representing the most important element) to a nonnegative weight. The default method, 'hyperbolic', provides hyperbolic weighing, that is, rank r is mapped to weight 1/(r+1) Args: r: Integer value (supposedly a rank) to weight method: Weighing method. 'hyperbolic' """ if method == 'hyperbolic': return 1 / (r + 1) else: raise Exception('Unknown method: {}'.format(method))