Search
🎴

SQLAlchemy Identity Map

생성일
2023/12/31 11:36
태그
날짜
3 more properties

Flow

session이 identity map을 관리하는 로직만 살펴보자

API Handler

Result.one 호출

@app.get("/{name}") def get_user(name: str, db: Session = Depends(get_db)): # some calls would cache the objects pre_stmt = select(User).where(User.name == 'yang') pre_db_user = db.scalars(pre_stmt).one() # some business logic stmt = select(User).where(User.name == name) db_user = db.scalars(stmt).one() return db_user
Python
복사

Session.scalars

# sqlalchemy/orm/session.py class Session(...): ... def scalars(self, statement: Executable, ...) -> ScalarResult[Any]: return self._execute_internal( statement, ... ).scalars()
Python
복사
# sqlalchemy/orm/session.py class Session(...): ... def _execute_internal(self, statement, ...): ... if ( statement._propagate_attrs.get("compile_state_plugin", None) == "orm" ): compile_state_cls = CompileState._get_plugin_class_for_plugin( statement, "orm" ) ... if compile_state_cls is not None: # now run orm_pre_session_exec() "for real". if there were # event hooks, this will re-run the steps that interpret # new execution_options into load_options / update_delete_options, # which we assume the event hook might have updated. # autoflush will also be invoked in this step if enabled. ( statement, execution_options, ) = compile_state_cls.orm_pre_session_exec( self, statement, params, execution_options, bind_arguments, False, ) ...
Python
복사
# sqlalchemy/orm/context.py class ORMCompileState(AbstractORMCompileState): ... @classmethod def orm_pre_session_exec(cls, session, statement, ...): # consume result-level load_options. These may have been set up # in an ORMExecuteState hook ( load_options, execution_options, ) = QueryContext.default_load_options.from_execution_options( "_sa_orm_load_options", { "populate_existing", "autoflush", "yield_per", "identity_token", "sa_top_level_orm_context", }, execution_options, statement._execution_options, ) ... return statement, execution_options
Python
복사
# sqlalchemy/orm/session.py class Session(...): ... def _execute_internal(self, statement, ...): ... if compile_state_cls is not None: ( statement, execution_options, ) = compile_state_cls.orm_pre_session_exec( self, statement, params, execution_options, bind_arguments, False, ) ... # important !! later on bind = self.get_bind(**bind_arguments) # Engine(postgresql://postgres:***@127.0.0.1/postgres) conn = self._connection_for_bind(bind) ... if compile_state_cls: result: Result[Any] = compile_state_cls.orm_execute_statement( self, statement, params or {}, execution_options, bind_arguments, conn, ) ... return result
Python
복사
# sqlalchemy/orm/context.py class AbstractORMCompileState(CompileState): ... def orm_execute_statement( cls, session, statement, params, execution_options, bind_arguments, conn, ) -> Result: result = conn.execute( statement, params or {}, execution_options=execution_options ) return cls.orm_setup_cursor_result( session, statement, params, execution_options, bind_arguments, result, )
Python
복사
# sqlalchemy/orm/context.py class ORMCompileState(AbstractORMCompileState): ... @classmethod def orm_setup_cursor_result( cls, session, statement, params, execution_options, bind_arguments, result, ): execution_context = result.context compile_state = execution_context.compiled.compile_state # cover edge case where ORM entities used in legacy select # were passed to session.execute: # session.execute(legacy_select([User.id, User.name])) # see test_query->test_legacy_tuple_old_select load_options = execution_options.get( "_sa_orm_load_options", QueryContext.default_load_options ) ... querycontext = QueryContext( compile_state, statement, params, session, load_options, execution_options, bind_arguments, ) return loading.instances(result, querycontext)
Python
복사
# sqlalchemy/orm/loading.py def instances(cursor: CursorResult[Any], context: QueryContext) -> Result[Any]: """Return a :class:`.Result` given an ORM query context. :param cursor: a :class:`.CursorResult`, generated by a statement which came from :class:`.ORMCompileState` :param context: a :class:`.QueryContext` object :return: a :class:`.Result` object representing ORM results .. versionchanged:: 1.4 The instances() function now uses :class:`.Result` objects and has an all new interface. """ context.runid = _new_runid() if context.top_level_context: is_top_level = False context.post_load_paths = context.top_level_context.post_load_paths else: is_top_level = True context.post_load_paths = {} compile_state = context.compile_state filtered = compile_state._has_mapper_entities single_entity = ( not context.load_options._only_return_tuples and len(compile_state._entities) == 1 and compile_state._entities[0].supports_single_entity ) try: (process, labels, extra) = list( zip( *[ query_entity.row_processor(context, cursor) for query_entity in context.compile_state._entities ] ) ) ... ...
Python
복사
# sqlalchemy/orm/context.py class _MapperEntity(_QueryEntity): ... def row_processor(self, context, result): compile_state = context.compile_state adapter = self._get_entity_clauses(compile_state) ... _instance = loading._instance_processor( self, self.mapper, context, result, self.path, adapter, ... ) return _instance, self._label_name, self._extra_entities
Python
복사
# sqlalchemy/orm/loading.py def _instance_processor( query_entity, mapper, context, result, path, adapter, ... ): """Produce a mapper level row processor callable which processes rows into mapped instances.""" # note that this method, most of which exists in a closure # called _instance(), resists being broken out, as # attempts to do so tend to add significant function # call overhead. _instance() is the most # performance-critical section in the whole ORM. identity_class = mapper._identity_class compile_state = context.compile_state # look for "row getter" functions that have been assigned along # with the compile state that were cached from a previous load. # these are operator.itemgetter() objects that each will extract a # particular column from each row. getter_key = ("getters", mapper) # ('getters', Mapper[User(users)]) getters = path.get(compile_state.attributes, getter_key, None) # None at the first application run, second -> {'cached_populators': {'new': [], 'quick': [('id', operator.itemgetter(0)), ('name', operator.itemgetter(1)), ('email', operator.itemgetter(2))], 'deferred': [], 'expire': [], 'existing': [], 'eager': []}, 'todo': [], 'primary_key_getter': operator.itemgetter(slice(0, 1, None))} if getters is None: # no getters, so go through a list of attributes we are loading for, # and the ones that are column based will have already put information # for us in another collection "memoized_setups", which represents the # output of the LoaderStrategy.setup_query() method. We can just as # easily call LoaderStrategy.create_row_processor for each, but by # getting it all at once from setup_query we save another method call # per attribute. 
props = mapper._prop_set # frozenset({User.id, User.name, User.email}) if only_load_props is not None: props = props.intersection( mapper._props[k] for k in only_load_props ) quick_populators = path.get( context.attributes, "memoized_setups", EMPTY_DICT ) todo = [] cached_populators = { "new": [], "quick": [], "deferred": [], "expire": [], "existing": [], "eager": [], } if refresh_state is None: # we can also get the "primary key" tuple getter function pk_cols = mapper.primary_key if adapter: pk_cols = [adapter.columns[c] for c in pk_cols] primary_key_getter = result._tuple_getter(pk_cols) else: primary_key_getter = None getters = { "cached_populators": cached_populators, "todo": todo, "primary_key_getter": primary_key_getter, } for prop in props: # prop: User.id, User.name, User.email if prop in quick_populators: # this is an inlined path just for column-based attributes. col = quick_populators[prop] # users.id if col is _DEFER_FOR_STATE: cached_populators["new"].append( (prop.key, prop._deferred_column_loader) ) elif col is _SET_DEFERRED_EXPIRED: # note that in this path, we are no longer # searching in the result to see if the column might # be present in some unexpected way. cached_populators["expire"].append((prop.key, False)) elif col is _RAISE_FOR_STATE: cached_populators["new"].append( (prop.key, prop._raise_column_loader) ) else: getter = None if adapter: # see here # this logic had been removed for all 1.4 releases # up until 1.4.18; the adapter here is particularly # the compound eager adapter which isn't accommodated # in the quick_populators right now. The "fallback" # logic below instead took over in many more cases # until issue #6596 was identified. # note there is still an issue where this codepath # produces no "getter" for cases where a joined-inh # mapping includes a labeled column property, meaning # KeyError is caught internally and we fall back to # _getter(col), which works anyway. 
The adapter # here for joined inh without any aliasing might not # be useful. Tests which see this include # test.orm.inheritance.test_basic -> # EagerTargetingTest.test_adapt_stringency # OptimizedLoadTest.test_column_expression_joined # PolymorphicOnNotLocalTest.test_polymorphic_on_column_prop # noqa: E501 # adapted_col = adapter.columns[col] if adapted_col is not None: getter = result._getter(adapted_col, False) if not getter: # getter is set here getter = result._getter(col, False) if getter: cached_populators["quick"].append((prop.key, getter)) else: # fall back to the ColumnProperty itself, which # will iterate through all of its columns # to see if one fits prop.create_row_processor( context, query_entity, path, mapper, result, adapter, cached_populators, ) else: # loader strategies like subqueryload, selectinload, # joinedload, basically relationships, these need to interact # with the context each time to work correctly. todo.append(prop) path.set(compile_state.attributes, getter_key, getters) cached_populators = getters["cached_populators"] populators = {key: list(value) for key, value in cached_populators.items()} for prop in getters["todo"]: # pass prop.create_row_processor( context, query_entity, path, mapper, result, adapter, populators ) propagated_loader_options = context.propagated_loader_options load_path = ( # ORM Path[Mapper[User(users)]] context.compile_state.current_path + path if context.compile_state.current_path.path else path ) # identity map here session_identity_map = context.session.identity_map # <sqlalchemy.orm.identity.WeakInstanceDict object at 0x127ca7250> populate_existing = context.populate_existing or mapper.always_refresh load_evt = bool(mapper.class_manager.dispatch.load) refresh_evt = bool(mapper.class_manager.dispatch.refresh) persistent_evt = bool(context.session.dispatch.loaded_as_persistent) if persistent_evt: loaded_as_persistent = context.session.dispatch.loaded_as_persistent instance_state = attributes.instance_state 
instance_dict = attributes.instance_dict session_id = context.session.hash_key runid = context.runid identity_token = context.identity_token ... post_load = PostLoad.for_context(context, load_path, only_load_props) if refresh_state: refresh_identity_key = refresh_state.key if refresh_identity_key is None: # super-rare condition; a refresh is being called # on a non-instance-key instance; this is meant to only # occur within a flush() refresh_identity_key = mapper._identity_key_from_state( refresh_state ) else: refresh_identity_key = None primary_key_getter = getters["primary_key_getter"] ... def _instance(row): # determine the state that we'll be populating ... instance = session_identity_map.get(identitykey) ... return instance ... return _instance
Python
복사
# sqlalchemy/orm/context.py class _MapperEntity(_QueryEntity): ... def row_processor(self, context, result): compile_state = context.compile_state adapter = self._get_entity_clauses(compile_state) ... _instance = loading._instance_processor( self, self.mapper, context, result, self.path, adapter, ... ) return _instance, self._label_name, self._extra_entities
Python
복사
# sqlalchemy/orm/loading.py def instances(cursor: CursorResult[Any], context: QueryContext) -> Result[Any]: ... try: # now, process: (_instance function, ) (process, labels, extra) = list( zip( *[ query_entity.row_processor(context, cursor) for query_entity in context.compile_state._entities ] ) ) if context.yield_per and ( context.loaders_require_buffering or context.loaders_require_uniquing ): raise sa_exc.InvalidRequestError( "Can't use yield_per with eager loaders that require uniquing " "or row buffering, e.g. joinedload() against collections " "or subqueryload(). Consider the selectinload() strategy " "for better flexibility in loading objects." ) except Exception: with util.safe_reraise(): cursor.close() ... def chunks(size): # type: ignore ... ... result = ChunkedIteratorResult( row_metadata, chunks, source_supports_scalars=single_entity, raw=cursor, dynamic_yield_per=cursor.context._is_server_side, ) ... return result
Python
복사
# sqlalchemy/orm/context.py class ORMCompileState(AbstractORMCompileState): ... @classmethod def orm_setup_cursor_result( cls, session, statement, params, execution_options, bind_arguments, result, ): ... return loading.instances(result, querycontext)
Python
복사
# sqlalchemy/orm/context.py class AbstractORMCompileState(CompileState): ... def orm_execute_statement( cls, session, statement, params, execution_options, bind_arguments, conn, ) -> Result: result = conn.execute( statement, params or {}, execution_options=execution_options ) return cls.orm_setup_cursor_result( session, statement, params, execution_options, bind_arguments, result, )
Python
복사
# sqlalchemy/orm/session.py class Session(...): ... def _execute_internal(self, statement, ...): ... if compile_state_cls: result: Result[Any] = compile_state_cls.orm_execute_statement( self, statement, params or {}, execution_options, bind_arguments, conn, ) ... """ result = ChunkedIteratorResult( row_metadata, chunks, source_supports_scalars=single_entity, raw=cursor, dynamic_yield_per=cursor.context._is_server_side, ) from loading.instances """ return result
Python
복사

