From ec071781cad750dfeca1724cdfda5acc4fc8c7a0 Mon Sep 17 00:00:00 2001 From: peitingwei Date: Mon, 17 Oct 2022 17:00:00 +0800 Subject: [PATCH 1/4] support hot data precache by checkpoint --- src/backend/access/transam/xlog.c | 5 + src/backend/catalog/Makefile | 3 +- src/backend/catalog/pg_hot_data.c | 276 ++++++++++++++++++++++++++++++ src/include/catalog/pg_hot_data.h | 2 + 4 files changed, 285 insertions(+), 1 deletion(-) create mode 100644 src/backend/catalog/pg_hot_data.c diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index cee44bb..1c62fd6 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -40,6 +40,7 @@ #include "catalog/catversion.h" #include "catalog/pg_control.h" #include "catalog/pg_database.h" +#include "catalog/pg_hot_data.h" #include "commands/progress.h" #include "commands/tablespace.h" #include "common/controldata_utils.h" @@ -9625,6 +9626,10 @@ void PushCheckPointGuts(XLogRecPtr checkPointRedo, int flags) { static void CheckPointGuts(XLogRecPtr checkPointRedo, int flags) { + if (ControlFile->state == DB_IN_PRODUCTION && !push_standby) + { + PrecacheHotData(); + } CheckPointRelationMap(); CheckPointReplicationSlots(); CheckPointSnapBuild(); diff --git a/src/backend/catalog/Makefile b/src/backend/catalog/Makefile index 487a4ac..dac70b8 100644 --- a/src/backend/catalog/Makefile +++ b/src/backend/catalog/Makefile @@ -43,7 +43,8 @@ OBJS = \ pg_subscription.o \ pg_type.o \ storage.o \ - toasting.o + toasting.o \ + pg_hot_data.o include $(top_srcdir)/src/backend/common.mk diff --git a/src/backend/catalog/pg_hot_data.c b/src/backend/catalog/pg_hot_data.c new file mode 100644 index 0000000..550064e --- /dev/null +++ b/src/backend/catalog/pg_hot_data.c @@ -0,0 +1,276 @@ +/*------------------------------------------------------------------------- + * + * pg_hot_data.c + * for hot data precache + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" +#include "catalog/pg_hot_data.h" +#include "libpq-fe.h" +#include "lib/stringinfo.h" +#include "utils/timestamp.h" +#include "access/xlog.h" +#include "postmaster/postmaster.h" +#include + +void PrecacheHotData() +{ + char instanceName[NAMEDATALEN]; //default:master + char primaryHost[16]; //default:127.0.0.1 + char primaryUser[NAMEDATALEN]; //default:postgres + char primaryPw[NAMEDATALEN]; //default:123456 + char primaryPort[8]; //default:PostPortNumber + char localPort[8]; //default:master + StringInfoData cmd, primaryConnStr, localConnStr; + + initStringInfo(&cmd); + initStringInfo(&primaryConnStr); + initStringInfo(&localConnStr); + + memset(instanceName, 0, NAMEDATALEN); + memset(primaryHost, 0, 16); + memset(primaryUser, 0, NAMEDATALEN); + memset(primaryPw, 0, NAMEDATALEN); + memset(primaryPort, 0, 8); + memset(localPort, 0, 8); + + //parse + if (strlen(PrimaryConnInfo) > 0) + { + char *temStr; + char *temChr; + int temStrLen; + + //instanceName + temStr = strstr(PrimaryConnInfo, "application_name="); + temStrLen = strlen("application_name="); + + if (temStr != NULL) + { + temChr = strchr(temStr, ' '); + if (temChr != NULL) + { + memcpy(instanceName, temStr + temStrLen, temChr - temStr - temStrLen); + } + else + { + strcpy(instanceName, temStr + temStrLen); + } + } + else + { + strcpy(instanceName, "master"); + } + + //primaryHost + temStr = strstr(PrimaryConnInfo, " host="); + temStrLen = strlen(" host="); + + if (temStr != NULL) + { + temChr = strchr(temStr, ' '); + if (temChr != NULL) + { + 
memcpy(primaryHost, temStr + temStrLen, temChr - temStr - temStrLen); + } + else + { + strcpy(primaryHost, temStr + temStrLen); + } + } + else + { + strcpy(primaryHost, "127.0.0.1"); + } + + //primaryUser + temStr = strstr(PrimaryConnInfo, " user="); + temStrLen = strlen(" user="); + + if (temStr != NULL) + { + temChr = strchr(temStr, ' '); + if (temChr != NULL) + { + memcpy(primaryUser, temStr + temStrLen, temChr - temStr - temStrLen); + } + else + { + strcpy(primaryUser, temStr + temStrLen); + } + } + else + { + strcpy(primaryUser, "postgres"); + } + + //primaryPw + temStr = strstr(PrimaryConnInfo, " password="); + temStrLen = strlen(" password="); + + if (temStr != NULL) + { + temChr = strchr(temStr, ' '); + if (temChr != NULL) + { + memcpy(primaryPw, temStr + temStrLen, temChr - temStr - temStrLen); + } + else + { + strcpy(primaryPw, temStr + temStrLen); + } + } + else + { + strcpy(primaryPw, "123456"); + } + + //primaryPort + temStr = strstr(PrimaryConnInfo, " port="); + temStrLen = strlen(" port="); + + if (temStr != NULL) + { + temChr = strchr(temStr, ' '); + if (temChr != NULL) + { + memcpy(primaryPort, temStr + temStrLen, temChr - temStr - temStrLen); + } + else + { + strcpy(primaryPort, temStr + temStrLen); + } + } + else + { + sprintf(primaryPort, "%d", PostPortNumber); + } + } + else + { + strcpy(instanceName, "master"); + strcpy(primaryHost, "127.0.0.1"); + strcpy(primaryUser, "postgres"); + strcpy(primaryPw, "123456"); + sprintf(primaryPort, "%d", PostPortNumber); + } + + //assemble primaryConnStr + appendStringInfoString(&primaryConnStr, "host="); + appendStringInfoString(&primaryConnStr, primaryHost); + appendStringInfoString(&primaryConnStr, " user="); + appendStringInfoString(&primaryConnStr, primaryUser); + appendStringInfoString(&primaryConnStr, " password="); + appendStringInfoString(&primaryConnStr, primaryPw); + appendStringInfoString(&primaryConnStr, " port="); + appendStringInfoString(&primaryConnStr, primaryPort); + appendStringInfoString(&primaryConnStr, " dbname=postgres"); + + //conn local + sprintf(localPort, "%d", PostPortNumber); + appendStringInfoString(&localConnStr, "host=127.0.0.1 port="); + appendStringInfoString(&localConnStr, localPort); + appendStringInfoString(&localConnStr, " user=postgres dbname=postgres"); + PGconn *localConn = PQconnectdb(localConnStr.data); + if (PQstatus(localConn) != CONNECTION_OK) + { + PQfinish(localConn); + //log + return; + } + + appendStringInfoString(&cmd, "SELECT datname, relname, crules FROM pg_hot_data WHERE crulessettime>cachetime AND clientname='"); + appendStringInfoString(&cmd, instanceName); + appendStringInfoString(&cmd, "'"); + + //Query the corresponding precache policy + PGresult *ruleRes = PQexec(localConn, cmd.data); + if (PQresultStatus(ruleRes) != PGRES_TUPLES_OK) + { + PQclear(ruleRes); + PQfinish(localConn); + //log + return; + } + int rows = PQntuples(ruleRes); + for(int i=0; i Date: Wed, 19 Oct 2022 16:37:22 +0800 Subject: [PATCH 2/4] optimizing read-only instance precache --- src/backend/access/transam/xlog.c | 2 +- src/backend/catalog/pg_hot_data.c | 16 ++++++++-------- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 1c62fd6..a18e261 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -9626,7 +9626,7 @@ void PushCheckPointGuts(XLogRecPtr checkPointRedo, int flags) { static void CheckPointGuts(XLogRecPtr checkPointRedo, int flags) { - if (ControlFile->state == 
DB_IN_PRODUCTION && !push_standby) + if ((ControlFile->state == DB_IN_PRODUCTION || ControlFile->state == DB_IN_ARCHIVE_RECOVERY) && !push_standby) { PrecacheHotData(); } diff --git a/src/backend/catalog/pg_hot_data.c b/src/backend/catalog/pg_hot_data.c index 550064e..bd4ecf0 100644 --- a/src/backend/catalog/pg_hot_data.c +++ b/src/backend/catalog/pg_hot_data.c @@ -65,8 +65,8 @@ void PrecacheHotData() } //primaryHost - temStr = strstr(PrimaryConnInfo, " host="); - temStrLen = strlen(" host="); + temStr = strstr(PrimaryConnInfo, "host="); + temStrLen = strlen("host="); if (temStr != NULL) { @@ -86,8 +86,8 @@ void PrecacheHotData() } //primaryUser - temStr = strstr(PrimaryConnInfo, " user="); - temStrLen = strlen(" user="); + temStr = strstr(PrimaryConnInfo, "user="); + temStrLen = strlen("user="); if (temStr != NULL) { @@ -107,8 +107,8 @@ void PrecacheHotData() } //primaryPw - temStr = strstr(PrimaryConnInfo, " password="); - temStrLen = strlen(" password="); + temStr = strstr(PrimaryConnInfo, "password="); + temStrLen = strlen("password="); if (temStr != NULL) { @@ -128,8 +128,8 @@ void PrecacheHotData() } //primaryPort - temStr = strstr(PrimaryConnInfo, " port="); - temStrLen = strlen(" port="); + temStr = strstr(PrimaryConnInfo, "port="); + temStrLen = strlen("port="); if (temStr != NULL) { -- Gitee From f36ef98373f568503b0bc9486877b4068bd8bbe9 Mon Sep 17 00:00:00 2001 From: peitingwei Date: Wed, 19 Oct 2022 19:03:44 +0800 Subject: [PATCH 3/4] optimizing the compilation of precache --- src/backend/catalog/Makefile | 1 + 1 file changed, 1 insertion(+) diff --git a/src/backend/catalog/Makefile b/src/backend/catalog/Makefile index dac70b8..56a8815 100644 --- a/src/backend/catalog/Makefile +++ b/src/backend/catalog/Makefile @@ -12,6 +12,7 @@ subdir = src/backend/catalog top_builddir = ../../.. include $(top_builddir)/src/Makefile.global +override CPPFLAGS := -I$(srcdir) -I$(libpq_srcdir) $(CPPFLAGS) OBJS = \ aclchk.o \ -- Gitee From 2c642395ef20a548d167e8348399e586157d5f12 Mon Sep 17 00:00:00 2001 From: peitingwei Date: Fri, 21 Oct 2022 11:26:40 +0800 Subject: [PATCH 4/4] Optimizing the precache framework --- src/backend/executor/execMain.c | 236 +++----------------- src/backend/tcop/postgres.c | 384 +------------------------------- src/backend/tcop/pquery.c | 301 ------------------------- src/include/executor/executor.h | 4 - src/include/storage/bufmgr.h | 2 + 5 files changed, 33 insertions(+), 894 deletions(-) diff --git a/src/backend/executor/execMain.c b/src/backend/executor/execMain.c index 96b49e9..ab85119 100644 --- a/src/backend/executor/execMain.c +++ b/src/backend/executor/execMain.c @@ -89,14 +89,6 @@ static void ExecutePlan(EState *estate, PlanState *planstate, ScanDirection direction, DestReceiver *dest, bool execute_once); -static void PrecacheExecutePlan(EState *estate, PlanState *planstate, - bool use_parallel_mode, - CmdType operation, - bool sendTuples, - uint64 numberTuples, - ScanDirection direction, - DestReceiver *dest, - bool execute_once); static bool ExecCheckRTEPerms(RangeTblEntry *rte); static bool ExecCheckRTEPermsModified(Oid relOid, Oid userid, Bitmapset *modifiedCols, @@ -313,21 +305,6 @@ ExecutorRun(QueryDesc *queryDesc, standard_ExecutorRun(queryDesc, direction, count, execute_once); } -/* ---------------------------------------------------------------- - * PrecacheExecutorRun - * - * for Hot data precache scenario. 
- * - * ---------------------------------------------------------------- - */ -void -PrecacheExecutorRun(QueryDesc *queryDesc, - ScanDirection direction, uint64 count, - bool execute_once) -{ - precache_Standard_ExecutorRun(queryDesc, direction, count, execute_once); -} - void standard_ExecutorRun(QueryDesc *queryDesc, ScanDirection direction, uint64 count, bool execute_once) @@ -404,82 +381,6 @@ standard_ExecutorRun(QueryDesc *queryDesc, MemoryContextSwitchTo(oldcontext); } -void -precache_Standard_ExecutorRun(QueryDesc *queryDesc, - ScanDirection direction, uint64 count, bool execute_once) -{ - EState *estate; - CmdType operation; - DestReceiver *dest; - bool sendTuples; - MemoryContext oldcontext; - - /* sanity checks */ - Assert(queryDesc != NULL); - - estate = queryDesc->estate; - - Assert(estate != NULL); - Assert(!(estate->es_top_eflags & EXEC_FLAG_EXPLAIN_ONLY)); - - /* - * Switch into per-query memory context - */ - oldcontext = MemoryContextSwitchTo(estate->es_query_cxt); - - /* Allow instrumentation of Executor overall runtime */ - if (queryDesc->totaltime) - InstrStartNode(queryDesc->totaltime); - - /* - * extract information from the query descriptor and the query feature. - */ - operation = queryDesc->operation; - dest = queryDesc->dest; - - /* - * startup tuple receiver, if we will be emitting tuples - */ - estate->es_processed = 0; - - sendTuples = (operation == CMD_SELECT || - queryDesc->plannedstmt->hasReturning); - - if (sendTuples) - dest->rStartup(dest, operation, queryDesc->tupDesc); - - /* - * run plan - */ - if (!ScanDirectionIsNoMovement(direction)) - { - if (execute_once && queryDesc->already_executed) - elog(ERROR, "can't re-execute query flagged for single execution"); - queryDesc->already_executed = true; - - PrecacheExecutePlan(estate, - queryDesc->planstate, - queryDesc->plannedstmt->parallelModeNeeded, - operation, - sendTuples, - count, - direction, - dest, - execute_once); - } - - /* - * shutdown tuple receiver, if we started it - */ - if (sendTuples) - dest->rShutdown(dest); - - if (queryDesc->totaltime) - InstrStopNode(queryDesc->totaltime, estate->es_processed); - - MemoryContextSwitchTo(oldcontext); -} - /* ---------------------------------------------------------------- * ExecutorFinish * @@ -1656,122 +1557,35 @@ ExecutePlan(EState *estate, if (TupIsNull(slot)) break; - /* - * If we have a junk filter, then project a new tuple with the junk - * removed. - * - * Store this new "clean" tuple in the junkfilter's resultSlot. - * (Formerly, we stored it back over the "dirty" tuple, which is WRONG - * because that tuple slot has the wrong descriptor.) - */ - if (estate->es_junkFilter != NULL) - slot = ExecFilterJunk(estate->es_junkFilter, slot); - - /* - * If we are supposed to send the tuple somewhere, do so. (In - * practice, this is probably always the case at this point.) - */ - if (sendTuples) + if (!isPreCache) { /* - * If we are not able to send the tuple, we assume the destination - * has closed and no more tuples can be sent. If that's the case, - * end the loop. - */ - if (!dest->receiveSlot(slot, dest)) - break; - } - - /* - * Count tuples processed, if this is a SELECT. (For other operation - * types, the ModifyTable plan node must count the appropriate - * events.) - */ - if (operation == CMD_SELECT) - (estate->es_processed)++; - - /* - * check our tuple count.. if we've processed the proper number then - * quit, else loop again and process more tuples. Zero numberTuples - * means no limit. 
- */ - current_tuple_count++; - if (numberTuples && numberTuples == current_tuple_count) - break; - } - - /* - * If we know we won't need to back up, we can release resources at this - * point. - */ - if (!(estate->es_top_eflags & EXEC_FLAG_BACKWARD)) - (void) ExecShutdownNode(planstate); - - if (use_parallel_mode) - ExitParallelMode(); -} - -/* ---------------------------------------------------------------- - * PrecacheExecutePlan - * - * for hot data precache scenario. - * ---------------------------------------------------------------- - */ -static void -PrecacheExecutePlan(EState *estate, - PlanState *planstate, - bool use_parallel_mode, - CmdType operation, - bool sendTuples, - uint64 numberTuples, - ScanDirection direction, - DestReceiver *dest, - bool execute_once) -{ - TupleTableSlot *slot; - uint64 current_tuple_count; + * If we have a junk filter, then project a new tuple with the junk + * removed. + * + * Store this new "clean" tuple in the junkfilter's resultSlot. + * (Formerly, we stored it back over the "dirty" tuple, which is WRONG + * because that tuple slot has the wrong descriptor.) + */ + if (estate->es_junkFilter != NULL) + slot = ExecFilterJunk(estate->es_junkFilter, slot); - /* - * initialize local variables - */ - current_tuple_count = 0; - - /* - * Set the direction. - */ - estate->es_direction = direction; - - /* - * If the plan might potentially be executed multiple times, we must force - * it to run without parallelism, because we might exit early. - */ - if (!execute_once) - use_parallel_mode = false; - - estate->es_use_parallel_mode = use_parallel_mode; - if (use_parallel_mode) - EnterParallelMode(); - - /* - * Loop until we've processed the proper number of tuples from the plan. - */ - for (;;) - { - /* Reset the per-output-tuple exprcontext */ - ResetPerTupleExprContext(estate); - - /* - * Execute the plan and obtain a tuple - */ - slot = ExecProcNode(planstate); + /* + * If we are supposed to send the tuple somewhere, do so. (In + * practice, this is probably always the case at this point.) + */ + if (sendTuples) + { + /* + * If we are not able to send the tuple, we assume the destination + * has closed and no more tuples can be sent. If that's the case, + * end the loop. + */ + if (!dest->receiveSlot(slot, dest)) + break; + } + } - /* - * if the tuple is null, then we assume there is nothing more to - * process so we just end the loop... - */ - if (TupIsNull(slot)) - break; - /* * Count tuples processed, if this is a SELECT. (For other operation * types, the ModifyTable plan node must count the appropriate diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c index a3da4a9..71ce973 100644 --- a/src/backend/tcop/postgres.c +++ b/src/backend/tcop/postgres.c @@ -86,6 +86,7 @@ * global variables * ---------------- */ +bool isPreCache = false; const char *debug_query_string; /* client-supplied query string */ /* Note: whereToSendOutput is initialized for the bootstrap/standalone case */ @@ -1318,374 +1319,6 @@ exec_simple_query(const char *query_string) debug_query_string = NULL; } -/* - * precache_exec_simple_query - * - * for Hot data precache scenario. - */ -static void -precache_exec_simple_query(const char *query_string) -{ - CommandDest dest = whereToSendOutput; - MemoryContext oldcontext; - List *parsetree_list; - ListCell *parsetree_item; - bool save_log_statement_stats = log_statement_stats; - bool was_logged = false; - bool use_implicit_block; - char msec_str[32]; - - /* - * Report query to various monitoring facilities. 
- */ - debug_query_string = query_string; - - pgstat_report_activity(STATE_RUNNING, query_string); - - TRACE_POSTGRESQL_QUERY_START(query_string); - - /* - * We use save_log_statement_stats so ShowUsage doesn't report incorrect - * results because ResetUsage wasn't called. - */ - if (save_log_statement_stats) - ResetUsage(); - - /* - * Start up a transaction command. All queries generated by the - * query_string will be in this same command block, *unless* we find a - * BEGIN/COMMIT/ABORT statement; we have to force a new xact command after - * one of those, else bad things will happen in xact.c. (Note that this - * will normally change current memory context.) - */ - start_xact_command(); - - /* - * Zap any pre-existing unnamed statement. (While not strictly necessary, - * it seems best to define simple-Query mode as if it used the unnamed - * statement and portal; this ensures we recover any storage used by prior - * unnamed operations.) - */ - drop_unnamed_stmt(); - - /* - * Switch to appropriate context for constructing parsetrees. - */ - oldcontext = MemoryContextSwitchTo(MessageContext); - - /* - * Do basic parsing of the query or queries (this should be safe even if - * we are in aborted transaction state!) - */ - parsetree_list = pg_parse_query(query_string); - - /* Log immediately if dictated by log_statement */ - if (check_log_statement(parsetree_list)) - { - ereport(LOG, - (errmsg("statement: %s", query_string), - errhidestmt(true), - errdetail_execute(parsetree_list))); - was_logged = true; - } - - /* - * Switch back to transaction context to enter the loop. - */ - MemoryContextSwitchTo(oldcontext); - - /* - * For historical reasons, if multiple SQL statements are given in a - * single "simple Query" message, we execute them as a single transaction, - * unless explicit transaction control commands are included to make - * portions of the list be separate transactions. To represent this - * behavior properly in the transaction machinery, we use an "implicit" - * transaction block. - */ - use_implicit_block = (list_length(parsetree_list) > 1); - - /* - * Run through the raw parsetree(s) and process each one. - */ - foreach(parsetree_item, parsetree_list) - { - RawStmt *parsetree = lfirst_node(RawStmt, parsetree_item); - bool snapshot_set = false; - CommandTag commandTag; - QueryCompletion qc; - MemoryContext per_parsetree_context = NULL; - List *querytree_list, - *plantree_list; - Portal portal; - DestReceiver *receiver; - int16 format; - - pgstat_report_query_id(0, true); - - /* - * Get the command name for use in status display (it also becomes the - * default completion tag, down inside PortalRun). Set ps_status and - * do any special start-of-SQL-command processing needed by the - * destination. - */ - commandTag = CreateCommandTag(parsetree->stmt); - - set_ps_display(GetCommandTagName(commandTag)); - - BeginCommand(commandTag, dest); - - /* - * If we are in an aborted transaction, reject all commands except - * COMMIT/ABORT. It is important that this test occur before we try - * to do parse analysis, rewrite, or planning, since all those phases - * try to do database accesses, which may fail in abort state. (It - * might be safe to allow some additional utility commands in this - * state, but not many...) 
- */ - if (IsAbortedTransactionBlockState() && - !IsTransactionExitStmt(parsetree->stmt)) - ereport(ERROR, - (errcode(ERRCODE_IN_FAILED_SQL_TRANSACTION), - errmsg("current transaction is aborted, " - "commands ignored until end of transaction block"), - errdetail_abort())); - - /* Make sure we are in a transaction command */ - start_xact_command(); - - /* - * If using an implicit transaction block, and we're not already in a - * transaction block, start an implicit block to force this statement - * to be grouped together with any following ones. (We must do this - * each time through the loop; otherwise, a COMMIT/ROLLBACK in the - * list would cause later statements to not be grouped.) - */ - if (use_implicit_block) - BeginImplicitTransactionBlock(); - - /* If we got a cancel signal in parsing or prior command, quit */ - CHECK_FOR_INTERRUPTS(); - - /* - * Set up a snapshot if parse analysis/planning will need one. - */ - if (analyze_requires_snapshot(parsetree)) - { - PushActiveSnapshot(GetTransactionSnapshot()); - snapshot_set = true; - } - - /* - * OK to analyze, rewrite, and plan this query. - * - * Switch to appropriate context for constructing query and plan trees - * (these can't be in the transaction context, as that will get reset - * when the command is COMMIT/ROLLBACK). If we have multiple - * parsetrees, we use a separate context for each one, so that we can - * free that memory before moving on to the next one. But for the - * last (or only) parsetree, just use MessageContext, which will be - * reset shortly after completion anyway. In event of an error, the - * per_parsetree_context will be deleted when MessageContext is reset. - */ - if (lnext(parsetree_list, parsetree_item) != NULL) - { - per_parsetree_context = - AllocSetContextCreate(MessageContext, - "per-parsetree message context", - ALLOCSET_DEFAULT_SIZES); - oldcontext = MemoryContextSwitchTo(per_parsetree_context); - } - else - oldcontext = MemoryContextSwitchTo(MessageContext); - - querytree_list = pg_analyze_and_rewrite(parsetree, query_string, - NULL, 0, NULL); - - plantree_list = pg_plan_queries(querytree_list, query_string, - CURSOR_OPT_PARALLEL_OK, NULL); - - /* - * Done with the snapshot used for parsing/planning. - * - * While it looks promising to reuse the same snapshot for query - * execution (at least for simple protocol), unfortunately it causes - * execution to use a snapshot that has been acquired before locking - * any of the tables mentioned in the query. This creates user- - * visible anomalies, so refrain. Refer to - * https://postgr.es/m/flat/5075D8DF.6050500@fuzzy.cz for details. - */ - if (snapshot_set) - PopActiveSnapshot(); - - /* If we got a cancel signal in analysis or planning, quit */ - CHECK_FOR_INTERRUPTS(); - - /* - * Create unnamed portal to run the query or queries in. If there - * already is one, silently drop it. - */ - portal = CreatePortal("", true, true); - /* Don't display the portal in pg_cursors */ - portal->visible = false; - - /* - * We don't have to copy anything into the portal, because everything - * we are passing here is in MessageContext or the - * per_parsetree_context, and so will outlive the portal anyway. - */ - PortalDefineQuery(portal, - NULL, - query_string, - commandTag, - plantree_list, - NULL); - - /* - * Start the portal. No parameters here. - */ - PortalStart(portal, NULL, 0, InvalidSnapshot); - - /* - * Select the appropriate output format: text unless we are doing a - * FETCH from a binary cursor. 
(Pretty grotty to have to do this here - * --- but it avoids grottiness in other places. Ah, the joys of - * backward compatibility...) - */ - format = 0; /* TEXT is default */ - if (IsA(parsetree->stmt, FetchStmt)) - { - FetchStmt *stmt = (FetchStmt *) parsetree->stmt; - - if (!stmt->ismove) - { - Portal fportal = GetPortalByName(stmt->portalname); - - if (PortalIsValid(fportal) && - (fportal->cursorOptions & CURSOR_OPT_BINARY)) - format = 1; /* BINARY */ - } - } - PortalSetResultFormat(portal, 1, &format); - - /* - * Now we can create the destination receiver object. - */ - receiver = CreateDestReceiver(dest); - if (dest == DestRemote) - SetRemoteDestReceiverParams(receiver, portal); - - /* - * Switch back to transaction context for execution. - */ - MemoryContextSwitchTo(oldcontext); - - /* - * Run the portal to completion, and then drop it (and the receiver). - */ - (void) PrecachePortalRun(portal, - FETCH_ALL, - true, /* always top level */ - true, - receiver, - receiver, - &qc); - - receiver->rDestroy(receiver); - - PortalDrop(portal, false); - - if (lnext(parsetree_list, parsetree_item) == NULL) - { - /* - * If this is the last parsetree of the query string, close down - * transaction statement before reporting command-complete. This - * is so that any end-of-transaction errors are reported before - * the command-complete message is issued, to avoid confusing - * clients who will expect either a command-complete message or an - * error, not one and then the other. Also, if we're using an - * implicit transaction block, we must close that out first. - */ - if (use_implicit_block) - EndImplicitTransactionBlock(); - finish_xact_command(); - } - else if (IsA(parsetree->stmt, TransactionStmt)) - { - /* - * If this was a transaction control statement, commit it. We will - * start a new xact command for the next command. - */ - finish_xact_command(); - } - else - { - /* - * We need a CommandCounterIncrement after every query, except - * those that start or end a transaction block. - */ - CommandCounterIncrement(); - - /* - * Disable statement timeout between queries of a multi-query - * string, so that the timeout applies separately to each query. - * (Our next loop iteration will start a fresh timeout.) - */ - disable_statement_timeout(); - } - - /* - * Tell client that we're done with this query. Note we emit exactly - * one EndCommand report for each raw parsetree, thus one for each SQL - * command the client sent, regardless of rewriting. (But a command - * aborted by error will not send an EndCommand report at all.) - */ - EndCommand(&qc, dest, false); - - /* Now we may drop the per-parsetree context, if one was created. */ - if (per_parsetree_context) - MemoryContextDelete(per_parsetree_context); - } /* end loop over parsetrees */ - - /* - * Close down transaction statement, if one is open. (This will only do - * something if the parsetree list was empty; otherwise the last loop - * iteration already did it.) - */ - finish_xact_command(); - - /* - * If there were no parsetrees, return EmptyQueryResponse message. - */ - if (!parsetree_list) - NullCommand(dest); - - /* - * Emit duration logging if appropriate. 
- */ - switch (check_log_duration(msec_str, was_logged)) - { - case 1: - ereport(LOG, - (errmsg("duration: %s ms", msec_str), - errhidestmt(true))); - break; - case 2: - ereport(LOG, - (errmsg("duration: %s ms statement: %s", - msec_str, query_string), - errhidestmt(true), - errdetail_execute(parsetree_list))); - break; - } - - if (save_log_statement_stats) - ShowUsage("QUERY STATISTICS"); - - TRACE_POSTGRESQL_QUERY_DONE(query_string); - - debug_query_string = NULL; -} - /* * exec_parse_message * @@ -4861,20 +4494,15 @@ PostgresMain(int argc, char *argv[], bool PrivateConn, } else { - if (strstr(query_string, "precache ") == NULL) + if (strstr(query_string, "precache ") != NULL && query_string - strstr(query_string, "precache ") == 0) { - exec_simple_query(query_string); + isPreCache = true; + exec_simple_query(query_string + strlen("precache ")); + isPreCache = false; } else { - if (query_string - strstr(query_string, "precache ") == 0) - { - precache_exec_simple_query(query_string + strlen("precache ")); - } - else - { - exec_simple_query(query_string); - } + exec_simple_query(query_string); } } diff --git a/src/backend/tcop/pquery.c b/src/backend/tcop/pquery.c index cd4713f..61e1892 100644 --- a/src/backend/tcop/pquery.c +++ b/src/backend/tcop/pquery.c @@ -46,8 +46,6 @@ static uint64 RunFromStore(Portal portal, ScanDirection direction, uint64 count, DestReceiver *dest); static uint64 PortalRunSelect(Portal portal, bool forward, long count, DestReceiver *dest); -static uint64 PrecachePortalRunSelect(Portal portal, bool forward, long count, - DestReceiver *dest); static void PortalRunUtility(Portal portal, PlannedStmt *pstmt, bool isTopLevel, bool setHoldSnapshot, DestReceiver *dest, QueryCompletion *qc); @@ -844,175 +842,6 @@ PortalRun(Portal portal, long count, bool isTopLevel, bool run_once, return result; } -/* - * PrecachePortalRun - * for Hot data precache scenario. - */ -bool -PrecachePortalRun(Portal portal, long count, bool isTopLevel, bool run_once, - DestReceiver *dest, DestReceiver *altdest, - QueryCompletion *qc) -{ - bool result; - uint64 nprocessed; - ResourceOwner saveTopTransactionResourceOwner; - MemoryContext saveTopTransactionContext; - Portal saveActivePortal; - ResourceOwner saveResourceOwner; - MemoryContext savePortalContext; - MemoryContext saveMemoryContext; - - AssertArg(PortalIsValid(portal)); - - TRACE_POSTGRESQL_QUERY_EXECUTE_START(); - - /* Initialize empty completion data */ - if (qc) - InitializeQueryCompletion(qc); - - if (log_executor_stats && portal->strategy != PORTAL_MULTI_QUERY) - { - elog(DEBUG3, "PortalRun"); - /* PORTAL_MULTI_QUERY logs its own stats per query */ - ResetUsage(); - } - - /* - * Check for improper portal use, and mark portal active. - */ - MarkPortalActive(portal); - - /* Set run_once flag. Shouldn't be clear if previously set. */ - Assert(!portal->run_once || run_once); - portal->run_once = run_once; - - /* - * Set up global portal context pointers. - * - * We have to play a special game here to support utility commands like - * VACUUM and CLUSTER, which internally start and commit transactions. - * When we are called to execute such a command, CurrentResourceOwner will - * be pointing to the TopTransactionResourceOwner --- which will be - * destroyed and replaced in the course of the internal commit and - * restart. So we need to be prepared to restore it as pointing to the - * exit-time TopTransactionResourceOwner. (Ain't that ugly? This idea of - * internally starting whole new transactions is not good.) 
- * CurrentMemoryContext has a similar problem, but the other pointers we - * save here will be NULL or pointing to longer-lived objects. - */ - saveTopTransactionResourceOwner = TopTransactionResourceOwner; - saveTopTransactionContext = TopTransactionContext; - saveActivePortal = ActivePortal; - saveResourceOwner = CurrentResourceOwner; - savePortalContext = PortalContext; - saveMemoryContext = CurrentMemoryContext; - PG_TRY(); - { - ActivePortal = portal; - if (portal->resowner) - CurrentResourceOwner = portal->resowner; - PortalContext = portal->portalContext; - - MemoryContextSwitchTo(PortalContext); - - switch (portal->strategy) - { - case PORTAL_ONE_SELECT: - case PORTAL_ONE_RETURNING: - case PORTAL_ONE_MOD_WITH: - case PORTAL_UTIL_SELECT: - - /* - * If we have not yet run the command, do so, storing its - * results in the portal's tuplestore. But we don't do that - * for the PORTAL_ONE_SELECT case. - */ - if (portal->strategy != PORTAL_ONE_SELECT && !portal->holdStore) - FillPortalStore(portal, isTopLevel); - - /* - * Now fetch desired portion of results. - */ - nprocessed = PrecachePortalRunSelect(portal, true, count, dest); - - /* - * If the portal result contains a command tag and the caller - * gave us a pointer to store it, copy it and update the - * rowcount. - */ - if (qc && portal->qc.commandTag != CMDTAG_UNKNOWN) - { - CopyQueryCompletion(qc, &portal->qc); - qc->nprocessed = nprocessed; - } - - /* Mark portal not active */ - portal->status = PORTAL_READY; - - /* - * Since it's a forward fetch, say DONE iff atEnd is now true. - */ - result = portal->atEnd; - break; - - case PORTAL_MULTI_QUERY: - PortalRunMulti(portal, isTopLevel, false, - dest, altdest, qc); - - /* Prevent portal's commands from being re-executed */ - MarkPortalDone(portal); - - /* Always complete at end of RunMulti */ - result = true; - break; - - default: - elog(ERROR, "unrecognized portal strategy: %d", - (int) portal->strategy); - result = false; /* keep compiler quiet */ - break; - } - } - PG_CATCH(); - { - /* Uncaught error while executing portal: mark it dead */ - MarkPortalFailed(portal); - - /* Restore global vars and propagate error */ - if (saveMemoryContext == saveTopTransactionContext) - MemoryContextSwitchTo(TopTransactionContext); - else - MemoryContextSwitchTo(saveMemoryContext); - ActivePortal = saveActivePortal; - if (saveResourceOwner == saveTopTransactionResourceOwner) - CurrentResourceOwner = TopTransactionResourceOwner; - else - CurrentResourceOwner = saveResourceOwner; - PortalContext = savePortalContext; - - PG_RE_THROW(); - } - PG_END_TRY(); - - if (saveMemoryContext == saveTopTransactionContext) - MemoryContextSwitchTo(TopTransactionContext); - else - MemoryContextSwitchTo(saveMemoryContext); - ActivePortal = saveActivePortal; - if (saveResourceOwner == saveTopTransactionResourceOwner) - CurrentResourceOwner = TopTransactionResourceOwner; - else - CurrentResourceOwner = saveResourceOwner; - PortalContext = savePortalContext; - - if (log_executor_stats && portal->strategy != PORTAL_MULTI_QUERY) - ShowUsage("EXECUTOR STATISTICS"); - - TRACE_POSTGRESQL_QUERY_EXECUTE_DONE(); - - return result; -} - /* * PortalRunSelect * Execute a portal's query in PORTAL_ONE_SELECT mode, and also @@ -1157,136 +986,6 @@ PortalRunSelect(Portal portal, return nprocessed; } -/* - * PrecachePortalRunSelect - * for Hot data precache scenario. 
- */ -static uint64 -PrecachePortalRunSelect(Portal portal, - bool forward, - long count, - DestReceiver *dest) -{ - QueryDesc *queryDesc; - ScanDirection direction; - uint64 nprocessed; - - /* - * NB: queryDesc will be NULL if we are fetching from a held cursor or a - * completed utility query; can't use it in that path. - */ - queryDesc = portal->queryDesc; - - /* Caller messed up if we have neither a ready query nor held data. */ - Assert(queryDesc || portal->holdStore); - - /* - * Force the queryDesc destination to the right thing. This supports - * MOVE, for example, which will pass in dest = DestNone. This is okay to - * change as long as we do it on every fetch. (The Executor must not - * assume that dest never changes.) - */ - if (queryDesc) - queryDesc->dest = dest; - - /* - * Determine which direction to go in, and check to see if we're already - * at the end of the available tuples in that direction. If so, set the - * direction to NoMovement to avoid trying to fetch any tuples. (This - * check exists because not all plan node types are robust about being - * called again if they've already returned NULL once.) Then call the - * executor (we must not skip this, because the destination needs to see a - * setup and shutdown even if no tuples are available). Finally, update - * the portal position state depending on the number of tuples that were - * retrieved. - */ - if (forward) - { - if (portal->atEnd || count <= 0) - { - direction = NoMovementScanDirection; - count = 0; /* don't pass negative count to executor */ - } - else - direction = ForwardScanDirection; - - /* In the executor, zero count processes all rows */ - if (count == FETCH_ALL) - count = 0; - - if (portal->holdStore) - nprocessed = RunFromStore(portal, direction, (uint64) count, dest); - else - { - PushActiveSnapshot(queryDesc->snapshot); - PrecacheExecutorRun(queryDesc, direction, (uint64) count, - portal->run_once); - nprocessed = queryDesc->estate->es_processed; - PopActiveSnapshot(); - } - - if (!ScanDirectionIsNoMovement(direction)) - { - if (nprocessed > 0) - portal->atStart = false; /* OK to go backward now */ - if (count == 0 || nprocessed < (uint64) count) - portal->atEnd = true; /* we retrieved 'em all */ - portal->portalPos += nprocessed; - } - } - else - { - if (portal->cursorOptions & CURSOR_OPT_NO_SCROLL) - ereport(ERROR, - (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), - errmsg("cursor can only scan forward"), - errhint("Declare it with SCROLL option to enable backward scan."))); - - if (portal->atStart || count <= 0) - { - direction = NoMovementScanDirection; - count = 0; /* don't pass negative count to executor */ - } - else - direction = BackwardScanDirection; - - /* In the executor, zero count processes all rows */ - if (count == FETCH_ALL) - count = 0; - - if (portal->holdStore) - nprocessed = RunFromStore(portal, direction, (uint64) count, dest); - else - { - PushActiveSnapshot(queryDesc->snapshot); - PrecacheExecutorRun(queryDesc, direction, (uint64) count, - portal->run_once); - nprocessed = queryDesc->estate->es_processed; - PopActiveSnapshot(); - } - - if (!ScanDirectionIsNoMovement(direction)) - { - if (nprocessed > 0 && portal->atEnd) - { - portal->atEnd = false; /* OK to go forward now */ - portal->portalPos++; /* adjust for endpoint case */ - } - if (count == 0 || nprocessed < (uint64) count) - { - portal->atStart = true; /* we retrieved 'em all */ - portal->portalPos = 0; - } - else - { - portal->portalPos -= nprocessed; - } - } - } - - return nprocessed; -} - /* * FillPortalStore 
* Run the query and load result tuples into the portal's tuple store. diff --git a/src/include/executor/executor.h b/src/include/executor/executor.h index 9156d70..3dc03c9 100644 --- a/src/include/executor/executor.h +++ b/src/include/executor/executor.h @@ -189,12 +189,8 @@ extern void ExecutorStart(QueryDesc *queryDesc, int eflags); extern void standard_ExecutorStart(QueryDesc *queryDesc, int eflags); extern void ExecutorRun(QueryDesc *queryDesc, ScanDirection direction, uint64 count, bool execute_once); -extern void PrecacheExecutorRun(QueryDesc *queryDesc, - ScanDirection direction, uint64 count, bool execute_once); extern void standard_ExecutorRun(QueryDesc *queryDesc, ScanDirection direction, uint64 count, bool execute_once); -extern void precache_Standard_ExecutorRun(QueryDesc *queryDesc, - ScanDirection direction, uint64 count, bool execute_once); extern void ExecutorFinish(QueryDesc *queryDesc); extern void standard_ExecutorFinish(QueryDesc *queryDesc); extern void ExecutorEnd(QueryDesc *queryDesc); diff --git a/src/include/storage/bufmgr.h b/src/include/storage/bufmgr.h index f048293..072654d 100644 --- a/src/include/storage/bufmgr.h +++ b/src/include/storage/bufmgr.h @@ -79,6 +79,8 @@ extern int bgwriter_flush_after; extern bool bulk_io_is_in_progress; extern int bulk_io_in_progress_count; +extern bool isPreCache; + /* in buf_init.c */ extern PGDLLIMPORT char *BufferBlocks; -- Gitee
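
Two short sketches follow. They are reviewer illustrations only, not part of the patch series; the connection parameters, table name, helper name and buffer sizes they use are placeholders chosen to mirror the patches.

Sketch 1 - driving the new precache path. As patch 4 reads, a simple-query message whose text begins with "precache " has the prefix stripped in PostgresMain(), isPreCache is set for the duration of exec_simple_query(), and ExecutePlan() then runs the plan without junk filtering and without shipping tuples to the destination, so in effect the statement only pulls the scanned blocks into shared buffers. A minimal libpq client exercising this (host, port and hot_table are placeholders) might look roughly like:

    #include <stdio.h>
    #include "libpq-fe.h"

    int
    main(void)
    {
        PGconn     *conn = PQconnectdb("host=127.0.0.1 port=5432 "
                                       "user=postgres dbname=postgres");
        PGresult   *res;

        if (PQstatus(conn) != CONNECTION_OK)
        {
            fprintf(stderr, "connection failed: %s", PQerrorMessage(conn));
            PQfinish(conn);
            return 1;
        }

        /* The "precache " prefix is stripped in PostgresMain() before parsing. */
        res = PQexec(conn, "precache SELECT * FROM hot_table");

        /*
         * As I read the patch, a precached SELECT still returns a row
         * description and a completion tag but no data rows, so expect
         * PGRES_TUPLES_OK with zero tuples.
         */
        if (PQresultStatus(res) != PGRES_TUPLES_OK &&
            PQresultStatus(res) != PGRES_COMMAND_OK)
            fprintf(stderr, "precache failed: %s", PQerrorMessage(conn));

        PQclear(res);
        PQfinish(conn);
        return 0;
    }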
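
Sketch 2 - conninfo parsing. PrecacheHotData() in patch 1 extracts application_name, host, user, password and port from PrimaryConnInfo with strstr()/strcpy() into fixed-size buffers. Since the file already links against libpq for PQconnectdb(), an alternative is to let libpq parse the string with PQconninfoParse(), which handles keyword order, quoting and unknown options. This is only a sketch: parse_primary_conninfo() does not exist in the patch, the buffer sizes (16, NAMEDATALEN, 8) simply mirror the patch's local arrays, and the password/dbname keywords would be handled the same way.

    #include "postgres.h"
    #include "libpq-fe.h"

    static void
    parse_primary_conninfo(const char *conninfo,
                           char *host, char *user, char *port, char *appname)
    {
        char       *errmsg = NULL;
        PQconninfoOption *opts;
        PQconninfoOption *o;

        /* Defaults mirror the fallbacks used in PrecacheHotData(). */
        strlcpy(host, "127.0.0.1", 16);
        strlcpy(user, "postgres", NAMEDATALEN);
        strlcpy(appname, "master", NAMEDATALEN);
        port[0] = '\0';

        opts = PQconninfoParse(conninfo, &errmsg);
        if (opts == NULL)
        {
            if (errmsg)
                PQfreemem(errmsg);
            return;             /* keep the defaults */
        }

        for (o = opts; o->keyword != NULL; o++)
        {
            if (o->val == NULL || o->val[0] == '\0')
                continue;
            if (strcmp(o->keyword, "host") == 0)
                strlcpy(host, o->val, 16);
            else if (strcmp(o->keyword, "user") == 0)
                strlcpy(user, o->val, NAMEDATALEN);
            else if (strcmp(o->keyword, "port") == 0)
                strlcpy(port, o->val, 8);
            else if (strcmp(o->keyword, "application_name") == 0)
                strlcpy(appname, o->val, NAMEDATALEN);
        }
        PQconninfoFree(opts);
    }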