from bs4 import BeautifulSoup
from pyotp import TOTP
from urllib import parse
import json
from .. import BaseClient
from . import exceptions
import time
def captcha_decorator(func):
def wrapper(*args, **kwargs):
self = args[0]
captcha_guess = kwargs.get('captcha_guess')
solver = self.session()._captcha_solver
guess_uuid = None
while True:
try:
_kwargs = {k: v for k, v in kwargs.items()}
if captcha_guess:
_kwargs['captcha_guess'] = captcha_guess
return func(*args, **_kwargs)
except exceptions.CaptchaRequiredException as e:
if solver is None:
raise
if guess_uuid and captcha_guess and captcha_guess.action == e.action:
solver.incorrect(guess_uuid)
guess_uuid = solver.solve(url=e.CaptchaURL)
while True:
guess = solver.result(guess_uuid)
if guess is None:
time.sleep(5)
else:
break
captcha_guess = e.guess(guess)
continue
break
return wrapper
[docs]class Client(BaseClient):
REQUIRES_AUTHENTICATION = False
_REDIRECT_URL = "https://console.aws.amazon.com/console/home?state=hashArgs%23&isauthcode=true"
def __init__(self, session):
super().__init__(session)
self.__csrf_token = None
self.__session_id = None
def _csrf_token(self):
if self.__csrf_token == None:
self._get_tokens()
return self.__csrf_token
def _session_id(self):
if self.__session_id == None:
self._get_tokens()
return self.__session_id
def _get_tokens(self):
r = self.session()._get(
"https://signin.aws.amazon.com/signin?redirect_uri=https%3A%2F%2Fconsole.aws.amazon.com%2Fconsole%2Fhome%3Fstate%3DhashArgs%2523%26isauthcode%3Dtrue&client_id=arn%3Aaws%3Aiam%3A%3A015428540659%3Auser%2Fhomepage&forceMobileApp=0"
)
if r.status_code != 200:
raise Exception("failed get tokens")
soup = BeautifulSoup(r.text, 'html.parser')
meta = {
m['name']: m['content']
for m in soup.find_all('meta') if 'name' in m.attrs
}
self.__csrf_token = meta['csrf_token']
self.__session_id = meta['session_id']
def _action(self, action, data=None, api="signin", captcha_guess=None):
"""
Execute an action on the signin API.
Args:
action: Action to execute.
data: Arguments for the action.
Returns:
dict: Action response.
"""
if not data:
data = {}
data['action'] = action
data['redirect_uri'] = self._REDIRECT_URL
data['csrf'] = self._csrf_token()
data['sessionId'] = self._session_id()
if captcha_guess and captcha_guess.action == action:
data['captcha_token'] = captcha_guess.captcha_token
data['captchaObfuscationToken'] = \
captcha_guess.captchaObfuscationToken
data['captcha_guess'] = captcha_guess.guess
r = self.session()._post(
"https://signin.aws.amazon.com/{}".format(api),
data=data,
)
if r.status_code != 200:
raise Exception("failed action {}: {}".format(action, r.text))
out = json.loads(r.text)
state = out.get('state', 'none').lower()
properties = out.get('properties', {})
if properties.get('Captcha',
'false') == 'true' and action != "captcha":
raise exceptions.CaptchaRequiredException(
properties['CES'], properties['CaptchaURL'],
properties['captchaObfuscationToken'], action)
if state != 'success':
if 'Message' in properties:
raise Exception("failed action {}: {}".format(
action, properties['Message']))
else:
raise Exception("failed action {}: {}".format(action, r.text))
return properties
[docs] @captcha_decorator
def get_account_type(self, email, captcha_guess=None):
"""
Determine the type of account.
Account Types:
Coupled: Coupled to an amazon.com account.
Decoupled: Independend from amazon.com.
Unknown: Non-existent account.
Request Syntax:
.. code-block:: python
response = client.get_account_type(
email=str,
)
Args:
email: Account email address.
Returns:
str: Account type
"""
response = self._action(
'resolveAccountType', {'email': email},
captcha_guess=captcha_guess)
return response['resolvedAccountType']
[docs] def mfa_required(self, email):
mfa_client = self.session().client('mfa')
mfa = mfa_client.get_mfa_status(email)
if 'mfaType' in mfa:
if mfa['mfaType'] == 'NONE':
return False
else:
return True
return True
return True
[docs] def signin(self, email, password, mfa_secret=None):
# check mfa
mfa_required = self.mfa_required(email)
if mfa_required and (mfa_secret is None or len(mfa_secret) == 0):
raise Exception("account mfa protected but no secret provided")
if not mfa_required:
mfa_secret = None
return self.signin_decoupled(email, password, mfa_secret)
[docs] @captcha_decorator
def signin_decoupled(self,
email,
password,
mfa_secret=None,
captcha_guess=None):
"""
Signin into the AWS Management Console using account root user.
Request Syntax:
.. code-block:: python
response = client.signin_decoupled(
email=str,
password=str,
mfa_secret=str,
)
Args:
email: Account email address.
password: Account password.
mfa_secret: Account mfa secret. The Base32 seed defined as specified
in RFC3548. The Base32StringSeed is Base64-encoded.
Returns:
bool: Signin successful
"""
data = {
'email': email,
'password': password,
'client_id': 'arn:aws:iam::015428540659:user/homepage',
}
if mfa_secret is not None:
data['mfaType'] = 'OTP'
data['mfa1'] = TOTP(mfa_secret).now()
data['mfaSerial'] = 'undefined'
# an exception is thrown if authentication was unsuccessful
self._action('authenticateRoot', data, captcha_guess=captcha_guess)
self.session().authenticated = True
self.session().root = True
return True
[docs] def get_password_recovery_captcha(self):
"""
Obtains a captcha for password recovery.
The value ``CES`` must be used as ``captcha_token`` in
:py:meth:`get_reset_password_token`.
Returns:
dict: Response Syntax
.. code-block:: python
{
'CES': str,
'Captcha': bool,
'CaptchaURL': str,
'captchaObfuscationToken': str,
}
"""
return self._action('captcha', {'forgotpassword': True})
[docs] def raise_password_recovery_captcha(self):
"""
Obtains a captcha for password recovery and raises a
CaptchaRequiredException.
"""
captcha = self.get_password_recovery_captcha()
raise exceptions.CaptchaRequiredException(
captcha['CES'], captcha['CaptchaURL'],
captcha['captchaObfuscationToken'], 'getResetPasswordToken')
[docs] @captcha_decorator
def get_reset_password_token(self, email, captcha_guess=None):
"""
Asks for a password reset token to be sent to the registered email
address.
The value token url from the resulting email must be used as
``reset_token_url`` in :py:meth:`reset_password`.
Returns:
dict: Response Syntax
.. code-block:: python
{
'recovery_result': 'email_sent'
}
"""
if not captcha_guess:
self.raise_password_recovery_captcha()
try:
return self._action(
'getResetPasswordToken', {'email': email},
captcha_guess=captcha_guess)
except Exception as e:
if str(
e
) == "failed action getResetPasswordToken: Enter the characters and try again":
self.raise_password_recovery_captcha()