Real-time Tweet Classification with Kafka and sklearn



[Source]: https://medium.com/velotio-perspectives/real-time-text-classification-using-kafka-and-scikit-learn-c2875ad80b3c

This post walks through an example of classifying real-time tweets using Kafka and sklearn with Python. It covers the following steps:

  • Train a classification model using sklearn and the 20 Newsgroups data (available in sklearn's built-in datasets)
  • Consume a tweet stream using Kafka
  • Classify each tweet in the stream with the trained model



Prerequisites:


The following code was tested with Python 3.6 and consists of two .py files. It relies on the kafka-python, tweepy, and scikit-learn packages, plus a Kafka broker reachable at localhost:9092.
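
If your broker does not auto-create topics, create the twitter-stream topic up front. Here is a minimal sketch using kafka-python's admin client, assuming a single broker on localhost:9092:

from kafka.admin import KafkaAdminClient, NewTopic

# Assumes a single local broker; adjust bootstrap_servers to your setup
admin = KafkaAdminClient(bootstrap_servers="localhost:9092")
admin.create_topics([
    NewTopic(name="twitter-stream", num_partitions=1, replication_factor=1)
])
admin.close()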



The first file produces a tweet stream by connecting to the Twitter API (via tweepy) and pushes each tweet to a Kafka topic.

from kafka import KafkaProducer
from tweepy.streaming import StreamListener  # tweepy 3.x API (removed in tweepy 4)
from tweepy import OAuthHandler
from tweepy import Stream
import json

# Twitter API credentials (fill in your own)
consumer_key = "****"
consumer_secret = "****"
access_token = "****"
access_token_secret = "****"

# Kafka producer for the tweet stream
# (KafkaProducer replaces SimpleProducer/KafkaClient, which were
# deprecated and later removed from kafka-python)
topic = "twitter-stream"
producer = KafkaProducer(bootstrap_servers="localhost:9092")


class KafkaPusher(StreamListener):
    """tweepy listener that forwards each tweet's text to Kafka."""

    def on_data(self, data):
        all_data = json.loads(data)
        # Delete/limit notices carry no "text" field, so skip them
        tweet = all_data.get("text")
        if tweet:
            producer.send(topic, tweet.encode("utf-8"))
        return True

    def on_error(self, status):
        print(status)


WORDS_TO_TRACK = [
    "Politics", "Apple", "Google", "Microsoft",
    "Bikes", "Harley Davidson", "Medicine"
]

if __name__ == "__main__":
    listener = KafkaPusher()
    auth = OAuthHandler(consumer_key, consumer_secret)
    auth.set_access_token(access_token, access_token_secret)
    stream = Stream(auth, listener)
    # Reconnect whenever the stream drops (network errors, rate limits);
    # a bare "except: pass" would also swallow KeyboardInterrupt
    while True:
        try:
            stream.filter(languages=["en"], track=WORDS_TO_TRACK)
        except Exception:
            continue
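
If you don't have Twitter API credentials at hand, the Kafka side can be sanity-checked in isolation with a stand-in producer. A minimal sketch (the messages here are made up for testing):

from kafka import KafkaProducer

# Hypothetical stand-in for the Twitter stream: pushes a few fixed
# strings to the same topic so the consumer script can be tested alone
producer = KafkaProducer(bootstrap_servers="localhost:9092")
for text in ["My Harley Davidson needs new brakes",
             "Google and Microsoft release quarterly earnings"]:
    producer.send("twitter-stream", text.encode("utf-8"))
producer.flush()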

The second file trains a model on the 20 Newsgroups dataset, then consumes the tweet stream in real time and classifies each tweet with the trained model.

from sklearn.datasets import fetch_20newsgroups  # built-in dataset
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.feature_extraction.text import TfidfTransformer
from sklearn.naive_bayes import MultinomialNB
from kafka import KafkaConsumer
import pickle

# 20NewsGroup categories
# http://qwone.com/~jason/20Newsgroups/
categories = [
    "talk.politics.misc",
    "misc.forsale",
    "rec.motorcycles",
    "comp.sys.mac.hardware",
    "sci.med",
    "talk.religion.misc"
]


def fetch_train_dataset(categories):
    # Download (and cache) the 20 Newsgroups training split
    twenty_train = fetch_20newsgroups(
        subset="train",
        categories=categories,
        shuffle=True,
        random_state=777
    )
    return twenty_train


def bag_of_words(categories):
    # Build the term-count matrix and persist the vocabulary,
    # so the consumer can vectorize tweets with the same mapping
    count_vect = CountVectorizer()
    X_train_counts = count_vect.fit_transform(
        fetch_train_dataset(categories).data
    )
    pickle.dump(
        count_vect.vocabulary_,
        open("vocab.pickle", "wb")
    )
    return X_train_counts


def tf_idf(categories):
    # Re-weight raw counts by TF-IDF; return the fitted transformer
    # together with the transformed training matrix
    tf_transformer = TfidfTransformer()
    return (tf_transformer, tf_transformer.fit_transform(
        bag_of_words(categories)
    ))


def train_model(categories):
    # Multinomial naive Bayes on the TF-IDF features
    clf = MultinomialNB().fit(
        tf_idf(categories)[1],
        fetch_train_dataset(categories).target
    )
    return clf


if __name__ == "__main__":
    # Train the model on 20 Newsgroups data
    clf = train_model(categories)
    pickle.dump(clf, open("model.pickle", "wb"))
    print("training finished...")

    # Load the model and vocabulary back from disk
    print("Loading pre-trained model")
    vocabulary_to_load = pickle.load(open("vocab.pickle", "rb"))
    count_vect = CountVectorizer(vocabulary=vocabulary_to_load)
    load_model = pickle.load(open("model.pickle", "rb"))
    count_vect._validate_vocabulary()  # build vocabulary_ from the loaded mapping
    # Note: this refits the transformer from scratch; a standalone consumer
    # would pickle and reload the fitted transformer like the model above
    tfidf_transformer = tf_idf(categories)[0]

    # Classify the streaming Kafka messages
    consumer = KafkaConsumer("twitter-stream", bootstrap_servers=["localhost:9092"])
    target_names = fetch_train_dataset(categories).target_names  # hoisted out of the loop
    print("Starting ML predictions.")
    for message in consumer:
        # message.value is raw bytes; the vectorizer decodes it as UTF-8
        X_new_counts = count_vect.transform([message.value])
        X_new_tfidf = tfidf_transformer.transform(X_new_counts)
        predicted = load_model.predict(X_new_tfidf)
        print(message.value, target_names[predicted[0]])

Running the two scripts produces output like the following:

training finished...
Loading pre-trained model
Starting ML predictions.
b'RT @azraeiismachine: Just five minutes for a better Malaysia. \n\nThis is an anonymous questionairre from KPWKM. If you know someone or are a\xe2\x80\xa6' rec.motorcycles
b'@REALGoingInRaw @REALGoingInRaw google play says an episode hasn\xe2\x80\x99t been updated since last Tuesday I\xe2\x80\x99m just wonderi\xe2\x80\xa6 https://t.co/0Z35fWaaoM' sci.med
b'Lrt is me as I sit here on twitter even though google docs is open in front of me so I can plan a story' rec.motorcycles
b'RT @Tamaraciocci: Trump sues California, arguing law targeting tax returns is \xe2\x80\x9cflagrantly illegal\xe2\x80\x9d - The Sacramento Bee https://t.co/2CBcdS\xe2\x80\xa6' talk.politics.misc
b"RT @TrellfromBmore: Trump sues California over candidates' tax return law - BBC News https://t.co/GQO6lG1uGL" talk.politics.misc