# Source: source/assemble/cand_poll.py
"""Assemble the candidate × poll table (one row per (protocol, politico_id)).

This is the regression-ready table. Joins poll-level metadata from
build/assemble/poll.parquet back onto each (cand, poll) row so downstream
regressions can read this file standalone. Only one canonical estimulado
scenario is kept per protocol, so scenario_label is carried as a column
but is 1:1 with protocol; the enforced unique key is
(protocol, politico_id).

Inputs:
  - pipelines/politica/build/clean/poll_response_2024.parquet
      long: one row per (protocol, scenario, candidate) with
      registry metadata + cand_politico_id / cand_cpf / cand_party /
      cand_votes / match_score / match_method already joined.
  - pipelines/politica/build/clean/poll_sponsor_2024.parquet
      sponsor with Routes A+B+C+D candidate links.
  - pipelines/politica/build/clean/candidato.csv
      2024 PREFEITO 1st round panel — needed for the within-muni
      vote-share denominator and the race-structure features.
  - pipelines/politica/build/clean/eleicao.csv
      2020 PREFEITO 1st round registered-voter counts (aptos), used as
      a municipality-size proxy.
  - build/assemble/poll.parquet
      poll-level metadata join (sponsor_types, pollster fields,
      independence flags, field period).
  - build/assemble/cand.parquet
      candidate-level experience + base-profile columns.

Output:
  - build/assemble/cand_poll.parquet
      Candidate-poll table for the regressions. Sample: estimulado
      scenarios, match_score >= 2, non-aggregate candidate names,
      canonical scenario per protocol.
"""
from __future__ import annotations

import re
import sys
import unicodedata
from pathlib import Path

import pandas as pd

HERE = Path(__file__).resolve().parent
PROJECT_ROOT = HERE.parent.parent
# Two levels above the project root. Spelled .parent.parent instead of
# .parents[1]: identical for a normally nested checkout, but it does not
# raise IndexError at import time when the module sits in a shallow
# directory (the parent of the filesystem root is the root itself).
WORKSPACE = PROJECT_ROOT.parent.parent
POLITICA_BUILD = WORKSPACE / "pipelines" / "politica" / "build"
POLL_PARQUET = POLITICA_BUILD / "clean" / "poll_response_2024.parquet"
SPONSOR_CAND = POLITICA_BUILD / "clean" / "poll_sponsor_2024.parquet"
CANDIDATO_CSV = POLITICA_BUILD / "clean" / "candidato.csv"
ELEICAO_CSV = POLITICA_BUILD / "clean" / "eleicao.csv"
POLL_ASSEMBLE = PROJECT_ROOT / "build" / "assemble" / "poll.parquet"
CAND_ASSEMBLE = PROJECT_ROOT / "build" / "assemble" / "cand.parquet"
OUT = PROJECT_ROOT / "build" / "assemble" / "cand_poll.parquet"

# Every file main() reads. Checked up front so a missing input fails
# immediately with a clear message instead of midway through the build.
REQUIRED_INPUTS = (
    POLL_PARQUET, SPONSOR_CAND, CANDIDATO_CSV, ELEICAO_CSV,
    POLL_ASSEMBLE, CAND_ASSEMBLE,
)


# Matches "candidate names" that are really aggregate / non-response
# answer options. Applied to the _norm()-ed form of the name.
# NOTE(review): substring patterns such as "BRANC" would also hit a real
# surname like "BRANCO" — confirm that trade-off is intended.
AGGREGATE_RE = re.compile(
    r"BRANC|NULO|NENHUM|NAO SAB|NAO RESPOND|NAO OPIN|INDECISO|"
    r"OUTROS|REJEITA|NSNR|NS NR|NAO SEI|"
    r"NAO ATINGIRAM|PODERIA VOTAR|TODOS OS|EM BRANC|"
    r"ANULARIA|NAO VOTARA|SEM RESPOSTA|NAO DECIDI|NAO QUIS|"
    r"NINGUEM|CANDIDATO [A-Z]$|HIPOTETICO|NAO INFORMADO|"
    # Patterns surfaced by the 2026-06-14 unmatched-candidate
    # diagnostic — leftover pseudo-aggregates the LLM extraction
    # treated as candidate names. These run against the
    # _norm()-ed (uppercased, punctuation→space) form.
    # Ideally the upstream cleaner catches these; until then, drop
    # here so they don't pollute matched_share.
    r"ANULOU|VOTARIA EM NENHUM|CERTEZA VOTARIA NESTE|"
    r"COMPARECE|NAO COMPARE|NAO PRETENDE|NAO IRIA|NAO IRA VOTAR|"
    r"NAO VAI VOTAR|NAO VOU VOTAR|NAO PRETENDO VOTAR|"
    r"NAO LEMBRA|NAO LEMBR|NAO CONHEC|NAO SOUBE|"
    r"^NR$|^NS$|^NO$|^NS NO$|"  # "NS/NO" → norm "NS NO"
    r"^N S N [ROP]|^N [ROPS]\b|"  # "N.S/N.R", "N.S/N.O"
    r"N SABE|N OPIN|"
    r"^OUTRO$|^OUTRA$|OUTRO [SN]OME|"
    r"SOMENTE|^NULL$|^VAZIO$",
    re.IGNORECASE,
)

# Final column order: identifiers first, then sample/race membership,
# then outcome / treatment, then everything else. Downstream code
# that does df.head() or df.columns[:3] should land on the keys.
OUTPUT_COLUMNS = [
    # Identifiers (the unique key)
    "protocol", "cand_politico_id",
    # Sample / race membership
    "uf", "muni_id", "municipality", "scenario_label",
    # Candidate identity
    "cand_politico", "cand_party",
    # Outcome / treatment
    "poll_percent", "poll_percent_raw", "final_share", "error",
    "matched_share",
    "sponsored_by", "opponent_sponsored", "route_used",
    # Poll-level metadata (constant per protocol)
    "poll_is_independent", "poll_has_candidate_sponsor", "sponsor_types",
    "institute", "institute_fantasy",
    "pollster_cnpj", "pollster_name", "st_pesquisa_propria",
    "sample_size", "field_end", "field_period_week", "days_to_election",
    # Match quality (from upstream)
    "n_match_candidates", "match_score", "match_method",
    # Race structure
    "final_rank", "n_candidates_in_race", "race_margin",
    "aptos_proxy", "runoff_eligible",
    # Candidate experience / base profile
    "prior_prefeito_runs", "prior_prefeito_wins",
    "first_time_prefeito", "incumbent_proxy",
    "base_source", "n_secoes_with_vote", "n_secoes_in_muni",
    "base_gini", "base_top_decile_share", "base_lv_size_weighted",
]

# Candidate-level columns pulled from cand.parquet.
CAND_COLUMNS = [
    "politico_id", "prior_prefeito_runs", "prior_prefeito_wins",
    "first_time_prefeito", "incumbent_proxy",
    "base_source", "n_secoes_with_vote", "n_secoes_in_muni",
    "base_gini", "base_top_decile_share", "base_lv_size_weighted",
]

# Poll-level columns pulled from build/assemble/poll.parquet.
POLL_META_COLUMNS = [
    "protocol", "sponsor_types",
    "poll_is_independent", "poll_has_candidate_sponsor",
    "pollster_name",
    "field_end", "field_period_week", "days_to_election",
]


def _norm(s) -> str:
    """Return *s* ASCII-folded, uppercased, with punctuation turned into
    single spaces. Non-string input (None, NaN, numbers) yields ""."""
    if not isinstance(s, str):
        return ""
    s = unicodedata.normalize("NFKD", s).encode("ascii", "ignore").decode("ascii")
    s = re.sub(r"[^A-Za-z0-9 ]", " ", s).upper().strip()
    return re.sub(r"\s+", " ", s)


def _strip_float_suffix(col: pd.Series) -> pd.Series:
    """Drop a trailing ".0" from string-typed codes ("2024.0" → "2024")."""
    return col.str.replace(r"\.0$", "", regex=True)


def _load_first_round_prefeito(
    csv_path: Path, year: str, extra_cols: list[str],
) -> pd.DataFrame:
    """Read PREFEITO 1st-round rows for *year* from an election CSV.

    All columns are read as strings. year / round / municipio_id are
    compared after stripping a float-style ".0" suffix, so both "2024"
    and "2024.0" spellings are accepted. Adds a normalized `muni_id`.
    """
    cols = ["year", "office", "round", "municipio_id", *extra_cols]
    df = pd.read_csv(csv_path, dtype=str, usecols=cols)
    keep = (
        (_strip_float_suffix(df["year"]) == year)
        & (df["office"] == "PREFEITO")
        & (_strip_float_suffix(df["round"]) == "1")
    )
    df = df[keep].copy()
    df["muni_id"] = _strip_float_suffix(df["municipio_id"])
    return df


def load_muni_vote_totals(csv_path: Path = CANDIDATO_CSV) -> pd.DataFrame:
    """Sum of mayoral candidate votes within muni (2024 1st round).

    Used as the denominator for
    `final_share = cand_votes / muni_valid_votes`.

    Returns one row per muni_id with column `muni_valid_votes`.
    Unparseable vote counts are treated as 0.
    """
    c = _load_first_round_prefeito(csv_path, "2024", ["votes"])
    c["votes"] = pd.to_numeric(c["votes"], errors="coerce").fillna(0)
    return c.groupby("muni_id")["votes"].sum().rename("muni_valid_votes").reset_index()


def load_race_structure(csv_path: Path = CANDIDATO_CSV) -> pd.DataFrame:
    """Per (muni × candidate) structure: final rank, race size, race margin.

    Computed from the 2024 PREFEITO 1st-round rows of candidato.csv.
    Returns columns politico_id, muni_id, final_rank,
    n_candidates_in_race, race_margin. race_margin is the vote-share gap
    between the top two candidates of the muni (the winner's full share
    when there is only one candidate).
    """
    # NOTE(review): politico_id is kept exactly as read; if the CSV
    # float-formats it ("123.0") it will not join with poll ids — confirm.
    c = _load_first_round_prefeito(csv_path, "2024", ["politico_id", "votes"])
    c["votes_n"] = pd.to_numeric(c["votes"], errors="coerce").fillna(0)
    # Descending votes within muni so cumcount() yields the finishing rank.
    c = c.sort_values(["muni_id", "votes_n"], ascending=[True, False])
    c["final_rank"] = c.groupby("muni_id").cumcount() + 1
    c["n_candidates_in_race"] = c.groupby("muni_id")["politico_id"].transform("count")
    muni_total = c.groupby("muni_id")["votes_n"].transform("sum")
    # .where(> 0) turns a zero denominator into NaN, which fillna maps to 0.
    c["final_share_local"] = (c["votes_n"] / muni_total.where(muni_total > 0)).fillna(0)
    top1 = c[c["final_rank"] == 1].set_index("muni_id")["final_share_local"]
    top2 = c[c["final_rank"] == 2].set_index("muni_id")["final_share_local"]
    margin = (top1 - top2.reindex(top1.index, fill_value=0)).rename("race_margin")
    c = c.merge(margin.reset_index(), on="muni_id", how="left")
    return c[[
        "politico_id", "muni_id",
        "final_rank", "n_candidates_in_race", "race_margin",
    ]]


def load_muni_aptos_2020(csv_path: Path = ELEICAO_CSV) -> pd.DataFrame:
    """2020 muni registered-voter counts (aptos), as a proxy for 2024.

    Returns one row per muni_id with aptos_proxy (max aptos over the
    2020 PREFEITO 1st-round rows) and runoff_eligible (binary,
    >=200k voters).
    """
    e = _load_first_round_prefeito(csv_path, "2020", ["aptos"])
    e["aptos_n"] = pd.to_numeric(e["aptos"], errors="coerce")
    out = e.groupby("muni_id")["aptos_n"].max().rename("aptos_proxy").reset_index()
    # NOTE(review): the legal runoff rule is, I believe, "more than 200
    # thousand voters" (strict); the exact boundary hardly matters for a
    # 2020 proxy, but confirm >= is intended.
    out["runoff_eligible"] = (out["aptos_proxy"] >= 200_000).astype(int)
    return out


