-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathgithub_cve_monitor.py
More file actions
2989 lines (2616 loc) · 123 KB
/
github_cve_monitor.py
File metadata and controls
2989 lines (2616 loc) · 123 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
#!/usr/bin/python3
# -*- coding:utf-8 -*-
# @Author : anonymous520
__version__ = "V1.1.1"
# Checks GitHub once every 3 minutes
# Configuration priority: environment variables > config file
import json
from collections import OrderedDict
import requests, time, re, os
import dingtalkchatbot.chatbot as cb
import datetime
import hashlib
import yaml
from lxml import etree
import sqlite3
import pytz
import logging
import logging.handlers
# Configure logging: INFO level, one shared format, written to the file
# github_monitor.log and to a stream handler.
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('github_monitor.log'),
logging.StreamHandler()
]
)
# Module-wide logger used by the functions below; logs a start-up message on import.
logger = logging.getLogger('github_monitor')
logger.info('程序启动')
# Configure a shared requests session with a retry policy and SSL settings
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
from urllib3.exceptions import InsecureRequestWarning
# Silence the warning urllib3 emits for unverified HTTPS requests
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
http_session = requests.Session()
# Retry policy
retry_strategy = Retry(
total=3, # total number of retries
status_forcelist=[429, 500, 502, 503, 504], # HTTP status codes that trigger a retry
allowed_methods=["HEAD", "GET", "OPTIONS"], # HTTP methods that may be retried
backoff_factor=1, # back-off factor between retries
raise_on_status=False
)
# Apply the retry policy to both HTTPS and HTTP requests
adapter = HTTPAdapter(max_retries=retry_strategy)
http_session.mount("https://", adapter)
http_session.mount("http://", adapter)
# SSL settings
# NOTE(review): certificate verification is disabled for every request made
# through this session (the original comment says this works around SSL:997
# errors caused by certificate problems) - confirm this is acceptable.
http_session.verify = False # disable SSL certificate verification
# Default headers sent with every request of this session
http_session.headers.update({
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
'Accept': 'application/vnd.github.v3+json'
})
# Global configuration; these are the defaults, init_config() overwrites them
# from environment variables and config.yaml.
GLOBAL_CONFIG = {
# GitHub API token
'github_token': '',
# whether translation is enabled
'translate': False,
# settings of the single active push channel
'push_channel': {
'type': '',
'webhook': '',
'secretKey': '',
'token': '',
'group_id': '',
'send_daily_report': 0,
'send_normal_msg': 1
},
# ON/OFF switches read from the environment by init_config()
'workflow': {
'night_sleep_switch': 'ON',
'daily_report_switch': 'ON',
'push_switch': 'ON'
}
}
# Message queue configuration
MESSAGE_QUEUE_CONFIG = {
    'max_queue_size': 1000,  # maximum number of pending messages kept in the queue
    'max_per_minute': 20,    # at most 20 messages are sent per minute
    'batch_size': 5          # at most 5 messages are sent per send_messages() call
}
# Module-level queue state
message_queue = []      # message queue (not referenced by the MessageQueue class)
last_send_time = 0      # start of the current rate-limit window (epoch seconds)
message_count = 0       # number of messages sent in the current window
message_cache = set()   # sent-message cache (not referenced by the MessageQueue class)


# Message queue class
class MessageQueue:
    """Lightweight priority queue that rate-limits outgoing push messages.

    Messages are de-duplicated by the MD5 of their string form, both against
    messages already delivered (``self.cache``) and against messages that are
    still pending in ``self.queue``.
    """

    def __init__(self):
        self.queue = []       # pending messages, highest priority first
        self.last_send = 0
        self.send_count = 0
        self.cache = set()    # ids of messages that were delivered

    def add_message(self, message, priority=1):
        """Queue *message*; return False when it is a duplicate, else True.

        A larger *priority* is sent earlier. When the queue exceeds
        ``max_queue_size`` the lowest-priority tail is dropped.
        """
        # Unique id of the message
        msg_id = hashlib.md5(str(message).encode()).hexdigest()
        # Already delivered
        if msg_id in self.cache:
            return False
        # Already waiting in the queue: without this check the same message
        # could be queued (and later sent) more than once.
        if any(entry['id'] == msg_id for entry in self.queue):
            return False
        self.queue.append({
            'content': message,
            'priority': priority,
            'timestamp': time.time(),
            'id': msg_id
        })
        # Highest priority first; the sort is stable, so equal priorities
        # keep their insertion order.
        self.queue.sort(key=lambda x: x['priority'], reverse=True)
        # Enforce the size limit
        if len(self.queue) > MESSAGE_QUEUE_CONFIG['max_queue_size']:
            self.queue = self.queue[:MESSAGE_QUEUE_CONFIG['max_queue_size']]
        return True

    def send_messages(self):
        """Send pending messages within the rate limit; return how many were delivered.

        When the push switch is not 'ON' the queue is cleared and 0 is
        returned. A message whose delivery fails is dropped, but it is not
        recorded as sent and does not count against the rate limit, so it
        can be queued again later.
        """
        global last_send_time, message_count
        # Check the push switch
        if GLOBAL_CONFIG['workflow']['push_switch'] != 'ON':
            print("[+] 推送功能已关闭")
            # Clear the queue so messages do not pile up
            self.queue = []
            return 0
        current_time = time.time()
        # Start a new rate-limit window once the previous one is over 60s old
        if current_time - last_send_time > 60:
            last_send_time = current_time
            message_count = 0
        # Number of messages that may be sent in this call
        can_send = min(
            MESSAGE_QUEUE_CONFIG['max_per_minute'] - message_count,
            MESSAGE_QUEUE_CONFIG['batch_size'],
            len(self.queue)
        )
        if can_send <= 0:
            return 0
        sent_count = 0
        for _ in range(can_send):
            if not self.queue:
                break
            message = self.queue.pop(0)
            try:
                delivered = self._send_message(message['content'])
            except Exception as e:
                print(f"[-] 发送消息失败: {e}")
                continue
            # Only an explicit False means failure; None is treated as
            # success so an override without a return value keeps working.
            if delivered is False:
                continue
            self.cache.add(message['id'])
            sent_count += 1
            message_count += 1
            # Avoid sending too fast
            time.sleep(0.5)
        return sent_count

    def _send_message(self, message):
        """Deliver *message* through the configured channel.

        Returns True when the message was handed to the channel, False when
        the channel is unsupported here, the Feishu webhook answered with a
        non-200 status, or any exception occurred (it is logged, not raised).
        """
        app_name, _, webhook, secretKey, _ = load_config()
        try:
            if app_name == "dingding":
                from dingtalkchatbot.chatbot import DingtalkChatbot
                # Call DingtalkChatbot directly to avoid a recursive call
                ding = DingtalkChatbot(webhook, secret=secretKey)
                ding.send_text(msg=message, is_at_all=False)
                update_push_count()
                logger.info("[+] 钉钉消息发送成功")
                return True
            if app_name == "feishu":
                # Post to the Feishu webhook directly to avoid a recursive call
                import requests
                headers = {"Content-Type": "application/json"}
                data = {
                    "msg_type": "text",
                    "content": {
                        "text": message
                    }
                }
                response = requests.post(webhook, json=data, headers=headers, timeout=10)
                if response.status_code != 200:
                    logger.error(f"[-] 发送消息失败: HTTP {response.status_code}")
                    return False
                update_push_count()
                logger.info("[+] 飞书消息发送成功")
                return True
            if app_name == "tgbot":
                # Send the Telegram message directly to avoid a recursive call;
                # for this channel load_config() returns the bot token and the
                # group id in the webhook / secretKey positions.
                import telegram
                bot = telegram.Bot(token=webhook)
                bot.send_message(chat_id=secretKey, text=message, timeout=10)
                update_push_count()
                logger.info("[+] Telegram消息发送成功")
                return True
            logger.warning(f"[-] 发送消息失败: unsupported push channel '{app_name}'")
            return False
        except Exception as e:
            logger.error(f"[-] 发送消息失败: {e}")
            return False
