__all__ = ['Factor', 'Arm', 'Minimizer']
import collections
import itertools
import random
import statistics
import typing
from .utils import (
_dict_add_inplace,
_dict_div_inplace,
_dict_product_inplace
)
[docs]class Factor:
"""
A prognostic factor with 2 or more levels, that needs to be balanced
between treatment arms.
"""
def __init__(self, name: str,
levels: typing.Iterable[str],
weight: float = 1.0):
"""
:param name: Name/label for the factor
:type name: str
:param levels: List of categorical levels for the factor
:param weight: Optional weight, used if the total imbalance in the
minimization algorithm is a weighted sum that weighs each factor
differently.
:type weight: float
"""
self.name = name
self.levels = tuple(levels)
if not len(self.levels) >= 2:
raise ValueError("Factors must have 2 or more levels")
if not weight > 0:
raise ValueError("Factor weight must be a positive number")
self.weight = weight
def __str__(self):
return "Factor({f.name}, levels={f.levels})".format(f=self)
def __repr__(self):
return str(self)
[docs] def get_random_level(self) -> str:
"""
Return one of the factor's levels at random (all levels have
equal probability).
:return: Name of the chosen level
:rtype: str
"""
return random.choice(self.levels)
[docs] def get_random_level_multiple(self, n) -> typing.List[str]:
"""
Return n random levels, sampled with replacement. Faster than calling
get_random_level multiple times.
:param n: Number of random levels to generate.
:return: List of level names.
"""
return random.choices(self.levels, k=n)
[docs]class Arm:
"""
A treatment arm that participants will be assigned to. Different
arms in the trial may have different allocation ratios, e.g.
1:2:1 for arms A, B and C.
"""
def __init__(self, name: str, allocation_ratio: int = 1):
self.name = name
if not (type(allocation_ratio) == int):
raise ValueError("Allocation ratio must be an integer.")
self.allocation_ratio = allocation_ratio
def __str__(self):
return "Arm({a.name}, allocation_ratio={a.allocation_ratio})".format(
a=self
)
def _get_imbalance_std_dev(arm_x_counts: dict) -> float:
"""
Calculate the imbalance between arms using standard deviation.
"""
return statistics.stdev(arm_x_counts.values())
def _get_imbalance_variance(arm_x_counts: dict) -> float:
"""
Calculate the imbalance between arms using variance.
"""
return statistics.variance(arm_x_counts.values())
def _get_imbalance_range(arm_x_counts: dict) -> float:
"""
Calculate the imbalance between arms using the range.
"""
counts = list(arm_x_counts.values())
return max(counts) - min(counts)
def _get_imbalance_over_max_range(arm_x_counts: dict, max_range: int) -> int:
"""
Calculate whether the imbalance between arms exceeds the
``max_range`` threshold.
:return: 1 if the range exceeds ``max_range``, 0 otherwise.
"""
counts = list(arm_x_counts.values())
count_range = max(counts) - min(counts)
return int(count_range > max_range)
def _get_imbalance_is_largest(arm_x_counts: dict, current_arm: str) -> int:
"""
Calculate whether ``current_arm`` has the greatest count.
:param current_arm: Name of current arm.
:return: 1 if ``current_arm`` has the most participants assigned,
0 otherwise.
"""
max_count = max(arm_x_counts.values())
has_two_counts = len(set(arm_x_counts.values())) == 2
current_arm_is_largest = (
(arm_x_counts[current_arm] == max_count) and
has_two_counts
)
return int(current_arm_is_largest)
def _get_imbalance_marginal_balance(arm_x_counts: dict) -> float:
"""
Marginal balance, as outlined in Han2009.
:return: score between 0 and 1, higher score reflecting higher
total imbalance.
"""
N = len(arm_x_counts)
denom = (N - 1) * sum(arm_x_counts.values())
all_pairs = itertools.combinations(arm_x_counts.keys(), 2)
num = sum(abs(arm_x_counts[arm_i] - arm_x_counts[arm_j])
for arm_i, arm_j in all_pairs)
return num / denom
[docs]class Minimizer:
"""
Given a set of prognostic factors and treatment arms, assigns
participants to arms based on the minimization algorithm
(or purely random assignment for comparison).
"""
D_IMBALANCE_METHODS = {
'standard_deviation': _get_imbalance_std_dev,
'variance': _get_imbalance_variance,
'range': _get_imbalance_range,
'over_max_range': _get_imbalance_over_max_range,
'is_largest': _get_imbalance_is_largest,
'marginal_balance': _get_imbalance_marginal_balance
}
TOTAL_IMBALANCE_METHODS = ['sum', 'weighted_sum']
PROBABILITY_METHODS = ['best_only', 'rank_all', 'pure_random',
'biased_coin']
def __init__(self,
factors: typing.Iterable[Factor],
arms: typing.Iterable[Arm],
d_imbalance_method: str = 'standard_deviation',
total_imbalance_method: str = 'sum',
probability_method: str = 'best_only',
**method_args):
"""
:param factors: List of prognostic factors to be considered during
assignment.
:param arms: List of treatment arms to assign participants
to (possibly with unequal ratios).
:param d_imbalance_method: Function used to score the
amount of imbalance, e.g. standard deviation. Must be one
of the methods in ``Minimizer.D_IMBALANCE_METHODS``.
:param total_imbalance_method: 'sum' (the default), or 'weighted_sum',
depending on whether all factors should be weighted equally.
:param probability_method: Method for assigning probabilities
to the arms, based on the imbalance that would result. See
:ref:`trial_setup` for details.
:param method_args: Additional keyword arguments required by some
imbalance and probability methods.
"""
# TODO: Document what all the additional method_args are.
# Check if we only have a single Factor
if isinstance(factors, Factor):
factors = [factors]
self.factors = list(factors)
self.arms = list(arms)
self._d_imbalance_method = self._check_d_imbalance_method(
d_imbalance_method,
method_args
)
self._total_imbalance_method = self._get_total_imbalance_method(
total_imbalance_method
)
self._probability_method = self._check_probability_method(
probability_method,
method_args
)
self._count_table = self._create_count_table()
self._arm_ratios = self._create_ratios()
@property
def factor_names(self):
return [f.name for f in self.factors]
@property
def factor_weights(self):
return {f.name: f.weight for f in self.factors}
@property
def arm_names(self):
return [a.name for a in self.arms]
[docs] def get_n(self) -> int:
"""
Get the number of arms for the trial.
"""
return len(self.arms)
def _check_d_imbalance_method(self, method: str, method_args: dict):
"""
Check that the supplied imbalance method is valid
(and any required arguments have been supplied).
"""
if method not in self.D_IMBALANCE_METHODS:
raise ValueError(
"Invalid d_imbalance_method. "
"See Minimizer.D_IMBALANCE_METHODS for available methods."
)
if method == 'is_largest':
if self.get_n() > 2:
raise ValueError(
"Can't use 'is_largest' with more than 2 arms."
)
if method == 'over_max_range':
d_max_range = method_args.get('d_max_range')
if d_max_range is None:
raise ValueError(
"If using 'over_max_range', you must supply an"
" additional argument d_max_range")
self._d_max_range = d_max_range
return method
def _check_probability_method(self, method: str, method_args: dict) -> str:
"""
Check that the supplied probability method is valid and
any required arguments have been provided.
"""
valid_methods = self.PROBABILITY_METHODS
if method not in valid_methods:
raise ValueError(
("Probability method must be one of {methods}" +
", {m} given").format(
methods=valid_methods,
m=method
)
)
# Print a warning when unequal allocation ratios are used
# and biased_coin isn't
if method != 'biased_coin':
ratios = list(self._create_ratios().values())
all_equal = all(r == ratios[0] for r in ratios)
if not all_equal:
warn_message = (
("NOTE: 'biased_coin' may give better results than {m}") +
(" when allocation ratios are unequal.").format(
m=method
)
)
print(warn_message)
if method in ('best_only', 'biased_coin'):
# Use midpoint between 1 and 1 / N by default
default_p = (1 + 1 / self.get_n()) / 2
if 'preferred_p' not in method_args:
print("preferred_p argument was not provided. Using default"
" value of {p}".format(p=default_p))
preferred_p = method_args.get('preferred_p', default_p)
p_valid = (
(preferred_p > (1 / self.get_n())) and
(preferred_p <= 1.0)
)
if not p_valid:
raise ValueError(
"preferred_p must be between (1 / N) and 1"
)
self._preferred_p = preferred_p
elif method == 'rank_all':
N = self.get_n()
# Use halfway point between boundaries as a default
default_q = (
((1 / N) + (2 / (N - 1))) / 2
)
q = method_args.get('q', default_q)
if 'q' not in method_args:
print("q argument was not provided. Using default"
" value of {q}".format(q=default_q))
q_valid = (q > (1 / N)) and (q < (2 / (N - 1)))
if not q_valid:
raise ValueError(
"q must be between 1 / N and 2 / (N - 1)"
)
self._q = q
return method
def _get_total_imbalance_method(self, method: str):
"""
Check that the method given is a valid choice.
"""
valid_methods = self.TOTAL_IMBALANCE_METHODS
if method not in valid_methods:
raise ValueError(
"Imbalance method should be one of: " +
str(valid_methods)
)
return method
def _create_ratios(self) -> dict:
"""
Store the allocation ratios for each treatment arm in a dictionary
so we can divide/multiply by them easily.
:return: dict that maps from arm name to allocation ratio
"""
return {a.name: a.allocation_ratio for a in self.arms}
def _create_count_table(self) -> dict:
"""
Set up the dictionary that will count how many participants have been
assigned to each combination of factor levels and treatment arms.
The keys are tuples, with length equal to the number of factor
levels. The values are dictionaries that map from arm name to
the count within that combination of factor levels.
So if the factors are Sex, Age and Disease Severity, an entry
in the count table might be:
count_table[('Male', '40+', 'Mild')] -> {'Arm1': 5, 'Arm2': 4}
This way, each participant is only represented in a single entry
in the count table. Accessing the required counts is also
relatively straightforward, e.g. if Sex is the second factor
in the trial, you can find all keys where factor_key[1] == 'Male'.
To deal better with large numbers of factors and/or levels,
where not all combinations of levels may actually be seen,
we use a defaultdict, and only create the counts for a key
when needed. This gives better performance with large numbers
of possible combinations.
:return: Nested defaultdict. Outer keys are factor level tuples,
keys for the inner dictionaries are arm names, and values
for the inner dictionary are the number of participants assigned
to each arm within that combination of factor levels.
"""
def _create_empty_arms() -> dict:
return {a: 0 for a in self.arm_names}
count_table = collections.defaultdict(_create_empty_arms)
return count_table
[docs] def add_existing_participant(self, factor_levels: dict, arm: str) -> None:
"""
Add a participant who has already been assigned to the trial to
the count table. Modifies the count table in place.
:param factor_levels: A dictionary where the keys are the names
of the factors in the trial, and the values are the factor
levels for the participant.
:param arm: Name of the arm they are assigned to.
"""
self._add_participant_to_arm(factor_levels, arm)
def _add_participant_to_arm(self, factor_levels: dict, arm: str) -> None:
"""
Add a participant with the given factor levels to ``arm``.
Modify the count table inplace.
"""
# Convert factor_levels to ordered tuple that matches the
# keys of the count table
participant_index = tuple(
factor_levels[factor] for factor in self.factor_names
)
self._count_table[participant_index][arm] += 1
@staticmethod
def _get_chosen_arm(arm_probs: dict) -> str:
"""
Return a randomly chosen arm, weighted by the probabilities
in ``arm_probs``
:param arm_probs: dict giving the probability of assignment
for each arm.
"""
choice = random.choices(list(arm_probs.keys()),
weights=list(arm_probs.values()),
k=1)
# random.choices returns a list so grab
# first element
return choice[0]
[docs] def assign_participant(self, factor_levels: dict) -> str:
"""
Assign a new participant to the trial, using the
minimization algorithm. Modifies the count table in place,
and returns the chosen arm.
:param factor_levels: A dictionary that maps from factor
names to participants.
:type factor_levels: dict
:return: Name of chosen arm
:rtype: str
"""
# Special case: avoid calculating imbalances when
# doing pure_random assignment
if self._probability_method == 'pure_random':
arm_probs = self._get_arm_probability_pure_random()
else:
imbalances = self.get_new_total_imbalances(factor_levels)
arm_probs = self.get_arm_probability(imbalances)
arm = self._get_chosen_arm(arm_probs)
self._add_participant_to_arm(factor_levels, arm)
return arm
[docs] def get_assignment_info(self, factor_levels: dict,
do_assignment: bool = False) -> dict:
"""
Alternative to assign_participant(), takes factor levels
for a new participant, chooses an arm, and returns the arm along
with extra details about whether the most-favoured arm was chosen.
By default, the participant is not actually assigned. To
assign the participant at the same time, set
``do_assignment=True``.
:return: dict with keys: ``'arm'``: chosen arm, ``'prob'``:
probability that was assigned to that arm before the selection,
``'most_favoured'``: Whether the chosen arm was the arm
with the highest probability (including if it was tied for
highest probability).
"""
# Special case: avoid calculating imbalances when
# doing pure_random assignment
if self._probability_method == 'pure_random':
arm_probs = self._get_arm_probability_pure_random()
else:
imbalances = self.get_new_total_imbalances(factor_levels)
arm_probs = self.get_arm_probability(imbalances)
arm = self._get_chosen_arm(arm_probs)
if do_assignment:
self._add_participant_to_arm(factor_levels, arm)
return {'arm': arm,
# Probability of the chosen arm
'prob': arm_probs[arm],
'most_favoured': arm_probs[arm] == max(arm_probs.values())}
def _get_arm_probability_pure_random(self):
# Need to account for allocation ratio
total = sum(self._arm_ratios.values())
ps = {a: self._arm_ratios[a] / total
for a in self.arm_names}
return ps
def _get_arm_probability_biased_coin(self, ranked_arms: list):
allocation_order = sorted(
self.arm_names, key=lambda a: self._arm_ratios[a]
)
smallest_arm = allocation_order[0]
preferred_arm = ranked_arms[0]
preferred_num = sum(ratio for arm, ratio in self._arm_ratios.items()
if arm != preferred_arm)
preferred_denom = sum(ratio for arm, ratio in self._arm_ratios.items()
if arm != smallest_arm)
preferred_p = (
1 - (preferred_num / preferred_denom) * (1 - self._preferred_p)
)
arm_probs = [preferred_p]
for other_arm in ranked_arms[1:]:
other_num = self._arm_ratios[other_arm]
other_denom = sum(ratio for arm, ratio in self._arm_ratios.items()
if arm != preferred_arm)
other_p = (other_num / other_denom) * (1 - preferred_p)
arm_probs.append(other_p)
return arm_probs
[docs] def get_arm_probability(
self,
imbalances: dict) -> dict:
"""
Calculate the probability assigned to each arm, based
on the current imbalances and the chosen
probability method.
"""
# Don't need to do any imbalance calculations
# if doing purely random assignment
if self._probability_method == 'pure_random':
return self._get_arm_probability_pure_random()
# All other methods depend on imbalance scores
ranked_arms = self._rank_imbalances(imbalances)
if self._probability_method == 'best_only':
p1 = self._preferred_p
N = len(self.arms)
other_p = (1 - p1) / (N - 1)
ps = [p1] + [other_p] * (N - 1)
elif self._probability_method == 'rank_all':
q = self._q
N = len(self.arms)
num = 2 * (N * q - 1)
denom = N * (N + 1)
ks = range(1, N + 1)
ps = [q - (num / denom) * k
for k in ks]
elif self._probability_method == 'biased_coin':
ps = self._get_arm_probability_biased_coin(ranked_arms)
return dict(zip(ranked_arms, ps))
@staticmethod
def _rank_imbalances(imbalances: dict) -> list:
"""
In the Pocock + Simon method, treatments are ranked according to their
total imbalance (in ascending order). When two treatments tie,
they are ranked in random order.
:param imbalances: A dictionary that maps from arm names to imbalance
scores.
:return: List of treatment names, in ranked order.
"""
def _shuffle_list_slice(lst, start, end) -> None:
"""
Shuffle the elements of lst[start:end] inplace.
"""
lst[start:end] = random.sample(
lst[start:end],
len(lst[start:end])
)
ranked = sorted(
imbalances.keys(),
key=lambda a: imbalances[a]
)
# Break ties randomly by shuffling them, as suggested
# in PS1975
start_index = 0
groups = itertools.groupby(
ranked,
key=lambda a: imbalances[a]
)
for val, group in groups:
group_size = len(list(group))
if group_size > 1:
_shuffle_list_slice(ranked, start_index,
start_index + group_size)
start_index += group_size
return ranked
[docs] def get_new_total_imbalances(
self,
factor_levels: dict) -> \
typing.Dict[str, typing.Union[int, float]]:
"""
Get the total imbalance scores that would result from assigning
a participant with the given factor levels to each arm.
:param factor_levels: Maps from factor names to the participant's
level for each factor.
:type factor_levels: dict
:return: Total imbalance score for each potential arm.
:rtype: dict
"""
d_imbalance_scores = self.get_new_ds(factor_levels)
total_imbalances = {}
for potential_arm in self.arm_names:
current_d_scores = d_imbalance_scores[potential_arm]
if self._total_imbalance_method == 'weighted_sum':
_dict_product_inplace(current_d_scores, self.factor_weights)
total_imbalances[potential_arm] = sum(current_d_scores.values())
return total_imbalances
[docs] def get_all_new_counts(
self,
factor_levels: dict) -> typing.Dict[str, dict]:
"""
For each potential arm that a new participant could be assigned
to, calculate the number of participants that would be in each arm
if the potential arm was chosen, within each factor level that the
new participant belongs to.
:param factor_levels: A dictionary that maps from factor names to
factor levels, giving the participant's factor levels for each
factor used in the trial.
:type factor_levels: dict
:return: A nested dictionary, that maps from (potential arm) ->
(factor name) -> dict of arm counts within that factor
"""
# NOTE: this takes into account the allocation ratios
# (divides each raw count by its ratio)
current_factor_counts = self.get_current_x_counts(factor_levels)
# Potential counts, if assigned to that arm
p_counts = collections.defaultdict(dict)
for potential_treatment in self.arm_names:
for i_factor in self.factor_names:
count_within_factor = self._get_count_after_assignment(
arm_counts=current_factor_counts[i_factor],
arm_name=potential_treatment
)
p_counts[potential_treatment][i_factor] = count_within_factor
return p_counts
[docs] def get_new_ds(self, factor_levels: dict) -> \
typing.Dict[str, dict]:
"""
Calculate the d_ik scores, the imbalance score within each factor,
for each potential arm that a new participant could be assigned to.
:param factor_levels: A dict that maps from (factor name) ->
(new participant's factor level)
:return: A dict that maps from (potential arm) -> (dict of scores
for each factor)
"""
get_imbalance = self.D_IMBALANCE_METHODS[self._d_imbalance_method]
counts_after_assignment = self.get_all_new_counts(factor_levels)
arm_ds = {}
for potential_arm in self.arm_names:
factor_scores = []
for i_factor in self.factor_names:
arm_counts = counts_after_assignment[potential_arm][i_factor]
# Some imbalance methods need special cases, the rest just
# need the counts
if self._d_imbalance_method == 'is_largest':
imbalance_score = get_imbalance(
arm_counts,
current_arm=potential_arm
)
elif self._d_imbalance_method == 'over_max_range':
imbalance_score = get_imbalance(
arm_counts,
max_range=self._d_max_range
)
else:
imbalance_score = get_imbalance(arm_counts)
factor_scores.append(imbalance_score)
# NOTE: this may not be necessary, since factor scores should
# always be in factor order
arm_ds[potential_arm] = dict(zip(self.factor_names, factor_scores))
return arm_ds
[docs] def get_current_x_counts(
self,
factor_levels: dict) -> typing.Dict[str, dict]:
"""
Return the current x_ijk (arm counts within each factor), when
considering a participant with the given ``factor_levels``.
:param factor_levels: A dictionary that maps from factor names to
factor levels, giving the participant's factor levels for each
factor used in the trial.
:type factor_levels: dict
:return:
"""
if not all(f in factor_levels for f in self.factor_names):
raise ValueError("All factors in the trial must be "
"in factor_levels.")
factor_x_counts = {}
for i_factor, j_level in factor_levels.items():
current_index = self.factor_names.index(i_factor)
current_x_ijk = {a: 0 for a in self.arm_names}
for factor_key in self._count_table:
if factor_key[current_index] == j_level:
_dict_add_inplace(current_x_ijk,
self._count_table[factor_key])
# Divide by allocation ratios to get weighted counts
# (has no effect when ratios are all 1)
_dict_div_inplace(current_x_ijk, self._arm_ratios)
factor_x_counts[i_factor] = current_x_ijk
return factor_x_counts
def _get_count_after_assignment(self, arm_counts: dict,
arm_name: str) -> dict:
"""
Take the current x_ijk (arm_counts), and return the count
after assigning to a potential treatment (arm_name).
"""
new_counts = arm_counts.copy()
new_counts[arm_name] += 1
return new_counts
[docs] def reset_counts_to_zero(self):
"""
Reset the counts of assigned participants to zero, i.e.
starting over as if no participants had been assigned.
"""
self._count_table = self._create_count_table()