476 lines
15 KiB
Python
Executable file
476 lines
15 KiB
Python
Executable file
#!/usr/bin/env python3
|
|
|
|
import sys
|
|
import json
|
|
import signal
|
|
import threading
|
|
import queue
|
|
import pickle
|
|
import time
|
|
import traceback
|
|
from urllib.parse import unquote as UrlUnquote
|
|
|
|
import hashlib
|
|
|
|
import fbchat
|
|
from fbchat.models import *
|
|
|
|
# ---- MESSAGE TYPES ----
# String tags carried in the "_type" field of the JSON messages exchanged
# with ezbr.

# Requests: ezbr -> external
CONFIGURE = "configure"
GET_USER = "get_user"
SET_USER_INFO = "set_user_info"
SET_ROOM_INFO = "set_room_info"
JOIN = "join"
INVITE = "invite"
LEAVE = "leave"
SEND = "send"
CLOSE = "close"

# Notifications and requests: external -> ezbr
JOINED = "joined"
LEFT = "left"
USER_INFO_UPDATED = "user_info_updated"
ROOM_INFO_UPDATED = "room_info_updated"
EVENT = "event"
CACHE_PUT = "cache_put"
CACHE_GET = "cache_get"

# Replies
#   ezbr -> external: every message must wait for a reply!
#   external -> ezbr: only CACHE_GET produces a reply
REP_OK = "rep_ok"
REP_ERROR = "rep_error"

# Event types (the "type" field inside the "data" of an EVENT message)
EVENT_JOIN = "join"
EVENT_LEAVE = "leave"
EVENT_MESSAGE = "message"
EVENT_ACTION = "action"
|
|
|
|
|
|
|
|
def mediaObjectOfURL(url):
    """Build a media descriptor dict for *url*.

    The "filename" entry is the last path segment of the URL with any
    query string (everything from the first "?") removed; the "url" entry
    is the URL unchanged.
    """
    without_query = url.partition("?")[0]
    basename = without_query.rpartition("/")[2]
    return {"filename": basename, "url": url}
|
|
|
|
def stripFbLinkPrefix(url):
    """Unwrap a Facebook link-redirect URL.

    When *url* starts with the l.facebook.com redirect prefix, return the
    URL-decoded value of its ``u`` parameter (everything up to the first
    "&"). Any other URL is returned unchanged.
    """
    redirect_prefix = "https://l.facebook.com/l.php?u="
    if not url.startswith(redirect_prefix):
        return url
    target = url[len(redirect_prefix):].partition("&")[0]
    return UrlUnquote(target)
|
|
|
|
# ---- MESSENGER CLIENT CLASS THAT HANDLES EVENTS ----
|
|
|
|
class MessengerBridgeClient(fbchat.Client):
    """fbchat client whose event callbacks are forwarded to a bridge object.

    The bridge is attached after construction with setBridge(); until
    then ``self.bridge`` is None.
    """

    def __init__(self, *args, **kwargs):
        """Construct the underlying fbchat client, with no bridge attached yet."""
        super().__init__(*args, **kwargs)
        self.bridge = None

    def setBridge(self, bridge):
        """Attach the object that will receive the forwarded events."""
        self.bridge = bridge

    ## Redirect all interesting events to Bridge

    def onMessage(self, *args, **kwargs):
        """Forward the event to the bridge's onMessage()."""
        self.bridge.onMessage(*args, **kwargs)

    def onPeopleAdded(self, *args, **kwargs):
        """Forward the event to the bridge's onPeopleAdded()."""
        self.bridge.onPeopleAdded(*args, **kwargs)

    def onPersonRemoved(self, *args, **kwargs):
        """Forward the event to the bridge's onPersonRemoved()."""
        self.bridge.onPersonRemoved(*args, **kwargs)

    def onTitleChange(self, *args, **kwargs):
        """Forward the event to the bridge's onTitleChange()."""
        self.bridge.onTitleChange(*args, **kwargs)
