aiohttp 模块详解 0. 前言 本文翻译自aiohttp的官方文档,如有纰漏,欢迎指出。
aiohttp分为服务器端和客户端,本文只介绍客户端。
另外我已经对 aiohttp 和 asyncio进行了封装,可以参考我的 github 地址:
https://github.com/web-trump/ahttp
由于上下文的缘故,请求代码必须在一个异步的函数中进行:
1. aiohttp安装
1.1. 基本请求用法 1 2 async with aiohttp.get('https://github.com' ) as r: await r.text()
其中r.text(), 可以在括号中指定解码方式,编码方式,例如
1 await resp.text(encoding='windows-1251' )
或者也可以选择不编码,适合读取图像等,是无法编码的
2.发起一个session请求 首先是导入aiohttp模块:
然后我们试着获取一个web源码,这里以GitHub的公共Time-line页面为例:
1 2 3 4 async with aiohttp.ClientSession() as session: async with session.get('https://api.github.com/events' ) as resp: print (resp.status) print (await resp.text())
上面的代码中,我们创建了一个 ClientSession 对象命名为session,然后通过session的get方法得到一个 ClientResponse 对象,命名为resp,get方法中传入了一个必须的参数url,就是要获得源码的http url。至此便通过协程完成了一个异步IO的get请求。
有get请求当然有post请求,并且post请求也是一个协程:
1 session.post('http://httpbin.org/post' , data=b'data' )
用法和get是一样的,区别是post需要一个额外的参数data,即是需要post的数据。
除了get和post请求外,其他http的操作方法也是一样的:
1 2 3 4 5 6 7 8 9 session.put('http://httpbin.org/put' , data=b'data' ) session.delete('http://httpbin.org/delete' ) session.head('http://httpbin.org/get' ) session.options('http://httpbin.org/get' ) session.patch('http://httpbin.org/patch' , data=b'data' )
小记:
不要为每次的连接都创建一次session,一般情况下只需要创建一个session,然后使用这个session执行所有的请求。
每个session对象,内部包含了一个连接池,并且将会保持连接和连接复用(默认开启)可以加快整体的性能。
3.在URL中传递参数 我们经常需要通过 get 在url中传递一些参数,参数将会作为url问号后面的一部分发给服务器。在aiohttp的请求中,允许以dict的形式来表示问号后的参数。举个例子,如果你想传递 key1=value1 key2=value2 到 httpbin.org/get 你可以使用下面的代码:
1 2 3 params = {'key1' : 'value1' , 'key2' : 'value2' }async with session.get('http://httpbin.org/get' , params=params) as resp: assert resp.url == 'http://httpbin.org/get?key2=value2&key1=value1'
可以看到,代码正确的执行了,说明参数被正确的传递了进去。不管是一个参数两个参数,还是更多的参数,都可以通过这种方式来传递。除了这种方式之外,还有另外一个,使用一个 list 来传递(这种方式可以传递一些特殊的参数,例如下面两个key是相等的也可以正确传递):
1 2 3 params = [('key' , 'value1' ), ('key' , 'value2' )]async with session.get('http://httpbin.org/get' , params=params) as r: assert r.url == 'http://httpbin.org/get?key=value2&key=value1'
除了上面两种,我们也可以直接通过传递字符串作为参数来传递,但是需要注意,通过字符串传递的特殊字符不会被编码:
1 2 async with session.get('http://httpbin.org/get' , params='key=value+1' ) as r: assert r.url == 'http://httpbin.org/get?key=value+1'
4.响应的内容 还是以GitHub的公共Time-line页面为例,我们可以获得页面响应的内容:
1 2 async with session.get('https://api.github.com/events' ) as resp: print (await resp.text())
运行之后,会打印出类似于如下的内容:
1 '[{"created_at":"2015-06-12T14:06:22Z","public":true,"actor":{...
resp的text方法,会自动将服务器端返回的内容进行解码–decode,当然我们也可以自定义编码方式:
1 await resp.text(encoding='gb2312' )
除了text方法可以返回解码后的内容外,我们也可以得到类型是字节的内容:
1 print (await resp.read())
运行的结果是:
1 b'[{"created_at":"2015-06-12T14:06:22Z","public":true,"actor":{...
gzip和deflate转换编码已经为你自动解码。
小记:
text(),read()方法是把整个响应体读入内存,如果你是获取大量的数据,请考虑使用”字节流“(streaming response)
5.特殊响应内容:json 如果我们获取的页面的响应内容是json,aiohttp内置了更好的方法来处理json:
1 2 async with session.get('https://api.github.com/events' ) as resp: print (await resp.json())
如果因为某种原因而导致resp.json()解析json失败,例如返回不是json字符串等等,那么resp.json()将抛出一个错误,也可以给json()方法指定一个解码方式:
1 print (await resp.json(encoding='gb2312' ))
或者传递一个函数进去:
1 print (await resp.json(lambda (x:x.replace('a' ,'b' ))))
6.以字节流的方式读取响应内容 虽然json(),text(),read()很方便的能把响应的数据读入到内存,但是我们仍然应该谨慎的使用它们,因为它们是把整个的响应体全部读入了内存。即使你只是想下载几个字节大小的文件,但这些方法却将在内存中加载所有的数据。所以我们可以通过控制字节数来控制读入内存的响应内容:
1 2 async with session.get('https://api.github.com/events' ) as resp: await resp.content.read(10 )
一般地,我们应该使用以下的模式来把读取的字节流保存到文件中:
1 2 3 4 5 6 with open (filename, 'wb' ) as fd: while True : chunk = await resp.content.read(chunk_size) if not chunk: break fd.write(chunk)
7.自定义请求头 如果你想添加请求头,可以像get添加参数那样以dict的形式,作为get或者post的参数进行请求:
1 2 3 4 5 import json url = 'https://api.github.com/some/endpoint' payload = {'some' : 'data' } headers = {'content-type' : 'application/json' }await session.post(url, data=json.dumps(payload), headers=headers)
8.自定义Cookie 给服务器发送cookie,可以通过给 ClientSession 传递一个cookie参数:
1 2 3 4 5 url = 'http://httpbin.org/cookies' cookies = {'cookies_are' : 'working' }async with ClientSession(cookies=cookies) as session: async with session.get(url) as resp: assert await resp.json() == {"cookies" : {"cookies_are" : "working" }}
可直接访问链接 “httpbin.org/cookies”查看当前cookie,访问session中的cookie请见第10节。
9.post数据的几种方式 (1)模拟表单post数据 1 2 3 payload = {'key1' : 'value1' , 'key2' : 'value2' }async with session.post('http://httpbin.org/post' , data=payload) as resp: print (await resp.text())
注意:data=dict的方式post的数据将被转码,和form提交数据是一样的作用,如果你不想被转码,可以直接以字符串的形式 data=str 提交,这样就不会被转码。
(2)post json 1 2 3 4 5 import json url = 'https://api.github.com/some/endpoint' payload = {'some' : 'data' }async with session.post(url, data=json.dumps(payload)) as resp: ...
其实json.dumps(payload)返回的也是一个字符串,只不过这个字符串可以被识别为json格式
(3)post 小文件 1 2 3 url = 'http://httpbin.org/post' files = {'file' : open ('report.xls' , 'rb' )}await session.post(url, data=files)
可以设置好文件名和content-type:
1 2 3 4 5 url = 'http://httpbin.org/post' data = FormData() data.add_field('file' , open ('report.xls' , 'rb' ), filename='report.xls' , content_type='application/vnd.ms-excel' )await session.post(url, data=data)
如果将文件对象设置为数据参数,aiohttp将自动以字节流的形式发送给服务器。
(4)post 大文件 aiohttp支持多种类型的文件以流媒体的形式上传,所以我们可以在文件未读入内存的情况下发送大文件。
1 2 3 4 5 6 7 8 9 10 @aiohttp.streamer def file_sender (writer, file_name=None ): with open (file_name, 'rb' ) as f: chunk = f.read(2 **16 ) while chunk: yield from writer.write(chunk) chunk = f.read(2 **16 )async with session.post('http://httpbin.org/post' , data=file_sender(file_name='huge_file' )) as resp: print (await resp.text())
同时我们可以从一个url获取文件后,直接post给另一个url,并计算hash值:
1 2 3 4 5 6 7 8 9 10 11 12 13 async def feed_stream (resp, stream ): h = hashlib.sha256() while True : chunk = await resp.content.readany() if not chunk: break h.update(chunk) stream.feed_data(chunk) return h.hexdigest() resp = session.get('http://httpbin.org/post' ) stream = StreamReader() loop.create_task(session.post('http://httpbin.org/post' , data=stream)) file_hash = await feed_stream(resp, stream)
因为响应内容类型是StreamReader,所以可以把get和post连接起来,同时进行post和get:
1 2 3 r = await session.get('http://python.org' )await session.post('http://httpbin.org/post' , data=r.content)
(5)post预压缩数据 在通过aiohttp发送前就已经压缩的数据, 调用压缩函数的函数名(通常是deflate 或 zlib)作为content-encoding的值:
1 2 3 4 5 async def my_coroutine (session, headers, my_data ): data = zlib.compress(my_data) headers = {'Content-Encoding' : 'deflate' } async with session.post('http://httpbin.org/post' , data=data, headers=headers) pass
10.keep-alive, 连接池,共享cookie
ClientSession 用于在多个连接之间共享cookie:
1 2 3 4 5 6 7 async with aiohttp.ClientSession() as session: await session.get('http://httpbin.org/cookies/set?my_cookie=my_value' ) filtered = session.cookie_jar.filter_cookies('http://httpbin.org' ) assert filtered['my_cookie' ].value == 'my_value' async with session.get('http://httpbin.org/cookies' ) as r: json_body = await r.json() assert json_body['cookies' ]['my_cookie' ] == 'my_value'
也可以为所有的连接设置共同的请求头:
1 2 3 4 5 async with aiohttp.ClientSession( headers={"Authorization" : "Basic bG9naW46cGFzcw==" }) as session: async with session.get("http://httpbin.org/headers" ) as r: json_body = await r.json() assert json_body['headers' ]['Authorization' ] == 'Basic bG9naW46cGFzcw=='
ClientSession 还支持 keep-alive连接和连接池(connection pooling)
11.cookie安全性 默认ClientSession使用的是严格模式的 aiohttp.CookieJar. RFC 2109,明确的禁止接受url和ip地址产生的cookie,只能接受 DNS 解析IP产生的cookie。可以通过设置aiohttp.CookieJar 的 unsafe=True 来配置:
1 2 jar = aiohttp.CookieJar(unsafe=True ) session = aiohttp.ClientSession(cookie_jar=jar)
12.控制同时连接的数量(连接池) 也可以理解为同时请求的数量,为了限制同时打开的连接数量,我们可以将限制参数传递给连接器:
1 conn = aiohttp.TCPConnector(limit=30 )
限制同时打开限制同时打开连接到同一端点的数量((host, port, is_ssl) 三的倍数),可以通过设置 limit_per_host 参数:
1 conn = aiohttp.TCPConnector(limit_per_host=30 )
13.自定义域名解析 我们可以指定域名服务器的 IP 对我们提供的get或post的url进行解析:
1 2 3 from aiohttp.resolver import AsyncResolver resolver = AsyncResolver(nameservers=["8.8.8.8" , "8.8.4.4" ]) conn = aiohttp.TCPConnector(resolver=resolver)
14.设置代理 aiohttp支持使用代理来访问网页:
1 2 3 async with aiohttp.ClientSession() as session: async with session.get("http://python.org" , proxy="http://some.proxy.com" ) as resp: print (resp.status)
当然也支持需要授权的页面:
1 2 3 4 async with aiohttp.ClientSession() as session: proxy_auth = aiohttp.BasicAuth('user' , 'pass' ) async with session.get("http://python.org" , proxy="http://some.proxy.com" , proxy_auth=proxy_auth) as resp: print (resp.status)
或者通过这种方式来验证授权:
1 session.get("http://python.org" , proxy="http://user:pass@some.proxy.com" )
15.响应状态码 response status code 可以通过 resp.status来检查状态码是不是200:
1 2 async with session.get('http://httpbin.org/get' ) as resp: assert resp.status == 200
16.响应头 我们可以直接使用 resp.headers 来查看响应头,得到的值类型是一个dict:
1 2 3 4 5 6 7 >>> resp.headers {'ACCESS-CONTROL-ALLOW-ORIGIN' : '*' , 'CONTENT-TYPE' : 'application/json' , 'DATE' : 'Tue, 15 Jul 2014 16:49:51 GMT' , 'SERVER' : 'gunicorn/18.0' , 'CONTENT-LENGTH' : '331' , 'CONNECTION' : 'keep-alive' }
或者我们可以查看原生的响应头:
1 2 3 4 5 6 >>> resp.raw_headers ((b'SERVER' , b'nginx' ), (b'DATE' , b'Sat, 09 Jan 2016 20:28:40 GMT' ), (b'CONTENT-TYPE' , b'text/html; charset=utf-8' ), (b'CONTENT-LENGTH' , b'12150' ), (b'CONNECTION' , b'keep-alive' ))
17.查看cookie 1 2 3 url = 'http://example.com/some/cookie/setting/url' async with session.get(url) as resp: print (resp.cookies)
18.重定向的响应头 如果一个请求被重定向了,我们依然可以查看被重定向之前的响应头信息:
1 2 3 4 5 >>> resp = await session.get('http://example.com/some/redirect/' )>>> resp <ClientResponse(http://example.com/some/other/url/) [200 ]>>>> resp.history (<ClientResponse(http://example.com/some/redirect/) [301 ]>,)
19.超时处理 默认的IO操作都有5分钟的响应时间 我们可以通过 timeout 进行重写:
1 2 async with session.get('https://github.com' , timeout=60 ) as r: ...
如果 timeout=None 或者 timeout=0 将不进行超时检查,也就是不限时长