o
    'h5x                     @  s  d Z ddlmZ ddlZddlZddlZddlmZ ddlm	Z	 ddl
mZmZmZmZmZmZ ddlmZ ddlmZ dd	lmZmZ dd
lmZmZ ddlmZ ddlmZ ddlm Z  ddl!m"Z" errddl#m$Z$ ddl%m&Z& ddl'm(Z(m)Z) ddl*m+Z+m,Z,m-Z- ddl.m/Z/m0Z0m1Z1m2Z2m3Z3m4Z4 ddl5m6Z6 ddl7m8Z8m9Z9m:Z: ddl;m<Z<m=Z=m>Z>m?Z?m@Z@ ddlAmBZB ddlCmDZDmEZEmFZFmGZG ddlHmIZImJZJ ddlKmLZL dZMG dd dZNdS )zIThe client-level bulk write operations interface.

.. versionadded:: 4.9
    )annotationsN)MutableMapping)islice)TYPE_CHECKINGAnyMappingOptionalTypeUnion)ObjectId)RawBSONDocument)_csotcommon)AsyncClientSession_validate_session_write_concern)AsyncCollection)AsyncCommandCursor)AsyncDatabase)_handle_reauth)AsyncMongoClient)AsyncConnection)_merge_command"_throw_client_bulk_write_exception)validate_is_document_typevalidate_ok_for_replacevalidate_ok_for_update)ConfigurationErrorConnectionFailureInvalidOperationNotPrimaryErrorOperationFailureWaitQueueTimeoutError)_RETRYABLE_ERROR_CODES)_COMMAND_LOGGER_CommandStatusMessage
_debug_log)_ClientBulkWriteContext_convert_client_bulk_exception_convert_exception_convert_write_result_randint)ReadPreference)ClientBulkWriteResultDeleteResultInsertOneResultUpdateResult)_DocumentOut	_Pipeline)WriteConcernFc                   @  s   e Zd ZdZ					dfdgddZedhddZdiddZ					djdkd,d-Z				dldmd/d0Z			dndod1d2Z
edpd?d@ZdqdBdCZdrdIdJZdsdLdMZdtdTdUZ	dudvdZd[Zdwd]d^Zdxd_d`ZdxdadbZdydddeZdS )z_AsyncClientBulkz4The private guts of the client-level bulk write API.TNFclientr   write_concernr2   orderedboolbypass_document_validationOptional[bool]commentOptional[str]letOptional[Any]verbose_resultsreturnNonec                 C  s   || _ || _|| _| jdurtd| j || _|| _|| _|| _g | _	g | _
d| _d| _d| _d| _d| _| j jj| _d| _d| _dS )z'Initialize a _AsyncClientBulk instance.Nr<   r   F)r4   r5   r<   r   r   r6   bypass_doc_valr:   r>   ops
namespaces
idx_offset	total_opsexecuteduses_collationuses_array_filtersoptionsretry_writesis_retryableretryingstarted_retryable_write)selfr4   r5   r6   r8   r:   r<   r>    rO   \/var/www/html/olx_land/venv/lib/python3.10/site-packages/pymongo/asynchronous/client_bulk.py__init__Y   s&   

z_AsyncClientBulk.__init__Type[_ClientBulkWriteContext]c                 C  s   t S N)r&   )rN   rO   rO   rP   bulk_ctx_classx   s   z_AsyncClientBulk.bulk_ctx_class	namespacestrdocumentr0   c                 C  s^   t d| t|tsd|v st |d< d|d}| jd|f | j| |  jd7  _dS )z*Add an insert document to the list of ops.rW   _id)insertrW   rZ      N)r   
isinstancer   r   rB   appendrC   rE   )rN   rU   rW   cmdrO   rO   rP   
add_insert|   s   


z_AsyncClientBulk.add_insertselectorMapping[str, Any]update#Union[Mapping[str, Any], _Pipeline]multiupsert	collationOptional[Mapping[str, Any]]array_filters!Optional[list[Mapping[str, Any]]]hint Union[str, dict[str, Any], None]sortc
                 C  s   t | d|||d}
|dur||
d< |durd| _||
d< |dur&||
d< |dur1d| _||
d< |	dur9|	|
d	< |r>d
| _| jd|
f | j| |  jd7  _dS )z8Create an update document and add it to the list of ops.rY   rb   filter
updateModsrd   Nre   TarrayFiltersrj   rf   rl   Frb   r[   )r   rH   rG   rK   rB   r]   rC   rE   )rN   rU   r`   rb   rd   re   rf   rh   rj   rl   r^   rO   rO   rP   
add_update   s.   z_AsyncClientBulk.add_updatereplacementc           	      C  s   t | d||dd}|dur||d< |dur||d< |dur&d| _||d< |dur.||d	< | jd
|f | j| |  jd7  _dS )z8Create a replace document and add it to the list of ops.rY   Frm   Nre   rj   Trf   rl   replacer[   )r   rG   rB   r]   rC   rE   )	rN   rU   r`   rr   re   rf   rj   rl   r^   rO   rO   rP   add_replace   s$   z_AsyncClientBulk.add_replacec                 C  sj   d||d}|dur||d< |durd| _ ||d< |rd| _| jd|f | j| |  jd	7  _dS )
z7Create a delete document and add it to the list of ops.rY   )deletern   rd   Nrj   Trf   Fru   r[   )rG   rK   rB   r]   rC   rE   )rN   rU   r`   rd   rf   rj   r^   rO   rO   rP   
add_delete   s   	z_AsyncClientBulk.add_deletebwcr&   r^   MutableMapping[str, Any]
request_idintmsgUnion[bytes, dict[str, Any]]op_docslist[Mapping[str, Any]]ns_docsdict[str, Any]c                   sX  ||d< ||d< t tjr6tt tj|jj|t	t
||j|||jj|jj|jjd |jjd |jjd |jrA||||| zZ|j|||jI dH }tj |j }	t tjrtt tj|jj|	|t	t
||j|||jj|jj|jjd |jjd |jjd |jr||||	 | j||jI dH  W |S  ty+ }
 ztj |j }	t|
