
    6jL                         U d Z ddlZddlZddlZddlZddlZddlZddlZddlm	Z	 ddl
mZ ddlmZ  ej        e          ZdedefdZde	fd	Zdefd
Z G d d          Zdaedz  ed<    ej                    Zd Zd Zdedz  fdZdS )ah  
Hermes Web UI -- Gateway session watcher.

Background daemon thread that polls state.db every 5 seconds for changes
to gateway sessions (telegram, discord, slack, etc.). When changes are
detected, it pushes notifications to all subscribed SSE clients.

This enables real-time session list updates in the sidebar without
requiring any changes to hermes-agent.
    N)Path)HOME)"read_importable_agent_session_rowssessionsreturnc                     d                     d t          | d           D                       }t          j        |                                d                                          S )zMCreate a lightweight hash of session IDs and timestamps for change detection.|c           	   3      K   | ]<}|d           d|                     dd           d|                     dd           V  =dS )
session_id:
updated_atr   message_countN)get).0ss     )/root/hermes-webui/api/gateway_watcher.py	<genexpr>z!_snapshot_hash.<locals>.<genexpr>   sl         \?QQQUU<33QQaeeOQ6O6OQQ         c                     | d         S )Nr    )xs    r   <lambda>z _snapshot_hash.<locals>.<lambda>    s
    , r   )keyF)usedforsecurity)joinsortedhashlibmd5encode	hexdigest)r   r   s     r   _snapshot_hashr!      sm    
((  &?&?@@@    C ;szz||U;;;EEGGGr   c            
      h   	 ddl m}  t           |                                                                                       }nk# t
          $ r^ t          t          j        dt          t          dz                                                                                                }Y nw xY w|dz  S )z-Resolve state.db path for the active profile.r   )get_active_hermes_homeHERMES_HOMEz.hermeszstate.db)
api.profilesr#   r   
expanduserresolve	Exceptionosgetenvstrr   )r#   hermes_homes     r   _get_state_db_pathr-   '   s    c777777113344??AAIIKK c c c29]Cy8H4I4IJJKKVVXX``bbc##s   AA A%B,+B,c                     t                      } |                                 sg S 	 g }t          | dt                    D ]}|                    |d         |d         pd|d         pd|d         p	|d	         pd
|d         |d         p|d         |d         pd|                    d          |                    d          |                    d          d
           |S # t          $ r g cY S w xY w)znRead all non-webui sessions from state.db.
    Returns list of session dicts, or empty list on any error.
       )limitlogidtitlezAgent SessionmodelNr   actual_message_countr   
started_atlast_activitysourcecli
raw_sourcesession_sourcesource_label)
r   r3   r4   r   
created_atr   r8   r:   r;   r<   )r-   existsr   loggerappendr   r(   )db_pathr   rows      r   _get_agent_sessions_from_dbrC   1   s*    !""G>> 	5gSfUUU 	 	COO!$iW8W-!$_!5!Y=S9T!YXY!,/!/2Gc,6Gh-05!ggl33"%''*:";"; # 7 7         			s   B8C C.-C.c                   v    e Zd ZdZdZdZd Zd ZdefdZ	d Z
dej        fd	Zd
ej        fdZdefdZd ZdS )GatewayWatchera  Background thread that polls state.db for agent session changes.

    Usage:
        watcher = GatewayWatcher()
        watcher.start()
        q = watcher.subscribe()
        # ... receive change events via q.get() ...
        watcher.unsubscribe(q)
        watcher.stop()
          c                     g | _         t          j                    | _        t          j                    | _        d | _        d| _        g | _        d S )N )	_subscribers	threadingLock	_sub_lockEvent_stop_event_thread
_last_hash_last_sessionsselfs    r   __init__zGatewayWatcher.__init__^   sE    /1"))$?,,04!$&r   c                     | j         r| j                                         rdS | j                                         t	          j        | j        dd          | _         | j                                          dS )z Start the watcher daemon thread.NTzgateway-watcher)targetdaemonname)rP   is_aliverO   clearrK   Thread
_poll_loopstartrS   s    r   r^   zGatewayWatcher.startf   sq    < 	DL1133 	F    'ttRcdddr   r   c                 @    | j         }|duo|                                S )a  Return True when the poll thread is running.

        Public accessor used by ``/api/sessions/gateway/stream`` probe mode and
        the live SSE handler to detect a watcher instance whose poll thread
        died silently (e.g. uncaught exception in ``_poll_loop``).  Callers
        use this to decide whether to return 503 and trigger the client-side
        polling fallback, instead of handing out an SSE connection that would
        never emit events.
        N)rP   rZ   )rT   ts     r   rZ   zGatewayWatcher.is_aliven   s"     L}--r   c                 b   | j                                          | j        5  | j        D ]B}	 |                    d           # t
          $ r t                              d           Y ?w xY w	 ddd           n# 1 swxY w Y   | j        r$| j        	                    d           d| _        dS dS )zStop the watcher thread.Nz%Failed to send sentinel to subscriber   )timeout)
