在当今数字时代,各行各业的专业人士都必须及时了解即将举行的活动、会议和研讨会。然而,在庞大的在线信息海洋中高效地找到与自己兴趣相符的活动是一个巨大的挑战。
本博客介绍了一个创新的解决方案:一个全面的应用程序,旨在从Facebook (opens new window)上爬取活动数据,并使用MyScale (opens new window)分析这些数据。虽然MyScale通常与RAG技术栈相关联或用作向量数据库,但其功能不限于这些领域。我们将利用它进行数据分析,利用其向量搜索功能分析语义上相似的活动,从而提供更好的结果和见解。
您可能会注意到Grok AI (opens new window)使用了Qdrant向量数据库作为搜索引擎,从X(以前称为Twitter)数据中检索实时信息。您也可以通过将MyScale与Apify等平台集成,以类似的方式评估向量数据库的强大功能,通过开发简单的个性化应用程序来增强日常生活任务。
因此,在本博客中,让我们开发一个应用程序,只需输入一个城市的名称,就可以从Facebook上爬取所有相关的活动。随后,我们将使用MyScale的高级SQL向量功能进行数据分析和语义搜索。
# 工具和技术
我们将使用多种工具,包括Apify、MyScale (opens new window)和OpenAI,来开发这个有用的应用程序。
- Apify:一个流行的网络爬虫 (opens new window)和自动化平台,极大地简化了数据收集的过程。它提供了爬取数据并将其提供给LLMs的能力。这使我们能够在实时数据上训练LLMs并开发应用程序。
- MyScale:MyScale是一个SQL向量数据库,我们可以使用它以优化的方式存储和处理结构化和非结构化数据。
- OpenAI:我们将使用来自OpenAI (opens new window)的模型
text-embedding-3-small
来获取文本的嵌入,然后将这些嵌入保存在MyScale中进行数据分析和语义搜索。
# 如何设置MyScale和Apify
要开始设置MyScale和Apify,您需要创建一个新的目录和一个Python (opens new window)文件。您可以通过打开终端或命令行并输入以下命令来完成此操作:
mkdir MyScale
cd MyScale
touch main.ipynb
让我们安装所需的软件包。复制下面的命令,并将其粘贴到终端中。这些软件包将为我们开发应用程序所需的工具和库提供支持。
pip install openai apify-client clickhouse-connect pandas numpy
这将在您的系统中安装所有依赖项。要确认一切都安装正确,您可以在终端中输入以下命令。
pip freeze | egrep '(openai|apify-client|clickhouse-connect|pandas|numpy)'
这应该包括所有已安装的依赖项及其版本。如果发现任何缺少的依赖项,您可能需要重新运行该特定软件包的安装命令。现在,安装完成后,我们可以开始编写代码了。
💡 注意:我们将在Python笔记本中工作。将每个代码块视为一个笔记本单元格。
# 如何使用Apify爬取数据
现在,我们将使用Apify API来爬取纽约市的活动数据,使用**Facebook活动爬虫 (opens new window)**。
import pandas as pd
from apify_client import ApifyClient
# 使用您的API令牌初始化ApifyClient
client = ApifyClient("在此处输入您的Apify密钥")
# 准备Actor输入
run_input = {
"searchQueries": ["Sport New York"],
"startUrls": [],
"maxEvents": 50,
}
# 运行Actor并等待其完成
run = client.actor("UZBnerCFBo5FgGouO").call(run_input=run_input)
df_columns = ['Name', 'Datetime', 'Description', 'Users_Going', 'Users_Interested', 'Users_Responded', 'City', 'Organized_By', 'Street_Address']
dataframe1 = pd.DataFrame(columns=df_columns)
for item in client.dataset(run["defaultDatasetId"]).iterate_items():
# 使用字典推导式将None值替换为空字符串
row = {
'Name': item.get('name', ''),
'Datetime': item.get('dateTimeSentence', ''),
'Description': item.get('description', ''),
'Users_Going': item.get('usersGoing', ''),
'Users_Interested': item.get('usersInterested', ''),
'Users_Responded': item.get('usersResponded', ''),
'City': item.get('location', {}).get('city', '') if item.get('location') else '',
'Organized_By': item.get('organizedBy', ''),
'Street_Address': item.get('location', {}).get('streetAddress', '') if item.get('location') else ''
}
# 确保所有None值都被替换为空字符串
row = {k: (v if v is not None else '') for k, v in row.items()}
dataframe1 = dataframe1._append(row, ignore_index=True)
# 清理数据
dataframe1['Description'] = dataframe1['Description'].replace('\\n', '', regex=True)
这段脚本以pandas DataFrame的形式给出了即将举行的活动详细信息。 💡 注意:不要忘记在上面的脚本中添加您的Apify API密钥。您可以在Apify控制台的Integrations (opens new window)页面上找到您的API令牌。
# 数据预处理
当我们收集原始数据时,它以各种格式呈现。在此脚本中,我们将活动日期转换为单一格式,以便更高效地进行数据过滤。
# 导入数据操作和日期解析所需的库
import pandas as pd
import numpy as np
from datetime import datetime
from dateutil import parser
# 解析可能表示范围或单个日期的日期字符串的函数
def parse_dates(date_str):
# 检查日期字符串是否包含破折号,表示范围
if '-' in date_str:
parts = date_str.split('-')
# 如果字符串分为两部分,表示是一个有效的范围
if len(parts) == 2:
try:
# 解析开始和结束日期,并将其格式化为可读格式
start_date = parser.parse(parts[0], fuzzy=True).strftime('%a, %b %d')
end_date = parser.parse(parts[1], fuzzy=True).strftime('%a, %b %d')
return start_date, end_date
except ValueError:
# 如果解析错误,不做任何操作(将在下面处理)
pass
# 如果不是范围,或者解析范围失败,则尝试解析为单个日期
try:
parsed_date = parser.parse(date_str, fuzzy=True)
# 格式化开始日期,并以不同的格式格式化结束日期
start_date = parsed_date.strftime('%a, %b %d AT %I:%M %p EDT')
end_date = parsed_date.strftime('%a, %b %d') # 结束日期不包含时间
return start_date, end_date
except ValueError:
# 如果解析失败,则返回NaN作为两个日期
return np.nan, np.nan
# 从日期字符串中提取详细的日期、时间和星期几
def extract_date_time_day(date_str):
try:
# 解析日期字符串,允许一定的输入格式灵活性
parsed_date = parser.parse(date_str, fuzzy=True)
# 提取和格式化日期、时间和星期几部分
date = parsed_date.strftime('%Y-%m-%d')
day = parsed_date.strftime('%a')
# 确定原始字符串是否包含时间组件
time_component = parsed_date.strftime('%I:%M %p') not in date_str
time = parsed_date.strftime('%H:%M:%S') if not time_component else np.nan
except ValueError:
# 如果解析失败,则将日期、时间和星期几设置为NaN
date, time, day = np.nan, np.nan, np.nan
return date, time, day
# 对整个数据集应用parse_dates函数,为开始和结束日期创建新列
dataframe1[['Start_Date', 'End_Date']] = dataframe1.apply(lambda row: pd.Series(parse_dates(row['Datetime'])), axis=1)
# 删除Start_Date为NaN的行,表示解析失败
dataframe = dataframe1.dropna(subset=['Start_Date'])
# 使用extract_date_time_day将开始和结束日期拆分为单独的日期、时间和星期几列
dataframe['Start_Date'], dataframe['Start_Time'], dataframe['Start_Day'] = zip(*dataframe['Start_Date'].apply(extract_date_time_day))
dataframe['End_Date'], _, dataframe['End_Day'] = zip(*dataframe['End_Date'].apply(extract_date_time_day))
# 删除原始的'Datetime'列,因为它不再需要
dataframe=dataframe.drop(['Datetime'], axis=1)
# 将'Start_Date'和'End_Date'转换为日期格式,并提取日期部分
dataframe['Start_Date'] = pd.to_datetime(dataframe['Start_Date']).dt.date
dataframe['End_Date'] = pd.to_datetime(dataframe['End_Date']).dt.date
# 将'Start_Time'转换为日期格式,并保留时间信息
dataframe['Start_Time'] = pd.to_datetime(dataframe['Start_Time'])
这段代码使用Python的pandas、datetime和dateutil库来格式化数据。
# 生成嵌入
为了深入了解和搜索活动,我们将使用text-embedding-3-small
生成活动描述的嵌入。这些嵌入捕捉了每个活动的语义本质,帮助应用程序返回更好的结果。
# 导入OpenAI库以访问API。
from openai import OpenAI
# 使用API密钥初始化OpenAI客户端。
openai_client = OpenAI(api_key="在此处输入您的OpenAI API密钥")
# 获取文本嵌入的函数
def get_embedding(text, model="text-embedding-3-small"):
return openai_client.embeddings.create(input=text, model=model).data
embeddings = get_embedding(dataframe["Description"].tolist())
# 从嵌入对象中提取嵌入向量
vectors = [embedding.embedding for embedding in embeddings]
array = np.array(vectors)
embeddings_series = pd.Series(list(array))
# 将嵌入作为新列添加到DataFrame中。
dataframe['Description_Embeddings'] = embeddings_series
现在,我们将新的带有嵌入的DataFrame插入MyScale。
# 连接到MyScale
正如我们在开头讨论的那样,我们将使用MyScale作为存储和管理数据的向量数据库。在这里,我们将连接到MyScale以准备存储数据。
import clickhouse_connect
client = clickhouse_connect.get_client(
host='在此处输入主机名',
port=443,
username='在此处输入用户名',
password='在此处输入密码'
)
这个连接设置确保我们的应用程序可以与MyScale进行通信,并使用SQL的功能进行数据操作和分析。
💡 注意:有关如何连接到MyScale集群的更多信息,请参阅连接详细信息 (opens new window)。
# 使用MyScale创建表和索引
现在,我们根据DataFrame创建一个表。所有的数据都将存储在这个表中,包括嵌入。
client.command("""
CREATE TABLE default.Events (
Name String,
Description String,
Users_Going Int64,
Users_Interested Int64,
Users_Responded Int64,
City String,
Organized_By String,
Street_Address String,
Start_Date Date32,
End_Date Nullable(Date32),
Start_Time Nullable(DateTime64),
Start_Day String,
End_Day String,
Description_Embeddings Array(Float32),
CONSTRAINT check_data_length CHECK length(Description_Embeddings) = 1536
) ENGINE = MergeTree()
ORDER BY (Name);
""")
上述SQL语句在集群上创建了一个名为Events
的表。CONSTRAINT
确保所有的向量嵌入都具有相同的长度1536
。
# 存储数据并在MyScale中创建索引
在这一步中,我们将处理后的数据插入MyScale。这涉及到批量插入数据,以确保高效的存储和检索。
batch_size = 10 # 根据您的需求进行调整
num_batches = len(dataframe) // batch_size
for i in range(num_batches):
start_idx = i * batch_size
end_idx = start_idx + batch_size
batch_data = dataframe[start_idx:end_idx]
# print(batch_data["Description_Embeddings"])
client.insert("default.Events", batch_data.to_records(index=False).tolist(), column_names=batch_data.columns.tolist())
print(f"Batch {i+1}/{num_batches} inserted.")
client.command("""
ALTER TABLE default.Events
ADD VECTOR INDEX vector_index Description_Embeddings
TYPE MSTG
""")
使用pandas,上述代码将我们准备好的数据集高效地转移到MyScale数据库中。
# 使用MyScale进行数据分析
最后,我们使用MyScale的分析能力进行数据分析,并实现语义搜索。通过执行SQL查询,我们可以根据主题、位置和日期分析活动。因此,让我们尝试编写一些查询。
# 简单的SQL查询
首先,让我们从表中获取前10个结果。
results=client.query("""
SELECT Name,Description FROM default.Events LIMIT 10
""")
for row in results.named_results():
print(row["Name"])
print(row['Description'])
此查询将简单地返回events
表中的前10个结果。
# 通过语义相关性发现活动
让我们尝试找到与参考活动相关的前10个即将举行的活动,这些活动的氛围相似,比如“一个国家历史最悠久的演出——自1974年以来运营……现在是我们的第50个年头!我们的斯克内克塔迪”。这是通过比较活动描述的语义嵌入来实现的,确保主题和情感匹配。
embeddings=get_embedding(["一个国家历史最悠久的演出——自1974年以来运营……现在是我们的第50个年头!我们的斯克内克塔迪"])
embedding=embeddings[0].embedding
results = client.query(f"""
SELECT Name, Description,
distance(Description_Embeddings, {embedding}) as dist FROM default.Events ORDER BY dist LIMIT 10
""")
for row in results.named_results():
print("活动标题 ", row["Name"])
print("活动描述 ", row['Description'])
print("距离:", row["dist"])
# 热门活动按热度排序
此查询按照参与者和感兴趣用户的数量,排名前10个活动,突出显示大型城市节日和重要会议的热门活动。对于那些希望参加大型、充满活力的聚会的人来说,这是理想的选择。
results = client.query(f"""
SELECT Name, City, Users_Going, Users_Interested, Users_Responded
FROM default.Events
ORDER BY Users_Going DESC, Users_Interested DESC
LIMIT 10
""")
for row in results.named_results():
print("活动名称 ", row["Name"])
print("城市 ", row["City"])
print("参与者数量 ", row["Users_Going"])
print("感兴趣的用户数量 ", row["Users_Interested"])
# 纽约市的热门本地活动
将相关性和热度结合起来,此查询识别与特定活动相关的纽约市的类似活动,并按照参与人数进行排名,提供了一个精选的活动列表,反映了该城市充满活力的文化,并吸引了当地的兴趣。
embeddings=get_embedding(["一个国家历史最悠久的演出——自1974年以来运营……现在是我们的第50个年头!我们的斯克内克塔迪"])
embeddi=embeddings[0].embedding
results = client.query(f"""
SELECT Name,City, Description, Users_Going,distance(Description_Embeddings, {embeddi}) as dist
FROM default.Events
WHERE City LIKE '%纽约%' and dist < 1.5
ORDER BY Users_Going DESC,dist
LIMIT 10
""")
for row in results.named_results():
print("活动名称 ", row["Name"])
print("描述 ", row["Description"])
print("参与者数量 ", row["Users_Going"])
# 领先的活动组织者
此查询按照参与者和感兴趣用户的总数,排名前10个活动组织者,突出显示那些在创建引人注目的活动和吸引大量观众方面表现出色的组织者。它为活动策划者和对顶级活动感兴趣的参与者提供了见解。
# 哪个客户吸引了最多的用户
results = client.query(f"""
SELECT Organized_By, SUM(Users_Going + Users_Interested) AS Total_Users
FROM default.Events
GROUP BY Organized_By
ORDER BY Total_Users DESC
Limit 10
""")
for row in results.named_results():
print("活动名称 ", row["Organized_By"])
print("总用户数 ", row["Total_Users"])
# 实施RAG
之前,我们已经探索了MyScale用于数据分析的能力,突出了它在增强数据工作流方面的能力。接下来,我们将进一步实施检索增强生成(RAG),这是一个创新的框架,将外部知识库与LLMs结合起来。这一步将帮助您更好地了解数据并找到更详细的见解。接下来,您将看到如何将RAG与MyScale结合使用,这将使与数据的工作更加有趣和高效。
from langchain.chat_models import ChatOpenAI
from langchain.prompts import ChatPromptTemplate
query="请给我推荐一些与篮球相关的活动"
# 使用上面定义的get_embedding方法,它接受一个句子列表
embeddings=get_embedding([query])
embeddings=embeddings[0].embedding
results = client.query(f"""
SELECT Name, City, Users_Going, Description, distance(Description_Embeddings, {embeddings}) as dist
FROM default.Events
ORDER BY Users_Going DESC,dist
LIMIT 5
""")
PROMPT_TEMPLATE = """
您的目标是根据下面提供的信息回答问题:
{context}
---
您的任务是仔细分析提供的上下文,并根据仅提供的信息回答以下问题:
{question}
"""
# 组合前几个结果的描述。
descriptions = [row["Description"] for row in results.named_results()]
context_text = "\n\n---\n\n".join(descriptions)
prompt_template = ChatPromptTemplate.from_template(PROMPT_TEMPLATE)
prompt = prompt_template.format(context=context_text, question=query)
model = ChatOpenAI(openai_api_key="在此处输入您的API密钥")
response_text = model.predict(prompt)
formatted_response = f"回答:{response_text}\n"
print(formatted_response)
在整个博客中,我们观察到MyScale远不止是一个可以用来开发各种应用程序的向量数据库。我们可以将其用作简单的SQL数据库或高级AI应用程序,涵盖了大部分开发领域。我们鼓励您至少尝试一下,并通过注册免费版并获得500万个免费向量存储来探索高级功能。
# 结论
我们通过开发一个活动分析应用程序,探索了MyScale与Apify Scraper的能力和功能。MyScale在高性能向量搜索方面展示了其卓越的能力,同时保留了SQL数据库的所有功能,帮助开发人员使用熟悉的SQL语法进行语义搜索,速度和准确性更高。
MyScale的功能不限于此应用程序,您可以采用它来使用RAG (opens new window)方法开发任何AI应用程序。MyScale在成本、速度和准确性方面超越了其他向量数据库 (opens new window),所以为什么不在下一个应用程序中尝试一下呢?
如果您有任何反馈或建议,请在MyScale的Discord (opens new window)或Twitter (opens new window)上与我们联系。