From d5d74b0b73590fcee103f3beea7ee5ff7ca83653 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Sun, 1 Mar 2020 20:38:00 +0100 Subject: [PATCH] Messenger: hopefully handle joins correctly --- connector/external/external.go | 2 +- external/messenger.py | 179 +++++++++++++++++++++------------ 2 files changed, 114 insertions(+), 67 deletions(-) diff --git a/connector/external/external.go b/connector/external/external.go index 9a4137d..6d25230 100644 --- a/connector/external/external.go +++ b/connector/external/external.go @@ -111,7 +111,7 @@ func (ext *External) Configure(c Configuration) error { ext.generation += 1 - ext.handlerChan = make(chan *extMessageWithData) + ext.handlerChan = make(chan *extMessageWithData, 1000) go ext.handlerLoop(ext.generation) err = ext.setupProc() diff --git a/external/messenger.py b/external/messenger.py index cb48410..9728c9d 100755 --- a/external/messenger.py +++ b/external/messenger.py @@ -95,52 +95,12 @@ class InitialSyncThread(threading.Thread): self.threads = threads def run(self): - sys.stderr.write("fb thread list: {}\n".format(self.threads)) + sys.stderr.write("(python) fb thread list: {}\n".format(self.threads)) for thread in self.threads: - sys.stderr.write("fb thread: {}\n".format(thread)) - if thread.type == ThreadType.GROUP: - members = self.client.fetchAllUsersFromThreads([thread]) + sys.stderr.write("(python) fb thread: {}\n".format(thread)) + self.bridge.setup_joined_thread(thread) - self.bridge.write({ - "_type": JOINED, - "room": thread.uid, - }) - - self.bridge.send_room_info(thread, members) - self.bridge.send_room_members(thread, members) - - self.backlog_room(thread) - - - def backlog_room(self, thread): - prev_last_seen = self.bridge.cache_get("last_seen_%s"%thread.uid) - if prev_last_seen == "": - prev_last_seen = None - - messages = [] - found = False - while not found: - before = None - if len(messages) > 0: - before = messages[-1].timestamp - page = self.client.fetchThreadMessages(thread.uid, before=before, limit=20) - for m in page: - if m.uid == prev_last_seen or len(messages) > self.bridge.init_backlog_length: - found = True - break - else: - messages.append(m) - - for m in reversed(messages): - if m.text is None: - m.text = "" - m.text = "[{}] {}".format( - time.strftime("%Y-%m-%d %H:%M %Z", time.localtime(float(m.timestamp)/1000)).strip(), - m.text) - self.bridge.onMessage(thread_id=thread.uid, - thread_type=thread.type, - message_object=m) class ClientListenThread(threading.Thread): def __init__(self, client, *args, **kwargs): @@ -149,7 +109,7 @@ class ClientListenThread(threading.Thread): self.client = client def run(self): - sys.stderr.write("Start client.listen()\n") + sys.stderr.write("(python messenger) Start client.listen()\n") self.client.listen() @@ -159,7 +119,8 @@ class MessengerBridge: def __init__(self): self.rev_uid = {} self.uid_map = {} - self.joined_map = {} + self.others_joined_map = {} + self.my_joined_rooms = {} self.init_backlog_length = 100 def getUserId(self, user): @@ -215,7 +176,7 @@ class MessengerBridge: try: line = sys.stdin.readline() except KeyboardInterrupt: - sys.stderr.write("(messenger) shutting down") + sys.stderr.write("(python messenger) shutting down") self.close() time.sleep(5) sys.exit(0) @@ -230,7 +191,7 @@ class MessengerBridge: if "_type" not in rep: rep["_type"] = REP_OK except Exception as e: - sys.stderr.write("{}\n".format(traceback.format_exc())) + sys.stderr.write("(python) {}\n".format(traceback.format_exc())) rep = { "_type": REP_ERROR, "error": "{}".format(e) @@ -292,8 +253,19 @@ class MessengerBridge: userId = self.getUserIdFromUid(self.client.uid) return {"_type": REP_OK, "user": userId} - elif ty == INVITE and cmd["room"] == "": - return {"_type": REP_OK} + elif ty == JOIN: + self.ensure_i_joined(cmd["room"]) + + elif ty == LEAVE: + thread_id = cmd["room"] + self.client.removeUserFromGroup(self.client.uid, thread_id) + if thread_id in self.my_joined_rooms: + del self.my_joined_rooms[thread_id] + + elif ty == INVITE: + if cmd["room"] != "": + uid = self.revUserId(cmd["user"]) + self.client.addUsersToGroup([uid], cmd["room"]) elif ty == SEND: event = cmd["data"] @@ -318,7 +290,7 @@ class MessengerBridge: msg_id = self.client.send(msg, thread_id=event["room"], thread_type=ThreadType.GROUP) elif event["recipient"] != "": uid = self.revUserId(event["recipient"]) - sys.stderr.write("Sending to {}\n".format(uid)) + sys.stderr.write("(python) Sending to {}\n".format(uid)) if len(attachments) > 0: msg_id = self.client.sendRemoteFiles(attachments, message=msg, thread_id=uid, thread_type=ThreadType.USER) else: @@ -344,13 +316,40 @@ class MessengerBridge: q = queue.Queue(1) self.cache_gets[num] = q self.write({"_type": CACHE_GET, "_id": num, "key": key}) - rep = q.get(block=True, timeout=30) + try: + rep = q.get(block=True, timeout=30) + except queue.Empty: + rep = "" del self.cache_gets[num] return rep def cache_put(self, key, value): self.write({"_type": CACHE_PUT, "key": key, "value": value}) + # ---- Info sync ---- + + def ensure_i_joined(self, thread_id): + if thread_id not in self.my_joined_rooms: + thread = self.client.fetchThreadInfo(thread_id)[thread_id] + self.setup_joined_thread(thread) + + def setup_joined_thread(self, thread): + self.my_joined_rooms[thread.uid] = True + + if thread.type == ThreadType.GROUP: + members = self.client.fetchAllUsersFromThreads([thread]) + + self.write({ + "_type": JOINED, + "room": thread.uid, + }) + + self.send_room_info(thread, members) + self.send_room_members(thread, members) + + self.backlog_room(thread) + + def send_room_info(self, thread, members): room_info = {} if thread.name is not None: @@ -378,12 +377,41 @@ class MessengerBridge: def send_room_members(self, thread, members): for member in members: - sys.stderr.write("fb thread member: {}\n".format(member)) + sys.stderr.write("(python) fb thread member: {}\n".format(member)) self.ensureJoined(self.getUserId(member), thread.uid) + def backlog_room(self, thread): + prev_last_seen = self.cache_get("last_seen_%s"%thread.uid) + if prev_last_seen == "": + prev_last_seen = None + + messages = [] + found = False + while not found: + before = None + if len(messages) > 0: + before = messages[-1].timestamp + page = self.client.fetchThreadMessages(thread.uid, before=before, limit=20) + for m in page: + if m.uid == prev_last_seen or len(messages) > self.init_backlog_length: + found = True + break + else: + messages.append(m) + + for m in reversed(messages): + if m.text is None: + m.text = "" + m.text = "[{}] {}".format( + time.strftime("%Y-%m-%d %H:%M %Z", time.localtime(float(m.timestamp)/1000)).strip(), + m.text) + self.onMessage(thread_id=thread.uid, + thread_type=thread.type, + message_object=m) + def ensureJoined(self, userId, room): key = "{}--{}".format(userId, room) - if not key in self.joined_map: + if not key in self.others_joined_map: self.write({ "_type": EVENT, "data": { @@ -392,9 +420,13 @@ class MessengerBridge: "room": room, } }) - self.joined_map[key] = True + self.others_joined_map[key] = True + + # ---- Event handlers ---- def onMessage(self, thread_id, thread_type, message_object, **kwargs): + self.ensure_i_joined(thread_id) + if message_object.author == self.client.uid: # Ignore our own messages return @@ -442,29 +474,44 @@ class MessengerBridge: event["room"] = thread_id self.ensureJoined(author, thread_id) - self.write({"_type": EVENT, "data": event}) + if event["text"] != "" or len(event["attachments"]) > 0: + self.write({"_type": EVENT, "data": event}) self.cache_put("last_seen_%s"%thread_id, message_object.uid) def onPeopleAdded(self, added_ids, thread_id, *args, **kwargs): for user_id in added_ids: - self.ensureJoined(self.getUserIdFromUid(user_id), thread_id) + if user_id == self.client.uid: + self.ensure_i_joined(thread_id) + else: + self.ensureJoined(self.getUserIdFromUid(user_id), thread_id) def onPersonRemoved(self, removed_id, thread_id, *args, **kwargs): - userId = self.getUserIdFromUid(removed_id), - self.write({ - "_type": EVENT, - "data": { - "type": EVENT_JOIN, - "author": userId, + if removed_id == self.client.uid: + self.write({ + "_type": LEFT, "room": thread_id, - } - }) - del self.joined_map["{}--{}".format(userId, thread_id)] + }) + if thread_id in self.my_joined_rooms: + del self.my_joined_rooms[thread_id] + else: + userId = self.getUserIdFromUid(removed_id), + self.write({ + "_type": EVENT, + "data": { + "type": EVENT_JOIN, + "author": userId, + "room": thread_id, + } + }) + map_key = "{}--{}".format(userId, thread_id) + if map_key in self.others_joined_map: + del self.others_joined_map[map_key] def onTitleChange(self, author_id, new_title, thread_id, thread_type, *args, **kwargs): + self.ensure_i_joined(thread_id) if thread_type == ThreadType.GROUP: - self.bridge.write({ + self.write({ "_type": ROOM_INFO_UPDATED, "room": thread_id, "data": {"name": new_title},