From 018f4a751ac4bff9113874666a92b4c5d8679af3 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Sun, 1 Mar 2020 13:27:29 +0100 Subject: [PATCH] Read backlog; handle messages in the correct order --- connector/external/external.go | 17 +++++++++++++++- external/messenger.py | 36 +++++++++++++++++++++++++++++++++- 2 files changed, 51 insertions(+), 2 deletions(-) diff --git a/connector/external/external.go b/connector/external/external.go index 741802c..9a4137d 100644 --- a/connector/external/external.go +++ b/connector/external/external.go @@ -86,6 +86,7 @@ type External struct { generation int proc *exec.Cmd + handlerChan chan *extMessageWithData counter uint64 inflightRequests map[uint64]chan *extMessageWithData lock sync.Mutex @@ -110,6 +111,9 @@ func (ext *External) Configure(c Configuration) error { ext.generation += 1 + ext.handlerChan = make(chan *extMessageWithData) + go ext.handlerLoop(ext.generation) + err = ext.setupProc() if err != nil { return err @@ -254,7 +258,17 @@ func (ext *External) recvLoop() { } }() } else { - go ext.handleCmd(&msg) + ext.handlerChan <- &msg + } + } +} + +func (ext *External) handlerLoop(generation int) { + for ext.handlerChan != nil && ext.generation == generation { + select { + case msg := <-ext.handlerChan: + ext.handleCmd(msg) + case <-time.After(10 * time.Second): } } } @@ -311,6 +325,7 @@ func (ext *External) Close() { ext.recv = nil ext.send = nil ext.sendJson = nil + ext.handlerChan = nil go func() { time.Sleep(10 * time.Second) diff --git a/external/messenger.py b/external/messenger.py index 6ec6e9c..792a356 100755 --- a/external/messenger.py +++ b/external/messenger.py @@ -141,7 +141,25 @@ class InitialSyncThread(threading.Thread): }) def backlog_room(self, thread): - pass # TODO + prev_last_seen = self.bridge.cache_get("last_seen_%s"%thread.uid) + if prev_last_seen == "": + messages = self.client.fetchThreadMessages(thread.uid, limit=100) + else: + messages = [] + found = False + while not Found: + before = None + if len(messages) > 0: + before = messages[-1].timestamp + page = self.client.fetchThreadMessages(thread.uid, before=before, limit=20) + for m in page: + if m.uid == prev_last_seen: + found = True + break + else: + messages.append(m) + for m in reversed(messages): + self.bridge.handleMessage(thread, m) @@ -150,11 +168,13 @@ class InitialSyncThread(threading.Thread): class MessengerBridge: def __init__(self): self.rev_uid = {} + self.uid_map = {} def getUserId(self, user): if user.url is not None and not "?" in user.url: user_id = user.url.split("/")[-1] self.rev_uid[user_id] = user.uid + self.uid_map[user.uid] = user_id return user_id else: return user.uid @@ -274,6 +294,20 @@ class MessengerBridge: del self.cache_gets[num] return rep + def handleMessage(self, thread, m): + author = m.author + if author in self.uid_map: + author = self.uid_map[m.author] + + event = { + "type": EVENT_MESSAGE, + "author": author, + "text": m.text, + } + if thread.type == ThreadType.GROUP: + event["room"] = thread.uid + self.write({"_type": EVENT, "data": event}) + if __name__ == "__main__": bridge = MessengerBridge()