Commit d675459e authored by tchervec

remove references to is_commute and data.microcensus.commute from population.trips and population.activities
parent e02251ff
**3.0.0**
- Facilities attribute `offers_service` changed to `offers_other`
- Determine secondary activity locations directly in `python`
- Assign all work activities to the same location (i.e. the one with the longest duration)
- Separate work and education commute dataframes
- Add `isFreight` attribute to all agents in `population.xml`
- Update python requirements
- Update statistical matching to the most recent version
@@ -11,6 +11,7 @@ run:
 # These are configuration options that we use in the pipeline
 config:
   threads: 24
+  random_seed: 0
   hot_deck_matching_runners: 24
   disable_progress_bar: true
   java_memory: 100G
@@ -6,55 +6,49 @@ def configure(context):
context.stage("data.microcensus.trips")
context.stage("data.microcensus.persons")
def execute(context):
df_trips = context.stage("data.microcensus.trips")
df_persons = context.stage("data.microcensus.persons")
df_primary_commute = []
commutes = {}
for primary_purpose in ["work", "education"]:
# Find the maximum activity duration per person
df_max_time_per_person = df_trips[
df_trips["purpose"] == primary_purpose
][[
"person_id", "activity_duration"
]].groupby("person_id").max().reset_index()
# Find the maximum work duration per person
df_max_time_per_person = (df_trips[df_trips["purpose"] == "work"][["person_id", "activity_duration"]]
.groupby("person_id")
.max()
.reset_index())
# Find the trips with the maximum duration
df_commute = pd.merge(
df_trips[df_trips["purpose"] == primary_purpose][[
"person_id", "trip_id", "mode", "activity_duration", "destination_x", "destination_y"
]],
df_max_time_per_person,
on = ["person_id", "activity_duration"]
).groupby("person_id").first().reset_index()[[
"person_id", "trip_id", "mode", "activity_duration", "destination_x", "destination_y"
]]
df_commute.columns = [
"person_id", "commute_trip_id", "commute_mode", "commute_activity_duration",
"destination_x", "destination_y"]
df_commute = pd.merge(df_trips[df_trips["purpose"] == "work"][["person_id", "trip_id",
"mode", "activity_duration",
"destination_x", "destination_y"]],
df_max_time_per_person,
on=["person_id", "activity_duration"]
).groupby("person_id").first().reset_index()[["person_id", "trip_id",
"mode", "activity_duration",
"destination_x", "destination_y"]]
# Rename columns
df_commute.columns = ["person_id", "commute_trip_id",
"commute_mode", "commute_activity_duration",
"destination_x", "destination_y"]
# Find the commute distance
df_commute = pd.merge(
df_commute,
df_persons[["person_id", "home_x", "home_y"]],
on = "person_id")
df_commute = pd.merge(df_commute,
df_persons[["person_id", "home_x", "home_y"]],
on="person_id")
df_commute["commute_home_distance"] = np.sqrt(
(df_commute["home_x"] - df_commute["destination_x"])**2 + (df_commute["home_y"] - df_commute["destination_y"])**2
)
df_commute["commute_home_distance"] = np.sqrt((df_commute["home_x"] - df_commute["destination_x"]) ** 2 +
(df_commute["home_y"] - df_commute["destination_y"]) ** 2)
# Rename and clean columns
df_commute["commute_x"] = df_commute["destination_x"]
df_commute["commute_y"] = df_commute["destination_y"]
df_commute = df_commute[["person_id", "commute_trip_id", "commute_mode", "commute_home_distance",
"commute_activity_duration", "commute_x", "commute_y"]]
df_commute = df_commute[["person_id", "commute_trip_id", "commute_mode", "commute_home_distance", "commute_activity_duration", "commute_x", "commute_y"]]
df_commute.loc[:, "commute_purpose"] = primary_purpose
df_primary_commute.append(df_commute)
df_commute = pd.concat(df_primary_commute)
df_commute["commute_purpose"] = df_commute["commute_purpose"].astype("category")
commutes.update({primary_purpose: df_commute})
# Find the one with the longest duration, so we only have one commute purpose
df_commute = df_commute.sort_values("commute_activity_duration", ascending = False).drop_duplicates("person_id")
return df_commute
return commutes
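The stage now returns one commute dataframe per primary purpose instead of a single dataframe deduplicated to the longest activity. A minimal sketch of how a downstream stage would consume the new output (variable names are illustrative; only the stage name and columns appear in the diff above):

```python
# Hypothetical downstream use of the reworked stage output: a dict with
# one commute dataframe per primary purpose.
commutes = context.stage("data.microcensus.commute")

df_work = commutes["work"]            # person_id, commute_trip_id, commute_mode, ...
df_education = commutes["education"]  # same columns, keyed by purpose instead of
                                      # being concatenated and deduplicated
```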
import numpy as np
import numpy.linalg as la
import pandas as pd

def configure(context):
    context.stage("data.microcensus.trips")
    context.stage("data.microcensus.persons")
    context.stage("data.microcensus.commute")

# TODO: Merge this into data.microcensus.commute
def execute(context):
    df_trips = context.stage("data.microcensus.trips")[[
        "person_id", "trip_id", "destination_x", "destination_y", "purpose"
    ]]
    df_trips = df_trips[df_trips["purpose"].isin(["work", "education"])]

    df_persons = context.stage("data.microcensus.persons")[[
        "person_id", "home_x", "home_y"
    ]]

    df_commute = context.stage("data.microcensus.commute")[[
        "person_id", "commute_trip_id", "commute_x", "commute_y"
    ]]

    df_trips = pd.merge(df_trips, df_commute, on = "person_id")
    df_trips = pd.merge(df_trips, df_persons, on = "person_id")

    df_trips = df_trips[[
        "person_id", "trip_id", "commute_trip_id", "commute_x", "commute_y", "home_x", "home_y", "destination_x", "destination_y"
    ]]

    # Decompose each secondary destination into coordinates relative to the
    # commute: the home -> primary axis and its 90-degree rotation.
    data = df_trips[["home_x", "home_y", "commute_x", "commute_y", "destination_x", "destination_y"]].values
    home_coordinates = data[:, 0:2]
    primary_coordinates = data[:, 2:4]
    secondary_coordinates = data[:, 4:6]

    primary_distance = la.norm(primary_coordinates - home_coordinates, axis = 1)
    primary_direction = (primary_coordinates - home_coordinates) / primary_distance[:, np.newaxis]

    secondary_distance = la.norm(secondary_coordinates - home_coordinates, axis = 1)
    secondary_direction = (secondary_coordinates - home_coordinates) / secondary_distance[:, np.newaxis]

    # Projection of the home -> secondary vector onto the commute axis
    tangential_distance = np.sum(primary_direction * secondary_direction * secondary_distance[:, np.newaxis], axis = 1)
    tangential_factor = tangential_distance / primary_distance

    # Signed perpendicular offset from the commute axis
    normal_direction = np.dot(primary_direction, np.array([[0.0, -1.0], [1.0, 0.0]]))
    center_direction = secondary_coordinates - (home_coordinates + primary_direction * tangential_distance[:, np.newaxis])
    normal_distance = np.sum(normal_direction * center_direction, axis = 1)

    # Express both offsets as ratios of the commute distance
    df_trips.loc[:, "commute_tangential_ratio"] = tangential_distance / primary_distance
    df_trips.loc[:, "commute_normal_ratio"] = normal_distance / primary_distance
    df_trips.loc[:, "commute_direct_distance"] = secondary_distance

    df_trips = df_trips[[
        "person_id", "trip_id", "commute_tangential_ratio", "commute_normal_ratio", "commute_direct_distance"
    ]]
    df_trips.columns = ["mz_person_id", "trip_id", "commute_tangential_ratio", "commute_normal_ratio", "commute_direct_distance"]

    return df_trips
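This stage expresses each secondary destination in a frame spanned by the home-to-commute axis and its 90-degree rotation. A toy check of the decomposition, assuming a home at the origin and a commute location 10 units east (all values invented):

```python
import numpy as np
import numpy.linalg as la

home = np.array([[0.0, 0.0]])
primary = np.array([[10.0, 0.0]])   # commute location, 10 units east of home
secondary = np.array([[5.0, 2.0]])  # halfway along the axis, 2 units to the side

primary_distance = la.norm(primary - home, axis=1)
primary_direction = (primary - home) / primary_distance[:, np.newaxis]

tangential = np.sum(primary_direction * (secondary - home), axis=1)  # 5.0
normal_direction = np.dot(primary_direction, np.array([[0.0, -1.0], [1.0, 0.0]]))
center = secondary - (home + primary_direction * tangential[:, np.newaxis])
normal = np.sum(normal_direction * center, axis=1)                   # -2.0

print(tangential / primary_distance)  # commute_tangential_ratio -> [0.5]
print(normal / primary_distance)      # commute_normal_ratio -> [-0.2]
```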
@@ -5,13 +5,13 @@ import matsim.writers
 def configure(context):
-    context.stage("population.opportunities")
-    context.stage("population.sociodemographics")
+    context.stage("synthesis.population.destinations")
+    context.stage("synthesis.population.enriched")

 FIELDS = [
-    "location_id", "location_x", "location_y",
-    "offers_work", "offers_education", "offers_service", "offers_leisure", "offers_shop"
+    "destination_id", "destination_x", "destination_y",
+    "offers_work", "offers_education", "offers_leisure", "offers_shop", "offers_other"
 ]
@@ -29,7 +29,7 @@ def execute(context):
     cache_path = context.cache_path

     # First, write actual facilities (from STATENT)
-    df_statent = context.stage("population.opportunities")
+    df_statent = context.stage("synthesis.population.destinations")
     df_statent = df_statent[FIELDS]

     with gzip.open("%s/facilities.xml.gz" % cache_path, "w+") as f:
@@ -47,7 +47,7 @@ def execute(context):
         writer.end_facility()

     # Second, write household facilities
-    df_households = context.stage("population.sociodemographics")[[
+    df_households = context.stage("synthesis.population.enriched")[[
         "household_id", "home_x", "home_y"
     ]].drop_duplicates("household_id")
@@ -8,7 +8,7 @@ import matsim.writers
 def configure(context):
-    context.stage("population.sociodemographics")
+    context.stage("synthesis.population.enriched")

 FIELDS = ["household_id", "person_id", "income_class", "age", "number_of_cars_class", "number_of_bikes_class",
           "municipality_type", "sp_region", "canton_id", "ovgk"]

@@ -55,7 +55,7 @@ def add_household(writer, household, member_ids):
 def execute(context):
     cache_path = context.cache_path

-    df_persons = context.stage("population.sociodemographics").sort_values(by=["household_id", "person_id"])
+    df_persons = context.stage("synthesis.population.enriched").sort_values(by=["household_id", "person_id"])
     df_persons = df_persons[FIELDS]

     with gzip.open("%s/households.xml.gz" % cache_path, "w+") as f:
@@ -8,12 +8,12 @@ import matsim.writers
 def configure(context):
-    context.stage("population.sociodemographics")
-    context.stage("population.trips")
-    context.stage("population.activities")
-    context.stage("population.spatial.locations")
+    context.stage("synthesis.population.enriched")
+    context.stage("synthesis.population.trips")
+    context.stage("synthesis.population.activities")
+    context.stage("synthesis.population.spatial.locations")

     context.config("use_freight", default=False)
-    context.stage("freight.trips")
+    context.stage("synthesis.freight.trips")

 class PersonWriter:
@@ -50,12 +50,14 @@ class PersonWriter:
         # Plan
         writer.start_plan(selected=True)

-        home_location = writer.location(self.activities[0][8], self.activities[0][9], "home%s" % self.person[13])
+        home_location = writer.location(self.activities[0][8].x, self.activities[0][8].y, "home%s" % self.person[13])

         for i in range(len(self.activities)):
             activity = self.activities[i]

-            location = home_location if np.isnan(activity[10]) else writer.location(activity[8], activity[9],
-                                                                                    int(activity[10]))
+            geometry = activity[8]
+            destination_id = activity[9]
+            location = home_location if destination_id == -1 else writer.location(geometry.x, geometry.y,
+                                                                                  int(destination_id))

             start_time = activity[3] if not np.isnan(activity[3]) else None
             end_time = activity[4] if not np.isnan(activity[4]) else None

@@ -64,7 +66,7 @@ class PersonWriter:
             if not activity[7]:
                 next_activity = self.activities[i + 1]
-                writer.add_leg(activity[11], activity[4], next_activity[3] - activity[4])
+                writer.add_leg(activity[10], activity[4], next_activity[3] - activity[4])

         writer.end_plan()
         writer.end_person()
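The rewritten plan-writing code replaces the NaN check on a numeric location id with a sentinel: each activity row now carries a point `geometry` and an integer `destination_id`, and `-1` marks an at-home activity. A minimal sketch of the dispatch, assuming shapely-style point objects as used above (the helper name is invented):

```python
def resolve_location(writer, home_location, geometry, destination_id):
    # Hypothetical helper mirroring the logic above: destination_id == -1 is
    # the sentinel for "at home"; everything else is a real facility.
    if destination_id == -1:
        return home_location
    return writer.location(geometry.x, geometry.y, int(destination_id))
```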
@@ -115,27 +117,27 @@ PERSON_FIELDS = ["person_id", "age", "car_availability", "employed", "driving_li
                 "subscriptions_ga", "subscriptions_halbtax", "subscriptions_verbund", "subscriptions_strecke",
                 "household_id", "is_car_passenger", "statpop_person_id", "statpop_household_id", "mz_person_id",
                 "mz_head_id"]

-ACTIVITY_FIELDS = ["person_id", "activity_id", "start_time", "end_time", "duration", "purpose", "is_last", "location_x",
-                   "location_y", "location_id", "following_mode"]
+ACTIVITY_FIELDS = ["person_id", "activity_index", "start_time", "end_time", "duration", "purpose", "is_last",
+                   "geometry", "destination_id", "following_mode"]
 def execute(context):
     cache_path = context.cache_path

-    df_persons = context.stage("population.sociodemographics")
-    df_activities = context.stage("population.activities")
+    df_persons = context.stage("synthesis.population.enriched")
+    df_activities = context.stage("synthesis.population.activities")

     # Attach following modes to activities
-    df_trips = pd.DataFrame(context.stage("population.trips"), copy=True)[["person_id", "trip_id", "mode"]]
-    df_trips.columns = ["person_id", "activity_id", "following_mode"]
-    df_activities = pd.merge(df_activities, df_trips, on=["person_id", "activity_id"], how="left")
+    df_trips = pd.DataFrame(context.stage("synthesis.population.trips"), copy=True)[["person_id", "trip_index", "mode"]]
+    df_trips.columns = ["person_id", "activity_index", "following_mode"]
+    df_activities = pd.merge(df_activities, df_trips, on=["person_id", "activity_index"], how="left")

     # Attach locations to activities
-    df_locations = context.stage("population.spatial.locations")
-    df_activities = pd.merge(df_activities, df_locations, on=["person_id", "activity_id"], how="left")
+    df_locations = context.stage("synthesis.population.spatial.locations")
+    df_activities = pd.merge(df_activities, df_locations, on=["person_id", "activity_index"], how="left")

     # Bring in correct order (although it should already be)
     df_persons = df_persons.sort_values(by="person_id")
-    df_activities = df_activities.sort_values(by=["person_id", "activity_id"])
+    df_activities = df_activities.sort_values(by=["person_id", "activity_index"])

     df_persons = df_persons[PERSON_FIELDS]
     df_activities = df_activities[ACTIVITY_FIELDS]
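Renaming `trip_index` to `activity_index` before the merge relies on the convention that trip *n* departs from activity *n*, so each trip's mode becomes the `following_mode` of the activity it leaves. A toy illustration (all values invented):

```python
import pandas as pd

df_activities = pd.DataFrame({
    "person_id": [1, 1, 1], "activity_index": [0, 1, 2],
    "purpose": ["home", "work", "home"],
})
df_trips = pd.DataFrame({
    "person_id": [1, 1], "trip_index": [0, 1], "mode": ["car", "pt"],
})

df_trips.columns = ["person_id", "activity_index", "following_mode"]
df_activities = pd.merge(df_activities, df_trips, on=["person_id", "activity_index"], how="left")
# The last activity has no following trip, so its following_mode stays NaN,
# which matches the is_last handling in the writer above.
```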
@@ -177,7 +179,7 @@ def execute(context):
     assert (number_of_written_persons == len(df_persons))

     if context.config("use_freight"):
-        df_freight = context.stage("freight.trips")
+        df_freight = context.stage("synthesis.freight.trips")
         freight_iterator = iter(df_freight.itertuples())
         number_of_written_freight = 0
@@ -2,13 +2,14 @@ import shutil
 def configure(context):
-    context.stage("matsim.secondary_locations")
+    context.stage("matsim.population")
     context.stage("matsim.households")
     context.stage("matsim.facilities")
     context.stage("matsim.network.mapped")
     context.stage("matsim.java.eqasim")
+    context.stage("utils.java")

 def execute(context):
     # Some files we just copy
     transit_vehicles_path = context.stage("matsim.network.mapped")["vehicles"]
@@ -27,7 +28,7 @@ def execute(context):
     facilities_input_path = context.stage("matsim.facilities")
     facilities_output_path = "%s/switzerland_facilities.xml.gz" % context.cache_path

-    population_input_path = context.stage("matsim.secondary_locations")
+    population_input_path = context.stage("matsim.population")
     population_prepared_path = "%s/prepared_population.xml.gz" % context.cache_path
     population_output_path = "%s/switzerland_population.xml.gz" % context.cache_path
@@ -39,61 +40,61 @@ def execute(context):
     java(
         context.stage("matsim.java.eqasim"),
         "org.eqasim.core.scenario.preparation.RunPreparation", [
-        "--input-facilities-path", facilities_input_path,
-        "--output-facilities-path", facilities_output_path,
-        "--input-population-path", population_input_path,
-        "--output-population-path", population_prepared_path,
-        "--input-network-path", network_input_path,
-        "--output-network-path", network_output_path,
-        "--threads", str(context.config("threads"))
-    ], cwd = context.cache_path)
+            "--input-facilities-path", facilities_input_path,
+            "--output-facilities-path", facilities_output_path,
+            "--input-population-path", population_input_path,
+            "--output-population-path", population_prepared_path,
+            "--input-network-path", network_input_path,
+            "--output-network-path", network_output_path,
+            "--threads", str(context.config("threads"))
+        ], cwd=context.cache_path)

     java(
         context.stage("matsim.java.eqasim"),
         "org.eqasim.core.scenario.config.RunGenerateConfig", [
-        "--output-path", config_output_path,
-        "--prefix", "switzerland_",
-        "--sample-size", str(context.config("input_downsampling")),
-        "--random-seed", str(1000),
-        "--threads", str(context.config("threads"))
-    ], cwd = context.cache_path)
+            "--output-path", config_output_path,
+            "--prefix", "switzerland_",
+            "--sample-size", str(context.config("input_downsampling")),
+            "--random-seed", str(1000),
+            "--threads", str(context.config("threads"))
+        ], cwd=context.cache_path)

     java(
         context.stage("matsim.java.eqasim"),
         "org.eqasim.switzerland.scenario.RunCalculateStopCategories", [
-        "--input-path", transit_schedule_input_path,
-        "--output-path", transit_schedule_output_path
-    ], cwd = context.cache_path)
+            "--input-path", transit_schedule_input_path,
+            "--output-path", transit_schedule_output_path
+        ], cwd=context.cache_path)

     java(
         context.stage("matsim.java.eqasim"),
         "org.eqasim.core.scenario.routing.RunPopulationRouting", [
-        "--config-path", config_output_path,
-        "--output-path", population_output_path,
-        "--threads", str(context.config("threads")),
-        "--config:plans.inputPlansFile", population_prepared_path
-    ], cwd = context.cache_path)
+            "--config-path", config_output_path,
+            "--output-path", population_output_path,
+            "--threads", str(context.config("threads")),
+            "--config:plans.inputPlansFile", population_prepared_path
+        ], cwd=context.cache_path)

     java(
         context.stage("matsim.java.eqasim"),
         "org.eqasim.core.scenario.validation.RunScenarioValidator", [
-        "--config-path", config_output_path
-    ], cwd = context.cache_path)
+            "--config-path", config_output_path
+        ], cwd=context.cache_path)

     java(
         context.stage("matsim.java.eqasim"),
         "org.eqasim.switzerland.scenario.RunAdaptConfig", [
-        "--input-path", config_output_path,
-        "--output-path", config_output_path
-    ], cwd = context.cache_path)
+            "--input-path", config_output_path,
+            "--output-path", config_output_path
+        ], cwd=context.cache_path)

     java(
         context.stage("matsim.java.eqasim"),
         "org.eqasim.switzerland.RunSimulation", [
-        "--config-path", config_output_path,
-        "--config:controler.lastIteration", str(1),
-        "--config:controler.writeEventsInterval", str(1),
-        "--config:controler.writePlansInterval", str(1),
-    ], cwd = context.cache_path)
+            "--config-path", config_output_path,
+            "--config:controler.lastIteration", str(1),
+            "--config:controler.writeEventsInterval", str(1),
+            "--config:controler.writePlansInterval", str(1),
+        ], cwd=context.cache_path)

     return context.cache_path
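Every step above shells out to an eqasim entry point through the `java` helper from `utils.java`. The helper itself is not shown in this diff; a rough stand-in for what such a call amounts to, assuming the heap limit comes from the `java_memory` config option:

```python
import subprocess

def java(class_path, entry_point, arguments, cwd, memory="100G"):
    # Simplified stand-in for the pipeline's java utility: run one eqasim
    # entry point with a fixed heap limit and wait for it to finish.
    command = ["java", "-Xmx%s" % memory, "-cp", class_path, entry_point]
    command += [str(argument) for argument in arguments]
    subprocess.check_call(command, cwd=cwd)
```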
@@ -24,11 +24,11 @@ def execute(context):
     df_trips = df_trips[df_trips["crowfly_distance"] > 0.0]

     df_trips["following_purpose"] = df_trips["purpose"]
-    df_trips["preceeding_purpose"] = df_trips["purpose"].shift(1)
-    df_trips.loc[df_trips["trip_id"] == 1, "preceeding_purpose"] = "home"
+    df_trips["preceding_purpose"] = df_trips["purpose"].shift(1)
+    df_trips.loc[df_trips["trip_id"] == 1, "preceding_purpose"] = "home"

     df_trips = df_trips[~(
-        df_trips["preceeding_purpose"].isin(primary_activities) &
+        df_trips["preceding_purpose"].isin(primary_activities) &
         df_trips["following_purpose"].isin(primary_activities)
     )]
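Besides the spelling fix, the `shift(1)` idiom deserves a note: it pulls the previous row's purpose onto the current trip, and resetting rows with `trip_id == 1` to `home` keeps one person's last purpose from leaking into the next person's first trip. A small illustration (values invented):

```python
import pandas as pd

df_trips = pd.DataFrame({
    "person_id": [1, 1, 2],
    "trip_id": [1, 2, 1],
    "purpose": ["work", "leisure", "shop"],
})

df_trips["preceding_purpose"] = df_trips["purpose"].shift(1)
df_trips.loc[df_trips["trip_id"] == 1, "preceding_purpose"] = "home"
# Without the reset, person 2's first trip would inherit "leisure" from
# person 1 instead of starting from "home".
```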
@@ -55,8 +55,8 @@ class XmlWriter:
return "%02d:%02d:%02d" % (hours, minutes, seconds)
def location(self, x, y, facility_id = None):
#return (x, y, None if facility_id is None or np.isnan(facility_id) else int(facility_id))
return (x, y, None if facility_id is None or (type(facility_id) == float and np.isnan(facility_id)) else facility_id)
return (x, y,
None if facility_id is None or (type(facility_id) == float and np.isnan(facility_id)) else facility_id)
class PopulationWriter(XmlWriter):
POPULATION_SCOPE = 0
......
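The guard in `location` exists because a missing facility id can arrive either as `None` or, after a pandas join, as a float `NaN`, while real ids may be strings such as `home123`; calling `np.isnan` on a string raises, hence the explicit float check. For illustration:

```python
import numpy as np

def is_missing(facility_id):
    # NaN only ever shows up as a float here; string ids like "home123"
    # must bypass np.isnan, which would raise a TypeError on them.
    return facility_id is None or (type(facility_id) == float and np.isnan(facility_id))

assert is_missing(None) and is_missing(float("nan"))
assert not is_missing("home123") and not is_missing(42)
```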
import functools
import itertools
import multiprocessing as mp

import numpy as np

class HotDeckMatcher:
    def __init__(self, context, df_source, source_id, source_weight, mandatory_fields, preference_fields, default_id,
                 minimum_source_samples):
        self.context = context
        self.mandatory_fields = mandatory_fields
        self.preference_fields = preference_fields
        self.all_fields = self.mandatory_fields + self.preference_fields
        self.default_id = default_id
        self.minimum_source_samples = minimum_source_samples

        self.values = {
            field: list(np.unique(df_source[field]))
            for field in self.all_fields
        }

        self.field_sizes = [len(self.values[field]) for field in self.all_fields]
        self.field_indices = np.cumsum(self.field_sizes)

        for field in self.all_fields:
            print("Found categories for %s:" % field, ", ".join([str(c) for c in self.values[field]]))

        self.source_matrix = self.make_matrix(df_source, source=True)
        self.source_weights = df_source[source_weight]
        self.source_ids = df_source[source_id]

        # For every field, enumerate one-hot masks ("match this category
        # exactly"); preference fields additionally get an all-False mask,
        # which drops the field from the matching criteria altogether.
        search_order = [
            [list(np.arange(size) == k) for k in range(size)] +
            ([] if field in self.mandatory_fields else [[False] * size])
            for field, size in zip(self.all_fields, self.field_sizes)
        ]

        self.field_masks = list(itertools.product(*search_order))

    def make_matrix(self, df, chunk_index=None, source=False):
        columns = sum(self.field_sizes)
        matrix = np.zeros((len(df), columns), dtype=np.bool)

        column_index = 0

        with self.context.progress(total=columns,
                                   label="Reading categories (%s) ..." % ("source" if source else "target"),
                                   position=chunk_index) as progress:
            for field_name in self.all_fields:
                for field_value in self.values[field_name]:
                    matrix[:, column_index] = df[field_name] == field_value
                    column_index += 1
                    progress.update()

        return matrix

    def __call__(self, df_target, chunk_index=0):
        target_matrix = self.make_matrix(df_target, chunk_index=None)

        matched_mask = np.zeros((len(df_target),), dtype=np.bool)
        matched_indices = np.ones((len(df_target),), dtype=np.int) * -1

        # Note: This speeds things up quite a bit. We generate a random number
        # for each person which is later on used for the sampling.
        random = np.array([
            np.random.random() for _ in
            self.context.progress(range(len(df_target)), label="Generating random numbers", position=chunk_index)
        ])

        with self.context.progress(total=len(self.field_masks),
                                   position=chunk_index,
                                   label="Hot Deck Matching") as progress:
            for field_mask in self.field_masks:
                field_mask = np.array(functools.reduce(lambda x, y: x + y, field_mask), dtype=np.bool)
                source_mask = np.all(self.source_matrix[:, field_mask], axis=1)

                if np.any(source_mask) and np.count_nonzero(source_mask) >= self.minimum_source_samples:
                    target_mask = np.all(target_matrix[:, field_mask], axis=1)

                    if np.any(target_mask):
                        source_indices = np.where(source_mask)[0]
                        random_indices = np.floor(random[target_mask] * len(source_indices)).astype(np.int)
                        matched_indices[np.where(~matched_mask)[0][target_mask]] = source_indices[random_indices]

                        # We continuously shrink these matrices to make the matching
                        # easier and easier as the HDM proceeds
                        random = random[~target_mask]
                        target_matrix = target_matrix[~target_mask]
                        matched_mask[~matched_mask] |= target_mask

                progress.update()

        matched_ids = np.zeros((len(df_target),), dtype=self.source_ids.dtype)
        matched_ids[matched_mask] = self.source_ids.iloc[matched_indices[matched_mask]]
        matched_ids[~matched_mask] = self.default_id

        return matched_ids

def run(context, df_target, target_id, df_source, source_id, source_weight, mandatory_fields, preference_fields,
        default_id=int(-1), runners=-1, minimum_source_samples=1):
    matcher = HotDeckMatcher(context, df_source, source_id, source_weight, mandatory_fields, preference_fields,
                             default_id, minimum_source_samples)

    if runners == -1:
        runners = mp.cpu_count()

    if runners == 1:
        df_target.loc[:, "hdm_source_id"] = matcher(df_target, 0)
    else:
        with mp.Pool(processes=runners, initializer=initializer, initargs=(matcher,)) as pool:
            chunks = np.array_split(df_target, runners)
            df_target.loc[:, "hdm_source_id"] = np.hstack(pool.map(runner, enumerate(chunks)))

    for i in range(runners + 1): print(" ")  # Formatting of output
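A hedged usage sketch of the matcher (all data and column names invented; `context` is the pipeline context that provides the `progress` helper used above):

```python
import pandas as pd

df_source = pd.DataFrame({
    "mz_id": [10, 11], "weight": [1.0, 2.0],
    "age_class": ["young", "old"], "sex": ["m", "f"],
})
df_target = pd.DataFrame({
    "person_id": [1, 2], "age_class": ["young", "old"], "sex": ["f", "f"],
})

# Mandatory fields must always match; preference fields are relaxed
# combination by combination until enough source samples remain.
run(context, df_target, "person_id", df_source, "mz_id", "weight",
    ["age_class"], ["sex"], runners=1)
print(df_target["hdm_source_id"])  # matched mz_id per person, -1 if unmatched
```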
matcher = None

def initializer(_matcher):
    global matcher
    matcher = _matcher

def runner(args):
    index, df_chunk = args