Now receiving messages from messenger as well (rudimentary)

This commit is contained in:
Alex 2020-03-01 14:14:18 +01:00
parent 018f4a751a
commit 6055544313
3 changed files with 114 additions and 32 deletions

View file

@ -128,6 +128,15 @@ func RemoveAccount(mxUser string, name string) {
} }
} }
func CloseAllAcountsForShutdown() {
accountsLock.Lock()
for _, accl := range registeredAccounts {
for _, acct := range accl {
acct.Conn.Close()
}
}
}
// ---- // ----
func SaveDbAccounts(mxid string, key *[32]byte) { func SaveDbAccounts(mxid string, key *[32]byte) {

103
external/messenger.py vendored
View file

@ -6,6 +6,7 @@ import signal
import threading import threading
import queue import queue
import pickle import pickle
import time
import hashlib import hashlib
@ -47,8 +48,6 @@ EVENT_MESSAGE = "message"
EVENT_ACTION = "action" EVENT_ACTION = "action"
# ---- MESSENGER CLIENT CLASS THAT HANDLES EVENTS ----
def mediaObjectOfURL(url): def mediaObjectOfURL(url):
return { return {
@ -57,12 +56,27 @@ def mediaObjectOfURL(url):
} }
# class MessengerBridgeClient(fbchat.Client): # ---- MESSENGER CLIENT CLASS THAT HANDLES EVENTS ----
# def __init__(self, bridge, *args, **kwargs):
# super(MessengerBridgeClient, self).__init__(*args, **kwargs)
#
# # TODO: handle events
class MessengerBridgeClient(fbchat.Client):
def __init__(self, *args, **kwargs):
super(MessengerBridgeClient, self).__init__(*args, **kwargs)
self.bridge = None
def setBridge(self, bridge):
self.bridge = bridge
## Redirect all interesting events to Bridge
def onMessage(self, *args, **kwargs):
self.bridge.onMessage(*args, **kwargs)
def onPeopleAdded(self, *args, **kwargs):
self.bridge.onPeopleAdded(*args, **kwargs)
def onPersonRemoved(self, *args, **kwargs):
self.bridge.onPersonRemoved(*args, **kwargs)
def onTitleChange(self, *args, **kwargs):
self.bridge.onTitleChange(*args, **kwargs)
# ---- SEPARATE THREADS FOR INITIAL SYNC & CLIENT LISTEN ----
class InitialSyncThread(threading.Thread): class InitialSyncThread(threading.Thread):
def __init__(self, client, bridge, *args, **kwargs): def __init__(self, client, bridge, *args, **kwargs):
@ -147,7 +161,7 @@ class InitialSyncThread(threading.Thread):
else: else:
messages = [] messages = []
found = False found = False
while not Found: while not found:
before = None before = None
if len(messages) > 0: if len(messages) > 0:
before = messages[-1].timestamp before = messages[-1].timestamp
@ -159,8 +173,19 @@ class InitialSyncThread(threading.Thread):
else: else:
messages.append(m) messages.append(m)
for m in reversed(messages): for m in reversed(messages):
self.bridge.handleMessage(thread, m) self.bridge.onMessage(thread_id=thread.uid,
thread_type=thread.type,
message_object=m)
class ClientListenThread(threading.Thread):
def __init__(self, client, *args, **kwargs):
super(ClientListenThread, self).__init__(*args, **kwargs)
self.client = client
def run(self):
sys.stderr.write("Start client.listen()\n")
self.client.listen()
# ---- MAIN LOOP THAT HANDLES REQUESTS FROM BRIDGE ---- # ---- MAIN LOOP THAT HANDLES REQUESTS FROM BRIDGE ----
@ -175,9 +200,17 @@ class MessengerBridge:
user_id = user.url.split("/")[-1] user_id = user.url.split("/")[-1]
self.rev_uid[user_id] = user.uid self.rev_uid[user_id] = user.uid
self.uid_map[user.uid] = user_id self.uid_map[user.uid] = user_id
return user_id
else: else:
return user.uid self.uid_map[user.uid] = user.uid
return self.uid_map[user.uid]
def getUserIdFromUid(self, uid):
if uid in self.uid_map:
return self.uid_map[uid]
else:
user = self.client.fetchUserInfo(uid)[uid]
return self.getUserId(user)
def revUserId(self, user_id): def revUserId(self, user_id):
if user_id in self.rev_uid: if user_id in self.rev_uid:
@ -199,7 +232,14 @@ class MessengerBridge:
self.num = 0 self.num = 0
while self.keep_running: while self.keep_running:
try:
line = sys.stdin.readline() line = sys.stdin.readline()
except KeyboardInterrupt:
sys.stderr.write("(messenger) shutting down")
self.close()
time.sleep(5)
sys.exit(0)
sys.stderr.write("(python) reading {}\n".format(line.strip())) sys.stderr.write("(python) reading {}\n".format(line.strip()))
cmd = json.loads(line) cmd = json.loads(line)
@ -239,7 +279,7 @@ class MessengerBridge:
if self.client is None: if self.client is None:
email, password = cmd["data"]["email"], cmd["data"]["password"] email, password = cmd["data"]["email"], cmd["data"]["password"]
self.client = fbchat.Client(email=email, password=password, max_tries=1) self.client = MessengerBridgeClient(email=email, password=password, max_tries=1)
if self.client.isLoggedIn(): if self.client.isLoggedIn():
try: try:
@ -249,13 +289,16 @@ class MessengerBridge:
except: except:
pass pass
self.client.setBridge(self)
InitialSyncThread(self.client, self).start() InitialSyncThread(self.client, self).start()
ClientListenThread(self.client).start()
elif ty == CLOSE: elif ty == CLOSE:
self.keep_running = False self.close()
elif ty == GET_USER: elif ty == GET_USER:
return {"_type": REP_OK, "user": self.client.uid} userId = self.getUserIdFromUid(self.client.uid)
return {"_type": REP_OK, "user": userId}
elif ty == INVITE and cmd["room"] == "": elif ty == INVITE and cmd["room"] == "":
return {"_type": REP_OK} return {"_type": REP_OK}
@ -284,6 +327,10 @@ class MessengerBridge:
else: else:
return {"_type": REP_ERROR, "error": "Not implemented"} return {"_type": REP_ERROR, "error": "Not implemented"}
def close(self):
self.keep_running = False
self.client.stopListening()
def cache_get(self, key): def cache_get(self, key):
self.num += 1 self.num += 1
num = self.num num = self.num
@ -294,20 +341,34 @@ class MessengerBridge:
del self.cache_gets[num] del self.cache_gets[num]
return rep return rep
def handleMessage(self, thread, m): def cache_put(self, key, value):
author = m.author self.write({"_type": CACHE_PUT, "key": key, "value": value})
if author in self.uid_map:
author = self.uid_map[m.author] def onMessage(self, thread_id, thread_type, message_object, **kwargs):
if message_object.author == self.client.uid:
# Ignore our own messages
return
author = self.getUserIdFromUid(message_object.author)
event = { event = {
"type": EVENT_MESSAGE, "type": EVENT_MESSAGE,
"author": author, "author": author,
"text": m.text, "text": message_object.text,
} }
if thread.type == ThreadType.GROUP: if thread_type == ThreadType.GROUP:
event["room"] = thread.uid event["room"] = thread_id
self.write({"_type": EVENT, "data": event}) self.write({"_type": EVENT, "data": event})
self.cache_put("last_seen_%s"%thread_id, message_object.uid)
def onPeopleAdded(self, *args, **kwargs):
pass
def onPersonRemoved(self, *args, **kwargs):
pass
def onTitleChange(self, *args, **kwargs):
pass
if __name__ == "__main__": if __name__ == "__main__":
bridge = MessengerBridge() bridge = MessengerBridge()

18
main.go
View file

@ -7,6 +7,8 @@ import (
"flag" "flag"
"io/ioutil" "io/ioutil"
"os" "os"
"os/signal"
"syscall"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"gopkg.in/yaml.v2" "gopkg.in/yaml.v2"
@ -174,6 +176,9 @@ func main() {
// Start appservice and web management interface // Start appservice and web management interface
errch := make(chan error) errch := make(chan error)
sigch := make(chan os.Signal)
signal.Notify(sigch, os.Interrupt, syscall.SIGTERM)
err = StartAppService(errch) err = StartAppService(errch)
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
@ -181,9 +186,16 @@ func main() {
StartWeb(errch) StartWeb(errch)
// Wait for an error somewhere // Wait for an error somewhere or interrupt signal
err = <-errch select {
case err = <-errch:
if err != nil { if err != nil {
log.Fatal(err) log.Error(err)
} }
case sig := <-sigch:
log.Warnf("Got signal %s", sig.String())
}
log.Warn("Shuttind down")
CloseAllAcountsForShutdown()
} }