import json
from rocketmq.client import Producer
from rocketmq.client import Message
# 初始化生产者
producer = Producer('PYthon_Test')
# 设置NameServer地址
producer.set_namesrv_addr('172.16.100.39:9876')
#producer.set_session_credentials("adminmq", "xxxxxxx", 'authChannel') # 完成设置验证
producer.set_session_credentials("jtkj_mq", "xxxx", 'authChannel') # 完成设置验证
# 启动生产者
producer.start()
# 构建消息(需要将消息发送到指定Topic)
message = Message('TestTopic')
# 设置消息内容(注意消息只能是字符串)
msg_body = {"id":"test_id","name":"test_name","message":"test_message"}
message.set_body(json.dumps(msg_body).encode('utf-8'))
# 可以设置其他属性,如Tags、Keys等
message.set_tags('python')
message.set_keys('test')
try:
# 发送消息(此处就会发送消息到RocketMQ服务器)
ret = producer.send_sync(message)
# 打印发送结果
print(f'status:{ret.status}') # 0表示OK
print(f'msg_id:{ret.msg_id}') # 消息id,同消费者获取到的消息id
print(f'offset:{ret.offset}') # 偏移量,默认从0开始,1,2。。。
if ret.status == 0:
print("发送成功")
else:
print(f"发送失败, 消息状态: {ret.status}")
except Exception as e:
# 处理发送过程中可能出现的异常
print(f"Send message failed, exception: {e}")
finally:
# 停止生产者
producer.shutdown()