# Create the shared message queue instance
msg_queue = MessageQueue()
def _env_or_default(name, default=''):
    """Return environment variable *name*, or *default* when it is unset or empty."""
    return os.environ.get(name) or default


def _discard_flag(env_name, channel_config, index, key, default):
    """Resolve one ON/OFF option of the discard channel to 1/0.

    The environment variable wins unless it is empty or the placeholder
    '***'; otherwise ``channel_config[index][key]`` is used, which may be an
    ON/OFF string or a number. Any error yields *default*.
    """
    try:
        env_value = os.environ.get(env_name, '')
        if env_value and env_value != '***':
            # Only the ON/OFF format is supported for the environment variable
            return 1 if env_value.upper() == 'ON' else 0
        config_value = channel_config[index][key] if len(channel_config) > index else default
        if isinstance(config_value, str):
            return 1 if config_value.upper() == 'ON' else 0
        return config_value
    except Exception:
        return default


# Initialise the global configuration
def init_config():
    """Populate GLOBAL_CONFIG from environment variables and config.yaml.

    Environment variables take priority over the ``all_config`` section of
    config.yaml. An environment variable that is set but empty is treated as
    unset for the token / webhook values, so the config-file value is used.
    The push channel is the first of dingding, feishu, tgbot, discard whose
    environment variable is set, otherwise the first one enabled in the file.
    """
    global GLOBAL_CONFIG
    # Read the config file; a missing, empty or malformed file yields {}
    config = {}
    try:
        with open('config.yaml', 'r', encoding='utf-8') as f:
            loaded = yaml.load(f, Loader=yaml.FullLoader)
        if isinstance(loaded, dict):
            config = loaded.get('all_config') or {}
        if not isinstance(config, dict):
            config = {}
    except Exception as e:
        print(f"[警告] 读取配置文件失败: {e}")

    # GitHub token
    GLOBAL_CONFIG['github_token'] = _env_or_default('GITHUB_TOKEN', config.get('github_token') or '')

    # Translation switch
    translate_enable = os.environ.get('TRANSLATE_ENABLE', '')
    if translate_enable:
        GLOBAL_CONFIG['translate'] = translate_enable.upper() == 'ON' or translate_enable == '1'
    else:
        try:
            translate_config = config.get('translate', [{'enable': 'OFF'}])[0]['enable']
            if isinstance(translate_config, str):
                GLOBAL_CONFIG['translate'] = translate_config.upper() == 'ON'
            else:
                GLOBAL_CONFIG['translate'] = bool(int(translate_config))
        except Exception:
            GLOBAL_CONFIG['translate'] = False

    # Detect the active push channel: environment variables first ...
    push_channel = ''
    channel_env_names = (
        ('dingding', 'DINGDING_WEBHOOK'),
        ('feishu', 'FEISHU_WEBHOOK'),
        ('tgbot', 'TG_BOT_TOKEN'),
        ('discard', 'DISCARD_WEBHOOK'),
    )
    for channel, env_name in channel_env_names:
        if os.environ.get(env_name):
            push_channel = channel
            break
    else:
        # ... then the first channel enabled in the config file
        for channel, _ in channel_env_names:
            candidate = config.get(channel) or []
            if not candidate:
                continue
            try:
                enable_value = candidate[0]['enable']
                if isinstance(enable_value, str):
                    enabled = enable_value.upper() == 'ON'
                else:
                    enabled = int(enable_value) == 1
            except Exception:
                continue
            if enabled:
                push_channel = channel
                break
    # File settings of the selected channel (a list of one-key mappings)
    channel_config = (config.get(push_channel) or []) if push_channel else []

    def _from_file(index, key):
        # Value of entry *index* of the channel section, '' when it is absent
        return channel_config[index][key] if len(channel_config) > index else ''

    push = GLOBAL_CONFIG['push_channel']
    push['type'] = push_channel
    # The discard-specific switch may pre-set the push switch (see below)
    push_switch = 'ON'
    if push_channel == 'dingding':
        push['webhook'] = _env_or_default('DINGDING_WEBHOOK', _from_file(1, 'webhook'))
        push['secretKey'] = _env_or_default('DINGDING_SECRETKEY', _from_file(2, 'secretKey'))
    elif push_channel == 'feishu':
        push['webhook'] = _env_or_default('FEISHU_WEBHOOK', _from_file(1, 'webhook'))
    elif push_channel == 'tgbot':
        push['token'] = _env_or_default('TG_BOT_TOKEN', _from_file(1, 'token'))
        push['group_id'] = _env_or_default('TG_GROUP_ID', _from_file(2, 'group_id'))
    elif push_channel == 'discard':
        push['webhook'] = _env_or_default('DISCARD_WEBHOOK', _from_file(1, 'webhook'))
        push['send_daily_report'] = _discard_flag(
            'DISCARD_SEND_DAILY_REPORT', channel_config, 2, 'send_daily_report', 0)
        push['send_normal_msg'] = _discard_flag(
            'DISCARD_SEND_NORMAL_MSG', channel_config, 3, 'send_normal_msg', 1)
        # Optional DISCARD_SWITCH (ON/OFF) controls whether pushing is enabled
        discard_switch = os.environ.get('DISCARD_SWITCH', '')
        if discard_switch and discard_switch != '***':
            push_switch = discard_switch

    # Workflow-dispatch switches. PUSH_SWITCH wins when it is set; otherwise
    # the DISCARD_SWITCH value (or 'ON') is kept instead of being overwritten
    # unconditionally.
    workflow = GLOBAL_CONFIG['workflow']
    workflow['night_sleep_switch'] = os.environ.get('NIGHT_SLEEP_SWITCH', 'ON')
    workflow['daily_report_switch'] = os.environ.get('DAILY_REPORT_SWITCH', 'ON')
    workflow['push_switch'] = os.environ.get('PUSH_SWITCH', push_switch)
