AWS buludundan PostgreSQL jurnalının yüklənməsi

Və ya bir az tətbiq olunan tetrisologiya.
Yeni hər şey yaxşı unudulmuş köhnədir.
Epiqraflar.
AWS buludundan PostgreSQL jurnalının yüklənməsi

Problem problemi

Siz vaxtaşırı olaraq cari PostgreSQL log faylını AWS buludundan yerli Linux hostunuza endirməlisiniz. Real vaxtda deyil, deyək ki, bir az gecikmə ilə.
Günlük faylı yeniləmənin endirilmə müddəti 5 dəqiqədir.
AWS-də jurnal faylı hər saat fırlanır.

İstifadə olunmuş alətlər

Giriş faylını hosta yükləmək üçün AWS API-ni çağıran bir bash skripti istifadə olunur "aws rds download-db-log-fayl hissəsi.

Variantları:

  • —db-instance-identifier: AWS instansiya adı;
  • --log-file-name: hazırda yaradılan log faylının adı
  • --max-item: Komanda çıxışında qaytarılan elementlərin ümumi sayı.Yüklənmiş faylın hissə ölçüsü.
  • --starting-token: Başlanğıc nişanəsi

Bu vəziyyətdə, iş zamanı logların yüklənməsi vəzifəsi yarandı PostgreSQL sorğu performansının monitorinqi.

Və bu sadədir - iş saatlarında təlim və müxtəliflik üçün maraqlı bir tapşırıq.
Güman edəcəm ki, problem gündəlik həyatla əlaqədar artıq həll olunub. Ancaq sürətli bir Google heç bir həll təklif etmədi və daha dərindən axtarış etmək istəyim yox idi. Hər halda, bu, yaxşı bir məşqdir.

Tapşırıqın rəsmiləşdirilməsi

Son jurnal faylı dəyişən uzunluqlu bir çox sətirdən ibarətdir. Qrafik olaraq, log faylı belə bir şəkildə təmsil oluna bilər:
AWS buludundan PostgreSQL jurnalının yüklənməsi

Artıq sizə nəyisə xatırladırmı? Tetrisin bununla nə əlaqəsi var? Və burada onunla nə əlaqəsi var.
Növbəti faylı qrafik olaraq yükləyərkən yarana biləcək mümkün variantları təsəvvür etsək (sadəlik üçün bu halda xətlərin eyni uzunluqda olmasına icazə verin) standart Tetris parçaları:

1) Fayl tam şəkildə endirilib və yekundur. Bölmə ölçüsü son fayl ölçüsündən böyükdür:
AWS buludundan PostgreSQL jurnalının yüklənməsi

2) Fayl davam edir. Parçanın ölçüsü son fayl ölçüsündən kiçikdir:
AWS buludundan PostgreSQL jurnalının yüklənməsi

3) Fayl əvvəlki faylın davamıdır və davamı var. Parçanın ölçüsü son faylın qalan hissəsinin ölçüsündən kiçikdir:
AWS buludundan PostgreSQL jurnalının yüklənməsi

4) Fayl əvvəlki faylın davamıdır və sonuncudur. Parçanın ölçüsü son faylın qalan hissəsinin ölçüsündən böyükdür:
AWS buludundan PostgreSQL jurnalının yüklənməsi

Tapşırıq düzbucaqlı yığmaq və ya yeni səviyyədə Tetris oynamaqdır.
AWS buludundan PostgreSQL jurnalının yüklənməsi

Problemi həll edərkən yaranan problemlər

1) 2 ədəd bir ip yapışdırın

AWS buludundan PostgreSQL jurnalının yüklənməsi
Ümumiyyətlə, heç bir xüsusi problem yox idi. İlkin proqramlaşdırma kursundan standart problem.

Optimal xidmət ölçüsü

Amma bu bir az daha maraqlıdır.
Təəssüf ki, başlanğıc hissəsi etiketindən sonra ofsetdən istifadə etmək üçün heç bir yol yoxdur:

Artıq bildiyiniz kimi seçim — starting-token səhifələşdirməyə haradan başlamaq lazım olduğunu müəyyən etmək üçün istifadə olunur. Bu seçim String dəyərlərini qəbul edir, bu o deməkdir ki, Next Token sətirinin önünə ofset dəyəri əlavə etməyə çalışsanız, seçim ofset kimi nəzərə alınmayacaq.

Ona görə də onu hissə-hissə oxumaq lazımdır.
Böyük hissələrdə oxusanız, oxunuşların sayı minimal olacaq, lakin həcmi maksimum olacaq.
Kiçik hissələrdə oxusanız, əksinə, oxunuşların sayı maksimum olacaq, lakin həcmi minimal olacaqdır.
Buna görə də, trafiki azaltmaq və həllin ümumi gözəlliyi üçün, təəssüf ki, bir az qoltuqağası kimi görünən bir həll tapmalı oldum.

