I am trying to topic-model a dataset of around 50 million tweets. Unfortunately, a dataset that large will not fit in RAM (even 128 GB) once the embeddings are included, so I have been building an incremental BERTopic pipeline as described in the docs. It looks like this:
from bertopic import BERTopic
from bertopic.vectorizers import OnlineCountVectorizer, ClassTfidfTransformer
from sklearn.cluster import MiniBatchKMeans
from sklearn.decomposition import IncrementalPCA
from tqdm import tqdm
import numpy as np
import pandas as pd

class SafeIncrementalPCA(IncrementalPCA):
    def partial_fit(self, X, y=None):
        # Ensure the input is contiguous and in float64
        X = np.ascontiguousarray(X, dtype=np.float64)
        return super().partial_fit(X, y)

    def transform(self, X):
        result = super().transform(X)
        # Force the output to be float64 and contiguous
        return np.ascontiguousarray(result, dtype=np.float64)

vectorizer_model = OnlineCountVectorizer(stop_words="english")
ctfidf_model = ClassTfidfTransformer(reduce_frequent_words=True, bm25_weighting=True)
umap_model = SafeIncrementalPCA(n_components=100)
cluster_model = MiniBatchKMeans(n_clusters=1000, random_state=0)

topic_model = BERTopic(umap_model=umap_model,
                       hdbscan_model=cluster_model,
                       vectorizer_model=vectorizer_model,
                       ctfidf_model=ctfidf_model)

# docs_partitions / embeddings_partitions are lists of Dask delayed partitions
for docs_delayed, emb_delayed in tqdm(zip(docs_partitions, embeddings_partitions), total=len(docs_partitions)):
    docs_pdf = docs_delayed.compute()
    emb_pdf = emb_delayed.compute()
    docs = docs_pdf["text"].tolist()
    embeddings = np.vstack(emb_pdf["embeddings"].tolist())
    # Partial fit the model (IncrementalPCA and MiniBatchKMeans both support partial_fit)
    topic_model.partial_fit(docs, embeddings)
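After the fitting loop I run a rough sanity check on the fitted sub-models (just a minimal sketch; I'm assuming get_topic_info() is already populated after partial_fit):

# Rough sanity check that the incremental fit actually ran
print("PCA components:", umap_model.components_.shape)          # (100, embedding_dim)
print("KMeans centers:", cluster_model.cluster_centers_.shape)  # (1000, 100)
print(topic_model.get_topic_info().head())                      # assumes this is populated after partial_fit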
I then transform the dataset shard by shard and write the results into a SQL database:
for docs_delayed, emb_delayed in tqdm(zip(docs_partitions, embeddings_partitions), total=len(docs_partitions)):
    docs_pdf = docs_delayed.compute()
    emb_pdf = emb_delayed.compute()
    docs = docs_pdf["text"].tolist()
    embeddings = np.vstack(emb_pdf["embeddings"].tolist())

    # Apply the fitted BERTopic model to this shard
    topics, probs = topic_model.transform(docs, embeddings)

    # Collect the shard's topic assignments
    df_topics = pd.DataFrame({
        "tweet_id": docs_pdf["id"].tolist(),
        "topic": topics,
        "probability": probs,
    })

    # Merge & store in DB
    docs_pdf["topic"] = df_topics["topic"]
    docs_pdf["probability"] = df_topics["probability"]
    docs_pdf.to_sql("tweets", engine, if_exists="append", index=False)
I've been trying to get this working for quite a while, and this is the closest I have come to a working example. The only issue is that about half of the dataset ends up with null topics in the database. From what I understand of the theory, MiniBatchKMeans should not produce any outliers, so every tweet should be assigned to a topic, right? I've looked at the unclassified tweets in question and there is nothing in their text to suggest they would be harder to classify than the tweets that did get a topic.
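For reference, this is roughly how I've been checking where the nulls come from (a rough sketch, not a definitive test; the tweets table and engine are the same ones used in the to_sql call above, and I'm assuming the missing assignments show up as NULL/NaN in the topic column):

# Re-run transform on one shard (reusing the last docs/embeddings from the
# loop above) to see whether nulls already appear at this stage or only
# after the merge/to_sql step
topics, probs = topic_model.transform(docs, embeddings)
print("docs:", len(docs), "topics:", len(topics))
print("NaN/None topics in this shard:", pd.isna(pd.Series(topics)).sum())

# Count the nulls that actually landed in the database
nulls = pd.read_sql("SELECT COUNT(*) AS n FROM tweets WHERE topic IS NULL", engine)
total = pd.read_sql("SELECT COUNT(*) AS n FROM tweets", engine)
print(f"{nulls['n'].iloc[0]} of {total['n'].iloc[0]} rows have a null topic")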
I would be very happy to hear any sort of suggestion on what could be going wrong and how I could fix this!
Thanks!