-
Notifications
You must be signed in to change notification settings - Fork 13
Expand file tree
/
Copy pathMQPullConsumer.py
More file actions
94 lines (79 loc) · 3.32 KB
/
MQPullConsumer.py
File metadata and controls
94 lines (79 loc) · 3.32 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
#!/usr/bin/python
# -*- coding:utf-8 -*-
from jpype import *
import logging
import time
# Module-level logger, registered under the fixed name "MQPullConsumer".
logger = logging.getLogger("MQPullConsumer")
# Only the wrapper class is exported by a star-import of this module.
__all__ = ["MQPullConsumer"]
# Java classes looked up through JPype's JPackage at import time.
# NOTE(review): presumably the JVM must already be started with the RocketMQ
# client jars on the classpath before these are used - confirm with the caller.
DefaultMQPullConsumer= JPackage('com.alibaba.rocketmq.client.consumer').DefaultMQPullConsumer
MQClientException = JPackage('com.alibaba.rocketmq.client.exception').MQClientException
PullResult = JPackage('com.alibaba.rocketmq.client.consumer').PullResult
MessageQueue = JPackage('com.alibaba.rocketmq.common.message').MessageQueue
class MQPullConsumer(object):
    """Thin Python wrapper around the Java ``DefaultMQPullConsumer``.

    Typical call order: construct, ``init()``, ``start()``, then
    ``fetchSubscribeMessageQueues()`` / ``pullBlockIfNotFound()``, and finally
    ``shutdown()``. The wrapper also keeps a per-queue offset table that is
    read by ``pullBlockIfNotFound()`` and updated via
    ``putMessageQueueOffset()``.
    """

    def __init__(self, groupName, namesrvAddr):
        """Store the configuration; the Java consumer is created by ``init()``.

        :param groupName: consumer group name handed to DefaultMQPullConsumer.
        :param namesrvAddr: name server address handed to setNamesrvAddr.
        """
        self.consumer = None  # the Java consumer is created in init()
        self.groupName = groupName
        self.namesrvAddr = namesrvAddr
        # The current time in milliseconds is used as the instance name.
        self.instanceName = str(int(time.time() * 1000))
        self.mqs = None  # set by fetchSubscribeMessageQueues()
        # Map of message queue id -> queue offset. The attribute name is kept
        # exactly as it was because it is reachable from outside the class.
        self.offseTable = {}

    def init(self):
        """Create the Java consumer and apply the basic settings in one go.

        The settings are applied in bulk to keep the number of wrapped API
        methods small; they can be split out of ``init`` one by one if that is
        ever needed.
        """
        logger.info('Initializing consumer %s ...', self.instanceName)
        self.consumer = DefaultMQPullConsumer(JString(self.groupName))
        self.consumer.setNamesrvAddr(JString(self.namesrvAddr))
        self.consumer.setInstanceName(JString(self.instanceName))

    def start(self):
        """Start the underlying Java consumer (``init()`` must be called first).

        # JAVA prototype
        # public void start() throws MQClientException {
        """
        logger.info('Starting consumer %s ...', self.instanceName)
        self.consumer.start()

    def shutdown(self):
        """Shut down the underlying Java consumer.

        # JAVA prototype
        # public void shutdown() {
        """
        logger.info('Shutting down consumer %s ...', self.instanceName)
        self.consumer.shutdown()

    def pullBlockIfNotFound(self, mq, subExpression, offset, maxNums):
        """Pull from ``mq`` starting at the offset stored for that queue.

        # JAVA prototype
        # public PullResult pullBlockIfNotFound(MessageQueue mq, String subExpression, long offset, int maxNums)
        #         throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        # public void pullBlockIfNotFound(MessageQueue mq, String subExpression, long offset, int maxNums,
        #         PullCallback pullCallback) throws MQClientException, RemotingException, InterruptedException {

        :param mq: message queue to pull from; its ``queueId`` selects the
            stored offset.
        :param subExpression: subscription expression forwarded unchanged.
        :param offset: accepted for signature parity with the Java API but
            NOT used; the offset recorded via ``putMessageQueueOffset`` (or 0)
            is sent instead.
        :param maxNums: maximum number of messages, forwarded unchanged.
        :return: whatever the Java ``pullBlockIfNotFound`` call returns.
        """
        # NOTE(review): ``offset`` is deliberately left unused to preserve the
        # existing behaviour; confirm whether callers expect it to be honoured.
        return self.consumer.pullBlockIfNotFound(
            mq, subExpression, self.getMessageQueueOffset(mq), maxNums)

    def fetchSubscribeMessageQueues(self, topic):
        """Fetch the message queues of ``topic`` and keep them in ``self.mqs``.

        # JAVA prototype
        # public Set<MessageQueue> fetchSubscribeMessageQueues(String topic) throws MQClientException {

        :param topic: topic name, converted to a Java string before the call.
        """
        self.mqs = self.consumer.fetchSubscribeMessageQueues(JString(topic))

    def getMessageQueueOffset(self, mq):
        """Return the current message offset recorded for a message queue.

        :param mq: object exposing a ``queueId`` attribute.
        :return: the stored offset, or 0 when none has been recorded yet.
        """
        # dict.get replaces dict.has_key, which no longer exists on Python 3.
        return self.offseTable.get(mq.queueId, 0)

    def putMessageQueueOffset(self, mq, offset):
        """Record the (updated) current message offset for a message queue.

        :param mq: object exposing a ``queueId`` attribute.
        :param offset: new offset value to remember for that queue.
        """
        self.offseTable[mq.queueId] = offset