Təsvir üçün, log faylının 2 çox sadələşdirilmiş versiyada yüklənməsi prosesini nəzərdən keçirək. Hər iki halda oxunuşların sayı hissənin ölçüsündən asılıdır.

1) Kiçik hissələrdə yükləyin:
AWS buludundan PostgreSQL jurnalının yüklənməsi

2) Böyük hissələrdə yükləyin:
AWS buludundan PostgreSQL jurnalının yüklənməsi

Həmişə olduğu kimi, optimal həll ortadadır.
Xidmət ölçüsü minimaldır, lakin oxuma prosesi zamanı oxunma sayını azaltmaq üçün ölçüsü artırıla bilər.

Qeyd etmək lazımdır oxuna bilən hissənin optimal ölçüsünün seçilməsi probleminin hələ də həll olunmadığını və daha dərindən öyrənilməsi və təhlilini tələb etdiyini. Bəlkə bir az sonra.

İcranın ümumi təsviri

İstifadə olunan xidmət masaları

CREATE TABLE endpoint
(
id SERIAL ,
host text 
);

TABLE database
(
id SERIAL , 
…
last_aws_log_time text ,
last_aws_nexttoken text ,
aws_max_item_size integer 
);
last_aws_log_time — временная метка последнего загруженного лог-файла в формате YYYY-MM-DD-HH24.
last_aws_nexttoken — текстовая метка последней загруженной порции.
aws_max_item_size- эмпирическим путем, подобранный начальный размер порции.

Tam skript mətni

download_aws_piece.sh

#!/bin/bash
#########################################################
# download_aws_piece.sh
# downloan piece of log from AWS
# version HABR
 let min_item_size=1024
 let max_item_size=1048576
 let growth_factor=3
 let growth_counter=1
 let growth_counter_max=3

 echo $(date +%Y%m%d%H%M)':    download_aws_piece.sh:''STARTED'
 
 AWS_LOG_TIME=$1
 echo $(date +%Y%m%d%H%M)':    download_aws_piece.sh:AWS_LOG_TIME='$AWS_LOG_TIME
  
 database_id=$2
 echo $(date +%Y%m%d%H%M)':    download_aws_piece.sh:database_id='$database_id
 RESULT_FILE=$3 
  
 endpoint=`psql -h MONITOR_ENDPOINT.rds.amazonaws.com -U USER -d MONITOR_DATABASE_DATABASE -A -t -c "select e.host from endpoint e join database d on e.id = d.endpoint_id where d.id = $database_id "`
 echo $(date +%Y%m%d%H%M)':    download_aws_piece.sh:endpoint='$endpoint
  
 db_instance=`echo $endpoint | awk -F"." '{print toupper($1)}'`
 
 echo $(date +%Y%m%d%H%M)':    download_aws_piece.sh:db_instance='$db_instance

 LOG_FILE=$RESULT_FILE'.tmp_log'
 TMP_FILE=$LOG_FILE'.tmp'
 TMP_MIDDLE=$LOG_FILE'.tmp_mid'  
 TMP_MIDDLE2=$LOG_FILE'.tmp_mid2'  
  
 current_aws_log_time=`psql -h MONITOR_ENDPOINT.rds.amazonaws.com -U USER -d MONITOR_DATABASE -A -t -c "select last_aws_log_time from database where id = $database_id "`

 echo $(date +%Y%m%d%H%M)':      download_aws_piece.sh:current_aws_log_time='$current_aws_log_time
  
  if [[ $current_aws_log_time != $AWS_LOG_TIME  ]];
  then
    is_new_log='1'
	if ! psql -h MONITOR_ENDPOINT.rds.amazonaws.com -U USER -d MONITOR_DATABASE -v ON_ERROR_STOP=1 -A -t -q -c "update database set last_aws_log_time = '$AWS_LOG_TIME' where id = $database_id "
	then
	  echo $(date +%Y%m%d%H%M)':    download_aws_piece.sh: FATAL_ERROR - update database set last_aws_log_time .'
	  exit 1
	fi
  else
    is_new_log='0'
  fi
  
  echo $(date +%Y%m%d%H%M)':    download_aws_piece.sh:is_new_log='$is_new_log
  
  let last_aws_max_item_size=`psql -h MONITOR_ENDPOINT.rds.amazonaws.com -U USER -d MONITOR_DATABASE -A -t -c "select aws_max_item_size from database where id = $database_id "`
  echo $(date +%Y%m%d%H%M)':    download_aws_piece.sh: last_aws_max_item_size='$last_aws_max_item_size
  
  let count=1
  if [[ $is_new_log == '1' ]];
  then    
	echo $(date +%Y%m%d%H%M)':    download_aws_piece.sh: START DOWNLOADING OF NEW AWS LOG'
	if ! aws rds download-db-log-file-portion 
		--max-items $last_aws_max_item_size 
		--region REGION 
		--db-instance-identifier  $db_instance 
		--log-file-name error/postgresql.log.$AWS_LOG_TIME > $LOG_FILE
	then
		echo $(date +%Y%m%d%H%M)':    download_aws_piece.sh: FATAL_ERROR - Could not get log from AWS .'
		exit 2
	fi  	
  else
    next_token=`psql -h MONITOR_ENDPOINT.rds.amazonaws.com -U USER -d MONITOR_DATABASE -v ON_ERROR_STOP=1 -A -t -c "select last_aws_nexttoken from database where id = $database_id "`
	
	if [[ $next_token == '' ]];
	then
	  next_token='0'	  
	fi
	
	echo $(date +%Y%m%d%H%M)':    download_aws_piece.sh: CONTINUE DOWNLOADING OF AWS LOG'
	if ! aws rds download-db-log-file-portion 
	    --max-items $last_aws_max_item_size 
		--starting-token $next_token 
		--region REGION 
		--db-instance-identifier  $db_instance 
		--log-file-name error/postgresql.log.$AWS_LOG_TIME > $LOG_FILE
	then
		echo $(date +%Y%m%d%H%M)':    download_aws_piece.sh: FATAL_ERROR - Could not get log from AWS .'
		exit 3
	fi       
	
	line_count=`cat  $LOG_FILE | wc -l`
	let lines=$line_count-1
	  
	tail -$lines $LOG_FILE > $TMP_MIDDLE 
	mv -f $TMP_MIDDLE $LOG_FILE
  fi
  
  next_token_str=`cat $LOG_FILE | grep NEXTTOKEN` 
  next_token=`echo $next_token_str | awk -F" " '{ print $2}' `
  
  grep -v NEXTTOKEN $LOG_FILE  > $TMP_FILE 
  
  if [[ $next_token == '' ]];
  then
	  cp $TMP_FILE $RESULT_FILE
	  
	  echo $(date +%Y%m%d%H%M)':    download_aws_piece.sh:  NEXTTOKEN NOT FOUND - FINISH '
	  rm $LOG_FILE 
	  rm $TMP_FILE
	  rm $TMP_MIDDLE
          rm $TMP_MIDDLE2	  
	  exit 0  
  else
	psql -h MONITOR_ENDPOINT.rds.amazonaws.com -U USER -d MONITOR_DATABASE -v ON_ERROR_STOP=1 -A -t -q -c "update database set last_aws_nexttoken = '$next_token' where id = $database_id "
  fi
  
  first_str=`tail -1 $TMP_FILE`
  
  line_count=`cat  $TMP_FILE | wc -l`
  let lines=$line_count-1    
  
  head -$lines $TMP_FILE  > $RESULT_FILE

