# Source: source/assemble/poll.py
"""Assemble the poll-level table (one row per protocol).

Inputs:
  - pipelines/politica/build/clean/poll_2024.parquet
        TSE mayoral poll registry, one row per protocol. Carries
        pollster identity (institute, institute_fantasy,
        pollster_cnpj), st_pesquisa_propria (own-poll flag), field
        period (date_end), sample size, race uf / municipality /
        muni_code_tse. (Built by pipelines/politica/source/clean/poll.py.)
  - pipelines/politica/build/clean/poll_response_2024.parquet
        Used only to restrict to protocols that have LLM-extracted
        vote intentions (the analysis universe). Without this filter
        poll.parquet would include ~5k registry protocols with no
        candidate-response data attached downstream.
  - pipelines/politica/build/clean/poll_sponsor_2024.parquet
        Sponsor rows (one per protocol × sponsor); used for the
        sponsor-type classifier.

Output:
  - build/assemble/poll.parquet
        One row per protocol (analysis universe) with sponsor
        classification, pollster identity, race / muni metadata,
        field period, and the derived poll_is_independent /
        poll_has_candidate_sponsor flags.

Downstream: cand_poll.py joins this back on protocol; the SP slice
filters on uf == "SP".
"""
from __future__ import annotations

import re
import sys
import unicodedata
from pathlib import Path

import pandas as pd

HERE = Path(__file__).resolve().parent
PROJECT_ROOT = HERE.parent.parent
# Two directories above the project root. Spelled .parent.parent (same
# idiom as PROJECT_ROOT) instead of .parents[1]: the result is identical
# whenever both ancestors exist, but .parent saturates at the filesystem
# root instead of raising IndexError at import time.
WORKSPACE = PROJECT_ROOT.parent.parent
POLITICA_CLEAN = WORKSPACE / "pipelines" / "politica" / "build" / "clean"
POLL_PARQUET = POLITICA_CLEAN / "poll_2024.parquet"
RESPONSE_PARQUET = POLITICA_CLEAN / "poll_response_2024.parquet"
SPONSOR_PARQUET = POLITICA_CLEAN / "poll_sponsor_2024.parquet"
OUT = PROJECT_ROOT / "build" / "assemble" / "poll.parquet"

ELECTION_R1 = pd.Timestamp("2024-10-06")


def _norm(s) -> str:
    """Return *s* ASCII-folded, upper-cased and whitespace-collapsed.

    Non-string input (None, NaN, numbers) yields "". Every character
    outside [A-Za-z0-9 ] is replaced by a space before collapsing, so
    the token regexes below can rely on plain word boundaries.
    """
    if not isinstance(s, str):
        return ""
    # NFKD splits accented letters into base + combining mark; the ASCII
    # round-trip then drops the marks.
    s = unicodedata.normalize("NFKD", s).encode("ascii", "ignore").decode("ascii")
    s = re.sub(r"[^A-Za-z0-9 ]", " ", s).upper().strip()
    return re.sub(r"\s+", " ", s)


def _id_str(value) -> str:
    """Return an identifier as a string, mapping missing values to "".

    None, NaN, pd.NA and other falsy values all become "". A bare
    ``str(value or "")`` is not enough: NaN is truthy and would turn
    into the literal string "nan", making two missing ids compare equal.
    """
    if pd.api.types.is_scalar(value) and pd.isna(value):
        return ""
    return str(value or "")


MEDIA_TOKENS = re.compile(
    r"\b(JORNAL|JORNALISMO|JORNALISTICA|EDITORA|DIARIO|REVISTA|GAZETA|"
    r"TRIBUNA|FOLHA|PORTAL|BLOG|RADIO|TV|REDE|SBT|GLOBO|RECORD|BAND|"
    r"NOTICIAS|REPORTER|MIDIA|MIDIAS|MEDIA|COMUNICACAO|COMUNICACOES|TELEC)\b"
)
POLLSTER_TOKENS = re.compile(
    r"\b(PESQUISA|PESQUISAS|INSTITUTO|DATAFOLHA|IBOPE|QUAEST|ATLASINTEL|"
    r"DOXA|VOX|PARANA|VERITA|ESTATISTICA|OPINIAO)\b"
)
PARTY_TOKENS = re.compile(r"\bPARTIDO\b")

