Fix Mattermost event deduplication

Mattermost assigns its own IDs to messages, thus when sending a message
to Mattermost the event_seen key that has to be written must take into
account that ID and not the one that we put in the event (which was the
Matrix event ID)

Note that for XMPP anything can be used as an ID, so using the Matrix
event ID there worked, but it's actually not so good.
This commit is contained in:
Alex 2020-02-29 10:01:42 +01:00
parent 1c038995be
commit 38a3f1bdb1
7 changed files with 70 additions and 40 deletions

View file

@ -418,10 +418,17 @@ func (a *Account) eventInternal(event *Event) error {
}
}
var cache_key string
if event.Id != "" {
cache_key := fmt.Sprintf("%s/event_seen/%s/%s",
// If the event has an ID, make sure it is processed only once
cache_key = fmt.Sprintf("%s/event_seen/%s/%s",
a.Protocol, mx_room_id, event.Id)
if !dbKvTestAndSet(cache_key, "yes") {
slot_key := dbKvSlotKey(cache_key)
dbLockSlot(slot_key)
defer dbUnlockSlot(slot_key)
if dbKvGet(cache_key) == "yes" {
// false: cache key was not modified, meaning we
// already saw the event
return nil
@ -470,6 +477,12 @@ func (a *Account) eventInternal(event *Event) error {
}
}
}
// Mark message as received in db
if cache_key != "" {
dbKvPutLocked(cache_key, "yes")
}
return nil
}
}

View file