# Read the configuration - kept for compatibility with older code
def load_config():
    """Refresh the configuration and return it as a 5-tuple.

    The tuple is (channel type, GitHub token, first channel credential,
    second channel credential, translate flag). When no push channel is
    enabled a message is printed and ('', '', '', '', False) is returned.
    """
    init_config()
    push = GLOBAL_CONFIG['push_channel']
    # Per channel: which push_channel keys fill the 3rd and 4th tuple slots
    credential_keys = {
        'dingding': ('webhook', 'secretKey'),
        'feishu': ('webhook', 'webhook'),
        'tgbot': ('token', 'group_id'),
        'discard': ('webhook', 'webhook'),
    }
    kind = push['type']
    if kind not in credential_keys:
        print("[-] 配置文件有误, 未找到启用的推送渠道")
        return '', '', '', '', False
    first_key, second_key = credential_keys[kind]
    return (kind, GLOBAL_CONFIG['github_token'], push[first_key],
            push[second_key], GLOBAL_CONFIG['translate'])
# Initialise the configuration, then build the global GitHub request headers
# from GLOBAL_CONFIG.
init_config()
# NOTE(review): when no token is configured this still sends an
# Authorization header with an empty token value - confirm the API accepts it.
github_headers = {
    'Authorization': "token {}".format(GLOBAL_CONFIG['github_token'])
}
# Blacklisted-user cache
BLACK_USER_CACHE = []
# True once config.yaml has been read successfully, even when the blacklist
# is empty; lets black_user() tell "not loaded yet" apart from "loaded, empty".
_BLACK_USER_LOADED = False


# Load the blacklisted users
def load_black_user():
    """Reload BLACK_USER_CACHE from ``all_config.black_user`` in config.yaml.

    Missing or null sections yield an empty list. On any error a warning is
    logged and the cache is left empty.
    """
    global BLACK_USER_CACHE, _BLACK_USER_LOADED
    BLACK_USER_CACHE = []
    try:
        with open('config.yaml', 'r', encoding='utf-8') as f:
            config = yaml.load(f, Loader=yaml.FullLoader) or {}
        all_config = config.get('all_config') or {}
        BLACK_USER_CACHE = all_config.get('black_user') or []
        _BLACK_USER_LOADED = True
        logger.info(f"成功加载 {len(BLACK_USER_CACHE)} 个黑名单用户")
        logger.info(f"已启用黑名单配置,共 {len(BLACK_USER_CACHE)} 个用户")
    except Exception as e:
        logger.warning(f"加载黑名单用户失败: {e}")
        BLACK_USER_CACHE = []


# Get the blacklisted users
def black_user():
    """Return the cached blacklist, loading it on first use.

    The file is read again only while no load has succeeded yet, so an empty
    blacklist no longer triggers a re-read (and two log lines) on every call.
    """
    if not _BLACK_USER_LOADED:
        load_black_user()
    return BLACK_USER_CACHE
# Create the database on start-up
def create_database():
    """Create the monitoring tables in data.db and send a "connected" message.

    Every table is created only if it does not exist yet. After committing,
    a greeting is pushed through dingding or tgbot when one of them is the
    configured channel. Any exception is logged and the connection closed.
    """
    # (DDL statement, log message written after it ran)
    table_definitions = (
        ("CREATE TABLE IF NOT EXISTS cve_monitor\n"
         "(cve_name varchar(255),\n"
         "pushed_at varchar(255),\n"
         "cve_url varchar(255));",
         "成功创建CVE监控表"),
        ("CREATE TABLE IF NOT EXISTS keyword_monitor\n"
         "(keyword_name varchar(255),\n"
         "pushed_at varchar(255),\n"
         "keyword_url varchar(255));",
         "成功创建关键字监控表"),
        ("CREATE TABLE IF NOT EXISTS redteam_tools_monitor\n"
         "(tools_name varchar(255),\n"
         "pushed_at varchar(255),\n"
         "tag_name varchar(255));",
         "成功创建红队工具监控表"),
        ("CREATE TABLE IF NOT EXISTS user_monitor\n"
         "(repo_name varchar(255));",
         "成功创建大佬仓库监控表"),
        ("CREATE TABLE IF NOT EXISTS push_count\n"
         "(date TEXT PRIMARY KEY,\n"
         "count INTEGER);",
         "成功创建推送计数表"),
        ("CREATE TABLE IF NOT EXISTS message_cache\n"
         "(id INTEGER PRIMARY KEY AUTOINCREMENT,\n"
         "message_type varchar(50),\n"
         "message_content TEXT,\n"
         "message_data TEXT,\n"
         "create_time datetime,\n"
         "status varchar(20));",
         "成功创建消息缓存表"),
    )
    conn = None
    try:
        conn = sqlite3.connect('data.db')
        cur = conn.cursor()
        for ddl, success_message in table_definitions:
            cur.execute(ddl)
            logger.info(success_message)
        conn.commit()
        conn.close()
        # Send the "connected" message
        app_name, _, webhook, secretKey, _ = load_config()
        if app_name == "dingding":
            dingding("spaceX", "连接成功~", webhook, secretKey)
        elif app_name == "tgbot":
            tgbot("spaceX", "连接成功~", webhook, secretKey)
    except Exception as e:
        logger.error(f"创建监控表失败!报错:{e}")
        if conn is not None:
            conn.close()
