easybridge/external/messenger.py

477 lines
15 KiB
Python
Executable File

#!/usr/bin/env python3
import sys
import json
import signal
import threading
import queue
import pickle
import time
import traceback
from urllib.parse import unquote as UrlUnquote
import hashlib
import fbchat
from fbchat.models import *
# ---- MESSAGE TYPES ----
# Values of the "_type" field of the JSON messages exchanged with Easybridge
# (ezbr) by MessengerBridge.run() / MessengerBridge.write().

# ezbr -> external
CONFIGURE = "configure"
GET_USER = "get_user"
SET_USER_INFO = "set_user_info"
SET_ROOM_INFO = "set_room_info"
JOIN = "join"
INVITE = "invite"
LEAVE = "leave"
SEND = "send"
CLOSE = "close"

# external -> ezbr
JOINED = "joined"
LEFT = "left"
USER_INFO_UPDATED = "user_info_updated"
ROOM_INFO_UPDATED = "room_info_updated"
EVENT = "event"
CACHE_PUT = "cache_put"
CACHE_GET = "cache_get"

# Reply messages.
# ezbr -> external: every message is answered with a reply (ezbr waits for it).
# external -> ezbr: only CACHE_GET is answered with a reply.
REP_OK = "rep_ok"
REP_ERROR = "rep_error"

# Event types: values of the "type" field inside the "data" payload of
# EVENT and SEND messages.
EVENT_JOIN = "join"
EVENT_LEAVE = "leave"
EVENT_MESSAGE = "message"
EVENT_ACTION = "action"
def mediaObjectOfURL(url):
    """Describe the media located at *url* as a ``{"filename", "url"}`` dict.

    The filename is the last "/"-separated component of the URL once
    everything from the first "?" onwards has been dropped.
    """
    without_query = url.partition("?")[0]
    filename = without_query.rsplit("/", 1)[-1]
    return {"filename": filename, "url": url}
def stripFbLinkPrefix(url):
    """Unwrap a ``l.facebook.com/l.php?u=...`` redirect link.

    When *url* starts with the redirect prefix, return the URL-decoded value
    of its ``u`` parameter (everything up to the first "&"); any other URL is
    returned unchanged.
    """
    redirect_prefix = "https://l.facebook.com/l.php?u="
    prefix_len = len(redirect_prefix)
    if url[:prefix_len] != redirect_prefix:
        return url
    encoded_target = url[prefix_len:].split("&")[0]
    return UrlUnquote(encoded_target)
