403Webshell
Server IP : 66.29.132.122  /  Your IP : 3.137.219.237
Web Server : LiteSpeed
System : Linux business142.web-hosting.com 4.18.0-553.lve.el8.x86_64 #1 SMP Mon May 27 15:27:34 UTC 2024 x86_64
User : admazpex ( 531)
PHP Version : 7.2.34
Disable Function : NONE
MySQL : OFF  |  cURL : ON  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : OFF  |  Pkexec : OFF
Directory :  /opt/imunify360/venv/lib/python3.11/site-packages/defence360agent/model/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ Back ]     

Current File : /opt/imunify360/venv/lib/python3.11/site-packages/defence360agent/model/messages_to_send.py
from collections import namedtuple

from peewee import FloatField, BlobField

from defence360agent.model import instance, Model


class MessageToSend(Model):
    """
    Storage for messages to be sent to server
    while connection to server is not available
    """

    class Meta:
        database = instance.db
        db_table = "messages_to_send_nr"

    #: When the message was added to the queue to be sent to the server.
    timestamp = FloatField(null=False)
    #: The message itself.
    message = BlobField(null=False)
    MessageToSendT = namedtuple("MessageToSendT", "timestamp message")

    @classmethod
    def get_oldest(cls, limit=1):
        old = cls.select().order_by(cls.timestamp).limit(limit)
        return old

    @classmethod
    def delete_in(cls, query):
        q = cls.delete().where(cls.id.in_(query))
        return q.execute()

    @classmethod
    def delete_old(cls, limit=1):
        old = cls.select().order_by(cls.timestamp).limit(limit)
        q = cls.delete().where(cls.id.in_(old))
        return q.execute()

    @classmethod
    def insert_many(cls, rows, **kwargs) -> None:
        # sqlite may have internal limit of variables-per-query
        for i in range(0, len(rows), 100):
            data = [
                cls.MessageToSendT(*row)._asdict() for row in rows[i : i + 100]
            ]
            super().insert_many(data, **kwargs).execute()

Youez - 2016 - github.com/yon3zu
LinuXploit