3. Flask 源码-上下文线程管理

Flask 上下文源码及扩展

知识点-栈

后进先出,通过列表可以实现一个栈

v = [11,22,33]
v.append(44)
v.pop()
44

知识点-面向对象

__setattr__、__getattr__方法,obj.x 设置对象属性,如果init内没有定义,obj.x = yy 则调用__setattr__方法设置变量,obj.x 使用 __getattr__ 来获取值

```python
class Foo(object):

    def __setattr__(self, key, value):
        print("__setattr__执行")
        print(key,value)

    def __getattr__(self, item):
        print("__getattr__执行")
        print(item)

obj = Foo()
obj.x = 123
obj.x

"""
__setattr__执行
x 123
__getattr__执行
x
"""</code></pre>
<h5><strong>DRF(django rest framework)中request</strong></h5>
<ul>
<li>request-旧
<ul>
<li>request._request</li>
<li>request._request.POST</li>
<li>request._request.GET</li>
</ul></li>
<li>request-新
<ul>
<li>request.data</li>
<li>request.query_params</li>
<li>request.POST  从旧_request获取</li>
<li>request.Data  从旧_request获取</li>
</ul></li>
</ul>
<pre><code class="language-python"># 实现方式
class Local(object):
    def __init__(self):
        # self.storage = {}   # 下述方法self.storage 会调用__getattr__取值,产生递归错误循环
        object.__setattr__(self,"storage",{})

    def __setattr__(self, key, value):
        self.storage[key] = value

    def __getattr__(self, item):
        return self.storage.get(item)

local = Local()
local.x1 = 123
print(local.__dict__['storage'].get('x1'))
print(local.x1)</code></pre>
<pre><code>123
123</code></pre>
<pre><code class="language-python"># 通过下面的代码更清晰直观的了解.点的特性
class ClassA(object):

    def __init__(self, classname):
        self.classname = classname

    def __getattr__(self, attr):
        return('invoke __getattr__', attr)

insA = ClassA('ClassA')
print(insA.__dict__) # 实例insA已经有classname属性了
# {'classname': 'ClassA'}

print(insA.classname) # 不会调用__getattr__
# ClassA

print(insA.grade) # grade属性没有找到,调用__getattr__
# ('invoke __getattr__', 'grade')
</code></pre>
<pre><code>{&#039;classname&#039;: &#039;ClassA&#039;}
ClassA
(&#039;invoke__getattr__&#039;, &#039;grade&#039;)</code></pre>
<h5><strong>线程唯一标识</strong></h5>
<pre><code class="language-python">import threading
from threading import get_ident

def task():
    ident = get_ident()
    print(ident)

for i in range(5):
    t = threading.Thread(target=task)
    t.start()</code></pre>
<pre><code>1091724
1091684
1083212
1088896
1091000</code></pre>
<h5><strong>自定义threading.local</strong></h5>
<pre><code class="language-python">import threading
"""
storage = {
    1097864: {'x1': 0}, 
    1097928: {'x1': 1}, 
    1098012: {'x1': 2}, 
    1097916: {'x1': 3}, 
    1097940: {'x1': 4}
    }
"""
class Local(object):
    def __init__(self):
        object.__setattr__(self,'storage',{})

    def __setattr__(self, key, value):
        ident = threading.get_ident()
        if ident in self.storage:
            self.storage[ident][key] = value
        else:
            self.storage[ident] = {key:value}

    def __getattr__(self, item):
        ident = threading.get_ident()
        if ident not in self.storage:
            return
        return self.storage[ident].get(item)

local = Local()

def task(arg):
    local.x1 = arg
    print(local.x1)
    print(local.__dict__)
    print('ident:', threading.get_ident())

for i in range(5):
    t = threading.Thread(target=task,args=(i,))
    t.start()</code></pre>
<pre><code>0
{&#039;storage&#039;: {1097864: {&#039;x1&#039;: 0}}}
ident: 1097864
1
{&#039;storage&#039;: {1097864: {&#039;x1&#039;: 0}, 1097928: {&#039;x1&#039;: 1}}}
ident: 1097928
2
{&#039;storage&#039;: {1097864: {&#039;x1&#039;: 0}, 1097928: {&#039;x1&#039;: 1}, 1098012: {&#039;x1&#039;: 2}}}
ident: 1098012
3
{&#039;storage&#039;: {1097864: {&#039;x1&#039;: 0}, 1097928: {&#039;x1&#039;: 1}, 1098012: {&#039;x1&#039;: 2}, 1097916: {&#039;x1&#039;: 3}}}
ident: 1097916
4
{&#039;storage&#039;: {1097864: {&#039;x1&#039;: 0}, 1097928: {&#039;x1&#039;: 1}, 1098012: {&#039;x1&#039;: 2}, 1097916: {&#039;x1&#039;: 3}, 1097940: {&#039;x1&#039;: 4}}}
ident: 1097940</code></pre>
<h5><strong>加强版threading.local</strong></h5>
<pre><code class="language-python">import threading
"""
storage = {
    1148492: {'x1': [0]}, 
    1152244: {'x1': [1]}, 
    1152492: {'x1': [2]}, 
    1151512: {'x1': [3]}, 
    1150952: {'x1': [4]}
}
"""
class Local(object):
    def __init__(self):
        object.__setattr__(self,'storage',{})

    def __setattr__(self, key, value):
        ident = threading.get_ident()
        if ident in self.storage:
            self.storage[ident][key].append(value)
        else:
            self.storage[ident] = {key:[value,]}

    def __getattr__(self, item):
        ident = threading.get_ident()
        if ident not in self.storage:
            return
        return self.storage[ident][item][-1]

local = Local()

def task(arg):
    local.x1 = arg
    print(local.__dict__)
    print('ident:', threading.get_ident())
    print(local.x1)

for i in range(5):
    t = threading.Thread(target=task,args=(i,))
    t.start()
</code></pre>
<pre><code>{&#039;storage&#039;: {1151848: {&#039;x1&#039;: [0]}}}
ident: 1151848
0
{&#039;storage&#039;: {1151848: {&#039;x1&#039;: [0]}, 1154220: {&#039;x1&#039;: [1]}}}
ident: 1154220
1
{&#039;storage&#039;: {1151848: {&#039;x1&#039;: [0]}, 1154220: {&#039;x1&#039;: [1]}, 1154188: {&#039;x1&#039;: [2]}}}
ident: 1154188
2
{&#039;storage&#039;: {1151848: {&#039;x1&#039;: [0]}, 1154220: {&#039;x1&#039;: [1]}, 1154188: {&#039;x1&#039;: [2]}, 1154308: {&#039;x1&#039;: [3]}}}
ident: 1154308
3
{&#039;storage&#039;: {1151848: {&#039;x1&#039;: [0]}, 1154220: {&#039;x1&#039;: [1]}, 1154188: {&#039;x1&#039;: [2]}, 1154308: {&#039;x1&#039;: [3]}, 1154312: {&#039;x1&#039;: [4]}}}
ident: 1154312
4</code></pre>
<h5><strong>flask源码关于local的实现(核心)</strong></h5>
<p>为每个线程或协程创建一个独立的空间,使得线程或协程对自己的空间中的数据进行操作(数据隔离和数据安全)</p>
<p>flask1.x版本使用了LocalStack来管理上下文,LocalStack是基于Local即一个字典来实现的上下文管理</p>
<pre><code class="language-python">try:
    # 协程
    from greenlet import getcurrent as get_ident
except ImportError:
    try:
        # 线程
        from thread import get_ident
    except ImportError:
        from _thread import get_ident

"""
__storage__ = {
    1111:{"stack":[waws] }
}
"""
class Local(object):

    def __init__(self):
        # self.__storage__ = {}
        # self.__ident_func__ = get_ident
        object.__setattr__(self, "__storage__", {})
        object.__setattr__(self, "__ident_func__", get_ident)

    def __iter__(self):
        return iter(self.__storage__.items())

    def __release_local__(self):
        self.__storage__.pop(self.__ident_func__(), None)

    def __getattr__(self, name):
        try:
            return self.__storage__[self.__ident_func__()][name]
        except KeyError:
            raise AttributeError(name)

    def __setattr__(self, name, value):
        ident = self.__ident_func__() # 1111
        storage = self.__storage__
        try:
            storage[ident][name] = value
        except KeyError:
            storage[ident] = {name: value}

    def __delattr__(self, name):
        try:
            del self.__storage__[self.__ident_func__()][name]
        except KeyError:
            raise AttributeError(name)

class LocalStack(object):
    def __init__(self):
        self._local = Local()

    def push(self, obj):
        """Pushes a new item to the stack"""
        # self._local.stack ----> Local类里的 __getattr__ 无stack值
        # rv = None
        rv = getattr(self._local, "stack", None)
        if rv is None:
            # self._local.stack = [] -----> __setattr__ ----> Local().__storage__ = {thread_id:{'stack':[]}}
            self._local.stack = rv = []
        rv.append(obj)   # Local().__storage__ = {thread_id:{'stack':["waws"]}}
        return rv

    def pop(self):
        # getattr(self._local, "stack", None) ==》等价于 self._local.stack
        # 因为.stack,没有stack,走__getattr__,self.__storage__[self.__ident_func__()][name],
        # 就拿到了self.__storage__[thread_id]['stack'],在取self.__storage__[thread_id]['stack'][-1]
        # or self.__storage__[thread_id]['stack'].pop()
        stack = getattr(self._local, "stack", None)
        if stack is None:
            return None
        elif len(stack) == 0:
            return None
        # elif len(stack) == 1:
        #     # release_local(self._local)
        #     # del __storage__[1111]
        #     # return stack[-1]
        #     return stack.pop()
        else:
            return stack.pop()

    @property
    def top(self):
        try:
            # 因为.stack,没有stack,走__getattr__,self.__storage__[self.__ident_func__()][name],
            # 就拿到了self.__storage__[thread_id]['stack'],在取self.__storage__[thread_id]['stack'][-1]
            return self._local.stack[-1]
        except (AttributeError, IndexError):
            return None

if __name__ == '__main__':

    obj = LocalStack()
    print(obj.push('waws'))
    print(obj.push('killer'))
    print(obj.top)
    print('-----' * 20)
    print(obj.pop())
    print(obj.pop())
    print(obj.pop())</code></pre>
<pre><code>[&#039;waws&#039;]
[&#039;waws&#039;, &#039;killer&#039;]</code></pre>
<table>
<thead>
<tr>
<th>killer</th>
</tr>
</thead>
<tbody>
<tr>
<td>killer</td>
</tr>
<tr>
<td>waws</td>
</tr>
<tr>
<td>None</td>
</tr>
</tbody>
</table>
<h5>在flask2.x版本以后使用了ContextVar管理上下文</h5>
<pre><code class="language-python"># 导入相关的包作为查看源码的入口
from flask import globals

# 该模块是py3.7之后加入的模块。
from contextvars import ContextVar

s = ContextVar("s") # 实例化对象
print(id(s))  # 2445564771312
values = s.get({}).copy()
print(type(values))  # <class 'dict'>
print(id(values)) # 2445561398336
# print(values)
values['aaa'] = 1223
s.set(values)
print(s.get({}))# {'aaa': 1223}
v = s.get({}).copy()
print(v) # 对象中已经存在的值得对象,相当于执行字典的copy()方法。{'aaa': 1223}
print(id(v)) # 2445564921664 返回一个新的字典。
v['bb'] = '456'
s.set(v)
print(s.get({})) #{'aaa': 1223, 'bb': '456'}</code></pre>
<pre><code>2452248812880
<class &#039;dict&#039;>
2452248682176
{&#039;aaa&#039;: 1223}
{&#039;aaa&#039;: 1223}
2452248950400
{&#039;aaa&#039;: 1223, &#039;bb&#039;: &#039;456&#039;}</code></pre>
<pre><code class="language-python"># Flask 3.0.2
from werkzeug.local import LocalProxy
import typing as t
T = t.TypeVar("T")
# contextvar类似于多线程的thread.local;
# 将slots属性设置为一个元组,元组内写类实例中定义好的属性,不能在动态添加属性,这样做的目的是节省内存空间,因为字典占用的内存空间远大于元组。关于__slots__中的更多作用

class Local:
    """Create a namespace of context-local data. This wraps a
    :class:<code>ContextVar</code> containing a :class:<code>dict</code> value.

    This may incur a performance penalty compared to using individual
    context vars, as it has to copy data to avoid mutating the dict
    between nested contexts.

    :param context_var: The :class:<code>~contextvars.ContextVar</code> to use as
        storage for this local. If not given, one will be created.
        Context vars not created at the global scope may interfere with
        garbage collection.

    .. versionchanged:: 2.0
        Uses `<code>ContextVar</code>` instead of a custom storage implementation.
    """

    __slots__ = ("__storage",)

    def __init__(self, context_var: ContextVar[dict[str, t.Any]] | None = None) -> None:
        if context_var is None:
            # A ContextVar not created at global scope interferes with
            # Python's garbage collection. However, a local only makes
            # sense defined at the global scope as well, in which case
            # the GC issue doesn't seem relevant.
            context_var = ContextVar(f"werkzeug.Local<{id(self)}>.storage")

        object.__setattr__(self, "_Local__storage", context_var)

    def __iter__(self) -> t.Iterator[tuple[str, t.Any]]:
        return iter(self.__storage.get({}).items())

    def __call__(self, name: str, *, unbound_message: str | None = None) -> LocalProxy:
        """Create a :class:<code>LocalProxy</code> that access an attribute on this
        local namespace.

        :param name: Proxy this attribute.
        :param unbound_message: The error message that the proxy will
            show if the attribute isn't set.
        """
        return LocalProxy(self, name, unbound_message=unbound_message)

    def __release_local__(self) -> None:
        self.__storage.set({})

    def __getattr__(self, name: str) -> t.Any:
        values = self.__storage.get({})

        if name in values:
            return values[name]

        raise AttributeError(name)

    def __setattr__(self, name: str, value: t.Any) -> None:
        values = self.__storage.get({}).copy()
        values[name] = value
        self.__storage.set(values)

    def __delattr__(self, name: str) -> None:
        values = self.__storage.get({})

        if name in values:
            values = values.copy()
            del values[name]
            self.__storage.set(values)
        else:
            raise AttributeError(name)

class LocalStack(t.Generic[T]):
    """Create a stack of context-local data. This wraps a
    :class:<code>ContextVar</code> containing a :class:<code>list</code> value.

    This may incur a performance penalty compared to using individual
    context vars, as it has to copy data to avoid mutating the list
    between nested contexts.

    :param context_var: The :class:<code>~contextvars.ContextVar</code> to use as
        storage for this local. If not given, one will be created.
        Context vars not created at the global scope may interfere with
        garbage collection.

    .. versionchanged:: 2.0
        Uses `<code>ContextVar</code>` instead of a custom storage implementation.

    .. versionadded:: 0.6.1
    """

    __slots__ = ("_storage",)

    def __init__(self, context_var: ContextVar[list[T]] | None = None) -> None:
        if context_var is None:
            # A ContextVar not created at global scope interferes with
            # Python's garbage collection. However, a local only makes
            # sense defined at the global scope as well, in which case
            # the GC issue doesn't seem relevant.
            context_var = ContextVar(f"werkzeug.LocalStack<{id(self)}>.storage")

        self._storage = context_var

    def __release_local__(self) -> None:
        self._storage.set([])

    def push(self, obj: T) -> list[T]:
        """Add a new item to the top of the stack."""
        stack = self._storage.get([]).copy()
        stack.append(obj)
        self._storage.set(stack)
        return stack

    def pop(self) -> T | None:
        """Remove the top item from the stack and return it. If the
        stack is empty, return `<code>None</code>`.
        """
        stack = self._storage.get([])

        if len(stack) == 0:
            return None

        rv = stack[-1]
        self._storage.set(stack[:-1])
        return rv

    @property
    def top(self) -> T | None:
        """The topmost item on the stack.  If the stack is empty,
        <code>None</code> is returned.
        """
        stack = self._storage.get([])

        if len(stack) == 0:
            return None

        return stack[-1]

    def __call__(
        self, name: str | None = None, *, unbound_message: str | None = None
    ) -> LocalProxy:
        """Create a :class:<code>LocalProxy</code> that accesses the top of this
        local stack.

        :param name: If given, the proxy access this attribute of the
            top item, rather than the item itself.
        :param unbound_message: The error message that the proxy will
            show if the stack is empty.
        """
        return LocalProxy(self, name, unbound_message=unbound_message)

if __name__ == '__main__':
    obj = LocalStack()
    print(obj.push('waws'))
    print(obj.push('killer'))
    print(obj.top)
    print('-----' * 20)
    print(obj.pop())
    print(obj.pop())
    print(obj.pop())</code></pre>
<pre><code>[&#039;waws&#039;]
[&#039;waws&#039;, &#039;killer&#039;]</code></pre>
<table>
<thead>
<tr>
<th>killer</th>
</tr>
</thead>
<tbody>
<tr>
<td>killer</td>
</tr>
<tr>
<td>waws</td>
</tr>
<tr>
<td>None</td>
</tr>
</tbody>
</table>
<h4><strong>总结</strong></h4>
<pre><code class="language-txt">在flask中有个local类,他和threading.local的功能一样,为每个线程开辟空间进行存取数据,
他们两个的内部实现机制,内部维护一个字典,以线程(协程)ID为key,进行数据隔离,如:
    __storage__ = {
        1211:{'k1':123}
    }

    obj = Local()
    obj.k1 = 123

在flask中还有一个LocalStack的类,他内部会依赖local对象,local对象负责存储数据,localstack对象用于将local中的值维护成一个栈。
    __storage__ = {
        1211:{'stack':['k1',]}
    }

    obj= LocalStack()
    obj.push('k1')
    obj.top
    obj.pop()

##### flask源码中总共有2个localstack对象 flask == 1.1.1
上下文管理:
* 请求上下文管理
* 应用上下文管理
```py
# context locals
__storage__ = {
    1111:{'stack':[RequestContext(reqeust,session),]},
    1123:{'stack':[RequestContext(reqeust,session),]},
}
_request_ctx_stack = LocalStack()

