AWS булутунан PostgreSQL журналын жүктөө

Же бир аз прикладдык тетрисология.
Жаңы нерсенин баары эски унутулат.
Эпиграфтар.
AWS булутунан PostgreSQL журналын жүктөө

Тапшырманын коюлушу

Сиз мезгил-мезгили менен учурдагы PostgreSQL журнал файлын AWS булутунан жергиликтүү Linux хостуңузга жүктөп турушуңуз керек. реалдуу убакытта эмес, бирок, айталы, бир аз кечигүү менен.
Журнал файлын жаңыртуу жүктөө мөөнөтү 5 мүнөт.
AWSдеги журнал файлы ар бир саат сайын айланып турат.

Колдонулган куралдар

Журнал файлын хостко жүктөп алуу үчүн, AWS API чакырган bash скрипти колдонулат "aws rds жүктөө-db-лог-файл бөлүгү«.

Options:

  • —db-instance-identifier: AWS инстанциясынын аты;
  • --log-file-name: учурда түзүлгөн журнал файлынын аты
  • --max-item: Буйрук чыгарууда кайтарылган нерселердин жалпы саны.Жүктөлгөн файлдын бөлүгүнүн өлчөмү.
  • --starting-token: Башталгыч белги

Бул учурда, журналдарды жүктөө милдети иш учурунда пайда болгон PostgreSQL сурамдарынын аткарылышын көзөмөлдөө.

Ал эми бул жөнөкөй - жумуш убактысында окутуу жана ар түрдүү үчүн кызыктуу тапшырма.
Күнүмдүк турмуштун айынан маселе чечилди деп ойлой берем. Бирок тез Google эч кандай чечимдерди сунуш кылган жок жана мен тереңирээк издөөнү каалабадым. Кандай болбосун, бул жакшы машыгуу.

Тапшырманы формалдаштыруу

Акыркы журнал файлы өзгөрүлмө узундуктагы көптөгөн саптардан турат. Графикалык түрдө, журнал файлы төмөнкүдөй көрсөтүлүшү мүмкүн:
AWS булутунан PostgreSQL журналын жүктөө

Бул сизге бир нерсени эскертип жатабы? Тетристин буга кандай тиешеси бар? Бул жерде анын эмне менен байланышы бар.
Эгерде кийинки файлды графикалык түрдө жүктөөдө пайда болуучу варианттарды элестетсек (жөнөкөйлүк үчүн, бул учурда сызыктар бирдей узундукта болсун), биз алабыз стандарттык Tetris даана:

1) Файл толугу менен жүктөлүп алынган жана акыркы болуп саналат. Бөлүмдүн өлчөмү файлдын акыркы өлчөмүнөн чоңураак:
AWS булутунан PostgreSQL журналын жүктөө

2) Файл уланууда. Бөлүмдүн көлөмү файлдын акыркы өлчөмүнөн кичине:
AWS булутунан PostgreSQL журналын жүктөө

3) Файл мурунку файлдын уландысы жана уландысы бар. Бөлүмдүн өлчөмү акыркы файлдын калган бөлүгүнүн өлчөмүнөн кичине:
AWS булутунан PostgreSQL журналын жүктөө

4) Файл мурунку файлдын уландысы жана акыркы файл болуп саналат. Бөлүмдүн өлчөмү акыркы файлдын калган көлөмүнөн чоңураак:
AWS булутунан PostgreSQL журналын жүктөө

Тапшырма тик бурчту чогултуу же Тетристи жаңы деңгээлде ойноо.
AWS булутунан PostgreSQL журналын жүктөө

Проблеманы чечүүдө пайда болгон көйгөйлөр

1) 2 бөлүктөн турган жипти жабыңыз

AWS булутунан PostgreSQL журналын жүктөө
Негизинен өзгөчө көйгөйлөр болгон жок. Баштапкы программалоо курсунун стандарттуу маселеси.

Оптималдуу кызмат өлчөмү

Бирок бул бир аз кызыктуураак.
Тилекке каршы, баштапкы бөлүгү энбелгисинен кийин офсетти колдонуунун эч кандай жолу жок:

Белгилүү болгондой, -starting-token параметри баракчаларды кайдан баштоону көрсөтүү үчүн колдонулат. Бул параметр String маанилерин алат, бул кийинки Токен сабынын алдына офсет маанисин кошууга аракет кылсаңыз, опция офсет катары каралбайт дегенди билдирет.

Ошондуктан, аны бөлүктөргө бөлүп окуу керек.
Эгерде сиз чоң бөлүктөр менен окусаңыз, анда окуулардын саны минималдуу болот, бирок көлөмү максималдуу болот.
Эгер сиз кичинекей бөлүктөр менен окусаңыз, анда тескерисинче, окуулардын саны максималдуу болот, бирок көлөмү минималдуу болот.
Ошондуктан, жол кыймылын азайтуу жана чечимдин жалпы кооздугу үчүн, мен, тилекке каршы, бир аз балдак сыяктуу көрүнгөн чечимди ойлоп табууга туура келди.