# ---- MESSENGER CLIENT CLASS THAT HANDLES EVENTS ----
class MessengerBridgeClient(fbchat.Client):
    """fbchat client whose event handlers are forwarded to a bridge object.

    The bridge is attached after construction with setBridge(); until then
    ``self.bridge`` is None, so the handlers below must not fire before a
    bridge has been set.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # No bridge yet: it is attached later through setBridge().
        self.bridge = None

    def setBridge(self, bridge):
        """Attach the object that receives the forwarded events."""
        self.bridge = bridge

    ## Redirect all interesting events to Bridge

    def onMessage(self, *args, **kwargs):
        """Forward the event, arguments untouched, to bridge.onMessage."""
        self.bridge.onMessage(*args, **kwargs)

    def onPeopleAdded(self, *args, **kwargs):
        """Forward the event, arguments untouched, to bridge.onPeopleAdded."""
        self.bridge.onPeopleAdded(*args, **kwargs)

    def onPersonRemoved(self, *args, **kwargs):
        """Forward the event, arguments untouched, to bridge.onPersonRemoved."""
        self.bridge.onPersonRemoved(*args, **kwargs)

    def onTitleChange(self, *args, **kwargs):
        """Forward the event, arguments untouched, to bridge.onTitleChange."""
        self.bridge.onTitleChange(*args, **kwargs)
# ---- SEPARATE THREADS FOR INITIAL SYNC & CLIENT LISTEN ----
class InitialSyncThread(threading.Thread):
    """Background thread performing the initial synchronisation.

    For each thread in *threads*: group threads are announced to the bridge
    (JOINED message, room info, member list), then the recent messages of the
    thread are replayed through ``bridge.onMessage``.
    """

    def __init__(self, client, bridge, threads, *args, **kwargs):
        """Store the fbchat *client*, the *bridge* and the *threads* to sync.

        Remaining arguments are passed on to ``threading.Thread``.
        """
        super(InitialSyncThread, self).__init__(*args, **kwargs)
        self.client = client
        self.bridge = bridge
        self.threads = threads

    def run(self):
        """Announce every group thread, then replay each thread's backlog."""
        sys.stderr.write("fb thread list: {}\n".format(self.threads))
        for thread in self.threads:
            sys.stderr.write("fb thread: {}\n".format(thread))
            if thread.type == ThreadType.GROUP:
                members = self.client.fetchAllUsersFromThreads([thread])
                self.bridge.write({
                    "_type": JOINED,
                    "room": thread.uid,
                })
                self.bridge.send_room_info(thread, members)
                self.bridge.send_room_members(thread, members)
            # NOTE(review): the backlog is replayed for every thread, not
            # only for groups (bridge.onMessage handles both) — confirm.
            self.backlog_room(thread)

    def backlog_room(self, thread):
        """Replay the messages of *thread* that the bridge has not seen yet.

        Pages of 20 messages are fetched (each page is requested with the
        timestamp of the last collected message as ``before``) until one of:

        * the message whose uid is cached under ``last_seen_<thread uid>``
          is reached,
        * more than ``bridge.init_backlog_length`` messages were collected,
        * a page brings no new message (history exhausted).

        The collected messages are then delivered in reverse collection
        order, each text prefixed with its local-time timestamp.
        """
        prev_last_seen = self.bridge.cache_get("last_seen_%s" % thread.uid)
        if prev_last_seen == "":
            prev_last_seen = None

        messages = []
        collected_uids = set()
        found = False
        while not found:
            before = messages[-1].timestamp if messages else None
            page = self.client.fetchThreadMessages(thread.uid, before=before, limit=20)
            progressed = False
            for m in page:
                if m.uid == prev_last_seen or len(messages) > self.bridge.init_backlog_length:
                    found = True
                    break
                if m.uid in collected_uids:
                    # The same message can come back when a page overlaps
                    # the previous one; never replay it twice.
                    continue
                collected_uids.add(m.uid)
                messages.append(m)
                progressed = True
            if not progressed:
                # Empty page, or nothing but already-collected messages: the
                # history is exhausted. Without this exit the loop would keep
                # requesting the same page forever.
                break

        for m in reversed(messages):
            if m.text is None:
                m.text = ""
            m.text = "[{}] {}".format(
                time.strftime("%Y-%m-%d %H:%M %Z", time.localtime(float(m.timestamp) / 1000)).strip(),
                m.text)
            self.bridge.onMessage(thread_id=thread.uid,
                                  thread_type=thread.type,
                                  message_object=m)
class ClientListenThread(threading.Thread):
    """Thread whose only job is to run ``client.listen()``."""

    def __init__(self, client, *args, **kwargs):
        """Keep a reference to *client*; other arguments go to Thread."""
        super().__init__(*args, **kwargs)
        self.client = client

    def run(self):
        """Log the start on stderr, then hand control to client.listen()."""
        start_notice = "Start client.listen()\n"
        sys.stderr.write(start_notice)
        self.client.listen()