One

# sqlalchemy/engine/result.py class ScalarResult(FilterResult[_R]): ... def one(self) -> _R: """Return exactly one object or raise an exception. Equivalent to :meth:`_engine.Result.one` except that scalar values, rather than :class:`_engine.Row` objects, are returned. """ return self._only_one_row( raise_for_second_row=True, raise_for_none=True, scalar=False )
Python
복사
# sqlalchemy/engine/result.py class ResultInternal(InPlaceGenerative, Generic[_R]): ... def _only_one_row( self, raise_for_second_row: bool, raise_for_none: bool, scalar: bool, ) -> Optional[_R]: onerow = self._fetchone_impl # <bound method FilterResult._fetchone_impl of <sqlalchemy.engine.result.ScalarResult object at 0x14d30e570>> row: Optional[_InterimRowType[Any]] = onerow(hard_close=True) if row is None: if raise_for_none: raise exc.NoResultFound( "No row was found when one was required" ) else: return None if scalar and self._source_supports_scalars: self._generate_rows = False make_row = None else: make_row = self._row_getter try: row = make_row(row) if make_row else row except: self._soft_close(hard=True) raise if raise_for_second_row: if self._unique_filter_state: # for no second row but uniqueness, need to essentially # consume the entire result :( uniques, strategy = self._unique_strategy existing_row_hash = strategy(row) if strategy else row while True: next_row: Any = onerow(hard_close=True) if next_row is None: next_row = _NO_ROW break try: next_row = make_row(next_row) if make_row else next_row if strategy: assert next_row is not _NO_ROW if existing_row_hash == strategy(next_row): continue elif row == next_row: continue # here, we have a row and it's different break except: self._soft_close(hard=True) raise else: next_row = onerow(hard_close=True) if next_row is None: next_row = _NO_ROW if next_row is not _NO_ROW: self._soft_close(hard=True) raise exc.MultipleResultsFound( "Multiple rows were found when exactly one was required" if raise_for_none else "Multiple rows were found when one or none " "was required" ) else: next_row = _NO_ROW # if we checked for second row then that would have # closed us :) self._soft_close(hard=True) if not scalar: post_creational_filter = self._post_creational_filter if post_creational_filter: row = post_creational_filter(row) if scalar and make_row: return row[0] # type: ignore else: return row # type: ignore
Python
복사
# sqlalchemy/engine/result.py class FilterResult(ResultInternal[_R]): ... def _fetchone_impl( self, hard_close: bool = False ) -> Optional[_InterimRowType[Row[Any]]]: return self._real_result._fetchone_impl(hard_close=hard_close)
Python
복사
# sqlalchemy/engine/result.py class IteratorResult(Result[_TP]): """A :class:`_engine.Result` that gets data from a Python iterator of :class:`_engine.Row` objects or similar row-like data. .. versionadded:: 1.4 """ ... def _fetchone_impl( self, hard_close: bool = False ) -> Optional[_InterimRowType[Row[Any]]]: if self._hard_closed: self._raise_hard_closed() row = next(self.iterator, _NO_ROW) if row is _NO_ROW: self._soft_close(hard=hard_close) return None else: return row
Python
복사
# sqlalchemy/orm/loading.py # next(self.iterator) leads to the "chunks" generator in "instances" def instances(cursor: CursorResult[Any], context: QueryContext) -> Result[Any]: """Return a :class:`.Result` given an ORM query context. :param cursor: a :class:`.CursorResult`, generated by a statement which came from :class:`.ORMCompileState` :param context: a :class:`.QueryContext` object :return: a :class:`.Result` object representing ORM results .. versionchanged:: 1.4 The instances() function now uses :class:`.Result` objects and has an all new interface. """ ... def chunks(size): # type: ignore # size: None while True: yield_per = size context.partials = {} if yield_per: fetch = cursor.fetchmany(yield_per) if not fetch: break else: fetch = cursor._raw_all_rows() # we get pk here!! ...
Python
복사
# sqlalchemy/engine/result.py class ResultInternal(InPlaceGenerative, Generic[_R]): ... def _raw_all_rows(self) -> List[_R]: make_row = self._row_getter assert make_row is not None rows = self._fetchall_impl() return [make_row(row) for row in rows]
Python
복사
# sqlalchemy/engine/cursor.py class CursorResult(Result[_T]): """A Result that is representing state from a DBAPI cursor. .. versionchanged:: 1.4 The :class:`.CursorResult`` class replaces the previous :class:`.ResultProxy` interface. This classes are based on the :class:`.Result` calling API which provides an updated usage model and calling facade for SQLAlchemy Core and SQLAlchemy ORM. Returns database rows via the :class:`.Row` class, which provides additional API features and behaviors on top of the raw data returned by the DBAPI. Through the use of filters such as the :meth:`.Result.scalars` method, other kinds of objects may also be returned. .. seealso:: :ref:`tutorial_selecting_data` - introductory material for accessing :class:`_engine.CursorResult` and :class:`.Row` objects. """ ... def _fetchall_impl(self): return self.cursor_strategy.fetchall(self, self.cursor)
Python
복사
# sqlalchemy/engine/cursor.py class CursorFetchStrategy(ResultFetchStrategy): """Call fetch methods from a DBAPI cursor. Alternate versions of this class may instead buffer the rows from cursors or not use cursors at all. """ ... def fetchall( self, result: CursorResult[Any], dbapi_cursor: DBAPICursor, ) -> Any: try: # fetch here ??? rows = dbapi_cursor.fetchall() # [(2, 'yang', 'yang@gmail.com0')] result._soft_close() return rows except BaseException as e: self.handle_exception(result, dbapi_cursor, e)
Python
복사
# the rows are fetched; go back up the call stack ... and continue again from here ...
Python
복사

