現代のデジタル時代において、あらゆる業界の専門家は、近日開催されるイベント、会議、ワークショップについて常に最新の情報を得ておく必要があります。しかし、オンライン情報の広大な海の中で自分の興味と一致するイベントを効率的に見つけることは、大きな課題です。
このブログでは、この課題に対する革新的な解決策を紹介します。Facebook (opens new window)からイベントデータをスクレイピングし、そのデータをMyScale (opens new window)を使用して分析します。MyScaleは通常、RAGテックスタックに関連付けられるか、ベクトルデータベースとして使用されますが、その能力はこれらの領域にとどまりません。ベクトル検索機能を活用して、意味的に類似したイベントを分析し、より良い結果と洞察を提供します。
Grok AI (opens new window)がQdrantベクトルデータベースを検索エンジンとして使用して、X(旧称Twitter)データからリアルタイム情報を取得したことに気付くかもしれません。MyScaleもこのようにMyScaleをApifyなどの他のプラットフォームと統合して、簡単なパーソナライズされたアプリケーションの開発を通じて日常のタスクを強化することができます。
このブログでは、市の名前のみを入力として受け取り、Facebookから関連するすべてのイベントをスクレイピングするアプリケーションを開発しましょう。その後、MyScaleの高度なSQLベクトル機能を使用して、データ分析と意味検索を行います。
# ツールとテクノロジー
この便利なアプリケーションを開発するために、Apify、MyScale (opens new window)、OpenAIなど、いくつかのツールを使用します。
- Apify: ウェブスクレイピング (opens new window)と自動化プラットフォームで、データ収集のプロセスを大幅に簡略化します。データをスクレイプしてそれをLLMsにフィードする機能を提供します。これにより、リアルタイムデータでLLMsをトレーニングし、アプリケーションを開発することができます。
- MyScale: MyScaleは、構造化および非構造化データを最適化された方法で保存および処理するSQLベクトルデータベースです。
- OpenAI: OpenAI (opens new window)からモデル
text-embedding-3-small
を使用してテキストの埋め込みを取得し、それをデータ分析と意味検索のためにMyScaleに保存します。
# MyScaleとApifyのセットアップ方法
MyScaleとApifyのセットアップを開始するには、新しいディレクトリとPython (opens new window)ファイルを作成する必要があります。次のコマンドをターミナルまたはコマンドラインに入力してください。
mkdir MyScale
cd MyScale
touch main.ipynb
必要なパッケージをインストールしましょう。次のコマンドをコピーしてターミナルに貼り付けます。これらのパッケージは、アプリケーションを開発するために必要なツールとライブラリを提供します。
pip install openai apify-client clickhouse-connect pandas numpy
これで、す
べての依存関係がシステムにインストールされます。すべてが正しくインストールされたことを確認するには、次のコマンドをターミナルに入力します。
pip freeze | egrep '(openai|apify-client|clickhouse-connect|pandas|numpy)'
これには、インストールされたすべての依存関係とそのバージョンが含まれるはずです。欠落している依存関係がある場合は、その特定のパッケージのインストールコマンドを再実行する必要があります。これで、インストールが完了したら、インストール後のコマンドを入力してください。
💡 注意: Pythonノートブックで作業します。各コードブロックをノートブックセルとして考えてください。
# Apifyを使用したデータのスクレイピング方法
次に、Apify APIを使用してニューヨーク市のイベントデータをスクレイピングします。**Facebook Events scraper (opens new window)**を使用します。
import pandas as pd
from apify_client import ApifyClient
# APIトークンを使用してApifyClientを初期化します。
client = ApifyClient("ここにあなたのApifyキーを入力してください")
# Actorの入力を準備します。
run_input = {
"searchQueries": ["Sport New York"],
"startUrls": [],
"maxEvents": 50,
}
# Actorを実行して完了するまで待ちます。
run = client.actor("UZBnerCFBo5FgGouO").call(run_input=run_input)
df_columns = ['Name', 'Datetime', 'Description', 'Users_Going', 'Users_Interested', 'Users_Responded', 'City', 'Organized_By', 'Street_Address']
dataframe1 = pd.DataFrame(columns=df_columns)
for item in client.dataset(run["defaultDatasetId"]).iterate_items():
# 辞書内包表記を使用して、Noneの値を空の文字列に置き換えます。
row = {
'Name': item.get('name', ''),
'Datetime': item.get('dateTimeSentence', ''),
'Description': item.get('description', ''),
'Users_Going': item.get('usersGoing', ''),
'Users_Interested': item.get('usersInterested', ''),
'Users_Responded': item.get('usersResponded', ''),
'City': item.get('location', {}).get('city', '') if item.get('location') else '',
'Organized_By': item.get('organizedBy', ''),
'Street_Address': item.get('location', {}).get('streetAddress', '') if item.get('location') else ''
}
# すべてのNone値が空の文字列で置き換えられるようにします。
row = {k: (v if v is not None else '') for k, v in row.items()}
dataframe1 = dataframe1._append(row, ignore_index=True)
# データのクリーニング
dataframe1['Description'] = dataframe1['Description'].replace('\\n', '', regex=True)
このスクリプトは、pandas DataFrame形式で次のイベントの詳細を提供します。 💡 注意: 上記のスクリプトにApify APIキーを追加することを忘れないでください。Apifyコンソールのインテグレーション (opens new window)ページでAPIトークンを見つけることができます。
# データの前処理
収集した生データはさまざまな形式で提供されます。このスクリプトでは、イベント日時を1つの形式に統一し、データのフィルタリングを効率的に行うためにします。
# データ操作と日付解析のための必要なライブラリをインポートします。
import pandas as pd
import numpy as np
from datetime import datetime
from dateutil import parser
# 範囲または単一日付を表すかどうかを確認する関数
def parse_dates(date_str):
# 日付文字列にダッシュが含まれているかどうかを確認します(範囲を示す)
if '-' in date_str:
parts = date_str.split('-')
# 文字列が2つの部分に分割される場合、有効な範囲です
if len(parts) == 2:
try:
# 開始日と終了日を解析し、読みやすい形式にフォーマットします
start_date = parser.parse(parts[0], fuzzy=True).strftime('%a, %b %d')
end_date = parser.parse(parts[1], fuzzy=True).strftime('%a, %b %d')
return start_date, end_date
except ValueError:
# 解析エラーの場合、何もしません(後で処理します)
pass
# 範囲ではない場合、または解析が範囲に失敗した場合は、単一の日付として解析しようとします
try:
parsed_date = parser.parse(date_str, fuzzy=True)
# 単一の日付を開始日としてフォーマットし、終了日は異なる形式にします
start_date = parsed_date.strftime('%a, %b %d AT %I:%M %p EDT')
end_date = parsed_date.strftime('%a, %b %d') # 終了日のために時間を省略します
return start_date, end_date
except ValueError:
# 解析が失敗した場合、両方の日付をNaNに設定します
return np.nan, np.nan
# 日付文字列から詳細な日付、時刻、および曜日を抽出する関数
def extract_date_time_day(date_str):
try:
# 日付文字
列を解析し、入力形式に柔軟性を持たせます
parsed_date = parser.parse(date_str, fuzzy=True)
# 日付、時刻、および日の部分を抽出してフォーマットします
date = parsed_date.strftime('%Y-%m-%d')
day = parsed_date.strftime('%a')
# 元の文字列に時間コンポーネントが含まれているかどうかを判断します
time_component = parsed_date.strftime('%I:%M %p') not in date_str
time = parsed_date.strftime('%H:%M:%S') if not time_component else np.nan
except ValueError:
# 解析に失敗した場合、日付、時刻、および曜日をNaNに設定します
date, time, day = np.nan, np.nan, np.nan
return date, time, day
# DataFrame全体にparse_dates関数を適用し、開始日と終了日の新しい列を作成します
dataframe1[['Start_Date', 'End_Date']] = dataframe1.apply(lambda row: pd.Series(parse_dates(row['Datetime'])), axis=1)
# Start_DateがNaNである行を削除します(解析に失敗したことを意味します)
dataframe = dataframe1.dropna(subset=['Start_Date'])
# extract_date_time_dayを適用し、開始日と終了日を別々の日付、時刻、および日の列に分割します
dataframe['Start_Date'], dataframe['Start_Time'], dataframe['Start_Day'] = zip(*dataframe['Start_Date'].apply(extract_date_time_day))
dataframe['End_Date'], _, dataframe['End_Day'] = zip(*dataframe['End_Date'].apply(extract_date_time_day))
# もはや必要ない元の'Datetime'列を削除します
dataframe=dataframe.drop(['Datetime'], axis=1)
# 'Start_Date'と'End_Date'を日付形式に変換し、日付部分のみを抽出します
dataframe['Start_Date'] = pd.to_datetime(dataframe['Start_Date']).dt.date
dataframe['End_Date'] = pd.to_datetime(dataframe['End_Date']).dt.date
# 'Start_Time'を日付時刻形式に変換し、時間情報を保持します
dataframe['Start_Time'] = pd.to_datetime(dataframe['Start_Time'])
このコードスニペットでは、Pythonのpandas、datetime、およびdateutilパッケージを使用してデータをフォーマットしています。
# 埋め込みの生成
イベントの説明から埋め込みを生成し、text-embedding-3-small
を使用しています。これらの埋め込みは、各イベントの意味的本質を捉え、アプリケーションがより良い結果を返すのに役立ちます。
# OpenAIライブラリをAPIアクセス用にインポートします。
from openai import OpenAI
# APIキーを使用してOpenAIクライアントを初期化します。
openai_client = OpenAI(api_key="ここにあなたのOpenAI APIキーを入力してください")
# テキスト埋め込みを取得する関数
def get_embedding(text, model="text-embedding-3-small"):
return openai_client.embeddings.create(input=text, model=model).data
embeddings = get_embedding(dataframe["Description"].tolist())
# embeddingsオブジェクトから埋め込みベクトルを抽出します
vectors = [embedding.embedding for embedding in embeddings]
array = np.array(vectors)
embeddings_series = pd.Series(list(array))
# 新しい列として埋め込みをDataFrameに追加します。
dataframe['Description_Embeddings'] = embeddings_series
次に、MyScaleに新しい埋め込み付きDataFrameを挿入します。
# MyScaleとの接続
開始時に、MyScaleに接続してデータを格納できるようにします。
import clickhouse_connect
client = clickhouse_connect.get_client(
host='ここにホスト名を入力してください',
port=443,
username='ここにユーザー名を入力してください',
password='ここにパスワードを入力してください'
)
この接続設定により、アプリケーションがMyScaleと通信し、データ操作と分析にSQLの機能を使用できます。
💡 注意: MyScaleクラスターへの接続方法の詳細については、接続の詳細 (opens new window)を参照してください。
# MyScaleを使用してテーブルとインデックスを作成する
DataFrameに応じてテーブルを作成します。このテーブルには、埋め込みも含まれます。
client.command("""
CREATE TABLE default.Events (
Name String,
Description String,
Users_Going Int64,
Users_Interested Int64,
Users_Responded Int64,
City String,
Organized_By String,
Street_Address String,
Start_Date Date32,
End_Date Nullable(Date32),
Start_Time Nullable(DateTime64),
Start_Day String,
End_Day String,
Description_Embeddings Array(Float32),
CONSTRAINT check_data_length CHECK length(Description_Embeddings) = 1536
) ENGINE = MergeTree()
ORDER BY (Name);
""")
上記のSQLステートメントは、クラスター上にEvents
という名前のテーブルを作成します。CONSTRAINT
は、すべてのベクトル埋め込みが同じ長さ1536
であることを確認します。
# MyScaleにデータを格納し、インデックスを作成する
このステップでは、処理されたデータをMyScaleに挿入します。これには、効率的な格納と取得を確保するために、データをバッチで挿入する必要があります。
batch_size = 10 # 必要に応じて調整してください
num_batches = len(dataframe) // batch_size
for i in range(num_batches):
start_idx = i * batch_size
end_idx = start_idx + batch_size
batch_data = dataframe[start_idx:end_idx]
# print(batch_data["Description_Embeddings"])
client.insert("default.Events", batch_data.to_records(index=False).tolist(), column_names=batch_data.columns.tolist())
print(f"Batch {i+1}/{num_batches} inserted.")
client.command("""
ALTER TABLE default.Events
ADD VECTOR INDEX vector_index Description_Embeddings
TYPE MSTG
""")
上記のコードを使用して、Pandasを介して準備したデータセットを効率的にMyScaleデータベースに転送します。
# MyScaleを使用したデータ分析
最後に、MyScaleの解析機能を使用して、分析と意味的検索を実行します。SQLクエリを実行することで、トピック、場所、日付に基づいてイベントを分析できます。では、いくつかのクエリを書いてみましょう。
# シンプルなSQLクエリ
まず、テーブルから上位10件の結果を取得してみましょう。
results=client.query("""
SELECT Name,Description FROM default.Events LIMIT 10
""")
for row in results.named_results():
print(row["Name"])
print(row['Description'])
このクエリは、単純にevents
テーブルから上位10件の結果を返します。
# 意味的関連性によるイベントの発見
次に、リファレンスイベントと類似した上位10件の今後のイベントを見つけてみましょう。例えば、「1974年から運営されている国内で最も長いショー...現在私たちの50周年です!!!Our Schenectady」のようなイベントの説明の意味的埋め込みを比較することで、テーマや感情が一致することを確認します。
embeddings=get_embedding(["One of the Longest Running Shows in the Country - Operating since 1974 ...NOW our 50th YEAR !!!Our Schenectady"])
embedding=embeddings[0].embedding
results = client.query(f"""
SELECT Name, Description,
distance(Description_Embeddings, {embedding}) as dist FROM default.Events ORDER BY dist LIMIT 10
""")
for row in results.named_results():
print("イベントのタイトル ", row["Name"])
print("イベントの説明 ", row['Description'])
print("距離 : ", row["dist"])
# 人気のあるイベントのトレンド
このクエリは、出席者数と興味を持っているユーザー数によってトップ10のイベントをランク付けし、大規模な都市祭りから主要な会議までの人気のあるイベントをハイライトします。大規模でエネルギッシュな集まりに参加したい方に最適です。
results = client.query(f"""
SELECT Name, City, Users_Going, Users_Interested, Users_Responded
FROM default.Events
ORDER BY Users_Going DESC, Users_Interested DESC
LIMIT 10
""")
for row in results.named_results():
print("イベント名 ", row["Name"])
print("都市 ", row["City"])
print("参加者数 ", row["Users_Going"])
print("興味を持っているユーザー数 ", row["Users_Interested"])
# ニューヨークで人気のある地元のイベント
関連性と人気を組み合わせ、特定のイベントに関連するニューヨーク市内の類似のイベントを特定し、出席者数でランク付けします。これにより、市の活気あふれる文化を反映し、地元の関心を集めるイベントの厳選されたリストが提供されます。
embeddings=get_embedding(["One of the Longest Running Shows in the Country - Operating since 1974 ...NOW our 50th YEAR !!!Our Schenectady"])
embeddi=embeddings[0].embedding
results = client.query(f"""
SELECT Name,City, Description, Users_Going,distance(Description_Embeddings, {embeddi}) as dist
FROM default.Events
WHERE City LIKE '%ニューヨーク%' and dist < 1.5
ORDER BY Users_Going DESC,dist
LIMIT 10
""")
for row in results.named_results():
print("イベント名 ", row["Name"])
print("説明 ", row["Description"])
print("参加者数 ", row["Users_Going"])
# トップイベントオーガナイザー
このクエリは、参加者数と興味を持っているユーザーの合計数によってトップ10のイベントオーガナイザーをランク付けし、魅力的なイベントを作成し大観衆を集めることに優れた人々をハイライトします。これは、トップクラスのイベントに関心のあるイベントプランナーや参加者に洞察を提供します。
# 最も多くのユーザーを集めたクライアントは誰ですか?
results = client.query(f"""
SELECT Organized_By, SUM(Users_Going + Users_Interested) AS Total_Users
FROM default.Events
GROUP BY Organized_By
ORDER BY Total_Users DESC
Limit 10
""")
for row in results.named_results():
print("イベント名 ", row["Organized_By"])
print("Total_Users ", row["Total_Users"])
# RAGの実装
以前、データ分析においてMyScaleを探求し、データワークフローの強化能力を強調しました。今後は、Retrieval-Augmented Generation(RAG)を実装することで、さらに一歩進んで、外部ナレッジベースとLLMを組み合わせた革新的なフレームワークを使用します。このステップでは、データをより詳細に理解し、より詳細な洞察を得るのに役立ちます。次に、MyScaleとRAGを使用する方法を示します。これにより、データの取り扱いがより興味深く、生産的になります。
from langchain.chat_models import ChatOpenAI
from langchain.prompts import ChatPromptTemplate
query="バスケットボールに関連するイベントをいくつか提案してください"
# 上記で定義されたget_embeddingメソッドを使用します。それは文のリストを受け入れます
embeddings=get_embedding([query])
embeddings=embeddings[0].embedding
results = client.query(f"""
SELECT Name, City, Users_Going, Description, distance(Description_Embeddings, {embeddings}) as dist
FROM default.Events
ORDER BY Users_Going DESC,dist
LIMIT 5
""")
PROMPT_TEMPLATE = """
以下の提供された情報を使用して、質問に対する回答を形成することがあなたの目標です:
{context}
---
提供された文脈を注意深く分析し、次の質問に基づいて、提供された情報のみを使用して回答を提供することがあなたの課題です:
{question}
"""
# 上位の結果の説明を結合します。
descriptions = [row["Description"] for row in results.named_results()]
context_text = "\n\n---\n\n".join(descriptions)
prompt_template = ChatPromptTemplate.from_template(PROMPT_TEMPLATE)
prompt = prompt_template.format(context=context_text, question=query)
model = ChatOpenAI(openai_api_key="your_api_key_here")
response_text = model.predict(prompt)
formatted_response = f"Response: {response_text}\n"
print(formatted_response)
このブログを通じて、MyScaleがあらゆる種類のアプリケーションを開発するために使用できるベクトルデータベース以上のものであることがわかりました。それは単純なSQLデータベースとして、または高度なAIアプリケーションとして使用でき、開発ドメインの大部分をカバーしています。少なくとも試してみて、無料のティアにサインアップして5百万の無料ベクトルストレージを取得して、高度な機能を探索してみることをお勧めします。
# 結論
イベント分析アプリケーションを開発する過程で、MyScaleとApify Scraperの能力と機能を探索しました。MyScaleは、高性能なベクトル検索に優れた能力を示し、SQLデータベースのすべての機能を維持しながら、なじみのあるSQL構文を使用してより速く正確に意味検索を行うことを開発者が助けることができます。
MyScaleの機能はこのアプリケーションに限定されません。RAG (opens new window)方法を使用して、任意のAIアプリケーションを開発するために採用できます。MyScaleは、コスト、速度、精度の面で他のベクトルデータベースを上回っています (opens new window)ので、次のアプリケーションに試してみるのはいかがでしょうか?
ご意見やご提案がある場合は、MyScale Discord (opens new window) または Twitter (opens new window) までお問い合わせください。