release lock when no longer needed
This commit is contained in:
parent
e93732d9bb
commit
a8e33383f4
1 changed files with 18 additions and 2 deletions
|
@ -8,7 +8,7 @@ use futures::{future::BoxFuture, Future, FutureExt};
|
||||||
use k2v_client::{CausalValue, CausalityToken, K2vClient, K2vValue};
|
use k2v_client::{CausalValue, CausalityToken, K2vClient, K2vValue};
|
||||||
use rusoto_s3::{PutObjectRequest, S3Client, S3};
|
use rusoto_s3::{PutObjectRequest, S3Client, S3};
|
||||||
use tokio::sync::watch;
|
use tokio::sync::watch;
|
||||||
use tracing::{error, info};
|
use tracing::{error, info, warn};
|
||||||
|
|
||||||
use crate::cryptoblob;
|
use crate::cryptoblob;
|
||||||
use crate::login::{Credentials, PublicCredentials};
|
use crate::login::{Credentials, PublicCredentials};
|
||||||
|
@ -285,7 +285,23 @@ async fn k2v_lock_loop_internal(
|
||||||
.boxed();
|
.boxed();
|
||||||
|
|
||||||
let res = futures::try_join!(watch_lock_loop, lock_notify_loop, take_lock_loop);
|
let res = futures::try_join!(watch_lock_loop, lock_notify_loop, take_lock_loop);
|
||||||
info!("lock loop exited: {:?}", res);
|
|
||||||
|
info!("lock loop exited: {:?}, releasing", res);
|
||||||
|
|
||||||
|
if !held_tx.is_closed() {
|
||||||
|
warn!("wierd...");
|
||||||
|
let _ = held_tx.send(false);
|
||||||
|
}
|
||||||
|
|
||||||
|
// If lock is ours, release it
|
||||||
|
let release = match &*state_rx.borrow() {
|
||||||
|
LockState::Held(pid, _, ct) if *pid == our_pid => Some(ct.clone()),
|
||||||
|
_ => None,
|
||||||
|
};
|
||||||
|
if let Some(ct) = release {
|
||||||
|
let _ = k2v.delete_item(pk, sk, ct.clone()).await;
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// ---- UTIL: function to wait for a value to have changed in K2V ----
|
// ---- UTIL: function to wait for a value to have changed in K2V ----
|
||||||
|
|
Loading…
Reference in a new issue