forked from LeeYouRan/Discuss-Bot
-
Notifications
You must be signed in to change notification settings - Fork 0
/
main.py
391 lines (338 loc) · 16.7 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
# -*- coding: utf-8 -*-
import os
import time
import random
import logging
import platform
import requests
import html
import io
from datetime import datetime
from configparser import ConfigParser
from tabulate import tabulate
from playwright.sync_api import sync_playwright, TimeoutError
from config import reply_generator
# I stumbled upon this site thinking it might be a promising open-source Linux community. After exploring a bit, it seems like it's still in its early stages and doesn't quite live up to the 'community' label yet. There’s no shortage of overconfident individuals here, but it feels more like an amateurish forum rather than a serious place for Linux enthusiasts.
# 创建一个 StringIO 对象用于捕获日志
log_stream = io.StringIO()
# 创建日志记录器
logger = logging.getLogger()
logger.setLevel(logging.INFO)
# 创建控制台输出的处理器
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.INFO)
# 创建 log_stream 处理器
stream_handler = logging.StreamHandler(log_stream)
stream_handler.setLevel(logging.INFO)
# 创建格式化器
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
# 为处理器设置格式化器
console_handler.setFormatter(formatter)
stream_handler.setFormatter(formatter)
# 将处理器添加到日志记录器中
logger.addHandler(console_handler)
logger.addHandler(stream_handler)
# 自动判断运行环境
IS_GITHUB_ACTIONS = 'GITHUB_ACTIONS' in os.environ
IS_SERVER = platform.system() == "Linux" and not IS_GITHUB_ACTIONS
# 从配置文件或环境变量中读取配置信息
def load_config():
config = ConfigParser()
if IS_SERVER:
config_file = './config/config.ini'
elif IS_GITHUB_ACTIONS:
config_file = None
else:
config_file = 'config/config.ini'
if config_file and os.path.exists(config_file):
config.read(config_file)
return config
config = load_config()
USERNAME = os.getenv("LINUXDO_USERNAME", config.get('credentials', 'username', fallback=None))
PASSWORD = os.getenv("LINUXDO_PASSWORD", config.get('credentials', 'password', fallback=None))
LIKE_PROBABILITY = float(os.getenv("LIKE_PROBABILITY", config.get('settings', 'like_probability', fallback='0.02')))
REPLY_PROBABILITY = float(os.getenv("REPLY_PROBABILITY", config.get('settings', 'reply_probability', fallback='0')))
COLLECT_PROBABILITY = float(os.getenv("COLLECT_PROBABILITY", config.get('settings', 'collect_probability', fallback='0.02')))
HOME_URL = config.get('urls', 'home_url', fallback="https://linux.do/")
CONNECT_URL = config.get('urls', 'connect_url', fallback="https://connect.linux.do/")
USE_WXPUSHER = os.getenv("USE_WXPUSHER", config.get('wxpusher', 'use_wxpusher', fallback='false')).lower() == 'true'
APP_TOKEN = os.getenv("APP_TOKEN", config.get('wxpusher', 'app_token', fallback=None))
TOPIC_ID = os.getenv("TOPIC_ID", config.get('wxpusher', 'topic_id', fallback=None))
MAX_TOPICS = int(os.getenv("MAX_TOPICS", config.get('settings', 'max_topics', fallback='10')))
# 检查必要配置
missing_configs = []
if not USERNAME:
missing_configs.append("USERNAME")
if not PASSWORD:
missing_configs.append("PASSWORD")
if USE_WXPUSHER and not APP_TOKEN:
missing_configs.append("APP_TOKEN")
if USE_WXPUSHER and not TOPIC_ID:
missing_configs.append("TOPIC_ID")
if missing_configs:
logging.error(f"缺少必要配置: {', '.join(missing_configs)},请在环境变量或配置文件中设置。")
exit(1)
class NotificationManager:
def __init__(self, use_wxpusher, app_token, topic_id):
self.use_wxpusher = use_wxpusher
self.app_token = app_token
self.topic_id = topic_id
def send_message(self, content, summary):
if self.use_wxpusher:
try:
data = {
"appToken": self.app_token,
"content": content,
"summary": summary,
"contentType": 2,
"topicIds": [self.topic_id],
"verifyPayType": 0
}
# 使用单独的请求日志记录器来避免混淆
request_logger = logging.getLogger("request_logger")
request_logger.info("发送 wxpusher 消息...")
response = requests.post("https://wxpusher.zjiecode.com/api/send/message", json=data)
if response.status_code == 200:
request_logger.info("wxpusher 消息发送成功")
else:
request_logger.error(f"wxpusher 消息发送失败: {response.status_code}, {response.text}")
except Exception as e:
request_logger.error(f"发送 wxpusher 消息时出错: {e}")
class LinuxDoBrowser:
def __init__(self) -> None:
logging.info("启动 Playwright...")
self.pw = sync_playwright().start()
logging.info("以无头模式启动 Firefox...")
self.browser = self.pw.firefox.launch(headless=True)
self.context = self.browser.new_context()
self.page = self.context.new_page()
logging.info(f"导航到 {HOME_URL}...")
self.page.goto(HOME_URL)
logging.info("初始化完成。")
def load_messages(self, filename):
"""从指定的文件加载消息并返回消息列表。"""
script_dir = os.path.dirname(os.path.abspath(__file__))
file_path = os.path.join(script_dir, filename)
with open(file_path, 'r', encoding='utf-8') as file:
messages = file.readlines()
return [message.strip() for message in messages if message.strip()]
def get_random_message(self, messages):
"""从列表中选择一个随机消息。"""
return random.choice(messages)
def login(self) -> bool:
try:
logging.info("尝试登录...")
self.page.click(".login-button .d-button-label")
time.sleep(2)
self.page.fill("#login-account-name", USERNAME)
time.sleep(2)
self.page.fill("#login-account-password", PASSWORD)
time.sleep(2)
self.page.click("#login-button")
time.sleep(10) # 等待页面加载完成
user_ele = self.page.query_selector("#current-user")
if not user_ele:
logging.error("登录失败")
return False
else:
logging.info("登录成功")
return True
except TimeoutError:
logging.error("登录失败:页面加载超时或元素未找到")
return False
def click_topic(self):
try:
logging.info("开始处理主题...")
topics = self.page.query_selector_all("#list-area .title")
total_topics = len(topics)
logging.info(f"共找到 {total_topics} 个主题。")
# 限制处理的最大主题数
if total_topics > MAX_TOPICS:
logging.info(f"处理主题数超过最大限制 {MAX_TOPICS},仅处理前 {MAX_TOPICS} 个主题。")
topics = topics[:MAX_TOPICS]
browsed_articles = []
liked_articles = []
like_count = 0
replied_articles = []
reply_count = 0
collected_articles = []
collect_count = 0
for idx, topic in enumerate(topics):
article_title = topic.text_content().strip()
logging.info(f"打开第 {idx + 1}/{len(topics)} 个主题 :{article_title} ... ")
page = self.context.new_page()
article_url = HOME_URL + topic.get_attribute("href")
browsed_articles.append({"title": article_title, "url": article_url})
try:
page.goto(article_url)
time.sleep(3) # 等待页面完全加载
if random.random() < LIKE_PROBABILITY:
self.click_like(page)
liked_articles.append({"title": article_title, "url": article_url})
like_count += 1
if random.random() < REPLY_PROBABILITY:
reply_message = self.click_reply(page)
if reply_message:
replied_articles.append(
{"title": article_title, "url": article_url, "reply": reply_message})
reply_count += 1
if random.random() < COLLECT_PROBABILITY:
self.click_collect(page)
collected_articles.append({"title": article_title, "url": article_url})
collect_count += 1
except TimeoutError:
logging.warning(f"打开主题 : {article_title} 超时,跳过该主题。")
finally:
time.sleep(3) # 等待一段时间,防止操作过快导致出错
page.close()
logging.info(f"已关闭第 {idx + 1}/{len(topics)} 个主题 : {article_title} ...")
# 打印浏览的文章信息
logging.info("--------------浏览的文章信息-----------------")
logging.info("\n%s",tabulate(browsed_articles, headers="keys", tablefmt="pretty"))
# 打印点赞的文章信息
logging.info(f"一共点赞了 {like_count} 篇文章。")
if like_count > 0:
logging.info("--------------点赞的文章信息-----------------")
logging.info("\n%s",tabulate(liked_articles, headers="keys", tablefmt="pretty"))
# 打印回复的文章信息
logging.info(f"一共回复了 {reply_count} 篇文章。")
if reply_count > 0:
logging.info("--------------回复的文章信息-----------------")
logging.info("\n%s",tabulate(replied_articles, headers="keys", tablefmt="pretty"))
# 打印加入书签的文章信息
logging.info(f"一共加入书签了 {collect_count} 篇文章。")
if collect_count > 0:
logging.info("--------------加入书签的文章信息-----------------")
logging.info("\n%s", tabulate(collected_articles, headers="keys", tablefmt="pretty"))
except Exception as e:
logging.error(f"处理主题时出错: {e}")
def run(self):
start_time = datetime.now()
logging.info(f"开始执行时间: {start_time.strftime('%Y-%m-%d %H:%M:%S')}")
try:
logging.info("开始运行自动化流程...")
if not self.login():
return
self.click_topic()
self.print_connect_info()
except Exception as e:
logging.error(f"运行过程中出错: {e}")
finally:
end_time = datetime.now()
logging.info(f"结束执行时间: {end_time.strftime('%Y-%m-%d %H:%M:%S')}")
self.context.close()
self.browser.close()
self.pw.stop()
if USE_WXPUSHER:
elapsed_time = end_time - start_time
summary = f"Linux.do保活脚本 {end_time.strftime('%Y-%m-%d %H:%M:%S')}"
# 获取并转义日志内容
log_content = log_stream.getvalue()
escaped_log_content = html.escape(log_content)
html_log_content = f"<pre>{escaped_log_content}</pre>"
# 创建 HTML 格式的内容
content = (
f"<h1>Linux.do保活脚本 {end_time.strftime('%Y-%m-%d %H:%M:%S')}</h1>"
f"<br/><p style='color:red;'>"
f"开始执行时间: {start_time.strftime('%Y-%m-%d %H:%M:%S')}<br/>"
f"结束执行时间: {end_time.strftime('%Y-%m-%d %H:%M:%S')}<br/>"
f"总耗时: {elapsed_time}<br/>"
f"</p>"
f"<h2>日志内容</h2>"
f"{html_log_content}"
)
wx_pusher = NotificationManager(USE_WXPUSHER, APP_TOKEN, TOPIC_ID)
wx_pusher.send_message(content, summary)
def print_connect_info(self):
try:
logging.info(f"导航到 {CONNECT_URL}...")
self.page.goto(CONNECT_URL)
time.sleep(2)
logging.info(f"当前页面URL: {self.page.url}")
time.sleep(2)
rows = self.page.query_selector_all("table tr")
info = []
for row in rows:
cells = row.query_selector_all("td")
if len(cells) >= 3:
project = cells[0].text_content().strip()
current = cells[1].text_content().strip()
requirement = cells[2].text_content().strip()
info.append([project, current, requirement])
logging.info("--------------Connect Info-----------------")
logging.info("\n%s", tabulate(info, headers=["项目", "当前", "要求"], tablefmt="pretty"))
self.page.close()
except TimeoutError:
logging.error("连接信息页面加载超时")
except Exception as e:
logging.error(f"打印连接信息时出错: {e}")
def click_like(self, page):
try:
page.wait_for_selector(".discourse-reactions-reaction-button button", timeout=2000)
like_button = page.locator(".discourse-reactions-reaction-button").first
if like_button:
like_button.click()
logging.info("文章已点赞")
else:
logging.info("未找到点赞按钮")
except TimeoutError:
logging.warning("点赞按钮定位超时")
except Exception as e:
logging.error(f"点赞操作失败: {e}")
def click_reply(self, page):
try:
# 加载消息
random_message = reply_generator.get_random_reply()
# 选择一条随机消息
page.wait_for_selector(".reply.create.btn-icon-text", timeout=2000)
reply_button = page.locator(".reply.create.btn-icon-text").first
if reply_button:
reply_button.click()
logging.info("回复按钮已点击")
# 等待文本区域可见
page.wait_for_selector(".d-editor-input", timeout=2000)
text_area = page.locator(".d-editor-input").first
if text_area:
# 在文本区域中键入随机消息
text_area.fill(random_message)
logging.info(f"回复内容: {random_message}")
# 点击提交按钮
page.wait_for_selector(".save-or-cancel .btn-primary.create", timeout=2000)
submit_button = page.locator(".save-or-cancel .btn-primary.create").first
if submit_button:
time.sleep(2)
submit_button.click()
logging.info("回复已提交")
return random_message # 返回实际的回复内容
else:
logging.warning("未找到提交按钮")
else:
logging.warning("未找到回复文本框")
else:
logging.info("未找到回复按钮")
return None # 如果回复失败,返回 None
except TimeoutError:
logging.warning("元素定位超时")
return None
except Exception as e:
logging.error(f"回复操作失败: {e}")
return None
def click_collect(self, page):
try:
# 等待并点击书签按钮
page.wait_for_selector(".btn.bookmark-menu-trigger", timeout=2000) # 增加等待时间
bookmark_button = page.locator(".btn.bookmark-menu-trigger").first
if bookmark_button:
# 等待几秒钟以确保加入书签操作已完成
time.sleep(2)
bookmark_button.click()
logging.info("帖子已加入书签")
else:
logging.warning("未找到书签按钮")
except TimeoutError:
logging.warning("书签按钮定位超时")
except Exception as e:
logging.error(f"加入书签操作失败: {e}")
if __name__ == "__main__":
ldb = LinuxDoBrowser()
ldb.run()