¡Caminando hacia el éxito!

Aprende en Comunidad

Avalados por :

Solución para normalizar un JSON anidado en Python utilizando pandas y json_normalize

  • Creado 01/03/2024
  • Modificado 01/03/2024
  • 150 Vistas
0
Cargando...

Hello Team,

Para el JSON anidado a continuación, el array no se está normalizando correctamente utilizando

import pandas as pd
import json

def on_input(data):

df = pd.read_json(data)df = pd.json_normalize(json.loads(data))

api.send("output", df.to_json(orient="records"))

api.set_port_callback("input1", on_input)

También intenté con max_level y luego record_path pero sin suerte.

Flujo : Productor de Kafka (cadena JSON) -> decodificador avro -> Python3 -> Cliente Hana

¿Hay alguna forma de normalizar el JSON para verificar la condición IF Then else y luego actualizar el sufijo para el campo de columna y, finalmente, actualizarlo en la base de datos sin duplicados? Aquí, header.poNumber y data.id son claves primarias/identificadores únicos.

Ejemplo:

 [
   {
      "header.poNumber":"9023496",
      "data.id":"10013459",
      "message.source":[
         {
            "createSource":null,
            "timeStamp":"2023-05-12T19:30:00.0000000+02:00",
            "type":"full"
         },
         {
            "createSource":"testdev",
            "timeStamp":"2023-05-11T19:30:00.0000000+02:00",
            "type":"ordersEstimated"
         },
         {
            "createSource": "event",
            "timeStamp":"2023-05-12T12:30:00.0000000+01:00",
            "type":"ordersCreated"
         }
      ],
      "message.time":[
         {
            "timeSource":"UTC",
            "typeId":"full"
         },
         {
            "timeSource":"IST",
            "typeId":"actual"
         }
      ]
   }
]

Resultado esperado:

image.png
Pedro Pascal
Se unió el 07/03/2018
Pinterest
Telegram
Linkedin
Whatsapp

4 Respuestas

0
Cargando...

vitaliy.rudnytskiy

A continuación se muestra los datos enviados al operador de Python 3 como cadena JSON. Esto necesita ser filtrado y luego actualizado en la tabla de la base de datos sin columnas duplicadas.

poNumber e id son claves primarias/identificadores únicos en cada fila. ¡Gracias!

{
   "header":{
      "poNumber":"9023496"
   },
   "data":{
      "id":"10013459"
   },
   "message":{
      "source":[
         {
            "createSource":null,
            "timeStamp":"2023-05-12T19:30:00.0000000+02:00",
            "type":"full"
         },
         {
            "createSource":"testdev",
            "timeStamp":"2023-05-11T19:30:00.0000000+02:00",
            "type":"ordersEstimated"
         },
         {
            "createSource":"event",
            "timeStamp":"2023-05-12T12:30:00.0000000+01:00",
            "type":"ordersCreated"
         }
      ],
      "time":[
         {
            "timeSource":"UTC",
            "typeId":"full"
         },
         {
            "timeSource":"IST",
            "typeId":"actual"
         }
      ]
   }
}
Respondido el 15/04/2024
LUCIANO RIOJA GHIOTTO
Se unió el 13/07/2019
0
Cargando...

Hola Rajesh,

¿Tienes un valor exacto de los `datos` que se envían a la entrada de `on_input(data)`?

Podría ser diferente al que compartiste en el ejemplo.

Saludos,
-Witalij

Respondido el 15/04/2024
LUCIANO RIOJA GHIOTTO
Se unió el 13/07/2019
0
Cargando...
import pandas as pd
import json

def on_input(data):
    data_as_json = json.loads(data)
    data_as_json_filtered=list()
    for record in data_as_json:
        record_filtered = dict()
        for key in list(record.keys()):
            if key=='message.time':
                record_filtered.update({key: item for item in [time for time in record[key]] 
                        if item['typeId']=='actual'})
            elif key=='message.source':
                record_filtered.update({key: item for item in [source for source in record[key]] 
                          if item['type']=='actual'})
            else:
                record_filtered.update({key: record[key]})
        data_as_json_filtered.append(record_filtered)
    df_source=pd.json_normalize(data_as_json_filtered)
    api.send("output", df_source.to_json(orient="records"))
api.set_port_callback("input1", on_input)

vitaliy.rudnytskiy

It is failing at for key in list(record.keys()) with below error


Group messages:

Group: default; Messages: Graph failure: operator.com.sap.system.python3Operator:python3operator1: Error while executing callback registered on port(s) ['input1']: 'str' object has no attribute 'keys' [line 10]

Process(es) terminated with error(s). restartOnFailure==false

error123.png

input is json string and outpus is basic string in Python3 operator

Please require your valuable inputs and support. Thank you vitaliy.rudnytskiy

Respondido el 15/04/2024
LUCIANO RIOJA GHIOTTO
Se unió el 13/07/2019
0
Cargando...

Por favor, verifica si esto se ve bien vitaliy.rudnytskiy

import pandas as pd
import json

def on_input(data):
  data_as_json=json.loads(data)

  data_as_json_filtered=list()
  for record in data_as_json:
     record_filtered=dict()
     for key in list(record.keys()):
         if key=='message.time':
             record_filtered.update({key: item for item in [time for time in record[key]] 
                      if item['typeId']=='actual'})
         elif key=='message.source':
             record_filtered.update({key: item for item in [source for source in record[key]] 
                      if item['type']=='actual'})
         else:
             record_filtered.update({key: record[key]})
     data_as_json_filtered.append(record_filtered)
  display(data_as_json_filtered)

  df_source=pd.json_normalize(data_as_json_filtered)

  api.send("output", df_source.to_json(orient="records"))

api.set_port_callback("input1", on_input)

¡Muchas gracias!

Respondido el 15/04/2024
LUCIANO RIOJA GHIOTTO
Se unió el 13/07/2019

contacto@primeinstitute.com

(+51) 1641 9379
(+57) 1489 6964

© 2024 Copyright. Todos los derechos reservados.

Desarrollado por Prime Institute

¡Hola! Soy Diana, asesora académica de Prime Institute, indícame en que curso estas interesado, saludos!
Hola ¿Puedo ayudarte?