Creating a Multiple Choice Flask App with Kafka Python and pyTigerGraph
Using pyTigerGraph with Kafka Python in a Flask App
Note: TigerGraph offers a native Kafka Loader. Check it out here: https://docs.tigergraph.com/dev/data-loader-guides/kafka-loader-user-guide. This blog walks through Kafka Python and pyTigerGraph instead. I highly recommend checking out the loader, and look out for an upcoming blog on it.
Overview
In previous blogs, we created a Notes graph with TigerGraph and generated multiple choice questions using GSQL.
Now, let’s create a Flask app that updates and interacts with the graph live using Kafka Python.
Part I: Prepare your Graph and Directory
Step I: Start your Solution
You can start your solution by going to https://tgcloud.io/, clicking the “Solutions” tab, clicking the “Solution Operations” box under Actions, and selecting “Start” from the dropdown.
Step II: Create your Directory
On your computer, create your project directory. You’ll need two files for Kafka: consumer.py and app.py (which will hold the producer). app.py will also run the main server. Next, create a templates folder containing a single HTML file for now: index.html.
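If you prefer to create this layout from a script, here is a minimal sketch that uses only Python's standard library. The file names come from the step above. The `scaffold` helper is hypothetical, and the files it creates start out empty.

```python
from pathlib import Path


def scaffold(root: Path) -> list[Path]:
    """Create the project layout described above and return the file paths."""
    files = [
        root / "consumer.py",
        root / "app.py",
        root / "templates" / "index.html",
    ]
    for path in files:
        # parents=True also creates the project folder and templates/ if missing
        path.parent.mkdir(parents=True, exist_ok=True)
        # touch() creates an empty file, or leaves an existing one untouched
        path.touch(exist_ok=True)
    return files
```

For example, `scaffold(Path("tg_mcq_app"))` would create a hypothetical `tg_mcq_app` folder containing the three files.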
Part II: Create your Kafka Producer and Consumer
Step I: Create the Kafka Consumer
First, we’ll write consumer.py to create a basic Kafka consumer. To start, we’ll import our two libraries, kafka and json.
from kafka import KafkaConsumer
import json
Next, we’ll create our consumer. The topic name will be tgMCQ.
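TOPIC_NAME = 'tgMCQ'
# Decode each message value from UTF-8 bytes and parse it as JSON.
# With no bootstrap_servers argument, kafka-python connects to localhost:9092.
consumer = KafkaConsumer(TOPIC_NAME, value_deserializer=lambda m: json.loads(m.decode('utf-8')))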
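The `value_deserializer` lambda turns each raw message value (bytes) back into Python objects. Pulled out into a named function, it can be checked without a running broker. `deserialize` is a hypothetical name, and its logic mirrors the lambda above.

```python
import json


def deserialize(raw: bytes):
    """Same logic as the value_deserializer lambda: UTF-8 bytes -> parsed JSON."""
    return json.loads(raw.decode("utf-8"))


# The test message sent by the producer later in this post
payload = deserialize(b'[{"msg": "TigerGraph is amazing!!"}]')
print(payload[0]["msg"])  # TigerGraph is amazing!!
```

You could then pass `value_deserializer=deserialize` to `KafkaConsumer` in place of the lambda.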
Finally, as messages come in, we’ll print them out.
for message in consumer:
    print(message)
Perfect! Run the Python file. Next, we’ll set up the producer.
python3 consumer.py
Step II: Send a Test Message from a Kafka Producer
Now that the consumer is ready, let’s send a sample message to ensure everything is working. In app.py, first import kafka.
from kafka import KafkaProducer
We’ll then set up the KafkaProducer. Once again, our topic name will be tgMCQ. The Kafka server runs locally on localhost:9092, and we pass that address to the producer as its bootstrap server.
TOPIC_NAME = 'tgMCQ'
KAFKA_SERVER = 'localhost:9092'
producer = KafkaProducer(bootstrap_servers=KAFKA_SERVER)
Finally, we’ll send the message “TigerGraph is amazing!!”
producer.send(TOPIC_NAME, bytes("[{\"msg\": \"TigerGraph is amazing!!\"}]", 'utf-8'))
# Block until the buffered message has actually been sent to the broker
producer.flush()
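Hand-escaping the quotes in the bytes literal works, but `json.dumps` produces the same bytes with less room for error. Below is a minimal sketch that assumes the same `[{"msg": ...}]` message shape. `serialize` is a hypothetical helper name.

```python
import json


def serialize(obj) -> bytes:
    """Encode a Python object as UTF-8 JSON bytes, ready for producer.send()."""
    return json.dumps(obj).encode("utf-8")


hand_written = bytes("[{\"msg\": \"TigerGraph is amazing!!\"}]", 'utf-8')
generated = serialize([{"msg": "TigerGraph is amazing!!"}])
print(generated == hand_written)  # True
```

kafka-python's `KafkaProducer` also accepts a `value_serializer` callable. Creating the producer as `KafkaProducer(bootstrap_servers=KAFKA_SERVER, value_serializer=serialize)` would therefore let you call `producer.send(TOPIC_NAME, [{"msg": "..."}])` with a plain Python list.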
Let’s run this file in a separate terminal.
python3 app.py
If you then navigate back to the terminal where you ran consumer.py, you’ll notice the message appear in the terminal.
Fantastic! Let’s next create a request to upsert a vertex.
Part III: Update your Graph with Kafka and pyTigerGraph using Flask
Step I: Update the Consumer
The producer will remain the same, but let’s extend the consumer to upsert data into TigerGraph with pyTigerGraph. To start off, we’ll import more libraries, including spaCy and pyTigerGraph.
from kafka import KafkaConsumer
import json
import pyTigerGraph as tg
import spacy
from spacy import displacy
import en_core_web_sm
nlp = en_core_web_sm.load()
Next, we’ll need to connect to our TigerGraph solution. Here, replace SUBDOMAIN and PASSWORD with your subdomain and password.
Note: As of July 28th, you’ll need to add beta=True. This will change in future versions.
conn = tg.TigerGraphConnection(host="https://SUBDOMAIN.i.tgcloud.io/", password="PASSWORD", graphname="NotesGraph", beta=True)
conn.apiToken = conn.getToken(conn.createSecret())
print("Connected")
Since connecting takes a while, the script prints “Connected” once it finishes.
Next, we once again create the KafkaConsumer.
TOPIC_NAME = 'tgMCQ'
consumer = KafkaConsumer(TOPIC_NAME, value_deserializer=lambda m: json.loads(m.decode('utf-8')))
Perfect! Finally, when we get a new message, we’ll run the same function as described in the past blog, running Named Entity Recognition and upserting all the necessary vertices and edges.
for message in consumer:
    # Merge the list of small dicts so "msg" is found whether it is the first
    # element (the test message) or the second (the Flask form message)
    payload = {k: v for item in message.value for k, v in item.items()}
    notes = payload["msg"]
    article = nlp(notes)
    sentences = [x for x in article.sents]
    conn.upsertVertex("Folder", "KafkaFolder", attributes={"folder": "KafkaFolder"})
    conn.upsertVertex("Document", "KafkaDoc", attributes={"document": "KafkaDoc"})
    conn.upsertEdge("Folder", "KafkaFolder", "FOLDER_DOCUMENT", "Document", "KafkaDoc")
    for sent in sentences:
        # Assumes the Sentence vertex's attribute is named "sentence" in the schema
        conn.upsertVertex("Sentence", str(sent), attributes={"sentence": str(sent)})
        conn.upsertEdge("Document", "KafkaDoc", "DOCUMENT_SENTENCE", "Sentence", str(sent))
        val = nlp(str(sent))
        if val:
            for ent in val.ents:
                conn.upsertVertex("Entity", ent.label_, attributes={"entity": ent.label_})
                conn.upsertVertex("Entity_Name", ent.text, attributes={"entity_name": ent.text})
                conn.upsertEdge("Sentence", str(sent), "SENTENCE_ENTITY_NAME", "Entity_Name", ent.text)
                conn.upsertEdge("Entity_Name", ent.text, "ENTITY_NAME_ENTITY", "Entity", ent.label_)
                conn.upsertEdge("Document", "KafkaDoc", "DOCUMENT_ENTITY_NAME", "Entity_Name", ent.text)
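To see the shape of the graph this loop builds without connecting to TigerGraph or loading spaCy, the sketch below lists the upserts as tuples. The `plan_upserts` helper and its input format are hypothetical. Each sentence is paired with `(text, label)` entity tuples, which stand in for spaCy's `ent.text` and `ent.label_`. The vertex and edge type names are the ones used above.

```python
def plan_upserts(sentences):
    """Return the vertex and edge upserts the consumer loop would issue.

    `sentences` is a list of (sentence_text, [(entity_text, entity_label), ...]).
    Vertices are (type, id); edges are (src_type, src_id, edge_type, tgt_type, tgt_id).
    """
    vertices = [("Folder", "KafkaFolder"), ("Document", "KafkaDoc")]
    edges = [("Folder", "KafkaFolder", "FOLDER_DOCUMENT", "Document", "KafkaDoc")]
    for sent, ents in sentences:
        vertices.append(("Sentence", sent))
        edges.append(("Document", "KafkaDoc", "DOCUMENT_SENTENCE", "Sentence", sent))
        for text, label in ents:
            vertices.append(("Entity", label))
            vertices.append(("Entity_Name", text))
            edges.append(("Sentence", sent, "SENTENCE_ENTITY_NAME", "Entity_Name", text))
            edges.append(("Entity_Name", text, "ENTITY_NAME_ENTITY", "Entity", label))
            edges.append(("Document", "KafkaDoc", "DOCUMENT_ENTITY_NAME", "Entity_Name", text))
    return vertices, edges


# Made-up NER output for a single sentence
vertices, edges = plan_upserts(
    [("TigerGraph is in Redwood City.", [("TigerGraph", "ORG"), ("Redwood City", "GPE")])]
)
print(len(vertices), len(edges))  # 7 8
```

These are upserts, so repeated entries (for example, the same Entity label appearing in two sentences) update the existing vertex rather than creating a second one.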
Perfect! Now, if we run consumer.py and then the producer (app.py), we can check GraphStudio and find the newly upserted sentence vertex.
Step II: Add Flask
Finally, let’s add Flask. In app.py, add the other library imports, primarily Flask and pyTigerGraph.
from flask import Flask, render_template, request, redirect
from kafka import KafkaProducer
import random
import pyTigerGraph as tg
Once again, let’s connect to pyTigerGraph and create the Kafka producer.
conn = tg.TigerGraphConnection(host="https://SUBDOMAIN.i.tgcloud.io/", password="PASSWORD", graphname="NotesGraph", beta=True)
conn.apiToken = conn.getToken(conn.createSecret())
print("Connected")
TOPIC_NAME = 'tgMCQ'
KAFKA_SERVER = 'localhost:9092'
producer = KafkaProducer(bootstrap_servers=KAFKA_SERVER)
Next, we’ll initialise the Flask app.
app = Flask(__name__)
We’ll then add one endpoint, /, which renders index.html, and then run the app.
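@app.route("/")
def main():
    return render_template("index.html")

# Keep this call at the bottom of app.py: app.run() blocks, so any route
# defined after it would never be registered
app.run(debug=True)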
Step III: Update the HTML
Next, open the index.html file. For now, we’ll add only a text box inside a form that sends a POST request to /sent_data.
<!DOCTYPE html>
<html>
<body>
  <form action="/sent_data" method="post">
    <input type="text" name="passage">
  </form>
</body>
</html>
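When the form is submitted, the browser sends the text box's contents as an `application/x-www-form-urlencoded` body keyed by the input's `name`. That is why the Flask handler reads `request.form.get("passage")`. The standard library can approximate that encoding. The sample text is made up.

```python
from urllib.parse import parse_qs, urlencode

# Roughly what the browser sends for a text box named "passage"
body = urlencode({"passage": "TigerGraph is amazing!!"})
print(body)  # passage=TigerGraph+is+amazing%21%21

# Decoding it recovers the field Flask exposes through request.form
fields = parse_qs(body)
print(fields["passage"][0])  # TigerGraph is amazing!!
```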
Now let’s handle this endpoint in the main app.
import json  # used to build the Kafka message safely

@app.route("/sent_data", methods=['POST'])
def process_data():
    text = request.form.get("passage")
    print(text)
    # json.dumps escapes quotes and backslashes in the submitted text
    msg = json.dumps([{"type": "add_text"}, {"msg": text}])
    producer.send(TOPIC_NAME, bytes(msg, 'utf-8'))
    producer.flush()
    return redirect("/")
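The Flask route sends a two-element list, `[{"type": "add_text"}, {"msg": ...}]`, whereas the test message put `msg` in the first element. A consumer that reads `message.value[0]["msg"]` would therefore fail on the Flask format. The sketch below shows a tolerant reader (`extract_msg` is a hypothetical helper). It also shows why building the message by string concatenation is fragile when the user types a quote.

```python
import json


def extract_msg(payload):
    """Return the "msg" value from a list of small dicts, wherever it appears."""
    merged = {}
    for item in payload:
        merged.update(item)
    return merged["msg"]


text = 'She said "hello"'

# Concatenation leaves the inner quotes unescaped, producing invalid JSON
concatenated = '[{"type": "add_text"}, {"msg": "' + text + '"}]'
try:
    json.loads(concatenated)
    concat_ok = True
except json.JSONDecodeError:
    concat_ok = False

# json.dumps escapes the quotes, so the consumer can parse the message
dumped = json.dumps([{"type": "add_text"}, {"msg": text}])
print(concat_ok, extract_msg(json.loads(dumped)))  # False She said "hello"
```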
Then, when we run it, type a passage into the text box and press Enter: the text is upserted into TigerGraph through Kafka and Flask!
Part IV: Congrats!
Congrats! You made it through this blog! If you have more questions or want more content like this, please join the TigerGraph Discord!