Commit 7ad3a010 authored by Jonas Konrath

Merge branch 'historic-data' into 'main'

Changed main branch to track historic data revision

See merge request !1
parents b3be917a 1a60d2e8
@@ -12,3 +12,5 @@ dummy_*
artemis-web/node_modules/
todos/
db_cleaner/
bgpstream_stuff/
thesis_report
Subproject commit b665fc5f3727aff459ecc281da4ddff2b72f35f5
Subproject commit 70f457552e8d3683ac3f99e48d0b9ea4402cd4bd
@@ -3,7 +3,10 @@ WORKDIR /app
RUN mkdir /data ripe_data_collection node_ip_collection util
RUN apt-get update
#for building modules
RUN apt-get install gcc make autoconf python-dev libbz2-dev zlib1g-dev libffi-dev -y
RUN apt-get install gcc make autoconf python-dev libbz2-dev zlib1g-dev libffi-dev curl apt-transport-https ca-certificates ssl-cert gnupg lsb-release sudo -y
#convenience script for libBGPStream
RUN curl -s https://pkg.caida.org/os/$(lsb_release -si|awk '{print tolower($0)}')/bootstrap.sh | bash
RUN apt-get install bgpstream -y
COPY requirements.txt requirements.txt
RUN pip install -r requirements.txt
COPY *.py ./
@@ -11,4 +14,8 @@ COPY ripe_data_collection/*.py ripe_data_collection/
COPY util/*.py util/
COPY node_ip_collection/*.py node_ip_collection/
COPY wait-for wait-for
CMD ["python", "generate_cfg.py" ]
RUN pwd
RUN ls
CMD ["./wait-for","artemis_configuration_1:3000", "-t", "0", "--", "./wait-for", "distr_hijack_redis:6379", "-t", "0", "--", "python3","generate_cfg.py"]
#CMD ["python", "generate_cfg.py" ]
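The CAIDA bootstrap script wires up the package repository for libBGPStream, which the pybgpstream entry in requirements.txt builds against. A minimal smoke test for the resulting image could look like the sketch below (the one-minute window and the rrc00 collector reuse values that appear later in this commit; the script itself is hypothetical and not part of the repository):

# check_bgpstream.py -- hypothetical sanity check, not part of this commit
import pybgpstream

# a one-minute historical window against a single RIS collector is enough to
# confirm that libBGPStream and its Python bindings were installed correctly
stream = pybgpstream.BGPStream(
    from_time=1203889980,
    until_time=1203890040,
    collectors=["rrc00"],
    record_type="updates",
)
print(sum(1 for _ in stream), "update elements read")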
"""used to generate a config file for artemis that covers all
currently announced prefixes that contain the ip addresses of tracked
nodes and then push that config to an artemis instance"""
import csv
import datetime
import json
import logging
from ipaddress import ip_network
from ipaddress import ip_address, ip_network
from os import environ, path
from sys import stdout
from time import sleep
from typing import Callable
import pyasn
import pybgpstream
import requests
from requests.exceptions import ReadTimeout
from redis import Redis
import ujson
import yaml
from redis import Redis
from requests.exceptions import ReadTimeout
from node_ip_collection.bitcoin import BitcoinIpCollector
from ripe_data_collection.read_announcements import (
from ripe_data_collection.fetch_routing_table import (
AnnouncedPrefix,
RipeAnnouncementsParser,
)
from util.ip import IpVersion
from util.lightnode import LightNode
logging.basicConfig(stream=stdout, level=logging.INFO)
@@ -35,15 +38,46 @@ if int(environ.get("TESTING", default=0)) > 0:
SEND_TO_ARTEMIS = False
ARTEMIS_CONFIG_PATH = path.join(path.dirname(__file__), "config.yaml")
LATEST_ROUTING_TABLE_PATH = "/data/latest-table.bz"
ITER_DELAY = int(environ.get("LOOP_DELAY", default=300))
STARTING_TIMESTAMP = int(environ.get("TIMESPAN_START_STAMP"))
ENDING_TIMESTAMP = int(
environ.get("TIMESPAN_END_STAMP", default=datetime.datetime.now().timestamp())
)
NOTIFY = True
DISCORD_WEBHOOK = environ.get("DISCORD_WEBHOOK_LINK", default="")
if DISCORD_WEBHOOK == "":
NOTIFY = False
IGNORE_6TO4 = True # default case
ignore_6to4_env = environ.get("IGNORE_6TO4")
if ignore_6to4_env and ignore_6to4_env.lower() == "false":
IGNORE_6TO4 = False
if ignore_6to4_env and ignore_6to4_env.lower() not in ("false", "true"):
logger.warning(
"%s is not a valid value for IGNORE_6TO4, must be either 'true' or 'false'",
ignore_6to4_env,
)
OVERLAY_NETWORKS: list[tuple[str, Callable[[None], set[LightNode]]]] = (
OVERLAY_NETWORKS: list[tuple[str, Callable[[int, int], set[LightNode]]]] = (
(
"bitcoin",
BitcoinIpCollector().setup_and_return_light_nodes,
),
("tor", set),
(
"youtube-test",
lambda from_timestamp, to_timestamp: {
LightNode(
ip_addr=ip_address("208.65.153.238"),
asn=36561,
ipversion=IpVersion.IPV4,
snapshot_timestamp=1203889980,
)
},
),
# ("tor", set),
# ("some_overlay", some_method),
)
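Each OVERLAY_NETWORKS entry pairs an overlay name with a callable that takes the monitored window as from_timestamp/to_timestamp keyword arguments and returns a set of LightNode objects, as the youtube-test entry shows. A hedged sketch of what a further overlay would have to provide (the overlay name, address and ASN below are placeholders):

def collect_example_overlay(from_timestamp: int, to_timestamp: int) -> set[LightNode]:
    """Hypothetical collector obeying the OVERLAY_NETWORKS callable contract."""
    return {
        LightNode(
            ip_addr=ip_address("192.0.2.1"),    # TEST-NET-1 placeholder address
            asn=64496,                          # documentation-only ASN
            ipversion=IpVersion.IPV4,
            snapshot_timestamp=from_timestamp,  # pretend the node was seen at window start
        )
    }

# the tuple ("example", collect_example_overlay) could then be appended to OVERLAY_NETWORKS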
@@ -54,16 +88,18 @@ overlay_names = list(map(lambda x: x[0], OVERLAY_NETWORKS))
def generate_yaml_dict(prefixes_by_asn: dict[int, set[AnnouncedPrefix]]) -> dict:
"""given a mapping of asn->prefixes this will generate an artemis config in dict form"""
output_prefixes = {}
output_monitors = {"riperis": [""], "bgpstreamlive": ["routeviews", "ris"]}
output_monitors = {
"riperis": [""],
"bgpstreamlive": ["routeviews", "ris"],
"bgpstreamhist": "/csv_dir",
}
output_asns = {}
output_rules = []
for asn, asn_prefixes in prefixes_by_asn.items():
prefix_section_name = f"{asn}_prefixes"
output_prefixes[prefix_section_name] = []
for prefix_object in asn_prefixes:
output_prefixes[prefix_section_name].append(str(prefix_object.prefix))
asn_section_name = f"{asn}_asn"
output_asns[asn_section_name] = [asn]
output_rules_dict = {
@@ -83,7 +119,74 @@ def generate_yaml_dict(prefixes_by_asn: dict[int, set[AnnouncedPrefix]]) -> dict
return output
def gen_config(ipasn_file: str) -> None:
def community_list(value):
"""Copied from artemis' conversion script"""
com_list = []
for i in value:
asn_val_pair = i.split(":")
asn_val_dict = {"asn": int(asn_val_pair[0]), "value": int(asn_val_pair[1])}
com_list.append(asn_val_dict)
json_dump = ujson.dumps(com_list)
return json_dump
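For reference, community_list turns the "asn:value" strings reported by pybgpstream into the compact JSON carried in the CSV rows written below; example values are invented:

>>> community_list(["65000:100", "65010:20"])
'[{"asn":65000,"value":100},{"asn":65010,"value":20}]'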
def load_updates_bgpstream(prefixes_by_asn: dict[int, set[AnnouncedPrefix]]):
"""given the mapping from asn to announced
prefixes that contain active nodes, use bgpstream to
download all bgp updates affecting those prefixes during the
monitored timespan"""
filter_string = "prefix"
for _asn, asn_prefixes in prefixes_by_asn.items():
for prefix in asn_prefixes:
filter_string += f" any {prefix.prefix}"
stream = pybgpstream.BGPStream(
from_time=STARTING_TIMESTAMP,
until_time=ENDING_TIMESTAMP,
collectors=["rrc00"],
record_type="updates",
filter=filter_string,
)
out_file = f"/csv_dir/updates_{datetime.datetime.now().timestamp()//1}.csv"
with open(out_file, "w", encoding="utf-8") as out_file_object:
csv_writer = csv.writer(out_file_object, delimiter="|")
elemcount = 0
for elem in stream:
elemcount += 1
if elemcount % 10000 == 0:
logger.info("Update #%i", elemcount)
if (elem.status != "valid") or (elem.record.rec.type != "update"):
continue
if elem.type in ["A", "W"]:
elem_csv_list = []
if elem.type == "A":
elem_csv_list = [
str(elem.fields["prefix"]),
str(elem.fields["as-path"].split(" ")[-1]),
str(elem.peer_asn),
str(elem.fields["as-path"]),
str(elem.project),
str(elem.collector),
str(elem.type),
community_list(elem.fields["communities"]),
str(elem.time),
]
else:
elem_csv_list = [
str(elem.fields["prefix"]),
"",
str(elem.peer_asn),
"",
str(elem.project),
str(elem.collector),
str(elem.type),
ujson.dumps([]),
str(elem.time),
]
csv_writer.writerow(elem_csv_list)
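Two details worth spelling out: the filter string simply chains every monitored prefix with "any", and each row written to /csv_dir follows the pipe-separated column order built above (prefix, origin AS, peer AS, AS path, project, collector, update type, communities JSON, timestamp), presumably the layout the bgpstreamhist monitor ingests. Illustrative values only:

# filter string for two monitored prefixes (prefixes invented):
#   "prefix any 208.65.152.0/22 any 2001:db8::/32"
# a resulting announcement ("A") row in /csv_dir/updates_<stamp>.csv:
#   208.65.152.0/22|36561|25152|25152 6939 36561|ris|rrc00|A|[]|1203890000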
def gen_config(ipasn_file: str, oldest_timestamp: int, newest_timestamp: int) -> None:
"""
takes ipasn.dat filename/path as input and (using the bitnode api)
generates a config for ARTEMIS BGP that covers all current nodes on the overlay networks
@@ -94,9 +197,10 @@ def gen_config(ipasn_file: str) -> None:
logger.info("getting node ip addresses")
overlay_nodes: dict[str, set[LightNode]] = {}
for overlay_name, collector_method in OVERLAY_NETWORKS:
logger.debug("getting ips for %s", overlay_name)
overlay_nodes[overlay_name] = collector_method()
logger.debug("%s has %i nodes", overlay_name, len(overlay_nodes[overlay_name]))
logger.info("getting ips for %s", overlay_name)
overlay_nodes[overlay_name] = collector_method(
from_timestamp=oldest_timestamp, to_timestamp=newest_timestamp
)
logger.info("processing collected data")
# with transaction set to True the enqueued commands will be executed atomically
@@ -106,10 +210,18 @@ def gen_config(ipasn_file: str) -> None:
prefixes_by_asn: dict[int, set[AnnouncedPrefix]] = {}
for overlay_name, nodes in overlay_nodes.items():
num_nodes = len(nodes)
found_ip_addresses = set()
numnodes_key = f"{overlay_name}_numnodes"
pipeline.set(numnodes_key, num_nodes)
for node in nodes:
if IGNORE_6TO4:
# per ip_address docs, if an ipv6 address is not a 6to4 address
# the "sixtofour" field will simply be None
# hence this checks for an address being ipv6 && being 6to4
if (
node.ipversion == IpVersion.IPV6
and node.ip_addr.sixtofour is not None
):
continue
asn, matching_prefix = asndb.lookup(format(node.ip_addr))
if matching_prefix is None:
logger.warning("%s no matching prefix found", node)
@@ -117,27 +229,41 @@ def gen_config(ipasn_file: str) -> None:
prefix_object = AnnouncedPrefix(ip_network(matching_prefix), asn)
prefix_set_key = f"{overlay_name}_{str(prefix_object.prefix)}"
pipeline.sadd(prefix_set_key, int(node.ip_addr))
# mapping from ip addresses to when the nodes were active
ip_timestamps_key = f"{overlay_name}_{int(node.ip_addr)}"
pipeline.sadd(ip_timestamps_key, node.timestamp_seen)
found_ip_addresses.add(int(node.ip_addr))
if asn in prefixes_by_asn:
prefixes_by_asn[asn].add(prefix_object)
else:
prefixes_by_asn[asn] = {
prefix_object,
}
num_nodes = len(found_ip_addresses)
pipeline.set(numnodes_key, num_nodes)
pipeline.execute() # run the commands
logger.info("Getting bgpstream updates")
load_updates_bgpstream(prefixes_by_asn)
logger.info("generating config structure")
# PRINT COMMAND FOR GETTING ENTIRE HISTORY IN TIMEFRAME FOR ALL PREFIXES
final_config = generate_yaml_dict(prefixes_by_asn)
logger.info("exporting config")
with open(ARTEMIS_CONFIG_PATH, "w", encoding="utf_8") as file:
yaml.safe_dump(final_config, file)
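The Redis pipeline in this function leaves three kinds of keys per overlay behind; a sketch of how a consumer could read them back through the module's redis_conn connection (the bitcoin prefix below is an invented example):

# <overlay>_numnodes   -> number of distinct node IPs found in the window
# <overlay>_<prefix>   -> set of node IPs (stored as integers) inside that announced prefix
# <overlay>_<int(ip)>  -> set of snapshot timestamps at which that IP was seen
num_nodes = int(redis_conn.get("bitcoin_numnodes") or 0)
ips_in_prefix = redis_conn.smembers("bitcoin_208.65.152.0/22")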
def update_artemis(ipasn_file: str):
def update_artemis(ipasn_file: str, oldest_timestamp: int, newest_timestamp: int):
"""takes the path to a pyasn ipasn database file which will be
used in combination with for example bitnode API to generate an
artemis config that covers all currently online ipv4/ipv6 bitcoin
nodes. This config will then be pushed to an artemis instance"""
logger.info("generating config")
gen_config(ipasn_file)
gen_config(
ipasn_file, oldest_timestamp=oldest_timestamp, newest_timestamp=newest_timestamp
)
with open(ARTEMIS_CONFIG_PATH, "r", encoding="utf_8") as file:
yaml_lines = file.readlines()
data = json.dumps({"type": "yaml", "content": yaml_lines})
@@ -157,11 +283,22 @@ def update_artemis(ipasn_file: str):
else:
logger.info("Not actually sending to artemis...")
logger.info(
'#requests.post("http://artemis_configuration_1:3000/config", data=data, timeout=10)'
'#requests.post("http://artemis_configuration_1:3000/config", data=data, timeout=)'
)
return True
def send_notification(message: str) -> None:
"""post `message` as a Discord embed to the configured webhook"""
data = {
"username": "Thesis Notifier",
"embeds": [
{"title": "Preprocessing info", "description": message, "color": "12344"}
],
}
requests.post(DISCORD_WEBHOOK, json=data)
def file_older_than(fpath: str, treshold: datetime.timedelta) -> bool:
"""returns if the file at `fpath` has last been modified more than `treshold` ago"""
mod_tstamp = datetime.datetime.fromtimestamp(path.getmtime(fpath))
@@ -170,11 +307,20 @@
if __name__ == "__main__":
processing_start_time = datetime.datetime.now().timestamp()
if NOTIFY:
logger.info("sending starting notification")
send_notification("Starting")
starting_pipeline = redis_conn.pipeline()
logger.info("Downloading routing table...")
parser = RipeAnnouncementsParser()
ipasn_dat_file = parser.get_routing_table(LATEST_ROUTING_TABLE_PATH)
while True:
# the call below uses the default monitor (RRC00)
ipasn_dat_file = parser.get_routing_table(
LATEST_ROUTING_TABLE_PATH, tstamp=STARTING_TIMESTAMP
)
while True: # repeat until successfully pushing new configuration file
if not path.exists(LATEST_ROUTING_TABLE_PATH) or file_older_than(
LATEST_ROUTING_TABLE_PATH, datetime.timedelta(hours=8)
):
@@ -186,11 +332,26 @@ if __name__ == "__main__":
ipasn_dat_file = path.join(
path.dirname(LATEST_ROUTING_TABLE_PATH), "ipasn.dat"
)
SUCCESS = update_artemis(
ipasn_dat_file,
oldest_timestamp=STARTING_TIMESTAMP,
newest_timestamp=ENDING_TIMESTAMP,
)
if SUCCESS:
logger.info(
"Successfully pushed configuration covering %i to %i",
STARTING_TIMESTAMP,
ENDING_TIMESTAMP,
)
break # stop retrying
success = update_artemis(ipasn_dat_file)
if success:
logger.info("entering sleep for %i seconds", ITER_DELAY)
sleep(ITER_DELAY)
else:
logger.info("Didn't succeed, will retry")
sleep(180)
logger.info("Didn't succeed, will retry")
sleep(180)
processing_end_time = datetime.datetime.now().timestamp()
duration = int(processing_end_time - processing_start_time)
logger.info("Finished after %is", duration)
if NOTIFY:
logger.info("sending ending notification")
send_notification(f"DONE after {duration}s!!")
"""Module to collect ip addresses of nodes in the bitcoin network"""
import datetime
from ipaddress import IPv4Address, IPv6Address, ip_address
from sys import stdout
from typing import Union
import logging
import requests
from util.ip import IpVersion
@@ -11,6 +14,10 @@ API_URL = "https://bitnodes.io/api/v1/"
SNAPSHOTS_URL = API_URL + "snapshots/"
logging.basicConfig(stream=stdout, level=logging.INFO)
logger = logging.getLogger()
def clean_port(ip_str: str) -> str:
"""Strip port from a string representing
either an ipv4 or ipv6 address"""
@@ -24,7 +31,8 @@ def clean_port(ip_str: str) -> str:
class BitcoinNode:
"""Class to wrap api object Node"""
def __init__(self, ip_str: str, info: list):
def __init__(self, ip_str: str, info: list, snapshot_timestamp):
self.snapshot_tstamp: int = snapshot_timestamp
self.ip_addr = ip_address(clean_port(ip_str))
(
self.protocol,
@@ -46,12 +54,12 @@ class BitcoinNode:
elif isinstance(self.ip_addr, IPv6Address):
self.ipversion = IpVersion.IPV6
else:
print(self.ip_addr)
logger.warning(self.ip_addr)
raise ValueError("Unknown ip protocol??")
def make_light(self):
"""Convert a `BitcoinNode` Object to a generic `LightNode`"""
return LightNode(self.ip_addr, self.asn, self.ipversion)
return LightNode(self.ip_addr, self.asn, self.ipversion, self.snapshot_tstamp)
def __eq__(self, __o: object) -> bool:
if not isinstance(__o, BitcoinNode):
@@ -103,7 +111,7 @@ class BitcoinSnapshot:
onion += 1
elif k[0].isnumeric() or k[0] == "[": # ipv4/v6 addresses
ip_count += 1
nodes.append(BitcoinNode(k, res["nodes"][k]))
nodes.append(BitcoinNode(k, res["nodes"][k], self.timestamp))
else:
raise ValueError("Weird IP address found: " + k)
if DEBUG:
@@ -119,17 +127,57 @@ class BitcoinIpCollector:
self.collected_snapshots: set[BitcoinSnapshot] = set()
self.nodes: set[Union[BitcoinNode, LightNode]] = set()
def collect_snapshots(self):
def collect_snapshots(self, newest_timestamp: int, oldest_timestamp: int = None):
"""Update the list of snapshots with the latest available ones"""
res = requests.get(SNAPSHOTS_URL).json()
for snap_info in res["results"]:
self.snapshots.add(BitcoinSnapshot(snap_info))
def collect_ip_addresses(self, update_snapshots=True, make_light=False):
res = requests.get(SNAPSHOTS_URL + "?limit=100").json()
if oldest_timestamp is None:
for snap_info in res["results"]:
self.snapshots.add(BitcoinSnapshot(snap_info))
else:
young_enough = True
while young_enough:
next_page_link = res["next"]
logger.info("[bitcoin] new page %s", next_page_link)
for snap_info in res["results"]:
# check that snapshot is within time window
if snap_info["timestamp"] < oldest_timestamp:
# end of window reached, can exit for loop
young_enough = False
break
if snap_info["timestamp"] > newest_timestamp:
logger.debug(
"[bitcoin] snapshot too young: %i > %i",
snap_info["timestamp"],
newest_timestamp,
)
continue # snapshot too young
# else add snapshot
self.snapshots.add(BitcoinSnapshot(snap_info))
if next_page_link is None: # last page reached
young_enough = False
continue
res = requests.get(next_page_link).json()
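The loop above assumes the snapshots listing is ordered newest-first: it follows the "next" links page by page and stops as soon as a snapshot older than the window start shows up. Only the "results", "next" and per-snapshot "timestamp" fields are consumed; the response shape is sketched below (other fields omitted, the page URL is assumed):

# GET https://bitnodes.io/api/v1/snapshots/?limit=100
# {
#   "next": "https://bitnodes.io/api/v1/snapshots/?limit=100&page=2",
#   "results": [
#     {"timestamp": 1654861200, ...},   # newest first
#     {"timestamp": 1654860600, ...}
#   ]
# }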
def collect_ip_addresses(
self,
newest_timestamp: int,
update_snapshots: bool = True,
make_light: bool = False,
oldest_timestamp: int = None,
):
"""collects all ip addressses in the collected snapshots"""
if update_snapshots:
self.collect_snapshots()
for snap in self.snapshots:
self.collect_snapshots(
newest_timestamp=newest_timestamp, oldest_timestamp=oldest_timestamp
)
num_snaps = len(self.snapshots)
for indx, snap in enumerate(self.snapshots):
if indx % 10 == 0:
logger.info("Processing snapshot %i/%i", indx + 1, num_snaps)
if snap in self.collected_snapshots:
continue
nodes = snap.get_ip_addresses()
@@ -140,20 +188,29 @@ class BitcoinIpCollector:
self.nodes.add(node)
self.collected_snapshots.add(snap)
def setup_and_return_light_nodes(self) -> set[LightNode]:
def setup_and_return_light_nodes(
self, from_timestamp: int, to_timestamp: int = None
) -> set[LightNode]:
"""One-does-all method that sets up the collector and
returns the list of ip addresses of nodes contained in
the latest available snapshots"""
self.snapshots.clear() # so no "old" data is still present
self.collected_snapshots.clear()
self.nodes.clear()
self.collect_ip_addresses(update_snapshots=True, make_light=True)
if to_timestamp is None: # window end is optional
logger.info("no to_timestamp, using current time")
to_timestamp = int(datetime.datetime.now().timestamp())
self.collect_ip_addresses(
newest_timestamp=to_timestamp,
update_snapshots=True,
make_light=True,
oldest_timestamp=from_timestamp,
)
return self.nodes
# TODO yeet the whole collector class thing... utterly useless
# leave for now... 15.6.2022
if __name__ == "__main__" and DEBUG:
collector = BitcoinIpCollector()
collector.collect_snapshots()
collector.collect_ip_addresses()
if __name__ == "__main__":
print("Not supposed to be run on its own")
@@ -2,4 +2,6 @@ bgpdumpy
pyasn
pyaml
requests
redis
\ No newline at end of file
redis
pybgpstream
ujson
\ No newline at end of file
# pylint: disable=no-member
"""Module that parses update file with bgp updates in
mrt format"""
"""Module that handles downloading a routing table from ripe ris"""
from datetime import datetime as dt
from datetime import timedelta as td
from enum import Enum
from ipaddress import IPv4Network, IPv6Network, ip_network
from os import path, system
from typing import Union
import requests
import re
from bgpdumpy import BGPDump, TableDumpV2
from bgpdumpy.BGPDump import TableDumpV2RouteEntry
from util.ip import IpVersion, get_ip_version
@@ -86,11 +88,30 @@ class RipeAnnouncementsParser:
file.write(result.content)
@staticmethod
def get_routing_table(out_fpath, monitor=BGPMonitors.RRC00):
def generate_routing_table_link(monitor: BGPMonitors, tstamp: int) -> str:
"""will search for the youngest full routing table
provided by the given monitor on the date one day before the timestamp
that was given as an argument, this way it is guaranteed to have been
dumped before the timestamp"""
stamp_date = dt.fromtimestamp(tstamp)
string_arg_date = stamp_date - td(days=1)
string_arg = f"{string_arg_date.year}{str(string_arg_date.month).zfill(2)}{str(string_arg_date.day).zfill(2)}"
index_link = f"{monitor.value}{string_arg_date.year}.{str(string_arg_date.month).zfill(2)}/"
index_response = requests.get(index_link)
re_pattern = fr'<a.*href="(.?view\.{string_arg}.*\.gz)"'
filepaths = re.findall(re_pattern, index_response.content.decode())
latest_full_link = index_link + filepaths[0]
return latest_full_link
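Put concretely: for a timestamp during 15 June 2022 the method fetches the monitor's monthly index page, greps it for the dump files of the previous day and returns the first match. Assuming the RRC00 archive base URL (the BGPMonitors enum values are not shown in this diff), that resolves to something like:

# index page queried:  https://data.ris.ripe.net/rrc00/2022.06/        (base URL assumed)
# matching files:      bview.20220614.0000.gz, bview.20220614.0800.gz, ...
# returned link:       https://data.ris.ripe.net/rrc00/2022.06/bview.20220614.0000.gz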
@staticmethod
def get_routing_table(out_fpath, monitor=BGPMonitors.RRC00, tstamp=None):
"""downloads the latest full routing table for a given
monitor"""
routing_table_remote_name = "latest-bview.gz"
url = monitor.value + routing_table_remote_name
if tstamp is None:
url = monitor.value + "latest-bview.gz"
else:
url = RipeAnnouncementsParser.generate_routing_table_link(monitor, tstamp)
dirname = path.dirname(out_fpath)
with requests.get(url, stream=True) as result:
@@ -11,19 +11,28 @@ class LightNode:
"""
def __init__(
self, ip_addr: Union[IPv4Address, IPv6Address], asn: int, ipversion: IpVersion
self,
ip_addr: Union[IPv4Address, IPv6Address],
asn: int,
ipversion: IpVersion,