Иллюстрация үчүн, лог файлын өтө жөнөкөйлөштүрүлгөн 2 версияда жүктөө процессин карап көрөлү. Эки учурда тең окуулардын саны порциянын өлчөмүнө жараша болот.

1) Чакан бөлүктөргө жүктөө:
AWS булутунан PostgreSQL журналын жүктөө

2) чоң бөлүктөрүндө жүктөө:
AWS булутунан PostgreSQL журналын жүктөө

Адаттагыдай эле оптималдуу чечим ортодо.
Кызматтын көлөмү минималдуу, бирок окуу процессинде окуулардын санын азайтуу үчүн өлчөмүн көбөйтүүгө болот.

Белгилей кетчү нерсе окула турган бөлүктүн оптималдуу өлчөмүн тандоо маселеси али чечиле электиги жана тереңирээк изилдөөнү жана талдоону талап кылат. Балким, бир аз кийинчерээк.

Ишке ашыруунун жалпы сүрөттөлүшү

Кызмат столдору колдонулат

CREATE TABLE endpoint
(
id SERIAL ,
host text 
);

TABLE database
(
id SERIAL , 
…
last_aws_log_time text ,
last_aws_nexttoken text ,
aws_max_item_size integer 
);
last_aws_log_time — временная метка последнего загруженного лог-файла в формате YYYY-MM-DD-HH24.
last_aws_nexttoken — текстовая метка последней загруженной порции.
aws_max_item_size- эмпирическим путем, подобранный начальный размер порции.

Толук скрипт тексти

download_aws_piece.sh

#!/bin/bash
#########################################################
# download_aws_piece.sh
# downloan piece of log from AWS
# version HABR
 let min_item_size=1024
 let max_item_size=1048576
 let growth_factor=3
 let growth_counter=1
 let growth_counter_max=3

 echo $(date +%Y%m%d%H%M)':    download_aws_piece.sh:''STARTED'
 
 AWS_LOG_TIME=$1
 echo $(date +%Y%m%d%H%M)':    download_aws_piece.sh:AWS_LOG_TIME='$AWS_LOG_TIME
  
 database_id=$2
 echo $(date +%Y%m%d%H%M)':    download_aws_piece.sh:database_id='$database_id
 RESULT_FILE=$3 
  
 endpoint=`psql -h MONITOR_ENDPOINT.rds.amazonaws.com -U USER -d MONITOR_DATABASE_DATABASE -A -t -c "select e.host from endpoint e join database d on e.id = d.endpoint_id where d.id = $database_id "`
 echo $(date +%Y%m%d%H%M)':    download_aws_piece.sh:endpoint='$endpoint
  
 db_instance=`echo $endpoint | awk -F"." '{print toupper($1)}'`
 
 echo $(date +%Y%m%d%H%M)':    download_aws_piece.sh:db_instance='$db_instance

 LOG_FILE=$RESULT_FILE'.tmp_log'
 TMP_FILE=$LOG_FILE'.tmp'
 TMP_MIDDLE=$LOG_FILE'.tmp_mid'  
 TMP_MIDDLE2=$LOG_FILE'.tmp_mid2'  
  
 current_aws_log_time=`psql -h MONITOR_ENDPOINT.rds.amazonaws.com -U USER -d MONITOR_DATABASE -A -t -c "select last_aws_log_time from database where id = $database_id "`

 echo $(date +%Y%m%d%H%M)':      download_aws_piece.sh:current_aws_log_time='$current_aws_log_time
  
  if [[ $current_aws_log_time != $AWS_LOG_TIME  ]];
  then
    is_new_log='1'
	if ! psql -h MONITOR_ENDPOINT.rds.amazonaws.com -U USER -d MONITOR_DATABASE -v ON_ERROR_STOP=1 -A -t -q -c "update database set last_aws_log_time = '$AWS_LOG_TIME' where id = $database_id "
	then
	  echo $(date +%Y%m%d%H%M)':    download_aws_piece.sh: FATAL_ERROR - update database set last_aws_log_time .'
	  exit 1
	fi
  else
    is_new_log='0'
  fi
  
  echo $(date +%Y%m%d%H%M)':    download_aws_piece.sh:is_new_log='$is_new_log
  
  let last_aws_max_item_size=`psql -h MONITOR_ENDPOINT.rds.amazonaws.com -U USER -d MONITOR_DATABASE -A -t -c "select aws_max_item_size from database where id = $database_id "`
  echo $(date +%Y%m%d%H%M)':    download_aws_piece.sh: last_aws_max_item_size='$last_aws_max_item_size
  
  let count=1
  if [[ $is_new_log == '1' ]];
  then    
	echo $(date +%Y%m%d%H%M)':    download_aws_piece.sh: START DOWNLOADING OF NEW AWS LOG'
	if ! aws rds download-db-log-file-portion 
		--max-items $last_aws_max_item_size 
		--region REGION 
		--db-instance-identifier  $db_instance 
		--log-file-name error/postgresql.log.$AWS_LOG_TIME > $LOG_FILE
	then
		echo $(date +%Y%m%d%H%M)':    download_aws_piece.sh: FATAL_ERROR - Could not get log from AWS .'
		exit 2
	fi  	
  else
    next_token=`psql -h MONITOR_ENDPOINT.rds.amazonaws.com -U USER -d MONITOR_DATABASE -v ON_ERROR_STOP=1 -A -t -c "select last_aws_nexttoken from database where id = $database_id "`
	
	if [[ $next_token == '' ]];
	then
	  next_token='0'	  
	fi
	
	echo $(date +%Y%m%d%H%M)':    download_aws_piece.sh: CONTINUE DOWNLOADING OF AWS LOG'
	if ! aws rds download-db-log-file-portion 
	    --max-items $last_aws_max_item_size 
		--starting-token $next_token 
		--region REGION 
		--db-instance-identifier  $db_instance 
		--log-file-name error/postgresql.log.$AWS_LOG_TIME > $LOG_FILE
	then
		echo $(date +%Y%m%d%H%M)':    download_aws_piece.sh: FATAL_ERROR - Could not get log from AWS .'
		exit 3
	fi       
	
	line_count=`cat  $LOG_FILE | wc -l`
	let lines=$line_count-1
	  
	tail -$lines $LOG_FILE > $TMP_MIDDLE 
	mv -f $TMP_MIDDLE $LOG_FILE
  fi
  
  next_token_str=`cat $LOG_FILE | grep NEXTTOKEN` 
  next_token=`echo $next_token_str | awk -F" " '{ print $2}' `
  
  grep -v NEXTTOKEN $LOG_FILE  > $TMP_FILE 
  
  if [[ $next_token == '' ]];
  then
	  cp $TMP_FILE $RESULT_FILE
	  
	  echo $(date +%Y%m%d%H%M)':    download_aws_piece.sh:  NEXTTOKEN NOT FOUND - FINISH '
	  rm $LOG_FILE 
	  rm $TMP_FILE
	  rm $TMP_MIDDLE
          rm $TMP_MIDDLE2	  
	  exit 0  
  else
	psql -h MONITOR_ENDPOINT.rds.amazonaws.com -U USER -d MONITOR_DATABASE -v ON_ERROR_STOP=1 -A -t -q -c "update database set last_aws_nexttoken = '$next_token' where id = $database_id "
  fi
  
  first_str=`tail -1 $TMP_FILE`
  
  line_count=`cat  $TMP_FILE | wc -l`
  let lines=$line_count-1    
  
  head -$lines $TMP_FILE  > $RESULT_FILE

