1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125
| import asyncio import telegram import json import traceback
TOKEN = "xxxxxx" chat_id = "-xxx" bot = telegram.Bot(token=TOKEN)
class FilterMsg(object): def __init__(self, text): self.event_value = json.loads(text['EventValue'])
self.data = dict() self.data['kind'] = self.event_value['involvedObject']['kind'] self.data['namespace'] = self.event_value['involvedObject']['namespace'] self.data['reason'] = self.event_value['reason'] self.data['message'] = self.event_value['message'] self.data['first_timestamp'] = self.event_value['firstTimestamp'] self.data['last_timestamp'] = self.event_value['lastTimestamp'] self.data['count'] = self.event_value['count'] self.data['type'] = self.event_value['type'] self.data['event_time'] = self.event_value['eventTime'] self.data['pod_hostname'] = text['EventTags']['hostname'] self.data['pod_name'] = text['EventTags']['pod_name']
def convert(self): msg_markdown = f""" *Kubernetes Cluster Event* `Kind: {self.data['kind']}` `Namescodeace: {self.data['namespace']}` `Reason: {self.data['reason']}` `Timestamp: {self.data['first_timestamp']} to {self.data['last_timestamp']}` `Count: {self.data['count']}` `EventType: {self.data['type']}` `EventTime: {self.data['event_time']}` `PodHostname: {self.data['pod_hostname']}` `PodName: {self.data['pod_name']}` `Message: {self.data['message']}` """ return msg_markdown
async def send_message(text): try: convert_text = json.loads(text.decode('utf8').replace('\\n', '')) msg_instance = FilterMsg(convert_text) msg = msg_instance.convert() send_result = bot.send_message(chat_id=chat_id, text=msg, parse_mode='MarkdownV2') return send_result except KeyError as e: msg = "Unknow message.." send_result = bot.send_message(chat_id=chat_id, text=msg) return send_result except Exception as e: print(e.__str__()) print('send message to telegram failed,please check.')
if __name__ == '__main__': text = b'' text = json.loads(text.decode('utf8').replace('\\n', '')) send_result = asyncio.run(send_message(text)) print(send_result)
from kafka import KafkaConsumer, TopicPartition from telegrambot import send_message import asyncio
class KConsumer(object): """kafka consumer instance""" def __init__(self, topic, group_id, bootstrap_servers, auto_offset_reset, enable_auto_commit=False): """ :param topic: :param group_id: :param bootstrap_servers: """ self.consumer = KafkaConsumer( topic, bootstrap_servers=bootstrap_servers, group_id=group_id, auto_offset_reset=auto_offset_reset, enable_auto_commit=enable_auto_commit, consumer_timeout_ms=10000 ) self.tp = TopicPartition(topic, 0)
def start_consumer(self): while True: try: msg_list_dict = self.consumer.poll(timeout_ms=30000) for tp, msg_list in msg_list_dict.items(): for msg in msg_list: send_result = asyncio.run(send_message(msg.value)) print(send_result) self.consumer.commit() except Exception as e: print('ERROR: get cluster events failed,please check.')
def close_consumer(self): try: self.consumer.unsubscribe() self.consumer.close() except: print("consumer stop failed,please check.")
if __name__ == '__main__': topic = 'yakirtopic' bootstrap_servers = 'yakir-kafka-headless:9092' group_id = 'yakir1.group' auto_offset_reset = 'earliest' enable_auto_commit = False
consumer = KConsumer(topic, group_id=group_id, bootstrap_servers=bootstrap_servers, auto_offset_reset=auto_offset_reset, enable_auto_commit=enable_auto_commit) consumer.start_consumer()
|