tt fr|
j!}nt"|
}t tjrtt tj#|jj|	|t	t
||j|||jj|jj|jjd |jjd |jjt|
t d |jr|$|||	 d	|
i}t|
t r| j|
j!|jI dH  n| ji |jI dH  W Y d}
~
|S W Y d}
~
|S d}
~
ww )
zHA proxy for AsyncConnection.write_command that handles event publishing.rB   nsInfor   r[   messageclientIdcommandcommandNamedatabaseName	requestIdoperationIddriverConnectionIdserverConnectionId
serverHost
serverPort	serviceIdNr   r   
durationMSreplyr   r   r   r   r   r   r   r   r   r   r   r   failurer   r   r   r   r   r   r   r   r   isServerSideErrorerror)%r#   isEnabledForloggingDEBUGr%   r$   STARTED_topology_settings_topology_idnextiterdb_nameconnidserver_connection_idaddress
service_idpublish_startwrite_commandcodecdatetimenow
start_time	SUCCEEDED_succeedr4   _process_responsesession	Exceptionr\   r   r    detailsr(   FAILED_fail)rN   rw   r^   ry   r{   r}   r   r4   r   durationexcr   rO   rO   rP   r      s   





#


 
z_AsyncClientBulk.write_commandbytesc                   s6  t tjr.tt tj|jj|t	t
||j|||jj|jj|jjd |jjd |jjd |jr9|||||}za|j||jI dH }tj |j }	|durYt|j||}
n2ddi}
t tjrtt tj|jj|	|
t	t
||j|||jj|jj|jjd |jjd |jjd |jr|||
|	 W |
S W |
S  ty } zstj |j }	t|trt|j||j}nt|t r|j}nt!|}t tjrtt tj"|jj|	|t	t
||j|||jj|jj|jjd |jjd |jjt|td |jr|jdusJ |#|||	 d|i}
W Y d}~|
S d}~ww )	zFA proxy for AsyncConnection.unack_write that handles event publishing.r   r[   r   Nokr   r   r   )$r#   r   r   r   r%   r$   r   r   r   r   r   r   r   r   r   r   r   r   r   unack_writemax_bson_sizer   r   r   r)   namer   r   r   r\   r    r   r   r(   r   r   )rN   rw   r^   ry   r{   r}   r   r4   resultr   r   r   r   rO   rO   rP   r   ?  s   





 !




z_AsyncClientBulk.unack_writerB   #list[tuple[str, Mapping[str, Any]]]rC   	list[str]7tuple[list[Mapping[str, Any]], list[Mapping[str, Any]]]c           	   	     s>   | |||\}}}}| ||||||| jI dH  ||fS )z6Executes a batch of bulkWrite server commands (unack).N)batch_commandr   r4   )	rN   rw   r^   rB   rC   ry   r{   to_send_ops
to_send_nsrO   rO   rP   _execute_batch_unack  s   z%_AsyncClientBulk._execute_batch_unackGtuple[dict[str, Any], list[Mapping[str, Any]], list[Mapping[str, Any]]]c           
   	     s@   | |||\}}}}| ||||||| jI dH }	|	||fS )z4Executes a batch of bulkWrite server commands (ack).N)r   r   r4   )
