0%

tornado.web.RequestHandler 安全 Cookie 与 XSRF 防护

引言

网站安全不外乎就是信息安全的三个基本要点,简单来说就是:

  1. 机密性,网站要保护用户的隐私数据不被窃取,账号不被盗用,面对的主要的攻击方式是植入木马;
  2. 可用性,网站要保证其服务是可用的,面对的主要的攻击方式是DOS和DDOS;
  3. 完整性,网站要确保用户请求信息或数据不被未授权的篡改或在篡改后能够被迅速发现,面对主要的攻击方式是XSS和CSRF;

这篇笔记主要分析 Tornado 对 CSRF 的防范。众所周知 HTTP 协议是一个无状态协议,这便意味着每次请求都是独立的,要在两次请求之间共享数据(或者称为保持会话状态)便必须在请求时重传数据。 Cookie 和 Session 分别是客户端和服务端保持会话状态的机制。 Tornado Web 框架中已经默认对 Cookie 提供了支持,而对 Session 却没有提供内置的 Session 实现。我猜测这是因为考虑到现在的 Web 应用往往比较复杂并需要分布式存储 Session 数据,若要实现一个大而全 Session 来满足复杂的应能用场景基本上不可能,干脆就不内置而交由应用去自行根据需求来实现。

通常 Cookie 的使用不当往往导致安全问题, Tornado 在设计时便考虑到这个问题,为此内置 secure cookies 来阻止客户端非法修改 cookie 数据,以此来防范常见的 cookie 安全漏洞。Tornado 官方文档中专门用了一节 《认证和安全》 来介绍相关内容(中文翻译:https://tornado-zh.readthedocs.org/zh/latest/guide/security.html ),在网上也有很多介绍这方面的资料,比如:http://demo.pythoner.com/itt2zh/ch6.html#ch6-2-1 就写的很好,所以这篇笔记不再从 “如何使用 secure cookies 来编写安全应用” 的角度来讨论,而是结合代码分析实现原理。

Secure Cookies

Cookies 是不安全的,可以被客户端轻易修改和伪造,Tornado 的 Secure Cookies 使用加密签名来验证 Cookie 数据是否被非法修改过,签名后的 Cookie 数据包括时间戳、 HMAC 签名和编码后的 cookie 值等信。tornado.web.RequestHandler 通过 set_secure_cookie()get_secure_cookie() 方法来设置和获取 Secure Cookies,因为 HMAC 签名密钥是由 tornado.web.Application 实例来提供的,所以在实例化 tornado.web.Application 时必须在 settings 中提供 cookie_secret 参数才能使用安全 Cookies。否则,self.require_setting("cookie_secret", "secure cookies") 会抛出未设置 cookie_secret 异常。

cookie_secret 参数是一个随机字节序列,用来制作 HMAC 签名,可以使用下面的代码来生成:

1
2
>>> import base64, uuid
>>> base64.b64encode(uuid.uuid4().bytes + uuid.uuid4().bytes)

set_secure_cookie 方法是对 set_cookie 方法的包装,其方法签名中需要注意的是 expires_daysversion 参数。

  1. expires_days 按照 HTTP 协议的规定,用于指示客户端该 Cookie 的有效期,默认 30 天。这个参数与 get_secure_cookie 方法的 max_age_days 参数严格来讲没有必然的联系,我们可以使用一个小于 expires_daysmax_age_days 值在服务端控制安全 Cookie 的有效期(这个是与安全 Cookie 中的时间戳比较得到的,后面会从代码中看到)。

  2. version 参数的引入主要是兼容旧的签名方式(版本号为 1 ,使用 SHA1 签名),目前默认使用新的签名方式(版本号为 2,使用 SHA256 签名)。两种方式不仅签名算法不同,签名输出的 Cookie 数据字段和格式也不同。

1
2
3
4
5
6
7
8
9
def set_secure_cookie(self, name, value, expires_days=30, version=None, **kwargs):
self.set_cookie(name, self.create_signed_value(name, value,
version=version),
expires_days=expires_days, **kwargs)

def create_signed_value(self, name, value, version=None):
self.require_setting("cookie_secret", "secure cookies")
return create_signed_value(self.application.settings["cookie_secret"],
name, value, version=version)

上面代码明确调用 self.require_setting("cookie_secret", "secure cookies") 来检查是否已经设置有签名密钥。

create_signed_value 函数的代码及相关签名函数如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
DEFAULT_SIGNED_VALUE_VERSION = 2

def create_signed_value(secret, name, value, version=None, clock=None):
if version is None:
version = DEFAULT_SIGNED_VALUE_VERSION
if clock is None:
clock = time.time
timestamp = utf8(str(int(clock())))
value = base64.b64encode(utf8(value))
if version == 1:
signature = _create_signature_v1(secret, name, value, timestamp)
value = b"|".join([value, timestamp, signature])
return value
elif version == 2:
# The v2 format consists of a version number and a series of
# length-prefixed fields "%d:%s", the last of which is a
# signature, all separated by pipes. All numbers are in
# decimal format with no leading zeros. The signature is an
# HMAC-SHA256 of the whole string up to that point, including
# the final pipe.
#
# The fields are:
# - format version (i.e. 2; no length prefix)
# - key version (currently 0; reserved for future key rotation features)
# - timestamp (integer seconds since epoch)
# - name (not encoded; assumed to be ~alphanumeric)
# - value (base64-encoded)
# - signature (hex-encoded; no length prefix)
def format_field(s):
return utf8("%d:" % len(s)) + utf8(s)
to_sign = b"|".join([
b"2|1:0",
format_field(timestamp),
format_field(name),
format_field(value),
b''])
signature = _create_signature_v2(secret, to_sign)
return to_sign + signature
else:
raise ValueError("Unsupported version %d" % version)

def _create_signature_v1(secret, *parts):
hash = hmac.new(utf8(secret), digestmod=hashlib.sha1)
for part in parts:
hash.update(utf8(part))
return utf8(hash.hexdigest())


def _create_signature_v2(secret, s):
hash = hmac.new(utf8(secret), digestmod=hashlib.sha256)
hash.update(utf8(s))
return utf8(hash.hexdigest())

Cookie 值通过 value = base64.b64encode(utf8(value)) 进行 base64 编码转换,所以 set_secure_cookie 能支持任意的字符,这与 set_cookie 方法不同(python2 是转换为 str,python3 时转换为 unicode string,且不允许输入 “\x00-\x20” 之间的字符,其实现代码中由正则表达式来检查)。

模块变量 DEFAULT_SIGNED_VALUE_VERSION 硬编码指示默认的签名版本是 2,除非调用 set_secure_cookie 时指定版本号。两个版本之间的数据格式看代码就很明确,版本 1 就是简单的 “value|timestamp|signature” 拼接,版本 2 多了几个字段,并且记录值的字符串长度,尤其是预留的 key_version 字段为后续轮流使用多个 cookie_secret 提供了支持(我目前分析使用的 Tornado v4.0.1 还没有实现这个特性,但最新的 v4.3 已经默认支持,具体我没有去查看代码,感兴趣可以去看看),并且对整个字符串进行签名(版本 1 仅仅对 value 进行了签名),这样可以大大增加安全系数。这里额外提一下时间戳(timestamp)字段,由于客户端在发送 Cookie 时并不会提供有效期,为了能够准确控制有效期,这里将 Cookie 生成的时间戳写入值当中,以便后续在服务端进行有效期验证。

get_secure_cookie 方法签名中的 value 参数指的是通过 set_secure_cookie 加密签名后的 Cookie 值,默认是 None 则会从客户端发送回来的 Cookies 中获取指定名称的 Cookie 值作为 value,再进行签名验证,传入的 max_age_daysmin_version 将对 Cookie 做进一步比较验证,验证通过以后返回 base64 解码的 Cookie 值(也就是下面注释中说的不论 python 的版本,返回的是 byte string,与 get_cookie 方法不同。get_cookie 方法在 python3 中返回的是 unicode string。)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
def get_secure_cookie(self, name, value=None, max_age_days=31, min_version=None):
"""Returns the given signed cookie if it validates, or None.

The decoded cookie value is returned as a byte string (unlike
`get_cookie`).

.. versionchanged:: 3.2.1

Added the ``min_version`` argument. Introduced cookie version 2;
both versions 1 and 2 are accepted by default.
"""
self.require_setting("cookie_secret", "secure cookies")
if value is None:
value = self.get_cookie(name)
return decode_signed_value(self.application.settings["cookie_secret"],
name, value, max_age_days=max_age_days,
min_version=min_version)

decode_signed_value 方法的相关代码如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
DEFAULT_SIGNED_VALUE_MIN_VERSION = 1

# A leading version number in decimal with no leading zeros, followed by a pipe.
_signed_value_version_re = re.compile(br"^([1-9][0-9]*)\|(.*)$")


def decode_signed_value(secret, name, value, max_age_days=31, clock=None, min_version=None):
if clock is None:
clock = time.time
if min_version is None:
min_version = DEFAULT_SIGNED_VALUE_MIN_VERSION
if min_version > 2:
raise ValueError("Unsupported min_version %d" % min_version)
if not value:
return None

# Figure out what version this is. Version 1 did not include an
# explicit version field and started with arbitrary base64 data,
# which makes this tricky.
value = utf8(value)
m = _signed_value_version_re.match(value)
if m is None:
version = 1
else:
try:
version = int(m.group(1))
if version > 999:
# Certain payloads from the version-less v1 format may
# be parsed as valid integers. Due to base64 padding
# restrictions, this can only happen for numbers whose
# length is a multiple of 4, so we can treat all
# numbers up to 999 as versions, and for the rest we
# fall back to v1 format.
version = 1
except ValueError:
version = 1

if version < min_version:
return None
if version == 1:
return _decode_signed_value_v1(secret, name, value, max_age_days, clock)
elif version == 2:
return _decode_signed_value_v2(secret, name, value, max_age_days, clock)
else:
return None


def _decode_signed_value_v1(secret, name, value, max_age_days, clock):
parts = utf8(value).split(b"|")
if len(parts) != 3:
return None
signature = _create_signature_v1(secret, name, parts[0], parts[1])
if not _time_independent_equals(parts[2], signature):
gen_log.warning("Invalid cookie signature %r", value)
return None
timestamp = int(parts[1])
if timestamp < clock() - max_age_days * 86400:
gen_log.warning("Expired cookie %r", value)
return None
if timestamp > clock() + 31 * 86400:
# _cookie_signature does not hash a delimiter between the
# parts of the cookie, so an attacker could transfer trailing
# digits from the payload to the timestamp without altering the
# signature. For backwards compatibility, sanity-check timestamp
# here instead of modifying _cookie_signature.
gen_log.warning("Cookie timestamp in future; possible tampering %r", value)
return None
if parts[1].startswith(b"0"):
gen_log.warning("Tampered cookie %r", value)
return None
try:
return base64.b64decode(parts[0])
except Exception:
return None


def _decode_signed_value_v2(secret, name, value, max_age_days, clock):
def _consume_field(s):
length, _, rest = s.partition(b':')
n = int(length)
field_value = rest[:n]
# In python 3, indexing bytes returns small integers; we must
# use a slice to get a byte string as in python 2.
if rest[n:n + 1] != b'|':
raise ValueError("malformed v2 signed value field")
rest = rest[n + 1:]
return field_value, rest
rest = value[2:] # remove version number
try:
key_version, rest = _consume_field(rest)
timestamp, rest = _consume_field(rest)
name_field, rest = _consume_field(rest)
value_field, rest = _consume_field(rest)
except ValueError:
return None
passed_sig = rest
signed_string = value[:-len(passed_sig)]
expected_sig = _create_signature_v2(secret, signed_string)
if not _time_independent_equals(passed_sig, expected_sig):
return None
if name_field != utf8(name):
return None
timestamp = int(timestamp)
if timestamp < clock() - max_age_days * 86400:
# The signature has expired.
return None
try:
return base64.b64decode(value_field)
except Exception:
return None

if hasattr(hmac, 'compare_digest'): # python 3.3
_time_independent_equals = hmac.compare_digest
else:
def _time_independent_equals(a, b):
if len(a) != len(b):
return False
result = 0
if isinstance(a[0], int): # python3 byte strings
for x, y in zip(a, b):
result |= x ^ y
else: # python2
for x, y in zip(a, b):
result |= ord(x) ^ ord(y)
return result == 0

_signed_value_version_re 正则表达式用于获取签名所用的版本号,对于旧版本(版本 1 )加密签名的 cookie 数据中没有版本号这个字段,默认取 1。然后与指定的 min_version 进行比较,仅当大于等于 min_version 才进行下一步验证。版本 1 由函数 _decode_signed_value_v1 验证,版本 2 由 函数 _decode_signed_value_v2 验证,这两个函数主要就是按照对应签名格式解析数据,并对目标签名和时间戳等字段进行比较验证。需要说一下的是由于版本 1 的设计缺陷,没有对 timestamp 进行签名,为了尽可能防止攻击者篡改时间戳来进行攻击, _decode_signed_value_v1 函数对 timestamp 执行了额外的检查(timestamp > clock() + 31 * 86400),但这个检查并不能完全杜绝此类攻击。这应该也是重新设计版本 2 的一个原因。

XSRF

XSRF(Cross-site request forgery,跨站请求伪造),也被简写为 CSRF,发音为”sea surf”,这是一个常见的安全漏洞(这里有一篇写的不错的文章《浅谈CSRF攻击方式》,有兴趣的可以简单了解下)。通常 Synchronizer token pattern 是一种常见的解决方案,该方案利用了第三方站点无法访问 cookie 的限制,为每个客户端设置一个不同的 token,并将其存储在 cookie 中。当用户发起有副作用的 HTTP 请求时,则必须携带一个包含该 token 的参数(也可以通过 Http Header 传递),服务端将对存储在 cookie 和请求参数中的 token 进行比较,以防止潜在的跨站请求伪造。

生成 xsrf_token

tornado.web.RequestHandler 中与生成跨站请求伪造 token 直接相关的是 xsrf_token 属性和 xsrf_form_html 方法。

  1. xsrf_token 属性在首次访问时会为客户端设置一个名为 _xsrf 的 cookie,其值变为前面所说的 token。token 有两个版本,版本号分别为 1 和 2,若没有在应用中设置 xsrf_cookie_version 参数则默认使用版本 2。版本 2 为每次请求都生成一个随机的掩码,相比较版本 1 而言安全性大大增强。

  2. xsrf_form_html 就是返回一个隐藏的 HTML < input/> 元素,用于包含在页面的 Form 元素中以便在 POST 请求时将 token 发送给服务端验证。

详细代码如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
@property
def xsrf_token(self):
if not hasattr(self, "_xsrf_token"):
version, token, timestamp = self._get_raw_xsrf_token()
output_version = self.settings.get("xsrf_cookie_version", 2)
if output_version == 1:
self._xsrf_token = binascii.b2a_hex(token)
elif output_version == 2:
mask = os.urandom(4)
self._xsrf_token = b"|".join([
b"2",
binascii.b2a_hex(mask),
binascii.b2a_hex(_websocket_mask(mask, token)),
utf8(str(int(timestamp)))])
else:
raise ValueError("unknown xsrf cookie version %d",
output_version)
if version is None:
expires_days = 30 if self.current_user else None
self.set_cookie("_xsrf", self._xsrf_token,
expires_days=expires_days)
return self._xsrf_token

def xsrf_form_html(self):
"""An HTML ``<input/>`` element to be included with all POST forms.

It defines the ``_xsrf`` input value, which we check on all POST
requests to prevent cross-site request forgery. If you have set
the ``xsrf_cookies`` application setting, you must include this
HTML within all of your HTML forms.

In a template, this method should be called with ``{% module
xsrf_form_html() %}``

See `check_xsrf_cookie()` above for more information.
"""
return '<input type="hidden" name="_xsrf" value="' + \
escape.xhtml_escape(self.xsrf_token) + '"/>'

上述两个方法关联的另外几个方法是 _get_raw_xsrf_token_decode_xsrf_token。首次访问 _get_raw_xsrf_token 方法时,将尝试为当前用户请求生成 token(若已经生成,则直接从 cookie “_xsrf” 中获取),并赋值给 handler 的 _raw_xsrf_token 字段。_decode_xsrf_token 方法将 token 解析为 (version, token, timestamp) 元组返回(兼容版本 1 ,版本 1 中没有 version 和 timestamp 字段)。代码很简单,如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
def _get_raw_xsrf_token(self):
"""Read or generate the xsrf token in its raw form.

The raw_xsrf_token is a tuple containing:

* version: the version of the cookie from which this token was read,
or None if we generated a new token in this request.
* token: the raw token data; random (non-ascii) bytes.
* timestamp: the time this token was generated (will not be accurate
for version 1 cookies)
"""
if not hasattr(self, '_raw_xsrf_token'):
cookie = self.get_cookie("_xsrf")
if cookie:
version, token, timestamp = self._decode_xsrf_token(cookie)
else:
version, token, timestamp = None, None, None
if token is None:
version = None
token = os.urandom(16)
timestamp = time.time()
self._raw_xsrf_token = (version, token, timestamp)
return self._raw_xsrf_token

def _decode_xsrf_token(self, cookie):
"""Convert a cookie string into a the tuple form returned by
_get_raw_xsrf_token.
"""
m = _signed_value_version_re.match(utf8(cookie))
if m:
version = int(m.group(1))
if version == 2:
_, mask, masked_token, timestamp = cookie.split("|")
mask = binascii.a2b_hex(utf8(mask))
token = _websocket_mask(
mask, binascii.a2b_hex(utf8(masked_token)))
timestamp = int(timestamp)
return version, token, timestamp
else:
# Treat unknown versions as not present instead of failing.
return None, None, None
else:
version = 1
try:
token = binascii.a2b_hex(utf8(cookie))
except (binascii.Error, TypeError):
token = utf8(cookie)
# We don't have a usable timestamp in older versions.
timestamp = int(time.time())
return (version, token, timestamp)

检查 xsrf_token

对 xsrf_token 的检查在 _execute 方法中委托 check_xsrf_cookie 方法进行,代码如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
@gen.coroutine
def _execute(self, transforms, *args, **kwargs):
…………
# If XSRF cookies are turned on, reject form submissions without
# the proper cookie
if self.request.method not in ("GET", "HEAD", "OPTIONS") and \
self.application.settings.get("xsrf_cookies"):
self.check_xsrf_cookie()
…………

def check_xsrf_cookie(self):
"""Verifies that the ``_xsrf`` cookie matches the ``_xsrf`` argument.

To prevent cross-site request forgery, we set an ``_xsrf``
cookie and include the same value as a non-cookie
field with all ``POST`` requests. If the two do not match, we
reject the form submission as a potential forgery.

The ``_xsrf`` value may be set as either a form field named ``_xsrf``
or in a custom HTTP header named ``X-XSRFToken`` or ``X-CSRFToken``
(the latter is accepted for compatibility with Django).

See http://en.wikipedia.org/wiki/Cross-site_request_forgery

Prior to release 1.1.1, this check was ignored if the HTTP header
``X-Requested-With: XMLHTTPRequest`` was present. This exception
has been shown to be insecure and has been removed. For more
information please see
http://www.djangoproject.com/weblog/2011/feb/08/security/
http://weblog.rubyonrails.org/2011/2/8/csrf-protection-bypass-in-ruby-on-rails

.. versionchanged:: 3.2.2
Added support for cookie version 2. Both versions 1 and 2 are
supported.
"""
token = (self.get_argument("_xsrf", None) or
self.request.headers.get("X-Xsrftoken") or
self.request.headers.get("X-Csrftoken"))
if not token:
raise HTTPError(403, "'_xsrf' argument missing from POST")
_, token, _ = self._decode_xsrf_token(token)
_, expected_token, _ = self._get_raw_xsrf_token()
if not _time_independent_equals(utf8(token), utf8(expected_token)):
raise HTTPError(403, "XSRF cookie does not match POST argument")

check_xsrf_cookie 方法代码显示与 cookie 中的 token 进行比较的 token 来源于请求参数 _xsrf 或者 HTTP 头域(X-Xsrftoken 或者 X-Csrftoken)。目前仅比较 token 值,对其中的 timestamp 和 version 字段不做比较验证。

由上可见,xsrf cookies 的生成仅与是否访问 xsrf_token 属性相关,要进行验证则需要为应用设置 xsrf_cookies 为 True。