rO   setrM   rJ   putr(   r?   debugrP   r   rT   qs     r   stopzGatewayWatcher.stop{   s(   ^ 	J 	J& J JJEE$KKKK  J J JLL!HIIIIIJJ	J 	J 	J 	J 	J 	J 	J 	J 	J 	J 	J 	J 	J 	J 	J < 	 La(((DLLL	  	 s4   
A9AA9$A)&A9(A))A99A= A=c                     t          j        d          }| j        5  | j                            |           ddd           n# 1 swxY w Y   |S )zSubscribe to change events. Returns a queue.Queue.
        Events are dicts: {'type': 'sessions_changed', 'sessions': [...]}
        A None sentinel means the watcher is stopping.
        
   )maxsizeN)queueQueuerM   rJ   r@   rg   s     r   	subscribezGatewayWatcher.subscribe   s    
 K###^ 	( 	($$Q'''	( 	( 	( 	( 	( 	( 	( 	( 	( 	( 	( 	( 	( 	( 	(s   AAArh   c                     | j         5  	 | j                            |           n# t          $ r Y nw xY wddd           dS # 1 swxY w Y   dS )zRemove a subscriber queue.N)rM   rJ   remove
ValueErrorrg   s     r   unsubscribezGatewayWatcher.unsubscribe   s    ^ 	 	!((++++   	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	s,   A%A
2A2AA	Ar   c                    d|d}| j         5  g }| j        D ]b}	 |                    |           # t          j        $ r |                    |           Y ?t          $ r |                    |           Y _w xY w|D ]n}	 | j                            |           n# t          $ r Y nw xY w	 |                    d           E# t          $ r t          
                    d           Y kw xY w	 ddd           dS # 1 swxY w Y   dS )z%Push change event to all subscribers.sessions_changed)typer   Nz*Failed to send sentinel to dead subscriber)rM   rJ   
put_nowaitrm   Fullr@   r(   rq   rr   r?   rf   )rT   r   eventdeadrh   s        r   _notify_subscribersz"GatewayWatcher._notify_subscribers   s    ' 
 
 ^ 	O 	OD& # ##LL''''z # # #KKNNNNN  # # #KKNNNNN# 
O 
O%,,Q////!   DOLL&&&&  O O OLL!MNNNNNO
O	O 	O 	O 	O 	O 	O 	O 	O 	O 	O 	O 	O 	O 	O 	O 	O 	O 	Os   C90C9$A7C9A74C96A77C9?BC9
B'$C9&B''C9+C C9$C(%C9'C((C99C= C=c                    | j                                         s	 t                      }t          |          }|| j        k    r#|| _        || _        |                     |           n,# t          $ r t          	                    dd           Y nw xY wt          | j        dz            D ]2}| j                                         r dS t          j        d           3| j                                         dS dS )z+Main polling loop. Runs in a daemon thread.z"Error in gateway watcher poll loopT)exc_infork   Ng?)rO   is_setrC   r!   rQ   rR   r{   r(   r?   rf   rangePOLL_INTERVALtimesleep)rT   r   current_hash_s       r   r]   zGatewayWatcher._poll_loop   s   "))++ 	 	R688-h774?22&2DO*2D',,X666 R R RADQQQQQR 4-233    #**,, FF
3! "))++ 	  	  	  	  	 s   AA' '&BBN)__name__
__module____qualname____doc__r   SUBSCRIBER_TIMEOUTrU   r^   boolrZ   ri   rm   rn   ro   rs   listr{   r]   r   r   r   rE   rE   O   s        	 	 M' ' '  .$ . . . .     5;    U[    OD O O O O6         r   rE   _watcherc                      t           5  t          't                      at                                           ddd           dS # 1 swxY w Y   dS )z.Start the global gateway watcher (idempotent).N)_watcher_lockr   rE   r^   r   r   r   start_watcherr      s     
  %''HNN                 s   /AAAc                      t           5  t          t                                           daddd           dS # 1 swxY w Y   dS )z Stop the global gateway watcher.N)r   r   ri   r   r   r   stop_watcherr      s     
  MMOOOH                 s   #8<<c                  R    t           5  t          cddd           S # 1 swxY w Y   dS )z9Get the global watcher instance (or None if not started).N)r   r   r   r   r   get_watcherr      so    	                   s     )r   r   jsonloggingr)   rm   rK   r   pathlibr   
api.configr   api.agent_sessionsr   	getLoggerr   r?   r   r+   r!   r-   rC   rE   r   __annotations__rL   r   r   r   r   r   r   r   <module>r      s  	 	 	    				                   A A A A A A		8	$	$
HT Hc H H H H$D $ $ $ $T    <y  y  y  y  y  y  y  y | #'.4
 & & &	      ^d*      r   