###############################################
# MAIN CIRCLE
  let count=2
  while [[ $next_token != '' ]];
  do 
    echo $(date +%Y%m%d%H%M)':    download_aws_piece.sh: count='$count
	
	echo $(date +%Y%m%d%H%M)':    download_aws_piece.sh: START DOWNLOADING OF AWS LOG'
	if ! aws rds download-db-log-file-portion 
             --max-items $last_aws_max_item_size 
             --starting-token $next_token 
             --region REGION 
             --db-instance-identifier  $db_instance 
             --log-file-name error/postgresql.log.$AWS_LOG_TIME > $LOG_FILE
	then
		echo $(date +%Y%m%d%H%M)':    download_aws_piece.sh: FATAL_ERROR - Could not get log from AWS .'
		exit 4
	fi

	next_token_str=`cat $LOG_FILE | grep NEXTTOKEN` 
	next_token=`echo $next_token_str | awk -F" " '{ print $2}' `

	TMP_FILE=$LOG_FILE'.tmp'
	grep -v NEXTTOKEN $LOG_FILE  > $TMP_FILE  
	
	last_str=`head -1 $TMP_FILE`
  
    if [[ $next_token == '' ]];
	then
	  concat_str=$first_str$last_str
	  	  
	  echo $concat_str >> $RESULT_FILE
		 
	  line_count=`cat  $TMP_FILE | wc -l`
	  let lines=$line_count-1
	  
	  tail -$lines $TMP_FILE >> $RESULT_FILE
	  
	  echo $(date +%Y%m%d%H%M)':    download_aws_piece.sh:  NEXTTOKEN NOT FOUND - FINISH '
	  rm $LOG_FILE 
	  rm $TMP_FILE
	  rm $TMP_MIDDLE
          rm $TMP_MIDDLE2	  
	  exit 0  
	fi
	
    if [[ $next_token != '' ]];
	then
		let growth_counter=$growth_counter+1
		if [[ $growth_counter -gt $growth_counter_max ]];
		then
			let last_aws_max_item_size=$last_aws_max_item_size*$growth_factor
			let growth_counter=1
		fi
	
		if [[ $last_aws_max_item_size -gt $max_item_size ]]; 
		then
			let last_aws_max_item_size=$max_item_size
		fi 

	  psql -h MONITOR_ENDPOINT.rds.amazonaws.com -U USER -d MONITOR_DATABASE -A -t -q -c "update database set last_aws_nexttoken = '$next_token' where id = $database_id "
	  
	  concat_str=$first_str$last_str
	  	  
	  echo $concat_str >> $RESULT_FILE
		 
	  line_count=`cat  $TMP_FILE | wc -l`
	  let lines=$line_count-1
	  
	  #############################
	  #Get middle of file
	  head -$lines $TMP_FILE > $TMP_MIDDLE
	  
	  line_count=`cat  $TMP_MIDDLE | wc -l`
	  let lines=$line_count-1
	  tail -$lines $TMP_MIDDLE > $TMP_MIDDLE2
	  
	  cat $TMP_MIDDLE2 >> $RESULT_FILE	  
	  
	  first_str=`tail -1 $TMP_FILE`	  
	fi
	  
    let count=$count+1

  done