###############################################
# MAIN CIRCLE
  let count=2
  while [[ $next_token != '' ]];
  do 
    echo $(date +%Y%m%d%H%M)':    download_aws_piece.sh: count='$count
	
	echo $(date +%Y%m%d%H%M)':    download_aws_piece.sh: START DOWNLOADING OF AWS LOG'
	if ! aws rds download-db-log-file-portion 
             --max-items $last_aws_max_item_size 
             --starting-token $next_token 
             --region REGION 
             --db-instance-identifier  $db_instance 
             --log-file-name error/postgresql.log.$AWS_LOG_TIME > $LOG_FILE
	then
		echo $(date +%Y%m%d%H%M)':    download_aws_piece.sh: FATAL_ERROR - Could not get log from AWS .'
		exit 4
	fi

	next_token_str=`cat $LOG_FILE | grep NEXTTOKEN` 
	next_token=`echo $next_token_str | awk -F" " '{ print $2}' `

	TMP_FILE=$LOG_FILE'.tmp'
	grep -v NEXTTOKEN $LOG_FILE  > $TMP_FILE  
	
	last_str=`head -1 $TMP_FILE`
  
    if [[ $next_token == '' ]];
	then
	  concat_str=$first_str$last_str
	  	  
	  echo $concat_str >> $RESULT_FILE
		 
	  line_count=`cat  $TMP_FILE | wc -l`
	  let lines=$line_count-1
	  
	  tail -$lines $TMP_FILE >> $RESULT_FILE
	  
	  echo $(date +%Y%m%d%H%M)':    download_aws_piece.sh:  NEXTTOKEN NOT FOUND - FINISH '
	  rm $LOG_FILE 
	  rm $TMP_FILE
	  rm $TMP_MIDDLE
          rm $TMP_MIDDLE2	  
	  exit 0  
	fi
	
    if [[ $next_token != '' ]];
	then
		let growth_counter=$growth_counter+1
		if [[ $growth_counter -gt $growth_counter_max ]];
		then
			let last_aws_max_item_size=$last_aws_max_item_size*$growth_factor
			let growth_counter=1
		fi
	
		if [[ $last_aws_max_item_size -gt $max_item_size ]]; 
		then
			let last_aws_max_item_size=$max_item_size
		fi 

	  psql -h MONITOR_ENDPOINT.rds.amazonaws.com -U USER -d MONITOR_DATABASE -A -t -q -c "update database set last_aws_nexttoken = '$next_token' where id = $database_id "
	  
	  concat_str=$first_str$last_str
	  	  
	  echo $concat_str >> $RESULT_FILE
		 
	  line_count=`cat  $TMP_FILE | wc -l`
	  let lines=$line_count-1
	  
	  #############################
	  #Get middle of file
	  head -$lines $TMP_FILE > $TMP_MIDDLE
	  
	  line_count=`cat  $TMP_MIDDLE | wc -l`
	  let lines=$line_count-1
	  tail -$lines $TMP_MIDDLE > $TMP_MIDDLE2
	  
	  cat $TMP_MIDDLE2 >> $RESULT_FILE	  
	  
	  first_str=`tail -1 $TMP_FILE`	  
	fi
	  
    let count=$count+1

  done
