asyncio Support
The asyncio-related APIs can be found within the prometheus_async.aio package.
Decorator Wrappers
All of these functions take a prometheus_client metrics object. They can either be applied as a decorator to functions and methods, or be passed an asyncio.Future as a second argument.
- coroutine prometheus_async.aio.time(metric: Observer) -> Callable[[Callable[P, R]], Callable[P, R]]
- coroutine prometheus_async.aio.time(metric: Observer, future: Awaitable[T]) -> Awaitable[T]
Call metric.observe(time) with the runtime in seconds.
Works as a decorator as well as on asyncio.Futures.
- Returns
coroutine function (if decorator) or coroutine.
The most common use case is using it as a decorator:
import asyncio

from aiohttp import web
from prometheus_client import Histogram
from prometheus_async.aio import time

REQ_TIME = Histogram("req_time_seconds", "time spent in requests")

@time(REQ_TIME)
async def req(request):
    await asyncio.sleep(1)
    return web.Response(body=b"hello")
- coroutine prometheus_async.aio.count_exceptions(metric: Incrementer, *, exc: type[BaseException] = BaseException) -> Callable[[Callable[P, R]], Callable[P, R]]
- coroutine prometheus_async.aio.count_exceptions(metric: Incrementer, future: Awaitable[T], *, exc: type[BaseException] = BaseException) -> Awaitable[T]
Call metric.inc() whenever exc is caught.
Works as a decorator as well as on asyncio.Futures.
- Returns
coroutine function (if decorator) or coroutine.
- coroutine prometheus_async.aio.track_inprogress(metric: Gauge) -> Callable[[Callable[P, R]], Callable[P, R]]
- coroutine prometheus_async.aio.track_inprogress(metric: Gauge, future: Awaitable[T]) -> Awaitable[T]
Call metric.inc() on entry and metric.dec() on exit.
Works as a decorator as well as on asyncio.Futures.
- Returns
coroutine function (if decorator) or coroutine.
Metric Exposure
prometheus-async offers methods to expose your metrics using aiohttp under prometheus_async.aio.web:
- coroutine prometheus_async.aio.web.start_http_server(*, addr='', port=0, ssl_ctx=None, service_discovery=None)
Start an HTTP(S) server on addr:port.
If ssl_ctx is set, use TLS.
- Parameters
addr (str) – Interface to listen on. Leaving empty will listen on all interfaces.
port (int) – Port to listen on.
ssl_ctx (ssl.SSLContext) – TLS settings
service_discovery (ServiceDiscovery | None) – see Service Discovery
- Return type
MetricsHTTPServer
Deprecated since version 18.2.0: The loop argument is a no-op now and will be removed in one year at the earliest.
Changed in version 21.1.0: The loop argument has been removed.
- prometheus_async.aio.web.start_http_server_in_thread(*, port=0, addr='', ssl_ctx=None, service_discovery=None)
Start an asyncio HTTP(S) server in a new thread with its own event loop.
Ideal for exposing your metrics in non-asyncio Python 3 applications.
For arguments, see start_http_server().
- Return type
ThreadedMetricsHTTPServer
Warning
Please note that if you want to use uWSGI together with start_http_server_in_thread(), you have to tell uWSGI to enable threads using its configuration option or by passing it --enable-threads.
Currently the recommended mode to run uWSGI with --master is broken if you want to clean up using atexit handlers.
Therefore the usage of prometheus_async.aio.web together with uWSGI is strongly discouraged.
- async prometheus_async.aio.web.server_stats(request)
Return a web response with the plain text version of the metrics.
- Return type
aiohttp.web.Response
Useful if you want to install your metrics within your own application:
from aiohttp import web
from prometheus_async import aio
app = web.Application()
app.router.add_get("/metrics", aio.web.server_stats)
# your other routes go here.
- class prometheus_async.aio.web.MetricsHTTPServer(socket, runner, app, https)
A stoppable metrics HTTP server.
Returned by start_http_server(). Do not instantiate it yourself.
- Variables
socket – Socket the server is listening on. namedtuple of either (ipaddress.IPv4Address, port) or (ipaddress.IPv6Address, port).
https (bool) – Whether the server uses SSL/TLS.
url (str) – A valid URL to the metrics endpoint.
is_registered (bool) – Is the web endpoint registered with a service discovery system?
- coroutine close()
Stop the server and clean up.
- class prometheus_async.aio.web.ThreadedMetricsHTTPServer(http_server, thread, loop)
A stoppable metrics HTTP server that runs in a separate thread.
Returned by start_http_server_in_thread(). Do not instantiate it yourself.
- close()
Stop the server, close the event loop, and join the thread.
Service Discovery
Web exposure is much more useful if it comes with an easy way to integrate it with service discovery.
Currently prometheus-async only ships integration with a local Consul agent using aiohttp. We do not plan to add more.
- class prometheus_async.aio.sd.ConsulAgent(*, name='app-metrics', service_id=None, tags=(), token=None, deregister=True)
Service discovery via a local Consul agent.
Pass as service_discovery into prometheus_async.aio.web.start_http_server() / prometheus_async.aio.web.start_http_server_in_thread().
- Parameters
name (str) – Application name that is used for the name and the service ID if not set.
service_id (str) – Consul Service ID. If not set, name is used.
tags (tuple) – Tags to use in Consul registration.
token (str) – A consul access token.
deregister (bool) – Whether to deregister when the HTTP server is closed.
Custom Service Discovery
Adding your own service discovery methods is simple: all you need is to provide an instance with a coroutine register(self, metrics_server, loop) that registers the passed metrics_server with the service of your choice and returns another coroutine that is called for de-registration when the metrics server is shut down.
Have a look at our implementations if you need inspiration or check out the ServiceDiscovery typing.Protocol in the types module.
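The shape of such an object can be sketched without any real discovery backend. InMemoryDiscovery below is purely hypothetical and only records the URL in a dict. The loop parameter is given a default of None as an assumption, so the method can be called with or without it. A SimpleNamespace stands in for the metrics server so the sketch runs on its own.

```python
import asyncio
from types import SimpleNamespace


class InMemoryDiscovery:
    """Hypothetical service discovery that records URLs in a dict."""

    def __init__(self):
        self.registered = {}

    async def register(self, metrics_server, loop=None):
        # "Register" the server by remembering its URL.
        self.registered["metrics"] = metrics_server.url

        async def deregister():
            # Called on shutdown to undo the registration.
            self.registered.pop("metrics", None)

        return deregister


async def demo():
    sd = InMemoryDiscovery()
    # Stand-in for a real metrics server; only `url` is needed here.
    fake_server = SimpleNamespace(url="http://127.0.0.1:9000/")
    deregister = await sd.register(fake_server)
    during = dict(sd.registered)
    await deregister()
    return during, sd.registered


during, after = asyncio.run(demo())
```

A real implementation would replace the dict operations with calls to the discovery service and would be passed as service_discovery when starting the server.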