1 year ago

#323254

test-img

Tej Patel

Kafka python consumer not receiving new data inserted into the source data

I have my source database as Mysql and sink database as Cassandra. Python producer code establishes connection with Mysql and sends data to kafka topic, in my case the topic name is demo. The consumer code then data to Cassandra.

Here is my producer code

import json
from kafka import KafkaProducer
import pymysql.cursors

producer = KafkaProducer(bootstrap_servers=['localhost:9092'])

connection = pymysql.connect(host='127.0.0.1',
                                     user='root',
                                     port=3306,
                                     password='Mysql@123',
                                     db='bank_transaction',
                                     charset='utf8mb4',
                                     cursorclass=pymysql.cursors.DictCursor)
cursor = connection.cursor()
sql = """\select * from bank_transaction.transactions"""
cursor.execute(sql)
rows = cursor.fetchall()
data = ""


for row in rows:

    producer.send('demo', json.dumps(row, default=str).encode("utf-8"))
    producer.flush()


# cursor.close()
# connection.close()

# configure multiple retries
producer = KafkaProducer(retries=5)

Here is my cosumer code. I have created a keyspace called 'test' and table called 'response' in Cassandra.

from encodings import utf_8
from kafka import KafkaConsumer
import json
from cassandra.cluster import Cluster
from cassandra.policies import DCAwareRoundRobinPolicy



cluster = Cluster()
session = cluster.connect('test')




print("After connecting to kafka")

consumer = KafkaConsumer('demo',
                         group_id='my-group',
                         bootstrap_servers=['localhost:9092'])

def insert(message):
    msg = message.value.decode('utf-8')
    print(msg)
    msg = json.loads(msg)

    keys = ",".join(msg.keys())
    values = ','.join(str(v) for v in msg.values())
    user_insert_stmt = session.prepare("insert into response ({0}) values (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)".format(keys))
    new_msg = msg.values()
    return session.execute(user_insert_stmt,new_msg)

for message in consumer:
    insert (message)

When I try to insert new data into the Mysql table using the INSERT INTO query, the new data is not received by the consumer. My kafka-python version is 1.3.5, kafka version is 2.7.0 and Python version 3.8.10.

python

mysql

apache-kafka

kafka-python

0 Answers

Your Answer

Accepted video resources