#
#################################################################

exit 0  

Кээ бир түшүндүрмөлөр менен скрипт фрагменттери:

Скрипт киргизүү параметрлери:

  • ЖЖЖЖ-АА-КК-HH24 форматындагы журнал файлынын аталышынын убакыт белгиси: AWS_LOG_TIME=$1
  • Маалымат базасы ID: database_id=$2
  • Чогултулган журнал файлынын аты: RESULT_FILE=$3

Акыркы жүктөлгөн журнал файлынын убакыт белгисин алыңыз:

current_aws_log_time=`psql -h MONITOR_ENDPOINT.rds.amazonaws.com -U USER -d MONITOR_DATABASE -A -t -c "select last_aws_log_time from database where id = $database_id "`

Акыркы жүктөлгөн журнал файлынын убакыт белгиси киргизүү параметрине дал келбесе, жаңы журнал файлы жүктөлөт:

if [[ $current_aws_log_time != $AWS_LOG_TIME  ]];
  then
    is_new_log='1'
	if ! psql -h ENDPOINT.rds.amazonaws.com -U USER -d MONITOR_DATABASE -v ON_ERROR_STOP=1 -A -t -c "update database set last_aws_log_time = '$AWS_LOG_TIME' where id = $database_id "
	then
	  echo '***download_aws_piece.sh -FATAL_ERROR - update database set last_aws_log_time .'
	  exit 1
	fi
  else
    is_new_log='0'
  fi

Жүктөлгөн файлдан кийинки белгинин маанисин алабыз:

  next_token_str=`cat $LOG_FILE | grep NEXTTOKEN` 
  next_token=`echo $next_token_str | awk -F" " '{ print $2}' `

Бош кийинки белги жүктөөнүн аякташынын белгиси катары кызмат кылат.

Циклде биз файлдын бөлүктөрүн санайбыз, жол боюндагы сызыктарды бириктиребиз жана бөлүктүн көлөмүн көбөйтөбүз:
Негизги цикл

# MAIN CIRCLE
  let count=2
  while [[ $next_token != '' ]];
  do 
    echo $(date +%Y%m%d%H%M)':    download_aws_piece.sh: count='$count
	
	echo $(date +%Y%m%d%H%M)':    download_aws_piece.sh: START DOWNLOADING OF AWS LOG'
	if ! aws rds download-db-log-file-portion 
     --max-items $last_aws_max_item_size 
	 --starting-token $next_token 
     --region REGION 
     --db-instance-identifier  $db_instance 
     --log-file-name error/postgresql.log.$AWS_LOG_TIME > $LOG_FILE
	then
		echo $(date +%Y%m%d%H%M)':    download_aws_piece.sh: FATAL_ERROR - Could not get log from AWS .'
		exit 4
	fi

	next_token_str=`cat $LOG_FILE | grep NEXTTOKEN` 
	next_token=`echo $next_token_str | awk -F" " '{ print $2}' `

	TMP_FILE=$LOG_FILE'.tmp'
	grep -v NEXTTOKEN $LOG_FILE  > $TMP_FILE  
	
	last_str=`head -1 $TMP_FILE`
  
    if [[ $next_token == '' ]];
	then
	  concat_str=$first_str$last_str
	  	  
	  echo $concat_str >> $RESULT_FILE
		 
	  line_count=`cat  $TMP_FILE | wc -l`
	  let lines=$line_count-1
	  
	  tail -$lines $TMP_FILE >> $RESULT_FILE
	  
	  echo $(date +%Y%m%d%H%M)':    download_aws_piece.sh:  NEXTTOKEN NOT FOUND - FINISH '
	  rm $LOG_FILE 
	  rm $TMP_FILE
	  rm $TMP_MIDDLE
         rm $TMP_MIDDLE2	  
	  exit 0  
	fi
	
    if [[ $next_token != '' ]];
	then
		let growth_counter=$growth_counter+1
		if [[ $growth_counter -gt $growth_counter_max ]];
		then
			let last_aws_max_item_size=$last_aws_max_item_size*$growth_factor
			let growth_counter=1
		fi
	
		if [[ $last_aws_max_item_size -gt $max_item_size ]]; 
		then
			let last_aws_max_item_size=$max_item_size
		fi 

	  psql -h MONITOR_ENDPOINT.rds.amazonaws.com -U USER -d MONITOR_DATABASE -A -t -q -c "update database set last_aws_nexttoken = '$next_token' where id = $database_id "
	  
	  concat_str=$first_str$last_str
	  	  
	  echo $concat_str >> $RESULT_FILE
		 
	  line_count=`cat  $TMP_FILE | wc -l`
	  let lines=$line_count-1
	  
	  #############################
	  #Get middle of file
	  head -$lines $TMP_FILE > $TMP_MIDDLE
	  
	  line_count=`cat  $TMP_MIDDLE | wc -l`
	  let lines=$line_count-1
	  tail -$lines $TMP_MIDDLE > $TMP_MIDDLE2
	  
	  cat $TMP_MIDDLE2 >> $RESULT_FILE	  
	  
	  first_str=`tail -1 $TMP_FILE`	  
	fi
	  
    let count=$count+1

  done

