easybridge/connector/xmpp/xmpp.go

445 lines
9.4 KiB
Go

package xmpp
import (
"crypto/tls"
"fmt"
"strings"
"sync"
"time"
gxmpp "github.com/matterbridge/go-xmpp"
"github.com/rs/xid"
log "github.com/sirupsen/logrus"
. "git.deuxfleurs.fr/Deuxfleurs/easybridge/connector"
)
// User id format: username@server (= JID)
// OR: nickname@room_name@muc_server
// Room id format: room_name@muc_server (= MUC ID)
type XMPP struct {
handler Handler
connectorLoopNum int
connected bool
timeout int
server string
port int
ssl bool
jid string
jid_localpart string
password string
nickname string
conn *gxmpp.Client
stateLock sync.Mutex
muc map[RoomID]*mucInfo
}
type mucInfo struct {
joined bool
pendingJoins map[UserID]string
pendingLeaves map[UserID]struct{}
}
func (xm *XMPP) SetHandler(h Handler) {
xm.handler = h
}
func (xm *XMPP) Protocol() string {
return XMPP_PROTOCOL
}
func (xm *XMPP) Configure(c Configuration) error {
if xm.conn != nil {
xm.Close()
}
// Parse and validate configuration
var err error
xm.port, err = c.GetInt("port", 5222)
if err != nil {
return err
}
xm.ssl, err = c.GetBool("ssl", true)
if err != nil {
return err
}
xm.jid, err = c.GetString("jid")
if err != nil {
return err
}
jid_parts := strings.Split(xm.jid, "@")
if len(jid_parts) != 2 {
return fmt.Errorf("Invalid JID: %s", xm.jid)
}
xm.server = jid_parts[1]
xm.jid_localpart = jid_parts[0]
xm.nickname, _ = c.GetString("nickname", xm.jid_localpart)
xm.password, err = c.GetString("password")
if err != nil {
return err
}
// Try to connect
xm.muc = make(map[RoomID]*mucInfo)
xm.connectorLoopNum += 1
go xm.connectLoop(xm.connectorLoopNum)
for i := 0; i < 42; i++ {
time.Sleep(time.Duration(1) * time.Second)
if xm.connected {
return nil
}
}
return fmt.Errorf("Failed to connect after 42s attempting")
}
func (xm *XMPP) connectLoop(num int) {
xm.timeout = 10
for {
if xm.connectorLoopNum != num {
return
}
tc := &tls.Config{
ServerName: xm.server,
InsecureSkipVerify: true,
}
options := gxmpp.Options{
Host: xm.server,
User: xm.jid,
Password: xm.password,
NoTLS: true,
StartTLS: xm.ssl,
Session: true,
TLSConfig: tc,
}
var err error
xm.conn, err = options.NewClient()
if err != nil {
xm.connected = false
xm.handler.SystemMessage(fmt.Sprintf("XMPP failed to connect (%s). Retrying in %ds", err, xm.timeout))
time.Sleep(time.Duration(xm.timeout) * time.Second)
xm.timeout *= 2
if xm.timeout > 600 {
xm.timeout = 600
}
} else {
xm.connected = true
xm.timeout = 10
for muc, mucInfo := range xm.muc {
if mucInfo.joined {
_, err := xm.conn.JoinMUCNoHistory(string(muc), xm.nickname)
if err != nil {
xm.handler.SystemMessage(fmt.Sprintf("Could not rejoin MUC %s after reconnection: %s", muc, err))
xm.handler.Left(RoomID(muc))
mucInfo.joined = false
}
}
}
err = xm.handleXMPP()
xm.connected = false
xm.handler.SystemMessage(fmt.Sprintf("XMPP disconnected (%s), reconnecting)", err))
}
}
}
func (xm *XMPP) xmppKeepAlive() chan bool {
done := make(chan bool)
go func() {
ticker := time.NewTicker(90 * time.Second)
defer ticker.Stop()
for {
select {
case <-ticker.C:
if err := xm.conn.PingC2S("", ""); err != nil {
log.Debugf("PING failed %#v\n", err)
}
case <-done:
return
}
}
}()
return done
}
func (xm *XMPP) handleXMPP() error {
done := xm.xmppKeepAlive()
defer close(done)
for {
m, err := xm.conn.Recv()
if err != nil {
return err
}
log.Tracef("XMPP: %#v\n", m)
xm.handleXMPPStance(m)
}
}
func (xm *XMPP) handleXMPPStance(m interface{}) {
xm.stateLock.Lock()
defer xm.stateLock.Unlock()
switch v := m.(type) {
case gxmpp.Chat:
remote_sp := strings.Split(v.Remote, "/")
// Skip self-sent events
if v.Remote == xm.jid || (v.Type == "groupchat" && len(remote_sp) == 2 && remote_sp[1] == xm.nickname) {
return
}
// If empty text, make sure we joined the room
// We would do this at every incoming message if it were not so costly
if v.Text == "" && v.Type == "groupchat" {
xm.handler.Joined(RoomID(remote_sp[0]))
}
// Handle subject change in group chats
if v.Subject != "" && v.Type == "groupchat" {
author := UserID("")
if len(remote_sp) == 2 {
if remote_sp[1] == xm.nickname {
author = xm.User()
} else {
author = UserID(remote_sp[1] + "@" + remote_sp[0])
}
}
xm.handler.RoomInfoUpdated(RoomID(remote_sp[0]), author, &RoomInfo{
Topic: v.Subject,
})
}
// Handle text message
if v.Text != "" {
event := &Event{
Type: EVENT_MESSAGE,
Text: v.Text,
}
if strings.HasPrefix(event.Text, "/me ") {
event.Type = EVENT_ACTION
event.Text = strings.Replace(event.Text, "/me ", "", 1)
}
if v.Type == "chat" {
event.Author = UserID(remote_sp[0])
xm.handler.Event(event)
}
if v.Type == "groupchat" && len(remote_sp) == 2 {
// First flush pending leaves and joins
room_id := RoomID(remote_sp[0])
if muc, ok := xm.muc[room_id]; ok {
muc.flushLeavesJoins(room_id, xm.handler)
}
// Now send event
event.Room = room_id
event.Author = UserID(remote_sp[1] + "@" + remote_sp[0])
event.Id = v.ID
xm.handler.Event(event)
}
}
case gxmpp.Presence:
remote := strings.Split(v.From, "/")
room := RoomID(remote[0])
if mucInfo, ok := xm.muc[room]; ok {
// skip presence with no user and self-presence
if len(remote) < 2 || remote[1] == xm.nickname {
return
}
user := UserID(remote[1] + "@" + remote[0])
if v.Type != "unavailable" {
if _, ok := mucInfo.pendingLeaves[user]; ok {
delete(mucInfo.pendingLeaves, user)
} else {
mucInfo.pendingJoins[user] = remote[1]
}
} else {
if _, ok := mucInfo.pendingJoins[user]; ok {
delete(mucInfo.pendingJoins, user)
} else {
mucInfo.pendingLeaves[user] = struct{}{}
}
}
}
}
}
func (muc *mucInfo) flushLeavesJoins(room RoomID, handler Handler) {
for user, display_name := range muc.pendingJoins {
handler.Event(&Event{
Type: EVENT_JOIN,
Room: room,
Author: user,
})
handler.UserInfoUpdated(user, &UserInfo{
DisplayName: display_name,
})
}
for user, _ := range muc.pendingLeaves {
handler.Event(&Event{
Type: EVENT_LEAVE,
Room: room,
Author: user,
})
}
muc.pendingJoins = make(map[UserID]string)
muc.pendingLeaves = make(map[UserID]struct{})
}
func (xm *XMPP) User() UserID {
return UserID(xm.jid)
}
func (xm *XMPP) SetUserInfo(info *UserInfo) error {
return fmt.Errorf("Not implemented")
}
func (xm *XMPP) SetRoomInfo(roomId RoomID, info *RoomInfo) error {
if info.Topic != "" {
_, err := xm.conn.Send(gxmpp.Chat{
Type: "groupchat",
Remote: string(roomId),
Subject: info.Topic,
})
if err != nil {
return err
}
}
if info.Picture.MediaObject != nil {
// TODO
return fmt.Errorf("Room picture change not implemented on xmpp")
}
if info.Name != "" && info.Name != string(roomId) {
// TODO
return fmt.Errorf("Room name change not implemented on xmpp")
}
return nil
}
func (xm *XMPP) Join(roomId RoomID) error {
xm.stateLock.Lock()
defer xm.stateLock.Unlock()
xm.muc[roomId] = &mucInfo{
pendingJoins: make(map[UserID]string),
pendingLeaves: make(map[UserID]struct{}),
}
log.Tracef("Join %s with nick %s\n", roomId, xm.nickname)
_, err := xm.conn.JoinMUCNoHistory(string(roomId), xm.nickname)
if err == nil {
xm.muc[roomId].joined = true
}
return err
}
func (xm *XMPP) Invite(userId UserID, roomId RoomID) error {
if roomId == "" {
xm.conn.RequestSubscription(string(userId))
xm.conn.ApproveSubscription(string(userId))
return nil
}
// TODO
return fmt.Errorf("Not implemented")
}
func (xm *XMPP) Leave(roomId RoomID) {
xm.stateLock.Lock()
defer xm.stateLock.Unlock()
xm.conn.LeaveMUC(string(roomId))
if muc, ok := xm.muc[roomId]; ok {
muc.joined = false
}
}
func (xm *XMPP) SearchForUsers(query string) ([]UserSearchResult, error) {
// TODO: search roster
return nil, fmt.Errorf("Not implemented")
}
func (xm *XMPP) Send(event *Event) (string, error) {
xm.stateLock.Lock()
defer xm.stateLock.Unlock()
if event.Attachments != nil && len(event.Attachments) > 0 {
for _, at := range event.Attachments {
url := at.URL()
if url == "" {
// TODO find a way to send them using some hosing of some kind
return "", fmt.Errorf("Attachment without URL sent to XMPP")
} else {
event.Text += fmt.Sprintf("\n%s (%s, %dkb)",
url, at.Mimetype(), at.Size()/1024)
}
}
}
if event.Id == "" {
event.Id = xid.New().String()
}
log.Tracef("xm *XMPP Send %#v\n", event)
if len(event.Recipient) > 0 {
_, err := xm.conn.Send(gxmpp.Chat{
Type: "chat",
Remote: string(event.Recipient),
Text: event.Text,
})
return event.Id, err
} else if len(event.Room) > 0 {
if muc, ok := xm.muc[event.Room]; ok {
muc.flushLeavesJoins(event.Room, xm.handler)
}
_, err := xm.conn.Send(gxmpp.Chat{
Type: "groupchat",
Remote: string(event.Room),
Text: event.Text,
ID: event.Id,
})
return event.Id, err
} else {
return "", fmt.Errorf("Invalid event")
}
}
func (xm *XMPP) UserCommand(cmd string) {
xm.handler.SystemMessage("Command not supported.")
}
func (xm *XMPP) Close() {
xm.stateLock.Lock()
defer xm.stateLock.Unlock()
if xm.conn != nil {
xm.conn.Close()
}
xm.conn = nil
xm.connectorLoopNum += 1
xm.connected = false
}