Commit 5aba6580 authored by Grace Orowo Kagho

weekend work locations: first test using a modified distance distribution method from São Paulo; education is the same as before
parent e703322e
def configure(context):
    context.stage("synthesis.population.spatial.primary.education.locations")

def execute(context):
    df_education = context.stage("synthesis.population.spatial.primary.education.locations")
    return df_education

# TODO: check whether this needs to be modified for weekends, since the number of education trips is low anyway
@@ -10,185 +10,11 @@ from synthesis.population.spatial.secondary.rda import AssignmentSolver, Discret
def configure(context):
    context.stage("synthesis.population.trips")
    context.stage("synthesis.population.sampled")
    context.stage("synthesis.population.spatial.home.locations")
    context.stage("synthesis.population.spatial.primary.weekend.distance_distributions")
    context.stage("synthesis.population.destinations")

    context.config("random_seed")
    context.config("threads")
    context.config("output_path")
def prepare_locations(context):
    # Load persons and their primary locations
    df_home = context.stage("synthesis.population.spatial.home.locations")
    df_home = df_home.rename(columns={"geometry": "home"})

    df_locations = context.stage("synthesis.population.sampled")[["person_id", "household_id"]]
    df_locations = pd.merge(df_locations, df_home[["household_id", "home"]], how="left", on="household_id")

    return df_locations[["person_id", "home"]].sort_values(by="person_id")
def prepare_destinations(context):
    df_destinations = context.stage("synthesis.population.destinations")

    M = np.max(df_destinations["destination_id"].values.tolist()) + 1

    data = {}
    identifiers = df_destinations["destination_id"].values
    locations = np.vstack(df_destinations["geometry"].apply(lambda x: np.array([x.x, x.y])).values)

    for purpose in ("work", "education"):
        f = df_destinations["offers_%s" % purpose].values

        data[purpose] = dict(
            identifiers=identifiers[f],
            locations=locations[f]
        )

        print(purpose, len(identifiers[f]))

    return data
def resample_cdf(cdf, factor):
    if factor >= 0.0:
        cdf = cdf * (1.0 + factor * np.arange(1, len(cdf) + 1) / len(cdf))
    else:
        cdf = cdf * (1.0 + abs(factor) - abs(factor) * np.arange(1, len(cdf) + 1) / len(cdf))

    cdf /= cdf[-1]
    return cdf
def resample_distributions(distributions, factors):
    for mode, mode_distributions in distributions.items():
        for distribution in mode_distributions["distributions"]:
            distribution["cdf"] = resample_cdf(distribution["cdf"], factors[mode])
context.stage("synthesis.population.spatial.primary.weekend.work_locations")
context.stage("synthesis.population.spatial.primary.weekend.education_locations")
def execute(context):
    # Load trips and primary locations
    df_trips = context.stage("synthesis.population.trips").sort_values(by=["person_id", "trip_index"])
    df_trips["travel_time"] = df_trips["arrival_time"] - df_trips["departure_time"]

    df_primary = prepare_locations(context)

    # Prepare data
    distance_distributions = context.stage("synthesis.population.spatial.primary.weekend.distance_distributions")
    destinations = prepare_destinations(context)

    # Resampling for calibration
    resample_distributions(distance_distributions, dict(
        car=0.8, car_passenger=1.0, pt=1.0, bike=0.0, walk=0.0
    ))

    # Segment into subsamples
    processes = context.config("threads")

    unique_person_ids = df_trips["person_id"].unique()
    number_of_persons = len(unique_person_ids)
    unique_person_ids = np.array_split(unique_person_ids, processes)

    random = np.random.RandomState(context.config("random_seed"))
    random_seeds = random.randint(10000, size=processes)

    # Create batch problems for parallelization
    batches = []

    for index in range(processes):
        batches.append((
            df_trips[df_trips["person_id"].isin(unique_person_ids[index])],
            df_primary[df_primary["person_id"].isin(unique_person_ids[index])],
            random_seeds[index]
        ))

    # Run algorithm in parallel
    with context.progress(label="Assigning locations to persons", total=number_of_persons):
        with context.parallel(processes=processes, data=dict(
            distance_distributions=distance_distributions,
            destinations=destinations
        )) as parallel:
            df_locations, df_convergence = [], []

            for df_locations_item, df_convergence_item in parallel.imap_unordered(process, batches):
                df_locations.append(df_locations_item)
                df_convergence.append(df_convergence_item)

    df_locations = pd.concat(df_locations).sort_values(by=["person_id", "trip_index"])
    df_convergence = pd.concat(df_convergence)

    print("Success rate:", df_convergence["valid"].mean())
    return df_locations, df_convergence
def process(context, arguments):
    df_trips, df_primary, random_seed = arguments

    # Set up RNG from the per-batch seed (using the global "random_seed"
    # config here would give every worker an identical random stream)
    random = np.random.RandomState(random_seed)
    # Set up distance sampler
    distance_distributions = context.data("distance_distributions")

    distance_sampler = CustomDistanceSampler(
        maximum_iterations=1000,
        random=random,
        distributions=distance_distributions)

    # Set up relaxation solver; currently, we do not consider tail problems.
    relaxation_solver = GravityChainSolver(
        random=random, eps=10.0, lateral_deviation=10.0, alpha=0.1
    )

    # Set up discretization solver
    destinations = context.data("destinations")
    discretization_solver = CustomDiscretizationSolver(destinations)

    # Set up assignment solver
    thresholds = dict(
        car=200.0, car_passenger=200.0, pt=200.0,
        bike=100.0, walk=100.0
    )

    assignment_objective = DiscretizationErrorObjective(thresholds=thresholds)
    assignment_solver = AssignmentSolver(
        distance_sampler=distance_sampler,
        relaxation_solver=relaxation_solver,
        discretization_solver=discretization_solver,
        objective=assignment_objective,
        maximum_iterations=20
    )
    df_locations = []
    df_convergence = []

    last_person_id = None

    for problem in find_assignment_problems(df_trips, df_primary):
        result = assignment_solver.solve(problem)

        starting_trip_index = problem["trip_index"]

        for index, (identifier, location) in enumerate(
                zip(result["discretization"]["identifiers"], result["discretization"]["locations"])):
            df_locations.append((
                problem["person_id"], starting_trip_index + index, identifier, geo.Point(location)
            ))

        df_convergence.append((
            result["valid"], problem["size"]
        ))

        if problem["person_id"] != last_person_id:
            last_person_id = problem["person_id"]
            context.progress.update()

    df_locations = pd.DataFrame.from_records(df_locations,
        columns=["person_id", "trip_index", "destination_id", "geometry"])
    df_locations = gpd.GeoDataFrame(df_locations, crs="EPSG:2154")
    df_convergence = pd.DataFrame.from_records(df_convergence, columns=["valid", "size"])
    return df_locations, df_convergence

# In the new version of this stage, execute simply combines the dedicated
# weekend work and education location stages:
def execute(context):
    df_work = context.stage("synthesis.population.spatial.primary.weekend.work_locations")
    df_education = context.stage("synthesis.population.spatial.primary.weekend.education_locations")
    return df_work, df_education
@@ -6,31 +6,12 @@ def configure(context):
context.stage("data.microcensus.persons")
context.stage("data.microcensus.trips")
def calculate_bounds(values, bin_size):
    values = np.sort(values)

    bounds = []
    current_count = 0
    previous_bound = None

    for value in values:
        if value == previous_bound:
            continue

        if current_count < bin_size:
            current_count += 1
        else:
            current_count = 0
            bounds.append(value)

        previous_bound = value

    if len(bounds) > 0:
        bounds[-1] = np.inf
    else:
        bounds.append(np.inf)

    return bounds
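As a sanity check on the logic above: calculate_bounds walks the sorted values, emits a new upper bound after every run of bin_size distinct values, and replaces the final bound with infinity so every observation falls into some band. A toy run, assuming the function as defined above (the numbers are hypothetical):

    import numpy as np

    toy_values = np.array([1, 1, 2, 3, 4, 5, 6, 7, 8, 9])

    # With bin_size=3, a bound is appended after each run of three distinct
    # values; the trailing bound is then replaced by np.inf.
    print(calculate_bounds(toy_values, bin_size=3))  # -> [4, inf]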
def compute_cdf(df, bin_size=200):
    # Note: bin_size is the number of histogram bins, not a bin width in metres
    hist_vals, bins_vals = np.histogram(df["distance"], weights=df["weight"], bins=bin_size)
    histbin_midpoints = bins_vals[:-1] + np.diff(bins_vals) / 2

    cdf = np.cumsum(hist_vals)
    cdf = cdf / cdf[-1]

    return cdf, histbin_midpoints
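A hedged usage sketch for compute_cdf on synthetic data (the gamma-distributed distances are illustrative only; the column names match the frames built in execute below):

    import numpy as np
    import pandas as pd

    random = np.random.RandomState(0)
    df_toy = pd.DataFrame({
        "distance": random.gamma(2.0, 2000.0, size=1000),  # toy crowfly distances in metres
        "weight": np.ones(1000),                           # unit person weights
    })

    cdf, midpoints = compute_cdf(df_toy, bin_size=200)
    assert np.isclose(cdf[-1], 1.0)           # the CDF is normalized
    assert len(cdf) == len(midpoints) == 200  # one entry per histogram bin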
def execute(context):
@@ -40,8 +21,10 @@ def execute(context):
    # Filter out only persons engaged in weekend trips
    # TODO: should confirm this
    # The assumption here is that Saturday and Sunday have the same distance
    # distribution for work or education trips, which is different from the weekday.
    df_persons = df_persons[df_persons["weekend"]]
@@ -49,11 +32,7 @@ def execute(context):
        "departure_time", "arrival_time", "purpose"]]
    df_trips = pd.merge(df_trips, df_persons[["person_id", "weight"]], on="person_id")

    # Calculate travel time
    df_trips["travel_time"] = df_trips["arrival_time"] - df_trips["departure_time"]
    df_trips = df_trips[df_trips["travel_time"] > 0.0]

    # Keep only trips with distances greater than zero
    df_trips = df_trips[df_trips["crowfly_distance"] > 0.0]
@@ -64,68 +43,37 @@ def execute(context):
    # Filtering for only work and education
    primary_activities = ["home", "work", "education"]

    # df_trips = df_trips[(df_trips["preceding_purpose"].isin(primary_activities)
    #                      & df_trips["following_purpose"].isin(primary_activities))]

    # Rename columns
    # distance_column = "crowfly_distance" if "crowfly_distance" in df_trips else "network_distance"
    # df = df_trips[["travel_time", distance_column, "weight"]].rename(columns={distance_column: "distance"})

    # Extract work distances
    # TODO: take distances from the commute distances already provided in mz.commute
    df_trips_work = df_trips[(df_trips["preceding_purpose"].isin(["home", "work"])
                              & df_trips["following_purpose"].isin(["home", "work"]))].copy()

    # Using the person weights and the crowfly distance of all trips that end at work.
    # The issue here is that there is no integrity for h-w trips, as some trips are w-w;
    # a solution would be to remove all trips that do not hold the integrity of h-w.

    # Calculate distributions
    # The location assignment uses distributions based on the mode type for the primary
    # purpose; here we do not consider mode, and consequently we do not consider the
    # travel time either.
    # modes = df["mode"].unique()
    df = df_trips_work[["crowfly_distance", "weight"]].rename(columns={"crowfly_distance": "distance"})
    work_cdf, work_midpoint_bin_distances = compute_cdf(df, bin_size=200)

    bin_size = 200  # TODO: find out why this value has been specified and how it affects the distances provided
    distributions = {}

    # Write distribution for work
    distributions["work"] = dict(cdf=work_cdf, midpoint_bins=work_midpoint_bin_distances)

    # Extract education distances
    df_trips_edu = df_trips[(df_trips["preceding_purpose"].isin(["home", "education"])
                             & df_trips["following_purpose"].isin(["home", "education"]))].copy()

    df = df_trips_edu[["crowfly_distance", "weight"]].rename(columns={"crowfly_distance": "distance"})
    edu_cdf, edu_midpoint_bin_distances = compute_cdf(df, bin_size=200)

    # Write distribution for education
    distributions["education"] = dict(cdf=edu_cdf, midpoint_bins=edu_midpoint_bin_distances)
    # for mode in modes:
    #     # First calculate bounds by unique values
    #     f_mode = df["mode"] == mode
    #     bounds = calculate_bounds(df[f_mode]["travel_time"].values, bin_size)
    #
    #     distributions[mode] = dict(bounds=np.array(bounds), distributions=[])
    #
    #     # Second, calculate distribution per band
    #     for lower_bound, upper_bound in zip([-np.inf] + bounds[:-1], bounds):
    #         f_bound = (df["travel_time"] > lower_bound) & (df["travel_time"] <= upper_bound)
    #
    #         # Set up distribution
    #         values = df[f_mode & f_bound]["distance"].values
    #         weights = df[f_mode & f_bound]["weight"].values
    #
    #         sorter = np.argsort(values)
    #         values = values[sorter]
    #         weights = weights[sorter]
    #
    #         cdf = np.cumsum(weights)
    #         cdf /= cdf[-1]
    #
    #         # Write distribution
    #         distributions[mode]["distributions"].append(dict(cdf=cdf, values=values, weights=weights))
    # Create travel time bins or bands
    bounds = calculate_bounds(df["travel_time"].values, bin_size)

    # Calculate distribution per band
    for lower_bound, upper_bound in zip([-np.inf] + bounds[:-1], bounds):
        f_bound = (df["travel_time"] > lower_bound) & (df["travel_time"] <= upper_bound)

        # Set up distribution
        values = df[f_bound]["distance"].values
        weights = df[f_bound]["weight"].values

        sorter = np.argsort(values)
        values = values[sorter]
        weights = weights[sorter]

        cdf = np.cumsum(weights)
        cdf /= cdf[-1]

        # Write distribution (`mode` here comes from the per-mode loop of the
        # weekday implementation commented out above)
        distributions[mode]["distributions"].append(dict(cdf=cdf, values=values, weights=weights))

    return distributions
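The returned distributions map each purpose to a normalized cdf with matching midpoint_bins. One plausible way for a consumer such as impute_work_locations to draw distances from this structure is inverse-transform sampling; the helper below is an illustrative assumption, not code from this commit:

    import numpy as np

    def sample_distances(distribution, count, random):
        # Hypothetical helper: draw uniform samples and map each one to the
        # first histogram bin whose cumulative weight exceeds it, returning
        # that bin's midpoint distance.
        u = random.random_sample(count)
        indices = np.searchsorted(distribution["cdf"], u)
        return distribution["midpoint_bins"][indices]

    # e.g.: sample_distances(distributions["work"], 100, np.random.RandomState(0))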
@@ -123,10 +123,9 @@ def impute_work_locations(context, distributions, destinations, df_syn_persons_w
df_work_persons["destination_id"] = df_candidates.iloc[discrete_indices]["destination_id"].values
df_work_persons = df_work_persons[["person_id",
"education_x", "education_y",
"education_location_id"]].rename({"education_location_id": "destination_id",
"education_x": "x",
"education_y": "y"},
"work_x", "work_y",
"destination_id"]].rename({"work_x": "x",
"work_y": "y"},
axis=1)
df_work_persons = spatial_utils.to_gpd(context, df_work_persons, coord_type="work")
@@ -162,3 +161,9 @@ def execute(context):
    return df_locations

# TODO: some things not considered and assumptions made:
# 1. Work distances are not differentiated by zone (is there any need to distinguish
#    same-zone from different-zone work locations?)
# 2. International vs. national trips.
# 3. Would mode usage influence the work distances, as it does for the secondary activities?
# I should test whether this works for weekdays and compare with what the zones give.
# How to validate the flows?