Кийинкиси эмне?

Ошентип, биринчи аралык милдет - "булуттан журнал файлын жүктөп алуу" чечилди. Жүктөлүп алынган журнал менен эмне кылуу керек?
Биринчиден, сиз журнал файлын талдап, андан чыныгы суроо-талаптарды чыгарып алышыңыз керек.
Тапшырма өтө оор эмес. Эң жөнөкөй bash скрипти ишти жакшы аткарат.
upload_log_query.sh

#!/bin/bash
#########################################################
# upload_log_query.sh
# Upload table table from dowloaded aws file 
# version HABR
###########################################################  
echo 'TIMESTAMP:'$(date +%c)' Upload log_query table '
source_file=$1
echo 'source_file='$source_file
database_id=$2
echo 'database_id='$database_id

beginer=' '
first_line='1'
let "line_count=0"
sql_line=' '
sql_flag=' '    
space=' '
cat $source_file | while read line
do
  line="$space$line"

  if [[ $first_line == "1" ]]; then
    beginer=`echo $line | awk -F" " '{ print $1}' `
    first_line='0'
  fi

  current_beginer=`echo $line | awk -F" " '{ print $1}' `

  if [[ $current_beginer == $beginer ]]; then
    if [[ $sql_flag == '1' ]]; then
     sql_flag='0' 
     log_date=`echo $sql_line | awk -F" " '{ print $1}' `
     log_time=`echo $sql_line | awk -F" " '{ print $2}' `
     duration=`echo $sql_line | awk -F" " '{ print $5}' `

     #replace ' to ''
     sql_modline=`echo "$sql_line" | sed 's/'''/''''''/g'`
     sql_line=' '

	 ################
	 #PROCESSING OF THE SQL-SELECT IS HERE
     if ! psql -h ENDPOINT.rds.amazonaws.com -U USER -d DATABASE -v ON_ERROR_STOP=1 -A -t -c "select log_query('$ip_port',$database_id , '$log_date' , '$log_time' , '$duration' , '$sql_modline' )" 
     then
        echo 'FATAL_ERROR - log_query '
        exit 1
     fi
	 ################

    fi #if [[ $sql_flag == '1' ]]; then

    let "line_count=line_count+1"

    check=`echo $line | awk -F" " '{ print $8}' `
    check_sql=${check^^}    

    #echo 'check_sql='$check_sql
    
    if [[ $check_sql == 'SELECT' ]]; then
     sql_flag='1'    
     sql_line="$sql_line$line"
	 ip_port=`echo $sql_line | awk -F":" '{ print $4}' `
    fi
  else       

    if [[ $sql_flag == '1' ]]; then
      sql_line="$sql_line$line"
    fi   
    
  fi #if [[ $current_beginer == $beginer ]]; then

done

Эми сиз журнал файлынан тандалган суроо менен иштей аласыз.

Жана бир нече пайдалуу мүмкүнчүлүктөр ачылат.

Талданган сурамдар бир жерде сакталышы керек. Бул үчүн тейлөө стол колдонулат log_query

CREATE TABLE log_query
(
   id SERIAL ,
   queryid bigint ,
   query_md5hash text not null ,
   database_id integer not null ,  
   timepoint timestamp without time zone not null,
   duration double precision not null ,
   query text not null ,
   explained_plan text[],
   plan_md5hash text  , 
   explained_plan_wo_costs text[],
   plan_hash_value text  ,
   baseline_id integer ,
   ip text ,
   port text 
);
ALTER TABLE log_query ADD PRIMARY KEY (id);
ALTER TABLE log_query ADD CONSTRAINT queryid_timepoint_unique_key UNIQUE (queryid, timepoint );
ALTER TABLE log_query ADD CONSTRAINT query_md5hash_timepoint_unique_key UNIQUE (query_md5hash, timepoint );

CREATE INDEX log_query_timepoint_idx ON log_query (timepoint);
CREATE INDEX log_query_queryid_idx ON log_query (queryid);
ALTER TABLE log_query ADD CONSTRAINT database_id_fk FOREIGN KEY (database_id) REFERENCES database (id) ON DELETE CASCADE ;

Талданган сурам иштетилет plpgsql функциялары"log_query«.
log_query.sql