def load_sponsor_links(sponsor: pd.DataFrame) -> pd.DataFrame:
    """(protocol, politico_id, route_used) — one row per sponsor link.

    Keeps only sponsor rows that carry a candidate id and de-duplicates
    on (protocol, politico_id), keeping the first route seen.
    """
    s = sponsor[sponsor["sponsor_candidate_politico_id"].notna()][[
        "protocol", "sponsor_candidate_politico_id", "sponsor_route",
    ]].rename(columns={
        "sponsor_candidate_politico_id": "politico_id",
        "sponsor_route": "route_used",
    }).drop_duplicates(["protocol", "politico_id"])
    return s


def _require_inputs() -> None:
    """Exit with a clear message if any input file is missing."""
    for p in REQUIRED_INPUTS:
        if not p.exists():
            sys.exit(f"Missing {p}.")


def _add_matched_share(poll: pd.DataFrame) -> pd.DataFrame:
    """Add matched_share: per (protocol, scenario_label), the share of
    poll-percent held by candidates with match_score >= 2."""
    # This is the measure of how sound the downstream renormalization is
    # on a given poll — matched_share == 1.0 means every non-aggregate
    # candidate matched, so the renormalization is a no-op and
    # poll_percent stays on the raw scale; matched_share < 1.0 inflates
    # the matched candidates' poll_percent. Must run BEFORE the
    # match_score filter so the denominator includes the unmatched rows.
    matched = (poll["match_score"].fillna(0).astype(int) >= 2).astype(int)
    sum_total = poll.groupby(
        ["protocol", "scenario_label"])["percent"].transform("sum")
    matched_pct = poll["percent"] * matched
    sum_matched = matched_pct.groupby(
        [poll["protocol"], poll["scenario_label"]]).transform("sum")
    poll["matched_share"] = (
        sum_matched / sum_total.where(sum_total > 0)).fillna(0)
    return poll


def _pick_canonical_scenario(poll: pd.DataFrame) -> pd.DataFrame:
    """Keep one estimulado scenario per protocol: the one with the most
    distinct candidates, ties broken alphabetically on scenario_label."""
    # A relatório PDF often has multiple estimulado scenarios ("what if
    # X drops out", "with the four frontrunners", ...) and the analysis
    # uses only the main one. The alphabetical tie-break keeps the choice
    # deterministic across builds. After this filter, scenario_label is
    # 1:1 with protocol.
    scen_cands = (poll.groupby(["protocol", "scenario_label"])
                  ["cand_politico_id"].nunique()
                  .reset_index(name="n_cands"))
    scen_cands = scen_cands.sort_values(
        ["protocol", "n_cands", "scenario_label"],
        ascending=[True, False, True])
    canonical = scen_cands.drop_duplicates("protocol", keep="first")
    return poll.merge(canonical[["protocol", "scenario_label"]],
                      on=["protocol", "scenario_label"], how="inner")


def _add_outcomes(poll: pd.DataFrame, muni_totals: pd.DataFrame) -> pd.DataFrame:
    """Add muni_id, final_share, renormalized poll_percent, and error."""
    poll["muni_id"] = poll["muni_code"].astype("Int64").astype(str)
    poll["cand_votes_num"] = pd.to_numeric(poll["cand_votes"], errors="coerce").fillna(0)
    poll = poll.merge(muni_totals, on="muni_id", how="left")
    poll["final_share"] = (
        poll["cand_votes_num"]
        / poll["muni_valid_votes"].where(poll["muni_valid_votes"] > 0)
    ).fillna(0)

    # Renormalize poll_percent within (protocol × scenario_label) so the
    # denominator matches final_share (candidate / valid).
    grp = poll.groupby(["protocol", "scenario_label"])["percent"]
    poll["scenario_cand_sum"] = grp.transform("sum")
    poll["poll_percent"] = (
        100 * poll["percent"]
        / poll["scenario_cand_sum"].where(poll["scenario_cand_sum"] > 0)
    ).fillna(0)
    poll["poll_percent_raw"] = poll["percent"]
    poll["error"] = poll["poll_percent"] - 100 * poll["final_share"]
    return poll


