Galaxie Shell 0.2.6 documentation


Navigation:   | Index   | Search   | Top   | Up   |
Table of Content: |

© Copyright 2020-2024, Galaxie Shell Team.

Top » Module code » glxshell.glxush

Source code for glxshell.glxush

import datetime
import fnmatch
import gc
import math
import os
import os.path
import re
import stat
import subprocess
import sys
import termios
import time
from decimal import Decimal
from getpass import getuser
from grp import getgrgid
from pwd import getpwuid
from socket import getfqdn, gethostname
from time import localtime, sleep
from typing import Iterator

from glxshell.lib.path import exists, expanduser
from glxshell.lib.stat import S_ISDIR, S_ISLNK, S_ISREG


APPLICATION_AUTHORS = ["Tuuuux"]
APPLICATION_DESCRIPTION = ""
APPLICATION_LICENSE = "License WTFPL v2"
APPLICATION_NAME = "glxsh"
APPLICATION_VERSION = "0.2.5"
APPLICATION_PATCH_LEVEL = "a1"
APPLICATION_WARRANTY = "Copyright (C) 2020-2022 Galaxie Shell Project.\nLicense WTFPL Version 2, December 2004 <http://www.wtfpl.net/about/>.\nThis is free software: you are free to change and redistribute it.\nThere is NO WARRANTY, to the extent permitted by law.\n"

def error_code_to_text(code):
    error_code = {
        1: "Operation not permitted",
        2: "No such file or directory",
        3: "No such process",
        4: "Interrupted system call",
        5: "I/O error",
        6: "No such device or address",
        7: "Argument list too long",
        8: "Exec format error",
        9: "Bad file number",
        10: "No child processes",
        11: "Try again",
        12: "Out of memory",
        13: "Permission denied",
        14: "Bad address",
        15: "Block device required",
        16: "Device or resource busy",
        17: "File exists",
        18: "Cross-device link",
        19: "No such device",
        20: "Not a directory",
        21: "Is a directory",
        22: "Invalid argument",
        23: "File table overflow",
        24: "Too many open files",
        25: "Not a typewriter",
        26: "Text file busy",
        27: "File too large",
        28: "No space left on device",
        29: "Illegal seek",
        30: "Read-only file system",
        31: "Too many links",
        32: "Broken pipe",
        33: "Math argument out of domain of func",
        34: "Math result not representable",
        39: "Directory not empty",
        97: "Address family not supported by protocol",
        104: "Connection timed out",
        110: "Connection timed out",
        115: "Operation now in progress",
    }
    try:
        return error_code[code]
    except KeyError:
        return "%s" % code

def size_of(size, suffix="B") -> str:
    for unit in ("", "K", "M", "G", "T", "P", "E", "Z", "Y"):
        if size < 1024:
            return "{size:.2f}{unit}{suffix}".format(
                size=size, unit=unit, suffix=suffix
            )
        size /= 1024
    return "{size:.2f}".format(size=size)

def get_bold_text(text):
    if sys.stdout.isatty():
        return "\x1b[1m%s\x1b[0m" % text
    return "%s" % text

class Xlator(dict):
    def _make_regex(self):
        return re.compile("|".join(map(re.escape, self.keys())))

    def __call__(self, match):
        return self[match.group(0)]

    def xlat(self, text):
        return self._make_regex().sub(self, text)

def quoted_split(s):
    def strip_quotes(s):
        if s and ((s[0] == '"') or (s[0] == "'")) and (s[0] == s[(-1)]):
            return s[1:(-1)]
        return s
    return [
        strip_quotes(p).replace('\\"', '"').replace("\\'", "'")
        for p in re.findall(
            "(?:[^\"\\s]*\"(?:\\\\.|[^\"])*\"[^\"\\s]*)+|(?:[^\\'\\s]*\\'(?:\\\\.|[^\\'])*\\'[^\\'\\s]*)+|[^\\s]+",
            s,
        )
    ]

def humanized_size(num, suffix="B", si=False):
    if si:
        units = ["", "K", "M", "G", "T", "P", "E", "Z"]
        last_unit = "Y"
        div = 1000.0
    else:
        units = ["", "Ki", "Mi", "Gi", "Ti", "Pi", "Ei", "Zi"]
        last_unit = "Yi"
        div = 1024.0
    for unit in units:
        if abs(num) < div:
            return "%3.1f%s%s" % (num, unit, suffix)
        num /= div
    return "%.1f%s%s" % (num, last_unit, suffix)


sep = "/"

def normcase(path):
    return path

def normpath(path):
    if path == "":
        return "."
    initial_slashes = path.startswith(sep)
    if (
        initial_slashes
        and path.startswith((sep * 2))
        and (not path.startswith((sep * 3)))
    ):
        initial_slashes = 2
    new_comps = []
    for comp in path.split(sep):
        if comp in {"", "."}:
            continue
        if (
            (comp != "..")
            or ((not initial_slashes) and (not new_comps))
            or (new_comps and (new_comps[(-1)] == ".."))
        ):
            new_comps.append(comp)
        elif new_comps:
            new_comps.pop()
    path = sep.join(new_comps)
    if initial_slashes:
        path = (sep * initial_slashes) + path
    return path or "."

def getcwd():
    return os.getcwd()

def abspath(path):
    if not isabs(path):
        return normpath(joinpath(getcwd(), path))
    return normpath(path)

def joinpath(*args):
    res = ""
    for a in args:
        if (not res) or a.startswith(sep):
            res = a
        else:
            res += sep + a
    return res.replace((sep * 2), sep)

def split(path):
    if path == "":
        return ("", "")
    r = path.rsplit(sep, 1)
    if len(r) == 1:
        return ("", path)
    head = r[0]
    if not head:
        head = sep
    return (head, r[1])

def splitext(path):
    r = path.rsplit(".", 1)
    if len(r) == 1:
        return (path, "")
    if not r[0]:
        return (path, "")
    return (r[0], ("." + r[1]))

def splitdrive(path):
    return ("", path)

def dirname(path):
    if (not path) or (sep not in path):
        return "."
    if path in (sep, (sep * 2), (sep * 3)):
        return sep
    r = normpath(path).rsplit(sep, 1)
    head = ""
    if len(r) > 1:
        if r[0]:
            head = r[0]
        else:
            head = sep
    if sep not in head:
        return "."
    return head

def basename(string=None, suffix=None):
    if not string:
        return "."
    if string in (sep, (sep * 2), (sep * 3)):
        return sep
    if sep not in string:
        return string
    if suffix:
        return normpath(string).split(sep)[(-1)].replace(suffix, "")
    return normpath(string).split(sep)[(-1)]

def exists(path):
    try:
        os.stat(path)
    except OSError:
        return False
    return True

def lexists(path):
    try:
        return os.access(path, os.F_OK) and os.readlink(path)
    except OSError:
        return False

def isfile(path):
    try:
        return S_ISREG(os.stat(path)[0])
    except (OSError, ArithmeticError):
        return False

def isdir(path):
    try:
        return S_ISDIR(os.stat(path)[0])
    except (OSError, ArithmeticError):
        return False

def islink(path):
    try:
        return S_ISLNK(os.lstat(path)[0])
    except (OSError, ArithmeticError):
        return False

def isabs(path):
    return path.startswith(sep)

def realpath(filename, *, strict=False):
    filename = os.fspath(filename)
    (path, _) = _joinrealpath(filename[:0], filename, strict, {})
    return abspath(path)

def _joinrealpath(path, rest, strict, seen):
    curdir = "."
    pardir = ".."
    if isabs(rest):
        rest = rest[1:]
        path = sep
    while rest:
        (name, _, rest) = rest.partition(sep)
        if (not name) or (name == curdir):
            continue
        if name == pardir:
            if path:
                (path, name) = split(path)
                if name == pardir:
                    path = joinpath(path, pardir, pardir)
            else:
                path = pardir
            continue
        newpath = joinpath(path, name)
        try:
            st = os.lstat(newpath)
        except OSError:
            if strict:
                raise
            is_link = False
        else:
            is_link = S_ISLNK(st.st_mode)
        if not is_link:
            path = newpath
            continue
        if newpath in seen:
            path = seen[newpath]
            if path is not None:
                continue
            if strict:
                os.stat(newpath)
            else:
                return (joinpath(newpath, rest), False)
        seen[newpath] = None
        (path, ok) = _joinrealpath(path, os.readlink(newpath), strict, seen)
        if not ok:
            return (joinpath(path, rest), False)
        seen[newpath] = path
    return (path, True)

def expanduser(s):
    if (s == "~") or s.startswith(("~" + sep)):
        h = os.getenv("HOME")
        return h + s[1:]
    if s[0] == "~":
        return ((sep + "home") + sep) + s[1:]
    return s

def commonprefix(m):
    if not m:
        return ""
    s1 = min(m)
    s2 = max(m)
    for i, c in enumerate(s1):
        if c != s2[i]:
            return s1[:i]
    return s1

def relpath(path, start=None):
    if not path:
        raise ValueError("no path specified")
    curdir = "."
    pardir = ".."
    if start is None:
        start = curdir
    start_list = [x for x in abspath(start).split(sep) if x]
    path_list = [x for x in abspath(path).split(sep) if x]
    i = len(commonprefix([start_list, path_list]))
    rel_list = ([pardir] * (len(start_list) - i)) + path_list[i:]
    if not rel_list:
        return curdir
    return joinpath(*rel_list)


__all__ = ["parse_date", "ParseError", "UTC", "FixedOffset"]
ISO8601_REGEX = re.compile(
    "\n    (?P<year>[0-9]{4})\n    (\n        (\n            (-(?P<monthdash>[0-9]{1,2}))\n            |\n            (?P<month>[0-9]{2})\n            (?!$)  # Don't allow YYYYMM\n        )\n        (\n            (\n                (-(?P<daydash>[0-9]{1,2}))\n                |\n                (?P<day>[0-9]{2})\n            )\n            (\n                (\n                    (?P<separator>[ T])\n                    (?P<hour>[0-9]{2})\n                    (:{0,1}(?P<minute>[0-9]{2})){0,1}\n                    (\n                        :{0,1}(?P<second>[0-9]{1,2})\n                        ([.,](?P<second_fraction>[0-9]+)){0,1}\n                    ){0,1}\n                    (?P<timezone>\n                        Z\n                        |\n                        (\n                            (?P<tz_sign>[-+])\n                            (?P<tz_hour>[0-9]{2})\n                            :{0,1}\n                            (?P<tz_minute>[0-9]{2}){0,1}\n                        )\n                    ){0,1}\n                ){0,1}\n            )\n        ){0,1}  # YYYY-MM\n    ){0,1}  # YYYY only\n    $\n    ",
    re.VERBOSE,
)

class ParseError(ValueError):
    pass

UTC = datetime.timezone.utc

def FixedOffset(
    offset_hours: float, offset_minutes: float, name: str
) -> datetime.timezone:
    return datetime.timezone(
        datetime.timedelta(hours=offset_hours, minutes=offset_minutes), name
    )

def parse_timezone(
    matches: dict[(str, str)], default_timezone=UTC
) -> datetime.timezone:
    tz = matches.get("timezone", None)
    if tz == "Z":
        return UTC
    if tz is None:
        return default_timezone
    sign = matches.get("tz_sign", None)
    hours = int(matches.get("tz_hour", 0))
    minutes = int(matches.get("tz_minute", 0))
    description = f"{sign}{hours:02d}:{minutes:02d}"
    if sign == "-":
        hours = -hours
        minutes = -minutes
    return FixedOffset(hours, minutes, description)

def parse_date(datestring: str, default_timezone=UTC) -> datetime.datetime:
    try:
        m = ISO8601_REGEX.match(datestring)
    except Exception as e:
        raise ParseError(e) from e
    if not m:
        raise ParseError(f"Unable to parse date string {datestring!r}")
    groups: dict[(str, str)] = {
        k: v for (k, v) in m.groupdict().items() if (v is not None)
    }
    try:
        return datetime.datetime(
            year=int(groups.get("year", 0)),
            month=int(groups.get("month", groups.get("monthdash", 1))),
            day=int(groups.get("day", groups.get("daydash", 1))),
            hour=int(groups.get("hour", 0)),
            minute=int(groups.get("minute", 0)),
            second=int(groups.get("second", 0)),
            microsecond=int(
                (
                    Decimal(f"0.{groups.get('second_fraction', 0)}")
                    * Decimal("1000000.0")
                )
            ),
            tzinfo=parse_timezone(groups, default_timezone=default_timezone),
        )
    except Exception as e:
        raise ParseError(e) from e

def is_iso8601(datestring: str) -> bool:
    try:
        m = ISO8601_REGEX.match(datestring)
        return bool(m)
    except Exception as e:
        raise ParseError(e) from e


def glob(pathname):
    return list(iglob(pathname))

def iglob(pathname):
    if not has_magic(pathname):
        if os.path.lexists(pathname):
            (yield pathname)
        return
    (dirname, basename) = os.path.split(pathname)
    if not dirname:
        (yield from glob1(None, basename))
        return
    if (dirname != pathname) and has_magic(dirname):
        dirs = iglob(dirname)
    else:
        dirs = [dirname]
    if has_magic(basename):
        glob_in_dir = glob1
    else:
        glob_in_dir = glob0
    for dirname in dirs:
        for name in glob_in_dir(dirname, basename):
            (yield os.path.join(dirname, name))

def glob1(dirname, pattern):
    if not dirname:
        if isinstance(pattern, bytes):
            dirname = bytes(os.curdir, "ASCII")
        else:
            dirname = os.curdir
    try:
        names = os.listdir(dirname)
    except os.error:
        return []
    if not _ishidden(pattern):
        names = [x for x in names if (not _ishidden(x))]
    return fnmatch.filter(names, pattern)

def glob0(dirname, basename):
    if not basename:
        if os.path.isdir(dirname):
            return [basename]
    elif os.path.lexists(os.path.join(dirname, basename)):
        return [basename]
    return []

magic_check = re.compile("[*?[]")
magic_check_bytes = re.compile(b"[*?[]")

def has_magic(s):
    if isinstance(s, bytes):
        match = magic_check_bytes.search(s)
    else:
        match = magic_check.search(s)
    return match is not None

def _ishidden(path):
    return path[0] in (".", b"."[0])

ST_MODE = 0
ST_INO = 1
ST_DEV = 2
ST_NLINK = 3
ST_UID = 4
ST_GID = 5
ST_SIZE = 6
ST_ATIME = 7
ST_MTIME = 8
ST_CTIME = 9

def S_IMODE(mode):
    return mode & 4095

def S_IFMT(mode):
    return mode & 61440

S_IFDIR = 16384
S_IFCHR = 8192
S_IFBLK = 24576
S_IFREG = 32768
S_IFIFO = 4096
S_IFLNK = 40960
S_IFSOCK = 49152

def S_ISDIR(mode):
    return S_IFMT(mode) == S_IFDIR

def S_ISCHR(mode):
    return S_IFMT(mode) == S_IFCHR

def S_ISBLK(mode):
    return S_IFMT(mode) == S_IFBLK

def S_ISREG(mode):
    return S_IFMT(mode) == S_IFREG

def S_ISFIFO(mode):
    return S_IFMT(mode) == S_IFIFO

def S_ISLNK(mode):
    return S_IFMT(mode) == S_IFLNK

def S_ISSOCK(mode):
    return S_IFMT(mode) == S_IFSOCK

S_ISUID = 2048
S_ISGID = 1024
S_ENFMT = S_ISGID
S_ISVTX = 512
S_IREAD = 256
S_IWRITE = 128
S_IEXEC = 64
S_IRWXU = 448
S_IRUSR = 256
S_IWUSR = 128
S_IXUSR = 64
S_IRWXG = 56
S_IRGRP = 32
S_IWGRP = 16
S_IXGRP = 8
S_IRWXO = 7
S_IROTH = 4
S_IWOTH = 2
S_IXOTH = 1
UF_NODUMP = 1
UF_IMMUTABLE = 2
UF_APPEND = 4
UF_OPAQUE = 8
UF_NOUNLINK = 16
UF_COMPRESSED = 32
UF_HIDDEN = 32768
SF_ARCHIVED = 65536
SF_IMMUTABLE = 131072
SF_APPEND = 262144
SF_NOUNLINK = 1048576
SF_SNAPSHOT = 2097152
_filemode_table = (
    (
        (S_IFLNK, "l"),
        (S_IFREG, "-"),
        (S_IFBLK, "b"),
        (S_IFDIR, "d"),
        (S_IFCHR, "c"),
        (S_IFIFO, "p"),
    ),
    ((S_IRUSR, "r"),),
    ((S_IWUSR, "w"),),
    (((S_IXUSR | S_ISUID), "s"), (S_ISUID, "S"), (S_IXUSR, "x")),
    ((S_IRGRP, "r"),),
    ((S_IWGRP, "w"),),
    (((S_IXGRP | S_ISGID), "s"), (S_ISGID, "S"), (S_IXGRP, "x")),
    ((S_IROTH, "r"),),
    ((S_IWOTH, "w"),),
    (((S_IXOTH | S_ISVTX), "t"), (S_ISVTX, "T"), (S_IXOTH, "x")),
)