# CNAE-based MEDIA / POLLSTER signals — high-precision complement to the
# name-regex above. The AN-094 top-25 audit found a 36 % false-negative
# rate on the name-regex when applied to high-frequency other_firm
# sponsors; the CNAE side recovers ~5 of the 9 known misses (Imprensa
# 24h, CN7, O Popular, ac24horas, Fonte 83) — every entity whose CNPJ
# registers a journalism/broadcast/portal/news activity in the May-2025
# RFB snapshot. The four AN-094 misses CNAE cannot recover (O Imparcial,
# Programa do Rubão, Canal TCM, 180graus/CTH) have either no CNAE
# registered or non-journalism CNAEs declared — those still need the
# name regex or hand-curation.
MEDIA_CNAES = frozenset({
    "1811301",  # Newspaper printing
    "1811302",  # Printing of books, magazines and other periodicals
    "5812300",  # Newspaper publishing (undifferentiated)
    "5812301",  # Daily newspaper publishing
    "5812302",  # Non-daily newspaper publishing
    "5813100",  # Magazine publishing
    "5822100",  # Integrated publishing and printing of newspapers
    "5822101",  # Integrated publishing and printing of daily newspapers
    "5822102",  # Integrated publishing and printing of non-daily newspapers
    "5823900",  # Integrated publishing and printing of magazines
    "6010100",  # Radio activities
    "6021700",  # Free-to-air television activities
    "6022502",  # Pay TV, except programmers
    "6110803",  # Multimedia communication services (SCM)
    "6141800",  # Pay-TV operators, cable
    "6142600",  # Pay-TV operators, microwave
    "6143400",  # Pay-TV operators, satellite
    "6319400",  # Portals, content providers and other information services
    "6391700",  # News agencies
    "9002701",  # Independent journalists
})
POLLSTER_CNAES = frozenset({
    "7320300",  # Market and public-opinion research
})

# Razão-social patterns that VETO a CNAE-based MEDIA classification.
# AN-094 found many shells declare journalism CNAEs they don't actually
# do (the FacUnicamps "Construção de edifícios" + "Apart-hotéis" mix,
# the VS PUBLICIDADE / TRES MARIAS EMPREENDIMENTOS + "Edição de jornais"
# combo, ASSOCIACAO DE MARKETING + "Administração pública em geral",
# etc.). When the firm's registered name is "PUBLICIDADE", "MARKETING",
# "EMPREENDIMENTOS", or similar non-editorial corporate descriptor,
# the journalism CNAE is treated as performative and the firm stays in
# other_firm. None of the AN-094 real-media razões sociais match this
# veto, so the false-negative risk is bounded.
SHELL_NAME_VETO = re.compile(
    r"\b(PUBLICIDADE|PROPAGANDA|MARKETING|EMPREENDIMENTOS|EMPREENDIMENTO|"
    r"ASSOCIACAO|ASSESSORIA|CONSULTORIA|ADMINISTRACAO)\b"
)

INDEPENDENT_TYPES = {"media", "pollster_self"}
CANDIDATE_LINKED_TYPES = {"committee_prefeito", "committee_vice",
                          "party", "party_name", "individual"}


CNPJ_DIR = WORKSPACE / "pipelines" / "cnpj" / "build" / "clean"
CNPJ_SNAPSHOT = "202505"


# Minimum capital social (R$) required for CNAE-based MEDIA upgrade.
# Real media outlets in the AN-094 audit have capital ≥ R$20k; three
# of the residual shell false-positives have capital ≤ R$5.5k (the
# MEI-style individual-name entities). This threshold prunes those
# without dropping any real media we currently catch.
MIN_CAPITAL_FOR_MEDIA = 10000.0


def load_cnpj_lookups(sponsor_cnpjs: set[str]) -> tuple[
    dict[str, frozenset[str]], dict[str, float],
]:
    """Build (sponsor_cnpj → CNAEs) and (cnpj_base → capital_social).

    Restricts both to the input CNPJ universe so we don't load the
    full 62 M-row tables. Returns empty dicts (with printed warning)
    if the May-2025 snapshot is unavailable — the classifier then
    falls back to the legacy name regex only.

    Args:
        sponsor_cnpjs: CNPJ strings to keep. The first 8 characters of
            each are used as the cnpj_base key for the capital lookup.

    Returns:
        A ``(cnae_lookup, cap_lookup)`` pair. Either dict is empty when
        its source parquet does not exist.
    """
    link_path = CNPJ_DIR / f"cnpj_cnae_{CNPJ_SNAPSHOT}.parquet"
    reg_path = CNPJ_DIR / f"cnpj_{CNPJ_SNAPSHOT}.parquet"
    cnae_lookup: dict[str, frozenset[str]] = {}
    cap_lookup: dict[str, float] = {}

    if link_path.exists():
        link = pd.read_parquet(link_path, columns=["cnpj", "cnae"])
        link = link[link["cnpj"].isin(sponsor_cnpjs)].copy()
        link["cnae"] = link["cnae"].astype(str)
        cnae_lookup = link.groupby("cnpj")["cnae"].apply(frozenset).to_dict()
    else:
        print(f" CNAE link snapshot {link_path} not found — "
              f"CNAE-side classification disabled.")

    if reg_path.exists():
        bases = {c[:8] for c in sponsor_cnpjs}
        reg = pd.read_parquet(reg_path, columns=["cnpj_base", "capital_social"])
        reg = reg[reg["cnpj_base"].isin(bases)].copy()
        # If a cnpj_base appears more than once, the last row wins.
        cap_lookup = dict(zip(reg["cnpj_base"], reg["capital_social"].astype(float)))
    else:
        print(f" CNPJ registry {reg_path} not found — "
              f"capital-social threshold disabled.")
    return cnae_lookup, cap_lookup