# Fetch up to 20 of this year's CVE repositories, sorted by update time
def getNews():
    """Return today's CVE repositories found through the GitHub search API.

    Only the first 20 results are inspected. A repository is kept when its
    owner is not blacklisted, its name contains a CVE id and its creation or
    push date equals today's date. The result is a list of dicts with the
    keys cve_name, cve_url and pushed_at, de-duplicated by cve_name (first
    occurrence wins). Returns [] when the request itself fails.
    """
    collected = []
    try:
        # Search for this year's CVEs
        current_year = datetime.datetime.now().year
        api = "https://api.github.com/search/repositories?q=CVE-{}&sort=updated" .format(current_year)
        response_json = http_session.get(api, headers=github_headers, timeout=10).json()
        today_text = str(datetime.date.today())
        for repo in response_json.get('items', [])[:20]:
            try:
                cve_url = repo['html_url']
                # The second-to-last URL segment is the repository owner
                if cve_url.split("/")[-2] in black_user():
                    continue
                try:
                    upper_name = repo['name'].upper()
                    cve_name = re.findall(r'(CVE\-\d+\-\d+)', upper_name)[0].upper()
                    # Check both the creation time and the update time
                    created_at = re.findall(r'\d{4}-\d{2}-\d{2}', repo['created_at'])[0]
                    pushed_at = re.findall(r'\d{4}-\d{2}-\d{2}', repo['pushed_at'])[0]
                    # Created or updated today both count as today's CVE
                    if created_at == today_text or pushed_at == today_text:
                        collected.append({"cve_name": cve_name, "cve_url": cve_url, "pushed_at": pushed_at})
                        logger.info("[+] 该{}的创建时间为{},更新时间为{},属于今天的CVE" .format(cve_name, created_at, pushed_at))
                    else:
                        logger.info("[-] 该{}的创建时间为{},更新时间为{}, 不属于今天的CVE" .format(cve_name, created_at, pushed_at))
                except Exception as e:
                    logger.debug(f"处理CVE项目失败: {e}")
            except Exception as e:
                logger.debug(f"遍历CVE项目列表失败: {e}")
        # Keep the first entry per CVE name, preserving order
        unique_by_name = OrderedDict()
        for entry in collected:
            if entry['cve_name'] not in unique_by_name:
                unique_by_name[entry['cve_name']] = dict(entry)
        today_cve_info = list(unique_by_name.values())
        logger.info(f"成功获取 {len(today_cve_info)} 条今日CVE信息")
        return today_cve_info
    except Exception as e:
        logger.error(f"getNews 函数 error: {e}")
        return []
def getKeywordNews(keyword):
    """Return repositories pushed today that match *keyword*.

    The keywords 'poc', 'exp', 'cve' and anything containing 'cve-' are
    delegated to get_special_keyword_news(). Other keywords are searched
    through the GitHub repository search API with the sort / order / per_page
    values from get_github_search_config(); a hit is kept when its owner is
    not blacklisted, it was pushed today and is_keyword_relevant() accepts it.

    Returns a list of dicts de-duplicated by keyword_name (first occurrence
    wins). On an unexpected error the items collected so far are returned.
    """
    today_keyword_info_tmp = []
    try:
        # Special keyword handling
        special_keywords = ('poc', 'exp', 'cve')
        lowered = keyword.lower()
        if lowered in special_keywords or 'cve-' in lowered:
            today_keyword_info_tmp = get_special_keyword_news(keyword)
        else:
            search_config = get_github_search_config()
            # Percent-encode the keyword so characters such as '#' or '&' do
            # not cut off or corrupt the query string. '+' and ':' stay
            # literal so keywords already written in search syntax
            # ("a+b", "language:go") keep their meaning.
            api = "https://api.github.com/search/repositories?q={}&sort={}&order={}&per_page={}" .format(
                requests.utils.quote(keyword, safe='+:'),
                search_config['sort'], search_config['order'], search_config['per_page'])
            json_str = http_session.get(api, headers=github_headers, timeout=10).json()
            today_date = str(datetime.date.today())
            for repo in json_str.get('items', []):
                try:
                    # The second-to-last URL segment is the repository owner
                    if repo['html_url'].split("/")[-2] in black_user():
                        continue
                    pushed_at = re.findall(r'\d{4}-\d{2}-\d{2}', repo['pushed_at'])[0]
                    if pushed_at != today_date:
                        continue
                    if not is_keyword_relevant(repo, keyword):
                        continue
                    today_keyword_info_tmp.append({
                        "keyword_name": repo['name'],
                        "keyword_url": repo['html_url'],
                        "pushed_at": pushed_at,
                        # The API reports a missing description as null, so
                        # a .get() default alone would never apply.
                        "description": repo.get('description') or '作者未写描述',
                        "stargazers_count": repo.get('stargazers_count', 0)
                    })
                    logger.info("keyword: {} ,{} ({} stars)" .format(keyword, repo['name'], repo.get('stargazers_count', 0)))
                except Exception as e:
                    logger.error(f"处理项目 {repo.get('name', '未知')} 时出错: {e}")
                    continue
        # De-duplicate by repository name, keeping the first occurrence
        today_keyword_info = OrderedDict()
        for item in today_keyword_info_tmp:
            today_keyword_info.setdefault(item['keyword_name'], {**item})
        today_keyword_info = list(today_keyword_info.values())
        logger.info(f"成功获取 {len(today_keyword_info)} 条关键词 '{keyword}' ")
        return today_keyword_info
    except Exception as e:
        logger.error(f"getKeywordNews 函数 error: {e}")
        return today_keyword_info_tmp