def filemode(mode):
    perm = []
    for table in _filemode_table:
        for bit, char in table:
            if (mode & bit) == bit:
                perm.append(char)
                break
        else:
            perm.append("-")
    return "".join(perm)


__all__ = ["TextWrapper", "wrap", "fill", "dedent", "indent", "shorten"]
_whitespace = "\t\n\x0b\x0c\r "

[docs] class TextWrapper: unicode_whitespace_trans = {} uspace = ord(" ") for x in _whitespace: unicode_whitespace_trans[ord(x)] = uspace wordsep_re = re.compile( "(\\s+|[^\\s\\w]*\\w+[^0-9\\W]-(?=\\w+[^0-9\\W])|(?<=[\\w\\!\\\"\\'\\&\\.\\,\\?])-{2,}(?=\\w))" ) wordsep_simple_re = re.compile("(\\s+)") sentence_end_re = re.compile("[a-z][\\.\\!\\?][\\\"\\']?\\Z") def __init__( self, width=None, initial_indent="", subsequent_indent="", expand_tabs=True, replace_whitespace=True, fix_sentence_endings=False, break_long_words=True, drop_whitespace=True, break_on_hyphens=True, tabsize=8, *, max_lines=None, placeholder=" [...]" ): if width is None: width = 70 self.width = width self.initial_indent = initial_indent self.subsequent_indent = subsequent_indent self.expand_tabs = expand_tabs self.replace_whitespace = replace_whitespace self.fix_sentence_endings = fix_sentence_endings self.break_long_words = break_long_words self.drop_whitespace = drop_whitespace self.break_on_hyphens = break_on_hyphens self.tabsize = tabsize self.max_lines = max_lines self.placeholder = placeholder def _munge_whitespace(self, text): if self.expand_tabs: text = text.expandtabs(self.tabsize) if self.replace_whitespace: text = text.translate(self.unicode_whitespace_trans) return text def _split(self, text): if self.break_on_hyphens is True: chunks = self.wordsep_re.split(text) else: chunks = self.wordsep_simple_re.split(text) chunks = [c for c in chunks if c] return chunks def _fix_sentence_endings(self, chunks): i = 0 patsearch = self.sentence_end_re.search while i < (len(chunks) - 1): if (chunks[(i + 1)] == " ") and patsearch(chunks[i]): chunks[(i + 1)] = " " i += 2 else: i += 1 def _handle_long_word(self, reversed_chunks, cur_line, cur_len, width): if width < 1: space_left = 1 else: space_left = width - cur_len if self.break_long_words: cur_line.append(reversed_chunks[(-1)][:space_left]) reversed_chunks[(-1)] = reversed_chunks[(-1)][space_left:] elif not cur_line: cur_line.append(reversed_chunks.pop()) def _wrap_chunks(self, chunks): lines = [] if self.width <= 0: raise ValueError(("invalid width %r (must be > 0)" % self.width)) if self.max_lines: if self.max_lines > 1: indentation = self.subsequent_indent else: indentation = self.initial_indent if (len(indentation) + len(self.placeholder.lstrip())) > self.width: raise ValueError("placeholder too large for max width") chunks.reverse() while chunks: cur_line = [] cur_len = 0 if lines: indentation = self.subsequent_indent else: indentation = self.initial_indent width = self.width - len(indentation) if self.drop_whitespace and (chunks[(-1)].strip() == "") and lines: del chunks[(-1)] while chunks: l = len(chunks[(-1)]) if (cur_len + l) <= width: cur_line.append(chunks.pop()) cur_len += l else: break if chunks and (len(chunks[(-1)]) > width): self._handle_long_word(chunks, cur_line, cur_len, width) cur_len = sum(map(len, cur_line)) if self.drop_whitespace and cur_line and (cur_line[(-1)].strip() == ""): cur_len -= len(cur_line[(-1)]) del cur_line[(-1)] if self.max_lines: that_the_end_1 = (len(lines) + 1) < self.max_lines else: that_the_end_1 = False that_the_end_2 = (not chunks) or ( self.drop_whitespace and (len(chunks) == 1) and (not chunks[0].strip()) ) if cur_line: if ( (self.max_lines is None) or that_the_end_1 or (that_the_end_2 and (cur_len <= width)) ): lines.append((indentation + "".join(cur_line))) else: while cur_line: if cur_line[(-1)].strip() and ( (cur_len + len(self.placeholder)) <= width ): cur_line.append(self.placeholder) lines.append((indentation + "".join(cur_line))) break cur_len -= len(cur_line[(-1)]) del cur_line[(-1)] else: if lines: prev_line = lines[(-1)].rstrip() if (len(prev_line) + len(self.placeholder)) <= self.width: lines[(-1)] = prev_line + self.placeholder break lines.append((indentation + self.placeholder.lstrip())) break return lines def _split_chunks(self, text): text = self._munge_whitespace(text) return self._split(text)
[docs] def wrap(self, text): chunks = self._split_chunks(text) if self.fix_sentence_endings: self._fix_sentence_endings(chunks) return self._wrap_chunks(chunks)
[docs] def fill(self, text): return "\n".join(self.wrap(text))
[docs] def wrap(text, width=None, **kwargs): if width is None: width = 79 w = TextWrapper(width=width, **kwargs) return w.wrap(text)
[docs] def fill(text, width=70, **kwargs): w = TextWrapper(width=width, **kwargs) return w.fill(text)
[docs] def shorten(text, width, **kwargs): w = TextWrapper(width=width, max_lines=1, **kwargs) return w.fill(" ".join(text.strip().split()))
_whitespace_only_re = re.compile("^[ \t]+$", re.MULTILINE) _leading_whitespace_re = re.compile("(^[ \t]*)(?:[^ \t\n])", re.MULTILINE)
[docs] def dedent(text): margin = None text = _whitespace_only_re.sub("", text) indents = _leading_whitespace_re.findall(text) for indentation in indents: if margin is None: margin = indentation elif indentation.startswith(margin): pass elif margin.startswith(indentation): margin = indentation else: margin = "" break if margin: for line in text.split("\n"): assert (not line) or line.startswith(margin), "line = %r, margin = %r" % ( line, margin, ) text = re.sub(("(?m)^" + margin), "", text) return text
[docs] def indent(text, prefix, predicate=None): if predicate is None: def predicate(line): return line.strip() def prefixed_lines(): for line in text.splitlines(True): (yield ((prefix + line) if predicate(line) else line)) return "".join(prefixed_lines())
class DescriptionString: def __set_name__(self, owner, name): self.public_name = name self.private_name = "_" + name def __get__(self, obj, objtype=None): return getattr(obj, self.private_name) def __set__(self, obj, value): if value and (not isinstance(value, str)): raise TypeError( ("'%s' value must be a str type or None" % self.public_name) ) if self.private_name != value: setattr(obj, self.private_name, value) class SynopsisList: def __set_name__(self, owner, name): self.public_name = name self.private_name = "_" + name def __get__(self, obj, objtype=None): return getattr(obj, self.private_name) def __set__(self, obj, value): if value and (not isinstance(value, list)): raise TypeError( ("'%s' value must be a list type or None" % self.public_name) ) if self.private_name != value: setattr(obj, self.private_name, value) class DescriptionDict: def __set_name__(self, owner, name): self.public_name = name self.private_name = "_" + name def __get__(self, obj, objtype=None): return getattr(obj, self.private_name) def __set__(self, obj, value): if value and (not isinstance(value, dict)): raise TypeError( ("'%s' value must be a dict type or None" % self.public_name) ) if self.private_name != value: setattr(obj, self.private_name, value) class DescriptionBoolean: def __set_name__(self, owner, name): self.public_name = name self.private_name = "_" + name def __get__(self, obj, objtype=None): return getattr(obj, self.private_name) def __set__(self, obj, value): if value and (not isinstance(value, bool)): raise TypeError( ("'%s' value must be a dict type or None" % self.public_name) ) if self.private_name != value: setattr(obj, self.private_name, value) class UtilityDescription: name = DescriptionString() synopsis = SynopsisList() description = DescriptionString() options = DescriptionString() operator = DescriptionString() stdin = DescriptionString() input_files = DescriptionString() environment_variables = DescriptionString() asynchronous_events = DescriptionString() stdout = DescriptionString() stderr = DescriptionString() output_files = DescriptionString() extended_description = DescriptionString() exit_status = DescriptionDict() consequences_of_errors = DescriptionString() application_usage = DescriptionString() examples = DescriptionString() rationale = DescriptionString() future_directions = DescriptionString() see_also = DescriptionString() change_history = DescriptionString() add_help = DescriptionBoolean() prog = DescriptionString() short_description = DescriptionString() def __init__( self, name=None, prog=None, synopsis=None, description=None, short_description=None, options=None, operator=None, stdin=None, input_files=None, environment_variables=None, asynchronous_events=None, stdout=None, stderr=None, output_files=None, extended_description=None, exit_status=None, consequences_of_errors=None, application_usage=None, examples=None, rationale=None, future_directions=None, see_also=None, change_history=None, add_help=None, ): self.name = name self.prog = prog self.synopsis = synopsis self.description = description self.short_description = short_description self.options = options self.operands = operator self.stdin = stdin self.input_files = input_files self.environment_variables = environment_variables self.asynchronous_events = asynchronous_events self.stdout = stdout self.stderr = stderr self.output_files = output_files self.extended_description = extended_description self.exit_status = exit_status self.consequences_of_errors = consequences_of_errors self.application_usage = application_usage self.examples = examples self.rationale = rationale self.future_directions = future_directions self.see_also = see_also self.change_history = change_history self.add_help = add_help users_re = re.compile("[ugoa]+") operation_re = re.compile("[+=-]") permissions_re = re.compile("[rwxXsugo]+") def _apply_symbolic_mode(mode, users, operation, permissions, umask, isdir): if users == "": users = "a" else: umask = 0 mult = 0 user_bits = S_ISGID | S_ISUID if ("u" in users) or ("a" in users): mult = mult + S_IXUSR user_bits = user_bits | S_IRWXU if ("g" in users) or ("a" in users): mult = mult + S_IXGRP user_bits = user_bits | S_IRWXG if ("o" in users) or ("a" in users): mult = mult + S_IXOTH user_bits = user_bits | S_IRWXO assert mult != 0 mask = 0 if "r" in permissions: mask = mask | (S_IROTH * mult) if "w" in permissions: mask = mask | (S_IWOTH * mult) if ("x" in permissions) or (("X" in permissions) and isdir): mask = mask | (S_IXOTH * mult) elif "X" in permissions: if mode & ((S_IXOTH | S_IXGRP) | S_IXUSR): mask = mask | (S_IXOTH * mult) if "u" in permissions: mask = mask | (((mode & S_IRWXU) >> 6) * mult) if "g" in permissions: mask = mask | (((mode & S_IRWXG) >> 3) * mult) if "o" in permissions: mask = mask | (((mode & S_IRWXO) >> 0) * mult) if "s" in permissions: if "u" in users: mask = mask | S_ISUID if "g" in users: mask = mask | S_ISGID if "o" in users: raise ValueError("Cannot use 'o' user flag with 's' permissions") if "a" in users: raise ValueError("Cannot use 'a' user flag with 's' permissions") if operation == "+": mode = (mode & umask) | ((mode | mask) & (~umask)) elif operation == "-": mode = (mode & umask) | ((mode & (~mask)) & (~umask)) elif operation == "=": umask = umask | (~user_bits) mode = (mode & umask) | (mask & (~umask)) else: raise AssertionError(("unknown operation " + operation)) return mode def symbolic_mode(symbolic, mode=None, isdir=0, umask=0): pos = 0 if mode is None: mode = 493 while pos < len(symbolic): m = users_re.match(symbolic, pos) if m is None: users = "" else: users = m.group(0) pos = pos + len(users) while 1: m = operation_re.match(symbolic, pos) if m is None: raise TypeError("Missing operation in mode") operation = m.group(0) pos = pos + len(operation) m = permissions_re.match(symbolic, pos) if m is None: permissions = "" else: permissions = m.group(0) pos = pos + len(permissions) mode = _apply_symbolic_mode( mode, users, operation, permissions, umask, isdir ) if not (pos < len(symbolic)): break if symbolic[pos] == ",": pos = pos + 1 break return mode OPTIONAL = "?" ZERO_OR_MORE = "*" ONE_OR_MORE = "+" PARSER = "A..." REMAINDER = "..." class WrapperCmdLineArgParser: def __init__(self, parser): self.parser = parser def __call__(self, func): if not self.parser: self.parser = func(None, None, None, True) def wrapped_function(*args): line = args[1].split() try: parsed = self.parser.parse_args(line) except SystemExit: return None return func(*args, parsed) return wrapped_function class Namespace: pass class _ArgError(BaseException): pass class _Arg: def __init__(self, names, dest, action, nargs, const, default, arg_type, arg_help): self.names = names self.dest = dest self.action = action self.nargs = nargs self.const = const self.default = default self.type = arg_type self.help = arg_help def parse(self, optname, eq_arg, args): if self.action in {"store", "append"}: if self.nargs is None: if eq_arg: return self.type(eq_arg) if args: return self.type(args.pop(0)) raise _ArgError(("expecting value for %s" % optname)) if self.nargs == OPTIONAL: if eq_arg: return self.type(eq_arg) if args: return self.type(args.pop(0)) return self.default ret = [] if self.nargs == ZERO_OR_MORE: n = -1 elif self.nargs == ONE_OR_MORE: if not args: raise _ArgError(("expecting value for %s" % optname)) n = -1 elif self.nargs == REMAINDER: n = 0 while args: ret.append(args.pop(0)) else: n = int(self.nargs) stop_at_opt = True while args and (n != 0): if stop_at_opt and args[0].startswith("-") and (args[0] != "-"): if args[0] == "--": stop_at_opt = False args.pop(0) else: break else: ret.append(args.pop(0)) n -= 1 if n > 0: raise _ArgError(("expecting value for %s" % optname)) return ret if self.action == "store_const": return self.const assert False def _dest_from_optnames(opt_names): dest = opt_names[0] for name in opt_names: if name.startswith("--"): dest = name break return dest.lstrip("-").replace("-", "_") class ArgumentParser(UtilityDescription): def __init__(self, **kwargs): UtilityDescription.__init__(self) self.columns = 79 add_help = kwargs.get("add_help", False) self.name = kwargs.get("name", None) self.synopsis = kwargs.get("synopsis", None) self.description = kwargs.get("description", None) self.options = kwargs.get("options", None) self.operands = kwargs.get("operands", None) self.stdin = kwargs.get("stdin", None) self.input_files = kwargs.get("input_files", None) self.environment_variables = kwargs.get("environment_variables", None) self.asynchronous_events = kwargs.get("asynchronous_events", None) self.stdout = kwargs.get("stdout", None) self.stderr = kwargs.get("stderr", None) self.output_files = kwargs.get("output_files", None) self.extended_description = kwargs.get("extended_description", None) self.exit_status = kwargs.get("exit_status", None) self.consequences_of_errors = kwargs.get("consequences_of_errors", None) self.application_usage = kwargs.get("application_usage", None) self.examples = kwargs.get("examples", None) self.rationale = kwargs.get("rationale", None) self.future_directions = kwargs.get("future_directions", None) self.see_also = kwargs.get("see_also", None) self.change_history = kwargs.get("change_history", None) self.opt = [] self.pos = [] if add_help: self.add_argument( "-h", "--help", dest="help", action="store_true", help="Show this help message and exit", ) def add_argument(self, *args, **kwargs): action = kwargs.get("action", "store") if action == "store_true": action = "store_const" const = True default = kwargs.get("default", False) elif action == "store_false": action = "store_const" const = False default = kwargs.get("default", True) elif action == "append": const = None default = kwargs.get("default", []) else: const = kwargs.get("const", None) default = kwargs.get("default", None) if args and args[0].startswith("-"): args_list = self.opt dest = kwargs.get("dest") if dest is None: dest = _dest_from_optnames(args) else: args_list = self.pos dest = kwargs.get("dest") if dest is None: dest = args[0] if not args: args = [dest] args_list.append( _Arg( args, dest, action, kwargs.get("nargs", None), const, default, kwargs.get("type", str), kwargs.get("help", ""), ) ) @staticmethod def error(msg): sys.stderr.write(("Error: %s\n" % msg)) sys.exit(2) def parse_args(self, args=None, namespace=None): return self._parse_args_impl(args, namespace, False) def parse_known_args(self, args=None, namespace=None): return self._parse_args_impl(args, namespace, True) def _parse_args_impl(self, args, namespace, return_unknown): if args is None: args = sys.argv[1:] else: args = args[:] if namespace is None: namespace = Namespace() try: return self._parse_args(args, namespace, return_unknown) except _ArgError as e: self.print_usage() self.error(str(e)) return None def _parse_args(self, args, arg_holder, return_unknown): for opt in self.opt: setattr(arg_holder, opt.dest, opt.default) unknown = [] def consume_unknown(): while args and (not args[0].startswith("-")): unknown.append(args.pop(0)) parsed_pos = False while args or (not parsed_pos): if ( args and args[0].startswith("-") and (args[0] != "-") and (args[0] != "--") ): a = args.pop(0) if ("--" not in a) and (len(a) > 2): index_focus = 0 for letter in a.replace("-", ""): args.insert(index_focus, ("-%s" % letter)) index_focus += 1 a = args.pop(0) eq_arg = None if a.startswith("--") and ("=" in a): (a, eq_arg) = a.split("=", 1) found = False for _, opt in enumerate(self.opt): if a in opt.names: val = opt.parse(a, eq_arg, args) if opt.action == "append": getattr(arg_holder, opt.dest).append(val) else: setattr(arg_holder, opt.dest, val) found = True break if not found: if return_unknown: unknown.append(a) consume_unknown() else: raise _ArgError(("unknown option %s" % a)) else: if parsed_pos: if return_unknown: unknown = unknown + args break raise _ArgError(("extra args: %s" % " ".join(args))) for pos in self.pos: setattr(arg_holder, pos.dest, pos.parse(pos.names[0], None, args)) parsed_pos = True if return_unknown: consume_unknown() return (arg_holder, unknown) if return_unknown else arg_holder @staticmethod def render_arg(arg): if arg.action == "store": if arg.nargs == ONE_OR_MORE: return " %s..." % arg.dest if arg.nargs == ZERO_OR_MORE: return " [%s...]" % arg.dest if arg.nargs == OPTIONAL: return " [%s]" % arg.dest return " %s" % arg.dest return "" def format_usage(self): if self.synopsis: for usage in self.synopsis: if usage == self.synopsis[0]: sys.stdout.write(("Usage: %s\n" % usage)) else: sys.stdout.write((" %s\n" % usage)) else: if self.name: sys.stdout.write(("Usage: %s" % self.name.split()[0])) else: sys.stdout.write(("Usage: %s" % sys.argv[0])) for opt in self.opt: sys.stdout.write((" [%s]" % ", ".join(opt.names))) for pos in self.pos: sys.stdout.write(self.render_arg(pos)) sys.stdout.write("\n") if self.pos is None: pass def format_help(self, columns): if columns is None: self.columns = 79 else: self.columns = columns if self.name: application_name = self.name.split()[0] else: application_name = sys.argv[0] self._format_help_name() self._format_help_synopsis(application_name) self._format_help_description() self._format_help_operands() self._format_help_options() self._format_exit_status() def _format_help_name(self): if self.name: sys.stdout.write(("%s\n" % get_bold_text("NAME"))) for line in wrap(self.name, self.columns, replace_whitespace=False): sys.stdout.write(("%s\n" % indent(line, " "))) def _format_help_synopsis(self, application_name): sys.stdout.write(("\n%s\n" % get_bold_text("SYNOPSIS"))) if self.synopsis: for synopsis_variation in self.synopsis: for line in wrap( synopsis_variation, self.columns, replace_whitespace=False ): sys.stdout.write( ( "%s\n" % indent( line.replace( application_name, get_bold_text(application_name) ), " ", ) ) ) else: if self.name: sys.stdout.write((" %s" % get_bold_text(self.name.split()[0]))) else: sys.stdout.write((" %s" % get_bold_text(sys.argv[0]))) for opt in self.opt: sys.stdout.write((" [%s]" % ", ".join(opt.names))) for pos in self.pos: sys.stdout.write(self.render_arg(pos)) sys.stdout.write("\n") def _format_help_description(self): if self.description: sys.stdout.write(("\n%s\n" % get_bold_text("DESCRIPTION"))) for line in wrap(self.description, self.columns, replace_whitespace=False): sys.stdout.write(("%s\n" % indent(line, " "))) def _format_help_operands(self): if self.pos: sys.stdout.write(("\n%s\n" % get_bold_text("OPERANDS"))) max_size = max((len(x.names[0]) for x in self.pos)) for pos in self.pos: the_name = pos.names[0] pos.help = pos.help[0].upper() + pos.help[1:] the_help = wrap(pos.help, ((self.columns - max_size) - 4)) sys.stdout.write(indent(get_bold_text(the_name), " ")) for help_line in the_help: if help_line == the_help[0]: sys.stdout.write( indent( help_line, (" " * int(((max_size - len(the_name)) + 2))) ) ) sys.stdout.write("\n") else: sys.stdout.write(indent(help_line, (" " * int((max_size + 4))))) sys.stdout.write("\n") def _format_help_options(self): if self.opt: sys.stdout.write(("\n%s\n" % get_bold_text("OPTIONS"))) max_size = max((len(", ".join(x.names)) for x in self.opt)) for opt in self.opt: the_name = ", ".join(opt.names) opt.help = opt.help[0].upper() + opt.help[1:] the_help = wrap(opt.help, ((self.columns - max_size) - 4)) sys.stdout.write(indent(get_bold_text(the_name), " ")) for help_line in the_help: if help_line == the_help[0]: sys.stdout.write( indent( help_line, (" " * int(((max_size - len(the_name)) + 2))) ) ) sys.stdout.write("\n") else: sys.stdout.write(indent(help_line, (" " * int((max_size + 4))))) sys.stdout.write("\n") def _format_exit_status(self): if self.exit_status: sys.stdout.write(("\n%s\n" % get_bold_text("EXIT STATUS"))) max_size = max( ( len(exit_code) for (exit_code, description) in self.exit_status.items() ) ) for exit_code, description in self.exit_status.items(): sys.stdout.write(indent(get_bold_text(exit_code), " ")) if description: the_help = wrap(description, (self.columns - 4)) for help_line in the_help: if help_line == the_help[0]: sys.stdout.write( indent( help_line, (" " * int(((max_size - len(exit_code)) + 2))), ) ) sys.stdout.write("\n") else: sys.stdout.write( indent(help_line, (" " * int((max_size + 4)))) ) sys.stdout.write("\n") else: sys.stdout.write("\n") def print_usage(self): self.format_usage() def print_help(self, columns=None): self.format_help(columns=columns) class FileType: def __init__(self, mode="r", bufsize=(-1), encoding=None, errors=None): self._mode = mode self._bufsize = bufsize self._encoding = encoding self._errors = errors def __call__(self, string): if string == "-": if "r" in self._mode: return sys.stdin if "w" in self._mode: return sys.stdout raise ValueError(('argument "-" with mode %r' % self._mode)) try: return open(string, self._mode, self._bufsize, self._encoding, self._errors) except OSError as e: args = {"filename": string, "error": e} message = "can't open '%(filename)s': %(error)s" raise TypeError((message % args)) from e def __repr__(self): args = (self._mode, self._bufsize) kwargs = [("encoding", self._encoding), ("errors", self._errors)] args_str = ", ".join( ( [repr(arg) for arg in args if (arg != (-1))] + [("%s=%r" % (kw, arg)) for (kw, arg) in kwargs if (arg is not None)] ) ) return "%s(%s)" % (type(self).__name__, args_str) def columnize(columnize_data, display_width=80): if not columnize_data: sys.stdout.write("<empty>\n") return non_strings = [ i for i in range(len(columnize_data)) if (not isinstance(columnize_data[i], str)) ] if non_strings: sys.stderr.write( ("list[i] not a string for i in %s" % ", ".join(map(str, non_strings))) ) return size = len(columnize_data) if size == 1: sys.stdout.write(("%s\n" % str(columnize_data[0]))) return for num_rows in range(1, len(columnize_data)): num_cols = ((size + num_rows) - 1) // num_rows col_widths = [] tot_width = -2 for col in range(num_cols): colwidth = 0 for row in range(num_rows): i = row + (num_rows * col) if i >= size: break x = columnize_data[i] colwidth = max(colwidth, len(x)) col_widths.append(colwidth) tot_width += colwidth + 2 if tot_width > display_width: break if tot_width <= display_width: break else: num_rows = len(columnize_data) num_cols = 1 col_widths = [0] for row in range(num_rows): texts = [] for col in range(num_cols): i = row + (num_rows * col) if i >= size: x = "" else: x = columnize_data[i] texts.append(x) while texts and (not texts[(-1)]): del texts[(-1)] for col, _ in enumerate(texts): texts[col] = "%-*s" % (col_widths[col], texts[col]) sys.stdout.write(("%s\n" % str(" ".join(texts)))) try: import subprocess except ImportError: subprocess = None parser_man = ArgumentParser( prog="man", description="The man utility shall write information about each of the name operands.", add_help=True, ) parser_man.add_argument( "name", nargs="?", help="A keyword or the name of a standard utility." ) PROMPT = "(Cmd) " IDENTCHARS = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789_" class Cmd: prompt = PROMPT identchars = IDENTCHARS ruler = "=" lastcmd = "" intro = None doc_leader = "" doc_header = "Documented commands (type man <topic>):" misc_header = "Miscellaneous help topics:" undoc_header = "Undocumented commands:" nohelp = "No manual entry for %s" use_rawinput = 1 def __init__(self, completekey="tab"): self.cmdqueue = [] self.completekey = completekey self.completion_matches = [] self.exit_code = 0 self.rc_file = ".glxshrc" self.logout_file = ".glxsh_logout" self.history_path = expanduser("~/.glxsh_history") self.history_length = 100 def cmdloop(self, intro=None): self.preloop() if self.use_rawinput and self.completekey: try: import readline self.old_completer = readline.get_completer() readline.set_completer(self.complete) readline.parse_and_bind((self.completekey + ": complete")) self.old_delims = readline.get_completer_delims() readline.set_completer_delims(self.old_delims.replace("-", "")) readline.parse_and_bind("set colored-completion-prefix off") readline.parse_and_bind("set show-all-if-unmodified on") readline.parse_and_bind("set horizontal-scroll-mode on") if not exists(self.history_path): open(self.history_path, "a+").close() readline.read_history_file(self.history_path) readline.set_history_length(self.history_length) except ImportError: pass try: if intro is not None: self.intro = intro if self.intro: sys.stdout.write((str(self.intro) + "\n")) stop = None while not stop: if self.cmdqueue: line = self.cmdqueue.pop(0) elif self.use_rawinput: try: sys.stdout.write(self.prompt) sys.stdout.flush() line = input() except EOFError: line = "EOF" except KeyboardInterrupt: sys.stdout.write("^C") sys.stdout.flush() line = "" else: try: sys.stdout.write(self.prompt) sys.stdout.flush() line = sys.stdin.readline() if not len(line): line = "EOF" else: line = line.rstrip("\r\n") except KeyboardInterrupt: sys.stdout.write("^C") sys.stdout.flush() line = "" line = self.precmd(line) stop = self.onecmd(line) stop = self.postcmd(stop, line) self.postloop() finally: if self.use_rawinput and self.completekey: try: import readline readline.set_completer(self.old_completer) readline.set_completer_delims(self.old_delims) readline.write_history_file(self.history_path) except ImportError: pass return self.exit_code def precmd(self, line): return line def postcmd(self, stop, line): return stop def preloop(self): pass def postloop(self): pass def parseline(self, line): line = line.strip() if not line: return (None, None, line) elif line[0] == "?": line = "man " + line[1:] elif line[0] == "!": if hasattr(self, "do_shell"): line = "shell " + line[1:] else: return (None, None, line) (i, n) = (0, len(line)) while (i < n) and (line[i] in self.identchars): i = i + 1 (cmd, arg) = (line[:i], line[i:].strip()) return (cmd, arg, line) def onecmd(self, line): (cmd, arg, line) = self.parseline(line) if not line: return self.emptyline() if cmd is None: return self.default(line) self.lastcmd = line if line == "EOF": self.lastcmd = "" if cmd == "": return self.default(line) else: try: func = getattr(self, ("do_" + cmd)) except AttributeError: return self.default(line) return func(arg) def onecmdhooks(self, line): self.onecmd(line) return self.exit_code def emptyline(self): if self.lastcmd: return self.onecmd(self.lastcmd) def default(self, line): sys.stdout.write(("*** Unknown syntax: %s\n" % line)) def default_completer(self, *ignored): return [] def completenames(self, text, *ignored): return [ ("%s " % a[3:]) for a in self.get_names() if a.startswith(("do_%s" % text)) ] def complete(self, text, state): if state == 0: import readline original_line = readline.get_line_buffer() line = original_line.lstrip() stripped = len(original_line) - len(line) start_index = readline.get_begidx() - stripped end_index = readline.get_endidx() - stripped if start_index > 0: (cmd, args, foo) = self.parseline(line) if cmd == "": complete_function = self.default_completer else: try: complete_function = getattr(self, ("complete_" + cmd)) except AttributeError: complete_function = self.default_completer else: complete_function = self.completenames self.completion_matches = complete_function( text, line, start_index, end_index ) try: return self.completion_matches[state] except IndexError: return None def get_names(self): return dir(self.__class__) def complete_man(self, *args): commands = set( [a[3:] for a in self.get_names() if a.startswith(("do_" + args[0]))] ) topics = set( (a[5:] for a in self.get_names() if a.startswith(("help_" + args[0]))) ) return list((commands | topics)) @staticmethod def help_man(): parser_man.print_help() @WrapperCmdLineArgParser(parser_man) def do_man(self, arg, parsed): if parsed.help: self.help_man() return 0 if arg: try: func = getattr(self, ("help_" + arg)) except AttributeError: sys.stdout.write(("%s\n" % str((self.nohelp % (arg,))))) return 1 return func() else: names = self.get_names() cmds_doc = [] cmds_undoc = [] help = {} for name in names: if name[:5] == "help_": help[name[5:]] = 1 names.sort() prevname = "" for name in names: if name[:3] == "do_": if name == prevname: continue prevname = name cmd = name[3:] if cmd in help: cmds_doc.append(cmd) del help[cmd] else: cmds_undoc.append(cmd) sys.stdout.write(("%s\n" % self.doc_leader)) self.print_topics(self.doc_header, cmds_doc, 15, 80) self.print_topics(self.misc_header, list(help.keys()), 15, 80) if cmds_undoc != ["EOF"]: self.print_topics(self.undoc_header, cmds_undoc, 15, 80) return 0 def print_topics(self, header, cmds, cmdlen, maxcol): if cmds: sys.stdout.write(("%s\n" % header)) if self.ruler: sys.stdout.write(("%s\n" % str((self.ruler * len(header))))) columnize(cmds, (maxcol - 1)) sys.stdout.write("\n") class GLXAlias: def __init__(self): self.__alias = None self.alias = None @property def alias(self): return self.__alias @alias.setter def alias(self, value): if value is None: value = {} if value and (not isinstance(value, dict)): raise TypeError("'alias' property value must be a dict type or None") if self.alias != value: self.__alias = value def tabulate(tabular_data, headers, tablefmt, colalign): value_to_return = [] if tablefmt is None: tablefmt = None columns_info = {} if headers: tabular_data.insert(0, headers) for line in tabular_data: for index_col, cell_value in enumerate(line): columns_info[index_col] = {} columns_info[index_col]["data"] = [] columns_info[index_col]["text"] = [] columns_info[index_col]["size"] = 0 columns_info[index_col]["colalign"] = None for line in tabular_data: for index_col, cell_value in enumerate(line): columns_info[index_col]["data"].append(cell_value) columns_info[index_col]["text"].append(cell_value) if colalign: columns_info[index_col]["colalign"] = colalign[index_col] if len(str(cell_value)) > columns_info[index_col]["size"]: columns_info[index_col]["size"] = len(str(cell_value)) for _, value in columns_info.items(): for index, item in enumerate(value["data"]): value["text"][index] = str(value["text"][index]) if len(str(item)) < value["size"]: if value["colalign"].lower() == "right": spacing = " " * int((value["size"] - len(str(item)))) value["text"][index] = "%s%s" % (spacing, value["text"][index]) elif value["colalign"].lower() == "center": spacing = " " * int((int((value["size"] - len(str(item)))) / 2)) value["text"][index] = "%s%s" % (spacing, value["text"][index]) while len(value["text"][index]) < value["size"]: value["text"][index] = "%s " % value["text"][index] else: spacing = " " * int((value["size"] - len(str(item)))) value["text"][index] = "%s%s" % (value["text"][index], spacing) line_to_append = "" spacing = " " for line, line_value in enumerate(tabular_data): for col, _ in enumerate(line_value): line_to_append += "%s%s" % (columns_info[col]["text"][line], spacing) value_to_return.append(line_to_append.strip(" ")) line_to_append = "" return "\n".join(value_to_return) class EINVAL(Exception): pass class ENOMEM(Exception): pass class GLXEnviron: def __init__(self): self.__environ = None self.environ = {} @property def environ(self) -> dict: return self.__environ @environ.setter def environ(self, value): if value is None: value = {} if not isinstance(value, dict): raise TypeError("'environ' property value must be a dict type or None") if self.environ != value: self.__environ = value def getenv(self, name=None): if name and (not isinstance(name, str)): raise TypeError("'name' parameter must be a str or None") return self.environ.get(name, None) def setenv(self, envname=None, envval=None, overwrite=None): if not isinstance(envname, str): raise TypeError("'name' parameter must be a str") if not isinstance(envval, str): raise TypeError("'value' parameter must be a str") if (envname == "") or ("=" in envname): raise EINVAL() if (envname in self.environ) and (overwrite == 0): return 0 if ((envname in self.environ) and (overwrite != 0)) or ( envname not in self.environ ): try: self.environ[envname] = envval return 0 except MemoryError as exc: raise ENOMEM() from exc return -1 def unsetenv(self, name): if not isinstance(name, str): raise TypeError("'name' parameter value must be a str type") if (name == "") or ("=" in name): raise EINVAL() if name not in self.environ: return 0 try: del self.environ[name] return 0 except KeyError: return -1 def _append_slash_if_dir(p): if p and isdir(p) and (p[(-1)] != sep): return p + sep else: return p def glxsh_completer_directory(_, line, __, ___): arg = line.split()[1:] if not arg: return [(normpath(f) + sep) for f in os.listdir(getcwd()) if isdir(f)] else: (directory, part, base) = arg[(-1)].rpartition(sep) if part == "": directory = getcwd() elif directory == "": directory = sep completions = [] for f in os.listdir(directory): if f.startswith(base): if isdir(joinpath(directory, f)): completions.append(joinpath(f, "")) return completions def glxsh_completer_file(_, line, __, ___): arg = line.split()[1:] if not arg: return os.listdir(getcwd()) else: (directory, part, base) = arg[(-1)].rpartition(sep) if part == "": directory = getcwd() elif directory == "": directory = sep return [ ((normpath(f) + sep) if isdir(f) else normpath(f)) for f in os.listdir(directory) if f.startswith(base) ] def glxsh_complete_chmod(text, line, begidx, endidx): return glxsh_completer_file(text, line, begidx, endidx) def glxsh_complete_rmdir2(_, line, begidx, endidx): before_arg = line.rfind(" ", 0, begidx) if before_arg == (-1): return fixed = line[(before_arg + 1) : begidx] arg = line[(before_arg + 1) : endidx] pattern = arg + "*" completions = [] for path in glob(pattern): path = _append_slash_if_dir(path) completions.append(path.replace(fixed, "", 1)) return completions def glxsh_complete_rmdir(_, line, __, ___): if line == "rmdir": return ["rmdir "] arg = line.split()[1:] if not arg: return [(normpath(f) + sep) for f in os.listdir(getcwd()) if isdir(f)] else: (directory, part, base) = arg[(-1)].rpartition(sep) if part == "": directory = "." + sep elif directory == "": directory = sep completions = [] for f in os.listdir(directory): if f.startswith(base): if isdir(os.path.join(directory, f)): completions.append((f + sep)) return completions var_env_pattern = re.compile(".*\\$$", re.IGNORECASE) var_env_name_pattern = re.compile(".*\\$(\\w+)$", re.IGNORECASE) def glxsh_complete_echo(_, line, __, ___, shell=None): if line == "echo": return ["echo "] arg = line.split()[1:] if not arg: pass else: completions = [] if var_env_name_pattern.match(arg[(-1)]): search_result = re.search(var_env_name_pattern, arg[(-1)]) if not search_result: return None for key, value in shell.environ.items(): if str(key).startswith(search_result.group(1)) and ( key != search_result.group(1) ): completions.append(("%s " % key)) return completions elif var_env_pattern.match(arg[(-1)]): search_result = re.search(var_env_pattern, arg[(-1)]) if not search_result: return None for key, value in shell.environ.items(): completions.append(("%s " % key)) return completions else: return None def glxsh_complete_du(text, line, begidx, endidx, shell=None): if line == "du": return ["du "] arg = line.split()[1:] completions = [] if not arg: for f in os.listdir(getcwd()): if isdir(f): completions.append((normpath(f) + sep)) if isfile(f) or islink(f): completions.append(normpath(f)) else: (directory, part, base) = arg[(-1)].rpartition(sep) if part == "": directory = "." + sep elif directory == "": directory = sep completions = [] for f in os.listdir(directory): if f.startswith(base) or f.startswith((base + sep)): if isdir(f): completions.append((normpath(f) + sep)) if isfile(f) or islink(f): completions.append(normpath(f)) return completions parser_alias = ArgumentParser( name="alias - define or display aliases", description="The alias utility shall create or redefine alias definitions or write the values of existing alias definitions to standard output. An alias definition provides a string value that shall replace a command name when it is encountered", synopsis=["alias [alias-name[=string]...]"], exit_status={ "0": "Successful completion.", ">0": "One of the name operands specified did not have an alias definition, or an error occurred.", }, ) parser_alias.add_argument( "alias-name", nargs="*", type=str, help="Write the alias definition to standard output.", ) parser_alias.add_argument( "alias-name=string", nargs="*", help="Assign the value of string to the alias alias-name.", ) def glxsh_alias(**kwargs): shell = kwargs.get("shell", None) string = kwargs.get("string", None) exit_code = 0 if string is None: if shell: for key, value in shell.alias.items(): sys.stdout.write(("%s='%s'\n" % (key, value))) return 0 else: return 1 try: split_line = quoted_split(string) if split_line: for alias_name in split_line: if "=" in alias_name: spited_value = alias_name.split("=") shell.alias[spited_value[0]] = str( spited_value[1].strip('"').strip("'") ) elif alias_name in shell.alias: sys.stdout.write( ("%s='%s'\n" % (alias_name, shell.alias[alias_name])) ) return exit_code else: for key, value in shell.alias.items(): sys.stdout.write(("%s='%s'\n" % (key, value))) return 0 except OSError as error: sys.stderr.write(("alias: %s\n" % error_code_to_text(error.errno))) return 1 parser_basename = ArgumentParser( name="basename - return non-directory portion of a pathname", prog="basename", description="Print string with any leading directory components removed. If specified, also remove a trailing suffix.", ) parser_basename.add_argument( "string", type=str, nargs="?", default=None, help="a string" ) parser_basename.add_argument("suffix", nargs="?", default=None, help="a string") def glxsh_basename(string=None, suffix=None): try: sys.stdout.write(("%s\n" % basename(string, suffix))) return 0 except (Exception, ArithmeticError) as error: sys.stderr.write(("basename: %s\n" % error)) return 1 parser_cat = ArgumentParser( name="cat - concatenate and print files", description="The cat utility shall read files in sequence and shall write their contents to the standard output in the same sequence.", ) parser_cat.add_argument( "file", nargs="*", type=FileType("r"), help="A pathname of an input file. If no file operands are specified, the standard input shall be used. If a file is '-', the cat utility shall read from the standard input at that point in the sequence. The cat utility shall not close and reopen standard input when it is referenced in this way, but shall accept multiple occurrences of '-' as a file operand.", ) parser_cat.add_argument( "-u", dest="update", action="store_true", default=False, help="ignored" ) def glxsh_cat(files): def read_file_in_chunks(file_object, chunk_size=3072): while True: data = file_object.read(chunk_size) if not data: break (yield data) def read_stdin(file_object): while True: data = file_object.readline() if not data: break (yield data) try: if (files is None) or (files == []): files = ["-"] for file in files: if file == "-": for piece in read_stdin(sys.stdin): sys.stdout.write(piece) else: with open(file, "r") as f: for piece in read_file_in_chunks(f): sys.stdout.write(piece) return 0 except (Exception, ArithmeticError) as error: sys.stderr.write(("cat: %s\n" % error)) return 1 parser_cd = ArgumentParser( name="cd - change the working directory", description="The cd utility shall change the working directory of the current shell execution environment", ) parser_cd.add_argument( "directory", nargs="?", const=0, help="An absolute or relative pathname of the directory that shall become the new working directory. The interpretation of a relative pathname by cd depends on the -L option and the CDPATH and PWD environment variables. If directory is an empty string, the directory be come HOME environment variable.", ) parser_cd.add_argument( "-P", dest="physical", action="store_true", default=False, help="Handle the operand dot-dot physically; symbolic link components shall be resolved before dot-dot components are processed", ) parser_cd.add_argument( "-L", dest="logical", action="store_true", default=False, help="Handle the operand dot-dot logically; symbolic link components shall not be resolved before dot-dot components are processed ", ) def glxsh_cd(directory=None, logical=None, physical=None, shell=None): if directory is None: directory = shell.getenv("HOME") if logical and physical: physical = False if (not logical) and (not physical): logical = True if directory == "-": if shell.getenv("OLDPWD"): directory = shell.getenv("OLDPWD") else: directory = shell.getenv("PWD") elif directory: directory = expanduser(directory) try: if logical: os.chdir(normpath(directory)) else: os.chdir(realpath(directory)) shell.setenv("OLDPWD", shell.getenv("PWD"), 1) shell.setenv("PWD", os.getcwd(), 1) return 0 except (Exception, ArithmeticError) as error: sys.stderr.write( ("cd: %s: '%s'\n" % (error_code_to_text(error.errno), directory)) ) return 1 parser_chmod = ArgumentParser( name="chmod - change the file modes", description="The chmod utility shall change any or all of the file mode bits of the file named by each file operand in the way specified by the mode operandself.\n\nOnly a process whose effective user ID matches the user ID of the file, or a process with appropriate privileges, shall be permitted to change the file mode bits of a file.\n", ) parser_chmod.add_argument( "-R", dest="recursive", action="store_true", default=False, help="Recursively change file mode bits.", ) parser_chmod.add_argument( "mode", dest="mode", help="Represents the change to be made to the file mode bits of each file named by one of the file operands", ) parser_chmod.add_argument( "file", dest="file", nargs="+", help="A pathname of a file whose file mode bits shall be modified.", ) def glxsh_chmod(recursive=None, mode=None, file=None): exit_code = 0 def _chmod(path, mode): try: os.chmod(path, mode) return 0 except OSError as error: sys.stderr.write( ("chmod: %s: '%s'\n" % (error_code_to_text(error.errno), path)) ) return 1 except (Exception, BaseException) as error: sys.stderr.write(("chmod: %s: '%s'\n" % (error, path))) return 1 for path in file: if recursive: dir_scan = glob(("%s/**/*" % path)) else: dir_scan = glob(path) for f in dir_scan: if stat.S_ISDIR(os.stat(f)[0]): try: mode = symbolic_mode( mode, mode=os.stat(f).st_mode, umask=os.umask(0), isdir=1 ) except TypeError: pass else: try: mode = symbolic_mode( mode, mode=os.stat(f).st_mode, umask=os.umask(0), isdir=0 ) except TypeError: pass if not isinstance(mode, int): mode = int(mode, 8) exit_code += _chmod(f, mode) return 1 if exit_code else 0 parser_clear = ArgumentParser(name="clear", description="Clear screen") def glxsh_clear(): try: sys.stdout.write("\x1b[2J\x1b[H") return 0 except (Exception, ArithmeticError) as error: sys.stderr.write(("clear: %s\n" % error)) return 1 parser_cp = ArgumentParser( name="cp - copy files", description="The first synopsis form is denoted by two operands, neither of which are existing files of type directory. The cp utility shall copy the contents of source_file (or, if source_file is a file of type symbolic link, the contents of the file referenced by source_file) to the destination path named by target_file.", exit_status={ "0": "The utility executed successfully and all requested changes were made.", ">0": "An error occurred.", }, ) parser_cp.add_argument( "-f", dest="force", action="store_true", help="If a file descriptor for a destination file cannot be obtained, as described in step 3.a.ii., attempt to unlink the destination file and proceed.", ) parser_cp.add_argument( "-i", dest="interactive", action="store_true", help="Write a prompt to standard error before copying to any existing non-directory destination file.", ) parser_cp.add_argument( "source_file", nargs="+", help="A pathname of a file to be copied. If a source_file operand is '-', it shall refer to a file named -; implementations shall not treat it as meaning standard input. target_file", ) parser_cp.add_argument( "target_file", help="A pathname of an existing or nonexistent file, used for the output when a single file is copied. If a target_file operand is '-', it shall refer to a file named -; implementations shall not treat it as meaning standard output.", ) def glxsh_cp(source_file, target_file, interactive=False): try: with open(source_file, "r") as source_file_descriptor: pass except (Exception, ArithmeticError) as error: sys.stderr.write( ("cp: %s: '%s'\n" % (error_code_to_text(error.errno), source_file)) ) return 1 if interactive and exists(target_file): if ( input(("do you want to overwrite %s file ? (Y/n)" % target_file)) .upper() .startswith("N") ): return 0 try: with open(target_file, "w") as target_file_descriptor: pass except (Exception, ArithmeticError) as error: sys.stderr.write( ("cp: %s: '%s'\n" % (error_code_to_text(error.errno), target_file)) ) return 1 try: with open(source_file, "r") as source_file_descriptor: with open(target_file, "w") as target_file_descriptor: target_file_descriptor.write(source_file_descriptor.read()) return 0 except (Exception, ArithmeticError) as error: sys.stderr.write(("cp: %s:\n" % error_code_to_text(error.errno))) return 1 parser_date = ArgumentParser( name="date - write the date and time", description="The date utility shall write the date and time to standard output or attempt to set the system date and time. By default, the current date and time shall be written. If an operand beginning with '+' is specified, the output format of date shall be controlled by the conversion specifications and other text in the operand.", ) parser_date.add_argument( "-u", dest="u", action="store_true", help="Perform operations as if the TZ environment variable was set to the string 'UTC0', or its equivalent historical value of 'GMT0'. Otherwise, date shall use the timezone indicated by the TZ environment variable or the system default if that variable is unset or null.", ) parser_date.add_argument( "format", nargs="*", help="When the format is specified, each conversion specifier shall be replaced in the standard output by its corresponding value. All other characters shall be copied to the output without change. The output shall always be terminated with a <newline>.", ) def glxsh_date(u=None, custom_format=None, shell=None): exit_code = 0 from time import localtime, tzname tm = localtime() dow = ("Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun") dow_full = ( "Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday", "Sunday", ) mon = ( "???", "Jan", "Feb", "Mar", "Apr", "May", "Jun", "Jul", "Aug", "Sep", "Oct", "Nov", "Dec", ) mon_full = ( "???", "January", "February", "March", "April", "May", "June", "July", "August", "September", "October", "November", "December", ) if custom_format == "": custom_format = "+%a %b %e %H:%M:%S %Z %Y" if custom_format.startswith('"') and custom_format.endswith('"'): custom_format = custom_format[1:][:(-1)] if custom_format.startswith("'") and custom_format.endswith("'"): custom_format = custom_format[1:][:(-1)] if custom_format and str(custom_format).startswith("+"): custom_format = custom_format.replace("%a", dow[tm[6]]) custom_format = custom_format.replace("%A", dow_full[tm[6]]) custom_format = custom_format.replace("%b", mon[tm[1]]) custom_format = custom_format.replace("%B", mon_full[tm[1]]) custom_format = custom_format.replace("%C", ("%02d" % int((tm[0] / 100)))) custom_format = custom_format.replace("%d", ("%02d" % tm[2])) custom_format = custom_format.replace( "%D", ("%02d/%02d/%s" % (tm[1], tm[2], str(tm[0])[(-2):])) ) custom_format = custom_format.replace("%e", ("%d" % tm[2])) custom_format = custom_format.replace("%h", mon[tm[1]]) custom_format = custom_format.replace("%H", ("%02d" % tm[3])) if tm[3] in range(13, 23, 1): hour = tm[3] - 12 elif tm[3] == 0: hour = 12 else: hour = tm[3] custom_format = custom_format.replace("%i", ("%02d" % hour)) custom_format = custom_format.replace("%j", ("%003d" % tm[7])) custom_format = custom_format.replace("%m", ("%02d" % tm[1])) custom_format = custom_format.replace("%M", ("%02d" % tm[4])) custom_format = custom_format.replace("%n", "\n") if tm[3] in range(1, 11, 1): am_pm_text = "AM" elif tm[3] in range(13, 23, 1): am_pm_text = "PM" elif tm[3] == 0: am_pm_text = "AM" else: am_pm_text = "" custom_format = custom_format.replace("%p", am_pm_text) custom_format = custom_format.replace( "%r", ("%02d:%02d:%02d %s" % (hour, tm[4], tm[5], am_pm_text)) ) custom_format = custom_format.replace("%S", ("%02d" % tm[5])) custom_format = custom_format.replace("%t", "\t") custom_format = custom_format.replace( "%T", ("%02d:%02d:%02d" % (tm[3], tm[4], tm[5])) ) custom_format = custom_format.replace("%u", ("%1d" % int((tm[6] + 1)))) custom_format = custom_format.replace("%U", ("%02d" % int(((tm[7] + 6) / 7)))) custom_format = custom_format.replace("%V", ("%02d" % int(((tm[7] + 6) / 7)))) custom_format = custom_format.replace("%w", ("%1d" % int((tm[6] + 1)))) custom_format = custom_format.replace("%W", ("%02d" % int(((tm[6] / 7) or 7)))) custom_format = custom_format.replace("%y", ("%s" % str(tm[0])[(-2):])) custom_format = custom_format.replace("%Y", ("%d" % tm[0])) if u: custom_format = custom_format.replace("%Z", "UTC") elif shell.environ.get("TZ"): custom_format = custom_format.replace( "%Z", ("%s" % shell.environ.get("TZ")) ) elif tzname: custom_format = custom_format.replace("%Z", tzname[0]) else: custom_format = custom_format.replace("%Z ", "") custom_format = custom_format.replace("%%", "%") custom_format = custom_format[1:] sys.stdout.write(("%s\n" % custom_format)) else: exit_code += 1 return 1 if exit_code else 0 parser_df = ArgumentParser( name="df - report free disk space", description="The df utility shall write the amount of available space and file slots for file systems on which the invoking user has appropriate read access. File systems shall be specified by the file operands; when none are specified, information shall be written for all file systems.The format of the default output from df is unspecified, but all space figures are reported in 512-byte units, unless the -k option is specified. This output shall contain at least the file system names, amount of available space on each of these file systems, and, if no options other than -t are specified, the number of free file slots, or inodes, available; when -t is specified, the output shall contain the total allocated space as well.", ) parser_df.add_argument( "-h", dest="human_readable", action="store_true", default=False, help="print sizes in powers of 1024 (e.g., 1023M)", ) parser_df.add_argument( "-k", dest="kilo", action="store_const", const=1024, help="use 1024-byte units, instead of the default 512-byte units, when writing space figures.", ) parser_df.add_argument( "-P", dest="portability", action="store_true", default=True, help="produce a POSIX output", ) parser_df.add_argument( "-t", dest="total", action="store_true", help="include total allocated-space figures in the output. ", ) parser_df.add_argument( "file", nargs="?", const=0, help="A pathname of a file within the hierarchy of the desired file system. If a file other than a FIFO, a regular file, a directory, or a special file representing the device containing the file system (for example, /dev/dsk/0s1) is specified, the results are unspecified. If the file operand names a file other than a special file containing a file system, df shall write the amount of free space in the file system containing the specified file operand. Otherwise, df shall write the amount of free space in that file system. ", ) def glxsh_df(file=None, block_size=None, total=None, human_readable=None): if block_size is None: block_size = 512 devices_list = [] if file: if not exists(file): return "df: %s: No such file or directory" % file if (not os.access(file, os.R_OK)) or ( not os.access(df_find_mount_point(file), os.R_OK) ): return "df: %s : Permission denied\n" % file for line in df_get_devices(): if df_find_mount_point(file) == line[1]: devices_list.append( df_get_device_information( file_system_name=line[0], file_system_root=line[1], block_size=block_size, ) ) else: for line in df_get_devices(): devices_list.append( df_get_device_information( file_system_name=line[0], file_system_root=line[1], block_size=block_size, ) ) if devices_list: if total: (total_space_free, total_space_used, total_total_space) = df_get_totals( devices_list ) devices_list.append( [ "total", total_total_space, total_space_used, total_space_free, ( "%d%%" % int( math.ceil( ( 100 * ( float((total_total_space - total_space_free)) / total_total_space ) ) ) ) ), "-", ] ) (block_size_text, tabular_data) = df_get_info_to_print( block_size, devices_list, human_readable ) df_print_final(block_size_text, tabular_data) return 0 else: return 1 def df_get_info_to_print(block_size, devices_list, human_readable): block_size_text = f"{block_size}-blocks" if human_readable: tabular_data = [] block_size_text = "Size" for line in devices_list: if ( (str(line[1]) != "-") and (str(line[2]) != "-") and (str(line[3]) != "-") ): tabular_data.append( [ line[0], size_of(size=(int(line[1]) * block_size), suffix=""), size_of(size=(int(line[2]) * block_size), suffix=""), size_of(size=(int(line[3]) * block_size), suffix=""), line[4], line[5], ] ) else: tabular_data.append(line) else: tabular_data = devices_list return (block_size_text, tabular_data) def df_get_totals(devices_list): total_total_space = 0 total_space_used = 0 total_space_free = 0 for device in devices_list: try: total_total_space += int(device[1]) except ValueError: pass try: total_space_used += int(device[2]) except ValueError: pass try: total_space_free += int(device[3]) except ValueError: pass return (total_space_free, total_space_used, total_total_space) def df_print_final(block_size_text, tabular_data): sys.stdout.write( ( "%s\n" % tabulate( tabular_data=tabular_data, headers=[ "Filesystem", block_size_text, "Used", "Available", "Capacity", "Mounted on", ], tablefmt="plain", colalign=("left", "right", "right", "right", "right", "left"), ) ) ) def df_find_mount_point(path): if not islink(path): path = abspath(path) elif islink(path) and lexists(os.readlink(path)): path = realpath(path) if hasattr(os, "path") and hasattr(os.path, "ismount"): while not os.path.ismount(path): path = dirname(path) if islink(path) and lexists(os.readlink(path)): path = realpath(path) return path def df_get_device_information( file_system_name=None, file_system_root=None, block_size=None ): try: statvfs = os.statvfs(file_system_root) if type(statvfs) == tuple: space_free = (statvfs[4] * statvfs[1]) / block_size total_space = (statvfs[2] * statvfs[1]) / block_size else: space_free = (statvfs.f_bavail * statvfs.f_frsize) / block_size total_space = (statvfs.f_blocks * statvfs.f_frsize) / block_size space_used = total_space - space_free if total_space == 0: percentage_used = "-" else: percentage_used = "%d%%" % int( math.ceil((100 * (float((total_space - space_free)) / total_space))) ) return [ ("%s" % file_system_name), ("%d" % total_space), ("%d" % space_used), ("%d" % space_free), ("%s" % percentage_used), ("%s" % file_system_root), ] except PermissionError: return [ ("%s" % file_system_name), ("%s" % "-"), ("%s" % "-"), ("%s" % "-"), ("%s" % "-"), ("%s" % file_system_root), ] def df_get_devices(file=None): if (file is None) and exists("/etc/mtab"): file = "/etc/mtab" if (file is None) and exists("/proc/mounts"): file = "/proc/mounts" if file is None: raise SystemError("Impossible to locate /etc/mtab or /proc/mounts file") file_entries = [] for line in df_get_file_content(file=file).splitlines(): if len(line.split()) < 4: continue file_entries.append(line.split()) return file_entries def df_get_file_content(file=None): if not exists(file): raise FileExistsError(f"{file} do not exist") if not os.access(file, os.R_OK): raise PermissionError(f"{file} can't be read") with open(file) as datafile: return datafile.read().strip() parser_dirname = ArgumentParser( name="dirname - return the directory portion of a pathname", description="The string operand shall be treated as a pathname, as defined in XBD Pathname. The string string shall be converted to the name of the directory containing the filename corresponding to the last pathname component in string.", ) parser_dirname.add_argument("string", nargs="?", const=0, help="A string") def glxsh_dirname(string=None): try: sys.stdout.write(("%s\n" % dirname(string))) return 0 except (Exception, ArithmeticError) as error: sys.stderr.write(("dirname: %s\n" % error)) return 1 parser_du = ArgumentParser( name="du - estimate file space usage", description="The du utility shall write to standard output the size of the file space allocated to, and the size of the file space allocated to each subdirectory of, the file hierarchy rooted in each of the specified files.", synopsis=["du [-a|-s] [-kx] [-H|-L] [file...]"], exit_status={"0": "Successful completion.", ">0": "An error occurred."}, ) parser_du.add_argument( "-a", action="store_true", dest="a", help="In addition to the default output, report the size of each file not of type directory in the file hierarchy rooted in the specified file. The -a option shall not affect whether non-directories given as file operands are listed.", ) parser_du.add_argument( "-H", action="store_true", dest="H", help="If a symbolic link is specified on the command line, du shall count the size of the file or file hierarchy referenced by the link.", ) parser_du.add_argument( "-k", action="store_true", dest="k", default=False, help="Write the files sizes in units of 1024 bytes, rather than the default 512-byte units.", ) parser_du.add_argument( "-L", action="store_true", dest="L", help="If a symbolic link is specified on the command line or encountered during the traversal of a file hierarchy, du shall count the size of the file or file hierarchy referenced by the link.", ) parser_du.add_argument( "-s", action="store_true", dest="s", help="Instead of the default output, report only the total sum for each of the specified files.", ) parser_du.add_argument( "-x", action="store_true", dest="x", help="When evaluating file sizes, evaluate only those files that have the same device as the file specified by the file operand.", ) parser_du.add_argument( "file", dest="files", nargs="*", help="The pathname of a file whose size is to be written. If no file is specified, the current directory shall be used.", ) def glxsh_du(a, H, k, L, s, x, files): exit_code = 0 if k: byte_unit = 1024 else: byte_unit = 512 if not files: files = [getcwd()] try: for path in files: if os.access(path, os.R_OK): if islink(path): sys.stdout.write(("%d %s\n" % (os.lstat(path).st_size, path))) if isfile(path): sys.stdout.write( ( "%d %s\n" % (((os.lstat(path).st_blocks * 512) / byte_unit), path) ) ) have = [] for directory_path, directory_names, filenames in os.walk(path): for f in filenames: fp = joinpath(directory_path, f) if islink(fp): sys.stdout.write(("%d %s\n" % (os.lstat(fp).st_size, fp))) continue st = os.lstat(fp) if st.st_ino in have: continue have.append(st.st_ino) sys.stdout.write( ("%d %s\n" % (((st.st_blocks * 512) / byte_unit), fp)) ) for d in directory_names: dp = joinpath(directory_path, d) if islink(dp): sys.stdout.write(("%d %s\n" % (os.lstat(dp).st_size, dp))) else: sys.stderr.write(("du: %s: '%s'\n" % (error_code_to_text(13), path))) except OSError as error: sys.stderr.write(("du: %s\n" % error_code_to_text(error.errno))) exit_code += 1 except KeyboardInterrupt: sys.stdout.write("\n") return exit_code parser_echo = ArgumentParser( name="echo - write arguments to standard output", description="The echo utility writes its arguments to standard output, followed by a <newline>. If there are no arguments, only the <newline> is written.", ) parser_echo.add_argument( "-n", dest="newline", action="store_true", help="Suppress the <newline> that would otherwise follow the final argument in the output.", ) parser_echo.add_argument( "string", nargs="*", type=str, help="A string to be written to standard outputself.\n", ) def glxsh_echo(**kwargs): shell = kwargs.get("shell", None) newline = kwargs.get("newline", False) string = kwargs.get("string", "") try: value_to_return = str(string) if string.startswith('"') and string.endswith('"'): value_to_return = string[1:][:(-1)] for value in value_to_return.split(" "): if value.startswith("$"): value_to_return = value_to_return.replace( value, shell.environ.get(value.replace("$", ""), "") ) if string.startswith("$") and (" " not in string): value_to_return = string.replace( string, shell.environ.get(string.replace("$", ""), "") ) sys.stdout.write(value_to_return) if not newline: sys.stdout.write("\n") return 0 except (Exception, BaseException) as error: sys.stderr.write(error) return 1 parser_env = ArgumentParser( name="env - set the environment for command invocation", description="The env utility shall obtain the current environment, modify it according to its arguments, then invoke the utility named by the utility operand with the modified environment.", ) parser_env.add_argument( "-i", dest="invoke", action="store_true", help="Invoke utility with exactly the environment specified by the arguments; the inherited environment shall be ignored completely.", ) parser_env.add_argument( "name", nargs="?", dest="name", help="Arguments of the form name= value shall modify the execution environment, and shall be placed into the inherited environment before the utility is invoked.", ) parser_env.add_argument( "utility", nargs="?", dest="utility", help="The name of the utility to be invoked. If the utility operand names any of the special built-in utilities in Special Built-In Utilities, the results are undefined.", ) parser_env.add_argument( "argument", nargs="?", dest="argument", help="A string to pass as an argument for the invoked utility.", ) def glxsh_env(name, utility, argument, shell): if shell and hasattr(shell, "environ"): if name: try: func = getattr(shell, ("do_" + utility)) return func(argument) except AttributeError: try: pr = subprocess.run(utility, argument, env=shell.environ) return pr.returncode except (Exception, BaseException): return shell.default(utility) else: for name, value in shell.environ.items(): sys.stdout.write(("%s=%s\n" % (name, value))) return 0 parser_exit = ArgumentParser( name="exit", description="exit shell with a given exit code" ) parser_exit.add_argument("code", nargs="*", type="int", help="exit code") def glxsh_exit(*args, **kwargs): shell = kwargs.get("shell", None) code = kwargs.get("code", None) if (code is None) or (not code): code = 0 else: code = code[0] if shell: shell.exit_code = code sys.exit(code) else: return code parser_false = ArgumentParser( name="false - return false value", synopsis=["false"], description="The false utility shall return with a non-zero exit code.", exit_status={"1": ""}, ) def glxsh_false(): return 1 parser_head = ArgumentParser( name="head - copy the first part of", description="The head utility shall copy its input files to the standard output, ending the output for each file at a designated point. ", ) parser_head.add_argument( "file", nargs="*", help="A pathname of an input file. If no file operands are specified, the standard input shall be used.", ) parser_head.add_argument( "-n", dest="number", type=int, default=10, help="The first number lines of each input file shall be copied to standard output. ", ) def glxsh_head(files, number=10): if isinstance(number, int) and (number < 1): return 0 if (files is None) or (not isinstance(files, list)) or (files == []): files = ["-"] def read_stdin(): stdin_count = 1 while stdin_count <= number: data = sys.stdin.readline() if not data: break (yield data) stdin_count += 1 try: count = 0 for file in files: filename = file if file == "-": filename = "standard input" if (count == 0) and (len(files) > 1): sys.stdout.write(("==> %s <==\n" % filename)) elif count > 0: sys.stdout.write(("\n==> %s <==\n" % filename)) if file == "-": try: for piece in read_stdin(): sys.stdout.write(piece) except KeyboardInterrupt: sys.stdout.write("\n") return 130 else: with open(file) as f: for _ in range(number): line = f.readline() if not line: break sys.stdout.write(line) count += 1 return 0 except (Exception, ArithmeticError) as error: sys.stderr.write(("head: %s\n" % error)) return 1 parser_ls = ArgumentParser( name="ls - list directory contents", description="List information about the files (the current directory by default).\nSort entries alphabetically if none of -cftuvSUX is specified.", synopsis=[ "ls [-ikqrs] [-glno] [-A|-a] [-C|-m|-x|-1] [-F|-p] [-H|-L] [-R|-d] [-S|-f|-t] [-c|-u] [file...]" ], ) parser_ls.add_argument( "-A", action="store_true", dest="A", help="Do not list implied . and .." ) parser_ls.add_argument( "-C", action="store_true", dest="C", help="List entries by columns" ) parser_ls.add_argument( "-F", dest="F", action="store_true", help="Append indicator (one of */=>@|) to entries", ) parser_ls.add_argument( "-H", dest="H", action="store_true", help="Follow symbolic links listed on the command line", ) parser_ls.add_argument( "-L", dest="L", action="store_true", help="when showing file information for a symbolic link, show information for the file the link references rather than for the link itself", ) parser_ls.add_argument( "-R", dest="recurse", action="store_true", help="List subdirectories recursively" ) parser_ls.add_argument( "-S", dest="S", action="store_true", help="sort by file size, largest first" ) parser_ls.add_argument( "-a", dest="a", action="store_true", help="do not ignore entries starting with ." ) parser_ls.add_argument( "-c", dest="c", action="store_true", help="with -lt: sort by, and show, ctime (time of last modification of file status information); otherwise: sort by ctime, newest first list entries by columns", ) parser_ls.add_argument( "-d", dest="d", action="store_true", help="list directories themselves, not their contents", ) parser_ls.add_argument( "-f", dest="f", action="store_true", help="do not sort, enable -aU, disable -ls --color", ) parser_ls.add_argument( "-g", dest="g", action="store_true", help="group directories before files;" ) parser_ls.add_argument( "-i", dest="i", action="store_true", help="print the index number of each file" ) parser_ls.add_argument( "-k", dest="k", action="store_true", help="default to 1024-byte blocks for disk usage; used only with -s and per directory totals", ) parser_ls.add_argument( "-l", dest="l", action="store_true", help="use a long listing format" ) parser_ls.add_argument( "-m", dest="m", action="store_true", help="fill width with a comma separated list of entries", ) parser_ls.add_argument( "-n", dest="n", action="store_true", help="like -l, but list numeric user and group IDs", ) parser_ls.add_argument( "-o", dest="o", action="store_true", help="like -l, but do not list group information", ) parser_ls.add_argument( "-p", dest="p", action="store_true", help="append / indicator to directories" ) parser_ls.add_argument( "-q", dest="q", action="store_true", help="enclose entry names in double quotes" ) parser_ls.add_argument( "-r", dest="r", action="store_true", help="reverse order while sorting" ) parser_ls.add_argument( "-s", dest="s", action="store_true", help="print the allocated size of each file, in blocks", ) parser_ls.add_argument( "-t", dest="t", action="store_true", help="sort by time, newest first" ) parser_ls.add_argument( "-u", dest="u", action="store_true", help="with -lt: sort by, and show, access time; with -l: show access time and sort by name; otherwise: sort by access time, newest first", ) parser_ls.add_argument( "-x", dest="x", action="store_true", help="list entries by lines instead of by columns", ) parser_ls.add_argument( "-1", dest="one", action="store_true", help="list one file per line. Avoid '\n' with -q or -b", ) parser_ls.add_argument( "file", nargs="*", help="A pathname of a file to be written. If the file specified is not found, a diagnostic message shall be output on standard error.", ) def glxsh_ls( A=None, C=None, F=None, H=None, L=None, recurse=None, S=None, a=None, c=None, d=None, f=None, g=None, i=None, k=None, l=None, m=None, n=None, o=None, p=None, q=None, r=None, s=None, t=None, u=None, x=None, one=None, file=None, **kwargs ): def _print_long_format(list_to_display): tabular_data = [] for pathname in list_to_display: lstat = os.stat(pathname) tabular_data.append( [ ("%s" % filemode(lstat.st_mode)), ("%u" % lstat.st_nlink), ("%s" % getpwuid(lstat.st_uid)[0]), ("%s" % getgrgid(lstat.st_gid)[0]), ("%u" % lstat.st_mode), ( "%s" % time.strftime("%b %d %H:%M", time.localtime(lstat.st_mtime)) ), ("%s" % _add_slash_if_is_dir(pathname)), ] ) sys.stdout.write( ( "%s\n" % tabulate( tabular_data=tabular_data, headers=[], tablefmt="plain", colalign=( "left", "right", "right", "right", "left", "right", "left", ), ) ) ) def _add_slash_if_is_dir(item): if p and isdir(item) and (not item.endswith("/")): return "%s/" % item return item exit_code = 0 stdout = kwargs.get("stdout", sys.stdout) stdin = kwargs.get("stdin", sys.stdin) stderr = kwargs.get("stderr", sys.stderr) shell = kwargs.get("shell") or None _files_to_look = [] if not file: if recurse: _files_to_look = ["**/*"] else: _files_to_look = ["*"] if file: for _file in file: if isdir(_file): if recurse: _files_to_look.append(("%s/**/*" % _file)) else: _files_to_look.append(("%s/*" % _file)) else: _files_to_look.append(_file) for pathname in _files_to_look: try: list_to_display = [] dir_scan = glob(pathname) if a: dir_scan.insert(0, "..") dir_scan.insert(0, ".") for f in dir_scan: if F: if isdir(("%s/%s" % (pathname, f))): f = "%s/" % f elif os.access(("%s/%s" % (pathname, f)), os.X_OK): f = "%s*" % f elif islink(("%s/%s" % (pathname, f))): f = "%s@" % f if A or a: if A: if (f != ".") and (f != ".."): list_to_display.append(f) else: list_to_display.append(f) elif not f.startswith("."): list_to_display.append(f) if C and (not l): columnize(list_to_display) elif l: _print_long_format(list_to_display) else: for f in list_to_display: f = _add_slash_if_is_dir(f) sys.stdout.write(("%s\n" % f)) except (Exception, ArithmeticError) as error: sys.stderr.write(("ls: %s: '%s'\n" % (error, pathname))) exit_code += 1 return exit_code parser_mkdir = ArgumentParser( name="mkdir - make directories", description="The mkdir utility shall create the directories specified by the operands", ) parser_mkdir.add_argument( "-p", dest="parents", action="store_true", help="Create any missing intermediate pathname components.", ) parser_mkdir.add_argument( "-m", dest="mode", nargs="?", type=str, default="755", help="Set the file permission bits of the newly-created directory to the specified mode value.", ) parser_mkdir.add_argument( "dir", nargs="+", help="A pathname of a directory to be created." ) def glxsh_mkdir(directories=None, parents=False, mode="755"): exit_code = 0 def make_directory(path, path_mode): try: from os import mkdir try: mkdir(path=path, mode=path_mode) except TypeError: mkdir(path) except ImportError as error: sys.stderr.write(("mkdir: '%s'\n" % error)) except OSError as error: sys.stderr.write( ("mkdir: '%s': %s\n" % (path, error_code_to_text(error.errno))) ) return 1 return 0 for directory in directories: if parents: if directory.startswith(sep): directory_to_create = sep else: directory_to_create = "" for sub_directory in directory.split(sep): if directory_to_create == sep: directory_to_create = "%s%s" % (directory_to_create, sub_directory) elif (directory_to_create != "") and (sub_directory != ""): directory_to_create = joinpath(directory_to_create, sub_directory) elif sub_directory == "": continue elif sub_directory == ".": directory_to_create = "." continue elif sub_directory == "..": directory_to_create = ".." continue else: directory_to_create = sub_directory exit_code += make_directory(directory_to_create, mode) else: exit_code += make_directory(directory, mode) return 1 if exit_code else 0 parser_mv = ArgumentParser( name="mv - move files", description="The mv utility shall move the file named by the source_file operand to the destination specified by the target_file. This first synopsis form is assumed when the final operand does not name an existing directory and is not a symbolic link referring to an existing directory. In this case, if source_file names a non-directory file and target_file ends with a trailing <slash> character, mv shall treat this as an error and no source_file operands will be processed.", ) parser_mv.add_argument( "source_file", nargs="?", help="A pathname of a file or directory to be moved." ) parser_mv.add_argument( "target_file", nargs="?", help="A new pathname for the file or directory being moved.", ) parser_mv.add_argument( "target_dir", nargs="?", help="A pathname of an existing directory into which to move the input files.", ) parser_mv.add_argument( "-f", dest="force", action="store_true", default=False, help="Do not prompt for confirmation if the destination path exists. Any previous occurrence of the -i option is ignored.", ) parser_mv.add_argument( "-i", dest="interactive", action="store_true", default=False, help="Prompt for confirmation if the destination path exists. Any previous occurrence of the -f option is ignored.", ) def glxsh_mv( source_file=None, target_file=None, target_dir=None, force=None, interactive=None ): if interactive and exists(target_file): sys.stdout.write(("do you want to overwrite %s file ? (Y/n) " % target_file)) sys.stdout.flush() if sys.stdin.readline().upper().startswith("N"): return 0 try: if os.access(source_file, os.F_OK): os.rename(source_file, target_file) return 0 else: return 1 except Exception as error: sys.stderr.write(("mv: %s\n" % error)) return 1 parser_pwd = ArgumentParser( name="pwd - return working directory name", description="The pwd utility shall write to standard output an absolute pathname of the current working directory, which does not contain the filenames dot or dot-dot.", ) parser_pwd.add_argument( "-L", dest="logical", action="store_true", default=False, help="Print the value of $PWD if it names the current working directory", ) parser_pwd.add_argument( "-P", dest="physical", action="store_true", default=False, help="Print the physical directory, without any symbolic links", ) def glxsh_pwd(logical=None, physical=None): if (not logical) and (not physical): logical = True try: if logical: sys.stdout.write(("%s\n" % normpath(os.getcwd()))) else: sys.stdout.write(("%s\n" % realpath(os.getcwd()))) return 0 except (Exception, ArithmeticError) as error: sys.stderr.write(("pwd: %s\n" % error)) return 1 parser_rm = ArgumentParser( name="rm - remove directory entries", description="The rm utility shall remove the directory entry specified by each file argument.\n\nIf either of the files dot or dot-dot are specified as the basename portion of an operand (that is, the final pathname component) or if an operand resolves to the root directory, rm shall write a diagnostic message to standard error and do nothing more with such operands.", ) parser_rm.add_argument( "-i", dest="interactive", action="store_true", help="Prompt for confirmation as described previously. Any previous occurrences of the -f option shall be ignored.", ) parser_rm.add_argument( "-R", "-r", dest="recursive", action="store_true", help="Remove file hierarchies. See the DESCRIPTION.", ) parser_rm.add_argument( "-f", dest="force", action="store_true", help="Do not prompt for confirmation. Do not write diagnostic messages or modify the exit status in the case of no file operands, or in the case of operands that do not exist. Any previous occurrences of the -i option shall be ignored.", ) parser_rm.add_argument( "file", nargs="+", help="A pathname of a directory entry to be removed." ) def glxsh_rm(file=None, recursive=None, interactive=None, force=None): exit_code = 0 if force: interactive = False def _rm(path): try: os.remove(path) return 0 except OSError as error: sys.stderr.write( ("rm: %s: '%s'\n" % (error_code_to_text(error.errno), path)) ) return 1 except (Exception, BaseException) as error: sys.stderr.write(("chmod: %s: '%s'\n" % (error, path))) return 1 for path in file: if recursive: for dirpath, dirnames, filenames in os.walk(path): for dname in dirnames: if interactive: if ( input( ( "do you want to remove %s directory ? (Y/n)" % joinpath(dirpath, dname) ) ) .upper() .startswith("Y") ): exit_code += _rm(joinpath(dirpath, dname)) else: exit_code += _rm(joinpath(dirpath, dname)) for fname in filenames: if interactive: if ( input( ( "do you want to remove %s file ? (Y/n)" % joinpath(dirpath, fname) ) ) .upper() .startswith("Y") ): exit_code += _rm(joinpath(dirpath, fname)) else: exit_code += _rm(joinpath(dirpath, fname)) elif interactive: if ( input(("do you want to remove %s file ? (Y/n)" % path)) .upper() .startswith("Y") ): exit_code += _rm(path) else: exit_code += _rm(path) return 1 if exit_code else 0 parser_rmdir = ArgumentParser( name="rmdir - remove directories", description="The rmdir utility shall remove the directory entry specified by each dir operand.\n\nDirectories shall be processed in the order specified. If a directory and a subdirectory of that directory are specified in a single invocation of the rmdir utility, the application shall specify the subdirectory before the parent directory so that the parent directory will be empty when the rmdir utility tries to remove it.", ) parser_rmdir.add_argument( "dir", dest="dir", nargs="*", help="A pathname of an empty directory to be removed." ) parser_rmdir.add_argument( "-p", dest="parents", action="store_true", default=False, help="Remove all directories in a pathname.", ) def glxsh_rmdir(directories=None, parents=False): exit_code = 0 def rmdir(d): try: os.rmdir(path=d) return 0 except (Exception, ArithmeticError) as error: sys.stderr.write( ("rmdir: %s: '%s'\n" % (error_code_to_text(error.errno), d)) ) return 1 for directory in directories: if exists(directory) and parents: for path, _, files in os.walk(directory, False): for f in files: os.unlink(((path + "/") + f)) exit_code += rmdir(path) else: exit_code += rmdir(directory) return 1 if exit_code else 0 parser_sleep = ArgumentParser( name="sleep - suspend execution for an interval", description="The sleep utility shall suspend execution for at least the integral number of seconds specified by the time operand. ", ) parser_sleep.add_argument( "time", default=0, help="A non-negative decimal integer or float specifying the number of seconds for which to suspend execution.", ) def glxsh_sleep(sec): exit_code = 0 def string_to_numeric_if_possible(x): try: val = float(x) return int(val) if (val == int(val)) else val except (TypeError, ValueError): return x try: time.sleep(string_to_numeric_if_possible(sec)) except (Exception, ArithmeticError) as error: sys.stderr.write(("sleep: %s\n" % error)) exit_code += 1 except KeyboardInterrupt: sys.stdout.write("\n") exit_code += 1 return exit_code parser_tail = ArgumentParser( name="tail - copy the last part of a file", synopsis=["tail [-f] [-c number|-n number] [file]"], description="The tail utility shall copy its input file to the standard output beginning at a designated place.\n\nCopying shall begin at the point in the file indicated by the -c number or -n number options. The option-argument number shall be counted in units of lines or bytes, according to the options -n and -c. Both line and byte counts start from 1.", ) parser_tail.add_argument( "-f", dest="f", action="store_true", default=False, help="If the input file is a regular file or if the file operand specifies a FIFO, do not terminate after the last line of the input file has been copied, but read and copy further bytes from the input file when they become available. If no file operand is specified and standard input is a pipe or FIFO, the -f option shall be ignored. If the input file is not a FIFO, pipe, or regular file, it is unspecified whether or not the -f option shall be ignored.", ) parser_tail.add_argument( "-c", dest="c", type=str, help="The origin for counting shall be 1; that is, -c +1 represents the first byte of the file, -c -1 the last.", ) parser_tail.add_argument( "-n", dest="n", type=str, help="This option shall be equivalent to -c number, except the starting location in the file shall be measured in lines instead of bytes. The origin for counting shall be 1; that is, -n +1 represents the first line of the file, -n -1 the last.", ) parser_tail.add_argument( "file", nargs="*", type=FileType("r"), help="A pathname of an input file. If no file operand is specified, the standard input shall be used.", ) def glxsh_tail(c, f, n, files): def follow(file, sleep_sec=0.1) -> Iterator[str]: line = "" while True: tmp = file.readline() if tmp is not None: line += tmp if line.endswith("\n"): (yield line) line = "" elif sleep_sec: sleep(sleep_sec) try: if (files is None) or (files == []): files = ["-"] if f: for file in files: if file == "-": file = sys.stdin with open(file, "r") as fd: loglines = follow(fd) for line in loglines: sys.stdout.write(("%s" % line)) return 0 except OSError as error: sys.stderr.write(("tail: %s\n" % error_code_to_text(error.errno))) return 1 except KeyboardInterrupt: sys.stdout.write("\n") return 0 parser_tee = ArgumentParser( name="tee - duplicate standard input", synopsis=["tee [-ai] [file...]"], description="The tee utility shall copy standard input to standard output, making a copy in zero or more files. The tee utility shall not buffer output.\n\nIf the -a option is not specified, output files shall be written.", exit_status={ "0": "The standard input was successfully copied to all output files.", ">0": "An error occurred.", }, ) parser_tee.add_argument( "-a", dest="a", action="store_true", default=False, help="Append the output to the files.", ) parser_tee.add_argument( "-i", dest="i", action="store_true", default=False, help="Ignore the SIGINT signal." ) parser_tee.add_argument( "file", nargs="*", type=FileType("r"), help="A pathname of an output file. If a file operand is '-', it refer to a file named '-'", ) def glxsh_tee(a=None, i=None, files=None): a: bool i: bool files: list if a is True: mode = "a" else: mode = "w" exit_code = 0 opened_files = [] if files: for file in files: try: opened_files.append(open(file, mode)) except (Exception, BaseException) as error: sys.stderr.write( ("tee: %s: '%s'\n" % (error_code_to_text(error.errno), file)) ) exit_code += 1 def read_stdin(file_object): while True: data = file_object.readline() if not data: break (yield data) def process_data(data): sys.stdout.write(piece) for opened_file in opened_files: opened_file.write(piece) opened_file.flush() def close_data(): try: for opened_file in opened_files: opened_file.flush() opened_file.close() return 0 except (Exception, BaseException) as err: sys.stderr.write( ("tee: %s: '%s'\n" % (error_code_to_text(err.errno), file)) ) return 1 try: for piece in read_stdin(sys.stdin): process_data(piece) except KeyboardInterrupt: exit_code += close_data() return 1 if exit_code else 0 finally: exit_code += close_data() return 1 if exit_code else 0 parser_time = ArgumentParser( name="time - time a simple command", description="The time utility shall invoke the utility named by the utility operand with arguments supplied as the argument operands and write a message to standard error that lists timing statistics for the utility. ", synopsis=["time [-p] utility [argument...]"], exit_status={ "1-125": "An error occurred in the time utility.", "126": "The utility specified by utility was found but could not be invoked.", "127": "The utility specified by utility could not be found.", }, ) parser_time.add_argument( "-p", dest="p", action="store_true", help="Write the timing output to standard error", ) parser_time.add_argument( "utility", nargs="?", help="The name of a utility that is to be invoked." ) parser_time.add_argument( "argument", nargs="?", help="Any string to be supplied as an argument when invoking the utility named by the utility operand.", ) def glxsh_time(p=None, utility=None, argument=None, line=None, shell=None): from os import times from time import time exit_code = 0 en_time = times() start = time() exit_code = shell.onecmd(line) en_time = times() sys.stderr.write( ( "real %f\nuser %f\nsys %f\n" % ((time() - start), en_time.user, en_time.system) ) ) sys.stderr.flush() return exit_code parser_touch = ArgumentParser( name="touch - change file access and modification times", description="The touch utility shall change the last data modification timestamps, the last data access timestamps, or both.\n\nThe time used can be specified by the -t time option-argument, the corresponding time fields of the file referenced by the -r ref_file option-argument, or the -d date_time option-argument, as specified in the following sections. If none of these are specified, touch shall use the current time.", synopsis=["touch [-acm] [-r ref_file|-t time|-d date_time] file..."], exit_status={ "0": "The utility executed successfully and all requested changes were made.", ">0": "An error occurred.", }, ) parser_touch.add_argument( "-a", action="store_true", default=False, help="Change the access time of file. Do not change the modification time unless -m is also specified.", ) parser_touch.add_argument( "-c", action="store_true", default=False, help="Do not create a specified file if it does not exist. Do not write any diagnostic messages concerning this condition.", ) parser_touch.add_argument( "-m", action="store_true", default=False, help="Change the modification time of file. Do not change the access time unless -a is also specified.", ) parser_touch.add_argument( "-d", nargs="?", help="Use the specified date_time instead of the current time" ) parser_touch.add_argument( "-r", nargs="?", help="Use the corresponding time of the file named by the pathname ref_file instead of the current time.", ) parser_touch.add_argument( "-t", nargs="?", help="Use the specified time instead of the current time. The option-argument shall be a decimal number of the form", ) parser_touch.add_argument( "file", nargs="+", help="A pathname of a file whose times shall be modified.", type=FileType("w"), ) def glxsh_touch(a=None, c=None, d=None, m=None, r=None, t=None, files=None): exit_code = 0 if d: if is_iso8601(d) is False: sys.stderr.write("-d argument is not a valid iso8601 format\n") return 1 if files is None: files = [] for file in files: if exists(file): try: if r: os.utime(file, ns=(os.stat(r).st_atime_ns, os.stat(r).st_mtime_ns)) elif d: os.utime(file, ns=(time.time_ns(), parse_date(d).timetuple())) elif (a is True) and (m is False): os.utime(file, ns=(time.time_ns(), os.stat(file).st_mtime_ns)) elif (a is False) and (m is True): os.utime(file, ns=(os.stat(file).st_atime_ns, time.time_ns())) else: os.utime(file, None) except OSError as error: sys.stderr.write( ("touch: %s: '%s'\n" % (error_code_to_text(error.errno), file)) ) exit_code += 1 elif c is False: try: with open(file, "w"): pass if r: os.utime(file, ns=(os.stat(r).st_atime_ns, os.stat(r).st_mtime_ns)) elif d: os.utime(file, ns=(time.time_ns(), parse_date(d).timetuple())) elif (a is True) and (m is False): os.utime(file, ns=(time.time_ns(), os.stat(file).st_mtime_ns)) elif (a is False) and (m is True): os.utime(file, ns=(os.stat(file).st_atime_ns, time.time_ns())) else: os.utime(file, None) except OSError as error: sys.stderr.write( ("touch: %s: '%s'\n" % (error_code_to_text(error.errno), file)) ) exit_code += 1 return 1 if exit_code else 0 parser_true = ArgumentParser( name="true - return true value", synopsis=["true"], description="The true utility shall return with exit code zero.", exit_status={"0": ""}, ) def glxsh_true(): return 0 parser_tty = ArgumentParser( name="tty - return user's terminal name", description="The tty utility shall write to the standard output the name of the terminal that is open as standard input.", ) def glxsh_tty(): try: sys.stdout.write(("%s\n" % os.ttyname(sys.stdin.fileno()))) return 0 except OSError: sys.stdout.write("not a tty\n") return 1 except (Exception, ArithmeticError) as error: sys.stderr.write(("tty: %s\n" % error_code_to_text(error.errno))) return 1 def symbolic_matcher(): return re.compile("([ugo]*|a)([+-=])([^\\s,]*)") order = "rwx" name_to_value = {"x": 1, "w": 2, "r": 4} value_to_name = {v: k for (k, v) in name_to_value.items()} class_to_loc = {"u": 6, "g": 3, "o": 0} loc_to_class = {v: k for (k, v) in class_to_loc.items()} function_map = { "+": (lambda orig, new: (orig | new)), "-": (lambda orig, new: (orig & (~new))), "=": (lambda orig, new: new), } def invert(perms): return 511 - perms def get_oct_digits(mode): if not (0 <= mode <= 511): raise ValueError("expected a value between 000 and 777") return {"u": ((mode & 448) >> 6), "g": ((mode & 56) >> 3), "o": (mode & 7)} def from_oct_digits(digits): o = 0 for c, m in digits.items(): o |= m << class_to_loc[c] return o def get_symbolic_rep_single(digit): o = "" for sym in "rwx": num = name_to_value[sym] if digit & num: o += sym digit -= num return o def get_symbolic_rep(number): digits = get_oct_digits(number) return "u=%s,g=%s,o=%s" % ( get_symbolic_rep_single(digits["u"]), get_symbolic_rep_single(digits["g"]), get_symbolic_rep_single(digits["o"]), ) def get_numeric_rep_single(rep): o = 0 for sym in set(rep): o += name_to_value[sym] return o parser_umask = ArgumentParser( name="umask - get or set the file mode creation mask", description="The umask utility shall set the file mode creation mask of the current shell execution environment (see Shell Execution Environment) to the value specified by the mask operand. This mask shall affect the initial value of the file permission bits of subsequently created files. If umask is called in a subshell or separate utility execution environment, such as one of the following:\n\n(umask 002)\nnohup umask ...\nfind . -exec umask ... \\;\n\nit shall not affect the file mode creation mask of the caller's environment.\n\nIf the mask operand is not specified, the umask utility shall write to standard output the value of the file mode creation mask of the invoking process.\n", ) parser_umask.add_argument( "mask", nargs="?", const=0, help="A string specifying the new file mode creation mask. The string is treated in the same way as the mode operand described in the EXTENDED DESCRIPTION section for chmod.\nFor a symbolic_mode value, the new value of the file mode creation mask shall be the logical complement of the file permission bits portion of the file mode specified by the symbolic_mode string.\n\nIn a symbolic_mode value, the permissions op characters '+' and '-' shall be interpreted relative to the current file mode creation mask; '+' shall cause the bits for the indicated permissions to be cleared in the mask; '-' shall cause the bits for the indicated permissions to be set in the mask.\n\nThe interpretation of mode values that specify file mode bits other than the file permission bits is unspecified.\n\nIn the octal integer form of mode, the specified bits are set in the file mode creation mask.\n\nThe file mode creation mask shall be set to the resulting numeric value.\n\nThe default output of a prior invocation of umask on the same system with no operand also shall be recognized as a mask operand.\n", ) parser_umask.add_argument( "-S", dest="symbolic", action="store_true", default=False, help="Produce symbolic output.", ) def single_symbolic_arg(arg, old=None): if old is None: old = os.umask(0) os.umask(old) match = symbolic_matcher.match(arg) if not match: raise ValueError(("could not parse argument %r" % arg)) (class_, op, mask) = match.groups() if class_ == "a": class_ = "ugo" invalid_chars = [i for i in mask if (i not in name_to_value)] if invalid_chars: raise ValueError(("invalid mask %r" % mask)) digits = get_oct_digits(old) new_num = get_numeric_rep_single(mask) for c in set(class_): digits[c] = function_map[op](digits[c], new_num) return from_oct_digits(digits) def valid_numeric_argument(x): try: return (len(x) == 3) and all(((0 <= int(i) <= 7) for i in x)) except: return False def octal_to_string(octal): result = "" value_letters = [(4, "r"), (2, "w"), (1, "x")] for permissions in [int(n) for n in str(octal)]: for value, letter in value_letters: if permissions >= value: result += letter permissions -= value else: result += "-" return result def glxsh_umask(mask=None, symbolic=True): cur = os.umask(0) os.umask(cur) if mask: print(cur) print(mask) sys.stdout.write( ("%s\n" % oct((511 - symbolic_mode(mask, umask=cur, isdir=0)))) ) return 0 else: if symbolic: sys.stdout.write(("%s\n" % get_symbolic_rep(invert(cur)))) sys.stdout.write(("%s\n" % octal_to_string(invert(cur)))) return 0 else: to_print = oct(cur)[2:] while len(to_print) < 3: to_print = "0%s" % to_print sys.stdout.write(("%s\n" % to_print)) return 0 parser_unalias = ArgumentParser( name="unalias - remove alias definitions", description="The unalias utility shall remove the definition for each alias name specified.", synopsis=["unalias alias-name..."], exit_status={ "0": "Successful completion.", ">0": "One of the alias-name operands specified did not represent a valid alias definition, or an error occurred.", }, ) parser_unalias.add_argument( "-a", dest="a", action="store_true", help="Remove all alias definitions from the current shell execution environment.", ) parser_unalias.add_argument( "alias-name", dest="alias_name", nargs="*", type=str, help="The name of an alias to be removed.", ) def glxsh_unalias(a=None, alias_name=None, shell=None): exit_code = 0 try: if a: shell.alias = {} else: for alias in alias_name: if alias in shell.alias: del shell.alias[alias] else: exit_code += 1 return exit_code except (Exception, BaseException) as error: sys.stderr.write(("alias: %s\n" % error)) return 1 parser_uname = ArgumentParser( name="uname - return system name", description="By default, the uname utility shall write the operating system name to standard output. When options are specified, symbols representing one or more system characteristics shall be written to the standard output.", ) parser_uname.add_argument( "-a", dest="all", action="store_true", help="Behave as though all of the options -mnrsv were specified.", ) parser_uname.add_argument( "-s", dest="sysname", action="store_true", help="Write the name of the implementation of the operating system.", ) parser_uname.add_argument( "-n", dest="nodename", action="store_true", help="Write the name of this node within an implementation-defined communications network.", ) parser_uname.add_argument( "-r", dest="release", action="store_true", help="Write the current release level of the operating system implementation.", ) parser_uname.add_argument( "-v", action="store_true", dest="version", help="Write the current version level of this release of the operating system implementation.", ) parser_uname.add_argument( "-m", dest="machine", action="store_true", help="Write the name of the hardware type on which the system is running.", ) def glxsh_uname( all=False, sysname=False, nodename=False, release=False, version=False, machine=False, ): try: info = os.uname() def gen_lines(): if all or nodename: (yield info.nodename) if all or release: (yield info.release) if all or version: (yield info.version) if all or machine: (yield info.machine) lines = list(gen_lines()) if all or sysname or (not lines): lines.insert(0, info.sysname) sys.stdout.write(("%s\n" % " ".join(lines))) return 0 except (Exception, ArithmeticError) as error: sys.stderr.write(("uname: %s\n" % error_code_to_text(error.errno))) return 1 try: import subprocess except ImportError: subprocess = None class GLXUsh(Cmd, GLXEnviron, GLXAlias): if hasattr(sys.implementation, "mpy"): loader_mpy = "MPY %s" % sys.implementation.mpy else: loader_mpy = "" if hasattr(gc, "mem_free") and hasattr(gc, "mem_alloc"): gc.collect() memory_total = ( "%s MEMORY SYSTEM\n" % str(size_of((gc.mem_free() + gc.mem_alloc()))).upper() ) memory_free = "%s FREE\n" % str(size_of(gc.mem_free())).upper() elif hasattr(os, "sysconf"): memory_total = ( "%s RAM SYSTEM\n" % str( size_of((os.sysconf("SC_PAGE_SIZE") * os.sysconf("SC_PHYS_PAGES"))) ).upper() ) memory_free = ( "%s FREE\n" % str( size_of((os.sysconf("SC_PAGE_SIZE") * os.sysconf("SC_AVPHYS_PAGES"))) ).upper() ) else: memory_total = "" memory_free = "" intro = ( "******************************* %s V%s **********************************\n\n%s\nLOADER %s %s %s\nEXEC PYTHON V%s\n%s%s" % ( APPLICATION_NAME.upper(), APPLICATION_VERSION.upper(), APPLICATION_LICENSE.upper(), sys.implementation.name.upper(), ".".join((str(item).upper() for item in list(sys.implementation.version))), loader_mpy, sys.version.upper(), memory_total, memory_free, ) ) dow = ("Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun") mon = ( "???", "Jan", "Feb", "Mar", "Apr", "May", "Jun", "Jul", "Aug", "Sep", "Oct", "Nov", "Dec", ) ps1_clean_up_1 = re.compile("\\$\\{.*\\}") ps1_clean_up_2 = re.compile("\\$\\(.*\\)") ps1_exit_code = re.compile("\\$\\?") ps1_hostname_sort = re.compile("\\\\h") ps1_hostname = re.compile("\\\\H") ps1_date = re.compile("\\\\d") ps1_shell = re.compile("\\\\s") ps1_username = re.compile("\\\\u") ps1_shell_version = re.compile("\\\\v") ps1_shell_release = re.compile("\\\\V") ps1_working_directory = re.compile("\\\\w") ps1_working_directory_basename = re.compile("\\\\W") ps1_prompt_sign = re.compile("\\\\\\$") ps1_newline = re.compile("\\\\n") ps1_carriage_return = re.compile("\\\\r") ps1_bell = re.compile("\\\\a") ps1_time_24_hour = re.compile("\\\\t") ps1_time_12_hour = re.compile("\\\\T") ps1_time_am_pm = re.compile("\\\\@") ps1_begin_a_sequence_of_non_printing_characters = re.compile("\\\\\\[\\\\033") ps1_end_a_sequence_of_non_printing_characters = re.compile("\\\\\\]") ps1_virtual_env = re.compile("\\$VIRTUAL_ENV") def __init__(self): super().__init__() GLXEnviron.__init__(self) GLXAlias.__init__(self) if os.isatty(sys.stdin.fileno()): self.init_inside_a_tty = True else: self.init_inside_a_tty = False if hasattr(os, "environ"): self.environ = os.environ.copy() else: self.setenv("PATH", getcwd(), 1) self.setenv("HOME", sep, 1) self.setenv("PWD", getcwd(), 1) self.setenv( "PS1", "$VIRTUAL_ENV\\[\\033[01;32m\\]\\u@\\h\\[\\033[00m\\]:\\[\\033[00;34m\\]\\w\\[\\033[00m\\]\\$ ", ) self.update_columns_and_lines_vars() self.load_alias() def load_alias(self): file = expanduser(("~%s.glxsh_alias" % sep)) if exists(file): with open(file=file, mode="r", encoding="utf8") as alias_file: for line in alias_file.readlines(): self.do_alias(line) @property def prompt(self): tm = localtime() if self.environ.get("PS1"): tmp_value = self.environ.get("PS1") if self.environ.get("VIRTUAL_ENV"): tmp_value = self.ps1_virtual_env.sub( ("(%s)" % basename(self.environ.get("VIRTUAL_ENV"))), tmp_value ) tmp_value = self.ps1_clean_up_1.sub("", tmp_value) tmp_value = self.ps1_clean_up_2.sub("", tmp_value) if self.exit_code: exit_code = "\x1b[01;31m%s\x1b[00m" % self.exit_code else: exit_code = "\x1b[01;32m%s\x1b[00m" % self.exit_code tmp_value = self.ps1_exit_code.sub(exit_code, tmp_value) tmp_value = self.ps1_date.sub( ("%s %s %02d" % (self.dow[tm[6]], self.mon[tm[1]], tm[2])), tmp_value ) tmp_value = self.ps1_hostname_sort.sub(gethostname(), tmp_value) tmp_value = self.ps1_hostname.sub(getfqdn(), tmp_value) tmp_value = self.ps1_shell.sub(APPLICATION_NAME, tmp_value) tmp_value = self.ps1_time_24_hour.sub( ("%02d:%02d:%02d" % (tm[3], tm[4], tm[5])), tmp_value ) if tm[3] in range(13, 23, 1): hour = tm[3] - 12 elif tm[3] == 0: hour = 12 else: hour = tm[3] tmp_value = self.ps1_time_12_hour.sub( ("%02d:%02d:%02d" % (hour, tm[4], tm[5])), tmp_value ) if tm[3] in range(1, 11, 1): am_pm_text = "AM" elif tm[3] in range(13, 23, 1): am_pm_text = "PM" elif tm[3] == 0: am_pm_text = "AM" else: am_pm_text = "" tmp_value = self.ps1_time_am_pm.sub(am_pm_text, tmp_value) tmp_value = self.ps1_username.sub(getuser(), tmp_value) tmp_value = self.ps1_shell_version.sub(APPLICATION_VERSION, tmp_value) tmp_value = self.ps1_shell_release.sub( ("%s.%s" % (APPLICATION_VERSION, APPLICATION_PATCH_LEVEL)), tmp_value ) tmp_value = self.ps1_working_directory.sub( getcwd().replace( ("%s%s" % (self.environ.get("HOME"), sep)), ("~%s" % sep) ), tmp_value, ) tmp_value = self.ps1_working_directory_basename.sub( basename(self.environ.get("PWD")), tmp_value ) tmp_value = self.ps1_prompt_sign.sub( ("$" if os.getuid() else "#"), tmp_value ) tmp_value = self.ps1_newline.sub("\n", tmp_value) tmp_value = self.ps1_carriage_return.sub("\r", tmp_value) tmp_value = self.ps1_bell.sub("\x07", tmp_value) tmp_value = self.ps1_begin_a_sequence_of_non_printing_characters.sub( "\x1b", tmp_value ) tmp_value = self.ps1_end_a_sequence_of_non_printing_characters.sub( "", tmp_value ) return tmp_value return "%s%s " % (self.exit_code, ">") @staticmethod def do_EOF(_): sys.stdout.write("\n") sys.stdout.flush() return True def default(self, line): sys.stdout.write(("glxsh: %s\n" % line)) self.exit_code = 127 def precmd(self, line): if (not str(line).startswith("alias")) and ( not str(line).startswith("unalias") ): for key, value in self.alias.items(): if line.rstrip() == key: line = line.replace(key, value) return line def postcmd(self, stop, line): self.update_columns_and_lines_vars() return stop def onecmd(self, line): (cmd, arg, line) = self.parseline(line) if not line: return self.emptyline() if cmd is None: return self.default(("cmd is None for: %s" % line)) self.lastcmd = line if line == "EOF": self.lastcmd = "" if cmd == "": return self.default(("cmd is '' for: %s" % line)) if "|" in line: return self.run_multiple_commands(line) return self.run_simple_command(cmd, arg, line) def update_columns_and_lines_vars(self): if self.init_inside_a_tty: self.environ["COLUMNS"] = str(os.get_terminal_size().columns) self.environ["LINES"] = str(os.get_terminal_size().lines) @staticmethod def cmdline_split(s, platform=1): if platform == "this": platform = sys.platform != "win32" if platform == 1: RE_CMD_LEX = '"((?:\\\\["\\\\]|[^"])*)"|\'([^\']*)\'|(\\\\.)|(&&?|\\|\\|?|\\d?\\>|[<])|([^\\s\'"\\\\&|<>]+)|(\\s+)|(.)' elif platform == 0: RE_CMD_LEX = '"((?:""|\\\\["\\\\]|[^"])*)"?()|(\\\\\\\\(?=\\\\*")|\\\\")|(&&?|\\|\\|?|\\d?>|[<])|([^\\s"&|<>]+)|(\\s+)|(.)' else: raise AssertionError(("unkown platform %r" % platform)) args = [] accu = None for qs, qss, esc, pipe, word, white, fail in re.findall(RE_CMD_LEX, s): if word: pass elif esc: word = esc[1] elif white or pipe: if accu is not None: args.append(accu) if pipe: args.append(pipe) accu = None continue elif fail: raise ValueError("invalid or incomplete shell string") elif qs: word = qs.replace('\\"', '"').replace("\\\\", "\\") if platform == 0: word = word.replace('""', '"') else: word = qss accu = (accu or "") + word if accu is not None: args.append(accu) return args def run_multiple_commands(self, line): (s_in, s_out) = (0, 0) s_in = os.dup(0) s_out = os.dup(1) fdin = os.dup(s_in) for command in line.split("|"): os.dup2(fdin, 0) os.close(fdin) if command == line.split("|")[(-1)]: fdout = os.dup(s_out) else: (fdin, fdout) = os.pipe() os.dup2(fdout, 1) os.close(fdout) (tmp_cmd, tmp_arg, tmp_line) = self.parseline(command) self.run_simple_command(tmp_cmd, tmp_arg, tmp_line) os.dup2(s_in, 0) os.dup2(s_out, 1) os.close(s_in) os.close(s_out) def run_simple_command(self, cmd, arg, line): if hasattr(self, ("do_%s" % cmd)): try: func = getattr(self, ("do_%s" % cmd)) self.exit_code = func(arg) self.setenv("?", str(self.exit_code)) except KeyboardInterrupt: pass except SystemExit as code: self.exit_code = code self.setenv("?", str(self.exit_code)) return True return None try: stdin_fd = sys.stdin.fileno() save_settings = termios.tcgetattr(stdin_fd) except (ModuleNotFoundError, IOError): save_settings = None stdin_fd = -1 try: pr = subprocess.run( line.split(" "), start_new_session=True, env=self.environ, check=False ) self.exit_code = pr.returncode self.setenv("?", str(self.exit_code)) except KeyboardInterrupt: pass except FileNotFoundError: sys.stdout.write(("%s: %s : command not found\n" % (APPLICATION_NAME, cmd))) self.exit_code = 127 self.setenv("?", str(self.exit_code)) finally: if save_settings: termios.tcsetattr(stdin_fd, termios.TCSANOW, save_settings) return None def _print_help(self, parser): if self.environ.get("COLUMNS"): parser.print_help(columns=int(self.environ.get("COLUMNS"))) else: parser.print_help() def help_alias(self): self._print_help(parser_alias) def do_alias(self, line): return glxsh_alias(string=line, shell=self) def help_basename(self): self._print_help(parser_basename) @WrapperCmdLineArgParser(parser_basename) def do_basename(self, _, parsed): return glxsh_basename(string=parsed.string, suffix=parsed.suffix) @WrapperCmdLineArgParser(parser_cat) def do_cat(self, _, parsed): return glxsh_cat(files=parsed.file) def help_cat(self): self._print_help(parser_cat) @staticmethod def complete_cat(text, line, begidx, endidx): return glxsh_completer_file(text, line, begidx, endidx) @WrapperCmdLineArgParser(parser_cd) def do_cd(self, _, parsed): return glxsh_cd( directory=parsed.directory, logical=parsed.logical, physical=parsed.physical, shell=self, ) def help_cd(self): self._print_help(parser_cd) @staticmethod def complete_cd(text, line, begidx, endidx): return glxsh_completer_directory(text, line, begidx, endidx) @WrapperCmdLineArgParser(parser_clear) def do_clear(self, _, __): return glxsh_clear() def help_clear(self): self._print_help(parser_clear) @WrapperCmdLineArgParser(parser_chmod) def do_chmod(self, _, parsed): if (not parsed.mode) or (not parsed.file): parser_chmod.print_usage() return 1 return glxsh_chmod( recursive=parsed.recursive, mode=parsed.mode, file=parsed.file ) def help_chmod(self): self._print_help(parser_chmod) @staticmethod def complete_chmod(text, line, begidx, endidx): return glxsh_complete_chmod(text, line, begidx, endidx) @WrapperCmdLineArgParser(parser_cp) def do_cp(self, _, parsed): return glxsh_cp( source_file=parsed.source_file, target_file=parsed.target_file, interactive=parsed.interactive, ) def help_cp(self): self._print_help(parser_cp) @WrapperCmdLineArgParser(parser_date) def do_date(self, line, parsed): if parsed.u: line = line.replace("-u ", "") line = line.replace("-u", "") return glxsh_date(u=parsed.u, custom_format=line, shell=self) def help_date(self): self._print_help(parser_date) @WrapperCmdLineArgParser(parser_df) def do_df(self, _, parsed): return glxsh_df( file=parsed.file, block_size=parsed.kilo, total=parsed.total, human_readable=parsed.human_readable, ) def help_df(self): self._print_help(parser_df) @WrapperCmdLineArgParser(parser_dirname) def do_dirname(self, _, parsed): return glxsh_dirname(parsed.string) def help_dirname(self): self._print_help(parser_dirname) @WrapperCmdLineArgParser(parser_du) def do_du(self, _, parsed): return glxsh_du( a=parsed.a, H=parsed.H, k=parsed.k, L=parsed.L, s=parsed.s, x=parsed.x, files=parsed.files, ) @staticmethod def complete_du(text, line, begidx, endidx): return glxsh_complete_du(text, line, begidx, endidx) def help_du(self): self._print_help(parser_du) def help_echo(self): self._print_help(parser_echo) @WrapperCmdLineArgParser(parser_echo) def do_echo(self, line, parsed): if parsed.newline: line = line.replace("-n ", "") return glxsh_echo(string=line, newline=parsed.newline, shell=self) def complete_echo(self, text, line, begidx, endidx): return glxsh_complete_echo(text, line, begidx, endidx, shell=self) def help_env(self): self._print_help(parser_env) @WrapperCmdLineArgParser(parser_env) def do_env(self, _, parsed): return glxsh_env( name=parsed.name, utility=parsed.utility, argument=parsed.argument, shell=self, ) @WrapperCmdLineArgParser(parser_exit) def do_exit(self, _, parsed): if parsed.code: self.exit_code = parsed.code[0] return glxsh_exit(code=parsed.code, shell=self) def help_exit(self): self._print_help(parser_exit) @WrapperCmdLineArgParser(parser_false) def do_false(self, _, __): return glxsh_false() def help_false(self): self._print_help(parser_false) @WrapperCmdLineArgParser(parser_head) def do_head(self, _, parsed): return glxsh_head(files=parsed.file, number=parsed.number) @staticmethod def complete_head(text, line, begidx, endidx): return glxsh_completer_file(text, line, begidx, endidx) def help_head(self): self._print_help(parser_head) def emptyline(self): sys.stdout.write("\n") @WrapperCmdLineArgParser(parser_ls) def do_ls(self, _, parsed): return glxsh_ls( A=parsed.A, C=parsed.C, F=parsed.F, H=parsed.H, L=parsed.L, recurse=parsed.recurse, S=parsed.S, a=parsed.a, c=parsed.c, d=parsed.d, f=parsed.f, g=parsed.g, i=parsed.i, k=parsed.k, l=parsed.l, m=parsed.m, n=parsed.n, o=parsed.o, p=parsed.p, q=parsed.q, r=parsed.r, s=parsed.s, t=parsed.t, u=parsed.u, x=parsed.x, one=parsed.one, file=parsed.file, shell=self, ) def help_ls(self): self._print_help(parser_ls) @WrapperCmdLineArgParser(parser_mkdir) def do_mkdir(self, _, parsed): if not parsed.dir: parser_mkdir.print_usage() return 1 return glxsh_mkdir( directories=parsed.dir, parents=parsed.parents, mode=parsed.mode ) def help_mkdir(self): self._print_help(parser_mkdir) @WrapperCmdLineArgParser(parser_mv) def do_mv(self, _, parsed): if (not parsed.target_file) or parsed.source_file: parser_mv.print_usage() return 1 return glxsh_mv( source_file=parsed.source_file, target_file=parsed.target_file, target_dir=parsed.target_dir, force=parsed.force, interactive=parsed.interactive, ) def help_mv(self): self._print_help(parser_mv) @WrapperCmdLineArgParser(parser_pwd) def do_pwd(self, _, parsed): return glxsh_pwd(logical=parsed.logical, physical=parsed.physical) def help_pwd(self): self._print_help(parser_pwd) @WrapperCmdLineArgParser(parser_rm) def do_rm(self, _, parsed): if not parsed.file: parser_rm.print_usage() return 1 return glxsh_rm( file=parsed.file, recursive=parsed.recursive, interactive=parsed.interactive, force=parsed.force, ) def help_rm(self): self._print_help(parser_rm) @staticmethod def complete_rmdir(text, line, begidx, endidx): return glxsh_complete_rmdir(text, line, begidx, endidx) @WrapperCmdLineArgParser(parser_rmdir) def do_rmdir(self, _, parsed): if not parsed.dir: parser_rmdir.print_usage() return 1 return glxsh_rmdir(directories=parsed.dir, parents=parsed.parents) def help_rmdir(self): self._print_help(parser_rmdir) @WrapperCmdLineArgParser(parser_sleep) def do_sleep(self, _, parsed): if not parsed.time: parser_sleep.print_usage() return 1 return glxsh_sleep(sec=parsed.time) @WrapperCmdLineArgParser(parser_tee) def do_tee(self, _, parsed): return glxsh_tee(a=parsed.a, i=parsed.i, files=parsed.file) def help_tee(self): self._print_help(parser_tee) @WrapperCmdLineArgParser(parser_time) def do_time(self, line, parsed): return glxsh_time( p=parsed.p, utility=parsed.utility, argument=parsed.argument, line=line, shell=self, ) def help_time(self): self._print_help(parser_time) @WrapperCmdLineArgParser(parser_touch) def do_touch(self, _, parsed): return glxsh_touch( a=parsed.a, c=parsed.c, d=parsed.d, m=parsed.m, r=parsed.r, t=parsed.t, files=parsed.file, ) def help_touch(self): self._print_help(parser_touch) @WrapperCmdLineArgParser(parser_true) def do_true(self, _, __): return glxsh_true() def help_true(self): self._print_help(parser_true) @WrapperCmdLineArgParser(parser_tty) def do_tty(self, _, __): return glxsh_tty() def help_tty(self): self._print_help(parser_tty) def help_sleep(self): if self.environ.get("COLUMNS"): parser_sleep.print_help(columns=int(self.environ.get("COLUMNS"))) else: parser_sleep.print_help() @WrapperCmdLineArgParser(parser_uname) def do_uname(self, _, parsed): return glxsh_uname( all=parsed.all, sysname=parsed.sysname, nodename=parsed.nodename, release=parsed.release, version=parsed.version, machine=parsed.machine, ) def help_uname(self): self._print_help(parser_uname) def help_umask(self): self._print_help(parser_umask) @WrapperCmdLineArgParser(parser_umask) def do_umask(self, _, parsed): return glxsh_umask(mask=parsed.mask, symbolic=parsed.symbolic) def help_unalias(self): self._print_help(parser_unalias) @WrapperCmdLineArgParser(parser_unalias) def do_unalias(self, _, parsed): return glxsh_unalias(a=parsed.a, alias_name=parsed.alias_name, shell=self) @WrapperCmdLineArgParser(parser_tail) def do_tail(self, _, parsed): return glxsh_tail(c=parsed.c, f=parsed.f, n=parsed.n, files=parsed.file) def help_tail(self): self._print_help(parser_tail) parser_glxsh = ArgumentParser(prog="glxsh", add_help=True) parser_glxsh.add_argument( "command", nargs="?", help="optional commands or file to run, if no commands given, enter an interactive shell", ) parser_glxsh.add_argument( "command_args", nargs="...", help="if commands is not a file use optional arguments for commands", ) def main(): if len(sys.argv) > 1: args = parser_glxsh.parse_args(sys.argv[1:]) if args.help: parser_glxsh.print_help() return 0 if isfile(args.command): try: with open(args.command) as rcFile: for line in rcFile.readlines(): line = line.rstrip() if (len(line) > 0) and (line[0] != "#"): exit_code = GLXUsh().onecmdhooks(("%s" % line)) except IOError: exit_code = 1 else: return exit_code else: return GLXUsh().onecmdhooks( ("%s %s" % (args.command, " ".join(args.command_args))) ) else: return GLXUsh().cmdloop() if __name__ == "__main__": sys.exit(main())

Top » Module code » glxshell.glxush

© Copyright 2020-2024, Galaxie Shell Team.
This page is licensed under the Creative Commons Attribution-NonCommercial-ShareAlike 4.0 International Public License (CC BY-NC-SA 4.0).
Examples, recipes, and other code in the documentation are additionally licensed under the Zero Clause BSD License.
See History and License for more information.

Last updated on None.
Created using Sphinx 8.0.2.