def _flag_sponsorship(poll: pd.DataFrame, sponsor_links: pd.DataFrame) -> pd.DataFrame:
    """Add sponsored_by (this candidate sponsored the poll) and
    opponent_sponsored (some other candidate sponsored it)."""
    sponsor_set = set(zip(sponsor_links["protocol"], sponsor_links["politico_id"]))
    sponsors_by_protocol = sponsor_links.groupby(
        "protocol"
    )["politico_id"].apply(set).to_dict()
    # Plain iteration over the two key columns instead of a row-wise
    # DataFrame.apply: same result, far cheaper, and well-defined on an
    # empty frame (where apply(axis=1) hands back a DataFrame).
    pairs = list(zip(poll["protocol"], poll["cand_politico_id"]))
    poll["sponsored_by"] = pd.Series(
        [int(pair in sponsor_set) for pair in pairs],
        index=poll.index, dtype=int)
    poll["opponent_sponsored"] = pd.Series(
        [int(any(p != cand for p in sponsors_by_protocol.get(proto, ())))
         for proto, cand in pairs],
        index=poll.index, dtype=int)
    return poll


def main() -> int:
    """Build build/assemble/cand_poll.parquet; returns 0 on success.

    Exits via sys.exit with a "Missing <path>." message if any input
    file is absent. Raises AssertionError if the output grain
    (protocol, politico_id) is not unique.
    """
    _require_inputs()

    print(f"Loading poll_response_2024.parquet ({POLL_PARQUET})…")
    poll = pd.read_parquet(POLL_PARQUET)
    print(f" total rows: {len(poll):,}; protocols: {poll['protocol'].nunique():,}")

    poll = poll[poll["scenario_type"] == "estimulado"].copy()
    print(f" estimulado: {len(poll):,}")
    is_agg = poll["candidate_name"].fillna("").map(_norm).str.contains(
        AGGREGATE_RE, regex=True, na=False
    )
    poll = poll[~is_agg].copy()
    print(f" after dropping aggregate names: {len(poll):,}")

    poll = _add_matched_share(poll)

    poll = poll[poll["match_score"].fillna(0).astype(int) >= 2].copy()
    print(f" after match_score >= 2: {len(poll):,}; "
          f"candidates: {poll['cand_politico_id'].nunique():,}")
    scenarios = poll.drop_duplicates(["protocol", "scenario_label"])
    n_fully_matched = (scenarios["matched_share"] == 1.0).sum()
    n_scenarios = poll[["protocol", "scenario_label"]].drop_duplicates().shape[0]
    print(f" matched_share distribution: "
          f"100% on {n_fully_matched:,} "
          f"of {n_scenarios:,} polls × scenarios")

    poll = _pick_canonical_scenario(poll)
    print(f" after canonical-scenario pick: {len(poll):,}; "
          f"protocols: {poll['protocol'].nunique():,}")

    sponsor = pd.read_parquet(SPONSOR_CAND)
    sponsor_links = load_sponsor_links(sponsor)
    print(f" sponsor→candidate links: {len(sponsor_links):,}")

    muni_totals = load_muni_vote_totals()
    race_struct = load_race_structure()
    muni_size = load_muni_aptos_2020()
    cand_assemble = pd.read_parquet(CAND_ASSEMBLE)

    poll = _add_outcomes(poll, muni_totals)
    poll = _flag_sponsorship(poll, sponsor_links)

    # Heterogeneity inputs
    poll = poll.merge(
        race_struct,
        left_on=["cand_politico_id", "muni_id"],
        right_on=["politico_id", "muni_id"],
        how="left",
    ).drop(columns="politico_id")
    poll = poll.merge(muni_size, on="muni_id", how="left")
    poll["runoff_eligible"] = poll["runoff_eligible"].fillna(0).astype(int)
    # Candidate-level columns (experience + base profile) come from the
    # canonical cand.parquet rather than recomputed inline. Identity cols
    # already on `poll` (cand_politico_id, party from poll registration)
    # are kept; we pull experience + base profile from cand.
    poll = poll.merge(
        cand_assemble[CAND_COLUMNS].rename(columns={"politico_id": "cand_politico_id"}),
        on="cand_politico_id", how="left",
    )
    poll["prior_prefeito_runs"] = poll["prior_prefeito_runs"].fillna(0).astype(int)
    poll["prior_prefeito_wins"] = poll["prior_prefeito_wins"].fillna(0).astype(int)
    poll["first_time_prefeito"] = poll["first_time_prefeito"].fillna(1).astype(int)
    poll["incumbent_proxy"] = poll["incumbent_proxy"].fillna(0).astype(int)
    poll["base_source"] = poll["base_source"].fillna("unavailable")

    # Route used (for sponsored=1 rows). sponsor_links is already unique
    # per (protocol, politico_id), so the join cannot fan out.
    poll = poll.merge(
        sponsor_links,
        left_on=["protocol", "cand_politico_id"],
        right_on=["protocol", "politico_id"],
        how="left",
    ).drop(columns="politico_id")
    poll.loc[poll["sponsored_by"] == 0, "route_used"] = pd.NA

    # Join poll-level metadata from build/assemble/poll.parquet so
    # downstream regression code reads one parquet, not two.
    # 2026-06-17: pollster_cnpj and st_pesquisa_propria are now carried
    # by upstream poll_response_2024.parquet (added by pipelines/politica
    # source/clean/poll.py), so we no longer re-pull them here. The
    # assemble-only columns are sponsor_types + the independence flags +
    # pollster_name + the derived field-period columns.
    print(f"Joining poll.parquet ({POLL_ASSEMBLE})…")
    poll_meta = pd.read_parquet(POLL_ASSEMBLE)
    cand_poll = poll.merge(poll_meta[POLL_META_COLUMNS], on="protocol", how="left")
    cand_poll["poll_is_independent"] = cand_poll["poll_is_independent"].fillna(0).astype(int)
    cand_poll["poll_has_candidate_sponsor"] = (
        cand_poll["poll_has_candidate_sponsor"].fillna(0).astype(int))

    # Final dedup on (protocol, cand_politico_id) — keep the row with
    # the highest match_score, then the first remaining. Even after the
    # canonical-scenario filter, a few protocols can have the same
    # politico_id appearing twice (e.g. the LLM extraction listed the
    # candidate's name twice in one scenario, or the upstream match
    # produced two rows). The (protocol, politico_id) grain is the
    # contract downstream code relies on; enforce it here.
    n_before = len(cand_poll)
    cand_poll = cand_poll.sort_values(
        ["protocol", "cand_politico_id", "match_score"],
        ascending=[True, True, False])
    cand_poll = cand_poll.drop_duplicates(
        ["protocol", "cand_politico_id"], keep="first")
    print(f"Final dedup (protocol, politico_id): "
          f"{n_before:,} → {len(cand_poll):,} rows")

    out = cand_poll[OUTPUT_COLUMNS].copy().rename(columns={
        "cand_politico_id": "politico_id",
        "cand_politico": "cand_name",
    })

    # Grain check — the contract every reader can rely on. Raised
    # explicitly (not via `assert`) so it still runs under `python -O`.
    if out.duplicated(["protocol", "politico_id"]).any():
        raise AssertionError(
            "cand_poll.parquet grain broken: (protocol, politico_id) has duplicates")

    OUT.parent.mkdir(parents=True, exist_ok=True)
    out.to_parquet(OUT, index=False)
    print(f"\nWrote {OUT}")
    print(f"Rows: {len(out):,}; candidates: {out['politico_id'].nunique():,}; "
          f"polls: {out['protocol'].nunique():,}; races: {out['muni_id'].nunique():,}")
    print(f"sponsored_by==1: {(out['sponsored_by']==1).sum():,}; "
          f"poll_is_independent==1: {(out['poll_is_independent']==1).sum():,}; "
          f"SP rows: {(out['uf']=='SP').sum():,}")
    return 0


if __name__ == "__main__":
    raise SystemExit(main())