Commit a042b728 authored by tchervec's avatar tchervec
Browse files

Merge branch '27-correctly-rounding-expansion-factors-after-scaling' into 'develop'

Resolve "Correctly rounding expansion factors after scaling"

See merge request ivt-vpl/populations/ch-zh-synpop!93
parents b4bdfa36 0d7497b0
**3.0.0**
- Duplicate agents after IPU using Truncate-Replicate-Sample method
- Use WMAPE and WMAE as IPU convergence criteria
- Update to `java 11.0.x`
- Update environment set-up on servers
- Refactor IPU fitting
......
...@@ -74,9 +74,11 @@ class FittingProblem: ...@@ -74,9 +74,11 @@ class FittingProblem:
class IPUSolver: class IPUSolver:
def __init__(self, group_tol=1e-3, individual_tol=1e-3, max_iter=2000): def __init__(self, group_rel_tol=1e-3, group_abs_tol=10, ind_rel_tol=1e-3, ind_abs_tol=10, max_iter=2000):
self.group_tol = group_tol self.group_rel_tol = group_rel_tol
self.individual_tol = individual_tol self.group_abs_tol = group_abs_tol
self.ind_rel_tol = ind_rel_tol
self.ind_abs_tol = ind_abs_tol
self.max_iter = max_iter self.max_iter = max_iter
def _group_fit(self, df, group_controls, group_id): def _group_fit(self, df, group_controls, group_id):
...@@ -117,11 +119,12 @@ class IPUSolver: ...@@ -117,11 +119,12 @@ class IPUSolver:
def _is_converged(self, df, group_controls, group_id, individual_controls): def _is_converged(self, df, group_controls, group_id, individual_controls):
if (len(group_controls) == 0): if len(group_controls) == 0:
return True return True
# compute WMAPE at group level # compute WMAPE and WMAE at group level
nominator = 0 nominator_wmape = 0
nominator_wmae = 0
denominator = 0 denominator = 0
for group_control in group_controls: for group_control in group_controls:
...@@ -129,16 +132,19 @@ class IPUSolver: ...@@ -129,16 +132,19 @@ class IPUSolver:
f_group = group_control[1] f_group = group_control[1]
total = np.sum(df[f_group][[group_id, "expansion_factor"]].drop_duplicates(group_id)["expansion_factor"]) total = np.sum(df[f_group][[group_id, "expansion_factor"]].drop_duplicates(group_id)["expansion_factor"])
nominator += np.abs(total - weight) nominator_wmape += np.abs(total - weight)
nominator_wmae += np.abs(total - weight) * np.abs(weight)
denominator += np.abs(weight) denominator += np.abs(weight)
wmape = nominator / denominator wmape = nominator_wmape / denominator
wmae = nominator_wmae / denominator
if wmape > self.group_tol: if wmape > self.group_rel_tol and wmae > self.group_abs_tol:
return False return False
# compute WMAPE at individual level # compute WMAPE and WMAE at individual level
nominator = 0 nominator_wmape = 0
nominator_wmae = 0
denominator = 0 denominator = 0
for individual_control in individual_controls: for individual_control in individual_controls:
...@@ -146,19 +152,19 @@ class IPUSolver: ...@@ -146,19 +152,19 @@ class IPUSolver:
f_individual = individual_control[1] f_individual = individual_control[1]
total = np.sum(df[f_individual]["expansion_factor"]) total = np.sum(df[f_individual]["expansion_factor"])
nominator += np.abs(total - weight) nominator_wmape += np.abs(total - weight)
nominator_wmae += np.abs(total - weight) * np.abs(weight)
denominator += np.abs(weight) denominator += np.abs(weight)
wmape = nominator / denominator wmape = nominator_wmape / denominator
wmae = nominator_wmae / denominator
if wmape > self.individual_tol: if wmape > self.ind_rel_tol and wmae > self.ind_abs_tol:
return False return False
return True return True
def fit(self, args): def fit(self, problem):
index, problem = args
df = problem.df df = problem.df
group_controls = problem.group_controls group_controls = problem.group_controls
...@@ -166,10 +172,16 @@ class IPUSolver: ...@@ -166,10 +172,16 @@ class IPUSolver:
individual_controls = problem.individual_controls individual_controls = problem.individual_controls
for i in range(self.max_iter): for i in range(self.max_iter):
# group fit and check convergence
df = self._group_fit(df=df, group_controls=group_controls, group_id=group_id) df = self._group_fit(df=df, group_controls=group_controls, group_id=group_id)
df = self._individual_fit(df=df, controls=individual_controls) if self._is_converged(df=df, group_controls=group_controls, group_id=group_id,
individual_controls=individual_controls):
return df, True
if self._is_converged(df=df, group_controls=group_controls, group_id=group_id, individual_controls=individual_controls): # individual fit and check convergence
df = self._individual_fit(df=df, controls=individual_controls)
if self._is_converged(df=df, group_controls=group_controls, group_id=group_id,
individual_controls=individual_controls):
return df, True return df, True
return df, False return df, False
...@@ -66,45 +66,28 @@ def execute(context): ...@@ -66,45 +66,28 @@ def execute(context):
problem = FittingProblem(df, group_controls, group_id, individual_controls, individual_id) problem = FittingProblem(df, group_controls, group_id, individual_controls, individual_id)
problems.append(problem) problems.append(problem)
print("Constructed %d IPU fitting problems." % len(problems))
print("Starting IPU.")
# Run IPU algorithm in parallel # Run IPU algorithm in parallel
with context.progress(label="Performing IPU on STATPOP by canton...", total=len(problems)): with context.progress(label="Performing IPU on STATPOP by canton...", total=len(problems)):
with context.parallel(processes=processes) as parallel: with context.parallel(processes=processes) as parallel:
df_results, convergence = [], [] df_households, convergence = [], []
for df_result_item, convergence_item in parallel.imap_unordered(process, enumerate(problems)): for df_household_item, convergence_item in parallel.imap_unordered(process, problems):
df_results.append(df_result_item) df_households.append(df_household_item)
convergence.append(convergence_item) convergence.append(convergence_item)
df_statpop = pd.concat(df_results).drop("household_size_class_projection", axis=1) df_households = pd.concat(df_households)
print("Convergence rate:", np.round(np.mean(convergence), 3)) print("Convergence rate:", np.round(np.mean(convergence), 3))
# TODO: The expansion factors are rounded here by simply taking first the integer part # Generate new unique ids
# as the base value and the remainder as a probability of have an extra household. print("Generating new household ids.")
# An array of random doubles is then generated and compared to these probabilities to decide whether to add
# this remaining household. However, KM used the "Truncate-Replicate-Sample" method in his version. We should
# consider this maybe in the future.
print("Duplicating STATPOP households based on expansion factors obtained by IPU.")
df_household_expansion_factors = df_statpop[["household_id", "expansion_factor"]].drop_duplicates(
"household_id")
probability = (df_household_expansion_factors["expansion_factor"] - np.floor(
df_household_expansion_factors["expansion_factor"])).values
df_household_expansion_factors["expansion_factor"] = np.floor(
df_household_expansion_factors["expansion_factor"])
df_household_expansion_factors["expansion_factor"] += np.random.random(size=(len(probability),)) < probability
del df_statpop["expansion_factor"]
df_statpop = pd.merge(df_statpop, df_household_expansion_factors, on="household_id")
# duplicate households
df_households = df_statpop[["household_id", "expansion_factor"]].drop_duplicates("household_id")
indices = np.repeat(np.arange(df_households.shape[0]),
df_households["expansion_factor"].astype(np.int64).values)
df_households = df_households.iloc[indices]
df_households["household_id_new"] = np.arange(df_households.shape[0]) + 1 df_households["household_id_new"] = np.arange(df_households.shape[0]) + 1
del df_households["expansion_factor"] del df_statpop["household_id"]
# merge duplicated households back into statpop # Merge the new household ids onto statpop by statpop_household_id (i.e. original id)
print("Generating new household ids.") df_statpop = pd.merge(df_statpop, df_households, on="statpop_household_id")
df_statpop = pd.merge(df_statpop, df_households, on="household_id").drop("expansion_factor", axis=1)
df_statpop["household_id"] = df_statpop["household_id_new"] df_statpop["household_id"] = df_statpop["household_id_new"]
del df_statpop["household_id_new"] del df_statpop["household_id_new"]
...@@ -121,9 +104,48 @@ def execute(context): ...@@ -121,9 +104,48 @@ def execute(context):
return df_statpop return df_statpop
def process(context, args): def process(context, problem):
ipu_solver = IPUSolver(group_tol=1e-4, individual_tol=1e-4, max_iter=2000)
result, convergence = ipu_solver.fit(args) # Create the IPU solver
ipu_solver = IPUSolver(group_rel_tol=1e-4, group_abs_tol=1, ind_rel_tol=1e-5, ind_abs_tol=10, max_iter=2000)
# Fit the problem, which results a df with expansion factors and whether the algorithm converged
df_result, convergence = ipu_solver.fit(problem)
df_households = []
# Integerize the results using the "Truncate-Replicate-Sample" method
# We loop through the groups here to get a better fit by household size
# because we know they are mutually exclusive (but this is not general)
for i, group_control in enumerate(problem.group_controls):
group_weight = group_control[0]
group_filter = group_control[1]
# 1) Truncate - here we compute both the integer part (count) and remainder of the weight for each household
df_hh_group = (df_result[group_filter][["household_id", "statpop_household_id", "expansion_factor"]]
.drop_duplicates("household_id"))
weights = df_hh_group["expansion_factor"].values
counts = np.floor(weights).astype(np.int)
remainders = weights - counts
# 2) Replicate - We duplicate the households based on the count
indices = np.repeat(list(df_hh_group.index), counts)
df_replicate = df_hh_group.loc[indices]
# 3) Sample - We sample the required remaining households based on the remainders without replacement
indices = np.random.choice(list(df_hh_group.index), int(np.round(np.sum(weights) - np.sum(counts))),
replace=False, p=(remainders / np.sum(remainders)))
df_sample = df_hh_group.loc[indices]
# We combine the replicated and sampled households
df = pd.concat([df_replicate, df_sample])
# We add them to our list by group
df_households.append(df)
df_households = pd.concat(df_households).drop("expansion_factor", axis=1)
df_households["household_id"] = np.arange(df_households.shape[0]) + 1
context.progress.update() context.progress.update()
return result, convergence # return only duplicated households with new and original ids
return df_households, convergence
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment