-
Notifications
You must be signed in to change notification settings - Fork 9.9k
Expand file tree
/
Copy pathapp.py
More file actions
315 lines (273 loc) · 11.9 KB
/
app.py
File metadata and controls
315 lines (273 loc) · 11.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
# encoding:utf-8
import os
import signal
import sys
import time
from channel import channel_factory
from common import const
from common.log import logger
from config import load_config, conf
from plugins import *
import threading
# Module-level ChannelManager instance; stays None until run() creates one.
_channel_mgr = None


def get_channel_manager():
    """Return the module-level ChannelManager, or None if run() has not created it yet."""
    return _channel_mgr
def _parse_channel_type(raw) -> list:
    """
    Parse channel_type config value into a list of channel names.

    Supports:
      - single string: "feishu"
      - comma-separated string: "feishu, dingtalk"
      - list: ["feishu", "dingtalk"]

    Names are stripped of surrounding whitespace and empty names are dropped.
    Non-string entries inside a list (e.g. ``None`` or a number coming from a
    hand-edited config) are skipped instead of raising AttributeError.
    Any other input type yields an empty list.
    """
    if isinstance(raw, str):
        candidates = raw.split(",")
    elif isinstance(raw, list):
        candidates = raw
    else:
        return []

    names = []
    for item in candidates:
        if not isinstance(item, str):
            # Only strings can name a channel; ignore anything else.
            continue
        name = item.strip()
        if name:
            names.append(name)
    return names
