Source: source/assemble/cand_poll.py (back to index)

  1 """Assemble the candidate × poll table (one row per (protocol, politico_id, scenario_label)).  2    3 This is the regression-ready table. Joins poll-level metadata from  4 build/assemble/poll.parquet back onto each (cand, poll, scenario) row  5 so downstream regressions can read this file standalone.  6    7 Inputs:  8   - pipelines/politica/build/clean/poll_response_2024.parquet  9         long: one row per (protocol, scenario, candidate) with 10         registry metadata + cand_politico_id / cand_cpf / cand_party / 11         cand_votes / match_score / match_method already joined. 12   - pipelines/politica/build/clean/poll_sponsor_2024.parquet 13         sponsor with Routes A+B+C+D candidate links. 14   - pipelines/politica/build/clean/candidato.csv 15         2024 PREFEITO 1st round panel — needed for the within-muni 16         vote-share denominator and the race-structure features. 17   - build/assemble/poll.parquet 18         poll-level metadata join (sponsor_types, pollster fields, 19         independence flags, field period). 20   21 Output: 22   - build/assemble/cand_poll.parquet 23         Candidate-poll table for the regressions. Sample: estimulado 24         scenarios, match_score >= 2, non-aggregate candidate names. 25 """ 26 from __future__ import annotations 27   28 import re 29 import sys 30 import unicodedata 31 from pathlib import Path 32   33 import pandas as pd 34   35 HERE = Path(__file__).resolve().parent 36 PROJECT_ROOT = HERE.parent.parent 37 WORKSPACE = PROJECT_ROOT.parents[1] 38 POLITICA_BUILD = WORKSPACE / "pipelines" / "politica" / "build" 39 POLL_PARQUET = POLITICA_BUILD / "clean" / "poll_response_2024.parquet" 40 SPONSOR_CAND = POLITICA_BUILD / "clean" / "poll_sponsor_2024.parquet" 41 CANDIDATO_CSV = POLITICA_BUILD / "clean" / "candidato.csv" 42 POLL_ASSEMBLE = PROJECT_ROOT / "build" / "assemble" / "poll.parquet" 43 CAND_ASSEMBLE = PROJECT_ROOT / "build" / "assemble" / "cand.parquet" 44 OUT = PROJECT_ROOT / "build" / "assemble" / "cand_poll.parquet" 45   46   47 AGGREGATE_RE = re.compile( 48     r"BRANC|NULO|NENHUM|NAO SAB|NAO RESPOND|NAO OPIN|INDECISO|" 49     r"OUTROS|REJEITA|NSNR|NS NR|NAO SEI|" 50     r"NAO ATINGIRAM|PODERIA VOTAR|TODOS OS|EM BRANC|" 51     r"ANULARIA|NAO VOTARA|SEM RESPOSTA|NAO DECIDI|NAO QUIS|" 52     r"NINGUEM|CANDIDATO [A-Z]$|HIPOTETICO|NAO INFORMADO|" 53     # Patterns surfaced by the 2026-06-14 unmatched-candidate 54     # diagnostic — leftover pseudo-aggregates the LLM extraction 55     # treated as candidate names. These run against the 56     # _norm()-ed (uppercased, punctuation→space) form. 57     # Ideally the upstream cleaner catches these; until then, drop 58     # here so they don't pollute matched_share. 59     r"ANULOU|VOTARIA EM NENHUM|CERTEZA VOTARIA NESTE|" 60     r"COMPARECE|NAO COMPARE|NAO PRETENDE|NAO IRIA|NAO IRA VOTAR|" 61     r"NAO VAI VOTAR|NAO VOU VOTAR|NAO PRETENDO VOTAR|" 62     r"NAO LEMBRA|NAO LEMBR|NAO CONHEC|NAO SOUBE|" 63     r"^NR$|^NS$|^NO$|^NS NO$|"           # "NS/NO" → norm "NS NO" 64     r"^N S N [ROP]|^N [ROPS]\b|"          # "N.S/N.R", "N.S/N.O" 65     r"N SABE|N OPIN|" 66     r"^OUTRO$|^OUTRA$|OUTRO [SN]OME|" 67     r"SOMENTE|^NULL$|^VAZIO$", 68     re.IGNORECASE, 69 ) 70   71   72 def _norm(s) -> str: 73     if not isinstance(s, str): 74         return "" 75     s = unicodedata.normalize("NFKD", s).encode("ascii", "ignore").decode("ascii") 76     s = re.sub(r"[^A-Za-z0-9 ]", " ", s).upper().strip() 77     return re.sub(r"\s+", " ", s) 78   79   80 def load_muni_vote_totals() -> pd.DataFrame: 81     """Sum of valid mayoral votes within muni (2024 1st round). Used as 82     the denominator for `final_share = cand_votes / muni_valid_votes`.""" 83     c = pd.read_csv( 84         CANDIDATO_CSV, dtype=str, 85         usecols=["year", "office", "round", "municipio_id", "votes"], 86     ) 87     c = c[(c["year"] == "2024.0") & (c["office"] == "PREFEITO") 88           & (c["round"] == "1.0")].copy() 89     c["votes"] = pd.to_numeric(c["votes"], errors="coerce").fillna(0) 90     c["muni_id"] = c["municipio_id"].str.replace(r"\.0$", "", regex=True) 91     return c.groupby("muni_id")["votes"].sum().rename("muni_valid_votes").reset_index() 92   93   94 def load_race_structure() -> pd.DataFrame: 95     """Per (muni × candidate) structure: final rank, race size, race 96     margin. Computed from 2024 PREFEITO 1st-round candidato.csv.""" 97     c = pd.read_csv( 98         CANDIDATO_CSV, dtype=str, 99         usecols=["year", "office", "round", "municipio_id",100                  "politico_id", "votes"],101     )102     c = c[(c["year"] == "2024.0") & (c["office"] == "PREFEITO")103           & (c["round"] == "1.0")].copy()104     c["votes_n"] = pd.to_numeric(c["votes"], errors="coerce").fillna(0)105     c["muni_id"] = c["municipio_id"].str.replace(r"\.0$", "", regex=True)106     c = c.sort_values(["muni_id", "votes_n"], ascending=[True, False])107     c["final_rank"] = c.groupby("muni_id").cumcount() + 1108     c["n_candidates_in_race"] = c.groupby("muni_id")["politico_id"].transform("count")109     muni_total = c.groupby("muni_id")["votes_n"].transform("sum")110     c["final_share_local"] = (c["votes_n"] / muni_total.where(muni_total > 0)).fillna(0)111     top1 = c[c["final_rank"] == 1].set_index("muni_id")["final_share_local"]112     top2 = c[c["final_rank"] == 2].set_index("muni_id")["final_share_local"]113     margin = (top1 - top2.reindex(top1.index, fill_value=0)).rename("race_margin")114     c = c.merge(margin.reset_index(), on="muni_id", how="left")115     return c[[116         "politico_id", "muni_id",117         "final_rank", "n_candidates_in_race", "race_margin",118     ]]119  120  121 def load_muni_aptos_2020() -> pd.DataFrame:122     """2020 muni registered-voter counts (aptos), as a proxy for 2024.123     Returns aptos_proxy + runoff_eligible (binary, >=200k voters)."""124     e = pd.read_csv(125         CANDIDATO_CSV.parent / "eleicao.csv",126         dtype=str,127         usecols=["year", "office", "round", "municipio_id", "aptos"],128     )129     e = e[(e["year"] == "2020") & (e["office"] == "PREFEITO")130           & (e["round"] == "1")].copy()131     e["aptos_n"] = pd.to_numeric(e["aptos"], errors="coerce")132     e["muni_id"] = e["municipio_id"].str.replace(r"\.0$", "", regex=True)133     out = e.groupby("muni_id")["aptos_n"].max().rename("aptos_proxy").reset_index()134     out["runoff_eligible"] = (out["aptos_proxy"] >= 200_000).astype(int)135     return out136  137  138 def load_sponsor_links(sponsor: pd.DataFrame) -> pd.DataFrame:139     """(protocol, politico_id, route_used) — one row per sponsor link."""140     s = sponsor[sponsor["sponsor_candidate_politico_id"].notna()][[141         "protocol", "sponsor_candidate_politico_id", "sponsor_route",142     ]].rename(columns={143         "sponsor_candidate_politico_id": "politico_id",144         "sponsor_route": "route_used",145     }).drop_duplicates(["protocol", "politico_id"])146     return s147  148  149 def main() -> int:150     for p in (POLL_PARQUET, SPONSOR_CAND, POLL_ASSEMBLE, CAND_ASSEMBLE):151         if not p.exists():152             sys.exit(f"Missing {p}.")153  154     print(f"Loading poll_response_2024.parquet ({POLL_PARQUET})…")155     poll = pd.read_parquet(POLL_PARQUET)156     print(f"  total rows: {len(poll):,}; protocols: {poll['protocol'].nunique():,}")157  158     poll = poll[poll["scenario_type"] == "estimulado"].copy()159     print(f"  estimulado: {len(poll):,}")160     is_agg = poll["candidate_name"].fillna("").map(_norm).str.contains(161         AGGREGATE_RE, regex=True, na=False162     )163     poll = poll[~is_agg].copy()164     print(f"  after dropping aggregate names: {len(poll):,}")165  166     # Per (protocol, scenario_label), compute the share of poll-percent167     # accounted for by candidates that confidently matched a TSE168     # registration (match_score >= 2). This is the measure of how169     # sound the downstream renormalization is on a given poll —170     # matched_share == 1.0 means every non-aggregate candidate matched,171     # so the renormalization is a no-op and poll_percent stays on the172     # raw scale; matched_share < 1.0 inflates the matched candidates'173     # poll_percent. Computed BEFORE the match_score filter so the174     # denominator (sum_total) includes the unmatched rows.175     poll["matched"] = (poll["match_score"].fillna(0).astype(int) >= 2).astype(int)176     sum_total = poll.groupby(177         ["protocol", "scenario_label"])["percent"].transform("sum")178     matched_pct = poll["percent"] * poll["matched"]179     sum_matched = matched_pct.groupby(180         [poll["protocol"], poll["scenario_label"]]).transform("sum")181     poll["matched_share"] = (182         sum_matched / sum_total.where(sum_total > 0)).fillna(0)183     poll = poll.drop(columns="matched")184  185     poll = poll[poll["match_score"].fillna(0).astype(int) >= 2].copy()186     print(f"  after match_score >= 2: {len(poll):,}; "187           f"candidates: {poll['cand_politico_id'].nunique():,}")188     print(f"  matched_share distribution: "189           f"100% on {(poll.drop_duplicates(['protocol','scenario_label'])['matched_share']==1.0).sum():,} "190           f"of {poll[['protocol','scenario_label']].drop_duplicates().shape[0]:,} polls × scenarios")191  192     # Canonical-scenario pick. A relatório PDF often has multiple193     # estimulado scenarios ("what if X drops out", "with the four194     # frontrunners", ...) and the analysis uses only the main one.195     # Keep the scenario with the most distinct candidates per protocol;196     # tie-break alphabetically on scenario_label so the choice is197     # deterministic across builds. After this filter, scenario_label198     # is 1:1 with protocol — the cand_poll grain becomes199     # (protocol, politico_id) cleanly.200     scen_cands = (poll.groupby(["protocol", "scenario_label"])201                        ["cand_politico_id"].nunique()202                        .reset_index(name="n_cands"))203     scen_cands = scen_cands.sort_values(204         ["protocol", "n_cands", "scenario_label"],205         ascending=[True, False, True])206     canonical = scen_cands.drop_duplicates("protocol", keep="first")207     poll = poll.merge(canonical[["protocol", "scenario_label"]],208                       on=["protocol", "scenario_label"], how="inner")209     print(f"  after canonical-scenario pick: {len(poll):,}; "210           f"protocols: {poll['protocol'].nunique():,}")211  212     sponsor = pd.read_parquet(SPONSOR_CAND)213     sponsor_links = load_sponsor_links(sponsor)214     print(f"  sponsor→candidate links: {len(sponsor_links):,}")215  216     muni_totals = load_muni_vote_totals()217     race_struct = load_race_structure()218     muni_size = load_muni_aptos_2020()219     cand_assemble = pd.read_parquet(CAND_ASSEMBLE)220  221     poll["muni_id"] = poll["muni_code"].astype("Int64").astype(str)222     poll["cand_votes_num"] = pd.to_numeric(poll["cand_votes"], errors="coerce").fillna(0)223     poll = poll.merge(muni_totals, on="muni_id", how="left")224     poll["final_share"] = (poll["cand_votes_num"]225                            / poll["muni_valid_votes"].where(poll["muni_valid_votes"] > 0)).fillna(0)226  227     # Renormalize poll_percent within (protocol × scenario_label) so the228     # denominator matches final_share (candidate / valid).229     grp = poll.groupby(["protocol", "scenario_label"])["percent"]230     poll["scenario_cand_sum"] = grp.transform("sum")231     poll["poll_percent"] = (232         100 * poll["percent"]233         / poll["scenario_cand_sum"].where(poll["scenario_cand_sum"] > 0)234     ).fillna(0)235     poll["poll_percent_raw"] = poll["percent"]236     poll["error"] = poll["poll_percent"] - 100 * poll["final_share"]237  238     sponsor_set = set(zip(sponsor_links["protocol"], sponsor_links["politico_id"]))239     poll["sponsored_by"] = poll.apply(240         lambda r: int((r["protocol"], r["cand_politico_id"]) in sponsor_set), axis=1241     )242     sponsors_by_protocol = sponsor_links.groupby(243         "protocol"244     )["politico_id"].apply(set).to_dict()245     def _opp(row):246         s = sponsors_by_protocol.get(row["protocol"], set())247         return int(any(p != row["cand_politico_id"] for p in s))248     poll["opponent_sponsored"] = poll.apply(_opp, axis=1)249  250     # Heterogeneity inputs251     poll = poll.merge(252         race_struct,253         left_on=["cand_politico_id", "muni_id"],254         right_on=["politico_id", "muni_id"],255         how="left",256     ).drop(columns="politico_id")257     poll = poll.merge(muni_size, on="muni_id", how="left")258     poll["runoff_eligible"] = poll["runoff_eligible"].fillna(0).astype(int)259     # Candidate-level columns (experience + base profile) come from the260     # canonical cand.parquet rather than recomputed inline. Identity cols261     # already on `poll` (cand_politico_id, party from poll registration)262     # are kept; we pull experience + base profile from cand.263     cand_cols = ["politico_id", "prior_prefeito_runs", "prior_prefeito_wins",264                  "first_time_prefeito", "incumbent_proxy",265                  "base_source", "n_secoes_with_vote", "n_secoes_in_muni",266                  "base_gini", "base_top_decile_share", "base_lv_size_weighted"]267     poll = poll.merge(268         cand_assemble[cand_cols].rename(columns={"politico_id": "cand_politico_id"}),269         on="cand_politico_id", how="left",270     )271     poll["prior_prefeito_runs"] = poll["prior_prefeito_runs"].fillna(0).astype(int)272     poll["prior_prefeito_wins"] = poll["prior_prefeito_wins"].fillna(0).astype(int)273     poll["first_time_prefeito"] = poll["first_time_prefeito"].fillna(1).astype(int)274     poll["incumbent_proxy"] = poll["incumbent_proxy"].fillna(0).astype(int)275     poll["base_source"] = poll["base_source"].fillna("unavailable")276  277     # Route used (for sponsored=1 rows)278     route_by = sponsor_links.drop_duplicates(["protocol", "politico_id"])279     poll = poll.merge(280         route_by,281         left_on=["protocol", "cand_politico_id"],282         right_on=["protocol", "politico_id"],283         how="left",284     ).drop(columns="politico_id")285     poll.loc[poll["sponsored_by"] == 0, "route_used"] = pd.NA286  287     # Join poll-level metadata from build/assemble/poll.parquet so288     # downstream regression code reads one parquet, not two.289     # 2026-06-17: pollster_cnpj and st_pesquisa_propria are now carried290     # by upstream poll_response_2024.parquet (added by pipelines/politica291     # source/clean/poll.py), so we no longer re-pull them here. The292     # assemble-only columns are sponsor_types + the independence flags +293     # pollster_name + the derived field-period columns.294     print(f"Joining poll.parquet ({POLL_ASSEMBLE})…")295     poll_meta = pd.read_parquet(POLL_ASSEMBLE)296     cand_poll = poll.merge(297         poll_meta[[298             "protocol", "sponsor_types",299             "poll_is_independent", "poll_has_candidate_sponsor",300             "pollster_name",301             "field_end", "field_period_week", "days_to_election",302         ]],303         on="protocol", how="left",304     )305     cand_poll["poll_is_independent"] = cand_poll["poll_is_independent"].fillna(0).astype(int)306     cand_poll["poll_has_candidate_sponsor"] = (307         cand_poll["poll_has_candidate_sponsor"].fillna(0).astype(int))308  309     # Final dedup on (protocol, cand_politico_id) — keep the row with310     # the highest match_score, then the first remaining. Even after the311     # canonical-scenario filter, a few protocols can have the same312     # politico_id appearing twice (e.g. the LLM extraction listed the313     # candidate's name twice in one scenario, or the upstream match314     # produced two rows). The (protocol, politico_id) grain is the315     # contract downstream code relies on; enforce it here.316     n_before = len(cand_poll)317     cand_poll = cand_poll.sort_values(318         ["protocol", "cand_politico_id", "match_score"],319         ascending=[True, True, False])320     cand_poll = cand_poll.drop_duplicates(321         ["protocol", "cand_politico_id"], keep="first")322     print(f"Final dedup (protocol, politico_id): "323           f"{n_before:,} → {len(cand_poll):,} rows")324  325     # Column order: identifiers first, then sample/race membership,326     # then outcome / treatment, then everything else. Downstream code327     # that does df.head() or df.columns[:3] should land on the keys.328     out = cand_poll[[329         # Identifiers (the unique key)330         "protocol", "cand_politico_id",331         # Sample / race membership332         "uf", "muni_id", "municipality", "scenario_label",333         # Candidate identity334         "cand_politico", "cand_party",335         # Outcome / treatment336         "poll_percent", "poll_percent_raw", "final_share", "error",337         "matched_share",338         "sponsored_by", "opponent_sponsored", "route_used",339         # Poll-level metadata (constant per protocol)340         "poll_is_independent", "poll_has_candidate_sponsor", "sponsor_types",341         "institute", "institute_fantasy",342         "pollster_cnpj", "pollster_name", "st_pesquisa_propria",343         "sample_size", "field_end", "field_period_week", "days_to_election",344         # Match quality (from upstream)345         "n_match_candidates", "match_score", "match_method",346         # Race structure347         "final_rank", "n_candidates_in_race", "race_margin",348         "aptos_proxy", "runoff_eligible",349         # Candidate experience / base profile350         "prior_prefeito_runs", "prior_prefeito_wins",351         "first_time_prefeito", "incumbent_proxy",352         "base_source", "n_secoes_with_vote", "n_secoes_in_muni",353         "base_gini", "base_top_decile_share", "base_lv_size_weighted",354     ]].copy().rename(columns={355         "cand_politico_id": "politico_id",356         "cand_politico": "cand_name",357     })358  359     # Grain assertion — the contract every reader can rely on.360     assert not out.duplicated(["protocol", "politico_id"]).any(), \361         "cand_poll.parquet grain broken: (protocol, politico_id) has duplicates"362  363     OUT.parent.mkdir(parents=True, exist_ok=True)364     out.to_parquet(OUT, index=False)365     print(f"\nWrote {OUT}")366     print(f"Rows: {len(out):,}; candidates: {out['politico_id'].nunique():,}; "367           f"polls: {out['protocol'].nunique():,}; races: {out['muni_id'].nunique():,}")368     print(f"sponsored_by==1: {(out['sponsored_by']==1).sum():,}; "369           f"poll_is_independent==1: {(out['poll_is_independent']==1).sum():,}; "370           f"SP rows: {(out['uf']=='SP').sum():,}")371     return 0372  373  374 if __name__ == "__main__":375     raise SystemExit(main())376