o
    'hI                     @  s  d Z ddlmZ ddlZddlmZmZmZmZm	Z	m
Z
mZ ddlmZmZ ddlmZ ddlmZ ddlmZmZ dd	lmZ dd
lmZmZmZmZmZ ddlmZ ddl m!Z!m"Z"m#Z# ddl$m%Z% ddl&m'Z'm(Z(m)Z) dZ*e+g dZ,erddl-m.Z. ddl/m0Z0 ddl1m2Z2 ddl3m4Z4 ddl5m6Z6 d$ddZ7G dd dee( Z8G dd de8e( Z9G d d! d!e8e( Z:G d"d# d#e:e( Z;dS )%zAWatch changes on a collection, a database, or the entire cluster.    )annotationsN)TYPE_CHECKINGAnyGenericMappingOptionalTypeUnion)CodecOptions_bson_to_dict)RawBSONDocument)	Timestamp)_csotcommon)validate_collation_or_none)ConnectionFailureCursorNotFoundInvalidOperationOperationFailurePyMongoError)_Op)_AggregationCommand_CollectionAggregationCommand_DatabaseAggregationCommand)CommandCursor)_CollationIn_DocumentType	_PipelineT)      Y   [      i  i)#  i{'  iP-  iR-  i{4  i|4  ?      iL4        )ClientSession)
Collection)Database)MongoClient)
Connectionexcr   returnboolc                 C  sV   t | ttfr	dS t | tr)| jdu rdS | jdko| dp(| jdk o(| jtv S dS )z5Return True if given a resumable change stream error.TNF	   ResumableChangeStreamError)
isinstancer   r   r   _max_wire_versionhas_error_labelcode_RESUMABLE_GETMORE_ERRORS)r,    r6   ]/var/www/html/olx_land/venv/lib/python3.10/site-packages/pymongo/synchronous/change_stream.py
_resumableM   s   

r8   c                   @  s   e Zd ZdZ			dTdUddZdVddZedWd!d"ZedXd$d%ZdYd'd(Z	dYd)d*Z
dZd,d-Zd[d2d3Zd\d7d8Zd]d9d:ZdVd;d<ZdVd=d>Zd^d@dAZed_dBdCZejd`dEdFZeZedadGdHZejdbdJdKZd^dLdMZdcdRdSZdS )dChangeStreama  The internal abstract base class for change stream cursors.

    Should not be called directly by application developers. Use
    :meth:`pymongo.collection.Collection.watch`,
    :meth:`pymongo.database.Database.watch`, or
    :meth:`pymongo.mongo_client.MongoClient.watch` instead.

    .. versionadded:: 3.6
    .. seealso:: The MongoDB documentation on `changeStreams <https://mongodb.com/docs/manual/changeStreams/>`_.
    NtargetUUnion[MongoClient[_DocumentType], Database[_DocumentType], Collection[_DocumentType]]pipelineOptional[_Pipeline]full_documentOptional[str]resume_afterOptional[Mapping[str, Any]]max_await_time_msOptional[int]
batch_size	collationOptional[_CollationIn]start_at_operation_timeOptional[Timestamp]sessionOptional[ClientSession]start_aftercommentOptional[Any]full_document_before_changeshow_expanded_eventsOptional[bool]r-   Nonec                 C  s   |d u rg }t d|}t d| t| t d| d| _|j| _|jjj	r8d| _|j
|jj
tdd| _n|| _t|| _|| _|| _|
d u| _|d u| _t|
pV|| _|| _|| _|| _|| _|	| _|| _d| _| jj| _|| _d S )Nr<   r>   	batchSizeFT)document_class)codec_options)r   validate_listvalidate_string_or_noner   %validate_non_negative_integer_or_none_decode_customrT   _orig_codec_optionstype_registry_decoder_mapwith_optionsr   _targetcopydeepcopy	_pipeline_full_document_full_document_before_change_uses_start_after_uses_resume_after_resume_token_max_await_time_ms_batch_size
_collation_start_at_operation_time_session_comment_closed_timeout_show_expanded_events)selfr:   r<   r>   r@   rB   rD   rE   rG   rI   rK   rL   rN   rO   r6   r6   r7   __init__f   s:   





zChangeStream.__init__c                 C  s   |   | _d S N)_create_cursor_cursorro   r6   r6   r7   _initialize_cursor   s   zChangeStream._initialize_cursorType[_AggregationCommand]c                 C     t )z)The aggregation command class to be used.NotImplementedErrorrt   r6   r6   r7   _aggregation_command_class   s   z'ChangeStream._aggregation_command_classr*   c                 C  rw   )zeThe client against which the aggregation commands for
        this ChangeStream will be run.
        rx   rt   r6   r6   r7   _client   s   zChangeStream._clientdict[str, Any]c                 C  s|   i }| j dur| j |d< | jdur| j|d< | j}|dur*| jr%||d< n||d< n
| jdur4| j|d< | jr<| j|d< |S )z=Return the options dict for the $changeStream pipeline stage.NfullDocumentfullDocumentBeforeChange
startAfterresumeAfterstartAtOperationTimeshowExpandedEvents)ra   rb   resume_tokenrc   ri   rn   )ro   optionsr   r6   r6   r7   _change_stream_options   s   








