Skip to content

Commit 884bd3b

Browse files
committed
Merge remote-tracking branch 'i1i1/add-geoip-db' into subspace-customizations
2 parents 17906aa + 55c3b6e commit 884bd3b

File tree

6 files changed

+72
-151
lines changed

6 files changed

+72
-151
lines changed

backend/Cargo.lock

Lines changed: 22 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

backend/telemetry_core/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ hex = "0.4.3"
1717
http = "0.2.4"
1818
hyper = "0.14.11"
1919
log = "0.4.14"
20+
maxminddb = "0.23.0"
2021
num_cpus = "1.13.0"
2122
once_cell = "1.8.0"
2223
parking_lot = "0.11.1"
65.7 MB
Binary file not shown.

backend/telemetry_core/src/aggregator/aggregator.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ use crate::find_location::find_location;
1919
use crate::state::NodeId;
2020
use common::id_type;
2121
use futures::{future, Sink, SinkExt};
22-
use std::net::Ipv4Addr;
22+
use std::net::IpAddr;
2323
use std::sync::atomic::AtomicU64;
2424
use std::sync::Arc;
2525

@@ -94,7 +94,7 @@ impl Aggregator {
9494
/// any more, this task will gracefully end.
9595
async fn handle_messages(
9696
rx_from_external: flume::Receiver<inner_loop::ToAggregator>,
97-
tx_to_aggregator: flume::Sender<(NodeId, Ipv4Addr)>,
97+
tx_to_aggregator: flume::Sender<(NodeId, IpAddr)>,
9898
max_queue_len: usize,
9999
denylist: Vec<String>,
100100
max_third_party_nodes: usize,

backend/telemetry_core/src/aggregator/inner_loop.rs

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -30,10 +30,7 @@ use std::sync::{
3030
atomic::{AtomicU64, Ordering},
3131
Arc,
3232
};
33-
use std::{
34-
net::{IpAddr, Ipv4Addr},
35-
str::FromStr,
36-
};
33+
use std::{net::IpAddr, str::FromStr};
3734

3835
/// Incoming messages come via subscriptions, and end up looking like this.
3936
#[derive(Clone, Debug)]
@@ -171,7 +168,7 @@ pub struct InnerLoop {
171168
chain_to_feed_conn_ids: MultiMapUnique<BlockHash, ConnId>,
172169

173170
/// Send messages here to make geographical location requests.
174-
tx_to_locator: flume::Sender<(NodeId, Ipv4Addr)>,
171+
tx_to_locator: flume::Sender<(NodeId, IpAddr)>,
175172

176173
/// How big can the queue of messages coming in to the aggregator get before messages
177174
/// are prioritised and dropped to try and get back on track.
@@ -181,7 +178,7 @@ pub struct InnerLoop {
181178
impl InnerLoop {
182179
/// Create a new inner loop handler with the various state it needs.
183180
pub fn new(
184-
tx_to_locator: flume::Sender<(NodeId, Ipv4Addr)>,
181+
tx_to_locator: flume::Sender<(NodeId, IpAddr)>,
185182
denylist: Vec<String>,
186183
max_queue_len: usize,
187184
max_third_party_nodes: usize,
@@ -380,10 +377,7 @@ impl InnerLoop {
380377
self.finalize_and_broadcast_to_all_feeds(feed_messages_for_all);
381378

382379
// Ask for the geographical location of the node.
383-
// Currently we only geographically locate IPV4 addresses so ignore IPV6.
384-
if let IpAddr::V4(ip_v4) = ip {
385-
let _ = self.tx_to_locator.send((node_id, ip_v4));
386-
}
380+
let _ = self.tx_to_locator.send((node_id, ip));
387381
}
388382
}
389383
}

backend/telemetry_core/src/find_location.rs

Lines changed: 43 additions & 139 deletions
Original file line numberDiff line numberDiff line change
@@ -14,36 +14,34 @@
1414
// You should have received a copy of the GNU General Public License
1515
// along with this program. If not, see <https://www.gnu.org/licenses/>.
1616

17-
use std::net::Ipv4Addr;
17+
use std::net::{IpAddr, Ipv4Addr};
1818
use std::sync::Arc;
1919

2020
use futures::{Sink, SinkExt};
21+
use maxminddb::{geoip2::City, Reader as GeoIpReader};
2122
use parking_lot::RwLock;
2223
use rustc_hash::FxHashMap;
23-
use serde::Deserialize;
2424

25-
use anyhow::Context;
2625
use common::node_types::NodeLocation;
27-
use tokio::sync::Semaphore;
2826

2927
/// The returned location is optional; it may be None if not found.
3028
pub type Location = Option<Arc<NodeLocation>>;
3129

3230
/// This is responsible for taking an IP address and attempting
3331
/// to find a geographical location for it.
34-
pub fn find_location<Id, R>(response_chan: R) -> flume::Sender<(Id, Ipv4Addr)>
32+
pub fn find_location<Id, R>(response_chan: R) -> flume::Sender<(Id, IpAddr)>
3533
where
3634
R: Sink<(Id, Option<Arc<NodeLocation>>)> + Unpin + Send + Clone + 'static,
3735
Id: Clone + Send + 'static,
3836
{
3937
let (tx, rx) = flume::unbounded();
4038

4139
// cache entries
42-
let mut cache: FxHashMap<Ipv4Addr, Arc<NodeLocation>> = FxHashMap::default();
40+
let mut cache: FxHashMap<IpAddr, Arc<NodeLocation>> = FxHashMap::default();
4341

4442
// Default entry for localhost
4543
cache.insert(
46-
Ipv4Addr::new(127, 0, 0, 1),
44+
Ipv4Addr::new(127, 0, 0, 1).into(),
4745
Arc::new(NodeLocation {
4846
latitude: 52.516_6667,
4947
longitude: 13.4,
@@ -52,28 +50,18 @@ where
5250
);
5351

5452
// Create a locator with our cache. This is used to obtain locations.
55-
let locator = Locator::new(cache);
53+
let locator = Arc::new(Locator::new(cache));
5654

5755
// Spawn a loop to handle location requests
5856
tokio::spawn(async move {
59-
// Allow 4 requests at a time. acquiring a token will block while the
60-
// number of concurrent location requests is more than this.
61-
let semaphore = Arc::new(Semaphore::new(4));
62-
6357
loop {
6458
while let Ok((id, ip_address)) = rx.recv_async().await {
65-
let permit = semaphore.clone().acquire_owned().await.unwrap();
6659
let mut response_chan = response_chan.clone();
67-
let locator = locator.clone();
60+
let locator = Arc::clone(&locator);
6861

69-
// Once we have acquired our permit, spawn a task to avoid
70-
// blocking this loop so that we can handle concurrent requests.
7162
tokio::spawn(async move {
72-
let location = locator.locate(ip_address).await;
63+
let location = locator.locate(ip_address);
7364
let _ = response_chan.send((id, location)).await;
74-
75-
// ensure permit is moved into task by dropping it explicitly:
76-
drop(permit);
7765
});
7866
}
7967
}
@@ -84,23 +72,26 @@ where
8472

8573
/// This struct can be used to make location requests, given
8674
/// an IP address (IPv4 or IPv6).
87-
#[derive(Clone)]
75+
#[derive(Debug, Clone)]
8876
struct Locator {
89-
client: reqwest::Client,
90-
cache: Arc<RwLock<FxHashMap<Ipv4Addr, Arc<NodeLocation>>>>,
77+
city: Arc<maxminddb::Reader<&'static [u8]>>,
78+
cache: Arc<RwLock<FxHashMap<IpAddr, Arc<NodeLocation>>>>,
9179
}
9280

9381
impl Locator {
94-
pub fn new(cache: FxHashMap<Ipv4Addr, Arc<NodeLocation>>) -> Self {
95-
let client = reqwest::Client::new();
96-
97-
Locator {
98-
client,
82+
/// taken from here: https://github.com/P3TERX/GeoLite.mmdb/releases/tag/2022.06.07
83+
const CITY_DATA: &'static [u8] = include_bytes!("GeoLite2-City.mmdb");
84+
85+
pub fn new(cache: FxHashMap<IpAddr, Arc<NodeLocation>>) -> Self {
86+
Self {
87+
city: GeoIpReader::from_source(Self::CITY_DATA)
88+
.map(Arc::new)
89+
.expect("City data is always valid"),
9990
cache: Arc::new(RwLock::new(cache)),
10091
}
10192
}
10293

103-
pub async fn locate(&self, ip: Ipv4Addr) -> Option<Arc<NodeLocation>> {
94+
pub fn locate(&self, ip: IpAddr) -> Option<Arc<NodeLocation>> {
10495
// Return location quickly if it's cached:
10596
let cached_loc = {
10697
let cache_reader = self.cache.read();
@@ -110,98 +101,25 @@ impl Locator {
110101
return cached_loc;
111102
}
112103

113-
// Look it up via ipapi.co:
114-
let mut location = self.iplocate_ipapi_co(ip).await;
115-
116-
// If that fails, try looking it up via ipinfo.co instead:
117-
if let Err(e) = &location {
118-
log::warn!(
119-
"Couldn't obtain location information for {} from ipapi.co: {}",
120-
ip,
121-
e
122-
);
123-
location = self.iplocate_ipinfo_io(ip).await
124-
}
125-
126-
// If both fail, we've logged the errors and we'll return None.
127-
if let Err(e) = &location {
128-
log::warn!(
129-
"Couldn't obtain location information for {} from ipinfo.co: {}",
130-
ip,
131-
e
132-
);
133-
}
134-
135-
// If we successfully obtained a location, cache it
136-
if let Ok(location) = &location {
137-
self.cache.write().insert(ip, location.clone());
138-
}
139-
140-
// Discard the error; we've logged information above.
141-
location.ok()
142-
}
143-
144-
async fn iplocate_ipapi_co(&self, ip: Ipv4Addr) -> Result<Arc<NodeLocation>, anyhow::Error> {
145-
let location = self.query(&format!("https://ipapi.co/{}/json", ip)).await?;
146-
147-
Ok(Arc::new(location))
148-
}
149-
150-
async fn iplocate_ipinfo_io(&self, ip: Ipv4Addr) -> Result<Arc<NodeLocation>, anyhow::Error> {
151-
let location = self
152-
.query::<IPApiLocate>(&format!("https://ipinfo.io/{}/json", ip))
153-
.await?
154-
.into_node_location()
155-
.with_context(|| "Could not convert response into node location")?;
156-
157-
Ok(Arc::new(location))
158-
}
159-
160-
async fn query<T>(&self, url: &str) -> Result<T, anyhow::Error>
161-
where
162-
for<'de> T: Deserialize<'de>,
163-
{
164-
let res = self
165-
.client
166-
.get(url)
167-
.send()
168-
.await?
169-
.bytes()
170-
.await
171-
.with_context(|| "Failed to obtain response body")?;
172-
173-
serde_json::from_slice(&res)
174-
.with_context(|| format!{"Failed to decode '{}'", std::str::from_utf8(&res).unwrap_or("INVALID_UTF8")})
175-
}
176-
}
177-
178-
/// This is the format returned from ipinfo.co, so we do
179-
/// a little conversion to get it into the shape we want.
180-
#[derive(Deserialize, Debug, Clone)]
181-
struct IPApiLocate {
182-
city: Box<str>,
183-
loc: Box<str>,
184-
}
185-
186-
impl IPApiLocate {
187-
fn into_node_location(self) -> Option<NodeLocation> {
188-
let IPApiLocate { city, loc } = self;
189-
190-
let mut loc = loc.split(',').map(|n| n.parse());
191-
192-
let latitude = loc.next()?.ok()?;
193-
let longitude = loc.next()?.ok()?;
194-
195-
// Guarantee that the iterator has been exhausted
196-
if loc.next().is_some() {
197-
return None;
198-
}
199-
200-
Some(NodeLocation {
104+
let City { city, location, .. } = self.city.lookup(ip.into()).ok()?;
105+
let city = city
106+
.as_ref()?
107+
.names
108+
.as_ref()?
109+
.get("en")?
110+
.to_string()
111+
.into_boxed_str();
112+
let latitude = location.as_ref()?.latitude? as f32;
113+
let longitude = location?.longitude? as f32;
114+
115+
let location = Arc::new(NodeLocation {
116+
city,
201117
latitude,
202118
longitude,
203-
city,
204-
})
119+
});
120+
self.cache.write().insert(ip, Arc::clone(&location));
121+
122+
Some(location)
205123
}
206124
}
207125

@@ -210,28 +128,14 @@ mod tests {
210128
use super::*;
211129

212130
#[test]
213-
fn ipapi_locate_to_node_location() {
214-
let ipapi = IPApiLocate {
215-
loc: "12.5,56.25".into(),
216-
city: "Foobar".into(),
217-
};
218-
219-
let location = ipapi.into_node_location().unwrap();
220-
221-
assert_eq!(location.latitude, 12.5);
222-
assert_eq!(location.longitude, 56.25);
223-
assert_eq!(&*location.city, "Foobar");
131+
fn locator_construction() {
132+
Locator::new(Default::default());
224133
}
225134

226135
#[test]
227-
fn ipapi_locate_to_node_location_too_many() {
228-
let ipapi = IPApiLocate {
229-
loc: "12.5,56.25,1.0".into(),
230-
city: "Foobar".into(),
231-
};
232-
233-
let location = ipapi.into_node_location();
234-
235-
assert!(location.is_none());
136+
fn locate_random_ip() {
137+
let ip = "12.5.56.25".parse().unwrap();
138+
let node_location = Locator::new(Default::default()).locate(ip).unwrap();
139+
assert_eq!(&*node_location.city, "El Paso");
236140
}
237141
}

0 commit comments

Comments
 (0)