tor_multipath_voip/bench/bench1/client.js

227 lines
6.9 KiB
JavaScript

'use strict'
const request = require('request')
const socks_client = require('socks').SocksClient
const fs = require('fs')
const torm = require('./tor.js')
const logm = require('./log.js')
const worker = require('./worker.js')
const measure_interval = 1000
const measure_count = 600
const parralelism = 100
const base_url = process.env.SERVER_URL || "http://127.0.0.1:3000"
const repeat_for = (times, delay, fn) => new Promise((resolve, reject) => {
const timer = setInterval(() => {
if (times-- <= 0) {
clearInterval(timer)
resolve()
return
}
try {
fn()
} catch(err) {
clearInterval(timer)
reject(err)
return
}
}, delay)
})
const doMeasure = socket => () => {
socket.write(`${Date.now().toString()}\n`)
}
const clean = (id) => new Promise((resolve, reject) => {
request({uri: `${base_url}/hidden_service`, method: "DELETE", json: {id: id}}, (err,res,body) => {
if (err) reject(err)
if (body.err) reject(body.err)
resolve(body)
})
})
const wait_for_proxy = (socks_opt, max) => new Promise((resolve, reject) => {
const loop = cur => {
socks_client.createConnection(socks_opt, (err, info) => {
if (err && max <= 0) {
console.error("No more retry")
reject("No more retry")
return
}
if (err) {
console.error(`Proxy failed to build with error "${err.message}", ${cur} retries left...`)
setTimeout(loop, 5000, cur - 1)
return
}
resolve(info)
})
}
loop(max)
})
const create_hidden_service = base_url => new Promise((resolve, reject) => {
request.post(`${base_url}/hidden_service`, {json: {}}, (err,res,body) => {
if (err) {
reject(err)
return
}
if (!body.hs || !body.port) reject("Missing value from API endpoint")
else resolve(body)
})
})
const wait_for_hs = socks_port =>
create_hidden_service(base_url)
.then(body => Promise.all([
body,
logm.get_write_fd(`logs/${Date.now()}-${body.hs}.onion-client.log`)
]))
.then(([body, log]) => {
fs.write(log, `#info ${Date.now().toString()} Got ${body.hs}:${body.port}\n`,() => {})
console.info(`We got this hidden service: ${body.hs} on port ${body.port}`)
const socks_opt = {
proxy: {ipaddress: '127.0.0.1', port: socks_port, type: 5},
command: 'connect',
destination: { host: `${body.hs}.onion`, port: body.port }
}
/*
// used to speedup tests
socks_opt.destination.host = "ysw5uau3pbjaviha.onion"
socks_opt.destination.port = 43035
*/
return Promise.all([
wait_for_proxy(socks_opt, 120),
{body: body, socks_opt: socks_opt, log: log}
])
})
.then(([info, data]) => new Promise((resolve, reject) => {
info.socket.on('close', () => resolve(data))
info.socket.destroy()
}))
const do_measurement = data =>
Promise.all([wait_for_proxy(data.socks_opt), data])
.then(([info, data]) => {
const socks_opt = data.socks_opt
fs.write(data.log, `#info ${Date.now().toString()} Socks has been created, HS is up\n`, () => {})
console.info(`SOCKS proxy to ${socks_opt.destination.host}:${socks_opt.destination.port} has been created`)
const recv_back = new Promise((resolve, reject) => {
setTimeout(reject, measure_count * measure_interval + 2 * 60 * 1000, "Receive answer timed out")
let missing = measure_count
info.socket.on('data', recv_data => {
const tss = recv_data.split('\n')
tss.pop() // Remove last empty line
tss.forEach((ts, idx) => {
fs.write(data.log, `delta_timestamp, ${missing}, ${socks_opt.destination.host}, ${socks_opt.destination.port}, ${Date.now()}, ${ts}\n`, () => {})
if (idx > 0) fs.write(data.log, `#info aggregation: ${missing} was aggregated with ${missing + idx} for ${socks_opt.destination.host}\n`, () => {})
if (--missing == 0) resolve()
})
})
})
info.socket.setNoDelay(true)
info.socket.setEncoding("utf-8")
data.info = info
return Promise.all([
recv_back,
repeat_for(measure_count, measure_interval, doMeasure(info.socket)),
data
])
})
.then(([recv_back, repeat, data]) => {
data.info.socket.destroy()
fs.write(data.log, `#info ${Date.now().toString()} Done\n`, () => fs.close(data.log, () => {}))
console.info(`Measurement terminated for ${data.socks_opt.destination.host}\n`)
return Promise.all([clean(data.body.id), data])
})
.then(([cl, data]) => {
console.info(`Done for ${data.socks_opt.destination.host}`)
})
.catch(err => {
data.info.socket.destroy()
fs.write(data.log, `#info ${Date.now().toString()} ERROR occured\n`, () => fs.close(data.log, () => {}))
console.error(err)
})
const wait_bootstrap = tor => new Promise((resolve, reject) => {
const loop = () => {
console.info("Waiting for bootstrap, next check in 5s...")
setTimeout(() => {
tor.getInfo("status/circuit-established", (err, res) => {
if (err) reject(err)
else if (res == '0\n') loop()
else resolve()
})
}, 5000)
}
loop()
})
const get_socks_port = tor => new Promise((resolve, reject) => {
tor.getInfo('net/listeners/socks', (err, result) => {
if (err) {
reject(err)
return
}
const port = parseInt(result.match(/:(\d+)/)[1])
console.info(`Tor daemon accepts SOCKS connections on port ${port}`)
resolve(port)
})
})
logm
.init
.then(([stream_fd, circ_fd]) => torm.init(stream_fd, circ_fd))
.then(tor => Promise.all([
get_socks_port(tor),
wait_bootstrap(tor),
tor
]))
//.then(([socks_port, boot, tor]) => one_req(socks_port))
.then(([socks_port, boot, tor]) => {
const hs_spawn_worker = new worker(200, 5000)
const measurement_worker = new worker(100, 1000)
for (let i = 0; i < 200; i++) {
hs_spawn_worker.add(() => wait_for_hs(socks_port))
}
hs_spawn_worker.on('success', v => {
console.debug("HS ready, forwarding it to measurement")
measurement_worker.add(() => do_measurement(v))
})
hs_spawn_worker.on('error', e => {
console.error("Failed to create HS", e)
hs_spawn_worker.add(() => wait_for_hs(socks_port))
})
measurement_worker.on('success', v => {
console.debug("Measurement done, readding token to HS")
hs_spawn_worker.add(() => wait_for_hs(socks_port))
})
measurement_worker.on('error', e => {
console.error("Failed to do a measurement, readding token to HS")
hs_spawn_worker.add(() => wait_for_hs(socks_port))
})
//let loop = () => one_req(socks_port).then(loop).catch(console.error)
//for (let i = 0; i < 100; i++) setTimeout(loop, i * 10 * 1000)
})
.catch(console.error)
// status/bootstrap-phase
// status/circuit-established
// status/enough-dir-info
// status/good-server-descriptor
// status/accepted-server-descriptor