patroni.postgresql package¶
Submodules¶
- patroni.postgresql.bootstrap module
Bootstrap
Bootstrap.__init__()
Bootstrap._custom_bootstrap()
Bootstrap._initdb()
Bootstrap._post_restore()
Bootstrap.basebackup()
Bootstrap.bootstrap()
Bootstrap.call_post_bootstrap()
Bootstrap.clone()
Bootstrap.create_or_update_role()
Bootstrap.create_replica()
Bootstrap.keep_existing_recovery_conf
Bootstrap.post_bootstrap()
Bootstrap.process_user_options()
Bootstrap.running_custom_bootstrap
- patroni.postgresql.callback_executor module
CallbackAction
CallbackAction.NOOP
CallbackAction.ON_RELOAD
CallbackAction.ON_RESTART
CallbackAction.ON_ROLE_CHANGE
CallbackAction.ON_START
CallbackAction.ON_STOP
CallbackAction._generate_next_value_()
CallbackAction._member_map_
CallbackAction._member_names_
CallbackAction._member_type_
CallbackAction._new_member_()
CallbackAction._unhashable_values_
CallbackAction._use_args_
CallbackAction._value2member_map_
CallbackAction._value_repr_()
CallbackExecutor
OnReloadExecutor
- patroni.postgresql.cancellable module
- patroni.postgresql.citus module
CitusHandler
CitusHandler.__init__()
CitusHandler._add_task()
CitusHandler.add_task()
CitusHandler.adjust_postgres_gucs()
CitusHandler.bootstrap()
CitusHandler.find_task_by_group()
CitusHandler.group()
CitusHandler.handle_event()
CitusHandler.ignore_replication_slot()
CitusHandler.is_coordinator()
CitusHandler.is_enabled()
CitusHandler.is_worker()
CitusHandler.load_pg_dist_node()
CitusHandler.on_demote()
CitusHandler.pick_task()
CitusHandler.process_task()
CitusHandler.process_tasks()
CitusHandler.query()
CitusHandler.run()
CitusHandler.schedule_cache_rebuild()
CitusHandler.sync_pg_dist_node()
CitusHandler.update_node()
PgDistNode
- patroni.postgresql.config module
ConfigHandler
ConfigHandler.CMDLINE_OPTIONS
ConfigHandler._RECOVERY_PARAMETERS
ConfigHandler.__init__()
ConfigHandler._adjust_recovery_parameters()
ConfigHandler._check_passfile()
ConfigHandler._check_primary_conninfo()
ConfigHandler._configuration_to_save
ConfigHandler._get_pg_settings()
ConfigHandler._get_tcp_local_address()
ConfigHandler._get_unix_local_address()
ConfigHandler._handle_wal_buffers()
ConfigHandler._pgpass_line()
ConfigHandler._read_recovery_params()
ConfigHandler._read_recovery_params_pre_v12()
ConfigHandler._recovery_parameters_to_compare
ConfigHandler._remove_file_if_exists()
ConfigHandler._sanitize_auto_conf()
ConfigHandler._triggerfile_wrong_name
ConfigHandler._write_recovery_params()
ConfigHandler.append_pg_hba()
ConfigHandler.build_recovery_params()
ConfigHandler.check_directories()
ConfigHandler.check_recovery_conf()
ConfigHandler.config_dir
ConfigHandler.config_writer()
ConfigHandler.effective_configuration
ConfigHandler.format_dsn()
ConfigHandler.get()
ConfigHandler.get_server_parameters()
ConfigHandler.hba_file
ConfigHandler.ident_file
ConfigHandler.load_current_server_parameters()
ConfigHandler.pg_hba_conf
ConfigHandler.postgresql_conf
ConfigHandler.primary_conninfo_params()
ConfigHandler.recovery_conf_exists()
ConfigHandler.reload_config()
ConfigHandler.remove_recovery_conf()
ConfigHandler.replace_pg_hba()
ConfigHandler.replace_pg_ident()
ConfigHandler.replication
ConfigHandler.resolve_connection_addresses()
ConfigHandler.restore_command()
ConfigHandler.restore_configuration_files()
ConfigHandler.rewind_credentials
ConfigHandler.save_configuration_files()
ConfigHandler.set_file_permissions()
ConfigHandler.set_synchronous_standby_names()
ConfigHandler.setup_server_parameters()
ConfigHandler.superuser
ConfigHandler.triggerfile_good_name
ConfigHandler.try_to_create_dir()
ConfigHandler.write_pgpass()
ConfigHandler.write_postgresql_conf()
ConfigHandler.write_recovery_conf()
ConfigWriter
_bool_is_true_validator()
_bool_validator()
_false_validator()
conninfo_parse()
conninfo_uri_parse()
mtime()
parse_dsn()
read_param_value()
read_recovery_param_value()
strip_comment()
- patroni.postgresql.connection module
- patroni.postgresql.misc module
- patroni.postgresql.postmaster module
PostmasterProcess
PostmasterProcess.__init__()
PostmasterProcess._from_pidfile()
PostmasterProcess._is_postmaster_process()
PostmasterProcess._read_postmaster_pidfile()
PostmasterProcess.from_pid()
PostmasterProcess.from_pidfile()
PostmasterProcess.pg_ctl_kill()
PostmasterProcess.signal_kill()
PostmasterProcess.signal_stop()
PostmasterProcess.start()
PostmasterProcess.wait_for_user_backends_to_close()
pg_ctl_start()
- patroni.postgresql.rewind module
REWIND_STATUS
Rewind
Rewind.__checkpoint()
Rewind.__init__()
Rewind._archive_ready_wals()
Rewind._buid_archiver_command()
Rewind._check_timeline_and_lsn()
Rewind._conn_kwargs()
Rewind._fetch_missing_wal()
Rewind._find_missing_wal()
Rewind._get_checkpoint_end()
Rewind._get_local_timeline_lsn()
Rewind._get_local_timeline_lsn_from_controldata()
Rewind._log_primary_history()
Rewind._maybe_clean_pg_replslot()
Rewind.can_rewind
Rewind.can_rewind_or_reinitialize_allowed
Rewind.check_leader_has_run_checkpoint()
Rewind.check_leader_is_not_in_recovery()
Rewind.checkpoint_after_promote()
Rewind.cleanup_archive_status()
Rewind.configuration_allows_rewind()
Rewind.enabled
Rewind.ensure_checkpoint_after_promote()
Rewind.ensure_clean_shutdown()
Rewind.execute()
Rewind.executed
Rewind.failed
Rewind.is_needed
Rewind.pg_rewind()
Rewind.read_postmaster_opts()
Rewind.reset_state()
Rewind.rewind_or_reinitialize_needed_and_possible()
Rewind.should_remove_data_directory_on_diverged_timelines
Rewind.single_user_mode()
Rewind.trigger_check_diverged_lsn()
- patroni.postgresql.slots module
SlotsAdvanceThread
SlotsHandler
SlotsHandler.__init__()
SlotsHandler._copy_items()
SlotsHandler._drop_incorrect_slots()
SlotsHandler._ensure_logical_slots_primary()
SlotsHandler._ensure_logical_slots_replica()
SlotsHandler._ensure_physical_slots()
SlotsHandler._get_leader_connection_cursor()
SlotsHandler._query()
SlotsHandler._ready_logical_slots()
SlotsHandler._update_pending_logical_slot_primary()
SlotsHandler.check_logical_slots_readiness()
SlotsHandler.copy_logical_slots()
SlotsHandler.drop_replication_slot()
SlotsHandler.get_local_connection_cursor()
SlotsHandler.ignore_replication_slot()
SlotsHandler.load_replication_slots()
SlotsHandler.on_promote()
SlotsHandler.process_permanent_slots()
SlotsHandler.schedule()
SlotsHandler.schedule_advance_slots()
SlotsHandler.sync_replication_slots()
compare_slots()
- patroni.postgresql.sync module
- patroni.postgresql.validator module
Bool
Enum
EnumBool
Integer
InvalidGucValidatorsFile
Number
Real
String
ValidatorFactory
ValidatorFactoryInvalidSpec
ValidatorFactoryInvalidType
ValidatorFactoryNoType
_Transformable
_get_postgres_guc_validators()
_load_postgres_gucs_validators()
_read_postgres_gucs_validators_file()
_transform_parameter_value()
transform_postgresql_parameter_value()
transform_recovery_parameter_value()
Module contents¶
- class patroni.postgresql.Postgresql(config: Dict[str, Any])¶
Bases:
object
- POSTMASTER_START_TIME = 'pg_catalog.pg_postmaster_start_time()'¶
- TL_LSN = "CASE WHEN pg_catalog.pg_is_in_recovery() THEN 0 ELSE ('x' || pg_catalog.substr(pg_catalog.pg_{0}file_name(pg_catalog.pg_current_{0}_{1}()), 1, 8))::bit(32)::int END, CASE WHEN pg_catalog.pg_is_in_recovery() THEN 0 ELSE pg_catalog.pg_{0}_{1}_diff(pg_catalog.pg_current_{0}{2}_{1}(), '0/0')::bigint END, pg_catalog.pg_{0}_{1}_diff(pg_catalog.pg_last_{0}_replay_{1}(), '0/0')::bigint, pg_catalog.pg_{0}_{1}_diff(COALESCE(pg_catalog.pg_last_{0}_receive_{1}(), '0/0'), '0/0')::bigint, pg_catalog.pg_is_in_recovery() AND pg_catalog.pg_is_{0}_replay_paused()"¶
- _do_stop(mode: str, block_callbacks: bool, checkpoint: bool, on_safepoint: Optional[Callable[[...], Any]], on_shutdown: Optional[Callable[[...], Any]], before_shutdown: Optional[Callable[[...], Any]], stop_timeout: Optional[int]) Tuple[bool, bool] ¶
- _get_gucs() CaseInsensitiveSet ¶
Get all available GUCs based on
postgres --describe-config
output.
- Returns:
all available GUCs in the local Postgres server.
- _pre_promote() bool ¶
Runs a fencing script after the leader lock is acquired but before the replica is promoted. If the script exits with a non-zero code, promotion does not happen and the leader key is removed from DCS.
- _query(sql: str, *params: Any) List[Tuple[Any, ...]] ¶
Execute sql query with params and optionally return results.
- Parameters:
sql – SQL statement to execute.
params – parameters to pass.
- Returns:
a query response as a list of tuples if there is any.
- Raises:
Error
: if there were issues while executing sql.
PostgresConnectionException
: if there were issues while connecting to the database.
RetryFailedError
: if it was detected that connection/query failed due to PostgreSQL restart.
- _wait_for_connection_close(postmaster: PostmasterProcess) None ¶
- static _wal_position(is_primary: bool, wal_position: int, received_location: Optional[int], replayed_location: Optional[int]) int ¶
- property available_gucs: CaseInsensitiveSet¶
GUCs available in this Postgres server.
- call_nowait(cb_type: CallbackAction) None ¶
pick a callback command and call it without waiting for it to finish
- can_create_replica_without_replication_connection(replica_methods: Optional[List[str]]) bool ¶
Go through the replication methods to see if there are any that do not require a working replication connection.
- check_for_startup() bool ¶
Checks PostgreSQL status and returns if PostgreSQL is in the middle of startup.
- check_startup_state_changed() bool ¶
Checks if PostgreSQL has completed starting up, has failed, or is still starting.
Should only be called when state == ‘starting’
- Returns:
True if state was changed from ‘starting’
- checkpoint(connect_kwargs: Optional[Dict[str, Any]] = None, timeout: Optional[float] = None) Optional[str] ¶
- property cluster_info_query: str¶
Returns the monitoring query with a fixed number of fields.
The query text is constructed based on current state in DCS and PostgreSQL version:
function names depend on version. wal/lsn for v10+ and xlog/location for pre v10.
for primary we query timeline_id (extracted from pg_walfile_name()) and pg_current_wal_lsn()
for replicas we query pg_last_wal_receive_lsn(), pg_last_wal_replay_lsn(), and pg_is_wal_replay_paused()
for v9.6+ we query primary_slot_name and primary_conninfo from pg_stat_get_wal_receiver()
for v11+ with permanent logical slots we query from pg_replication_slots and aggregate the result
for standby_leader node running v9.6+ we also query pg_control_checkpoint to fetch timeline_id
if sync replication is enabled we query pg_stat_replication and aggregate the result. In addition to that we get current values of synchronous_commit and synchronous_standby_names GUCs.
If some conditions are not satisfied we simply put static values instead. E.g., NULL, 0, ‘’, and so on.
- controldata() Dict[str, str] ¶
return the contents of pg_controldata, or non-True value if pg_controldata call failed
- ensure_major_version_is_known() bool ¶
Calls configure_server_parameters() if _major_version is not known
- Returns:
True if _major_version is set, otherwise False
- follow(member: Optional[Union[Leader, Member]], role: str = 'replica', timeout: Optional[float] = None, do_reload: bool = False) Optional[bool] ¶
Reconfigure postgres to follow a new member or use different recovery parameters.
Method may call on_role_change callback if role is changing.
- Parameters:
member – The member to follow
role – The desired role, normally ‘replica’, but could also be a ‘standby_leader’
timeout – start timeout, how long should the start() method wait for postgres accepting connections
do_reload – indicates that after updating postgresql.conf we just need to do a reload instead of restart
- Returns:
True - if restart/reload were successfully performed, False - if restart/reload failed, None - if nothing was done or if Postgres is still in starting state after timeout seconds.
- get_major_version() int ¶
Reads major version from PG_VERSION file
- Returns:
major PostgreSQL version in integer format or 0 in case of missing file or errors
- get_replication_connection_cursor(host: Optional[str] = None, port: Union[int, str] = 5432, **kwargs: Any) Iterator[Union[cursor, Cursor[Any]]] ¶
- property global_config: Optional[GlobalConfig]¶
- initdb(*args: str, **kwargs: Any) bool ¶
Builds and executes the initdb command.
- Parameters:
args – List of arguments to be joined into the initdb command.
kwargs – Keyword arguments to pass to
subprocess.call
.
- Returns:
True if the result of subprocess.call, the exit code, is 0.
- is_running() Optional[PostmasterProcess] ¶
Returns PostmasterProcess if one is running on the data directory or None. If most recently seen process is running updates the cached process based on pid file.
- latest_checkpoint_location() Optional[int] ¶
Returns checkpoint location for the cleanly shut down primary. But, if we know that the checkpoint was written to the new WAL due to the archive_mode=on, we will return the LSN of prev wal record (SWITCH).
- parse_wal_record(timeline: str, lsn: str) Union[Tuple[str, str, str, str], Tuple[None, None, None, None]] ¶
- pg_ctl(cmd: str, *args: str, **kwargs: Any) bool ¶
Builds and executes pg_ctl command
- Returns:
True when return_code == 0, otherwise False
- pg_isready() str ¶
Runs pg_isready to see if PostgreSQL is accepting connections.
- Returns:
‘ok’ if PostgreSQL is up, ‘reject’ if starting up, ‘no_response’ if not up.
- pg_stat_replication() List[Dict[str, Any]] ¶
- Returns:
a result set of ‘SELECT * FROM pg_stat_replication’.
- pg_tblspc_realpaths() Dict[str, str] ¶
Returns a dict containing the symlink (key) and target (values) for the tablespaces
- pg_wal_realpath() Dict[str, str] ¶
Returns a dict containing the symlink (key) and target (value) for the wal directory
- pgcommand(cmd: str) str ¶
Return path to the specified PostgreSQL command.
Note
If
postgresql.bin_name.*cmd*
was configured by the user then that binary name is used, otherwise the default binary name cmd is used.
- Parameters:
cmd – the Postgres binary name to get path to.
- Returns:
path to Postgres binary named cmd.
- promote(wait_seconds: int, task: CriticalTask, before_promote: Optional[Callable[[...], Any]] = None) Optional[bool] ¶
- query(sql: str, *params: Any, retry: bool = True) List[Tuple[Any, ...]] ¶
Execute sql query with params and optionally return results.
- Parameters:
sql – SQL statement to execute.
params – parameters to pass.
retry – whether the query should be retried upon failure or given up immediately.
- Returns:
a query response as a list of tuples if there is any.
- Raises:
Error
: if there were issues while executing sql.
PostgresConnectionException
: if there were issues while connecting to the database.
RetryFailedError
: if it was detected that connection/query failed due to PostgreSQL restart or if retry deadline was exceeded.
- replication_state() Optional[str] ¶
Checks replication state from pg_stat_get_wal_receiver().
Note
Available only since 9.6
- Returns:
streaming, in archive recovery, or None
- replication_state_from_parameters(is_primary: bool, receiver_state: Optional[str], restore_command: Optional[str]) Optional[str] ¶
Figure out the replication state from input parameters.
Note
This method can only be called when Postgres is up and running and queries are successfully executed.
- Is_primary:
True if postgres is not running in recovery
- Receiver_state:
value from pg_stat_get_wal_receiver.state or None if Postgres is older than 9.6
- Restore_command:
value of
restore_command
GUC for PostgreSQL 12+ or postgresql.recovery_conf.restore_command if it is set in Patroni configuration
- Returns:
None for the primary and for Postgres older than 9.6;
‘streaming’ if replica is streaming according to the pg_stat_wal_receiver view;
‘in archive recovery’ if replica isn’t streaming and there is a restore_command
- reset_cluster_info_state(cluster: Optional[Cluster], nofailover: bool = False, global_config: Optional[GlobalConfig] = None) None ¶
Reset monitoring query cache.
It happens in the beginning of heart-beat loop and on change of synchronous_standby_names.
- Parameters:
cluster – currently known cluster state from DCS
nofailover – whether this node could become a new primary. Important when there are logical permanent replication slots because “nofailover” node could do cascading replication and should enable hot_standby_feedback
global_config – last known
GlobalConfig
object
- restart(timeout: Optional[float] = None, task: Optional[CriticalTask] = None, block_callbacks: bool = False, role: Optional[str] = None, before_shutdown: Optional[Callable[[...], Any]] = None, after_start: Optional[Callable[[...], Any]] = None) Optional[bool] ¶
Restarts PostgreSQL.
When timeout parameter is set the call will block either until PostgreSQL has started, failed to start or timeout arrives.
- Returns:
True when restart was successful and timeout did not expire when waiting.
- schedule_sanity_checks_after_pause() None ¶
After coming out of pause we have to:
1. configure server parameters if necessary
2. sync replication slots, because it might happen that slots were removed
3. get new ‘Database system identifier’ to make sure that it wasn’t changed
- start(timeout: Optional[float] = None, task: Optional[CriticalTask] = None, block_callbacks: bool = False, role: Optional[str] = None, after_start: Optional[Callable[[...], Any]] = None) Optional[bool] ¶
Start PostgreSQL
Waits for postmaster to open ports or terminate so pg_isready can be used to check startup completion or failure.
- Returns:
True if start was initiated and postmaster ports are open, False if start failed, and None if postgres is still starting up
- stop(mode: str = 'fast', block_callbacks: bool = False, checkpoint: Optional[bool] = None, on_safepoint: Optional[Callable[[...], Any]] = None, on_shutdown: Optional[Callable[[int], Any]] = None, before_shutdown: Optional[Callable[[...], Any]] = None, stop_timeout: Optional[int] = None) bool ¶
Stop PostgreSQL
Supports a callback when a safepoint is reached. A safepoint is when no user backend can return a successful commit to users. Currently this means we wait for user backends to close. But in the future alternate mechanisms could be added.
- Parameters:
on_safepoint – This callback is called when no user backends are running.
on_shutdown – is called when pg_controldata starts reporting Database cluster state: shut down
before_shutdown – is called after running optional CHECKPOINT and before running pg_ctl stop
- property supports_multiple_sync: bool¶
- Returns:
True if Postgres version supports more than one synchronous node.
- terminate_postmaster(postmaster: PostmasterProcess, mode: str, stop_timeout: Optional[int]) Optional[bool] ¶
- terminate_starting_postmaster(postmaster: PostmasterProcess) None ¶
Terminates a postmaster that has not yet opened ports or possibly even written a pid file. Blocks until the process goes away.
- wait_for_port_open(postmaster: PostmasterProcess, timeout: float) bool ¶
Waits until PostgreSQL opens ports.
- wait_for_startup(timeout: float = 0) Optional[bool] ¶
Waits for PostgreSQL startup to complete or fail.
- Returns:
True if start was successful, False otherwise
- patroni.postgresql.null_context()¶