# Source: source/assemble/poll.py

  1 """Assemble the poll-level table (one row per protocol).  2    3 Inputs:  4   - pipelines/politica/build/clean/poll_2024.parquet  5         TSE mayoral poll registry, one row per protocol. Carries  6         pollster identity (institute, institute_fantasy,  7         pollster_cnpj), st_pesquisa_propria (own-poll flag), field  8         period (date_end), sample size, race uf / municipality /  9         muni_code_tse. (Built by pipelines/politica/source/clean/poll.py.) 10   - pipelines/politica/build/clean/poll_response_2024.parquet 11         Used only to restrict to protocols that have LLM-extracted 12         vote intentions (the analysis universe). Without this filter 13         poll.parquet would include ~5k registry protocols with no 14         candidate-response data attached downstream. 15   - pipelines/politica/build/clean/poll_sponsor_2024.parquet 16         Sponsor rows (one per protocol × sponsor); used for the 17         sponsor-type classifier. 18   19 Output: 20   - build/assemble/poll.parquet 21         One row per protocol (analysis universe) with sponsor 22         classification, pollster identity, race / muni metadata, 23         field period, and the derived poll_is_independent / 24         poll_has_candidate_sponsor flags. 25   26 Downstream: cand_poll.py joins this back on protocol; the SP slice 27 filters on uf == "SP". 
28 """ 29 from __future__ import annotations 30   31 import re 32 import sys 33 import unicodedata 34 from pathlib import Path 35   36 import pandas as pd 37   38 HERE = Path(__file__).resolve().parent 39 PROJECT_ROOT = HERE.parent.parent 40 WORKSPACE = PROJECT_ROOT.parents[1] 41 POLITICA_CLEAN   = WORKSPACE / "pipelines" / "politica" / "build" / "clean" 42 POLL_PARQUET     = POLITICA_CLEAN / "poll_2024.parquet" 43 RESPONSE_PARQUET = POLITICA_CLEAN / "poll_response_2024.parquet" 44 SPONSOR_PARQUET  = POLITICA_CLEAN / "poll_sponsor_2024.parquet" 45 OUT = PROJECT_ROOT / "build" / "assemble" / "poll.parquet" 46   47 ELECTION_R1 = pd.Timestamp("2024-10-06") 48   49   50 def _norm(s) -> str: 51     if not isinstance(s, str): 52         return "" 53     s = unicodedata.normalize("NFKD", s).encode("ascii", "ignore").decode("ascii") 54     s = re.sub(r"[^A-Za-z0-9 ]", " ", s).upper().strip() 55     return re.sub(r"\s+", " ", s) 56   57   58 MEDIA_TOKENS = re.compile( 59     r"\b(JORNAL|JORNALISMO|JORNALISTICA|EDITORA|DIARIO|REVISTA|GAZETA|" 60     r"TRIBUNA|FOLHA|PORTAL|BLOG|RADIO|TV|REDE|SBT|GLOBO|RECORD|BAND|" 61     r"NOTICIAS|REPORTER|MIDIA|MIDIAS|MEDIA|COMUNICACAO|COMUNICACOES|TELEC)\b" 62 ) 63 POLLSTER_TOKENS = re.compile( 64     r"\b(PESQUISA|PESQUISAS|INSTITUTO|DATAFOLHA|IBOPE|QUAEST|ATLASINTEL|" 65     r"DOXA|VOX|PARANA|VERITA|ESTATISTICA|OPINIAO)\b" 66 ) 67 PARTY_TOKENS = re.compile(r"\bPARTIDO\b") 68   69 # CNAE-based MEDIA / POLLSTER signals — high-precision complement to the 70 # name-regex above. The AN-094 top-25 audit found a 36 % false-negative 71 # rate on the name-regex when applied to high-frequency other_firm 72 # sponsors; the CNAE side recovers ~5 of the 9 known misses (Imprensa 73 # 24h, CN7, O Popular, ac24horas, Fonte 83) — every entity whose CNPJ 74 # registers a journalism/broadcast/portal/news activity in the May-2025 75 # RFB snapshot. 
The four AN-094 misses CNAE cannot recover (O Imparcial, 76 # Programa do Rubão, Canal TCM, 180graus/CTH) have either no CNAE 77 # registered or non-journalism CNAEs declared — those still need the 78 # name regex or hand-curation. 79 MEDIA_CNAES = frozenset({ 80     "1811301",  # Impressão de jornais 81     "1811302",  # Impressão de livros, revistas e outras periódicas 82     "5812300",  # Edição de jornais (sem distinção) 83     "5812301",  # Edição de jornais diários 84     "5812302",  # Edição de jornais não diários 85     "5813100",  # Edição de revistas 86     "5822100",  # Edição integrada à impressão de jornais 87     "5822101",  # Edição integrada à impressão de jornais diários 88     "5822102",  # Edição integrada à impressão de jornais não diários 89     "5823900",  # Edição integrada à impressão de revistas 90     "6010100",  # Atividades de rádio 91     "6021700",  # Atividades de televisão aberta 92     "6022502",  # TV por assinatura, exceto programadoras 93     "6110803",  # Serviços de comunicação multimídia - SCM 94     "6141800",  # Operadoras de TV por assinatura, por cabo 95     "6142600",  # Operadoras de TV por assinatura, por microondas 96     "6143400",  # Operadoras de TV por assinatura, por satélite 97     "6319400",  # Portais, provedores de conteúdo e outros serviços de informação 98     "6391700",  # Agências de notícias 99     "9002701",  # Jornalistas independentes100 })101 POLLSTER_CNAES = frozenset({102     "7320300",  # Pesquisas de mercado e de opinião pública103 })104  105 # Razão-social patterns that VETO a CNAE-based MEDIA classification.106 # AN-094 found many shells declare journalism CNAEs they don't actually107 # do (the FacUnicamps "Construção de edifícios" + "Apart-hotéis" mix,108 # the VS PUBLICIDADE / TRES MARIAS EMPREENDIMENTOS + "Edição de jornais"109 # combo, ASSOCIACAO DE MARKETING + "Administração pública em geral",110 # etc.). 
When the firm's registered name is "PUBLICIDADE", "MARKETING",111 # "EMPREENDIMENTOS", or similar non-editorial corporate descriptor,112 # the journalism CNAE is treated as performative and the firm stays in113 # other_firm. None of the AN-094 real-media razões sociais match this114 # veto, so the false-negative risk is bounded.115 SHELL_NAME_VETO = re.compile(116     r"\b(PUBLICIDADE|PROPAGANDA|MARKETING|EMPREENDIMENTOS|EMPREENDIMENTO|"117     r"ASSOCIACAO|ASSESSORIA|CONSULTORIA|ADMINISTRACAO)\b"118 )119  120 INDEPENDENT_TYPES = {"media", "pollster_self"}121 CANDIDATE_LINKED_TYPES = {"committee_prefeito", "committee_vice",122                           "party", "party_name", "individual"}123  124  125 CNPJ_DIR = WORKSPACE / "pipelines" / "cnpj" / "build" / "clean"126 CNPJ_SNAPSHOT = "202505"127  128  129 # Minimum capital social (R$) required for CNAE-based MEDIA upgrade.130 # Real media outlets in the AN-094 audit have capital ≥ R$20k; three131 # of the residual shell false-positives have capital ≤ R$5.5k (the132 # MEI-style individual-name entities). This threshold prunes those133 # without dropping any real media we currently catch.134 MIN_CAPITAL_FOR_MEDIA = 10000.0135  136  137 def load_cnpj_lookups(sponsor_cnpjs: set[str]) -> tuple[138     dict[str, frozenset[str]], dict[str, float],139 ]:140     """Build (sponsor_cnpj → CNAEs) and (cnpj_base → capital_social).141  142     Restricts both to the input CNPJ universe so we don't load the143     full 62 M-row tables. 
Returns empty dicts (with printed warning)144     if the May-2025 snapshot is unavailable — the classifier then145     falls back to the legacy name regex only.146     """147     link_path = CNPJ_DIR / f"cnpj_cnae_{CNPJ_SNAPSHOT}.parquet"148     reg_path = CNPJ_DIR / f"cnpj_{CNPJ_SNAPSHOT}.parquet"149     cnae_lookup: dict[str, frozenset[str]] = {}150     cap_lookup: dict[str, float] = {}151  152     if link_path.exists():153         link = pd.read_parquet(link_path, columns=["cnpj", "cnae"])154         link = link[link["cnpj"].isin(sponsor_cnpjs)].copy()155         link["cnae"] = link["cnae"].astype(str)156         cnae_lookup = link.groupby("cnpj")["cnae"].apply(frozenset).to_dict()157     else:158         print(f"  CNAE link snapshot {link_path} not found — "159               f"CNAE-side classification disabled.")160  161     if reg_path.exists():162         bases = {c[:8] for c in sponsor_cnpjs}163         reg = pd.read_parquet(reg_path, columns=["cnpj_base", "capital_social"])164         reg = reg[reg["cnpj_base"].isin(bases)].copy()165         cap_lookup = dict(zip(reg["cnpj_base"], reg["capital_social"].astype(float)))166     else:167         print(f"  CNPJ registry {reg_path} not found — "168               f"capital-social threshold disabled.")169     return cnae_lookup, cap_lookup170  171  172 def classify_sponsor_row(173     row,174     cnae_lookup: dict[str, frozenset[str]],175     cap_lookup: dict[str, float],176 ) -> str:177     if row.get("id_type") == "CPF":178         return "individual"179     route = row.get("sponsor_route")180     if route == "committee":181         return ("committee_vice" if row.get("committee_office") == "VICEPREFEITO"182                 else "committee_prefeito")183     if route in ("party", "party_name"):184         return route185     name = _norm(row.get("sponsor_name", ""))186     pollster_cnpj = str(row.get("pollster_cnpj", "") or "")187     sponsor_cnpj = str(row.get("sponsor_id", "") or "")188     if sponsor_cnpj and 
pollster_cnpj and sponsor_cnpj == pollster_cnpj:189         return "pollster_self"190     # CNAE-side signal first (high precision), then name-regex fallback.191     # The shell-name veto prevents firms with non-editorial corporate192     # descriptors (PUBLICIDADE / MARKETING / EMPREENDIMENTOS / etc.)193     # from being upgraded to media on the basis of a declared-but-not-194     # practiced journalism CNAE — see AN-094 for the audit that motivated195     # this filter. The capital-social floor further prunes MEI-style196     # individual entities that declare journalism CNAEs.197     cnaes = cnae_lookup.get(sponsor_cnpj, frozenset())198     vetoed = bool(SHELL_NAME_VETO.search(name))199     capital = cap_lookup.get(sponsor_cnpj[:8], 0.0)200     if (cnaes & MEDIA_CNAES201             and not vetoed202             and capital >= MIN_CAPITAL_FOR_MEDIA):203         return "media"204     if cnaes & POLLSTER_CNAES:205         return "pollster_other"206     if PARTY_TOKENS.search(name):207         return "party"208     if MEDIA_TOKENS.search(name):209         return "media"210     if POLLSTER_TOKENS.search(name):211         return "pollster_other"212     return "other_firm"213  214  215 def build_poll_class_table(sponsor: pd.DataFrame) -> pd.DataFrame:216     sponsor = sponsor.copy()217     cnpjs = set(sponsor["sponsor_id"].dropna().astype(str).unique())218     print(f"  Loading CNPJ lookups for {len(cnpjs):,} distinct sponsor CNPJs…")219     cnae_lookup, cap_lookup = load_cnpj_lookups(cnpjs)220     print(f"  CNAEs found for {len(cnae_lookup):,} of {len(cnpjs):,} CNPJs; "221           f"capital social for {len(cap_lookup):,} CNPJ bases.")222     sponsor["sponsor_type"] = sponsor.apply(223         lambda r: classify_sponsor_row(r, cnae_lookup, cap_lookup), axis=1,224     )225     grouped = sponsor.groupby("protocol").agg(226         sponsor_types=("sponsor_type", lambda x: ";".join(sorted(set(x)))),227     ).reset_index()228  229     def _is_indep(types_str: str) -> 
int:230         types = set(types_str.split(";"))231         return int(bool(types) and types.issubset(INDEPENDENT_TYPES)232                    and bool(types & INDEPENDENT_TYPES))233  234     def _has_cand(types_str: str) -> int:235         types = set(types_str.split(";"))236         return int(bool(types & CANDIDATE_LINKED_TYPES))237  238     grouped["poll_is_independent"] = grouped["sponsor_types"].map(_is_indep)239     grouped["poll_has_candidate_sponsor"] = grouped["sponsor_types"].map(_has_cand)240     return grouped241  242  243 def main() -> int:244     for p in (POLL_PARQUET, RESPONSE_PARQUET, SPONSOR_PARQUET):245         if not p.exists():246             sys.exit(f"Missing {p}.")247  248     print(f"Loading poll_2024.parquet ({POLL_PARQUET})…")249     poll = pd.read_parquet(POLL_PARQUET, columns=[250         "protocol", "uf", "muni_code_tse", "municipality",251         "institute", "institute_fantasy",252         "pollster_cnpj", "st_pesquisa_propria",253         "sample_size", "date_end",254     ])255     # institute = NM_EMPRESA = canonical pollster name (downstream256     # consumers reference both columns; keep them as aliases).257     poll["pollster_name"] = poll["institute"]258     print(f"  registry protocols: {len(poll):,}")259  260     # Restrict to the LLM-extracted universe (the analysis sample).261     # Registry has ~14.9k mayoral protocols; LLM extractions cover ~9.5k262     # of them.  
Without this filter ~5k registry-only rows would have no263     # downstream candidate-response data and would be dead weight.264     extracted_protos = set(265         pd.read_parquet(RESPONSE_PARQUET, columns=["protocol"])["protocol"]266     )267     poll = poll[poll["protocol"].isin(extracted_protos)].copy()268     print(f"  after restricting to LLM-extracted universe: {len(poll):,}")269  270     print(f"Loading sponsor parquet ({SPONSOR_PARQUET})…")271     sponsor = pd.read_parquet(SPONSOR_PARQUET)272     poll_class = build_poll_class_table(sponsor)273     print(f"  poll_class rows: {len(poll_class):,}; "274           f"independent: {poll_class['poll_is_independent'].sum():,}; "275           f"has-candidate-sponsor: {poll_class['poll_has_candidate_sponsor'].sum():,}")276  277     out = poll.merge(278         poll_class[["protocol", "sponsor_types",279                     "poll_is_independent", "poll_has_candidate_sponsor"]],280         on="protocol", how="left",281     )282     out["poll_is_independent"] = out["poll_is_independent"].fillna(0).astype(int)283     out["poll_has_candidate_sponsor"] = (284         out["poll_has_candidate_sponsor"].fillna(0).astype(int))285  286     # Derived: field_end (datetime), days_to_election, field_period_week,287     # and muni_id (string form of muni_code_tse).288     out["field_end"] = pd.to_datetime(out["date_end"], errors="coerce")289     out["days_to_election"] = (ELECTION_R1 - out["field_end"]).dt.days290     out["field_period_week"] = out["field_end"].dt.strftime("%Y-W%U")291     out["muni_id"] = out["muni_code_tse"].astype(str)292  293     out = out[[294         "protocol", "uf", "muni_id", "municipality",295         "institute", "institute_fantasy",296         "pollster_cnpj", "pollster_name", "st_pesquisa_propria",297         "sample_size", "field_end", "field_period_week", "days_to_election",298         "sponsor_types", "poll_is_independent", "poll_has_candidate_sponsor",299     ]]300  301     
OUT.parent.mkdir(parents=True, exist_ok=True)302     out.to_parquet(OUT, index=False)303     print(f"\nWrote {OUT}")304     print(f"Rows: {len(out):,}; UFs: {out['uf'].nunique()}; "305           f"munis: {out['muni_id'].nunique():,}")306     print(f"Independent polls: {(out['poll_is_independent']==1).sum():,}; "307           f"Candidate-sponsored polls: "308           f"{(out['poll_has_candidate_sponsor']==1).sum():,}")309     print(f"st_pesquisa_propria=='S': "310           f"{(out['st_pesquisa_propria']=='S').sum():,}")311     return 0312  313  314 if __name__ == "__main__":315     raise SystemExit(main())316