1 year ago
#323254

Tej Patel
Kafka Python consumer not receiving new data inserted into the source database
My source database is MySQL and my sink database is Cassandra. The Python producer connects to MySQL and sends the data to a Kafka topic, which in my case is named demo. The consumer then writes the data to Cassandra.
Here is my producer code:
import json
from kafka import KafkaProducer
import pymysql.cursors

producer = KafkaProducer(bootstrap_servers=['localhost:9092'])
connection = pymysql.connect(host='127.0.0.1',
                             user='root',
                             port=3306,
                             password='Mysql@123',
                             db='bank_transaction',
                             charset='utf8mb4',
                             cursorclass=pymysql.cursors.DictCursor)
cursor = connection.cursor()
sql = """select * from bank_transaction.transactions"""
cursor.execute(sql)
rows = cursor.fetchall()
data = ""
for row in rows:
    producer.send('demo', json.dumps(row, default=str).encode("utf-8"))
producer.flush()
# cursor.close()
# connection.close()
# configure multiple retries
producer = KafkaProducer(retries=5)
Here is my consumer code. I have created a keyspace called 'test' and a table called 'response' in Cassandra.
from encodings import utf_8
from kafka import KafkaConsumer
import json
from cassandra.cluster import Cluster
from cassandra.policies import DCAwareRoundRobinPolicy

cluster = Cluster()
session = cluster.connect('test')
print("After connecting to kafka")
consumer = KafkaConsumer('demo',
                         group_id='my-group',
                         bootstrap_servers=['localhost:9092'])

def insert(message):
    msg = message.value.decode('utf-8')
    print(msg)
    msg = json.loads(msg)
    keys = ",".join(msg.keys())
    values = ','.join(str(v) for v in msg.values())
    user_insert_stmt = session.prepare("insert into response ({0}) values (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)".format(keys))
    new_msg = msg.values()
    return session.execute(user_insert_stmt, new_msg)

for message in consumer:
    insert(message)
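The insert statement in the consumer hard-codes 24 placeholders, so it only matches messages with exactly 24 fields. As a side note, here is a minimal sketch of deriving the placeholder list from the column names instead. The helper name build_insert_cql is hypothetical, and it assumes the JSON keys match the Cassandra column names and come from a trusted source, since they are interpolated into the statement text.

```python
from typing import Sequence


def build_insert_cql(table: str, columns: Sequence[str]) -> str:
    """Build an INSERT statement with one '?' placeholder per column.

    Hypothetical helper: table and column names are interpolated as-is,
    so they must come from a trusted source (here, the producer's JSON keys).
    """
    if not columns:
        raise ValueError("at least one column is required")
    column_list = ",".join(columns)
    placeholders = ",".join("?" for _ in columns)
    return "insert into {0} ({1}) values ({2})".format(
        table, column_list, placeholders
    )
```

Inside insert(), this could be used as session.prepare(build_insert_cql("response", list(msg.keys()))), with list(msg.values()) passed as the bound parameters.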
When I insert new rows into the MySQL table with an INSERT INTO query, the consumer does not receive them. My kafka-python version is 1.3.5, my Kafka version is 2.7.0, and my Python version is 3.8.10.
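One thing that may explain this, assuming the producer script is run only once: it executes the SELECT a single time and then exits, so rows inserted afterwards are never sent to the topic. Below is a rough sketch of a polling step that publishes only rows newer than the last one seen. The function name publish_new_rows and the auto-increment id column are assumptions and are not part of my actual table definition. The MySQL and Kafka calls are passed in as callables so the logic can be read on its own.

```python
import json
from typing import Callable, Iterable, Mapping


def publish_new_rows(
    fetch_rows_after: Callable[[int], Iterable[Mapping]],
    send: Callable[[bytes], object],
    last_id: int,
) -> int:
    """Send every row with id > last_id and return the new highest id.

    Assumes each row has an increasing integer 'id' column (hypothetical).
    """
    for row in fetch_rows_after(last_id):
        send(json.dumps(dict(row), default=str).encode("utf-8"))
        last_id = max(last_id, row["id"])
    return last_id


# Possible wiring with the producer shown earlier (a sketch, not tested here):
#
#   def fetch_rows_after(last_id):
#       connection.commit()  # end the old transaction so new rows are visible
#       cursor.execute(
#           "select * from bank_transaction.transactions "
#           "where id > %s order by id", (last_id,))
#       return cursor.fetchall()
#
#   last_id = 0
#   while True:
#       last_id = publish_new_rows(
#           fetch_rows_after, lambda payload: producer.send('demo', payload),
#           last_id)
#       producer.flush()
#       time.sleep(5)  # needs "import time"
```

Polling like this is only one option. It misses updates and deletes, and the last seen id is lost when the script restarts unless it is stored somewhere.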
python
mysql
apache-kafka
kafka-python
0 Answers