Commit 3af86d5a authored by kaghog

use location assignment algorithm for work and education for weekend trips

parent d9a77861
synthesis/population/spatial/primary/weekend/components.py

import synthesis.population.spatial.secondary.rda as rda
import sklearn.neighbors
import numpy as np

class CustomDistanceSampler(rda.FeasibleDistanceSampler):
    def __init__(self, random, distributions, maximum_iterations=1000):
        rda.FeasibleDistanceSampler.__init__(self, random=random, maximum_iterations=maximum_iterations)

        self.random = random
        self.distributions = distributions

    def sample_distances(self, problem):
        distances = np.zeros((problem["size"] + 1))

        for index, (mode, travel_time) in enumerate(zip(problem["modes"], problem["travel_times"])):
            mode_distribution = self.distributions[mode]

            # Select the travel-time band for this trip ...
            bound_index = np.count_nonzero(travel_time > mode_distribution["bounds"])
            mode_distribution = mode_distribution["distributions"][bound_index]

            # ... and draw a distance from that band by inverting its weighted CDF
            distances[index] = mode_distribution["values"][
                np.count_nonzero(self.random.random_sample() > mode_distribution["cdf"])
            ]

        return distances

class CustomDiscretizationSolver(rda.DiscretizationSolver):
    def __init__(self, data):
        self.data = data
        self.indices = {}
        self.query_size = 5

        for purpose, purpose_data in self.data.items():
            print("Constructing spatial index for %s ..." % purpose)
            self.indices[purpose] = sklearn.neighbors.KDTree(purpose_data["locations"])

    def solve(self, problem, locations):
        discretized_locations = []
        discretized_identifiers = []

        for location, purpose in zip(locations, problem["purposes"]):
            # Snap the continuous location to the nearest candidate destination
            index = self.indices[purpose].query(location.reshape(1, -1), return_distance=False)[0][0]

            discretized_identifiers.append(self.data[purpose]["identifiers"][index])
            discretized_locations.append(self.data[purpose]["locations"][index])

        return dict(
            valid=True, locations=np.vstack(discretized_locations), identifiers=discretized_identifiers
        )
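CustomDistanceSampler draws each distance by inverse-transform sampling: np.count_nonzero(u > cdf) is the index of the first CDF entry that the uniform draw u does not exceed. A minimal sketch of the trick with made-up numbers (the values and weights below are illustrative, not pipeline data):

import numpy as np

random = np.random.RandomState(0)

# Hypothetical weighted distance distribution for one travel-time band
values = np.array([500.0, 1200.0, 3000.0, 8000.0])  # crowfly distances in metres
weights = np.array([1.0, 3.0, 4.0, 2.0])

cdf = np.cumsum(weights)
cdf = cdf / cdf[-1]  # [0.1, 0.4, 0.8, 1.0]

u = random.random_sample()  # uniform draw in [0, 1)
index = np.count_nonzero(u > cdf)  # index of the first CDF entry >= u
print(values[index])  # the sampled distance

Since u < 1.0 and the last CDF entry is exactly 1.0, the index can never run past the end of the values array.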
synthesis/population/spatial/primary/weekend/distance_distributions.py

import numpy as np
import pandas as pd

def configure(context):
    context.stage("data.microcensus.persons")
    context.stage("data.microcensus.trips")

def calculate_bounds(values, bin_size):
    values = np.sort(values)

    bounds = []
    current_count = 0
    previous_bound = None

    for value in values:
        if value == previous_bound:
            # Skip duplicates so that bands are defined over distinct values
            continue

        if current_count < bin_size:
            current_count += 1
        else:
            current_count = 0
            bounds.append(value)

        previous_bound = value

    if len(bounds) > 0:
        bounds[-1] = np.inf
    else:
        bounds.append(np.inf)

    return bounds
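calculate_bounds scans the sorted values, skips consecutive duplicates, and closes a band each time bin_size distinct values have accumulated; the last bound is replaced by infinity so every observation falls into some band. A toy run of the function above with illustrative numbers (not microcensus data):

import numpy as np

travel_times = np.array([1.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0])

# With bin_size=2 the duplicate 1.0 is skipped, a bound is appended after
# every two further distinct values, and the final bound becomes infinite.
print(calculate_bounds(travel_times, bin_size=2))  # [3.0, inf]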
def execute(context):
    # Prepare data
    df_persons = context.stage("data.microcensus.persons")[["person_id", "person_weight"]].rename(
        columns={"person_weight": "weight"})
    df_trips = context.stage("data.microcensus.trips")[["person_id", "trip_id", "mode", "crowfly_distance",
        "departure_time", "arrival_time", "purpose"]]
    df_trips = pd.merge(df_trips, df_persons[["person_id", "weight"]], on="person_id")

    df_trips["travel_time"] = df_trips["arrival_time"] - df_trips["departure_time"]
    df_trips = df_trips[df_trips["travel_time"] > 0.0]
    df_trips = df_trips[df_trips["crowfly_distance"] > 0.0]

    df_trips["following_purpose"] = df_trips["purpose"]
    df_trips["preceding_purpose"] = df_trips["purpose"].shift(1)
    df_trips.loc[df_trips["trip_id"] == 1, "preceding_purpose"] = "home"

    # Keep only trips between work and education activities
    primary_activities = ["work", "education"]
    df_trips = df_trips[(df_trips["preceding_purpose"].isin(primary_activities)
        & df_trips["following_purpose"].isin(primary_activities))]

    # Rename columns
    distance_column = "crowfly_distance" if "crowfly_distance" in df_trips else "network_distance"
    df = df_trips[["mode", "travel_time", distance_column, "weight"]].rename(columns={distance_column: "distance"})

    # Calculate distributions
    modes = df["mode"].unique()
    bin_size = 200

    distributions = {}

    for mode in modes:
        # First, calculate bounds over the unique travel-time values
        f_mode = df["mode"] == mode
        bounds = calculate_bounds(df[f_mode]["travel_time"].values, bin_size)

        distributions[mode] = dict(bounds=np.array(bounds), distributions=[])

        # Second, calculate the weighted distance distribution per band
        for lower_bound, upper_bound in zip([-np.inf] + bounds[:-1], bounds):
            f_bound = (df["travel_time"] > lower_bound) & (df["travel_time"] <= upper_bound)

            # Set up distribution
            values = df[f_mode & f_bound]["distance"].values
            weights = df[f_mode & f_bound]["weight"].values

            sorter = np.argsort(values)
            values = values[sorter]
            weights = weights[sorter]

            cdf = np.cumsum(weights)
            cdf /= cdf[-1]

            # Write distribution
            distributions[mode]["distributions"].append(dict(cdf=cdf, values=values, weights=weights))

    return distributions
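The stage returns a nested structure: per mode, an array of travel-time bounds plus one weighted distance distribution per band. A band is later selected with the same count_nonzero trick the sampler uses; a sketch with illustrative bounds:

import numpy as np

# Shape of the result (values illustrative):
# distributions["car"] == {
#     "bounds": np.array([600.0, 1800.0, np.inf]),
#     "distributions": [
#         dict(cdf=..., values=..., weights=...),  # travel time <= 600
#         dict(cdf=..., values=..., weights=...),  # 600 < travel time <= 1800
#         dict(cdf=..., values=..., weights=...),  # travel time > 1800
#     ],
# }

bounds = np.array([600.0, 1800.0, np.inf])
travel_time = 900.0
band_index = np.count_nonzero(travel_time > bounds)  # -> 1, the middle band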
synthesis/population/spatial/primary/weekend/locations.py

import geopandas as gpd
import numpy as np
import pandas as pd

import shapely.geometry as geo

from synthesis.population.spatial.primary.weekend.components import CustomDistanceSampler, CustomDiscretizationSolver
from synthesis.population.spatial.primary.weekend.problems import find_assignment_problems
from synthesis.population.spatial.secondary.rda import AssignmentSolver, DiscretizationErrorObjective, \
    GravityChainSolver

def configure(context):
    context.stage("synthesis.population.trips")
    context.stage("synthesis.population.sampled")
    context.stage("synthesis.population.spatial.home.locations")
    context.stage("synthesis.population.spatial.primary.weekend.distance_distributions")
    context.stage("synthesis.population.destinations")

    context.config("random_seed")
    context.config("threads")
    context.config("output_path")

def prepare_locations(context):
    # Load persons and their home locations
    df_home = context.stage("synthesis.population.spatial.home.locations")
    df_home = df_home.rename(columns={"geometry": "home"})

    df_locations = context.stage("synthesis.population.sampled")[["person_id", "household_id"]]
    df_locations = pd.merge(df_locations, df_home[["household_id", "home"]], how="left", on="household_id")

    return df_locations[["person_id", "home"]].sort_values(by="person_id")

def prepare_destinations(context):
    df_destinations = context.stage("synthesis.population.destinations")

    identifiers = df_destinations["destination_id"].values
    locations = np.vstack(df_destinations["geometry"].apply(lambda x: np.array([x.x, x.y])).values)

    data = {}

    for purpose in ("work", "education"):
        f = df_destinations["offers_%s" % purpose].values

        data[purpose] = dict(
            identifiers=identifiers[f],
            locations=locations[f]
        )

        print(purpose, len(identifiers[f]))

    return data

def resample_cdf(cdf, factor):
    if factor >= 0.0:
        cdf = cdf * (1.0 + factor * np.arange(1, len(cdf) + 1) / len(cdf))
    else:
        cdf = cdf * (1.0 + abs(factor) - abs(factor) * np.arange(1, len(cdf) + 1) / len(cdf))

    cdf /= cdf[-1]
    return cdf
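resample_cdf tilts a CDF for calibration: after renormalisation, a positive factor deflates the early CDF entries, shifting probability mass toward longer distances, while a negative factor inflates them and shifts mass toward shorter ones. A quick check on a uniform toy CDF:

import numpy as np

cdf = np.cumsum(np.ones(5)) / 5.0  # [0.2, 0.4, 0.6, 0.8, 1.0]

print(resample_cdf(cdf, 1.0))   # [0.12, 0.28, 0.48, 0.72, 1.0] -> longer distances
print(resample_cdf(cdf, -1.0))  # [0.36, 0.64, 0.84, 0.96, 1.0] -> shorter distances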
def resample_distributions(distributions, factors):
    for mode, mode_distributions in distributions.items():
        for distribution in mode_distributions["distributions"]:
            distribution["cdf"] = resample_cdf(distribution["cdf"], factors[mode])

def execute(context):
    # Load trips and home locations
    df_trips = context.stage("synthesis.population.trips").sort_values(by=["person_id", "trip_index"])
    df_trips["travel_time"] = df_trips["arrival_time"] - df_trips["departure_time"]

    df_primary = prepare_locations(context)

    # Prepare data
    distance_distributions = context.stage("synthesis.population.spatial.primary.weekend.distance_distributions")
    destinations = prepare_destinations(context)

    # Resampling for calibration
    resample_distributions(distance_distributions, dict(
        car=0.8, car_passenger=1.0, pt=1.0, bike=0.0, walk=0.0
    ))

    # Segment into subsamples
    processes = context.config("threads")

    unique_person_ids = df_trips["person_id"].unique()
    number_of_persons = len(unique_person_ids)
    unique_person_ids = np.array_split(unique_person_ids, processes)

    random = np.random.RandomState(context.config("random_seed"))
    random_seeds = random.randint(10000, size=processes)

    # Create batch problems for parallelization
    batches = []

    for index in range(processes):
        batches.append((
            df_trips[df_trips["person_id"].isin(unique_person_ids[index])],
            df_primary[df_primary["person_id"].isin(unique_person_ids[index])],
            random_seeds[index]
        ))

    # Run algorithm in parallel
    with context.progress(label="Assigning locations to persons", total=number_of_persons):
        with context.parallel(processes=processes, data=dict(
            distance_distributions=distance_distributions,
            destinations=destinations
        )) as parallel:
            df_locations, df_convergence = [], []

            for df_locations_item, df_convergence_item in parallel.imap_unordered(process, batches):
                df_locations.append(df_locations_item)
                df_convergence.append(df_convergence_item)

    df_locations = pd.concat(df_locations).sort_values(by=["person_id", "trip_index"])
    df_convergence = pd.concat(df_convergence)

    print("Success rate:", df_convergence["valid"].mean())

    return df_locations, df_convergence
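Batching relies on np.array_split, which, unlike np.split, accepts a person count that does not divide evenly across threads:

import numpy as np

person_ids = np.arange(10)  # illustrative ids
print(np.array_split(person_ids, 4))
# [array([0, 1, 2]), array([3, 4, 5]), array([6, 7]), array([8, 9])]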
def process(context, arguments):
    df_trips, df_primary, random_seed = arguments

    # Set up RNG; each batch receives its own seed
    random = np.random.RandomState(random_seed)

    # Set up distance sampler
    distance_distributions = context.data("distance_distributions")

    distance_sampler = CustomDistanceSampler(
        maximum_iterations=1000,
        random=random,
        distributions=distance_distributions)

    # Set up relaxation solver; currently, we do not consider tail problems.
    relaxation_solver = GravityChainSolver(
        random=random, eps=10.0, lateral_deviation=10.0, alpha=0.1
    )

    # Set up discretization solver
    destinations = context.data("destinations")
    discretization_solver = CustomDiscretizationSolver(destinations)

    # Set up assignment solver
    thresholds = dict(
        car=200.0, car_passenger=200.0, pt=200.0,
        bike=100.0, walk=100.0
    )

    assignment_objective = DiscretizationErrorObjective(thresholds=thresholds)
    assignment_solver = AssignmentSolver(
        distance_sampler=distance_sampler,
        relaxation_solver=relaxation_solver,
        discretization_solver=discretization_solver,
        objective=assignment_objective,
        maximum_iterations=20
    )

    df_locations = []
    df_convergence = []

    last_person_id = None

    for problem in find_assignment_problems(df_trips, df_primary):
        result = assignment_solver.solve(problem)

        starting_trip_index = problem["trip_index"]

        for index, (identifier, location) in enumerate(
                zip(result["discretization"]["identifiers"], result["discretization"]["locations"])):
            df_locations.append((
                problem["person_id"], starting_trip_index + index, identifier, geo.Point(location)
            ))

        df_convergence.append((
            result["valid"], problem["size"]
        ))

        if problem["person_id"] != last_person_id:
            last_person_id = problem["person_id"]
            context.progress.update()

    df_locations = pd.DataFrame.from_records(df_locations,
        columns=["person_id", "trip_index", "destination_id", "geometry"])
    df_locations = gpd.GeoDataFrame(df_locations, crs="EPSG:2154")

    df_convergence = pd.DataFrame.from_records(df_convergence, columns=["valid", "size"])
    return df_locations, df_convergence
synthesis/population/spatial/primary/weekend/problems.py

import numpy as np

FIELDS = ["person_id", "trip_index", "preceding_purpose", "following_purpose", "mode", "travel_time", "activity_duration", "arrival_time"]
FIXED_PURPOSES = ["home"]

def find_bare_assignment_problems(df):
    problem = None

    for row in df[FIELDS].itertuples(index=False):
        person_id, trip_index, preceding_purpose, following_purpose, mode, travel_time, activity_duration, arrival_time = row

        if problem is not None and person_id != problem["person_id"]:
            # We switch person, but we're still tracking a problem. This is a tail!
            yield problem
            problem = None

        if problem is None:
            # Start a new problem
            problem = dict(
                person_id=person_id, trip_index=trip_index, purposes=[preceding_purpose],
                modes=[], travel_times=[], activity_duration=[], activity_start_time=[]
            )

        problem["purposes"].append(following_purpose)
        problem["modes"].append(mode)
        problem["travel_times"].append(travel_time)
        problem["activity_duration"].append(activity_duration)
        problem["activity_start_time"].append(arrival_time)

        if problem["purposes"][-1] in FIXED_PURPOSES:
            # The current chain (or initial tail) ends with a fixed activity.
            yield problem
            problem = None

LOCATION_FIELDS = ["person_id", "home"]

def find_assignment_problems(df, df_locations):
    """
    Enriches assignment problems with:
      - Locations of the fixed activities
      - Size of the problem
      - Reduced purposes (only the variable ones are kept)
    """
    location_iterator = df_locations[LOCATION_FIELDS].itertuples(index=False)
    current_location = None

    for problem in find_bare_assignment_problems(df):
        origin_purpose = problem["purposes"][0]
        destination_purpose = problem["purposes"][-1]

        # Reduce purposes to the variable activities
        if origin_purpose in FIXED_PURPOSES and destination_purpose in FIXED_PURPOSES:
            problem["purposes"] = problem["purposes"][1:-1]
        elif origin_purpose in FIXED_PURPOSES:
            problem["purposes"] = problem["purposes"][1:]
        elif destination_purpose in FIXED_PURPOSES:
            problem["purposes"] = problem["purposes"][:-1]
        else:
            raise RuntimeError("The presented 'problem' is neither a chain nor a tail")

        # Define size
        problem["size"] = len(problem["purposes"])

        if problem["size"] == 0:
            continue  # We can skip if there are no variable activities

        # Advance the location iterator until we arrive at the current problem's person
        while current_location is None or current_location[0] != problem["person_id"]:
            current_location = next(location_iterator)

        # Define origin and destination locations if they have fixed purposes
        problem["origin"] = None
        problem["destination"] = None

        if origin_purpose in FIXED_PURPOSES:
            problem["origin"] = current_location[LOCATION_FIELDS.index(origin_purpose)]  # Shapely Point
            problem["origin"] = np.array([[problem["origin"].x, problem["origin"].y]])

        if destination_purpose in FIXED_PURPOSES:
            problem["destination"] = current_location[LOCATION_FIELDS.index(destination_purpose)]  # Shapely Point
            problem["destination"] = np.array([[problem["destination"].x, problem["destination"].y]])

        yield problem
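As a sanity check, a simple home → work → home day yields exactly one chain problem, whose purposes later reduce to the single variable activity. A sketch with a hand-built frame (all column values are illustrative):

import pandas as pd

df_example = pd.DataFrame.from_records([
    # person_id, trip_index, preceding, following, mode, travel_time, activity_duration, arrival_time
    (1, 0, "home", "work", "car", 600.0, 28800.0, 30000.0),
    (1, 1, "work", "home", "car", 650.0, 36000.0, 66000.0),
], columns=FIELDS)

problem = next(find_bare_assignment_problems(df_example))
assert problem["purposes"] == ["home", "work", "home"]
assert problem["modes"] == ["car", "car"]
# find_assignment_problems would then reduce purposes to ["work"] and
# attach the home coordinates as origin and destination.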
@@ -11,67 +11,78 @@ def configure(context):
    context.stage("data.spatial.zone_shapes")
    context.stage("synthesis.population.spatial.primary.work.zones")

    # for the weekend scenario
    context.config("weekend_scenario", False)
    context.stage("synthesis.population.spatial.primary.weekend.locations")

def execute(context):
    is_weekend_scenario = context.config("weekend_scenario")

    if is_weekend_scenario:
        df, convergence = context.stage("synthesis.population.spatial.primary.weekend.locations")
    else:
        df = context.stage("synthesis.population.spatial.primary.work.zones")

    df_statent = context.stage("data.statent.statent")

    df_zones = context.stage("data.spatial.zones")
    df_zones["work_zone_id"] = df_zones["zone_id"]

    df_demand = df.groupby("work_zone_id").size().reset_index(name="count")
    df_demand = pd.merge(df_demand, df_zones[["work_zone_id", "zone_level"]])

    # First, handle the national commuters
    df_national = df_demand[df_demand["zone_level"].isin(("municipality", "quarter"))]
    empty_zones = []

    for zone_id, count in context.progress(zip(df_national["work_zone_id"], df_national["count"]),
            label="Assigning national locations ...", total=len(df_national)):
        indices = np.where(df_statent["zone_id"] == zone_id)[0]

        weights = df_statent.iloc[indices]["number_employees"]
        weights /= np.sum(weights)

        if len(indices) > 0:
            indices = np.repeat(indices, np.random.multinomial(count, weights))

            f = df["work_zone_id"] == zone_id
            df.loc[f, "work_x"] = df_statent.iloc[indices]["x"].values
            df.loc[f, "work_y"] = df_statent.iloc[indices]["y"].values
            df.loc[f, "work_location_id"] = df_statent.iloc[indices]["enterprise_id"].values
        else:
            empty_zones.append(zone_id)

    print("Found %d zones which do not have any samples in STATENT" % len(empty_zones))

    # There are some empty zones (mainly border zones to Italy, which are under shared
    # administration). There, we just sample a random location inside the zone.
    df_shapes = context.stage("data.spatial.zone_shapes")

    for zone_id in context.progress(empty_zones, label="Assigning national locations for empty zones ..."):
        count = df_national[df_national["work_zone_id"] == zone_id]["count"].iloc[0]
        row = df_shapes[df_shapes["zone_id"] == zone_id].iloc[0]

        coordinates = data.spatial.zone_shapes.sample_coordinates(row, count)
        df.loc[df["work_zone_id"] == zone_id, "work_x"] = coordinates[:, 0]
        df.loc[df["work_zone_id"] == zone_id, "work_y"] = coordinates[:, 1]

    # Second, handle the international commuters
    print("TODO: We do not handle commuter traffic at the moment.")

    # For now, make sure that we do not have any international traffic
    df_international = df_demand[df_demand["zone_level"] == "country"]

    assert (len(df_international) == 0)
    assert (len(df) == len(df.dropna()))

    df = df[["person_id",
             "work_x", "work_y",
             "work_location_id"]].rename({"work_x": "x",
                                          "work_y": "y",
                                          "work_location_id": "destination_id"},
                                         axis=1)

    df = spatial_utils.to_gpd(context, df, coord_type="work")

    return df[["person_id", "destination_id", "geometry"]]
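The national assignment above spreads each zone's demand over the zone's candidate rows with a single multinomial draw and then expands the per-row counts back into row indices via np.repeat. In isolation, with hypothetical weights rather than STATENT employment shares:

import numpy as np

indices = np.array([10, 11, 12])     # candidate STATENT rows in one zone
weights = np.array([0.5, 0.3, 0.2])  # normalized employment weights

counts = np.random.multinomial(8, weights)  # e.g. [4, 3, 1]
assigned = np.repeat(indices, counts)       # e.g. [10, 10, 10, 10, 11, 11, 11, 12]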