-
Notifications
You must be signed in to change notification settings - Fork 6
/
Copy pathchecks.py
326 lines (266 loc) · 11.5 KB
/
checks.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
"""
Appointment Method Checks
-------------------------
Functions to conditionally check appointment methods.
Contents:
quota_condition,
consistency_condition
"""
from math import ceil, floor
import pandas as pd
from poli_sci_kit.appointment.metrics import ideal_share
def quota_condition(shares, seats):
    """
    Checks whether assignment method results fall within the range of the ideal share rounded down and up.

    Notes
    -----
        https://en.wikipedia.org/wiki/Quota_rule

    Parameters
    ----------
        shares : list
            The proportion of the population or votes for the regions or parties.

        seats : list
            The share of seats given to the regions or parties.

    Returns
    -------
        check_pass or fail_report: bool or dict (index -> tuple)
            A value of True, or a dict that maps the index of each failing
            element to its (share, seats) pair.

    Raises
    ------
        AssertionError
            If shares and seats do not have the same length.
    """
    assert len(shares) == len(
        seats
    ), "The total different shares of a population or vote must equal that of the allocated seats."

    # The totals do not change per element, so they are computed once
    # instead of once per comparison.
    total_shares = sum(shares)
    total_seats = sum(seats)

    fail_report = {}
    for i, (share, seat) in enumerate(zip(shares, seats)):
        ideal = ideal_share(share, total_shares, total_seats)
        # The quota rule allows only the ideal share rounded down or up.
        if not floor(ideal) <= seat <= ceil(ideal):
            fail_report[i] = (share, seat)

    check_pass = not fail_report
    print("Quota condition passed:", check_pass)
    if check_pass:
        return check_pass

    print("Returning list of argument elements that failed the condition.")
    return fail_report
def _seat_monotony_failures(df_seats):
    """
    Collects the allocations that decrease between variations ordered by total seats.

    Parameters
    ----------
        df_seats : pd.DataFrame (num_region_party, num_variation)
            Seats given to the regions or parties, one column per variation.

    Returns
    -------
        df_fail_report : pd.DataFrame
            The columns (ordered by increasing total) that take part in at
            least one failing comparison, restricted to the rows that fail.
            It has no columns if every comparison passes.
    """
    totals = [df_seats.iloc[:, pos].sum() for pos in range(df_seats.shape[1])]
    # sorted() is stable, so variations with equal totals keep their given order.
    order = sorted(range(len(totals)), key=lambda pos: totals[pos])
    # The report is built from the ordered frame so that the positions used
    # for the comparisons and for the report always refer to the same columns.
    df_ordered = df_seats.iloc[:, order]

    failing_cols = set()
    failing_rows = set()
    num_cols = df_ordered.shape[1]
    for earlier in range(num_cols):
        for later in range(earlier + 1, num_cols):
            not_decreased = (
                df_ordered.iloc[:, earlier] <= df_ordered.iloc[:, later]
            ).values
            if not_decreased.all():
                continue

            # Keep both variations of a failing comparison and the rows
            # in which the allocation went down.
            failing_cols.update((earlier, later))
            failing_rows.update(
                pos for pos, passed in enumerate(not_decreased) if not passed
            )

    return df_ordered.iloc[sorted(failing_rows), sorted(failing_cols)].copy()


