Read backlog; handle messages in the correct order
This commit is contained in:
parent
810e75a34d
commit
018f4a751a
2 changed files with 51 additions and 2 deletions
17
connector/external/external.go
vendored
17
connector/external/external.go
vendored
|
@ -86,6 +86,7 @@ type External struct {
|
||||||
generation int
|
generation int
|
||||||
proc *exec.Cmd
|
proc *exec.Cmd
|
||||||
|
|
||||||
|
handlerChan chan *extMessageWithData
|
||||||
counter uint64
|
counter uint64
|
||||||
inflightRequests map[uint64]chan *extMessageWithData
|
inflightRequests map[uint64]chan *extMessageWithData
|
||||||
lock sync.Mutex
|
lock sync.Mutex
|
||||||
|
@ -110,6 +111,9 @@ func (ext *External) Configure(c Configuration) error {
|
||||||
|
|
||||||
ext.generation += 1
|
ext.generation += 1
|
||||||
|
|
||||||
|
ext.handlerChan = make(chan *extMessageWithData)
|
||||||
|
go ext.handlerLoop(ext.generation)
|
||||||
|
|
||||||
err = ext.setupProc()
|
err = ext.setupProc()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
@ -254,7 +258,17 @@ func (ext *External) recvLoop() {
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
} else {
|
} else {
|
||||||
go ext.handleCmd(&msg)
|
ext.handlerChan <- &msg
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ext *External) handlerLoop(generation int) {
|
||||||
|
for ext.handlerChan != nil && ext.generation == generation {
|
||||||
|
select {
|
||||||
|
case msg := <-ext.handlerChan:
|
||||||
|
ext.handleCmd(msg)
|
||||||
|
case <-time.After(10 * time.Second):
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -311,6 +325,7 @@ func (ext *External) Close() {
|
||||||
ext.recv = nil
|
ext.recv = nil
|
||||||
ext.send = nil
|
ext.send = nil
|
||||||
ext.sendJson = nil
|
ext.sendJson = nil
|
||||||
|
ext.handlerChan = nil
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
time.Sleep(10 * time.Second)
|
time.Sleep(10 * time.Second)
|
||||||
|
|
36
external/messenger.py
vendored
36
external/messenger.py
vendored
|
@ -141,7 +141,25 @@ class InitialSyncThread(threading.Thread):
|
||||||
})
|
})
|
||||||
|
|
||||||
def backlog_room(self, thread):
|
def backlog_room(self, thread):
|
||||||
pass # TODO
|
prev_last_seen = self.bridge.cache_get("last_seen_%s"%thread.uid)
|
||||||
|
if prev_last_seen == "":
|
||||||
|
messages = self.client.fetchThreadMessages(thread.uid, limit=100)
|
||||||
|
else:
|
||||||
|
messages = []
|
||||||
|
found = False
|
||||||
|
while not Found:
|
||||||
|
before = None
|
||||||
|
if len(messages) > 0:
|
||||||
|
before = messages[-1].timestamp
|
||||||
|
page = self.client.fetchThreadMessages(thread.uid, before=before, limit=20)
|
||||||
|
for m in page:
|
||||||
|
if m.uid == prev_last_seen:
|
||||||
|
found = True
|
||||||
|
break
|
||||||
|
else:
|
||||||
|
messages.append(m)
|
||||||
|
for m in reversed(messages):
|
||||||
|
self.bridge.handleMessage(thread, m)
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
@ -150,11 +168,13 @@ class InitialSyncThread(threading.Thread):
|
||||||
class MessengerBridge:
|
class MessengerBridge:
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
self.rev_uid = {}
|
self.rev_uid = {}
|
||||||
|
self.uid_map = {}
|
||||||
|
|
||||||
def getUserId(self, user):
|
def getUserId(self, user):
|
||||||
if user.url is not None and not "?" in user.url:
|
if user.url is not None and not "?" in user.url:
|
||||||
user_id = user.url.split("/")[-1]
|
user_id = user.url.split("/")[-1]
|
||||||
self.rev_uid[user_id] = user.uid
|
self.rev_uid[user_id] = user.uid
|
||||||
|
self.uid_map[user.uid] = user_id
|
||||||
return user_id
|
return user_id
|
||||||
else:
|
else:
|
||||||
return user.uid
|
return user.uid
|
||||||
|
@ -274,6 +294,20 @@ class MessengerBridge:
|
||||||
del self.cache_gets[num]
|
del self.cache_gets[num]
|
||||||
return rep
|
return rep
|
||||||
|
|
||||||
|
def handleMessage(self, thread, m):
|
||||||
|
author = m.author
|
||||||
|
if author in self.uid_map:
|
||||||
|
author = self.uid_map[m.author]
|
||||||
|
|
||||||
|
event = {
|
||||||
|
"type": EVENT_MESSAGE,
|
||||||
|
"author": author,
|
||||||
|
"text": m.text,
|
||||||
|
}
|
||||||
|
if thread.type == ThreadType.GROUP:
|
||||||
|
event["room"] = thread.uid
|
||||||
|
self.write({"_type": EVENT, "data": event})
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
bridge = MessengerBridge()
|
bridge = MessengerBridge()
|
||||||
|
|
Loading…
Reference in a new issue