z#ChangeStream._change_stream_optionsc                 C  s0   i }| j dur| j |d< | jdur| j|d< |S )z4Return the options dict for the aggregation command.NmaxAwaitTimeMSrR   )rf   rg   ro   r   r6   r6   r7   _command_options   s   



zChangeStream._command_optionslist[dict[str, Any]]c                 C  s"   |   }d|ig}|| j |S )z;Return the full aggregation pipeline for this ChangeStream.z$changeStream)r   extendr`   )ro   r   full_pipeliner6   r6   r7   _aggregation_pipeline   s   
z"ChangeStream._aggregation_pipelineresultMapping[str, Any]connr+   c                 C  s   |d d s;d|d v r|d d | _ dS | jdu r=| jdu r?| jdu rA|jdkrC|d| _| jdu rEtd|dS dS dS dS dS dS )	aM  Callback that caches the postBatchResumeToken or
        startAtOperationTime from a changeStream aggregate command response
        containing an empty batch of change documents.

        This is implemented as a callback because we need access to the wire
        version in order to determine whether to cache this value.
        cursor
firstBatchpostBatchResumeTokenNFr   operationTimez?Expected field 'operationTime' missing from command response : )re   ri   rd   rc   max_wire_versiongetr   )ro   r   r   r6   r6   r7   _process_result   s(   




zChangeStream._process_resultexplicit_sessionr.   r   c              	   C  sF   | j | jt|  |  || j| jd}| jj|j	| j
||tjdS )ztRun the full aggregation pipeline for this ChangeStream and return
        the corresponding CommandCursor.
        )result_processorrL   )	operation)rz   r]   r   r   r   r   rk   r{   _retryable_read
get_cursor_read_preference_forr   	AGGREGATE)ro   rI   r   cmdr6   r6   r7   _run_aggregation_cmd   s   	
z!ChangeStream._run_aggregation_cmdc                 C  sJ   | j j| jdd}| j|| jd udW  d    S 1 sw   Y  d S )NFclose)rI   r   )r{   _tmp_sessionrj   r   )ro   sr6   r6   r7   rr     s   $zChangeStream._create_cursorc                 C  s0   z| j   W n	 ty   Y nw |  | _ dS )z7Reestablish this change stream after a resumable error.N)rs   r   r   rr   rt   r6   r6   r7   _resume  s   zChangeStream._resumec                 C  s   d| _ | j  dS )zClose this ChangeStream.TN)rl   rs   r   rt   r6   r6   r7   r     s   zChangeStream.closeChangeStream[_DocumentType]c                 C     | S rq   r6   rt   r6   r6   r7   __iter__     zChangeStream.__iter__c                 C  s   t | jS )zThe cached resume token that will be used to resume after the most
        recently returned change.

        .. versionadded:: 3.9
        )r^   r_   re   rt   r6   r6   r7   r     s   zChangeStream.resume_tokenr   c                 C  s$   | j r|  }|dur|S | j st)a  Advance the cursor.

        This method blocks until the next change document is returned or an
        unrecoverable error is raised. This method is used when iterating over
        all changes in the cursor. For example::

            try:
                resume_token = None
                pipeline = [{'$match': {'operationType': 'insert'}}]
                with db.collection.watch(pipeline) as stream:
                    for insert_change in stream:
                        print(insert_change)
                        resume_token = stream.resume_token
            except pymongo.errors.PyMongoError:
                # The ChangeStream encountered an unrecoverable error or the
                # resume attempt failed to recreate the cursor.
                if resume_token is None:
                    # There is no usable resume token because there was a
                    # failure during ChangeStream initialization.
                    logging.error('...')
                else:
                    # Use the interrupted ChangeStream's resume token to create
                    # a new ChangeStream. The new stream will continue from the
                    # last seen insert change without missing any events.
                    with db.collection.watch(
                            pipeline, resume_after=resume_token) as stream:
                        for insert_change in stream:
                            print(insert_change)

        Raises :exc:`StopIteration` if this ChangeStream is closed.
        N)alivetry_nextStopIteration)ro   docr6   r6   r7   next!  s   !zChangeStream.nextc                 C  s   | j  S )zDoes this cursor have the potential to return more data?

        .. note:: Even if :attr:`alive` is ``True``, :meth:`next` can raise
            :exc:`StopIteration` and :meth:`try_next` can return ``None``.

        .. versionadded:: 3.8
        )rl   rt   r6   r6   r7   r   K  s   	zChangeStream.aliveOptional[_DocumentType]c              
   C  sd  | j s| jjs|   z,z| jd}W n! ty5 } zt|s! |   | jd}W Y d}~nd}~ww W n# tyO } zt|sJ|jsJ|    d}~w t	yZ   |    w | jjsbd| _ |du rv| jj