rN   rw   r^   rB   rC   ry   r{   r   r   r   rO   rO   rP   _execute_batch  s   

z_AsyncClientBulk._execute_batchfull_resultr   r   r   r   Optional[AsyncClientSession]c              
     sd  | drtt| jddd}t||d |j||du| jd}||I dH  zd|2 z]3 dH W }|d | j }| j	| \}	}
|d sQ|d	 
| | jrQ W dS |d r| jr|	d
krh|
d d }t|dd}|	dv rud}	t|ddd}|	dkrt|dd}|||	 d |< q+6 W dS  ty } z|jr| I dH  t||d< W Y d}~dS d}~ww dS )z?Internal helper for processing the server reply command cursor.cursoradminz$cmd.bulkWrite)databaser   N)r   explicit_sessionr:   idxr   writeErrorsrZ   rW   rX   T)acknowledged)rb   rs   rb   )r   in_client_bulkru   Resultsr   )getr   r   r4   r   r   r:   _maybe_pin_connectionrD   rB   r]   r6   r>   r.   r/   r-   r   alivecloser'   )rN   r   r   r   r   coll
cmd_cursordocoriginal_indexop_typeopinserted_idresr   rO   rO   rP   _process_results_cursor  sT   

z(_AsyncClientBulk._process_results_cursorop_id	retryablefinal_write_concernOptional[WriteConcern]c              	     sT  d}d}	| j j}
|| j | | ||	|||
|| j j}| j| jk r| j| j |jkr1|p0|}ddi}| j |d< | j	|d< |oE|j
 }|sJ|sPt|| | jdurZ| j|d< | jrb| j|d< | jrj| j|d	< |r|rx| jsx|  d
| _|||tj| |||| j  || || j | t| j| jd}t| j| jd}|jrz| ||||I dH \}}}|}|dr|d }t|dot|j t!o|j ddt"v }t|t#ot|t$t%f }|r|s|rt&'|}t(| j| j|| t)|| j nt(| j| j|| t)|| j d|d< g |d< |ddt*|k r%d
|d< |d s9||d< t(| j| j|| dS |r_|di }|ddt"v r_t&'|}t(| j| j|| t)|| j | +||||I dH  t(| j| j|| d| _,d| _n| -||||I dH \}}|  jt*|7  _|d s| j	r|d rdS | j| jk s$dS dS )z<Internal helper for executing batches of bulkWrite commands.r   	bulkWriter[   
errorsOnlyr6   NbypassDocumentValidationr:   r<   Tr   r   coder   r   nErrorsanySuccessfulr   writeConcernErrorF).r4   _event_listenersvalidate_sessionrT   codec_optionsrD   rE   max_write_batch_sizer>   r6   in_transactionr   apply_write_concernrA   r:   r<   rM   _start_retryable_write	_apply_tor+   PRIMARYsend_cluster_timeadd_server_apiapply_timeoutr   rB   rC   r   r   r   hasattrr\   r   dictr"   r   r   r!   copydeepcopyr   r   lenr   rL   r   )rN   r5   r   r   r   r   r   r   r   cmd_name	listenersrw   r^   not_in_transactionrB   rC   
raw_resultr   _r   r   retryable_top_level_errorretryable_network_errorfullwcerO   rO   rP   _execute_command  s   












