# Source code for the Podman events manager module.

"""Model and Manager for Event resources."""

import json
import logging
from datetime import datetime
from typing import Any, Dict, Optional, Union, Iterator

from podman import api
from podman.api.client import APIClient

# Module-level logger.
# NOTE(review): an empty name makes logging.getLogger() return the root
# logger; a module-specific name was presumably intended -- confirm.
logger = logging.getLogger("")

class EventsManager:  # pylint: disable=too-few-public-methods
    """Specialized Manager for Event resources."""

    def __init__(self, client: APIClient) -> None:
        """Initialize EventsManager object.

        Args:
            client: Connection to Podman service.
        """
        self.client = client

    def list(
        self,
        since: Union[datetime, int, None] = None,
        until: Union[datetime, int, None] = None,
        filters: Optional[Dict[str, Any]] = None,
        decode: bool = False,
    ) -> Iterator[Union[str, Dict[str, Any]]]:
        """Report on events.

        Issues a streaming GET against ``/events`` and yields one item per
        line of the response body.

        Args:
            decode: When True, decode stream into dict's. Default: False
            filters: Criteria for including events.
            since: Get events newer than this time.
            until: Get events older than this time.

        Yields:
            When decode is True, Iterator[Dict[str, Any]]; blank lines in the
            stream are skipped because they carry no JSON document.
            When decode is False, every line exactly as produced by the
            response's ``iter_lines()`` (blank lines included).

        Raises:
            json.JSONDecodeError: decode is True and a non-blank line is not
                valid JSON.
            Exception: whatever ``response.raise_for_status()`` raises when
                the service reports an error status.
        """
        params = {
            "filters": api.prepare_filters(filters),
            "since": api.prepare_timestamp(since),
            "stream": True,
            "until": api.prepare_timestamp(until),
        }
        response = self.client.get("/events", params=params, stream=True)
        try:
            response.raise_for_status()

            for item in response.iter_lines():
                if not decode:
                    yield item
                elif item:
                    # Blank keep-alive lines would make json.loads() raise.
                    yield json.loads(item)
        finally:
            # The body is streamed, so the connection stays checked out until
            # the response is closed. Closing here also covers a consumer that
            # stops iterating early and an error raised by raise_for_status().
            response.close()