#!/usr/bin/env python3
"""Facebook Messenger connector for the ezbr bridge.

The process exchanges newline-delimited JSON objects with the bridge:
requests (and replies to our own requests) are read from stdin, replies and
events are written to stdout.  Diagnostics are written to stderr only, since
stdout is the protocol channel.
"""

import sys
import json
import signal
import threading
import queue
import pickle
import time
import traceback
from urllib.parse import unquote as UrlUnquote
import base64
import getpass
import zlib

import fbchat
from fbchat.models import *

# ---- MESSAGE TYPES ----

# ezbr -> external
CONFIGURE = "configure"
GET_USER = "get_user"
SET_USER_INFO = "set_user_info"
SET_ROOM_INFO = "set_room_info"
JOIN = "join"
INVITE = "invite"
LEAVE = "leave"
SEARCH = "search"
SEND = "send"
USER_COMMAND = "user_command"
CLOSE = "close"

# external -> ezbr
SAVE_CONFIG = "save_config"
SYSTEM_MESSAGE = "system_message"
JOINED = "joined"
LEFT = "left"
USER_INFO_UPDATED = "user_info_updated"
ROOM_INFO_UPDATED = "room_info_updated"
EVENT = "event"
CACHE_PUT = "cache_put"
CACHE_GET = "cache_get"

# reply messages
# ezbr -> external: all must wait for a reply!
# external -> ezbr: only CACHE_GET produces a reply
REP_OK = "rep_ok"
REP_SEARCH_RESULTS = "rep_search_results"
REP_ERROR = "rep_error"

# Event types
EVENT_JOIN = "join"
EVENT_LEAVE = "leave"
EVENT_MESSAGE = "message"
EVENT_ACTION = "action"


def mediaObjectOfURL(url):
    """Build a media object (``filename`` + ``url``) for *url*.

    The filename is the last path component of the URL with any query
    string removed.
    """
    return {
        "filename": url.split("?")[0].split("/")[-1],
        "url": url,
    }


def stripFbLinkPrefix(url):
    """Unwrap a ``l.facebook.com`` redirect link.

    If *url* starts with the Facebook redirect prefix, return the
    URL-decoded value of its ``u`` parameter (everything up to the first
    ``&``); otherwise return *url* unchanged.
    """
    PREFIX = "https://l.facebook.com/l.php?u="
    if url.startswith(PREFIX):
        return UrlUnquote(url[len(PREFIX):].split('&')[0])
    return url


# ---- MESSENGER CLIENT CLASS THAT HANDLES EVENTS ----

class MessengerBridgeClient(fbchat.Client):
    """fbchat client that forwards the interesting callbacks to a bridge.

    ``bridge`` may be ``None`` (it is cleared while the client is pickled and
    is not available on the command-line path); in that case the callbacks
    fall back to the fbchat default implementation instead of failing on a
    ``None`` attribute access.
    """

    def __init__(self, bridge, *args, **kwargs):
        # The bridge must be set before the base constructor runs: the base
        # constructor logs in, and on2FACode may be invoked during login.
        self.bridge = bridge
        super(MessengerBridgeClient, self).__init__(*args, **kwargs)

    def setBridge(self, bridge):
        """Attach (or detach, with ``None``) the bridge receiving events."""
        self.bridge = bridge

    ## Redirect all interesting events to Bridge

    def onMessage(self, *args, **kwargs):
        if self.bridge is None:
            return super(MessengerBridgeClient, self).onMessage(*args, **kwargs)
        self.bridge.onMessage(*args, **kwargs)

    def onPeopleAdded(self, *args, **kwargs):
        if self.bridge is None:
            return super(MessengerBridgeClient, self).onPeopleAdded(*args, **kwargs)
        self.bridge.onPeopleAdded(*args, **kwargs)

    def onPersonRemoved(self, *args, **kwargs):
        if self.bridge is None:
            return super(MessengerBridgeClient, self).onPersonRemoved(*args, **kwargs)
        self.bridge.onPersonRemoved(*args, **kwargs)

    def onTitleChange(self, *args, **kwargs):
        if self.bridge is None:
            return super(MessengerBridgeClient, self).onTitleChange(*args, **kwargs)
        self.bridge.onTitleChange(*args, **kwargs)

    def on2FACode(self, *args, **kwargs):
        if self.bridge is None:
            return super(MessengerBridgeClient, self).on2FACode(*args, **kwargs)
        return self.bridge.on2FACode(*args, **kwargs)