def _share_monotony_failures(df_shares, df_seats):
    """
    Collects the rows and variations where a share that is not larger received more seats.

    Parameters
    ----------
        df_shares : pd.DataFrame (num_region_party, num_variation)
            Shares of the regions or parties, one column per variation.

        df_seats : pd.DataFrame (num_region_party, num_variation)
            Seats given to the regions or parties, one column per variation.

    Returns
    -------
        df_fail_report : pd.DataFrame
            Share and seat columns alternated (labelled 2 * i and 2 * i + 1
            for variation i) for the failing variations, restricted to the
            failing rows. It has no rows if every comparison passes.
    """
    share_rows = df_shares.values.tolist()
    seat_rows = df_seats.values.tolist()
    num_variations = df_shares.shape[1]

    failing_rows = []
    failing_variations = set()
    for row_pos, (row_shares, row_seats) in enumerate(zip(share_rows, seat_rows)):
        row_failed = False
        for j in range(num_variations):
            for k in range(num_variations):
                # Variation j has a share no larger than variation k,
                # but was given more seats.
                if row_shares[j] <= row_shares[k] and not row_seats[j] <= row_seats[k]:
                    failing_variations.update((j, k))
                    row_failed = True

        if row_failed:
            failing_rows.append(row_pos)

    # The fail report df has share and seat columns alternated.
    df_fail_report = pd.DataFrame(index=df_shares.index)
    for pos in range(num_variations):
        df_fail_report[2 * pos] = df_shares.iloc[:, pos].values
        df_fail_report[2 * pos + 1] = df_seats.iloc[:, pos].values

    cols_to_keep = [
        col for pos in sorted(failing_variations) for col in (2 * pos, 2 * pos + 1)
    ]

    # Rows and columns are selected by position so that any index labels
    # (e.g. party names) are supported.
    return df_fail_report.iloc[failing_rows, cols_to_keep]


def consistency_condition(df_shares=None, df_seats=None, check_type="seat_monotony"):
    """
    Checks the consistency of assignment method results given dataframes of shares and allocations.

    Notes
    -----
        Only the rows and columns that take part in a failed comparison are
        reported. The condition passes if nothing is left to report.

    Parameters
    ----------
        df_shares : pd.DataFrame (num_region_party, num_variation; contains ints, default=None)
            Proportions of the population or votes for the regions or parties given variance.
            Required for share_monotony.

        df_seats : pd.DataFrame (num_region_party, num_variation; contains ints, default=None)
            Shares of seats given to the regions or parties given variance.
            Required for both check types.

        check_type : str
            Whether the consistency of a change in seats or a change in shares is checked.

            Options:
                The style of monotony to derive the consistency with.

                - seat_monotony : An increase in total seats does not decrease allotted seats

                    Note: columns of df_seats are ordered by their sums, and each column is compared elementwise with the columns of larger total.

                - share_monotony : An increase in shares does not decrease allotted seats

                    Note: use rows of df_shares and check coinciding elements of df_seats for monotony.

    Returns
    -------
        check_pass or df_fail_report: bool or pd.DataFrame
            A value of True, or a df of corresponding arguments where the check has failed.
            For seat_monotony the columns of the df are ordered by increasing total seats.

    Raises
    ------
        AssertionError
            If both dataframes are given and their shapes differ.

        ValueError
            If check_type is not a valid option, or a dataframe required by the check is missing.
    """
    if df_shares is not None and df_seats is not None:
        assert (
            df_shares.shape == df_seats.shape
        ), "The number of share variations must be equal to the number of seat allocation variations."

    if check_type == "seat_monotony":
        if df_seats is None:
            raise ValueError("The 'df_seats' argument is required for seat_monotony")

        df_fail_report = _seat_monotony_failures(df_seats)
        check_pass = len(df_fail_report.columns) == 0

    elif check_type == "share_monotony":
        if df_shares is None or df_seats is None:
            raise ValueError(
                "The 'df_shares' and 'df_seats' arguments are required for share_monotony"
            )

        df_fail_report = _share_monotony_failures(df_shares, df_seats)
        check_pass = len(df_fail_report) == 0

    else:
        raise ValueError(
            "The 'check_type' argument must be either seat_monotony or share_monotony"
        )

    print(
        f"Consistency condition based on {check_type.split('_')[0]} monotony passed:",
        check_pass,
    )
    if not check_pass:
        print("Returning df of argument elements that failed the condition.")
        return df_fail_report

    return check_pass