|
|
|
|
# ---- SEPARATE THREADS FOR INITIAL SYNC & CLIENT LISTEN ----
|
|
|
|
class InitialSyncThread(threading.Thread):
    """Background thread that performs the initial synchronisation.

    For every thread in the list given at construction it announces group
    rooms (joined notification, room info, members) to the bridge and then
    replays recent messages of the thread through ``bridge.onMessage``.
    """

    def __init__(self, client, bridge, threads, *args, **kwargs):
        """Store the collaborators.

        client  -- object providing fetchAllUsersFromThreads() and
                   fetchThreadMessages()
        bridge  -- object providing write(), send_room_info(),
                   send_room_members(), cache_get(), onMessage() and the
                   ``init_backlog_length`` attribute
        threads -- iterable of thread objects (``uid`` and ``type`` are read)

        Remaining arguments are passed on to threading.Thread.
        """
        super().__init__(*args, **kwargs)

        self.client = client
        self.bridge = bridge
        self.threads = threads

    def run(self):
        """Announce group rooms and replay the backlog of every thread."""
        sys.stderr.write("fb thread list: {}\n".format(self.threads))

        for thread in self.threads:
            sys.stderr.write("fb thread: {}\n".format(thread))
            if thread.type == ThreadType.GROUP:
                members = self.client.fetchAllUsersFromThreads([thread])

                self.bridge.write({
                    "_type": JOINED,
                    "room": thread.uid,
                })

                self.bridge.send_room_info(thread, members)
                self.bridge.send_room_members(thread, members)

            # NOTE(review): the backlog is replayed for every thread, not only
            # groups; backlog_room forwards thread.type so that onMessage can
            # tell the two apart -- confirm against the intended nesting.
            self.backlog_room(thread)

    def backlog_room(self, thread):
        """Replay recent messages of *thread* to the bridge, oldest first.

        Messages are fetched in pages of 20 (the pages are iterated as if
        newest first -- assumption about fetchThreadMessages, TODO confirm).
        Collection stops at the message whose uid equals the cached
        ``last_seen_<thread uid>`` value, once ``bridge.init_backlog_length``
        messages have been collected, or when a page brings nothing new.
        Each replayed message has its text prefixed with its local-time
        timestamp.
        """
        prev_last_seen = self.bridge.cache_get("last_seen_%s" % thread.uid)
        if prev_last_seen == "":
            prev_last_seen = None

        limit = self.bridge.init_backlog_length
        messages = []
        collected_uids = set()
        found = False
        while not found:
            before = messages[-1].timestamp if messages else None
            page = self.client.fetchThreadMessages(thread.uid, before=before, limit=20)

            progressed = False
            for m in page:
                # ">=" (not ">") so that at most `limit` messages are replayed.
                if m.uid == prev_last_seen or len(messages) >= limit:
                    found = True
                    break
                if m.uid in collected_uids:
                    # Presumably `before` may be inclusive and return the
                    # boundary message again; never replay a message twice.
                    continue
                collected_uids.add(m.uid)
                messages.append(m)
                progressed = True

            if not progressed:
                # Empty page or nothing new: the history is exhausted.
                # Without this check the loop would never terminate when the
                # thread holds fewer messages than the limit and the
                # last-seen marker is absent.
                break

        for m in reversed(messages):
            if m.text is None:
                m.text = ""
            # m.timestamp is divided by 1000 before conversion, i.e. it is
            # treated as milliseconds since the epoch.
            m.text = "[{}] {}".format(
                time.strftime("%Y-%m-%d %H:%M %Z", time.localtime(float(m.timestamp) / 1000)).strip(),
                m.text)
            self.bridge.onMessage(thread_id=thread.uid,
                                  thread_type=thread.type,
                                  message_object=m)
|
|
|
|
class ClientListenThread(threading.Thread):
    """Background thread that runs the client's listen() call."""

    def __init__(self, client, *args, **kwargs):
        """Remember *client*; other arguments go to threading.Thread."""
        super().__init__(*args, **kwargs)
        self.client = client

    def run(self):
        """Log the start on stderr, then hand control to client.listen()."""
        log = sys.stderr
        log.write("Start client.listen()\n")
        listener = self.client
        listener.listen()