# ---- SEPARATE THREADS FOR INITIAL SYNC & CLIENT LISTEN ----

class LoginThread(threading.Thread):
    """Runs ``bridge.processLogin()`` off the main (stdin) loop."""

    def __init__(self, bridge, *args, **kwargs):
        super(LoginThread, self).__init__(*args, **kwargs)
        self.bridge = bridge

    def run(self):
        self.bridge.processLogin()


class SyncerThread(threading.Thread):
    """Consumes threads from a queue and syncs each one via the bridge."""

    def __init__(self, bridge, thread_queue, *args, **kwargs):
        super(SyncerThread, self).__init__(*args, **kwargs)
        self.bridge = bridge
        self.thread_queue = thread_queue
        # This loop never terminates on its own; as a daemon it does not keep
        # the process alive once the main loop has exited.
        self.daemon = True

    def run(self):
        while True:
            thread = self.thread_queue.get(block=True)
            sys.stderr.write("(python) fb thread: {}\n".format(thread))
            try:
                self.bridge.setup_joined_thread(thread)
            except Exception:
                # A failure on one thread must not kill the worker: the queue
                # is bounded and producers would eventually block on put().
                sys.stderr.write("(python) {}\n".format(traceback.format_exc()))


class ClientListenThread(threading.Thread):
    """Runs the blocking ``client.listen()`` loop."""

    def __init__(self, client, *args, **kwargs):
        super(ClientListenThread, self).__init__(*args, **kwargs)
        self.client = client

    def run(self):
        sys.stderr.write("(python messenger) Start client.listen()\n")
        self.client.listen()


# ---- MAIN LOOP THAT HANDLES REQUESTS FROM BRIDGE ----

