661 lines
16 KiB
Go
661 lines
16 KiB
Go
package mattermost
|
|
|
|
import (
|
|
"encoding/json"
|
|
"fmt"
|
|
"io/ioutil"
|
|
"net/http"
|
|
_ "os"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/42wim/matterbridge/matterclient"
|
|
"github.com/mattermost/mattermost-server/model"
|
|
log "github.com/sirupsen/logrus"
|
|
|
|
. "git.deuxfleurs.fr/Deuxfleurs/easybridge/connector"
|
|
)
|
|
|
|
// User id format: nickname@server
|
|
// Room id format: room_name@team@server
|
|
|
|
type Mattermost struct {
|
|
handler Handler
|
|
|
|
server string
|
|
username string
|
|
teams map[string]bool
|
|
|
|
initial_members int // How many room members (maximum) to load when first joining a channel
|
|
initial_backlog int // How many previous messages (maximum) to load when first joining a channel
|
|
|
|
conn *matterclient.MMClient
|
|
handlerStopChan chan struct{}
|
|
|
|
caches mmCaches
|
|
}
|
|
|
|
type mmCaches struct {
|
|
sync.Mutex
|
|
|
|
mmusers map[string]string // map mm username to mm user id
|
|
sentjoined map[string]bool // map username/room name to bool
|
|
displayname map[UserID]string // map username to last displayname
|
|
initsynced map[string]bool // chans for which init sync has been done
|
|
}
|
|
|
|
func (mm *Mattermost) SetHandler(h Handler) {
|
|
mm.handler = h
|
|
}
|
|
|
|
func (mm *Mattermost) Protocol() string {
|
|
return MATTERMOST_PROTOCOL
|
|
}
|
|
|
|
func (mm *Mattermost) Configure(c Configuration) error {
|
|
if mm.conn != nil {
|
|
mm.Close()
|
|
}
|
|
|
|
// Reinitialize shared data structures
|
|
mm.handlerStopChan = make(chan struct{})
|
|
|
|
mm.caches.mmusers = make(map[string]string)
|
|
mm.caches.sentjoined = make(map[string]bool)
|
|
mm.caches.displayname = make(map[UserID]string)
|
|
mm.caches.initsynced = make(map[string]bool)
|
|
|
|
// Read config
|
|
var err error
|
|
|
|
mm.server, err = c.GetString("server")
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
mm.username, err = c.GetString("username")
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
mm.initial_members, err = c.GetInt("initial_members", 100)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
mm.initial_backlog, err = c.GetInt("initial_backlog", 1000)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
teams, err := c.GetString("teams")
|
|
if err != nil {
|
|
return err
|
|
}
|
|
mm.teams = map[string]bool{}
|
|
anyteam := ""
|
|
for _, team := range strings.Split(teams, ",") {
|
|
anyteam = strings.TrimSpace(team)
|
|
mm.teams[anyteam] = true
|
|
}
|
|
|
|
notls, err := c.GetBool("no_tls", false)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
password, _ := c.GetString("password", "")
|
|
token, _ := c.GetString("token", "")
|
|
if token != "" {
|
|
password = "token=" + token
|
|
}
|
|
|
|
// Try to log in
|
|
mm.conn = matterclient.New(mm.username, password, anyteam, mm.server)
|
|
mm.conn.Credentials.NoTLS = notls
|
|
err = mm.conn.Login()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Try to start listening for messages
|
|
// Everytime the listener reconnects, mm.handleConnected does a sync of room status
|
|
mm.conn.OnWsConnect = mm.handleConnected
|
|
go mm.conn.WsReceiver()
|
|
go mm.conn.StatusLoop()
|
|
go mm.handleLoop(mm.conn.MessageChan, mm.handlerStopChan)
|
|
|
|
return nil
|
|
}
|
|
|
|
func (mm *Mattermost) User() UserID {
|
|
return UserID(mm.username + "@" + mm.server)
|
|
}
|
|
|
|
func (mm *Mattermost) getTeamIdByName(name string) string {
|
|
for _, team := range mm.conn.OtherTeams {
|
|
if team.Team.Name == name {
|
|
return team.Id
|
|
}
|
|
}
|
|
return ""
|
|
}
|
|
|
|
func (mm *Mattermost) checkRoomId(id RoomID) (string, error) {
|
|
x := strings.Split(string(id), "@")
|
|
if len(x) == 1 {
|
|
return "", fmt.Errorf("Please write whole room ID with team and server: %s@<team>@%s", id, mm.server)
|
|
}
|
|
if len(x) == 2 {
|
|
return x[0], nil
|
|
}
|
|
if len(x) != 3 || x[2] != mm.server {
|
|
return "", fmt.Errorf("Invalid room ID: %s", id)
|
|
}
|
|
|
|
team_id := mm.getTeamIdByName(x[1])
|
|
if team_id == "" {
|
|
return "", fmt.Errorf("Team not found: %s", id)
|
|
}
|
|
|
|
ch_id := mm.conn.GetChannelId(x[0], team_id)
|
|
if ch_id == "" {
|
|
return "", fmt.Errorf("Channel not found: %s", id)
|
|
}
|
|
return ch_id, nil
|
|
}
|
|
|
|
func (mm *Mattermost) reverseRoomId(id string) (bool, RoomID) {
|
|
team := mm.conn.GetChannelTeamId(id)
|
|
if team == "" {
|
|
return true, RoomID(fmt.Sprintf("%s@%s", id, mm.server))
|
|
} else {
|
|
teamName := mm.conn.GetTeamName(team)
|
|
if u, ok := mm.teams[teamName]; ok && u {
|
|
name := mm.conn.GetChannelName(id)
|
|
return true, RoomID(fmt.Sprintf("%s@%s@%s", name, teamName, mm.server))
|
|
} else {
|
|
return false, ""
|
|
}
|
|
}
|
|
}
|
|
|
|
func (mm *Mattermost) checkUserId(id UserID) (string, error) {
|
|
x := strings.Split(string(id), "@")
|
|
if len(x) == 1 {
|
|
return "", fmt.Errorf("Please write whole user ID with server: %s@%s", id, mm.server)
|
|
}
|
|
if len(x) != 2 || x[1] != mm.server {
|
|
return "", fmt.Errorf("Invalid user ID: %s", id)
|
|
}
|
|
|
|
mm.caches.Lock()
|
|
defer mm.caches.Unlock()
|
|
|
|
if user_id, ok := mm.caches.mmusers[x[0]]; ok {
|
|
return user_id, nil
|
|
}
|
|
|
|
u, resp := mm.conn.Client.GetUserByUsername(x[0], "")
|
|
if u == nil || resp.Error != nil {
|
|
return "", fmt.Errorf("Not found: %s (%s)", x[0], resp.Error)
|
|
}
|
|
mm.caches.mmusers[x[0]] = u.Id
|
|
|
|
return u.Id, nil
|
|
}
|
|
|
|
func (mm *Mattermost) SetUserInfo(info *UserInfo) error {
|
|
return fmt.Errorf("Not implemented")
|
|
}
|
|
|
|
func (mm *Mattermost) SetRoomInfo(roomId RoomID, info *RoomInfo) error {
|
|
ch, err := mm.checkRoomId(roomId)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if info.Topic != "" {
|
|
mm.conn.UpdateChannelHeader(ch, info.Topic)
|
|
}
|
|
|
|
if info.Picture.MediaObject != nil {
|
|
err = fmt.Errorf("Not supported: channel picture on mattermost")
|
|
}
|
|
|
|
if info.Name != "" {
|
|
err = fmt.Errorf("Not supported: channel name on mattermost")
|
|
}
|
|
|
|
return err
|
|
}
|
|
|
|
func (mm *Mattermost) Join(roomId RoomID) error {
|
|
ch, err := mm.checkRoomId(roomId)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return mm.conn.JoinChannel(ch)
|
|
}
|
|
|
|
func (mm *Mattermost) Invite(userId UserID, roomId RoomID) error {
|
|
if roomId == "" {
|
|
_, err := mm.checkUserId(userId)
|
|
return err
|
|
}
|
|
|
|
return fmt.Errorf("Not supported: invite on mattermost")
|
|
}
|
|
|
|
func (mm *Mattermost) Leave(roomId RoomID) {
|
|
// Not supported? TODO
|
|
}
|
|
|
|
func (mm *Mattermost) SearchForUsers(query string) ([]UserSearchResult, error) {
|
|
// TODO
|
|
return nil, fmt.Errorf("Not implemented")
|
|
}
|
|
|
|
func (mm *Mattermost) Send(event *Event) (string, error) {
|
|
post := &model.Post{
|
|
Message: event.Text,
|
|
}
|
|
if event.Type == EVENT_ACTION {
|
|
post.Type = "me"
|
|
}
|
|
|
|
if event.Room != "" {
|
|
ch, err := mm.checkRoomId(event.Room)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
post.ChannelId = ch
|
|
} else if event.Recipient != "" {
|
|
ui, err := mm.checkUserId(event.Recipient)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
|
|
_, resp := mm.conn.Client.CreateDirectChannel(mm.conn.User.Id, ui)
|
|
if resp.Error != nil {
|
|
return "", resp.Error
|
|
}
|
|
channelName := model.GetDMNameFromIds(ui, mm.conn.User.Id)
|
|
|
|
err = mm.conn.UpdateChannels()
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
|
|
post.ChannelId = mm.conn.GetChannelId(channelName, "")
|
|
} else {
|
|
return "", fmt.Errorf("Invalid target")
|
|
}
|
|
|
|
if event.Attachments != nil {
|
|
post.FileIds = []string{}
|
|
for _, file := range event.Attachments {
|
|
rdr, err := file.Read()
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
defer rdr.Close()
|
|
data, err := ioutil.ReadAll(rdr)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
up_file, err := mm.conn.UploadFile(data, post.ChannelId, file.Filename())
|
|
if err != nil {
|
|
log.Warnf("UploadFile error: %s", err)
|
|
return "", err
|
|
}
|
|
post.FileIds = append(post.FileIds, up_file)
|
|
}
|
|
}
|
|
|
|
created_post, resp := mm.conn.Client.CreatePost(post)
|
|
if resp.Error != nil {
|
|
log.Warnf("CreatePost error: %s", resp.Error)
|
|
return "", resp.Error
|
|
}
|
|
return created_post.Id, nil
|
|
}
|
|
|
|
func (mm *Mattermost) Close() {
|
|
if mm.conn != nil {
|
|
mm.conn.WsQuit = true
|
|
}
|
|
if mm.handlerStopChan != nil {
|
|
close(mm.handlerStopChan)
|
|
mm.handlerStopChan = nil
|
|
}
|
|
}
|
|
|
|
func (mm *Mattermost) handleConnected() {
|
|
log.Debugf("(Re-)connected to mattermost: %s@%s ; doing channel sync\n", mm.username, mm.server)
|
|
|
|
// Initial channel sync
|
|
chans := mm.conn.GetChannels()
|
|
doneCh := make(map[string]bool)
|
|
for _, ch := range chans {
|
|
if _, ok := doneCh[ch.Id]; !ok {
|
|
doneCh[ch.Id] = true
|
|
go mm.syncChannel(ch)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (mm *Mattermost) syncChannel(ch *model.Channel) {
|
|
// The first time we see a chan, sync everything (member list, names, profile pictures)
|
|
must_initsync := func() bool {
|
|
mm.caches.Lock()
|
|
defer mm.caches.Unlock()
|
|
|
|
if x, ok := mm.caches.initsynced[ch.Id]; ok && x {
|
|
return false
|
|
}
|
|
mm.caches.initsynced[ch.Id] = true
|
|
return true
|
|
}()
|
|
|
|
if must_initsync {
|
|
mm.initSyncChannel(ch)
|
|
}
|
|
|
|
// The following times, only sync missing messages
|
|
mm.backlogChannel(ch)
|
|
}
|
|
|
|
func (mm *Mattermost) initSyncChannel(ch *model.Channel) {
|
|
if len(strings.Split(ch.Name, "__")) == 2 {
|
|
// DM channel
|
|
// Update remote user info
|
|
users := strings.Split(ch.Name, "__")
|
|
for _, uid := range users {
|
|
if uid != mm.conn.User.Id {
|
|
user := mm.conn.GetUser(uid)
|
|
if user != nil {
|
|
mm.updateUserInfo(user)
|
|
}
|
|
}
|
|
}
|
|
} else {
|
|
interested, id := mm.reverseRoomId(ch.Id)
|
|
if !interested {
|
|
// Skip channels that are not in teams we want to bridge
|
|
return
|
|
}
|
|
mm.handler.Joined(id)
|
|
|
|
// Update room info
|
|
room_info := &RoomInfo{
|
|
Name: ch.DisplayName,
|
|
Topic: ch.Header,
|
|
}
|
|
for _, t := range mm.conn.OtherTeams {
|
|
if t.Id == ch.TeamId {
|
|
if t.Team.DisplayName != "" {
|
|
room_info.Name = t.Team.DisplayName + " / " + room_info.Name
|
|
} else {
|
|
room_info.Name = t.Team.Name + " / " + room_info.Name
|
|
}
|
|
if t.Team.LastTeamIconUpdate > 0 {
|
|
room_info.Picture = SMediaObject{&LazyBlobMediaObject{
|
|
ObjectFilename: fmt.Sprintf("%s-%d",
|
|
t.Team.Name,
|
|
t.Team.LastTeamIconUpdate),
|
|
GetFn: func(o *LazyBlobMediaObject) error {
|
|
team_img, resp := mm.conn.Client.GetTeamIcon(t.Id, "")
|
|
if resp.Error != nil {
|
|
log.Warnf("Could not get team image: %s", resp.Error.Error())
|
|
return resp.Error
|
|
}
|
|
o.ObjectData = team_img
|
|
o.ObjectMimetype = http.DetectContentType(team_img)
|
|
return nil
|
|
},
|
|
}}
|
|
}
|
|
break
|
|
}
|
|
}
|
|
mm.handler.RoomInfoUpdated(id, UserID(""), room_info)
|
|
|
|
// Update member list
|
|
// TODO (when this will be slow, i.e. hundreds of members): do only a diff
|
|
members, resp := mm.conn.Client.GetChannelMembers(ch.Id, 0, mm.initial_members, "")
|
|
if resp.Error == nil {
|
|
for _, mem := range *members {
|
|
if mem.UserId == mm.conn.User.Id {
|
|
continue
|
|
}
|
|
user := mm.conn.GetUser(mem.UserId)
|
|
if user != nil {
|
|
mm.ensureJoined(user, id)
|
|
mm.updateUserInfo(user)
|
|
} else {
|
|
log.Warnf("Could not find joined user: %s", mem.UserId)
|
|
}
|
|
}
|
|
} else {
|
|
log.Warnf("Could not get channel members: %s", resp.Error.Error())
|
|
}
|
|
}
|
|
}
|
|
|
|
func (mm *Mattermost) backlogChannel(ch *model.Channel) {
|
|
// Read backlog
|
|
last_seen_post := mm.handler.CacheGet(fmt.Sprintf("last_seen_%s", ch.Id))
|
|
if last_seen_post != "" {
|
|
const NUM_PER_PAGE = 100
|
|
page := 0
|
|
backlogs := []*model.PostList{}
|
|
for {
|
|
backlog, resp := mm.conn.Client.GetPostsAfter(ch.Id, last_seen_post, page, NUM_PER_PAGE, "")
|
|
if resp.Error == nil {
|
|
backlogs = append(backlogs, backlog)
|
|
if len(backlog.Order) == NUM_PER_PAGE {
|
|
page += 1
|
|
} else {
|
|
break
|
|
}
|
|
} else {
|
|
break
|
|
}
|
|
}
|
|
for i := 0; i < len(backlogs); i++ {
|
|
mm.processBacklog(ch, backlogs[i])
|
|
}
|
|
} else {
|
|
backlog, resp := mm.conn.Client.GetPostsForChannel(ch.Id, 0, mm.initial_backlog, "")
|
|
if resp.Error == nil {
|
|
mm.processBacklog(ch, backlog)
|
|
} else {
|
|
log.Warnf("Could not get channel backlog: %s", resp.Error)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (mm *Mattermost) processBacklog(ch *model.Channel, backlog *model.PostList) {
|
|
for i := 0; i < len(backlog.Order); i++ {
|
|
post_id := backlog.Order[len(backlog.Order)-i-1]
|
|
post := backlog.Posts[post_id]
|
|
post_time := time.Unix(post.CreateAt/1000, 0)
|
|
post.Message = fmt.Sprintf("[%s] %s",
|
|
post_time.Format("2006-01-02 15:04 MST"), post.Message)
|
|
mm.handlePost(ch.Name, post, true)
|
|
}
|
|
}
|
|
|
|
func (mm *Mattermost) handleLoop(msgCh chan *matterclient.Message, quitCh chan struct{}) {
|
|
for {
|
|
select {
|
|
case <-quitCh:
|
|
break
|
|
case msg := <-msgCh:
|
|
log.Tracef("Mattermost: %#v\n", msg)
|
|
log.Tracef("Mattermost raw: %#v\n", msg.Raw)
|
|
err := mm.handlePosted(msg.Raw)
|
|
if err != nil {
|
|
log.Warnf("Mattermost error: %s", err)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func (mm *Mattermost) updateUserInfo(user *model.User) {
|
|
userId := UserID(fmt.Sprintf("%s@%s", user.Username, mm.server))
|
|
userDisp := user.GetDisplayName(model.SHOW_NICKNAME_FULLNAME)
|
|
|
|
mm.caches.Lock()
|
|
defer mm.caches.Unlock()
|
|
|
|
if lastdn, ok := mm.caches.displayname[userId]; !ok || lastdn != userDisp {
|
|
ui := &UserInfo{
|
|
DisplayName: userDisp,
|
|
}
|
|
if user.LastPictureUpdate > 0 {
|
|
ui.Avatar = SMediaObject{&LazyBlobMediaObject{
|
|
ObjectFilename: fmt.Sprintf("%s-%d",
|
|
user.Username,
|
|
user.LastPictureUpdate),
|
|
GetFn: func(o *LazyBlobMediaObject) error {
|
|
img, resp := mm.conn.Client.GetProfileImage(user.Id, "")
|
|
if resp.Error != nil {
|
|
log.Warnf("Could not get profile picture: %s", resp.Error.Error())
|
|
return resp.Error
|
|
}
|
|
o.ObjectData = img
|
|
o.ObjectMimetype = http.DetectContentType(img)
|
|
return nil
|
|
},
|
|
}}
|
|
}
|
|
mm.handler.UserInfoUpdated(userId, ui)
|
|
mm.caches.displayname[userId] = userDisp
|
|
}
|
|
}
|
|
|
|
func (mm *Mattermost) ensureJoined(user *model.User, roomId RoomID) {
|
|
userId := UserID(fmt.Sprintf("%s@%s", user.Username, mm.server))
|
|
cache_key := fmt.Sprintf("%s / %s", userId, roomId)
|
|
|
|
mm.caches.Lock()
|
|
defer mm.caches.Unlock()
|
|
|
|
if _, ok := mm.caches.sentjoined[cache_key]; !ok {
|
|
mm.handler.Event(&Event{
|
|
Author: userId,
|
|
Room: roomId,
|
|
Type: EVENT_JOIN,
|
|
})
|
|
mm.caches.sentjoined[cache_key] = true
|
|
}
|
|
}
|
|
|
|
func (mm *Mattermost) handlePosted(msg *model.WebSocketEvent) error {
|
|
channel_name, ok := msg.Data["channel_name"].(string)
|
|
if !ok {
|
|
return nil
|
|
}
|
|
post_str := msg.Data["post"].(string)
|
|
var post model.Post
|
|
err := json.Unmarshal([]byte(post_str), &post)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return mm.handlePost(channel_name, &post, false)
|
|
}
|
|
|
|
func (mm *Mattermost) handlePost(channel_name string, post *model.Post, only_messages bool) error {
|
|
// Skip self messages
|
|
if post.UserId == mm.conn.User.Id {
|
|
return nil
|
|
}
|
|
|
|
// Find sending user
|
|
user := mm.conn.GetUser(post.UserId)
|
|
if user == nil {
|
|
return fmt.Errorf("Invalid user")
|
|
}
|
|
userId := UserID(fmt.Sprintf("%s@%s", user.Username, mm.server))
|
|
mm.updateUserInfo(user)
|
|
|
|
// Build message event
|
|
msg_ev := &Event{
|
|
Id: post.Id,
|
|
Author: userId,
|
|
Text: post.Message,
|
|
Type: EVENT_MESSAGE,
|
|
}
|
|
if post.Type == "me" {
|
|
msg_ev.Type = EVENT_ACTION
|
|
}
|
|
|
|
// Handle files
|
|
if post.FileIds != nil && len(post.FileIds) > 0 {
|
|
msg_ev.Attachments = []SMediaObject{}
|
|
for _, file := range post.Metadata.Files {
|
|
media_object := &LazyBlobMediaObject{
|
|
ObjectFilename: file.Name,
|
|
ObjectMimetype: file.MimeType,
|
|
GetFn: func(o *LazyBlobMediaObject) error {
|
|
blob, resp := mm.conn.Client.GetFile(file.Id)
|
|
if resp.Error != nil {
|
|
return resp.Error
|
|
}
|
|
o.ObjectData = blob
|
|
return nil
|
|
},
|
|
}
|
|
if file.Width > 0 {
|
|
media_object.ObjectImageSize = &ImageSize{
|
|
Width: file.Width,
|
|
Height: file.Height,
|
|
}
|
|
}
|
|
msg_ev.Attachments = append(msg_ev.Attachments, SMediaObject{media_object})
|
|
}
|
|
}
|
|
|
|
// Dispatch as PM or as room message
|
|
if len(strings.Split(channel_name, "__")) == 2 {
|
|
// Private message, no need to find room id
|
|
if user.Id == mm.conn.User.Id {
|
|
// Skip self sent messages
|
|
return nil
|
|
}
|
|
|
|
mm.handler.Event(msg_ev)
|
|
mm.handler.CachePut(fmt.Sprintf("last_seen_%s", post.ChannelId), post.Id)
|
|
} else {
|
|
interested, roomId := mm.reverseRoomId(post.ChannelId)
|
|
if !interested {
|
|
return nil
|
|
}
|
|
if roomId == "" {
|
|
return fmt.Errorf("Invalid channel id")
|
|
}
|
|
|
|
mm.ensureJoined(user, roomId)
|
|
|
|
if post.Type == "system_header_change" {
|
|
if !only_messages {
|
|
new_header := post.Props["new_header"].(string)
|
|
mm.handler.RoomInfoUpdated(roomId, userId, &RoomInfo{
|
|
Topic: new_header,
|
|
})
|
|
}
|
|
} else if post.Type == "" || post.Type == "me" {
|
|
msg_ev.Room = roomId
|
|
mm.handler.Event(msg_ev)
|
|
mm.handler.CachePut(fmt.Sprintf("last_seen_%s", post.ChannelId), post.Id)
|
|
} else {
|
|
return fmt.Errorf("Unhandled post type: %s", post.Type)
|
|
}
|
|
}
|
|
return nil
|
|
}
|