asyncio Support

The asyncio-related APIs can be found within the prometheus_async.aio package.

Decorator Wrappers

All of these functions take a prometheus_client metrics object. They can either be applied as a decorator to functions and methods, or be passed an asyncio.Future as a second argument.

coroutine prometheus_async.aio.time(metric: Observer) → Callable[[Callable[P, R]], Callable[P, R]]
coroutine prometheus_async.aio.time(metric: Observer, future: Awaitable[T]) → Awaitable[T]

Call metric.observe(time) with the runtime in seconds.

Works as a decorator as well as on asyncio.Futures.

Returns

coroutine function (if decorator) or coroutine.

The most common use case is using it as a decorator:

import asyncio

from aiohttp import web
from prometheus_client import Histogram
from prometheus_async.aio import time

REQ_TIME = Histogram("req_time_seconds", "time spent in requests")

@time(REQ_TIME)
async def req(request):
    await asyncio.sleep(1)
    return web.Response(body=b"hello")

coroutine prometheus_async.aio.count_exceptions(metric: Incrementer, *, exc: type[BaseException] = BaseException) → Callable[[Callable[P, R]], Callable[P, R]]
coroutine prometheus_async.aio.count_exceptions(metric: Incrementer, future: Awaitable[T], *, exc: type[BaseException] = BaseException) → Awaitable[T]

Call metric.inc() whenever exc is caught.

Works as a decorator as well as on asyncio.Futures.

Returns

coroutine function (if decorator) or coroutine.

coroutine prometheus_async.aio.track_inprogress(metric: Gauge) → Callable[[Callable[P, R]], Callable[P, R]]
coroutine prometheus_async.aio.track_inprogress(metric: Gauge, future: Awaitable[T]) → Awaitable[T]

Call metric.inc() on entry and metric.dec() on exit.

Works as a decorator as well as on asyncio.Futures.

Returns

coroutine function (if decorator) or coroutine.

Metric Exposure

prometheus-async offers methods to expose your metrics using aiohttp under prometheus_async.aio.web:

coroutine prometheus_async.aio.web.start_http_server(*, addr='', port=0, ssl_ctx=None, service_discovery=None)

Start an HTTP(S) server on addr:port.

If ssl_ctx is set, use TLS.

Parameters
  • addr (str) – Interface to listen on. Leave empty to listen on all interfaces.

  • port (int) – Port to listen on.

  • ssl_ctx (ssl.SSLContext) – TLS settings

  • service_discovery (ServiceDiscovery | None) – see Service Discovery

Return type

MetricsHTTPServer

Deprecated since version 18.2.0: The loop argument is a no-op now and will be removed in one year at the earliest.

Changed in version 21.1.0: The loop argument has been removed.

prometheus_async.aio.web.start_http_server_in_thread(*, port=0, addr='', ssl_ctx=None, service_discovery=None)

Start an asyncio HTTP(S) server in a new thread with its own event loop.

Ideal for exposing your metrics in non-asyncio Python 3 applications.

For arguments see start_http_server().

Return type

ThreadedMetricsHTTPServer

Warning

Please note that if you want to use uWSGI together with start_http_server_in_thread(), you have to tell uWSGI to enable threads using its configuration option or by passing it --enable-threads.

Currently the recommended mode to run uWSGI with --master is broken if you want to clean up using atexit handlers.

Therefore, using prometheus_async.aio.web together with uWSGI is strongly discouraged.

async prometheus_async.aio.web.server_stats(request)

Return a web response with the plain text version of the metrics.

Return type

aiohttp.web.Response

Useful if you want to install your metrics within your own application:

from aiohttp import web
from prometheus_async import aio

app = web.Application()
app.router.add_get("/metrics", aio.web.server_stats)
# your other routes go here.

class prometheus_async.aio.web.MetricsHTTPServer(socket, runner, app, https)

A stoppable metrics HTTP server.

Returned by start_http_server(). Do not instantiate it yourself.

Variables
  • socket – Socket the server is listening on. namedtuple of either (ipaddress.IPv4Address, port) or (ipaddress.IPv6Address, port).

  • https (bool) – Whether the server uses SSL/TLS.

  • url (str) – A valid URL to the metrics endpoint.

  • is_registered (bool) – Is the web endpoint registered with a service discovery system?

coroutine close()

Stop the server and clean up.

class prometheus_async.aio.web.ThreadedMetricsHTTPServer(http_server, thread, loop)

A stoppable metrics HTTP server that runs in a separate thread.

Returned by start_http_server_in_thread(). Do not instantiate it yourself.

Variables
  • socket – Socket the server is listening on. namedtuple of Socket(addr, port).

  • https (bool) – Whether the server uses SSL/TLS.

  • url (str) – A valid URL to the metrics endpoint.

  • is_registered (bool) – Is the web endpoint registered with a service discovery system?

close()

Stop the server, close the event loop, and join the thread.

Service Discovery

Web exposure is much more useful if it comes with an easy way to integrate it with service discovery.

Currently prometheus-async only ships integration with a local Consul agent using aiohttp. We do not plan to add more.

class prometheus_async.aio.sd.ConsulAgent(*, name='app-metrics', service_id=None, tags=(), token=None, deregister=True)

Service discovery via a local Consul agent.

Pass as service_discovery into prometheus_async.aio.web.start_http_server() or prometheus_async.aio.web.start_http_server_in_thread().

Parameters
  • name (str) – Application name. Used as the Consul service name and, if service_id is not set, as the service ID.

  • service_id (str) – Consul Service ID. If not set, name is used.

  • tags (tuple) – Tags to use in Consul registration.

  • token (str) – A Consul access token.

  • deregister (bool) – Whether to deregister when the HTTP server is closed.

Custom Service Discovery

Adding your own service discovery methods is simple: all you need is to provide an instance with a coroutine register(self, metrics_server, loop) that registers the passed metrics_server with the service of your choice and returns another coroutine that is called for de-registration when the metrics server is shut down.

Have a look at our implementations if you need inspiration, or check out the ServiceDiscovery typing.Protocol in the types module.
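
The shape of such an object can be illustrated with a standalone sketch that uses only the standard library. InMemoryDiscovery and the stand-in server object are hypothetical. The loop parameter is given a default here as an assumption, since the exact call signature may differ between versions; check the ServiceDiscovery protocol for the one your version expects.

```python
import asyncio
from types import SimpleNamespace


class InMemoryDiscovery:
    """Hypothetical service discovery that records URLs in a dict."""

    def __init__(self):
        self.registered = {}

    async def register(self, metrics_server, loop=None):
        # "Register" the server by remembering its URL.
        self.registered["app-metrics"] = metrics_server.url

        async def deregister():
            # Called on shutdown to undo the registration.
            self.registered.pop("app-metrics", None)

        # Return the coroutine function itself, not the result of calling it.
        return deregister


async def demo():
    sd = InMemoryDiscovery()
    # Stand-in for a metrics server; only the url attribute is used here.
    fake_server = SimpleNamespace(url="http://127.0.0.1:9090/")
    deregister = await sd.register(fake_server)
    during = dict(sd.registered)
    await deregister()
    return during, sd.registered


during, after = asyncio.run(demo())
```

In real use, the instance would be passed as service_discovery to start_http_server() instead of calling register() by hand.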