z!_AsyncClientBulk._execute_command	operationc                   s   ddg g dddddi i i d t  d fdd}jjj|||dI dH   d s8 d s8 d r>t j  S )z'Execute commands with w=1 WriteConcern.FNr   )r   r   r   writeConcernErrors	nInserted	nUpsertednMatched	nModifiednDeletedinsertResultsupdateResultsdeleteResultsr   r   r   r   r   r7   r?   r@   c                   s4   |j dk r
tdj| || I d H  d S )N   <MongoClient.bulk_write requires MongoDB server version 8.0+.)max_wire_versionr   r  r5   )r   r   r   r   r   rN   rO   rP   retryable_bulk~  s   
z8_AsyncClientBulk.execute_command.<locals>.retryable_bulk)bulkoperation_idr   r   r  )r   r   r   r   r   r7   r?   r@   )r*   r4   _retryable_writerK   r   r>   )rN   r   r  r  rO   r  rP   execute_commandh  s6   	z _AsyncClientBulk.execute_commandc              	     s   d}d}| j j}t }| |||||d| j j}| j| jk r~ddi}d|d< d|d< | jdur5| j|d	< d
di|d< | jrC| j|d< | j	rK| j	|d< |
| t| j| jd}t| j| jd}	| ||||	I dH \}
}|  jt|
7  _| j| jk sdS dS )zDExecute commands with OP_MSG and w=0 writeConcern. Always unordered.r   r   Nr[   Tr   Fr6   r   wr   writeConcernr:   r<   )r4   r   r*   rT   r   rD   rE   rA   r:   r<   r   r   rB   rC   r   r   )rN   r   r   r   r   r   rw   r^   rB   rC   r   r   rO   rO   rP   execute_command_unack  s>   





z&_AsyncClientBulk.execute_command_unackc                   s@   | j rtd| jrtd| jdurtd| |I dH S )z3Execute all operations, returning no results (w=0).z3Collation is unsupported for unacknowledged writes.z6arrayFilters is unsupported for unacknowledged writes.NzGCannot set bypass_document_validation with unacknowledged write concern)rG   r   rH   rA   r    r  )rN   r   rO   rO   rP   execute_no_results  s   
z#_AsyncClientBulk.execute_no_resultsr   c              	     s   | j std| jrtdd| _t|| j}| jjsW| j||I dH 4 I dH "}|jdk r4td| 	|I dH  t
dddW  d  I dH  S 1 I dH sRw   Y  | ||I dH }t
|| jj| jS )zExecute operations.zNo operations to executez*Bulk operations can only be executed once.TNr  r  F)rB   r   rF   r   r5   r   r4   _conn_for_writesr  r  r,   r  r>   )rN   r   r  
connectionr   rO   rO   rP   execute  s,   

,z_AsyncClientBulk.execute)TNNNF)r4   r   r5   r2   r6   r7   r8   r9   r:   r;   r<   r=   r>   r7   r?   r@   )r?   rR   )rU   rV   rW   r0   r?   r@   )NNNNN)rU   rV   r`   ra   rb   rc   rd   r7   re   r9   rf   rg   rh   ri   rj   rk   rl   rg   r?   r@   )NNNN)rU   rV   r`   ra   rr   ra   re   r9   rf   rg   rj   rk   rl   rg   r?   r@   )NN)rU   rV   r`   ra   rd   r7   rf   rg   rj   rk   r?   r@   )rw   r&   r^   rx   ry   rz   r{   r|   r}   r~   r   r~   r4   r   r?   r   )rw   r&   r^   rx   ry   rz   r{   r   r}   r~   r   r~   r4   r   r?   rg   )
rw   r&   r^   r   rB   r   rC   r   r?   r   )
rw   r&   r^   r   rB   r   rC   r   r?   r   )
r   rx   r   rx   r   r   r   r   r?   r@   rS   )r5   r2   r   r   r   r   r   rz   r   r7   r   rx   r   r   r?   r@   )r   r   r  rV   r?   rx   )r   r   r?   r@   )r   r   r  rV   r?   r   )__name__
__module____qualname____doc__rQ   propertyrT   r_   rq   rt   rv   r   r   r   r   r   r   r  r  r  r  r  rO   rO   rO   rP   r3   V   sL    
,$
[
Y

; 

5
*r3   )Or  
__future__r   r   r   r   collections.abcr   	itertoolsr   typingr   r   r   r   r	   r
   bson.objectidr   bson.raw_bsonr   pymongor   r   #pymongo.asynchronous.client_sessionr   r   pymongo.asynchronous.collectionr   #pymongo.asynchronous.command_cursorr   pymongo.asynchronous.databaser   pymongo.asynchronous.helpersr   !pymongo.asynchronous.mongo_clientr   pymongo.asynchronous.poolr   pymongo._client_bulk_sharedr   r   pymongo.commonr   r   r   pymongo.errorsr   r   r   r   r    r!   pymongo.helpers_sharedr"   pymongo.loggerr#   r$   r%   pymongo.messager&   r'   r(   r)   r*   pymongo.read_preferencesr+   pymongo.resultsr,   r-   r.   r/   pymongo.typingsr0   r1   pymongo.write_concernr2   _IS_SYNCr3   rO   rO   rO   rP   <module>   s>    	 