import itertools

import numba
import numpy as np
import pandas as pd

import data.constants as c

"""
This stage attaches observations from the microcensus to the synthetic population sample.
This is done by statistical matching.
"""


def configure(context):
    context.config("threads")
    context.config("random_seed", 0)
    context.config("matching_minimum_observations", 20)
    context.config("weekend_scenario", False)
    context.config("specific_weekend_scenario", "all") # options are "all", "saturday", "sunday"
    context.config("specific_day_scenario", "avgworkday") #options can be any of the days of the week or "avgworkday"

    context.stage("data.microcensus.persons")
    context.stage("synthesis.population.sampled")


@numba.jit(nopython=True, parallel=True)
def sample_indices(uniform, cdf, selected_indices):
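    """
    Inverse-CDF sampling: for each uniform draw u, the number of CDF entries
    below u is the index of the sampled observation, and the corresponding
    entry of selected_indices is returned.
    """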
    indices = np.arange(len(uniform))

    # Independent iterations; numba.prange makes parallel=True effective
    for i in numba.prange(len(uniform)):
        indices[i] = np.count_nonzero(cdf < uniform[i])

    return selected_indices[indices]


def statistical_matching(progress, df_source, source_identifier, weight, df_target, target_identifier, columns,
                         random_seed=0, minimum_observations=0):
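    """
    Match every row of df_target to a row of df_source that agrees on the
    given columns, drawing donors proportionally to their weight. Matching
    starts with all columns and drops trailing columns as long as a value
    combination offers fewer than minimum_observations donors. Rows that
    cannot be matched on any column are assigned randomly (level 0).

    Returns the matched identifier pairs and, per target row, the number of
    columns that were used for the match.
    """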
    random = np.random.RandomState(random_seed)

    # Reduce data frames
    df_source = df_source[[source_identifier, weight] + columns].copy()
    df_target = df_target[[target_identifier] + columns].copy()

    # Sort data frames
    df_source = df_source.sort_values(by=columns)
    df_target = df_target.sort_values(by=columns)

    # Find unique values for all columns
    unique_values = {}

    for column in columns:
        unique_values[column] = list(sorted(set(df_source[column].unique()) | set(df_target[column].unique())))

    # Generate filters for all columns and values
    source_filters, target_filters = {}, {}

    for column, column_unique_values in unique_values.items():
        source_filters[column] = [df_source[column].values == value for value in column_unique_values]
        target_filters[column] = [df_target[column].values == value for value in column_unique_values]

    # Define search order
    source_filters = [source_filters[column] for column in columns]
    target_filters = [target_filters[column] for column in columns]

    # Perform matching
    weights = df_source[weight].values
    assigned_indices = np.ones((len(df_target),), dtype=np.int64) * -1
    unassigned_mask = np.ones((len(df_target),), dtype=bool)
    assigned_levels = np.ones((len(df_target),), dtype=np.int64) * -1
    uniform = random.random_sample(size=(len(df_target),))

    column_indices = [np.arange(len(unique_values[column])) for column in columns]

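    # Iterate from the most specific level (all columns must agree) down to
    # the least specific (only the first column); trailing columns are
    # dropped first.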
    for level in range(1, len(column_indices) + 1)[::-1]:
        level_column_indices = column_indices[:level]

        if np.count_nonzero(unassigned_mask) > 0:
            for column_index in itertools.product(*level_column_indices):
                f_source = np.logical_and.reduce([source_filters[i][k] for i, k in enumerate(column_index)])
                f_target = np.logical_and.reduce(
                    [target_filters[i][k] for i, k in enumerate(column_index)] + [unassigned_mask])

                selected_indices = np.nonzero(f_source)[0]
                requested_samples = np.count_nonzero(f_target)

                if requested_samples == 0:
                    continue

                if len(selected_indices) < minimum_observations:
                    continue

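                # Build the weighted CDF over the matching donors and draw
                # one donor per unassigned target row (inverse-CDF sampling)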
                selected_weights = weights[f_source]
                cdf = np.cumsum(selected_weights)
                cdf /= cdf[-1]

                assigned_indices[f_target] = sample_indices(uniform[f_target], cdf, selected_indices)
                assigned_levels[f_target] = level
                unassigned_mask[f_target] = False

                progress.update(np.count_nonzero(f_target))

    # Randomly assign unmatched observations
    cdf = np.cumsum(weights)
    cdf /= cdf[-1]

    assigned_indices[unassigned_mask] = sample_indices(uniform[unassigned_mask], cdf, np.arange(len(weights)))
    assigned_levels[unassigned_mask] = 0

    progress.update(np.count_nonzero(unassigned_mask))
    unassigned_mask[:] = False  # all remaining rows were just assigned

    assert np.count_nonzero(unassigned_mask) == 0
    assert np.count_nonzero(assigned_indices == -1) == 0

    # Write back indices
    df_target[source_identifier] = df_source[source_identifier].values[assigned_indices]
    df_target = df_target[[target_identifier, source_identifier]]

    return df_target, assigned_levels


def _run_parallel_statistical_matching(context, args):
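    """Worker entry point: unpack per-chunk arguments and the shared data
    from the parallel context, then delegate to statistical_matching."""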
    # Pass arguments
    df_target, random_seed = args

    # Pass data
    df_source = context.data("df_source")
    source_identifier = context.data("source_identifier")
    weight = context.data("weight")
    target_identifier = context.data("target_identifier")
    columns = context.data("columns")
    minimum_observations = context.data("minimum_observations")

    return statistical_matching(context.progress, df_source, source_identifier, weight, df_target, target_identifier,
                                columns, random_seed, minimum_observations)


def parallel_statistical_matching(context, df_source, source_identifier, weight, df_target, target_identifier, columns,
                                  minimum_observations=0):
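    """
    Split df_target into one chunk per thread and run statistical_matching on
    each chunk with its own random seed; the chunk results are concatenated.
    """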
    random_seed = context.config("random_seed")
    processes = context.config("threads")

    random = np.random.RandomState(random_seed)
    chunks = np.array_split(df_target, processes)

    with context.progress(label="Statistical matching ...", total=len(df_target)):
        with context.parallel({
            "df_source": df_source, "source_identifier": source_identifier, "weight": weight,
            "target_identifier": target_identifier, "columns": columns,
            "minimum_observations": minimum_observations
        }) as parallel:
            random_seeds = random.randint(10000, size=len(chunks))
            results = parallel.map(_run_parallel_statistical_matching, zip(chunks, random_seeds))

            levels = np.hstack([r[1] for r in results])
            df_target = pd.concat([r[0] for r in results])

            return df_target, levels
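
# A minimal usage sketch (hypothetical identifiers and column names, for
# illustration only), assuming df_source carries a "weight" column and both
# frames share the listed attribute columns:
#
#   df_match, levels = parallel_statistical_matching(
#       context, df_source, "source_id", "weight",
#       df_target, "target_id", ["age_class", "sex"],
#       minimum_observations=10)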


def execute(context):
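    """
    Two-pass matching: first, each head of household is matched to a
    microcensus donor, which transfers household attributes (income, car and
    bike ownership) to the household; then all persons above the age
    threshold are matched using these additional attributes.
    """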
    df_mz = context.stage("data.microcensus.persons")
    is_weekend_scenario = context.config("weekend_scenario")
    specific_weekend_scenario = context.config("specific_weekend_scenario")
    specific_day = context.config("specific_day_scenario")
    is_specific_day_scenario = (specific_day != "avgworkday") and (specific_day is not None)

    # The source are the MZ observations; for each STATPOP person, a sample is
    # drawn from them. The filter below selects the day(s) of the week for
    # which the scenario will be generated.
    df_source = pd.DataFrame(df_mz[
        # use only weekend samples for a weekend scenario
        (is_weekend_scenario & df_mz["weekend"])
        # and only weekday samples for an average workday scenario
        | ((not is_weekend_scenario) & (not is_specific_day_scenario) & ~df_mz["weekend"])
        # or samples of one specific (non-weekend) day of the week
        | ((not is_weekend_scenario) & (specific_day != "avgworkday") & (df_mz["day"] == specific_day))
    ])

    # If a specific weekend context is needed for saturday or sunday only
    if is_weekend_scenario and (specific_weekend_scenario != "all"):
        df_source = pd.DataFrame(df_source[((specific_weekend_scenario == "saturday") & df_source["saturday"]) |
                                 ((specific_weekend_scenario == "sunday") & df_source["sunday"])])

    print("INFO: The scenario will be generated for: ", df_source["day"].unique(), " day (s) of the week")

    df_population = context.stage("synthesis.population.sampled")
    number_of_statpop_persons = len(np.unique(df_population["person_id"]))
    number_of_statpop_households = len(np.unique(df_population["household_id"]))

    ## We first want to match by household to be able
    ## to add extra attributes to the persons

    # Match households
    age_selector = df_population["age"] >= c.MZ_AGE_THRESHOLD
    head_selector = age_selector & df_population["is_head"]

    df_target = pd.DataFrame(df_population[head_selector])

    columns = ["age_class", "sex", "marital_status", "household_size_class", "municipality_type"]

    # Perform statistical matching
    df_source = df_source.rename(columns={"person_id": "mz_id"})

    df_assignment, levels = parallel_statistical_matching(
        context,
        df_source, "mz_id", "household_weight",
        df_target, "person_id",
        columns,
        minimum_observations=context.config("matching_minimum_observations"))

    df_target = pd.merge(df_target, df_assignment, on="person_id")
    assert len(df_target) == len(df_assignment)

    context.set_info("matched_counts", {
        count: np.count_nonzero(levels >= count) for count in range(len(columns) + 1)
    })

    for count in range(len(columns) + 1):
        print("Matched on at least %d attributes:" % count, np.count_nonzero(levels >= count),
              "%.2f%%" % (100 * np.count_nonzero(levels >= count) / len(df_target),))

    # Remove and track unmatchable households (i.e. those whose head could not be matched)
    initial_statpop_length = len(df_population)
    initial_target_length = len(df_target)

    unmatchable_household_selector = levels < 1
    unmatchable_household_ids = set(df_target.loc[unmatchable_household_selector, "household_id"].values)
    unmatchable_person_selector = df_population["household_id"].isin(unmatchable_household_ids)

    removed_person_ids = set(df_population.loc[unmatchable_person_selector, "person_id"].values)
    removed_household_ids = set(unmatchable_household_ids)

    df_target = df_target.loc[~unmatchable_household_selector, :]
    df_population = df_population.loc[~unmatchable_person_selector, :]

    removed_households_count = sum(unmatchable_household_selector)
    removed_persons_count = sum(unmatchable_person_selector)

    print("Unmatchable heads of household: ", removed_households_count)
    print("  Removed households: ", removed_households_count)
    print("  Removed persons: ", removed_persons_count)
    print("")

    assert (len(df_target) == initial_target_length - removed_households_count)
    assert (len(df_population) == initial_statpop_length - removed_persons_count)

    # Convert IDs
    df_target["mz_id"] = df_target["mz_id"].astype(np.int)
    df_source["mz_id"] = df_source["mz_id"].astype(np.int)

    # Get the attributes from the MZ for the head of household (and thus for the household)
    df_attributes = pd.merge(
        df_target[[
            "household_id", "mz_id"
        ]],
        df_source[[
            "mz_id", "income_class", "number_of_cars_class", "number_of_bikes_class"
        ]],
        on = "mz_id"
    )

    df_attributes["mz_head_id"] = df_attributes["mz_id"]
    del df_attributes["mz_id"]

    assert (len(df_attributes) == len(df_target))

    # Attach attributes to STATPOP for the second matching
    print("Attach attributes to STATPOP for the second matching")
    initial_statpop_size = len(df_population)

    df_population = pd.merge(
        df_population, df_attributes, on="household_id"
    )

    assert (len(df_population) == initial_statpop_size)
    del df_attributes

    ## Now that we have added attributes
    ## Match persons
    age_selector = df_population["age"] >= c.MZ_AGE_THRESHOLD
    df_target = pd.DataFrame(df_population[age_selector])

    columns = ["age_class", "sex", "marital_status", "household_size_class", "municipality_type", "income_class", "number_of_cars_class", "number_of_bikes_class"]

    df_assignment, levels = parallel_statistical_matching(
        context,
        df_source, "mz_id", "person_weight",
        df_target, "person_id",
        columns,
        minimum_observations=context.config("matching_minimum_observations"))

    df_target = pd.merge(df_target, df_assignment, on="person_id")
    assert len(df_target) == len(df_assignment)

    context.set_info("matched_counts", {
        count: np.count_nonzero(levels >= count) for count in range(len(columns) + 1)
    })

    for count in range(len(columns) + 1):
        print("Matched on at least %d attributes:" % count, np.count_nonzero(levels >= count),
              "%.2f%%" % (100 * np.count_nonzero(levels >= count) / len(df_target),))

    # Remove and track unmatchable persons
    initial_statpop_length = len(df_population)
    initial_target_length = len(df_target)

    unmatchable_person_selector = levels < 1
    unmatchable_household_ids = set(df_target.loc[unmatchable_person_selector, "household_id"].values)
    unmatchable_member_selector = df_population["household_id"].isin(unmatchable_household_ids)

    removed_person_ids |= set(df_population.loc[unmatchable_member_selector, "person_id"].values)
    removed_household_ids |= unmatchable_household_ids

    df_target = df_target.loc[~unmatchable_person_selector, :]
    df_population = df_population.loc[~unmatchable_member_selector, :]

    removed_persons_count = sum(unmatchable_person_selector)
    removed_households_count = len(unmatchable_household_ids)
    removed_members_count = sum(unmatchable_member_selector)

    print("Unmatchable persons: ", removed_persons_count)
    print("  Removed households: ", removed_households_count)
    print("  Removed household members: ", removed_members_count)
    print("")

    assert (len(df_target) == initial_target_length - removed_persons_count)
    assert (len(df_population) == initial_statpop_length - removed_members_count)

    # Extract only the matching information

    df_matching = pd.merge(
        df_population[["person_id", "household_id", "mz_head_id"]],
        df_target[["person_id", "mz_id"]],
        on="person_id", how="left")

    df_matching["mz_person_id"] = df_matching["mz_id"]
    del df_matching["mz_id"]

    assert (len(df_matching) == len(df_population))

    # Check that all persons who don't have an MZ id are under age
    assert (np.all(df_population[
                       df_population["person_id"].isin(
                           df_matching.loc[df_matching["mz_person_id"] == -1]["person_id"]
                       )
                   ]["age"] < c.MZ_AGE_THRESHOLD))

    assert (not np.any(df_matching["mz_head_id"] == -1))

    print("Matching is done. In total, the following observations were removed from STATPOP: ")
    print("  Households: %d (%.2f%%)" % (
    len(removed_household_ids), 100.0 * len(removed_household_ids) / number_of_statpop_households))
    print("  Persons: %d (%.2f%%)" % (
    len(removed_person_ids), 100.0 * len(removed_person_ids) / number_of_statpop_persons))

    print(np.unique(df_matching["mz_head_id"]))

    # Return
    return df_matching, removed_person_ids