Pipeline GenAI de Recomendações para Líderes de Vendas
Pipeline GenAI de Recomendações para Líderes de Vendas
Este artigo descreve um pipeline em produção que combina Machine Learning (preparação de dados, feature engineering, orquestração) e Inteligência Artificial generativa (LLM Claude, engenharia de prompts, inferência em batch) para gerar recomendações personalizadas para líderes de equipes de vendas. O conteúdo é apresentado de forma genérica, sem identificação de clientes.
Visão geral do fluxo
O pipeline segue as etapas abaixo, orquestradas no Databricks Workflows:
- Configuração – Parâmetros por ambiente (dev/prd), endpoint do modelo, método de invocação (ai_query ou pay_per_token).
- Carga e padronização – Leitura de tabelas pré-agregadas (Process C), mapeamento de colunas e tipos de problema.
- Preparação de dados – Join com dados de consultoras, construção de contexto por líder (ciclos, equipe, métricas).
- Feature engineering – Cálculo de indicadores por líder (tamanho de equipe, taxas de atividade, níveis de desempenho, inatividade).
- Geração de prompts – Construção de prompts por tipo de recomendação (SQL/expressões em PySpark).
- Inferência LLM – Chamadas em batch ao Claude (via ai_query ou pay_per_token).
- Persistência – Gravação em Delta Lake com Unity Catalog (tabelas intermediárias e final).
Fluxograma do pipeline
Figura 1 – Fluxo do pipeline de recomendações GenAI (Config → Data Loading → Data Prep → Feature Engineering → Prompt Generation → LLM → Persistência).
Parte de Machine Learning
- Ingestão e padronização: tabelas Delta com schema padronizado (leader_code, operational_cycle, problem_type, métricas de equipe).
- Feature engineering: métricas por líder (total de consultores, pedidos, itens, taxas de atividade por nível, indicadores de inatividade) para alimentar o contexto do LLM.
- Orquestração: Databricks Workflows com tarefas encadeadas (config → load → data prep → feature eng → prompt gen → LLM → persist).
- Governança: Unity Catalog, tabelas intermediárias nomeadas e versionamento em Delta Lake.
Tudo relacionado a dados, agregações e métricas está no lado ML; a geração de texto fica no lado IA.
Exemplo: Configuração do modelo e do pipeline
# Configuração do Model Serving (widgets ou task values do job)
model_config = ModelServingConfig(
endpoint_name="databricks-claude-sonnet-4",
invocation_method="pay_per_token", # ou "ai_query"
max_tokens=10000,
temperature=0.7,
enable_chunking=True,
chunk_size=50,
)
# Configuração das tabelas de origem e destino
leader_config = LeaderDataPipelineConfig(
source_catalog="product",
source_schema="elo_modelo_combinado",
source_table="commercial_consultant_index",
target_catalog=get_database_name(),
target_schema="data_science_features",
target_table="genai_cb_bra_agente_iap_recomendacao_ln_recommendations",
)
Exemplo: Carga e padronização (Process C)
# Carga da tabela pré-agregada (Process C)
df_process_c = spark.table(process_c_config.source_table_full_name)
# Mapeamento de colunas para schema padronizado
column_mapping = {
"cod_lider": "leader_code",
"nome_lider": "leader_name",
"ciclo": "operational_cycle",
# ...
}
for old_col, new_col in column_mapping.items():
if old_col in df_process_c.columns:
df_process_c = df_process_c.withColumnRenamed(old_col, new_col)
# Tipo de recomendação a partir do problem_type
df_standardized = df_process_c.withColumn(
"recommendation_type",
when(col("problem_type") == "atividade", lit("ATIVAS"))
.when(col("problem_type") == "cessadas", lit("ATIVAS"))
.otherwise(lit("GERAL"))
)
Parte de Inteligência Artificial: Claude, ai_query e pay per token
Dois modos de invocação do Claude
O pipeline suporta duas formas de chamar o modelo no Databricks:
| Modo | Descrição | Uso típico |
|---|---|---|
| ai_query | Inferência via AI Query (SQL/Spark nativo no Databricks). O custo é gerenciado pela plataforma; ideal para batch grande. | Jobs agendados, muitos registros por execução. |
| pay_per_token | Chamadas à API do endpoint (Model Serving) com controle explícito de tokens (max_tokens, chunking). Cobrança por token consumido. | Controle fino de custo, testes, lotes menores. |
- ai_query: uso de
ai_query()(ou equivalente) no Spark para enviar coluna de prompts e receber coluna de respostas em batch, sem gerenciar HTTP manualmente. - pay_per_token: uso de token do workspace, URL do endpoint e parâmetros (max_tokens, temperature); suporte a chunking (processar em blocos de N linhas) para controlar custo e tempo.
Controle de custo com tokens
- max_tokens: limite por resposta para evitar respostas muito longas e gasto desnecessário.
- Chunking: processar em lotes (ex.: 50 registros por vez) reduz picos de uso e facilita retentativas.
- Inferência em batch: preferir batch em vez de chamada por registro quando possível (tanto em ai_query quanto em pay_per_token em lotes).
Engenharia de prompts
- Prompts são montados em PySpark (concatenação de colunas e textos) por tipo de recomendação (ex.: atividade, desempenho, priorização).
- Cada linha enviada ao LLM contém contexto estruturado (métricas do líder, equipe, problema) para o modelo gerar texto de recomendação.
- Tratamento de erros e fallback: respostas vazias ou com indicação de erro são substituídas por mensagem genérica antes de persistir.
Trecho principal: geração da coluna de prompt
# Expressão SQL/PySpark para montar o prompt por linha (ex.: concat de contexto)
prompt_sql = """
concat(
'Analise o desempenho do líder: ', leader_name, '. ',
'Equipe com ', total_managed_consultants, ' consultores. ',
'Ciclo: ', operational_cycle, '. ',
'Problema: ', problem_type, '. ',
'Causa geral: ', coalesce(causa_geral, 'N/A'), '. ',
'Gere uma recomendação objetiva em até 3 parágrafos.'
)
"""
from pyspark.sql.functions import expr
df_with_prompts = df_leaders_bra.withColumn("prompt_text", expr(prompt_sql))
Invocação LLM (pay_per_token)
# Uso do invoker com chunking (lotes de N registros)
invoker = LLMInvoker(model_config)
df_to_process = df_with_prompts # ou .limit(N) para teste
df_with_recommendations = invoker.invoke_llm(
df_to_process,
prompt_column="prompt_text",
output_column="genai_recommendation",
)
Tratamento de erros e fallback
from pyspark.sql.functions import when, lower, col, coalesce, lit
df_with_recommendations = df_with_recommendations.withColumn(
"genai_recommendation",
when(lower(col("genai_recommendation")).contains("erro"), lit("Infelizmente não contamos com a resposta a essa consulta neste momento."))
.otherwise(col("genai_recommendation"))
)
df_with_recommendations = df_with_recommendations.withColumn(
"genai_recommendation",
coalesce(col("genai_recommendation"), lit("Infelizmente não contamos com a resposta a essa consulta neste momento."))
)
Persistência em Delta (Unity Catalog)
# Gravação na tabela com recomendações (append para histórico)
df_to_save = df_with_recommendations.select(cols_with_recommendations)
df_to_save.write.format("delta").mode("append") \
.option("mergeSchema", "true") \
.saveAsTable(df_with_recommendations_path)
Prévia do fluxo (resumo)
[Config] → [Data Loading] → [Data Prep] → [Feature Engineering] → [Prompt Generation]
→ [LLM Invocation: ai_query ou pay_per_token] → [Error handling] → [Persistência Delta]
- Config: ambiente, endpoint, método (ai_query / pay_per_token), max_tokens, chunk_size.
- Data Loading / Prep / Feature Eng: PySpark, Delta Lake.
- Prompt Generation: expressões SQL/Spark gerando coluna
prompt_text. - LLM: Claude Sonnet; saída em coluna
genai_recommendation. - Persistência: tabelas Delta (ex.: with_recommendations em append, tabela final em overwrite).
Relação com as habilidades do portfólio
- Machine Learning (Vendas): pipeline de dados, feature engineering, métricas de equipe e líderes, orquestração em Databricks.
- Inteligência Artificial (Claude): uso do Claude em produção, ai_query vs pay_per_token, controle por tokens, engenharia de prompts e inferência em batch.
Para mais detalhes técnicos, consulte a página de Machine Learning → Vendas e Inteligência Artificial → Claude neste portfólio.