durt| jj
| _d| _|S z|d }W n ty   |   tddw | j s| jj
r| jj
}d| _d| _|| _d| _| jrt|j| jS |S )a  Advance the cursor without blocking indefinitely.

        This method returns the next change document without waiting
        indefinitely for the next change. For example::

            with db.collection.watch() as stream:
                while stream.alive:
                    change = stream.try_next()
                    # Note that the ChangeStream's resume token may be updated
                    # even when no changes are returned.
                    print("Current resume token: %r" % (stream.resume_token,))
                    if change is not None:
                        print("Change document: %r" % (change,))
                        continue
                    # We end up here when there are no recent changes.
                    # Sleep for a while before trying again to avoid flooding
                    # the server with getMore requests when no changes are
                    # available.
                    time.sleep(10)

        If no change document is cached locally then this method runs a single
        getMore command. If the getMore yields any documents, the next
        document is returned, otherwise, if the getMore returns no documents
        (because there have been no changes) then ``None`` is returned.

        :return: The next change document or ``None`` when no document is available
          after running a single getMore or when the cursor is closed.

        .. versionadded:: 3.8
        TFN_idzECannot provide resume functionality when the resume token is missing.)rl   rs   r   r   	_try_nextr   r8   timeoutr   BaseException_post_batch_resume_tokenre   ri   KeyErrorr   	_has_nextrc   rd   rX   r   rawrY   )ro   changer,   r   r6   r6   r7   r   V  s`    
zChangeStream.try_nextc                 C  r   rq   r6   rt   r6   r6   r7   	__enter__  r   zChangeStream.__enter__exc_typer   exc_valexc_tbc                 C  s   |    d S rq   r   )ro   r   r   r   r6   r6   r7   __exit__  s   zChangeStream.__exit__)NNN)r:   r;   r<   r=   r>   r?   r@   rA   rB   rC   rD   rC   rE   rF   rG   rH   rI   rJ   rK   rA   rL   rM   rN   r?   rO   rP   r-   rQ   )r-   rQ   )r-   rv   )r-   r*   r-   r|   )r-   r   )r   r   r   r+   r-   rQ   )rI   rJ   r   r.   r-   r   )r-   r   )r-   r   )r-   rA   )r-   r   )r-   r.   )r-   r   )r   r   r   r   r   r   r-   rQ   )__name__
__module____qualname____doc__rp   ru   propertyrz   r{   r   r   r   r   r   rr   r   r   r   r   r   applyr   __next__r   r   r   r   r6   r6   r6   r7   r9   Z   s>    
7


	





'

_r9   c                   @  6   e Zd ZU dZded< edddZedd	d
ZdS )CollectionChangeStreamzA change stream that watches changes on a single collection.

    Should not be called directly by application developers. Use
    helper method :meth:`pymongo.collection.Collection.watch` instead.

    .. versionadded:: 3.7
    zCollection[_DocumentType]r]   r-   #Type[_CollectionAggregationCommand]c                 C     t S rq   )r   rt   r6   r6   r7   rz        z1CollectionChangeStream._aggregation_command_classMongoClient[_DocumentType]c                 C  s
   | j jjS rq   )r]   databaseclientrt   r6   r6   r7   r{     s   
zCollectionChangeStream._clientN)r-   r   r-   r   r   r   r   r   __annotations__r   rz   r{   r6   r6   r6   r7   r        
 r   c                   @  r   )DatabaseChangeStreamzA change stream that watches changes on all collections in a database.

    Should not be called directly by application developers. Use
    helper method :meth:`pymongo.database.Database.watch` instead.

    .. versionadded:: 3.7
    zDatabase[_DocumentType]r]   r-   !Type[_DatabaseAggregationCommand]c                 C  r   rq   )r   rt   r6   r6   r7   rz     r   z/DatabaseChangeStream._aggregation_command_classr   c                 C  s   | j jS rq   )r]   r   rt   r6   r6   r7   r{     s   zDatabaseChangeStream._clientN)r-   r   r   r   r6   r6   r6   r7   r     r   r   c                      s"   e Zd ZdZd fddZ  ZS )ClusterChangeStreamzA change stream that watches changes on all collections in the cluster.

    Should not be called directly by application developers. Use
    helper method :meth:`pymongo.mongo_client.MongoClient.watch` instead.

    .. versionadded:: 3.7
    r-   r|   c                   s   t   }d|d< |S )NTallChangesForCluster)superr   r   	__class__r6   r7   r     s   
z*ClusterChangeStream._change_stream_optionsr   )r   r   r   r   r   __classcell__r6   r6   r   r7   r     s    r   )r,   r   r-   r.   )<r   
__future__r   r^   typingr   r   r   r   r   r   r	   bsonr
   r   bson.raw_bsonr   bson.timestampr   pymongor   r   pymongo.collationr   pymongo.errorsr   r   r   r   r   pymongo.operationsr   pymongo.synchronous.aggregationr   r   r   "pymongo.synchronous.command_cursorr   pymongo.typingsr   r   r   _IS_SYNC	frozensetr5   "pymongo.synchronous.client_sessionr'   pymongo.synchronous.collectionr(   pymongo.synchronous.databaser)    pymongo.synchronous.mongo_clientr*   pymongo.synchronous.poolr+   r8   r9   r   r   r   r6   r6   r6   r7   <module>   s>   $
  e