Creating a Multiple Choice Flask App with Kafka Python and pyTigerGraph

Using pyTigerGraph with Kafka Python in a Flask App

Shreya Chaudhary
Jul 30, 2021

Note: This blog walks through Kafka Python and pyTigerGraph. TigerGraph also offers a native Kafka Loader: https://docs.tigergraph.com/dev/data-loader-guides/kafka-loader-user-guide. I highly recommend you check it out and look out for an upcoming blog on it.

Overview

In some of the past blogs, we were able to create a Notes graph with TigerGraph and generate multiple choice questions using GSQL.

Now, let’s create a Flask app and update and interact with the graph live using Kafka Python.

Part I: Prepare your Graph and Directory

Step I: Start your Solution

You can start your solution by going to https://tgcloud.io/, clicking the “Solutions” tab, pressing the “Solution Operations” box under Actions, and pressing “Start” from the dropdown.

Step II: Create your Directory

On your computer, create your directory. You’ll need two files for Kafka: consumer.py and app.py (which will have the producer). app.py will also run the main server. Next, create a templates folder with one HTML file for now: index.html.

Part II: Create your Kafka Producer and Consumer

Step I: Create the Kafka Consumer

First, we’ll write consumer.py to create a basic Kafka consumer. To start, we’ll import our two libraries, kafka and json.

from kafka import KafkaConsumer
import json

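Next, we’ll create our consumer. The topic name will be tgMCQ. Because no bootstrap_servers argument is given, kafka-python falls back to its default of localhost:9092.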

TOPIC_NAME = 'tgMCQ'
consumer = KafkaConsumer(TOPIC_NAME, value_deserializer=lambda m: json.loads(m.decode('utf-8')))
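To see what the value_deserializer does before any broker is involved, the same lambda can be applied to raw bytes by hand. This is only a minimal sketch, and the sample payload is made up for illustration.

```python
import json

# Same callable that is passed as value_deserializer to KafkaConsumer.
deserialize = lambda m: json.loads(m.decode('utf-8'))

# Hypothetical raw payload: UTF-8 bytes holding a JSON list with one object.
raw = b'[{"msg": "TigerGraph is amazing!!"}]'

# The consumer would expose this parsed object as message.value.
value = deserialize(raw)
print(value[0]["msg"])
```

Because the deserializer returns a Python list of dictionaries, the consumer can index into message.value directly instead of parsing JSON itself.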

Finally, as messages come in, we’ll print them out.

for message in consumer:
    print(message)
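Printing the message shows the whole record object, not just the payload. If a shorter line is preferred, the fields can be formatted individually. The sketch below uses a simplified stand-in for the record (FakeRecord and describe are hypothetical names) so that it runs without a broker.

```python
from collections import namedtuple

# Simplified stand-in for kafka-python's ConsumerRecord, keeping only a few of its fields.
FakeRecord = namedtuple("FakeRecord", ["topic", "partition", "offset", "value"])

def describe(record):
    """Format the topic, partition, offset, and deserialized value as one line."""
    return f"{record.topic}[{record.partition}] @ {record.offset}: {record.value}"

sample = FakeRecord(topic="tgMCQ", partition=0, offset=0, value=[{"msg": "hello"}])
print(describe(sample))

# In consumer.py the same helper could be used as:
# for message in consumer:
#     print(describe(message))
```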

Perfect! With a Kafka broker running on localhost:9092, run the Python file. We’ll run our producer next.

python3 consumer.py

Step II: Send a Test Message from a Kafka Producer

Now that the consumer is ready, let’s send a sample message to ensure everything is working. In app.py, first import kafka.

from kafka import KafkaProducer

We’ll then set up the KafkaProducer. Once again, our topic name will be tgMCQ. The Kafka server runs locally on localhost:9092, and we’ll pass that address to the KafkaProducer.

TOPIC_NAME = 'tgMCQ'
KAFKA_SERVER = 'localhost:9092'
producer = KafkaProducer(bootstrap_servers=KAFKA_SERVER)

Finally, we’ll send the message “TigerGraph is amazing!!”

producer.send(TOPIC_NAME, bytes("[{\"msg\": \"TigerGraph is amazing!!\"}]", 'utf-8'))
producer.flush()
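Hand-escaping the quotes works for a fixed string, but it is easy to get wrong. As a hedged alternative, json.dumps can build the same bytes. Note that encode_message is a hypothetical helper name for this sketch, not part of kafka-python.

```python
import json

def encode_message(text):
    """Return the UTF-8 bytes of a one-element JSON list holding the text."""
    return json.dumps([{"msg": text}]).encode("utf-8")

payload = encode_message("TigerGraph is amazing!!")
print(payload)

# With the producer created above, the payload would be sent as:
# producer.send(TOPIC_NAME, payload)
# producer.flush()
```

The consumer’s value_deserializer reverses this step, so both sides agree on the format without manual escaping.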

Let’s run this file in a separate terminal.

python3 app.py

If you then navigate back to the terminal where you ran consumer.py, you’ll notice the message appear in the terminal.

Message Read from the Consumer!

Fantastic! Let’s next create a request to upsert a vertex.

Part III: Update your Graph with Kafka and pyTigerGraph using Flask

Step I: Update the Consumer

The producer will remain the same, but let’s extend the consumer to upsert data into TigerGraph through pyTigerGraph. To start off, we’ll import more libraries, including spaCy and pyTigerGraph.

from kafka import KafkaConsumer
import json
import pyTigerGraph as tg
import spacy
from spacy import displacy
import en_core_web_sm
nlp = en_core_web_sm.load()

Next, we’ll need to connect to our TigerGraph solution. Here, replace SUBDOMAIN and PASSWORD with your subdomain and password.

Note: As of July 28th, 2021, you’ll need to pass beta=True. This will be changed in future versions.

conn = tg.TigerGraphConnection(host="https://SUBDOMAIN.i.tgcloud.io/", password="PASSWORD", graphname="NotesGraph", beta=True)
conn.apiToken = conn.getToken(conn.createSecret())
print("Connected")
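Instead of typing the subdomain and password into the file, the values could be read from environment variables. This is a sketch under assumptions: TG_SUBDOMAIN and TG_PASSWORD are names invented here, and load_tg_settings is a hypothetical helper that only prepares the keyword arguments.

```python
import os

def load_tg_settings(env=os.environ):
    """Build keyword arguments for tg.TigerGraphConnection from environment variables.

    TG_SUBDOMAIN and TG_PASSWORD are hypothetical names chosen for this sketch.
    """
    subdomain = env.get("TG_SUBDOMAIN")
    password = env.get("TG_PASSWORD")
    if not subdomain or not password:
        raise RuntimeError("Set TG_SUBDOMAIN and TG_PASSWORD before starting")
    return {
        "host": f"https://{subdomain}.i.tgcloud.io/",
        "password": password,
        "graphname": "NotesGraph",
    }

# Usage with the connection call shown above (beta=True kept as in this blog):
# conn = tg.TigerGraphConnection(**load_tg_settings(), beta=True)
```

Keeping credentials out of the source file also means consumer.py and app.py can share the same settings without duplicating them.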

Since it takes a while for it to connect, I have it print “Connected” right after.

Next, we once again create the KafkaConsumer.

TOPIC_NAME = 'tgMCQ'
consumer = KafkaConsumer(TOPIC_NAME, value_deserializer=lambda m: json.loads(m.decode('utf-8')))

Perfect! Finally, when we get a new message, we’ll run the same function as described in the past blog, running Named Entity Recognition and upserting all the necessary vertices and edges.

for message in consumer:
    notes = message.value[0]["msg"]
    article = nlp(notes)
    sentences = [x for x in article.sents]
    # Folder and document that hold everything received through Kafka
    conn.upsertVertex("Folder", "KafkaFolder", attributes={"folder": "KafkaFolder"})
    conn.upsertVertex("Document", "KafkaDoc", attributes={"document": "KafkaDoc"})
    conn.upsertEdge("Folder", "KafkaFolder", "FOLDER_DOCUMENT", "Document", "KafkaDoc")
    for sent in sentences:
        # One vertex per sentence, linked to the document
        conn.upsertVertex("Sentence", str(sent), attributes={"setence": str(sent)})
        conn.upsertEdge("Document", "KafkaDoc", "DOCUMENT_SENTENCE", "Sentence", str(sent))
        val = nlp(str(sent))
        if val:
            # Named Entity Recognition: one vertex per entity label and per entity text
            for ent in val.ents:
                conn.upsertVertex("Entity", ent.label_, attributes={"entity": ent.label_})
                conn.upsertVertex("Entity_Name", ent.text, attributes={"entity_name": ent.text})
                conn.upsertEdge("Sentence", str(sent), "SENTENCE_ENTITY_NAME", "Entity_Name", ent.text)
                conn.upsertEdge("Entity_Name", ent.text, "ENTITY_NAME_ENTITY", "Entity", ent.label_)
                conn.upsertEdge("Document", "KafkaDoc", "DOCUMENT_ENTITY_NAME", "Entity_Name", ent.text)
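To check which vertices and edges the loop creates without touching a live solution, the upsert calls can be pointed at a stand-in object that only records them. The sketch below is an assumption-heavy dry run: RecordingConnection and upsert_notes are hypothetical names, and sentences and entities are passed in as plain tuples instead of coming from spaCy.

```python
class RecordingConnection:
    """Hypothetical stand-in that records upsert calls instead of sending them."""

    def __init__(self):
        self.vertices = []
        self.edges = []

    def upsertVertex(self, vertex_type, vertex_id, attributes=None):
        self.vertices.append((vertex_type, vertex_id))

    def upsertEdge(self, src_type, src_id, edge_type, tgt_type, tgt_id):
        self.edges.append((src_type, src_id, edge_type, tgt_type, tgt_id))


def upsert_notes(conn, sentences):
    """Mirror the consumer loop; sentences is a list of (text, [(entity_text, label), ...])."""
    conn.upsertVertex("Folder", "KafkaFolder", attributes={"folder": "KafkaFolder"})
    conn.upsertVertex("Document", "KafkaDoc", attributes={"document": "KafkaDoc"})
    conn.upsertEdge("Folder", "KafkaFolder", "FOLDER_DOCUMENT", "Document", "KafkaDoc")
    for text, entities in sentences:
        conn.upsertVertex("Sentence", text, attributes={"setence": text})
        conn.upsertEdge("Document", "KafkaDoc", "DOCUMENT_SENTENCE", "Sentence", text)
        for ent_text, label in entities:
            conn.upsertVertex("Entity", label, attributes={"entity": label})
            conn.upsertVertex("Entity_Name", ent_text, attributes={"entity_name": ent_text})
            conn.upsertEdge("Sentence", text, "SENTENCE_ENTITY_NAME", "Entity_Name", ent_text)
            conn.upsertEdge("Entity_Name", ent_text, "ENTITY_NAME_ENTITY", "Entity", label)
            conn.upsertEdge("Document", "KafkaDoc", "DOCUMENT_ENTITY_NAME", "Entity_Name", ent_text)


# Made-up input: one sentence with one entity given the label ORG by hand.
fake = RecordingConnection()
upsert_notes(fake, [("TigerGraph is amazing!!", [("TigerGraph", "ORG")])])
print(len(fake.vertices), "vertices,", len(fake.edges), "edges")
```

Each entity adds two vertices and three edges on top of the sentence itself, which is a quick way to sanity-check what should appear in the graph afterwards.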

Perfect! Now, if we run the consumer.py then run the producer (app.py), we can check GraphStudio to find the new sentence vertex upserted.

Step II: Add Flask

Finally, let’s add Flask. In app.py, add the other library imports, primarily Flask and pyTigerGraph.

from flask import Flask, render_template, request, redirect
from kafka import KafkaProducer
import random
import pyTigerGraph as tg

Once again, let’s connect to pyTigerGraph and create the Kafka producer.

conn = tg.TigerGraphConnection(host="https://SUBDOMAIN.i.tgcloud.io/", password="PASSWORD", graphname="NotesGraph", beta=True)
conn.apiToken = conn.getToken(conn.createSecret())
print("Connected")
TOPIC_NAME = 'tgMCQ'
KAFKA_SERVER = 'localhost:9092'
producer = KafkaProducer(bootstrap_servers=KAFKA_SERVER)

Next, we’ll initialise the Flask app.

app = Flask(__name__)

We’ll then add one endpoint, /, and run the app.

@app.route("/")
def main():
    return render_template("index.html")

app.run(debug=True)

Step III: Update the HTML

Next, open the index.html file. For now, we’ll just add a textbox. It will be in a form to make a POST request.

<!DOCTYPE html>
<html>
<body>
<form action="/sent_data" method="post">
   <input type="text" name="passage">
</form>
</body>
</html>

Let’s handle this endpoint in app.py. Add the route above the app.run(debug=True) line so that it is registered before the server starts.

@app.route("/sent_data", methods=['POST'])
def process_data():
    text = request.form.get("passage")
    print(text)
    msg = '[{"type": "add_text"}, {"msg": "' + text + '"}]'
    producer.send(TOPIC_NAME, bytes(msg, 'utf-8'))
    producer.flush()
    return redirect("/")
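Two details in the handler above are worth a second look. Building JSON by string concatenation breaks as soon as the passage contains a double quote, and the consumer reads message.value[0]["msg"] while this payload puts "msg" in the second list element. One possible way to address both, assuming a single dictionary carrying both keys is acceptable, is sketched here; build_payload is a hypothetical helper.

```python
import json

def build_payload(text):
    """Serialize the passage so that value[0]["msg"] holds the text on the consumer side."""
    return json.dumps([{"type": "add_text", "msg": text}]).encode("utf-8")

# A passage with quotes, which would produce invalid JSON if concatenated by hand.
payload = build_payload('She said "graphs are fun".')

# Round trip through the same decoding the consumer's value_deserializer performs.
value = json.loads(payload.decode("utf-8"))
print(value[0]["msg"])

# Inside process_data, the payload could then be sent as:
# producer.send(TOPIC_NAME, build_payload(text))
```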

Then, if we run it, we’ll be able to upsert data into TigerGraph using Kafka, pyTigerGraph, and Flask!

Part IV: Congrats!

Congrats! You made it through this blog! If you have more questions or want more content like this, please join the TigerGraph Discord!
