# Source code for pyhafas.profile.base.helper.request

import json
from hashlib import md5
from typing import Tuple

import requests

from pyhafas.profile import ProfileInterface
from pyhafas.profile.base.mappings.error_codes import BaseErrorCodesMapping
from pyhafas.profile.interfaces.helper.request import RequestHelperInterface
from pyhafas.types.hafas_response import HafasResponse


class BaseRequestHelper(RequestHelperInterface):
    """
    Helper that builds the request URL and posts the JSON body to HaFAS.

    The methods read their configuration (``salt``, ``baseUrl``,
    ``addChecksum``, ``addMicMac``, ``requestBody``, ``userAgent``) from
    the object they are bound to.
    """

    def calculate_checksum(self: ProfileInterface, data: str) -> str:
        """
        Calculates the checksum of the request (required for most profiles)

        The salt is appended to the body and the UTF-8 bytes of the
        result are hashed with MD5.

        :param data: Complete body as string
        :return: Checksum for the request (hexadecimal MD5 digest)
        """
        salted_body = data + self.salt
        digest = md5(salted_body.encode('utf-8'))
        return digest.hexdigest()

    def calculate_mic_mac(
            self: ProfileInterface,
            data: str) -> Tuple[str, str]:
        """
        Calculates the mic-mac for the request (required for some profiles)

        The mic is the MD5 hex digest of the body; the mac is the
        checksum of that mic (see ``calculate_checksum``).

        :param data: Complete body as string
        :return: Mic and mac to be sent to HaFAS
        """
        body_digest = md5(data.encode('utf-8')).hexdigest()
        return body_digest, self.calculate_checksum(body_digest)

    def url_formatter(self: ProfileInterface, data: str) -> str:
        """
        Formats the URL for HaFAS (adds the checksum or mic-mac)

        :param data: Complete body as string
        :return: Request-URL (maybe with checksum or mic-mac)
        """
        base_url = self.baseUrl
        query_parts = []

        if self.addChecksum:
            query_parts.append(
                'checksum={}'.format(self.calculate_checksum(data)))

        if self.addMicMac:
            mic, mac = self.calculate_mic_mac(data)
            # Kept as two entries; joining with '&' yields 'mic=..&mac=..'.
            query_parts.append('mic={}'.format(mic))
            query_parts.append('mac={}'.format(mac))

        # Without any signing parameter the base URL is used untouched.
        if not query_parts:
            return base_url
        return '{}?{}'.format(base_url, '&'.join(query_parts))

    def request(self: ProfileInterface, body) -> HafasResponse:
        """
        Sends the request and wraps the response in a HafasResponse

        The body is placed in a one-element ``svcReqL`` list, merged with
        the profile's ``requestBody`` and serialized to JSON. The same
        serialized string is used for the URL signature and as the POST
        payload.

        :param body: Request body as dict (without the `requestBody` of the profile)
        :return: HafasResponse object built from the HTTP response and
            BaseErrorCodesMapping (error handling presumably happens
            inside HafasResponse; not visible here)
        """
        payload = {'svcReqL': [body]}
        # Entries of requestBody take precedence over the key set above.
        payload.update(self.requestBody)
        serialized = json.dumps(payload)

        target_url = self.url_formatter(serialized)
        headers = {
            'User-Agent': self.userAgent,
            'Content-Type': 'application/json',
        }
        response = requests.post(
            target_url,
            data=serialized,
            headers=headers)
        return HafasResponse(response, BaseErrorCodesMapping)