class MessengerBridge:
    """Translates between the ezbr JSON protocol and an fbchat client."""

    def __init__(self):
        self.init_backlog_length = 100

        self.config = None
        # None when no login is running; otherwise a Queue(1) through which a
        # 2FA code typed by the user is handed to the login thread.
        self.login_in_progress = None

        self.client = None
        self.keep_running = True
        # Pending CACHE_GET requests: request number -> Queue(1) for the reply
        self.cache_gets = {}
        self.num = 0
        self.my_user_id = ""

        # write() is called from several threads (main loop, login, syncer,
        # listener); the lock keeps each JSON line intact on stdout.
        self.write_lock = threading.Lock()

        # We cache maps between two kinds of identifiers:
        # - facebook uids of users
        # - identifiers for the bridge, which are the username when defined
        #   (otherwise equal to above)
        # Generally speaking, the first is referred to as uid whereas the
        # second is just id
        # THESE MAPS SHOULD NOT BE USED DIRECTLY, instead functions
        # getUserId, getUserIdFromUid and revUserId should be used
        self.uid_map = {}  # map from fb user uid to bridge id
        self.rev_uid = {}  # map from bridge id to fb user uid

        # caches the room we (the user of the bridge) have joined
        # (map keys = room uid)
        self.my_joined_rooms = {}

        # caches for the people that are in rooms so that we don't send
        # JOINED every time (map keys = "<user id>--<room uid>")
        self.others_joined_map = {}

        # queue for thread syncing
        self.sync_thread_queue = queue.Queue(100)

    def getUserId(self, user):
        """Return the bridge id for an fbchat user object.

        The id is the last component of the profile URL when the URL has no
        query string, and the Facebook uid otherwise.  The first time a uid
        is seen, both maps are updated and a USER_INFO_UPDATED message is
        emitted.
        """
        retval = None
        if user.url is not None and "?" not in user.url:
            retval = user.url.split("/")[-1]
        if not retval:
            # No usable URL (or a URL ending in "/"): fall back to the uid.
            retval = user.uid

        if user.uid not in self.uid_map:
            self.uid_map[user.uid] = retval
            self.rev_uid[retval] = user.uid

            user_info = {
                "display_name": user.name,
            }
            if user.photo is not None:
                user_info["avatar"] = mediaObjectOfURL(user.photo)
            self.write({
                "_type": USER_INFO_UPDATED,
                "user": retval,
                "data": user_info,
            })

        return retval

    def getUserIdFromUid(self, uid):
        """Return the bridge id for a Facebook uid, fetching it if unknown."""
        if uid in self.uid_map:
            return self.uid_map[uid]
        user = self.client.fetchUserInfo(uid)[uid]
        return self.getUserId(user)

    def revUserId(self, user_id):
        """Return the Facebook uid for a bridge id.

        Unknown ids are looked up with a user search.

        Raises:
            ValueError: if the search does not yield a user with that id.
        """
        if user_id not in self.rev_uid:
            for user in self.client.searchForUsers(user_id):
                self.getUserId(user)

        if user_id not in self.rev_uid:
            raise ValueError("User not found: {}".format(user_id))

        return self.rev_uid[user_id]

    def getUserShortName(self, user):
        """Return the user's first name, or full name if it is not set."""
        if user.first_name is not None:
            return user.first_name
        return user.name

    def run(self):
        """Main loop: read one JSON command per line from stdin and reply.

        Every command gets a reply carrying the command's ``_id``; exceptions
        raised while handling a command become REP_ERROR replies.  The loop
        ends on CLOSE, on Ctrl-C, or when stdin reaches end of file.
        """
        self.keep_running = True

        while self.keep_running:
            try:
                line = sys.stdin.readline()
            except KeyboardInterrupt:
                sys.stderr.write("(python messenger) shutting down\n")
                self.close()
                break

            if line == "":
                # readline() returns "" only at end of file: the bridge is
                # gone, so there is nothing left to serve.
                sys.stderr.write("(python messenger) stdin closed, shutting down\n")
                self.close()
                break
            if line.strip() == "":
                continue

            sys.stderr.write("(python) reading {}\n".format(line.strip()))
            try:
                cmd = json.loads(line)
            except ValueError:
                # No _id can be recovered, so no reply can be addressed.
                sys.stderr.write("(python) ignoring invalid JSON input\n")
                continue

            try:
                rep = self.handle_cmd(cmd)
                if rep is None:
                    rep = {}
                if "_type" not in rep:
                    rep["_type"] = REP_OK
            except Exception as e:
                sys.stderr.write("(python) {}\n".format(traceback.format_exc()))
                rep = {
                    "_type": REP_ERROR,
                    "error": "{}".format(e)
                }

            rep["_id"] = cmd.get("_id") if isinstance(cmd, dict) else None
            self.write(rep)

    def write(self, msg):
        """Serialize *msg* as one JSON line on stdout (and log it)."""
        msgstr = json.dumps(msg)
        with self.write_lock:
            sys.stderr.write("(python) writing {}\n".format(msgstr))
            sys.stdout.write(msgstr + "\n")
            sys.stdout.flush()

    def system_message(self, msg):
        """Send a SYSTEM_MESSAGE with text *msg* to the bridge."""
        self.write({
            "_type": SYSTEM_MESSAGE,
            "value": msg,
        })

    def handle_cmd(self, cmd):
        """Dispatch one decoded command.

        Returns a reply dict, or ``None`` for a plain REP_OK.  Exceptions
        propagate to ``run``, which turns them into REP_ERROR replies.
        """
        ty = cmd["_type"]
        if ty == CONFIGURE:
            if self.login_in_progress is not None:
                return {"_type": REP_ERROR, "error": "Already logging in (CONFIGURE sent twice)"}
            self.config = cmd["data"]
            self.login_in_progress = queue.Queue(1)
            LoginThread(self).start()

        elif ty == CLOSE:
            self.close()

        elif ty == GET_USER:
            return {"_type": REP_OK, "user": self.my_user_id}

        elif ty == JOIN:
            # Before login completes there is nothing to join with.
            if self.client is not None:
                self.ensure_i_joined(cmd["room"])

        elif ty == LEAVE:
            thread_id = cmd["room"]
            self.client.removeUserFromGroup(self.client.uid, thread_id)
            self.my_joined_rooms.pop(thread_id, None)

        elif ty == INVITE:
            if cmd["room"] != "":
                uid = self.revUserId(cmd["user"])
                self.client.addUsersToGroup([uid], cmd["room"])

        elif ty == SEARCH:
            users = self.client.searchForUsers(cmd["data"])
            rep = [
                {
                    "id": self.getUserId(user),
                    "display_name": user.name,
                }
                for user in users
            ]
            return {"_type": REP_SEARCH_RESULTS, "data": rep}

        elif ty == SEND:
            return self._handle_send(cmd["data"])

        elif ty == REP_OK and cmd["_id"] in self.cache_gets:
            # Reply to one of our CACHE_GET requests: wake up the waiter.
            self.cache_gets[cmd["_id"]].put(cmd["value"])

        elif ty == USER_COMMAND:
            self.handleUserCommand(cmd["value"])

        else:
            return {"_type": REP_ERROR, "error": "Not implemented"}

    def _handle_send(self, event):
        """Send a message/action event to a room or to a single recipient.

        Returns the reply dict (with the sent message id), a REP_ERROR reply
        when neither room nor recipient is given, or ``None`` for event types
        that are not sent.
        """
        if event["type"] not in (EVENT_MESSAGE, EVENT_ACTION):
            return None

        attachments = []
        if isinstance(event.get("attachments"), list):
            for at in event["attachments"]:
                if "url" in at:
                    attachments.append(at["url"])
                else:
                    # TODO
                    # Must go to stderr: stdout carries the JSON protocol.
                    sys.stderr.write("Unhandled: attachment without URL\n")

        msg = Message(event["text"])
        if event["type"] == EVENT_ACTION:
            msg.text = "* " + event["text"]

        if event.get("room", "") != "":
            thread_id = event["room"]
            thread_type = ThreadType.GROUP
        elif event.get("recipient", "") != "":
            thread_id = self.revUserId(event["recipient"])
            sys.stderr.write("(python) Sending to {}\n".format(thread_id))
            thread_type = ThreadType.USER
        else:
            return {"_type": REP_ERROR, "error": "Invalid message"}

        if len(attachments) > 0:
            msg_id = self.client.sendRemoteFiles(
                attachments, message=msg,
                thread_id=thread_id, thread_type=thread_type)
        else:
            msg_id = self.client.send(
                msg, thread_id=thread_id, thread_type=thread_type)

        return {"_type": REP_OK, "event_id": msg_id}

    def close(self):
        """Stop the main loop and, if logged in, the listener."""
        self.keep_running = False
        if self.client is not None:
            self.client.stopListening()

    def cache_get(self, key):
        """Ask the bridge for cached value *key* and wait for the reply.

        The reply is delivered by the main loop through a per-request queue.
        Returns ``""`` if no reply arrives within 30 seconds.
        """
        self.num += 1
        num = self.num
        q = queue.Queue(1)
        self.cache_gets[num] = q
        self.write({"_type": CACHE_GET, "_id": num, "key": key})
        try:
            rep = q.get(block=True, timeout=30)
        except queue.Empty:
            rep = ""
        finally:
            self.cache_gets.pop(num, None)
        return rep

    def cache_put(self, key, value):
        """Ask the bridge to store *value* under *key* (no reply expected)."""
        self.write({"_type": CACHE_PUT, "key": key, "value": value})

    # ---- Process login (called from separate thread) ----

    def processLogin(self):
        """Log in, sync recent threads and start listening.

        Runs in a LoginThread.  Any failure is reported as a system message,
        and ``login_in_progress`` is always cleared so that a later CONFIGURE
        can retry.
        """
        try:
            self._login_and_sync()
        except Exception as e:
            sys.stderr.write("(python) {}\n".format(traceback.format_exc()))
            self.system_message("Login failed: {}".format(e))
        finally:
            self.login_in_progress = None

    def _login_and_sync(self):
        """Create or restore the client, then start the sync/listen threads."""
        self.init_backlog_length = int(
            self.config.get("initial_backlog", self.init_backlog_length))

        pickled = self.config.get("client_pickle")
        has_pickle = bool(pickled)
        if has_pickle:
            # NOTE(security): pickle.loads executes arbitrary code from its
            # input. This is only acceptable because the pickle comes from
            # our own saved configuration; never feed it untrusted data.
            data = base64.b64decode(pickled)
            data = zlib.decompress(data)
            self.client = pickle.loads(data)
        else:
            email, password = self.config["email"], self.config["password"]
            self.client = MessengerBridgeClient(
                bridge=self, email=email, password=password, max_tries=1)

        if not self.client.isLoggedIn():
            self.system_message("Unable to login (invalid pickle? dunno)")
            return

        self.system_message("Login complete, will now sync threads.")

        if not has_pickle:
            # Detach the bridge while pickling so that only the client itself
            # is serialized into the saved configuration.
            self.client.setBridge(None)
            data = pickle.dumps(self.client)
            data = zlib.compress(data)
            self.config["client_pickle"] = base64.b64encode(data).decode('ascii')
            self.write({"_type": SAVE_CONFIG, "data": self.config})

        self.client.setBridge(self)

        self.my_user_id = self.getUserIdFromUid(self.client.uid)

        threads = self.client.fetchThreadList(limit=10)
        # ensure we have a correct mapping for bridged user IDs to fb uids
        # (this should be fast)
        for thread in threads:
            if thread.type == ThreadType.USER:
                self.getUserId(thread)

        SyncerThread(self, self.sync_thread_queue).start()
        for thread in reversed(threads):
            self.sync_thread_queue.put(thread)

        ClientListenThread(self.client).start()

    # ---- Info sync ----

    def ensure_i_joined(self, thread_id):
        """Queue *thread_id* for syncing the first time it is seen."""
        if thread_id in self.my_joined_rooms:
            return
        # Mark first so that events arriving meanwhile do not queue it twice.
        self.my_joined_rooms[thread_id] = True
        try:
            thread = self.client.fetchThreadInfo(thread_id)[thread_id]
        except Exception:
            # Undo the mark, otherwise the thread would never be synced.
            self.my_joined_rooms.pop(thread_id, None)
            raise
        self.sync_thread_queue.put(thread)

    def setup_joined_thread(self, thread):
        """Sync one thread: room info and members for groups, then backlog."""
        sys.stderr.write("(python) setup_joined_thread {}\n".format(thread))
        if thread.type == ThreadType.GROUP:
            members = self.client.fetchAllUsersFromThreads([thread])

            self.write({
                "_type": JOINED,
                "room": thread.uid,
            })

            self.send_room_info(thread, members)
            self.send_room_members(thread, members)

        self.backlog_room(thread)

    def send_room_info(self, thread, members):
        """Emit ROOM_INFO_UPDATED (name and picture) for a group thread.

        Unnamed threads are named after up to three other members.  Threads
        without a photo use the first other member's photo, if any.
        Note: *members* is sorted in place by uid.
        """
        members.sort(key=lambda m: m.uid)

        room_info = {}
        if thread.name is not None:
            room_info["name"] = thread.name
        else:
            who = [m for m in members if m.uid != self.client.uid]
            names = [self.getUserShortName(m) for m in who[:3]]
            if len(who) > 3:
                names.append("...")
            room_info["name"] = ", ".join(names)

        if thread.photo is not None:
            room_info["picture"] = mediaObjectOfURL(thread.photo)
        else:
            for m in members:
                if m.uid != self.client.uid and m.photo is not None:
                    room_info["picture"] = mediaObjectOfURL(m.photo)
                    break

        self.write({
            "_type": ROOM_INFO_UPDATED,
            "room": thread.uid,
            "data": room_info,
        })

    def send_room_members(self, thread, members):
        """Emit a join event for every member not yet known to be in the room."""
        for member in members:
            sys.stderr.write("(python) fb thread member: {}\n".format(member))
            self.ensureJoined(self.getUserId(member), thread.uid)

    def backlog_room(self, thread):
        """Replay messages of *thread* that arrived since the last seen one.

        Pages through the thread history until the cached last-seen message
        is reached, ``init_backlog_length`` messages were collected, or a
        page brings nothing new.  Collected messages are then delivered in
        reverse collection order, each prefixed with its local timestamp.
        """
        prev_last_seen = self.cache_get("last_seen_%s" % thread.uid)
        if prev_last_seen == "":
            prev_last_seen = None

        messages = []
        seen_uids = set()
        found = False
        while not found:
            before = messages[-1].timestamp if messages else None
            page = self.client.fetchThreadMessages(thread.uid, before=before, limit=20)

            got_new = False
            for m in page:
                if m.uid == prev_last_seen or len(messages) >= self.init_backlog_length:
                    found = True
                    break
                # Consecutive pages may overlap on the boundary message
                # (presumably `before` is inclusive -- TODO confirm).
                if m.uid is not None and m.uid in seen_uids:
                    continue
                seen_uids.add(m.uid)
                messages.append(m)
                got_new = True

            if not got_new:
                # History exhausted: without this, an empty page would make
                # the loop fetch the same page forever.
                break

        for m in reversed(messages):
            if m.text is None:
                m.text = ""
            m.text = "[{}] {}".format(
                time.strftime("%Y-%m-%d %H:%M %Z", time.localtime(float(m.timestamp)/1000)).strip(),
                m.text)
            self.onMessage(thread_id=thread.uid,
                           thread_type=thread.type,
                           message_object=m)

    def ensureJoined(self, userId, room):
        """Emit a join event for (*userId*, *room*) once per pair."""
        key = "{}--{}".format(userId, room)
        if key not in self.others_joined_map:
            self.write({
                "_type": EVENT,
                "data": {
                    "type": EVENT_JOIN,
                    "author": userId,
                    "room": room,
                }
            })
            self.others_joined_map[key] = True

    # ---- Event handlers ----

    def onMessage(self, thread_id, thread_type, message_object, **kwargs):
        """Forward an incoming message (text, attachments, sticker) to the bridge.

        Messages authored by the logged-in user are ignored.  The message uid
        is stored as the thread's last-seen marker afterwards.
        """
        self.ensure_i_joined(thread_id)

        if message_object.author == self.client.uid:
            # Ignore our own messages
            return

        sys.stderr.write("(python messenger) Got message: {}\n".format(message_object))

        author = self.getUserIdFromUid(message_object.author)

        event = {
            "id": message_object.uid,
            "type": EVENT_MESSAGE,
            "author": author,
            "text": message_object.text,
            "attachments": []
        }
        if event["text"] is None:
            event["text"] = ""

        for at in message_object.attachments:
            if isinstance(at, ImageAttachment):
                try:
                    full_url = self.client.fetchImageUrl(at.uid)
                except Exception:
                    # One retry after a short pause; a second failure
                    # propagates to the caller.
                    time.sleep(1)
                    full_url = self.client.fetchImageUrl(at.uid)
                event["attachments"].append({
                    "filename": full_url.split("?")[0].split("/")[-1],
                    "url": full_url,
                    "image_size": {
                        "width": at.width,
                        "height": at.height,
                    },
                })
            elif isinstance(at, FileAttachment):
                url = stripFbLinkPrefix(at.url)
                event["attachments"].append({
                    "filename": at.name,
                    "url": url,
                })
            elif isinstance(at, AudioAttachment):
                url = stripFbLinkPrefix(at.url)
                event["attachments"].append({
                    "filename": at.filename,
                    "url": url,
                })
            elif isinstance(at, ShareAttachment):
                event["text"] += "\n{}\n{}".format(at.description, at.url)
            else:
                event["text"] += "\nUnhandled attachment: {}".format(at)

        if isinstance(message_object.sticker, Sticker):
            stk = message_object.sticker
            event["attachments"].append({
                "filename": stk.label,
                "url": stk.url,
                "image_size": {
                    "width": stk.width,
                    "height": stk.height,
                },
            })

        if thread_type == ThreadType.GROUP:
            event["room"] = thread_id
            self.ensureJoined(author, thread_id)

        if event["text"] != "" or len(event["attachments"]) > 0:
            self.write({"_type": EVENT, "data": event})

        self.cache_put("last_seen_%s" % thread_id, message_object.uid)

    def onPeopleAdded(self, added_ids, thread_id, *args, **kwargs):
        """Handle users being added to a thread (possibly ourselves)."""
        for user_id in added_ids:
            if user_id == self.client.uid:
                self.ensure_i_joined(thread_id)
            else:
                self.ensureJoined(self.getUserIdFromUid(user_id), thread_id)

    def onPersonRemoved(self, removed_id, thread_id, *args, **kwargs):
        """Handle a user leaving / being removed from a thread.

        If it is us, emit LEFT and forget the room; otherwise emit a leave
        event for that user and forget their membership.
        """
        if removed_id == self.client.uid:
            self.write({
                "_type": LEFT,
                "room": thread_id,
            })
            self.my_joined_rooms.pop(thread_id, None)
        else:
            userId = self.getUserIdFromUid(removed_id)
            self.write({
                "_type": EVENT,
                "data": {
                    "type": EVENT_LEAVE,
                    "author": userId,
                    "room": thread_id,
                }
            })
            # Same key format as ensureJoined, so a later re-join is
            # announced again.
            map_key = "{}--{}".format(userId, thread_id)
            self.others_joined_map.pop(map_key, None)

    def onTitleChange(self, author_id, new_title, thread_id, thread_type, *args, **kwargs):
        """Forward a group title change as ROOM_INFO_UPDATED."""
        self.ensure_i_joined(thread_id)
        if thread_type == ThreadType.GROUP:
            self.write({
                "_type": ROOM_INFO_UPDATED,
                "room": thread_id,
                "data": {"name": new_title},
            })

    def on2FACode(self, *args, **kwargs):
        """Obtain a 2FA code from the user.

        During login, asks the user via a system message and blocks until
        ``handleUserCommand`` supplies the code.  Outside of login, returns
        ``None`` after notifying the user.
        """
        if self.login_in_progress is None:
            self.system_message("Facebook messenger requests 2 factor authentication, but we have a bug so that won't work.")
            return None
        self.system_message("Facebook messenger requests 2 factor authentication. Enter it by saying: cmd messenger 2fa <code> (replace messenger by your account name if you have several messenger accounts)")
        uc = self.login_in_progress.get(block=True)
        return uc["2fa_code"]

    def handleUserCommand(self, cmd):
        """Handle a textual user command; only ``2fa <code>`` is supported."""
        cmd = cmd.split(' ')
        if cmd[0] == "2fa":
            if len(cmd) < 2 or cmd[1] == "":
                self.system_message("Invalid user command.")
            elif self.login_in_progress is not None:
                try:
                    # Never block here: this runs on the main loop, and the
                    # queue only has room for one pending code.
                    self.login_in_progress.put_nowait({"2fa_code": cmd[1]})
                except queue.Full:
                    self.system_message("A 2FA code is already pending.")
            else:
                self.system_message("2FA code not required at this point.")
        else:
            self.system_message("Invalid user command.")


# ---- CLI ----

def createClientPickle():
    """Interactively log in and print a base64 pickle of the client.

    The printed value has the same encoding (pickle, zlib, base64) that
    ``processLogin`` reads from the ``client_pickle`` configuration key.
    """
    email = input("Email address of Facebook account: ")
    password = getpass.getpass()
    # There is no bridge on this path; credentials must be passed by keyword
    # because the first positional parameter of the client is the bridge.
    client = MessengerBridgeClient(
        bridge=None, email=email, password=password, max_tries=1)
    if not client.isLoggedIn():
        print("Could not log in (why???)")
        print("Still creating pickle though, maybe it will work after login was authorized?")
    print("")
    data = pickle.dumps(client)
    data = zlib.compress(data)
    data = base64.b64encode(data).decode('ascii')
    print(data)


if __name__ == "__main__":
    if "create_client_pickle" in sys.argv:
        createClientPickle()
    else:
        bridge = MessengerBridge()
        bridge.run()