diff --git a/account.go b/account.go index 44173c6..b4e7118 100644 --- a/account.go +++ b/account.go @@ -128,6 +128,15 @@ func RemoveAccount(mxUser string, name string) { } } +func CloseAllAcountsForShutdown() { + accountsLock.Lock() + for _, accl := range registeredAccounts { + for _, acct := range accl { + acct.Conn.Close() + } + } +} + // ---- func SaveDbAccounts(mxid string, key *[32]byte) { diff --git a/external/messenger.py b/external/messenger.py index 792a356..6af2ec2 100755 --- a/external/messenger.py +++ b/external/messenger.py @@ -6,6 +6,7 @@ import signal import threading import queue import pickle +import time import hashlib @@ -47,8 +48,6 @@ EVENT_MESSAGE = "message" EVENT_ACTION = "action" -# ---- MESSENGER CLIENT CLASS THAT HANDLES EVENTS ---- - def mediaObjectOfURL(url): return { @@ -57,12 +56,27 @@ def mediaObjectOfURL(url): } -# class MessengerBridgeClient(fbchat.Client): -# def __init__(self, bridge, *args, **kwargs): -# super(MessengerBridgeClient, self).__init__(*args, **kwargs) -# -# # TODO: handle events +# ---- MESSENGER CLIENT CLASS THAT HANDLES EVENTS ---- +class MessengerBridgeClient(fbchat.Client): + def __init__(self, *args, **kwargs): + super(MessengerBridgeClient, self).__init__(*args, **kwargs) + self.bridge = None + + def setBridge(self, bridge): + self.bridge = bridge + + ## Redirect all interesting events to Bridge + def onMessage(self, *args, **kwargs): + self.bridge.onMessage(*args, **kwargs) + def onPeopleAdded(self, *args, **kwargs): + self.bridge.onPeopleAdded(*args, **kwargs) + def onPersonRemoved(self, *args, **kwargs): + self.bridge.onPersonRemoved(*args, **kwargs) + def onTitleChange(self, *args, **kwargs): + self.bridge.onTitleChange(*args, **kwargs) + +# ---- SEPARATE THREADS FOR INITIAL SYNC & CLIENT LISTEN ---- class InitialSyncThread(threading.Thread): def __init__(self, client, bridge, *args, **kwargs): @@ -83,7 +97,7 @@ class InitialSyncThread(threading.Thread): "_type": JOINED, "room": thread.uid, }) - + self.send_room_info(thread, members) self.send_room_members(thread, members) elif thread.type == ThreadType.USER: @@ -147,7 +161,7 @@ class InitialSyncThread(threading.Thread): else: messages = [] found = False - while not Found: + while not found: before = None if len(messages) > 0: before = messages[-1].timestamp @@ -159,8 +173,19 @@ class InitialSyncThread(threading.Thread): else: messages.append(m) for m in reversed(messages): - self.bridge.handleMessage(thread, m) + self.bridge.onMessage(thread_id=thread.uid, + thread_type=thread.type, + message_object=m) +class ClientListenThread(threading.Thread): + def __init__(self, client, *args, **kwargs): + super(ClientListenThread, self).__init__(*args, **kwargs) + + self.client = client + + def run(self): + sys.stderr.write("Start client.listen()\n") + self.client.listen() # ---- MAIN LOOP THAT HANDLES REQUESTS FROM BRIDGE ---- @@ -175,9 +200,17 @@ class MessengerBridge: user_id = user.url.split("/")[-1] self.rev_uid[user_id] = user.uid self.uid_map[user.uid] = user_id - return user_id else: - return user.uid + self.uid_map[user.uid] = user.uid + + return self.uid_map[user.uid] + + def getUserIdFromUid(self, uid): + if uid in self.uid_map: + return self.uid_map[uid] + else: + user = self.client.fetchUserInfo(uid)[uid] + return self.getUserId(user) def revUserId(self, user_id): if user_id in self.rev_uid: @@ -199,7 +232,14 @@ class MessengerBridge: self.num = 0 while self.keep_running: - line = sys.stdin.readline() + try: + line = sys.stdin.readline() + except KeyboardInterrupt: + sys.stderr.write("(messenger) shutting down") + self.close() + time.sleep(5) + sys.exit(0) + sys.stderr.write("(python) reading {}\n".format(line.strip())) cmd = json.loads(line) @@ -239,7 +279,7 @@ class MessengerBridge: if self.client is None: email, password = cmd["data"]["email"], cmd["data"]["password"] - self.client = fbchat.Client(email=email, password=password, max_tries=1) + self.client = MessengerBridgeClient(email=email, password=password, max_tries=1) if self.client.isLoggedIn(): try: @@ -249,13 +289,16 @@ class MessengerBridge: except: pass + self.client.setBridge(self) InitialSyncThread(self.client, self).start() + ClientListenThread(self.client).start() elif ty == CLOSE: - self.keep_running = False + self.close() elif ty == GET_USER: - return {"_type": REP_OK, "user": self.client.uid} + userId = self.getUserIdFromUid(self.client.uid) + return {"_type": REP_OK, "user": userId} elif ty == INVITE and cmd["room"] == "": return {"_type": REP_OK} @@ -284,6 +327,10 @@ class MessengerBridge: else: return {"_type": REP_ERROR, "error": "Not implemented"} + def close(self): + self.keep_running = False + self.client.stopListening() + def cache_get(self, key): self.num += 1 num = self.num @@ -294,19 +341,33 @@ class MessengerBridge: del self.cache_gets[num] return rep - def handleMessage(self, thread, m): - author = m.author - if author in self.uid_map: - author = self.uid_map[m.author] + def cache_put(self, key, value): + self.write({"_type": CACHE_PUT, "key": key, "value": value}) - event = { - "type": EVENT_MESSAGE, - "author": author, - "text": m.text, - } - if thread.type == ThreadType.GROUP: - event["room"] = thread.uid - self.write({"_type": EVENT, "data": event}) + def onMessage(self, thread_id, thread_type, message_object, **kwargs): + if message_object.author == self.client.uid: + # Ignore our own messages + return + + author = self.getUserIdFromUid(message_object.author) + + event = { + "type": EVENT_MESSAGE, + "author": author, + "text": message_object.text, + } + if thread_type == ThreadType.GROUP: + event["room"] = thread_id + self.write({"_type": EVENT, "data": event}) + + self.cache_put("last_seen_%s"%thread_id, message_object.uid) + + def onPeopleAdded(self, *args, **kwargs): + pass + def onPersonRemoved(self, *args, **kwargs): + pass + def onTitleChange(self, *args, **kwargs): + pass if __name__ == "__main__": diff --git a/main.go b/main.go index 514bd4c..d6597a2 100644 --- a/main.go +++ b/main.go @@ -7,6 +7,8 @@ import ( "flag" "io/ioutil" "os" + "os/signal" + "syscall" log "github.com/sirupsen/logrus" "gopkg.in/yaml.v2" @@ -174,6 +176,9 @@ func main() { // Start appservice and web management interface errch := make(chan error) + sigch := make(chan os.Signal) + signal.Notify(sigch, os.Interrupt, syscall.SIGTERM) + err = StartAppService(errch) if err != nil { log.Fatal(err) @@ -181,9 +186,16 @@ func main() { StartWeb(errch) - // Wait for an error somewhere - err = <-errch - if err != nil { - log.Fatal(err) + // Wait for an error somewhere or interrupt signal + select { + case err = <-errch: + if err != nil { + log.Error(err) + } + case sig := <-sigch: + log.Warnf("Got signal %s", sig.String()) } + + log.Warn("Shuttind down") + CloseAllAcountsForShutdown() }