# Insert the fetched keyword repositories into the database
def keyword_insert_into_sqlite3(data):
    """Insert every item of *data* into the keyword_monitor table of data.db.

    Each item is a dict with the keys keyword_name, pushed_at and
    keyword_url. A row that fails is reported and skipped; the remaining
    rows are still inserted and committed.
    """
    conn = sqlite3.connect('data.db')
    try:
        print("keyword_insert_into_sqlite3 函数 打开数据库成功!")
        print(data)
        cur = conn.cursor()
        for item in data:
            try:
                keyword_name = item['keyword_name']
                # Bound parameters: values coming from GitHub never become
                # part of the SQL text.
                cur.execute(
                    "INSERT INTO keyword_monitor (keyword_name,pushed_at,keyword_url) VALUES (?, ?, ?)",
                    (keyword_name, item['pushed_at'], item['keyword_url']))
                print("keyword_insert_into_sqlite3 函数: {}插入数据成功!".format(keyword_name))
            except Exception as e:
                print("keyword_insert_into_sqlite3 error {}".format(e))
        conn.commit()
    finally:
        conn.close()
# Check whether a keyword repository is already stored in the database
def query_keyword_info_database(keyword_name):
    """Return the number of keyword_monitor rows whose keyword_name equals *keyword_name*."""
    conn = sqlite3.connect('data.db')
    try:
        # Bound parameter instead of string formatting
        rows = conn.execute(
            "SELECT keyword_name FROM keyword_monitor WHERE keyword_name = ?;",
            (keyword_name,)).fetchall()
    finally:
        # The connection was previously never closed
        conn.close()
    return len(rows)
# Keep only the keyword repositories that are not in the database yet
def get_today_keyword_info(today_keyword_info_data):
    """Return the items of *today_keyword_info_data* that were not pushed before.

    An item is skipped when its name contains a CVE id that is already in
    the CVE table (so it is not pushed twice), or when its name is already
    in the keyword table. An item that raises is reported and skipped.
    """
    today_all_keyword_info = []
    for item in today_keyword_info_data:
        try:
            today_keyword_name = item['keyword_name']
            cve_names = re.findall(r'(CVE\-\d+\-\d+)', today_keyword_name.upper())
            # re.findall returns a list: look up its first match. Calling
            # .upper() on the list itself raised, which silently dropped
            # every repository whose name contains a CVE id.
            if cve_names and query_cve_info_database(cve_names[0]) > 0:
                continue
            Verify = query_keyword_info_database(today_keyword_name)
            if Verify == 0:
                print("[+] 数据库里不存在{}".format(today_keyword_name))
                today_all_keyword_info.append(item)
            else:
                print("[-] 数据库里存在{}".format(today_keyword_name))
        except Exception as e:
            print("get_today_keyword_info error {}".format(e))
    return today_all_keyword_info
# Insert the fetched CVE information into the database
def cve_insert_into_sqlite3(data):
    """Insert every item of *data* into the cve_monitor table of data.db.

    Each item is a dict with the keys cve_name, pushed_at and cve_url; the
    stored name is the first CVE id found in cve_name. An item without a CVE
    id, or one that fails to insert, is reported and skipped.
    """
    conn = sqlite3.connect('data.db')
    try:
        print("cve_insert_into_sqlite3 函数 打开数据库成功!")
        cur = conn.cursor()
        for item in data:
            try:
                cve_name = re.findall(r'(CVE\-\d+\-\d+)', item['cve_name'])[0].upper()
                # Bound parameters instead of string formatting
                cur.execute(
                    "INSERT INTO cve_monitor (cve_name,pushed_at,cve_url) VALUES (?, ?, ?)",
                    (cve_name, item['pushed_at'], item['cve_url']))
                print("cve_insert_into_sqlite3 函数: {}插入数据成功!".format(cve_name))
            except Exception as e:
                # Report the failure instead of swallowing it silently
                print("cve_insert_into_sqlite3 error {}".format(e))
        conn.commit()
    finally:
        conn.close()
# Check whether a CVE is already stored in the database
def query_cve_info_database(cve_name):
    """Return the number of cve_monitor rows whose cve_name equals *cve_name*."""
    conn = sqlite3.connect('data.db')
    try:
        # Bound parameter instead of string formatting
        rows = conn.execute(
            "SELECT cve_name FROM cve_monitor WHERE cve_name = ?;",
            (cve_name,)).fetchall()
    finally:
        # The connection was previously never closed
        conn.close()
    return len(rows)
# Check whether a tool name is already stored in the database
def query_tools_info_database(tools_name):
    """Return the number of redteam_tools_monitor rows whose tools_name equals *tools_name*."""
    conn = sqlite3.connect('data.db')
    try:
        # Bound parameter instead of string formatting
        rows = conn.execute(
            "SELECT tools_name FROM redteam_tools_monitor WHERE tools_name = ?;",
            (tools_name,)).fetchall()
    finally:
        # The connection was previously never closed
        conn.close()
    return len(rows)
# Keep only the CVE entries that are not in the database yet
def get_today_cve_info(today_cve_info_data):
    """Return the items of *today_cve_info_data* that should be pushed.

    An item already stored in the CVE table is skipped. Otherwise it is kept
    when exist_cve() returns 1 for its CVE id or when its pushed_at equals
    today's date. An item that raises is reported and skipped.
    """
    fresh_items = []
    for today_cve_item in today_cve_info_data:
        try:
            cve_id = re.findall(r'(CVE\-\d+\-\d+)', today_cve_item['cve_name'])[0].upper()
            # Already in the database: do not push it again
            if query_cve_info_database(cve_id.upper()) != 0:
                print("[-] 数据库里存在{}".format(cve_id.upper()))
                continue
            # Check whether the CVE exists on mitre.org
            cve_exists = exist_cve(cve_id)
            # Push when it exists on mitre.org or was pushed today
            if cve_exists == 1 or today_cve_item['pushed_at'] == str(datetime.date.today()):
                mitre_state = "存在" if cve_exists == 1 else "不存在"
                print("[+] 数据库里不存在{},mitre.org状态: {},属于今天的CVE" .format(cve_id.upper(), mitre_state))
                fresh_items.append(today_cve_item)
            else:
                print("[-] 数据库里不存在{},但mitre.org上也不存在,且不是今天更新,跳过" .format(cve_id.upper()))
        except Exception as e:
            print(f"[-] 处理CVE {today_cve_item.get('cve_name', '未知')} 时出错: {e}")
    logger.info(f"get_today_cve_info 返回 {len(fresh_items)} 条CVE信息")
    return fresh_items
# Insert red-team tool information into the database
def tools_insert_into_sqlite3(data):
    """Insert the tools of *data* that are not yet in redteam_tools_monitor.

    Each item is a dict with the keys tools_name, pushed_at and tag_name.
    The existence check runs on the same connection as the inserts, so a
    tool that appears twice in *data* is inserted only once. Nothing is
    committed when an insert raises; the connection is always closed.
    """
    conn = sqlite3.connect('data.db')
    try:
        print("tools_insert_into_sqlite3 函数 打开数据库成功!")
        cur = conn.cursor()
        for item in data:
            tools_name = item['tools_name']
            cur.execute(
                "SELECT tools_name FROM redteam_tools_monitor WHERE tools_name = ?;",
                (tools_name,))
            if cur.fetchone() is None:
                print("[+] 红队工具表数据库里不存在{}".format(tools_name))
                # Bound parameters instead of string formatting
                cur.execute(
                    "INSERT INTO redteam_tools_monitor (tools_name,pushed_at,tag_name) VALUES (?, ?, ?)",
                    (tools_name, item['pushed_at'], item['tag_name']))
                print("tools_insert_into_sqlite3 函数: {}插入数据成功!".format(tools_name))
            else:
                print("[-] 红队工具表数据库里存在{}".format(tools_name))
        conn.commit()
    finally:
        conn.close()
# Tool list cache
TOOLS_LIST_CACHE = {
    'tools_list': [],
    'keyword_list': [],
    'user_list': [],
    'last_load_time': 0
}


# Read the local red-team tool link file and convert it into lists
def load_tools_list():
    """Return (tools_list, keyword_list, user_list) from tools_list.yaml.

    The result is cached for 300 seconds as long as the cached tools_list is
    non-empty. When the environment variable ``keywords`` is set, its
    space-separated values replace (are not merged with) the file's
    keyword_list. Keywords are de-duplicated keeping their first-seen order.
    On any error a warning is printed and the cached lists are returned.
    """
    global TOOLS_LIST_CACHE
    current_time = time.time()
    # The cache is valid for 300 seconds (5 minutes)
    if current_time - TOOLS_LIST_CACHE['last_load_time'] < 300 and TOOLS_LIST_CACHE['tools_list']:
        return TOOLS_LIST_CACHE['tools_list'], TOOLS_LIST_CACHE['keyword_list'], TOOLS_LIST_CACHE['user_list']
    try:
        with open('tools_list.yaml', 'r', encoding='utf-8') as f:
            # An empty file parses to None
            list_data = yaml.load(f, Loader=yaml.FullLoader) or {}
        # A key that is present without a value parses to None as well
        tools_list = list_data.get('tools_list') or []
        keyword_list = list_data.get('keyword_list') or []
        user_list = list_data.get('user_list') or []
        # Keywords from the environment replace the ones from the file
        env_keywords = os.environ.get('keywords', '')
        if env_keywords:
            keyword_list = [kw.strip() for kw in env_keywords.split(' ') if kw.strip()]
        # De-duplicate; dict.fromkeys keeps the configured order, a set would not
        keyword_list = list(dict.fromkeys(keyword_list))
        # Update the cache
        TOOLS_LIST_CACHE = {
            'tools_list': tools_list,
            'keyword_list': keyword_list,
            'user_list': user_list,
            'last_load_time': current_time
        }
        print(f"[+] 成功加载工具列表:{len(tools_list)}个工具,{len(keyword_list)}个关键字,{len(user_list)}个用户")
        return tools_list, keyword_list, user_list
    except Exception as e:
        print(f"[警告] 加载工具列表失败: {e}")
        # Fall back to the cached data (possibly empty lists)
        return TOOLS_LIST_CACHE['tools_list'], TOOLS_LIST_CACHE['keyword_list'], TOOLS_LIST_CACHE['user_list']
# Get name, update date and release tag of every red-team tool
def get_pushed_at_time(tools_list):
    """Query the GitHub API for every URL in *tools_list*.

    Accepts github.com repository URLs and api.github.com/repos URLs; other
    URLs are reported and skipped. Returns a list of dicts with the keys
    tools_name, pushed_at (YYYY-MM-DD), api_url (holds the repository's
    html_url) and tag_name ("no releases" when no release tag can be read).
    A URL that raises is reported and skipped.
    """
    collected = []
    for url in tools_list:
        try:
            # Turn a GitHub repository URL into its API URL
            matched = re.match(r'https://github.com/([^/]+)/([^/]+)', url)
            if matched:
                api_url = "https://api.github.com/repos/{}/{}".format(*matched.groups())
            elif url.startswith("https://api.github.com/repos/"):
                api_url = url
            else:
                print(f"[警告] 无效的 GitHub URL: {url}")
                continue
            tools_json = http_session.get(api_url, headers=github_headers, timeout=10).json()
            # The required fields must be present
            if not ('pushed_at' in tools_json and 'name' in tools_json):
                print(f"[警告] API返回数据缺少关键字段: {api_url}")
                print(f"[调试] API返回: {str(tools_json)[:100]}...")
                continue
            raw_pushed_at = tools_json['pushed_at']
            if raw_pushed_at:
                pushed_at = re.findall(r'\d{4}-\d{2}-\d{2}', raw_pushed_at)[0]
            else:
                # No push date reported: fall back to today's date
                pushed_at = datetime.date.today().strftime('%Y-%m-%d')
            tools_name = tools_json['name']
            html_url = tools_json.get('html_url', '')
            try:
                releases_json = http_session.get(f"{api_url}/releases", headers=github_headers, timeout=10).json()
                if releases_json and len(releases_json) > 0:
                    tag_name = releases_json[0]['tag_name']
                else:
                    tag_name = "no releases"
            except Exception as e:
                tag_name = "no releases"
            collected.append({"tools_name":tools_name,"pushed_at":pushed_at,"api_url":html_url,"tag_name":tag_name})
        except Exception as e:
            print(f"get_pushed_at_time 处理 {url} 时出错: {e}")
    return collected