--log_query.sql
--verison HABR
CREATE OR REPLACE FUNCTION log_query( ip_port text ,log_database_id integer , log_date text , log_time text , duration text , sql_line text   ) RETURNS boolean AS $$
DECLARE
  result boolean ;
  log_timepoint timestamp without time zone ;
  log_duration double precision ; 
  pos integer ;
  log_query text ;
  activity_string text ;
  log_md5hash text ;
  log_explain_plan text[] ;
  
  log_planhash text ;
  log_plan_wo_costs text[] ; 
  
  database_rec record ;
  
  pg_stat_query text ; 
  test_log_query text ;
  log_query_rec record;
  found_flag boolean;
  
  pg_stat_history_rec record ;
  port_start integer ;
  port_end integer ;
  client_ip text ;
  client_port text ;
  log_queryid bigint ;
  log_query_text text ;
  pg_stat_query_text text ; 
BEGIN
  result = TRUE ;

  RAISE NOTICE '***log_query';
  
  port_start = position('(' in ip_port);
  port_end = position(')' in ip_port);
  client_ip = substring( ip_port from 1 for port_start-1 );
  client_port = substring( ip_port from port_start+1 for port_end-port_start-1 );

  SELECT e.host , d.name , d.owner_pwd 
  INTO database_rec
  FROM database d JOIN endpoint e ON e.id = d.endpoint_id
  WHERE d.id = log_database_id ;
  
  log_timepoint = to_timestamp(log_date||' '||log_time,'YYYY-MM-DD HH24-MI-SS');
  log_duration = duration:: double precision; 

  
  pos = position ('SELECT' in UPPER(sql_line) );
  log_query = substring( sql_line from pos for LENGTH(sql_line));
  log_query = regexp_replace(log_query,' +',' ','g');
  log_query = regexp_replace(log_query,';+','','g');
  log_query = trim(trailing ' ' from log_query);
 

  log_md5hash = md5( log_query::text );
  
  --Explain execution plan--
  EXECUTE 'SELECT dblink_connect(''LINK1'',''host='||database_rec.host||' dbname='||database_rec.name||' user=DATABASE password='||database_rec.owner_pwd||' '')'; 
  
  log_explain_plan = ARRAY ( SELECT * FROM dblink('LINK1', 'EXPLAIN '||log_query ) AS t (plan text) );
  log_plan_wo_costs = ARRAY ( SELECT * FROM dblink('LINK1', 'EXPLAIN ( COSTS FALSE ) '||log_query ) AS t (plan text) );
    
  PERFORM dblink_disconnect('LINK1');
  --------------------------
  BEGIN
	INSERT INTO log_query
	(
		query_md5hash ,
		database_id , 
		timepoint ,
		duration ,
		query ,
		explained_plan ,
		plan_md5hash , 
		explained_plan_wo_costs , 
		plan_hash_value , 
		ip , 
		port
	) 
	VALUES 
	(
		log_md5hash ,
		log_database_id , 
		log_timepoint , 
		log_duration , 
		log_query ,
		log_explain_plan , 
		md5(log_explain_plan::text) ,
		log_plan_wo_costs , 
		md5(log_plan_wo_costs::text),
		client_ip , 
		client_port		
	);
	activity_string = 	'New query has logged '||
						' database_id = '|| log_database_id ||
						' query_md5hash='||log_md5hash||
						' , timepoint = '||to_char(log_timepoint,'YYYYMMDD HH24:MI:SS');
					
	RAISE NOTICE '%',activity_string;					
					 
	PERFORM pg_log( log_database_id , 'log_query' , activity_string);  

	EXCEPTION
	  WHEN unique_violation THEN
		RAISE NOTICE '*** unique_violation *** query already has logged';
	END;

	SELECT 	queryid
	INTO   	log_queryid
	FROM 	log_query 
	WHERE 	query_md5hash = log_md5hash AND
			timepoint = log_timepoint;

	IF log_queryid IS NOT NULL 
	THEN 
	  RAISE NOTICE 'log_query with query_md5hash = % and timepoint = % has already has a QUERYID = %',log_md5hash,log_timepoint , log_queryid ;
	  RETURN result;
	END IF;
	
	------------------------------------------------
	RAISE NOTICE 'Update queryid';	
	
	SELECT * 
	INTO log_query_rec
	FROM log_query
	WHERE query_md5hash = log_md5hash AND timepoint = log_timepoint ; 
	
	log_query_rec.query=regexp_replace(log_query_rec.query,';+','','g');
	
	FOR pg_stat_history_rec IN
	 SELECT 
         queryid ,
	  query 
	 FROM 
         pg_stat_db_queries 
     WHERE  
      database_id = log_database_id AND
       queryid is not null 
	LOOP
	  pg_stat_query = pg_stat_history_rec.query ; 
	  pg_stat_query=regexp_replace(pg_stat_query,'n+',' ','g');
	  pg_stat_query=regexp_replace(pg_stat_query,'t+',' ','g');
	  pg_stat_query=regexp_replace(pg_stat_query,' +',' ','g');
	  pg_stat_query=regexp_replace(pg_stat_query,'$.','%','g');
	
	  log_query_text = trim(trailing ' ' from log_query_rec.query);
	  pg_stat_query_text = pg_stat_query; 
	
	  
	  --SELECT log_query_rec.query like pg_stat_query INTO found_flag ; 
	  IF (log_query_text LIKE pg_stat_query_text) THEN
		found_flag = TRUE ;
	  ELSE
		found_flag = FALSE ;
	  END IF;	  
	  
	  
	  IF found_flag THEN
	    
		UPDATE log_query SET queryid = pg_stat_history_rec.queryid WHERE query_md5hash = log_md5hash AND timepoint = log_timepoint ;
		activity_string = 	' updated queryid = '||pg_stat_history_rec.queryid||
		                    ' for log_query with id = '||log_query_rec.id               
		   				    ;						
	    RAISE NOTICE '%',activity_string;	
		EXIT ;
	  END IF ;
	  
	END LOOP ;
	
  RETURN result ;
