#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
| This file is part of the web2py Web Framework
| Copyrighted by Massimo Di Pierro <mdipierro@cs.depaul.edu>
| License: LGPLv3 (http://www.gnu.org/licenses/lgpl.html)
This file specifically includes utilities for security.
--------------------------------------------------------
"""
import threading
import struct
import uuid
import random
import inspect
import time
import os
import re
import sys
import logging
import socket
import base64
import zlib
# Pre-compiled struct layout: two unsigned 64-bit integers ('Q'), native byte
# order with standard sizes and no alignment ('='), i.e. exactly 16 bytes.
_struct_2_long_long = struct.Struct('=QQ')
# Major version of the running interpreter; used below to pick
# version-specific code paths.
python_version = sys.version_info[0]
if python_version == 2:
import cPickle as pickle
else:
import pickle
import hashlib
from hashlib import md5, sha1, sha224, sha256, sha384, sha512
try:
from Crypto.Cipher import AES
except ImportError:
import gluon.contrib.aes as AES
import hmac
if hasattr(hashlib, "pbkdf2_hmac"):
    import binascii

    def pbkdf2_hex(data, salt, iterations=1000, keylen=24, hashfunc=None):
        """
        Derives a key with PBKDF2-HMAC and returns it hex-encoded.

        Args:
            data: the password (bytes, or text which is encoded as UTF-8)
            salt: the salt (bytes, or text which is encoded as UTF-8)
            iterations: number of PBKDF2 rounds
            keylen: length in bytes of the derived key (before hex encoding)
            hashfunc: a hashlib constructor (e.g. ``sha512``); defaults to sha1

        Returns:
            the derived key as a lowercase hexadecimal native string
        """
        hashfunc = hashfunc or sha1
        text_type = type(u'')
        # hashlib.pbkdf2_hmac only accepts bytes-like objects
        if isinstance(data, text_type):
            data = data.encode('utf-8')
        if isinstance(salt, text_type):
            salt = salt.encode('utf-8')
        derived = hashlib.pbkdf2_hmac(hashfunc().name,
                                      data, salt, iterations,
                                      keylen)
        # ``.encode("hex")`` only exists on Python 2 strings; hexlify works on
        # both major versions (returning bytes on Python 3).
        hexed = binascii.hexlify(derived)
        return hexed if isinstance(hexed, str) else hexed.decode('ascii')
    HAVE_PBKDF2 = True
else:
    # hashlib has no pbkdf2_hmac: fall back to the bundled implementations
    try:
        try:
            from gluon.contrib.pbkdf2_ctypes import pbkdf2_hex
        except (ImportError, AttributeError):
            from gluon.contrib.pbkdf2 import pbkdf2_hex
        HAVE_PBKDF2 = True
    except ImportError:
        try:
            from .pbkdf2 import pbkdf2_hex
            HAVE_PBKDF2 = True
        except (ImportError, ValueError):
            HAVE_PBKDF2 = False
# Module-level logger, obtained under the name "web2py".
logger = logging.getLogger("web2py")
def AES_new(key, IV=None):
    """
    Creates an AES cipher object working in CBC mode.

    Args:
        key: the encryption key handed to ``AES.new``
        IV: the initialization vector; when None a fresh one is taken
            from ``fast_urandom16()``

    Returns:
        a ``(cipher, IV)`` tuple, where IV is the vector actually used
    """
    iv = fast_urandom16() if IV is None else IV
    cipher = AES.new(key, AES.MODE_CBC, iv)
    return cipher, iv
def compare(a, b):
    """
    Compares two strings without being vulnerable to timing attacks.

    Every position is inspected regardless of where the first mismatch is,
    so the running time does not reveal how long the common prefix is (only
    a length mismatch returns early).

    Args:
        a: first string (text or bytes)
        b: second string (text or bytes)

    Returns:
        True if both have the same length and the same content
    """
    if len(a) != len(b):
        return False
    result = 0
    for x, y in zip(a, b):
        # iterating Python 3 bytes yields ints, iterating text (or Python 2
        # str) yields 1-character strings that still need ord()
        x = x if isinstance(x, int) else ord(x)
        y = y if isinstance(y, int) else ord(y)
        result |= x ^ y
    return result == 0
def md5_hash(text):
    """
    Generates a md5 hash with the given text.

    Args:
        text: the data to hash (bytes, or text which is encoded as UTF-8)

    Returns:
        the hexadecimal md5 digest
    """
    # hashlib only digests bytes; on Python 2 ``str`` is left untouched
    if isinstance(text, type(u'')):
        text = text.encode('utf-8')
    return md5(text).hexdigest()
def simple_hash(text, key='', salt='', digest_alg='md5'):
    """
    Generates hash with the given text using the specified
    digest hashing algorithm.

    Args:
        text: the data to hash (bytes, or text which is encoded as UTF-8)
        key: optional key; when given (and digest_alg is a plain algorithm
            name) an HMAC keyed with ``key + salt`` is computed
        salt: optional salt
        digest_alg: one of
            - a callable: called as ``digest_alg(text + key + salt)``
            - ``'pbkdf2(iterations,keylen,alg)'``: uses ``pbkdf2_hex``
            - an algorithm name understood by ``get_digest``

    Returns:
        the hexadecimal digest

    Raises:
        RuntimeError: if digest_alg is empty or None
    """
    if not digest_alg:
        raise RuntimeError("simple_hash with digest_alg=None")
    # hashlib and hmac only accept bytes; on Python 2 ``str`` is untouched
    text_type = type(u'')
    if isinstance(text, text_type):
        text = text.encode('utf-8')
    if isinstance(key, text_type):
        key = key.encode('utf-8')
    if isinstance(salt, text_type):
        salt = salt.encode('utf-8')
    if not isinstance(digest_alg, str):  # manual approach
        h = digest_alg(text + key + salt)
    elif digest_alg.startswith('pbkdf2'):  # latest and coolest!
        # strip the leading 'pbkdf2(' and the trailing ')'
        iterations, keylen, alg = digest_alg[7:-1].split(',')
        return pbkdf2_hex(text, salt, int(iterations),
                          int(keylen), get_digest(alg))
    elif key:  # use hmac
        digest_alg = get_digest(digest_alg)
        h = hmac.new(key + salt, text, digest_alg)
    else:  # compatible with third party systems
        h = get_digest(digest_alg)()
        h.update(text + salt)
    return h.hexdigest()
def get_digest(value):
    """
    Maps an algorithm name to the matching hashlib constructor.

    Anything that is not a ``str`` is handed back unchanged. Names are
    matched case-insensitively; an unknown name raises ValueError.
    """
    if not isinstance(value, str):
        return value
    name = value.lower()
    known_digests = {
        "md5": md5,
        "sha1": sha1,
        "sha224": sha224,
        "sha256": sha256,
        "sha384": sha384,
        "sha512": sha512,
    }
    digest = known_digests.get(name)
    if digest is None:
        raise ValueError("Invalid digest algorithm: %s" % name)
    return digest
# Maps the length (in hexadecimal characters) of a digest to the name of the
# algorithm producing it. Floor division keeps the keys integers on Python 3,
# where ``/`` would produce floats.
DIGEST_ALG_BY_SIZE = {
    128 // 4: 'md5',
    160 // 4: 'sha1',
    224 // 4: 'sha224',
    256 // 4: 'sha256',
    384 // 4: 'sha384',
    512 // 4: 'sha512',
}
import collections

# Same field layout as the ``ArgSpec`` tuple returned by inspect.getargspec
_ArgSpec = collections.namedtuple('ArgSpec', 'args varargs keywords defaults')


def get_callable_argspec(fn):
    """
    Returns the argument specification of a callable.

    Args:
        fn: a function, a method, a class (its ``__init__`` is inspected) or
            any object exposing ``__call__``

    Returns:
        a named tuple ``(args, varargs, keywords, defaults)``
    """
    if inspect.isfunction(fn) or inspect.ismethod(fn):
        inspectable = fn
    elif inspect.isclass(fn):
        inspectable = fn.__init__
    elif hasattr(fn, '__call__'):
        inspectable = fn.__call__
    else:
        inspectable = fn
    getfullargspec = getattr(inspect, 'getfullargspec', None)
    if getfullargspec is None:
        # Python 2 only provides getargspec
        return inspect.getargspec(inspectable)
    # inspect.getargspec was removed in Python 3.11; narrow the richer
    # FullArgSpec down to the 4-field shape callers rely on.
    full = getfullargspec(inspectable)
    return _ArgSpec(full.args, full.varargs, full.varkw, full.defaults)
def pad(s, n=32, padchar=' '):
    """
    Pads a string up to the next multiple of ``n`` characters.

    At least one pad character is always appended: a string whose length is
    already a multiple of ``n`` grows by a full ``n`` characters.

    Args:
        s: the string (text or bytes) to pad
        n: the block size, must be positive (defaults to 32)
        padchar: the character appended as padding

    Returns:
        ``s`` followed by between 1 and ``n`` copies of ``padchar``
    """
    # allow the default text padchar to be used with bytes on Python 3
    if isinstance(s, bytes) and isinstance(padchar, type(u'')):
        padchar = padchar.encode('latin-1')
    return s + (n - len(s) % n) * padchar
def secure_dumps(data, encryption_key, hash_key=None, compression_level=None):
    """
    Serializes, encrypts and signs ``data``.

    The object is pickled, optionally zlib-compressed, space-padded,
    encrypted with AES (CBC mode, fresh IV) and urlsafe-base64 encoded with
    the IV prepended. The result is signed with HMAC-MD5.

    Args:
        data: any picklable object
        encryption_key: key material; space-padded/truncated to 32 bytes to
            form the AES key
        hash_key: HMAC key; defaults to the sha1 hex digest of
            ``encryption_key``
        compression_level: zlib level; falsy disables compression

    Returns:
        the native string ``'<hex signature>:<base64 payload>'``
    """
    text_type = type(u'')
    # hashlib, hmac and the cipher all need bytes (a no-op for Python 2 str)
    if isinstance(encryption_key, text_type):
        encryption_key = encryption_key.encode('utf-8')
    if not hash_key:
        hash_key = sha1(encryption_key).hexdigest()
    if isinstance(hash_key, text_type):
        hash_key = hash_key.encode('utf-8')
    dump = pickle.dumps(data, pickle.HIGHEST_PROTOCOL)
    if compression_level:
        dump = zlib.compress(dump, compression_level)
    # Pad first, then truncate: padding an already 32-byte key would yield an
    # invalid 64-byte AES key. Shorter keys give the same result as before.
    key = pad(encryption_key, 32, b' ')[:32]
    cipher, IV = AES_new(key)
    encrypted_data = base64.urlsafe_b64encode(
        IV + cipher.encrypt(pad(dump, 32, b' ')))
    # md5 used to be hmac's implicit default; Python >= 3.8 requires it to be
    # spelled out. Keeping md5 leaves existing signatures valid.
    signature = hmac.new(hash_key, encrypted_data, md5).hexdigest()
    if not isinstance(encrypted_data, str):
        # Python 3: base64 returned bytes, the signature is text
        encrypted_data = encrypted_data.decode('ascii')
    return signature + ':' + encrypted_data
def secure_loads(data, encryption_key, hash_key=None, compression_level=None):
    """
    Verifies, decrypts and deserializes a token made by ``secure_dumps``.

    Args:
        data: the ``'<hex signature>:<base64 payload>'`` token (text or bytes)
        encryption_key: the key material used when dumping
        hash_key: HMAC key; defaults to the sha1 hex digest of
            ``encryption_key``
        compression_level: must be truthy if the token was compressed

    Returns:
        the original object, or None when the token is malformed, the
        signature does not match or decryption/unpickling fails
    """
    text_type = type(u'')
    # Normalize the token to the native ``str`` type so that the separator
    # handling and the signature comparison operate on text.
    if not isinstance(data, str):
        try:
            if isinstance(data, bytes):
                data = data.decode('ascii')
            else:
                data = data.encode('ascii')
        except UnicodeError:
            return None
    if ':' not in data:
        return None
    # hashlib, hmac and the cipher all need bytes (a no-op for Python 2 str)
    if isinstance(encryption_key, text_type):
        encryption_key = encryption_key.encode('utf-8')
    if not hash_key:
        hash_key = sha1(encryption_key).hexdigest()
    if isinstance(hash_key, text_type):
        hash_key = hash_key.encode('utf-8')
    signature, encrypted_data = data.split(':', 1)
    if isinstance(encrypted_data, text_type):
        try:
            encrypted_data = encrypted_data.encode('ascii')
        except UnicodeError:
            return None  # a valid base64 payload is always ASCII
    # md5 used to be hmac's implicit default; Python >= 3.8 requires it to be
    # spelled out.
    actual_signature = hmac.new(hash_key, encrypted_data, md5).hexdigest()
    if not compare(signature, actual_signature):
        return None
    # Pad first, then truncate: padding an already 32-byte key would yield an
    # invalid 64-byte AES key. Shorter keys give the same result as before.
    key = pad(encryption_key, 32, b' ')[:32]
    encrypted_data = base64.urlsafe_b64decode(encrypted_data)
    IV, encrypted_data = encrypted_data[:16], encrypted_data[16:]
    cipher, _ = AES_new(key, IV=IV)
    try:
        data = cipher.decrypt(encrypted_data)
        if compression_level:
            # A zlib stream may legitimately end with a 0x20 byte, so the
            # space padding must not be stripped blindly: the decompressor
            # stops at the end of the stream and ignores what follows.
            inflater = zlib.decompressobj()
            data = inflater.decompress(data) + inflater.flush()
        else:
            data = data.rstrip(b' ')
        # NOTE: unpickling is only reached after the HMAC check succeeded;
        # anyone knowing the keys can make this execute arbitrary code.
        return pickle.loads(data)
    except Exception:
        return None
### compute constant CTOKENS
def initialize_urandom():
    """
    This function and the web2py_uuid follow from the following discussion:
    `http://groups.google.com/group/web2py-developers/browse_thread/thread/7fd5789a7da3f09`

    At startup a machine/time dependent number is computed as
    uuid.getnode() + int(time.time() * 1e6)
    Its six low-order bytes, repeated cyclically, give 16 8-bit tokens.
    The number seeds `random`; the tokens are written to '/dev/urandom'
    (best effort, failures to do so are ignored) and are returned packed as
    two unsigned 64-bit integers.
    If os.urandom() is not supported a warning is logged and the returned
    flag is False.

    Returns:
        a tuple `(unpacked_ctokens, have_urandom)`
    """
    machine_time_id = uuid.getnode() + int(time.time() * 1e6)
    ctokens = [(machine_time_id >> (8 * (i % 6))) % 256 for i in range(16)]
    random.seed(machine_time_id)
    # the same 16 raw bytes on both Python 2 and Python 3
    packed = bytes(bytearray(ctokens))
    try:
        os.urandom(1)
        have_urandom = True
        try:
            # try to add process-specific entropy
            with open('/dev/urandom', 'wb') as frandom:
                frandom.write(packed)
        except IOError:
            # works anyway
            pass
    except NotImplementedError:
        have_urandom = False
        logger.warning(
            """Cryptographically secure session management is not possible on your system because
your system does not provide a cryptographically secure entropy source.
This is not specific to web2py; consider deploying on a different operating system.""")
    return _struct_2_long_long.unpack(packed), have_urandom
# Computed once, when the module is imported: UNPACKED_CTOKENS is the default
# `ctokens` argument of web2py_uuid(), HAVE_URANDOM tells it whether
# os.urandom() is usable.
UNPACKED_CTOKENS, HAVE_URANDOM = initialize_urandom()
def fast_urandom16(urandom=[], locker=threading.RLock()):
    """
    Returns 16 random bytes.

    This is 4x faster than calling os.urandom(16) and prevents
    the "too many files open" issue with concurrent access to os.urandom()

    Args:
        urandom: the pool of ready-made 16-byte chunks. The mutable default
            is intentional: it is the cache shared by all the calls.
        locker: the lock guarding the refill of the pool (shared likewise)

    Returns:
        a 16-byte string
    """
    try:
        return urandom.pop()
    except IndexError:
        # Pool exhausted: read 16 KiB at once, hand out the first chunk and
        # store the remaining 1023 ones.
        with locker:
            ur = os.urandom(16 * 1024)
            urandom.extend(ur[i:i + 16] for i in range(16, 1024 * 16, 16))
            return ur[0:16]
def web2py_uuid(ctokens=UNPACKED_CTOKENS):
    """
    This function follows from the following discussion:
    `http://groups.google.com/group/web2py-developers/browse_thread/thread/7fd5789a7da3f09`

    Builds a version 4 UUID string from two 64-bit words. Each word is a
    value from `random`, XORed with the matching item of `ctokens` and, when
    os.urandom() is available, with 8 bytes coming from fast_urandom16().
    """
    words = [random.getrandbits(64), random.getrandbits(64)]
    if HAVE_URANDOM:
        extra_words = _struct_2_long_long.unpack(fast_urandom16())
        for position, extra in enumerate(extra_words):
            words[position] ^= extra
    words[0] ^= ctokens[0]
    words[1] ^= ctokens[1]
    raw_bytes = _struct_2_long_long.pack(words[0], words[1])
    return str(uuid.UUID(bytes=raw_bytes, version=4))
# Four dot-separated groups of digits, anchored at the start only. Written as
# a raw string so that `\d` and `\.` are not parsed as (invalid) string escapes.
REGEX_IPv4 = re.compile(r'(\d+)\.(\d+)\.(\d+)\.(\d+)')
def is_valid_ip_address(address):
    """
    Tells whether `address` looks like a valid IPv4 or IPv6 address.

    Examples:
        Better than a thousand words::

            >>> is_valid_ip_address('127.0')
            False
            >>> is_valid_ip_address('127.0.0.1')
            True
            >>> is_valid_ip_address('2001:660::1')
            True
    """
    # well-known values first
    lowered = address.lower()
    if lowered in ('127.0.0.1', 'localhost', '::1', '::ffff:127.0.0.1'):
        return True
    if lowered in ('unknown', ''):
        return False

    if address.count('.') == 3:  # three dots: treat it as IPv4
        # drop the IPv4-mapped IPv6 prefix, if any
        ipv4 = address[7:] if address.startswith('::ffff:') else address
        if not hasattr(socket, 'inet_aton'):
            # the OS cannot help: validate with the regular expression
            found = REGEX_IPv4.match(ipv4)
            return bool(found and
                        all(0 <= int(found.group(i)) < 256
                            for i in (1, 2, 3, 4)))
        try:
            socket.inet_aton(ipv4)
        except socket.error:  # invalid address
            return False
        return True

    if not hasattr(socket, 'inet_pton'):
        # no way to check an IPv6 address: assume it is valid
        return True
    try:
        socket.inet_pton(socket.AF_INET6, address)
    except socket.error:  # invalid address
        return False
    return True
def is_loopback_ip_address(ip=None, addrinfo=None):
    """
    Determines whether the address appears to be a loopback address.
    This assumes that the IP is valid.

    Args:
        ip: the address as a string
        addrinfo: an item of the list returned by socket.getaddrinfo(); when
            given for an IPv4/IPv6 family, its address replaces `ip`

    Returns:
        True for a loopback address, False otherwise (including when no
        string address is available)
    """
    if addrinfo:  # see socket.getaddrinfo() for layout of addrinfo tuple
        if addrinfo[0] == socket.AF_INET or addrinfo[0] == socket.AF_INET6:
            sockaddr = addrinfo[4]
            # sockaddr is (address, port[, flowinfo, scope_id]): the address
            # is its first item; a plain string is still accepted as is
            if isinstance(sockaddr, (tuple, list)):
                ip = sockaddr[0] if sockaddr else None
            else:
                ip = sockaddr
    # (str, unicode) on Python 2, str on Python 3
    if not isinstance(ip, (str, type(u''))):
        return False
    # IPv4 or IPv6-embedded IPv4 or IPv4-compatible IPv6
    if ip.count('.') == 3:
        return ip.lower().startswith(('127', '::127', '0:0:0:0:0:0:127',
                                      '::ffff:127', '0:0:0:0:0:ffff:127'))
    return ip == '::1' or ip == '0:0:0:0:0:0:0:1'  # IPv6 loopback
def getipaddrinfo(host):
    """
    Filter out non-IP and bad IP addresses from getaddrinfo

    Args:
        host: the host name or address to resolve

    Returns:
        the items of socket.getaddrinfo(host, None) whose family is IPv4 or
        IPv6 and whose address is a string; an empty list when the lookup
        fails
    """
    # (str, unicode) on Python 2, str on Python 3
    string_types = (str, type(u''))
    try:
        return [addrinfo for addrinfo in socket.getaddrinfo(host, None)
                if addrinfo[0] in (socket.AF_INET, socket.AF_INET6)
                and isinstance(addrinfo[4][0], string_types)]
    except socket.error:
        return []