easybridge/connector/xmpp/xmpp.go
Alex Auvolat 38a3f1bdb1 Fix Mattermost event deduplication
Mattermost assigns its own IDs to messages, thus when sending a message
to Mattermost the event_seen key that has to be written must take into
account that ID and not the one that we put in the event (which was the
Matrix event ID)

Note that for XMPP anything can be used as an ID, so using the Matrix
event ID there worked, but it's actually not so good.
2020-02-29 10:01:42 +01:00

355 lines
7.4 KiB
Go

package xmpp
import (
"crypto/tls"
"fmt"
"strings"
"time"
gxmpp "github.com/matterbridge/go-xmpp"
"github.com/rs/xid"
log "github.com/sirupsen/logrus"
. "git.deuxfleurs.fr/Deuxfleurs/easybridge/connector"
)
// User id format: username@server (= JID)
// OR: nickname@room_name@muc_server
// Room id format: room_name@muc_server (= MUC ID)
type XMPP struct {
handler Handler
connectorLoopNum int
connected bool
timeout int
server string
port int
ssl bool
jid string
jid_localpart string
password string
nickname string
conn *gxmpp.Client
isMUC map[string]bool
}
func (xm *XMPP) SetHandler(h Handler) {
xm.handler = h
}
func (xm *XMPP) Protocol() string {
return XMPP_PROTOCOL
}
func (xm *XMPP) Configure(c Configuration) error {
if xm.conn != nil {
xm.Close()
}
// Parse and validate configuration
var err error
xm.port, err = c.GetInt("port", 5222)
if err != nil {
return err
}
xm.ssl, err = c.GetBool("ssl", true)
if err != nil {
return err
}
xm.jid, err = c.GetString("jid")
if err != nil {
return err
}
jid_parts := strings.Split(xm.jid, "@")
if len(jid_parts) != 2 {
return fmt.Errorf("Invalid JID: %s", xm.jid)
}
xm.server = jid_parts[1]
xm.jid_localpart = jid_parts[0]
xm.nickname, _ = c.GetString("nickname", xm.jid_localpart)
xm.password, err = c.GetString("password")
if err != nil {
return err
}
// Try to connect
if xm.isMUC == nil {
xm.isMUC = make(map[string]bool)
}
xm.connectorLoopNum += 1
go xm.connectLoop(xm.connectorLoopNum)
for i := 0; i < 42; i++ {
time.Sleep(time.Duration(1) * time.Second)
if xm.connected {
return nil
}
}
return fmt.Errorf("Failed to connect after 42s attempting")
}
func (xm *XMPP) connectLoop(num int) {
xm.timeout = 10
for {
if xm.connectorLoopNum != num {
return
}
tc := &tls.Config{
ServerName: strings.Split(xm.jid, "@")[1],
InsecureSkipVerify: true,
}
options := gxmpp.Options{
Host: xm.server,
User: xm.jid,
Password: xm.password,
NoTLS: true,
StartTLS: xm.ssl,
Session: true,
TLSConfig: tc,
}
var err error
xm.conn, err = options.NewClient()
if err != nil {
xm.connected = false
log.Debugf("XMPP failed to connect / disconnected: %s\n", err)
log.Debugf("Retrying in %ds\n", xm.timeout)
time.Sleep(time.Duration(xm.timeout) * time.Second)
xm.timeout *= 2
if xm.timeout > 600 {
xm.timeout = 600
}
} else {
xm.connected = true
xm.timeout = 10
err = xm.handleXMPP()
if err != nil {
xm.connected = false
log.Debugf("XMPP disconnected: %s\n", err)
log.Debugf("Reconnecting.\n")
}
}
}
}
func (xm *XMPP) xmppKeepAlive() chan bool {
done := make(chan bool)
go func() {
ticker := time.NewTicker(90 * time.Second)
defer ticker.Stop()
for {
select {
case <-ticker.C:
if err := xm.conn.PingC2S("", ""); err != nil {
log.Debugf("PING failed %#v\n", err)
}
case <-done:
return
}
}
}()
return done
}
func (xm *XMPP) handleXMPP() error {
done := xm.xmppKeepAlive()
defer close(done)
for {
m, err := xm.conn.Recv()
if err != nil {
return err
}
log.Tracef("XMPP: %#v\n", m)
switch v := m.(type) {
case gxmpp.Chat:
remote_sp := strings.Split(v.Remote, "/")
// Skip self-sent events
if v.Remote == xm.jid || (v.Type == "groupchat" && len(remote_sp) == 2 && remote_sp[1] == xm.nickname) {
continue
}
// If empty text, make sure we joined the room
// We would do this at every incoming message if it were not so costly
if v.Text == "" && v.Type == "groupchat" {
xm.handler.Joined(RoomID(remote_sp[0]))
}
// Handle subject change in group chats
if v.Subject != "" && v.Type == "groupchat" {
author := UserID("")
if len(remote_sp) == 2 {
if remote_sp[1] == xm.nickname {
author = xm.User()
} else {
author = UserID(remote_sp[1] + "@" + remote_sp[0])
}
}
xm.handler.RoomInfoUpdated(RoomID(remote_sp[0]), author, &RoomInfo{
Topic: v.Subject,
})
}
// Handle text message
if v.Text != "" {
event := &Event{
Type: EVENT_MESSAGE,
Text: v.Text,
}
if strings.HasPrefix(event.Text, "/me ") {
event.Type = EVENT_ACTION
event.Text = strings.Replace(event.Text, "/me ", "", 1)
}
if v.Type == "chat" {
event.Author = UserID(remote_sp[0])
xm.handler.Event(event)
}
if v.Type == "groupchat" && len(remote_sp) == 2 {
event.Room = RoomID(remote_sp[0])
event.Author = UserID(remote_sp[1] + "@" + remote_sp[0])
event.Id = v.ID
xm.handler.Event(event)
}
}
case gxmpp.Presence:
remote := strings.Split(v.From, "/")
if ismuc, ok := xm.isMUC[remote[0]]; ok && ismuc {
// skip presence with no user and self-presence
if len(remote) < 2 || remote[1] == xm.nickname {
continue
}
user := UserID(remote[1] + "@" + remote[0])
event := &Event{
Type: EVENT_JOIN,
Room: RoomID(remote[0]),
Author: user,
}
if v.Type == "unavailable" {
event.Type = EVENT_LEAVE
}
xm.handler.Event(event)
xm.handler.UserInfoUpdated(user, &UserInfo{
DisplayName: remote[1],
})
}
}
}
}
func (xm *XMPP) User() UserID {
return UserID(xm.jid)
}
func (xm *XMPP) SetUserInfo(info *UserInfo) error {
return fmt.Errorf("Not implemented")
}
func (xm *XMPP) SetRoomInfo(roomId RoomID, info *RoomInfo) error {
if info.Topic != "" {
_, err := xm.conn.Send(gxmpp.Chat{
Type: "groupchat",
Remote: string(roomId),
Subject: info.Topic,
})
if err != nil {
return err
}
}
if info.Picture != nil {
// TODO
return fmt.Errorf("Room picture change not implemented on xmpp")
}
if info.Name != "" && info.Name != string(roomId) {
// TODO
return fmt.Errorf("Room name change not implemented on xmpp")
}
return nil
}
func (xm *XMPP) Join(roomId RoomID) error {
xm.isMUC[string(roomId)] = true
log.Tracef("Join %s with nick %s\n", roomId, xm.nickname)
_, err := xm.conn.JoinMUCNoHistory(string(roomId), xm.nickname)
return err
}
func (xm *XMPP) Invite(userId UserID, roomId RoomID) error {
if roomId == "" {
xm.conn.RequestSubscription(string(userId))
xm.conn.ApproveSubscription(string(userId))
return nil
}
// TODO
return fmt.Errorf("Not implemented")
}
func (xm *XMPP) Leave(roomId RoomID) {
xm.conn.LeaveMUC(string(roomId))
}
func (xm *XMPP) Send(event *Event) (string, error) {
if event.Attachments != nil && len(event.Attachments) > 0 {
for _, at := range event.Attachments {
url := at.URL()
if url == "" {
// TODO find a way to send them using some hosing of some kind
return "", fmt.Errorf("Attachment without URL sent to XMPP")
} else {
event.Text += fmt.Sprintf("\n%s (%s, %dkb)",
url, at.Mimetype(), at.Size()/1024)
}
}
}
if event.Id == "" {
event.Id = xid.New().String()
}
log.Tracef("xm *XMPP Send %#v\n", event)
if len(event.Recipient) > 0 {
_, err := xm.conn.Send(gxmpp.Chat{
Type: "chat",
Remote: string(event.Recipient),
Text: event.Text,
})
return event.Id, err
} else if len(event.Room) > 0 {
_, err := xm.conn.Send(gxmpp.Chat{
Type: "groupchat",
Remote: string(event.Room),
Text: event.Text,
ID: event.Id,
})
return event.Id, err
} else {
return "", fmt.Errorf("Invalid event")
}
}
func (xm *XMPP) Close() {
if xm.conn != nil {
xm.conn.Close()
}
xm.conn = nil
xm.connectorLoopNum += 1
xm.connected = false
}