# Look up the stored update date and release tag of a red-team tool by name
def tools_query_sqlite3(tools_name):
    """Return the stored state of *tools_name* from redteam_tools_monitor.

    The result is a list with one ``{"pushed_at": ..., "tag_name": ...}``
    dict per matching row (empty when the tool is unknown).
    """
    conn = sqlite3.connect('data.db')
    try:
        # Bound parameter instead of string formatting
        rows = conn.execute(
            "SELECT pushed_at,tag_name FROM redteam_tools_monitor WHERE tools_name = ?;",
            (tools_name,)).fetchall()
    finally:
        # Close the connection even when the query raises
        conn.close()
    result_list = [{"pushed_at": row[0], "tag_name": row[1]} for row in rows]
    print("[###########] tools_query_sqlite3 函数内 result_list 的值 为 - > {}".format(result_list))
    return result_list
# Collect the stored date and tag of every red-team tool that was updated
def get_tools_update_list(data):
    """Return the tools of *data* whose pushed_at differs from the database.

    *data* items are dicts with the keys tools_name, pushed_at and api_url.
    Each returned dict holds the item's api_url together with the pushed_at
    and tag_name currently stored in the database. Tools that are not in the
    database are ignored.
    """
    outdated = []
    for dist in data:
        print("dist 变量 ->{}".format(dist))
        stored = tools_query_sqlite3(dist['tools_name'])
        if not stored:
            continue
        stored_pushed_at = stored[0]['pushed_at']
        if dist['pushed_at'] == stored_pushed_at:
            print("今日获取时间: ",dist['pushed_at'],"获取数据库时间: ",stored_pushed_at,dist['tools_name']," no update")
            continue
        print("今日获取时间: ",dist['pushed_at'],"获取数据库时间: ",stored_pushed_at,dist['tools_name'],"update!!!!")
        # Report the date and tag that are stored in the database
        outdated.append({"api_url":dist['api_url'],"pushed_at":stored_pushed_at,"tag_name":stored[0]['tag_name']})
    return outdated
# Watch a user for newly created repositories that are not forks
def getUserRepos(user):
    """Push a notification for every non-fork repository *user* created today.

    A repository is announced only once: user_insert_into_sqlite3() records
    it and returns 0 the first time it is seen. Any error (network, API
    error payload, database) is printed and ends the run for this user.
    """
    try:
        api = "https://api.github.com/users/{}/repos".format(user)
        repos = http_session.get(api, headers=github_headers, timeout=10).json()
        # On success the API returns a list; anything else is an error payload
        if not isinstance(repos, list):
            print(repos, "github链接不通")
            return
        today_date = str(datetime.date.today())
        for repo in repos:
            created_at = re.findall(r'\d{4}-\d{2}-\d{2}', repo['created_at'])[0]
            if repo['fork'] or created_at != today_date:
                continue
            Verify = user_insert_into_sqlite3(repo['full_name'])
            print(repo['full_name'], Verify)
            if Verify != 0:
                continue
            name = repo['name']
            # The API reports a missing description as null; concatenating
            # None below used to raise and abort the whole loop.
            description = repo.get('description') or "作者未写描述"
            download_url = repo['html_url']
            text = r'大佬' + r'** ' + user + r' ** ' + r'又分享了一款工具! '
            body = "工具名称: " + name + " \r\n" + "工具地址: " + download_url + " \r\n" + "工具描述: " + "" + description
            # Load the configuration once per notification
            app_name, _, target, secret, _ = load_config()
            if app_name == "dingding":
                dingding(text, body, target, secret)
            elif app_name == "feishu":
                # Same call shape as the feishu pushes in send_body
                feishu(text, body, target)
            elif app_name == "tgbot":
                tgbot(text, body, target, secret)
            elif app_name == "discard":
                if discard(text, body, target, GLOBAL_CONFIG['push_channel']['send_normal_msg']):
                    logger.info(f"discard 发送用户仓库 {name} 成功")
    except Exception as e:
        print(e, "github链接不通")
# Record a user's or organisation's repository in the database
def user_insert_into_sqlite3(repo_name):
    """Insert *repo_name* into user_monitor unless it is already there.

    Returns the number of rows that already existed: 0 means the repository
    is new and was inserted, a non-zero value means it was already known.
    """
    conn = sqlite3.connect('data.db')
    try:
        cur = conn.cursor()
        # Bound parameters instead of string formatting
        cur.execute("SELECT repo_name FROM user_monitor WHERE repo_name = ?;", (repo_name,))
        Verify = len(cur.fetchall())
        if Verify == 0:
            print("[+] 用户仓库表数据库里不存在{}".format(repo_name))
            cur.execute("INSERT INTO user_monitor (repo_name) VALUES (?)", (repo_name,))
            print("user_insert_into_sqlite3 函数: {}插入数据成功!".format(repo_name))
        else:
            print("[-] 用户仓库表数据库里存在{}".format(repo_name))
        conn.commit()
    finally:
        # Close the connection even when a statement raises
        conn.close()
    return Verify
def _notify_tool_update(text, body, tools_name):
    """Push one tool-update notification through the configured channel."""
    app_name, _, target, secret, _ = load_config()
    if app_name == "dingding":
        dingding(text, body, target, secret)
    elif app_name == "feishu":
        feishu(text, body, target)
    elif app_name == "tgbot":
        tgbot(text, body, target, secret)
    elif app_name == "discard":
        if discard(text, body, target, GLOBAL_CONFIG['push_channel']['send_normal_msg']):
            logger.info(f"discard 发送工具更新 {tools_name} 成功")


def _store_tool_state(tools_name, pushed_at, tag_name=None):
    """Store the new pushed_at (and tag_name, when given) of a tool in data.db."""
    conn = sqlite3.connect('data.db')
    try:
        if tag_name is not None:
            conn.execute(
                "UPDATE redteam_tools_monitor SET tag_name = ? WHERE tools_name = ?",
                (tag_name, tools_name))
        conn.execute(
            "UPDATE redteam_tools_monitor SET pushed_at = ? WHERE tools_name = ?",
            (pushed_at, tools_name))
        conn.commit()
    finally:
        conn.close()


