Flow
• session이 identity map을 관리하는 로직만 살펴보자
API Handler
Result.one 호출
@app.get("/{name}")
def get_user(name: str, db: Session = Depends(get_db)):
    # Demo handler for tracing the identity map: two SELECTs are issued on
    # the same session so that the second one runs after the session has
    # already loaded a User object.

    # Warm-up load with a fixed name; its result is intentionally unused.
    warmup_query = select(User).where(User.name == 'yang')
    warmup_user = db.scalars(warmup_query).one()

    # (business logic would go here)

    # The lookup under study: Result.one() on the user requested in the path.
    requested_user = db.scalars(select(User).where(User.name == name)).one()
    return requested_user
Python
복사
Session.scalars
# sqlalchemy/orm/session.py
class Session(...):
...
def scalars(self, statement: Executable, ...) -> ScalarResult[Any]:
return self._execute_internal(
statement,
...
).scalars()
Python
복사
# sqlalchemy/orm/session.py
class Session(...):
...
def _execute_internal(self, statement, ...):
...
if (
statement._propagate_attrs.get("compile_state_plugin", None)
== "orm"
):
compile_state_cls = CompileState._get_plugin_class_for_plugin(
statement, "orm"
)
...
if compile_state_cls is not None:
# now run orm_pre_session_exec() "for real". if there were
# event hooks, this will re-run the steps that interpret
# new execution_options into load_options / update_delete_options,
# which we assume the event hook might have updated.
# autoflush will also be invoked in this step if enabled.
(
statement,
execution_options,
) = compile_state_cls.orm_pre_session_exec(
self,
statement,
params,
execution_options,
bind_arguments,
False,
)
...
Python
복사
# sqlalchemy/orm/context.py
class ORMCompileState(AbstractORMCompileState):
...
@classmethod
def orm_pre_session_exec(cls, session, statement, ...):
# consume result-level load_options. These may have been set up
# in an ORMExecuteState hook
(
load_options,
execution_options,
) = QueryContext.default_load_options.from_execution_options(
"_sa_orm_load_options",
{
"populate_existing",
"autoflush",
"yield_per",
"identity_token",
"sa_top_level_orm_context",
},
execution_options,
statement._execution_options,
)
...
return statement, execution_options
Python
복사
# sqlalchemy/orm/session.py
class Session(...):
...
def _execute_internal(self, statement, ...):
...
if compile_state_cls is not None:
(
statement,
execution_options,
) = compile_state_cls.orm_pre_session_exec(
self,
statement,
params,
execution_options,
bind_arguments,
False,
)
...
# important: the connection obtained here (conn) is what gets passed to orm_execute_statement() below
bind = self.get_bind(**bind_arguments) # Engine(postgresql://postgres:***@127.0.0.1/postgres)
conn = self._connection_for_bind(bind)
...
if compile_state_cls:
result: Result[Any] = compile_state_cls.orm_execute_statement(
self,
statement,
params or {},
execution_options,
bind_arguments,
conn,
)
...
return result
Python
복사
# sqlalchemy/orm/context.py
class AbstractORMCompileState(CompileState):
...
def orm_execute_statement(
cls,
session,
statement,
params,
execution_options,
bind_arguments,
conn,
) -> Result:
result = conn.execute(
statement, params or {}, execution_options=execution_options
)
return cls.orm_setup_cursor_result(
session,
statement,
params,
execution_options,
bind_arguments,
result,
)
Python
복사
# sqlalchemy/orm/context.py
class ORMCompileState(AbstractORMCompileState):
...
@classmethod
def orm_setup_cursor_result(
cls,
session,
statement,
params,
execution_options,
bind_arguments,
result,
):
execution_context = result.context
compile_state = execution_context.compiled.compile_state
# cover edge case where ORM entities used in legacy select
# were passed to session.execute:
# session.execute(legacy_select([User.id, User.name]))
# see test_query->test_legacy_tuple_old_select
load_options = execution_options.get(
"_sa_orm_load_options", QueryContext.default_load_options
)
...
querycontext = QueryContext(
compile_state,
statement,
params,
session,
load_options,
execution_options,
bind_arguments,
)
return loading.instances(result, querycontext)
Python
복사
# sqlalchemy/orm/loading.py
def instances(cursor: CursorResult[Any], context: QueryContext) -> Result[Any]:
"""Return a :class:`.Result` given an ORM query context.
:param cursor: a :class:`.CursorResult`, generated by a statement
which came from :class:`.ORMCompileState`
:param context: a :class:`.QueryContext` object
:return: a :class:`.Result` object representing ORM results
.. versionchanged:: 1.4 The instances() function now uses
:class:`.Result` objects and has an all new interface.
"""
context.runid = _new_runid()
if context.top_level_context:
is_top_level = False
context.post_load_paths = context.top_level_context.post_load_paths
else:
is_top_level = True
context.post_load_paths = {}
compile_state = context.compile_state
filtered = compile_state._has_mapper_entities
single_entity = (
not context.load_options._only_return_tuples
and len(compile_state._entities) == 1
and compile_state._entities[0].supports_single_entity
)
try:
(process, labels, extra) = list(
zip(
*[
query_entity.row_processor(context, cursor)
for query_entity in context.compile_state._entities
]
)
)
...
...
Python
복사
# sqlalchemy/orm/context.py
class _MapperEntity(_QueryEntity):
...
def row_processor(self, context, result):
compile_state = context.compile_state
adapter = self._get_entity_clauses(compile_state)
...
_instance = loading._instance_processor(
self,
self.mapper,
context,
result,
self.path,
adapter,
...
)
return _instance, self._label_name, self._extra_entities
Python
복사
# sqlalchemy/orm/loading.py
def _instance_processor(
query_entity,
mapper,
context,
result,
path,
adapter,
...
):
"""Produce a mapper level row processor callable
which processes rows into mapped instances."""
# note that this method, most of which exists in a closure
# called _instance(), resists being broken out, as
# attempts to do so tend to add significant function
# call overhead. _instance() is the most
# performance-critical section in the whole ORM.
identity_class = mapper._identity_class
compile_state = context.compile_state
# look for "row getter" functions that have been assigned along
# with the compile state that were cached from a previous load.
# these are operator.itemgetter() objects that each will extract a
# particular column from each row.
getter_key = ("getters", mapper) # ('getters', Mapper[User(users)])
getters = path.get(compile_state.attributes, getter_key, None) # None at the first application run, second -> {'cached_populators': {'new': [], 'quick': [('id', operator.itemgetter(0)), ('name', operator.itemgetter(1)), ('email', operator.itemgetter(2))], 'deferred': [], 'expire': [], 'existing': [], 'eager': []}, 'todo': [], 'primary_key_getter': operator.itemgetter(slice(0, 1, None))}
if getters is None:
# no getters, so go through a list of attributes we are loading for,
# and the ones that are column based will have already put information
# for us in another collection "memoized_setups", which represents the
# output of the LoaderStrategy.setup_query() method. We can just as
# easily call LoaderStrategy.create_row_processor for each, but by
# getting it all at once from setup_query we save another method call
# per attribute.
props = mapper._prop_set # frozenset({User.id, User.name, User.email})
if only_load_props is not None:
props = props.intersection(
mapper._props[k] for k in only_load_props
)
quick_populators = path.get(
context.attributes, "memoized_setups", EMPTY_DICT
)
todo = []
cached_populators = {
"new": [],
"quick": [],
"deferred": [],
"expire": [],
"existing": [],
"eager": [],
}
if refresh_state is None:
# we can also get the "primary key" tuple getter function
pk_cols = mapper.primary_key
if adapter:
pk_cols = [adapter.columns[c] for c in pk_cols]
primary_key_getter = result._tuple_getter(pk_cols)
else:
primary_key_getter = None
getters = {
"cached_populators": cached_populators,
"todo": todo,
"primary_key_getter": primary_key_getter,
}
for prop in props: # prop: User.id, User.name, User.email
if prop in quick_populators:
# this is an inlined path just for column-based attributes.
col = quick_populators[prop] # users.id
if col is _DEFER_FOR_STATE:
cached_populators["new"].append(
(prop.key, prop._deferred_column_loader)
)
elif col is _SET_DEFERRED_EXPIRED:
# note that in this path, we are no longer
# searching in the result to see if the column might
# be present in some unexpected way.
cached_populators["expire"].append((prop.key, False))
elif col is _RAISE_FOR_STATE:
cached_populators["new"].append(
(prop.key, prop._raise_column_loader)
)
else:
getter = None
if adapter: # see here
# this logic had been removed for all 1.4 releases
# up until 1.4.18; the adapter here is particularly
# the compound eager adapter which isn't accommodated
# in the quick_populators right now. The "fallback"
# logic below instead took over in many more cases
# until issue #6596 was identified.
# note there is still an issue where this codepath
# produces no "getter" for cases where a joined-inh
# mapping includes a labeled column property, meaning
# KeyError is caught internally and we fall back to
# _getter(col), which works anyway. The adapter
# here for joined inh without any aliasing might not
# be useful. Tests which see this include
# test.orm.inheritance.test_basic ->
# EagerTargetingTest.test_adapt_stringency
# OptimizedLoadTest.test_column_expression_joined
# PolymorphicOnNotLocalTest.test_polymorphic_on_column_prop # noqa: E501
#
adapted_col = adapter.columns[col]
if adapted_col is not None:
getter = result._getter(adapted_col, False)
if not getter: # getter is set here
getter = result._getter(col, False)
if getter:
cached_populators["quick"].append((prop.key, getter))
else:
# fall back to the ColumnProperty itself, which
# will iterate through all of its columns
# to see if one fits
prop.create_row_processor(
context,
query_entity,
path,
mapper,
result,
adapter,
cached_populators,
)
else:
# loader strategies like subqueryload, selectinload,
# joinedload, basically relationships, these need to interact
# with the context each time to work correctly.
todo.append(prop)
path.set(compile_state.attributes, getter_key, getters)
cached_populators = getters["cached_populators"]
populators = {key: list(value) for key, value in cached_populators.items()}
for prop in getters["todo"]: # pass
prop.create_row_processor(
context, query_entity, path, mapper, result, adapter, populators
)
propagated_loader_options = context.propagated_loader_options
load_path = ( # ORM Path[Mapper[User(users)]]
context.compile_state.current_path + path
if context.compile_state.current_path.path
else path
)
# identity map here
session_identity_map = context.session.identity_map # <sqlalchemy.orm.identity.WeakInstanceDict object at 0x127ca7250>
populate_existing = context.populate_existing or mapper.always_refresh
load_evt = bool(mapper.class_manager.dispatch.load)
refresh_evt = bool(mapper.class_manager.dispatch.refresh)
persistent_evt = bool(context.session.dispatch.loaded_as_persistent)
if persistent_evt:
loaded_as_persistent = context.session.dispatch.loaded_as_persistent
instance_state = attributes.instance_state
instance_dict = attributes.instance_dict
session_id = context.session.hash_key
runid = context.runid
identity_token = context.identity_token
...
post_load = PostLoad.for_context(context, load_path, only_load_props)
if refresh_state:
refresh_identity_key = refresh_state.key
if refresh_identity_key is None:
# super-rare condition; a refresh is being called
# on a non-instance-key instance; this is meant to only
# occur within a flush()
refresh_identity_key = mapper._identity_key_from_state(
refresh_state
)
else:
refresh_identity_key = None
primary_key_getter = getters["primary_key_getter"]
...
def _instance(row):
# determine the state that we'll be populating
...
instance = session_identity_map.get(identitykey)
...
return instance
...
return _instance
Python
복사
# sqlalchemy/orm/context.py
class _MapperEntity(_QueryEntity):
...
def row_processor(self, context, result):
compile_state = context.compile_state
adapter = self._get_entity_clauses(compile_state)
...
_instance = loading._instance_processor(
self,
self.mapper,
context,
result,
self.path,
adapter,
...
)
return _instance, self._label_name, self._extra_entities
Python
복사
# sqlalchemy/orm/loading.py
def instances(cursor: CursorResult[Any], context: QueryContext) -> Result[Any]:
...
try:
# now, process: (_instance function, )
(process, labels, extra) = list(
zip(
*[
query_entity.row_processor(context, cursor)
for query_entity in context.compile_state._entities
]
)
)
if context.yield_per and (
context.loaders_require_buffering
or context.loaders_require_uniquing
):
raise sa_exc.InvalidRequestError(
"Can't use yield_per with eager loaders that require uniquing "
"or row buffering, e.g. joinedload() against collections "
"or subqueryload(). Consider the selectinload() strategy "
"for better flexibility in loading objects."
)
except Exception:
with util.safe_reraise():
cursor.close()
...
def chunks(size): # type: ignore
...
...
result = ChunkedIteratorResult(
row_metadata,
chunks,
source_supports_scalars=single_entity,
raw=cursor,
dynamic_yield_per=cursor.context._is_server_side,
)
...
return result
Python
복사
# sqlalchemy/orm/context.py
class ORMCompileState(AbstractORMCompileState):
...
@classmethod
def orm_setup_cursor_result(
cls,
session,
statement,
params,
execution_options,
bind_arguments,
result,
):
...
return loading.instances(result, querycontext)
Python
복사
# sqlalchemy/orm/context.py
class AbstractORMCompileState(CompileState):
...
def orm_execute_statement(
cls,
session,
statement,
params,
execution_options,
bind_arguments,
conn,
) -> Result:
result = conn.execute(
statement, params or {}, execution_options=execution_options
)
return cls.orm_setup_cursor_result(
session,
statement,
params,
execution_options,
bind_arguments,
result,
)
Python
복사
# sqlalchemy/orm/session.py
class Session(...):
...
def _execute_internal(self, statement, ...):
...
if compile_state_cls:
result: Result[Any] = compile_state_cls.orm_execute_statement(
self,
statement,
params or {},
execution_options,
bind_arguments,
conn,
)
...
"""
result = ChunkedIteratorResult(
row_metadata,
chunks,
source_supports_scalars=single_entity,
raw=cursor,
dynamic_yield_per=cursor.context._is_server_side,
)
from loading.instances
"""
return result
Python
복사
One
# sqlalchemy/engine/result.py
class ScalarResult(FilterResult[_R]):
...
def one(self) -> _R:
"""Return exactly one object or raise an exception.
Equivalent to :meth:`_engine.Result.one` except that
scalar values, rather than :class:`_engine.Row` objects,
are returned.
"""
return self._only_one_row(
raise_for_second_row=True, raise_for_none=True, scalar=False
)
Python
복사
# sqlalchemy/engine/result.py
class ResultInternal(InPlaceGenerative, Generic[_R]):
...
def _only_one_row(
self,
raise_for_second_row: bool,
raise_for_none: bool,
scalar: bool,
) -> Optional[_R]:
onerow = self._fetchone_impl # <bound method FilterResult._fetchone_impl of <sqlalchemy.engine.result.ScalarResult object at 0x14d30e570>>
row: Optional[_InterimRowType[Any]] = onerow(hard_close=True)
if row is None:
if raise_for_none:
raise exc.NoResultFound(
"No row was found when one was required"
)
else:
return None
if scalar and self._source_supports_scalars:
self._generate_rows = False
make_row = None
else:
make_row = self._row_getter
try:
row = make_row(row) if make_row else row
except:
self._soft_close(hard=True)
raise
if raise_for_second_row:
if self._unique_filter_state:
# for no second row but uniqueness, need to essentially
# consume the entire result :(
uniques, strategy = self._unique_strategy
existing_row_hash = strategy(row) if strategy else row
while True:
next_row: Any = onerow(hard_close=True)
if next_row is None:
next_row = _NO_ROW
break
try:
next_row = make_row(next_row) if make_row else next_row
if strategy:
assert next_row is not _NO_ROW
if existing_row_hash == strategy(next_row):
continue
elif row == next_row:
continue
# here, we have a row and it's different
break
except:
self._soft_close(hard=True)
raise
else:
next_row = onerow(hard_close=True)
if next_row is None:
next_row = _NO_ROW
if next_row is not _NO_ROW:
self._soft_close(hard=True)
raise exc.MultipleResultsFound(
"Multiple rows were found when exactly one was required"
if raise_for_none
else "Multiple rows were found when one or none "
"was required"
)
else:
next_row = _NO_ROW
# if we checked for second row then that would have
# closed us :)
self._soft_close(hard=True)
if not scalar:
post_creational_filter = self._post_creational_filter
if post_creational_filter:
row = post_creational_filter(row)
if scalar and make_row:
return row[0] # type: ignore
else:
return row # type: ignore
Python
복사
# sqlalchemy/engine/result.py
class FilterResult(ResultInternal[_R]):
...
def _fetchone_impl(
self, hard_close: bool = False
) -> Optional[_InterimRowType[Row[Any]]]:
return self._real_result._fetchone_impl(hard_close=hard_close)
Python
복사
# sqlalchemy/engine/result.py
class IteratorResult(Result[_TP]):
"""A :class:`_engine.Result` that gets data from a Python iterator of
:class:`_engine.Row` objects or similar row-like data.
.. versionadded:: 1.4
"""
...
def _fetchone_impl(
self, hard_close: bool = False
) -> Optional[_InterimRowType[Row[Any]]]:
if self._hard_closed:
self._raise_hard_closed()
row = next(self.iterator, _NO_ROW)
if row is _NO_ROW:
self._soft_close(hard=hard_close)
return None
else:
return row
Python
복사
# sqlalchemy/orm/loading.py
# next(self.iterator) leads into the "chunks" generator defined inside instances()
def instances(cursor: CursorResult[Any], context: QueryContext) -> Result[Any]:
"""Return a :class:`.Result` given an ORM query context.
:param cursor: a :class:`.CursorResult`, generated by a statement
which came from :class:`.ORMCompileState`
:param context: a :class:`.QueryContext` object
:return: a :class:`.Result` object representing ORM results
.. versionchanged:: 1.4 The instances() function now uses
:class:`.Result` objects and has an all new interface.
"""
...
def chunks(size): # type: ignore # size: None
while True:
yield_per = size
context.partials = {}
if yield_per:
fetch = cursor.fetchmany(yield_per)
if not fetch:
break
else:
fetch = cursor._raw_all_rows() # we get pk here!!
...
Python
복사
# sqlalchemy/engine/result.py
class ResultInternal(InPlaceGenerative, Generic[_R]):
...
def _raw_all_rows(self) -> List[_R]:
make_row = self._row_getter
assert make_row is not None
rows = self._fetchall_impl()
return [make_row(row) for row in rows]
Python
복사
# sqlalchemy/engine/cursor.py
class CursorResult(Result[_T]):
"""A Result that is representing state from a DBAPI cursor.
.. versionchanged:: 1.4 The :class:`.CursorResult`
class replaces the previous :class:`.ResultProxy` interface.
These classes are based on the :class:`.Result` calling API
which provides an updated usage model and calling facade for
SQLAlchemy Core and SQLAlchemy ORM.
Returns database rows via the :class:`.Row` class, which provides
additional API features and behaviors on top of the raw data returned by
the DBAPI. Through the use of filters such as the :meth:`.Result.scalars`
method, other kinds of objects may also be returned.
.. seealso::
:ref:`tutorial_selecting_data` - introductory material for accessing
:class:`_engine.CursorResult` and :class:`.Row` objects.
"""
...
def _fetchall_impl(self):
return self.cursor_strategy.fetchall(self, self.cursor)
Python
복사
# sqlalchemy/engine/cursor.py
class CursorFetchStrategy(ResultFetchStrategy):
"""Call fetch methods from a DBAPI cursor.
Alternate versions of this class may instead buffer the rows from
cursors or not use cursors at all.
"""
...
def fetchall(
self,
result: CursorResult[Any],
dbapi_cursor: DBAPICursor,
) -> Any:
try:
# the rows are actually fetched from the DBAPI cursor here
rows = dbapi_cursor.fetchall() # [(2, 'yang', 'yang@gmail.com0')]
result._soft_close()
return rows
except BaseException as e:
self.handle_exception(result, dbapi_cursor, e)
Python
복사
# from here, go back up the call chain (fetchall -> _fetchall_impl -> _raw_all_rows -> chunks) and continue below
Python
복사
After fetching rows
# sqlalchemy/orm/loading.py
# next(self.iterator) leads into the "chunks" generator defined inside instances()
def instances(cursor: CursorResult[Any], context: QueryContext) -> Result[Any]:
"""Return a :class:`.Result` given an ORM query context.
:param cursor: a :class:`.CursorResult`, generated by a statement
which came from :class:`.ORMCompileState`
:param context: a :class:`.QueryContext` object
:return: a :class:`.Result` object representing ORM results
.. versionchanged:: 1.4 The instances() function now uses
:class:`.Result` objects and has an all new interface.
"""
...
def chunks(size): # type: ignore # size: None
while True:
yield_per = size
context.partials = {}
if yield_per:
fetch = cursor.fetchmany(yield_per)
if not fetch:
break
else:
fetch = cursor._raw_all_rows() # we get pk here!!
if single_entity:
proc = process[0]
rows = [proc(row) for row in fetch]
else:
# remember, proc is <function _instance_processor.<locals>._instance at 0x14d36f1c0>
rows = [
tuple([proc(row) for proc in process]) for row in fetch
]
...
Python
복사
# sqlalchemy/orm/loading.py
def _instance_processor(
query_entity,
mapper,
context,
result,
path,
...
):
"""Produce a mapper level row processor callable
which processes rows into mapped instances."""
# note that this method, most of which exists in a closure
# called _instance(), resists being broken out, as
# attempts to do so tend to add significant function
# call overhead. _instance() is the most
# performance-critical section in the whole ORM.
identity_class = mapper._identity_class
compile_state = context.compile_state
# look for "row getter" functions that have been assigned along
# with the compile state that were cached from a previous load.
# these are operator.itemgetter() objects that each will extract a
# particular column from each row.
getter_key = ("getters", mapper)
getters = path.get(compile_state.attributes, getter_key, None)
...
session_identity_map = context.session.identity_map
populate_existing = context.populate_existing or mapper.always_refresh
load_evt = bool(mapper.class_manager.dispatch.load)
refresh_evt = bool(mapper.class_manager.dispatch.refresh)
persistent_evt = bool(context.session.dispatch.loaded_as_persistent)
if persistent_evt:
loaded_as_persistent = context.session.dispatch.loaded_as_persistent
instance_state = attributes.instance_state
instance_dict = attributes.instance_dict
session_id = context.session.hash_key
runid = context.runid
identity_token = context.identity_token
...
def _instance(row): # (2, 'yang', 'yang@gmail.com0')
# the pk is already known at this point: it is one of the columns of the fetched row
# determine the state that we'll be populating
if refresh_identity_key: # currently None
# fixed state that we're refreshing
state = refresh_state
instance = state.obj()
dict_ = instance_dict(instance)
isnew = state.runid != runid
currentload = True
loaded_instance = False
else:
# look at the row, see if that identity is in the
# session, or we have to create a new one
identitykey = (
identity_class,
primary_key_getter(row),
identity_token, # None
)
# check identity map first
instance = session_identity_map.get(identitykey)
if instance is not None:
# existing instance
state = instance_state(instance)
dict_ = instance_dict(instance)
isnew = state.runid != runid
currentload = not isnew
loaded_instance = False
if version_check and version_id_getter and not currentload:
_validate_version_id(
mapper, state, dict_, row, version_id_getter
)
else:
# create a new instance
# check for non-NULL values in the primary key columns,
# else no entity is returned for the row
if is_not_primary_key(identitykey[1]): # (2,)
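# Q: how does the session know the pk of the instance, and where is it computed?
# A: it is read from the fetched row by primary_key_getter(row) when identitykey is built above.
# NOTE: breakpoint here; resume tracing from this point next time.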
return None
isnew = True
currentload = True
loaded_instance = True
instance = mapper.class_manager.new_instance()
dict_ = instance_dict(instance)
state = instance_state(instance)
state.key = identitykey
state.identity_token = identity_token
# attach instance to session.
state.session_id = session_id
session_identity_map._add_unpresent(state, identitykey)
effective_populate_existing = populate_existing
if refresh_state is state:
effective_populate_existing = True
# populate. this looks at whether this state is new
# for this load or was existing, and whether or not this
# row is the first row with this identity.
if currentload or effective_populate_existing:
# full population routines. Objects here are either
# just created, or we are doing a populate_existing
# be conservative about setting load_path when populate_existing
# is in effect; want to maintain options from the original
# load. see test_expire->test_refresh_maintains_deferred_options
if isnew and (
propagated_loader_options or not effective_populate_existing
):
state.load_options = propagated_loader_options
state.load_path = load_path
_populate_full(
context,
row,
state,
dict_,
isnew,
load_path,
loaded_instance,
effective_populate_existing,
populators,
)
if isnew:
# state.runid should be equal to context.runid / runid
# here, however for event checks we are being more conservative
# and checking against existing run id
# assert state.runid == runid
existing_runid = state.runid
if loaded_instance:
if load_evt:
state.manager.dispatch.load(state, context)
if state.runid != existing_runid:
_warn_for_runid_changed(state)
if persistent_evt:
loaded_as_persistent(context.session, state)
if state.runid != existing_runid:
_warn_for_runid_changed(state)
elif refresh_evt:
state.manager.dispatch.refresh(
state, context, only_load_props
)
if state.runid != runid:
_warn_for_runid_changed(state)
if effective_populate_existing or state.modified:
if refresh_state and only_load_props:
state._commit(dict_, only_load_props)
else:
state._commit_all(dict_, session_identity_map)
if post_load:
post_load.add_state(state, True)
else:
# partial population routines, for objects that were already
# in the Session, but a row matches them; apply eager loaders
# on existing objects, etc.
unloaded = state.unloaded
isnew = state not in context.partials
if not isnew or unloaded or populators["eager"]:
# state is having a partial set of its attributes
# refreshed. Populate those attributes,
# and add to the "context.partials" collection.
to_load = _populate_partial(
context,
row,
state,
dict_,
isnew,
load_path,
unloaded,
populators,
)
if isnew:
if refresh_evt:
existing_runid = state.runid
state.manager.dispatch.refresh(state, context, to_load)
if state.runid != existing_runid:
_warn_for_runid_changed(state)
state._commit(dict_, to_load)
if post_load and context.invoke_all_eagers:
post_load.add_state(state, False)
return instance
...
return _instance
Python
복사