After fetching rows

# sqlalchemy/orm/loading.py # next(self.iterator) leads to the "chunks" generator in "instances" def instances(cursor: CursorResult[Any], context: QueryContext) -> Result[Any]: """Return a :class:`.Result` given an ORM query context. :param cursor: a :class:`.CursorResult`, generated by a statement which came from :class:`.ORMCompileState` :param context: a :class:`.QueryContext` object :return: a :class:`.Result` object representing ORM results .. versionchanged:: 1.4 The instances() function now uses :class:`.Result` objects and has an all new interface. """ ... def chunks(size): # type: ignore # size: None while True: yield_per = size context.partials = {} if yield_per: fetch = cursor.fetchmany(yield_per) if not fetch: break else: fetch = cursor._raw_all_rows() # we get pk here!! if single_entity: proc = process[0] rows = [proc(row) for row in fetch] else: # remember, proc is <function _instance_processor.<locals>._instance at 0x14d36f1c0> rows = [ tuple([proc(row) for proc in process]) for row in fetch ] ...
Python
복사
# sqlalchemy/orm/loading.py def _instance_processor( query_entity, mapper, context, result, path, ... ): """Produce a mapper level row processor callable which processes rows into mapped instances.""" # note that this method, most of which exists in a closure # called _instance(), resists being broken out, as # attempts to do so tend to add significant function # call overhead. _instance() is the most # performance-critical section in the whole ORM. identity_class = mapper._identity_class compile_state = context.compile_state # look for "row getter" functions that have been assigned along # with the compile state that were cached from a previous load. # these are operator.itemgetter() objects that each will extract a # particular column from each row. getter_key = ("getters", mapper) getters = path.get(compile_state.attributes, getter_key, None) ... session_identity_map = context.session.identity_map populate_existing = context.populate_existing or mapper.always_refresh load_evt = bool(mapper.class_manager.dispatch.load) refresh_evt = bool(mapper.class_manager.dispatch.refresh) persistent_evt = bool(context.session.dispatch.loaded_as_persistent) if persistent_evt: loaded_as_persistent = context.session.dispatch.loaded_as_persistent instance_state = attributes.instance_state instance_dict = attributes.instance_dict session_id = context.session.hash_key runid = context.runid identity_token = context.identity_token ... def _instance(row): # (2, 'yang', 'yang@gmail.com0') # already know the pk ?? 
# determine the state that we'll be populating if refresh_identity_key: # currently None # fixed state that we're refreshing state = refresh_state instance = state.obj() dict_ = instance_dict(instance) isnew = state.runid != runid currentload = True loaded_instance = False else: # look at the row, see if that identity is in the # session, or we have to create a new one identitykey = ( identity_class, primary_key_getter(row), identity_token, # None ) # check identity map first instance = session_identity_map.get(identitykey) if instance is not None: # existing instance state = instance_state(instance) dict_ = instance_dict(instance) isnew = state.runid != runid currentload = not isnew loaded_instance = False if version_check and version_id_getter and not currentload: _validate_version_id( mapper, state, dict_, row, version_id_getter ) else: # create a new instance # check for non-NULL values in the primary key columns, # else no entity is returned for the row if is_not_primary_key(identitykey[1]): # (2,) # how does session know the pk of the instance? where called? # this is the breakpoint!! see from here next time its too late return None isnew = True currentload = True loaded_instance = True instance = mapper.class_manager.new_instance() dict_ = instance_dict(instance) state = instance_state(instance) state.key = identitykey state.identity_token = identity_token # attach instance to session. state.session_id = session_id session_identity_map._add_unpresent(state, identitykey) effective_populate_existing = populate_existing if refresh_state is state: effective_populate_existing = True # populate. this looks at whether this state is new # for this load or was existing, and whether or not this # row is the first row with this identity. if currentload or effective_populate_existing: # full population routines. 
Objects here are either # just created, or we are doing a populate_existing # be conservative about setting load_path when populate_existing # is in effect; want to maintain options from the original # load. see test_expire->test_refresh_maintains_deferred_options if isnew and ( propagated_loader_options or not effective_populate_existing ): state.load_options = propagated_loader_options state.load_path = load_path _populate_full( context, row, state, dict_, isnew, load_path, loaded_instance, effective_populate_existing, populators, ) if isnew: # state.runid should be equal to context.runid / runid # here, however for event checks we are being more conservative # and checking against existing run id # assert state.runid == runid existing_runid = state.runid if loaded_instance: if load_evt: state.manager.dispatch.load(state, context) if state.runid != existing_runid: _warn_for_runid_changed(state) if persistent_evt: loaded_as_persistent(context.session, state) if state.runid != existing_runid: _warn_for_runid_changed(state) elif refresh_evt: state.manager.dispatch.refresh( state, context, only_load_props ) if state.runid != runid: _warn_for_runid_changed(state) if effective_populate_existing or state.modified: if refresh_state and only_load_props: state._commit(dict_, only_load_props) else: state._commit_all(dict_, session_identity_map) if post_load: post_load.add_state(state, True) else: # partial population routines, for objects that were already # in the Session, but a row matches them; apply eager loaders # on existing objects, etc. unloaded = state.unloaded isnew = state not in context.partials if not isnew or unloaded or populators["eager"]: # state is having a partial set of its attributes # refreshed. Populate those attributes, # and add to the "context.partials" collection. 
to_load = _populate_partial( context, row, state, dict_, isnew, load_path, unloaded, populators, ) if isnew: if refresh_evt: existing_runid = state.runid state.manager.dispatch.refresh(state, context, to_load) if state.runid != existing_runid: _warn_for_runid_changed(state) state._commit(dict_, to_load) if post_load and context.invoke_all_eagers: post_load.add_state(state, False) return instance ... return _instance
Python
복사