Import of the watch repository from Pebble

Matthieu Jeanson 2024-12-12 16:43:03 -08:00 committed by Katharine Berry
commit 3b92768480
10334 changed files with 2564465 additions and 0 deletions

python_libs/pebble-loghash/.gitignore vendored Normal file
View file

@@ -0,0 +1,7 @@
*.pyc
.env
.DS_Store
dist
MANIFEST
__pycache__
.cache

View file

@@ -0,0 +1,2 @@
# pyegg-pebble-loghash
A Python egg for dealing with hashed TinTin logs
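
As a rough usage sketch (assuming the egg is installed and a hash lookup dictionary has already been produced, e.g. by the hash_directory() helper further down; the file names below are only placeholders), dehashing a captured log might look like:

import json
from pebble.loghashing.dehashing import dehash_file

# "loghash_dict.json" and "watch_console.log" are hypothetical example files
with open("loghash_dict.json") as fp:
    lookup_dict = json.load(fp)

# dehash_file() returns one dehashed line per input line
for line in dehash_file("watch_console.log", lookup_dict):
    print(line, end="")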

View file

@@ -0,0 +1,18 @@
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Namespace module
"""
__import__('pkg_resources').declare_namespace(__name__)
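
For context, the pkg_resources call above declares pebble as a namespace package, so this egg only needs to ship the pebble.loghashing subpackage while other Pebble eggs can contribute their own pebble.* modules. A consumer then imports it as usual (a sketch):

# The shared 'pebble' namespace is split across eggs; this one provides loghashing
from pebble.loghashing import constants, dehashing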

View file

@@ -0,0 +1,14 @@
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

View file

@@ -0,0 +1,78 @@
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#!/usr/bin/env python
"""
Constants used in this module
"""
import re
# Hash Mask
HASH_MASK = 0x00FFFFFF
# Regular Expressions
LOG_LINE_CONSOLE_REGEX = r"^(?P<re_level>.)\s+(?P<task>.)\s+(?P<time>.*)\s+(?P<msg>.*:.*>\s+LH.*)$"
LOG_LINE_SUPPORT_REGEX = r"^(?P<date>.*)\s+(?P<time>.*)\s+(?P<msg>.*:.*\s+LH.*)$"
LOG_MSG_REGEX = r"^(?P<f>\w*\.?\w*):(?P<l>\d*)>?\s+(?:LH:)?(?P<h>(?:0x)?[a-f0-9]{1,8}\s?.*)$"
DEHASHED_MSG_REGEX = r"^(\w+\.?\w?):(\d+)?:?(.*)$"
HASHED_INFO_REGEX = r"^(?P<hash_key>(?:0x)?[a-f0-9]{1,8})\s?(?P<arg_list>.+)?$"
FORMAT_TAG_REGEX = r"%(\.\*)?#?[0-9]{0,3}[Xdilupcszxh]+"
STR_LITERAL_REGEX = r"^(.*?)(\".*\"\s*(?:(?:PRI[A-z](?:\d{1,2}|PTR))|B[DT]_.*_FMT)*)(.*)$"
FORMAT_SPECIFIER_REGEX = r"(%#?[0-9]{0,3}[Xdilupcszxh]+)"
# New Logging Regular Expressions
NEWLOG_LINE_CONSOLE_REGEX = LOG_LINE_CONSOLE_REGEX.replace('LH', 'NL')
NEWLOG_LINE_SUPPORT_REGEX = LOG_LINE_SUPPORT_REGEX.replace('LH', 'NL')
NEWLOG_HASHED_INFO_REGEX = r"^(?::0[>]? NL:)(?P<hash_key>(?:0x)?[a-f0-9]{1,8})\s?(?P<arg_list>.+)?$"
POINTER_FORMAT_TAG_REGEX = r"(?P<format>%-?[0-9]*)p"
HEX_FORMAT_SPECIFIER_REGEX = r"%[- +#0]*\d*(\.\d+)?(hh|h|l|ll|j|z|t|L)?(x|X)"
# re patterns
STR_LITERAL_PATTERN = re.compile(STR_LITERAL_REGEX)
FORMAT_SPECIFIER_PATTERN = re.compile(FORMAT_SPECIFIER_REGEX)
LOG_LINE_CONSOLE_PATTERN = re.compile(LOG_LINE_CONSOLE_REGEX)
LOG_LINE_SUPPORT_PATTERN = re.compile(LOG_LINE_SUPPORT_REGEX)
LOG_MSG_PATTERN = re.compile(LOG_MSG_REGEX)
DEHASHED_MSG_PATTERN = re.compile(DEHASHED_MSG_REGEX)
HASHED_INFO_PATTERN = re.compile(HASHED_INFO_REGEX)
FORMAT_TAG_PATTERN = re.compile(FORMAT_TAG_REGEX)
# New Logging Patterns
NEWLOG_LINE_CONSOLE_PATTERN = re.compile(NEWLOG_LINE_CONSOLE_REGEX)
NEWLOG_LINE_SUPPORT_PATTERN = re.compile(NEWLOG_LINE_SUPPORT_REGEX)
NEWLOG_HASHED_INFO_PATTERN = re.compile(NEWLOG_HASHED_INFO_REGEX)
POINTER_FORMAT_TAG_PATTERN = re.compile(POINTER_FORMAT_TAG_REGEX)
HEX_FORMAT_SPECIFIER_PATTERN = re.compile(HEX_FORMAT_SPECIFIER_REGEX)
# Output file lines
FORMAT_IDENTIFIER_STRING_FMT = "char *format_string_{} = \"{}\";\n"
LOOKUP_RESULT_STRING_FMT = "if (loghash == {}) fmt = format_string_{};\n"
LOOKUP_DEFAULT_STRING = "fmt = \"\";\n"
FILE_IGNORE_LIST = []
# Lines to hash
GENERIC_LOG_TYPES = ["PBL_LOG", "PBL_ASSERT", "PBL_CROAK"]
BT_LOG_TYPES = ["BLE_LOG_DEBUG", "BLE_GAP_LOG_DEBUG", "BLE_CORE_LOG_DEBUG",
"BT_LOG_ERROR", "BT_LOG_DEBUG", "HCI_LOG_ERROR", "GAP_LOG_ERROR",
"GAP_LOG_DEBUG", "GAP_LOG_WARNING", "HCI_LOG_DEBUG"]
QEMU_LOG_TYPES = ["QEMU_LOG_DEBUG", "QEMU_LOG_ERROR"]
MISC_LOG_TYPES = ["ACCEL_LOG_DEBUG", "ANIMATION_LOG_DEBUG", "VOICE_LOG",
"ISPP_LOG_DEBUG", "ISPP_LOG_DEBUG_VERBOSE",
"RECONNECT_IOS_DEBUG", "SDP_LOG_DEBUG", "SDP_LOG_ERROR",
"ANALYTICS_LOG_DEBUG"]
LINES_TO_HASH = GENERIC_LOG_TYPES + BT_LOG_TYPES + QEMU_LOG_TYPES + MISC_LOG_TYPES
# Key to force next line to be hashed
HASH_NEXT_LINE = "// HASH_NEXT_LINE"
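
To see how these expressions nest (a sketch reusing a sample line that also appears in the tests further down), a hashed console line is peeled apart in three stages: the console pattern isolates the message, the message pattern splits off the file and line number, and the hashed-info pattern separates the hash key from its argument list:

from pebble.loghashing.constants import (LOG_LINE_CONSOLE_PATTERN, LOG_MSG_PATTERN,
                                         HASHED_INFO_PATTERN)

line = "D A 21:35:14.375 :872> LH:b2eb 2 `Success`"
outer = LOG_LINE_CONSOLE_PATTERN.search(line)
# re_level='D', task='A', time='21:35:14.375', msg=':872> LH:b2eb 2 `Success`'
msg = LOG_MSG_PATTERN.search(outer.group('msg'))
# f='', l='872', h='b2eb 2 `Success`'
info = HASHED_INFO_PATTERN.search(msg.group('h'))
# hash_key='b2eb', arg_list='2 `Success`'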

View file

@@ -0,0 +1,206 @@
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#!/usr/bin/env python
"""
Module for de-hashing log strings
"""
from pebble.loghashing.constants import (LOG_LINE_CONSOLE_PATTERN, LOG_LINE_SUPPORT_PATTERN,
LOG_MSG_PATTERN, DEHASHED_MSG_PATTERN, HASHED_INFO_PATTERN,
FORMAT_TAG_PATTERN)
from pebble.loghashing.newlogging import dehash_line as newlogging_dehash_line
from pebble.loghashing.newlogging import LOG_DICT_KEY_VERSION
def dehash_file(file_name, lookup_dict):
"""
Dehash a file
:param file_name: Path of the file to dehash
:type file_name: str
:param lookup_dict: Hash lookup dictionary
:type lookup_dict: dict
:returns: A list containing the dehashed lines
"""
# Grab the lines from the file
with open(file_name, 'r') as fp:
lines = fp.readlines()
# Dehash the lines
lines = [dehash_line(x, lookup_dict) + "\n" for x in lines]
return lines
def dehash_line(line, lookup_dict):
"""
Dehash a line
:param line: The line to dehash
:type line: str
:param lookup_dict: Hash lookup dictionary
:type lookup_dict: dict
If the lookup dictionary contains the 'new_logging_version' key, it's a newlogging style
print. Pass it off to the appropriate handler.
:returns: A string containing the dehashed line, or the submitted line.
"""
if LOG_DICT_KEY_VERSION in lookup_dict:
return newlogging_dehash_line(line, lookup_dict)
return parse_line(line, lookup_dict) or parse_support_line(line, lookup_dict) or line
def parse_line(line, lookup_dict):
"""
Parse a log line
:param line: The line to parse
:type line: str
:param lookup_dict: Hash lookup dictionary
:type lookup_dict: dict
:returns: A string containing the parsed line, or a null string.
"""
match = LOG_LINE_CONSOLE_PATTERN.search(line)
output = ""
if match:
parsed = parse_message(match.group('msg'), lookup_dict)
output = "{} {} {} {}:{}> {}".format(match.group('re_level'), match.group('task'),
match.group('time'), parsed['file'],
parsed['line'], parsed['msg'])
return output
def parse_support_line(line, lookup_dict):
"""
Parse a log line
:param line: The line to parse
:type line: str
:param lookup_dict: Hash lookup dictionary
:type lookup_dict: dict
:returns: A string containing the parsed line, or a null string.
"""
match = LOG_LINE_SUPPORT_PATTERN.search(line)
output = ""
if match:
parsed = parse_message(match.group('msg'), lookup_dict)
output = "{} {} {}:{}> {}".format(match.group('date'), match.group('time'),
parsed['file'], parsed['line'], parsed['msg'])
return output
def parse_message(msg, lookup_dict):
"""
Parse the log message part of a line
:param msg: The message to parse
:type msg: str
:param lookup_dict: Hash lookup dictionary
:type lookup_dict: dict
:returns: A dictionary containing the parsed message, file name, and line number
"""
output = {'msg':msg, 'file':"", 'line':""}
match = LOG_MSG_PATTERN.search(msg)
if match:
output['file'] = match.group('f')
output['line'] = match.group('l')
hashed = match.group('h')
dehashed_str = dehash_str(hashed, lookup_dict)
output['msg'] = "LH:{}".format(dehashed_str)
match2 = DEHASHED_MSG_PATTERN.search(dehashed_str)
if match2:
output['file'] = match2.group(1) or output['file']
output['line'] = match2.group(2) or output['line']
output['msg'] = match2.group(3) or dehashed_str
return output
def dehash_str(hashed_info, lookup_dict):
"""
Search the lookup dictionary for a match, and return the dehashed string
:param hashed_info: Hash and arguments
:type hashed_info: str
:param lookup_dict: Hash lookup dictionary
:type lookup_dict: dict
:returns: The dehashed string after the hash lookup, with arguments substituted
"""
match = HASHED_INFO_PATTERN.search(hashed_info)
# If there's no match, return the hashed info as the log message
output = hashed_info
if match:
# Look for the hex value in the dictionary keys
# If we can't find a match, set formatted string to hashed_info
formatted_string = lookup_dict.get(str(match.group('hash_key')), hashed_info)
# If we couldn't find a match, try converting to base 10 to find a match
# If we can't find a match, set formatted string to hashed_info
if formatted_string == hashed_info:
formatted_string = lookup_dict.get(str(int(match.group('hash_key'), 16)), hashed_info)
# For each argument, substitute a C-style format specifier in the string
for arg in parse_args(match.group('arg_list')):
formatted_string = FORMAT_TAG_PATTERN.sub(arg, formatted_string, 1)
# Return the filename, and log message
output = formatted_string
return output
def parse_args(raw_args):
"""
Split the argument list, taking care of `delimited strings`
Idea taken from http://bit.ly/1KHzc0y
:param raw_args: Raw argument list
:type raw_args: str
:returns: A list containing the arguments
"""
args = []
arg_run = []
in_str = False
if raw_args:
for arg_ch in raw_args:
# Start or stop of a ` delimited string
if arg_ch == "`":
in_str = not in_str
# If we find a space and we're not in a ` delimited string, this is a boundary
elif arg_ch == " " and not in_str:
args.append("".join(arg_run).strip())
arg_run = []
else:
arg_run.append(arg_ch)
if arg_run:
args.append("".join(arg_run).strip())
return args
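
One detail worth calling out in dehash_str() above: the hash key is looked up verbatim first and then, if that fails, by its base-10 value, so lookup dictionaries keyed either way resolve. A small sketch (the format string is the same one used in the tests further down):

from pebble.loghashing.dehashing import dehash_str

hex_keyed = {"b2eb": "ispp.c:Start Authentication Process (%d) %s"}
dec_keyed = {"45803": "ispp.c:Start Authentication Process (%d) %s"}  # 45803 == 0xb2eb

# Both print: ispp.c:Start Authentication Process (2) Success
print(dehash_str("b2eb 2 `Success`", hex_keyed))
print(dehash_str("b2eb 2 `Success`", dec_keyed))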

View file

@@ -0,0 +1,204 @@
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#!/usr/bin/env python
"""
Module for hashing log strings
"""
import json
import os
import re
from pebble.loghashing.constants import (STR_LITERAL_PATTERN, FORMAT_SPECIFIER_PATTERN,
FORMAT_IDENTIFIER_STRING_FMT, LOOKUP_RESULT_STRING_FMT,
LINES_TO_HASH, HASH_MASK, HASH_NEXT_LINE,
LOOKUP_DEFAULT_STRING)
def hash_directory(path, output_file_name):
"""
Runs the line hasher on every file in a directory tree
:param path: Root of the tree to hash
:type path: str
:param output_file_name: Path of the JSON file to write the hash lookup dictionary to
:type output_file_name: str
"""
lookup_dict = {}
for walk in os.walk(path, followlinks=True):
# First and third item, respectively
root, file_names = walk[0::2]
for file_name in file_names:
lookup_dict.update(hash_file("{}/{}".format(root, file_name)))
# Read in hash_lookup
# Write lines out
with open(output_file_name, 'w') as fp:
json.dump(lookup_dict, fp)
def hash_file(file_name):
"""
Attempt to hash each line of a file
:param file_name: Name of file to hash
:type file_name: str
:returns: A hash lookup dictionary
"""
# Read in lines
with open(file_name, 'r') as fp:
lines = fp.readlines()
hashed_lines = []
lookup_dict = {}
force_hash = False
# Hash appropriate lines with line number, and file name
for index, line in enumerate(lines):
hashed_line, line_dict = hash_line(line, file_name, index + 1, force_hash)
force_hash = False
if HASH_NEXT_LINE in hashed_line:
force_hash = True
hashed_lines.append(hashed_line)
lookup_dict.update(line_dict)
# Write lines out
with open(file_name, 'w') as fp:
fp.writelines(hashed_lines)
return lookup_dict
def hash_line(line, file_name, line_num, force_hash=False):
"""
Search line for hashable strings, and hash them.
:param line: Line to search
:type line: str
:param file_name: Name of the file that the line is in
:type file_name: str
:param line_num: Line number of the line
:type line_num: int
:returns: A tuple with: The input line (with all hashable strings hashed),
and a hash lookup dictionary
"""
hash_dict = {}
# Only match lines that contain one of the following substrings
if force_hash or any(x in line for x in LINES_TO_HASH):
if force_hash or not any(x in line for x in ["PBL_CROAK_OOM"]):
match = STR_LITERAL_PATTERN.search(line)
if match:
# Strip all double quotes from the string
str_literal = re.sub("\"", "", match.group(2))
str_literal = inttype_conversion(str_literal)
# Hash the file name and line number in as well
line_to_hash = "{}:{}:{}".format(os.path.basename(file_name), line_num, str_literal)
hashed_msg = hash_string(line_to_hash)
hash_dict[hashed_msg] = line_to_hash
line = "{}{}{}\n".format(match.group(1), hashed_msg, match.group(3))
return (line, hash_dict)
def hash_string(string):
"""
Hash and return a given string.
:param string: String to hash
:type string: str
:returns: The input string, hashed
"""
return hex(hash(string) & HASH_MASK)
def inttype_conversion(inttype):
"""
Change PRI specifiers into classical printf format specifiers
:param inttype: PRI specifier to convert
:type inttype: str
:returns: The classical printf format specifier that inttype represents
"""
# Change ' PRIu32 ' to '32u'
output = re.sub(r"\s*PRI([diouxX])(8|16|32|64|PTR)\s*", r"\g<2>\g<1>", inttype)
# No length modifier for the 8-bit or 16-bit sizes
output = re.sub("(8|16)", "", output)
# 'l' length modifier for the 32-bit or PTR sizes
output = re.sub("(32|PTR)", "l", output)
# 'll' length modifier for the 64-bit size
output = re.sub("64", "ll", output)
# Change BT_MAC_FMT and BT_ADDR_FMT
output = re.sub("BT_MAC_FMT", "%02X:%02X:%02X:%02X:%02X:%02X", output)
output = re.sub("BT_ADDR_FMT", "%02X:%02X:%02X:%02X:%02X:%02X", output)
output = re.sub("BT_DEVICE_ADDRESS_FMT", "%02X:%02X:%02X:%02X:%02X:%02X", output)
return output
def string_formats(string):
"""
Parses a string for all format identifiers
:param string: String to parse
:type string: str
:returns: A list of all format specifiers
"""
return FORMAT_SPECIFIER_PATTERN.findall(string)
def create_lookup_function(lookup_dict, output_file_name):
"""
Create a C source file for hash to format specifiers lookup
:param lookup_dict: Hash to string lookup dictionary
:type lookup_dict: dict
"""
strings = []
lines = [LOOKUP_DEFAULT_STRING]
format_lookup = {}
index = 1
format_map = [[x, string_formats(lookup_dict[x])] for x in lookup_dict.keys()]
for line, formats in format_map:
# Only make an entry if there's a format string!
if formats:
format_as_string = ''.join(formats)
if format_as_string not in format_lookup:
format_lookup[format_as_string] = index
strings.append(FORMAT_IDENTIFIER_STRING_FMT.format(index, format_as_string))
index = index + 1
lines.append(LOOKUP_RESULT_STRING_FMT.format(line, format_lookup[format_as_string]))
with open(output_file_name, 'w') as fp:
fp.writelines(strings)
fp.writelines(lines)
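
As a quick illustration of inttype_conversion() above (the module name in the import is an assumption, since file names are not shown in this view), PRI* macros and the Bluetooth address macros collapse into plain printf specifiers:

from pebble.loghashing.hashing import inttype_conversion  # module name assumed

print(inttype_conversion("count % PRIu16 "))   # -> count %u    (no length modifier for 16-bit)
print(inttype_conversion("offset % PRId32 "))  # -> offset %ld  ('l' modifier for 32-bit)
print(inttype_conversion("addr BT_MAC_FMT"))   # -> addr %02X:%02X:%02X:%02X:%02X:%02X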

View file

@@ -0,0 +1,303 @@
#!/usr/bin/env python
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -*- coding: utf8 -*-
"""
Module for dehashing NewLog input
"""
import os
import re
import string
import struct
from pebble.loghashing.constants import (NEWLOG_LINE_CONSOLE_PATTERN,
NEWLOG_LINE_SUPPORT_PATTERN,
NEWLOG_HASHED_INFO_PATTERN,
POINTER_FORMAT_TAG_PATTERN,
HEX_FORMAT_SPECIFIER_PATTERN)
hex_digits = set(string.hexdigits)
LOG_DICT_KEY_VERSION = 'new_logging_version'
NEW_LOGGING_VERSION = 'NL0101'
LOG_LEVEL_ALWAYS = 0
LOG_LEVEL_ERROR = 1
LOG_LEVEL_WARNING = 50
LOG_LEVEL_INFO = 100
LOG_LEVEL_DEBUG = 200
LOG_LEVEL_DEBUG_VERBOSE = 255
level_strings_map = {
LOG_LEVEL_ALWAYS: '*',
LOG_LEVEL_ERROR: 'E',
LOG_LEVEL_WARNING: 'W',
LOG_LEVEL_INFO: 'I',
LOG_LEVEL_DEBUG: 'D',
LOG_LEVEL_DEBUG_VERBOSE: 'V'
}
# Location of the core number in the message hash
PACKED_CORE_OFFSET = 30
PACKED_CORE_MASK = 0x03
def dehash_file(file_name, log_dict):
"""
Dehash a file
:param file_name: Path of the file to dehash
:type file_name: str
:param log_dict: dict of dicts created from .log_strings section from tintin_fw.elf
:type log_dict: dict of dicts
:returns: A list containing the dehashed lines
"""
# Grab the lines from the file
with open(file_name, 'r') as fp:
lines = fp.readlines()
# Dehash the lines
lines = [dehash_line(x, log_dict) + "\n" for x in lines]
return lines
def dehash_line(line, log_dict):
"""
Dehash a line. Return with old formatting.
:param line: The line to dehash
:type line: str
:param log_dict: dict of dicts created from .log_strings section from tintin_fw.elf
:type log_dict: dict of dicts
:returns: Formatted line
On error, the provided line
"""
line_dict = dehash_line_unformatted(line, log_dict)
if not line_dict:
return line
output = []
if 'date' not in line_dict and 're_level' in line_dict:
output.append(line_dict['re_level'])
if 'task' in line_dict:
output.append(line_dict['task'])
if 'date' in line_dict:
output.append(line_dict['date'])
if 'time' in line_dict:
output.append(line_dict['time'])
if 'file' in line_dict and 'line' in line_dict:
filename = os.path.basename(line_dict['file'])
output.append('{}:{}>'.format(filename, line_dict['line']))
output.append(line_dict['formatted_msg'])
return " ".join(output)
def dehash_line_unformatted(line, log_dict):
"""
Dehash a line. Return an unformatted dict of the info.
:param line: The line to dehash
:type line: str
:param log_dict: dict of dicts created from .log_strings section from tintin_fw.elf
:type log_dict: dict of dicts
:returns: A line_dict with keys 'formatted_msg', 'level', 'task', 'date', 'time' added.
On error, 'formatted_msg' = <input line>
"""
line_dict = parse_line(line, log_dict)
if not line_dict:
return { 'formatted_msg': line }
return line_dict
def parse_line(line, log_dict):
"""
Parse a log line
:param line: The line to dehash
:type line: str
:param log_dict: dict of dicts created from .log_strings section from tintin_fw.elf
:type log_dict: dict of dicts
:returns: A line_dict with keys 'formatted_msg', 'level', 'task', 'date', 'time',
'core_number' added.
On error, None
"""
if not log_dict:
return None
# Handle BLE logs. They have no date, time, level in the input string
ble_line = line.startswith(':0> NL:')
match = None
if not ble_line:
match = NEWLOG_LINE_CONSOLE_PATTERN.search(line)
if not match:
match = NEWLOG_LINE_SUPPORT_PATTERN.search(line)
if not match:
return None
# Search for the 'msg' in the entire log dictionary, getting back the sub-dictionary for this
# specific message
if ble_line:
line_dict = parse_message(line, log_dict)
else:
line_dict = parse_message(match.group('msg'), log_dict)
if line_dict:
if ble_line:
line_dict['task'] = '-'
else:
# Add all of the match groups (e.g., date, time, level) to the line dict
line_dict.update(match.groupdict())
# Fix up 're_level' using the numeric 'level' from the log dict entry, mapped to its ASCII char
if 'level' in line_dict:
line_dict['re_level'] = level_strings_map.get(int(line_dict['level']), '?')
return line_dict
def parse_message(msg, log_dict):
"""
Parse the log message part of a line
:param msg: The message to parse
:type msg: str
:param log_dict: dict of dicts created from .log_strings section from tintin_fw.elf
:type log_dict: dict of dicts
:returns: A copy of the log dict entry with 'formatted_msg' and 'core_number' added, or None on error
"""
match = NEWLOG_HASHED_INFO_PATTERN.search(msg)
if not match:
return None
try:
line_number = int(match.group('hash_key'), 16)
output_dict = log_dict[str(line_number)].copy() # Must be a copy!
except KeyError:
# Hash key not found. Wrong .elf?
return None
# Python's 'printf' doesn't support %p. Sigh. Convert to %x and hope for the best
safe_output_msg = POINTER_FORMAT_TAG_PATTERN.sub('\g<format>x', output_dict['msg'])
# Python's 'printf' doesn't handle (negative) 32-bit hex values correctly. Build a new
# arg list from the parsed arg list by searching for %<format>X conversions and masking
# them to 32 bits.
arg_list = []
index = 0
for arg in parse_args(match.group('arg_list')):
index = safe_output_msg.find('%', index)
if index == -1:
# This is going to cause an error below...
arg_list.append(arg)
elif HEX_FORMAT_SPECIFIER_PATTERN.match(safe_output_msg, index):
# We found a %<format>X
arg_list.append(arg & 0xFFFFFFFF)
else:
arg_list.append(arg)
# Use "printf" to generate the reconstructed string. Make sure the arguments are correct
try:
output_msg = safe_output_msg % tuple(arg_list)
except (TypeError, UnicodeDecodeError) as e:
output_msg = msg + ' ----> ERROR: ' + str(e)
# Add the formatted msg to the copy of our line dict
output_dict['formatted_msg'] = output_msg
# Add the core number to the line dict
output_dict['core_number'] = str((line_number >> PACKED_CORE_OFFSET) & PACKED_CORE_MASK)
return output_dict
def parse_args(raw_args):
"""
Split the argument list, taking care of `delimited strings`
Idea taken from http://bit.ly/1KHzc0y
:param raw_args: Raw argument list. Values are either in hex or in `strings`
:type raw_args: str
:returns: A list containing the arguments
"""
args = []
arg_run = []
in_str = False
if raw_args:
for arg_ch in raw_args:
if arg_ch not in "` ":
arg_run.append(arg_ch)
continue
if in_str:
if arg_ch == ' ':
arg_run.append(' ')
else: # Must be ending `
args.append("".join(arg_run).strip())
in_str = False
arg_run = []
continue
# Start of a string
if arg_ch == '`':
in_str = True
continue
# Must be a space boundary (arg_ch == ' ')
arg = "".join(arg_run).strip()
if not len(arg):
continue
if not all(c in hex_digits for c in arg_run):
# Hack to prevent hex conversion failure
args.append(arg)
else:
# Every parameter is a 32-bit signed integer printed as a hex string with no
# leading zeros. Add the zero padding if necessary, convert to 4 hex bytes,
# and then reinterpret as a 32-bit signed big-endian integer.
args.append(struct.unpack('>i', arg.rjust(8, '0').decode('hex'))[0])
arg_run = []
# Clean up if anything is remaining (there is no trailing space)
arg = "".join(arg_run).strip()
if len(arg):
# Handle the case where the trailing ` is missing.
if not all(c in hex_digits for c in arg):
args.append(arg)
else:
# Every parameter is a 32-bit signed integer printed as a hex string with no
# leading zeros. Add the zero padding if necessary, convert to 4 hex bytes,
# and then reinterpret as a 32-bit signed big-endian integer.
args.append(struct.unpack('>i', arg.rjust(8, '0').decode('hex'))[0])
return args
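
The argument decoder above treats every numeric parameter as a 32-bit signed big-endian integer printed as bare hex with no leading zeros. A worked check of that conversion, written with bytes.fromhex as the Python 3 equivalent of the str.decode('hex') call used in the module:

import struct

# 'ffff8170' becomes negative once reinterpreted as a signed 32-bit integer,
# matching the "gmtoff is -32400" expectation in the tests further down
for arg in ("10", "ffff8170"):
    value = struct.unpack('>i', bytes.fromhex(arg.rjust(8, '0')))[0]
    print(arg, "->", value)
# 10       -> 16
# ffff8170 -> -32400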

View file

@@ -0,0 +1,33 @@
#!/usr/bin/env python
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Setup.py for setuptools
"""
from setuptools import setup, find_packages
setup(
name='pebble.loghash',
version='2.6.0',
description='Pebble Log Hashing module',
author='Pebble Technology Corp',
author_email='francois@pebble.com',
url='https://github.com/pebble/pyegg-pebble-loghash',
namespace_packages = ['pebble'],
packages = find_packages()
)

View file

@@ -0,0 +1,115 @@
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#!/usr/bin/env python
"""
Tests for pebble.loghashing.dehashing
"""
LOOKUP_DICT = {"13108": "activity.c:activity tracking started",
"45803": "ispp.c:Start Authentication Process (%d) %s"}
from pebble.loghashing.dehashing import (dehash_line, parse_line, parse_support_line, parse_message,
dehash_str, parse_args)
def test_dehash_file():
"""
Test for dehash_file()
"""
pass
def test_dehash_line():
"""
Test for dehash_line()
"""
# Console Line - No arguments
assert ("D A 21:35:14.375 activity.c:804> activity tracking started" ==
dehash_line("D A 21:35:14.375 :804> LH:3334", LOOKUP_DICT))
# Console Line - Arguments
assert ("D A 21:35:14.375 ispp.c:872> Start Authentication Process (2) Success" ==
dehash_line("D A 21:35:14.375 :872> LH:b2eb 2 `Success`", LOOKUP_DICT))
# Support Line - No arguments
assert ("2015-09-05 02:16:16:000GMT activity.c:804> activity tracking started" ==
dehash_line("2015-09-05 02:16:16:000GMT :804 LH:3334", LOOKUP_DICT))
# Support Line - Arguments
assert ("2015-09-05 02:16:19:000GMT ispp.c:872> Start Authentication Process (2) Success" ==
dehash_line("2015-09-05 02:16:19:000GMT :872 LH:b2eb 2 `Success`", LOOKUP_DICT))
def test_parse_line():
"""
Test for parse_line()
"""
# No arguments
assert ("D A 21:35:14.375 activity.c:804> activity tracking started" ==
parse_line("D A 21:35:14.375 :804> LH:3334", LOOKUP_DICT))
# Arguments
assert ("D A 21:35:14.375 ispp.c:872> Start Authentication Process (2) Success" ==
parse_line("D A 21:35:14.375 :872> LH:b2eb 2 `Success`", LOOKUP_DICT))
def test_parse_support_line():
"""
Test for parse_support_line()
"""
# No arguments
assert ("2015-09-05 02:16:16:000GMT activity.c:804> activity tracking started" ==
parse_support_line("2015-09-05 02:16:16:000GMT :804 LH:3334", LOOKUP_DICT))
# Arguments
assert ("2015-09-05 02:16:19:000GMT ispp.c:872> Start Authentication Process (2) Success" ==
parse_support_line("2015-09-05 02:16:19:000GMT :872 LH:b2eb 2 `Success`", LOOKUP_DICT))
def test_parse_message():
"""
Test for parse_message()
"""
# Console Line - No arguments
assert ({'msg': 'activity tracking started', 'line': '804', 'file': 'activity.c'} ==
parse_message(":804> LH:3334", LOOKUP_DICT))
# Console Line - Arguments
assert ({'msg': 'Start Authentication Process (2) Success', 'line': '872', 'file': 'ispp.c'} ==
parse_message(":872> LH:b2eb 2 `Success`", LOOKUP_DICT))
# Support Line - No arguments
assert ({'msg': 'activity tracking started', 'line': '804', 'file': 'activity.c'} ==
parse_message(":804 LH:3334", LOOKUP_DICT))
# Support Line - Arguments
assert ({'msg': 'Start Authentication Process (2) Success', 'line': '872', 'file': 'ispp.c'} ==
parse_message(":872 LH:b2eb 2 `Success`", LOOKUP_DICT))
def test_dehash_str():
"""
Test for dehash_str()
"""
# No arguments
assert ("activity.c:activity tracking started" ==
dehash_str("3334", LOOKUP_DICT))
# Arguments
assert ("ispp.c:Start Authentication Process (%d) %s" ==
dehash_str("b2eb", LOOKUP_DICT))
def test_parse_args():
"""
Test for parse_args()
"""
# No `` delimited strings
assert ["foo", "bar", "baz"] == parse_args("foo bar baz")
# `` delimited strings
assert ["foo", "bar baz"] == parse_args("foo `bar baz`")

View file

@@ -0,0 +1,226 @@
#! /usr/bin/env python
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -*- coding: utf8 -*-
"""
Tests for pebble.loghashing.newlogging
"""
from pebble.loghashing.newlogging import dehash_line, dehash_line_unformatted
from pebble.loghashing.dehashing import dehash_line as legacy_dehash_line
import os
test_log_dict = {'43': {'file': '../src/fw/activity/activity.c',
'line': '804',
'level': '200',
'color': 'YELLOW',
'msg': 'activity tracking started'},
'114': {'file': '../src/fw/driver/ispp.c',
'line': '1872',
'level': '0',
'color': 'RED',
'msg': 'Start Authentication Process %d (%x) %s'},
'214': {'file': 'pointer_print.c',
'line': '1872',
'level': '0',
'color': 'RED',
'msg': 'My address is %p %p'},
'64856': {'color': 'GREY',
'file': '../src/fw/services/common/clock.c',
'level': '200',
'line': '768',
'msg': 'Changed timezone to id %u, gmtoff is %ld'},
'100000': {'color': 'GREY',
'file': '../src/fw/services/common/string.c',
'level': '200',
'line': '111',
'msg': 'string 1 %s, string 2 %s'},
'11082': {'color': 'GREY',
'file': '../src/fw/resource/resource_storage.c',
'level': '50',
'line': '120',
'msg': '0x%lx != 0x%lx'},
'1073741824': {'color': 'GREY',
'file': 'hc_protocol.c',
'level': '0',
'line': '69',
'msg': 'Init BLE SPI Protocol'},
'new_logging_version': 'NL0101'
}
def test_dehash_line():
"""
Test for dehash_line()
"""
# Console Line - No arguments
line = "? A 21:35:14.375 :0> NL:{:x}".format(43)
assert ("D A 21:35:14.375 activity.c:804> activity tracking started" ==
dehash_line(line, test_log_dict))
# Console Line - Arguments
line = "? A 21:35:14.375 :0> NL:{:x} a a `Success`".format(114)
assert ("* A 21:35:14.375 ispp.c:1872> Start Authentication Process 10 (a) Success" ==
dehash_line(line, test_log_dict))
# Support Line - No arguments
line = "2015-09-05 02:16:16:000GMT :0> NL:{:x}".format(43)
assert ("2015-09-05 02:16:16:000GMT activity.c:804> activity tracking started" ==
dehash_line(line, test_log_dict))
# Support Line - Arguments
line = "2015-09-05 02:16:19:000GMT :0> NL:{:x} 10 10 `Success`".format(114)
assert ("2015-09-05 02:16:19:000GMT ispp.c:1872> Start Authentication Process 16 (10) Success" ==
dehash_line(line, test_log_dict))
# App Log
line = "D A 21:35:14.375 file.c:0> This is an app debug line"
assert (line == dehash_line(line, test_log_dict))
# Pointer format conversion
line = "2015-09-05 02:16:19:000GMT :0> NL:{:x} 164 1FfF".format(214)
assert ("2015-09-05 02:16:19:000GMT pointer_print.c:1872> My address is 164 1fff" ==
dehash_line(line, test_log_dict))
# Two's complement negative value
line = "2015-09-05 02:16:19:000GMT :0> NL:{:x} 10 ffff8170".format(64856)
assert ("2015-09-05 02:16:19:000GMT clock.c:768> Changed timezone to id 16, gmtoff is -32400" ==
dehash_line(line, test_log_dict))
# Two's complement negative value
line = "2015-09-05 02:16:19:000GMT :0> NL:{:x} 9AEBC155 43073997".format(11082)
assert ("2015-09-05 02:16:19:000GMT resource_storage.c:120> 0x9aebc155 != 0x43073997" ==
dehash_line(line, test_log_dict))
# Empty string parameter - 1
line = "? A 21:35:14.375 :0> NL:{:x} `` `string`".format(100000)
assert ("D A 21:35:14.375 string.c:111> string 1 , string 2 string" ==
dehash_line(line, test_log_dict))
# Empty string parameter - 2 - trailing space
line = "? A 21:35:14.375 :0> NL:{:x} `string` `` ".format(100000)
assert ("D A 21:35:14.375 string.c:111> string 1 string, string 2 " ==
dehash_line(line, test_log_dict))
# Empty string parameter - 2 - no trailing space
line = "? A 21:35:14.375 :0> NL:{:x} `string` ``".format(100000)
assert ("D A 21:35:14.375 string.c:111> string 1 string, string 2 " ==
dehash_line(line, test_log_dict))
# Missing closing `
line = "? A 21:35:14.375 :0> NL:{:x} `string` `string".format(100000)
assert ("D A 21:35:14.375 string.c:111> string 1 string, string 2 string" ==
dehash_line(line, test_log_dict))
def test_dehash_invalid_parameters():
"""
Tests for invalid number of parameters
"""
# Not enough parameters
line = "2015-09-05 02:16:19:000GMT :0> NL:{:x} 164".format(214)
assert ("2015-09-05 02:16:19:000GMT pointer_print.c:1872> :0> NL:d6 164 " \
"----> ERROR: not enough arguments for format string" ==
dehash_line(line, test_log_dict))
# Too many parameters
line = "2015-09-05 02:16:19:000GMT :0> NL:{:x} 164 1FfF 17".format(214)
assert ("2015-09-05 02:16:19:000GMT pointer_print.c:1872> :0> NL:d6 164 1FfF 17 " \
"----> ERROR: not all arguments converted during string formatting" ==
dehash_line(line, test_log_dict))
# Unterminated string (last `)
line = "2015-09-05 02:16:19:000GMT :0> NL:{:x} 10 10 `Success".format(114)
assert ("2015-09-05 02:16:19:000GMT ispp.c:1872> Start Authentication Process 16 (10) Success" ==
dehash_line(line, test_log_dict))
# Unterminated string (first `)
line = "2015-09-05 02:16:19:000GMT :0> NL:{:x} 10 10 Success`".format(114)
assert ("2015-09-05 02:16:19:000GMT ispp.c:1872> Start Authentication Process 16 (10) Success" ==
dehash_line(line, test_log_dict))
# Unterminated string (No `s)
line = "2015-09-05 02:16:19:000GMT :0> NL:{:x} 10 10 Success".format(114)
assert ("2015-09-05 02:16:19:000GMT ispp.c:1872> Start Authentication Process 16 (10) Success" ==
dehash_line(line, test_log_dict))
# Invalid hex character
line = "2015-09-05 02:16:19:000GMT :0> NL:{:x} 10 1q0 Success".format(114)
assert ("2015-09-05 02:16:19:000GMT ispp.c:1872> :0> NL:72 10 1q0 Success " \
"----> ERROR: %x format: a number is required, not str" ==
dehash_line(line, test_log_dict))
# Unicode
line = "? A 21:35:14.375 :0> NL:{:x} `unicode` `Pebble β`".format(100000)
assert ("D A 21:35:14.375 string.c:111> string 1 unicode, string 2 Pebble β" ==
dehash_line(line, test_log_dict))
def test_legacy_dehash_line():
"""
Test legacy dehash_line()
"""
# Console Line - No arguments
line = "? A 21:35:14.375 :0> NL:{:x}".format(43)
assert ("D A 21:35:14.375 activity.c:804> activity tracking started" ==
legacy_dehash_line(line, test_log_dict))
def test_unformatted():
"""
Test dehash_line_unformatted()
"""
line = "? A 21:35:14.375 :0> NL:{:x} a a `Success`".format(114)
line_dict = dehash_line_unformatted(line, test_log_dict)
assert (line_dict['level'] == "0")
assert (line_dict['task'] == "A")
assert (line_dict['time'] == "21:35:14.375")
assert (os.path.basename(line_dict['file']) == "ispp.c")
assert (line_dict['line'] == "1872")
assert (line_dict['formatted_msg'] == "Start Authentication Process 10 (a) Success")
def test_core_number():
"""
Test core number decoding
"""
# Core number 0
line = "? A 21:35:14.375 :0> NL:{:x} a a `Success`".format(114)
line_dict = dehash_line_unformatted(line, test_log_dict)
assert (line_dict['core_number'] == "0")
# Core number 1
line = "? A 21:35:14.375 :0> NL:{:x}".format(1073741824)
line_dict = dehash_line_unformatted(line, test_log_dict)
assert (line_dict['core_number'] == "1")
def test_ble_decode():
"""
Test BLE decode.
datetime.now() is used, so ignore the date/time
"""
line = ":0> NL:{:x}".format(1073741824)
line_dict = dehash_line_unformatted(line, test_log_dict)
assert (line_dict['level'] == "0")
assert (line_dict['task'] == "-")
assert (os.path.basename(line_dict['file']) == "hc_protocol.c")
assert (line_dict['line'] == "69")
assert (line_dict['formatted_msg'] == "Init BLE SPI Protocol")
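
The core-number assertions above follow directly from the bit packing defined in the newlogging module: bits 30-31 of the hash key carry the core number (PACKED_CORE_OFFSET = 30, PACKED_CORE_MASK = 0x03), which is why key 1073741824 (1 << 30) decodes to core 1. A quick check:

PACKED_CORE_OFFSET = 30   # values copied from the newlogging module above
PACKED_CORE_MASK = 0x03

for key in (114, 1 << 30):
    print(hex(key), "-> core", (key >> PACKED_CORE_OFFSET) & PACKED_CORE_MASK)
# 0x72       -> core 0
# 0x40000000 -> core 1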