#
#################################################################

exit 0  

Bəzi izahatlarla skript fraqmentləri:

Skript daxiletmə parametrləri:

  • YYYY-AA-GG-HH24 formatında log fayl adının vaxt möhürü: AWS_LOG_TIME=$1
  • Database ID: database_id=$2
  • Toplanmış jurnal faylının adı: RESULT_FILE=$3

Son yüklənmiş log faylının vaxt möhürünü əldə edin:

current_aws_log_time=`psql -h MONITOR_ENDPOINT.rds.amazonaws.com -U USER -d MONITOR_DATABASE -A -t -c "select last_aws_log_time from database where id = $database_id "`

Son yüklənmiş jurnal faylının vaxt möhürü daxiletmə parametrinə uyğun gəlmirsə, yeni jurnal faylı yüklənir:

if [[ $current_aws_log_time != $AWS_LOG_TIME  ]];
  then
    is_new_log='1'
	if ! psql -h ENDPOINT.rds.amazonaws.com -U USER -d MONITOR_DATABASE -v ON_ERROR_STOP=1 -A -t -c "update database set last_aws_log_time = '$AWS_LOG_TIME' where id = $database_id "
	then
	  echo '***download_aws_piece.sh -FATAL_ERROR - update database set last_aws_log_time .'
	  exit 1
	fi
  else
    is_new_log='0'
  fi

Yüklənmiş fayldan nexttoken etiketinin dəyərini alırıq:

  next_token_str=`cat $LOG_FILE | grep NEXTTOKEN` 
  next_token=`echo $next_token_str | awk -F" " '{ print $2}' `

Boş növbəti işarə dəyəri yükləmənin bitməsinin əlaməti kimi xidmət edir.

Döngüdə biz faylın hissələrini sayırıq, yol boyu xətləri birləşdirərək hissənin ölçüsünü artırırıq:
Əsas Döngü

# MAIN CIRCLE
  let count=2
  while [[ $next_token != '' ]];
  do 
    echo $(date +%Y%m%d%H%M)':    download_aws_piece.sh: count='$count
	
	echo $(date +%Y%m%d%H%M)':    download_aws_piece.sh: START DOWNLOADING OF AWS LOG'
	if ! aws rds download-db-log-file-portion 
     --max-items $last_aws_max_item_size 
	 --starting-token $next_token 
     --region REGION 
     --db-instance-identifier  $db_instance 
     --log-file-name error/postgresql.log.$AWS_LOG_TIME > $LOG_FILE
	then
		echo $(date +%Y%m%d%H%M)':    download_aws_piece.sh: FATAL_ERROR - Could not get log from AWS .'
		exit 4
	fi

	next_token_str=`cat $LOG_FILE | grep NEXTTOKEN` 
	next_token=`echo $next_token_str | awk -F" " '{ print $2}' `

	TMP_FILE=$LOG_FILE'.tmp'
	grep -v NEXTTOKEN $LOG_FILE  > $TMP_FILE  
	
	last_str=`head -1 $TMP_FILE`
  
    if [[ $next_token == '' ]];
	then
	  concat_str=$first_str$last_str
	  	  
	  echo $concat_str >> $RESULT_FILE
		 
	  line_count=`cat  $TMP_FILE | wc -l`
	  let lines=$line_count-1
	  
	  tail -$lines $TMP_FILE >> $RESULT_FILE
	  
	  echo $(date +%Y%m%d%H%M)':    download_aws_piece.sh:  NEXTTOKEN NOT FOUND - FINISH '
	  rm $LOG_FILE 
	  rm $TMP_FILE
	  rm $TMP_MIDDLE
         rm $TMP_MIDDLE2	  
	  exit 0  
	fi
	
    if [[ $next_token != '' ]];
	then
		let growth_counter=$growth_counter+1
		if [[ $growth_counter -gt $growth_counter_max ]];
		then
			let last_aws_max_item_size=$last_aws_max_item_size*$growth_factor
			let growth_counter=1
		fi
	
		if [[ $last_aws_max_item_size -gt $max_item_size ]]; 
		then
			let last_aws_max_item_size=$max_item_size
		fi 

	  psql -h MONITOR_ENDPOINT.rds.amazonaws.com -U USER -d MONITOR_DATABASE -A -t -q -c "update database set last_aws_nexttoken = '$next_token' where id = $database_id "
	  
	  concat_str=$first_str$last_str
	  	  
	  echo $concat_str >> $RESULT_FILE
		 
	  line_count=`cat  $TMP_FILE | wc -l`
	  let lines=$line_count-1
	  
	  #############################
	  #Get middle of file
	  head -$lines $TMP_FILE > $TMP_MIDDLE
	  
	  line_count=`cat  $TMP_MIDDLE | wc -l`
	  let lines=$line_count-1
	  tail -$lines $TMP_MIDDLE > $TMP_MIDDLE2
	  
	  cat $TMP_MIDDLE2 >> $RESULT_FILE	  
	  
	  first_str=`tail -1 $TMP_FILE`	  
	fi
	  
    let count=$count+1

  done

