Abstract database behind generic interface and implement alternative drivers #322
3 changed files with 28 additions and 13 deletions
|
@ -28,7 +28,9 @@ impl BlockManagerMetrics {
|
||||||
Self {
|
Self {
|
||||||
_resync_queue_len: meter
|
_resync_queue_len: meter
|
||||||
.u64_value_observer("block.resync_queue_length", move |observer| {
|
.u64_value_observer("block.resync_queue_length", move |observer| {
|
||||||
observer.observe(resync_queue.len().unwrap() as u64, &[]) // TODO fix unwrap
|
if let Ok(v) = resync_queue.len() {
|
||||||
|
observer.observe(v as u64, &[]);
|
||||||
|
}
|
||||||
})
|
})
|
||||||
.with_description(
|
.with_description(
|
||||||
"Number of block hashes queued for local check and possible resync",
|
"Number of block hashes queued for local check and possible resync",
|
||||||
|
@ -36,7 +38,9 @@ impl BlockManagerMetrics {
|
||||||
.init(),
|
.init(),
|
||||||
_resync_errored_blocks: meter
|
_resync_errored_blocks: meter
|
||||||
.u64_value_observer("block.resync_errored_blocks", move |observer| {
|
.u64_value_observer("block.resync_errored_blocks", move |observer| {
|
||||||
observer.observe(resync_errors.len().unwrap() as u64, &[]) // TODO fix unwrap
|
if let Ok(v) = resync_errors.len() {
|
||||||
|
observer.observe(v as u64, &[]);
|
||||||
|
}
|
||||||
})
|
})
|
||||||
.with_description("Number of block hashes whose last resync resulted in an error")
|
.with_description("Number of block hashes whose last resync resulted in an error")
|
||||||
.init(),
|
.init(),
|
||||||
|
|
|
@ -26,10 +26,12 @@ impl TableMetrics {
|
||||||
.u64_value_observer(
|
.u64_value_observer(
|
||||||
"table.merkle_updater_todo_queue_length",
|
"table.merkle_updater_todo_queue_length",
|
||||||
move |observer| {
|
move |observer| {
|
||||||
observer.observe(
|
if let Ok(v) = merkle_todo.len() {
|
||||||
merkle_todo.len().unwrap() as u64, // TODO fix unwrap
|
observer.observe(
|
||||||
&[KeyValue::new("table_name", table_name)],
|
v as u64,
|
||||||
)
|
&[KeyValue::new("table_name", table_name)],
|
||||||
|
);
|
||||||
|
}
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
.with_description("Merkle tree updater TODO queue length")
|
.with_description("Merkle tree updater TODO queue length")
|
||||||
|
@ -38,10 +40,12 @@ impl TableMetrics {
|
||||||
.u64_value_observer(
|
.u64_value_observer(
|
||||||
"table.gc_todo_queue_length",
|
"table.gc_todo_queue_length",
|
||||||
move |observer| {
|
move |observer| {
|
||||||
observer.observe(
|
if let Ok(v) = gc_todo.len() {
|
||||||
gc_todo.len().unwrap() as u64, // TODO fix unwrap
|
observer.observe(
|
||||||
&[KeyValue::new("table_name", table_name)],
|
v as u64,
|
||||||
)
|
&[KeyValue::new("table_name", table_name)],
|
||||||
|
);
|
||||||
|
}
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
.with_description("Table garbage collector TODO queue length")
|
.with_description("Table garbage collector TODO queue length")
|
||||||
|
|
|
@ -603,9 +603,16 @@ impl SyncTodo {
|
||||||
let retain = nodes.contains(&my_id);
|
let retain = nodes.contains(&my_id);
|
||||||
if !retain {
|
if !retain {
|
||||||
// Check if we have some data to send, otherwise skip
|
// Check if we have some data to send, otherwise skip
|
||||||
if data.store.range(begin..end).unwrap().next().is_none() {
|
match data.store.range(begin..end) {
|
||||||
// TODO fix unwrap
|
Ok(mut iter) => {
|
||||||
continue;
|
if iter.next().is_none() {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
warn!("DB error in add_full_sync: {}", e);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue