1. Introduction to PandasAI

  • Definition: an open-source Python library that combines Pandas with AI
  • Core capability: querying and analyzing data in natural language
  • Supported data sources: CSV, XLSX, PostgreSQL, MySQL, BigQuery, Databricks, Snowflake, and more

2. Key Features

  • Natural language queries: ask questions about your data in everyday language
  • Data visualization: automatically generates charts and plots
  • Data cleaning: handles missing values
  • Feature generation: improves data quality
  • Multi-source connectivity: supports many databases and file formats

3. Technical Architecture

How it works:
Natural language query → AI model interprets it → converted to Python/SQL code → executed against the data → results returned
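
To make the pipeline concrete, here is a minimal sketch of one round trip. The query string, the column names, and the generated snippet are all hypothetical; in PandasAI the middle step is produced by the LLM rather than hard-coded.

import pandas as pd

df = pd.DataFrame({"city": ["Beijing", "Shanghai"], "sales": [1000, 2000]})

# 1. Natural language query (input)
query = "What is the average sales per city?"

# 2. Code the AI model might generate for this query (illustrative)
generated_code = "df.groupby('city')['sales'].mean()"

# 3. PandasAI executes the generated code against the DataFrame and returns the result
result = eval(generated_code)
print(result)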

4. Environment Setup

Step 1: Download the source and manage dependencies
  • Use Python 3.10.x
  • Download the source from the GitLab repository (the 01 branch)
  • Pin dependency versions to avoid compatibility problems
Step 2: Install the Python dependencies
  • Install PandasAI and its related packages
  • Patch the bugs in the upstream code that the original article points out
  • Configure a generative AI model (e.g., OpenAI GPT)
Step 3: Run the demo (a minimal sketch follows this list)
  • Test with the official example code
  • Verify the natural language query feature
  • Exercise the visualization and analysis capabilities
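
For orientation, a minimal demo run might look like the sketch below. It assumes the PandasAI 1.x SmartDataframe API and an OpenAI key in the OPENAI_API_KEY environment variable; adjust for your version.

import os
import pandas as pd
from pandasai import SmartDataframe
from pandasai.llm import OpenAI

df = pd.DataFrame({"country": ["US", "UK", "FR"], "gdp": [21.4, 2.8, 2.7]})

llm = OpenAI(api_token=os.environ["OPENAI_API_KEY"])
sdf = SmartDataframe(df, config={"llm": llm})

# PandasAI generates and runs the pandas code behind this question
print(sdf.chat("Which country has the highest gdp?"))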

5. Practical Value

  • Efficiency: less time spent writing complex query and analysis code
  • Lower barrier: non-technical users can analyze data too
  • Full coverage: spans data exploration, cleaning, visualization, and feature engineering

6. Resources

  • Official documentation: https://docs.pandas-ai.com
  • GitHub repository: https://github.com/Sinaptik-AI/pandas-ai
  • Source repository: GitLab (example code provided by the author)

Practical Application Scenarios

  1. Business analysts: quickly generate reports and insights
  2. Data scientists: speed up data exploration and preprocessing
  3. Developers: simplify data query and analysis workflows
  4. Product managers: get metrics by talking to the data directly

Caveats

  • Some bugs in the upstream code need patching
  • Watch out for dependency version compatibility (see the sketch below)
  • A valid AI model API (e.g., OpenAI) must be configured
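
A hedged example of the last two points: pinning a known-good dependency set and supplying the API key via the environment. The version numbers here are assumptions; check the requirements file of the branch you use.

# requirements.txt (assumed pins; adjust to your branch)
#   pandas==1.5.3
#   pandasai==1.5.8
#   openai==1.13.3

import os

# Prefer an environment variable over hard-coding the key in source
os.environ.setdefault("OPENAI_API_KEY", "sk-...")  # placeholder, not a real key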

This article gives beginners a complete introduction to PandasAI, from environment setup to practical use, showing how AI can simplify the traditional data analysis workflow.

PandasAI in Practice: Environment Setup and Basic Usage

Step-by-Step Walkthrough

Step 1: Environment preparation and installation

1.1 Create a virtual environment (recommended)
# With conda
conda create -n pandasai_env python=3.10
conda activate pandasai_env

# Or with venv
python -m venv pandasai_env
# Windows
pandasai_env\Scripts\activate
# Linux/Mac
source pandasai_env/bin/activate
1.2 Install core dependencies
# Base packages
pip install pandas numpy matplotlib seaborn

# PandasAI
pip install pandas-ai

# If you plan to use OpenAI or another hosted model
pip install openai
# Or, for local models
pip install langchain
1.3 Verify the installation
import pandas as pd
import pandasai as pai

print(f"Pandas version: {pd.__version__}")
print(f"PandasAI version: {pai.__version__}")

Step 2: Prepare a test DataFrame

import pandas as pd
from datetime import datetime

# Build a sample DataFrame
def create_sample_dataframe():
    data = {
        'date': pd.date_range(start='2024-01-01', periods=30, freq='D'),
        'city': ['Beijing']*10 + ['Shanghai']*10 + ['Guangzhou']*10,
        'temperature': [2, 3, 1, 4, 2, 3, 5, 6, 4, 3] + 
                      [8, 9, 10, 8, 7, 9, 11, 10, 8, 9] + 
                      [18, 19, 20, 21, 22, 20, 19, 21, 22, 23],
        'humidity': [45, 47, 50, 48, 46, 49, 51, 52, 50, 48] + 
                   [65, 66, 68, 67, 65, 69, 70, 68, 67, 66] + 
                   [75, 76, 78, 77, 79, 76, 75, 78, 77, 76],
        'sales': [1000, 1200, 800, 1500, 900, 1300, 1400, 1600, 1100, 1250] + 
                [2000, 2200, 1800, 2500, 1900, 2300, 2400, 2600, 2100, 2250] + 
                [3000, 3200, 2800, 3500, 2900, 3300, 3400, 3600, 3100, 3250],
        'category': ['A', 'B', 'A', 'C', 'B', 'A', 'C', 'B', 'A', 'C'] * 3
    }
    
    df = pd.DataFrame(data)
    # Inject a few missing values
    df.loc[5, 'sales'] = None
    df.loc[15, 'humidity'] = None
    df.loc[25, 'temperature'] = None
    
    return df

# Create and inspect the data
df = create_sample_dataframe()
print("Shape:", df.shape)
print("\nFirst 5 rows:")
print(df.head())
print("\nBasic info:")
df.info()  # info() prints directly and returns None, so don't wrap it in print()
print("\nDescriptive statistics:")
print(df.describe())

Step 3: Configure PandasAI and create a MockLLM

from pandasai import SmartDataframe
from pandasai.llm import OpenAI
import warnings
warnings.filterwarnings('ignore')

# Option 1: a MockLLM for offline testing.
# Note: this mock only mirrors the general shape of an LLM object; depending
# on your PandasAI version the real LLM base class may require extra
# attributes (e.g. a `type` property), so treat it as an illustration of the flow.
class MockLLM:
    """Mock LLM that returns canned pandas code."""
    
    def __init__(self):
        self.history = []
        
    def call(self, instruction: str, value: str, suffix: str = ""):
        """Simulate an LLM call by keyword-matching the instruction."""
        
        # Record the call history
        self.history.append({
            'instruction': instruction,
            'value': value,
            'suffix': suffix
        })
        
        # Return canned pandas code based on the question.
        # More specific patterns are checked first so that, for example, a
        # grouped average is not swallowed by the generic "average" branch.
        if "first 5 rows" in instruction:
            return "df.head(5)"
        elif "statistics" in instruction or "describe" in instruction:
            return "df.describe()"
        elif "line chart" in instruction or "trend" in instruction:
            return """
import matplotlib.pyplot as plt
plt.figure(figsize=(10, 6))
for city in df['city'].unique():
    city_data = df[df['city'] == city]
    plt.plot(city_data['date'], city_data['temperature'], label=city, marker='o')
plt.title('Temperature trend by city')
plt.xlabel('Date')
plt.ylabel('Temperature (°C)')
plt.legend()
plt.grid(True)
plt.xticks(rotation=45)
plt.tight_layout()
plt.show()
return plt
"""
        elif "by city" in instruction and "temperature" in instruction:
            return "df.groupby('city')['temperature'].mean()"
        elif "missing" in instruction:
            return "df.isnull().sum()"
        elif "Beijing" in instruction and "sales" in instruction:
            return "df[df['city'] == 'Beijing']['sales'].sum()"
        elif "average" in instruction or "mean" in instruction:
            if "temperature" in instruction:
                return "df['temperature'].mean()"
            elif "humidity" in instruction:
                return "df['humidity'].mean()"
            elif "sales" in instruction:
                return "df['sales'].mean()"
        # Default: return the first 3 rows
        return "df.head(3)"
    
    def chat(self, prompt: str):
        """Chat-style entry point"""
        return self.call(prompt, "", "")

# Option 2: a real OpenAI API (requires an API key)
# llm = OpenAI(api_token="your-api-key-here")

# Create the MockLLM instance
mock_llm = MockLLM()

# Create the SmartDataframe
sdf = SmartDataframe(df, config={"llm": mock_llm, "verbose": True})

Step 4: Run natural language queries

def run_queries(smart_df):
    """Run a series of natural language queries"""
    
    queries = [
        "Show the first 5 rows of the data",
        "Give me summary statistics of the data",
        "What is the average temperature?",
        "Compute the average temperature by city",
        "Check for missing values in the data",
        "Compute the total sales for Beijing",
        "Plot a line chart of the temperature trend for each city"
    ]
    
    results = {}
    
    for i, query in enumerate(queries, 1):
        print(f"\n{'='*50}")
        print(f"Query {i}: {query}")
        print('-'*30)
        
        try:
            # Execute the query
            result = smart_df.chat(query)
            results[query] = result
            
            # Display the result
            if isinstance(result, (pd.DataFrame, pd.Series)):
                print(result.to_string())
            elif hasattr(result, 'show'):  # a matplotlib object
                print("Chart generated")
                # In a real environment the chart could be saved:
                # result.savefig(f"chart_{i}.png")
            else:
                print(f"Result: {result}")
                
        except Exception as e:
            print(f"Query failed: {e}")
            results[query] = f"Error: {e}"
    
    return results

# Run the queries
results = run_queries(sdf)

Step 5: Advanced feature demos

def advanced_features_demo():
    """Demonstrate PandasAI's more advanced capabilities"""
    
    print("\n" + "="*60)
    print("Advanced feature demos")
    print("="*60)
    
    # 1. Data cleaning
    print("\n1. Data cleaning - handling missing values")
    print("Missing values in the raw data:")
    print(df.isnull().sum())
    
    # Data cleaning with PandasAI (simulated)
    clean_query = "Clean the missing values in the data, filling with the mean"
    print(f"\nRunning query: {clean_query}")
    # In real PandasAI, this would generate the corresponding cleaning code.
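    # For illustration, the generated code might look like the lines below
    # (assumption: mean imputation on the numeric columns; a copy is used so
    # the later steps of this demo still see the original missing values).
    numeric_cols = ['temperature', 'humidity', 'sales']
    df_filled = df.copy()
    df_filled[numeric_cols] = df_filled[numeric_cols].fillna(df_filled[numeric_cols].mean())
    print("Missing values after mean imputation:")
    print(df_filled.isnull().sum())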
    
    # 2. Feature engineering
    print("\n2. Feature engineering - creating a new feature")
    feature_query = "Create a new feature 'temp_category' that bins temperature into low (<10), medium (10-20), and high (>20)"
    print(f"Running query: {feature_query}")
    
    # Implemented by hand here for demonstration
    def categorize_temp(temp):
        if pd.isna(temp):
            return 'unknown'
        elif temp < 10:
            return 'low'
        elif temp <= 20:
            return 'medium'
        else:
            return 'high'
    
    df['temp_category'] = df['temperature'].apply(categorize_temp)
    print("First 5 rows with the new feature:")
    print(df[['date', 'city', 'temperature', 'temp_category']].head())
    
    # 3. Aggregation
    print("\n3. Aggregation")
    agg_query = "Average sales by city and temperature category"
    print(f"Running query: {agg_query}")
    
    agg_result = df.groupby(['city', 'temp_category'])['sales'].mean()
    print(agg_result)
    
    # 4. Time series analysis
    print("\n4. Time series analysis - 7-day moving average")
    ts_query = "Compute the 7-day moving average of sales for each city"
    print(f"Running query: {ts_query}")
    
    # Demonstration code
    df.set_index('date', inplace=True)
    for city in df['city'].unique():
        city_sales = df[df['city'] == city]['sales']
        ma_7 = city_sales.rolling(window=7).mean()
        print(f"Latest 7-day moving average for {city}: {ma_7.dropna().iloc[-1] if len(ma_7.dropna()) > 0 else 'insufficient data'}")
    
    df.reset_index(inplace=True)
    
    return df

# Run the advanced demos
enhanced_df = advanced_features_demo()

Step 6: Real-world scenario examples

def real_world_scenarios():
    """Walk through some real-world use cases"""
    
    print("\n" + "="*60)
    print("Real-world scenario examples")
    print("="*60)
    
    # Scenario 1: sales analysis
    print("\nScenario 1: Sales analysis")
    sales_scenarios = [
        "Which month had the highest sales?",
        "Which city has the highest average sales?",
        "Analyze the sales distribution by category",
        "Find the 3 days with the highest sales"
    ]
    
    for scenario in sales_scenarios:
        print(f"\nQuestion: {scenario}")
        # With real PandasAI these can be asked directly in natural language:
        # result = sdf.chat(scenario)
        # print(f"Answer: {result}")
    
    # Scenario 2: weather analysis
    print("\nScenario 2: Weather analysis")
    weather_scenarios = [
        "Which city has the most volatile temperature?",
        "What is the relationship between temperature and humidity?",
        "Forecast the temperature trend for the next 3 days",
        "Find anomalous temperature readings"
    ]
    
    for scenario in weather_scenarios:
        print(f"\nQuestion: {scenario}")
    
    # Scenario 3: generating a business report
    print("\nScenario 3: Automatic business report")
    report_query = """
    Generate a data analysis report covering:
    1. Overall sales overview
    2. City-by-city comparison
    3. The effect of temperature on sales
    4. Key findings and recommendations
    """
    
    print(f"\nReport request: {report_query}")
    print("\nSimulated report contents:")
    print("-"*40)
    print("Data Analysis Report")
    print("-"*40)
    print("1. Overall sales:")
    print(f"   Total sales: {df['sales'].sum():,.0f} yuan")
    print(f"   Average daily sales: {df['sales'].mean():,.0f} yuan")
    print(f"   Days of sales data: {df['date'].nunique()}")
    
    print("\n2. City-by-city comparison:")
    city_sales = df.groupby('city')['sales'].sum()
    for city, sales in city_sales.items():
        print(f"   {city}: {sales:,.0f} yuan")
    
    print("\n3. Effect of temperature on sales:")
    temp_sales_corr = df['temperature'].corr(df['sales'])
    print(f"   Correlation between temperature and sales: {temp_sales_corr:.3f}")
    
    print("\n4. Key findings and recommendations:")
    print("   - Shanghai and Guangzhou clearly outsell Beijing")
    print("   - Temperature is positively correlated with sales")
    print("   - Consider heavier marketing in warmer seasons")

# Run the scenario demos
real_world_scenarios()

Step 7: Putting it all together

# Complete example
def complete_demo():
    """
    End-to-end PandasAI demo:
    environment check, data preparation, query execution, and result display.
    """
    
    print("PandasAI end-to-end demo")
    print("="*60)
    
    try:
        # 1. Environment check
        print("1. Checking the environment...")
        import pandas as pd
        import numpy as np
        from pandasai import SmartDataframe
        
        print("   ✓ Environment check passed")
        
        # 2. Create the data
        print("\n2. Creating sample data...")
        df = create_sample_dataframe()
        print(f"   ✓ Created a DataFrame with {len(df)} rows")
        
        # 3. Initialize PandasAI
        print("\n3. Initializing PandasAI...")
        mock_llm = MockLLM()
        sdf = SmartDataframe(df, config={"llm": mock_llm, "verbose": False})
        print("   ✓ PandasAI initialized")
        
        # 4. Run example queries
        print("\n4. Running natural language queries...")
        print("\nExample query 1: 'Show the first 3 rows'")
        result1 = sdf.chat("Show the first 3 rows")
        print(result1)
        
        print("\nExample query 2: 'Compute the average sales'")
        result2 = sdf.chat("Compute the average sales")
        print(f"Average sales: {result2}")
        
        print("\nExample query 3: 'Average temperature grouped by city'")
        result3 = sdf.chat("Average temperature grouped by city")
        print(result3)
        
        print("\n✓ Demo complete!")
        
        return {
            'dataframe': df,
            'smart_dataframe': sdf,
            'results': {
                'first 3 rows': result1,
                'average sales': result2,
                'average temperature by city': result3
            }
        }
        
    except ImportError as e:
        print(f"✗ Import error: {e}")
        print("Make sure the required packages are installed:")
        print("pip install pandas pandas-ai numpy")
        return None
    except Exception as e:
        print(f"✗ Error: {e}")
        return None

# Run the full demo
demo_results = complete_demo()

if demo_results:
    print("\n" + "="*60)
    print("Demo summary")
    print("="*60)
    print(f"1. Data size: {len(demo_results['dataframe'])} rows × {len(demo_results['dataframe'].columns)} columns")
    print(f"2. Queries executed: {len(demo_results['results'])}")
    print(f"3. Columns used: {list(demo_results['dataframe'].columns)}")
    print("\nYou can keep exploring with:")
    print("   sdf.chat('your natural language question')")
    print("\nFor example:")
    print("   sdf.chat('Which day had the highest sales?')")
    print("   sdf.chat('Plot a histogram of temperature')")
    print("   sdf.chat('Analyze the sales trend by weekday')")

Key Takeaways

1. Core strengths

  • Natural language interface: no need to write complex Pandas code
  • Smart analysis: analysis code and visualizations are generated automatically
  • Lower barrier: business users can talk to the data directly

2. Usage tips

# Best practices
# 1. State the question precisely
question = "Analyze the January 2024 sales trend for each city"

# 2. Break it into smaller steps
sub_questions = [
    "Compute total January sales per city",
    "Compare average daily sales across cities",
    "Plot the sales trend"
]

# 3. Verify the results
for q in sub_questions:
    result = sdf.chat(q)
    print(f"Question: {q}")
    print(f"Result: {result}\n")

3. Caveats

  • MockLLM is for testing only; production needs a real LLM (see the sketch after this list)
  • Complex queries may need several rounds of interaction
  • Results still need human verification
  • Mind data privacy and security
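
Swapping the mock for a real model is a small config change. A minimal sketch, assuming the PandasAI 1.x OpenAI wrapper and an API key in the environment:

import os
from pandasai import SmartDataframe
from pandasai.llm import OpenAI

llm = OpenAI(api_token=os.environ["OPENAI_API_KEY"])
sdf = SmartDataframe(df, config={"llm": llm})  # df: the DataFrame built earlier
print(sdf.chat("Which day had the highest sales?"))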

4. Where to go next

  1. Connect a real LLM (OpenAI, local models, etc.)
  2. Learn the advanced data connection features
  3. Explore custom functions and plugins
  4. Learn the performance optimization techniques

This complete example walks through PandasAI from setup to practical use. With the MockLLM simulation you can try out PandasAI's natural language querying locally, laying the groundwork for connecting a real AI model later.

Advanced PandasAI: A Deep-Dive Learning Path

4. Next Steps in Detail

4.1 Connecting a real LLM (OpenAI, local models, etc.)

4.1.1 Connecting to the OpenAI API
# Install the required packages
# pip install openai pandasai python-dotenv

import os
from dotenv import load_dotenv
from pandasai import SmartDataframe
from pandasai.llm import OpenAI
import pandas as pd

# 1. Configure the API key
load_dotenv()  # load environment variables from a .env file

# Method 1: via an environment variable
os.environ["OPENAI_API_KEY"] = "your-api-key-here"

# Method 2: direct configuration
llm = OpenAI(
    api_token="sk-your-openai-api-key",
    model="gpt-4",  # or "gpt-3.5-turbo"
    temperature=0.7,
    max_tokens=1000,
    timeout=120,  # request timeout in seconds
)

# 2. Build a dataset
data = {
    "product": ["Phone", "Tablet", "Laptop", "Watch", "Earbuds"] * 4,
    "quarter": ["Q1"]*5 + ["Q2"]*5 + ["Q3"]*5 + ["Q4"]*5,
    "sales": [10000, 8000, 15000, 5000, 3000] * 4,
    "cost": [6000, 5000, 10000, 3000, 1800] * 4,
    "region": ["East", "South", "North", "West", "Central"] * 4
}
df = pd.DataFrame(data)

# 3. Create the SmartDataframe
sdf = SmartDataframe(
    df,
    config={
        "llm": llm,
        "verbose": True,  # detailed logging
        "save_logs": True,  # persist logs
        "enable_cache": True,  # enable caching
        "max_retries": 3  # maximum retries
    }
)

# 4. Query with the real LLM
queries = [
    "Compute the average sales per product",
    "Which quarter had the highest total sales?",
    "Plot a bar chart of sales per product",
    "Compute the profit margin per product ((sales - cost) / sales)",
    "Analyze regional performance and make recommendations"
]

for i, query in enumerate(queries, 1):
    print(f"\nQuery {i}: {query}")
    print("-" * 50)
    try:
        result = sdf.chat(query)
        print(result)
    except Exception as e:
        print(f"Query failed: {str(e)}")

# 5. Advanced configuration example.
# Note: options such as "conversational", "memory" and "custom_prompts" vary
# between PandasAI versions; treat the keys below as illustrative.
advanced_config = {
    "llm": OpenAI(
        api_token="your-api-key",
        model="gpt-4",
        temperature=0.3,  # more deterministic output
        max_tokens=2000,
        top_p=0.9,
        frequency_penalty=0.1,
        presence_penalty=0.1,
    ),
    "conversational": True,  # conversation mode
    "memory": True,  # memory across turns
    "custom_prompts": {
        "data_visualization": "Please create a visualization for the following data: {prompt}"
    },
    "custom_whitelisted_dependencies": ["seaborn", "plotly"]
}
4.1.2 Connecting a local open-source model (via Ollama)
# Install the required packages
# pip install ollama langchain pandasai

# Note: how local models are exposed differs between PandasAI versions; some
# releases provide an Ollama wrapper, others a LocalLLM that talks to any
# OpenAI-compatible endpoint. The import below is illustrative - check your
# installed version.
from pandasai import SmartDataframe
from pandasai.llm import Ollama
import pandas as pd

# 1. Make sure the Ollama service is running
# In a terminal: ollama serve
# Pull a model: ollama pull llama2 or ollama pull mistral

# 2. Configure the local LLM
local_llm = Ollama(
    model="llama2",  # or "mistral", "codellama"
    base_url="http://localhost:11434",  # Ollama's default address
    temperature=0.7,
    max_tokens=2000,
    # Optional: a custom prompt template
    custom_prompt_template="""
    You are a data analysis assistant. The user gives you a DataFrame and a query.
    Answer the question with Python code.
    
    Data preview:
    {df_head}
    
    User query: {query}
    
    Generate the appropriate code:
    """
)

# 3. Prepare the data
df = pd.read_csv("your_data.csv")  # or load from another source

# 4. Create the SmartDataframe
sdf_local = SmartDataframe(
    df,
    config={
        "llm": local_llm,
        "verbose": True,
        "enforce_privacy": True,  # privacy mode: don't send data externally
        "use_error_correction_framework": True  # retry with error feedback
    }
)

# 5. Test a query
try:
    result = sdf_local.chat("What are the basic statistics of this data?")
    print(result)
except Exception as e:
    print(f"Error: {e}")
4.1.3 Switching between multiple models
from pandasai import SmartDataframe
from pandasai.llm import OpenAI, Ollama
import pandas as pd

class MultiModelManager:
    """Manage several LLM instances and switch between them"""
    
    def __init__(self):
        self.models = {}
        self.current_model = None
        
    def register_model(self, name, llm_instance):
        """Register a model under a name"""
        self.models[name] = llm_instance
        
    def switch_model(self, name):
        """Switch the active model"""
        if name in self.models:
            self.current_model = self.models[name]
            return True
        return False
    
    def get_model(self, name=None):
        """Return a model by name, or the active one"""
        if name:
            return self.models.get(name)
        return self.current_model

# Initialize the manager
manager = MultiModelManager()

# Register several models
manager.register_model(
    "openai_gpt4",
    OpenAI(api_token="your-key", model="gpt-4")
)

manager.register_model(
    "openai_gpt3",
    OpenAI(api_token="your-key", model="gpt-3.5-turbo")
)

manager.register_model(
    "local_llama",
    Ollama(model="llama2", base_url="http://localhost:11434")
)

# Switch models depending on the task
df = pd.DataFrame({"A": [1, 2, 3], "B": [4, 5, 6]})

# Use GPT-4 for complex queries
manager.switch_model("openai_gpt4")
sdf_gpt4 = SmartDataframe(df, config={"llm": manager.get_model()})
complex_result = sdf_gpt4.chat("Run a time series forecasting analysis")

# Use the local model for simple queries
manager.switch_model("local_llama")
sdf_local = SmartDataframe(df, config={"llm": manager.get_model()})
simple_result = sdf_local.chat("Compute the mean")

4.2 Advanced data connections

4.2.1 Connecting to different databases
# Install the required packages
# pip install pandasai[sql] sqlalchemy psycopg2-binary pymysql

# Note: the available connector classes and their exact names depend on the
# PandasAI version; the imports below are illustrative.
from pandasai import SmartDataframe
from pandasai.llm import OpenAI
from pandasai.connectors import (
    SQLConnector,
    PostgreSQLConnector,
    MySQLConnector,
    SnowflakeConnector,
    BigQueryConnector
)
import pandas as pd

# 1. PostgreSQL
postgres_connector = PostgreSQLConnector(
    config={
        "host": "localhost",
        "port": 5432,
        "database": "your_database",
        "username": "your_username",
        "password": "your_password",
        "table": "sales_data",  # or use a SQL query
        # "query": "SELECT * FROM sales WHERE date > '2024-01-01'"
    }
)

# 2. MySQL
mysql_connector = MySQLConnector(
    config={
        "host": "localhost",
        "port": 3306,
        "database": "your_db",
        "username": "root",
        "password": "password",
        "table": "customer_data"
    }
)

# 3. Snowflake
snowflake_connector = SnowflakeConnector(
    config={
        "account": "your_account",
        "username": "your_username",
        "password": "your_password",
        "database": "your_database",
        "schema": "your_schema",
        "warehouse": "your_warehouse",
        "role": "your_role",
        "table": "large_dataset"
    }
)

# 4. Generic SQL connector
generic_connector = SQLConnector(
    config={
        "connection_string": "postgresql://user:password@localhost/dbname",
        "table": "your_table"
    }
)

# 5. Create a SmartDataframe and query it
connector = postgres_connector  # pick the connector to use

sdf_db = SmartDataframe(
    connector,
    config={
        "llm": OpenAI(api_token="your-key"),
        "verbose": True
    }
)

# Query the database in natural language
queries = [
    "Show the sales records from the last 30 days",
    "Compute total sales per region",
    "Find the 10 best-selling products",
    "Analyze the sales trend and forecast next month's sales"
]

for query in queries:
    print(f"\nQuery: {query}")
    try:
        result = sdf_db.chat(query)
        if isinstance(result, pd.DataFrame):
            print(f"Returned {len(result)} rows")
            print(result.head())
        else:
            print(result)
    except Exception as e:
        print(f"Error: {e}")
4.2.2 Querying across multiple data sources
import pandas as pd
from pandasai import SmartDatalake
from pandasai.llm import OpenAI
# Note: as above, connector class names vary by PandasAI version;
# CSVConnector and ExcelConnector are illustrative here.
from pandasai.connectors import (
    PostgreSQLConnector,
    CSVConnector,
    ExcelConnector
)

# 1. Create connectors for each source
sales_connector = PostgreSQLConnector({
    "host": "localhost",
    "database": "sales_db",
    "table": "transactions"
})

customers_connector = CSVConnector({
    "path": "/path/to/customers.csv"
})

products_connector = ExcelConnector({
    "path": "/path/to/products.xlsx",
    "sheet_name": "ProductInfo"
})

# 2. Create a data lake (multiple sources)
datalake = SmartDatalake(
    [sales_connector, customers_connector, products_connector],
    config={
        "llm": OpenAI(api_token="your-key"),
        "verbose": True
    }
)

# 3. Cross-source queries
cross_source_queries = [
    # Join queries
    "Join the sales data with the customer data and analyze VIP purchasing behavior",
    
    # Complex analysis
    "Compute sales per product category, grouped by region",
    
    # Data consolidation
    "Build a complete sales report with product, customer, and transaction details",
    
    # Business insight
    "Find the most popular product combinations and suggest bundling strategies"
]

for query in cross_source_queries:
    print(f"\nCross-source query: {query}")
    try:
        result = datalake.chat(query)
        print("Query complete!")
        if isinstance(result, pd.DataFrame):
            print(f"Result shape: {result.shape}")
    except Exception as e:
        print(f"Error: {e}")
4.2.3 Connecting to a live API
# Install the required packages
# pip install requests pandasai

import requests
import pandas as pd
from pandasai import SmartDataframe
from pandasai.llm import OpenAI
from pandasai.connectors import BaseConnector

# Note: PandasAI's BaseConnector is abstract and may require more methods than
# shown here depending on the version; this class sketches the idea.
class APIConnector(BaseConnector):
    """Custom connector that pulls data from a REST API"""
    
    def __init__(self, config):
        self.api_url = config["api_url"]
        self.api_key = config.get("api_key")
        self.headers = config.get("headers", {})
        self.params = config.get("params", {})
        
    def head(self, n=5):
        """Return the first n rows"""
        return self._fetch_data().head(n)
        
    def _fetch_data(self):
        """Fetch the data from the API"""
        headers = self.headers.copy()
        if self.api_key:
            headers["Authorization"] = f"Bearer {self.api_key}"
            
        response = requests.get(
            self.api_url,
            headers=headers,
            params=self.params,
            timeout=30
        )
        response.raise_for_status()
        
        data = response.json()
        # Assumes the API returns a JSON list of records
        return pd.DataFrame(data)
    
    @property
    def _df(self):
        """Return the full DataFrame"""
        return self._fetch_data()

# Use the custom API connector
api_connector = APIConnector({
    "api_url": "https://api.example.com/data",
    "api_key": "your_api_key_here",
    "headers": {"Content-Type": "application/json"},
    "params": {"limit": 1000}
})

sdf_api = SmartDataframe(
    api_connector,
    config={
        "llm": OpenAI(api_token="your-key"),
        "verbose": True
    }
)

# Query the live data
result = sdf_api.chat("Analyze the latest data trends")
print(result)

4.3 Custom functions and plugins

4.3.1 Writing custom analysis functions
import pandas as pd
import numpy as np
from pandasai import SmartDataframe
from pandasai.llm import OpenAI

# 1. Define a custom function library
class CustomAnalytics:
    """Custom analytics helpers"""
    
    @staticmethod
    def calculate_cagr(start_value, end_value, periods):
        """Compound annual growth rate"""
        if start_value <= 0:
            return 0
        return (end_value / start_value) ** (1 / periods) - 1
    
    @staticmethod
    def detect_anomalies_zscore(series, threshold=3):
        """Flag outliers using the Z-score"""
        mean = np.mean(series)
        std = np.std(series)
        z_scores = (series - mean) / std
        return np.abs(z_scores) > threshold
    
    @staticmethod
    def calculate_roi(investment, returns):
        """Return on investment"""
        if investment == 0:
            return 0
        return (returns - investment) / investment
    
    @staticmethod
    def create_segments(data, column, bins, labels):
        """Bin a column into labeled segments"""
        return pd.cut(data[column], bins=bins, labels=labels)

# 2. Register the custom functions.
# Note: "custom_whitelisted_dependencies" and "custom_imports" are illustrative
# config keys; how custom helpers are exposed to generated code depends on the
# PandasAI version.
df = pd.DataFrame({
    "month": pd.date_range("2024-01-01", periods=12, freq='M'),
    "revenue": [100, 120, 130, 115, 140, 160, 180, 200, 190, 210, 220, 230],
    "cost": [70, 80, 85, 75, 90, 100, 120, 130, 125, 140, 150, 155]
})

sdf_custom = SmartDataframe(
    df,
    config={
        "llm": OpenAI(api_token="your-key"),
        "custom_whitelisted_dependencies": [
            "CustomAnalytics",
            "calculate_cagr",
            "detect_anomalies_zscore",
            "calculate_roi",
            "create_segments"
        ],
        # Extra imports for the generated code
        "custom_imports": """
from custom_analytics import CustomAnalytics
import numpy as np
        """
    }
)

# 3. Queries that use the custom functions
custom_queries = [
    "Use calculate_cagr to compute the CAGR of revenue",
    "Use detect_anomalies_zscore to find outliers in revenue",
    "Use calculate_roi to compute the monthly return on investment",
    "Use create_segments to split revenue into low, medium, and high bands"
]

for query in custom_queries:
    print(f"\nCustom query: {query}")
    try:
        result = sdf_custom.chat(query)
        print(result)
    except Exception as e:
        print(f"Error: {e}")
4.3.2 Building a custom visualization plugin
import pandas as pd
from pandasai import SmartDataframe
from pandasai.llm import OpenAI
# Note: the middleware API existed in earlier PandasAI releases and may differ
# or be absent in newer versions; this example sketches the idea.
from pandasai.middlewares import BaseMiddleware
import plotly.graph_objects as go
import plotly.express as px

class PlotlyVisualizer(BaseMiddleware):
    """Middleware that rewrites generated matplotlib code to use Plotly"""
    
    def run(self, code):
        """Rewrite the generated code to target Plotly"""
        # Detect matplotlib code and substitute Plotly equivalents
        if "plt.show()" in code or "matplotlib" in code:
            code = self._convert_to_plotly(code)
        return code
    
    def _convert_to_plotly(self, code):
        """Naive string substitution from matplotlib to Plotly.
        A production version would need a real code transform;
        this illustrates the hook, not a robust converter."""
        conversions = {
            "import matplotlib.pyplot as plt": "import plotly.express as px\nimport plotly.graph_objects as go",
            "plt.bar(": "go.Bar(",
            "plt.plot(": "go.Scatter(",
            "plt.scatter(": "go.Scatter(mode='markers', ",
            "plt.hist(": "go.Histogram(",
            "plt.show()": "fig.show()",
            "plt.figure(": "fig = go.Figure(",
            "plt.title(": "fig.update_layout(title=",
            "plt.xlabel(": "fig.update_layout(xaxis_title=",
            "plt.ylabel(": "fig.update_layout(yaxis_title=",
            "plt.legend()": "fig.update_layout(showlegend=True)",
            "plt.grid(": "# Grid removed for plotly",
        }
        
        for old, new in conversions.items():
            code = code.replace(old, new)
        
        return code

class CustomVisualizations:
    """Custom visualization helpers"""
    
    @staticmethod
    def create_waterfall(df, values, labels, title="Waterfall Chart"):
        """Build a waterfall chart"""
        fig = go.Figure(go.Waterfall(
            name="Performance",
            orientation="v",
            measure=["relative"] * len(df),
            x=df[labels],
            y=df[values],
            connector={"line": {"color": "rgb(63, 63, 63)"}},
        ))
        
        fig.update_layout(
            title=title,
            showlegend=True,
            waterfallgap=0.3,
        )
        
        return fig
    
    @staticmethod
    def create_sunburst(df, path, values, title="Sunburst Chart"):
        """Build a sunburst chart"""
        fig = px.sunburst(
            df,
            path=path,
            values=values,
            title=title
        )
        return fig

# Use the custom visualizations
df_viz = pd.DataFrame({
    "category": ["A", "B", "C", "A", "B", "C"],
    "subcategory": ["A1", "B1", "C1", "A2", "B2", "C2"],
    "value": [100, 150, 200, 120, 180, 220],
    "month": ["Jan", "Jan", "Jan", "Feb", "Feb", "Feb"]
})

sdf_viz = SmartDataframe(
    df_viz,
    config={
        "llm": OpenAI(api_token="your-key"),
        "middlewares": [PlotlyVisualizer()],
        "custom_whitelisted_dependencies": [
            "CustomVisualizations",
            "create_waterfall",
            "create_sunburst"
        ],
        "save_charts": True,
        "save_charts_path": "./charts"
    }
)

# Generate custom visualizations
viz_queries = [
    "Use create_waterfall to build a waterfall chart of value",
    "Use create_sunburst to build a sunburst chart of the categories",
    "Create an interactive scatter plot matrix"
]

for query in viz_queries:
    print(f"\nVisualization query: {query}")
    try:
        result = sdf_viz.chat(query)
        # In Jupyter the chart displays automatically;
        # in a script it can be saved instead
        if hasattr(result, 'write_html'):
            result.write_html(f"chart_{query[:10]}.html")
            print("Chart saved as an HTML file")
    except Exception as e:
        print(f"Error: {e}")
4.3.3 Building a data quality check plugin
import pandas as pd
from pandasai import SmartDataframe
from pandasai.llm import OpenAI
from pandasai.middlewares import BaseMiddleware

class DataQualityChecker(BaseMiddleware):
    """Data quality checking middleware"""
    
    def __init__(self, thresholds=None):
        self.thresholds = thresholds or {
            "missing_threshold": 0.3,
            "outlier_threshold": 3,
            "duplicate_threshold": 0.1
        }
        
    def run(self, df):
        """Run the data quality checks"""
        quality_report = {
            "summary": {},
            "issues": [],
            "suggestions": []
        }
        
        # Check for missing values
        missing_percentage = df.isnull().sum() / len(df)
        high_missing = missing_percentage[missing_percentage > self.thresholds["missing_threshold"]]
        
        if len(high_missing) > 0:
            quality_report["issues"].append({
                "type": "high_missing_values",
                "columns": high_missing.index.tolist(),
                "values": high_missing.values.tolist()
            })
            quality_report["suggestions"].append(
                "Consider dropping columns with more than 30% missing values, or imputing them"
            )
        
        # Check for duplicates
        duplicate_rows = df.duplicated().sum()
        duplicate_percentage = duplicate_rows / len(df)
        
        if duplicate_percentage > self.thresholds["duplicate_threshold"]:
            quality_report["issues"].append({
                "type": "high_duplicates",
                "count": duplicate_rows,
                "percentage": duplicate_percentage
            })
            quality_report["suggestions"].append(
                "Consider dropping duplicate rows or investigating the data collection process"
            )
        
        # Build the summary
        quality_report["summary"] = {
            "total_rows": len(df),
            "total_columns": len(df.columns),
            "missing_values": df.isnull().sum().sum(),
            "duplicate_rows": duplicate_rows,
            "data_types": df.dtypes.to_dict()
        }
        
        return quality_report

# Use the data quality checker
df_quality = pd.DataFrame({
    "A": [1, 2, None, 4, 5],
    "B": [1, 1, 3, 4, 5],  # contains duplicates
    "C": [100, 200, 300, 400, 500],
    "D": [None, None, 3, 4, 5]  # high missing rate
})

quality_checker = DataQualityChecker()

# Create a SmartDataframe with the quality check attached.
# Note: "custom_middlewares" is an illustrative config key; it may be named
# differently (or be unsupported) in your PandasAI version.
sdf_quality = SmartDataframe(
    df_quality,
    config={
        "llm": OpenAI(api_token="your-key"),
        "custom_middlewares": [quality_checker]
    }
)

# Run the quality check directly
print("Data quality report:")
quality_report = quality_checker.run(df_quality)
for key, value in quality_report.items():
    print(f"\n{key}:")
    if isinstance(value, dict):
        for k, v in value.items():
            print(f"  {k}: {v}")
    elif isinstance(value, list):
        for item in value:
            print(f"  {item}")

# Ask about data quality in natural language
result = sdf_quality.chat("Identify data quality issues and suggest fixes")
print(f"\nAI analysis:\n{result}")

4.4 Performance optimization techniques

4.4.1 Query optimization and caching strategies
import time
import numpy as np
import pandas as pd
from pandasai import SmartDataframe
from pandasai.llm import OpenAI

# 1. Performance monitoring decorator
def performance_monitor(func):
    """Time a function call and print the duration"""
    def wrapper(*args, **kwargs):
        start_time = time.time()
        result = func(*args, **kwargs)
        end_time = time.time()
        execution_time = end_time - start_time
        print(f"Function {func.__name__} took {execution_time:.2f}s")
        return result
    return wrapper

# 2. A small cache with TTL and LRU-style eviction
class SmartCache:
    """Cache with a time-to-live and least-recently-used eviction"""
    
    def __init__(self, max_size=1000, ttl=3600):
        self.cache = {}
        self.max_size = max_size
        self.ttl = ttl  # time-to-live in seconds
        self.access_times = {}
        
    def get(self, key):
        """Look up a cached value"""
        if key in self.cache:
            # Check for expiry
            if time.time() - self.access_times[key] < self.ttl:
                self.access_times[key] = time.time()
                print(f"Cache hit: {key[:50]}...")
                return self.cache[key]
            else:
                # Expired entry
                del self.cache[key]
                del self.access_times[key]
        return None
    
    def set(self, key, value):
        """Store a value"""
        if len(self.cache) >= self.max_size:
            # Evict the least recently used entry
            oldest_key = min(self.access_times, key=self.access_times.get)
            del self.cache[oldest_key]
            del self.access_times[oldest_key]
        
        self.cache[key] = value
        self.access_times[key] = time.time()

# 3. An optimized configuration.
# Note: beyond "enable_cache" and "max_retries", most of the option names
# below are illustrative; check which config keys your PandasAI version
# actually supports.
optimized_config = {
    "llm": OpenAI(
        api_token="your-key",
        model="gpt-3.5-turbo",  # a faster model when performance matters
        temperature=0.1,  # less randomness
        max_tokens=500,  # cap the output length
    ),
    # Performance options
    "enable_cache": True,
    "cache_max_size": 1000,
    "cache_lifetime": 300,  # 5 minutes
    
    # Code execution limits
    "max_execution_time": 30,  # maximum execution time
    "max_retries": 2,  # fewer retries
    
    # Sampling (for large datasets)
    "sample_size": 10000,  # sample size
    "sample_strategy": "head",  # sampling strategy
    
    # Parallelism
    "use_parallel": True,
    "max_workers": 4,
    
    # Memory
    "optimize_memory": True,
    "chunk_size": 10000,
}

# 4. Chunked processing for large data
class BigDataHandler:
    """Process a large DataFrame in chunks"""
    
    def __init__(self, df, chunk_size=10000):
        self.df = df
        self.chunk_size = chunk_size
        
    @performance_monitor
    def process_in_chunks(self, operation):
        """Apply an operation chunk by chunk"""
        results = []
        total_chunks = (len(self.df) // self.chunk_size) + 1
        
        for i in range(total_chunks):
            start_idx = i * self.chunk_size
            end_idx = min((i + 1) * self.chunk_size, len(self.df))
            chunk = self.df.iloc[start_idx:end_idx]
            
            print(f"Processing chunk {i+1}/{total_chunks} ({len(chunk)} rows)")
            result = operation(chunk)
            results.append(result)
            
            # Free memory
            del chunk
            
        return pd.concat(results, ignore_index=True) if results else pd.DataFrame()

# 5. Query rewriting
def optimize_query(query, context=None):
    """Rewrite a natural language query into a cheaper form"""
    # Rewrite rules
    rewrite_rules = {
        "show all data": "show the first 1000 rows",
        "compute everything": "compute on a sample",
        "detailed analysis": "summary analysis",
    }
    
    optimized_query = query
    for pattern, replacement in rewrite_rules.items():
        if pattern in query:
            optimized_query = optimized_query.replace(pattern, replacement)
            print(f"Query rewritten: '{pattern}' -> '{replacement}'")
    
    return optimized_query

# 6. Performance test
def performance_test():
    """Run a small benchmark"""
    # Build test data
    test_data = pd.DataFrame({
        "id": range(100000),
        "value": np.random.randn(100000),
        "category": np.random.choice(["A", "B", "C", "D"], 100000)
    })
    
    # Create the SmartDataframe
    sdf_perf = SmartDataframe(test_data, config=optimized_config)
    
    # Test queries
    test_queries = [
        "Compute the mean of value",
        "Group statistics by category",
        "Find the 100 records with the largest value",
        "Create a histogram of value"
    ]
    
    cache = SmartCache()
    
    for query in test_queries:
        print(f"\nTest query: {query}")
        
        # Check the cache first
        cached_result = cache.get(query)
        if cached_result is not None:
            print("Result served from cache")
            result = cached_result
        else:
            # Rewrite the query
            optimized = optimize_query(query)
            
            # Execute it
            start_time = time.time()
            result = sdf_perf.chat(optimized)
            end_time = time.time()
            
            # Cache the result
            cache.set(query, result)
            
            print(f"Query time: {end_time - start_time:.2f}s")
        
        print(f"Result type: {type(result)}")

# Run the benchmark
performance_test()

# 7. Memory usage tips
def memory_optimization_tips():
    """Print memory optimization advice"""
    tips = """
    PandasAI memory optimization tips:
    
    1. Sampling:
       - Use a data sample for exploratory analysis
       - Configure the sample_size parameter
    
    2. Dtype optimization:
       - Convert object columns to category
       - Use appropriately sized dtypes (int8, float32, ...)
    
    3. Chunked processing:
       - Process large datasets chunk by chunk
       - Use the chunk_size parameter
    
    4. Clean up promptly:
       - Drop intermediate variables you no longer need
       - Use del to release memory
    
    5. Use a database:
       - Keep big data in the database
       - Let the database do the aggregation
    
    6. Caching:
       - Enable the cache
       - Choose a sensible TTL
    """
    print(tips)

memory_optimization_tips()
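
Tip 2 above (dtype optimization) is easy to demonstrate in plain pandas, independent of PandasAI. A minimal sketch with made-up data:

import numpy as np
import pandas as pd

df = pd.DataFrame({
    "category": np.random.choice(["A", "B", "C"], 100000),
    "value": np.random.randn(100000)
})

print(df.memory_usage(deep=True).sum(), "bytes before")

# Low-cardinality strings compress well as category;
# float32 halves the footprint of float64 where the precision suffices
df["category"] = df["category"].astype("category")
df["value"] = df["value"].astype(np.float32)

print(df.memory_usage(deep=True).sum(), "bytes after")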
4.4.2 Async processing and concurrency
import asyncio
import concurrent.futures
import numpy as np
import pandas as pd
from pandasai import SmartDataframe
from pandasai.llm import OpenAI

class AsyncPandasAI:
    """Run PandasAI queries concurrently"""
    
    def __init__(self, df, max_workers=4):
        self.df = df
        self.max_workers = max_workers
        self.executor = concurrent.futures.ThreadPoolExecutor(max_workers=max_workers)
        
    async def process_queries_async(self, queries):
        """Process several queries concurrently"""
        loop = asyncio.get_running_loop()
        
        # Prepare the tasks
        tasks = []
        for query in queries:
            task = loop.run_in_executor(
                self.executor,
                self._process_single_query,
                query
            )
            tasks.append(task)
        
        # Run them concurrently
        results = await asyncio.gather(*tasks, return_exceptions=True)
        
        # Handle the results
        processed_results = []
        for i, result in enumerate(results):
            if isinstance(result, Exception):
                print(f"Query '{queries[i]}' failed: {result}")
                processed_results.append(None)
            else:
                processed_results.append(result)
        
        return processed_results
    
    def _process_single_query(self, query):
        """Run a single query"""
        sdf = SmartDataframe(
            self.df,
            config={
                "llm": OpenAI(api_token="your-key"),
                "verbose": False
            }
        )
        return sdf.chat(query)
    
    def process_batch(self, queries, batch_size=10):
        """Process queries in batches"""
        all_results = []
        
        for i in range(0, len(queries), batch_size):
            batch = queries[i:i+batch_size]
            print(f"Processing batch {i//batch_size + 1}: {len(batch)} queries")
            
            # Handle the batch synchronously with a thread pool
            with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor:
                future_to_query = {
                    executor.submit(self._process_single_query, query): query
                    for query in batch
                }
                
                for future in concurrent.futures.as_completed(future_to_query):
                    query = future_to_query[future]
                    try:
                        result = future.result()
                        all_results.append((query, result))
                        print(f"✓ Done: {query}")
                    except Exception as e:
                        print(f"✗ Failed: {query} - {e}")
                        all_results.append((query, None))
        
        return all_results

# Async example
async def async_example():
    """Demonstrate concurrent query processing"""
    # Build test data
    df = pd.DataFrame({
        "date": pd.date_range("2024-01-01", periods=100, freq='D'),
        "value": np.random.randn(100) * 100 + 1000,
        "category": np.random.choice(["A", "B", "C"], 100)
    })
    
    # Create the processor
    processor = AsyncPandasAI(df, max_workers=5)
    
    # Prepare the queries
    queries = [
        "Compute the mean of value",
        "Group statistics by category",
        "Create a time series chart",
        "Detect outliers",
        "Forecast the trend for the next 7 days",
        "Compute a moving average",
        "Compare weekends with weekdays",
        "Create a heatmap",
        "Compute the correlation matrix",
        "Generate a statistical report"
    ]
    
    print("Starting async processing...")
    
    # Option 1: async processing
    results = await processor.process_queries_async(queries)
    
    # Option 2: batched (synchronous) processing
    # results = processor.process_batch(queries, batch_size=3)
    
    print("\nDone!")
    for i, (query, result) in enumerate(zip(queries, results)):
        if result is not None:
            print(f"{i+1}. {query}: OK")
        else:
            print(f"{i+1}. {query}: failed")

# Run the async example (in an async-capable environment)
# asyncio.run(async_example())
4.4.3 Monitoring and logging
import logging
import json
from datetime import datetime
import numpy as np
import pandas as pd
from pandasai import SmartDataframe
from pandasai.llm import OpenAI

class PerformanceMonitor:
    """Query performance monitoring"""
    
    def __init__(self, log_file="pandasai_performance.log"):
        self.log_file = log_file
        self.setup_logging()
        
    def setup_logging(self):
        """Configure the loggers"""
        logger = logging.getLogger("PandasAI-Performance")
        logger.setLevel(logging.INFO)
        
        # File handler: raw JSON lines only, so generate_performance_report
        # below can parse each line with json.loads
        file_handler = logging.FileHandler(self.log_file)
        file_handler.setLevel(logging.INFO)
        file_handler.setFormatter(logging.Formatter('%(message)s'))
        
        # Console handler: human-readable format
        console_handler = logging.StreamHandler()
        console_handler.setLevel(logging.WARNING)
        console_handler.setFormatter(logging.Formatter(
            '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        ))
        
        logger.addHandler(file_handler)
        logger.addHandler(console_handler)
        
        self.logger = logger
    
    def log_query(self, query, execution_time, result_size=None, status="success"):
        """Log one query"""
        log_entry = {
            "timestamp": datetime.now().isoformat(),
            "query": query,
            "execution_time": execution_time,
            "result_size": result_size,
            "status": status
        }
        
        self.logger.info(json.dumps(log_entry))
        
        # Performance warning
        if execution_time > 10:  # more than 10 seconds
            self.logger.warning(f"Slow query: {query} - took {execution_time:.2f}s")
    
    def generate_performance_report(self):
        """Build a performance report from the log file"""
        # Read the log file (one JSON object per line)
        with open(self.log_file, 'r') as f:
            logs = [json.loads(line) for line in f if line.strip()]
        
        if not logs:
            return "No performance data yet"
        
        # Compute statistics
        execution_times = [log["execution_time"] for log in logs if "execution_time" in log]
        
        report = {
            "total_queries": len(logs),
            "success_rate": sum(1 for log in logs if log.get("status") == "success") / len(logs) * 100,
            "avg_execution_time": sum(execution_times) / len(execution_times) if execution_times else 0,
            "max_execution_time": max(execution_times) if execution_times else 0,
            "slow_queries": [log["query"] for log in logs if log.get("execution_time", 0) > 10],
            "common_queries": self._get_common_queries(logs),
            "performance_trend": self._calculate_trend(logs)
        }
        
        return report
    
    def _get_common_queries(self, logs, top_n=5):
        """Most frequent queries"""
        from collections import Counter
        queries = [log["query"] for log in logs]
        return Counter(queries).most_common(top_n)
    
    def _calculate_trend(self, logs):
        """Average execution time per day"""
        # Group by date
        daily_data = {}
        for log in logs:
            date = log["timestamp"][:10]  # extract the date
            if date not in daily_data:
                daily_data[date] = {"count": 0, "total_time": 0}
            daily_data[date]["count"] += 1
            daily_data[date]["total_time"] += log.get("execution_time", 0)
        
        # Daily averages
        trend = {}
        for date, data in daily_data.items():
            trend[date] = data["total_time"] / data["count"]
        
        return trend

# Analysis with monitoring attached
def monitored_analysis():
    """Run queries while logging their performance"""
    # Create the monitor
    monitor = PerformanceMonitor()
    
    # Build the data
    df = pd.DataFrame({
        "sales": np.random.randint(100, 1000, 1000),
        "profit": np.random.randint(10, 200, 1000),
        "region": np.random.choice(["North", "South", "East", "West"], 1000),
        "month": np.random.choice(["Jan", "Feb", "Mar", "Apr"], 1000)
    })
    
    # Create the SmartDataframe
    sdf = SmartDataframe(
        df,
        config={
            "llm": OpenAI(api_token="your-key"),
            "verbose": False,
            "enable_cache": True
        }
    )
    
    # Run the monitored queries
    import time
    
    queries = [
        "Compute the average sales per region",
        "Analyze the relationship between sales and profit",
        "Forecast next month's sales trend",
        "Find the best performing region",
        "Create a sales dashboard"
    ]
    
    for query in queries:
        print(f"\nRunning: {query}")
        start_time = time.time()
        
        try:
            result = sdf.chat(query)
            execution_time = time.time() - start_time
            
            # Log it
            result_size = len(result) if hasattr(result, '__len__') else None
            monitor.log_query(
                query=query,
                execution_time=execution_time,
                result_size=result_size,
                status="success"
            )
            
            print(f"✓ OK - took {execution_time:.2f}s")
            
        except Exception as e:
            execution_time = time.time() - start_time
            monitor.log_query(
                query=query,
                execution_time=execution_time,
                status=f"error: {str(e)}"
            )
            print(f"✗ Failed: {e}")
    
    # Generate the performance report
    print("\n" + "="*60)
    print("Performance report")
    print("="*60)
    
    report = monitor.generate_performance_report()
    for key, value in report.items():
        if isinstance(value, list):
            print(f"{key}:")
            for item in value:
                print(f"  - {item}")
        else:
            print(f"{key}: {value}")

# Run the monitored example
monitored_analysis()

Summary and Best Practices

Learning Path Overview

Stage        | Main content                                                  | Key skills
Beginner     | Installation, MockLLM usage, simple queries                   | Environment setup, basic syntax
Intermediate | Real LLM integration, database connections, custom functions  | API integration, SQL connectors, extending functions
Advanced     | Performance tuning, async processing, plugin development      | Performance tuning, concurrent programming, system design
Expert       | Architecture design, production deployment, team collaboration | Architecture, CI/CD, team management

Recommended Tooling

# Development environment checklist
def check_development_environment():
    """Check that the development environment is ready"""
    required_packages = [
        "pandas",
        "pandasai",
        "openai",  # if using OpenAI
        "sqlalchemy",  # if using databases
        "plotly",  # if using advanced visualization
        "asyncio",  # standard library (async support)
        "logging"  # standard library (logging)
    ]
    
    print("Development environment checklist:")
    print("="*60)
    
    for package in required_packages:
        try:
            __import__(package.replace("-", "_"))
            print(f"✓ {package}")
        except ImportError:
            print(f"✗ {package} - needs installing")
    
    print("\nRecommended setup:")
    print("1. Use a virtual environment")
    print("2. Set environment variables")
    print("3. Use version control")
    print("4. Set up monitoring and logging")
    print("5. Adopt a testing strategy")

check_development_environment()

Production Deployment Advice

# Example production configuration (an application-level config,
# not PandasAI's own schema)
PRODUCTION_CONFIG = {
    "llm": {
        "provider": "openai",
        "model": "gpt-4",
        "api_key_env_var": "OPENAI_API_KEY",
        "timeout": 30,
        "max_retries": 3
    },
    "database": {
        "connection_pool_size": 10,
        "max_overflow": 20,
        "pool_recycle": 3600
    },
    "performance": {
        "enable_cache": True,
        "cache_ttl": 300,
        "max_cache_size": 10000,
        "query_timeout": 60,
        "max_result_size": 100000
    },
    "security": {
        "data_masking": True,
        "log_sanitization": True,
        "api_rate_limit": 100,
        "allowed_data_sources": ["database1", "api1"]
    },
    "monitoring": {
        "enable_logging": True,
        "log_level": "INFO",
        "performance_metrics": True,
        "alert_threshold": {
            "response_time": 10,
            "error_rate": 0.01,
            "cache_hit_rate": 0.8
        }
    }
}

# Example Docker Compose deployment
DOCKER_COMPOSE_TEMPLATE = """
version: '3.8'
services:
  pandasai-api:
    build: .
    ports:
      - "8000:8000"
    environment:
      - OPENAI_API_KEY=${OPENAI_API_KEY}
      - DATABASE_URL=${DATABASE_URL}
      - REDIS_URL=${REDIS_URL}
    volumes:
      - ./logs:/app/logs
      - ./cache:/app/cache
    depends_on:
      - redis
      - database
    
  redis:
    image: redis:alpine
    ports:
      - "6379:6379"
    
  database:
    image: postgres:13
    environment:
      - POSTGRES_PASSWORD=${DB_PASSWORD}
    volumes:
      - postgres_data:/var/lib/postgresql/data

volumes:
  postgres_data:
"""

This advanced guide covers PandasAI's higher-level features systematically, from basic use through production deployment. Each part includes working code examples and best practices to help you apply these techniques in real projects.
