Commit f69fb0b5 authored by tchervec

Merge branch '54-port-pipeline-to-synpp' into develop

parents 7295912e 6c746bbc
import numpy as np

def configure(context):
    context.stage("data.statpop.scaled")

def execute(context):
    df = context.stage("data.statpop.scaled")

    if "input_downsampling" in context.config:
        probability = context.config("input_downsampling")
        print("Downsampling (%f)" % probability)

        household_ids = np.unique(df["household_id"])
......
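        # A possible continuation of the elided downsampling step (hypothetical
        # sketch, not the committed code): keep each household with the
        # configured probability and restrict df to the surviving households.
        #
        #     f = np.random.random(len(household_ids)) < probability
        #     df = df[df["household_id"].isin(household_ids[f])]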
import numpy as np
import pandas as pd

import data.constants as c
import data.utils

def impute(df):
    df_head = pd.DataFrame(df)
......

def configure(context):
    context.config("data_path")

def execute(context):
    data_path = context.config("data_path")

    import lzma as xz
    import data.utils

    with xz.open("%s/statpop/STATPOP_2012_PHH.csv.xz" % data_path) as f:
        fields = {
            "householdIdNum": int,
            "Plausibel": int
        }

        renames = {
            "householdIdNum": "household_id",
            "Plausibel": "plausible"
        }

        return data.utils.read_csv(context, f, fields, renames, total=3488739)
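
# For orientation, a minimal sketch of what data.utils.read_csv presumably looks
# like after the synpp port: stream the CSV in chunks, apply the dtype and rename
# mappings, and report progress through the stage context. This is an assumption
# for illustration; the repository's actual implementation may differ.
def read_csv_sketch(context, f, fields, renames, total=None, chunk_size=10240):
    import pandas as pd

    frames = []
    with context.progress(total=total, label="Reading CSV ...") as progress:
        for chunk in pd.read_csv(f, usecols=list(fields.keys()), dtype=fields,
                                 chunksize=chunk_size):
            frames.append(chunk.rename(columns=renames))
            progress.update(len(chunk))  # assuming update() accepts an increment

    return pd.concat(frames)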

def configure(context):
    context.config("data_path")

def execute(context):
    data_path = context.config("data_path")

    import lzma as xz
    import data.utils

    with xz.open("%s/statpop/STATPOP_2012_Link_Pers_HH.csv.xz" % data_path) as f:
        fields = {
            "personPseudoID": int,
            "householdIdNum": int,
......
            "REPORTINGMUNICIPALITYID": "municipality_id"
        }

        return data.utils.read_csv(context, f, fields, renames, total=8261094)

import numpy as np
import pandas as pd

def add_expansion_factor_column(df):
    # Initialize the expansion factor column if it is not present yet
    if "expansion_factor" not in list(df.columns):
        df["expansion_factor"] = 1.0
    return df

def check_control_has_weight_column(controls):
    for control in controls:
        if "weight" not in list(control.columns):
            raise Exception("Each control dataframe must have a weight column!")
    return controls

class FittingProblem:
    def __init__(self, df, group_controls, group_id,
                 individual_controls=None, individual_id=""):
        if individual_controls is None:
            individual_controls = []

        self.df = add_expansion_factor_column(df)
        self.group_controls = check_control_has_weight_column(group_controls)
        self.group_id = group_id
        self.individual_controls = check_control_has_weight_column(individual_controls)
        self.individual_id = individual_id

def compute_filters(fitting_problem):
    df = fitting_problem.df
    group_controls = []
    individual_controls = []
......
    # create filters for group level controls
    for control in fitting_problem.group_controls:
        for _, row in control.iterrows():
            group_control = [row["weight"]]

            # build a filter selecting all rows that match the control values
            f = np.ones(df.shape[0], dtype=bool)
            for c in list(row.drop("weight").index):
                f &= (df[c] == row[c])

            group_control.append(f)
            group_controls.append(group_control)

    # create filters for individual level controls
    for control in fitting_problem.individual_controls:
        for _, row in control.iterrows():
            individual_control = [row["weight"]]

            # build a filter to select all individuals that match current control values
            f_individual = np.ones(df.shape[0], dtype=bool)
            for c in list(row.drop("weight").index):
                f_individual &= (df[c] == row[c])

            individual_control.append(f_individual)

            # select group ids corresponding to the individuals to rescale
            group_ids = list(df[f_individual][fitting_problem.group_id].unique())
            f_group = df[fitting_problem.group_id].isin(group_ids)

            individual_control.append(f_group)
            individual_controls.append(individual_control)

    return group_controls, individual_controls
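
# Worked example of the filter structure (hypothetical numbers): a group control
# row {"household_size": 2, "weight": 1500.0} becomes [1500.0, mask], where mask
# flags every row of df with household_size == 2. An individual control row
# {"sex": 1, "age_class": 3, "weight": 800.0} becomes [800.0, f_individual,
# f_group]: f_individual flags the matching persons, while f_group additionally
# flags all other members of their households, since IPU rescales whole groups.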

def group_fit(df, group_controls, group_id):
    for group_control in group_controls:
        group_weight = group_control[0]
......
        df = group_adjust(df, group_filter, group_weight, group_id)

    return df

def group_adjust(df, group_filter, group_weight, group_id):
    # rescale expansion factors so the selected groups sum to the control weight
    total = np.sum(df[group_filter][[group_id, "expansion_factor"]].drop_duplicates(group_id)["expansion_factor"])
......
    return df

def individual_fit(df, controls, group_id, algorithm="ipu"):
    for control in controls:
        weight = control[0]
        f_individual = control[1]
        f_group = control[2]

        if algorithm == "ipu":
            df = individual_adjust_ipu(df, f_individual, f_group, weight)
        elif algorithm == "ent":
            df = individual_adjust_ent(df, control, group_id)

    return df

def individual_adjust_ipu(df, f_individual, f_group, weight):
    # compute scaling factor
    total = np.sum(df[f_individual]["expansion_factor"])
    r = weight / total

    # assign to groups
    df.loc[f_group, "r_factor"] = r
    df.loc[f_group, "expansion_factor"] *= r

    return df

def individual_adjust_ent(df, group_id, f, weight):
    # entropy-based adjustment is not implemented yet
    return df

def is_converged(f, r, tol_abs, tol_rel):
    if np.all(f * np.abs(1 - 1 / r) < tol_abs) and np.all(np.abs(1 - r) < tol_rel):
        print("Expansion factors have converged.")
        return True
    return False
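
# Numeric example of the criterion (hypothetical values): for an entry with
# f = 1000 and r = 1.0005, the absolute term is 1000 * |1 - 1/1.0005| ~ 0.5,
# which fails tol_abs = 1e-3, while |1 - r| = 0.0005 passes tol_rel = 1e-3;
# both conditions must hold for every entry before the fit stops early.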

def parallel_fit(context, args):
    index, task = args
    fitting_problem, algorithm, tol_abs, tol_rel, max_iter = task

    df = fitting_problem.df
    group_controls, individual_controls = compute_filters(fitting_problem)

    with context.progress(total=max_iter, position=index, label="progress #%d" % index) as progress:
        for i in range(max_iter):
            df["r_factor"] = 1.0
            df = group_fit(df, group_controls, fitting_problem.group_id)

            if algorithm == "ipu":
                df = individual_fit(df=df, controls=individual_controls,
                                    group_id=fitting_problem.group_id, algorithm="ipu")
            elif algorithm == "ent":
                df = individual_fit(df=df, controls=fitting_problem.individual_controls,
                                    group_id=fitting_problem.group_id, algorithm="ent")

            progress.update()

            if is_converged(f=df["expansion_factor"], r=df["r_factor"], tol_abs=tol_abs, tol_rel=tol_rel):
                df = df.drop("r_factor", axis=1)
                return df
......
    df = df.drop("r_factor", axis=1)
    return df

def fit(context, fitting_problem, algorithm="ipu", tol_abs=1e-3, tol_rel=1e-3, max_iter=2000, parallelize_on=None):
    tasks = []

    if parallelize_on is None:
        tasks.append((fitting_problem, algorithm, tol_abs, tol_rel, max_iter))
    else:
        categories = list(fitting_problem.df[parallelize_on].unique())

        for category in categories:
            sub_df = fitting_problem.df[fitting_problem.df[parallelize_on] == category]

            sub_group_controls = []
            for group_control in fitting_problem.group_controls:
                sub_group_controls.append(group_control[group_control[parallelize_on] == category])

            sub_individual_controls = []
            for individual_control in fitting_problem.individual_controls:
                sub_individual_controls.append(individual_control[individual_control[parallelize_on] == category])

            sub_problem = FittingProblem(df=sub_df,
                                         group_controls=sub_group_controls,
                                         group_id=fitting_problem.group_id,
                                         individual_controls=sub_individual_controls,
                                         individual_id=fitting_problem.individual_id)

            tasks.append((sub_problem, algorithm, tol_abs, tol_rel, max_iter))

    print("Fitting data to controls...")

    with context.parallel() as parallel:
        # parallel_fit expects (index, task) pairs, so enumerate the task list
        result = parallel.map(parallel_fit, list(enumerate(tasks)))

    return pd.concat(result)
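
# A hedged usage sketch mirroring how data.statpop.scaled drives this module
# from a synpp stage (dataframe names are hypothetical; "context" is the stage
# context handed to execute()):
#
#     problem = FittingProblem(
#         df=df_persons,                           # person rows carrying household_id
#         group_controls=[df_household_controls],  # each control needs a "weight" column
#         group_id="household_id",
#         individual_controls=[df_population_controls],
#         individual_id="person_id")
#
#     df_fitted = fit(context, problem, algorithm="ipu",
#                     tol_abs=1e-2, tol_rel=1e-2, max_iter=100,
#                     parallelize_on="canton_id")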

def configure(context):
    context.config("data_path")

def execute(context):
    data_path = context.config("data_path")

    import lzma as xz
    import data.utils

    with xz.open("%s/statpop/STATPOP_2012_Personen.csv.xz" % data_path) as f:
        fields = {
            "personPseudoID": int,
            "SEX": int,
            "AGE": int,
            "MARITALSTATUS": int,
            "NATIONALITYCATEGORY": int,
            "GEOCOORDN": float,
            "GEOCOORDE": float,
            "POPULATIONTYPE": int,
            "TYPEOFRESIDENCE": int,
            "REPORTINGMUNICIPALITYID": int,
            "FEDERALBUILDINGID": int,
        }

        renames = {
            "personPseudoID": "person_id",
            "SEX": "sex",
            "AGE": "age",
            "MARITALSTATUS": "marital_status",
            "NATIONALITYCATEGORY": "nationality",
            "GEOCOORDN": "home_y",
            "GEOCOORDE": "home_x",
            "POPULATIONTYPE": "population_type",
            "TYPEOFRESIDENCE": "type_of_residence",
            "REPORTINGMUNICIPALITYID": "municipality_id",
            "FEDERALBUILDINGID": "federal_building_id",
        }

        return data.utils.read_csv(context, f, fields, renames, total=8261094)

import numpy as np
import pandas as pd

import data.constants as c
CANTON_TO_ID = {
......
CANTON_TO_ID_MULTILANGUAGE = {"Zürich": 1,
......
                              "Genève": 25,
                              "Jura": 26}

def configure(context):
    context.config("data_path")
    context.config("scaling_year")

def execute(context):
    data_path = context.config("data_path")

    # Select year in the future to project to
    scaling_year = np.max([c.BASE_SCALING_YEAR, context.config("scaling_year")])

    if scaling_year < c.BASE_PROJECTED_YEAR:
        # Load csv for historical data
        df_households = pd.read_csv(
            "%s/projections/households/px-x-0102020000_402.csv" % data_path,
            sep=";", encoding="latin1", skiprows=1).rename({
                'Kanton (-) / Bezirk (>>) / Gemeinde (......)': "canton_id"
            }, axis=1)
......
        # Load excel for projections
        df_households = pd.read_excel(
            "%s/projections/households/su-d-01.03.03.03.01.xlsx" % data_path,
            header=[0, 1], skiprows=2, nrows=27, index_col=0).reset_index().rename({
                "index": "canton_id",
                "Total": "total",
......

import numpy as np
import pandas as pd

import data.constants as c
CANTON_TO_ID = {"Zürich": 1,
......@@ -29,20 +30,20 @@ CANTON_TO_ID = {"Zürich": 1,
"Genève": 25,
"Jura": 26}

def configure(context):
    context.config("data_path")
    context.config("scaling_year")

def execute(context):
    data_path = context.config("data_path")

    # Select year in the future to project to
    scaling_year = np.max([c.BASE_SCALING_YEAR, context.config("scaling_year")])

    if scaling_year < c.BASE_PROJECTED_YEAR:
        # load csv with historical data
        df = pd.read_csv("%s/projections/population/px-x-0102010000_101.csv" % data_path, sep=";",
                         encoding="latin1", skiprows=1).rename({
            "Kanton (-) / Bezirk (>>) / Gemeinde (......)": "canton_id",
            "Jahr": "year",
......
    else:
        # load csv projection data
        df = pd.read_csv("%s/projections/population/px-x-0104020000_101.csv" % data_path, sep=";",
                         encoding="latin1", skiprows=1).rename({
            "Kanton": "canton_id",
            "Staatsangehörigkeit (Kategorie)": "nationality",
......

import numpy as np
import pandas as pd

import data.constants as c
from data.statpop.multilevelipf import multilevelipf

def configure(context):
    context.config("enable_scaling", default=False)
    context.config("scaling_year", default=c.BASE_SCALING_YEAR)

    context.stage("data.statpop.statpop")
    context.stage("data.statpop.projections.households")
    context.stage("data.statpop.projections.population")

def execute(context):
    df_statpop = context.stage("data.statpop.statpop")

    if context.config("enable_scaling"):
        df_household_controls = context.stage("data.statpop.projections.households")
        df_population_controls = context.stage("data.statpop.projections.population")
......
            group_controls=[df_household_controls], group_id="household_id",
            individual_controls=[df_population_controls], individual_id="person_id")

        # perform fitting (the fit function needs the stage context for
        # progress reporting and parallelization)
        df_statpop = multilevelipf.fit(context, problem, algorithm="ipu", tol_abs=1e-2, tol_rel=1e-2,
                                       max_iter=100, parallelize_on="canton_id")

        del df_statpop["household_size_class_projection"]

        # TODO: The expansion factors are rounded here by simply taking first the integer part
......
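        # A minimal sketch of the truncation the TODO above describes (assumed
        # reconstruction of the elided lines, not the committed code):
        #
        #     integer_part = np.floor(df_statpop["expansion_factor"])
        #     remainder = df_statpop["expansion_factor"] - integer_part
        #     extra = np.random.random(len(df_statpop)) < remainder
        #     df_statpop["expansion_factor"] = (integer_part + extra).astype(int)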

import numpy as np
import pandas as pd

import data.constants as c
import data.spatial.cantons
import data.spatial.municipalities
import data.spatial.municipality_types
import data.spatial.ovgk
import data.spatial.utils
import data.spatial.zones
import data.statpop.density
import data.statpop.head_of_household
import data.utils

def configure(context):
    context.stage("data.statpop.persons")
    context.stage("data.statpop.households")
    context.stage("data.statpop.link")

    context.stage("data.spatial.municipalities")
    context.stage("data.spatial.quarters")
    context.stage("data.spatial.zones")
    context.stage("data.spatial.municipality_types")
    context.stage("data.statpop.density")
    context.stage("data.spatial.cantons")
    context.stage("data.spatial.ovgk")

def execute(context):
    df_persons = context.stage("data.statpop.persons")
......
    df_persons = df_persons[df_persons["population_type"] == 1]

    # Merge STATPOP persons and households into a list of persons with household attributes
    df = pd.merge(df_persons, df_link, on=("person_id", "municipality_id"))
    df = pd.merge(df, df_households, on="household_id")

    # Impute the household size for each STATPOP person
    df_size = df.groupby("household_id").size().reset_index(name="household_size")
    df = pd.merge(df, df_size, on="household_id")

    # Only allow plausible households
    df = df[df["plausible"] == 1]
......
    df_filter = df[["household_id", "age"]].groupby("household_id").max().reset_index()
    df_filter.loc[:, "all_under_age"] = df_filter["age"] < c.MINIMUM_AGE_PER_HOUSEHOLD
    df = pd.merge(df, df_filter[["household_id", "all_under_age"]], on="household_id")