class ChannelManager:
    """
    Manage the lifecycle of multiple channels running concurrently.
    Each channel.startup() runs in its own daemon thread.
    The web channel is started as default console unless explicitly disabled.

    Channel instances and their threads are tracked by channel name.
    ``_lock`` is a plain (non-reentrant) ``threading.Lock``, so no method may
    call another locking method while holding it.
    """

    def __init__(self):
        self._channels = {}  # channel_name -> channel instance
        self._threads = {}  # channel_name -> thread
        self._primary_channel = None
        self._lock = threading.Lock()
        self.cloud_mode = False  # set to True when cloud client is active

    @property
    def channel(self):
        """Return the primary (first non-web) channel for backward compatibility."""
        return self._primary_channel

    def get_channel(self, channel_name: str):
        """Return the managed channel instance for ``channel_name``, or None if not managed."""
        return self._channels.get(channel_name)

    def start(self, channel_names: list, first_start: bool = False):
        """
        Create and start one or more channels in sub-threads.

        Args:
            channel_names: channel names passed to channel_factory.create_channel().
            first_start: if True, plugins and linkai client will also be initialized.
        """
        with self._lock:
            channels = []
            for name in channel_names:
                ch = channel_factory.create_channel(name)
                ch.cloud_mode = self.cloud_mode
                self._channels[name] = ch
                channels.append((name, ch))
                # The first non-web channel becomes the primary one.
                if self._primary_channel is None and name != "web":
                    self._primary_channel = ch
            # Only web (or nothing non-web) was requested: fall back to the first channel.
            if self._primary_channel is None and channels:
                self._primary_channel = channels[0][1]

        if first_start:
            PluginManager().load_plugins()

            # Cloud client is optional. It is only started when
            # use_linkai=True AND cloud_deployment_id is set.
            # By default neither is configured, so the app runs
            # entirely locally without any remote connection.
            if conf().get("use_linkai") and (
                os.environ.get("CLOUD_DEPLOYMENT_ID") or conf().get("cloud_deployment_id")
            ):
                try:
                    from common import cloud_client

                    threading.Thread(
                        target=cloud_client.start,
                        args=(self._primary_channel, self),
                        daemon=True,
                    ).start()
                except Exception as e:
                    # Still best-effort (channels must start regardless), but
                    # leave a trace instead of silently swallowing the failure.
                    logger.warning(f"[ChannelManager] Cloud client failed to start: {e}")

        # Start web console first so its logs print cleanly,
        # then start remaining channels after a brief pause.
        web_entry = None
        other_entries = []
        for entry in channels:
            if entry[0] == "web":
                web_entry = entry
            else:
                other_entries.append(entry)

        ordered = ([web_entry] if web_entry else []) + other_entries
        for i, (name, ch) in enumerate(ordered):
            if i > 0 and name != "web":
                time.sleep(0.1)
            t = threading.Thread(target=self._run_channel, args=(name, ch), daemon=True)
            self._threads[name] = t
            t.start()
            logger.debug(f"[ChannelManager] Channel '{name}' started in sub-thread")

    def _run_channel(self, name: str, channel):
        """Thread target: run channel.startup() and log any exception it raises."""
        try:
            channel.startup()
        except Exception as e:
            logger.error(f"[ChannelManager] Channel '{name}' startup error: {e}")
            logger.exception(e)

    def stop(self, channel_name: str = None):
        """
        Stop channel(s). If channel_name is given, stop only that channel;
        otherwise stop all channels.

        If a stopped channel is the current primary channel, the primary
        reference is cleared so that a later start() can pick a fresh one.
        """
        # Pop under lock, then stop outside lock to avoid deadlock
        with self._lock:
            names = [channel_name] if channel_name else list(self._channels.keys())
            to_stop = []
            for name in names:
                ch = self._channels.pop(name, None)
                th = self._threads.pop(name, None)
                # Compare against the popped instance: looking the name up in
                # self._channels after the pop would always yield None.
                if ch is not None and ch is self._primary_channel:
                    self._primary_channel = None
                to_stop.append((name, ch, th))

        for name, ch, th in to_stop:
            if ch is None:
                logger.warning(f"[ChannelManager] Channel '{name}' not found in managed channels")
                # A thread without a channel object cannot be stopped gracefully.
                if th and th.is_alive():
                    self._interrupt_thread(th, name)
                continue

            logger.info(f"[ChannelManager] Stopping channel '{name}'...")
            graceful = False
            if hasattr(ch, 'stop'):
                try:
                    ch.stop()
                    graceful = True
                except Exception as e:
                    logger.warning(f"[ChannelManager] Error during channel '{name}' stop: {e}")

            if th and th.is_alive():
                th.join(timeout=5)
                if th.is_alive():
                    if graceful:
                        logger.info(f"[ChannelManager] Channel '{name}' thread still alive after stop(), "
                                    "leaving daemon thread to finish on its own")
                    else:
                        logger.warning(f"[ChannelManager] Channel '{name}' thread did not exit in 5s, forcing interrupt")
                        self._interrupt_thread(th, name)

    @staticmethod
    def _interrupt_thread(th: threading.Thread, name: str):
        """Raise SystemExit in target thread to break blocking loops like start_forever."""
        import ctypes

        try:
            tid = th.ident
            if tid is None:
                # Thread was never started; nothing to interrupt.
                return
            res = ctypes.pythonapi.PyThreadState_SetAsyncExc(
                ctypes.c_ulong(tid), ctypes.py_object(SystemExit)
            )
            if res == 1:
                logger.info(f"[ChannelManager] Interrupted thread for channel '{name}'")
            elif res > 1:
                # More than one thread state was affected: cancel the pending exception.
                ctypes.pythonapi.PyThreadState_SetAsyncExc(ctypes.c_ulong(tid), None)
                logger.warning(f"[ChannelManager] Failed to interrupt thread for channel '{name}'")
        except Exception as e:
            logger.warning(f"[ChannelManager] Thread interrupt error for '{name}': {e}")

    def restart(self, new_channel_name: str):
        """
        Restart a single channel with a new channel type.
        Can be called from any thread (e.g. linkai config callback).

        Stops the channel, clears its singleton cache, waits one second and
        starts it again without re-running first-start initialization.
        """
        logger.info(f"[ChannelManager] Restarting channel to '{new_channel_name}'...")
        self.stop(new_channel_name)
        _clear_singleton_cache(new_channel_name)
        time.sleep(1)
        self.start([new_channel_name], first_start=False)
        logger.info(f"[ChannelManager] Channel restarted to '{new_channel_name}' successfully")

    def add_channel(self, channel_name: str):
        """
        Dynamically add and start a new channel.
        If the channel is already running, restart it instead.
        """
        # Decide under the lock, act outside it: restart()/start() take the
        # same non-reentrant lock and would deadlock if called while holding it.
        with self._lock:
            already_managed = channel_name in self._channels

        if already_managed:
            logger.info(f"[ChannelManager] Channel '{channel_name}' already exists, restarting")
            self.restart(channel_name)
            return

        logger.info(f"[ChannelManager] Adding channel '{channel_name}'...")
        _clear_singleton_cache(channel_name)
        self.start([channel_name], first_start=False)
        logger.info(f"[ChannelManager] Channel '{channel_name}' added successfully")

    def remove_channel(self, channel_name: str):
        """
        Dynamically stop and remove a running channel.
        Logs a warning and does nothing if the channel is not managed.
        """
        with self._lock:
            if channel_name not in self._channels:
                logger.warning(f"[ChannelManager] Channel '{channel_name}' not found, nothing to remove")
                return

        # stop() acquires the lock itself, so it must run after the lock is released.
        logger.info(f"[ChannelManager] Removing channel '{channel_name}'...")
        self.stop(channel_name)
        logger.info(f"[ChannelManager] Channel '{channel_name}' removed successfully")