END
$$ LANGUAGE plpgsql;

Иштетүү учурунда тейлөө столу колдонулат pg_stat_db_queries, таблицадагы учурдагы сурамдардын сүрөтүн камтыган pg_stat_history (Таблицаны колдонуу бул жерде сүрөттөлгөн - PostgreSQL сурамдарынын аткарылышын көзөмөлдөө. 1-бөлүк - отчеттуулук)

TABLE pg_stat_db_queries
(
   database_id integer,  
   queryid bigint ,  
   query text , 
   max_time double precision 
);

TABLE pg_stat_history 
(
…
database_id integer ,
…
queryid bigint ,
…
max_time double precision	 , 	
…
);

Функция журнал файлынан сурамдарды иштетүү үчүн бир катар пайдалуу мүмкүнчүлүктөрдү ишке ашырууга мүмкүндүк берет. Тактап айтканда:

Мүмкүнчүлүк №1 - Суроолорду аткаруу тарыхы

аткаруу инцидент чече баштоо үчүн абдан пайдалуу. Биринчиден, тарых менен таанышыңыз – жайлоо качан башталган?
Андан кийин, классиктердин айтымында, тышкы себептерди издөө. Мүмкүн, базанын жүгү жөн эле кескин көбөйгөн жана конкреттүү суроо-талаптын аны менен эч кандай байланышы жок.
log_query таблицасына жаңы жазуу кошуңуз

  port_start = position('(' in ip_port);
  port_end = position(')' in ip_port);
  client_ip = substring( ip_port from 1 for port_start-1 );
  client_port = substring( ip_port from port_start+1 for port_end-port_start-1 );

  SELECT e.host , d.name , d.owner_pwd 
  INTO database_rec
  FROM database d JOIN endpoint e ON e.id = d.endpoint_id
  WHERE d.id = log_database_id ;
  
  log_timepoint = to_timestamp(log_date||' '||log_time,'YYYY-MM-DD HH24-MI-SS');
  log_duration = to_number(duration,'99999999999999999999D9999999999'); 

  
  pos = position ('SELECT' in UPPER(sql_line) );
  log_query = substring( sql_line from pos for LENGTH(sql_line));
  log_query = regexp_replace(log_query,' +',' ','g');
  log_query = regexp_replace(log_query,';+','','g');
  log_query = trim(trailing ' ' from log_query);
 
  RAISE NOTICE 'log_query=%',log_query ;   

  log_md5hash = md5( log_query::text );
  
  --Explain execution plan--
  EXECUTE 'SELECT dblink_connect(''LINK1'',''host='||database_rec.host||' dbname='||database_rec.name||' user=DATABASE password='||database_rec.owner_pwd||' '')'; 
  
  log_explain_plan = ARRAY ( SELECT * FROM dblink('LINK1', 'EXPLAIN '||log_query ) AS t (plan text) );
  log_plan_wo_costs = ARRAY ( SELECT * FROM dblink('LINK1', 'EXPLAIN ( COSTS FALSE ) '||log_query ) AS t (plan text) );
    
  PERFORM dblink_disconnect('LINK1');
  --------------------------
  BEGIN
	INSERT INTO log_query
	(
		query_md5hash ,
		database_id , 
		timepoint ,
		duration ,
		query ,
		explained_plan ,
		plan_md5hash , 
		explained_plan_wo_costs , 
		plan_hash_value , 
		ip , 
		port
	) 
	VALUES 
	(
		log_md5hash ,
		log_database_id , 
		log_timepoint , 
		log_duration , 
		log_query ,
		log_explain_plan , 
		md5(log_explain_plan::text) ,
		log_plan_wo_costs , 
		md5(log_plan_wo_costs::text),
		client_ip , 
		client_port		
	);

Мүмкүнчүлүк №2 - Сурамдарды аткаруу пландарын сактоо

Бул учурда каршы-тактоо-комментарий пайда болушу мүмкүн: "Бирок буга чейин автотүшүндүрүү бар" Ооба, ал бар, бирок аткаруу планы ошол эле журнал файлында сакталса жана аны андан ары талдоо үчүн сактоо үчүн журнал файлын талдооңуз керекпи?