Sonra nə var?

Beləliklə, ilk aralıq vəzifə - "buluddan log faylını yükləmək" həll edildi. Yüklənmiş jurnalla nə etməli?
Əvvəlcə log faylını təhlil etməli və ondan faktiki sorğuları çıxarmalısınız.
Tapşırıq çox çətin deyil. Ən sadə bash skripti işi olduqca yaxşı yerinə yetirir.
upload_log_query.sh

#!/bin/bash
#########################################################
# upload_log_query.sh
# Upload table table from dowloaded aws file 
# version HABR
###########################################################  
echo 'TIMESTAMP:'$(date +%c)' Upload log_query table '
source_file=$1
echo 'source_file='$source_file
database_id=$2
echo 'database_id='$database_id

beginer=' '
first_line='1'
let "line_count=0"
sql_line=' '
sql_flag=' '    
space=' '
cat $source_file | while read line
do
  line="$space$line"

  if [[ $first_line == "1" ]]; then
    beginer=`echo $line | awk -F" " '{ print $1}' `
    first_line='0'
  fi

  current_beginer=`echo $line | awk -F" " '{ print $1}' `

  if [[ $current_beginer == $beginer ]]; then
    if [[ $sql_flag == '1' ]]; then
     sql_flag='0' 
     log_date=`echo $sql_line | awk -F" " '{ print $1}' `
     log_time=`echo $sql_line | awk -F" " '{ print $2}' `
     duration=`echo $sql_line | awk -F" " '{ print $5}' `

     #replace ' to ''
     sql_modline=`echo "$sql_line" | sed 's/'''/''''''/g'`
     sql_line=' '

	 ################
	 #PROCESSING OF THE SQL-SELECT IS HERE
     if ! psql -h ENDPOINT.rds.amazonaws.com -U USER -d DATABASE -v ON_ERROR_STOP=1 -A -t -c "select log_query('$ip_port',$database_id , '$log_date' , '$log_time' , '$duration' , '$sql_modline' )" 
     then
        echo 'FATAL_ERROR - log_query '
        exit 1
     fi
	 ################

    fi #if [[ $sql_flag == '1' ]]; then

    let "line_count=line_count+1"

    check=`echo $line | awk -F" " '{ print $8}' `
    check_sql=${check^^}    

    #echo 'check_sql='$check_sql
    
    if [[ $check_sql == 'SELECT' ]]; then
     sql_flag='1'    
     sql_line="$sql_line$line"
	 ip_port=`echo $sql_line | awk -F":" '{ print $4}' `
    fi
  else       

    if [[ $sql_flag == '1' ]]; then
      sql_line="$sql_line$line"
    fi   
    
  fi #if [[ $current_beginer == $beginer ]]; then

done

İndi log faylından seçilmiş sorğu ilə işləyə bilərsiniz.

Və bir sıra faydalı imkanlar açılır.

Təhlil edilmiş sorğular bir yerdə saxlanmalıdır. Bunun üçün bir xidmət masası istifadə olunur log_sorğu

CREATE TABLE log_query
(
   id SERIAL ,
   queryid bigint ,
   query_md5hash text not null ,
   database_id integer not null ,  
   timepoint timestamp without time zone not null,
   duration double precision not null ,
   query text not null ,
   explained_plan text[],
   plan_md5hash text  , 
   explained_plan_wo_costs text[],
   plan_hash_value text  ,
   baseline_id integer ,
   ip text ,
   port text 
);
ALTER TABLE log_query ADD PRIMARY KEY (id);
ALTER TABLE log_query ADD CONSTRAINT queryid_timepoint_unique_key UNIQUE (queryid, timepoint );
ALTER TABLE log_query ADD CONSTRAINT query_md5hash_timepoint_unique_key UNIQUE (query_md5hash, timepoint );

CREATE INDEX log_query_timepoint_idx ON log_query (timepoint);
CREATE INDEX log_query_queryid_idx ON log_query (queryid);
ALTER TABLE log_query ADD CONSTRAINT database_id_fk FOREIGN KEY (database_id) REFERENCES database (id) ON DELETE CASCADE ;

Təhlil edilmiş sorğu işlənir plpgsql funksiyaları"log_sorğu.
log_query.sql

