mfapi/mfapi/client.py
Léo VIALLON GALINIER 53ccb57208 First commit
2025-03-21 08:24:08 +01:00

231 lines
9.2 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
This file is largely inspired from the example provided on
https://portail-api.meteofrance.fr/web/fr/faq
It allow to connect to Météo-France API with oauth2.
"""
import json
import time
import os
import logging
import configparser
import datetime
import requests
DEFAULT_EXPIRATION_TIME = 3590 # Default validity of the JWT token
# url to obtain acces token
TOKEN_URL = "https://portail-api.meteofrance.fr/token"
URL_BRA = "https://public-api.meteofrance.fr/public/DPBRA/v1/massif/BRA?id-massif={id_massif:d}&format={format}"
URL_BRA_IMAGES = {
"montagne_risques_{id_massif}.png": "https://public-api.meteofrance.fr/public/DPBRA/v1/massif/image/montagne-risques?id-massif={id_massif:d}",
"rose_pentes_{id_massif}.png": "https://public-api.meteofrance.fr/public/DPBRA/v1/massif/image/rose-pentes?id-massif={id_massif:d}",
"montagne_enneigement_{id_massif}.png": "https://public-api.meteofrance.fr/public/DPBRA/v1/massif/image/?id-massif={id_massif:d}",
"graphe_neige_fraiche_{id_massif}.png": "https://public-api.meteofrance.fr/public/DPBRA/v1/massif/image/graphe-neige-fraiche?id-massif={id_massif:d}",
"apercu_meteo_{id_massif}.png": "https://public-api.meteofrance.fr/public/DPBRA/v1/massif/image/apercu-meteo?id-massif={id_massif:d}",
"sept_derniers_jours_{id_massif}.png": "https://public-api.meteofrance.fr/public/DPBRA/v1/massif/image/sept-derniers-jours?id-massif={id_massif:d}",
}
URL_CLIM_H = "https://public-api.meteofrance.fr/public/DPClim/v1/commande-station/horaire"
URL_CLIM_H_DATA = 'https://public-api.meteofrance.fr/public/DPClim/v1/commande/fichier'
class MFAPIClient(object):
def __init__(self, config_file='~/.mfapi', token_file='./mfapi_tokens.json'):
"""
Client object to provide transparent authentication.
The application_id have to be provided in a configuration file
(default is ~/.mfapi).
The config file is a ini-like format.
Keys are:
- application_id for your personal unique application id that can
be found in the curl's command to generate the JWT token
- default_expiration_time : the default JWT expiration time
:param config_file: Path to the configuration file
:param token_file: Path to the temporary file to store token.
"""
self.session = requests.Session()
self.token_file = token_file
if not os.path.isfile(config_file):
logging.critical(f'Config file {config_file} not found.')
raise ValueError(f'Config file {config_file} not found.')
config = configparser.ConfigParser()
config.read(config_file)
self.application_id = config.get('DEFAULT', 'application_id')
self.default_expire = config.getint('DEFAULT', 'default_expiration_time',
fallback=DEFAULT_EXPIRATION_TIME)
self.token = None
self.token_expire = time.time()
if os.path.isfile(self.token_file):
try:
with open(self.token_file, 'r') as ff:
token_json = json.load(ff)
if 'token' in token_json:
self.token = token_json['token']
if 'token_expire' in token_json:
self.token_expire = int(token_json['token_expire'])
except Exception:
pass
def request(self, method, url, **kwargs):
# If token is supposed to be expired or does not exist yet
if self.token is None or time.time() > self.token_expire:
self._obtain_token()
# Optimistically attempt to dispatch reqest
if 'headers' not in kwargs:
kwargs['headers'] = {}
kwargs['headers']['Authorization'] = 'Bearer %s' % self.token
response = self.session.request(method, url, **kwargs)
if self._token_has_expired(response):
self._obtain_token()
response = self.session.request(method, url, **kwargs)
return response
def get(self, url, **kwargs):
"""
The function to interrogate an API endpoint.
:param url: The URL to interrogate
:trturns: raw requests Response from API
"""
return self.request('GET', url, **kwargs)
def _token_has_expired(self, response):
status = response.status_code
content_type = response.headers['Content-Type']
if status == 401 and 'application/json' in content_type:
repJson = response.json()
if 'description' in repJson and 'Invalid JWT token' in repJson['description']:
return True
return False
def _obtain_token(self):
"""
Obtain a new token and store it into self.token_file
"""
data = {'grant_type': 'client_credentials'}
headers = {'Authorization': 'Basic ' + self.application_id}
access_token_response = requests.post(TOKEN_URL, data=data,
allow_redirects=False,
headers=headers)
self.token = access_token_response.json()['access_token']
self.token_expire = time.time() + self.default_expire
with open(self.token_file, 'w') as ff:
json.dump({'token': self.token, 'token_expire': self.token_expire},
ff)
def get_bra(self, id_massif: int,
format: str = 'xml',
dest_filename: str | None = None,
images: bool = False) -> str | None:
"""
Get BRA (Bulletin d'estimation du risque d'avlanche).
You have to subscribe to this API first.
:param id_massif: Massif number (int)
:param format: xml or pdf
:param dest_filename: A custom output filename (default is BRA_{id_massif}).
:param images: Download images also (for XML display, only if format=xml)
:returns: The created filename or None if not found
"""
if format not in ['xml', 'pdf']:
raise ValueError('Format should be either pdf or xml')
if dest_filename is None:
dest_filename = f"BRA_{id_massif}.{format}"
r = self.get(URL_BRA.format(id_massif=id_massif, format=format),
headers={'Accept': '*/*'})
s = r.status_code
if s == 404:
return
with open(dest_filename, 'wb') as ff:
ff.write(r.content)
if images and format == 'xml':
for filename, url in URL_BRA_IMAGES.items():
filename = filename.format(id_massif=id_massif)
r = self.get(url.format(id_massif=id_massif),
headers={'Accept': '*/*'})
if r.status_code == 404:
continue
else:
with open(filename, 'wb') as ff:
ff.write(r.content)
return dest_filename
def get_clim_h(self, id_station: str,
begin: datetime.datetime,
end: datetime.datetime | None = None,
filename: str = None) -> str | None:
"""
Get hourly data from one station
:param id_station: The station ID (string, 9 numeric chars)
:param begin: begin date, supposed to be UTC.
:param end: end date (default to now)
:param filename: output filename (default is id_station.csv).
:returns: Output filename if succeded else None.
Return the raw csv if '-' is provided as filename.
"""
if filename is None:
filename = f'{id_station}.csv'
begin = begin.isoformat(sep='T', timespec='seconds') + 'Z'
if end is None:
end = datetime.datetime.utcnow()
end = datetime.datetime(end.year, end.month, end.day, end.hour)
end = end.isoformat(sep='T', timespec='seconds') + 'Z'
data = {
'id-station': id_station,
'date-deb-periode': begin,
'date-fin-periode': end,
}
# Send request
r = self.get(URL_CLIM_H, params=data)
content_type = r.headers['Content-Type']
if r.status_code == 202 and 'application/json' in content_type:
j = r.json()
if 'elaboreProduitAvecDemandeResponse' in j and 'return' in j['elaboreProduitAvecDemandeResponse']:
id_cmde = j['elaboreProduitAvecDemandeResponse']['return']
else:
logging.error(f'Incorrect JSON answer: {j}')
return None
else:
logging.error(f'Error in request. Answer: {r.text}')
return None
# Get data
rcode = 204
while rcode == 204:
r = self.get(URL_CLIM_H_DATA, params={'id-cmde': id_cmde})
rcode = r.status_code
if rcode == 201:
if filename == '-':
return r.text
else:
with open(filename, 'w') as ff:
ff.write(r.text)
break
elif rcode != 204:
logging.error('Error while downloading the data: {r.text}')
break
logging.info('Waiting for the data...')
time.sleep(10)