@ -59,8 +59,14 @@ type Connector interface {
// Leave a channel
Leave(roomId RoomID)
// Send an event
Send(event *Event) error
// Send an event. Returns the ID of the created remote message.
// This ID is used to deduplicate messages: if it comes back, it should have the same Id
// than the one returned here.
// For backends that do not implement IDs (e.g. IRC), an empty string is returned.
// (FIXME how to deduplicate IRC messages?)
// The event that is fed in this function may have its ID already set,
// in which case the backend is free to re-use the ID or select a new one.
Send(event *Event) (string, error)
// Close the connection
Close()

View file

@ -198,9 +198,9 @@ func (irc *IRC) Leave(roomId RoomID) {
irc.conn.Cmd.Part(ch)
}
func (irc *IRC) Send(event *Event) error {
func (irc *IRC) Send(event *Event) (string, error) {
if irc.conn == nil {
return fmt.Errorf("Not connected")
return "", fmt.Errorf("Not connected")
}
// Workaround girc bug
@ -212,17 +212,17 @@ func (irc *IRC) Send(event *Event) error {
if event.Room != "" {
ch, err := irc.checkRoomId(event.Room)
if err != nil {
return err
return "", err
}
dest = ch
} else if event.Recipient != "" {
ui, err := irc.checkUserId(event.Recipient)
if err != nil {
return err
return "", err
}
dest = ui
} else {
return fmt.Errorf("Invalid target")
return "", fmt.Errorf("Invalid target")
}
if event.Attachments != nil && len(event.Attachments) > 0 {
@ -230,7 +230,7 @@ func (irc *IRC) Send(event *Event) error {
url := at.URL()
if url == "" {
// TODO find a way to send them using some hosing of some kind
return fmt.Errorf("Attachment without URL sent to IRC")
return "", fmt.Errorf("Attachment without URL sent to IRC")
} else {
irc.conn.Cmd.Message(dest, fmt.Sprintf("%s (%s, %dkb)",
url, at.Mimetype(), at.Size()/1024))
@ -243,9 +243,9 @@ func (irc *IRC) Send(event *Event) error {
} else if event.Type == EVENT_ACTION {
irc.conn.Cmd.Action(dest, event.Text)
} else {
return fmt.Errorf("Invalid event type")
return "", fmt.Errorf("Invalid event type")
}
return nil
return "", nil
}
func (irc *IRC) Close() {

View file

@ -243,7 +243,7 @@ func (mm *Mattermost) Leave(roomId RoomID) {
// Not supported? TODO
}
func (mm *Mattermost) Send(event *Event) error {
func (mm *Mattermost) Send(event *Event) (string, error) {
post := &model.Post{
Message: event.Text,
}
@ -254,29 +254,29 @@ func (mm *Mattermost) Send(event *Event) error {
if event.Room != "" {
ch, err := mm.checkRoomId(event.Room)
if err != nil {
return err
return "", err
}
post.ChannelId = ch
} else if event.Recipient != "" {
ui, err := mm.checkUserId(event.Recipient)
if err != nil {
return err
return "", err
}
_, resp := mm.conn.Client.CreateDirectChannel(mm.conn.User.Id, ui)
if resp.Error != nil {
return resp.Error
return "", resp.Error
}
channelName := model.GetDMNameFromIds(ui, mm.conn.User.Id)
err = mm.conn.UpdateChannels()
if err != nil {
return err
return "", err
}
post.ChannelId = mm.conn.GetChannelId(channelName, "")
} else {
return fmt.Errorf("Invalid target")
return "", fmt.Errorf("Invalid target")
}
if event.Attachments != nil {
@ -284,28 +284,28 @@ func (mm *Mattermost) Send(event *Event) error {
for _, file := range event.Attachments {
rdr, err := file.Read()
if err != nil {
return err
return "", err
}
defer rdr.Close()
data, err := ioutil.ReadAll(rdr)
if err != nil {
return err
return "", err
}
up_file, err := mm.conn.UploadFile(data, post.ChannelId, file.Filename())
if err != nil {
log.Warnf("UploadFile error: %s", err)
return err
return "", err
}
post.FileIds = append(post.FileIds, up_file)
}
}
_, resp := mm.conn.Client.CreatePost(post)
created_post, resp := mm.conn.Client.CreatePost(post)
if resp.Error != nil {
log.Warnf("CreatePost error: %s", resp.Error)
return resp.Error
return "", resp.Error
}
return nil
return created_post.Id, nil
}
func (mm *Mattermost) Close() {

View file

@ -306,13 +306,13 @@ func (xm *XMPP) Leave(roomId RoomID) {
xm.conn.LeaveMUC(string(roomId))
}
func (xm *XMPP) Send(event *Event) error {
func (xm *XMPP) Send(event *Event) (string, error) {
if event.Attachments != nil && len(event.Attachments) > 0 {
for _, at := range event.Attachments {
url := at.URL()
if url == "" {
// TODO find a way to send them using some hosing of some kind
return fmt.Errorf("Attachment without URL sent to XMPP")
return "", fmt.Errorf("Attachment without URL sent to XMPP")
} else {
event.Text += fmt.Sprintf("\n%s (%s, %dkb)",
url, at.Mimetype(), at.Size()/1024)
@ -320,6 +320,10 @@ func (xm *XMPP) Send(event *Event) error {
}
}
if event.Id == "" {
event.Id = xid.New().String()
}
log.Tracef("xm *XMPP Send %#v\n", event)
if len(event.Recipient) > 0 {
_, err := xm.conn.Send(gxmpp.Chat{
@ -327,20 +331,17 @@ func (xm *XMPP) Send(event *Event) error {
Remote: string(event.Recipient),
Text: event.Text,
})
return err
return event.Id, err
} else if len(event.Room) > 0 {
if event.Id == "" {
event.Id = xid.New().String()
}
_, err := xm.conn.Send(gxmpp.Chat{
Type: "groupchat",
Remote: string(event.Room),
Text: event.Text,
ID: event.Id,
})
return err
return event.Id, err
} else {
return fmt.Errorf("Invalid event")
return "", fmt.Errorf("Invalid event")
}
}

12
db.go
View file

@ -163,6 +163,14 @@ func dbKvPut(key string, value string) {
dbLockSlot(slot_key)
defer dbUnlockSlot(slot_key)
dbKvPutLocked(key, value)
}
// Variant of dbKvPut that does not take a lock,
// use this if the slot is already locked
func dbKvPutLocked(key string, value string) {
slot_key := dbKvSlotKey(key)
var entry DbKv
db.Where(&DbKv{Key: key}).Assign(&DbKv{Value: value}).FirstOrCreate(&entry)
dbCache.Add(slot_key, value)
@ -179,9 +187,7 @@ func dbKvTestAndSet(key string, value string) bool {
return false
}
var entry DbKv
db.Where(&DbKv{Key: key}).Assign(&DbKv{Value: value}).FirstOrCreate(&entry)
dbCache.Add(slot_key, value)
dbKvPutLocked(key, value)
return true
}

View file

@ -145,18 +145,22 @@ func handleTxnEvent(e *mxlib.Event) error {
} else if e.Sender == pm_room.MxUserID {
ev.Author = acct.Conn.User()
ev.Recipient = pm_room.UserID
return acct.Conn.Send(ev)
_, err := acct.Conn.Send(ev)
return err
}
} else if room := dbIsPublicRoom(e.RoomId); room != nil {
cache_key := fmt.Sprintf("%s/event_seen/%s/%s",
room.Protocol, e.RoomId, ev.Id)
dbKvPut(cache_key, "yes")
// If this is a regular room
acct := FindJoinedAccount(e.Sender, room.Protocol, room.RoomID)
if acct != nil {
ev.Author = acct.Conn.User()
ev.Room = room.RoomID
return acct.Conn.Send(ev)
created_ev_id, err := acct.Conn.Send(ev)
if err == nil && created_ev_id != "" {
cache_key := fmt.Sprintf("%s/event_seen/%s/%s",
room.Protocol, e.RoomId, created_ev_id)
dbKvPut(cache_key, "yes")
}
return err
} else {
mx.RoomKick(e.RoomId, e.Sender, fmt.Sprintf("Not present in %s on %s, please talk with Easybridge to rejoin", room.RoomID, room.Protocol))
return fmt.Errorf("not joined %s on %s", room.RoomID, room.Protocol)