--log_query.sql
--verison HABR
CREATE OR REPLACE FUNCTION log_query( ip_port text ,log_database_id integer , log_date text , log_time text , duration text , sql_line text   ) RETURNS boolean AS $$
DECLARE
  result boolean ;
  log_timepoint timestamp without time zone ;
  log_duration double precision ; 
  pos integer ;
  log_query text ;
  activity_string text ;
  log_md5hash text ;
  log_explain_plan text[] ;
  
  log_planhash text ;
  log_plan_wo_costs text[] ; 
  
  database_rec record ;
  
  pg_stat_query text ; 
  test_log_query text ;
  log_query_rec record;
  found_flag boolean;
  
  pg_stat_history_rec record ;
  port_start integer ;
  port_end integer ;
  client_ip text ;
  client_port text ;
  log_queryid bigint ;
  log_query_text text ;
  pg_stat_query_text text ; 
BEGIN
  result = TRUE ;

  RAISE NOTICE '***log_query';
  
  port_start = position('(' in ip_port);
  port_end = position(')' in ip_port);
  client_ip = substring( ip_port from 1 for port_start-1 );
  client_port = substring( ip_port from port_start+1 for port_end-port_start-1 );

  SELECT e.host , d.name , d.owner_pwd 
  INTO database_rec
  FROM database d JOIN endpoint e ON e.id = d.endpoint_id
  WHERE d.id = log_database_id ;
  
  log_timepoint = to_timestamp(log_date||' '||log_time,'YYYY-MM-DD HH24-MI-SS');
  log_duration = duration:: double precision; 

  
  pos = position ('SELECT' in UPPER(sql_line) );
  log_query = substring( sql_line from pos for LENGTH(sql_line));
  log_query = regexp_replace(log_query,' +',' ','g');
  log_query = regexp_replace(log_query,';+','','g');
  log_query = trim(trailing ' ' from log_query);
 

  log_md5hash = md5( log_query::text );
  
  --Explain execution plan--
  EXECUTE 'SELECT dblink_connect(''LINK1'',''host='||database_rec.host||' dbname='||database_rec.name||' user=DATABASE password='||database_rec.owner_pwd||' '')'; 
  
  log_explain_plan = ARRAY ( SELECT * FROM dblink('LINK1', 'EXPLAIN '||log_query ) AS t (plan text) );
  log_plan_wo_costs = ARRAY ( SELECT * FROM dblink('LINK1', 'EXPLAIN ( COSTS FALSE ) '||log_query ) AS t (plan text) );
    
  PERFORM dblink_disconnect('LINK1');
  --------------------------
  BEGIN
	INSERT INTO log_query
	(
		query_md5hash ,
		database_id , 
		timepoint ,
		duration ,
		query ,
		explained_plan ,
		plan_md5hash , 
		explained_plan_wo_costs , 
		plan_hash_value , 
		ip , 
		port
	) 
	VALUES 
	(
		log_md5hash ,
		log_database_id , 
		log_timepoint , 
		log_duration , 
		log_query ,
		log_explain_plan , 
		md5(log_explain_plan::text) ,
		log_plan_wo_costs , 
		md5(log_plan_wo_costs::text),
		client_ip , 
		client_port		
	);
	activity_string = 	'New query has logged '||
						' database_id = '|| log_database_id ||
						' query_md5hash='||log_md5hash||
						' , timepoint = '||to_char(log_timepoint,'YYYYMMDD HH24:MI:SS');
					
	RAISE NOTICE '%',activity_string;					
					 
	PERFORM pg_log( log_database_id , 'log_query' , activity_string);  

	EXCEPTION
	  WHEN unique_violation THEN
		RAISE NOTICE '*** unique_violation *** query already has logged';
	END;

	SELECT 	queryid
	INTO   	log_queryid
	FROM 	log_query 
	WHERE 	query_md5hash = log_md5hash AND
			timepoint = log_timepoint;

	IF log_queryid IS NOT NULL 
	THEN 
	  RAISE NOTICE 'log_query with query_md5hash = % and timepoint = % has already has a QUERYID = %',log_md5hash,log_timepoint , log_queryid ;
	  RETURN result;
	END IF;
	
	------------------------------------------------
	RAISE NOTICE 'Update queryid';	
	
	SELECT * 
	INTO log_query_rec
	FROM log_query
	WHERE query_md5hash = log_md5hash AND timepoint = log_timepoint ; 
	
	log_query_rec.query=regexp_replace(log_query_rec.query,';+','','g');
	
	FOR pg_stat_history_rec IN
	 SELECT 
         queryid ,
	  query 
	 FROM 
         pg_stat_db_queries 
     WHERE  
      database_id = log_database_id AND
       queryid is not null 
	LOOP
	  pg_stat_query = pg_stat_history_rec.query ; 
	  pg_stat_query=regexp_replace(pg_stat_query,'n+',' ','g');
	  pg_stat_query=regexp_replace(pg_stat_query,'t+',' ','g');
	  pg_stat_query=regexp_replace(pg_stat_query,' +',' ','g');
	  pg_stat_query=regexp_replace(pg_stat_query,'$.','%','g');
	
	  log_query_text = trim(trailing ' ' from log_query_rec.query);
	  pg_stat_query_text = pg_stat_query; 
	
	  
	  --SELECT log_query_rec.query like pg_stat_query INTO found_flag ; 
	  IF (log_query_text LIKE pg_stat_query_text) THEN
		found_flag = TRUE ;
	  ELSE
		found_flag = FALSE ;
	  END IF;	  
	  
	  
	  IF found_flag THEN
	    
		UPDATE log_query SET queryid = pg_stat_history_rec.queryid WHERE query_md5hash = log_md5hash AND timepoint = log_timepoint ;
		activity_string = 	' updated queryid = '||pg_stat_history_rec.queryid||
		                    ' for log_query with id = '||log_query_rec.id               
		   				    ;						
	    RAISE NOTICE '%',activity_string;	
		EXIT ;
	  END IF ;
	  
	END LOOP ;
	
  RETURN result ;
