Simple Hub'eau client for Python
This package is currently under active development. Every API on Hub'eau will be covered by this package in due time.
At this stage, the following APIs are covered by cl-hubeau:
- phytopharmaceuticals transactions/vente et achat de produits phytopharmaceutiques
- watercourses flow/écoulement des cours d'eau
- drinking water quality/qualité de l'eau potable
- hydrobiology/hydrobiologie
- hydrometry/hydrométrie
- superficial waterbodies quality/qualité des cours d'eau
- ground waterbodies quality/qualité des nappes
- piezometry/piézométrie
For any help on available kwargs for each endpoint, please refer directly to the documentation on Hub'eau (this will not be covered by the current documentation).
Assume that each function from cl-hubeau will be consistent with its Hub'eau counterpart, with the exception of the size and page (or cursor) arguments: those are set automatically by cl-hubeau to crawl along the results.
cl-hubeau already uses simple multithreading pools to perform requests. In order not to endanger the web servers and to share resources among users, a rate limiter is set to 10 queries per second. This limiter should work fine on any given machine, whatever the context (even with an additional parallelization overlay).
However, cl-hubeau should NOT be used in containers (or pods) with parallelization. There is currently no way of tracking the queries' rate across multiple machines: greedy queries may end up blacklisted by the team managing Hub'eau.
Starting with pynsee 0.2.0, no API keys are needed anymore.
In case of bugs, please open an issue on the repo.
You will find in the present README a basic documentation in English. For further information, please refer to:
- the docstrings (which are mostly up-to-date);
- the complete documentation (in French) available here.
Any help is welcome. Please refer to the CONTRIBUTING file.
GPL-3.0-or-later
This package is currently under active development; if cached results ever become stale, you can clean all caches with the following snippet:
from cl_hubeau.utils import clean_all_cache
clean_all_cache()
Hub'eau currently has a limit of 20k results for any query. To circumvent this, cl-hubeau defines upper-level functions which may slightly differ from the low-level classes (which try to mimic Hub'eau's standard behaviour).
The upper-level functions all use loops to avoid reaching the 20k results threshold. For any query that accepts time range parameters, time ranges will be automatically added to your desired query (if not already specified); if the 20k results threshold is reached, the time ranges will be split in two (thus bypassing that threshold). If you ever hit the 20k limit nonetheless, please get in touch and submit an issue.
cl-hubeau's configuration can be accessed with the following code:
from cl_hubeau import _config
print(_config)
This configuration (stored as a dictionary) can be altered any time you want. For instance, if you want to alter the default cache expiration, you could do the following:
from cl_hubeau import _config
from datetime import timedelta

# set a one-year expiry for the multi-purpose cache
_config["DEFAULT_EXPIRE_AFTER"] = timedelta(days=365)
# set a one-hour expiry for realtime datasets
_config["DEFAULT_EXPIRE_AFTER_REALTIME"] = timedelta(hours=1)
Note that you can also alter the number of threads used to query Hub'eau. Nonetheless, there is also a rate limit of 10 queries/second imposed by cl-hubeau to avoid overloading the server. As a consequence, you should only reduce the THREADS configuration (if your machine has trouble with that) and never increase it (which shouldn't have any effect anyway).
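For instance, a minimal sketch, assuming THREADS is the relevant key of the default configuration:
from cl_hubeau import _config

# lower the number of worker threads; increasing it would be pointless
# because of the 10 queries/second rate limit
_config["THREADS"] = 2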
Also note that the query rate displayed on tqdm's progress bar does not reflect the query rate on Hub'eau: the cursor/page iterations of one subquery are not displayed. Hence a displayed 2 it/s might very well be a 10 requests/s load on Hub'eau's server.
cl-hubeau executes two types of http(s) requests:
- some made by pynsee to gather INSEE & IGN datasets;
- some made by cl-hubeau itself to gather Hub'eau datasets.
To work behind corporate proxies, it should be enough to configure two environment variables (see the sketch after this list):
- http_proxy
- https_proxy
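A minimal sketch (the proxy URL is hypothetical):
import os

# set both proxy variables before importing cl_hubeau
os.environ["http_proxy"] = "http://proxy.example.com:8080"
os.environ["https_proxy"] = "http://proxy.example.com:8080"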
You can also set the proxies using a dictionary as an argument when creating sessions (low-level classes from cl-hubeau).
Note that pynsee stores those proxies in a configuration file. In case of trouble, don't hesitate to manually delete that file.
4 high level functions are available (and one class for low level operations).
Note that the high level functions introduce new arguments (filter_regions and filter_departements) to better target territorial data.
Get all active substances bought (uses a 30-day cache):
from cl_hubeau import phytopharmaceuticals_transactions as pt
df = pt.get_all_active_substances_bought()
# or to get regional data:
df = pt.get_all_active_substances_bought(
type_territoire="Région", code_territoire="32"
)
# or to get departmental data:
df = pt.get_all_active_substances_bought(
type_territoire="Département", filter_regions="32"
)
# or to get postcode-zoned data:
df = pt.get_all_active_substances_bought(
type_territoire="Zone postale", filter_departements=["59", "62"]
)
Get all phytopharmaceutical products bought (uses a 30-day cache):
from cl_hubeau import phytopharmaceuticals_transactions as pt
df = pt.get_all_phytopharmaceutical_products_bought()
# or to get regional data:
df = pt.get_all_phytopharmaceutical_products_bought(
type_territoire="Région", code_territoire="32"
)
# or to get departmental data:
df = pt.get_all_phytopharmaceutical_products_bought(
type_territoire="Département", filter_regions="32"
)
# or to get postcode-zoned data:
df = pt.get_all_phytopharmaceutical_products_bought(
type_territoire="Zone postale", filter_departements=["59", "62"]
)
Get all active substances sold (uses a 30-day cache):
from cl_hubeau import phytopharmaceuticals_transactions as pt
df = pt.get_all_active_substances_sold()
# or to get regional data:
df = pt.get_all_active_substances_sold(
type_territoire="Région", code_territoire="32"
)
# or to get departmental data:
df = pt.get_all_active_substances_sold(
type_territoire="Département", filter_regions="32"
)
Get all phytopharmaceutical products sold (uses a 30-day cache):
from cl_hubeau import phytopharmaceuticals_transactions as pt
df = pt.get_all_phytopharmaceutical_products_sold()
# or to get regional data:
df = pt.get_all_phytopharmaceutical_products_sold(
type_territoire="Région", code_territoire="32"
)
# or to get departmental data:
df = pt.get_all_phytopharmaceutical_products_sold(
type_territoire="Département", filter_regions="32"
)
Low level class to perform the same tasks:
Note that:
- the API forbids results > 20k rows, so you may need inner loops;
- the cache handling will be your responsibility.
from cl_hubeau import phytopharmaceuticals_transactions as pt
with pt.PhytopharmaceuticalsSession() as session:
df = session.active_substances_sold(
annee_min=2010,
annee_max=2015,
code_territoire=["32"],
type_territoire="Région",
)
df = session.phytopharmaceutical_products_sold(
annee_min=2010,
annee_max=2015,
code_territoire=["32"],
type_territoire="Région",
eaj="Oui",
unite="l",
)
df = session.active_substances_bought(
annee_min=2010,
annee_max=2015,
code_territoire=["32"],
type_territoire="Région",
)
df = session.phytopharmaceutical_products_bought(
code_territoire=["32"],
type_territoire="Région",
eaj="Oui",
unite="l",
)
3 high level functions are available (and one class for low level operations).
Get all stations (uses a 30-day cache):
from cl_hubeau import watercourses_flow
df = watercourses_flow.get_all_stations()
Get all observations (uses a 30-day cache):
from cl_hubeau import watercourses_flow
df = watercourses_flow.get_all_observations()
Note that this query is heavy; users should restrict it to a given territory when possible. For instance, you could use:
df = watercourses_flow.get_all_observations(code_region="11")
Get all campaigns:
from cl_hubeau import watercourses_flow
df = watercourses_flow.get_all_campaigns()
Low level class to perform the same tasks:
Note that:
- the API forbids results > 20k rows, so you may need inner loops;
- the cache handling will be your responsibility.
from cl_hubeau import watercourses_flow
with watercourses_flow.WatercoursesFlowSession() as session:
df = session.get_stations(code_departement="59")
df = session.get_campaigns(code_campagne=[12])
df = session.get_observations(code_station="F6640008")
2 high level functions are available (and one class for low level operations).
Get all water networks (UDI) (uses a 30-day cache):
from cl_hubeau import drinking_water_quality
df = drinking_water_quality.get_all_water_networks()
Get the sanitary control results for nitrates on all networks of Paris, Lyon & Marseille (uses a 30-day cache):
networks = drinking_water_quality.get_all_water_networks(code_region=["11", "84", "93"])
networks = networks[
networks.nom_commune.isin(["PARIS", "MARSEILLE", "LYON"])
]["code_reseau"].unique().tolist()
df = drinking_water_quality.get_control_results(
code_reseau=networks, code_parametre="1340"
)
df = df[df.nom_commune.isin(["PARIS", "MARSEILLE", "LYON"])]
Note that this query is heavy, even though it is already restricted to nitrates. In theory, you could also query the API without specifying the substance you're tracking, but this has not been tested.
You can also call the same function, using official city codes directly:
df = drinking_water_quality.get_control_results(
code_commune=['59350'],
code_parametre="1340"
)
Low level class to perform the same tasks:
Note that:
- the API forbids results > 20k rows, so you may need inner loops;
- the cache handling will be your responsibility.
from cl_hubeau import drinking_water_quality
with drinking_water_quality.DrinkingWaterQualitySession() as session:
df = session.get_cities_networks(nom_commune="LILLE")
df = session.get_control_results(code_departement='02', code_parametre="1340")
3 high level functions are available (and one class for low level operations).
Get all stations (uses a 30-day cache):
from cl_hubeau import hydrobiology
df = hydrobiology.get_all_stations()
Get the taxa identified on stations in Paris (uses a 30-day cache):
df = hydrobiology.get_all_taxa(code_commune=["75056"])
Note that this query is heavy if not restricted to areas and/or time ranges. In theory, you could query the API without arguments, but this has not been tested (it should not be possible on standard machines because of the RAM consumption).
Get the indexes identified on stations in Paris (uses a 30-day cache):
df = hydrobiology.get_all_indexes(code_commune=["75056"])
Note that this query is heavy if not restricted to areas and/or time ranges. In theory, you could query the API without arguments, but this has not been tested (it should not be possible on standard machines because of the RAM consumption).
Low level class to perform the same tasks:
Note that:
- the API forbids results > 20k rows, so you may need inner loops;
- the cache handling will be your responsibility.
from cl_hubeau import hydrobiology
with hydrobiology.HydrobiologySession() as session:
df = session.get_stations(code_commune="75056")
df = session.get_taxa(code_commune="75056")
df = session.get_indexes(code_commune="75056")
4 high level functions are available (and one class for low level operations).
Get all stations (uses a 30-day cache):
from cl_hubeau import hydrometry
gdf = hydrometry.get_all_stations()
Get all sites (uses a 30-day cache):
gdf = hydrometry.get_all_sites()
Get observations for the first 5 sites (uses a 30-day cache); note that this will also work with stations (instead of sites):
df = hydrometry.get_observations(gdf["code_site"].head(5).tolist())
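For instance, with stations, a minimal sketch (the "code_station" column name is an assumption):
from cl_hubeau import hydrometry

# assumption: the stations GeoDataFrame exposes a "code_station" column
stations = hydrometry.get_all_stations()
df = hydrometry.get_observations(stations["code_station"].head(5).tolist())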
Get realtime data for the first 5 sites:
A small cache is stored to allow for realtime consumption (it expires after only 15 minutes). Please adopt a responsible usage with this functionality!
df = hydrometry.get_realtime_observations(gdf["code_site"].head(5).tolist())
Low level class to perform the same tasks:
Note that:
- the API forbids results > 20k rows, so you may need inner loops;
- the cache handling will be your responsibility, notably for realtime data.
from cl_hubeau import hydrometry
with hydrometry.HydrometrySession() as session:
df = session.get_stations(code_station="K437311001")
df = session.get_sites(code_departement=['02', '59', '60', '62', '80'], format="geojson")
df = session.get_realtime_observations(code_entite="K437311001")
df = session.get_observations(code_entite="K437311001")
4 high level functions are available (and one class for low level operations).
Get all stations (uses a 30-day cache):
from cl_hubeau import superficial_waterbodies_quality
df = superficial_waterbodies_quality.get_all_stations()
Get all operations (uses a 30-day cache):
from cl_hubeau import superficial_waterbodies_quality
df = superficial_waterbodies_quality.get_all_operations()
Note that this query is heavy; users should restrict it to a given territory. For instance, you could use:
df = superficial_waterbodies_quality.get_all_operations(code_region="11")
Get all environmental conditions:
from cl_hubeau import superficial_waterbodies_quality
df = superficial_waterbodies_quality.get_all_environmental_conditions()
Note that this query is heavy; users should restrict it to a given territory. For instance, you could use:
df = superficial_waterbodies_quality.get_all_environmental_conditions(code_region="11")
Get all physicochemical analyses:
from cl_hubeau import superficial_waterbodies_quality
df = superficial_waterbodies_quality.get_all_analyses()
Note that this query is heavy; users should restrict it to a given territory and given parameters. For instance, you could use:
df = superficial_waterbodies_quality.get_all_analyses(
code_departement="59",
code_parametre="1313"
)
Low level class to perform the same tasks:
Note that:
- the API forbids results > 20k rows, so you may need inner loops;
- the cache handling will be your responsibility.
from cl_hubeau import superficial_waterbodies_quality
with superficial_waterbodies_quality.SuperficialWaterbodiesQualitySession() as session:
df = session.get_stations(code_commune="59183")
df = session.get_operations(code_commune="59183")
df = session.get_environmental_conditions(code_commune="59183")
df = session.get_analyses(code_commune='59183', code_parametre="1340")
2 high level functions are available (and one class for low level operations).
Get all stations (uses a 30-day cache):
from cl_hubeau import ground_water_quality
df = ground_water_quality.get_all_stations()
Get the test results for nitrates:
df = ground_water_quality.get_all_analyses(code_param="1340")
Note that this query is heavy, even though it is already restricted to nitrates, and it may fail. In theory, you could even query the API without specifying the substance you're tracking, but you would hit the 20k threshold and trigger an exception.
In practice, you should call the same function with a territorial restriction or with specific bss_ids.
For instance, you could use official department numbers directly:
df = ground_water_quality.get_all_analyses(
    num_departement=["59"],
    code_param="1340",
)
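Alternatively, a sketch assuming the high-level function passes through the same bss_id argument as the low-level method shown below:
from cl_hubeau import ground_water_quality

# hypothetical: restrict the query to specific stations through their bss_id
df = ground_water_quality.get_all_analyses(
    bss_id=["BSS000BMMA"],
    code_param="1340",
)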
Note: a bit of caution is needed here, as the arguments are NOT the same in the two endpoints. Please have a look at the documentation on hubeau. For instance, the city code is called "code_insee_actuel" on the analyses endpoint and "code_commune" on the stations endpoint.
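As an illustration, a minimal sketch built on that note (the city code "59350" is just an example, and both functions are assumed to pass these arguments through):
from cl_hubeau import ground_water_quality

# the same city restriction is spelt differently on each endpoint
stations = ground_water_quality.get_all_stations(code_commune="59350")
analyses = ground_water_quality.get_all_analyses(
    code_insee_actuel="59350",
    code_param="1340",
)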
Low level class to perform the same tasks:
Note that:
- the API forbids results > 20k rows, so you may need inner loops;
- the cache handling will be your responsibility.
from cl_hubeau import ground_water_quality
with ground_water_quality.GroundWaterQualitySession() as session:
df = session.get_stations(bss_id="01832B0600")
df = session.get_analyses(
bss_id=["BSS000BMMA"],
code_param="1461",
)
3 high level functions are available (and one class for low level operations).
Get all piezometers (uses a 30-day cache):
from cl_hubeau import piezometry
gdf = piezometry.get_all_stations()
Get chronicles for the first 100 piezometers (uses a 30-day cache):
df = piezometry.get_chronicles(gdf["code_bss"].head(100).tolist())
Get realtime data for the first 100 piezometers:
A small cache is stored to allow for realtime consumption (it expires after only 15 minutes). Please adopt a responsible usage with this functionality!
df = piezometry.get_realtime_chronicles(gdf["code_bss"].head(100).tolist())
Low level class to perform the same tasks:
Note that:
- the API forbids results > 20k rows, so you may need inner loops;
- the cache handling will be your responsibility, notably for realtime data.
from cl_hubeau import piezometry
with piezometry.PiezometrySession() as session:
df = session.get_chronicles(code_bss="07548X0009/F")
df = session.get_stations(code_departement=['02', '59', '60', '62', '80'], format="geojson")
df = session.get_chronicles_real_time(code_bss="07548X0009/F")
In order to ease queries on hydrographic territories, some convenience functions have been added to this module.
In this process, we harvest official geodatasets which are not available on Hub'eau; afterwards, simple geospatial joins are performed with the latest geodataset of French cities.
These are convenience tools and there will be approximations (the geographical precision of the two datasets might not match).
You can retrieve a SAGE's communal components using the following snippet:
from cl_hubeau.utils import cities_for_sage
d = cities_for_sage()
The official geodataset is eaufrance's SAGE.