Messenger: hopefully handle joins correctly
This commit is contained in:
parent
d2791094d9
commit
d5d74b0b73
2 changed files with 114 additions and 67 deletions
2
connector/external/external.go
vendored
2
connector/external/external.go
vendored
|
@ -111,7 +111,7 @@ func (ext *External) Configure(c Configuration) error {
|
||||||
|
|
||||||
ext.generation += 1
|
ext.generation += 1
|
||||||
|
|
||||||
ext.handlerChan = make(chan *extMessageWithData)
|
ext.handlerChan = make(chan *extMessageWithData, 1000)
|
||||||
go ext.handlerLoop(ext.generation)
|
go ext.handlerLoop(ext.generation)
|
||||||
|
|
||||||
err = ext.setupProc()
|
err = ext.setupProc()
|
||||||
|
|
179
external/messenger.py
vendored
179
external/messenger.py
vendored
|
@ -95,52 +95,12 @@ class InitialSyncThread(threading.Thread):
|
||||||
self.threads = threads
|
self.threads = threads
|
||||||
|
|
||||||
def run(self):
|
def run(self):
|
||||||
sys.stderr.write("fb thread list: {}\n".format(self.threads))
|
sys.stderr.write("(python) fb thread list: {}\n".format(self.threads))
|
||||||
|
|
||||||
for thread in self.threads:
|
for thread in self.threads:
|
||||||
sys.stderr.write("fb thread: {}\n".format(thread))
|
sys.stderr.write("(python) fb thread: {}\n".format(thread))
|
||||||
if thread.type == ThreadType.GROUP:
|
self.bridge.setup_joined_thread(thread)
|
||||||
members = self.client.fetchAllUsersFromThreads([thread])
|
|
||||||
|
|
||||||
self.bridge.write({
|
|
||||||
"_type": JOINED,
|
|
||||||
"room": thread.uid,
|
|
||||||
})
|
|
||||||
|
|
||||||
self.bridge.send_room_info(thread, members)
|
|
||||||
self.bridge.send_room_members(thread, members)
|
|
||||||
|
|
||||||
self.backlog_room(thread)
|
|
||||||
|
|
||||||
|
|
||||||
def backlog_room(self, thread):
|
|
||||||
prev_last_seen = self.bridge.cache_get("last_seen_%s"%thread.uid)
|
|
||||||
if prev_last_seen == "":
|
|
||||||
prev_last_seen = None
|
|
||||||
|
|
||||||
messages = []
|
|
||||||
found = False
|
|
||||||
while not found:
|
|
||||||
before = None
|
|
||||||
if len(messages) > 0:
|
|
||||||
before = messages[-1].timestamp
|
|
||||||
page = self.client.fetchThreadMessages(thread.uid, before=before, limit=20)
|
|
||||||
for m in page:
|
|
||||||
if m.uid == prev_last_seen or len(messages) > self.bridge.init_backlog_length:
|
|
||||||
found = True
|
|
||||||
break
|
|
||||||
else:
|
|
||||||
messages.append(m)
|
|
||||||
|
|
||||||
for m in reversed(messages):
|
|
||||||
if m.text is None:
|
|
||||||
m.text = ""
|
|
||||||
m.text = "[{}] {}".format(
|
|
||||||
time.strftime("%Y-%m-%d %H:%M %Z", time.localtime(float(m.timestamp)/1000)).strip(),
|
|
||||||
m.text)
|
|
||||||
self.bridge.onMessage(thread_id=thread.uid,
|
|
||||||
thread_type=thread.type,
|
|
||||||
message_object=m)
|
|
||||||
|
|
||||||
class ClientListenThread(threading.Thread):
|
class ClientListenThread(threading.Thread):
|
||||||
def __init__(self, client, *args, **kwargs):
|
def __init__(self, client, *args, **kwargs):
|
||||||
|
@ -149,7 +109,7 @@ class ClientListenThread(threading.Thread):
|
||||||
self.client = client
|
self.client = client
|
||||||
|
|
||||||
def run(self):
|
def run(self):
|
||||||
sys.stderr.write("Start client.listen()\n")
|
sys.stderr.write("(python messenger) Start client.listen()\n")
|
||||||
self.client.listen()
|
self.client.listen()
|
||||||
|
|
||||||
|
|
||||||
|
@ -159,7 +119,8 @@ class MessengerBridge:
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
self.rev_uid = {}
|
self.rev_uid = {}
|
||||||
self.uid_map = {}
|
self.uid_map = {}
|
||||||
self.joined_map = {}
|
self.others_joined_map = {}
|
||||||
|
self.my_joined_rooms = {}
|
||||||
self.init_backlog_length = 100
|
self.init_backlog_length = 100
|
||||||
|
|
||||||
def getUserId(self, user):
|
def getUserId(self, user):
|
||||||
|
@ -215,7 +176,7 @@ class MessengerBridge:
|
||||||
try:
|
try:
|
||||||
line = sys.stdin.readline()
|
line = sys.stdin.readline()
|
||||||
except KeyboardInterrupt:
|
except KeyboardInterrupt:
|
||||||
sys.stderr.write("(messenger) shutting down")
|
sys.stderr.write("(python messenger) shutting down")
|
||||||
self.close()
|
self.close()
|
||||||
time.sleep(5)
|
time.sleep(5)
|
||||||
sys.exit(0)
|
sys.exit(0)
|
||||||
|
@ -230,7 +191,7 @@ class MessengerBridge:
|
||||||
if "_type" not in rep:
|
if "_type" not in rep:
|
||||||
rep["_type"] = REP_OK
|
rep["_type"] = REP_OK
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
sys.stderr.write("{}\n".format(traceback.format_exc()))
|
sys.stderr.write("(python) {}\n".format(traceback.format_exc()))
|
||||||
rep = {
|
rep = {
|
||||||
"_type": REP_ERROR,
|
"_type": REP_ERROR,
|
||||||
"error": "{}".format(e)
|
"error": "{}".format(e)
|
||||||
|
@ -292,8 +253,19 @@ class MessengerBridge:
|
||||||
userId = self.getUserIdFromUid(self.client.uid)
|
userId = self.getUserIdFromUid(self.client.uid)
|
||||||
return {"_type": REP_OK, "user": userId}
|
return {"_type": REP_OK, "user": userId}
|
||||||
|
|
||||||
elif ty == INVITE and cmd["room"] == "":
|
elif ty == JOIN:
|
||||||
return {"_type": REP_OK}
|
self.ensure_i_joined(cmd["room"])
|
||||||
|
|
||||||
|
elif ty == LEAVE:
|
||||||
|
thread_id = cmd["room"]
|
||||||
|
self.client.removeUserFromGroup(self.client.uid, thread_id)
|
||||||
|
if thread_id in self.my_joined_rooms:
|
||||||
|
del self.my_joined_rooms[thread_id]
|
||||||
|
|
||||||
|
elif ty == INVITE:
|
||||||
|
if cmd["room"] != "":
|
||||||
|
uid = self.revUserId(cmd["user"])
|
||||||
|
self.client.addUsersToGroup([uid], cmd["room"])
|
||||||
|
|
||||||
elif ty == SEND:
|
elif ty == SEND:
|
||||||
event = cmd["data"]
|
event = cmd["data"]
|
||||||
|
@ -318,7 +290,7 @@ class MessengerBridge:
|
||||||
msg_id = self.client.send(msg, thread_id=event["room"], thread_type=ThreadType.GROUP)
|
msg_id = self.client.send(msg, thread_id=event["room"], thread_type=ThreadType.GROUP)
|
||||||
elif event["recipient"] != "":
|
elif event["recipient"] != "":
|
||||||
uid = self.revUserId(event["recipient"])
|
uid = self.revUserId(event["recipient"])
|
||||||
sys.stderr.write("Sending to {}\n".format(uid))
|
sys.stderr.write("(python) Sending to {}\n".format(uid))
|
||||||
if len(attachments) > 0:
|
if len(attachments) > 0:
|
||||||
msg_id = self.client.sendRemoteFiles(attachments, message=msg, thread_id=uid, thread_type=ThreadType.USER)
|
msg_id = self.client.sendRemoteFiles(attachments, message=msg, thread_id=uid, thread_type=ThreadType.USER)
|
||||||
else:
|
else:
|
||||||
|
@ -344,13 +316,40 @@ class MessengerBridge:
|
||||||
q = queue.Queue(1)
|
q = queue.Queue(1)
|
||||||
self.cache_gets[num] = q
|
self.cache_gets[num] = q
|
||||||
self.write({"_type": CACHE_GET, "_id": num, "key": key})
|
self.write({"_type": CACHE_GET, "_id": num, "key": key})
|
||||||
rep = q.get(block=True, timeout=30)
|
try:
|
||||||
|
rep = q.get(block=True, timeout=30)
|
||||||
|
except queue.Empty:
|
||||||
|
rep = ""
|
||||||
del self.cache_gets[num]
|
del self.cache_gets[num]
|
||||||
return rep
|
return rep
|
||||||
|
|
||||||
def cache_put(self, key, value):
|
def cache_put(self, key, value):
|
||||||
self.write({"_type": CACHE_PUT, "key": key, "value": value})
|
self.write({"_type": CACHE_PUT, "key": key, "value": value})
|
||||||
|
|
||||||
|
# ---- Info sync ----
|
||||||
|
|
||||||
|
def ensure_i_joined(self, thread_id):
|
||||||
|
if thread_id not in self.my_joined_rooms:
|
||||||
|
thread = self.client.fetchThreadInfo(thread_id)[thread_id]
|
||||||
|
self.setup_joined_thread(thread)
|
||||||
|
|
||||||
|
def setup_joined_thread(self, thread):
|
||||||
|
self.my_joined_rooms[thread.uid] = True
|
||||||
|
|
||||||
|
if thread.type == ThreadType.GROUP:
|
||||||
|
members = self.client.fetchAllUsersFromThreads([thread])
|
||||||
|
|
||||||
|
self.write({
|
||||||
|
"_type": JOINED,
|
||||||
|
"room": thread.uid,
|
||||||
|
})
|
||||||
|
|
||||||
|
self.send_room_info(thread, members)
|
||||||
|
self.send_room_members(thread, members)
|
||||||
|
|
||||||
|
self.backlog_room(thread)
|
||||||
|
|
||||||
|
|
||||||
def send_room_info(self, thread, members):
|
def send_room_info(self, thread, members):
|
||||||
room_info = {}
|
room_info = {}
|
||||||
if thread.name is not None:
|
if thread.name is not None:
|
||||||
|
@ -378,12 +377,41 @@ class MessengerBridge:
|
||||||
|
|
||||||
def send_room_members(self, thread, members):
|
def send_room_members(self, thread, members):
|
||||||
for member in members:
|
for member in members:
|
||||||
sys.stderr.write("fb thread member: {}\n".format(member))
|
sys.stderr.write("(python) fb thread member: {}\n".format(member))
|
||||||
self.ensureJoined(self.getUserId(member), thread.uid)
|
self.ensureJoined(self.getUserId(member), thread.uid)
|
||||||
|
|
||||||
|
def backlog_room(self, thread):
|
||||||
|
prev_last_seen = self.cache_get("last_seen_%s"%thread.uid)
|
||||||
|
if prev_last_seen == "":
|
||||||
|
prev_last_seen = None
|
||||||
|
|
||||||
|
messages = []
|
||||||
|
found = False
|
||||||
|
while not found:
|
||||||
|
before = None
|
||||||
|
if len(messages) > 0:
|
||||||
|
before = messages[-1].timestamp
|
||||||
|
page = self.client.fetchThreadMessages(thread.uid, before=before, limit=20)
|
||||||
|
for m in page:
|
||||||
|
if m.uid == prev_last_seen or len(messages) > self.init_backlog_length:
|
||||||
|
found = True
|
||||||
|
break
|
||||||
|
else:
|
||||||
|
messages.append(m)
|
||||||
|
|
||||||
|
for m in reversed(messages):
|
||||||
|
if m.text is None:
|
||||||
|
m.text = ""
|
||||||
|
m.text = "[{}] {}".format(
|
||||||
|
time.strftime("%Y-%m-%d %H:%M %Z", time.localtime(float(m.timestamp)/1000)).strip(),
|
||||||
|
m.text)
|
||||||
|
self.onMessage(thread_id=thread.uid,
|
||||||
|
thread_type=thread.type,
|
||||||
|
message_object=m)
|
||||||
|
|
||||||
def ensureJoined(self, userId, room):
|
def ensureJoined(self, userId, room):
|
||||||
key = "{}--{}".format(userId, room)
|
key = "{}--{}".format(userId, room)
|
||||||
if not key in self.joined_map:
|
if not key in self.others_joined_map:
|
||||||
self.write({
|
self.write({
|
||||||
"_type": EVENT,
|
"_type": EVENT,
|
||||||
"data": {
|
"data": {
|
||||||
|
@ -392,9 +420,13 @@ class MessengerBridge:
|
||||||
"room": room,
|
"room": room,
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
self.joined_map[key] = True
|
self.others_joined_map[key] = True
|
||||||
|
|
||||||
|
# ---- Event handlers ----
|
||||||
|
|
||||||
def onMessage(self, thread_id, thread_type, message_object, **kwargs):
|
def onMessage(self, thread_id, thread_type, message_object, **kwargs):
|
||||||
|
self.ensure_i_joined(thread_id)
|
||||||
|
|
||||||
if message_object.author == self.client.uid:
|
if message_object.author == self.client.uid:
|
||||||
# Ignore our own messages
|
# Ignore our own messages
|
||||||
return
|
return
|
||||||
|
@ -442,29 +474,44 @@ class MessengerBridge:
|
||||||
event["room"] = thread_id
|
event["room"] = thread_id
|
||||||
self.ensureJoined(author, thread_id)
|
self.ensureJoined(author, thread_id)
|
||||||
|
|
||||||
self.write({"_type": EVENT, "data": event})
|
if event["text"] != "" or len(event["attachments"]) > 0:
|
||||||
|
self.write({"_type": EVENT, "data": event})
|
||||||
|
|
||||||
self.cache_put("last_seen_%s"%thread_id, message_object.uid)
|
self.cache_put("last_seen_%s"%thread_id, message_object.uid)
|
||||||
|
|
||||||
def onPeopleAdded(self, added_ids, thread_id, *args, **kwargs):
|
def onPeopleAdded(self, added_ids, thread_id, *args, **kwargs):
|
||||||
for user_id in added_ids:
|
for user_id in added_ids:
|
||||||
self.ensureJoined(self.getUserIdFromUid(user_id), thread_id)
|
if user_id == self.client.uid:
|
||||||
|
self.ensure_i_joined(thread_id)
|
||||||
|
else:
|
||||||
|
self.ensureJoined(self.getUserIdFromUid(user_id), thread_id)
|
||||||
|
|
||||||
def onPersonRemoved(self, removed_id, thread_id, *args, **kwargs):
|
def onPersonRemoved(self, removed_id, thread_id, *args, **kwargs):
|
||||||
userId = self.getUserIdFromUid(removed_id),
|
if removed_id == self.client.uid:
|
||||||
self.write({
|
self.write({
|
||||||
"_type": EVENT,
|
"_type": LEFT,
|
||||||
"data": {
|
|
||||||
"type": EVENT_JOIN,
|
|
||||||
"author": userId,
|
|
||||||
"room": thread_id,
|
"room": thread_id,
|
||||||
}
|
})
|
||||||
})
|
if thread_id in self.my_joined_rooms:
|
||||||
del self.joined_map["{}--{}".format(userId, thread_id)]
|
del self.my_joined_rooms[thread_id]
|
||||||
|
else:
|
||||||
|
userId = self.getUserIdFromUid(removed_id),
|
||||||
|
self.write({
|
||||||
|
"_type": EVENT,
|
||||||
|
"data": {
|
||||||
|
"type": EVENT_JOIN,
|
||||||
|
"author": userId,
|
||||||
|
"room": thread_id,
|
||||||
|
}
|
||||||
|
})
|
||||||
|
map_key = "{}--{}".format(userId, thread_id)
|
||||||
|
if map_key in self.others_joined_map:
|
||||||
|
del self.others_joined_map[map_key]
|
||||||
|
|
||||||
def onTitleChange(self, author_id, new_title, thread_id, thread_type, *args, **kwargs):
|
def onTitleChange(self, author_id, new_title, thread_id, thread_type, *args, **kwargs):
|
||||||
|
self.ensure_i_joined(thread_id)
|
||||||
if thread_type == ThreadType.GROUP:
|
if thread_type == ThreadType.GROUP:
|
||||||
self.bridge.write({
|
self.write({
|
||||||
"_type": ROOM_INFO_UPDATED,
|
"_type": ROOM_INFO_UPDATED,
|
||||||
"room": thread_id,
|
"room": thread_id,
|
||||||
"data": {"name": new_title},
|
"data": {"name": new_title},
|
||||||
|
|
Loading…
Reference in a new issue