运动的流水氧变革的过程中发生错误JSON化的响应

0

的问题

我用以下代码(Lambda的功能)进行数据变换,使用的动数据流水. 我下错误。

代码:

#This function is created to Transform the data from Kinesis Data Firehose -> S3 Bucket
#It converts single line json to multi line json as expected by AWS Athena best practice.
#It also removes special characters from json keys (column name in Athena) as Athena expects column names without special characters

import json
import boto3
import base64
import string
from typing import Optional, Iterable, Union

delete_dict = {sp_character: '' for sp_character in string.punctuation}
PUNCT_TABLE = str.maketrans(delete_dict)
output = []

def lambda_handler(event, context):
    
    for record in event['records']:
        payload = base64.b64decode(record['data']).decode('utf-8')
        
        remove_special_char = json.loads(payload, object_pairs_hook=clean_keys)
        row_w_newline = str(remove_special_char) + "\n"
        row_w_newline = base64.b64encode(row_w_newline.encode('utf-8'))
        
        
        output_record = {
            'recordId': record['recordId'],
            'result': 'Ok',
            'data': row_w_newline
        }
        output.append(output_record)

    print('Processed {} records.'.format(len(event['records'])))
    
    return {'records': output}
    
def strip_punctuation(s: str,
                      exclude_chars: Optional[Union[str, Iterable]] = None) -> str:
    """
    Remove punctuation and spaces from a string.

    If `exclude_chars` is passed, certain characters will not be removed
    from the string.

    """
    punct_table = PUNCT_TABLE.copy()
    if exclude_chars:
        for char in exclude_chars:
            punct_table.pop(ord(char), None)

    # Next, remove the desired punctuation from the string
    return s.translate(punct_table)

def clean_keys(o):
    return {strip_punctuation(k): v for k, v in o}

错误:

An error occurred during JSON serialization of response: b'eyd2ZXJzaW9uJzogJzAnLCAnaWQnOiAnNjFhMGI4YjQtOGRhYS0xNGMwLTllOTMtNzhhNjk0MTY0MDgxJywgJ2RldGFpbHR5cGUnOiAnQVdTIEFQSSBDYWxsIHZpYSBDbG91ZFRyYWlsJywgJ3NvdXJjZSc6ICdhd3Muc2VjdXJpdHlodWInLCAnYWNjb3VudCc6ICc5MzQ3NTU5ODkxNzYnLCAndGltZSc6ICcyMDIxLTExLTIzVDE1OjQxOjQ3WicsICdyZWdpb24nOiAndXMtZWFzdC0xJywgJ3Jlc291cmNlcyc6IFtdLCAnZGV0YWlsJzogeydldmVudFZlcnNpb24nOiAnMS4wOCcsICd1c2VySWRlbnRpdH'
     is not JSON serializable
    Traceback (most recent call last):
      File "/var/lang/lib/python3.6/json/__init__.py", line 238, in dumps
        **kw).encode(obj)
      File "/var/lang/lib/python3.6/json/encoder.py", line 199, in encode
        chunks = self.iterencode(o, _one_shot=True)
      File "/var/lang/lib/python3.6/json/encoder.py", line 257, in iterencode
        return _iterencode(o, 0)
      File "/var/runtime/bootstrap.py", line 135, in decimal_serializer
        raise TypeError(repr(o) + " is not JSON serializable")

活动:

{'recordId': '49623720050963652954313901532126731765249603147428528130000000', 'approximateArrivalTimestamp': 1637711607661, 'data': 'eyJ2ZXJzaW9uIjoiMCIsImlkIjoiMzFkOGE3MmItYWUxNC02ZDYzLWRjODUtMTZmNWViMzk3ZTAyIiwiZGV0YWlsLXR5cGUiOiJBV1MgQVBJIENhbGwgdmlhIENsb3VkVHJhaWwiLCJzb3VyY2UiOiJhd3Muc2VjdXJpdHlodWIiLCJhY2NvdW50IjoiMjIwMzA3MjAyMzYyIiwidGltZSI6IjIwMjEtMTEtMjNUMjM6NTM6MTdaIiwicmVnaW9uIjoidXMtd2VzdC0yIiwicmVzb3VyY2VzIjpbXSwiZGV0YWlsIjp7ImV2ZW50VmVyc2lvbiI6IjEuMDgiLCJ1c2VySWRlbnRpdHkiOnsidHlwZSI6IlJvb3QiLCJwcmluY2lwYWxJZCI6IjIyMDMwNzIwMjM2MiIsImFybiI6ImFybjphd3M6aWFtOjoyMjAzMDcyMDIzNjI6cm9vdCIsImFjY291bnRJZCI6IjIyMDMwNzIwMjM2MiIsImFjY2Vzc0tleUlkIjoiQVNJQVRHUzJWRUU1TEQ2TUZFRlYiLCJzZXNzaW9uQ29udGV4dCI6eyJzZXNzaW9uSXNzdWVyIjp7fSwid2ViSWRGZWRlcmF0aW9uRGF0YSI6e30sImF0dHJpYnV0ZXMiOnsiY3JlYXRpb25EYXRlIjoiMjAyMS0xMS0yM1QxNToxMDo1N1oiLCJtZmFBdXRoZW50aWNhdGVkIjoiZmFsc2UifX19LCJldmVudFRpbWUiOiIyMDIxLTExLTIzVDIzOjUzOjE3WiIsImV2ZW50U291cmNlIjoic2VjdXJpdHlodWIuYW1hem9uYXdzLmNvbSIsImV2ZW50TmFtZSI6IkJhdGNoRGlzYWJsZVN0YW5kYXJkcyIsImF3c1JlZ2lvbiI6InVzLXdlc3QtMiIsInNvdXJjZUlQQWRkcmVzcyI6IjEwNC4xMjkuMTk4LjEwMSIsInVzZXJBZ2VudCI6ImF3cy1pbnRlcm5hbC8zIGF3cy1zZGstamF2YS8xLjEyLjExMiBMaW51eC81LjQuMTU2LTk0LjI3My5hbXpuMmludC54ODZfNjQgT3BlbkpES182NC1CaXRfU2VydmVyX1ZNLzI1LjMxMi1iMDcgamF2YS8xLjguMF8zMTIgdmVuZG9yL09yYWNsZV9Db3Jwb3JhdGlvbiBjZmcvcmV0cnktbW9kZS9zdGFuZGFyZCIsInJlcXVlc3RQYXJhbWV0ZXJzIjp7IlN0YW5kYXJkc1N1YnNjcmlwdGlvbkFybnMiOlsiYXJuOmF3czpzZWN1cml0eWh1Yjp1cy13ZXN0LTI6MjIwMzA3MjAyMzYyOnN1YnNjcmlwdGlvbi9hd3MtZm91bmRhdGlvbmFsLXNlY3VyaXR5LWJlc3QtcHJhY3RpY2VzL3YvMS4wLjAiXX0sInJlc3BvbnNlRWxlbWVudHMiOnsiU3RhbmRhcmRzU3Vic2NyaXB0aW9ucyI6W3siU3RhbmRhcmRzQXJuIjoiYXJuOmF3czpzZWN1cml0eWh1Yjp1cy13ZXN0LTI6OnN0YW5kYXJkcy9hd3MtZm91bmRhdGlvbmFsLXNlY3VyaXR5LWJlc3QtcHJhY3RpY2VzL3YvMS4wLjAiLCJTdGFuZGFyZHNJbnB1dCI6e30sIlN0YW5kYXJkc1N0YXR1cyI6IkRFTEVUSU5HIiwiU3RhbmRhcmRzU3Vic2NyaXB0aW9uQXJuIjoiYXJuOmF3czpzZWN1cml0eWh1Yjp1cy13ZXN0LTI6MjIwMzA3MjAyMzYyOnN1YnNjcmlwdGlvbi9hd3MtZm91bmRhdGlvbmFsLXNlY3VyaXR5LWJlc3QtcHJhY3RpY2VzL3YvMS4wLjAiLCJTdGFuZGFyZHNTdGF0dXNSZWFzb24iOnsiU3RhdHVzUmVhc29uQ29kZSI6Ik5PX0FWQUlMQUJMRV9DT05GSUdVUkFUSU9OX1JFQ09SREVSIn19XX0sInJlcXVlc3RJRCI6IjcyYzVjODYyLTJmOWEtNDBjYS05NDExLTY2YzIxMTcyNjIxMCIsImV2ZW50SUQiOiI3YWY4NjFiZS03YjExLTRmOTQtOWZlYS0yYTgyZjg5NDIxNWYiLCJyZWFkT25seSI6ZmFsc2UsImV2ZW50VHlwZSI6IkF3c0FwaUNhbGwiLCJtYW5hZ2VtZW50RXZlbnQiOnRydWUsInJlY2lwaWVudEFjY291bnRJZCI6IjIyMDMwNzIwMjM2MiIsImV2ZW50Q2F0ZWdvcnkiOiJNYW5hZ2VtZW50In19'}
1

最好的答案

0

这段代码我帮助与上述问题

def lambda_handler(event, context):
    
    for record in event['records']:
        payload = base64.b64decode(record['data']).decode('utf-8')
        remove_special_char = json.loads(payload, object_pairs_hook=clean_keys)
        output_record = {
            'recordId': record['recordId'],
            'result': 'Ok',
            'data': base64.b64encode(json.dumps(remove_special_char).encode('utf-8') + b'\n').decode('utf-8')
        }
        output.append(output_record)

    print('Processed {} records.'.format(len(event['records'])))
    return {'records': output}
2021-11-24 21:26:32

其他语言

此页面有其他语言版本

Русский
..................................................................................................................
Italiano
..................................................................................................................
Polski
..................................................................................................................
Română
..................................................................................................................
한국어
..................................................................................................................
हिन्दी
..................................................................................................................
Français
..................................................................................................................
Türk
..................................................................................................................
Česk
..................................................................................................................
Português
..................................................................................................................
ไทย
..................................................................................................................
Español
..................................................................................................................
Slovenský
..................................................................................................................