Python_rocketmq_生产者

import json
from rocketmq.client import Producer
from rocketmq.client import Message

# 初始化生产者
producer = Producer('PYthon_Test')
# 设置NameServer地址
producer.set_namesrv_addr('172.16.100.39:9876')
#producer.set_session_credentials("adminmq", "xxxxxxx", 'authChannel')  # 完成设置验证
producer.set_session_credentials("jtkj_mq", "xxxx", 'authChannel')  # 完成设置验证
# 启动生产者
producer.start()

# 构建消息(需要将消息发送到指定Topic)
message = Message('TestTopic')
# 设置消息内容(注意消息只能是字符串)
msg_body = {"id":"test_id","name":"test_name","message":"test_message"}
message.set_body(json.dumps(msg_body).encode('utf-8'))
# 可以设置其他属性,如Tags、Keys等
message.set_tags('python')
message.set_keys('test')

try:
    # 发送消息(此处就会发送消息到RocketMQ服务器)
    ret = producer.send_sync(message)
    # 打印发送结果
    print(f'status:{ret.status}') # 0表示OK
    print(f'msg_id:{ret.msg_id}') # 消息id,同消费者获取到的消息id
    print(f'offset:{ret.offset}') # 偏移量,默认从0开始,1,2。。。
    if ret.status == 0:
        print("发送成功")
    else:
        print(f"发送失败, 消息状态: {ret.status}")
except Exception as e:
    # 处理发送过程中可能出现的异常
    print(f"Send message failed, exception: {e}")
finally:
    # 停止生产者
    producer.shutdown()