__storage__ = {
    1111:{'stack':[AppContenxt(app,g),]}
    1123:{'stack':[AppContenxt(app,g),]},
}
_app_ctx_stack = LocalStack()

_request_ctx_stack.push('waws')
_app_ctx_stack.push('killer')
flask源码中总共有2个localstack对象 flask == 3.0.2
_no_app_msg = """\
Working outside of application context.

This typically means that you attempted to use functionality that needed
the current application. To solve this, set up an application context
with app.app_context(). See the documentation for more information.\
"""
_cv_app: ContextVar[AppContext] = ContextVar("flask.app_ctx")
app_ctx: AppContext = LocalProxy(  # type: ignore[assignment]
    _cv_app, unbound_message=_no_app_msg
)
current_app: Flask = LocalProxy(  # type: ignore[assignment]
    _cv_app, "app", unbound_message=_no_app_msg
)
g: _AppCtxGlobals = LocalProxy(  # type: ignore[assignment]
    _cv_app, "g", unbound_message=_no_app_msg
)

_no_req_msg = """\
Working outside of request context.

This typically means that you attempted to use functionality that needed
an active HTTP request. Consult the documentation on testing for
information about how to avoid this problem.\
"""
_cv_request: ContextVar[RequestContext] = ContextVar("flask.request_ctx")
request_ctx: RequestContext = LocalProxy(  # type: ignore[assignment]
    _cv_request, unbound_message=_no_req_msg
)
request: Request = LocalProxy(  # type: ignore[assignment]
    _cv_request, "request", unbound_message=_no_req_msg
)
session: SessionMixin = LocalProxy(  # type: ignore[assignment]
    _cv_request, "session", unbound_message=_no_req_msg
)