|
|
|
|
|
|
# ---- MAIN LOOP THAT HANDLES REQUESTS FROM BRIDGE ----
|
|
|
|
class MessengerBridge:
    """Main loop translating between ezbr JSON messages and Messenger.

    Commands are read as one JSON object per line from stdin and answered
    on stdout; diagnostics go to stderr. Messenger events are delivered
    through the on* callbacks and turned into JSON messages on stdout.
    """

    def __init__(self):
        # bridged user id -> Facebook uid
        self.rev_uid = {}
        # Facebook uid -> bridged user id
        self.uid_map = {}
        # "<user id>--<room>" -> True for every join already announced
        self.joined_map = {}
        # maximum number of messages replayed per thread at initial sync
        self.init_backlog_length = 100

        # Also (re)initialised by run(); set here so that close() and
        # cache_get() are usable on an instance whose run() has not started.
        self.client = None
        self.keep_running = True
        # request number -> queue.Queue awaiting the CACHE_GET reply
        self.cache_gets = {}
        self.num = 0

    def getUserId(self, user):
        """Return the bridged id of *user*, registering it on first sight.

        The id is the last path segment of ``user.url`` when that URL is
        set and contains no "?", otherwise ``user.uid``. The first time a
        uid is seen, both mappings are recorded and a USER_INFO_UPDATED
        message (display name and, when available, avatar) is written.
        """
        if user.url is not None and "?" not in user.url:
            retval = user.url.split("/")[-1]
        else:
            retval = user.uid

        if user.uid not in self.uid_map:
            self.uid_map[user.uid] = retval
            self.rev_uid[retval] = user.uid

            user_info = {
                "display_name": user.name,
            }
            if user.photo is not None:
                user_info["avatar"] = mediaObjectOfURL(user.photo)
            self.write({
                "_type": USER_INFO_UPDATED,
                "user": retval,
                "data": user_info,
            })

        return retval

    def getUserIdFromUid(self, uid):
        """Return the bridged id for a Facebook uid, fetching the user if unknown."""
        if uid in self.uid_map:
            return self.uid_map[uid]
        user = self.client.fetchUserInfo(uid)[uid]
        return self.getUserId(user)

    def revUserId(self, user_id):
        """Return the Facebook uid for a bridged id, or *user_id* itself if unmapped."""
        return self.rev_uid.get(user_id, user_id)

    def getUserShortName(self, user):
        """Return the user's first name when set, otherwise the full name."""
        if user.first_name is not None:
            return user.first_name
        return user.name

    def run(self):
        """Read commands from stdin until closed, replying to each on stdout.

        Every parsed command is passed to handle_cmd(); the reply defaults
        to REP_OK, becomes REP_ERROR when the handler raises, and carries
        the command's "_id". End of input stops the loop and the client
        listener instead of crashing on an empty line.
        """
        self.client = None
        self.keep_running = True
        self.cache_gets = {}
        self.num = 0

        while self.keep_running:
            try:
                line = sys.stdin.readline()
            except KeyboardInterrupt:
                sys.stderr.write("(messenger) shutting down\n")
                self.close()
                time.sleep(5)
                sys.exit(0)

            if line == "":
                # readline() returns "" only at end of file: the peer is gone.
                sys.stderr.write("(python) stdin closed, shutting down\n")
                self.close()
                break
            if not line.strip():
                continue

            sys.stderr.write("(python) reading {}\n".format(line.strip()))
            try:
                cmd = json.loads(line)
            except ValueError:
                sys.stderr.write("(python) ignoring invalid JSON input\n")
                continue
            if not isinstance(cmd, dict):
                sys.stderr.write("(python) ignoring non-object JSON input\n")
                continue

            try:
                rep = self.handle_cmd(cmd)
                if rep is None:
                    rep = {}
                if "_type" not in rep:
                    rep["_type"] = REP_OK
            except Exception as e:
                # Top-level boundary: report the failure to the peer.
                sys.stderr.write("{}\n".format(traceback.format_exc()))
                rep = {
                    "_type": REP_ERROR,
                    "error": "{}".format(e)
                }

            rep["_id"] = cmd.get("_id")
            self.write(rep)

    def write(self, msg):
        """Serialise *msg* as one JSON line on stdout (logged on stderr) and flush."""
        msgstr = json.dumps(msg)
        sys.stderr.write("(python) writing {}\n".format(msgstr))
        sys.stdout.write(msgstr + "\n")
        sys.stdout.flush()

    def handle_cmd(self, cmd):
        """Dispatch one command dict on its "_type".

        Returns the reply dict, or None when the default REP_OK reply is
        sufficient. Unknown commands yield a REP_ERROR "Not implemented".
        """
        ty = cmd["_type"]
        if ty == CONFIGURE:
            return self._configure(cmd["data"])

        elif ty == CLOSE:
            self.close()

        elif ty == GET_USER:
            userId = self.getUserIdFromUid(self.client.uid)
            return {"_type": REP_OK, "user": userId}

        elif ty == INVITE and cmd["room"] == "":
            return {"_type": REP_OK}

        elif ty == SEND:
            return self._send(cmd["data"])

        elif ty == REP_OK and cmd["_id"] in self.cache_gets:
            # Reply to one of our CACHE_GET requests: wake up the waiter.
            self.cache_gets[cmd["_id"]].put(cmd["value"])

        else:
            return {"_type": REP_ERROR, "error": "Not implemented"}

        return None

    def _configure(self, data):
        """Log in (reusing a saved client when possible) and start the worker threads."""
        self.init_backlog_length = int(data["initial_backlog"])
        client_file = "/tmp/fbclient_" + hashlib.sha224(data["email"].encode("utf-8")).hexdigest()

        self.client = self._load_saved_client(client_file)
        if self.client is None:
            email, password = data["email"], data["password"]
            self.client = MessengerBridgeClient(email=email, password=password, max_tries=1)

        if not self.client.isLoggedIn():
            return {"_type": REP_ERROR, "error": "Unable to login (?)"}

        # Saved before the bridge is attached to the client.
        self._save_client(client_file)

        self.client.setBridge(self)

        threads = self.client.fetchThreadList()
        # ensure we have a correct mapping for bridged user IDs to fb uids
        # (this should be fast)
        for thread in threads:
            if thread.type == ThreadType.USER:
                self.getUserId(thread)

        InitialSyncThread(self.client, self, threads).start()
        ClientListenThread(self.client).start()
        return None

    def _load_saved_client(self, client_file):
        """Return the client pickled in *client_file*, or None if unavailable."""
        # NOTE(security): unpickling executes code chosen by whoever wrote
        # the file, and the file lives at a predictable path under /tmp.
        # Consider a private, user-owned directory for this file.
        try:
            with open(client_file, "rb") as f:
                client = pickle.load(f)
        except FileNotFoundError:
            return None
        except Exception:
            sys.stderr.write("(python messenger) could not load saved client:\n{}\n".format(traceback.format_exc()))
            return None
        sys.stderr.write("(python messenger) using previous client: {}\n".format(client_file))
        return client

    def _save_client(self, client_file):
        """Best-effort save of the current client to *client_file*."""
        try:
            with open(client_file, "wb") as f:
                pickle.dump(self.client, f)
        except Exception:
            # Saving is only an optimisation for the next start; keep going.
            sys.stderr.write("(python messenger) could not save client:\n{}\n".format(traceback.format_exc()))

    def _send(self, event):
        """Send a message/action event to its room or recipient.

        Returns a REP_OK reply carrying the sent message id as "event_id",
        a REP_ERROR reply when neither room nor recipient is given, or None
        for event types other than message/action.
        """
        if event["type"] not in (EVENT_MESSAGE, EVENT_ACTION):
            return None

        attachments = []
        if isinstance(event.get("attachments"), list):
            for at in event["attachments"]:
                if "url" in at:
                    attachments.append(at["url"])
                else:
                    # TODO
                    # Must go to stderr: stdout carries the JSON protocol.
                    sys.stderr.write("Unhandled: attachment without URL\n")

        msg = Message(event["text"])
        if event["type"] == EVENT_ACTION:
            msg.text = "* " + event["text"]

        room = event.get("room") or ""
        recipient = event.get("recipient") or ""
        if room != "":
            thread_id, thread_type = room, ThreadType.GROUP
        elif recipient != "":
            thread_id, thread_type = self.revUserId(recipient), ThreadType.USER
            sys.stderr.write("Sending to {}\n".format(thread_id))
        else:
            return {"_type": REP_ERROR, "error": "Invalid message"}

        if attachments:
            msg_id = self.client.sendRemoteFiles(attachments, message=msg, thread_id=thread_id, thread_type=thread_type)
        else:
            msg_id = self.client.send(msg, thread_id=thread_id, thread_type=thread_type)

        return {"_type": REP_OK, "event_id": msg_id}

    def close(self):
        """Stop the main loop and, when a client exists, its listener."""
        self.keep_running = False
        client = getattr(self, "client", None)
        if client is not None:
            client.stopListening()

    def cache_get(self, key):
        """Ask the peer for cached *key* and block until the reply arrives.

        Raises queue.Empty when no reply is received within 30 seconds.
        The pending-request entry is removed in every case.
        """
        self.num += 1
        num = self.num
        q = queue.Queue(1)
        self.cache_gets[num] = q
        try:
            self.write({"_type": CACHE_GET, "_id": num, "key": key})
            return q.get(block=True, timeout=30)
        finally:
            self.cache_gets.pop(num, None)

    def cache_put(self, key, value):
        """Ask the peer to store *value* under *key* (no reply expected)."""
        self.write({"_type": CACHE_PUT, "key": key, "value": value})

    def send_room_info(self, thread, members):
        """Write a ROOM_INFO_UPDATED message with the room's name and picture.

        An unnamed thread is named after up to three members other than
        ourselves (followed by "..." when there are more). Without a thread
        photo, the first other member's photo is used when one exists.
        """
        room_info = {}
        if thread.name is not None:
            room_info["name"] = thread.name
        else:
            who = [m for m in members if m.uid != self.client.uid]
            names = [self.getUserShortName(m) for m in who[:3]]
            if len(who) > 3:
                names.append("...")
            room_info["name"] = ", ".join(names)

        if thread.photo is not None:
            room_info["picture"] = mediaObjectOfURL(thread.photo)
        else:
            for m in members:
                if m.uid != self.client.uid and m.photo is not None:
                    room_info["picture"] = mediaObjectOfURL(m.photo)
                    break

        self.write({
            "_type": ROOM_INFO_UPDATED,
            "room": thread.uid,
            "data": room_info,
        })

    def send_room_members(self, thread, members):
        """Announce every member of *thread* as joined."""
        for member in members:
            sys.stderr.write("fb thread member: {}\n".format(member))
            self.ensureJoined(self.getUserId(member), thread.uid)

    def ensureJoined(self, userId, room):
        """Write a join event for (userId, room) unless already announced."""
        key = "{}--{}".format(userId, room)
        if key not in self.joined_map:
            self.write({
                "_type": EVENT,
                "data": {
                    "type": EVENT_JOIN,
                    "author": userId,
                    "room": room,
                }
            })
            self.joined_map[key] = True

    def onMessage(self, thread_id, thread_type, message_object, **kwargs):
        """Forward an incoming Messenger message as an EVENT message.

        Our own messages are ignored. Image, file and audio attachments are
        converted to attachment dicts; any other attachment is mentioned in
        the text. The message uid is then stored as the thread's last-seen
        marker through cache_put().
        """
        if message_object.author == self.client.uid:
            # Ignore our own messages
            return

        sys.stderr.write("(python messenger) Got message: {}\n".format(message_object))

        author = self.getUserIdFromUid(message_object.author)

        event = {
            "type": EVENT_MESSAGE,
            "author": author,
            "text": message_object.text,
            "attachments": []
        }
        if event["text"] is None:
            event["text"] = ""

        for at in message_object.attachments:
            if isinstance(at, ImageAttachment):
                full_url = self.client.fetchImageUrl(at.uid)
                event["attachments"].append({
                    "filename": full_url.split("?")[0].split("/")[-1],
                    "url": full_url,
                    "image_size": {
                        "width": at.width,
                        "height": at.height,
                    },
                })
            elif isinstance(at, FileAttachment):
                url = stripFbLinkPrefix(at.url)
                event["attachments"].append({
                    "filename": at.name,
                    "url": url,
                })
            elif isinstance(at, AudioAttachment):
                url = stripFbLinkPrefix(at.url)
                event["attachments"].append({
                    "filename": at.filename,
                    "url": url,
                })
            else:
                event["text"] += "\nUnhandled attachment: {}".format(at)

        if thread_type == ThreadType.GROUP:
            event["room"] = thread_id
            self.ensureJoined(author, thread_id)

        self.write({"_type": EVENT, "data": event})

        self.cache_put("last_seen_%s" % thread_id, message_object.uid)

    def onPeopleAdded(self, added_ids, thread_id, *args, **kwargs):
        """Announce every added user as joined to the room."""
        for user_id in added_ids:
            self.ensureJoined(self.getUserIdFromUid(user_id), thread_id)

    def onPersonRemoved(self, removed_id, thread_id, *args, **kwargs):
        """Write a leave event for the removed user and forget the join."""
        userId = self.getUserIdFromUid(removed_id)
        self.write({
            "_type": EVENT,
            "data": {
                "type": EVENT_LEAVE,
                "author": userId,
                "room": thread_id,
            }
        })
        # The user may never have been announced as joined; do not fail then.
        self.joined_map.pop("{}--{}".format(userId, thread_id), None)

    def onTitleChange(self, author_id, new_title, thread_id, thread_type, *args, **kwargs):
        """Write a ROOM_INFO_UPDATED message when a group's title changes."""
        if thread_type == ThreadType.GROUP:
            self.write({
                "_type": ROOM_INFO_UPDATED,
                "room": thread_id,
                "data": {"name": new_title},
            })
|
|
|
|
if __name__ == "__main__":
    # Script entry point: create the bridge and serve commands from stdin
    # until its main loop ends.
    MessengerBridge().run()
|
|
|