END
$$ LANGUAGE plpgsql;

Emal zamanı xidmət masasından istifadə olunur pg_stat_db_queries, cədvəldən cari sorğuların şəklini ehtiva edir pg_stat_history (Cədvəlin istifadəsi burada təsvir edilmişdir - PostgreSQL sorğularının performansının monitorinqi. 1-ci hissə - hesabat)

TABLE pg_stat_db_queries
(
   database_id integer,  
   queryid bigint ,  
   query text , 
   max_time double precision 
);

TABLE pg_stat_history 
(
…
database_id integer ,
…
queryid bigint ,
…
max_time double precision	 , 	
…
);

Funksiya log faylından sorğuların işlənməsi üçün bir sıra faydalı imkanları həyata keçirməyə imkan verir. Məhz:

Fürsət №1 - Sorğunun icra tarixi

Performans insidentini həll etməyə başlamaq üçün çox faydalıdır. Əvvəlcə tarixlə tanış olun - yavaşlama nə vaxt başladı?
Sonra klassiklərə görə, xarici səbəbləri axtarın. Ola bilsin ki, verilənlər bazası yükü sadəcə olaraq kəskin artıb və xüsusi sorğunun bununla heç bir əlaqəsi yoxdur.
log_query cədvəlinə yeni giriş əlavə edin

  port_start = position('(' in ip_port);
  port_end = position(')' in ip_port);
  client_ip = substring( ip_port from 1 for port_start-1 );
  client_port = substring( ip_port from port_start+1 for port_end-port_start-1 );

  SELECT e.host , d.name , d.owner_pwd 
  INTO database_rec
  FROM database d JOIN endpoint e ON e.id = d.endpoint_id
  WHERE d.id = log_database_id ;
  
  log_timepoint = to_timestamp(log_date||' '||log_time,'YYYY-MM-DD HH24-MI-SS');
  log_duration = to_number(duration,'99999999999999999999D9999999999'); 

  
  pos = position ('SELECT' in UPPER(sql_line) );
  log_query = substring( sql_line from pos for LENGTH(sql_line));
  log_query = regexp_replace(log_query,' +',' ','g');
  log_query = regexp_replace(log_query,';+','','g');
  log_query = trim(trailing ' ' from log_query);
 
  RAISE NOTICE 'log_query=%',log_query ;   

  log_md5hash = md5( log_query::text );
  
  --Explain execution plan--
  EXECUTE 'SELECT dblink_connect(''LINK1'',''host='||database_rec.host||' dbname='||database_rec.name||' user=DATABASE password='||database_rec.owner_pwd||' '')'; 
  
  log_explain_plan = ARRAY ( SELECT * FROM dblink('LINK1', 'EXPLAIN '||log_query ) AS t (plan text) );
  log_plan_wo_costs = ARRAY ( SELECT * FROM dblink('LINK1', 'EXPLAIN ( COSTS FALSE ) '||log_query ) AS t (plan text) );
    
  PERFORM dblink_disconnect('LINK1');
  --------------------------
  BEGIN
	INSERT INTO log_query
	(
		query_md5hash ,
		database_id , 
		timepoint ,
		duration ,
		query ,
		explained_plan ,
		plan_md5hash , 
		explained_plan_wo_costs , 
		plan_hash_value , 
		ip , 
		port
	) 
	VALUES 
	(
		log_md5hash ,
		log_database_id , 
		log_timepoint , 
		log_duration , 
		log_query ,
		log_explain_plan , 
		md5(log_explain_plan::text) ,
		log_plan_wo_costs , 
		md5(log_plan_wo_costs::text),
		client_ip , 
		client_port		
	);

İmkan №2 - Sorğunun icra planlarını yadda saxlayın

Bu zaman etiraz-aydınlaşdırma-şərh yarana bilər: “Amma artıq avtomatik izahat var" Bəli, var, amma icra planı eyni jurnal faylında saxlanılırsa və onu sonrakı təhlil üçün saxlamaq üçün log faylını təhlil etməlisinizsə, nə mənası var?

