# session_util.py (976 bytes) -- committed by lcn
import asyncio_redis

from sanic import Sanic
from sanic.response import text
from sanic_session import Session, RedisSessionInterface

# Sanic application instance used by the session setup and the route below.
# An explicit name is passed because recent Sanic releases refuse to create
# an unnamed application; older releases accept the name as well.
app = Sanic("session_util")


class Redis:
    """
    A simple wrapper class that allows you to share a connection
    pool across your application.

    The pool is created lazily on the first call to ``get_redis_pool`` and
    cached on the instance afterwards. The connection settings default to
    the values this module has always used, so ``Redis()`` keeps working
    unchanged.
    """
    # Class-level default; replaced by an instance attribute once the pool
    # has been created.
    _pool = None

    def __init__(self, *, host: str = '172.18.1.253', port: int = 6379,
                 db: int = 2, poolsize: int = 10) -> None:
        """
        Remember the connection settings; no connection is opened here.

        :param host: Redis server host name or address.
        :param port: Redis server TCP port.
        :param db: Redis database index to select.
        :param poolsize: ``poolsize`` value forwarded to the pool factory.
        """
        self.host = host
        self.port = port
        self.db = db
        self.poolsize = poolsize

    async def get_redis_pool(self):
        """
        Return the shared connection pool, creating it on first use.

        :return: the object produced by ``asyncio_redis.Pool.create``; the
            same object is returned on every later call.
        """
        # Compare against None explicitly so the cache check does not depend
        # on the truthiness of the pool object.
        if self._pool is None:
            self._pool = await asyncio_redis.Pool.create(
                host=self.host,
                port=self.port,
                db=self.db,
                poolsize=self.poolsize,
            )

        return self._pool


# Module-wide Redis wrapper whose pool getter is handed to the session layer.
redis = Redis()

# Register Redis-backed sessions on the app. The bound coroutine method
# itself is passed (it is not called here).
_session_interface = RedisSessionInterface(redis.get_redis_pool)
Session(app, interface=_session_interface)


@app.route("/")
async def test(request):
    """
    Increment the per-session counter ``foo`` and return it as plain text.

    :param request: incoming request; its session is read from
        ``request.ctx.session``.
    :return: a text response whose body is the updated counter value.
    """
    # interact with the session like a normal dict
    session = request.ctx.session

    # A missing or falsy counter starts from 0 before being incremented.
    session['foo'] = (session.get('foo') or 0) + 1

    # text() expects a str body; recent Sanic releases raise TypeError when
    # handed an int, so convert the counter explicitly.
    return text(str(session['foo']))


# Start the development server only when this file is executed as a script.
if __name__ == "__main__":
    app.run(debug=True, port=8001, host="0.0.0.0")