# defence360agent/utils/buffer.py
class LineBuffer(object):
    """
    Accumulates data and then iterates over it, yielding tokens
    split on line breaks '\\n'. If the data does not end with a line
    break, the remainder stays in the buffer until more data
    containing a line break arrives.
    """

    def __init__(self):
        self.buf = ""

    def append(self, data):
        self.buf += data

    def __iter__(self):
        return self

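    def __next__(self):
        # Yield the text before the first line break and drop it,
        # together with the line break itself, from the buffer.
        pos = self.buf.find("\n")
        if pos != -1:
            result = self.buf[0:pos]
            self.buf = self.buf[pos + 1 :]
            return result
        # No complete line yet: stop until more data is appended.
        raise StopIteration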

    def clean(self):
        self.buf = ""
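
# The buffering rule described in the LineBuffer docstring can also be
# sketched as a pure function. This is a minimal illustration, not part of
# defence360agent: the name split_complete_lines is hypothetical, and it
# uses str.split instead of the repeated find/slice done by LineBuffer.

```python
def split_complete_lines(buf: str) -> tuple[list[str], str]:
    """Return (complete_lines, remainder) for a text buffer.

    Hypothetical helper for illustration only.
    """
    # Everything before the last "\n" is a complete line; the tail
    # after it is incomplete and would stay in the buffer.
    *lines, remainder = buf.split("\n")
    return lines, remainder


lines, rest = split_complete_lines("first\nsecond\npart")
print(lines, repr(rest))
```

# The remainder is what a caller would keep and prepend to the next chunk
# of incoming data.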


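class SizeBuffer:
    """
    Accumulates bytes and yields length-prefixed messages. Each message
    is preceded by its size, encoded as a big-endian unsigned integer
    of `size_len` bytes. An incomplete message stays in the buffer
    until the rest of it arrives.
    """

    def __init__(self, size_len=2):
        self._buf = b""
        self._size_len = size_len

    def append(self, data):
        self._buf += data

    def __iter__(self):
        return self

    def __next__(self):
        # Wait for the whole size prefix: decoding a partial prefix
        # would produce a wrong size and corrupt the stream.
        if not self._buf or len(self._buf) < self._size_len:
            raise StopIteration
        size = int.from_bytes(self._buf[: self._size_len], "big")
        end = self._size_len + size
        if len(self._buf) >= end:
            data = self._buf[self._size_len : end]
            self._buf = self._buf[end:]
            return data
        # Payload not fully received yet.
        raise StopIteration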
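
# SizeBuffer only decodes frames. A possible encoder for the same wire
# format is sketched below. It is an assumption based on how __next__ reads
# the prefix (big-endian, size_len bytes); the name pack_frame is
# hypothetical and does not come from defence360agent.

```python
def pack_frame(payload: bytes, size_len: int = 2) -> bytes:
    """Prefix payload with its length as a big-endian integer.

    Hypothetical helper for illustration only.
    """
    # int.to_bytes raises OverflowError when the length does not fit
    # into size_len bytes, so oversized payloads fail loudly.
    return len(payload).to_bytes(size_len, "big") + payload


stream = pack_frame(b"hello") + pack_frame(b"")
print(stream)
```

# With the default two-byte prefix, a single frame can carry at most
# 65535 bytes of payload.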
