Commit e703322e authored by Grace Orowo Kagho

add weekend specific work locations

parent a0c3bd0a
import geopandas as gpd
import numpy as np
import pandas as pd
import shapely.geometry as geo
from sklearn.neighbors import KDTree
from synthesis.population.spatial.primary.weekend.components import CustomDistanceSampler, CustomDiscretizationSolver
from synthesis.population.spatial.primary.weekend.problems import find_assignment_problems
from synthesis.population.spatial.secondary.rda import AssignmentSolver, DiscretizationErrorObjective, \
GravityChainSolver
def configure(context):
    context.stage("synthesis.population.trips")
    context.stage("synthesis.population.sampled")
    context.stage("synthesis.population.spatial.home.locations")
    context.stage("synthesis.population.spatial.primary.weekend.distance_distributions")
    context.stage("synthesis.population.destinations")

    context.config("random_seed")
    context.config("threads")
    context.config("output_path")
def prepare_locations(context):
    # Load persons and their primary locations
    df_home = context.stage("synthesis.population.spatial.home.locations")
    df_home = df_home.rename(columns={"geometry": "home"})

    df_locations = context.stage("synthesis.population.sampled")[["person_id", "household_id"]]
    df_locations = pd.merge(df_locations, df_home[["household_id", "home"]], how="left", on="household_id")

    return df_locations[["person_id", "home"]].sort_values(by="person_id")
def prepare_destinations(context):
    df_destinations = context.stage("synthesis.population.destinations")
    M = np.max(df_destinations["destination_id"].values.tolist()) + 1

    data = {}
    identifiers = df_destinations["destination_id"].values
    locations = np.vstack(df_destinations["geometry"].apply(lambda x: np.array([x.x, x.y])).values)

    for purpose in ("work", "education"):
        f = df_destinations["offers_%s" % purpose].values

        data[purpose] = dict(
            identifiers=identifiers[f],
            locations=locations[f]
        )

        print(purpose, len(identifiers[f]))

    return data
def resample_cdf(cdf, factor):
    if factor >= 0.0:
        cdf = cdf * (1.0 + factor * np.arange(1, len(cdf) + 1) / len(cdf))
    else:
        cdf = cdf * (1.0 + abs(factor) - abs(factor) * np.arange(1, len(cdf) + 1) / len(cdf))

    cdf /= cdf[-1]
    return cdf
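# A hedged sketch (not called by the pipeline) of how resample_cdf deforms a
# distribution for calibration: a positive factor shifts probability mass towards
# larger values, a negative factor towards smaller ones, and the result is
# re-normalized so it remains a valid CDF. The name _demo_resample_cdf is
# illustrative only.
def _demo_resample_cdf():
    cdf = np.cumsum(np.ones(10))
    cdf /= cdf[-1]  # uniform CDF over 10 sorted values

    print(resample_cdf(cdf.copy(), 0.8))   # mass shifted towards larger values
    print(resample_cdf(cdf.copy(), -0.8))  # mass shifted towards smaller values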
def resample_distributions(distributions, factors):
    for mode, mode_distributions in distributions.items():
        for distribution in mode_distributions["distributions"]:
            distribution["cdf"] = resample_cdf(distribution["cdf"], factors[mode])
def impute_work_locations(df_trips, destinations, df_primary, distributions):
    work_coordinates = destinations["work"]["locations"]
    tree = KDTree(work_coordinates)
    home_coordinates = np.vstack(df_primary["home"].apply(lambda p: np.array([p.x, p.y])).values)

    # The radius sampling below is an assumption: invert the distance CDF to draw
    # one search radius per person, then query all candidates within that radius
    distance_cdf = distributions["cdf"]
    radius = distributions["values"][np.searchsorted(distance_cdf, np.random.random(len(home_coordinates)))]

    indices, distances = tree.query_radius(home_coordinates, r=radius, return_distance=True, sort_results=True)
    return indices, distances
def execute(context):
    # Load trips and primary locations
    df_trips = context.stage("synthesis.population.trips").sort_values(by=["person_id", "trip_index"])
    df_trips["travel_time"] = df_trips["arrival_time"] - df_trips["departure_time"]

    df_primary = prepare_locations(context)

    # Prepare data
    distance_distributions = context.stage("synthesis.population.spatial.primary.weekend.distance_distributions")
    destinations = prepare_destinations(context)

    # Resampling for calibration
    resample_distributions(distance_distributions, dict(
        car=0.8, car_passenger=1.0, pt=1.0, bike=0.0, walk=0.0
    ))

    # Segment into subsamples
    processes = context.config("threads")

    unique_person_ids = df_trips["person_id"].unique()
    number_of_persons = len(unique_person_ids)
    unique_person_ids = np.array_split(unique_person_ids, processes)

    random = np.random.RandomState(context.config("random_seed"))
    random_seeds = random.randint(10000, size=processes)

    # Create batch problems for parallelization
    batches = []

    for index in range(processes):
        batches.append((
            df_trips[df_trips["person_id"].isin(unique_person_ids[index])],
            df_primary[df_primary["person_id"].isin(unique_person_ids[index])],
            random_seeds[index]
        ))

    # Run algorithm in parallel
    with context.progress(label="Assigning locations to persons", total=number_of_persons):
        with context.parallel(processes=processes, data=dict(
            distance_distributions=distance_distributions,
            destinations=destinations
        )) as parallel:
            df_locations, df_convergence = [], []

            for df_locations_item, df_convergence_item in parallel.imap_unordered(process, batches):
                df_locations.append(df_locations_item)
                df_convergence.append(df_convergence_item)

    df_locations = pd.concat(df_locations).sort_values(by=["person_id", "trip_index"])
    df_convergence = pd.concat(df_convergence)

    print("Success rate:", df_convergence["valid"].mean())
    return df_locations, df_convergence
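# A minimal sketch (illustration only, not part of the pipeline) of the batching
# scheme used in execute: person ids are split into one chunk per process and each
# chunk receives its own seed, so each parallel batch is reproducible.
def _demo_batching():
    person_ids = np.arange(10)
    chunks = np.array_split(person_ids, 3)

    random = np.random.RandomState(0)
    seeds = random.randint(10000, size=3)

    for chunk, seed in zip(chunks, seeds):
        print(chunk, seed)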
def process(context, arguments):
    df_trips, df_primary, random_seed = arguments

    # Set up RNG (use the per-batch seed prepared in execute)
    random = np.random.RandomState(random_seed)

    # Set up distance sampler
    distance_distributions = context.data("distance_distributions")
    distance_sampler = CustomDistanceSampler(
        maximum_iterations=1000,
        random=random,
        distributions=distance_distributions)

    # Set up relaxation solver; currently, we do not consider tail problems.
    relaxation_solver = GravityChainSolver(
        random=random, eps=10.0, lateral_deviation=10.0, alpha=0.1
    )

    # Set up discretization solver
    destinations = context.data("destinations")
    discretization_solver = CustomDiscretizationSolver(destinations)

    # Set up assignment solver
    thresholds = dict(
        car=200.0, car_passenger=200.0, pt=200.0,
        bike=100.0, walk=100.0
    )

    assignment_objective = DiscretizationErrorObjective(thresholds=thresholds)
    assignment_solver = AssignmentSolver(
        distance_sampler=distance_sampler,
        relaxation_solver=relaxation_solver,
        discretization_solver=discretization_solver,
        objective=assignment_objective,
        maximum_iterations=20
    )

    df_locations = []
    df_convergence = []

    last_person_id = None

    for problem in find_assignment_problems(df_trips, df_primary):
        result = assignment_solver.solve(problem)

        starting_trip_index = problem["trip_index"]

        for index, (identifier, location) in enumerate(
                zip(result["discretization"]["identifiers"], result["discretization"]["locations"])):
            df_locations.append((
                problem["person_id"], starting_trip_index + index, identifier, geo.Point(location)
            ))

        df_convergence.append((
            result["valid"], problem["size"]
        ))

        if problem["person_id"] != last_person_id:
            last_person_id = problem["person_id"]
            context.progress.update()

    df_locations = pd.DataFrame.from_records(df_locations,
        columns=["person_id", "trip_index", "destination_id", "geometry"])
    df_locations = gpd.GeoDataFrame(df_locations, crs=dict(init="epsg:2154"))

    df_convergence = pd.DataFrame.from_records(df_convergence, columns=["valid", "size"])
    return df_locations, df_convergence
@@ -37,14 +37,27 @@ def execute(context):
    # Prepare data
    df_persons = context.stage("data.microcensus.persons")[["person_id", "person_weight", "weekend"]].rename(
        columns={"person_weight": "weight"})

    # Keep only persons engaged in weekend trips. The assumption is that Saturday and
    # Sunday share the same distance distribution for work and education trips, which
    # differs from the weekday distribution. This still needs to be confirmed.
    df_persons = df_persons[df_persons["weekend"]]

    df_trips = context.stage("data.microcensus.trips")[["person_id", "trip_id", "mode", "crowfly_distance",
                                                        "departure_time", "arrival_time", "purpose"]]
    df_trips = pd.merge(df_trips, df_persons[["person_id", "weight"]], on="person_id")

    # Calculate travel time
    df_trips["travel_time"] = df_trips["arrival_time"] - df_trips["departure_time"]
    df_trips = df_trips[df_trips["travel_time"] > 0.0]

    # Keep only trips with distances greater than zero. The reason for this still needs
    # to be confirmed; it follows the existing weekday convention.
    df_trips = df_trips[df_trips["crowfly_distance"] > 0.0]

    # Split the trip purpose into preceding (origin) and following (destination) purpose
    df_trips["following_purpose"] = df_trips["purpose"]
    df_trips["preceding_purpose"] = df_trips["purpose"].shift(1)
    df_trips.loc[df_trips["trip_id"] == 1, "preceding_purpose"] = "home"
@@ -55,38 +68,64 @@ def execute(context):
                         & df_trips["following_purpose"].isin(primary_activities))]

    # Rename columns
    #distance_column = "crowfly_distance" if "crowfly_distance" in df_trips else "network_distance"
    #df = df_trips[["travel_time", distance_column, "weight"]].rename(columns={distance_column: "distance"})
    df = df_trips[["travel_time", "crowfly_distance", "weight"]].rename(columns={"crowfly_distance": "distance"})
    # Calculate distributions
    # The location assignment uses distributions based on the mode type for the primary
    # purpose. Here, mode is not considered, and as a result mode-specific travel times
    # are not considered either.
    #modes = df["mode"].unique()

    bin_size = 200 # ToDo: find out why this value was chosen
    distributions = {}
    #for mode in modes:
    #    # First, calculate bounds by unique values
    #    f_mode = df["mode"] == mode
    #    bounds = calculate_bounds(df[f_mode]["travel_time"].values, bin_size)
    #
    #    distributions[mode] = dict(bounds=np.array(bounds), distributions=[])
    #
    #    # Second, calculate distribution per band
    #    for lower_bound, upper_bound in zip([-np.inf] + bounds[:-1], bounds):
    #        f_bound = (df["travel_time"] > lower_bound) & (df["travel_time"] <= upper_bound)
    #
    #        # Set up distribution
    #        values = df[f_mode & f_bound]["distance"].values
    #        weights = df[f_mode & f_bound]["weight"].values
    #
    #        sorter = np.argsort(values)
    #        values = values[sorter]
    #        weights = weights[sorter]
    #
    #        cdf = np.cumsum(weights)
    #        cdf /= cdf[-1]
    #
    #        # Write distribution
    #        distributions[mode]["distributions"].append(dict(cdf=cdf, values=values, weights=weights))
    # Create travel time bins (bands)
    bounds = calculate_bounds(df["travel_time"].values, bin_size)

    # Since mode is no longer distinguished, all band distributions are stored under a
    # single key (the key name "all" is an assumption; consumers must use the same key)
    distributions["all"] = dict(bounds=np.array(bounds), distributions=[])

    # Calculate distribution per band
    for lower_bound, upper_bound in zip([-np.inf] + bounds[:-1], bounds):
        f_bound = (df["travel_time"] > lower_bound) & (df["travel_time"] <= upper_bound)

        # Set up distribution
        values = df[f_bound]["distance"].values
        weights = df[f_bound]["weight"].values

        sorter = np.argsort(values)
        values = values[sorter]
        weights = weights[sorter]

        cdf = np.cumsum(weights)
        cdf /= cdf[-1]

        # Write distribution
        distributions["all"]["distributions"].append(dict(cdf=cdf, values=values, weights=weights))

    return distributions
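# A self-contained sketch (not part of the stage) of the weighted CDF construction
# used above: distances are sorted, the cumulative person weights are normalized,
# and the CDF can later be inverted with np.searchsorted to draw distances.
def _demo_weighted_cdf():
    values = np.array([1200.0, 300.0, 800.0, 2500.0])   # crowfly distances
    weights = np.array([1.0, 2.0, 1.0, 0.5])            # person weights

    sorter = np.argsort(values)
    values, weights = values[sorter], weights[sorter]

    cdf = np.cumsum(weights)
    cdf /= cdf[-1]  # normalize so the last entry is exactly 1.0

    # Invert the CDF: a uniform draw maps back to a distance value
    u = np.random.random(5)
    print(values[np.searchsorted(cdf, u)])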
import geopandas as gpd
import numpy as np
import pandas as pd
import shapely.geometry as geo
from sklearn.neighbors import KDTree
import data.spatial.utils as spatial_utils
from synthesis.population.spatial.primary.weekend.components import CustomDistanceSampler, CustomDiscretizationSolver
from synthesis.population.spatial.primary.weekend.problems import find_assignment_problems
from synthesis.population.spatial.secondary.rda import AssignmentSolver, DiscretizationErrorObjective, \
GravityChainSolver
def configure(context):
    context.stage("synthesis.population.trips")
    context.stage("synthesis.population.sampled")
    context.stage("synthesis.population.spatial.home.locations")
    context.stage("synthesis.population.spatial.primary.weekend.distance_distributions")
    context.stage("synthesis.population.destinations")

    context.config("random_seed")
    context.config("threads")
    context.config("output_path")
def prepare_home_locations(context):
    # Load persons and their primary locations
    df_home = context.stage("synthesis.population.spatial.home.locations")
    df_home = df_home.rename(columns={"geometry": "home"})

    df_locations = context.stage("synthesis.population.sampled")[["person_id", "household_id"]]
    df_locations = pd.merge(df_locations, df_home[["household_id", "home"]], how="left", on="household_id")

    return df_locations[["person_id", "home"]].sort_values(by="person_id")
def prepare_destinations(context):
    df_destinations = context.stage("synthesis.population.destinations")
    M = np.max(df_destinations["destination_id"].values.tolist()) + 1

    data = {}
    identifiers = df_destinations["destination_id"].values
    locations = np.vstack(df_destinations["geometry"].apply(lambda x: np.array([x.x, x.y])).values)
    no_employees = df_destinations["number_employees"].values

    for purpose in ("work", "education"):
        f = df_destinations["offers_%s" % purpose].values

        data[purpose] = dict(
            identifiers=identifiers[f],
            locations=locations[f],
            no_employees=no_employees[f]
        )

        print("Number of STATENT facilities for %s:" % purpose, len(identifiers[f]))

    return data, df_destinations
def prepare_radius_from_cdf(cdf, midpoint_bins, random_values):
    # random_values holds one uniform random number per synthetic person; it is used
    # to sample a search radius from the distance CDF for each person. Note that
    # locations are sampled per person here, not per trip.
    value_bins = np.searchsorted(cdf, random_values)
    radius_from_cdf = midpoint_bins[value_bins]
    return radius_from_cdf
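# A hedged usage sketch for prepare_radius_from_cdf; the bin midpoints and CDF
# below are made-up values for illustration, while in the pipeline they come from
# the distance distribution stage.
def _demo_prepare_radius_from_cdf():
    cdf = np.array([0.2, 0.5, 0.8, 1.0])                       # cumulative probability per bin
    midpoint_bins = np.array([250.0, 750.0, 1250.0, 1750.0])   # bin midpoints in meters

    random_values = np.random.random(10)  # one uniform draw per synthetic person
    print(prepare_radius_from_cdf(cdf, midpoint_bins, random_values))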
def find_locations(home_coordinates, destination_coordinates, radius):
    tree = KDTree(destination_coordinates)
    indices, distances = tree.query_radius(home_coordinates, r=radius, return_distance=True, sort_results=True)

    # When no facility is found within the radius, fall back to the nearest facility
    for i in range(len(indices)):
        if len(indices[i]) == 0:
            dist, ind = tree.query(np.array(home_coordinates[i]).reshape(1, -1), 1, return_distance=True)
            indices[i] = np.array([ind[0][0]])
            distances[i] = np.array([dist[0][0]])

    return indices, distances
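# A hedged usage sketch for find_locations with toy coordinates (all values are
# made up for illustration): the first home finds a facility within its radius,
# the second finds none and falls back to its nearest facility.
def _demo_find_locations():
    homes = np.array([[0.0, 0.0], [100.0, 100.0]])
    facilities = np.array([[10.0, 0.0], [500.0, 500.0]])
    radius = np.array([50.0, 5.0])  # one search radius per home

    indices, distances = find_locations(homes, facilities, radius)
    print(indices, distances)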
def impute_work_locations(context, distributions, destinations, df_syn_persons_work, df_destinations):
    # Prepare the search radii used for sampling, based on the distance CDF
    cdf = distributions["work"]["cdf"]
    midpoint_bins = distributions["work"]["midpoint_bins"]

    random = np.random.RandomState(context.config("random_seed"))
    random_values = random.random_sample(len(df_syn_persons_work))
    radius = prepare_radius_from_cdf(cdf, midpoint_bins, random_values)

    # Prepare the home and destination coordinates
    destination_coordinates = destinations["work"]["locations"]
    home_coordinates = np.vstack([df_syn_persons_work["home_x"], df_syn_persons_work["home_y"]]).T

    indices, distances = find_locations(home_coordinates, destination_coordinates, radius)
    # Choose one location per person among the candidates, weighted by the number
    # of employees at each candidate location
    discrete_indices = []
    discrete_distances = []

    for ind, dist in zip(indices, distances):
        weights = destinations["work"]["no_employees"][ind].astype(float)
        weights /= np.sum(weights)

        selector = random.choice(len(ind), p=weights)
        discrete_indices.append(ind[selector])
        discrete_distances.append(dist[selector])
    df_candidates = df_destinations[df_destinations["offers_work"]].copy()
    df_work_persons = df_syn_persons_work.copy()

    df_work_persons["work_x"] = df_candidates.iloc[discrete_indices]["x"].values
    df_work_persons["work_y"] = df_candidates.iloc[discrete_indices]["y"].values
    df_work_persons["destination_id"] = df_candidates.iloc[discrete_indices]["destination_id"].values

    df_work_persons = df_work_persons[["person_id",
                                       "work_x", "work_y",
                                       "destination_id"]].rename({"work_x": "x",
                                                                  "work_y": "y"},
                                                                 axis=1)

    df_work_persons = spatial_utils.to_gpd(context, df_work_persons, coord_type="work")

    return df_work_persons[["person_id", "destination_id", "geometry"]]
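# A minimal sketch (illustration only) of the employee-weighted choice applied in
# impute_work_locations: among the candidates within the search radius, larger
# workplaces are proportionally more likely to be drawn.
def _demo_weighted_choice():
    candidate_positions = np.array([3, 7, 12])    # positions in the candidate arrays
    no_employees = np.array([5.0, 50.0, 500.0])   # weights of the three candidates

    weights = no_employees / no_employees.sum()
    selector = np.random.choice(len(candidate_positions), p=weights)
    print(candidate_positions[selector])          # most often 12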
def execute(context):
    # Prepare data
    distance_distributions = context.stage("synthesis.population.spatial.primary.weekend.distance_distributions")
    df_home = prepare_home_locations(context)
    destinations, df_destinations = prepare_destinations(context)

    # Prepare synthetic persons for work and education
    # Load person information
    df_persons = context.stage("synthesis.population.enriched")
    df_persons = pd.merge(df_persons, df_home, on="person_id")
    # ToDo: The merge above is probably a duplicate, since df_home is also built from
    # the sampled population; this still needs to be sorted out.

    # Load trips and select persons that make work trips
    # ToDo: This could perhaps be derived from the microcensus commute information instead
    df_trips = context.stage("synthesis.population.trips").sort_values(by=["person_id", "trip_index"])
    df_trips_work = df_trips[(df_trips["preceding_purpose"].isin(["home", "work"])
                              & df_trips["following_purpose"].isin(["home", "work"]))].copy()
    df_syn_persons_work = df_persons[df_persons["person_id"].isin(df_trips_work["person_id"].unique())]

    # Create work locations for the working persons
    df_locations = impute_work_locations(context, distance_distributions, destinations, df_syn_persons_work, df_destinations)

    return df_locations