Мага эмне керек болчу:
биринчи: мониторингдин маалымат базасынын тейлөө таблицасында аткаруу планын сактоо;
экинчиден: суроону аткаруу планы өзгөргөнүн дароо көрүү үчүн аткаруу пландарын бири-бири менен салыштыра билүү.

конкреттүү аткаруу параметрлери менен суроо-талап бар. EXPLAIN аркылуу анын аткаруу планын алуу жана сактоо - бул элементардык милдет.
Мындан тышкары, EXPLAIN (COSTS FALSE) туюнтмасын колдонуу менен, сиз пландын скелетин ала аласыз, ал пландын хэш маанисин алуу үчүн колдонулат, бул аткаруу планындагы өзгөрүүлөрдүн тарыхын кийинки талдоого жардам берет.
Аткаруу планынын шаблонун алыңыз

  --Explain execution plan--
  EXECUTE 'SELECT dblink_connect(''LINK1'',''host='||database_rec.host||' dbname='||database_rec.name||' user=DATABASE password='||database_rec.owner_pwd||' '')'; 
  
  log_explain_plan = ARRAY ( SELECT * FROM dblink('LINK1', 'EXPLAIN '||log_query ) AS t (plan text) );
  log_plan_wo_costs = ARRAY ( SELECT * FROM dblink('LINK1', 'EXPLAIN ( COSTS FALSE ) '||log_query ) AS t (plan text) );
    
  PERFORM dblink_disconnect('LINK1');

Мүмкүнчүлүк №3 - Мониторинг үчүн суроо журналын колдонуу

Майнаптуулук көрсөткүчтөрү суроо-талаптын текстинде эмес, анын идентификаторунда конфигурациялангандыктан, сиз журнал файлындагы сурамдарды аткаруу көрсөткүчтөрү конфигурацияланган сурамдар менен байланыштырышыңыз керек.
Жок дегенде, аткаруу инцидентинин так убактысын билүү үчүн.

Ушундай жол менен, суроо-талаптын идентификатору үчүн аткаруу инциденти болгондо, белгилүү бир параметрдин маанилери жана суроо-талаптын так аткарылуу убактысы жана узактыгы менен белгилүү бир суроого шилтеме болот. Бул маалыматты көрүү аркылуу гана алыңыз pg_stat_statements - тыюу салынган.
Сурамдын queryidди табыңыз жана log_query таблицасындагы жазууну жаңыртыңыз

SELECT * 
	INTO log_query_rec
	FROM log_query
	WHERE query_md5hash = log_md5hash AND timepoint = log_timepoint ; 
	
	log_query_rec.query=regexp_replace(log_query_rec.query,';+','','g');
	
	FOR pg_stat_history_rec IN
	 SELECT 
      queryid ,
	  query 
	 FROM 
       pg_stat_db_queries 
     WHERE  
	   database_id = log_database_id AND
       queryid is not null 
	LOOP
	  pg_stat_query = pg_stat_history_rec.query ; 
	  pg_stat_query=regexp_replace(pg_stat_query,'n+',' ','g');
	  pg_stat_query=regexp_replace(pg_stat_query,'t+',' ','g');
	  pg_stat_query=regexp_replace(pg_stat_query,' +',' ','g');
	  pg_stat_query=regexp_replace(pg_stat_query,'$.','%','g');
	
	  log_query_text = trim(trailing ' ' from log_query_rec.query);
	  pg_stat_query_text = pg_stat_query; 
	  
	  --SELECT log_query_rec.query like pg_stat_query INTO found_flag ; 
	  IF (log_query_text LIKE pg_stat_query_text) THEN
		found_flag = TRUE ;
	  ELSE
		found_flag = FALSE ;
	  END IF;	  
	  
	  
	  IF found_flag THEN
	    
		UPDATE log_query SET queryid = pg_stat_history_rec.queryid WHERE query_md5hash = log_md5hash AND timepoint = log_timepoint ;
		activity_string = 	' updated queryid = '||pg_stat_history_rec.queryid||
		                    ' for log_query with id = '||log_query_rec.id		                    
		   				    ;						
					
	    RAISE NOTICE '%',activity_string;	
		EXIT ;
	  END IF ;
	  
	END LOOP ;

аягы

Сүрөттөлгөн техника акырында колдонууну тапты иштелип чыккан PostgreSQL суроо аткаруу мониторинг системасы, пайда болгон суроо аткаруу инциденттерин чечүүдө талдоо үчүн көбүрөөк маалыматка ээ болууга мүмкүндүк берет.

Албетте, менин жеке оюмча, жүктөлүп алынган бөлүктүн көлөмүн тандоо жана өзгөртүү алгоритминин үстүнөн көбүрөөк иштөө керек болот. Жалпы иш боюнча маселе азырынча чечиле элек. Бул, балким, кызыктуу болот.

Бирок бул таптакыр башка окуя...

Source: www.habr.com

Комментарий кошуу