def _clear_singleton_cache(channel_name: str):
    """
    Clear the singleton cache for the channel class so that
    a new instance can be created with updated config.

    The channel name is mapped to a dotted "module.Class" path; the module is
    imported, the attribute is looked up, and the first dict found in that
    attribute's closure cells is emptied (presumably the instance cache of the
    singleton decorator, which is defined outside this file). Unknown channel
    names are ignored, and any failure is logged as a warning.
    """
    import importlib

    dotted_paths = {
        "web": "channel.web.web_channel.WebChannel",
        "wechatmp": "channel.wechatmp.wechatmp_channel.WechatMPChannel",
        "wechatmp_service": "channel.wechatmp.wechatmp_channel.WechatMPChannel",
        "wechatcom_app": "channel.wechatcom.wechatcomapp_channel.WechatComAppChannel",
        const.FEISHU: "channel.feishu.feishu_channel.FeiShuChanel",
        const.DINGTALK: "channel.dingtalk.dingtalk_channel.DingTalkChanel",
        const.WECOM_BOT: "channel.wecom_bot.wecom_bot_channel.WecomBotChannel",
        const.QQ: "channel.qq.qq_channel.QQChannel",
        const.WEIXIN: "channel.weixin.weixin_channel.WeixinChannel",
        "wx": "channel.weixin.weixin_channel.WeixinChannel",
    }

    target = dotted_paths.get(channel_name)
    if not target:
        return

    try:
        module_name, class_name = target.rsplit(".", 1)
        wrapper = getattr(importlib.import_module(module_name), class_name, None)
        closure = getattr(wrapper, "__closure__", None) if wrapper else None
        for cell in closure or ():
            try:
                contents = cell.cell_contents
                if not isinstance(contents, dict):
                    continue
                contents.clear()
                logger.debug(f"[ChannelManager] Cleared singleton cache for {class_name}")
                break
            except ValueError:
                # Reading an empty closure cell raises ValueError; skip it.
                continue
    except Exception as e:
        logger.warning(f"[ChannelManager] Failed to clear singleton cache: {e}")
def sigterm_handler_wrap(_signo):
    """
    Install a handler for ``_signo`` that saves user data before exiting.

    The handler logs the signal, calls conf().save_user_datas(), then
    delegates to the handler that was registered before this call if that
    handler is callable; otherwise it exits the process with status 0.
    """
    previous = signal.getsignal(_signo)

    def _on_signal(signum, frame):
        logger.info("signal {} received, exiting...".format(signum))
        conf().save_user_datas()
        if not callable(previous):
            # No chainable handler (e.g. SIG_DFL / SIG_IGN): exit ourselves.
            sys.exit(0)
        return previous(signum, frame)

    signal.signal(_signo, _on_signal)
def run():
    """
    Application entry point: load config, install signal handlers, start the
    configured channels and then keep the main thread alive.

    Channel selection: "--cmd" on the command line forces the "terminal"
    channel; otherwise the "channel_type" config value is parsed, falling back
    to "web" when it yields nothing. The "web" console is appended unless the
    "web_console" config value is falsy. Any exception is logged, not raised.
    """
    global _channel_mgr
    try:
        load_config()

        # ctrl + c, then kill signal
        for signo in (signal.SIGINT, signal.SIGTERM):
            sigterm_handler_wrap(signo)

        configured = conf().get("channel_type", "web")
        if "--cmd" in sys.argv:
            channel_names = ["terminal"]
        else:
            # An empty parse result falls back to the web channel.
            channel_names = _parse_channel_type(configured) or ["web"]

        # Auto-start web console unless explicitly disabled
        if conf().get("web_console", True) and "web" not in channel_names:
            channel_names.append("web")

        logger.info(f"[App] Starting channels: {channel_names}")

        manager = ChannelManager()
        _channel_mgr = manager
        manager.start(channel_names, first_start=True)

        # Channels run in daemon threads; keep the main thread alive.
        while True:
            time.sleep(1)
    except Exception as e:
        logger.error("App startup failed!")
        logger.exception(e)
# Start the application only when this file is executed as a script.
if __name__ == "__main__":
    run()