easybridge/external/messenger.py

397 lines
12 KiB
Python
Executable File

#!/usr/bin/env python3
import sys
import json
import signal
import threading
import queue
import pickle
import time
import hashlib
import fbchat
from fbchat.models import *
# ---- MESSAGE TYPES ----
# ezbr -> external
CONFIGURE = "configure"
GET_USER = "get_user"
SET_USER_INFO = "set_user_info"
SET_ROOM_INFO = "set_room_info"
JOIN = "join"
INVITE = "invite"
LEAVE = "leave"
SEND = "send"
CLOSE = "close"
# external -> ezbr
JOINED = "joined"
LEFT = "left"
USER_INFO_UPDATED = "user_info_updated"
ROOM_INFO_UPDATED = "room_info_updated"
EVENT = "event"
CACHE_PUT = "cache_put"
CACHE_GET = "cache_get"
# reply messages
# ezbr -> external: all must wait for a reply!
# external -> ezbr: only CACHE_GET produces a reply
REP_OK = "rep_ok"
REP_ERROR = "rep_error"
# Event types
EVENT_JOIN = "join"
EVENT_LEAVE = "leave"
EVENT_MESSAGE = "message"
EVENT_ACTION = "action"
def mediaObjectOfURL(url):
return {
"filename": url.split("?")[0].split("/")[-1],
"url": url,
}
# ---- MESSENGER CLIENT CLASS THAT HANDLES EVENTS ----
class MessengerBridgeClient(fbchat.Client):
def __init__(self, *args, **kwargs):
super(MessengerBridgeClient, self).__init__(*args, **kwargs)
self.bridge = None
def setBridge(self, bridge):
self.bridge = bridge
## Redirect all interesting events to Bridge
def onMessage(self, *args, **kwargs):
self.bridge.onMessage(*args, **kwargs)
def onPeopleAdded(self, *args, **kwargs):
self.bridge.onPeopleAdded(*args, **kwargs)
def onPersonRemoved(self, *args, **kwargs):
self.bridge.onPersonRemoved(*args, **kwargs)
def onTitleChange(self, *args, **kwargs):
self.bridge.onTitleChange(*args, **kwargs)
# ---- SEPARATE THREADS FOR INITIAL SYNC & CLIENT LISTEN ----
class InitialSyncThread(threading.Thread):
def __init__(self, client, bridge, threads, *args, **kwargs):
super(InitialSyncThread, self).__init__(*args, **kwargs)
self.client = client
self.bridge = bridge
self.threads = threads
def run(self):
sys.stderr.write("fb thread list: {}\n".format(threads))
for thread in self.threads:
sys.stderr.write("fb thread: {}\n".format(thread))
if thread.type == ThreadType.GROUP:
members = self.client.fetchAllUsersFromThreads([thread])
self.bridge.write({
"_type": JOINED,
"room": thread.uid,
})
self.send_room_info(thread, members)
self.send_room_members(thread, members)
self.backlog_room(thread)
def send_room_info(self, thread, members):
room_info = {}
if thread.name is not None:
room_info["name"] = thread.name
else:
who = [m for m in members if m.uid != self.client.uid]
if len(who) > 3:
room_info["name"] = ", ".join([self.bridge.getUserShortName(m) for m in who[:3]] + ["..."])
else:
room_info["name"] = ", ".join([self.bridge.getUserShortName(m) for m in who])
if thread.photo is not None:
room_info["picture"] = mediaObjectOfURL(thread.photo)
else:
for m in members:
if m.uid != self.client.uid and m.photo is not None:
room_info["picture"] = mediaObjectOfURL(m.photo)
break
self.bridge.write({
"_type": ROOM_INFO_UPDATED,
"room": thread.uid,
"data": room_info,
})
def send_room_members(self, thread, members):
for member in members:
sys.stderr.write("fb thread member: {}\n".format(member))
self.bridge.ensureJoined(self.bridge.getUserId(member), thread.uid)
def backlog_room(self, thread):
prev_last_seen = self.bridge.cache_get("last_seen_%s"%thread.uid)
if prev_last_seen == "":
messages = self.client.fetchThreadMessages(thread.uid, limit=100)
else:
messages = []
found = False
while not found:
before = None
if len(messages) > 0:
before = messages[-1].timestamp
page = self.client.fetchThreadMessages(thread.uid, before=before, limit=20)
for m in page:
if m.uid == prev_last_seen:
found = True
break
else:
messages.append(m)
for m in reversed(messages):
self.bridge.onMessage(thread_id=thread.uid,
thread_type=thread.type,
message_object=m)
class ClientListenThread(threading.Thread):
def __init__(self, client, *args, **kwargs):
super(ClientListenThread, self).__init__(*args, **kwargs)
self.client = client
def run(self):
sys.stderr.write("Start client.listen()\n")
self.client.listen()
# ---- MAIN LOOP THAT HANDLES REQUESTS FROM BRIDGE ----
class MessengerBridge:
def __init__(self):
self.rev_uid = {}
self.uid_map = {}
self.joined_map = {}
def getUserId(self, user):
retval = None
if user.url is not None and not "?" in user.url:
user_id = user.url.split("/")[-1]
self.rev_uid[user_id] = user.uid
retval = user_id
else:
retval = user.uid
if user.uid not in self.uid_map:
self.uid_map[user.uid] = retval
user_info = {
"display_name": user.name,
}
if user.photo is not None:
user_info["avatar"] = mediaObjectOfURL(user.photo)
self.bridge.write({
"_type": USER_INFO_UPDATED,
"user": self.bridge.getUserId(user),
"data": user_info,
})
return retval
def getUserIdFromUid(self, uid):
if uid in self.uid_map:
return self.uid_map[uid]
else:
user = self.client.fetchUserInfo(uid)[uid]
return self.getUserId(user)
def revUserId(self, user_id):
if user_id in self.rev_uid:
return self.rev_uid[user_id]
else:
return user_id
def getUserShortName(self, user):
if user.first_name != None:
return user.first_name
else:
return user.name
def run(self):
self.client = None
self.keep_running = True
self.cache_gets = {}
self.num = 0
while self.keep_running:
try:
line = sys.stdin.readline()
except KeyboardInterrupt:
sys.stderr.write("(messenger) shutting down")
self.close()
time.sleep(5)
sys.exit(0)
sys.stderr.write("(python) reading {}\n".format(line.strip()))
cmd = json.loads(line)
try:
rep = self.handle_cmd(cmd)
if rep is None:
rep = {}
if "_type" not in rep:
rep["_type"] = REP_OK
except Exception as e:
rep = {
"_type": REP_ERROR,
"error": "{}".format(e)
}
rep["_id"] = cmd["_id"]
self.write(rep)
def write(self, msg):
msgstr = json.dumps(msg)
sys.stderr.write("(python) writing {}\n".format(msgstr))
sys.stdout.write(msgstr + "\n")
sys.stdout.flush()
def handle_cmd(self, cmd):
ty = cmd["_type"]
if ty == CONFIGURE:
client_file = "/tmp/fbclient_" + hashlib.sha224(cmd["data"]["email"].encode("utf-8")).hexdigest()
try:
f = open(client_file, "rb")
self.client = pickle.load(f)
f.close()
sys.stderr.write("(python messenger) using previous client: {}\n".format(client_file))
except:
self.client = None
if self.client is None:
email, password = cmd["data"]["email"], cmd["data"]["password"]
self.client = MessengerBridgeClient(email=email, password=password, max_tries=1)
if not self.client.isLoggedIn():
return {"_type": "ret_error", "error": "Unable to login (?)"}
try:
f = open(client_file, "wb")
pickle.dump(self.client, f)
f.close()
except:
pass
self.client.setBridge(self)
threads = self.client.fetchThreadList()
# ensure we have a correct mapping for bridged user IDs to fb uids
# (this should be fast)
for thread in threads:
if thread.type == ThreadType.GROUP:
self.getUserId(thread)
InitialSyncThread(self.client, self, threads).start()
ClientListenThread(self.client).start()
elif ty == CLOSE:
self.close()
elif ty == GET_USER:
userId = self.getUserIdFromUid(self.client.uid)
return {"_type": REP_OK, "user": userId}
elif ty == INVITE and cmd["room"] == "":
return {"_type": REP_OK}
elif ty == SEND:
event = cmd["data"]
if event["type"] in [EVENT_MESSAGE, EVENT_ACTION]:
# TODO: attachments
msg = Message(event["text"])
if event["type"] == EVENT_ACTION:
msg.text = "* " + event["text"]
if event["room"] != "":
msg_id = self.client.send(msg, thread_id=event["room"], thread_type=ThreadType.GROUP)
elif event["recipient"] != "":
uid = self.revUserId(event["recipient"])
msg_id = self.client.send(msg, thread_id=uid, thread_type=ThreadType.USER)
else:
return {"_type": REP_ERROR, "error": "Invalid message"}
return {"_type": REP_OK, "event_id": msg_id}
elif ty == REP_OK and cmd["_id"] in self.cache_gets:
self.cache_gets[cmd["_id"]].put(cmd["value"])
else:
return {"_type": REP_ERROR, "error": "Not implemented"}
def close(self):
self.keep_running = False
self.client.stopListening()
def cache_get(self, key):
self.num += 1
num = self.num
q = queue.Queue(1)
self.cache_gets[num] = q
self.write({"_type": CACHE_GET, "_id": num, "key": key})
rep = q.get(block=True, timeout=30)
del self.cache_gets[num]
return rep
def cache_put(self, key, value):
self.write({"_type": CACHE_PUT, "key": key, "value": value})
def ensureJoined(self, userId, room):
key = "{}--{}".format(userId, room)
if not key in self.joined_map:
self.bridge.write({
"_type": EVENT,
"data": {
"type": EVENT_JOIN,
"author": userId,
"room": room,
}
})
self.joined_map[key] = true
def onMessage(self, thread_id, thread_type, message_object, **kwargs):
if message_object.author == self.client.uid:
# Ignore our own messages
return
author = self.getUserIdFromUid(message_object.author)
event = {
"type": EVENT_MESSAGE,
"author": author,
"text": message_object.text,
}
if thread_type == ThreadType.GROUP:
event["room"] = thread_id
self.ensureJoined(author, thread_id)
self.write({"_type": EVENT, "data": event})
self.cache_put("last_seen_%s"%thread_id, message_object.uid)
def onPeopleAdded(self, *args, **kwargs):
pass
def onPersonRemoved(self, *args, **kwargs):
pass
def onTitleChange(self, *args, **kwargs):
pass
if __name__ == "__main__":
bridge = MessengerBridge()
bridge.run()