PandasAI in Practice: Applications and Hands-On Walkthrough
1. What is PandasAI?
- Definition: an open-source Python library that combines Pandas with AI
- Core capability: query and analyze data using natural language
- Supported data sources: CSV, XLSX, PostgreSQL, MySQL, BigQuery, Databricks, Snowflake, and more
2. Key Features
- Natural language queries: ask questions about your data in everyday language
- Data visualization: automatically generates charts and graphs
- Data cleaning: handles missing values
- Feature generation: improves data quality
- Multi-source connectivity: supports many databases and file formats
3. Technical Architecture
How it works:
Natural language query → AI model interprets it → converted to Python/SQL code → executed against the data → result returned
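This loop is easy to sketch in a few lines. The snippet below is a minimal, hypothetical illustration of the pipeline only — the fake_llm_generate function stands in for the real LLM call and hard-codes one answer:

import pandas as pd

def fake_llm_generate(question: str) -> str:
    # A real deployment sends `question` plus the DataFrame schema to an LLM;
    # here one mapping is hard-coded just to show the shape of the pipeline.
    return "result = df['sales'].mean()"

def answer(df: pd.DataFrame, question: str):
    code = fake_llm_generate(question)  # NL -> Python code
    scope = {"df": df}
    exec(code, scope)                   # code executed against the data
    return scope["result"]              # result returned to the user

df = pd.DataFrame({"sales": [100, 120, 90]})
print(answer(df, "What is the average sales value?"))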
4. Installation Steps
Step 1: download the source and manage dependencies
- Use Python 3.10.x
- Download the source from the GitLab repository (the 01 branch)
- Pin dependency versions explicitly to avoid compatibility problems (a sketch of a pinned requirements file follows below)
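Pinning can be as simple as a requirements file. The version numbers below are illustrative placeholders, not tested recommendations — use whatever combination you have verified:

# requirements.txt (example pins - adjust to the versions you verified)
pandas==1.5.3
pandasai==1.5.8
openai==0.28.1
matplotlib==3.7.2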
Step 2: install the Python dependencies
- Install PandasAI and its related packages
- Patch the bugs in the official code that this article points out
- Configure a generative AI model (e.g., OpenAI GPT)
Step 3: run the demo
- Test with the official example code
- Verify the natural language query feature
- Exercise the visualization and analysis capabilities
5. Why It Matters
- Efficiency: less time spent writing complex query and analysis code
- Lower barrier: non-technical users can analyze data too
- Full coverage: spans data exploration, cleaning, visualization, and feature engineering
6. Resources
- Official documentation: https://docs.pandas-ai.com
- GitHub repository: https://github.com/Sinaptik-AI/pandas-ai
- Source repository: GitLab (the author's example code)
Typical Users
- Business analysts: quickly produce reports and insights
- Data scientists: speed up exploration and preprocessing
- Developers: simplify data query and analysis workflows
- Product managers: talk to the data directly to get metrics
Caveats
- Some bugs in the official code need patching
- Watch out for dependency version compatibility
- A valid AI model API (e.g., OpenAI) must be configured
This article gives beginners a complete introduction to PandasAI, from environment setup to real use, showing how AI can simplify the traditional data analysis workflow.
PandasAI in Practice: Environment Setup and Basic Usage
Step-by-Step Guide
Step 1: Environment Preparation and Installation
1.1 Create a virtual environment (recommended)
# With conda
conda create -n pandasai_env python=3.10
conda activate pandasai_env
# Or with venv
python -m venv pandasai_env
# Windows
pandasai_env\Scripts\activate
# Linux/Mac
source pandasai_env/bin/activate
1.2 Install the core dependencies
# Base packages
pip install pandas numpy matplotlib seaborn
# PandasAI
pip install pandas-ai
# If you plan to use OpenAI or another hosted LLM
pip install openai
# Or, for local models
pip install langchain
1.3 Verify the installation
import pandas as pd
import pandasai as pai
print(f"Pandas version: {pd.__version__}")
print(f"PandasAI version: {pai.__version__}")
Step 2: Prepare a Test DataFrame
import pandas as pd

# Build a sample DataFrame
def create_sample_dataframe():
    data = {
        'date': pd.date_range(start='2024-01-01', periods=30, freq='D'),
        'city': ['Beijing']*10 + ['Shanghai']*10 + ['Guangzhou']*10,
        'temperature': [2, 3, 1, 4, 2, 3, 5, 6, 4, 3] +
                       [8, 9, 10, 8, 7, 9, 11, 10, 8, 9] +
                       [18, 19, 20, 21, 22, 20, 19, 21, 22, 23],
        'humidity': [45, 47, 50, 48, 46, 49, 51, 52, 50, 48] +
                    [65, 66, 68, 67, 65, 69, 70, 68, 67, 66] +
                    [75, 76, 78, 77, 79, 76, 75, 78, 77, 76],
        'sales': [1000, 1200, 800, 1500, 900, 1300, 1400, 1600, 1100, 1250] +
                 [2000, 2200, 1800, 2500, 1900, 2300, 2400, 2600, 2100, 2250] +
                 [3000, 3200, 2800, 3500, 2900, 3300, 3400, 3600, 3100, 3250],
        'category': ['A', 'B', 'A', 'C', 'B', 'A', 'C', 'B', 'A', 'C'] * 3
    }
    df = pd.DataFrame(data)
    # Inject a few missing values on purpose
    df.loc[5, 'sales'] = None
    df.loc[15, 'humidity'] = None
    df.loc[25, 'temperature'] = None
    return df

# Create and inspect the data
df = create_sample_dataframe()
print("Shape:", df.shape)
print("\nFirst 5 rows:")
print(df.head())
print("\nBasic info:")
print(df.info())
print("\nDescriptive statistics:")
print(df.describe())
Step 3: Configure PandasAI with a MockLLM
from pandasai import SmartDataframe
from pandasai.llm import OpenAI
import warnings
warnings.filterwarnings('ignore')

# Option 1: a MockLLM for offline testing.
# Note: depending on the PandasAI version, SmartDataframe may expect an object
# derived from its LLM base class; this duck-typed mock assumes the simple
# call/chat interface used in this demo.
class MockLLM:
    """Mock LLM that returns canned pandas code for known question patterns."""
    def __init__(self):
        self.history = []

    def call(self, instruction: str, value: str, suffix: str = ""):
        """Simulate an LLM call by pattern-matching the question."""
        # Keep a record of every call
        self.history.append({
            'instruction': instruction,
            'value': value,
            'suffix': suffix
        })
        q = instruction.lower()
        # Return canned pandas code depending on the question.
        # The per-city check must come before the generic "average" check,
        # otherwise "average temperature per city" would be caught by the
        # generic branch first (this was a bug in the original ordering).
        if "per city" in q and "average temperature" in q:
            return "df.groupby('city')['temperature'].mean()"
        elif "first 5 rows" in q:
            return "df.head(5)"
        elif "statistics" in q or "describe" in q:
            return "df.describe()"
        elif "average" in q or "mean" in q:
            if "temperature" in q:
                return "df['temperature'].mean()"
            elif "humidity" in q:
                return "df['humidity'].mean()"
            elif "sales" in q:
                return "df['sales'].mean()"
        elif "missing" in q:
            return "df.isnull().sum()"
        elif "beijing" in q and "sales" in q:
            return "df[df['city'] == 'Beijing']['sales'].sum()"
        elif "line chart" in q or "trend" in q:
            return """
import matplotlib.pyplot as plt
plt.figure(figsize=(10, 6))
for city in df['city'].unique():
    city_data = df[df['city'] == city]
    plt.plot(city_data['date'], city_data['temperature'], label=city, marker='o')
plt.title('Temperature trend by city')
plt.xlabel('Date')
plt.ylabel('Temperature (°C)')
plt.legend()
plt.grid(True)
plt.xticks(rotation=45)
plt.tight_layout()
plt.show()
"""
        # Default: return the first 3 rows
        return "df.head(3)"

    def chat(self, prompt: str):
        """Chat-style interface."""
        return self.call(prompt, "", "")

# Option 2: a real OpenAI backend (requires an API key)
# llm = OpenAI(api_token="your-api-key-here")

# Create the MockLLM instance
mock_llm = MockLLM()
# Create the SmartDataframe
sdf = SmartDataframe(df, config={"llm": mock_llm, "verbose": True})
Step 4: Run Natural Language Queries
def run_queries(smart_df):
    """Run a batch of natural language queries."""
    queries = [
        "Show the first 5 rows of the data",
        "Give me descriptive statistics for the data",
        "What is the average temperature?",
        "Compute the average temperature per city",
        "Check the missing values in the data",
        "Compute the total sales for Beijing",
        "Draw a line chart of the temperature trend by city"
    ]
    results = {}
    for i, query in enumerate(queries, 1):
        print(f"\n{'='*50}")
        print(f"Query {i}: {query}")
        print('-'*30)
        try:
            # Execute the query
            result = smart_df.chat(query)
            results[query] = result
            # Display the result
            if isinstance(result, (pd.DataFrame, pd.Series)):
                print(result.to_string())
            elif hasattr(result, 'show'):  # a matplotlib object
                print("Chart generated")
                # In a real run you could save the figure:
                # result.savefig(f"chart_{i}.png")
            else:
                print(f"Result: {result}")
        except Exception as e:
            print(f"Query failed: {e}")
            results[query] = f"Error: {e}"
    return results

# Run the queries
results = run_queries(sdf)
Step 5: Advanced Feature Demos
def advanced_features_demo():
    """Demonstrate PandasAI's more advanced capabilities."""
    print("\n" + "="*60)
    print("Advanced feature demos")
    print("="*60)
    # 1. Data cleaning
    print("\n1. Data cleaning - handling missing values")
    print("Missing values in the raw data:")
    print(df.isnull().sum())
    # Data cleaning via PandasAI (simulated)
    clean_query = "Clean the missing values in the data, filling them with the mean"
    print(f"\nRunning query: {clean_query}")
    # In real PandasAI, this would generate the corresponding cleaning code
    # 2. Feature engineering
    print("\n2. Feature engineering - creating a new feature")
    feature_query = "Create a new feature 'temp_category' that bins temperature: Low (<10), Medium (10-20), High (>20)"
    print(f"Running query: {feature_query}")
    # Implemented by hand here for demonstration
    def categorize_temp(temp):
        if pd.isna(temp):
            return 'Unknown'
        elif temp < 10:
            return 'Low'
        elif temp <= 20:
            return 'Medium'
        else:
            return 'High'
    df['temp_category'] = df['temperature'].apply(categorize_temp)
    print("First 5 rows with the new feature:")
    print(df[['date', 'city', 'temperature', 'temp_category']].head())
    # 3. Aggregation
    print("\n3. Aggregation analysis")
    agg_query = "Average sales by city and temperature category"
    print(f"Running query: {agg_query}")
    agg_result = df.groupby(['city', 'temp_category'])['sales'].mean()
    print(agg_result)
    # 4. Time series
    print("\n4. Time series analysis - 7-day moving average")
    ts_query = "Compute the 7-day moving average of sales for each city"
    print(f"Running query: {ts_query}")
    # Demonstration code
    df.set_index('date', inplace=True)
    for city in df['city'].unique():
        city_sales = df[df['city'] == city]['sales']
        ma_7 = city_sales.rolling(window=7).mean()
        print(f"Latest 7-day moving average for {city}: "
              f"{ma_7.dropna().iloc[-1] if len(ma_7.dropna()) > 0 else 'not enough data'}")
    df.reset_index(inplace=True)
    return df

# Run the advanced demos
enhanced_df = advanced_features_demo()
Step 6: Real-World Scenario Examples
def real_world_scenarios():
    """Walk through realistic application scenarios."""
    print("\n" + "="*60)
    print("Real-world scenario examples")
    print("="*60)
    # Scenario 1: sales analysis
    print("\nScenario 1: Sales analysis")
    sales_scenarios = [
        "Which month had the highest sales?",
        "Which city has the highest average sales?",
        "Break down sales by category",
        "Find the 3 days with the highest sales"
    ]
    for scenario in sales_scenarios:
        print(f"\nQuestion: {scenario}")
        # With real PandasAI you would just ask in natural language:
        # result = sdf.chat(scenario)
        # print(f"Answer: {result}")
    # Scenario 2: weather analysis
    print("\nScenario 2: Weather analysis")
    weather_scenarios = [
        "Which city has the largest temperature swings?",
        "What is the relationship between temperature and humidity?",
        "Predict the temperature trend for the next 3 days",
        "Find anomalous temperature readings"
    ]
    for scenario in weather_scenarios:
        print(f"\nQuestion: {scenario}")
    # Scenario 3: report generation
    print("\nScenario 3: Automated business report")
    report_query = """
    Produce a data analysis report covering:
    1. Overall sales overview
    2. City-by-city comparison
    3. How temperature affects sales
    4. Key findings and recommendations
    """
    print(f"\nReport request: {report_query}")
    print("\nSimulated report:")
    print("-"*40)
    print("Data Analysis Report")
    print("-"*40)
    print("1. Overall sales:")
    print(f"   Total sales: {df['sales'].sum():,.0f} yuan")
    print(f"   Average daily sales: {df['sales'].mean():,.0f} yuan")
    print(f"   Days covered: {df['date'].nunique()}")
    print("\n2. City comparison:")
    city_sales = df.groupby('city')['sales'].sum()
    for city, sales in city_sales.items():
        print(f"   {city}: {sales:,.0f} yuan")
    print("\n3. Effect of temperature on sales:")
    temp_sales_corr = df['temperature'].corr(df['sales'])
    print(f"   Correlation between temperature and sales: {temp_sales_corr:.3f}")
    print("\n4. Key findings and recommendations:")
    print("   - Shanghai and Guangzhou clearly outsell Beijing")
    print("   - Temperature is positively correlated with sales")
    print("   - Consider heavier marketing in warmer seasons")

# Run the scenario walkthrough
real_world_scenarios()
Step 7: Putting It All Together
# Complete example
def complete_demo():
    """
    End-to-end PandasAI demo:
    environment check, data preparation, query execution, and result display.
    """
    print("PandasAI end-to-end demo")
    print("="*60)
    try:
        # 1. Environment check
        print("1. Checking the environment...")
        import pandas as pd
        import numpy as np
        from pandasai import SmartDataframe
        print("   ✓ Environment check passed")
        # 2. Create the data
        print("\n2. Creating sample data...")
        df = create_sample_dataframe()
        print(f"   ✓ Created a DataFrame with {len(df)} rows")
        # 3. Initialize PandasAI
        print("\n3. Initializing PandasAI...")
        mock_llm = MockLLM()
        sdf = SmartDataframe(df, config={"llm": mock_llm, "verbose": False})
        print("   ✓ PandasAI initialized")
        # 4. Run sample queries
        print("\n4. Running natural language queries...")
        print("\nSample query 1: 'Show the first 3 rows'")
        result1 = sdf.chat("Show the first 3 rows")
        print(result1)
        print("\nSample query 2: 'Compute the average sales'")
        result2 = sdf.chat("Compute the average sales")
        print(f"Average sales: {result2}")
        print("\nSample query 3: 'Compute the average temperature per city'")
        result3 = sdf.chat("Compute the average temperature per city")
        print(result3)
        print("\n✓ Demo complete!")
        return {
            'dataframe': df,
            'smart_dataframe': sdf,
            'results': {
                'first 3 rows': result1,
                'average sales': result2,
                'average temperature per city': result3
            }
        }
    except ImportError as e:
        print(f"✗ Import error: {e}")
        print("Make sure the required packages are installed:")
        print("pip install pandas pandas-ai numpy")
        return None
    except Exception as e:
        print(f"✗ Error: {e}")
        return None

# Run the full demo
demo_results = complete_demo()
if demo_results:
    print("\n" + "="*60)
    print("Demo summary")
    print("="*60)
    print(f"1. Data size: {len(demo_results['dataframe'])} rows × {len(demo_results['dataframe'].columns)} columns")
    print(f"2. Queries executed successfully: {len(demo_results['results'])}")
    print(f"3. Columns used: {list(demo_results['dataframe'].columns)}")
    print("\nYou can keep exploring with:")
    print("  sdf.chat('your natural language question')")
    print("\nFor example:")
    print("  sdf.chat('Which day had the highest sales?')")
    print("  sdf.chat('Plot a histogram of temperature')")
    print("  sdf.chat('Analyze the sales trend by weekday')")
Key Takeaways
1. Core strengths
- Natural language interface: no need to write complex Pandas code
- Smart analysis: generates analysis code and visualizations automatically
- Lower barrier: business users can talk to the data directly
2. Usage tips
# Best practices
# 1. State the question clearly
question = "Analyze the January 2024 sales trend for each city"
# 2. Break it into smaller steps
sub_questions = [
    "Compute total January sales per city",
    "Compare average daily sales across cities",
    "Plot the sales trend"
]
# 3. Verify the results
for q in sub_questions:
    result = sdf.chat(q)
    print(f"Question: {q}")
    print(f"Result: {result}\n")
3. Caveats
- MockLLM is for testing only; production needs a real LLM
- Complex queries may take several rounds of interaction
- Results still need human verification
- Mind data privacy and security
4. Where to go next
- Plug in a real LLM (OpenAI, local models, etc.)
- Learn the advanced data connection features
- Explore custom functions and plugins
- Study performance optimization techniques
This full example walks PandasAI from environment setup to real use. With the MockLLM simulation you can try the natural language query workflow locally, as a stepping stone to wiring up a real AI model.
PandasAI Advanced: A Detailed Learning Path
4. Next Steps in Detail
4.1 Plugging in a Real LLM (OpenAI, local models, etc.)
4.1.1 Connecting to the OpenAI API
# Install the required packages
# pip install openai pandasai python-dotenv
import os
from dotenv import load_dotenv
from pandasai import SmartDataframe
from pandasai.llm import OpenAI
import pandas as pd

# 1. Configure the API key
load_dotenv()  # load environment variables from a .env file

# Option 1: via environment variable
os.environ["OPENAI_API_KEY"] = "your-api-key-here"

# Option 2: configure directly
llm = OpenAI(
    api_token="sk-your-openai-api-key",
    model="gpt-4",  # or "gpt-3.5-turbo"
    temperature=0.7,
    max_tokens=1000,
    timeout=120,  # request timeout in seconds
)
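# For reference, the .env file read by load_dotenv() above might contain a
# single line like the following (illustrative; keep real keys out of
# version control):
#   OPENAI_API_KEY=sk-...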
# 2. Build a dataset
data = {
    "product": ["Phone", "Tablet", "Laptop", "Watch", "Earbuds"] * 4,
    "quarter": ["Q1"]*5 + ["Q2"]*5 + ["Q3"]*5 + ["Q4"]*5,
    "sales": [10000, 8000, 15000, 5000, 3000] * 4,
    "cost": [6000, 5000, 10000, 3000, 1800] * 4,
    "region": ["East", "South", "North", "West", "Central"] * 4
}
df = pd.DataFrame(data)

# 3. Create the SmartDataframe
sdf = SmartDataframe(
    df,
    config={
        "llm": llm,
        "verbose": True,       # detailed logging
        "save_logs": True,     # persist logs
        "enable_cache": True,  # enable caching
        "max_retries": 3       # maximum retries
    }
)

# 4. Query with the real LLM
queries = [
    "Compute the average sales per product",
    "Which quarter had the highest total sales?",
    "Draw a bar chart of sales by product",
    "Compute each product's profit margin ((sales - cost) / sales)",
    "Analyze regional performance and make recommendations"
]
for i, query in enumerate(queries, 1):
    print(f"\nQuery {i}: {query}")
    print("-" * 50)
    try:
        result = sdf.chat(query)
        print(result)
    except Exception as e:
        print(f"Query failed: {str(e)}")

# 5. Advanced configuration example (some keys follow the original article
# and may vary by PandasAI version)
advanced_config = {
    "llm": OpenAI(
        api_token="your-api-key",
        model="gpt-4",
        temperature=0.3,  # more deterministic output
        max_tokens=2000,
        top_p=0.9,
        frequency_penalty=0.1,
        presence_penalty=0.1,
    ),
    "conversational": True,  # conversational mode
    "memory": True,          # memory across turns
    "custom_prompts": {
        "data_visualization": "Please create a visualization for the following data: {prompt}"
    },
    "custom_whitelisted_dependencies": ["seaborn", "plotly"]
}
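As a quick usage sketch (assuming the same df and advanced_config defined above), the advanced configuration is passed the same way as the basic one:

sdf_advanced = SmartDataframe(df, config=advanced_config)
print(sdf_advanced.chat("Summarize profit by region across quarters"))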
4.1.2 Connecting a Local Open-Source Model (via Ollama)
# Install the required packages
# pip install ollama langchain pandasai
from pandasai import SmartDataframe
from pandasai.llm import Ollama  # availability of this class depends on your PandasAI version
import pandas as pd

# 1. Make sure the Ollama server is running
# In a terminal: ollama serve
# Pull a model:  ollama pull llama2   (or: ollama pull mistral)

# 2. Configure the local LLM
local_llm = Ollama(
    model="llama2",  # or "mistral", "codellama"
    base_url="http://localhost:11434",  # Ollama's default address
    temperature=0.7,
    max_tokens=2000,
    # Optional: a custom prompt template
    custom_prompt_template="""
    You are a data analysis assistant. The user gives you a DataFrame and a query.
    Answer with Python code.
    Data preview:
    {df_head}
    User query: {query}
    Generate appropriate code:
    """
)

# 3. Load the data
df = pd.read_csv("your_data.csv")  # or load from another source

# 4. Create the SmartDataframe
sdf_local = SmartDataframe(
    df,
    config={
        "llm": local_llm,
        "verbose": True,
        "enforce_privacy": True,  # privacy mode: do not send data externally
        "use_error_correction_framework": True  # retry with error feedback
    }
)

# 5. Test a query
try:
    result = sdf_local.chat("What are the basic statistics of this data?")
    print(result)
except Exception as e:
    print(f"Error: {e}")
4.1.3 Switching Between Multiple Models
from pandasai import SmartDataframe
from pandasai.llm import OpenAI, Ollama
import pandas as pd

class MultiModelManager:
    """Registry that lets you switch between several LLM backends."""
    def __init__(self):
        self.models = {}
        self.current_model = None

    def register_model(self, name, llm_instance):
        """Register a model under a name."""
        self.models[name] = llm_instance

    def switch_model(self, name):
        """Switch the active model."""
        if name in self.models:
            self.current_model = self.models[name]
            return True
        return False

    def get_model(self, name=None):
        """Fetch a model by name, or the currently active one."""
        if name:
            return self.models.get(name)
        return self.current_model

# Initialize the manager
manager = MultiModelManager()

# Register several models
manager.register_model(
    "openai_gpt4",
    OpenAI(api_token="your-key", model="gpt-4")
)
manager.register_model(
    "openai_gpt3",
    OpenAI(api_token="your-key", model="gpt-3.5-turbo")
)
manager.register_model(
    "local_llama",
    Ollama(model="llama2", base_url="http://localhost:11434")
)

# Switch models depending on the task
df = pd.DataFrame({"A": [1, 2, 3], "B": [4, 5, 6]})

# Use GPT-4 for complex queries
manager.switch_model("openai_gpt4")
sdf_gpt4 = SmartDataframe(df, config={"llm": manager.get_model()})
complex_result = sdf_gpt4.chat("Run a time series forecasting analysis")

# Use the local model for simple queries
manager.switch_model("local_llama")
sdf_local = SmartDataframe(df, config={"llm": manager.get_model()})
simple_result = sdf_local.chat("Compute the mean")
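If you want the switch to happen automatically, a naive keyword-based router is enough to start with. This is purely illustrative; a production router would also consider token counts, cost budgets, and observed latencies:

def pick_model(query: str) -> str:
    """Route analytical-sounding queries to GPT-4, everything else locally."""
    complex_markers = ["forecast", "predict", "regression", "time series", "why"]
    if any(marker in query.lower() for marker in complex_markers):
        return "openai_gpt4"
    return "local_llama"

manager.switch_model(pick_model("Forecast next quarter's sales"))  # -> openai_gpt4
manager.switch_model(pick_model("Compute the mean"))               # -> local_llama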
4.2 Advanced Data Connection Features
4.2.1 Connecting to Databases
# Install the required packages
# pip install pandasai[sql] sqlalchemy psycopg2-binary pymysql
from pandasai import SmartDataframe
from pandasai.llm import OpenAI
from pandasai.connectors import (
    SQLConnector,
    PostgreSQLConnector,
    MySQLConnector,
    SnowflakeConnector,
    BigQueryConnector
)
import pandas as pd

# 1. PostgreSQL
postgres_connector = PostgreSQLConnector(
    config={
        "host": "localhost",
        "port": 5432,
        "database": "your_database",
        "username": "your_username",
        "password": "your_password",
        "table": "sales_data",  # or supply a SQL query instead:
        # "query": "SELECT * FROM sales WHERE date > '2024-01-01'"
    }
)

# 2. MySQL
mysql_connector = MySQLConnector(
    config={
        "host": "localhost",
        "port": 3306,
        "database": "your_db",
        "username": "root",
        "password": "password",
        "table": "customer_data"
    }
)

# 3. Snowflake
snowflake_connector = SnowflakeConnector(
    config={
        "account": "your_account",
        "username": "your_username",
        "password": "your_password",
        "database": "your_database",
        "schema": "your_schema",
        "warehouse": "your_warehouse",
        "role": "your_role",
        "table": "large_dataset"
    }
)

# 4. Generic SQL connector
generic_connector = SQLConnector(
    config={
        "connection_string": "postgresql://user:password@localhost/dbname",
        "table": "your_table"
    }
)

# 5. Create a SmartDataframe and query it
connector = postgres_connector  # pick the connector to use
sdf_db = SmartDataframe(
    connector,
    config={
        "llm": OpenAI(api_token="your-key"),
        "verbose": True
    }
)

# Query the database in natural language
queries = [
    "Show sales records from the last 30 days",
    "Compute total sales per region",
    "Find the 10 best-selling products",
    "Analyze the sales trend and forecast next month's sales"
]
for query in queries:
    print(f"\nQuery: {query}")
    try:
        result = sdf_db.chat(query)
        if isinstance(result, pd.DataFrame):
            print(f"Returned {len(result)} rows")
            print(result.head())
        else:
            print(result)
    except Exception as e:
        print(f"Error: {e}")
4.2.2 Joint Queries Across Multiple Sources
from pandasai import SmartDatalake
from pandasai.llm import OpenAI
# Connector availability varies by PandasAI version; these follow the
# original article's example
from pandasai.connectors import (
    PostgreSQLConnector,
    CSVConnector,
    ExcelConnector
)
import pandas as pd

# 1. Create connectors for each source
sales_connector = PostgreSQLConnector({
    "host": "localhost",
    "database": "sales_db",
    "table": "transactions"
})
customers_connector = CSVConnector({
    "path": "/path/to/customers.csv"
})
products_connector = ExcelConnector({
    "path": "/path/to/products.xlsx",
    "sheet_name": "ProductInfo"
})

# 2. Create the data lake (multiple sources)
datalake = SmartDatalake(
    [sales_connector, customers_connector, products_connector],
    config={
        "llm": OpenAI(api_token="your-key"),
        "verbose": True
    }
)

# 3. Cross-source queries
cross_source_queries = [
    # Join
    "Join sales with customer data and analyze VIP customers' buying behavior",
    # Complex analysis
    "Compute sales per product category, grouped by region",
    # Consolidation
    "Build a full sales report combining product, customer, and transaction details",
    # Business insight
    "Find the most popular product combinations and suggest bundling strategies"
]
for query in cross_source_queries:
    print(f"\nCross-source query: {query}")
    try:
        result = datalake.chat(query)
        print("Query complete!")
        if isinstance(result, pd.DataFrame):
            print(f"Result shape: {result.shape}")
    except Exception as e:
        print(f"Error: {e}")
4.2.3 Live API Data Connections
# Install the required packages
# pip install requests pandasai
import requests
from pandasai import SmartDataframe
from pandasai.llm import OpenAI
from pandasai.connectors import BaseConnector
import pandas as pd

# Note: depending on the PandasAI version, BaseConnector may require
# additional abstract methods; this is a simplified sketch.
class APIConnector(BaseConnector):
    """Custom connector that pulls data from a REST API."""
    def __init__(self, config):
        self.api_url = config["api_url"]
        self.api_key = config.get("api_key")
        self.headers = config.get("headers", {})
        self.params = config.get("params", {})

    def head(self, n=5):
        """Return the first n rows."""
        return self._fetch_data().head(n)

    def _fetch_data(self):
        """Fetch the data from the API."""
        headers = self.headers.copy()
        if self.api_key:
            headers["Authorization"] = f"Bearer {self.api_key}"
        response = requests.get(
            self.api_url,
            headers=headers,
            params=self.params,
            timeout=30
        )
        response.raise_for_status()
        data = response.json()
        # Assumes the API returns a JSON list of records
        return pd.DataFrame(data)

    @property
    def _df(self):
        """Return the full DataFrame."""
        return self._fetch_data()

# Use the custom API connector
api_connector = APIConnector({
    "api_url": "https://api.example.com/data",
    "api_key": "your_api_key_here",
    "headers": {"Content-Type": "application/json"},
    "params": {"limit": 1000}
})
sdf_api = SmartDataframe(
    api_connector,
    config={
        "llm": OpenAI(api_token="your-key"),
        "verbose": True
    }
)

# Query the live data
result = sdf_api.chat("Analyze the latest data trends")
print(result)
4.3 Custom Functions and Plugins
4.3.1 Writing Custom Analysis Functions
from pandasai import SmartDataframe
from pandasai.llm import OpenAI
import pandas as pd
import numpy as np

# 1. Define a custom function library
class CustomAnalytics:
    """Library of custom analysis helpers."""

    @staticmethod
    def calculate_cagr(start_value, end_value, periods):
        """Compound annual growth rate."""
        if start_value <= 0:
            return 0
        return (end_value / start_value) ** (1 / periods) - 1

    @staticmethod
    def detect_anomalies_zscore(series, threshold=3):
        """Flag outliers using the Z-score."""
        mean = np.mean(series)
        std = np.std(series)
        z_scores = (series - mean) / std
        return np.abs(z_scores) > threshold

    @staticmethod
    def calculate_roi(investment, returns):
        """Return on investment."""
        if investment == 0:
            return 0
        return (returns - investment) / investment

    @staticmethod
    def create_segments(data, column, bins, labels):
        """Bin a column into labeled segments."""
        return pd.cut(data[column], bins=bins, labels=labels)

# 2. Register the custom functions
df = pd.DataFrame({
    "month": pd.date_range("2024-01-01", periods=12, freq='M'),
    "revenue": [100, 120, 130, 115, 140, 160, 180, 200, 190, 210, 220, 230],
    "cost": [70, 80, 85, 75, 90, 100, 120, 130, 125, 140, 150, 155]
})
sdf_custom = SmartDataframe(
    df,
    config={
        "llm": OpenAI(api_token="your-key"),
        "custom_whitelisted_dependencies": [
            "CustomAnalytics",
            "calculate_cagr",
            "detect_anomalies_zscore",
            "calculate_roi",
            "create_segments"
        ],
        # Custom imports for the generated code (the exact config key may
        # differ across PandasAI versions; shown as in the original article)
        "custom_imports": """
from custom_analytics import CustomAnalytics
import numpy as np
"""
    }
)

# 3. Queries that use the custom functions
custom_queries = [
    "Use calculate_cagr to compute the CAGR of revenue",
    "Use detect_anomalies_zscore to find outliers in revenue",
    "Use calculate_roi to compute the monthly ROI",
    "Use create_segments to split revenue into low, medium, and high bands"
]
for query in custom_queries:
    print(f"\nCustom query: {query}")
    try:
        result = sdf_custom.chat(query)
        print(result)
    except Exception as e:
        print(f"Error: {e}")
4.3.2 Building a Custom Visualization Plugin
from pandasai import SmartDataframe
from pandasai.llm import OpenAI
from pandasai.middlewares import BaseMiddleware
import pandas as pd
import plotly.graph_objects as go
import plotly.express as px

class PlotlyVisualizer(BaseMiddleware):
    """Middleware that rewrites generated matplotlib code to Plotly."""

    def run(self, code):
        """Rewrite the generated code if it uses matplotlib."""
        if "plt.show()" in code or "matplotlib" in code:
            code = self._convert_to_plotly(code)
        return code

    def _convert_to_plotly(self, code):
        """Naive matplotlib -> plotly text substitutions (best-effort only)."""
        conversions = {
            "import matplotlib.pyplot as plt": "import plotly.express as px\nimport plotly.graph_objects as go",
            "plt.bar(": "go.Bar(",
            "plt.plot(": "go.Scatter(",
            "plt.scatter(": "go.Scatter(mode='markers', ",
            "plt.hist(": "go.Histogram(",
            "plt.show()": "fig.show()",
            "plt.figure(": "fig = go.Figure(",
            "plt.title(": "fig.update_layout(title=",
            "plt.xlabel(": "fig.update_layout(xaxis_title=",
            "plt.ylabel(": "fig.update_layout(yaxis_title=",
            "plt.legend()": "fig.update_layout(showlegend=True)",
            "plt.grid(": "# grid removed for plotly (",
        }
        for old, new in conversions.items():
            code = code.replace(old, new)
        return code

class CustomVisualizations:
    """Custom chart helpers."""

    @staticmethod
    def create_waterfall(df, values, labels, title="Waterfall Chart"):
        """Build a waterfall chart."""
        fig = go.Figure(go.Waterfall(
            name="Performance",
            orientation="v",
            measure=["relative"] * len(df),
            x=df[labels],
            y=df[values],
            connector={"line": {"color": "rgb(63, 63, 63)"}},
        ))
        fig.update_layout(
            title=title,
            showlegend=True,
            waterfallgap=0.3,
        )
        return fig

    @staticmethod
    def create_sunburst(df, path, values, title="Sunburst Chart"):
        """Build a sunburst chart."""
        fig = px.sunburst(
            df,
            path=path,
            values=values,
            title=title
        )
        return fig

# Use the custom visualizations
df_viz = pd.DataFrame({
    "category": ["A", "B", "C", "A", "B", "C"],
    "subcategory": ["A1", "B1", "C1", "A2", "B2", "C2"],
    "value": [100, 150, 200, 120, 180, 220],
    "month": ["Jan", "Jan", "Jan", "Feb", "Feb", "Feb"]
})
sdf_viz = SmartDataframe(
    df_viz,
    config={
        "llm": OpenAI(api_token="your-key"),
        "middlewares": [PlotlyVisualizer()],
        "custom_whitelisted_dependencies": [
            "CustomVisualizations",
            "create_waterfall",
            "create_sunburst"
        ],
        "save_charts": True,
        "save_charts_path": "./charts"
    }
)

# Generate the custom charts
viz_queries = [
    "Use create_waterfall to build a waterfall chart of value",
    "Use create_sunburst to build a sunburst chart of the categories",
    "Create an interactive scatter-plot matrix"
]
for query in viz_queries:
    print(f"\nVisualization query: {query}")
    try:
        result = sdf_viz.chat(query)
        # In Jupyter the chart displays automatically;
        # in a script you can save it instead:
        if hasattr(result, 'write_html'):
            result.write_html(f"chart_{query[:10]}.html")
            print("Chart saved as an HTML file")
    except Exception as e:
        print(f"Error: {e}")
4.3.3 A Data Quality Check Plugin
import pandas as pd
from pandasai import SmartDataframe
from pandasai.llm import OpenAI
from pandasai.middlewares import BaseMiddleware

class DataQualityChecker(BaseMiddleware):
    """Middleware-style data quality checker."""

    def __init__(self, thresholds=None):
        self.thresholds = thresholds or {
            "missing_threshold": 0.3,
            "outlier_threshold": 3,
            "duplicate_threshold": 0.1
        }

    def run(self, df):
        """Run the data quality checks."""
        quality_report = {
            "summary": {},
            "issues": [],
            "suggestions": []
        }
        # Missing values
        missing_percentage = df.isnull().sum() / len(df)
        high_missing = missing_percentage[missing_percentage > self.thresholds["missing_threshold"]]
        if len(high_missing) > 0:
            quality_report["issues"].append({
                "type": "high_missing_values",
                "columns": high_missing.index.tolist(),
                "values": high_missing.values.tolist()
            })
            quality_report["suggestions"].append(
                "Consider dropping columns with more than 30% missing values, or impute them"
            )
        # Duplicates
        duplicate_rows = df.duplicated().sum()
        duplicate_percentage = duplicate_rows / len(df)
        if duplicate_percentage > self.thresholds["duplicate_threshold"]:
            quality_report["issues"].append({
                "type": "high_duplicates",
                "count": duplicate_rows,
                "percentage": duplicate_percentage
            })
            quality_report["suggestions"].append(
                "Consider dropping duplicate rows, or audit the data collection process"
            )
        # Summary
        quality_report["summary"] = {
            "total_rows": len(df),
            "total_columns": len(df.columns),
            "missing_values": df.isnull().sum().sum(),
            "duplicate_rows": duplicate_rows,
            "data_types": df.dtypes.to_dict()
        }
        return quality_report

# Use the quality checker
df_quality = pd.DataFrame({
    "A": [1, 2, None, 4, 5],
    "B": [1, 1, 3, 4, 5],       # contains duplicates
    "C": [100, 200, 300, 400, 500],
    "D": [None, None, 3, 4, 5]  # high missing rate
})
quality_checker = DataQualityChecker()

# Create the SmartDataframe with the quality check attached
sdf_quality = SmartDataframe(
    df_quality,
    config={
        "llm": OpenAI(api_token="your-key"),
        "custom_middlewares": [quality_checker]
    }
)

# Run the check directly
print("Data quality report:")
quality_report = quality_checker.run(df_quality)
for key, value in quality_report.items():
    print(f"\n{key}:")
    if isinstance(value, dict):
        for k, v in value.items():
            print(f"  {k}: {v}")
    elif isinstance(value, list):
        for item in value:
            print(f"  {item}")

# Ask about data quality in natural language
result = sdf_quality.chat("Identify data quality issues and suggest fixes")
print(f"\nAI analysis:\n{result}")
4.4 Performance Optimization Techniques
4.4.1 Query Optimization and Caching
from pandasai import SmartDataframe
from pandasai.llm import OpenAI
import pandas as pd
import numpy as np
import time

# 1. A performance-monitoring decorator
def performance_monitor(func):
    """Time a function call and print the duration."""
    def wrapper(*args, **kwargs):
        start_time = time.time()
        result = func(*args, **kwargs)
        end_time = time.time()
        execution_time = end_time - start_time
        print(f"Function {func.__name__} took {execution_time:.2f}s")
        return result
    return wrapper

# 2. A simple smart cache
class SmartCache:
    """Size-bounded cache with a TTL and LRU-style eviction."""
    def __init__(self, max_size=1000, ttl=3600):
        self.cache = {}
        self.max_size = max_size
        self.ttl = ttl  # time to live, in seconds
        self.access_times = {}

    def get(self, key):
        """Fetch a cached value, honoring expiry."""
        if key in self.cache:
            if time.time() - self.access_times[key] < self.ttl:
                self.access_times[key] = time.time()
                print(f"Cache hit: {key[:50]}...")
                return self.cache[key]
            else:
                # Expired entry
                del self.cache[key]
                del self.access_times[key]
        return None

    def set(self, key, value):
        """Store a value, evicting the least recently used entry if full."""
        if len(self.cache) >= self.max_size:
            oldest_key = min(self.access_times, key=self.access_times.get)
            del self.cache[oldest_key]
            del self.access_times[oldest_key]
        self.cache[key] = value
        self.access_times[key] = time.time()

# 3. An optimized configuration (several keys below follow the original
# article and may vary by PandasAI version)
optimized_config = {
    "llm": OpenAI(
        api_token="your-key",
        model="gpt-3.5-turbo",  # a faster model when latency matters
        temperature=0.1,        # less randomness
        max_tokens=500,         # cap the output length
    ),
    # Performance options
    "enable_cache": True,
    "cache_max_size": 1000,
    "cache_lifetime": 300,  # 5 minutes
    # Code-execution limits
    "max_execution_time": 30,  # max execution time in seconds
    "max_retries": 2,          # fewer retries
    # Sampling (for large datasets)
    "sample_size": 10000,       # sample size
    "sample_strategy": "head",  # sampling strategy
    # Parallelism
    "use_parallel": True,
    "max_workers": 4,
    # Memory
    "optimize_memory": True,
    "chunk_size": 10000,
}
# 4. Chunked processing for large data
class BigDataHandler:
    """Process a large DataFrame in chunks."""
    def __init__(self, df, chunk_size=10000):
        self.df = df
        self.chunk_size = chunk_size

    @performance_monitor
    def process_in_chunks(self, operation):
        """Apply `operation` chunk by chunk and concatenate the results."""
        results = []
        # Ceiling division so we don't produce an empty trailing chunk
        total_chunks = (len(self.df) + self.chunk_size - 1) // self.chunk_size
        for i in range(total_chunks):
            start_idx = i * self.chunk_size
            end_idx = min((i + 1) * self.chunk_size, len(self.df))
            chunk = self.df.iloc[start_idx:end_idx]
            print(f"Processing chunk {i+1}/{total_chunks} ({len(chunk)} rows)")
            result = operation(chunk)
            results.append(result)
            # Free memory
            del chunk
        return pd.concat(results, ignore_index=True) if results else pd.DataFrame()

# 5. Query rewriting
def optimize_query(query, context=None):
    """Rewrite expensive natural-language queries into cheaper ones."""
    rewrite_rules = {
        "show all data": "show the first 1000 rows",
        "compute everything": "compute on a sample",
        "detailed analysis": "summary analysis",
    }
    optimized_query = query
    for pattern, replacement in rewrite_rules.items():
        if pattern in query:
            optimized_query = optimized_query.replace(pattern, replacement)
            print(f"Query rewritten: '{pattern}' -> '{replacement}'")
    return optimized_query
# 6. Performance test
def performance_test():
    """Benchmark a few queries with caching and query rewriting."""
    # Test data
    test_data = pd.DataFrame({
        "id": range(100000),
        "value": np.random.randn(100000),
        "category": np.random.choice(["A", "B", "C", "D"], 100000)
    })
    # SmartDataframe with the optimized config
    sdf_perf = SmartDataframe(test_data, config=optimized_config)
    # Queries to test
    test_queries = [
        "Compute the mean of value",
        "Group statistics by category",
        "Find the 100 records with the highest value",
        "Create a histogram of value"
    ]
    cache = SmartCache()
    for query in test_queries:
        print(f"\nTest query: {query}")
        # Check the cache first
        cached_result = cache.get(query)
        if cached_result is not None:
            print("Serving result from cache")
            result = cached_result
        else:
            # Rewrite, then execute
            optimized = optimize_query(query)
            start_time = time.time()
            result = sdf_perf.chat(optimized)
            end_time = time.time()
            # Cache the result
            cache.set(query, result)
            print(f"Query took {end_time - start_time:.2f}s")
        print(f"Result type: {type(result)}")

# Run the benchmark
performance_test()
# 7. Memory optimization tips
def memory_optimization_tips():
    """Print memory optimization advice."""
    tips = """
    PandasAI memory optimization tips:
    1. Sampling:
       - Use a sample for exploratory analysis
       - Configure the sample_size parameter
    2. Data-type optimization:
       - Convert object columns to category
       - Use narrower dtypes (int8, float32, etc.)
    3. Chunked processing:
       - Process large datasets in chunks
       - Use the chunk_size parameter
    4. Prompt cleanup:
       - Delete intermediate variables you no longer need
       - Use del to release memory
    5. Push work to the database:
       - Keep big data in the database
       - Let the database do the aggregation
    6. Caching:
       - Enable the smart cache
       - Pick a sensible TTL
    """
    print(tips)

memory_optimization_tips()
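Tip 2 above (data-type optimization) is easy to apply with plain pandas. A small sketch that downcasts numeric columns and converts low-cardinality string columns to category:

def shrink_dataframe(df: pd.DataFrame) -> pd.DataFrame:
    """Return a memory-lighter copy: downcast numerics, categorize strings."""
    out = df.copy()
    for col in out.select_dtypes(include='integer').columns:
        out[col] = pd.to_numeric(out[col], downcast='integer')
    for col in out.select_dtypes(include='float').columns:
        out[col] = pd.to_numeric(out[col], downcast='float')
    for col in out.select_dtypes(include='object').columns:
        # Categorize only when there are few distinct values
        if out[col].nunique() < 0.5 * len(out):
            out[col] = out[col].astype('category')
    return out

sample = pd.DataFrame({
    "id": range(100000),
    "category": np.random.choice(["A", "B", "C", "D"], 100000),
})
print("Before:", sample.memory_usage(deep=True).sum(), "bytes")
print("After: ", shrink_dataframe(sample).memory_usage(deep=True).sum(), "bytes")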
4.4.2 Async Processing and Concurrency
import asyncio
import concurrent.futures
from pandasai import SmartDataframe
from pandasai.llm import OpenAI
import pandas as pd
import numpy as np

class AsyncPandasAI:
    """Run PandasAI queries concurrently on a thread pool."""
    def __init__(self, df, max_workers=4):
        self.df = df
        self.max_workers = max_workers
        self.executor = concurrent.futures.ThreadPoolExecutor(max_workers=max_workers)

    async def process_queries_async(self, queries):
        """Process several queries concurrently."""
        loop = asyncio.get_event_loop()
        # Schedule the tasks
        tasks = []
        for query in queries:
            task = loop.run_in_executor(
                self.executor,
                self._process_single_query,
                query
            )
            tasks.append(task)
        # Run them concurrently
        results = await asyncio.gather(*tasks, return_exceptions=True)
        # Normalize the results
        processed_results = []
        for i, result in enumerate(results):
            if isinstance(result, Exception):
                print(f"Query '{queries[i]}' failed: {result}")
                processed_results.append(None)
            else:
                processed_results.append(result)
        return processed_results

    def _process_single_query(self, query):
        """Run a single query."""
        sdf = SmartDataframe(
            self.df,
            config={
                "llm": OpenAI(api_token="your-key"),
                "verbose": False
            }
        )
        return sdf.chat(query)

    def process_batch(self, queries, batch_size=10):
        """Process queries in synchronous batches."""
        all_results = []
        for i in range(0, len(queries), batch_size):
            batch = queries[i:i+batch_size]
            print(f"Processing batch {i//batch_size + 1}: {len(batch)} queries")
            # Run the batch on a thread pool
            with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor:
                future_to_query = {
                    executor.submit(self._process_single_query, query): query
                    for query in batch
                }
                for future in concurrent.futures.as_completed(future_to_query):
                    query = future_to_query[future]
                    try:
                        result = future.result()
                        all_results.append((query, result))
                        print(f"✓ Done: {query}")
                    except Exception as e:
                        print(f"✗ Failed: {query} - {e}")
                        all_results.append((query, None))
        return all_results

# Async example
async def async_example():
    """Demonstrate concurrent query processing."""
    # Test data
    df = pd.DataFrame({
        "date": pd.date_range("2024-01-01", periods=100, freq='D'),
        "value": np.random.randn(100) * 100 + 1000,
        "category": np.random.choice(["A", "B", "C"], 100)
    })
    # Processor
    processor = AsyncPandasAI(df, max_workers=5)
    # Queries
    queries = [
        "Compute the mean of value",
        "Group statistics by category",
        "Create a time series chart",
        "Detect outliers",
        "Forecast the trend for the next 7 days",
        "Compute a moving average",
        "Compare weekends and weekdays",
        "Create a heatmap",
        "Compute the correlation matrix",
        "Generate a statistical report"
    ]
    print("Starting async processing...")
    # Option 1: async
    results = await processor.process_queries_async(queries)
    # Option 2: synchronous batches
    # results = processor.process_batch(queries, batch_size=3)
    print("\nDone!")
    for i, (query, result) in enumerate(zip(queries, results)):
        if result is not None:
            print(f"{i+1}. {query}: ok")
        else:
            print(f"{i+1}. {query}: failed")

# Run the async example (in a script use asyncio.run;
# in Jupyter, `await async_example()` directly)
# asyncio.run(async_example())
4.4.3 Monitoring and Logging
import logging
import json
import time
from datetime import datetime
from pandasai import SmartDataframe
from pandasai.llm import OpenAI
import pandas as pd
import numpy as np

class PerformanceMonitor:
    """Query-level performance monitoring with structured logs."""
    def __init__(self, log_file="pandasai_performance.log"):
        self.log_file = log_file
        self.setup_logging()

    def setup_logging(self):
        """Configure the logging pipeline."""
        logger = logging.getLogger("PandasAI-Performance")
        logger.setLevel(logging.INFO)
        # File handler
        file_handler = logging.FileHandler(self.log_file)
        file_handler.setLevel(logging.INFO)
        # Console handler
        console_handler = logging.StreamHandler()
        console_handler.setLevel(logging.WARNING)
        # Formatting
        formatter = logging.Formatter(
            '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        )
        file_handler.setFormatter(formatter)
        console_handler.setFormatter(formatter)
        logger.addHandler(file_handler)
        logger.addHandler(console_handler)
        self.logger = logger

    def log_query(self, query, execution_time, result_size=None, status="success"):
        """Log one query as a JSON record."""
        log_entry = {
            "timestamp": datetime.now().isoformat(),
            "query": query,
            "execution_time": execution_time,
            "result_size": result_size,
            "status": status
        }
        self.logger.info(json.dumps(log_entry))
        # Slow-query warning
        if execution_time > 10:  # more than 10 seconds
            self.logger.warning(f"Slow query: {query} - took {execution_time:.2f}s")

    def generate_performance_report(self):
        """Build a performance report from the log file."""
        # Read the log file
        with open(self.log_file, 'r') as f:
            logs = [json.loads(line) for line in f if line.strip()]
        if not logs:
            return "No performance data yet"
        # Aggregate
        execution_times = [log["execution_time"] for log in logs if "execution_time" in log]
        report = {
            "total_queries": len(logs),
            "success_rate": sum(1 for log in logs if log.get("status") == "success") / len(logs) * 100,
            "avg_execution_time": sum(execution_times) / len(execution_times) if execution_times else 0,
            "max_execution_time": max(execution_times) if execution_times else 0,
            "slow_queries": [log["query"] for log in logs if log.get("execution_time", 0) > 10],
            "common_queries": self._get_common_queries(logs),
            "performance_trend": self._calculate_trend(logs)
        }
        return report

    def _get_common_queries(self, logs, top_n=5):
        """Most frequent queries."""
        from collections import Counter
        queries = [log["query"] for log in logs]
        return Counter(queries).most_common(top_n)

    def _calculate_trend(self, logs):
        """Average execution time per day."""
        # Group by date
        daily_data = {}
        for log in logs:
            date = log["timestamp"][:10]  # the date part
            if date not in daily_data:
                daily_data[date] = {"count": 0, "total_time": 0}
            daily_data[date]["count"] += 1
            daily_data[date]["total_time"] += log.get("execution_time", 0)
        # Daily averages
        trend = {}
        for date, data in daily_data.items():
            trend[date] = data["total_time"] / data["count"]
        return trend

# Use the monitor
def monitored_analysis():
    """Run an analysis under monitoring."""
    # Create the monitor
    monitor = PerformanceMonitor()
    # Create data
    df = pd.DataFrame({
        "sales": np.random.randint(100, 1000, 1000),
        "profit": np.random.randint(10, 200, 1000),
        "region": np.random.choice(["North", "South", "East", "West"], 1000),
        "month": np.random.choice(["Jan", "Feb", "Mar", "Apr"], 1000)
    })
    # SmartDataframe
    sdf = SmartDataframe(
        df,
        config={
            "llm": OpenAI(api_token="your-key"),
            "verbose": False,
            "enable_cache": True
        }
    )
    # Run monitored queries
    queries = [
        "Compute the average sales per region",
        "Analyze the relationship between sales and profit",
        "Forecast next month's sales trend",
        "Find the best-performing region",
        "Create a sales dashboard"
    ]
    for query in queries:
        print(f"\nRunning: {query}")
        start_time = time.time()
        try:
            result = sdf.chat(query)
            execution_time = time.time() - start_time
            # Log the query
            result_size = len(result) if hasattr(result, '__len__') else None
            monitor.log_query(
                query=query,
                execution_time=execution_time,
                result_size=result_size,
                status="success"
            )
            print(f"✓ ok - took {execution_time:.2f}s")
        except Exception as e:
            execution_time = time.time() - start_time
            monitor.log_query(
                query=query,
                execution_time=execution_time,
                status=f"error: {str(e)}"
            )
            print(f"✗ failed: {e}")
    # Performance report
    print("\n" + "="*60)
    print("Performance report")
    print("="*60)
    report = monitor.generate_performance_report()
    for key, value in report.items():
        if isinstance(value, list):
            print(f"{key}:")
            for item in value:
                print(f"  - {item}")
        else:
            print(f"{key}: {value}")

# Run the monitored example
monitored_analysis()
Summary and Best Practices
Learning Path
| Stage | Focus | Key skills |
|---|---|---|
| Beginner | Basic installation, MockLLM, simple queries | Environment setup, basic syntax |
| Intermediate | Real LLM integration, database connections, custom functions | API integration, SQL connections, function extensions |
| Advanced | Performance tuning, async processing, plugin development | Optimization, concurrency, system design |
| Expert | Architecture design, production deployment, team workflows | Architecture, CI/CD, team management |
Handy Tooling
# Development-environment checklist
def check_development_environment():
    """Check which optional dependencies are importable."""
    required_packages = [
        "pandas",
        "pandasai",
        "openai",      # if you use OpenAI
        "sqlalchemy",  # if you use databases
        "plotly",      # if you use advanced visualization
        "asyncio",     # stdlib; listed for completeness
        "logging"      # stdlib; listed for completeness
    ]
    print("Development environment checklist:")
    print("="*60)
    for package in required_packages:
        try:
            __import__(package.replace("-", "_"))
            print(f"✓ {package}")
        except ImportError:
            print(f"✗ {package} - needs installing")
    print("\nRecommended setup:")
    print("1. Use a virtual environment")
    print("2. Use environment variables for secrets")
    print("3. Put the project under version control")
    print("4. Set up monitoring and logging")
    print("5. Adopt a testing strategy")

check_development_environment()
Production Deployment Notes
# Example production configuration
PRODUCTION_CONFIG = {
    "llm": {
        "provider": "openai",
        "model": "gpt-4",
        "api_key_env_var": "OPENAI_API_KEY",
        "timeout": 30,
        "max_retries": 3
    },
    "database": {
        "connection_pool_size": 10,
        "max_overflow": 20,
        "pool_recycle": 3600
    },
    "performance": {
        "enable_cache": True,
        "cache_ttl": 300,
        "max_cache_size": 10000,
        "query_timeout": 60,
        "max_result_size": 100000
    },
    "security": {
        "data_masking": True,
        "log_sanitization": True,
        "api_rate_limit": 100,
        "allowed_data_sources": ["database1", "api1"]
    },
    "monitoring": {
        "enable_logging": True,
        "log_level": "INFO",
        "performance_metrics": True,
        "alert_threshold": {
            "response_time": 10,
            "error_rate": 0.01,
            "cache_hit_rate": 0.8
        }
    }
}
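One detail worth spelling out: the api_key_env_var field means the key is resolved from the environment at startup rather than stored in the config itself. A hypothetical loader (the build_llm helper is an illustration, not part of PandasAI) might look like:

import os
from pandasai.llm import OpenAI

def build_llm(cfg: dict) -> OpenAI:
    """Instantiate the LLM from PRODUCTION_CONFIG, reading the key from the env."""
    llm_cfg = cfg["llm"]
    return OpenAI(
        api_token=os.environ[llm_cfg["api_key_env_var"]],  # fails fast if unset
        model=llm_cfg["model"],
    )

llm = build_llm(PRODUCTION_CONFIG)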
# Docker deployment example
DOCKER_COMPOSE_TEMPLATE = """
version: '3.8'
services:
  pandasai-api:
    build: .
    ports:
      - "8000:8000"
    environment:
      - OPENAI_API_KEY=${OPENAI_API_KEY}
      - DATABASE_URL=${DATABASE_URL}
      - REDIS_URL=${REDIS_URL}
    volumes:
      - ./logs:/app/logs
      - ./cache:/app/cache
    depends_on:
      - redis
      - database
  redis:
    image: redis:alpine
    ports:
      - "6379:6379"
  database:
    image: postgres:13
    environment:
      - POSTGRES_PASSWORD=${DB_PASSWORD}
    volumes:
      - postgres_data:/var/lib/postgresql/data
volumes:
  postgres_data:
"""
This advanced guide covers PandasAI's higher-level features systematically, from basic usage to production deployment, to build out your data-analysis automation skills. Each part pairs working code samples with best practices so you can apply the techniques in real projects.