def _latest_commit(api_url):
    """Return (html_url, message) of the newest commit of the repository at *api_url*."""
    commits = http_session.get(api_url + '/commits', headers=github_headers, timeout=10).json()
    newest = commits[0]
    message = newest['commit'].get('message') or "作者未写更新内容,具体点击更新详情地址的URL进行查看"
    return newest['html_url'], message


# Fetch the update details of a tool and push them to the configured channel
def send_body(url,query_pushed_at,query_tag_name):
    """Notify about an update of the tool at *url* and store its new state.

    *url* is the repository's API URL; a https://github.com/<owner>/<repo>
    URL is accepted as well and converted. *query_pushed_at* and
    *query_tag_name* are the values currently stored in the database.

    - The tool has releases and the API push date is newer than the stored
      one: a changed latest tag yields a release notification (tag and date
      are stored), an unchanged tag yields a commit notification (date stored).
    - The tool has no releases and the push date differs: a commit
      notification is sent and the date stored.

    Request and parsing errors are not caught here and propagate.
    """
    # Some tools have no releases; then the update text comes from the commits.
    repo_match = re.match(r'https://github\.com/([^/]+)/([^/]+?)(?:\.git)?/?$', url)
    if repo_match:
        url = "https://api.github.com/repos/{}/{}".format(*repo_match.groups())
    tools_name = url.split('/')[-1]
    releases = http_session.get(url + '/releases', headers=github_headers, timeout=10).json()
    repo_info = http_session.get(url, headers=github_headers, timeout=10).json()
    new_pushed_at = re.findall(r'\d{4}-\d{2}-\d{2}', repo_info['pushed_at'])[0]
    if len(releases) != 0:
        latest_release = releases[0]
        tag_name = latest_release['tag_name']
        # Dates are YYYY-MM-DD strings, so string comparison orders them
        if query_pushed_at < new_pushed_at :
            print("[*] 数据库里的pushed_at -->", query_pushed_at, ";;;; api的pushed_at -->", new_pushed_at)
            if tag_name != query_tag_name:
                # A release without notes has a null body
                update_log = latest_release.get('body') or "作者未写更新内容"
                download_url = latest_release['html_url']
                text = r'** ' + tools_name + r' ** 工具,版本更新啦!'
                body = "工具名称:" + tools_name + "\r\n" + "工具地址:" + download_url + "\r\n" + "工具更新日志:" + "\r\n" + update_log
                _notify_tool_update(text, body, tools_name)
                _store_tool_state(tools_name, new_pushed_at, tag_name)
                print("[+] tools_name -->", tools_name, "pushed_at 已更新,现在pushed_at 为 -->", new_pushed_at,"tag_name 已更新,现在tag_name为 -->",tag_name)
            else:
                download_url, update_log = _latest_commit(url)
                text = r'** ' + tools_name + r' ** 工具小更新了一波!'
                body = "工具名称:" + tools_name + "\r\n" + "更新详情地址:" + download_url + "\r\n" + "commit更新日志:" + "\r\n" + update_log
                _notify_tool_update(text, body, tools_name)
                _store_tool_state(tools_name, new_pushed_at)
                print("[+] tools_name -->",tools_name,"pushed_at 已更新,现在pushed_at 为 -->",new_pushed_at)
    elif query_pushed_at != new_pushed_at:
        print("[*] 数据库里的pushed_at -->", query_pushed_at, ";;;; api的pushed_at -->", new_pushed_at)
        download_url, update_log = _latest_commit(url)
        text = r'** ' + tools_name + r' ** 工具更新啦!'
        body = "工具名称:" + tools_name + "\r\n" + "工具地址:" + download_url + "\r\n" + "commit更新日志:" + "\r\n" + update_log
        _notify_tool_update(text, body, tools_name)
        _store_tool_state(tools_name, new_pushed_at)
        print("[+] tools_name -->", tools_name, "pushed_at 已更新,现在pushed_at 为 -->", new_pushed_at)
# Compute an MD5 digest
def nmd5(str):
    """Return the hexadecimal MD5 digest of the UTF-8 encoding of *str*."""
    return hashlib.md5(str.encode(encoding='utf-8')).hexdigest()
# Google Translate
def google_translate(word):
    """Translate *word* to Simplified Chinese through the Google endpoint.

    Returns the translated text, or *word* itself when the response holds no
    text. On any error the failure is printed and baidu_translate(word) is
    returned instead.
    """
    try:
        endpoint = f"https://translate.googleapis.com/translate_a/single?client=gtx&sl=auto&tl=zh-CN&dt=t&q={requests.utils.quote(word)}"
        browser_headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.114 Safari/537.36'
        }
        # Short timeout so a slow endpoint does not hang the run
        res = http_session.get(url=endpoint, headers=browser_headers, timeout=3)
        res.raise_for_status()
        payload = res.json()
        pieces = []
        # Parse defensively: the first element is expected to be a list of
        # segments whose first entry is the translated text
        if isinstance(payload, list) and len(payload) > 0:
            pieces = [
                segment[0]
                for segment in payload[0]
                if isinstance(segment, list) and len(segment) > 0 and segment[0]
            ]
        return "".join(pieces).strip() or word
    except Exception as e:
        print(f"Google翻译失败,使用百度翻译: {e}")
        return baidu_translate(word)
# Cache of translation results, avoids translating the same text twice
TRANSLATION_CACHE = {}
# Time of the last translation API call, used to limit the request rate
LAST_TRANSLATE_TIME = 0
# Minimum interval between translation API calls (milliseconds)
MIN_TRANSLATE_INTERVAL = 1000
# 百度翻译实现
def baidu_translate(word):
global LAST_TRANSLATE_TIME
# 检查缓存,避免重复翻译
if word in TRANSLATION_CACHE:
return TRANSLATION_CACHE[word]
try:
import random
import hashlib
import time
import yaml
import os
# 控制访问频率,避免54003错误
current_time = time.time() * 1000 # 转换为毫秒