Mənə lazım olan bu idi:
birinci: icra planını monitorinq məlumat bazasının xidmət cədvəlində saxlamaq;
ikincisi: sorğunun icra planının dəyişdiyini dərhal görmək üçün icra planlarını bir-biri ilə müqayisə edə bilmək.

Xüsusi icra parametrləri olan bir sorğu var. EXPLAIN istifadə edərək onun icra planını əldə etmək və saxlamaq ibtidai işdir.
Üstəlik, EXPLAIN (COSTS FALSE) ifadəsindən istifadə edərək, planın skeletini əldə edə bilərsiniz, bu planın hash dəyərini əldə etmək üçün istifadə ediləcək və bu, icra planında dəyişikliklər tarixinin sonrakı təhlilinə kömək edəcəkdir.
İcra planı şablonunu əldə edin

  --Explain execution plan--
  EXECUTE 'SELECT dblink_connect(''LINK1'',''host='||database_rec.host||' dbname='||database_rec.name||' user=DATABASE password='||database_rec.owner_pwd||' '')'; 
  
  log_explain_plan = ARRAY ( SELECT * FROM dblink('LINK1', 'EXPLAIN '||log_query ) AS t (plan text) );
  log_plan_wo_costs = ARRAY ( SELECT * FROM dblink('LINK1', 'EXPLAIN ( COSTS FALSE ) '||log_query ) AS t (plan text) );
    
  PERFORM dblink_disconnect('LINK1');

İmkan №3 - Monitorinq üçün sorğu jurnalından istifadə

Performans göstəriciləri sorğu mətnində deyil, onun identifikatorunda konfiqurasiya edildiyi üçün siz log faylından olan sorğuları performans ölçülərinin konfiqurasiya olunduğu sorğularla əlaqələndirməlisiniz.
Yaxşı, heç olmasa bir performans hadisəsinin baş vermə vaxtını dəqiq bilmək üçün.

Bu yolla, sorğu identifikatoru üçün performans hadisəsi baş verdikdə, xüsusi parametr dəyərləri və sorğunun dəqiq icra müddəti və müddəti ilə xüsusi sorğuya keçid olacaq. Yalnız görünüşdən istifadə edərək bu məlumatı əldə edin pg_stat_statements - qadağandır.
Sorğunun sorğuidini tapın və log_query cədvəlindəki girişi yeniləyin

SELECT * 
	INTO log_query_rec
	FROM log_query
	WHERE query_md5hash = log_md5hash AND timepoint = log_timepoint ; 
	
	log_query_rec.query=regexp_replace(log_query_rec.query,';+','','g');
	
	FOR pg_stat_history_rec IN
	 SELECT 
      queryid ,
	  query 
	 FROM 
       pg_stat_db_queries 
     WHERE  
	   database_id = log_database_id AND
       queryid is not null 
	LOOP
	  pg_stat_query = pg_stat_history_rec.query ; 
	  pg_stat_query=regexp_replace(pg_stat_query,'n+',' ','g');
	  pg_stat_query=regexp_replace(pg_stat_query,'t+',' ','g');
	  pg_stat_query=regexp_replace(pg_stat_query,' +',' ','g');
	  pg_stat_query=regexp_replace(pg_stat_query,'$.','%','g');
	
	  log_query_text = trim(trailing ' ' from log_query_rec.query);
	  pg_stat_query_text = pg_stat_query; 
	  
	  --SELECT log_query_rec.query like pg_stat_query INTO found_flag ; 
	  IF (log_query_text LIKE pg_stat_query_text) THEN
		found_flag = TRUE ;
	  ELSE
		found_flag = FALSE ;
	  END IF;	  
	  
	  
	  IF found_flag THEN
	    
		UPDATE log_query SET queryid = pg_stat_history_rec.queryid WHERE query_md5hash = log_md5hash AND timepoint = log_timepoint ;
		activity_string = 	' updated queryid = '||pg_stat_history_rec.queryid||
		                    ' for log_query with id = '||log_query_rec.id		                    
		   				    ;						
					
	    RAISE NOTICE '%',activity_string;	
		EXIT ;
	  END IF ;
	  
	END LOOP ;

Sözündən sonra

Təsvir edilən texnika nəticədə tətbiq tapdı hazırlanmış PostgreSQL sorğu performansının monitorinqi sistemi, yaranan sorğu performans hadisələrini həll edərkən təhlil etmək üçün daha çox məlumat əldə etməyə imkan verir.

Baxmayaraq ki, əlbəttə ki, mənim şəxsi fikrimcə, yüklənmiş hissənin ölçüsünü seçmək və dəyişdirmək üçün alqoritm üzərində daha çox işləmək lazım olacaq. Ümumi işdə problem hələ də həllini tapmayıb. Yəqin ki, maraqlı olacaq.

Amma bu tamam başqa hekayədir...

Mənbə: www.habr.com

Добавить комментарий