# ---- MAIN LOOP THAT HANDLES REQUESTS FROM BRIDGE ----
class MessengerBridge:
    """Glue between Easybridge (ezbr) and Facebook Messenger.

    Commands are read from stdin, one JSON object per line, and every
    outgoing message or reply is written to stdout as one JSON line.
    Diagnostics go to stderr only: stdout is the protocol channel.
    """

    def __init__(self):
        # bridged user ID -> fb uid, and the reverse mapping
        self.rev_uid = {}
        self.uid_map = {}
        # "<user id>--<room>" keys for which a join event was already emitted
        self.joined_map = {}
        self.init_backlog_length = 100
        # fbchat client, created when the CONFIGURE command is handled
        self.client = None
        self.keep_running = True
        # pending CACHE_GET requests: request number -> queue receiving the value
        self.cache_gets = {}
        self.num = 0
        # write() is reached from the main loop, the initial sync thread and
        # the fbchat listen thread; the lock keeps each JSON line intact.
        self._write_lock = threading.Lock()

    def getUserId(self, user):
        """Return the bridged user ID of the fbchat *user*.

        The ID is the last path component of ``user.url`` when that URL has
        no query string, and ``user.uid`` otherwise. The first time a uid is
        seen, both mappings are recorded and a USER_INFO_UPDATED message
        (display name, avatar if any) is written.
        """
        if user.url is not None and "?" not in user.url:
            retval = user.url.split("/")[-1]
        else:
            retval = user.uid
        if not retval:
            # e.g. URL ending with "/": an empty ID means "nobody" in SEND
            # events, so fall back to the uid.
            retval = user.uid

        if user.uid not in self.uid_map:
            self.uid_map[user.uid] = retval
            self.rev_uid[retval] = user.uid

            user_info = {
                "display_name": user.name,
            }
            if user.photo is not None:
                user_info["avatar"] = mediaObjectOfURL(user.photo)

            self.write({
                "_type": USER_INFO_UPDATED,
                "user": retval,
                "data": user_info,
            })
        return retval

    def getUserIdFromUid(self, uid):
        """Return the bridged user ID for fb *uid*, fetching the user if unknown."""
        if uid in self.uid_map:
            return self.uid_map[uid]
        user = self.client.fetchUserInfo(uid)[uid]
        return self.getUserId(user)

    def revUserId(self, user_id):
        """Map a bridged user ID back to its fb uid (identity if unknown)."""
        return self.rev_uid.get(user_id, user_id)

    def getUserShortName(self, user):
        """Return the user's first name, or the full name when there is none."""
        if user.first_name is not None:
            return user.first_name
        return user.name

    def run(self):
        """Read commands from stdin and answer each one until closed.

        Every command gets exactly one reply carrying the command's "_id":
        REP_OK (possibly with extra fields) or REP_ERROR with the exception
        text. The loop ends on a CLOSE command, on end of input, or on
        KeyboardInterrupt (which exits the process).
        """
        self.keep_running = True
        while self.keep_running:
            try:
                line = sys.stdin.readline()
            except KeyboardInterrupt:
                sys.stderr.write("(messenger) shutting down")
                self.close()
                time.sleep(5)
                sys.exit(0)

            if line == "":
                # readline() returns "" only at end of file: the peer closed
                # our stdin, there is nothing left to serve.
                sys.stderr.write("(python) stdin closed, shutting down\n")
                self.close()
                break

            sys.stderr.write("(python) reading {}\n".format(line.strip()))

            try:
                cmd = json.loads(line)
            except ValueError:
                # Without a parsed command there is no "_id" to reply to.
                sys.stderr.write("(python) ignoring malformed line\n")
                continue
            if not isinstance(cmd, dict):
                sys.stderr.write("(python) ignoring non-object message\n")
                continue

            try:
                rep = self.handle_cmd(cmd)
                if rep is None:
                    rep = {}
                if "_type" not in rep:
                    rep["_type"] = REP_OK
            except Exception as e:
                sys.stderr.write("{}\n".format(traceback.format_exc()))
                rep = {
                    "_type": REP_ERROR,
                    "error": "{}".format(e)
                }

            rep["_id"] = cmd.get("_id")
            self.write(rep)

    def write(self, msg):
        """Write *msg* as one JSON line on stdout (and log it on stderr)."""
        msgstr = json.dumps(msg)
        with self._write_lock:
            sys.stderr.write("(python) writing {}\n".format(msgstr))
            sys.stdout.write(msgstr + "\n")
            sys.stdout.flush()

    def handle_cmd(self, cmd):
        """Dispatch one command from ezbr.

        Returns the reply dict, or None for a plain REP_OK. Exceptions
        propagate to run(), which turns them into REP_ERROR replies.
        """
        ty = cmd["_type"]
        if ty == CONFIGURE:
            return self._configure(cmd["data"])
        elif ty == CLOSE:
            self.close()
        elif ty == GET_USER:
            userId = self.getUserIdFromUid(self.client.uid)
            return {"_type": REP_OK, "user": userId}
        elif ty == INVITE and cmd["room"] == "":
            return {"_type": REP_OK}
        elif ty == SEND:
            return self._send(cmd["data"])
        elif ty == REP_OK and cmd["_id"] in self.cache_gets:
            # Reply to one of our CACHE_GET requests: hand the value to the
            # thread blocked in cache_get().
            # NOTE(review): run() still answers this reply with a REP_OK of
            # its own, as before — confirm ezbr expects/ignores it.
            self.cache_gets[cmd["_id"]].put(cmd["value"])
        else:
            return {"_type": REP_ERROR, "error": "Not implemented"}
        return None

    def _configure(self, data):
        """Handle CONFIGURE: log in (or restore a session) and start syncing."""
        self.init_backlog_length = int(data["initial_backlog"])
        email = data["email"]
        client_file = "/tmp/fbclient_" + hashlib.sha224(email.encode("utf-8")).hexdigest()

        # SECURITY NOTE(review): the session is unpickled from a predictable
        # path under /tmp. Unpickling executes arbitrary code, so any local
        # user able to create that file first can run code as this process.
        # Consider a private, access-restricted state directory.
        client = None
        try:
            with open(client_file, "rb") as f:
                client = pickle.load(f)
        except Exception:
            # Missing or unreadable session file: fall back to a fresh login.
            client = None
        else:
            sys.stderr.write("(python messenger) using previous client: {}\n".format(client_file))

        if client is not None and not client.isLoggedIn():
            # A stale saved session must not block login forever.
            sys.stderr.write("(python messenger) previous client is logged out, logging in again\n")
            client = None

        if client is None:
            client = MessengerBridgeClient(email=email, password=data["password"], max_tries=1)
            if not client.isLoggedIn():
                return {"_type": REP_ERROR, "error": "Unable to login (?)"}
        self.client = client

        # Saved before setBridge() so the pickled client holds no reference
        # to this bridge. Best effort: a failure only costs a future re-login.
        try:
            with open(client_file, "wb") as f:
                pickle.dump(self.client, f)
        except Exception as e:
            sys.stderr.write("(python messenger) could not save client: {}\n".format(e))

        self.client.setBridge(self)

        threads = self.client.fetchThreadList()
        # ensure we have a correct mapping for bridged user IDs to fb uids
        # (this should be fast)
        for thread in threads:
            if thread.type == ThreadType.USER:
                self.getUserId(thread)

        InitialSyncThread(self.client, self, threads).start()
        ClientListenThread(self.client).start()
        return None

    def _send(self, event):
        """Handle SEND: deliver a message/action event to a room or a user."""
        if event["type"] not in (EVENT_MESSAGE, EVENT_ACTION):
            # Other event types are acknowledged without doing anything.
            return None

        attachments = []
        if isinstance(event.get("attachments"), list):
            for at in event["attachments"]:
                if "url" in at:
                    attachments.append(at["url"])
                else:
                    # TODO
                    # stderr, not stdout: stdout carries the JSON protocol.
                    sys.stderr.write("Unhandled: attachment without URL\n")

        msg = Message(event["text"])
        if event["type"] == EVENT_ACTION:
            msg.text = "* " + event["text"]

        if event.get("room"):
            thread_id = event["room"]
            thread_type = ThreadType.GROUP
        elif event.get("recipient"):
            thread_id = self.revUserId(event["recipient"])
            thread_type = ThreadType.USER
            sys.stderr.write("Sending to {}\n".format(thread_id))
        else:
            return {"_type": REP_ERROR, "error": "Invalid message"}

        if attachments:
            msg_id = self.client.sendRemoteFiles(attachments, message=msg, thread_id=thread_id, thread_type=thread_type)
        else:
            msg_id = self.client.send(msg, thread_id=thread_id, thread_type=thread_type)
        return {"_type": REP_OK, "event_id": msg_id}

    def close(self):
        """Stop the main loop and, if a client exists, its listener."""
        self.keep_running = False
        if self.client is not None:
            self.client.stopListening()

    def cache_get(self, key):
        """Ask ezbr for the cached value of *key* and wait for the answer.

        Must not be called from the thread running run(): the answer is
        delivered by handle_cmd(). Raises ``queue.Empty`` when no answer
        arrives within 30 seconds.
        """
        self.num += 1
        num = self.num
        q = queue.Queue(1)
        self.cache_gets[num] = q
        try:
            self.write({"_type": CACHE_GET, "_id": num, "key": key})
            return q.get(block=True, timeout=30)
        finally:
            # Also on timeout, so abandoned requests do not pile up.
            self.cache_gets.pop(num, None)

    def cache_put(self, key, value):
        """Ask ezbr to store *value* under *key* (no reply is awaited)."""
        self.write({"_type": CACHE_PUT, "key": key, "value": value})

    def send_room_info(self, thread, members):
        """Write a ROOM_INFO_UPDATED message for the group *thread*.

        Name: the thread name, else up to three short names of the other
        members (followed by "..." when there are more). Picture: the thread
        photo, else the photo of the first other member that has one.
        """
        room_info = {}
        if thread.name is not None:
            room_info["name"] = thread.name
        else:
            who = [m for m in members if m.uid != self.client.uid]
            names = [self.getUserShortName(m) for m in who[:3]]
            if len(who) > 3:
                names.append("...")
            room_info["name"] = ", ".join(names)

        if thread.photo is not None:
            room_info["picture"] = mediaObjectOfURL(thread.photo)
        else:
            for m in members:
                if m.uid != self.client.uid and m.photo is not None:
                    room_info["picture"] = mediaObjectOfURL(m.photo)
                    break

        self.write({
            "_type": ROOM_INFO_UPDATED,
            "room": thread.uid,
            "data": room_info,
        })

    def send_room_members(self, thread, members):
        """Make sure a join event was emitted for every member of *thread*."""
        for member in members:
            sys.stderr.write("fb thread member: {}\n".format(member))
            self.ensureJoined(self.getUserId(member), thread.uid)

    def ensureJoined(self, userId, room):
        """Emit a join event for (*userId*, *room*) unless already done."""
        key = "{}--{}".format(userId, room)
        if key not in self.joined_map:
            self.write({
                "_type": EVENT,
                "data": {
                    "type": EVENT_JOIN,
                    "author": userId,
                    "room": room,
                }
            })
            self.joined_map[key] = True

    def onMessage(self, thread_id, thread_type, message_object, **kwargs):
        """Forward a received message to ezbr as an EVENT and remember it.

        Messages authored by the logged-in account are ignored. The uid of
        the forwarded message is stored in ezbr's cache under
        ``last_seen_<thread_id>``.
        """
        if message_object.author == self.client.uid:
            # Ignore our own messages
            return

        sys.stderr.write("(python messenger) Got message: {}\n".format(message_object))

        author = self.getUserIdFromUid(message_object.author)

        event = {
            "type": EVENT_MESSAGE,
            "author": author,
            "text": message_object.text,
            "attachments": []
        }
        if event["text"] is None:
            event["text"] = ""

        for at in message_object.attachments or []:
            if isinstance(at, ImageAttachment):
                full_url = self.client.fetchImageUrl(at.uid)
                event["attachments"].append({
                    "filename": full_url.split("?")[0].split("/")[-1],
                    "url": full_url,
                    "image_size": {
                        "width": at.width,
                        "height": at.height,
                    },
                })
            elif isinstance(at, FileAttachment):
                url = stripFbLinkPrefix(at.url)
                event["attachments"].append({
                    "filename": at.name,
                    "url": url,
                })
            elif isinstance(at, AudioAttachment):
                url = stripFbLinkPrefix(at.url)
                event["attachments"].append({
                    "filename": at.filename,
                    "url": url,
                })
            else:
                event["text"] += "\nUnhandled attachment: {}".format(at)

        if thread_type == ThreadType.GROUP:
            event["room"] = thread_id
            self.ensureJoined(author, thread_id)

        self.write({"_type": EVENT, "data": event})

        self.cache_put("last_seen_%s" % thread_id, message_object.uid)

    def onPeopleAdded(self, added_ids, thread_id, *args, **kwargs):
        """Emit a join event for every user added to *thread_id*."""
        for user_id in added_ids:
            self.ensureJoined(self.getUserIdFromUid(user_id), thread_id)

    def onPersonRemoved(self, removed_id, thread_id, *args, **kwargs):
        """Emit a leave event for the removed user and forget the membership."""
        userId = self.getUserIdFromUid(removed_id)
        self.write({
            "_type": EVENT,
            "data": {
                "type": EVENT_LEAVE,
                "author": userId,
                "room": thread_id,
            }
        })
        # The membership may never have been recorded; that is not an error.
        self.joined_map.pop("{}--{}".format(userId, thread_id), None)

    def onTitleChange(self, author_id, new_title, thread_id, thread_type, *args, **kwargs):
        """Propagate a group title change as a ROOM_INFO_UPDATED message."""
        if thread_type == ThreadType.GROUP:
            self.write({
                "_type": ROOM_INFO_UPDATED,
                "room": thread_id,
                "data": {"name": new_title},
            })
if __name__ == "__main__":
    # Script entry point: build the bridge and serve commands from stdin
    # until the main loop ends.
    bridge = MessengerBridge()
    bridge.run()