def classify_sponsor_row(
    row,
    cnae_lookup: dict[str, frozenset[str]],
    cap_lookup: dict[str, float],
) -> str:
    """Classify one sponsor row into a sponsor-type label.

    Args:
        row: Mapping-like object exposing ``.get`` (a dict or a
            DataFrame row). Keys read: id_type, sponsor_route,
            committee_office, sponsor_name, pollster_cnpj, sponsor_id.
        cnae_lookup: sponsor CNPJ → set of CNAE codes.
        cap_lookup: 8-character CNPJ base → capital social.

    Returns:
        One of "individual", "committee_vice", "committee_prefeito",
        "party", "party_name", "pollster_self", "media",
        "pollster_other" or "other_firm". Rules are evaluated in that
        order of precedence; the first match wins.
    """
    if row.get("id_type") == "CPF":
        return "individual"
    route = row.get("sponsor_route")
    if route == "committee":
        return ("committee_vice" if row.get("committee_office") == "VICEPREFEITO"
                else "committee_prefeito")
    if route in ("party", "party_name"):
        return route
    name = _norm(row.get("sponsor_name", ""))
    # Missing ids (None / NaN / NA) must normalise to "": otherwise two
    # NaNs both stringify to "nan" and a row with no ids at all would be
    # labelled pollster_self.
    pollster_cnpj = _id_str(row.get("pollster_cnpj", ""))
    sponsor_cnpj = _id_str(row.get("sponsor_id", ""))
    if sponsor_cnpj and pollster_cnpj and sponsor_cnpj == pollster_cnpj:
        return "pollster_self"
    # CNAE-side signal first (high precision), then name-regex fallback.
    # The shell-name veto prevents firms with non-editorial corporate
    # descriptors (PUBLICIDADE / MARKETING / EMPREENDIMENTOS / etc.)
    # from being upgraded to media on the basis of a declared-but-not-
    # practiced journalism CNAE — see AN-094 for the audit that motivated
    # this filter. The capital-social floor further prunes MEI-style
    # individual entities that declare journalism CNAEs.
    cnaes = cnae_lookup.get(sponsor_cnpj, frozenset())
    vetoed = bool(SHELL_NAME_VETO.search(name))
    # NOTE(review): a CNPJ base absent from cap_lookup gets capital 0.0,
    # so when the registry snapshot is missing (empty cap_lookup) no
    # CNAE-based media upgrade can fire at all — confirm this matches the
    # "threshold disabled" wording printed by load_cnpj_lookups.
    capital = cap_lookup.get(sponsor_cnpj[:8], 0.0)
    if (cnaes & MEDIA_CNAES
            and not vetoed
            and capital >= MIN_CAPITAL_FOR_MEDIA):
        return "media"
    if cnaes & POLLSTER_CNAES:
        return "pollster_other"
    if PARTY_TOKENS.search(name):
        return "party"
    if MEDIA_TOKENS.search(name):
        return "media"
    if POLLSTER_TOKENS.search(name):
        return "pollster_other"
    return "other_firm"


def build_poll_class_table(sponsor: pd.DataFrame) -> pd.DataFrame:
    """Collapse sponsor rows into one classification row per protocol.

    Args:
        sponsor: Sponsor table; must contain "protocol" and
            "sponsor_id", plus whatever columns classify_sponsor_row
            reads. The input frame is not modified.

    Returns:
        DataFrame with columns protocol, sponsor_types (sorted distinct
        sponsor types joined with ";"), poll_is_independent (1 when
        every sponsor type is in INDEPENDENT_TYPES) and
        poll_has_candidate_sponsor (1 when any sponsor type is in
        CANDIDATE_LINKED_TYPES).
    """
    sponsor = sponsor.copy()
    cnpjs = set(sponsor["sponsor_id"].dropna().astype(str).unique())
    print(f" Loading CNPJ lookups for {len(cnpjs):,} distinct sponsor CNPJs…")
    cnae_lookup, cap_lookup = load_cnpj_lookups(cnpjs)
    print(f" CNAEs found for {len(cnae_lookup):,} of {len(cnpjs):,} CNPJs; "
          f"capital social for {len(cap_lookup):,} CNPJ bases.")
    sponsor["sponsor_type"] = sponsor.apply(
        lambda r: classify_sponsor_row(r, cnae_lookup, cap_lookup), axis=1,
    )
    grouped = sponsor.groupby("protocol").agg(
        sponsor_types=("sponsor_type", lambda x: ";".join(sorted(set(x)))),
    ).reset_index()

    def _is_indep(types_str: str) -> int:
        # A non-empty subset of INDEPENDENT_TYPES necessarily intersects
        # it, so no separate intersection test is needed.
        types = set(types_str.split(";"))
        return int(bool(types) and types <= INDEPENDENT_TYPES)

    def _has_cand(types_str: str) -> int:
        types = set(types_str.split(";"))
        return int(bool(types & CANDIDATE_LINKED_TYPES))

    grouped["poll_is_independent"] = grouped["sponsor_types"].map(_is_indep)
    grouped["poll_has_candidate_sponsor"] = grouped["sponsor_types"].map(_has_cand)
    return grouped


