¡Caminando hacia el éxito!

Aprende en Comunidad

Avalados por :

Solução para normalizar um JSON aninhado em Python utilizando pandas e json_normalize

  • Creado 01/03/2024
  • Modificado 01/03/2024
  • 29 Vistas
0
Cargando...

Olá Equipe,

Para o JSON aninhado abaixo, o array não está sendo normalizado corretamente usando

import pandas as pd
import json

def on_input(data):

df = pd.read_json(data)df = pd.json_normalize(json.loads(data))

api.send("output", df.to_json(orient="records"))

api.set_port_callback("input1", on_input)

Também tentei com max_level e depois record_path mas sem sucesso.

Fluxo : Produtor de Kafka (cadeia JSON) -> decodificador avro -> Python3 -> Cliente Hana

Existe alguma forma de normalizar o JSON para verificar a condição IF Then else e depois atualizar o sufixo para o campo de coluna e, finalmente, atualizá-lo no banco de dados sem duplicatas? Aqui, header.poNumber e data.id são chaves primárias/identificadores únicos.

Exemplo:

 [
   {
      "header.poNumber":"9023496",
      "data.id":"10013459",
      "message.source":[
         {
            "createSource":null,
            "timeStamp":"2023-05-12T19:30:00.0000000+02:00",
            "type":"full"
         },
         {
            "createSource":"testdev",
            "timeStamp":"2023-05-11T19:30:00.0000000+02:00",
            "type":"ordersEstimated"
         },
         {
            "createSource": "event",
            "timeStamp":"2023-05-12T12:30:00.0000000+01:00",
            "type":"ordersCreated"
         }
      ],
      "message.time":[
         {
            "timeSource":"UTC",
            "typeId":"full"
         },
         {
            "timeSource":"IST",
            "typeId":"actual"
         }
      ]
   }
]

Resultado esperado:

image.png
Pedro Pascal
Se unió el 07/03/2018
Pinterest
Telegram
Linkedin
Whatsapp

4 Respuestas

0
Cargando...

vitaliy.rudnytskiy

Aqui estão os dados enviados para o operador Python 3 como uma string JSON. Isso precisa ser filtrado e depois atualizado na tabela do banco de dados sem colunas duplicadas.

poNumber e id são chaves primárias/identificadores únicos em cada linha. Obrigado!

{
   "header":{
      "poNumber":"9023496"
   },
   "data":{
      "id":"10013459"
   },
   "message":{
      "source":[
         {
            "createSource":null,
            "timeStamp":"2023-05-12T19:30:00.0000000+02:00",
            "type":"full"
         },
         {
            "createSource":"testdev",
            "timeStamp":"2023-05-11T19:30:00.0000000+02:00",
            "type":"ordersEstimated"
         },
         {
            "createSource":"event",
            "timeStamp":"2023-05-12T12:30:00.0000000+01:00",
            "type":"ordersCreated"
         }
      ],
      "time":[
         {
            "timeSource":"UTC",
            "typeId":"full"
         },
         {
            "timeSource":"IST",
            "typeId":"actual"
         }
      ]
   }
}
Respondido el 15/04/2024
LUCIANO RIOJA GHIOTTO
Se unió el 13/07/2019
0
Cargando...

Olá Rajesh,

Você tem um valor exato dos `dados` que são enviados para a entrada de `on_input(data)`?

Poderia ser diferente do que você compartilhou no exemplo.

Saudações,
-Witalij

Respondido el 15/04/2024
LUCIANO RIOJA GHIOTTO
Se unió el 13/07/2019
0
Cargando...
import pandas as pd
import json

def on_input(data):
    data_as_json = json.loads(data)
    data_as_json_filtered=list()
    for record in data_as_json:
        record_filtered = dict()
        for key in list(record.keys()):
            if key=='message.time':
                record_filtered.update({key: item for item in [time for time in record[key]] 
                        if item['typeId']=='actual'})
            elif key=='message.source':
                record_filtered.update(: item for item in [source for source in record[key]] 
                          if item['type']=='actual'})
            else:
                record_filtered.update(: record[key]})
        data_as_json_filtered.append(record_filtered)
    df_source=pd.json_normalize(data_as_json_filtered)
    api.send("output", df_source.to_json(orient="records"))
api.set_port_callback("input1", on_input)

vitaliy.rudnytskiy

Está falhando em for key in list(record.keys()) com o erro abaixo


Mensagens em grupo:

Grupo: padrão; Mensagens: Falha no gráfico: operator.com.sap.system.python3Operator:python3operator1: Erro ao executar o callback registrado na(s) porta(s) ['input1']: 'str' object has no attribute 'keys' [linha 10]

Processo(s) terminado(s) com erro(s). restartOnFailure==false

error123.png

a entrada é uma string json e a saída é uma string básica no operador Python3

Por favor, solicite suas entradas e suporte valiosos. Obrigado vitaliy.rudnytskiy

Respondido el 15/04/2024
LUCIANO RIOJA GHIOTTO
Se unió el 13/07/2019
0
Cargando...

Por favor, verifique se isso está bom vitaliy.rudnytskiy

import pandas as pd
import json

def on_input(data):
  data_as_json=json.loads(data)

  data_as_json_filtered=list()
  for record in data_as_json:
     record_filtered=dict()
     for key in list(record.keys()):
         if key=='message.time':
             record_filtered.update({key: item for item in [time for time in record[key]] 
                      if item['typeId']=='actual'})
         elif key=='message.source':
             record_filtered.update({key: item for item in [source for source in record[key]] 
                      if item['type']=='actual'})
         else:
             record_filtered.update(: record[key]})
     data_as_json_filtered.append(record_filtered)
  display(data_as_json_filtered)

  df_source=pd.json_normalize(data_as_json_filtered)

  api.send("output", df_source.to_json(orient="records"))

api.set_port_callback("input1", on_input)

°Muito obrigado!

Respondido el 15/04/2024
LUCIANO RIOJA GHIOTTO
Se unió el 13/07/2019

contacto@primeinstitute.com

(+51) 1641 9379
(+57) 1489 6964

© 2024 Copyright. Todos los derechos reservados.

Desarrollado por Prime Institute

¡Hola! Soy Diana, asesora académica de Prime Institute, indícame en que curso estas interesado, saludos!
Hola ¿Puedo ayudarte?