Commit 2d137b1f authored by tchervec

Refactored IPU fitting algorithm and population scaling stage

parent f8f43a78
**3.0.0**
- Refactor IPU fitting
- Impute canton id directly from shapefile
- Add config elements for generating a flowchart
- Use new pyproj syntax without `init:`
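  For reference, the `init:`-free pyproj syntax referred to above looks roughly like the sketch below; the specific CRS codes are illustrative assumptions, not taken from this commit:

  ```python
  from pyproj import Transformer

  # Old, deprecated style: pyproj.Proj(init="epsg:4326")
  # New style: pass authority strings directly, e.g. WGS84 -> CH1903+/LV95
  transformer = Transformer.from_crs("EPSG:4326", "EPSG:2056", always_xy=True)
  easting, northing = transformer.transform(8.5417, 47.3769)  # lon/lat of Zürich
  ```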
......
# General pipeline settings
working_directory: /home/tchervec/Documents/data/switzerland/cache
flowchart_path: /home/tchervec/Documents/data/switzerland/flowchart.json
dryrun: true
dryrun: false
# Requested stages
run:
# - data.microcensus.trips
# - matsim.run
# - data.statpop.projections.households
# - data.statpop.projections.population
- data.statpop.scaled
# These are configuration options that we use in the pipeline
config:
threads: 4
random_seed: 0
hot_deck_matching_runners: 2
disable_progress_bar: false
java_memory: 10G
......
import numpy as np
import pandas as pd
def add_expansion_factor_column(df):
......@@ -15,67 +14,74 @@ def check_control_has_weight_column(controls):
return controls
class FittingProblem:
def __init__(self, df, group_controls, group_id,
individual_controls=None, individual_id=""):
if individual_controls is None:
individual_controls = []
self.df = add_expansion_factor_column(df)
self.group_controls = check_control_has_weight_column(group_controls)
self.group_id = group_id
self.individual_controls = check_control_has_weight_column(individual_controls)
self.individual_id = individual_id
def compute_filters(fitting_problem):
df = fitting_problem.df
group_controls = []
individual_controls = []
def compute_group_filters(df, group_controls):
# create filters for group level controls
for control in fitting_problem.group_controls:
group_filters = []
for control in group_controls:
for _, row in control.iterrows():
group_control = [row["weight"]]
group_filter = [row["weight"]]
# build filter
f = np.ones(df.shape[0], dtype=np.bool)
for c in list(row.drop("weight").index):
f &= (df[c] == row[c])
group_control.append(f)
group_controls.append(group_control)
group_filter.append(f)
group_filters.append(group_filter)
return group_filters
def compute_individual_filters(df, group_id, individual_controls):
# create filters for individual level controls
for control in fitting_problem.individual_controls:
individual_filters = []
for control in individual_controls:
for _, row in control.iterrows():
individual_control = [row["weight"]]
individual_filter = [row["weight"]]
# build a filter to select all individuals that match current control values
f_individual = np.ones(df.shape[0], dtype=np.bool)
for c in list(row.drop("weight").index):
f_individual &= (df[c] == row[c])
individual_control.append(f_individual)
individual_filter.append(f_individual)
# select group ids corresponding to individuals to rescale
group_ids = list(df[f_individual][fitting_problem.group_id].unique())
f_group = df[fitting_problem.group_id].isin(group_ids)
group_ids = list(df[f_individual][group_id].unique())
f_group = df[group_id].isin(group_ids)
individual_filter.append(f_group)
individual_filters.append(individual_filter)
return individual_filters
individual_control.append(f_group)
individual_controls.append(individual_control)
return group_controls, individual_controls
class FittingProblem:
def __init__(self, df, group_controls, group_id, individual_controls=None, individual_id=""):
if individual_controls is None:
individual_controls = []
self.df = df
self.group_controls = group_controls
self.group_id = group_id
self.individual_controls = individual_controls
self.individual_id = individual_id
class IPUSolver:
def __init__(self, tol_abs=1e-3, tol_rel=1e-3, max_iter=2000):
self.tol_abs = tol_abs
self.tol_rel = tol_rel
self.max_iter = max_iter
def group_fit(df, group_controls, group_id):
def _group_fit(self, df, group_controls, group_id):
for group_control in group_controls:
group_weight = group_control[0]
group_filter = group_control[1]
df = group_adjust(df, group_filter, group_weight, group_id)
df = self._group_adjust(df, group_filter, group_weight, group_id)
return df
def group_adjust(df, group_filter, group_weight, group_id):
@staticmethod
def _group_adjust(df, group_filter, group_weight, group_id):
# rescale expansion factors
total = np.sum(df[group_filter][[group_id, "expansion_factor"]].drop_duplicates(group_id)["expansion_factor"])
r = group_weight / total
......@@ -84,21 +90,17 @@ def group_adjust(df, group_filter, group_weight, group_id):
return df
def individual_fit(df, controls, group_id, algorithm="ipu"):
def _individual_fit(self, df, controls):
for control in controls:
weight = control[0]
individual_filter = control[1]
group_filter = control[2]
df = self._individual_adjust(df, individual_filter, group_filter, weight)
if algorithm is "ipu":
df = individual_adjust_ipu(df, individual_filter, group_filter, weight)
elif algorithm is "ent":
df = individual_adjust_ent(df, control, group_id)
return df
def individual_adjust_ipu(df, f_individual, f_group, weight):
@staticmethod
def _individual_adjust(df, f_individual, f_group, weight):
# compute scaling factor
total = np.sum(df[f_individual]["expansion_factor"])
r = weight / total
......@@ -109,75 +111,29 @@ def individual_adjust_ipu(df, f_individual, f_group, weight):
return df
def individual_adjust_ent(df, group_id, f, weight):
return df
def is_converged(f, r, tol_abs, tol_rel):
if np.all(f * np.abs(1 - 1 / r) < tol_abs) and np.all(np.abs(1 - r) < tol_rel):
print("Expansion factors have converged.")
def _is_converged(self, f, r):
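# Descriptive note (not part of the original commit): the solver is considered converged when,
# for every row, the absolute change implied by the last rescaling (f * |1 - 1/r|) is below
# tol_abs and the rescaling factor r itself lies within tol_rel of 1.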
if np.all(f * np.abs(1 - 1 / r) < self.tol_abs) and np.all(np.abs(1 - r) < self.tol_rel):
return True
return False
def fit(self, args):
def parallel_fit(context, args):
index, task = args
fitting_problem, algorithm, tol_abs, tol_rel, maxiter = task
index, problem = args
df = fitting_problem.df
group_controls, individual_controls = compute_filters(fitting_problem)
df = problem.df
group_controls = problem.group_controls
group_id = problem.group_id
individual_controls = problem.individual_controls
with context.progress(total=maxiter, position=index, label="progress #%s" % str(index)) as progress:
for i in range(maxiter):
for i in range(self.max_iter):
df["r_factor"] = 1.0
df = group_fit(df, group_controls, fitting_problem.group_id)
if algorithm == "ipu":
df = individual_fit(df=df, controls=individual_controls,
group_id=fitting_problem.group_id, algorithm="ipu")
elif algorithm == "ent":
df = individual_fit(df=df, controls=fitting_problem.individual_controls,
group_id=fitting_problem.group_id, algorithm="ent")
progress.update()
df = self._group_fit(df=df, group_controls=group_controls, group_id=group_id)
df = self._individual_fit(df=df, controls=individual_controls)
if is_converged(f=df["expansion_factor"], r=df["r_factor"], tol_abs=tol_abs, tol_rel=tol_rel):
if self._is_converged(f=df["expansion_factor"], r=df["r_factor"]):
df = df.drop("r_factor", axis=1)
return df
return df, True
print("Reached max iteration ", maxiter)
df = df.drop("r_factor", axis=1)
return df
def fit(context, fitting_problem, algorithm="ipu", tol_abs=1e-3, tol_rel=1e-3, max_iter=2000, parallelize_on=None):
tasks = []
if parallelize_on is None:
tasks.append((fitting_problem, algorithm, tol_abs, tol_rel, max_iter))
else:
categories = list(fitting_problem.df[parallelize_on].unique())
for category in categories:
sub_df = fitting_problem.df[fitting_problem.df[parallelize_on] == category]
sub_group_controls = []
for group_control in fitting_problem.group_controls:
sub_group_controls.append(group_control[group_control[parallelize_on] == category])
sub_individual_controls = []
for individual_control in fitting_problem.individual_controls:
sub_individual_controls.append(individual_control[individual_control[parallelize_on] == category])
sub_problem = FittingProblem(df=sub_df,
group_controls=sub_group_controls,
group_id=fitting_problem.group_id,
individual_controls=sub_individual_controls,
individual_id=fitting_problem.individual_id)
tasks.append((sub_problem, algorithm, tol_abs, tol_rel, max_iter))
print("Fitting to data to controls...")
with context.parallel() as parallel:
result = parallel.map(parallel_fit, tasks)
return pd.concat(result)
return df, False
......@@ -59,10 +59,12 @@ CANTON_TO_ID_MULTILANGUAGE = {"Zürich": 1,
"Genève": 25,
"Jura": 26}
def configure(context):
context.config("data_path")
context.config("scaling_year")
def execute(context):
data_path = context.config("data_path")
......@@ -72,11 +74,10 @@ def execute(context):
if scaling_year < c.BASE_PROJECTED_YEAR:
# Load csv for historical data
df_households = pd.read_csv(
"%s/projections/households/px-x-0102020000_402.csv" % data_path,
sep=";", encoding="latin1", skiprows=1).rename({
'Kanton (-) / Bezirk (>>) / Gemeinde (......)':"canton_id"
}, axis=1)
df_households = (pd.read_csv("%s/projections/households/px-x-0102020000_402.csv" % data_path,
sep=";", encoding="latin1", skiprows=1)
.rename({'Kanton (-) / Bezirk (>>) / Gemeinde (......)': "canton_id"}, axis=1)
)
# Convert to long format
df_households = df_households.melt(
......@@ -103,7 +104,8 @@ def execute(context):
# TODO: why do we only use five categories?
# Make zero-based with only 5 categories
df_households["household_size"] = np.minimum(5, df_households["household_size"]) - 1
df_households = df_households.groupby(["canton_id", "household_size"]).sum().reset_index().sort_values(["canton_id", "household_size"])
df_households = df_households.groupby(["canton_id", "household_size"]).sum().reset_index().sort_values(
["canton_id", "household_size"])
df_households = df_households.rename({"household_size": "household_size_class_projection"}, axis=1)
df_households = df_households.sort_values(["canton_id", "household_size_class_projection"])
......@@ -112,18 +114,18 @@ def execute(context):
# Load excel for projections
df_households = pd.read_excel(
"%s/projections/households/su-d-01.03.03.03.01.xlsx" % data_path,
header=[0,1], skiprows = 2, nrows = 27, index_col = 0).reset_index().rename({
header=[0, 1], skiprows=2, nrows=27, index_col=0).reset_index().rename({
"index": "canton_id",
"Total": "total",
"1 Person": 1,
"2 Personen": 2,
"3 und mehr Personen": 3
}, axis = 1)
}, axis=1)
# Convert to long format
df_households = df_households.melt(
id_vars = "canton_id", value_vars = [1, 2, 3],
value_name = "weight", var_name = ["household_size", "year"]
id_vars="canton_id", value_vars=[1, 2, 3],
value_name="weight", var_name=["household_size", "year"]
)
# Remove Switzerland total
......@@ -131,7 +133,7 @@ def execute(context):
# Pivot years to columns
df_households = df_households.pivot_table(
index = ["canton_id", "household_size"], columns = ["year"]
index=["canton_id", "household_size"], columns=["year"]
)
# Add new interpolated column
......@@ -139,12 +141,12 @@ def execute(context):
lambda x: max(0, 1e3 * np.dot(
np.polyfit(
[2017, 2045],
[x[("weight"), 2017], x[("weight"), 2045]],
[x["weight", 2017], x["weight", 2045]],
1
),
[scaling_year, 1]
))
, axis = 1)
, axis=1)
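# In words (descriptive note, not part of the commit): np.polyfit fits a straight line through the
# 2017 and 2045 projected values, np.dot evaluates that line at scaling_year, the factor 1e3
# presumably converts from thousands of households to single households, and max(0, ...) clips any
# negative extrapolation to zero.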
# Reformat
df_households = df_households[("weight", scaling_year)].reset_index()
......@@ -152,10 +154,15 @@ def execute(context):
# Make zero-based
df_households["household_size"] -= 1
df_households = df_households.rename({"household_size": "household_size_class_projection"}, axis = 1)
df_households = df_households.rename({"household_size": "household_size_class_projection"}, axis=1)
# Replace cantons
df_households = df_households.replace(CANTON_TO_ID)
# round weights and convert to integer
df_households["weight"] = np.round(df_households["weight"])
df_households["weight"] = df_households["weight"].astype(int)
print(df_households.head())
return df_households
return df_households, scaling_year
......@@ -119,5 +119,10 @@ def execute(context):
df = df[("weight", scaling_year)].reset_index()
df.columns = ["canton_id", "sex", "nationality", "age_class", "weight"]
# round weights and convert to integer
df["weight"] = np.round(df["weight"])
df["weight"] = df["weight"].astype(int)
return df
print(df.head())
return df, scaling_year
......@@ -3,70 +3,127 @@ import pandas as pd
import data.constants as c
from data.statpop.multilevelipf import multilevelipf
from data.statpop.multilevelipf.multilevelipf import FittingProblem, IPUSolver
def configure(context):
context.config("enable_scaling", default=False)
context.config("scaling_year", default=c.BASE_SCALING_YEAR)
context.config("threads")
context.stage("data.statpop.statpop")
context.stage("data.statpop.projections.households")
context.stage("data.statpop.projections.population")
def execute(context):
df_statpop = context.stage("data.statpop.statpop")
if context.config("enable_scaling"):
df_household_controls = context.stage("data.statpop.projections.households")
df_population_controls = context.stage("data.statpop.projections.population")
scaling_year = context.config("scaling_year")
print("Scaling STATPOP to year", scaling_year, "using IPU.")
print(" Number of households in household controls :", df_household_controls["weight"].sum())
processes = context.config("threads")
df_household_controls, hh_year = context.stage("data.statpop.projections.households")
df_population_controls, pop_year = context.stage("data.statpop.projections.population")
print(" Number of households before scaling :", df_statpop["household_id"].unique().shape[0])
print(" Number of persons before scaling :", df_statpop["person_id"].unique().shape[0])
assert hh_year == scaling_year
assert pop_year == scaling_year
print("Number of households in household controls :", df_household_controls["weight"].sum())
print("Number of persons in population controls :", df_population_controls["weight"].sum())
print("Number of households before scaling :", len(df_statpop["household_id"].unique()))
print("Number of persons before scaling :", len(df_statpop["person_id"].unique()))
# we need to add a new household class column with only as many categories as the controls
number_household_classes = len(df_household_controls["household_size_class_projection"].unique())
df_statpop["household_size_class_projection"] = np.minimum(number_household_classes, df_statpop["household_size"]) - 1
# set up fitting problem
problem = multilevelipf.fitting_problem(df_statpop,
group_controls=[df_household_controls], group_id="household_id",
individual_controls=[df_population_controls], individual_id="person_id")
# perform fitting
df_statpop = multilevelipf.fit(None, problem, algorithm="ipu", tol_abs=1e-2, tol_rel=1e-2, max_iter=100,
parallelize_on="canton_id")
del df_statpop["household_size_class_projection"]
df_statpop["household_size_class_projection"] = np.minimum(number_household_classes,
df_statpop["household_size"]) - 1
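# Illustration (not part of the commit): with e.g. number_household_classes = 5, household sizes
# 1, 2, 3, 4, 5, 6, 7, ... map to classes 0, 1, 2, 3, 4, 4, 4, ... via min(5, size) - 1, matching
# the top-coded size classes used in the household controls.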
# create IPU fitting problem by canton
problems = []
canton_ids = list(df_statpop.sort_values("canton_id")["canton_id"].unique())
for canton_id in context.progress(canton_ids, label="Constructing separate IPU fitting problems by canton..."):
# select sub df
df = df_statpop[df_statpop["canton_id"] == canton_id].copy()
df = multilevelipf.add_expansion_factor_column(df)
# get group controls, perform checks and convert to filters
group_controls = [df_household_controls[df_household_controls["canton_id"] == canton_id]]
group_id = "household_id"
assert multilevelipf.check_control_has_weight_column(group_controls)
group_controls = multilevelipf.compute_group_filters(df, group_controls)
# get individual controls, perform checks and convert to filters
individual_controls = [df_population_controls[df_population_controls["canton_id"] == canton_id]]
individual_id = "individual_id"
assert multilevelipf.check_control_has_weight_column(individual_controls)
individual_controls = multilevelipf.compute_individual_filters(df, group_id, individual_controls)
# create fitting problem
problem = FittingProblem(df, group_controls, group_id, individual_controls, individual_id)
problems.append(problem)
# Run IPU algorithm in parallel
with context.progress(label="Performing IPU on STATPOP by canton...", total=len(problems)):
with context.parallel(processes=processes) as parallel:
df_results, convergence = [], []
for df_result_item, convergence_item in parallel.imap_unordered(process, enumerate(problems)):
df_results.append(df_result_item)
convergence.append(convergence_item)
df_statpop = pd.concat(df_results).drop("household_size_class_projection", axis=1)
print("Convergence rate:", np.round(np.mean(convergence), 3))
# TODO: The expansion factors are rounded here by first taking the integer part as the base value
# and treating the remainder as the probability of having an extra household. An array of random
# doubles is then generated and compared to these probabilities to decide whether to add the
# remaining household. However, KM used the "Truncate-Replicate-Sample" method in his version;
# we should consider adopting it in the future (a rough sketch of that method follows below).
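# --- Illustrative sketch (not part of this commit): "Truncate-Replicate-Sample" integerization ---
# Assumed behaviour of the method referenced above: truncate the weights, replicate each household
# floor(w) times, then sample the remaining households without replacement with probabilities
# proportional to the fractional parts, so the integer total matches the weighted total.
def truncate_replicate_sample(weights, random_state=None):
    rng = np.random.default_rng(random_state)
    weights = np.asarray(weights, dtype=float)
    base = np.floor(weights).astype(int)       # truncate
    remainder = weights - base                 # fractional parts
    n_extra = int(np.round(remainder.sum()))   # households still needed to reach the total
    counts = base.copy()                       # replicate
    if n_extra > 0 and remainder.sum() > 0:
        # sample the extra households without replacement, proportional to the remainders
        extra = rng.choice(len(weights), size=n_extra, replace=False,
                           p=remainder / remainder.sum())
        counts[extra] += 1
    return counts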
df_household_expansion_factors = df_statpop[["household_id", "expansion_factor"]].drop_duplicates("household_id")
probability = (df_household_expansion_factors["expansion_factor"] - np.floor(df_household_expansion_factors["expansion_factor"])).values
df_household_expansion_factors["expansion_factor"] = np.floor(df_household_expansion_factors["expansion_factor"])
df_household_expansion_factors["expansion_factor"] += np.random.random(size = (len(probability),)) < probability
print("Duplicating STATPOP households based on expansion factors obtained by IPU.")
df_household_expansion_factors = df_statpop[["household_id", "expansion_factor"]].drop_duplicates(
"household_id")
probability = (df_household_expansion_factors["expansion_factor"] - np.floor(
df_household_expansion_factors["expansion_factor"])).values
df_household_expansion_factors["expansion_factor"] = np.floor(
df_household_expansion_factors["expansion_factor"])
df_household_expansion_factors["expansion_factor"] += np.random.random(size=(len(probability),)) < probability
del df_statpop["expansion_factor"]
df_statpop = pd.merge(df_statpop, df_household_expansion_factors, on="household_id")
# duplicate households
df_households = df_statpop[["household_id", "expansion_factor"]].drop_duplicates("household_id")
indices = np.repeat(np.arange(df_households.shape[0]), df_households["expansion_factor"].astype(np.int64).values)
indices = np.repeat(np.arange(df_households.shape[0]),
df_households["expansion_factor"].astype(np.int64).values)
df_households = df_households.iloc[indices]
df_households["household_id_new"] = np.arange(df_households.shape[0]) + 1
del df_households["expansion_factor"]
# merge duplicated households back into statpop
print("Generating new household ids.")
df_statpop = pd.merge(df_statpop, df_households, on="household_id").drop("expansion_factor", axis=1)
df_statpop["household_id"] = df_statpop["household_id_new"]
del df_statpop["household_id_new"]
# sort by household id and generate new person ids
print("Generating new person ids.")
df_statpop = df_statpop.sort_values(by=["household_id", "person_id"])
df_statpop["person_id"] = np.arange(df_statpop.shape[0]) + 1
print(" Number of households after scaling :", df_statpop["household_id"].unique().shape[0])
print(" Number of persons after scaling :", df_statpop["person_id"].unique().shape[0])
print("Number of households in household controls :", df_household_controls["weight"].sum())
print("Number of persons in population controls :", df_population_controls["weight"].sum())
print("Number of households after scaling :", len(df_statpop["household_id"].unique()))
print("Number of persons after scaling :", len(df_statpop["person_id"].unique()))
return df_statpop
def process(context, args):
ipu_solver = IPUSolver(tol_abs=1e-2, tol_rel=1e-2, max_iter=100)
result, convergence = ipu_solver.fit(args)
context.progress.update()
return result, convergence