def main() -> int:
    """Build build/assemble/poll.parquet from the three input parquets.

    Returns:
        0 on success. Exits through ``sys.exit`` with a message when any
        of the input parquet files is missing.
    """
    for p in (POLL_PARQUET, RESPONSE_PARQUET, SPONSOR_PARQUET):
        if not p.exists():
            sys.exit(f"Missing {p}.")

    print(f"Loading poll_2024.parquet ({POLL_PARQUET})…")
    poll = pd.read_parquet(POLL_PARQUET, columns=[
        "protocol", "uf", "muni_code_tse", "municipality",
        "institute", "institute_fantasy",
        "pollster_cnpj", "st_pesquisa_propria",
        "sample_size", "date_end",
    ])
    # institute = NM_EMPRESA = canonical pollster name (downstream
    # consumers reference both columns; keep them as aliases).
    poll["pollster_name"] = poll["institute"]
    print(f" registry protocols: {len(poll):,}")

    # Restrict to the LLM-extracted universe (the analysis sample).
    # Registry has ~14.9k mayoral protocols; LLM extractions cover ~9.5k
    # of them. Without this filter ~5k registry-only rows would have no
    # downstream candidate-response data and would be dead weight.
    extracted_protos = set(
        pd.read_parquet(RESPONSE_PARQUET, columns=["protocol"])["protocol"]
    )
    poll = poll[poll["protocol"].isin(extracted_protos)].copy()
    print(f" after restricting to LLM-extracted universe: {len(poll):,}")

    print(f"Loading sponsor parquet ({SPONSOR_PARQUET})…")
    sponsor = pd.read_parquet(SPONSOR_PARQUET)
    poll_class = build_poll_class_table(sponsor)
    print(f" poll_class rows: {len(poll_class):,}; "
          f"independent: {poll_class['poll_is_independent'].sum():,}; "
          f"has-candidate-sponsor: {poll_class['poll_has_candidate_sponsor'].sum():,}")

    out = poll.merge(
        poll_class[["protocol", "sponsor_types",
                    "poll_is_independent", "poll_has_candidate_sponsor"]],
        on="protocol", how="left",
    )
    # Protocols without any sponsor row come out of the left merge as
    # NaN; they count as neither independent nor candidate-sponsored.
    out["poll_is_independent"] = out["poll_is_independent"].fillna(0).astype(int)
    out["poll_has_candidate_sponsor"] = (
        out["poll_has_candidate_sponsor"].fillna(0).astype(int))

    # Derived: field_end (datetime), days_to_election, field_period_week,
    # and muni_id (string form of muni_code_tse).
    out["field_end"] = pd.to_datetime(out["date_end"], errors="coerce")
    out["days_to_election"] = (ELECTION_R1 - out["field_end"]).dt.days
    out["field_period_week"] = out["field_end"].dt.strftime("%Y-W%U")
    out["muni_id"] = out["muni_code_tse"].astype(str)

    out = out[[
        "protocol", "uf", "muni_id", "municipality",
        "institute", "institute_fantasy",
        "pollster_cnpj", "pollster_name", "st_pesquisa_propria",
        "sample_size", "field_end", "field_period_week", "days_to_election",
        "sponsor_types", "poll_is_independent", "poll_has_candidate_sponsor",
    ]]

    OUT.parent.mkdir(parents=True, exist_ok=True)
    out.to_parquet(OUT, index=False)
    print(f"\nWrote {OUT}")
    print(f"Rows: {len(out):,}; UFs: {out['uf'].nunique()}; "
          f"munis: {out['muni_id'].nunique():,}")
    print(f"Independent polls: {(out['poll_is_independent']==1).sum():,}; "
          f"Candidate-sponsored polls: "
          f"{(out['poll_has_candidate_sponsor']==1).sum():,}")
    print(f"st_pesquisa_propria=='S': "
          f"{(out['st_pesquisa_propria']=='S').sum():,}")
    return 0


if __name__ == "__main__":
    raise SystemExit(main())